# Source code for rheojax.io.readers.trios.csv

"""TA Instruments TRIOS CSV file reader.

This module provides a reader for TRIOS CSV exports with support for:
- Tab or comma delimiters (auto-detected)
- Metadata header rows
- Units in parentheses or separate row
- Step/Segment columns for multi-step experiments
- Complex modulus construction (G' + iG'')
- Automatic encoding detection (UTF-8, Latin-1, CP1252)

Usage:
    >>> from rheojax.io.readers.trios import load_trios_csv
    >>> data = load_trios_csv('frequency_sweep.csv')
    >>> print(data.test_mode)  # 'oscillation'
"""

from __future__ import annotations

import re
import warnings
from collections.abc import Callable
from pathlib import Path
from typing import Any

import numpy as np
import pandas as pd

from rheojax.core.data import RheoData
from rheojax.io._exceptions import RheoJaxValidationWarning
from rheojax.io.readers.trios.common import (
    DataSegment,
    TRIOSFile,
    TRIOSTable,
    construct_complex_modulus,
    convert_unit,
    detect_step_column,
    detect_test_type,
    segment_to_rheodata,
    select_xy_columns,
    split_by_step,
)
from rheojax.logging import get_logger

# Module-level logger. Calls in this file pass their context as keyword
# arguments (e.g. ``logger.debug("msg", key=value)``).
logger = get_logger(__name__)

# Encoding cascade for auto-detection
# detect_encoding() tries these in list order and returns the first one that
# decodes without raising UnicodeDecodeError.
# NOTE(review): latin-1 assigns a character to every byte value, so it should
# never fail to decode; the trailing "cp1252" entry therefore looks
# unreachable. Confirm whether cp1252 was meant to come before latin-1.
ENCODING_CASCADE = ["utf-8", "latin-1", "cp1252"]

# Auto-chunking threshold (5 MB)
# NOTE(review): not referenced by any code visible in this module; presumably
# consumed by another reader module. Verify before removing.
AUTO_CHUNK_THRESHOLD_MB = 5.0

# Unit substrings for identifying unit rows in TRIOS CSV files.
# Shared between first-table parsing and multi-table continuation loop.
# The parser tests these with a case-sensitive ``in`` (substring) check against
# each candidate cell, so short entries such as "K", "%" or "Pa" match any
# cell that merely contains those characters.
_UNIT_SUBSTRINGS = frozenset(
    {
        "Pa",
        "Hz",
        "rad",
        "°C",
        "°F",
        "K",
        "/s",
        "%",
        "1/",
        "mN",
        "mPa",
        "kPa",
        "MPa",
        "N·m",
        "N.m",
        "J/",
        "W/",
        "m²",
        "m2",
        "mm",
        "μm",
        "nm",
    }
)


def detect_encoding(filepath: Path) -> str:
    """Detect file encoding using cascade approach.

    Each encoding in ``ENCODING_CASCADE`` is tried in order and the first one
    that decodes the *entire* file without error is returned.

    The whole file is validated (streamed in bounded chunks) rather than only
    a leading sample.  ``parse_trios_csv`` later reads the file with
    ``errors="replace"``, so an encoding that was accepted on the strength of
    a clean prefix would silently turn later non-conforming bytes (for
    example a degree sign in a unit cell) into replacement characters.

    Args:
        filepath: Path to file

    Returns:
        Detected encoding string

    Raises:
        ValueError: If none of the encodings can decode the file
    """
    logger.debug("Detecting file encoding", filepath=str(filepath))
    for encoding in ENCODING_CASCADE:
        try:
            with open(filepath, encoding=encoding) as f:
                # Decode everything, 64K characters at a time, discarding the
                # text: only whether decoding succeeds matters here.
                while f.read(65536):
                    pass
        except UnicodeDecodeError:
            logger.debug("Encoding failed", encoding=encoding)
            continue
        logger.debug("Encoding detected", encoding=encoding)
        return encoding

    logger.error(
        "Failed to detect encoding",
        filepath=str(filepath),
        tried_encodings=ENCODING_CASCADE,
    )
    raise ValueError(f"Could not decode {filepath} with any of the attempted encodings")


def detect_delimiter(content: str, decimal_separator: str = ".") -> str:
    """Detect delimiter (tab vs comma) from file content.

    TRIOS CSV files typically use tabs, but may use commas.
    Metadata lines (Step/Procedure/Instrument/etc.) and ``[``-prefixed section
    markers are skipped, as they may use different delimiters than the actual
    data.  Tabs and commas are then counted over the last five remaining lines
    taken from the first 20 lines of ``content``; if none of those 20 lines
    qualifies, lines 20-59 are searched instead.

    When ``decimal_separator`` is ``","`` (EU locale), comma counts are inflated
    because every decimal number contributes a comma. In that case, any non-zero
    tab count is a stronger signal than the raw comma count.

    Args:
        content: File content (only the first 60 lines are inspected)
        decimal_separator: Decimal separator used in the file ("." or ",")

    Returns:
        Delimiter character ('\\t' or ','); tab when the counts tie or when no
        usable line is found
    """
    metadata_prefixes = (
        "step",
        "procedure",
        "instrument",
        "sample",
        "date",
        "time",
        "geometry",
        "filename",
        "operator",
        "rundate",
        "gap",
        "temperature",
        "number of points",
    )

    def _is_candidate(line: str) -> bool:
        """True for a non-blank line that is neither metadata nor a section marker."""
        stripped = line.strip()
        return (
            bool(stripped)
            and not stripped.lower().startswith(metadata_prefixes)
            and not stripped.startswith("[")  # Skip [step] / section markers
        )

    # Only indices 0-59 are ever read, so cap the split instead of breaking a
    # potentially large file into every one of its lines.  When the content has
    # more than 60 newlines, element 60 is the unsplit remainder and is ignored.
    head = content.split("\n", 60)

    candidates = [line for line in head[:20] if _is_candidate(line)]
    if not candidates:
        # R8-IO-004: extend search past metadata instead of falling back to it
        candidates = [line for line in head[20:60] if _is_candidate(line)]
        if not candidates:
            logger.debug("Could not detect delimiter from content; defaulting to tab")
            return "\t"

    sample = candidates[-5:]
    tab_count = sum(line.count("\t") for line in sample)
    comma_count = sum(line.count(",") for line in sample)

    # EU decimal correction: when decimal_separator is comma and tabs exist,
    # most commas are decimal separators, not delimiters — prefer tab.
    # Otherwise tabs win ties because they are the typical TRIOS format.
    if (decimal_separator == "," and tab_count > 0) or tab_count >= comma_count:
        delimiter = "\t"
    else:
        delimiter = ","
    logger.debug(
        "Delimiter detected",
        delimiter=repr(delimiter),
        tab_count=tab_count,
        comma_count=comma_count,
    )
    return delimiter


def parse_metadata_header(
    lines: list[str],
    delimiter: str,
) -> tuple[dict[str, Any], int]:
    """Collect key/value metadata from the top of the file and locate the header.

    Lines are scanned from the top.  A line whose start matches one of the
    known metadata labels (case-insensitively) contributes its second
    delimiter-separated cell as the value.  Scanning stops at the first line
    that looks like a column header, or at a "Number of points" line, in
    which case the header is taken to be the following line.

    Args:
        lines: File lines
        delimiter: Field delimiter

    Returns:
        Tuple of (metadata dict, header row index); the index is 0 when no
        stopping line is found
    """
    logger.debug("Parsing metadata header", num_lines=len(lines))

    # (metadata key, label regex) pairs; the first pattern that matches wins,
    # so the order here is significant.
    field_patterns = (
        ("filename", r"^Filename"),
        ("instrument_serial_number", r"^Instrument serial number"),
        ("instrument_name", r"^Instrument name"),
        ("operator", r"^[Oo]perator"),
        ("run_date", r"^[Rr]undate"),
        ("sample_name", r"^Sample name"),
        ("geometry", r"^Geometry name"),
        ("geometry_type", r"^Geometry type"),
        ("gap", r"^Gap"),
        ("temperature", r"^Temperature"),
    )

    metadata: dict[str, Any] = {}
    header_row = 0

    for index, raw_line in enumerate(lines):
        if not raw_line.strip():
            continue

        cells = raw_line.split(delimiter)

        matched_key = next(
            (
                key
                for key, pattern in field_patterns
                if re.match(pattern, raw_line, re.IGNORECASE)
            ),
            None,
        )
        if matched_key is not None:
            # A recognised label with no value cell is still consumed as
            # metadata; it just contributes nothing.
            if len(cells) >= 2:
                value = cells[1].strip()
                metadata[matched_key] = value
                logger.debug("Metadata field extracted", key=matched_key, value=value)
            continue

        first_cell = cells[0].strip().lower()
        # Cells that are non-blank and not purely digits count as "text".
        text_cell_count = sum(
            1 for cell in cells if cell.strip() and not cell.strip().isdigit()
        )

        if first_cell == "variables" or text_cell_count >= 3:
            header_row = index
            logger.debug("Header row found", header_row=header_row)
            break

        if first_cell == "number of points":
            if len(cells) >= 2:
                try:
                    metadata["number_of_points"] = int(cells[1].strip())
                except ValueError:
                    pass
            # Header is next line
            header_row = index + 1
            logger.debug(
                "Header row found after 'Number of points'", header_row=header_row
            )
            break

    logger.debug(
        "Metadata parsing complete",
        metadata_fields=len(metadata),
        header_row=header_row,
    )
    return metadata, header_row


def detect_header_row(
    lines: list[str],
    delimiter: str,
    start_index: int = 0,
) -> int:
    """Locate the column-header row of the next data table.

    Starting at ``start_index``, the first non-blank line satisfying one of
    the following is chosen:

    - its first cell is "Variables" (that line is the header);
    - its first cell is "Number of points" (the *following* line is the header);
    - at least two cells after the first are non-blank and non-numeric.

    Args:
        lines: File lines
        delimiter: Field delimiter
        start_index: Index to start searching from

    Returns:
        Header row index, or ``start_index`` when nothing qualifies
    """
    logger.debug("Detecting header row", start_index=start_index, num_lines=len(lines))

    for row_index in range(start_index, len(lines)):
        stripped = lines[row_index].strip()
        if not stripped:
            continue

        cells = stripped.split(delimiter)
        label = cells[0].strip().lower()

        if label == "variables":
            logger.debug("Header row detected via 'Variables' marker", row=row_index)
            return row_index

        if label == "number of points":
            logger.debug(
                "Header row detected via 'Number of points' marker", row=row_index + 1
            )
            return row_index + 1

        # The first column is skipped because it is often a row label.
        text_cells = sum(
            1 for cell in cells[1:] if cell.strip() and not _is_numeric(cell.strip())
        )
        if text_cells >= 2:
            logger.debug(
                "Header row detected via non-numeric columns",
                row=row_index,
                non_numeric_count=text_cells,
            )
            return row_index

    logger.debug("No header row found, using start_index", start_index=start_index)
    return start_index


def _is_numeric(s: str) -> bool:
    """Check if string represents a numeric value."""
    try:
        float(s.replace(",", "."))
        return True
    except ValueError:
        return False


def _default_x_units(test_mode: str) -> str:
    """Get default x-axis units for a test mode."""
    if test_mode == "oscillation":
        return "rad/s"
    elif test_mode == "rotation":
        return "1/s"
    return "s"


def extract_units_from_header(
    header: list[str],
    unit_row: list[str] | None = None,
) -> dict[str, str]:
    """Parse units from column headers or separate unit row.

    TRIOS exports may have units in parentheses: "Angular Frequency (rad/s)"
    Or in a separate row below headers.

    Args:
        header: Column header names
        unit_row: Optional separate unit row

    Returns:
        Dict mapping column names to units
    """
    units: dict[str, str] = {}

    for i, col in enumerate(header):
        col_clean = col.strip()

        # Check for units in parentheses
        match = re.search(r"\(([^)]+)\)$", col_clean)
        if match:
            units[col_clean] = match.group(1)
            # Also store under name without units
            name_without_units = re.sub(r"\s*\([^)]+\)$", "", col_clean).strip()
            units[name_without_units] = match.group(1)
        elif unit_row and i < len(unit_row):
            # Use separate unit row
            unit = unit_row[i].strip()
            if unit:
                units[col_clean] = unit

    return units


def detect_repeated_headers(
    lines: list[str],
    delimiter: str,
    first_header: list[str],
    start_index: int,
) -> list[int]:
    """Find multi-table boundaries (repeated header rows).

    A line counts as a repeated header when the non-blank cells among its
    first three columns equal, case-insensitively and ignoring surrounding
    whitespace, the non-blank cells among the first three columns of
    ``first_header``.

    Args:
        lines: File lines
        delimiter: Field delimiter
        first_header: Column headers from first table
        start_index: Index after first table header

    Returns:
        List of line indices where new tables begin; empty when
        ``first_header`` has no non-blank name in its first three columns
    """
    header_pattern = [h.lower().strip() for h in first_header[:3] if h.strip()]
    if not header_pattern:
        # With nothing to compare against, an empty pattern would "match"
        # every line whose leading cells are blank (e.g. ",,,1.0"), reporting
        # data rows as table boundaries.
        return []

    table_starts: list[int] = []
    for i in range(start_index, len(lines)):
        line = lines[i].strip()
        if not line:
            continue

        leading = [p.lower().strip() for p in line.split(delimiter)[:3] if p.strip()]
        if leading == header_pattern:
            table_starts.append(i)

    return table_starts


def _parse_float_cell(cell: str, decimal_separator: str) -> float:
    """Convert one CSV cell to float; blank or unparseable cells become NaN."""
    text = cell.strip()
    if not text:
        return np.nan
    if decimal_separator == ",":
        # EU decimal handling: try comma→dot first (preserves dots in sci
        # notation like "1.23E+04"); then the full EU form (dots are thousands
        # separators, comma is the decimal point); finally the text as-is.
        candidates = [
            text.replace(",", "."),
            text.replace(".", "").replace(",", "."),
            text,
        ]
    else:
        candidates = [text]
    for candidate in candidates:
        try:
            return float(candidate)
        except ValueError:
            continue
    return np.nan


def _find_section_marker(lines: list[str], start: int) -> int | None:
    """Return the index of the first ``[``-prefixed line at or after ``start``, else None."""
    for idx in range(start, len(lines)):
        if lines[idx].strip().startswith("["):
            return idx
    return None


def _split_unit_row(
    lines: list[str],
    delimiter: str,
    header_row: int,
) -> tuple[list[str] | None, int]:
    """Decide whether the line after ``header_row`` is a unit row.

    Returns ``(unit_cells, data_start)``: the raw cells of the unit row (or
    None) and the index of the first data line.
    """
    data_start = header_row + 1
    if data_start >= len(lines):
        return None, data_start

    cells = lines[data_start].split(delimiter)
    if cells[0].strip().lower().startswith("data"):
        return None, data_start

    # Negative check: a numeric value in the first few value columns means
    # this is already a data row.
    if any(c.strip() and _is_numeric(c.strip()) for c in cells[1:5]):
        return None, data_start

    # VIS-CSV2-001: require positive evidence (at least one cell contains a
    # known unit substring) so non-numeric text values such as "N/A",
    # "undefined" or "--" are not consumed as a unit row, which would silently
    # drop the first data row.
    non_empty = [c.strip() for c in cells[:20] if c.strip()]
    if not any(unit in c for c in non_empty for unit in _UNIT_SUBSTRINGS):
        return None, data_start

    return cells, data_start + 1


def _drop_label_column(
    header: list[str],
    units: dict[str, str],
) -> tuple[list[str], dict[str, str], int]:
    """Strip a leading non-numeric label column ("Variables", "Data point", "data…").

    Returns ``(header, units, col_offset)``.  ``col_offset`` is 1 when a label
    column was removed, so the same offset can be applied to every data row.
    """
    first = header[0].lower()
    if first not in {"variables", "data point"} and not first.startswith("data"):
        return header, units, 0
    trimmed_units = {
        k: v
        for k, v in units.items()
        if k.lower() != "variables" and not k.lower().startswith("data")
    }
    return header[1:], trimmed_units, 1


def _read_table_rows(
    lines: list[str],
    start: int,
    delimiter: str,
    expected_cols: int,
    col_offset: int,
    decimal_separator: str,
) -> tuple[list[list[float]], int, int | None]:
    """Read numeric rows from ``start`` until a ``[`` section marker or end of file.

    Returns ``(rows, skipped, marker_index)``.  Rows whose cell count differs
    from ``expected_cols`` are counted in ``skipped``.  ``marker_index`` is the
    line index of the terminating section marker, or None at end of file.
    """
    rows: list[list[float]] = []
    skipped = 0
    for idx in range(start, len(lines)):
        # NOTE(review): the line is whitespace-stripped before splitting, so
        # with a tab delimiter leading/trailing empty cells are lost and such
        # rows can end up counted as malformed — confirm this is intended.
        line = lines[idx].strip()
        if line.startswith("["):
            # IO-004: report where the section header is so the caller can
            # continue with the next table instead of discarding the rest.
            return rows, skipped, idx
        if not line:
            # IO-R6-007: blank separator lines inside a block are skipped;
            # stopping here would silently truncate multi-step data.
            continue
        cells = line.split(delimiter)
        if len(cells) != expected_cols:
            skipped += 1
            continue
        # Skip the label column consistently for ALL rows.
        values = [_parse_float_cell(c, decimal_separator) for c in cells[col_offset:]]
        if values:
            rows.append(values)
    return rows, skipped, None


def _make_table(
    rows: list[list[float]],
    header: list[str],
    units: dict[str, str],
    table_index: int,
) -> TRIOSTable:
    """Wrap parsed rows in a DataFrame and TRIOSTable, recording step values if any.

    ``rows`` must come from ``_read_table_rows`` called with this ``header``:
    it only accepts rows with exactly one value per header column, so no
    width reconciliation between rows and header is needed.
    """
    df = pd.DataFrame(rows, columns=header)
    logger.debug("DataFrame created", shape=df.shape, columns=list(df.columns))

    step_col = detect_step_column(df)
    step_values = None
    if step_col:
        step_values = df[step_col].unique().tolist()
        logger.debug(
            "Step column detected", step_col=step_col, num_steps=len(step_values)
        )

    return TRIOSTable(
        table_index=table_index,
        header=list(df.columns),
        units=units,
        df=df,
        step_values=step_values,
    )


def parse_trios_csv(
    filepath: str | Path,
    *,
    encoding: str | None = None,
    decimal_separator: str = ".",
    delimiter: str | None = None,
) -> TRIOSFile:
    """Low-level CSV parser returning raw TRIOSFile structure.

    For advanced users who need access to raw tables and metadata
    before RheoData conversion.

    The first table is mandatory.  Further tables are read after each
    ``[``-prefixed section-header line; sections that yield no header columns
    or no data rows are skipped.  Every table goes through the same unit-row
    detection, label-column stripping and numeric cell parsing.

    Args:
        filepath: Path to TRIOS CSV file
        encoding: File encoding (auto-detected if None)
        decimal_separator: Decimal separator ("." or ",")
        delimiter: Delimiter override (None = auto)

    Returns:
        TRIOSFile with parsed tables and metadata

    Raises:
        FileNotFoundError: File does not exist
        ValueError: No data tables or no data rows found, or the encoding
            could not be auto-detected
    """
    filepath = Path(filepath)
    logger.info("Parsing TRIOS CSV file", filepath=str(filepath))

    if not filepath.exists():
        logger.error("File not found", filepath=str(filepath))
        raise FileNotFoundError(f"File not found: {filepath}")

    # Detect or use provided encoding
    if encoding is None:
        encoding = detect_encoding(filepath)
    logger.debug("Using encoding", encoding=encoding)

    # Read file content
    with open(filepath, encoding=encoding, errors="replace") as f:
        content = f.read()

    lines = content.split("\n")
    logger.debug("File read", num_lines=len(lines), content_bytes=len(content))

    # Detect delimiter
    if delimiter is None:
        delimiter = detect_delimiter(content, decimal_separator=decimal_separator)

    # Parse metadata, then find the actual header row
    metadata, header_start = parse_metadata_header(lines, delimiter)
    header_row = detect_header_row(lines, delimiter, header_start)

    if header_row >= len(lines):
        logger.error("No data tables found", filepath=str(filepath))
        raise ValueError("No data tables found in TRIOS CSV file")

    # ---- First table ------------------------------------------------------
    header = [h.strip() for h in lines[header_row].split(delimiter)]
    unit_row, data_start = _split_unit_row(lines, delimiter, header_row)
    units = extract_units_from_header(header, unit_row)

    # The label-column offset is fixed BEFORE any data row is parsed so it is
    # applied uniformly to every row.
    header, units, col_offset = _drop_label_column(header, units)
    expected_cols = len(header) + col_offset

    data_rows, skipped_rows, next_section_start = _read_table_rows(
        lines, data_start, delimiter, expected_cols, col_offset, decimal_separator
    )

    if skipped_rows > 0:
        warnings.warn(
            f"Skipped {skipped_rows} malformed rows (expected {expected_cols} columns) in {filepath}",
            stacklevel=3,
        )

    if not data_rows:
        logger.error("No data rows found", filepath=str(filepath))
        raise ValueError("No data rows found in TRIOS CSV file")

    logger.debug("Data rows parsed", num_rows=len(data_rows))

    tables: list[TRIOSTable] = [_make_table(data_rows, header, units, 0)]

    # ---- Additional tables ------------------------------------------------
    # IO-004: TRIOS multi-step CSV exports repeat the full header + data block
    # for each step, separated by `[Step N]` section-header lines.
    search_start = next_section_start
    while search_start is not None:
        # Skip the `[...]` line itself, then find the next header row.
        next_header_row = detect_header_row(lines, delimiter, search_start + 1)
        if next_header_row >= len(lines):
            break

        # M-3: detect_header_row falls back to its start index when it finds
        # nothing.  If that fallback line starts with a number it is a data
        # row masquerading as a header — skip ahead to the next section.
        if next_header_row == search_start + 1:
            probe = lines[next_header_row].strip().split(delimiter)
            if _is_numeric(probe[0].strip()):
                search_start = _find_section_marker(lines, next_header_row)
                continue

        next_header = [h.strip() for h in lines[next_header_row].split(delimiter)]
        next_unit_row, next_data_start = _split_unit_row(
            lines, delimiter, next_header_row
        )
        next_units = extract_units_from_header(next_header, next_unit_row)
        next_header, next_units, next_col_offset = _drop_label_column(
            next_header, next_units
        )

        # CSV-MULTI-001: a section header consisting only of a label column
        # leaves no data columns.  R6-IO-001: scan forward from the current
        # position (not from a stale marker) so the loop always advances.
        if not next_header:
            logger.debug(
                "Multi-table section has no data columns after label-column strip; "
                "skipping section",
                section_start=search_start,
            )
            search_start = _find_section_marker(lines, next_data_start)
            continue

        next_expected = len(next_header) + next_col_offset
        next_rows, next_skipped, next_section_start = _read_table_rows(
            lines,
            next_data_start,
            delimiter,
            next_expected,
            next_col_offset,
            decimal_separator,
        )

        if next_skipped > 0:
            warnings.warn(
                f"Skipped {next_skipped} malformed rows in additional table "
                f"(expected {next_expected} columns) in {filepath}",
                stacklevel=3,
            )

        # An empty section is skipped, but the search for more continues.
        if next_rows:
            tables.append(_make_table(next_rows, next_header, next_units, len(tables)))
            logger.debug(
                "Additional table parsed",
                table_index=len(tables) - 1,
                num_rows=len(next_rows),
            )

        search_start = next_section_start

    # num_rows / num_columns describe the first table only.
    logger.info(
        "TRIOS CSV parsing complete",
        filepath=str(filepath),
        num_tables=len(tables),
        num_rows=len(data_rows),
        num_columns=len(header),
    )

    return TRIOSFile(
        filepath=str(filepath),
        format="csv",
        metadata=metadata,
        tables=tables,
        encoding=encoding,
        decimal_separator=decimal_separator,
    )
def _column_as_float(frame: pd.DataFrame, column: str) -> np.ndarray:
    """Return ``frame[column]`` as a float array, or raise ValueError naming the column."""
    try:
        return frame[column].values.astype(float)
    except (ValueError, TypeError) as e:
        raise ValueError(
            f"Column '{column}' contains non-numeric data that cannot be converted to float. "
            f"Sample values: {frame[column].head(3).tolist()}"
        ) from e


def load_trios_csv(
    filepath: str | Path,
    *,
    return_all_segments: bool = False,
    test_mode: str | None = None,
    encoding: str | None = None,
    decimal_separator: str = ".",
    delimiter: str | None = None,
    validate: bool = True,
    progress_callback: Callable[[int, int], None] | None = None,
) -> RheoData | list[RheoData]:
    """Load TRIOS CSV export file.

    Handles TRIOS-specific CSV format with:
    - Metadata header rows before data
    - Tab or comma delimiters (auto-detected)
    - Units in parentheses or separate row
    - Step/Segment columns for multi-step experiments
    - Repeated headers for multi-table files

    Args:
        filepath: Path to TRIOS CSV file
        return_all_segments: Split tables that have a step column into one
            segment per step, and always return a list
        test_mode: Override auto-detection ("creep", "relaxation",
            "oscillation", "rotation")
        encoding: File encoding (auto-detected: UTF-8, Latin-1, CP1252)
        decimal_separator: "." for US, "," for European
        delimiter: Override delimiter detection (None = auto)
        validate: Validate RheoData on creation
        progress_callback: Progress callback(current, total), invoked once
            after each parsed table has been processed, with the 1-based
            table number and the total number of tables

    Returns:
        A single RheoData when exactly one segment was produced and
        ``return_all_segments`` is False; otherwise a list of RheoData
        (this includes multi-table files loaded with the default settings)

    Raises:
        FileNotFoundError: File does not exist
        ValueError: No data found or invalid format

    Example:
        >>> data = load_trios_csv('frequency_sweep.csv')
        >>> print(data.test_mode)  # 'oscillation'
        >>> print(np.iscomplexobj(data.y))  # True for G* = G' + iG''
    """
    logger.info("Loading TRIOS CSV file", filepath=str(filepath))

    # Parse CSV file
    trios_file = parse_trios_csv(
        filepath,
        encoding=encoding,
        decimal_separator=decimal_separator,
        delimiter=delimiter,
    )

    # Convert tables to RheoData
    rheo_data_list: list[RheoData] = []
    total_tables = len(trios_file.tables)

    for table_idx, table in enumerate(trios_file.tables):
        df = table.df
        units = table.units
        logger.debug(
            "Processing table",
            table_index=table_idx,
            shape=df.shape,
            columns=list(df.columns),
        )

        # Detect or use provided test mode.
        # IO-FIX-002: use explicit None check (not `or`) — an empty-string
        # test_mode="" is falsy and would fall through to auto-detection,
        # ignoring the caller's intent.
        detected_mode = detect_test_type(df) if test_mode is None else test_mode
        logger.debug("Test mode", detected_mode=detected_mode, provided=test_mode)

        # Check for step column and split if needed
        step_col = detect_step_column(df)
        segments = (
            [df]
            if not step_col or not return_all_segments
            else split_by_step(df, step_col)
        )
        logger.debug(
            "Segments identified",
            step_col=step_col,
            num_segments=len(segments),
        )

        for seg_idx, seg_df in enumerate(segments):
            # Select x/y columns
            x_col, y_col, y2_col = select_xy_columns(seg_df, detected_mode)

            if x_col is None or y_col is None:
                msg = (
                    f"Skipping TRIOS CSV segment {seg_idx}: could not determine "
                    f"x/y columns. Available columns: {list(seg_df.columns)}"
                )
                warnings.warn(msg, stacklevel=2)
                logger.warning(
                    "Could not determine x/y columns",
                    segment_index=seg_idx,
                    available_columns=list(seg_df.columns),
                )
                continue

            logger.debug(
                "Columns selected",
                segment_index=seg_idx,
                x_col=x_col,
                y_col=y_col,
                y2_col=y2_col,
            )

            # Extract data
            x_data = _column_as_float(seg_df, x_col)

            # Get units
            x_units = units.get(x_col, "")
            y_units = units.get(y_col, "Pa")

            # Handle complex modulus case
            if y2_col is not None:
                y_real = _column_as_float(seg_df, y_col)
                y_imag = _column_as_float(seg_df, y2_col)

                # Convert both parts to Pa before combining them
                y_units_orig = units.get(y_col, "Pa")
                y2_units_orig = units.get(y2_col, "Pa")
                y_real, _ = convert_unit(y_real, y_units_orig, "Pa")
                y_imag, _ = convert_unit(y_imag, y2_units_orig, "Pa")

                # Construct complex modulus
                y_data = construct_complex_modulus(y_real, y_imag)
                y_units = "Pa"
                is_complex = True
            else:
                y_data = _column_as_float(seg_df, y_col)
                is_complex = False

            # Convert x units (e.g., Hz to rad/s)
            if detected_mode == "oscillation":
                x_data, x_units = convert_unit(x_data, x_units, "rad/s")

            # Remove non-finite values (NaN and ±inf) — both poison model
            # fits and violate RheoData's isfinite invariant which raises
            # ValueError on inf. np.isfinite is False for both NaN and inf.
            if is_complex:
                valid_mask = (
                    np.isfinite(x_data)
                    & np.isfinite(np.real(y_data))
                    & np.isfinite(np.imag(y_data))
                )
            else:
                valid_mask = np.isfinite(x_data) & np.isfinite(y_data)

            x_data = x_data[valid_mask]
            y_data = y_data[valid_mask]

            if len(x_data) == 0:
                warnings.warn(
                    f"Segment {seg_idx} has no valid data after NaN filtering and was skipped.",
                    RheoJaxValidationWarning,
                    stacklevel=2,
                )
                continue

            # Build metadata
            seg_metadata = trios_file.metadata.copy()
            seg_metadata["test_mode"] = detected_mode
            seg_metadata["source_format"] = "csv"
            seg_metadata["x_column"] = x_col
            seg_metadata["y_column"] = y_col
            if y2_col:
                seg_metadata["y2_column"] = y2_col
            seg_metadata["is_complex"] = is_complex

            # Create DataSegment and convert to RheoData
            segment = DataSegment(
                segment_index=seg_idx,
                test_mode=detected_mode,
                x_data=x_data,
                y_data=y_data,
                x_column=x_col,
                y_column=y_col,
                x_units=x_units or _default_x_units(detected_mode),
                y_units=y_units,
                is_complex=is_complex,
                metadata=seg_metadata,
            )

            rheo_data = segment_to_rheodata(segment, validate=validate)
            rheo_data_list.append(rheo_data)
            logger.debug(
                "RheoData created",
                segment_index=seg_idx,
                num_points=len(x_data),
                test_mode=detected_mode,
                is_complex=is_complex,
            )

        # Report progress once this table's segments are done, including
        # tables whose segments were all skipped.
        if progress_callback is not None:
            progress_callback(table_idx + 1, total_tables)

    if not rheo_data_list:
        logger.error("No valid data segments parsed", filepath=str(filepath))
        raise ValueError(f"No valid data segments could be parsed from {filepath}")

    logger.info(
        "TRIOS CSV load complete",
        filepath=str(filepath),
        num_segments=len(rheo_data_list),
    )

    # Return single or list
    if len(rheo_data_list) == 1 and not return_all_segments:
        return rheo_data_list[0]
    return rheo_data_list