Module pyangstrom.pipeline

Expand source code
import logging
import warnings
from typing import Optional
from pathlib import PurePath, Path

import matplotlib.pyplot as plt

from pyangstrom.config import Config
from pyangstrom.caching import Cache
from pyangstrom.io import (
    recording_cache_exists,
    load_recording_csv,
    save_recording_cache,
    load_recording_cache,
    load_config,
)
from pyangstrom.transform import fully_extract_region
from pyangstrom.signal import signal_process_region
from pyangstrom.fit import autofit
from pyangstrom.visualization.recording import animate_recording
from pyangstrom.visualization.region import (
    plot_geometry,
    plot_spatiotemporal_heat_map,
    plot_isotherms,
    plot_groups,
)
from pyangstrom.visualization.signal import (
    plot_amplitude_ratios,
    plot_phase_differences,
)


# Module-level logger shared by the functions below; it is registered under
# the fixed name 'pipeline'.
logger = logging.getLogger('pipeline')

def fully_extract_result(func, data, *args):
    """Apply func to every non-list item of data, keeping the list nesting.

    A non-list data is passed straight to func as its first argument,
    followed by args. A list, at any depth, is rebuilt with each of its items
    mapped the same way.
    """
    if not isinstance(data, list):
        return func(data, *args)
    mapped = []
    for item in data:
        mapped.append(fully_extract_result(func, item, *args))
    return mapped

def visualize_recording(df_recording):
    """Pair the recording with the output of animate_recording for it.

    Returns a tuple whose first element is df_recording itself and whose
    second element is whatever animate_recording returns.
    """
    animation = animate_recording(df_recording)
    return df_recording, animation

def visualize_region(df_recording, region_result, region_information):
    """Draw a 2x2 summary figure for each extracted region.

    Parameters
    ----------
    df_recording
        Recording handed to plot_geometry.
    region_result
        A single region, or a (possibly nested) list of regions.
    region_information
        Mapping with either a 'geometry' or a 'geometries' entry, or a list of
        such mappings (one per item of a list region_result).

    Returns
    -------
    tuple
        region_result unchanged, paired with a single Figure for a single
        region, or with a list of figures following the nesting of a list
        input. An empty list yields an empty list of figures.
    """
    if isinstance(region_result, list):
        if isinstance(region_information, list):
            per_region_information = region_information
        else:
            # One mapping describing several geometries: give each region its
            # own single-geometry mapping so the recursion below can plot it.
            per_region_information = [
                {'geometry': g} for g in region_information['geometries']
            ]
        # Collected with a comprehension rather than zip(*...) unpacking so an
        # empty region list returns ([], []) instead of raising ValueError.
        figs = [
            visualize_region(df_recording, r, i)[1]
            for r, i in zip(region_result, per_region_information)
        ]
        return region_result, figs

    fig, ((ax1, ax2), (ax3, ax4)) = plt.subplots(2, 2, figsize=(16, 16))
    if 'geometry' in region_information:
        plot_geometry(ax1, df_recording, region_information['geometry'])
    else:
        plot_geometry(ax1, df_recording, region_information['geometries'])
    plot_spatiotemporal_heat_map(ax2, region_result)
    plot_isotherms(ax3, region_result)
    plot_groups(ax4, region_result)
    return region_result, fig

def visualize_signal(df_recording, signal_result, region_information):
    """Draw a 2x2 summary figure for each signal-processing result.

    Parameters
    ----------
    df_recording
        Recording handed to plot_geometry.
    signal_result
        A single signal result, or a (possibly nested) list of them. Each
        result's processed_region attribute is plotted as isotherms.
    region_information
        Mapping with either a 'geometry' or a 'geometries' entry, or a list of
        such mappings (one per item of a list signal_result).

    Returns
    -------
    tuple
        signal_result unchanged, paired with a single Figure for a single
        result, or with a list of figures following the nesting of a list
        input. An empty list yields an empty list of figures.
    """
    if isinstance(signal_result, list):
        if isinstance(region_information, list):
            per_region_information = region_information
        else:
            # One mapping describing several geometries: give each result its
            # own single-geometry mapping so the recursion below can plot it.
            per_region_information = [
                {'geometry': g} for g in region_information['geometries']
            ]
        # Collected with a comprehension rather than zip(*...) unpacking so an
        # empty result list returns ([], []) instead of raising ValueError.
        figs = [
            visualize_signal(df_recording, s, i)[1]
            for s, i in zip(signal_result, per_region_information)
        ]
        return signal_result, figs

    fig, ((ax1, ax2), (ax3, ax4)) = plt.subplots(2, 2, figsize=(16, 16))
    if 'geometry' in region_information:
        plot_geometry(ax1, df_recording, region_information['geometry'])
    else:
        plot_geometry(ax1, df_recording, region_information['geometries'])
    plot_isotherms(ax2, signal_result.processed_region)
    plot_amplitude_ratios(ax3, signal_result)
    plot_phase_differences(ax4, signal_result)
    return signal_result, fig

def visualize_fit(
        df_recording,
        signal_result,
        fitting_result,
        region_information,
):
    """Draw a 2x2 summary figure for each fitting result.

    Parameters
    ----------
    df_recording
        Recording handed to plot_geometry.
    signal_result
        A single signal result, or a (possibly nested) list of them, paired
        item by item with fitting_result.
    fitting_result
        A single fitting result, or a (possibly nested) list of them. Whether
        this is a list decides if the inputs are iterated.
    region_information
        Mapping with either a 'geometry' or a 'geometries' entry, or a list of
        such mappings (one per item of a list fitting_result).

    Returns
    -------
    tuple
        fitting_result unchanged, paired with a single Figure for a single
        result, or with a list of figures following the nesting of a list
        input. An empty list yields an empty list of figures.
    """
    if isinstance(fitting_result, list):
        if isinstance(region_information, list):
            per_region_information = region_information
        else:
            # One mapping describing several geometries: give each result its
            # own single-geometry mapping so the recursion below can plot it.
            per_region_information = [
                {'geometry': g} for g in region_information['geometries']
            ]
        # Collected with a comprehension rather than zip(*...) unpacking so an
        # empty result list returns ([], []) instead of raising ValueError.
        figs = [
            visualize_fit(df_recording, s, f, i)[1]
            for s, f, i
            in zip(signal_result, fitting_result, per_region_information)
        ]
        return fitting_result, figs

    # Layout notes:
    # ax1: geometry
    # ax2: isotherms (maybe observed + theoretical)
    # ax3: amp ratio / displacement (observed + theoretical)
    # ax4: phase diff / displacement (observed + theoretical)
    # axN: fitting method-specific plots
    fig, ((ax1, ax2), (ax3, ax4)) = plt.subplots(2, 2, figsize=(16, 16))
    if 'geometry' in region_information:
        plot_geometry(ax1, df_recording, region_information['geometry'])
    else:
        plot_geometry(ax1, df_recording, region_information['geometries'])
    plot_isotherms(ax2, signal_result.processed_region)
    plot_amplitude_ratios(ax3, signal_result, fitting_result)
    plot_phase_differences(ax4, signal_result, fitting_result)
    return fitting_result, fig

def _first_leaf(result):
    """Return the first non-list item of a possibly nested list.

    Returns None when an empty list is reached, so callers can skip work that
    needs a sample item instead of raising IndexError.
    """
    while isinstance(result, list):
        if not result:
            return None
        result = result[0]
    return result


def analyze_recording(
        recording_path: str | Path,
        config: str | Path | Config,
        *,
        debug: bool = False,
        return_visualization: bool = False,
        memory_cache: Optional[Cache] = None,
        recording_cache_path: Optional[str | Path] = None,
):
    """Compute the thermal conductivity of the recorded sample.

    The analysis runs in stages (load recording, extract regions, signal
    processing, fitting) and stops at the last stage for which the config
    provides the required fields, returning that stage's result.

    Parameters
    ----------
    recording_path
        Path to the directory holding CSV IR camera data. Does not have to be a
        real path if there is a cache file with a matching name in
        recording_cache_path.
    config
        Path to a JSON config file or the deserialized config object itself.

    Other Parameters
    ----------------
    debug
        If set to True, stops analyze_recording from catching general
        Exceptions, allowing errors to fall through and exposing the full stack
        trace.
    return_visualization
        If set to True, analyze_recording returns a tuple in which the normal
        results are the first element and their corresponding Matplotlib Figures
        are in the second element.
    memory_cache
        Holds intermediate results in memory, allowing for faster reruns.
        NOTE(review): this argument is accepted but never read by this
        function.
    recording_cache_path
        Path to a directory in which cached IR camera data will be saved.

    Returns
    -------
    The result of the last stage that ran:

    - the recording, when 'region_information' or 'experimental_setup' is
      missing from the config or region extraction fails;
    - the region result, when 'signal_processor' is missing or signal
      processing fails;
    - the signal result, when 'solver' or 'fitter' is missing or fitting
      fails;
    - the fitting result otherwise.

    A failing stage only falls back like this when debug is False; the
    exception is then reported through warnings.warn. With
    return_visualization set, the value is the tuple returned by the matching
    visualize_* function.
    """
    logger.info("Loading recording")
    if recording_cache_path:
        # Only the stem is used to look up the cache, which is why the
        # recording path is handled as a PurePath until the CSV is needed.
        recording_path = PurePath(recording_path)
        recording_cache_path = Path(recording_cache_path)
        if recording_cache_exists(recording_cache_path, recording_path.stem):
            logger.debug("Recording cache found")
            df_recording = load_recording_cache(
                recording_cache_path,
                recording_path.stem,
            )
            logger.debug(f"{df_recording[:1]=}")
        else:
            logger.debug("Recording cache not found")
            recording_path = Path(recording_path)
            df_recording = load_recording_csv(recording_path)
            save_recording_cache(
                df_recording,
                recording_cache_path,
                recording_path.stem,
            )
            logger.debug(f"Saved to cache: {df_recording[:1]=}")
    else:
        recording_path = Path(recording_path)
        df_recording = load_recording_csv(recording_path)
        logger.debug(f"Loaded csv: {df_recording[:1]=}")

    if not isinstance(config, dict):
        logger.info("Loading config")
        config = load_config(Path(config))
        logger.debug(f"{config=}")

    # Stage 1: region extraction. Without the fields it needs, the recording
    # itself is the result.
    if 'region_information' not in config:
        if return_visualization:
            return visualize_recording(df_recording)
        return df_recording
    if 'experimental_setup' not in config:
        warnings.warn("Field experimental_setup required to extract regions")
        if return_visualization:
            return visualize_recording(df_recording)
        return df_recording
    logger.info("Extracting region(s)")
    try:
        region_result = fully_extract_region(
            df_recording,
            config['region_information'],
            config['experimental_setup'],
        )
    except Exception as e:
        if debug:
            raise
        warnings.warn(repr(e))
        if return_visualization:
            return visualize_recording(df_recording)
        return df_recording

    # Stage 2: signal processing.
    if 'signal_processor' not in config:
        if return_visualization:
            return visualize_region(
                df_recording,
                region_result,
                config['region_information'],
            )
        return region_result
    # Debug-only peek at one region. Skipped when there is no region to sample
    # (empty result list) and when DEBUG logging is off, since building the
    # messages calls .flatten() on the region data.
    debug_region = _first_leaf(region_result)
    if debug_region is not None and logger.isEnabledFor(logging.DEBUG):
        logger.debug(f"{debug_region.timestamps[:1]=}")
        logger.debug(f"{debug_region.temperatures_kelvin.flatten()[:1]=}")
        logger.debug(f"{debug_region.margins.seconds_elapsed[:1]=}")
        logger.debug(f"{debug_region.margins.displacements_meters.flatten()[:1]=}")
    logger.info("Signal processing")
    try:
        signal_result = fully_extract_result(
            signal_process_region,
            region_result,
            config['signal_processor'],
            config['experimental_setup']
        )
    except Exception as e:
        if debug:
            raise
        warnings.warn(repr(e))
        if return_visualization:
            return visualize_region(
                df_recording,
                region_result,
                config['region_information'],
            )
        return region_result

    # Stage 3: fitting, which needs both a solver and a fitter.
    if 'solver' not in config or 'fitter' not in config:
        if return_visualization:
            return visualize_signal(
                df_recording,
                signal_result,
                config['region_information'],
            )
        return signal_result
    # Debug-only peek at one signal result, guarded like the region peek.
    debug_signal = _first_leaf(signal_result)
    if debug_signal is not None and logger.isEnabledFor(logging.DEBUG):
        for name, prop in debug_signal.signal_properties._asdict().items():
            logger.debug(f"{name}={prop[:3]}")
    logger.info("Fitting")
    try:
        fitting_result = fully_extract_result(
            autofit,
            signal_result,
            config['solver'],
            config['fitter'],
            config['experimental_setup'],
        )
    except Exception as e:
        if debug:
            raise
        warnings.warn(repr(e))
        if return_visualization:
            return visualize_signal(
                df_recording,
                signal_result,
                config['region_information'],
            )
        return signal_result

    if return_visualization:
        return visualize_fit(
            df_recording,
            signal_result,
            fitting_result,
            config['region_information'],
        )
    return fitting_result

Functions

def analyze_recording(recording_path: str | pathlib.Path, config: str | pathlib.Path | Config, *, debug: bool = False, return_visualization: bool = False, memory_cache: Optional[Cache] = None, recording_cache_path: Union[str, pathlib.Path, NoneType] = None)

Compute the thermal conductivity of the recorded sample.

Parameters

recording_path
Path to the directory holding CSV IR camera data. Does not have to be a real path if there is a cache file with a matching name in recording_cache_path.
config
Path to a JSON config file or the deserialized config object itself.

Other Parameters

debug
If set to True, stops analyze_recording from catching general Exceptions, allowing errors to fall through and exposing the full stack trace.
return_visualization
If set to True, analyze_recording returns a tuple in which the normal results are the first element and their corresponding Matplotlib Figures are in the second element.
memory_cache
Holds intermediate results in memory, allowing for faster reruns.
recording_cache_path
Path to a directory in which cached IR camera data will be saved.
Expand source code
def _first_leaf(result):
    """Return the first non-list item of a possibly nested list.

    Returns None when an empty list is reached, so callers can skip work that
    needs a sample item instead of raising IndexError.
    """
    while isinstance(result, list):
        if not result:
            return None
        result = result[0]
    return result


def analyze_recording(
        recording_path: str | Path,
        config: str | Path | Config,
        *,
        debug: bool = False,
        return_visualization: bool = False,
        memory_cache: Optional[Cache] = None,
        recording_cache_path: Optional[str | Path] = None,
):
    """Compute the thermal conductivity of the recorded sample.

    The analysis runs in stages (load recording, extract regions, signal
    processing, fitting) and stops at the last stage for which the config
    provides the required fields, returning that stage's result.

    Parameters
    ----------
    recording_path
        Path to the directory holding CSV IR camera data. Does not have to be a
        real path if there is a cache file with a matching name in
        recording_cache_path.
    config
        Path to a JSON config file or the deserialized config object itself.

    Other Parameters
    ----------------
    debug
        If set to True, stops analyze_recording from catching general
        Exceptions, allowing errors to fall through and exposing the full stack
        trace.
    return_visualization
        If set to True, analyze_recording returns a tuple in which the normal
        results are the first element and their corresponding Matplotlib Figures
        are in the second element.
    memory_cache
        Holds intermediate results in memory, allowing for faster reruns.
        NOTE(review): this argument is accepted but never read by this
        function.
    recording_cache_path
        Path to a directory in which cached IR camera data will be saved.

    Returns
    -------
    The result of the last stage that ran:

    - the recording, when 'region_information' or 'experimental_setup' is
      missing from the config or region extraction fails;
    - the region result, when 'signal_processor' is missing or signal
      processing fails;
    - the signal result, when 'solver' or 'fitter' is missing or fitting
      fails;
    - the fitting result otherwise.

    A failing stage only falls back like this when debug is False; the
    exception is then reported through warnings.warn. With
    return_visualization set, the value is the tuple returned by the matching
    visualize_* function.
    """
    logger.info("Loading recording")
    if recording_cache_path:
        # Only the stem is used to look up the cache, which is why the
        # recording path is handled as a PurePath until the CSV is needed.
        recording_path = PurePath(recording_path)
        recording_cache_path = Path(recording_cache_path)
        if recording_cache_exists(recording_cache_path, recording_path.stem):
            logger.debug("Recording cache found")
            df_recording = load_recording_cache(
                recording_cache_path,
                recording_path.stem,
            )
            logger.debug(f"{df_recording[:1]=}")
        else:
            logger.debug("Recording cache not found")
            recording_path = Path(recording_path)
            df_recording = load_recording_csv(recording_path)
            save_recording_cache(
                df_recording,
                recording_cache_path,
                recording_path.stem,
            )
            logger.debug(f"Saved to cache: {df_recording[:1]=}")
    else:
        recording_path = Path(recording_path)
        df_recording = load_recording_csv(recording_path)
        logger.debug(f"Loaded csv: {df_recording[:1]=}")

    if not isinstance(config, dict):
        logger.info("Loading config")
        config = load_config(Path(config))
        logger.debug(f"{config=}")

    # Stage 1: region extraction. Without the fields it needs, the recording
    # itself is the result.
    if 'region_information' not in config:
        if return_visualization:
            return visualize_recording(df_recording)
        return df_recording
    if 'experimental_setup' not in config:
        warnings.warn("Field experimental_setup required to extract regions")
        if return_visualization:
            return visualize_recording(df_recording)
        return df_recording
    logger.info("Extracting region(s)")
    try:
        region_result = fully_extract_region(
            df_recording,
            config['region_information'],
            config['experimental_setup'],
        )
    except Exception as e:
        if debug:
            raise
        warnings.warn(repr(e))
        if return_visualization:
            return visualize_recording(df_recording)
        return df_recording

    # Stage 2: signal processing.
    if 'signal_processor' not in config:
        if return_visualization:
            return visualize_region(
                df_recording,
                region_result,
                config['region_information'],
            )
        return region_result
    # Debug-only peek at one region. Skipped when there is no region to sample
    # (empty result list) and when DEBUG logging is off, since building the
    # messages calls .flatten() on the region data.
    debug_region = _first_leaf(region_result)
    if debug_region is not None and logger.isEnabledFor(logging.DEBUG):
        logger.debug(f"{debug_region.timestamps[:1]=}")
        logger.debug(f"{debug_region.temperatures_kelvin.flatten()[:1]=}")
        logger.debug(f"{debug_region.margins.seconds_elapsed[:1]=}")
        logger.debug(f"{debug_region.margins.displacements_meters.flatten()[:1]=}")
    logger.info("Signal processing")
    try:
        signal_result = fully_extract_result(
            signal_process_region,
            region_result,
            config['signal_processor'],
            config['experimental_setup']
        )
    except Exception as e:
        if debug:
            raise
        warnings.warn(repr(e))
        if return_visualization:
            return visualize_region(
                df_recording,
                region_result,
                config['region_information'],
            )
        return region_result

    # Stage 3: fitting, which needs both a solver and a fitter.
    if 'solver' not in config or 'fitter' not in config:
        if return_visualization:
            return visualize_signal(
                df_recording,
                signal_result,
                config['region_information'],
            )
        return signal_result
    # Debug-only peek at one signal result, guarded like the region peek.
    debug_signal = _first_leaf(signal_result)
    if debug_signal is not None and logger.isEnabledFor(logging.DEBUG):
        for name, prop in debug_signal.signal_properties._asdict().items():
            logger.debug(f"{name}={prop[:3]}")
    logger.info("Fitting")
    try:
        fitting_result = fully_extract_result(
            autofit,
            signal_result,
            config['solver'],
            config['fitter'],
            config['experimental_setup'],
        )
    except Exception as e:
        if debug:
            raise
        warnings.warn(repr(e))
        if return_visualization:
            return visualize_signal(
                df_recording,
                signal_result,
                config['region_information'],
            )
        return signal_result

    if return_visualization:
        return visualize_fit(
            df_recording,
            signal_result,
            fitting_result,
            config['region_information'],
        )
    return fitting_result
def fully_extract_result(func, data, *args)
Expand source code
def fully_extract_result(func, data, *args):
    """Apply func to every non-list item of data, keeping the list nesting.

    A non-list data is passed straight to func as its first argument,
    followed by args. A list, at any depth, is rebuilt with each of its items
    mapped the same way.
    """
    if not isinstance(data, list):
        return func(data, *args)
    mapped = []
    for item in data:
        mapped.append(fully_extract_result(func, item, *args))
    return mapped
def visualize_fit(df_recording, signal_result, fitting_result, region_information)
Expand source code
def visualize_fit(
        df_recording,
        signal_result,
        fitting_result,
        region_information,
):
    """Draw a 2x2 summary figure for each fitting result.

    Parameters
    ----------
    df_recording
        Recording handed to plot_geometry.
    signal_result
        A single signal result, or a (possibly nested) list of them, paired
        item by item with fitting_result.
    fitting_result
        A single fitting result, or a (possibly nested) list of them. Whether
        this is a list decides if the inputs are iterated.
    region_information
        Mapping with either a 'geometry' or a 'geometries' entry, or a list of
        such mappings (one per item of a list fitting_result).

    Returns
    -------
    tuple
        fitting_result unchanged, paired with a single Figure for a single
        result, or with a list of figures following the nesting of a list
        input. An empty list yields an empty list of figures.
    """
    if isinstance(fitting_result, list):
        if isinstance(region_information, list):
            per_region_information = region_information
        else:
            # One mapping describing several geometries: give each result its
            # own single-geometry mapping so the recursion below can plot it.
            per_region_information = [
                {'geometry': g} for g in region_information['geometries']
            ]
        # Collected with a comprehension rather than zip(*...) unpacking so an
        # empty result list returns ([], []) instead of raising ValueError.
        figs = [
            visualize_fit(df_recording, s, f, i)[1]
            for s, f, i
            in zip(signal_result, fitting_result, per_region_information)
        ]
        return fitting_result, figs

    # Layout notes:
    # ax1: geometry
    # ax2: isotherms (maybe observed + theoretical)
    # ax3: amp ratio / displacement (observed + theoretical)
    # ax4: phase diff / displacement (observed + theoretical)
    # axN: fitting method-specific plots
    fig, ((ax1, ax2), (ax3, ax4)) = plt.subplots(2, 2, figsize=(16, 16))
    if 'geometry' in region_information:
        plot_geometry(ax1, df_recording, region_information['geometry'])
    else:
        plot_geometry(ax1, df_recording, region_information['geometries'])
    plot_isotherms(ax2, signal_result.processed_region)
    plot_amplitude_ratios(ax3, signal_result, fitting_result)
    plot_phase_differences(ax4, signal_result, fitting_result)
    return fitting_result, fig
def visualize_recording(df_recording)
Expand source code
def visualize_recording(df_recording):
    """Pair the recording with the output of animate_recording for it.

    Returns a tuple whose first element is df_recording itself and whose
    second element is whatever animate_recording returns.
    """
    animation = animate_recording(df_recording)
    return df_recording, animation
def visualize_region(df_recording, region_result, region_information)
Expand source code
def visualize_region(df_recording, region_result, region_information):
    """Draw a 2x2 summary figure for each extracted region.

    Parameters
    ----------
    df_recording
        Recording handed to plot_geometry.
    region_result
        A single region, or a (possibly nested) list of regions.
    region_information
        Mapping with either a 'geometry' or a 'geometries' entry, or a list of
        such mappings (one per item of a list region_result).

    Returns
    -------
    tuple
        region_result unchanged, paired with a single Figure for a single
        region, or with a list of figures following the nesting of a list
        input. An empty list yields an empty list of figures.
    """
    if isinstance(region_result, list):
        if isinstance(region_information, list):
            per_region_information = region_information
        else:
            # One mapping describing several geometries: give each region its
            # own single-geometry mapping so the recursion below can plot it.
            per_region_information = [
                {'geometry': g} for g in region_information['geometries']
            ]
        # Collected with a comprehension rather than zip(*...) unpacking so an
        # empty region list returns ([], []) instead of raising ValueError.
        figs = [
            visualize_region(df_recording, r, i)[1]
            for r, i in zip(region_result, per_region_information)
        ]
        return region_result, figs

    fig, ((ax1, ax2), (ax3, ax4)) = plt.subplots(2, 2, figsize=(16, 16))
    if 'geometry' in region_information:
        plot_geometry(ax1, df_recording, region_information['geometry'])
    else:
        plot_geometry(ax1, df_recording, region_information['geometries'])
    plot_spatiotemporal_heat_map(ax2, region_result)
    plot_isotherms(ax3, region_result)
    plot_groups(ax4, region_result)
    return region_result, fig
def visualize_signal(df_recording, signal_result, region_information)
Expand source code
def visualize_signal(df_recording, signal_result, region_information):
    """Draw a 2x2 summary figure for each signal-processing result.

    Parameters
    ----------
    df_recording
        Recording handed to plot_geometry.
    signal_result
        A single signal result, or a (possibly nested) list of them. Each
        result's processed_region attribute is plotted as isotherms.
    region_information
        Mapping with either a 'geometry' or a 'geometries' entry, or a list of
        such mappings (one per item of a list signal_result).

    Returns
    -------
    tuple
        signal_result unchanged, paired with a single Figure for a single
        result, or with a list of figures following the nesting of a list
        input. An empty list yields an empty list of figures.
    """
    if isinstance(signal_result, list):
        if isinstance(region_information, list):
            per_region_information = region_information
        else:
            # One mapping describing several geometries: give each result its
            # own single-geometry mapping so the recursion below can plot it.
            per_region_information = [
                {'geometry': g} for g in region_information['geometries']
            ]
        # Collected with a comprehension rather than zip(*...) unpacking so an
        # empty result list returns ([], []) instead of raising ValueError.
        figs = [
            visualize_signal(df_recording, s, i)[1]
            for s, i in zip(signal_result, per_region_information)
        ]
        return signal_result, figs

    fig, ((ax1, ax2), (ax3, ax4)) = plt.subplots(2, 2, figsize=(16, 16))
    if 'geometry' in region_information:
        plot_geometry(ax1, df_recording, region_information['geometry'])
    else:
        plot_geometry(ax1, df_recording, region_information['geometries'])
    plot_isotherms(ax2, signal_result.processed_region)
    plot_amplitude_ratios(ax3, signal_result)
    plot_phase_differences(ax4, signal_result)
    return signal_result, fig