easyvvuq.campaign
EasyVVUQ Campaign
This module contains the Campaign class that is used to coordinate all EasyVVUQ workflows.
1"""EasyVVUQ Campaign 2 3This module contains the Campaign class that is used to coordinate all 4EasyVVUQ workflows. 5""" 6import os 7import json 8import logging 9import tempfile 10import easyvvuq 11from concurrent.futures import ProcessPoolExecutor 12from easyvvuq.constants import default_campaign_prefix, Status 13from easyvvuq.data_structs import RunInfo, CampaignInfo, AppInfo 14from easyvvuq.sampling import BaseSamplingElement 15from easyvvuq.actions import ActionPool 16import easyvvuq.db.sql as db 17 18__copyright__ = """ 19 20 Copyright 2018 Robin A. Richardson, David W. Wright 21 22 This file is part of EasyVVUQ 23 24 EasyVVUQ is free software: you can redistribute it and/or modify 25 it under the terms of the Lesser GNU General Public License as published by 26 the Free Software Foundation, either version 3 of the License, or 27 (at your option) any later version. 28 29 EasyVVUQ is distributed in the hope that it will be useful, 30 but WITHOUT ANY WARRANTY; without even the implied warranty of 31 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the 32 Lesser GNU General Public License for more details. 33 34 You should have received a copy of the Lesser GNU General Public License 35 along with this program. If not, see <https://www.gnu.org/licenses/>. 36 37""" 38__license__ = "LGPL" 39 40 41logger = logging.getLogger(__name__) 42 43 44class Campaign: 45 """Campaigns organise the dataflow in EasyVVUQ workflows. 46 47 The Campaign functions as as state machine for the VVUQ workflows. It uses a 48 database (CampaignDB) to store information on both the target application 49 and the VVUQ algorithms being employed. It also collects data from the simulations 50 and can be used to store and resume your state. 51 52 Notes 53 ----- 54 55 Multiple campaigns can be combined in a CampaignDB. Hence the particular 56 campaign we are currently working on will be specified using `campaign_id`. 57 58 Parameters 59 ---------- 60 name: str 61 Name of the Campaign. Freely chosen, serves as a human-readable way of distinguishing 62 between several campaigns in the same database. 63 params: dict, optional 64 Description of the parameters to associated with the application. Will be used to create 65 an app when creating the campaign. It is also possible to add apps manually using `add_app` 66 method of the Campaign class. But this can be a useful shorthand when working with single 67 app campaigns. To use this functionality both `params` and `actions` has to be specified. 68 The name of this app will be the same as the name of the Campaign. 69 actions: Actions, optional 70 Actions object associated with an application. See description of the `params` parameter 71 for more details. 72 db_location: str, optional 73 Location of the underlying campaign database - either a path or 74 acceptable URI for SQLAlchemy. 75 work_dir: str, optional, default='./' 76 Path to working directory - used to store campaign directory. 77 change_to_state: bool, optional, default=False 78 Should we change to the directory containing any specified `state_file` 79 in order to make relative paths work. 80 verify_all_runs: bool, optional, default=True 81 Check all new runs being added for unrecognised params (not defined for the currently set 82 app), values lying within defined physical range, type checking etc. This should normally 83 always be set to True, but in cases where the performance is too degraded, the checks can 84 be disabled by setting to False. 85 86 Attributes 87 ---------- 88 campaign_name : str or None 89 Name for the campaign/workflow. 90 _campaign_dir: str or None 91 Path to the directory campaign uses for local storage (runs inputs etc) 92 db_location : str or None 93 Location of the underlying campaign database - either a path or 94 acceptable URI for SQLAlchemy. 95 _log: list 96 The log of all elements that have been applied, with information about 97 their application 98 campaign_id : int 99 ID number for the current campaign in the db.CampaignDB. 100 campaign_db: easyvvuq.db.Basedb.CampaignDB 101 A campaign database object 102 last_analysis: 103 The result of the most recent analysis carried out on this campaign 104 _active_app: dict 105 Info about currently set app 106 _active_app_name: str 107 Name of currently set app 108 _active_sampler_id: int 109 The database id of the currently set Sampler object 110 111 Examples 112 -------- 113 A typical instantiation might look like this. 114 115 >>> params = { 116 "S0": {"type": "float", "default": 997}, 117 "I0": {"type": "float", "default": 3}, 118 "beta": {"type": "float", "default": 0.2}, 119 "gamma": {"type": "float", "default": 0.04, "min": 0.0, "max": 1.0}, 120 "iterations": {"type": "integer", "default": 100}, 121 "outfile": {"type": "string", "default": "output.csv"} 122 } 123 >>> encoder = uq.encoders.GenericEncoder(template_fname='sir.template', delimiter='$', target_filename='input.json') 124 >>> decoder = uq.decoders.SimpleCSV(target_filename='output.csv', output_columns=['I']) 125 >>> actions = uq.actions.local_execute(encoder, os.path.abspath('sir') + ' input.json', decoder) 126 >>> campaign = uq.Campaign(name='sir', params=params, actions=actions) 127 128 A simplified one (without an app) might look simply like this. 129 130 >>> campaign = Campaign('simple') 131 132 An app then can be added. 133 134 >>> campaign.add_app('simple_app', params=params, actions=actions) 135 """ 136 137 @staticmethod 138 def from_existing_data(name, 139 input_files, 140 output_files, 141 input_decoder=None, 142 output_decoder=None, 143 params=None, 144 output_columns=None, 145 work_dir="./", 146 auto_infer=True): 147 """ 148 Create a campaign from existing data files. 149 150 Parameters 151 ---------- 152 name : str 153 Name of the campaign 154 input_files : list of str 155 List of input file paths 156 output_files : list of str 157 List of output file paths 158 input_decoder : Decoder, optional 159 Decoder for input files (auto-created if None) 160 output_decoder : Decoder, optional 161 Decoder for output files (auto-created if None) 162 params : dict, optional 163 Parameter definitions (auto-inferred if None and auto_infer=True) 164 output_columns : list of str, optional 165 Output column names (auto-inferred if None and auto_infer=True) 166 work_dir : str, optional 167 Working directory (default: "./") 168 auto_infer : bool, optional 169 Whether to automatically infer parameters and outputs (default: True) 170 171 Returns 172 ------- 173 Campaign 174 A new campaign with the imported data 175 176 Examples 177 -------- 178 >>> campaign = Campaign.from_existing_data( 179 ... name="imported_sim", 180 ... input_files=["run1/input.json", "run2/input.json"], 181 ... output_files=["run1/output.csv", "run2/output.csv"] 182 ... ) 183 """ 184 from easyvvuq.utils.dataset_importer import create_campaign_from_files 185 186 return create_campaign_from_files( 187 input_files=input_files, 188 output_files=output_files, 189 campaign_name=name, 190 work_dir=work_dir, 191 input_decoder=input_decoder, 192 output_decoder=output_decoder, 193 auto_infer=auto_infer 194 ) 195 196 def __init__( 197 self, 198 name, 199 params=None, 200 actions=None, 201 db_location=None, 202 work_dir="./", 203 change_to_state=False, 204 verify_all_runs=True 205 ): 206 207 self.work_dir = os.path.realpath(os.path.expanduser(work_dir)) 208 self.verify_all_runs = verify_all_runs 209 210 self.campaign_name = name 211 self._campaign_dir = None 212 213 if db_location is None: 214 self._campaign_dir = tempfile.mkdtemp(prefix=name, dir=self.work_dir) 215 self.db_location = "sqlite:///" + self._campaign_dir + "/campaign.db" 216 else: 217 self.db_location = db_location 218 219 self.campaign_id = None 220 self.campaign_db = None 221 222 self.last_analysis = None 223 224 self._active_app = None 225 self._active_app_name = None 226 self._active_app_actions = None 227 228 self._active_sampler = None 229 self._active_sampler_id = None 230 231 self.init_db(name, self.work_dir) 232 self._state_dir = None 233 234 # here we assume that the user wants to add an app 235 if (params is not None) and (actions is not None): 236 self.add_app(name=name, params=params, actions=actions) 237 238 @property 239 def campaign_dir(self): 240 """Get the path in which to load/save files related to the campaign. 241 242 Returns 243 ------- 244 str 245 Path to the campaign directory - given as a subdirectory of the 246 working directory. 247 """ 248 249 return os.path.join(self.work_dir, self._campaign_dir) 250 251 def init_db(self, name, work_dir='.'): 252 """Initialize the connection with the database and either resume or create the campaign. 253 254 Parameters 255 ---------- 256 name: str 257 Name of the campaign. 258 work_dir: str 259 Work directory, defaults to cwd. 260 """ 261 self.campaign_db = db.CampaignDB(location=self.db_location) 262 if self.campaign_db.campaign_exists(name): 263 self.campaign_id = self.campaign_db.get_campaign_id(name) 264 self._active_app_name = self.campaign_db.get_active_app()[0].name 265 self.campaign_name = name 266 self._campaign_dir = self.campaign_db.campaign_dir(name) 267 if not os.path.exists(self._campaign_dir): 268 message = (f"Campaign directory ({self.campaign_dir}) does not exist.") 269 raise RuntimeError(message) 270 self._active_sampler_id = self.campaign_db.get_sampler_id(self.campaign_id) 271 self._active_sampler = self.campaign_db.resurrect_sampler(self._active_sampler_id) 272 self.set_app(self._active_app_name) 273 self.campaign_db.resume_campaign(name) 274 else: 275 if self._campaign_dir is None: 276 self._campaign_dir = tempfile.mkdtemp(prefix=name, dir=work_dir) 277 info = CampaignInfo( 278 name=name, 279 campaign_dir_prefix=default_campaign_prefix, 280 easyvvuq_version=easyvvuq.__version__, 281 campaign_dir=self._campaign_dir) 282 self.campaign_db.create_campaign(info) 283 self.campaign_name = name 284 self.campaign_id = self.campaign_db.get_campaign_id(self.campaign_name) 285 286 def add_app(self, name=None, params=None, actions=None, set_active=True): 287 """Add an application to the CampaignDB. 288 289 Parameters 290 ---------- 291 name : str 292 Name of the application. 293 params : dict 294 Description of the parameters to associate with the application. 295 actions : Actions 296 An instance of Actions containing actions to be executed 297 set_active: bool 298 Should the added app be set to be the currently active app? 299 """ 300 # Verify input parameters dict 301 paramsspec = easyvvuq.ParamsSpecification(params, appname=name) 302 # validate application input 303 app = AppInfo( 304 name=name, 305 paramsspec=paramsspec, 306 actions=actions, 307 ) 308 self.campaign_db.add_app(app) 309 if set_active: 310 self.set_app(app.name) 311 312 def set_app(self, app_name): 313 """Set active app for the campaign. 314 315 Application information is retrieved from `self.campaign_db`. 316 317 Parameters 318 ---------- 319 app_name: str 320 Name of selected app, if `None` given then first app will be 321 selected. 322 """ 323 self._active_app_name = app_name 324 self._active_app = self.campaign_db.app(name=app_name) 325 self.campaign_db.set_active_app(app_name) 326 # Resurrect the app encoder, decoder and collation elements 327 self._active_app_actions = self.campaign_db.resurrect_app(app_name) 328 329 def replace_actions(self, app_name, actions): 330 """Replace actions for an app with a given name. 331 332 Parameters 333 ---------- 334 app_name: str 335 Name of the app. 336 actions: Actions 337 `Actions` instance, will replace the current `Actions` of an app. 338 """ 339 self.campaign_db.replace_actions(app_name, actions) 340 self._active_app_actions = actions 341 342 def set_sampler(self, sampler, update=False): 343 """Set active sampler. 344 345 Parameters 346 ---------- 347 sampler : Sampler 348 Sampler that will be used to create runs for the current campaign. 349 update : bool 350 If set to True it will not add the sampler to the database, just change 351 it as the active sampler. 352 """ 353 self._active_sampler = sampler 354 if not update: 355 self._active_sampler_id = self.campaign_db.add_sampler(sampler) 356 sampler.sampler_id = self._active_sampler_id 357 self._active_sampler_id = self._active_sampler.sampler_id 358 self.campaign_db.set_sampler(self.campaign_id, self._active_sampler.sampler_id) 359 360 def add_external_runs(self, input_files, output_files, input_decoder, output_decoder, 361 validate_params=True, run_prefix="external_run"): 362 """Takes a list of files and adds them to the database. This method is to be 363 used when adding runs to the EasyVVUQ database that were not executed using 364 EasyVVUQ. 365 366 Parameters 367 ---------- 368 input_files: list of str 369 A list of input file paths to be loaded to the database. 370 output_files: list of str 371 A list of output file paths to be loaded to the database. 372 input_decoder: Decoder 373 A decoder that will be used to parse input files. 374 output_decoder: Decoder 375 A decoder that will be used to parse output files. 376 validate_params: bool, optional 377 Whether to validate parameters against the app definition (default: True) 378 run_prefix: str, optional 379 Prefix for run names (default: "external_run") 380 """ 381 if self._active_app is None: 382 msg = ("No app is currently set for this campaign. " 383 "Use set_app('name_of_app') or add_app() first.") 384 logging.error(msg) 385 raise Exception(msg) 386 387 if len(input_files) != len(output_files): 388 raise ValueError("Number of input files must match number of output files") 389 390 inputs = [] 391 outputs = [] 392 failed_runs = [] 393 394 # Parse input files 395 for i, input_file in enumerate(input_files): 396 try: 397 input_decoder.target_filename = os.path.basename(input_file) 398 params = input_decoder.parse_sim_output({'run_dir': os.path.dirname(input_file)}) 399 400 # Validate parameters if requested 401 if validate_params: 402 try: 403 app_default_params = self._active_app["params"] 404 validated_params = app_default_params.process_run(params, verify=self.verify_all_runs) 405 inputs.append(validated_params) 406 except Exception as e: 407 logging.warning(f"Parameter validation failed for {input_file}: {e}") 408 failed_runs.append(i) 409 continue 410 else: 411 inputs.append(params) 412 413 except Exception as e: 414 logging.error(f"Failed to parse input file {input_file}: {e}") 415 failed_runs.append(i) 416 continue 417 418 # Parse output files 419 for i, output_file in enumerate(output_files): 420 if i in failed_runs: 421 continue 422 423 try: 424 output_decoder.target_filename = os.path.basename(output_file) 425 result = output_decoder.parse_sim_output({'run_dir': os.path.dirname(output_file)}) 426 outputs.append(result) 427 except Exception as e: 428 logging.error(f"Failed to parse output file {output_file}: {e}") 429 failed_runs.append(i) 430 continue 431 432 # Add runs to database 433 run_counter = 0 434 for i, (params, result) in enumerate(zip(inputs, outputs)): 435 if i in failed_runs: 436 continue 437 438 run_counter += 1 439 table = db.RunTable(run_name=f'{run_prefix}_{run_counter}', 440 app=self._active_app['id'], 441 params=json.dumps(params), 442 status=Status.COLLATED, 443 run_dir=self.get_campaign_runs_dir(), 444 result=json.dumps(result), 445 campaign=self.campaign_id, 446 sampler=self._active_sampler_id) 447 self.campaign_db.session.add(table) 448 449 # Commit all changes at once 450 self.campaign_db.session.commit() 451 452 logging.info(f"Successfully imported {run_counter} runs") 453 if failed_runs: 454 logging.warning(f"Failed to import {len(failed_runs)} runs due to parsing or validation errors") 455 456 def add_runs(self, runs, mark_invalid=False): 457 """Add runs to the database. 458 459 Parameters 460 ---------- 461 runs : list of dicts 462 Each dict defines the value of each model parameter listed in 463 self.params_info for a run to be added to self.runs 464 mark_invalid : bool 465 Will mark runs that fail verification as invalid (but will not raise an exception) 466 """ 467 if self._active_app is None: 468 msg = ("No app is currently set for this campaign. " 469 "Use set_app('name_of_app').") 470 logging.error(msg) 471 raise Exception(msg) 472 app_default_params = self._active_app["params"] 473 run_info_list = [] 474 for new_run in runs: 475 if new_run is None: 476 msg = ("add_run() was passed new_run of type None. Bad sampler?") 477 logging.error(msg) 478 raise Exception(msg) 479 # Verify and complete run with missing/default param values 480 status = Status.NEW 481 try: 482 new_run = app_default_params.process_run(new_run, verify=self.verify_all_runs) 483 except RuntimeError: 484 if mark_invalid: 485 new_run = app_default_params.process_run(new_run, verify=False) 486 status = Status.INVALID 487 else: 488 raise 489 # Add to run queue 490 run_info = RunInfo(app=self._active_app['id'], 491 params=new_run, 492 sample=self._active_sampler_id, 493 campaign=self.campaign_id, 494 status=status) 495 run_info_list.append(run_info) 496 self.campaign_db.add_runs(run_info_list, iteration=self._active_sampler.iteration) 497 498 def draw_samples(self, num_samples=0, mark_invalid=False): 499 """Draws `num_samples` sets of parameters from the currently set 500 sampler, resulting in `num_samples` new runs added to the 501 runs list. If `num_samples` is 0 (its default value) then 502 this method draws ALL samples from the sampler, until exhaustion (this 503 will fail if the sampler is not finite). 504 505 Parameters 506 ---------- 507 num_samples : int 508 Number of samples to draw from the active sampling element. 509 By default is 0 (draw ALL samples) 510 mark_invalid : bool 511 If True will mark runs that go outside valid parameter range as INVALID. 512 This is useful for MCMC style methods where you want those runs to evaluate 513 to low probabilities. 514 """ 515 # Make sure `num_samples` is not 0 for an infinite generator 516 # (this would add runs forever...) 517 if not self._active_sampler.is_finite() and num_samples <= 0: 518 msg = (f"Sampling_element '{self._active_sampler.element_name()}' " 519 f"is an infinite generator, therefore a finite number of " 520 f"draws (n > 0) must be specified.") 521 raise RuntimeError(msg) 522 num_added = 0 523 new_runs = [] 524 for new_run in self._active_sampler: 525 new_runs.append(new_run) 526 num_added += 1 527 if num_samples != 0 and num_added >= num_samples: 528 break 529 self.add_runs(new_runs, mark_invalid) 530 # Write sampler's new state to database 531 self.campaign_db.update_sampler(self._active_sampler_id, self._active_sampler) 532 return new_runs 533 534 def list_runs(self, sampler=None, campaign=None, app_id=None, status=None): 535 """Get list of runs in the CampaignDB. 536 537 Parameters 538 ---------- 539 sampler: int 540 Sampler id to filter for. 541 campaign: int 542 Campaign id to filter for. 543 app_id: int 544 App id to filter for. 545 status: Status 546 Status to filter for. 547 548 Returns 549 ------- 550 list of runs 551 """ 552 return list(self.campaign_db.runs( 553 sampler=sampler, campaign=campaign, app_id=app_id, status=status)) 554 555 def get_campaign_runs_dir(self): 556 """Get the runs directory from the CampaignDB. 557 558 Returns 559 ------- 560 str 561 Path in which the runs information will be written. 562 """ 563 return self.campaign_db.runs_dir(self.campaign_name) 564 565 def relocate(self, campaign_dir): 566 """Relocate the campaign by specifying a new path where campaign is located. 567 568 Parameters 569 ---------- 570 new_path: str 571 new runs directory 572 """ 573 if not os.path.exists(campaign_dir): 574 raise RuntimeError("specified directory does not exist: {}".format(campaign_dir)) 575 self.campaign_db.relocate(campaign_dir, self.campaign_name) 576 577 def execute(self, nsamples=0, pool=None, mark_invalid=False, sequential=False): 578 """This will draw samples and execute the Actions on those samples. 579 580 Parameters 581 ---------- 582 nsamples: int 583 Number of samples to draw. For infinite samplers or when you want to process 584 samples in batches. 585 pool: Executor 586 A pool object to be used when processing runs (e.g. instance of `ThreadPoolExecutor` or 587 `ProcessPoolExecutor`). 588 mark_invalid: bool 589 Mark runs that go outside the specified input parameter range as INVALID. 590 sequential: bool 591 Whether to process samples sequentially (sometimes more efficient or you might 592 want to avoid the concurrent module for some reason). 593 """ 594 self.draw_samples(nsamples, mark_invalid=mark_invalid) 595 action_pool = self.apply_for_each_sample( 596 self._active_app_actions, sequential=sequential) 597 return action_pool.start(pool=pool) 598 599 def apply_for_each_sample(self, actions, status=Status.NEW, sequential=False): 600 """For each run in this Campaign's run list, apply the specified action 601 (an object of type Action). 602 603 Parameters 604 ---------- 605 actions: Actions 606 Actions to be applied to each relevant run in the database. 607 status: Status 608 Will apply the Actions only to those runs whose status is as specified. 609 sequential: bool 610 Whether to process samples sequentially (sometimes more efficient or you might 611 want to avoid the concurrent module for some reason). 612 613 Returns 614 ------- 615 ActionPool 616 An object containing ActionStatus instances to track action execution. 617 """ 618 # Loop through all runs in this campaign with status ENCODED, and 619 # run the specified action on each run's dir 620 def inits(): 621 for run_id, run_data in self.campaign_db.runs( 622 status=status, app_id=self._active_app['id']): 623 previous = {} 624 previous['run_id'] = run_id 625 previous['campaign_dir'] = self._campaign_dir 626 previous['rundir'] = run_data['run_dir'] 627 previous['run_info'] = run_data 628 previous['result'] = {} 629 previous['collated'] = False 630 yield previous 631 return ActionPool(self, actions, inits=inits(), sequential=sequential) 632 633 def iterate(self, nsamples=0, pool=None, mark_invalid=False, sequential=False): 634 """This is the equivalent of `execute` for methods that rely on the output of the 635 previous sampling stage (designed for MCMC, should work for others). 636 637 Parameters 638 ---------- 639 nsamples : int 640 Number of samples to draw (during a single iteration). 641 pool : Executor 642 An Executor instance. For example ThreadPoolExecutor or a Dask Client. Defaults 643 to the ThreadPoolExecutor. 644 mark_invalid : bool 645 Mark runs that go outside the specified input parameter range as INVALID. 646 sequential: bool 647 Will execute the `Actions` associated with runs sequentially. Might be more 648 efficient in some situations. 649 650 Yields 651 ------ 652 ActionPool 653 An object containing Futures instances to track action execution. 654 """ 655 while True: 656 self.draw_samples(nsamples, mark_invalid=mark_invalid) 657 action_pool = self.apply_for_each_sample( 658 self._active_app_actions, sequential=sequential) 659 yield action_pool.start(pool=pool) 660 result = self.get_collation_result(last_iteration=True) 661 invalid = self.get_invalid_runs(last_iteration=True) 662 ignored_runs = self._active_sampler.update(result, invalid) 663 for run_id in ignored_runs: 664 self.campaign_db.session.query(db.RunTable).\ 665 filter(db.RunTable.id == int(run_id)).\ 666 update({'status': easyvvuq.constants.Status.IGNORED}) 667 self.campaign_db.session.commit() 668 669 def recollate(self): 670 """Clears the current collation table, changes all COLLATED status runs 671 back to ENCODED, then runs collate() again 672 """ 673 collated_run_ids = list(self.campaign_db.run_ids(status=Status.COLLATED)) 674 self.campaign_db.set_run_statuses(collated_run_ids, Status.ENCODED) 675 self.collate() 676 677 def get_collation_result(self, last_iteration=False): 678 """Return dataframe containing all collated results 679 680 Parameters 681 ---------- 682 last_iteration : bool 683 Will only return the result of the last iteration. 684 685 Returns 686 ------- 687 DataFrame 688 A DataFrame with the simulation results along with the inputs 689 used to produce them. 690 """ 691 if last_iteration: 692 iteration = self._active_sampler.iteration - 1 693 else: 694 iteration = -1 695 return self.campaign_db.get_results( 696 self._active_app['name'], 697 self._active_sampler_id, 698 status=easyvvuq.constants.Status.COLLATED, 699 iteration=iteration) 700 701 def get_invalid_runs(self, last_iteration=False): 702 """Return dataframe containing all results marked as INVALID. 703 704 Parameters 705 ---------- 706 last_iteration : bool 707 Will only return the result of the last iteration. 708 709 Returns 710 ------- 711 DataFrame 712 A DataFrame with the results form simulations that were marked as INVALID. 713 These will usually be the ones that went outside the specified parameter ranges. 714 These still have to be accounted for in some way by some methods (e.g. MCMC). 715 """ 716 if last_iteration: 717 iteration = self._active_sampler.iteration - 1 718 else: 719 iteration = -1 720 return self.campaign_db.get_results( 721 self._active_app['name'], 722 self._active_sampler_id, 723 status=easyvvuq.constants.Status.INVALID, 724 iteration=iteration) 725 726 def apply_analysis(self, analysis): 727 """Run the `analysis` element on the output of the last run collation. 728 729 Parameters 730 ---------- 731 analysis : Analysis 732 Element that performs a VVUQ analysis on a dataframe summary of 733 run outputs. 734 """ 735 # Apply analysis element to most recent collation result 736 self.last_analysis = analysis.analyse(data_frame=self.get_collation_result()) 737 738 def analyse(self, **kwargs): 739 """If available will call an appropriate analysis class on the collation result. 740 741 Parameters 742 ---------- 743 **kwargs : dict 744 Argument to the analysis class constructor (after sampler). 745 746 Returns 747 ------- 748 AnalysisResults 749 An object representing analysis results. Can be used to interact with those results 750 in some way. Plot, retrieve surrogate models and so on. 751 See `easyvvuq.analysis.AnalysisResults` for further information. 752 """ 753 collation_result = self.get_collation_result() 754 try: 755 analysis = self._active_sampler.analysis_class(sampler=self._active_sampler, **kwargs) 756 return analysis.analyse(collation_result) 757 except NotImplementedError: 758 raise RuntimeError("This sampler does not have a corresponding analysis class") 759 760 def get_last_analysis(self): 761 """Return the output of the most recently run analysis element. 762 """ 763 if self.last_analysis is None: 764 logging.warning("No last analysis output available.") 765 return self.last_analysis 766 767 def __str__(self): 768 """Returns formatted summary of the current Campaign state. 769 Enables class to work with standard print() method 770 """ 771 return (f"db_location = {self.db_location}\n" 772 f"active_sampler_id = {self._active_sampler_id}\n" 773 f"campaign_name = {self.campaign_name}\n" 774 f"campaign_dir = {self.campaign_dir}\n" 775 f"campaign_id = {self.campaign_id}\n") 776 777 def get_active_sampler(self): 778 """Return the active sampler element in use by this campaign. 779 780 Returns 781 ------- 782 The sampler currently in use 783 """ 784 785 return self._active_sampler 786 787 def ignore_runs(self, list_of_run_IDs): 788 """Flags the specified runs to be IGNORED in future collation. Note that 789 this does NOT remove previously collated results from the collation table. 790 For that you must refresh the collation by running recollate(). 791 792 Parameters 793 ---------- 794 list 795 The list of run IDs for the runs that should be set to status IGNORED 796 """ 797 self.campaign_db.set_run_statuses(list_of_run_IDs, Status.IGNORED) 798 799 def rerun(self, list_of_run_IDs): 800 """Sets the status of the specified runs to ENCODED, so that their results 801 may be recollated later (presumably after extending, rerunning or otherwise 802 modifying the data in the relevant run folder). Note that this method will 803 NOT perform any execution - it simply flags the run in EasyVVUQ as being 804 uncollated. Actual execution is (as usual) the job of the user or middleware. 805 806 Parameters 807 ---------- 808 list 809 The list of run IDs for the runs that should be set to status ENCODED 810 """ 811 812 for run_ID in list_of_run_IDs: 813 status = self.campaign_db.get_run_status(run_ID) 814 if status == Status.NEW: 815 msg = (f"Cannot rerun {run_ID} as it has status NEW, and must" 816 f"be encoded before execution.") 817 raise RuntimeError(msg) 818 self.campaign_db.set_run_statuses(list_of_run_IDs, Status.ENCODED) 819 820 def get_active_app(self): 821 """Returns a dict of information regarding the application that is currently 822 set for this campaign. 823 """ 824 return self._active_app
45class Campaign: 46 """Campaigns organise the dataflow in EasyVVUQ workflows. 47 48 The Campaign functions as as state machine for the VVUQ workflows. It uses a 49 database (CampaignDB) to store information on both the target application 50 and the VVUQ algorithms being employed. It also collects data from the simulations 51 and can be used to store and resume your state. 52 53 Notes 54 ----- 55 56 Multiple campaigns can be combined in a CampaignDB. Hence the particular 57 campaign we are currently working on will be specified using `campaign_id`. 58 59 Parameters 60 ---------- 61 name: str 62 Name of the Campaign. Freely chosen, serves as a human-readable way of distinguishing 63 between several campaigns in the same database. 64 params: dict, optional 65 Description of the parameters to associated with the application. Will be used to create 66 an app when creating the campaign. It is also possible to add apps manually using `add_app` 67 method of the Campaign class. But this can be a useful shorthand when working with single 68 app campaigns. To use this functionality both `params` and `actions` has to be specified. 69 The name of this app will be the same as the name of the Campaign. 70 actions: Actions, optional 71 Actions object associated with an application. See description of the `params` parameter 72 for more details. 73 db_location: str, optional 74 Location of the underlying campaign database - either a path or 75 acceptable URI for SQLAlchemy. 76 work_dir: str, optional, default='./' 77 Path to working directory - used to store campaign directory. 78 change_to_state: bool, optional, default=False 79 Should we change to the directory containing any specified `state_file` 80 in order to make relative paths work. 81 verify_all_runs: bool, optional, default=True 82 Check all new runs being added for unrecognised params (not defined for the currently set 83 app), values lying within defined physical range, type checking etc. This should normally 84 always be set to True, but in cases where the performance is too degraded, the checks can 85 be disabled by setting to False. 86 87 Attributes 88 ---------- 89 campaign_name : str or None 90 Name for the campaign/workflow. 91 _campaign_dir: str or None 92 Path to the directory campaign uses for local storage (runs inputs etc) 93 db_location : str or None 94 Location of the underlying campaign database - either a path or 95 acceptable URI for SQLAlchemy. 96 _log: list 97 The log of all elements that have been applied, with information about 98 their application 99 campaign_id : int 100 ID number for the current campaign in the db.CampaignDB. 101 campaign_db: easyvvuq.db.Basedb.CampaignDB 102 A campaign database object 103 last_analysis: 104 The result of the most recent analysis carried out on this campaign 105 _active_app: dict 106 Info about currently set app 107 _active_app_name: str 108 Name of currently set app 109 _active_sampler_id: int 110 The database id of the currently set Sampler object 111 112 Examples 113 -------- 114 A typical instantiation might look like this. 115 116 >>> params = { 117 "S0": {"type": "float", "default": 997}, 118 "I0": {"type": "float", "default": 3}, 119 "beta": {"type": "float", "default": 0.2}, 120 "gamma": {"type": "float", "default": 0.04, "min": 0.0, "max": 1.0}, 121 "iterations": {"type": "integer", "default": 100}, 122 "outfile": {"type": "string", "default": "output.csv"} 123 } 124 >>> encoder = uq.encoders.GenericEncoder(template_fname='sir.template', delimiter='$', target_filename='input.json') 125 >>> decoder = uq.decoders.SimpleCSV(target_filename='output.csv', output_columns=['I']) 126 >>> actions = uq.actions.local_execute(encoder, os.path.abspath('sir') + ' input.json', decoder) 127 >>> campaign = uq.Campaign(name='sir', params=params, actions=actions) 128 129 A simplified one (without an app) might look simply like this. 130 131 >>> campaign = Campaign('simple') 132 133 An app then can be added. 134 135 >>> campaign.add_app('simple_app', params=params, actions=actions) 136 """ 137 138 @staticmethod 139 def from_existing_data(name, 140 input_files, 141 output_files, 142 input_decoder=None, 143 output_decoder=None, 144 params=None, 145 output_columns=None, 146 work_dir="./", 147 auto_infer=True): 148 """ 149 Create a campaign from existing data files. 150 151 Parameters 152 ---------- 153 name : str 154 Name of the campaign 155 input_files : list of str 156 List of input file paths 157 output_files : list of str 158 List of output file paths 159 input_decoder : Decoder, optional 160 Decoder for input files (auto-created if None) 161 output_decoder : Decoder, optional 162 Decoder for output files (auto-created if None) 163 params : dict, optional 164 Parameter definitions (auto-inferred if None and auto_infer=True) 165 output_columns : list of str, optional 166 Output column names (auto-inferred if None and auto_infer=True) 167 work_dir : str, optional 168 Working directory (default: "./") 169 auto_infer : bool, optional 170 Whether to automatically infer parameters and outputs (default: True) 171 172 Returns 173 ------- 174 Campaign 175 A new campaign with the imported data 176 177 Examples 178 -------- 179 >>> campaign = Campaign.from_existing_data( 180 ... name="imported_sim", 181 ... input_files=["run1/input.json", "run2/input.json"], 182 ... output_files=["run1/output.csv", "run2/output.csv"] 183 ... ) 184 """ 185 from easyvvuq.utils.dataset_importer import create_campaign_from_files 186 187 return create_campaign_from_files( 188 input_files=input_files, 189 output_files=output_files, 190 campaign_name=name, 191 work_dir=work_dir, 192 input_decoder=input_decoder, 193 output_decoder=output_decoder, 194 auto_infer=auto_infer 195 ) 196 197 def __init__( 198 self, 199 name, 200 params=None, 201 actions=None, 202 db_location=None, 203 work_dir="./", 204 change_to_state=False, 205 verify_all_runs=True 206 ): 207 208 self.work_dir = os.path.realpath(os.path.expanduser(work_dir)) 209 self.verify_all_runs = verify_all_runs 210 211 self.campaign_name = name 212 self._campaign_dir = None 213 214 if db_location is None: 215 self._campaign_dir = tempfile.mkdtemp(prefix=name, dir=self.work_dir) 216 self.db_location = "sqlite:///" + self._campaign_dir + "/campaign.db" 217 else: 218 self.db_location = db_location 219 220 self.campaign_id = None 221 self.campaign_db = None 222 223 self.last_analysis = None 224 225 self._active_app = None 226 self._active_app_name = None 227 self._active_app_actions = None 228 229 self._active_sampler = None 230 self._active_sampler_id = None 231 232 self.init_db(name, self.work_dir) 233 self._state_dir = None 234 235 # here we assume that the user wants to add an app 236 if (params is not None) and (actions is not None): 237 self.add_app(name=name, params=params, actions=actions) 238 239 @property 240 def campaign_dir(self): 241 """Get the path in which to load/save files related to the campaign. 242 243 Returns 244 ------- 245 str 246 Path to the campaign directory - given as a subdirectory of the 247 working directory. 248 """ 249 250 return os.path.join(self.work_dir, self._campaign_dir) 251 252 def init_db(self, name, work_dir='.'): 253 """Initialize the connection with the database and either resume or create the campaign. 254 255 Parameters 256 ---------- 257 name: str 258 Name of the campaign. 259 work_dir: str 260 Work directory, defaults to cwd. 261 """ 262 self.campaign_db = db.CampaignDB(location=self.db_location) 263 if self.campaign_db.campaign_exists(name): 264 self.campaign_id = self.campaign_db.get_campaign_id(name) 265 self._active_app_name = self.campaign_db.get_active_app()[0].name 266 self.campaign_name = name 267 self._campaign_dir = self.campaign_db.campaign_dir(name) 268 if not os.path.exists(self._campaign_dir): 269 message = (f"Campaign directory ({self.campaign_dir}) does not exist.") 270 raise RuntimeError(message) 271 self._active_sampler_id = self.campaign_db.get_sampler_id(self.campaign_id) 272 self._active_sampler = self.campaign_db.resurrect_sampler(self._active_sampler_id) 273 self.set_app(self._active_app_name) 274 self.campaign_db.resume_campaign(name) 275 else: 276 if self._campaign_dir is None: 277 self._campaign_dir = tempfile.mkdtemp(prefix=name, dir=work_dir) 278 info = CampaignInfo( 279 name=name, 280 campaign_dir_prefix=default_campaign_prefix, 281 easyvvuq_version=easyvvuq.__version__, 282 campaign_dir=self._campaign_dir) 283 self.campaign_db.create_campaign(info) 284 self.campaign_name = name 285 self.campaign_id = self.campaign_db.get_campaign_id(self.campaign_name) 286 287 def add_app(self, name=None, params=None, actions=None, set_active=True): 288 """Add an application to the CampaignDB. 289 290 Parameters 291 ---------- 292 name : str 293 Name of the application. 294 params : dict 295 Description of the parameters to associate with the application. 296 actions : Actions 297 An instance of Actions containing actions to be executed 298 set_active: bool 299 Should the added app be set to be the currently active app? 300 """ 301 # Verify input parameters dict 302 paramsspec = easyvvuq.ParamsSpecification(params, appname=name) 303 # validate application input 304 app = AppInfo( 305 name=name, 306 paramsspec=paramsspec, 307 actions=actions, 308 ) 309 self.campaign_db.add_app(app) 310 if set_active: 311 self.set_app(app.name) 312 313 def set_app(self, app_name): 314 """Set active app for the campaign. 315 316 Application information is retrieved from `self.campaign_db`. 317 318 Parameters 319 ---------- 320 app_name: str 321 Name of selected app, if `None` given then first app will be 322 selected. 323 """ 324 self._active_app_name = app_name 325 self._active_app = self.campaign_db.app(name=app_name) 326 self.campaign_db.set_active_app(app_name) 327 # Resurrect the app encoder, decoder and collation elements 328 self._active_app_actions = self.campaign_db.resurrect_app(app_name) 329 330 def replace_actions(self, app_name, actions): 331 """Replace actions for an app with a given name. 332 333 Parameters 334 ---------- 335 app_name: str 336 Name of the app. 337 actions: Actions 338 `Actions` instance, will replace the current `Actions` of an app. 339 """ 340 self.campaign_db.replace_actions(app_name, actions) 341 self._active_app_actions = actions 342 343 def set_sampler(self, sampler, update=False): 344 """Set active sampler. 345 346 Parameters 347 ---------- 348 sampler : Sampler 349 Sampler that will be used to create runs for the current campaign. 350 update : bool 351 If set to True it will not add the sampler to the database, just change 352 it as the active sampler. 353 """ 354 self._active_sampler = sampler 355 if not update: 356 self._active_sampler_id = self.campaign_db.add_sampler(sampler) 357 sampler.sampler_id = self._active_sampler_id 358 self._active_sampler_id = self._active_sampler.sampler_id 359 self.campaign_db.set_sampler(self.campaign_id, self._active_sampler.sampler_id) 360 361 def add_external_runs(self, input_files, output_files, input_decoder, output_decoder, 362 validate_params=True, run_prefix="external_run"): 363 """Takes a list of files and adds them to the database. This method is to be 364 used when adding runs to the EasyVVUQ database that were not executed using 365 EasyVVUQ. 366 367 Parameters 368 ---------- 369 input_files: list of str 370 A list of input file paths to be loaded to the database. 371 output_files: list of str 372 A list of output file paths to be loaded to the database. 373 input_decoder: Decoder 374 A decoder that will be used to parse input files. 375 output_decoder: Decoder 376 A decoder that will be used to parse output files. 377 validate_params: bool, optional 378 Whether to validate parameters against the app definition (default: True) 379 run_prefix: str, optional 380 Prefix for run names (default: "external_run") 381 """ 382 if self._active_app is None: 383 msg = ("No app is currently set for this campaign. " 384 "Use set_app('name_of_app') or add_app() first.") 385 logging.error(msg) 386 raise Exception(msg) 387 388 if len(input_files) != len(output_files): 389 raise ValueError("Number of input files must match number of output files") 390 391 inputs = [] 392 outputs = [] 393 failed_runs = [] 394 395 # Parse input files 396 for i, input_file in enumerate(input_files): 397 try: 398 input_decoder.target_filename = os.path.basename(input_file) 399 params = input_decoder.parse_sim_output({'run_dir': os.path.dirname(input_file)}) 400 401 # Validate parameters if requested 402 if validate_params: 403 try: 404 app_default_params = self._active_app["params"] 405 validated_params = app_default_params.process_run(params, verify=self.verify_all_runs) 406 inputs.append(validated_params) 407 except Exception as e: 408 logging.warning(f"Parameter validation failed for {input_file}: {e}") 409 failed_runs.append(i) 410 continue 411 else: 412 inputs.append(params) 413 414 except Exception as e: 415 logging.error(f"Failed to parse input file {input_file}: {e}") 416 failed_runs.append(i) 417 continue 418 419 # Parse output files 420 for i, output_file in enumerate(output_files): 421 if i in failed_runs: 422 continue 423 424 try: 425 output_decoder.target_filename = os.path.basename(output_file) 426 result = output_decoder.parse_sim_output({'run_dir': os.path.dirname(output_file)}) 427 outputs.append(result) 428 except Exception as e: 429 logging.error(f"Failed to parse output file {output_file}: {e}") 430 failed_runs.append(i) 431 continue 432 433 # Add runs to database 434 run_counter = 0 435 for i, (params, result) in enumerate(zip(inputs, outputs)): 436 if i in failed_runs: 437 continue 438 439 run_counter += 1 440 table = db.RunTable(run_name=f'{run_prefix}_{run_counter}', 441 app=self._active_app['id'], 442 params=json.dumps(params), 443 status=Status.COLLATED, 444 run_dir=self.get_campaign_runs_dir(), 445 result=json.dumps(result), 446 campaign=self.campaign_id, 447 sampler=self._active_sampler_id) 448 self.campaign_db.session.add(table) 449 450 # Commit all changes at once 451 self.campaign_db.session.commit() 452 453 logging.info(f"Successfully imported {run_counter} runs") 454 if failed_runs: 455 logging.warning(f"Failed to import {len(failed_runs)} runs due to parsing or validation errors") 456 457 def add_runs(self, runs, mark_invalid=False): 458 """Add runs to the database. 459 460 Parameters 461 ---------- 462 runs : list of dicts 463 Each dict defines the value of each model parameter listed in 464 self.params_info for a run to be added to self.runs 465 mark_invalid : bool 466 Will mark runs that fail verification as invalid (but will not raise an exception) 467 """ 468 if self._active_app is None: 469 msg = ("No app is currently set for this campaign. " 470 "Use set_app('name_of_app').") 471 logging.error(msg) 472 raise Exception(msg) 473 app_default_params = self._active_app["params"] 474 run_info_list = [] 475 for new_run in runs: 476 if new_run is None: 477 msg = ("add_run() was passed new_run of type None. Bad sampler?") 478 logging.error(msg) 479 raise Exception(msg) 480 # Verify and complete run with missing/default param values 481 status = Status.NEW 482 try: 483 new_run = app_default_params.process_run(new_run, verify=self.verify_all_runs) 484 except RuntimeError: 485 if mark_invalid: 486 new_run = app_default_params.process_run(new_run, verify=False) 487 status = Status.INVALID 488 else: 489 raise 490 # Add to run queue 491 run_info = RunInfo(app=self._active_app['id'], 492 params=new_run, 493 sample=self._active_sampler_id, 494 campaign=self.campaign_id, 495 status=status) 496 run_info_list.append(run_info) 497 self.campaign_db.add_runs(run_info_list, iteration=self._active_sampler.iteration) 498 499 def draw_samples(self, num_samples=0, mark_invalid=False): 500 """Draws `num_samples` sets of parameters from the currently set 501 sampler, resulting in `num_samples` new runs added to the 502 runs list. If `num_samples` is 0 (its default value) then 503 this method draws ALL samples from the sampler, until exhaustion (this 504 will fail if the sampler is not finite). 505 506 Parameters 507 ---------- 508 num_samples : int 509 Number of samples to draw from the active sampling element. 510 By default is 0 (draw ALL samples) 511 mark_invalid : bool 512 If True will mark runs that go outside valid parameter range as INVALID. 513 This is useful for MCMC style methods where you want those runs to evaluate 514 to low probabilities. 515 """ 516 # Make sure `num_samples` is not 0 for an infinite generator 517 # (this would add runs forever...) 518 if not self._active_sampler.is_finite() and num_samples <= 0: 519 msg = (f"Sampling_element '{self._active_sampler.element_name()}' " 520 f"is an infinite generator, therefore a finite number of " 521 f"draws (n > 0) must be specified.") 522 raise RuntimeError(msg) 523 num_added = 0 524 new_runs = [] 525 for new_run in self._active_sampler: 526 new_runs.append(new_run) 527 num_added += 1 528 if num_samples != 0 and num_added >= num_samples: 529 break 530 self.add_runs(new_runs, mark_invalid) 531 # Write sampler's new state to database 532 self.campaign_db.update_sampler(self._active_sampler_id, self._active_sampler) 533 return new_runs 534 535 def list_runs(self, sampler=None, campaign=None, app_id=None, status=None): 536 """Get list of runs in the CampaignDB. 537 538 Parameters 539 ---------- 540 sampler: int 541 Sampler id to filter for. 542 campaign: int 543 Campaign id to filter for. 544 app_id: int 545 App id to filter for. 546 status: Status 547 Status to filter for. 548 549 Returns 550 ------- 551 list of runs 552 """ 553 return list(self.campaign_db.runs( 554 sampler=sampler, campaign=campaign, app_id=app_id, status=status)) 555 556 def get_campaign_runs_dir(self): 557 """Get the runs directory from the CampaignDB. 558 559 Returns 560 ------- 561 str 562 Path in which the runs information will be written. 563 """ 564 return self.campaign_db.runs_dir(self.campaign_name) 565 566 def relocate(self, campaign_dir): 567 """Relocate the campaign by specifying a new path where campaign is located. 568 569 Parameters 570 ---------- 571 new_path: str 572 new runs directory 573 """ 574 if not os.path.exists(campaign_dir): 575 raise RuntimeError("specified directory does not exist: {}".format(campaign_dir)) 576 self.campaign_db.relocate(campaign_dir, self.campaign_name) 577 578 def execute(self, nsamples=0, pool=None, mark_invalid=False, sequential=False): 579 """This will draw samples and execute the Actions on those samples. 580 581 Parameters 582 ---------- 583 nsamples: int 584 Number of samples to draw. For infinite samplers or when you want to process 585 samples in batches. 586 pool: Executor 587 A pool object to be used when processing runs (e.g. instance of `ThreadPoolExecutor` or 588 `ProcessPoolExecutor`). 589 mark_invalid: bool 590 Mark runs that go outside the specified input parameter range as INVALID. 591 sequential: bool 592 Whether to process samples sequentially (sometimes more efficient or you might 593 want to avoid the concurrent module for some reason). 594 """ 595 self.draw_samples(nsamples, mark_invalid=mark_invalid) 596 action_pool = self.apply_for_each_sample( 597 self._active_app_actions, sequential=sequential) 598 return action_pool.start(pool=pool) 599 600 def apply_for_each_sample(self, actions, status=Status.NEW, sequential=False): 601 """For each run in this Campaign's run list, apply the specified action 602 (an object of type Action). 603 604 Parameters 605 ---------- 606 actions: Actions 607 Actions to be applied to each relevant run in the database. 608 status: Status 609 Will apply the Actions only to those runs whose status is as specified. 610 sequential: bool 611 Whether to process samples sequentially (sometimes more efficient or you might 612 want to avoid the concurrent module for some reason). 613 614 Returns 615 ------- 616 ActionPool 617 An object containing ActionStatus instances to track action execution. 618 """ 619 # Loop through all runs in this campaign with status ENCODED, and 620 # run the specified action on each run's dir 621 def inits(): 622 for run_id, run_data in self.campaign_db.runs( 623 status=status, app_id=self._active_app['id']): 624 previous = {} 625 previous['run_id'] = run_id 626 previous['campaign_dir'] = self._campaign_dir 627 previous['rundir'] = run_data['run_dir'] 628 previous['run_info'] = run_data 629 previous['result'] = {} 630 previous['collated'] = False 631 yield previous 632 return ActionPool(self, actions, inits=inits(), sequential=sequential) 633 634 def iterate(self, nsamples=0, pool=None, mark_invalid=False, sequential=False): 635 """This is the equivalent of `execute` for methods that rely on the output of the 636 previous sampling stage (designed for MCMC, should work for others). 637 638 Parameters 639 ---------- 640 nsamples : int 641 Number of samples to draw (during a single iteration). 642 pool : Executor 643 An Executor instance. For example ThreadPoolExecutor or a Dask Client. Defaults 644 to the ThreadPoolExecutor. 645 mark_invalid : bool 646 Mark runs that go outside the specified input parameter range as INVALID. 647 sequential: bool 648 Will execute the `Actions` associated with runs sequentially. Might be more 649 efficient in some situations. 650 651 Yields 652 ------ 653 ActionPool 654 An object containing Futures instances to track action execution. 655 """ 656 while True: 657 self.draw_samples(nsamples, mark_invalid=mark_invalid) 658 action_pool = self.apply_for_each_sample( 659 self._active_app_actions, sequential=sequential) 660 yield action_pool.start(pool=pool) 661 result = self.get_collation_result(last_iteration=True) 662 invalid = self.get_invalid_runs(last_iteration=True) 663 ignored_runs = self._active_sampler.update(result, invalid) 664 for run_id in ignored_runs: 665 self.campaign_db.session.query(db.RunTable).\ 666 filter(db.RunTable.id == int(run_id)).\ 667 update({'status': easyvvuq.constants.Status.IGNORED}) 668 self.campaign_db.session.commit() 669 670 def recollate(self): 671 """Clears the current collation table, changes all COLLATED status runs 672 back to ENCODED, then runs collate() again 673 """ 674 collated_run_ids = list(self.campaign_db.run_ids(status=Status.COLLATED)) 675 self.campaign_db.set_run_statuses(collated_run_ids, Status.ENCODED) 676 self.collate() 677 678 def get_collation_result(self, last_iteration=False): 679 """Return dataframe containing all collated results 680 681 Parameters 682 ---------- 683 last_iteration : bool 684 Will only return the result of the last iteration. 685 686 Returns 687 ------- 688 DataFrame 689 A DataFrame with the simulation results along with the inputs 690 used to produce them. 691 """ 692 if last_iteration: 693 iteration = self._active_sampler.iteration - 1 694 else: 695 iteration = -1 696 return self.campaign_db.get_results( 697 self._active_app['name'], 698 self._active_sampler_id, 699 status=easyvvuq.constants.Status.COLLATED, 700 iteration=iteration) 701 702 def get_invalid_runs(self, last_iteration=False): 703 """Return dataframe containing all results marked as INVALID. 704 705 Parameters 706 ---------- 707 last_iteration : bool 708 Will only return the result of the last iteration. 709 710 Returns 711 ------- 712 DataFrame 713 A DataFrame with the results form simulations that were marked as INVALID. 714 These will usually be the ones that went outside the specified parameter ranges. 715 These still have to be accounted for in some way by some methods (e.g. MCMC). 716 """ 717 if last_iteration: 718 iteration = self._active_sampler.iteration - 1 719 else: 720 iteration = -1 721 return self.campaign_db.get_results( 722 self._active_app['name'], 723 self._active_sampler_id, 724 status=easyvvuq.constants.Status.INVALID, 725 iteration=iteration) 726 727 def apply_analysis(self, analysis): 728 """Run the `analysis` element on the output of the last run collation. 729 730 Parameters 731 ---------- 732 analysis : Analysis 733 Element that performs a VVUQ analysis on a dataframe summary of 734 run outputs. 735 """ 736 # Apply analysis element to most recent collation result 737 self.last_analysis = analysis.analyse(data_frame=self.get_collation_result()) 738 739 def analyse(self, **kwargs): 740 """If available will call an appropriate analysis class on the collation result. 741 742 Parameters 743 ---------- 744 **kwargs : dict 745 Argument to the analysis class constructor (after sampler). 746 747 Returns 748 ------- 749 AnalysisResults 750 An object representing analysis results. Can be used to interact with those results 751 in some way. Plot, retrieve surrogate models and so on. 752 See `easyvvuq.analysis.AnalysisResults` for further information. 753 """ 754 collation_result = self.get_collation_result() 755 try: 756 analysis = self._active_sampler.analysis_class(sampler=self._active_sampler, **kwargs) 757 return analysis.analyse(collation_result) 758 except NotImplementedError: 759 raise RuntimeError("This sampler does not have a corresponding analysis class") 760 761 def get_last_analysis(self): 762 """Return the output of the most recently run analysis element. 763 """ 764 if self.last_analysis is None: 765 logging.warning("No last analysis output available.") 766 return self.last_analysis 767 768 def __str__(self): 769 """Returns formatted summary of the current Campaign state. 770 Enables class to work with standard print() method 771 """ 772 return (f"db_location = {self.db_location}\n" 773 f"active_sampler_id = {self._active_sampler_id}\n" 774 f"campaign_name = {self.campaign_name}\n" 775 f"campaign_dir = {self.campaign_dir}\n" 776 f"campaign_id = {self.campaign_id}\n") 777 778 def get_active_sampler(self): 779 """Return the active sampler element in use by this campaign. 780 781 Returns 782 ------- 783 The sampler currently in use 784 """ 785 786 return self._active_sampler 787 788 def ignore_runs(self, list_of_run_IDs): 789 """Flags the specified runs to be IGNORED in future collation. Note that 790 this does NOT remove previously collated results from the collation table. 791 For that you must refresh the collation by running recollate(). 792 793 Parameters 794 ---------- 795 list 796 The list of run IDs for the runs that should be set to status IGNORED 797 """ 798 self.campaign_db.set_run_statuses(list_of_run_IDs, Status.IGNORED) 799 800 def rerun(self, list_of_run_IDs): 801 """Sets the status of the specified runs to ENCODED, so that their results 802 may be recollated later (presumably after extending, rerunning or otherwise 803 modifying the data in the relevant run folder). Note that this method will 804 NOT perform any execution - it simply flags the run in EasyVVUQ as being 805 uncollated. Actual execution is (as usual) the job of the user or middleware. 806 807 Parameters 808 ---------- 809 list 810 The list of run IDs for the runs that should be set to status ENCODED 811 """ 812 813 for run_ID in list_of_run_IDs: 814 status = self.campaign_db.get_run_status(run_ID) 815 if status == Status.NEW: 816 msg = (f"Cannot rerun {run_ID} as it has status NEW, and must" 817 f"be encoded before execution.") 818 raise RuntimeError(msg) 819 self.campaign_db.set_run_statuses(list_of_run_IDs, Status.ENCODED) 820 821 def get_active_app(self): 822 """Returns a dict of information regarding the application that is currently 823 set for this campaign. 824 """ 825 return self._active_app
Campaigns organise the dataflow in EasyVVUQ workflows.
The Campaign functions as as state machine for the VVUQ workflows. It uses a database (CampaignDB) to store information on both the target application and the VVUQ algorithms being employed. It also collects data from the simulations and can be used to store and resume your state.
Notes
Multiple campaigns can be combined in a CampaignDB. Hence the particular
campaign we are currently working on will be specified using campaign_id.
Parameters
- name (str): Name of the Campaign. Freely chosen, serves as a human-readable way of distinguishing between several campaigns in the same database.
- params (dict, optional):
Description of the parameters to associated with the application. Will be used to create
an app when creating the campaign. It is also possible to add apps manually using
add_appmethod of the Campaign class. But this can be a useful shorthand when working with single app campaigns. To use this functionality bothparamsandactionshas to be specified. The name of this app will be the same as the name of the Campaign. - actions (Actions, optional):
Actions object associated with an application. See description of the
paramsparameter for more details. - db_location (str, optional): Location of the underlying campaign database - either a path or acceptable URI for SQLAlchemy.
- work_dir (str, optional, default='./'): Path to working directory - used to store campaign directory.
- change_to_state (bool, optional, default=False):
Should we change to the directory containing any specified
state_filein order to make relative paths work. - verify_all_runs (bool, optional, default=True): Check all new runs being added for unrecognised params (not defined for the currently set app), values lying within defined physical range, type checking etc. This should normally always be set to True, but in cases where the performance is too degraded, the checks can be disabled by setting to False.
Attributes
- campaign_name (str or None): Name for the campaign/workflow.
- _campaign_dir (str or None): Path to the directory campaign uses for local storage (runs inputs etc)
- db_location (str or None): Location of the underlying campaign database - either a path or acceptable URI for SQLAlchemy.
- _log (list): The log of all elements that have been applied, with information about their application
- campaign_id (int): ID number for the current campaign in the db.CampaignDB.
- campaign_db (easyvvuq.db.Basedb.CampaignDB): A campaign database object
- last_analysis:: The result of the most recent analysis carried out on this campaign
- _active_app (dict): Info about currently set app
- _active_app_name (str): Name of currently set app
- _active_sampler_id (int): The database id of the currently set Sampler object
Examples
A typical instantiation might look like this.
>>> params = {
"S0": {"type": "float", "default": 997},
"I0": {"type": "float", "default": 3},
"beta": {"type": "float", "default": 0.2},
"gamma": {"type": "float", "default": 0.04, "min": 0.0, "max": 1.0},
"iterations": {"type": "integer", "default": 100},
"outfile": {"type": "string", "default": "output.csv"}
}
>>> encoder = uq.encoders.GenericEncoder(template_fname='sir.template', delimiter='$', target_filename='input.json')
>>> decoder = uq.decoders.SimpleCSV(target_filename='output.csv', output_columns=['I'])
>>> actions = uq.actions.local_execute(encoder, os.path.abspath('sir') + ' input.json', decoder)
>>> campaign = uq.Campaign(name='sir', params=params, actions=actions)
A simplified one (without an app) might look simply like this.
>>> campaign = Campaign('simple')
An app then can be added.
>>> campaign.add_app('simple_app', params=params, actions=actions)
197 def __init__( 198 self, 199 name, 200 params=None, 201 actions=None, 202 db_location=None, 203 work_dir="./", 204 change_to_state=False, 205 verify_all_runs=True 206 ): 207 208 self.work_dir = os.path.realpath(os.path.expanduser(work_dir)) 209 self.verify_all_runs = verify_all_runs 210 211 self.campaign_name = name 212 self._campaign_dir = None 213 214 if db_location is None: 215 self._campaign_dir = tempfile.mkdtemp(prefix=name, dir=self.work_dir) 216 self.db_location = "sqlite:///" + self._campaign_dir + "/campaign.db" 217 else: 218 self.db_location = db_location 219 220 self.campaign_id = None 221 self.campaign_db = None 222 223 self.last_analysis = None 224 225 self._active_app = None 226 self._active_app_name = None 227 self._active_app_actions = None 228 229 self._active_sampler = None 230 self._active_sampler_id = None 231 232 self.init_db(name, self.work_dir) 233 self._state_dir = None 234 235 # here we assume that the user wants to add an app 236 if (params is not None) and (actions is not None): 237 self.add_app(name=name, params=params, actions=actions)
138 @staticmethod 139 def from_existing_data(name, 140 input_files, 141 output_files, 142 input_decoder=None, 143 output_decoder=None, 144 params=None, 145 output_columns=None, 146 work_dir="./", 147 auto_infer=True): 148 """ 149 Create a campaign from existing data files. 150 151 Parameters 152 ---------- 153 name : str 154 Name of the campaign 155 input_files : list of str 156 List of input file paths 157 output_files : list of str 158 List of output file paths 159 input_decoder : Decoder, optional 160 Decoder for input files (auto-created if None) 161 output_decoder : Decoder, optional 162 Decoder for output files (auto-created if None) 163 params : dict, optional 164 Parameter definitions (auto-inferred if None and auto_infer=True) 165 output_columns : list of str, optional 166 Output column names (auto-inferred if None and auto_infer=True) 167 work_dir : str, optional 168 Working directory (default: "./") 169 auto_infer : bool, optional 170 Whether to automatically infer parameters and outputs (default: True) 171 172 Returns 173 ------- 174 Campaign 175 A new campaign with the imported data 176 177 Examples 178 -------- 179 >>> campaign = Campaign.from_existing_data( 180 ... name="imported_sim", 181 ... input_files=["run1/input.json", "run2/input.json"], 182 ... output_files=["run1/output.csv", "run2/output.csv"] 183 ... ) 184 """ 185 from easyvvuq.utils.dataset_importer import create_campaign_from_files 186 187 return create_campaign_from_files( 188 input_files=input_files, 189 output_files=output_files, 190 campaign_name=name, 191 work_dir=work_dir, 192 input_decoder=input_decoder, 193 output_decoder=output_decoder, 194 auto_infer=auto_infer 195 )
Create a campaign from existing data files.
Parameters
- name (str): Name of the campaign
- input_files (list of str): List of input file paths
- output_files (list of str): List of output file paths
- input_decoder (Decoder, optional): Decoder for input files (auto-created if None)
- output_decoder (Decoder, optional): Decoder for output files (auto-created if None)
- params (dict, optional): Parameter definitions (auto-inferred if None and auto_infer=True)
- output_columns (list of str, optional): Output column names (auto-inferred if None and auto_infer=True)
- work_dir (str, optional): Working directory (default: "./")
- auto_infer (bool, optional): Whether to automatically infer parameters and outputs (default: True)
Returns
- Campaign: A new campaign with the imported data
Examples
>>> campaign = Campaign.from_existing_data(
... name="imported_sim",
... input_files=["run1/input.json", "run2/input.json"],
... output_files=["run1/output.csv", "run2/output.csv"]
... )
239 @property 240 def campaign_dir(self): 241 """Get the path in which to load/save files related to the campaign. 242 243 Returns 244 ------- 245 str 246 Path to the campaign directory - given as a subdirectory of the 247 working directory. 248 """ 249 250 return os.path.join(self.work_dir, self._campaign_dir)
Get the path in which to load/save files related to the campaign.
Returns
- str: Path to the campaign directory - given as a subdirectory of the working directory.
252 def init_db(self, name, work_dir='.'): 253 """Initialize the connection with the database and either resume or create the campaign. 254 255 Parameters 256 ---------- 257 name: str 258 Name of the campaign. 259 work_dir: str 260 Work directory, defaults to cwd. 261 """ 262 self.campaign_db = db.CampaignDB(location=self.db_location) 263 if self.campaign_db.campaign_exists(name): 264 self.campaign_id = self.campaign_db.get_campaign_id(name) 265 self._active_app_name = self.campaign_db.get_active_app()[0].name 266 self.campaign_name = name 267 self._campaign_dir = self.campaign_db.campaign_dir(name) 268 if not os.path.exists(self._campaign_dir): 269 message = (f"Campaign directory ({self.campaign_dir}) does not exist.") 270 raise RuntimeError(message) 271 self._active_sampler_id = self.campaign_db.get_sampler_id(self.campaign_id) 272 self._active_sampler = self.campaign_db.resurrect_sampler(self._active_sampler_id) 273 self.set_app(self._active_app_name) 274 self.campaign_db.resume_campaign(name) 275 else: 276 if self._campaign_dir is None: 277 self._campaign_dir = tempfile.mkdtemp(prefix=name, dir=work_dir) 278 info = CampaignInfo( 279 name=name, 280 campaign_dir_prefix=default_campaign_prefix, 281 easyvvuq_version=easyvvuq.__version__, 282 campaign_dir=self._campaign_dir) 283 self.campaign_db.create_campaign(info) 284 self.campaign_name = name 285 self.campaign_id = self.campaign_db.get_campaign_id(self.campaign_name)
Initialize the connection with the database and either resume or create the campaign.
Parameters
- name (str): Name of the campaign.
- work_dir (str): Work directory, defaults to cwd.
287 def add_app(self, name=None, params=None, actions=None, set_active=True): 288 """Add an application to the CampaignDB. 289 290 Parameters 291 ---------- 292 name : str 293 Name of the application. 294 params : dict 295 Description of the parameters to associate with the application. 296 actions : Actions 297 An instance of Actions containing actions to be executed 298 set_active: bool 299 Should the added app be set to be the currently active app? 300 """ 301 # Verify input parameters dict 302 paramsspec = easyvvuq.ParamsSpecification(params, appname=name) 303 # validate application input 304 app = AppInfo( 305 name=name, 306 paramsspec=paramsspec, 307 actions=actions, 308 ) 309 self.campaign_db.add_app(app) 310 if set_active: 311 self.set_app(app.name)
Add an application to the CampaignDB.
Parameters
- name (str): Name of the application.
- params (dict): Description of the parameters to associate with the application.
- actions (Actions): An instance of Actions containing actions to be executed
- set_active (bool): Should the added app be set to be the currently active app?
313 def set_app(self, app_name): 314 """Set active app for the campaign. 315 316 Application information is retrieved from `self.campaign_db`. 317 318 Parameters 319 ---------- 320 app_name: str 321 Name of selected app, if `None` given then first app will be 322 selected. 323 """ 324 self._active_app_name = app_name 325 self._active_app = self.campaign_db.app(name=app_name) 326 self.campaign_db.set_active_app(app_name) 327 # Resurrect the app encoder, decoder and collation elements 328 self._active_app_actions = self.campaign_db.resurrect_app(app_name)
Set active app for the campaign.
Application information is retrieved from self.campaign_db.
Parameters
- app_name (str):
Name of selected app, if
Nonegiven then first app will be selected.
330 def replace_actions(self, app_name, actions): 331 """Replace actions for an app with a given name. 332 333 Parameters 334 ---------- 335 app_name: str 336 Name of the app. 337 actions: Actions 338 `Actions` instance, will replace the current `Actions` of an app. 339 """ 340 self.campaign_db.replace_actions(app_name, actions) 341 self._active_app_actions = actions
Replace actions for an app with a given name.
Parameters
- app_name (str): Name of the app.
- actions (Actions):
Actionsinstance, will replace the currentActionsof an app.
343 def set_sampler(self, sampler, update=False): 344 """Set active sampler. 345 346 Parameters 347 ---------- 348 sampler : Sampler 349 Sampler that will be used to create runs for the current campaign. 350 update : bool 351 If set to True it will not add the sampler to the database, just change 352 it as the active sampler. 353 """ 354 self._active_sampler = sampler 355 if not update: 356 self._active_sampler_id = self.campaign_db.add_sampler(sampler) 357 sampler.sampler_id = self._active_sampler_id 358 self._active_sampler_id = self._active_sampler.sampler_id 359 self.campaign_db.set_sampler(self.campaign_id, self._active_sampler.sampler_id)
Set active sampler.
Parameters
- sampler (Sampler): Sampler that will be used to create runs for the current campaign.
- update (bool): If set to True it will not add the sampler to the database, just change it as the active sampler.
361 def add_external_runs(self, input_files, output_files, input_decoder, output_decoder, 362 validate_params=True, run_prefix="external_run"): 363 """Takes a list of files and adds them to the database. This method is to be 364 used when adding runs to the EasyVVUQ database that were not executed using 365 EasyVVUQ. 366 367 Parameters 368 ---------- 369 input_files: list of str 370 A list of input file paths to be loaded to the database. 371 output_files: list of str 372 A list of output file paths to be loaded to the database. 373 input_decoder: Decoder 374 A decoder that will be used to parse input files. 375 output_decoder: Decoder 376 A decoder that will be used to parse output files. 377 validate_params: bool, optional 378 Whether to validate parameters against the app definition (default: True) 379 run_prefix: str, optional 380 Prefix for run names (default: "external_run") 381 """ 382 if self._active_app is None: 383 msg = ("No app is currently set for this campaign. " 384 "Use set_app('name_of_app') or add_app() first.") 385 logging.error(msg) 386 raise Exception(msg) 387 388 if len(input_files) != len(output_files): 389 raise ValueError("Number of input files must match number of output files") 390 391 inputs = [] 392 outputs = [] 393 failed_runs = [] 394 395 # Parse input files 396 for i, input_file in enumerate(input_files): 397 try: 398 input_decoder.target_filename = os.path.basename(input_file) 399 params = input_decoder.parse_sim_output({'run_dir': os.path.dirname(input_file)}) 400 401 # Validate parameters if requested 402 if validate_params: 403 try: 404 app_default_params = self._active_app["params"] 405 validated_params = app_default_params.process_run(params, verify=self.verify_all_runs) 406 inputs.append(validated_params) 407 except Exception as e: 408 logging.warning(f"Parameter validation failed for {input_file}: {e}") 409 failed_runs.append(i) 410 continue 411 else: 412 inputs.append(params) 413 414 except Exception as e: 415 logging.error(f"Failed to parse input file {input_file}: {e}") 416 failed_runs.append(i) 417 continue 418 419 # Parse output files 420 for i, output_file in enumerate(output_files): 421 if i in failed_runs: 422 continue 423 424 try: 425 output_decoder.target_filename = os.path.basename(output_file) 426 result = output_decoder.parse_sim_output({'run_dir': os.path.dirname(output_file)}) 427 outputs.append(result) 428 except Exception as e: 429 logging.error(f"Failed to parse output file {output_file}: {e}") 430 failed_runs.append(i) 431 continue 432 433 # Add runs to database 434 run_counter = 0 435 for i, (params, result) in enumerate(zip(inputs, outputs)): 436 if i in failed_runs: 437 continue 438 439 run_counter += 1 440 table = db.RunTable(run_name=f'{run_prefix}_{run_counter}', 441 app=self._active_app['id'], 442 params=json.dumps(params), 443 status=Status.COLLATED, 444 run_dir=self.get_campaign_runs_dir(), 445 result=json.dumps(result), 446 campaign=self.campaign_id, 447 sampler=self._active_sampler_id) 448 self.campaign_db.session.add(table) 449 450 # Commit all changes at once 451 self.campaign_db.session.commit() 452 453 logging.info(f"Successfully imported {run_counter} runs") 454 if failed_runs: 455 logging.warning(f"Failed to import {len(failed_runs)} runs due to parsing or validation errors")
Takes a list of files and adds them to the database. This method is to be used when adding runs to the EasyVVUQ database that were not executed using EasyVVUQ.
Parameters
- input_files (list of str): A list of input file paths to be loaded to the database.
- output_files (list of str): A list of output file paths to be loaded to the database.
- input_decoder (Decoder): A decoder that will be used to parse input files.
- output_decoder (Decoder): A decoder that will be used to parse output files.
- validate_params (bool, optional): Whether to validate parameters against the app definition (default: True)
- run_prefix (str, optional): Prefix for run names (default: "external_run")
457 def add_runs(self, runs, mark_invalid=False): 458 """Add runs to the database. 459 460 Parameters 461 ---------- 462 runs : list of dicts 463 Each dict defines the value of each model parameter listed in 464 self.params_info for a run to be added to self.runs 465 mark_invalid : bool 466 Will mark runs that fail verification as invalid (but will not raise an exception) 467 """ 468 if self._active_app is None: 469 msg = ("No app is currently set for this campaign. " 470 "Use set_app('name_of_app').") 471 logging.error(msg) 472 raise Exception(msg) 473 app_default_params = self._active_app["params"] 474 run_info_list = [] 475 for new_run in runs: 476 if new_run is None: 477 msg = ("add_run() was passed new_run of type None. Bad sampler?") 478 logging.error(msg) 479 raise Exception(msg) 480 # Verify and complete run with missing/default param values 481 status = Status.NEW 482 try: 483 new_run = app_default_params.process_run(new_run, verify=self.verify_all_runs) 484 except RuntimeError: 485 if mark_invalid: 486 new_run = app_default_params.process_run(new_run, verify=False) 487 status = Status.INVALID 488 else: 489 raise 490 # Add to run queue 491 run_info = RunInfo(app=self._active_app['id'], 492 params=new_run, 493 sample=self._active_sampler_id, 494 campaign=self.campaign_id, 495 status=status) 496 run_info_list.append(run_info) 497 self.campaign_db.add_runs(run_info_list, iteration=self._active_sampler.iteration)
Add runs to the database.
Parameters
- runs (list of dicts): Each dict defines the value of each model parameter listed in self.params_info for a run to be added to self.runs
- mark_invalid (bool): Will mark runs that fail verification as invalid (but will not raise an exception)
499 def draw_samples(self, num_samples=0, mark_invalid=False): 500 """Draws `num_samples` sets of parameters from the currently set 501 sampler, resulting in `num_samples` new runs added to the 502 runs list. If `num_samples` is 0 (its default value) then 503 this method draws ALL samples from the sampler, until exhaustion (this 504 will fail if the sampler is not finite). 505 506 Parameters 507 ---------- 508 num_samples : int 509 Number of samples to draw from the active sampling element. 510 By default is 0 (draw ALL samples) 511 mark_invalid : bool 512 If True will mark runs that go outside valid parameter range as INVALID. 513 This is useful for MCMC style methods where you want those runs to evaluate 514 to low probabilities. 515 """ 516 # Make sure `num_samples` is not 0 for an infinite generator 517 # (this would add runs forever...) 518 if not self._active_sampler.is_finite() and num_samples <= 0: 519 msg = (f"Sampling_element '{self._active_sampler.element_name()}' " 520 f"is an infinite generator, therefore a finite number of " 521 f"draws (n > 0) must be specified.") 522 raise RuntimeError(msg) 523 num_added = 0 524 new_runs = [] 525 for new_run in self._active_sampler: 526 new_runs.append(new_run) 527 num_added += 1 528 if num_samples != 0 and num_added >= num_samples: 529 break 530 self.add_runs(new_runs, mark_invalid) 531 # Write sampler's new state to database 532 self.campaign_db.update_sampler(self._active_sampler_id, self._active_sampler) 533 return new_runs
Draws num_samples sets of parameters from the currently set
sampler, resulting in num_samples new runs added to the
runs list. If num_samples is 0 (its default value) then
this method draws ALL samples from the sampler, until exhaustion (this
will fail if the sampler is not finite).
Parameters
- num_samples (int): Number of samples to draw from the active sampling element. By default is 0 (draw ALL samples)
- mark_invalid (bool): If True will mark runs that go outside valid parameter range as INVALID. This is useful for MCMC style methods where you want those runs to evaluate to low probabilities.
535 def list_runs(self, sampler=None, campaign=None, app_id=None, status=None): 536 """Get list of runs in the CampaignDB. 537 538 Parameters 539 ---------- 540 sampler: int 541 Sampler id to filter for. 542 campaign: int 543 Campaign id to filter for. 544 app_id: int 545 App id to filter for. 546 status: Status 547 Status to filter for. 548 549 Returns 550 ------- 551 list of runs 552 """ 553 return list(self.campaign_db.runs( 554 sampler=sampler, campaign=campaign, app_id=app_id, status=status))
Get list of runs in the CampaignDB.
Parameters
- sampler (int): Sampler id to filter for.
- campaign (int): Campaign id to filter for.
- app_id (int): App id to filter for.
- status (Status): Status to filter for.
Returns
- list of runs
556 def get_campaign_runs_dir(self): 557 """Get the runs directory from the CampaignDB. 558 559 Returns 560 ------- 561 str 562 Path in which the runs information will be written. 563 """ 564 return self.campaign_db.runs_dir(self.campaign_name)
Get the runs directory from the CampaignDB.
Returns
- str: Path in which the runs information will be written.
566 def relocate(self, campaign_dir): 567 """Relocate the campaign by specifying a new path where campaign is located. 568 569 Parameters 570 ---------- 571 new_path: str 572 new runs directory 573 """ 574 if not os.path.exists(campaign_dir): 575 raise RuntimeError("specified directory does not exist: {}".format(campaign_dir)) 576 self.campaign_db.relocate(campaign_dir, self.campaign_name)
Relocate the campaign by specifying a new path where campaign is located.
Parameters
- new_path (str): new runs directory
578 def execute(self, nsamples=0, pool=None, mark_invalid=False, sequential=False): 579 """This will draw samples and execute the Actions on those samples. 580 581 Parameters 582 ---------- 583 nsamples: int 584 Number of samples to draw. For infinite samplers or when you want to process 585 samples in batches. 586 pool: Executor 587 A pool object to be used when processing runs (e.g. instance of `ThreadPoolExecutor` or 588 `ProcessPoolExecutor`). 589 mark_invalid: bool 590 Mark runs that go outside the specified input parameter range as INVALID. 591 sequential: bool 592 Whether to process samples sequentially (sometimes more efficient or you might 593 want to avoid the concurrent module for some reason). 594 """ 595 self.draw_samples(nsamples, mark_invalid=mark_invalid) 596 action_pool = self.apply_for_each_sample( 597 self._active_app_actions, sequential=sequential) 598 return action_pool.start(pool=pool)
This will draw samples and execute the Actions on those samples.
Parameters
- nsamples (int): Number of samples to draw. For infinite samplers or when you want to process samples in batches.
- pool (Executor):
A pool object to be used when processing runs (e.g. instance of
ThreadPoolExecutororProcessPoolExecutor). - mark_invalid (bool): Mark runs that go outside the specified input parameter range as INVALID.
- sequential (bool): Whether to process samples sequentially (sometimes more efficient or you might want to avoid the concurrent module for some reason).
600 def apply_for_each_sample(self, actions, status=Status.NEW, sequential=False): 601 """For each run in this Campaign's run list, apply the specified action 602 (an object of type Action). 603 604 Parameters 605 ---------- 606 actions: Actions 607 Actions to be applied to each relevant run in the database. 608 status: Status 609 Will apply the Actions only to those runs whose status is as specified. 610 sequential: bool 611 Whether to process samples sequentially (sometimes more efficient or you might 612 want to avoid the concurrent module for some reason). 613 614 Returns 615 ------- 616 ActionPool 617 An object containing ActionStatus instances to track action execution. 618 """ 619 # Loop through all runs in this campaign with status ENCODED, and 620 # run the specified action on each run's dir 621 def inits(): 622 for run_id, run_data in self.campaign_db.runs( 623 status=status, app_id=self._active_app['id']): 624 previous = {} 625 previous['run_id'] = run_id 626 previous['campaign_dir'] = self._campaign_dir 627 previous['rundir'] = run_data['run_dir'] 628 previous['run_info'] = run_data 629 previous['result'] = {} 630 previous['collated'] = False 631 yield previous 632 return ActionPool(self, actions, inits=inits(), sequential=sequential)
For each run in this Campaign's run list, apply the specified action (an object of type Action).
Parameters
- actions (Actions): Actions to be applied to each relevant run in the database.
- status (Status): Will apply the Actions only to those runs whose status is as specified.
- sequential (bool): Whether to process samples sequentially (sometimes more efficient or you might want to avoid the concurrent module for some reason).
Returns
- ActionPool: An object containing ActionStatus instances to track action execution.
634 def iterate(self, nsamples=0, pool=None, mark_invalid=False, sequential=False): 635 """This is the equivalent of `execute` for methods that rely on the output of the 636 previous sampling stage (designed for MCMC, should work for others). 637 638 Parameters 639 ---------- 640 nsamples : int 641 Number of samples to draw (during a single iteration). 642 pool : Executor 643 An Executor instance. For example ThreadPoolExecutor or a Dask Client. Defaults 644 to the ThreadPoolExecutor. 645 mark_invalid : bool 646 Mark runs that go outside the specified input parameter range as INVALID. 647 sequential: bool 648 Will execute the `Actions` associated with runs sequentially. Might be more 649 efficient in some situations. 650 651 Yields 652 ------ 653 ActionPool 654 An object containing Futures instances to track action execution. 655 """ 656 while True: 657 self.draw_samples(nsamples, mark_invalid=mark_invalid) 658 action_pool = self.apply_for_each_sample( 659 self._active_app_actions, sequential=sequential) 660 yield action_pool.start(pool=pool) 661 result = self.get_collation_result(last_iteration=True) 662 invalid = self.get_invalid_runs(last_iteration=True) 663 ignored_runs = self._active_sampler.update(result, invalid) 664 for run_id in ignored_runs: 665 self.campaign_db.session.query(db.RunTable).\ 666 filter(db.RunTable.id == int(run_id)).\ 667 update({'status': easyvvuq.constants.Status.IGNORED}) 668 self.campaign_db.session.commit()
This is the equivalent of execute for methods that rely on the output of the
previous sampling stage (designed for MCMC, should work for others).
Parameters
- nsamples (int): Number of samples to draw (during a single iteration).
- pool (Executor): An Executor instance. For example ThreadPoolExecutor or a Dask Client. Defaults to the ThreadPoolExecutor.
- mark_invalid (bool): Mark runs that go outside the specified input parameter range as INVALID.
- sequential (bool):
Will execute the
Actionsassociated with runs sequentially. Might be more efficient in some situations.
Yields
- ActionPool: An object containing Futures instances to track action execution.
670 def recollate(self): 671 """Clears the current collation table, changes all COLLATED status runs 672 back to ENCODED, then runs collate() again 673 """ 674 collated_run_ids = list(self.campaign_db.run_ids(status=Status.COLLATED)) 675 self.campaign_db.set_run_statuses(collated_run_ids, Status.ENCODED) 676 self.collate()
Clears the current collation table, changes all COLLATED status runs back to ENCODED, then runs collate() again
678 def get_collation_result(self, last_iteration=False): 679 """Return dataframe containing all collated results 680 681 Parameters 682 ---------- 683 last_iteration : bool 684 Will only return the result of the last iteration. 685 686 Returns 687 ------- 688 DataFrame 689 A DataFrame with the simulation results along with the inputs 690 used to produce them. 691 """ 692 if last_iteration: 693 iteration = self._active_sampler.iteration - 1 694 else: 695 iteration = -1 696 return self.campaign_db.get_results( 697 self._active_app['name'], 698 self._active_sampler_id, 699 status=easyvvuq.constants.Status.COLLATED, 700 iteration=iteration)
Return dataframe containing all collated results
Parameters
- last_iteration (bool): Will only return the result of the last iteration.
Returns
- DataFrame: A DataFrame with the simulation results along with the inputs used to produce them.
702 def get_invalid_runs(self, last_iteration=False): 703 """Return dataframe containing all results marked as INVALID. 704 705 Parameters 706 ---------- 707 last_iteration : bool 708 Will only return the result of the last iteration. 709 710 Returns 711 ------- 712 DataFrame 713 A DataFrame with the results form simulations that were marked as INVALID. 714 These will usually be the ones that went outside the specified parameter ranges. 715 These still have to be accounted for in some way by some methods (e.g. MCMC). 716 """ 717 if last_iteration: 718 iteration = self._active_sampler.iteration - 1 719 else: 720 iteration = -1 721 return self.campaign_db.get_results( 722 self._active_app['name'], 723 self._active_sampler_id, 724 status=easyvvuq.constants.Status.INVALID, 725 iteration=iteration)
Return dataframe containing all results marked as INVALID.
Parameters
- last_iteration (bool): Will only return the result of the last iteration.
Returns
- DataFrame: A DataFrame with the results form simulations that were marked as INVALID. These will usually be the ones that went outside the specified parameter ranges. These still have to be accounted for in some way by some methods (e.g. MCMC).
727 def apply_analysis(self, analysis): 728 """Run the `analysis` element on the output of the last run collation. 729 730 Parameters 731 ---------- 732 analysis : Analysis 733 Element that performs a VVUQ analysis on a dataframe summary of 734 run outputs. 735 """ 736 # Apply analysis element to most recent collation result 737 self.last_analysis = analysis.analyse(data_frame=self.get_collation_result())
Run the analysis element on the output of the last run collation.
Parameters
- analysis (Analysis): Element that performs a VVUQ analysis on a dataframe summary of run outputs.
739 def analyse(self, **kwargs): 740 """If available will call an appropriate analysis class on the collation result. 741 742 Parameters 743 ---------- 744 **kwargs : dict 745 Argument to the analysis class constructor (after sampler). 746 747 Returns 748 ------- 749 AnalysisResults 750 An object representing analysis results. Can be used to interact with those results 751 in some way. Plot, retrieve surrogate models and so on. 752 See `easyvvuq.analysis.AnalysisResults` for further information. 753 """ 754 collation_result = self.get_collation_result() 755 try: 756 analysis = self._active_sampler.analysis_class(sampler=self._active_sampler, **kwargs) 757 return analysis.analyse(collation_result) 758 except NotImplementedError: 759 raise RuntimeError("This sampler does not have a corresponding analysis class")
If available will call an appropriate analysis class on the collation result.
Parameters
- **kwargs (dict): Argument to the analysis class constructor (after sampler).
Returns
- AnalysisResults: An object representing analysis results. Can be used to interact with those results
in some way. Plot, retrieve surrogate models and so on.
See
easyvvuq.analysis.AnalysisResultsfor further information.
761 def get_last_analysis(self): 762 """Return the output of the most recently run analysis element. 763 """ 764 if self.last_analysis is None: 765 logging.warning("No last analysis output available.") 766 return self.last_analysis
Return the output of the most recently run analysis element.
778 def get_active_sampler(self): 779 """Return the active sampler element in use by this campaign. 780 781 Returns 782 ------- 783 The sampler currently in use 784 """ 785 786 return self._active_sampler
Return the active sampler element in use by this campaign.
Returns
- The sampler currently in use
788 def ignore_runs(self, list_of_run_IDs): 789 """Flags the specified runs to be IGNORED in future collation. Note that 790 this does NOT remove previously collated results from the collation table. 791 For that you must refresh the collation by running recollate(). 792 793 Parameters 794 ---------- 795 list 796 The list of run IDs for the runs that should be set to status IGNORED 797 """ 798 self.campaign_db.set_run_statuses(list_of_run_IDs, Status.IGNORED)
Flags the specified runs to be IGNORED in future collation. Note that this does NOT remove previously collated results from the collation table. For that you must refresh the collation by running recollate().
Parameters
- list: The list of run IDs for the runs that should be set to status IGNORED
800 def rerun(self, list_of_run_IDs): 801 """Sets the status of the specified runs to ENCODED, so that their results 802 may be recollated later (presumably after extending, rerunning or otherwise 803 modifying the data in the relevant run folder). Note that this method will 804 NOT perform any execution - it simply flags the run in EasyVVUQ as being 805 uncollated. Actual execution is (as usual) the job of the user or middleware. 806 807 Parameters 808 ---------- 809 list 810 The list of run IDs for the runs that should be set to status ENCODED 811 """ 812 813 for run_ID in list_of_run_IDs: 814 status = self.campaign_db.get_run_status(run_ID) 815 if status == Status.NEW: 816 msg = (f"Cannot rerun {run_ID} as it has status NEW, and must" 817 f"be encoded before execution.") 818 raise RuntimeError(msg) 819 self.campaign_db.set_run_statuses(list_of_run_IDs, Status.ENCODED)
Sets the status of the specified runs to ENCODED, so that their results may be recollated later (presumably after extending, rerunning or otherwise modifying the data in the relevant run folder). Note that this method will NOT perform any execution - it simply flags the run in EasyVVUQ as being uncollated. Actual execution is (as usual) the job of the user or middleware.
Parameters
- list: The list of run IDs for the runs that should be set to status ENCODED