Source code for pydmd.optdmd

"""
Derived module from :meth:`pydmd.dmdbase` for the optimal closed-form solution
to dmd.

.. note::

    P. Heas & C. Herzet. Low-rank dynamic mode decomposition: optimal
    solution in polynomial time. arXiv:1610.02962. 2016.

"""

import numpy as np
from scipy.linalg import eig

from .dmdbase import DMDBase
from .dmdoperator import DMDOperator
from .snapshots import Snapshots
from .utils import compute_svd, compute_tlsq


def pinv_diag(x):
    """
    Utility function to compute the pseudo-inverse of a diagonal matrix.

    :param array_like x: diagonal of the matrix to be pseudo-inversed.
    :return: the computed pseudo-inverse
    :rtype: numpy.ndarray
    """
    t = x.dtype.char.lower()
    factor = {"f": 1e2, "d": 1e4}
    rcond = factor[t] * np.finfo(t).eps

    y = np.zeros(*x.shape)

    y[x > rcond] = np.reciprocal(x[x > rcond])

    return np.diag(y)


class DMDOptOperator(DMDOperator):
    """
    DMD operator for OptDMD.

    :param svd_rank: the rank for the truncation; If 0, the method computes the
        optimal rank and uses it for truncation; if positive interger, the
        method uses the argument for the truncation; if float between 0 and 1,
        the rank is the number of the biggest singular values that are needed
        to reach the 'energy' specified by `svd_rank`; if -1, the method does
        not compute truncation.
    :type svd_rank: int or float
    :param str factorization: compute either the eigenvalue decomposition of
        the unknown high-dimensional DMD operator (factorization="evd") or
        its singular value decomposition (factorization="svd"). Default is
        "evd".
    """

    def __init__(self, svd_rank, factorization):
        super().__init__(
            svd_rank=svd_rank,
            exact=True,
            forward_backward=False,
            rescale_mode=None,
            sorted_eigs=False,
            tikhonov_regularization=None,
        )
        self._factorization = factorization

    @property
    def right_eigenvectors(self):
        if self._factorization == "evd":
            return self._right_eigenvectors
        raise ValueError("Eigenquantities haven't been computed yet.")

    def compute_operator(self, X, Y):
        """
        Compute the low-rank operator.

        :param numpy.ndarray X: matrix containing the snapshots x0,..x{n-1} by
            column.
        :param numpy.ndarray Y: matrix containing the snapshots x1,..x{n} by
            column.
        :return: Left singular vectors of Z, and Q.
        :rtype: numpy.ndarray, numpy.ndarray
        """

        Ux, Sx, Vx = compute_svd(X, -1)

        Z = np.linalg.multi_dot(
            [Y, Vx, np.diag(Sx), pinv_diag(Sx), Vx.T.conj()]
        )

        Uz, _, _ = compute_svd(Z, self._svd_rank)

        Q = np.linalg.multi_dot(
            [Uz.T.conj(), Y, Vx, pinv_diag(Sx), Ux.T.conj()]
        ).T.conj()

        self._Atilde = Q.T.conj().dot(Uz)
        if self._factorization == "evd":
            self._compute_eigenquantities(Uz, Q)

        return Uz, Q

    """
        Private method that computes eigenvalues and eigenvectors of the
        low-dimensional operator.

        :param numpy.ndarray P: Left singular vectors of Z.
        :param numpy.ndarray Q: The matrix Q.
    """

    def _compute_eigenquantities(self, P, Q):
        Atilde = self.as_numpy_array

        vals, vecs_left, vecs_right = eig(Atilde, left=True, right=True)

        # --> Build the matrix of right eigenvectors.
        right_vecs = np.linalg.multi_dot([P, Atilde, vecs_right])
        right_vecs = right_vecs.dot(pinv_diag(vals))

        # --> Build the matrix of left eigenvectors.
        left_vecs = Q.dot(vecs_left)
        left_vecs = left_vecs.dot(pinv_diag(vals))

        # --> Rescale the left eigenvectors.
        m = np.diag(left_vecs.T.conj().dot(right_vecs))
        left_vecs = left_vecs.dot(pinv_diag(m))

        self._eigenvalues = vals
        self._eigenvectors = left_vecs
        self._right_eigenvectors = right_vecs

    def _compute_modes(self, Y, U, Sigma, V):
        raise NotImplementedError("This function has not been implemented yet.")


[docs]class OptDMD(DMDBase): """ Dynamic Mode Decomposition This class implements the closed-form solution to the DMD minimization problem. It relies on the optimal solution given by [HeasHerzet16]_. .. [HeasHerzet16] P. Heas & C. Herzet. Low-rank dynamic mode decomposition: optimal solution in polynomial time. arXiv:1610.02962. 2016. :param str factorization: compute either the eigenvalue decomposition of the unknown high-dimensional DMD operator (factorization="evd") or its singular value decomposition (factorization="svd"). Default is "evd". :param svd_rank: the rank for the truncation; If 0, the method computes the optimal rank and uses it for truncation; if positive interger, the method uses the argument for the truncation; if float between 0 and 1, the rank is the number of the biggest singular values that are needed to reach the 'energy' specified by `svd_rank`; if -1, the method does not compute truncation. :type svd_rank: int or float :param int tlsq_rank: rank truncation computing Total Least Square. Default is 0, that means TLSQ is not applied. :param opt: argument to control the computation of DMD modes amplitudes. See :class:`DMDBase`. Default is False. :type opt: bool or int """ def __init__(self, factorization="evd", svd_rank=0, tlsq_rank=0, opt=False): self._factorization = factorization self._tlsq_rank = tlsq_rank self._Atilde = DMDOptOperator( svd_rank=svd_rank, factorization=factorization ) self._svds = None self._input_space = None self._output_space = None self._input_holder = None self._output_holder = None @property def factorization(self): return self._factorization @property def modes(self): return self._output_space @property def eigs(self): return self.operator.eigenvalues @property def amplitudes(self): return self._b
[docs] def fit(self, X, Y=None): """ Compute the Dynamic Modes Decomposition to the input data. :param X: the input snapshots. :type X: numpy.ndarray or iterable :param Y: the input snapshots at sequential timestep, if passed. Default is None. :type Y: numpy.ndarray or iterable """ self._reset() if Y is None: self._snapshots_holder = Snapshots(X) X = self.snapshots[:, :-1] # x = x[k] Y = self.snapshots[:, 1:] # y = x[k+1] else: self._input_holder = Snapshots(X) X = self._input_holder.snapshots self._output_holder = Snapshots(Y) Y = self._output_holder.snapshots X, Y = compute_tlsq(X, Y, self._tlsq_rank) Uz, Q = self.operator.compute_operator(X, Y) if self.factorization == "svd": # --> DMD basis for the input space. self._input_space = Q # --> DMD basis for the output space. self._output_space = Uz elif self.factorization == "evd": # --> Compute DMD eigenvalues and right/left eigenvectors self._input_space = self.eigs self._output_space = self.operator.right_eigenvectors return self
[docs] def predict(self, X): """ Predict the output Y given the input X using the fitted DMD model. :param numpy.ndarray X: the input vector. :return: one time-step ahead predicted output. :rtype: numpy.ndarray """ if self.factorization == "svd": Y = np.linalg.multi_dot( [self._output_space, self._input_space.T.conj(), X] ) elif self.factorization == "evd": Y = np.linalg.multi_dot( [ self._output_space, np.diag(self._eigs), self._input_space.T.conj(), X, ] ) return Y
@property def modes_activation_bitmask(self): raise RuntimeError("This feature has not been implemented yet.") @modes_activation_bitmask.setter def modes_activation_bitmask(self, value): raise RuntimeError("This feature has not been implemented yet.")
[docs] def _compute_amplitudes(self, modes, snapshots, eigs, opt): raise NotImplementedError("This function has not been implemented yet.")
@property def dynamics(self): raise NotImplementedError("This function has not been implemented yet.") @property def fitted(self): raise NotImplementedError("This function has not been implemented yet.") @property def modes_activation_bitmask(self): raise NotImplementedError("This function has not been implemented yet.") @modes_activation_bitmask.setter def modes_activation_bitmask(self, value): raise NotImplementedError("This function has not been implemented yet.")