Module pyangstrom.transform
Expand source code
import logging
import warnings
from typing import TypedDict, Iterable
from enum import Enum, auto
from dataclasses import dataclass
import pandas as pd
import numpy as np
from pyangstrom.exp_setup import ExperimentalSetup
logger = logging.getLogger('transform')
TEMPERATURE_OFFSET = {
'Temperature (C)': 273.15,
}
class Direction(Enum):
"""The direction from which the sample is heated."""
LESSER_X = auto()
GREATER_X = auto()
LESSER_Y = auto()
GREATER_Y = auto()
class Point(TypedDict):
"""The coordinates of one pixel in the recording."""
x_pixels: int | float
y_pixels: int | float
class CartesianGeometry(TypedDict, total=False):
"""The bounds of a rectangle and from where it is being heated.
Attributes
----------
min_x_pixels
Lower x bound.
max_x_pixels
Upper x bound.
min_y_pixels
Lower y bound.
max_y_pixels
Upper y bound.
heat_source_x_pixels
The x-position of a vertical line heat source. Mutually exclusive from
heat_source_y_pixels.
heat_source_y_pixels
The y-position of a horizontal line heat source. Mutually exclusive from
heat_source_x_pixels.
"""
min_x_pixels: int
max_x_pixels: int
min_y_pixels: int
max_y_pixels: int
heat_source_x_pixels: int
heat_source_y_pixels: int
class PolarGeometry(TypedDict):
"""The bounds of an annular sector, centered on a point heat source.
Attributes
----------
center
The coordinates of the point heat source.
min_r_pixels
Inner radius.
max_r_pixels
Outer radius.
num_r
Number of radial subdivisions to use.
min_theta_degrees
Starting angle.
max_theta_degrees
Ending angle.
num_theta
Number of angular subdivisions to use.
"""
center: Point
min_r_pixels: int | float
max_r_pixels: int | float
num_r: int
min_theta_degrees: int | float
max_theta_degrees: int | float
num_theta: int
Geometry = CartesianGeometry | PolarGeometry
class RegionStructure(TypedDict, total=False):
"""Various options by which to to restructure a region's temperatures.
Attributes
----------
subtract_temperatures_by
If set to one of:
- 'mean'
- 'avg'
- 'average'
Temperatures will be subtracted by the average of all temperature
measurements.
If, instead, set to one of:
- 'min'
- 'minimum'
- 'lowest'
Temperatures will by subtracted by the lowest of all temperature
measurements.
Does not affect total number of dimensions present.
average_out_span
Whether temperatures with the same displacement from the heat source
should be averaged together. Removes a dimension.
num_deinterleaving_groups
The number of groups the temperatures should be separated into by way of
deinterleaving. Adds a dimension.
"""
subtract_temperatures_by: str
average_out_span: bool
num_deinterleaving_groups: int
class RegionConfig(TypedDict, total=False):
"""Specifications to extract a single region from the recording."""
geometry: Geometry
structure: RegionStructure
class RegionBatchConfig(TypedDict, total=False):
"""Specifications to extract multiple related regions from the recording."""
geometries: list[Geometry]
structure: RegionStructure
average_over_regions: bool
RegionInformation = RegionConfig | RegionBatchConfig | list[RegionConfig] | list[RegionBatchConfig]
@dataclass
class Margins:
"""Spatiotemporal measurements of each temperature data point off
standardized references.
Attributes
----------
seconds_elapsed
A 1-dimensional array of the time offsets of each recording frame from
the first frame's time.
displacements_meters
An N-dimensional array of the positional offset of each temperature
measurement from the heat source.
"""
seconds_elapsed: np.ndarray
displacements_meters: np.ndarray
@classmethod
def new(
cls,
df_recording: pd.DataFrame,
temperatures: np.ndarray,
min_displacement_pixels: int | float,
max_displacement_pixels: int | float,
setup: ExperimentalSetup,
) -> "Margins":
normalized_timestamps = df_recording.index - df_recording.index.min()
seconds_elapsed = normalized_timestamps.total_seconds().to_numpy()
disp = np.linspace(
min_displacement_pixels,
max_displacement_pixels,
temperatures.shape[1],
dtype=float,
)
disp = disp * setup['meters_per_pixel']
disp = np.stack(temperatures.shape[2] * [disp], axis=1)
return cls(seconds_elapsed, disp)
@dataclass
class Region:
"""Temperature data from a bounded region of the recording, transformed into
a standardized format.
Attributes
----------
timestamps
The timestamps of the original recording frames.
temperatures_kelvin
An N-dimensional array of temperatures grouped by time, displacement
from the heat source, and other factors based on the order of its axes.
margins
Spatiotemporal metadata of the temperatures.
"""
timestamps: pd.DatetimeIndex
temperatures_kelvin: np.ndarray
margins: Margins
def convert_temperatures_to_kelvin(
df_recording: pd.DataFrame,
arr_temps: np.ndarray,
) -> np.ndarray:
try:
offset = TEMPERATURE_OFFSET[df_recording['Units'].unique().item()]
return arr_temps + offset
except ValueError:
raise ValueError("More than one temperature unit found")
except KeyError:
msg = (f"Temperature unit \"{df_recording['Units'].unique().item()}\" "
"not accounted for")
raise NotImplementedError(msg)
def find_heat_source_direction(geometry: CartesianGeometry) -> Direction:
"""
Raises
------
KeyError
Field not found in geometry.
ValueError
Invalid geometry.
"""
if ('heat_source_x_pixels' in geometry
and 'heat_source_y_pixels' in geometry):
raise ValueError(
"Cannot have both heat_source_x_pixels and heat_source_y_pixels in "
"geometry."
)
elif 'heat_source_x_pixels' in geometry:
if geometry['heat_source_x_pixels'] <= geometry['min_x_pixels']:
return Direction.LESSER_X
elif geometry['heat_source_x_pixels'] >= geometry['max_x_pixels']:
return Direction.GREATER_X
else:
raise ValueError(
"heat_source_x_pixels cannot be between min_x_pixels and "
"max_x_pixels."
)
elif 'heat_source_y_pixels' in geometry:
if geometry['heat_source_y_pixels'] <= geometry['min_y_pixels']:
return Direction.LESSER_Y
elif geometry['heat_source_y_pixels'] >= geometry['max_y_pixels']:
return Direction.GREATER_Y
else:
raise ValueError(
"heat_source_y_pixels cannot be between min_y_pixels and "
"max_y_pixels."
)
else:
raise KeyError(
"Must have at either heat_source_x_pixels or heat_source_y_pixels "
"in geometry."
)
def extract_cartesian_region(
df_recording: pd.DataFrame,
geometry: CartesianGeometry,
setup: ExperimentalSetup,
) -> Region:
"""
Raises
------
KeyError
Field not found in geometry.
ValueError
Invalid geometry.
"""
temps = np.stack(df_recording['Samples']) # (time, height, width)
temps = np.moveaxis(temps, [-1, -2], [0, 1]) # (width, height, time)
temps = temps[
geometry['min_x_pixels'] : geometry['max_x_pixels'] + 1,
geometry['min_y_pixels'] : geometry['max_y_pixels'] + 1,
]
temps = np.moveaxis(temps, [0, 1], [1, 2]) # (time, width, height)
# (time, displacement, span)
match find_heat_source_direction(geometry):
case Direction.LESSER_X:
min_disp_px = geometry['min_x_pixels'] - geometry['heat_source_x_pixels']
max_disp_px = geometry['max_x_pixels'] - geometry['heat_source_x_pixels']
case Direction.GREATER_X:
temps = np.flip(temps, axis=1)
min_disp_px = geometry['heat_source_x_pixels'] - geometry['max_x_pixels']
max_disp_px = geometry['heat_source_x_pixels'] - geometry['min_x_pixels']
case Direction.LESSER_Y:
temps = np.swapaxes(temps, 1, 2)
min_disp_px = geometry['min_y_pixels'] - geometry['heat_source_y_pixels']
max_disp_px = geometry['max_y_pixels'] - geometry['heat_source_y_pixels']
case Direction.GREATER_Y:
temps = np.swapaxes(temps, 1, 2)
temps = np.flip(temps, axis=1)
min_disp_px = geometry['heat_source_y_pixels'] - geometry['max_y_pixels']
max_disp_px = geometry['heat_source_y_pixels'] - geometry['min_y_pixels']
temps_kelvin = convert_temperatures_to_kelvin(df_recording, temps)
region = Region(
df_recording.index,
temps_kelvin,
Margins.new(df_recording, temps, min_disp_px, max_disp_px, setup),
)
return region
def extract_polar_region(
df_recording: pd.DataFrame,
geometry: PolarGeometry,
setup: ExperimentalSetup,
) -> Region:
temps = np.stack(df_recording['Samples']) # (time, height, width)
temps = np.moveaxis(temps, [-1, -2], [0, 1]) # (width, height, time)
r_pixels = np.linspace(
geometry['min_r_pixels'],
geometry['max_r_pixels'],
geometry['num_r'],
)
theta_degrees = np.deg2rad(np.linspace(
geometry['min_theta_degrees'],
geometry['max_theta_degrees'],
geometry['num_theta'],
))
r_coord, theta_coord = np.meshgrid(r_pixels, theta_degrees, indexing='ij')
x_coord = r_coord * np.cos(theta_coord) + geometry['center']['x_pixels']
y_coord = r_coord * np.sin(theta_coord) + geometry['center']['y_pixels']
x_coord_floor = np.floor(x_coord).astype('int')
x_coord_floorp1 = x_coord_floor + 1
y_coord_floor = np.floor(y_coord).astype('int')
y_coord_floorp1 = y_coord_floor + 1
lower_x_weight = (x_coord_floorp1 - x_coord) / (x_coord_floorp1 - x_coord_floor)
upper_x_weight = (x_coord - x_coord_floor) / (x_coord_floorp1 - x_coord_floor)
lower_y_weight = (y_coord_floorp1 - y_coord) / (y_coord_floorp1 - y_coord_floor)
upper_y_weight = (y_coord - y_coord_floor) / (y_coord_floorp1 - y_coord_floor)
# (time, radius, angle)
lxly_temps = np.moveaxis(temps[x_coord_floor, y_coord_floor], [0, 1], [1, 2]) * lower_x_weight * lower_y_weight
lxuy_temps = np.moveaxis(temps[x_coord_floor, y_coord_floorp1], [0, 1], [1, 2]) * lower_x_weight * upper_y_weight
uxly_temps = np.moveaxis(temps[x_coord_floorp1, y_coord_floor], [0, 1], [1, 2]) * upper_x_weight * lower_y_weight
uxuy_temps = np.moveaxis(temps[x_coord_floorp1, y_coord_floorp1], [0, 1], [1, 2]) * upper_x_weight * upper_y_weight
temps_trans = lxly_temps + lxuy_temps + uxly_temps + uxuy_temps
temps_kelvin = convert_temperatures_to_kelvin(df_recording, temps_trans)
region = Region(
df_recording.index,
temps_kelvin,
Margins.new(
df_recording,
temps_kelvin,
geometry['min_r_pixels'],
geometry['max_r_pixels'],
setup,
),
)
return region
def geometry_to_region(
df_recording: pd.DataFrame,
geometry: Geometry,
setup: ExperimentalSetup,
) -> Region:
match geometry:
case {'min_x_pixels': _}:
return extract_cartesian_region(df_recording, geometry, setup)
case {'center': _}:
return extract_polar_region(df_recording, geometry, setup)
def truncate_region(region: Region, num_truncate: int, axis: int) -> Region:
new_time = region.timestamps
new_temps = np.moveaxis(region.temperatures_kelvin, axis, 0)
new_temps = new_temps[:-num_truncate]
new_temps = np.moveaxis(new_temps, 0, axis)
if axis == 0:
new_time = region.timestamps[:-num_truncate]
new_elapsed = region.margins.seconds_elapsed[:-num_truncate]
new_margins = Margins(new_elapsed, region.margins.displacements_meters)
else:
new_disp = region.margins.displacements_meters
new_disp = np.moveaxis(new_disp, axis - 1, 0)
new_disp = new_disp[:-num_truncate]
new_disp = np.moveaxis(new_disp, 0, axis - 1)
new_margins = Margins(region.margins.seconds_elapsed, new_disp)
new_region = Region(
new_time,
new_temps,
new_margins,
)
return new_region
def restructure_region(region: Region, structure: RegionStructure) -> Region:
if 'subtract_temperatures_by' in structure:
match structure['subtract_temperatures_by']:
case 'mean' | 'avg' | 'average':
subtrahend = region.temperatures_kelvin.mean()
case 'min' | 'minimum' | 'lowest':
subtrahend = region.temperatures_kelvin.min()
case _:
subtrahend = 0
warnings.warn(
f"Subtract temperatures by "
f"\'{structure['subtract_temperatures_by']}\' "
f"not understood"
)
region = Region(
region.timestamps,
region.temperatures_kelvin - subtrahend,
region.margins,
)
if 'average_out_span' in structure and structure['average_out_span']:
region = Region(
region.timestamps,
region.temperatures_kelvin.mean(axis=2),
Margins(
region.margins.seconds_elapsed,
region.margins.displacements_meters.mean(axis=1),
),
)
if 'num_deinterleaving_groups' in structure:
num_disp = region.temperatures_kelvin.shape[1]
new_num_disp, remainder = divmod(
num_disp,
structure['num_deinterleaving_groups'],
)
if remainder != 0:
region = truncate_region(region, remainder, axis=1)
lst_groups = np.split(region.temperatures_kelvin, new_num_disp, axis=1)
new_temps = np.stack(lst_groups, axis=1)
lst_disp = np.split(
region.margins.displacements_meters,
new_num_disp,
axis=0,
)
new_disp = np.stack(lst_disp, axis=0)
region = Region(
region.timestamps,
new_temps,
Margins(region.margins.seconds_elapsed, new_disp),
)
return region
def all_timestamps_same(regions: Iterable[Region]) -> bool:
idx0 = next(iter(regions)).timestamps
for region in regions:
if not idx0.symmetric_difference(region.timestamps).empty:
return False
return True
def all_temps_same_shape(regions: Iterable[Region]) -> bool:
dim_sizes = zip(*(r.temperatures_kelvin.shape for r in regions))
size_counts = [set(s) for s in dim_sizes]
return all(len(c) == 1 for c in size_counts)
def min_temps_shape(regions: Iterable[Region]) -> tuple:
dim_sizes = zip(*(r.temperatures_kelvin.shape for r in regions))
min_shape = tuple(min(s) for s in dim_sizes)
return min_shape
def trim_regions(regions: Iterable[Region]) -> list[Region]:
min_shape = min_temps_shape(regions)
new_regions = []
for region in regions:
for axis, size in enumerate(min_shape):
region = truncate_region(
region,
region.temperatures_kelvin.shape[axis] - size,
axis,
)
new_regions.append(region)
return new_regions
def fully_extract_region(
df_recording: pd.DataFrame,
information: RegionInformation,
setup: ExperimentalSetup,
) -> Region | list[Region] | list[list[Region]]:
"""Extract all regions of temperature from the recording as specified by the
region information configuration.
Raises
------
ValueError
Malformed information.
"""
match information:
case {'geometry': geometry}:
region = geometry_to_region(df_recording, geometry, setup)
if 'structure' in information:
region = restructure_region(region, information['structure'])
return region
case {'geometries': geometries}:
regions = [
geometry_to_region(df_recording, g, setup) for g in geometries
]
if 'structure' in information:
regions = [
restructure_region(r, information['structure'])
for r in regions
]
if ('average_over_regions' not in information
or not information['average_over_regions']):
return regions
assert all_timestamps_same(regions)
if not all_temps_same_shape(regions):
warnings.warn(
"Not all regions have the same number of samples. Trimming "
"regions to match minimum shape."
)
regions = trim_regions(regions)
new_temps = np.stack(
[r.temperatures_kelvin for r in regions],
axis=2,
)
new_temps = new_temps.mean(axis=2)
new_disp = np.stack(
[r.margins.displacements_meters for r in regions],
axis=1
)
new_disp = new_disp.mean(axis=1)
new_region = Region(
regions[0].timestamps,
new_temps,
Margins(regions[0].margins.seconds_elapsed, new_disp),
)
return new_region
case [*region_configs]:
regions = [
fully_extract_region(df_recording, c, setup)
for c in region_configs
]
return regions
case _:
raise ValueError(f"Invalid information format: {information}")
def collapse_region(region: Region) -> Region:
"""Reduce the temperatures to a 2-dimensional array, representing time
offsets and displacements.
"""
num_times, num_disp, *_ = region.temperatures_kelvin.shape
new_temps = (region.temperatures_kelvin
.reshape(num_times, num_disp, -1)
.mean(axis=2))
new_disp = (region.margins.displacements_meters
.reshape(num_disp, -1)
.mean(axis=1))
new_region = Region(
region.timestamps,
new_temps,
Margins(region.margins.seconds_elapsed, new_disp),
)
return new_region
Functions
def all_temps_same_shape(regions: Iterable[Region]) ‑> bool
-
Expand source code
def all_temps_same_shape(regions: Iterable[Region]) -> bool: dim_sizes = zip(*(r.temperatures_kelvin.shape for r in regions)) size_counts = [set(s) for s in dim_sizes] return all(len(c) == 1 for c in size_counts)
def all_timestamps_same(regions: Iterable[Region]) ‑> bool
-
Expand source code
def all_timestamps_same(regions: Iterable[Region]) -> bool: idx0 = next(iter(regions)).timestamps for region in regions: if not idx0.symmetric_difference(region.timestamps).empty: return False return True
def collapse_region(region: Region) ‑> Region
-
Reduce the temperatures to a 2-dimensional array, representing time offsets and displacements.
Expand source code
def collapse_region(region: Region) -> Region: """Reduce the temperatures to a 2-dimensional array, representing time offsets and displacements. """ num_times, num_disp, *_ = region.temperatures_kelvin.shape new_temps = (region.temperatures_kelvin .reshape(num_times, num_disp, -1) .mean(axis=2)) new_disp = (region.margins.displacements_meters .reshape(num_disp, -1) .mean(axis=1)) new_region = Region( region.timestamps, new_temps, Margins(region.margins.seconds_elapsed, new_disp), ) return new_region
def convert_temperatures_to_kelvin(df_recording: pandas.core.frame.DataFrame, arr_temps: numpy.ndarray) ‑> numpy.ndarray
-
Expand source code
def convert_temperatures_to_kelvin( df_recording: pd.DataFrame, arr_temps: np.ndarray, ) -> np.ndarray: try: offset = TEMPERATURE_OFFSET[df_recording['Units'].unique().item()] return arr_temps + offset except ValueError: raise ValueError("More than one temperature unit found") except KeyError: msg = (f"Temperature unit \"{df_recording['Units'].unique().item()}\" " "not accounted for") raise NotImplementedError(msg)
def extract_cartesian_region(df_recording: pandas.core.frame.DataFrame, geometry: CartesianGeometry, setup: ExperimentalSetup) ‑> Region
-
Raises
KeyError
- Field not found in geometry.
ValueError
- Invalid geometry.
Expand source code
def extract_cartesian_region( df_recording: pd.DataFrame, geometry: CartesianGeometry, setup: ExperimentalSetup, ) -> Region: """ Raises ------ KeyError Field not found in geometry. ValueError Invalid geometry. """ temps = np.stack(df_recording['Samples']) # (time, height, width) temps = np.moveaxis(temps, [-1, -2], [0, 1]) # (width, height, time) temps = temps[ geometry['min_x_pixels'] : geometry['max_x_pixels'] + 1, geometry['min_y_pixels'] : geometry['max_y_pixels'] + 1, ] temps = np.moveaxis(temps, [0, 1], [1, 2]) # (time, width, height) # (time, displacement, span) match find_heat_source_direction(geometry): case Direction.LESSER_X: min_disp_px = geometry['min_x_pixels'] - geometry['heat_source_x_pixels'] max_disp_px = geometry['max_x_pixels'] - geometry['heat_source_x_pixels'] case Direction.GREATER_X: temps = np.flip(temps, axis=1) min_disp_px = geometry['heat_source_x_pixels'] - geometry['max_x_pixels'] max_disp_px = geometry['heat_source_x_pixels'] - geometry['min_x_pixels'] case Direction.LESSER_Y: temps = np.swapaxes(temps, 1, 2) min_disp_px = geometry['min_y_pixels'] - geometry['heat_source_y_pixels'] max_disp_px = geometry['max_y_pixels'] - geometry['heat_source_y_pixels'] case Direction.GREATER_Y: temps = np.swapaxes(temps, 1, 2) temps = np.flip(temps, axis=1) min_disp_px = geometry['heat_source_y_pixels'] - geometry['max_y_pixels'] max_disp_px = geometry['heat_source_y_pixels'] - geometry['min_y_pixels'] temps_kelvin = convert_temperatures_to_kelvin(df_recording, temps) region = Region( df_recording.index, temps_kelvin, Margins.new(df_recording, temps, min_disp_px, max_disp_px, setup), ) return region
def extract_polar_region(df_recording: pandas.core.frame.DataFrame, geometry: PolarGeometry, setup: ExperimentalSetup) ‑> Region
-
Expand source code
def extract_polar_region( df_recording: pd.DataFrame, geometry: PolarGeometry, setup: ExperimentalSetup, ) -> Region: temps = np.stack(df_recording['Samples']) # (time, height, width) temps = np.moveaxis(temps, [-1, -2], [0, 1]) # (width, height, time) r_pixels = np.linspace( geometry['min_r_pixels'], geometry['max_r_pixels'], geometry['num_r'], ) theta_degrees = np.deg2rad(np.linspace( geometry['min_theta_degrees'], geometry['max_theta_degrees'], geometry['num_theta'], )) r_coord, theta_coord = np.meshgrid(r_pixels, theta_degrees, indexing='ij') x_coord = r_coord * np.cos(theta_coord) + geometry['center']['x_pixels'] y_coord = r_coord * np.sin(theta_coord) + geometry['center']['y_pixels'] x_coord_floor = np.floor(x_coord).astype('int') x_coord_floorp1 = x_coord_floor + 1 y_coord_floor = np.floor(y_coord).astype('int') y_coord_floorp1 = y_coord_floor + 1 lower_x_weight = (x_coord_floorp1 - x_coord) / (x_coord_floorp1 - x_coord_floor) upper_x_weight = (x_coord - x_coord_floor) / (x_coord_floorp1 - x_coord_floor) lower_y_weight = (y_coord_floorp1 - y_coord) / (y_coord_floorp1 - y_coord_floor) upper_y_weight = (y_coord - y_coord_floor) / (y_coord_floorp1 - y_coord_floor) # (time, radius, angle) lxly_temps = np.moveaxis(temps[x_coord_floor, y_coord_floor], [0, 1], [1, 2]) * lower_x_weight * lower_y_weight lxuy_temps = np.moveaxis(temps[x_coord_floor, y_coord_floorp1], [0, 1], [1, 2]) * lower_x_weight * upper_y_weight uxly_temps = np.moveaxis(temps[x_coord_floorp1, y_coord_floor], [0, 1], [1, 2]) * upper_x_weight * lower_y_weight uxuy_temps = np.moveaxis(temps[x_coord_floorp1, y_coord_floorp1], [0, 1], [1, 2]) * upper_x_weight * upper_y_weight temps_trans = lxly_temps + lxuy_temps + uxly_temps + uxuy_temps temps_kelvin = convert_temperatures_to_kelvin(df_recording, temps_trans) region = Region( df_recording.index, temps_kelvin, Margins.new( df_recording, temps_kelvin, geometry['min_r_pixels'], geometry['max_r_pixels'], setup, ), ) return region
def find_heat_source_direction(geometry: CartesianGeometry) ‑> Direction
-
Raises
KeyError
- Field not found in geometry.
ValueError
- Invalid geometry.
Expand source code
def find_heat_source_direction(geometry: CartesianGeometry) -> Direction: """ Raises ------ KeyError Field not found in geometry. ValueError Invalid geometry. """ if ('heat_source_x_pixels' in geometry and 'heat_source_y_pixels' in geometry): raise ValueError( "Cannot have both heat_source_x_pixels and heat_source_y_pixels in " "geometry." ) elif 'heat_source_x_pixels' in geometry: if geometry['heat_source_x_pixels'] <= geometry['min_x_pixels']: return Direction.LESSER_X elif geometry['heat_source_x_pixels'] >= geometry['max_x_pixels']: return Direction.GREATER_X else: raise ValueError( "heat_source_x_pixels cannot be between min_x_pixels and " "max_x_pixels." ) elif 'heat_source_y_pixels' in geometry: if geometry['heat_source_y_pixels'] <= geometry['min_y_pixels']: return Direction.LESSER_Y elif geometry['heat_source_y_pixels'] >= geometry['max_y_pixels']: return Direction.GREATER_Y else: raise ValueError( "heat_source_y_pixels cannot be between min_y_pixels and " "max_y_pixels." ) else: raise KeyError( "Must have at either heat_source_x_pixels or heat_source_y_pixels " "in geometry." )
def fully_extract_region(df_recording: pandas.core.frame.DataFrame, information: RegionConfig | RegionBatchConfig | list[RegionConfig] | list[RegionBatchConfig], setup: ExperimentalSetup) ‑> Region | list[Region] | list[list[Region]]
-
Extract all regions of temperature from the recording as specified by the region information configuration.
Raises
ValueError
- Malformed information.
Expand source code
def fully_extract_region( df_recording: pd.DataFrame, information: RegionInformation, setup: ExperimentalSetup, ) -> Region | list[Region] | list[list[Region]]: """Extract all regions of temperature from the recording as specified by the region information configuration. Raises ------ ValueError Malformed information. """ match information: case {'geometry': geometry}: region = geometry_to_region(df_recording, geometry, setup) if 'structure' in information: region = restructure_region(region, information['structure']) return region case {'geometries': geometries}: regions = [ geometry_to_region(df_recording, g, setup) for g in geometries ] if 'structure' in information: regions = [ restructure_region(r, information['structure']) for r in regions ] if ('average_over_regions' not in information or not information['average_over_regions']): return regions assert all_timestamps_same(regions) if not all_temps_same_shape(regions): warnings.warn( "Not all regions have the same number of samples. Trimming " "regions to match minimum shape." ) regions = trim_regions(regions) new_temps = np.stack( [r.temperatures_kelvin for r in regions], axis=2, ) new_temps = new_temps.mean(axis=2) new_disp = np.stack( [r.margins.displacements_meters for r in regions], axis=1 ) new_disp = new_disp.mean(axis=1) new_region = Region( regions[0].timestamps, new_temps, Margins(regions[0].margins.seconds_elapsed, new_disp), ) return new_region case [*region_configs]: regions = [ fully_extract_region(df_recording, c, setup) for c in region_configs ] return regions case _: raise ValueError(f"Invalid information format: {information}")
def geometry_to_region(df_recording: pandas.core.frame.DataFrame, geometry: CartesianGeometry | PolarGeometry, setup: ExperimentalSetup) ‑> Region
-
Expand source code
def geometry_to_region( df_recording: pd.DataFrame, geometry: Geometry, setup: ExperimentalSetup, ) -> Region: match geometry: case {'min_x_pixels': _}: return extract_cartesian_region(df_recording, geometry, setup) case {'center': _}: return extract_polar_region(df_recording, geometry, setup)
def min_temps_shape(regions: Iterable[Region]) ‑> tuple
-
Expand source code
def min_temps_shape(regions: Iterable[Region]) -> tuple: dim_sizes = zip(*(r.temperatures_kelvin.shape for r in regions)) min_shape = tuple(min(s) for s in dim_sizes) return min_shape
def restructure_region(region: Region, structure: RegionStructure) ‑> Region
-
Expand source code
def restructure_region(region: Region, structure: RegionStructure) -> Region: if 'subtract_temperatures_by' in structure: match structure['subtract_temperatures_by']: case 'mean' | 'avg' | 'average': subtrahend = region.temperatures_kelvin.mean() case 'min' | 'minimum' | 'lowest': subtrahend = region.temperatures_kelvin.min() case _: subtrahend = 0 warnings.warn( f"Subtract temperatures by " f"\'{structure['subtract_temperatures_by']}\' " f"not understood" ) region = Region( region.timestamps, region.temperatures_kelvin - subtrahend, region.margins, ) if 'average_out_span' in structure and structure['average_out_span']: region = Region( region.timestamps, region.temperatures_kelvin.mean(axis=2), Margins( region.margins.seconds_elapsed, region.margins.displacements_meters.mean(axis=1), ), ) if 'num_deinterleaving_groups' in structure: num_disp = region.temperatures_kelvin.shape[1] new_num_disp, remainder = divmod( num_disp, structure['num_deinterleaving_groups'], ) if remainder != 0: region = truncate_region(region, remainder, axis=1) lst_groups = np.split(region.temperatures_kelvin, new_num_disp, axis=1) new_temps = np.stack(lst_groups, axis=1) lst_disp = np.split( region.margins.displacements_meters, new_num_disp, axis=0, ) new_disp = np.stack(lst_disp, axis=0) region = Region( region.timestamps, new_temps, Margins(region.margins.seconds_elapsed, new_disp), ) return region
def trim_regions(regions: Iterable[Region]) ‑> list[Region]
-
Expand source code
def trim_regions(regions: Iterable[Region]) -> list[Region]: min_shape = min_temps_shape(regions) new_regions = [] for region in regions: for axis, size in enumerate(min_shape): region = truncate_region( region, region.temperatures_kelvin.shape[axis] - size, axis, ) new_regions.append(region) return new_regions
def truncate_region(region: Region, num_truncate: int, axis: int) ‑> Region
-
Expand source code
def truncate_region(region: Region, num_truncate: int, axis: int) -> Region: new_time = region.timestamps new_temps = np.moveaxis(region.temperatures_kelvin, axis, 0) new_temps = new_temps[:-num_truncate] new_temps = np.moveaxis(new_temps, 0, axis) if axis == 0: new_time = region.timestamps[:-num_truncate] new_elapsed = region.margins.seconds_elapsed[:-num_truncate] new_margins = Margins(new_elapsed, region.margins.displacements_meters) else: new_disp = region.margins.displacements_meters new_disp = np.moveaxis(new_disp, axis - 1, 0) new_disp = new_disp[:-num_truncate] new_disp = np.moveaxis(new_disp, 0, axis - 1) new_margins = Margins(region.margins.seconds_elapsed, new_disp) new_region = Region( new_time, new_temps, new_margins, ) return new_region
Classes
class CartesianGeometry (*args, **kwargs)
-
The bounds of a rectangle and from where it is being heated.
Attributes
min_x_pixels
- Lower x bound.
max_x_pixels
- Upper x bound.
min_y_pixels
- Lower y bound.
max_y_pixels
- Upper y bound.
heat_source_x_pixels
- The x-position of a vertical line heat source. Mutually exclusive from heat_source_y_pixels.
heat_source_y_pixels
- The y-position of a horizontal line heat source. Mutually exclusive from heat_source_x_pixels.
Expand source code
class CartesianGeometry(TypedDict, total=False): """The bounds of a rectangle and from where it is being heated. Attributes ---------- min_x_pixels Lower x bound. max_x_pixels Upper x bound. min_y_pixels Lower y bound. max_y_pixels Upper y bound. heat_source_x_pixels The x-position of a vertical line heat source. Mutually exclusive from heat_source_y_pixels. heat_source_y_pixels The y-position of a horizontal line heat source. Mutually exclusive from heat_source_x_pixels. """ min_x_pixels: int max_x_pixels: int min_y_pixels: int max_y_pixels: int heat_source_x_pixels: int heat_source_y_pixels: int
Ancestors
- builtins.dict
Class variables
var heat_source_x_pixels : int
var heat_source_y_pixels : int
var max_x_pixels : int
var max_y_pixels : int
var min_x_pixels : int
var min_y_pixels : int
class Direction (value, names=None, *, module=None, qualname=None, type=None, start=1)
-
The direction from which the sample is heated.
Expand source code
class Direction(Enum): """The direction from which the sample is heated.""" LESSER_X = auto() GREATER_X = auto() LESSER_Y = auto() GREATER_Y = auto()
Ancestors
- enum.Enum
Class variables
var GREATER_X
var GREATER_Y
var LESSER_X
var LESSER_Y
class Margins (seconds_elapsed: numpy.ndarray, displacements_meters: numpy.ndarray)
-
Spatiotemporal measurements of each temperature data point off standardized references.
Attributes
seconds_elapsed
- A 1-dimensional array of the time offsets of each recording frame from the first frame's time.
displacements_meters
- An N-dimensional array of the positional offset of each temperature measurement from the heat source.
Expand source code
@dataclass class Margins: """Spatiotemporal measurements of each temperature data point off standardized references. Attributes ---------- seconds_elapsed A 1-dimensional array of the time offsets of each recording frame from the first frame's time. displacements_meters An N-dimensional array of the positional offset of each temperature measurement from the heat source. """ seconds_elapsed: np.ndarray displacements_meters: np.ndarray @classmethod def new( cls, df_recording: pd.DataFrame, temperatures: np.ndarray, min_displacement_pixels: int | float, max_displacement_pixels: int | float, setup: ExperimentalSetup, ) -> "Margins": normalized_timestamps = df_recording.index - df_recording.index.min() seconds_elapsed = normalized_timestamps.total_seconds().to_numpy() disp = np.linspace( min_displacement_pixels, max_displacement_pixels, temperatures.shape[1], dtype=float, ) disp = disp * setup['meters_per_pixel'] disp = np.stack(temperatures.shape[2] * [disp], axis=1) return cls(seconds_elapsed, disp)
Class variables
var displacements_meters : numpy.ndarray
var seconds_elapsed : numpy.ndarray
Static methods
def new(df_recording: pandas.core.frame.DataFrame, temperatures: numpy.ndarray, min_displacement_pixels: int | float, max_displacement_pixels: int | float, setup: ExperimentalSetup) ‑> Margins
-
Expand source code
@classmethod def new( cls, df_recording: pd.DataFrame, temperatures: np.ndarray, min_displacement_pixels: int | float, max_displacement_pixels: int | float, setup: ExperimentalSetup, ) -> "Margins": normalized_timestamps = df_recording.index - df_recording.index.min() seconds_elapsed = normalized_timestamps.total_seconds().to_numpy() disp = np.linspace( min_displacement_pixels, max_displacement_pixels, temperatures.shape[1], dtype=float, ) disp = disp * setup['meters_per_pixel'] disp = np.stack(temperatures.shape[2] * [disp], axis=1) return cls(seconds_elapsed, disp)
class Point (*args, **kwargs)
-
The coordinates of one pixel in the recording.
Expand source code
class Point(TypedDict): """The coordinates of one pixel in the recording.""" x_pixels: int | float y_pixels: int | float
Ancestors
- builtins.dict
Class variables
var x_pixels : int | float
var y_pixels : int | float
class PolarGeometry (*args, **kwargs)
-
The bounds of an annular sector, centered on a point heat source.
Attributes
center
- The coordinates of the point heat source.
min_r_pixels
- Inner radius.
max_r_pixels
- Outer radius.
num_r
- Number of radial subdivisions to use.
min_theta_degrees
- Starting angle.
max_theta_degrees
- Ending angle.
num_theta
- Number of angular subdivisions to use.
Expand source code
class PolarGeometry(TypedDict): """The bounds of an annular sector, centered on a point heat source. Attributes ---------- center The coordinates of the point heat source. min_r_pixels Inner radius. max_r_pixels Outer radius. num_r Number of radial subdivisions to use. min_theta_degrees Starting angle. max_theta_degrees Ending angle. num_theta Number of angular subdivisions to use. """ center: Point min_r_pixels: int | float max_r_pixels: int | float num_r: int min_theta_degrees: int | float max_theta_degrees: int | float num_theta: int
Ancestors
- builtins.dict
Class variables
var center : Point
var max_r_pixels : int | float
var max_theta_degrees : int | float
var min_r_pixels : int | float
var min_theta_degrees : int | float
var num_r : int
var num_theta : int
class Region (timestamps: pandas.core.indexes.datetimes.DatetimeIndex, temperatures_kelvin: numpy.ndarray, margins: Margins)
-
Temperature data from a bounded region of the recording, transformed into a standardized format.
Attributes
timestamps
- The timestamps of the original recording frames.
temperatures_kelvin
- An N-dimensional array of temperatures grouped by time, displacement from the heat source, and other factors based on the order of its axes.
margins
- Spatiotemporal metadata of the temperatures.
Expand source code
@dataclass class Region: """Temperature data from a bounded region of the recording, transformed into a standardized format. Attributes ---------- timestamps The timestamps of the original recording frames. temperatures_kelvin An N-dimensional array of temperatures grouped by time, displacement from the heat source, and other factors based on the order of its axes. margins Spatiotemporal metadata of the temperatures. """ timestamps: pd.DatetimeIndex temperatures_kelvin: np.ndarray margins: Margins
Class variables
var margins : Margins
var temperatures_kelvin : numpy.ndarray
var timestamps : pandas.core.indexes.datetimes.DatetimeIndex
class RegionBatchConfig (*args, **kwargs)
-
Specifications to extract multiple related regions from the recording.
Expand source code
class RegionBatchConfig(TypedDict, total=False): """Specifications to extract multiple related regions from the recording.""" geometries: list[Geometry] structure: RegionStructure average_over_regions: bool
Ancestors
- builtins.dict
Class variables
var average_over_regions : bool
var geometries : list[CartesianGeometry | PolarGeometry]
var structure : RegionStructure
class RegionConfig (*args, **kwargs)
-
Specifications to extract a single region from the recording.
Expand source code
class RegionConfig(TypedDict, total=False): """Specifications to extract a single region from the recording.""" geometry: Geometry structure: RegionStructure
Ancestors
- builtins.dict
Class variables
var geometry : CartesianGeometry | PolarGeometry
var structure : RegionStructure
class RegionStructure (*args, **kwargs)
-
Various options by which to to restructure a region's temperatures.
Attributes
subtract_temperatures_by
-
If set to one of:
- 'mean'
- 'avg'
- 'average'
Temperatures will be subtracted by the average of all temperature measurements.
If, instead, set to one of:
- 'min'
- 'minimum'
- 'lowest'
Temperatures will by subtracted by the lowest of all temperature measurements.
Does not affect total number of dimensions present.
average_out_span
- Whether temperatures with the same displacement from the heat source should be averaged together. Removes a dimension.
num_deinterleaving_groups
- The number of groups the temperatures should be separated into by way of deinterleaving. Adds a dimension.
Expand source code
class RegionStructure(TypedDict, total=False): """Various options by which to to restructure a region's temperatures. Attributes ---------- subtract_temperatures_by If set to one of: - 'mean' - 'avg' - 'average' Temperatures will be subtracted by the average of all temperature measurements. If, instead, set to one of: - 'min' - 'minimum' - 'lowest' Temperatures will by subtracted by the lowest of all temperature measurements. Does not affect total number of dimensions present. average_out_span Whether temperatures with the same displacement from the heat source should be averaged together. Removes a dimension. num_deinterleaving_groups The number of groups the temperatures should be separated into by way of deinterleaving. Adds a dimension. """ subtract_temperatures_by: str average_out_span: bool num_deinterleaving_groups: int
Ancestors
- builtins.dict
Class variables
var average_out_span : bool
var num_deinterleaving_groups : int
var subtract_temperatures_by : str