# -*- coding: utf-8 -*-
"""
@author: jeremy leconte
"""
import os.path
from math import log10
import pickle
import h5py
import numpy as np
import astropy.units as u
from .data_table import Data_table
from .util.interp import rebin_ind_weights, rm_molec
from .util.cst import KBOLTZ
from .hires_spectrum import Hires_spectrum
from .util.filenames import create_fname_grid_Kspectrum_LMDZ, select_kwargs
[docs]
class Xtable(Data_table):
"""A class that handles tables of cross sections.
"""
def __init__(self, *filename_filters, filename=None,
p_unit='unspecified', file_p_unit='unspecified',
kdata_unit='unspecified', file_kdata_unit='unspecified',
remove_zeros=False, search_path=None, mol=None, **kwargs):
"""Initializes cross section table and
supporting data from a file based on its extension.
Parameters
----------
filename: str
Relative or absolute path to the input file.
filename_filters: sequence of string
As many strings as necessary to uniquely define
a file in the global search path defined in
:class:`~exo_k.settings.Settings`.
This path will be searched for a file
with all the filename_filters in the name.
The filename_filters can contain '*'.
If there is no filename or filename_filters provided,
just creates an empty object to be filled later
See :class:`~exo_k.ktable.Ktable` __init__ mehthod for documentation on
`p_unit`, `file_p_unit`, `kdata_unit`, `file_kdata_unit`, `remove_zeros`,
`search_path`, and `mol` keywords.
"""
super().__init__()
if filename is not None:
self.filename=filename
elif filename_filters or mol is not None:
self.filename=self._settings.list_files(*filename_filters, molecule = mol,
only_one=True, search_path=search_path, path_type='xtable')[0]
if self.filename is not None:
if self.filename.lower().endswith('pickle'):
self.read_pickle(filename=self.filename)
elif self.filename.lower().endswith(('.hdf5', '.h5')):
self.read_hdf5(filename=self.filename, mol=mol, **kwargs)
elif self.filename.lower().endswith(('.dat')):
self.read_exo_transmit(filename=self.filename, mol=mol)
else:
raise NotImplementedError("""Requested format not recognized.
Currently recognized formats are Exomol .pickle, .hdf5, and Exo_Transmit .dat.""")
super().finalize_init(p_unit=p_unit, file_p_unit=file_p_unit,
kdata_unit=kdata_unit, file_kdata_unit=file_kdata_unit,
remove_zeros=remove_zeros)
[docs]
def read_hdf5(self, filename=None, mol=None, wn_range=None, wl_range=None):
"""Initializes k coeff table and supporting data from an Exomol hdf5 file
Parameters
----------
filename : str
Name of the input hdf5 file
mol : str, optional
Overrides the name of the molecule to be put in the :class:`Xtable` object.
"""
if (filename is None or not filename.lower().endswith(('.hdf5', '.h5'))):
raise RuntimeError("You should provide an input hdf5 file")
with h5py.File(filename, 'r') as f:
if 'mol_name' in f:
self.mol=f['mol_name'][()]
elif 'mol_name' in f.attrs:
self.mol=f.attrs['mol_name']
else:
if mol is not None:
self.mol=mol
else:
self.mol=os.path.basename(filename).split(self._settings._delimiter)[0]
if isinstance(self.mol, np.ndarray): self.mol=self.mol[0]
if isinstance(self.mol, bytes): self.mol=self.mol.decode('UTF-8')
if 'DOI' in f:
self.DOI=f['DOI'][()][0]
if isinstance(self.DOI, bytes): self.DOI=self.DOI.decode('UTF-8')
if 'bin_edges' in f:
self.wns=f['bin_edges'][...]
if 'units' in f['bin_edges'].attrs:
self.wn_unit=f['bin_edges'].attrs['units']
else:
self.wns=f['bin_centers'][...]
if 'units' in f['bin_centers'].attrs:
self.wn_unit=f['bin_centers'].attrs['units']
self.wnedges=np.concatenate( \
([self.wns[0]],(self.wns[:-1]+self.wns[1:])*0.5,[self.wns[-1]]))
iw_min, iw_max = self.select_spectral_range(wn_range, wl_range)
self.kdata=f['xsecarr'][:,:, iw_min:iw_max]
self.kdata_unit=f['xsecarr'].attrs['units']
self.tgrid=f['t'][...]
self.pgrid=f['p'][...]
self.logpgrid=np.log10(self.pgrid)
self.p_unit=f['p'].attrs['units']
self.logk=False
self.Np,self.Nt,self.Nw=self.kdata.shape
[docs]
def write_hdf5(self, filename, compression="gzip", compression_level=9,
kdata_unit=None, p_unit=None, exomol_units=False):
"""Saves data in a hdf5 format
Parameters
----------
filename: str
Name of the file to be created and saved
exomol_units: bool (optional)
If True, data are converted back to
cm^2 and bar units before being written.
"""
fullfilename=filename
if not filename.lower().endswith(('.hdf5', '.h5')):
fullfilename=filename+'.h5'
folder = os.path.dirname(fullfilename)
os.makedirs(folder, exist_ok=True)
with h5py.File(fullfilename, 'w') as f:
f.create_dataset("key_iso_ll", (1,), data=self.isotopolog_id)
f.create_dataset("mol_mass", (1,), data=self.molar_mass*1000.)
f["mol_mass"].attrs["units"] = 'AMU'
if exomol_units:
kdata_unit='cm^2/molecule'
p_unit='bar'
if kdata_unit is not None:
conv_factor=u.Unit(rm_molec(self.kdata_unit)).to(u.Unit(rm_molec(kdata_unit)))
data_to_write=self.kdata*conv_factor
f.create_dataset("xsecarr", data=data_to_write, compression=compression,
compression_opts=compression_level)
f["xsecarr"].attrs["units"] = kdata_unit
else:
f.create_dataset("xsecarr", data=self.kdata, compression=compression,
compression_opts=compression_level)
f["xsecarr"].attrs["units"] = self.kdata_unit
f.create_dataset("bin_edges", data=self.wns,
compression=compression, compression_opts=compression_level)
# where most of the data is actually written
self.write_hdf5_common(f, compression=compression, compression_level=compression_level,
p_unit=p_unit)
f["bin_edges"].attrs["long_name"] = 'Wavenumber grid'
[docs]
def read_exo_transmit(self, filename, mol=None):
"""Creates an xsec object from an exo_transmit like spectra.
See https://github.com/elizakempton/Exo_Transmit or Kempton et al. (2016) for details.
Pressures are expected to be in Pa and cross sections in m^2/molecule
Parameters
----------
filename : str
Name of the input file.
mol: str
Overrides the name of the molecule to be put in the :class:`Xtable` object.
"""
tmp_wlgrid=[]
tmp_kdata=[]
with open(filename, 'r') as file:
tmp = file.readline().split()
self.tgrid=np.array([float(ii) for ii in tmp])
tmp = file.readline().split()
self.pgrid=np.array([float(ii) for ii in tmp])
self.logpgrid=np.log10(self.pgrid)
self.Np=self.pgrid.size
self.Nt=self.tgrid.size
while True:
line=file.readline()
if line is None or line=='': break
tmp_wlgrid.append(float(line.split()[0]))
tmp_kdata.append([])
for _ in range(self.Np):
tmp_kdata[-1].append(file.readline().split()[1:])
tmp_wlgrid=np.array(tmp_wlgrid)
self.Nw=tmp_wlgrid.size
self.kdata=np.zeros((self.Np,self.Nt,self.Nw))
for iP in range(self.Np):
for iT in range(self.Nt):
for iW in range(self.Nw):
self.kdata[iP,iT,iW]=tmp_kdata[self.Nw-iW-1][iP][iT]
self.wns=0.01/tmp_wlgrid[::-1]
self.wnedges=np.concatenate( \
([self.wns[0]],(self.wns[:-1]+self.wns[1:])*0.5,[self.wns[-1]]))
self.p_unit='Pa'
self.kdata_unit='m^2/molecule'
if mol is not None:
self.mol=mol
else:
self.mol=os.path.basename(filename).split(self._settings._delimiter)[0]
[docs]
def hires_to_xtable(self, path=None, filename_grid=None,
logpgrid=None, tgrid=None,
write=0, mol=None,
grid_p_unit='Pa', p_unit='unspecified',
kdata_unit='unspecified', file_kdata_unit='unspecified',
**kwargs):
"""Loads an `Xtable` from high-resolution spectra (inplace).
.. warning::
By default, log pressures are specified in Pa in logpgrid!!! If you want
to use another unit, do not forget to specify it with the grid_p_unit keyword.
see :func:`exo_k.ktable.Ktable.hires_to_ktable` method for details
on the arguments and options.
"""
first=True
if path is None: raise TypeError("You should provide an input hires_spectrum directory")
self.filename=path
if mol is not None:
self.mol=mol
else:
self.mol=os.path.basename(self.filename).split(self._settings._delimiter)[0]
conversion_factor=u.Unit(grid_p_unit).to(u.Unit('Pa'))
self.logpgrid=np.array(logpgrid, dtype=float)+np.log10(conversion_factor)
self.pgrid=10**self.logpgrid #in Pa
self.p_unit='Pa'
self.Np=self.logpgrid.size
if write >= 3 : print(self.Np,self.pgrid)
self.tgrid=np.array(tgrid)
self.Nt=self.tgrid.size
if write >= 3 : print(self.Nt,self.tgrid)
if filename_grid is None:
filename_grid=create_fname_grid_Kspectrum_LMDZ(self.Np, self.Nt,
**select_kwargs(kwargs,['suffix','nb_digit']))
else:
filename_grid=np.array(filename_grid)
for iP in range(self.Np):
for iT in range(self.Nt):
filename=filename_grid[iP,iT]
fname=os.path.join(path,filename)
if write >= 3 : print(fname)
spec_hr=Hires_spectrum(fname, file_kdata_unit=file_kdata_unit,
**select_kwargs(kwargs,['skiprows','wn_column','mult_factor',
'kdata_column','data_type','binary','mass_amu','wns']))
# for later conversion, the real kdata_unit is in spec_hr.kdata_unit
self.kdata_unit=spec_hr.kdata_unit
was_xsec=(spec_hr.data_type=='xsec')
wn_hr=spec_hr.wns
k_hr=spec_hr.kdata
if not was_xsec:
k_hr=k_hr*KBOLTZ*self.tgrid[iT]/self.pgrid[iP]
if first:
self.wns=wn_hr[1:-1] #to be consistent with kcorr
self.wnedges=0.5*(wn_hr[:-1]+wn_hr[1:])
self.Nw=self.wns.size
self.kdata=np.zeros((self.Np,self.Nt,self.Nw))
self.kdata[iP,iT]=k_hr[1:-1]
first=False
else:
self.kdata[iP,iT]=np.interp(self.wns,wn_hr[1:-1],k_hr[1:-1])
if not was_xsec:
self.kdata=self.kdata*u.Unit(self.kdata_unit).to(u.Unit('m^-1'))
self.kdata_unit='m^2/molecule'
# Accounts for the conversion of the abs_coeff to m^-1, so we know that
# self.kdata is now in m^2/molecule. Can now convert to the desired unit.
if self._settings._convert_to_mks and kdata_unit == 'unspecified':
kdata_unit='m^2/molecule'
self.convert_kdata_unit(kdata_unit=kdata_unit)
# converts from self.kdata_unit wich is either:
# - 'm^2/molecule' data_type was 'abs_coeff'
# - The units after Hires_spectrum (which have already taken
# file_kdata_unit into account)
# to the desired unit.
self.convert_p_unit(p_unit=p_unit)
[docs]
def bin_down(self, wnedges=None, remove_zeros=False, write=0):
"""Method to bin down a xsec table to a new grid of wavenumbers (inplace).
Parameters
----------
wnedges : array, np.ndarray
Edges of the new bins of wavenumbers (cm-1) onto which the xsec
should be binned down.
if you want Nwnew bin in the end, wngrid.size must be Nwnew+1
wnedges[0] should be greater than self.wnedges[0]
wnedges[-1] should be lower than self.wnedges[-1]
remove_zeros: bool, optional
If True, remove zeros in kdata.
"""
wnedges=np.array(wnedges)
if wnedges.size<2: raise TypeError('wnedges should at least have two values')
wngrid_filter = np.where((wnedges <= self.wnedges[-1]) & (wnedges >= self.wnedges[0]))[0]
if write>=3:
print(self.wnedges);print(wnedges);print(wngrid_filter);print(wnedges[wngrid_filter])
indicestosum,weights=rebin_ind_weights(self.wnedges, wnedges[wngrid_filter])
if write>=3: print(indicestosum[0]);print(weights[0])
Nnew=wnedges.size-1
Ntmp=wnedges[wngrid_filter].size-1
newxsec=np.zeros((self.Np,self.Nt,Nnew))
for iP in range(self.Np):
for iT in range(self.Nt):
tmp=self.kdata[iP,iT,:]
newxsec[iP,iT,wngrid_filter[0:-1]]= \
[np.dot(tmp[indicestosum[ii]-1:indicestosum[ii+1]],weights[ii]) \
for ii in range(Ntmp)]
self.kdata=newxsec
self.wnedges=wnedges
self.wns=(wnedges[1:]+wnedges[:-1])*0.5
self.Nw=self.wns.size
if remove_zeros : self.remove_zeros(deltalog_min_value=10.)
[docs]
def sample(self, wngrid, remove_zeros=False, log_interp=None):
"""Method to re sample a xsec table to a new grid of wavenumbers (inplace).
Parameters
----------
wngrid : array, np.ndarray
Location of the new wavenumbers points (cm-1)
"""
wngrid=np.array(wngrid)
wngrid_filter = np.where((wngrid <= self.wnedges[-1]) & (wngrid >= self.wnedges[0]))[0]
Nnew=wngrid.size
newxsec=np.zeros((self.Np,self.Nt,Nnew))
if log_interp is None: log_interp=self._settings._log_interp
if log_interp:
for iP in range(self.Np):
for iT in range(self.Nt):
tmp=np.log(self.kdata[iP,iT,:])
newxsec[iP,iT,wngrid_filter]=np.interp(wngrid[wngrid_filter],self.wns,tmp)
self.kdata=np.exp(newxsec)
else:
for iP in range(self.Np):
for iT in range(self.Nt):
tmp=self.kdata[iP,iT,:]
newxsec[iP,iT,wngrid_filter]=np.interp(wngrid[wngrid_filter],self.wns,tmp)
self.kdata=newxsec
self.wns=wngrid
self.wnedges=np.concatenate( \
([self.wns[0]],0.5*(self.wns[1:]+self.wns[:-1]),[self.wns[-1]]))
self.Nw=Nnew
if remove_zeros : self.remove_zeros(deltalog_min_value=10.)
[docs]
def sample_cp(self, wngrid, **kwargs):
"""Creates a copy of the instance before resampling it.
Parameters
----------
See sample method for details.
Returns
-------
:class:`Xtable` object
the re-sampled :class:`Xtable`
"""
res=self.copy()
res.sample(wngrid, **kwargs)
return res
[docs]
def spectrum_to_plot(self, p=1.e-5, t=200., x=1., g=None):
"""provide the spectrum for a given point to be plotted
Parameters
----------
p : float
Pressure (Ktable pressure unit)
t : float
Temperature(K)
x: float
Volume mixing ratio of the species
g: is unused but here to be consistent with the method in data_table
"""
return self.interpolate_kdata(log10(p),t)[0]*x
[docs]
def copy(self, cp_kdata=True):
"""Creates a new instance of :class:`Xtable` object and (deep) copies data into it
Parameters
----------
cp_kdata: bool, optional
If false, the kdata table is not copied and
only the structure and metadata are.
Returns
-------
:class:`Xtable`
A new :class:`Xtable` instance with the same structure as self.
"""
res=Xtable()
res.copy_attr(self, cp_kdata=cp_kdata)
return res
def __repr__(self):
"""Method to output header
"""
out1=super().__repr__()
output=out1+"""
data oredered following p, t, wn
shape : {shape}
""".format(shape=self.shape)
return output
[docs]
def read_pickle(self, filename=None):
"""Initializes xsec table and supporting data from an Exomol pickle file
Parameters
----------
filename : str
Relative or absolute name of the input pickle file
mol : str, optional
Overrides the name of the molecule to be put in the :class:`Xtable` object.
"""
if filename is None: raise TypeError("You should provide an input pickle filename")
with open(filename,'rb') as pickle_file:
raw=pickle.load(pickle_file, encoding='latin1')
self.mol=raw['name']
if self.mol=='H2OP': self.mol='H2O'
self.tgrid=raw['t']
self.wns=raw['wno']
self.wnedges=np.concatenate( \
([self.wns[0]],(self.wns[:-1]+self.wns[1:])*0.5,[self.wns[-1]]))
self.logk=False
#deals with the p grid and units
if 'p_unit' in raw.keys():
self.p_unit=raw['p_unit']
else:
self.p_unit='bar'
self.pgrid=raw['p']
self.logpgrid=np.log10(self.pgrid)
#deals with the k grid and units
if 'kdata_unit' in raw.keys():
self.kdata_unit=raw['kdata_unit']
else:
self.kdata_unit='cm^2/molec'
self.kdata=raw['xsecarr']
if 'wn_unit' in raw.keys(): self.wn_unit=raw['wn_unit']
self.Np,self.Nt,self.Nw=self.kdata.shape
[docs]
def write_pickle(self, filename):
"""Saves data in a pickle format
Parameters
----------
filename: str
Relative or absolute name of the file to be created and saved
"""
fullfilename=filename
if not filename.lower().endswith('.pickle'): fullfilename=filename+'.pickle'
with open(fullfilename,'wb') as pickle_file:
dictout={'name':self.mol,
'p':self.pgrid,
'p_unit':self.p_unit,
't':self.tgrid,
'wno':self.wns,
'wn_unit':self.wn_unit,
'xsecarr':self.kdata,
'kdata_unit':self.kdata_unit}
#print(dictout)
pickle.dump(dictout,pickle_file,protocol=-1)