Source code for pytmosph3r.observations.emission

from typing import Optional

import exo_k as xk
import numba
import numpy as np
from exo_k import Spectrum
from math import ceil
from tqdm.auto import tqdm

from pytmosph3r.log import Logger
from pytmosph3r.planetary_system import Orbit
from pytmosph3r.util.geometry import cos_central_angle
from ..aerosols import PrepareAerosols
from ..atmosphere import Atmosphere


[docs] @numba.njit(cache=True) def surface_projection(local_latitudes, n_per_latitudes: int, local_longitudes, n_per_longitudes: int, n_total_longitudes: int, obs_latitude: float, obs_longitude: float, planet_radius: float, local_projection): """Compute surface projection of :attr:`local_latitudes` and :attr:`local_longitudes` on the plane of the sky (as seen from the observer). :attr:`local_projection` is the output of the function. """ for i in range(n_per_latitudes): lat_bounds: np.ndarray = np.array([local_latitudes[i], local_latitudes[i + 1]]) latitude: np.ndarray = np.mean(lat_bounds) surface: float = (2 * np.pi * planet_radius ** 2 * (np.sin(lat_bounds[1])-np.sin(lat_bounds[0])) / n_total_longitudes) for j in range(n_per_longitudes): long_bounds = np.array([local_longitudes[j], local_longitudes[j+1]]) longitude = np.mean(long_bounds) # phi = central angle between (lat, lon) and observer (@ infinite distance) cos_phi = cos_central_angle(latitude, longitude, obs_latitude, obs_longitude) local_projection[i,j] = cos_phi * surface
[docs] class Emission(Logger): """The emission module computes the flux emitted by a body by iterating over the latitudes and longitudes of the model. It relies on `Exo_k <http://perso.astrophy.u-bordeaux.fr/~jleconte/exo_k-doc/autoapi/exo_k/atm/index.html?highlight=emission#exo_k.atm.Atm.emission_spectrum_2stream>`_ for the computation of the 1D flux in each column. The flux is then scaled with respected to the surface of the atmosphere projected onto the plane of the sky. All options are deactivated by default. Args: planet_to_star_flux_ratio (bool): The output spectrum will be a ratio between the flux of the planet and that of the star (instead of the flux of the planet). surface_resolution (int) : Number of grid points to calculate projected surface (the more points, the more accurate). Defaults to 500. store_raw_flux (bool) : Whether to store raw flux (over each (latitude, longitude)). top_flux_from_star (bool) : Whether to use the star flux as top flux (weighted by the cosinus of the angle between the column (lat,lon) and the star). mu_from_obs (bool) : Whether to use the position of the observer for computing a :math:`\\mu_0` for :func:`emission_spectrum_2stream`. compute_contribution_function (bool) : Compute the contribution function. See `documentation of Exo_k <http://perso.astrophy.u-bordeaux.fr/~jleconte/exo_k-doc/autoapi/exo_k/atm/index.html#exo_k.atm.Atm.contribution_function>` kwargs (dict): See `documentation of Exo_k <http://perso.astrophy.u-bordeaux.fr/~jleconte/exo_k-doc/autoapi/exo_k/atm/index.html?highlight=emission#exo_k.atm.Atm.emission_spectrum_2stream>`_ to see what other options are available. Returns: (Spectrum): If :attr:`planet_to_star_flux_ratio` is True, the planet to star flux ratio ( :math:`F_P/F_S * (R_P/R_S)^2`), else the planet flux (in :math:`W/m^2/cm^{-1}`). """ def __init__(self, planet_to_star_flux_ratio: bool = False, surface_resolution: int = 500, store_raw_flux: bool = False, top_flux_from_star: bool = False, mu_from_obs: bool = False, compute_contribution_function: bool = False, **kwargs): super().__init__(self.__class__.__name__) self.planet_to_star_flux_ratio: bool = planet_to_star_flux_ratio """Returns the planet to star flux ratio instead of the planet flux.""" self.surface_resolution: Optional[int] = surface_resolution """Number of grid points to calculate projected surface.""" self.store_raw_flux: bool = store_raw_flux """Whether or not to store raw flux (over each (latitude, longitude)).""" self.top_flux_from_star: bool = top_flux_from_star """Whether or not to use the star flux as top flux (weighted by the cosine of the angle between the column (lat,lon) and the star). """ self.mu_from_obs: bool = mu_from_obs r"""Use the position of the observer for computing a :math:`\mu_0` for :func:`emission_spectrum_2stream`. """ self.compute_contribution_function: bool = compute_contribution_function """Allow to compute the contribution function.""" self.kwargs = kwargs self.spectrum: Optional[Spectrum] = None """Output spectrum (Exo_k object).""" self.spectrum_normalized: Optional[Spectrum] = None """Output spectrum (Exo_k object) normalized with the stellar flux.""" self.factor_normalized : Optional[Spectrum] = None # Attribute created by the methods # self.model: Optional['Model'] = None # self.observer: Optional['Observer'] = None # self.projected_surface: Optional[np.ndarray] = None # self.spectrum: Spectrum # self.raw_flux: Optional[np.ndarray] = None # self.observer_longitudes: Optional[float] = None # self.phase_curve_flux: Optional[np.ndarray] = None
[docs] def build(self, model): """No need for an altitude-based grid (as done in transmission), so we just copy the input grid. """ self.model = model if hasattr(model, "emission_atmosphere") and model.emission_atmosphere is not None: self.atmosphere = model.emission_atmosphere # simple ref else: self.atmosphere = model.input_atmosphere # simple ref self.atmosphere.build(model) self.observer = model.observer
[docs] def compute_projection(self): """Compute the projection surface of the grid cells over the plane of the sky.""" obs = self.model.observer atm = self.atmosphere self.projected_surface = np.zeros(atm.shape[1:]) latitudes = atm.grid.latitudes longitudes = atm.grid.all_longitudes n_per_longitudes = int(ceil(self.surface_resolution/atm.grid.n_longitudes)) n_total_longitudes = n_per_longitudes * atm.grid.n_longitudes n_per_latitudes = int(ceil(self.surface_resolution/atm.grid.n_latitudes)) n_total_latitudes = n_per_latitudes * max(atm.grid.n_latitudes-2, 1) # discretize surface among n_total_{latitudes, longitudes} (for better accuracy) surface_longitudes = np.linspace(longitudes[0], longitudes[-1], n_total_longitudes+1) surface_latitudes = np.linspace(latitudes[1],latitudes[-2],n_total_latitudes+1) # except poles, since they're halved for lat, lon in atm.grid.walk([1,2]): # Create local subdivisions of latitude and longitude to compute surface projection with a better accuracy. if lat == 0 or lat == atm.n_latitudes-1: # exception for the poles, because they're halved local_latitudes = np.linspace(latitudes[lat],latitudes[lat+1],n_per_latitudes+1) else: local_latitudes = surface_latitudes[(lat-1) * n_per_latitudes:lat * n_per_latitudes + 1] assert np.isclose(local_latitudes[-1], latitudes[lat+1]), "Local latitude (%s) is not correct (%s). Report this as a bug." % (local_latitudes[-1], latitudes[lat+1]) assert np.isclose(local_latitudes[0], latitudes[lat]), "Local latitude is not correct. Report this as a bug." local_longitudes = surface_longitudes[lon * n_per_longitudes:(lon+1) * n_per_longitudes+1] local_projection = np.zeros((n_per_latitudes, n_per_longitudes)) surface_projection(local_latitudes, n_per_latitudes, local_longitudes, n_per_longitudes, n_total_longitudes, obs.latitude, obs.longitude, self.model.planet.radius, local_projection) # local_projection is a 2D array of projected surfaces of the local subdivisions, we sum it to # get the projected surface of the current cell self.projected_surface[lat, lon] = np.sum(local_projection[np.where(local_projection > 0)])
[docs] def compute(self, model): """Iterate over vertical columns (lat/lon) of the model, in which we use the exo_k 1D two-stream emission function (emission_spectrum_2stream() for the `Atm` class). Then integrate the output flux in the direction of the observer. For that, we compute the solid angle associated to the position of the observer, projecting the visible surface of the atmosphere onto the plane of the sky. The flux is scaled with respect to that projected surface. If :attr:`planet_to_star_flux_ratio` is True, the flux is scaled to the flux of the star (a blackbody) and stored in `self.spectrum_normalized`. If the phase curve mode is activated, this function also computes a :attr:`phase_curve` object with the spectrum associated to all :attr:`n_phases` phase/observer longitudes (see :func:`compute_phase_curve`). Args: model (:class:`~pytmosph3r.model.model.Model`): Model in which to read latitudes, longitudes, pressure, etc. Returns: Spectrum (exo_k object): Spectrum, planet flux (stored in `self.spectrum`). `self.spectrum_normalized` store the planet to star ratio if `self.planet_to_star_flux_ratio` is `True`. """ if "dummy" in self.kwargs: return if model.parallel: model = model.parallel.synchronize(model) # get model from P0 self.model = model atm:Atmosphere = self.atmosphere opacity = self.model.opacity self.compute_projection() self.spectrum: Spectrum = xk.Spectrum(0, model.opacity.k_data.wns, model.opacity.k_data.wnedges) star_spectrum_top_atm: Optional[Spectrum] = None # stellar spectrum at the planet star_flux_top_atm: Optional[float] = None # total stellar flux at the planet star_pos = None orbit: Optional[Orbit] = self.model.orbit self.info("Computing output flux...") if self.store_raw_flux: self.raw_flux = np.zeros((atm.n_latitudes, atm.n_longitudes, model.opacity.k_data.Nw)) # F TOA (W/m2/cm-1) self.raw_flux_toa = np.zeros((atm.n_latitudes, atm.n_longitudes)) # Flux TOA (W/m2) # To allow comparison, divide the flux by the planet surface ie add [/m^2] self.factor = np.pi * self.model.planet.radius ** 2 if self.planet_to_star_flux_ratio is True: # Normalize with star flux, the star_flux_surface is taken at the surface of the star star_flux_surface = self.model.star.spectrum(wnedges=opacity.wnedges, wns=opacity.wns) # We cancel the [/m^2] while also computing the total flux from the star # ( pi is already taken into account in the star_flux, see BB) self.factor_normalized = star_flux_surface * self.factor * ( self.model.star.radius/self.model.planet.radius) ** 2 if self.top_flux_from_star is True: star_pos = orbit.star_coordinates if star_pos is None or star_pos is False: raise ValueError("Define star coordinates to proceed with the option 'top_flux_from_star'.") # We compute the stellar spectrum and total flux at the planet star_spectrum_top_atm = model.star.spectrum(wnedges=opacity.wnedges, wns=opacity.wns, distance=orbit.r()) star_flux_top_atm = star_spectrum_top_atm.total longitudes = atm.grid.mid_longitudes if self.compute_contribution_function: self.contribution_function: np.ndarray = np.zeros((atm.n_latitudes, atm.n_longitudes, atm.grid.n_vertical - 1,)) if atm.n_latitudes < 2 and atm.n_longitudes > 1: longitudes = tqdm(atm.grid.mid_longitudes, leave=None) for lat, latitude in enumerate(tqdm(atm.grid.mid_latitudes, leave=False, )): for lon, longitude in enumerate(longitudes): if self.mu_from_obs: obs = self.model.observer self.kwargs["mu0"] = cos_central_angle(latitude, longitude, obs.latitude, obs.longitude) if not self.store_raw_flux: # Apply skip check if we don't want the raw flux for each columns of the atmosphere. if self.projected_surface[lat, lon] <= 0: # Surface is not seen by the observer continue if 'mu0' in self.kwargs.keys() and self.kwargs["mu0"] < 0: # Surface is not seen by the observer # todo : check if projected_surface take care of this case. continue # The data should go from the top to the bottom in Exo_k (+ pressure in log10) layers = slice(None, None,-1) coordinates = (layers, lat, lon) flux_top_dw: Optional[float] = None albedo_params = {} if self.top_flux_from_star is True: # Flux received from the star, need to be positive else `None` to not be taken into account mu_star = cos_central_angle(latitude, longitude, star_pos[0], star_pos[1]) flux_top_dw = star_flux_top_atm * mu_star if mu_star > 0. else None # Prepare albedo parameters if atm.albedo_surf is not None: albedo_params['albedo_surf'] = atm.albedo_surf[(lat, lon)] if atm.wn_albedo_cutoff is not None: albedo_params['wn_albedo_cutoff'] = atm.wn_albedo_cutoff[(lat, lon)] # Select data at coordinates lat x lon logplay = np.log10(atm.pressure[coordinates]) tlay = atm.temperature[coordinates] gas_vmr = {} for gas, vmr in atm.gas_mix_ratio.items(): if isinstance(vmr, (str, int, float)): gas_vmr[gas] = vmr else: gas_vmr[gas] = vmr[coordinates] prepare_aerosols = PrepareAerosols(self.model, atm, atm.n_vertical) aer_reff_densities = prepare_aerosols.compute(slice(None, None), coordinates) # Effectively compute the flux via the 2stream function from exo_k xk_atm = xk.Atm(logplay=logplay, tlay=tlay, grav=self.model.planet.surface_gravity, composition=gas_vmr, aerosols=aer_reff_densities, k_database=opacity.k_data, cia_database=opacity.cia_data, a_database=opacity.aerosol_data, Rp=model.planet.radius, rcp=atm.rcp, **albedo_params) flux: xk.Spectrum = xk_atm.emission_spectrum_2stream(rayleigh=opacity.rayleigh, log_interp=False, flux_top_dw=flux_top_dw, stellar_spectrum=star_spectrum_top_atm, **self.kwargs) if self.compute_contribution_function: # Compute contribution (Shape: (Nlay - 1, Nw)) and perform the spectral integration. # Should return an array of the shape (Nlay - 1,) # TODO: Replace manual integration by `xk_atm.spectral_integration` but without the # `g_integration`. self.contribution_function[lat, lon, :] = np.sum(xk_atm.contribution_function() * xk_atm.dwnedges, axis=-1) if self.store_raw_flux: self.raw_flux[lat, lon] = flux.value # * self.model.surface[lat, lon] / (np.pi*self.model.planet.radius ** 2) self.raw_flux_toa[lat, lon] = flux.total if self.projected_surface[lat, lon] >= 0.: flux *= self.projected_surface[lat, lon] self.spectrum += flux if self.compute_contribution_function: # Normalize contributions across all columns. self.contribution_function /= self.contribution_function.max() self.info("DONE") if self.planet_to_star_flux_ratio is True: self.spectrum_normalized = self.spectrum.copy() self.spectrum_normalized /= self.factor_normalized self.spectrum /= self.factor return self.spectrum
[docs] def outputs(self): outputs = ["spectrum", "spectrum_normalized", "raw_flux", "projected_surface"] if self.compute_contribution_function: outputs += ['contribution_function'] return outputs