# -*- coding: utf-8 -*-
"""
@author: jeremy leconte
"""
from math import log10
import os
import h5py
import numpy as np
import astropy.units as u
from scipy.interpolate import RegularGridInterpolator
from .data_table import Data_table
from .util.interp import rm_molec, rebin_ind_weights, \
gauss_legendre, split_gauss_legendre, spectrum_to_kdist, \
is_sorted, bin_down_corrk_numba, g_sample_5d
from .util.cst import KBOLTZ
from .hires_spectrum import Hires_spectrum
from .util.filenames import create_fname_grid_Kspectrum_LMDZ, select_kwargs
class Ktable5d(Data_table):
"""A class that handles tables of k-coefficients with a variable gas.
Based on the Data_table class that handles basic operations common to Ktable and Xtable.
This class is specifically designed to deal with
LMDZ type ktable where there is a variable gas.
"""
def __init__(self, *filename_filters, filename=None, path=None,
p_unit='unspecified', file_p_unit='unspecified',
kdata_unit='unspecified', file_kdata_unit='unspecified',
remove_zeros=False, search_path=None, mol=None,
**kwargs):
"""Initializes k coeff table with variable gas and
supporting data from various sources (see below by order of precedence)
Parameters
----------
filename: str (optional)
Relative or absolute name of the file to be loaded.
path: str
If none of the above is specified,
path can point to a directory with a LMDZ type k coeff table.
In this case, see read_LMDZ for the keywords to specify.
If there is no input, just creates an empty object to be filled later
See :class:`~exo_k.ktable.Ktable` __init__ method for documentation on
`p_unit`, `file_p_unit`, `kdata_unit`, `file_kdata_unit`, `remove_zeros`,
`search_path`, and `mol` arguments.
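Examples
--------
A minimal, hypothetical sketch (file and directory names below are placeholders):

>>> ktab = Ktable5d(filename='my_var_gas_corrk.h5')
>>> ktab = Ktable5d(path='my_lmdz_corrk_dir', res='38x36', band='IR')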
"""
super().__init__()
if filename is not None:
self.filename=filename
elif filename_filters or (mol is not None and path is None):
# a non-empty sequence evaluates to True in a conditional statement
self.filename=self._settings.list_files(*filename_filters, molecule = mol,
only_one=True, search_path=search_path, path_type='ktable')[0]
if self.filename is not None:
if self.filename.lower().endswith(('.hdf5', '.h5')):
self.read_hdf5(filename=self.filename, mol=mol)
else:
raise NotImplementedError( \
'Requested format not recognized. Should end with .hdf5 or .h5')
elif path is not None:
self.read_LMDZ(path=path, mol=mol, **kwargs)
else: #if there is no input file, just create an empty object
self.wnedges=None
self.weights=None
self.ggrid=None
self.gedges=None
self.xgrid=None
self._finterp_kdata=None
super().finalize_init(p_unit=p_unit, file_p_unit=file_p_unit,
kdata_unit=kdata_unit, file_kdata_unit=file_kdata_unit,
remove_zeros=remove_zeros)
if self.kdata is not None:
self.setup_interpolation()
def read_hdf5(self, filename=None, mol=None, wn_range=None, wl_range=None):
"""Initializes k coeff table and supporting data from an Exomol hdf5 file
Parameters
----------
filename : str
Name of the input hdf5 file
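Examples
--------
Usually reached through the constructor; a direct, hypothetical call would look like:

>>> ktab = Ktable5d()
>>> ktab.read_hdf5(filename='my_var_gas_corrk.h5', mol='H2O')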
"""
if (filename is None or not filename.lower().endswith(('.hdf5', '.h5'))):
raise RuntimeError("You should provide an input hdf5 file")
with h5py.File(filename, 'r') as f:
if 'mol_name' in f:
self.mol=f['mol_name'][()]
elif 'mol_name' in f.attrs:
self.mol=f.attrs['mol_name']
else:
if mol is not None:
self.mol=mol
else:
self.mol=os.path.basename(filename).split(self._settings._delimiter)[0]
if isinstance(self.mol, np.ndarray): self.mol=self.mol[0]
if isinstance(self.mol, bytes): self.mol=self.mol.decode('UTF-8')
if 'method' in f:
self.sampling_method=f['method'][()][0]
if isinstance(self.sampling_method, bytes):
self.sampling_method=self.sampling_method.decode('UTF-8')
if 'DOI' in f:
self.DOI=f['DOI'][()][0]
if isinstance(self.DOI, bytes): self.DOI=self.DOI.decode('UTF-8')
self.wns=f['bin_centers'][...]
self.wnedges=f['bin_edges'][...]
iw_min, iw_max = self.select_spectral_range(wn_range, wl_range)
if 'units' in f['bin_edges'].attrs:
self.wn_unit=f['bin_edges'].attrs['units']
else:
if 'units' in f['bin_centers'].attrs:
self.wn_unit=f['bin_centers'].attrs['units']
self.kdata=f['kcoeff'][:,:, iw_min:iw_max]
self.kdata_unit=f['kcoeff'].attrs['units']
self.tgrid=f['t'][...]
self.pgrid=f['p'][...]
self.logpgrid=np.log10(self.pgrid)
self.p_unit=f['p'].attrs['units']
self.xgrid=f['x'][...]
if 'weights' in f.keys():
self.weights=f['weights'][...]
else:
raise RuntimeError("""No weights keyword.
This file is probably a cross section file.""")
self.ggrid=f['samples'][...]
self.gedges=np.insert(np.cumsum(self.weights),0,0.)
self.logk=False
self.Np,self.Nt,self.Nx,self.Nw,self.Ng=self.kdata.shape
def write_hdf5(self, filename, compression="gzip", compression_level=9,
kdata_unit=None, p_unit=None):
"""Saves data in a hdf5 format
Parameters
----------
filename: str
Name of the file to be created and saved
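Examples
--------
A hypothetical sketch (output name and target units are placeholders):

>>> ktab.write_hdf5('my_var_gas_corrk_mks', p_unit='Pa', kdata_unit='m^2/molecule')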
"""
dt = h5py.string_dtype(encoding='utf-8')
fullfilename=filename
if not filename.lower().endswith(('.hdf5', '.h5')):
fullfilename=filename+'.h5'
with h5py.File(fullfilename, 'w') as f:
f.create_dataset("x", data=self.xgrid, compression=compression,
compression_opts=compression_level)
f["x"].attrs["units"] = 'dimensionless'
if kdata_unit is not None:
conv_factor=u.Unit(rm_molec(self.kdata_unit)).to(u.Unit(rm_molec(kdata_unit)))
data_to_write=self.kdata*conv_factor
f.create_dataset("kcoeff", data=data_to_write,
compression=compression, compression_opts=compression_level)
f["kcoeff"].attrs["units"] = kdata_unit
else:
f.create_dataset("kcoeff", data=self.kdata,
compression=compression, compression_opts=compression_level)
f["kcoeff"].attrs["units"] = self.kdata_unit
f.create_dataset("method", (1,), data=self.sampling_method, dtype=dt)
f.create_dataset("samples", data=self.ggrid,
compression=compression, compression_opts=compression_level)
f.create_dataset("weights", data=self.weights,
compression=compression, compression_opts=compression_level)
f.create_dataset("ngauss", data=self.Ng)
f.create_dataset("bin_centers", data=self.wns,
compression=compression, compression_opts=compression_level)
f.create_dataset("bin_edges", data=self.wnedges,
compression=compression, compression_opts=compression_level)
# where most of the data is actually written
self.write_hdf5_common(f, compression=compression, compression_level=compression_level,
p_unit=p_unit)
def read_LMDZ(self, path=None, res=None, band=None, mol=None):
"""Initializes k coeff table and supporting data from a .dat file
in a gcm friendly format.
Units are assumed to be cm^2 for kdata and mbar for pressure.
Parameters
----------
path: str
Name of the directory with the various input files
res: str
"IRxVI" where IR and VI are the numbers of bands
in the infrared and visible of the k table to load.
band: str
"IR" or "VI" to specify which band to load.
"""
if (path is None) or (res is None): \
raise TypeError("You should provide an input directory name and a resolution")
self.filename=path
self.weights=np.loadtxt(os.path.join(path,'g.dat'),skiprows=1)[:-1]
# we remove the last point that is always zero.
# in the gcm this last point is intended to take care of continuum
self.Ng=self.weights.size
self.gedges=np.insert(np.cumsum(self.weights),0,0.)
self.ggrid=(self.gedges[1:]+self.gedges[:-1])*0.5
self.p_unit='mbar'
self.logpgrid=np.loadtxt(os.path.join(path,'p.dat'),skiprows=1)*1.
self.Np=self.logpgrid.size
self.pgrid=10**self.logpgrid
self.tgrid=np.loadtxt(os.path.join(path,'T.dat'),skiprows=1)
self.Nt=self.tgrid.size
_, self.mol, self.Nx, self.xgrid = read_Qdat(os.path.join(path,'Q.dat'))
if mol is not None: self.mol=mol
if band is None:
raw=np.loadtxt(os.path.join(path,res,'narrowbands.in'), skiprows=1, unpack=True)
else:
raw=np.loadtxt(os.path.join(path,res,'narrowbands_'+band+'.in'), \
skiprows=1, unpack=True)
self.wnedges=np.append(raw[0],raw[1,-1])
self.wns=(self.wnedges[1:]+self.wnedges[:-1])*0.5
self.Nw=self.wns.size
self.kdata_unit='cm^2/molecule'
if band is None:
file_to_load=os.path.join(path,res,'corrk_gcm.dat')
else:
file_to_load=os.path.join(path,res,'corrk_gcm_'+band+'.dat')
tmp=np.loadtxt(file_to_load) \
.reshape((self.Nt,self.Np,self.Nx,self.Nw,self.Ng+1),order='F')
self.kdata=tmp[:,:,:,:,:-1].transpose((1,0,2,3,4))
# also removing the last g point which is equal to 0.
self.logk=False
return None
def write_LMDZ(self, path, band='IR', fmt='%22.15e', write_only_metadata=False):
"""Saves data in a LMDZ friendly format.
The gcm requires p in mbar and kdata in cm^2/molec.
The conversion is done automatically.
Parameters
----------
path: str
Name of the directory to be created and saved,
the one that will contain all the necessary files
band: str
The band you are computing: 'IR' or 'VI'
fmt: str
Fortran format for the corrk file.
write_only_metadata: bool, optional
If `True`, only supporting files are written (T.dat, p.dat, etc.)
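Examples
--------
A hypothetical sketch (the output directory is a placeholder):

>>> ktab.write_LMDZ('my_new_gcm_corrk_dir', band='IR')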
"""
try:
os.mkdir(path)
except FileExistsError:
print('Directory was already there: '+path)
with open(os.path.join(path,'p.dat'), "w") as file:
file.write(str(self.Np)+'\n')
lp_to_write=self.logpgrid+np.log10(u.Unit(self.p_unit).to(u.Unit('mbar')))
for lp in lp_to_write:
file.write(str(lp)+'\n')
with open(os.path.join(path,'T.dat'), "w") as file:
file.write(str(self.Nt)+'\n')
for t in self.tgrid:
file.write(str(t)+'\n')
with open(os.path.join(path,'g.dat'), "w") as file:
file.write(str(self.Ng+1)+'\n')
for g in self.weights:
file.write(str(g)+'\n')
file.write(str(0.)+'\n')
dirname=os.path.join(path,band+str(self.Nw))
try:
os.mkdir(dirname)
except FileExistsError:
print('Directory was already there: '+dirname)
with open(os.path.join(dirname,'narrowbands_'+band+'.in'), "w") as file:
file.write(str(self.Nw)+'\n')
for iw in range(self.Nw):
file.write(str(self.wnedges[iw])+' '+str(self.wnedges[iw+1])+'\n')
if not write_only_metadata:
#file = open(dirname+'/corrk_gcm_IR.in', "w")
data_to_write=self.kdata.transpose((1,0,2,3,4)).flatten(order='F')
data_to_write=data_to_write*u.Unit(rm_molec(self.kdata_unit)).to(u.Unit('cm^2'))
data_to_write=np.append(data_to_write, \
np.zeros(self.Np*self.Nt*self.Nx*self.Nw)) \
.reshape((1,self.Np*self.Nt*self.Nx*self.Nw*(self.Ng+1)))
np.savetxt(os.path.join(dirname,'corrk_gcm_'+band+'.dat'),data_to_write,fmt=fmt)
def hires_to_ktable(self, path=None, filename_grid=None,
logpgrid=None, tgrid=None, xgrid=None, wnedges=None,
quad='legendre', order=20, g_split=0.9, weights=None, ggrid=None,
mid_dw=True, write=0, mol=None,
grid_p_unit='Pa', p_unit='unspecified',
kdata_unit='unspecified', file_kdata_unit='unspecified',
remove_zeros=False,
**kwargs):
"""Computes a k coeff table from :class:`~exo_k.util.hires_spectrum.Hires_spectrum`
objects.
see :func:`exo_k.ktable.Ktable.hires_to_ktable` method for details
on the arguments and options.
Other Parameters
----------------
xgrid: array, np.ndarray
Input grid in vmr of the variable gas. Needed for a Ktable5d.
.. warning::
By default, log pressures are specified in Pa in logpgrid!!! If you want
to use another unit, do not forget to specify it with the grid_p_unit keyword.
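Examples
--------
A hypothetical sketch; the directory, grids, and spectral range below are placeholders
and assume numpy is imported as np:

>>> ktab = Ktable5d()
>>> ktab.hires_to_ktable(path='hires_spectra/H2O_var',
...     logpgrid=np.arange(-3., 6.), tgrid=np.array([100., 200., 300.]),
...     xgrid=np.array([1.e-7, 1.e-4, 1.e-2, 1.]),
...     wnedges=np.linspace(30., 10000., 21), order=16)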
"""
if path is None: raise TypeError("You should provide an input hires_spectrum directory")
if wnedges is None: raise TypeError("You should provide an input wavenumber array")
self.filename=path
if mol is not None:
self.mol=mol
else:
self.mol=os.path.basename(self.filename).split(self._settings._delimiter)[0]
if weights is not None:
self.weights=weights
self.gedges=np.concatenate(([0],np.cumsum(self.weights)))
if ggrid is not None:
self.ggrid=np.array(ggrid)
else:
self.ggrid=(self.gedges[1:]+self.gedges[:-1])*0.5
self.sampling_method='custom'
else:
if quad=='legendre':
self.weights,self.ggrid,self.gedges=gauss_legendre(order)
elif quad=='split-legendre':
self.weights,self.ggrid,self.gedges=split_gauss_legendre(order, g_split)
else:
raise NotImplementedError("Type of quadrature (quad keyword) not known.")
self.sampling_method=quad
self.Ng=self.weights.size
conversion_factor=u.Unit(grid_p_unit).to(u.Unit('Pa'))
self.logpgrid=np.array(logpgrid, dtype=float)+np.log10(conversion_factor)
self.pgrid=10**self.logpgrid #in Pa
self.p_unit='Pa'
self.Np=self.logpgrid.size
if write >= 3 : print(self.Np,self.pgrid)
self.tgrid=np.array(tgrid)
self.Nt=self.tgrid.size
if write >= 3 : print(self.Nt,self.tgrid)
self.xgrid=np.array(xgrid)
self.Nx=self.xgrid.size
if write >= 3 : print(self.Nx,self.xgrid)
self.wnedges=np.array(wnedges)
if self.wnedges.size<2: raise TypeError('wnedges should at least have two values')
self.wns=(self.wnedges[1:]+self.wnedges[:-1])*0.5
self.Nw=self.wns.size
self.kdata=np.zeros(self.shape)
if filename_grid is None:
filename_grid=create_fname_grid_Kspectrum_LMDZ(self.Np, self.Nt,
**select_kwargs(kwargs,['suffix','nb_digit']))
else:
filename_grid=np.array(filename_grid)
for iP in range(self.Np):
for iT in range(self.Nt):
for iX in range(self.Nx):
filename=filename_grid[iP,iT,iX]
fname=os.path.join(path,filename)
if write >= 3 : print(fname)
spec_hr=Hires_spectrum(fname, file_kdata_unit=file_kdata_unit,
**select_kwargs(kwargs,['skiprows','wn_column','mult_factor',
'kdata_column','data_type','binary','mass_amu']))
# for later conversion, the real kdata_unit is in spec_hr.kdata_unit
self.kdata_unit=spec_hr.kdata_unit
was_xsec=(spec_hr.data_type=='xsec')
wn_hr=spec_hr.wns
k_hr=spec_hr.kdata
if mid_dw:
dwn_hr=(wn_hr[2:]-wn_hr[:-2])*0.5
wn_hr=wn_hr[1:-1]
k_hr=k_hr[1:-1]
else:
dwn_hr=(wn_hr[1:]-wn_hr[:-1])
wn_hr=wn_hr[:-1]
k_hr=k_hr[:-1]
self.kdata[iP,iT,iX]=spectrum_to_kdist(k_hr,wn_hr,dwn_hr,self.wnedges,self.ggrid)
if not was_xsec:
self.kdata[iP,iT,iX]=self.kdata[iP,iT,iX]*KBOLTZ*self.tgrid[iT]/self.pgrid[iP]
if not was_xsec:
self.kdata=self.kdata*u.Unit(self.kdata_unit).to(u.Unit('m^-1'))
self.kdata_unit='m^2/molecule'
# Accounts for the conversion of the abs_coeff to m^-1, so we know that
# self.kdata is now in m^2/molecule. Can now convert to the desired unit.
if self._settings._convert_to_mks and kdata_unit == 'unspecified':
kdata_unit='m^2/molecule'
self.convert_kdata_unit(kdata_unit=kdata_unit)
# converts from self.kdata_unit which is either:
# - 'm^2/molecule' if data_type was 'abs_coeff'
# - The units after Hires_spectrum (which have already taken
# file_kdata_unit into account)
# to the desired unit.
self.convert_p_unit(p_unit=p_unit)
if remove_zeros : self.remove_zeros(deltalog_min_value=10.)
self.setup_interpolation()
def setup_interpolation(self, log_interp=None):
"""Creates interpolating functions to be called later on. and loads
it as attribute (inplace).
"""
if log_interp is None: log_interp=self._settings._log_interp
self._local_log_interp=log_interp
if self._local_log_interp:
self._finterp_kdata=RegularGridInterpolator( \
(self.logpgrid,self.tgrid,np.log(self.xgrid)), np.log(self.kdata), \
bounds_error=True)
else:
self._finterp_kdata=RegularGridInterpolator( \
(self.logpgrid, self.tgrid, self.xgrid), self.kdata, \
bounds_error=True)
def set_kdata(self, new_kdata):
"""Changes kdata (inplace). this is preferred to directly accessing kdata because one
could forget to run setup_interpolation().
Parameters
----------
new_kdata: array, np.ndarray
New array of kdata.
"""
super().set_kdata(new_kdata)
self.setup_interpolation()
def interpolate_kdata(self, logp_array=None, t_array=None, x_array= None,
log_interp=None, logp_interp=True, wngrid_limit=None, **kwargs):
"""interpolate_kdata interpolates the kdata at on a given temperature and
log pressure profile.
Parameters
----------
logp_array: Array
log 10 pressure array to interpolate to
t_array: Array, same size as logp_array
Temperature array to interpolate to
x_array: Array, same size as logp_array
vmr of variable gas array to interpolate to
If floats are given, they are interpreted as arrays of size 1.
wngrid_limit: list or array, optional
if an array is given, interpolates only within this array
log_interp: bool, dummy
Dummy variable to be consistent with interpolate_kdata in data_table.
Whether the interpolation is linear in kdata or in log(kdata) is actually
controlled by self._settings._log_interp but only when the ktable is loaded.
If you change that after the loading, you should rerun setup_interpolation().
Returns
-------
array of shape (logp_array.size, self.Nw , self.Ng)
The interpolated kdata.
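Examples
--------
A hypothetical sketch for a 2-level profile, assuming `ktab` is a loaded
:class:`Ktable5d` and numpy is imported as np:

>>> kdata_prof = ktab.interpolate_kdata(logp_array=np.array([3., 5.]),
...     t_array=np.array([250., 300.]), x_array=np.array([1.e-4, 1.e-3]))
>>> kdata_prof.shape == (2, ktab.Nw, ktab.Ng)
True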
"""
#clipping data
if not logp_interp:
raise RuntimeError('Linear p interpolation not yet supported for Ktable5d.')
if log_interp != self._local_log_interp:
self.setup_interpolation(log_interp=log_interp)
logp_array=np.clip(logp_array, self.logpgrid[0], self.logpgrid[-1])
t_array=np.clip(t_array, self.tgrid[0], self.tgrid[-1])
x_array=np.clip(x_array, self.xgrid[0], self.xgrid[-1])
if wngrid_limit is None:
wngrid_filter = slice(None)
else:
wngrid_limit=np.array(wngrid_limit)
wngrid_filter = np.where((self.wnedges > wngrid_limit.min()) & (
self.wnedges <= wngrid_limit.max()))[0][:-1]
if self._local_log_interp:
coord_to_interp=np.array([logp_array,t_array,np.log(x_array)]).transpose()
tmp_res=self._finterp_kdata(coord_to_interp)
return np.exp(tmp_res[:,wngrid_filter])
else:
coord_to_interp=np.array([logp_array,t_array,x_array]).transpose()
tmp_res=self._finterp_kdata(coord_to_interp)
return tmp_res[:,wngrid_filter]
def remap_logPT(self, logp_array=None, t_array=None, x_array= None):
"""remap_logPT re-interpolates the kdata on a new temprature and log pressure grid
(inplace). If an `x_array` is specified, the interpolation occurs along the
vmr axis as well.
Parameters
----------
logp_array: Array
log 10 pressure array to interpolate to
t_array: Array
temperature array to interpolate to
x_array: Array
vmr of variable gas array to interpolate to
.. warning::
Whether the interpolation is linear in kdata or in log(kdata)
is controlled by self._settings._log_interp but only when the ktable is loaded.
If you change that after the loading, you should rerun setup_interpolation().
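Examples
--------
A hypothetical sketch remapping onto new grids (numpy imported as np):

>>> ktab.remap_logPT(logp_array=np.linspace(0., 6., 7),
...     t_array=np.linspace(100., 600., 6), x_array=np.array([1.e-6, 1.e-3, 1.]))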
"""
#clipping data
logp_array_tmp=np.clip(logp_array, self.logpgrid[0], self.logpgrid[-1])
t_array_tmp=np.clip(t_array, self.tgrid[0], self.tgrid[-1])
if x_array is None:
x_array_tmp=self.xgrid
#it is a bit silly to interpolate on x anyway, even when the user
# only wants to remap on P and T, but it is
# way simpler to write!
else:
x_array_tmp=np.clip(x_array, self.xgrid[0], self.xgrid[-1])
if self._local_log_interp:
coord=np.array(np.meshgrid(
logp_array_tmp, t_array_tmp, np.log(x_array_tmp))).transpose((2,1,3,0))
tmp_res=np.exp(self._finterp_kdata(coord))
else:
coord=np.array(np.meshgrid(
logp_array_tmp, t_array_tmp, x_array_tmp)).transpose((2,1,3,0))
tmp_res=self._finterp_kdata(coord)
self.logpgrid= logp_array
self.pgrid = 10**self.logpgrid
self.Np = logp_array.size
self.tgrid = t_array
self.Nt = t_array.size
if x_array is not None:
self.xgrid = x_array
self.Nx = x_array.size
self.set_kdata(tmp_res)
# self.setup_interpolation()
def copy(self,cp_kdata=True):
"""Creates a new instance of :class:`Ktable5d` object and (deep) copies data into it
Parameters
----------
cp_kdata: bool, optional
If false, the kdata table is not copied and
only the structure and metadata are.
Returns
-------
:class:`Ktable5d`
A new :class:`Ktable5d` instance with the same structure as self.
"""
res=Ktable5d()
res.copy_attr(self,cp_kdata=cp_kdata)
res._local_log_interp=self._local_log_interp
res.xgrid = np.copy(self.xgrid)
res.weights = np.copy(self.weights)
res.ggrid = np.copy(self.ggrid)
res.gedges = np.copy(self.gedges)
if res.kdata is not None: res.setup_interpolation()
return res
def gindex(self, g):
"""Finds the index corresponding to the given g
"""
return min(np.searchsorted(self.ggrid,g),self.Ng-1)
def xindex(self, x):
"""Finds the index corresponding to the given x
"""
return min(np.searchsorted(self.xgrid,x),self.Nx-1)
def spectrum_to_plot(self, p=1.e-5, t=200., x=1., g=None):
"""provide the spectrum for a given point to be plotted
Parameters
----------
p : float
Pressure (Ktable pressure unit)
t : float
Temperature(K)
g: float
Gauss point
x: float
Mixing ratio of the species
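Examples
--------
A hypothetical sketch, assuming matplotlib.pyplot is imported as plt:

>>> spec = ktab.spectrum_to_plot(p=1.e4, t=300., x=1.e-3, g=0.5)
>>> plt.plot(ktab.wns, spec)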
"""
if g is None: raise RuntimeError('A gauss point should be provided with the g= keyword.')
gindex=self.gindex(g)
return self.interpolate_kdata(log10(p),t,x)[0,:,gindex]
def plot_distrib(self, ax, p=1.e-5, t=200., wl=1., x=1., xscale=None, yscale='log', **kwarg):
"""Plot the distribution for a given point
Parameters
----------
p : float
Pressure (Ktable pressure unit)
t : float
Temperature(K)
wl: float
Wavelength (micron)
x: float
Mixing ratio of the species
"""
wlindex=self.wlindex(wl)
toplot=self.interpolate_kdata(log10(p),t,x)[0,wlindex]
if xscale is not None:
ax.set_xscale(xscale)
ax.plot(1.-self.ggrid,toplot,**kwarg)
ax.set_xlabel('1-g')
else:
ax.plot(self.ggrid,toplot,**kwarg)
ax.set_xlabel('g')
ax.set_ylabel('Cross section ('+self.kdata_unit+')')
if yscale is not None: ax.set_yscale(yscale)
def __repr__(self):
"""Method to output header
"""
out1=super().__repr__()
output=out1+"""
weights : {wg}
x (vmr) : {xgrid}
data ordered following p, t, x, wn, g
shape : {shape}
""".format(wg=self.weights, xgrid=self.xgrid, shape=self.shape)
return output
def combine_with(self, other, x_self=None, x_other=None, **kwargs):
"""Method to create a new :class:`Ktable5d` where the kdata of 'self' are
randomly mixed with 'other' (that must be a :class:`Ktable`).
The main purpose is to add the opacity of a trace species to the background gas
of the :class:`Ktable5d` instance.
.. warning::
Because:
* the opacity from the background and variable gases cannot be
isolated,
* The values of the array for the vmr of the variable gas (self.xgrid)
are not modified (diluted),
the treatment here is valid only if `x_other` << 1.
For this reason, `x_self` should either be left to None, or set to 1-`x_other`, depending
on exactly what you want to do. But if you set `x_self`<<1, you are on your own.
Parameters
----------
other: :class:`~exo_k.ktable.Ktable`
A :class:`Ktable` object to be mixed with. Dimensions should be the same as self
(except for xgrid).
x_self: float only, optional
Volume mixing ratio of self.
x_other: float or array, optional
Volume mixing ratio of the species to be mixed with (other).
If either x_self or x_other is set to `None` (default),
the cross sections of the species in question
are considered to be already normalized with respect to the mixing ratio.
Returns
-------
:class:`Ktable5d`
A new Ktable5d with the opacity of the new species added.
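Examples
--------
A hypothetical sketch adding a trace species, where `co2_ktab` is assumed to be a
:class:`~exo_k.ktable.Ktable` on the same (P, T, wn, g) grid:

>>> mixed_ktab = ktab.combine_with(co2_ktab, x_other=4.e-4)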
"""
if not ((self.Np == other.Np) and (self.Nt == other.Nt) and (self.Nw == other.Nw) \
and (self.Ng == other.Ng)):
raise TypeError("""in combine_with: kdata tables do not have the same dimensions.
I'll stop now!""")
if other.Nx is not None:
raise TypeError("""in combine_with: cannot combine 2 Ktable5d""")
if (self.p_unit!=other.p_unit) or \
(rm_molec(self.kdata_unit)!=rm_molec(other.kdata_unit)):
raise RuntimeError("""in combine_with: tables do not have the same units.
I'll stop now!""")
res=self.copy(cp_kdata=True)
tmp=other.copy()
if x_other is None:
x_other=1.
for iX in range(self.Nx):
tmp.kdata=self.kdata[:,:,iX,:,:]
res.kdata[:,:,iX,:,:]= \
tmp.RandOverlap(other, x_self, x_other*(1.-self.xgrid[iX]), **kwargs)
res.setup_interpolation()
return res
def bin_down(self, wnedges=None, weights=None, ggrid=None,
remove_zeros=False, num=300, use_rebin=False, write=0):
"""Method to bin down a kcoeff table to a new grid of wavenumbers (inplace).
Parameters
----------
wnedges: array, np.ndarray
Edges of the new bins of wavenumbers (cm-1)
onto which the kcoeff should be binned down.
if you want Nwnew bins in the end, wnedges.size must be Nwnew+1
wnedges[0] should be greater than self.wnedges[0] (JL20 not sure anymore)
wnedges[-1] should be lower than self.wnedges[-1]
weights: array, np.ndarray, optional
Desired weights for the resulting Ktable.
ggrid: array, np.ndarray, optional
Desired g-points for the resulting Ktable.
Must be consistent with provided weights.
If not given, they are taken at the midpoints of the array
given by the cumulative sum of the weights
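Examples
--------
A hypothetical sketch binning down onto 20 bins spanning the current range
(numpy imported as np):

>>> ktab.bin_down(wnedges=np.linspace(ktab.wnedges[0], ktab.wnedges[-1], 21))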
"""
old_ggrid=self.ggrid
if weights is not None:
self.weights=np.array(weights)
self.gedges=np.concatenate(([0],np.cumsum(self.weights)))
if ggrid is not None:
self.ggrid=np.array(ggrid)
else:
self.ggrid=(self.gedges[1:]+self.gedges[:-1])*0.5
self.Ng=self.ggrid.size
wnedges=np.array(wnedges)
if wnedges.size<2: raise TypeError('wnedges should at least have two values')
wngrid_filter = np.where((wnedges <= self.wnedges[-1]) & (wnedges >= self.wnedges[0]))[0]
if not is_sorted(wnedges):
raise RuntimeError('wnedges should be sorted.')
indicestosum,wn_weigths=rebin_ind_weights(self.wnedges, wnedges[wngrid_filter])
if write> 10 :print(indicestosum);print(wn_weigths)
newkdata=np.zeros((self.Np,self.Nt,self.Nx,wnedges.size-1,self.Ng), dtype=np.float64)
for iX in range(self.Nx):
newkdata[:,:,iX]=bin_down_corrk_numba((self.Np,self.Nt,wnedges.size-1,self.Ng), \
self.kdata[:,:,iX], old_ggrid, self.ggrid, self.gedges, indicestosum, \
wngrid_filter, wn_weigths, num, use_rebin)
self.kdata=newkdata
self.wnedges=wnedges
self.wns=(wnedges[1:]+wnedges[:-1])*0.5
self.Nw=self.wns.size
if remove_zeros : self.remove_zeros(deltalog_min_value=10.)
self.setup_interpolation()
def clip_spectral_range(self, wn_range=None, wl_range=None):
"""Limits the data to the provided spectral range (inplace):
* Wavenumber in cm^-1 if using wn_range argument
* Wavelength in micron if using wl_range
"""
super().clip_spectral_range(wn_range=wn_range, wl_range=wl_range)
if self.kdata is not None:
self.setup_interpolation()
def extend_spectral_range(self, **kwargs):
"""Extends the spectral range of an existing table (inplace).
The new bins are filled with zeros (except if remove_zeros=True).
See :func:`exo_k.data_table.Data_table.extend_spectral_range` method for details
on the arguments and options.
"""
super().extend_spectral_range(**kwargs)
self.setup_interpolation()
def remap_g(self, ggrid=None, weights=None):
"""Method to resample a kcoeff table to a new g grid (inplace).
Parameters
----------
ggrid: array, np.ndarray
New grid of abscissas for the quadrature
weights: array, np.ndarray
New grid of weights
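Examples
--------
A hypothetical sketch resampling onto a 16-point Gauss-Legendre grid, using the
gauss_legendre helper imported at the top of this module:

>>> w16, g16, _ = gauss_legendre(16)
>>> ktab.remap_g(ggrid=g16, weights=w16)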
"""
weights=np.array(weights)
ggrid=np.array(ggrid)
self.Ng=weights.size
newkdata=np.zeros((self.Np, self.Nt, self.Nx, self.Nw, self.Ng))
g_sample_5d(ggrid, newkdata, self.ggrid, self.kdata)
self.ggrid=ggrid
self.weights=weights
self.gedges=np.concatenate(([0],np.cumsum(self.weights)))
self.set_kdata(newkdata)
def read_Qdat(filename):
"""Reads Q.dat files LMDZ style and extract the vmr grid.
Parameters
----------
filename: str
Path to file to read.
Returns
-------
background_mol_names: list
list of names of molecules in background gas
var_mol: str
Name of variable molecule
Nx: int
Size of xgrid
xgrid: array, np.ndarray
grid of vmr for variable gas
"""
with open(filename, "r") as file:
Nmol=int(file.readline())
background_mol_names=[]
for ii in range(Nmol-1):
#print(file.readline().split()[0])
background_mol_names.append(file.readline().split()[0])
var_mol=file.readline().split()[0]
Nx=int(file.readline())
xgrid=np.zeros(Nx)
for ii in range(Nx):
xgrid[ii]=float(file.readline())
return background_mol_names,var_mol,Nx,xgrid