easyvvuq.analysis.pce_analysis

Analysis element for polynomial chaos expansion (PCE). We use ChaosPy under the hood for this functionality.

  1"""Analysis element for polynomial chaos expansion (PCE). We use ChaosPy
  2under the hood for this functionality.
  3"""
  4import logging
  5import chaospy as cp
  6import numpy as np
  7import numpoly
  8import warnings
  9from easyvvuq import OutputType
 10from .base import BaseAnalysisElement
 11from .results import AnalysisResults
 12from .qmc_analysis import QMCAnalysisResults
 13import traceback
 14
 15__author__ = 'Jalal Lakhlili'
 16__license__ = "LGPL"
 17
 18logger = logging.getLogger(__name__)
 19
 20
 21class PCEAnalysisResults(QMCAnalysisResults):
 22    """Analysis results for the PCEAnalysis class.
 23    """
 24
 25    def _get_derivatives_first(self, qoi, input_):
 26        """Returns the first order derivative-based index for a given qoi wrt input variable.
 27
 28        Parameters
 29        ----------
 30        qoi : str
 31           Quantity of interest
 32        input_ : str
 33           Input variable
 34
 35        Returns
 36        -------
 37        float
 38            First order derivative-based index.
 39        """
 40        raw_dict = AnalysisResults._keys_to_tuples(self.raw_data['derivatives_first'])
 41        return raw_dict[AnalysisResults._to_tuple(qoi)][input_]
 42
 43    def _get_sobols_first(self, qoi, input_):
 44        """Returns the first order sobol index for a given qoi wrt input variable.
 45
 46        Parameters
 47        ----------
 48        qoi : str
 49           Quantity of interest
 50        input_ : str
 51           Input variable
 52
 53        Returns
 54        -------
 55        float
 56            First order sobol index.
 57        """
 58        raw_dict = AnalysisResults._keys_to_tuples(self.raw_data['sobols_first'])
 59        return raw_dict[AnalysisResults._to_tuple(qoi)][input_]
 60
 61    def _get_sobols_second(self, qoi, input_):
 62        """Returns the second order sobol index for a given qoi wrt input variable.
 63
 64        Parameters
 65        ----------
 66        qoi : str
 67           Quantity of interest
 68        input_ : str
 69           Input variable
 70
 71        Returns
 72        -------
 73        float
 74            Second order sobol index.
 75        """
 76        raw_dict = AnalysisResults._keys_to_tuples(self.raw_data['sobols_second'])
 77        return dict([(in_, raw_dict[AnalysisResults._to_tuple(qoi)][input_][i])
 78                     for i, in_ in enumerate(self.inputs) if in_ != input_])
 79
 80    def _get_sobols_total(self, qoi, input_):
 81        """Returns the total order sobol index for a given qoi wrt input variable.
 82
 83        Parameters
 84        ----------
 85        qoi : str
 86           Quantity of interest
 87        input_ : str
 88           Input variable
 89
 90        Returns
 91        -------
 92        float
 93            Total order sobol index.
 94        """
 95        raw_dict = AnalysisResults._keys_to_tuples(self.raw_data['sobols_total'])
 96        return raw_dict[AnalysisResults._to_tuple(qoi)][input_]
 97
 98    def supported_stats(self):
 99        """Types of statistics supported by the describe method.
100
101        Returns
102        -------
103        list of str
104        """
105        return ['min', 'max', '10%', '90%', '1%', '99%', 'median',
106                'mean', 'var', 'std']
107
108    def _describe(self, qoi, statistic):
109        """Returns descriptive statistics, similar to pandas describe.
110
111        Parameters
112        ----------
113        qoi : str
114            Name of quantity of interest.
115        statistic : str
116            One of 'min', 'max', '10%', '90%', 'median', 'mean', 'var', 'std'
117
118        Returns
119        -------
120        float
121            Value of the requested statistic.
122        """
123        if statistic not in self.supported_stats():
124            raise NotImplementedError
125        if statistic == 'min':
126            return np.array([v.lower[0] for _, v in enumerate(
127                self.raw_data['output_distributions'][qoi])]) if self.raw_data['output_distributions'][qoi] is not None else np.array([])
128        elif statistic == 'max':
129            return np.array([v.upper[0] for _, v in enumerate(
130                self.raw_data['output_distributions'][qoi])]) if self.raw_data['output_distributions'][qoi] is not None else np.array([])
131        elif statistic == '1%':
132            if isinstance(self.raw_data['percentiles'][qoi]['p01'], np.ndarray):
133                return self.raw_data['percentiles'][qoi]['p01']
134            else:
135                return np.array([self.raw_data['percentiles'][qoi]['p01']])
136        elif statistic == '10%':
137            if isinstance(self.raw_data['percentiles'][qoi]['p10'], np.ndarray):
138                return self.raw_data['percentiles'][qoi]['p10']
139            else:
140                return np.array([self.raw_data['percentiles'][qoi]['p10']])
141        elif statistic == '90%':
142            if isinstance(self.raw_data['percentiles'][qoi]['p90'], np.ndarray):
143                return self.raw_data['percentiles'][qoi]['p90']
144            else:
145                return np.array([self.raw_data['percentiles'][qoi]['p90']])
146        elif statistic == '99%':
147            if isinstance(self.raw_data['percentiles'][qoi]['p99'], np.ndarray):
148                return self.raw_data['percentiles'][qoi]['p99']
149            else:
150                return np.array([self.raw_data['percentiles'][qoi]['p99']])
151        elif statistic == 'median':
152            if isinstance(self.raw_data['percentiles'][qoi]['p50'], np.ndarray):
153                return self.raw_data['percentiles'][qoi]['p50']
154            else:
155                return np.array([self.raw_data['percentiles'][qoi]['p50']])
156        else:
157            try:
158                return self.raw_data['statistical_moments'][qoi][statistic]
159            except KeyError:
160                raise NotImplementedError
161
162    def surrogate(self):
163        """Return a PCE surrogate model.
164
165        Returns
166        -------
167        A function that takes a dictionary of parameter - value pairs and returns
168        a dictionary with the results (same output as decoder).
169        """
170        def surrogate_fn(inputs):
171            def swap(x):
172                if len(x) > 1:
173                    return list(x)
174                else:
175                    return x[0]
176            values = np.array([inputs[key] for key in self.inputs])
177            results = dict([(qoi, swap((self.raw_data['fit'][qoi](*values)).T))
178                           for qoi in self.qois])
179            return results
180        return surrogate_fn
181
182    def get_distribution(self, qoi):
183        """Returns a distribution for the given qoi.
184
185        Parameters
186        ----------
187        qoi: str
188            QoI name
189
190        Returns
191        -------
192        A ChaosPy PDF
193        """
194        if qoi not in self.qois:
195            raise RuntimeError('no such quantity of interest - {}'.format(qoi))
196        return self.raw_data['output_distributions'][qoi]
197
198
199class PCEAnalysis(BaseAnalysisElement):
200
201    def __init__(self, sampler=None, qoi_cols=None, sampling=False, CorrelationMatrices=True, OutputDistributions=True):
202        """Analysis element for polynomial chaos expansion (PCE).
203
204        Parameters
205        ----------
206        sampler : PCESampler
207            Sampler used to initiate the PCE analysis.
208        qoi_cols : list or None
209            Column names for quantities of interest (for which analysis is
210            performed).
211        sampling : True if chaospy sampling method to be used for calculating
212            statistical quantities; otherwise [default] the pce coefficients are used
213        CorrelationMatrices : boolean
214            if False then disable the calculation of the Correlation Matrices, otherwise 
215            [default] calculate them
216        OutputDistributions : boolean
217            if False then disable the calculation of the Output Distributions, otherwise
218            [default] calculate them
219        """
220
221        if sampler is None:
222            msg = 'PCE analysis requires a paired sampler to be passed'
223            raise RuntimeError(msg)
224
225        # Flag specifing if we should scale the runs with the nominal base run
226        self.relative_analysis = sampler.relative_analysis
227
228        if qoi_cols is None:
229            raise RuntimeError("Analysis element requires a list of "
230                               "quantities of interest (qoi)")
231
232        self.qoi_cols = qoi_cols
233        self.sampling = sampling
234        self.output_type = OutputType.SUMMARY
235        self.sampler = sampler
236        self.CorrelationMatrices = CorrelationMatrices
237        self.CorrelationMatrices_messages = 0
238        self.OutputDistributions = OutputDistributions
239        self.OutputDistributions_messages = 0
240
241    def element_name(self):
242        """Name for this element for logging purposes.
243
244        Returns
245        -------
246        str
247            "PCE_Analysis"
248        """
249        return "PCE_Analysis"
250
251    def element_version(self):
252        """Version of this element for logging purposes.
253
254        Returns  ## print out the warning only once
255                    self.OutputDistributions_messages += 1
256        -------
257        str
258            Element version.
259        """
260        return "0.6"
261
262    def analyse(self, data_frame=None):
263        """Perform PCE analysis on input `data_frame`.
264
265        Parameters
266        ----------
267        data_frame : pandas DataFrame
268            Input data for analysis.
269
270        Returns
271        -------
272        PCEAnalysisResults
273            Use it to get the sobol indices and other information.
274        """
275
276        def sobols(P, coefficients):
277            """ Utility routine to calculate sobols based on coefficients
278            """
279            A = np.array(P.coefficients) != 0
280            multi_indices = np.array([P.exponents[A[:, i]].sum(axis=0) for i in range(A.shape[1])])
281            sobol_mask = multi_indices != 0
282            _, index = np.unique(sobol_mask, axis=0, return_index=True)
283            index = np.sort(index)
284            sobol_idx_bool = sobol_mask[index]
285            sobol_idx_bool = np.delete(sobol_idx_bool, [0], axis=0)
286            n_sobol_available = sobol_idx_bool.shape[0]
287            if len(coefficients.shape) == 1:
288                n_out = 1
289            else:
290                n_out = coefficients.shape[1]
291            n_coeffs = coefficients.shape[0]
292            sobol_poly_idx = np.zeros([n_coeffs, n_sobol_available])
293            for i_sobol in range(n_sobol_available):
294                sobol_poly_idx[:, i_sobol] = np.all(sobol_mask == sobol_idx_bool[i_sobol], axis=1)
295            sobol = np.zeros([n_sobol_available, n_out])
296            for i_sobol in range(n_sobol_available):
297                sobol[i_sobol] = np.sum(
298                    np.square(coefficients[sobol_poly_idx[:, i_sobol] == 1]), axis=0)
299            idx_sort_descend_1st = np.argsort(sobol[:, 0], axis=0)[::-1]
300            sobol = sobol[idx_sort_descend_1st, :]
301            sobol_idx_bool = sobol_idx_bool[idx_sort_descend_1st]
302            sobol_idx = [0 for _ in range(sobol_idx_bool.shape[0])]
303            for i_sobol in range(sobol_idx_bool.shape[0]):
304                sobol_idx[i_sobol] = np.array(
305                    [i for i, x in enumerate(sobol_idx_bool[i_sobol, :]) if x])
306            var = ((coefficients[1:]**2).sum(axis=0))
307            sobol = sobol / (var + np.finfo(float).tiny)
308            return sobol, sobol_idx, sobol_idx_bool
309
310        def build_surrogate_der(Y_hat, verbose=False):
311            '''Computes derivative of the polynomial Y_hat w.r.t. Vars
312            Parameter T specifies the time dimension
313            '''
314
315            # Build derivative with respect to all variables
316            dim = len(self.sampler.vary.vary_dict)
317            if dim < 1:
318                return 0
319            elif dim == 1:
320                Vars = [cp.variable(dim).names[0]]
321            else:
322                Vars = [v.names[0] for v in cp.variable(dim)]
323
324            T = len(Y_hat)
325
326            assert(len(Vars) == len(self.sampler.vary.vary_dict))
327
328            # derivative of the PCE expansion
329            # {dYhat_dx1: [t0, t1, ...],
330            #  dYhat_dx2: [t0, t1, ...],
331            #  ...,
332            #  dYhat_dxN: [t0, t1, ...] }
333            dY_hat = {v:[cp.polynomial(0) for t in range(T)] for v in self.sampler.vary.vary_dict}
334
335            for t in range(T):
336
337                for n1, n2 in zip(Y_hat[t].names, Vars):
338                    assert(n1 == n2)
339
340                for d_var_idx, (d_var, d_var_app) in enumerate(zip(Vars, self.sampler.vary.vary_dict)):
341
342                    if verbose:
343                        print(f'Computing derivative d(Y_hat)/d({d_var})')
344                        print('='*40)
345
346                    # Some variables are missing in the expression,
347                    # then they must be constant terms only i.e. sum(exp==0)
348                    if Y_hat[t].exponents.shape[1] < dim:
349                        #exponents.shape: (n_summands, n_variables)
350                        assert(sum(sum(np.array(Y_hat[t].exponents))) == 0)
351                        continue
352
353                    # Consider only polynomial components var^exp where exp > 0 (since the derivative decreases exp by -1)
354                    components_mask = np.array(Y_hat[t].exponents[:,d_var_idx] > 0)
355                    dY_hat_dvar_exp = Y_hat[t].exponents[components_mask]
356                    dY_hat_dvar_coeff = np.array(Y_hat[t].coefficients)[components_mask]
357
358                    # Iterate over all polynomial components (summands)
359                    for i, (coeff, exp) in enumerate(zip(dY_hat_dvar_coeff, dY_hat_dvar_exp)):
360                        assert(exp[d_var_idx] > 0)
361
362                        # derivative = coeff*exp * var^(exp-1)
363                        dY_hat_dvar_coeff[i] = coeff * exp[d_var_idx]
364                        dY_hat_dvar_exp[i][d_var_idx] = exp[d_var_idx] - 1
365
366                    dY_hat[d_var_app][t] = numpoly.construct.polynomial_from_attributes(
367                                exponents=dY_hat_dvar_exp,
368                                coefficients=dY_hat_dvar_coeff,
369                                names=Y_hat[t].names,
370                                retain_coefficients=True,
371                                retain_names=True)
372
373            return dY_hat
374
375        if data_frame is None:
376            raise RuntimeError("Analysis element needs a data frame to "
377                               "analyse")
378        elif data_frame.empty:
379            raise RuntimeError(
380                "No data in data frame passed to analyse element")
381
382        qoi_cols = self.qoi_cols
383        T = len(data_frame[qoi_cols[0]].values[-1])
384
385        results = {'statistical_moments': {k: {'mean':np.zeros(T),
386                                               'var':np.zeros(T),
387                                               'std':np.zeros(T)} for k in qoi_cols},
388                   'percentiles': {k: {'p01': np.zeros(T),
389                                       'p10': np.zeros(T),
390                                       'p50': np.zeros(T),
391                                       'p90': np.zeros(T),
392                                       'p99': np.zeros(T)} for k in qoi_cols},
393                   'sobols_first': {k: {p: np.zeros(T) for p in self.sampler.vary.vary_dict} for k in qoi_cols},
394                   'sobols_second': {k: {p: np.zeros(T) for p in self.sampler.vary.vary_dict} for k in qoi_cols},
395                   'sobols_total': {k: {p: np.zeros(T) for p in self.sampler.vary.vary_dict} for k in qoi_cols},
396                   'correlation_matrices': {k: {} for k in qoi_cols},
397                   'output_distributions': {k: {} for k in qoi_cols},
398                   'fit': {k: cp.polynomial(np.zeros(T)) for k in qoi_cols},
399                   'Fourier_coefficients': {k: {p: np.zeros(T) for p in self.sampler.vary.vary_dict} for k in qoi_cols},
400                   'derivatives_first': {k: {p: np.zeros(T) for p in self.sampler.vary.vary_dict} for k in qoi_cols},
401                   }
402
403        # Get sampler informations
404        P = self.sampler.P
405        nodes = self.sampler._nodes
406        weights = self.sampler._weights
407        regression = self.sampler.regression
408
409        samples = {k: [] for k in qoi_cols}
410        for k in qoi_cols:
411            if self.relative_analysis:
412                base = data_frame[k].values[self.sampler.n_samples]
413                if np.all(np.array(base) == 0):
414                    warnings.warn(f"Removing QoI {k} from the analysis, contains all zeros", RuntimeWarning)
415                    continue
416                if np.any(np.array(base) == 0):
417                    warnings.warn(f"Removing QoI {k} from the analysis, contains some zeros", RuntimeWarning)
418                    continue
419
420            samples[k] = data_frame[k].values[:self.sampler.n_samples]
421
422            # Compute descriptive statistics for each quantity of interest
423            if regression:
424                fit, fc = cp.fit_regression(P, [n[:self.sampler.n_samples] for n in nodes], samples[k], retall=1)
425            else:
426                fit, fc = cp.fit_quadrature(P, nodes, weights, samples[k], retall=1)
427            results['fit'][k] = fit
428            results['Fourier_coefficients'][k] = fc
429
430            # Percentiles: 1%, 10%, 50%, 90% and 99%
431            P01, P10, P50, P90, P99 = cp.Perc(
432                fit, [1, 10, 50, 90, 99], self.sampler.distribution).squeeze()
433            results['percentiles'][k] = {'p01': P01, 'p10': P10, 'p50': P50, 'p90': P90, 'p99': P99}
434
435            if self.sampling:  # use Chaospy's sampling method
436
437                # Statistical moments
438                mean = cp.E(fit, self.sampler.distribution)
439                var = cp.Var(fit, self.sampler.distribution)
440                std = cp.Std(fit, self.sampler.distribution)
441                results['statistical_moments'][k] = {'mean': mean,
442                                                     'var': var,
443                                                     'std': std}
444
445                sobols_first_narr = cp.Sens_m(fit, self.sampler.distribution)
446                sobols_second_narr = cp.Sens_m2(fit, self.sampler.distribution)
447                sobols_total_narr = cp.Sens_t(fit, self.sampler.distribution)
448                sobols_first_dict = {}
449                sobols_second_dict = {}
450                sobols_total_dict = {}
451                for i, param_name in enumerate(self.sampler.vary.vary_dict):
452                    sobols_first_dict[param_name] = sobols_first_narr[i]
453                    sobols_second_dict[param_name] = sobols_second_narr[i]
454                    sobols_total_dict[param_name] = sobols_total_narr[i]
455
456                results['sobols_first'][k] = sobols_first_dict
457                results['sobols_second'][k] = sobols_second_dict
458                results['sobols_total'][k] = sobols_total_dict
459
460            else:  # use PCE coefficients
461
462                # Statistical moments
463                mean = fc[0]
464                var = np.sum(fc[1:]**2, axis=0)
465                std = np.sqrt(var)
466                results['statistical_moments'][k] = {'mean': mean,
467                                                     'var': var,
468                                                     'std': std}
469
470                # Sensitivity Analysis: First, Second and Total Sobol indices
471                sobol, sobol_idx, _ = sobols(P, fc)
472                varied = [_ for _ in self.sampler.vary.get_keys()]
473                S1 = {_: np.zeros(sobol.shape[-1]) for _ in varied}
474                ST = {_: np.zeros(sobol.shape[-1]) for _ in varied}
475                # S2 = {_ : {__: np.zeros(sobol.shape[-1]) for __ in varied} for _ in varied}
476                # for v in varied: del S2[v][v]
477                S2 = {_: np.zeros((len(varied), sobol.shape[-1])) for _ in varied}
478                for n, si in enumerate(sobol_idx):
479                    if len(si) == 1:
480                        v = varied[si[0]]
481                        S1[v] = sobol[n]
482                    elif len(si) == 2:
483                        v1 = varied[si[0]]
484                        v2 = varied[si[1]]
485                        # S2[v1][v2] = sobol[n]
486                        # S2[v2][v1] = sobol[n]
487                        S2[v1][si[1]] = sobol[n]
488                        S2[v2][si[0]] = sobol[n]
489                    for i in si:
490                        ST[varied[i]] += sobol[n]
491
492                results['sobols_first'][k] = S1
493                results['sobols_second'][k] = S2
494                results['sobols_total'][k] = ST
495
496            # Sensitivity Analysis: Derivative based
497            try:
498                dY_hat = build_surrogate_der(fit, verbose=False)
499                derivatives_first_dict = {}
500                Ndimensions = len(self.sampler.vary.vary_dict)
501                for i, param_name in enumerate(self.sampler.vary.vary_dict):
502                    if self.sampler.nominal_value:
503                        # Evaluate dY_hat['param'] at the nominal value of the parameters
504                        values = self.sampler.nominal_value
505                        logging.info(f"Using nominal value of the parameters to evaluate the derivative ")
506                        derivatives_first_dict[param_name] = cp.polynomial(dY_hat[param_name])(*[v for v in values.values()])
507                    elif all([type(v) == type(cp.Normal()) for v in self.sampler.vary.vary_dict.values()]):
508                        # Evaluate dY_hat['param'] at the mean of the parameters
509                        logging.info(f"Using mean value of the parameters to evaluate the derivative ")
510                        derivatives_first_dict[param_name] = cp.polynomial(dY_hat[param_name])(*[v.get_mom_parameters()["shift"][0] for v in self.sampler.vary.vary_dict.values()])
511                    elif all([type(v) == type(cp.Uniform()) for v in self.sampler.vary.vary_dict.values()]):
512                        logging.info(f"Using mean value of the parameters to evaluate the derivative ")
513                        # Evaluate dY_hat['param'] at the mean of the parameters
514                        derivatives_first_dict[param_name] = cp.polynomial(dY_hat[param_name])(*[(v.lower + v.upper)/2.0 for v in self.sampler.vary.vary_dict.values()])
515                    else:
516                        # Evaluate dY_hat['param'] at the zero vector
517                        logging.info(f"Using zero vector to evaluate the derivative ")
518                        derivatives_first_dict[param_name] = cp.polynomial(dY_hat[param_name])(*np.zeros(Ndimensions))
519
520                    results['derivatives_first'][k] = derivatives_first_dict
521
522            except Exception:
523                traceback.print_exc()
524
525            # Transform the relative numbers back to the absolute values
526            if self.relative_analysis:
527                base = data_frame[k].values[-1]
528
529                results['percentiles'][k]['p01'] = (1.0 + results['percentiles'][k]['p01']) * base
530                results['percentiles'][k]['p10'] = (1.0 + results['percentiles'][k]['p10']) * base
531                results['percentiles'][k]['p50'] = (1.0 + results['percentiles'][k]['p50']) * base
532                results['percentiles'][k]['p90'] = (1.0 + results['percentiles'][k]['p90']) * base
533                results['percentiles'][k]['p99'] = (1.0 + results['percentiles'][k]['p99']) * base
534                results['statistical_moments'][k]['mean'] = (1.0 + results['statistical_moments'][k]['mean']) * base
535                results['statistical_moments'][k]['var']  = (1.0 + results['statistical_moments'][k]['var']) * base
536                results['statistical_moments'][k]['std']  = (1.0 + results['statistical_moments'][k]['std']) * base
537
538            # Correlation matrix
539            try:
540                if self.sampler._is_dependent:
541                    if self.CorrelationMatrices_messages == 0: warnings.warn(f"Skipping computation of cp.Corr", RuntimeWarning)  ## print out the warning only once
542                    self.CorrelationMatrices_messages += 1
543                    results['correlation_matrices'][k] = None
544                else:
545                    if self.CorrelationMatrices:
546                        results['correlation_matrices'][k] = cp.Corr(fit, self.sampler.distribution)
547                    else:
548                        if self.CorrelationMatrices_messages == 0: warnings.warn(f"Skipping computation of cp.Corr", RuntimeWarning)  ## print out the warning only once
549                        self.CorrelationMatrices_messages += 1
550                        results['correlation_matrices'][k] = None
551            except Exception as e:
552                print ('Error %s for %s when computing cp.Corr()'% (e.__class__.__name__, k))
553                results['correlation_matrices'][k] = None
554            
555
556            # Output distributions
557            try:
558                if self.sampler._is_dependent:
559                    if self.OutputDistributions_messages == 0: warnings.warn(f"Skipping computation of cp.QoI_Dist", RuntimeWarning)  ## print out the warning only once
560                    self.OutputDistributions_messages += 1
561                    results['output_distributions'][k] = None
562                else:
563                    if self.OutputDistributions:
564                        results['output_distributions'][k] = cp.QoI_Dist( fit, self.sampler.distribution)
565                    else:
566                        if self.OutputDistributions_messages == 0: warnings.warn(f"Skipping computation of cp.QoI_Dist", RuntimeWarning)  ## print out the warning only once
567                        self.OutputDistributions_messages += 1
568                        results['output_distributions'][k] = None                        
569            except Exception as e:
570                print ('Error %s for %s when computing cp.QoI_Dist()'% (e.__class__.__name__, k))
571#                from traceback import print_exc
572#                print_exc()
573                results['output_distributions'][k] = None
574
575        return PCEAnalysisResults(raw_data=results, samples=data_frame,
576                                  qois=self.qoi_cols, inputs=list(self.sampler.vary.get_keys()))
logger = <Logger easyvvuq.analysis.pce_analysis (DEBUG)>
class PCEAnalysisResults(easyvvuq.analysis.qmc_analysis.QMCAnalysisResults):
 22class PCEAnalysisResults(QMCAnalysisResults):
 23    """Analysis results for the PCEAnalysis class.
 24    """
 25
 26    def _get_derivatives_first(self, qoi, input_):
 27        """Returns the first order derivative-based index for a given qoi wrt input variable.
 28
 29        Parameters
 30        ----------
 31        qoi : str
 32           Quantity of interest
 33        input_ : str
 34           Input variable
 35
 36        Returns
 37        -------
 38        float
 39            First order derivative-based index.
 40        """
 41        raw_dict = AnalysisResults._keys_to_tuples(self.raw_data['derivatives_first'])
 42        return raw_dict[AnalysisResults._to_tuple(qoi)][input_]
 43
 44    def _get_sobols_first(self, qoi, input_):
 45        """Returns the first order sobol index for a given qoi wrt input variable.
 46
 47        Parameters
 48        ----------
 49        qoi : str
 50           Quantity of interest
 51        input_ : str
 52           Input variable
 53
 54        Returns
 55        -------
 56        float
 57            First order sobol index.
 58        """
 59        raw_dict = AnalysisResults._keys_to_tuples(self.raw_data['sobols_first'])
 60        return raw_dict[AnalysisResults._to_tuple(qoi)][input_]
 61
 62    def _get_sobols_second(self, qoi, input_):
 63        """Returns the second order sobol index for a given qoi wrt input variable.
 64
 65        Parameters
 66        ----------
 67        qoi : str
 68           Quantity of interest
 69        input_ : str
 70           Input variable
 71
 72        Returns
 73        -------
 74        float
 75            Second order sobol index.
 76        """
 77        raw_dict = AnalysisResults._keys_to_tuples(self.raw_data['sobols_second'])
 78        return dict([(in_, raw_dict[AnalysisResults._to_tuple(qoi)][input_][i])
 79                     for i, in_ in enumerate(self.inputs) if in_ != input_])
 80
 81    def _get_sobols_total(self, qoi, input_):
 82        """Returns the total order sobol index for a given qoi wrt input variable.
 83
 84        Parameters
 85        ----------
 86        qoi : str
 87           Quantity of interest
 88        input_ : str
 89           Input variable
 90
 91        Returns
 92        -------
 93        float
 94            Total order sobol index.
 95        """
 96        raw_dict = AnalysisResults._keys_to_tuples(self.raw_data['sobols_total'])
 97        return raw_dict[AnalysisResults._to_tuple(qoi)][input_]
 98
 99    def supported_stats(self):
100        """Types of statistics supported by the describe method.
101
102        Returns
103        -------
104        list of str
105        """
106        return ['min', 'max', '10%', '90%', '1%', '99%', 'median',
107                'mean', 'var', 'std']
108
109    def _describe(self, qoi, statistic):
110        """Returns descriptive statistics, similar to pandas describe.
111
112        Parameters
113        ----------
114        qoi : str
115            Name of quantity of interest.
116        statistic : str
117            One of 'min', 'max', '10%', '90%', 'median', 'mean', 'var', 'std'
118
119        Returns
120        -------
121        float
122            Value of the requested statistic.
123        """
124        if statistic not in self.supported_stats():
125            raise NotImplementedError
126        if statistic == 'min':
127            return np.array([v.lower[0] for _, v in enumerate(
128                self.raw_data['output_distributions'][qoi])]) if self.raw_data['output_distributions'][qoi] is not None else np.array([])
129        elif statistic == 'max':
130            return np.array([v.upper[0] for _, v in enumerate(
131                self.raw_data['output_distributions'][qoi])]) if self.raw_data['output_distributions'][qoi] is not None else np.array([])
132        elif statistic == '1%':
133            if isinstance(self.raw_data['percentiles'][qoi]['p01'], np.ndarray):
134                return self.raw_data['percentiles'][qoi]['p01']
135            else:
136                return np.array([self.raw_data['percentiles'][qoi]['p01']])
137        elif statistic == '10%':
138            if isinstance(self.raw_data['percentiles'][qoi]['p10'], np.ndarray):
139                return self.raw_data['percentiles'][qoi]['p10']
140            else:
141                return np.array([self.raw_data['percentiles'][qoi]['p10']])
142        elif statistic == '90%':
143            if isinstance(self.raw_data['percentiles'][qoi]['p90'], np.ndarray):
144                return self.raw_data['percentiles'][qoi]['p90']
145            else:
146                return np.array([self.raw_data['percentiles'][qoi]['p90']])
147        elif statistic == '99%':
148            if isinstance(self.raw_data['percentiles'][qoi]['p99'], np.ndarray):
149                return self.raw_data['percentiles'][qoi]['p99']
150            else:
151                return np.array([self.raw_data['percentiles'][qoi]['p99']])
152        elif statistic == 'median':
153            if isinstance(self.raw_data['percentiles'][qoi]['p50'], np.ndarray):
154                return self.raw_data['percentiles'][qoi]['p50']
155            else:
156                return np.array([self.raw_data['percentiles'][qoi]['p50']])
157        else:
158            try:
159                return self.raw_data['statistical_moments'][qoi][statistic]
160            except KeyError:
161                raise NotImplementedError
162
163    def surrogate(self):
164        """Return a PCE surrogate model.
165
166        Returns
167        -------
168        A function that takes a dictionary of parameter - value pairs and returns
169        a dictionary with the results (same output as decoder).
170        """
171        def surrogate_fn(inputs):
172            def swap(x):
173                if len(x) > 1:
174                    return list(x)
175                else:
176                    return x[0]
177            values = np.array([inputs[key] for key in self.inputs])
178            results = dict([(qoi, swap((self.raw_data['fit'][qoi](*values)).T))
179                           for qoi in self.qois])
180            return results
181        return surrogate_fn
182
183    def get_distribution(self, qoi):
184        """Returns a distribution for the given qoi.
185
186        Parameters
187        ----------
188        qoi: str
189            QoI name
190
191        Returns
192        -------
193        A ChaosPy PDF
194        """
195        if qoi not in self.qois:
196            raise RuntimeError('no such quantity of interest - {}'.format(qoi))
197        return self.raw_data['output_distributions'][qoi]

Analysis results for the PCEAnalysis class.

def supported_stats(self):
 99    def supported_stats(self):
100        """Types of statistics supported by the describe method.
101
102        Returns
103        -------
104        list of str
105        """
106        return ['min', 'max', '10%', '90%', '1%', '99%', 'median',
107                'mean', 'var', 'std']

Types of statistics supported by the describe method.

Returns
  • list of str
def surrogate(self):
163    def surrogate(self):
164        """Return a PCE surrogate model.
165
166        Returns
167        -------
168        A function that takes a dictionary of parameter - value pairs and returns
169        a dictionary with the results (same output as decoder).
170        """
171        def surrogate_fn(inputs):
172            def swap(x):
173                if len(x) > 1:
174                    return list(x)
175                else:
176                    return x[0]
177            values = np.array([inputs[key] for key in self.inputs])
178            results = dict([(qoi, swap((self.raw_data['fit'][qoi](*values)).T))
179                           for qoi in self.qois])
180            return results
181        return surrogate_fn

Return a PCE surrogate model.

Returns
  • A function that takes a dictionary of parameter - value pairs and returns
  • a dictionary with the results (same output as decoder).
def get_distribution(self, qoi):
183    def get_distribution(self, qoi):
184        """Returns a distribution for the given qoi.
185
186        Parameters
187        ----------
188        qoi: str
189            QoI name
190
191        Returns
192        -------
193        A ChaosPy PDF
194        """
195        if qoi not in self.qois:
196            raise RuntimeError('no such quantity of interest - {}'.format(qoi))
197        return self.raw_data['output_distributions'][qoi]

Returns a distribution for the given qoi.

Parameters
  • qoi (str): QoI name
Returns
  • A ChaosPy PDF
class PCEAnalysis(easyvvuq.analysis.base.BaseAnalysisElement):
200class PCEAnalysis(BaseAnalysisElement):
201
202    def __init__(self, sampler=None, qoi_cols=None, sampling=False, CorrelationMatrices=True, OutputDistributions=True):
203        """Analysis element for polynomial chaos expansion (PCE).
204
205        Parameters
206        ----------
207        sampler : PCESampler
208            Sampler used to initiate the PCE analysis.
209        qoi_cols : list or None
210            Column names for quantities of interest (for which analysis is
211            performed).
212        sampling : True if chaospy sampling method to be used for calculating
213            statistical quantities; otherwise [default] the pce coefficients are used
214        CorrelationMatrices : boolean
215            if False then disable the calculation of the Correlation Matrices, otherwise 
216            [default] calculate them
217        OutputDistributions : boolean
218            if False then disable the calculation of the Output Distributions, otherwise
219            [default] calculate them
220        """
221
222        if sampler is None:
223            msg = 'PCE analysis requires a paired sampler to be passed'
224            raise RuntimeError(msg)
225
226        # Flag specifing if we should scale the runs with the nominal base run
227        self.relative_analysis = sampler.relative_analysis
228
229        if qoi_cols is None:
230            raise RuntimeError("Analysis element requires a list of "
231                               "quantities of interest (qoi)")
232
233        self.qoi_cols = qoi_cols
234        self.sampling = sampling
235        self.output_type = OutputType.SUMMARY
236        self.sampler = sampler
237        self.CorrelationMatrices = CorrelationMatrices
238        self.CorrelationMatrices_messages = 0
239        self.OutputDistributions = OutputDistributions
240        self.OutputDistributions_messages = 0
241
242    def element_name(self):
243        """Name for this element for logging purposes.
244
245        Returns
246        -------
247        str
248            "PCE_Analysis"
249        """
250        return "PCE_Analysis"
251
252    def element_version(self):
253        """Version of this element for logging purposes.
254
255        Returns  ## print out the warning only once
256                    self.OutputDistributions_messages += 1
257        -------
258        str
259            Element version.
260        """
261        return "0.6"
262
263    def analyse(self, data_frame=None):
264        """Perform PCE analysis on input `data_frame`.
265
266        Parameters
267        ----------
268        data_frame : pandas DataFrame
269            Input data for analysis.
270
271        Returns
272        -------
273        PCEAnalysisResults
274            Use it to get the sobol indices and other information.
275        """
276
277        def sobols(P, coefficients):
278            """ Utility routine to calculate sobols based on coefficients
279            """
280            A = np.array(P.coefficients) != 0
281            multi_indices = np.array([P.exponents[A[:, i]].sum(axis=0) for i in range(A.shape[1])])
282            sobol_mask = multi_indices != 0
283            _, index = np.unique(sobol_mask, axis=0, return_index=True)
284            index = np.sort(index)
285            sobol_idx_bool = sobol_mask[index]
286            sobol_idx_bool = np.delete(sobol_idx_bool, [0], axis=0)
287            n_sobol_available = sobol_idx_bool.shape[0]
288            if len(coefficients.shape) == 1:
289                n_out = 1
290            else:
291                n_out = coefficients.shape[1]
292            n_coeffs = coefficients.shape[0]
293            sobol_poly_idx = np.zeros([n_coeffs, n_sobol_available])
294            for i_sobol in range(n_sobol_available):
295                sobol_poly_idx[:, i_sobol] = np.all(sobol_mask == sobol_idx_bool[i_sobol], axis=1)
296            sobol = np.zeros([n_sobol_available, n_out])
297            for i_sobol in range(n_sobol_available):
298                sobol[i_sobol] = np.sum(
299                    np.square(coefficients[sobol_poly_idx[:, i_sobol] == 1]), axis=0)
300            idx_sort_descend_1st = np.argsort(sobol[:, 0], axis=0)[::-1]
301            sobol = sobol[idx_sort_descend_1st, :]
302            sobol_idx_bool = sobol_idx_bool[idx_sort_descend_1st]
303            sobol_idx = [0 for _ in range(sobol_idx_bool.shape[0])]
304            for i_sobol in range(sobol_idx_bool.shape[0]):
305                sobol_idx[i_sobol] = np.array(
306                    [i for i, x in enumerate(sobol_idx_bool[i_sobol, :]) if x])
307            var = ((coefficients[1:]**2).sum(axis=0))
308            sobol = sobol / (var + np.finfo(float).tiny)
309            return sobol, sobol_idx, sobol_idx_bool
310
311        def build_surrogate_der(Y_hat, verbose=False):
312            '''Computes derivative of the polynomial Y_hat w.r.t. Vars
313            Parameter T specifies the time dimension
314            '''
315
316            # Build derivative with respect to all variables
317            dim = len(self.sampler.vary.vary_dict)
318            if dim < 1:
319                return 0
320            elif dim == 1:
321                Vars = [cp.variable(dim).names[0]]
322            else:
323                Vars = [v.names[0] for v in cp.variable(dim)]
324
325            T = len(Y_hat)
326
327            assert(len(Vars) == len(self.sampler.vary.vary_dict))
328
329            # derivative of the PCE expansion
330            # {dYhat_dx1: [t0, t1, ...],
331            #  dYhat_dx2: [t0, t1, ...],
332            #  ...,
333            #  dYhat_dxN: [t0, t1, ...] }
334            dY_hat = {v:[cp.polynomial(0) for t in range(T)] for v in self.sampler.vary.vary_dict}
335
336            for t in range(T):
337
338                for n1, n2 in zip(Y_hat[t].names, Vars):
339                    assert(n1 == n2)
340
341                for d_var_idx, (d_var, d_var_app) in enumerate(zip(Vars, self.sampler.vary.vary_dict)):
342
343                    if verbose:
344                        print(f'Computing derivative d(Y_hat)/d({d_var})')
345                        print('='*40)
346
347                    # Some variables are missing in the expression,
348                    # then they must be constant terms only i.e. sum(exp==0)
349                    if Y_hat[t].exponents.shape[1] < dim:
350                        #exponents.shape: (n_summands, n_variables)
351                        assert(sum(sum(np.array(Y_hat[t].exponents))) == 0)
352                        continue
353
354                    # Consider only polynomial components var^exp where exp > 0 (since the derivative decreases exp by -1)
355                    components_mask = np.array(Y_hat[t].exponents[:,d_var_idx] > 0)
356                    dY_hat_dvar_exp = Y_hat[t].exponents[components_mask]
357                    dY_hat_dvar_coeff = np.array(Y_hat[t].coefficients)[components_mask]
358
359                    # Iterate over all polynomial components (summands)
360                    for i, (coeff, exp) in enumerate(zip(dY_hat_dvar_coeff, dY_hat_dvar_exp)):
361                        assert(exp[d_var_idx] > 0)
362
363                        # derivative = coeff*exp * var^(exp-1)
364                        dY_hat_dvar_coeff[i] = coeff * exp[d_var_idx]
365                        dY_hat_dvar_exp[i][d_var_idx] = exp[d_var_idx] - 1
366
367                    dY_hat[d_var_app][t] = numpoly.construct.polynomial_from_attributes(
368                                exponents=dY_hat_dvar_exp,
369                                coefficients=dY_hat_dvar_coeff,
370                                names=Y_hat[t].names,
371                                retain_coefficients=True,
372                                retain_names=True)
373
374            return dY_hat
375
376        if data_frame is None:
377            raise RuntimeError("Analysis element needs a data frame to "
378                               "analyse")
379        elif data_frame.empty:
380            raise RuntimeError(
381                "No data in data frame passed to analyse element")
382
383        qoi_cols = self.qoi_cols
384        T = len(data_frame[qoi_cols[0]].values[-1])
385
386        results = {'statistical_moments': {k: {'mean':np.zeros(T),
387                                               'var':np.zeros(T),
388                                               'std':np.zeros(T)} for k in qoi_cols},
389                   'percentiles': {k: {'p01': np.zeros(T),
390                                       'p10': np.zeros(T),
391                                       'p50': np.zeros(T),
392                                       'p90': np.zeros(T),
393                                       'p99': np.zeros(T)} for k in qoi_cols},
394                   'sobols_first': {k: {p: np.zeros(T) for p in self.sampler.vary.vary_dict} for k in qoi_cols},
395                   'sobols_second': {k: {p: np.zeros(T) for p in self.sampler.vary.vary_dict} for k in qoi_cols},
396                   'sobols_total': {k: {p: np.zeros(T) for p in self.sampler.vary.vary_dict} for k in qoi_cols},
397                   'correlation_matrices': {k: {} for k in qoi_cols},
398                   'output_distributions': {k: {} for k in qoi_cols},
399                   'fit': {k: cp.polynomial(np.zeros(T)) for k in qoi_cols},
400                   'Fourier_coefficients': {k: {p: np.zeros(T) for p in self.sampler.vary.vary_dict} for k in qoi_cols},
401                   'derivatives_first': {k: {p: np.zeros(T) for p in self.sampler.vary.vary_dict} for k in qoi_cols},
402                   }
403
404        # Get sampler informations
405        P = self.sampler.P
406        nodes = self.sampler._nodes
407        weights = self.sampler._weights
408        regression = self.sampler.regression
409
410        samples = {k: [] for k in qoi_cols}
411        for k in qoi_cols:
412            if self.relative_analysis:
413                base = data_frame[k].values[self.sampler.n_samples]
414                if np.all(np.array(base) == 0):
415                    warnings.warn(f"Removing QoI {k} from the analysis, contains all zeros", RuntimeWarning)
416                    continue
417                if np.any(np.array(base) == 0):
418                    warnings.warn(f"Removing QoI {k} from the analysis, contains some zeros", RuntimeWarning)
419                    continue
420
421            samples[k] = data_frame[k].values[:self.sampler.n_samples]
422
423            # Compute descriptive statistics for each quantity of interest
424            if regression:
425                fit, fc = cp.fit_regression(P, [n[:self.sampler.n_samples] for n in nodes], samples[k], retall=1)
426            else:
427                fit, fc = cp.fit_quadrature(P, nodes, weights, samples[k], retall=1)
428            results['fit'][k] = fit
429            results['Fourier_coefficients'][k] = fc
430
431            # Percentiles: 1%, 10%, 50%, 90% and 99%
432            P01, P10, P50, P90, P99 = cp.Perc(
433                fit, [1, 10, 50, 90, 99], self.sampler.distribution).squeeze()
434            results['percentiles'][k] = {'p01': P01, 'p10': P10, 'p50': P50, 'p90': P90, 'p99': P99}
435
436            if self.sampling:  # use Chaospy's sampling method
437
438                # Statistical moments
439                mean = cp.E(fit, self.sampler.distribution)
440                var = cp.Var(fit, self.sampler.distribution)
441                std = cp.Std(fit, self.sampler.distribution)
442                results['statistical_moments'][k] = {'mean': mean,
443                                                     'var': var,
444                                                     'std': std}
445
446                sobols_first_narr = cp.Sens_m(fit, self.sampler.distribution)
447                sobols_second_narr = cp.Sens_m2(fit, self.sampler.distribution)
448                sobols_total_narr = cp.Sens_t(fit, self.sampler.distribution)
449                sobols_first_dict = {}
450                sobols_second_dict = {}
451                sobols_total_dict = {}
452                for i, param_name in enumerate(self.sampler.vary.vary_dict):
453                    sobols_first_dict[param_name] = sobols_first_narr[i]
454                    sobols_second_dict[param_name] = sobols_second_narr[i]
455                    sobols_total_dict[param_name] = sobols_total_narr[i]
456
457                results['sobols_first'][k] = sobols_first_dict
458                results['sobols_second'][k] = sobols_second_dict
459                results['sobols_total'][k] = sobols_total_dict
460
461            else:  # use PCE coefficients
462
463                # Statistical moments
464                mean = fc[0]
465                var = np.sum(fc[1:]**2, axis=0)
466                std = np.sqrt(var)
467                results['statistical_moments'][k] = {'mean': mean,
468                                                     'var': var,
469                                                     'std': std}
470
471                # Sensitivity Analysis: First, Second and Total Sobol indices
472                sobol, sobol_idx, _ = sobols(P, fc)
473                varied = [_ for _ in self.sampler.vary.get_keys()]
474                S1 = {_: np.zeros(sobol.shape[-1]) for _ in varied}
475                ST = {_: np.zeros(sobol.shape[-1]) for _ in varied}
476                # S2 = {_ : {__: np.zeros(sobol.shape[-1]) for __ in varied} for _ in varied}
477                # for v in varied: del S2[v][v]
478                S2 = {_: np.zeros((len(varied), sobol.shape[-1])) for _ in varied}
479                for n, si in enumerate(sobol_idx):
480                    if len(si) == 1:
481                        v = varied[si[0]]
482                        S1[v] = sobol[n]
483                    elif len(si) == 2:
484                        v1 = varied[si[0]]
485                        v2 = varied[si[1]]
486                        # S2[v1][v2] = sobol[n]
487                        # S2[v2][v1] = sobol[n]
488                        S2[v1][si[1]] = sobol[n]
489                        S2[v2][si[0]] = sobol[n]
490                    for i in si:
491                        ST[varied[i]] += sobol[n]
492
493                results['sobols_first'][k] = S1
494                results['sobols_second'][k] = S2
495                results['sobols_total'][k] = ST
496
497            # Sensitivity Analysis: Derivative based
498            try:
499                dY_hat = build_surrogate_der(fit, verbose=False)
500                derivatives_first_dict = {}
501                Ndimensions = len(self.sampler.vary.vary_dict)
502                for i, param_name in enumerate(self.sampler.vary.vary_dict):
503                    if self.sampler.nominal_value:
504                        # Evaluate dY_hat['param'] at the nominal value of the parameters
505                        values = self.sampler.nominal_value
506                        logging.info(f"Using nominal value of the parameters to evaluate the derivative ")
507                        derivatives_first_dict[param_name] = cp.polynomial(dY_hat[param_name])(*[v for v in values.values()])
508                    elif all([type(v) == type(cp.Normal()) for v in self.sampler.vary.vary_dict.values()]):
509                        # Evaluate dY_hat['param'] at the mean of the parameters
510                        logging.info(f"Using mean value of the parameters to evaluate the derivative ")
511                        derivatives_first_dict[param_name] = cp.polynomial(dY_hat[param_name])(*[v.get_mom_parameters()["shift"][0] for v in self.sampler.vary.vary_dict.values()])
512                    elif all([type(v) == type(cp.Uniform()) for v in self.sampler.vary.vary_dict.values()]):
513                        logging.info(f"Using mean value of the parameters to evaluate the derivative ")
514                        # Evaluate dY_hat['param'] at the mean of the parameters
515                        derivatives_first_dict[param_name] = cp.polynomial(dY_hat[param_name])(*[(v.lower + v.upper)/2.0 for v in self.sampler.vary.vary_dict.values()])
516                    else:
517                        # Evaluate dY_hat['param'] at the zero vector
518                        logging.info(f"Using zero vector to evaluate the derivative ")
519                        derivatives_first_dict[param_name] = cp.polynomial(dY_hat[param_name])(*np.zeros(Ndimensions))
520
521                    results['derivatives_first'][k] = derivatives_first_dict
522
523            except Exception:
524                traceback.print_exc()
525
526            # Transform the relative numbers back to the absolute values
527            if self.relative_analysis:
528                base = data_frame[k].values[-1]
529
530                results['percentiles'][k]['p01'] = (1.0 + results['percentiles'][k]['p01']) * base
531                results['percentiles'][k]['p10'] = (1.0 + results['percentiles'][k]['p10']) * base
532                results['percentiles'][k]['p50'] = (1.0 + results['percentiles'][k]['p50']) * base
533                results['percentiles'][k]['p90'] = (1.0 + results['percentiles'][k]['p90']) * base
534                results['percentiles'][k]['p99'] = (1.0 + results['percentiles'][k]['p99']) * base
535                results['statistical_moments'][k]['mean'] = (1.0 + results['statistical_moments'][k]['mean']) * base
536                results['statistical_moments'][k]['var']  = (1.0 + results['statistical_moments'][k]['var']) * base
537                results['statistical_moments'][k]['std']  = (1.0 + results['statistical_moments'][k]['std']) * base
538
539            # Correlation matrix
540            try:
541                if self.sampler._is_dependent:
542                    if self.CorrelationMatrices_messages == 0: warnings.warn(f"Skipping computation of cp.Corr", RuntimeWarning)  ## print out the warning only once
543                    self.CorrelationMatrices_messages += 1
544                    results['correlation_matrices'][k] = None
545                else:
546                    if self.CorrelationMatrices:
547                        results['correlation_matrices'][k] = cp.Corr(fit, self.sampler.distribution)
548                    else:
549                        if self.CorrelationMatrices_messages == 0: warnings.warn(f"Skipping computation of cp.Corr", RuntimeWarning)  ## print out the warning only once
550                        self.CorrelationMatrices_messages += 1
551                        results['correlation_matrices'][k] = None
552            except Exception as e:
553                print ('Error %s for %s when computing cp.Corr()'% (e.__class__.__name__, k))
554                results['correlation_matrices'][k] = None
555            
556
557            # Output distributions
558            try:
559                if self.sampler._is_dependent:
560                    if self.OutputDistributions_messages == 0: warnings.warn(f"Skipping computation of cp.QoI_Dist", RuntimeWarning)  ## print out the warning only once
561                    self.OutputDistributions_messages += 1
562                    results['output_distributions'][k] = None
563                else:
564                    if self.OutputDistributions:
565                        results['output_distributions'][k] = cp.QoI_Dist( fit, self.sampler.distribution)
566                    else:
567                        if self.OutputDistributions_messages == 0: warnings.warn(f"Skipping computation of cp.QoI_Dist", RuntimeWarning)  ## print out the warning only once
568                        self.OutputDistributions_messages += 1
569                        results['output_distributions'][k] = None                        
570            except Exception as e:
571                print ('Error %s for %s when computing cp.QoI_Dist()'% (e.__class__.__name__, k))
572#                from traceback import print_exc
573#                print_exc()
574                results['output_distributions'][k] = None
575
576        return PCEAnalysisResults(raw_data=results, samples=data_frame,
577                                  qois=self.qoi_cols, inputs=list(self.sampler.vary.get_keys()))

Base class for all EasyVVUQ analysis elements.

Attributes

PCEAnalysis( sampler=None, qoi_cols=None, sampling=False, CorrelationMatrices=True, OutputDistributions=True)
202    def __init__(self, sampler=None, qoi_cols=None, sampling=False, CorrelationMatrices=True, OutputDistributions=True):
203        """Analysis element for polynomial chaos expansion (PCE).
204
205        Parameters
206        ----------
207        sampler : PCESampler
208            Sampler used to initiate the PCE analysis.
209        qoi_cols : list or None
210            Column names for quantities of interest (for which analysis is
211            performed).
212        sampling : True if chaospy sampling method to be used for calculating
213            statistical quantities; otherwise [default] the pce coefficients are used
214        CorrelationMatrices : boolean
215            if False then disable the calculation of the Correlation Matrices, otherwise 
216            [default] calculate them
217        OutputDistributions : boolean
218            if False then disable the calculation of the Output Distributions, otherwise
219            [default] calculate them
220        """
221
222        if sampler is None:
223            msg = 'PCE analysis requires a paired sampler to be passed'
224            raise RuntimeError(msg)
225
226        # Flag specifing if we should scale the runs with the nominal base run
227        self.relative_analysis = sampler.relative_analysis
228
229        if qoi_cols is None:
230            raise RuntimeError("Analysis element requires a list of "
231                               "quantities of interest (qoi)")
232
233        self.qoi_cols = qoi_cols
234        self.sampling = sampling
235        self.output_type = OutputType.SUMMARY
236        self.sampler = sampler
237        self.CorrelationMatrices = CorrelationMatrices
238        self.CorrelationMatrices_messages = 0
239        self.OutputDistributions = OutputDistributions
240        self.OutputDistributions_messages = 0

Analysis element for polynomial chaos expansion (PCE).

Parameters
  • sampler (PCESampler): Sampler used to initiate the PCE analysis.
  • qoi_cols (list or None): Column names for quantities of interest (for which analysis is performed).
  • sampling (True if chaospy sampling method to be used for calculating): statistical quantities; otherwise [default] the pce coefficients are used
  • CorrelationMatrices (boolean): if False then disable the calculation of the Correlation Matrices, otherwise [default] calculate them
  • OutputDistributions (boolean): if False then disable the calculation of the Output Distributions, otherwise [default] calculate them
relative_analysis
qoi_cols
sampling
output_type
sampler
CorrelationMatrices
CorrelationMatrices_messages
OutputDistributions
OutputDistributions_messages
def element_name(self):
242    def element_name(self):
243        """Name for this element for logging purposes.
244
245        Returns
246        -------
247        str
248            "PCE_Analysis"
249        """
250        return "PCE_Analysis"

Name for this element for logging purposes.

Returns
  • str: "PCE_Analysis"
def element_version(self):
252    def element_version(self):
253        """Version of this element for logging purposes.
254
255        Returns  ## print out the warning only once
256                    self.OutputDistributions_messages += 1
257        -------
258        str
259            Element version.
260        """
261        return "0.6"

Version of this element for logging purposes.

Returns ## print out the warning only once

self.OutputDistributions_messages += 1

str Element version.

def analyse(self, data_frame=None):
263    def analyse(self, data_frame=None):
264        """Perform PCE analysis on input `data_frame`.
265
266        Parameters
267        ----------
268        data_frame : pandas DataFrame
269            Input data for analysis.
270
271        Returns
272        -------
273        PCEAnalysisResults
274            Use it to get the sobol indices and other information.
275        """
276
277        def sobols(P, coefficients):
278            """ Utility routine to calculate sobols based on coefficients
279            """
280            A = np.array(P.coefficients) != 0
281            multi_indices = np.array([P.exponents[A[:, i]].sum(axis=0) for i in range(A.shape[1])])
282            sobol_mask = multi_indices != 0
283            _, index = np.unique(sobol_mask, axis=0, return_index=True)
284            index = np.sort(index)
285            sobol_idx_bool = sobol_mask[index]
286            sobol_idx_bool = np.delete(sobol_idx_bool, [0], axis=0)
287            n_sobol_available = sobol_idx_bool.shape[0]
288            if len(coefficients.shape) == 1:
289                n_out = 1
290            else:
291                n_out = coefficients.shape[1]
292            n_coeffs = coefficients.shape[0]
293            sobol_poly_idx = np.zeros([n_coeffs, n_sobol_available])
294            for i_sobol in range(n_sobol_available):
295                sobol_poly_idx[:, i_sobol] = np.all(sobol_mask == sobol_idx_bool[i_sobol], axis=1)
296            sobol = np.zeros([n_sobol_available, n_out])
297            for i_sobol in range(n_sobol_available):
298                sobol[i_sobol] = np.sum(
299                    np.square(coefficients[sobol_poly_idx[:, i_sobol] == 1]), axis=0)
300            idx_sort_descend_1st = np.argsort(sobol[:, 0], axis=0)[::-1]
301            sobol = sobol[idx_sort_descend_1st, :]
302            sobol_idx_bool = sobol_idx_bool[idx_sort_descend_1st]
303            sobol_idx = [0 for _ in range(sobol_idx_bool.shape[0])]
304            for i_sobol in range(sobol_idx_bool.shape[0]):
305                sobol_idx[i_sobol] = np.array(
306                    [i for i, x in enumerate(sobol_idx_bool[i_sobol, :]) if x])
307            var = ((coefficients[1:]**2).sum(axis=0))
308            sobol = sobol / (var + np.finfo(float).tiny)
309            return sobol, sobol_idx, sobol_idx_bool
310
311        def build_surrogate_der(Y_hat, verbose=False):
312            '''Computes derivative of the polynomial Y_hat w.r.t. Vars
313            Parameter T specifies the time dimension
314            '''
315
316            # Build derivative with respect to all variables
317            dim = len(self.sampler.vary.vary_dict)
318            if dim < 1:
319                return 0
320            elif dim == 1:
321                Vars = [cp.variable(dim).names[0]]
322            else:
323                Vars = [v.names[0] for v in cp.variable(dim)]
324
325            T = len(Y_hat)
326
327            assert(len(Vars) == len(self.sampler.vary.vary_dict))
328
329            # derivative of the PCE expansion
330            # {dYhat_dx1: [t0, t1, ...],
331            #  dYhat_dx2: [t0, t1, ...],
332            #  ...,
333            #  dYhat_dxN: [t0, t1, ...] }
334            dY_hat = {v:[cp.polynomial(0) for t in range(T)] for v in self.sampler.vary.vary_dict}
335
336            for t in range(T):
337
338                for n1, n2 in zip(Y_hat[t].names, Vars):
339                    assert(n1 == n2)
340
341                for d_var_idx, (d_var, d_var_app) in enumerate(zip(Vars, self.sampler.vary.vary_dict)):
342
343                    if verbose:
344                        print(f'Computing derivative d(Y_hat)/d({d_var})')
345                        print('='*40)
346
347                    # Some variables are missing in the expression,
348                    # then they must be constant terms only i.e. sum(exp==0)
349                    if Y_hat[t].exponents.shape[1] < dim:
350                        #exponents.shape: (n_summands, n_variables)
351                        assert(sum(sum(np.array(Y_hat[t].exponents))) == 0)
352                        continue
353
354                    # Consider only polynomial components var^exp where exp > 0 (since the derivative decreases exp by -1)
355                    components_mask = np.array(Y_hat[t].exponents[:,d_var_idx] > 0)
356                    dY_hat_dvar_exp = Y_hat[t].exponents[components_mask]
357                    dY_hat_dvar_coeff = np.array(Y_hat[t].coefficients)[components_mask]
358
359                    # Iterate over all polynomial components (summands)
360                    for i, (coeff, exp) in enumerate(zip(dY_hat_dvar_coeff, dY_hat_dvar_exp)):
361                        assert(exp[d_var_idx] > 0)
362
363                        # derivative = coeff*exp * var^(exp-1)
364                        dY_hat_dvar_coeff[i] = coeff * exp[d_var_idx]
365                        dY_hat_dvar_exp[i][d_var_idx] = exp[d_var_idx] - 1
366
367                    dY_hat[d_var_app][t] = numpoly.construct.polynomial_from_attributes(
368                                exponents=dY_hat_dvar_exp,
369                                coefficients=dY_hat_dvar_coeff,
370                                names=Y_hat[t].names,
371                                retain_coefficients=True,
372                                retain_names=True)
373
374            return dY_hat
375
376        if data_frame is None:
377            raise RuntimeError("Analysis element needs a data frame to "
378                               "analyse")
379        elif data_frame.empty:
380            raise RuntimeError(
381                "No data in data frame passed to analyse element")
382
383        qoi_cols = self.qoi_cols
384        T = len(data_frame[qoi_cols[0]].values[-1])
385
386        results = {'statistical_moments': {k: {'mean':np.zeros(T),
387                                               'var':np.zeros(T),
388                                               'std':np.zeros(T)} for k in qoi_cols},
389                   'percentiles': {k: {'p01': np.zeros(T),
390                                       'p10': np.zeros(T),
391                                       'p50': np.zeros(T),
392                                       'p90': np.zeros(T),
393                                       'p99': np.zeros(T)} for k in qoi_cols},
394                   'sobols_first': {k: {p: np.zeros(T) for p in self.sampler.vary.vary_dict} for k in qoi_cols},
395                   'sobols_second': {k: {p: np.zeros(T) for p in self.sampler.vary.vary_dict} for k in qoi_cols},
396                   'sobols_total': {k: {p: np.zeros(T) for p in self.sampler.vary.vary_dict} for k in qoi_cols},
397                   'correlation_matrices': {k: {} for k in qoi_cols},
398                   'output_distributions': {k: {} for k in qoi_cols},
399                   'fit': {k: cp.polynomial(np.zeros(T)) for k in qoi_cols},
400                   'Fourier_coefficients': {k: {p: np.zeros(T) for p in self.sampler.vary.vary_dict} for k in qoi_cols},
401                   'derivatives_first': {k: {p: np.zeros(T) for p in self.sampler.vary.vary_dict} for k in qoi_cols},
402                   }
403
404        # Get sampler informations
405        P = self.sampler.P
406        nodes = self.sampler._nodes
407        weights = self.sampler._weights
408        regression = self.sampler.regression
409
410        samples = {k: [] for k in qoi_cols}
411        for k in qoi_cols:
412            if self.relative_analysis:
413                base = data_frame[k].values[self.sampler.n_samples]
414                if np.all(np.array(base) == 0):
415                    warnings.warn(f"Removing QoI {k} from the analysis, contains all zeros", RuntimeWarning)
416                    continue
417                if np.any(np.array(base) == 0):
418                    warnings.warn(f"Removing QoI {k} from the analysis, contains some zeros", RuntimeWarning)
419                    continue
420
421            samples[k] = data_frame[k].values[:self.sampler.n_samples]
422
423            # Compute descriptive statistics for each quantity of interest
424            if regression:
425                fit, fc = cp.fit_regression(P, [n[:self.sampler.n_samples] for n in nodes], samples[k], retall=1)
426            else:
427                fit, fc = cp.fit_quadrature(P, nodes, weights, samples[k], retall=1)
428            results['fit'][k] = fit
429            results['Fourier_coefficients'][k] = fc
430
431            # Percentiles: 1%, 10%, 50%, 90% and 99%
432            P01, P10, P50, P90, P99 = cp.Perc(
433                fit, [1, 10, 50, 90, 99], self.sampler.distribution).squeeze()
434            results['percentiles'][k] = {'p01': P01, 'p10': P10, 'p50': P50, 'p90': P90, 'p99': P99}
435
436            if self.sampling:  # use Chaospy's sampling method
437
438                # Statistical moments
439                mean = cp.E(fit, self.sampler.distribution)
440                var = cp.Var(fit, self.sampler.distribution)
441                std = cp.Std(fit, self.sampler.distribution)
442                results['statistical_moments'][k] = {'mean': mean,
443                                                     'var': var,
444                                                     'std': std}
445
446                sobols_first_narr = cp.Sens_m(fit, self.sampler.distribution)
447                sobols_second_narr = cp.Sens_m2(fit, self.sampler.distribution)
448                sobols_total_narr = cp.Sens_t(fit, self.sampler.distribution)
449                sobols_first_dict = {}
450                sobols_second_dict = {}
451                sobols_total_dict = {}
452                for i, param_name in enumerate(self.sampler.vary.vary_dict):
453                    sobols_first_dict[param_name] = sobols_first_narr[i]
454                    sobols_second_dict[param_name] = sobols_second_narr[i]
455                    sobols_total_dict[param_name] = sobols_total_narr[i]
456
457                results['sobols_first'][k] = sobols_first_dict
458                results['sobols_second'][k] = sobols_second_dict
459                results['sobols_total'][k] = sobols_total_dict
460
461            else:  # use PCE coefficients
462
463                # Statistical moments
464                mean = fc[0]
465                var = np.sum(fc[1:]**2, axis=0)
466                std = np.sqrt(var)
467                results['statistical_moments'][k] = {'mean': mean,
468                                                     'var': var,
469                                                     'std': std}
470
471                # Sensitivity Analysis: First, Second and Total Sobol indices
472                sobol, sobol_idx, _ = sobols(P, fc)
473                varied = [_ for _ in self.sampler.vary.get_keys()]
474                S1 = {_: np.zeros(sobol.shape[-1]) for _ in varied}
475                ST = {_: np.zeros(sobol.shape[-1]) for _ in varied}
476                # S2 = {_ : {__: np.zeros(sobol.shape[-1]) for __ in varied} for _ in varied}
477                # for v in varied: del S2[v][v]
478                S2 = {_: np.zeros((len(varied), sobol.shape[-1])) for _ in varied}
479                for n, si in enumerate(sobol_idx):
480                    if len(si) == 1:
481                        v = varied[si[0]]
482                        S1[v] = sobol[n]
483                    elif len(si) == 2:
484                        v1 = varied[si[0]]
485                        v2 = varied[si[1]]
486                        # S2[v1][v2] = sobol[n]
487                        # S2[v2][v1] = sobol[n]
488                        S2[v1][si[1]] = sobol[n]
489                        S2[v2][si[0]] = sobol[n]
490                    for i in si:
491                        ST[varied[i]] += sobol[n]
492
493                results['sobols_first'][k] = S1
494                results['sobols_second'][k] = S2
495                results['sobols_total'][k] = ST
496
497            # Sensitivity Analysis: Derivative based
498            try:
499                dY_hat = build_surrogate_der(fit, verbose=False)
500                derivatives_first_dict = {}
501                Ndimensions = len(self.sampler.vary.vary_dict)
502                for i, param_name in enumerate(self.sampler.vary.vary_dict):
503                    if self.sampler.nominal_value:
504                        # Evaluate dY_hat['param'] at the nominal value of the parameters
505                        values = self.sampler.nominal_value
506                        logging.info(f"Using nominal value of the parameters to evaluate the derivative ")
507                        derivatives_first_dict[param_name] = cp.polynomial(dY_hat[param_name])(*[v for v in values.values()])
508                    elif all([type(v) == type(cp.Normal()) for v in self.sampler.vary.vary_dict.values()]):
509                        # Evaluate dY_hat['param'] at the mean of the parameters
510                        logging.info(f"Using mean value of the parameters to evaluate the derivative ")
511                        derivatives_first_dict[param_name] = cp.polynomial(dY_hat[param_name])(*[v.get_mom_parameters()["shift"][0] for v in self.sampler.vary.vary_dict.values()])
512                    elif all([type(v) == type(cp.Uniform()) for v in self.sampler.vary.vary_dict.values()]):
513                        logging.info(f"Using mean value of the parameters to evaluate the derivative ")
514                        # Evaluate dY_hat['param'] at the mean of the parameters
515                        derivatives_first_dict[param_name] = cp.polynomial(dY_hat[param_name])(*[(v.lower + v.upper)/2.0 for v in self.sampler.vary.vary_dict.values()])
516                    else:
517                        # Evaluate dY_hat['param'] at the zero vector
518                        logging.info(f"Using zero vector to evaluate the derivative ")
519                        derivatives_first_dict[param_name] = cp.polynomial(dY_hat[param_name])(*np.zeros(Ndimensions))
520
521                    results['derivatives_first'][k] = derivatives_first_dict
522
523            except Exception:
524                traceback.print_exc()
525
526            # Transform the relative numbers back to the absolute values
527            if self.relative_analysis:
528                base = data_frame[k].values[-1]
529
530                results['percentiles'][k]['p01'] = (1.0 + results['percentiles'][k]['p01']) * base
531                results['percentiles'][k]['p10'] = (1.0 + results['percentiles'][k]['p10']) * base
532                results['percentiles'][k]['p50'] = (1.0 + results['percentiles'][k]['p50']) * base
533                results['percentiles'][k]['p90'] = (1.0 + results['percentiles'][k]['p90']) * base
534                results['percentiles'][k]['p99'] = (1.0 + results['percentiles'][k]['p99']) * base
535                results['statistical_moments'][k]['mean'] = (1.0 + results['statistical_moments'][k]['mean']) * base
536                results['statistical_moments'][k]['var']  = (1.0 + results['statistical_moments'][k]['var']) * base
537                results['statistical_moments'][k]['std']  = (1.0 + results['statistical_moments'][k]['std']) * base
538
539            # Correlation matrix
540            try:
541                if self.sampler._is_dependent:
542                    if self.CorrelationMatrices_messages == 0: warnings.warn(f"Skipping computation of cp.Corr", RuntimeWarning)  ## print out the warning only once
543                    self.CorrelationMatrices_messages += 1
544                    results['correlation_matrices'][k] = None
545                else:
546                    if self.CorrelationMatrices:
547                        results['correlation_matrices'][k] = cp.Corr(fit, self.sampler.distribution)
548                    else:
549                        if self.CorrelationMatrices_messages == 0: warnings.warn(f"Skipping computation of cp.Corr", RuntimeWarning)  ## print out the warning only once
550                        self.CorrelationMatrices_messages += 1
551                        results['correlation_matrices'][k] = None
552            except Exception as e:
553                print ('Error %s for %s when computing cp.Corr()'% (e.__class__.__name__, k))
554                results['correlation_matrices'][k] = None
555            
556
557            # Output distributions
558            try:
559                if self.sampler._is_dependent:
560                    if self.OutputDistributions_messages == 0: warnings.warn(f"Skipping computation of cp.QoI_Dist", RuntimeWarning)  ## print out the warning only once
561                    self.OutputDistributions_messages += 1
562                    results['output_distributions'][k] = None
563                else:
564                    if self.OutputDistributions:
565                        results['output_distributions'][k] = cp.QoI_Dist( fit, self.sampler.distribution)
566                    else:
567                        if self.OutputDistributions_messages == 0: warnings.warn(f"Skipping computation of cp.QoI_Dist", RuntimeWarning)  ## print out the warning only once
568                        self.OutputDistributions_messages += 1
569                        results['output_distributions'][k] = None                        
570            except Exception as e:
571                print ('Error %s for %s when computing cp.QoI_Dist()'% (e.__class__.__name__, k))
572#                from traceback import print_exc
573#                print_exc()
574                results['output_distributions'][k] = None
575
576        return PCEAnalysisResults(raw_data=results, samples=data_frame,
577                                  qois=self.qoi_cols, inputs=list(self.sampler.vary.get_keys()))

Perform PCE analysis on input data_frame.

Parameters
  • data_frame (pandas DataFrame): Input data for analysis.
Returns
  • PCEAnalysisResults: Use it to get the sobol indices and other information.