easyvvuq.actions.execute_qcgpj

  1import base64
  2import json
  3import logging
  4from os import environ
  5from os import remove
  6
  7import time
  8
  9import dill
 10from typing import Tuple, Dict, Any
 11
 12from concurrent.futures import Executor
 13
 14from qcg.pilotjob.executor_api.qcgpj_executor import QCGPJExecutor
 15from qcg.pilotjob.executor_api.templates.qcgpj_template import QCGPJTemplate
 16
 17
# Module-level logger, named after this module via ``__name__``.
logger = logging.getLogger(__name__)
 19
 20
 21class EasyVVUQBasicTemplate(QCGPJTemplate):
 22    """A basic template class for submission of QCG-PilotJob tasks that run on a single core
 23
 24    The class can be used only for the most simple use-cases. For example it doesn't allow
 25    to specify resource requirements. Thus, for more advanced use-cases, it is recommended to provide custom
 26    implementation of QCGPJTemplate. For complete reference of QCG-PilotJob task's description parameters
 27    please look at https://qcg-pilotjob.readthedocs.io/en/latest/fileinterface.html#submit
 28    """
 29    @staticmethod
 30    def template() -> Tuple[str, Dict[str, Any]]:
 31        template = """
 32            {
 33                'name': '${name}',
 34                'execution': {
 35                    'exec': '${exec}',
 36                    'args': ${args},
 37                    'stdout': '${stdout}',
 38                    'stderr': '${stderr}',
 39                    'venv': '${venv}',
 40                    'modules': ${modules},
 41                    'env': ${env},
 42                    'model': '${model}',
 43                    'model_opts': ${model_opts}
 44                }
 45            }
 46             """
 47
 48        defaults = {
 49            'args': [],
 50            'stdout': 'stdout',
 51            'stderr': 'stderr',
 52            'venv': '',
 53            'modules': [],
 54            'env': {},
 55            'model': 'default',
 56            'model_opts': {}
 57        }
 58
 59        return template, defaults
 60
 61
 62class EasyVVUQParallelTemplate(QCGPJTemplate):
 63    """A template class for submission of QCG-PilotJob tasks that run on exact number cores / nodes
 64
 65    With this class it is possible to define basic resource requirements for tasks.
 66    For advanced use-cases, it is recommended to provide custom implementation of QCGPJTemplate.
 67    For complete reference of QCG-PilotJob task's description parameters
 68    please look at https://qcg-pilotjob.readthedocs.io/en/latest/fileinterface.html#submit
 69    """
 70    @staticmethod
 71    def template() -> Tuple[str, Dict[str, Any]]:
 72        template = """
 73            {
 74                'name': '${name}',
 75                'execution': {
 76                    'exec': '${exec}',
 77                    'args': ${args},
 78                    'stdout': '${stdout}',
 79                    'stderr': '${stderr}',
 80                    'venv': '${venv}',
 81                    'modules': ${modules},
 82                    'env': ${env},
 83                    'model': '${model}',
 84                    'model_opts': ${model_opts}
 85                },
 86                'resources': {
 87                    'numCores': {
 88                        'exact': ${numCores}
 89                    },
 90                    'numNodes': {
 91                        'exact': ${numNodes}
 92                    }
 93                }
 94            }
 95             """
 96
 97        defaults = {
 98            'args': [],
 99            'stdout': 'stdout',
100            'stderr': 'stderr',
101            'venv': '',
102            'modules': [],
103            'env': {},
104            'model': 'default',
105            'model_opts': {},
106            'numCores': 1,
107            'numNodes': 1
108        }
109
110        return template, defaults
111
112
class QCGPJPool(Executor):
    """A Pool that manages execution of actions with QCG-PilotJob.

    Parameters
    ----------
    qcgpj_executor: QCGPJExecutor
        An instance of QCGPJExecutor. If not provided, an instance of QCGPJExecutor
        with default settings will be created
    template: QCGPJTemplate
        An object which contains only a single method `template` that returns a tuple.
        The first element of a tuple should be a string representing a QCG-PilotJob task's description
        with placeholders (identifiers preceded by $ symbol)
        and the second a dictionary that assigns default values for selected placeholders.
        If not provided, a default EasyVVUQBasicTemplate will be used
    template_params: dict
        A dictionary that contains parameters that will be used to substitute placeholders
        defined in the template
    polling_interval: int
        An interval between queries to the QCG-PilotJob Manager service about state of the tasks, in seconds.
    """

    def __init__(
            self,
            qcgpj_executor=None,
            template=None,
            template_params=None,
            polling_interval=1):
        if qcgpj_executor is None:
            qcgpj_executor = QCGPJExecutor()
        if template is None:
            template = EasyVVUQBasicTemplate()

        self._qcgpj_executor = qcgpj_executor
        self._template = template
        self._template_params = template_params
        self._polling_interval = polling_interval
        # Filled in by submit(); convert_results() reads the result files from it.
        self._campaign_dir = None

    def submit(self, fn, *args, **kwargs):
        """Submits a callable to be executed by QCG-PilotJob.

        The object the callable is bound to and the first positional argument are
        serialised with dill, base64-encoded and written to two hidden files in the
        campaign directory. A task running
        ``python3 -m easyvvuq.actions.execute_qcgpj_task <actions_file> <previous_file>``
        is then submitted through the QCGPJExecutor.

        Parameters
        ----------
        fn: bound method
            A bound method; the object it is bound to (``fn.__self__``) must provide
            ``set_wrapper``.
        *args:
            ``args[0]`` must be a dictionary with the keys ``'campaign_dir'`` and
            ``'run_id'``. Further positional arguments are not used.
        **kwargs:
            Accepted for compatibility with ``Executor.submit``; not used.

        Returns
        -------
        QCGPJFuture
            QCGPJFuture representing the given call.
        """
        actions = fn.__self__
        actions.set_wrapper(QCGPJPool._wrapper)

        previous = args[0]
        run_id = previous['run_id']
        self._campaign_dir = previous['campaign_dir']

        pickled_actions = base64.b64encode(dill.dumps(actions)).decode('ascii')
        pickled_previous = base64.b64encode(dill.dumps(previous)).decode('ascii')

        actions_file = f'{self._campaign_dir}/.qcgpj_in_act_{run_id}'
        previous_file = f'{self._campaign_dir}/.qcgpj_in_prev_{run_id}'

        with open(actions_file, 'w') as f:
            f.write(pickled_actions)

        with open(previous_file, 'w') as f:
            f.write(pickled_previous)

        # `exec` is the placeholder name used by the templates, so it has to be
        # passed under that keyword; no local variable shadows the builtin.
        return self._qcgpj_executor.submit(
            self._template.template,
            self._template_params,
            exec='python3',
            name=run_id,
            stdout=f"{self._campaign_dir}/stdout_{run_id}",
            stderr=f"{self._campaign_dir}/stderr_{run_id}",
            args=['-m', 'easyvvuq.actions.execute_qcgpj_task', actions_file, previous_file])

    @property
    def executor(self):
        """Returns current QCGPJExecutor instance.

        It gives you an access to QCG-PilotJob Manager instance, which in turn can be used to
        get information about the QCG-PilotJob execution environment.
        """
        return self._qcgpj_executor

    def convert_results(self, result_qcgpj):
        """Converts results generated by QCG-PilotJob task to EasyVVUQ-compatible form

        The method loads results data from the JSON file
        ``<campaign_dir>/.qcgpj_result_<task name>`` (expected to have been stored
        there by the QCG-PilotJob task), removes that file and returns the loaded data.

        Parameters
        ----------
        result_qcgpj: dict or None
            A dictionary mapping task names to their final statuses. Only the first
            entry is processed. None is treated as a task that delivered no result.

        Returns
        -------
        dict
            A dictionary containing results, or None when `result_qcgpj` is empty.

        Raises
        ------
        RuntimeError
            If `result_qcgpj` is None or the task's status is not ``'SUCCEED'``.
        """
        if result_qcgpj is None:
            # Previously this crashed with an AttributeError on `.items()`.
            raise RuntimeError("QCG-PilotJob task did not return any result")

        for key, value in result_qcgpj.items():
            if value != 'SUCCEED':
                logger.error(f"Task {key} finished with the status: {value}")
                raise RuntimeError(f"QCG-PilotJob task {key} finished with the status: {value}")

            result_file = f'{self._campaign_dir}/.qcgpj_result_{key}'
            with open(result_file, 'r') as f:
                previous = json.load(f)
            remove(result_file)

            # Only the first entry is used.
            return previous

        return None

    def shutdown(self, **kwargs):
        """Clean-up the resources associated with the QCGPJPool.

        Keyword arguments are accepted for compatibility with ``Executor.shutdown``
        but are not forwarded to the underlying QCGPJExecutor.
        """
        return self._qcgpj_executor.shutdown()

    def as_completed(self, futures):
        """Checks for the status of futures and yields those that are finished

        The futures are polled with ``done()``. Finished ones are yielded (in no
        particular order) as soon as a poll detects them. The generator sleeps for
        the polling interval only while some futures are still pending, so it
        returns immediately after the last future has been yielded.

        Parameters
        ----------
        futures: iterable
            Hashable objects providing a ``done()`` method.
        """
        pending = set(futures)

        while pending:
            finished = {f for f in pending if f.done()}
            pending -= finished

            yield from finished

            # Nothing left to wait for: skip the sleep instead of delaying the caller.
            if pending:
                time.sleep(self._polling_interval)

    @staticmethod
    def _wrapper(action, previous):
        """Start the given action with the `previous` state and return its result."""
        # TODO: Implement support for specialised execution models of QCG-PilotJob
        # """For the actions other than ExecuteQCGPJ ensures that the code is invoked only once
        # """
        # if not isinstance(action, ExecuteQCGPJ):
        #     rank = 0
        #     if 'OMPI_COMM_WORLD_RANK' in environ:
        #         rank = environ.get('OMPI_COMM_WORLD_RANK')
        #     elif 'PMI_RANK' in environ:
        #         rank = environ.get('PMI_RANK')
        #
        #     if rank != 0:
        #         return
        return action.start(previous)
276
277
class ExecuteQCGPJ:
    """Wrapper marking an action as configured for parallel execution by QCG-PilotJob.

    Every call is forwarded unchanged to the wrapped action, so at present the
    wrapper has no influence on the processing.

    Parameters
    ----------
    action: Action
        the action to wrap; it must provide `start`, `finished`, `finalise`
        and `succeeded`.
    """

    def __init__(self, action):
        self._action = action

    def start(self, previous=None):
        """Forward to the wrapped action's ``start`` and return its result."""
        wrapped = self._action
        return wrapped.start(previous)

    def finished(self):
        """Forward to the wrapped action's ``finished`` and return its result."""
        wrapped = self._action
        return wrapped.finished()

    def finalise(self):
        """Forward to the wrapped action's ``finalise`` and return its result."""
        wrapped = self._action
        return wrapped.finalise()

    def succeeded(self):
        """Forward to the wrapped action's ``succeeded`` and return its result."""
        wrapped = self._action
        return wrapped.succeeded()
logger = <Logger easyvvuq.actions.execute_qcgpj (DEBUG)>
class EasyVVUQBasicTemplate(qcg.pilotjob.executor_api.templates.qcgpj_template.QCGPJTemplate):
22class EasyVVUQBasicTemplate(QCGPJTemplate):
23    """A basic template class for submission of QCG-PilotJob tasks that run on a single core
24
25    The class can be used only for the most simple use-cases. For example it doesn't allow
26    to specify resource requirements. Thus, for more advanced use-cases, it is recommended to provide custom
27    implementation of QCGPJTemplate. For complete reference of QCG-PilotJob task's description parameters
28    please look at https://qcg-pilotjob.readthedocs.io/en/latest/fileinterface.html#submit
29    """
30    @staticmethod
31    def template() -> Tuple[str, Dict[str, Any]]:
32        template = """
33            {
34                'name': '${name}',
35                'execution': {
36                    'exec': '${exec}',
37                    'args': ${args},
38                    'stdout': '${stdout}',
39                    'stderr': '${stderr}',
40                    'venv': '${venv}',
41                    'modules': ${modules},
42                    'env': ${env},
43                    'model': '${model}',
44                    'model_opts': ${model_opts}
45                }
46            }
47             """
48
49        defaults = {
50            'args': [],
51            'stdout': 'stdout',
52            'stderr': 'stderr',
53            'venv': '',
54            'modules': [],
55            'env': {},
56            'model': 'default',
57            'model_opts': {}
58        }
59
60        return template, defaults

A basic template class for submission of QCG-PilotJob tasks that run on a single core

The class can be used only for the most simple use-cases. For example it doesn't allow to specify resource requirements. Thus, for more advanced use-cases, it is recommended to provide custom implementation of QCGPJTemplate. For complete reference of QCG-PilotJob task's description parameters please look at https://qcg-pilotjob.readthedocs.io/en/latest/fileinterface.html#submit

@staticmethod
def template() -> Tuple[str, Dict[str, Any]]:
30    @staticmethod
31    def template() -> Tuple[str, Dict[str, Any]]:
32        template = """
33            {
34                'name': '${name}',
35                'execution': {
36                    'exec': '${exec}',
37                    'args': ${args},
38                    'stdout': '${stdout}',
39                    'stderr': '${stderr}',
40                    'venv': '${venv}',
41                    'modules': ${modules},
42                    'env': ${env},
43                    'model': '${model}',
44                    'model_opts': ${model_opts}
45                }
46            }
47             """
48
49        defaults = {
50            'args': [],
51            'stdout': 'stdout',
52            'stderr': 'stderr',
53            'venv': '',
54            'modules': [],
55            'env': {},
56            'model': 'default',
57            'model_opts': {}
58        }
59
60        return template, defaults
class EasyVVUQParallelTemplate(qcg.pilotjob.executor_api.templates.qcgpj_template.QCGPJTemplate):
 63class EasyVVUQParallelTemplate(QCGPJTemplate):
 64    """A template class for submission of QCG-PilotJob tasks that run on exact number cores / nodes
 65
 66    With this class it is possible to define basic resource requirements for tasks.
 67    For advanced use-cases, it is recommended to provide custom implementation of QCGPJTemplate.
 68    For complete reference of QCG-PilotJob task's description parameters
 69    please look at https://qcg-pilotjob.readthedocs.io/en/latest/fileinterface.html#submit
 70    """
 71    @staticmethod
 72    def template() -> Tuple[str, Dict[str, Any]]:
 73        template = """
 74            {
 75                'name': '${name}',
 76                'execution': {
 77                    'exec': '${exec}',
 78                    'args': ${args},
 79                    'stdout': '${stdout}',
 80                    'stderr': '${stderr}',
 81                    'venv': '${venv}',
 82                    'modules': ${modules},
 83                    'env': ${env},
 84                    'model': '${model}',
 85                    'model_opts': ${model_opts}
 86                },
 87                'resources': {
 88                    'numCores': {
 89                        'exact': ${numCores}
 90                    },
 91                    'numNodes': {
 92                        'exact': ${numNodes}
 93                    }
 94                }
 95            }
 96             """
 97
 98        defaults = {
 99            'args': [],
100            'stdout': 'stdout',
101            'stderr': 'stderr',
102            'venv': '',
103            'modules': [],
104            'env': {},
105            'model': 'default',
106            'model_opts': {},
107            'numCores': 1,
108            'numNodes': 1
109        }
110
111        return template, defaults

A template class for submission of QCG-PilotJob tasks that run on exact number cores / nodes

With this class it is possible to define basic resource requirements for tasks. For advanced use-cases, it is recommended to provide custom implementation of QCGPJTemplate. For complete reference of QCG-PilotJob task's description parameters please look at https://qcg-pilotjob.readthedocs.io/en/latest/fileinterface.html#submit

@staticmethod
def template() -> Tuple[str, Dict[str, Any]]:
 71    @staticmethod
 72    def template() -> Tuple[str, Dict[str, Any]]:
 73        template = """
 74            {
 75                'name': '${name}',
 76                'execution': {
 77                    'exec': '${exec}',
 78                    'args': ${args},
 79                    'stdout': '${stdout}',
 80                    'stderr': '${stderr}',
 81                    'venv': '${venv}',
 82                    'modules': ${modules},
 83                    'env': ${env},
 84                    'model': '${model}',
 85                    'model_opts': ${model_opts}
 86                },
 87                'resources': {
 88                    'numCores': {
 89                        'exact': ${numCores}
 90                    },
 91                    'numNodes': {
 92                        'exact': ${numNodes}
 93                    }
 94                }
 95            }
 96             """
 97
 98        defaults = {
 99            'args': [],
100            'stdout': 'stdout',
101            'stderr': 'stderr',
102            'venv': '',
103            'modules': [],
104            'env': {},
105            'model': 'default',
106            'model_opts': {},
107            'numCores': 1,
108            'numNodes': 1
109        }
110
111        return template, defaults
class QCGPJPool(concurrent.futures._base.Executor):
114class QCGPJPool(Executor):
115    """A Pool that manages execution of actions with QCG-PilotJob.
116
117    Parameters
118    ----------
119    qcgpj_executor: str
120        An instance of QCGPJExecutor. If not provided, an instance of QCGPJExecutor
121        with default settings will be created
122    template: QCGPJTemplate
123        An object which contains only a single method `template` that returns a tuple.
124        The first element of a tuple should be a string representing a QCG-PilotJob task's description
125        with placeholders (identifiers preceded by $ symbol)
126        and the second a dictionary that assigns default values for selected placeholders.
127        If not provided, a default EasyVVUQBasicTemplate will be used
128    template_params: dict
129        A dictionary that contains parameters that will be used to substitute placeholders
130        defined in the template
131    polling_interval: int
132        An interval between queries to the QCG-PilotJob Manager service about state of the tasks, in seconds.
133    """
134
135    def __init__(
136            self,
137            qcgpj_executor=None,
138            template=None,
139            template_params=None,
140            polling_interval=1):
141        if qcgpj_executor is None:
142            qcgpj_executor = QCGPJExecutor()
143        if template is None:
144            template = EasyVVUQBasicTemplate()
145
146        self._qcgpj_executor = qcgpj_executor
147        self._template = template
148        self._template_params = template_params
149        self._polling_interval = polling_interval
150        self._campaign_dir = None
151
152    def submit(self, fn, *args, **kwargs):
153        """Submits a callable to be executed by QCG-PilotJob.
154
155        Schedules the callable to be executed inside a QCG-PilotJob's task and returns
156        a Future representing the execution of the callable.
157
158        Returns
159        -------
160        QCGPJFuture
161            QCGPJFuture representing the given call.
162        """
163        actions = fn.__self__
164        actions.set_wrapper(QCGPJPool._wrapper)
165        exec = 'python3'
166        arg1 = "-m"
167        arg2 = "easyvvuq.actions.execute_qcgpj_task"
168
169        self._campaign_dir = args[0]['campaign_dir']
170
171        pickled_actions = base64.b64encode(dill.dumps(actions)).decode('ascii')
172        pickled_previous = base64.b64encode(dill.dumps(args[0])).decode('ascii')
173
174        actions_file = f'{self._campaign_dir}/.qcgpj_in_act_{args[0]["run_id"]}'
175        previous_file = f'{self._campaign_dir}/.qcgpj_in_prev_{args[0]["run_id"]}'
176
177        with open(actions_file, 'w') as f:
178            f.write(pickled_actions)
179
180        with open(previous_file, 'w') as f:
181            f.write(pickled_previous)
182
183        return self._qcgpj_executor.submit(
184            self._template.template,
185            self._template_params,
186            exec=exec,
187            name=args[0]['run_id'],
188            stdout=f"{self._campaign_dir}/stdout_{args[0]['run_id']}",
189            stderr=f"{self._campaign_dir}/stderr_{args[0]['run_id']}",
190            args=[arg1, arg2, actions_file, previous_file])
191
192    @property
193    def executor(self):
194        """Returns current QCGPJExecutor instance.
195
196        It gives you an access to QCG-PilotJob Manager instance, which in turn can be used to
197        get information about the QCG-PilotJob execution environment.
198        """
199        return self._qcgpj_executor
200
201    def convert_results(self, result_qcgpj):
202        """Converts results generated by QCG-PilotJob task to EasyVVUQ-compatible form
203
204        The method loads results data from a file where it was stored by QCG-PilotJob's task
205        and then converts it to a dictionary which can be further processed by EasyVVUQ.
206
207        Parameters
208        ----------
209        result_qcgpj: list or None
210            A list of results returned by a QCG-PilotJob task (only the first element will be used),
211            or None if the task hasn't finished with the status SUCCEED
212
213        Returns
214        -------
215        dict
216            A dictionary containing results
217        """
218
219        for key, value in result_qcgpj.items():
220            if value != 'SUCCEED':
221                logging.error(f"Task {key} finished with the status: {value}")
222                raise RuntimeError(f"QCG-PilotJob task {key} finished with the status: {value}")
223
224            result_file = f'{self._campaign_dir}/.qcgpj_result_{key}'
225            with open(result_file, 'r') as f:
226                previous = json.load(f)
227            remove(result_file)
228
229            return previous
230
231    def shutdown(self, **kwargs):
232        """Clean-up the resources associated with the QCGPJPool.
233        """
234        return self._qcgpj_executor.shutdown()
235
236    def as_completed(self, futures):
237        """Checks for the status of features and yields those that are finished
238        """
239
240        pending = set(futures)
241        finished = set()
242
243        for f in futures:
244            if f.done():
245                finished.add(f)
246        pending = pending - finished
247
248        while finished:
249            yield finished.pop()
250
251        while pending:
252            for f in pending:
253                if f.done():
254                    finished.add(f)
255            pending = pending - finished
256
257            while finished:
258                yield finished.pop()
259
260            time.sleep(self._polling_interval)
261
262    @staticmethod
263    def _wrapper(action, previous):
264        # TODO: Implement support for specialised execution models of QCG-PilotJob
265        # """For the actions other than ExecuteQCGPJ ensures that the code is invoked only once
266        # """
267        # if not isinstance(action, ExecuteQCGPJ):
268        #     rank = 0
269        #     if 'OMPI_COMM_WORLD_RANK' in environ:
270        #         rank = environ.get('OMPI_COMM_WORLD_RANK')
271        #     elif 'PMI_RANK' in environ:
272        #         rank = environ.get('PMI_RANK')
273        #
274        #     if rank != 0:
275        #         return
276        return action.start(previous)

A Pool that manages execution of actions with QCG-PilotJob.

Parameters
  • qcgpj_executor (QCGPJExecutor): An instance of QCGPJExecutor. If not provided, an instance of QCGPJExecutor with default settings will be created
  • template (QCGPJTemplate): An object which contains only a single method template that returns a tuple. The first element of a tuple should be a string representing a QCG-PilotJob task's description with placeholders (identifiers preceded by $ symbol) and the second a dictionary that assigns default values for selected placeholders. If not provided, a default EasyVVUQBasicTemplate will be used
  • template_params (dict): A dictionary that contains parameters that will be used to substitute placeholders defined in the template
  • polling_interval (int): An interval between queries to the QCG-PilotJob Manager service about state of the tasks, in seconds.
QCGPJPool( qcgpj_executor=None, template=None, template_params=None, polling_interval=1)
135    def __init__(
136            self,
137            qcgpj_executor=None,
138            template=None,
139            template_params=None,
140            polling_interval=1):
141        if qcgpj_executor is None:
142            qcgpj_executor = QCGPJExecutor()
143        if template is None:
144            template = EasyVVUQBasicTemplate()
145
146        self._qcgpj_executor = qcgpj_executor
147        self._template = template
148        self._template_params = template_params
149        self._polling_interval = polling_interval
150        self._campaign_dir = None
def submit(self, fn, *args, **kwargs):
152    def submit(self, fn, *args, **kwargs):
153        """Submits a callable to be executed by QCG-PilotJob.
154
155        Schedules the callable to be executed inside a QCG-PilotJob's task and returns
156        a Future representing the execution of the callable.
157
158        Returns
159        -------
160        QCGPJFuture
161            QCGPJFuture representing the given call.
162        """
163        actions = fn.__self__
164        actions.set_wrapper(QCGPJPool._wrapper)
165        exec = 'python3'
166        arg1 = "-m"
167        arg2 = "easyvvuq.actions.execute_qcgpj_task"
168
169        self._campaign_dir = args[0]['campaign_dir']
170
171        pickled_actions = base64.b64encode(dill.dumps(actions)).decode('ascii')
172        pickled_previous = base64.b64encode(dill.dumps(args[0])).decode('ascii')
173
174        actions_file = f'{self._campaign_dir}/.qcgpj_in_act_{args[0]["run_id"]}'
175        previous_file = f'{self._campaign_dir}/.qcgpj_in_prev_{args[0]["run_id"]}'
176
177        with open(actions_file, 'w') as f:
178            f.write(pickled_actions)
179
180        with open(previous_file, 'w') as f:
181            f.write(pickled_previous)
182
183        return self._qcgpj_executor.submit(
184            self._template.template,
185            self._template_params,
186            exec=exec,
187            name=args[0]['run_id'],
188            stdout=f"{self._campaign_dir}/stdout_{args[0]['run_id']}",
189            stderr=f"{self._campaign_dir}/stderr_{args[0]['run_id']}",
190            args=[arg1, arg2, actions_file, previous_file])

Submits a callable to be executed by QCG-PilotJob.

Schedules the callable to be executed inside a QCG-PilotJob's task and returns a Future representing the execution of the callable.

Returns
  • QCGPJFuture: QCGPJFuture representing the given call.
executor
192    @property
193    def executor(self):
194        """Returns current QCGPJExecutor instance.
195
196        It gives you an access to QCG-PilotJob Manager instance, which in turn can be used to
197        get information about the QCG-PilotJob execution environment.
198        """
199        return self._qcgpj_executor

Returns current QCGPJExecutor instance.

It gives you an access to QCG-PilotJob Manager instance, which in turn can be used to get information about the QCG-PilotJob execution environment.

def convert_results(self, result_qcgpj):
201    def convert_results(self, result_qcgpj):
202        """Converts results generated by QCG-PilotJob task to EasyVVUQ-compatible form
203
204        The method loads results data from a file where it was stored by QCG-PilotJob's task
205        and then converts it to a dictionary which can be further processed by EasyVVUQ.
206
207        Parameters
208        ----------
209        result_qcgpj: list or None
210            A list of results returned by a QCG-PilotJob task (only the first element will be used),
211            or None if the task hasn't finished with the status SUCCEED
212
213        Returns
214        -------
215        dict
216            A dictionary containing results
217        """
218
219        for key, value in result_qcgpj.items():
220            if value != 'SUCCEED':
221                logging.error(f"Task {key} finished with the status: {value}")
222                raise RuntimeError(f"QCG-PilotJob task {key} finished with the status: {value}")
223
224            result_file = f'{self._campaign_dir}/.qcgpj_result_{key}'
225            with open(result_file, 'r') as f:
226                previous = json.load(f)
227            remove(result_file)
228
229            return previous

Converts results generated by QCG-PilotJob task to EasyVVUQ-compatible form

The method loads results data from a file where it was stored by QCG-PilotJob's task and then converts it to a dictionary which can be further processed by EasyVVUQ.

Parameters
  • result_qcgpj (dict): A dictionary of results returned by a QCG-PilotJob task, mapping each task name to its final status (only the first entry will be used). A RuntimeError is raised if that task hasn't finished with the status SUCCEED
Returns
  • dict: A dictionary containing results
def shutdown(self, **kwargs):
231    def shutdown(self, **kwargs):
232        """Clean-up the resources associated with the QCGPJPool.
233        """
234        return self._qcgpj_executor.shutdown()

Clean-up the resources associated with the QCGPJPool.

def as_completed(self, futures):
236    def as_completed(self, futures):
237        """Checks for the status of features and yields those that are finished
238        """
239
240        pending = set(futures)
241        finished = set()
242
243        for f in futures:
244            if f.done():
245                finished.add(f)
246        pending = pending - finished
247
248        while finished:
249            yield finished.pop()
250
251        while pending:
252            for f in pending:
253                if f.done():
254                    finished.add(f)
255            pending = pending - finished
256
257            while finished:
258                yield finished.pop()
259
260            time.sleep(self._polling_interval)

Checks for the status of futures and yields those that are finished

class ExecuteQCGPJ:
279class ExecuteQCGPJ:
280    """A utility decorator over action that marks the action as configured for parallel execution by QCG-PilotJob
281    Currently it has no influence on the processing.
282
283    Parameters
284    ----------
285    action: Action
286        an action that will be decorated in order to enable parallel execution inside a QCG-PilotJob task.
287    """
288
289    def __init__(self, action):
290        self._action = action
291
292    def start(self, previous=None):
293        return self._action.start(previous)
294
295    def finished(self):
296        return self._action.finished()
297
298    def finalise(self):
299        return self._action.finalise()
300
301    def succeeded(self):
302        return self._action.succeeded()

A utility decorator over action that marks the action as configured for parallel execution by QCG-PilotJob. Currently it has no influence on the processing.

Parameters
  • action (Action): an action that will be decorated in order to enable parallel execution inside a QCG-PilotJob task.
ExecuteQCGPJ(action)
289    def __init__(self, action):
290        self._action = action
def start(self, previous=None):
292    def start(self, previous=None):
293        return self._action.start(previous)
def finished(self):
295    def finished(self):
296        return self._action.finished()
def finalise(self):
298    def finalise(self):
299        return self._action.finalise()
def succeeded(self):
301    def succeeded(self):
302        return self._action.succeeded()