easyvvuq.actions.execute_kubernetes

Provides an action element to execute a simulation on a Kubernetes cluster and retrieve the output. The successful use of this actions requires that the Kubernetes cluster is properly set-up on the users system. Namely the ~/.kube/config file should contain valid information. Exact details will depend on the cloud service provider. Otherwise this action works similarly to how ExecuteLocal works. The difference is that the simulations are executed on a Kubernetes cluster. The input files are passed to the Pods via the ConfigMap mechanism. This probably limits the size of the configuration files but this can be alleviated with some kind of a pre-processing script on the Pod side. Likewise, output from the simulation is retrieved using the Kubernetes log mechanism. Therefore the simulation output needs to be printed to stdout on the Pod side. Again, if the simulation produces complicated or large output you should extract the quantitities of interest on the Pod using some kind of script and print them to stdout.

Examples

  1"""Provides an action element to execute a simulation on a Kubernetes
  2cluster and retrieve the output. The successful use of this actions
  3requires that the Kubernetes cluster is properly set-up on the users
  4system. Namely the ~/.kube/config file should contain valid
  5information. Exact details will depend on the cloud service
  6provider. Otherwise this action works similarly to how ExecuteLocal
  7works. The difference is that the simulations are executed on a
  8Kubernetes cluster. The input files are passed to the Pods via the
  9ConfigMap mechanism. This probably limits the size of the
 10configuration files but this can be alleviated with some kind of a
 11pre-processing script on the Pod side. Likewise, output from the
 12simulation is retrieved using the Kubernetes log mechanism. Therefore
 13the simulation output needs to be printed to stdout on the Pod
 14side. Again, if the simulation produces complicated or large output
 15you should extract the quantitities of interest on the Pod using some
 16kind of script and print them to stdout.
 17
 18Examples
 19--------
 20
 21"""
 22
 23import os
 24import logging
 25import uuid
 26import copy
 27import time
 28
 29from kubernetes.client.api import core_v1_api
 30from kubernetes import config
 31from kubernetes.client import V1ConfigMap, V1ObjectMeta
 32
 33__copyright__ = """
 34
 35    Copyright 2020 Vytautas Jancauskas
 36
 37    This file is part of EasyVVUQ
 38
 39    EasyVVUQ is free software: you can redistribute it and/or modify
 40    it under the terms of the Lesser GNU General Public License as published by
 41    the Free Software Foundation, either version 3 of the License, or
 42    (at your option) any later version.
 43
 44    EasyVVUQ is distributed in the hope that it will be useful,
 45    but WITHOUT ANY WARRANTY; without even the implied warranty of
 46    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 47    Lesser GNU General Public License for more details.
 48
 49    You should have received a copy of the Lesser GNU General Public License
 50    along with this program.  If not, see <https://www.gnu.org/licenses/>.
 51
 52"""
 53__license__ = "LGPL"
 54
 55logger = logging.getLogger(__name__)
 56
 57
 58class ExecuteKubernetes():
 59    """
 60
 61    Parameters
 62    ----------
 63    image: str
 64        Name of the repository e.g. orbitfold/easyvvuq:tagname.
 65    command: str
 66        A command to run the simulation from within the container.
 67    input_file_names: list
 68        A list of input files.
 69    output_file_names: list
 70        A list of output files.
 71    """
 72
 73    def __init__(self, image, command, input_file_names=None, output_file_name=None):
 74        pod_name = str(uuid.uuid4())
 75        container_name = str(uuid.uuid4())
 76        self.body = {
 77            'apiVersion': 'v1', 'kind': 'Pod', 'metadata': {'name': pod_name},
 78            'spec': {
 79                'restartPolicy': 'Never',
 80                'containers': [
 81                    {
 82                        'name': container_name,
 83                        'image': image,
 84                        'command': ['/bin/sh', '-c'],
 85                        'args': [command]
 86                    }
 87                ]
 88            }
 89        }
 90        self.input_file_names = input_file_names
 91        self.output_file_name = output_file_name
 92        config.load_kube_config()
 93        self.core_v1 = core_v1_api.CoreV1Api()
 94        self.pod_name = self.body['metadata']['name']
 95        self.namespace = "default"
 96        self._succeeded = False
 97        self._started = False
 98
 99    def start(self, previous=None):
100        """Will create the Kubernetes pod and hence start the action.
101
102        Parameters
103        ----------
104        previous: dict
105            Data from previous Action.
106
107        Returns
108        -------
109        dict
110            Data from previous Action appended with data from this Action.
111        """
112        target_dir = previous['rundir']
113        if self.input_file_names is None:
114            self.input_file_names = [previous['encoder_filename']]
115        if self.output_file_name is None:
116            self.output_file_name = previous['decoder_filename']
117        file_names = [(os.path.join(target_dir, input_file_name), str(uuid.uuid4()))
118                      for input_file_name in self.input_file_names]
119        self.config_names = file_names
120        dep = copy.deepcopy(self.body)
121        dep['metadata']['name'] = str(uuid.uuid4())
122        self.create_config_maps(self.config_names)
123        self.create_volumes(self.config_names, dep)
124        self.core_v1.create_namespaced_pod(body=dep, namespace="default")
125        self._started = True
126        self.result = previous
127        while not self.finished():
128            time.wait(5)
129        self.finalise()
130        return previous
131
132    def finished(self):
133        """Will return True if the pod has finished, otherwise will return False.
134        """
135        resp = self.core_v1.read_namespaced_pod(
136            name=self.pod_name, namespace=self.namespace)
137        if resp.status.phase not in ['Pending', 'Running']:
138            if resp.status.phase == 'Succeeded':
139                self._succeeded = True
140            return True
141        else:
142            return False
143
144    def finalise(self):
145        """Will read the logs from the Kubernetes pod, output them to a file and
146        delete the Kubernetes resources we have allocated.
147        """
148        if not (self.finished() and self.succeeded()):
149            raise RuntimeError("Cannot finalise an Action that hasn't finished.")
150        log_ = self.core_v1.read_namespaced_pod_log(
151            self.pod_name, namespace=self.namespace)
152        with open(self.outfile, 'w') as fd:
153            fd.write(log_)
154        for _, id_ in self.config_names:
155            self.core_v1.delete_namespaced_config_map(
156                id_, namespace=self.namespace)
157        self.core_v1.delete_namespaced_pod(
158            self.pod_name, namespace=self.namespace)
159
160    def succeeded(self):
161        """Will return True if the pod has finished successfully, otherwise will return False.
162        If the job hasn't finished yet will return False.
163        """
164        return self._succeeded
165
166    def create_volumes(self, file_names, dep):
167        """Create descriptions of Volumes that will hold the input files.
168
169        Parameters
170        ----------
171        filenames: list
172            A list of file names to be mounted under /config/ in the running image.
173        """
174        volumes = [{'name': id_ + '-volume', 'configMap': {'name': id_}}
175                   for _, id_ in file_names]
176        volume_mounts = [{'name': id_ + '-volume',
177                          'mountPath': os.path.join('/config/', os.path.basename(file_name)),
178                          'subPath': os.path.basename(file_name),
179                          'readOnly': True}
180                         for file_name, id_ in file_names]
181        dep['spec']['volumes'] = volumes
182        dep['spec']['containers'][0]['volumeMounts'] = volume_mounts
183
184    def create_config_maps(self, file_names):
185        """Create Kubernetes ConfigMaps for the input files to the simulation.
186
187        Parameters
188        ----------
189        file_names: list
190            Will go through every filename in this list and create a Kubernetes
191            ConfigMap with it's contents.
192        """
193        for file_name, id_ in file_names:
194            with open(file_name, 'r') as fd:
195                data = fd.read()
196            metadata = V1ObjectMeta(
197                name=id_,
198                namespace='default'
199            )
200            configmap = V1ConfigMap(
201                api_version='v1',
202                kind='ConfigMap',
203                data={os.path.basename(file_name): data},
204                metadata=metadata
205            )
206            self.core_v1.create_namespaced_config_map(namespace='default', body=configmap)
logger = <Logger easyvvuq.actions.execute_kubernetes (DEBUG)>
class ExecuteKubernetes:
 59class ExecuteKubernetes():
 60    """
 61
 62    Parameters
 63    ----------
 64    image: str
 65        Name of the repository e.g. orbitfold/easyvvuq:tagname.
 66    command: str
 67        A command to run the simulation from within the container.
 68    input_file_names: list
 69        A list of input files.
 70    output_file_names: list
 71        A list of output files.
 72    """
 73
 74    def __init__(self, image, command, input_file_names=None, output_file_name=None):
 75        pod_name = str(uuid.uuid4())
 76        container_name = str(uuid.uuid4())
 77        self.body = {
 78            'apiVersion': 'v1', 'kind': 'Pod', 'metadata': {'name': pod_name},
 79            'spec': {
 80                'restartPolicy': 'Never',
 81                'containers': [
 82                    {
 83                        'name': container_name,
 84                        'image': image,
 85                        'command': ['/bin/sh', '-c'],
 86                        'args': [command]
 87                    }
 88                ]
 89            }
 90        }
 91        self.input_file_names = input_file_names
 92        self.output_file_name = output_file_name
 93        config.load_kube_config()
 94        self.core_v1 = core_v1_api.CoreV1Api()
 95        self.pod_name = self.body['metadata']['name']
 96        self.namespace = "default"
 97        self._succeeded = False
 98        self._started = False
 99
100    def start(self, previous=None):
101        """Will create the Kubernetes pod and hence start the action.
102
103        Parameters
104        ----------
105        previous: dict
106            Data from previous Action.
107
108        Returns
109        -------
110        dict
111            Data from previous Action appended with data from this Action.
112        """
113        target_dir = previous['rundir']
114        if self.input_file_names is None:
115            self.input_file_names = [previous['encoder_filename']]
116        if self.output_file_name is None:
117            self.output_file_name = previous['decoder_filename']
118        file_names = [(os.path.join(target_dir, input_file_name), str(uuid.uuid4()))
119                      for input_file_name in self.input_file_names]
120        self.config_names = file_names
121        dep = copy.deepcopy(self.body)
122        dep['metadata']['name'] = str(uuid.uuid4())
123        self.create_config_maps(self.config_names)
124        self.create_volumes(self.config_names, dep)
125        self.core_v1.create_namespaced_pod(body=dep, namespace="default")
126        self._started = True
127        self.result = previous
128        while not self.finished():
129            time.wait(5)
130        self.finalise()
131        return previous
132
133    def finished(self):
134        """Will return True if the pod has finished, otherwise will return False.
135        """
136        resp = self.core_v1.read_namespaced_pod(
137            name=self.pod_name, namespace=self.namespace)
138        if resp.status.phase not in ['Pending', 'Running']:
139            if resp.status.phase == 'Succeeded':
140                self._succeeded = True
141            return True
142        else:
143            return False
144
145    def finalise(self):
146        """Will read the logs from the Kubernetes pod, output them to a file and
147        delete the Kubernetes resources we have allocated.
148        """
149        if not (self.finished() and self.succeeded()):
150            raise RuntimeError("Cannot finalise an Action that hasn't finished.")
151        log_ = self.core_v1.read_namespaced_pod_log(
152            self.pod_name, namespace=self.namespace)
153        with open(self.outfile, 'w') as fd:
154            fd.write(log_)
155        for _, id_ in self.config_names:
156            self.core_v1.delete_namespaced_config_map(
157                id_, namespace=self.namespace)
158        self.core_v1.delete_namespaced_pod(
159            self.pod_name, namespace=self.namespace)
160
161    def succeeded(self):
162        """Will return True if the pod has finished successfully, otherwise will return False.
163        If the job hasn't finished yet will return False.
164        """
165        return self._succeeded
166
167    def create_volumes(self, file_names, dep):
168        """Create descriptions of Volumes that will hold the input files.
169
170        Parameters
171        ----------
172        filenames: list
173            A list of file names to be mounted under /config/ in the running image.
174        """
175        volumes = [{'name': id_ + '-volume', 'configMap': {'name': id_}}
176                   for _, id_ in file_names]
177        volume_mounts = [{'name': id_ + '-volume',
178                          'mountPath': os.path.join('/config/', os.path.basename(file_name)),
179                          'subPath': os.path.basename(file_name),
180                          'readOnly': True}
181                         for file_name, id_ in file_names]
182        dep['spec']['volumes'] = volumes
183        dep['spec']['containers'][0]['volumeMounts'] = volume_mounts
184
185    def create_config_maps(self, file_names):
186        """Create Kubernetes ConfigMaps for the input files to the simulation.
187
188        Parameters
189        ----------
190        file_names: list
191            Will go through every filename in this list and create a Kubernetes
192            ConfigMap with it's contents.
193        """
194        for file_name, id_ in file_names:
195            with open(file_name, 'r') as fd:
196                data = fd.read()
197            metadata = V1ObjectMeta(
198                name=id_,
199                namespace='default'
200            )
201            configmap = V1ConfigMap(
202                api_version='v1',
203                kind='ConfigMap',
204                data={os.path.basename(file_name): data},
205                metadata=metadata
206            )
207            self.core_v1.create_namespaced_config_map(namespace='default', body=configmap)
Parameters
  • image (str): Name of the repository e.g. orbitfold/easyvvuq:tagname.
  • command (str): A command to run the simulation from within the container.
  • input_file_names (list): A list of input files.
  • output_file_names (list): A list of output files.
ExecuteKubernetes(image, command, input_file_names=None, output_file_name=None)
74    def __init__(self, image, command, input_file_names=None, output_file_name=None):
75        pod_name = str(uuid.uuid4())
76        container_name = str(uuid.uuid4())
77        self.body = {
78            'apiVersion': 'v1', 'kind': 'Pod', 'metadata': {'name': pod_name},
79            'spec': {
80                'restartPolicy': 'Never',
81                'containers': [
82                    {
83                        'name': container_name,
84                        'image': image,
85                        'command': ['/bin/sh', '-c'],
86                        'args': [command]
87                    }
88                ]
89            }
90        }
91        self.input_file_names = input_file_names
92        self.output_file_name = output_file_name
93        config.load_kube_config()
94        self.core_v1 = core_v1_api.CoreV1Api()
95        self.pod_name = self.body['metadata']['name']
96        self.namespace = "default"
97        self._succeeded = False
98        self._started = False
body
input_file_names
output_file_name
core_v1
pod_name
namespace
def start(self, previous=None):
100    def start(self, previous=None):
101        """Will create the Kubernetes pod and hence start the action.
102
103        Parameters
104        ----------
105        previous: dict
106            Data from previous Action.
107
108        Returns
109        -------
110        dict
111            Data from previous Action appended with data from this Action.
112        """
113        target_dir = previous['rundir']
114        if self.input_file_names is None:
115            self.input_file_names = [previous['encoder_filename']]
116        if self.output_file_name is None:
117            self.output_file_name = previous['decoder_filename']
118        file_names = [(os.path.join(target_dir, input_file_name), str(uuid.uuid4()))
119                      for input_file_name in self.input_file_names]
120        self.config_names = file_names
121        dep = copy.deepcopy(self.body)
122        dep['metadata']['name'] = str(uuid.uuid4())
123        self.create_config_maps(self.config_names)
124        self.create_volumes(self.config_names, dep)
125        self.core_v1.create_namespaced_pod(body=dep, namespace="default")
126        self._started = True
127        self.result = previous
128        while not self.finished():
129            time.wait(5)
130        self.finalise()
131        return previous

Will create the Kubernetes pod and hence start the action.

Parameters
  • previous (dict): Data from previous Action.
Returns
  • dict: Data from previous Action appended with data from this Action.
def finished(self):
133    def finished(self):
134        """Will return True if the pod has finished, otherwise will return False.
135        """
136        resp = self.core_v1.read_namespaced_pod(
137            name=self.pod_name, namespace=self.namespace)
138        if resp.status.phase not in ['Pending', 'Running']:
139            if resp.status.phase == 'Succeeded':
140                self._succeeded = True
141            return True
142        else:
143            return False

Will return True if the pod has finished, otherwise will return False.

def finalise(self):
145    def finalise(self):
146        """Will read the logs from the Kubernetes pod, output them to a file and
147        delete the Kubernetes resources we have allocated.
148        """
149        if not (self.finished() and self.succeeded()):
150            raise RuntimeError("Cannot finalise an Action that hasn't finished.")
151        log_ = self.core_v1.read_namespaced_pod_log(
152            self.pod_name, namespace=self.namespace)
153        with open(self.outfile, 'w') as fd:
154            fd.write(log_)
155        for _, id_ in self.config_names:
156            self.core_v1.delete_namespaced_config_map(
157                id_, namespace=self.namespace)
158        self.core_v1.delete_namespaced_pod(
159            self.pod_name, namespace=self.namespace)

Will read the logs from the Kubernetes pod, output them to a file and delete the Kubernetes resources we have allocated.

def succeeded(self):
161    def succeeded(self):
162        """Will return True if the pod has finished successfully, otherwise will return False.
163        If the job hasn't finished yet will return False.
164        """
165        return self._succeeded

Will return True if the pod has finished successfully, otherwise will return False. If the job hasn't finished yet will return False.

def create_volumes(self, file_names, dep):
167    def create_volumes(self, file_names, dep):
168        """Create descriptions of Volumes that will hold the input files.
169
170        Parameters
171        ----------
172        filenames: list
173            A list of file names to be mounted under /config/ in the running image.
174        """
175        volumes = [{'name': id_ + '-volume', 'configMap': {'name': id_}}
176                   for _, id_ in file_names]
177        volume_mounts = [{'name': id_ + '-volume',
178                          'mountPath': os.path.join('/config/', os.path.basename(file_name)),
179                          'subPath': os.path.basename(file_name),
180                          'readOnly': True}
181                         for file_name, id_ in file_names]
182        dep['spec']['volumes'] = volumes
183        dep['spec']['containers'][0]['volumeMounts'] = volume_mounts

Create descriptions of Volumes that will hold the input files.

Parameters
  • filenames (list): A list of file names to be mounted under /config/ in the running image.
def create_config_maps(self, file_names):
185    def create_config_maps(self, file_names):
186        """Create Kubernetes ConfigMaps for the input files to the simulation.
187
188        Parameters
189        ----------
190        file_names: list
191            Will go through every filename in this list and create a Kubernetes
192            ConfigMap with it's contents.
193        """
194        for file_name, id_ in file_names:
195            with open(file_name, 'r') as fd:
196                data = fd.read()
197            metadata = V1ObjectMeta(
198                name=id_,
199                namespace='default'
200            )
201            configmap = V1ConfigMap(
202                api_version='v1',
203                kind='ConfigMap',
204                data={os.path.basename(file_name): data},
205                metadata=metadata
206            )
207            self.core_v1.create_namespaced_config_map(namespace='default', body=configmap)

Create Kubernetes ConfigMaps for the input files to the simulation.

Parameters
  • file_names (list): Will go through every filename in this list and create a Kubernetes ConfigMap with it's contents.