"""
Derived module from dmdbase.py for hankel dmd.
Reference:
- H. Arbabi, I. Mezic, Ergodic theory, dynamic mode decomposition, and
computation of spectral properties of the Koopman operator. SIAM Journal on
Applied Dynamical Systems, 2017, 16.4: 2096-2126.
"""
from copy import copy
from numbers import Number
import numpy as np
from .dmd import DMD
from .dmdbase import DMDBase
from .preprocessing.hankel import hankel_preprocessing
from .snapshots import Snapshots
[docs]class HankelDMD(DMDBase):
"""
Hankel Dynamic Mode Decomposition
:param svd_rank: the rank for the truncation; If 0, the method computes the
optimal rank and uses it for truncation; if positive interger, the
method uses the argument for the truncation; if float between 0 and 1,
the rank is the number of the biggest singular values that are needed
to reach the 'energy' specified by `svd_rank`; if -1, the method does
not compute truncation.
:type svd_rank: int or float
:param int tlsq_rank: rank truncation computing Total Least Square. Default
is 0, that means no truncation.
:param bool exact: flag to compute either exact DMD or projected DMD.
Default is False.
:param opt: argument to control the computation of DMD modes amplitudes.
See :class:`DMDBase`. Default is False.
:type opt: bool or int
:param rescale_mode: Scale Atilde as shown in
10.1016/j.jneumeth.2015.10.010 (section 2.4) before computing its
eigendecomposition. None means no rescaling, 'auto' means automatic
rescaling using singular values, otherwise the scaling factors.
:type rescale_mode: {'auto'} or None or numpy.ndarray
:param bool forward_backward: If True, the low-rank operator is computed
like in fbDMD (reference: https://arxiv.org/abs/1507.02264). Default is
False.
:param int d: the new order for spatial dimension of the input snapshots.
Default is 1.
:param sorted_eigs: Sort eigenvalues (and modes/dynamics accordingly) by
magnitude if `sorted_eigs='abs'`, by real part (and then by imaginary
part to break ties) if `sorted_eigs='real'`. Default: False.
:type sorted_eigs: {'real', 'abs'} or False
:param reconstruction_method: Method used to reconstruct the snapshots of
the dynamical system from the multiple versions available due to how
HankelDMD is conceived. If `'first'` (default) the first version
available is selected (i.e. the nearest to the 0-th row in the
augmented matrix). If `'mean'` we compute the element-wise mean. If
`reconstruction_method` is an array of float values we compute the
weighted average (for each snapshots) using the given values as weights
(the number of weights must be equal to `d`).
:type reconstruction_method: {'first', 'mean'} or array-like
"""
def __init__(
self,
svd_rank=0,
tlsq_rank=0,
exact=False,
opt=False,
rescale_mode=None,
forward_backward=False,
d=1,
sorted_eigs=False,
reconstruction_method="first",
tikhonov_regularization=None,
):
super().__init__(
svd_rank=svd_rank,
tlsq_rank=tlsq_rank,
exact=exact,
opt=opt,
rescale_mode=rescale_mode,
sorted_eigs=sorted_eigs,
)
self._d = d
self._ho_snapshots = None
if isinstance(reconstruction_method, list):
if len(reconstruction_method) != d:
raise ValueError(
"The length of the array of weights must be equal to d"
)
elif isinstance(reconstruction_method, np.ndarray):
if (
reconstruction_method.ndim > 1
or reconstruction_method.shape[0] != d
):
raise ValueError(
"The length of the array of weights must be equal to d"
)
self._reconstruction_method = reconstruction_method
sub_dmd = DMD(
svd_rank=svd_rank,
tlsq_rank=tlsq_rank,
exact=exact,
opt=opt,
rescale_mode=rescale_mode,
forward_backward=forward_backward,
sorted_eigs=sorted_eigs,
tikhonov_regularization=tikhonov_regularization,
)
self._sub_dmd = hankel_preprocessing(
sub_dmd, d=d, reconstruction_method=reconstruction_method
)
[docs] def _hankel_first_occurrence(self, time):
r"""
For a given `t` such that there is :math:`k \in \mathbb{N}` such that
:math:`t = t_0 + k dt`, return the index of the first column in Hankel
pseudo matrix (see also :func:`_pseudo_hankel_matrix`) which contains
the snapshot corresponding to `t`.
:param time: The time corresponding to the requested snapshot.
:return: The index of the first appeareance of `time` in the columns of
Hankel pseudo matrix.
:rtype: int
"""
assert isinstance(time, Number) or np.asarray(time).ndim == 1
return max(
0,
(time - self.original_time["t0"]) // self.dmd_time["dt"]
- (self.original_time["t0"] + self._d - 1),
)
[docs] def _update_sub_dmd_time(self):
"""
Update the time dictionaries (`dmd_time` and `original_time`) of
the auxiliary DMD instance `HankelDMD._sub_dmd` after an update of the
time dictionaries of the time dictionaries of this instance of the
higher level instance of `HankelDMD`.
"""
self._sub_dmd.dmd_time["t0"] = self._hankel_first_occurrence(
self.dmd_time["t0"]
)
self._sub_dmd.dmd_time["tend"] = self._hankel_first_occurrence(
self.dmd_time["tend"]
)
@property
def reconstructed_data(self):
self._update_sub_dmd_time()
# we want to return only the requested timesteps
time_index = min(
self._d - 1,
int(
(self.dmd_time["t0"] - self.original_time["t0"])
// self.dmd_time["dt"]
),
)
return self._sub_dmd.reconstructed_data[
:, time_index : time_index + len(self.dmd_timesteps)
]
@property
def modes(self):
return self._sub_dmd.modes
@property
def eigs(self):
return self._sub_dmd.eigs
@property
def amplitudes(self):
return self._sub_dmd.amplitudes
@property
def operator(self):
return self._sub_dmd.operator
@property
def svd_rank(self):
return self._sub_dmd.svd_rank
@property
def ho_snapshots(self):
"""
Get the time-delay data matrix.
:return: the matrix that contains the time-delayed data.
:rtype: numpy.ndarray
"""
return self._sub_dmd.snapshots
@property
def modes_activation_bitmask(self):
return self._sub_dmd.modes_activation_bitmask
@modes_activation_bitmask.setter
def modes_activation_bitmask(self, value):
self._sub_dmd.modes_activation_bitmask = value
def __getitem__(self, key):
# The implementation was asking for problems...
raise ValueError("This operation is not allowed for HankelDMD")
[docs] def fit(self, X):
"""
Compute the Dynamic Modes Decomposition to the input data.
:param X: the input snapshots.
:type X: numpy.ndarray or iterable
"""
self._reset()
self._snapshots_holder = Snapshots(X)
n_samples = self.snapshots.shape[-1]
if n_samples < self._d:
msg = """The number of snapshots provided is not enough for d={}.
Expected at least d."""
raise ValueError(msg.format(self._d))
self._sub_dmd.fit(X)
# Default timesteps
self._set_initial_time_dictionary(
{"t0": 0, "tend": n_samples - 1, "dt": 1}
)
return self