PyDEVS Code Examples

This page provides examples for building discrete event simulations for IoT systems using the Python DEVS (Discrete Event System Specification) framework.

Download: PyDEVS Code Examples (ZIP)

Example Categories

Motion Sensor Simulation

These examples demonstrate how to simulate motion sensors:

Motion-Based Lighting System

This example demonstrates a motion-based lighting system using PyDEVS. The system consists of a motion sensor, a router (controller), and a light component.

System Architecture

The three components have the following roles:

  1. Motion Sensor: Generates motion detection events
  2. Router: Routes messages based on the motion state
  3. Light: Controls the light based on received commands

These components are connected in a directed graph to form a complete motion-based lighting system.

Motion Sensor Component (motionsensor.py)

This component simulates a motion sensor that emits a random motion reading (0 or 1) at a fixed interval, which the listing sets to 1.0 simulated time units through MotionTimer_next.

Code Breakdown

from pypdevs.DEVS import AtomicDEVS
from pypdevs.infinity import INFINITY
import random
import time

class MotionSensorState:
    def __init__(self):
        self.MotionData = 0
        self.MotionTimer_next = 0.0
        self.data_to_send = None
        self.output_port = None
        self.received_value = None


class MotionSensor(AtomicDEVS):
    def __init__(self, name="MotionSensor", **kwargs):
        AtomicDEVS.__init__(self, name)
        self.state = MotionSensorState()
        self.timeLast = 0.0

        # Initialize ports
        self.out_0 = self.addOutPort("out_0")

        # Initialize parameters
        self.state.MotionData = kwargs.get('MotionData', 0)

        # Initialize timers
        self.state.MotionTimer_next = 1.0

    def timeAdvance(self):
        """Return time until next internal transition"""
        # Check if there's data to send immediately
        if self.state.data_to_send is not None:
            return 0.0

        next_time = INFINITY
        if hasattr(self.state, 'MotionTimer_next'):
            next_time = min(next_time, self.state.MotionTimer_next - self.timeLast)
        return next_time

    def intTransition(self):
        """Handle internal transition"""
        if hasattr(self.state, 'MotionTimer_next') and self.state.MotionTimer_next <= self.timeLast:
            self.state.MotionTimer_next = self.timeLast + 1.0
        self.state.data_to_send = None
        return self.state

    def extTransition(self, inputs):
        """Handle external transition (not expected here: the sensor has no input ports)"""
        print(f"[{self.name}] extTransition called")
        return self.state

    def outputFnc(self):
        """Generate output"""
        result = {}
        # Generate sensor data
        sensor_data = {
            "m2m:cin": {
                "lbl": [
                    f"{self.name}"
                ],
                "con": f"{self.name}, {int(time.time())}, {random.randint(0, 1)}"
            }
        }
        # Send to all output ports
        for port_name in dir(self):
            if port_name.startswith('out_'):
                port = getattr(self, port_name)
                result[port] = sensor_data
        return result

    def __lt__(self, other):
        """Comparison method required for sorting during simulation"""
        return self.name < other.name

Explanation

  • MotionSensorState: Maintains the state of the motion sensor
    • MotionData: Motion value parameter (0 or 1), set from the MotionData keyword argument
    • MotionTimer_next: Tracks when the next motion check should occur
  • MotionSensor Class: Implements the behavior of a motion sensor
    • timeAdvance(): Determines when the next internal transition should occur
    • intTransition(): Updates the state after an internal transition
    • outputFnc(): Generates random motion detection events (0 or 1)
    • The sensor generates data in the oneM2M format, a standard for IoT communication: an "m2m:cin" message whose "con" field holds the sensor name, a timestamp, and the motion value, separated by commas
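
The message format described above can be tried without PyDEVS. The following is a minimal sketch that mirrors the dictionary built in outputFnc(); the helper name make_content_instance and its timestamp parameter are assumptions made for illustration and are not part of the example files.

```python
import random
import time


def make_content_instance(name, value, timestamp=None):
    """Build a oneM2M-style message shaped like the one in outputFnc().

    Hypothetical helper: the name and the timestamp parameter are
    illustration only and do not appear in motionsensor.py.
    """
    if timestamp is None:
        timestamp = int(time.time())
    return {
        "m2m:cin": {
            "lbl": [name],
            # "con" packs sensor name, timestamp, and motion value
            "con": f"{name}, {timestamp}, {value}",
        }
    }


# A random reading, as the sensor produces on every output
message = make_content_instance("c1", random.randint(0, 1))
print(message["m2m:cin"]["con"])
```

Passing a fixed timestamp makes the output reproducible, which can help when comparing simulation logs between runs.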

Router Component (router.py)

This component routes messages based on motion sensor data.

Code Breakdown

from pypdevs.DEVS import AtomicDEVS
from pypdevs.infinity import INFINITY

class RouterState:
    def __init__(self):
        self.Off = False
        self.On = True
        self.data_to_send = None
        self.output_port = None
        self.motion = None  # Condition variable from model

class Router(AtomicDEVS):
    def __init__(self, name="Router", **kwargs):
        AtomicDEVS.__init__(self, name)
        self.state = RouterState()
        self.timeLast = 0.0

        # Initialize ports
        self.in_0 = self.addInPort("in_0")
        self.out_0 = self.addOutPort("out_0")
        self.out_1 = self.addOutPort("out_1")

        # Initialize parameters
        self.state.Off = kwargs.get('Off', False)
        self.state.On = kwargs.get('On', True)

    def timeAdvance(self):
        """Return time until next internal transition"""
        return INFINITY if self.state.data_to_send is None else 0.0

    def intTransition(self):
        """Handle internal transition"""
        self.state.data_to_send = None
        self.state.output_port = None
        return self.state

    def extTransition(self, inputs):
        """Handle external transition"""
        received_data = None
        for port_name, port_value in inputs.items():
            received_data = port_value

        # Process data for controller
        if received_data is not None:
            # Extract data from received message
            if isinstance(received_data, dict) and 'm2m:cin' in received_data:
                content = received_data['m2m:cin'].get('con', '')
                parts = content.split(',')
                if len(parts) > 2:
                    try:
                        # Extract value from the message
                        value = float(parts[2].strip())

                        # Apply model-defined conditions
                        if value == 0:
                            # No motion: forward on out_0
                            self.state.motion = False
                            self.state.data_to_send = received_data
                            self.state.output_port = "out_0"
                        elif value == 1:
                            # Motion detected: forward on out_1
                            self.state.motion = True
                            self.state.data_to_send = received_data
                            self.state.output_port = "out_1"
                        else:
                            # No condition matched
                            self.state.data_to_send = None
                            self.state.output_port = None
                            print(f"[{self.name}] Value {value} matches no routing condition: No action")
                    except (ValueError, IndexError):
                        print("Error parsing value")
        return self.state

    def outputFnc(self):
        """Generate output"""
        result = {}
        if hasattr(self.state, 'data_to_send') and self.state.data_to_send is not None and self.state.output_port:
            port = getattr(self, self.state.output_port)
            result[port] = self.state.data_to_send
        return result

    def __lt__(self, other):
        """Comparison method required for sorting during simulation"""
        return self.name < other.name

Explanation

  • RouterState: Maintains the state of the router
    • data_to_send: Data to be forwarded
    • output_port: Selected output port based on logic
    • motion: Current motion state
  • Router Class: Implements message routing logic
    • extTransition(): Processes incoming messages and determines routing
    • Parses oneM2M formatted messages to extract sensor values
    • Selects the output port from the motion value (0 or 1); other values are not forwarded
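
The parsing and routing steps above can be separated from the DEVS class and tested on their own. This is a sketch only: the function names are hypothetical, and the mapping of 0 to "out_0" and 1 to "out_1" is an assumption about the intended routing.

```python
def parse_motion_value(message):
    """Return the third comma-separated field of "con" as a float, or None."""
    if not isinstance(message, dict) or "m2m:cin" not in message:
        return None
    parts = message["m2m:cin"].get("con", "").split(",")
    if len(parts) <= 2:
        return None
    try:
        return float(parts[2].strip())
    except ValueError:
        return None


def select_output_port(value):
    """Assumed mapping: 0 -> "out_0", 1 -> "out_1", anything else -> None."""
    if value == 0:
        return "out_0"
    if value == 1:
        return "out_1"
    return None


sample = {"m2m:cin": {"lbl": ["c1"], "con": "c1, 1700000000, 1"}}
print(select_output_port(parse_motion_value(sample)))
```

Keeping these steps as plain functions means malformed messages can be checked without starting a simulation.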

Model Component (model.py)

This component defines the overall system model by connecting the components.

Code Breakdown

from pypdevs.DEVS import CoupledDEVS
from motionsensor import MotionSensor
from router import Router
from light import Light


class GeneratedModel(CoupledDEVS):
    def __init__(self):
        CoupledDEVS.__init__(self, "GeneratedModel")
        print("Model Loading...")

        # Initialize components
        self.c1 = self.addSubModel(MotionSensor("c1"))
        print("Initialized MotionSensor as c1")
        self.c2 = self.addSubModel(Router("c2"))
        print("Initialized Router as c2")
        self.c3 = self.addSubModel(Light("c3"))
        print("Initialized Light as c3")

        # Connect components
        self.connectPorts(self.c1.out_0, self.c2.in_0)
        print("Connected c1.out_0 to c2.in_0")
        self.connectPorts(self.c2.out_0, self.c3.in_1)
        print("Connected c2.out_0 to c3.in_1")
        self.connectPorts(self.c2.out_1, self.c3.in_0)
        print("Connected c2.out_1 to c3.in_0")

        print("Model initialization complete")

Explanation

  • GeneratedModel Class: Creates a coupled model by connecting components
    • Instantiates MotionSensor, Router, and Light components
    • Connects component ports to define the system structure:
      • Motion sensor output out_0 connects to router input in_0
      • Router outputs connect to different light inputs for on/off control: out_0 to in_1 and out_1 to in_0
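
The model imports a Light component from light.py, which is not listed on this page. As a rough, dependency-free sketch of the on/off logic such a component might contain, the following assumes that a message on in_0 switches the light on and a message on in_1 switches it off. The names LightState and apply_command, and the port semantics, are all assumptions rather than the actual contents of light.py.

```python
class LightState:
    """Hypothetical state for a Light component."""

    def __init__(self):
        self.is_on = False


def apply_command(state, port_name):
    """Update the light from the name of the input port that received a message.

    Assumption: "in_0" switches the light on and "in_1" switches it off.
    Unknown port names leave the state unchanged.
    """
    if port_name == "in_0":
        state.is_on = True
    elif port_name == "in_1":
        state.is_on = False
    return state


light = LightState()
apply_command(light, "in_0")
print(light.is_on)
apply_command(light, "in_1")
print(light.is_on)
```

In an AtomicDEVS subclass, this logic would sit in extTransition(), which receives the inputs keyed by port.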

Simulation Runner (simulate.py)

This component runs the simulation for the defined model.

Code Breakdown

import sys
import os
import random
import time

# Add the current directory to the Python path
sys.path.append(os.path.dirname(os.path.abspath(__file__)))

from pypdevs.simulator import Simulator
from model import GeneratedModel

# Create the model
model = GeneratedModel()

# Create the simulator
sim = Simulator(model)

# Configure the simulation
sim.setTerminationTime(1000.0)  # Run for 1000 time units
sim.setClassicDEVS()  # Use classic DEVS formalism

# setVerbose expects either None or a string filename, not a boolean.
# None sends the verbose trace to stdout, which is redirected to the log file below.
sim.setVerbose(None)

# Redirect stdout to capture log
log_file = 'simulation.log'
original_stdout = sys.stdout
try:
    with open(log_file, 'w') as f:
        sys.stdout = f
        # Run the simulation
        sim.simulate()
finally:
    # Restore stdout
    sys.stdout = original_stdout

print(f"Simulation complete. Results saved to {log_file}")

Explanation

  • Creates an instance of the GeneratedModel
  • Configures the simulation parameters:
    • Sets a termination time of 1000 time units
    • Uses classic DEVS formalism
  • Redirects output to a log file
  • Runs the simulation and captures the results
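
The log redirection step can also be written with contextlib.redirect_stdout from the standard library, which restores sys.stdout automatically even if the wrapped call raises. The sketch below uses a stand-in function in place of sim.simulate() so that it runs without PyDEVS; run_with_log and fake_simulation are hypothetical names.

```python
import contextlib
import os
import tempfile


def run_with_log(run, log_path):
    """Call run() with everything it prints redirected to log_path."""
    with open(log_path, "w") as log_file, contextlib.redirect_stdout(log_file):
        run()


def fake_simulation():
    # Stand-in for sim.simulate(); prints one line as a trace would
    print("t=1.0 c1 fired")


log_path = os.path.join(tempfile.mkdtemp(), "simulation.log")
run_with_log(fake_simulation, log_path)

with open(log_path) as f:
    print(f.read(), end="")
```

Like the manual approach in simulate.py, this only captures output written through Python's sys.stdout.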

Experiment Runner (experiment.py)

This script provides a convenient way to run the simulation and examine the results.

Code Breakdown

import subprocess
import sys

print("Running simulation...")

# Run the simulation, trying the current interpreter first and then the python command
result = None
for interpreter in (sys.executable, 'python'):
    if not interpreter:
        continue
    try:
        result = subprocess.run([interpreter, 'simulate.py'], capture_output=True, text=True)
    except OSError as e:
        # The interpreter could not be started (for example, it is not on PATH)
        print(f"Error running simulation with {interpreter}: {e}")
        continue
    if result.returncode == 0:
        break

if result is None:
    sys.exit(1)

# Print output
print("\nSimulation output:")
print(result.stdout)

if result.stderr:
    print("\nErrors:")
    print(result.stderr)

# Open the log file
try:
    with open('simulation.log', 'r') as f:
        log_content = f.read()
        print("\nSimulation Log Preview (first 1000 chars):")
        print(log_content[:1000])
        print("\n...\n")
except FileNotFoundError:
    print("Simulation log file not found.")

print("To view full log, open 'simulation.log'")

Explanation

  • Executes the simulation script as a subprocess
  • Handles different Python interpreter paths for compatibility
  • Displays simulation output and errors
  • Provides a preview of the simulation log file
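
The log preview step can be factored into a small helper. This is a sketch with a hypothetical function name, preview_log. It reads only the first characters of the file instead of loading the whole log, which matters for long simulation runs.

```python
import os
import tempfile


def preview_log(path, limit=1000):
    """Return the first `limit` characters of a log file, or None if it is missing."""
    try:
        with open(path, "r") as f:
            return f.read(limit)
    except FileNotFoundError:
        return None


# Demonstration with a temporary file standing in for simulation.log
demo_path = os.path.join(tempfile.mkdtemp(), "simulation.log")
with open(demo_path, "w") as f:
    f.write("x" * 5000)

print(len(preview_log(demo_path)))
print(preview_log(os.path.join(tempfile.mkdtemp(), "missing.log")))
```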

How to Run the Simulation

  1. Make sure all Python files (motionsensor.py, router.py, light.py, model.py, simulate.py, experiment.py) are in the same directory
  2. Ensure PyDEVS is installed (you can install it with pip install pypdevs)
  3. Run the experiment script with python experiment.py
  4. Review the output and simulation log file for results

Key Concepts Demonstrated

  1. Discrete Event Simulation: The system is modeled as a series of discrete events rather than continuous changes
  2. Component-Based Modeling: Each part of the system is modeled as a separate component
  3. Event Propagation: Events propagate through the system via connected ports
  4. State Machines: Each component implements a state machine with defined transitions
  5. IoT Communication Patterns: Uses oneM2M formatting to simulate IoT data exchange
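
To illustrate the first and third concepts, here is a generic event-queue loop written with the standard library only. It is not how PyDEVS schedules models internally. It is a simplified sketch in which a hypothetical "sensor" event reschedules itself every time unit and triggers a "router" event at the same simulated time.

```python
import heapq


def run_events(initial_events, end_time):
    """Process (time, name) events in time order until end_time."""
    queue = list(initial_events)
    heapq.heapify(queue)
    processed = []
    while queue and queue[0][0] <= end_time:
        now, name = heapq.heappop(queue)
        processed.append((now, name))
        if name == "sensor":
            # The sensor fires again one time unit later and
            # propagates an event to the router at the current time
            heapq.heappush(queue, (now + 1.0, "sensor"))
            heapq.heappush(queue, (now, "router"))
    return processed


trace = run_events([(1.0, "sensor")], end_time=3.0)
for event_time, name in trace:
    print(event_time, name)
```

Simulated time jumps directly from one event to the next, so nothing is computed for the intervals in between.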

This example provides a foundation for modeling more complex IoT systems using PyDEVS, where you can add additional sensors, controllers, and actuators.