easyvvuq.db.sql
Provides class that allows access to an SQL Database that serves as the back-end to EasyVVUQ.
"""Provides class that allows access to an SQL Database that serves as the back-end to EasyVVUQ.

The module defines the SQLAlchemy table schemas used by EasyVVUQ and the
`CampaignDB` class, which wraps an SQLAlchemy session and exposes the queries
and updates a campaign needs.
"""
import os
import json
import logging
import sqlite3
import pandas as pd
import numpy as np
from sqlalchemy.sql import case
from sqlalchemy import create_engine, Column, Integer, String, ForeignKey
from sqlalchemy.orm import sessionmaker, declarative_base
from sqlalchemy import MetaData
from sqlalchemy import text
from sqlalchemy.engine import Engine
from sqlalchemy import event
from .base import BaseCampaignDB
from easyvvuq import constants
from easyvvuq import ParamsSpecification
from easyvvuq.utils.helpers import easyvvuq_serialize, easyvvuq_deserialize


__copyright__ = """

    Copyright 2018 Robin A. Richardson, David W. Wright

    This file is part of EasyVVUQ

    EasyVVUQ is free software: you can redistribute it and/or modify
    it under the terms of the Lesser GNU General Public License as published by
    the Free Software Foundation, either version 3 of the License, or
    (at your option) any later version.

    EasyVVUQ is distributed in the hope that it will be useful,
    but WITHOUT ANY WARRANTY; without even the implied warranty of
    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
    Lesser GNU General Public License for more details.

    You should have received a copy of the Lesser GNU General Public License
    along with this program.  If not, see <https://www.gnu.org/licenses/>.

"""
__license__ = "LGPL"

# Number of pending row changes after which the session is committed when
# rows are added or updated in bulk.
COMMIT_RATE = 50000

logger = logging.getLogger(__name__)

Base = declarative_base()


class DBInfoTable(Base):
    """An SQLAlchemy schema for the database information table.
    """
    __tablename__ = 'db_info'
    id = Column(Integer, primary_key=True)
    # Number used to build the name of the next run added by `CampaignDB.add_runs`.
    next_run = Column(Integer)


class CampaignTable(Base):
    """An SQLAlchemy schema for the campaign information table.
    """
    __tablename__ = 'campaign_info'
    id = Column(Integer, primary_key=True)
    name = Column(String, unique=True)
    easyvvuq_version = Column(String)
    campaign_dir_prefix = Column(String)
    campaign_dir = Column(String)
    runs_dir = Column(String)
    sampler = Column(Integer, ForeignKey('sampler.id'))
    active_app = Column(Integer, ForeignKey('app.id'))


class AppTable(Base):
    """An SQLAlchemy schema for the app table.
    """
    __tablename__ = 'app'
    id = Column(Integer, primary_key=True)
    name = Column(String, unique=True)
    # Serialized parameter specification (see `CampaignDB.app`).
    params = Column(String)
    # Serialized `Actions` object (see `CampaignDB.replace_actions`).
    actions = Column(String)


class RunTable(Base):
    """An SQLAlchemy schema for the run table.
    """
    __tablename__ = 'run'
    id = Column(Integer, primary_key=True)
    run_name = Column(String, index=True)
    app = Column(Integer, ForeignKey('app.id'))
    # JSON-encoded run parameters (decoded with `json.loads` when read back).
    params = Column(String)
    status = Column(Integer)
    run_dir = Column(String)
    # JSON-encoded results.
    result = Column(String, default="{}")
    execution_info = Column(String, default="{}")
    campaign = Column(Integer, ForeignKey('campaign_info.id'))
    sampler = Column(Integer, ForeignKey('sampler.id'))
    iteration = Column(Integer, default=0)


class SamplerTable(Base):
    """An SQLAlchemy schema for the sampler table.
    """
    __tablename__ = 'sampler'
    id = Column(Integer, primary_key=True)
    # Serialized sampler state (see `CampaignDB.add_sampler`).
    sampler = Column(String)


@event.listens_for(Engine, "connect")
def set_sqlite_pragma(dbapi_connection, connection_record):
    """Relax SQLite durability settings on every new SQLite connection.

    The listener is registered on the `Engine` class, so it is invoked for
    connections of every engine in the process. The PRAGMA statements are
    SQLite specific, so connections of other DBAPI drivers are left untouched.

    Parameters
    ----------
    dbapi_connection
        The raw DBAPI connection that has just been created.
    connection_record
        SQLAlchemy's record for the connection (unused).
    """
    if not isinstance(dbapi_connection, sqlite3.Connection):
        return
    cursor = dbapi_connection.cursor()
    try:
        cursor.execute("PRAGMA synchronous = OFF")
        cursor.execute("PRAGMA journal_mode = OFF")
    finally:
        cursor.close()


def _convert_nonserializable(obj):
    """`json.dumps` fallback that converts NumPy scalars and arrays to Python objects."""
    if isinstance(obj, np.generic):
        return obj.item()
    if isinstance(obj, np.ndarray):
        return obj.tolist()
    raise TypeError('Unknown type:', type(obj))


class CampaignDB(BaseCampaignDB):
    """An interface between the campaign database and the campaign.

    Parameters
    ----------
    location: str
        database URI as needed by SQLAlchemy
    """

    def __init__(self, location=None):
        if location is not None:
            self.engine = create_engine(location)
        else:
            # No location given: use an in-memory SQLite database.
            self.engine = create_engine('sqlite://')
        self.commit_counter = 0
        session_maker = sessionmaker(bind=self.engine)
        self.session = session_maker()
        Base.metadata.create_all(self.engine, checkfirst=True)

    def resume_campaign(self, name):
        """Resumes campaign.

        Parameters
        ----------
        name: str
            Name of the Campaign to resume. Must already exist in the database.

        Raises
        ------
        ValueError
            If no campaign with this name exists.
        RuntimeError
            If the database information table is empty.
        """
        info = self.session.query(
            CampaignTable).filter_by(name=name).first()
        if info is None:
            raise ValueError('Campaign with the given name not found.')
        db_info = self.session.query(DBInfoTable).first()
        if db_info is None:
            raise RuntimeError('Database information table is empty.')
        self._next_run = db_info.next_run

    def create_campaign(self, info):
        """Creates a new campaign in the database.

        Parameters
        ----------
        info: CampaignInfo
            This `easyvvuq.data_structs.CampaignInfo` will contain information
            needed to construct the Campaign table.

        Raises
        ------
        RuntimeError
            If the database already holds a campaign created with a different
            EasyVVUQ version.
        """
        is_db_empty = (self.session.query(CampaignTable).first() is None)
        version_check = self.session.query(
            CampaignTable).filter(CampaignTable.easyvvuq_version != info.easyvvuq_version).all()
        if (not is_db_empty) and version_check:
            raise RuntimeError('Database contains campaign created with an incompatible' +
                               ' version of EasyVVUQ!')
        self._next_run = 1
        self.session.add(CampaignTable(**info.to_dict(flatten=True)))
        self.session.add(DBInfoTable(next_run=self._next_run))
        self.session.commit()

    def get_active_app(self):
        """Returns the active app together with the campaign that references it.

        Returns
        -------
        row or None
            The first `(AppTable, CampaignTable)` pair for which the app is the
            campaign's active app, or `None` if there is no such pair.
        """
        return self.session.query(AppTable, CampaignTable).filter(
            AppTable.id == CampaignTable.active_app).first()

    def campaign_exists(self, name):
        """Check if campaign specified by that name already exists.

        Parameters
        ----------
        name: str

        Returns
        -------
        bool
            True if such a campaign already exists, False otherwise
        """
        result = self.session.query(CampaignTable).filter(
            CampaignTable.name == name).all()
        return len(result) > 0

    def app(self, name=None):
        """Get app information. Specific applications selected by `name`,
        otherwise first entry in database 'app' selected.

        Parameters
        ----------
        name : str or None
            Name of selected app, if `None` given then first app will be
            selected.

        Returns
        -------
        dict
            Information about the application.

        Raises
        ------
        RuntimeError
            If no matching app is found.
        """

        if name is None:
            selected = self.session.query(AppTable).all()
        else:
            selected = self.session.query(AppTable).filter_by(name=name).all()

        if not selected:
            message = f'No entry for app: ({name}).'
            logger.critical(message)
            raise RuntimeError(message)

        selected_app = selected[0]

        app_dict = {
            'id': selected_app.id,
            'name': selected_app.name,
            'params': ParamsSpecification.deserialize(selected_app.params),
            'actions': selected_app.actions,
        }

        return app_dict

    def set_active_app(self, name):
        """Set an app specified by name as active.

        The update is applied to every row of the campaign table.

        Parameters
        ----------
        name: str
            name of the app to set as active

        Raises
        ------
        RuntimeError
            If no app with this name exists.
        """
        selected = self.session.query(AppTable).filter_by(name=name).all()
        if len(selected) == 0:
            raise RuntimeError('no such app - {}'.format(name))
        assert (not (len(selected) > 1))
        app = selected[0]
        self.session.query(CampaignTable).update({'active_app': app.id})
        self.session.commit()

    def add_app(self, app_info):
        """Add application to the 'app' table.

        Parameters
        ----------
        app_info: AppInfo
            Application definition.

        Raises
        ------
        RuntimeError
            If an app with the same name is already stored.
        """

        # Check that no app with same name exists
        name = app_info.name
        selected = self.session.query(AppTable).filter_by(name=name).all()
        if len(selected) > 0:
            message = (
                f'There is already an app in this database with name {name} '
                f'(found {len(selected)}).'
            )
            logger.critical(message)
            raise RuntimeError(message)

        app_dict = app_info.to_dict(flatten=True)

        db_entry = AppTable(**app_dict)
        self.session.add(db_entry)
        self.session.commit()

    def replace_actions(self, app_name, actions):
        """Replace actions for an app with a given name.

        Parameters
        ----------
        app_name: str
            Name of the app.
        actions: Actions
            `Actions` instance, will replace the current `Actions` of an app.
        """
        self.session.query(AppTable).filter_by(name=app_name).update(
            {'actions': easyvvuq_serialize(actions)})
        self.session.commit()

    def add_sampler(self, sampler_element):
        """Add new Sampler to the 'sampler' table.

        Parameters
        ----------
        sampler_element: Sampler
            An EasyVVUQ sampler.

        Returns
        -------
        int
            The sampler `id` in the database.
        """
        db_entry = SamplerTable(sampler=easyvvuq_serialize(sampler_element))

        self.session.add(db_entry)
        self.session.commit()

        return db_entry.id

    def update_sampler(self, sampler_id, sampler_element):
        """Update the state of the Sampler with id 'sampler_id' to
        that in the passed 'sampler_element'

        Parameters
        ----------
        sampler_id: int
            The id of the sampler in the db to update
        sampler_element: Sampler
            The sampler that should be used as the new state
        """

        selected = self.session.get(SamplerTable, sampler_id)
        selected.sampler = easyvvuq_serialize(sampler_element)
        self.session.commit()

    def resurrect_sampler(self, sampler_id):
        """Return the sampler object corresponding to id sampler_id in the database.
        It is deserialized from the state stored in the database.

        Parameters
        ----------
        sampler_id: int
            The id of the sampler to resurrect

        Returns
        -------
        Sampler or None
            The 'live' sampler object, deserialized from the state in the db.
            `None` if an `AttributeError` occurs while loading it, for instance
            because no sampler with this id is stored.
        """
        try:
            serialized_sampler = self.session.get(SamplerTable, sampler_id).sampler
            sampler = easyvvuq_deserialize(serialized_sampler.encode('utf-8'))
        except AttributeError:
            sampler = None
        return sampler

    def resurrect_app(self, app_name):
        """Return the 'live' `Actions` object corresponding to the app with
        name 'app_name' in the database. It is deserialized from the state previously
        stored in the database.

        Parameters
        ----------
        app_name: string
            Name of the app to resurrect

        Returns
        -------
        Actions
            The 'live' `Actions` object associated with this app. Used to execute the simulation
            associated with the app as well as do any pre- and post-processing.
        """
        app_info = self.app(app_name)
        actions = easyvvuq_deserialize(app_info['actions'])
        return actions

    def add_runs(self, run_info_list=None, run_prefix='run_', iteration=0):
        """Add list of runs to the `runs` table in the database.

        Parameters
        ----------
        run_info_list: List of RunInfo objects or None
            Each RunInfo object contains relevant run fields: params, status (where in the
            EasyVVUQ workflow is this RunTable), campaign (id number), sample, app.
            `None` is treated as an empty list.
        run_prefix: str
            Prefix for run name
        iteration: int
            Iteration number used by iterative workflows. For example, MCMC. Can be left
            as default zero in other cases.
        """
        if run_info_list is None:
            run_info_list = []
        # Add all runs to RunTable
        for commit_counter, run_info in enumerate(run_info_list, start=1):
            run_info.run_name = f"{run_prefix}{self._next_run}"
            run_info.iteration = iteration
            run = RunTable(**run_info.to_dict(flatten=True))
            self.session.add(run)
            self._next_run += 1
            # Commit in batches so that the pending set does not grow unbounded.
            if commit_counter % COMMIT_RATE == 0:
                self.session.commit()
        # Update run and ensemble counters in db
        db_info = self.session.query(DBInfoTable).first()
        db_info.next_run = self._next_run
        self.session.commit()

    @staticmethod
    def _run_to_dict(run_row):
        """Convert the provided row from 'runs' table into a dictionary

        Parameters
        ----------
        run_row: RunTable
            Information on a particular run in the database.

        Returns
        -------
        dict
            Contains run information (keys = run_name, params, status, sampler,
            campaign, app, result and run_dir)
        """

        run_info = {
            'run_name': run_row.run_name,
            'params': json.loads(run_row.params),
            'status': constants.Status(run_row.status),
            'sampler': run_row.sampler,
            'campaign': run_row.campaign,
            'app': run_row.app,
            'result': run_row.result,
            'run_dir': run_row.run_dir
        }

        return run_info

    @staticmethod
    def _first_selected_run(selected, level=logging.CRITICAL):
        """Return the first run of a query that is expected to match exactly one run.

        Parameters
        ----------
        selected: sqlalchemy.orm.query.Query
            Query over the run table.
        level: int
            Logging level used to report that more than one run matched.

        Returns
        -------
        RunTable
            The first matching run.

        Raises
        ------
        RuntimeError
            If the query matches no run at all.
        """
        count = selected.count()
        if count == 0:
            message = 'No runs match the given selection criteria.'
            logger.critical(message)
            raise RuntimeError(message)
        if count > 1:
            logger.log(level, 'Multiple runs selected - using the first')
        return selected.first()

    def set_dir_for_run(self, run_name, run_dir, campaign=None, sampler=None):
        """Set the 'run_dir' path for the specified run in the database.

        Parameters
        ----------
        run_name: str
            Name of run to filter for.
        run_dir: str
            Directory path associated to set for this run.
        campaign: int or None
            Campaign id to filter for.
        sampler: int or None
            Sample id to filter for.

        Raises
        ------
        RuntimeError
            If no run matches the criteria.
        """
        filter_options = {'run_name': run_name}
        if campaign:
            filter_options['campaign'] = campaign
        if sampler:
            filter_options['sampler'] = sampler
        selected = self.session.query(RunTable).filter_by(**filter_options)
        run_row = self._first_selected_run(selected)
        run_row.run_dir = run_dir
        self.session.commit()

    def get_run_status(self, run_id, campaign=None, sampler=None):
        """Return the status (enum) for the run with id 'run_id' (and, optionally,
        filtering for campaign and sampler by id)

        Parameters
        ----------
        run_id: int
            id of the run
        campaign: int
            ID of the desired Campaign
        sampler: int
            ID of the desired Sampler

        Returns
        -------
        enum(Status)
            Status of the run.

        Raises
        ------
        RuntimeError
            If no run matches the criteria.
        """
        filter_options = {'id': run_id}
        if campaign:
            filter_options['campaign'] = campaign
        if sampler:
            filter_options['sampler'] = sampler
        selected = self.session.query(RunTable).filter_by(**filter_options)
        run_row = self._first_selected_run(selected)
        return constants.Status(run_row.status)

    def set_run_statuses(self, run_id_list, status):
        """Set the specified 'status' (enum) for all runs in the list run_id_list

        Parameters
        ----------
        run_id_list: list of int
            a list of run ids
        status: enum(Status)
            The new status all listed runs should now have
        """
        self.session.query(RunTable).filter(
            RunTable.id.in_(run_id_list)).update(
                {RunTable.status: status}, synchronize_session='fetch')
        self.session.commit()

    def campaigns(self):
        """Get list of campaigns for which information is stored in the
        database.

        Returns
        -------
        list
            Campaign names.
        """

        return [c.name for c in self.session.query(CampaignTable).all()]

    def _get_campaign_info(self, campaign_name=None):
        """Retrieves Campaign info based on name.

        Parameters
        ----------
        campaign_name: str or None
            Name of campaign to select. If `None`, the first campaign in the
            table is used.

        Returns
        -------
        CampaignTable
            Row of the selected campaign.

        Raises
        ------
        RuntimeError
            If no campaign can be selected.
        """
        assert (isinstance(campaign_name, str) or campaign_name is None)
        query = self.session.query(CampaignTable)
        if campaign_name is None:
            matches = query.limit(1).all()
        else:
            matches = query.filter_by(name=campaign_name).all()
        if not matches:
            message = 'No campaign available.'
            logger.critical(message)
            raise RuntimeError(message)
        if len(matches) > 1:
            logger.warning(
                'More than one campaign selected - using first one.')
        return matches[0]

    def get_campaign_id(self, name):
        """Return the (database) id corresponding to the campaign with name 'name'.

        Parameters
        ----------
        name: str
            Name of the campaign.

        Returns
        -------
        int
            The id of the campaign with the specified name

        Raises
        ------
        RuntimeError
            If zero or more than one campaign carries this name.
        """

        selected = self.session.query(
            CampaignTable.id).filter(CampaignTable.name == name).all()
        if len(selected) == 0:
            msg = f"No campaign with name {name} found in campaign database"
            logger.error(msg)
            raise RuntimeError(msg)
        if len(selected) > 1:
            msg = (
                f"More than one campaign with name {name} found in "
                f"campaign database. Database state is compromised."
            )
            logger.error(msg)
            raise RuntimeError(msg)
        # Return the database ID for the specified campaign
        return selected[0][0]

    def get_sampler_id(self, campaign_id):
        """Return the (database) id corresponding to the sampler currently set
        for the campaign with id 'campaign_id'

        Parameters
        ----------
        campaign_id: int
            ID of the campaign.

        Returns
        -------
        int
            The id of the sampler set for the specified campaign
        """
        sampler_id = self.session.get(CampaignTable, campaign_id).sampler
        return sampler_id

    def set_sampler(self, campaign_id, sampler_id):
        """Set specified campaign to be using specified sampler

        Parameters
        ----------
        campaign_id: int
            ID of the campaign.
        sampler_id: int
            ID of the sampler.
        """
        self.session.get(CampaignTable, campaign_id).sampler = sampler_id
        self.session.commit()

    def campaign_dir(self, campaign_name=None):
        """Get campaign directory for `campaign_name`.

        Parameters
        ----------
        campaign_name: str
            Name of campaign to select

        Returns
        -------
        str
            Path to campaign directory.
        """
        return self._get_campaign_info(campaign_name=campaign_name).campaign_dir

    def _select_runs(
            self,
            name=None,
            campaign=None,
            sampler=None,
            status=None,
            not_status=None,
            app_id=None):
        """Select all runs in the database which match the input criteria.

        Parameters
        ----------
        name: str
            Name of run to filter for.
        campaign: int or None
            Campaign id to filter for.
        sampler: int or None
            Sampler id to filter for.
        status: enum(Status) or None
            Status string to filter for.
        not_status: enum(Status) or None
            Exclude runs with this status string
        app_id: int or None
            App id to filter for.

        Returns
        -------
        sqlalchemy.orm.query.Query
            Selected runs from the database run table.
        """
        filter_options = {}
        if name:
            filter_options['run_name'] = name
        if campaign:
            filter_options['campaign'] = campaign
        if sampler:
            filter_options['sampler'] = sampler
        if status:
            filter_options['status'] = status
        if app_id:
            filter_options['app'] = app_id

        # Note that for some databases this can be sped up with a yield_per(), but not all
        # NOTE(review): the status inequality is applied even when `not_status`
        # is None - confirm that comparing against None is intended here.
        selected = self.session.query(RunTable).filter_by(
            **filter_options).filter(RunTable.status != not_status)

        return selected

    def run(self, name, campaign=None, sampler=None, status=None, not_status=None, app_id=None):
        """Get the information for a specified run.

        Parameters
        ----------
        name: str
            Name of run to filter for.
        campaign: int or None
            Campaign id to filter for.
        sampler: int or None
            Sampler id to filter for.
        status: enum(Status) or None
            Status string to filter for.
        not_status: enum(Status) or None
            Exclude runs with this status string
        app_id: int or None
            App id to filter for.

        Returns
        -------
        dict
            Containing run information (see `_run_to_dict`).

        Raises
        ------
        RuntimeError
            If no run matches the criteria.
        """
        selected = self._select_runs(
            name=name,
            campaign=campaign,
            sampler=sampler,
            status=status,
            not_status=not_status,
            app_id=app_id)
        run_row = self._first_selected_run(selected, level=logging.WARNING)
        return self._run_to_dict(run_row)

    def runs(self, campaign=None, sampler=None, status=None, not_status=None, app_id=None):
        """A generator to return all run information for selected `campaign` and `sampler`.

        Parameters
        ----------
        campaign: int or None
            Campaign id to filter for.
        sampler: int or None
            Sampler id to filter for.
        status: enum(Status) or None
            Status string to filter for.
        not_status: enum(Status) or None
            Exclude runs with this status string
        app_id: int or None
            App id to filter for.

        Yields
        ------
        tuple
            `(run id, dict of run information fields)` for each selected run,
            one at a time.
        """
        selected = self._select_runs(
            campaign=campaign,
            sampler=sampler,
            status=status,
            not_status=not_status,
            app_id=app_id)
        for r in selected:
            yield r.id, self._run_to_dict(r)

    def run_ids(self, campaign=None, sampler=None, status=None, not_status=None, app_id=None):
        """A generator to return the names of all runs for selected `campaign` and `sampler`.

        Parameters
        ----------
        campaign: int or None
            Campaign id to filter for.
        sampler: int or None
            Sampler id to filter for.
        status: enum(Status) or None
            Status string to filter for.
        not_status: enum(Status) or None
            Exclude runs with this status string
        app_id: int or None
            App id to filter for.

        Yields
        ------
        str
            `run_name` of each selected run, one at a time.
        """
        selected = self._select_runs(
            campaign=campaign,
            sampler=sampler,
            status=status,
            not_status=not_status,
            app_id=app_id)
        for r in selected:
            yield r.run_name

    def get_num_runs(self, campaign=None, sampler=None, status=None, not_status=None):
        """Returns the number of runs matching the filtering criteria.

        Parameters
        ----------
        campaign: int or None
            Campaign id to filter for.
        sampler: int or None
            Sampler id to filter for.
        status: enum(Status) or None
            Status string to filter for.
        not_status: enum(Status) or None
            Exclude runs with this status string

        Returns
        -------
        int
            The number of runs in the database matching the filtering criteria

        """
        selected = self._select_runs(
            campaign=campaign,
            sampler=sampler,
            status=status,
            not_status=not_status)
        return selected.count()

    def runs_dir(self, campaign_name=None):
        """Get the directory used to store run information for `campaign_name`.

        Parameters
        ----------
        campaign_name: str
            Name of the selected campaign.

        Returns
        -------
        str
            Path containing run outputs.
        """
        return self._get_campaign_info(campaign_name=campaign_name).runs_dir

    def store_result(self, run_id, result, change_status=True):
        """Stores results of a simulation inside the RunTable given a run id.

        The change is only committed every `COMMIT_RATE` calls; callers that
        need the data persisted immediately have to commit the session
        themselves. The passed dictionary is not modified.

        Parameters
        ----------
        run_id: int
            The id of a run to store the results in. This will be the run with which these
            results are associated with. Namely the run that has the inputs used to generate
            these results.
        result: dict
            Dictionary with at least the keys 'result' (the results in dictionary form,
            the same format as used by the `Decoder`) and 'rundir'.
        change_status: bool
            If set to False will not update the runs' status to COLLATED. This is sometimes
            useful in scenarios where you want several apps to work on the same runs.
        """
        self.commit_counter += 1
        values = {
            'result': json.dumps(result['result'], default=_convert_nonserializable),
            'run_dir': result['rundir'],
        }
        if change_status:
            values['status'] = constants.Status.COLLATED
        self.session.query(RunTable).filter(RunTable.id == run_id).update(values)
        if self.commit_counter % COMMIT_RATE == 0:
            self.session.commit()

    def store_results(self, app_name, results):
        """Stores the results of several runs of an app in the database.

        Parameters
        ----------
        app_name: str
            Name of the app the runs belong to.
        results: iterable of (int, dict)
            Pairs of run id and the dictionary with the results (from the decoder).

        Raises
        ------
        RuntimeError
            If no app with this name exists.
        """
        app = self.session.query(AppTable).filter(AppTable.name == app_name).first()
        if app is None:
            raise RuntimeError("app with the name {} not found".format(app_name))
        app_id = app.id
        for commit_counter, (run_id, result) in enumerate(results, start=1):
            self.session.query(RunTable).\
                filter(RunTable.id == run_id, RunTable.app == app_id).\
                update({'result': json.dumps(result, default=_convert_nonserializable),
                        'status': constants.Status.COLLATED})
            if commit_counter % COMMIT_RATE == 0:
                self.session.commit()
        self.session.commit()

    def get_results(self, app_name, sampler_id, status=constants.Status.COLLATED, iteration=-1):
        """Returns the results as a pandas DataFrame.

        Parameters
        ----------
        app_name: str
            Name of the app to return data for.
        sampler_id: int
            ID of the sampler.
        status: STATUS
            Run status to filter for.
        iteration: int
            If non-negative will return the results for the given iteration only.

        Returns
        -------
        DataFrame
            Will construct a `DataFrame` from the decoder output dictionaries. Columns
            are keyed by `(name, index)`; scalar quantities use index 0 and list
            quantities get one column per element.

        Raises
        ------
        RuntimeError
            If the app does not exist or the collected columns have inconsistent lengths.
        """
        app = self.session.query(AppTable).filter(AppTable.name == app_name).first()
        if app is None:
            raise RuntimeError("app with the name {} not found".format(app_name))
        query = self.session.query(RunTable).\
            filter(RunTable.app == app.id).\
            filter(RunTable.sampler == sampler_id).\
            filter(RunTable.status == status)
        # if only a specific iteration is requested filter it out
        if iteration >= 0:
            query = query.filter(RunTable.iteration == iteration)
        pd_result = {}
        for row in query:
            pd_dict = {
                'run_id': row.id,
                'iteration': row.iteration,
                **json.loads(row.params),
                **json.loads(row.result),
            }
            for key, value in pd_dict.items():
                if isinstance(value, list):
                    for i, elt in enumerate(value):
                        pd_result.setdefault((key, i), []).append(elt)
                else:
                    pd_result.setdefault((key, 0), []).append(value)
        try:
            return pd.DataFrame(pd_result)
        except ValueError as err:
            raise RuntimeError(
                'the results received from the database seem to be malformed - commonly because a vector quantity of interest changes dimensionality') from err

    def relocate(self, new_path, campaign_name):
        """Update the stored campaign and runs directories to a new campaign path.

        The last component of the current runs directory is kept and re-rooted
        under `new_path`.

        Parameters
        ----------
        new_path: str
            new campaign directory
        campaign_name: str
            name of the campaign
        """
        campaign_id = self.get_campaign_id(campaign_name)
        campaign_info = self.session.query(CampaignTable).\
            filter(CampaignTable.id == campaign_id).first()
        runs_dir = os.path.basename(campaign_info.runs_dir)
        self.session.query(CampaignTable).\
            filter(CampaignTable.id == campaign_id).\
            update({'campaign_dir': str(new_path),
                    'runs_dir': str(os.path.join(new_path, runs_dir))})
        self.session.commit()

    def dump(self):
        """Dump the database as JSON for debugging purposes.

        Returns
        -------
        str
            A database dump as a JSON string mapping each table name to its list
            of rows. If reading a table fails, the error message is stored in
            place of the rows.
        """
        meta = MetaData()
        meta.reflect(bind=self.engine)
        result = {}

        # Create a connection from the engine
        with self.engine.connect() as connection:
            for table in meta.sorted_tables:
                try:
                    query_result = connection.execute(table.select()).fetchall()
                    # Row objects expose their column -> value view via `_mapping`.
                    result[table.name] = [dict(row._mapping) for row in query_result]
                except Exception as e:
                    # Best effort: a debugging dump should not fail on one table.
                    result[table.name] = str(e)

        return json.dumps(result)
The base class of the class hierarchy.
When called, it accepts no arguments and returns a new featureless instance that has no instance attributes and cannot be given any.
def _declarative_constructor(self: Any, **kwargs: Any) -> None:
    """A simple constructor that allows initialization from kwargs.

    Sets attributes on the constructed instance using the names and
    values in ``kwargs``.

    Only keys that are present as
    attributes of the instance's class are allowed. These could be,
    for example, any mapped columns or relationships.

    Raises ``TypeError`` at the first key that is not an attribute of the
    class; keys processed before it have already been set.
    """
    cls_ = type(self)
    for name, value in kwargs.items():
        if not hasattr(cls_, name):
            raise TypeError(
                "%r is an invalid keyword argument for %s" % (name, cls_.__name__)
            )
        setattr(self, name, value)
A simple constructor that allows initialization from kwargs.
Sets attributes on the constructed instance using the names and
values in kwargs.
Only keys that are present as attributes of the instance's class are allowed. These could be, for example, any mapped columns or relationships.
class DBInfoTable(Base):
    """Declarative SQLAlchemy model of the ``db_info`` table.

    Stores database-wide bookkeeping information.
    """
    __tablename__ = 'db_info'
    # Primary key of the bookkeeping row.
    id = Column(Integer, primary_key=True)
    # Counter read back into `CampaignDB._next_run` when a campaign is resumed.
    next_run = Column(Integer)
An SQLAlchemy schema for the database information table.
A simple constructor that allows initialization from kwargs.
Sets attributes on the constructed instance using the names and
values in kwargs.
Only keys that are present as attributes of the instance's class are allowed. These could be, for example, any mapped columns or relationships.
class CampaignTable(Base):
    """Declarative SQLAlchemy model of the ``campaign_info`` table.

    One row per campaign; campaign names are unique.
    """
    __tablename__ = 'campaign_info'
    # Primary key of the campaign.
    id = Column(Integer, primary_key=True)
    # Unique campaign name.
    name = Column(String, unique=True)
    # EasyVVUQ version, compared against new campaigns in `CampaignDB.create_campaign`.
    easyvvuq_version = Column(String)
    campaign_dir_prefix = Column(String)
    campaign_dir = Column(String)
    runs_dir = Column(String)
    # References to the sampler and the active app of this campaign.
    sampler = Column(Integer, ForeignKey('sampler.id'))
    active_app = Column(Integer, ForeignKey('app.id'))
An SQLAlchemy schema for the campaign information table.
A simple constructor that allows initialization from kwargs.
Sets attributes on the constructed instance using the names and
values in kwargs.
Only keys that are present as attributes of the instance's class are allowed. These could be, for example, any mapped columns or relationships.
class AppTable(Base):
    """Declarative SQLAlchemy model of the ``app`` table.

    One row per application; application names are unique.
    """
    __tablename__ = 'app'
    # Primary key of the app.
    id = Column(Integer, primary_key=True)
    # Unique app name.
    name = Column(String, unique=True)
    # Serialized parameter specification (read by `CampaignDB.app`).
    params = Column(String)
    # Serialized actions (written by `CampaignDB.replace_actions`).
    actions = Column(String)
An SQLAlchemy schema for the app table.
A simple constructor that allows initialization from kwargs.
Sets attributes on the constructed instance using the names and
values in kwargs.
Only keys that are present as attributes of the instance's class are allowed. These could be, for example, any mapped columns or relationships.
class RunTable(Base):
    """Declarative SQLAlchemy model of the ``run`` table.

    One row per run, linked to its app, campaign and sampler.
    """
    __tablename__ = 'run'
    # Primary key of the run.
    id = Column(Integer, primary_key=True)
    # Indexed run name.
    run_name = Column(String, index=True)
    app = Column(Integer, ForeignKey('app.id'))
    # JSON text, decoded with `json.loads` in `CampaignDB._run_to_dict`.
    params = Column(String)
    status = Column(Integer)
    run_dir = Column(String)
    # Both default to an empty JSON object.
    result = Column(String, default="{}")
    execution_info = Column(String, default="{}")
    campaign = Column(Integer, ForeignKey('campaign_info.id'))
    sampler = Column(Integer, ForeignKey('sampler.id'))
    # Iteration number; defaults to 0.
    iteration = Column(Integer, default=0)
An SQLAlchemy schema for the run table.
A simple constructor that allows initialization from kwargs.
Sets attributes on the constructed instance using the names and
values in kwargs.
Only keys that are present as attributes of the instance's class are allowed. These could be, for example, any mapped columns or relationships.
class SamplerTable(Base):
    """An SQLAlchemy schema for the sampler table.

    Each row of the `sampler` table holds the state of one sampler.
    """
    __tablename__ = 'sampler'
    # Integer primary key of the sampler row.
    id = Column(Integer, primary_key=True)
    # Serialized sampler state, stored as a string.
    sampler = Column(String)
An SQLAlchemy schema for the sampler table.
A simple constructor that allows initialization from kwargs.
Sets attributes on the constructed instance using the names and
values in kwargs.
Only keys that are present as attributes of the instance's class are allowed. These could be, for example, any mapped columns or relationships.
class CampaignDB(BaseCampaignDB):
    """An interface between the campaign database and the campaign.

    Parameters
    ----------
    location: str
        database URI as needed by SQLAlchemy
    """

    def __init__(self, location=None):
        if location is not None:
            self.engine = create_engine(location)
        else:
            # No URI given: SQLAlchemy's 'sqlite://' is an in-memory database.
            self.engine = create_engine('sqlite://')
        # Counts calls to `store_result` so commits can be batched.
        self.commit_counter = 0
        session_maker = sessionmaker(bind=self.engine)
        self.session = session_maker()
        Base.metadata.create_all(self.engine, checkfirst=True)

    def resume_campaign(self, name):
        """Resumes campaign.

        Parameters
        ----------
        name: str
            Name of the Campaign to resume. Must already exist in the database.

        Raises
        ------
        ValueError
            If no campaign with this name is stored in the database.
        """
        info = self.session.query(
            CampaignTable).filter_by(name=name).first()
        if info is None:
            raise ValueError('Campaign with the given name not found.')
        # Restore the run counter so new runs continue the numbering.
        db_info = self.session.query(DBInfoTable).first()
        self._next_run = db_info.next_run

    def create_campaign(self, info):
        """Creates a new campaign in the database.

        Parameters
        ----------
        info: CampaignInfo
            This `easyvvuq.data_structs.CampaignInfo` will contain information
            needed to construct the Campaign table.

        Raises
        ------
        RuntimeError
            If the database already holds a campaign recorded with a different
            EasyVVUQ version string.
        """
        is_db_empty = (self.session.query(CampaignTable).first() is None)
        version_check = self.session.query(
            CampaignTable).filter(CampaignTable.easyvvuq_version != info.easyvvuq_version).all()
        if (not is_db_empty) and (len(version_check) != 0):
            raise RuntimeError('Database contains campaign created with an incompatible' +
                               ' version of EasyVVUQ!')
        self._next_run = 1
        self.session.add(CampaignTable(**info.to_dict(flatten=True)))
        self.session.add(DBInfoTable(next_run=self._next_run))
        self.session.commit()

    def get_active_app(self):
        """Returns active app table.

        Returns
        -------
        The first result row of a query over (AppTable, CampaignTable) in which
        the app id equals the campaign's `active_app`, or None if there is none.
        """
        return self.session.query(AppTable, CampaignTable).filter(
            AppTable.id == CampaignTable.active_app).first()

    def campaign_exists(self, name):
        """Check if campaign specified by that name already exists.

        Parameters
        ----------
        name: str
            Name of the campaign to look for.

        Returns
        -------
        bool
            True if such a campaign already exists, False otherwise
        """
        result = self.session.query(CampaignTable).filter(
            CampaignTable.name == name).all()
        return len(result) > 0

    def app(self, name=None):
        """Get app information. Specific applications selected by `name`,
        otherwise first entry in database 'app' selected.

        Parameters
        ----------
        name : str or None
            Name of selected app, if `None` given then first app will be
            selected.

        Returns
        -------
        dict
            Information about the application (keys: id, name, params, actions).

        Raises
        ------
        RuntimeError
            If no matching app is stored in the database.
        """

        if name is None:
            selected = self.session.query(AppTable).all()
        else:
            selected = self.session.query(AppTable).filter_by(name=name).all()

        if len(selected) == 0:
            message = f'No entry for app: ({name}).'
            logger.critical(message)
            raise RuntimeError(message)

        selected_app = selected[0]

        app_dict = {
            'id': selected_app.id,
            'name': selected_app.name,
            'params': ParamsSpecification.deserialize(selected_app.params),
            # Left serialized here; `resurrect_app` deserializes it.
            'actions': selected_app.actions,
        }

        return app_dict

    def set_active_app(self, name):
        """Set an app specified by name as active.

        Parameters
        ----------
        name: str
            name of the app to set as active

        Raises
        ------
        RuntimeError
            If no app with this name exists.
        """
        selected = self.session.query(AppTable).filter_by(name=name).all()
        if len(selected) == 0:
            raise RuntimeError('no such app - {}'.format(name))
        assert (not (len(selected) > 1))
        app = selected[0]
        # The update carries no filter, so every campaign row is updated.
        self.session.query(CampaignTable).update({'active_app': app.id})
        self.session.commit()

    def add_app(self, app_info):
        """Add application to the 'app' table.

        Parameters
        ----------
        app_info: AppInfo
            Application definition.

        Raises
        ------
        RuntimeError
            If an app with the same name is already stored.
        """

        # Check that no app with same name exists
        name = app_info.name
        selected = self.session.query(AppTable).filter_by(name=name).all()
        if len(selected) > 0:
            message = (
                f'There is already an app in this database with name {name} '
                f'(found {len(selected)}).'
            )
            logger.critical(message)
            raise RuntimeError(message)

        app_dict = app_info.to_dict(flatten=True)

        db_entry = AppTable(**app_dict)
        self.session.add(db_entry)
        self.session.commit()

    def replace_actions(self, app_name, actions):
        """Replace actions for an app with a given name.

        Parameters
        ----------
        app_name: str
            Name of the app.
        actions: Actions
            `Actions` instance, will replace the current `Actions` of an app.
        """
        self.session.query(AppTable).filter_by(name=app_name).update(
            {'actions': easyvvuq_serialize(actions)})
        self.session.commit()

    def add_sampler(self, sampler_element):
        """Add new Sampler to the 'sampler' table.

        Parameters
        ----------
        sampler_element: Sampler
            An EasyVVUQ sampler.

        Returns
        -------
        int
            The sampler `id` in the database.
        """
        db_entry = SamplerTable(sampler=easyvvuq_serialize(sampler_element))

        self.session.add(db_entry)
        self.session.commit()

        return db_entry.id

    def update_sampler(self, sampler_id, sampler_element):
        """Update the state of the Sampler with id 'sampler_id' to
        that in the passed 'sampler_element'

        Parameters
        ----------
        sampler_id: int
            The id of the sampler in the db to update
        sampler_element: Sampler
            The sampler that should be used as the new state
        """

        selected = self.session.get(SamplerTable, sampler_id)
        selected.sampler = easyvvuq_serialize(sampler_element)
        self.session.commit()

    def resurrect_sampler(self, sampler_id):
        """Return the sampler object corresponding to id sampler_id in the database.
        It is deserialized from the state stored in the database.

        Parameters
        ----------
        sampler_id: int
            The id of the sampler to resurrect

        Returns
        -------
        Sampler or None
            The 'live' sampler object, deserialized from the state in the db.
            None if an AttributeError occurs while loading it (for instance
            when no row with this id exists).
        """
        try:
            serialized_sampler = self.session.get(SamplerTable, sampler_id).sampler
            sampler = easyvvuq_deserialize(serialized_sampler.encode('utf-8'))
        except AttributeError:
            sampler = None
        return sampler

    def resurrect_app(self, app_name):
        """Return the 'live' `Actions` object of the app with name 'app_name',
        deserialized from the state previously stored in the database.

        Parameters
        ----------
        app_name: string
            Name of the app to resurrect

        Returns
        -------
        Actions
            The 'live' `Actions` object associated with this app. Used to execute the simulation
            associated with the app as well as do any pre- and post-processing.
        """
        app_info = self.app(app_name)
        actions = easyvvuq_deserialize(app_info['actions'])
        return actions

    def add_runs(self, run_info_list=None, run_prefix='run_', iteration=0):
        """Add list of runs to the `runs` table in the database.

        Parameters
        ----------
        run_info_list: List of RunInfo objects or None
            Each RunInfo object contains relevant run fields: params, status (where in the
            EasyVVUQ workflow is this RunTable), campaign (id number), sample, app.
            `None` is treated as an empty list.
        run_prefix: str
            Prefix for run name
        iteration: int
            Iteration number used by iterative workflows. For example, MCMC. Can be left
            as default zero in other cases.
        """
        if run_info_list is None:
            run_info_list = []
        # Add all runs to RunTable, committing in batches of COMMIT_RATE
        for commit_counter, run_info in enumerate(run_info_list, start=1):
            run_info.run_name = f"{run_prefix}{self._next_run}"
            run_info.iteration = iteration
            run = RunTable(**run_info.to_dict(flatten=True))
            self.session.add(run)
            self._next_run += 1
            if commit_counter % COMMIT_RATE == 0:
                self.session.commit()
        # Persist the run counter so a resumed campaign continues the numbering
        db_info = self.session.query(DBInfoTable).first()
        db_info.next_run = self._next_run
        self.session.commit()

    @staticmethod
    def _run_to_dict(run_row):
        """Convert the provided row from 'runs' table into a dictionary

        Parameters
        ----------
        run_row: RunTable
            Information on a particular run in the database.

        Returns
        -------
        dict
            Contains run information (keys = run_name, params, status, sampler,
            campaign, app, result and run_dir)
        """

        run_info = {
            'run_name': run_row.run_name,
            'params': json.loads(run_row.params),
            'status': constants.Status(run_row.status),
            'sampler': run_row.sampler,
            'campaign': run_row.campaign,
            'app': run_row.app,
            'result': run_row.result,
            'run_dir': run_row.run_dir
        }

        return run_info

    @staticmethod
    def _first_selected_run(selected, log):
        """Return the first run of a query, requiring at least one match.

        Parameters
        ----------
        selected: sqlalchemy.orm.query.Query
            Query over the run table.
        log: callable
            Logging function used to report that several runs matched.

        Returns
        -------
        RunTable
            The first run matched by the query.

        Raises
        ------
        RuntimeError
            If the query matches no run at all.
        """
        count = selected.count()
        if count == 0:
            message = 'No runs selected'
            logger.critical(message)
            raise RuntimeError(message)
        if count > 1:
            log('Multiple runs selected - using the first')
        return selected.first()

    def set_dir_for_run(self, run_name, run_dir, campaign=None, sampler=None):
        """Set the 'run_dir' path for the specified run in the database.

        Parameters
        ----------
        run_name: str
            Name of run to filter for.
        run_dir: str
            Directory path associated to set for this run.
        campaign: int or None
            Campaign id to filter for.
        sampler: int or None
            Sample id to filter for.

        Raises
        ------
        RuntimeError
            If no run matches the filters.
        """
        filter_options = {'run_name': run_name}
        if campaign:
            filter_options['campaign'] = campaign
        if sampler:
            filter_options['sampler'] = sampler
        selected = self.session.query(RunTable).filter_by(**filter_options)
        selected = self._first_selected_run(selected, logger.critical)
        selected.run_dir = run_dir
        self.session.commit()

    def get_run_status(self, run_id, campaign=None, sampler=None):
        """Return the status (enum) for the run with id 'run_id' (and, optionally,
        filtering for campaign and sampler by id)

        Parameters
        ----------
        run_id: int
            id of the run
        campaign: int
            ID of the desired Campaign
        sampler: int
            ID of the desired Sampler

        Returns
        -------
        enum(Status)
            Status of the run.

        Raises
        ------
        RuntimeError
            If no run matches the filters.
        """
        filter_options = {'id': run_id}
        if campaign:
            filter_options['campaign'] = campaign
        if sampler:
            filter_options['sampler'] = sampler
        selected = self.session.query(RunTable).filter_by(**filter_options)
        selected = self._first_selected_run(selected, logger.critical)
        return constants.Status(selected.status)

    def set_run_statuses(self, run_id_list, status):
        """Set the specified 'status' (enum) for all runs in the list run_id_list

        Parameters
        ----------
        run_id_list: list of int
            a list of run ids
        status: enum(Status)
            The new status all listed runs should now have
        """
        self.session.query(RunTable).filter(
            RunTable.id.in_(run_id_list)).update(
            {RunTable.status: status}, synchronize_session='fetch')
        self.session.commit()

    def campaigns(self):
        """Get list of campaigns for which information is stored in the
        database.

        Returns
        -------
        list
            Campaign names.
        """

        return [c.name for c in self.session.query(CampaignTable).all()]

    def _get_campaign_info(self, campaign_name=None):
        """Retrieves Campaign info based on name.

        Parameters
        ----------
        campaign_name: str or None
            Name of campaign to select. If `None`, the first campaign in the
            table is returned (or None when the table is empty).

        Returns
        -------
        CampaignTable
            Row of the selected campaign.

        Raises
        ------
        RuntimeError
            If a name is given and no campaign carries it.
        """
        assert (isinstance(campaign_name, str) or campaign_name is None)
        query = self.session.query(CampaignTable)
        if campaign_name is None:
            return query.first()
        campaign_info = query.filter_by(name=campaign_name).all()
        if len(campaign_info) == 0:
            message = 'No campaign available.'
            logger.critical(message)
            raise RuntimeError(message)
        if len(campaign_info) > 1:
            logger.warning(
                'More than one campaign selected - using first one.')
        return campaign_info[0]

    def get_campaign_id(self, name):
        """Return the (database) id corresponding to the campaign with name 'name'.

        Parameters
        ----------
        name: str
            Name of the campaign.

        Returns
        -------
        int
            The id of the campaign with the specified name

        Raises
        ------
        RuntimeError
            If no campaign, or more than one campaign, has this name.
        """

        selected = self.session.query(CampaignTable.id).filter(
            CampaignTable.name == name).all()
        if len(selected) == 0:
            msg = f"No campaign with name {name} found in campaign database"
            logger.error(msg)
            raise RuntimeError(msg)
        if len(selected) > 1:
            msg = (
                f"More than one campaign with name {name} found in "
                f"campaign database. Database state is compromised."
            )
            logger.error(msg)
            raise RuntimeError(msg)
        # Each result row holds a single column: the campaign id
        return selected[0][0]

    def get_sampler_id(self, campaign_id):
        """Return the (database) id corresponding to the sampler currently set
        for the campaign with id 'campaign_id'

        Parameters
        ----------
        campaign_id: int
            ID of the campaign.

        Returns
        -------
        int
            The id of the sampler set for the specified campaign
        """
        sampler_id = self.session.get(CampaignTable, campaign_id).sampler
        return sampler_id

    def set_sampler(self, campaign_id, sampler_id):
        """Set specified campaign to be using specified sampler

        Parameters
        ----------
        campaign_id: int
            ID of the campaign.
        sampler_id: int
            ID of the sampler.
        """
        self.session.get(CampaignTable, campaign_id).sampler = sampler_id
        self.session.commit()

    def campaign_dir(self, campaign_name=None):
        """Get campaign directory for `campaign_name`.

        Parameters
        ----------
        campaign_name: str
            Name of campaign to select

        Returns
        -------
        str
            Path to campaign directory.
        """
        return self._get_campaign_info(campaign_name=campaign_name).campaign_dir

    def _select_runs(
            self,
            name=None,
            campaign=None,
            sampler=None,
            status=None,
            not_status=None,
            app_id=None):
        """Select all runs in the database which match the input criteria.

        Parameters
        ----------
        name: str
            Name of run to filter for.
        campaign: int or None
            Campaign id to filter for.
        sampler: int or None
            Sampler id to filter for.
        status: enum(Status) or None
            Status string to filter for.
        not_status: enum(Status) or None
            Exclude runs with this status string
        app_id: int or None
            App id to filter for.

        Returns
        -------
        sqlalchemy.orm.query.Query
            Selected runs from the database run table.
        """
        # Only criteria with a truthy value become filters.
        filter_options = {}
        if name:
            filter_options['run_name'] = name
        if campaign:
            filter_options['campaign'] = campaign
        if sampler:
            filter_options['sampler'] = sampler
        if status:
            filter_options['status'] = status
        if app_id:
            filter_options['app'] = app_id

        # Note that for some databases this can be sped up with a yield_per(), but not all
        selected = self.session.query(RunTable).filter_by(
            **filter_options).filter(RunTable.status != not_status)

        return selected

    def run(self, name, campaign=None, sampler=None, status=None, not_status=None, app_id=None):
        """Get the information for a specified run.

        Parameters
        ----------
        name: str
            Name of run to filter for.
        campaign: int or None
            Campaign id to filter for.
        sampler: int or None
            Sampler id to filter for.
        status: enum(Status) or None
            Status string to filter for.
        not_status: enum(Status) or None
            Exclude runs with this status string
        app_id: int or None
            App id to filter for.

        Returns
        -------
        dict
            Containing run information (run_name, params, status, sampler,
            campaign, app, result, run_dir)

        Raises
        ------
        RuntimeError
            If no run matches the filters.
        """
        selected = self._select_runs(
            name=name,
            campaign=campaign,
            sampler=sampler,
            status=status,
            not_status=not_status,
            app_id=app_id)
        selected = self._first_selected_run(selected, logger.warning)
        return self._run_to_dict(selected)

    def runs(self, campaign=None, sampler=None, status=None, not_status=None, app_id=None):
        """A generator to return all run information for selected `campaign` and `sampler`.

        Parameters
        ----------
        campaign: int or None
            Campaign id to filter for.
        sampler: int or None
            Sampler id to filter for.
        status: enum(Status) or None
            Status string to filter for.
        not_status: enum(Status) or None
            Exclude runs with this status string
        app_id: int or None
            App id to filter for.

        Yields
        ------
        tuple
            `(run id, dict of run information fields)` for each selected run,
            one at a time.
        """
        selected = self._select_runs(
            campaign=campaign,
            sampler=sampler,
            status=status,
            not_status=not_status,
            app_id=app_id)
        for r in selected:
            yield r.id, self._run_to_dict(r)

    def run_ids(self, campaign=None, sampler=None, status=None, not_status=None, app_id=None):
        """A generator to return the names of all runs for selected `campaign` and `sampler`.

        Parameters
        ----------
        campaign: int or None
            Campaign id to filter for.
        sampler: int or None
            Sampler id to filter for.
        status: enum(Status) or None
            Status string to filter for.
        not_status: enum(Status) or None
            Exclude runs with this status string
        app_id: int or None
            App id to filter for.

        Yields
        ------
        str
            `run_name` of each selected run, one at a time.
        """
        selected = self._select_runs(
            campaign=campaign,
            sampler=sampler,
            status=status,
            not_status=not_status,
            app_id=app_id)
        for r in selected:
            yield r.run_name

    def get_num_runs(self, campaign=None, sampler=None, status=None, not_status=None):
        """Returns the number of runs matching the filtering criteria.

        Parameters
        ----------
        campaign: int or None
            Campaign id to filter for.
        sampler: int or None
            Sampler id to filter for.
        status: enum(Status) or None
            Status string to filter for.
        not_status: enum(Status) or None
            Exclude runs with this status string

        Returns
        -------
        int
            The number of runs in the database matching the filtering criteria

        """
        selected = self._select_runs(
            campaign=campaign,
            sampler=sampler,
            status=status,
            not_status=not_status)
        return selected.count()

    def runs_dir(self, campaign_name=None):
        """Get the directory used to store run information for `campaign_name`.

        Parameters
        ----------
        campaign_name: str
            Name of the selected campaign.

        Returns
        -------
        str
            Path containing run outputs.
        """
        return self._get_campaign_info(campaign_name=campaign_name).runs_dir

    def store_result(self, run_id, result, change_status=True):
        """Stores results of a simulation inside the RunTable given a run id.

        The update is only committed on every COMMIT_RATE-th call. Note that the
        'result' and 'run_info' keys are removed from the passed dictionary.

        Parameters
        ----------
        run_id: int
            The id of a run to store the results in. This will be the run with which these
            results are associated with. Namely the run that has the inputs used to generate
            these results.
        result: dict
            Must contain the keys 'result' (results in dictionary form, the same format as
            used by the `Decoder`) and 'rundir'.
        change_status: bool
            If set to False will not update the runs' status to COLLATED. This is sometimes
            useful in scenarios where you want several apps to work on the same runs.
        """
        self.commit_counter += 1

        def convert_nonserializable(obj):
            # json cannot encode NumPy scalars/arrays; map them to builtins.
            if isinstance(obj, np.integer):
                return int(obj)
            if isinstance(obj, np.floating):
                return float(obj)
            if isinstance(obj, np.ndarray):
                return obj.tolist()
            raise TypeError('Unknown type:', type(obj))
        result_ = result.pop('result')
        result.pop('run_info', None)
        values = {'result': json.dumps(result_, default=convert_nonserializable),
                  'run_dir': result['rundir']}
        if change_status:
            values['status'] = constants.Status.COLLATED
        self.session.query(RunTable).\
            filter(RunTable.id == run_id).\
            update(values)
        if self.commit_counter % COMMIT_RATE == 0:
            self.session.commit()

    def _get_app_id(self, app_name):
        """Return the database id of the app named `app_name`.

        Raises
        ------
        RuntimeError
            If no app with this name exists.
        """
        app_row = self.session.query(AppTable).filter(AppTable.name == app_name).first()
        if app_row is None:
            raise RuntimeError("app with the name {} not found".format(app_name))
        return app_row.id

    def store_results(self, app_name, results):
        """Stores the results of several runs of an app in the database.

        Parameters
        ----------
        app_name: str
            name of the app the runs belong to
        results: iterable of (run_id, result) pairs
            `result` is a dictionary with the results (from the decoder)

        Raises
        ------
        RuntimeError
            If no app with this name exists.
        """
        app_id = self._get_app_id(app_name)
        for commit_counter, (run_id, result) in enumerate(results, start=1):
            self.session.query(RunTable).\
                filter(RunTable.id == run_id, RunTable.app == app_id).\
                update({'result': json.dumps(result), 'status': constants.Status.COLLATED})
            if commit_counter % COMMIT_RATE == 0:
                self.session.commit()
        self.session.commit()

    def get_results(self, app_name, sampler_id, status=constants.Status.COLLATED, iteration=-1):
        """Returns the results as a pandas DataFrame.

        Parameters
        ----------
        app_name: str
            Name of the app to return data for.
        sampler_id: int
            ID of the sampler.
        status: STATUS
            Run status to filter for.
        iteration: int
            If a non-negative integer will return the results for a given iteration only.

        Returns
        -------
        DataFrame
            Will construct a `DataFrame` from the decoder output dictionaries.
            Columns are keyed by `(name, index)`; scalar values use index 0.

        Raises
        ------
        RuntimeError
            If the app does not exist or the collected columns cannot be
            assembled into a DataFrame.
        """
        app_id = self._get_app_id(app_name)
        pd_result = {}
        query = self.session.query(RunTable).\
            filter(RunTable.app == app_id).\
            filter(RunTable.sampler == sampler_id).\
            filter(RunTable.status == status)
        # if only a specific iteration is requested filter it out
        if iteration >= 0:
            query = query.filter(RunTable.iteration == iteration)
        for row in query:
            params = {'run_id': row.id, 'iteration': row.iteration}
            pd_dict = {**params, **json.loads(row.params), **json.loads(row.result)}
            for key, value in pd_dict.items():
                if not isinstance(value, list):
                    pd_result.setdefault((key, 0), []).append(value)
                else:
                    # Vector quantities get one column per component.
                    for i, elt in enumerate(value):
                        pd_result.setdefault((key, i), []).append(elt)
        try:
            return pd.DataFrame(pd_result)
        except ValueError as err:
            raise RuntimeError(
                'the results received from the database seem to be malformed - commonly because a vector quantity of interest changes dimensionality') from err

    def relocate(self, new_path, campaign_name):
        """Update the campaign entry in the db with the new campaign path.

        Parameters
        ----------
        new_path: str
            new campaign directory; the runs directory keeps its own name
            and is placed underneath it
        campaign_name: str
            name of the campaign
        """
        campaign_id = self.get_campaign_id(campaign_name)
        campaign_info = self.session.query(CampaignTable).\
            filter(CampaignTable.id == campaign_id).first()
        # Only the last path component of the old runs directory is kept.
        _, runs_dir = os.path.split(campaign_info.runs_dir)
        self.session.query(CampaignTable).\
            filter(CampaignTable.id == campaign_id).\
            update({'campaign_dir': str(new_path),
                    'runs_dir': str(os.path.join(new_path, runs_dir))})
        self.session.commit()

    def dump(self):
        """Dump the database as JSON for debugging purposes.

        Returns
        -------
        str
            A database dump as a JSON string. Each table name maps to a list of
            row dictionaries, or to an error message if reading the table failed.
        """
        meta = MetaData()
        meta.reflect(bind=self.engine)
        result = {}

        # Create a connection from the engine
        with self.engine.connect() as connection:
            for table in meta.sorted_tables:
                try:
                    query_result = connection.execute(table.select()).fetchall()
                    # Rows are tuple-like in SQLAlchemy 2.x; `_mapping` gives
                    # the column-name view.
                    result[table.name] = [dict(row._mapping) for row in query_result]
                except Exception as e:
                    # Best effort: record the failure instead of aborting the dump.
                    result[table.name] = str(e)

        return json.dumps(result)
An interface between the campaign database and the campaign.
Parameters
- location (str): database URI as needed by SQLAlchemy
def __init__(self, location=None):
    """Open (or create) the campaign database.

    Parameters
    ----------
    location: str or None
        database URI as needed by SQLAlchemy; 'sqlite://' is used when None.
    """
    uri = 'sqlite://' if location is None else location
    self.engine = create_engine(uri)
    self.commit_counter = 0
    self.session = sessionmaker(bind=self.engine)()
    # Create any table of the schema that does not exist yet.
    Base.metadata.create_all(self.engine, checkfirst=True)
def resume_campaign(self, name):
    """Resume a campaign that is already stored in the database.

    Parameters
    ----------
    name: str
        Name of the Campaign to resume. Must already exist in the database.

    Raises
    ------
    ValueError
        If no campaign with this name is found.
    """
    campaign_row = self.session.query(CampaignTable).filter_by(name=name).first()
    if campaign_row is None:
        raise ValueError('Campaign with the given name not found.')
    # Pick up the stored run counter.
    self._next_run = self.session.query(DBInfoTable).first().next_run
Resumes campaign.
Parameters
- name (str): Name of the Campaign to resume. Must already exist in the database.
def create_campaign(self, info):
    """Store a new campaign in the database.

    Parameters
    ----------
    info: CampaignInfo
        This `easyvvuq.data_structs.CampaignInfo` will contain information
        needed to construct the Campaign table.

    Raises
    ------
    RuntimeError
        If the database holds a campaign recorded with a different
        EasyVVUQ version string.
    """
    campaigns = self.session.query(CampaignTable)
    has_campaigns = campaigns.first() is not None
    other_versions = campaigns.filter(
        CampaignTable.easyvvuq_version != info.easyvvuq_version).all()
    if has_campaigns and other_versions:
        raise RuntimeError('Database contains campaign created with an incompatible' +
                           ' version of EasyVVUQ!')
    # A freshly created campaign starts numbering its runs at 1.
    self._next_run = 1
    campaign_row = CampaignTable(**info.to_dict(flatten=True))
    self.session.add(campaign_row)
    counter_row = DBInfoTable(next_run=self._next_run)
    self.session.add(counter_row)
    self.session.commit()
Creates a new campaign in the database.
Parameters
- info (CampaignInfo):
This `easyvvuq.data_structs.CampaignInfo` will contain information needed to construct the Campaign table.
def get_active_app(self):
    """Return the active app.

    Returns
    -------
    The first row of the (AppTable, CampaignTable) query in which the app id
    equals the campaign's `active_app`, or None if no such row exists.
    """
    app_and_campaign = self.session.query(AppTable, CampaignTable)
    active = app_and_campaign.filter(AppTable.id == CampaignTable.active_app)
    return active.first()
Returns active app table.
Returns
- AppTable
def campaign_exists(self, name):
    """Tell whether a campaign with the given name is stored.

    Parameters
    ----------
    name: str
        Campaign name to look for.

    Returns
    -------
    bool
        True if such a campaign already exists, False otherwise
    """
    by_name = self.session.query(CampaignTable).filter(CampaignTable.name == name)
    matches = by_name.all()
    return bool(matches)
Check if campaign specified by that name already exists.
Parameters
- name (str): Name of the campaign to look for.
Returns
- bool: True if such a campaign already exists, False otherwise
def app(self, name=None):
    """Get app information, either for the app called `name` or, when no
    name is given, for the first entry of the 'app' table.

    Parameters
    ----------
    name : str or None
        Name of selected app, if `None` given then first app will be
        selected.

    Returns
    -------
    dict
        Information about the application (keys: id, name, params, actions).

    Raises
    ------
    RuntimeError
        If no matching app is found.
    """
    query = self.session.query(AppTable)
    if name is not None:
        query = query.filter_by(name=name)
    rows = query.all()

    if not rows:
        message = f'No entry for app: ({name}).'
        logger.critical(message)
        raise RuntimeError(message)

    first = rows[0]
    return {
        'id': first.id,
        'name': first.name,
        'params': ParamsSpecification.deserialize(first.params),
        # Returned as stored; not deserialized here.
        'actions': first.actions,
    }
Get app information. Specific applications selected by name,
otherwise first entry in database 'app' selected.
Parameters
- name (str or None):
Name of selected app, if `None` given then first app will be selected.
Returns
- dict: Information about the application.
def set_active_app(self, name):
    """Mark the app with the given name as the active one.

    Parameters
    ----------
    name: str
        name of the app to set as active

    Raises
    ------
    RuntimeError
        If no app with this name exists.
    """
    matches = self.session.query(AppTable).filter_by(name=name).all()
    if not matches:
        raise RuntimeError(f'no such app - {name}')
    assert len(matches) <= 1
    active_id = matches[0].id
    # No filter on the update: every campaign row receives the new active app.
    self.session.query(CampaignTable).update({'active_app': active_id})
    self.session.commit()
Set an app specified by name as active.
Parameters
- name (str): name of the app to set as active
def add_app(self, app_info):
    """Add application to the 'app' table.

    Parameters
    ----------
    app_info: AppInfo
        Application definition.

    Raises
    ------
    RuntimeError
        If an app with the same name is already stored.
    """

    # Check that no app with same name exists
    name = app_info.name
    selected = self.session.query(AppTable).filter_by(name=name).all()
    if len(selected) > 0:
        # The trailing space keeps the two message parts from running together.
        message = (
            f'There is already an app in this database with name {name} '
            f'(found {len(selected)}).'
        )
        logger.critical(message)
        raise RuntimeError(message)

    app_dict = app_info.to_dict(flatten=True)

    db_entry = AppTable(**app_dict)
    self.session.add(db_entry)
    self.session.commit()
Add application to the 'app' table.
Parameters
- app_info (AppInfo): Application definition.
def replace_actions(self, app_name, actions):
    """Overwrite the stored actions of the app called `app_name`.

    Parameters
    ----------
    app_name: str
        Name of the app.
    actions: Actions
        `Actions` instance, will replace the current `Actions` of an app.
    """
    target = self.session.query(AppTable).filter_by(name=app_name)
    new_values = {'actions': easyvvuq_serialize(actions)}
    target.update(new_values)
    self.session.commit()
Replace actions for an app with a given name.
Parameters
- app_name (str): Name of the app.
- actions (Actions):
`Actions` instance, will replace the current `Actions` of an app.
def add_sampler(self, sampler_element):
    """Store a new sampler in the 'sampler' table.

    Parameters
    ----------
    sampler_element: Sampler
        An EasyVVUQ sampler.

    Returns
    -------
    int
        The sampler `id` in the database.
    """
    serialized = easyvvuq_serialize(sampler_element)
    sampler_row = SamplerTable(sampler=serialized)
    self.session.add(sampler_row)
    # The id is read from the row after the commit.
    self.session.commit()
    return sampler_row.id
Add new Sampler to the 'sampler' table.
Parameters
- sampler_element (Sampler): An EasyVVUQ sampler.
Returns
- int: The sampler `id` in the database.
def update_sampler(self, sampler_id, sampler_element):
    """Overwrite the stored state of the sampler with id 'sampler_id' with
    the state of the passed 'sampler_element'.

    Parameters
    ----------
    sampler_id: int
        The id of the sampler in the db to update
    sampler_element: Sampler
        The sampler that should be used as the new state
    """
    sampler_row = self.session.get(SamplerTable, sampler_id)
    new_state = easyvvuq_serialize(sampler_element)
    sampler_row.sampler = new_state
    self.session.commit()
Update the state of the Sampler with id 'sampler_id' to that in the passed 'sampler_element'
Parameters
- sampler_id (int): The id of the sampler in the db to update
- sampler_element (Sampler): The sampler that should be used as the new state
def resurrect_sampler(self, sampler_id):
    """Rebuild the sampler stored under `sampler_id`.

    The serialized state held in the database is deserialized into a
    sampler object.

    Parameters
    ----------
    sampler_id: int
        The id of the sampler to resurrect

    Returns
    -------
    Sampler or None
        The 'live' sampler object, or None when an AttributeError is
        raised while looking up or deserializing the stored state.
    """
    try:
        sampler_row = self.session.get(SamplerTable, sampler_id)
        encoded_state = sampler_row.sampler.encode('utf-8')
        return easyvvuq_deserialize(encoded_state)
    except AttributeError:
        # Presumably covers the "no such sampler" case, where the lookup
        # yields None - TODO confirm; any other AttributeError raised in
        # the try body is swallowed as well.
        return None
Return the sampler object corresponding to id sampler_id in the database. It is deserialized from the state stored in the database.
Parameters
- sampler_id (int): The id of the sampler to resurrect
Returns
- Sampler: The 'live' sampler object, deserialized from the state in the db
def resurrect_app(self, app_name):
    """Rebuild the `Actions` object stored for the app named `app_name`.

    The app information is looked up by name and its 'actions' entry is
    deserialized.

    Parameters
    ----------
    app_name: string
        Name of the app to resurrect

    Returns
    -------
    Actions
        The 'live' `Actions` object associated with this app. Used to execute the
        simulation associated with the app as well as do any pre- and post-processing.
    """
    stored_actions = self.app(app_name)['actions']
    return easyvvuq_deserialize(stored_actions)
Return the 'live' encoder, decoder and collation objects corresponding to the app with name 'app_name' in the database. They are deserialized from the states previously stored in the database.
Parameters
- app_name (string): Name of the app to resurrect
Returns
- Actions: The 'live' `Actions` object associated with this app. Used to execute the simulation associated with the app as well as do any pre- and post-processing.
def add_runs(self, run_info_list=None, run_prefix='run_', iteration=0):
    """Add list of runs to the `runs` table in the database.

    Parameters
    ----------
    run_info_list: List of RunInfo objects or None
        Each RunInfo object contains relevant run fields: params, status (where in the
        EasyVVUQ workflow is this RunTable), campaign (id number), sample, app.
        None (the default) is treated as an empty list.
    run_prefix: str
        Prefix for run name
    iteration: int
        Iteration number used by iterative workflows. For example, MCMC. Can be left
        as default zero in other cases.
    """
    # The default argument is None, which cannot be iterated.
    if run_info_list is None:
        run_info_list = []

    for added, run_info in enumerate(run_info_list, start=1):
        # Run names are the prefix followed by the running counter.
        run_info.run_name = f"{run_prefix}{self._next_run}"
        run_info.iteration = iteration
        self.session.add(RunTable(**run_info.to_dict(flatten=True)))
        self._next_run += 1
        # Commit in batches of COMMIT_RATE rather than once per run.
        if added % COMMIT_RATE == 0:
            self.session.commit()

    # Persist the run counter so that it is available from the db_info table.
    db_info = self.session.query(DBInfoTable).first()
    db_info.next_run = self._next_run
    self.session.commit()
Add list of runs to the runs table in the database.
Parameters
- run_info_list (List of RunInfo objects): Each RunInfo object contains relevant run fields: params, status (where in the EasyVVUQ workflow is this RunTable), campaign (id number), sample, app
- run_prefix (str): Prefix for run name
- iteration (int): Iteration number used by iterative workflows. For example, MCMC. Can be left as default zero in other cases.
def set_dir_for_run(self, run_name, run_dir, campaign=None, sampler=None):
    """Set the 'run_dir' path for the specified run in the database.

    Parameters
    ----------
    run_name: str
        Name of run to filter for.
    run_dir: str
        Directory path to store for this run.
    campaign: int or None
        Campaign id to filter for.
    sampler: int or None
        Sampler id to filter for.

    Raises
    ------
    RuntimeError
        If no run matches the filters.
    """
    filter_options = {'run_name': run_name}
    if campaign:
        filter_options['campaign'] = campaign
    if sampler:
        filter_options['sampler'] = sampler
    selected = self.session.query(RunTable).filter_by(**filter_options)

    num_selected = selected.count()
    if num_selected == 0:
        # Without this check the code below would fail on None with a
        # misleading "Multiple runs selected" log entry.
        message = f'No run found matching {filter_options}'
        logger.critical(message)
        raise RuntimeError(message)
    if num_selected > 1:
        logger.critical('Multiple runs selected - using the first')

    selected.first().run_dir = run_dir
    self.session.commit()
Set the 'run_dir' path for the specified run in the database.
Parameters
- run_name (str): Name of run to filter for.
- run_dir (str): Directory path associated to set for this run.
- campaign (int or None): Campaign id to filter for.
- sampler (int or None): Sample id to filter for.
def get_run_status(self, run_id, campaign=None, sampler=None):
    """Return the status (enum) for the run with id 'run_id' (and, optionally,
    filtering for campaign and sampler by id)

    Parameters
    ----------
    run_id: int
        id of the run
    campaign: int
        ID of the desired Campaign
    sampler: int
        ID of the desired Sampler

    Returns
    -------
    enum(Status)
        Status of the run.

    Raises
    ------
    RuntimeError
        If no run matches the filters.
    """
    filter_options = {'id': run_id}
    if campaign:
        filter_options['campaign'] = campaign
    if sampler:
        filter_options['sampler'] = sampler
    selected = self.session.query(RunTable).filter_by(**filter_options)

    num_selected = selected.count()
    if num_selected == 0:
        # Without this check the code below would fail on None with a
        # misleading "Multiple runs selected" log entry.
        message = f'No run found matching {filter_options}'
        logger.critical(message)
        raise RuntimeError(message)
    if num_selected > 1:
        logger.critical('Multiple runs selected - using the first')

    return constants.Status(selected.first().status)
Return the status (enum) for the run with id 'run_id' (and, optionally, filtering for campaign and sampler by id)
Parameters
- run_id (int): id of the run
- campaign (int): ID of the desired Campaign
- sampler (int): ID of the desired Sampler
Returns
- enum(Status): Status of the run.
def set_run_statuses(self, run_id_list, status):
    """Assign `status` to every run whose id is in `run_id_list`.

    Parameters
    ----------
    run_id_list: list of int
        a list of run ids
    status: enum(Status)
        The new status all listed runs should now have
    """
    listed_runs = self.session.query(RunTable).filter(RunTable.id.in_(run_id_list))
    listed_runs.update({RunTable.status: status}, synchronize_session='fetch')
    self.session.commit()
Set the specified 'status' (enum) for all runs in the list run_id_list
Parameters
- run_id_list (list of int): a list of run ids
- status (enum(Status)): The new status all listed runs should now have
def campaigns(self):
    """List the names of all campaigns stored in the database.

    Returns
    -------
    list
        Campaign names.
    """
    campaign_rows = self.session.query(CampaignTable).all()
    return [row.name for row in campaign_rows]
Get list of campaigns for which information is stored in the database.
Returns
- list: Campaign names.
def get_campaign_id(self, name):
    """Return the (database) id corresponding to the campaign with name 'name'.

    Parameters
    ----------
    name: str
        Name of the campaign.

    Returns
    -------
    int
        The id of the campaign with the specified name

    Raises
    ------
    RuntimeError
        If no campaign, or more than one campaign, has the given name.
    """
    # Only the id column is needed; each result row is a 1-tuple.
    selected = self.session.query(CampaignTable.id).filter(
        CampaignTable.name == name).all()
    if not selected:
        msg = f"No campaign with name {name} found in campaign database"
        logger.error(msg)
        raise RuntimeError(msg)
    if len(selected) > 1:
        msg = (
            f"More than one campaign with name {name} found in "
            f"campaign database. Database state is compromised."
        )
        logger.error(msg)
        raise RuntimeError(msg)
    return selected[0][0]
Return the (database) id corresponding to the campaign with name 'name'.
Parameters
- name (str): Name of the campaign.
Returns
- int: The id of the campaign with the specified name
def get_sampler_id(self, campaign_id):
    """Look up which sampler is currently set for a campaign.

    Parameters
    ----------
    campaign_id: int
        ID of the campaign.

    Returns
    -------
    int
        The value of the `sampler` field of the campaign row with the
        given id.
    """
    campaign_row = self.session.get(CampaignTable, campaign_id)
    return campaign_row.sampler
Return the (database) id corresponding to the sampler currently set for the campaign with id 'campaign_id'
Parameters
- campaign_id (int): ID of the campaign.
Returns
- int: The id of the sampler set for the specified campaign
def set_sampler(self, campaign_id, sampler_id):
    """Record `sampler_id` as the sampler used by the given campaign.

    Parameters
    ----------
    campaign_id: int
        ID of the campaign.
    sampler_id: int
        ID of the sampler.
    """
    campaign_row = self.session.get(CampaignTable, campaign_id)
    campaign_row.sampler = sampler_id
    self.session.commit()
Set specified campaign to be using specified sampler
Parameters
- campaign_id (int): ID of the campaign.
- sampler_id (int): ID of the sampler.
def campaign_dir(self, campaign_name=None):
    """Return the campaign directory recorded for `campaign_name`.

    Parameters
    ----------
    campaign_name: str
        Name of campaign to select

    Returns
    -------
    str
        Path to campaign directory.
    """
    campaign_info = self._get_campaign_info(campaign_name=campaign_name)
    return campaign_info.campaign_dir
Get campaign directory for campaign_name.
Parameters
- campaign_name (str): Name of campaign to select
Returns
- str: Path to campaign directory.
def run(self, name, campaign=None, sampler=None, status=None, not_status=None, app_id=None):
    """Fetch the information stored for a single run.

    Parameters
    ----------
    name: str
        Name of run to filter for.
    campaign: int or None
        Campaign id to filter for.
    sampler: int or None
        Sampler id to filter for.
    status: enum(Status) or None
        Status to filter for.
    not_status: enum(Status) or None
        Exclude runs with this status
    app_id: int or None
        App id to filter for.

    Returns
    -------
    dict
        The first matching run, converted with `_run_to_dict`.
    """
    filters = {
        'name': name,
        'campaign': campaign,
        'sampler': sampler,
        'status': status,
        'not_status': not_status,
        'app_id': app_id,
    }
    matches = self._select_runs(**filters)
    # The warning fires for any match count other than exactly one.
    if matches.count() != 1:
        logging.warning('Multiple runs selected - using the first')
    return self._run_to_dict(matches.first())
Get the information for a specified run.
Parameters
- name (str): Name of run to filter for.
- campaign (int or None): Campaign id to filter for.
- sampler (int or None): Sampler id to filter for.
- status (enum(Status) or None): Status string to filter for.
- not_status (enum(Status) or None): Exclude runs with this status string
- app_id (int or None): App id to filter for.
Returns
- dict: Containing run information (run_name, params, status, sample, campaign, app)
def runs(self, campaign=None, sampler=None, status=None, not_status=None, app_id=None):
    """Generate the information of every run matching the given filters.

    Parameters
    ----------
    campaign: int or None
        Campaign id to filter for.
    sampler: int or None
        Sampler id to filter for.
    status: enum(Status) or None
        Status to filter for.
    not_status: enum(Status) or None
        Exclude runs with this status
    app_id: int or None
        App id to filter for.

    Yields
    ------
    tuple
        `(run id, dict of run information fields)` for each selected run,
        one at a time.
    """
    filters = {
        'campaign': campaign,
        'sampler': sampler,
        'status': status,
        'not_status': not_status,
        'app_id': app_id,
    }
    yield from (
        (row.id, self._run_to_dict(row)) for row in self._select_runs(**filters)
    )
A generator to return all run information for selected campaign and sampler.
Parameters
- campaign (int or None): Campaign id to filter for.
- sampler (int or None): Sampler id to filter for.
- status (enum(Status) or None): Status string to filter for.
- not_status (enum(Status) or None): Exclude runs with this status string
- app_id (int or None): App id to filter for.
Yields
- tuple: The run id together with a dict of run information fields for each selected run, one at a time.
def run_ids(self, campaign=None, sampler=None, status=None, not_status=None, app_id=None):
    """Generate the run names of every run matching the given filters.

    Parameters
    ----------
    campaign: int or None
        Campaign id to filter for.
    sampler: int or None
        Sampler id to filter for.
    status: enum(Status) or None
        Status to filter for.
    not_status: enum(Status) or None
        Exclude runs with this status
    app_id: int or None
        App id to filter for.

    Yields
    ------
    str
        The `run_name` of each selected run, one at a time.
    """
    filters = {
        'campaign': campaign,
        'sampler': sampler,
        'status': status,
        'not_status': not_status,
        'app_id': app_id,
    }
    yield from (row.run_name for row in self._select_runs(**filters))
A generator to return all run IDs for selected campaign and sampler.
Parameters
- campaign (int or None): Campaign id to filter for.
- sampler (int or None): Sampler id to filter for.
- status (enum(Status) or None): Status string to filter for.
- not_status (enum(Status) or None): Exclude runs with this status string
- app_id (int or None): App id to filter for.
Yields
- str: run name (`run_name`) of each selected run, one at a time.
def get_num_runs(self, campaign=None, sampler=None, status=None, not_status=None):
    """Count the runs that match the filtering criteria.

    Parameters
    ----------
    campaign: int or None
        Campaign id to filter for.
    sampler: int or None
        Sampler id to filter for.
    status: enum(Status) or None
        Status to filter for.
    not_status: enum(Status) or None
        Exclude runs with this status

    Returns
    -------
    int
        The number of runs in the database matching the filtering criteria

    """
    filters = {
        'campaign': campaign,
        'sampler': sampler,
        'status': status,
        'not_status': not_status,
    }
    return self._select_runs(**filters).count()
Returns the number of runs matching the filtering criteria.
Parameters
- campaign (int or None): Campaign id to filter for.
- sampler (int or None): Sampler id to filter for.
- status (enum(Status) or None): Status string to filter for.
- not_status (enum(Status) or None): Exclude runs with this status string
Returns
- int: The number of runs in the database matching the filtering criteria
def runs_dir(self, campaign_name=None):
    """Return the runs directory recorded for `campaign_name`.

    Parameters
    ----------
    campaign_name: str
        Name of the selected campaign.

    Returns
    -------
    str
        Path containing run outputs.
    """
    campaign_info = self._get_campaign_info(campaign_name=campaign_name)
    return campaign_info.runs_dir
Get the directory used to store run information for campaign_name.
Parameters
- campaign_name (str): Name of the selected campaign.
Returns
- str: Path containing run outputs.
def store_result(self, run_id, result, change_status=True):
    """Stores results of a simulation inside the RunTable given a run id.

    Parameters
    ----------
    run_id: int
        The id of a run to store the results in. This will be the run with which these
        results are associated with. Namely the run that has the inputs used to generate
        these results.
    result: dict
        Results in dictionary form. This is the same format as used by the `Decoder`.
        Must contain the keys 'result', 'run_info' and 'rundir'. The 'result' and
        'run_info' entries are removed from the passed dictionary.
    change_status: bool
        If set to False will not update the runs' status to COLLATED. This is sometimes
        useful in scenarios where you want several apps to work on the same runs.

    Notes
    -----
    The session is only committed on every COMMIT_RATE-th call, so the stored
    values may remain uncommitted when this method returns.
    """
    self.commit_counter += 1

    def convert_nonserializable(obj):
        # json cannot encode NumPy scalars or arrays; map them to builtins.
        if isinstance(obj, np.integer):
            return int(obj)
        if isinstance(obj, np.floating):
            return float(obj)
        if isinstance(obj, np.bool_):
            return bool(obj)
        if isinstance(obj, np.ndarray):
            return obj.tolist()
        raise TypeError(f'Unknown type: {type(obj)}')

    # The in-place removal of these two keys is kept for callers that
    # may rely on it.
    result_ = result.pop('result')
    result.pop('run_info')

    new_values = {
        'result': json.dumps(result_, default=convert_nonserializable),
        'run_dir': result['rundir'],
    }
    if change_status:
        new_values['status'] = constants.Status.COLLATED
    self.session.query(RunTable).filter(RunTable.id == run_id).update(new_values)

    if self.commit_counter % COMMIT_RATE == 0:
        self.session.commit()
Stores results of a simulation inside the RunTable given a run id.
Parameters
- run_id (int): The id of a run to store the results in. This will be the run with which these results are associated with. Namely the run that has the inputs used to generate these results.
- result (dict): Results in dictionary form. This is the same format as used by the `Decoder`.
- change_status (bool): If set to False will not update the runs' status to COLLATED. This is sometimes useful in scenarios where you want several apps to work on the same runs.
def store_results(self, app_name, results):
    """Stores the results of several runs of one app in the database.

    Every listed run that belongs to the app gets its result stored as JSON
    and its status set to COLLATED.

    Parameters
    ----------
    app_name: str
        name of the app the runs belong to
    results: iterable of (run_id, result) pairs
        `run_id` is the id of the run, `result` a JSON-serializable
        dictionary with the results (from the decoder)

    Raises
    ------
    RuntimeError
        If no app with the given name exists.
    """
    app_entry = self.session.query(AppTable).filter(AppTable.name == app_name).first()
    if app_entry is None:
        raise RuntimeError("app with the name {} not found".format(app_name))
    app_id = app_entry.id

    for stored, (run_id, result) in enumerate(results, start=1):
        self.session.query(RunTable).\
            filter(RunTable.id == run_id, RunTable.app == app_id).\
            update({'result': json.dumps(result), 'status': constants.Status.COLLATED})
        # Commit in batches of COMMIT_RATE rather than once per run.
        if stored % COMMIT_RATE == 0:
            self.session.commit()
    self.session.commit()
Stores the results from a given run in the database.
Parameters
- app_name (str): name of the app the runs belong to
- results (dict): dictionary with the results (from the decoder)
def get_results(self, app_name, sampler_id, status=constants.Status.COLLATED, iteration=-1):
    """Returns the results as a pandas DataFrame.

    Parameters
    ----------
    app_name: str
        Name of the app to return data for.
    sampler_id: int
        ID of the sampler.
    status: STATUS
        Run status to filter for.
    iteration: int
        If a non-negative integer will return the results for a given iteration only.

    Returns
    -------
    DataFrame
        Built from the run id, iteration, parameters and decoder output of each
        selected run. Columns are keyed by `(name, index)`; scalar values use
        index 0 and list values get one column per element.

    Raises
    ------
    RuntimeError
        If the app does not exist, or if the collected columns cannot be
        assembled into a `DataFrame`.
    """
    app_entry = self.session.query(AppTable).filter(AppTable.name == app_name).first()
    if app_entry is None:
        raise RuntimeError("app with the name {} not found".format(app_name))

    query = self.session.query(RunTable).\
        filter(RunTable.app == app_entry.id).\
        filter(RunTable.sampler == sampler_id).\
        filter(RunTable.status == status)
    # if only a specific iteration is requested filter it out
    if iteration >= 0:
        query = query.filter(RunTable.iteration == iteration)

    pd_result = {}
    for row in query:
        # Later entries win on key clashes: results override params, which
        # override the run_id/iteration bookkeeping fields.
        pd_dict = {
            'run_id': row.id,
            'iteration': row.iteration,
            **json.loads(row.params),
            **json.loads(row.result),
        }
        for key, value in pd_dict.items():
            if isinstance(value, list):
                for i, elt in enumerate(value):
                    pd_result.setdefault((key, i), []).append(elt)
            else:
                pd_result.setdefault((key, 0), []).append(value)

    try:
        return pd.DataFrame(pd_result)
    except ValueError as err:
        raise RuntimeError(
            'the results received from the database seem to be malformed - '
            'commonly because a vector quantity of interest changes dimensionality'
        ) from err
Returns the results as a pandas DataFrame.
Parameters
- app_name (str): Name of the app to return data for.
- sampler_id (int): ID of the sampler.
- status (STATUS): Run status to filter for.
- iteration (int): If a positive integer will return the results for a given iteration only.
Returns
- DataFrame: Will construct a `DataFrame` from the decoder output dictionaries.
def relocate(self, new_path, campaign_name):
    """Point the stored campaign at a new campaign path.

    The campaign's `campaign_dir` becomes `new_path`, and its `runs_dir`
    becomes the last component of the old `runs_dir` joined onto `new_path`.

    Parameters
    ----------
    new_path: str
        new campaign directory
    campaign_name: str
        name of the campaign
    """
    campaign_id = self.get_campaign_id(campaign_name)
    campaign_rows = self.session.query(CampaignTable).\
        filter(CampaignTable.id == campaign_id)

    # Only the final path component of the old runs directory is kept.
    runs_subdir = os.path.basename(campaign_rows.first().runs_dir)
    relocated = {
        'campaign_dir': str(new_path),
        'runs_dir': str(os.path.join(new_path, runs_subdir)),
    }
    campaign_rows.update(relocated)
    self.session.commit()
Update the campaign directory and runs directory stored for the campaign to the new campaign path.
Parameters
- new_path (str): new runs directory
- campaign_name (str): name of the campaign
def dump(self):
    """Dump the database as JSON for debugging purposes.

    Returns
    -------
    str
        A JSON document mapping each table name to a list of its rows
        (one column-name -> value dict per row). If reading a table fails,
        its entry is the error message instead.
    """
    meta = MetaData()
    meta.reflect(bind=self.engine)
    result = {}

    with self.engine.connect() as connection:
        for table in meta.sorted_tables:
            try:
                rows = connection.execute(table.select()).fetchall()
                # Result rows are tuple-like; the column-name view needed to
                # build a dict is exposed through ``_mapping``.
                result[table.name] = [dict(row._mapping) for row in rows]
            except Exception as e:
                # Best-effort debugging aid: record the failure for this
                # table and carry on with the others.
                result[table.name] = str(e)

    return json.dumps(result)
Dump the database as JSON for debugging purposes.
Returns
- str: A database dump in JSON format.