systems.base.core.ContinuousStochasticSystem
systems.base.core.ContinuousStochasticSystem(*args, **kwargs)Concrete symbolic continuous-time stochastic dynamical system (SDE).
Extends ContinuousSymbolicSystem to handle stochastic differential equations. Users subclass this and implement define_system() to specify both drift and diffusion terms.
Represents systems of the form: dx = f(x, u, t)dt + g(x, u, t)dW y = h(x)
where: f: Drift (deterministic part) - inherited from parent g: Diffusion (stochastic part) - added by this class dW: Brownian motion increments
Attributes (Set by User in define_system)
diffusion_expr : sp.Matrix Symbolic diffusion matrix g(x, u), shape (nx, nw) REQUIRED - must be set in define_system() sde_type : SDEType or str SDE interpretation (‘ito’ or ‘stratonovich’) Optional - defaults to Itô
Attributes (Created Automatically)
diffusion_handler : DiffusionHandler Generates and caches diffusion functions noise_characteristics : NoiseCharacteristics Automatic noise structure analysis results nw : int Number of independent Wiener processes is_stochastic : bool Always True for this class
Examples
>>> class OrnsteinUhlenbeck(ContinuousStochasticSystem):
... '''Ornstein-Uhlenbeck process with mean reversion.'''
...
... def define_system(self, alpha=1.0, sigma=0.5):
... x = sp.symbols('x')
... u = sp.symbols('u')
... alpha_sym = sp.symbols('alpha', positive=True)
... sigma_sym = sp.symbols('sigma', positive=True)
...
... # Drift (deterministic part)
... self.state_vars = [x]
... self.control_vars = [u]
... self._f_sym = sp.Matrix([[-alpha_sym * x + u]])
... self.parameters = {alpha_sym: alpha, sigma_sym: sigma}
... self.order = 1
...
... # Diffusion (stochastic part)
... self.diffusion_expr = sp.Matrix([[sigma_sym]])
... self.sde_type = 'ito'
>>>
>>> # Instantiate system
>>> system = OrnsteinUhlenbeck(alpha=2.0, sigma=0.3)
>>>
>>> # Automatic noise analysis
>>> print(system.noise_characteristics.noise_type)
NoiseType.ADDITIVE
>>>
>>> # Evaluate drift and diffusion
>>> x = np.array([1.0])
>>> u = np.array([0.0])
>>> f = system.drift(x, u) # Drift term
>>> g = system.diffusion(x, u) # Diffusion matrixAttributes
| Name | Description |
|---|---|
| diffusion_expr | Symbolic diffusion matrix g(x, u) - MUST be set in define_system() |
| is_stochastic | Return True (this is a stochastic system). |
| sde_type | SDE interpretation - can be ‘ito’ or ‘stratonovich’ (string or enum) |
Methods
| Name | Description |
|---|---|
| can_optimize_for_additive | Check if additive-noise optimizations are applicable. |
| compile_all | Compile both drift and diffusion for all backends. |
| compile_diffusion | Pre-compile diffusion functions for specified backends. |
| depends_on_control | Check if diffusion depends on control inputs. |
| depends_on_state | Check if diffusion depends on state variables. |
| depends_on_time | Check if diffusion depends on time. |
| diffusion | Evaluate diffusion term g(x, u, t) or g(x, t) for autonomous. |
| drift | Evaluate drift term f(x, u, t) or f(x, t) for autonomous. |
| get_constant_noise | Get constant noise matrix for additive noise. |
| get_info | Get comprehensive system information. |
| get_noise_type | Get classified noise type. |
| get_optimization_opportunities | Get optimization opportunities based on noise structure. |
| integrate | Integrate stochastic system using SDE solver. |
| is_additive_noise | Check if noise is additive (constant, state-independent). |
| is_diagonal_noise | Check if noise sources are independent (diagonal diffusion). |
| is_multiplicative_noise | Check if noise is multiplicative (state-dependent). |
| is_pure_diffusion | Check if system is pure diffusion (zero drift). |
| is_scalar_noise | Check if system has single noise source. |
| linearize | Compute linearization including diffusion: A = ∂f/∂x, B = ∂f/∂u, G = g(x_eq). |
| print_sde_info | Print formatted SDE system information. |
| recommend_solvers | Recommend efficient SDE solvers based on noise structure. |
| reset_all_caches | Clear both drift and diffusion caches. |
| reset_diffusion_cache | Clear cached diffusion functions. |
can_optimize_for_additive
systems.base.core.ContinuousStochasticSystem.can_optimize_for_additive()Check if additive-noise optimizations are applicable.
compile_all
systems.base.core.ContinuousStochasticSystem.compile_all(
backends=None,
verbose=False,
**kwargs,
)Compile both drift and diffusion for all backends.
Returns
| Name | Type | Description |
|---|---|---|
| Dict[str, Dict[str, float]] | Nested dict: backend → {‘drift’: time, ‘diffusion’: time} |
compile_diffusion
systems.base.core.ContinuousStochasticSystem.compile_diffusion(
backends=None,
verbose=False,
**kwargs,
)Pre-compile diffusion functions for specified backends.
depends_on_control
systems.base.core.ContinuousStochasticSystem.depends_on_control()Check if diffusion depends on control inputs.
depends_on_state
systems.base.core.ContinuousStochasticSystem.depends_on_state()Check if diffusion depends on state variables.
depends_on_time
systems.base.core.ContinuousStochasticSystem.depends_on_time()Check if diffusion depends on time.
diffusion
systems.base.core.ContinuousStochasticSystem.diffusion(
x,
u=None,
t=0.0,
backend=None,
)Evaluate diffusion term g(x, u, t) or g(x, t) for autonomous.
Parameters
| Name | Type | Description | Default |
|---|---|---|---|
| x | StateVector | State vector (nx,) or batch (batch, nx) | required |
| u | Optional[ControlVector] | Control vector (nu,) or batch (batch, nu) For autonomous systems (nu=0), u can be None | None |
| t | float | Time (currently ignored) | 0.0 |
| backend | Optional[Backend] | Backend selection (None = auto-detect) | None |
Returns
| Name | Type | Description |
|---|---|---|
| ArrayLike | Diffusion matrix g(x, u), shape (nx, nw) or (batch, nx, nw) |
Examples
Controlled SDE:
>>> g = system.diffusion(np.array([2.0]), np.array([0.0]))
>>> print(g.shape)
(1, 1)Autonomous SDE:
>>> g = system.diffusion(np.array([2.0])) # u=None
>>> print(g.shape)
(1, 1)For additive noise (precompute once):
>>> if system.is_additive_noise():
... G = system.get_constant_noise() # Precompute oncedrift
systems.base.core.ContinuousStochasticSystem.drift(
x,
u=None,
t=0.0,
backend=None,
)Evaluate drift term f(x, u, t) or f(x, t) for autonomous.
Parameters
| Name | Type | Description | Default |
|---|---|---|---|
| x | StateVector | State vector (nx,) or batch (batch, nx) | required |
| u | Optional[ControlVector] | Control vector (nu,) or batch (batch, nu) For autonomous systems (nu=0), u can be None | None |
| t | float | Time (currently ignored for time-invariant systems) | 0.0 |
| backend | Optional[Backend] | Backend selection (None = auto-detect) | None |
Returns
| Name | Type | Description |
|---|---|---|
| StateVector | Drift vector f(x, u), shape (nx,) or (batch, nx) |
Notes
Delegates to parent class - reuses ALL drift evaluation logic. Supports both controlled and autonomous SDEs.
Examples
Controlled SDE:
>>> f = system.drift(np.array([1.0]), np.array([0.0]))
>>> print(f)
[-1.0]Autonomous SDE:
>>> f = system.drift(np.array([1.0])) # u=None
>>> print(f)
[-2.0]get_constant_noise
systems.base.core.ContinuousStochasticSystem.get_constant_noise(backend='numpy')Get constant noise matrix for additive noise.
For additive noise, diffusion is constant and can be precomputed once for significant performance gains.
Parameters
| Name | Type | Description | Default |
|---|---|---|---|
| backend | str | Backend for array type | 'numpy' |
Returns
| Name | Type | Description |
|---|---|---|
| ArrayLike | Constant diffusion matrix (nx, nw) |
Raises
| Name | Type | Description |
|---|---|---|
| ValueError | If noise is not additive |
get_info
systems.base.core.ContinuousStochasticSystem.get_info()Get comprehensive system information.
get_noise_type
systems.base.core.ContinuousStochasticSystem.get_noise_type()Get classified noise type.
get_optimization_opportunities
systems.base.core.ContinuousStochasticSystem.get_optimization_opportunities()Get optimization opportunities based on noise structure.
integrate
systems.base.core.ContinuousStochasticSystem.integrate(
x0,
u=None,
t_span=(0.0, 10.0),
method='euler_maruyama',
t_eval=None,
n_paths=1,
seed=None,
**integrator_kwargs,
)Integrate stochastic system using SDE solver.
CRITICAL OVERRIDE: This method overrides the parent’s deterministic integrate() to use SDEIntegratorFactory instead of IntegratorFactory. This ensures proper handling of Brownian motion and noise structure.
Parameters
| Name | Type | Description | Default |
|---|---|---|---|
| x0 | StateVector | Initial state (nx,) | required |
| u | ControlInput | Control input (constant, callable, or None) | None |
| t_span | TimeSpan | Integration interval (t_start, t_end) | (0.0, 10.0) |
| method | str | SDE integration method (default: ‘euler_maruyama’) | 'euler_maruyama' |
| t_eval | Optional[TimePoints] | Specific times to return solution | None |
| n_paths | int | Number of Monte Carlo paths to simulate (default: 1) For n_paths > 1, performs Monte Carlo simulation | 1 |
| seed | Optional[int] | Random seed for reproducibility | None |
| **integrator_kwargs | Additional options: - dt : float (required for most SDE methods) - rtol, atol : float (adaptive methods only) - convergence_type : ConvergenceType (‘strong’ or ‘weak’) | {} |
Returns
| Name | Type | Description |
|---|---|---|
| SDEIntegrationResult | TypedDict containing: - t: Time points (T,) - x: State trajectories - Single path: (T, nx) - Multiple paths: (n_paths, T, nx) - success: Integration success - n_paths: Number of paths - noise_type: Detected noise type - sde_type: Itô or Stratonovich - nfev: Drift function evaluations - diffusion_evals: Diffusion evaluations - integration_time: Computation time |
Examples
Single trajectory:
>>> result = system.integrate(
... x0=np.array([1.0, 0.0]),
... u=None,
... t_span=(0.0, 10.0),
... method='euler_maruyama',
... dt=0.01,
... seed=42
... )Monte Carlo simulation:
>>> result = system.integrate(
... x0=x0,
... t_span=(0, 10),
... n_paths=1000,
... dt=0.01,
... seed=42
... )
>>> mean_traj = result['x'].mean(axis=0)State feedback:
>>> def controller(x, t):
... return -K @ x
>>> result = system.integrate(x0, controller, t_span=(0, 10), dt=0.01)is_additive_noise
systems.base.core.ContinuousStochasticSystem.is_additive_noise()Check if noise is additive (constant, state-independent).
is_diagonal_noise
systems.base.core.ContinuousStochasticSystem.is_diagonal_noise()Check if noise sources are independent (diagonal diffusion).
is_multiplicative_noise
systems.base.core.ContinuousStochasticSystem.is_multiplicative_noise()Check if noise is multiplicative (state-dependent).
is_pure_diffusion
systems.base.core.ContinuousStochasticSystem.is_pure_diffusion()Check if system is pure diffusion (zero drift).
is_scalar_noise
systems.base.core.ContinuousStochasticSystem.is_scalar_noise()Check if system has single noise source.
linearize
systems.base.core.ContinuousStochasticSystem.linearize(x_eq, u_eq=None)Compute linearization including diffusion: A = ∂f/∂x, B = ∂f/∂u, G = g(x_eq).
For stochastic systems, linearization returns three matrices: - A: State Jacobian ∂f/∂x - B: Control Jacobian ∂f/∂u - G: Diffusion matrix evaluated at equilibrium g(x_eq, u_eq)
Parameters
| Name | Type | Description | Default |
|---|---|---|---|
| x_eq | StateVector | Equilibrium state (nx,) | required |
| u_eq | Optional[ControlVector] | Equilibrium control (nu,) | None |
Returns
| Name | Type | Description |
|---|---|---|
| Tuple[ArrayLike, ArrayLike, ArrayLike] | (A, B, G) where: - A: State Jacobian (nx, nx) - B: Control Jacobian (nx, nu) - G: Diffusion matrix (nx, nw) |
Examples
>>> x_eq = np.zeros(2)
>>> u_eq = np.zeros(1)
>>> A, B, G = system.linearize(x_eq, u_eq)
>>>
>>> # Check continuous stability: Re(λ) < 0
>>> eigenvalues = np.linalg.eigvals(A)
>>> is_stable = np.all(np.real(eigenvalues) < 0)print_sde_info
systems.base.core.ContinuousStochasticSystem.print_sde_info()Print formatted SDE system information.
recommend_solvers
systems.base.core.ContinuousStochasticSystem.recommend_solvers(backend='jax')Recommend efficient SDE solvers based on noise structure.
Parameters
| Name | Type | Description | Default |
|---|---|---|---|
| backend | str | Integration backend (‘jax’, ‘torch’, ‘numpy’) | 'jax' |
Returns
| Name | Type | Description |
|---|---|---|
| List[str] | Recommended solver names, ordered by efficiency/accuracy |
Examples
>>> solvers = system.recommend_solvers('jax')
>>> print(solvers)
['sea', 'shark', 'sra1'] # For additive noisereset_all_caches
systems.base.core.ContinuousStochasticSystem.reset_all_caches(backends=None)Clear both drift and diffusion caches.
reset_diffusion_cache
systems.base.core.ContinuousStochasticSystem.reset_diffusion_cache(
backends=None,
)Clear cached diffusion functions.