systems.base.numerical_integration.stochastic.SDEIntegratorBase

systems.base.numerical_integration.stochastic.SDEIntegratorBase(
    sde_system,
    dt=None,
    step_mode=StepMode.FIXED,
    backend='numpy',
    sde_type=None,
    convergence_type=ConvergenceType.STRONG,
    seed=None,
    **options,
)

Abstract base class for SDE integrators.

Extends IntegratorBase to handle stochastic differential equations with drift and diffusion terms. Provides unified interface for both Ito and Stratonovich SDEs across multiple backends.

All SDE integrators must implement: - step(): Single integration step with noise - integrate(): Multi-step integration with trajectories - name: Integrator name

Additional SDE-specific capabilities: - Multiple trajectory simulation (Monte Carlo) - Noise structure exploitation (additive, diagonal, etc.) - Weak and strong convergence schemes - Stratonovich correction terms

Attributes

Name Type Description
sde_system StochasticDynamicalSystem SDE system to integrate
sde_type SDEType SDE interpretation (Ito or Stratonovich)
convergence_type ConvergenceType Convergence criterion (strong or weak)
seed Optional[int] Random seed for reproducibility

Result Types

Returns SDEIntegrationResult TypedDict with: - t: Time points (T,) - x: State trajectory (T, nx) or (n_paths, T, nx) - diffusion_evals: Number of diffusion evaluations - noise_samples: Brownian increments used - n_paths: Number of trajectories - convergence_type: ‘strong’ or ‘weak’ - sde_type: ‘ito’ or ‘stratonovich’

Examples

>>> # Create SDE integrator
>>> integrator = EulerMaruyamaIntegrator(
...     sde_system,
...     dt=0.01,
...     backend='numpy',
...     seed=42
... )
>>>
>>> # Single trajectory
>>> result = integrator.integrate(
...     x0=np.array([1.0, 0.0]),
...     u_func=lambda t, x: np.zeros(1),
...     t_span=(0.0, 10.0)
... )
>>> print(f"State shape: {result['x'].shape}")
>>>
>>> # Multiple trajectories (Monte Carlo)
>>> result = integrator.integrate_monte_carlo(
...     x0=np.array([1.0, 0.0]),
...     u_func=lambda t, x: np.zeros(1),
...     t_span=(0.0, 10.0),
...     n_paths=1000
... )
>>> stats = get_trajectory_statistics(result)
>>> print(f"Mean trajectory: {stats['mean']}")
>>> print(f"Standard deviation: {stats['std']}")
>>>
>>> # Autonomous system
>>> result = integrator.integrate(
...     x0=np.array([1.0, 0.0]),
...     u_func=lambda t, x: None,
...     t_span=(0.0, 10.0)
... )

Methods

Name Description
get_noise_info Get information about noise structure and optimizations.
get_sde_stats Get SDE-specific integration statistics.
integrate Integrate SDE over time interval.
integrate_monte_carlo Integrate multiple SDE trajectories for Monte Carlo analysis.
reset_stats Reset all statistics including SDE-specific counters.
set_seed Set random seed for reproducibility.
step Take one SDE integration step with noise.

get_noise_info

systems.base.numerical_integration.stochastic.SDEIntegratorBase.get_noise_info()

Get information about noise structure and optimizations.

Returns

Name Type Description
Dict[str, Any] Noise structure information

get_sde_stats

systems.base.numerical_integration.stochastic.SDEIntegratorBase.get_sde_stats()

Get SDE-specific integration statistics.

Returns

Name Type Description
Dict[str, Any] Statistics including drift/diffusion evaluations

integrate

systems.base.numerical_integration.stochastic.SDEIntegratorBase.integrate(
    x0,
    u_func,
    t_span,
    t_eval=None,
    dense_output=False,
)

Integrate SDE over time interval.

API Level: This is a low-level stochastic integration method that directly interfaces with numerical SDE solvers. For typical use cases, prefer the high-level simulate() method if available in your stochastic system class.

Control Function Convention: This method uses the scipy/ODE solver convention where control functions have signature (t, x) → u, with time as the FIRST argument. This differs from high-level simulation APIs which use (x, t) → u with state as the primary argument. The difference is intentional:

  • Low-level integrate(): Uses (t, x) for direct solver compatibility
  • High-level simulate(): Uses (x, t) for intuitive control-theoretic API

Stochastic Integration: Unlike deterministic ODE integration, SDE integration involves random Brownian motion increments. Each call with the same initial condition will produce different trajectories unless the random seed is fixed. For statistical analysis, use integrate_monte_carlo() to simulate multiple paths.

Parameters

Name Type Description Default
x0 ArrayLike Initial state (nx,) required
u_func Callable[[float, ArrayLike], Optional[ArrayLike]] Control policy with low-level convention: (t, x) → u - t: float - current time (FIRST argument, scipy convention) - x: ArrayLike - current state (SECOND argument) - Returns: Optional[ArrayLike] - control input u, or None for autonomous Can be: - Autonomous: lambda t, x: None - Constant control: lambda t, x: u_const - State feedback: lambda t, x: -K @ x - Time-varying: lambda t, x: u(t) - Stochastic policy: lambda t, x: policy(x) + noise required
t_span Tuple[float, float] Integration interval (t_start, t_end) required
t_eval Optional[ArrayLike] Specific times at which to store solution If None, uses solver’s internal time points (typically uniform grid) For SDEs, irregular grids may affect statistical properties None
dense_output bool If True, return dense interpolated solution (if supported by solver) Note: Most SDE solvers do not support dense output False

Returns

Name Type Description
SDEIntegrationResult TypedDict containing: - t: Time points (T,) - x: State trajectory (T, nx) - time-major ordering - success: Whether integration succeeded - message: Status message - nfev: Number of drift function evaluations - diffusion_evals: Number of diffusion function evaluations - noise_samples: Number of Brownian motion samples generated - nsteps: Number of integration steps taken - integration_time: Computation time (seconds) - solver: Integrator name - convergence_type: ‘strong’ or ‘weak’ convergence - sde_type: ‘ito’ or ‘stratonovich’ interpretation - n_paths: Number of trajectories (1 for single path)

Raises

Name Type Description
RuntimeError If integration fails (numerical instability, step size issues, etc.)

Notes

Stochastic Nature: Each call generates a new random trajectory. For reproducible results, set the integrator’s random seed via set_seed() before calling integrate().

Convergence Types: - Strong convergence: Pathwise accuracy - each trajectory is accurate - Weak convergence: Moment accuracy - statistics (mean, variance) are accurate

SDE Interpretations: - Itô: Most common, natural for stochastic calculus - Stratonovich: Physics-based, matches ordinary calculus rules

Time-Major Ordering: Unlike some high-level APIs that use (nx, T) for backward compatibility, integrate() returns (T, nx) time-major ordering for consistency with numerical solver conventions and efficient time-series operations.

Examples

Low-level integrate() usage (uses (t, x) convention):

Autonomous stochastic system:

>>> result = integrator.integrate(
...     x0=np.array([1.0, 0.0]),
...     u_func=lambda t, x: None,  # Autonomous
...     t_span=(0.0, 10.0)
... )
>>> print(f"Final state: {result['x'][-1]}")
>>> print(f"Convergence: {result['convergence_type']}")

Controlled stochastic system with constant control:

>>> result = integrator.integrate(
...     x0=np.array([1.0, 0.0]),
...     u_func=lambda t, x: np.array([0.5]),  # Note: (t, x) order
...     t_span=(0.0, 10.0)
... )
>>> print(f"Diffusion evaluations: {result['diffusion_evals']}")

State feedback for stochastic stabilization:

>>> K = np.array([[1.0, 2.0]])
>>> result = integrator.integrate(
...     x0=np.array([1.0, 0.0]),
...     u_func=lambda t, x: -K @ x,  # (t, x) order for integrate()
...     t_span=(0.0, 10.0)
... )

Time-varying stochastic control:

>>> result = integrator.integrate(
...     x0=np.array([1.0, 0.0]),
...     u_func=lambda t, x: np.array([0.5 * np.sin(t)]),
...     t_span=(0.0, 10.0)
... )

Reproducible stochastic simulation:

>>> integrator.set_seed(42)
>>> result1 = integrator.integrate(
...     x0=np.array([1.0, 0.0]),
...     u_func=lambda t, x: None,
...     t_span=(0.0, 10.0)
... )
>>> integrator.set_seed(42)
>>> result2 = integrator.integrate(
...     x0=np.array([1.0, 0.0]),
...     u_func=lambda t, x: None,
...     t_span=(0.0, 10.0)
... )
>>> np.allclose(result1['x'], result2['x'])  # True - same random trajectory
True

Evaluate at specific times:

>>> t_eval = np.linspace(0, 10, 101)
>>> result = integrator.integrate(
...     x0=np.array([1.0, 0.0]),
...     u_func=lambda t, x: None,
...     t_span=(0, 10),
...     t_eval=t_eval
... )
>>> assert len(result['t']) == 101

Monte Carlo simulation (multiple trajectories):

For statistical analysis, use integrate_monte_carlo():

>>> # Simulate 1000 paths
>>> mc_result = integrator.integrate_monte_carlo(
...     x0=np.array([1.0, 0.0]),
...     u_func=lambda t, x: None,
...     t_span=(0.0, 10.0),
...     n_paths=1000
... )
>>> # Result has shape (n_paths, T, nx)
>>> print(mc_result['x'].shape)  # (1000, 101, 2)
>>>
>>> # Compute statistics
>>> from controldesymulation.systems.base.numerical_integration.stochastic.sde_integrator_base import get_trajectory_statistics
>>> stats = get_trajectory_statistics(mc_result)
>>> print(f"Mean at t=10: {stats['mean'][-1]}")
>>> print(f"Std at t=10: {stats['std'][-1]}")

High-level simulate() usage (if available, uses (x, t) convention):

If your stochastic system provides a high-level simulate() method, prefer it for the more intuitive (x, t) convention:

>>> # Controller with (x, t) order - state is primary
>>> def controller(x, t):  # Note: (x, t) order
...     K = np.array([[1.0, 2.0]])
...     return -K @ x
>>>
>>> result = sde_system.simulate(
...     x0=np.array([1.0, 0.0]),
...     controller=controller,  # Uses (x, t) signature
...     t_span=(0.0, 10.0),
...     dt=0.01
... )

Converting between conventions:

If you have a controller designed for simulate() and need to use integrate():

>>> # Controller for simulate() - uses (x, t)
>>> def my_controller(x, t):
...     return -K @ x
>>>
>>> # Wrap for integrate() - convert to (t, x)
>>> result = integrator.integrate(
...     x0=x0,
...     u_func=lambda t, x: my_controller(x, t),  # Swap argument order
...     t_span=(0, 10)
... )

Comparing SDE with ODE integration:

>>> # Deterministic (ODE) - same result every time
>>> ode_result = ode_integrator.integrate(x0, u_func, t_span)
>>>
>>> # Stochastic (SDE) - different result each time
>>> sde_result1 = sde_integrator.integrate(x0, u_func, t_span)
>>> sde_result2 = sde_integrator.integrate(x0, u_func, t_span)
>>> # Trajectories will differ due to randomness

See Also

integrate_monte_carlo : Simulate multiple paths for statistical analysis get_trajectory_statistics : Compute statistics from Monte Carlo results set_seed : Set random seed for reproducibility step : Single SDE integration step with noise simulate : High-level simulation with (x, t) convention (if available)

integrate_monte_carlo

systems.base.numerical_integration.stochastic.SDEIntegratorBase.integrate_monte_carlo(
    x0,
    u_func,
    t_span,
    n_paths,
    t_eval=None,
    store_paths=True,
    parallel=False,
)

Integrate multiple SDE trajectories for Monte Carlo analysis.

Simulates n_paths independent realizations of the SDE to estimate statistical properties (mean, variance, probability distributions).

Parameters

Name Type Description Default
x0 ArrayLike Initial state (nx,) required
u_func Callable Control policy (or None for autonomous) required
t_span Tuple[float, float] Time interval required
n_paths int Number of trajectories to simulate required
t_eval Optional[ArrayLike] Evaluation times None
store_paths bool If True, store all trajectories (memory intensive) If False, only compute statistics online True
parallel bool If True, use parallel execution (if backend supports) False

Returns

Name Type Description
SDEIntegrationResult Result with shape (n_paths, T, nx) if store_paths=True Result with statistics only if store_paths=False

Examples

>>> # Monte Carlo with 1000 paths
>>> result = integrator.integrate_monte_carlo(
...     x0=np.array([1.0, 0.0]),
...     u_func=lambda t, x: np.zeros(1),
...     t_span=(0.0, 10.0),
...     n_paths=1000
... )
>>>
>>> # Get statistics
>>> stats = get_trajectory_statistics(result)
>>> print(f"Mean at t=10: {stats['mean'][-1]}")
>>> print(f"Std at t=10: {stats['std'][-1]}")
>>>
>>> # Confidence intervals
>>> lower = stats['mean'] - 1.96 * stats['std']
>>> upper = stats['mean'] + 1.96 * stats['std']

reset_stats

systems.base.numerical_integration.stochastic.SDEIntegratorBase.reset_stats()

Reset all statistics including SDE-specific counters.

set_seed

systems.base.numerical_integration.stochastic.SDEIntegratorBase.set_seed(seed)

Set random seed for reproducibility.

Parameters

Name Type Description Default
seed int Random seed required

Examples

>>> integrator.set_seed(42)
>>> result1 = integrator.integrate(x0, u_func, t_span)
>>> integrator.set_seed(42)
>>> result2 = integrator.integrate(x0, u_func, t_span)
>>> # result1 and result2 are identical

step

systems.base.numerical_integration.stochastic.SDEIntegratorBase.step(
    x,
    u=None,
    dt=None,
    dW=None,
)

Take one SDE integration step with noise.

Parameters

Name Type Description Default
x ArrayLike Current state (nx,) required
u Optional[ArrayLike] Control input (nu,) or None for autonomous None
dt Optional[float] Step size (uses self.dt if None) None
dW Optional[ArrayLike] Brownian increment (nw,) - if None, generates random None

Returns

Name Type Description
ArrayLike Next state x(t + dt)

Notes

Subclasses implement specific SDE methods: - Euler-Maruyama (order 0.5 strong, order 1 weak) - Milstein (order 1 strong) - Runge-Kutta SDE methods - etc.

Examples

>>> x_next = integrator.step(x, u, dt=0.01)
>>> # With custom noise
>>> dW = np.random.randn(nw) * np.sqrt(0.01)
>>> x_next = integrator.step(x, u, dt=0.01, dW=dW)