"""TA Instruments TRIOS file reader.
This module provides a reader for TA Instruments rheometer files exported
as .txt format using the TRIOS "Export to LIMS" functionality.
The reader supports two modes:
1. **Full Loading** (`load_trios()`): Loads entire file into memory
- Best for files < 10MB or < 50,000 data points
- Returns complete RheoData object(s)
- Simple API for typical use cases
2. **Chunked Reading** (`load_trios_chunked()`): Memory-efficient streaming
- Best for large files (> 10MB, > 50,000 data points)
- Returns generator yielding RheoData chunks
- Reduces memory usage by ~90% for large files
- Preserves metadata across all chunks
**Memory Requirements:**
- Full loading: ~80 bytes per data point (e.g., 8 MB for 100k points)
- Chunked reading: ~80 bytes × chunk_size (e.g., 800 KB for 10k chunk_size)
**Usage Example - Full Loading:**
>>> from rheojax.io.readers import load_trios
>>> data = load_trios('small_file.txt')
>>> print(f"Loaded {len(data.x)} points")
**Usage Example - Chunked Reading:**
>>> from rheojax.io.readers.trios import load_trios_chunked
>>>
>>> # Process large file in chunks of 10,000 points
>>> for i, chunk in enumerate(load_trios_chunked('large_file.txt', chunk_size=10000)):
... print(f"Chunk {i}: {len(chunk.x)} points")
... # Process chunk (e.g., fit model, transform, plot)
... model.fit(chunk.x, chunk.y)
>>>
>>> # Aggregate results across chunks
>>> results = []
>>> for chunk in load_trios_chunked('large_file.txt'):
... result = process_chunk(chunk)
... results.append(result)
>>> final_result = aggregate(results)
**When to Use Chunked Reading:**
- Files > 10 MB (typically > 50,000 data points)
- OWChirp arbitrary wave files (often 150k+ points, 66-80 MB)
- Memory-constrained environments
- Processing pipelines that can operate on chunks
- Parallel processing of independent segments
Reference: Ported from hermes-rheo TriosRheoReader
"""
from __future__ import annotations

import codecs
import math
import os
import re
import warnings
from collections.abc import Callable
from pathlib import Path

import numpy as np

from rheojax.core.data import RheoData
from rheojax.logging import get_logger
# Module-level logger; used for load progress, auto-chunking notices and
# parse failures throughout this reader.
logger = get_logger(__name__)

# Auto-chunking threshold (5 MB): load_trios() switches to chunked reading for
# files strictly larger than this, unless called with auto_chunk=False.
AUTO_CHUNK_THRESHOLD_MB = 5.0
# VIS-TXT-001: Encoding helpers for TRIOS TXT files.
# EU-locale TA Instruments instruments (DE, FR, NL) export Latin-1/CP-1252.
_TXT_ENCODING_CASCADE = ("utf-8-sig", "utf-8", "latin-1")
def _detect_txt_encoding(filepath: Path) -> str:
"""Probe the first 4096 bytes to select the best encoding for a TXT file."""
raw = filepath.read_bytes()[:4096]
for enc in _TXT_ENCODING_CASCADE:
try:
raw.decode(enc)
return enc
except (UnicodeDecodeError, LookupError):
continue
return "latin-1" # guaranteed fallback (never raises on 8-bit data)
def _read_file_with_encoding_cascade(filepath: Path) -> str:
    """Return the whole TXT file as text, decoded with the probed encoding.

    Args:
        filepath: Path to the TXT file

    Returns:
        Decoded file contents
    """
    probed = _detect_txt_encoding(filepath)
    logger.debug("Detected TXT encoding", filepath=str(filepath), encoding=probed)
    try:
        text = filepath.read_text(encoding=probed)
    except UnicodeDecodeError:
        # The probe only inspects the head of the file, so bytes further in can
        # still be undecodable; latin-1 accepts any 8-bit data.
        text = filepath.read_text(encoding="latin-1", errors="replace")
    return text
# Unit conversion factors: source unit -> (target unit, multiplicative factor)
UNIT_CONVERSIONS = {
    "MPa": ("Pa", 1e6),
    "kPa": ("Pa", 1e3),
    "%": ("unitless", 0.01),
}


def convert_units(
    value: float | np.ndarray, from_unit: str, to_unit: str
) -> float | np.ndarray:
    """Convert values between units.

    Only the conversions listed in ``UNIT_CONVERSIONS`` are supported. Any
    other unit pair is passed through unchanged.

    Args:
        value: Value or array to convert
        from_unit: Source unit
        to_unit: Target unit

    Returns:
        Converted value(s), or ``value`` unchanged when the units already
        match or no conversion from ``from_unit`` to ``to_unit`` is known
    """
    if from_unit == to_unit:
        return value

    conversion = UNIT_CONVERSIONS.get(from_unit)
    if conversion is not None:
        target, factor = conversion
        # Scale only when the table really maps from_unit onto to_unit.
        # Applying the factor for any "Pa" request would turn a percentage
        # into a bogus pascal value (x0.01).
        if target == to_unit:
            return value * factor
    return value
def _drain_trios_chunks(chunks) -> tuple[RheoData | None, list, list]:
    """Consume a chunk iterator, keeping the first chunk and every x/y array.

    Args:
        chunks: Iterable of RheoData chunks (e.g. from load_trios_chunked)

    Returns:
        Tuple of (first_chunk, x_parts, y_parts); first_chunk is None when the
        iterable yielded nothing
    """
    first_chunk = None
    x_parts: list = []
    y_parts: list = []
    for chunk in chunks:
        if first_chunk is None:
            first_chunk = chunk
        # Accumulate references only; concatenation happens once at the end
        x_parts.append(chunk.x)
        y_parts.append(chunk.y)
    return first_chunk, x_parts, y_parts


def _aggregate_trios_chunks(
    first_chunk: RheoData, x_parts: list, y_parts: list, validate: bool
) -> tuple[RheoData, int]:
    """Concatenate collected chunk arrays into one RheoData.

    Units, domain, test mode and metadata are taken from ``first_chunk``.

    Args:
        first_chunk: First chunk yielded by the chunked reader
        x_parts: X arrays of all chunks, in order
        y_parts: Y arrays of all chunks, in order
        validate: Passed through as RheoData's ``validate`` argument

    Returns:
        Tuple of (aggregated RheoData, total number of points)
    """
    x_combined = np.concatenate(x_parts)
    y_combined = np.concatenate(y_parts)
    # IO-FIX-001: copy metadata so downstream mutations do not corrupt
    # the first_chunk.metadata reference (RheoData stores dict by ref).
    aggregated = RheoData(
        x=x_combined,
        y=y_combined,
        x_units=first_chunk.x_units,
        y_units=first_chunk.y_units,
        domain=first_chunk.domain,
        metadata=first_chunk.metadata.copy(),
        initial_test_mode=first_chunk.test_mode,
        validate=validate,
    )
    return aggregated, len(x_combined)


def load_trios(filepath: str | Path, **kwargs) -> RheoData | list[RheoData]:
    """Load TA Instruments TRIOS .txt file.

    Reads rheological data from TRIOS exported .txt files. Supports multiple
    measurement types including:

    - Frequency sweep (SAOS)
    - Amplitude sweep
    - Flow ramp (steady shear)
    - Stress relaxation
    - Creep
    - Temperature sweep
    - Arbitrary wave

    **Auto-Chunking (v0.4.0+):**
    Files larger than 5 MB are automatically loaded using chunked reading for
    memory efficiency. This provides 50-87% memory reduction for large files.

    **Performance Trade-off:**
    Chunked loading trades latency for memory efficiency (2-4x slower loading
    in exchange for 50-87% memory reduction). This is ideal for memory-constrained
    environments where RAM is more critical than load time.

    Args:
        filepath: Path to TRIOS .txt file
        **kwargs: Additional options

            - return_all_segments: If True, return list of RheoData for each
              segment (full loading only; not supported in auto-chunk mode)
            - chunk_size: If provided, uses chunked reading (see load_trios_chunked)
            - auto_chunk: If True (default), automatically use chunked reading for
              files > 5 MB. Set to False to disable auto-detection.
            - progress_callback: Optional callback for progress tracking during
              chunked loading. Signature: callback(current, total)
            - segment_index: Segment to load in the chunked paths (0-based,
              defaults to 0). Ignored by full loading.
            - validate_data: Whether chunked paths validate the data
              (default: True)

    Returns:
        RheoData object or list of RheoData objects (if multiple segments).
        The chunked paths always return a single RheoData.

    Raises:
        FileNotFoundError: If file doesn't exist
        ValueError: If file format is not recognized

    Notes:
        - Auto-chunking threshold: 5 MB (configurable via AUTO_CHUNK_THRESHOLD_MB)
        - Memory savings: 50-87% for files > 5 MB with 50k+ points
        - Latency trade-off: 2-4x slower (acceptable for memory-constrained scenarios)
        - Disable auto-chunking: Pass auto_chunk=False to force full loading
        - Use case: Memory-constrained systems, embedded devices, large datasets

    See Also:
        load_trios_chunked: Memory-efficient streaming for large files

    Example:
        >>> # Automatic chunking for large files
        >>> data = load_trios('large_file.txt')  # Auto-chunks if > 5 MB
        >>> # Disable auto-chunking
        >>> data = load_trios('large_file.txt', auto_chunk=False)
        >>> # With progress tracking
        >>> def progress(current, total):
        ...     print(f"Loading: {100*current/total:.1f}%")
        >>> data = load_trios('large_file.txt', progress_callback=progress)
    """
    filepath = Path(filepath)
    logger.info("Loading TRIOS TXT file", filepath=str(filepath))

    if not filepath.exists():
        logger.error("File not found", filepath=str(filepath))
        raise FileNotFoundError(f"File not found: {filepath}")

    # Check auto-chunking setting (default: True)
    auto_chunk = kwargs.pop("auto_chunk", True)

    # If chunk_size is provided explicitly, delegate to chunked reader
    if "chunk_size" in kwargs:
        chunk_size = kwargs.pop("chunk_size")
        logger.debug("Using explicit chunked reading", chunk_size=chunk_size)
        progress_callback = kwargs.pop("progress_callback", None)

        # R11-TXT-002: Default to segment_index=0 if not provided, and warn
        # if multiple segments are detected but segment_index is not specified.
        if "segment_index" not in kwargs:
            # Quick scan to count segments for the warning
            _enc = _detect_txt_encoding(filepath)
            _seg_count = 0
            with open(filepath, encoding=_enc, errors="replace") as _f:
                for _line in _f:
                    if re.match(r"\[step\]", _line, re.IGNORECASE):
                        _seg_count += 1
            if _seg_count > 1:
                # The message used to claim that all segments are merged, but
                # segment_index=0 restricts the load to the first segment.
                logger.warning(
                    "Multiple segments detected but segment_index not specified; "
                    "defaulting to segment_index=0 (only the first segment will be loaded)",
                    filepath=str(filepath),
                    num_segments=_seg_count,
                )
            kwargs["segment_index"] = 0

        first_chunk, x_parts, y_parts = _drain_trios_chunks(
            load_trios_chunked(
                filepath,
                chunk_size=chunk_size,
                progress_callback=progress_callback,
                **kwargs,
            )
        )
        chunk_count = len(x_parts)
        logger.debug("Chunks aggregated", num_chunks=chunk_count)

        if first_chunk is None:
            logger.error("No data chunks returned", filepath=str(filepath))
            raise ValueError("No data chunks returned from chunked reader")

        aggregated_data, num_points = _aggregate_trios_chunks(
            first_chunk, x_parts, y_parts, kwargs.get("validate_data", True)
        )
        logger.info(
            "TRIOS TXT load complete (chunked)",
            filepath=str(filepath),
            num_points=num_points,
            num_chunks=chunk_count,
        )
        return aggregated_data

    # Auto-detect file size and use chunked loading if above threshold
    if auto_chunk:
        file_size_bytes = os.path.getsize(filepath)
        file_size_mb = file_size_bytes / (1024 * 1024)
        logger.debug(
            "File size check",
            file_size_mb=file_size_mb,
            threshold_mb=AUTO_CHUNK_THRESHOLD_MB,
        )

        if file_size_mb > AUTO_CHUNK_THRESHOLD_MB:
            # Log auto-chunking activation
            logger.info(
                f"Auto-chunking enabled for file ({file_size_mb:.1f} MB, threshold: {AUTO_CHUNK_THRESHOLD_MB} MB)",
                filepath=str(filepath),
                file_size_mb=round(file_size_mb, 1),
                threshold_mb=AUTO_CHUNK_THRESHOLD_MB,
                expected_memory_reduction="50-70%",
            )

            progress_callback = kwargs.pop("progress_callback", None)
            # Default to the first segment, but pop any caller-supplied value:
            # passing segment_index both explicitly and inside **kwargs raises
            # "got multiple values for keyword argument".
            segment_index = kwargs.pop("segment_index", 0)

            first_chunk, x_parts, y_parts = _drain_trios_chunks(
                load_trios_chunked(
                    filepath,
                    chunk_size=10000,
                    segment_index=segment_index,
                    progress_callback=progress_callback,
                    **kwargs,
                )
            )
            chunk_count = len(x_parts)
            logger.debug("Auto-chunking complete", num_chunks=chunk_count)

            # R11-TXT-001: Auto-chunk currently only returns a single segment.
            # TODO: support multi-segment via return_all_segments.
            if kwargs.get("return_all_segments", False):
                logger.warning(
                    "return_all_segments is not supported in auto-chunk mode; "
                    "only segment 0 is returned. Use auto_chunk=False for multi-segment files.",
                    filepath=str(filepath),
                )

            if first_chunk is None:
                logger.error(
                    "No data chunks returned from auto-chunking", filepath=str(filepath)
                )
                raise ValueError("No data chunks returned from chunked reader")

            aggregated_data, num_points = _aggregate_trios_chunks(
                first_chunk, x_parts, y_parts, kwargs.get("validate_data", True)
            )
            logger.info(
                "TRIOS TXT load complete (auto-chunked)",
                filepath=str(filepath),
                num_points=num_points,
                num_chunks=chunk_count,
            )
            return aggregated_data

    # Read file contents
    # VIS-TXT-001: Use encoding cascade (UTF-8-sig → UTF-8 → Latin-1) to
    # handle EU-locale TA Instruments TXT exports that use Latin-1/CP-1252.
    # Hardcoded UTF-8 with errors="replace" silently corrupts non-UTF-8 bytes.
    logger.debug("Reading file in full mode", filepath=str(filepath))
    content = _read_file_with_encoding_cascade(filepath)

    # Split into lines
    lines = content.split("\n")
    logger.debug("File read", num_lines=len(lines), content_bytes=len(content))

    # Extract metadata
    metadata = _extract_metadata(lines)
    logger.debug("Metadata extracted", metadata_fields=len(metadata))

    # Find all data segments
    segments = _find_data_segments(lines)
    logger.debug("Data segments found", num_segments=len(segments))

    if not segments:
        logger.error("No data segments found", filepath=str(filepath))
        raise ValueError("No data segments found in TRIOS file")

    # Parse each segment; a segment that fails to parse is reported and
    # skipped so the remaining segments can still be returned.
    rheo_data_list = []
    for seg_start, seg_end in segments:
        try:
            data = _parse_segment(lines, seg_start, seg_end, metadata)
            if data is not None:
                rheo_data_list.append(data)
                logger.debug(
                    "Segment parsed",
                    start_line=seg_start,
                    end_line=seg_end,
                    num_points=len(data.x),  # type: ignore[arg-type]
                )
        except Exception as e:
            logger.error(
                "Failed to parse segment",
                start_line=seg_start,
                error=str(e),
                exc_info=True,
            )
            warnings.warn(
                f"Failed to parse segment starting at line {seg_start}: {e}",
                stacklevel=2,
            )

    if not rheo_data_list:
        logger.error("No valid data segments could be parsed", filepath=str(filepath))
        raise ValueError("No valid data segments could be parsed")

    logger.info(
        "TRIOS TXT load complete",
        filepath=str(filepath),
        num_segments=len(rheo_data_list),
    )

    # Return single RheoData or list
    return_all = kwargs.get("return_all_segments", False)
    if len(rheo_data_list) == 1 and not return_all:
        return rheo_data_list[0]
    return rheo_data_list
def load_trios_chunked(
    filepath: str | Path,
    chunk_size: int = 10000,
    progress_callback: Callable | None = None,
    **kwargs,
):
    """Load TRIOS file in memory-efficient chunks (generator).

    This function reads TRIOS files using a streaming approach that yields
    RheoData objects for each chunk of data. This is ideal for large files
    (> 10 MB, > 50,000 points) where loading the entire file would consume
    excessive memory.

    **Memory Efficiency:**

    - Traditional loading: Entire file in memory (~80 bytes per point)
    - Chunked loading: Only chunk_size points in memory at once
    - Example: 150k point file with chunk_size=10k uses ~800 KB vs ~12 MB

    **Important Notes:**

    - Chunks are yielded sequentially as they are read
    - Each chunk is an independent RheoData object with complete metadata
    - Chunk boundaries are based on data rows, not time or other physical units
    - File handle is automatically closed when generator completes or is interrupted
    - Because this is a generator, nothing runs (and no exception is raised)
      until the first item is requested

    **Progress Tracking (v0.4.0+):**

    - Optional progress_callback parameter for monitoring large file loading
    - Callback signature: callback(current_points, total_points)
    - Called every 5-10% of file processed for efficient monitoring
    - Total points estimated from "Number of points" in TRIOS header

    Args:
        filepath: Path to TRIOS .txt file
        chunk_size: Number of data points per chunk (default: 10,000)

            - Smaller = less memory, more overhead
            - Larger = more memory, less overhead
            - Recommended: 5,000 - 20,000 for most files
        progress_callback: Optional callback function for progress tracking.
            Signature: callback(current_points: int, total_points: int)
            Called periodically during loading (every 5-10% progress).
        **kwargs: Additional options

            - segment_index: If provided, only process this segment (0-based).
              An index outside the file's segments (including negative values)
              produces a warning and yields nothing.
            - validate_data: Validate each chunk (default: True)

    Yields:
        RheoData: Chunks of data with metadata preserved

    Raises:
        FileNotFoundError: If file doesn't exist
        ValueError: If file format is not recognized or no segments found

    Example:
        >>> # Process large file in chunks
        >>> for chunk in load_trios_chunked('large_file.txt', chunk_size=10000):
        ...     print(f"Processing {len(chunk.x)} points")
        ...     model.fit(chunk.x, chunk.y)
        >>>
        >>> # Aggregate results from chunks
        >>> max_stress = -float('inf')
        >>> for chunk in load_trios_chunked('file.txt'):
        ...     max_stress = max(max_stress, chunk.y.max())
        >>> print(f"Maximum stress: {max_stress}")
        >>>
        >>> # With progress tracking
        >>> def progress(current, total):
        ...     pct = 100 * current / total
        ...     print(f"Loading: {pct:.1f}%")
        >>> for chunk in load_trios_chunked('large_file.txt', progress_callback=progress):
        ...     process(chunk)

    See Also:
        load_trios: Standard loading (entire file in memory), auto-chunks for files > 5 MB
    """
    filepath = Path(filepath)
    logger.info(
        "Loading TRIOS TXT file (chunked)",
        filepath=str(filepath),
        chunk_size=chunk_size,
    )

    if not filepath.exists():
        logger.error("File not found", filepath=str(filepath))
        raise FileNotFoundError(f"File not found: {filepath}")

    segment_index = kwargs.get("segment_index", None)
    validate_data = kwargs.get("validate_data", True)

    # First pass: extract metadata and locate segments without loading all data
    # VIS-TXT-001: Use encoding cascade for the chunked path as well.
    logger.debug("First pass: scanning for segments", filepath=str(filepath))
    detected_encoding = _detect_txt_encoding(filepath)

    # A single scan collects both the header (first 500 lines, typically
    # sufficient for metadata) and the 0-based line number of every "[step]"
    # marker, so the head of the file is not read twice.
    step_marker = re.compile(r"\[step\]", re.IGNORECASE)
    header_lines = []
    segment_starts = []
    with open(filepath, encoding=detected_encoding, errors="replace") as f:
        for line_num, line in enumerate(f):
            if line_num < 500:  # R11-TXT-004: indices 0..499 = first 500 lines
                header_lines.append(line.rstrip("\n"))
            if step_marker.match(line):
                segment_starts.append(line_num)

    # Extract metadata from header
    metadata = _extract_metadata(header_lines)
    logger.debug("Metadata extracted", metadata_fields=len(metadata))

    if not segment_starts:
        logger.error("No data segments found", filepath=str(filepath))
        raise ValueError("No data segments found in TRIOS file")

    logger.debug("Segments located", num_segments=len(segment_starts))

    # Second pass: process each segment in chunks
    target_segments = (
        [segment_index] if segment_index is not None else range(len(segment_starts))
    )
    logger.debug(
        "Processing segments",
        target_segments=list(target_segments),
        total_segments=len(segment_starts),
    )

    for seg_idx in target_segments:
        # Reject negative indices too: they would index segment_starts from
        # the end while the end boundary is computed from seg_idx + 1, giving
        # an inverted range that silently yields no data.
        if not 0 <= seg_idx < len(segment_starts):
            logger.warning(
                "Segment not found",
                segment_index=seg_idx,
                total_segments=len(segment_starts),
            )
            warnings.warn(f"Segment {seg_idx} not found in file", stacklevel=2)
            continue

        seg_start = segment_starts[seg_idx]
        # The segment runs up to the next "[step]" marker, or to end of file
        seg_end = (
            segment_starts[seg_idx + 1] if seg_idx + 1 < len(segment_starts) else None
        )
        logger.debug(
            "Processing segment",
            segment_index=seg_idx,
            start_line=seg_start,
            end_line=seg_end,
        )

        # Process this segment in chunks
        yield from _read_segment_chunked(
            filepath,
            seg_start,
            seg_end,
            metadata,
            chunk_size,
            validate_data,
            progress_callback,
            encoding=detected_encoding,
        )
# =============================================================================
# Helper functions for _read_segment_chunked (extracted for complexity reduction)
# =============================================================================
def _extract_step_temperature(line: str) -> float | None:
"""Extract temperature from step name line.
Args:
line: Line containing step name with temperature
Returns:
Temperature in Kelvin or None if not found
"""
temp_match = re.search(r"(-?\d+\.?\d*)\s*°C", line)
if temp_match:
temp_c = float(temp_match.group(1))
return temp_c + 273.15 # Convert to Kelvin
return None
def _parse_total_points(line: str) -> int | None:
"""Parse total number of points from header line.
Args:
line: Line containing "Number of points\\t12345"
Returns:
Total points or None if parsing fails
"""
parts = line.split("\t")
if len(parts) >= 2:
try:
return int(parts[1].strip())
except ValueError:
pass
return None
def _parse_headers_and_units(header_line: str, unit_line: str) -> tuple[list, list]:
"""Parse column headers and units from header lines.
Args:
header_line: Tab-separated column headers
unit_line: Tab-separated unit specifications
Returns:
Tuple of (columns, units) lists
"""
columns = [col.strip() for col in header_line.split("\t")]
units = (
[u.strip() for u in unit_line.split("\t")] if unit_line else [""] * len(columns)
)
# Ensure same number of units as columns
while len(units) < len(columns):
units.append("")
return columns, units
def _parse_row_values(values: list[str]) -> list[float]:
"""Convert tab-separated values to floats, skipping first column.
Args:
values: List of string values from a data row
Returns:
List of floats (np.nan for non-numeric values)
"""
row = []
for i, v in enumerate(values):
if i == 0:
# Skip first column (row label)
continue
if not v.strip():
row.append(np.nan)
else:
try:
row.append(float(v))
except ValueError:
# Handle hex values (status bits), dates, strings
row.append(np.nan)
return row
def _process_sample_array_complex(
    sample_array: np.ndarray,
    x_col: int,
    y_col: int,
    y_col2: int,
    y_units_orig: str,
    y_units2_orig: str,
) -> tuple[np.ndarray, np.ndarray]:
    """Build complex-modulus samples (G* = G' + i*G'') from the sample rows.

    Both modulus columns are passed through ``convert_units`` with "Pa" as the
    target unit before being combined.

    Args:
        sample_array: Array of sample data
        x_col: X column index
        y_col: Y column index (storage modulus)
        y_col2: Y2 column index (loss modulus)
        y_units_orig: Original units for storage modulus
        y_units2_orig: Original units for loss modulus

    Returns:
        Tuple of (x_chunk, y_chunk) arrays with non-finite rows (NaN, ±inf)
        removed
    """

    def _as_real(column):
        return np.real_if_close(np.asarray(column))

    x_all = _as_real(sample_array[:, x_col])
    storage = _as_real(convert_units(sample_array[:, y_col], y_units_orig, "Pa"))
    loss = _as_real(convert_units(sample_array[:, y_col2], y_units2_orig, "Pa"))

    # Keep a row only if x and both modulus parts are finite — NaN and ±inf
    # violate RheoData's isfinite invariant and corrupt model fits.
    keep = np.isfinite(x_all) & np.isfinite(storage) & np.isfinite(loss)

    modulus = storage + 1j * loss
    return x_all[keep], modulus[keep]
def _process_sample_array_real(
sample_array: np.ndarray, x_col: int, y_col: int
) -> tuple[np.ndarray, np.ndarray]:
"""Process sample array for real-valued data.
Args:
sample_array: Array of sample data
x_col: X column index
y_col: Y column index
Returns:
Tuple of (x_chunk, y_chunk) arrays with NaN and Inf values removed
"""
x_chunk_array = np.real_if_close(np.asarray(sample_array[:, x_col]))
y_chunk_array = np.real_if_close(np.asarray(sample_array[:, y_col]))
valid_mask = np.isfinite(x_chunk_array) & np.isfinite(y_chunk_array)
return x_chunk_array[valid_mask], y_chunk_array[valid_mask]
def _add_sample_to_buffers(
x_chunk: np.ndarray, y_chunk: np.ndarray, current_x: list, current_y: list
) -> int:
"""Add sample data to accumulator buffers.
Args:
x_chunk: X values from sample
y_chunk: Y values from sample
current_x: X accumulator list
current_y: Y accumulator list
Returns:
Number of points added
"""
count = 0
for x_val, y_val in zip(x_chunk, y_chunk, strict=True):
current_x.append(float(x_val) if np.isreal(x_val) else complex(x_val))
current_y.append(float(y_val) if np.isreal(y_val) else complex(y_val))
count += 1
return count
def _process_complex_row(
    row: list[float],
    x_col: int,
    y_col: int,
    y_col2: int,
    y_units_orig: str,
    y_units2_orig: str,
) -> tuple[float | None, complex | None]:
    """Process a single row for complex modulus data.

    Both modulus values are passed through ``convert_units`` with "Pa" as the
    target unit and combined as G* = G' + i*G''.

    Args:
        row: Parsed row values
        x_col: X column index
        y_col: Y column index (storage modulus)
        y_col2: Y2 column index (loss modulus)
        y_units_orig: Original units for storage modulus
        y_units2_orig: Original units for loss modulus

    Returns:
        Tuple of (x_val, y_val) or (None, None) if any of the three values
        is NaN or ±inf
    """
    x_val = row[x_col]
    y_val_real = convert_units(row[y_col], y_units_orig, "Pa")
    y_val_imag = convert_units(row[y_col2], y_units2_orig, "Pa")

    # Reject every non-finite value, not just NaN, so that streamed rows are
    # filtered by the same rule as the initial sample rows (which use
    # np.isfinite); "inf" parses as a valid float and would otherwise leak in.
    if not (
        np.isfinite(x_val) and np.isfinite(y_val_real) and np.isfinite(y_val_imag)
    ):
        return None, None
    return x_val, complex(y_val_real, y_val_imag)
def _process_real_row(
row: list[float], x_col: int, y_col: int
) -> tuple[float | None, float | None]:
"""Process a single row for real-valued data.
Args:
row: Parsed row values
x_col: X column index
y_col: Y column index
Returns:
Tuple of (x_val, y_val) or (None, None) if invalid
"""
x_val = row[x_col]
y_val = row[y_col]
if np.isnan(x_val) or np.isnan(y_val):
return None, None
return x_val, y_val
def _create_rheodata_chunk(
    current_x: list,
    current_y: list,
    x_units: str,
    y_units: str,
    domain: str,
    metadata: dict,
    validate: bool,
) -> RheoData:
    """Wrap the accumulated x/y buffers in a RheoData object.

    The buffers are converted to NumPy arrays and the metadata dict is
    shallow-copied, so the returned object does not share the caller's
    buffers or metadata dict.

    Args:
        current_x: X values list
        current_y: Y values list
        x_units: X axis units
        y_units: Y axis units
        domain: Data domain
        metadata: Segment metadata; its "test_mode" entry (if any) is passed
            as the initial test mode
        validate: Whether to validate data

    Returns:
        RheoData object
    """
    fields = {
        "x": np.array(current_x),
        "y": np.array(current_y),
        "x_units": x_units,
        "y_units": y_units,
        "domain": domain,
        "initial_test_mode": metadata.get("test_mode"),
        "metadata": metadata.copy(),
        "validate": validate,
    }
    return RheoData(**fields)
def _read_sample_rows(
file_handle, chunk_size: int, num_columns: int
) -> tuple[list[list[float]], int]:
"""Read and parse sample data rows to determine column structure.
Args:
file_handle: Open file handle positioned after header
chunk_size: Chunk size to limit sample rows
num_columns: Expected number of columns
Returns:
Tuple of (sample_rows, lines_read)
"""
sample_rows = []
lines_read = 0
for _ in range(min(10, chunk_size)):
try:
line = next(file_handle)
lines_read += 1
if not line.strip() or line.startswith("["):
break
values = line.split("\t")
if len(values) == num_columns:
row = _parse_row_values(values)
if row:
sample_rows.append(row)
except (StopIteration, ValueError):
break
return sample_rows, lines_read
def _read_segment_chunked(
    filepath: Path,
    seg_start: int,
    seg_end: int | None,
    metadata: dict,
    chunk_size: int,
    validate_data: bool,
    progress_callback: Callable | None = None,
    encoding: str = "utf-8",
):
    """Stream one segment of the file as RheoData chunks (internal generator).

    Opens the file, skips to the segment's first line, locates the data
    section through ``_parse_segment_header`` and then delegates to
    ``_process_data_section``. Yields nothing when no data section is found.

    Args:
        filepath: Path to file
        seg_start: Segment start line number
        seg_end: Segment end line number (None for end of file)
        metadata: File metadata dictionary
        chunk_size: Number of data points per chunk
        validate_data: Whether to validate each chunk
        progress_callback: Optional progress callback (current_points, total_points)
        encoding: File encoding (should match encoding detected by caller)

    Yields:
        RheoData: Chunks of segment data
    """
    with open(filepath, encoding=encoding, errors="replace") as stream:
        # Advance past everything that precedes this segment
        for _ in range(seg_start):
            next(stream)

        # Phase 1: walk the segment header up to the "Number of points" line
        located = _parse_segment_header(stream, seg_start, seg_end)
        if located is not None:
            step_temperature, line_num, num_points_line = located

            # Phase 2: the data table follows immediately after that line
            yield from _process_data_section(
                stream,
                line_num,
                seg_end,
                step_temperature,
                num_points_line,
                metadata,
                chunk_size,
                validate_data,
                progress_callback,
            )
def _parse_segment_header(file_handle, seg_start: int, seg_end: int | None):
"""Parse segment header to find data section start.
Args:
file_handle: Open file handle
seg_start: Segment start line number
seg_end: Segment end line number
Returns:
Tuple of (step_temperature, line_num, num_points_line) or None
"""
step_temperature = None
line_num = seg_start
for line in file_handle:
line_num += 1
# Extract temperature from step name
if "Step name" in line and step_temperature is None:
step_temperature = _extract_step_temperature(line)
# Check if we've reached segment end
if seg_end is not None and line_num >= seg_end:
return None
# Check if we found "Number of points" (data section starts next)
if line.startswith("Number of points"):
return step_temperature, line_num, line
return None
def _process_data_section(
    file_handle,
    line_num: int,
    seg_end: int | None,
    step_temperature: float | None,
    num_points_line: str,
    metadata: dict,
    chunk_size: int,
    validate_data: bool,
    progress_callback: Callable | None,
):
    """Process the data section of a segment.

    Reads the column-header and unit lines, samples the first data rows to
    work out which columns hold x and y, seeds the accumulators with those
    sample rows and then delegates the rest of the segment to
    ``_process_remaining_rows``.

    Yields nothing when no sample row could be parsed or when the x/y columns
    cannot be determined (the latter also emits a warning).

    Args:
        file_handle: Open file handle positioned at data section
        line_num: Current line number
        seg_end: Segment end line number
        step_temperature: Temperature in Kelvin
        num_points_line: Line containing number of points
        metadata: File metadata
        chunk_size: Chunk size
        validate_data: Whether to validate data
        progress_callback: Progress callback

    Yields:
        RheoData chunks

    Raises:
        ValueError: If the file ends before the header and unit lines
    """
    # Parse total points for progress tracking (only needed with a callback)
    total_points = _parse_total_points(num_points_line) if progress_callback else None
    # Read column headers and units
    try:
        header_line = next(file_handle).rstrip("\n")
        unit_line = next(file_handle).rstrip("\n")
    except StopIteration:
        raise ValueError(
            "TRIOS TXT segment truncated: expected header and unit lines"
        ) from None
    line_num += 2
    columns, units = _parse_headers_and_units(header_line, unit_line)
    # Read sample rows to determine column structure
    sample_rows, lines_read = _read_sample_rows(file_handle, chunk_size, len(columns))
    line_num += lines_read
    if not sample_rows:
        return  # No data in segment
    sample_array = np.array(sample_rows)
    # Adjust column indices since we skipped column 0
    # (parsed rows omit the row label, so drop it from the names/units too)
    columns = columns[1:]
    units = units[1:]
    # Determine x/y columns
    col_info = _determine_xy_columns(columns, units, sample_array)
    x_col, x_units, y_col, y_units, y_col2, y_units2 = col_info
    if x_col is None or y_col is None:
        warnings.warn(f"Could not determine x/y columns from: {columns}", stacklevel=2)
        return
    # A second y column that the sampled rows do not actually contain cannot
    # be used; fall back to real-valued handling.
    if y_col2 is not None and y_col2 >= sample_array.shape[1]:
        logger.warning("y_col2 index out of bounds; treating as non-complex data")
        y_col2 = None
        y_units2 = None
    # Build segment metadata
    domain, test_mode = _infer_domain_and_mode(
        columns[x_col],
        columns[y_col],
        x_units,
        y_units,
        all_columns=columns,
    )
    segment_metadata = _build_segment_metadata(
        metadata, test_mode, columns, units, step_temperature
    )
    # Track complex vs real data
    is_complex = y_col2 is not None
    # Keep the original units: later rows are converted from them even though
    # y_units itself is overwritten with "Pa" for complex data below.
    y_units_orig = y_units
    y_units2_orig = y_units2 if is_complex else None
    # Process sample array
    if is_complex:
        if y_col2 is None or y_units2_orig is None:  # pragma: no cover
            raise ValueError("y_col2 and y_units2 required for complex data")
        x_chunk, y_chunk = _process_sample_array_complex(
            sample_array, x_col, y_col, y_col2, y_units_orig, y_units2_orig
        )
        y_units = "Pa"  # Standardized after conversion
    else:
        x_chunk, y_chunk = _process_sample_array_real(sample_array, x_col, y_col)
    # Initialize accumulators and progress tracking
    current_x: list = []
    current_y: list = []
    total_points_read = _add_sample_to_buffers(x_chunk, y_chunk, current_x, current_y)
    # Report roughly every 5% when the total is known, else once per chunk_size
    progress_interval = max(1, total_points // 20) if total_points else chunk_size
    # Process remaining data rows
    yield from _process_remaining_rows(
        file_handle,
        line_num,
        seg_end,
        columns,
        x_col,
        y_col,
        y_col2,
        is_complex,
        y_units_orig,
        y_units2_orig,
        current_x,
        current_y,
        total_points_read,
        total_points,
        progress_interval,
        progress_callback,
        x_units,
        y_units,
        domain,
        segment_metadata,
        chunk_size,
        validate_data,
    )
def _build_segment_metadata(
base_metadata: dict,
test_mode: str,
columns: list,
units: list,
step_temperature: float | None,
) -> dict:
"""Build segment metadata dictionary.
Args:
base_metadata: Base file metadata
test_mode: Detected test mode
columns: Column names
units: Column units
step_temperature: Temperature in Kelvin
Returns:
Segment metadata dictionary
"""
segment_metadata = base_metadata.copy()
segment_metadata["test_mode"] = test_mode
segment_metadata["columns"] = columns
segment_metadata["units"] = units
if step_temperature is not None:
segment_metadata["temperature"] = step_temperature
# Preserve Celsius value for user convenience (step_temperature is in Kelvin)
segment_metadata["temperature_celsius"] = step_temperature - 273.15
return segment_metadata
def _process_remaining_rows(
    file_handle,
    line_num: int,
    seg_end: int | None,
    columns: list,
    x_col: int,
    y_col: int,
    y_col2: int | None,
    is_complex: bool,
    y_units_orig: str,
    y_units2_orig: str | None,
    current_x: list,
    current_y: list,
    total_points_read: int,
    total_points: int | None,
    progress_interval: int,
    progress_callback: Callable | None,
    x_units: str,
    y_units: str,
    domain: str,
    segment_metadata: dict,
    chunk_size: int,
    validate_data: bool,
):
    """Process remaining data rows after sample rows.

    Reads tab-separated rows from ``file_handle`` until the segment ends,
    accumulates the converted points in ``current_x`` / ``current_y`` and
    yields a chunk every time ``chunk_size`` points have been collected.
    Whatever is left in the buffers when reading stops is yielded as a
    final chunk.

    Args:
        file_handle: Iterable of text lines positioned after the sample rows
        line_num: Number of the line read last; incremented per line read
        seg_end: Line number at which the segment ends (exclusive), or None
        columns: Column names without the leading row-label column
        x_col: Index of the x column
        y_col: Index of the y column (real part for complex data)
        y_col2: Index of the second y column for complex data, else None
        is_complex: Whether rows are converted with the complex-row helper
        y_units_orig: Units of ``y_col`` as found in the file
        y_units2_orig: Units of ``y_col2`` as found in the file, or None
        current_x: Buffer of pending x values (mutated and cleared in place)
        current_y: Buffer of pending y values (mutated and cleared in place)
        total_points_read: Points already read before this call
        total_points: Expected total number of points, or None if unknown
        progress_interval: Minimum number of new points between reports
        progress_callback: Optional ``callback(points_read, total_points)``
        x_units: Units passed on to each chunk for x
        y_units: Units passed on to each chunk for y
        domain: Domain passed on to each chunk
        segment_metadata: Metadata passed on to each chunk
        chunk_size: Number of buffered points that triggers a yield
        validate_data: Validation flag passed on to chunk creation

    Yields:
        RheoData chunks

    Raises:
        ValueError: If ``is_complex`` is set but ``y_col2`` or
            ``y_units2_orig`` is missing. Errors raised while building a
            chunk or inside ``progress_callback`` also propagate instead of
            being counted as malformed rows.
    """
    # Loop-invariant sanity check. It used to live inside the per-row
    # ``try`` block, where the ValueError was silently swallowed.
    if is_complex and (
        y_col2 is None or y_units2_orig is None
    ):  # pragma: no cover
        raise ValueError("y_col2 and y_units2 required for complex data")

    last_progress_report = 0
    expected_columns = len(columns) + 1  # +1 for the row label we skip
    max_col_needed = max(x_col, y_col, y_col2 if y_col2 is not None else 0)
    skipped_rows = 0

    for line in file_handle:
        line_num += 1

        # Check segment boundary
        if seg_end is not None and line_num >= seg_end:
            break
        # A blank line or the next "[...]" header terminates the data table
        if not line.strip() or line.startswith("["):
            break

        values = line.split("\t")
        if len(values) != expected_columns:
            continue

        # Only parsing/conversion of the row is guarded: a ValueError or
        # IndexError here means the row itself is malformed.  Chunk creation
        # and the progress callback are deliberately kept outside so their
        # failures are not misreported as skipped rows (and so a failing
        # chunk is not retried on every subsequent row with a growing buffer).
        try:
            row = _parse_row_values(values)
            if len(row) <= max_col_needed:
                continue
            if is_complex and y_col2 is not None and y_units2_orig is not None:
                x_val, y_val = _process_complex_row(
                    row, x_col, y_col, y_col2, y_units_orig, y_units2_orig
                )
            else:
                x_val, y_val = _process_real_row(row, x_col, y_col)
        except (ValueError, IndexError):
            skipped_rows += 1
            continue

        # The row helpers signal "no usable point" with x_val = None
        if x_val is None:
            continue

        current_x.append(x_val)
        current_y.append(y_val)
        total_points_read += 1

        # Report progress periodically
        if (
            progress_callback is not None
            and total_points is not None
            and total_points_read - last_progress_report >= progress_interval
        ):
            progress_callback(total_points_read, total_points)
            last_progress_report = total_points_read

        # Yield chunk when size reached
        if len(current_x) >= chunk_size:
            yield _create_rheodata_chunk(
                current_x,
                current_y,
                x_units,
                y_units,
                domain,
                segment_metadata,
                validate_data,
            )
            current_x.clear()
            current_y.clear()

    if skipped_rows > 0:
        logger.warning(
            "Skipped malformed rows in TRIOS TXT file",
            skipped_rows=skipped_rows,
        )

    # Yield remaining data as final chunk
    if current_x:
        yield _create_rheodata_chunk(
            current_x,
            current_y,
            x_units,
            y_units,
            domain,
            segment_metadata,
            validate_data,
        )

    # Final progress report
    if progress_callback is not None and total_points is not None:
        progress_callback(total_points_read, total_points)
def _extract_metadata(lines: list[str]) -> dict:
"""Extract metadata from file header.
Args:
lines: File lines
Returns:
Dictionary of metadata
"""
metadata = {}
# Regular expressions for metadata
patterns = {
"filename": r"Filename\s+(.*)",
"instrument_serial_number": r"Instrument serial number\s+(.*)",
"instrument_name": r"Instrument name\s+(.*)",
"operator": r"operator\s+(.*)",
"run_date": r"rundate\s+(.*)",
"sample_name": r"Sample name\s+(.*)",
"geometry": r"Geometry name\s+(.*)",
"geometry_type": r"Geometry type\s+(.*)",
}
# IO-009: extend metadata scan window for multi-step TRIOS files
for line in lines[:500]: # was 100
for key, pattern in patterns.items():
match = re.match(pattern, line, re.IGNORECASE)
if match:
metadata[key] = match.group(1).strip()
return metadata
def _find_data_segments(lines: list[str]) -> list[tuple]:
"""Find all [step] data segments in file.
Args:
lines: File lines
Returns:
List of (start_index, end_index) tuples
"""
segments = []
step_pattern = r"\[step\]"
for i, line in enumerate(lines):
if re.match(step_pattern, line, re.IGNORECASE):
segments.append(i)
# Convert to (start, end) pairs
segment_pairs = []
for i in range(len(segments)):
start = segments[i]
end = segments[i + 1] if i + 1 < len(segments) else len(lines)
segment_pairs.append((start, end))
return segment_pairs
def _parse_segment(
    lines: list[str], start: int, end: int, metadata: dict
) -> RheoData | None:
    """Parse a single data segment.

    Works on ``lines[start:end]`` and proceeds in these stages:

    1. Scan the first five lines for a "Step name" line and pull the
       temperature (°C, stored in Kelvin), angular frequency / frequency and
       strain amplitude out of it when present.
    2. Locate the column-header row (the line after "Number of points", or
       the second line of the segment when that marker is missing) and the
       unit row directly below it.
    3. Convert the tab-separated data rows to floats; cells that are empty or
       not numeric become NaN, and the first cell of each row (the row label)
       is dropped.
    4. Pick the x/y columns via ``_determine_xy_columns``; when a second y
       column is returned, build complex y data from the two columns after
       converting both to Pa.
    5. Drop rows where x or y is NaN, infer domain and test mode, assemble
       the segment metadata and wrap everything in a ``RheoData``.

    Args:
        lines: File lines
        start: Segment start index
        end: Segment end index
        metadata: File metadata (copied, not mutated)

    Returns:
        RheoData object or None if segment can't be parsed (no header row,
        no data rows, no usable x/y columns, or no row left after NaN
        filtering)
    """
    # Find header and data lines
    segment_lines = lines[start:end]
    # Extract temperature and protocol metadata from step name
    # e.g., "Frequency sweep (150.0 °C)" or "Amplitude sweep (1.0 Hz, 25.0 °C)"
    step_temperature = None
    step_protocol_meta: dict = {}
    for line in segment_lines[:5]:  # Check first few lines
        if "Step name" in line or line.startswith("Step name"):
            # Extract temperature from format: "Step name\tFrequency sweep (150.0 °C)"
            # Support negative temperatures with optional minus sign
            temp_match = re.search(r"(-?\d+\.?\d*)\s*°C", line)
            if temp_match:
                temp_c = float(temp_match.group(1))
                step_temperature = temp_c + 273.15  # Convert to Kelvin
            # Extract angular frequency or frequency from step name
            omega_match = re.search(r"(\d+\.?\d*)\s*(?:rad/s|rad s)", line)
            if omega_match:
                step_protocol_meta["omega"] = float(omega_match.group(1))
            # A frequency in Hz is only used when no rad/s value was found;
            # it is converted to angular frequency (omega = 2*pi*f).
            freq_match = re.search(r"(\d+\.?\d*)\s*Hz", line, re.IGNORECASE)
            if freq_match and "omega" not in step_protocol_meta:
                step_protocol_meta["omega"] = float(freq_match.group(1)) * 2 * math.pi
            # Extract strain amplitude from step name (e.g. "1.0 %")
            # The percentage is stored as a fraction (1.0 % -> 0.01).
            strain_match = re.search(r"(\d+\.?\d*)\s*%", line)
            if strain_match:
                step_protocol_meta["gamma_0"] = float(strain_match.group(1)) * 0.01
            # Only the first "Step name" line is considered
            break
    # Look for "Number of points" line
    num_points_line = None
    for i, line in enumerate(segment_lines):
        if line.startswith("Number of points"):
            num_points_line = i
            break
    if num_points_line is not None:
        # Column headers are expected on the line right after the marker
        header_offset = num_points_line + 1
    else:
        # Try to find column headers
        # Fallback: assume the headers sit on the second line of the segment
        header_offset = 1
    # Extract column headers and units
    if header_offset >= len(segment_lines):
        return None
    header_line = segment_lines[header_offset].strip()
    # Use rstrip only — the leading tab on the unit row is meaningful
    # (column 0 is the row label with no unit).
    unit_line = (
        segment_lines[header_offset + 1].rstrip()
        if header_offset + 1 < len(segment_lines)
        else ""
    )
    if not header_line:
        return None
    # Parse column names
    columns = [col.strip() for col in header_line.split("\t")]
    units = (
        [u.strip() for u in unit_line.split("\t")] if unit_line else [""] * len(columns)
    )
    # Ensure we have same number of units as columns
    # (a short unit row is padded with empty strings; a long one is kept as is)
    while len(units) < len(columns):
        units.append("")
    # Parse data rows
    # Data starts two lines below the header row (header row + unit row)
    data_start = header_offset + 2
    data_rows = []
    for line in segment_lines[data_start:]:
        # A blank line or the next "[...]" header terminates the data table
        if not line.strip() or line.startswith("["):
            break
        values = line.split("\t")
        # Rows whose cell count differs from the header row are ignored
        if len(values) == len(columns):
            # Skip first column (row label like "Data point")
            # Convert remaining columns, using np.nan for non-numeric values
            row = []
            for i, v in enumerate(values):
                if i == 0:
                    # Skip first column (row label)
                    continue
                if not v.strip():
                    row.append(np.nan)
                else:
                    try:
                        row.append(float(v))
                    except ValueError:
                        # Handle hex values (status bits), dates, strings
                        row.append(np.nan)
            if row:  # Only add if we have data
                data_rows.append(row)
    if not data_rows:
        return None
    # Convert to numpy array
    data_array = np.array(data_rows)
    # Adjust column indices since we skipped column 0
    # From here on, indices into `columns`/`units` line up with the columns
    # of `data_array`.
    columns = columns[1:]  # Remove first column ("Variables" or similar)
    units = units[1:]  # Remove first unit
    # Determine x and y columns based on common column names
    x_col, x_units, y_col, y_units, y_col2, y_units2 = _determine_xy_columns(
        columns, units, data_array
    )
    if x_col is None or y_col is None:
        warnings.warn(f"Could not determine x/y columns from: {columns}", stacklevel=2)
        return None
    # Defensive bounds check: fall back to real-valued data rather than fail
    if y_col2 is not None and y_col2 >= data_array.shape[1]:
        logger.warning("y_col2 index out of bounds; treating as non-complex data")
        y_col2 = None
        y_units2 = None
    # Extract x data
    x_data = data_array[:, x_col]
    # Extract y data (construct complex modulus if both G' and G'' are available)
    if y_col2 is not None:
        # Complex modulus: G* = G' + i*G''
        y_data_real = data_array[:, y_col]  # Storage modulus (G')
        y_data_imag = data_array[:, y_col2]  # Loss modulus (G'')
        # Apply unit conversions to both components
        y_data_real = np.asarray(convert_units(y_data_real, y_units, "Pa"))
        y_data_imag = np.asarray(convert_units(y_data_imag, y_units2, "Pa"))
        x_data_array = np.real_if_close(np.asarray(x_data))
        y_real_array = np.real_if_close(np.asarray(y_data_real))
        y_imag_array = np.real_if_close(np.asarray(y_data_imag))
        # Construct complex modulus
        y_data = y_real_array + 1j * y_imag_array
        y_units = "Pa"  # Standardize to Pa for complex modulus
        # Remove NaN values from either component
        valid_mask = ~(
            np.isnan(x_data_array) | np.isnan(y_real_array) | np.isnan(y_imag_array)
        )
    else:
        # Real-valued data
        # (no unit conversion is applied on this path; y_units stays as read)
        y_data = data_array[:, y_col]
        x_data_array = np.real_if_close(np.asarray(x_data))
        y_data_array = np.real_if_close(np.asarray(y_data))
        # Remove NaN values
        valid_mask = ~(np.isnan(x_data_array) | np.isnan(y_data_array))
        y_data = y_data_array  # type: ignore[assignment]
    # Keep only the rows where x and every y component are finite numbers
    x_data = x_data_array[valid_mask]
    y_data = y_data[valid_mask]
    if len(x_data) == 0:
        return None
    # Determine domain and test mode
    domain, test_mode = _infer_domain_and_mode(
        columns[x_col],
        columns[y_col],
        x_units,
        y_units,
        all_columns=columns,
    )
    # Update metadata
    segment_metadata = metadata.copy()
    segment_metadata["test_mode"] = test_mode
    segment_metadata["columns"] = columns
    segment_metadata["units"] = units
    # Detect deformation mode from column names
    # ("tensile" anywhere in a name, or a column called E' / E'' with or
    # without trailing text, marks the segment as tension)
    col_names_lower = [c.lower() for c in columns]
    if any(
        "tensile" in c
        or c.startswith("e' ")
        or c == "e'"
        or c.startswith("e'' ")
        or c == "e''"
        for c in col_names_lower
    ):
        segment_metadata["deformation_mode"] = "tension"
    # Add temperature if found (step_temperature is already in Kelvin)
    if step_temperature is not None:
        segment_metadata["temperature"] = step_temperature
        # Preserve Celsius value for user convenience
        segment_metadata["temperature_celsius"] = step_temperature - 273.15
    # Add protocol metadata extracted from step name
    segment_metadata.update(step_protocol_meta)
    # Check for LAOS harmonic columns
    # For each role the first column whose lower-cased name matches is used.
    col_names_lower_full = [c.lower() for c in columns]
    harmonic_col = next(
        (
            i
            for i, c in enumerate(col_names_lower_full)
            if ("harmonic" in c and "number" in c) or c == "harmonic"
        ),
        None,
    )
    in_phase_col = next(
        (
            i
            for i, c in enumerate(col_names_lower_full)
            if "in-phase" in c or "in_phase" in c or "in phase" in c
        ),
        None,
    )
    out_of_phase_col = next(
        (
            i
            for i, c in enumerate(col_names_lower_full)
            if "out-of-phase" in c or "out_of_phase" in c or "out of phase" in c
        ),
        None,
    )
    if harmonic_col is not None or (
        in_phase_col is not None and out_of_phase_col is not None
    ):
        # NOTE(review): these arrays are taken from the unfiltered
        # `data_array`, so their length can differ from x/y when rows were
        # dropped by `valid_mask` — confirm this is what consumers expect.
        laos_harmonics: dict = {}
        if harmonic_col is not None and harmonic_col < data_array.shape[1]:
            laos_harmonics["harmonic_numbers"] = data_array[:, harmonic_col]
        if in_phase_col is not None and in_phase_col < data_array.shape[1]:
            laos_harmonics["in_phase"] = data_array[:, in_phase_col]
        if out_of_phase_col is not None and out_of_phase_col < data_array.shape[1]:
            laos_harmonics["out_of_phase"] = data_array[:, out_of_phase_col]
        if laos_harmonics:
            segment_metadata["laos_harmonics"] = laos_harmonics
            logger.debug(
                "LAOS harmonic columns detected", keys=list(laos_harmonics.keys())
            )
    return RheoData(
        x=x_data,
        y=y_data,
        x_units=x_units,
        y_units=y_units,
        domain=domain,
        initial_test_mode=test_mode,
        metadata=segment_metadata,
        validate=True,
    )
def _determine_xy_columns(
columns: list[str], units: list[str], data: np.ndarray
) -> tuple:
"""Determine which columns to use for x and y.
For oscillatory (SAOS) data with both Storage and Loss modulus columns,
this will return both column indices to construct complex modulus G* = G' + i·G''.
Args:
columns: Column names
units: Column units
data: Data array
Returns:
Tuple of (x_col_index, x_units, y_col_index, y_units, y_col2_index, y_units2)
where y_col2_index is None for non-complex data, or the Loss modulus column
index for complex modulus construction.
"""
columns_lower = [c.lower() for c in columns]
# Priority lists for x and y columns
# Note: Frequency comes before general "time" to prioritize frequency sweeps
x_priorities = [
"angular frequency",
"frequency",
"shear rate",
"temperature",
"step time",
"time",
"strain",
]
y_priorities = [
"tensile storage modulus",
"tensile loss modulus",
"e' ",
"e'' ",
"storage modulus",
"loss modulus",
"stress",
"strain",
"viscosity",
"complex modulus",
"complex viscosity",
"torque",
"normal stress",
]
# Find x column
x_col = None
for priority in x_priorities:
for i, col in enumerate(columns_lower):
if priority in col:
x_col = i
break
if x_col is not None:
break
# Check for BOTH storage and loss modulus (for complex modulus construction)
# Also handles tensile (E'/E'') patterns for DMTA data
storage_col = None
loss_col = None
for i, col in enumerate(columns_lower):
if (
"storage modulus" in col
or "tensile storage modulus" in col
or col.startswith("e' ")
) and i != x_col:
storage_col = i
elif (
"loss modulus" in col
or "tensile loss modulus" in col
or col.startswith("e'' ")
) and i != x_col:
loss_col = i
# If we have both G' and G'' (or E' and E''), use them to construct complex modulus
if storage_col is not None and loss_col is not None:
x_units = units[x_col] if x_col is not None and x_col < len(units) else ""
y_units = units[storage_col] if storage_col < len(units) else ""
y_units2 = units[loss_col] if loss_col < len(units) else ""
return x_col, x_units, storage_col, y_units, loss_col, y_units2
# Otherwise, find single y column (prefer storage/loss modulus for SAOS)
y_col = None
for priority in y_priorities:
for i, col in enumerate(columns_lower):
if priority in col and i != x_col:
y_col = i
break
if y_col is not None:
break
# Fallback: use first two numeric columns
if x_col is None or y_col is None:
numeric_cols = []
for i in range(min(data.shape[1], len(columns))):
if not np.all(np.isnan(data[:, i])):
numeric_cols.append(i)
if len(numeric_cols) >= 2:
x_col = numeric_cols[0] if x_col is None else x_col
y_col = numeric_cols[1] if y_col is None else y_col
if x_col is None or y_col is None:
return None, None, None, None, None, None
x_units = units[x_col] if x_col < len(units) else ""
y_units = units[y_col] if y_col < len(units) else ""
return x_col, x_units, y_col, y_units, None, None
def _infer_domain_and_mode(
x_name: str,
y_name: str,
x_units: str,
y_units: str,
all_columns: list[str] | None = None,
) -> tuple:
"""Infer domain and test mode from column names and units.
Args:
x_name: X column name
y_name: Y column name
x_units: X units
y_units: Y units
all_columns: All column names in the segment (used to detect
paired G'/G'' for disambiguating oscillation vs relaxation)
Returns:
Tuple of (domain, test_mode)
"""
x_lower = x_name.lower()
y_lower = y_name.lower()
# Frequency domain (SAOS)
if "frequency" in x_lower or "rad/s" in x_units.lower() or "hz" in x_units.lower():
if "modulus" in y_lower:
return "frequency", "oscillation"
# Time domain
# R11-TXT-003: Match "step time" as well as plain "time" by checking
# individual words via split() to avoid false matches on substrings.
x_words = x_lower.split()
if "time" in x_words or "s" == x_units.lower():
# R11-TXT-003: "Storage Modulus" vs time is only oscillation when a
# paired "Loss Modulus" column exists (time-sweep SAOS). Without
# the pair, TRIOS may label the relaxation modulus G(t) as
# "Storage Modulus", so fall through to the generic modulus check.
if "storage" in y_lower:
has_loss_pair = False
if all_columns is not None:
has_loss_pair = any("loss" in c.lower() for c in all_columns)
if has_loss_pair:
return "time", "oscillation"
# No loss pair → fall through to modulus check (relaxation)
if "stress" in y_lower:
# Check if strain or stress in name
if "relax" in y_lower:
return "time", "relaxation"
else:
return "time", "creep"
elif "modulus" in y_lower:
return "time", "relaxation"
# Shear rate (steady shear / flow)
if "shear rate" in x_lower or "1/s" in x_units:
return "time", "rotation"
# Temperature sweep
if "temperature" in x_lower:
if "modulus" in y_lower:
return "frequency", "oscillation" # Temperature sweep at constant frequency
else:
return "time", "temperature_sweep"
# Default
return "time", "unknown"