Source code for rheojax.io.readers.anton_paar

"""RheoCompass CSV parser for Anton Paar rheometer exports.

This module provides a complete parser for RheoCompass CSV exports with:
- Interval-based data block parsing
- Automatic encoding detection (UTF-16, UTF-8, Latin-1)
- Test type auto-detection (creep, relaxation, oscillation, rotation)
- Metadata extraction (geometry, gap, temperature)
- Unit normalization to SI
- Derived quantity computation (J(t), G(t), G*)

The parser handles RheoCompass-specific format features including tab-separated
values, "Interval and data points:" markers, and locale-aware decimal separators.
"""

from __future__ import annotations

import math
import re
import warnings
from collections.abc import Callable
from dataclasses import dataclass
from functools import lru_cache
from pathlib import Path
from typing import Any

import numpy as np
import pandas as pd

from rheojax.core.data import RheoData
from rheojax.io.readers._utils import normalize_temperature
from rheojax.logging import get_logger

logger = get_logger(__name__)


# =============================================================================
# Data Structures (T007)
# =============================================================================


@dataclass
class IntervalBlock:
    """Parsed contents of one interval section of a RheoCompass export.

    One instance corresponds to the text that follows a single
    "Interval and data points:" marker line.

    Attributes:
        interval_index: Interval number read from the marker line (1-based
            in the file).
        n_points: Data-point count announced on the marker line, or None
            when the marker carries no count. The value is stored as read;
            nothing in this class compares it with ``len(df)``.
        units: Mapping from column header, exactly as written in the file,
            to its unit string without the surrounding brackets. Columns
            for which no unit was found have no entry.
        df: Parsed numeric data, one column per header cell and one row per
            data line.
    """

    interval_index: int
    n_points: int | None
    units: dict[str, str]
    df: pd.DataFrame


# =============================================================================
# Column Mappings (T008)
# =============================================================================

# Maps canonical RheoJAX column names to the RheoCompass headers they cover.
# Each entry: canonical_name -> (regex patterns, default unit, applicable test types)
#   * The patterns are anchored (^...$) and are compiled case-insensitively
#     further down, so they must match a complete column name.
#   * Entry order matters: lookup walks this dict in insertion order and
#     returns the first canonical name with a matching pattern.
#   * The unit is the one reported when the file supplies no unit for the
#     column. Most are SI, but "temperature" is listed in °C and
#     "phase_angle" in degrees.
COLUMN_MAPPINGS: dict[str, tuple[list[str], str, list[str]]] = {
    "time": (
        [r"^time$", r"^t$", r"^zeit$"],
        "s",
        ["creep", "relaxation", "oscillation", "rotation"],
    ),
    "angular_frequency": (
        [r"^angular[\s_]?frequency$", r"^frequency$", r"^omega$", r"^ω$"],
        "rad/s",
        ["oscillation"],
    ),
    "shear_stress": (
        [r"^shear[\s_]?stress$", r"^stress$", r"^τ$", r"^tau$"],
        "Pa",
        ["creep", "relaxation", "rotation"],
    ),
    "shear_strain": (
        [r"^shear[\s_]?strain$", r"^strain$", r"^γ$", r"^gamma$"],
        "dimensionless",
        ["creep", "relaxation"],
    ),
    "shear_rate": (
        [r"^shear[\s_]?rate$", r"^γ̇$", r"^gamma[\s_]?dot$"],
        "1/s",
        ["rotation"],
    ),
    "compliance": (
        [r"^compliance$", r"^j\(?t\)?$"],
        "1/Pa",
        ["creep"],
    ),
    "relaxation_modulus": (
        [r"^relaxation[\s_]?modulus$", r"^g\(?t\)?$"],
        "Pa",
        ["relaxation"],
    ),
    # Shear moduli (G', G'', G*)
    "storage_modulus": (
        [r"^storage[\s_]?modulus$", r"^g'$", r"^g_prime$"],
        "Pa",
        ["oscillation"],
    ),
    "loss_modulus": (
        [r"^loss[\s_]?modulus$", r"^g''$", r'^g"$', r"^g_double_prime$"],
        "Pa",
        ["oscillation"],
    ),
    "complex_modulus": (
        [r"^complex[\s_]?modulus$", r"^g\*$", r"^\|g\*\|$"],
        "Pa",
        ["oscillation"],
    ),
    # Tensile moduli (E', E'')
    "tensile_storage_modulus": (
        [
            r"^e'$",
            r"^e_prime$",
            r"^e_stor$",
            r"^tensile[\s_]?storage[\s_]?modulus$",
            r"^young'?s?[\s_]?storage[\s_]?modulus$",
        ],
        "Pa",
        ["oscillation"],
    ),
    "tensile_loss_modulus": (
        [
            r"^e''$",
            r'^e"$',
            r"^e_double_prime$",
            r"^e_loss$",
            r"^tensile[\s_]?loss[\s_]?modulus$",
            r"^young'?s?[\s_]?loss[\s_]?modulus$",
        ],
        "Pa",
        ["oscillation"],
    ),
    "viscosity": (
        [r"^viscosity$", r"^η$", r"^eta$"],
        "Pa.s",
        ["rotation"],
    ),
    "complex_viscosity": (
        [r"^complex[\s_]?viscosity$", r"^η\*$", r"^eta\*$"],
        "Pa.s",
        ["oscillation"],
    ),
    "phase_angle": (
        [r"^phase[\s_]?angle$", r"^δ$", r"^delta$"],
        "deg",
        ["oscillation"],
    ),
    "temperature": (
        [r"^temperature$", r"^temp$"],
        "°C",
        ["creep", "relaxation", "oscillation", "rotation"],
    ),
    # Auxiliary channels
    "normal_force": (
        [r"^normal[\s_]?force$"],
        "N",
        ["creep", "relaxation", "oscillation", "rotation"],
    ),
    "torque": (
        [r"^torque$"],
        "N.m",
        ["rotation"],
    ),
    "strain_amplitude": (
        [r"^strain[\s_]?amplitude$"],
        "dimensionless",
        ["oscillation"],
    ),
    "stress_amplitude": (
        [r"^stress[\s_]?amplitude$"],
        "Pa",
        ["oscillation"],
    ),
}

# Compile every column regex once at import time (case-insensitive) so that
# header lookup never has to recompile a pattern.
_COLUMN_PATTERNS_COMPILED: dict[str, list[re.Pattern]] = {
    name: [re.compile(regex, re.IGNORECASE) for regex in spec[0]]
    for name, spec in COLUMN_MAPPINGS.items()
}

# Splits a header such as "Time [s]" or "Stress (Pa)" into the text before
# the first opening bracket (group 1) and the bracket contents (group 2).
_UNIT_EXTRACTION_PATTERN = re.compile(r"^(.*?)[\[(](.*?)[\])]")


# =============================================================================
# Unit Conversions (T009)
# =============================================================================

# Maps a lower-case source unit to (target_unit, conversion_factor).
# A value expressed in the source unit is multiplied by the factor to obtain
# the value in the target unit.
# NOTE(review): the keys are lower-case, so "mpa" cannot tell milli-pascal
# (mPa) from mega-pascal (MPa). The "mpa" entry uses the mega factor while the
# "mpa·s" / "mpa.s" entries use the milli factor — confirm this is intended.
# NOTE(review): "1/hz" is dimensionally a time, yet it is scaled exactly like
# "hz" — verify which files produce this unit.
UNIT_CONVERSIONS: dict[str, tuple[str, float]] = {
    "hz": ("rad/s", 2 * math.pi),  # cycles/s -> rad/s
    "1/hz": ("rad/s", 2 * math.pi),
    "ms": ("s", 0.001),
    "min": ("s", 60.0),
    "mins": ("s", 60.0),
    "minutes": ("s", 60.0),
    "kpa": ("Pa", 1000.0),
    "mpa": ("Pa", 1e6),
    "mpa·s": ("Pa.s", 0.001),
    "mpa.s": ("Pa.s", 0.001),
    "%": ("dimensionless", 0.01),  # percent -> fraction
}


# =============================================================================
# Encoding Detection (T010)
# =============================================================================


def _detect_encoding(filepath: Path) -> str:
    """Pick the first candidate encoding that decodes the start of the file.

    Candidates are tried in the order UTF-16, UTF-8 with BOM, UTF-8 and
    Latin-1; RheoCompass exports are typically UTF-16 with a BOM. Only the
    first 4096 characters are decoded, so a file whose head is valid in an
    earlier candidate is reported as that encoding.

    Args:
        filepath: Path to file

    Returns:
        Name of the first encoding whose trial read raised no Unicode error.
        Latin-1 accepts every byte sequence, so in practice one of the
        candidates always succeeds.

    Raises:
        OSError: If the file cannot be opened (propagated from ``open``).
    """
    for candidate in ("utf-16", "utf-8-sig", "utf-8", "latin-1"):
        try:
            with open(filepath, encoding=candidate) as handle:
                # Decoding a sample is enough to reject a wrong candidate.
                handle.read(4096)
        except UnicodeError:
            # UnicodeDecodeError is a subclass; move on to the next candidate.
            continue
        logger.debug("Detected encoding", encoding=candidate)
        return candidate

    # Safety net: only reached if every candidate above was rejected.
    logger.warning("Could not detect encoding, using latin-1 with error replacement")
    return "latin-1"


def _detect_encoding_cached(filepath_str: str) -> str:
    """Return the file's encoding, reusing a cached answer while it is unchanged.

    The file's current modification time and size are passed along as part of
    the cache key, so a file that is rewritten during the same process is
    detected again instead of receiving a stale result.

    Args:
        filepath_str: File path as string (for hashability)

    Returns:
        Detected encoding string

    Raises:
        OSError: If the file cannot be stat'ed (e.g. it does not exist).
    """
    info = Path(filepath_str).stat()
    return _detect_encoding_impl(filepath_str, info.st_mtime, info.st_size)


@lru_cache(maxsize=128)
def _detect_encoding_impl(filepath_str: str, mtime: float, file_size: int) -> str:
    """Run encoding detection, memoised on path, modification time and size.

    Args:
        filepath_str: File path as string
        mtime: File modification time; used only as part of the cache key
        file_size: File size in bytes; used only as part of the cache key

    Returns:
        Detected encoding string
    """
    # mtime and file_size are deliberately unused here: a change in either
    # produces a different cache key and therefore a fresh detection.
    target = Path(filepath_str)
    return _detect_encoding(target)


# =============================================================================
# Decimal Separator Detection (T015)
# =============================================================================


def _detect_decimal_separator(text_sample: str) -> str:
    """Detect decimal separator from text sample.

    European locales may use comma as decimal separator and period as
    thousands separator.

    Args:
        text_sample: Sample text containing numeric values

    Returns:
        Detected decimal separator ('.' or ',')
    """
    # Count patterns like "digit.digit" and "digit,digit"
    dot_pattern = re.findall(r"\d\.\d", text_sample)
    comma_pattern = re.findall(r"\d,\d", text_sample)

    dot_count = len(dot_pattern)
    comma_count = len(comma_pattern)

    if comma_count > dot_count * 2:
        decimal_sep = ","
    else:
        decimal_sep = "."

    if decimal_sep == "," and abs(comma_count - dot_count) < 5:
        warnings.warn(
            f"Decimal separator detection is uncertain (commas={comma_count}, dots={dot_count}). "
            f"Assuming '{decimal_sep}'. Pass decimal_sep= explicitly if incorrect.",
            stacklevel=3,
        )

    return decimal_sep


# =============================================================================
# Global Metadata Extraction (T012)
# =============================================================================


def _extract_global_metadata(lines: list[str]) -> dict[str, Any]:
    """Extract key:value metadata pairs before first interval marker.

    Args:
        lines: All lines from file

    Returns:
        Dictionary of metadata key-value pairs
    """
    metadata: dict[str, Any] = {}

    for line in lines:
        # Stop at first interval marker
        if line.strip().startswith("Interval and data points:"):
            break

        # Parse key:\tvalue or key:\tvalue format
        if "\t" in line:
            parts = line.split("\t", 1)
            if len(parts) == 2:
                key = parts[0].strip().rstrip(":")
                value = parts[1].strip()
                if key and value:
                    metadata[key] = value
        elif ":" in line and not line.strip().startswith("Interval"):
            parts = line.split(":", 1)
            if len(parts) == 2:
                key = parts[0].strip()
                value = parts[1].strip()
                if key and value:
                    metadata[key] = value

    return metadata


# =============================================================================
# Interval Boundary Detection (T013)
# =============================================================================


def _find_interval_boundaries(lines: list[str]) -> list[tuple[int, int, int | None]]:
    """Find all interval markers and their boundaries.

    Args:
        lines: All lines from file

    Returns:
        List of (start_line_idx, interval_index, n_points) tuples
    """
    boundaries = []
    marker_pattern = re.compile(r"Interval and data points:\s*(\d+)(?:\s+(\d+))?")

    for i, line in enumerate(lines):
        match = marker_pattern.search(line)
        if match:
            interval_idx = int(match.group(1))
            n_points = int(match.group(2)) if match.group(2) else None
            boundaries.append((i, interval_idx, n_points))

    return boundaries


# =============================================================================
# Single Interval Parsing (T014)
# =============================================================================


def _extract_unit(column_name: str) -> tuple[str, str | None]:
    """Extract base name and unit from column header.

    Handles both bracket [unit] and parentheses (unit) notation.

    Args:
        column_name: Column header like "Time [s]" or "Stress (Pa)"

    Returns:
        Tuple of (base_name, unit) where unit may be None
    """
    # Match [unit] or (unit) using pre-compiled pattern
    match = _UNIT_EXTRACTION_PATTERN.search(column_name)
    if match:
        base = match.group(1).strip()
        unit = match.group(2).strip()
        return base, unit
    return column_name.strip(), None


def _parse_single_interval(
    lines: list[str], start_idx: int, end_idx: int | None, decimal_sep: str
) -> IntervalBlock:
    """Parse a single interval block into an IntervalBlock.

    The block is expected to contain, in this order: the
    "Interval and data points:" marker line, an "Interval data:" line whose
    remaining tab-separated cells are the column headers, an optional units
    row (cells such as ``[s]``), and the tab-separated data rows.

    Units are assigned to columns by cell position whenever every filled cell
    of the units row sits under a named header cell. This keeps units on the
    right column when some column has no unit (an empty cell). If the units
    row does not fit the header layout, filled unit cells are paired with the
    headers in order instead.

    Args:
        lines: All lines from file
        start_idx: Start line index (at interval marker)
        end_idx: End line index (exclusive), None for end of file
        decimal_sep: Decimal separator to use

    Returns:
        Parsed IntervalBlock

    Raises:
        ValueError: If no "Interval data:" header line with at least one
            column is found, or if no data row yields a numeric value.
    """
    interval_lines = lines[start_idx : end_idx if end_idx else len(lines)]

    # Parse interval header
    header_match = re.search(
        r"Interval and data points:\s*(\d+)(?:\s+(\d+))?", interval_lines[0]
    )
    interval_idx = int(header_match.group(1)) if header_match else 1
    n_points = (
        int(header_match.group(2)) if header_match and header_match.group(2) else None
    )

    logger.debug("Parsing interval", interval_index=interval_idx, n_points=n_points)

    # Find "Interval data:" line with column headers
    data_start_idx = None
    column_headers: list[str] = []
    header_positions: list[int] = []
    units_dict: dict[str, str] = {}

    for i, line in enumerate(interval_lines[1:], 1):
        if line.strip().startswith("Interval data:"):
            header_cells = line.split("\t")
            # Cell 0 holds the "Interval data:" prefix. Remember where each
            # named column sits so the units row can be aligned by position.
            header_positions = [
                pos
                for pos, cell in enumerate(header_cells)
                if pos > 0 and cell.strip()
            ]
            column_headers = [header_cells[pos].strip() for pos in header_positions]
            data_start_idx = i + 1
            break

    if data_start_idx is None or not column_headers:
        raise ValueError(
            f"Could not find 'Interval data:' header in interval {interval_idx}"
        )

    # Check for units line (starts with tab and contains [unit])
    if data_start_idx < len(interval_lines):
        potential_units_line = interval_lines[data_start_idx]
        if potential_units_line.strip().startswith("[") or (
            "\t[" in potential_units_line
            and not potential_units_line.strip()[0].isdigit()
        ):
            unit_cells = [cell.strip() for cell in potential_units_line.split("\t")]
            filled_positions = {pos for pos, cell in enumerate(unit_cells) if cell}

            if filled_positions <= set(header_positions):
                # Same cell layout as the header row: an empty unit cell
                # belongs to its own column and must not shift the others.
                aligned = [
                    (col, unit_cells[pos])
                    for col, pos in zip(column_headers, header_positions, strict=True)
                    if pos < len(unit_cells)
                ]
            else:
                # Layout differs from the header row: pair filled cells with
                # the headers in order.
                aligned = list(
                    zip(
                        column_headers,
                        [cell for cell in unit_cells if cell],
                        strict=False,
                    )
                )

            for col, unit_str in aligned:
                if unit_str.startswith("[") and unit_str.endswith("]"):
                    units_dict[col] = unit_str[1:-1]
                elif unit_str.startswith("(") and unit_str.endswith(")"):
                    units_dict[col] = unit_str[1:-1]
                elif unit_str:
                    units_dict[col] = unit_str
            data_start_idx += 1

    # Extract column units from headers if not in separate line
    for col in column_headers:
        _base_name, unit = _extract_unit(col)
        if unit and col not in units_dict:
            units_dict[col] = unit

    # Collect data rows
    data_rows = []
    for line in interval_lines[data_start_idx:]:
        stripped = line.strip()
        if not stripped:
            continue
        # Stop at next interval marker or metadata-like lines
        if stripped.startswith("Interval and data points:"):
            break

        # Parse numeric values; empty cells are skipped, so values are
        # collected in order of appearance.
        row_values = []
        for cell in line.split("\t"):
            cell = cell.strip()
            if not cell:
                continue
            # IO-R6-001: Normalize decimal separator safely.
            # Try parsing as-is first (handles both "0.5" and "1000.5").
            # Only apply EU normalization (remove thousands dots, convert comma)
            # if direct parsing fails.
            try:
                row_values.append(float(cell))
            except ValueError:
                pass
            else:
                continue
            if decimal_sep == ",":
                # IO-001: Only strip thousands-separator dots (before the comma),
                # not decimal dots in scientific notation mantissa (e.g. "1.23E+04")
                inner_parts = cell.split(",", 1)
                inner_parts[0] = inner_parts[0].replace(".", "")
                cell = ".".join(inner_parts)
            try:
                row_values.append(float(cell))
            except ValueError:
                # Non-numeric value - could be end of data; keep what was
                # read from this row so far and ignore the rest of the row.
                break

        if row_values and len(row_values) == len(column_headers):
            data_rows.append(row_values)
        elif row_values:
            # Partial row - pad with NaN
            n_missing = len(column_headers) - len(row_values)
            logger.warning(
                "Partial row padded with NaN",
                interval=interval_idx,
                row_index=len(data_rows),
                expected_cols=len(column_headers),
                actual_cols=len(row_values),
                n_padded=n_missing,
            )
            while len(row_values) < len(column_headers):
                row_values.append(float("nan"))
            data_rows.append(row_values)

    if not data_rows:
        raise ValueError(f"No valid data rows found in interval {interval_idx}")

    # Create DataFrame
    df = pd.DataFrame(data_rows, columns=column_headers)

    logger.debug(
        "Interval parsed",
        interval_index=interval_idx,
        n_rows=len(df),
        n_cols=len(df.columns),
    )

    return IntervalBlock(
        interval_index=interval_idx,
        n_points=n_points,
        units=units_dict,
        df=df,
    )


# =============================================================================
# Main Interval Parser (T016)
# =============================================================================


def parse_rheocompass_intervals(
    filepath: str | Path,
    *,
    encoding: str | None = None,
    marker: str = "Interval and data points:",
    decimal_sep: str | None = None,
) -> tuple[dict[str, Any], list[IntervalBlock]]:
    """Parse RheoCompass file returning raw interval blocks.

    Low-level parser for advanced users who need full access to all columns
    and metadata without RheoData mapping.

    Intervals that cannot be parsed are skipped with a warning as long as at
    least half of them succeed. The returned metadata records what was
    skipped under the keys ``skipped_intervals``, ``n_intervals_total`` and
    ``n_intervals_skipped``.

    Args:
        filepath: Path to RheoCompass CSV export file
        encoding: File encoding override (auto-detected if None)
        marker: Interval start marker string. A non-default marker is
            honoured by rewriting it to the standard marker text before the
            file is scanned.
        decimal_sep: Decimal separator override ('.' or ','); detected from
            the first 4096 characters of the file if None.

    Returns:
        Tuple of (global_metadata, interval_blocks)

    Raises:
        FileNotFoundError: File does not exist
        ValueError: No interval blocks found, none could be parsed, or more
            than half of them failed to parse
        LookupError: ``encoding`` names an unknown codec (raised by ``open``)
    """
    default_marker = "Interval and data points:"

    filepath = Path(filepath)
    logger.info("Opening file", filepath=str(filepath))

    if not filepath.exists():
        logger.error("File not found", filepath=str(filepath))
        raise FileNotFoundError(f"File not found: {filepath}")

    # Detect encoding (using cached version for repeated file access)
    if encoding is None:
        encoding = _detect_encoding_cached(str(filepath.resolve()))

    # Read entire file; undecodable bytes are replaced rather than raising
    with open(filepath, encoding=encoding, errors="replace") as f:
        content = f.read()

    lines = content.splitlines()

    # The helper functions recognise only the standard marker text, so a
    # caller-supplied marker is normalised to it here.
    if marker and marker != default_marker:
        lines = [line.replace(marker, default_marker) for line in lines]

    # Detect decimal separator from content sample unless the caller chose one
    if decimal_sep is None:
        decimal_sep = _detect_decimal_separator(content[:4096])
        logger.debug("Detected decimal separator", decimal_sep=decimal_sep)

    # Extract global metadata
    global_metadata = _extract_global_metadata(lines)

    # Find interval boundaries
    boundaries = _find_interval_boundaries(lines)

    if not boundaries:
        logger.error("No interval blocks found", filepath=str(filepath))
        raise ValueError(
            f"No interval blocks found in file. "
            f"Expected '{marker}' markers in RheoCompass format."
        )

    logger.debug("Found interval boundaries", n_intervals=len(boundaries))

    # Parse each interval, tracking skipped intervals for data integrity
    blocks = []
    skipped_intervals = []
    for i, (start_idx, interval_idx, _n_points) in enumerate(boundaries):
        # An interval ends where the next marker starts (None = end of file)
        end_idx = boundaries[i + 1][0] if i + 1 < len(boundaries) else None
        try:
            block = _parse_single_interval(lines, start_idx, end_idx, decimal_sep)
        except ValueError as e:
            logger.warning(
                "Skipping unparseable interval — data will be incomplete",
                filepath=str(filepath),
                interval=interval_idx,
                error=str(e),
            )
            skipped_intervals.append((interval_idx, str(e)))
        else:
            blocks.append(block)

    if not blocks:
        logger.error("Failed to parse any interval blocks", filepath=str(filepath))
        raise ValueError("Failed to parse any interval blocks from file")

    # Warn loudly if a significant fraction of intervals was lost
    n_total = len(boundaries)
    n_skipped = len(skipped_intervals)
    if n_skipped > 0:
        skip_pct = 100.0 * n_skipped / n_total
        logger.warning(
            "Some intervals could not be parsed",
            filepath=str(filepath),
            skipped=n_skipped,
            total=n_total,
            skip_percent=f"{skip_pct:.0f}%",
            skipped_ids=[s[0] for s in skipped_intervals],
        )
        if n_skipped > n_total / 2:
            raise ValueError(
                f"More than half of the intervals ({n_skipped}/{n_total}) "
                f"failed to parse. The file may be corrupt or in an "
                f"unsupported format. Skipped intervals: "
                f"{[s[0] for s in skipped_intervals]}"
            )

    logger.info(
        "File parsed",
        filepath=str(filepath),
        n_intervals=len(blocks),
        n_skipped=n_skipped,
    )

    global_metadata["skipped_intervals"] = list(skipped_intervals)
    global_metadata["n_intervals_total"] = n_total
    global_metadata["n_intervals_skipped"] = n_skipped

    return global_metadata, blocks
# ============================================================================= # Column Mapping (T022) # ============================================================================= def _map_column_to_canonical(column_name: str) -> str | None: """Map a RheoCompass column name to canonical name. Args: column_name: Original column name (may include unit) Returns: Canonical name or None if no match """ # Extract base name without unit base_name, _ = _extract_unit(column_name) base_lower = base_name.lower().strip() # Use pre-compiled patterns for performance for canonical, patterns in _COLUMN_PATTERNS_COMPILED.items(): for pattern in patterns: if pattern.match(base_lower): return canonical return None def _convert_unit( values: np.ndarray, source_unit: str | None, target_unit: str ) -> tuple[np.ndarray, str]: """Convert values from source unit to target SI unit. Args: values: Array of values source_unit: Source unit string (may be None) target_unit: Target SI unit Returns: Tuple of (converted_values, actual_unit) """ if source_unit is None: return values, target_unit source_lower = source_unit.lower().strip() if source_lower in UNIT_CONVERSIONS: target, factor = UNIT_CONVERSIONS[source_lower] return values * factor, target return values, source_unit def _map_columns_to_canonical( df: pd.DataFrame, units_dict: dict[str, str] ) -> tuple[pd.DataFrame, dict[str, str]]: """Map DataFrame columns to canonical names with SI units. 
Args: df: Original DataFrame units_dict: Column name to unit mapping Returns: Tuple of (mapped DataFrame, canonical units dict) """ mapped_df = pd.DataFrame() mapped_units: dict[str, str] = {} for col in df.columns: canonical = _map_column_to_canonical(col) source_unit = units_dict.get(col) if canonical: # Get target SI unit _, target_unit, _ = COLUMN_MAPPINGS[canonical] values = df[col].values converted, actual_unit = _convert_unit(values, source_unit, target_unit) mapped_df[canonical] = converted mapped_units[canonical] = actual_unit else: # Keep original column name (for auxiliary data) base_name, _ = _extract_unit(col) mapped_df[base_name] = df[col].values if source_unit: mapped_units[base_name] = source_unit return mapped_df, mapped_units # ============================================================================= # Derived Quantity Computation (T023, T024, T031) # ============================================================================= def _compute_compliance(df: pd.DataFrame) -> pd.DataFrame: """Calculate compliance J(t) = strain/stress when absent. Args: df: DataFrame with canonical column names Returns: DataFrame with compliance column added if computed """ if "compliance" in df.columns: return df if "shear_strain" in df.columns and "shear_stress" in df.columns: strain = df["shear_strain"].values stress = df["shear_stress"].values # Avoid division by zero with np.errstate(divide="ignore", invalid="ignore"): compliance = np.where(stress != 0, strain / stress, np.nan) df = df.copy() df["compliance"] = compliance logger.debug("Computed compliance J(t) = strain/stress") return df def _compute_relaxation_modulus(df: pd.DataFrame) -> pd.DataFrame: """Calculate relaxation modulus G(t) = stress/strain when absent. 
Args: df: DataFrame with canonical column names Returns: DataFrame with relaxation_modulus column added if computed """ if "relaxation_modulus" in df.columns: return df if "shear_stress" in df.columns and "shear_strain" in df.columns: stress = df["shear_stress"].values strain = df["shear_strain"].values # Avoid division by zero — use NaN (not 0.0) at t=0 where strain is zero. # IO-R6-009: 0.0 fabricates a physically nonsensical G(t)=0 that biases # downstream NLSQ/Bayesian fits. NaN correctly signals "undefined". with np.errstate(divide="ignore", invalid="ignore"): modulus = np.where(strain != 0, stress / strain, np.nan) df = df.copy() df["relaxation_modulus"] = modulus logger.debug("Computed relaxation modulus G(t) = stress/strain") return df def _compute_complex_modulus( df: pd.DataFrame, ) -> tuple[np.ndarray | None, str | None]: """Calculate complex modulus G* = G' + i*G'' or E* = E' + i*E''. Checks shear modulus columns first, then tensile modulus columns. Args: df: DataFrame with canonical column names Returns: Tuple of (complex array, deformation_mode) where deformation_mode is "shear", "tension", or None if cannot compute. """ if "storage_modulus" in df.columns and "loss_modulus" in df.columns: g_prime = df["storage_modulus"].values g_double_prime = df["loss_modulus"].values return g_prime + 1j * g_double_prime, "shear" if "tensile_storage_modulus" in df.columns and "tensile_loss_modulus" in df.columns: e_prime = df["tensile_storage_modulus"].values e_double_prime = df["tensile_loss_modulus"].values return e_prime + 1j * e_double_prime, "tension" return None, None # ============================================================================= # Test Type Detection (T041, T042) # ============================================================================= def _is_column_constant(series: pd.Series, threshold: float = 0.01) -> bool: """Check if a column has constant values (low variance). 
Args: series: Pandas series to check threshold: Relative variance threshold (default 1%) Returns: True if column appears constant """ values = series.dropna().values if len(values) < 2: return True mean_val = np.mean(np.abs(values)) if mean_val == 0: return True std_val = np.std(values) # Absolute tolerance for near-zero data if std_val < 1e-10: return True return (std_val / mean_val) < threshold def _detect_test_type(df: pd.DataFrame) -> str | None: """Detect test type from column presence and data characteristics. Detection rules (evaluated in priority order): 1. Oscillatory: Has G'/G'' and frequency 2. Creep: Has compliance/strain with constant stress 3. Relaxation: Has G(t)/stress with constant strain 4. Rotation: Has shear rate and viscosity/stress Args: df: DataFrame with canonical column names Returns: Test mode string or None if ambiguous """ columns = set(df.columns) # Priority 1: Oscillatory (frequency domain) — includes tensile moduli (DMTA) has_frequency = "angular_frequency" in columns has_moduli = "storage_modulus" in columns or "loss_modulus" in columns has_tensile_moduli = ( "tensile_storage_modulus" in columns or "tensile_loss_modulus" in columns ) if has_frequency and (has_moduli or has_tensile_moduli): return "oscillation" # Priority 2: Creep (time domain, constant stress) has_time = "time" in columns has_compliance_data = "compliance" in columns or "shear_strain" in columns if has_time and has_compliance_data: if "shear_stress" in columns: if _is_column_constant(df["shear_stress"]): return "creep" elif "compliance" in columns: # Has explicit compliance column - likely creep return "creep" # Priority 3: Relaxation (time domain, constant strain) has_relaxation_data = "relaxation_modulus" in columns or "shear_stress" in columns if has_time and has_relaxation_data: if "shear_strain" in columns: if _is_column_constant(df["shear_strain"]): return "relaxation" elif "relaxation_modulus" in columns: # Has explicit G(t) column - likely relaxation return 
"relaxation" # Priority 4: Rotation (flow test) has_shear_rate = "shear_rate" in columns has_flow_data = "viscosity" in columns or "shear_stress" in columns if has_shear_rate and has_flow_data: # Make sure it's not oscillatory if not has_moduli: return "rotation" return None # ============================================================================= # Metadata Extraction (T050, T051, T052) # ============================================================================= def _extract_geometry_metadata(global_meta: dict[str, Any]) -> dict[str, Any]: """Extract geometry information from global metadata. Args: global_meta: Global metadata dictionary Returns: Dictionary with geometry, gap, diameter keys """ geometry_meta: dict[str, Any] = {} # Common geometry keys for key in ["Geometry", "geometry", "Measuring System"]: if key in global_meta: geometry_meta["geometry"] = global_meta[key] break for key in ["Gap", "gap", "Measuring Gap"]: if key in global_meta: geometry_meta["gap"] = global_meta[key] break for key in ["Diameter", "diameter"]: if key in global_meta: geometry_meta["diameter"] = global_meta[key] break return geometry_meta def _extract_temperature_metadata( global_meta: dict[str, Any], df: pd.DataFrame ) -> dict[str, Any]: """Extract temperature from header and per-point data, normalizing to Kelvin. Preserves the original Celsius value as ``temperature_celsius`` when a Celsius reading is detected. The ``temperature`` key always holds Kelvin. Args: global_meta: Global metadata dictionary df: DataFrame with data columns Returns: Dictionary with temperature info (temperature in Kelvin) """ temp_meta: dict[str, Any] = {} # Header temperature — detect unit and convert to Kelvin when Celsius for key in ["Temperature", "temperature", "Temp"]: if key in global_meta: raw_value = global_meta[key] # Check for an explicit unit key (e.g. 
"temperature_unit": "°C") unit_key = ( f"{key}_unit" if f"{key}_unit" in global_meta else "temperature_unit" ) raw_unit = global_meta.get(unit_key, "") # Determine whether the value is in Celsius is_celsius = "°C" in str(raw_unit) or raw_unit.strip().lower() in ( "c", "°c", "celsius", ) if not is_celsius: # Fall back: if the raw value looks like it could be Celsius # (e.g. stored as a plain float without an explicit unit key) # and no unit says Kelvin/Fahrenheit, assume Celsius. unit_lower = str(raw_unit).strip().lower() is_kelvin = unit_lower in ("k", "kelvin") is_fahrenheit = unit_lower in ("f", "°f", "fahrenheit") if not is_kelvin and not is_fahrenheit: # No unit information — treat as Celsius (most common for # RheoCompass exports which always record in °C) is_celsius = True try: numeric_value = float(raw_value) if is_celsius: temp_meta["temperature_celsius"] = numeric_value temp_meta["temperature"] = normalize_temperature(numeric_value, "C") else: temp_meta["temperature"] = numeric_value except (TypeError, ValueError): # Non-numeric value (e.g. a string label) — store as-is temp_meta["temperature"] = raw_value break # Per-point temperature — convert column data to Kelvin if in Celsius if "temperature" in df.columns: temp_values = df["temperature"].values.astype(float) # Per-point data from RheoCompass is always in °C; convert to K temp_meta["temperature_celsius_data"] = temp_values temp_meta["temperature_data"] = temp_values + 273.15 return temp_meta def _extract_auxiliary_columns( df: pd.DataFrame, units_dict: dict[str, str] ) -> dict[str, Any]: """Extract auxiliary columns (normal force, torque) into metadata. 
Args: df: DataFrame with canonical column names units_dict: Column units Returns: Dictionary with auxiliary data """ aux_meta: dict[str, Any] = {} for col in ["normal_force", "torque", "phase_angle", "complex_viscosity"]: if col in df.columns: aux_meta[col] = df[col].values if col in units_dict: aux_meta[f"{col}_units"] = units_dict[col] return aux_meta # ============================================================================= # RheoData Converters (T025, T026, T032, T058) # ============================================================================= def _interval_to_rheodata_creep( block: IntervalBlock, global_meta: dict[str, Any], mapped_df: pd.DataFrame, mapped_units: dict[str, str], ) -> RheoData: """Convert interval block to RheoData for creep test. Args: block: Parsed interval block global_meta: Global file metadata mapped_df: DataFrame with canonical columns mapped_units: Units for canonical columns Returns: RheoData configured for creep analysis """ # Compute compliance if needed mapped_df = _compute_compliance(mapped_df) # Extract x (time) and y (compliance) x = ( mapped_df["time"].values if "time" in mapped_df.columns else np.arange(len(mapped_df)) ) # Prefer compliance over raw strain if "compliance" in mapped_df.columns: y = mapped_df["compliance"].values y_units = mapped_units.get("compliance", "1/Pa") else: y = mapped_df["shear_strain"].values y_units = mapped_units.get("shear_strain", "dimensionless") x_units = mapped_units.get("time", "s") # Build metadata metadata = { "source": "rheocompass", "interval_index": block.interval_index, "test_mode": "creep", **_extract_geometry_metadata(global_meta), **_extract_temperature_metadata(global_meta, mapped_df), **_extract_auxiliary_columns(mapped_df, mapped_units), "columns": list(mapped_df.columns), "global_metadata": global_meta, } return RheoData( x=x, y=y, x_units=x_units, y_units=y_units, domain="time", initial_test_mode="creep", metadata=metadata, ) def _interval_to_rheodata_relaxation( block: 
IntervalBlock, global_meta: dict[str, Any], mapped_df: pd.DataFrame, mapped_units: dict[str, str], ) -> RheoData: """Convert interval block to RheoData for relaxation test. Args: block: Parsed interval block global_meta: Global file metadata mapped_df: DataFrame with canonical columns mapped_units: Units for canonical columns Returns: RheoData configured for relaxation analysis """ # Compute relaxation modulus if needed mapped_df = _compute_relaxation_modulus(mapped_df) # Extract x (time) and y (G(t)) x = ( mapped_df["time"].values if "time" in mapped_df.columns else np.arange(len(mapped_df)) ) # Prefer relaxation_modulus, then shear_stress, fallback to first y-like column if "relaxation_modulus" in mapped_df.columns: y = mapped_df["relaxation_modulus"].values y_units = mapped_units.get("relaxation_modulus", "Pa") elif "shear_stress" in mapped_df.columns: y = mapped_df["shear_stress"].values y_units = mapped_units.get("shear_stress", "Pa") else: # Fallback: use second column if available cols = [c for c in mapped_df.columns if c != "time"] if cols: y = mapped_df[cols[0]].values y_units = mapped_units.get(cols[0], "Pa") else: raise ValueError( f"No y-data column found in relaxation interval. " f"Available columns: {list(mapped_df.columns)}. " "Expected 'relaxation_modulus' or 'shear_stress'." 
) x_units = mapped_units.get("time", "s") # Build metadata metadata = { "source": "rheocompass", "interval_index": block.interval_index, "test_mode": "relaxation", **_extract_geometry_metadata(global_meta), **_extract_temperature_metadata(global_meta, mapped_df), **_extract_auxiliary_columns(mapped_df, mapped_units), "columns": list(mapped_df.columns), "global_metadata": global_meta, } return RheoData( x=x, y=y, x_units=x_units, y_units=y_units, domain="time", initial_test_mode="relaxation", metadata=metadata, ) def _interval_to_rheodata_oscillation( block: IntervalBlock, global_meta: dict[str, Any], mapped_df: pd.DataFrame, mapped_units: dict[str, str], ) -> RheoData: """Convert interval block to RheoData for oscillatory test. Args: block: Parsed interval block global_meta: Global file metadata mapped_df: DataFrame with canonical columns mapped_units: Units for canonical columns Returns: RheoData configured for oscillatory analysis with complex G* """ # Extract x (frequency) x = ( mapped_df["angular_frequency"].values if "angular_frequency" in mapped_df.columns else np.arange(len(mapped_df)) ) # Compute complex modulus G* = G' + i*G'' or E* = E' + i*E'' modulus_star, deformation_mode = _compute_complex_modulus(mapped_df) if modulus_star is not None: y = modulus_star elif "complex_modulus" in mapped_df.columns: y = mapped_df["complex_modulus"].values deformation_mode = "shear" else: # Fallback to storage modulus only (shear or tensile) if "storage_modulus" in mapped_df.columns: y = mapped_df["storage_modulus"].values deformation_mode = "shear" elif "tensile_storage_modulus" in mapped_df.columns: y = mapped_df["tensile_storage_modulus"].values deformation_mode = "tension" else: raise ValueError( "Oscillation data requires 'storage_modulus'/'loss_modulus' or " "'tensile_storage_modulus'/'tensile_loss_modulus' columns. 
" f"Available columns: {list(mapped_df.columns)}" ) x_units = mapped_units.get("angular_frequency", "rad/s") y_units = "Pa" # Complex modulus in Pa # Build metadata with G' and G'' accessible metadata = { "source": "rheocompass", "interval_index": block.interval_index, "test_mode": "oscillation", **_extract_geometry_metadata(global_meta), **_extract_temperature_metadata(global_meta, mapped_df), **_extract_auxiliary_columns(mapped_df, mapped_units), "columns": list(mapped_df.columns), "global_metadata": global_meta, } # Set deformation_mode if detected from column names if deformation_mode is not None: metadata["deformation_mode"] = deformation_mode return RheoData( x=x, y=y, x_units=x_units, y_units=y_units, domain="frequency", initial_test_mode="oscillation", metadata=metadata, ) def _interval_to_rheodata_rotation( block: IntervalBlock, global_meta: dict[str, Any], mapped_df: pd.DataFrame, mapped_units: dict[str, str], ) -> RheoData: """Convert interval block to RheoData for rotational/flow test. Args: block: Parsed interval block global_meta: Global file metadata mapped_df: DataFrame with canonical columns mapped_units: Units for canonical columns Returns: RheoData configured for flow analysis """ # Extract x (shear rate) and y (viscosity) x = ( mapped_df["shear_rate"].values if "shear_rate" in mapped_df.columns else np.arange(len(mapped_df)) ) if "viscosity" in mapped_df.columns: y = mapped_df["viscosity"].values y_units = mapped_units.get("viscosity", "Pa.s") elif "shear_stress" in mapped_df.columns: y = mapped_df["shear_stress"].values y_units = mapped_units.get("shear_stress", "Pa") else: raise ValueError( f"Rotation interval has no recognized y-data column " f"(expected 'viscosity' or 'shear_stress'). 
" f"Available columns: {list(mapped_df.columns)}" ) x_units = mapped_units.get("shear_rate", "1/s") # Build metadata metadata = { "source": "rheocompass", "interval_index": block.interval_index, "test_mode": "rotation", **_extract_geometry_metadata(global_meta), **_extract_temperature_metadata(global_meta, mapped_df), **_extract_auxiliary_columns(mapped_df, mapped_units), "columns": list(mapped_df.columns), "global_metadata": global_meta, } return RheoData( x=x, y=y, x_units=x_units, y_units=y_units, domain="time", # Flow curves are rate-domain but use time paradigm initial_test_mode="rotation", metadata=metadata, ) # ============================================================================= # Main API (T065) # =============================================================================
def load_anton_paar(
    filepath: str | Path,
    *,
    test_mode: str | None = None,
    interval: int | None = None,
    return_all: bool = False,
    encoding: str | None = None,
    x_col: str | None = None,
    y_col: str | None = None,
    progress_callback: Callable[[int, int], None] | None = None,
) -> RheoData | list[RheoData]:
    """Load RheoCompass CSV export file and return RheoData object(s).

    Handles interval-based file structure, auto-detects test type,
    extracts metadata, and normalizes units to SI.

    Args:
        filepath: Path to RheoCompass CSV export file
        test_mode: Explicit test mode override ("creep", "relaxation",
            "oscillation", "rotation"). If None, auto-detected from columns;
            when detection fails a UserWarning is issued and "relaxation"
            is assumed.
        interval: Specific interval index to load (1-based). If None,
            every interval found in the file is converted.
        return_all: If True, always return list of RheoData.
        encoding: File encoding override (auto-detected if None).
        x_col: Canonical column to use as x-axis instead of the automatic
            choice. Ignored (with a logged warning) if the column is not
            present in an interval.
        y_col: Canonical column to use as y-axis instead of the automatic
            choice. Ignored (with a logged warning) if the column is not
            present in an interval.
        progress_callback: Callback receiving (current, total), invoked
            once per interval before that interval is converted.

    Returns:
        Single RheoData when exactly one interval is converted and
        return_all is False; otherwise a list of RheoData (one per
        converted interval).

    Raises:
        FileNotFoundError: File does not exist
        ValueError: No interval blocks, unknown test mode, or
            interval index out of range
    """
    # Test mode -> converter that builds the RheoData for one interval
    converters = {
        "creep": _interval_to_rheodata_creep,
        "relaxation": _interval_to_rheodata_relaxation,
        "oscillation": _interval_to_rheodata_oscillation,
        "rotation": _interval_to_rheodata_rotation,
    }

    # Parse raw intervals
    global_meta, blocks = parse_rheocompass_intervals(filepath, encoding=encoding)

    if not blocks:
        raise ValueError("No interval blocks found in file")

    # Handle interval selection
    if interval is not None:
        matching = [b for b in blocks if b.interval_index == interval]
        if not matching:
            valid_indices = [b.interval_index for b in blocks]
            logger.error(
                "Interval not found", interval=interval, valid_indices=valid_indices
            )
            raise ValueError(
                f"Interval {interval} not found. Valid intervals: {valid_indices}"
            )
        blocks = matching

    total_blocks = len(blocks)
    results: list[RheoData] = []

    for i, block in enumerate(blocks):
        if progress_callback:
            progress_callback(i + 1, total_blocks)

        # Map columns to canonical names
        mapped_df, mapped_units = _map_columns_to_canonical(block.df, block.units)

        # An override only applies when the requested column exists in this
        # interval; otherwise the automatic selection is kept and we say so.
        use_x_override = x_col is not None and x_col in mapped_df.columns
        use_y_override = y_col is not None and y_col in mapped_df.columns
        if x_col is not None and not use_x_override:
            logger.warning(
                "x_col not found among mapped columns; falling back to "
                "automatic column selection based on test mode",
                x_col=x_col,
                interval=block.interval_index,
            )
        if y_col is not None and not use_y_override:
            logger.warning(
                "y_col not found among mapped columns; falling back to "
                "automatic column selection based on test mode",
                y_col=y_col,
                interval=block.interval_index,
            )

        # Detect or use specified test mode
        detected_mode = test_mode
        if detected_mode is None:
            detected_mode = _detect_test_type(mapped_df)
            logger.debug(
                "Auto-detected test mode",
                test_mode=detected_mode,
                interval=block.interval_index,
            )

        if detected_mode is None:
            warnings.warn(
                f"Could not auto-detect test type for interval {block.interval_index}. "
                "Specify test_mode parameter explicitly.",
                UserWarning,
                stacklevel=2,
            )
            # Default to relaxation as safest assumption for time-domain data
            detected_mode = "relaxation"

        # Convert to RheoData using appropriate converter
        converter = converters.get(detected_mode)
        if converter is None:
            logger.error("Unknown test mode", test_mode=detected_mode)
            raise ValueError(f"Unknown test mode: {detected_mode}")
        rheo_data = converter(block, global_meta, mapped_df, mapped_units)

        # Apply custom column overrides in a single rebuild
        if use_x_override or use_y_override:
            rheo_data = RheoData(
                x=mapped_df[x_col].values if use_x_override else rheo_data.x,
                y=mapped_df[y_col].values if use_y_override else rheo_data.y,
                x_units=(
                    mapped_units.get(x_col) if use_x_override else rheo_data.x_units
                ),
                y_units=(
                    mapped_units.get(y_col) if use_y_override else rheo_data.y_units
                ),
                domain=rheo_data.domain,
                initial_test_mode=detected_mode,
                metadata=rheo_data.metadata,
            )

        results.append(rheo_data)

    # Return single or list based on parameters
    if return_all or len(results) > 1:
        return results
    return results[0]
# ============================================================================= # Excel Export (save_intervals_to_excel) # =============================================================================
def save_intervals_to_excel(
    rheo_data_list: list[RheoData] | RheoData,
    filepath: str | Path,
    *,
    include_metadata_sheet: bool = True,
    sheet_prefix: str = "Interval",
) -> None:
    """Export multi-interval RheoData to Excel with one sheet per interval.

    Creates an Excel workbook where each interval becomes its own sheet
    (Interval_1, Interval_2, ...) plus an optional Metadata sheet containing
    global metadata and per-interval summary. If two entries would map to
    the same sheet name (e.g. the same ``interval_index`` coming from
    different files), a numeric suffix is appended so no sheet is written
    twice. Missing parent directories of ``filepath`` are created.

    Args:
        rheo_data_list: Single RheoData or list of RheoData objects
            (typically from load_anton_paar with return_all=True)
        filepath: Output Excel file path (.xlsx)
        include_metadata_sheet: Add a Metadata sheet with global info (default True)
        sheet_prefix: Prefix for interval sheet names (default "Interval")

    Raises:
        ImportError: If pandas or openpyxl not installed
        ValueError: If rheo_data_list is empty

    Example:
        >>> data_list = load_anton_paar("temp_sweep.csv", return_all=True)
        >>> save_intervals_to_excel(data_list, "output.xlsx")
        # Creates: Metadata, Interval_1, Interval_2, Interval_3 sheets
    """
    try:
        import pandas as pd
    except ImportError as exc:
        logger.error("pandas not installed for Excel export", exc_info=True)
        raise ImportError(
            "pandas is required for Excel export. Install with: pip install pandas openpyxl"
        ) from exc

    # Normalize input to list
    if isinstance(rheo_data_list, RheoData):
        rheo_data_list = [rheo_data_list]

    if not rheo_data_list:
        raise ValueError("rheo_data_list cannot be empty")

    filepath = Path(filepath)
    filepath.parent.mkdir(parents=True, exist_ok=True)

    # Sheet names already written to this workbook
    used_sheet_names: set[str] = set()

    with pd.ExcelWriter(filepath, engine="openpyxl") as writer:
        # Write Metadata sheet first
        if include_metadata_sheet:
            metadata_df = _create_metadata_sheet(rheo_data_list)
            metadata_df.to_excel(writer, sheet_name="Metadata", index=False)
            used_sheet_names.add("Metadata")

        # Write each interval as its own sheet
        for position, rheo_data in enumerate(rheo_data_list, start=1):
            # Get interval index from metadata if available
            interval_idx = rheo_data.metadata.get("interval_index", position)
            base_name = f"{sheet_prefix}_{interval_idx}"

            # Re-using a sheet name would write into the existing sheet and
            # mix two intervals' data, so pick the next free suffixed name.
            sheet_name = base_name
            duplicate_no = 2
            while sheet_name in used_sheet_names:
                sheet_name = f"{base_name}_{duplicate_no}"
                duplicate_no += 1
            used_sheet_names.add(sheet_name)

            # Create DataFrame for this interval
            interval_df = _create_interval_dataframe(rheo_data)
            interval_df.to_excel(writer, sheet_name=sheet_name, index=False)

    logger.info(
        "Exported intervals to Excel",
        filepath=str(filepath),
        n_intervals=len(rheo_data_list),
    )
def _create_metadata_sheet(rheo_data_list: list[RheoData]) -> pd.DataFrame: """Create metadata DataFrame summarizing all intervals. Args: rheo_data_list: List of RheoData objects Returns: DataFrame with global metadata and per-interval summary """ import pandas as pd rows = [] # Extract global metadata from first interval first_data = rheo_data_list[0] global_meta = first_data.metadata.get("global_metadata", {}) # Add global metadata rows for key, value in global_meta.items(): rows.append({"Property": key, "Value": str(value), "Interval": "Global"}) # Add per-interval summary for i, rheo_data in enumerate(rheo_data_list, start=1): interval_idx = rheo_data.metadata.get("interval_index", i) rows.append( { "Property": f"Interval {interval_idx} - Test Mode", "Value": rheo_data.test_mode, "Interval": str(interval_idx), } ) rows.append( { "Property": f"Interval {interval_idx} - Points", "Value": str(len(rheo_data.x)), # type: ignore[arg-type] "Interval": str(interval_idx), } ) rows.append( { "Property": f"Interval {interval_idx} - X Units", "Value": rheo_data.x_units or "", "Interval": str(interval_idx), } ) rows.append( { "Property": f"Interval {interval_idx} - Y Units", "Value": rheo_data.y_units or "", "Interval": str(interval_idx), } ) # Add temperature if available # IO-R6-002: Use `is not None` to avoid swallowing temperature=0.0 temp = rheo_data.metadata.get("temperature") if temp is not None: rows.append( { "Property": f"Interval {interval_idx} - Temperature", "Value": str(temp), "Interval": str(interval_idx), } ) return pd.DataFrame(rows) def _create_interval_dataframe(rheo_data: RheoData) -> pd.DataFrame: """Create DataFrame for a single interval's data. 
Args: rheo_data: RheoData object for one interval Returns: DataFrame with x, y (and y_real/y_imag for complex) columns """ import pandas as pd # Determine column names based on test mode test_mode = rheo_data.test_mode x_name = _get_x_column_name(test_mode, rheo_data.x_units) y_name = _get_y_column_name(test_mode, rheo_data.y_units) data: dict[str, np.ndarray] = {} # Add x column data[x_name] = np.asarray(rheo_data.x) # Add y column(s) - handle complex data if rheo_data.is_complex: # For complex data, add separate G' and G'' columns data["G' (Storage Modulus) [Pa]"] = np.asarray(rheo_data.y_real) data["G'' (Loss Modulus) [Pa]"] = np.asarray(rheo_data.y_imag) data["|G*| (Complex Modulus) [Pa]"] = np.abs(np.asarray(rheo_data.y)) else: data[y_name] = np.asarray(rheo_data.y) # Add auxiliary columns from metadata for aux_col in ["temperature_data", "normal_force", "torque", "phase_angle"]: if aux_col in rheo_data.metadata: aux_data = rheo_data.metadata[aux_col] if len(aux_data) == len(rheo_data.x): # type: ignore[arg-type] col_name = _format_aux_column_name(aux_col, rheo_data.metadata) data[col_name] = np.asarray(aux_data) return pd.DataFrame(data) def _get_x_column_name(test_mode: str, units: str | None) -> str: """Get descriptive x-axis column name based on test mode.""" unit_str = f" [{units}]" if units else "" names = { "creep": f"Time{unit_str}", "relaxation": f"Time{unit_str}", "oscillation": f"Angular Frequency{unit_str}", "rotation": f"Shear Rate{unit_str}", } return names.get(test_mode, f"X{unit_str}") def _get_y_column_name(test_mode: str, units: str | None) -> str: """Get descriptive y-axis column name based on test mode.""" unit_str = f" [{units}]" if units else "" names = { "creep": f"Compliance J(t){unit_str}", "relaxation": f"Relaxation Modulus G(t){unit_str}", "oscillation": f"Complex Modulus G*{unit_str}", "rotation": f"Viscosity η{unit_str}", } return names.get(test_mode, f"Y{unit_str}") def _format_aux_column_name(col_name: str, metadata: dict) -> 
str: """Format auxiliary column name with units.""" units_key = f"{col_name}_units" units = metadata.get(units_key, "") unit_str = f" [{units}]" if units else "" names = { "temperature_data": f"Temperature{unit_str}", "normal_force": f"Normal Force{unit_str}", "torque": f"Torque{unit_str}", "phase_angle": f"Phase Angle{unit_str}", } return names.get(col_name, col_name)