Source code for pydmd.subspacedmd

"""
Derived module from dmdbase.py for Subspace DMD.

Reference:
Takeishi, Naoya, Yoshinobu Kawahara, and Takehisa Yairi. "Subspace dynamic mode
decomposition for stochastic Koopman analysis." Physical Review E 96.3 (2017):
033310.
"""

import numpy as np

from .dmdbase import DMDBase
from .dmdoperator import DMDOperator
from .snapshots import Snapshots


def reducedsvd(X, r=None):
    """
    Computes the reduced SVD of `X` taking only the first `r` modes.

    :param np.ndarray X: Matrix to be used for SVD.
    :param int r: Number of modes to be retained, if `None` the rank of `X` is
        used instead.
    :rtype: tuple
    :return: Left singular vectors, singular values and right singular vectors.
    """

    U, s, V = np.linalg.svd(X, full_matrices=True)
    if r is None:
        r = np.linalg.matrix_rank(X)
    return U[:, :r], s[:r], V.conj().T[:, :r]


class SubspaceDMDOperator(DMDOperator):
    """
    Subspace Dynamic Mode Decomposition operator class.

    :param svd_rank: the rank for the truncation; if -1 all the columns of
        :math:`U_q` are used, if `svd_rank` is an integer grater than zero
        it is used as the number of columns retained from  :math:`U_q`.
        `svd_rank=0` or float values are not supported.
    :type svd_rank: int
    :param rescale_mode: Scale Atilde as shown in
        10.1016/j.jneumeth.2015.10.010 (section 2.4) before computing its
        eigendecomposition. None means no rescaling, 'auto' means automatic
        rescaling using singular values, otherwise the scaling factors.
    :type rescale_mode: {'auto'} or None or numpy.ndarray
    :param sorted_eigs: Sort eigenvalues (and modes/dynamics accordingly) by
        magnitude if `sorted_eigs='abs'`, by real part (and then by imaginary
        part to break ties) if `sorted_eigs='real'`. Default: False.
    :type sorted_eigs: {'real', 'abs'} or False
    """

    def __init__(self, svd_rank, rescale_mode, sorted_eigs):
        super().__init__(
            svd_rank=svd_rank,
            exact=True,
            forward_backward=False,
            rescale_mode=rescale_mode,
            sorted_eigs=sorted_eigs,
            tikhonov_regularization=None,
        )

        self._Atilde = None
        self._modes = None
        self._Lambda = None

    def compute_operator(self, Yp, Yf):
        """
        Compute the low-rank operator.

        Computes also modes, eigenvalues and DMD amplitudes.

        :param numpy.ndarray Yp: Matrix `Yp` as defined in the original paper.
        :param numpy.ndarray Yp: Matrix `Yf` as defined in the original paper.
        """

        n = Yp.shape[0] // 2

        _, _, Vp = reducedsvd(Yp)
        O = Yf.dot(Vp).dot(Vp.T.conj())

        Uq, _, _ = reducedsvd(O)

        r = min(np.linalg.matrix_rank(O), n)
        if self._svd_rank > 0:
            r = min(r, self._svd_rank)

        Uq1, Uq2 = Uq[:n, :r], Uq[n:, :r]

        U, S, V = reducedsvd(Uq1)

        self._Atilde = self._least_square_operator(U, S, V, Uq2)
        self._compute_eigenquantities()

        M = Uq2.dot(V) * np.reciprocal(S)
        self._compute_modes(M)

    def _compute_modes(self, M):
        """
        Private method that computes eigenvalues and eigenvectors of the
        high-dimensional operator (stored in self.modes and self.Lambda).

        :param numpy.ndarray M: Matrix `M` as defined in the original paper.
        """

        if self._rescale_mode is None:
            W = self.eigenvectors
        else:
            # compute W as shown in arXiv:1409.5496 (section 2.4)
            factors_sqrt = np.diag(np.power(self._rescale_mode, 0.5))
            W = factors_sqrt.dot(self.eigenvectors)

        # compute the eigenvectors of the high-dimensional operator
        high_dimensional_eigenvectors = M.dot(W) * np.reciprocal(
            self.eigenvalues
        )

        # eigenvalues are the same of lowrank
        high_dimensional_eigenvalues = self.eigenvalues

        self._modes = high_dimensional_eigenvectors
        self._Lambda = high_dimensional_eigenvalues


[docs]class SubspaceDMD(DMDBase): """ Subspace Dynamic Mode Decomposition :param svd_rank: the rank for the truncation; if -1 all the columns of :math:`U_q` are used, if `svd_rank` is an integer grater than zero it is used as the number of columns retained from :math:`U_q`. `svd_rank=0` or float values are not supported. :type svd_rank: int :param opt: argument to control the computation of DMD modes amplitudes. See :class:`DMDBase`. Default is False. :type opt: bool or int :param rescale_mode: Scale Atilde as shown in 10.1016/j.jneumeth.2015.10.010 (section 2.4) before computing its eigendecomposition. None means no rescaling, 'auto' means automatic rescaling using singular values, otherwise the scaling factors. :type rescale_mode: {'auto'} or None or numpy.ndarray :param sorted_eigs: Sort eigenvalues (and modes/dynamics accordingly) by magnitude if `sorted_eigs='abs'`, by real part (and then by imaginary part to break ties) if `sorted_eigs='real'`. Default: False. :type sorted_eigs: {'real', 'abs'} or False """ def __init__( self, svd_rank=-1, opt=False, rescale_mode=None, sorted_eigs=False, ): self._tlsq_rank = 0 self._original_time = None self._dmd_time = None self._opt = opt self._b = None self._snapshots_holder = None self._modes_activation_bitmask_proxy = None self._Atilde = SubspaceDMDOperator( svd_rank=svd_rank, rescale_mode=rescale_mode, sorted_eigs=sorted_eigs, )
[docs] def fit(self, X): """ Compute the Subspace Dynamic Modes Decomposition to the input data. :param X: the input snapshots. :type X: numpy.ndarray or iterable """ self._reset() self._snapshots_holder = Snapshots(X) n_samples = self.snapshots.shape[1] Y0 = self.snapshots[:, :-3] Y1 = self.snapshots[:, 1:-2] Y2 = self.snapshots[:, 2:-1] Y3 = self.snapshots[:, 3:] Yp = np.vstack((Y0, Y1)) Yf = np.vstack((Y2, Y3)) self.operator.compute_operator(Yp, Yf) # Default timesteps self._set_initial_time_dictionary( {"t0": 0, "tend": n_samples - 1, "dt": 1} ) self._b = self._compute_amplitudes() return self