from typing import Dict, Optional
import exo_k as xk
import netCDF4
import numpy as np
from pytmosph3r.planetary_system import Planet
from pytmosph3r.util.util import aerosols_array_iterator, arrays_to_zeros, convert_log
from .model import FileModel, Model
from ..atmosphere import InputAtmosphere
from ..grid import Grid3D
[docs]
class DiagfiModel(FileModel):
"""Model reading from a diagfi.nc file, from a LMDZ GCM for example.
Same parameters as :class:`~pytmosph3r.model.hdf5model.FileModel`, plus the ones listed here.
See :class:`~pytmosph3r.interface.inputdata.DiagfiData` for the structure of the diagfi file.
"""
def __init__(self, t: int = -1, var_dict: Optional[Dict] = None,
gas_units: Optional[str] = None, aerosols_units: Optional[str] = None,
extra_longitude: bool = True, *args, **kwargs):
"""To read from a diagfi, you may need the following parameters:
Args:
t (int) : Time slice to read from diagfi input file. If you do not have time-dependent data,set it to False. By default, takes the last time slice.
var_dict (dict) : Dictionary listing the names of the variables in the input file. Includes pressure 'p' and temperature 'temp' for now, as well as their surface values, and the surface area. 'u','v' and 'w' are the zonal, meridional and vertical winds, respectively. Defaults to {'p':'p', 'temp':'temp', 'ps':'ps', 'tsurf':'tsurf', 'surface':'aire', 'u':'u', 'v':'v', 'w':'w'}.
gas_units (str) : Used to indicate what units are in the input file. For example "log_vmr" indicates it is a log of a VMR. Possible units are "vmr", "mmr", "log_vmr", "log_mmr","ln_vmr", "ln_mmr".
aerosols_units (str) : Used to indicate what units are in the input file. For example "log_mmr" indicates it is a log of a MMR. Possible units are "mmr", "log_mmr", "ln_mmr".
input_mu (float) : Molar mass (if you need to convert MMR to VMR).
"""
if var_dict is None:
var_dict = {}
if t is not None and t is not False:
self.t = int(t)
else:
self.t = t
"""Time slice to read from diagfi. By default -1."""
self.gas_units = gas_units
"""Units of gases in input file."""
self.aerosols_units = aerosols_units
"""Units of aerosols in input file."""
self.var_dict = {'latitude':'latitude', 'longitude':'longitude', 'temp':'temp', 'p':'p', 'ps':'ps', 'tsurf':'tsurf', 'surface':'aire', 'u':'u', 'v':'v', 'w':'w'}
"""Dictionary listing the names of the variables in the input file. This includes pressure 'p' and temperature 'temp', as well as surface pressure and temperature 'ps' and 'tsurf', or the surface 'surface'. 'u','v' and 'w' are the zonal, meridional and vertical winds, respectively."""
if var_dict is None or var_dict is False:
var_dict = {}
self.var_dict.update(var_dict)
self.extra_longitude = extra_longitude
"""If the last longitude is a duplicate of the first one.
"""
FileModel.__init__(self, *args, **kwargs)
[docs]
def read_data(self):
"""Read data from a diagfi.nc file generated by the LMDZ GCM.
The netCDF file should have at least a temperature ``temp`` and a pressure either defined by ``p`` on all grid points or by `ps` the surface pressure and coefficients ``ap``, ``bp`` and ``aps``, ``bps``.
For the gas and aerosols descriptions, see :any:`library_input <library_input>`.
The ``controle`` variable should (preferrably) list:
#. :math:`n_{longitudes}` (optional, last longitude is a duplicate from the first one)
#. :math:`n_{latitudes}` (optional)
#. :math:`n_{vertical}` (optional)
#. :math:`R_{p}` (planet radius) in `m`
#. #NOT USED#
#. :math:`g_{0}` (surface gravity)
#. :math:`\\mu` (molar mass) in `g/mol` (if abundances given in mass mixing ratios)
"""
self.info("Reading model from %s"% self.filename)
f = netCDF4.Dataset(self.filename)
try:
controle = f.variables['controle']
Rp = float(controle[4])
g0 = float(controle[6])
self.planet = Planet(surface_gravity=g0, radius=Rp)
except:
self.warning("Could not find planet information in 'controle'.")
controle = None
self.planet = None
n_latitudes = len(f.variables[self.var_dict["latitude"]])
n_longitudes = len(f.variables[self.var_dict["longitude"]]) -1 # ignore +180
n_longitudes = max(n_longitudes, 1)
"""Reading pressure and temperature."""
diagfi_temperature = self.get_var(f.variables[self.var_dict['temp']])
p_layers = None
p_levels = None
pressure = None
try:
surface_pressure = self.get_var(f.variables[self.var_dict['ps']], ndim=2)
ap = f.variables['ap'][:]
bp = f.variables['bp'][:]
p_levels = ap[:, None, None] + bp[:, None, None] * surface_pressure
aps = f.variables['aps'][:]
bps = f.variables['bps'][:]
p_layers = aps[:, None, None] + bps[:, None, None] * surface_pressure
n_vertical = len(p_layers)
# in Transmission mode, we merge levels and layers (in the input atmosphere only)
n_vertical = len(p_levels) + len(p_layers)
assert p_levels.size
except:
if self.var_dict['p'] in f.variables:
pressure = self.get_var(f.variables[self.var_dict['p']])
p_layers = pressure.copy() # layers or levels?
n_vertical = len(pressure)
else:
raise KeyError("The input file %s should have at least '%s' or 'ap'/'bp' entries defining the pressure in the model."%(self.filename, self.var_dict['p']))
"""Create the grid now that we now its dimension."""
grid = Grid3D(n_vertical=n_vertical, n_latitudes=n_latitudes, n_longitudes=n_longitudes)
self.grid_to_radians(f, grid)
diagfi_gas = self.read_gases(f, controle)
diagfi_aerosols = self.read_aerosols(f)
diagfi_winds = self.read_winds(f, grid)
"""Calculate surface of each cell, and compare to diagfi 'aire' information."""
try:
calculated_surface = self.surface
self.surface = self.get_var(f.variables[self.var_dict['surface']], t=False, ndim=2)
assert np.isclose(self.surface, calculated_surface)
except:
self.warning("Surface from diagfi either wrong or inexistent. We will recompute it...")
self.surface = None
if self.emission or self.phasecurve:
self.info("In emission mode, we read pressure from layers and the surface point 'ps'. We therefore add a surface point to temperature, gases, etc.")
p_emission = p_layers # we will add surface point below...
try:
# ... if 'ps' exists
ps = self.get_var(f.variables[self.var_dict['ps']], ndim=2)
if (ps > p_emission[0]).any():
p_emission = np.concatenate([[ps], p_emission])
except:
self.warning("Could not read '%s' (surface pressure) in input file."%self.var_dict['ps'])
if len(p_emission) == len(diagfi_temperature)+1:
extend = extend_b
gas_mix_ratio = {}
for gas, vmr in diagfi_gas.items():
gas_mix_ratio[gas] = extend(vmr)
aerosols = arrays_to_zeros(diagfi_aerosols, grid.shape)
for aerosol, key in aerosols_array_iterator(diagfi_aerosols):
aerosols[aerosol][key] = extend(diagfi_aerosols[aerosol][key])
try:
temperature = extend(diagfi_temperature, surf=[self.get_var(f.variables[self.var_dict['tsurf']], ndim=2)])
except:
temperature = extend(diagfi_temperature)
else:
aerosols = diagfi_aerosols
gas_mix_ratio = diagfi_gas
temperature = diagfi_temperature
emission_grid = Grid3D(len(p_emission), n_latitudes=n_latitudes, n_longitudes=n_longitudes)
self.emission_atmosphere = InputAtmosphere(grid=emission_grid, pressure=p_emission, temperature=temperature, gas_mix_ratio=gas_mix_ratio, aerosols=aerosols)
if not (self.transmission or self.lightcurve):
return # no need to compute input_atmosphere, necessary for transmission or lightcurve
"""Merging levels and layers (aps/ap bps/bp mode)."""
if pressure is None:
pressure = np.zeros(grid.shape)
temperature = np.zeros(grid.shape)
gas_mix_ratio = {}
for gas in diagfi_gas.keys():
gas_mix_ratio[gas] = np.zeros(grid.shape)
aerosols = arrays_to_zeros(diagfi_aerosols, grid.shape)
winds = np.zeros(grid.shape+(3,))
for i in range(n_vertical):
simple_idx = i == n_vertical-1 and -1 or int(i/2)
if i%2 == 0:
pressure[i] = p_levels[int(i/2)]
else:
pressure[i] = p_layers[int(i/2)]
temperature[i] = diagfi_temperature[simple_idx]
for gas, vmr in diagfi_gas.items():
gas_mix_ratio[gas][i] = vmr[simple_idx]
for aerosol, key in aerosols_array_iterator(diagfi_aerosols):
aerosols[aerosol][key][i] = diagfi_aerosols[aerosol][key][simple_idx]
winds[i] = diagfi_winds[simple_idx]
elif len(pressure) > len(diagfi_temperature):
"""Pressure is given in levels, and other data in layers, so we add a surface point to all data."""
self.warning("Pressure has been given as levels-wise while temperature as layers-wise. We therefore add a surface point to temperature, gases, etc.")
gas_mix_ratio = {}
for gas, vmr in diagfi_gas.items():
gas_mix_ratio[gas] = np.concatenate([vmr[:1], vmr])
aerosols = arrays_to_zeros(diagfi_aerosols, grid.shape)
for aerosol, key in aerosols_array_iterator(diagfi_aerosols):
aerosols[aerosol][key] = np.concatenate([diagfi_aerosols[aerosol][key][:1], diagfi_aerosols[aerosol][key]])
try:
temperature = np.concatenate([[self.get_var(f.variables[self.var_dict['tsurf']], ndim=2)], diagfi_temperature])
except:
temperature = np.concatenate([diagfi_temperature[:1], diagfi_temperature])
winds = np.concatenate([diagfi_winds[:1], diagfi_winds])
else: # nothing to report: all arrays have the same shape
aerosols = diagfi_aerosols
gas_mix_ratio = diagfi_gas
temperature = diagfi_temperature
winds = diagfi_winds
try:
assert temperature.shape == pressure.shape
except:
raise ValueError("The shapes of temperature %s and pressure %s are not equal. If you do not have time-dependant data, please ensure that you have set 't' to False."%(temperature.shape, pressure.shape))
try:
assert pressure.ndim == 3
except:
raise ValueError("The dimension of the pressure data is %s. If you do not have time-dependant data, please ensure that you have set 't' to False (%s currently)."%(pressure.shape, self.t))
"""Main output of this function: the input atmosphere (for transmission/lightcurve), from which we will get the altitude."""
self.input_atmosphere = InputAtmosphere(grid=grid, pressure=pressure, temperature=temperature, gas_mix_ratio=gas_mix_ratio, aerosols=aerosols, winds=winds)
# end of read_data()
[docs]
def grid_to_radians(self, f, grid):
"""Convert latitudes and longitudes to radians."""
try:
grid.mid_latitudes = np.radians(f.variables[self.var_dict["latitude"]])
self.info("Latitude read from file (%s to %s)."%(f.variables[self.var_dict["latitude"]][0], f.variables[self.var_dict["latitude"]][-1]))
except:
pass
try:
grid.mid_longitudes = np.radians(f.variables[self.var_dict["longitude"]][:-1])
self.info("Longitude read from file (%s to %s)."%(f.variables[self.var_dict["longitude"]][0], f.variables[self.var_dict["longitude"]][-1]))
except:
pass
try:
lon_units = None
lon_units = f.variables[self.var_dict["longitude"]].units
assert "degrees" in lon_units
except:
self.warning("Longitude units (%s) not recognized as degrees. However, we still transform it from degrees to radians. If your data (latitude included) is in radians, please convert it to degrees."%lon_units)
[docs]
def read_gases(self, f, controle):
"""Read gases from :attr:`f` file descriptor using :attr:`gas_dict`."""
diagfi_gas = {}
for gas, key in self.gas_dict.items():
try:
try:
diagfi_gas[gas] = np.asarray(self.get_var(f.variables[key]))
except:
self.error("Could not read timestep %s of %s. Reading timestep 0."% (self.t, gas))
diagfi_gas[gas] = np.asarray(self.get_var(f.variables[key], 0))
"""Convert from log space to normal space."""
diagfi_gas[gas] = convert_log(diagfi_gas[gas], self.gas_units)
"""Convert the data to VMR if needed."""
if self.gas_units is None and 'units' not in f.variables[key].__dict__:
# No units. If positive, we consider it is already a VMR, else we consider it is a log10 of a VMR.
if (diagfi_gas[gas] < 0).all():
self.warning("No gas units given. The data is negative so we will assume it is a VMR given in log10. Please add units in the input file if it is not, or set gas_units to 'vmr' or 'mmr' or 'log_vmr' or 'log_mmr'.")
diagfi_gas[gas] = np.power(10,diagfi_gas[gas])
elif (self.gas_units is not None and "mmr" in self.gas_units)\
or (self.gas_units is None and not f.variables[key].units == 'm^3/m^3'):
# The unit is supposed to be a MMR. We will need a molar mass for that.
try:
if not hasattr(self, "input_mu"):
self.input_mu = float(controle[7]/1000) # conversion g/mol to kg/mol
except:
self.error("Could not find input mu in 'controle'. Cannot convert %s from MMR to VMR (given unit is '%s')." % (gas,f.variables[key].units))
continue
diagfi_gas[gas] *= self.input_mu / xk.Molar_mass().fetch(gas)
except:
self.warning("Could not read %s or its units from input file. If you didn't set any unit, we will read it as a VMR."%key)
return diagfi_gas
[docs]
def read_aerosols(self, f):
"""Read aerosols from diagfi using :attr:`aerosols_dict`. Each key should be composed of the name of the aerosol + the entry it corresponds to. The entries we will try to find are: :code:`["mmr", "reff", "condensate_density", "p_min", "optical_properties"]`.
Example of a valid dictionary: {'H2O': 'h2o_ice', 'H2O_reff': 'H2Oice_reff'}."""
diagfi_aerosols = {}
if self.aerosols_dict is None:
self.aerosols_dict = {}
# These are the keys we read in the diagfi
keys_to_read = ["mmr", "reff", "condensate_density", "p_min", "optical_properties"]
for aerosol, key in self.aerosols_dict.items():
entry = "mmr" # in case of dictionary {'H2O': 'h2o_ice'}
for var in keys_to_read: # try to find which entry is `aerosol`
if aerosol.endswith("_"+var):
aerosol = aerosol[:-(len(var)+1)]
entry = var
break
if aerosol not in diagfi_aerosols.keys():
diagfi_aerosols[aerosol] = {}
diagfi_aerosols[aerosol][entry] = np.asarray(self.get_var(f.variables[key]))
"""Convert from log space to normal space."""
if entry == "mmr":
diagfi_aerosols[aerosol][entry] = convert_log(diagfi_aerosols[aerosol][entry], self.aerosols_units)
ng = np.where(diagfi_aerosols[aerosol][entry] < 0)
if not np.isclose(diagfi_aerosols[aerosol][entry][ng], 0).all():
self.error("Found negative values for aerosols %s. Min is %s. Will probably crash... Please remove these negative values if this is an error."%(entry, diagfi_aerosols[aerosol][entry][ng].min()))
diagfi_aerosols[aerosol][entry][ng] = 0 # rounding errors
return diagfi_aerosols
[docs]
def read_winds(self, f, grid):
"""Read winds from diagfi using :attr:`winds_dict`. Each key should be composed of the name of the aerosol + the entry it corresponds to. The entries we will try to find are: :code:`["mmr", "reff", "condensate_density", "p_min", "optical_properties"]`.
Example of a valid dictionary: {'H2O': 'h2o_ice', 'H2O_reff': 'H2Oice_reff'}."""
diagfi_winds = np.zeros(grid.shape + (3,))
if self.doppler:
u = np.asarray(self.get_var(f.variables[self.var_dict["u"]]))
v = np.asarray(self.get_var(f.variables[self.var_dict["v"]]))
w = np.asarray(self.get_var(f.variables[self.var_dict["w"]]))
if v.shape[1]>u.shape[1]:
v = v[:,:-1]
if w.shape[0]>u.shape[0]:
w = w[:-1]
diagfi_winds = np.stack((u,v,w), axis=-1)
return diagfi_winds
[docs]
def get_var(self, var, t=None, ndim=3):
"""Try to fit the dimensions of one variable read from the input file. If the attribute :attr:`extra_longitude` of the class is set, we remove one longitude. If :attr:`t`: is not None nor False, we take the corresponding timestep in the simulation.
Args:
var (ndarray): Raw array read from the input file.
t (int, optional): Timestep to read in input file. If None, uses the attribute :attr:`t: of the class. If False, does not read any timestep. Defaults to None.
ndim (int, optional): Check if the output variable has the required number of dimensions `ndim`. Defaults to 3.
Returns:
ndarray: Array at timestep `t` and that should have `ndim` dimensions.
"""
if self.extra_longitude and var.shape[-1] > 1:
var = var[..., :-1]
if t is None:
t = self.t
if t is not None and t is not False:
var = var[t]
if var.ndim < ndim:
self.warning("The number of dimensions of the data is %s after reading (%s required). If this is not intended, it may be that you have time-independant data (set 't' to False then)."%(var.ndim, ndim))
return var
[docs]
def extend_bt(array, surf=None):
if surf is None:
surf = array[:1]
return np.concatenate([surf, array, array[-1:]])
[docs]
def extend_b(array, surf=None):
if surf is None:
surf = array[:1]
return np.concatenate([surf, array])