easyvvuq.actions.execute_local
This module provides an assortment of actions generally concerned with executing simulations locally. Some Actions will also be useful when using Dask.
1"""This module provides an assortment of actions generally concerned with executing 2simulations locally. Some Actions will also be useful when using Dask. 3""" 4 5import os 6from pathlib import Path 7import shutil 8import subprocess 9import dill 10import copy 11import logging 12 13__license__ = "LGPL" 14 15 16def local_execute(encoder, command, decoder, root='/tmp'): 17 """A helper function for a simple local execution. 18 It will create a directory under your specified root folder, encode the sampler output, execute a command 19 and decode the results of the simulation. 20 21 Parameters 22 ---------- 23 encoder: Encoder 24 an encoder to use 25 command: list of str 26 a command to run your simulation (same as argument to popen, e.g. ['ls', '-al']) 27 decoder: Decoder 28 a decoder to use 29 root: str 30 root folder, for example '/tmp' or if you want to use ram based filesystem it could be '/dev/shm' 31 32 Returns 33 ------- 34 Actions 35 """ 36 return Actions( 37 CreateRunDirectory(root), 38 Encode(encoder), 39 ExecuteLocal(command), 40 Decode(decoder)) 41 42 43class CreateRunDirectory(): 44 """Creates a directory structure for storing simulation input and output files. 45 46 Parameters 47 ---------- 48 root: str 49 Root directory to create a directory structure in. 50 flatten: bool 51 If set to True will result in a flat directory structure (each run gets a directory 52 under root). If left as False will create a hierarchical structure. This is useful 53 so as not to overload the filesystem. 54 """ 55 56 def __init__(self, root, flatten=False): 57 self.root = root 58 self.flatten = flatten 59 60 def start(self, previous=None): 61 """Starts the action. 62 63 Will read a `run_id` from a dictionary supplied by the previous `Action`. 64 Will then create a directory structure based on the numerical value of the `run_id`. 65 66 Returns 67 ------- 68 dict 69 A dictionary to be passed to the following `Action`. 70 """ 71 run_id = previous['run_id'] 72 level1_a, level1_b = (int(run_id / 100 ** 4) * 100 ** 4, 73 int(run_id / 100 ** 4 + 1) * 100 ** 4) 74 level2_a, level2_b = (int(run_id / 100 ** 3) * 100 ** 3, 75 int(run_id / 100 ** 3 + 1) * 100 ** 3) 76 level3_a, level3_b = (int(run_id / 100 ** 2) * 100 ** 2, 77 int(run_id / 100 ** 2 + 1) * 100 ** 2) 78 level4_a, level4_b = (int(run_id / 100 ** 1) * 100 ** 1, 79 int(run_id / 100 ** 1 + 1) * 100 ** 1) 80 level1_dir = "runs_{}-{}/".format(level1_a, level1_b) 81 level2_dir = "runs_{}-{}/".format(level2_a, level2_b) 82 level3_dir = "runs_{}-{}/".format(level3_a, level3_b) 83 level4_dir = "runs_{}-{}/".format(level4_a, level4_b) 84 level5_dir = "run_{}".format(int(run_id)) 85 86 if self.flatten: 87 path = os.path.join(self.root, previous['campaign_dir'], 'runs', level5_dir) 88 else: 89 path = os.path.join(self.root, previous['campaign_dir'], 'runs', 90 level1_dir, level2_dir, level3_dir, level4_dir, level5_dir) 91 92 Path(path).mkdir(parents=True, exist_ok=True) 93 previous['rundir'] = path 94 self.result = previous 95 return self.result 96 97 def succeeded(self): 98 """Has the `Action` finished successfully. 99 100 Returns 101 ------- 102 bool 103 True if `Action` completed successfully. False otherwise. 104 """ 105 return True 106 107 108class Encode(): 109 def __init__(self, encoder): 110 self.encoder = encoder 111 112 def start(self, previous=None): 113 self.encoder.encode( 114 params=previous['run_info']['params'], 115 target_dir=previous['rundir']) 116 try: 117 previous['encoder_filename'] = self.encoder.target_filename 118 except AttributeError: 119 if os.getenv("EasyVVUQ_Debug"): print('AttributeError raised and ignored') 120 return previous 121 122 def finished(self): 123 return True 124 125 def finalise(self): 126 pass 127 128 def succeeeded(self): 129 return True 130 131 132class Decode(): 133 def __init__(self, decoder): 134 self.decoder = decoder 135 136 def start(self, previous=None): 137 run_info = copy.copy(previous['run_info']) 138 run_info['run_dir'] = previous['rundir'] 139 result = self.decoder.parse_sim_output(run_info) 140 previous['result'] = result 141 previous['decoder_filename'] = self.decoder.target_filename 142 previous['collated'] = True 143 return previous 144 145 def finished(self): 146 return True 147 148 def finalise(self): 149 pass 150 151 def succeeded(self): 152 return True 153 154 155class CleanUp(): 156 def __init__(self): 157 pass 158 159 def start(self, previous=None): 160 if not ('rundir' in previous.keys()): 161 raise RuntimeError('must be used with actions that create a directory structure') 162 shutil.rmtree(previous['rundir']) 163 return previous 164 165 def finished(self): 166 return True 167 168 def finalise(self): 169 pass 170 171 def succeeded(self): 172 return True 173 174 175class ExecutePython(): 176 def __init__(self, function): 177 self.function = dill.dumps(function) 178 self.params = None 179 self.eval_result = None 180 181 def start(self, previous=None): 182 function = dill.loads(self.function) 183 self.eval_result = function(previous['run_info']['params']) 184 previous['result'] = self.eval_result 185 previous['collated'] = True 186 return previous 187 188 def finished(self): 189 if self.eval_result is None: 190 return False 191 else: 192 return True 193 194 def finalise(self): 195 pass 196 197 def succeeded(self): 198 if not self.finished(): 199 raise RuntimeError('action did not finish yet') 200 else: 201 return True 202 203 204class ExecuteLocal(): 205 def __init__(self, full_cmd, stdout=None, stderr=None): 206 self.full_cmd = full_cmd.split() 207 self.popen_object = None 208 self.ret = None 209 self._started = False 210 self.stdout = stdout 211 self.stderr = stderr 212 213 # Fix for the Matlab command, do not split: -r 'test(); quit' 214 # matlab -nodesktop -nojvm -r 'test(); quit'" 215 if "-r" in self.full_cmd: 216 start = full_cmd.find("-r ") 217 cmd = full_cmd[:start].split() 218 cmd.append(full_cmd[start:]) 219 self.full_cmd = cmd 220 logging.info(f"Updating the split of the command for the Matlab's -r argument.") 221 logging.info(f"Full command reads: {cmd}") 222 223 def start(self, previous=None): 224 target_dir = previous['rundir'] 225 if isinstance(self.stdout, str): 226 stdout = open(os.path.join(target_dir, self.stdout), 'w') 227 else: 228 stdout = self.stdout 229 if isinstance(self.stderr, str): 230 stderr = open(os.path.join(target_dir, self.stderr), 'w') 231 else: 232 stderr = self.stderr 233 self.ret = subprocess.run( 234 self.full_cmd, cwd=target_dir, 235 stdout=stdout, stderr=stderr) 236 if isinstance(self.stdout, str): close(stdout) 237 if isinstance(self.stderr, str): close(stderr) 238 return previous 239 240 def finished(self): 241 return True 242 243 def finalise(self): 244 """Performs clean-up if necessary. In this case it isn't. I think. 245 """ 246 stdout.close() 247 stderr.close() 248 pass 249 250 def succeeded(self): 251 """Will return True if the process finished successfully. 252 It judges based on the return code and will return False 253 if that code is not zero. 254 """ 255 if self.ret != 0: 256 return False 257 else: 258 return True 259 260 261class Actions(): 262 def __init__(self, *args): 263 self.actions = list(args) 264 self.wrapper = lambda action, previous: action.start(previous) 265 266 def set_wrapper(self, wrapper): 267 """Adds a wrapper to be called on each Action. 268 269 Parameters 270 ---------- 271 wrapper: callable 272 A function to call on each Action. Should pass through the return of the 273 start method. 274 """ 275 self.wrapper = wrapper 276 277 def start(self, previous=None): 278 for action in self.actions: 279 if not hasattr(action, 'start'): 280 raise RuntimeError('action in the actions list does not provide a start method') 281 previous = copy.copy(previous) 282 run_id = previous['run_id'] 283 for action in self.actions: 284 previous = self.wrapper(action, previous) 285 self.result = previous 286 assert (self.result['run_id'] == run_id) 287 return previous 288 289 def finished(self): 290 return all([action.finished() for action in self.actions]) 291 292 def finalise(self): 293 for action in self.actions: 294 action.finalise() 295 296 def succeeded(self): 297 return all([action.succeeded() for action in self.actions])
17def local_execute(encoder, command, decoder, root='/tmp'): 18 """A helper function for a simple local execution. 19 It will create a directory under your specified root folder, encode the sampler output, execute a command 20 and decode the results of the simulation. 21 22 Parameters 23 ---------- 24 encoder: Encoder 25 an encoder to use 26 command: list of str 27 a command to run your simulation (same as argument to popen, e.g. ['ls', '-al']) 28 decoder: Decoder 29 a decoder to use 30 root: str 31 root folder, for example '/tmp' or if you want to use ram based filesystem it could be '/dev/shm' 32 33 Returns 34 ------- 35 Actions 36 """ 37 return Actions( 38 CreateRunDirectory(root), 39 Encode(encoder), 40 ExecuteLocal(command), 41 Decode(decoder))
A helper function for a simple local execution. It will create a directory under your specified root folder, encode the sampler output, execute a command and decode the results of the simulation.
Parameters
- encoder (Encoder): an encoder to use
- command (list of str): a command to run your simulation (same as argument to popen, e.g. ['ls', '-al'])
- decoder (Decoder): a decoder to use
- root (str): root folder, for example '/tmp' or if you want to use ram based filesystem it could be '/dev/shm'
Returns
- Actions
44class CreateRunDirectory(): 45 """Creates a directory structure for storing simulation input and output files. 46 47 Parameters 48 ---------- 49 root: str 50 Root directory to create a directory structure in. 51 flatten: bool 52 If set to True will result in a flat directory structure (each run gets a directory 53 under root). If left as False will create a hierarchical structure. This is useful 54 so as not to overload the filesystem. 55 """ 56 57 def __init__(self, root, flatten=False): 58 self.root = root 59 self.flatten = flatten 60 61 def start(self, previous=None): 62 """Starts the action. 63 64 Will read a `run_id` from a dictionary supplied by the previous `Action`. 65 Will then create a directory structure based on the numerical value of the `run_id`. 66 67 Returns 68 ------- 69 dict 70 A dictionary to be passed to the following `Action`. 71 """ 72 run_id = previous['run_id'] 73 level1_a, level1_b = (int(run_id / 100 ** 4) * 100 ** 4, 74 int(run_id / 100 ** 4 + 1) * 100 ** 4) 75 level2_a, level2_b = (int(run_id / 100 ** 3) * 100 ** 3, 76 int(run_id / 100 ** 3 + 1) * 100 ** 3) 77 level3_a, level3_b = (int(run_id / 100 ** 2) * 100 ** 2, 78 int(run_id / 100 ** 2 + 1) * 100 ** 2) 79 level4_a, level4_b = (int(run_id / 100 ** 1) * 100 ** 1, 80 int(run_id / 100 ** 1 + 1) * 100 ** 1) 81 level1_dir = "runs_{}-{}/".format(level1_a, level1_b) 82 level2_dir = "runs_{}-{}/".format(level2_a, level2_b) 83 level3_dir = "runs_{}-{}/".format(level3_a, level3_b) 84 level4_dir = "runs_{}-{}/".format(level4_a, level4_b) 85 level5_dir = "run_{}".format(int(run_id)) 86 87 if self.flatten: 88 path = os.path.join(self.root, previous['campaign_dir'], 'runs', level5_dir) 89 else: 90 path = os.path.join(self.root, previous['campaign_dir'], 'runs', 91 level1_dir, level2_dir, level3_dir, level4_dir, level5_dir) 92 93 Path(path).mkdir(parents=True, exist_ok=True) 94 previous['rundir'] = path 95 self.result = previous 96 return self.result 97 98 def succeeded(self): 99 """Has the `Action` finished successfully. 100 101 Returns 102 ------- 103 bool 104 True if `Action` completed successfully. False otherwise. 105 """ 106 return True
Creates a directory structure for storing simulation input and output files.
Parameters
- root (str): Root directory to create a directory structure in.
- flatten (bool): If set to True will result in a flat directory structure (each run gets a directory under root). If left as False will create a hierarchical structure. This is useful so as not to overload the filesystem.
61 def start(self, previous=None): 62 """Starts the action. 63 64 Will read a `run_id` from a dictionary supplied by the previous `Action`. 65 Will then create a directory structure based on the numerical value of the `run_id`. 66 67 Returns 68 ------- 69 dict 70 A dictionary to be passed to the following `Action`. 71 """ 72 run_id = previous['run_id'] 73 level1_a, level1_b = (int(run_id / 100 ** 4) * 100 ** 4, 74 int(run_id / 100 ** 4 + 1) * 100 ** 4) 75 level2_a, level2_b = (int(run_id / 100 ** 3) * 100 ** 3, 76 int(run_id / 100 ** 3 + 1) * 100 ** 3) 77 level3_a, level3_b = (int(run_id / 100 ** 2) * 100 ** 2, 78 int(run_id / 100 ** 2 + 1) * 100 ** 2) 79 level4_a, level4_b = (int(run_id / 100 ** 1) * 100 ** 1, 80 int(run_id / 100 ** 1 + 1) * 100 ** 1) 81 level1_dir = "runs_{}-{}/".format(level1_a, level1_b) 82 level2_dir = "runs_{}-{}/".format(level2_a, level2_b) 83 level3_dir = "runs_{}-{}/".format(level3_a, level3_b) 84 level4_dir = "runs_{}-{}/".format(level4_a, level4_b) 85 level5_dir = "run_{}".format(int(run_id)) 86 87 if self.flatten: 88 path = os.path.join(self.root, previous['campaign_dir'], 'runs', level5_dir) 89 else: 90 path = os.path.join(self.root, previous['campaign_dir'], 'runs', 91 level1_dir, level2_dir, level3_dir, level4_dir, level5_dir) 92 93 Path(path).mkdir(parents=True, exist_ok=True) 94 previous['rundir'] = path 95 self.result = previous 96 return self.result
Starts the action.
Will read a run_id from a dictionary supplied by the previous Action.
Will then create a directory structure based on the numerical value of the run_id.
Returns
- dict: A dictionary to be passed to the following
Action.
98 def succeeded(self): 99 """Has the `Action` finished successfully. 100 101 Returns 102 ------- 103 bool 104 True if `Action` completed successfully. False otherwise. 105 """ 106 return True
Has the Action finished successfully.
Returns
- bool: True if
Actioncompleted successfully. False otherwise.
109class Encode(): 110 def __init__(self, encoder): 111 self.encoder = encoder 112 113 def start(self, previous=None): 114 self.encoder.encode( 115 params=previous['run_info']['params'], 116 target_dir=previous['rundir']) 117 try: 118 previous['encoder_filename'] = self.encoder.target_filename 119 except AttributeError: 120 if os.getenv("EasyVVUQ_Debug"): print('AttributeError raised and ignored') 121 return previous 122 123 def finished(self): 124 return True 125 126 def finalise(self): 127 pass 128 129 def succeeeded(self): 130 return True
113 def start(self, previous=None): 114 self.encoder.encode( 115 params=previous['run_info']['params'], 116 target_dir=previous['rundir']) 117 try: 118 previous['encoder_filename'] = self.encoder.target_filename 119 except AttributeError: 120 if os.getenv("EasyVVUQ_Debug"): print('AttributeError raised and ignored') 121 return previous
133class Decode(): 134 def __init__(self, decoder): 135 self.decoder = decoder 136 137 def start(self, previous=None): 138 run_info = copy.copy(previous['run_info']) 139 run_info['run_dir'] = previous['rundir'] 140 result = self.decoder.parse_sim_output(run_info) 141 previous['result'] = result 142 previous['decoder_filename'] = self.decoder.target_filename 143 previous['collated'] = True 144 return previous 145 146 def finished(self): 147 return True 148 149 def finalise(self): 150 pass 151 152 def succeeded(self): 153 return True
137 def start(self, previous=None): 138 run_info = copy.copy(previous['run_info']) 139 run_info['run_dir'] = previous['rundir'] 140 result = self.decoder.parse_sim_output(run_info) 141 previous['result'] = result 142 previous['decoder_filename'] = self.decoder.target_filename 143 previous['collated'] = True 144 return previous
156class CleanUp(): 157 def __init__(self): 158 pass 159 160 def start(self, previous=None): 161 if not ('rundir' in previous.keys()): 162 raise RuntimeError('must be used with actions that create a directory structure') 163 shutil.rmtree(previous['rundir']) 164 return previous 165 166 def finished(self): 167 return True 168 169 def finalise(self): 170 pass 171 172 def succeeded(self): 173 return True
176class ExecutePython(): 177 def __init__(self, function): 178 self.function = dill.dumps(function) 179 self.params = None 180 self.eval_result = None 181 182 def start(self, previous=None): 183 function = dill.loads(self.function) 184 self.eval_result = function(previous['run_info']['params']) 185 previous['result'] = self.eval_result 186 previous['collated'] = True 187 return previous 188 189 def finished(self): 190 if self.eval_result is None: 191 return False 192 else: 193 return True 194 195 def finalise(self): 196 pass 197 198 def succeeded(self): 199 if not self.finished(): 200 raise RuntimeError('action did not finish yet') 201 else: 202 return True
205class ExecuteLocal(): 206 def __init__(self, full_cmd, stdout=None, stderr=None): 207 self.full_cmd = full_cmd.split() 208 self.popen_object = None 209 self.ret = None 210 self._started = False 211 self.stdout = stdout 212 self.stderr = stderr 213 214 # Fix for the Matlab command, do not split: -r 'test(); quit' 215 # matlab -nodesktop -nojvm -r 'test(); quit'" 216 if "-r" in self.full_cmd: 217 start = full_cmd.find("-r ") 218 cmd = full_cmd[:start].split() 219 cmd.append(full_cmd[start:]) 220 self.full_cmd = cmd 221 logging.info(f"Updating the split of the command for the Matlab's -r argument.") 222 logging.info(f"Full command reads: {cmd}") 223 224 def start(self, previous=None): 225 target_dir = previous['rundir'] 226 if isinstance(self.stdout, str): 227 stdout = open(os.path.join(target_dir, self.stdout), 'w') 228 else: 229 stdout = self.stdout 230 if isinstance(self.stderr, str): 231 stderr = open(os.path.join(target_dir, self.stderr), 'w') 232 else: 233 stderr = self.stderr 234 self.ret = subprocess.run( 235 self.full_cmd, cwd=target_dir, 236 stdout=stdout, stderr=stderr) 237 if isinstance(self.stdout, str): close(stdout) 238 if isinstance(self.stderr, str): close(stderr) 239 return previous 240 241 def finished(self): 242 return True 243 244 def finalise(self): 245 """Performs clean-up if necessary. In this case it isn't. I think. 246 """ 247 stdout.close() 248 stderr.close() 249 pass 250 251 def succeeded(self): 252 """Will return True if the process finished successfully. 253 It judges based on the return code and will return False 254 if that code is not zero. 255 """ 256 if self.ret != 0: 257 return False 258 else: 259 return True
206 def __init__(self, full_cmd, stdout=None, stderr=None): 207 self.full_cmd = full_cmd.split() 208 self.popen_object = None 209 self.ret = None 210 self._started = False 211 self.stdout = stdout 212 self.stderr = stderr 213 214 # Fix for the Matlab command, do not split: -r 'test(); quit' 215 # matlab -nodesktop -nojvm -r 'test(); quit'" 216 if "-r" in self.full_cmd: 217 start = full_cmd.find("-r ") 218 cmd = full_cmd[:start].split() 219 cmd.append(full_cmd[start:]) 220 self.full_cmd = cmd 221 logging.info(f"Updating the split of the command for the Matlab's -r argument.") 222 logging.info(f"Full command reads: {cmd}")
224 def start(self, previous=None): 225 target_dir = previous['rundir'] 226 if isinstance(self.stdout, str): 227 stdout = open(os.path.join(target_dir, self.stdout), 'w') 228 else: 229 stdout = self.stdout 230 if isinstance(self.stderr, str): 231 stderr = open(os.path.join(target_dir, self.stderr), 'w') 232 else: 233 stderr = self.stderr 234 self.ret = subprocess.run( 235 self.full_cmd, cwd=target_dir, 236 stdout=stdout, stderr=stderr) 237 if isinstance(self.stdout, str): close(stdout) 238 if isinstance(self.stderr, str): close(stderr) 239 return previous
244 def finalise(self): 245 """Performs clean-up if necessary. In this case it isn't. I think. 246 """ 247 stdout.close() 248 stderr.close() 249 pass
Performs clean-up if necessary. In this case it isn't. I think.
251 def succeeded(self): 252 """Will return True if the process finished successfully. 253 It judges based on the return code and will return False 254 if that code is not zero. 255 """ 256 if self.ret != 0: 257 return False 258 else: 259 return True
Will return True if the process finished successfully. It judges based on the return code and will return False if that code is not zero.
262class Actions(): 263 def __init__(self, *args): 264 self.actions = list(args) 265 self.wrapper = lambda action, previous: action.start(previous) 266 267 def set_wrapper(self, wrapper): 268 """Adds a wrapper to be called on each Action. 269 270 Parameters 271 ---------- 272 wrapper: callable 273 A function to call on each Action. Should pass through the return of the 274 start method. 275 """ 276 self.wrapper = wrapper 277 278 def start(self, previous=None): 279 for action in self.actions: 280 if not hasattr(action, 'start'): 281 raise RuntimeError('action in the actions list does not provide a start method') 282 previous = copy.copy(previous) 283 run_id = previous['run_id'] 284 for action in self.actions: 285 previous = self.wrapper(action, previous) 286 self.result = previous 287 assert (self.result['run_id'] == run_id) 288 return previous 289 290 def finished(self): 291 return all([action.finished() for action in self.actions]) 292 293 def finalise(self): 294 for action in self.actions: 295 action.finalise() 296 297 def succeeded(self): 298 return all([action.succeeded() for action in self.actions])
267 def set_wrapper(self, wrapper): 268 """Adds a wrapper to be called on each Action. 269 270 Parameters 271 ---------- 272 wrapper: callable 273 A function to call on each Action. Should pass through the return of the 274 start method. 275 """ 276 self.wrapper = wrapper
Adds a wrapper to be called on each Action.
Parameters
- wrapper (callable): A function to call on each Action. Should pass through the return of the start method.
278 def start(self, previous=None): 279 for action in self.actions: 280 if not hasattr(action, 'start'): 281 raise RuntimeError('action in the actions list does not provide a start method') 282 previous = copy.copy(previous) 283 run_id = previous['run_id'] 284 for action in self.actions: 285 previous = self.wrapper(action, previous) 286 self.result = previous 287 assert (self.result['run_id'] == run_id) 288 return previous