Source code for rheojax.io.readers.trios.json

"""TA Instruments TRIOS JSON file reader.

This module provides a reader for TRIOS JSON exports with support for:
- Schema validation against official TRIOS JSON Export Schema
- Structured parsing using TRIOSExperiment dataclasses
- Multiple results and datasets
- Step/Segment columns for multi-step experiments
- Complex modulus construction (G' + iG'')

Usage:
    >>> from rheojax.io.readers.trios import load_trios_json
    >>> data = load_trios_json('relaxation.json')
    >>> print(data.test_mode)  # 'relaxation'
"""

from __future__ import annotations

import json
import warnings
from pathlib import Path
from typing import Any

import numpy as np

from rheojax.core.data import RheoData
from rheojax.io.readers.trios.common import (
    DataSegment,
    construct_complex_modulus,
    convert_unit,
    detect_step_column,
    detect_test_type,
    segment_to_rheodata,
    select_xy_columns,
    split_by_step,
)
from rheojax.io.readers.trios.schema import TRIOSExperiment
from rheojax.logging import get_logger

logger = get_logger(__name__)

# Path to bundled schema
SCHEMA_PATH = Path(__file__).parent / "schema" / "TRIOSJSONExportSchema.json"


def _load_schema() -> dict[str, Any] | None:
    """Read the TRIOS JSON export schema bundled next to this module.

    Returns:
        The parsed schema as a dictionary, or None when the schema file
        is not present at ``SCHEMA_PATH``.
    """
    schema_location = str(SCHEMA_PATH)

    if SCHEMA_PATH.exists():
        logger.debug("Loading JSON schema", schema_path=schema_location)
        with open(SCHEMA_PATH, encoding="utf-8") as schema_file:
            return json.load(schema_file)

    # A missing schema is not an error here; callers decide how to react.
    logger.debug("Schema file not found", schema_path=schema_location)
    return None


def validate_schema(
    data: dict[str, Any],
    *,
    raise_on_error: bool = False,
) -> tuple[bool, list[str]]:
    """Validate JSON data against bundled TRIOS schema.

    Validation is skipped (and reported as valid) when the optional
    ``jsonschema`` package is not installed or the bundled schema file
    cannot be found.

    Args:
        data: Parsed JSON dictionary
        raise_on_error: Raise ValueError instead of returning a failure
            tuple. This applies both when ``data`` violates the schema and
            when the bundled schema itself is rejected by ``jsonschema``.

    Returns:
        Tuple of (is_valid, list of error messages)

    Raises:
        ValueError: Validation failed and ``raise_on_error`` is True
    """
    logger.debug("Validating JSON schema")

    # jsonschema is an optional dependency: import lazily and degrade to
    # "no validation" rather than failing the whole load.
    try:
        import jsonschema
    except ImportError:
        logger.debug("jsonschema not installed, skipping validation")
        return True, []

    schema = _load_schema()
    if schema is None:
        logger.warning("TRIOS JSON schema not found, skipping validation")
        return True, []

    try:
        jsonschema.validate(data, schema)
    except jsonschema.ValidationError as e:
        error_msg = f"Schema validation error: {e.message}"
        logger.warning("Schema validation failed", error=e.message, path=list(e.path))
        if raise_on_error:
            raise ValueError(error_msg) from e
        return False, [error_msg]
    except jsonschema.SchemaError as e:
        error_msg = f"Schema error: {e.message}"
        logger.error("Schema error", error=e.message, exc_info=True)
        # Honour raise_on_error here as well: a strict caller that ignores
        # the return value must not mistake a broken schema for success.
        if raise_on_error:
            raise ValueError(error_msg) from e
        return False, [error_msg]

    logger.debug("Schema validation passed")
    return True, []


def parse_trios_json(
    filepath: str | Path,
    *,
    validate: bool = True,
) -> tuple[TRIOSExperiment, dict[str, Any]]:
    """Low-level JSON parser returning TRIOSExperiment and metadata.

    Schema validation is advisory: failures are logged as warnings and
    parsing is still attempted on a best-effort basis.

    Args:
        filepath: Path to TRIOS JSON file
        validate: Validate against bundled schema

    Returns:
        Tuple of (TRIOSExperiment, metadata dict). The metadata is what
        ``TRIOSExperiment.get_metadata()`` returns, with ``source_file``
        (file name only) and ``source_format`` ("json") added.

    Raises:
        FileNotFoundError: File does not exist
        json.JSONDecodeError: Invalid JSON syntax
        ValueError: Top-level JSON value is not an object, or the
            structure could not be parsed into a TRIOSExperiment
    """
    filepath = Path(filepath)
    logger.info("Parsing TRIOS JSON file", filepath=str(filepath))

    if not filepath.exists():
        logger.error("File not found", filepath=str(filepath))
        raise FileNotFoundError(f"File not found: {filepath}")

    logger.debug("Reading JSON file", filepath=str(filepath))
    raw_bytes = filepath.read_bytes()

    # utf-8-sig decodes UTF-8 with or without a BOM, so no separate plain
    # "utf-8" attempt is needed. latin-1 maps every byte value and therefore
    # cannot fail, which makes it a safe last resort.
    encoding = "utf-8-sig"
    try:
        text = raw_bytes.decode(encoding)
    except UnicodeDecodeError:
        encoding = "latin-1"
        text = raw_bytes.decode(encoding)
    logger.debug("JSON decoded with encoding", encoding=encoding)

    try:
        data = json.loads(text)
    except json.JSONDecodeError as e:
        logger.error(
            "Invalid JSON syntax",
            filepath=str(filepath),
            line=e.lineno,
            column=e.colno,
            error=e.msg,
            exc_info=True,
        )
        raise

    # Valid JSON may still be a list, string, number or null. Reject those
    # here so callers get the documented ValueError instead of a TypeError
    # or AttributeError from the dict operations below.
    if not isinstance(data, dict):
        error_msg = (
            "expected a JSON object at the top level, "
            f"got {type(data).__name__}"
        )
        logger.error(
            "Invalid TRIOS JSON structure",
            filepath=str(filepath),
            error=error_msg,
        )
        raise ValueError(f"Invalid TRIOS JSON structure: {error_msg}")

    logger.debug("JSON parsed successfully", num_keys=len(data))

    # Validate schema if requested (best-effort: failure does not abort)
    if validate:
        is_valid, errors = validate_schema(data)
        if not is_valid:
            logger.warning(
                "Schema validation failed, attempting best-effort parsing",
                num_errors=len(errors),
            )

    # Record the schema identifier declared by the file, if any
    data_schema = data.get("$schema") or data.get("schemaVersion")
    if data_schema:
        logger.debug("JSON schema version", version=data_schema)

    # Parse into TRIOSExperiment
    try:
        experiment = TRIOSExperiment.from_json(data)
        logger.debug(
            "TRIOSExperiment created",
            num_results=experiment.n_results,
        )
    except (KeyError, TypeError) as e:
        logger.error(
            "Invalid TRIOS JSON structure",
            filepath=str(filepath),
            error=str(e),
            exc_info=True,
        )
        raise ValueError(f"Invalid TRIOS JSON structure: {e}") from e

    # Extract metadata
    metadata = experiment.get_metadata()
    metadata["source_file"] = filepath.name
    metadata["source_format"] = "json"

    logger.info(
        "TRIOS JSON parsing complete",
        filepath=str(filepath),
        num_results=experiment.n_results,
    )

    return experiment, metadata


def _column_to_float(frame: Any, column: str) -> np.ndarray:
    """Return ``frame[column]`` converted to a float array.

    Args:
        frame: Table-like object supporting ``frame[column]`` with
            ``.values`` and ``.head`` (the segment dataframe)
        column: Name of the column to convert

    Raises:
        ValueError: The column holds values that cannot be cast to float;
            the message includes the first three values as a sample
    """
    try:
        return frame[column].values.astype(float)
    except (ValueError, TypeError) as e:
        raise ValueError(
            f"Column '{column}' contains non-numeric data that cannot be converted to float. "
            f"Sample values: {frame[column].head(3).tolist()}"
        ) from e


def load_trios_json(
    filepath: str | Path,
    *,
    return_all_segments: bool = False,
    test_mode: str | None = None,
    result_index: int = 0,
    validate_json_schema: bool = True,
    validate: bool = True,
) -> RheoData | list[RheoData]:
    """Load TRIOS JSON export file.

    Uses adapted tadatakit code to parse TRIOS JSON format with schema
    validation against official TRIOS JSON Export Schema.

    Args:
        filepath: Path to TRIOS JSON file
        return_all_segments: Return list for multi-step files
        test_mode: Override auto-detection
            ("creep", "relaxation", "oscillation", "rotation")
        result_index: Result set index to load (default: 0, or -1 for all).
            Any other negative value is rejected as out of range.
        validate_json_schema: Validate against TRIOS schema (default: True)
        validate: Validate RheoData on creation

    Returns:
        Single RheoData or list of RheoData. A single object is returned
        only when exactly one segment was parsed, ``return_all_segments``
        is False and ``result_index`` is not -1.

    Raises:
        FileNotFoundError: File does not exist
        ValueError: Invalid JSON structure, no results in the file,
            ``result_index`` out of range, non-numeric data in a selected
            column, or no segment could be parsed
        json.JSONDecodeError: Invalid JSON syntax

    Notes:
        Schema validation failures are logged as warnings and parsing is
        still attempted. Segments whose x/y columns cannot be determined,
        or that contain no finite points, are skipped.

    Example:
        >>> data = load_trios_json('relaxation.json')
        >>> print(data.test_mode)  # 'relaxation'
        >>> print(data.x_units)  # 's' (time)
        >>> print(data.y_units)  # 'Pa' (relaxation modulus)
    """
    logger.info("Loading TRIOS JSON file", filepath=str(filepath))

    # Parse JSON file
    experiment, base_metadata = parse_trios_json(
        filepath, validate=validate_json_schema
    )

    if experiment.n_results == 0:
        logger.error("No results found in file", filepath=str(filepath))
        raise ValueError(f"No results found in {filepath}")

    # Determine which results to process. Only -1 has a special meaning
    # ("all results"); other negative values must not fall through to
    # Python's from-the-end indexing.
    if result_index == -1:
        result_indices = list(range(experiment.n_results))
    elif 0 <= result_index < experiment.n_results:
        result_indices = [result_index]
    else:
        logger.error(
            "Result index out of range",
            result_index=result_index,
            num_results=experiment.n_results,
        )
        raise ValueError(
            f"Result index {result_index} out of range. "
            f"File contains {experiment.n_results} result(s)."
        )

    logger.debug(
        "Processing results",
        result_indices=result_indices,
        total_results=experiment.n_results,
    )

    rheo_data_list: list[RheoData] = []

    for res_idx in result_indices:
        result = experiment.results[res_idx]
        df = result.get_dataframe()
        units = result.get_units()

        logger.debug(
            "Processing result",
            result_index=res_idx,
            shape=df.shape,
            columns=list(df.columns),
        )

        if df.empty:
            logger.warning("Result has no data, skipping", result_index=res_idx)
            continue

        # Detect or use provided test mode.
        # IO-FIX-002: explicit None check avoids or-sentinel swallowing
        # falsy test_mode values (e.g. empty string).
        detected_mode = detect_test_type(df) if test_mode is None else test_mode
        logger.debug("Test mode", detected_mode=detected_mode, provided=test_mode)

        # Canonical x unit for modes that need conversion (Hz -> rad/s for
        # oscillation, 1/s for rotation); None for time-based modes.
        x_target_units = {"oscillation": "rad/s", "rotation": "1/s"}.get(
            detected_mode
        )

        # Check for step column and split if needed
        step_col = detect_step_column(df)
        segments = (
            [df]
            if not step_col or not return_all_segments
            else split_by_step(df, step_col)
        )
        logger.debug(
            "Segments identified",
            step_col=step_col,
            num_segments=len(segments),
        )

        for seg_idx, seg_df in enumerate(segments):
            # Select x/y columns
            x_col, y_col, y2_col = select_xy_columns(seg_df, detected_mode)

            if x_col is None or y_col is None:
                msg = (
                    f"Skipping TRIOS JSON segment {seg_idx} (result {res_idx}): "
                    f"could not determine x/y columns. "
                    f"Available columns: {list(seg_df.columns)}"
                )
                warnings.warn(msg, stacklevel=2)
                logger.warning(
                    "Could not determine x/y columns",
                    result_index=res_idx,
                    segment_index=seg_idx,
                    available_columns=list(seg_df.columns),
                )
                continue

            logger.debug(
                "Columns selected",
                result_index=res_idx,
                segment_index=seg_idx,
                x_col=x_col,
                y_col=y_col,
                y2_col=y2_col,
            )

            # Extract data
            x_data = _column_to_float(seg_df, x_col)

            # Get units
            x_units = units.get(x_col, "")
            y_units = units.get(y_col, "Pa")

            # A second y column means storage/loss moduli: combine them
            # into a single complex modulus expressed in Pa.
            is_complex = y2_col is not None
            if is_complex:
                y_real = _column_to_float(seg_df, y_col)
                y_imag = _column_to_float(seg_df, y2_col)

                # Convert units if needed
                y_real, _ = convert_unit(y_real, y_units, "Pa")
                y_imag, _ = convert_unit(y_imag, units.get(y2_col, "Pa"), "Pa")

                # Construct complex modulus
                y_data = construct_complex_modulus(y_real, y_imag)
                y_units = "Pa"
            else:
                y_data = _column_to_float(seg_df, y_col)

            # Convert x units (e.g., Hz to rad/s for oscillation, ensure
            # 1/s for rotation)
            if x_target_units is not None:
                x_data, x_units = convert_unit(x_data, x_units, x_target_units)

            # Remove non-finite values (NaN and ±inf) to satisfy RheoData's
            # isfinite invariant and prevent corrupt values from poisoning fits.
            if is_complex:
                valid_mask = (
                    np.isfinite(x_data)
                    & np.isfinite(np.real(y_data))
                    & np.isfinite(np.imag(y_data))
                )
            else:
                valid_mask = np.isfinite(x_data) & np.isfinite(y_data)

            x_data = x_data[valid_mask]
            y_data = y_data[valid_mask]

            if len(x_data) == 0:
                logger.warning(
                    "Segment has 0 valid data points after non-finite filtering; skipping",
                    segment_index=seg_idx,
                    result_index=res_idx,
                )
                continue

            # Determine default x_units based on test mode
            if not x_units:
                x_units = x_target_units or "s"

            # Build metadata
            seg_metadata = base_metadata.copy()
            seg_metadata["test_mode"] = detected_mode
            seg_metadata["result_index"] = res_idx
            seg_metadata["x_column"] = x_col
            seg_metadata["y_column"] = y_col
            if is_complex:
                seg_metadata["y2_column"] = y2_col
            seg_metadata["is_complex"] = is_complex

            # Add result-level properties
            if result.properties:
                for key, value in result.properties.items():
                    seg_metadata[f"result_{_snake_case(key)}"] = value

            # Create DataSegment and convert to RheoData
            segment = DataSegment(
                segment_index=seg_idx,
                test_mode=detected_mode,
                x_data=x_data,
                y_data=y_data,
                x_column=x_col,
                y_column=y_col,
                x_units=x_units,
                y_units=y_units,
                is_complex=is_complex,
                metadata=seg_metadata,
            )

            rheo_data = segment_to_rheodata(segment, validate=validate)
            rheo_data_list.append(rheo_data)

            logger.debug(
                "RheoData created",
                result_index=res_idx,
                segment_index=seg_idx,
                num_points=len(x_data),
                test_mode=detected_mode,
                is_complex=is_complex,
            )

    if not rheo_data_list:
        logger.error("No valid data segments parsed", filepath=str(filepath))
        raise ValueError(f"No valid data segments could be parsed from {filepath}")

    logger.info(
        "TRIOS JSON load complete",
        filepath=str(filepath),
        num_segments=len(rheo_data_list),
    )

    # Return single or list
    if len(rheo_data_list) == 1 and not return_all_segments and result_index != -1:
        return rheo_data_list[0]
    return rheo_data_list


def _snake_case(s: str) -> str: """Convert CamelCase to snake_case.""" result = [] for i, char in enumerate(s): if char.isupper() and i > 0: result.append("_") result.append(char.lower()) return "".join(result)