easyvvuq.utils.dataset_importer

Dataset Import Utilities for EasyVVUQ

This module provides utilities for creating EasyVVUQ campaigns from existing datasets that were not originally created with EasyVVUQ.
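
A minimal sketch of the intended workflow, assuming a root directory that already contains completed runs (the path and campaign name below are placeholders):

>>> from easyvvuq.utils.dataset_importer import DatasetImporter
>>> importer = DatasetImporter("/path/to/existing/runs", campaign_name="legacy_study")
>>> structure = importer.discover_directory_structure()
>>> campaign = importer.create_campaign_from_dataset(work_dir="/tmp")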

"""
Dataset Import Utilities for EasyVVUQ

This module provides utilities for creating EasyVVUQ campaigns from existing datasets
that were not originally created with EasyVVUQ.
"""

import os
import json
import glob
import logging
import pandas as pd
from pathlib import Path
from typing import List, Dict, Optional, Any
from collections import defaultdict

import easyvvuq as uq
from easyvvuq.actions import Actions, CreateRunDirectory, Encode, Decode, ExecuteLocal

__copyright__ = """

    Copyright 2018 Robin A. Richardson, David W. Wright

    This file is part of EasyVVUQ

    EasyVVUQ is free software: you can redistribute it and/or modify
    it under the terms of the GNU Lesser General Public License as published by
    the Free Software Foundation, either version 3 of the License, or
    (at your option) any later version.

    EasyVVUQ is distributed in the hope that it will be useful,
    but WITHOUT ANY WARRANTY; without even the implied warranty of
    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
    GNU Lesser General Public License for more details.

    You should have received a copy of the GNU Lesser General Public License
    along with this program.  If not, see <https://www.gnu.org/licenses/>.

"""
__license__ = "LGPL"

logger = logging.getLogger(__name__)


class DatasetImporter:
    """
    A utility class for importing existing datasets into EasyVVUQ campaigns.

    This class provides methods to discover, validate, and import simulation data
    from existing directory structures or file collections.
    """

    def __init__(self, root_dir: str, campaign_name: str = "imported_campaign"):
        """
        Initialize the DatasetImporter.

        Parameters
        ----------
        root_dir : str
            Root directory containing the existing dataset
        campaign_name : str, optional
            Name for the campaign to be created (default: "imported_campaign")
        """
        self.root_dir = Path(root_dir).resolve()
        self.campaign_name = campaign_name
        self.discovered_runs = []
        self.parameters = {}
        self.output_columns = []

        if not self.root_dir.exists():
            raise ValueError(f"Root directory does not exist: {self.root_dir}")

    def discover_directory_structure(self,
                                     input_patterns: Optional[List[str]] = None,
                                     output_patterns: Optional[List[str]] = None,
                                     max_depth: int = 3) -> Dict[str, Any]:
        """
        Automatically discover the directory structure and identify runs.

        Parameters
        ----------
        input_patterns : List[str], optional
            List of glob patterns to match input files (default: common patterns)
        output_patterns : List[str], optional
            List of glob patterns to match output files (default: common patterns)
        max_depth : int, optional
            Maximum depth to search for files (default: 3)

        Returns
        -------
        Dict[str, Any]
            Dictionary containing discovered structure information
        """
        if input_patterns is None:
            input_patterns = [
                "*.json", "*.txt", "*.csv", "*.yaml", "*.yml", "*.xml",
                "input.*", "params.*", "config.*", "*input*", "*param*"
            ]

        if output_patterns is None:
            output_patterns = [
                "*.csv", "*.json", "*.txt", "*.out", "*.log", "*.dat",
                "output.*", "result.*", "*.results", "*output*", "*result*"
            ]

        discovered = {
            'runs': [],
            'structure_type': 'unknown',
            'input_files': [],
            'output_files': [],
            'common_structure': None
        }

        logger.info(f"Discovering directory structure in {self.root_dir}")

        # Search for potential run directories
        for root, dirs, files in os.walk(self.root_dir):
            current_depth = len(Path(root).relative_to(self.root_dir).parts)
            if current_depth > max_depth:
                dirs[:] = []  # prune the walk; don't descend any deeper
                continue

            # Look for input and output files in this directory
            input_files = []
            output_files = []

            for pattern in input_patterns:
                input_files.extend(glob.glob(os.path.join(root, pattern)))

            for pattern in output_patterns:
                output_files.extend(glob.glob(os.path.join(root, pattern)))

            # Overlapping patterns can match the same file more than once
            input_files = sorted(set(input_files))
            output_files = sorted(set(output_files))

            # If we found both input and output files, this might be a run directory
            if input_files and output_files:
                run_info = {
                    'directory': root,
                    'input_files': input_files,
                    'output_files': output_files,
                    'relative_path': os.path.relpath(root, self.root_dir)
                }
                discovered['runs'].append(run_info)
                discovered['input_files'].extend(input_files)
                discovered['output_files'].extend(output_files)

        # Determine structure type
        if len(discovered['runs']) > 0:
            discovered['structure_type'] = 'run_directories'
        elif discovered['input_files'] or discovered['output_files']:
            discovered['structure_type'] = 'flat_files'

        logger.info(f"Discovered {len(discovered['runs'])} potential run directories")
        self.discovered_runs = discovered['runs']

        return discovered

    def infer_parameters(self, sample_files: Optional[List[str]] = None,
                         file_type: str = 'auto') -> Dict[str, Dict[str, Any]]:
        """
        Infer parameter definitions from sample input files.

        Parameters
        ----------
        sample_files : List[str], optional
            List of sample input files to analyze (default: use discovered files)
        file_type : str, optional
            Type of input files ('json', 'csv', 'yaml', 'auto') (default: 'auto')

        Returns
        -------
        Dict[str, Dict[str, Any]]
            Dictionary containing inferred parameter definitions
        """
        if sample_files is None:
            if not self.discovered_runs:
                raise ValueError("No runs discovered. Run discover_directory_structure first.")
            sample_files = [run['input_files'][0] for run in self.discovered_runs[:5]]

        parameters = {}
        all_params = defaultdict(list)

        for file_path in sample_files:
            try:
                params = self._parse_input_file(file_path, file_type)
                for key, value in params.items():
                    all_params[key].append(value)
            except Exception as e:
                logger.warning(f"Failed to parse {file_path}: {e}")
                continue

        # Infer parameter types and ranges
        for param_name, values in all_params.items():
            param_info = self._infer_parameter_info(param_name, values)
            parameters[param_name] = param_info

        self.parameters = parameters
        logger.info(f"Inferred {len(parameters)} parameters: {list(parameters.keys())}")

        return parameters

    def infer_output_columns(self, sample_files: Optional[List[str]] = None,
                             file_type: str = 'auto') -> List[str]:
        """
        Infer output column names from sample output files.

        Parameters
        ----------
        sample_files : List[str], optional
            List of sample output files to analyze (default: use discovered files)
        file_type : str, optional
            Type of output files ('json', 'csv', 'yaml', 'auto') (default: 'auto')

        Returns
        -------
        List[str]
            List of output column names
        """
        if sample_files is None:
            if not self.discovered_runs:
                raise ValueError("No runs discovered. Run discover_directory_structure first.")
            sample_files = [run['output_files'][0] for run in self.discovered_runs[:5]]

        all_columns = set()

        for file_path in sample_files:
            try:
                columns = self._get_output_columns(file_path, file_type)
                all_columns.update(columns)
            except Exception as e:
                logger.warning(f"Failed to parse {file_path}: {e}")
                continue

        self.output_columns = list(all_columns)
        logger.info(f"Inferred {len(self.output_columns)} output columns: {self.output_columns}")

        return self.output_columns

    def create_campaign_from_dataset(self,
                                     work_dir: str = "./",
                                     input_decoder: Optional[object] = None,
                                     output_decoder: Optional[object] = None,
                                     auto_infer: bool = True):
        """
        Create a new EasyVVUQ campaign from the discovered dataset.

        Parameters
        ----------
        work_dir : str, optional
            Working directory for the campaign (default: "./")
        input_decoder : object, optional
            Custom input decoder (default: auto-create based on file types)
        output_decoder : object, optional
            Custom output decoder (default: auto-create based on file types)
        auto_infer : bool, optional
            Whether to automatically infer parameters and outputs (default: True)

        Returns
        -------
        Campaign
            The created campaign with imported data
        """
        if not self.discovered_runs:
            raise ValueError("No runs discovered. Run discover_directory_structure first.")

        if auto_infer:
            if not self.parameters:
                self.infer_parameters()
            if not self.output_columns:
                self.infer_output_columns()

        # Create campaign
        campaign = uq.Campaign(name=self.campaign_name, work_dir=work_dir)

        # Create placeholder actions (we are importing existing data, so no real execution occurs)
        actions = Actions(
            CreateRunDirectory(work_dir),
            Encode(uq.encoders.GenericEncoder('', '', target_filename='dummy_input')),
            ExecuteLocal('echo "Imported data"'),
            Decode(uq.decoders.SimpleCSV('dummy_output', self.output_columns))
        )

        # Add app to campaign
        campaign.add_app(
            name=self.campaign_name,
            params=self.parameters,
            actions=actions
        )

        # Set up decoders
        if input_decoder is None:
            input_decoder = self._create_auto_decoder('input')
        if output_decoder is None:
            output_decoder = self._create_auto_decoder('output')

        # Import the runs
        self._import_runs_to_campaign(campaign, input_decoder, output_decoder)

        logger.info(f"Created campaign '{self.campaign_name}' with {len(self.discovered_runs)} runs")

        return campaign

    def _parse_input_file(self, file_path: str, file_type: str = 'auto') -> Dict[str, Any]:
        """Parse an input file and return parameters."""
        try:
            if file_type == 'auto':
                file_type = self._detect_file_type(file_path)

            if file_type == 'json':
                with open(file_path, 'r') as f:
                    data = json.load(f)
                # Only flat JSON objects map cleanly onto parameter dictionaries
                return data if isinstance(data, dict) else {}
            elif file_type == 'csv':
                df = pd.read_csv(file_path)
                return df.iloc[0].to_dict() if len(df) > 0 else {}
            elif file_type in ['yaml', 'yml']:
                try:
                    import yaml
                    with open(file_path, 'r') as f:
                        return yaml.safe_load(f) or {}
                except ImportError:
                    logger.warning("PyYAML not installed, cannot parse YAML files")
                    return {}
            else:
                # Try to parse as key-value pairs
                params = {}
                with open(file_path, 'r') as f:
                    for line in f:
                        line = line.strip()
                        if '=' in line and not line.startswith('#'):
                            key, value = line.split('=', 1)
                            params[key.strip()] = self._convert_value(value.strip())
                return params
        except Exception as e:
            logger.warning(f"Failed to parse file {file_path}: {e}")
            return {}

    def _get_output_columns(self, file_path: str, file_type: str = 'auto') -> List[str]:
        """Get column names from an output file."""
        if file_type == 'auto':
            file_type = self._detect_file_type(file_path)

        if file_type == 'json':
            with open(file_path, 'r') as f:
                data = json.load(f)
                if isinstance(data, dict):
                    return list(data.keys())
                elif isinstance(data, list) and len(data) > 0 and isinstance(data[0], dict):
                    return list(data[0].keys())
        elif file_type == 'csv':
            df = pd.read_csv(file_path, nrows=1)
            return list(df.columns)

        return []

    def _detect_file_type(self, file_path: str) -> str:
        """Detect file type based on extension."""
        ext = Path(file_path).suffix.lower()
        if ext == '.json':
            return 'json'
        elif ext == '.csv':
            return 'csv'
        elif ext in ['.yaml', '.yml']:
            return 'yaml'
        else:
            return 'text'

    def _convert_value(self, value_str: str) -> Any:
        """Convert string value to appropriate type."""
        value_str = value_str.strip()

        # Try boolean
        if value_str.lower() in ['true', 'false']:
            return value_str.lower() == 'true'

        # Try integer
        try:
            return int(value_str)
        except ValueError:
            pass

        # Try float
        try:
            return float(value_str)
        except ValueError:
            pass

        # Return as string
        return value_str

    def _infer_parameter_info(self, param_name: str, values: List[Any]) -> Dict[str, Any]:
        """Infer parameter information from a list of values."""
        if not values:
            return {"type": "string", "default": ""}

        # Remove None values
        values = [v for v in values if v is not None]

        if not values:
            return {"type": "string", "default": ""}

        # Determine type (booleans are checked before integers, since bool is a subclass of int)
        first_value = values[0]
        if isinstance(first_value, bool):
            param_info = {"type": "boolean", "default": first_value}
        elif isinstance(first_value, int):
            param_info = {
                "type": "integer",
                "min": min(values),
                "max": max(values),
                "default": values[0]
            }
        elif isinstance(first_value, float):
            param_info = {
                "type": "float",
                "min": min(values),
                "max": max(values),
                "default": values[0]
            }
        else:
            param_info = {"type": "string", "default": str(first_value)}

        return param_info

    def _create_auto_decoder(self, decoder_type: str) -> object:
        """Create an appropriate decoder based on discovered file types."""
        if decoder_type == 'input':
            # For input files, try to detect the most common format
            if self.discovered_runs:
                sample_file = self.discovered_runs[0]['input_files'][0]
                file_type = self._detect_file_type(sample_file)

                if file_type == 'json':
                    return uq.decoders.JSONDecoder('', list(self.parameters.keys()))
                elif file_type == 'csv':
                    return uq.decoders.SimpleCSV('', list(self.parameters.keys()))

        elif decoder_type == 'output':
            # For output files, try to detect the most common format
            if self.discovered_runs:
                sample_file = self.discovered_runs[0]['output_files'][0]
                file_type = self._detect_file_type(sample_file)

                if file_type == 'json':
                    return uq.decoders.JSONDecoder('', self.output_columns)
                elif file_type == 'csv':
                    return uq.decoders.SimpleCSV('', self.output_columns)

        # Default fallback
        return uq.decoders.SimpleCSV('', self.output_columns)

    def _import_runs_to_campaign(self, campaign: uq.Campaign,
                                 input_decoder: object,
                                 output_decoder: object):
        """Import discovered runs into the campaign."""
        input_files = []
        output_files = []

        for run in self.discovered_runs:
            # Use the first input and output file from each run
            if run['input_files']:
                input_files.append(run['input_files'][0])
            if run['output_files']:
                output_files.append(run['output_files'][0])

        # Use the existing add_external_runs method
        campaign.add_external_runs(input_files, output_files, input_decoder, output_decoder)


def create_campaign_from_directory(root_dir: str,
                                   campaign_name: str = "imported_campaign",
                                   work_dir: str = "./",
                                   input_patterns: Optional[List[str]] = None,
                                   output_patterns: Optional[List[str]] = None,
                                   auto_infer: bool = True):
    """
    Convenience function to create a campaign from an existing directory structure.

    Parameters
    ----------
    root_dir : str
        Root directory containing the existing dataset
    campaign_name : str, optional
        Name for the campaign to be created (default: "imported_campaign")
    work_dir : str, optional
        Working directory for the campaign (default: "./")
    input_patterns : List[str], optional
        List of glob patterns to match input files
    output_patterns : List[str], optional
        List of glob patterns to match output files
    auto_infer : bool, optional
        Whether to automatically infer parameters and outputs (default: True)

    Returns
    -------
    Campaign
        The created campaign with imported data

    Examples
    --------
    >>> campaign = create_campaign_from_directory(
    ...     root_dir="/path/to/simulation/runs",
    ...     campaign_name="my_imported_campaign"
    ... )
    """
    importer = DatasetImporter(root_dir, campaign_name)
    importer.discover_directory_structure(input_patterns, output_patterns)
    return importer.create_campaign_from_dataset(work_dir=work_dir, auto_infer=auto_infer)


def create_campaign_from_files(input_files: List[str],
                               output_files: List[str],
                               campaign_name: str = "imported_campaign",
                               work_dir: str = "./",
                               input_decoder: Optional[object] = None,
                               output_decoder: Optional[object] = None,
                               auto_infer: bool = True):
    """
    Create a campaign from explicit lists of input and output files.

    Parameters
    ----------
    input_files : List[str]
        List of input file paths
    output_files : List[str]
        List of output file paths
    campaign_name : str, optional
        Name for the campaign to be created (default: "imported_campaign")
    work_dir : str, optional
        Working directory for the campaign (default: "./")
    input_decoder : object, optional
        Custom input decoder (default: auto-create)
    output_decoder : object, optional
        Custom output decoder (default: auto-create)
    auto_infer : bool, optional
        Whether to automatically infer parameters and outputs (default: True)

    Returns
    -------
    Campaign
        The created campaign with imported data

    Examples
    --------
    >>> campaign = create_campaign_from_files(
    ...     input_files=["run1/input.json", "run2/input.json"],
    ...     output_files=["run1/output.csv", "run2/output.csv"],
    ...     campaign_name="my_campaign"
    ... )
    """
    if len(input_files) != len(output_files):
        raise ValueError("Number of input files must match number of output files")

    # Create a temporary directory structure for the importer
    temp_dir = Path(work_dir) / "temp_import"
    temp_dir.mkdir(parents=True, exist_ok=True)

    # Create placeholder run directories
    discovered_runs = []
    for i, (input_file, output_file) in enumerate(zip(input_files, output_files)):
        run_dir = temp_dir / f"run_{i}"
        run_dir.mkdir(exist_ok=True)
        discovered_runs.append({
            'directory': str(run_dir),
            'input_files': [input_file],
            'output_files': [output_file],
            'relative_path': f"run_{i}"
        })

    # Create importer and set up discovered runs
    importer = DatasetImporter(str(temp_dir), campaign_name)
    importer.discovered_runs = discovered_runs

    if auto_infer:
        importer.infer_parameters(input_files)
        importer.infer_output_columns(output_files)

    return importer.create_campaign_from_dataset(
        work_dir=work_dir,
        input_decoder=input_decoder,
        output_decoder=output_decoder,
        auto_infer=False  # parameters and outputs were already inferred above
    )
class DatasetImporter:
A utility class for importing existing datasets into EasyVVUQ campaigns.

This class provides methods to discover, validate, and import simulation data from existing directory structures or file collections.

DatasetImporter(root_dir: str, campaign_name: str = 'imported_campaign')

Initialize the DatasetImporter.

Parameters
  • root_dir (str): Root directory containing the existing dataset
  • campaign_name (str, optional): Name for the campaign to be created (default: "imported_campaign")
Instance attributes
  • root_dir
  • campaign_name
  • discovered_runs
  • parameters
  • output_columns
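
For illustration, constructing an importer and inspecting its initial state (the path is a placeholder; a non-existent root directory raises ValueError):

>>> importer = DatasetImporter("/data/legacy_runs", campaign_name="legacy")
>>> importer.discovered_runs, importer.parameters, importer.output_columns
([], {}, [])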
def discover_directory_structure( self, input_patterns: List[str] | None = None, output_patterns: List[str] | None = None, max_depth: int = 3) -> Dict[str, Any]:

Automatically discover the directory structure and identify runs.

Parameters
  • input_patterns (List[str], optional): List of glob patterns to match input files (default: common patterns)
  • output_patterns (List[str], optional): List of glob patterns to match output files (default: common patterns)
  • max_depth (int, optional): Maximum depth to search for files (default: 3)
Returns
  • Dict[str, Any]: Dictionary containing discovered structure information
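
A sketch of the returned dictionary for a hypothetical layout containing run directories (values are illustrative; the keys follow the source above):

>>> discovered = importer.discover_directory_structure(max_depth=2)
>>> discovered['structure_type']
'run_directories'
>>> sorted(discovered['runs'][0])
['directory', 'input_files', 'output_files', 'relative_path']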
def infer_parameters( self, sample_files: List[str] | None = None, file_type: str = 'auto') -> Dict[str, Dict[str, Any]]:

Infer parameter definitions from sample input files.

Parameters
  • sample_files (List[str], optional): List of sample input files to analyze (default: use discovered files)
  • file_type (str, optional): Type of input files ('json', 'csv', 'yaml', 'auto') (default: 'auto')
Returns
  • Dict[str, Dict[str, Any]]: Dictionary containing inferred parameter definitions
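
An illustrative result, assuming the sampled input files are JSON objects with a float-valued key "velocity" and an integer-valued key "n_steps" (names and numbers are made up; "min"/"max" span the sampled values and "default" is the first value seen):

>>> importer.infer_parameters()
{'velocity': {'type': 'float', 'min': 0.5, 'max': 2.5, 'default': 0.5},
 'n_steps': {'type': 'integer', 'min': 100, 'max': 1000, 'default': 100}}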
def infer_output_columns( self, sample_files: List[str] | None = None, file_type: str = 'auto') -> List[str]:

Infer output column names from sample output files.

Parameters
  • sample_files (List[str], optional): List of sample output files to analyze (default: use discovered files)
  • file_type (str, optional): Type of output files ('json', 'csv', 'yaml', 'auto') (default: 'auto')
Returns
  • List[str]: List of output column names
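
An illustrative call, assuming a sample CSV output file whose header row is "time,energy" (the file path is a placeholder; column order in the result is not guaranteed, since columns are accumulated in a set):

>>> importer.infer_output_columns(["run_0/output.csv"])
['time', 'energy']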
def create_campaign_from_dataset( self, work_dir: str = './', input_decoder: object | None = None, output_decoder: object | None = None, auto_infer: bool = True):

Create a new EasyVVUQ campaign from the discovered dataset.

Parameters
  • work_dir (str, optional): Working directory for the campaign (default: "./")
  • input_decoder (object, optional): Custom input decoder (default: auto-create based on file types)
  • output_decoder (object, optional): Custom output decoder (default: auto-create based on file types)
  • auto_infer (bool, optional): Whether to automatically infer parameters and outputs (default: True)
Returns
  • Campaign: The created campaign with imported data
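
A sketch of the final step, assuming parameters and outputs were already inferred; afterwards the imported data can be retrieved with the usual Campaign.get_collation_result() accessor (work_dir is a placeholder):

>>> campaign = importer.create_campaign_from_dataset(work_dir="/tmp/campaigns")
>>> df = campaign.get_collation_result()  # pandas DataFrame of imported inputs and outputs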
def create_campaign_from_directory( root_dir: str, campaign_name: str = 'imported_campaign', work_dir: str = './', input_patterns: List[str] | None = None, output_patterns: List[str] | None = None, auto_infer: bool = True):

Convenience function to create a campaign from an existing directory structure.

Parameters
  • root_dir (str): Root directory containing the existing dataset
  • campaign_name (str, optional): Name for the campaign to be created (default: "imported_campaign")
  • work_dir (str, optional): Working directory for the campaign (default: "./")
  • input_patterns (List[str], optional): List of glob patterns to match input files
  • output_patterns (List[str], optional): List of glob patterns to match output files
  • auto_infer (bool, optional): Whether to automatically infer parameters and outputs (default: True)
Returns
  • Campaign: The created campaign with imported data
Examples
>>> campaign = create_campaign_from_directory(
...     root_dir="/path/to/simulation/runs",
...     campaign_name="my_imported_campaign"
... )
def create_campaign_from_files( input_files: List[str], output_files: List[str], campaign_name: str = 'imported_campaign', work_dir: str = './', input_decoder: object | None = None, output_decoder: object | None = None, auto_infer: bool = True):

Create a campaign from explicit lists of input and output files.

Parameters
  • input_files (List[str]): List of input file paths
  • output_files (List[str]): List of output file paths
  • campaign_name (str, optional): Name for the campaign to be created (default: "imported_campaign")
  • work_dir (str, optional): Working directory for the campaign (default: "./")
  • input_decoder (object, optional): Custom input decoder (default: auto-create)
  • output_decoder (object, optional): Custom output decoder (default: auto-create)
  • auto_infer (bool, optional): Whether to automatically infer parameters and outputs (default: True)
Returns
  • Campaign: The created campaign with imported data
Examples
>>> campaign = create_campaign_from_files(
...     input_files=["run1/input.json", "run2/input.json"],
...     output_files=["run1/output.csv", "run2/output.csv"],
...     campaign_name="my_campaign"
... )