PyDEVS Code Examples¶
This page provides examples for building discrete event simulations for IoT systems using the Python DEVS (Discrete Event System Specification) framework.
Download: PyDEVS Code Examples (ZIP)
Example Categories¶
Motion Sensor Simulation¶
These examples demonstrate how to simulate motion sensors:
Motion-Based Lighting System¶
This example demonstrates a motion-based lighting system using PyDEVS. The system consists of a motion sensor, a router (controller), and a light component.
System Architecture¶
The system consists of three main components:
- Motion Sensor: Generates motion detection events
- Router: Routes messages based on the motion state
- Light: Controls the light based on received commands
These components are connected in a directed graph to form a complete motion-based lighting system.
Motion Sensor Component (motionsensor.py)¶
This component simulates a motion sensor that emits a random motion reading (0 or 1) on a recurring timer.
Code Breakdown¶
from pypdevs.DEVS import AtomicDEVS
from pypdevs.infinity import INFINITY
import random
import time


class MotionSensorState:
    def __init__(self):
        self.MotionData = 0
        self.MotionTimer_next = 0.0
        self.data_to_send = None
        self.output_port = None
        self.received_value = None


class MotionSensor(AtomicDEVS):
    def __init__(self, name="MotionSensor", **kwargs):
        AtomicDEVS.__init__(self, name)
        self.state = MotionSensorState()
        self.timeLast = 0.0
        # Initialize ports
        self.out_0 = self.addOutPort("out_0")
        # Initialize parameters
        self.state.MotionData = kwargs.get('MotionData', 0)
        # Initialize timers
        self.state.MotionTimer_next = 1.0

    def timeAdvance(self):
        """Return time until next internal transition"""
        # Check if there's data to send immediately
        if self.state.data_to_send is not None:
            return 0.0
        next_time = INFINITY
        if hasattr(self.state, 'MotionTimer_next'):
            next_time = min(next_time, self.state.MotionTimer_next - self.timeLast)
        return next_time

    def intTransition(self):
        """Handle internal transition"""
        if hasattr(self.state, 'MotionTimer_next') and self.state.MotionTimer_next <= self.timeLast:
            self.state.MotionTimer_next = self.timeLast + 1.0
        self.state.data_to_send = None
        return self.state

    def extTransition(self, inputs):
        """Handle external transition"""
        print("extTransition called")
        return self.state

    def outputFnc(self):
        """Generate output"""
        result = {}
        # Generate sensor data: "<name>, <unix timestamp>, <random 0 or 1>"
        sensor_data = {
            "m2m:cin": {
                "lbl": [
                    f"{self.name}"
                ],
                "con": f"{self.name}, {int(time.time())}, {random.randint(0, 1)}"
            }
        }
        # Send to all output ports
        for port_name in dir(self):
            if port_name.startswith('out_'):
                port = getattr(self, port_name)
                result[port] = sensor_data
        return result

    def __lt__(self, other):
        """Comparison method required for sorting during simulation"""
        return self.name < other.name
Explanation¶
- MotionSensorState: Maintains the state of the motion sensor
  - MotionData: Current motion value (0 or 1)
  - MotionTimer_next: Tracks when the next motion check should occur
- MotionSensor Class: Implements the behavior of a motion sensor
  - timeAdvance(): Determines when the next internal transition should occur
  - intTransition(): Updates the state after an internal transition
  - outputFnc(): Generates random motion detection events (0 or 1)
- The sensor generates data in the oneM2M format, a standard for IoT communication
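The con string built in outputFnc() packs three comma-separated fields: the component name, a Unix timestamp, and the motion value. As a minimal sketch of that layout, using only the standard library, the helpers below build such a message and read it back. The names build_cin and parse_cin are illustrative assumptions and are not part of PyDEVS or the example files.

```python
import time


def build_cin(name, value, timestamp=None):
    """Build a oneM2M-style content instance shaped like the one in outputFnc()."""
    if timestamp is None:
        timestamp = int(time.time())
    return {
        "m2m:cin": {
            "lbl": [name],
            "con": f"{name}, {timestamp}, {value}",
        }
    }


def parse_cin(message):
    """Split the 'con' field back into (name, timestamp, value)."""
    parts = [part.strip() for part in message["m2m:cin"]["con"].split(",")]
    name, timestamp, value = parts
    return name, int(timestamp), int(value)


msg = build_cin("c1", 1, timestamp=1700000000)
print(msg["m2m:cin"]["con"])  # c1, 1700000000, 1
print(parse_cin(msg))         # ('c1', 1700000000, 1)
```

Because the fields are separated by commas, a component name that itself contains a comma would not survive this round trip.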
Router Component (router.py)¶
This component routes messages based on motion sensor data.
Code Breakdown¶
from pypdevs.DEVS import AtomicDEVS
from pypdevs.infinity import INFINITY


class RouterState:
    def __init__(self):
        self.Off = False
        self.On = True
        self.data_to_send = None
        self.output_port = None
        self.motion = None  # Condition variable from model


class Router(AtomicDEVS):
    def __init__(self, name="Router", **kwargs):
        AtomicDEVS.__init__(self, name)
        self.state = RouterState()
        self.timeLast = 0.0
        # Initialize ports
        self.in_0 = self.addInPort("in_0")
        self.out_0 = self.addOutPort("out_0")
        self.out_1 = self.addOutPort("out_1")
        # Initialize parameters
        self.state.Off = kwargs.get('Off', False)
        self.state.On = kwargs.get('On', True)

    def timeAdvance(self):
        """Return time until next internal transition"""
        return INFINITY if self.state.data_to_send is None else 0.0

    def intTransition(self):
        """Handle internal transition"""
        self.state.data_to_send = None
        self.state.output_port = None
        return self.state

    def extTransition(self, inputs):
        """Handle external transition"""
        received_data = None
        for port_name, port_value in inputs.items():
            received_data = port_value
        # Process data for controller
        if received_data is not None:
            # Extract data from received message
            if isinstance(received_data, dict) and 'm2m:cin' in received_data:
                content = received_data['m2m:cin'].get('con', '')
                parts = content.split(',')
                if len(parts) > 2:
                    try:
                        # Extract value from the message
                        value = float(parts[2].strip())
                        self.state.motion = value  # Store in condition variable
                        # Apply model-defined conditions.
                        # Assumed intent: 0 (no motion) goes to out_0 and
                        # 1 (motion) goes to out_1, so each value reaches a
                        # different light input.
                        if value == 0:
                            self.state.motion = False
                            self.state.data_to_send = received_data
                            self.state.output_port = "out_0"
                        elif value == 1:
                            self.state.motion = True
                            self.state.data_to_send = received_data
                            self.state.output_port = "out_1"
                        else:
                            # No condition matched
                            self.state.data_to_send = None
                            print(f"[{self.name}] Value {value} is in normal range: No action")
                    except (ValueError, IndexError):
                        print("Error parsing value")
        return self.state

    def outputFnc(self):
        """Generate output"""
        result = {}
        if hasattr(self.state, 'data_to_send') and self.state.data_to_send is not None and self.state.output_port:
            port = getattr(self, self.state.output_port)
            result[port] = self.state.data_to_send
        return result

    def __lt__(self, other):
        """Comparison method required for sorting during simulation"""
        return self.name < other.name
Explanation¶
- RouterState: Maintains the state of the router
  - data_to_send: Data to be forwarded
  - output_port: Selected output port based on logic
  - motion: Current motion state
- Router Class: Implements message routing logic
  - extTransition(): Processes incoming messages and determines routing
- Parses oneM2M formatted messages to extract sensor values
- Routes messages to different outputs based on motion values (0 or 1)
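The routing decision can be isolated from the DEVS plumbing and tested on its own. The sketch below is a standalone helper, not code from router.py; select_output_port is a hypothetical name, and the mapping of value 0 to out_0 and value 1 to out_1 is an assumption about the intended behavior.

```python
def select_output_port(message):
    """Return the router output port name for a message, or None if unroutable."""
    if not isinstance(message, dict) or "m2m:cin" not in message:
        return None
    parts = message["m2m:cin"].get("con", "").split(",")
    if len(parts) <= 2:
        return None  # the motion value is expected in the third field
    try:
        value = float(parts[2].strip())
    except ValueError:
        return None
    if value == 0:
        return "out_0"  # assumed mapping: no motion
    if value == 1:
        return "out_1"  # assumed mapping: motion detected
    return None


print(select_output_port({"m2m:cin": {"con": "c1, 1700000000, 1"}}))  # out_1
print(select_output_port({"m2m:cin": {"con": "c1, 1700000000, 0"}}))  # out_0
print(select_output_port({"m2m:cin": {"con": "c1, 1700000000"}}))     # None
```

Keeping this logic in a plain function makes it possible to check malformed messages without starting a simulator.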
Model Component (model.py)¶
This component defines the overall system model by connecting the components.
Code Breakdown¶
from pypdevs.DEVS import CoupledDEVS
from motionsensor import MotionSensor
from router import Router
from light import Light


class GeneratedModel(CoupledDEVS):
    def __init__(self):
        CoupledDEVS.__init__(self, "GeneratedModel")
        print("Model Loading...")
        # Initialize components
        self.c1 = self.addSubModel(MotionSensor("c1"))
        print("Initialized MotionSensor as c1")
        self.c2 = self.addSubModel(Router("c2"))
        print("Initialized Router as c2")
        self.c3 = self.addSubModel(Light("c3"))
        print("Initialized Light as c3")
        # Connect components
        self.connectPorts(self.c1.out_0, self.c2.in_0)
        print("Connected c1.out_0 to c2.in_0")
        self.connectPorts(self.c2.out_0, self.c3.in_1)
        print("Connected c2.out_0 to c3.in_1")
        self.connectPorts(self.c2.out_1, self.c3.in_0)
        print("Connected c2.out_1 to c3.in_0")
        print("Model initialization complete")
Explanation¶
- GeneratedModel Class: Creates a coupled model by connecting components
  - Instantiates MotionSensor, Router, and Light components
  - Connects component ports to define the system structure:
    - Motion sensor output (c1.out_0) connects to router input (c2.in_0)
    - Router outputs connect to different light inputs for on/off control (c2.out_0 to c3.in_1, c2.out_1 to c3.in_0)
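The wiring is easier to audit when written out as data. The sketch below restates the three connectPorts() calls as a plain dictionary; it does not use PyDEVS, and CONNECTIONS, destination, and downstream are hypothetical names introduced only for illustration.

```python
# (source component, source port) -> (destination component, destination port)
CONNECTIONS = {
    ("c1", "out_0"): ("c2", "in_0"),
    ("c2", "out_0"): ("c3", "in_1"),
    ("c2", "out_1"): ("c3", "in_0"),
}


def destination(component, port):
    """Return the (component, port) pair an output port feeds, or None."""
    return CONNECTIONS.get((component, port))


def downstream(component):
    """Return the sorted names of components that receive from `component`."""
    return sorted({dst for (src, _), (dst, _) in CONNECTIONS.items() if src == component})


print(destination("c2", "out_0"))  # ('c3', 'in_1')
print(destination("c2", "out_1"))  # ('c3', 'in_0')
print(downstream("c1"))            # ['c2']
```

Note that the router and light port indices are crossed: out_0 feeds in_1 and out_1 feeds in_0.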
Simulation Runner (simulate.py)¶
This script runs the simulation for the defined model.
Code Breakdown¶
import os
import sys

# Add the current directory to the Python path
sys.path.append(os.path.dirname(os.path.abspath(__file__)))

from pypdevs.simulator import Simulator
from model import GeneratedModel

# Create the model
model = GeneratedModel()
# Create the simulator
sim = Simulator(model)
# Configure the simulation
sim.setTerminationTime(1000.0)  # Run for 1000 time units
sim.setClassicDEVS()  # Use classic DEVS formalism
# setVerbose() expects either None or a string filename, not a boolean.
# Passing None sends the verbose trace to stdout, which is redirected below.
sim.setVerbose(None)

# Redirect stdout to capture log
log_file = 'simulation.log'
original_stdout = sys.stdout
try:
    with open(log_file, 'w') as f:
        sys.stdout = f
        # Run the simulation
        sim.simulate()
finally:
    # Restore stdout
    sys.stdout = original_stdout
print(f"Simulation complete. Results saved to {log_file}")
Explanation¶
- Creates an instance of the GeneratedModel
- Configures the simulation parameters:
  - Sets a termination time of 1000 time units
  - Uses classic DEVS formalism
- Redirects output to a log file
- Runs the simulation and captures the results
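The manual swap of sys.stdout can also be expressed with contextlib.redirect_stdout from the standard library, which restores the original stream even if the wrapped call raises. The sketch below is an alternative, not what simulate.py does; run_with_log and fake_simulation are hypothetical names, and fake_simulation stands in for sim.simulate() so the example runs without PyDEVS.

```python
import contextlib
import os
import tempfile


def run_with_log(run, log_path):
    """Call run() with everything it prints redirected to log_path."""
    with open(log_path, "w") as log, contextlib.redirect_stdout(log):
        run()


def fake_simulation():
    # Stand-in for sim.simulate(); prints one made-up trace line.
    print("t=1.0 c1 fired")


log_path = os.path.join(tempfile.mkdtemp(), "simulation.log")
run_with_log(fake_simulation, log_path)

with open(log_path) as f:
    print(f.read().strip())  # t=1.0 c1 fired
```

This only captures output written through Python's sys.stdout; output written directly to the underlying file descriptor would bypass it.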
Experiment Runner (experiment.py)¶
This script provides a convenient way to run the simulation and examine the results.
Code Breakdown¶
import subprocess
import sys

print("Running simulation...")
# Run the simulation
try:
    # First try to run with python command
    result = subprocess.run(['python', 'simulate.py'], capture_output=True, text=True)
    if result.returncode != 0 and sys.executable:
        # If that fails, try with the current Python interpreter
        print("Trying with current Python executable...")
        result = subprocess.run([sys.executable, 'simulate.py'], capture_output=True, text=True)
except Exception as e:
    print(f"Error running simulation: {e}")
    sys.exit(1)

# Print output
print("\nSimulation output:")
print(result.stdout)
if result.stderr:
    print("\nErrors:")
    print(result.stderr)

# Open the log file
try:
    with open('simulation.log', 'r') as f:
        log_content = f.read()
    print("\nSimulation Log Preview (first 1000 chars):")
    print(log_content[:1000])
    print("\n...\n")
except FileNotFoundError:
    print("Simulation log file not found.")
print("To view full log, open 'simulation.log'")
Explanation¶
- Executes the simulation script as a subprocess
- Handles different Python interpreter paths for compatibility
- Displays simulation output and errors
- Provides a preview of the simulation log file
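A simpler variant is to launch the child process with sys.executable from the start, so that it uses the same interpreter (and therefore the same installed packages) as the calling script. The sketch below assumes nothing beyond the standard library; run_script and preview are hypothetical helper names, and the -c one-liner stands in for simulate.py so the example runs anywhere.

```python
import subprocess
import sys


def run_script(args):
    """Run the current Python interpreter with `args` and capture its output."""
    return subprocess.run([sys.executable, *args], capture_output=True, text=True)


def preview(text, limit=1000):
    """Return at most `limit` characters, marking any truncation with '...'."""
    return text if len(text) <= limit else text[:limit] + "\n..."


result = run_script(["-c", "print('simulation ok')"])
print(result.returncode)      # 0
print(result.stdout.strip())  # simulation ok
print(preview(result.stdout, limit=10))
```

In the experiment script, the argument list would be ['simulate.py'] instead of the -c one-liner.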
How to Run the Simulation¶
- Make sure all Python files (motionsensor.py, router.py, light.py, model.py, simulate.py, experiment.py) are in the same directory
- Ensure PyDEVS is installed (you can install it with pip install pypdevs)
- Run the experiment script with python experiment.py
- Review the output and simulation log file for results
Key Concepts Demonstrated¶
- Discrete Event Simulation: The system is modeled as a series of discrete events rather than continuous changes
- Component-Based Modeling: Each part of the system is modeled as a separate component
- Event Propagation: Events propagate through the system via connected ports
- State Machines: Each component implements a state machine with defined transitions
- IoT Communication Patterns: Uses oneM2M formatting to simulate IoT data exchange
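To make the event cycle concrete, the sketch below hand-rolls the loop for a single atomic model: ask for the time advance, jump the clock to that instant, collect the output, then apply the internal transition. It is a simplified illustration and not how the PyDEVS simulator is implemented; PeriodicModel and simulate are hypothetical names, and external transitions and coupled models are left out.

```python
class PeriodicModel:
    """Hypothetical atomic model that emits a counter every `period` time units."""

    def __init__(self, period=1.0):
        self.period = period
        self.count = 0

    def time_advance(self):
        return self.period

    def output(self):
        return self.count

    def int_transition(self):
        self.count += 1


def simulate(model, termination_time):
    """Run the output -> internal transition cycle up to termination_time."""
    now = 0.0
    events = []
    while now + model.time_advance() <= termination_time:
        now += model.time_advance()           # jump straight to the next event
        events.append((now, model.output()))  # output is produced first...
        model.int_transition()                # ...then the state changes
    return events


print(simulate(PeriodicModel(period=1.0), 3.0))
# [(1.0, 0), (2.0, 1), (3.0, 2)]
```

The clock moves in jumps from one event to the next, which is what distinguishes this style from a fixed-step continuous simulation.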
This example provides a foundation for modeling more complex IoT systems using PyDEVS, where you can add additional sensors, controllers, and actuators.