easyvvuq.actions.execute_qcgpj
"""Execution of EasyVVUQ actions inside QCG-PilotJob tasks.

The module provides two ready-made task templates, a pool
(:class:`QCGPJPool`) that submits actions to a QCGPJExecutor, and the
:class:`ExecuteQCGPJ` marker wrapper for actions.
"""
import base64
import json
import logging
import time
from concurrent.futures import Executor
from os import environ
from os import remove
from typing import Any, Dict, Tuple

import dill

from qcg.pilotjob.executor_api.qcgpj_executor import QCGPJExecutor
from qcg.pilotjob.executor_api.templates.qcgpj_template import QCGPJTemplate


logger = logging.getLogger(__name__)


class EasyVVUQBasicTemplate(QCGPJTemplate):
    """A basic template class for submission of QCG-PilotJob tasks that run on a single core

    The class can be used only for the most simple use-cases. For example, it does not allow
    resource requirements to be specified. Thus, for more advanced use-cases, it is recommended
    to provide a custom implementation of QCGPJTemplate. For complete reference of QCG-PilotJob
    task's description parameters please look at
    https://qcg-pilotjob.readthedocs.io/en/latest/fileinterface.html#submit
    """
    @staticmethod
    def template() -> Tuple[str, Dict[str, Any]]:
        """Return the task description text and the default placeholder values.

        Returns
        -------
        tuple
            ``(template, defaults)`` - the task description with ``${...}``
            placeholders and a dictionary with default values for every
            placeholder except ``name`` and ``exec``.
        """
        template = """
            {
                'name': '${name}',
                'execution': {
                    'exec': '${exec}',
                    'args': ${args},
                    'stdout': '${stdout}',
                    'stderr': '${stderr}',
                    'venv': '${venv}',
                    'modules': ${modules},
                    'env': ${env},
                    'model': '${model}',
                    'model_opts': ${model_opts}
                }
            }
            """

        defaults = {
            'args': [],
            'stdout': 'stdout',
            'stderr': 'stderr',
            'venv': '',
            'modules': [],
            'env': {},
            'model': 'default',
            'model_opts': {}
        }

        return template, defaults


class EasyVVUQParallelTemplate(QCGPJTemplate):
    """A template class for submission of QCG-PilotJob tasks that run on an exact number of cores / nodes

    With this class it is possible to define basic resource requirements for tasks.
    For advanced use-cases, it is recommended to provide a custom implementation of QCGPJTemplate.
    For complete reference of QCG-PilotJob task's description parameters
    please look at https://qcg-pilotjob.readthedocs.io/en/latest/fileinterface.html#submit
    """
    @staticmethod
    def template() -> Tuple[str, Dict[str, Any]]:
        """Return the task description text and the default placeholder values.

        Returns
        -------
        tuple
            ``(template, defaults)`` - the task description with ``${...}``
            placeholders (including ``numCores`` and ``numNodes``) and a
            dictionary with default values for every placeholder except
            ``name`` and ``exec``.
        """
        template = """
            {
                'name': '${name}',
                'execution': {
                    'exec': '${exec}',
                    'args': ${args},
                    'stdout': '${stdout}',
                    'stderr': '${stderr}',
                    'venv': '${venv}',
                    'modules': ${modules},
                    'env': ${env},
                    'model': '${model}',
                    'model_opts': ${model_opts}
                },
                'resources': {
                    'numCores': {
                        'exact': ${numCores}
                    },
                    'numNodes': {
                        'exact': ${numNodes}
                    }
                }
            }
            """

        defaults = {
            'args': [],
            'stdout': 'stdout',
            'stderr': 'stderr',
            'venv': '',
            'modules': [],
            'env': {},
            'model': 'default',
            'model_opts': {},
            'numCores': 1,
            'numNodes': 1
        }

        return template, defaults


class QCGPJPool(Executor):
    """A Pool that manages execution of actions with QCG-PilotJob.

    Parameters
    ----------
    qcgpj_executor: QCGPJExecutor
        An instance of QCGPJExecutor. If not provided, an instance of QCGPJExecutor
        with default settings will be created
    template: QCGPJTemplate
        An object which contains only a single method `template` that returns a tuple.
        The first element of a tuple should be a string representing a QCG-PilotJob task's description
        with placeholders (identifiers preceded by $ symbol)
        and the second a dictionary that assigns default values for selected placeholders.
        If not provided, a default EasyVVUQBasicTemplate will be used
    template_params: dict
        A dictionary that contains parameters that will be used to substitute placeholders
        defined in the template
    polling_interval: int
        An interval between queries to the QCG-PilotJob Manager service about state of the tasks, in seconds.
    """

    def __init__(
            self,
            qcgpj_executor=None,
            template=None,
            template_params=None,
            polling_interval=1):
        if qcgpj_executor is None:
            qcgpj_executor = QCGPJExecutor()
        if template is None:
            template = EasyVVUQBasicTemplate()

        self._qcgpj_executor = qcgpj_executor
        self._template = template
        self._template_params = template_params
        self._polling_interval = polling_interval
        # Set by submit(); convert_results() looks for result files there.
        self._campaign_dir = None

    def submit(self, fn, *args, **kwargs):
        """Submits a callable to be executed by QCG-PilotJob.

        Schedules the callable to be executed inside a QCG-PilotJob's task and returns
        a Future representing the execution of the callable.

        Parameters
        ----------
        fn: bound method
            Only ``fn.__self__`` (the actions object) is used; it is serialised
            and handed over to the task.
        *args
            ``args[0]`` must be a mapping with the keys ``campaign_dir`` and ``run_id``.
        **kwargs
            Accepted for interface compatibility; not used.

        Returns
        -------
        QCGPJFuture
            QCGPJFuture representing the given call.
        """
        actions = fn.__self__
        actions.set_wrapper(QCGPJPool._wrapper)

        previous = args[0]
        self._campaign_dir = previous['campaign_dir']

        pickled_actions = base64.b64encode(dill.dumps(actions)).decode('ascii')
        pickled_previous = base64.b64encode(dill.dumps(previous)).decode('ascii')

        run_id = previous['run_id']
        actions_file = f'{self._campaign_dir}/.qcgpj_in_act_{run_id}'
        previous_file = f'{self._campaign_dir}/.qcgpj_in_prev_{run_id}'

        # The task receives its inputs through these two files.
        with open(actions_file, 'w') as f:
            f.write(pickled_actions)

        with open(previous_file, 'w') as f:
            f.write(pickled_previous)

        return self._qcgpj_executor.submit(
            self._template.template,
            self._template_params,
            exec='python3',
            name=run_id,
            stdout=f"{self._campaign_dir}/stdout_{run_id}",
            stderr=f"{self._campaign_dir}/stderr_{run_id}",
            args=["-m", "easyvvuq.actions.execute_qcgpj_task", actions_file, previous_file])

    @property
    def executor(self):
        """Returns current QCGPJExecutor instance.

        It gives you access to the QCG-PilotJob Manager instance, which in turn can be used to
        get information about the QCG-PilotJob execution environment.
        """
        return self._qcgpj_executor

    def convert_results(self, result_qcgpj):
        """Converts results generated by QCG-PilotJob task to EasyVVUQ-compatible form

        For every task in `result_qcgpj` the method loads the JSON file
        ``.qcgpj_result_<task name>`` from the campaign directory recorded by
        the last `submit` call and then removes that file.

        Parameters
        ----------
        result_qcgpj: dict
            A mapping from task names to their final statuses.

        Returns
        -------
        dict
            The content of the result file of the last task in `result_qcgpj`.

        Raises
        ------
        RuntimeError
            If `result_qcgpj` is empty or None, or if any task has a status
            other than ``SUCCEED``.
        """
        if not result_qcgpj:
            # Without this guard an empty mapping fell through to an unbound
            # `previous`, and None failed on `.items()`.
            raise RuntimeError("QCG-PilotJob returned no task results to convert")

        previous = None
        for key, value in result_qcgpj.items():
            if value != 'SUCCEED':
                # Use the module logger: logging.error() would implicitly
                # configure the root logger of the host application.
                logger.error("Task %s finished with the status: %s", key, value)
                raise RuntimeError(f"QCG-PilotJob task {key} finished with the status: {value}")

            result_file = f'{self._campaign_dir}/.qcgpj_result_{key}'
            with open(result_file, 'r') as f:
                previous = json.load(f)
            remove(result_file)

        return previous

    def shutdown(self, wait=True, **kwargs):
        """Clean-up the resources associated with the QCGPJPool.

        `wait` and any keyword arguments are accepted only for compatibility
        with ``concurrent.futures.Executor.shutdown``; they are not forwarded.

        Returns
        -------
        The value returned by the ``shutdown`` method of the QCGPJExecutor.
        """
        return self._qcgpj_executor.shutdown()

    def as_completed(self, futures):
        """Checks for the status of futures and yields those that are finished

        The futures are polled with ``done()`` every `polling_interval`
        seconds. Futures that finish within the same poll are yielded in
        arbitrary order. The generator ends as soon as the last future has
        been yielded.
        """
        pending = set(futures)
        while pending:
            finished = {future for future in pending if future.done()}
            pending -= finished
            yield from finished
            # Sleep only while something is still pending, so the generator
            # does not delay its caller after the last future was yielded.
            if pending:
                time.sleep(self._polling_interval)

    @staticmethod
    def _wrapper(action, previous):
        """Start `action` with `previous` and return the result of ``action.start``."""
        # TODO: Implement support for specialised execution models of QCG-PilotJob:
        #  actions other than ExecuteQCGPJ should be invoked only once, i.e. only
        #  by the process whose rank (OMPI_COMM_WORLD_RANK or PMI_RANK taken from
        #  the environment, converted to int) equals 0.
        return action.start(previous)


class ExecuteQCGPJ:
    """A utility decorator over action that marks the action as configured for parallel execution by QCG-PilotJob.
    Currently it has no influence on the processing.

    Parameters
    ----------
    action: Action
        an action that will be decorated in order to enable parallel execution inside a QCG-PilotJob task.
    """

    def __init__(self, action):
        self._action = action

    def start(self, previous=None):
        """Delegate to ``start`` of the decorated action."""
        return self._action.start(previous)

    def finished(self):
        """Delegate to ``finished`` of the decorated action."""
        return self._action.finished()

    def finalise(self):
        """Delegate to ``finalise`` of the decorated action."""
        return self._action.finalise()

    def succeeded(self):
        """Delegate to ``succeeded`` of the decorated action."""
        return self._action.succeeded()
class EasyVVUQBasicTemplate(QCGPJTemplate):
    """Minimal QCG-PilotJob task template for tasks that run on a single core.

    It covers only the simplest use-cases: the description has no
    ``resources`` section, so resource requirements cannot be specified.
    For anything more advanced provide a custom implementation of
    QCGPJTemplate. The complete reference of the QCG-PilotJob task
    description parameters is available at
    https://qcg-pilotjob.readthedocs.io/en/latest/fileinterface.html#submit
    """

    @staticmethod
    def template() -> Tuple[str, Dict[str, Any]]:
        """Return the task description text and the default placeholder values.

        Returns
        -------
        tuple
            ``(description, defaults)`` - the task description with ``${...}``
            placeholders and a freshly built dictionary with default values
            for every placeholder except ``name`` and ``exec``.
        """
        fallback_values = dict(
            args=[],
            stdout='stdout',
            stderr='stderr',
            venv='',
            modules=[],
            env={},
            model='default',
            model_opts={},
        )

        description = """
            {
                'name': '${name}',
                'execution': {
                    'exec': '${exec}',
                    'args': ${args},
                    'stdout': '${stdout}',
                    'stderr': '${stderr}',
                    'venv': '${venv}',
                    'modules': ${modules},
                    'env': ${env},
                    'model': '${model}',
                    'model_opts': ${model_opts}
                }
            }
            """

        return description, fallback_values
A basic template class for submission of QCG-PilotJob tasks that run on a single core
The class can be used only for the most simple use-cases. For example, it does not allow resource requirements to be specified. Thus, for more advanced use-cases, it is recommended to provide a custom implementation of QCGPJTemplate. For a complete reference of the QCG-PilotJob task's description parameters please look at https://qcg-pilotjob.readthedocs.io/en/latest/fileinterface.html#submit
30 @staticmethod 31 def template() -> Tuple[str, Dict[str, Any]]: 32 template = """ 33 { 34 'name': '${name}', 35 'execution': { 36 'exec': '${exec}', 37 'args': ${args}, 38 'stdout': '${stdout}', 39 'stderr': '${stderr}', 40 'venv': '${venv}', 41 'modules': ${modules}, 42 'env': ${env}, 43 'model': '${model}', 44 'model_opts': ${model_opts} 45 } 46 } 47 """ 48 49 defaults = { 50 'args': [], 51 'stdout': 'stdout', 52 'stderr': 'stderr', 53 'venv': '', 54 'modules': [], 55 'env': {}, 56 'model': 'default', 57 'model_opts': {} 58 } 59 60 return template, defaults
class EasyVVUQParallelTemplate(QCGPJTemplate):
    """QCG-PilotJob task template for tasks that run on an exact number of cores / nodes.

    The description contains a ``resources`` section, which makes it possible
    to define basic resource requirements for tasks. For advanced use-cases
    provide a custom implementation of QCGPJTemplate. The complete reference
    of the QCG-PilotJob task description parameters is available at
    https://qcg-pilotjob.readthedocs.io/en/latest/fileinterface.html#submit
    """

    @staticmethod
    def template() -> Tuple[str, Dict[str, Any]]:
        """Return the task description text and the default placeholder values.

        Returns
        -------
        tuple
            ``(description, defaults)`` - the task description with ``${...}``
            placeholders and a freshly built dictionary with default values
            for every placeholder except ``name`` and ``exec``; ``numCores``
            and ``numNodes`` both default to 1.
        """
        fallback_values = dict(
            args=[],
            stdout='stdout',
            stderr='stderr',
            venv='',
            modules=[],
            env={},
            model='default',
            model_opts={},
            numCores=1,
            numNodes=1,
        )

        description = """
            {
                'name': '${name}',
                'execution': {
                    'exec': '${exec}',
                    'args': ${args},
                    'stdout': '${stdout}',
                    'stderr': '${stderr}',
                    'venv': '${venv}',
                    'modules': ${modules},
                    'env': ${env},
                    'model': '${model}',
                    'model_opts': ${model_opts}
                },
                'resources': {
                    'numCores': {
                        'exact': ${numCores}
                    },
                    'numNodes': {
                        'exact': ${numNodes}
                    }
                }
            }
            """

        return description, fallback_values
A template class for submission of QCG-PilotJob tasks that run on an exact number of cores / nodes
With this class it is possible to define basic resource requirements for tasks. For advanced use-cases, it is recommended to provide a custom implementation of QCGPJTemplate. For a complete reference of the QCG-PilotJob task's description parameters please look at https://qcg-pilotjob.readthedocs.io/en/latest/fileinterface.html#submit
71 @staticmethod 72 def template() -> Tuple[str, Dict[str, Any]]: 73 template = """ 74 { 75 'name': '${name}', 76 'execution': { 77 'exec': '${exec}', 78 'args': ${args}, 79 'stdout': '${stdout}', 80 'stderr': '${stderr}', 81 'venv': '${venv}', 82 'modules': ${modules}, 83 'env': ${env}, 84 'model': '${model}', 85 'model_opts': ${model_opts} 86 }, 87 'resources': { 88 'numCores': { 89 'exact': ${numCores} 90 }, 91 'numNodes': { 92 'exact': ${numNodes} 93 } 94 } 95 } 96 """ 97 98 defaults = { 99 'args': [], 100 'stdout': 'stdout', 101 'stderr': 'stderr', 102 'venv': '', 103 'modules': [], 104 'env': {}, 105 'model': 'default', 106 'model_opts': {}, 107 'numCores': 1, 108 'numNodes': 1 109 } 110 111 return template, defaults
class QCGPJPool(Executor):
    """A Pool that manages execution of actions with QCG-PilotJob.

    Parameters
    ----------
    qcgpj_executor: QCGPJExecutor
        An instance of QCGPJExecutor. If not provided, an instance of QCGPJExecutor
        with default settings will be created
    template: QCGPJTemplate
        An object which contains only a single method `template` that returns a tuple.
        The first element of a tuple should be a string representing a QCG-PilotJob task's description
        with placeholders (identifiers preceded by $ symbol)
        and the second a dictionary that assigns default values for selected placeholders.
        If not provided, a default EasyVVUQBasicTemplate will be used
    template_params: dict
        A dictionary that contains parameters that will be used to substitute placeholders
        defined in the template
    polling_interval: int
        An interval between queries to the QCG-PilotJob Manager service about state of the tasks, in seconds.
    """

    def __init__(
            self,
            qcgpj_executor=None,
            template=None,
            template_params=None,
            polling_interval=1):
        if qcgpj_executor is None:
            qcgpj_executor = QCGPJExecutor()
        if template is None:
            template = EasyVVUQBasicTemplate()

        self._qcgpj_executor = qcgpj_executor
        self._template = template
        self._template_params = template_params
        self._polling_interval = polling_interval
        # Set by submit(); convert_results() looks for result files there.
        self._campaign_dir = None

    def submit(self, fn, *args, **kwargs):
        """Submits a callable to be executed by QCG-PilotJob.

        Schedules the callable to be executed inside a QCG-PilotJob's task and returns
        a Future representing the execution of the callable.

        Parameters
        ----------
        fn: bound method
            Only ``fn.__self__`` (the actions object) is used; it is serialised
            and handed over to the task.
        *args
            ``args[0]`` must be a mapping with the keys ``campaign_dir`` and ``run_id``.
        **kwargs
            Accepted for interface compatibility; not used.

        Returns
        -------
        QCGPJFuture
            QCGPJFuture representing the given call.
        """
        actions = fn.__self__
        actions.set_wrapper(QCGPJPool._wrapper)

        previous = args[0]
        self._campaign_dir = previous['campaign_dir']

        pickled_actions = base64.b64encode(dill.dumps(actions)).decode('ascii')
        pickled_previous = base64.b64encode(dill.dumps(previous)).decode('ascii')

        run_id = previous['run_id']
        actions_file = f'{self._campaign_dir}/.qcgpj_in_act_{run_id}'
        previous_file = f'{self._campaign_dir}/.qcgpj_in_prev_{run_id}'

        # The task receives its inputs through these two files.
        with open(actions_file, 'w') as f:
            f.write(pickled_actions)

        with open(previous_file, 'w') as f:
            f.write(pickled_previous)

        return self._qcgpj_executor.submit(
            self._template.template,
            self._template_params,
            exec='python3',
            name=run_id,
            stdout=f"{self._campaign_dir}/stdout_{run_id}",
            stderr=f"{self._campaign_dir}/stderr_{run_id}",
            args=["-m", "easyvvuq.actions.execute_qcgpj_task", actions_file, previous_file])

    @property
    def executor(self):
        """Returns current QCGPJExecutor instance.

        It gives you access to the QCG-PilotJob Manager instance, which in turn can be used to
        get information about the QCG-PilotJob execution environment.
        """
        return self._qcgpj_executor

    def convert_results(self, result_qcgpj):
        """Converts results generated by QCG-PilotJob task to EasyVVUQ-compatible form

        For every task in `result_qcgpj` the method loads the JSON file
        ``.qcgpj_result_<task name>`` from the campaign directory recorded by
        the last `submit` call and then removes that file.

        Parameters
        ----------
        result_qcgpj: dict
            A mapping from task names to their final statuses.

        Returns
        -------
        dict
            The content of the result file of the last task in `result_qcgpj`.

        Raises
        ------
        RuntimeError
            If `result_qcgpj` is empty or None, or if any task has a status
            other than ``SUCCEED``.
        """
        if not result_qcgpj:
            # Without this guard an empty mapping fell through to an unbound
            # `previous`, and None failed on `.items()`.
            raise RuntimeError("QCG-PilotJob returned no task results to convert")

        previous = None
        for key, value in result_qcgpj.items():
            if value != 'SUCCEED':
                # Use a named logger: logging.error() would implicitly
                # configure the root logger of the host application.
                logging.getLogger(__name__).error(
                    "Task %s finished with the status: %s", key, value)
                raise RuntimeError(f"QCG-PilotJob task {key} finished with the status: {value}")

            result_file = f'{self._campaign_dir}/.qcgpj_result_{key}'
            with open(result_file, 'r') as f:
                previous = json.load(f)
            remove(result_file)

        return previous

    def shutdown(self, wait=True, **kwargs):
        """Clean-up the resources associated with the QCGPJPool.

        `wait` and any keyword arguments are accepted only for compatibility
        with ``concurrent.futures.Executor.shutdown``; they are not forwarded.

        Returns
        -------
        The value returned by the ``shutdown`` method of the QCGPJExecutor.
        """
        return self._qcgpj_executor.shutdown()

    def as_completed(self, futures):
        """Checks for the status of futures and yields those that are finished

        The futures are polled with ``done()`` every `polling_interval`
        seconds. Futures that finish within the same poll are yielded in
        arbitrary order. The generator ends as soon as the last future has
        been yielded.
        """
        pending = set(futures)
        while pending:
            finished = {future for future in pending if future.done()}
            pending -= finished
            yield from finished
            # Sleep only while something is still pending, so the generator
            # does not delay its caller after the last future was yielded.
            if pending:
                time.sleep(self._polling_interval)

    @staticmethod
    def _wrapper(action, previous):
        """Start `action` with `previous` and return the result of ``action.start``."""
        # TODO: Implement support for specialised execution models of QCG-PilotJob:
        #  actions other than ExecuteQCGPJ should be invoked only once, i.e. only
        #  by the process whose rank (OMPI_COMM_WORLD_RANK or PMI_RANK taken from
        #  the environment, converted to int) equals 0.
        return action.start(previous)
A Pool that manages execution of actions with QCG-PilotJob.
Parameters
- qcgpj_executor (QCGPJExecutor): An instance of QCGPJExecutor. If not provided, an instance of QCGPJExecutor with default settings will be created
- template (QCGPJTemplate): An object which contains only a single method `template` that returns a tuple. The first element of a tuple should be a string representing a QCG-PilotJob task's description with placeholders (identifiers preceded by $ symbol) and the second a dictionary that assigns default values for selected placeholders. If not provided, a default EasyVVUQBasicTemplate will be used
- template_params (dict): A dictionary that contains parameters that will be used to substitute placeholders defined in the template
- polling_interval (int): An interval between queries to the QCG-PilotJob Manager service about state of the tasks, in seconds.
def __init__(
        self,
        qcgpj_executor=None,
        template=None,
        template_params=None,
        polling_interval=1):
    """Store the pool configuration, creating defaults for omitted collaborators.

    Parameters
    ----------
    qcgpj_executor:
        Executor used to submit tasks; a new ``QCGPJExecutor()`` is created
        when None.
    template:
        Task template; a new ``EasyVVUQBasicTemplate()`` is created when None.
    template_params:
        Stored as given and later passed along with every submission.
    polling_interval:
        Stored as given; used as the sleep time between polls in `as_completed`.
    """
    self._qcgpj_executor = QCGPJExecutor() if qcgpj_executor is None else qcgpj_executor
    self._template = EasyVVUQBasicTemplate() if template is None else template
    self._template_params = template_params
    self._polling_interval = polling_interval
    # Unknown until the first submit() call.
    self._campaign_dir = None
def submit(self, fn, *args, **kwargs):
    """Schedule the actions behind `fn` to run inside a QCG-PilotJob task.

    The actions object (``fn.__self__``) and the first positional argument
    are serialised with dill, base64-encoded and written to two hidden files
    in the campaign directory. The paths of the files are passed to the
    ``easyvvuq.actions.execute_qcgpj_task`` module, which the task runs
    with ``python3 -m``.

    Parameters
    ----------
    fn: bound method
        Only ``fn.__self__`` is used.
    *args
        ``args[0]`` must be a mapping with the keys ``campaign_dir`` and ``run_id``.
    **kwargs
        Accepted for interface compatibility; not used.

    Returns
    -------
    QCGPJFuture
        QCGPJFuture representing the given call.
    """
    actions = fn.__self__
    actions.set_wrapper(QCGPJPool._wrapper)

    previous = args[0]
    campaign_dir = previous['campaign_dir']
    # Remembered so that convert_results() can locate the result files.
    self._campaign_dir = campaign_dir

    encoded_actions = base64.b64encode(dill.dumps(actions)).decode('ascii')
    encoded_previous = base64.b64encode(dill.dumps(previous)).decode('ascii')

    run_id = previous["run_id"]
    actions_file = f'{campaign_dir}/.qcgpj_in_act_{run_id}'
    previous_file = f'{campaign_dir}/.qcgpj_in_prev_{run_id}'

    for path, payload in ((actions_file, encoded_actions),
                          (previous_file, encoded_previous)):
        with open(path, 'w') as handle:
            handle.write(payload)

    task_args = ["-m", "easyvvuq.actions.execute_qcgpj_task", actions_file, previous_file]

    return self._qcgpj_executor.submit(
        self._template.template,
        self._template_params,
        exec='python3',
        name=run_id,
        stdout=f"{campaign_dir}/stdout_{run_id}",
        stderr=f"{campaign_dir}/stderr_{run_id}",
        args=task_args)
Submits a callable to be executed by QCG-PilotJob.
Schedules the callable to be executed inside a QCG-PilotJob's task and returns a Future representing the execution of the callable.
Returns
- QCGPJFuture: QCGPJFuture representing the given call.
@property
def executor(self):
    """The QCGPJExecutor instance used by this pool.

    Through it you can reach the QCG-PilotJob Manager instance, which in turn
    can be used to get information about the QCG-PilotJob execution environment.
    """
    current_executor = self._qcgpj_executor
    return current_executor
Returns current QCGPJExecutor instance.
It gives you access to the QCG-PilotJob Manager instance, which in turn can be used to get information about the QCG-PilotJob execution environment.
def convert_results(self, result_qcgpj):
    """Converts results generated by QCG-PilotJob task to EasyVVUQ-compatible form

    For every task in `result_qcgpj` the method loads the JSON file
    ``.qcgpj_result_<task name>`` from ``self._campaign_dir`` and then
    removes that file.

    Parameters
    ----------
    result_qcgpj: dict
        A mapping from task names to their final statuses.

    Returns
    -------
    dict
        The content of the result file of the last task in `result_qcgpj`.

    Raises
    ------
    RuntimeError
        If `result_qcgpj` is empty or None, or if any task has a status
        other than ``SUCCEED``.
    """
    if not result_qcgpj:
        # Without this guard an empty mapping fell through to an unbound
        # `previous`, and None failed on `.items()`.
        raise RuntimeError("QCG-PilotJob returned no task results to convert")

    previous = None
    for key, value in result_qcgpj.items():
        if value != 'SUCCEED':
            # Use a named logger: logging.error() would implicitly
            # configure the root logger of the host application.
            logging.getLogger(__name__).error(
                "Task %s finished with the status: %s", key, value)
            raise RuntimeError(f"QCG-PilotJob task {key} finished with the status: {value}")

        result_file = f'{self._campaign_dir}/.qcgpj_result_{key}'
        with open(result_file, 'r') as f:
            previous = json.load(f)
        remove(result_file)

    return previous
Converts results generated by QCG-PilotJob task to EasyVVUQ-compatible form
The method loads results data from a file where it was stored by QCG-PilotJob's task and then converts it to a dictionary which can be further processed by EasyVVUQ.
Parameters
- result_qcgpj (dict): A mapping from the names of QCG-PilotJob tasks to their final statuses. A RuntimeError is raised if any task has a status other than SUCCEED
Returns
- dict: A dictionary containing results
def shutdown(self, wait=True, **kwargs):
    """Clean-up the resources associated with the QCGPJPool.

    Parameters
    ----------
    wait: bool
        Accepted, positionally or by keyword, only for compatibility with
        ``concurrent.futures.Executor.shutdown``; it is not forwarded.
    **kwargs
        Accepted for the same reason (e.g. ``cancel_futures``); not forwarded.

    Returns
    -------
    The value returned by the ``shutdown`` method of the QCGPJExecutor.
    """
    return self._qcgpj_executor.shutdown()
Clean-up the resources associated with the QCGPJPool.
def as_completed(self, futures):
    """Checks for the status of futures and yields those that are finished

    The futures are polled with ``done()`` every ``self._polling_interval``
    seconds. Futures that finish within the same poll are yielded in
    arbitrary order. The generator ends as soon as the last future has been
    yielded.

    Parameters
    ----------
    futures: iterable
        Hashable objects that provide a ``done()`` method.

    Yields
    ------
    Each of the given futures exactly once, after its ``done()`` returned a
    true value.
    """
    pending = set(futures)
    while pending:
        finished = {future for future in pending if future.done()}
        pending -= finished
        yield from finished
        # Sleep only while something is still pending, so the generator
        # does not delay its caller after the last future was yielded.
        if pending:
            time.sleep(self._polling_interval)
Checks for the status of futures and yields those that are finished
class ExecuteQCGPJ:
    """Marker wrapper for an action that is configured for parallel execution by QCG-PilotJob.

    Every call is passed straight through to the wrapped action, so
    currently the wrapper has no influence on the processing.

    Parameters
    ----------
    action: Action
        an action that will be decorated in order to enable parallel execution inside a QCG-PilotJob task.
    """

    def __init__(self, action):
        self._action = action

    def start(self, previous=None):
        """Return ``start(previous)`` of the wrapped action."""
        wrapped = self._action
        return wrapped.start(previous)

    def finished(self):
        """Return ``finished()`` of the wrapped action."""
        wrapped = self._action
        return wrapped.finished()

    def finalise(self):
        """Return ``finalise()`` of the wrapped action."""
        wrapped = self._action
        return wrapped.finalise()

    def succeeded(self):
        """Return ``succeeded()`` of the wrapped action."""
        wrapped = self._action
        return wrapped.succeeded()
A utility decorator over an action that marks the action as configured for parallel execution by QCG-PilotJob. Currently it has no influence on the processing.
Parameters
- action (Action): an action that will be decorated in order to enable parallel execution inside a QCG-PilotJob task.