Source code for oap_model.psd

# Particle size distribution model
# Author: Oliver Driver
# Date: 01/06/2023

from collections import namedtuple
from random import choices
from abc import ABC, abstractmethod
from enum import Enum
import logging

import numpy as np
from numpy.random import randint
from numpy.typing import ArrayLike

from .ast_model import ASTModel
from .diameters import measure_diameters


Particle = namedtuple("Particle", ["diameter", "angle", "model"])
PositionedParticle = namedtuple("PositionedParticle", ["diameter",  "angle", "model", "position"])

[docs]class CrystalModel(Enum): """Enum for crystal types.""" SPHERE = 1 RECT_AR5 = 2 ROS_6 = 3 COL_AR5_ROT = 4 RECT_AR5_ROT = 5 def __init__(self, *args) -> None: self.model_names = { 1: "Sphere", 2: "Rectangular with aspect ratio 5", 3: "Rosette with 6 arms, 100µm width", 4: "Rectangular with aspect ratio 5, rotated in 3D", 5: "Rectangular with aspect ratio 5, rotated in plane" } super().__init__(*args) def __str__(self): return self.model_names[self.value]
[docs] def get_generator(self): if self == CrystalModel.SPHERE: return lambda particle, **kwargs: ASTModel.from_diameter(particle.diameter*1e6, **kwargs) elif self == CrystalModel.RECT_AR5: return lambda particle, **kwargs: ASTModel.from_diameter_rectangular(particle.diameter*1e6, 5, **kwargs) elif self == CrystalModel.RECT_AR5_ROT: return lambda particle, **kwargs: ASTModel.from_diameter_rectangular(particle.diameter*1e6, 5, angle=particle.angle[0], **kwargs) elif self == CrystalModel.COL_AR5_ROT: return lambda particle, **kwargs: ASTModel.from_diameter_rectangular(particle.diameter*1e6, 5, angle=particle.angle, **kwargs) elif self == CrystalModel.ROS_6: return lambda particle, **kwargs: ASTModel.from_diameter_rosette(particle.diameter*1e6, 3, width=40, **kwargs) else: raise ValueError("Crystal model not recognised.")
[docs] def min_diameter(self, pixel_size): model_generator = self.get_generator() diameter = 0 while True: model = model_generator(Particle(diameter,(0,0),self), pixel_size=pixel_size) if model.opaque_shape.any(): break diameter += pixel_size/10 return diameter
[docs]def rejection_sampler(p, xbounds, pmax): """Returns a value sampled from a bounded probability distribution. Args: p (callable): The probability distribution function. xbounds (list): The bounds of the distribution. pmax (float): The maximum value of the probability distribution.""" while True: # Generate a random x and y value x = np.random.rand(1) * (xbounds[1] - xbounds[0]) + xbounds[0] y = np.random.rand(1) * pmax # If the y value is below the probability distribution, return the x value if y <= p(x): return x
[docs]class PSD(ABC): """Base class for particle size distribution objects.""" def __init__(self, bins: list[float] = None, model: CrystalModel = CrystalModel.SPHERE): if bins is None and getattr(self,"xlim", None) is not None: lower_lim = 5e-7 if self.xlim[0] == 0 else self.xlim[0] bins = np.arange(lower_lim, self.xlim[1], 5e-7) elif bins is None: bins = np.arange(5e-7, 1e-3, 5e-7) self.bins = bins self.model = model
[docs] @abstractmethod def dn_dd(self, d: ArrayLike) -> np.ndarray: """Calculate the particle size distribution value given diameters. """ pass
[docs] def moment(self, n): """Calculate the nth moment of the PSD.""" return np.trapz(self.dn_dd(self.midpoints) * (self.midpoints ** n), self.midpoints)
@property def binned_distribution(self): """Calculate the binned particle size distribution. Returns: np.ndarray: The number of particles in each bin. """ return self.dn_dd(self.midpoints) * (np.diff(self.bins)) @property def midpoints(self): """Calculate the binned particle size distribution. Returns: np.ndarray: The number of particles in each bin. """ midpoints = (self.bins[1:] + self.bins[:-1]) / 2 return midpoints @property def max_dn_dd(self): """Calculate the maximum value of the particle size distribution.""" return self.dn_dd(self.midpoints).max()
[docs] def generate_diameters(self, n_particles) -> tuple[list[float], list[ASTModel]]: """Generate a particle diameter from the PSD.""" diameters = choices( self.midpoints, weights=self.binned_distribution, k=n_particles ) return diameters, [self.model] * n_particles
@property def total_number_density(self) -> float: """Calculate the total number density of particles.""" # Uses the analytical expression for the integral of the gamma distribution # return self.intercept * self.slope ** (-self.shape - 1) * np.math.gamma(self.shape + 1) return self.binned_distribution.sum()
[docs] def plot(self, ax, retrieval:"Retrieval"=None, **kwargs): """Plot the PSD value against diameter.""" if retrieval is None: x_vals = self.bins else: x_vals = self.adjusted_bins(retrieval)# FIXME: this does not change the dn/dD values to account for the change in bin width... handle = ax.plot(x_vals, self.dn_dd(self.bins), **kwargs) # ax.set_xscale('log') # ax.set_yscale('log') ax.set_xlabel('Diameter (m)') ax.set_ylabel('PSD ($\mathrm{m}^{-3}\,\mathrm{m}^{-1}$)') return handle
[docs] def adjusted_bins(self, retrieval): # for each bin edge, calculate the focused diameter adjusted_bins = [] for diameter in self.bins: particle = Particle(diameter, (0,0), self.model) model = self.model.get_generator()(particle, wavenumber=2*np.pi/retrieval.run.detector.wavelength, pixel_size=retrieval.run.detector.pixel_size) focused_amp = model.process(0) diameter = measure_diameters(focused_amp, retrieval.spec, force_nominsep=True) if len(diameter) > 1: raise ValueError("Multiple diameters found in focused image... Something is very wrong.") elif len(diameter) == 0: adjusted_bins.append(0) else: adjusted_bins.append(next(iter(diameter.values()))) return np.array(adjusted_bins) * 1e-6
[docs]class CompositePSD(PSD): """A composite particle size distribution object. Contains the distribution parameters, and binning information. Args: psds (list[PSD]): The list of PSDs to combine. bins (np.ndarray, optional): The bin edges in metres. Defaults to 100 bins between :math:`1\times10^{-7}` and :math:`1\times10^{-7}`. """ def __init__(self, psds: list[PSD], bins: list[float] = None): self.psds = psds self.psd_weights = [psd.total_number_density for psd in psds] self.xlim = (min([psd.xlim[0] for psd in psds]), max([psd.xlim[1] for psd in psds])) super().__init__(bins=bins)
[docs] def dn_dd(self, d: ArrayLike) -> np.ndarray: """Calculate the particle size distribution value given diameters. """ return sum([psd.dn_dd(d) for psd in self.psds])
[docs] def generate_diameters(self, n_particles) -> tuple[list[float], list[ASTModel]]: """Generate a particle diameter from the PSD.""" n_from_psd = [int(n_particles * weight / self.total_number_density) for weight in self.psd_weights] #check sum if sum(n_from_psd) != n_particles: n_from_psd[randint(0,len(self.psds))] += n_particles - sum(n_from_psd) diameters = np.array([]) models = [] for psd, n in zip(self.psds, n_from_psd): psd_diameters, psd_models = psd.generate_diameters(n) diameters = np.append(diameters, psd_diameters) models += psd_models return diameters, models
[docs]class GammaPSD(PSD): r"""Gamma particle size distribution object. Contains the distribution parameters, and binning information. .. math:: n_N(r) = N_0 \left(\frac{r}{\text{1 m}}\right)^\mu \mathrm{e}^{-\Lambda r}. Args: intercept (float): :math:`N_0` in :math:`\mathrm{m^{-3}}`. slope (float): :math:`\Lambda` in :math:`\mathrm{m^{-1}}`. shape (float): :math:`\mu`. bins (np.ndarray, optional): The bin edges in metres. Defaults to 100 bins between :math:`1\times10^{-7}` and :math:`1\times10^{-7}`. """ def __init__(self, intercept: float, slope: float, shape: float, bins: list[float] = None, model: CrystalModel = CrystalModel.SPHERE): self.intercept = intercept self.slope = slope self.shape = shape self.xlim = self.mean + ( 10 * np.sqrt(self.variance) * np.array([-1,1]) ) self.xlim[0] = max(self.xlim[0], 0) self.xlim[1] = max(self.xlim[1], 5e-4) super().__init__(bins, model) def __str__(self) -> str: return super().__str__() + f"GammaPSD({self.intercept}, {self.slope}, {self.shape})"
[docs] @classmethod def from_litres_microns(cls, intercept_per_litre_per_micron, slope_per_micron, shape, **kwargs): # Note the extra factor of 1e-6^shape for conversion of power law. intercept = intercept_per_litre_per_micron * 1e3 * 1e6 * (1e6**shape) slope = slope_per_micron * 1e6 return cls(intercept, slope, shape, **kwargs)
[docs] @classmethod def from_litres_cm(cls, intercept_per_litre_per_cm, slope_per_cm, shape, **kwargs): # Note the extra factor of 1e-2^shape for conversion of power law. intercept = intercept_per_litre_per_cm * 1e3 * (1e2**(1+shape)) slope = slope_per_cm * 1e2 return cls(intercept, slope, shape, **kwargs)
[docs] @classmethod def w19_parameterisation(cls, temp, intercept=None, total_number_density=None, insitu_origin=False, liquid_origin=False): """Using Wolf et al. 2019 parameterisation. Returns: float: The number density per field. """ if intercept is None and total_number_density is None: raise ValueError("Must specify either intercept or total_number_concentration.") if intercept and total_number_density: raise ValueError("Cannot specify both intercept and total_number_concentration.") if not (insitu_origin or liquid_origin): raise ValueError("Must specify either insitu_origin or liquid_origin.") if insitu_origin and liquid_origin: raise ValueError("Cannot specify both insitu_origin and liquid_origin.") if insitu_origin: ln_slope_param = lambda temp: -0.06837 * temp + 3.492 #cm^-1 shape_param = lambda ln_slope: 0.009*np.exp(ln_slope*0.85) # Muhlbauer 2014 # shape_param = lambda ln_slope: 0.02819 * np.exp(0.7216*ln_slope) elif liquid_origin: ln_slope_param = lambda temp: 4.937 * np.exp(-0.001846*temp) #cm^-1 shape_param = lambda ln_slope: 0.104*np.exp(ln_slope*0.71) - 1.7 # shape_param = lambda ln_slope: 0.001379 * np.exp(1.285*ln_slope) ln_slope = ln_slope_param(temp) shape = shape_param(ln_slope) if total_number_density: # in per litre intercept = total_number_density * (np.exp(ln_slope) ** (shape + 1)) / np.math.gamma(shape + 1) return cls.from_litres_cm(intercept, np.exp(ln_slope), shape)
[docs] def parameter_description(self) -> str: return f"$N_0={self.intercept:.2e}$, $\lambda={self.slope:.2e}$, $\mu={self.shape:.2f}$"
[docs] @classmethod def from_concentration(cls, number_concentration, slope, shape, **kwargs): """Calculate the number density per field. Args: number_concentration (float): The number concentration in :math:`\mathrm{m^{-3}}`. slope (float): :math:`\Lambda` in :math:`\mathrm{m^{-1}}`. shape (float): :math:`\mu`. Returns: float: The number density per field. """ intercept = number_concentration * (slope ** (shape + 1)) / np.math.gamma(shape + 1) return cls(intercept, slope, shape, **kwargs)
@staticmethod def _dn_gamma_dd(d:ArrayLike, intercept:float, slope:float, shape:float): """The gamma distribution probability distribution function. Args: d (ArrayLike): The diameters in metres. intercept (float): :math:`N_0` in :math:`\mathrm{m^{-4}}`. slope (float): :math:`\Lambda` in :math:`\mathrm{m^{-1}}`. shape (float): :math:`\mu`. Returns: callable: The gamma distribution probability distribution function. """ # "shape" expects diameter values in cm in the O'Shea formulation return intercept * (d)**shape * np.exp(-1 * slope * d)
[docs] def dn_dd(self, d: ArrayLike) -> np.ndarray: """Calculate the particle size distribution value given diameters. """ # "shape" expects diameter values in cm in the O'Shea formulation - hence the 1e-2 fudge factor return self._dn_gamma_dd(d, self.intercept, self.slope, self.shape)
@property def mean(self): """Calculate the mean particle diameter.""" return (self.shape-1) / self.slope @property def variance(self): """Calculate the variance of the particle diameter.""" return np.abs((self.shape-1) / self.slope**2)
[docs] @classmethod def from_mean_variance(cls, number_concentration, mean, variance, **kwargs): """Create a GammaPSD object from the mean and variance of the particle diameter.""" slope = mean / variance shape = (mean * slope) + 1 return cls.from_concentration(number_concentration, slope, shape, **kwargs)
[docs] @classmethod def fit(cls, diameters, dn_dd, min_considered_diameter=50e-6): from scipy.optimize import curve_fit diameter_vals = diameters[(diameters >= min_considered_diameter)] dn_dd_vals = dn_dd[(diameters >= min_considered_diameter)] # expon = lambda d, intercept, slope: intercept * np.exp(-1 * slope * d) # log_gamma = lambda d, intercept, slope, shape: np.log10(GammaPSD._dn_gamma_dd(d, intercept, slope, shape)) fit_gamma = lambda d, intercept, slope, shape: GammaPSD._dn_gamma_dd(d, intercept, slope, shape) results = curve_fit(GammaPSD._dn_gamma_dd, diameter_vals, dn_dd_vals / 1e6, p0=[1, 1, 1], maxfev=10000, bounds=([0, 0, 1], [np.inf, np.inf, np.inf]), # method='dogbox', full_output=True ) intercept_l_mcb, slope, shape = results[0] intercept = intercept_l_mcb * 1e6 logging.info(f"\t{results[3]}") return cls(intercept, slope, shape)
[docs]class OSheaGammaPSD(GammaPSD):
[docs] def dn_dd(self, d: ArrayLike) -> np.ndarray: """Calculate the particle size distribution value given diameters. """ # "shape" expects diameter values in cm in the O'Shea formulation - hence the 1e-2 fudge factor return self._dn_gamma_dd(d, self.intercept / (1e-2**self.shape), self.slope, self.shape)
[docs]class TwoMomentGammaPSD(PSD): def __init__(self, m2, m3, bins: list[float] = None, model: CrystalModel = CrystalModel.SPHERE): self.m2 = m2 self.m3 = m3 super().__init__(bins, model)
[docs] @classmethod def from_m2_tc(cls, m2, tc, **kwargs): """Create a TwoMomentGammaPSD object from the second moment and in-cloud temperature. """ a_n = lambda n: np.exp(13.6 - 7.76 * n + 0.479 * n**2) b_n = lambda n: -0.0361 + 0.0151 * n + 0.00149 * n**2 c_n = lambda n: 0.807 + 0.00581 * n - 0.0457 * n**2 m3 = a_n(3) * np.exp(b_n(3) * tc) * m2**c_n(3) return cls(m2, m3, **kwargs)
[docs] def dn_dd(self, d: ArrayLike) -> np.ndarray: """Calculate the particle size distribution value given diameters. """ x = d * self.m2 / self.m3 # now neatly dimensionless # TODO: Currently only the mid-latitude parameters phi_23 = GammaPSD._dn_gamma_dd(x, 102, 4.82, 2.07) return phi_23 / (self.m3**3 / self.m2**4)
@property def characteristic_diameter(self): return self.m3 / self.m2
[docs]class SamplingModel: # NOTE: Not really used anymore; deprecated in favour of CloudModel, DetectorRun, and Retrieval """Particle size distribution model. A modelling class that contains a PSD object, and simulates particle measurement from it. Particles are assumed to be distributed uniformly across the inlet. Particle size is assumed to be independent of position along the inlet. Args: psd (PSD): The particle size distribution function, dN/dr in :math:`\mathrm{m^{-3}}`. z_dist (callable, optional, unimplimented): The probability distribution of particles along the z-axis, normalised to 1 when integrated wrt z, in m^-1. Defaults to uniform across array. inlet_length (float, optional): The length of the inlet in metres. """ def __init__(self, psd: GammaPSD, z_dist: callable = None, inlet_length: float = 7.35e-3): self.ast_models = {} self.psd = psd if z_dist is not None: # not implemented # TODO: work out xbounds and pmax self.z_dist = z_dist self.zbounds = [-1, 1] self.z_pmax = 1 else: self.z_pmax = 1 / inlet_length self.zbounds = [-inlet_length / 2, inlet_length / 2] self.z_dist = lambda z: self.z_pmax if abs( z) < self.zbounds[1] else 0
[docs] def generate(self, n_particles: int) -> np.ndarray: """Generate a sample of particles from the PSD, at random z positions. Args: n_particles (int): The number of particles to generate. Returns: np.ndarray: The particles represented by tuples of (diameter, z). """ # generate particle size and z positions particles = np.zeros((n_particles, 2)) for i in range(n_particles): particles[i, 0] = choices( self.psd.bins[1:], weights=self.psd.binned_distribution )[0] particles[i, 1] = rejection_sampler( self.z_dist, self.zbounds, self.z_pmax) return particles
[docs] def simulate_distribution(self, n_particles: int, single_particle: bool = False, keep_models=False) -> np.ndarray: """Simulate the observation of spherical particles. Generates a sample of spherical particles and then simulates the observation of the particles by producing new AST models. Args: n_particles (int): The number of particles to generate. single_particle (bool, optional): Whether to consider only the largest particle in each diffraction pattern. Defaults to False. keep_models (bool, optional): Whether to keep the AST models after simulation. Warning: this will use a lot of memory. Defaults to False. Returns: np.ndarray: The measured particle diameters. """ particles = self.generate(n_particles) diameters_measured = [] for i in range(particles.shape[0]): diameter = particles[i, 0] z_value = particles[i, 1] if diameter not in self.ast_models: self.ast_models[diameter] = ASTModel.from_diameter( diameter / 1e-6) intensity = self.ast_models[diameter].process(z_val=z_value).intensity if single_particle: diameters = [intensity.measure_xy_diameter().tolist()] else: diameters = intensity.measure_xy_diameters() diameters_measured += diameters if not keep_models: del self.ast_models[diameter] return np.array(diameters_measured)
[docs] def simulate_distribution_from_scaling( self, n_particles: int, single_particle: bool = False, base_model: ASTModel = None, keep_models=False ) -> np.ndarray: """Simulate the observation of similar particles. Generates a sample of particles, produces new AST models by scaling a base model, and then simulates the observation of the particles. Args: n_particles (int): The number of particles to generate. single_particle (bool, optional): Whether to consider only the largest particle in each diffraction pattern. Defaults to False. base_model (ASTModel, optional): The base model to scale. Should have a high resolution relative to typical particle sizes. Defaults to a 1000 nm diameter particle. keep_models (bool, optional): Whether to keep the AST models after simulation. Warning: this will use a lot of memory. Defaults to False. Returns: np.ndarray: The measured particle diameters. """ if base_model is None: base_model = ASTModel.from_diameter(1000) base_diameter, _ = base_model.process(0).intensity.measure_xy_diameter() particles = self.generate(n_particles) diameters_measured = {} for i in range(particles.shape[0]): diameter = particles[i, 0] z_value = particles[i, 1] if diameter not in self.ast_models: self.ast_models[diameter] = base_model.rescale( (diameter / 1e-6)/base_diameter) self.ast_models[diameter].regrid() intensity = self.ast_models[diameter].process(z_val=z_value).intensity if single_particle: diameters = [intensity.measure_xy_diameter().tolist()] else: diameters = intensity.measure_xy_diameters() diameters_measured[(diameter, z_value)] = diameters if not keep_models: del self.ast_models[diameter] # summing with an empty list flattens the list of lists return np.array(sum(diameters_measured.values(), []))
# def fit_gamma_distribution(diameters, bins): # NOTE: Not really used anymore # """Fit a gamma distribution to a set of diameters. # Args: # diameters (np.ndarray): The diameters to fit. # bins (np.ndarray): The bins to fit over. # Returns: # np.ndarray: The fitted gamma distribution. # """ # counts, _ = np.histogram(diameters, bins=bins) # dN_dr = counts / (bins[1:] - bins[:-1]) # TODO: Need to divide by sample volume. # bins = bins[:-1] # bins = bins[counts > 0] # dN_dr = dN_dr[counts > 0] # popt, pcov = curve_fit( # lambda d, intercept, slope: GammaPSD._dn_gamma_dd(d, intercept, slope, 2.5), # bins, dN_dr,p0=[1e10, 1e4]) # return popt, pcov