Derived module from for Optimized DMD and Bagging, Optimized DMD

- Travis Askham and J. Nathan Kutz. Variable projection methods for an
optimized dynamic mode decomposition. SIAM Journal on Applied Dynamical
Systems, 17, 2018.
- Diya Sashidhar and J. Nathan Kutz. Bagging, optimized dynamic mode
decomposition (bop-dmd) for robust, stable forecasting with spatial and
temporal uncertainty-quantification. 2021. arXiv:2107.10878.

import warnings
from collections import OrderedDict
from inspect import isfunction

import numpy as np
from scipy.linalg import qr
from scipy.sparse import csr_matrix
import matplotlib.pyplot as plt

from .dmdbase import DMDBase
from .dmdoperator import DMDOperator
from .snapshots import Snapshots
from .utils import compute_rank, compute_svd

[docs]class BOPDMDOperator(DMDOperator): """ BOP-DMD operator. :param compute_A: Flag that determines whether or not to compute the full Koopman operator A. :type compute_A: bool :param use_proj: Flag that determines the type of computation to perform. If True, fit input data projected onto the first svd_rank POD modes or columns of proj_basis if provided. If False, fit the full input data. :type use_proj: bool :param init_alpha: Initial guess for the continuous-time DMD eigenvalues. :type init_alpha: numpy.ndarray :param proj_basis: Orthogonal basis for projection, where each column of proj_basis contains a basis mode. :type proj_basis: numpy.ndarray :param num_trials: Number of BOP-DMD trials to perform. If num_trials is a positive integer, num_trials BOP-DMD trials are performed. Otherwise, standard optimized dmd is performed. :type num_trials: int :param trial_size: Size of the randomly selected subset of observations to use for each trial of bagged optimized dmd (BOP-DMD). If trial_size is a positive integer, trial_size many observations will be used per trial. If trial_size is a float between 0 and 1, int(trial_size * m) many observations will be used per trial, where m denotes the total number of data points observed. Note that any other type of input for trial_size will yield an error. :type trial_size: int or float :param eig_sort: Method used to sort eigenvalues (and modes accordingly) when performing BOP-DMD. Eigenvalues will be sorted by real part and then by imaginary part to break ties if `eig_sort="real"`, by imaginary part and then by real part to break ties if `eig_sort="imag"`, or by magnitude if `eig_sort="abs"`. If `eig_sort="auto"`, one of the previously-mentioned sorting methods is chosen depending on eigenvalue variance. :type eig_sort: {"real", "imag", "abs", "auto"} :param eig_constraints: Set containing desired DMD operator eigenvalue constraints. Currently available constraints are `"stable"`, which constrains eigenvalues to the left half of the complex plane, `"imag"`, which constrains eigenvalues to the imaginary axis, and `"conjugate_pairs"`, which enforces that eigenvalues are always present with their complex conjugate. Note that constraints may be combined if valid. May alternatively be a custom eigenvalue constraint function that will be applied to the computed eigenvalues at each step of the variable projection routine. :type eig_constraints: set(str) or function :param mode_prox: Optional proximal operator function to apply to the DMD modes. If `use_proj` is False, this function is applied at every iteration of the variable projection routine. If `use_proj` is True, this function is instead applied at the end of the variable projection routine after the modes have been projected back to the space of the full input data. :type mode_prox: function :param remove_bad_bags: Whether or not to exclude results from bagging trials that didn't converge according to the tolerance used for variable projection. Default is False, all trial results are kept regardless of convergence. :type remove_bad_bags: bool :param bag_warning: Number of consecutive non-converged trials of BOP-DMD at which to produce a warning message for the user. Default is 100. This parameter becomes active only when `remove_bad_bags=True`. Use negative arguments for no warning condition. :type bag_warning: int :param bag_maxfail: Number of consecutive non-converged trials of BOP-DMD at which to terminate the fit. Default is 200. This parameter becomes active only when `remove_bad_bags=True`. Use negative arguments for no stopping condition. :type bag_maxfail: int :param init_lambda: Initial value used for the regularization parameter in the Levenberg method. Default is 1.0. Note: Larger lambda values make the method more like gradient descent. :type init_lambda: float :param maxlam: Maximum number of of steps used in the inner Levenberg loop, i.e. the number of times you increase lambda before quitting. Default is 52. :type maxlam: int :param lamup: The factor by which you increase lambda when searching for an appropriate step. Default is 2.0. :type lamup: float :param use_levmarq: Flag that determines whether you use the Levenberg algorithm or the Levenberg-Marquardt algorithm. Default is True, use Levenberg-Marquardt. :type use_levmarq: bool :param maxiter: The maximum number of outer loop iterations to use before quitting. Default is 30. :type maxiter: int :param tol: The tolerance for the relative error in the residual. i.e. the program will terminate if norm(y-Phi(alpha)*b,'fro')/norm(y,'fro') < tol is achieved. Default is 1e-6. :type tol: float :param eps_stall: The tolerance for detecting a stall. i.e. if error(iter-1)-error(iter) < eps_stall*err(iter-1) the program halts. Default is 1e-12. :type eps_stall: float :param use_fulljac: Flag that determines whether or not to use the full expression for the Jacobian or Kaufman's approximation. Default is True, use full expression. :type use_fulljac: bool :param verbose: Flag that determines whether or not to print warning messages that arise during the variable projection routine, and whether or not to print information regarding the method's iterative progress. Default is False, don't print information. :type verbose: bool """ def __init__( self, compute_A, use_proj, init_alpha, proj_basis, num_trials, trial_size, eig_sort, eig_constraints, mode_prox, remove_bad_bags, bag_warning, bag_maxfail, init_lambda=1.0, maxlam=52, lamup=2.0, use_levmarq=True, maxiter=30, tol=1e-6, eps_stall=1e-12, use_fulljac=True, verbose=False, ): self._compute_A = compute_A self._use_proj = use_proj self._init_alpha = init_alpha self._proj_basis = proj_basis self._num_trials = num_trials self._trial_size = trial_size self._eig_sort = eig_sort self._eig_constraints = eig_constraints self._mode_prox = mode_prox self._remove_bad_bags = remove_bad_bags self._bag_warning = bag_warning self._bag_maxfail = bag_maxfail self._varpro_opts = [ init_lambda, maxlam, lamup, use_levmarq, maxiter, tol, eps_stall, use_fulljac, verbose, ] self._varpro_opts_warn() self._modes = None self._eigenvalues = None self._modes_std = None self._eigenvalues_std = None self._amplitudes_std = None self._Atilde = None self._A = None @property def varpro_opts(self): """ Get the variable projection options. :return: the variable projection options. :rtype: tuple """ return self._varpro_opts @property def A(self): """ Get the full Koopman operator A. :return: the full Koopman operator A. :rtype: numpy.ndarray """ if not self._compute_A: msg = ( "A not computed during fit. " "Set parameter compute_A = True to compute A." ) raise ValueError(msg) return self._A @property def amplitudes_std(self): """ Get the amplitudes standard deviation. :return: amplitudes standard deviation. :rtype: numpy.ndarray """ return self._amplitudes_std @property def eigenvalues_std(self): """ Get the eigenvalues standard deviation. :return: eigenvalues standard deviation. :rtype: numpy.ndarray """ return self._eigenvalues_std @property def modes_std(self): """ Get the modes standard deviation. :return: modes standard deviation. :rtype: numpy.ndarray """ return self._modes_std
[docs] def _varpro_opts_warn(self): """ Checks the validity of the parameter values in _varpro_opts. Throws an error if any parameter value has an invalid type and generates a warning if any value lies outside of the recommended range. """ # Generate dictionary of recommended value range for each parameter. rec_ranges = OrderedDict() rec_ranges["init_lambda"] = [0.0, 1e16] rec_ranges["maxlam"] = [0, 200] rec_ranges["lamup"] = [1.0, 1e16] rec_ranges["use_levmarq"] = [-np.inf, np.inf] rec_ranges["maxiter"] = [0, 1e12] rec_ranges["tol"] = [0.0, 1e16] rec_ranges["eps_stall"] = [-np.inf, 1.0] rec_ranges["use_fulljac"] = [-np.inf, np.inf] rec_ranges["verbose"] = [-np.inf, np.inf] for opt_value, (opt_name, (opt_min, opt_max)) in zip( self._varpro_opts, rec_ranges.items() ): if not isinstance(opt_value, (int, float, bool)): raise TypeError("Invalid variable projection option given.") if opt_value < opt_min: msg = ( "Option {} with value {} is less than {}, " "which is not recommended." ) warnings.warn(msg.format(opt_name, opt_value, opt_min)) elif opt_value > opt_max: msg = ( "Option {} with value {} is greater than {}, " "which is not recommended." ) warnings.warn(msg.format(opt_name, opt_value, opt_max))
[docs] def _push_eigenvalues(self, eigenvalues): """ Helper function that constrains the given eigenvalues according to the arguments found in `self._eig_constraints`. If no constraints were given, this function simply returns the given eigenvalues. Applies the provided `eig_constraints` function if a function was provided instead of a set of constraints. :param eigenvalues: Vector of original eigenvalues. :type eigenvalues: numpy.ndarray :return: Vector of constrained eigenvalues. :rtype: numpy.ndarray """ if isfunction(self._eig_constraints): return self._eig_constraints(eigenvalues) if "conjugate_pairs" in self._eig_constraints: num_eigs = len(eigenvalues) new_eigs = np.empty(num_eigs, dtype="complex") # Set of indices -- keeps track of indices of eigenvalues that # still need to be paired with their complex conjugate. unassigned_inds = set(np.arange(num_eigs)) # If given an odd number of eigenvalues, find the eigenvalue with # the smallest imaginary part and take it to be a real eigenvalue. if num_eigs % 2 == 1: ind_0 = np.argmin(np.abs(eigenvalues.imag)) new_eigs[ind_0] = eigenvalues[ind_0].real unassigned_inds.remove(ind_0) # Assign complex conjugate pairs until all eigenvalues are paired. while unassigned_inds: # Randomly grab the next unassigned eigenvalue. ind_1 = unassigned_inds.pop() eig_1 = eigenvalues[ind_1] # Get the index of the eigenvalue that's closest to being the # complex conjugate of the randomly selected eigenvalue. ind_2 = np.argmin(np.abs(eigenvalues - np.conj(eig_1))) # Alert the user if an error occurs while pairing. # Complex conjugate pairs should be unique in theory. if ind_2 not in unassigned_inds: msg = ( "Error occurred while finding conjugate pairs. " 'Please remove the "conjugate pairs" constraint and ' "check that your system eigenvalues approximately " "consist of complex conjugate pairs." ) raise ValueError(msg) # Uniquely pair eig_1 and eig_2. eig_2 = eigenvalues[ind_2] unassigned_inds.remove(ind_2) # Average their real and imaginary components together. a = 0.5 * (eig_1.real + eig_2.real) b = 0.5 * (np.abs(eig_1.imag) + np.abs(eig_2.imag)) new_eigs[ind_1] = a + 1j * (b * np.sign(eig_1.imag)) new_eigs[ind_2] = a + 1j * (b * np.sign(eig_2.imag)) eigenvalues = np.copy(new_eigs) if "stable" in self._eig_constraints: right_half = eigenvalues.real > 0.0 eigenvalues[right_half] = 1j * eigenvalues[right_half].imag elif "imag" in self._eig_constraints: eigenvalues = 1j * eigenvalues.imag return eigenvalues
[docs] def _exp_function(self, alpha, t): """ Matrix of exponentials. :param alpha: Vector of time scalings in the exponent. :type alpha: numpy.ndarray :param t: Vector of time values. :type t: numpy.ndarray :return: Matrix A such that A[i, j] = exp(t_i * alpha_j). :rtype: numpy.ndarray """ return np.exp(np.outer(t, alpha))
[docs] def _exp_function_deriv(self, alpha, t, i): """ Derivatives of the matrix of exponentials. :param alpha: Vector of time scalings in the exponent. :type alpha: numpy.ndarray :param t: Vector of time values. :type t: numpy.ndarray :param i: Index in alpha of the derivative variable. :type i: int :return: Derivatives of Phi(alpha, t) with respect to alpha[i]. :rtype: scipy.sparse.csr_matrix """ m = len(t) n = len(alpha) if i < 0 or i > n - 1: raise ValueError("Invalid index i given to exp_function_deriv.") A = np.multiply(t, np.exp(alpha[i] * t)) return csr_matrix( (A, (np.arange(m), np.full(m, fill_value=i))), shape=(m, n) )
[docs] @staticmethod def _compute_irank_svd(X, tolrank): """ Helper function that computes and returns the SVD of X with a rank truncation of irank, which denotes the number of singular values of X greater than tolrank * s1, where s1 is the largest singular value of the matrix X. :param X: Matrix to decompose. :type X: numpy.ndarray :param tolrank: Determines the rank of the returned SVD. :type tolrank: float :return: irank truncated SVD of X. :rtype: numpy.ndarray, numpy.ndarray, numpy.ndarray """ U, s, Vh = np.linalg.svd(X, full_matrices=False) irank = np.sum(s > tolrank * s[0]) U = U[:, :irank] S = np.diag(s[:irank]) Vh = Vh[:irank] return U, S, Vh
[docs] def _argsort_eigenvalues(self, eigs): """ Helper function that computes and returns the indices that sort the given array of eigenvalues according to the operator's `eig_sort` attribute. Sets `eig_sort` according to eigs if not already done so. :param eigs: array of eigenvalues to sort :type eigs: numpy.ndarray :return: array of indices that sort the given eigenvalues :rtype: numpy.ndarray """ # Set the sorting style if eig_sort is "auto". # Note: the initial call to this function will set the eig_sort # attribute that will be used for the remainder of all fits! if self._eig_sort == "auto": real_var = np.var(eigs.real) imag_var = np.var(eigs.imag) abs_var = np.var(np.abs(eigs)) all_var = [real_var, imag_var, abs_var] self._eig_sort = ("real", "imag", "abs")[np.argmax(all_var)] # Sort the results according to eig_sort. if self._eig_sort == "real": return np.argsort(eigs) if self._eig_sort == "imag": eigs_real_imag_swapped = eigs.imag + (1j * eigs.real) return np.argsort(eigs_real_imag_swapped) if self._eig_sort == "abs": return np.argsort(np.abs(eigs)) raise ValueError("Provided eig_sort method is not supported.")
[docs] def _bag(self, H, trial_size): """ Given a 2D array of data X, where each row contains a data snapshot, randomly sub-selects and returns data snapshots while preserving the original snapshot order. Note that if trial_size is a positive integer, trial_size many observations will be used per trial. If trial_size is a float between 0 and 1, int(trial_size * m) many observations will be used per trial, where m denotes the total number of snapshots in X. The indices of the sub-selected snapshots are also returned. :param H: Full data matrix to be sub-selected from. :type H: numpy.ndarray :param trial_size: Size of the sub-selection from H. :type trial_size: int or float :return: Matrix of sub-selected data snapshots, stored in each row, and a vector of each snapshots's row index location in H. :rtype: numpy.ndarray, numpy.ndarray """ # Ensure that H is a 2D numpy.ndarray. if not isinstance(H, np.ndarray) or H.ndim != 2: msg = "H must be a 2D np.ndarray." raise ValueError(msg) if 0 < trial_size < 1: batch_size = int(trial_size * H.shape[0]) elif trial_size >= 1 and isinstance(trial_size, int): batch_size = trial_size else: msg = ( "Invalid trial_size parameter. trial_size must be either " "a positive integer or a float between 0 and 1." ) raise ValueError(msg) # Throw an error if the batch size is too large or too small. if batch_size > H.shape[0]: msg = ( "Error bagging the input data. Please ensure that the " "trial_size parameter is small enough for bagging." ) raise ValueError(msg) if batch_size == 0: msg = ( "Error bagging the input data. Please ensure that the " "trial_size parameter is large enough for bagging." ) raise ValueError(msg) # Obtain and return subset of the data. all_inds = np.arange(H.shape[0]) subset_inds = np.sort( np.random.choice(all_inds, size=batch_size, replace=False) ) return H[subset_inds], subset_inds
[docs] def _variable_projection(self, H, t, init_alpha, Phi, dPhi): """ Variable projection routine for multivariate data. Attempts to fit the columns of H as linear combinations of the columns of Phi(alpha,t) such that H = Phi(alpha,t)B. Note that M denotes the number of data samples, N denotes the number of columns of Phi, IS denotes the number of functions to fit, and IA denotes the length of the alpha vector. :param H: (M, IS) matrix of data. :type H: numpy.ndarray :param t: (M,) vector of sample times. :type t: numpy.ndarray :param init_alpha: initial guess for alpha. :type init_alpha: numpy.ndarray :param Phi: (M, N) matrix-valued function Phi(alpha,t). :type Phi: function :param dPhi: (M, N) matrix-valued function dPhi(alpha,t,i) that contains the derivatives of Phi wrt the ith component of alpha. :type dPhi: function :return: Tuple of two numpy arrays and a boolean representing: 1. (N, IS) best-fit matrix B. 2. (N,) best-fit vector alpha. 3. Flag indicating whether or not convergence was reached. :rtype: Tuple[numpy.ndarray, numpy.ndarray, bool] References: - Extensions and Uses of the Variable Projection Algorith for Solving Nonlinear Least Squares Problems by G. H. Golub and R. J. LeVeque ARO Report 79-3, Proceedings of the 1979 Army Numerical Analsysis and Computers Conference. - Variable projection for nonlinear least squares problems. Computational Optimization and Applications 54.3 (2013): 579-593 by Dianne P. O'Leary and Bert W. Rust. """ # Define M, IS, and IA. M, IS = H.shape IA = len(init_alpha) tolrank = M * np.finfo(float).eps # Unpack all variable projection parameters stored in varpro_opts. ( init_lambda, maxlam, lamup, use_levmarq, maxiter, tol, eps_stall, use_fulljac, verbose, ) = self._varpro_opts def compute_error(B, alpha): """ Compute the current residual, objective, and relative error. """ residual = H - Phi(alpha, t).dot(B) objective = 0.5 * np.linalg.norm(residual, "fro") ** 2 error = np.linalg.norm(residual, "fro") / np.linalg.norm(H, "fro") return residual, objective, error def compute_B(alpha): """ Update B for the current alpha. """ # Compute B using least squares. B = np.linalg.lstsq(Phi(alpha, t), H, rcond=None)[0] # Apply proximal operator if given, and if data isn't projected. if self._mode_prox is not None and not self._use_proj: B = self._mode_prox(B) return B # Initialize values. _lambda = init_lambda alpha = self._push_eigenvalues(init_alpha) B = compute_B(alpha) U, S, Vh = self._compute_irank_svd(Phi(alpha, t), tolrank) # Initialize termination flags. converged = False stalled = False # Initialize storage. all_error = np.zeros(maxiter) djac_matrix = np.zeros((M * IS, IA), dtype="complex") rjac = np.zeros((2 * IA, IA), dtype="complex") scales = np.zeros(IA) # Initialize iteration progress indicators. residual, objective, error = compute_error(B, alpha) for itr in range(maxiter): # Build Jacobian matrix, looping over alpha indices. for i in range(IA): # Build the approximate expression for the Jacobian. dphi_temp = dPhi(alpha, t, i) ut_dphi = csr_matrix(U.conj().T @ dphi_temp) uut_dphi = csr_matrix(U @ ut_dphi) djac_a = (dphi_temp - uut_dphi) @ B djac_matrix[:, i] = djac_a.ravel(order="F") # Compute the full expression for the Jacobian. if use_fulljac: transform = np.linalg.multi_dot([U, np.linalg.inv(S), Vh]) dphit_res = csr_matrix(dphi_temp.conj().T @ residual) djac_b = transform @ dphit_res djac_matrix[:, i] += djac_b.ravel(order="F") # Scale for the Levenberg algorithm. scales[i] = 1 # Scale for the Levenberg-Marquardt algorithm. if use_levmarq: scales[i] = min(np.linalg.norm(djac_matrix[:, i]), 1) scales[i] = max(scales[i], 1e-6) # Loop to determine lambda (the step-size parameter). rhs_temp = np.copy(residual.ravel(order="F"))[:, None] q_out, djac_out, j_pvt = qr( djac_matrix, mode="economic", pivoting=True ) ij_pvt = np.zeros(IA, dtype=int) ij_pvt[j_pvt] = np.arange(IA, dtype=int) rjac[:IA] = np.triu(djac_out[:IA]) rhs_top = q_out.conj() scales_pvt = scales[j_pvt[:IA]] rhs = np.concatenate( (rhs_top[:IA], np.zeros(IA, dtype="complex")), axis=None ) def step(_lambda, scales_pvt=scales_pvt, rhs=rhs, ij_pvt=ij_pvt): """ Helper function that, when given a step size _lambda, computes and returns the updated step and alpha vectors. """ # Compute the step delta. rjac[IA:] = _lambda * np.diag(scales_pvt) delta = np.linalg.lstsq(rjac, rhs, rcond=None)[0] delta = delta[ij_pvt] # Compute the updated alpha vector. alpha_updated = alpha.ravel() + delta.ravel() alpha_updated = self._push_eigenvalues(alpha_updated) return delta, alpha_updated # Take a step using our initial step size init_lambda. delta_0, alpha_0 = step(_lambda) B_0 = compute_B(alpha_0) residual_0, objective_0, error_0 = compute_error(B_0, alpha_0) # Check actual improvement vs predicted improvement. actual_improvement = objective - objective_0 pred_improvement = ( 0.5 * np.linalg.multi_dot( [delta_0.conj().T, djac_matrix.conj().T, rhs_temp] )[0].real ) improvement_ratio = actual_improvement / pred_improvement if error_0 < error: # Rescale lambda based on the improvement ratio. _lambda *= max(1 / 3, 1 - (2 * improvement_ratio - 1) ** 3) alpha, B = alpha_0, B_0 residual, objective, error = residual_0, objective_0, error_0 else: # Increase lambda until something works. for _ in range(maxlam): _lambda *= lamup delta_0, alpha_0 = step(_lambda) B_0 = compute_B(alpha_0) residual_0, objective_0, error_0 = compute_error( B_0, alpha_0 ) if error_0 < error: break # Terminate if no appropriate step length was found... if error_0 >= error: if verbose: msg = ( "Failed to find appropriate step length at " "iteration {}. Current error {}. " "Consider increasing maxlam or lamup." ) print(msg.format(itr + 1, error)) return B, alpha, converged # ...otherwise, update and proceed. alpha, B = alpha_0, B_0 residual, objective, error = residual_0, objective_0, error_0 # Update SVD information. U, S, Vh = self._compute_irank_svd(Phi(alpha, t), tolrank) # Record the current relative error. all_error[itr] = error # Print iterative progress if the verbose flag is turned on. if verbose: update_msg = "Step {} Error {} Lambda {}" print(update_msg.format(itr + 1, error, _lambda)) # Update termination status and terminate if converged or stalled. converged = error < tol error_reduction = all_error[itr - 1] - all_error[itr] stalled = (itr > 0) and ( error_reduction < eps_stall * all_error[itr - 1] ) if converged: if verbose: print("Convergence reached!") return B, alpha, converged if stalled: if verbose: msg = ( "Stall detected: error reduced by less than {} " "times the error at the previous step. " "Iteration {}. Current error {}. Consider " "increasing tol or decreasing eps_stall." ) print(msg.format(eps_stall, itr + 1, error)) return B, alpha, converged # Failed to meet tolerance in maxiter steps. if verbose: msg = ( "Failed to reach tolerance after maxiter = {} iterations. " "Current error {}." ) print(msg.format(maxiter, error)) return B, alpha, converged
[docs] def _single_trial_compute_operator(self, H, t, init_alpha): """ Helper function that computes the standard optimized dmd operator. Returns the resulting DMD modes, eigenvalues, amplitudes, reduced system matrix, full system matrix, and whether or not convergence of the variable projection routine was reached. """ B, alpha, converged = self._variable_projection( H, t, init_alpha, self._exp_function, self._exp_function_deriv ) # Save the modes, eigenvalues, and amplitudes respectively. w = B.T e = alpha b = np.sqrt(np.sum(np.abs(w) ** 2, axis=0)) # Normalize the modes and the amplitudes. inds_small = np.abs(b) < (10 * np.finfo(float).eps * np.max(b)) b[inds_small] = 1.0 w = / b)) w[:, inds_small] = 0.0 b[inds_small] = 0.0 # Compute the projected propagator Atilde. if self._use_proj: Atilde = np.linalg.multi_dot([w, np.diag(e), np.linalg.pinv(w)]) # Unproject the dmd modes. w = # Apply mode proximal operator if given. if self._mode_prox is not None: w = self._mode_prox(w) else: w_proj = self._proj_basis.conj() Atilde = np.linalg.multi_dot( [w_proj, np.diag(e), np.linalg.pinv(w_proj)] ) # Compute the full system matrix A. if self._compute_A: A = np.linalg.multi_dot([w, np.diag(e), np.linalg.pinv(w)]) else: A = None return w, e, b, Atilde, A, converged
[docs] def compute_operator(self, H, t): """ Compute the low-rank and the full BOP-DMD operators. :param H: Matrix of data to fit. :type H: numpy.ndarray :param t: Vector of sample times. :type t: numpy.ndarray :return: The BOP-DMD amplitudes. :rtype: numpy.ndarray """ # Perform an initial optimized dmd solve using init_alpha. initial_optdmd_results = self._single_trial_compute_operator( H, t, self._init_alpha ) w_0, e_0, b_0, Atilde_0, A_0, converged = initial_optdmd_results # Generate a warning if convergence wasn't initially reached. if not converged: msg = ( "Initial trial of Optimized DMD failed to converge. " "Consider re-adjusting your variable projection parameters " "with the varpro_opts_dict and consider setting verbose=True." ) warnings.warn(msg) # If num_trials isn't a positive int, perform standard optimized dmd. if self._num_trials <= 0 or not isinstance(self._num_trials, int): self._modes = w_0 self._eigenvalues = e_0 self._Atilde = Atilde_0 self._A = A_0 return b_0 # Otherwise, perform BOP-DMD. verbose = self._varpro_opts[-1] if verbose: num_trial_print = 5 msg = "\nDisplaying the results of the next {} trials...\n" print(msg.format(num_trial_print)) # We'll consider non-converged trials successful if the user didn't # request to remove bad bags. if verbose: if self._remove_bad_bags: print("Non-converged trial results will be removed...\n") else: print("Using all bag trial results...\n") # Initialize storage for values needed for stat computations. w_sum = np.zeros(w_0.shape, dtype="complex") e_sum = np.zeros(e_0.shape, dtype="complex") b_sum = np.zeros(b_0.shape, dtype="complex") w_sum2 = np.zeros(w_0.shape, dtype="complex") e_sum2 = np.zeros(e_0.shape, dtype="complex") b_sum2 = np.zeros(b_0.shape, dtype="complex") # Perform num_trials many successful trials of optimized dmd. num_successful_trials = 0 num_consecutive_fails = 0 runtime_warning_given = False while num_successful_trials < self._num_trials: H_i, subset_inds = self._bag(H, self._trial_size) trial_optdmd_results = self._single_trial_compute_operator( H_i, t[subset_inds], e_0 ) w_i, e_i, b_i, _, _, converged = trial_optdmd_results if verbose: print() num_trial_print -= 1 verbose = num_trial_print > 0 self._varpro_opts[-1] = verbose # Incorporate trial results into the running average if successful. if converged or not self._remove_bad_bags: sorted_inds = self._argsort_eigenvalues(e_i) # Add to iterative sums. w_sum += w_i[:, sorted_inds] e_sum += e_i[sorted_inds] b_sum += b_i[sorted_inds] # Add to iterative sums of squares. w_sum2 += np.abs(w_i[:, sorted_inds]) ** 2 e_sum2 += np.abs(e_i[sorted_inds]) ** 2 b_sum2 += np.abs(b_i[sorted_inds]) ** 2 # Bump up the number of successful trials # and reset the consecutive fails counter. num_successful_trials += 1 num_consecutive_fails = 0 # Trial did not converge, and we are throwing away bad bags. else: num_consecutive_fails += 1 if ( num_consecutive_fails == self._bag_warning and not runtime_warning_given ): msg = ( "{} many trials without convergence. " "Consider loosening the tol requirements " "of the variable projection routine." ) print(msg.format(num_consecutive_fails)) runtime_warning_given = True if num_consecutive_fails == self._bag_maxfail: msg = ( "Terminating the bagging routine due to " "{} many trials without convergence." ) raise RuntimeError(msg.format(num_consecutive_fails)) # Compute the BOP-DMD statistics. w_mu = w_sum / self._num_trials e_mu = e_sum / self._num_trials b_mu = b_sum / self._num_trials w_std = np.sqrt(np.abs(w_sum2 / self._num_trials - np.abs(w_mu) ** 2)) e_std = np.sqrt(np.abs(e_sum2 / self._num_trials - np.abs(e_mu) ** 2)) b_std = np.sqrt(np.abs(b_sum2 / self._num_trials - np.abs(b_mu) ** 2)) # Save the BOP-DMD statistics. self._modes = w_mu self._eigenvalues = e_mu self._modes_std = w_std self._eigenvalues_std = e_std self._amplitudes_std = b_std # Compute Atilde using the average optimized dmd results. w_proj = self._proj_basis.conj() self._Atilde = np.linalg.multi_dot( [w_proj, np.diag(self._eigenvalues), np.linalg.pinv(w_proj)] ) # Compute A if requested. if self._compute_A: self._A = np.linalg.multi_dot( [ self._modes, np.diag(self._eigenvalues), np.linalg.pinv(self._modes), ] ) return b_mu
[docs]class BOPDMD(DMDBase): """ Bagging, Optimized Dynamic Mode Decomposition. :param svd_rank: The rank for the truncation; If 0, the method computes the optimal rank and uses it for truncation; if positive integer, the method uses the argument for the truncation; if float between 0 and 1, the rank is the number of the biggest singular values that are needed to reach the 'energy' specified by `svd_rank`; if -1, the method does not compute truncation. :type svd_rank: int or float :param compute_A: Flag that determines whether or not to compute the full Koopman operator A. Default is False, do not compute the full operator. Note that the full operator is potentially prohibitively expensive to compute. :type compute_A: bool :param use_proj: Flag that determines the type of computation to perform. If True, fit input data projected onto the first svd_rank POD modes or columns of proj_basis if provided. If False, fit the full input data. Default is True, fit projected data. :type use_proj: bool :param init_alpha: Initial guess for the continuous-time DMD eigenvalues. If not provided, one is computed via a trapezoidal rule approximation. Default is None (alpha not provided). :type init_alpha: numpy.ndarray :param proj_basis: Orthogonal basis for projection, where each column of proj_basis contains a basis mode. If not provided, POD modes are used. Default is None (basis not provided). :type proj_basis: numpy.ndarray :param num_trials: Number of BOP-DMD trials to perform. If num_trials is a positive integer, num_trials BOP-DMD trials are performed. Otherwise, standard optimized dmd is performed. Default is 0. :type num_trials: int :param trial_size: Size of the randomly selected subset of observations to use for each trial of bagged optimized dmd (BOP-DMD). If trial_size is a positive integer, trial_size many observations will be used per trial. If trial_size is a float between 0 and 1, int(trial_size * m) many observations will be used per trial, where m denotes the total number of data points observed. Note that any other type of input for trial_size will yield an error. Default is 0.6. :type trial_size: int or float :param eig_sort: Method used to sort eigenvalues (and modes accordingly) when performing BOP-DMD. Eigenvalues will be sorted by real part and then by imaginary part to break ties if `eig_sort="real"`, by imaginary part and then by real part to break ties if `eig_sort="imag"`, or by magnitude if `eig_sort="abs"`. If `eig_sort="auto"`, one of the previously-mentioned sorting methods is chosen depending on eigenvalue variance. Default is "auto". :type eig_sort: {"real", "imag", "abs", "auto"} :param eig_constraints: Set containing desired DMD operator eigenvalue constraints. Currently available constraints are `"stable"`, which constrains eigenvalues to the left half of the complex plane, `"imag"`, which constrains eigenvalues to the imaginary axis, and `"conjugate_pairs"`, which enforces that eigenvalues are always present with their complex conjugate. Note that constraints may be combined if valid. May alternatively be a custom eigenvalue constraint function that will be applied to the computed eigenvalues at each step of the variable projection routine. :type eig_constraints: set(str) or function :param mode_prox: Optional proximal operator function to apply to the DMD modes. If `use_proj` is False, this function is applied at every iteration of the variable projection routine. If `use_proj` is True, this function is instead applied at the end of the variable projection routine after the modes have been projected back to the space of the full input data. :type mode_prox: function :param remove_bad_bags: Whether or not to exclude results from bagging trials that didn't converge according to the tolerance used for variable projection. Default is False, all trial results are kept regardless of convergence. :type remove_bad_bags: bool :param bag_warning: Number of consecutive non-converged trials of BOP-DMD at which to produce a warning message for the user. Default is 100. This parameter becomes active only when `remove_bad_bags=True`. Use negative arguments for no warning condition. :type bag_warning: int :param bag_maxfail: Number of consecutive non-converged trials of BOP-DMD at which to terminate the fit. Default is 200. This parameter becomes active only when `remove_bad_bags=True`. Use negative arguments for no stopping condition. :type bag_maxfail: int :param varpro_opts_dict: Dictionary containing the desired parameter values for variable projection. The following parameters may be specified: `init_lambda`, `maxlam`, `lamup`, `use_levmarq`, `maxiter`, `tol`, `eps_stall`, `use_fulljac`, `verbose`. Default values will be used for any parameters not specified in `varpro_opts_dict`. See `BOPDMDOperator` documentation for default values and descriptions for each parameter. :type varpro_opts_dict: dict """ def __init__( self, svd_rank=0, compute_A=False, use_proj=True, init_alpha=None, proj_basis=None, num_trials=0, trial_size=0.6, eig_sort="auto", eig_constraints=None, mode_prox=None, remove_bad_bags=False, bag_warning=100, bag_maxfail=200, varpro_opts_dict=None, ): self._svd_rank = svd_rank self._compute_A = compute_A self._use_proj = use_proj self._init_alpha = init_alpha self._proj_basis = proj_basis self._num_trials = num_trials self._trial_size = trial_size self._eig_sort = eig_sort if not isinstance(bag_warning, int) or not isinstance(bag_maxfail, int): msg = ( "bag_warning and bag_maxfail must be integers. " "Please use a negative integer if no warning " "or stopping condition is desired." ) raise TypeError(msg) self._remove_bad_bags = remove_bad_bags self._bag_warning = bag_warning self._bag_maxfail = bag_maxfail if varpro_opts_dict is None: self._varpro_opts_dict = {} elif not isinstance(varpro_opts_dict, dict): raise TypeError("varpro_opts_dict must be a dict.") else: self._varpro_opts_dict = varpro_opts_dict if eig_constraints is None: eig_constraints = set() elif not isinstance(eig_constraints, set) and not isfunction( eig_constraints ): raise TypeError("eig_constraints must be a set or a function.") self._check_eig_constraints(eig_constraints) self._eig_constraints = eig_constraints self._mode_prox = mode_prox self._snapshots_holder = None self._time = None self._Atilde = None self._modes_activation_bitmask_proxy = None @property def svd_rank(self): """ :return: the rank used for the svd truncation. :rtype: int or float """ return self._svd_rank @svd_rank.setter def svd_rank(self, value): self._svd_rank = value @property def compute_A(self): """ :return: flag that determines whether to compute the full operator A. :rtype: bool """ return self._compute_A @property def use_proj(self): """ :return: flag that determines whether to fit projected or full data. :rtype: bool """ return self._use_proj @property def init_alpha(self): """ :return: initial guess used for the continuous-time DMD eigenvalues. :rtype: numpy.ndarray """ if self._init_alpha is None: msg = ( "fit() hasn't been called " "and no initial value for alpha has been given." ) raise ValueError(msg) return self._init_alpha @init_alpha.setter def init_alpha(self, value): """Set a new initial eigenvalue guess. :param value: The new eigenvalue guess. """ self._init_alpha = value @property def proj_basis(self): """ :return: the projection basis used, with modes stored by column. :rtype: numpy.ndarray """ if self._proj_basis is None: msg = ( "fit() hasn't been called " "and no projection basis has been given." ) raise ValueError(msg) return self._proj_basis @proj_basis.setter def proj_basis(self, new_proj_basis): """Set a new projection basis. :param new_proj_basis: The new projection basis to assign. """ self._proj_basis = new_proj_basis @property def num_trials(self): """ :return: the number of BOP-DMD trials to perform. :rtype: int """ return self._num_trials @property def trial_size(self): """ :return: size of the data subsets used during each BOP-DMD trial. :rtype: int or float """ return self._trial_size @property def time(self): """ Get the vector that contains the time points of the fitted snapshots. :return: the vector that contains the original time points. :rtype: numpy.ndarray """ if not self.fitted: raise ValueError("You need to call fit() before.") return self._time @property def atilde(self): """ Get the reduced Koopman operator A, called Atilde. :return: the reduced Koopman operator A. :rtype: numpy.ndarray """ if not self.fitted: raise ValueError("You need to call fit() before.") return self.operator.as_numpy_array @property def A(self): """ Get the full Koopman operator A. :return: the full Koopman operator A. :rtype: numpy.ndarray """ if not self.fitted: raise ValueError("You need to call fit() before.") return self.operator.A @property def dynamics(self): """ Get the time evolution of each mode. :return: matrix that contains all the time evolution, stored by row. :rtype: numpy.ndarray """ t_omega = np.exp(np.outer(self.eigs, self.time)) return np.diag(self.amplitudes).dot(t_omega) @property def amplitudes_std(self): """ Get the amplitudes standard deviation. :return: amplitudes standard deviation. :rtype: numpy.ndarray """ if not self.fitted: raise ValueError("You need to call fit() before.") return self.operator.amplitudes_std @property def eigenvalues_std(self): """ Get the eigenvalues standard deviation. :return: eigenvalues standard deviation. :rtype: numpy.ndarray """ if not self.fitted: raise ValueError("You need to call fit() before.") return self.operator.eigenvalues_std @property def modes_std(self): """ Get the modes standard deviation. :return: modes standard deviation. :rtype: numpy.ndarray """ if not self.fitted: raise ValueError("You need to call fit() before.") return self.operator.modes_std @property def eig_constraints(self): """ Get the eigenvalue constraints. :return: eigenvalue constraints. :rtype: set(str) """ return self._eig_constraints
[docs] def print_varpro_opts(self): """ Prints a formatted information string that displays all chosen variable projection parameter values. """ if not self.fitted: raise ValueError("You need to call fit() before.") opt_names = [ "init_lambda", "maxlam", "lamup", "use_levmarq", "maxiter", "tol", "eps_stall", "use_fulljac", "verbose", ] print("VARIABLE PROJECTION OPTIONS:") print("============================") for name, value in zip(opt_names, self.operator.varpro_opts): if len(name) < 7: print(name + ":\t\t" + str(value)) else: print(name + ":\t" + str(value))
[docs] def _check_eig_constraints(self, eig_constraints): """ Function that verifies that... - if the given eig_constraints is a function, the function is able to take a (n,) numpy.ndarray and return a (n,) numpy.ndarray - if the given eig_constraints is a set, it does not contain an unsupported constraint class, and does not contain an invalid combination of eigenvalue constraints """ if isfunction(eig_constraints): if self._svd_rank >= 1 and isinstance(self._svd_rank, int): r = self._svd_rank else: # use a dummy rank of 10 r = 10 try: # Test the eig_constraints function on a (r,) np.ndarray. x_dummy = np.arange(r) y_dummy = eig_constraints(x_dummy) except Exception as e: msg = ( "eigenvalue constraint function must be able " "to take a single (n,) numpy.ndarray as input." ) raise ValueError(msg) from e if not isinstance(y_dummy, np.ndarray): msg = ( "eigenvalue constraint function must " "output a single (n,) numpy.ndarray." ) raise ValueError(msg) if x_dummy.shape != y_dummy.shape: msg = ( "eigenvalue constraint function must accept a (n,) " "numpy.ndarray as input and output a (n,) numpy.ndarray." ) raise ValueError(msg) else: valid_constraints = {"stable", "imag", "conjugate_pairs"} invalid_combos = [{"stable", "imag"}] if len(eig_constraints.difference(valid_constraints)) != 0: raise ValueError("Invalid eigenvalue constraint provided.") for invalid_combo_set in invalid_combos: if invalid_combo_set.issubset(eig_constraints): msg = "Invalid eigenvalue constraint combination provided." raise ValueError(msg)
[docs] def _initialize_alpha(self, s=None, V=None): """ Uses projected trapezoidal rule to approximate the eigenvalues of A in z' = Az. The computed eigenvalues will serve as our initial guess for alpha. If the singular values and right singular vectors are provided, they will be used to compute the projection. Otherwise, the snapshot data will be projected onto the projection basis. :param s: the singular values of the snapshot matrix. If provided, they will be used along with V to compute the projection. :type s: numpy.ndarray :param V: the right singular vectors of the snapshot matrix. If provided, they will be used along with s to compute the projection. :type V: numpy.ndarray :return: Approximated eigenvalues of the matrix A. :rtype: numpy.ndarray """ if s is not None and V is not None: # If the singular values and right singular vectors are provided, # use them to compute the projection. ux = np.diag(s).dot(V) else: # Project the snapshot data onto the projection basis. ux = self._proj_basis.conj() ux1 = ux[:, :-1] ux2 = ux[:, 1:] # Define the matrices Y and Z as the following and compute the # rank-truncated SVD of Y. Y = (ux1 + ux2) / 2 # Element-wise division by time differences. w/o large T Z = (ux2 - ux1) / (self._time[1:] - self._time[:-1]) U, s, V = compute_svd(Y, self._svd_rank) S = np.diag(s) # Compute the matrix Atilde and return its eigenvalues. Atilde = np.linalg.multi_dot([U.conj().T, Z, V, np.linalg.inv(S)]) return np.linalg.eig(Atilde)[0]
[docs] def fit(self, X, t): """ Compute the Optimized Dynamic Mode Decomposition. :param X: the input snapshots. :type X: numpy.ndarray or iterable :param t: the input time vector. :type t: numpy.ndarray or iterable """ # Process the input data and convert to numpy.ndarrays. self._reset() self._snapshots_holder = Snapshots(X) self._time = np.array(t).squeeze() # Check that input time vector is one-dimensional. if self._time.ndim > 1: raise ValueError("Input time vector t must be one-dimensional.") # Check that the number of snapshots in the data matrix X matches the # number of time points in the time vector t. if self.snapshots.shape[1] != len(self._time): msg = ( "The number of columns in the data matrix X must match " "the number of time points in the time vector t." ) raise ValueError(msg) # Compute the rank of the fit. self._svd_rank = int(compute_rank(self.snapshots, self._svd_rank)) # Set/check the projection basis. if self._proj_basis is None and self._use_proj: self._proj_basis = compute_svd(self.snapshots, self._svd_rank)[0] elif self._proj_basis is None and not self._use_proj: self._proj_basis = compute_svd(self.snapshots, -1)[0] elif ( not isinstance(self._proj_basis, np.ndarray) or self._proj_basis.ndim != 2 or self._proj_basis.shape[1] != self._svd_rank ): msg = "proj_basis must be a 2D np.ndarray with {} columns." raise ValueError(msg.format(self._svd_rank)) # Set/check the initial guess for the continuous-time DMD eigenvalues. if self._init_alpha is None: self._init_alpha = self._initialize_alpha() elif ( not isinstance(self._init_alpha, np.ndarray) or self._init_alpha.ndim > 1 or len(self._init_alpha) != self._svd_rank ): msg = "init_alpha must be a 1D np.ndarray with {} entries." raise ValueError(msg.format(self._svd_rank)) # Build the BOP-DMD operator now that the initial alpha and # the projection basis have been defined. self._Atilde = BOPDMDOperator( self._compute_A, self._use_proj, self._init_alpha, self._proj_basis, self._num_trials, self._trial_size, self._eig_sort, self._eig_constraints, self._mode_prox, self._remove_bad_bags, self._bag_warning, self._bag_maxfail, **self._varpro_opts_dict, ) # Define the snapshots that will be used for fitting. if self._use_proj: snp = self._proj_basis.conj() else: snp = self.snapshots # Fit the data. self._b = self.operator.compute_operator(snp.T, self._time) return self
[docs] def fit_econ(self, s, V, t): """ Compute the Optimized Dynamic Mode Decomposition using the SVD results of the snapshot matrix. Use this method if you have pre-computed the SVD and the original snapshot matrix is not avaialble or too large to store in memory. The left singular vectors (U) must be specified as the projection basis when initializing the BOPDMD object. :param s: the singular values. :type s: array_like :param V: matrix of right singular vectors, where each row is a different singular vector. :type V: numpy.ndarray :param t: the input time vector. :type t: numpy.ndarray """ self._reset() self._time = np.asarray(t).squeeze() s = np.asarray(s).squeeze() if self._proj_basis is None or not self._use_proj: msg = """ proj_basis must be provided when using fit_econ, and use_proj must be set to True. """ raise ValueError(msg) if ( not isinstance(self._proj_basis, np.ndarray) or self._proj_basis.ndim != 2 or self._proj_basis.shape[1] != self._svd_rank ): msg = ( "proj_basis must be a 2D np.ndarray with " f"{self._svd_rank} columns." ) raise ValueError(msg) if self._time.ndim > 1: msg = "Input time vector t must be one-dimensional." raise ValueError(msg) if s.ndim != 1 or len(s) != self._svd_rank: msg = f"s must be one-dimensional and of length {self._svd_rank}." raise ValueError(msg) if ( not isinstance(V, np.ndarray) or V.ndim != 2 or V.shape[0] != self._svd_rank or V.shape[1] != len(self._time) ): msg = ( "V must be a 2D numpy.ndarray with shape " f"({self._svd_rank}, {len(self._time)})." ) raise ValueError(msg) # Set/check the initial guess for the continuous-time DMD eigenvalues. if self._init_alpha is None: self._init_alpha = self._initialize_alpha(s=s, V=V) elif ( not isinstance(self._init_alpha, np.ndarray) or self._init_alpha.ndim > 1 or len(self._init_alpha) != self._svd_rank ): msg = "init_alpha must be a 1D np.ndarray with {} entries." raise ValueError(msg.format(self._svd_rank)) # Build the BOP-DMD operator now that the initial alpha and # the projection basis have been defined. self._Atilde = BOPDMDOperator( self._compute_A, self._use_proj, self._init_alpha, self._proj_basis, self._num_trials, self._trial_size, self._eig_sort, self._eig_constraints, self._mode_prox, self._remove_bad_bags, self._bag_warning, self._bag_maxfail, **self._varpro_opts_dict, ) # Define the snapshots that will be used for fitting. snp = np.diag(s).dot(V) # Fit the data. self._b = self.operator.compute_operator(snp.T, self._time) # Store the singular values self._singular_values = s return self
[docs] def forecast(self, t): """ Predict the output X given the input time t using the fitted DMD model. If model has been fitted using multiple ensembles, an average prediction and its variance is returned. :param t: the input time vector. :type t: numpy.ndarray or iterable :return: system prediction at times given by vector t. :rtype: numpy.ndarray or numpy.ndarray, numpy.ndarray """ # Process the input data and convert to numpy.ndarray. t = np.array(t).squeeze() # Reject the input time vector if it isn't one-dimensional. if t.ndim > 1: raise ValueError("Input time vector t must be one-dimensional.") # If variance information has been recorded, use it. if self.operator.eigenvalues_std is not None: # Compute num_trials many forecasts. all_x = np.empty( (self._num_trials, self.snapshots.shape[0], len(t)), dtype="complex", ) for k in range(self._num_trials): # Draw eigenvalues and amplitudes from random distribution. eigs_k = self.eigs + np.multiply( np.random.randn(*self.eigs.shape), self.operator.eigenvalues_std, ) b_k = self.amplitudes + np.multiply( np.random.randn(*self.amplitudes.shape), self.operator.amplitudes_std, ) # Compute forecast using average modes and eigs_k, b_k. all_x[k] = np.linalg.multi_dot( [self.modes, np.diag(b_k), np.exp(np.outer(eigs_k, t))] ) # Return the average forecast and the variance. return np.mean(all_x, axis=0), np.var(all_x, axis=0) # If no variance information, simply compute a standard forecast. x = np.linalg.multi_dot( [ self.modes, np.diag(self.amplitudes), np.exp(np.outer(self.eigs, t)), ] ) return x
[docs] def plot_mode_uq( self, *, x=None, y=None, d=1, modes_shape=None, order="C", cols=4, figsize=None, dpi=None, plot_modes=None, plot_conjugate_pairs=True, ): """ Plot BOP-DMD modes alongside their standard deviations. :param x: Points along the 1st spatial dimension where data has been collected. :type x: np.ndarray or iterable :param y: Points along the 2nd spatial dimension where data has been collected. This parameter is only applicable when the data snapshots are 2-D, which must be indicated with `modes_shape`. :type y: np.ndarray or iterable :param d: Number of delays applied to the data. If `d` is greater than 1, then each plotted mode will be the average mode taken across all `d` delays. :type d: int :param modes_shape: Shape of the modes. If not provided, the shape is assumed to be the flattened space dim of the snapshot data. Provide as width, height dimension. :type modes_shape: iterable :param order: Read the elements of snapshots using this index order, and place the elements into the reshaped array using this index order. It has to be the same used to store the snapshots. :type order: {"C", "F", "A"} :param cols: Number of columns to use for the subplot grid. :type cols: int :param figsize: Width, height in inches. :type figsize: iterable :param dpi: Figure resolution. :type dpi: int :param plot_modes: Number of leading modes to plot, or the indices of the modes to plot. If `None`, then all available modes are plotted. Note that if this parameter is given as a list of indices, it will override the `plot_complex_pair` parameter. :type plot_modes: int or iterable :param plot_conjugate_pairs: Whether or not to omit one of the modes that correspond with a complex conjugate pair of eigenvalues. :type plot_conjugate_pairs: bool """ if self.modes_std is None: raise ValueError("No UQ metrics to plot.") # Get the indices of the modes to plot. nd, r = self.modes.shape if plot_modes is None or isinstance(plot_modes, int): mode_indices = np.arange(r) if not plot_conjugate_pairs: if r % 2 == 0: mode_indices = mode_indices[::2] else: mode_indices = np.concatenate([(0,), mode_indices[1::2]]) if isinstance(plot_modes, int): mode_indices = mode_indices[:plot_modes] else: mode_indices = plot_modes plot_conjugate_pairs = True # By default, modes_shape is the flattened space dimension. if modes_shape is None: modes_shape = (len(self.snapshots) // d,) # Order the modes and their standard deviations. mode_order = np.argsort(-np.abs(self.amplitudes)) modes = self.modes[:, mode_order] modes_std = self.modes_std[:, mode_order] # Build the spatial grid for the mode plots. if x is None: x = np.arange(modes_shape[0]) if len(modes_shape) == 2: if y is None: y = np.arange(modes_shape[1]) xgrid, ygrid = np.meshgrid(x, y) # Collapse the results across time-delays. if d > 1: modes = np.average(modes.reshape(d, nd // d, r), axis=0) modes_std = np.average(modes_std.reshape(d, nd // d, r), axis=0) # Define the subplot grid. # Compute the number of subplot rows given the number of columns. rows = 2 * int(np.ceil(len(mode_indices) / cols)) # Compute a grid of all subplot indices. all_inds = np.arange(rows * cols).reshape(rows, cols) # Get the subplot indices at which the mode averages will be plotted. # Mode averages are plotted on the 1st, 3rd, 5th, ... rows of the plot. avg_inds = all_inds[::2].flatten() # Get the subplot indices at which the mode stds will be plotted. # Mode stds are plotted on the 2nd, 4th, 6th, ... rows of the plot. std_inds = all_inds[1::2].flatten() plt.figure(figsize=figsize, dpi=dpi) for i, idx in enumerate(mode_indices): mode = modes[:, idx] mode_std = modes_std[:, idx] # Plot the average mode. plt.subplot(rows, cols, avg_inds[i] + 1) if plot_conjugate_pairs or (r % 2 == 1 and i == 0): plt.title(f"Mode {idx + 1}") else: plt.title(f"Modes {idx + 1},{idx + 2}") if len(modes_shape) == 1: # Plot modes in 1-D. plt.plot(x, mode.real, c="tab:blue") else: # Plot modes in 2-D. plt.pcolormesh( xgrid, ygrid, mode.reshape(xgrid.shape, order=order).real, cmap="viridis", ) plt.colorbar() # Plot the mode standard deviation. plt.subplot(rows, cols, std_inds[i] + 1) plt.title("Mode Standard Deviation") if len(modes_shape) == 1: # Plot modes in 1-D. plt.plot(x, mode_std, c="tab:red") else: # Plot modes in 2-D. plt.pcolormesh( xgrid, ygrid, mode_std.reshape(xgrid.shape, order=order), cmap="inferno", ) plt.colorbar() plt.suptitle("DMD Modes") plt.tight_layout()
[docs] def plot_eig_uq( self, eigs_true=None, xlim=None, ylim=None, figsize=None, dpi=None, flip_axes=False, draw_axes=False, ): """ Plot BOP-DMD eigenvalues against 1 and 2 standard deviations. :param eigs_true: True continuous-time eigenvalues, if known. :type eigs_true: np.ndarray or iterable :param xlim: Desired limits for the x-axis. :type xlim: iterable :param ylim: Desired limits for the y-axis. :type ylim: iterable :param figsize: Width, height in inches. :type figsize: iterable :param dpi: Figure resolution. :type dpi: int :param flip_axes: Whether or not to swap the real and imaginary axes on the eigenvalue plot. If `True`, the real axis will be vertical and the imaginary axis will be horizontal. :type flip_axes: bool :param draw_axes: Whether or not to draw the real and imaginary axes. :type draw_axes: bool """ if self.eigenvalues_std is None: raise ValueError("No UQ metrics to plot.") if eigs_true is not None: eigs_true = np.array(eigs_true) fig, ax = plt.subplots(figsize=figsize, dpi=dpi) plt.title("DMD Eigenvalues") if draw_axes: ax.axhline(y=0, c="k", lw=1) ax.axvline(x=0, c="k", lw=1) if flip_axes: eigs = self.eigs.imag + 1j * self.eigs.real plt.xlabel(r"$Im(\omega)$") plt.ylabel(r"$Re(\omega)$") if eigs_true is not None: eigs_true = eigs_true.imag + 1j * eigs_true.real else: eigs = self.eigs plt.xlabel(r"$Re(\omega)$") plt.ylabel(r"$Im(\omega)$") for e, std in zip(eigs, self.eigenvalues_std): # Plot 2 standard deviations. c_1 = plt.Circle((e.real, e.imag), 2 * std, color="b", alpha=0.2) ax.add_patch(c_1) # Plot 1 standard deviation. c_2 = plt.Circle((e.real, e.imag), std, color="b", alpha=0.5) ax.add_patch(c_2) # Plot the average eigenvalues. ax.plot(eigs.real, eigs.imag, "o", c="b", label="BOP-DMD") # Plot the true eigenvalues if given. if eigs_true is not None: ax.plot(eigs_true.real, eigs_true.imag, "x", c="k", label="Truth") if xlim is not None: ax.set_xlim(xlim) if ylim is not None: ax.set_ylim(ylim) plt.legend()