"""TA Instruments TRIOS CSV file reader.
This module provides a reader for TRIOS CSV exports with support for:
- Tab or comma delimiters (auto-detected)
- Metadata header rows
- Units in parentheses or separate row
- Step/Segment columns for multi-step experiments
- Complex modulus construction (G' + iG'')
- Automatic encoding detection (UTF-8, Latin-1, CP1252)
Usage:
>>> from rheojax.io.readers.trios import load_trios_csv
>>> data = load_trios_csv('frequency_sweep.csv')
>>> print(data.test_mode) # 'oscillation'
"""
from __future__ import annotations
import re
import warnings
from collections.abc import Callable
from pathlib import Path
from typing import Any
import numpy as np
import pandas as pd
from rheojax.core.data import RheoData
from rheojax.io._exceptions import RheoJaxValidationWarning
from rheojax.io.readers.trios.common import (
DataSegment,
TRIOSFile,
TRIOSTable,
construct_complex_modulus,
convert_unit,
detect_step_column,
detect_test_type,
segment_to_rheodata,
select_xy_columns,
split_by_step,
)
from rheojax.logging import get_logger
logger = get_logger(__name__)
# Encodings tried, in this order, when the caller does not supply one.
ENCODING_CASCADE = ["utf-8", "latin-1", "cp1252"]

# Auto-chunking threshold, in megabytes (5 MB).
AUTO_CHUNK_THRESHOLD_MB = 5.0

# Substrings whose presence in a cell is taken as evidence that the cell holds
# a unit. Used to recognise unit rows in TRIOS CSV files, both when parsing the
# first table and in the multi-table continuation loop.
_UNIT_SUBSTRINGS = frozenset(
    # stress / modulus / viscosity
    ("Pa", "mPa", "kPa", "MPa")
    # frequency, angle and rates
    + ("Hz", "rad", "/s", "1/")
    # temperature
    + ("°C", "°F", "K")
    # percentages
    + ("%",)
    # force and torque
    + ("mN", "N·m", "N.m")
    # energy / power ratios
    + ("J/", "W/")
    # area and length
    + ("m²", "m2", "mm", "μm", "nm")
)
def detect_encoding(filepath: Path) -> str:
    """Detect file encoding using a cascade approach.

    Each entry of ``ENCODING_CASCADE`` is tried in order and the first one
    that decodes the *entire* file without error is returned.

    The whole file is decoded (in chunks) rather than only its first
    kilobyte: a TRIOS export often has a pure-ASCII metadata header followed
    by non-ASCII unit strings such as "°C", so a short probe can wrongly
    accept UTF-8 for a file that is not valid UTF-8 further down.

    Note:
        ``latin-1`` maps every possible byte value, so it accepts any file.
        With the current cascade order the later ``cp1252`` entry and the
        final ``ValueError`` are therefore reached only if the cascade is
        changed.

    Args:
        filepath: Path to file

    Returns:
        Detected encoding string

    Raises:
        ValueError: If none of the encodings can decode the file
    """
    chunk_chars = 65536  # decode in bounded pieces to keep memory flat
    logger.debug("Detecting file encoding", filepath=str(filepath))
    for encoding in ENCODING_CASCADE:
        try:
            with open(filepath, encoding=encoding) as f:
                # Consume the file to the end; decoding errors surface here.
                while f.read(chunk_chars):
                    pass
        except UnicodeDecodeError:
            logger.debug("Encoding failed", encoding=encoding)
            continue
        logger.debug("Encoding detected", encoding=encoding)
        return encoding
    logger.error(
        "Failed to detect encoding",
        filepath=str(filepath),
        tried_encodings=ENCODING_CASCADE,
    )
    raise ValueError(f"Could not decode {filepath} with any of the attempted encodings")
def detect_delimiter(content: str, decimal_separator: str = ".") -> str:
    """Decide whether the file is tab- or comma-delimited.

    Blank lines, lines that start with a known metadata keyword and lines
    that start with ``[`` (section markers) are ignored, because they may be
    delimited differently from the data. The last five remaining lines out
    of the first 20 are sampled; if none remain, lines 20-59 are searched
    the same way. Tabs win ties.

    With an EU decimal separator (``","``) every decimal number adds a comma,
    so any tab at all in the sample is treated as decisive.

    Args:
        content: File content (only the first 60 lines are examined)
        decimal_separator: Decimal separator used in the file ("." or ",")

    Returns:
        The delimiter character: a tab or a comma. Tab is also the fallback
        when no usable line is found.
    """
    metadata_prefixes = (
        "step",
        "procedure",
        "instrument",
        "sample",
        "date",
        "time",
        "geometry",
        "filename",
        "operator",
        "rundate",
        "gap",
        "temperature",
        "number of points",
    )

    def _is_candidate(text: str) -> bool:
        """Return True for a non-blank line that is neither metadata nor a marker."""
        stripped = text.strip()
        if not stripped:
            return False
        if stripped.lower().startswith(metadata_prefixes):
            return False
        return not stripped.startswith("[")  # Skip [step] / section markers

    all_lines = content.split("\n")
    candidates = [text for text in all_lines[:20] if _is_candidate(text)]
    if not candidates:
        # R8-IO-004: extend search past metadata instead of falling back to it
        candidates = [text for text in all_lines[20:60] if _is_candidate(text)]
        if not candidates:
            logger.debug("Could not detect delimiter from content; defaulting to tab")
            return "\t"

    sample = candidates[-5:]
    tab_count = sum(text.count("\t") for text in sample)
    comma_count = sum(text.count(",") for text in sample)

    # EU decimal correction: with a comma decimal separator most commas are
    # not delimiters, so the presence of any tab settles it. Otherwise tabs
    # are preferred on a tie (the typical TRIOS format).
    eu_with_tabs = decimal_separator == "," and tab_count > 0
    delimiter = "\t" if (eu_with_tabs or tab_count >= comma_count) else ","

    logger.debug(
        "Delimiter detected",
        delimiter=repr(delimiter),
        tab_count=tab_count,
        comma_count=comma_count,
    )
    return delimiter
def parse_metadata_header(
    lines: list[str],
    delimiter: str,
) -> tuple[dict[str, Any], int]:
    """Collect key/value metadata from the top of the file.

    Lines are scanned from the start. A line that matches a known metadata
    pattern and has at least two fields contributes its second field as the
    value. Scanning stops at the first non-metadata line that looks like a
    column header (first field "Variables", or at least three non-empty
    fields that are not pure digits) or that starts with "Number of points"
    (in which case the header is taken to be the following line).

    Args:
        lines: File lines
        delimiter: Field delimiter

    Returns:
        Tuple of (metadata dict, header row index). The index is 0 when no
        header-like line is found.
    """
    logger.debug("Parsing metadata header", num_lines=len(lines))

    # Known metadata patterns, tried in this order; the first match wins.
    known_fields = {
        "filename": r"^Filename",
        "instrument_serial_number": r"^Instrument serial number",
        "instrument_name": r"^Instrument name",
        "operator": r"^[Oo]perator",
        "run_date": r"^[Rr]undate",
        "sample_name": r"^Sample name",
        "geometry": r"^Geometry name",
        "geometry_type": r"^Geometry type",
        "gap": r"^Gap",
        "temperature": r"^Temperature",
    }

    collected: dict[str, Any] = {}
    header_row = 0

    for index, raw in enumerate(lines):
        if not raw.strip():
            continue

        fields = raw.split(delimiter)

        # A metadata line needs a value field; without one it is examined as
        # a potential header instead.
        matched_key = None
        if len(fields) >= 2:
            matched_key = next(
                (
                    name
                    for name, regex in known_fields.items()
                    if re.match(regex, raw, re.IGNORECASE)
                ),
                None,
            )
        if matched_key is not None:
            value = fields[1].strip()
            collected[matched_key] = value
            logger.debug("Metadata field extracted", key=matched_key, value=value)
            continue

        leading = fields[0].strip().lower()
        textual_fields = sum(
            1 for cell in fields if cell.strip() and not cell.strip().isdigit()
        )
        if leading == "variables" or textual_fields >= 3:
            header_row = index
            logger.debug("Header row found", header_row=header_row)
            break

        if leading == "number of points":
            if len(fields) >= 2:
                try:
                    collected["number_of_points"] = int(fields[1].strip())
                except ValueError:
                    # Best effort: an unparsable count is simply not recorded.
                    pass
            # Header is next line
            header_row = index + 1
            logger.debug(
                "Header row found after 'Number of points'", header_row=header_row
            )
            break

    logger.debug(
        "Metadata parsing complete",
        metadata_fields=len(collected),
        header_row=header_row,
    )
    return collected, header_row
def detect_header_row(
    lines: list[str],
    delimiter: str,
    start_index: int = 0,
) -> int:
    """Locate the column-header row of a data table.

    Starting at ``start_index``, blank lines are skipped and the first line
    satisfying one of these rules decides the result:

    - first field is "Variables": that line is the header;
    - first field is "Number of points": the *next* line is the header;
    - at least two non-empty, non-numeric fields after the first field:
      that line is the header.

    Args:
        lines: File lines
        delimiter: Field delimiter
        start_index: Index to start searching from

    Returns:
        Header row index, or ``start_index`` when no line qualifies. The
        "Number of points" rule may return an index equal to ``len(lines)``.
    """
    logger.debug("Detecting header row", start_index=start_index, num_lines=len(lines))

    for row in range(start_index, len(lines)):
        text = lines[row].strip()
        if not text:
            continue

        cells = text.split(delimiter)
        label = cells[0].strip().lower()

        if label == "variables":
            logger.debug("Header row detected via 'Variables' marker", row=row)
            return row

        if label == "number of points":
            logger.debug("Header row detected via 'Number of points' marker", row=row + 1)
            return row + 1

        # The first column is skipped because it is often a row label.
        textual = sum(
            1 for cell in cells[1:] if cell.strip() and not _is_numeric(cell.strip())
        )
        if textual >= 2:
            logger.debug(
                "Header row detected via non-numeric columns",
                row=row,
                non_numeric_count=textual,
            )
            return row

    logger.debug("No header row found, using start_index", start_index=start_index)
    return start_index
def _is_numeric(s: str) -> bool:
    """Return True if ``s`` parses as a float once commas are read as dots."""
    normalized = s.replace(",", ".")
    try:
        float(normalized)
    except ValueError:
        return False
    else:
        return True
def _default_x_units(test_mode: str) -> str:
    """Return the fallback x-axis unit for ``test_mode``.

    "oscillation" maps to rad/s and "rotation" to 1/s; every other mode
    falls back to seconds.
    """
    defaults = {"oscillation": "rad/s", "rotation": "1/s"}
    return defaults.get(test_mode, "s")
def extract_units_from_header(
    header: list[str],
    unit_row: list[str] | None = None,
) -> dict[str, str]:
    """Build a column-name -> unit mapping.

    A unit written in trailing parentheses in the column name, as in
    "Angular Frequency (rad/s)", takes precedence and is stored under both
    the full name and the name with the parenthesised part removed.
    Otherwise the cell at the same position in ``unit_row`` is used, if it
    exists and is non-empty.

    Args:
        header: Column header names
        unit_row: Optional separate unit row, positionally aligned with
            ``header``

    Returns:
        Dict mapping column names to units
    """
    trailing_unit = re.compile(r"\(([^)]+)\)$")
    mapping: dict[str, str] = {}

    for position, raw_name in enumerate(header):
        name = raw_name.strip()

        found = trailing_unit.search(name)
        if found is not None:
            unit = found.group(1)
            mapping[name] = unit
            # Also store under the bare name (text before the parenthesis).
            mapping[name[: found.start()].strip()] = unit
            continue

        if not unit_row or position >= len(unit_row):
            continue

        unit = unit_row[position].strip()
        if unit:
            mapping[name] = unit

    return mapping
def detect_repeated_headers(
    lines: list[str],
    delimiter: str,
    first_header: list[str],
    start_index: int,
) -> list[int]:
    """Find lines that repeat the first table's header.

    A line counts as a repeated header when the lower-cased, non-empty cells
    among its first three fields equal those of ``first_header``.

    Args:
        lines: File lines
        delimiter: Field delimiter
        first_header: Column headers from first table
        start_index: Index after first table header

    Returns:
        List of line indices where new tables begin
    """

    def _signature(cells: list[str]) -> list[str]:
        """Normalise the non-empty cells among the first three."""
        return [cell.lower().strip() for cell in cells[:3] if cell.strip()]

    reference = _signature(first_header)
    boundaries: list[int] = []

    for index in range(start_index, len(lines)):
        text = lines[index].strip()
        if not text:
            continue
        cells = text.split(delimiter)
        # Too few fields to possibly carry the reference columns.
        if len(cells) < len(reference):
            continue
        if _signature(cells) == reference:
            boundaries.append(index)

    return boundaries
# Number of leading cells inspected for unit-like text when deciding whether
# the line after a header is a unit row. The same window is used for every
# table in the file so that all tables are classified consistently.
_UNIT_EVIDENCE_COLUMNS = 20


def _parse_numeric_cell(cell: str, decimal_separator: str) -> float:
    """Convert one CSV cell to a float; blank or unparsable cells become NaN."""
    text = cell.strip()
    if not text:
        return np.nan
    if decimal_separator == ",":
        # EU decimal handling: try comma->dot first (keeps dots in scientific
        # notation such as "1.23E+04"), then the full EU form (dots are
        # thousands separators), then the text unchanged.
        candidates = [
            text.replace(",", "."),
            text.replace(".", "").replace(",", "."),
            text,
        ]
    else:
        candidates = [text]
    for candidate in candidates:
        try:
            return float(candidate)
        except ValueError:
            continue
    return np.nan


def _looks_like_unit_row(cells: list[str]) -> bool:
    """Return True if ``cells`` (the line after a header) is a unit row.

    VIS-CSV2-001: both negative and positive evidence are required. The row
    must not start with "data", must have no numeric value in columns 1-4,
    and at least one of its leading cells must contain a known unit
    substring. The positive check keeps text rows such as "N/A" or "--" from
    being consumed as units, which would silently drop the first data row.
    """
    if not cells or cells[0].strip().lower().startswith("data"):
        return False
    if any(c.strip() and _is_numeric(c.strip()) for c in cells[1:5]):
        return False
    candidates = [c.strip() for c in cells[:_UNIT_EVIDENCE_COLUMNS] if c.strip()]
    return any(unit in c for c in candidates for unit in _UNIT_SUBSTRINGS)


def _read_table_header(
    lines: list[str],
    header_row: int,
    delimiter: str,
) -> tuple[list[str], dict[str, str], int, int]:
    """Parse a header line and optional unit row.

    Returns ``(header, units, col_offset, data_start)``. When the first
    header cell is a row-label column ("Variables" or anything starting with
    "data"), it is removed from ``header`` and ``units`` and ``col_offset``
    is 1, so that the same column is skipped in every data row.
    """
    header = [cell.strip() for cell in lines[header_row].split(delimiter)]

    unit_row = None
    data_start = header_row + 1
    if data_start < len(lines):
        candidate = lines[data_start].split(delimiter)
        if _looks_like_unit_row(candidate):
            unit_row = candidate
            data_start += 1

    # Units are extracted against the untrimmed header so that positions in
    # the unit row still line up.
    units = extract_units_from_header(header, unit_row)

    first = header[0].lower()
    if first == "variables" or first.startswith("data"):
        header = header[1:]
        units = {
            k: v
            for k, v in units.items()
            if k.lower() != "variables" and not k.lower().startswith("data")
        }
        return header, units, 1, data_start
    return header, units, 0, data_start


def _read_data_rows(
    lines: list[str],
    data_start: int,
    delimiter: str,
    n_columns: int,
    col_offset: int,
    decimal_separator: str,
) -> tuple[list[list[float]], int, int | None]:
    """Read numeric rows until a ``[``-prefixed section marker or end of file.

    Returns ``(rows, skipped, next_section_start)``. ``skipped`` counts
    non-blank lines whose field count did not match; ``next_section_start``
    is the index of the terminating ``[`` line, or ``None`` at end of file.
    Every returned row has exactly ``n_columns`` values.
    """
    expected = n_columns + col_offset
    rows: list[list[float]] = []
    skipped = 0
    for idx in range(data_start, len(lines)):
        text = lines[idx].strip()
        if text.startswith("["):
            # IO-004: report the marker so the caller can continue with the
            # next table instead of discarding everything after it.
            return rows, skipped, idx
        if not text:
            # IO-R6-007: blank separator lines inside a block are skipped,
            # not treated as the end of the data.
            continue
        cells = text.split(delimiter)
        if len(cells) != expected:
            # Stripping the line also strips leading/trailing *tab*
            # delimiters, which loses empty first/last cells. Retry on the
            # unstripped line before declaring the row malformed.
            raw_cells = lines[idx].rstrip("\r\n").split(delimiter)
            if len(raw_cells) != expected:
                skipped += 1
                continue
            cells = raw_cells
        row = [_parse_numeric_cell(c, decimal_separator) for c in cells[col_offset:]]
        if row:
            rows.append(row)
    return rows, skipped, None


def _find_section_marker(lines: list[str], start: int) -> int | None:
    """Return the index of the first ``[``-prefixed line at or after ``start``."""
    return next(
        (i for i in range(start, len(lines)) if lines[i].strip().startswith("[")),
        None,
    )


def _build_table(
    table_index: int,
    header: list[str],
    units: dict[str, str],
    rows: list[list[float]],
) -> TRIOSTable:
    """Wrap parsed rows in a DataFrame and a TRIOSTable.

    No width reconciliation is needed: ``_read_data_rows`` only returns rows
    with exactly ``len(header)`` values.
    """
    df = pd.DataFrame(rows, columns=header)
    logger.debug("DataFrame created", shape=df.shape, columns=list(df.columns))

    step_col = detect_step_column(df)
    step_values = None
    if step_col:
        step_values = df[step_col].unique().tolist()
        logger.debug(
            "Step column detected", step_col=step_col, num_steps=len(step_values)
        )

    return TRIOSTable(
        table_index=table_index,
        header=list(df.columns),
        units=units,
        df=df,
        step_values=step_values,
    )


def parse_trios_csv(
    filepath: str | Path,
    *,
    encoding: str | None = None,
    decimal_separator: str = ".",
    delimiter: str | None = None,
) -> TRIOSFile:
    """Low-level CSV parser returning raw TRIOSFile structure.

    For advanced users who need access to raw tables and metadata
    before RheoData conversion.

    The first table is located via the metadata header. Each subsequent
    ``[``-prefixed section marker starts a search for a further header +
    data block (IO-004), which is appended as an additional table. All
    tables are parsed by the same helpers, so unit-row detection and number
    parsing behave identically for every table.

    Args:
        filepath: Path to TRIOS CSV file
        encoding: File encoding (auto-detected if None)
        decimal_separator: Decimal separator ("." or ",")
        delimiter: Delimiter override (None = auto)

    Returns:
        TRIOSFile with parsed tables and metadata

    Raises:
        FileNotFoundError: File does not exist
        ValueError: No data tables found, or the first table has no data rows

    Warns:
        UserWarning: When rows are skipped because their field count does
            not match the table header.
    """
    filepath = Path(filepath)
    logger.info("Parsing TRIOS CSV file", filepath=str(filepath))
    if not filepath.exists():
        logger.error("File not found", filepath=str(filepath))
        raise FileNotFoundError(f"File not found: {filepath}")

    # Detect or use provided encoding
    if encoding is None:
        encoding = detect_encoding(filepath)
    logger.debug("Using encoding", encoding=encoding)

    # Read file content
    with open(filepath, encoding=encoding, errors="replace") as f:
        content = f.read()
    lines = content.split("\n")
    logger.debug("File read", num_lines=len(lines), content_bytes=len(content))

    # Detect delimiter
    if delimiter is None:
        delimiter = detect_delimiter(content, decimal_separator=decimal_separator)

    # Parse metadata, then refine the header position
    metadata, header_start = parse_metadata_header(lines, delimiter)
    header_row = detect_header_row(lines, delimiter, header_start)
    if header_row >= len(lines):
        logger.error("No data tables found", filepath=str(filepath))
        raise ValueError("No data tables found in TRIOS CSV file")

    # ---- first table -----------------------------------------------------
    header, units, col_offset, data_start = _read_table_header(
        lines, header_row, delimiter
    )
    rows, skipped_rows, next_section_start = _read_data_rows(
        lines, data_start, delimiter, len(header), col_offset, decimal_separator
    )
    if skipped_rows > 0:
        expected_cols = len(header) + col_offset
        warnings.warn(
            f"Skipped {skipped_rows} malformed rows (expected {expected_cols} columns) in {filepath}",
            stacklevel=3,
        )
    if not rows:
        logger.error("No data rows found", filepath=str(filepath))
        raise ValueError("No data rows found in TRIOS CSV file")
    logger.debug("Data rows parsed", num_rows=len(rows))

    tables: list[TRIOSTable] = [_build_table(0, header, units, rows)]
    total_rows = len(rows)
    first_table_columns = len(header)

    # ---- additional tables (IO-004) --------------------------------------
    # Multi-step exports repeat the header + data block after each `[...]`
    # section marker.
    search_start = next_section_start
    while search_start is not None:
        # Skip the `[...]` line itself, then find the next header row.
        next_header_row = detect_header_row(lines, delimiter, search_start + 1)
        if next_header_row >= len(lines):
            break

        # M-3: detect_header_row falls back to its start index when nothing
        # qualifies. If that line starts with a number it is a data row, not
        # a header - skip ahead to the next section marker.
        if next_header_row == search_start + 1:
            probe = lines[next_header_row].strip().split(delimiter)
            if probe and _is_numeric(probe[0].strip()):
                search_start = _find_section_marker(lines, next_header_row)
                continue

        next_header, next_units, next_col_offset, next_data_start = _read_table_header(
            lines, next_header_row, delimiter
        )

        # CSV-MULTI-001 / R6-IO-001: a header that held only a label column
        # leaves no data columns. Skip the section, scanning forward from the
        # current position so the loop always advances.
        if not next_header:
            logger.debug(
                "Multi-table section has no data columns after label-column strip; "
                "skipping section",
                section_start=search_start,
            )
            search_start = _find_section_marker(lines, next_data_start)
            continue

        next_rows, next_skipped, next_section_start = _read_data_rows(
            lines,
            next_data_start,
            delimiter,
            len(next_header),
            next_col_offset,
            decimal_separator,
        )
        if next_skipped > 0:
            next_expected = len(next_header) + next_col_offset
            warnings.warn(
                f"Skipped {next_skipped} malformed rows in additional table "
                f"(expected {next_expected} columns) in {filepath}",
                stacklevel=3,
            )

        search_start = next_section_start
        if not next_rows:
            # Empty section - skip it but keep looking for more.
            continue

        tables.append(_build_table(len(tables), next_header, next_units, next_rows))
        total_rows += len(next_rows)
        logger.debug(
            "Additional table parsed",
            table_index=len(tables) - 1,
            num_rows=len(next_rows),
        )

    logger.info(
        "TRIOS CSV parsing complete",
        filepath=str(filepath),
        num_tables=len(tables),
        num_rows=total_rows,  # across all tables
        num_columns=first_table_columns,  # of the first table
    )
    return TRIOSFile(
        filepath=str(filepath),
        format="csv",
        metadata=metadata,
        tables=tables,
        encoding=encoding,
        decimal_separator=decimal_separator,
    )
def _column_as_float(frame: pd.DataFrame, column: str) -> np.ndarray:
    """Return ``frame[column]`` as a float array.

    Raises:
        ValueError: If the column cannot be converted; the message names the
            column and shows its first three values.
    """
    try:
        return frame[column].values.astype(float)
    except (ValueError, TypeError) as e:
        raise ValueError(
            f"Column '{column}' contains non-numeric data that cannot be converted to float. "
            f"Sample values: {frame[column].head(3).tolist()}"
        ) from e


def load_trios_csv(
    filepath: str | Path,
    *,
    return_all_segments: bool = False,
    test_mode: str | None = None,
    encoding: str | None = None,
    decimal_separator: str = ".",
    delimiter: str | None = None,
    validate: bool = True,
    progress_callback: Callable[[int, int], None] | None = None,
) -> RheoData | list[RheoData]:
    """Load TRIOS CSV export file.

    Handles TRIOS-specific CSV format with:
    - Metadata header rows before data
    - Tab or comma delimiters (auto-detected)
    - Units in parentheses or separate row
    - Step/Segment columns for multi-step experiments
    - Multiple tables separated by ``[...]`` section markers

    Each parsed table is converted independently. Within a table the data is
    split by step only when ``return_all_segments`` is true and a step column
    exists. Rows with a non-finite x or y value are dropped, and segments
    left empty or without identifiable x/y columns are skipped with a warning.

    Args:
        filepath: Path to TRIOS CSV file
        return_all_segments: Return list for multi-step files
        test_mode: Override auto-detection ("creep", "relaxation", "oscillation", "rotation")
        encoding: File encoding (auto-detected: UTF-8, Latin-1, CP1252)
        decimal_separator: "." for US, "," for European
        delimiter: Override delimiter detection (None = auto)
        validate: Validate RheoData on creation
        progress_callback: Optional ``callback(current, total)``, invoked once
            after each table has been processed with the number of tables
            done so far and the total number of tables

    Returns:
        A single RheoData when exactly one segment was produced and
        ``return_all_segments`` is false; otherwise a list of RheoData

    Raises:
        FileNotFoundError: File does not exist
        ValueError: No data found or invalid format

    Example:
        >>> data = load_trios_csv('frequency_sweep.csv')
        >>> print(data.test_mode) # 'oscillation'
        >>> print(np.iscomplexobj(data.y)) # True for G* = G' + iG''
    """
    logger.info("Loading TRIOS CSV file", filepath=str(filepath))

    # Parse CSV file
    trios_file = parse_trios_csv(
        filepath,
        encoding=encoding,
        decimal_separator=decimal_separator,
        delimiter=delimiter,
    )

    # Convert tables to RheoData
    rheo_data_list: list[RheoData] = []
    total_tables = len(trios_file.tables)
    for table_idx, table in enumerate(trios_file.tables):
        df = table.df
        units = table.units
        logger.debug(
            "Processing table",
            table_index=table_idx,
            shape=df.shape,
            columns=list(df.columns),
        )

        # IO-FIX-002: explicit None check (not `or`) so that an empty-string
        # test_mode is passed through rather than triggering auto-detection.
        detected_mode = detect_test_type(df) if test_mode is None else test_mode
        logger.debug("Test mode", detected_mode=detected_mode, provided=test_mode)

        # Check for step column and split if needed
        step_col = detect_step_column(df)
        segments = (
            [df]
            if not step_col or not return_all_segments
            else split_by_step(df, step_col)
        )
        logger.debug(
            "Segments identified",
            step_col=step_col,
            num_segments=len(segments),
        )

        for seg_idx, seg_df in enumerate(segments):
            # Select x/y columns
            x_col, y_col, y2_col = select_xy_columns(seg_df, detected_mode)
            if x_col is None or y_col is None:
                msg = (
                    f"Skipping TRIOS CSV segment {seg_idx}: could not determine "
                    f"x/y columns. Available columns: {list(seg_df.columns)}"
                )
                warnings.warn(msg, stacklevel=2)
                logger.warning(
                    "Could not determine x/y columns",
                    segment_index=seg_idx,
                    available_columns=list(seg_df.columns),
                )
                continue
            logger.debug(
                "Columns selected",
                segment_index=seg_idx,
                x_col=x_col,
                y_col=y_col,
                y2_col=y2_col,
            )

            # Extract data
            x_data = _column_as_float(seg_df, x_col)

            # Get units
            x_units = units.get(x_col, "")
            y_units = units.get(y_col, "Pa")

            is_complex = y2_col is not None
            if is_complex:
                # Complex modulus: convert both parts to Pa, then combine.
                y_real = _column_as_float(seg_df, y_col)
                y_imag = _column_as_float(seg_df, y2_col)
                y_real, _ = convert_unit(y_real, units.get(y_col, "Pa"), "Pa")
                y_imag, _ = convert_unit(y_imag, units.get(y2_col, "Pa"), "Pa")
                y_data = construct_complex_modulus(y_real, y_imag)
                y_units = "Pa"
            else:
                y_data = _column_as_float(seg_df, y_col)

            # Convert x units (e.g., Hz to rad/s)
            if detected_mode == "oscillation":
                x_data, x_units = convert_unit(x_data, x_units, "rad/s")

            # Remove non-finite values (NaN and +/-inf): np.isfinite is False
            # for both, and either would poison downstream fits/validation.
            if is_complex:
                valid_mask = (
                    np.isfinite(x_data)
                    & np.isfinite(np.real(y_data))
                    & np.isfinite(np.imag(y_data))
                )
            else:
                valid_mask = np.isfinite(x_data) & np.isfinite(y_data)
            x_data = x_data[valid_mask]
            y_data = y_data[valid_mask]
            if len(x_data) == 0:
                warnings.warn(
                    f"Segment {seg_idx} has no valid data after NaN filtering and was skipped.",
                    RheoJaxValidationWarning,
                    stacklevel=2,
                )
                continue

            # Build metadata (copied so segments do not share one dict)
            seg_metadata = trios_file.metadata.copy()
            seg_metadata["test_mode"] = detected_mode
            seg_metadata["source_format"] = "csv"
            seg_metadata["x_column"] = x_col
            seg_metadata["y_column"] = y_col
            if y2_col is not None:
                seg_metadata["y2_column"] = y2_col
            seg_metadata["is_complex"] = is_complex

            # Create DataSegment and convert to RheoData
            segment = DataSegment(
                segment_index=seg_idx,
                test_mode=detected_mode,
                x_data=x_data,
                y_data=y_data,
                x_column=x_col,
                y_column=y_col,
                x_units=x_units or _default_x_units(detected_mode),
                y_units=y_units,
                is_complex=is_complex,
                metadata=seg_metadata,
            )
            rheo_data = segment_to_rheodata(segment, validate=validate)
            rheo_data_list.append(rheo_data)
            logger.debug(
                "RheoData created",
                segment_index=seg_idx,
                num_points=len(x_data),
                test_mode=detected_mode,
                is_complex=is_complex,
            )

        # Report progress once per table, including tables whose segments
        # were all skipped.
        if progress_callback is not None:
            progress_callback(table_idx + 1, total_tables)

    if not rheo_data_list:
        logger.error("No valid data segments parsed", filepath=str(filepath))
        raise ValueError(f"No valid data segments could be parsed from {filepath}")

    logger.info(
        "TRIOS CSV load complete",
        filepath=str(filepath),
        num_segments=len(rheo_data_list),
    )

    # Return single or list
    if len(rheo_data_list) == 1 and not return_all_segments:
        return rheo_data_list[0]
    return rheo_data_list