easyvvuq.campaign

EasyVVUQ Campaign

This module contains the Campaign class that is used to coordinate all EasyVVUQ workflows.

  1"""EasyVVUQ Campaign
  2
  3This module contains the Campaign class that is used to coordinate all
  4EasyVVUQ workflows.
  5"""
  6import os
  7import json
  8import logging
  9import tempfile
 10import easyvvuq
 11from concurrent.futures import ProcessPoolExecutor
 12from easyvvuq.constants import default_campaign_prefix, Status
 13from easyvvuq.data_structs import RunInfo, CampaignInfo, AppInfo
 14from easyvvuq.sampling import BaseSamplingElement
 15from easyvvuq.actions import ActionPool
 16import easyvvuq.db.sql as db
 17
 18__copyright__ = """
 19
 20    Copyright 2018 Robin A. Richardson, David W. Wright
 21
 22    This file is part of EasyVVUQ
 23
 24    EasyVVUQ is free software: you can redistribute it and/or modify
 25    it under the terms of the Lesser GNU General Public License as published by
 26    the Free Software Foundation, either version 3 of the License, or
 27    (at your option) any later version.
 28
 29    EasyVVUQ is distributed in the hope that it will be useful,
 30    but WITHOUT ANY WARRANTY; without even the implied warranty of
 31    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 32    Lesser GNU General Public License for more details.
 33
 34    You should have received a copy of the Lesser GNU General Public License
 35    along with this program.  If not, see <https://www.gnu.org/licenses/>.
 36
 37"""
 38__license__ = "LGPL"
 39
 40
 41logger = logging.getLogger(__name__)
 42
 43
 44class Campaign:
 45    """Campaigns organise the dataflow in EasyVVUQ workflows.
 46
 47    The Campaign functions as a state machine for the VVUQ workflows. It uses a
 48    database (CampaignDB) to store information on both the target application
 49    and the VVUQ algorithms being employed. It also collects data from the simulations
 50    and can be used to store and resume your state.
 51
 52    Notes
 53    -----
 54
 55    Multiple campaigns can be combined in a CampaignDB. Hence the particular
 56    campaign we are currently working on will be specified using `campaign_id`.
 57
 58    Parameters
 59    ----------
 60    name: str
 61        Name of the Campaign. Freely chosen, serves as a human-readable way of distinguishing
 62        between several campaigns in the same database.
 63    params: dict, optional
 64        Description of the parameters to associate with the application. Will be used to create
 65        an app when creating the campaign. It is also possible to add apps manually using `add_app`
 66        method of the Campaign class. But this can be a useful shorthand when working with single
 67        app campaigns. To use this functionality both `params` and `actions` have to be specified.
 68        The name of this app will be the same as the name of the Campaign.
 69    actions: Actions, optional
 70        Actions object associated with an application. See description of the `params` parameter
 71        for more details.
 72    db_location: str, optional
 73        Location of the underlying campaign database - either a path or
 74        acceptable URI for SQLAlchemy.
 75    work_dir: str, optional, default='./'
 76        Path to working directory - used to store campaign directory.
 77    change_to_state: bool, optional, default=False
 78        Should we change to the directory containing any specified `state_file`
 79        in order to make relative paths work.
 80    verify_all_runs: bool, optional, default=True
 81        Check all new runs being added for unrecognised params (not defined for the currently set
 82        app), values lying within defined physical range, type checking etc. This should normally
 83        always be set to True, but in cases where the performance is too degraded, the checks can
 84        be disabled by setting to False.
 85
 86    Attributes
 87    ----------
 88    campaign_name : str or None
 89        Name for the campaign/workflow.
 90    _campaign_dir: str or None
 91        Path to the directory campaign uses for local storage (runs inputs etc)
 92    db_location : str or None
 93        Location of the underlying campaign database - either a path or
 94        acceptable URI for SQLAlchemy.
 95    _log: list
 96        The log of all elements that have been applied, with information about
 97        their application
 98    campaign_id : int
 99        ID number for the current campaign in the db.CampaignDB.
100    campaign_db: easyvvuq.db.Basedb.CampaignDB
101        A campaign database object
102    last_analysis:
103        The result of the most recent analysis carried out on this campaign
104    _active_app: dict
105        Info about currently set app
106    _active_app_name: str
107        Name of currently set app
108    _active_sampler_id: int
109        The database id of the currently set Sampler object
110
111    Examples
112    --------
113    A typical instantiation might look like this.
114
115    >>> params = {
116            "S0": {"type": "float", "default": 997},
117            "I0": {"type": "float", "default": 3},
118            "beta": {"type": "float", "default": 0.2},
119            "gamma": {"type": "float", "default": 0.04, "min": 0.0, "max": 1.0},
120            "iterations": {"type": "integer", "default": 100},
121            "outfile": {"type": "string", "default": "output.csv"}
122        }
123    >>> encoder = uq.encoders.GenericEncoder(template_fname='sir.template', delimiter='$', target_filename='input.json')
124    >>> decoder = uq.decoders.SimpleCSV(target_filename='output.csv', output_columns=['I'])
125    >>> actions = uq.actions.local_execute(encoder, os.path.abspath('sir') + ' input.json', decoder)
126    >>> campaign = uq.Campaign(name='sir', params=params, actions=actions)
127
128    A simplified one (without an app) might look simply like this.
129
130    >>> campaign = Campaign('simple')
131
132    An app then can be added.
133
134    >>> campaign.add_app('simple_app', params=params, actions=actions)
135    """
136
137    @staticmethod
138    def from_existing_data(name, 
139                          input_files, 
140                          output_files, 
141                          input_decoder=None, 
142                          output_decoder=None,
143                          params=None,
144                          output_columns=None,
145                          work_dir="./",
146                          auto_infer=True):
147        """
148        Create a campaign from existing data files.
149        
150        Parameters
151        ----------
152        name : str
153            Name of the campaign
154        input_files : list of str
155            List of input file paths
156        output_files : list of str
157            List of output file paths
158        input_decoder : Decoder, optional
159            Decoder for input files (auto-created if None)
160        output_decoder : Decoder, optional
161            Decoder for output files (auto-created if None)
162        params : dict, optional
163            Parameter definitions (auto-inferred if None and auto_infer=True)
164        output_columns : list of str, optional
165            Output column names (auto-inferred if None and auto_infer=True)
166        work_dir : str, optional
167            Working directory (default: "./")
168        auto_infer : bool, optional
169            Whether to automatically infer parameters and outputs (default: True)
170        
171        Returns
172        -------
173        Campaign
174            A new campaign with the imported data
175        
176        Examples
177        --------
178        >>> campaign = Campaign.from_existing_data(
179        ...     name="imported_sim",
180        ...     input_files=["run1/input.json", "run2/input.json"],
181        ...     output_files=["run1/output.csv", "run2/output.csv"]
182        ... )
183        """
184        from easyvvuq.utils.dataset_importer import create_campaign_from_files
185        
186        return create_campaign_from_files(
187            input_files=input_files,
188            output_files=output_files,
189            campaign_name=name,
190            work_dir=work_dir,
191            input_decoder=input_decoder,
192            output_decoder=output_decoder,
193            auto_infer=auto_infer
194        )
195
196    def __init__(
197            self,
198            name,
199            params=None,
200            actions=None,
201            db_location=None,
202            work_dir="./",
203            change_to_state=False,
204            verify_all_runs=True
205    ):
206
207        self.work_dir = os.path.realpath(os.path.expanduser(work_dir))
208        self.verify_all_runs = verify_all_runs
209
210        self.campaign_name = name
211        self._campaign_dir = None
212
213        if db_location is None:
214            self._campaign_dir = tempfile.mkdtemp(prefix=name, dir=self.work_dir)
215            self.db_location = "sqlite:///" + self._campaign_dir + "/campaign.db"
216        else:
217            self.db_location = db_location
218
219        self.campaign_id = None
220        self.campaign_db = None
221
222        self.last_analysis = None
223
224        self._active_app = None
225        self._active_app_name = None
226        self._active_app_actions = None
227
228        self._active_sampler = None
229        self._active_sampler_id = None
230
231        self.init_db(name, self.work_dir)
232        self._state_dir = None
233
234        # here we assume that the user wants to add an app
235        if (params is not None) and (actions is not None):
236            self.add_app(name=name, params=params, actions=actions)
237
238    @property
239    def campaign_dir(self):
240        """Get the path in which to load/save files related to the campaign.
241
242        Returns
243        -------
244        str
245            Path to the campaign directory - given as a subdirectory of the
246            working directory.
247        """
248
249        return os.path.join(self.work_dir, self._campaign_dir)
250
251    def init_db(self, name, work_dir='.'):
252        """Initialize the connection with the database and either resume or create the campaign.
253
254        Parameters
255        ----------
256        name: str
257            Name of the campaign.
258        work_dir: str
259            Work directory, defaults to cwd.
260        """
261        self.campaign_db = db.CampaignDB(location=self.db_location)
262        if self.campaign_db.campaign_exists(name):
263            self.campaign_id = self.campaign_db.get_campaign_id(name)
264            self._active_app_name = self.campaign_db.get_active_app()[0].name
265            self.campaign_name = name
266            self._campaign_dir = self.campaign_db.campaign_dir(name)
267            if not os.path.exists(self._campaign_dir):
268                message = (f"Campaign directory ({self.campaign_dir}) does not exist.")
269                raise RuntimeError(message)
270            self._active_sampler_id = self.campaign_db.get_sampler_id(self.campaign_id)
271            self._active_sampler = self.campaign_db.resurrect_sampler(self._active_sampler_id)
272            self.set_app(self._active_app_name)
273            self.campaign_db.resume_campaign(name)
274        else:
275            if self._campaign_dir is None:
276                self._campaign_dir = tempfile.mkdtemp(prefix=name, dir=work_dir)
277            info = CampaignInfo(
278                name=name,
279                campaign_dir_prefix=default_campaign_prefix,
280                easyvvuq_version=easyvvuq.__version__,
281                campaign_dir=self._campaign_dir)
282            self.campaign_db.create_campaign(info)
283            self.campaign_name = name
284            self.campaign_id = self.campaign_db.get_campaign_id(self.campaign_name)
285
286    def add_app(self, name=None, params=None, actions=None, set_active=True):
287        """Add an application to the CampaignDB.
288
289        Parameters
290        ----------
291        name : str
292            Name of the application.
293        params : dict
294            Description of the parameters to associate with the application.
295        actions : Actions
296            An instance of Actions containing actions to be executed
297        set_active: bool
298            Should the added app be set to be the currently active app?
299        """
300        # Verify input parameters dict
301        paramsspec = easyvvuq.ParamsSpecification(params, appname=name)
302        # validate application input
303        app = AppInfo(
304            name=name,
305            paramsspec=paramsspec,
306            actions=actions,
307        )
308        self.campaign_db.add_app(app)
309        if set_active:
310            self.set_app(app.name)
311
312    def set_app(self, app_name):
313        """Set active app for the campaign.
314
315        Application information is retrieved from `self.campaign_db`.
316
317        Parameters
318        ----------
319        app_name: str
320            Name of selected app, if `None` given then first app will be
321            selected.
322        """
323        self._active_app_name = app_name
324        self._active_app = self.campaign_db.app(name=app_name)
325        self.campaign_db.set_active_app(app_name)
326        # Resurrect the app encoder, decoder and collation elements
327        self._active_app_actions = self.campaign_db.resurrect_app(app_name)
328
329    def replace_actions(self, app_name, actions):
330        """Replace actions for an app with a given name.
331
332        Parameters
333        ----------
334        app_name: str
335            Name of the app.
336        actions: Actions
337            `Actions` instance, will replace the current `Actions` of an app.
338        """
339        self.campaign_db.replace_actions(app_name, actions)
340        self._active_app_actions = actions
341
342    def set_sampler(self, sampler, update=False):
343        """Set active sampler.
344
345        Parameters
346        ----------
347        sampler : Sampler
348            Sampler that will be used to create runs for the current campaign.
349        update : bool
350            If set to True it will not add the sampler to the database, just change
351            it as the active sampler.
352        """
353        self._active_sampler = sampler
354        if not update:
355            self._active_sampler_id = self.campaign_db.add_sampler(sampler)
356            sampler.sampler_id = self._active_sampler_id
357        self._active_sampler_id = self._active_sampler.sampler_id
358        self.campaign_db.set_sampler(self.campaign_id, self._active_sampler.sampler_id)
359
360    def add_external_runs(self, input_files, output_files, input_decoder, output_decoder,
361                         validate_params=True, run_prefix="external_run"):
362        """Takes a list of files and adds them to the database. This method is to be
363        used when adding runs to the EasyVVUQ database that were not executed using
364        EasyVVUQ.
365
366        Parameters
367        ----------
368        input_files: list of str
369            A list of input file paths to be loaded to the database.
370        output_files: list of str
371            A list of output file paths to be loaded to the database.
372        input_decoder: Decoder
373            A decoder that will be used to parse input files.
374        output_decoder: Decoder
375            A decoder that will be used to parse output files.
376        validate_params: bool, optional
377            Whether to validate parameters against the app definition (default: True)
378        run_prefix: str, optional
379            Prefix for run names (default: "external_run")
380        """
381        if self._active_app is None:
382            msg = ("No app is currently set for this campaign. "
383                   "Use set_app('name_of_app') or add_app() first.")
384            logging.error(msg)
385            raise Exception(msg)
386        
387        if len(input_files) != len(output_files):
388            raise ValueError("Number of input files must match number of output files")
389        
390        inputs = []
391        outputs = []
392        failed_runs = []
393        
394        # Parse input files
395        for i, input_file in enumerate(input_files):
396            try:
397                input_decoder.target_filename = os.path.basename(input_file)
398                params = input_decoder.parse_sim_output({'run_dir': os.path.dirname(input_file)})
399                
400                # Validate parameters if requested
401                if validate_params:
402                    try:
403                        app_default_params = self._active_app["params"]
404                        validated_params = app_default_params.process_run(params, verify=self.verify_all_runs)
405                        inputs.append(validated_params)
406                    except Exception as e:
407                        logging.warning(f"Parameter validation failed for {input_file}: {e}")
408                        failed_runs.append(i)
409                        continue
410                else:
411                    inputs.append(params)
412                    
413            except Exception as e:
414                logging.error(f"Failed to parse input file {input_file}: {e}")
415                failed_runs.append(i)
416                continue
417        
418        # Parse output files
419        for i, output_file in enumerate(output_files):
420            if i in failed_runs:
421                continue
422                
423            try:
424                output_decoder.target_filename = os.path.basename(output_file)
425                result = output_decoder.parse_sim_output({'run_dir': os.path.dirname(output_file)})
426                outputs.append(result)
427            except Exception as e:
428                logging.error(f"Failed to parse output file {output_file}: {e}")
429                failed_runs.append(i)
430                continue
431        
432        # Add runs to database
433        run_counter = 0
434        for i, (params, result) in enumerate(zip(inputs, outputs)):
435            if i in failed_runs:
436                continue
437                
438            run_counter += 1
439            table = db.RunTable(run_name=f'{run_prefix}_{run_counter}',
440                                app=self._active_app['id'],
441                                params=json.dumps(params),
442                                status=Status.COLLATED,
443                                run_dir=self.get_campaign_runs_dir(),
444                                result=json.dumps(result),
445                                campaign=self.campaign_id,
446                                sampler=self._active_sampler_id)
447            self.campaign_db.session.add(table)
448        
449        # Commit all changes at once
450        self.campaign_db.session.commit()
451        
452        logging.info(f"Successfully imported {run_counter} runs")
453        if failed_runs:
454            logging.warning(f"Failed to import {len(failed_runs)} runs due to parsing or validation errors")
455
456    def add_runs(self, runs, mark_invalid=False):
457        """Add runs to the database.
458
459        Parameters
460        ----------
461        runs : list of dicts
462            Each dict defines the value of each model parameter listed in
463            self.params_info for a run to be added to self.runs
464        mark_invalid : bool
465            Will mark runs that fail verification as invalid (but will not raise an exception)
466        """
467        if self._active_app is None:
468            msg = ("No app is currently set for this campaign. "
469                   "Use set_app('name_of_app').")
470            logging.error(msg)
471            raise Exception(msg)
472        app_default_params = self._active_app["params"]
473        run_info_list = []
474        for new_run in runs:
475            if new_run is None:
476                msg = ("add_run() was passed new_run of type None. Bad sampler?")
477                logging.error(msg)
478                raise Exception(msg)
479            # Verify and complete run with missing/default param values
480            status = Status.NEW
481            try:
482                new_run = app_default_params.process_run(new_run, verify=self.verify_all_runs)
483            except RuntimeError:
484                if mark_invalid:
485                    new_run = app_default_params.process_run(new_run, verify=False)
486                    status = Status.INVALID
487                else:
488                    raise
489            # Add to run queue
490            run_info = RunInfo(app=self._active_app['id'],
491                               params=new_run,
492                               sample=self._active_sampler_id,
493                               campaign=self.campaign_id,
494                               status=status)
495            run_info_list.append(run_info)
496        self.campaign_db.add_runs(run_info_list, iteration=self._active_sampler.iteration)
497
498    def draw_samples(self, num_samples=0, mark_invalid=False):
499        """Draws `num_samples` sets of parameters from the currently set
500        sampler, resulting in `num_samples` new runs added to the
501        runs list. If `num_samples` is 0 (its default value) then
502        this method draws ALL samples from the sampler, until exhaustion (this
503        will fail if the sampler is not finite).
504
505        Parameters
506        ----------
507        num_samples : int
508            Number of samples to draw from the active sampling element.
509            By default is 0 (draw ALL samples)
510        mark_invalid : bool
511            If True will mark runs that go outside valid parameter range as INVALID.
512            This is useful for MCMC style methods where you want those runs to evaluate
513            to low probabilities.
514        """
515        # Make sure `num_samples` is not 0 for an infinite generator
516        # (this would add runs forever...)
517        if not self._active_sampler.is_finite() and num_samples <= 0:
518            msg = (f"Sampling_element '{self._active_sampler.element_name()}' "
519                   f"is an infinite generator, therefore a finite number of "
520                   f"draws (n > 0) must be specified.")
521            raise RuntimeError(msg)
522        num_added = 0
523        new_runs = []
524        for new_run in self._active_sampler:
525            new_runs.append(new_run)
526            num_added += 1
527            if num_samples != 0 and num_added >= num_samples:
528                break
529        self.add_runs(new_runs, mark_invalid)
530        # Write sampler's new state to database
531        self.campaign_db.update_sampler(self._active_sampler_id, self._active_sampler)
532        return new_runs
533
534    def list_runs(self, sampler=None, campaign=None, app_id=None, status=None):
535        """Get list of runs in the CampaignDB.
536
537        Parameters
538        ----------
539        sampler: int
540            Sampler id to filter for.
541        campaign: int
542            Campaign id to filter for.
543        app_id: int
544            App id to filter for.
545        status: Status
546            Status to filter for.
547
548        Returns
549        -------
550        list of runs
551        """
552        return list(self.campaign_db.runs(
553            sampler=sampler, campaign=campaign, app_id=app_id, status=status))
554
555    def get_campaign_runs_dir(self):
556        """Get the runs directory from the CampaignDB.
557
558        Returns
559        -------
560        str
561            Path in which the runs information will be written.
562        """
563        return self.campaign_db.runs_dir(self.campaign_name)
564
565    def relocate(self, campaign_dir):
566        """Relocate the campaign by specifying a new path where campaign is located.
567
568        Parameters
569        ----------
570        new_path: str
571            new runs directory
572        """
573        if not os.path.exists(campaign_dir):
574            raise RuntimeError("specified directory does not exist: {}".format(campaign_dir))
575        self.campaign_db.relocate(campaign_dir, self.campaign_name)
576
577    def execute(self, nsamples=0, pool=None, mark_invalid=False, sequential=False):
578        """This will draw samples and execute the Actions on those samples.
579
580        Parameters
581        ----------
582        nsamples: int
583            Number of samples to draw. For infinite samplers or when you want to process
584            samples in batches.
585        pool: Executor
586            A pool object to be used when processing runs (e.g. instance of `ThreadPoolExecutor` or
587            `ProcessPoolExecutor`).
588        mark_invalid: bool
589            Mark runs that go outside the specified input parameter range as INVALID.
590        sequential: bool
591            Whether to process samples sequentially (sometimes more efficient or you might
592            want to avoid the concurrent module for some reason).
593        """
594        self.draw_samples(nsamples, mark_invalid=mark_invalid)
595        action_pool = self.apply_for_each_sample(
596            self._active_app_actions, sequential=sequential)
597        return action_pool.start(pool=pool)
598
599    def apply_for_each_sample(self, actions, status=Status.NEW, sequential=False):
600        """For each run in this Campaign's run list, apply the specified action
601        (an object of type Action).
602
603        Parameters
604        ----------
605        actions: Actions
606            Actions to be applied to each relevant run in the database.
607        status: Status
608            Will apply the Actions only to those runs whose status is as specified.
609        sequential: bool
610            Whether to process samples sequentially (sometimes more efficient or you might
611            want to avoid the concurrent module for some reason).
612
613        Returns
614        -------
615        ActionPool
616            An object containing ActionStatus instances to track action execution.
617        """
618        # Loop through all runs in this campaign with status ENCODED, and
619        # run the specified action on each run's dir
620        def inits():
621            for run_id, run_data in self.campaign_db.runs(
622                    status=status, app_id=self._active_app['id']):
623                previous = {}
624                previous['run_id'] = run_id
625                previous['campaign_dir'] = self._campaign_dir
626                previous['rundir'] = run_data['run_dir']
627                previous['run_info'] = run_data
628                previous['result'] = {}
629                previous['collated'] = False
630                yield previous
631        return ActionPool(self, actions, inits=inits(), sequential=sequential)
632
633    def iterate(self, nsamples=0, pool=None, mark_invalid=False, sequential=False):
634        """This is the equivalent of `execute` for methods that rely on the output of the
635        previous sampling stage (designed for MCMC, should work for others).
636
637        Parameters
638        ----------
639        nsamples : int
640            Number of samples to draw (during a single iteration).
641        pool : Executor
642            An Executor instance. For example ThreadPoolExecutor or a Dask Client. Defaults
643            to the ThreadPoolExecutor.
644        mark_invalid : bool
645            Mark runs that go outside the specified input parameter range as INVALID.
646        sequential: bool
647            Will execute the `Actions` associated with runs sequentially. Might be more
648            efficient in some situations.
649
650        Yields
651        ------
652        ActionPool
653            An object containing Futures instances to track action execution.
654        """
655        while True:
656            self.draw_samples(nsamples, mark_invalid=mark_invalid)
657            action_pool = self.apply_for_each_sample(
658                self._active_app_actions, sequential=sequential)
659            yield action_pool.start(pool=pool)
660            result = self.get_collation_result(last_iteration=True)
661            invalid = self.get_invalid_runs(last_iteration=True)
662            ignored_runs = self._active_sampler.update(result, invalid)
663            for run_id in ignored_runs:
664                self.campaign_db.session.query(db.RunTable).\
665                    filter(db.RunTable.id == int(run_id)).\
666                    update({'status': easyvvuq.constants.Status.IGNORED})
667            self.campaign_db.session.commit()
668
    def recollate(self):
        """Reset every COLLATED run to ENCODED and then call `self.collate()`.

        NOTE(review): no `collate` method is defined in the visible part of
        this class, so the final call looks like it would raise
        `AttributeError` unless `collate` is provided elsewhere - confirm.
        """
        # Materialise the ids first, before any status is changed.
        collated_run_ids = list(self.campaign_db.run_ids(status=Status.COLLATED))
        self.campaign_db.set_run_statuses(collated_run_ids, Status.ENCODED)
        self.collate()
676
677    def get_collation_result(self, last_iteration=False):
678        """Return dataframe containing all collated results
679
680        Parameters
681        ----------
682        last_iteration : bool
683            Will only return the result of the last iteration.
684
685        Returns
686        -------
687        DataFrame
688            A DataFrame with the simulation results along with the inputs
689            used to produce them.
690        """
691        if last_iteration:
692            iteration = self._active_sampler.iteration - 1
693        else:
694            iteration = -1
695        return self.campaign_db.get_results(
696            self._active_app['name'],
697            self._active_sampler_id,
698            status=easyvvuq.constants.Status.COLLATED,
699            iteration=iteration)
700
701    def get_invalid_runs(self, last_iteration=False):
702        """Return dataframe containing all results marked as INVALID.
703
704        Parameters
705        ----------
706        last_iteration : bool
707            Will only return the result of the last iteration.
708
709        Returns
710        -------
711        DataFrame
712            A DataFrame with the results form simulations that were marked as INVALID.
713            These will usually be the ones that went outside the specified parameter ranges.
714            These still have to be accounted for in some way by some methods (e.g. MCMC).
715        """
716        if last_iteration:
717            iteration = self._active_sampler.iteration - 1
718        else:
719            iteration = -1
720        return self.campaign_db.get_results(
721            self._active_app['name'],
722            self._active_sampler_id,
723            status=easyvvuq.constants.Status.INVALID,
724            iteration=iteration)
725
726    def apply_analysis(self, analysis):
727        """Run the `analysis` element on the output of the last run collation.
728
729        Parameters
730        ----------
731        analysis : Analysis
732            Element that performs a VVUQ analysis on a dataframe summary of
733            run outputs.
734        """
735        # Apply analysis element to most recent collation result
736        self.last_analysis = analysis.analyse(data_frame=self.get_collation_result())
737
738    def analyse(self, **kwargs):
739        """If available will call an appropriate analysis class on the collation result.
740
741        Parameters
742        ----------
743        **kwargs : dict
744            Argument to the analysis class constructor (after sampler).
745
746        Returns
747        -------
748        AnalysisResults
749            An object representing analysis results. Can be used to interact with those results
750            in some way. Plot, retrieve surrogate models and so on.
751            See `easyvvuq.analysis.AnalysisResults` for further information.
752        """
753        collation_result = self.get_collation_result()
754        try:
755            analysis = self._active_sampler.analysis_class(sampler=self._active_sampler, **kwargs)
756            return analysis.analyse(collation_result)
757        except NotImplementedError:
758            raise RuntimeError("This sampler does not have a corresponding analysis class")
759
760    def get_last_analysis(self):
761        """Return the output of the most recently run analysis element.
762        """
763        if self.last_analysis is None:
764            logging.warning("No last analysis output available.")
765        return self.last_analysis
766
767    def __str__(self):
768        """Returns formatted summary of the current Campaign state.
769        Enables class to work with standard print() method
770        """
771        return (f"db_location = {self.db_location}\n"
772                f"active_sampler_id = {self._active_sampler_id}\n"
773                f"campaign_name = {self.campaign_name}\n"
774                f"campaign_dir = {self.campaign_dir}\n"
775                f"campaign_id = {self.campaign_id}\n")
776
777    def get_active_sampler(self):
778        """Return the active sampler element in use by this campaign.
779
780        Returns
781        -------
782        The sampler currently in use
783        """
784
785        return self._active_sampler
786
787    def ignore_runs(self, list_of_run_IDs):
788        """Flags the specified runs to be IGNORED in future collation. Note that
789        this does NOT remove previously collated results from the collation table.
790        For that you must refresh the collation by running recollate().
791
792        Parameters
793        ----------
794        list
795            The list of run IDs for the runs that should be set to status IGNORED
796        """
797        self.campaign_db.set_run_statuses(list_of_run_IDs, Status.IGNORED)
798
799    def rerun(self, list_of_run_IDs):
800        """Sets the status of the specified runs to ENCODED, so that their results
801        may be recollated later (presumably after extending, rerunning or otherwise
802        modifying the data in the relevant run folder). Note that this method will
803        NOT perform any execution - it simply flags the run in EasyVVUQ as being
804        uncollated. Actual execution is (as usual) the job of the user or middleware.
805
806        Parameters
807        ----------
808        list
809            The list of run IDs for the runs that should be set to status ENCODED
810        """
811
812        for run_ID in list_of_run_IDs:
813            status = self.campaign_db.get_run_status(run_ID)
814            if status == Status.NEW:
815                msg = (f"Cannot rerun {run_ID} as it has status NEW, and must"
816                       f"be encoded before execution.")
817                raise RuntimeError(msg)
818        self.campaign_db.set_run_statuses(list_of_run_IDs, Status.ENCODED)
819
820    def get_active_app(self):
821        """Returns a dict of information regarding the application that is currently
822        set for this campaign.
823        """
824        return self._active_app
logger = <Logger easyvvuq.campaign (DEBUG)>
class Campaign:
 45class Campaign:
 46    """Campaigns organise the dataflow in EasyVVUQ workflows.
 47
    The Campaign functions as a state machine for the VVUQ workflows. It uses a
 49    database (CampaignDB) to store information on both the target application
 50    and the VVUQ algorithms being employed. It also collects data from the simulations
 51    and can be used to store and resume your state.
 52
 53    Notes
 54    -----
 55
 56    Multiple campaigns can be combined in a CampaignDB. Hence the particular
 57    campaign we are currently working on will be specified using `campaign_id`.
 58
 59    Parameters
 60    ----------
 61    name: str
 62        Name of the Campaign. Freely chosen, serves as a human-readable way of distinguishing
 63        between several campaigns in the same database.
 64    params: dict, optional
 65        Description of the parameters to associated with the application. Will be used to create
 66        an app when creating the campaign. It is also possible to add apps manually using `add_app`
 67        method of the Campaign class. But this can be a useful shorthand when working with single
 68        app campaigns. To use this functionality both `params` and `actions` has to be specified.
 69        The name of this app will be the same as the name of the Campaign.
 70    actions: Actions, optional
 71        Actions object associated with an application. See description of the `params` parameter
 72        for more details.
 73    db_location: str, optional
 74        Location of the underlying campaign database - either a path or
 75        acceptable URI for SQLAlchemy.
 76    work_dir: str, optional, default='./'
 77        Path to working directory - used to store campaign directory.
 78    change_to_state: bool, optional, default=False
 79        Should we change to the directory containing any specified `state_file`
 80        in order to make relative paths work.
 81    verify_all_runs: bool, optional, default=True
 82        Check all new runs being added for unrecognised params (not defined for the currently set
 83        app), values lying within defined physical range, type checking etc. This should normally
 84        always be set to True, but in cases where the performance is too degraded, the checks can
 85        be disabled by setting to False.
 86
 87    Attributes
 88    ----------
 89    campaign_name : str or None
 90        Name for the campaign/workflow.
 91    _campaign_dir: str or None
 92        Path to the directory campaign uses for local storage (runs inputs etc)
 93    db_location : str or None
 94        Location of the underlying campaign database - either a path or
 95        acceptable URI for SQLAlchemy.
 96    _log: list
 97        The log of all elements that have been applied, with information about
 98        their application
 99    campaign_id : int
100        ID number for the current campaign in the db.CampaignDB.
101    campaign_db: easyvvuq.db.Basedb.CampaignDB
102        A campaign database object
103    last_analysis:
104        The result of the most recent analysis carried out on this campaign
105    _active_app: dict
106        Info about currently set app
107    _active_app_name: str
108        Name of currently set app
109    _active_sampler_id: int
110        The database id of the currently set Sampler object
111
112    Examples
113    --------
114    A typical instantiation might look like this.
115
116    >>> params = {
117            "S0": {"type": "float", "default": 997},
118            "I0": {"type": "float", "default": 3},
119            "beta": {"type": "float", "default": 0.2},
120            "gamma": {"type": "float", "default": 0.04, "min": 0.0, "max": 1.0},
121            "iterations": {"type": "integer", "default": 100},
122            "outfile": {"type": "string", "default": "output.csv"}
123        }
124    >>> encoder = uq.encoders.GenericEncoder(template_fname='sir.template', delimiter='$', target_filename='input.json')
125    >>> decoder = uq.decoders.SimpleCSV(target_filename='output.csv', output_columns=['I'])
126    >>> actions = uq.actions.local_execute(encoder, os.path.abspath('sir') + ' input.json', decoder)
127    >>> campaign = uq.Campaign(name='sir', params=params, actions=actions)
128
129    A simplified one (without an app) might look simply like this.
130
131    >>> campaign = Campaign('simple')
132
133    An app then can be added.
134
135    >>> campaign.add_app('simple_app', params=params, actions=actions)
136    """
137
138    @staticmethod
139    def from_existing_data(name, 
140                          input_files, 
141                          output_files, 
142                          input_decoder=None, 
143                          output_decoder=None,
144                          params=None,
145                          output_columns=None,
146                          work_dir="./",
147                          auto_infer=True):
148        """
149        Create a campaign from existing data files.
150        
151        Parameters
152        ----------
153        name : str
154            Name of the campaign
155        input_files : list of str
156            List of input file paths
157        output_files : list of str
158            List of output file paths
159        input_decoder : Decoder, optional
160            Decoder for input files (auto-created if None)
161        output_decoder : Decoder, optional
162            Decoder for output files (auto-created if None)
163        params : dict, optional
164            Parameter definitions (auto-inferred if None and auto_infer=True)
165        output_columns : list of str, optional
166            Output column names (auto-inferred if None and auto_infer=True)
167        work_dir : str, optional
168            Working directory (default: "./")
169        auto_infer : bool, optional
170            Whether to automatically infer parameters and outputs (default: True)
171        
172        Returns
173        -------
174        Campaign
175            A new campaign with the imported data
176        
177        Examples
178        --------
179        >>> campaign = Campaign.from_existing_data(
180        ...     name="imported_sim",
181        ...     input_files=["run1/input.json", "run2/input.json"],
182        ...     output_files=["run1/output.csv", "run2/output.csv"]
183        ... )
184        """
185        from easyvvuq.utils.dataset_importer import create_campaign_from_files
186        
187        return create_campaign_from_files(
188            input_files=input_files,
189            output_files=output_files,
190            campaign_name=name,
191            work_dir=work_dir,
192            input_decoder=input_decoder,
193            output_decoder=output_decoder,
194            auto_infer=auto_infer
195        )
196
197    def __init__(
198            self,
199            name,
200            params=None,
201            actions=None,
202            db_location=None,
203            work_dir="./",
204            change_to_state=False,
205            verify_all_runs=True
206    ):
207
208        self.work_dir = os.path.realpath(os.path.expanduser(work_dir))
209        self.verify_all_runs = verify_all_runs
210
211        self.campaign_name = name
212        self._campaign_dir = None
213
214        if db_location is None:
215            self._campaign_dir = tempfile.mkdtemp(prefix=name, dir=self.work_dir)
216            self.db_location = "sqlite:///" + self._campaign_dir + "/campaign.db"
217        else:
218            self.db_location = db_location
219
220        self.campaign_id = None
221        self.campaign_db = None
222
223        self.last_analysis = None
224
225        self._active_app = None
226        self._active_app_name = None
227        self._active_app_actions = None
228
229        self._active_sampler = None
230        self._active_sampler_id = None
231
232        self.init_db(name, self.work_dir)
233        self._state_dir = None
234
235        # here we assume that the user wants to add an app
236        if (params is not None) and (actions is not None):
237            self.add_app(name=name, params=params, actions=actions)
238
239    @property
240    def campaign_dir(self):
241        """Get the path in which to load/save files related to the campaign.
242
243        Returns
244        -------
245        str
246            Path to the campaign directory - given as a subdirectory of the
247            working directory.
248        """
249
250        return os.path.join(self.work_dir, self._campaign_dir)
251
252    def init_db(self, name, work_dir='.'):
253        """Initialize the connection with the database and either resume or create the campaign.
254
255        Parameters
256        ----------
257        name: str
258            Name of the campaign.
259        work_dir: str
260            Work directory, defaults to cwd.
261        """
262        self.campaign_db = db.CampaignDB(location=self.db_location)
263        if self.campaign_db.campaign_exists(name):
264            self.campaign_id = self.campaign_db.get_campaign_id(name)
265            self._active_app_name = self.campaign_db.get_active_app()[0].name
266            self.campaign_name = name
267            self._campaign_dir = self.campaign_db.campaign_dir(name)
268            if not os.path.exists(self._campaign_dir):
269                message = (f"Campaign directory ({self.campaign_dir}) does not exist.")
270                raise RuntimeError(message)
271            self._active_sampler_id = self.campaign_db.get_sampler_id(self.campaign_id)
272            self._active_sampler = self.campaign_db.resurrect_sampler(self._active_sampler_id)
273            self.set_app(self._active_app_name)
274            self.campaign_db.resume_campaign(name)
275        else:
276            if self._campaign_dir is None:
277                self._campaign_dir = tempfile.mkdtemp(prefix=name, dir=work_dir)
278            info = CampaignInfo(
279                name=name,
280                campaign_dir_prefix=default_campaign_prefix,
281                easyvvuq_version=easyvvuq.__version__,
282                campaign_dir=self._campaign_dir)
283            self.campaign_db.create_campaign(info)
284            self.campaign_name = name
285            self.campaign_id = self.campaign_db.get_campaign_id(self.campaign_name)
286
287    def add_app(self, name=None, params=None, actions=None, set_active=True):
288        """Add an application to the CampaignDB.
289
290        Parameters
291        ----------
292        name : str
293            Name of the application.
294        params : dict
295            Description of the parameters to associate with the application.
296        actions : Actions
297            An instance of Actions containing actions to be executed
298        set_active: bool
299            Should the added app be set to be the currently active app?
300        """
301        # Verify input parameters dict
302        paramsspec = easyvvuq.ParamsSpecification(params, appname=name)
303        # validate application input
304        app = AppInfo(
305            name=name,
306            paramsspec=paramsspec,
307            actions=actions,
308        )
309        self.campaign_db.add_app(app)
310        if set_active:
311            self.set_app(app.name)
312
313    def set_app(self, app_name):
314        """Set active app for the campaign.
315
316        Application information is retrieved from `self.campaign_db`.
317
318        Parameters
319        ----------
320        app_name: str
321            Name of selected app, if `None` given then first app will be
322            selected.
323        """
324        self._active_app_name = app_name
325        self._active_app = self.campaign_db.app(name=app_name)
326        self.campaign_db.set_active_app(app_name)
327        # Resurrect the app encoder, decoder and collation elements
328        self._active_app_actions = self.campaign_db.resurrect_app(app_name)
329
330    def replace_actions(self, app_name, actions):
331        """Replace actions for an app with a given name.
332
333        Parameters
334        ----------
335        app_name: str
336            Name of the app.
337        actions: Actions
338            `Actions` instance, will replace the current `Actions` of an app.
339        """
340        self.campaign_db.replace_actions(app_name, actions)
341        self._active_app_actions = actions
342
343    def set_sampler(self, sampler, update=False):
344        """Set active sampler.
345
346        Parameters
347        ----------
348        sampler : Sampler
349            Sampler that will be used to create runs for the current campaign.
350        update : bool
351            If set to True it will not add the sampler to the database, just change
352            it as the active sampler.
353        """
354        self._active_sampler = sampler
355        if not update:
356            self._active_sampler_id = self.campaign_db.add_sampler(sampler)
357            sampler.sampler_id = self._active_sampler_id
358        self._active_sampler_id = self._active_sampler.sampler_id
359        self.campaign_db.set_sampler(self.campaign_id, self._active_sampler.sampler_id)
360
361    def add_external_runs(self, input_files, output_files, input_decoder, output_decoder,
362                         validate_params=True, run_prefix="external_run"):
363        """Takes a list of files and adds them to the database. This method is to be
364        used when adding runs to the EasyVVUQ database that were not executed using
365        EasyVVUQ.
366
367        Parameters
368        ----------
369        input_files: list of str
370            A list of input file paths to be loaded to the database.
371        output_files: list of str
372            A list of output file paths to be loaded to the database.
373        input_decoder: Decoder
374            A decoder that will be used to parse input files.
375        output_decoder: Decoder
376            A decoder that will be used to parse output files.
377        validate_params: bool, optional
378            Whether to validate parameters against the app definition (default: True)
379        run_prefix: str, optional
380            Prefix for run names (default: "external_run")
381        """
382        if self._active_app is None:
383            msg = ("No app is currently set for this campaign. "
384                   "Use set_app('name_of_app') or add_app() first.")
385            logging.error(msg)
386            raise Exception(msg)
387        
388        if len(input_files) != len(output_files):
389            raise ValueError("Number of input files must match number of output files")
390        
391        inputs = []
392        outputs = []
393        failed_runs = []
394        
395        # Parse input files
396        for i, input_file in enumerate(input_files):
397            try:
398                input_decoder.target_filename = os.path.basename(input_file)
399                params = input_decoder.parse_sim_output({'run_dir': os.path.dirname(input_file)})
400                
401                # Validate parameters if requested
402                if validate_params:
403                    try:
404                        app_default_params = self._active_app["params"]
405                        validated_params = app_default_params.process_run(params, verify=self.verify_all_runs)
406                        inputs.append(validated_params)
407                    except Exception as e:
408                        logging.warning(f"Parameter validation failed for {input_file}: {e}")
409                        failed_runs.append(i)
410                        continue
411                else:
412                    inputs.append(params)
413                    
414            except Exception as e:
415                logging.error(f"Failed to parse input file {input_file}: {e}")
416                failed_runs.append(i)
417                continue
418        
419        # Parse output files
420        for i, output_file in enumerate(output_files):
421            if i in failed_runs:
422                continue
423                
424            try:
425                output_decoder.target_filename = os.path.basename(output_file)
426                result = output_decoder.parse_sim_output({'run_dir': os.path.dirname(output_file)})
427                outputs.append(result)
428            except Exception as e:
429                logging.error(f"Failed to parse output file {output_file}: {e}")
430                failed_runs.append(i)
431                continue
432        
433        # Add runs to database
434        run_counter = 0
435        for i, (params, result) in enumerate(zip(inputs, outputs)):
436            if i in failed_runs:
437                continue
438                
439            run_counter += 1
440            table = db.RunTable(run_name=f'{run_prefix}_{run_counter}',
441                                app=self._active_app['id'],
442                                params=json.dumps(params),
443                                status=Status.COLLATED,
444                                run_dir=self.get_campaign_runs_dir(),
445                                result=json.dumps(result),
446                                campaign=self.campaign_id,
447                                sampler=self._active_sampler_id)
448            self.campaign_db.session.add(table)
449        
450        # Commit all changes at once
451        self.campaign_db.session.commit()
452        
453        logging.info(f"Successfully imported {run_counter} runs")
454        if failed_runs:
455            logging.warning(f"Failed to import {len(failed_runs)} runs due to parsing or validation errors")
456
457    def add_runs(self, runs, mark_invalid=False):
458        """Add runs to the database.
459
460        Parameters
461        ----------
462        runs : list of dicts
463            Each dict defines the value of each model parameter listed in
464            self.params_info for a run to be added to self.runs
465        mark_invalid : bool
466            Will mark runs that fail verification as invalid (but will not raise an exception)
467        """
468        if self._active_app is None:
469            msg = ("No app is currently set for this campaign. "
470                   "Use set_app('name_of_app').")
471            logging.error(msg)
472            raise Exception(msg)
473        app_default_params = self._active_app["params"]
474        run_info_list = []
475        for new_run in runs:
476            if new_run is None:
477                msg = ("add_run() was passed new_run of type None. Bad sampler?")
478                logging.error(msg)
479                raise Exception(msg)
480            # Verify and complete run with missing/default param values
481            status = Status.NEW
482            try:
483                new_run = app_default_params.process_run(new_run, verify=self.verify_all_runs)
484            except RuntimeError:
485                if mark_invalid:
486                    new_run = app_default_params.process_run(new_run, verify=False)
487                    status = Status.INVALID
488                else:
489                    raise
490            # Add to run queue
491            run_info = RunInfo(app=self._active_app['id'],
492                               params=new_run,
493                               sample=self._active_sampler_id,
494                               campaign=self.campaign_id,
495                               status=status)
496            run_info_list.append(run_info)
497        self.campaign_db.add_runs(run_info_list, iteration=self._active_sampler.iteration)
498
499    def draw_samples(self, num_samples=0, mark_invalid=False):
500        """Draws `num_samples` sets of parameters from the currently set
501        sampler, resulting in `num_samples` new runs added to the
502        runs list. If `num_samples` is 0 (its default value) then
503        this method draws ALL samples from the sampler, until exhaustion (this
504        will fail if the sampler is not finite).
505
506        Parameters
507        ----------
508        num_samples : int
509            Number of samples to draw from the active sampling element.
510            By default is 0 (draw ALL samples)
511        mark_invalid : bool
512            If True will mark runs that go outside valid parameter range as INVALID.
513            This is useful for MCMC style methods where you want those runs to evaluate
514            to low probabilities.
515        """
516        # Make sure `num_samples` is not 0 for an infinite generator
517        # (this would add runs forever...)
518        if not self._active_sampler.is_finite() and num_samples <= 0:
519            msg = (f"Sampling_element '{self._active_sampler.element_name()}' "
520                   f"is an infinite generator, therefore a finite number of "
521                   f"draws (n > 0) must be specified.")
522            raise RuntimeError(msg)
523        num_added = 0
524        new_runs = []
525        for new_run in self._active_sampler:
526            new_runs.append(new_run)
527            num_added += 1
528            if num_samples != 0 and num_added >= num_samples:
529                break
530        self.add_runs(new_runs, mark_invalid)
531        # Write sampler's new state to database
532        self.campaign_db.update_sampler(self._active_sampler_id, self._active_sampler)
533        return new_runs
534
535    def list_runs(self, sampler=None, campaign=None, app_id=None, status=None):
536        """Get list of runs in the CampaignDB.
537
538        Parameters
539        ----------
540        sampler: int
541            Sampler id to filter for.
542        campaign: int
543            Campaign id to filter for.
544        app_id: int
545            App id to filter for.
546        status: Status
547            Status to filter for.
548
549        Returns
550        -------
551        list of runs
552        """
553        return list(self.campaign_db.runs(
554            sampler=sampler, campaign=campaign, app_id=app_id, status=status))
555
556    def get_campaign_runs_dir(self):
557        """Get the runs directory from the CampaignDB.
558
559        Returns
560        -------
561        str
562            Path in which the runs information will be written.
563        """
564        return self.campaign_db.runs_dir(self.campaign_name)
565
566    def relocate(self, campaign_dir):
567        """Relocate the campaign by specifying a new path where campaign is located.
568
569        Parameters
570        ----------
571        new_path: str
572            new runs directory
573        """
574        if not os.path.exists(campaign_dir):
575            raise RuntimeError("specified directory does not exist: {}".format(campaign_dir))
576        self.campaign_db.relocate(campaign_dir, self.campaign_name)
577
578    def execute(self, nsamples=0, pool=None, mark_invalid=False, sequential=False):
579        """This will draw samples and execute the Actions on those samples.
580
581        Parameters
582        ----------
583        nsamples: int
584            Number of samples to draw. For infinite samplers or when you want to process
585            samples in batches.
586        pool: Executor
587            A pool object to be used when processing runs (e.g. instance of `ThreadPoolExecutor` or
588            `ProcessPoolExecutor`).
589        mark_invalid: bool
590            Mark runs that go outside the specified input parameter range as INVALID.
591        sequential: bool
592            Whether to process samples sequentially (sometimes more efficient or you might
593            want to avoid the concurrent module for some reason).
594        """
595        self.draw_samples(nsamples, mark_invalid=mark_invalid)
596        action_pool = self.apply_for_each_sample(
597            self._active_app_actions, sequential=sequential)
598        return action_pool.start(pool=pool)
599
600    def apply_for_each_sample(self, actions, status=Status.NEW, sequential=False):
601        """For each run in this Campaign's run list, apply the specified action
602        (an object of type Action).
603
604        Parameters
605        ----------
606        actions: Actions
607            Actions to be applied to each relevant run in the database.
608        status: Status
609            Will apply the Actions only to those runs whose status is as specified.
610        sequential: bool
611            Whether to process samples sequentially (sometimes more efficient or you might
612            want to avoid the concurrent module for some reason).
613
614        Returns
615        -------
616        ActionPool
617            An object containing ActionStatus instances to track action execution.
618        """
619        # Loop through all runs in this campaign with status ENCODED, and
620        # run the specified action on each run's dir
621        def inits():
622            for run_id, run_data in self.campaign_db.runs(
623                    status=status, app_id=self._active_app['id']):
624                previous = {}
625                previous['run_id'] = run_id
626                previous['campaign_dir'] = self._campaign_dir
627                previous['rundir'] = run_data['run_dir']
628                previous['run_info'] = run_data
629                previous['result'] = {}
630                previous['collated'] = False
631                yield previous
632        return ActionPool(self, actions, inits=inits(), sequential=sequential)
633
    def iterate(self, nsamples=0, pool=None, mark_invalid=False, sequential=False):
        """Generator form of `execute` for samplers that need the results of
        the previous sampling stage (designed for MCMC, should work for others).

        Each pass draws samples, starts the active app's actions on them and
        yields. When the caller resumes the generator, the results of the last
        iteration are fed back to the sampler. The loop never terminates on
        its own; the caller decides when to stop iterating.

        Parameters
        ----------
        nsamples : int
            Number of samples to draw (during a single iteration).
        pool : Executor
            An Executor instance, for example ThreadPoolExecutor or a Dask
            Client. Passed unchanged to the action pool's ``start``; what
            happens for ``None`` is decided there.
        mark_invalid : bool
            Mark runs that go outside the specified input parameter range as INVALID.
        sequential: bool
            Will execute the `Actions` associated with runs sequentially. Might be more
            efficient in some situations.

        Yields
        ------
        The value returned by ``action_pool.start(pool=pool)`` for the current
        iteration.
        """
        while True:
            self.draw_samples(nsamples, mark_invalid=mark_invalid)
            action_pool = self.apply_for_each_sample(
                self._active_app_actions, sequential=sequential)
            # Control goes back to the caller here. The code below runs only
            # when the generator is resumed - presumably after the caller has
            # let the started actions finish (TODO confirm).
            yield action_pool.start(pool=pool)
            # Collated and INVALID results of the iteration that just ran.
            result = self.get_collation_result(last_iteration=True)
            invalid = self.get_invalid_runs(last_iteration=True)
            # The sampler consumes both and returns the ids of runs to ignore.
            ignored_runs = self._active_sampler.update(result, invalid)
            for run_id in ignored_runs:
                self.campaign_db.session.query(db.RunTable).\
                    filter(db.RunTable.id == int(run_id)).\
                    update({'status': easyvvuq.constants.Status.IGNORED})
            # One commit covers all status updates of this iteration.
            self.campaign_db.session.commit()
669
    def recollate(self):
        """Set all COLLATED runs back to ENCODED, then call ``self.collate()``.

        NOTE(review): no ``collate`` method is defined on this class in the
        part of the file visible here. Unless it is provided elsewhere, the
        final call raises AttributeError after the run statuses have already
        been reset - confirm.

        The earlier documentation also said the collation table is cleared;
        nothing in this method does that directly (presumably left to
        ``collate()`` - verify).
        """
        # Materialise the ids first, then flip their status in one call.
        collated_run_ids = list(self.campaign_db.run_ids(status=Status.COLLATED))
        self.campaign_db.set_run_statuses(collated_run_ids, Status.ENCODED)
        self.collate()
677
678    def get_collation_result(self, last_iteration=False):
679        """Return dataframe containing all collated results
680
681        Parameters
682        ----------
683        last_iteration : bool
684            Will only return the result of the last iteration.
685
686        Returns
687        -------
688        DataFrame
689            A DataFrame with the simulation results along with the inputs
690            used to produce them.
691        """
692        if last_iteration:
693            iteration = self._active_sampler.iteration - 1
694        else:
695            iteration = -1
696        return self.campaign_db.get_results(
697            self._active_app['name'],
698            self._active_sampler_id,
699            status=easyvvuq.constants.Status.COLLATED,
700            iteration=iteration)
701
702    def get_invalid_runs(self, last_iteration=False):
703        """Return dataframe containing all results marked as INVALID.
704
705        Parameters
706        ----------
707        last_iteration : bool
708            Will only return the result of the last iteration.
709
710        Returns
711        -------
712        DataFrame
713            A DataFrame with the results form simulations that were marked as INVALID.
714            These will usually be the ones that went outside the specified parameter ranges.
715            These still have to be accounted for in some way by some methods (e.g. MCMC).
716        """
717        if last_iteration:
718            iteration = self._active_sampler.iteration - 1
719        else:
720            iteration = -1
721        return self.campaign_db.get_results(
722            self._active_app['name'],
723            self._active_sampler_id,
724            status=easyvvuq.constants.Status.INVALID,
725            iteration=iteration)
726
727    def apply_analysis(self, analysis):
728        """Run the `analysis` element on the output of the last run collation.
729
730        Parameters
731        ----------
732        analysis : Analysis
733            Element that performs a VVUQ analysis on a dataframe summary of
734            run outputs.
735        """
736        # Apply analysis element to most recent collation result
737        self.last_analysis = analysis.analyse(data_frame=self.get_collation_result())
738
739    def analyse(self, **kwargs):
740        """If available will call an appropriate analysis class on the collation result.
741
742        Parameters
743        ----------
744        **kwargs : dict
745            Argument to the analysis class constructor (after sampler).
746
747        Returns
748        -------
749        AnalysisResults
750            An object representing analysis results. Can be used to interact with those results
751            in some way. Plot, retrieve surrogate models and so on.
752            See `easyvvuq.analysis.AnalysisResults` for further information.
753        """
754        collation_result = self.get_collation_result()
755        try:
756            analysis = self._active_sampler.analysis_class(sampler=self._active_sampler, **kwargs)
757            return analysis.analyse(collation_result)
758        except NotImplementedError:
759            raise RuntimeError("This sampler does not have a corresponding analysis class")
760
761    def get_last_analysis(self):
762        """Return the output of the most recently run analysis element.
763        """
764        if self.last_analysis is None:
765            logging.warning("No last analysis output available.")
766        return self.last_analysis
767
768    def __str__(self):
769        """Returns formatted summary of the current Campaign state.
770        Enables class to work with standard print() method
771        """
772        return (f"db_location = {self.db_location}\n"
773                f"active_sampler_id = {self._active_sampler_id}\n"
774                f"campaign_name = {self.campaign_name}\n"
775                f"campaign_dir = {self.campaign_dir}\n"
776                f"campaign_id = {self.campaign_id}\n")
777
778    def get_active_sampler(self):
779        """Return the active sampler element in use by this campaign.
780
781        Returns
782        -------
783        The sampler currently in use
784        """
785
786        return self._active_sampler
787
788    def ignore_runs(self, list_of_run_IDs):
789        """Flags the specified runs to be IGNORED in future collation. Note that
790        this does NOT remove previously collated results from the collation table.
791        For that you must refresh the collation by running recollate().
792
793        Parameters
794        ----------
795        list
796            The list of run IDs for the runs that should be set to status IGNORED
797        """
798        self.campaign_db.set_run_statuses(list_of_run_IDs, Status.IGNORED)
799
800    def rerun(self, list_of_run_IDs):
801        """Sets the status of the specified runs to ENCODED, so that their results
802        may be recollated later (presumably after extending, rerunning or otherwise
803        modifying the data in the relevant run folder). Note that this method will
804        NOT perform any execution - it simply flags the run in EasyVVUQ as being
805        uncollated. Actual execution is (as usual) the job of the user or middleware.
806
807        Parameters
808        ----------
809        list
810            The list of run IDs for the runs that should be set to status ENCODED
811        """
812
813        for run_ID in list_of_run_IDs:
814            status = self.campaign_db.get_run_status(run_ID)
815            if status == Status.NEW:
816                msg = (f"Cannot rerun {run_ID} as it has status NEW, and must"
817                       f"be encoded before execution.")
818                raise RuntimeError(msg)
819        self.campaign_db.set_run_statuses(list_of_run_IDs, Status.ENCODED)
820
821    def get_active_app(self):
822        """Returns a dict of information regarding the application that is currently
823        set for this campaign.
824        """
825        return self._active_app

Campaigns organise the dataflow in EasyVVUQ workflows.

The Campaign functions as a state machine for the VVUQ workflows. It uses a database (CampaignDB) to store information on both the target application and the VVUQ algorithms being employed. It also collects data from the simulations and can be used to store and resume your state.

Notes

Multiple campaigns can be combined in a CampaignDB. Hence the particular campaign we are currently working on will be specified using campaign_id.

Parameters
  • name (str): Name of the Campaign. Freely chosen, serves as a human-readable way of distinguishing between several campaigns in the same database.
  • params (dict, optional): Description of the parameters to associate with the application. Will be used to create an app when creating the campaign. It is also possible to add apps manually using the add_app method of the Campaign class, but this can be a useful shorthand when working with single-app campaigns. To use this functionality both params and actions have to be specified. The name of this app will be the same as the name of the Campaign.
  • actions (Actions, optional): Actions object associated with an application. See description of the params parameter for more details.
  • db_location (str, optional): Location of the underlying campaign database - either a path or acceptable URI for SQLAlchemy.
  • work_dir (str, optional, default='./'): Path to working directory - used to store campaign directory.
  • change_to_state (bool, optional, default=False): Should we change to the directory containing any specified state_file in order to make relative paths work.
  • verify_all_runs (bool, optional, default=True): Check all new runs being added for unrecognised params (not defined for the currently set app), values lying within defined physical range, type checking etc. This should normally always be set to True, but in cases where the performance is too degraded, the checks can be disabled by setting to False.
Attributes
  • campaign_name (str or None): Name for the campaign/workflow.
  • _campaign_dir (str or None): Path to the directory campaign uses for local storage (runs inputs etc)
  • db_location (str or None): Location of the underlying campaign database - either a path or acceptable URI for SQLAlchemy.
  • _log (list): The log of all elements that have been applied, with information about their application
  • campaign_id (int): ID number for the current campaign in the db.CampaignDB.
  • campaign_db (easyvvuq.db.Basedb.CampaignDB): A campaign database object
  • last_analysis: The result of the most recent analysis carried out on this campaign
  • _active_app (dict): Info about currently set app
  • _active_app_name (str): Name of currently set app
  • _active_sampler_id (int): The database id of the currently set Sampler object
Examples

A typical instantiation might look like this.

>>> params = {
        "S0": {"type": "float", "default": 997},
        "I0": {"type": "float", "default": 3},
        "beta": {"type": "float", "default": 0.2},
        "gamma": {"type": "float", "default": 0.04, "min": 0.0, "max": 1.0},
        "iterations": {"type": "integer", "default": 100},
        "outfile": {"type": "string", "default": "output.csv"}
    }
>>> encoder = uq.encoders.GenericEncoder(template_fname='sir.template', delimiter='$', target_filename='input.json')
>>> decoder = uq.decoders.SimpleCSV(target_filename='output.csv', output_columns=['I'])
>>> actions = uq.actions.local_execute(encoder, os.path.abspath('sir') + ' input.json', decoder)
>>> campaign = uq.Campaign(name='sir', params=params, actions=actions)

A simplified one (without an app) might look simply like this.

>>> campaign = Campaign('simple')

An app then can be added.

>>> campaign.add_app('simple_app', params=params, actions=actions)
Campaign( name, params=None, actions=None, db_location=None, work_dir='./', change_to_state=False, verify_all_runs=True)
197    def __init__(
198            self,
199            name,
200            params=None,
201            actions=None,
202            db_location=None,
203            work_dir="./",
204            change_to_state=False,
205            verify_all_runs=True
206    ):
207
208        self.work_dir = os.path.realpath(os.path.expanduser(work_dir))
209        self.verify_all_runs = verify_all_runs
210
211        self.campaign_name = name
212        self._campaign_dir = None
213
214        if db_location is None:
215            self._campaign_dir = tempfile.mkdtemp(prefix=name, dir=self.work_dir)
216            self.db_location = "sqlite:///" + self._campaign_dir + "/campaign.db"
217        else:
218            self.db_location = db_location
219
220        self.campaign_id = None
221        self.campaign_db = None
222
223        self.last_analysis = None
224
225        self._active_app = None
226        self._active_app_name = None
227        self._active_app_actions = None
228
229        self._active_sampler = None
230        self._active_sampler_id = None
231
232        self.init_db(name, self.work_dir)
233        self._state_dir = None
234
235        # here we assume that the user wants to add an app
236        if (params is not None) and (actions is not None):
237            self.add_app(name=name, params=params, actions=actions)
@staticmethod
def from_existing_data( name, input_files, output_files, input_decoder=None, output_decoder=None, params=None, output_columns=None, work_dir='./', auto_infer=True):
138    @staticmethod
139    def from_existing_data(name, 
140                          input_files, 
141                          output_files, 
142                          input_decoder=None, 
143                          output_decoder=None,
144                          params=None,
145                          output_columns=None,
146                          work_dir="./",
147                          auto_infer=True):
148        """
149        Create a campaign from existing data files.
150        
151        Parameters
152        ----------
153        name : str
154            Name of the campaign
155        input_files : list of str
156            List of input file paths
157        output_files : list of str
158            List of output file paths
159        input_decoder : Decoder, optional
160            Decoder for input files (auto-created if None)
161        output_decoder : Decoder, optional
162            Decoder for output files (auto-created if None)
163        params : dict, optional
164            Parameter definitions (auto-inferred if None and auto_infer=True)
165        output_columns : list of str, optional
166            Output column names (auto-inferred if None and auto_infer=True)
167        work_dir : str, optional
168            Working directory (default: "./")
169        auto_infer : bool, optional
170            Whether to automatically infer parameters and outputs (default: True)
171        
172        Returns
173        -------
174        Campaign
175            A new campaign with the imported data
176        
177        Examples
178        --------
179        >>> campaign = Campaign.from_existing_data(
180        ...     name="imported_sim",
181        ...     input_files=["run1/input.json", "run2/input.json"],
182        ...     output_files=["run1/output.csv", "run2/output.csv"]
183        ... )
184        """
185        from easyvvuq.utils.dataset_importer import create_campaign_from_files
186        
187        return create_campaign_from_files(
188            input_files=input_files,
189            output_files=output_files,
190            campaign_name=name,
191            work_dir=work_dir,
192            input_decoder=input_decoder,
193            output_decoder=output_decoder,
194            auto_infer=auto_infer
195        )

Create a campaign from existing data files.

Parameters
  • name (str): Name of the campaign
  • input_files (list of str): List of input file paths
  • output_files (list of str): List of output file paths
  • input_decoder (Decoder, optional): Decoder for input files (auto-created if None)
  • output_decoder (Decoder, optional): Decoder for output files (auto-created if None)
  • params (dict, optional): Parameter definitions (auto-inferred if None and auto_infer=True)
  • output_columns (list of str, optional): Output column names (auto-inferred if None and auto_infer=True)
  • work_dir (str, optional): Working directory (default: "./")
  • auto_infer (bool, optional): Whether to automatically infer parameters and outputs (default: True)
Returns
  • Campaign: A new campaign with the imported data
Examples
>>> campaign = Campaign.from_existing_data(
...     name="imported_sim",
...     input_files=["run1/input.json", "run2/input.json"],
...     output_files=["run1/output.csv", "run2/output.csv"]
... )
work_dir
verify_all_runs
campaign_name
campaign_id
campaign_db
last_analysis
campaign_dir
239    @property
240    def campaign_dir(self):
241        """Get the path in which to load/save files related to the campaign.
242
243        Returns
244        -------
245        str
246            Path to the campaign directory - given as a subdirectory of the
247            working directory.
248        """
249
250        return os.path.join(self.work_dir, self._campaign_dir)

Get the path in which to load/save files related to the campaign.

Returns
  • str: Path to the campaign directory - given as a subdirectory of the working directory.
def init_db(self, name, work_dir='.'):
252    def init_db(self, name, work_dir='.'):
253        """Initialize the connection with the database and either resume or create the campaign.
254
255        Parameters
256        ----------
257        name: str
258            Name of the campaign.
259        work_dir: str
260            Work directory, defaults to cwd.
261        """
262        self.campaign_db = db.CampaignDB(location=self.db_location)
263        if self.campaign_db.campaign_exists(name):
264            self.campaign_id = self.campaign_db.get_campaign_id(name)
265            self._active_app_name = self.campaign_db.get_active_app()[0].name
266            self.campaign_name = name
267            self._campaign_dir = self.campaign_db.campaign_dir(name)
268            if not os.path.exists(self._campaign_dir):
269                message = (f"Campaign directory ({self.campaign_dir}) does not exist.")
270                raise RuntimeError(message)
271            self._active_sampler_id = self.campaign_db.get_sampler_id(self.campaign_id)
272            self._active_sampler = self.campaign_db.resurrect_sampler(self._active_sampler_id)
273            self.set_app(self._active_app_name)
274            self.campaign_db.resume_campaign(name)
275        else:
276            if self._campaign_dir is None:
277                self._campaign_dir = tempfile.mkdtemp(prefix=name, dir=work_dir)
278            info = CampaignInfo(
279                name=name,
280                campaign_dir_prefix=default_campaign_prefix,
281                easyvvuq_version=easyvvuq.__version__,
282                campaign_dir=self._campaign_dir)
283            self.campaign_db.create_campaign(info)
284            self.campaign_name = name
285            self.campaign_id = self.campaign_db.get_campaign_id(self.campaign_name)

Initialize the connection with the database and either resume or create the campaign.

Parameters
  • name (str): Name of the campaign.
  • work_dir (str): Work directory, defaults to cwd.
def add_app(self, name=None, params=None, actions=None, set_active=True):
287    def add_app(self, name=None, params=None, actions=None, set_active=True):
288        """Add an application to the CampaignDB.
289
290        Parameters
291        ----------
292        name : str
293            Name of the application.
294        params : dict
295            Description of the parameters to associate with the application.
296        actions : Actions
297            An instance of Actions containing actions to be executed
298        set_active: bool
299            Should the added app be set to be the currently active app?
300        """
301        # Verify input parameters dict
302        paramsspec = easyvvuq.ParamsSpecification(params, appname=name)
303        # validate application input
304        app = AppInfo(
305            name=name,
306            paramsspec=paramsspec,
307            actions=actions,
308        )
309        self.campaign_db.add_app(app)
310        if set_active:
311            self.set_app(app.name)

Add an application to the CampaignDB.

Parameters
  • name (str): Name of the application.
  • params (dict): Description of the parameters to associate with the application.
  • actions (Actions): An instance of Actions containing actions to be executed
  • set_active (bool): Should the added app be set to be the currently active app?
def set_app(self, app_name):
313    def set_app(self, app_name):
314        """Set active app for the campaign.
315
316        Application information is retrieved from `self.campaign_db`.
317
318        Parameters
319        ----------
320        app_name: str
321            Name of selected app, if `None` given then first app will be
322            selected.
323        """
324        self._active_app_name = app_name
325        self._active_app = self.campaign_db.app(name=app_name)
326        self.campaign_db.set_active_app(app_name)
327        # Resurrect the app encoder, decoder and collation elements
328        self._active_app_actions = self.campaign_db.resurrect_app(app_name)

Set active app for the campaign.

Application information is retrieved from self.campaign_db.

Parameters
  • app_name (str): Name of selected app, if None given then first app will be selected.
def replace_actions(self, app_name, actions):
330    def replace_actions(self, app_name, actions):
331        """Replace actions for an app with a given name.
332
333        Parameters
334        ----------
335        app_name: str
336            Name of the app.
337        actions: Actions
338            `Actions` instance, will replace the current `Actions` of an app.
339        """
340        self.campaign_db.replace_actions(app_name, actions)
341        self._active_app_actions = actions

Replace actions for an app with a given name.

Parameters
  • app_name (str): Name of the app.
  • actions (Actions): Actions instance, will replace the current Actions of an app.
def set_sampler(self, sampler, update=False):
343    def set_sampler(self, sampler, update=False):
344        """Set active sampler.
345
346        Parameters
347        ----------
348        sampler : Sampler
349            Sampler that will be used to create runs for the current campaign.
350        update : bool
351            If set to True it will not add the sampler to the database, just change
352            it as the active sampler.
353        """
354        self._active_sampler = sampler
355        if not update:
356            self._active_sampler_id = self.campaign_db.add_sampler(sampler)
357            sampler.sampler_id = self._active_sampler_id
358        self._active_sampler_id = self._active_sampler.sampler_id
359        self.campaign_db.set_sampler(self.campaign_id, self._active_sampler.sampler_id)

Set active sampler.

Parameters
  • sampler (Sampler): Sampler that will be used to create runs for the current campaign.
  • update (bool): If set to True it will not add the sampler to the database, just change it as the active sampler.
def add_external_runs( self, input_files, output_files, input_decoder, output_decoder, validate_params=True, run_prefix='external_run'):
361    def add_external_runs(self, input_files, output_files, input_decoder, output_decoder,
362                         validate_params=True, run_prefix="external_run"):
363        """Takes a list of files and adds them to the database. This method is to be
364        used when adding runs to the EasyVVUQ database that were not executed using
365        EasyVVUQ.
366
367        Parameters
368        ----------
369        input_files: list of str
370            A list of input file paths to be loaded to the database.
371        output_files: list of str
372            A list of output file paths to be loaded to the database.
373        input_decoder: Decoder
374            A decoder that will be used to parse input files.
375        output_decoder: Decoder
376            A decoder that will be used to parse output files.
377        validate_params: bool, optional
378            Whether to validate parameters against the app definition (default: True)
379        run_prefix: str, optional
380            Prefix for run names (default: "external_run")
381        """
382        if self._active_app is None:
383            msg = ("No app is currently set for this campaign. "
384                   "Use set_app('name_of_app') or add_app() first.")
385            logging.error(msg)
386            raise Exception(msg)
387        
388        if len(input_files) != len(output_files):
389            raise ValueError("Number of input files must match number of output files")
390        
391        inputs = []
392        outputs = []
393        failed_runs = []
394        
395        # Parse input files
396        for i, input_file in enumerate(input_files):
397            try:
398                input_decoder.target_filename = os.path.basename(input_file)
399                params = input_decoder.parse_sim_output({'run_dir': os.path.dirname(input_file)})
400                
401                # Validate parameters if requested
402                if validate_params:
403                    try:
404                        app_default_params = self._active_app["params"]
405                        validated_params = app_default_params.process_run(params, verify=self.verify_all_runs)
406                        inputs.append(validated_params)
407                    except Exception as e:
408                        logging.warning(f"Parameter validation failed for {input_file}: {e}")
409                        failed_runs.append(i)
410                        continue
411                else:
412                    inputs.append(params)
413                    
414            except Exception as e:
415                logging.error(f"Failed to parse input file {input_file}: {e}")
416                failed_runs.append(i)
417                continue
418        
419        # Parse output files
420        for i, output_file in enumerate(output_files):
421            if i in failed_runs:
422                continue
423                
424            try:
425                output_decoder.target_filename = os.path.basename(output_file)
426                result = output_decoder.parse_sim_output({'run_dir': os.path.dirname(output_file)})
427                outputs.append(result)
428            except Exception as e:
429                logging.error(f"Failed to parse output file {output_file}: {e}")
430                failed_runs.append(i)
431                continue
432        
433        # Add runs to database
434        run_counter = 0
435        for i, (params, result) in enumerate(zip(inputs, outputs)):
436            if i in failed_runs:
437                continue
438                
439            run_counter += 1
440            table = db.RunTable(run_name=f'{run_prefix}_{run_counter}',
441                                app=self._active_app['id'],
442                                params=json.dumps(params),
443                                status=Status.COLLATED,
444                                run_dir=self.get_campaign_runs_dir(),
445                                result=json.dumps(result),
446                                campaign=self.campaign_id,
447                                sampler=self._active_sampler_id)
448            self.campaign_db.session.add(table)
449        
450        # Commit all changes at once
451        self.campaign_db.session.commit()
452        
453        logging.info(f"Successfully imported {run_counter} runs")
454        if failed_runs:
455            logging.warning(f"Failed to import {len(failed_runs)} runs due to parsing or validation errors")

Takes a list of files and adds them to the database. This method is to be used when adding runs to the EasyVVUQ database that were not executed using EasyVVUQ.

Parameters
  • input_files (list of str): A list of input file paths to be loaded to the database.
  • output_files (list of str): A list of output file paths to be loaded to the database.
  • input_decoder (Decoder): A decoder that will be used to parse input files.
  • output_decoder (Decoder): A decoder that will be used to parse output files.
  • validate_params (bool, optional): Whether to validate parameters against the app definition (default: True)
  • run_prefix (str, optional): Prefix for run names (default: "external_run")
def add_runs(self, runs, mark_invalid=False):
457    def add_runs(self, runs, mark_invalid=False):
458        """Add runs to the database.
459
460        Parameters
461        ----------
462        runs : list of dicts
463            Each dict defines the value of each model parameter listed in
464            self.params_info for a run to be added to self.runs
465        mark_invalid : bool
466            Will mark runs that fail verification as invalid (but will not raise an exception)
467        """
468        if self._active_app is None:
469            msg = ("No app is currently set for this campaign. "
470                   "Use set_app('name_of_app').")
471            logging.error(msg)
472            raise Exception(msg)
473        app_default_params = self._active_app["params"]
474        run_info_list = []
475        for new_run in runs:
476            if new_run is None:
477                msg = ("add_run() was passed new_run of type None. Bad sampler?")
478                logging.error(msg)
479                raise Exception(msg)
480            # Verify and complete run with missing/default param values
481            status = Status.NEW
482            try:
483                new_run = app_default_params.process_run(new_run, verify=self.verify_all_runs)
484            except RuntimeError:
485                if mark_invalid:
486                    new_run = app_default_params.process_run(new_run, verify=False)
487                    status = Status.INVALID
488                else:
489                    raise
490            # Add to run queue
491            run_info = RunInfo(app=self._active_app['id'],
492                               params=new_run,
493                               sample=self._active_sampler_id,
494                               campaign=self.campaign_id,
495                               status=status)
496            run_info_list.append(run_info)
497        self.campaign_db.add_runs(run_info_list, iteration=self._active_sampler.iteration)

Add runs to the database.

Parameters
  • runs (list of dicts): Each dict defines the value of each model parameter listed in self.params_info for a run to be added to self.runs
  • mark_invalid (bool): Will mark runs that fail verification as invalid (but will not raise an exception)
def draw_samples(self, num_samples=0, mark_invalid=False):
499    def draw_samples(self, num_samples=0, mark_invalid=False):
500        """Draws `num_samples` sets of parameters from the currently set
501        sampler, resulting in `num_samples` new runs added to the
502        runs list. If `num_samples` is 0 (its default value) then
503        this method draws ALL samples from the sampler, until exhaustion (this
504        will fail if the sampler is not finite).
505
506        Parameters
507        ----------
508        num_samples : int
509            Number of samples to draw from the active sampling element.
510            By default is 0 (draw ALL samples)
511        mark_invalid : bool
512            If True will mark runs that go outside valid parameter range as INVALID.
513            This is useful for MCMC style methods where you want those runs to evaluate
514            to low probabilities.
515        """
516        # Make sure `num_samples` is not 0 for an infinite generator
517        # (this would add runs forever...)
518        if not self._active_sampler.is_finite() and num_samples <= 0:
519            msg = (f"Sampling_element '{self._active_sampler.element_name()}' "
520                   f"is an infinite generator, therefore a finite number of "
521                   f"draws (n > 0) must be specified.")
522            raise RuntimeError(msg)
523        num_added = 0
524        new_runs = []
525        for new_run in self._active_sampler:
526            new_runs.append(new_run)
527            num_added += 1
528            if num_samples != 0 and num_added >= num_samples:
529                break
530        self.add_runs(new_runs, mark_invalid)
531        # Write sampler's new state to database
532        self.campaign_db.update_sampler(self._active_sampler_id, self._active_sampler)
533        return new_runs

Draws num_samples sets of parameters from the currently set sampler, resulting in num_samples new runs added to the runs list. If num_samples is 0 (its default value) then this method draws ALL samples from the sampler, until exhaustion (this will fail if the sampler is not finite).

Parameters
  • num_samples (int): Number of samples to draw from the active sampling element. By default is 0 (draw ALL samples)
  • mark_invalid (bool): If True will mark runs that go outside valid parameter range as INVALID. This is useful for MCMC style methods where you want those runs to evaluate to low probabilities.
def list_runs(self, sampler=None, campaign=None, app_id=None, status=None):
535    def list_runs(self, sampler=None, campaign=None, app_id=None, status=None):
536        """Get list of runs in the CampaignDB.
537
538        Parameters
539        ----------
540        sampler: int
541            Sampler id to filter for.
542        campaign: int
543            Campaign id to filter for.
544        app_id: int
545            App id to filter for.
546        status: Status
547            Status to filter for.
548
549        Returns
550        -------
551        list of runs
552        """
553        return list(self.campaign_db.runs(
554            sampler=sampler, campaign=campaign, app_id=app_id, status=status))

Get list of runs in the CampaignDB.

Parameters
  • sampler (int): Sampler id to filter for.
  • campaign (int): Campaign id to filter for.
  • app_id (int): App id to filter for.
  • status (Status): Status to filter for.
Returns
  • list of runs
def get_campaign_runs_dir(self):
556    def get_campaign_runs_dir(self):
557        """Get the runs directory from the CampaignDB.
558
559        Returns
560        -------
561        str
562            Path in which the runs information will be written.
563        """
564        return self.campaign_db.runs_dir(self.campaign_name)

Get the runs directory from the CampaignDB.

Returns
  • str: Path in which the runs information will be written.
def relocate(self, campaign_dir):
566    def relocate(self, campaign_dir):
567        """Relocate the campaign by specifying a new path where campaign is located.
568
569        Parameters
570        ----------
571        new_path: str
572            new runs directory
573        """
574        if not os.path.exists(campaign_dir):
575            raise RuntimeError("specified directory does not exist: {}".format(campaign_dir))
576        self.campaign_db.relocate(campaign_dir, self.campaign_name)

Relocate the campaign by specifying a new path where campaign is located.

Parameters
  • campaign_dir (str): new campaign directory; it must already exist
def execute(self, nsamples=0, pool=None, mark_invalid=False, sequential=False):
578    def execute(self, nsamples=0, pool=None, mark_invalid=False, sequential=False):
579        """This will draw samples and execute the Actions on those samples.
580
581        Parameters
582        ----------
583        nsamples: int
584            Number of samples to draw. For infinite samplers or when you want to process
585            samples in batches.
586        pool: Executor
587            A pool object to be used when processing runs (e.g. instance of `ThreadPoolExecutor` or
588            `ProcessPoolExecutor`).
589        mark_invalid: bool
590            Mark runs that go outside the specified input parameter range as INVALID.
591        sequential: bool
592            Whether to process samples sequentially (sometimes more efficient or you might
593            want to avoid the concurrent module for some reason).
594        """
595        self.draw_samples(nsamples, mark_invalid=mark_invalid)
596        action_pool = self.apply_for_each_sample(
597            self._active_app_actions, sequential=sequential)
598        return action_pool.start(pool=pool)

This will draw samples and execute the Actions on those samples.

Parameters
  • nsamples (int): Number of samples to draw. For infinite samplers or when you want to process samples in batches.
  • pool (Executor): A pool object to be used when processing runs (e.g. instance of ThreadPoolExecutor or ProcessPoolExecutor).
  • mark_invalid (bool): Mark runs that go outside the specified input parameter range as INVALID.
  • sequential (bool): Whether to process samples sequentially (sometimes more efficient or you might want to avoid the concurrent module for some reason).
def apply_for_each_sample(self, actions, status=<Status.NEW: 1>, sequential=False):
600    def apply_for_each_sample(self, actions, status=Status.NEW, sequential=False):
601        """For each run in this Campaign's run list, apply the specified action
602        (an object of type Action).
603
604        Parameters
605        ----------
606        actions: Actions
607            Actions to be applied to each relevant run in the database.
608        status: Status
609            Will apply the Actions only to those runs whose status is as specified.
610        sequential: bool
611            Whether to process samples sequentially (sometimes more efficient or you might
612            want to avoid the concurrent module for some reason).
613
614        Returns
615        -------
616        ActionPool
617            An object containing ActionStatus instances to track action execution.
618        """
619        # Loop through all runs in this campaign with status ENCODED, and
620        # run the specified action on each run's dir
621        def inits():
622            for run_id, run_data in self.campaign_db.runs(
623                    status=status, app_id=self._active_app['id']):
624                previous = {}
625                previous['run_id'] = run_id
626                previous['campaign_dir'] = self._campaign_dir
627                previous['rundir'] = run_data['run_dir']
628                previous['run_info'] = run_data
629                previous['result'] = {}
630                previous['collated'] = False
631                yield previous
632        return ActionPool(self, actions, inits=inits(), sequential=sequential)

For each run in this Campaign's run list, apply the specified action (an object of type Action).

Parameters
  • actions (Actions): Actions to be applied to each relevant run in the database.
  • status (Status): Will apply the Actions only to those runs whose status is as specified.
  • sequential (bool): Whether to process samples sequentially (sometimes more efficient or you might want to avoid the concurrent module for some reason).
Returns
  • ActionPool: An object containing ActionStatus instances to track action execution.
def iterate(self, nsamples=0, pool=None, mark_invalid=False, sequential=False):
634    def iterate(self, nsamples=0, pool=None, mark_invalid=False, sequential=False):
635        """This is the equivalent of `execute` for methods that rely on the output of the
636        previous sampling stage (designed for MCMC, should work for others).
637
638        Parameters
639        ----------
640        nsamples : int
641            Number of samples to draw (during a single iteration).
642        pool : Executor
643            An Executor instance. For example ThreadPoolExecutor or a Dask Client. Defaults
644            to the ThreadPoolExecutor.
645        mark_invalid : bool
646            Mark runs that go outside the specified input parameter range as INVALID.
647        sequential: bool
648            Will execute the `Actions` associated with runs sequentially. Might be more
649            efficient in some situations.
650
651        Yields
652        ------
653        ActionPool
654            An object containing Futures instances to track action execution.
655        """
656        while True:
657            self.draw_samples(nsamples, mark_invalid=mark_invalid)
658            action_pool = self.apply_for_each_sample(
659                self._active_app_actions, sequential=sequential)
660            yield action_pool.start(pool=pool)
661            result = self.get_collation_result(last_iteration=True)
662            invalid = self.get_invalid_runs(last_iteration=True)
663            ignored_runs = self._active_sampler.update(result, invalid)
664            for run_id in ignored_runs:
665                self.campaign_db.session.query(db.RunTable).\
666                    filter(db.RunTable.id == int(run_id)).\
667                    update({'status': easyvvuq.constants.Status.IGNORED})
668            self.campaign_db.session.commit()

This is the equivalent of execute for methods that rely on the output of the previous sampling stage (designed for MCMC, should work for others).

Parameters
  • nsamples (int): Number of samples to draw (during a single iteration).
  • pool (Executor): An Executor instance. For example ThreadPoolExecutor or a Dask Client. Defaults to the ThreadPoolExecutor.
  • mark_invalid (bool): Mark runs that go outside the specified input parameter range as INVALID.
  • sequential (bool): Will execute the Actions associated with runs sequentially. Might be more efficient in some situations.
Yields
  • ActionPool: An object containing Futures instances to track action execution.
def recollate(self):
670    def recollate(self):
671        """Clears the current collation table, changes all COLLATED status runs
672           back to ENCODED, then runs collate() again
673        """
674        collated_run_ids = list(self.campaign_db.run_ids(status=Status.COLLATED))
675        self.campaign_db.set_run_statuses(collated_run_ids, Status.ENCODED)
676        self.collate()

Clears the current collation table, changes all COLLATED status runs back to ENCODED, then runs collate() again

def get_collation_result(self, last_iteration=False):
678    def get_collation_result(self, last_iteration=False):
679        """Return dataframe containing all collated results
680
681        Parameters
682        ----------
683        last_iteration : bool
684            Will only return the result of the last iteration.
685
686        Returns
687        -------
688        DataFrame
689            A DataFrame with the simulation results along with the inputs
690            used to produce them.
691        """
692        if last_iteration:
693            iteration = self._active_sampler.iteration - 1
694        else:
695            iteration = -1
696        return self.campaign_db.get_results(
697            self._active_app['name'],
698            self._active_sampler_id,
699            status=easyvvuq.constants.Status.COLLATED,
700            iteration=iteration)

Return dataframe containing all collated results

Parameters
  • last_iteration (bool): Will only return the result of the last iteration.
Returns
  • DataFrame: A DataFrame with the simulation results along with the inputs used to produce them.
def get_invalid_runs(self, last_iteration=False):
702    def get_invalid_runs(self, last_iteration=False):
703        """Return dataframe containing all results marked as INVALID.
704
705        Parameters
706        ----------
707        last_iteration : bool
708            Will only return the result of the last iteration.
709
710        Returns
711        -------
712        DataFrame
713            A DataFrame with the results form simulations that were marked as INVALID.
714            These will usually be the ones that went outside the specified parameter ranges.
715            These still have to be accounted for in some way by some methods (e.g. MCMC).
716        """
717        if last_iteration:
718            iteration = self._active_sampler.iteration - 1
719        else:
720            iteration = -1
721        return self.campaign_db.get_results(
722            self._active_app['name'],
723            self._active_sampler_id,
724            status=easyvvuq.constants.Status.INVALID,
725            iteration=iteration)

Return dataframe containing all results marked as INVALID.

Parameters
  • last_iteration (bool): Will only return the result of the last iteration.
Returns
  • DataFrame: A DataFrame with the results from simulations that were marked as INVALID. These will usually be the ones that went outside the specified parameter ranges. These still have to be accounted for in some way by some methods (e.g. MCMC).
def apply_analysis(self, analysis):
727    def apply_analysis(self, analysis):
728        """Run the `analysis` element on the output of the last run collation.
729
730        Parameters
731        ----------
732        analysis : Analysis
733            Element that performs a VVUQ analysis on a dataframe summary of
734            run outputs.
735        """
736        # Apply analysis element to most recent collation result
737        self.last_analysis = analysis.analyse(data_frame=self.get_collation_result())

Run the analysis element on the output of the last run collation.

Parameters
  • analysis (Analysis): Element that performs a VVUQ analysis on a dataframe summary of run outputs.
def analyse(self, **kwargs):
739    def analyse(self, **kwargs):
740        """If available will call an appropriate analysis class on the collation result.
741
742        Parameters
743        ----------
744        **kwargs : dict
745            Argument to the analysis class constructor (after sampler).
746
747        Returns
748        -------
749        AnalysisResults
750            An object representing analysis results. Can be used to interact with those results
751            in some way. Plot, retrieve surrogate models and so on.
752            See `easyvvuq.analysis.AnalysisResults` for further information.
753        """
754        collation_result = self.get_collation_result()
755        try:
756            analysis = self._active_sampler.analysis_class(sampler=self._active_sampler, **kwargs)
757            return analysis.analyse(collation_result)
758        except NotImplementedError:
759            raise RuntimeError("This sampler does not have a corresponding analysis class")

If available will call an appropriate analysis class on the collation result.

Parameters
  • **kwargs (dict): Argument to the analysis class constructor (after sampler).
Returns
  • AnalysisResults: An object representing analysis results. Can be used to interact with those results in some way. Plot, retrieve surrogate models and so on. See easyvvuq.analysis.AnalysisResults for further information.
def get_last_analysis(self):
761    def get_last_analysis(self):
762        """Return the output of the most recently run analysis element.
763        """
764        if self.last_analysis is None:
765            logging.warning("No last analysis output available.")
766        return self.last_analysis

Return the output of the most recently run analysis element.

def get_active_sampler(self):
778    def get_active_sampler(self):
779        """Return the active sampler element in use by this campaign.
780
781        Returns
782        -------
783        The sampler currently in use
784        """
785
786        return self._active_sampler

Return the active sampler element in use by this campaign.

Returns
  • The sampler currently in use
def ignore_runs(self, list_of_run_IDs):
788    def ignore_runs(self, list_of_run_IDs):
789        """Flags the specified runs to be IGNORED in future collation. Note that
790        this does NOT remove previously collated results from the collation table.
791        For that you must refresh the collation by running recollate().
792
793        Parameters
794        ----------
795        list
796            The list of run IDs for the runs that should be set to status IGNORED
797        """
798        self.campaign_db.set_run_statuses(list_of_run_IDs, Status.IGNORED)

Flags the specified runs to be IGNORED in future collation. Note that this does NOT remove previously collated results from the collation table. For that you must refresh the collation by running recollate().

Parameters
  • list_of_run_IDs (list): The list of run IDs for the runs that should be set to status IGNORED
def rerun(self, list_of_run_IDs):
800    def rerun(self, list_of_run_IDs):
801        """Sets the status of the specified runs to ENCODED, so that their results
802        may be recollated later (presumably after extending, rerunning or otherwise
803        modifying the data in the relevant run folder). Note that this method will
804        NOT perform any execution - it simply flags the run in EasyVVUQ as being
805        uncollated. Actual execution is (as usual) the job of the user or middleware.
806
807        Parameters
808        ----------
809        list
810            The list of run IDs for the runs that should be set to status ENCODED
811        """
812
813        for run_ID in list_of_run_IDs:
814            status = self.campaign_db.get_run_status(run_ID)
815            if status == Status.NEW:
816                msg = (f"Cannot rerun {run_ID} as it has status NEW, and must"
817                       f"be encoded before execution.")
818                raise RuntimeError(msg)
819        self.campaign_db.set_run_statuses(list_of_run_IDs, Status.ENCODED)

Sets the status of the specified runs to ENCODED, so that their results may be recollated later (presumably after extending, rerunning or otherwise modifying the data in the relevant run folder). Note that this method will NOT perform any execution - it simply flags the run in EasyVVUQ as being uncollated. Actual execution is (as usual) the job of the user or middleware.

Parameters
  • list_of_run_IDs (list): The list of run IDs for the runs that should be set to status ENCODED
def get_active_app(self):
821    def get_active_app(self):
822        """Returns a dict of information regarding the application that is currently
823        set for this campaign.
824        """
825        return self._active_app

Returns a dict of information regarding the application that is currently set for this campaign.