easyvvuq.utils.dataset_importer
Dataset Import Utilities for EasyVVUQ
This module provides utilities for creating EasyVVUQ campaigns from existing datasets that were not originally created with EasyVVUQ.
1""" 2Dataset Import Utilities for EasyVVUQ 3 4This module provides utilities for creating EasyVVUQ campaigns from existing datasets 5that were not originally created with EasyVVUQ. 6""" 7 8import os 9import json 10import glob 11import logging 12import pandas as pd 13import numpy as np 14from pathlib import Path 15from typing import List, Dict, Tuple, Optional, Union, Any 16from collections import defaultdict 17 18import easyvvuq as uq 19from easyvvuq.constants import Status 20from easyvvuq.actions import Actions, CreateRunDirectory, Encode, Decode, ExecuteLocal 21 22__copyright__ = """ 23 24 Copyright 2018 Robin A. Richardson, David W. Wright 25 26 This file is part of EasyVVUQ 27 28 EasyVVUQ is free software: you can redistribute it and/or modify 29 it under the terms of the Lesser GNU General Public License as published by 30 the Free Software Foundation, either version 3 of the License, or 31 (at your option) any later version. 32 33 EasyVVUQ is distributed in the hope that it will be useful, 34 but WITHOUT ANY WARRANTY; without even the implied warranty of 35 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the 36 Lesser GNU General Public License for more details. 37 38 You should have received a copy of the Lesser GNU General Public License 39 along with this program. If not, see <https://www.gnu.org/licenses/>. 40 41""" 42__license__ = "LGPL" 43 44logger = logging.getLogger(__name__) 45 46 47class DatasetImporter: 48 """ 49 A utility class for importing existing datasets into EasyVVUQ campaigns. 50 51 This class provides methods to discover, validate, and import simulation data 52 from existing directory structures or file collections. 53 """ 54 55 def __init__(self, root_dir: str, campaign_name: str = "imported_campaign"): 56 """ 57 Initialize the DatasetImporter. 58 59 Parameters 60 ---------- 61 root_dir : str 62 Root directory containing the existing dataset 63 campaign_name : str, optional 64 Name for the campaign to be created (default: "imported_campaign") 65 """ 66 self.root_dir = Path(root_dir).resolve() 67 self.campaign_name = campaign_name 68 self.discovered_runs = [] 69 self.parameters = {} 70 self.output_columns = [] 71 72 if not self.root_dir.exists(): 73 raise ValueError(f"Root directory does not exist: {self.root_dir}") 74 75 def discover_directory_structure(self, 76 input_patterns: List[str] = None, 77 output_patterns: List[str] = None, 78 max_depth: int = 3) -> Dict[str, Any]: 79 """ 80 Automatically discover the directory structure and identify runs. 
81 82 Parameters 83 ---------- 84 input_patterns : List[str], optional 85 List of glob patterns to match input files (default: common patterns) 86 output_patterns : List[str], optional 87 List of glob patterns to match output files (default: common patterns) 88 max_depth : int, optional 89 Maximum depth to search for files (default: 3) 90 91 Returns 92 ------- 93 Dict[str, Any] 94 Dictionary containing discovered structure information 95 """ 96 if input_patterns is None: 97 input_patterns = [ 98 "*.json", "*.txt", "*.csv", "*.yaml", "*.yml", "*.xml", 99 "input.*", "params.*", "config.*", "*input*", "*param*" 100 ] 101 102 if output_patterns is None: 103 output_patterns = [ 104 "*.csv", "*.json", "*.txt", "*.out", "*.log", "*.dat", 105 "output.*", "result.*", "*.results", "*output*", "*result*" 106 ] 107 108 discovered = { 109 'runs': [], 110 'structure_type': 'unknown', 111 'input_files': [], 112 'output_files': [], 113 'common_structure': None 114 } 115 116 logger.info(f"Discovering directory structure in {self.root_dir}") 117 118 # Search for potential run directories 119 for root, dirs, files in os.walk(self.root_dir): 120 current_depth = len(Path(root).relative_to(self.root_dir).parts) 121 if current_depth > max_depth: 122 continue 123 124 # Look for input and output files in this directory 125 input_files = [] 126 output_files = [] 127 128 for pattern in input_patterns: 129 input_files.extend(glob.glob(os.path.join(root, pattern))) 130 131 for pattern in output_patterns: 132 output_files.extend(glob.glob(os.path.join(root, pattern))) 133 134 # If we found both input and output files, this might be a run directory 135 if input_files and output_files: 136 run_info = { 137 'directory': root, 138 'input_files': input_files, 139 'output_files': output_files, 140 'relative_path': os.path.relpath(root, self.root_dir) 141 } 142 discovered['runs'].append(run_info) 143 discovered['input_files'].extend(input_files) 144 discovered['output_files'].extend(output_files) 145 146 # Determine structure type 147 if len(discovered['runs']) > 0: 148 discovered['structure_type'] = 'run_directories' 149 elif discovered['input_files'] or discovered['output_files']: 150 discovered['structure_type'] = 'flat_files' 151 152 logger.info(f"Discovered {len(discovered['runs'])} potential run directories") 153 self.discovered_runs = discovered['runs'] 154 155 return discovered 156 157 def infer_parameters(self, sample_files: List[str] = None, 158 file_type: str = 'auto') -> Dict[str, Dict[str, Any]]: 159 """ 160 Infer parameter definitions from sample input files. 161 162 Parameters 163 ---------- 164 sample_files : List[str], optional 165 List of sample input files to analyze (default: use discovered files) 166 file_type : str, optional 167 Type of input files ('json', 'csv', 'yaml', 'auto') (default: 'auto') 168 169 Returns 170 ------- 171 Dict[str, Dict[str, Any]] 172 Dictionary containing inferred parameter definitions 173 """ 174 if sample_files is None: 175 if not self.discovered_runs: 176 raise ValueError("No runs discovered. 
Run discover_directory_structure first.") 177 sample_files = [run['input_files'][0] for run in self.discovered_runs[:5]] 178 179 parameters = {} 180 all_params = defaultdict(list) 181 182 for file_path in sample_files: 183 try: 184 params = self._parse_input_file(file_path, file_type) 185 for key, value in params.items(): 186 all_params[key].append(value) 187 except Exception as e: 188 logger.warning(f"Failed to parse {file_path}: {e}") 189 continue 190 191 # Infer parameter types and ranges 192 for param_name, values in all_params.items(): 193 param_info = self._infer_parameter_info(param_name, values) 194 parameters[param_name] = param_info 195 196 self.parameters = parameters 197 logger.info(f"Inferred {len(parameters)} parameters: {list(parameters.keys())}") 198 199 return parameters 200 201 def infer_output_columns(self, sample_files: List[str] = None, 202 file_type: str = 'auto') -> List[str]: 203 """ 204 Infer output column names from sample output files. 205 206 Parameters 207 ---------- 208 sample_files : List[str], optional 209 List of sample output files to analyze (default: use discovered files) 210 file_type : str, optional 211 Type of output files ('json', 'csv', 'yaml', 'auto') (default: 'auto') 212 213 Returns 214 ------- 215 List[str] 216 List of output column names 217 """ 218 if sample_files is None: 219 if not self.discovered_runs: 220 raise ValueError("No runs discovered. Run discover_directory_structure first.") 221 sample_files = [run['output_files'][0] for run in self.discovered_runs[:5]] 222 223 all_columns = set() 224 225 for file_path in sample_files: 226 try: 227 columns = self._get_output_columns(file_path, file_type) 228 all_columns.update(columns) 229 except Exception as e: 230 logger.warning(f"Failed to parse {file_path}: {e}") 231 continue 232 233 self.output_columns = list(all_columns) 234 logger.info(f"Inferred {len(self.output_columns)} output columns: {self.output_columns}") 235 236 return self.output_columns 237 238 def create_campaign_from_dataset(self, 239 work_dir: str = "./", 240 input_decoder: Optional[object] = None, 241 output_decoder: Optional[object] = None, 242 auto_infer: bool = True): 243 """ 244 Create a new EasyVVUQ campaign from the discovered dataset. 245 246 Parameters 247 ---------- 248 work_dir : str, optional 249 Working directory for the campaign (default: "./") 250 input_decoder : object, optional 251 Custom input decoder (default: auto-create based on file types) 252 output_decoder : object, optional 253 Custom output decoder (default: auto-create based on file types) 254 auto_infer : bool, optional 255 Whether to automatically infer parameters and outputs (default: True) 256 257 Returns 258 ------- 259 Campaign 260 The created campaign with imported data 261 """ 262 if not self.discovered_runs: 263 raise ValueError("No runs discovered. 
Run discover_directory_structure first.") 264 265 if auto_infer: 266 if not self.parameters: 267 self.infer_parameters() 268 if not self.output_columns: 269 self.infer_output_columns() 270 271 # Create campaign 272 campaign = uq.Campaign(name=self.campaign_name, work_dir=work_dir) 273 274 # Create basic actions (since we're importing existing data, we don't need real execution) 275 actions = Actions( 276 CreateRunDirectory(work_dir), 277 Encode(uq.encoders.GenericEncoder('', '', target_filename='dummy_input')), 278 ExecuteLocal('echo "Imported data"'), 279 Decode(uq.decoders.SimpleCSV('dummy_output', self.output_columns)) 280 ) 281 282 # Add app to campaign 283 campaign.add_app( 284 name=self.campaign_name, 285 params=self.parameters, 286 actions=actions 287 ) 288 289 # Set up decoders 290 if input_decoder is None: 291 input_decoder = self._create_auto_decoder('input') 292 if output_decoder is None: 293 output_decoder = self._create_auto_decoder('output') 294 295 # Import the runs 296 self._import_runs_to_campaign(campaign, input_decoder, output_decoder) 297 298 logger.info(f"Created campaign '{self.campaign_name}' with {len(self.discovered_runs)} runs") 299 300 return campaign 301 302 def _parse_input_file(self, file_path: str, file_type: str = 'auto') -> Dict[str, Any]: 303 """Parse an input file and return parameters.""" 304 try: 305 if file_type == 'auto': 306 file_type = self._detect_file_type(file_path) 307 308 if file_type == 'json': 309 with open(file_path, 'r') as f: 310 return json.load(f) 311 elif file_type == 'csv': 312 df = pd.read_csv(file_path) 313 return df.iloc[0].to_dict() if len(df) > 0 else {} 314 elif file_type in ['yaml', 'yml']: 315 try: 316 import yaml 317 with open(file_path, 'r') as f: 318 return yaml.safe_load(f) 319 except ImportError: 320 logger.warning("PyYAML not installed, cannot parse YAML files") 321 return {} 322 else: 323 # Try to parse as key-value pairs 324 params = {} 325 with open(file_path, 'r') as f: 326 for line in f: 327 line = line.strip() 328 if '=' in line and not line.startswith('#'): 329 key, value = line.split('=', 1) 330 params[key.strip()] = self._convert_value(value.strip()) 331 return params 332 except Exception as e: 333 logger.warning(f"Failed to parse file {file_path}: {e}") 334 return {} 335 336 def _get_output_columns(self, file_path: str, file_type: str = 'auto') -> List[str]: 337 """Get column names from an output file.""" 338 if file_type == 'auto': 339 file_type = self._detect_file_type(file_path) 340 341 if file_type == 'json': 342 with open(file_path, 'r') as f: 343 data = json.load(f) 344 if isinstance(data, dict): 345 return list(data.keys()) 346 elif isinstance(data, list) and len(data) > 0 and isinstance(data[0], dict): 347 return list(data[0].keys()) 348 elif file_type == 'csv': 349 df = pd.read_csv(file_path, nrows=1) 350 return list(df.columns) 351 352 return [] 353 354 def _detect_file_type(self, file_path: str) -> str: 355 """Detect file type based on extension.""" 356 ext = Path(file_path).suffix.lower() 357 if ext == '.json': 358 return 'json' 359 elif ext == '.csv': 360 return 'csv' 361 elif ext in ['.yaml', '.yml']: 362 return 'yaml' 363 else: 364 return 'text' 365 366 def _convert_value(self, value_str: str) -> Any: 367 """Convert string value to appropriate type.""" 368 value_str = value_str.strip() 369 370 # Try boolean 371 if value_str.lower() in ['true', 'false']: 372 return value_str.lower() == 'true' 373 374 # Try integer 375 try: 376 return int(value_str) 377 except ValueError: 378 pass 379 380 # Try 
float 381 try: 382 return float(value_str) 383 except ValueError: 384 pass 385 386 # Return as string 387 return value_str 388 389 def _infer_parameter_info(self, param_name: str, values: List[Any]) -> Dict[str, Any]: 390 """Infer parameter information from a list of values.""" 391 if not values: 392 return {"type": "string", "default": ""} 393 394 # Remove None values 395 values = [v for v in values if v is not None] 396 397 if not values: 398 return {"type": "string", "default": ""} 399 400 # Determine type 401 first_value = values[0] 402 if isinstance(first_value, bool): 403 param_type = "boolean" 404 default = first_value 405 param_info = {"type": param_type, "default": default} 406 elif isinstance(first_value, int): 407 param_type = "integer" 408 min_val = min(values) 409 max_val = max(values) 410 default = values[0] 411 param_info = { 412 "type": param_type, 413 "min": min_val, 414 "max": max_val, 415 "default": default 416 } 417 elif isinstance(first_value, float): 418 param_type = "float" 419 min_val = min(values) 420 max_val = max(values) 421 default = values[0] 422 param_info = { 423 "type": param_type, 424 "min": min_val, 425 "max": max_val, 426 "default": default 427 } 428 else: 429 param_type = "string" 430 default = str(first_value) 431 param_info = {"type": param_type, "default": default} 432 433 return param_info 434 435 def _create_auto_decoder(self, decoder_type: str) -> object: 436 """Create an appropriate decoder based on discovered file types.""" 437 if decoder_type == 'input': 438 # For input files, try to detect the most common format 439 if self.discovered_runs: 440 sample_file = self.discovered_runs[0]['input_files'][0] 441 file_type = self._detect_file_type(sample_file) 442 443 if file_type == 'json': 444 return uq.decoders.JSONDecoder('', list(self.parameters.keys())) 445 elif file_type == 'csv': 446 return uq.decoders.SimpleCSV('', list(self.parameters.keys())) 447 448 elif decoder_type == 'output': 449 # For output files, try to detect the most common format 450 if self.discovered_runs: 451 sample_file = self.discovered_runs[0]['output_files'][0] 452 file_type = self._detect_file_type(sample_file) 453 454 if file_type == 'json': 455 return uq.decoders.JSONDecoder('', self.output_columns) 456 elif file_type == 'csv': 457 return uq.decoders.SimpleCSV('', self.output_columns) 458 459 # Default fallback 460 return uq.decoders.SimpleCSV('', self.output_columns) 461 462 def _import_runs_to_campaign(self, campaign: uq.Campaign, 463 input_decoder: object, 464 output_decoder: object): 465 """Import discovered runs into the campaign.""" 466 input_files = [] 467 output_files = [] 468 469 for run in self.discovered_runs: 470 # Use the first input and output file from each run 471 if run['input_files']: 472 input_files.append(run['input_files'][0]) 473 if run['output_files']: 474 output_files.append(run['output_files'][0]) 475 476 # Use the existing add_external_runs method 477 campaign.add_external_runs(input_files, output_files, input_decoder, output_decoder) 478 479 480def create_campaign_from_directory(root_dir: str, 481 campaign_name: str = "imported_campaign", 482 work_dir: str = "./", 483 input_patterns: List[str] = None, 484 output_patterns: List[str] = None, 485 auto_infer: bool = True): 486 """ 487 Convenience function to create a campaign from an existing directory structure. 
488 489 Parameters 490 ---------- 491 root_dir : str 492 Root directory containing the existing dataset 493 campaign_name : str, optional 494 Name for the campaign to be created (default: "imported_campaign") 495 work_dir : str, optional 496 Working directory for the campaign (default: "./") 497 input_patterns : List[str], optional 498 List of glob patterns to match input files 499 output_patterns : List[str], optional 500 List of glob patterns to match output files 501 auto_infer : bool, optional 502 Whether to automatically infer parameters and outputs (default: True) 503 504 Returns 505 ------- 506 Campaign 507 The created campaign with imported data 508 509 Examples 510 -------- 511 >>> campaign = create_campaign_from_directory( 512 ... root_dir="/path/to/simulation/runs", 513 ... campaign_name="my_imported_campaign" 514 ... ) 515 """ 516 importer = DatasetImporter(root_dir, campaign_name) 517 importer.discover_directory_structure(input_patterns, output_patterns) 518 return importer.create_campaign_from_dataset(work_dir=work_dir, auto_infer=auto_infer) 519 520 521def create_campaign_from_files(input_files: List[str], 522 output_files: List[str], 523 campaign_name: str = "imported_campaign", 524 work_dir: str = "./", 525 input_decoder: Optional[object] = None, 526 output_decoder: Optional[object] = None, 527 auto_infer: bool = True): 528 """ 529 Create a campaign from explicit lists of input and output files. 530 531 Parameters 532 ---------- 533 input_files : List[str] 534 List of input file paths 535 output_files : List[str] 536 List of output file paths 537 campaign_name : str, optional 538 Name for the campaign to be created (default: "imported_campaign") 539 work_dir : str, optional 540 Working directory for the campaign (default: "./") 541 input_decoder : object, optional 542 Custom input decoder (default: auto-create) 543 output_decoder : object, optional 544 Custom output decoder (default: auto-create) 545 auto_infer : bool, optional 546 Whether to automatically infer parameters and outputs (default: True) 547 548 Returns 549 ------- 550 Campaign 551 The created campaign with imported data 552 553 Examples 554 -------- 555 >>> campaign = create_campaign_from_files( 556 ... input_files=["run1/input.json", "run2/input.json"], 557 ... output_files=["run1/output.csv", "run2/output.csv"], 558 ... campaign_name="my_campaign" 559 ... ) 560 """ 561 if len(input_files) != len(output_files): 562 raise ValueError("Number of input files must match number of output files") 563 564 # Create a temporary directory structure for the importer 565 temp_dir = Path(work_dir) / "temp_import" 566 temp_dir.mkdir(exist_ok=True) 567 568 # Create fake run directories 569 discovered_runs = [] 570 for i, (input_file, output_file) in enumerate(zip(input_files, output_files)): 571 run_dir = temp_dir / f"run_{i}" 572 run_dir.mkdir(exist_ok=True) 573 discovered_runs.append({ 574 'directory': str(run_dir), 575 'input_files': [input_file], 576 'output_files': [output_file], 577 'relative_path': f"run_{i}" 578 }) 579 580 # Create importer and set up discovered runs 581 importer = DatasetImporter(str(temp_dir), campaign_name) 582 importer.discovered_runs = discovered_runs 583 584 if auto_infer: 585 importer.infer_parameters(input_files) 586 importer.infer_output_columns(output_files) 587 588 return importer.create_campaign_from_dataset( 589 work_dir=work_dir, 590 input_decoder=input_decoder, 591 output_decoder=output_decoder, 592 auto_infer=False # Already done above 593 )
class DatasetImporter(root_dir: str, campaign_name: str = "imported_campaign")
A utility class for importing existing datasets into EasyVVUQ campaigns.
This class provides methods to discover, validate, and import simulation data from existing directory structures or file collections.
__init__(root_dir: str, campaign_name: str = "imported_campaign")
Initialize the DatasetImporter.
Parameters
- root_dir (str): Root directory containing the existing dataset
- campaign_name (str, optional): Name for the campaign to be created (default: "imported_campaign")
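For example, a typical import session (the dataset path below is hypothetical; any layout with one run per subdirectory, each holding matching input and output files, should work):

>>> from easyvvuq.utils.dataset_importer import DatasetImporter
>>> importer = DatasetImporter("/data/legacy_sweep", campaign_name="legacy_sweep")
>>> structure = importer.discover_directory_structure()
>>> params = importer.infer_parameters()
>>> columns = importer.infer_output_columns()
>>> campaign = importer.create_campaign_from_dataset(work_dir="./")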
discover_directory_structure(input_patterns: List[str] = None, output_patterns: List[str] = None, max_depth: int = 3) -> Dict[str, Any]
Automatically discover the directory structure and identify runs.
Parameters
- input_patterns (List[str], optional): List of glob patterns to match input files (default: common patterns)
- output_patterns (List[str], optional): List of glob patterns to match output files (default: common patterns)
- max_depth (int, optional): Maximum depth to search for files (default: 3)
Returns
- Dict[str, Any]: Dictionary containing discovered structure information
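Continuing the session above, the returned dictionary can be inspected directly (key names follow the dictionary built in the source):

>>> structure = importer.discover_directory_structure(max_depth=2)
>>> structure['structure_type']                         # 'run_directories' or 'flat_files'
>>> [run['relative_path'] for run in structure['runs']]  # one entry per candidate run directory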
infer_parameters(sample_files: List[str] = None, file_type: str = 'auto') -> Dict[str, Dict[str, Any]]
Infer parameter definitions from sample input files.
Parameters
- sample_files (List[str], optional): List of sample input files to analyze (default: use discovered files)
- file_type (str, optional): Type of input files ('json', 'csv', 'yaml', 'auto') (default: 'auto')
Returns
- Dict[str, Dict[str, Any]]: Dictionary containing inferred parameter definitions
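The inferred definitions follow the shape built by _infer_parameter_info: numeric parameters carry min/max bounds taken from the sampled values, while booleans and strings get just a type and default. A sketch with a hypothetical parameter name:

>>> params = importer.infer_parameters()
>>> # a float parameter yields e.g. {'type': 'float', 'min': 0.001, 'max': 0.1, 'default': 0.001}
>>> params['viscosity']['type']   # 'viscosity' is a placeholder name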
infer_output_columns(sample_files: List[str] = None, file_type: str = 'auto') -> List[str]
Infer output column names from sample output files.
Parameters
- sample_files (List[str], optional): List of sample output files to analyze (default: use discovered files)
- file_type (str, optional): Type of output files ('json', 'csv', 'yaml', 'auto') (default: 'auto')
Returns
- List[str]: List of output column names
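Note that column names are accumulated in a set across the sampled files, so the order of the returned list is not guaranteed. Illustrative call (column names are placeholders):

>>> importer.infer_output_columns(file_type='csv')
>>> sorted(importer.output_columns)   # e.g. ['pressure', 'velocity']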
create_campaign_from_dataset(work_dir: str = "./", input_decoder: Optional[object] = None, output_decoder: Optional[object] = None, auto_infer: bool = True)
Create a new EasyVVUQ campaign from the discovered dataset.
Parameters
- work_dir (str, optional): Working directory for the campaign (default: "./")
- input_decoder (object, optional): Custom input decoder (default: auto-create based on file types)
- output_decoder (object, optional): Custom output decoder (default: auto-create based on file types)
- auto_infer (bool, optional): Whether to automatically infer parameters and outputs (default: True)
Returns
- Campaign: The created campaign with imported data
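Since the imported runs are added via add_external_runs, their results should be reachable through the usual EasyVVUQ collation interface; a sketch, assuming discovery has already been run:

>>> campaign = importer.create_campaign_from_dataset(work_dir="./campaigns")
>>> df = campaign.get_collation_result()   # pandas DataFrame of inputs and outputs
>>> df.describe()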
create_campaign_from_directory(root_dir: str, campaign_name: str = "imported_campaign", work_dir: str = "./", input_patterns: List[str] = None, output_patterns: List[str] = None, auto_infer: bool = True)
Convenience function to create a campaign from an existing directory structure.
Parameters
- root_dir (str): Root directory containing the existing dataset
- campaign_name (str, optional): Name for the campaign to be created (default: "imported_campaign")
- work_dir (str, optional): Working directory for the campaign (default: "./")
- input_patterns (List[str], optional): List of glob patterns to match input files
- output_patterns (List[str], optional): List of glob patterns to match output files
- auto_infer (bool, optional): Whether to automatically infer parameters and outputs (default: True)
Returns
- Campaign: The created campaign with imported data
Examples
>>> campaign = create_campaign_from_directory(
... root_dir="/path/to/simulation/runs",
... campaign_name="my_imported_campaign"
... )
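Custom glob patterns can narrow discovery when the defaults match too many files; the patterns below are assumptions about the dataset's naming scheme:

>>> campaign = create_campaign_from_directory(
...     root_dir="/path/to/simulation/runs",
...     input_patterns=["params.json"],
...     output_patterns=["results.csv"]
... )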
create_campaign_from_files(input_files: List[str], output_files: List[str], campaign_name: str = "imported_campaign", work_dir: str = "./", input_decoder: Optional[object] = None, output_decoder: Optional[object] = None, auto_infer: bool = True)
Create a campaign from explicit lists of input and output files.
Parameters
- input_files (List[str]): List of input file paths
- output_files (List[str]): List of output file paths
- campaign_name (str, optional): Name for the campaign to be created (default: "imported_campaign")
- work_dir (str, optional): Working directory for the campaign (default: "./")
- input_decoder (object, optional): Custom input decoder (default: auto-create)
- output_decoder (object, optional): Custom output decoder (default: auto-create)
- auto_infer (bool, optional): Whether to automatically infer parameters and outputs (default: True)
Returns
- Campaign: The created campaign with imported data
Examples
>>> campaign = create_campaign_from_files(
... input_files=["run1/input.json", "run2/input.json"],
... output_files=["run1/output.csv", "run2/output.csv"],
... campaign_name="my_campaign"
... )
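If format auto-detection picks the wrong decoder, an explicit one can be supplied; a sketch using EasyVVUQ's SimpleCSV decoder (the target filename and column name are placeholders):

>>> import easyvvuq as uq
>>> campaign = create_campaign_from_files(
...     input_files=["run1/input.json", "run2/input.json"],
...     output_files=["run1/output.csv", "run2/output.csv"],
...     output_decoder=uq.decoders.SimpleCSV("output.csv", ["value"])
... )

Note that this helper creates a temp_import scaffold under work_dir to satisfy the importer's directory checks; the run subdirectories it makes are placeholders, and the actual data is read from the file paths given.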