Source code for pydmd.dmdc

"""
Derived module from dmdbase.py for dmd with control.

Reference:
- Proctor, J.L., Brunton, S.L. and Kutz, J.N., 2016. Dynamic mode decomposition
with control. SIAM Journal on Applied Dynamical Systems, 15(1), pp.142-161.
"""

import numpy as np

from .dmdbase import DMDBase
from .dmdoperator import DMDOperator
from .snapshots import Snapshots
from .utils import compute_svd, compute_tlsq


class DMDControlOperator(DMDOperator):
    """
    DMD with control base operator. This should be subclassed in order to
    implement the appropriate features.

    :param svd_rank: the rank for the truncation; If 0, the method computes the
        optimal rank and uses it for truncation; if positive interger, the
        method uses the argument for the truncation; if float between 0 and 1,
        the rank is the number of the biggest singular values that are needed
        to reach the 'energy' specified by `svd_rank`; if -1, the method does
        not compute truncation.
    :type svd_rank: int or float
    :param svd_rank_omega: the rank for the truncation of the aumented matrix
        omega composed by the left snapshots matrix and the control. Used only
        for the `_fit_B_unknown` method of this class. It should be greater or
        equal than `svd_rank`. For the possible values please refer to the
        `svd_rank` parameter description above.
    :type svd_rank_omega: int or float
    :param int tlsq_rank: rank truncation computing Total Least Square. Default
        is 0, that means no truncation.
    """

    def __init__(self, svd_rank, svd_rank_omega, tlsq_rank):
        super(DMDControlOperator, self).__init__(
            svd_rank=svd_rank,
            exact=True,
            rescale_mode=None,
            forward_backward=False,
            sorted_eigs=False,
            tikhonov_regularization=None,
        )
        self._svd_rank_omega = svd_rank_omega
        self._tlsq_rank = tlsq_rank


class DMDBKnownOperator(DMDControlOperator):
    """
    DMD with control base operator when B is given.

    :param svd_rank: the rank for the truncation; If 0, the method computes the
        optimal rank and uses it for truncation; if positive interger, the
        method uses the argument for the truncation; if float between 0 and 1,
        the rank is the number of the biggest singular values that are needed
        to reach the 'energy' specified by `svd_rank`; if -1, the method does
        not compute truncation.
    :type svd_rank: int or float
    :param svd_rank_omega: the rank for the truncation of the aumented matrix
        omega composed by the left snapshots matrix and the control. Used only
        for the `_fit_B_unknown` method of this class. It should be greater or
        equal than `svd_rank`. For the possible values please refer to the
        `svd_rank` parameter description above.
    :type svd_rank_omega: int or float
    :param int tlsq_rank: rank truncation computing Total Least Square. Default
        is 0, that means no truncation.
    """

    def compute_operator(self, X, Y, B, controlin):
        """
        Compute the low-rank operator. This is the standard version of the DMD
        operator, with a correction which depends on B.

        :param numpy.ndarray X: matrix containing the snapshots x0,..x{n-1} by
            column.
        :param numpy.ndarray Y: matrix containing the snapshots x1,..x{n} by
            column.
        :param numpy.ndarray B: the matrix B.
        :param numpy.ndarray control: the control input.
        :return: the (truncated) left-singular vectors matrix, the (truncated)
            singular values array, the (truncated) right-singular vectors
            matrix of X.
        :rtype: numpy.ndarray, numpy.ndarray, numpy.ndarray
        """
        X, Y = compute_tlsq(X, Y, self._tlsq_rank)
        Y = Y - B.dot(controlin)
        return super(DMDBKnownOperator, self).compute_operator(X, Y)


class DMDBUnknownOperator(DMDControlOperator):
    """
    DMD with control base operator when B is unknown.

    :param svd_rank: the rank for the truncation; If 0, the method computes the
        optimal rank and uses it for truncation; if positive interger, the
        method uses the argument for the truncation; if float between 0 and 1,
        the rank is the number of the biggest singular values that are needed
        to reach the 'energy' specified by `svd_rank`; if -1, the method does
        not compute truncation.
    :type svd_rank: int or float
    :param svd_rank_omega: the rank for the truncation of the aumented matrix
        omega composed by the left snapshots matrix and the control. Used only
        for the `_fit_B_unknown` method of this class. It should be greater or
        equal than `svd_rank`. For the possible values please refer to the
        `svd_rank` parameter description above.
    :type svd_rank_omega: int or float
    :param int tlsq_rank: rank truncation computing Total Least Square. Default
        is 0, that means no truncation.
    """

    def compute_operator(self, X, Y, controlin):
        """
        Compute the low-rank operator.

        :param numpy.ndarray X: matrix containing the snapshots x0,..x{n-1} by
            column.
        :param numpy.ndarray Y: matrix containing the snapshots x1,..x{n} by
            column.
        :param numpy.ndarray control: the control input.
        :return: the (truncated) left-singular vectors matrix of Y, and
            the product between the left-singular vectors of Y and Btilde.
        :rtype: numpy.ndarray, numpy.ndarray
        """
        snapshots_rows = X.shape[0]

        omega = np.vstack([X, controlin])

        Up, sp, Vp = compute_svd(omega, self._svd_rank_omega)

        Up1 = Up[:snapshots_rows, :]
        Up2 = Up[snapshots_rows:, :]

        Ur, _, _ = compute_svd(Y, self._svd_rank)

        self._Atilde = np.linalg.multi_dot(
            [Ur.T.conj(), Y, Vp, np.diag(np.reciprocal(sp)), Up1.T.conj(), Ur]
        )
        self._compute_eigenquantities()
        self._compute_modes(Y, sp, Vp, Up1, Ur)

        Btilde = np.linalg.multi_dot(
            [Ur.T.conj(), Y, Vp, np.diag(np.reciprocal(sp)), Up2.T.conj()]
        )

        return Ur, Ur.dot(Btilde)

    def _compute_modes(self, Y, sp, Vp, Up1, Ur):
        """
        Private method that computes eigenvalues and eigenvectors of the
        high-dimensional operator (stored in self.modes and self.Lambda).
        """

        self._modes = np.linalg.multi_dot(
            [
                Y,
                Vp,
                np.diag(np.reciprocal(sp)),
                Up1.T.conj(),
                Ur,
                self.eigenvectors,
            ]
        )
        self._Lambda = self.eigenvalues


[docs]class DMDc(DMDBase): """ Dynamic Mode Decomposition with control. This version does not allow to manipulate the temporal window within the system is reconstructed. :param svd_rank: the rank for the truncation; If 0, the method computes the optimal rank and uses it for truncation; if positive interger, the method uses the argument for the truncation; if float between 0 and 1, the rank is the number of the biggest singular values that are needed to reach the 'energy' specified by `svd_rank`; if -1, the method does not compute truncation. :type svd_rank: int or float :param int tlsq_rank: rank truncation computing Total Least Square. Default is 0, that means no truncation. :param opt: argument to control the computation of DMD modes amplitudes. See :class:`DMDBase`. Default is False. :type opt: bool or int :param svd_rank_omega: the rank for the truncation of the aumented matrix omega composed by the left snapshots matrix and the control. Used only for the `_fit_B_unknown` method of this class. It should be greater or equal than `svd_rank`. For the possible values please refer to the `svd_rank` parameter description above. :type svd_rank_omega: int or float :param lag: the time lag for the snapshots. Used in fit method to generate X and Y. :type lag: int """ def __init__( self, svd_rank=0, tlsq_rank=0, opt=False, svd_rank_omega=-1, lag=1 ): # we're going to initialize Atilde when we know if B is known self._Atilde = None # remember the arguments for when we'll need them self._dmd_operator_kwargs = { "svd_rank": svd_rank, "svd_rank_omega": svd_rank_omega, "tlsq_rank": tlsq_rank, } self._opt = opt self._exact = False self._B = None self._snapshots_holder = None self._controlin = None self._basis = None self._lag = lag self._modes_activation_bitmask_proxy = None @property def svd_rank_omega(self): return self.operator._svd_rank_omega @property def B(self): """ Get the operator B. :return: the operator B. :rtype: numpy.ndarray """ return self._B @property def basis(self): """ Get the basis used to reduce the linear operator to the low dimensional space. :return: the matrix which columns are the basis vectors. :rtype: numpy.ndarray """ return self._basis
[docs] def reconstructed_data(self, control_input=None): """ Return the reconstructed data, computed using the `control_input` argument. If the `control_input` is not passed, the original input (in the `fit` method) is used. The input dimension has to be consistent with the dynamics. :param numpy.ndarray control_input: the input control matrix. :return: the matrix that contains the reconstructed snapshots. :rtype: numpy.ndarray """ controlin = ( np.asarray(control_input) if control_input is not None else self._controlin ) if controlin.shape[-1] != self.dynamics.shape[-1] - self._lag: raise RuntimeError( "The number of control inputs and the number of snapshots to " "reconstruct has to be the same" ) eigs = np.power( self.eigs, self.dmd_time["dt"] // self.original_time["dt"] ) A = np.linalg.multi_dot( [self.modes, np.diag(eigs), np.linalg.pinv(self.modes)] ) data = [self.snapshots[:, i] for i in range(self._lag)] expected_shape = data[0].shape for i, u in enumerate(controlin.T): arr = A.dot(data[i]) + self._B.dot(u) if arr.shape != expected_shape: raise ValueError( f"Invalid shape: expected {expected_shape}, got {arr.shape}" ) data.append(arr) data = np.array(data).T return data
[docs] def fit(self, X, I, B=None): """ Compute the Dynamic Modes Decomposition with control given the original snapshots and the control input data. The matrix `B` that controls how the control input influences the system evolution can be provided by the user; otherwise, it is computed by the algorithm. :param X: the input snapshots. :type X: numpy.ndarray or iterable :param I: the control input. :type I: numpy.ndarray or iterable :param numpy.ndarray B: matrix that controls the control input influences the system evolution. :type B: numpy.ndarray or iterable """ self._reset() self._controlin = np.atleast_2d(np.asarray(I)) self._snapshots_holder = Snapshots(X) n_samples = self.snapshots.shape[-1] if self._lag < 1: raise ValueError("Time lag must be positive.") X = self.snapshots[:, : -self._lag] Y = self.snapshots[:, self._lag :] self._set_initial_time_dictionary( {"t0": 0, "tend": n_samples - 1, "dt": 1} ) if B is None: self._Atilde = DMDBUnknownOperator(**self._dmd_operator_kwargs) self._basis, self._B = self.operator.compute_operator( X, Y, self._controlin ) else: self._Atilde = DMDBKnownOperator(**self._dmd_operator_kwargs) U, _, _ = self.operator.compute_operator(X, Y, B, self._controlin) self._basis = U self._B = B self._b = self._compute_amplitudes() return self