Source code for pydmd.dmdbase

"""
Base module for the DMD: `fit` method must be implemented in inherited classes
"""

import pickle
from copy import copy, deepcopy

import numpy as np

from .dmdoperator import DMDOperator
from .utils import compute_svd


class ActivationBitmaskProxy:
    """
    A proxy which stands in the middle between a bitmask and an instance of
    :class:`DMDBase`. The proxy holds the original values of modes,
    eigenvalues and amplitudes, and exposes (via
    :func:`ActivationBitmaskProxy.modes`, :func:`ActivationBitmaskProxy.eigs`
    and :func:`ActivationBitmaskProxy.amplitudes`) the proxied (i.e. filtered)
    those quantities, depending on the current value of the
    bitmask (see also :func:`ActivationBitmaskProxy.change_bitmask`).

    This machinery is needed in order to allow for the modification of the
    matrices containing modes, amplitudes and eigenvalues after the indexing
    provided by the bitmask. Since double indexing in NumPy does not deliver a
    modifiable view of the original array, we need to propagate any change
    on the selection to the original matrices at some point: we decided to
    propagate the changes just before a change in the bitmask, namely in the
    last available moment before losing the information provided by the ``old''
    bitmask.

    :param dmd_operator: DMD operator to be proxied.
    :type dmd_operator: DMDOperator
    :param amplitudes: DMD amplitudes.
    :type amplitudes: np.ndarray
    """

    def __init__(self, dmd_operator, amplitudes):
        self._original_modes = dmd_operator.modes
        self._original_eigs = np.atleast_1d(dmd_operator.eigenvalues)
        self._original_amplitudes = np.atleast_1d(amplitudes)

        self.old_bitmask = None
        self.change_bitmask(np.full(len(dmd_operator.eigenvalues), True))

    def change_bitmask(self, value):
        """
        Change the bitmask which regulates this proxy.

        Before changing the bitmask this method reflects any change performed
        on the proxied quantities provided by this proxy to the original values
        of the quantities.

        :param value: New value of the bitmask, represented by an array of
            `bool` whose size is the same of the number of DMD modes.
        :type value: np.ndarray
        """

        # apply changes made on the proxied values to the original values
        if self.old_bitmask is not None:
            self._original_modes[:, self.old_bitmask] = self.modes
            self._original_eigs[self.old_bitmask] = self.eigs
            self._original_amplitudes[self.old_bitmask] = self.amplitudes

        self._modes = np.array(self._original_modes)[:, value]
        self._eigs = np.array(self._original_eigs)[value]
        self._amplitudes = np.array(self._original_amplitudes)[value]

        self.old_bitmask = value

    @property
    def modes(self):
        """
        Proxied (i.e. filtered according to the bitmask) view on the matrix
        of DMD modes.

        :return: A matrix containing the selected DMD modes.
        :rtype: np.ndarray
        """
        return self._modes

    @property
    def eigs(self):
        """
        Proxied (i.e. filtered according to the bitmask) view on the array
        of DMD eigenvalues.

        :return: An array containing the selected DMD eigenvalues.
        :rtype: np.ndarray
        """
        return self._eigs

    @property
    def amplitudes(self):
        """
        Proxied (i.e. filtered according to the bitmask) view on the array
        of DMD amplitudes.

        :return: An array containing the selected DMD amplitudes.
        :rtype: np.ndarray
        """
        return self._amplitudes


[docs]class DMDBase: """ Dynamic Mode Decomposition base class. :param svd_rank: the rank for the truncation; If 0, the method computes the optimal rank and uses it for truncation; if positive interger, the method uses the argument for the truncation; if float between 0 and 1, the rank is the number of the biggest singular values that are needed to reach the 'energy' specified by `svd_rank`; if -1, the method does not compute truncation. :type svd_rank: int or float :param int tlsq_rank: rank truncation computing Total Least Square. Default is 0, that means no truncation. :param bool exact: flag to compute either exact DMD or projected DMD. Default is False. :param opt: If True, amplitudes are computed like in optimized DMD (see :func:`~dmdbase.DMDBase._compute_amplitudes` for reference). If False, amplitudes are computed following the standard algorithm. If `opt` is an integer, it is used as the (temporal) index of the snapshot used to compute DMD modes amplitudes (following the standard algorithm). The reconstruction will generally be better in time instants near the chosen snapshot; however increasing `opt` may lead to wrong results when the system presents small eigenvalues. For this reason a manual selection of the number of eigenvalues considered for the analyisis may be needed (check `svd_rank`). Also setting `svd_rank` to a value between 0 and 1 may give better results. Default is False. :type opt: bool or int :param rescale_mode: Scale Atilde as shown in 10.1016/j.jneumeth.2015.10.010 (section 2.4) before computing its eigendecomposition. None means no rescaling, 'auto' means automatic rescaling using singular values, otherwise the scaling factors. :type rescale_mode: {'auto'} or None or numpy.ndarray :param bool forward_backward: If True, the low-rank operator is computed like in fbDMD (reference: https://arxiv.org/abs/1507.02264). Default is False. :param sorted_eigs: Sort eigenvalues (and modes/dynamics accordingly) by magnitude if `sorted_eigs='abs'`, by real part (and then by imaginary part to break ties) if `sorted_eigs='real'`. Default: False. :type sorted_eigs: {'real', 'abs'} or False :param tikhonov_regularization: Tikhonov parameter for the regularization. If `None`, no regularization is applied, if `float`, it is used as the :math:`\\lambda` tikhonov parameter. :type tikhonov_regularization: int or float :cvar dict original_time: dictionary that contains information about the time window where the system is sampled: - `t0` is the time of the first input snapshot; - `tend` is the time of the last input snapshot; - `dt` is the delta time between the snapshots. :cvar dict dmd_time: dictionary that contains information about the time window where the system is reconstructed: - `t0` is the time of the first approximated solution; - `tend` is the time of the last approximated solution; - `dt` is the delta time between the approximated solutions. """ def __init__( self, svd_rank=0, tlsq_rank=0, exact=False, opt=False, rescale_mode=None, forward_backward=False, sorted_eigs=False, tikhonov_regularization=None, ): self._Atilde = DMDOperator( svd_rank=svd_rank, exact=exact, rescale_mode=rescale_mode, forward_backward=forward_backward, sorted_eigs=sorted_eigs, tikhonov_regularization=tikhonov_regularization, ) self._tlsq_rank = tlsq_rank self._original_time = None self._dmd_time = None self._opt = opt self._exact = exact self._b = None # amplitudes self._snapshots_holder = None self._snapshots_holder_y = None self._modes_activation_bitmask_proxy = None @property def dmd_timesteps(self): """ Get the timesteps of the reconstructed states. :return: the time intervals of the original snapshots. :rtype: numpy.ndarray """ return np.arange( self.dmd_time["t0"], self.dmd_time["tend"] + self.dmd_time["dt"], self.dmd_time["dt"], ) @property def original_timesteps(self): """ Get the timesteps of the original snapshot. :return: the time intervals of the original snapshots. :rtype: numpy.ndarray """ return np.arange( self.original_time["t0"], self.original_time["tend"] + self.original_time["dt"], self.original_time["dt"], ) @property def modes(self): """ Get the matrix containing the DMD modes, stored by column. :return: the matrix containing the DMD modes. :rtype: numpy.ndarray """ if self.fitted: if not self._modes_activation_bitmask_proxy: self._allocate_modes_bitmask_proxy() # if the value is still None, it means that we cannot create # the proxy at the moment if not self._modes_activation_bitmask_proxy: return self.operator.modes return self._modes_activation_bitmask_proxy.modes @property def operator(self): """ Get the instance of DMDOperator. :return: the instance of DMDOperator :rtype: DMDOperator """ return self._Atilde @property def eigs(self): """ Get the eigenvalues of A tilde. :return: the eigenvalues from the eigendecomposition of `atilde`. :rtype: numpy.ndarray """ if self.fitted: if not self._modes_activation_bitmask_proxy: self._allocate_modes_bitmask_proxy() # if the value is still None, it means that we cannot create # the proxy at the moment if not self._modes_activation_bitmask_proxy: return self.operator.eigenvalues return self._modes_activation_bitmask_proxy.eigs @property def dynamics(self): """ Get the time evolution of each mode. .. math:: \\mathbf{x}(t) \\approx \\sum_{k=1}^{r} \\boldsymbol{\\phi}_{k} \\exp \\left( \\omega_{k} t \\right) b_{k} = \\sum_{k=1}^{r} \\boldsymbol{\\phi}_{k} \\left( \\lambda_{k} \\right)^{\\left( t / \\Delta t \\right)} b_{k} :return: the matrix that contains all the time evolution, stored by row. :rtype: numpy.ndarray """ temp = np.repeat( self.eigs[:, None], self.dmd_timesteps.shape[0], axis=1 ) tpow = ( self.dmd_timesteps - self.original_time["t0"] ) / self.original_time["dt"] # The new formula is x_(k+j) = \Phi \Lambda^k \Phi^(-1) x_j. # Since j is fixed, for a given snapshot "u" we have the following # formula: # x_u = \Phi \Lambda^{u-j} \Phi^(-1) x_j # Therefore tpow must be scaled appropriately. tpow = self._translate_eigs_exponent(tpow) return np.power(temp, tpow) * self.amplitudes[:, None]
[docs] def _translate_eigs_exponent(self, tpow): """ Transforms the exponent of the eigenvalues in the dynamics formula according to the selected value of `self._opt` (check the documentation for `opt` in :func:`__init__ <dmdbase.DMDBase.__init__>`). :param tpow: the exponent(s) of Sigma in the original DMD formula. :type tpow: int or np.ndarray :return: the exponent(s) adjusted according to `self._opt` :rtype: int or np.ndarray """ if isinstance(self._opt, bool): amplitudes_snapshot_index = 0 else: amplitudes_snapshot_index = self._opt if amplitudes_snapshot_index < 0: # we take care of negative indexes: -n becomes T - n return tpow - (self.snapshots.shape[1] + amplitudes_snapshot_index) else: return tpow - amplitudes_snapshot_index
@property def reconstructed_data(self): """ Get the reconstructed data. :return: the matrix that contains the reconstructed snapshots. :rtype: numpy.ndarray """ return self.modes.dot(self.dynamics) @property def snapshots(self): """ Get the input data (space flattened). :return: the matrix that contains the flattened snapshots. :rtype: numpy.ndarray """ if self._snapshots_holder: return self._snapshots_holder.snapshots return None @property def snapshots_y(self): """ Get the input left-hand side data (space flattened) if given. :return: matrix that contains the flattened left-hand side snapshots. :rtype: numpy.ndarray """ if self._snapshots_holder_y: return self._snapshots_holder_y.snapshots return None @property def snapshots_shape(self): """ Get the original input snapshot shape. :return: input snapshots shape. :rtype: tuple """ if self._snapshots_holder: return self._snapshots_holder.snapshots_shape return None @property def frequency(self): """ Get the amplitude spectrum. :return: the array that contains the frequencies of the eigenvalues. :rtype: numpy.ndarray """ return np.log(self.eigs).imag / (2 * np.pi * self.original_time["dt"]) @property def growth_rate(self): # To check """ Get the growth rate values relative to the modes. :return: the Floquet values :rtype: numpy.ndarray """ return self.eigs.real / self.original_time["dt"] @property def amplitudes(self): """ Get the coefficients that minimize the error between the original system and the reconstructed one. For futher information, see `dmdbase._compute_amplitudes`. :return: the array that contains the amplitudes coefficient. :rtype: numpy.ndarray """ if self.fitted: if not self._modes_activation_bitmask_proxy: self._allocate_modes_bitmask_proxy() return self._modes_activation_bitmask_proxy.amplitudes @property def fitted(self): """Check whether this DMD instance has been fitted. :return: `True` is the instance has been fitted, `False` otherwise. :rtype: bool """ try: return self.operator.modes is not None except (ValueError, AttributeError): return False @property def modes_activation_bitmask(self): """ Get the bitmask which controls which DMD modes are enabled at the moment in this DMD instance. The DMD instance must be fitted before this property becomes valid. After :func:`fit` is called, the defalt value of `modes_activation_bitmask` is an array of `True` values of the same shape of :func:`amplitudes`. The array returned is read-only (this allow us to react appropriately to changes in the bitmask). In order to modify the bitmask you need to set the field to a brand-new value (see example below). Example: .. code-block:: python >>> # this is an error >>> dmd.modes_activation_bitmask[[1,2]] = False ValueError: assignment destination is read-only >>> tmp = np.array(dmd.modes_activation_bitmask) >>> tmp[[1,2]] = False >>> dmd.modes_activation_bitmask = tmp :return: The DMD modes activation bitmask. :rtype: numpy.ndarray """ # check that the DMD was fitted if not self.fitted: raise RuntimeError("This DMD instance has not been fitted yet.") if not self._modes_activation_bitmask_proxy: self._allocate_modes_bitmask_proxy() bitmask = self._modes_activation_bitmask_proxy.old_bitmask # make sure that the array is immutable bitmask.flags.writeable = False return bitmask @modes_activation_bitmask.setter def modes_activation_bitmask(self, value): # check that the DMD was fitted if not self.fitted: raise RuntimeError("This DMD instance has not been fitted yet.") value = np.array(value) if value.dtype != bool: raise RuntimeError( "Unxpected dtype, expected bool, got {}.".format(value.dtype) ) # check that the shape is correct if value.shape != self.modes_activation_bitmask.shape: raise ValueError( "Expected shape {}, got {}".format( self.modes_activation_bitmask.shape, value.shape ) ) self._modes_activation_bitmask_proxy.change_bitmask(value)
[docs] def _allocate_modes_bitmask_proxy(self): """ Utility method which allocates the activation bitmask proxy using the quantities that are currently available in this DMD instance. Fails quietly if the amplitudes are not set. """ if hasattr(self, "_b") and self._b is not None: self._modes_activation_bitmask_proxy = ActivationBitmaskProxy( self.operator, self._b )
def __getitem__(self, key): """ Restrict the DMD modes used by this instance to a subset of indexes specified by keys. The value returned is a shallow copy of this DMD instance, with a different value in :func:`modes_activation_bitmask`. Therefore assignments to attributes are not reflected into the original instance. However the DMD instance returned should not be used for low-level manipulations on DMD modes, since the underlying DMD operator is shared with the original instance. For this reasons modifications to NumPy arrays may result in unwanted and unspecified situations which should be avoided in principle. :param key: An index (integer), slice or list of indexes. :type key: int or slice or list or np.ndarray :return: A shallow copy of this DMD instance having only a subset of DMD modes which are those indexed by `key`. :rtype: DMDBase """ if isinstance(key, (slice, int, list, np.ndarray)): filter_function = lambda x: isinstance(x, int) if isinstance(key, (list, np.ndarray)): if not all(map(filter_function, key)): raise ValueError( "Invalid argument type, expected a slice, an int, or " "a list of indexes." ) # no repeated elements if len(key) != len(set(key)): raise ValueError("Repeated indexes are not supported.") else: raise ValueError( "Invalid argument type, expected a slice, an int, or a list " "of indexes, got {}".format(type(key)) ) mask = np.full(self.modes_activation_bitmask.shape, False) mask[key] = True shallow_copy = copy(self) shallow_copy._allocate_modes_bitmask_proxy() shallow_copy.modes_activation_bitmask = mask return shallow_copy @property def original_time(self): """ A dictionary which contains information about the time window used to fit this DMD instance. Inside the dictionary: ====== ==================================================================================== Key Value ====== ==================================================================================== `t0` Time of the first input snapshot (0 by default). `tend` Time of the last input snapshot (usually corresponds to the number of snapshots). `dt` Timestep between two snapshots (1 by default). ====== ==================================================================================== :return: A dict which contains info about the input time frame. :rtype: dict """ if self._original_time is None: raise RuntimeError( """ _set_initial_time_dictionary() has not been called, did you call fit()?""" ) return self._original_time @property def dmd_time(self): """ A dictionary which contains information about the time window used to reconstruct/predict using this DMD instance. By default this is equal to :func:`original_time`. Inside the dictionary: ====== ==================================================================================== Key Value ====== ==================================================================================== `t0` Time of the first output snapshot. `tend` Time of the last output snapshot. `dt` Timestep between two snapshots. ====== ==================================================================================== :return: A dict which contains info about the input time frame. :rtype: dict """ if self._dmd_time is None: raise RuntimeError( """ _set_initial_time_dictionary() has not been called, did you call fit()?""" ) return self._dmd_time @dmd_time.setter def dmd_time(self, value): self._dmd_time = deepcopy(value)
[docs] def _set_initial_time_dictionary(self, time_dict): """ Set the initial values for the class fields `time_dict` and `original_time`. This is usually called in `fit()` and never again. :param time_dict: Initial time dictionary for this DMD instance. :type time_dict: dict """ if not ( "t0" in time_dict and "tend" in time_dict and "dt" in time_dict ): raise ValueError( 'time_dict must contain the keys "t0", "tend" and "dt".' ) if len(time_dict) > 3: raise ValueError( 'time_dict must contain only the keys "t0", "tend" and "dt".' ) self._original_time = DMDTimeDict(dict(time_dict)) self._dmd_time = DMDTimeDict(dict(time_dict))
[docs] def fit(self, X): """ Abstract method to fit the snapshots matrices. Not implemented, it has to be implemented in subclasses. """ name = self.__class__.__name__ msg = f"Subclass must implement abstract method {name}.fit" raise NotImplementedError(msg)
[docs] def _reset(self): """ Reset this instance. Should be called in :func:`fit`. """ self._modes_activation_bitmask_proxy = None self._b = None self._snapshots_holder = None self._snapshots_holder_y = None
[docs] def save(self, fname): """ Save the object to `fname` using the pickle module. :param str fname: the name of file where the reduced order model will be saved. Example: >>> from pydmd import DMD >>> dmd = DMD(...) # Construct here the rom >>> dmd.fit(...) >>> dmd.save('pydmd.dmd') """ with open(fname, "wb") as output: pickle.dump(self, output, pickle.HIGHEST_PROTOCOL)
[docs] @staticmethod def load(fname): """ Load the object from `fname` using the pickle module. :return: The `ReducedOrderModel` loaded Example: >>> from pydmd import DMD >>> dmd = DMD.load('pydmd.dmd') >>> print(dmd.reconstructed_data) """ with open(fname, "rb") as output: return pickle.load(output)
[docs] def _optimal_dmd_matrices(self): # compute the vandermonde matrix vander = np.vander(self.eigs, len(self.dmd_timesteps), True) P = np.multiply( np.dot(self.modes.conj().T, self.modes), np.conj(np.dot(vander, vander.conj().T)), ) if self._exact: q = np.conj( np.diag( np.linalg.multi_dot( [vander, self.snapshots.conj().T, self.modes] ) ) ) else: if self._snapshots_holder_y: X = self.snapshots else: X = self.snapshots[:, :-1] _, s, V = compute_svd(X, self.modes.shape[-1]) q = np.conj( np.diag( np.linalg.multi_dot( [ vander[:, : X.shape[1]], V, np.diag(s).conj(), self.operator.eigenvectors, ] ) ) ) return P, q
[docs] def _compute_amplitudes(self): """ Compute the amplitude coefficients. If `self._opt` is False the amplitudes are computed by minimizing the error between the modes and the first snapshot; if `self._opt` is True the amplitudes are computed by minimizing the error between the modes and all the snapshots, at the expense of bigger computational cost. This method uses the class variables self.snapshots (for the snapshots), self.modes and self.eigs. :return: the amplitudes array :rtype: numpy.ndarray References for optimal amplitudes: Jovanovic et al. 2014, Sparsity-promoting dynamic mode decomposition, https://hal-polytechnique.archives-ouvertes.fr/hal-00995141/document """ if isinstance(self._opt, bool) and self._opt: # b optimal a = np.linalg.solve(*self._optimal_dmd_matrices()) else: if isinstance(self._opt, bool): amplitudes_snapshot_index = 0 else: amplitudes_snapshot_index = self._opt a = np.linalg.lstsq( self.modes, self.snapshots.T[amplitudes_snapshot_index], rcond=None, )[0] return a
[docs] def _compare_data_shapes(self, y_snapshots): """ Method that ensures that the data inputs X and Y are the same shape if provided separately. Throws an error if the shapes do not agree. """ if self.snapshots.shape != y_snapshots.shape: msg = "X {} and Y {} input data must be the same shape." raise ValueError( msg.format(self.snapshots.shape, y_snapshots.shape) )
class DMDTimeDict(dict): def __setitem__(self, key, value): if key in ["t0", "tend", "dt"]: dict.__setitem__(self, key, value) else: raise KeyError( """DMDBase.dmd_time accepts only the following keys: "t0", "tend", "dt", {} is not allowed.""".format( key ) ) def __eq__(self, o): if isinstance(o, dict): return all(map(lambda s: o[s] == self[s], ["t0", "tend", "dt"])) return False