easyvvuq.db.sql

Provides class that allows access to an SQL Database that serves as the back-end to EasyVVUQ.

  1"""Provides class that allows access to an SQL Database that serves as the back-end to EasyVVUQ.
  2
  3
  4"""
  5import os
  6import json
  7import logging
  8import pandas as pd
  9import numpy as np
 10from sqlalchemy.sql import case
 11from sqlalchemy import create_engine, Column, Integer, String, ForeignKey
 12from sqlalchemy.orm import sessionmaker, declarative_base
 13from sqlalchemy import MetaData
 14from sqlalchemy import text
 15from sqlalchemy.engine import Engine
 16from sqlalchemy import event
 17from .base import BaseCampaignDB
 18from easyvvuq import constants
 19from easyvvuq import ParamsSpecification
 20from easyvvuq.utils.helpers import easyvvuq_serialize, easyvvuq_deserialize
 21
 22
 23__copyright__ = """
 24
 25    Copyright 2018 Robin A. Richardson, David W. Wright
 26
 27    This file is part of EasyVVUQ
 28
 29    EasyVVUQ is free software: you can redistribute it and/or modify
 30    it under the terms of the Lesser GNU General Public License as published by
 31    the Free Software Foundation, either version 3 of the License, or
 32    (at your option) any later version.
 33
 34    EasyVVUQ is distributed in the hope that it will be useful,
 35    but WITHOUT ANY WARRANTY; without even the implied warranty of
 36    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 37    Lesser GNU General Public License for more details.
 38
 39    You should have received a copy of the Lesser GNU General Public License
 40    along with this program.  If not, see <https://www.gnu.org/licenses/>.
 41
 42"""
 43__license__ = "LGPL"
 44
 45COMMIT_RATE = 50000
 46
 47logger = logging.getLogger(__name__)
 48
 49Base = declarative_base()
 50
 51
 52class DBInfoTable(Base):
 53    """An SQLAlchemy schema for the database information table.
 54    """
 55    __tablename__ = 'db_info'
 56    id = Column(Integer, primary_key=True)
 57    next_run = Column(Integer)
 58
 59
 60class CampaignTable(Base):
 61    """An SQLAlchemy schema for the campaign information table.
 62    """
 63    __tablename__ = 'campaign_info'
 64    id = Column(Integer, primary_key=True)
 65    name = Column(String, unique=True)
 66    easyvvuq_version = Column(String)
 67    campaign_dir_prefix = Column(String)
 68    campaign_dir = Column(String)
 69    runs_dir = Column(String)
 70    sampler = Column(Integer, ForeignKey('sampler.id'))
 71    active_app = Column(Integer, ForeignKey('app.id'))
 72
 73
 74class AppTable(Base):
 75    """An SQLAlchemy schema for the app table.
 76    """
 77    __tablename__ = 'app'
 78    id = Column(Integer, primary_key=True)
 79    name = Column(String, unique=True)
 80    params = Column(String)
 81    actions = Column(String)
 82
 83
 84class RunTable(Base):
 85    """An SQLAlchemy schema for the run table.
 86    """
 87    __tablename__ = 'run'
 88    id = Column(Integer, primary_key=True)
 89    run_name = Column(String, index=True)
 90    app = Column(Integer, ForeignKey('app.id'))
 91    params = Column(String)
 92    status = Column(Integer)
 93    run_dir = Column(String)
 94    result = Column(String, default="{}")
 95    execution_info = Column(String, default="{}")
 96    campaign = Column(Integer, ForeignKey('campaign_info.id'))
 97    sampler = Column(Integer, ForeignKey('sampler.id'))
 98    iteration = Column(Integer, default=0)
 99
100
101class SamplerTable(Base):
102    """An SQLAlchemy schema for the run table.
103    """
104    __tablename__ = 'sampler'
105    id = Column(Integer, primary_key=True)
106    sampler = Column(String)
107
108
109@event.listens_for(Engine, "connect")
110def set_sqlite_pragma(dbapi_connection, connection_record):
111    cursor = dbapi_connection.cursor()
112    cursor.execute("PRAGMA synchronous = OFF")
113    cursor.execute("PRAGMA journal_mode = OFF")
114    cursor.close()
115
116
117class CampaignDB(BaseCampaignDB):
118    """An interface between the campaign database and the campaign.
119
120    Parameters
121    ----------
122    location: str
123       database URI as needed by SQLAlchemy
124    """
125
126    def __init__(self, location=None):
127        if location is not None:
128            self.engine = create_engine(location)
129        else:
130            self.engine = create_engine('sqlite://')
131        self.commit_counter = 0
132        session_maker = sessionmaker(bind=self.engine)
133        self.session = session_maker()
134        Base.metadata.create_all(self.engine, checkfirst=True)
135
136    def resume_campaign(self, name):
137        """Resumes campaign.
138
139        Parameters
140        ----------
141        name: str
142           Name of the Campaign to resume. Must already exist in the database.
143        """
144        info = self.session.query(
145            CampaignTable).filter_by(name=name).first()
146        if info is None:
147            raise ValueError('Campaign with the given name not found.')
148        db_info = self.session.query(DBInfoTable).first()
149        self._next_run = db_info.next_run
150
151    def create_campaign(self, info):
152        """Creates a new campaign in the database.
153
154        Parameters
155        ----------
156        info: CampaignInfo
157            This `easyvvuq.data_structs.CampaignInfo` will contain information
158            needed to construct the Campaign table.
159        """
160        is_db_empty = (self.session.query(CampaignTable).first() is None)
161        version_check = self.session.query(
162            CampaignTable).filter(CampaignTable.easyvvuq_version != info.easyvvuq_version).all()
163        if (not is_db_empty) and (len(version_check) != 0):
164            raise RuntimeError('Database contains campaign created with an incompatible' +
165                               ' version of EasyVVUQ!')
166        self._next_run = 1
167        self.session.add(CampaignTable(**info.to_dict(flatten=True)))
168        self.session.add(DBInfoTable(next_run=self._next_run))
169        self.session.commit()
170
171    def get_active_app(self):
172        """Returns active app table.
173
174        Returns
175        -------
176        AppTable
177        """
178        return self.session.query(AppTable, CampaignTable).filter(
179            AppTable.id == CampaignTable.active_app).first()
180
181    def campaign_exists(self, name):
182        """Check if campaign specified by that name already exists.
183
184        Parameters
185        ----------
186        name: str
187
188        Returns
189        -------
190        bool
191          True if such a campaign already exists, False otherwise
192        """
193        result = self.session.query(CampaignTable).filter(
194            CampaignTable.name == name).all()
195        return len(result) > 0
196
197    def app(self, name=None):
198        """Get app information. Specific applications selected by `name`,
199        otherwise first entry in database 'app' selected.
200
201        Parameters
202        ----------
203        name : str or None
204            Name of selected app, if `None` given then first app will be
205            selected.
206
207        Returns
208        -------
209        dict
210            Information about the application.
211        """
212
213        if name is None:
214            selected = self.session.query(AppTable).all()
215        else:
216            selected = self.session.query(AppTable).filter_by(name=name).all()
217
218        if len(selected) == 0:
219            message = f'No entry for app: ({name}).'
220            logger.critical(message)
221            raise RuntimeError(message)
222
223        selected_app = selected[0]
224
225        app_dict = {
226            'id': selected_app.id,
227            'name': selected_app.name,
228            'params': ParamsSpecification.deserialize(selected_app.params),
229            'actions': selected_app.actions,
230        }
231
232        return app_dict
233
234    def set_active_app(self, name):
235        """Set an app specified by name as active.
236
237        Parameters
238        ----------
239        name: str
240           name of the app to set as active
241        """
242        selected = self.session.query(AppTable).filter_by(name=name).all()
243        if len(selected) == 0:
244            raise RuntimeError('no such app - {}'.format(name))
245        assert (not (len(selected) > 1))
246        app = selected[0]
247        self.session.query(CampaignTable).update({'active_app': app.id})
248        self.session.commit()
249
250    def add_app(self, app_info):
251        """Add application to the 'app' table.
252
253        Parameters
254        ----------
255        app_info: AppInfo
256            Application definition.
257        """
258
259        # Check that no app with same name exists
260        name = app_info.name
261        selected = self.session.query(AppTable).filter_by(name=name).all()
262        if len(selected) > 0:
263            message = (
264                f'There is already an app in this database with name {name}'
265                f'(found {len(selected)}).'
266            )
267            logger.critical(message)
268            raise RuntimeError(message)
269
270        app_dict = app_info.to_dict(flatten=True)
271
272        db_entry = AppTable(**app_dict)
273        self.session.add(db_entry)
274        self.session.commit()
275
276    def replace_actions(self, app_name, actions):
277        """Replace actions for an app with a given name.
278
279        Parameters
280        ----------
281        app_name: str
282            Name of the app.
283        actions: Actions
284            `Actions` instance, will replace the current `Actions` of an app.
285        """
286        self.session.query(AppTable).filter_by(name=app_name).update(
287            {'actions': easyvvuq_serialize(actions)})
288        self.session.commit()
289
290    def add_sampler(self, sampler_element):
291        """Add new Sampler to the 'sampler' table.
292
293        Parameters
294        ----------
295        sampler_element: Sampler
296            An EasyVVUQ sampler.
297
298        Returns
299        -------
300        int
301            The sampler `id` in the database.
302        """
303        db_entry = SamplerTable(sampler=easyvvuq_serialize(sampler_element))
304
305        self.session.add(db_entry)
306        self.session.commit()
307
308        return db_entry.id
309
310    def update_sampler(self, sampler_id, sampler_element):
311        """Update the state of the Sampler with id 'sampler_id' to
312        that in the passed 'sampler_element'
313
314        Parameters
315        ----------
316        sampler_id: int
317            The id of the sampler in the db to update
318        sampler_element: Sampler
319            The sampler that should be used as the new state
320        """
321
322        selected = self.session.get(SamplerTable,sampler_id)
323        selected.sampler = easyvvuq_serialize(sampler_element)
324        self.session.commit()
325
326    def resurrect_sampler(self, sampler_id):
327        """Return the sampler object corresponding to id sampler_id in the database.
328        It is deserialized from the state stored in the database.
329
330        Parameters
331        ----------
332        sampler_id: int
333            The id of the sampler to resurrect
334
335        Returns
336        -------
337        Sampler
338            The 'live' sampler object, deserialized from the state in the db
339        """
340        try:
341            serialized_sampler = self.session.get(SamplerTable,sampler_id).sampler 
342            sampler = easyvvuq_deserialize(serialized_sampler.encode('utf-8'))
343        except AttributeError:
344            sampler = None
345        return sampler
346
347    def resurrect_app(self, app_name):
348        """Return the 'live' encoder, decoder and collation objects corresponding to the app with
349        name 'app_name' in the database. They are deserialized from the states previously
350        stored in the database.
351
352        Parameters
353        ----------
354        app_name: string
355            Name of the app to resurrect
356
357        Returns
358        -------
359        Actions
360            The 'live' `Actions` object associated with this app. Used to execute the simulation
361            associated with the app as well as do any pre- and post-processing.
362        """
363        app_info = self.app(app_name)
364        actions = easyvvuq_deserialize(app_info['actions'])
365        return actions
366
367    def add_runs(self, run_info_list=None, run_prefix='run_', iteration=0):
368        """Add list of runs to the `runs` table in the database.
369
370        Parameters
371        ----------
372        run_info_list: List of RunInfo objects
373            Each RunInfo object contains relevant run fields: params, status (where in the
374            EasyVVUQ workflow is this RunTable), campaign (id number), sample, app
375        run_prefix: str
376            Prefix for run name
377        iteration: int
378            Iteration number used by iterative workflows. For example, MCMC. Can be left
379            as default zero in other cases.
380        """
381        # Add all runs to RunTable
382        commit_counter = 0
383        for run_info in run_info_list:
384            run_info.run_name = f"{run_prefix}{self._next_run}"
385            run_info.iteration = iteration
386            run = RunTable(**run_info.to_dict(flatten=True))
387            self.session.add(run)
388            self._next_run += 1
389            commit_counter += 1
390            if commit_counter % COMMIT_RATE == 0:
391                self.session.commit()
392        # Update run and ensemble counters in db
393        db_info = self.session.query(DBInfoTable).first()
394        db_info.next_run = self._next_run
395        self.session.commit()
396
397    @staticmethod
398    def _run_to_dict(run_row):
399        """Convert the provided row from 'runs' table into a dictionary
400
401        Parameters
402        ----------
403        run_row: RunTable
404            Information on a particular run in the database.
405
406        Returns
407        -------
408        dict
409            Contains run information (keys = run_name, params, status, sample,
410            campaign and app)
411        """
412
413        run_info = {
414            'run_name': run_row.run_name,
415            'params': json.loads(run_row.params),
416            'status': constants.Status(run_row.status),
417            'sampler': run_row.sampler,
418            'campaign': run_row.campaign,
419            'app': run_row.app,
420            'result': run_row.result,
421            'run_dir': run_row.run_dir
422        }
423
424        return run_info
425
426    def set_dir_for_run(self, run_name, run_dir, campaign=None, sampler=None):
427        """Set the 'run_dir' path for the specified run in the database.
428
429        Parameters
430        ----------
431        run_name: str
432            Name of run to filter for.
433        run_dir: str
434            Directory path associated to set for this run.
435        campaign:  int or None
436            Campaign id to filter for.
437        sampler: int or None
438            Sample id to filter for.
439        """
440        filter_options = {'run_name': run_name}
441        if campaign:
442            filter_options['campaign'] = campaign
443        if sampler:
444            filter_options['sampler'] = sampler
445        selected = self.session.query(RunTable).filter_by(**filter_options)
446        if selected.count() != 1:
447            logging.critical('Multiple runs selected - using the first')
448        selected = selected.first()
449        selected.run_dir = run_dir
450        self.session.commit()
451
452    def get_run_status(self, run_id, campaign=None, sampler=None):
453        """Return the status (enum) for the run with name 'run_name' (and, optionally,
454        filtering for campaign and sampler by id)
455
456        Parameters
457        ----------
458        run_id: int
459            id of the run
460        campaign: int
461            ID of the desired Campaign
462        sampler: int
463            ID of the desired Sampler
464
465        Returns
466        -------
467        enum(Status)
468            Status of the run.
469        """
470        filter_options = {'id': run_id}
471        if campaign:
472            filter_options['campaign'] = campaign
473        if sampler:
474            filter_options['sampler'] = sampler
475        selected = self.session.query(RunTable).filter_by(**filter_options)
476        if selected.count() != 1:
477            logging.critical('Multiple runs selected - using the first')
478        selected = selected.first()
479        return constants.Status(selected.status)
480
481    def set_run_statuses(self, run_id_list, status):
482        """Set the specified 'status' (enum) for all runs in the list run_id_list
483
484        Parameters
485        ----------
486        run_id_list: list of int
487            a list of run ids
488        status: enum(Status)
489            The new status all listed runs should now have
490        """
491        self.session.query(RunTable).filter(
492            RunTable.id.in_(run_id_list)).update(
493                {RunTable.status: status}, synchronize_session='fetch')
494        self.session.commit()
495
496    def campaigns(self):
497        """Get list of campaigns for which information is stored in the
498        database.
499
500        Returns
501        -------
502        list
503            Campaign names.
504        """
505
506        return [c.name for c in self.session.query(CampaignTable).all()]
507
508    def _get_campaign_info(self, campaign_name=None):
509        """Retrieves Campaign info based on name.
510
511        Parameters
512        ----------
513        campaign_name: str
514            Name of campaign to select.
515
516        Returns
517        -------
518            SQLAlchemy query for campaign with this name.
519        """
520        assert (isinstance(campaign_name, str) or campaign_name is None)
521        query = self.session.query(CampaignTable)
522        if campaign_name is None:
523            campaign_info = query
524        else:
525            campaign_info = query.filter_by(name=campaign_name).all()
526        if campaign_name is not None:
527            if len(campaign_info) > 1:
528                logger.warning(
529                    'More than one campaign selected - using first one.')
530            elif len(campaign_info) == 0:
531                message = 'No campaign available.'
532                logger.critical(message)
533                raise RuntimeError(message)
534            return campaign_info[0]
535        return campaign_info.first()
536
537    def get_campaign_id(self, name):
538        """Return the (database) id corresponding to the campaign with name 'name'.
539
540        Parameters
541        ----------
542        name: str
543            Name of the campaign.
544
545        Returns
546        -------
547        int
548            The id of the campaign with the specified name
549        """
550
551        selected = self.session.query(
552            CampaignTable.name.label(name),
553            CampaignTable.id).filter(CampaignTable.name == name).all()
554        if len(selected) == 0:
555            msg = f"No campaign with name {name} found in campaign database"
556            logger.error(msg)
557            raise RuntimeError(msg)
558        if len(selected) > 1:
559            msg = (
560                f"More than one campaign with name {name} found in"
561                f"campaign database. Database state is compromised."
562            )
563            logger.error(msg)
564            raise RuntimeError(msg)
565        # Return the database ID for the specified campaign
566        return selected[0][1]
567
568    def get_sampler_id(self, campaign_id):
569        """Return the (database) id corresponding to the sampler currently set
570        for the campaign with id 'campaign_id'
571
572        Parameters
573        ----------
574        campaign_id: int
575            ID of the campaign.
576
577        Returns
578        -------
579        int
580            The id of the sampler set for the specified campaign
581        """
582        sampler_id = self.session.get(CampaignTable,campaign_id).sampler
583        return sampler_id
584
585    def set_sampler(self, campaign_id, sampler_id):
586        """Set specified campaign to be using specified sampler
587
588        Parameters
589        ----------
590        campaign_id: int
591            ID of the campaign.
592        sampler_id: int
593            ID of the sampler.
594        """
595        self.session.get(CampaignTable,campaign_id).sampler = sampler_id
596        self.session.commit()
597
598    def campaign_dir(self, campaign_name=None):
599        """Get campaign directory for `campaign_name`.
600
601        Parameters
602        ----------
603        campaign_name: str
604            Name of campaign to select
605
606        Returns
607        -------
608        str
609            Path to campaign directory.
610        """
611        return self._get_campaign_info(campaign_name=campaign_name).campaign_dir
612
613    def _select_runs(
614            self,
615            name=None,
616            campaign=None,
617            sampler=None,
618            status=None,
619            not_status=None,
620            app_id=None):
621        """Select all runs in the database which match the input criteria.
622
623        Parameters
624        ----------
625        name: str
626            Name of run to filter for.
627        campaign:  int or None
628            Campaign id to filter for.
629        sampler: int or None
630            Sampler id to filter for.
631        status: enum(Status) or None
632            Status string to filter for.
633        not_status: enum(Status) or None
634            Exclude runs with this status string
635        app_id: int or None
636            App id to filter for.
637
638        Returns
639        -------
640        sqlalchemy.orm.query.Query
641            Selected runs from the database run table.
642        """
643        filter_options = {}
644        if name:
645            filter_options['run_name'] = name
646        if campaign:
647            filter_options['campaign'] = campaign
648        if sampler:
649            filter_options['sampler'] = sampler
650        if status:
651            filter_options['status'] = status
652        if app_id:
653            filter_options['app'] = app_id
654
655        # Note that for some databases this can be sped up with a yield_per(), but not all
656        selected = self.session.query(RunTable).filter_by(
657            **filter_options).filter(RunTable.status != not_status)
658
659        return selected
660
661    def run(self, name, campaign=None, sampler=None, status=None, not_status=None, app_id=None):
662        """Get the information for a specified run.
663
664        Parameters
665        ----------
666        name: str
667            Name of run to filter for.
668        campaign:  int or None
669            Campaign id to filter for.
670        sampler: int or None
671            Sampler id to filter for.
672        status: enum(Status) or None
673            Status string to filter for.
674        not_status: enum(Status) or None
675            Exclude runs with this status string
676        app_id: int or None
677            App id to filter for.
678
679        Returns
680        -------
681        dict
682            Containing run information (run_name, params, status, sample,
683            campaign, app)
684        """
685        selected = self._select_runs(
686            name=name,
687            campaign=campaign,
688            sampler=sampler,
689            status=status,
690            not_status=not_status,
691            app_id=app_id)
692        if selected.count() != 1:
693            logging.warning('Multiple runs selected - using the first')
694        selected = selected.first()
695        return self._run_to_dict(selected)
696
697    def runs(self, campaign=None, sampler=None, status=None, not_status=None, app_id=None):
698        """A generator to return all run information for selected `campaign` and `sampler`.
699
700        Parameters
701        ----------
702        campaign: int or None
703            Campaign id to filter for.
704        sampler: int or None
705            Sampler id to filter for.
706        status: enum(Status) or None
707            Status string to filter for.
708        not_status: enum(Status) or None
709            Exclude runs with this status string
710        app_id: int or None
711            App id to filter for.
712
713        Yields
714        ------
715        dict
716            Information on each selected run (key = run_name, value = dict of
717            run information fields.), one at a time.
718        """
719        selected = self._select_runs(
720            campaign=campaign,
721            sampler=sampler,
722            status=status,
723            not_status=not_status,
724            app_id=app_id)
725        for r in selected:
726            yield r.id, self._run_to_dict(r)
727
728    def run_ids(self, campaign=None, sampler=None, status=None, not_status=None, app_id=None):
729        """A generator to return all run IDs for selected `campaign` and `sampler`.
730
731        Parameters
732        ----------
733        campaign: int or None
734            Campaign id to filter for.
735        sampler: int or None
736            Sampler id to filter for.
737        status: enum(Status) or None
738            Status string to filter for.
739        not_status: enum(Status) or None
740            Exclude runs with this status string
741        app_id: int or None
742            App id to filter for.
743
744        Yields
745        ------
746        str
747            run ID for each selected run, one at a time.
748        """
749        selected = self._select_runs(
750            campaign=campaign,
751            sampler=sampler,
752            status=status,
753            not_status=not_status,
754            app_id=app_id)
755        for r in selected:
756            yield r.run_name
757
758    def get_num_runs(self, campaign=None, sampler=None, status=None, not_status=None):
759        """Returns the number of runs matching the filtering criteria.
760
761        Parameters
762        ----------
763        campaign: int or None
764            Campaign id to filter for.
765        sampler: int or None
766            Sampler id to filter for.
767        status: enum(Status) or None
768            Status string to filter for.
769        not_status: enum(Status) or None
770            Exclude runs with this status string
771
772        Returns
773        -------
774        int
775            The number of runs in the database matching the filtering criteria
776
777        """
778        selected = self._select_runs(
779            campaign=campaign,
780            sampler=sampler,
781            status=status,
782            not_status=not_status)
783        return selected.count()
784
785    def runs_dir(self, campaign_name=None):
786        """Get the directory used to store run information for `campaign_name`.
787
788        Parameters
789        ----------
790        campaign_name: str
791            Name of the selected campaign.
792
793        Returns
794        -------
795        str
796            Path containing run outputs.
797        """
798        return self._get_campaign_info(campaign_name=campaign_name).runs_dir
799
800    def store_result(self, run_id, result, change_status=True):
801        """Stores results of a simulation inside the RunTable given a run id.
802
803        Parameters
804        ----------
805        run_id: int
806            The id of a run to store the results in. This will be the run with which these
807            results are associated with. Namely the run that has the inputs used to generate
808            these results.
809        result: dict
810            Results in dictionary form. This is the same format as used by the `Decoder`.
811        change_status: bool
812            If set to False will not update the runs' status to COLLATED. This is sometimes
813            useful in scenarios where you want several apps to work on the same runs.
814        """
815        self.commit_counter += 1
816
817        def convert_nonserializable(obj):
818            if isinstance(obj, np.int64):
819                return int(obj)
820            raise TypeError('Unknown type:', type(obj))
821        result_ = result['result']
822        result.pop('result')
823        result.pop('run_info')
824        if change_status:
825            self.session.query(RunTable).\
826                filter(RunTable.id == run_id).\
827                update({'result': json.dumps(result_, default=convert_nonserializable),
828                        'status': constants.Status.COLLATED,
829                        'run_dir': result['rundir']})
830        else:
831            self.session.query(RunTable).\
832                filter(RunTable.id == run_id).\
833                update({'result': json.dumps(result_, default=convert_nonserializable),
834                        'run_dir': result['rundir']})
835        if self.commit_counter % COMMIT_RATE == 0:
836            self.session.commit()
837
838    def store_results(self, app_name, results):
839        """Stores the results from a given run in the database.
840
841        Parameters
842        ----------
843        run_name: str
844            name of the run
845        results: dict
846            dictionary with the results (from the decoder)
847        """
848        try:
849            app_id = self.session.query(AppTable).filter(AppTable.name == app_name).all()[0].id
850        except IndexError:
851            raise RuntimeError("app with the name {} not found".format(app_name))
852        commit_counter = 0
853        for run_id, result in results:
854            try:
855                self.session.query(RunTable).\
856                    filter(RunTable.id == run_id, RunTable.app == app_id).\
857                    update({'result': json.dumps(result), 'status': constants.Status.COLLATED})
858                commit_counter += 1
859                if commit_counter % COMMIT_RATE == 0:
860                    self.session.commit()
861            except IndexError:
862                raise RuntimeError("no runs with name {} found".format(run_id))
863        self.session.commit()
864
865    def get_results(self, app_name, sampler_id, status=constants.Status.COLLATED, iteration=-1):
866        """Returns the results as a pandas DataFrame.
867
868        Parameters
869        ----------
870        app_name: str
871            Name of the app to return data for.
872        sampler_id: int
873            ID of the sampler.
874        status: STATUS
875            Run status to filter for.
876        iteration: int
877            If a positive integer will return the results for a given iteration only.
878
879        Returns
880        -------
881        DataFrame
882            Will construct a `DataFrame` from the decoder output dictionaries.
883        """
884        try:
885            app_id = self.session.query(AppTable).filter(AppTable.name == app_name).all()[0].id
886        except IndexError:
887            raise RuntimeError("app with the name {} not found".format(app_name))
888        pd_result = {}
889        query = self.session.query(RunTable).\
890            filter(RunTable.app == app_id).\
891            filter(RunTable.sampler == sampler_id).\
892            filter(RunTable.status == status)
893        # if only a specific iteration is requested filter it out
894        if iteration >= 0:
895            query = query.filter(RunTable.iteration == iteration)
896        for row in query:
897            params = {'run_id': row.id}
898            params['iteration'] = row.iteration
899            params = {**params, **json.loads(row.params)}
900            result = json.loads(row.result)
901            pd_dict = {**params, **result}
902            for key in pd_dict.keys():
903                if not isinstance(pd_dict[key], list):
904                    try:
905                        pd_result[(key, 0)].append(pd_dict[key])
906                    except KeyError:
907                        pd_result[(key, 0)] = [pd_dict[key]]
908                else:
909                    for i, elt in enumerate(pd_dict[key]):
910                        try:
911                            pd_result[(key, i)].append(pd_dict[key][i])
912                        except KeyError:
913                            pd_result[(key, i)] = [pd_dict[key][i]]
914        try:
915            return pd.DataFrame(pd_result)
916        except ValueError:
917            raise RuntimeError(
918                'the results received from the database seem to be malformed - commonly because a vector quantity of interest changes dimensionality')
919
920    def relocate(self, new_path, campaign_name):
921        """Update all runs in the db with the new campaign path.
922
923        Parameters
924        ----------
925        new_path: str
926            new runs directory
927        campaign_name: str
928            name of the campaign
929        """
930        campaign_id = self.get_campaign_id(campaign_name)
931        campaign_info = self.session.query(CampaignTable).\
932            filter(CampaignTable.id == campaign_id).first()
933        path, runs_dir = os.path.split(campaign_info.runs_dir)
934        self.session.query(CampaignTable).\
935            filter(CampaignTable.id == campaign_id).\
936            update({'campaign_dir': str(new_path),
937                    'runs_dir': str(os.path.join(new_path, runs_dir))})
938        self.session.commit()
939
940    def dump(self):
941        """Dump the database as JSON for debugging purposes.
942
943        Returns
944        -------
945        dict
946            A database dump in JSON format.
947        """
948        meta = MetaData()
949        meta.reflect(bind=self.engine)
950        result = {}
951
952        # Create a connection from the engine
953        with self.engine.connect() as connection:
954            for table in meta.sorted_tables:
955                try:
956                    query_result = connection.execute(table.select()).fetchall()
957                    result[table.name] = [dict(row) for row in query_result]
958                except Exception as e:
959                    result[table.name] = str(e)
960
961        return json.dumps(result)
962    
COMMIT_RATE = 50000
logger = <Logger easyvvuq.db.sql (DEBUG)>
class Base(sqlalchemy.orm.decl_api._DynamicAttributesType, sqlalchemy.inspection.Inspectable[sqlalchemy.orm.mapper.Mapper[typing.Any]]):

The base class of the class hierarchy.

When called, it accepts no arguments and returns a new featureless instance that has no instance attributes and cannot be given any.

Base(**kwargs: Any)
2167def _declarative_constructor(self: Any, **kwargs: Any) -> None:
2168    """A simple constructor that allows initialization from kwargs.
2169
2170    Sets attributes on the constructed instance using the names and
2171    values in ``kwargs``.
2172
2173    Only keys that are present as
2174    attributes of the instance's class are allowed. These could be,
2175    for example, any mapped columns or relationships.
2176    """
2177    cls_ = type(self)
2178    for k in kwargs:
2179        if not hasattr(cls_, k):
2180            raise TypeError(
2181                "%r is an invalid keyword argument for %s" % (k, cls_.__name__)
2182            )
2183        setattr(self, k, kwargs[k])

A simple constructor that allows initialization from kwargs.

Sets attributes on the constructed instance using the names and values in kwargs.

Only keys that are present as attributes of the instance's class are allowed. These could be, for example, any mapped columns or relationships.

registry: sqlalchemy.orm.decl_api.registry = <sqlalchemy.orm.decl_api.registry object>
metadata: sqlalchemy.sql.schema.MetaData = MetaData()
class DBInfoTable(sqlalchemy.orm.decl_api._DynamicAttributesType, sqlalchemy.inspection.Inspectable[sqlalchemy.orm.mapper.Mapper[typing.Any]]):
53class DBInfoTable(Base):
54    """An SQLAlchemy schema for the database information table.
55    """
56    __tablename__ = 'db_info'
57    id = Column(Integer, primary_key=True)
58    next_run = Column(Integer)

An SQLAlchemy schema for the database information table.

DBInfoTable(**kwargs)

A simple constructor that allows initialization from kwargs.

Sets attributes on the constructed instance using the names and values in kwargs.

Only keys that are present as attributes of the instance's class are allowed. These could be, for example, any mapped columns or relationships.

id
next_run
class CampaignTable(sqlalchemy.orm.decl_api._DynamicAttributesType, sqlalchemy.inspection.Inspectable[sqlalchemy.orm.mapper.Mapper[typing.Any]]):
61class CampaignTable(Base):
62    """An SQLAlchemy schema for the campaign information table.
63    """
64    __tablename__ = 'campaign_info'
65    id = Column(Integer, primary_key=True)
66    name = Column(String, unique=True)
67    easyvvuq_version = Column(String)
68    campaign_dir_prefix = Column(String)
69    campaign_dir = Column(String)
70    runs_dir = Column(String)
71    sampler = Column(Integer, ForeignKey('sampler.id'))
72    active_app = Column(Integer, ForeignKey('app.id'))

An SQLAlchemy schema for the campaign information table.

CampaignTable(**kwargs)

A simple constructor that allows initialization from kwargs.

Sets attributes on the constructed instance using the names and values in kwargs.

Only keys that are present as attributes of the instance's class are allowed. These could be, for example, any mapped columns or relationships.

id
name
easyvvuq_version
campaign_dir_prefix
campaign_dir
runs_dir
sampler
active_app
class AppTable(sqlalchemy.orm.decl_api._DynamicAttributesType, sqlalchemy.inspection.Inspectable[sqlalchemy.orm.mapper.Mapper[typing.Any]]):
75class AppTable(Base):
76    """An SQLAlchemy schema for the app table.
77    """
78    __tablename__ = 'app'
79    id = Column(Integer, primary_key=True)
80    name = Column(String, unique=True)
81    params = Column(String)
82    actions = Column(String)

An SQLAlchemy schema for the app table.

AppTable(**kwargs)

A simple constructor that allows initialization from kwargs.

Sets attributes on the constructed instance using the names and values in kwargs.

Only keys that are present as attributes of the instance's class are allowed. These could be, for example, any mapped columns or relationships.

id
name
params
actions
class RunTable(sqlalchemy.orm.decl_api._DynamicAttributesType, sqlalchemy.inspection.Inspectable[sqlalchemy.orm.mapper.Mapper[typing.Any]]):
85class RunTable(Base):
86    """An SQLAlchemy schema for the run table.
87    """
88    __tablename__ = 'run'
89    id = Column(Integer, primary_key=True)
90    run_name = Column(String, index=True)
91    app = Column(Integer, ForeignKey('app.id'))
92    params = Column(String)
93    status = Column(Integer)
94    run_dir = Column(String)
95    result = Column(String, default="{}")
96    execution_info = Column(String, default="{}")
97    campaign = Column(Integer, ForeignKey('campaign_info.id'))
98    sampler = Column(Integer, ForeignKey('sampler.id'))
99    iteration = Column(Integer, default=0)

An SQLAlchemy schema for the run table.

RunTable(**kwargs)

A simple constructor that allows initialization from kwargs.

Sets attributes on the constructed instance using the names and values in kwargs.

Only keys that are present as attributes of the instance's class are allowed. These could be, for example, any mapped columns or relationships.

id
run_name
app
params
status
run_dir
result
execution_info
campaign
sampler
iteration
class SamplerTable(sqlalchemy.orm.decl_api._DynamicAttributesType, sqlalchemy.inspection.Inspectable[sqlalchemy.orm.mapper.Mapper[typing.Any]]):
102class SamplerTable(Base):
103    """An SQLAlchemy schema for the run table.
104    """
105    __tablename__ = 'sampler'
106    id = Column(Integer, primary_key=True)
107    sampler = Column(String)

An SQLAlchemy schema for the run table.

SamplerTable(**kwargs)

A simple constructor that allows initialization from kwargs.

Sets attributes on the constructed instance using the names and values in kwargs.

Only keys that are present as attributes of the instance's class are allowed. These could be, for example, any mapped columns or relationships.

id
sampler
@event.listens_for(Engine, 'connect')
def set_sqlite_pragma(dbapi_connection, connection_record):
110@event.listens_for(Engine, "connect")
111def set_sqlite_pragma(dbapi_connection, connection_record):
112    cursor = dbapi_connection.cursor()
113    cursor.execute("PRAGMA synchronous = OFF")
114    cursor.execute("PRAGMA journal_mode = OFF")
115    cursor.close()
class CampaignDB(easyvvuq.db.base.BaseCampaignDB):
118class CampaignDB(BaseCampaignDB):
119    """An interface between the campaign database and the campaign.
120
121    Parameters
122    ----------
123    location: str
124       database URI as needed by SQLAlchemy
125    """
126
127    def __init__(self, location=None):
128        if location is not None:
129            self.engine = create_engine(location)
130        else:
131            self.engine = create_engine('sqlite://')
132        self.commit_counter = 0
133        session_maker = sessionmaker(bind=self.engine)
134        self.session = session_maker()
135        Base.metadata.create_all(self.engine, checkfirst=True)
136
137    def resume_campaign(self, name):
138        """Resumes campaign.
139
140        Parameters
141        ----------
142        name: str
143           Name of the Campaign to resume. Must already exist in the database.
144        """
145        info = self.session.query(
146            CampaignTable).filter_by(name=name).first()
147        if info is None:
148            raise ValueError('Campaign with the given name not found.')
149        db_info = self.session.query(DBInfoTable).first()
150        self._next_run = db_info.next_run
151
152    def create_campaign(self, info):
153        """Creates a new campaign in the database.
154
155        Parameters
156        ----------
157        info: CampaignInfo
158            This `easyvvuq.data_structs.CampaignInfo` will contain information
159            needed to construct the Campaign table.
160        """
161        is_db_empty = (self.session.query(CampaignTable).first() is None)
162        version_check = self.session.query(
163            CampaignTable).filter(CampaignTable.easyvvuq_version != info.easyvvuq_version).all()
164        if (not is_db_empty) and (len(version_check) != 0):
165            raise RuntimeError('Database contains campaign created with an incompatible' +
166                               ' version of EasyVVUQ!')
167        self._next_run = 1
168        self.session.add(CampaignTable(**info.to_dict(flatten=True)))
169        self.session.add(DBInfoTable(next_run=self._next_run))
170        self.session.commit()
171
172    def get_active_app(self):
173        """Returns active app table.
174
175        Returns
176        -------
177        AppTable
178        """
179        return self.session.query(AppTable, CampaignTable).filter(
180            AppTable.id == CampaignTable.active_app).first()
181
182    def campaign_exists(self, name):
183        """Check if campaign specified by that name already exists.
184
185        Parameters
186        ----------
187        name: str
188
189        Returns
190        -------
191        bool
192          True if such a campaign already exists, False otherwise
193        """
194        result = self.session.query(CampaignTable).filter(
195            CampaignTable.name == name).all()
196        return len(result) > 0
197
198    def app(self, name=None):
199        """Get app information. Specific applications selected by `name`,
200        otherwise first entry in database 'app' selected.
201
202        Parameters
203        ----------
204        name : str or None
205            Name of selected app, if `None` given then first app will be
206            selected.
207
208        Returns
209        -------
210        dict
211            Information about the application.
212        """
213
214        if name is None:
215            selected = self.session.query(AppTable).all()
216        else:
217            selected = self.session.query(AppTable).filter_by(name=name).all()
218
219        if len(selected) == 0:
220            message = f'No entry for app: ({name}).'
221            logger.critical(message)
222            raise RuntimeError(message)
223
224        selected_app = selected[0]
225
226        app_dict = {
227            'id': selected_app.id,
228            'name': selected_app.name,
229            'params': ParamsSpecification.deserialize(selected_app.params),
230            'actions': selected_app.actions,
231        }
232
233        return app_dict
234
235    def set_active_app(self, name):
236        """Set an app specified by name as active.
237
238        Parameters
239        ----------
240        name: str
241           name of the app to set as active
242        """
243        selected = self.session.query(AppTable).filter_by(name=name).all()
244        if len(selected) == 0:
245            raise RuntimeError('no such app - {}'.format(name))
246        assert (not (len(selected) > 1))
247        app = selected[0]
248        self.session.query(CampaignTable).update({'active_app': app.id})
249        self.session.commit()
250
251    def add_app(self, app_info):
252        """Add application to the 'app' table.
253
254        Parameters
255        ----------
256        app_info: AppInfo
257            Application definition.
258        """
259
260        # Check that no app with same name exists
261        name = app_info.name
262        selected = self.session.query(AppTable).filter_by(name=name).all()
263        if len(selected) > 0:
264            message = (
265                f'There is already an app in this database with name {name}'
266                f'(found {len(selected)}).'
267            )
268            logger.critical(message)
269            raise RuntimeError(message)
270
271        app_dict = app_info.to_dict(flatten=True)
272
273        db_entry = AppTable(**app_dict)
274        self.session.add(db_entry)
275        self.session.commit()
276
277    def replace_actions(self, app_name, actions):
278        """Replace actions for an app with a given name.
279
280        Parameters
281        ----------
282        app_name: str
283            Name of the app.
284        actions: Actions
285            `Actions` instance, will replace the current `Actions` of an app.
286        """
287        self.session.query(AppTable).filter_by(name=app_name).update(
288            {'actions': easyvvuq_serialize(actions)})
289        self.session.commit()
290
291    def add_sampler(self, sampler_element):
292        """Add new Sampler to the 'sampler' table.
293
294        Parameters
295        ----------
296        sampler_element: Sampler
297            An EasyVVUQ sampler.
298
299        Returns
300        -------
301        int
302            The sampler `id` in the database.
303        """
304        db_entry = SamplerTable(sampler=easyvvuq_serialize(sampler_element))
305
306        self.session.add(db_entry)
307        self.session.commit()
308
309        return db_entry.id
310
311    def update_sampler(self, sampler_id, sampler_element):
312        """Update the state of the Sampler with id 'sampler_id' to
313        that in the passed 'sampler_element'
314
315        Parameters
316        ----------
317        sampler_id: int
318            The id of the sampler in the db to update
319        sampler_element: Sampler
320            The sampler that should be used as the new state
321        """
322
323        selected = self.session.get(SamplerTable,sampler_id)
324        selected.sampler = easyvvuq_serialize(sampler_element)
325        self.session.commit()
326
327    def resurrect_sampler(self, sampler_id):
328        """Return the sampler object corresponding to id sampler_id in the database.
329        It is deserialized from the state stored in the database.
330
331        Parameters
332        ----------
333        sampler_id: int
334            The id of the sampler to resurrect
335
336        Returns
337        -------
338        Sampler
339            The 'live' sampler object, deserialized from the state in the db
340        """
341        try:
342            serialized_sampler = self.session.get(SamplerTable,sampler_id).sampler 
343            sampler = easyvvuq_deserialize(serialized_sampler.encode('utf-8'))
344        except AttributeError:
345            sampler = None
346        return sampler
347
348    def resurrect_app(self, app_name):
349        """Return the 'live' encoder, decoder and collation objects corresponding to the app with
350        name 'app_name' in the database. They are deserialized from the states previously
351        stored in the database.
352
353        Parameters
354        ----------
355        app_name: string
356            Name of the app to resurrect
357
358        Returns
359        -------
360        Actions
361            The 'live' `Actions` object associated with this app. Used to execute the simulation
362            associated with the app as well as do any pre- and post-processing.
363        """
364        app_info = self.app(app_name)
365        actions = easyvvuq_deserialize(app_info['actions'])
366        return actions
367
368    def add_runs(self, run_info_list=None, run_prefix='run_', iteration=0):
369        """Add list of runs to the `runs` table in the database.
370
371        Parameters
372        ----------
373        run_info_list: List of RunInfo objects
374            Each RunInfo object contains relevant run fields: params, status (where in the
375            EasyVVUQ workflow is this RunTable), campaign (id number), sample, app
376        run_prefix: str
377            Prefix for run name
378        iteration: int
379            Iteration number used by iterative workflows. For example, MCMC. Can be left
380            as default zero in other cases.
381        """
382        # Add all runs to RunTable
383        commit_counter = 0
384        for run_info in run_info_list:
385            run_info.run_name = f"{run_prefix}{self._next_run}"
386            run_info.iteration = iteration
387            run = RunTable(**run_info.to_dict(flatten=True))
388            self.session.add(run)
389            self._next_run += 1
390            commit_counter += 1
391            if commit_counter % COMMIT_RATE == 0:
392                self.session.commit()
393        # Update run and ensemble counters in db
394        db_info = self.session.query(DBInfoTable).first()
395        db_info.next_run = self._next_run
396        self.session.commit()
397
398    @staticmethod
399    def _run_to_dict(run_row):
400        """Convert the provided row from 'runs' table into a dictionary
401
402        Parameters
403        ----------
404        run_row: RunTable
405            Information on a particular run in the database.
406
407        Returns
408        -------
409        dict
410            Contains run information (keys = run_name, params, status, sample,
411            campaign and app)
412        """
413
414        run_info = {
415            'run_name': run_row.run_name,
416            'params': json.loads(run_row.params),
417            'status': constants.Status(run_row.status),
418            'sampler': run_row.sampler,
419            'campaign': run_row.campaign,
420            'app': run_row.app,
421            'result': run_row.result,
422            'run_dir': run_row.run_dir
423        }
424
425        return run_info
426
427    def set_dir_for_run(self, run_name, run_dir, campaign=None, sampler=None):
428        """Set the 'run_dir' path for the specified run in the database.
429
430        Parameters
431        ----------
432        run_name: str
433            Name of run to filter for.
434        run_dir: str
435            Directory path associated to set for this run.
436        campaign:  int or None
437            Campaign id to filter for.
438        sampler: int or None
439            Sample id to filter for.
440        """
441        filter_options = {'run_name': run_name}
442        if campaign:
443            filter_options['campaign'] = campaign
444        if sampler:
445            filter_options['sampler'] = sampler
446        selected = self.session.query(RunTable).filter_by(**filter_options)
447        if selected.count() != 1:
448            logging.critical('Multiple runs selected - using the first')
449        selected = selected.first()
450        selected.run_dir = run_dir
451        self.session.commit()
452
453    def get_run_status(self, run_id, campaign=None, sampler=None):
454        """Return the status (enum) for the run with name 'run_name' (and, optionally,
455        filtering for campaign and sampler by id)
456
457        Parameters
458        ----------
459        run_id: int
460            id of the run
461        campaign: int
462            ID of the desired Campaign
463        sampler: int
464            ID of the desired Sampler
465
466        Returns
467        -------
468        enum(Status)
469            Status of the run.
470        """
471        filter_options = {'id': run_id}
472        if campaign:
473            filter_options['campaign'] = campaign
474        if sampler:
475            filter_options['sampler'] = sampler
476        selected = self.session.query(RunTable).filter_by(**filter_options)
477        if selected.count() != 1:
478            logging.critical('Multiple runs selected - using the first')
479        selected = selected.first()
480        return constants.Status(selected.status)
481
482    def set_run_statuses(self, run_id_list, status):
483        """Set the specified 'status' (enum) for all runs in the list run_id_list
484
485        Parameters
486        ----------
487        run_id_list: list of int
488            a list of run ids
489        status: enum(Status)
490            The new status all listed runs should now have
491        """
492        self.session.query(RunTable).filter(
493            RunTable.id.in_(run_id_list)).update(
494                {RunTable.status: status}, synchronize_session='fetch')
495        self.session.commit()
496
497    def campaigns(self):
498        """Get list of campaigns for which information is stored in the
499        database.
500
501        Returns
502        -------
503        list
504            Campaign names.
505        """
506
507        return [c.name for c in self.session.query(CampaignTable).all()]
508
509    def _get_campaign_info(self, campaign_name=None):
510        """Retrieves Campaign info based on name.
511
512        Parameters
513        ----------
514        campaign_name: str
515            Name of campaign to select.
516
517        Returns
518        -------
519            SQLAlchemy query for campaign with this name.
520        """
521        assert (isinstance(campaign_name, str) or campaign_name is None)
522        query = self.session.query(CampaignTable)
523        if campaign_name is None:
524            campaign_info = query
525        else:
526            campaign_info = query.filter_by(name=campaign_name).all()
527        if campaign_name is not None:
528            if len(campaign_info) > 1:
529                logger.warning(
530                    'More than one campaign selected - using first one.')
531            elif len(campaign_info) == 0:
532                message = 'No campaign available.'
533                logger.critical(message)
534                raise RuntimeError(message)
535            return campaign_info[0]
536        return campaign_info.first()
537
538    def get_campaign_id(self, name):
539        """Return the (database) id corresponding to the campaign with name 'name'.
540
541        Parameters
542        ----------
543        name: str
544            Name of the campaign.
545
546        Returns
547        -------
548        int
549            The id of the campaign with the specified name
550        """
551
552        selected = self.session.query(
553            CampaignTable.name.label(name),
554            CampaignTable.id).filter(CampaignTable.name == name).all()
555        if len(selected) == 0:
556            msg = f"No campaign with name {name} found in campaign database"
557            logger.error(msg)
558            raise RuntimeError(msg)
559        if len(selected) > 1:
560            msg = (
561                f"More than one campaign with name {name} found in"
562                f"campaign database. Database state is compromised."
563            )
564            logger.error(msg)
565            raise RuntimeError(msg)
566        # Return the database ID for the specified campaign
567        return selected[0][1]
568
569    def get_sampler_id(self, campaign_id):
570        """Return the (database) id corresponding to the sampler currently set
571        for the campaign with id 'campaign_id'
572
573        Parameters
574        ----------
575        campaign_id: int
576            ID of the campaign.
577
578        Returns
579        -------
580        int
581            The id of the sampler set for the specified campaign
582        """
583        sampler_id = self.session.get(CampaignTable,campaign_id).sampler
584        return sampler_id
585
586    def set_sampler(self, campaign_id, sampler_id):
587        """Set specified campaign to be using specified sampler
588
589        Parameters
590        ----------
591        campaign_id: int
592            ID of the campaign.
593        sampler_id: int
594            ID of the sampler.
595        """
596        self.session.get(CampaignTable,campaign_id).sampler = sampler_id
597        self.session.commit()
598
599    def campaign_dir(self, campaign_name=None):
600        """Get campaign directory for `campaign_name`.
601
602        Parameters
603        ----------
604        campaign_name: str
605            Name of campaign to select
606
607        Returns
608        -------
609        str
610            Path to campaign directory.
611        """
612        return self._get_campaign_info(campaign_name=campaign_name).campaign_dir
613
614    def _select_runs(
615            self,
616            name=None,
617            campaign=None,
618            sampler=None,
619            status=None,
620            not_status=None,
621            app_id=None):
622        """Select all runs in the database which match the input criteria.
623
624        Parameters
625        ----------
626        name: str
627            Name of run to filter for.
628        campaign:  int or None
629            Campaign id to filter for.
630        sampler: int or None
631            Sampler id to filter for.
632        status: enum(Status) or None
633            Status string to filter for.
634        not_status: enum(Status) or None
635            Exclude runs with this status string
636        app_id: int or None
637            App id to filter for.
638
639        Returns
640        -------
641        sqlalchemy.orm.query.Query
642            Selected runs from the database run table.
643        """
644        filter_options = {}
645        if name:
646            filter_options['run_name'] = name
647        if campaign:
648            filter_options['campaign'] = campaign
649        if sampler:
650            filter_options['sampler'] = sampler
651        if status:
652            filter_options['status'] = status
653        if app_id:
654            filter_options['app'] = app_id
655
656        # Note that for some databases this can be sped up with a yield_per(), but not all
657        selected = self.session.query(RunTable).filter_by(
658            **filter_options).filter(RunTable.status != not_status)
659
660        return selected
661
662    def run(self, name, campaign=None, sampler=None, status=None, not_status=None, app_id=None):
663        """Get the information for a specified run.
664
665        Parameters
666        ----------
667        name: str
668            Name of run to filter for.
669        campaign:  int or None
670            Campaign id to filter for.
671        sampler: int or None
672            Sampler id to filter for.
673        status: enum(Status) or None
674            Status string to filter for.
675        not_status: enum(Status) or None
676            Exclude runs with this status string
677        app_id: int or None
678            App id to filter for.
679
680        Returns
681        -------
682        dict
683            Containing run information (run_name, params, status, sample,
684            campaign, app)
685        """
686        selected = self._select_runs(
687            name=name,
688            campaign=campaign,
689            sampler=sampler,
690            status=status,
691            not_status=not_status,
692            app_id=app_id)
693        if selected.count() != 1:
694            logging.warning('Multiple runs selected - using the first')
695        selected = selected.first()
696        return self._run_to_dict(selected)
697
698    def runs(self, campaign=None, sampler=None, status=None, not_status=None, app_id=None):
699        """A generator to return all run information for selected `campaign` and `sampler`.
700
701        Parameters
702        ----------
703        campaign: int or None
704            Campaign id to filter for.
705        sampler: int or None
706            Sampler id to filter for.
707        status: enum(Status) or None
708            Status string to filter for.
709        not_status: enum(Status) or None
710            Exclude runs with this status string
711        app_id: int or None
712            App id to filter for.
713
714        Yields
715        ------
716        dict
717            Information on each selected run (key = run_name, value = dict of
718            run information fields.), one at a time.
719        """
720        selected = self._select_runs(
721            campaign=campaign,
722            sampler=sampler,
723            status=status,
724            not_status=not_status,
725            app_id=app_id)
726        for r in selected:
727            yield r.id, self._run_to_dict(r)
728
729    def run_ids(self, campaign=None, sampler=None, status=None, not_status=None, app_id=None):
730        """A generator to return all run IDs for selected `campaign` and `sampler`.
731
732        Parameters
733        ----------
734        campaign: int or None
735            Campaign id to filter for.
736        sampler: int or None
737            Sampler id to filter for.
738        status: enum(Status) or None
739            Status string to filter for.
740        not_status: enum(Status) or None
741            Exclude runs with this status string
742        app_id: int or None
743            App id to filter for.
744
745        Yields
746        ------
747        str
748            run ID for each selected run, one at a time.
749        """
750        selected = self._select_runs(
751            campaign=campaign,
752            sampler=sampler,
753            status=status,
754            not_status=not_status,
755            app_id=app_id)
756        for r in selected:
757            yield r.run_name
758
759    def get_num_runs(self, campaign=None, sampler=None, status=None, not_status=None):
760        """Returns the number of runs matching the filtering criteria.
761
762        Parameters
763        ----------
764        campaign: int or None
765            Campaign id to filter for.
766        sampler: int or None
767            Sampler id to filter for.
768        status: enum(Status) or None
769            Status string to filter for.
770        not_status: enum(Status) or None
771            Exclude runs with this status string
772
773        Returns
774        -------
775        int
776            The number of runs in the database matching the filtering criteria
777
778        """
779        selected = self._select_runs(
780            campaign=campaign,
781            sampler=sampler,
782            status=status,
783            not_status=not_status)
784        return selected.count()
785
786    def runs_dir(self, campaign_name=None):
787        """Get the directory used to store run information for `campaign_name`.
788
789        Parameters
790        ----------
791        campaign_name: str
792            Name of the selected campaign.
793
794        Returns
795        -------
796        str
797            Path containing run outputs.
798        """
799        return self._get_campaign_info(campaign_name=campaign_name).runs_dir
800
801    def store_result(self, run_id, result, change_status=True):
802        """Stores results of a simulation inside the RunTable given a run id.
803
804        Parameters
805        ----------
806        run_id: int
807            The id of a run to store the results in. This will be the run with which these
808            results are associated with. Namely the run that has the inputs used to generate
809            these results.
810        result: dict
811            Results in dictionary form. This is the same format as used by the `Decoder`.
812        change_status: bool
813            If set to False will not update the runs' status to COLLATED. This is sometimes
814            useful in scenarios where you want several apps to work on the same runs.
815        """
816        self.commit_counter += 1
817
818        def convert_nonserializable(obj):
819            if isinstance(obj, np.int64):
820                return int(obj)
821            raise TypeError('Unknown type:', type(obj))
822        result_ = result['result']
823        result.pop('result')
824        result.pop('run_info')
825        if change_status:
826            self.session.query(RunTable).\
827                filter(RunTable.id == run_id).\
828                update({'result': json.dumps(result_, default=convert_nonserializable),
829                        'status': constants.Status.COLLATED,
830                        'run_dir': result['rundir']})
831        else:
832            self.session.query(RunTable).\
833                filter(RunTable.id == run_id).\
834                update({'result': json.dumps(result_, default=convert_nonserializable),
835                        'run_dir': result['rundir']})
836        if self.commit_counter % COMMIT_RATE == 0:
837            self.session.commit()
838
839    def store_results(self, app_name, results):
840        """Stores the results from a given run in the database.
841
842        Parameters
843        ----------
844        run_name: str
845            name of the run
846        results: dict
847            dictionary with the results (from the decoder)
848        """
849        try:
850            app_id = self.session.query(AppTable).filter(AppTable.name == app_name).all()[0].id
851        except IndexError:
852            raise RuntimeError("app with the name {} not found".format(app_name))
853        commit_counter = 0
854        for run_id, result in results:
855            try:
856                self.session.query(RunTable).\
857                    filter(RunTable.id == run_id, RunTable.app == app_id).\
858                    update({'result': json.dumps(result), 'status': constants.Status.COLLATED})
859                commit_counter += 1
860                if commit_counter % COMMIT_RATE == 0:
861                    self.session.commit()
862            except IndexError:
863                raise RuntimeError("no runs with name {} found".format(run_id))
864        self.session.commit()
865
866    def get_results(self, app_name, sampler_id, status=constants.Status.COLLATED, iteration=-1):
867        """Returns the results as a pandas DataFrame.
868
869        Parameters
870        ----------
871        app_name: str
872            Name of the app to return data for.
873        sampler_id: int
874            ID of the sampler.
875        status: STATUS
876            Run status to filter for.
877        iteration: int
878            If a positive integer will return the results for a given iteration only.
879
880        Returns
881        -------
882        DataFrame
883            Will construct a `DataFrame` from the decoder output dictionaries.
884        """
885        try:
886            app_id = self.session.query(AppTable).filter(AppTable.name == app_name).all()[0].id
887        except IndexError:
888            raise RuntimeError("app with the name {} not found".format(app_name))
889        pd_result = {}
890        query = self.session.query(RunTable).\
891            filter(RunTable.app == app_id).\
892            filter(RunTable.sampler == sampler_id).\
893            filter(RunTable.status == status)
894        # if only a specific iteration is requested filter it out
895        if iteration >= 0:
896            query = query.filter(RunTable.iteration == iteration)
897        for row in query:
898            params = {'run_id': row.id}
899            params['iteration'] = row.iteration
900            params = {**params, **json.loads(row.params)}
901            result = json.loads(row.result)
902            pd_dict = {**params, **result}
903            for key in pd_dict.keys():
904                if not isinstance(pd_dict[key], list):
905                    try:
906                        pd_result[(key, 0)].append(pd_dict[key])
907                    except KeyError:
908                        pd_result[(key, 0)] = [pd_dict[key]]
909                else:
910                    for i, elt in enumerate(pd_dict[key]):
911                        try:
912                            pd_result[(key, i)].append(pd_dict[key][i])
913                        except KeyError:
914                            pd_result[(key, i)] = [pd_dict[key][i]]
915        try:
916            return pd.DataFrame(pd_result)
917        except ValueError:
918            raise RuntimeError(
919                'the results received from the database seem to be malformed - commonly because a vector quantity of interest changes dimensionality')
920
921    def relocate(self, new_path, campaign_name):
922        """Update all runs in the db with the new campaign path.
923
924        Parameters
925        ----------
926        new_path: str
927            new runs directory
928        campaign_name: str
929            name of the campaign
930        """
931        campaign_id = self.get_campaign_id(campaign_name)
932        campaign_info = self.session.query(CampaignTable).\
933            filter(CampaignTable.id == campaign_id).first()
934        path, runs_dir = os.path.split(campaign_info.runs_dir)
935        self.session.query(CampaignTable).\
936            filter(CampaignTable.id == campaign_id).\
937            update({'campaign_dir': str(new_path),
938                    'runs_dir': str(os.path.join(new_path, runs_dir))})
939        self.session.commit()
940
941    def dump(self):
942        """Dump the database as JSON for debugging purposes.
943
944        Returns
945        -------
946        dict
947            A database dump in JSON format.
948        """
949        meta = MetaData()
950        meta.reflect(bind=self.engine)
951        result = {}
952
953        # Create a connection from the engine
954        with self.engine.connect() as connection:
955            for table in meta.sorted_tables:
956                try:
957                    query_result = connection.execute(table.select()).fetchall()
958                    result[table.name] = [dict(row) for row in query_result]
959                except Exception as e:
960                    result[table.name] = str(e)
961
962        return json.dumps(result)

An interface between the campaign database and the campaign.

Parameters
  • location (str): database URI as needed by SQLAlchemy
CampaignDB(location=None)
127    def __init__(self, location=None):
128        if location is not None:
129            self.engine = create_engine(location)
130        else:
131            self.engine = create_engine('sqlite://')
132        self.commit_counter = 0
133        session_maker = sessionmaker(bind=self.engine)
134        self.session = session_maker()
135        Base.metadata.create_all(self.engine, checkfirst=True)
commit_counter
session
def resume_campaign(self, name):
137    def resume_campaign(self, name):
138        """Resumes campaign.
139
140        Parameters
141        ----------
142        name: str
143           Name of the Campaign to resume. Must already exist in the database.
144        """
145        info = self.session.query(
146            CampaignTable).filter_by(name=name).first()
147        if info is None:
148            raise ValueError('Campaign with the given name not found.')
149        db_info = self.session.query(DBInfoTable).first()
150        self._next_run = db_info.next_run

Resumes campaign.

Parameters
  • name (str): Name of the Campaign to resume. Must already exist in the database.
def create_campaign(self, info):
152    def create_campaign(self, info):
153        """Creates a new campaign in the database.
154
155        Parameters
156        ----------
157        info: CampaignInfo
158            This `easyvvuq.data_structs.CampaignInfo` will contain information
159            needed to construct the Campaign table.
160        """
161        is_db_empty = (self.session.query(CampaignTable).first() is None)
162        version_check = self.session.query(
163            CampaignTable).filter(CampaignTable.easyvvuq_version != info.easyvvuq_version).all()
164        if (not is_db_empty) and (len(version_check) != 0):
165            raise RuntimeError('Database contains campaign created with an incompatible' +
166                               ' version of EasyVVUQ!')
167        self._next_run = 1
168        self.session.add(CampaignTable(**info.to_dict(flatten=True)))
169        self.session.add(DBInfoTable(next_run=self._next_run))
170        self.session.commit()

Creates a new campaign in the database.

Parameters
def get_active_app(self):
172    def get_active_app(self):
173        """Returns active app table.
174
175        Returns
176        -------
177        AppTable
178        """
179        return self.session.query(AppTable, CampaignTable).filter(
180            AppTable.id == CampaignTable.active_app).first()

Returns active app table.

Returns
  • AppTable
def campaign_exists(self, name):
182    def campaign_exists(self, name):
183        """Check if campaign specified by that name already exists.
184
185        Parameters
186        ----------
187        name: str
188
189        Returns
190        -------
191        bool
192          True if such a campaign already exists, False otherwise
193        """
194        result = self.session.query(CampaignTable).filter(
195            CampaignTable.name == name).all()
196        return len(result) > 0

Check if campaign specified by that name already exists.

Parameters
  • name (str):
Returns
  • bool: True if such a campaign already exists, False otherwise
def app(self, name=None):
198    def app(self, name=None):
199        """Get app information. Specific applications selected by `name`,
200        otherwise first entry in database 'app' selected.
201
202        Parameters
203        ----------
204        name : str or None
205            Name of selected app, if `None` given then first app will be
206            selected.
207
208        Returns
209        -------
210        dict
211            Information about the application.
212        """
213
214        if name is None:
215            selected = self.session.query(AppTable).all()
216        else:
217            selected = self.session.query(AppTable).filter_by(name=name).all()
218
219        if len(selected) == 0:
220            message = f'No entry for app: ({name}).'
221            logger.critical(message)
222            raise RuntimeError(message)
223
224        selected_app = selected[0]
225
226        app_dict = {
227            'id': selected_app.id,
228            'name': selected_app.name,
229            'params': ParamsSpecification.deserialize(selected_app.params),
230            'actions': selected_app.actions,
231        }
232
233        return app_dict

Get app information. Specific applications selected by name, otherwise first entry in database 'app' selected.

Parameters
  • name (str or None): Name of selected app, if None given then first app will be selected.
Returns
  • dict: Information about the application.
def set_active_app(self, name):
235    def set_active_app(self, name):
236        """Set an app specified by name as active.
237
238        Parameters
239        ----------
240        name: str
241           name of the app to set as active
242        """
243        selected = self.session.query(AppTable).filter_by(name=name).all()
244        if len(selected) == 0:
245            raise RuntimeError('no such app - {}'.format(name))
246        assert (not (len(selected) > 1))
247        app = selected[0]
248        self.session.query(CampaignTable).update({'active_app': app.id})
249        self.session.commit()

Set an app specified by name as active.

Parameters
  • name (str): name of the app to set as active
def add_app(self, app_info):
251    def add_app(self, app_info):
252        """Add application to the 'app' table.
253
254        Parameters
255        ----------
256        app_info: AppInfo
257            Application definition.
258        """
259
260        # Check that no app with same name exists
261        name = app_info.name
262        selected = self.session.query(AppTable).filter_by(name=name).all()
263        if len(selected) > 0:
264            message = (
265                f'There is already an app in this database with name {name}'
266                f'(found {len(selected)}).'
267            )
268            logger.critical(message)
269            raise RuntimeError(message)
270
271        app_dict = app_info.to_dict(flatten=True)
272
273        db_entry = AppTable(**app_dict)
274        self.session.add(db_entry)
275        self.session.commit()

Add application to the 'app' table.

Parameters
  • app_info (AppInfo): Application definition.
def replace_actions(self, app_name, actions):
277    def replace_actions(self, app_name, actions):
278        """Replace actions for an app with a given name.
279
280        Parameters
281        ----------
282        app_name: str
283            Name of the app.
284        actions: Actions
285            `Actions` instance, will replace the current `Actions` of an app.
286        """
287        self.session.query(AppTable).filter_by(name=app_name).update(
288            {'actions': easyvvuq_serialize(actions)})
289        self.session.commit()

Replace actions for an app with a given name.

Parameters
  • app_name (str): Name of the app.
  • actions (Actions): Actions instance, will replace the current Actions of an app.
def add_sampler(self, sampler_element):
291    def add_sampler(self, sampler_element):
292        """Add new Sampler to the 'sampler' table.
293
294        Parameters
295        ----------
296        sampler_element: Sampler
297            An EasyVVUQ sampler.
298
299        Returns
300        -------
301        int
302            The sampler `id` in the database.
303        """
304        db_entry = SamplerTable(sampler=easyvvuq_serialize(sampler_element))
305
306        self.session.add(db_entry)
307        self.session.commit()
308
309        return db_entry.id

Add new Sampler to the 'sampler' table.

Parameters
  • sampler_element (Sampler): An EasyVVUQ sampler.
Returns
  • int: The sampler id in the database.
def update_sampler(self, sampler_id, sampler_element):
311    def update_sampler(self, sampler_id, sampler_element):
312        """Update the state of the Sampler with id 'sampler_id' to
313        that in the passed 'sampler_element'
314
315        Parameters
316        ----------
317        sampler_id: int
318            The id of the sampler in the db to update
319        sampler_element: Sampler
320            The sampler that should be used as the new state
321        """
322
323        selected = self.session.get(SamplerTable,sampler_id)
324        selected.sampler = easyvvuq_serialize(sampler_element)
325        self.session.commit()

Update the state of the Sampler with id 'sampler_id' to that in the passed 'sampler_element'

Parameters
  • sampler_id (int): The id of the sampler in the db to update
  • sampler_element (Sampler): The sampler that should be used as the new state
def resurrect_sampler(self, sampler_id):
327    def resurrect_sampler(self, sampler_id):
328        """Return the sampler object corresponding to id sampler_id in the database.
329        It is deserialized from the state stored in the database.
330
331        Parameters
332        ----------
333        sampler_id: int
334            The id of the sampler to resurrect
335
336        Returns
337        -------
338        Sampler
339            The 'live' sampler object, deserialized from the state in the db
340        """
341        try:
342            serialized_sampler = self.session.get(SamplerTable,sampler_id).sampler 
343            sampler = easyvvuq_deserialize(serialized_sampler.encode('utf-8'))
344        except AttributeError:
345            sampler = None
346        return sampler

Return the sampler object corresponding to id sampler_id in the database. It is deserialized from the state stored in the database.

Parameters
  • sampler_id (int): The id of the sampler to resurrect
Returns
  • Sampler: The 'live' sampler object, deserialized from the state in the db
def resurrect_app(self, app_name):
348    def resurrect_app(self, app_name):
349        """Return the 'live' encoder, decoder and collation objects corresponding to the app with
350        name 'app_name' in the database. They are deserialized from the states previously
351        stored in the database.
352
353        Parameters
354        ----------
355        app_name: string
356            Name of the app to resurrect
357
358        Returns
359        -------
360        Actions
361            The 'live' `Actions` object associated with this app. Used to execute the simulation
362            associated with the app as well as do any pre- and post-processing.
363        """
364        app_info = self.app(app_name)
365        actions = easyvvuq_deserialize(app_info['actions'])
366        return actions

Return the 'live' encoder, decoder and collation objects corresponding to the app with name 'app_name' in the database. They are deserialized from the states previously stored in the database.

Parameters
  • app_name (string): Name of the app to resurrect
Returns
  • Actions: The 'live' Actions object associated with this app. Used to execute the simulation associated with the app as well as do any pre- and post-processing.
def add_runs(self, run_info_list=None, run_prefix='run_', iteration=0):
368    def add_runs(self, run_info_list=None, run_prefix='run_', iteration=0):
369        """Add list of runs to the `runs` table in the database.
370
371        Parameters
372        ----------
373        run_info_list: List of RunInfo objects
374            Each RunInfo object contains relevant run fields: params, status (where in the
375            EasyVVUQ workflow is this RunTable), campaign (id number), sample, app
376        run_prefix: str
377            Prefix for run name
378        iteration: int
379            Iteration number used by iterative workflows. For example, MCMC. Can be left
380            as default zero in other cases.
381        """
382        # Add all runs to RunTable
383        commit_counter = 0
384        for run_info in run_info_list:
385            run_info.run_name = f"{run_prefix}{self._next_run}"
386            run_info.iteration = iteration
387            run = RunTable(**run_info.to_dict(flatten=True))
388            self.session.add(run)
389            self._next_run += 1
390            commit_counter += 1
391            if commit_counter % COMMIT_RATE == 0:
392                self.session.commit()
393        # Update run and ensemble counters in db
394        db_info = self.session.query(DBInfoTable).first()
395        db_info.next_run = self._next_run
396        self.session.commit()

Add list of runs to the runs table in the database.

Parameters
  • run_info_list (List of RunInfo objects): Each RunInfo object contains relevant run fields: params, status (where in the EasyVVUQ workflow is this RunTable), campaign (id number), sample, app
  • run_prefix (str): Prefix for run name
  • iteration (int): Iteration number used by iterative workflows. For example, MCMC. Can be left as default zero in other cases.
def set_dir_for_run(self, run_name, run_dir, campaign=None, sampler=None):
427    def set_dir_for_run(self, run_name, run_dir, campaign=None, sampler=None):
428        """Set the 'run_dir' path for the specified run in the database.
429
430        Parameters
431        ----------
432        run_name: str
433            Name of run to filter for.
434        run_dir: str
435            Directory path associated to set for this run.
436        campaign:  int or None
437            Campaign id to filter for.
438        sampler: int or None
439            Sample id to filter for.
440        """
441        filter_options = {'run_name': run_name}
442        if campaign:
443            filter_options['campaign'] = campaign
444        if sampler:
445            filter_options['sampler'] = sampler
446        selected = self.session.query(RunTable).filter_by(**filter_options)
447        if selected.count() != 1:
448            logging.critical('Multiple runs selected - using the first')
449        selected = selected.first()
450        selected.run_dir = run_dir
451        self.session.commit()

Set the 'run_dir' path for the specified run in the database.

Parameters
  • run_name (str): Name of run to filter for.
  • run_dir (str): Directory path associated to set for this run.
  • campaign (int or None): Campaign id to filter for.
  • sampler (int or None): Sample id to filter for.
def get_run_status(self, run_id, campaign=None, sampler=None):
453    def get_run_status(self, run_id, campaign=None, sampler=None):
454        """Return the status (enum) for the run with name 'run_name' (and, optionally,
455        filtering for campaign and sampler by id)
456
457        Parameters
458        ----------
459        run_id: int
460            id of the run
461        campaign: int
462            ID of the desired Campaign
463        sampler: int
464            ID of the desired Sampler
465
466        Returns
467        -------
468        enum(Status)
469            Status of the run.
470        """
471        filter_options = {'id': run_id}
472        if campaign:
473            filter_options['campaign'] = campaign
474        if sampler:
475            filter_options['sampler'] = sampler
476        selected = self.session.query(RunTable).filter_by(**filter_options)
477        if selected.count() != 1:
478            logging.critical('Multiple runs selected - using the first')
479        selected = selected.first()
480        return constants.Status(selected.status)

Return the status (enum) for the run with name 'run_name' (and, optionally, filtering for campaign and sampler by id)

Parameters
  • run_id (int): id of the run
  • campaign (int): ID of the desired Campaign
  • sampler (int): ID of the desired Sampler
Returns
  • enum(Status): Status of the run.
def set_run_statuses(self, run_id_list, status):
482    def set_run_statuses(self, run_id_list, status):
483        """Set the specified 'status' (enum) for all runs in the list run_id_list
484
485        Parameters
486        ----------
487        run_id_list: list of int
488            a list of run ids
489        status: enum(Status)
490            The new status all listed runs should now have
491        """
492        self.session.query(RunTable).filter(
493            RunTable.id.in_(run_id_list)).update(
494                {RunTable.status: status}, synchronize_session='fetch')
495        self.session.commit()

Set the specified 'status' (enum) for all runs in the list run_id_list

Parameters
  • run_id_list (list of int): a list of run ids
  • status (enum(Status)): The new status all listed runs should now have
def campaigns(self):
497    def campaigns(self):
498        """Get list of campaigns for which information is stored in the
499        database.
500
501        Returns
502        -------
503        list
504            Campaign names.
505        """
506
507        return [c.name for c in self.session.query(CampaignTable).all()]

Get list of campaigns for which information is stored in the database.

Returns
  • list: Campaign names.
def get_campaign_id(self, name):
538    def get_campaign_id(self, name):
539        """Return the (database) id corresponding to the campaign with name 'name'.
540
541        Parameters
542        ----------
543        name: str
544            Name of the campaign.
545
546        Returns
547        -------
548        int
549            The id of the campaign with the specified name
550        """
551
552        selected = self.session.query(
553            CampaignTable.name.label(name),
554            CampaignTable.id).filter(CampaignTable.name == name).all()
555        if len(selected) == 0:
556            msg = f"No campaign with name {name} found in campaign database"
557            logger.error(msg)
558            raise RuntimeError(msg)
559        if len(selected) > 1:
560            msg = (
561                f"More than one campaign with name {name} found in"
562                f"campaign database. Database state is compromised."
563            )
564            logger.error(msg)
565            raise RuntimeError(msg)
566        # Return the database ID for the specified campaign
567        return selected[0][1]

Return the (database) id corresponding to the campaign with name 'name'.

Parameters
  • name (str): Name of the campaign.
Returns
  • int: The id of the campaign with the specified name
def get_sampler_id(self, campaign_id):
569    def get_sampler_id(self, campaign_id):
570        """Return the (database) id corresponding to the sampler currently set
571        for the campaign with id 'campaign_id'
572
573        Parameters
574        ----------
575        campaign_id: int
576            ID of the campaign.
577
578        Returns
579        -------
580        int
581            The id of the sampler set for the specified campaign
582        """
583        sampler_id = self.session.get(CampaignTable,campaign_id).sampler
584        return sampler_id

Return the (database) id corresponding to the sampler currently set for the campaign with id 'campaign_id'

Parameters
  • campaign_id (int): ID of the campaign.
Returns
  • int: The id of the sampler set for the specified campaign
def set_sampler(self, campaign_id, sampler_id):
586    def set_sampler(self, campaign_id, sampler_id):
587        """Set specified campaign to be using specified sampler
588
589        Parameters
590        ----------
591        campaign_id: int
592            ID of the campaign.
593        sampler_id: int
594            ID of the sampler.
595        """
596        self.session.get(CampaignTable,campaign_id).sampler = sampler_id
597        self.session.commit()

Set specified campaign to be using specified sampler

Parameters
  • campaign_id (int): ID of the campaign.
  • sampler_id (int): ID of the sampler.
def campaign_dir(self, campaign_name=None):
599    def campaign_dir(self, campaign_name=None):
600        """Get campaign directory for `campaign_name`.
601
602        Parameters
603        ----------
604        campaign_name: str
605            Name of campaign to select
606
607        Returns
608        -------
609        str
610            Path to campaign directory.
611        """
612        return self._get_campaign_info(campaign_name=campaign_name).campaign_dir

Get campaign directory for campaign_name.

Parameters
  • campaign_name (str): Name of campaign to select
Returns
  • str: Path to campaign directory.
def run( self, name, campaign=None, sampler=None, status=None, not_status=None, app_id=None):
662    def run(self, name, campaign=None, sampler=None, status=None, not_status=None, app_id=None):
663        """Get the information for a specified run.
664
665        Parameters
666        ----------
667        name: str
668            Name of run to filter for.
669        campaign:  int or None
670            Campaign id to filter for.
671        sampler: int or None
672            Sampler id to filter for.
673        status: enum(Status) or None
674            Status string to filter for.
675        not_status: enum(Status) or None
676            Exclude runs with this status string
677        app_id: int or None
678            App id to filter for.
679
680        Returns
681        -------
682        dict
683            Containing run information (run_name, params, status, sample,
684            campaign, app)
685        """
686        selected = self._select_runs(
687            name=name,
688            campaign=campaign,
689            sampler=sampler,
690            status=status,
691            not_status=not_status,
692            app_id=app_id)
693        if selected.count() != 1:
694            logging.warning('Multiple runs selected - using the first')
695        selected = selected.first()
696        return self._run_to_dict(selected)

Get the information for a specified run.

Parameters
  • name (str): Name of run to filter for.
  • campaign (int or None): Campaign id to filter for.
  • sampler (int or None): Sampler id to filter for.
  • status (enum(Status) or None): Status string to filter for.
  • not_status (enum(Status) or None): Exclude runs with this status string
  • app_id (int or None): App id to filter for.
Returns
  • dict: Containing run information (run_name, params, status, sample, campaign, app)
def runs( self, campaign=None, sampler=None, status=None, not_status=None, app_id=None):
698    def runs(self, campaign=None, sampler=None, status=None, not_status=None, app_id=None):
699        """A generator to return all run information for selected `campaign` and `sampler`.
700
701        Parameters
702        ----------
703        campaign: int or None
704            Campaign id to filter for.
705        sampler: int or None
706            Sampler id to filter for.
707        status: enum(Status) or None
708            Status string to filter for.
709        not_status: enum(Status) or None
710            Exclude runs with this status string
711        app_id: int or None
712            App id to filter for.
713
714        Yields
715        ------
716        dict
717            Information on each selected run (key = run_name, value = dict of
718            run information fields.), one at a time.
719        """
720        selected = self._select_runs(
721            campaign=campaign,
722            sampler=sampler,
723            status=status,
724            not_status=not_status,
725            app_id=app_id)
726        for r in selected:
727            yield r.id, self._run_to_dict(r)

A generator to return all run information for selected campaign and sampler.

Parameters
  • campaign (int or None): Campaign id to filter for.
  • sampler (int or None): Sampler id to filter for.
  • status (enum(Status) or None): Status string to filter for.
  • not_status (enum(Status) or None): Exclude runs with this status string
  • app_id (int or None): App id to filter for.
Yields
  • dict: Information on each selected run (key = run_name, value = dict of run information fields.), one at a time.
def run_ids( self, campaign=None, sampler=None, status=None, not_status=None, app_id=None):
729    def run_ids(self, campaign=None, sampler=None, status=None, not_status=None, app_id=None):
730        """A generator to return all run IDs for selected `campaign` and `sampler`.
731
732        Parameters
733        ----------
734        campaign: int or None
735            Campaign id to filter for.
736        sampler: int or None
737            Sampler id to filter for.
738        status: enum(Status) or None
739            Status string to filter for.
740        not_status: enum(Status) or None
741            Exclude runs with this status string
742        app_id: int or None
743            App id to filter for.
744
745        Yields
746        ------
747        str
748            run ID for each selected run, one at a time.
749        """
750        selected = self._select_runs(
751            campaign=campaign,
752            sampler=sampler,
753            status=status,
754            not_status=not_status,
755            app_id=app_id)
756        for r in selected:
757            yield r.run_name

A generator to return all run IDs for selected campaign and sampler.

Parameters
  • campaign (int or None): Campaign id to filter for.
  • sampler (int or None): Sampler id to filter for.
  • status (enum(Status) or None): Status string to filter for.
  • not_status (enum(Status) or None): Exclude runs with this status string
  • app_id (int or None): App id to filter for.
Yields
  • str: run ID for each selected run, one at a time.
def get_num_runs(self, campaign=None, sampler=None, status=None, not_status=None):
759    def get_num_runs(self, campaign=None, sampler=None, status=None, not_status=None):
760        """Returns the number of runs matching the filtering criteria.
761
762        Parameters
763        ----------
764        campaign: int or None
765            Campaign id to filter for.
766        sampler: int or None
767            Sampler id to filter for.
768        status: enum(Status) or None
769            Status string to filter for.
770        not_status: enum(Status) or None
771            Exclude runs with this status string
772
773        Returns
774        -------
775        int
776            The number of runs in the database matching the filtering criteria
777
778        """
779        selected = self._select_runs(
780            campaign=campaign,
781            sampler=sampler,
782            status=status,
783            not_status=not_status)
784        return selected.count()

Returns the number of runs matching the filtering criteria.

Parameters
  • campaign (int or None): Campaign id to filter for.
  • sampler (int or None): Sampler id to filter for.
  • status (enum(Status) or None): Status string to filter for.
  • not_status (enum(Status) or None): Exclude runs with this status string
Returns
  • int: The number of runs in the database matching the filtering criteria
def runs_dir(self, campaign_name=None):
786    def runs_dir(self, campaign_name=None):
787        """Get the directory used to store run information for `campaign_name`.
788
789        Parameters
790        ----------
791        campaign_name: str
792            Name of the selected campaign.
793
794        Returns
795        -------
796        str
797            Path containing run outputs.
798        """
799        return self._get_campaign_info(campaign_name=campaign_name).runs_dir

Get the directory used to store run information for campaign_name.

Parameters
  • campaign_name (str): Name of the selected campaign.
Returns
  • str: Path containing run outputs.
def store_result(self, run_id, result, change_status=True):
801    def store_result(self, run_id, result, change_status=True):
802        """Stores results of a simulation inside the RunTable given a run id.
803
804        Parameters
805        ----------
806        run_id: int
807            The id of a run to store the results in. This will be the run with which these
808            results are associated with. Namely the run that has the inputs used to generate
809            these results.
810        result: dict
811            Results in dictionary form. This is the same format as used by the `Decoder`.
812        change_status: bool
813            If set to False will not update the runs' status to COLLATED. This is sometimes
814            useful in scenarios where you want several apps to work on the same runs.
815        """
816        self.commit_counter += 1
817
818        def convert_nonserializable(obj):
819            if isinstance(obj, np.int64):
820                return int(obj)
821            raise TypeError('Unknown type:', type(obj))
822        result_ = result['result']
823        result.pop('result')
824        result.pop('run_info')
825        if change_status:
826            self.session.query(RunTable).\
827                filter(RunTable.id == run_id).\
828                update({'result': json.dumps(result_, default=convert_nonserializable),
829                        'status': constants.Status.COLLATED,
830                        'run_dir': result['rundir']})
831        else:
832            self.session.query(RunTable).\
833                filter(RunTable.id == run_id).\
834                update({'result': json.dumps(result_, default=convert_nonserializable),
835                        'run_dir': result['rundir']})
836        if self.commit_counter % COMMIT_RATE == 0:
837            self.session.commit()

Stores results of a simulation inside the RunTable given a run id.

Parameters
  • run_id (int): The id of a run to store the results in. This will be the run with which these results are associated with. Namely the run that has the inputs used to generate these results.
  • result (dict): Results in dictionary form. This is the same format as used by the Decoder.
  • change_status (bool): If set to False will not update the runs' status to COLLATED. This is sometimes useful in scenarios where you want several apps to work on the same runs.
def store_results(self, app_name, results):
839    def store_results(self, app_name, results):
840        """Stores the results from a given run in the database.
841
842        Parameters
843        ----------
844        run_name: str
845            name of the run
846        results: dict
847            dictionary with the results (from the decoder)
848        """
849        try:
850            app_id = self.session.query(AppTable).filter(AppTable.name == app_name).all()[0].id
851        except IndexError:
852            raise RuntimeError("app with the name {} not found".format(app_name))
853        commit_counter = 0
854        for run_id, result in results:
855            try:
856                self.session.query(RunTable).\
857                    filter(RunTable.id == run_id, RunTable.app == app_id).\
858                    update({'result': json.dumps(result), 'status': constants.Status.COLLATED})
859                commit_counter += 1
860                if commit_counter % COMMIT_RATE == 0:
861                    self.session.commit()
862            except IndexError:
863                raise RuntimeError("no runs with name {} found".format(run_id))
864        self.session.commit()

Stores the results from a given run in the database.

Parameters
  • run_name (str): name of the run
  • results (dict): dictionary with the results (from the decoder)
def get_results( self, app_name, sampler_id, status=<Status.COLLATED: 3>, iteration=-1):
866    def get_results(self, app_name, sampler_id, status=constants.Status.COLLATED, iteration=-1):
867        """Returns the results as a pandas DataFrame.
868
869        Parameters
870        ----------
871        app_name: str
872            Name of the app to return data for.
873        sampler_id: int
874            ID of the sampler.
875        status: STATUS
876            Run status to filter for.
877        iteration: int
878            If a positive integer will return the results for a given iteration only.
879
880        Returns
881        -------
882        DataFrame
883            Will construct a `DataFrame` from the decoder output dictionaries.
884        """
885        try:
886            app_id = self.session.query(AppTable).filter(AppTable.name == app_name).all()[0].id
887        except IndexError:
888            raise RuntimeError("app with the name {} not found".format(app_name))
889        pd_result = {}
890        query = self.session.query(RunTable).\
891            filter(RunTable.app == app_id).\
892            filter(RunTable.sampler == sampler_id).\
893            filter(RunTable.status == status)
894        # if only a specific iteration is requested filter it out
895        if iteration >= 0:
896            query = query.filter(RunTable.iteration == iteration)
897        for row in query:
898            params = {'run_id': row.id}
899            params['iteration'] = row.iteration
900            params = {**params, **json.loads(row.params)}
901            result = json.loads(row.result)
902            pd_dict = {**params, **result}
903            for key in pd_dict.keys():
904                if not isinstance(pd_dict[key], list):
905                    try:
906                        pd_result[(key, 0)].append(pd_dict[key])
907                    except KeyError:
908                        pd_result[(key, 0)] = [pd_dict[key]]
909                else:
910                    for i, elt in enumerate(pd_dict[key]):
911                        try:
912                            pd_result[(key, i)].append(pd_dict[key][i])
913                        except KeyError:
914                            pd_result[(key, i)] = [pd_dict[key][i]]
915        try:
916            return pd.DataFrame(pd_result)
917        except ValueError:
918            raise RuntimeError(
919                'the results received from the database seem to be malformed - commonly because a vector quantity of interest changes dimensionality')

Returns the results as a pandas DataFrame.

Parameters
  • app_name (str): Name of the app to return data for.
  • sampler_id (int): ID of the sampler.
  • status (STATUS): Run status to filter for.
  • iteration (int): If a positive integer will return the results for a given iteration only.
Returns
  • DataFrame: Will construct a DataFrame from the decoder output dictionaries.
def relocate(self, new_path, campaign_name):
921    def relocate(self, new_path, campaign_name):
922        """Update all runs in the db with the new campaign path.
923
924        Parameters
925        ----------
926        new_path: str
927            new runs directory
928        campaign_name: str
929            name of the campaign
930        """
931        campaign_id = self.get_campaign_id(campaign_name)
932        campaign_info = self.session.query(CampaignTable).\
933            filter(CampaignTable.id == campaign_id).first()
934        path, runs_dir = os.path.split(campaign_info.runs_dir)
935        self.session.query(CampaignTable).\
936            filter(CampaignTable.id == campaign_id).\
937            update({'campaign_dir': str(new_path),
938                    'runs_dir': str(os.path.join(new_path, runs_dir))})
939        self.session.commit()

Update all runs in the db with the new campaign path.

Parameters
  • new_path (str): new runs directory
  • campaign_name (str): name of the campaign
def dump(self):
941    def dump(self):
942        """Dump the database as JSON for debugging purposes.
943
944        Returns
945        -------
946        dict
947            A database dump in JSON format.
948        """
949        meta = MetaData()
950        meta.reflect(bind=self.engine)
951        result = {}
952
953        # Create a connection from the engine
954        with self.engine.connect() as connection:
955            for table in meta.sorted_tables:
956                try:
957                    query_result = connection.execute(table.select()).fetchall()
958                    result[table.name] = [dict(row) for row in query_result]
959                except Exception as e:
960                    result[table.name] = str(e)
961
962        return json.dumps(result)

Dump the database as JSON for debugging purposes.

Returns
  • dict: A database dump in JSON format.