Source code for pydmd.dmdoperator

import logging

import numpy as np
from scipy.linalg import sqrtm

from .utils import compute_svd

logging.basicConfig(
    format="[%(filename)s:%(lineno)s - %(funcName)20s() ] %(message)s"
)


[docs]class DMDOperator: """ Dynamic Mode Decomposition standard operator class. Non-standard ways of computing the low-rank Atilde operator should be coded into subclasses of this class. :param svd_rank: the rank for the truncation; If 0, the method computes the optimal rank and uses it for truncation; if positive interger, the method uses the argument for the truncation; if float between 0 and 1, the rank is the number of the biggest singular values that are needed to reach the 'energy' specified by `svd_rank`; if -1, the method does not compute truncation. :type svd_rank: int or float :param bool exact: flag to compute either exact DMD or projected DMD. Default is False. :param rescale_mode: Scale Atilde as shown in 10.1016/j.jneumeth.2015.10.010 (section 2.4) before computing its eigendecomposition. None means no rescaling, 'auto' means automatic rescaling using singular values, otherwise the scaling factors. :type rescale_mode: {'auto'} or None or numpy.ndarray :param bool forward_backward: If True, the low-rank operator is computed like in fbDMD (reference: https://arxiv.org/abs/1507.02264). Default is False. :param sorted_eigs: Sort eigenvalues (and modes/dynamics accordingly) by magnitude if `sorted_eigs='abs'`, by real part (and then by imaginary part to break ties) if `sorted_eigs='real'`. Default: False. :type sorted_eigs: {'real', 'abs'} or False :param tikhonov_regularization: Tikhonov parameter for the regularization. If `None`, no regularization is applied, if `float`, it is used as the :math:`\lambda` tikhonov parameter. :type tikhonov_regularization: int or float """ def __init__( self, svd_rank, exact, forward_backward, rescale_mode, sorted_eigs, tikhonov_regularization, ): self._exact = exact self._rescale_mode = rescale_mode self._svd_rank = svd_rank self._forward_backward = forward_backward self._sorted_eigs = sorted_eigs self._tikhonov_regularization = tikhonov_regularization self._norm_X = None
[docs] def compute_operator(self, X, Y): """ Compute the low-rank operator. :param numpy.ndarray X: matrix containing the snapshots x0,..x{n-1} by column. :param numpy.ndarray Y: matrix containing the snapshots x1,..x{n} by column. :return: the (truncated) left-singular vectors matrix, the (truncated) singular values array, the (truncated) right-singular vectors matrix of X. :rtype: numpy.ndarray, numpy.ndarray, numpy.ndarray """ U, s, V = compute_svd(X, self._svd_rank) if self._tikhonov_regularization is not None: self._norm_X = np.linalg.norm(X) atilde = self._least_square_operator(U, s, V, Y) if self._forward_backward: # b stands for "backward" bU, bs, bV = compute_svd(Y, svd_rank=len(s)) atilde_back = self._least_square_operator(bU, bs, bV, X) atilde = sqrtm(atilde.dot(np.linalg.inv(atilde_back))) if hasattr(np, "complex256") and atilde.dtype == np.complex256: atilde = atilde.astype(np.complex128) msg = "Casting atilde from np.complex256 to np.complex128" logging.info(msg) if isinstance(self._rescale_mode, str) and self._rescale_mode == "auto": self._rescale_mode = s self._Atilde = atilde self._compute_eigenquantities() self._compute_modes(Y, U, s, V) return U, s, V
@property def shape(self): """Shape of the operator""" return self.as_numpy_array.shape def __call__(self, snapshot_lowrank_modal_coefficients): """ Apply the low-rank operator to a vector of the modal coefficients of a snapshot(s). :param numpy.ndarray snapshot_lowrank_modal_coefficients: low-rank representation (in modal coefficients) of a snapshot x{n}. :return: low-rank representation (in modal coefficients) of x{n+1}. :rtype: numpy.ndarray """ return self._Atilde.dot(snapshot_lowrank_modal_coefficients) @property def eigenvalues(self): if not hasattr(self, "_eigenvalues"): raise ValueError("You need to call fit before") return self._eigenvalues @property def eigenvectors(self): if not hasattr(self, "_eigenvectors"): raise ValueError("You need to call fit before") return self._eigenvectors @property def modes(self): if not hasattr(self, "_modes"): raise ValueError("You need to call fit before") return self._modes @property def Lambda(self): if not hasattr(self, "_Lambda"): raise ValueError("You need to call fit before") return self._Lambda @property def as_numpy_array(self): if not hasattr(self, "_Atilde") or self._Atilde is None: raise ValueError("You need to call fit before") else: return self._Atilde
[docs] def _least_square_operator(self, U, s, V, Y): """ Private method that computes the lowrank operator from the singular value decomposition of matrix X and the matrix Y. .. math:: \\mathbf{\\tilde{A}} = \\mathbf{U}^* \\mathbf{Y} \\mathbf{X}^\\dagger \\mathbf{U} = \\mathbf{U}^* \\mathbf{Y} \\mathbf{V} \\mathbf{S}^{-1} :param numpy.ndarray U: 2D matrix that contains the left-singular vectors of X, stored by column. :param numpy.ndarray s: 1D array that contains the singular values of X. :param numpy.ndarray V: 2D matrix that contains the right-singular vectors of X, stored by row. :param numpy.ndarray Y: input matrix Y. :return: the lowrank operator :rtype: numpy.ndarray """ if self._tikhonov_regularization is not None: s = ( s**2 + self._tikhonov_regularization * self._norm_X ) * np.reciprocal(s) return np.linalg.multi_dot([U.T.conj(), Y, V]) * np.reciprocal(s)
[docs] def _compute_eigenquantities(self): """ Private method that computes eigenvalues and eigenvectors of the low-dimensional operator, scaled according to self._rescale_mode. """ if self._rescale_mode is None: # scaling isn't required Ahat = self._Atilde elif isinstance(self._rescale_mode, np.ndarray): if len(self._rescale_mode) != self.as_numpy_array.shape[0]: raise ValueError( """Scaling by an invalid number of coefficients""" ) scaling_factors_array = self._rescale_mode factors_inv_sqrt = np.diag(np.power(scaling_factors_array, -0.5)) factors_sqrt = np.diag(np.power(scaling_factors_array, 0.5)) # if an index is 0, we get inf when taking the reciprocal for idx, item in enumerate(scaling_factors_array): if item == 0: factors_inv_sqrt[idx] = 0 Ahat = np.linalg.multi_dot( [factors_inv_sqrt, self.as_numpy_array, factors_sqrt] ) else: raise ValueError( "Invalid value for rescale_mode: {} of type {}".format( self._rescale_mode, type(self._rescale_mode) ) ) self._eigenvalues, self._eigenvectors = np.linalg.eig(Ahat) if self._sorted_eigs is not False and self._sorted_eigs is not None: if self._sorted_eigs == "abs": def k(tp): return abs(tp[0]) elif self._sorted_eigs == "real": def k(tp): eig = tp[0] if isinstance(eig, complex): return (eig.real, eig.imag) return (eig.real, 0) else: raise ValueError( "Invalid value for sorted_eigs: {}".format( self._sorted_eigs ) ) # each column is an eigenvector, therefore we take the # transpose to associate each row (former column) to an # eigenvalue before sorting a, b = zip( *sorted(zip(self._eigenvalues, self._eigenvectors.T), key=k) ) self._eigenvalues = np.array([eig for eig in a]) # we restore the original condition (eigenvectors in columns) self._eigenvectors = np.array([vec for vec in b]).T
[docs] def _compute_modes(self, Y, U, Sigma, V): """ Private method that computes eigenvalues and eigenvectors of the high-dimensional operator (stored in self.modes and self.Lambda). :param numpy.ndarray Y: matrix containing the snapshots x1,..x{n} by column. :param numpy.ndarray U: (truncated) left singular vectors of X :param numpy.ndarray Sigma: (truncated) singular values of X :param numpy.ndarray V: (truncated) right singular vectors of X """ if self._rescale_mode is None: W = self.eigenvectors else: # compute W as shown in arXiv:1409.5496 (section 2.4) factors_sqrt = np.diag(np.power(self._rescale_mode, 0.5)) W = factors_sqrt.dot(self.eigenvectors) # compute the eigenvectors of the high-dimensional operator if self._exact: if self._tikhonov_regularization is not None: Sigma = ( Sigma**2 + self._tikhonov_regularization * self._norm_X ) * np.reciprocal(Sigma) high_dimensional_eigenvectors = ( Y.dot(V) * np.reciprocal(Sigma) ).dot(W) else: high_dimensional_eigenvectors = U.dot(W) # eigenvalues are the same of lowrank high_dimensional_eigenvalues = self.eigenvalues self._modes = high_dimensional_eigenvectors self._Lambda = high_dimensional_eigenvalues