diff --git a/src/pymatlib/core/data_handler.py b/src/pymatlib/core/data_handler.py
index 68bd7f71493fd759e8191298e5b1dcbcd3d8c8f5..132c67951e6a53e8de42e9520e521bec26b8295c 100644
--- a/src/pymatlib/core/data_handler.py
+++ b/src/pymatlib/core/data_handler.py
@@ -1,564 +1,230 @@
 import os
+import logging
 import numpy as np
-from pathlib import Path
-from typing import Union, Tuple, Dict, Any
+from typing import Union, Tuple, Dict
-from matplotlib import pyplot as plt
 import pandas as pd
+logger = logging.getLogger(__name__)

-def print_results(file_path: str, temperatures: np.ndarray, material_property: np.ndarray) -> None:
-    """
-    Prints the results of the test.
-
-    Parameters:
-    file_path (str): The path to the data file.
-    temperatures (np.ndarray): Array of temperatures.
-    material_property (np.ndarray): Array of material properties.
-    """
-    print(f"File Path: {file_path}")
-    print(f"Temperatures: {temperatures}")
-    print(f"Material Properties: {material_property}")
-    print("-" * 40)
-
-
-def read_data_from_txt(file_path: str, header: bool = True) -> Tuple[np.ndarray, np.ndarray]:
-    """
-    Reads temperature and property data from a txt file.
-
-    Args:
-        file_path (str): The path to the txt file.
-        header (bool): Indicates if the file contains a header row.
-
-    Returns:
-        Tuple[np.ndarray, np.ndarray]: Temperature and property arrays.
-
-    Raises:
-        ValueError: If:
-            - Data has incorrect number of columns
-            - Data contains NaN values
-            - Data contains duplicate temperature entries
-    """
-    print(f"Reading data from txt file: {file_path}")
-    data = np.loadtxt(file_path, dtype=float, skiprows=1 if header else 0)
-
-    if data.ndim != 2 or data.shape[1] != 2:
-        raise ValueError("Data should have exactly two columns")
-
-    temp = data[:, 0]
-    prop = data[:, 1]
-
-    # Check for NaN values
-    if np.any(np.isnan(temp)) or np.any(np.isnan(prop)):
-        nan_rows = np.where(np.isnan(temp) | np.isnan(prop))[0] + 1
-        raise ValueError(f"Data contains NaN values in rows: {', '.join(map(str, nan_rows))}")
-
-    # Check for duplicate temperatures
-    unique_temp, counts = np.unique(temp, return_counts=True)
-    duplicates = unique_temp[counts > 1]
-    if len(duplicates) > 0:
-        duplicate_rows = [str(idx + 1) for idx, value in enumerate(temp) if value in duplicates]
-        raise ValueError(f"Duplicate temperature entries found in rows: {', '.join(duplicate_rows)}")
-
-    return temp, prop
-
-def read_data_from_excel(file_path: str, temp_col: str, prop_col: str) -> Tuple[np.ndarray, np.ndarray]:
-    """
-    Reads temperature and property data from specific columns in an Excel file. 
- - Args: - file_path (str): Path to the Excel file - temp_col (str): Column name/letter for temperature data - prop_col (str): Column name/letter for property data - - Returns: - Tuple[np.ndarray, np.ndarray]: Temperature and property arrays - - Raises: - ValueError: If: - - Required columns are not found - - Data contains NaN values - - Data contains duplicate temperature entries - """ - print(f"Reading data from Excel file: {file_path}") - - # Read specific columns from Excel - df = pd.read_excel(file_path) - - # Convert to numpy arrays - temp = df[temp_col].to_numpy() - prop = df[prop_col].to_numpy() - - # Check for NaN values - if np.any(np.isnan(temp)) or np.any(np.isnan(prop)): - nan_rows = np.where(np.isnan(temp) | np.isnan(prop))[0] + 1 - raise ValueError(f"Data contains NaN values in rows: {', '.join(map(str, nan_rows))}") - - # Check for duplicate temperatures - unique_temp, counts = np.unique(temp, return_counts=True) - duplicates = unique_temp[counts > 1] - if len(duplicates) > 0: - duplicate_rows = [str(idx + 1) for idx, value in enumerate(temp) if value in duplicates] - raise ValueError(f"Duplicate temperature entries found in rows: {', '.join(duplicate_rows)}") - - return temp, prop - -def read_data_from_file1(file_config: Union[str, Dict], header: bool = True) -> Tuple[np.ndarray, np.ndarray]: +def read_data_from_file(file_config: Dict, header: bool = True) -> Tuple[np.ndarray, np.ndarray]: """ - Reads temperature and property data from a file based on configuration. - + Reads temperature and property data from a file with robust missing value handling. Args: - file_config: Either a path string or a dictionary containing file configuration - If dictionary, required keys: - - file: Path to data file - - temp_col: Temperature column name/index - - prop_col: Property column name/index - If string, treated as direct file path - header (bool): Indicates if the file contains a header row. 
- + file_config (Dict): Dictionary containing file configuration with keys: + - file_path: Path to data file + - temperature_header: Temperature column name/index + - value_header: Property column name/index + header (bool): Indicates if the file contains a header row Returns: Tuple[np.ndarray, np.ndarray]: Temperature and property arrays - Raises: - ValueError: If: - - For txt files: Data has incorrect number of columns (must be exactly 2) - - Data contains NaN values - - Data contains duplicate temperature entries - """ - # Handle string (direct path) or dictionary configuration - if isinstance(file_config, str): - #print('string') - file_path = file_config - # For direct file paths, assume first two columns are temperature and property - temp_col = 0 - prop_col = 1 - else: - #print('dict') - file_path = file_config['file_path'] - temp_col = file_config['temperature_header'] - prop_col = file_config['value_header'] - #temp_col = file_config.get('temp_col', 0) - #prop_col = file_config.get('prop_col', 1) - - print(f"Reading data from file: {file_path}") - - if file_path.endswith('.xlsx'): - df = pd.read_excel(file_path, header=0 if header else None) - elif file_path.endswith('.csv'): - # Use pandas read_csv for CSV files - df = pd.read_csv(file_path, header=0 if header else None) - else: - # For txt files, assume columns are space/tab separated - data = np.loadtxt(file_path, dtype=float, skiprows=1 if header else 0) - - # Check for correct dimensions - only for txt files when using direct path - if isinstance(file_config, str) and (data.ndim != 2 or data.shape[1] != 2): - raise ValueError("Data should have exactly two columns") - elif data.ndim != 2: - raise ValueError("Data should be two-dimensional") - - # Handle both column name (which would be an index for txt files) and column index - if isinstance(temp_col, int): - if temp_col >= data.shape[1]: - raise ValueError(f"Temperature column index {temp_col} out of bounds (file has {data.shape[1]} columns)") - temp = data[:, temp_col] - else: - temp = data[:, 0] # Default to first column - - if isinstance(prop_col, int): - if prop_col >= data.shape[1]: - raise ValueError(f"Property column index {prop_col} out of bounds (file has {data.shape[1]} columns)") - prop = data[:, prop_col] + ValueError: If data validation fails or missing values cannot be handled + FileNotFoundError: If the specified file doesn't exist + """ + # Extract configuration + file_path = file_config['file_path'] + temp_col = file_config['temperature_header'] + prop_col = file_config['value_header'] + # Validate file exists + if not os.path.exists(file_path): + raise FileNotFoundError(f"File not found: {file_path}") + logger.info(f"Reading data from file: {file_path}") + NA_VALUES = ('', ' ', ' ', ' ', 'nan', 'NaN', 'NULL', 'null', 'N/A', 'n/a', 'NA') + try: + # Read file based on extension with comprehensive missing value handling + if file_path.endswith('.xlsx'): + df = pd.read_excel(file_path, header=0 if header else None, na_values=NA_VALUES) + elif file_path.endswith('.csv'): + df = pd.read_csv(file_path, header=0 if header else None, na_values=NA_VALUES) else: - prop = data[:, 1] # Default to second column - - # Check for NaN values - if np.any(np.isnan(temp)) or np.any(np.isnan(prop)): - nan_rows = np.where(np.isnan(temp) | np.isnan(prop))[0] + 1 - raise ValueError(f"Data contains NaN values in rows: {', '.join(map(str, nan_rows))}") - - # Check for duplicate temperatures - unique_temp, counts = np.unique(temp, return_counts=True) - duplicates = unique_temp[counts > 
1] - if len(duplicates) > 0: - duplicate_rows = [str(idx + 1) for idx, value in enumerate(temp) if value in duplicates] - raise ValueError(f"Duplicate temperature entries found in rows: {', '.join(duplicate_rows)}") - - return temp, prop - - # Process pandas DataFrame (for both Excel and CSV) - # Handle both column name (string) and column index (integer) - if isinstance(temp_col, str): - if temp_col not in df.columns: - raise ValueError(f"Temperature column '{temp_col}' not found in file") - temp = df[temp_col].to_numpy(dtype=np.float64) - else: - if temp_col >= len(df.columns): - raise ValueError(f"Temperature column index {temp_col} out of bounds (file has {len(df.columns)} columns)") - temp = df.iloc[:, temp_col].to_numpy(dtype=np.float64) - - if isinstance(prop_col, str): - if prop_col not in df.columns: - raise ValueError(f"Property column '{prop_col}' not found in file") - prop = df[prop_col].to_numpy(dtype=np.float64) - else: - if prop_col >= len(df.columns): - raise ValueError(f"Property column index {prop_col} out of bounds (file has {len(df.columns)} columns)") - prop = df.iloc[:, prop_col].to_numpy(dtype=np.float64) - - # Check for NaN values - if np.any(np.isnan(temp)) or np.any(np.isnan(prop)): - nan_rows = np.where(np.isnan(temp) | np.isnan(prop))[0] + 1 - raise ValueError(f"Data contains NaN values in rows: {', '.join(map(str, nan_rows))}") - - # Check for duplicate temperatures - unique_temp, counts = np.unique(temp, return_counts=True) - duplicates = unique_temp[counts > 1] - if len(duplicates) > 0: - duplicate_rows = [str(idx + 1) for idx, value in enumerate(temp) if value in duplicates] - raise ValueError(f"Duplicate temperature entries found in rows: {', '.join(duplicate_rows)}") - + # Handle text files + return _handle_text_file(file_path, header, temp_col, prop_col) + except Exception as e: + raise ValueError(f"Error reading file {file_path}: {str(e)}") + # Extract data from DataFrame with missing value handling + temp, prop = _extract_dataframe_columns(df, temp_col, prop_col, file_path) + # Validate and clean the data + temp, prop = _validate_and_clean_data(temp, prop, file_path) return temp, prop -def read_data_from_file(file_config: Union[str, Dict], header: bool = True) -> Tuple[np.ndarray, np.ndarray]: - """ - Reads temperature and property data from a file based on configuration. - - Args: - file_config: Either a path string or a dictionary containing file configuration - If string (direct path): - - File must have exactly 2 columns - - First column is temperature, second is property - If dictionary: - - file: Path to data file - - temp_col: Temperature column name/index - - prop_col: Property column name/index - header (bool): Indicates if the file contains a header row. 
- - Returns: - Tuple[np.ndarray, np.ndarray]: Temperature and property arrays - - Raises: - ValueError: If: - - For direct path: Data doesn't have exactly two columns - - For dictionary config: Specified column names don't match headers - - Data contains NaN values - - Data contains duplicate temperature entries - """ - # Handle string (direct path) or dictionary configuration - if isinstance(file_config, str): - file_path = file_config - direct_path = True - # For direct file paths, assume first two columns are temperature and property - temp_col = 0 - prop_col = 1 - else: - file_path = file_config['file_path'] - direct_path = False - temp_col = file_config['temperature_header'] - prop_col = file_config['value_header'] - - print(f"Reading data from file: {file_path}") - - if file_path.endswith('.xlsx'): - df = pd.read_excel(file_path, header=0 if header else None) - elif file_path.endswith('.csv'): - # Use pandas read_csv for CSV files - df = pd.read_csv(file_path, header=0 if header else None) - else: - # For txt files +def _handle_text_file(file_path: str, header: bool, temp_col: Union[str, int], prop_col: Union[str, int]) -> Tuple[np.ndarray, np.ndarray]: + """Handle text file reading with missing value support.""" + NA_VALUES = ('', ' ', ' ', ' ', 'nan', 'NaN', 'NULL', 'null', 'N/A', 'n/a', 'NA') + try: if header: - # Read the header line to get column names + # Read header to get column names with open(file_path, 'r') as f: header_line = f.readline().strip() column_names = header_line.split() - - # Now read the data - data = np.loadtxt(file_path, dtype=float, skiprows=1) - - # Direct path case - check for exactly 2 columns - if direct_path: - if data.shape[1] != 2: - raise ValueError(f"Data should have exactly two columns, but found {data.shape[1]} columns") - temp = data[:, 0] - prop = data[:, 1] - # Dictionary case - match column names - else: - # Handle temperature column - if isinstance(temp_col, str): - if temp_col in column_names: - temp_idx = column_names.index(temp_col) - else: - raise ValueError(f"Temperature column '{temp_col}' not found in file. " - f"Available columns: {', '.join(column_names)}") - else: - if temp_col >= data.shape[1]: - raise ValueError(f"Temperature column index {temp_col} out of bounds (file has {data.shape[1]} columns)") - temp_idx = temp_col - - # Handle property column - if isinstance(prop_col, str): - if prop_col in column_names: - prop_idx = column_names.index(prop_col) - else: - raise ValueError(f"Property column '{prop_col}' not found in file. 
" - f"Available columns: {', '.join(column_names)}") - else: - if prop_col >= data.shape[1]: - raise ValueError(f"Property column index {prop_col} out of bounds (file has {data.shape[1]} columns)") - prop_idx = prop_col - - temp = data[:, temp_idx] - prop = data[:, prop_idx] + # Read data, treating various strings as NaN + data = pd.read_csv(file_path, sep=r'\s+', skiprows=1, header=None, na_values=NA_VALUES).values + # Handle column name/index mapping + temp_idx = _get_column_index(temp_col, column_names, data.shape[1], "temperature") + prop_idx = _get_column_index(prop_col, column_names, data.shape[1], "property") + print(f"Using temperature column: {temp_col} (index {temp_idx}), ") + print(f"Using property column: {prop_col} (index {prop_idx})") + # quit() else: - # No header - data = np.loadtxt(file_path, dtype=float, skiprows=0) - - # Direct path case - check for exactly 2 columns - if direct_path: - if data.shape[1] != 2: - raise ValueError(f"Data should have exactly two columns, but found {data.shape[1]} columns") - temp = data[:, 0] - prop = data[:, 1] - # Dictionary case - use column indices - else: - if isinstance(temp_col, str): - raise ValueError(f"Column name '{temp_col}' specified, but file has no header row") - if isinstance(prop_col, str): - raise ValueError(f"Column name '{prop_col}' specified, but file has no header row") - - if temp_col >= data.shape[1]: - raise ValueError(f"Temperature column index {temp_col} out of bounds (file has {data.shape[1]} columns)") - if prop_col >= data.shape[1]: - raise ValueError(f"Property column index {prop_col} out of bounds (file has {data.shape[1]} columns)") - - temp = data[:, temp_col] - prop = data[:, prop_col] - - # Check for NaN values - if np.any(np.isnan(temp)) or np.any(np.isnan(prop)): - nan_rows = np.where(np.isnan(temp) | np.isnan(prop))[0] + 1 - raise ValueError(f"Data contains NaN values in rows: {', '.join(map(str, nan_rows))}") - - # Check for duplicate temperatures - unique_temp, counts = np.unique(temp, return_counts=True) - duplicates = unique_temp[counts > 1] - if len(duplicates) > 0: - duplicate_rows = [str(idx + 1) for idx, value in enumerate(temp) if value in duplicates] - raise ValueError(f"Duplicate temperature entries found in rows: {', '.join(duplicate_rows)}") - + # No header case + data = pd.read_csv(file_path, sep=r'\s+', header=None, na_values=NA_VALUES).values + if isinstance(temp_col, str) or isinstance(prop_col, str): + raise ValueError("Column names specified, but file has no header row") + temp_idx, prop_idx = temp_col, prop_col + temp = data[:, temp_idx] + prop = data[:, prop_idx] return temp, prop + except Exception as e: + raise ValueError(f"Error processing text file: {str(e)}") - # Process pandas DataFrame (for both Excel and CSV) - # Handle both column name (string) and column index (integer) + +def _extract_dataframe_columns(df: pd.DataFrame, temp_col: Union[str, int], prop_col: Union[str, int], + file_path: str) -> Tuple[np.ndarray, np.ndarray]: + """Extract temperature and property columns from DataFrame with robust error handling.""" + # Handle temperature column if isinstance(temp_col, str): - if temp_col in df.columns: - temp = df[temp_col].to_numpy(dtype=np.float64) - else: - raise ValueError(f"Temperature column '{temp_col}' not found in file. " - f"\n -> Available columns: {', '.join(df.columns)}") + if temp_col not in df.columns: + available_cols = ', '.join(df.columns.astype(str)) + raise ValueError(f"Temperature column '{temp_col}' not found in file {file_path}. 
" + f"Available columns: {available_cols}") + temp_series = df[temp_col] else: if temp_col >= len(df.columns): - raise ValueError(f"Temperature column index {temp_col} out of bounds (file has {len(df.columns)} columns)") - temp = df.iloc[:, temp_col].to_numpy(dtype=np.float64) - + raise ValueError(f"Temperature column index {temp_col} out of bounds " + f"(file has {len(df.columns)} columns)") + temp_series = df.iloc[:, temp_col] + # Handle property column if isinstance(prop_col, str): - if prop_col in df.columns: - prop = df[prop_col].to_numpy(dtype=np.float64) - else: - raise ValueError(f"Property column '{prop_col}' not found in file. " - f"\n -> Available columns: {', '.join(df.columns)}") + if prop_col not in df.columns: + available_cols = ', '.join(df.columns.astype(str)) + raise ValueError(f"Property column '{prop_col}' not found in file {file_path}. " + f"Available columns: {available_cols}") + prop_series = df[prop_col] else: if prop_col >= len(df.columns): - raise ValueError(f"Property column index {prop_col} out of bounds (file has {len(df.columns)} columns)") - prop = df.iloc[:, prop_col].to_numpy(dtype=np.float64) + raise ValueError(f"Property column index {prop_col} out of bounds " + f"(file has {len(df.columns)} columns)") + prop_series = df.iloc[:, prop_col] + # Convert to numeric, coercing errors to NaN + temp = pd.to_numeric(temp_series, errors='coerce').to_numpy(dtype=np.float64) + prop = pd.to_numeric(prop_series, errors='coerce').to_numpy(dtype=np.float64) + return temp, prop - # Check for NaN values - if np.any(np.isnan(temp)) or np.any(np.isnan(prop)): - nan_rows = np.where(np.isnan(temp) | np.isnan(prop))[0] + 1 - raise ValueError(f"Data contains NaN values in rows: {', '.join(map(str, nan_rows))}") +def _get_column_index(col_identifier: Union[str, int], column_names: list, + num_cols: int, col_type: str) -> int: + """Get column index from name or validate numeric index.""" + if isinstance(col_identifier, str): + if col_identifier in column_names: + return column_names.index(col_identifier) + else: + raise ValueError(f"{col_type.capitalize()} column '{col_identifier}' not found. " + f"Available columns: {', '.join(column_names)}") + else: + if col_identifier >= num_cols: + raise ValueError(f"{col_type.capitalize()} column index {col_identifier} " + f"out of bounds (file has {num_cols} columns)") + return col_identifier + + +def _validate_and_clean_data(temp: np.ndarray, prop: np.ndarray, file_path: str) -> Tuple[np.ndarray, np.ndarray]: + """Validate data quality and handle missing values appropriately.""" + # Check for completely empty arrays + if len(temp) == 0 or len(prop) == 0: + raise ValueError(f"No valid data found in file: {file_path}") + # Identify rows with missing values + temp_nan_mask = np.isnan(temp) + prop_nan_mask = np.isnan(prop) + any_nan_mask = temp_nan_mask | prop_nan_mask + # Report missing values if found + if np.any(any_nan_mask): + nan_count = np.sum(any_nan_mask) + total_count = len(temp) + nan_percentage = (nan_count / total_count) * 100 + logger.warning(f"Found {nan_count} rows ({nan_percentage:.1f}%) with missing values in {file_path}") + # If too many missing values, raise an error + if nan_percentage > 50: + raise ValueError(f"Too many missing values ({nan_percentage:.1f}%) in file: {file_path}. " + "Please clean the data or check file format.") + # Remove rows with missing values + valid_mask = ~any_nan_mask + temp = temp[valid_mask] + prop = prop[valid_mask] + logger.info(f"Removed {nan_count} rows with missing values. 
" + f"Remaining data points: {len(temp)}") + # Final validation + if len(temp) < 2: + raise ValueError(f"Insufficient valid data points ({len(temp)}) after cleaning missing values") # Check for duplicate temperatures unique_temp, counts = np.unique(temp, return_counts=True) duplicates = unique_temp[counts > 1] if len(duplicates) > 0: - duplicate_rows = [str(idx + 1) for idx, value in enumerate(temp) if value in duplicates] - raise ValueError(f"Duplicate temperature entries found in rows: {', '.join(duplicate_rows)}") - + duplicate_indices = [] + for dup_temp in duplicates: + indices = np.where(temp == dup_temp)[0] + duplicate_indices.extend(indices[1:]) # Keep first occurrence + logger.warning(f"Found {len(duplicate_indices)} duplicate temperature entries. " + f"Removing duplicates.") + # Remove duplicates + keep_mask = np.ones(len(temp), dtype=bool) + keep_mask[duplicate_indices] = False + temp = temp[keep_mask] + prop = prop[keep_mask] + # Ensure strictly increasing temperature order + if not np.all(np.diff(temp) > 0): + # Sort by temperature + sort_indices = np.argsort(temp) + temp = temp[sort_indices] + prop = prop[sort_indices] + logger.info("Data sorted by temperature for consistency") return temp, prop -def celsius_to_kelvin(temp: Union[float, np.ndarray]) -> Union[float, np.ndarray]: +def check_monotonicity(arr: np.ndarray, name: str = "Array", + mode: str = "strictly_increasing", + threshold: float = 1e-10, + raise_error: bool = True) -> bool: """ - Converts Celsius temperatures to Kelvin. - - Parameters: - temp (Union[float, np.ndarray]): Temperature(s) in Celsius. - - Returns: - Union[float, np.ndarray]: Temperature(s) in Kelvin. - """ - return temp + 273.15 - - -def fahrenheit_to_kelvin(temp: Union[float, np.ndarray]) -> Union[float, np.ndarray]: - """ - Converts Fahrenheit temperatures to Kelvin. - - Parameters: - temp (Union[float, np.ndarray]): Temperature(s) in Fahrenheit. - - Returns: - Union[float, np.ndarray]: Temperature(s) in Kelvin. - """ - return celsius_to_kelvin((temp - 32.0) * (5.0 / 9.0)) - - -# Moved from interpolators.py to data_handler.py -def check_equidistant(temp: np.ndarray, rtol: float = 1e-5, atol: float = 1e-10) -> float: - """ - Tests if the temperature values are equidistant. - - :param temp: Array of temperature values. - :param rtol: Relative tolerance for comparison. - :param atol: Absolute tolerance for comparison. - :return: The common difference if equidistant, otherwise 0. - """ - if len(temp) < 2: - raise ValueError(f"Array has length < 2") - - temperature_diffs = np.diff(temp) - if np.allclose(temperature_diffs, temperature_diffs[0], rtol=rtol, atol=atol): - return float(temperature_diffs[0]) - return 0.0 - - -def check_strictly_increasing(arr, name="Array", threshold=1e-10, raise_error=True): - """ - Check if array is strictly monotonically increasing. - + Universal monotonicity checker supporting multiple modes. 
Args: arr: numpy array to check name: name of array for reporting + mode: 'strictly_increasing', 'non_decreasing', 'strictly_decreasing', 'non_increasing' threshold: minimum required difference between consecutive elements raise_error: if True, raises ValueError; if False, returns False on failure - - Returns: - bool: True if array is strictly increasing, False otherwise (if raise_error=False) - - Raises: - ValueError: If array is not strictly increasing and raise_error=True """ for i in range(1, len(arr)): diff = arr[i] - arr[i-1] - if diff <= threshold: - # Prepare error message with context + # Check condition based on mode + violation = False + if mode == "strictly_increasing" and diff <= threshold: + violation = True + elif mode == "non_decreasing" and diff < -threshold: + violation = True + elif mode == "strictly_decreasing" and diff >= -threshold: + violation = True + elif mode == "non_increasing" and diff > threshold: + violation = True + if violation: + # Create detailed error message start_idx = max(0, i-2) end_idx = min(len(arr), i+3) context = "\nSurrounding values:\n" for j in range(start_idx, end_idx): context += f"Index {j}: {arr[j]:.10e}\n" - error_msg = ( - f"{name} is not strictly increasing at index {i}:\n" + f"{name} is not {mode.replace('_', ' ')} at index {i}:\n" f"Previous value ({i-1}): {arr[i-1]:.10e}\n" - f"Current value ({i}): {arr[i]:.10e}\n" + f"Current value ({i}): {arr[i]:.10e}\n" f"Difference: {diff:.10e}\n" f"{context}" ) - if raise_error: raise ValueError(error_msg) else: print(f"Warning: {error_msg}") return False - - print(f"{name} is strictly monotonically increasing") + logger.info(f"{name} is {mode.replace('_', ' ')}") return True - - -def find_min_max_temperature(temperatures_input) -> tuple: - """ - Find the minimum and maximum temperature from either a text file or a NumPy array. - - Args: - temperatures_input (str or np.ndarray): - - If str, it is the path to the text file. - - If np.ndarray, it is the array of temperatures. - - Returns: - tuple: A tuple containing (min_temperature, max_temperature). 
- """ - try: - # Case 1: Input is a file path - if isinstance(temperatures_input, str): - temperatures = [] - with open(temperatures_input, 'r') as file: - for line in file: - # Skip empty lines or non-data lines - if not line.strip() or not line[0].isdigit(): - continue - - # Split the line and extract the first column (temperature) - parts = line.split() - temperature = float(parts[0]) - temperatures.append(temperature) - - # Case 2: Input is a NumPy array - elif isinstance(temperatures_input, np.ndarray): - temperatures = temperatures_input.tolist() - - else: - raise TypeError("Input must be either a file path (str) or a numpy array.") - - # Get min and max temperatures - min_temperature = min(temperatures) - max_temperature = max(temperatures) - - return min_temperature, max_temperature - - except FileNotFoundError: - raise FileNotFoundError(f"The file '{temperatures_input}' does not exist.") - except Exception as e: - raise ValueError(f"An error occurred while processing the input: {e}") - - -def plot_arrays(x_arr: np.ndarray, y_arr: np.ndarray, x_label: str = None, y_label: str = None) -> None: - # Set labels and titles - x_label = x_label or "x-axis" - y_label = y_label or "y-axis" - - # Create the plot - plt.figure(figsize=(10, 6)) - plt.plot(x_arr, y_arr, 'b-', linewidth=1) - plt.xlabel(x_label) - plt.ylabel(y_label) - plt.title(f'{y_label} vs {x_label}') - plt.grid(True) - - # Define filename and directory - filename = f"{y_label.replace('/', '_')}_vs_{x_label.replace('/', '_')}.png" - directory = "plots" - os.makedirs(directory, exist_ok=True) # Ensure the directory exists - - filepath = os.path.join(directory, filename) - plt.savefig(filepath, dpi=300, bbox_inches="tight") - plt.show() - print(f"Plot saved as {filepath}") - - -if __name__ == '__main__': - # Example usage: - # 1. Using a file path - base_dir = Path(__file__).parent # Directory of the current file - _file_path = str( base_dir / '..' / 'data' / 'alloys' / 'SS304L' / 'density_temperature.txt' ) - min_temp, max_temp = find_min_max_temperature(_file_path) - print(f"Minimum Temperature from file: {min_temp}") - print(f"Maximum Temperature from file: {max_temp}") - - # 2. 
Using a numpy array - temperature_array = np.array([3300, 500, 800, 1000, 1500]) - min_temp, max_temp = find_min_max_temperature(temperature_array) - print(f"Minimum Temperature from array: {min_temp}") - print(f"Maximum Temperature from array: {max_temp}") diff --git a/src/pymatlib/core/yaml_parser/common_utils.py b/src/pymatlib/core/yaml_parser/common_utils.py index 5925710ccf97d257dbf0c6cdd05041409712e87a..3c32ba905e199dcc7926ed2771e362962a1214ea 100644 --- a/src/pymatlib/core/yaml_parser/common_utils.py +++ b/src/pymatlib/core/yaml_parser/common_utils.py @@ -1,15 +1,17 @@ import logging -from typing import List, Tuple, Union +from typing import List, Literal, Tuple, Union import numpy as np import pwlf import sympy as sp +from pymatlib.core.data_handler import check_monotonicity from pymatlib.core.yaml_parser.yaml_keys import CONSTANT_KEY, REGRESSION_KEY, SIMPLIFY_KEY, DEGREE_KEY, \ SEGMENTS_KEY, EXTRAPOLATE_KEY logger = logging.getLogger(__name__) + # --- Shared/Utility Methods --- def ensure_ascending_order(temp_array: np.ndarray, *value_arrays: np.ndarray) -> Tuple[np.ndarray, ...]: """Ensure temperature array is in ascending order, flipping all provided arrays if needed.""" @@ -27,6 +29,7 @@ def ensure_ascending_order(temp_array: np.ndarray, *value_arrays: np.ndarray) -> else: raise ValueError(f"Array is not strictly ascending or strictly descending: {temp_array}") + def validate_temperature_range( prop: str, temp_array: np.ndarray, @@ -58,6 +61,7 @@ def validate_temperature_range( f"\n -> Found {len(out_of_range)} out-of-range values at indices {out_of_range}: {out_values}" ) + def interpolate_value( T: float, x_array: np.ndarray, @@ -95,6 +99,7 @@ def interpolate_value( else: return float(np.interp(T, x_array, y_array)) + def process_regression_params( prop_config: dict, prop_name: str, @@ -125,23 +130,49 @@ def _process_regression(temp_array, prop_array, T, lower_bound_type, upper_bound v_pwlf.fit(n_segments=segments) return sp.Piecewise(*get_symbolic_conditions(v_pwlf, T, lower_bound_type, upper_bound_type)) -def create_raw_piecewise(temp_array: np.ndarray, prop_array: np.ndarray, T: sp.Symbol, lower: str = Union['constant', 'extrapolate'], upper: str = Union['constant', 'extrapolate'],): - logger.debug("""create_raw_piecewise: - temp_array: %r - prop_array: %r - T: %r - lower: %r - upper: %r""", - temp_array.shape if temp_array is not None else None, - prop_array.shape if prop_array is not None else None, - T, lower, upper) - if temp_array is None or len(temp_array) == 0: raise ValueError("Temperature array is empty") - if temp_array[0] > temp_array[-1]: raise ValueError("Temperature array is not in ascending order.") - conditions = [((prop_array[0] if lower==CONSTANT_KEY else prop_array[0]+(prop_array[1]-prop_array[0])/(temp_array[1]-temp_array[0])*(T-temp_array[0])), T<temp_array[0])] + \ - [(prop_array[i]+(prop_array[i+1]-prop_array[i])/(temp_array[i+1]-temp_array[i])*(T-temp_array[i]), sp.And(T>=temp_array[i], T<temp_array[i+1])) for i in range(len(temp_array)-1)] + \ - [((prop_array[-1] if upper==CONSTANT_KEY else prop_array[-1]+(prop_array[-1]-prop_array[-2])/(temp_array[-1]-temp_array[-2])*(T-temp_array[-1])), T>=temp_array[-1])] + +def create_raw_piecewise( + temp_array: np.ndarray, + prop_array: np.ndarray, + T: sp.Symbol, + lower: Literal['constant', 'extrapolate'], + upper: Literal['constant', 'extrapolate']) -> sp.Piecewise: + """Create a basic piecewise linear interpolation function.""" + # Validation + if temp_array is None or len(temp_array) == 0: + raise 
ValueError("Temperature array is empty") + if temp_array[0] > temp_array[-1]: + raise ValueError("Temperature array is not in ascending order.") + conditions = [] + # Handle lower bound (T < temp_array[0]) + if lower == CONSTANT_KEY: + lower_expr = prop_array[0] + else: # extrapolate + if len(temp_array) > 1: + slope = (prop_array[1] - prop_array[0]) / (temp_array[1] - temp_array[0]) + lower_expr = prop_array[0] + slope * (T - temp_array[0]) + else: + lower_expr = prop_array[0] + conditions.append((lower_expr, T < temp_array[0])) + # Handle main interpolation segments + for i in range(len(temp_array) - 1): + slope = (prop_array[i+1] - prop_array[i]) / (temp_array[i+1] - temp_array[i]) + expr = prop_array[i] + slope * (T - temp_array[i]) + condition = sp.And(T >= temp_array[i], T < temp_array[i+1]) + conditions.append((expr, condition)) + # Handle upper bound (T >= temp_array[-1]) + if upper == CONSTANT_KEY: + upper_expr = prop_array[-1] + else: # extrapolate + if len(temp_array) > 1: + slope = (prop_array[-1] - prop_array[-2]) / (temp_array[-1] - temp_array[-2]) + upper_expr = prop_array[-1] + slope * (T - temp_array[-1]) + else: + upper_expr = prop_array[-1] + conditions.append((upper_expr, T >= temp_array[-1])) return sp.Piecewise(*conditions) + def create_piecewise_from_formulas( temp_points: np.ndarray, eqn_exprs: List[Union[str, sp.Expr]], @@ -195,7 +226,8 @@ def create_piecewise_from_formulas( conditions.append((processed_exprs[-1].subs(T, temp_points[-1]), T >= temp_points[-1])) return sp.Piecewise(*conditions) -#https://github.com/cjekel/piecewise_linear_fit_py/blob/master/examples/understanding_higher_degrees/polynomials_in_pwlf.ipynb + +# https://github.com/cjekel/piecewise_linear_fit_py/blob/master/examples/understanding_higher_degrees/polynomials_in_pwlf.ipynb def get_symbolic_eqn(pwlf_: pwlf.PiecewiseLinFit, segment_number: int, x: Union[float, sp.Symbol]): if pwlf_.degree < 1: raise ValueError('Degree must be at least 1') @@ -220,6 +252,7 @@ def get_symbolic_eqn(pwlf_: pwlf.PiecewiseLinFit, segment_number: int, x: Union[ else: # For numeric x, just return the equation return my_eqn + def get_symbolic_conditions(pwlf_: pwlf.PiecewiseLinFit, x: sp.Symbol, lower_: str, upper_: str): """Create symbolic conditions for a piecewise function from a pwlf fit.""" conditions = [] @@ -248,3 +281,33 @@ def get_symbolic_conditions(pwlf_: pwlf.PiecewiseLinFit, x: sp.Symbol, lower_: s eqn = get_symbolic_eqn(pwlf_, pwlf_.n_segments, x) conditions.append((eqn.evalf(subs={x: pwlf_.fit_breaks[-1]}), x >= pwlf_.fit_breaks[-1])) return conditions + + +def _validate_energy_density_monotonicity(prop_name: str, temp_array: np.ndarray, prop_array: np.ndarray, tolerance: float = 1e-10) -> None: + """Validate that energy_density is monotonically non-decreasing with temperature.""" + if prop_name != 'energy_density': + return + if len(temp_array) < 2: + return + try: + check_monotonicity(prop_array, "Energy density", "non_decreasing", tolerance, True) + except ValueError as e: + # Add domain-specific context + diffs = np.diff(prop_array) + decreasing_indices = np.where(diffs < -tolerance)[0] + if len(decreasing_indices) > 0: + problem_temps = temp_array[decreasing_indices] + problem_values = prop_array[decreasing_indices] + next_values = prop_array[decreasing_indices + 1] + error_details = [] + for i, (temp, val, next_val) in enumerate(zip(problem_temps[:3], problem_values[:3], next_values[:3])): + decrease = val - next_val + error_details.append(f"T={temp:.2f}K: {val:.6e} → {next_val:.6e} (decrease: 
+
 def create_piecewise_from_formulas(
     temp_points: np.ndarray,
     eqn_exprs: List[Union[str, sp.Expr]],
@@ -195,7 +226,8 @@ def create_piecewise_from_formulas(
         conditions.append((processed_exprs[-1].subs(T, temp_points[-1]), T >= temp_points[-1]))
     return sp.Piecewise(*conditions)
-#https://github.com/cjekel/piecewise_linear_fit_py/blob/master/examples/understanding_higher_degrees/polynomials_in_pwlf.ipynb
+
+# https://github.com/cjekel/piecewise_linear_fit_py/blob/master/examples/understanding_higher_degrees/polynomials_in_pwlf.ipynb
 def get_symbolic_eqn(pwlf_: pwlf.PiecewiseLinFit, segment_number: int, x: Union[float, sp.Symbol]):
     if pwlf_.degree < 1:
         raise ValueError('Degree must be at least 1')
@@ -220,6 +252,7 @@ def get_symbolic_eqn(pwlf_: pwlf.PiecewiseLinFit, segment_number: int, x: Union[
     else:
         # For numeric x, just return the equation
         return my_eqn
+
 def get_symbolic_conditions(pwlf_: pwlf.PiecewiseLinFit, x: sp.Symbol, lower_: str, upper_: str):
     """Create symbolic conditions for a piecewise function from a pwlf fit."""
     conditions = []
@@ -248,3 +281,35 @@ def get_symbolic_conditions(pwlf_: pwlf.PiecewiseLinFit, x: sp.Symbol, lower_: s
     eqn = get_symbolic_eqn(pwlf_, pwlf_.n_segments, x)
     conditions.append((eqn.evalf(subs={x: pwlf_.fit_breaks[-1]}), x >= pwlf_.fit_breaks[-1]))
     return conditions
+
+
+def _validate_energy_density_monotonicity(prop_name: str, temp_array: np.ndarray, prop_array: np.ndarray, tolerance: float = 1e-10) -> None:
+    """Validate that energy_density is monotonically non-decreasing with temperature."""
+    if prop_name != 'energy_density':
+        return
+    if len(temp_array) < 2:
+        return
+    try:
+        check_monotonicity(prop_array, "Energy density", "non_decreasing", tolerance, True)
+    except ValueError as e:
+        # Add domain-specific context
+        diffs = np.diff(prop_array)
+        decreasing_indices = np.where(diffs < -tolerance)[0]
+        if len(decreasing_indices) > 0:
+            problem_temps = temp_array[decreasing_indices]
+            problem_values = prop_array[decreasing_indices]
+            next_values = prop_array[decreasing_indices + 1]
+            error_details = []
+            for temp, val, next_val in zip(problem_temps[:3], problem_values[:3], next_values[:3]):
+                decrease = val - next_val
+                error_details.append(f"T={temp:.2f}K: {val:.6e} → {next_val:.6e} (decrease: {decrease:.6e})")
+            enhanced_msg = (
+                f"Energy density must be monotonically non-decreasing with temperature. "
+                f"Found {len(decreasing_indices)} decreasing points:\n"
+                + "\n".join(error_details)
+            )
+            if len(decreasing_indices) > 3:
+                enhanced_msg += f"\n... and {len(decreasing_indices) - 3} more violations"
+            raise ValueError(enhanced_msg) from e
+        # Re-raise unchanged if the failure was not a plain decrease, so the
+        # original error is never silently swallowed.
+        raise
diff --git a/src/pymatlib/core/yaml_parser/property_processing.py b/src/pymatlib/core/yaml_parser/property_processing.py
index 84da70d38efe7343a29ccf8c89e5420f0cfb6b73..0992d0e6a7a4ae44472bdc09173183bf4652a7e9 100644
--- a/src/pymatlib/core/yaml_parser/property_processing.py
+++ b/src/pymatlib/core/yaml_parser/property_processing.py
@@ -16,7 +16,8 @@ from pymatlib.core.yaml_parser.common_utils import (
     get_symbolic_conditions,
     interpolate_value,
     process_regression_params,
-    validate_temperature_range
+    validate_temperature_range,
+    _validate_energy_density_monotonicity
 )
 from pymatlib.core.yaml_parser.custom_error import DependencyError, CircularDependencyError
 from pymatlib.core.yaml_parser.property_types import PropertyType
@@ -27,8 +28,10 @@ from pymatlib.core.yaml_parser.yaml_keys import MELTING_TEMPERATURE_KEY, BOILING
 logger = logging.getLogger(__name__)
+
 seed = 13579
+
 class PropertyProcessor:
     """Handles processing of different property types for material objects."""
@@ -43,6 +46,7 @@ class PropertyProcessor:
         self.visualizer = None
         self.processed_properties: set = set()
+
     # --- Public API ---
     def process_properties(
         self,
@@ -96,6 +100,7 @@ class PropertyProcessor:
         except Exception as e:
             raise ValueError(f"Failed to process properties \n -> {str(e)}") from e
+
     # --- Property-Type Processing Methods ---
     def _process_constant_property(self, material: Material, prop_name: str, prop_config: Union[float, str], T: Union[float, sp.Symbol]) -> None:
         """Process constant float property."""
@@ -121,6 +126,7 @@ class PropertyProcessor:
         except (ValueError, TypeError) as e:
             raise ValueError(f"Failed to process constant property \n -> {str(e)}") from e
+
     def _process_latent_heat_constant(self, material: Material, prop_name: str, prop_config: Union[float, str], T: Union[float, sp.Symbol]) -> None:
         """Process latent heat properties when provided as constants."""
         logger.debug("""PropertyProcessor: _process_latent_heat_constant:
@@ -171,6 +177,7 @@ class PropertyProcessor:
         except (ValueError, TypeError) as e:
             raise ValueError(f"Failed to process {prop_name} constant \n -> {str(e)}") from e
+
     def _process_file_property(self, material: Material, prop_name: str, file_config: Union[str, Dict[str, Any]], T: Union[float, sp.Symbol]) -> None:
         """Process property data from a file configuration."""
         logger.debug("""PropertyProcessor: _process_file_property:
@@ -193,6 +200,7 @@ class PropertyProcessor:
             raise ValueError(f"Non-finite values detected in property '{prop_name}': " + "; ".join(msg))
         self._validate_temperature_range(prop_name, temp_array)
         temp_array, prop_array = ensure_ascending_order(temp_array, prop_array)
+        _validate_energy_density_monotonicity(prop_name, temp_array, prop_array)
         lower_bound_type, upper_bound_type = file_config[BOUNDS_KEY]
         lower_bound = np.min(temp_array)
         upper_bound = np.max(temp_array)
@@ -229,6 +237,7 @@ class PropertyProcessor:
         except Exception as e:
             raise ValueError(f"Failed to process file property {prop_name} \n -> {str(e)}") from e
+
     def _process_key_val_property(self, material: Material, prop_name: str, prop_config: Dict[str, Any], T: Union[float, sp.Symbol]) -> None:
         """Process property defined with key-val pairs."""
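+        # Illustrative key-val configuration (hypothetical names and values; the
+        # exact field names are defined by the project's YAML keys):
+        #
+        #   density:
+        #     key: [300.0, 600.0, 900.0]        # temperature breakpoints in K
+        #     val: [7900.0, 7750.0, 7600.0]     # property values at those keys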
logger.debug("""PropertyProcessor: _process_key_val_property: @@ -287,6 +296,7 @@ class PropertyProcessor: if prop_name not in ['latent_heat_of_fusion', 'latent_heat_of_vaporization']: self._validate_temperature_range(prop_name, key_array) key_array, val_array = ensure_ascending_order(key_array, val_array) + _validate_energy_density_monotonicity(prop_name, key_array, val_array) lower_bound_type, upper_bound_type = prop_config[BOUNDS_KEY] lower_bound = np.min(key_array) upper_bound = np.max(key_array) @@ -323,6 +333,7 @@ class PropertyProcessor: except Exception as e: raise ValueError(f"Failed to process key-val property '{prop_name}' \n -> {str(e)}") from e + # --- Key-Value Helpers --- def _process_key_definition(self, key_def: Union[str, List[Union[str, float]]], val_array: List[float], material: Material) -> np.ndarray: """Process temperature key definition.""" @@ -340,6 +351,7 @@ class PropertyProcessor: except Exception as e: raise ValueError(f"Failed to process key definition \n -> {str(e)}") from e + @staticmethod def _process_equidistant_key(key_def: str, n_points: int) -> np.ndarray: """Process equidistant key definition.""" @@ -356,6 +368,7 @@ class PropertyProcessor: except Exception as e: raise ValueError(f"Invalid equidistant format: {key_def} \n -> {str(e)}") from e + @staticmethod def _process_list_key(key_def: List[Union[str, float]], material: Material) -> np.ndarray: """Process list key definition.""" @@ -430,6 +443,7 @@ class PropertyProcessor: except Exception as e: raise ValueError(f"Error processing list key \n -> {str(e)}") from e + def _process_piecewise_equation_property(self, material: Material, prop_name: str, prop_config: Dict[str, Any], T: Union[float, sp.Symbol]) -> None: """Process piecewise equation property.""" logger.debug("""PropertyProcessor: _process_piecewise_equation_property: @@ -437,10 +451,8 @@ class PropertyProcessor: prop_name: %r prop_config: %r T: %r""", material, prop_name, prop_config, T) - temp_points = np.array(prop_config[TEMPERATURE_KEY], dtype=float) eqn_strings = prop_config[EQUATION_KEY] - # Validate that only 'T' is used in equations for eq in eqn_strings: expr = sp.sympify(eq) @@ -448,39 +460,31 @@ class PropertyProcessor: for sym in symbols: if str(sym) != 'T': raise ValueError(f"Unsupported symbol '{sym}' found in equation '{eq}' for property '{prop_name}'. 
Only 'T' is allowed.") - lower_bound_type, upper_bound_type = prop_config[BOUNDS_KEY] - # Use standard symbol for parsing T_standard = sp.Symbol('T') temp_points, eqn_strings = ensure_ascending_order(temp_points, eqn_strings) - # Parse expressions with standard symbol eqn_exprs = [sp.sympify(eq, locals={'T': T_standard}) for eq in eqn_strings] - # Create piecewise function with standard symbol pw_standard = self._create_piecewise_from_formulas(temp_points, eqn_exprs, T_standard, lower_bound_type, upper_bound_type) - # If T is a different symbol, substitute T_standard with the actual symbol if isinstance(T, sp.Symbol) and str(T) != 'T': pw = pw_standard.subs(T_standard, T) else: pw = pw_standard - # Handle numeric temperature if not isinstance(T, sp.Symbol): value = float(pw_standard.subs(T_standard, T).evalf()) setattr(material, prop_name, sp.Float(value)) self.processed_properties.add(prop_name) return - has_regression, simplify_type, degree, segments = self._process_regression_params(prop_config, prop_name, len(temp_points)) - f_pw = sp.lambdify(T, pw, 'numpy') diff = abs(self.temperature_array[1] - self.temperature_array[0]) temp_dense = np.arange(temp_points[0], temp_points[-1]+diff/2, diff) y_dense = f_pw(temp_dense) - + _validate_energy_density_monotonicity(prop_name, temp_dense, y_dense) if has_regression and simplify_type == PRE_KEY: # Use T_standard for regression and then substitute if needed pw_reg = _process_regression(temp_dense, y_dense, T_standard, lower_bound_type, upper_bound_type, degree, segments, seed) @@ -491,9 +495,6 @@ class PropertyProcessor: else: # No regression OR not pre raw_pw = pw setattr(material, prop_name, raw_pw) - - self.processed_properties.add(prop_name) - # Only visualize if visualizer is available if self.visualizer is not None: self.visualizer.visualize_property( @@ -512,21 +513,20 @@ class PropertyProcessor: lower_bound_type=lower_bound_type, upper_bound_type=upper_bound_type ) + self.processed_properties.add(prop_name) + def _process_computed_property(self, material: Material, prop_name: str, T: Union[float, sp.Symbol]) -> None: """Process computed properties using predefined models with dependency checking.""" logger.debug("PropertyProcessor: _process_computed_property - prop_name: %r, T: %r", prop_name, T) - # Skip if already processed if prop_name in self.processed_properties: return - try: # Get property configuration prop_config = self.properties.get(prop_name) if prop_config is None: raise ValueError(f"Property '{prop_name}' not found in configuration") - # Extract expression from config if isinstance(prop_config, str): expression = prop_config @@ -534,7 +534,6 @@ class PropertyProcessor: expression = prop_config[EQUATION_KEY] else: raise ValueError(f"Unsupported property configuration format for {prop_name}: {prop_config}") - # Process the expression try: material_property = self._parse_and_process_expression(expression, material, T) @@ -544,10 +543,8 @@ class PropertyProcessor: except Exception as e: # Wrap other exceptions with more context raise ValueError(f"Failed to process computed property '{prop_name}' \n -> {str(e)}") from e - # Set property on material setattr(material, prop_name, material_property) - # Handle non-symbolic temperature case if not isinstance(T, sp.Symbol): # For numeric T, we've already substituted the value in _parse_and_process_expression @@ -557,22 +554,17 @@ class PropertyProcessor: else: value = float(material_property) setattr(material, prop_name, sp.Float(value)) - self.processed_properties.add(prop_name) - # 
Extract bounds and regression parameters for visualization lower_bound_type, upper_bound_type = CONSTANT_KEY, CONSTANT_KEY if isinstance(prop_config, dict) and BOUNDS_KEY in prop_config: if isinstance(prop_config[BOUNDS_KEY], list) and len(prop_config[BOUNDS_KEY]) == 2: lower_bound_type, upper_bound_type = prop_config[BOUNDS_KEY] - temp_array = self.temperature_array lower_bound = np.min(temp_array) upper_bound = np.max(temp_array) - # Get regression parameters has_regression, simplify_type, degree, segments = self._process_regression_params(prop_config, prop_name, len(temp_array)) - # Create function from symbolic expression # Use standard symbol for lambdification if T is a different symbol T_standard = sp.Symbol('T') @@ -581,11 +573,9 @@ class PropertyProcessor: f_pw = sp.lambdify(T_standard, standard_expr, 'numpy') else: f_pw = sp.lambdify(T, material_property, 'numpy') - # Evaluate the function over the temperature range try: y_dense = f_pw(temp_array) - # Check for invalid values if not np.all(np.isfinite(y_dense)): invalid_count = np.sum(~np.isfinite(y_dense)) @@ -593,13 +583,12 @@ class PropertyProcessor: f"Property '{prop_name}' has {invalid_count} non-finite values. " f"This may indicate issues with the expression: {expression}" ) - + _validate_energy_density_monotonicity(prop_name, temp_array, y_dense) # Apply regression if needed if has_regression and simplify_type == PRE_KEY: # Use T_standard for regression and then substitute if needed T_for_regression = T_standard if isinstance(T, sp.Symbol) and str(T) != 'T' else T pw_reg = _process_regression(temp_array, y_dense, T_for_regression, lower_bound_type, upper_bound_type, degree, segments, seed) - # If T is a different symbol, substitute T_standard with the actual symbol if isinstance(T, sp.Symbol) and str(T) != 'T': pw_reg = pw_reg.subs(T_standard, T) @@ -607,9 +596,6 @@ class PropertyProcessor: else: # No regression OR not pre raw_pw = self._create_raw_piecewise(temp_array, y_dense, T, lower_bound_type, upper_bound_type) setattr(material, prop_name, raw_pw) - - self.processed_properties.add(prop_name) - # Only visualize if visualizer is available if self.visualizer is not None: self.visualizer.visualize_property( @@ -628,44 +614,37 @@ class PropertyProcessor: lower_bound_type=lower_bound_type, upper_bound_type=upper_bound_type ) - + self.processed_properties.add(prop_name) except Exception as e: raise ValueError(f"Error evaluating expression for '{prop_name}': {str(e)}") from e - except CircularDependencyError: # Re-raise CircularDependencyError without wrapping it raise except Exception as e: raise ValueError(f"Failed to process computed property '{prop_name}' \n -> {str(e)}") from e + # --- Expression/Dependency Helpers --- def _parse_and_process_expression(self, expression: str, material: Material, T: Union[float, sp.Symbol]) -> sp.Expr: """Parse and process a mathematical expression string into a SymPy expression.""" logger.debug("PropertyProcessor: _parse_and_process_expression - expression: %r, T: %r", expression, T) - try: # Create a standard symbol 'T' for parsing the expression T_standard = sp.Symbol('T') - # Parse the expression with the standard symbol sympy_expr = sp.sympify(expression, evaluate=False) - # Extract dependencies (always excluding 'T' as it's handled separately) dependencies = [str(symbol) for symbol in sympy_expr.free_symbols if str(symbol) != 'T'] - # Check for missing dependencies before processing missing_deps = [] for dep in dependencies: if not hasattr(material, dep) and dep not in 
self.properties: missing_deps.append(dep) - if missing_deps: available_props = sorted(list(self.properties.keys())) raise DependencyError(expression=expression, missing_deps=missing_deps, available_props=available_props) - # Check for circular dependencies self._check_circular_dependencies(prop_name=None, current_deps=dependencies, visited=set()) - # Process dependencies first for dep in dependencies: if not hasattr(material, dep) or getattr(material, dep) is None: @@ -675,12 +654,10 @@ class PropertyProcessor: # This should not happen due to the check above, but just in case available_props = sorted(list(self.properties.keys())) raise DependencyError(expression=expression, missing_deps=[dep], available_props=available_props) - # Verify all dependencies are now available missing_deps = [dep for dep in dependencies if not hasattr(material, dep) or getattr(material, dep) is None] if missing_deps: raise ValueError(f"Cannot compute expression. Missing dependencies: {missing_deps}") - # Create substitution dictionary substitutions = {} for dep in dependencies: @@ -692,7 +669,6 @@ class PropertyProcessor: if dep_symbol is None: raise ValueError(f"Symbol '{dep}' not found in symbol registry") substitutions[dep_symbol] = dep_value - # Handle temperature substitution based on type if isinstance(T, sp.Symbol): # If T is a symbolic variable, substitute the standard 'T' with it @@ -700,14 +676,11 @@ class PropertyProcessor: else: # For numeric T, substitute with the value directly substitutions[T_standard] = T - # Perform substitution and evaluate integrals if present result_expr = sympy_expr.subs(substitutions) if isinstance(result_expr, sp.Integral): result_expr = result_expr.doit() - return result_expr - except CircularDependencyError: # Re-raise the circular dependency error directly without wrapping it raise @@ -718,6 +691,7 @@ class PropertyProcessor: # Wrap other exceptions with more context raise ValueError(f"Failed to parse and process expression: {expression}") from e + def _check_circular_dependencies(self, prop_name, current_deps, visited, path=None): """Check for circular dependencies in property definitions.""" logger.debug("""PropertyProcessor: _check_circular_dependencies: @@ -725,25 +699,19 @@ class PropertyProcessor: current_deps: %r visited: %r path: %r""", prop_name, current_deps, visited, path) - if path is None: path = [] - # Always filter out 'T' from dependencies to check current_deps = [dep for dep in current_deps if dep != 'T'] - if prop_name is not None: if prop_name in visited: cycle_path = path + [prop_name] raise CircularDependencyError(dependency_path=cycle_path) - visited.add(prop_name) path = path + [prop_name] - for dep in current_deps: if dep in self.properties: dep_config = self.properties[dep] - if isinstance(dep_config, str): expr = sp.sympify(dep_config) # Always exclude 'T' from dependencies @@ -763,10 +731,10 @@ class PropertyProcessor: dep_deps = [str(symbol) for symbol in expr.free_symbols if str(symbol) != 'T'] else: dep_deps = [] - if dep_deps: self._check_circular_dependencies(dep, dep_deps, visited.copy(), path) + # --- Post-Processing --- def _post_process_properties(self, material: Material, T: Union[float, sp.Symbol]) -> None: """Perform post-processing on properties after all have been initially processed.""" @@ -788,18 +756,6 @@ class PropertyProcessor: prop_value = getattr(material, prop_name) if isinstance(prop_value, sp.Integral): raise ValueError(f"Property '{prop_name}' is an integral and cannot be post-processed.") - result = prop_value.doit() 
- if isinstance(result, sp.Integral): - temp_array = self.temperature_array - values = np.array([float(prop_value.evalf(subs={T: t})) for t in temp_array], dtype=float) - degree = regression_config[DEGREE_KEY] - segments = regression_config[SEGMENTS_KEY] - lower_bound_type, upper_bound_type = prop_config[BOUNDS_KEY] - pw = _process_regression(temp_array, values, T, lower_bound_type, upper_bound_type, degree, segments, seed) - prop_value = pw - else: - prop_value = result - setattr(material, prop_name, prop_value) if not isinstance(prop_value, sp.Expr): logger.debug("""PropertyVisualizer: _post_process_properties: Skipping - not symbolic for property: %r @@ -831,6 +787,7 @@ class PropertyProcessor: error_msg = "\n".join(errors) raise ValueError(f"Post-processing errors occurred: \n -> {error_msg}") + def _validate_temperature_range(self, prop: str, temp_array: np.ndarray) -> None: from pymatlib.core.yaml_parser.common_utils import validate_temperature_range return validate_temperature_range(prop, temp_array, self.temperature_array)
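
Usage sketch (illustrative, not part of the patch; the file name and column
headers below are hypothetical):

    from pymatlib.core.data_handler import read_data_from_file, check_monotonicity

    config = {
        'file_path': 'density_temperature.csv',
        'temperature_header': 'T',
        'value_header': 'density',
    }
    temp, prop = read_data_from_file(config)   # cleaned, de-duplicated, sorted

    # Returns True when temperatures strictly increase; raises ValueError otherwise.
    check_monotonicity(temp, name="Temperature", mode="strictly_increasing")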