Hermes Multi-Agent System: MUD Pattern Design
Date: 2026-04-27
Status: Approved
Approach: C (native reimplementation of MUD abstractions for Hermes's Redis + ARQ architecture)
Background
Hermes is a CLI-based multi-agent orchestration system with an existing working prototype:
- Redis for messaging (ARQ queues + pub-sub) and state storage (Hash, ZSet, String)
- ARQ workers for concurrent task execution (verified: 2 workers consuming 4 tasks concurrently)
- Hermes dispatcher (lib/dispatcher.py) routing tasks from the config/agents.yaml owner→queue mapping
- CLI (hermes task list/show, hermes events) reading from Redis
This design extracts conceptual patterns from Evennia (a Python MUD framework) and reimplements them natively for Hermes, without introducing Django/Twisted dependencies.
Architecture Overview
┌─────────────────────────────────────────────┐
│ CLI Layer (click/REPL) │ MUD-style command interaction
├─────────────────────────────────────────────┤
│ Command Router │ Match, disambiguate, dispatch
├─────────────────────────────────────────────┤
│ Hermes Core │
│ ┌─────────┬──────────┬──────────┬────────┐ │
│ │Capability│ Channel │ ACL │Template│ │ Four core subsystems
│ │ Set │ System │ System │ Engine │ │
│ └────┬────┴────┬─────┴────┬─────┴───┬────┘ │
│ │ │ │ │ │
│ ┌────┴─────────┴──────────┴─────────┴────┐ │
│ │ Agent Entity Model │ │ Unified entity abstraction
│ └────────────────┬───────────────────────┘ │
├───────────────────┼─────────────────────────┤
│ Redis Backend │ ARQ Worker Layer │ Existing infrastructure
│ (state/pubsub) │ (task dispatch) │
└───────────────────┴─────────────────────────┘
Section 1: Agent Entity Model
Inspired by Evennia's Typeclass (data/behavior separation). Data lives in Redis, behavior in Python classes, linked by agent_type field.
Redis Storage
Key: hermes:agent:{agent_id}
Type: Hash
Fields:
agent_id — unique identifier
agent_type — Python class path (e.g. "hermes.agents.CoderAgent")
name — display name
owner — owning owner
queue — bound ARQ queue
status — created | active | paused | stopped
template — template name used at creation
created_at — creation timestamp
last_active — last activity timestamp
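Redis hash values are flat strings, so the fields above have to be serialized on write and converted back on read. The sketch below shows one way to do that round trip without touching Redis. AgentRecord, to_hash and from_hash are hypothetical names used only for illustration, and the sketch assumes the client returns str values (with redis-py that means decode_responses=True; otherwise keys and values arrive as bytes).

```python
from dataclasses import asdict, dataclass, fields


@dataclass
class AgentRecord:
    """Hypothetical typed view of the hermes:agent:{agent_id} hash."""
    agent_id: str
    agent_type: str
    name: str
    owner: str
    queue: str
    status: str = "created"
    template: str = ""
    created_at: float = 0.0
    last_active: float = 0.0

    def key(self) -> str:
        return f"hermes:agent:{self.agent_id}"

    def to_hash(self) -> dict[str, str]:
        # Every field is stringified because hash values are strings.
        return {k: str(v) for k, v in asdict(self).items()}

    @classmethod
    def from_hash(cls, raw: dict[str, str]) -> "AgentRecord":
        # Only the two timestamps need converting back from str.
        converters = {"created_at": float, "last_active": float}
        kwargs = {
            f.name: converters.get(f.name, str)(raw[f.name])
            for f in fields(cls)
            if f.name in raw
        }
        return cls(**kwargs)


record = AgentRecord("coder-1", "hermes.agents.CoderAgent", "Coder",
                     "alice", "engineering", created_at=1.5)
restored = AgentRecord.from_hash(record.to_hash())
print(record.key(), restored == record)
```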
Agent Class
class Agent:
    """Unified entity model. Data in Redis Hash, behavior in Python class."""

    def __init__(self, agent_id: str):
        self.agent_id = agent_id
        self._data = None

    @property
    def data(self) -> dict:
        """Lazy load from Redis with local cache."""
        if self._data is None:
            self._data = redis.hgetall(f"hermes:agent:{self.agent_id}")
        return self._data

    def save(self):
        # hset(mapping=...) replaces the deprecated hmset; self.data loads the hash if needed
        redis.hset(f"hermes:agent:{self.agent_id}", mapping=self.data)

    # Lazy-init handlers (pay only for what you use)
    @lazy_property
    def capabilities(self) -> "CapabilityHandler": ...

    @lazy_property
    def locks(self) -> "LockHandler": ...

    @lazy_property
    def channels(self) -> "ChannelSubscriptionHandler": ...

    # Message delivery (environment-dependent)
    def deliver(self, message: "Message"):
        """
        Receive a message. Behavior depends on runtime context:
        - CLI environment: print to terminal
        - Worker environment: enqueue as ARQ task for the agent's worker
        """
        ...

    # Lifecycle hooks (override in subclasses)
    def at_creation(self): ...
    def at_activate(self): ...
    def at_pause(self): ...
    def at_resume(self): ...
    def at_stop(self): ...

AgentRegistry and ChannelRegistry
Global lookup caches ensuring identity consistency and fast access:
class AgentRegistry:
    """In-process cache: {agent_id: Agent}. Auto-populated on first access."""
    _cache: dict[str, Agent] = {}

    @classmethod
    def get(cls, agent_id: str) -> Agent:
        if agent_id not in cls._cache:
            cls._cache[agent_id] = Agent(agent_id)
        return cls._cache[agent_id]

    @classmethod
    def find(cls, name_or_id: str) -> Agent | None:
        """Find by agent_id, name prefix, or alias. Returns None if not found."""
        ...

class ChannelRegistry:
    """Channel lookup by name. Channels are loaded from Redis on first access."""
    _cache: dict[str, Channel] = {}

    @classmethod
    def get(cls, name: str) -> Channel:
        if name not in cls._cache:
            cls._cache[name] = Channel(name)
        return cls._cache[name]

Agent Identity Cache
The AgentRegistry._cache dict serves as the identity cache (inspired by Evennia's IDMapper), ensuring one Python instance per agent within a Hermes process.
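The practical effect of the identity cache is that two lookups of the same agent_id return the same Python object, so in-process state attached to one reference is visible through the other. A minimal sketch, using a stand-in Agent with no Redis access; the scratch attribute and the evict helper are hypothetical and exist only to make the behavior observable.

```python
class Agent:
    """Minimal stand-in for the Agent class (no Redis access)."""

    def __init__(self, agent_id: str):
        self.agent_id = agent_id
        self.scratch: dict = {}  # per-instance, in-process state


class AgentRegistry:
    _cache: dict[str, Agent] = {}

    @classmethod
    def get(cls, agent_id: str) -> Agent:
        # Construct once, then always hand back the same object.
        if agent_id not in cls._cache:
            cls._cache[agent_id] = Agent(agent_id)
        return cls._cache[agent_id]

    @classmethod
    def evict(cls, agent_id: str) -> None:
        """Hypothetical helper: drop a cached instance, e.g. after deletion."""
        cls._cache.pop(agent_id, None)


a = AgentRegistry.get("coder-3")
b = AgentRegistry.get("coder-3")
a.scratch["note"] = "seen by both"
print(a is b, b.scratch)  # True {'note': 'seen by both'}
```

The guarantee holds only within one process; separate workers each keep their own cache and share state through Redis.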
Key Design Decisions
- One Redis Hash per agent, compatible with existing hermes task list/show Redis reads
- agent_type replaces Evennia's db_typeclass_path: same concept, agent-oriented naming
- Handler lazy initialization: not every agent needs every handler
- No new database introduced
Section 2: Capability Set (Dynamic Capability Composition)
Inspired by Evennia CmdSet's set-algebra merging. Capability items map directly to Hermes tools/operations/queues.
Data Structures
class Capability:
    """Single capability item."""
    name: str       # "code", "deploy", "review", "debug"
    metadata: dict  # optional: {"timeout": 300, "max_tokens": 8000}

    def match(self, input_str: str) -> bool:
        """Prefix matching in either direction (supports abbreviations)."""
        # Equivalent to the original slice-based test: one string is a prefix of the other
        return self.name.startswith(input_str) or input_str.startswith(self.name)

class CapabilitySet:
    """Capability collection supporting four merge operations."""
    priority: int   # higher = higher priority
    mergetype: str  # "union" | "intersect" | "replace" | "remove"
    source: str     # origin: "role", "task", "channel", "override", "template"
    _caps: dict[str, Capability]

    def __add__(self, other: "CapabilitySet") -> "CapabilitySet":
        """Merge two CapabilitySets. Higher priority's mergetype wins."""
        ...

Four Merge Operations
| Operation | Behavior | Use Case |
|---|---|---|
| Union | Combine both; same-name resolved by priority | Role + task capabilities stacking |
| Intersect | Keep only capabilities in both sets | Security sandbox constraining |
| Replace | Higher-priority set completely overrides | Locked mode with restricted capability set |
| Remove | Remove specified capabilities without adding | Temporary revocation of debug access |
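The four operations in the table reduce to ordinary set algebra once the higher-priority operand is identified. The following is a sketch under simplifying assumptions, not the Hermes implementation: CapSet is a hypothetical stand-in that tracks capability names only (no metadata), and ties in priority are resolved in favor of the right-hand operand.

```python
from dataclasses import dataclass


@dataclass
class CapSet:
    """Simplified stand-in for CapabilitySet: names only, no metadata."""
    names: frozenset[str] = frozenset()
    priority: int = 0
    mergetype: str = "union"

    def __add__(self, other: "CapSet") -> "CapSet":
        # The higher-priority operand decides the merge type.
        # Assumption of this sketch: ties go to the right-hand operand.
        low, high = (self, other) if other.priority >= self.priority else (other, self)
        if high.mergetype == "union":
            merged = low.names | high.names
        elif high.mergetype == "intersect":
            merged = low.names & high.names
        elif high.mergetype == "replace":
            merged = high.names
        elif high.mergetype == "remove":
            merged = low.names - high.names
        else:
            raise ValueError(f"unknown mergetype: {high.mergetype}")
        return CapSet(merged, high.priority, high.mergetype)


role = CapSet(frozenset({"code", "read", "debug"}), priority=10)
task = CapSet(frozenset({"deploy"}), priority=20)
revoke = CapSet(frozenset({"debug"}), priority=30, mergetype="remove")
sandbox = CapSet(frozenset({"code", "read"}), priority=40, mergetype="intersect")

merged = CapSet()
for layer in (role, task, revoke, sandbox):  # low to high priority
    merged = merged + layer
print(sorted(merged.names))  # ['code', 'read']
```

Here the role and task layers stack by union, the remove layer takes debug away, and the sandbox layer finally restricts the result to what both sides allow.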
Multi-Source Merge Pipeline
class CapabilityHandler:
    """
    Per-agent handler managing multi-source CapabilitySet merging.
    Redis storage:
      Capability sources: SortedSet hermes:agent:{id}:cap_sources (score=priority)
      Per-source caps:    Hash hermes:agent:{id}:caps:{source}
      Merged cache:       Hash hermes:agent:{id}:caps_merged (TTL 60s)
    """

    def get_merged(self) -> CapabilitySet:
        """Merge all sources from low to high priority."""
        sources = redis.zrange(f"hermes:agent:{self.agent.agent_id}:cap_sources", 0, -1)
        result = CapabilitySet()
        for source_name in sources:
            capset = self._load_source(source_name)
            result = result + capset
        return result

    def grant(self, cap_name: str, source: str = "override", **metadata):
        """Dynamically grant capability."""
        ...

    def revoke(self, cap_name: str, source: str = "override"):
        """Revoke capability from a source."""
        ...

    def grant_source(self, source: str, cap_names: list[str]):
        """Replace entire source (used by template loading)."""
        ...

    def check(self, cap_name: str) -> bool:
        """Check if merged set contains capability."""
        merged = self.get_merged()
        return any(c.match(cap_name) for c in merged._caps.values())

Dispatcher Integration
# Existing lib/dispatcher.py routing enhanced with capability check
async def dispatch_task(task, target_agent):  # async because the enqueue call is awaited
    if not target_agent.capabilities.check(task.required_cap):
        raise CapabilityDenied(f"Agent lacks: {task.required_cap}")
    await arq_pool.enqueue(task.queue, task.payload)

Section 3: ACL System (Permission and Access Control)
Inspired by Evennia Lock's declarative string ACL + safe evaluation. Lock functions bind directly to Hermes agent attributes.
Core Structure
class LockHandler:
    """
    Per-agent access control.
    Redis storage:
      Key:   hermes:agent:{id}:locks
      Type:  Hash
      Field: access_type -> lock_string
    """
    _locks: dict[str, ParsedLock]

    def check(self, access_type: str, caller: "Agent", default: bool = False) -> bool:
        """
        Check if caller has access_type permission.
        Fail-closed: unknown access types return default (False).
        """
        if caller.data.get("is_superuser"):
            return True
        parsed = self._locks.get(access_type)
        if parsed is None:
            return default
        # Each lock function yields a bool that is interpolated into the template
        results = tuple(func(caller, self.agent, *args, **kwargs)
                        for func, args, kwargs in parsed.func_calls)
        return eval(parsed.eval_template % results)

    def set_locks(self, lockstring: str):
        """Parse and store lock string."""
        self._locks = parse_lockstring(lockstring)
        # Persist to Redis (hset with mapping= replaces the deprecated hmset)
        redis.hset(f"hermes:agent:{self.agent.agent_id}:locks",
                   mapping={k: v.raw for k, v in self._locks.items()})

Lock String Format
Same syntax as Evennia: access_type:func1(args) [AND|OR|NOT] func2(args); access_type2:...
Examples:
execute:role(junior) AND NOT status(paused)
deploy:queue(engineering) OR role(admin)
message:role(member) AND attr(muted, eq, false)
control:owner() OR role(admin)
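To make the format concrete, the sketch below parses a lock string into an eval template plus a list of function calls, then evaluates it. It is an illustration under stated assumptions rather than the Hermes parser: LOCK_FUNCS, lock_func and the dict-based caller/target are hypothetical, only two lock functions are registered, and keyword arguments are not handled.

```python
import re

LOCK_FUNCS = {}  # hypothetical registry: name -> callable(caller, target, *args)


def lock_func(fn):
    LOCK_FUNCS[fn.__name__] = fn
    return fn


@lock_func
def role(caller, target, name):
    ranks = ["junior", "senior", "admin"]
    return ranks.index(caller["role"]) >= ranks.index(name)


@lock_func
def status(caller, target, name):
    return target["status"] == name


_CALL = re.compile(r"(\w+)\(([^()]*)\)")
_SAFE_TEMPLATE = re.compile(r"(?:%s|and|or|not|\s)+")


def parse_lockstring(lockstring):
    """Return {access_type: (eval_template, [(func, args), ...])}."""
    parsed = {}
    for clause in lockstring.split(";"):
        if not clause.strip():
            continue
        access_type, expr = (part.strip() for part in clause.split(":", 1))
        calls = []

        def collect(match):
            name = match.group(1)
            if name not in LOCK_FUNCS:
                raise ValueError(f"unknown lock function: {name}")
            args = tuple(a.strip() for a in match.group(2).split(",") if a.strip())
            calls.append((LOCK_FUNCS[name], args))
            return "%s"

        template = _CALL.sub(collect, expr)
        template = re.sub(r"\b(AND|OR|NOT)\b", lambda m: m.group(1).lower(), template)
        # Only placeholders, boolean operators and whitespace may remain.
        if not _SAFE_TEMPLATE.fullmatch(template):
            raise ValueError(f"unsafe lock expression: {expr!r}")
        # Dry run with all-True placeholders rejects malformed expressions early.
        eval(template % ((True,) * len(calls)), {"__builtins__": {}}, {})
        parsed[access_type] = (template, calls)
    return parsed


def check(parsed, access_type, caller, target, default=False):
    if access_type not in parsed:
        return default  # fail closed
    template, calls = parsed[access_type]
    results = tuple(bool(fn(caller, target, *args)) for fn, args in calls)
    return bool(eval(template % results, {"__builtins__": {}}, {}))


locks = parse_lockstring("execute:role(junior) AND NOT status(paused); control:role(admin)")
caller, target = {"role": "senior"}, {"status": "active"}
print(check(locks, "execute", caller, target), check(locks, "control", caller, target))
```

Because only "True"/"False" literals and the three operators ever reach eval, the expression cannot reference names or call anything.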
Parsing and Safe Evaluation
class ParsedLock:
    """Pre-compiled lock string."""
    eval_template: str  # "%s and not %s"
    func_calls: list    # [(callable, args, kwargs), ...]
    raw: str            # original string for storage

def parse_lockstring(lockstring: str) -> dict[str, ParsedLock]:
    """
    Parse lock string into {access_type: ParsedLock}.
    Safety: eval_template only allows and/or/not, pre-validated with
    empty-namespace eval. Only boolean results are ever interpolated.
    """
    ...

Hermes Lock Functions
All share signature (caller, target, *args) -> bool. Registered via decorator.
| Function | Purpose |
|---|---|
| role(name) | Role hierarchy check (admin > senior > junior) |
| owner() | Caller is target's owner |
| queue(name) | Caller bound to specified queue |
| status(name) | Target's current status |
| attr(name, op, value) | Target Redis attribute with comparison operators (eq, ne, gt, lt) |
| cap(name) | Caller has specified capability (calls CapabilityHandler) |
| true() / false() | Always pass/fail |
Relationship with Capability Set
Permission check chain:
CLI/Agent initiates operation
-> LockHandler.check("execute", caller) # "Are you allowed to act on this agent?"
-> CapabilityHandler.check("deploy") # "Does your agent have this capability?"
-> Execute operation
Lock = "who can do what on your agent". Capability = "what your agent can do". Two independent but complementary layers.
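The two layers can be read as two independent guards applied in sequence. A minimal sketch of that chain, with in-memory stand-ins: StubAgent, run_operation and the predicate-style locks are hypothetical simplifications of LockHandler and CapabilityHandler, and the capability is checked on the target agent, matching the dispatcher example.

```python
from dataclasses import dataclass, field
from typing import Callable


class AccessDenied(Exception):
    pass


class CapabilityDenied(Exception):
    pass


@dataclass
class StubAgent:
    """Stand-in for Agent: locks and capabilities reduced to plain data."""
    agent_id: str
    role: str = "junior"
    caps: set[str] = field(default_factory=set)
    # access_type -> predicate(caller); a missing entry fails closed
    locks: dict[str, Callable[["StubAgent"], bool]] = field(default_factory=dict)


def run_operation(caller: StubAgent, target: StubAgent,
                  access_type: str, required_cap: str) -> str:
    # Layer 1 (lock): may the caller act on the target at all?
    lock = target.locks.get(access_type)
    if lock is None or not lock(caller):
        raise AccessDenied(f"{caller.agent_id} may not {access_type} {target.agent_id}")
    # Layer 2 (capability): can the target do what is being asked?
    if required_cap not in target.caps:
        raise CapabilityDenied(f"{target.agent_id} lacks: {required_cap}")
    return f"{target.agent_id} runs {required_cap}"


worker = StubAgent("coder-3", caps={"code", "deploy"},
                   locks={"execute": lambda c: c.role in {"senior", "admin"}})
lead = StubAgent("lead-1", role="senior")
print(run_operation(lead, worker, "execute", "deploy"))  # coder-3 runs deploy
```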
Section 4: Channel System (Real-time Collaboration + CLI Chat)
Inspired by Evennia Channel's pub-sub + hook chain + ban/mute. Built natively on Redis pub-sub and Stream.
Core Structures
class Channel:
    """
    Channel entity.
    Redis storage:
      Metadata:    Hash   hermes:channel:{name}
      Subscribers: Set    hermes:channel:{name}:subscribers
      Ban list:    Set    hermes:channel:{name}:banlist
      Mute list:   Set    hermes:channel:{name}:mutelist
      History:     Stream hermes:channel:{name}:stream (maxlen=1000)
    """
    name: str
    locks: LockHandler
    send_to_active_only: bool = True

    # Hook chain (override in subclasses)
    def at_pre_msg(self, message: "Message") -> "Message | None":
        """Pre-broadcast. Return None to cancel."""
        return message

    def at_post_msg(self, message: "Message"):
        """Post-broadcast (default: write to Redis Stream)."""
        ...

class Message:
    sender_id: str   # agent_id or "user:{user_id}"
    channel: str     # channel name (empty = direct message)
    content: str
    msg_type: str    # "say" | "tell" | "emote" | "system" | "task"
    timestamp: float
    metadata: dict

Three-Stage Message Pipeline
broadcast(message):
1. Channel-level: at_pre_msg(message) # filter/format, can abort
2. Per-recipient: at_pre_channel_msg # per-recipient customization
-> deliver(processed) # actual delivery
-> at_post_channel_msg # per-recipient cleanup
3. Channel-level: at_post_msg(message) # logging, persistence
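The three stages above can be sketched as a single broadcast method. This is an in-memory illustration, not the Hermes implementation: Recipient stands in for Agent, the history list stands in for the Redis Stream, and the convention that a per-recipient hook returning None skips only that recipient is an assumption of the sketch.

```python
from dataclasses import dataclass, replace


@dataclass(frozen=True)
class Message:
    sender_id: str
    content: str
    channel: str = ""


class Recipient:
    """Stand-in for an Agent; deliver() appends to an in-memory inbox."""

    def __init__(self, agent_id: str, muted: bool = False):
        self.agent_id = agent_id
        self.muted = muted
        self.inbox: list[str] = []

    def at_pre_channel_msg(self, message: Message):
        # Assumption: returning None skips delivery for this recipient only.
        return None if self.muted else message

    def deliver(self, message: Message) -> None:
        self.inbox.append(message.content)

    def at_post_channel_msg(self, message: Message) -> None:
        pass


class Channel:
    def __init__(self, name: str, subscribers: list[Recipient]):
        self.name = name
        self.subscribers = subscribers
        self.history: list[Message] = []  # stand-in for the Redis Stream

    def at_pre_msg(self, message: Message):
        if not message.content.strip():
            return None  # cancel empty messages
        return replace(message, channel=self.name,
                       content=f"[{self.name}] {message.content}")

    def at_post_msg(self, message: Message) -> None:
        self.history.append(message)

    def broadcast(self, message: Message) -> bool:
        processed = self.at_pre_msg(message)          # stage 1: channel-level
        if processed is None:
            return False
        for recipient in self.subscribers:            # stage 2: per-recipient
            personal = recipient.at_pre_channel_msg(processed)
            if personal is None:
                continue
            recipient.deliver(personal)
            recipient.at_post_channel_msg(personal)
        self.at_post_msg(processed)                   # stage 3: channel-level
        return True


alice, bob = Recipient("alice"), Recipient("bob", muted=True)
room = Channel("dev", [alice, bob])
room.broadcast(Message("coder-3", "build passed"))
room.broadcast(Message("coder-3", "   "))
print(alice.inbox, bob.inbox, len(room.history))  # ['[dev] build passed'] [] 1
```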
Channel Types
| Type | Purpose | Lock Defaults |
|---|---|---|
| RoomChannel | Workspace: agents in the same task/project auto-join | join:true(); send:true(); receive:true() |
| DirectChannel | 1-on-1 DM, auto-created | join:owner() OR self(); send:true(); receive:true() |
| BroadcastChannel | System announcements | join:true(); send:role(admin); receive:true() |
Agent Subscription Handler
class ChannelSubscriptionHandler:
    """Per-agent handler managing channel subscriptions."""

    def join(self, channel_name: str):
        channel = ChannelRegistry.get(channel_name)
        channel.subscribe(self.agent.agent_id)

    def leave(self, channel_name: str):
        channel = ChannelRegistry.get(channel_name)
        channel.unsubscribe(self.agent.agent_id)

    def tell(self, target_id: str, content: str):
        """Direct message."""
        target = AgentRegistry.get(target_id)
        if not target.locks.check("message", self.agent):
            raise AccessDenied(f"Cannot message {target_id}")
        target.deliver(Message(sender_id=self.agent.agent_id,
                               content=content, msg_type="tell"))

    def get_history(self, channel_name: str, count: int = 20) -> list[Message]:
        """Read channel history from Redis Stream (newest first)."""
        entries = redis.xrevrange(f"hermes:channel:{channel_name}:stream", count=count)
        return [Message.from_stream(e) for e in entries]

Section 5: Agent Lifecycle Management
Inspired by Evennia Script's state machine + four timing strategies. Implemented directly on ARQ workers and Redis.
State Machine
spawn()
|
v
+----------+
| created |
+-----+----+
| start()
v
+--------------+
+--->| active |<---+
| +------+-------+ |
| pause() | | resume()
| v |
| +--------------+ |
| | paused |----+
| +--------------+
| stop()
| is_valid()=False |
+-------------------------+
v
+----------+
| stopped |
+-----+----+
| delete()
v
(gone)
Valid transitions:
created -> {active}
active -> {paused, stopped}
paused -> {active, stopped}
stopped -> {active, gone}
Lifecycle Handler
import json
import time

class AgentLifecycle:
    """Per-agent lifecycle manager. State stored in Redis Hash `status` field."""
    TRANSITIONS = {
        "created": {"active"},
        "active": {"paused", "stopped"},
        "paused": {"active", "stopped"},
        "stopped": {"active", "gone"},
    }
    # Target status -> Agent hook name; paused -> active uses at_resume instead
    HOOKS = {"active": "at_activate", "paused": "at_pause", "stopped": "at_stop"}

    def transition(self, target_status: str):
        current = self.agent.data.get("status", "created")
        if target_status not in self.TRANSITIONS.get(current, set()):
            raise InvalidTransition(f"{current} -> {target_status}")
        # Pre-hook ("gone" has no hook on Agent)
        resuming = current == "paused" and target_status == "active"
        hook_name = "at_resume" if resuming else self.HOOKS.get(target_status)
        if hook_name:
            getattr(self.agent, hook_name)()
        # Update Redis
        self.agent.data["status"] = target_status
        self.agent.data["last_active"] = str(time.time())
        self.agent.save()
        # Publish event
        redis.publish(f"hermes:lifecycle:{self.agent.agent_id}",
                      json.dumps({"event": target_status, "ts": time.time()}))

Four Timing Strategies
| Strategy | Evennia Source | Hermes Implementation | Use Case |
|---|---|---|---|
| Repeating | Script interval + repeats | ARQ cron_jobs | Periodic health checks |
| Ticker Pool | TICKER_HANDLER shared timers | asyncio timer + Redis Set subscribers | Shared-interval callbacks |
| One-shot | TASK_HANDLER deferLater | ARQ delayed task (existing) | Delayed notifications |
| On-demand | ON_DEMAND_HANDLER lazy eval | Compute from Redis timestamp on query | Idle stage detection (no timer) |
On-demand example:
import time

def get_agent_state(agent: Agent) -> dict:
    """Zero-timer state computation from elapsed time."""
    elapsed = time.time() - float(agent.data.get("last_active", 0))
    # Thresholds in seconds, ascending; the last one reached wins
    stages = [
        (0, "active"),
        (300, "idle"),
        (600, "idle_warning"),
        (900, "auto_pause"),
    ]
    stage = "active"
    for threshold, name in stages:
        if elapsed >= threshold:
            stage = name
    return {"stage": stage, "elapsed": elapsed}

Section 6: Agent Template System
Inspired by Evennia Prototype's dict template + multiple inheritance + callable late binding. Built on existing agents.yaml.
Template Definition (agents.yaml extension)
templates:
  base_agent:
    capabilities: [status, message]
    locks: "execute:true(); message:true()"
    max_concurrent_tasks: 1
    timeout: 300
  coder:
    inherits: [base_agent]
    capabilities: [code, read, search, test]
    locks: "execute:role(junior); deploy:role(senior); control:owner()"
    max_concurrent_tasks: 2
    timeout: 600
    queue: engineering
  senior_coder:
    inherits: [coder]
    capabilities: [deploy, review, mentor]  # appended
    max_concurrent_tasks: 4
    max_tokens: 16000
  rate_limited_coder:
    inherits: [rate_limited, senior_coder]  # senior_coder wins on conflict
    timeout: 120
  rate_limited:
    rate_limit: "100/60s"
    rate_limit_strategy: sliding_window
  dynamic_worker:
    inherits: [base_agent]
    timeout: "callable:hermes.templates.dynamic_timeout()"
    capabilities: "callable:hermes.templates.resolve_caps(${task_type})"

Template Engine
class TemplateEngine:
    """
    Parse templates, resolve inheritance, evaluate callables.
    Cached in Redis Hash hermes:template:{name}.
    """

    def resolve(self, template_name: str) -> dict:
        """Resolve template with inheritance into flat config dict."""
        ...

    def _resolve_inheritance(self, template: dict) -> dict:
        """
        Multiple inheritance, right-to-left precedence.
        Capabilities lists: deduplication merge (not replace).
        Scalar values: child overrides parent.
        """
        ...

    def evaluate_callables(self, template: dict, context: dict | None = None) -> dict:
        """
        Evaluate callable fields at spawn time.
        "callable:module.func()" -> call function, return value.
        "${var}" -> substitute from context dict.
        """
        ...

Merge Rules
| Field Type | Merge Strategy |
|---|---|
| capabilities | Deduplication union |
| locks | Override (child wins) |
| Other lists | Deduplication union |
| Dicts | Recursive merge |
| Scalars | Override (child wins) |
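The merge rules above, combined with rightmost-parent-wins inheritance, can be sketched in two small functions. This is an illustration under assumptions, not the TemplateEngine itself: merge_configs and resolve are hypothetical names, templates are passed in as an already-parsed dict, and callable fields are left unevaluated.

```python
def merge_configs(parent: dict, child: dict) -> dict:
    """Merge child over parent following the merge-rule table."""
    merged = dict(parent)
    for key, value in child.items():
        base = merged.get(key)
        if key == "locks":
            merged[key] = value                       # override, child wins
        elif isinstance(value, list) and isinstance(base, list):
            combined = list(base)                     # deduplication union, order kept
            for item in value:
                if item not in combined:
                    combined.append(item)
            merged[key] = combined
        elif isinstance(value, dict) and isinstance(base, dict):
            merged[key] = merge_configs(base, value)  # recursive merge
        else:
            merged[key] = value                       # scalar override
    return merged


def resolve(name: str, templates: dict, _seen: tuple = ()) -> dict:
    """Flatten a template and its ancestors into one config dict."""
    if name in _seen:
        raise ValueError(f"inheritance cycle: {' -> '.join(_seen + (name,))}")
    template = templates[name]
    config: dict = {}
    # Parents are applied left to right, so the rightmost parent wins.
    for parent in template.get("inherits", []):
        config = merge_configs(config, resolve(parent, templates, _seen + (name,)))
    own = {k: v for k, v in template.items() if k != "inherits"}
    return merge_configs(config, own)


templates = {
    "base_agent": {"capabilities": ["status", "message"],
                   "locks": "execute:true(); message:true()",
                   "max_concurrent_tasks": 1, "timeout": 300},
    "coder": {"inherits": ["base_agent"],
              "capabilities": ["code", "read", "search", "test"],
              "locks": "execute:role(junior); deploy:role(senior); control:owner()",
              "max_concurrent_tasks": 2, "timeout": 600, "queue": "engineering"},
    "senior_coder": {"inherits": ["coder"],
                     "capabilities": ["deploy", "review", "mentor"],
                     "max_concurrent_tasks": 4, "max_tokens": 16000},
}
flat = resolve("senior_coder", templates)
print(flat["capabilities"], flat["timeout"], flat["max_concurrent_tasks"])
```

For senior_coder the capability list accumulates across all three levels, while timeout is inherited from coder and max_concurrent_tasks is overridden by the child.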
Spawn Flow
spawn_agent(template_name, context, overrides):
1. resolve(template_name) # parse inheritance
2. evaluate_callables(config, context) # resolve callables and variables
3. apply overrides # runtime overrides
4. create Redis state (Agent Hash) # hermes:agent:{id}
5. init CapabilitySet from template # grant_source("template", caps)
6. init Locks from template # set_locks(lock_string)
7. auto-subscribe channels # join auto_subscribe list
8. at_creation() hook
9. tag with template_key # hermes:agent:{id}:template_key
10. transition("active")
Diff-Based Batch Update
When a template changes, find all spawned agents and apply incremental diffs:
def diff_template(old_config, new_config) -> dict:
    """Generate incremental operations: capabilities add/remove, locks changed, etc."""
    ...

async def update_agents_from_template(template_name: str):
    """Apply diff to all agents spawned from this template."""
    ...

Section 7: CLI Command Layer
Inspired by Evennia Command matching + disambiguation + REPL continuous input. Exposes all subsystems as MUD-style commands.
Two Interaction Modes
# Mode 1: Single command (enhanced existing)
hermes spawn coder
hermes tell coder-3 status?
hermes status
# Mode 2: REPL (MUD-style continuous session)
$ hermes repl
[hermes] > spawn coder
[hermes] > look
[hermes] > say hello everyone
[hermes] > quit

Command Base Class
class Command:
    name: str
    aliases: list[str]
    help_text: str
    lock: str  # e.g. "use:role(admin)"

    def at_pre_cmd(self, ctx) -> bool: ...  # True = skip
    def parse(self, ctx): ...
    def func(self, ctx): ...                # main logic
    def at_post_cmd(self, ctx): ...

    def match(self, input_str: str) -> bool:
        """Prefix matching, longest alias first."""
        ...

Command Matching and Disambiguation
1. Exact match
2. Unique prefix match
3. Multiple matches -> prompt for disambiguation
MUD shorthand mapping:
'hello -> say hello
:thinking -> emote thinking
2-spawn -> select 2nd matching command
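A small sketch of the matching rules and shorthand mapping above. The command table, expand_shorthand and match_command are hypothetical names for illustration; the sketch returns all candidates so the caller can prompt when more than one remains, and it numbers prefix matches alphabetically, which is an assumption.

```python
import re

# Hypothetical command table: name -> aliases
COMMANDS = {"spawn": [], "stop": [], "status": [], "say": [], "emote": [], "look": ["l"]}


def expand_shorthand(raw: str) -> str:
    """Map MUD shorthand to full commands: 'text -> say text, :text -> emote text."""
    if raw.startswith("'"):
        return "say " + raw[1:]
    if raw.startswith(":"):
        return "emote " + raw[1:]
    return raw


def match_command(raw: str, commands=COMMANDS) -> tuple[list[str], str]:
    """Return (candidate command names, argument string)."""
    word, _, args = expand_shorthand(raw.strip()).partition(" ")
    if not word:
        return [], ""
    pick = None
    numbered = re.fullmatch(r"(\d+)-(\S+)", word)  # e.g. "2-st"
    if numbered:
        pick, word = int(numbered.group(1)), numbered.group(2)
    # 1. Exact match on name or alias
    candidates = [n for n, aliases in commands.items() if word == n or word in aliases]
    if not candidates:
        # 2. Prefix match, sorted so numbered selection is stable
        candidates = sorted(
            n for n, aliases in commands.items()
            if any(key.startswith(word) for key in (n, *aliases))
        )
    if pick is not None:
        candidates = candidates[pick - 1:pick] if pick >= 1 else []
    # 3. More than one candidate left: the caller prompts for disambiguation
    return candidates, args.strip()


print(match_command("'hello everyone"))  # (['say'], 'hello everyone')
print(match_command("st"))               # (['status', 'stop'], '')
print(match_command("2-st"))             # (['stop'], '')
```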
Command Execution Pipeline
execute_command(raw_input):
1. Match command (with MUD shorthand fallback)
2. Permission check (lock string)
3. at_pre_cmd() hook (can abort)
4. parse() arguments
5. func() execution
6. at_post_cmd() cleanup
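Steps 2 to 6 of the pipeline map directly onto the Command hooks. The sketch below assumes matching (step 1) has already produced a command object; the dict-based ctx, the is_allowed callback and the string return values are hypothetical simplifications, with is_allowed standing in for the lock-string check.

```python
class Command:
    """Minimal stand-in for the Command base class."""
    name = ""

    def at_pre_cmd(self, ctx: dict) -> bool:
        return False  # True = skip the command

    def parse(self, ctx: dict) -> None:
        ctx["args"] = ctx["raw_args"].split()

    def func(self, ctx: dict) -> None:
        raise NotImplementedError

    def at_post_cmd(self, ctx: dict) -> None:
        pass


class Spawn(Command):
    name = "spawn"

    def func(self, ctx: dict) -> None:
        ctx["output"] = f"spawned from template {ctx['args'][0]}"


def execute_command(cmd: Command, ctx: dict, is_allowed) -> str:
    """Run steps 2-6; step 1 (matching) is assumed to be done."""
    if not is_allowed(cmd, ctx["caller"]):  # 2. permission check
        return "denied"
    if cmd.at_pre_cmd(ctx):                 # 3. pre-hook can abort
        return "skipped"
    cmd.parse(ctx)                          # 4. parse arguments
    cmd.func(ctx)                           # 5. main logic
    cmd.at_post_cmd(ctx)                    # 6. cleanup
    return "ok"


def admins_only(cmd: Command, caller: str) -> bool:
    return caller == "admin"


ctx = {"caller": "admin", "raw_args": "coder"}
print(execute_command(Spawn(), ctx, admins_only), "|", ctx["output"])
# ok | spawned from template coder
```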
Command List
| Category | Commands |
|---|---|
| Agent management | spawn, stop, pause, resume, restart, status |
| Communication | say, tell, emote, channel, channels, history |
| Capability/Permission | grant, revoke, capabilities |
| Template | template list, template show, template apply |
| Task | task list, task submit, task show |
| Info | look (l), help, quit |
Redis Data Layout Summary
# Agent state
hermes:agent:{id} Hash agent metadata
hermes:agent:{id}:lifecycle Hash pause metadata
hermes:agent:{id}:template_key String template name
# Capability sets
hermes:agent:{id}:cap_sources ZSet (source, priority)
hermes:agent:{id}:caps:{source} Hash cap_name -> metadata_json
hermes:agent:{id}:caps_merged Hash cached merge result (TTL 60s)
# ACL
hermes:agent:{id}:locks Hash access_type -> lock_string
# Channels
hermes:channel:{name} Hash channel metadata
hermes:channel:{name}:subscribers Set agent_ids
hermes:channel:{name}:banlist Set banned agent_ids
hermes:channel:{name}:mutelist Set muted agent_ids
hermes:channel:{name}:stream Stream message history (maxlen=1000)
hermes:channel:registry Set all channel names
# Templates
hermes:template:{name} Hash resolved flat config
# Ticker pool
hermes:ticker:{interval} Set subscriber JSON entries
# Template→instance index
hermes:template:{name}:instances Set spawned agent_ids
Evennia Pattern Mapping
| Hermes Component | Evennia Source | Key Adaptation |
|---|---|---|
| Agent Entity | Typeclass + IDMapper | Redis Hash replaces Django model; simple dict cache replaces metaclass |
| Capability Set | CmdSet merge engine | Same 4 set operations; capabilities replace commands |
| ACL | Lock system | Same string format + safe eval; lock functions bind to Redis state |
| Channel | Channel/Comms system | Redis pub-sub + Stream replaces Django Msg; same hook chain |
| Lifecycle | Script state machine | ARQ worker states replace Twisted LoopingCall |
| Template | Prototype spawning | YAML + callable syntax; same inheritance semantics |
| CLI | Command + cmdparser | click + REPL; same match/disambiguate pipeline |
Implementation Priority
Suggested build order (each builds on the previous):
1. Agent Entity Model: foundation, everything depends on it
2. ACL System: small, self-contained, immediately useful
3. Capability Set: builds on Agent, enables permission-aware dispatch
4. Channel System: real-time collaboration, builds on ACL for access control
5. Template System: builds on Capability + ACL for template-driven spawning
6. CLI Command Layer: ties everything together into MUD-style interface
7. Lifecycle Management: cross-cutting, can be added incrementally