easyvvuq.actions.action_statuses
Implements ActionPool - a thin wrapper around the Python Executor interface that is meant to simplify the execution of actions and retrieval of results. This object is instantiated by the Campaign. The user would never instantiate it manually. The user does interact with it to track the progress of execution.
1"""Implements ActionPool - a thin wrapper around the Python Executor interface 2that is meant to simplify the execution of actions and retrieval of results. 3This object is instantiated by the Campaign. The user would never instantiate it 4manually. The user does interact with it to track the progress of execution. 5""" 6import concurrent 7from concurrent.futures import ThreadPoolExecutor 8from dask.distributed import Client 9from tqdm import tqdm 10import copy 11 12from . import QCGPJPool 13 14__copyright__ = """ 15 16 Copyright 2020 Vytautas Jancauskas 17 18 This file is part of EasyVVUQ 19 20 EasyVVUQ is free software: you can redistribute it and/or modify 21 it under the terms of the Lesser GNU General Public License as published by 22 the Free Software Foundation, either version 3 of the License, or 23 (at your option) any later version. 24 25 EasyVVUQ is distributed in the hope that it will be useful, 26 but WITHOUT ANY WARRANTY; without even the implied warranty of 27 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the 28 Lesser GNU General Public License for more details. 29 30 You should have received a copy of the Lesser GNU General Public License 31 along with this program. If not, see <https://www.gnu.org/licenses/>. 32 33""" 34__license__ = "LGPL" 35 36 37class ActionPool: 38 """A class that handles the execution of Actions. 39 40 Parameters 41 ---------- 42 campaign: Campaign 43 An instance of an EasyVVUQ campaign. 44 actions: Actions 45 An instance of `Actions` containing things to be done as part of the simulation. 46 inits: iterable 47 Initial inputs to be passed to each `Actions` representing a sample. Will usually contain 48 dictionaries with the following information: {'run_id': ..., 'campaign_dir': ..., 49 'run_info': ...}. 50 sequential: bool 51 Will run the actions sequentially. 52 """ 53 54 def __init__(self, campaign, actions, inits, sequential=False): 55 self.campaign = campaign 56 self.actions = actions 57 self.inits = inits 58 self.sequential = sequential 59 self.futures = [] 60 self.results = [] 61 self._collate_callback = lambda previous: previous 62 63 def start(self, pool=None): 64 """Start the actions. 65 66 Parameters 67 ---------- 68 pool: An Executor instance (e.g. ThreadPoolExecutor) 69 70 Returns 71 ------- 72 ActionPool 73 Starts execution and returns a reference to itself for tracking progress 74 and for collation. 75 """ 76 if pool is None: 77 pool = ThreadPoolExecutor() 78 self.pool = pool 79 for previous in self.inits: 80 previous = copy.copy(previous) 81 if self.sequential: 82 result = self.actions.start(previous) 83 self.results.append(result) 84 else: 85 future = self.pool.submit(self.actions.start, previous) 86 self.futures.append(future) 87 return self 88 89 def progress(self): 90 """Some basic stats about the action statuses status. 91 92 Returns 93 ------- 94 dict 95 A dictionary with four keys - 'ready', 'active' and 'finished', 'failed'. 96 Values under "ready" correspond to `Actions` waiting for execution, "active" 97 corresponds to the number of currently running tasks. 98 """ 99 ready = 0 100 running = 0 101 done = 0 102 failed = 0 103 for future in self.futures: 104 if future.running(): 105 running += 1 106 elif future.done(): 107 if not future.result(): 108 failed += 1 109 else: 110 done += 1 111 else: 112 ready += 1 113 return {'ready': ready, 'active': running, 'finished': done, 'failed': failed} 114 115 def add_collate_callback(self, fn): 116 """Adds a callback to be called after collation is done. 117 118 Parameters 119 ---------- 120 fn - A callable that takes previous as it's only input. 121 """ 122 self._collate_callback = fn 123 124 def collate(self, progress_bar=False): 125 """A command that will block until all Futures in the pool have finished. 126 It will also store the results gather from `Actions` in the database. 127 128 Parameters 129 ---------- 130 progress_bar: bool 131 Whether to show progress bar 132 """ 133 if not progress_bar: 134 def tqdm_(x, total=None): return x 135 else: 136 tqdm_ = tqdm 137 if isinstance(self.pool, Client): 138 self.results = self.pool.gather(self.futures) 139 if self.sequential or isinstance(self.pool, Client): 140 for result in tqdm_(self.results, total=len(self.results)): 141 result = self._collate_callback(result) 142 self.campaign.campaign_db.store_result( 143 result['run_id'], result, change_status=result['collated']) 144 else: 145 if isinstance(self.pool, QCGPJPool): 146 as_completed_fn = self.pool.as_completed 147 self.add_collate_callback(self.pool.convert_results) 148 else: 149 as_completed_fn = concurrent.futures.as_completed 150 151 for future in tqdm_(as_completed_fn(self.futures), total=len(self.futures)): 152 result = self._collate_callback(future.result()) 153 self.campaign.campaign_db.store_result( 154 result['run_id'], result, change_status=result['collated']) 155 self.campaign.campaign_db.session.commit()
class
ActionPool:
38class ActionPool: 39 """A class that handles the execution of Actions. 40 41 Parameters 42 ---------- 43 campaign: Campaign 44 An instance of an EasyVVUQ campaign. 45 actions: Actions 46 An instance of `Actions` containing things to be done as part of the simulation. 47 inits: iterable 48 Initial inputs to be passed to each `Actions` representing a sample. Will usually contain 49 dictionaries with the following information: {'run_id': ..., 'campaign_dir': ..., 50 'run_info': ...}. 51 sequential: bool 52 Will run the actions sequentially. 53 """ 54 55 def __init__(self, campaign, actions, inits, sequential=False): 56 self.campaign = campaign 57 self.actions = actions 58 self.inits = inits 59 self.sequential = sequential 60 self.futures = [] 61 self.results = [] 62 self._collate_callback = lambda previous: previous 63 64 def start(self, pool=None): 65 """Start the actions. 66 67 Parameters 68 ---------- 69 pool: An Executor instance (e.g. ThreadPoolExecutor) 70 71 Returns 72 ------- 73 ActionPool 74 Starts execution and returns a reference to itself for tracking progress 75 and for collation. 76 """ 77 if pool is None: 78 pool = ThreadPoolExecutor() 79 self.pool = pool 80 for previous in self.inits: 81 previous = copy.copy(previous) 82 if self.sequential: 83 result = self.actions.start(previous) 84 self.results.append(result) 85 else: 86 future = self.pool.submit(self.actions.start, previous) 87 self.futures.append(future) 88 return self 89 90 def progress(self): 91 """Some basic stats about the action statuses status. 92 93 Returns 94 ------- 95 dict 96 A dictionary with four keys - 'ready', 'active' and 'finished', 'failed'. 97 Values under "ready" correspond to `Actions` waiting for execution, "active" 98 corresponds to the number of currently running tasks. 99 """ 100 ready = 0 101 running = 0 102 done = 0 103 failed = 0 104 for future in self.futures: 105 if future.running(): 106 running += 1 107 elif future.done(): 108 if not future.result(): 109 failed += 1 110 else: 111 done += 1 112 else: 113 ready += 1 114 return {'ready': ready, 'active': running, 'finished': done, 'failed': failed} 115 116 def add_collate_callback(self, fn): 117 """Adds a callback to be called after collation is done. 118 119 Parameters 120 ---------- 121 fn - A callable that takes previous as it's only input. 122 """ 123 self._collate_callback = fn 124 125 def collate(self, progress_bar=False): 126 """A command that will block until all Futures in the pool have finished. 127 It will also store the results gather from `Actions` in the database. 128 129 Parameters 130 ---------- 131 progress_bar: bool 132 Whether to show progress bar 133 """ 134 if not progress_bar: 135 def tqdm_(x, total=None): return x 136 else: 137 tqdm_ = tqdm 138 if isinstance(self.pool, Client): 139 self.results = self.pool.gather(self.futures) 140 if self.sequential or isinstance(self.pool, Client): 141 for result in tqdm_(self.results, total=len(self.results)): 142 result = self._collate_callback(result) 143 self.campaign.campaign_db.store_result( 144 result['run_id'], result, change_status=result['collated']) 145 else: 146 if isinstance(self.pool, QCGPJPool): 147 as_completed_fn = self.pool.as_completed 148 self.add_collate_callback(self.pool.convert_results) 149 else: 150 as_completed_fn = concurrent.futures.as_completed 151 152 for future in tqdm_(as_completed_fn(self.futures), total=len(self.futures)): 153 result = self._collate_callback(future.result()) 154 self.campaign.campaign_db.store_result( 155 result['run_id'], result, change_status=result['collated']) 156 self.campaign.campaign_db.session.commit()
A class that handles the execution of Actions.
Parameters
- campaign (Campaign): An instance of an EasyVVUQ campaign.
- actions (Actions):
An instance of
Actionscontaining things to be done as part of the simulation. - inits (iterable):
Initial inputs to be passed to each
Actionsrepresenting a sample. Will usually contain dictionaries with the following information: {'run_id': ..., 'campaign_dir': ..., 'run_info': ...}. - sequential (bool): Will run the actions sequentially.
def
start(self, pool=None):
64 def start(self, pool=None): 65 """Start the actions. 66 67 Parameters 68 ---------- 69 pool: An Executor instance (e.g. ThreadPoolExecutor) 70 71 Returns 72 ------- 73 ActionPool 74 Starts execution and returns a reference to itself for tracking progress 75 and for collation. 76 """ 77 if pool is None: 78 pool = ThreadPoolExecutor() 79 self.pool = pool 80 for previous in self.inits: 81 previous = copy.copy(previous) 82 if self.sequential: 83 result = self.actions.start(previous) 84 self.results.append(result) 85 else: 86 future = self.pool.submit(self.actions.start, previous) 87 self.futures.append(future) 88 return self
Start the actions.
Parameters
- pool (An Executor instance (e.g. ThreadPoolExecutor)):
Returns
- ActionPool: Starts execution and returns a reference to itself for tracking progress and for collation.
def
progress(self):
90 def progress(self): 91 """Some basic stats about the action statuses status. 92 93 Returns 94 ------- 95 dict 96 A dictionary with four keys - 'ready', 'active' and 'finished', 'failed'. 97 Values under "ready" correspond to `Actions` waiting for execution, "active" 98 corresponds to the number of currently running tasks. 99 """ 100 ready = 0 101 running = 0 102 done = 0 103 failed = 0 104 for future in self.futures: 105 if future.running(): 106 running += 1 107 elif future.done(): 108 if not future.result(): 109 failed += 1 110 else: 111 done += 1 112 else: 113 ready += 1 114 return {'ready': ready, 'active': running, 'finished': done, 'failed': failed}
Some basic stats about the action statuses status.
Returns
- dict: A dictionary with four keys - 'ready', 'active' and 'finished', 'failed'.
Values under "ready" correspond to
Actionswaiting for execution, "active" corresponds to the number of currently running tasks.
def
add_collate_callback(self, fn):
116 def add_collate_callback(self, fn): 117 """Adds a callback to be called after collation is done. 118 119 Parameters 120 ---------- 121 fn - A callable that takes previous as it's only input. 122 """ 123 self._collate_callback = fn
Adds a callback to be called after collation is done.
Parameters
- fn - A callable that takes previous as it's only input.
def
collate(self, progress_bar=False):
125 def collate(self, progress_bar=False): 126 """A command that will block until all Futures in the pool have finished. 127 It will also store the results gather from `Actions` in the database. 128 129 Parameters 130 ---------- 131 progress_bar: bool 132 Whether to show progress bar 133 """ 134 if not progress_bar: 135 def tqdm_(x, total=None): return x 136 else: 137 tqdm_ = tqdm 138 if isinstance(self.pool, Client): 139 self.results = self.pool.gather(self.futures) 140 if self.sequential or isinstance(self.pool, Client): 141 for result in tqdm_(self.results, total=len(self.results)): 142 result = self._collate_callback(result) 143 self.campaign.campaign_db.store_result( 144 result['run_id'], result, change_status=result['collated']) 145 else: 146 if isinstance(self.pool, QCGPJPool): 147 as_completed_fn = self.pool.as_completed 148 self.add_collate_callback(self.pool.convert_results) 149 else: 150 as_completed_fn = concurrent.futures.as_completed 151 152 for future in tqdm_(as_completed_fn(self.futures), total=len(self.futures)): 153 result = self._collate_callback(future.result()) 154 self.campaign.campaign_db.store_result( 155 result['run_id'], result, change_status=result['collated']) 156 self.campaign.campaign_db.session.commit()
A command that will block until all Futures in the pool have finished.
It will also store the results gather from Actions in the database.
Parameters
- progress_bar (bool): Whether to show progress bar