Event API

Event

Base event class for the agent communication system. All events in the system should inherit from this class.

Constructor

class Event:
    def __init__(
        self,
        from_agent_id: str,
        to_agent_id: str,
        **kwargs: Any
    )

| Parameter | Type | Description |
| --- | --- | --- |
| from_agent_id | str | The ID of the agent sending the event |
| to_agent_id | str | The ID of the agent receiving the event |
| **kwargs | Any | Additional event attributes (e.g., event_id, timestamp, parent_event_id) |

Methods

  • to_dict(self) -> dict: Convert event to a dictionary format with standard fields.
  • get(self, key: str, default: Any = None) -> Any: Get an attribute value by key with a default fallback.
  • __getitem__(self, key: str) -> Any: Access event attributes using dictionary-style syntax.
  • __str__(self) -> str: Return a string representation of the event.
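
The accessors listed above can be illustrated with a small stand-in class. This is only a sketch of the documented interface, not the library's implementation: `SketchEvent` is a hypothetical name, and the default values for `event_id` and `timestamp`, the contents of `to_dict()`, the `KeyError` on a missing key, and the string format are all assumptions.

```python
import time
import uuid
from typing import Any


class SketchEvent:
    """Hypothetical stand-in that mimics the documented Event interface."""

    def __init__(self, from_agent_id: str, to_agent_id: str, **kwargs: Any) -> None:
        self.from_agent_id = from_agent_id
        self.to_agent_id = to_agent_id
        # Assumption: these attributes receive defaults when not supplied.
        self.event_id = kwargs.pop("event_id", str(uuid.uuid4()))
        self.timestamp = kwargs.pop("timestamp", time.time())
        self.parent_event_id = kwargs.pop("parent_event_id", None)
        # Any remaining keyword arguments become plain attributes.
        for name, value in kwargs.items():
            setattr(self, name, value)

    def to_dict(self) -> dict:
        # Assumption: the dictionary simply mirrors the instance attributes.
        return dict(vars(self))

    def get(self, key: str, default: Any = None) -> Any:
        return getattr(self, key, default)

    def __getitem__(self, key: str) -> Any:
        # Assumption: a missing attribute surfaces as KeyError, like a dict.
        try:
            return getattr(self, key)
        except AttributeError:
            raise KeyError(key) from None

    def __str__(self) -> str:
        return f"{type(self).__name__}({self.from_agent_id} -> {self.to_agent_id})"


event = SketchEvent("agent_1", "agent_2", topic="greeting")
print(event["topic"])           # dictionary-style access
print(event.get("missing", 0))  # falls back to the supplied default
print(event)
```

The real class may expose different standard fields in `to_dict()`; check the library source before relying on any particular key.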

EndEvent

Event to signal agent termination. Inherits from Event.

Constructor

class EndEvent(Event):
    def __init__(
        self,
        from_agent_id: str,
        to_agent_id: str,
        reason: str = "normal_termination",
        **kwargs: Any
    )

| Parameter | Type | Description |
| --- | --- | --- |
| from_agent_id | str | ID of the sender |
| to_agent_id | str | ID of the receiver (use "all" for global termination) |
| reason | str | Reason for termination |
| **kwargs | Any | Additional keyword arguments |
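
As a rough illustration of the "all" convention for `to_agent_id`, the sketch below shows how a receiver might decide whether a termination event applies to it. `SketchEndEvent` and `should_terminate` are hypothetical names, the sender ID "ENV" and the reason "max_steps_reached" are made-up values, and the matching rule is an assumption about how the library treats "all".

```python
from dataclasses import dataclass


@dataclass
class SketchEndEvent:
    """Hypothetical stand-in carrying only the documented EndEvent fields."""
    from_agent_id: str
    to_agent_id: str
    reason: str = "normal_termination"


def should_terminate(agent_id: str, event: SketchEndEvent) -> bool:
    # Assumption: "all" addresses every agent; otherwise the IDs must match.
    return event.to_agent_id in ("all", agent_id)


global_stop = SketchEndEvent("ENV", "all", reason="max_steps_reached")
single_stop = SketchEndEvent("ENV", "agent_7")

print(should_terminate("agent_1", global_stop))
print(should_terminate("agent_1", single_stop))
```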

DataEvent

Event for data access across agents and environment. Inherits from Event.

Constructor

class DataEvent(Event):
    def __init__(
        self,
        from_agent_id: str,
        to_agent_id: str,
        source_type: str,
        target_type: str,
        key: str,
        default: Any = None,
        **kwargs
    )

| Parameter | Type | Description |
| --- | --- | --- |
| from_agent_id | str | ID of requesting entity |
| to_agent_id | str | ID of entity that should receive request |
| source_type | str | Type of requesting entity ("AGENT" or "ENV") |
| target_type | str | Type of target entity ("AGENT" or "ENV") |
| key | str | Data key to access |
| default | Any | Default value if key not found |
| **kwargs | Any | Additional keyword arguments |

DataResponseEvent

Event for data access response. Inherits from Event.

Constructor

class DataResponseEvent(Event):
    def __init__(
        self,
        from_agent_id: str,
        to_agent_id: str,
        request_id: str,
        key: str,
        data_value: Any = None,
        success: bool = True,
        error: Optional[str] = None,
        **kwargs
    )

| Parameter | Type | Description |
| --- | --- | --- |
| from_agent_id | str | ID of responding entity |
| to_agent_id | str | ID of requesting entity |
| request_id | str | ID of the originating request |
| key | str | Data key that was accessed |
| data_value | Any | Value of the data, if the query succeeded |
| success | bool | Whether the query was successful |
| error | Optional[str] | Error message if not successful |
| **kwargs | Any | Additional keyword arguments |
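
The request and response parameters above suggest a simple correlation pattern, sketched here with stand-in dataclasses. The names `SketchDataEvent`, `SketchDataResponse`, and `answer` are hypothetical. Two points are assumptions rather than documented behavior: that `request_id` carries the request's `event_id`, and that a missing key still counts as a success and returns the caller's `default`.

```python
import uuid
from dataclasses import dataclass, field
from typing import Any, Dict, Optional


@dataclass
class SketchDataEvent:
    """Hypothetical stand-in for the documented DataEvent fields."""
    from_agent_id: str
    to_agent_id: str
    source_type: str
    target_type: str
    key: str
    default: Any = None
    event_id: str = field(default_factory=lambda: str(uuid.uuid4()))


@dataclass
class SketchDataResponse:
    """Hypothetical stand-in for the documented DataResponseEvent fields."""
    from_agent_id: str
    to_agent_id: str
    request_id: str
    key: str
    data_value: Any = None
    success: bool = True
    error: Optional[str] = None


def answer(request: SketchDataEvent, store: Dict[str, Any]) -> SketchDataResponse:
    """Build a response for one request against a plain dict."""
    # Assumption: a missing key yields the caller's default, not an error.
    value = store.get(request.key, request.default)
    return SketchDataResponse(
        from_agent_id=request.to_agent_id,  # the responder is the original target
        to_agent_id=request.from_agent_id,  # the reply goes back to the requester
        request_id=request.event_id,        # assumed link between the two events
        key=request.key,
        data_value=value,
    )


env_store = {"temperature": 21.5}
request = SketchDataEvent("agent_1", "ENV", "AGENT", "ENV", key="temperature")
response = answer(request, env_store)
print(response.request_id == request.event_id, response.data_value)
```

Swapping the sender and receiver IDs in the response keeps routing symmetric, and the shared ID lets the requester match a reply to the request that triggered it.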

DataUpdateEvent

Event for updating data across agents and environment. Inherits from Event.

Constructor

class DataUpdateEvent(Event):
    def __init__(
        self,
        from_agent_id: str,
        to_agent_id: str,
        source_type: str,
        target_type: str,
        key: str,
        value: Any,
        **kwargs
    )

| Parameter | Type | Description |
| --- | --- | --- |
| from_agent_id | str | ID of requesting entity |
| to_agent_id | str | ID of entity that should receive update |
| source_type | str | Type of requesting entity ("AGENT" or "ENV") |
| target_type | str | Type of target entity ("AGENT" or "ENV") |
| key | str | Data key to update |
| value | Any | New value to set |
| **kwargs | Any | Additional keyword arguments |

DataUpdateResponseEvent

Event for data update response. Inherits from Event.

Constructor

class DataUpdateResponseEvent(Event):
    def __init__(
        self,
        from_agent_id: str,
        to_agent_id: str,
        request_id: str,
        key: str,
        success: bool = True,
        error: Optional[str] = None,
        **kwargs
    )

| Parameter | Type | Description |
| --- | --- | --- |
| from_agent_id | str | ID of responding entity |
| to_agent_id | str | ID of requesting entity |
| request_id | str | ID of the originating request |
| key | str | Data key that was updated |
| success | bool | Whether the update was successful |
| error | Optional[str] | Error message if not successful |
| **kwargs | Any | Additional keyword arguments |

EventBus

Event bus responsible for handling all event distribution in the system.

Constructor

class EventBus:
    def __init__(self)

Methods

  • async def dispatch_event(self, event: Event) -> None: Dispatch an event, choosing local or remote distribution based on distributed mode.
  • async def run(self): Start processing the event queue. Runs until stop() is called.
  • stop(self): Stop the event bus processing.
  • async def pause(self): Pause the event bus processing.
  • async def resume(self): Resume the event bus processing.
  • is_paused(self) -> bool: Check if the event bus is paused.
  • is_empty(self) -> bool: Check if the event queue is empty.
  • is_stopped(self) -> bool: Check if the event bus is stopped.
  • register_event(self, event_kind: str, agent: Any) -> None: Register an agent to listen for a specific event kind.
  • register_agent(self, agent_id: str, agent: Any) -> None: Register an agent in the event bus registry.
  • setup_distributed(self, node): Configure the event bus for distributed mode.
  • async def log_event_flows(self) -> None: Log all tracked event flows at the end of simulation.
  • async def export_event_flow_data(self, output_file: str = None) -> Dict[str, Any]: Export event flow data to a file or return as a dictionary.
  • async def cleanup_expired_locks(self): Remove expired locks. Intended to be called periodically when running in distributed mode.
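
To make the register, dispatch, run, and stop cycle concrete, here is a minimal local-only sketch built on `asyncio.Queue`. It is not the library's EventBus. `SketchBus`, `Ping`, `Recorder`, and the agent method `handle_event` are hypothetical, and so are two design choices: that the event kind is the event's class name, and that `stop()` lets already queued events drain first.

```python
import asyncio
from typing import Any, Dict, List


class SketchBus:
    """Hypothetical local-only bus; not the library's EventBus."""

    def __init__(self) -> None:
        self._queue: asyncio.Queue = asyncio.Queue()
        self._listeners: Dict[str, List[Any]] = {}
        self._stopped = False

    def register_event(self, event_kind: str, agent: Any) -> None:
        self._listeners.setdefault(event_kind, []).append(agent)

    async def dispatch_event(self, event: Any) -> None:
        await self._queue.put(event)

    def stop(self) -> None:
        self._stopped = True
        self._queue.put_nowait(None)  # sentinel that wakes run() and ends it

    def is_stopped(self) -> bool:
        return self._stopped

    async def run(self) -> None:
        while True:
            event = await self._queue.get()
            if event is None:  # sentinel placed by stop()
                break
            # Assumption: the event kind is the event's class name.
            for agent in self._listeners.get(type(event).__name__, []):
                await agent.handle_event(event)  # hypothetical agent method


class Ping:
    def __init__(self, text: str) -> None:
        self.text = text


class Recorder:
    def __init__(self) -> None:
        self.seen: List[str] = []

    async def handle_event(self, event: Ping) -> None:
        self.seen.append(event.text)


async def bus_demo() -> List[str]:
    bus = SketchBus()  # created inside the running loop
    recorder = Recorder()
    bus.register_event("Ping", recorder)
    await bus.dispatch_event(Ping("hello"))
    await bus.dispatch_event(Ping("world"))
    bus.stop()         # events queued earlier are still delivered
    await bus.run()
    return recorder.seen


print(asyncio.run(bus_demo()))
```

In a real simulation `run()` would normally be started as a background task before any events are dispatched; the demo calls it last only to keep the output deterministic.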

Scheduler

Scheduler for managing timed and recurring events.

Constructor

class Scheduler:
    def __init__(
        self,
        event_bus: EventBus
    )

| Parameter | Type | Description |
| --- | --- | --- |
| event_bus | EventBus | Event bus instance for dispatching scheduled events |

Methods

  • schedule_task(self, interval: float, event: Any, max_count: Optional[int] = None) -> asyncio.Task: Schedule a new task with the given interval and event.
  • async def run(self): Start running all scheduled tasks.
  • async def stop(self): Stop all tasks and clean up.
  • async def pause(self, task: asyncio.Task = None): Pause specific task or all tasks if no task is specified.
  • async def resume(self, task: asyncio.Task = None): Resume specific task or all tasks if no task is specified.
  • is_done(self) -> bool: Check if all scheduled tasks have completed.
  • get_task_info(self, task: asyncio.Task) -> Dict[str, Any]: Get information about a specific task.
  • get_remaining_executions(self, task: asyncio.Task) -> Optional[int]: Get the number of remaining executions for a task. Returns None if the task has infinite executions.
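
The interval and `max_count` semantics described above can be sketched with a plain asyncio loop. This is an illustration under assumptions, not the Scheduler's actual code: `repeat` and `scheduler_demo` are hypothetical names, the local `dispatch` coroutine stands in for `EventBus.dispatch_event`, and waiting one interval before the first dispatch is a guess.

```python
import asyncio
from typing import Any, Awaitable, Callable, List, Optional


async def repeat(
    dispatch: Callable[[Any], Awaitable[None]],
    interval: float,
    event: Any,
    max_count: Optional[int] = None,
) -> int:
    """Dispatch `event` every `interval` seconds; return how many were sent."""
    sent = 0
    # max_count=None means no limit: the loop then ends only on cancellation.
    while max_count is None or sent < max_count:
        await asyncio.sleep(interval)  # assumption: wait first, then dispatch
        await dispatch(event)
        sent += 1
    return sent


async def scheduler_demo() -> List[str]:
    received: List[str] = []

    async def dispatch(event: Any) -> None:
        # Stands in for EventBus.dispatch_event in this sketch.
        received.append(event)

    task = asyncio.create_task(repeat(dispatch, 0.01, "tick", max_count=3))
    await task  # completes once three events have been dispatched
    return received


print(asyncio.run(scheduler_demo()))
```

Returning the `asyncio.Task` to the caller, as `schedule_task` is documented to do, is what makes per-task pause, resume, and inspection possible.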

Factory Functions

get_event_bus

Get the global EventBus instance, ensuring only one instance exists throughout the application.

def get_event_bus() -> EventBus

| Returns | Type | Description |
| --- | --- | --- |
| event_bus | EventBus | The global EventBus instance |

reset_event_bus

Reset the global EventBus instance.

def reset_event_bus() -> None
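
The two factory functions follow a module-level singleton pattern, which might look roughly like the sketch below. `SketchBus`, `get_bus`, and `reset_bus` are hypothetical stand-ins, and lazily creating a fresh instance on the next call after a reset is an assumption about the real behavior.

```python
from typing import Optional


class SketchBus:
    """Placeholder for the real EventBus."""


_bus: Optional[SketchBus] = None


def get_bus() -> SketchBus:
    """Return the shared instance, creating it on first use."""
    global _bus
    if _bus is None:
        _bus = SketchBus()
    return _bus


def reset_bus() -> None:
    """Drop the shared instance so the next get_bus() builds a new one."""
    global _bus
    _bus = None


first = get_bus()
same = get_bus()
reset_bus()
fresh = get_bus()
print(first is same, fresh is first)
```

Resetting is mainly useful between test cases or simulation runs, where listeners left over from a previous run would otherwise keep receiving events.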

Documentation for YuLan-OneSim - A Next Generation Social Simulator with LLMs