Source code for exo_k.atm_evolution.atm_evol

# -*- coding: utf-8 -*-
"""
@author: jeremy leconte
"""
import pickle
import copy
import numpy as np
from exo_k.gas_mix import Gas_mix
from exo_k.atm import Atm
from exo_k.atm_2band import Atm_2band
from exo_k.util.cst import DAY, RGP
from .settings import Settings
from .tracers import Tracers
from .convection import turbulent_diffusion_numba, molecular_diffusion_numba, \
                convective_acceleration_numba, moist_convective_adjustment_numba, compute_condensation_numba, \
                compute_rainout_numba, moist_convective_adjustment_cloud_fraction_numba
from .condensation import Condensing_species, Condensation_Thermodynamical_Parameters 
#from .condensation_gcm import Condensing_species, Condensation_Thermodynamical_Parameters 
# The line above allows one to select the same cthermodynamics as in the LMDZ GCM. To be changed accordingly in convection.py

[docs] class Atm_evolution(object): """Model of atmospheric evolution. Uses exo_k.Atm class to compute radiative transfer """ def __init__(self, bg_vmr=None, verbose=False, **kwargs): """Initializes atmospheric profiles. Most arguments are passed directly to exo_k.Atm class through **kwargs .. warning:: Layers are counted from the top down (increasing pressure order). All methods follow the same convention. """ self.settings = Settings() self.settings.set_parameters(**kwargs) self.header={'rad':0,'conv':1,'turb':2,'cond':3,'madj':4,'rain':5,'tot':6} # setup background gas and thermodynamical properties self.bg_gas = Gas_mix(bg_vmr) self.M_bg = self.bg_gas.molar_mass() self.M_bg = self.settings.use_or_set('M_bg', self.M_bg) self.cp = self.bg_gas.cp() self.cp = self.settings.pop('cp', self.cp) self.rcp = RGP/(self.M_bg*self.cp) self.rcp = self.settings.use_or_set('rcp', self.rcp) if (not isinstance(self.rcp, float)) or (not isinstance(self.cp, float)): print('None of rcp or cp should be arrays. If you provided arrays') print('for the background gas composition, you should also') print('provide the effective cp and rcp for your atmosphere.') raise RuntimeError('None of rcp or cp should be arrays.') if verbose: print('cp, M_bg, rcp:', self.cp, self.M_bg, self.rcp) # setup tracers self.tracers = Tracers(self.settings, bg_vmr = self.bg_gas.composition, **self.settings.parameters) self.initialize_condensation(**self.settings.parameters) self.setup_radiative_model(gas_vmr = self.tracers.gas_vmr, **self.settings.parameters) self.Nlay = self.atm.Nlay self.tlay = self.atm.tlay self.compute_hybrid_coordinates() if verbose: print(self.settings.parameters) self.evol_tau = 0.
[docs] def set_options(self, reset_rad_model = False, check_keys = True, cp = None, verbose = False, **kwargs): """This method is used to store the global options in the `Settings` object. Arguments are all passed through **kwargs. Sometimes, one needs to reset the radiative model to take into account some modifications (like the databases). Normally, this should be automatic, but you can force it with `reset_rad_model=True` """ if check_keys: for key in kwargs.keys(): if key in self.settings._forbidden_changes: print('Warning!!! ', key, ' cannot be changed by set_options.') print('You should probably initialize a new Atm_evolution instance.') print('Use check_keys = False to remove this warning.') if 'tlay' not in kwargs.keys(): self.settings.set_parameters(tlay=self.tlay, logplay=self.atm.logplay, **kwargs) else: self.settings.set_parameters(**kwargs) if cp is not None: self.cp = cp if 'radiative_acceleration' in kwargs.keys(): print("'radiative_acceleration' is deprecated. Please use acceleration_mode instead.") print("acceleration_mode = 1 will emulate the previous behavior but other modes exist.") print("In particular acceleration_mode = 4 will also accelerate convergence in convective zones") raise DeprecationWarning('radiative_acceleration is deprecated. Remove radiative_acceleration from the options to get rid of this message') if not set(kwargs.keys()).issubset(self.settings._non_radiative_parameters): reset_rad_model = True if verbose: print('Radiative model will be reset.') if reset_rad_model: self.setup_radiative_model(gas_vmr = self.tracers.gas_vmr, **self.settings.parameters)
[docs] def initialize_condensation(self, condensing_species = None, **kwargs): """This method initializes the condensation module by listing all the condensing vapors and linking them to their condensed form. For each vapor-condensate pair, a :class:`Condensible_species` object is created with the thermodynamical data provided. Here is an example of dictionary to provide as input to include CH4 condensation ``` condensing_species={'ch4':{'Latent_heat_vaporization': 5.25e5, 'cp_vap': 2.232e3, 'Mvap': 16.e-3, 'T_ref': 90., 'Psat_ref': 0.11696e5}} ``` """ if condensing_species is None: condensing_species = {} self.condensing_pairs = list() self.condensing_pairs_idx = list() self.condensing_species_idx = dict() self.condensing_species_params=list() self.condensing_species_thermo=list() idx = 0 for name in self.tracers.namelist: if 'type' in self.tracers.dico[name]: if self.tracers.dico[name]['type'] == 'vapor': if 'condensed_form' not in self.tracers.dico[name]: print("You should identify the 'condensed_form' of:", name) raise RuntimeError() elif self.tracers.dico[name]['condensed_form'] not in self.tracers.namelist: print("The condensed form of a vapor should be a tracer.") raise RuntimeError() elif name in condensing_species.keys(): cond_name = self.tracers.dico[name]['condensed_form'] self.condensing_species_idx[name]=idx self.condensing_pairs.append([name, cond_name]) self.condensing_pairs_idx.append(\ [self.tracers.idx[name], self.tracers.idx[cond_name]]) self.condensing_species_params.append(\ Condensing_species(**condensing_species[name])) self.condensing_species_thermo.append(\ Condensation_Thermodynamical_Parameters(**condensing_species[name])) idx+=1 else: print("The thermodynamic parameters for:", name,'were not provided') print('through condensing_species = {}.') raise RuntimeError() self.Ncond=idx self.total_cloud_fraction = np.ones(self.Ncond)
[docs] def setup_radiative_model(self, k_database=None, k_database_stellar=None, cia_database=None, cia_database_stellar=None, gas_vmr=None, **kwargs): """This method initializes the exo_k.Atm object that will be used to carry out radiative transfer calculations. This is where the radiative data used are chosen and transmitted to the radiative transfer module, along with many other parameters including the incoming stellar flux (`flux_top_dw`), the blackbody temperature of the star (`Tstar`), the If a `k_database_stellar` is provided, then this is this database that will be used to treat the scattering and absorption of incoming radiation. In that case, `k_database` will be used to treat the emission of the atmosphere. The effective cos(zenith angle) for the incoming stellar radiation can then be specified independently with the `mu0_stellar` keyword. If no `k_database_stellar` is provided, `k_database` will be used to treat both the atmospheric emission and the stellar radiation. Running a model with `k_database_stellar=k_database` yields the same results at twice the cost. Parameters ---------- k_database, k_database_stellar: `exo_k.Kdatabase` objects radiative database for the molecules in the atmospheres. cia_database, cia_database_stellar: `exo_k.CIA_database` object radiative database for the CIA ofmolecules in the atmospheres. """ if k_database is None: raise RuntimeError('We need at least a k_database') if k_database_stellar is None: self.atm = Atm(k_database=k_database, cia_database=cia_database, composition=gas_vmr, Mgas=self.tracers.Mgas, **kwargs) else: raise DeprecationWarning("k_database_stellar is deprecated. Proceed at your own risk.") self.atm = Atm_2band(k_database=k_database, cia_database=cia_database, k_database_stellar=k_database_stellar, cia_database_stellar=cia_database_stellar, composition=gas_vmr, **kwargs) H, net = self.atm.heating_rate(compute_kernel=True, Mgas=self.tracers.Mgas, **kwargs)
[docs] def compute_average_fluxes(self): """Use the averaged heating rates to compute the various fluxes (W/m^2) at the level interfaces. These fluxes are positive when the energy flows upward. To be consistent with radiative fluxes, the first array value corresponds to the top of atmosphere and should be 0 in most cases. The last value corresponds to the flux between the deepest layer (considered to be the surface) and the layer just above. """ self.Fnet = np.zeros((7, self.Nlay)) self.Fnet[0] = self.Fnet_rad for ii in range(1,6): self.Fnet[ii]=np.concatenate([[0.], np.cumsum(self.H_ave[ii]*self.atm.dmass)[:-1]]) self.Fnet[-1] = np.sum(self.Fnet, axis=0)
[docs] def evolve(self, N_timestep=1, N_kernel=10000, timestep_factor=1., dT_max=100., verbose=False, check_cons=False, **kwargs): r"""The time variable used in the model is tau=t/cp. The equation we are solving in each layer is thus .. math:: c_p \frac{d T}{d t}= \frac{d T}{d tau} = \sum_i H_i For a given timestep `dtau`, the physical time elapsed in second can be computed using `dt=dtau*cp` To work, the heating rates (H) must be computed in W/kg. This also means that if one needs the physical rate of change of another quantity (like dq/dt) from the delta q over a time step, one needs to do `dq/dt = delta q / (timestep * cp)` Parameters ---------- N_timestep: int Number of timesteps to perform. N_kernel: int Maximal number of timesteps between two computations of the radiative kernel. timestep_factor: float Multiplicative factor applied to timestep computed automatically by the radiative module. timestep_factor > 1 can lead to unstabilities. dT_max: float Maximum temperature increment in a single timestep. """ if self.atm.k_database is None: print('This Atm_evolution instance is not linked to any k_database') print('Use self.set_options(k_database=, ...)') raise RuntimeError('No k_database provided.') self.H_ave = np.zeros((7, self.Nlay)) self.tlay_hist = np.zeros((N_timestep,self.Nlay)) self.Fnet_top = np.zeros((N_timestep)) self.timestep_hist = np.zeros((N_timestep)) if check_cons: self.nrj_cons = np.zeros((N_timestep,self.Nlay)) self.nrj_cons_dry = np.zeros((N_timestep,self.Nlay)) self.nrj_cons_cond = np.zeros((N_timestep,self.Nlay)) self.nrj_cons_rain = np.zeros((N_timestep,self.Nlay)) self.vapor_cons = np.zeros((N_timestep,self.Nlay)) self.cond_cons = np.zeros((N_timestep,self.Nlay)) self.cloud_fraction_hist = np.zeros((N_timestep)) tau0 = self.evol_tau self.N_last_ker = 0 compute_kernel = False dTlay_max = 2. * self.settings['dTmax_use_kernel'] self.tracers.update_gas_composition(update_vmr=True) for ii in range(N_timestep): if np.amax(np.abs(self.tlay-self.atm.tlay_kernel)) < self.settings['dTmax_use_kernel']: self.N_last_ker +=1 if verbose: print(self.N_last_ker, self.N_last_ker%N_kernel) compute_kernel = (self.N_last_ker%N_kernel == 0) if compute_kernel: self.N_last_ker = 0 else: if dTlay_max < 0.5 * self.settings['dTmax_use_kernel']: compute_kernel = True self.N_last_ker = 0 else: compute_kernel = False self.N_last_ker +=1 if ii == N_timestep-1: if ii != 0: compute_kernel=True self.H_tot=np.zeros(self.Nlay) if verbose: print('iter, compute_kernel:', ii, compute_kernel) if self.tracers.some_var_gases: gas_vmr_rad = self.tracers.gas_vmr else: gas_vmr_rad = None aer_reffs_densities = self.tracers.update_aerosol_properties(self.atm) self.H_rad, self.Fnet_rad = self.atm.heating_rate(compute_kernel=compute_kernel, rayleigh=self.settings['rayleigh'], dTmax_use_kernel=self.settings['dTmax_use_kernel'], gas_vmr=gas_vmr_rad, Mgas=self.tracers.Mgas, aer_reffs_densities=aer_reffs_densities, **kwargs) # if verbose and compute_kernel: print('H_rad', self.H_rad) self.H_tot += self.H_rad self.timestep = timestep_factor * self.atm.tau_rad #if verbose: print('tau_rad, dt:', self.atm.tau_rad, self.timestep) self.evol_tau += self.timestep if self.settings['convection']: self.H_conv = self.tracers.dry_convective_adjustment(self.timestep, self.H_tot, self.atm, verbose = verbose) self.H_tot += self.H_conv if check_cons: self.nrj_cons_dry[ii] += self.H_conv * self.atm.dmass else: self.H_conv = np.zeros(self.Nlay) if self.settings['diffusion']: if check_cons: self.vapor_cons[ii] -= self.tracers['H2O'] * self.atm.dmass self.cond_cons[ii] -= self.tracers['H2O_liq'] * self.atm.dmass self.H_turb = self.turbulent_diffusion(self.timestep, self.H_tot, self.atm, self.cp, index_dry_convective_top=self.tracers.index_dry_convective_top, Kzz_pressure_factor=self.settings['Kzz_pressure_factor'], verbose=verbose ) self.H_tot += self.H_turb if check_cons: self.nrj_cons_dry[ii] += self.H_turb * self.atm.dmass self.vapor_cons[ii] += self.tracers['H2O'] * self.atm.dmass self.cond_cons[ii] += self.tracers['H2O_liq'] * self.atm.dmass self.tracers.update_gas_composition(update_vmr=False) else: self.H_turb = np.zeros(self.Nlay) if self.settings['molecular_diffusion']: self.H_diff = self.molecular_diffusion(self.timestep, self.H_tot, self.atm, self.cp) self.H_tot += self.H_diff qarray_before_condensation = np.copy(self.tracers.qarray) if self.settings['moist_convection']: #if check_cons: # self.vapor_cons[ii] -= self.tracers['H2O'] * self.atm.dmass # self.cond_cons[ii] -= self.tracers['H2O_liq'] * self.atm.dmass self.H_madj = self.moist_convective_adjustment(self.timestep, self.H_tot, moist_inhibition=self.settings['moist_inhibition'], verbose = verbose) self.H_tot += self.H_madj if check_cons: if np.sum(self.H_madj * self.atm.dmass)<0.: print('madj<0:',self.H_madj, self.total_cloud_fraction[0]) self.nrj_cons[ii] += self.H_madj * self.atm.dmass #self.vapor_cons[ii] += self.tracers['H2O'] * self.atm.dmass #self.cond_cons[ii] += self.tracers['H2O_liq'] * self.atm.dmass self.cloud_fraction_hist[ii] = self.total_cloud_fraction[0] self.total_cloud_fraction[0] = 1. else: self.H_madj = np.zeros(self.Nlay) if self.settings['condensation']: #if check_cons: # self.vapor_cons[ii] -= self.tracers['H2O'] * self.atm.dmass # self.cond_cons[ii] -= self.tracers['H2O_liq'] * self.atm.dmass self.H_cond = self.condensation(self.timestep, self.H_tot, verbose = verbose) self.H_tot += self.H_cond if check_cons: self.nrj_cons_cond[ii] += self.H_cond * self.atm.dmass #self.vapor_cons[ii] += self.tracers['H2O'] * self.atm.dmass #self.cond_cons[ii] += self.tracers['H2O_liq'] * self.atm.dmass else: self.H_cond = np.zeros(self.Nlay) if self.settings['rain']: if check_cons: self.vapor_cons[ii] -= self.tracers['H2O'] * self.atm.dmass self.cond_cons[ii] -= self.tracers['H2O_liq'] * self.atm.dmass self.H_rain = self.rainout(self.timestep, self.H_tot, verbose = verbose) self.H_tot += self.H_rain if check_cons: self.nrj_cons_rain[ii] += self.H_rain * self.atm.dmass self.vapor_cons[ii] += self.tracers['H2O'] * self.atm.dmass self.cond_cons[ii] += self.tracers['H2O_liq'] * self.atm.dmass else: self.H_rain = np.zeros(self.Nlay) if self.settings['mass_redistribution']: self.mass_redistribution(qarray_before_condensation) if self.settings['surface_reservoir']: self.tracers.update_surface_reservoir(condensing_pairs_idx = self.condensing_pairs_idx, surf_layer_mass = self.atm.dmass[-1]) if self.settings['acceleration_mode'] > 0: self.radiative_acceleration(timestep = self.timestep, \ acceleration_mode = self.settings['acceleration_mode'], acceleration_top_pressure = self.settings['acceleration_top_pressure'], verbose = verbose) #self.H_tot *= self.acceleration_factor dTlay= self.H_tot * self.timestep dTlay_max = np.amax(np.abs(dTlay)) if dTlay_max > dT_max: print('dT > dTmax:',dTlay_max,' at k=',np.argmax(np.abs(dTlay))) if verbose: print('heat rates (rad, dry conv), dTmax:', np.sum(self.H_rad*self.atm.dmass), np.sum(self.H_conv*self.atm.dmass), dTlay_max) dTlay=np.clip(dTlay,-dT_max,dT_max) self.tlay = self.tlay + dTlay self.tlay_hist[ii] = self.tlay for jj, H in enumerate([self.H_rad, self.H_conv, self.H_turb, self.H_cond, self.H_madj, self.H_rain, self.H_tot]): self.H_ave[jj] += H * self.timestep self.Fnet_top[ii] = self.Fnet_rad[0] self.timestep_hist[ii] = self.timestep self.atm.set_T_profile(self.tlay) self.tracers.update_gas_composition(update_vmr=True) inv_delta_t = 1./(self.evol_tau-tau0) self.H_ave *= inv_delta_t self.compute_average_fluxes()
[docs] def equilibrate(self, Fnet_tolerance = None, N_iter_max = 10, N_timestep_ini = 100, N_timestep_max = 1000000, verbose = False, **kwargs): """Evolves an atmosphere until it is at equilibrium. Equilibrium is assumed to be reached when the net top of atmosphere flux remains within +-Fnet_tolerance of the internal flux for a whole evolution step. The number of timesteps per evolution step in multiplied by two at each iteration, starting from N_timestep_ini, until the limit of N_timestep_max is reached. Parameters ---------- Fnet_tolerance: float Tolerance on net flux in W/m^2 to identify convergence. N_iter_max: int Max number of successive calls to evolve N_timestep_ini: int Initial number of timesteps in a single evolution step N_timestep_max: int Max number of timesteps in a single evolution step """ iter=1 if Fnet_tolerance is None: raise RuntimeError('You should provide the maximum tolerance on the net flux: Fnet_tolerance (in W/m^2)') N_timestep = N_timestep_ini while iter <= N_iter_max: time_init = self.evol_tau self.evolve(N_timestep = N_timestep, **kwargs) net = self.Fnet_top - self.atm.internal_flux if verbose: print('iter: {iter}, N_timestep: {Nt}'.format(iter = iter, Nt = N_timestep)) print('Fnet mean: {fme:.3g} W/m^2, (min:{fmi:.3g}, max:{fma:.3g})'.format( \ fme = net.mean(), fmi = net.min(), fma = net.max())) print('timestep: {ts1:.3g} d | {ts2:.3g} s, total time: {ev_t:.3g} yr'.format( \ ts1 = self.timestep*self.cp/(DAY), ts2 = self.timestep*self.cp, ev_t = (self.evol_tau-time_init)*self.cp/(DAY*365.))) #print('timestep:',self.timestep*self.cp/(DAY),'days, ', # self.timestep*self.cp,'s, evol_time(yr):',(self.evol_tau-time_init)*self.cp/(DAY*365.)) if np.all(np.abs(net) < Fnet_tolerance): break N_timestep = min( N_timestep * 2, N_timestep_max) iter += 1
[docs] def moist_convective_adjustment(self, timestep, Htot, moist_inhibition = True, verbose = False): """This method computes the vapor and temperature tendencies do to moist convectoin in saturated layers. The tracer array in modified in place. Parameters ---------- timestep: float physical timestep of the current step (in s/cp). Htot: array, np.ndarray Total heating rate (in W/kg) of all physical processes already computed Return ------ H_madj: array, np.ndarray Heating rate due to large scale condensation (W/kg) """ new_t = self.atm.tlay + timestep * Htot H_madj = np.zeros(self.Nlay) for i_cond in range(self.Ncond): #careful i_cond is the index of the condensing pair # in the list of condensing species, idx_cond is the position of the # condensate linked to i_cond in the tracers array. idx_vap, idx_cond = self.condensing_pairs_idx[i_cond] thermo_parameters = self.condensing_species_thermo[i_cond].th_params if self.settings['humidity_distribution_width']<0.: H, qarray, new_t, self.total_cloud_fraction[i_cond] = moist_convective_adjustment_numba(timestep, self.Nlay, new_t, self.atm.play, self.atm.dmass, self.cp, self.tracers.Mgas, self.tracers.qarray, idx_vap, idx_cond, thermo_parameters, moist_inhibition = moist_inhibition, verbose = verbose) else: H, qarray, new_t, self.total_cloud_fraction[i_cond] = moist_convective_adjustment_cloud_fraction_numba(timestep, self.Nlay, new_t, self.atm.play, self.atm.dmass, self.cp, self.tracers.Mgas, self.tracers.qarray, idx_vap, idx_cond, thermo_parameters, moist_inhibition = moist_inhibition, humidity_distribution_width = self.settings['humidity_distribution_width'], verbose = verbose) #print('t after madj:', new_t) H_madj += H self.tracers.qarray = qarray if verbose: print(qarray[idx_cond]) return H_madj
[docs] def turbulent_diffusion(self, timestep, Htot, atm, cp, index_dry_convective_top=None, Kzz_pressure_factor=-1., verbose=False): """Mixes tracers following a diffusion equation with a constant Kzz parameter (self.Kzz in m^2/s). Parameters ---------- timestep: float physical timestep of the current step (in s/cp). (needs to be converted before it is sent to `turbulent diffusion`) Htot: array Total heating rate (in W/kg) of all physical processes already computed atm: :class:`Atm` object The Atm object used in the radiative transfer which contains many state variables. """ new_t = atm.tlay + timestep * Htot if self.settings['moist_inhibition']: Mgas_tmp = self.tracers.Mgas else: Mgas_tmp = np.mean(self.tracers.Mgas) Mgas_tmp = np.full_like(self.tracers.Mgas, Mgas_tmp) if Kzz_pressure_factor > 0.: p0 = self.atm.play[index_dry_convective_top] self.Kzz = self.settings['Kzz'] * np.where(self.atm.play>p0, 1., (self.atm.play/p0)**Kzz_pressure_factor) self.Kzz = np.core.umath.maximum(self.Kzz,self.settings['Kzz_min']) if verbose: print('Kzz=',self.Kzz) else: self.Kzz = np.ones(self.Nlay) * self.settings['Kzz'] H_turb, self.tracers.qarray = turbulent_diffusion_numba(timestep*cp, self.Nlay, atm.play, atm.plev, atm.dmass, new_t, atm.exner, new_t/Mgas_tmp, atm.grav, self.Kzz, self.tracers.qarray, cp, mix_potential_temp=self.settings['mix_potential_temp']) return H_turb
[docs] def molecular_diffusion(self, timestep, Htot, atm, cp): """Mixes energy following a diffusion equation with a constant Dmol parameter (self.Dmol in m^2/s). Parameters ---------- timestep: float physical timestep of the current step (in s/cp). (needs to be converted before it is sent to `turbulent diffusion) Htot: array, np.ndarray Total heating rate (in W/kg) of all physical processes already computed atm: :class:`Atm` object The Atm object used in the radiative transfer which contains many state variables. """ new_t = atm.tlay + timestep * Htot H_diff = molecular_diffusion_numba(timestep*cp, self.Nlay, atm.play, atm.plev, atm.dmass, new_t, self.tracers.Mgas, atm.grav, self.tracers.Dmol) return H_diff
[docs] def condensation(self, timestep, Htot, verbose = False): """This method computes the vapor and temperature tendencies do to large scale condensation in saturated layers. The tracer array in modified in place. Parameters ---------- timestep: float physical timestep of the current step (in s/cp). Htot: array, np.ndarray Total heating rate (in W/kg) of all physical processes already computed Return ------ H_cond: array, np.ndarray Heating rate due to large scale condensation (W/kg) """ new_t = self.atm.tlay + timestep * Htot H_cond = np.zeros(self.Nlay) for i_cond in range(self.Ncond): #careful i_cond is a dumy loop index, idx_cond is position of species i_cond in tracers array. idx_vap, idx_cond = self.condensing_pairs_idx[i_cond] thermo_parameters = self.condensing_species_thermo[i_cond].th_params H_cond += compute_condensation_numba(timestep, self.Nlay, new_t, self.atm.play, self.cp, self.tracers.Mgas, self.tracers.qarray, idx_vap, idx_cond, thermo_parameters, latent_heating = self.settings['latent_heating'], condensation_timestep_reducer = self.settings['condensation_timestep_reducer'], verbose = verbose) return H_cond
[docs] def rainout(self, timestep, Htot, verbose = False): """This method computes rainout. Condensates are carried down and reevaporated whenever there is "room" in an unsaturated layer. The option `evap_coeff` acts has an efficiency factor. `evap_coeff=1` is the efficient evaporation limit. When `evap_coeff<1` the maximum amount of condensates that can be reevaporated in a single layer is multiplied by `evap_coeff` All condensates are finaly evaporated in the last layer or when T > Tboil. The tracer array is modified in place. Parameters ---------- timestep: float physical timestep of the current step (in s/cp). Htot: array, np.ndarray Total heating rate (in W/kg) of all physical processes already computed Return ------ H_rain: array, np.ndarray Heating rate due to re evaporation (W/kg) """ new_t = self.atm.tlay + timestep * Htot H_rain=np.zeros(self.Nlay) for i_cond in range(self.Ncond): idx_vap, idx_cond = self.condensing_pairs_idx[i_cond] thermo_parameters = self.condensing_species_thermo[i_cond].th_params H_rain += compute_rainout_numba(timestep, self.Nlay, new_t, self.atm.play, self.atm.dmass, self.cp, self.tracers.Mgas, self.tracers.qarray, idx_vap, idx_cond, thermo_parameters, self.settings['evap_coeff'], self.tracers.qdeep[idx_vap], q_cloud=self.settings['q_cloud'], latent_heating = self.settings['latent_heating'], total_cloud_fraction = self.total_cloud_fraction[i_cond], humidity_distribution_width = self.settings['humidity_distribution_width'], verbose = verbose) return H_rain
[docs] def compute_hybrid_coordinates(self): """Compute hybrid coordinates as in GCM. This will be used when surface pressure changes. Convention : sigma = (p-ptop)/(psurf-ptop) For each layer/level, the pressure is p = sigma * psurf + gamma """ psurf = self.atm.plev[-1] ptop = self.atm.plev[0] self.sigma_lay = (self.atm.play-ptop)/(psurf-ptop) self.gamma_lay = (1.-self.sigma_lay)*ptop self.sigma_lev = (self.atm.plev-ptop)/(psurf-ptop) self.gamma_lev = (1.-self.sigma_lev)*ptop self.dsigma_lev = np.diff(self.sigma_lev)
[docs] def compute_mass_flux(self, dvapor_mass, sum_dvapor_mass): """Computes the mass flux through the hybrid coordinate interfaces (kg/s/m^2; positive upward). (see Methods in Leconte et al. (2013)) W[k] is the mass flux between layer k-1 et k. Parameters ---------- sum_dvapor_mass: float Total mass of vapor added to the atmosphere in the last timestep. dvapor_mass: array, np.ndarray mass of vapor added to each layer. For the moment, W[0] = W[Nlay] """ W = np.zeros(self.Nlay+1) #W[0] = 0. #No mass flux between the top of the atm and space W[1:] = np.cumsum(self.dsigma_lev * sum_dvapor_mass - dvapor_mass) W[-1] = 0. #No mass flux between the surface and the atm return W
[docs] def mass_redistribution(self, qarray_before_condensation): """Update new mass and new pressure of a layer due to the evaporation and condensation of a given species, for more details see Methods, Leconte et al., 2013 (Nature) Parameters ---------- """ if self.Ncond > 1: raise NotImplementedError('Mass redistribution limited to one condensing species') #Pour juste l'eau for i_cond in range(self.Ncond): idx_vap, idx_cond = self.condensing_pairs_idx[i_cond] dq_mass_redist = np.zeros_like(qarray_before_condensation) variation_qarray = self.tracers.qarray - qarray_before_condensation #Evolution of qarray before and after condensation/rain/moist_convection steps dvapor_mass = self.atm.dmass*variation_qarray[idx_vap] #Need to use idx_vap because dvapor_mass sum_dvapor_mass = np.sum(dvapor_mass) dcond_mass = self.atm.dmass*variation_qarray[idx_cond] self.dpsurf = sum_dvapor_mass*self.atm.grav dgas_mass = self.dsigma_lev*self.dpsurf/self.atm.grav if self.settings['compute_mass_fluxes']: self.W = self.compute_mass_flux(dvapor_mass, sum_dvapor_mass) for indice, q in enumerate(self.tracers.qarray): if indice == idx_vap: #Condensible gas in vapor form epsilon = dvapor_mass elif indice == idx_cond: #Condensible gas in condensed form epsilon = dcond_mass else: #Other tracers epsilon = 0. qarray_lev = np.zeros(self.Nlay+1) qarray_lev[1:-1] = (q[1:] + q[:-1]) / 2 #We choose the arithmetic mean value of the qarray as the value of the qarray at the middle of the levels ie q_(k+1/2) qarray_transport_through_sigma_lev = (qarray_lev[1:]-qarray_before_condensation[indice])*self.W[1:] \ - (qarray_lev[:-1]-qarray_before_condensation[indice])*self.W[:-1] #Attention si W à la surface est non nulnp.diff(self.qarray_lev*self.W) dq_mass_redist[indice] = (1./(self.atm.dmass+dgas_mass))* \ (qarray_transport_through_sigma_lev+epsilon-qarray_before_condensation[indice]*dvapor_mass) self.tracers.qarray[indice] = np.abs(qarray_before_condensation[indice] + dq_mass_redist[indice]) # carefull abs should be removed else: self.W = np.zeros(self.Nlay+1) for indice, q in enumerate(qarray_before_condensation): if indice==idx_vap: #Condensible gas in vapor form epsilon = dvapor_mass elif indice==idx_cond: #Condensible gas in condensed form epsilon = dcond_mass else: #Other tracer epsilon = 0. dq_mass_redist[indice] = (1./(self.atm.dmass+dgas_mass))*(epsilon-q*dgas_mass) self.tracers.qarray[indice] = qarray_before_condensation[indice] + dq_mass_redist[indice] plev = self.sigma_lev*(self.atm.psurf+self.dpsurf)+self.gamma_lev play = self.sigma_lay*(self.atm.psurf+self.dpsurf)+self.gamma_lay self.atm.update_pressure_profile(play = play, plev = plev)
[docs] def radiative_acceleration(self, timestep = 0., acceleration_mode = 0, acceleration_top_pressure = None, verbose = False, **kwargs): """"Computes an acceleration factor and a new heating rate to speed up convergence Parameters ---------- acceleration_mode: int 0: no acceleration 1 or 3: acceleration limited to radiative zones. 2 or 4: acceleration in convective zones as well. * 1: acceleration limited to radiative zones. The largest radiative timescale in non-radiative zones is used as reference radiative timsescale to compute acceleration. * 2: Same as mode 1 + acceleration in convective zones * 3 acceleration limited to radiative zones. The smallest radiative timescale in radiative zones is used as reference radiative timsescale to compute acceleration. * 4: Same as mode 3 + acceleration in convective zones """ self.acceleration_factor = np.ones_like(self.H_tot) rad_layers = np.isclose(self.H_tot, self.H_rad, atol=0.e0, rtol=1.e-10) rad_layers_for_convection = np.copy(rad_layers) if acceleration_top_pressure is not None: rad_layers[np.where(self.atm.play < acceleration_top_pressure)[0]] = False rad_layers_for_convection[np.where(self.atm.play < acceleration_top_pressure)[0]] = True # determines which layer is purely radiative if verbose: print('in acc rad_layers, H_tot, H_rad') print(rad_layers) print(np.transpose([rad_layers, self.H_tot,self.H_rad])) if (np.all(rad_layers)) or (not(np.any(rad_layers))): self.base_timescale = self.atm.tau_rad # we do not use timestep to avoid including the timestep_factor else: if acceleration_mode <= 2: self.base_timescale = np.amax(self.atm.tau_rads[np.logical_not(rad_layers)]) elif acceleration_mode <= 4: self.base_timescale = np.amin(self.atm.tau_rads[rad_layers]) else: raise NotImplementedError('Only acceleration_mode <= 4 is supported for the moment') self.acceleration_factor[rad_layers] = np.core.umath.maximum( self.atm.tau_rads[rad_layers] \ / self.base_timescale * self.settings['radiative_acceleration_factor'], 1.) if (acceleration_mode == 2) or (acceleration_mode == 4): self.H_acc = convective_acceleration_numba(timestep, self.Nlay, self.H_rad, rad_layers_for_convection, self.atm.tau_rad, self.atm.tau_rads, self.atm.dmass, convective_acceleration_mode = self.settings['convective_acceleration_mode'], convective_acceleration_factor = self.settings['convective_acceleration_factor'], verbose = verbose) else: self.H_acc = np.zeros(self.Nlay) if verbose: print('in acc acceleration_factor, H_tot, H_acc') print(np.transpose([self.acceleration_factor, self.H_tot, self.H_acc])) self.H_tot = self.H_tot * self.acceleration_factor + self.H_acc
@property def time(self): """Yields current time in seconds """ return self.evol_tau * self.cp @property def time_hist(self): """Yields the array of the times for the last call to evolve (in seconds) """ return np.cumsum(self.timestep_hist) * self.cp
[docs] def heating_rate(self, physical_process): """Returns heating rates in W/kg per layer averaged over last call to evolve. Possible physical_processes are rad, cond, conv, rain, madj, tot""" return self.H_ave[self.header[physical_process]]
[docs] def net_flux(self, physical_process): """Returns net_flux in W/m^2 averaged over last call to evolve. Possible physical_processes are rad, cond, conv, rain, madj, tot""" return self.Fnet[self.header[physical_process]]
[docs] def qsat(self, mol): """Returns the saturation specific concentration of molecule mol (kg/kg)""" cond_species_param = self.condensing_species_params[self.condensing_species_idx[mol]] psat = cond_species_param.Psat(self.atm.tlay) qsat = cond_species_param.qsat(psat, self.atm.play, cond_species_param.Mvap/self.tracers.Mgas) return qsat
[docs] def interpolate_profile(self, logplay, adiabatic_extrapolation=True, **kwargs): """Re interpolates the current atmosphere on a new log pressure grid. Extrapolation is isothermal at the top and can be adiabatic at the bottom Parameters ---------- logplay: array New log pressure grid adiabatic_extrapolation: bool Whether or not to extrapolate using the adiabat below the bottom """ self.tracers.interpolate_tracers_profile(logplay, self.atm.logplay) self.atm.interpolate_atm_profile(logplay, adiabatic_extrapolation=adiabatic_extrapolation, Mgas=self.tracers.Mgas,gas_vmr=self.tracers.gas_vmr, **kwargs) self.tlay = self.atm.tlay self.Nlay = self.atm.Nlay self.settings.set_parameters(logplay=logplay, tlay=self.tlay)
[docs] def write_pickle(self, filename, data_reduction_level = 1): """Saves the instance in a pickle file Parameters ---------- filename: str Path to pickle file data_reduction_level: int Level of data to delete. 0: keep everything (results in big files). 1: removes some arrays, should not affect subsequent evolution. 2: removes the k and cia databases. The radiative model will need to be reset. This can be done with `set_options(k_database=, cia_database=, reset_rad_model=True)` after re-loading the `Atm_evolution` instance. """ other = copy.deepcopy(self) if data_reduction_level >=1 : other.tlay_hist = None other.atm.asym_param = None other.atm.kdata = None other.atm.tau = None other.atm.dtau = None other.atm.flux_down_nu = None other.atm.flux_net_nu = None other.atm.flux_up_nu = None other.atm.piBatm = None other.atm.single_scat_albedo = None other.atm.gas_mix.kdata_scat = None if data_reduction_level >=2 : other.settings['k_database'] = None other.settings['cia_database'] = None other.atm.k_database = None other.atm.gas_mix.k_database = None other.atm.gas_mix.cia_database = None other.atm.kernel = None other.atm.tlay_kernel = None other.atm.H_kernel = None with open(filename, 'wb') as filehandler: pickle.dump(other, filehandler)