Source code for synpivimage.particles

import enum
import logging
import pathlib
from copy import deepcopy
from dataclasses import dataclass
from typing import Tuple
from typing import Union, Dict, Optional, List

import numpy as np
import scipy

from .component import Component

SQRT2 = np.sqrt(2)
PARTICLE_INFLUENCE_FACTOR = 6
logger = logging.getLogger('synpivimage')


class ParticleFlag(enum.Enum):
    """Particle status flags."""
    INACTIVE = 0
    # ACTIVE = 1  # ILLUMINATED (in laser sheet) and in FOV
    ILLUMINATED = 1  # ILLUMINATED (in laser sheet) and in FOV
    IN_FOV = 2  # not captured by the sensor because it is out of the field of view
    OUT_OF_PLANE = 4  # particle not in laser sheet in z-direction should be same as weakly illuminated
    DISABLED = 8
    ACTIVE = 2 + 1  # ILLUMINATED (in laser sheet) and in FOV
    # OUT_OF_PLANE = 4
    # EXITED_FOV = 8  # in x or y direction due to displacement
    # IN_FOV = 16  # not captured by the sensor because it is out of the field of view


@dataclass
class ParticleDisplacement:
    x: np.ndarray
    y: np.ndarray
    z: np.ndarray
    size: np.ndarray
    intensity: np.ndarray
    flagA: np.ndarray
    flagB: np.ndarray

    def __repr__(self):
        return f'ParticleDisplacement()'


[docs]class Particles(Component): """Particle class Contains position, size and flag information: - pixel (!) position: (x,y,z) within the light sheet. Mid of light sheet is z=0. - size: (real)) particle size in arbitrary units. - flag: Indicating status of a particle (active, out of plane, ...) """
[docs] def __init__(self, *, x: Union[float, List[float], np.ndarray], y: Union[float, List[float], np.ndarray], z: Union[float, List[float], np.ndarray], size: Union[float, List[float], np.ndarray], source_intensity: Optional[Union[float, List[float], np.ndarray]] = None, max_image_photons: Optional[Union[float, List[float], np.ndarray]] = None, image_electrons: Optional[Union[float, List[float], np.ndarray]] = None, image_quantized_electrons: Optional[Union[float, List[float], np.ndarray]] = None, flag: Union[int, List[int], np.ndarray] = None): """ Parameters ---------- x : np.ndarray x-coordinate of the particles on the sensor in pixels y : np.ndarray y-coordinate of the particles on the sensor in pixels z : np.ndarray z-coordinate of the particles on the sensor in arbitrary units size : np.ndarray Particle size in pixels source_intensity : np.ndarray Source intensity of the particles, which is the intensity it emits as a point source. The peak intensity on the sensor may be higher. see property `irrad_photons` max_image_photons : np.ndarray Maximum number of photons on the sensor image_electrons : np.ndarray Number of electrons on the sensor image_quantized_electrons : np.ndarray Number of quantized electrons on the sensor """ def _parse_array(_arr, _n: int, dtype=None): if _arr is None: _arr = np.zeros(shape=(_n,), dtype=dtype) if isinstance(_arr, (tuple, list)): _arr = np.asarray(_arr) elif isinstance(_arr, (float, int)): _arr = np.array([_arr, ]) else: if not isinstance(_arr, np.ndarray): raise TypeError(f"Expected array, got {type(_arr)}") if not _arr.ndim == 1: raise ValueError(f"Expected 1D array, got {_arr.ndim}D") if _n is None: return _arr if _arr.size != _n: raise ValueError(f"Expected array of size {_n}, got {_arr.size}") return _arr if x is None: raise ValueError("x cannot be None") self._x = _parse_array(x, None) N = self._x.size self._y = _parse_array(y, N) self._z = _parse_array(z, N) self._size = _parse_array(size, N) self._source_intensity = _parse_array(source_intensity, N) self._max_image_photons = _parse_array(max_image_photons, N) self._image_electrons = _parse_array(image_electrons, N) self._image_quantized_electrons = _parse_array(image_quantized_electrons, N) self._flag = _parse_array(flag, N, dtype=int) self._xlim = None self._ylim = None self._zlim = None
@property def x(self): """x-coordinate of the particles on the sensor in pixels""" return self._x @property def y(self): """y-coordinate of the particles on the sensor in pixels""" return self._y @property def z(self): """z-coordinate of the particles on the sensor in arbitrary units""" return self._z @property def size(self): """Particle size in pixels""" return self._size @property def source_intensity(self): """Source intensity of the particles, which is the intensity it emits as a point source. The peak intensity on the sensor may be higher. see property `irrad_photons`""" return self._source_intensity @source_intensity.setter def source_intensity(self, value): assert value.ndim == 1, f"Expected 1D array, got {value.ndim}D" assert value.size == self.n_particles, f"Expected array of size {self.n_particles}, got {value.size}" self._source_intensity = value @property def max_image_photons(self): """Maximum number of photons on the sensor""" return self._max_image_photons @max_image_photons.setter def max_image_photons(self, value): assert value.ndim == 1, f"Expected 1D array, got {value.ndim}D" assert value.size == self.n_particles, f"Expected array of size {self.n_particles}, got {value.size}" self.max_image_photons = value @property def image_electrons(self): """Number of electrons on the sensor""" return self._image_electrons @image_electrons.setter def image_electrons(self, value): assert value.ndim == 1, f"Expected 1D array, got {value.ndim}D" assert value.size == self.n_particles, f"Expected array of size {self.n_particles}, got {value.size}" self.image_electrons = value @property def image_quantized_electrons(self): """Number of quantized electrons on the sensor""" return self._image_quantized_electrons @image_quantized_electrons.setter def image_quantized_electrons(self, value): assert value.ndim == 1, f"Expected 1D array, got {value.ndim}D" assert value.size == self.n_particles, f"Expected array of size {self.n_particles}, got {value.size}" self.image_quantized_electrons = value @property def flag(self): """Indicating status of a particle (active, out of plane, ...)""" return self._flag @property def irrad_photons(self): """The number of photons irradiated by the particles on the sensor""" return self._source_intensity @irrad_photons.setter def irrad_photons(self, value): assert value.ndim == 1, f"Expected 1D array, got {value.ndim}D" assert value.size == self.n_particles, f"Expected array of size {self.n_particles}, got {value.size}" self.irrad_photons = value @property def n_particles(self): """Return the number of particles. Equal to `len(self)`""" return self.x.size def reset(self): """Sets all intensities to zero and flags to zero""" self._source_intensity = np.zeros_like(self.x) self._max_image_photons = np.zeros_like(self.x) self._image_electrons = np.zeros_like(self.x) self._image_quantized_electrons = np.zeros_like(self.x) self._flag = np.zeros_like(self.x, dtype=int) @classmethod def generate( cls, ppp: float, dx_max: float, dy_max: float, dz_max: float, size: float, camera: "Camera", laser: "Laser" ) -> "Particles": """Generate particles based on a certain ppp (particles per pixel). With dx, dy, dz the maximum displacement of the particles can be set. The camera and laser are used to determine the sensor size and the laser width. Parameters ---------- ppp : float Particles per pixel dx_max : float Maximum displacement in x-direction dy_max : float Maximum displacement in y-direction dz_max : float Maximum displacement in z-direction size : float Particle size camera : Camera Camera model laser : Laser Laser model """ from .utils import generate_particles return generate_particles(ppp=ppp, dx_max=dx_max, dy_max=dy_max, dz_max=dz_max, size=size, camera=camera, laser=laser) def get_ppp(self, camera_size: int) -> float: """Return the particles per pixel""" return self.active.sum() / camera_size def regenerate(self) -> "Particles": """Regenerate the particles of this object. The locations (x, y, z) of the particles will change and the intensities will be reset .. note:: Does NOT create a new object! The returned object is the same """ self.reset() N = len(self.x) if self._xlim is None: self._x = np.random.uniform(min(self.x), max(self.x), N) else: self._x = np.random.uniform(*self._xlim, N) if self._ylim is None: self._y = np.random.uniform(min(self.y), max(self.y), N) else: self._y = np.random.uniform(*self._ylim, N) if self._ylim is None: self._z = np.random.uniform(min(self.z), max(self.z), N) else: self._z = np.random.uniform(*self._zlim, N) return self def __len__(self): return self.x.size def dict(self) -> Dict: """Returns a dictionary representation of the particle data""" return {'x': self.x, 'y': self.y, 'z': self.z, 'size': self.size, 'flag': self.flag, 'source_intensity': self.source_intensity, 'max_image_photons': self.max_image_photons, 'image_electrons': self.image_electrons, 'image_quantized_electrons': self.image_quantized_electrons} def __getitem__(self, item): data: Dict = self.dict() x = data['x'][item] if x.ndim == 0: return Particles(**{k: [v[item], ] for k, v in data.items()}) return Particles(**{k: v[item] for k, v in data.items()}) def __iter__(self): for i in range(len(self)): yield self[i] def model_dump(self) -> Dict: """Returns a dictionary representation of the particle data where list instead of numpy arrays are used. This is useful for dumping to JSON""" return {'x': self.x.tolist(), 'y': self.y.tolist(), 'z': self.z.tolist(), 'size': self.size.tolist(), 'source_intensity': self.source_intensity.tolist(), 'max_image_photons': self.max_image_photons.tolist(), 'image_electrons': self.image_electrons.tolist(), 'image_quantized_electrons': self.image_quantized_electrons.tolist(), 'flag': self.flag.tolist()} def save_jsonld(self, filename: Union[str, pathlib.Path]): from pivmetalib import pivmeta from pivmetalib import m4i from .codemeta import get_package_meta filename = pathlib.Path(filename) # .with_suffix('.jsonld') source_intensity = 0. if np.all(self.source_intensity == 0) else self.source_intensity max_image_photons = 0. if np.all(self.max_image_photons == 0) else self.max_image_photons image_electrons = 0. if np.all(self.image_electrons == 0) else self.image_electrons image_quantized_electrons = 0. if np.all( self.image_quantized_electrons == 0) else self.image_quantized_electrons hasParameter = [ m4i.variable.NumericalVariable( label='x', hasNumericalValue=self.x.astype("float16").tolist() ), m4i.variable.NumericalVariable( label='y', hasNumericalValue=self.y.astype("float16").tolist() ), m4i.variable.NumericalVariable( label='z', hasNumericalValue=self.z.astype("float16").tolist() ), m4i.variable.NumericalVariable( label='size', hasNumericalValue=self.size.astype("float16").tolist() ), m4i.variable.NumericalVariable( label='flag', hasNumericalValue=self.flag.astype("uint8").tolist() ), m4i.variable.NumericalVariable( label='source_intensity', hasNumericalValue=np.asarray(source_intensity, dtype="uint16").tolist() ), m4i.variable.NumericalVariable( label='max_image_photons', hasNumericalValue=np.asarray(max_image_photons, dtype="uint16").tolist() ), m4i.variable.NumericalVariable( label='image_electrons', hasNumericalValue=np.asarray(image_electrons, dtype="uint16").tolist() ), m4i.variable.NumericalVariable( label='image_quantized_electrons', hasNumericalValue=np.asarray(image_quantized_electrons, dtype="uint16").tolist() ), ] particles = pivmeta.SyntheticParticle( hasSourceCode=get_package_meta(), hasParameter=hasParameter ) with open(filename, 'w') as f: f.write( particles.model_dump_jsonld( # context={ # "@import": 'https://raw.githubusercontent.com/matthiasprobst/pivmeta/main/pivmeta_context.jsonld', # } ) ) return filename def displace(self, dx=None, dy=None, dz=None): """Displace the particles. Can only be done if particles are not inactive. Raises ------ ValueError If particles are inactive, which means that the particles have not been illuminated yet, hence they cannot be displaced. Call `synpivimage.take_image` first """ if self.inactive.sum() > 0: raise ValueError("Cannot displace particles if they have been illuminated once, so a image has been taken") if dx is not None: new_x = self.x + dx else: new_x = self.x if dy is not None: new_y = self.y + dy else: new_y = self.y if dz is not None: new_z = self.z + dz else: new_z = self.z return self.__class__(x=new_x, y=new_y, z=new_z, size=self.size, source_intensity=None, max_image_photons=None, flag=None) @property def inactive(self): """Return mask of inactive particles""" return np.asarray(self.flag & ParticleFlag.INACTIVE.value, dtype=bool) @property def active(self): """Return mask of illuminated particles""" flag = ParticleFlag.ACTIVE.value return np.asarray(self.flag & flag == flag, dtype=bool) @property def n_active(self): """Return the number of active particles""" return np.sum(self.active) @property def source_density_number(self): """Return the number of particles in the laser sheet (and FOV)""" return np.sum(self.active) @property def disabled(self): """Return mask of disabled particles""" return np.asarray(self.flag & ParticleFlag.DISABLED.value, dtype=bool) @property def in_fov(self): """Return mask of particles in the FOV""" flag = ParticleFlag.IN_FOV.value return np.asarray(self.flag & flag, dtype=bool) @property def out_of_plane(self): """Return mask of particles out of plane""" flag = ParticleFlag.OUT_OF_PLANE.value return np.asarray(self.flag & flag == flag, dtype=bool) @property def in_fov_and_out_of_plane(self): """Return mask of particles out of plane""" flag = ParticleFlag.IN_FOV.value | ParticleFlag.OUT_OF_PLANE.value return np.asarray(self.flag & flag == flag, dtype=bool) @property def n_out_of_plane_loss(self) -> int: """Return the number of particles that are out of plane""" return np.sum(self.in_fov_and_out_of_plane) def info(self): """Prints some useful information about the particles""" print("=== Particle Information === ") print(f" > Number of simulated particles: {self.x.size}") print(f" > Number of active (illuminated and in FOV) particles: {self.active.sum()}") flag = ParticleFlag.IN_FOV.value n_in_fov = np.sum(self.flag & flag == flag) print(f" > Number of particles outside of FOV: {self.x.size - n_in_fov}") print(f" > Out of plane particles: {self.n_out_of_plane_loss}") # print(f" > Disabled particles due to out-of-FOV: {self.out_of_fov.sum()}") @classmethod def generate_uniform(cls, n_particles: int, size: Union[float, Tuple[float, float]], x_bounds: Tuple[float, float], y_bounds: Tuple[float, float], z_bounds: Tuple[float, float]): """Generate particles uniformly""" assert len(x_bounds) == 2 assert len(y_bounds) == 2 assert len(z_bounds) == 2 assert x_bounds[1] > x_bounds[0] assert y_bounds[1] > y_bounds[0] assert z_bounds[1] >= z_bounds[0] x = np.random.uniform(x_bounds[0], x_bounds[1], n_particles) y = np.random.uniform(y_bounds[0], y_bounds[1], n_particles) z = np.random.uniform(z_bounds[0], z_bounds[1], n_particles) if isinstance(size, (float, int)): size = np.ones_like(x) * size elif isinstance(size, (list, tuple)): assert len(size) == 2 # generate a normal distribution, which is cut at +/- 2 sigma size = np.random.normal(size[0], size[1], n_particles) # cut the tails min_size = max(0, size[0] - 2 * size[1]) max_size = size[0] + 2 * size[1] size[size < min_size] = 0 size[size > max_size] = max_size else: raise ValueError(f"Size {size} not supported") intensity = np.zeros_like(x) # no intensity by default flag = np.zeros_like(x, dtype=bool) # disabled by default return cls(x, y, z, size, intensity, flag) def __sub__(self, other: "Particles") -> ParticleDisplacement: """Subtract two particle sets""" return ParticleDisplacement(x=self.x - other.x, y=self.y - other.y, z=self.z - other.z, size=self.size - other.size, intensity=self.source_intensity - other.source_intensity, flagA=self.flag, flagB=other.flag) def copy(self): """Return a copy of this object""" return deepcopy(self) def load_jsonld(self): raise NotImplementedError("Not implemented yet")
def compute_intensity_distribution( x, y, xp, yp, dp, sigmax, sigmay, fill_ratio_x, fill_ratio_y): """Computes the sensor intensity based on the error function as used in SIG by Lecordier et al. (2003) Parameters ---------- x : np.ndarray x-coordinate of the sensor pixels y : np.ndarray y-coordinate of the sensor pixels xp : float x-coordinate of the particles on the sensor in pixels yp : float y-coordinate of the particles on the sensor in pixels dp : float particle image diameter (in pixels) sigmax : float standard deviation of the Gaussian in x-direction sigmay : float standard deviation of the Gaussian in y-direction fill_ratio_x : float fill ratio of the sensor in x-direction fill_ratio_y : float fill ratio of the sensor in y-direction Returns ------- np.ndarray The intensity distribution of the particles on the sensor """ frx05 = 0.5 * fill_ratio_x fry05 = 0.5 * fill_ratio_y dxp = x - xp dyp = y - yp erf1 = (scipy.special.erf((dxp + frx05) / (SQRT2 * sigmax)) - scipy.special.erf( (dxp - frx05) / (SQRT2 * sigmax))) erf2 = (scipy.special.erf((dyp + fry05) / (SQRT2 * sigmay)) - scipy.special.erf( (dyp - fry05) / (SQRT2 * sigmay))) intensity = np.pi / 2 * dp ** 2 * sigmax * sigmay * erf1 * erf2 return intensity def model_image_particles( particles: Particles, nx: int, ny: int, sigmax: float, sigmay: float, fill_ratio_x: float, fill_ratio_y: float, ): """Model the photons irradiated by the particles on the sensor.""" image_shape = (ny, nx) irrad_photons = np.zeros(image_shape) delta = int(PARTICLE_INFLUENCE_FACTOR * max(sigmax, sigmay)) max_image_photons = np.zeros_like(particles.x) for ip, (x, y, p_size, pint) in enumerate( zip(particles.x, particles.y, particles.size, particles.source_intensity)): xint = int(x) yint = int(y) xmin = max(0, xint - delta) ymin = max(0, yint - delta) xmax = min(nx, xint + delta) ymax = min(ny, yint + delta) sub_img_shape = (ymax - ymin, xmax - xmin) px = x - xmin py = y - ymin xx, yy = np.meshgrid(range(sub_img_shape[1]), range(sub_img_shape[0])) Ip = compute_intensity_distribution( x=xx, y=yy, xp=px, yp=py, dp=p_size, sigmax=sigmax, sigmay=sigmay, fill_ratio_x=fill_ratio_x, fill_ratio_y=fill_ratio_y, ) irrad_photons[ymin:ymax, xmin:xmax] += Ip * pint max_image_photons[ip] = np.max(Ip * pint) return irrad_photons, max_image_photons