easyvvuq.actions.action_statuses

Implements ActionPool - a thin wrapper around the Python Executor interface that is meant to simplify the execution of actions and retrieval of results. This object is instantiated by the Campaign. The user would never instantiate it manually. The user does interact with it to track the progress of execution.

  1"""Implements ActionPool - a thin wrapper around the Python Executor interface
  2that is meant to simplify the execution of actions and retrieval of results.
  3This object is instantiated by the Campaign. The user would never instantiate it
  4manually. The user does interact with it to track the progress of execution.
  5"""
  6import concurrent
  7from concurrent.futures import ThreadPoolExecutor
  8from dask.distributed import Client
  9from tqdm import tqdm
 10import copy
 11
 12from . import QCGPJPool
 13
 14__copyright__ = """
 15
 16    Copyright 2020 Vytautas Jancauskas
 17
 18    This file is part of EasyVVUQ
 19
 20    EasyVVUQ is free software: you can redistribute it and/or modify
 21    it under the terms of the Lesser GNU General Public License as published by
 22    the Free Software Foundation, either version 3 of the License, or
 23    (at your option) any later version.
 24
 25    EasyVVUQ is distributed in the hope that it will be useful,
 26    but WITHOUT ANY WARRANTY; without even the implied warranty of
 27    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 28    Lesser GNU General Public License for more details.
 29
 30    You should have received a copy of the Lesser GNU General Public License
 31    along with this program.  If not, see <https://www.gnu.org/licenses/>.
 32
 33"""
 34__license__ = "LGPL"
 35
 36
 37class ActionPool:
 38    """A class that handles the execution of Actions.
 39
 40    Parameters
 41    ----------
 42    campaign: Campaign
 43        An instance of an EasyVVUQ campaign.
 44    actions: Actions
 45        An instance of `Actions` containing things to be done as part of the simulation.
 46    inits: iterable
 47        Initial inputs to be passed to each `Actions` representing a sample. Will usually contain
 48        dictionaries with the following information: {'run_id': ..., 'campaign_dir': ...,
 49        'run_info': ...}.
 50    sequential: bool
 51        Will run the actions sequentially.
 52    """
 53
 54    def __init__(self, campaign, actions, inits, sequential=False):
 55        self.campaign = campaign
 56        self.actions = actions
 57        self.inits = inits
 58        self.sequential = sequential
 59        self.futures = []
 60        self.results = []
 61        self._collate_callback = lambda previous: previous
 62
 63    def start(self, pool=None):
 64        """Start the actions.
 65
 66        Parameters
 67        ----------
 68        pool: An Executor instance (e.g. ThreadPoolExecutor)
 69
 70        Returns
 71        -------
 72        ActionPool
 73            Starts execution and returns a reference to itself for tracking progress
 74            and for collation.
 75        """
 76        if pool is None:
 77            pool = ThreadPoolExecutor()
 78        self.pool = pool
 79        for previous in self.inits:
 80            previous = copy.copy(previous)
 81            if self.sequential:
 82                result = self.actions.start(previous)
 83                self.results.append(result)
 84            else:
 85                future = self.pool.submit(self.actions.start, previous)
 86                self.futures.append(future)
 87        return self
 88
 89    def progress(self):
 90        """Some basic stats about the action statuses status.
 91
 92        Returns
 93        -------
 94        dict
 95            A dictionary with four keys - 'ready', 'active' and 'finished', 'failed'.
 96            Values under "ready" correspond to `Actions` waiting for execution, "active"
 97            corresponds to the number of currently running tasks.
 98        """
 99        ready = 0
100        running = 0
101        done = 0
102        failed = 0
103        for future in self.futures:
104            if future.running():
105                running += 1
106            elif future.done():
107                if not future.result():
108                    failed += 1
109                else:
110                    done += 1
111            else:
112                ready += 1
113        return {'ready': ready, 'active': running, 'finished': done, 'failed': failed}
114
115    def add_collate_callback(self, fn):
116        """Adds a callback to be called after collation is done.
117
118        Parameters
119        ----------
120        fn - A callable that takes previous as it's only input.
121        """
122        self._collate_callback = fn
123
124    def collate(self, progress_bar=False):
125        """A command that will block until all Futures in the pool have finished.
126        It will also store the results gather from `Actions` in the database.
127
128        Parameters
129        ----------
130        progress_bar: bool
131           Whether to show progress bar
132        """
133        if not progress_bar:
134            def tqdm_(x, total=None): return x
135        else:
136            tqdm_ = tqdm
137        if isinstance(self.pool, Client):
138            self.results = self.pool.gather(self.futures)
139        if self.sequential or isinstance(self.pool, Client):
140            for result in tqdm_(self.results, total=len(self.results)):
141                result = self._collate_callback(result)
142                self.campaign.campaign_db.store_result(
143                    result['run_id'], result, change_status=result['collated'])
144        else:
145            if isinstance(self.pool, QCGPJPool):
146                as_completed_fn = self.pool.as_completed
147                self.add_collate_callback(self.pool.convert_results)
148            else:
149                as_completed_fn = concurrent.futures.as_completed
150
151            for future in tqdm_(as_completed_fn(self.futures), total=len(self.futures)):
152                result = self._collate_callback(future.result())
153                self.campaign.campaign_db.store_result(
154                    result['run_id'], result, change_status=result['collated'])
155        self.campaign.campaign_db.session.commit()
class ActionPool:
 38class ActionPool:
 39    """A class that handles the execution of Actions.
 40
 41    Parameters
 42    ----------
 43    campaign: Campaign
 44        An instance of an EasyVVUQ campaign.
 45    actions: Actions
 46        An instance of `Actions` containing things to be done as part of the simulation.
 47    inits: iterable
 48        Initial inputs to be passed to each `Actions` representing a sample. Will usually contain
 49        dictionaries with the following information: {'run_id': ..., 'campaign_dir': ...,
 50        'run_info': ...}.
 51    sequential: bool
 52        Will run the actions sequentially.
 53    """
 54
 55    def __init__(self, campaign, actions, inits, sequential=False):
 56        self.campaign = campaign
 57        self.actions = actions
 58        self.inits = inits
 59        self.sequential = sequential
 60        self.futures = []
 61        self.results = []
 62        self._collate_callback = lambda previous: previous
 63
 64    def start(self, pool=None):
 65        """Start the actions.
 66
 67        Parameters
 68        ----------
 69        pool: An Executor instance (e.g. ThreadPoolExecutor)
 70
 71        Returns
 72        -------
 73        ActionPool
 74            Starts execution and returns a reference to itself for tracking progress
 75            and for collation.
 76        """
 77        if pool is None:
 78            pool = ThreadPoolExecutor()
 79        self.pool = pool
 80        for previous in self.inits:
 81            previous = copy.copy(previous)
 82            if self.sequential:
 83                result = self.actions.start(previous)
 84                self.results.append(result)
 85            else:
 86                future = self.pool.submit(self.actions.start, previous)
 87                self.futures.append(future)
 88        return self
 89
 90    def progress(self):
 91        """Some basic stats about the action statuses status.
 92
 93        Returns
 94        -------
 95        dict
 96            A dictionary with four keys - 'ready', 'active' and 'finished', 'failed'.
 97            Values under "ready" correspond to `Actions` waiting for execution, "active"
 98            corresponds to the number of currently running tasks.
 99        """
100        ready = 0
101        running = 0
102        done = 0
103        failed = 0
104        for future in self.futures:
105            if future.running():
106                running += 1
107            elif future.done():
108                if not future.result():
109                    failed += 1
110                else:
111                    done += 1
112            else:
113                ready += 1
114        return {'ready': ready, 'active': running, 'finished': done, 'failed': failed}
115
116    def add_collate_callback(self, fn):
117        """Adds a callback to be called after collation is done.
118
119        Parameters
120        ----------
121        fn - A callable that takes previous as it's only input.
122        """
123        self._collate_callback = fn
124
125    def collate(self, progress_bar=False):
126        """A command that will block until all Futures in the pool have finished.
127        It will also store the results gather from `Actions` in the database.
128
129        Parameters
130        ----------
131        progress_bar: bool
132           Whether to show progress bar
133        """
134        if not progress_bar:
135            def tqdm_(x, total=None): return x
136        else:
137            tqdm_ = tqdm
138        if isinstance(self.pool, Client):
139            self.results = self.pool.gather(self.futures)
140        if self.sequential or isinstance(self.pool, Client):
141            for result in tqdm_(self.results, total=len(self.results)):
142                result = self._collate_callback(result)
143                self.campaign.campaign_db.store_result(
144                    result['run_id'], result, change_status=result['collated'])
145        else:
146            if isinstance(self.pool, QCGPJPool):
147                as_completed_fn = self.pool.as_completed
148                self.add_collate_callback(self.pool.convert_results)
149            else:
150                as_completed_fn = concurrent.futures.as_completed
151
152            for future in tqdm_(as_completed_fn(self.futures), total=len(self.futures)):
153                result = self._collate_callback(future.result())
154                self.campaign.campaign_db.store_result(
155                    result['run_id'], result, change_status=result['collated'])
156        self.campaign.campaign_db.session.commit()

A class that handles the execution of Actions.

Parameters
  • campaign (Campaign): An instance of an EasyVVUQ campaign.
  • actions (Actions): An instance of Actions containing things to be done as part of the simulation.
  • inits (iterable): Initial inputs to be passed to each Actions representing a sample. Will usually contain dictionaries with the following information: {'run_id': ..., 'campaign_dir': ..., 'run_info': ...}.
  • sequential (bool): Will run the actions sequentially.
ActionPool(campaign, actions, inits, sequential=False)
55    def __init__(self, campaign, actions, inits, sequential=False):
56        self.campaign = campaign
57        self.actions = actions
58        self.inits = inits
59        self.sequential = sequential
60        self.futures = []
61        self.results = []
62        self._collate_callback = lambda previous: previous
campaign
actions
inits
sequential
futures
results
def start(self, pool=None):
64    def start(self, pool=None):
65        """Start the actions.
66
67        Parameters
68        ----------
69        pool: An Executor instance (e.g. ThreadPoolExecutor)
70
71        Returns
72        -------
73        ActionPool
74            Starts execution and returns a reference to itself for tracking progress
75            and for collation.
76        """
77        if pool is None:
78            pool = ThreadPoolExecutor()
79        self.pool = pool
80        for previous in self.inits:
81            previous = copy.copy(previous)
82            if self.sequential:
83                result = self.actions.start(previous)
84                self.results.append(result)
85            else:
86                future = self.pool.submit(self.actions.start, previous)
87                self.futures.append(future)
88        return self

Start the actions.

Parameters
  • pool (Executor): An Executor instance (e.g. ThreadPoolExecutor). A new ThreadPoolExecutor is created when omitted.
Returns
  • ActionPool: Starts execution and returns a reference to itself for tracking progress and for collation.
def progress(self):
 90    def progress(self):
 91        """Some basic stats about the action statuses status.
 92
 93        Returns
 94        -------
 95        dict
 96            A dictionary with four keys - 'ready', 'active' and 'finished', 'failed'.
 97            Values under "ready" correspond to `Actions` waiting for execution, "active"
 98            corresponds to the number of currently running tasks.
 99        """
100        ready = 0
101        running = 0
102        done = 0
103        failed = 0
104        for future in self.futures:
105            if future.running():
106                running += 1
107            elif future.done():
108                if not future.result():
109                    failed += 1
110                else:
111                    done += 1
112            else:
113                ready += 1
114        return {'ready': ready, 'active': running, 'finished': done, 'failed': failed}

Some basic statistics about the status of the submitted actions.

Returns
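  • dict: A dictionary with four keys - 'ready', 'active', 'finished' and 'failed'. "ready" is the number of Actions waiting for execution, "active" is the number of currently running tasks, "finished" is the number of completed tasks that returned a truthy result and "failed" is the number of completed tasks that returned a falsy result.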
def add_collate_callback(self, fn):
116    def add_collate_callback(self, fn):
117        """Adds a callback to be called after collation is done.
118
119        Parameters
120        ----------
121        fn - A callable that takes previous as it's only input.
122        """
123        self._collate_callback = fn

Sets the callback that is applied to each result during collation, before the result is stored in the database. Any previously set callback is replaced.

Parameters
  • fn - A callable that takes previous as its only input.
def collate(self, progress_bar=False):
125    def collate(self, progress_bar=False):
126        """A command that will block until all Futures in the pool have finished.
127        It will also store the results gather from `Actions` in the database.
128
129        Parameters
130        ----------
131        progress_bar: bool
132           Whether to show progress bar
133        """
134        if not progress_bar:
135            def tqdm_(x, total=None): return x
136        else:
137            tqdm_ = tqdm
138        if isinstance(self.pool, Client):
139            self.results = self.pool.gather(self.futures)
140        if self.sequential or isinstance(self.pool, Client):
141            for result in tqdm_(self.results, total=len(self.results)):
142                result = self._collate_callback(result)
143                self.campaign.campaign_db.store_result(
144                    result['run_id'], result, change_status=result['collated'])
145        else:
146            if isinstance(self.pool, QCGPJPool):
147                as_completed_fn = self.pool.as_completed
148                self.add_collate_callback(self.pool.convert_results)
149            else:
150                as_completed_fn = concurrent.futures.as_completed
151
152            for future in tqdm_(as_completed_fn(self.futures), total=len(self.futures)):
153                result = self._collate_callback(future.result())
154                self.campaign.campaign_db.store_result(
155                    result['run_id'], result, change_status=result['collated'])
156        self.campaign.campaign_db.session.commit()

A command that will block until all Futures in the pool have finished. It will also store the results gathered from Actions in the database.

Parameters
  • progress_bar (bool): Whether to show progress bar