Source code for exo_k.ktable

# -*- coding: utf-8 -*-
"""
@author: jeremy leconte
"""
import os
from math import log10
import time
import numpy as np
import astropy.units as u
from .ktable_io import Ktable_io
from .ktable5d import Ktable5d
from .util.interp import rebin_ind_weights, rebin, is_sorted, \
        gauss_legendre, split_gauss_legendre, spectrum_to_kdist, \
        kdata_conv_loop, bin_down_corrk_numba, g_sample_4d
from .hires_spectrum import Hires_spectrum
from .util.filenames import create_fname_grid_Kspectrum_LMDZ, select_kwargs, create_fname_tp_grid_Kspectrum_HELIOS
from .util.cst import KBOLTZ


[docs] class Ktable(Ktable_io): """A class that handles 4D tables of k-coefficients. Based on the :class:`~exo_k.data_table.Data_table` class that handles basic operations common to Xtable. Based on the :class:`~exo_k.ktable_io.Ktable_io` class that incorporates all io routines (read/write_xxx for all the xxx supported formats). """ def __init__(self, *filename_filters, filename=None, xtable=None, path=None, p_unit='unspecified', file_p_unit='unspecified', kdata_unit='unspecified', file_kdata_unit='unspecified', remove_zeros=False, search_path=None, mol=None, **kwargs): """Initializes k coeff table and supporting data from various sources (see below by order of precedence) Parameters ---------- filename: str, optional Relative or absolute name of the file to be loaded. filename_filters: sequence of string As many strings as necessary to uniquely define a file in the global search path defined in :class:`~exo_k.settings.Settings`. This path will be searched for a file with all the filename_filters in the name. The filename_filters can contain '*'. xtable: Xtable object If no filename nor filename_filters are provided, this xtable object will be used to create a ktable. In this case, wavenumber bins must be given with the wnedges keyword. path: str If none of the above is specifed, path can point to a directory with a LMDZ type k coeff table. In this case, see read_LMDZ for the keywords to specify. If none of the parameters above is specified, just creates an empty object to be filled later. Other Parameters ---------------- p_unit: str, optional String identifying the pressure units to convert to (e.g. 'bar', 'Pa', 'mbar', or any pressure unit recognized by the astropy.units library). If ='unspecified', no conversion is done. file_p_unit: str, optional String to specify the current pressure unit if it is unspecified or if you have reasons to believe it is wrong (e.g. you just read a file where you know that the pressure grid and the pressure unit do not correspond) kdata_unit: str, optional String to identify the unit to convert to. Accepts 'cm^2', 'm^2' or any surface unit recognized by the astropy.units library. If ='unspecified', no conversion is done. In general, kdata should be kept in 'per number' or 'per volume' units (as opposed to 'per mass' units) as composition will always be assumed to be a number or volume mixing ratio. Opacities per unit mass are not supported yet. Note that you do not need to specify the '/molec' or '/molecule' in the unit. file_kdata_unit: str, optional String to specify the current kdata unit if it is unspecified or if you have reasons to believe it is wrong (e.g. you just read a file where you know that the kdata grid and the kdata unit do not correspond) remove_zeros: boolean, optional If True, the zeros in the kdata table are replaced by a value 10 orders of magnitude smaller than the smallest positive value mol: str, optional The name of the gas or molecule described by the :class:`Ktable` search_path: str, optional If search_path is provided, it locally overrides the global _search_path in :class:`~exo_k.settings.Settings` and only files in search_path are returned. """ super().__init__() if filename is not None: self.filename=filename elif filename_filters or (mol is not None and (xtable is None and path is None)): # a none empty sequence returns a True in a conditional statement self.filename=self._settings.list_files(*filename_filters, molecule = mol, only_one=True, search_path=search_path, path_type='ktable')[0] if self.filename is not None: if self.filename.lower().endswith('pickle'): self.read_pickle(filename=self.filename) elif self.filename.lower().endswith(('.hdf5', '.h5')): self.read_hdf5(filename=self.filename, mol=mol, **kwargs) elif self.filename.lower().endswith(('.ncdf', '.nc')): self.read_atmo(filename=self.filename, mol=mol, **kwargs) elif self.filename.lower().endswith('.kta'): self.read_nemesis(filename=self.filename, mol=mol) elif self.filename.lower().endswith('ktable.exorem.txt'): self.read_exorem(filename=self.filename, mol=mol) elif self.filename.lower().endswith('.fits'): self.read_arcis(filename=self.filename, mol=mol) else: raise NotImplementedError("""Requested format not recognized. Should end with .pickle, .hdf5, .h5, .fits, ktable.exorem.txt, or .kta""") elif xtable is not None: self.xtable_to_ktable(xtable=xtable, **kwargs) elif path is not None: self.read_LMDZ(path=path, mol=mol, **kwargs) else: #if there is no input file, just create an empty object self.wnedges=None self.weights=None self.ggrid=None self.gedges=None super().finalize_init(p_unit=p_unit, file_p_unit=file_p_unit, kdata_unit=kdata_unit, file_kdata_unit=file_kdata_unit, remove_zeros=remove_zeros)
[docs] def xtable_to_ktable(self, xtable=None, wnedges=None, weights=None, ggrid=None, quad='legendre', order=20, g_split=0.9, mid_dw=True, write=0, remove_zeros=False): """Fills the :class:`~exo_k.ktable.Ktable` object with a k-coeff table computed from a :class:`~exo_k.xtable.Xtable` object (inplace). The p and kcorr units are inherited from the :class:`~exo_k.xtable.Xtable` object. Parameters ---------- xtable: :class:`~exo_k.xtable.Xtable` input Xtable object instance wnedges: Array edges of the wavenumber bins to be used to compute the corrk Other Parameters ---------------- weights: array, np.ndarray, optional If weights are provided, they are used instead of the legendre quadrature. quad: string, optional Type of quadrature used. Default is 'legendre'. Also available: 'split-legendre' which uses half quadrature points between 0. and `g_split` and half between `g_split` and 1. order: Integer, optional Order of the Gauss legendre quadrature used. Default is 20. g_split: float, optional Used only if quad='split-legendre'. See above. mid_dw: boolean, optional * If True, the Xsec values in the high resolution xtable data are assumed to cover a spectral interval that is centered around the corresponding wavenumber value. The first and last Xsec values are discarded. * If False, each interval runs from the wavenumber value to the next one. The last Xsec value is dicarded. """ if xtable is None: raise TypeError("You should provide an input Xtable object") if wnedges is None: raise TypeError("You should provide an input wavenumber array") self.copy_attr(xtable,cp_kdata=False) self.wnedges=np.array(wnedges) if self.wnedges.size<2: raise TypeError('wnedges should at least have two values') self.wns=0.5*(self.wnedges[1:]+self.wnedges[:-1]) if weights is not None: self.weights=np.array(weights) self.gedges=np.concatenate(([0],np.cumsum(self.weights))) if ggrid is not None: self.ggrid=np.array(ggrid) else: self.ggrid=(self.gedges[1:]+self.gedges[:-1])*0.5 self.sampling_method='custom' else: if quad=='legendre': self.weights,self.ggrid,self.gedges=gauss_legendre(order) elif quad=='split-legendre': self.weights,self.ggrid,self.gedges=split_gauss_legendre(order, g_split) else: raise NotImplementedError("Type of quadrature (quad keyword) not known.") self.sampling_method=quad self.Ng=self.weights.size if write >6 : print(self.weights,self.ggrid,self.gedges) self.logk=False self.Nw=self.wns.size self.kdata=np.zeros(self.shape) if mid_dw: dwn_hr=(xtable.wns[2:]-xtable.wns[:-2])*0.5 wn_hr=xtable.wns[1:-1] else: dwn_hr=(xtable.wns[1:]-xtable.wns[:-1]) wn_hr=xtable.wns[:-1] for iP in range(self.Np): for iT in range(self.Nt): if mid_dw: k_hr=xtable.kdata[iP,iT,1:-1] else: k_hr=xtable.kdata[iP,iT,:-1] #print(self.ggrid) #print(wn_hr[[0,-1]]) #print(self.wnedges[[0,-1]]) self.kdata[iP,iT]=spectrum_to_kdist(k_hr,wn_hr,dwn_hr,self.wnedges,self.ggrid) if remove_zeros : self.remove_zeros(deltalog_min_value=10.)
[docs] def hires_to_ktable(self, path=None, filename_grid=None, logpgrid=None, tgrid=None, wnedges=None, quad='legendre', order=20, g_split=0.9, weights=None, ggrid=None, mid_dw=True, write=0, mol=None, grid_p_unit='Pa', p_unit='unspecified', kdata_unit='unspecified', file_kdata_unit='unspecified', helios=False, remove_zeros=False, **kwargs): """Computes a k coeff table from :class:`~exo_k.util.hires_spectrum.Hires_spectrum` objects (inplace). .. warning:: By default, log pressures are specified in Pa in logpgrid!!! If you want to use another unit, do not forget to specify it with the grid_p_unit keyword. Parameters ---------- path : String directory with the input files filename_grid : array, np.ndarray of str with shape (logpgrid.size,tgrid.size) Names of the input high-res spectra. If None, the files are assumed to follow Kspectrum/LMDZ convention, i.e. be of the type 'k001', 'k002', etc. See :func:`~exo_k.util.filenames.create_fname_grid_Kspectrum_LMDZ` for possible additional keyword arguments. logpgrid: array, np.ndarray Grid in log(pressure) of the input. Default unit is Pa, but can be changed with the `grid_p_unit` keyword. tgrid: array, np.ndarray Grid in temperature of the input. wnedges : array, np.ndarray Edges of the wavenumber bins to be used to compute the corr-k Other Parameters ---------------- weights: array, np.ndarray, optional If weights are provided, they are used instead of the legendre quadrature. quad: string, optional Type of quadrature used. Default is 'legendre'. Also available: 'split-legendre' which uses half quadrature points between 0. and `g_split` and half between `g_split` and 1. order: Integer, optional Order of the Gauss legendre quadrature used. Default is 20. g_split: float, optional Used only if quad='split-legendre'. See above. mid_dw: boolean, optional * If True, the Xsec values in the high resolution xsec data are assumed to cover a spectral interval that is centered around the corresponding wavenumber value. The first and last Xsec values are discarded. * If False, each interval runs from the wavenumber value to the next one. The last Xsec value is dicarded. mol: string, optional Give a name to the molecule. Useful when used later in a Kdatabase to track molecules. p_unit: str, optional Pressure unit to convert to. grid_p_unit : str, optional Unit of the specified `logpgrid`. kdata_unit : str, optional Kdata unit to convert to. file_kdata_unit : str, optional Kdata unit in input files. helios : bool, optional Read HELIOS output .bin files. See :func:`~exo_k.util.filenames.create_fname_grid_Kspectrum_LMDZ` or :class:`~exo_k.util.hires_spectrum.Hires_spectrum` for a list of additional arguments that can be provided to those funtion through `**kwargs`. """ if path is None: raise TypeError("You should provide an input hires_spectrum directory") if wnedges is None: raise TypeError("You should provide an input wavenumber array") self.filename=path if mol is not None: self.mol=mol else: self.mol=os.path.basename(self.filename).split(self._settings._delimiter)[0] if weights is not None: self.weights=np.array(weights) self.gedges=np.concatenate(([0],np.cumsum(self.weights))) if ggrid is not None: self.ggrid=np.array(ggrid) else: self.ggrid=(self.gedges[1:]+self.gedges[:-1])*0.5 self.sampling_method='custom' else: if quad=='legendre': self.weights,self.ggrid,self.gedges=gauss_legendre(order) elif quad=='split-legendre': self.weights,self.ggrid,self.gedges=split_gauss_legendre(order, g_split) else: raise NotImplementedError("Type of quadrature (quad keyword) not known.") self.sampling_method=quad self.Ng=self.weights.size conversion_factor=u.Unit(grid_p_unit).to(u.Unit('Pa')) self.logpgrid=np.array(logpgrid, dtype=float)+np.log10(conversion_factor) self.pgrid=10**self.logpgrid #in Pa self.p_unit='Pa' self.Np=self.logpgrid.size if write >= 3 : print(self.Np,self.pgrid) self.tgrid=np.array(tgrid) self.Nt=self.tgrid.size if write >= 3 : print(self.Nt,self.tgrid) self.wnedges=np.array(wnedges) if self.wnedges.size<2: raise TypeError('wnedges should at least have two values') self.wns=(self.wnedges[1:]+self.wnedges[:-1])*0.5 self.Nw=self.wns.size self.kdata=np.zeros(self.shape) if filename_grid is None: if helios: filename_grid,self.pgrid,self.tgrid = create_fname_tp_grid_Kspectrum_HELIOS(path) # self.pgrid=10**self.logpgrid #in Pa self.logpgrid = np.log10(self.pgrid) self.Nt = self.tgrid.size self.Np = self.pgrid.size self.kdata=np.zeros(self.shape) # redefined PT grid else: filename_grid=create_fname_grid_Kspectrum_LMDZ(self.Np, self.Nt, **select_kwargs(kwargs,['suffix','nb_digit'])) else: filename_grid=np.array(filename_grid) for iP in range(self.Np): for iT in range(self.Nt): filename=filename_grid[iP,iT] fname=os.path.join(path,filename) if write >= 3 : print(fname) spec_hr=Hires_spectrum(fname, file_kdata_unit=file_kdata_unit, helios=helios, **select_kwargs(kwargs,['skiprows','wn_column','mult_factor', 'kdata_column','data_type','binary','mass_amu'])) # for later conversion, the real kdata_unit is in spec_hr.kdata_unit self.kdata_unit=spec_hr.kdata_unit was_xsec=(spec_hr.data_type=='xsec') wn_hr=spec_hr.wns k_hr=spec_hr.kdata if mid_dw: dwn_hr=(wn_hr[2:]-wn_hr[:-2])*0.5 wn_hr=wn_hr[1:-1] k_hr=k_hr[1:-1] else: dwn_hr=(wn_hr[1:]-wn_hr[:-1]) wn_hr=wn_hr[:-1] k_hr=k_hr[:-1] self.kdata[iP,iT]=spectrum_to_kdist(k_hr,wn_hr,dwn_hr,self.wnedges,self.ggrid) if not was_xsec: self.kdata[iP,iT]=self.kdata[iP,iT]*KBOLTZ*self.tgrid[iT]/self.pgrid[iP] if not was_xsec: self.kdata=self.kdata*u.Unit(self.kdata_unit).to(u.Unit('m^-1')) self.kdata_unit='m^2/molecule' # Accounts for the conversion of the abs_coeff to m^-1, so we know that # self.kdata is now in m^2/molecule. Can now convert to the desired unit. if self._settings._convert_to_mks and kdata_unit == 'unspecified': kdata_unit='m^2/molecule' self.convert_kdata_unit(kdata_unit=kdata_unit) # converts from self.kdata_unit wich is either: # - 'm^2/molecule' data_type was 'abs_coeff' # - The units after Hires_spectrum (which have already taken # file_kdata_unit into account) # to the desired unit. self.convert_p_unit(p_unit=p_unit) if remove_zeros : self.remove_zeros(deltalog_min_value=10.)
[docs] def copy(self, cp_kdata=True, ktab5d=False): """Creates a new instance of :class:`Ktable` object and (deep) copies data into it. Parameters ---------- cp_kdata: bool, optional If false, the kdata table is not copied and only the structure and metadata are. ktab5d: bool, optional If true, creates a Ktable5d object with the same structure. Data are not copied. Returns ------- :class:`Ktable` or :class:`Ktable5d` A new :class:`Ktable` or :class:`Ktable5d` instance with the same structure as self. """ if ktab5d: res=Ktable5d() cp_kdata=False else: res=Ktable() res.copy_attr(self,cp_kdata=cp_kdata) res.weights = np.copy(self.weights) res.ggrid = np.copy(self.ggrid) res.gedges = np.copy(self.gedges) return res
[docs] def gindex(self, g): """Finds the index corresponding to the given g """ return min(np.searchsorted(self.ggrid,g),self.Ng-1)
[docs] def spectrum_to_plot(self, p=1.e-5, t=200., x=1., g=None): """provide the spectrum for a given point to be plotted Parameters ---------- p : float Pressure (Ktable pressure unit) t : float Temperature(K) g: float Gauss point x: float Mixing ratio of the species """ if g is None: raise RuntimeError('A gauss point should be provided with the g= keyword.') gindex=self.gindex(g) return self.interpolate_kdata(log10(p), t, x)[0,:,gindex]
[docs] def plot_distrib(self, ax, p=1.e-5, t=200., wl=1., x=1., xscale=None, yscale='log', **kwarg): """Plot the distribution for a given point Parameters ---------- p : float Pressure (Ktable pressure unit) t : float Temperature(K) wl: float Wavelength (micron) """ wlindex=self.wlindex(wl) toplot=self.interpolate_kdata(log10(p),t)[0,wlindex]*x if xscale is not None: ax.set_xscale(xscale) ax.plot(1.-self.ggrid,toplot,**kwarg) ax.set_xlabel('1-g') else: ax.plot(self.ggrid,toplot,**kwarg) ax.set_xlabel('g') ax.set_ylabel('Cross section ('+self.kdata_unit+')') if yscale is not None: ax.set_yscale(yscale)
def __repr__(self): """Method to output header """ out1=super().__repr__() output=out1+""" weights : {wg} data oredered following p, t, wn, g shape : {shape} """.format(wg=self.weights, shape=self.shape) return output
[docs] def RandOverlap(self, other, x_self, x_other, write=0, use_rebin=False): """Method to randomly mix the opacities of 2 species (self and other). Parameters ---------- other: :class:`Ktable` A :class:`Ktable` object to be mixed with. Dimensions should be the same as self. x_self: float or array Volume mixing ratio of the first species x_other: float or array Volume mixing ratio of the species to be mixed with. If one of these is None, the kcoeffs of the species in question are considered to be already normalized with respect to the mixing ratio. Returns ------- array array of k-coefficients for the mix. """ if not np.array_equal(self.shape,other.shape): raise TypeError("""in RandOverlap: kdata tables do not have the same dimensions. I'll stop now!""") Ng=self.Ng weights=self.weights try: kdatas=self.vmr_normalize(x_self) except TypeError: raise TypeError('Gave bad mixing ratio format to vmr_normalize') try: kdatao=other.vmr_normalize(x_other) except TypeError: raise TypeError('Gave bad mixing ratio format to vmr_normalize') kdataconv=np.zeros((self.Np,self.Nt,self.Nw,Ng**2)) weightsconv=np.zeros(Ng**2) newkdata=np.zeros(self.shape) # newkdata2=np.zeros((self.shape[0]*self.shape[1]*self.shape[2],Ng)) for jj in range(Ng): for ii in range(Ng): weightsconv[ii*Ng+jj]=weights[jj]*weights[ii] if write >= 3 : start=time.time() kdata_conv_loop(kdatas,kdatao,kdataconv,self.shape) if write >= 3 : end=time.time();print("kdata conv: ",end - start) if write >= 3 : start=time.time() for iP in range(self.Np): for iT in range(self.Nt): for iW in range(self.Nw): tmp=kdataconv[iP,iT,iW,:] indices=np.argsort(tmp) kdatasort=tmp[indices] weightssort=weightsconv[indices] newggrid=np.cumsum(weightssort) if use_rebin: newggrid=np.concatenate(([0.],newggrid)) kdatasort=np.concatenate(([kdatasort[0]],kdatasort)) newkdata[iP,iT,iW,:]=rebin(kdatasort,newggrid,self.gedges) else: newkdata[iP,iT,iW,:]=np.interp(self.ggrid,newggrid,kdatasort) if write >= 3 : end=time.time();print("kdata rebin with loop: ",end - start) return newkdata
[docs] def bin_down(self, wnedges=None, weights=None, ggrid=None, remove_zeros=False, num=300, use_rebin=False, write=0): """Method to bin down a kcoeff table to a new grid of wavenumbers (inplace). Parameters ---------- wnedges: array, np.ndarray Edges of the new bins of wavenumbers (cm-1) onto which the kcoeff should be binned down. if you want Nwnew bin in the end, wnedges.size must be Nwnew+1 wnedges[0] should be greater than self.wnedges[0] (JL20 not sure anymore) wnedges[-1] should be lower than self.wnedges[-1] weights: array, np.ndarray, optional Desired weights for the resulting Ktable. ggrid: array, np.ndarray, optional Desired g-points for the resulting Ktable. Must be consistent with provided weights. If not given, they are taken at the midpoints of the array given by the cumulative sum of the weights """ old_ggrid=self.ggrid if weights is not None: self.weights=np.array(weights) self.gedges=np.concatenate(([0],np.cumsum(self.weights))) if ggrid is not None: self.ggrid=np.array(ggrid) else: self.ggrid=(self.gedges[1:]+self.gedges[:-1])*0.5 self.Ng=self.ggrid.size wnedges=np.array(wnedges) if wnedges.size<2: raise TypeError('wnedges should at least have two values') wngrid_filter = np.where((wnedges <= self.wnedges[-1]) & (wnedges >= self.wnedges[0]))[0] if not is_sorted(wnedges): raise RuntimeError('wnedges should be sorted.') indicestosum,wn_weigths=rebin_ind_weights(self.wnedges, wnedges[wngrid_filter]) if write> 10 :print(indicestosum);print(wn_weigths) self.kdata=bin_down_corrk_numba((self.Np,self.Nt,wnedges.size-1,self.Ng), \ self.kdata, old_ggrid, self.ggrid, self.gedges, indicestosum, wngrid_filter, \ wn_weigths, num, use_rebin) self.wnedges=wnedges self.wns=(wnedges[1:]+wnedges[:-1])*0.5 self.Nw=self.wns.size if remove_zeros : self.remove_zeros(deltalog_min_value=10.)
[docs] def remap_g(self, ggrid=None, weights=None): """Method to resample a kcoeff table to a new g grid (inplace). Parameters ---------- ggrid: array, np.ndarray New grid of abcissas for quadrature weights: array, np.ndarray New grid of weights """ weights=np.array(weights) ggrid=np.array(ggrid) self.Ng=weights.size newkdata=np.zeros((self.Np, self.Nt, self.Nw, self.Ng)) g_sample_4d(ggrid, newkdata, self.ggrid, self.kdata) self.kdata=newkdata self.ggrid=ggrid self.weights=weights self.gedges=np.concatenate(([0],np.cumsum(self.weights)))