# Source code for pydmd.utils

"""Utilities module."""

import warnings
from numbers import Number
from typing import NamedTuple
from collections import namedtuple
import numpy as np
from numpy.lib.stride_tricks import sliding_window_view

# Named-tuple return types used by the helpers below:
#   compute_svd  returns SVD,
#   compute_tlsq returns TLSQ,
#   compute_rqb  returns RQB.
SVD = namedtuple("SVD", "U s V")
TLSQ = namedtuple("TLSQ", "X_denoised Y_denoised")
RQB = namedtuple("RQB", "Q B test_matrix")


def _svht(sigma_svd: np.ndarray, rows: int, cols: int) -> int:
    """
    Singular Value Hard Threshold.

    :param sigma_svd: Singual values computed by SVD
    :type sigma_svd: np.ndarray
    :param rows: Number of rows of original data matrix.
    :type rows: int
    :param cols: Number of columns of original data matrix.
    :type cols: int
    :return: Computed rank.
    :rtype: int

    References:
    Gavish, Matan, and David L. Donoho, The optimal hard threshold for
    singular values is, IEEE Transactions on Information Theory 60.8
    (2014): 5040-5053.
    https://ieeexplore.ieee.org/document/6846297
    """
    beta = np.divide(*sorted((rows, cols)))
    omega = 0.56 * beta**3 - 0.95 * beta**2 + 1.82 * beta + 1.43
    tau = np.median(sigma_svd) * omega
    rank = np.sum(sigma_svd > tau)

    if rank == 0:
        warnings.warn(
            "SVD optimal rank is 0. The largest singular values are "
            "indistinguishable from noise. Setting rank truncation to 1.",
            RuntimeWarning,
        )
        rank = 1

    return rank


def _compute_rank(
    sigma_svd: np.ndarray, rows: int, cols: int, svd_rank: Number
) -> int:
    """
    Rank computation for the truncated Singular Value Decomposition.

    :param sigma_svd: 1D singular values of SVD.
    :type sigma_svd: np.ndarray
    :param rows: Number of rows of original matrix.
    :type rows: int
    :param cols: Number of columns of original matrix.
    :type cols: int
    :param svd_rank: the rank for the truncation; If 0, the method computes
        the optimal rank and uses it for truncation; if positive interger,
        the method uses the argument for the truncation; if float between 0
        and 1, the rank is the number of the biggest singular values that
        are needed to reach the 'energy' specified by `svd_rank`; if -1,
        the method does not compute truncation. Default is 0.
    :type svd_rank: int or float
    :return: the computed rank truncation.
    :rtype: int

    References:
    Gavish, Matan, and David L. Donoho, The optimal hard threshold for
    singular values is, IEEE Transactions on Information Theory 60.8
    (2014): 5040-5053.
    """
    if svd_rank == 0:
        rank = _svht(sigma_svd, rows, cols)
    elif 0 < svd_rank < 1:
        cumulative_energy = np.cumsum(sigma_svd**2 / (sigma_svd**2).sum())
        rank = np.searchsorted(cumulative_energy, svd_rank) + 1
    elif svd_rank >= 1 and isinstance(svd_rank, int):
        rank = min(svd_rank, sigma_svd.size)
    else:
        rank = min(rows, cols)

    return rank


def compute_rank(X: np.ndarray, svd_rank: Number = 0) -> int:
    """
    Rank computation for the truncated Singular Value Decomposition.

    :param X: the matrix to decompose.
    :type X: np.ndarray
    :param svd_rank: the rank for the truncation; If 0, the method computes
        the optimal rank and uses it for truncation; if positive integer,
        the method uses the argument for the truncation; if float between 0
        and 1, the rank is the number of the biggest singular values that
        are needed to reach the 'energy' specified by `svd_rank`; if -1,
        the method does not compute truncation. Default is 0.
    :type svd_rank: int or float
    :return: the computed rank truncation.
    :rtype: int

    References:
    Gavish, Matan, and David L. Donoho, The optimal hard threshold for
    singular values is, IEEE Transactions on Information Theory 60.8
    (2014): 5040-5053.
    """
    # Only the singular values are needed to decide the truncation rank.
    singular_values = np.linalg.svd(X, full_matrices=False)[1]
    rows, cols = X.shape
    return _compute_rank(singular_values, rows, cols, svd_rank)
def compute_tlsq(
    X: np.ndarray, Y: np.ndarray, tlsq_rank: int
) -> NamedTuple(
    "TLSQ", [("X_denoised", np.ndarray), ("Y_denoised", np.ndarray)]
):
    """
    Compute Total Least Square.

    :param X: the first matrix;
    :type X: np.ndarray
    :param Y: the second matrix;
    :type Y: np.ndarray
    :param tlsq_rank: the rank for the truncation; If 0, the method
        does not compute any noise reduction; if positive number, the
        method uses the argument for the SVD truncation used in the TLSQ
        method.
    :type tlsq_rank: int
    :return: the denoised matrix X, the denoised matrix Y
    :rtype: NamedTuple("TLSQ", [('X_denoised', np.ndarray),
                                ('Y_denoised', np.ndarray)])

    References:
    https://arxiv.org/pdf/1703.11004.pdf
    https://arxiv.org/pdf/1502.03854.pdf
    """
    # tlsq_rank == 0 means "no denoising": hand back the inputs unchanged.
    # Wrapped in TLSQ so this path matches the documented return type
    # (previously a bare tuple was returned here, inconsistently with the
    # denoising path below).
    if tlsq_rank == 0:
        return TLSQ(X, Y)

    # Right-singular vectors of the stacked data matrix [X; Y].
    V = np.linalg.svd(np.append(X, Y, axis=0), full_matrices=False)[-1]
    rank = min(tlsq_rank, V.shape[0])
    # Projector onto the leading `rank` right-singular directions; applying
    # it to X and Y removes the components attributed to noise.
    VV = V[:rank, :].conj().T.dot(V[:rank, :])
    return TLSQ(X.dot(VV), Y.dot(VV))
def compute_svd(
    X: np.ndarray, svd_rank: Number = 0
) -> NamedTuple(
    "SVD", [("U", np.ndarray), ("s", np.ndarray), ("V", np.ndarray)]
):
    """
    Truncated Singular Value Decomposition.

    :param X: the matrix to decompose.
    :type X: np.ndarray
    :param svd_rank: the rank for the truncation; If 0, the method computes
        the optimal rank and uses it for truncation; if positive integer,
        the method uses the argument for the truncation; if float between 0
        and 1, the rank is the number of the biggest singular values that
        are needed to reach the 'energy' specified by `svd_rank`; if -1,
        the method does not compute truncation. Default is 0.
    :type svd_rank: int or float
    :return: the truncated left-singular vectors matrix, the truncated
        singular values array, the truncated right-singular vectors matrix.
    :rtype: NamedTuple("SVD", [('U', np.ndarray),
                               ('s', np.ndarray),
                               ('V', np.ndarray)])

    References:
    Gavish, Matan, and David L. Donoho, The optimal hard threshold for
    singular values is, IEEE Transactions on Information Theory 60.8
    (2014): 5040-5053.
    """
    U, s, V = np.linalg.svd(X, full_matrices=False)
    rank = _compute_rank(s, X.shape[0], X.shape[1], svd_rank)
    # np.linalg.svd returns V^H; transpose back so columns are the
    # right-singular vectors, then truncate everything to `rank`.
    V = V.conj().T
    return SVD(U[:, :rank], s[:rank], V[:, :rank])
def compute_rqb(
    X: np.ndarray,
    svd_rank: Number,
    oversampling: int,
    power_iters: int,
    test_matrix: np.ndarray = None,
    seed: int = None,
) -> NamedTuple(
    "RQB",
    [("Q", np.ndarray), ("B", np.ndarray), ("test_matrix", np.ndarray)],
):
    """
    Randomized QB Decomposition.

    :param X: the matrix to decompose.
    :type X: np.ndarray
    :param svd_rank: the rank for the truncation; If 0, the method computes
        the optimal rank and uses it for truncation; if positive integer,
        the method uses the argument for the truncation; if float between 0
        and 1, the rank is the number of the biggest singular values that
        are needed to reach the 'energy' specified by `svd_rank`; if -1,
        the method does not compute truncation. Use this parameter to define
        the target rank of the input matrix.
    :type svd_rank: int or float
    :param oversampling: Number of additional samples (beyond the target
        rank) to use when computing the random test matrix. Note that values
        in the range [5, 10] tend to be sufficient.
    :type oversampling: int
    :param power_iters: Number of power iterations to perform when executing
        the Randomized QB Decomposition. Note that as many as 1 to 2 power
        iterations often lead to considerable improvements.
    :type power_iters: int
    :param test_matrix: The random test matrix that will be used when
        executing the Randomized QB Decomposition. If not provided, the
        `svd_rank` and `oversampling` parameters will be used to compute the
        random matrix.
    :type test_matrix: numpy.ndarray
    :param seed: Seed used to initialize the random generator when computing
        random test matrices.
    :type seed: int
    :return: the orthonormal basis matrix, the transformed data matrix, and
        the random test matrix.
    :rtype: NamedTuple("RQB", [('Q', np.ndarray), ('B', np.ndarray),
                               ('test_matrix', np.ndarray)])

    References:
    N. Benjamin Erichson, Lionel Mathelin, J. Nathan Kutz, Steven L.
    Brunton. Randomized dynamic mode decomposition. SIAM Journal on Applied
    Dynamical Systems, 18, 2019.
    """
    if X.ndim != 2:
        raise ValueError("Please ensure that input data is a 2D array.")

    # Draw a Gaussian random test matrix when one was not supplied; its
    # width is the target rank plus the requested oversampling.
    if test_matrix is None:
        num_cols = X.shape[-1]
        target_rank = compute_rank(X, svd_rank)
        rng = np.random.default_rng(seed)
        test_matrix = rng.standard_normal(
            (num_cols, target_rank + oversampling)
        )

    # Sample the range of X through the random test matrix.
    sample = X.dot(test_matrix)

    # Power iterations sharpen the sampled subspace; each pass
    # re-orthonormalizes to keep the columns numerically independent.
    for _ in range(power_iters):
        orth = np.linalg.qr(sample)[0]
        back_projected = np.linalg.qr(X.conj().T.dot(orth))[0]
        sample = X.dot(back_projected)

    # Orthonormal basis for the sampled range of X.
    Q = np.linalg.qr(sample)[0]

    # Project the snapshot matrix onto the smaller space.
    B = Q.conj().T.dot(X)

    return RQB(Q, B, test_matrix)
def pseudo_hankel_matrix(X: np.ndarray, d: int) -> np.ndarray:
    """
    Arrange the snapshots in the matrix `X` into the (pseudo) Hankel
    matrix. The attribute `d` controls the number of snapshots from `X`
    stacked into each column of the Hankel matrix.

    :Example:

        >>> a = np.array([[1, 2, 3, 4, 5]])
        >>> pseudo_hankel_matrix(a, d=2)
        array([[1, 2, 3, 4],
               [2, 3, 4, 5]])
        >>> pseudo_hankel_matrix(a, d=4)
        array([[1, 2],
               [2, 3],
               [3, 4],
               [4, 5]])

        >>> a = np.array([1,2,3,4,5,6]).reshape(2,3)
        >>> print(a)
        [[1 2 3]
         [4 5 6]]
        >>> pseudo_hankel_matrix(a, d=2)
        array([[1, 2],
               [4, 5],
               [2, 3],
               [5, 6]])
    """
    n_space, n_time = X.shape
    # Each window is `d` consecutive snapshots (rows of X.T); flatten every
    # window into one column of the output.
    windows = sliding_window_view(X.T, (d, n_space))[:, 0]
    return windows.reshape(n_time - d + 1, -1).T
def differentiate(X, dt):
    """
    Method for performing 2nd order finite difference. Assumes the input
    matrix X is 1D or 2D, with uniformly-sampled snapshots filling each
    column. Requires dt, which is the time step between each snapshot.
    """
    if not isinstance(X, np.ndarray) or X.ndim > 2:
        raise ValueError("Please ensure that input data is a 1D or 2D array.")
    if X.ndim == 1:
        # Promote to a single-row 2D array so one code path handles both.
        X = X[None]

    derivative = np.empty(X.shape)
    # Central differences for interior snapshots.
    derivative[:, 1:-1] = (X[:, 2:] - X[:, :-2]) / (2 * dt)
    # One-sided second-order stencils at the two boundaries.
    derivative[:, 0] = (-3 * X[:, 0] + 4 * X[:, 1] - X[:, 2]) / (2 * dt)
    derivative[:, -1] = (3 * X[:, -1] - 4 * X[:, -2] + X[:, -3]) / (2 * dt)
    # Squeeze so a 1D input yields a 1D derivative.
    return np.squeeze(derivative)