"""
Derived module from dmdbase.py for dmd with control.
Reference:
- Proctor, J.L., Brunton, S.L. and Kutz, J.N., 2016. Dynamic mode decomposition
with control. SIAM Journal on Applied Dynamical Systems, 15(1), pp.142-161.
"""
import numpy as np
from .dmdbase import DMDBase
from .dmdoperator import DMDOperator
from .snapshots import Snapshots
from .utils import compute_svd, compute_tlsq
class DMDControlOperator(DMDOperator):
"""
DMD with control base operator. This should be subclassed in order to
implement the appropriate features.
:param svd_rank: the rank for the truncation; If 0, the method computes the
optimal rank and uses it for truncation; if positive interger, the
method uses the argument for the truncation; if float between 0 and 1,
the rank is the number of the biggest singular values that are needed
to reach the 'energy' specified by `svd_rank`; if -1, the method does
not compute truncation.
:type svd_rank: int or float
:param svd_rank_omega: the rank for the truncation of the aumented matrix
omega composed by the left snapshots matrix and the control. Used only
for the `_fit_B_unknown` method of this class. It should be greater or
equal than `svd_rank`. For the possible values please refer to the
`svd_rank` parameter description above.
:type svd_rank_omega: int or float
:param int tlsq_rank: rank truncation computing Total Least Square. Default
is 0, that means no truncation.
"""
def __init__(self, svd_rank, svd_rank_omega, tlsq_rank):
super(DMDControlOperator, self).__init__(
svd_rank=svd_rank,
exact=True,
rescale_mode=None,
forward_backward=False,
sorted_eigs=False,
tikhonov_regularization=None,
)
self._svd_rank_omega = svd_rank_omega
self._tlsq_rank = tlsq_rank
class DMDBKnownOperator(DMDControlOperator):
"""
DMD with control base operator when B is given.
:param svd_rank: the rank for the truncation; If 0, the method computes the
optimal rank and uses it for truncation; if positive interger, the
method uses the argument for the truncation; if float between 0 and 1,
the rank is the number of the biggest singular values that are needed
to reach the 'energy' specified by `svd_rank`; if -1, the method does
not compute truncation.
:type svd_rank: int or float
:param svd_rank_omega: the rank for the truncation of the aumented matrix
omega composed by the left snapshots matrix and the control. Used only
for the `_fit_B_unknown` method of this class. It should be greater or
equal than `svd_rank`. For the possible values please refer to the
`svd_rank` parameter description above.
:type svd_rank_omega: int or float
:param int tlsq_rank: rank truncation computing Total Least Square. Default
is 0, that means no truncation.
"""
def compute_operator(self, X, Y, B, controlin):
"""
Compute the low-rank operator. This is the standard version of the DMD
operator, with a correction which depends on B.
:param numpy.ndarray X: matrix containing the snapshots x0,..x{n-1} by
column.
:param numpy.ndarray Y: matrix containing the snapshots x1,..x{n} by
column.
:param numpy.ndarray B: the matrix B.
:param numpy.ndarray control: the control input.
:return: the (truncated) left-singular vectors matrix, the (truncated)
singular values array, the (truncated) right-singular vectors
matrix of X.
:rtype: numpy.ndarray, numpy.ndarray, numpy.ndarray
"""
X, Y = compute_tlsq(X, Y, self._tlsq_rank)
Y = Y - B.dot(controlin)
return super(DMDBKnownOperator, self).compute_operator(X, Y)
class DMDBUnknownOperator(DMDControlOperator):
"""
DMD with control base operator when B is unknown.
:param svd_rank: the rank for the truncation; If 0, the method computes the
optimal rank and uses it for truncation; if positive interger, the
method uses the argument for the truncation; if float between 0 and 1,
the rank is the number of the biggest singular values that are needed
to reach the 'energy' specified by `svd_rank`; if -1, the method does
not compute truncation.
:type svd_rank: int or float
:param svd_rank_omega: the rank for the truncation of the aumented matrix
omega composed by the left snapshots matrix and the control. Used only
for the `_fit_B_unknown` method of this class. It should be greater or
equal than `svd_rank`. For the possible values please refer to the
`svd_rank` parameter description above.
:type svd_rank_omega: int or float
:param int tlsq_rank: rank truncation computing Total Least Square. Default
is 0, that means no truncation.
"""
def compute_operator(self, X, Y, controlin):
"""
Compute the low-rank operator.
:param numpy.ndarray X: matrix containing the snapshots x0,..x{n-1} by
column.
:param numpy.ndarray Y: matrix containing the snapshots x1,..x{n} by
column.
:param numpy.ndarray control: the control input.
:return: the (truncated) left-singular vectors matrix of Y, and
the product between the left-singular vectors of Y and Btilde.
:rtype: numpy.ndarray, numpy.ndarray
"""
snapshots_rows = X.shape[0]
omega = np.vstack([X, controlin])
Up, sp, Vp = compute_svd(omega, self._svd_rank_omega)
Up1 = Up[:snapshots_rows, :]
Up2 = Up[snapshots_rows:, :]
Ur, _, _ = compute_svd(Y, self._svd_rank)
self._Atilde = np.linalg.multi_dot(
[Ur.T.conj(), Y, Vp, np.diag(np.reciprocal(sp)), Up1.T.conj(), Ur]
)
self._compute_eigenquantities()
self._compute_modes(Y, sp, Vp, Up1, Ur)
Btilde = np.linalg.multi_dot(
[Ur.T.conj(), Y, Vp, np.diag(np.reciprocal(sp)), Up2.T.conj()]
)
return Ur, Ur.dot(Btilde)
def _compute_modes(self, Y, sp, Vp, Up1, Ur):
"""
Private method that computes eigenvalues and eigenvectors of the
high-dimensional operator (stored in self.modes and self.Lambda).
"""
self._modes = np.linalg.multi_dot(
[
Y,
Vp,
np.diag(np.reciprocal(sp)),
Up1.T.conj(),
Ur,
self.eigenvectors,
]
)
self._Lambda = self.eigenvalues
[docs]class DMDc(DMDBase):
"""
Dynamic Mode Decomposition with control.
This version does not allow to manipulate the temporal window within the
system is reconstructed.
:param svd_rank: the rank for the truncation; If 0, the method computes the
optimal rank and uses it for truncation; if positive interger, the
method uses the argument for the truncation; if float between 0 and 1,
the rank is the number of the biggest singular values that are needed
to reach the 'energy' specified by `svd_rank`; if -1, the method does
not compute truncation.
:type svd_rank: int or float
:param int tlsq_rank: rank truncation computing Total Least Square. Default
is 0, that means no truncation.
:param opt: argument to control the computation of DMD modes amplitudes.
See :class:`DMDBase`. Default is False.
:type opt: bool or int
:param svd_rank_omega: the rank for the truncation of the aumented matrix
omega composed by the left snapshots matrix and the control. Used only
for the `_fit_B_unknown` method of this class. It should be greater or
equal than `svd_rank`. For the possible values please refer to the
`svd_rank` parameter description above.
:type svd_rank_omega: int or float
:param lag: the time lag for the snapshots. Used in fit method to generate
X and Y.
:type lag: int
"""
def __init__(
self, svd_rank=0, tlsq_rank=0, opt=False, svd_rank_omega=-1, lag=1
):
# we're going to initialize Atilde when we know if B is known
self._Atilde = None
# remember the arguments for when we'll need them
self._dmd_operator_kwargs = {
"svd_rank": svd_rank,
"svd_rank_omega": svd_rank_omega,
"tlsq_rank": tlsq_rank,
}
self._opt = opt
self._exact = False
self._B = None
self._snapshots_holder = None
self._controlin = None
self._basis = None
self._lag = lag
self._modes_activation_bitmask_proxy = None
@property
def svd_rank_omega(self):
return self.operator._svd_rank_omega
@property
def B(self):
"""
Get the operator B.
:return: the operator B.
:rtype: numpy.ndarray
"""
return self._B
@property
def basis(self):
"""
Get the basis used to reduce the linear operator to the low dimensional
space.
:return: the matrix which columns are the basis vectors.
:rtype: numpy.ndarray
"""
return self._basis
[docs] def reconstructed_data(self, control_input=None):
"""
Return the reconstructed data, computed using the `control_input`
argument. If the `control_input` is not passed, the original input (in
the `fit` method) is used. The input dimension has to be consistent
with the dynamics.
:param numpy.ndarray control_input: the input control matrix.
:return: the matrix that contains the reconstructed snapshots.
:rtype: numpy.ndarray
"""
controlin = (
np.asarray(control_input)
if control_input is not None
else self._controlin
)
if controlin.shape[-1] != self.dynamics.shape[-1] - self._lag:
raise RuntimeError(
"The number of control inputs and the number of snapshots to "
"reconstruct has to be the same"
)
eigs = np.power(
self.eigs, self.dmd_time["dt"] // self.original_time["dt"]
)
A = np.linalg.multi_dot(
[self.modes, np.diag(eigs), np.linalg.pinv(self.modes)]
)
data = [self.snapshots[:, i] for i in range(self._lag)]
expected_shape = data[0].shape
for i, u in enumerate(controlin.T):
arr = A.dot(data[i]) + self._B.dot(u)
if arr.shape != expected_shape:
raise ValueError(
f"Invalid shape: expected {expected_shape}, got {arr.shape}"
)
data.append(arr)
data = np.array(data).T
return data
[docs] def fit(self, X, I, B=None):
"""
Compute the Dynamic Modes Decomposition with control given the original
snapshots and the control input data. The matrix `B` that controls how
the control input influences the system evolution can be provided by
the user; otherwise, it is computed by the algorithm.
:param X: the input snapshots.
:type X: numpy.ndarray or iterable
:param I: the control input.
:type I: numpy.ndarray or iterable
:param numpy.ndarray B: matrix that controls the control input
influences the system evolution.
:type B: numpy.ndarray or iterable
"""
self._reset()
self._controlin = np.atleast_2d(np.asarray(I))
self._snapshots_holder = Snapshots(X)
n_samples = self.snapshots.shape[-1]
if self._lag < 1:
raise ValueError("Time lag must be positive.")
X = self.snapshots[:, : -self._lag]
Y = self.snapshots[:, self._lag :]
self._set_initial_time_dictionary(
{"t0": 0, "tend": n_samples - 1, "dt": 1}
)
if B is None:
self._Atilde = DMDBUnknownOperator(**self._dmd_operator_kwargs)
self._basis, self._B = self.operator.compute_operator(
X, Y, self._controlin
)
else:
self._Atilde = DMDBKnownOperator(**self._dmd_operator_kwargs)
U, _, _ = self.operator.compute_operator(X, Y, B, self._controlin)
self._basis = U
self._B = B
self._b = self._compute_amplitudes()
return self