Source code for pydmd.varprodmd

"""
Variable Projection for DMD. Reformulation of original paper
(https://epubs.siam.org/doi/abs/10.1137/M1124176) s.t. sparse matrix computation
is substiuted by outer products. Further the optimization is
reformulated s.t. SciPy's nonlinear least squares optimizer
can handle "complex" parameters.

Default optimizer arguments:
    OPT_DEF_ARGS =
        {"method": "trf",
        "tr_solver": "exact",
        "loss": "linear",
        "x_scale": "jac",
        "gtol": 1e-8,
        "xtol": 1e-8,
        "ftol": 1e-8}
"""

import warnings
from types import MappingProxyType
from typing import Any, Dict, Tuple, Union

import numpy as np
from scipy.linalg import qr
from scipy.optimize import OptimizeResult, least_squares

from .dmd import DMDBase
from .dmdoperator import DMDOperator
from .snapshots import Snapshots
from .utils import compute_svd

OPT_DEF_ARGS = MappingProxyType(
    {
        "method": "trf",
        "tr_solver": "exact",
        "loss": "linear",
        "x_scale": "jac",
        "gtol": 1e-8,
        "xtol": 1e-8,
        "ftol": 1e-8,
    }
)


def _compute_dmd_ev(
    x_current: np.ndarray,
    x_next: np.ndarray,
    rank: Union[float, int] = 0,
) -> np.ndarray:
    r"""
    Compute DMD eigenvalues.

    :param x_current: Observables :math:`\boldsymbol{X}`
    :type x_current: np.ndarray
    :param x_next: Observables :math:`\boldsymbol{X}'`
    :type x_current: np.ndarray
    :param rank: Desired rank. If rank :math:`r = 0`, the optimal rank is
        determined automatically. If rank is a float s.t. :math: `0 < r < 1`,
        the cumulative energy of the singular values is used
        to determine the optimal rank. If rank is an integer and :math:`r > 0`,
        the desired rank is used iff possible. Defaults to 0.
    :type rank: Union[float, int], optional
    :return: Diagonal matrix of eigenvalues
        :math:`\boldsymbol{\Lambda}` as 1d array.
    :rtype: np.ndarray
    """

    u_x, sigma_x, v_x = compute_svd(x_current, rank)

    # columns of v need to be multiplicated with inverse sigma
    sigma_inv_approx = np.reciprocal(sigma_x)

    a_approx = np.linalg.multi_dot(
        [u_x.conj().T, x_next, sigma_inv_approx[None] * v_x]
    )

    return np.linalg.eigvals(a_approx)


class _OptimizeHelper:  # pylint: disable=too-few-public-methods
    """
    Helper Class to store intermediate results during the optimization.
    """

    __slots__ = ["phi", "phi_inv", "u_svd", "s_inv", "v_svd", "b_matrix", "rho"]

    def __init__(self, l_in: int, m_in: int, n_in: int):
        self.phi = np.empty((m_in, l_in), dtype=np.complex128)
        self.u_svd = np.empty((m_in, l_in), dtype=np.complex128)
        self.s_inv = np.empty((l_in,), dtype=np.complex128)
        self.v_svd = np.empty((l_in, l_in), dtype=np.complex128)
        self.b_matrix = np.empty((l_in, n_in), dtype=np.complex128)
        self.rho = np.empty((m_in, n_in), dtype=np.complex128)


def _compute_dmd_rho(
    alphas: np.ndarray,
    time: np.ndarray,
    data: np.ndarray,
    opthelper: _OptimizeHelper,
) -> np.ndarray:
    r"""
    Compute the real residual vector :math:`\boldsymbol{\rho}` for
    Levenberg-Marquardt update.

    :param alphas: DMD eigenvalues to optimize,
        where :math:`\alpha \in \mathbb{C}^l`,
        but here :math:`\alpha \in \mathbb{R}^{2l}`,
        since optimizer cannot deal with complex numbers.
    :type alphas: np.ndarray
    :param time: 1D time array.
    :type time: np.ndarray
    :param data: data :math:`\boldsymbol{Y} \n C^{m \times n}`.
        For DMD computation we set :math:`\boldsymbol{Y} = \boldsymbol{X}^T`.
    :type data: np.ndarray
    :param opthelper: Optimization helper to speed up computations
        mainly for Jacobian.
    :type opthelper: _OptimizeHelper
    :return: 1d residual vector for Levenberg-Marquardt update
        :math:`\boldsymbol{\rho} \in \mathbb{R}^{2mn}`.
    :rtype: np.ndarray
    """

    _alphas = np.zeros((alphas.shape[-1] // 2,), dtype=np.complex128)
    _alphas.real = alphas[: alphas.shape[-1] // 2]
    _alphas.imag = alphas[alphas.shape[-1] // 2 :]

    phi = np.exp(np.outer(time, _alphas))
    u_phi, s_phi, v_phi_t = np.linalg.svd(phi, full_matrices=False)
    idx = np.where(s_phi.real != 0.0)[0]
    s_phi_inv = np.zeros_like(s_phi)
    s_phi_inv[idx] = np.reciprocal(s_phi[idx])

    rho = data - np.linalg.multi_dot([u_phi, u_phi.conj().T, data])
    rho_flat = np.ravel(rho)
    rho_out = np.zeros((2 * rho_flat.shape[-1],), dtype=np.float64)
    rho_out[: rho_flat.shape[-1]] = rho_flat.real
    rho_out[rho_flat.shape[-1] :] = rho_flat.imag

    opthelper.phi = phi
    opthelper.u_svd = u_phi
    opthelper.s_inv = s_phi_inv
    opthelper.v_svd = v_phi_t.conj().T
    opthelper.rho = rho
    opthelper.b_matrix = np.linalg.multi_dot(
        [
            opthelper.v_svd * s_phi_inv[None],
            opthelper.u_svd.conj().T,
            data,
        ]
    )
    return rho_out


def _compute_dmd_jac(
    alphas: np.ndarray,
    time: np.ndarray,
    data: np.ndarray,
    opthelper: _OptimizeHelper,
) -> np.ndarray:
    r"""
    Compute the real Jacobian.
    SciPy's nonlinear least squares optimizer requires real entities.
    Therefore, complex and real parts are split.

    :param alphas: DMD eigenvalues to optimize,
        where :math:`\alpha \in \mathbb{C}^l`,
        but here :math:`\alpha \in \mathbb{R}^{2l}` since optimizer cannot
        deal with complex numbers.
    :type alphas: np.ndarray
    :param time: 1D time array.
    :type time: np.ndarray
    :param data: data :math: `\boldsymbol{Y} \n C^{m \times n}`.
        For DMD computation we set :math:`\boldsymbol{Y} = \boldsymbol{X}^T`.
    :type data: np.ndarray
    :param opthelper: Optimization helper to speed up computations
        mainly for Jacobian. The entities are computed in `_compute_dmd_rho`.
    :type opthelper: _OptimizeHelper
    :return: Jacobian :math:`\boldsymbol{J} \in \mathbb{R}^{mn \times 2l}`.
    :rtype: np.ndarray
    """

    _alphas = np.zeros((alphas.shape[-1] // 2,), dtype=np.complex128)
    _alphas.real = alphas[: alphas.shape[-1] // 2]
    _alphas.imag = alphas[alphas.shape[-1] // 2 :]
    jac_out = np.zeros((2 * np.prod(data.shape), alphas.shape[-1]))

    for j in range(_alphas.shape[-1]):
        d_phi_j = time * opthelper.phi[:, j]
        outer = np.outer(d_phi_j, opthelper.b_matrix[j])
        a_j = outer - np.linalg.multi_dot(
            [opthelper.u_svd, opthelper.u_svd.conj().T, outer]
        )
        g_j = np.linalg.multi_dot(
            [
                opthelper.u_svd * opthelper.s_inv[None],
                np.outer(
                    opthelper.v_svd[j].conj(), d_phi_j.conj() @ opthelper.rho
                ),
            ]
        )
        # Compute the jacobian J_mat_j = - (A_j + G_j).
        jac = -a_j - g_j
        jac_flat = np.ravel(jac)

        # construct the overall jacobian for optimized
        # J_real = |Re{J} -Im{J}|
        #          |Im{J}  Re{J}|

        # construct real part for optimization
        jac_out[: jac_out.shape[0] // 2, j] = jac_flat.real
        jac_out[jac_out.shape[0] // 2 :, j] = jac_flat.imag

        # construct imaginary part for optimization
        jac_out[: jac_out.shape[0] // 2, _alphas.shape[-1] + j] = -jac_flat.imag
        jac_out[jac_out.shape[0] // 2 :, _alphas.shape[-1] + j] = jac_flat.real

    return jac_out


def _varpro_preprocessing(
    data: np.ndarray,
    time: np.ndarray,
    rank: Union[float, int] = 0.0,
    use_proj: bool = True,
) -> Tuple[np.ndarray, np.ndarray, np.ndarray, np.ndarray]:
    r"""
    Preprocess data for Variable Projection: Calculate
    :math:`\boldsymbol{Y}, \boldsymbol{Z}` for trapezoidal derivative
    approximation. If desired input data is projected to
    low-dimensional space.

    :param data: data matrix s.t. :math:`X \n C^{n \times m}`.
    :type data: np.ndarray
    :param time: 1d array of timestamps.
    :type time: np.ndarray
    :param optargs: Arguments for 'least_squares' optimizer.
    :type optargs: Dict[str, Any]
    :param rank: Desired rank. If rank :math:`r = 0`, the optimal rank is
        determined automatically. If rank is a float s.t. :math:`0 < r < 1`,
        the cumulative energy of the singular values is used
        to determine the optimal rank. If rank is an integer
        and :math:`r > 0`, the desired rank is used iff possible.
        Defaults to 0.
    :type rank: Union[float, int], optional
    :param use_proj: Perform variable projection in
        low dimensional space if `use_proj=True`, else in the original space.
        Defaults to True.
    :type use_proj: bool, optional
    :return: Derivative :math:`\boldsymbol{Y},\boldsymbol{Z}`, (projected) data,
         rank reduced projection matrix :math:`\boldsymbol{U}_r`.
    :rtype: Tuple[np.ndarray,
                  np.ndarray,
                  np.ndarray,
                  np.ndarray]
    """

    u_r, s_r, v_r = compute_svd(data, rank)
    data_out = v_r.conj().T * s_r[:, None] if use_proj else data

    # trapezoidal derivative approximation
    y_out = (data_out[:, :-1] + data_out[:, 1:]) / 2.0
    dt_in = time[1:] - time[:-1]
    z_out = (data_out[:, 1:] - data_out[:, :-1]) / dt_in[None]

    return z_out, y_out, data_out, u_r


def _compute_dmd_varpro(
    alphas_init: np.ndarray,
    time: np.ndarray,
    data: np.ndarray,
    opthelper: _OptimizeHelper,
    **optargs,
) -> OptimizeResult:
    r"""
    Compute Variable Projection (VarPro) for DMD using SciPy's
    nonlinear least squares optimizer.

    :type alphas_init: np.ndarray
    :param time: 1d time array.
    :type time: np.ndarray
    :param data: data :math:`\boldsymbol{Y} \n C^{m \times n}`.
        For DMD computation we set :math:`\boldsymbol{Y} = \boldsymbol{X}^T`.
    :type data: np.ndarray
    :param opthelper: Optimization helper to speed up computations
        mainly for Jacobian. The entities are computed in `_compute_dmd_rho`
        and are used in `_compute_dmd_jac`.
    :type opthelper: _OptimizeHelper
    :return: Optimization result.
    :rtype: OptimizeResult
    """

    return least_squares(
        _compute_dmd_rho,
        alphas_init,
        _compute_dmd_jac,
        **optargs,
        args=[time, data, opthelper],
    )


def select_best_samples_fast(data: np.ndarray, comp: float = 0.9) -> np.ndarray:
    r"""
    Select library samples using QR decomposition with column pivoting.

    :param data: Data matrix :math:`\boldsymbol{X} \in \mathbb{C}^{n \times m}`.
    :type data: np.ndarray
    :param comp: Library compression :math:`c`, where :math:`0 < c < 1`.
        The best fitting :math:`\lfloor \left(1 - c\right)m\rfloor` samples
        are selected. Defaults to 0.9.
    :type comp: float, optional
    :raises ValueError: ValueError is raised if data matrix is not a
        2d array.
    :raises ValueError: ValueError is raised of compression is not in required
        interval (:math:`0 < r < 1`).
    :return: Indices of selected samples as 1d array.
    :rtype: np.ndarray
    """

    if len(data.shape) != 2:
        raise ValueError("Expected 2D array!")

    if not 0 < comp < 1:
        raise ValueError("Compression must be in (0, 1)]")

    n_samples = int(data.shape[-1] * (1.0 - comp))
    pcolumn = qr(data, mode="economic", pivoting=True)[-1]

    return pcolumn[:n_samples]


def compute_varprodmd_any(  # pylint: disable=unused-variable
    data: np.ndarray,
    time: np.ndarray,
    optargs: Dict[str, Any],
    rank: Union[float, int] = 0.0,
    use_proj: bool = True,
    compression: float = 0,
) -> Tuple[np.ndarray, np.ndarray, np.ndarray, np.ndarray, OptimizeResult]:
    r"""
    Compute DMD given arbitary timesteps.

    :param data: data matrix s.t. :math:`X \n C^{n \times m}`.
    :type data: np.ndarray
    :param time: 1d array of timestamps.
    :type time: np.ndarray
    :param optargs: Arguments for 'least_squares' optimizer.
    :type optargs: Dict[str, Any]
    :param rank: Desired rank. If rank :math:`r = 0`, the optimal rank is
        determined automatically. If rank is a float s.t. :math:`0 < r < 1`,
        the cumulative energy of the singular values is used
        to determine the optimal rank. If rank is an integer
        and :math:`r > 0`, the desired rank is used iff possible.
        Defaults to 0.
    :type rank: Union[float, int], optional
    :param use_proj: Perform variable projection in
        low dimensional space if `use_proj=True`, else in the original space.
        Defaults to True.
    :type use_proj: bool, optional
    :param compression: If libary compression :math:`c = 0`,
        all samples are used. If :math:`0 < c < 1`, the best
        fitting :math:`\lfloor \left(1 - c\right)m\rfloor` samples
        are selected.
    :type compression: float, optional
    :raises ValueError: ValueError is raised if data matrix is not a
        2d array.
    :raises ValueError: ValueError is raised if time is not a
        1d array.
    :return: DMD modes :math:`\boldsymbol{\Phi}`, continuous DMD eigenvalues
        :math: `\boldsymbol{\Omega}` as 1d array,
        DMD eigenfunctions or amplitudes :math:`\boldsymbol{\varphi}`,
        optimization results of SciPy's nonlinear least squares optimizer.
    :rtype: Tuple[np.ndarray,
                  np.ndarray,
                  np.ndarray,
                  np.ndarray,
                  OptimizeResult]
    """

    if len(data.shape) != 2:
        raise ValueError("data needs to be 2D array")

    if len(time.shape) != 1:
        raise ValueError("time needs to be a 1D array")

    #  y_in, z_in, data_in, u_r
    res = _varpro_preprocessing(data, time, rank, use_proj)
    omegas = _compute_dmd_ev(res[0], res[1], res[-1].shape[-1])

    if compression > 0:
        indices = select_best_samples_fast(res[2], compression)

        if indices.size <= 1:
            indices = np.arange(res[2])

    else:
        indices = np.arange(res[2].shape[-1])

    if res[2].shape[-1] < omegas.shape[-1]:
        warnings.warn(
            " ".join(
                (
                    "Attempting to solve underdetermined system.",
                    "Decrease desired rank or compression!",
                )
            )
        )

    opthelper = _OptimizeHelper(res[-1].shape[-1], *res[2].shape)
    opt = _compute_dmd_varpro(
        np.concatenate([omegas.real, omegas.imag]),
        time[indices],
        res[2][:, indices].T,
        opthelper,
        **optargs,
    )
    omegas.real = opt.x[: opt.x.shape[-1] // 2]
    omegas.imag = opt.x[opt.x.shape[-1] // 2 :]
    xi = res[-1] @ opthelper.b_matrix.T if use_proj else opthelper.b_matrix.T
    eigenf = np.linalg.norm(xi, axis=0)
    return xi / eigenf[None], omegas, eigenf, indices, opt


def varprodmd_predict(
    phi: np.ndarray,
    omegas: np.ndarray,
    eigenf: np.ndarray,
    time: np.ndarray,
) -> np.ndarray:
    r"""
    Perform DMD prediction using computed modes, continuous eigenvalues
    and eigenfunctions/amplitudes.

    :param phi: DMD modes
        :math:`\boldsymbol{\Phi} \in \mathbb{C}^{n \times \left(m-1\right)}`.
    :type phi: np.ndarray
    :param omegas: Continuous diagonal matrix of eigenvalues
        :math:`\boldsymbol{\Omega} \in
            \mathbb{C}^{\left(m-1\right) \times \left(m-1\right)}`
        as 1d array.
    :type omegas: np.ndarray
    :param eigenf: Eigenfunctions or amplitudes
        :math:`\boldsymbol{\varphi} \in \mathbb{C}^{m - 1}`.
    :type eigenf: np.ndarray
    :param time: 1d array of timestamps.
    :type time: np.ndarray
    :return: Reconstructed/predicted state :math:`\hat{\boldsymbol{X}}`.
    :rtype: np.ndarray
    """

    return phi @ (np.exp(np.outer(omegas, time)) * eigenf[:, None])


[docs]class VarProOperator(DMDOperator): """ Variable Projection Operator for DMD. """ def __init__( self, svd_rank: Union[float, int], exact: bool, sorted_eigs: Union[bool, str], compression: float, optargs: Dict[str, Any], ): r""" VarProOperator constructor. :param svd_rank: Desired SVD rank. If rank :math:`r = 0`, the optimal rank is determined automatically. If rank is a float s.t. :math:`0 < r < 1`, the cumulative energy of the singular values is used to determine the optimal rank. If rank is an integer and :math:`r > 0`, the desired rank is used iff possible. :type svd_rank: Union[float, int] :param exact: Perform computations in original state space if `exact=True`, else perform optimization in projected (low dimensional) space. :type exact: bool :param sorted_eigs: Sort eigenvalues. If `sorted_eigs=True`, the variance of the absolute values of the complex eigenvalues :math:`\sqrt{\omega_i \cdot \bar{\omega}_i}`, the variance absolute values of the real parts :math:`\left|\Re\{{\omega_i}\}\right|` and the variance of the absolute values of the imaginary parts :math:`\left|\Im\{{\omega_i}\}\right|` is computed. The eigenvalues are then sorted according to the highest variance (from highest to lowest). If `sorted_eigs=False`, no sorting is performed. If the parameter is a string and set to sorted_eigs='auto', the eigenvalues are sorted accoring to the variances of previous mentioned quantities. If `sorted_eigs='real'` the eigenvalues are sorted w.r.t. the absolute values of the real parts of the eigenvalues (from highest to lowest). If `sorted_eigs='imag'` the eigenvalues are sorted w.r.t. the absolute values of the imaginary parts of the eigenvalues (from highest to lowest). If `sorted_eigs='abs'` the eigenvalues are sorted w.r.t. the magnitude of the eigenvalues :math:`\left(\sqrt{\omega_i \cdot \bar{\omega}_i}\right)` (from highest to lowest). :type sorted_eigs: Union[bool, str] :param compression: If libary compression :math:`c = 0`, all samples are used. If :math:`0 < c < 1`, the best fitting :math:`\lfloor \left(1 - c\right)m\rfloor` samples are selected. :type compression: float :param optargs: Arguments for 'least_squares' optimizer. Use `OPT_DEF_ARGS` as starting point. :type optargs: Dict[str, Any] """ super().__init__(svd_rank, exact, False, None, sorted_eigs, False) self._sorted_eigs = sorted_eigs self._svd_rank = svd_rank self._exact = exact self._optargs: Dict[str, Any] = optargs self._compression: float = compression self._modes: np.ndarray = None self._eigenvalues: np.ndarray = None
[docs] def compute_operator( self, X: np.ndarray, Y: np.ndarray ) -> Tuple[np.ndarray, OptimizeResult, np.ndarray]: r""" Perform Variable Projection for DMD using SciPy's nonlinear least squares optimizer. :param X: Measurement :math:`\boldsymbol{X} \in \mathbb{C}^{n \times m}` :type X: np.ndarray :param Y: 1d array of timestamps where individual measurements :math:`\boldsymbol{x}_i \in \mathbb{C}^n` where taken. :type Y: np.ndarray :raises ValueError: If `sorted_eigs` from constructor was set to an invalid string. :return: Eigenfunctions/amplitudes :math:`\boldsymbol{\varphi}^{m-1}`, OptimizeResult from SciPy's optimizer (optimal parameters and statistics), indices of selected measurements. If no compression (:math:`c = 0`) is used, all indices are returned, else the indices of the selected samples are used. :rtype: Tuple[np.ndarray, OptimizeResult, np.ndarray] """ ( self._modes, self._eigenvalues, eigenf, indices, opt, ) = compute_varprodmd_any( X, Y, self._optargs, self._svd_rank, not self._exact, self._compression, ) # overwrite for lazy sorting if isinstance(self._sorted_eigs, bool): if self._sorted_eigs: self._sorted_eigs = "auto" if isinstance(self._sorted_eigs, str): if self._sorted_eigs == "auto": eigs_real = self._eigenvalues.real eigs_imag = self._eigenvalues.imag _eigs_abs = np.abs(self._eigenvalues) var_real = np.var(eigs_real) var_imag = np.var(eigs_imag) var_abs = np.var(_eigs_abs) array = np.array([var_real, var_imag, var_abs]) eigs_abs = (eigs_real, eigs_imag, _eigs_abs)[np.argmax(array)] elif self._sorted_eigs == "real": eigs_abs = np.abs(self._eigenvalues.real) elif self._sorted_eigs == "imag": eigs_abs = np.abs(self._eigenvalues.imag) elif self._sorted_eigs == "abs": eigs_abs = np.abs(self._eigenvalues) else: raise ValueError(f"{self._sorted_eigs} not supported!") idx = np.argsort(eigs_abs)[::-1] # sort from biggest to smallest self._eigenvalues = self._eigenvalues[idx] self._modes = self._modes[:, idx] eigenf = eigenf[idx] return eigenf, opt, indices
[docs]class VarProDMD(DMDBase): """ Variable Projection for DMD using SciPy's nonlinear least squares solver. The original problem is reformulated s.t. complex residuals and Jacobians, which are used by the Levenberg-Marquardt algorithm, are transormed to real numbers. Further simplifications (outer products) avoids using sparse matrices. """ def __init__( # pylint: disable=super-init-not-called self, svd_rank: Union[float, int] = 0, exact: bool = False, sorted_eigs: Union[bool, str] = False, compression: float = 0.0, optargs: Dict[str, Any] = None, ): r""" VarProDMD constructor. :param svd_rank: Desired SVD rank. If rank :math:`r = 0`, the optimal rank is determined automatically. If rank is a float s.t. :math:`0 < r < 1`, the cumulative energy of the singular values is used to determine the optimal rank. If rank is an integer and :math:`r > 0`, the desired rank is used iff possible. Defaults to 0. :type svd_rank: Union[float, int], optional :param exact: Perform variable projection in low dimensional space if `exact=False`. Else the optimization is performed in the original space. Defaults to False. :type exact: bool, optional :param sorted_eigs: Sort eigenvalues. If `sorted_eigs=True`, the variance of the absolute values of the complex eigenvalues :math:`\left(\sqrt{\omega_i \cdot \bar{\omega}_i}\right)`, the variance absolute values of the real parts :math:`\left|\Re\{{\omega_i}\}\right|` and the variance of the absolute values of the imaginary parts :math:`\left|\Im\{{\omega_i}\}\right|` is computed. The eigenvalues are then sorted according to the highest variance (from highest to lowest). If `sorted_eigs=False`, no sorting is performed. If the parameter is a string and set to sorted_eigs='auto', the eigenvalues are sorted accoring to the variances of previous mentioned quantities. If `sorted_eigs='real'` the eigenvalues are sorted w.r.t. the absolute values of the real parts of the eigenvalues (from highest to lowest). If `sorted_eigs='imag'` the eigenvalues are sorted w.r.t. the absolute values of the imaginary parts of the eigenvalues (from highest to lowest). If `sorted_eigs='abs'` the eigenvalues are sorted w.r.t. the magnitude of the eigenvalues :math:`\left(\sqrt{\omega_i \cdot \bar{\omega}_i}\right)` (from highest to lowest). Defaults to False. :type sorted_eigs: Union[bool, str], optional :param compression: If libary compression :math:`c = 0`, all samples are used. If :math:`0 < c < 1`, the best fitting :math:`\lfloor \left(1 - c\right)m\rfloor` samples are selected. :type compression: float, optional :param optargs: Arguments for 'least_squares' optimizer. If set to None, `OPT_DEF_ARGS` are used as default parameters. Defaults to None. :type optargs: Dict[str, Any], optional """ # super constructor not called # as most of the attributes are # not used. if optargs is None: optargs = OPT_DEF_ARGS self._Atilde = VarProOperator( svd_rank, exact, sorted_eigs, compression, optargs ) self._optres: OptimizeResult = None self._snapshots_holder: Snapshots = None self._indices: np.ndarray = None self._modes_activation_bitmask_proxy = None
[docs] def fit(self, X: np.ndarray, time: np.ndarray) -> object: r""" Fit the eigenvalues, modes and eigenfunctions/amplitudes to measurements. :param X: Measurements :math:`\boldsymbol{X} \in \mathbb{C}^{n \times m}`. :type X: np.ndarray :param time: 1d array of timestamps where measurements were taken. :type time: np.ndarray :return: VarProDMD instance. :rtype: object """ self._snapshots_holder = Snapshots(X) (self._b, self._optres, self._indices) = self._Atilde.compute_operator( self._snapshots_holder.snapshots.astype(np.complex128), time ) self._original_time = time self._dmd_time = time[self._indices] return self
[docs] def forecast(self, time: np.ndarray) -> np.ndarray: r""" Forecast measurements at given timestamps `time`. :param time: Desired times for forcasting as 1d array. :type time: np.ndarray :raises ValueError: If method `fit(X, time)` was not called. :return: Predicted measurements :math:`\hat{\boldsymbol{X}}`. :rtype: np.ndarray """ if not self.fitted: raise ValueError("Nothing fitted yet. Call fit-method first!") return varprodmd_predict( self._Atilde.modes, self._Atilde.eigenvalues, self._b, time )
@property def ssr(self) -> float: """ Compute the square root of sum squared residual (SSR) taken from https://link.springer.com/article/10.1007/s10589-012-9492-9. The SSR gives insight w.r.t. signal qualities. A low SSR is desired. If SSR is high the model may be inaccurate. :raises ValueError: ValueError is raised if method `fit(X, time)` was not called. :return: SSR. :rtype: float """ if not self.fitted: raise ValueError("Nothing fitted yet!") rho_flat_real = self._optres.fun rho_flat_imag = np.zeros( (rho_flat_real.size // 2,), dtype=np.complex128 ) rho_flat_imag.real = rho_flat_real[: rho_flat_real.size // 2] rho_flat_imag.imag = rho_flat_real[rho_flat_real.size // 2 :] sigma = np.linalg.norm(rho_flat_imag) denom = max( self._original_time.size - self._optres.jac.shape[0] // 2 - self._optres.jac.shape[1] // 2, 1, ) ssr = sigma / np.sqrt(float(denom)) return ssr @property def selected_samples(self) -> np.ndarray: r""" Return indices for creating the library. :raises ValueError: ValueError is raised if method `fit(X, time)` was not called. :return: Indices of the selected samples. If no compression was performed :math:`\left(c = 0\right)`, all indices are returned, else indices of the library selection scheme using QR-Decomposition with column pivoting. :rtype: np.ndarray """ if not self.fitted: raise ValueError("Nothing fitted yet. Call fit-method first!") return self._indices @property def opt_stats(self) -> OptimizeResult: """ Return optimization statistics of the Variable Projection optimization. :raises ValueError: ValueError is raised if method `fit(X, time)` was not called. :return: Optimization results including optimal weights (continuous eigenvalues) and number of iterations. :rtype: OptimizeResult """ if not self.fitted: raise ValueError("Nothing fitted yet!") return self._optres @property def dynamics(self): """ Get the time evolution of each mode. :return: matrix that contains all the time evolution, stored by row. :rtype: numpy.ndarray """ t_omega = np.exp(np.outer(self.eigs, self._original_time)) return self.amplitudes[:, None] * t_omega @property def frequency(self): """ Get the amplitude spectrum. :return: the array that contains the frequencies of the eigenvalues. :rtype: numpy.ndarray """ return self.eigs.imag / (2 * np.pi) @property def growth_rate(self): """ Get the growth rate values relative to the modes. :return: the Floquet values :rtype: numpy.ndarray """ return self.eigs.real