Source code for rheojax.models.flow.carreau_yasuda

"""Carreau-Yasuda model for non-Newtonian flow.

This module implements the Carreau-Yasuda model, an extension of the Carreau
model with an additional parameter 'a' for better control over the transition
region between Newtonian and power-law behavior (ROTATION test mode).

Theory:
    η(γ̇) = η_∞ + (η_0 - η_∞) [1 + (λγ̇)^a]^((n-1)/a)

    - η_0: Zero-shear viscosity (Newtonian plateau at low γ̇)
    - η_∞: Infinite-shear viscosity (Newtonian plateau at high γ̇)
    - λ: Time constant (characteristic relaxation time)
    - n: Power-law index (controls asymptotic behavior)
    - a: Transition parameter (controls transition width/smoothness)

References:
    - Yasuda, K., et al. (1981). Rheol. Acta 20, 163-178.
"""

from __future__ import annotations

from rheojax.core.jax_config import safe_import_jax

jax, jnp = safe_import_jax()

import numpy as np

from rheojax.core.base import BaseModel, ParameterSet
from rheojax.core.data import RheoData
from rheojax.core.inventory import Protocol
from rheojax.core.registry import ModelRegistry
from rheojax.core.test_modes import DeformationMode, TestMode, detect_test_mode
from rheojax.logging import get_logger, log_fit

# Module logger
logger = get_logger(__name__)


[docs] @ModelRegistry.register( "carreau_yasuda", protocols=[Protocol.FLOW_CURVE], deformation_modes=[DeformationMode.SHEAR], ) class CarreauYasuda(BaseModel): """Carreau-Yasuda model for non-Newtonian flow (ROTATION only). The Carreau-Yasuda model extends the Carreau model with an additional parameter 'a' that provides better control over the transition region between Newtonian and power-law behavior. When a=2, it reduces to the standard Carreau model. Parameters: eta0: Zero-shear viscosity (Pa·s), Newtonian plateau at low γ̇ eta_inf: Infinite-shear viscosity (Pa·s), Newtonian plateau at high γ̇ lambda_: Time constant (s), characteristic relaxation time n: Power-law index (dimensionless), controls asymptotic behavior a: Transition parameter (dimensionless), controls transition width Constitutive Equation: η(γ̇) = η_∞ + (η_0 - η_∞) [1 + (``λ`` γ̇)^a]^((n-1)/a) Special Cases: a = 2: Reduces to Carreau model ``λ`` → 0: Newtonian fluid with η = η_0 n = 1: Newtonian fluid for all shear rates Test Mode: ROTATION (steady shear) only """
[docs] def __init__(self): """Initialize Carreau-Yasuda model.""" super().__init__() self.parameters = ParameterSet() self.parameters.add( name="eta0", value=1000.0, bounds=(1e-3, 1e12), units="Pa·s", description="Zero-shear viscosity", ) self.parameters.add( name="eta_inf", value=0.001, bounds=(1e-6, 1e6), units="Pa·s", description="Infinite-shear viscosity", ) self.parameters.add( name="lambda_", value=1.0, bounds=(1e-6, 1e6), units="s", description="Time constant", ) self.parameters.add( name="n", value=0.5, bounds=(0.01, 1.0), units="dimensionless", description="Power-law index", ) self.parameters.add( name="a", value=2.0, bounds=(0.1, 2.0), units="dimensionless", description="Transition parameter", )
def _fit(self, X: np.ndarray, y: np.ndarray, **kwargs) -> CarreauYasuda: """Fit Carreau-Yasuda parameters to data. Args: X: Shear rate data (γ̇) y: Viscosity data **kwargs: Additional fitting options Returns: self for method chaining """ # P3-FLOW-001: Cache test_mode for Bayesian _resolve_test_mode() consistency self._test_mode = kwargs.get("test_mode", "rotation") with log_fit(logger, model="CarreauYasuda", data_shape=X.shape) as ctx: try: logger.debug( "Starting Carreau-Yasuda model fit", n_points=len(X), gamma_dot_range=(float(np.min(X)), float(np.max(X))), viscosity_range=(float(np.min(y)), float(np.max(y))), ) # Simple heuristic fitting (similar to Carreau) # Sort by shear rate sort_idx = np.argsort(X) X_sorted = X[sort_idx] y_sorted = y[sort_idx] # Estimate plateaus eta0_est = np.max(y_sorted[: len(y_sorted) // 10 + 1]) eta_inf_est = np.min(y_sorted[-len(y_sorted) // 10 :]) logger.debug( "Estimated viscosity plateaus", eta0_est=float(eta0_est), eta_inf_est=float(eta_inf_est), ) # Find characteristic shear rate (midpoint) eta_mid = (eta0_est + eta_inf_est) / 2.0 idx_mid = np.argmin(np.abs(y_sorted - eta_mid)) lambda_est = 1.0 / X_sorted[idx_mid] if X_sorted[idx_mid] > 0 else 1.0 logger.debug( "Estimated time constant from midpoint", eta_mid=float(eta_mid), lambda_est=float(lambda_est), ) # Estimate n from power-law region slope mid_start = len(X_sorted) // 3 mid_end = 2 * len(X_sorted) // 3 if mid_end > mid_start + 1: log_gamma = np.log(np.maximum(X_sorted[mid_start:mid_end], 1e-30)) log_eta = np.log(np.maximum(y_sorted[mid_start:mid_end], 1e-30)) coeffs = np.polyfit(log_gamma, log_eta, 1) n_est = coeffs[0] + 1.0 else: n_est = 0.5 logger.debug( "Estimated power-law index from slope", n_est=float(n_est), ) # Default a to 2.0 (Carreau model) a_est = 2.0 # Clip to bounds eta0_est = np.clip(eta0_est, 1e-3, 1e12) eta_inf_est = np.clip(eta_inf_est, 1e-6, 1e6) lambda_est = np.clip(lambda_est, 1e-6, 1e6) n_est = np.clip(n_est, 0.01, 1.0) a_est = np.clip(a_est, 0.1, 2.0) # Ensure eta0 > eta_inf if eta0_est <= eta_inf_est: eta0_est = eta_inf_est * 10.0 logger.debug( "Adjusted eta0 to ensure eta0 > eta_inf", eta0_adjusted=float(eta0_est), ) self.parameters.set_value("eta0", float(eta0_est)) self.parameters.set_value("eta_inf", float(eta_inf_est)) self.parameters.set_value("lambda_", float(lambda_est)) self.parameters.set_value("n", float(n_est)) self.parameters.set_value("a", float(a_est)) # Add fit results to context for logging ctx["eta0"] = float(eta0_est) ctx["eta_inf"] = float(eta_inf_est) ctx["lambda_"] = float(lambda_est) ctx["n"] = float(n_est) ctx["a"] = float(a_est) logger.info( "Carreau-Yasuda model fit completed", eta0=float(eta0_est), eta_inf=float(eta_inf_est), lambda_=float(lambda_est), n=float(n_est), a=float(a_est), ) return self except Exception as e: logger.error( "Carreau-Yasuda model fit failed", error=str(e), exc_info=True, ) raise def _predict(self, X: np.ndarray, **kwargs) -> np.ndarray: """Predict viscosity for given shear rates. Args: X: Shear rate data (γ̇) Returns: Predicted viscosity η(γ̇) """ eta0 = self.parameters.get_value("eta0") eta_inf = self.parameters.get_value("eta_inf") lambda_ = self.parameters.get_value("lambda_") n = self.parameters.get_value("n") a = self.parameters.get_value("a") # Convert to JAX for computation gamma_dot = jnp.array(X) # Compute viscosity viscosity = self._predict_viscosity(gamma_dot, eta0, eta_inf, lambda_, n, a) # Convert back to numpy return np.array(viscosity)
[docs] def model_function(self, X, params, test_mode=None, **kwargs): """Model function for Bayesian inference. This method is required by BayesianMixin for NumPyro NUTS sampling. It computes predictions given input X and a parameter array. Args: X: Independent variable (shear rate γ̇) params: Array of parameter values [eta0, eta_inf, ``lambda_``, n, a] Returns: Model predictions as JAX array """ # Extract parameters from array (in order they were added to ParameterSet) eta0 = params[0] eta_inf = params[1] lambda_ = params[2] n = params[3] a = params[4] # Flow model only supports ROTATION test mode # Compute prediction using the internal JAX method return self._predict_viscosity(X, eta0, eta_inf, lambda_, n, a)
@staticmethod @jax.jit def _predict_viscosity( gamma_dot: jnp.ndarray, eta0: float, eta_inf: float, lambda_: float, n: float, a: float, ) -> jnp.ndarray: """Compute viscosity using Carreau-Yasuda model. Args: gamma_dot: Shear rate (s^-1) eta0: Zero-shear viscosity (Pa·s) eta_inf: Infinite-shear viscosity (Pa·s) lambda_: Time constant (s) n: Power-law index a: Transition parameter Returns: Viscosity (Pa·s) """ # η(γ̇) = η_∞ + (η_0 - η_∞) [1 + (λγ̇)^a]^((n-1)/a) lambda_gamma = lambda_ * jnp.abs(gamma_dot) factor = jnp.power(1.0 + jnp.power(lambda_gamma, a), (n - 1.0) / a) return eta_inf + (eta0 - eta_inf) * factor @staticmethod @jax.jit def _predict_stress( gamma_dot: jnp.ndarray, eta0: float, eta_inf: float, lambda_: float, n: float, a: float, ) -> jnp.ndarray: """Compute shear stress using Carreau-Yasuda model. Args: gamma_dot: Shear rate (s^-1) eta0: Zero-shear viscosity (Pa·s) eta_inf: Infinite-shear viscosity (Pa·s) lambda_: Time constant (s) n: Power-law index a: Transition parameter Returns: Shear stress (Pa) """ # σ(γ̇) = η(γ̇) * γ̇ viscosity = CarreauYasuda._predict_viscosity( gamma_dot, eta0, eta_inf, lambda_, n, a ) return viscosity * jnp.abs(gamma_dot)
[docs] def predict_stress(self, gamma_dot: np.ndarray) -> np.ndarray: """Predict shear stress for given shear rates. Args: gamma_dot: Shear rate data (γ̇) Returns: Predicted shear stress σ(γ̇) """ eta0 = self.parameters.get_value("eta0") eta_inf = self.parameters.get_value("eta_inf") lambda_ = self.parameters.get_value("lambda_") n = self.parameters.get_value("n") a = self.parameters.get_value("a") # Convert to JAX for computation gamma_dot_jax = jnp.array(gamma_dot) # Compute stress stress = self._predict_stress(gamma_dot_jax, eta0, eta_inf, lambda_, n, a) # Convert back to numpy return np.array(stress)
[docs] def predict_rheo( self, rheo_data: RheoData, test_mode: TestMode | None = None, output: str = "viscosity", ) -> RheoData: """Predict rheological response for RheoData. Args: rheo_data: Input rheological data test_mode: Test mode (must be ROTATION) output: Output type ('viscosity' or 'stress') Returns: RheoData with predictions Raises: ValueError: If test mode is not ROTATION """ # Detect test mode if not provided if test_mode is None: test_mode = detect_test_mode(rheo_data) # Validate test mode if test_mode != TestMode.ROTATION: raise ValueError( f"Carreau-Yasuda model only supports ROTATION test mode, got {test_mode}" ) # Get shear rate data gamma_dot = rheo_data.x # Get parameters eta0 = self.parameters.get_value("eta0") eta_inf = self.parameters.get_value("eta_inf") lambda_ = self.parameters.get_value("lambda_") n = self.parameters.get_value("n") a = self.parameters.get_value("a") # Convert to JAX gamma_dot_jax = jnp.array(gamma_dot) # Compute prediction based on output type if output == "viscosity": y_pred = self._predict_viscosity( gamma_dot_jax, eta0, eta_inf, lambda_, n, a ) y_units = "Pa·s" elif output == "stress": y_pred = self._predict_stress(gamma_dot_jax, eta0, eta_inf, lambda_, n, a) y_units = "Pa" else: raise ValueError( f"Invalid output type: {output}. Must be 'viscosity' or 'stress'" ) # Convert back to numpy y_pred = np.array(y_pred) # Create output RheoData return RheoData( x=np.array(gamma_dot), y=y_pred, x_units=rheo_data.x_units or "1/s", y_units=y_units, domain="time", metadata={ "model": "CarreauYasuda", "test_mode": TestMode.ROTATION, "output": output, "eta0": eta0, "eta_inf": eta_inf, "lambda_": lambda_, "n": n, "a": a, }, validate=False, )
[docs] def __repr__(self) -> str: """String representation.""" eta0 = self.parameters.get_value("eta0") eta_inf = self.parameters.get_value("eta_inf") lambda_ = self.parameters.get_value("lambda_") n = self.parameters.get_value("n") a = self.parameters.get_value("a") return ( f"CarreauYasuda(eta0={eta0:.3e}, eta_inf={eta_inf:.3e}, " f"lambda={lambda_:.3e}, n={n:.3f}, a={a:.3f})" )
__all__ = ["CarreauYasuda"]