Source code for pytmosph3r.transmission

from pytmosph3r.aerosols import PrepareAerosols
import numba
import numpy as np
from tqdm.auto import tqdm
import exo_k as xk
from pytmosph3r.log import Logger
from pytmosph3r.rays import Rays
from pytmosph3r.atmosphere import AltitudeAtmosphere
from pytmosph3r.util.memory import *
from pytmosph3r.util.util import make_array, timer, spectral_chunks, prop
from pytmosph3r.constants import KBOLTZ

[docs]@numba.njit def compute_optical_depth_mie(P, T, rays_lengths, cross_section, mie_abs_coeff, opacity_indices, rays_list, n_rays, n_intersection, tau): """Computes tau, the optical depth.""" for r in range(n_rays): ray = rays_list[r] ray_coords = ray for i in range(n_intersection): data = rays_lengths[ray_coords][i] if data[0] == -1: break coord = (int(data[0]), int(data[1]), int(data[2])) ray_length = data[3] mie = mie_abs_coeff[opacity_indices[coord]] tau[ray] += ray_length * ( (P[coord] / (KBOLTZ*T[coord]) * cross_section[opacity_indices[coord]]) + mie )
[docs]@numba.njit def compute_optical_depth(P, T, rays_lengths, cross_section, opacity_indices, rays_list, n_rays, n_intersection, tau): """Computes tau, the optical depth.""" for r in range(n_rays): ray = rays_list[r] ray_coords = ray for i in range(n_intersection): data = rays_lengths[ray_coords][i] if data[0] == -1: break coord = (int(data[0]), int(data[1]), int(data[2])) ray_length = data[3] tau[ray] += ray_length * (P[coord] / (KBOLTZ*T[coord]) * cross_section[opacity_indices[coord]])
[docs]class Transmission(Logger): """The transmission module computes the transit depth :math:`(R_P/R_S)^2` given a set of :attr:`rays` crossing the atmosphere. Args: rays (:class:`~pytmosph3r.rays.Rays` or dict) : Defining the grid (`n_radial`, `n_angular`) of light rays that cross the atmosphere. store_transmittance (bool): Stores transmittance in output. identical (bool) : Activates the search for identical cells for which to compute the opacities only once. (Should be used when there is a lot of homogeneous data in the model). memory_aware (bool) : Try to stay under a memory fixed threshold (experimental). per_angle (bool) : Compute the transit depth per angle in the grid of rays, to be able to free the memory of the optical depths of the rays of that angle once they're done. Defaults to True. """ def __init__(self, rays=None, store_transmittance=None, identical=None, memory_aware=None, per_angle=True): super().__init__(self.__class__.__name__) self.store_transmittance = store_transmittance """Stores transmittance in output.""" if Logger.verbose: self.store_transmittance = True if self.store_transmittance: self.transmittance = [] self.identical = identical """Computes only once the opacity for cells with identical physical properties (True by default). You can use this to your advantage when an atmosphere has multiple cells with the same properties. If your atmosphere is completely heterogeneous however, consider removing this option (searching for identical cells is a waste of time).""" self.memory_aware = memory_aware """Divide the computation of opacities to fit into memory.""" self.per_angle = per_angle """Divide the computation along azimuthal angles of the rays grid.""" self.wn_contribution = self.wn_to_integral """Pointer to function to use for contribution by wns (by default computes integral, but can be set to compute transmittance by setting it to wn_to_transmittance).""" self.dim = None """For internal use only. Determines on which dimension the optical depth is computed (radial/angular dimension(s)). (0,1) will iterate over both. """ self.shape = None """For internal use only. Shape of the transmittance (without the spectral dimension).""" self.size = None """For internal use only. Length of cross-sections).""" self.rays = Rays() """Light rays characteristics. See :class:`~pytmosph3r.rays.Rays`.""" if rays is not None: if isinstance(rays, dict): self.rays = Rays(grid=rays) # transform dict into class else: if not isinstance(rays, Rays): self.error("Rays object is not from pytmosph3r. Proceed with care...") self.rays = rays # already a class self.model = None
[docs] def default_values(self): if self.model.parallel and self.model.parallel.dimension == "rays": self.per_angle = False if self.memory_aware is None: if not self.per_angle: self.memory_aware = True if self.identical is None: self.identical = True if self.model.filename is not None: self.identical = False self.info("memory_aware = %s ; identical = %s ; per_angle = %s" % (self.memory_aware, self.identical, self.per_angle))
[docs] def build(self, model): """Builds an atmospheric grid based on altitude coordinates. See :class:`~pytmosph3r.atmosphere.AltitudeAtmosphere`. """ self.model = model model.input_atmosphere.compute_altitude() model.atmosphere = AltitudeAtmosphere(model) model.atmosphere.build() self.rays.set_observer(model.observer) self.rays.build(model)
@prop def opacity(self): return self.model.opacity @prop def atmosphere(self): return self.model.atmosphere
[docs] @timer def prepare_opacities(self, *args, **kwargs): """Prepares opacities list that need to be computed by exo_k. Cells with the same physical properties are grouped together to compute their opacity only once. Timed function. Use :func:`_prepare_opacities` if time is not needed. """ return self._prepare_opacities(*args, **kwargs)
def _prepare_opacities(self, opacity_coords): """Prepares opacities list that need to be computed by exo_k. Cells with the same physical properties are grouped together to compute their opacity only once. """ self.opacity_indices = np.full(self.atmosphere.shape, np.nan, dtype='i') """To get opacity index fast""" # Initialization of data self.size = len(opacity_coords) cell_log_p = np.full((self.size), np.nan) cell_temperature = np.full((self.size), np.nan) cell_gas_vmr = {} for gas in self.atmosphere.gas_mix_ratio.keys(): if not isinstance(self.atmosphere.gas_mix_ratio[gas], (float, str)): cell_gas_vmr[gas] = np.full((self.size), np.nan) prepare_aerosols = PrepareAerosols(self.model, self.size) indices = {} for i, coordinates in enumerate(opacity_coords.keys()): self.opacity_indices[coordinates] = i cell_log_p[i] = np.log10(self.atmosphere.pressure[coordinates]) cell_temperature[i] = self.atmosphere.temperature[coordinates] for gas in self.atmosphere.gas_mix_ratio.keys(): if isinstance(self.atmosphere.gas_mix_ratio[gas], (float, str)): cell_gas_vmr[gas] = self.atmosphere.gas_mix_ratio[gas] else: cell_gas_vmr[gas][i] = self.atmosphere.gas_mix_ratio[gas][coordinates] if self.identical: gas_values = np.asarray([gas for gas in list(cell_gas_vmr.values()) if isinstance(gas, np.ndarray)]) if len(gas_values): key = (cell_log_p[i],cell_temperature[i], *gas_values[:, i]) else: key = (cell_log_p[i],cell_temperature[i]) if key not in indices: indices[key] = [] indices[key] += [i] aer_reff_densities = prepare_aerosols.compute(i, coordinates) if self.identical: self.debug("Computing %s out of %s"% (len(indices), len(opacity_coords))) if len(indices) == len(opacity_coords): self.warning("`identical` option useless. Maybe remove it next time? Your atmosphere is completely heterogeneous so we can't find identical cells.") else: indices = cell_log_p, cell_temperature, cell_gas_vmr self.n_opacities += len(indices) return indices, aer_reff_densities, cell_gas_vmr
[docs] @timer def compute_integral(self, *args, **kwargs): """Computes integral by iterating over opacities given by :func:`prepate_opacities` and if :attr:`memory_aware`, use :func:`wn_chunks` to subdivide the work along the spectral axis. Timed function. Use :func:`compute_contribution` if time is not needed. """ return self.compute_contribution(*args, **kwargs)
[docs] def compute_contribution(self, opacities): """Computes integral by iterating over opacities given by :func:`prepate_opacities` and if :attr:`memory_aware`, use :func:`wn_chunks` to subdivide the work along the spectral axis. """ indices = opacities[0] aer_reff_densities = opacities[1] cell_gas_vmr = opacities[2] # unpack data from prepare_opacities() if self.identical: data = np.asarray(list(indices.keys())).T log_p = data[0] temperature = data[1] gas_vmr = cell_gas_vmr.copy() for i, gas in enumerate([gas for gas, vmr in list(cell_gas_vmr.items()) if isinstance(vmr, np.ndarray)]): gas_vmr[gas] = data[i+2] else: log_p = indices[0] temperature = indices[1] gas_vmr = indices[2] if self.model.parallel and self.model.parallel.dimension == "spectral": integrals = self.model.parallel.compute_integral(self, indices, log_p, temperature, gas_vmr, aer_reff_densities) else: wn_chunks = self.wn_chunks(len(log_p)) integrals = [] # iterate over chunks of wavelengths that fit into memory for wn_chunk in wn_chunks: chunk_integral = self.wn_contribution(indices, log_p, temperature, gas_vmr, aer_reff_densities, wn_chunk) integrals.append(chunk_integral) try: # concatenate all wavelengths back together integral = np.concatenate(integrals) return integral except: return integrals
[docs] def wn_chunks(self, xsec_size): """Subdivides the work along the spectral axis. Useful when cross sections + transmittance are too large to fit into memory. Args: xsec_size (int): Size of the cross-sections array (opacities) to compute. """ transmittance_shape = self.shape + (self.opacity.k_data.Nw, ) xsec_shape = (xsec_size, self.opacity.k_data.Nw) if self.opacity.k_data.Ng: xsec_shape += (self.opacity.k_data.Ng, ) wn_chunks = [None] # all spectral range by default if self.memory_aware: mem_total, mem_available = self.available_memory(np.prod(xsec_shape) + np.prod(transmittance_shape), name="Cross section + Transmittance") try: self.check_memory(np.prod(xsec_shape) + np.prod(transmittance_shape), name="Cross section + Transmittance") except MemoryError as e: self.warning(str(e)) nchunks = int(mem_total/(mem_available*MemoryUtils.margin))+1 wn_chunks = spectral_chunks(self.opacity.k_data, nchunks) self.info("Subdividing the work into %s wavelength chunks..."%nchunks) new_xsec_shape = (xsec_size, int(np.ceil(self.opacity.k_data.Nw/nchunks))) if self.opacity.k_data.Ng: new_xsec_shape += (self.opacity.k_data.Ng, ) new_transmittance_shape = self.shape + (self.opacity.k_data.Nw/nchunks, ) mem_total, mem_available = self.available_memory(np.prod(new_xsec_shape) + np.prod(new_transmittance_shape), name="Cross section + Transmittance") self.debug("Transmittance + cross-sections should consume %s GB"%(mem_total/1e9)) return wn_chunks
[docs] def wn_to_integral(self, *args, **kwargs): transmittance = self.wn_to_transmittance(*args, **kwargs) return self.transmittance_to_integral(transmittance)
[docs] def wn_to_transmittance(self, indices, log_p, temperature, gas_vmr, aer_reff_densities, wn_range): self.opacity.compute(log_p, temperature, gas_vmr, aer_reff_densities, wn_range) self.cross_section = self.opacity.cross_section self.mie_abs_coeff = self.opacity.mie_abs_coeff if self.identical: self.cross_section = np.empty((self.size,)+self.opacity.cross_section.shape[1:]) for i, coords in enumerate(indices.values()): self.cross_section[coords] = self.opacity.cross_section[i] tau = self.compute_optical_depth() if self.opacity.k_data.Ng: # correlated-k transmittance = np.sum(np.exp(-tau) * self.opacity.k_data.weights, axis=-1) else: # cross-section transmittance = np.exp(-tau) if self.store_transmittance: self.transmittance.append(transmittance) return transmittance
[docs] def transmittance_to_integral(self, transmittance): # r = self.rays.r - self.rays.dz/2 # self.S = 2.* r * self.rays.dz / self.rays.n_angular self.S = 2.* self.rays.r * self.rays.dz / self.rays.n_angular S = self.S.reshape((-1,) + (1,)*(transmittance.ndim-1)) return np.sum((1. - transmittance) * S, axis=tuple(range(transmittance.ndim - 1)))
[docs] def compute_optical_depth(self): """Computes tau, the optical depth.""" # self.info("Optical depth...") tau = np.zeros(self.shape+self.cross_section.shape[1:]) rays_it = self.rays.walk(self.dim) rays_list = [] for ray in rays_it: rays_list.append(ray) # convert to list for numba if self.dim == (-1, ): rays_list = range(len(self.rays.rays_lengths)) rays_list = np.array(rays_list).flatten() if isinstance(self.rays.rays_lengths, list): rays_lengths = make_array(self.rays.rays_lengths) n_intersection = rays_lengths.shape[1] else: angle_idx = self.rays.angle_idx if self.per_angle and "angle_idx" in self. rays.__dict__ else -1 n_intersection = self.rays.rays_lengths.shape[2] rays_lengths = self.rays.rays_lengths if angle_idx > -1: rays_lengths = self.rays.rays_lengths[:, angle_idx] if not self.per_angle: rays_list = [r for r in range(self.rays.n_radial)] for i in range(self.rays.n_angular): if self.mie_abs_coeff is not None: compute_optical_depth_mie(self.atmosphere.pressure, self.atmosphere.temperature, rays_lengths[:, i], self.cross_section, self.mie_abs_coeff, self.opacity_indices, rays_list, len(rays_list), n_intersection, tau[:, i]) else: compute_optical_depth(self.atmosphere.pressure, self.atmosphere.temperature, rays_lengths[:, i], self.cross_section, self.opacity_indices, rays_list, len(rays_list), n_intersection, tau[:, i]) else: if self.mie_abs_coeff is not None: compute_optical_depth_mie(self.atmosphere.pressure, self.atmosphere.temperature, rays_lengths, self.cross_section, self.mie_abs_coeff, self.opacity_indices, rays_list, len(rays_list), n_intersection, tau) else: compute_optical_depth(self.atmosphere.pressure, self.atmosphere.temperature, rays_lengths, self.cross_section, self.opacity_indices, rays_list, len(rays_list), n_intersection, tau) return tau
[docs] def angle_to_integral(self): self.n_opacities = 0 self.rays.opacity_coords = {} # dictionary by coordinates for computing opacity if self.store_transmittance: self.transmittance = [] self.rays.compute_sub_rays_angle() opacities = self._prepare_opacities(self.rays.opacity_coords) integral_angle = self.compute_contribution(opacities) # free some memory just in case del self.rays.opacity_coords return integral_angle
[docs] def grid_to_transmittance(self, bounds=None): """Computes transmittance for rays between bounds. Args: bounds (tuple): Must be ((r_s,r_e), (a_s,a_e)), where r_s and r_e are the radial start and end points, and a_s and a_e the angular ones. """ self.per_angle = False self.dim = (-1, ) self.rays.rays_indices = [] self.rays.rays_lengths = [] self.rays.compute_sub_rays(bounds) self.shape = (len(self.rays.rays_indices), ) opacities = self._prepare_opacities(self.rays.opacity_coords) self.wn_contribution = self.wn_to_transmittance # compute only transmittance in this mode transmittance = self.compute_contribution(opacities) return transmittance
[docs] def compute(self, model): """Sums all optical depths and computes the transit depth (ratio of visible radius of the planet on the radius of the star to the square :math:`(R_P/R_S)^2`). Returns: Spectrum (exo_k object): Spectrum :math:`(R_P/R_S)^2` """ self.debug("Real Memory before transit depth = %s"% memory_usage()) self.debug("Virt Memory before transit depth = %s"% memory_usage("VmH")) if model.parallel: model = model.parallel.synchronize(model) # get model from P0 self.model = model self.rays = model.radiative_transfer.rays # may look disturbing, but it's for the parallel version to be synchronized self.n_opacities = 0 self.dim = (0,) self.shape = (self.rays.shape[0],) self.default_values() self.rays.init_subrays() if self.model.parallel and not self.model.parallel.dimension == "spectral": self.info("Per angle") integral = self.model.parallel.compute_integral(self) if self.per_angle: integral = np.sum(integral, axis=0) elif self.per_angle: self.info("Per angle") integrals = [] transmittance = [] for self.rays.angle_idx, ray_angle in enumerate(tqdm(self.rays.angles, leave=False)): integrals.append(self.angle_to_integral()) if self.store_transmittance: try: transmittance.append(np.concatenate(self.transmittance, axis=-1)) # get transmittance wavelengths chunks except: pass # parallel version doesn't support transmittance yet integral = np.sum(integrals, axis=0) if self.store_transmittance: try: self.transmittance = np.transpose(np.array(transmittance), [1,0,2]) except: pass # parallel version doesn't support transmittance yet else: self.info("Whole") self.dim = (0,1) self.shape = self.rays.shape self.rays.compute_sub_rays() self.info("Preparing opacities of %s cells..."%len(self.rays.opacity_coords)) opacities = self.prepare_opacities(self.rays.opacity_coords) self.info("Computing opacities...") integral = self.compute_integral(opacities) if self.store_transmittance: try: self.transmittance = np.concatenate(self.transmittance, axis=-1) except: pass # parallel version doesn't support transmittance yet if self.model.parallel and self.model.parallel.rk: return # Only proc 0 needs to compute the rest self.info("Transit depth computed") self.debug("Real Memory after transit depth = %s"% memory_usage()) self.debug("Virt Memory after transit depth = %s"% memory_usage("VmH")) self.debug("Number of opacities computed: %s"%self.n_opacities) value = ((self.rays.Rp**2.) + integral)/(self.rays.Rs**2) return xk.Spectrum(value, wns=self.opacity.k_data.wns, wnedges=self.opacity.k_data.wnedges)
[docs] def inputs(self): return ["rays", "memory_aware", "identical", "per_angle"]
[docs] def outputs(self): outputs= ["n_opacities"] if self.store_transmittance: outputs += ["transmittance"] try: assert self.model.parallel is not None return outputs except: return outputs + ["rays"]