easyvvuq.actions.execute_kubernetes
Provides an action element to execute a simulation on a Kubernetes cluster and retrieve the output. The successful use of this actions requires that the Kubernetes cluster is properly set-up on the users system. Namely the ~/.kube/config file should contain valid information. Exact details will depend on the cloud service provider. Otherwise this action works similarly to how ExecuteLocal works. The difference is that the simulations are executed on a Kubernetes cluster. The input files are passed to the Pods via the ConfigMap mechanism. This probably limits the size of the configuration files but this can be alleviated with some kind of a pre-processing script on the Pod side. Likewise, output from the simulation is retrieved using the Kubernetes log mechanism. Therefore the simulation output needs to be printed to stdout on the Pod side. Again, if the simulation produces complicated or large output you should extract the quantitities of interest on the Pod using some kind of script and print them to stdout.
Examples
1"""Provides an action element to execute a simulation on a Kubernetes 2cluster and retrieve the output. The successful use of this actions 3requires that the Kubernetes cluster is properly set-up on the users 4system. Namely the ~/.kube/config file should contain valid 5information. Exact details will depend on the cloud service 6provider. Otherwise this action works similarly to how ExecuteLocal 7works. The difference is that the simulations are executed on a 8Kubernetes cluster. The input files are passed to the Pods via the 9ConfigMap mechanism. This probably limits the size of the 10configuration files but this can be alleviated with some kind of a 11pre-processing script on the Pod side. Likewise, output from the 12simulation is retrieved using the Kubernetes log mechanism. Therefore 13the simulation output needs to be printed to stdout on the Pod 14side. Again, if the simulation produces complicated or large output 15you should extract the quantitities of interest on the Pod using some 16kind of script and print them to stdout. 17 18Examples 19-------- 20 21""" 22 23import os 24import logging 25import uuid 26import copy 27import time 28 29from kubernetes.client.api import core_v1_api 30from kubernetes import config 31from kubernetes.client import V1ConfigMap, V1ObjectMeta 32 33__copyright__ = """ 34 35 Copyright 2020 Vytautas Jancauskas 36 37 This file is part of EasyVVUQ 38 39 EasyVVUQ is free software: you can redistribute it and/or modify 40 it under the terms of the Lesser GNU General Public License as published by 41 the Free Software Foundation, either version 3 of the License, or 42 (at your option) any later version. 43 44 EasyVVUQ is distributed in the hope that it will be useful, 45 but WITHOUT ANY WARRANTY; without even the implied warranty of 46 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the 47 Lesser GNU General Public License for more details. 48 49 You should have received a copy of the Lesser GNU General Public License 50 along with this program. If not, see <https://www.gnu.org/licenses/>. 51 52""" 53__license__ = "LGPL" 54 55logger = logging.getLogger(__name__) 56 57 58class ExecuteKubernetes(): 59 """ 60 61 Parameters 62 ---------- 63 image: str 64 Name of the repository e.g. orbitfold/easyvvuq:tagname. 65 command: str 66 A command to run the simulation from within the container. 67 input_file_names: list 68 A list of input files. 69 output_file_names: list 70 A list of output files. 71 """ 72 73 def __init__(self, image, command, input_file_names=None, output_file_name=None): 74 pod_name = str(uuid.uuid4()) 75 container_name = str(uuid.uuid4()) 76 self.body = { 77 'apiVersion': 'v1', 'kind': 'Pod', 'metadata': {'name': pod_name}, 78 'spec': { 79 'restartPolicy': 'Never', 80 'containers': [ 81 { 82 'name': container_name, 83 'image': image, 84 'command': ['/bin/sh', '-c'], 85 'args': [command] 86 } 87 ] 88 } 89 } 90 self.input_file_names = input_file_names 91 self.output_file_name = output_file_name 92 config.load_kube_config() 93 self.core_v1 = core_v1_api.CoreV1Api() 94 self.pod_name = self.body['metadata']['name'] 95 self.namespace = "default" 96 self._succeeded = False 97 self._started = False 98 99 def start(self, previous=None): 100 """Will create the Kubernetes pod and hence start the action. 101 102 Parameters 103 ---------- 104 previous: dict 105 Data from previous Action. 106 107 Returns 108 ------- 109 dict 110 Data from previous Action appended with data from this Action. 111 """ 112 target_dir = previous['rundir'] 113 if self.input_file_names is None: 114 self.input_file_names = [previous['encoder_filename']] 115 if self.output_file_name is None: 116 self.output_file_name = previous['decoder_filename'] 117 file_names = [(os.path.join(target_dir, input_file_name), str(uuid.uuid4())) 118 for input_file_name in self.input_file_names] 119 self.config_names = file_names 120 dep = copy.deepcopy(self.body) 121 dep['metadata']['name'] = str(uuid.uuid4()) 122 self.create_config_maps(self.config_names) 123 self.create_volumes(self.config_names, dep) 124 self.core_v1.create_namespaced_pod(body=dep, namespace="default") 125 self._started = True 126 self.result = previous 127 while not self.finished(): 128 time.wait(5) 129 self.finalise() 130 return previous 131 132 def finished(self): 133 """Will return True if the pod has finished, otherwise will return False. 134 """ 135 resp = self.core_v1.read_namespaced_pod( 136 name=self.pod_name, namespace=self.namespace) 137 if resp.status.phase not in ['Pending', 'Running']: 138 if resp.status.phase == 'Succeeded': 139 self._succeeded = True 140 return True 141 else: 142 return False 143 144 def finalise(self): 145 """Will read the logs from the Kubernetes pod, output them to a file and 146 delete the Kubernetes resources we have allocated. 147 """ 148 if not (self.finished() and self.succeeded()): 149 raise RuntimeError("Cannot finalise an Action that hasn't finished.") 150 log_ = self.core_v1.read_namespaced_pod_log( 151 self.pod_name, namespace=self.namespace) 152 with open(self.outfile, 'w') as fd: 153 fd.write(log_) 154 for _, id_ in self.config_names: 155 self.core_v1.delete_namespaced_config_map( 156 id_, namespace=self.namespace) 157 self.core_v1.delete_namespaced_pod( 158 self.pod_name, namespace=self.namespace) 159 160 def succeeded(self): 161 """Will return True if the pod has finished successfully, otherwise will return False. 162 If the job hasn't finished yet will return False. 163 """ 164 return self._succeeded 165 166 def create_volumes(self, file_names, dep): 167 """Create descriptions of Volumes that will hold the input files. 168 169 Parameters 170 ---------- 171 filenames: list 172 A list of file names to be mounted under /config/ in the running image. 173 """ 174 volumes = [{'name': id_ + '-volume', 'configMap': {'name': id_}} 175 for _, id_ in file_names] 176 volume_mounts = [{'name': id_ + '-volume', 177 'mountPath': os.path.join('/config/', os.path.basename(file_name)), 178 'subPath': os.path.basename(file_name), 179 'readOnly': True} 180 for file_name, id_ in file_names] 181 dep['spec']['volumes'] = volumes 182 dep['spec']['containers'][0]['volumeMounts'] = volume_mounts 183 184 def create_config_maps(self, file_names): 185 """Create Kubernetes ConfigMaps for the input files to the simulation. 186 187 Parameters 188 ---------- 189 file_names: list 190 Will go through every filename in this list and create a Kubernetes 191 ConfigMap with it's contents. 192 """ 193 for file_name, id_ in file_names: 194 with open(file_name, 'r') as fd: 195 data = fd.read() 196 metadata = V1ObjectMeta( 197 name=id_, 198 namespace='default' 199 ) 200 configmap = V1ConfigMap( 201 api_version='v1', 202 kind='ConfigMap', 203 data={os.path.basename(file_name): data}, 204 metadata=metadata 205 ) 206 self.core_v1.create_namespaced_config_map(namespace='default', body=configmap)
59class ExecuteKubernetes(): 60 """ 61 62 Parameters 63 ---------- 64 image: str 65 Name of the repository e.g. orbitfold/easyvvuq:tagname. 66 command: str 67 A command to run the simulation from within the container. 68 input_file_names: list 69 A list of input files. 70 output_file_names: list 71 A list of output files. 72 """ 73 74 def __init__(self, image, command, input_file_names=None, output_file_name=None): 75 pod_name = str(uuid.uuid4()) 76 container_name = str(uuid.uuid4()) 77 self.body = { 78 'apiVersion': 'v1', 'kind': 'Pod', 'metadata': {'name': pod_name}, 79 'spec': { 80 'restartPolicy': 'Never', 81 'containers': [ 82 { 83 'name': container_name, 84 'image': image, 85 'command': ['/bin/sh', '-c'], 86 'args': [command] 87 } 88 ] 89 } 90 } 91 self.input_file_names = input_file_names 92 self.output_file_name = output_file_name 93 config.load_kube_config() 94 self.core_v1 = core_v1_api.CoreV1Api() 95 self.pod_name = self.body['metadata']['name'] 96 self.namespace = "default" 97 self._succeeded = False 98 self._started = False 99 100 def start(self, previous=None): 101 """Will create the Kubernetes pod and hence start the action. 102 103 Parameters 104 ---------- 105 previous: dict 106 Data from previous Action. 107 108 Returns 109 ------- 110 dict 111 Data from previous Action appended with data from this Action. 112 """ 113 target_dir = previous['rundir'] 114 if self.input_file_names is None: 115 self.input_file_names = [previous['encoder_filename']] 116 if self.output_file_name is None: 117 self.output_file_name = previous['decoder_filename'] 118 file_names = [(os.path.join(target_dir, input_file_name), str(uuid.uuid4())) 119 for input_file_name in self.input_file_names] 120 self.config_names = file_names 121 dep = copy.deepcopy(self.body) 122 dep['metadata']['name'] = str(uuid.uuid4()) 123 self.create_config_maps(self.config_names) 124 self.create_volumes(self.config_names, dep) 125 self.core_v1.create_namespaced_pod(body=dep, namespace="default") 126 self._started = True 127 self.result = previous 128 while not self.finished(): 129 time.wait(5) 130 self.finalise() 131 return previous 132 133 def finished(self): 134 """Will return True if the pod has finished, otherwise will return False. 135 """ 136 resp = self.core_v1.read_namespaced_pod( 137 name=self.pod_name, namespace=self.namespace) 138 if resp.status.phase not in ['Pending', 'Running']: 139 if resp.status.phase == 'Succeeded': 140 self._succeeded = True 141 return True 142 else: 143 return False 144 145 def finalise(self): 146 """Will read the logs from the Kubernetes pod, output them to a file and 147 delete the Kubernetes resources we have allocated. 148 """ 149 if not (self.finished() and self.succeeded()): 150 raise RuntimeError("Cannot finalise an Action that hasn't finished.") 151 log_ = self.core_v1.read_namespaced_pod_log( 152 self.pod_name, namespace=self.namespace) 153 with open(self.outfile, 'w') as fd: 154 fd.write(log_) 155 for _, id_ in self.config_names: 156 self.core_v1.delete_namespaced_config_map( 157 id_, namespace=self.namespace) 158 self.core_v1.delete_namespaced_pod( 159 self.pod_name, namespace=self.namespace) 160 161 def succeeded(self): 162 """Will return True if the pod has finished successfully, otherwise will return False. 163 If the job hasn't finished yet will return False. 164 """ 165 return self._succeeded 166 167 def create_volumes(self, file_names, dep): 168 """Create descriptions of Volumes that will hold the input files. 169 170 Parameters 171 ---------- 172 filenames: list 173 A list of file names to be mounted under /config/ in the running image. 174 """ 175 volumes = [{'name': id_ + '-volume', 'configMap': {'name': id_}} 176 for _, id_ in file_names] 177 volume_mounts = [{'name': id_ + '-volume', 178 'mountPath': os.path.join('/config/', os.path.basename(file_name)), 179 'subPath': os.path.basename(file_name), 180 'readOnly': True} 181 for file_name, id_ in file_names] 182 dep['spec']['volumes'] = volumes 183 dep['spec']['containers'][0]['volumeMounts'] = volume_mounts 184 185 def create_config_maps(self, file_names): 186 """Create Kubernetes ConfigMaps for the input files to the simulation. 187 188 Parameters 189 ---------- 190 file_names: list 191 Will go through every filename in this list and create a Kubernetes 192 ConfigMap with it's contents. 193 """ 194 for file_name, id_ in file_names: 195 with open(file_name, 'r') as fd: 196 data = fd.read() 197 metadata = V1ObjectMeta( 198 name=id_, 199 namespace='default' 200 ) 201 configmap = V1ConfigMap( 202 api_version='v1', 203 kind='ConfigMap', 204 data={os.path.basename(file_name): data}, 205 metadata=metadata 206 ) 207 self.core_v1.create_namespaced_config_map(namespace='default', body=configmap)
Parameters
- image (str): Name of the repository e.g. orbitfold/easyvvuq:tagname.
- command (str): A command to run the simulation from within the container.
- input_file_names (list): A list of input files.
- output_file_names (list): A list of output files.
74 def __init__(self, image, command, input_file_names=None, output_file_name=None): 75 pod_name = str(uuid.uuid4()) 76 container_name = str(uuid.uuid4()) 77 self.body = { 78 'apiVersion': 'v1', 'kind': 'Pod', 'metadata': {'name': pod_name}, 79 'spec': { 80 'restartPolicy': 'Never', 81 'containers': [ 82 { 83 'name': container_name, 84 'image': image, 85 'command': ['/bin/sh', '-c'], 86 'args': [command] 87 } 88 ] 89 } 90 } 91 self.input_file_names = input_file_names 92 self.output_file_name = output_file_name 93 config.load_kube_config() 94 self.core_v1 = core_v1_api.CoreV1Api() 95 self.pod_name = self.body['metadata']['name'] 96 self.namespace = "default" 97 self._succeeded = False 98 self._started = False
100 def start(self, previous=None): 101 """Will create the Kubernetes pod and hence start the action. 102 103 Parameters 104 ---------- 105 previous: dict 106 Data from previous Action. 107 108 Returns 109 ------- 110 dict 111 Data from previous Action appended with data from this Action. 112 """ 113 target_dir = previous['rundir'] 114 if self.input_file_names is None: 115 self.input_file_names = [previous['encoder_filename']] 116 if self.output_file_name is None: 117 self.output_file_name = previous['decoder_filename'] 118 file_names = [(os.path.join(target_dir, input_file_name), str(uuid.uuid4())) 119 for input_file_name in self.input_file_names] 120 self.config_names = file_names 121 dep = copy.deepcopy(self.body) 122 dep['metadata']['name'] = str(uuid.uuid4()) 123 self.create_config_maps(self.config_names) 124 self.create_volumes(self.config_names, dep) 125 self.core_v1.create_namespaced_pod(body=dep, namespace="default") 126 self._started = True 127 self.result = previous 128 while not self.finished(): 129 time.wait(5) 130 self.finalise() 131 return previous
Will create the Kubernetes pod and hence start the action.
Parameters
- previous (dict): Data from previous Action.
Returns
- dict: Data from previous Action appended with data from this Action.
133 def finished(self): 134 """Will return True if the pod has finished, otherwise will return False. 135 """ 136 resp = self.core_v1.read_namespaced_pod( 137 name=self.pod_name, namespace=self.namespace) 138 if resp.status.phase not in ['Pending', 'Running']: 139 if resp.status.phase == 'Succeeded': 140 self._succeeded = True 141 return True 142 else: 143 return False
Will return True if the pod has finished, otherwise will return False.
145 def finalise(self): 146 """Will read the logs from the Kubernetes pod, output them to a file and 147 delete the Kubernetes resources we have allocated. 148 """ 149 if not (self.finished() and self.succeeded()): 150 raise RuntimeError("Cannot finalise an Action that hasn't finished.") 151 log_ = self.core_v1.read_namespaced_pod_log( 152 self.pod_name, namespace=self.namespace) 153 with open(self.outfile, 'w') as fd: 154 fd.write(log_) 155 for _, id_ in self.config_names: 156 self.core_v1.delete_namespaced_config_map( 157 id_, namespace=self.namespace) 158 self.core_v1.delete_namespaced_pod( 159 self.pod_name, namespace=self.namespace)
Will read the logs from the Kubernetes pod, output them to a file and delete the Kubernetes resources we have allocated.
161 def succeeded(self): 162 """Will return True if the pod has finished successfully, otherwise will return False. 163 If the job hasn't finished yet will return False. 164 """ 165 return self._succeeded
Will return True if the pod has finished successfully, otherwise will return False. If the job hasn't finished yet will return False.
167 def create_volumes(self, file_names, dep): 168 """Create descriptions of Volumes that will hold the input files. 169 170 Parameters 171 ---------- 172 filenames: list 173 A list of file names to be mounted under /config/ in the running image. 174 """ 175 volumes = [{'name': id_ + '-volume', 'configMap': {'name': id_}} 176 for _, id_ in file_names] 177 volume_mounts = [{'name': id_ + '-volume', 178 'mountPath': os.path.join('/config/', os.path.basename(file_name)), 179 'subPath': os.path.basename(file_name), 180 'readOnly': True} 181 for file_name, id_ in file_names] 182 dep['spec']['volumes'] = volumes 183 dep['spec']['containers'][0]['volumeMounts'] = volume_mounts
Create descriptions of Volumes that will hold the input files.
Parameters
- filenames (list): A list of file names to be mounted under /config/ in the running image.
185 def create_config_maps(self, file_names): 186 """Create Kubernetes ConfigMaps for the input files to the simulation. 187 188 Parameters 189 ---------- 190 file_names: list 191 Will go through every filename in this list and create a Kubernetes 192 ConfigMap with it's contents. 193 """ 194 for file_name, id_ in file_names: 195 with open(file_name, 'r') as fd: 196 data = fd.read() 197 metadata = V1ObjectMeta( 198 name=id_, 199 namespace='default' 200 ) 201 configmap = V1ConfigMap( 202 api_version='v1', 203 kind='ConfigMap', 204 data={os.path.basename(file_name): data}, 205 metadata=metadata 206 ) 207 self.core_v1.create_namespaced_config_map(namespace='default', body=configmap)
Create Kubernetes ConfigMaps for the input files to the simulation.
Parameters
- file_names (list): Will go through every filename in this list and create a Kubernetes ConfigMap with it's contents.