easyvvuq.analysis.pce_analysis
Analysis element for polynomial chaos expansion (PCE). We use ChaosPy under the hood for this functionality.
"""Analysis element for polynomial chaos expansion (PCE). We use ChaosPy
under the hood for this functionality.
"""
import logging
import chaospy as cp
import numpy as np
import numpoly
import warnings
from easyvvuq import OutputType
from .base import BaseAnalysisElement
from .results import AnalysisResults
from .qmc_analysis import QMCAnalysisResults
import traceback

__author__ = 'Jalal Lakhlili'
__license__ = "LGPL"

logger = logging.getLogger(__name__)


class PCEAnalysisResults(QMCAnalysisResults):
    """Analysis results for the PCEAnalysis class.

    Wraps the raw dictionary produced by :meth:`PCEAnalysis.analyse`
    (``self.raw_data``) and exposes accessors for sobol indices,
    derivative-based indices, descriptive statistics, the PCE surrogate
    and the output distributions.
    """

    def _get_derivatives_first(self, qoi, input_):
        """Returns the first order derivative-based index for a given qoi wrt input variable.

        Parameters
        ----------
        qoi : str
            Quantity of interest
        input_ : str
            Input variable

        Returns
        -------
        float
            First order derivative-based index.
        """
        raw_dict = AnalysisResults._keys_to_tuples(self.raw_data['derivatives_first'])
        return raw_dict[AnalysisResults._to_tuple(qoi)][input_]

    def _get_sobols_first(self, qoi, input_):
        """Returns the first order sobol index for a given qoi wrt input variable.

        Parameters
        ----------
        qoi : str
            Quantity of interest
        input_ : str
            Input variable

        Returns
        -------
        float
            First order sobol index.
        """
        raw_dict = AnalysisResults._keys_to_tuples(self.raw_data['sobols_first'])
        return raw_dict[AnalysisResults._to_tuple(qoi)][input_]

    def _get_sobols_second(self, qoi, input_):
        """Returns the second order sobol index for a given qoi wrt input variable.

        Parameters
        ----------
        qoi : str
            Quantity of interest
        input_ : str
            Input variable

        Returns
        -------
        dict
            Maps each *other* input name to the second order sobol index of
            the pair (input_, other).  The raw data stores, per input, an
            array indexed by input position; ``input_`` itself is excluded.
        """
        raw_dict = AnalysisResults._keys_to_tuples(self.raw_data['sobols_second'])
        return dict([(in_, raw_dict[AnalysisResults._to_tuple(qoi)][input_][i])
                     for i, in_ in enumerate(self.inputs) if in_ != input_])

    def _get_sobols_total(self, qoi, input_):
        """Returns the total order sobol index for a given qoi wrt input variable.

        Parameters
        ----------
        qoi : str
            Quantity of interest
        input_ : str
            Input variable

        Returns
        -------
        float
            Total order sobol index.
        """
        raw_dict = AnalysisResults._keys_to_tuples(self.raw_data['sobols_total'])
        return raw_dict[AnalysisResults._to_tuple(qoi)][input_]

    def supported_stats(self):
        """Types of statistics supported by the describe method.

        Returns
        -------
        list of str
        """
        return ['min', 'max', '10%', '90%', '1%', '99%', 'median',
                'mean', 'var', 'std']

    def _describe(self, qoi, statistic):
        """Returns descriptive statistics, similar to pandas describe.

        Parameters
        ----------
        qoi : str
            Name of quantity of interest.
        statistic : str
            One of 'min', 'max', '10%', '90%', 'median', 'mean', 'var', 'std'

        Returns
        -------
        float
            Value of the requested statistic.
        """
        if statistic not in self.supported_stats():
            raise NotImplementedError
        # 'min'/'max' are read off the per-time-step output distributions;
        # if those were skipped (None), an empty array is returned.
        if statistic == 'min':
            return np.array([v.lower[0] for _, v in enumerate(
                self.raw_data['output_distributions'][qoi])]) if self.raw_data['output_distributions'][qoi] is not None else np.array([])
        elif statistic == 'max':
            return np.array([v.upper[0] for _, v in enumerate(
                self.raw_data['output_distributions'][qoi])]) if self.raw_data['output_distributions'][qoi] is not None else np.array([])
        # Percentiles may be stored as scalars (single time step) or arrays;
        # always hand back an ndarray for a uniform interface.
        elif statistic == '1%':
            if isinstance(self.raw_data['percentiles'][qoi]['p01'], np.ndarray):
                return self.raw_data['percentiles'][qoi]['p01']
            else:
                return np.array([self.raw_data['percentiles'][qoi]['p01']])
        elif statistic == '10%':
            if isinstance(self.raw_data['percentiles'][qoi]['p10'], np.ndarray):
                return self.raw_data['percentiles'][qoi]['p10']
            else:
                return np.array([self.raw_data['percentiles'][qoi]['p10']])
        elif statistic == '90%':
            if isinstance(self.raw_data['percentiles'][qoi]['p90'], np.ndarray):
                return self.raw_data['percentiles'][qoi]['p90']
            else:
                return np.array([self.raw_data['percentiles'][qoi]['p90']])
        elif statistic == '99%':
            if isinstance(self.raw_data['percentiles'][qoi]['p99'], np.ndarray):
                return self.raw_data['percentiles'][qoi]['p99']
            else:
                return np.array([self.raw_data['percentiles'][qoi]['p99']])
        elif statistic == 'median':
            if isinstance(self.raw_data['percentiles'][qoi]['p50'], np.ndarray):
                return self.raw_data['percentiles'][qoi]['p50']
            else:
                return np.array([self.raw_data['percentiles'][qoi]['p50']])
        else:
            # 'mean', 'var', 'std' live under statistical_moments.
            try:
                return self.raw_data['statistical_moments'][qoi][statistic]
            except KeyError:
                raise NotImplementedError

    def surrogate(self):
        """Return a PCE surrogate model.

        Returns
        -------
        A function that takes a dictionary of parameter - value pairs and returns
        a dictionary with the results (same output as decoder).
        """
        def surrogate_fn(inputs):
            # Unwrap single-element results to a scalar, keep vectors as lists.
            def swap(x):
                if len(x) > 1:
                    return list(x)
                else:
                    return x[0]
            # Order the input values as the fitted polynomial expects them.
            values = np.array([inputs[key] for key in self.inputs])
            results = dict([(qoi, swap((self.raw_data['fit'][qoi](*values)).T))
                            for qoi in self.qois])
            return results
        return surrogate_fn

    def get_distribution(self, qoi):
        """Returns a distribution for the given qoi.

        Parameters
        ----------
        qoi: str
            QoI name

        Returns
        -------
        A ChaosPy PDF
        """
        if qoi not in self.qois:
            raise RuntimeError('no such quantity of interest - {}'.format(qoi))
        return self.raw_data['output_distributions'][qoi]


class PCEAnalysis(BaseAnalysisElement):

    def __init__(self, sampler=None, qoi_cols=None, sampling=False, CorrelationMatrices=True, OutputDistributions=True):
        """Analysis element for polynomial chaos expansion (PCE).

        Parameters
        ----------
        sampler : PCESampler
            Sampler used to initiate the PCE analysis.
        qoi_cols : list or None
            Column names for quantities of interest (for which analysis is
            performed).
        sampling : True if chaospy sampling method to be used for calculating
            statistical quantities; otherwise [default] the pce coefficients are used
        CorrelationMatrices : boolean
            if False then disable the calculation of the Correlation Matrices, otherwise
            [default] calculate them
        OutputDistributions : boolean
            if False then disable the calculation of the Output Distributions, otherwise
            [default] calculate them
        """

        if sampler is None:
            msg = 'PCE analysis requires a paired sampler to be passed'
            raise RuntimeError(msg)

        # Flag specifying if we should scale the runs with the nominal base run
        self.relative_analysis = sampler.relative_analysis

        if qoi_cols is None:
            raise RuntimeError("Analysis element requires a list of "
                               "quantities of interest (qoi)")

        self.qoi_cols = qoi_cols
        self.sampling = sampling
        self.output_type = OutputType.SUMMARY
        self.sampler = sampler
        self.CorrelationMatrices = CorrelationMatrices
        # *_messages counters ensure each "skipping computation" warning is
        # emitted only once per analysis object, not once per QoI.
        self.CorrelationMatrices_messages = 0
        self.OutputDistributions = OutputDistributions
        self.OutputDistributions_messages = 0

    def element_name(self):
        """Name for this element for logging purposes.

        Returns
        -------
        str
            "PCE_Analysis"
        """
        return "PCE_Analysis"

    def element_version(self):
        """Version of this element for logging purposes.

        Returns
        -------
        str
            Element version.
        """
        return "0.6"

    def analyse(self, data_frame=None):
        """Perform PCE analysis on input `data_frame`.

        Parameters
        ----------
        data_frame : pandas DataFrame
            Input data for analysis.

        Returns
        -------
        PCEAnalysisResults
            Use it to get the sobol indices and other information.
        """

        def sobols(P, coefficients):
            """ Utility routine to calculate sobols based on coefficients

            Derives first/higher order Sobol indices directly from the
            Fourier (PCE) coefficients: groups polynomial terms by which
            variables they involve, sums squared coefficients per group and
            normalises by the total variance (sum of squared non-constant
            coefficients).  Returns (sobol values sorted descending by the
            first output, list of involved-variable index arrays, boolean
            involvement mask per group).
            """
            # A[:, i] marks which expansion terms contribute to variable i.
            A = np.array(P.coefficients) != 0
            multi_indices = np.array([P.exponents[A[:, i]].sum(axis=0) for i in range(A.shape[1])])
            sobol_mask = multi_indices != 0
            # Unique variable-involvement patterns, in order of first appearance;
            # drop the first (constant / no-variable) pattern.
            _, index = np.unique(sobol_mask, axis=0, return_index=True)
            index = np.sort(index)
            sobol_idx_bool = sobol_mask[index]
            sobol_idx_bool = np.delete(sobol_idx_bool, [0], axis=0)
            n_sobol_available = sobol_idx_bool.shape[0]
            if len(coefficients.shape) == 1:
                n_out = 1
            else:
                n_out = coefficients.shape[1]
            n_coeffs = coefficients.shape[0]
            # Membership matrix: which coefficients belong to which sobol group.
            sobol_poly_idx = np.zeros([n_coeffs, n_sobol_available])
            for i_sobol in range(n_sobol_available):
                sobol_poly_idx[:, i_sobol] = np.all(sobol_mask == sobol_idx_bool[i_sobol], axis=1)
            sobol = np.zeros([n_sobol_available, n_out])
            for i_sobol in range(n_sobol_available):
                sobol[i_sobol] = np.sum(
                    np.square(coefficients[sobol_poly_idx[:, i_sobol] == 1]), axis=0)
            # Sort groups by descending contribution to the first output.
            idx_sort_descend_1st = np.argsort(sobol[:, 0], axis=0)[::-1]
            sobol = sobol[idx_sort_descend_1st, :]
            sobol_idx_bool = sobol_idx_bool[idx_sort_descend_1st]
            sobol_idx = [0 for _ in range(sobol_idx_bool.shape[0])]
            for i_sobol in range(sobol_idx_bool.shape[0]):
                sobol_idx[i_sobol] = np.array(
                    [i for i, x in enumerate(sobol_idx_bool[i_sobol, :]) if x])
            # Normalise by total variance; tiny avoids division by zero for
            # constant responses.
            var = ((coefficients[1:]**2).sum(axis=0))
            sobol = sobol / (var + np.finfo(float).tiny)
            return sobol, sobol_idx, sobol_idx_bool

        def build_surrogate_der(Y_hat, verbose=False):
            '''Computes derivative of the polynomial Y_hat w.r.t. Vars
            Parameter T specifies the time dimension
            '''

            # Build derivative with respect to all variables
            dim = len(self.sampler.vary.vary_dict)
            if dim < 1:
                return 0
            elif dim == 1:
                Vars = [cp.variable(dim).names[0]]
            else:
                Vars = [v.names[0] for v in cp.variable(dim)]

            T = len(Y_hat)

            assert(len(Vars) == len(self.sampler.vary.vary_dict))

            # derivative of the PCE expansion
            # {dYhat_dx1: [t0, t1, ...],
            #  dYhat_dx2: [t0, t1, ...],
            #  ...,
            #  dYhat_dxN: [t0, t1, ...] }
            dY_hat = {v: [cp.polynomial(0) for t in range(T)] for v in self.sampler.vary.vary_dict}

            for t in range(T):

                # Sanity check: polynomial variable order must match Vars order.
                for n1, n2 in zip(Y_hat[t].names, Vars):
                    assert(n1 == n2)

                for d_var_idx, (d_var, d_var_app) in enumerate(zip(Vars, self.sampler.vary.vary_dict)):

                    if verbose:
                        print(f'Computing derivative d(Y_hat)/d({d_var})')
                        print('='*40)

                    # Some variables are missing in the expression,
                    # then they must be constant terms only i.e. sum(exp==0)
                    if Y_hat[t].exponents.shape[1] < dim:
                        # exponents.shape: (n_summands, n_variables)
                        assert(sum(sum(np.array(Y_hat[t].exponents))) == 0)
                        continue

                    # Consider only polynomial components var^exp where exp > 0
                    # (since the derivative decreases exp by -1)
                    components_mask = np.array(Y_hat[t].exponents[:, d_var_idx] > 0)
                    dY_hat_dvar_exp = Y_hat[t].exponents[components_mask]
                    dY_hat_dvar_coeff = np.array(Y_hat[t].coefficients)[components_mask]

                    # Iterate over all polynomial components (summands)
                    for i, (coeff, exp) in enumerate(zip(dY_hat_dvar_coeff, dY_hat_dvar_exp)):
                        assert(exp[d_var_idx] > 0)

                        # derivative = coeff*exp * var^(exp-1)
                        dY_hat_dvar_coeff[i] = coeff * exp[d_var_idx]
                        dY_hat_dvar_exp[i][d_var_idx] = exp[d_var_idx] - 1

                    dY_hat[d_var_app][t] = numpoly.construct.polynomial_from_attributes(
                        exponents=dY_hat_dvar_exp,
                        coefficients=dY_hat_dvar_coeff,
                        names=Y_hat[t].names,
                        retain_coefficients=True,
                        retain_names=True)

            return dY_hat

        if data_frame is None:
            raise RuntimeError("Analysis element needs a data frame to "
                               "analyse")
        elif data_frame.empty:
            raise RuntimeError(
                "No data in data frame passed to analyse element")

        qoi_cols = self.qoi_cols
        # T: number of time steps / vector entries per QoI sample.
        T = len(data_frame[qoi_cols[0]].values[-1])

        # Pre-populate all result slots with zeros so skipped QoIs still
        # have well-formed entries.
        results = {'statistical_moments': {k: {'mean': np.zeros(T),
                                               'var': np.zeros(T),
                                               'std': np.zeros(T)} for k in qoi_cols},
                   'percentiles': {k: {'p01': np.zeros(T),
                                       'p10': np.zeros(T),
                                       'p50': np.zeros(T),
                                       'p90': np.zeros(T),
                                       'p99': np.zeros(T)} for k in qoi_cols},
                   'sobols_first': {k: {p: np.zeros(T) for p in self.sampler.vary.vary_dict} for k in qoi_cols},
                   'sobols_second': {k: {p: np.zeros(T) for p in self.sampler.vary.vary_dict} for k in qoi_cols},
                   'sobols_total': {k: {p: np.zeros(T) for p in self.sampler.vary.vary_dict} for k in qoi_cols},
                   'correlation_matrices': {k: {} for k in qoi_cols},
                   'output_distributions': {k: {} for k in qoi_cols},
                   'fit': {k: cp.polynomial(np.zeros(T)) for k in qoi_cols},
                   'Fourier_coefficients': {k: {p: np.zeros(T) for p in self.sampler.vary.vary_dict} for k in qoi_cols},
                   'derivatives_first': {k: {p: np.zeros(T) for p in self.sampler.vary.vary_dict} for k in qoi_cols},
                   }

        # Get sampler informations
        P = self.sampler.P
        nodes = self.sampler._nodes
        weights = self.sampler._weights
        regression = self.sampler.regression

        samples = {k: [] for k in qoi_cols}
        for k in qoi_cols:
            if self.relative_analysis:
                # In relative mode the base (nominal) run is stored at index
                # n_samples; a zero base would make the relative values
                # undefined, so such QoIs are dropped from the analysis.
                base = data_frame[k].values[self.sampler.n_samples]
                if np.all(np.array(base) == 0):
                    warnings.warn(f"Removing QoI {k} from the analysis, contains all zeros", RuntimeWarning)
                    continue
                if np.any(np.array(base) == 0):
                    warnings.warn(f"Removing QoI {k} from the analysis, contains some zeros", RuntimeWarning)
                    continue

            samples[k] = data_frame[k].values[:self.sampler.n_samples]

            # Compute descriptive statistics for each quantity of interest
            if regression:
                fit, fc = cp.fit_regression(P, [n[:self.sampler.n_samples] for n in nodes], samples[k], retall=1)
            else:
                fit, fc = cp.fit_quadrature(P, nodes, weights, samples[k], retall=1)
            results['fit'][k] = fit
            results['Fourier_coefficients'][k] = fc

            # Percentiles: 1%, 10%, 50%, 90% and 99%
            P01, P10, P50, P90, P99 = cp.Perc(
                fit, [1, 10, 50, 90, 99], self.sampler.distribution).squeeze()
            results['percentiles'][k] = {'p01': P01, 'p10': P10, 'p50': P50, 'p90': P90, 'p99': P99}

            if self.sampling:  # use Chaospy's sampling method

                # Statistical moments
                mean = cp.E(fit, self.sampler.distribution)
                var = cp.Var(fit, self.sampler.distribution)
                std = cp.Std(fit, self.sampler.distribution)
                results['statistical_moments'][k] = {'mean': mean,
                                                     'var': var,
                                                     'std': std}

                # Sensitivity Analysis: First, Second and Total Sobol indices
                sobols_first_narr = cp.Sens_m(fit, self.sampler.distribution)
                sobols_second_narr = cp.Sens_m2(fit, self.sampler.distribution)
                sobols_total_narr = cp.Sens_t(fit, self.sampler.distribution)
                sobols_first_dict = {}
                sobols_second_dict = {}
                sobols_total_dict = {}
                for i, param_name in enumerate(self.sampler.vary.vary_dict):
                    sobols_first_dict[param_name] = sobols_first_narr[i]
                    sobols_second_dict[param_name] = sobols_second_narr[i]
                    sobols_total_dict[param_name] = sobols_total_narr[i]

                results['sobols_first'][k] = sobols_first_dict
                results['sobols_second'][k] = sobols_second_dict
                results['sobols_total'][k] = sobols_total_dict

            else:  # use PCE coefficients

                # Statistical moments derived directly from the Fourier
                # coefficients: fc[0] is the mean, the variance is the sum of
                # squared higher-order coefficients.
                mean = fc[0]
                var = np.sum(fc[1:]**2, axis=0)
                std = np.sqrt(var)
                results['statistical_moments'][k] = {'mean': mean,
                                                     'var': var,
                                                     'std': std}

                # Sensitivity Analysis: First, Second and Total Sobol indices
                sobol, sobol_idx, _ = sobols(P, fc)
                varied = [_ for _ in self.sampler.vary.get_keys()]
                S1 = {_: np.zeros(sobol.shape[-1]) for _ in varied}
                ST = {_: np.zeros(sobol.shape[-1]) for _ in varied}
                # S2 = {_ : {__: np.zeros(sobol.shape[-1]) for __ in varied} for _ in varied}
                # for v in varied: del S2[v][v]
                S2 = {_: np.zeros((len(varied), sobol.shape[-1])) for _ in varied}
                for n, si in enumerate(sobol_idx):
                    if len(si) == 1:
                        # Group involving a single variable -> first order index.
                        v = varied[si[0]]
                        S1[v] = sobol[n]
                    elif len(si) == 2:
                        # Pairwise group -> symmetric second order index.
                        v1 = varied[si[0]]
                        v2 = varied[si[1]]
                        # S2[v1][v2] = sobol[n]
                        # S2[v2][v1] = sobol[n]
                        S2[v1][si[1]] = sobol[n]
                        S2[v2][si[0]] = sobol[n]
                    # Every group a variable participates in contributes to
                    # its total index.
                    for i in si:
                        ST[varied[i]] += sobol[n]

                results['sobols_first'][k] = S1
                results['sobols_second'][k] = S2
                results['sobols_total'][k] = ST

            # Sensitivity Analysis: Derivative based
            try:
                dY_hat = build_surrogate_der(fit, verbose=False)
                derivatives_first_dict = {}
                Ndimensions = len(self.sampler.vary.vary_dict)
                # Choose the evaluation point for the derivative polynomials:
                # nominal value if available, else the distribution means
                # (Normal/Uniform only), else the zero vector.
                for i, param_name in enumerate(self.sampler.vary.vary_dict):
                    if self.sampler.nominal_value:
                        # Evaluate dY_hat['param'] at the nominal value of the parameters
                        values = self.sampler.nominal_value
                        logging.info(f"Using nominal value of the parameters to evaluate the derivative ")
                        derivatives_first_dict[param_name] = cp.polynomial(dY_hat[param_name])(*[v for v in values.values()])
                    elif all([type(v) == type(cp.Normal()) for v in self.sampler.vary.vary_dict.values()]):
                        # Evaluate dY_hat['param'] at the mean of the parameters
                        logging.info(f"Using mean value of the parameters to evaluate the derivative ")
                        derivatives_first_dict[param_name] = cp.polynomial(dY_hat[param_name])(*[v.get_mom_parameters()["shift"][0] for v in self.sampler.vary.vary_dict.values()])
                    elif all([type(v) == type(cp.Uniform()) for v in self.sampler.vary.vary_dict.values()]):
                        logging.info(f"Using mean value of the parameters to evaluate the derivative ")
                        # Evaluate dY_hat['param'] at the mean of the parameters
                        derivatives_first_dict[param_name] = cp.polynomial(dY_hat[param_name])(*[(v.lower + v.upper)/2.0 for v in self.sampler.vary.vary_dict.values()])
                    else:
                        # Evaluate dY_hat['param'] at the zero vector
                        logging.info(f"Using zero vector to evaluate the derivative ")
                        derivatives_first_dict[param_name] = cp.polynomial(dY_hat[param_name])(*np.zeros(Ndimensions))

                results['derivatives_first'][k] = derivatives_first_dict

            except Exception:
                # Derivative-based indices are best-effort; keep the zeros
                # placeholder and continue with the remaining analysis.
                traceback.print_exc()

            # Transform the relative numbers back to the absolute values
            # NOTE(review): here the base run is read from values[-1], while
            # the zero checks above used values[n_samples] — presumably the
            # same appended nominal run; confirm against the sampler.
            if self.relative_analysis:
                base = data_frame[k].values[-1]

                results['percentiles'][k]['p01'] = (1.0 + results['percentiles'][k]['p01']) * base
                results['percentiles'][k]['p10'] = (1.0 + results['percentiles'][k]['p10']) * base
                results['percentiles'][k]['p50'] = (1.0 + results['percentiles'][k]['p50']) * base
                results['percentiles'][k]['p90'] = (1.0 + results['percentiles'][k]['p90']) * base
                results['percentiles'][k]['p99'] = (1.0 + results['percentiles'][k]['p99']) * base
                results['statistical_moments'][k]['mean'] = (1.0 + results['statistical_moments'][k]['mean']) * base
                results['statistical_moments'][k]['var'] = (1.0 + results['statistical_moments'][k]['var']) * base
                results['statistical_moments'][k]['std'] = (1.0 + results['statistical_moments'][k]['std']) * base

            # Correlation matrix
            try:
                if self.sampler._is_dependent:
                    # cp.Corr is skipped for dependent input distributions.
                    if self.CorrelationMatrices_messages == 0: warnings.warn(f"Skipping computation of cp.Corr", RuntimeWarning)  ## print out the warning only once
                    self.CorrelationMatrices_messages += 1
                    results['correlation_matrices'][k] = None
                else:
                    if self.CorrelationMatrices:
                        results['correlation_matrices'][k] = cp.Corr(fit, self.sampler.distribution)
                    else:
                        if self.CorrelationMatrices_messages == 0: warnings.warn(f"Skipping computation of cp.Corr", RuntimeWarning)  ## print out the warning only once
                        self.CorrelationMatrices_messages += 1
                        results['correlation_matrices'][k] = None
            except Exception as e:
                print('Error %s for %s when computing cp.Corr()' % (e.__class__.__name__, k))
                results['correlation_matrices'][k] = None

            # Output distributions
            try:
                if self.sampler._is_dependent:
                    # cp.QoI_Dist is skipped for dependent input distributions.
                    if self.OutputDistributions_messages == 0: warnings.warn(f"Skipping computation of cp.QoI_Dist", RuntimeWarning)  ## print out the warning only once
                    self.OutputDistributions_messages += 1
                    results['output_distributions'][k] = None
                else:
                    if self.OutputDistributions:
                        results['output_distributions'][k] = cp.QoI_Dist(fit, self.sampler.distribution)
                    else:
                        if self.OutputDistributions_messages == 0: warnings.warn(f"Skipping computation of cp.QoI_Dist", RuntimeWarning)  ## print out the warning only once
                        self.OutputDistributions_messages += 1
                        results['output_distributions'][k] = None
            except Exception as e:
                print('Error %s for %s when computing cp.QoI_Dist()' % (e.__class__.__name__, k))
#                from traceback import print_exc
#                print_exc()
                results['output_distributions'][k] = None

        return PCEAnalysisResults(raw_data=results, samples=data_frame,
                                  qois=self.qoi_cols, inputs=list(self.sampler.vary.get_keys()))
logger =
<Logger easyvvuq.analysis.pce_analysis (DEBUG)>
22class PCEAnalysisResults(QMCAnalysisResults): 23 """Analysis results for the PCEAnalysis class. 24 """ 25 26 def _get_derivatives_first(self, qoi, input_): 27 """Returns the first order derivative-based index for a given qoi wrt input variable. 28 29 Parameters 30 ---------- 31 qoi : str 32 Quantity of interest 33 input_ : str 34 Input variable 35 36 Returns 37 ------- 38 float 39 First order derivative-based index. 40 """ 41 raw_dict = AnalysisResults._keys_to_tuples(self.raw_data['derivatives_first']) 42 return raw_dict[AnalysisResults._to_tuple(qoi)][input_] 43 44 def _get_sobols_first(self, qoi, input_): 45 """Returns the first order sobol index for a given qoi wrt input variable. 46 47 Parameters 48 ---------- 49 qoi : str 50 Quantity of interest 51 input_ : str 52 Input variable 53 54 Returns 55 ------- 56 float 57 First order sobol index. 58 """ 59 raw_dict = AnalysisResults._keys_to_tuples(self.raw_data['sobols_first']) 60 return raw_dict[AnalysisResults._to_tuple(qoi)][input_] 61 62 def _get_sobols_second(self, qoi, input_): 63 """Returns the second order sobol index for a given qoi wrt input variable. 64 65 Parameters 66 ---------- 67 qoi : str 68 Quantity of interest 69 input_ : str 70 Input variable 71 72 Returns 73 ------- 74 float 75 Second order sobol index. 76 """ 77 raw_dict = AnalysisResults._keys_to_tuples(self.raw_data['sobols_second']) 78 return dict([(in_, raw_dict[AnalysisResults._to_tuple(qoi)][input_][i]) 79 for i, in_ in enumerate(self.inputs) if in_ != input_]) 80 81 def _get_sobols_total(self, qoi, input_): 82 """Returns the total order sobol index for a given qoi wrt input variable. 83 84 Parameters 85 ---------- 86 qoi : str 87 Quantity of interest 88 input_ : str 89 Input variable 90 91 Returns 92 ------- 93 float 94 Total order sobol index. 
95 """ 96 raw_dict = AnalysisResults._keys_to_tuples(self.raw_data['sobols_total']) 97 return raw_dict[AnalysisResults._to_tuple(qoi)][input_] 98 99 def supported_stats(self): 100 """Types of statistics supported by the describe method. 101 102 Returns 103 ------- 104 list of str 105 """ 106 return ['min', 'max', '10%', '90%', '1%', '99%', 'median', 107 'mean', 'var', 'std'] 108 109 def _describe(self, qoi, statistic): 110 """Returns descriptive statistics, similar to pandas describe. 111 112 Parameters 113 ---------- 114 qoi : str 115 Name of quantity of interest. 116 statistic : str 117 One of 'min', 'max', '10%', '90%', 'median', 'mean', 'var', 'std' 118 119 Returns 120 ------- 121 float 122 Value of the requested statistic. 123 """ 124 if statistic not in self.supported_stats(): 125 raise NotImplementedError 126 if statistic == 'min': 127 return np.array([v.lower[0] for _, v in enumerate( 128 self.raw_data['output_distributions'][qoi])]) if self.raw_data['output_distributions'][qoi] is not None else np.array([]) 129 elif statistic == 'max': 130 return np.array([v.upper[0] for _, v in enumerate( 131 self.raw_data['output_distributions'][qoi])]) if self.raw_data['output_distributions'][qoi] is not None else np.array([]) 132 elif statistic == '1%': 133 if isinstance(self.raw_data['percentiles'][qoi]['p01'], np.ndarray): 134 return self.raw_data['percentiles'][qoi]['p01'] 135 else: 136 return np.array([self.raw_data['percentiles'][qoi]['p01']]) 137 elif statistic == '10%': 138 if isinstance(self.raw_data['percentiles'][qoi]['p10'], np.ndarray): 139 return self.raw_data['percentiles'][qoi]['p10'] 140 else: 141 return np.array([self.raw_data['percentiles'][qoi]['p10']]) 142 elif statistic == '90%': 143 if isinstance(self.raw_data['percentiles'][qoi]['p90'], np.ndarray): 144 return self.raw_data['percentiles'][qoi]['p90'] 145 else: 146 return np.array([self.raw_data['percentiles'][qoi]['p90']]) 147 elif statistic == '99%': 148 if 
isinstance(self.raw_data['percentiles'][qoi]['p99'], np.ndarray): 149 return self.raw_data['percentiles'][qoi]['p99'] 150 else: 151 return np.array([self.raw_data['percentiles'][qoi]['p99']]) 152 elif statistic == 'median': 153 if isinstance(self.raw_data['percentiles'][qoi]['p50'], np.ndarray): 154 return self.raw_data['percentiles'][qoi]['p50'] 155 else: 156 return np.array([self.raw_data['percentiles'][qoi]['p50']]) 157 else: 158 try: 159 return self.raw_data['statistical_moments'][qoi][statistic] 160 except KeyError: 161 raise NotImplementedError 162 163 def surrogate(self): 164 """Return a PCE surrogate model. 165 166 Returns 167 ------- 168 A function that takes a dictionary of parameter - value pairs and returns 169 a dictionary with the results (same output as decoder). 170 """ 171 def surrogate_fn(inputs): 172 def swap(x): 173 if len(x) > 1: 174 return list(x) 175 else: 176 return x[0] 177 values = np.array([inputs[key] for key in self.inputs]) 178 results = dict([(qoi, swap((self.raw_data['fit'][qoi](*values)).T)) 179 for qoi in self.qois]) 180 return results 181 return surrogate_fn 182 183 def get_distribution(self, qoi): 184 """Returns a distribution for the given qoi. 185 186 Parameters 187 ---------- 188 qoi: str 189 QoI name 190 191 Returns 192 ------- 193 A ChaosPy PDF 194 """ 195 if qoi not in self.qois: 196 raise RuntimeError('no such quantity of interest - {}'.format(qoi)) 197 return self.raw_data['output_distributions'][qoi]
Analysis results for the PCEAnalysis class.
def
supported_stats(self):
99 def supported_stats(self): 100 """Types of statistics supported by the describe method. 101 102 Returns 103 ------- 104 list of str 105 """ 106 return ['min', 'max', '10%', '90%', '1%', '99%', 'median', 107 'mean', 'var', 'std']
Types of statistics supported by the describe method.
Returns
- list of str
def
surrogate(self):
163 def surrogate(self): 164 """Return a PCE surrogate model. 165 166 Returns 167 ------- 168 A function that takes a dictionary of parameter - value pairs and returns 169 a dictionary with the results (same output as decoder). 170 """ 171 def surrogate_fn(inputs): 172 def swap(x): 173 if len(x) > 1: 174 return list(x) 175 else: 176 return x[0] 177 values = np.array([inputs[key] for key in self.inputs]) 178 results = dict([(qoi, swap((self.raw_data['fit'][qoi](*values)).T)) 179 for qoi in self.qois]) 180 return results 181 return surrogate_fn
Return a PCE surrogate model.
Returns
- A function that takes a dictionary of parameter - value pairs and returns
- a dictionary with the results (same output as decoder).
def
get_distribution(self, qoi):
183 def get_distribution(self, qoi): 184 """Returns a distribution for the given qoi. 185 186 Parameters 187 ---------- 188 qoi: str 189 QoI name 190 191 Returns 192 ------- 193 A ChaosPy PDF 194 """ 195 if qoi not in self.qois: 196 raise RuntimeError('no such quantity of interest - {}'.format(qoi)) 197 return self.raw_data['output_distributions'][qoi]
Returns a distribution for the given qoi.
Parameters
- qoi (str): QoI name
Returns
- A ChaosPy PDF
class PCEAnalysis(BaseAnalysisElement):
    """Analysis element for polynomial chaos expansion (PCE).

    Fits a PCE surrogate to each quantity of interest (using ChaosPy) and
    derives from it: statistical moments, percentiles, first/second/total
    Sobol indices, derivative-based first-order sensitivities, correlation
    matrices and output distributions.
    """

    def __init__(self, sampler=None, qoi_cols=None, sampling=False,
                 CorrelationMatrices=True, OutputDistributions=True):
        """Analysis element for polynomial chaos expansion (PCE).

        Parameters
        ----------
        sampler : PCESampler
            Sampler used to initiate the PCE analysis.
        qoi_cols : list or None
            Column names for quantities of interest (for which analysis is
            performed).
        sampling : bool
            True if chaospy sampling method to be used for calculating
            statistical quantities; otherwise [default] the pce coefficients
            are used.
        CorrelationMatrices : bool
            if False then disable the calculation of the Correlation Matrices,
            otherwise [default] calculate them.
        OutputDistributions : bool
            if False then disable the calculation of the Output Distributions,
            otherwise [default] calculate them.

        Raises
        ------
        RuntimeError
            If no sampler or no list of quantities of interest is supplied.
        """

        if sampler is None:
            msg = 'PCE analysis requires a paired sampler to be passed'
            raise RuntimeError(msg)

        # Flag specifying if we should scale the runs with the nominal base run
        self.relative_analysis = sampler.relative_analysis

        if qoi_cols is None:
            raise RuntimeError("Analysis element requires a list of "
                               "quantities of interest (qoi)")

        self.qoi_cols = qoi_cols
        self.sampling = sampling
        self.output_type = OutputType.SUMMARY
        self.sampler = sampler
        self.CorrelationMatrices = CorrelationMatrices
        # Counter so the "skipping cp.Corr" warning is emitted only once
        self.CorrelationMatrices_messages = 0
        self.OutputDistributions = OutputDistributions
        # Counter so the "skipping cp.QoI_Dist" warning is emitted only once
        self.OutputDistributions_messages = 0

    def element_name(self):
        """Name for this element for logging purposes.

        Returns
        -------
        str
            "PCE_Analysis"
        """
        return "PCE_Analysis"

    def element_version(self):
        """Version of this element for logging purposes.

        Returns
        -------
        str
            Element version.
        """
        return "0.6"

    def analyse(self, data_frame=None):
        """Perform PCE analysis on input `data_frame`.

        Parameters
        ----------
        data_frame : pandas DataFrame
            Input data for analysis.

        Returns
        -------
        PCEAnalysisResults
            Use it to get the sobol indices and other information.

        Raises
        ------
        RuntimeError
            If `data_frame` is None or empty.
        """

        def sobols(P, coefficients):
            """Utility routine to calculate sobols based on coefficients.

            Derives Sobol indices directly from the PCE expansion: groups
            the squared Fourier coefficients by which input variables their
            basis polynomials depend on, then normalises by total variance.

            Returns (sobol, sobol_idx, sobol_idx_bool): index values sorted
            descending by the first output component, the corresponding
            variable-index arrays, and the boolean involvement masks.
            """
            # Boolean mask of non-zero coefficients per basis polynomial
            A = np.array(P.coefficients) != 0
            # Multi-index (total exponent per variable) of each basis term
            multi_indices = np.array([P.exponents[A[:, i]].sum(axis=0) for i in range(A.shape[1])])
            sobol_mask = multi_indices != 0
            # Unique variable-involvement patterns, in first-occurrence order
            _, index = np.unique(sobol_mask, axis=0, return_index=True)
            index = np.sort(index)
            sobol_idx_bool = sobol_mask[index]
            # Drop the constant term (no variable involvement)
            sobol_idx_bool = np.delete(sobol_idx_bool, [0], axis=0)
            n_sobol_available = sobol_idx_bool.shape[0]
            if len(coefficients.shape) == 1:
                n_out = 1
            else:
                n_out = coefficients.shape[1]
            n_coeffs = coefficients.shape[0]
            # Membership matrix: which basis terms belong to which Sobol group
            sobol_poly_idx = np.zeros([n_coeffs, n_sobol_available])
            for i_sobol in range(n_sobol_available):
                sobol_poly_idx[:, i_sobol] = np.all(sobol_mask == sobol_idx_bool[i_sobol], axis=1)
            # Partial variance of each group = sum of squared coefficients
            sobol = np.zeros([n_sobol_available, n_out])
            for i_sobol in range(n_sobol_available):
                sobol[i_sobol] = np.sum(
                    np.square(coefficients[sobol_poly_idx[:, i_sobol] == 1]), axis=0)
            # Sort groups descending by the first output component
            idx_sort_descend_1st = np.argsort(sobol[:, 0], axis=0)[::-1]
            sobol = sobol[idx_sort_descend_1st, :]
            sobol_idx_bool = sobol_idx_bool[idx_sort_descend_1st]
            # Convert boolean masks into arrays of involved variable indices
            sobol_idx = [0 for _ in range(sobol_idx_bool.shape[0])]
            for i_sobol in range(sobol_idx_bool.shape[0]):
                sobol_idx[i_sobol] = np.array(
                    [i for i, x in enumerate(sobol_idx_bool[i_sobol, :]) if x])
            # Total variance excludes the constant (0th) coefficient; the tiny
            # offset guards against division by zero for constant responses
            var = ((coefficients[1:]**2).sum(axis=0))
            sobol = sobol / (var + np.finfo(float).tiny)
            return sobol, sobol_idx, sobol_idx_bool

        def build_surrogate_der(Y_hat, verbose=False):
            '''Computes derivative of the polynomial Y_hat w.r.t. Vars.

            Y_hat is indexed by the time dimension (T = len(Y_hat)); the
            result maps each varied parameter name to a list of T derivative
            polynomials, built by hand from exponents/coefficients.
            '''

            # Build derivative with respect to all variables
            dim = len(self.sampler.vary.vary_dict)
            if dim < 1:
                return 0
            elif dim == 1:
                # cp.variable(1) is a single polynomial, not iterable
                Vars = [cp.variable(dim).names[0]]
            else:
                Vars = [v.names[0] for v in cp.variable(dim)]

            T = len(Y_hat)

            assert(len(Vars) == len(self.sampler.vary.vary_dict))

            # derivative of the PCE expansion
            # {dYhat_dx1: [t0, t1, ...],
            #  dYhat_dx2: [t0, t1, ...],
            #  ...,
            #  dYhat_dxN: [t0, t1, ...] }
            dY_hat = {v: [cp.polynomial(0) for t in range(T)] for v in self.sampler.vary.vary_dict}

            for t in range(T):

                # Sanity check: chaospy variable order matches vary_dict order
                for n1, n2 in zip(Y_hat[t].names, Vars):
                    assert(n1 == n2)

                for d_var_idx, (d_var, d_var_app) in enumerate(zip(Vars, self.sampler.vary.vary_dict)):

                    if verbose:
                        print(f'Computing derivative d(Y_hat)/d({d_var})')
                        print('=' * 40)

                    # Some variables are missing in the expression,
                    # then they must be constant terms only i.e. sum(exp==0)
                    if Y_hat[t].exponents.shape[1] < dim:
                        # exponents.shape: (n_summands, n_variables)
                        assert(sum(sum(np.array(Y_hat[t].exponents))) == 0)
                        continue

                    # Consider only polynomial components var^exp where exp > 0
                    # (since the derivative decreases exp by -1)
                    components_mask = np.array(Y_hat[t].exponents[:, d_var_idx] > 0)
                    dY_hat_dvar_exp = Y_hat[t].exponents[components_mask]
                    dY_hat_dvar_coeff = np.array(Y_hat[t].coefficients)[components_mask]

                    # Iterate over all polynomial components (summands)
                    for i, (coeff, exp) in enumerate(zip(dY_hat_dvar_coeff, dY_hat_dvar_exp)):
                        assert(exp[d_var_idx] > 0)

                        # derivative = coeff*exp * var^(exp-1)
                        dY_hat_dvar_coeff[i] = coeff * exp[d_var_idx]
                        dY_hat_dvar_exp[i][d_var_idx] = exp[d_var_idx] - 1

                    dY_hat[d_var_app][t] = numpoly.construct.polynomial_from_attributes(
                        exponents=dY_hat_dvar_exp,
                        coefficients=dY_hat_dvar_coeff,
                        names=Y_hat[t].names,
                        retain_coefficients=True,
                        retain_names=True)

            return dY_hat

        if data_frame is None:
            raise RuntimeError("Analysis element needs a data frame to "
                               "analyse")
        elif data_frame.empty:
            raise RuntimeError(
                "No data in data frame passed to analyse element")

        qoi_cols = self.qoi_cols
        # Length of the time/output dimension, taken from the last run of the
        # first QoI
        T = len(data_frame[qoi_cols[0]].values[-1])

        # Pre-populate the results structure with zeros so QoIs skipped during
        # relative analysis still have well-formed (zero) entries.
        results = {'statistical_moments': {k: {'mean': np.zeros(T),
                                               'var': np.zeros(T),
                                               'std': np.zeros(T)} for k in qoi_cols},
                   'percentiles': {k: {'p01': np.zeros(T),
                                       'p10': np.zeros(T),
                                       'p50': np.zeros(T),
                                       'p90': np.zeros(T),
                                       'p99': np.zeros(T)} for k in qoi_cols},
                   'sobols_first': {k: {p: np.zeros(T) for p in self.sampler.vary.vary_dict} for k in qoi_cols},
                   'sobols_second': {k: {p: np.zeros(T) for p in self.sampler.vary.vary_dict} for k in qoi_cols},
                   'sobols_total': {k: {p: np.zeros(T) for p in self.sampler.vary.vary_dict} for k in qoi_cols},
                   'correlation_matrices': {k: {} for k in qoi_cols},
                   'output_distributions': {k: {} for k in qoi_cols},
                   'fit': {k: cp.polynomial(np.zeros(T)) for k in qoi_cols},
                   'Fourier_coefficients': {k: {p: np.zeros(T) for p in self.sampler.vary.vary_dict} for k in qoi_cols},
                   'derivatives_first': {k: {p: np.zeros(T) for p in self.sampler.vary.vary_dict} for k in qoi_cols},
                   }

        # Get sampler information
        P = self.sampler.P
        nodes = self.sampler._nodes
        weights = self.sampler._weights
        regression = self.sampler.regression

        samples = {k: [] for k in qoi_cols}
        for k in qoi_cols:
            if self.relative_analysis:
                # NOTE(review): assumes the nominal/base run is stored at row
                # n_samples, i.e. appended after the n_samples PCE runs —
                # confirm against the paired sampler
                base = data_frame[k].values[self.sampler.n_samples]
                if np.all(np.array(base) == 0):
                    warnings.warn(f"Removing QoI {k} from the analysis, contains all zeros", RuntimeWarning)
                    continue
                if np.any(np.array(base) == 0):
                    # A zero anywhere in the base run would make the relative
                    # scaling ill-defined, so the QoI is skipped entirely
                    warnings.warn(f"Removing QoI {k} from the analysis, contains some zeros", RuntimeWarning)
                    continue

            samples[k] = data_frame[k].values[:self.sampler.n_samples]

            # Compute descriptive statistics for each quantity of interest
            if regression:
                fit, fc = cp.fit_regression(P, [n[:self.sampler.n_samples] for n in nodes], samples[k], retall=1)
            else:
                fit, fc = cp.fit_quadrature(P, nodes, weights, samples[k], retall=1)
            results['fit'][k] = fit
            results['Fourier_coefficients'][k] = fc

            # Percentiles: 1%, 10%, 50%, 90% and 99%
            P01, P10, P50, P90, P99 = cp.Perc(
                fit, [1, 10, 50, 90, 99], self.sampler.distribution).squeeze()
            results['percentiles'][k] = {'p01': P01, 'p10': P10, 'p50': P50, 'p90': P90, 'p99': P99}

            if self.sampling:  # use Chaospy's sampling method

                # Statistical moments
                mean = cp.E(fit, self.sampler.distribution)
                var = cp.Var(fit, self.sampler.distribution)
                std = cp.Std(fit, self.sampler.distribution)
                results['statistical_moments'][k] = {'mean': mean,
                                                     'var': var,
                                                     'std': std}

                # Sensitivity Analysis: First, Second and Total Sobol indices
                sobols_first_narr = cp.Sens_m(fit, self.sampler.distribution)
                sobols_second_narr = cp.Sens_m2(fit, self.sampler.distribution)
                sobols_total_narr = cp.Sens_t(fit, self.sampler.distribution)
                sobols_first_dict = {}
                sobols_second_dict = {}
                sobols_total_dict = {}
                for i, param_name in enumerate(self.sampler.vary.vary_dict):
                    sobols_first_dict[param_name] = sobols_first_narr[i]
                    sobols_second_dict[param_name] = sobols_second_narr[i]
                    sobols_total_dict[param_name] = sobols_total_narr[i]

                results['sobols_first'][k] = sobols_first_dict
                results['sobols_second'][k] = sobols_second_dict
                results['sobols_total'][k] = sobols_total_dict

            else:  # use PCE coefficients

                # Statistical moments from the Fourier coefficients:
                # mean is the constant term, variance the sum of squares of
                # the remaining terms
                mean = fc[0]
                var = np.sum(fc[1:]**2, axis=0)
                std = np.sqrt(var)
                results['statistical_moments'][k] = {'mean': mean,
                                                     'var': var,
                                                     'std': std}

                # Sensitivity Analysis: First, Second and Total Sobol indices
                sobol, sobol_idx, _ = sobols(P, fc)
                varied = [_ for _ in self.sampler.vary.get_keys()]
                S1 = {_: np.zeros(sobol.shape[-1]) for _ in varied}
                ST = {_: np.zeros(sobol.shape[-1]) for _ in varied}
                # Second-order indices stored per parameter as an array over
                # the other parameter's index (mirrors sampling-branch layout)
                S2 = {_: np.zeros((len(varied), sobol.shape[-1])) for _ in varied}
                for n, si in enumerate(sobol_idx):
                    if len(si) == 1:
                        v = varied[si[0]]
                        S1[v] = sobol[n]
                    elif len(si) == 2:
                        v1 = varied[si[0]]
                        v2 = varied[si[1]]
                        # Symmetric: store under both parameters
                        S2[v1][si[1]] = sobol[n]
                        S2[v2][si[0]] = sobol[n]
                    # Every group involving variable i contributes to its
                    # total index
                    for i in si:
                        ST[varied[i]] += sobol[n]

                results['sobols_first'][k] = S1
                results['sobols_second'][k] = S2
                results['sobols_total'][k] = ST

            # Sensitivity Analysis: Derivative based. Best effort — any
            # failure is logged and the zero-filled defaults are kept.
            try:
                dY_hat = build_surrogate_der(fit, verbose=False)
                derivatives_first_dict = {}
                Ndimensions = len(self.sampler.vary.vary_dict)
                for i, param_name in enumerate(self.sampler.vary.vary_dict):
                    if self.sampler.nominal_value:
                        # Evaluate dY_hat['param'] at the nominal value of the parameters
                        values = self.sampler.nominal_value
                        logging.info(f"Using nominal value of the parameters to evaluate the derivative ")
                        derivatives_first_dict[param_name] = cp.polynomial(dY_hat[param_name])(*[v for v in values.values()])
                    elif all([type(v) == type(cp.Normal()) for v in self.sampler.vary.vary_dict.values()]):
                        # Evaluate dY_hat['param'] at the mean of the parameters
                        logging.info(f"Using mean value of the parameters to evaluate the derivative ")
                        derivatives_first_dict[param_name] = cp.polynomial(dY_hat[param_name])(*[v.get_mom_parameters()["shift"][0] for v in self.sampler.vary.vary_dict.values()])
                    elif all([type(v) == type(cp.Uniform()) for v in self.sampler.vary.vary_dict.values()]):
                        logging.info(f"Using mean value of the parameters to evaluate the derivative ")
                        # Evaluate dY_hat['param'] at the mean of the parameters
                        derivatives_first_dict[param_name] = cp.polynomial(dY_hat[param_name])(*[(v.lower + v.upper)/2.0 for v in self.sampler.vary.vary_dict.values()])
                    else:
                        # Evaluate dY_hat['param'] at the zero vector
                        logging.info(f"Using zero vector to evaluate the derivative ")
                        derivatives_first_dict[param_name] = cp.polynomial(dY_hat[param_name])(*np.zeros(Ndimensions))

                results['derivatives_first'][k] = derivatives_first_dict

            except Exception:
                traceback.print_exc()

            # Transform the relative numbers back to the absolute values
            if self.relative_analysis:
                # NOTE(review): here the base run is read from the last row;
                # presumably the same row as index n_samples above — verify
                base = data_frame[k].values[-1]

                results['percentiles'][k]['p01'] = (1.0 + results['percentiles'][k]['p01']) * base
                results['percentiles'][k]['p10'] = (1.0 + results['percentiles'][k]['p10']) * base
                results['percentiles'][k]['p50'] = (1.0 + results['percentiles'][k]['p50']) * base
                results['percentiles'][k]['p90'] = (1.0 + results['percentiles'][k]['p90']) * base
                results['percentiles'][k]['p99'] = (1.0 + results['percentiles'][k]['p99']) * base
                results['statistical_moments'][k]['mean'] = (1.0 + results['statistical_moments'][k]['mean']) * base
                results['statistical_moments'][k]['var'] = (1.0 + results['statistical_moments'][k]['var']) * base
                results['statistical_moments'][k]['std'] = (1.0 + results['statistical_moments'][k]['std']) * base

            # Correlation matrix (skipped for dependent distributions or when
            # disabled; failures degrade to None rather than aborting)
            try:
                if self.sampler._is_dependent:
                    if self.CorrelationMatrices_messages == 0:
                        # print out the warning only once
                        warnings.warn(f"Skipping computation of cp.Corr", RuntimeWarning)
                    self.CorrelationMatrices_messages += 1
                    results['correlation_matrices'][k] = None
                else:
                    if self.CorrelationMatrices:
                        results['correlation_matrices'][k] = cp.Corr(fit, self.sampler.distribution)
                    else:
                        if self.CorrelationMatrices_messages == 0:
                            # print out the warning only once
                            warnings.warn(f"Skipping computation of cp.Corr", RuntimeWarning)
                        self.CorrelationMatrices_messages += 1
                        results['correlation_matrices'][k] = None
            except Exception as e:
                print('Error %s for %s when computing cp.Corr()' % (e.__class__.__name__, k))
                results['correlation_matrices'][k] = None

            # Output distributions (same skip/degrade policy as above)
            try:
                if self.sampler._is_dependent:
                    if self.OutputDistributions_messages == 0:
                        # print out the warning only once
                        warnings.warn(f"Skipping computation of cp.QoI_Dist", RuntimeWarning)
                    self.OutputDistributions_messages += 1
                    results['output_distributions'][k] = None
                else:
                    if self.OutputDistributions:
                        results['output_distributions'][k] = cp.QoI_Dist(fit, self.sampler.distribution)
                    else:
                        if self.OutputDistributions_messages == 0:
                            # print out the warning only once
                            warnings.warn(f"Skipping computation of cp.QoI_Dist", RuntimeWarning)
                        self.OutputDistributions_messages += 1
                        results['output_distributions'][k] = None
            except Exception as e:
                print('Error %s for %s when computing cp.QoI_Dist()' % (e.__class__.__name__, k))
                results['output_distributions'][k] = None

        return PCEAnalysisResults(raw_data=results, samples=data_frame,
                                  qois=self.qoi_cols, inputs=list(self.sampler.vary.get_keys()))
Base class for all EasyVVUQ analysis elements.
Attributes
PCEAnalysis( sampler=None, qoi_cols=None, sampling=False, CorrelationMatrices=True, OutputDistributions=True)
202 def __init__(self, sampler=None, qoi_cols=None, sampling=False, CorrelationMatrices=True, OutputDistributions=True): 203 """Analysis element for polynomial chaos expansion (PCE). 204 205 Parameters 206 ---------- 207 sampler : PCESampler 208 Sampler used to initiate the PCE analysis. 209 qoi_cols : list or None 210 Column names for quantities of interest (for which analysis is 211 performed). 212 sampling : True if chaospy sampling method to be used for calculating 213 statistical quantities; otherwise [default] the pce coefficients are used 214 CorrelationMatrices : boolean 215 if False then disable the calculation of the Correlation Matrices, otherwise 216 [default] calculate them 217 OutputDistributions : boolean 218 if False then disable the calculation of the Output Distributions, otherwise 219 [default] calculate them 220 """ 221 222 if sampler is None: 223 msg = 'PCE analysis requires a paired sampler to be passed' 224 raise RuntimeError(msg) 225 226 # Flag specifing if we should scale the runs with the nominal base run 227 self.relative_analysis = sampler.relative_analysis 228 229 if qoi_cols is None: 230 raise RuntimeError("Analysis element requires a list of " 231 "quantities of interest (qoi)") 232 233 self.qoi_cols = qoi_cols 234 self.sampling = sampling 235 self.output_type = OutputType.SUMMARY 236 self.sampler = sampler 237 self.CorrelationMatrices = CorrelationMatrices 238 self.CorrelationMatrices_messages = 0 239 self.OutputDistributions = OutputDistributions 240 self.OutputDistributions_messages = 0
Analysis element for polynomial chaos expansion (PCE).
Parameters
- sampler (PCESampler): Sampler used to initiate the PCE analysis.
- qoi_cols (list or None): Column names for quantities of interest (for which analysis is performed).
- sampling (bool): True if chaospy sampling method to be used for calculating statistical quantities; otherwise [default] the pce coefficients are used
- CorrelationMatrices (boolean): if False then disable the calculation of the Correlation Matrices, otherwise [default] calculate them
- OutputDistributions (boolean): if False then disable the calculation of the Output Distributions, otherwise [default] calculate them
def
element_name(self):
242 def element_name(self): 243 """Name for this element for logging purposes. 244 245 Returns 246 ------- 247 str 248 "PCE_Analysis" 249 """ 250 return "PCE_Analysis"
Name for this element for logging purposes.
Returns
- str: "PCE_Analysis"
def
element_version(self):
252 def element_version(self): 253 """Version of this element for logging purposes. 254 255 Returns ## print out the warning only once 256 self.OutputDistributions_messages += 1 257 ------- 258 str 259 Element version. 260 """ 261 return "0.6"
Version of this element for logging purposes.
Returns
- str: Element version.
def
analyse(self, data_frame=None):
263 def analyse(self, data_frame=None): 264 """Perform PCE analysis on input `data_frame`. 265 266 Parameters 267 ---------- 268 data_frame : pandas DataFrame 269 Input data for analysis. 270 271 Returns 272 ------- 273 PCEAnalysisResults 274 Use it to get the sobol indices and other information. 275 """ 276 277 def sobols(P, coefficients): 278 """ Utility routine to calculate sobols based on coefficients 279 """ 280 A = np.array(P.coefficients) != 0 281 multi_indices = np.array([P.exponents[A[:, i]].sum(axis=0) for i in range(A.shape[1])]) 282 sobol_mask = multi_indices != 0 283 _, index = np.unique(sobol_mask, axis=0, return_index=True) 284 index = np.sort(index) 285 sobol_idx_bool = sobol_mask[index] 286 sobol_idx_bool = np.delete(sobol_idx_bool, [0], axis=0) 287 n_sobol_available = sobol_idx_bool.shape[0] 288 if len(coefficients.shape) == 1: 289 n_out = 1 290 else: 291 n_out = coefficients.shape[1] 292 n_coeffs = coefficients.shape[0] 293 sobol_poly_idx = np.zeros([n_coeffs, n_sobol_available]) 294 for i_sobol in range(n_sobol_available): 295 sobol_poly_idx[:, i_sobol] = np.all(sobol_mask == sobol_idx_bool[i_sobol], axis=1) 296 sobol = np.zeros([n_sobol_available, n_out]) 297 for i_sobol in range(n_sobol_available): 298 sobol[i_sobol] = np.sum( 299 np.square(coefficients[sobol_poly_idx[:, i_sobol] == 1]), axis=0) 300 idx_sort_descend_1st = np.argsort(sobol[:, 0], axis=0)[::-1] 301 sobol = sobol[idx_sort_descend_1st, :] 302 sobol_idx_bool = sobol_idx_bool[idx_sort_descend_1st] 303 sobol_idx = [0 for _ in range(sobol_idx_bool.shape[0])] 304 for i_sobol in range(sobol_idx_bool.shape[0]): 305 sobol_idx[i_sobol] = np.array( 306 [i for i, x in enumerate(sobol_idx_bool[i_sobol, :]) if x]) 307 var = ((coefficients[1:]**2).sum(axis=0)) 308 sobol = sobol / (var + np.finfo(float).tiny) 309 return sobol, sobol_idx, sobol_idx_bool 310 311 def build_surrogate_der(Y_hat, verbose=False): 312 '''Computes derivative of the polynomial Y_hat w.r.t. 
Vars 313 Parameter T specifies the time dimension 314 ''' 315 316 # Build derivative with respect to all variables 317 dim = len(self.sampler.vary.vary_dict) 318 if dim < 1: 319 return 0 320 elif dim == 1: 321 Vars = [cp.variable(dim).names[0]] 322 else: 323 Vars = [v.names[0] for v in cp.variable(dim)] 324 325 T = len(Y_hat) 326 327 assert(len(Vars) == len(self.sampler.vary.vary_dict)) 328 329 # derivative of the PCE expansion 330 # {dYhat_dx1: [t0, t1, ...], 331 # dYhat_dx2: [t0, t1, ...], 332 # ..., 333 # dYhat_dxN: [t0, t1, ...] } 334 dY_hat = {v:[cp.polynomial(0) for t in range(T)] for v in self.sampler.vary.vary_dict} 335 336 for t in range(T): 337 338 for n1, n2 in zip(Y_hat[t].names, Vars): 339 assert(n1 == n2) 340 341 for d_var_idx, (d_var, d_var_app) in enumerate(zip(Vars, self.sampler.vary.vary_dict)): 342 343 if verbose: 344 print(f'Computing derivative d(Y_hat)/d({d_var})') 345 print('='*40) 346 347 # Some variables are missing in the expression, 348 # then they must be constant terms only i.e. 
sum(exp==0) 349 if Y_hat[t].exponents.shape[1] < dim: 350 #exponents.shape: (n_summands, n_variables) 351 assert(sum(sum(np.array(Y_hat[t].exponents))) == 0) 352 continue 353 354 # Consider only polynomial components var^exp where exp > 0 (since the derivative decreases exp by -1) 355 components_mask = np.array(Y_hat[t].exponents[:,d_var_idx] > 0) 356 dY_hat_dvar_exp = Y_hat[t].exponents[components_mask] 357 dY_hat_dvar_coeff = np.array(Y_hat[t].coefficients)[components_mask] 358 359 # Iterate over all polynomial components (summands) 360 for i, (coeff, exp) in enumerate(zip(dY_hat_dvar_coeff, dY_hat_dvar_exp)): 361 assert(exp[d_var_idx] > 0) 362 363 # derivative = coeff*exp * var^(exp-1) 364 dY_hat_dvar_coeff[i] = coeff * exp[d_var_idx] 365 dY_hat_dvar_exp[i][d_var_idx] = exp[d_var_idx] - 1 366 367 dY_hat[d_var_app][t] = numpoly.construct.polynomial_from_attributes( 368 exponents=dY_hat_dvar_exp, 369 coefficients=dY_hat_dvar_coeff, 370 names=Y_hat[t].names, 371 retain_coefficients=True, 372 retain_names=True) 373 374 return dY_hat 375 376 if data_frame is None: 377 raise RuntimeError("Analysis element needs a data frame to " 378 "analyse") 379 elif data_frame.empty: 380 raise RuntimeError( 381 "No data in data frame passed to analyse element") 382 383 qoi_cols = self.qoi_cols 384 T = len(data_frame[qoi_cols[0]].values[-1]) 385 386 results = {'statistical_moments': {k: {'mean':np.zeros(T), 387 'var':np.zeros(T), 388 'std':np.zeros(T)} for k in qoi_cols}, 389 'percentiles': {k: {'p01': np.zeros(T), 390 'p10': np.zeros(T), 391 'p50': np.zeros(T), 392 'p90': np.zeros(T), 393 'p99': np.zeros(T)} for k in qoi_cols}, 394 'sobols_first': {k: {p: np.zeros(T) for p in self.sampler.vary.vary_dict} for k in qoi_cols}, 395 'sobols_second': {k: {p: np.zeros(T) for p in self.sampler.vary.vary_dict} for k in qoi_cols}, 396 'sobols_total': {k: {p: np.zeros(T) for p in self.sampler.vary.vary_dict} for k in qoi_cols}, 397 'correlation_matrices': {k: {} for k in qoi_cols}, 398 
'output_distributions': {k: {} for k in qoi_cols}, 399 'fit': {k: cp.polynomial(np.zeros(T)) for k in qoi_cols}, 400 'Fourier_coefficients': {k: {p: np.zeros(T) for p in self.sampler.vary.vary_dict} for k in qoi_cols}, 401 'derivatives_first': {k: {p: np.zeros(T) for p in self.sampler.vary.vary_dict} for k in qoi_cols}, 402 } 403 404 # Get sampler informations 405 P = self.sampler.P 406 nodes = self.sampler._nodes 407 weights = self.sampler._weights 408 regression = self.sampler.regression 409 410 samples = {k: [] for k in qoi_cols} 411 for k in qoi_cols: 412 if self.relative_analysis: 413 base = data_frame[k].values[self.sampler.n_samples] 414 if np.all(np.array(base) == 0): 415 warnings.warn(f"Removing QoI {k} from the analysis, contains all zeros", RuntimeWarning) 416 continue 417 if np.any(np.array(base) == 0): 418 warnings.warn(f"Removing QoI {k} from the analysis, contains some zeros", RuntimeWarning) 419 continue 420 421 samples[k] = data_frame[k].values[:self.sampler.n_samples] 422 423 # Compute descriptive statistics for each quantity of interest 424 if regression: 425 fit, fc = cp.fit_regression(P, [n[:self.sampler.n_samples] for n in nodes], samples[k], retall=1) 426 else: 427 fit, fc = cp.fit_quadrature(P, nodes, weights, samples[k], retall=1) 428 results['fit'][k] = fit 429 results['Fourier_coefficients'][k] = fc 430 431 # Percentiles: 1%, 10%, 50%, 90% and 99% 432 P01, P10, P50, P90, P99 = cp.Perc( 433 fit, [1, 10, 50, 90, 99], self.sampler.distribution).squeeze() 434 results['percentiles'][k] = {'p01': P01, 'p10': P10, 'p50': P50, 'p90': P90, 'p99': P99} 435 436 if self.sampling: # use Chaospy's sampling method 437 438 # Statistical moments 439 mean = cp.E(fit, self.sampler.distribution) 440 var = cp.Var(fit, self.sampler.distribution) 441 std = cp.Std(fit, self.sampler.distribution) 442 results['statistical_moments'][k] = {'mean': mean, 443 'var': var, 444 'std': std} 445 446 sobols_first_narr = cp.Sens_m(fit, self.sampler.distribution) 447 
sobols_second_narr = cp.Sens_m2(fit, self.sampler.distribution) 448 sobols_total_narr = cp.Sens_t(fit, self.sampler.distribution) 449 sobols_first_dict = {} 450 sobols_second_dict = {} 451 sobols_total_dict = {} 452 for i, param_name in enumerate(self.sampler.vary.vary_dict): 453 sobols_first_dict[param_name] = sobols_first_narr[i] 454 sobols_second_dict[param_name] = sobols_second_narr[i] 455 sobols_total_dict[param_name] = sobols_total_narr[i] 456 457 results['sobols_first'][k] = sobols_first_dict 458 results['sobols_second'][k] = sobols_second_dict 459 results['sobols_total'][k] = sobols_total_dict 460 461 else: # use PCE coefficients 462 463 # Statistical moments 464 mean = fc[0] 465 var = np.sum(fc[1:]**2, axis=0) 466 std = np.sqrt(var) 467 results['statistical_moments'][k] = {'mean': mean, 468 'var': var, 469 'std': std} 470 471 # Sensitivity Analysis: First, Second and Total Sobol indices 472 sobol, sobol_idx, _ = sobols(P, fc) 473 varied = [_ for _ in self.sampler.vary.get_keys()] 474 S1 = {_: np.zeros(sobol.shape[-1]) for _ in varied} 475 ST = {_: np.zeros(sobol.shape[-1]) for _ in varied} 476 # S2 = {_ : {__: np.zeros(sobol.shape[-1]) for __ in varied} for _ in varied} 477 # for v in varied: del S2[v][v] 478 S2 = {_: np.zeros((len(varied), sobol.shape[-1])) for _ in varied} 479 for n, si in enumerate(sobol_idx): 480 if len(si) == 1: 481 v = varied[si[0]] 482 S1[v] = sobol[n] 483 elif len(si) == 2: 484 v1 = varied[si[0]] 485 v2 = varied[si[1]] 486 # S2[v1][v2] = sobol[n] 487 # S2[v2][v1] = sobol[n] 488 S2[v1][si[1]] = sobol[n] 489 S2[v2][si[0]] = sobol[n] 490 for i in si: 491 ST[varied[i]] += sobol[n] 492 493 results['sobols_first'][k] = S1 494 results['sobols_second'][k] = S2 495 results['sobols_total'][k] = ST 496 497 # Sensitivity Analysis: Derivative based 498 try: 499 dY_hat = build_surrogate_der(fit, verbose=False) 500 derivatives_first_dict = {} 501 Ndimensions = len(self.sampler.vary.vary_dict) 502 for i, param_name in 
enumerate(self.sampler.vary.vary_dict): 503 if self.sampler.nominal_value: 504 # Evaluate dY_hat['param'] at the nominal value of the parameters 505 values = self.sampler.nominal_value 506 logging.info(f"Using nominal value of the parameters to evaluate the derivative ") 507 derivatives_first_dict[param_name] = cp.polynomial(dY_hat[param_name])(*[v for v in values.values()]) 508 elif all([type(v) == type(cp.Normal()) for v in self.sampler.vary.vary_dict.values()]): 509 # Evaluate dY_hat['param'] at the mean of the parameters 510 logging.info(f"Using mean value of the parameters to evaluate the derivative ") 511 derivatives_first_dict[param_name] = cp.polynomial(dY_hat[param_name])(*[v.get_mom_parameters()["shift"][0] for v in self.sampler.vary.vary_dict.values()]) 512 elif all([type(v) == type(cp.Uniform()) for v in self.sampler.vary.vary_dict.values()]): 513 logging.info(f"Using mean value of the parameters to evaluate the derivative ") 514 # Evaluate dY_hat['param'] at the mean of the parameters 515 derivatives_first_dict[param_name] = cp.polynomial(dY_hat[param_name])(*[(v.lower + v.upper)/2.0 for v in self.sampler.vary.vary_dict.values()]) 516 else: 517 # Evaluate dY_hat['param'] at the zero vector 518 logging.info(f"Using zero vector to evaluate the derivative ") 519 derivatives_first_dict[param_name] = cp.polynomial(dY_hat[param_name])(*np.zeros(Ndimensions)) 520 521 results['derivatives_first'][k] = derivatives_first_dict 522 523 except Exception: 524 traceback.print_exc() 525 526 # Transform the relative numbers back to the absolute values 527 if self.relative_analysis: 528 base = data_frame[k].values[-1] 529 530 results['percentiles'][k]['p01'] = (1.0 + results['percentiles'][k]['p01']) * base 531 results['percentiles'][k]['p10'] = (1.0 + results['percentiles'][k]['p10']) * base 532 results['percentiles'][k]['p50'] = (1.0 + results['percentiles'][k]['p50']) * base 533 results['percentiles'][k]['p90'] = (1.0 + results['percentiles'][k]['p90']) * base 534 
results['percentiles'][k]['p99'] = (1.0 + results['percentiles'][k]['p99']) * base 535 results['statistical_moments'][k]['mean'] = (1.0 + results['statistical_moments'][k]['mean']) * base 536 results['statistical_moments'][k]['var'] = (1.0 + results['statistical_moments'][k]['var']) * base 537 results['statistical_moments'][k]['std'] = (1.0 + results['statistical_moments'][k]['std']) * base 538 539 # Correlation matrix 540 try: 541 if self.sampler._is_dependent: 542 if self.CorrelationMatrices_messages == 0: warnings.warn(f"Skipping computation of cp.Corr", RuntimeWarning) ## print out the warning only once 543 self.CorrelationMatrices_messages += 1 544 results['correlation_matrices'][k] = None 545 else: 546 if self.CorrelationMatrices: 547 results['correlation_matrices'][k] = cp.Corr(fit, self.sampler.distribution) 548 else: 549 if self.CorrelationMatrices_messages == 0: warnings.warn(f"Skipping computation of cp.Corr", RuntimeWarning) ## print out the warning only once 550 self.CorrelationMatrices_messages += 1 551 results['correlation_matrices'][k] = None 552 except Exception as e: 553 print ('Error %s for %s when computing cp.Corr()'% (e.__class__.__name__, k)) 554 results['correlation_matrices'][k] = None 555 556 557 # Output distributions 558 try: 559 if self.sampler._is_dependent: 560 if self.OutputDistributions_messages == 0: warnings.warn(f"Skipping computation of cp.QoI_Dist", RuntimeWarning) ## print out the warning only once 561 self.OutputDistributions_messages += 1 562 results['output_distributions'][k] = None 563 else: 564 if self.OutputDistributions: 565 results['output_distributions'][k] = cp.QoI_Dist( fit, self.sampler.distribution) 566 else: 567 if self.OutputDistributions_messages == 0: warnings.warn(f"Skipping computation of cp.QoI_Dist", RuntimeWarning) ## print out the warning only once 568 self.OutputDistributions_messages += 1 569 results['output_distributions'][k] = None 570 except Exception as e: 571 print ('Error %s for %s when computing 
cp.QoI_Dist()'% (e.__class__.__name__, k)) 572# from traceback import print_exc 573# print_exc() 574 results['output_distributions'][k] = None 575 576 return PCEAnalysisResults(raw_data=results, samples=data_frame, 577 qois=self.qoi_cols, inputs=list(self.sampler.vary.get_keys()))
Perform PCE analysis on input data_frame.
Parameters
- data_frame (pandas DataFrame): Input data for analysis.
Returns
- PCEAnalysisResults: Use it to get the sobol indices and other information.