Hermes Multi-Agent System: MUD Pattern Design

Date: 2026-04-27
Status: Approved
Approach: C (native reimplementation of MUD abstractions for Hermes's Redis + ARQ architecture)

Background

Hermes is a CLI-based multi-agent orchestration system with an existing working prototype:

  • Redis for messaging (ARQ queues + pub-sub) and state storage (Hash, ZSet, String)
  • ARQ workers for concurrent task execution (verified: 2 workers consuming 4 tasks concurrently)
  • Hermes dispatcher (lib/dispatcher.py) routing tasks from config/agents.yaml owner→queue mapping
  • CLI (hermes task list/show, hermes events) reading from Redis

This design extracts conceptual patterns from Evennia (a Python MUD framework) and reimplements them natively for Hermes, without introducing Django/Twisted dependencies.

Architecture Overview

┌─────────────────────────────────────────────┐
│              CLI Layer (click/REPL)          │  MUD-style command interaction
├─────────────────────────────────────────────┤
│  Command Router                             │  Match, disambiguate, dispatch
├─────────────────────────────────────────────┤
│  Hermes Core                                │
│  ┌─────────┬──────────┬──────────┬────────┐ │
│  │Capability│  Channel │   ACL    │Template│ │  Four core subsystems
│  │  Set     │  System  │  System  │ Engine │ │
│  └────┬────┴────┬─────┴────┬─────┴───┬────┘ │
│       │         │          │         │      │
│  ┌────┴─────────┴──────────┴─────────┴────┐ │
│  │          Agent Entity Model             │ │  Unified entity abstraction
│  └────────────────┬───────────────────────┘ │
├───────────────────┼─────────────────────────┤
│  Redis Backend    │  ARQ Worker Layer       │  Existing infrastructure
│  (state/pubsub)   │  (task dispatch)        │
└───────────────────┴─────────────────────────┘

Section 1: Agent Entity Model

Inspired by Evennia's Typeclass (data/behavior separation). Data lives in Redis, behavior lives in Python classes, and the agent_type field links the two.

Redis Storage

Key: hermes:agent:{agent_id}
Type: Hash
Fields:
  agent_id      unique identifier
  agent_type    Python class path (e.g. "hermes.agents.CoderAgent")
  name          display name
  owner         owner of the agent (as in the config/agents.yaml owner→queue mapping)
  queue         bound ARQ queue
  status        created | active | paused | stopped
  template      template name used at creation
  created_at    creation timestamp
  last_active   last activity timestamp

Agent Class

class Agent:
    """Unified entity model. Data in Redis Hash, behavior in Python class."""
 
    def __init__(self, agent_id: str):
        self.agent_id = agent_id
        self._data = None
 
    @property
    def data(self) -> dict:
        """Lazy load from Redis with local cache."""
        if self._data is None:
            self._data = redis.hgetall(f"hermes:agent:{self.agent_id}")
        return self._data
 
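    def save(self):
        # hset(mapping=...) replaces hmset, which is deprecated in redis-py.
        # Going through self.data ensures the hash is loaded before writing.
        redis.hset(f"hermes:agent:{self.agent_id}", mapping=self.data)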
 
    # Lazy-init handlers (pay only for what you use)
    @lazy_property
    def capabilities(self) -> CapabilityHandler: ...
 
    @lazy_property
    def locks(self) -> LockHandler: ...
 
    @lazy_property
    def channels(self) -> ChannelSubscriptionHandler: ...
 
    # Message delivery (environment-dependent)
    def deliver(self, message: "Message"):
        """
        Receive a message. Behavior depends on runtime context:
        - CLI environment: print to terminal
        - Worker environment: enqueue as ARQ task for the agent's worker
        """
        ...
 
    # Lifecycle hooks (override in subclasses)
    def at_creation(self): ...
    def at_activate(self): ...
    def at_pause(self): ...
    def at_resume(self): ...
    def at_stop(self): ...
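
The Agent class relies on a lazy_property decorator that the design does not define. A minimal sketch of one possible implementation follows, assuming a non-data descriptor that builds the handler on first access and caches it on the instance (the standard library's functools.cached_property behaves the same way). The CapabilityHandler here is a stand-in, not the real handler.

```python
from typing import Any, Callable


class lazy_property:
    """Non-data descriptor: compute on first access, then cache on the instance."""

    def __init__(self, factory: Callable[[Any], Any]):
        self.factory = factory
        self.name = factory.__name__

    def __set_name__(self, owner, name):
        self.name = name

    def __get__(self, instance, owner=None):
        if instance is None:
            return self
        value = self.factory(instance)
        # Stored under the same name in the instance dict. Because this
        # descriptor defines no __set__, later lookups hit the dict directly.
        instance.__dict__[self.name] = value
        return value


class CapabilityHandler:
    """Stand-in handler that only records its owner and counts constructions."""
    created = 0

    def __init__(self, agent):
        self.agent = agent
        CapabilityHandler.created += 1


class Agent:
    def __init__(self, agent_id: str):
        self.agent_id = agent_id

    @lazy_property
    def capabilities(self) -> CapabilityHandler:
        return CapabilityHandler(self)


agent = Agent("coder-1")
built_before_access = CapabilityHandler.created   # nothing built at construction
first = agent.capabilities
second = agent.capabilities                       # served from the instance dict
```

An agent that never touches capabilities, locks, or channels never pays for constructing those handlers, which is the point of the "pay only for what you use" comment.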

AgentRegistry and ChannelRegistry

Global lookup caches ensuring identity consistency and fast access:

class AgentRegistry:
    """In-process cache: {agent_id: Agent}. Auto-populated on first access."""
    _cache: dict[str, Agent] = {}
 
    @classmethod
    def get(cls, agent_id: str) -> Agent:
        if agent_id not in cls._cache:
            cls._cache[agent_id] = Agent(agent_id)
        return cls._cache[agent_id]
 
    @classmethod
    def find(cls, name_or_id: str) -> Agent | None:
        """Find by agent_id, name prefix, or alias. Returns None if not found."""
        ...
 
class ChannelRegistry:
    """Channel lookup by name. Channels are loaded from Redis on first access."""
    _cache: dict[str, Channel] = {}
 
    @classmethod
    def get(cls, name: str) -> Channel:
        if name not in cls._cache:
            cls._cache[name] = Channel(name)
        return cls._cache[name]

Agent Identity Cache

The AgentRegistry._cache dict serves as the identity cache (inspired by Evennia's IDMapper), ensuring one Python instance per agent within a Hermes process.
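
The body of AgentRegistry.find is left open above. One way it could work is sketched here under stated assumptions: agents are already in the in-process cache, an exact agent_id wins, and a name prefix is accepted only when it is unambiguous. The register helper and the two-field Agent are hypothetical, and alias lookup is omitted.

```python
from dataclasses import dataclass
from typing import Optional


@dataclass
class Agent:
    agent_id: str
    name: str


class AgentRegistry:
    """In-process identity map: at most one Agent object per agent_id."""
    _cache: dict[str, Agent] = {}

    @classmethod
    def register(cls, agent: Agent) -> Agent:
        # setdefault keeps the first instance, which preserves identity.
        return cls._cache.setdefault(agent.agent_id, agent)

    @classmethod
    def find(cls, name_or_id: str) -> Optional[Agent]:
        if not name_or_id:
            return None
        # 1. Exact agent_id.
        if name_or_id in cls._cache:
            return cls._cache[name_or_id]
        # 2. Case-insensitive name prefix, accepted only if exactly one agent matches.
        needle = name_or_id.lower()
        hits = [a for a in cls._cache.values() if a.name.lower().startswith(needle)]
        return hits[0] if len(hits) == 1 else None


alice = AgentRegistry.register(Agent("coder-1", "Alice"))
AgentRegistry.register(Agent("coder-2", "Alan"))
```

Returning None for an ambiguous prefix keeps find side-effect free; the CLI layer can then decide whether to prompt for disambiguation.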

Key Design Decisions

  • One Redis Hash per agent, compatible with existing hermes task list/show Redis reads
  • agent_type replaces Evennia's db_typeclass_path — same concept, agent-oriented naming
  • Handler lazy initialization — not every agent needs every handler
  • No new database introduced

Section 2: Capability Set (Dynamic Capability Composition)

Inspired by Evennia CmdSet's set-algebra merging. Capability items map directly to Hermes tools/operations/queues.

Data Structures

class Capability:
    """Single capability item."""
    name: str               # "code", "deploy", "review", "debug"
    metadata: dict           # optional: {"timeout": 300, "max_tokens": 8000}
 
    def match(self, input_str: str) -> bool:
        """Prefix matching (supports abbreviations). Empty input never matches."""
        # An input longer than the name (e.g. "deployment" vs "deploy") does not match.
        return bool(input_str) and self.name.startswith(input_str)
 
 
class CapabilitySet:
    """Capability collection supporting four merge operations."""
 
    priority: int           # higher = higher priority
    mergetype: str          # "union" | "intersect" | "replace" | "remove"
    source: str             # origin: "role", "task", "channel", "override", "template"
 
    _caps: dict[str, Capability]
 
    def __add__(self, other: "CapabilitySet") -> "CapabilitySet":
        """Merge two CapabilitySets. Higher priority's mergetype wins."""
        ...

Four Merge Operations

Operation   Behavior                                        Use Case
Union       Combine both; same-name resolved by priority    Role + task capabilities stacking
Intersect   Keep only capabilities in both sets             Security sandbox constraining
Replace     Higher-priority set completely overrides        Locked mode with restricted capability set
Remove      Remove specified capabilities without adding    Temporary revocation of debug access
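
The four operations can be sketched as a small __add__ implementation. This is an illustration under assumptions rather than the final design: capabilities are reduced to a name -> metadata dict, ties in priority go to the right-hand operand, and the merged result is tagged "union" because the design does not specify which mergetype a result should carry.

```python
from dataclasses import dataclass, field


@dataclass
class CapabilitySet:
    caps: dict[str, dict] = field(default_factory=dict)   # name -> metadata
    priority: int = 0
    mergetype: str = "union"
    source: str = ""

    def __add__(self, other: "CapabilitySet") -> "CapabilitySet":
        # The higher-priority operand decides how the merge happens.
        high, low = (other, self) if other.priority >= self.priority else (self, other)
        if high.mergetype == "union":
            merged = {**low.caps, **high.caps}            # high wins on same name
        elif high.mergetype == "intersect":
            merged = {n: m for n, m in high.caps.items() if n in low.caps}
        elif high.mergetype == "replace":
            merged = dict(high.caps)
        elif high.mergetype == "remove":
            merged = {n: m for n, m in low.caps.items() if n not in high.caps}
        else:
            raise ValueError(f"unknown mergetype: {high.mergetype}")
        return CapabilitySet(merged, high.priority, "union", "merged")


role = CapabilitySet({"code": {}, "review": {}, "debug": {}}, priority=10, source="role")
task = CapabilitySet({"deploy": {"timeout": 300}}, priority=20, source="task")
sandbox = CapabilitySet({"code": {}, "review": {}, "deploy": {}}, 30, "intersect", "channel")
revoked = CapabilitySet({"review": {}}, 40, "remove", "override")

# Merged from low to high priority, as in the pipeline described in this section.
merged = CapabilitySet() + role + task + sandbox + revoked
print(sorted(merged.caps))
```

In this run the union adds deploy, the intersect drops debug, and the remove drops review, leaving code and deploy.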

Multi-Source Merge Pipeline

class CapabilityHandler:
    """
    Per-agent handler managing multi-source CapabilitySet merging.
 
    Redis storage:
      Capability sources: SortedSet hermes:agent:{id}:cap_sources (score=priority)
      Per-source caps:    Hash        hermes:agent:{id}:caps:{source}
      Merged cache:       Hash        hermes:agent:{id}:caps_merged (TTL 60s)
    """
 
    def get_merged(self) -> CapabilitySet:
        """Merge all sources from low to high priority."""
        sources = redis.zrange(f"hermes:agent:{self.agent.agent_id}:cap_sources", 0, -1)
        result = CapabilitySet()
        for source_name in sources:
            capset = self._load_source(source_name)
            result = result + capset
        return result
 
    def grant(self, cap_name: str, source: str = "override", **metadata):
        """Dynamically grant capability."""
        ...
 
    def revoke(self, cap_name: str, source: str = "override"):
        """Revoke capability from a source."""
        ...
 
    def grant_source(self, source: str, cap_names: list[str]):
        """Replace entire source (used by template loading)."""
        ...
 
    def check(self, cap_name: str) -> bool:
        """Check if merged set contains capability."""
        # Exact-name lookup: prefix matching here would let a required "de"
        # be satisfied by "debug". Abbreviations belong in CLI input handling.
        return cap_name in self.get_merged()._caps

Dispatcher Integration

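# Existing lib/dispatcher.py routing enhanced with capability check
async def dispatch_task(task, target_agent):
    if not target_agent.capabilities.check(task.required_cap):
        raise CapabilityDenied(f"Agent lacks: {task.required_cap}")
    # ARQ pools expose enqueue_job(function_name, *args, _queue_name=...).
    # task.func_name is an assumed field naming the worker function to run.
    await arq_pool.enqueue_job(task.func_name, task.payload, _queue_name=task.queue)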

Section 3: ACL System (Permission and Access Control)

Inspired by Evennia Lock's declarative string ACL + safe evaluation. Lock functions bind directly to Hermes agent attributes.

Core Structure

class LockHandler:
    """
    Per-agent access control.
 
    Redis storage:
      Key: hermes:agent:{id}:locks
      Type: Hash
      Field: access_type -> lock_string
    """
 
    _locks: dict[str, ParsedLock]
 
    def check(self, access_type: str, caller: "Agent", default: bool = False) -> bool:
        """
        Check if caller has access_type permission.
        Fail-closed: unknown access types return default (False).
        """
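        # Redis returns strings, so a stored "0" or "false" would be truthy.
        # Assumes the flag is stored as "1".
        if caller.data.get("is_superuser") == "1":
            return True
        parsed = self._locks.get(access_type)
        if parsed is None:
            return default
        # Coerce to bool so only "True"/"False" is ever interpolated.
        results = tuple(bool(func(caller, self.agent, *args, **kwargs))
                        for func, args, kwargs in parsed.func_calls)
        # Evaluate with empty namespaces: no builtins and no names are reachable.
        return bool(eval(parsed.eval_template % results, {"__builtins__": {}}, {}))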
 
    def set_locks(self, lockstring: str):
        """Parse and store lock string."""
        self._locks = parse_lockstring(lockstring)
        # Persist to Redis
        # hset(mapping=...) replaces hmset, which is deprecated in redis-py.
        redis.hset(f"hermes:agent:{self.agent.agent_id}:locks",
                   mapping={k: v.raw for k, v in self._locks.items()})

Lock String Format

Same syntax as Evennia: access_type:func1(args) [AND|OR|NOT] func2(args); access_type2:...

Examples:

execute:role(junior) AND NOT status(paused)
deploy:queue(engineering) OR role(admin)
message:role(member) AND attr(muted, eq, false)
control:owner() OR role(admin)

Parsing and Safe Evaluation

class ParsedLock:
    """Pre-compiled lock string."""
    eval_template: str    # "%s and not %s"
    func_calls: list      # [(callable, args, kwargs), ...]
    raw: str              # original string for storage
 
def parse_lockstring(lockstring: str) -> dict[str, ParsedLock]:
    """
    Parse lock string into {access_type: ParsedLock}.
 
    Safety: eval_template only allows and/or/not, pre-validated with
    empty-namespace eval. Only boolean results are ever interpolated.
    """
    ...
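
A minimal sketch of how parse_lockstring and the evaluation step could fit together. It assumes a hypothetical three-function registry, agents represented as plain dicts, and lock function arguments that contain no nested parentheses. The whitelist check and the dry-run eval mirror the safety note in the docstring above.

```python
import re
from dataclasses import dataclass

# Hypothetical lock functions; agents are plain dicts in this sketch.
LOCK_FUNCS = {
    "true": lambda caller, target: True,
    "false": lambda caller, target: False,
    "status": lambda caller, target, name: target.get("status") == name,
}

_CALL = re.compile(r"(\w+)\(([^()]*)\)")   # name(arg, arg, ...)


@dataclass
class ParsedLock:
    eval_template: str
    func_calls: list
    raw: str


def parse_lockstring(lockstring: str) -> dict[str, ParsedLock]:
    parsed = {}
    for clause in (c.strip() for c in lockstring.split(";")):
        if not clause:
            continue
        access_type, _, expr = clause.partition(":")
        calls = []

        def collect(match):
            name = match.group(1)
            if name not in LOCK_FUNCS:
                raise ValueError(f"unknown lock function: {name}")
            args = tuple(a.strip() for a in match.group(2).split(",") if a.strip())
            calls.append((LOCK_FUNCS[name], args, {}))
            return "%s"                     # placeholder for the boolean result

        template = _CALL.sub(collect, expr.strip())
        template = re.sub(r"\b(AND|OR|NOT)\b", lambda m: m.group(1).lower(), template)
        # Whitelist: besides placeholders and parentheses, only and/or/not may remain.
        leftover = set(re.sub(r"%s|[()]", " ", template).split()) - {"and", "or", "not"}
        if leftover:
            raise ValueError(f"illegal tokens in {clause!r}: {sorted(leftover)}")
        # Dry run with dummy booleans in an empty namespace; raises on bad syntax.
        eval(template % ((True,) * len(calls)), {"__builtins__": {}}, {})
        parsed[access_type.strip()] = ParsedLock(template, calls, clause)
    return parsed


def check(lock: ParsedLock, caller: dict, target: dict) -> bool:
    results = tuple(bool(f(caller, target, *a, **kw)) for f, a, kw in lock.func_calls)
    return bool(eval(lock.eval_template % results, {"__builtins__": {}}, {}))


locks = parse_lockstring("execute:true() AND NOT status(paused); control:false()")
```

The string passed to eval can only ever consist of True, False, and, or, not, and parentheses, so the lock author never gets to name anything in the Python namespace.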

Hermes Lock Functions

All share the signature (caller, target, *args, **kwargs) -> bool and are registered via a decorator.

Function                Purpose
role(name)              Role hierarchy check (admin > senior > junior)
owner()                 Caller is target's owner
queue(name)             Caller bound to specified queue
status(name)            Target's current status
attr(name, op, value)   Target Redis attribute with comparison operators (eq, ne, gt, lt)
cap(name)               Caller has specified capability (calls CapabilityHandler)
true() / false()        Always pass/fail
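
The decorator-based registration and two of the functions in the table might look like the sketch below. The lockfunc decorator name, the dict-based agents, and the numeric ranks are assumptions; only the ordering admin > senior > junior comes from the table. Because lock string arguments and Redis values are both strings, gt and lt are compared numerically here, which is also an assumption.

```python
import operator

LOCK_FUNCS = {}


def lockfunc(fn):
    """Register a lock function under its own name (hypothetical decorator)."""
    LOCK_FUNCS[fn.__name__] = fn
    return fn


# Ordering from the table; the numeric ranks themselves are illustrative.
ROLE_RANK = {"junior": 1, "senior": 2, "admin": 3}
_OPS = {"eq": operator.eq, "ne": operator.ne, "gt": operator.gt, "lt": operator.lt}


@lockfunc
def role(caller, target, name):
    # Unknown caller roles rank lowest; unknown required roles can never be met.
    required = ROLE_RANK.get(name)
    return required is not None and ROLE_RANK.get(caller.get("role"), 0) >= required


@lockfunc
def attr(caller, target, name, op, value):
    current = target.get(name)
    if current is None or op not in _OPS:
        return False                       # fail closed
    if op in ("gt", "lt"):
        try:
            return _OPS[op](float(current), float(value))
        except ValueError:
            return False
    return _OPS[op](str(current), str(value))
```

Both functions fail closed: a missing attribute, an unknown operator, or an unrecognized role yields False instead of raising.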

Relationship with Capability Set

Permission check chain:
  CLI/Agent initiates operation
    -> LockHandler.check("execute", caller)     # "Are you allowed to act on this agent?"
    -> CapabilityHandler.check("deploy")         # "Does your agent have this capability?"
    -> Execute operation

Lock = "who can do what on your agent". Capability = "what your agent can do". Two independent but complementary layers.


Section 4: Channel System (Real-time Collaboration + CLI Chat)

Inspired by Evennia Channel's pub-sub + hook chain + ban/mute. Built natively on Redis pub-sub and Stream.

Core Structures

class Channel:
    """
    Channel entity.
 
    Redis storage:
      Metadata:    Hash    hermes:channel:{name}
      Subscribers: Set     hermes:channel:{name}:subscribers
      Ban list:    Set     hermes:channel:{name}:banlist
      Mute list:   Set     hermes:channel:{name}:mutelist
      History:     Stream  hermes:channel:{name}:stream  (maxlen=1000)
    """
    name: str
    locks: LockHandler
    send_to_active_only: bool = True
 
    # Hook chain (override in subclasses)
    def at_pre_msg(self, message: "Message") -> "Message | None":
        """Pre-broadcast. Return None to cancel."""
        return message
 
    def at_post_msg(self, message: "Message"):
        """Post-broadcast (default: write to Redis Stream)."""
        ...
 
 
class Message:
    sender_id: str          # agent_id or "user:{user_id}"
    channel: str            # channel name (empty = direct message)
    content: str
    msg_type: str           # "say" | "tell" | "emote" | "system" | "task"
    timestamp: float
    metadata: dict

Three-Stage Message Pipeline

broadcast(message):
  1. Channel-level:    at_pre_msg(message)         # filter/format, can abort
  2. Per-recipient:    at_pre_channel_msg           # per-recipient customization
                     -> deliver(processed)           # actual delivery
                     -> at_post_channel_msg          # per-recipient cleanup
  3. Channel-level:    at_post_msg(message)         # logging, persistence
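
The three stages can be sketched with in-memory stand-ins. Everything below is illustrative: subscribers are plain objects with an inbox list instead of Redis-backed agents, history is a list instead of a Stream, and the rule that blank messages are cancelled is an example filter, not part of the design.

```python
from dataclasses import dataclass, field, replace


@dataclass(frozen=True)
class Message:
    sender_id: str
    content: str
    channel: str = ""
    msg_type: str = "say"


@dataclass
class Recipient:
    agent_id: str
    inbox: list = field(default_factory=list)

    def at_pre_channel_msg(self, message, channel):
        return message                     # return None to skip this recipient

    def deliver(self, message):
        self.inbox.append(message)

    def at_post_channel_msg(self, message, channel):
        pass


@dataclass
class Channel:
    name: str
    subscribers: list = field(default_factory=list)
    history: list = field(default_factory=list)

    def at_pre_msg(self, message):
        if not message.content.strip():
            return None                    # example filter: cancel blank messages
        return replace(message, channel=self.name,
                       content=f"[{self.name}] {message.content}")

    def at_post_msg(self, message):
        self.history.append(message)       # stands in for the Redis Stream write

    def broadcast(self, message) -> bool:
        processed = self.at_pre_msg(message)              # stage 1
        if processed is None:
            return False
        for recipient in self.subscribers:                # stage 2
            personal = recipient.at_pre_channel_msg(processed, self)
            if personal is None:
                continue
            recipient.deliver(personal)
            recipient.at_post_channel_msg(personal, self)
        self.at_post_msg(processed)                       # stage 3
        return True


a, b = Recipient("coder-1"), Recipient("coder-2")
dev = Channel("dev", [a, b])
sent = dev.broadcast(Message("user:1", "hello"))
blocked = dev.broadcast(Message("user:1", "   "))
```

A cancelled message never reaches stage 3, so it is neither delivered nor persisted.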

Channel Types

Type               Purpose                                                 Lock Defaults
RoomChannel        Workspace: agents in the same task/project auto-join    join:true(); send:true(); receive:true()
DirectChannel      1-on-1 DM, auto-created                                 join:owner() OR self(); send:true(); receive:true()
BroadcastChannel   System announcements                                    join:true(); send:role(admin); receive:true()

Agent Subscription Handler

class ChannelSubscriptionHandler:
    """Per-agent handler managing channel subscriptions."""
 
    def join(self, channel_name: str):
        channel = ChannelRegistry.get(channel_name)
        channel.subscribe(self.agent.agent_id)
 
    def leave(self, channel_name: str):
        channel = ChannelRegistry.get(channel_name)
        channel.unsubscribe(self.agent.agent_id)
 
    def tell(self, target_id: str, content: str):
        """Direct message."""
        target = AgentRegistry.get(target_id)
        if not target.locks.check("message", self.agent):
            raise AccessDenied(f"Cannot message {target_id}")
        target.deliver(Message(sender_id=self.agent.agent_id,
                                content=content, msg_type="tell"))
 
    def get_history(self, channel_name: str, count: int = 20) -> list[Message]:
        """Read channel history from Redis Stream."""
        entries = redis.xrevrange(f"hermes:channel:{channel_name}:stream", count=count)
        return [Message.from_stream(e) for e in entries]

Section 5: Agent Lifecycle Management

Inspired by Evennia Script's state machine + four timing strategies. Implemented directly on ARQ workers and Redis.

State Machine

                 spawn()
                   |
                   v
              +----------+
              | created  |
              +-----+----+
                    | start()
                    v
             +--------------+
        +--->|    active    |<---+
        |    +------+-------+    |
        |  pause()  |            | resume()
        |           v            |
        |    +--------------+    |
        |    |   paused     |----+
        |    +--------------+
        |                        stop()
        |  is_valid()=False       |
        +-------------------------+
                                  v
                            +----------+
                            | stopped  |
                            +-----+----+
                                  | delete()
                                  v
                                 (gone)

Valid transitions:
  created  -> {active}
  active   -> {paused, stopped}
  paused   -> {active, stopped}
  stopped  -> {active, gone}

Lifecycle Handler

class AgentLifecycle:
    """Per-agent lifecycle manager. State stored in Redis Hash `status` field."""
 
    TRANSITIONS = {
        "created":  {"active"},
        "active":   {"paused", "stopped"},
        "paused":   {"active", "stopped"},
        "stopped":  {"active", "gone"},
    }
 
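    # Hook names follow the Agent class, keyed by (current, target) status.
    # Assumption: a restart from "stopped" reuses at_activate.
    HOOKS = {
        ("created", "active"): "at_activate",
        ("stopped", "active"): "at_activate",
        ("paused", "active"): "at_resume",
        ("active", "paused"): "at_pause",
        ("active", "stopped"): "at_stop",
        ("paused", "stopped"): "at_stop",
    }

    def transition(self, target_status: str, **kwargs):
        current = self.agent.data.get("status", "created")
        if target_status not in self.TRANSITIONS.get(current, set()):
            raise InvalidTransition(f"{current} -> {target_status}")
        # Pre-hook (none for stopped -> gone). The hooks on Agent take no
        # arguments, so kwargs are not forwarded.
        hook = self.HOOKS.get((current, target_status))
        if hook:
            getattr(self.agent, hook)()
        # Update Redis
        self.agent.data["status"] = target_status
        self.agent.data["last_active"] = str(time.time())
        self.agent.save()
        # Publish event
        redis.publish(f"hermes:lifecycle:{self.agent.agent_id}",
                      json.dumps({"event": target_status, "ts": time.time()}))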

Four Timing Strategies

Strategy      Evennia Source                 Hermes Implementation                   Use Case
Repeating     Script interval + repeats      ARQ cron_jobs                           Periodic health checks
Ticker Pool   TICKER_HANDLER shared timers   asyncio timer + Redis Set subscribers   Shared-interval callbacks
One-shot      TASK_HANDLER deferLater        ARQ delayed task (existing)             Delayed notifications
On-demand     ON_DEMAND_HANDLER lazy eval    Compute from Redis timestamp on query   Idle stage detection (no timer)

On-demand example:

def get_agent_state(agent: Agent) -> dict:
    """Zero-timer state computation from elapsed time."""
    elapsed = time.time() - float(agent.data.get("last_active", 0))
    stages = [
        (0,   "active"),
        (300, "idle"),
        (600, "idle_warning"),
        (900, "auto_pause"),
    ]
    stage = "active"
    for threshold, name in stages:
        if elapsed >= threshold:
            stage = name
    return {"stage": stage, "elapsed": elapsed}
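
The Ticker Pool row of the table can be illustrated with plain asyncio. This sketch assumes in-process callbacks kept in Python sets; the design stores subscribers in a Redis Set (hermes:ticker:{interval}), which is left out here. The TickerPool class and its method names are hypothetical.

```python
import asyncio
from collections import defaultdict


class TickerPool:
    """One asyncio task per interval, shared by every subscriber of that interval."""

    def __init__(self):
        self._subs = defaultdict(set)      # interval -> set of callbacks
        self._tasks = {}                   # interval -> asyncio.Task

    def add(self, interval: float, callback) -> None:
        # Must be called from inside a running event loop.
        self._subs[interval].add(callback)
        if interval not in self._tasks:
            loop = asyncio.get_running_loop()
            self._tasks[interval] = loop.create_task(self._run(interval))

    def remove(self, interval: float, callback) -> None:
        self._subs[interval].discard(callback)
        if not self._subs[interval]:
            # Last subscriber gone: stop the shared timer.
            del self._subs[interval]
            task = self._tasks.pop(interval, None)
            if task is not None:
                task.cancel()

    async def _run(self, interval: float) -> None:
        while True:
            await asyncio.sleep(interval)
            for callback in list(self._subs[interval]):
                callback()


async def demo():
    pool = TickerPool()
    hits = []

    def health_check():
        hits.append("health")

    def metrics():
        hits.append("metrics")

    pool.add(0.01, health_check)
    pool.add(0.01, metrics)
    timers_while_running = len(pool._tasks)   # two subscribers, one timer
    await asyncio.sleep(0.1)
    pool.remove(0.01, health_check)
    pool.remove(0.01, metrics)
    return timers_while_running, hits, len(pool._tasks)


timers, hits, remaining = asyncio.run(demo())
```

Sharing one timer per interval keeps the number of scheduled tasks proportional to the number of distinct intervals, not the number of agents.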

Section 6: Agent Template System

Inspired by Evennia Prototype's dict template + multiple inheritance + callable late binding. Built on existing agents.yaml.

Template Definition (agents.yaml extension)

templates:
  base_agent:
    capabilities: [status, message]
    locks: "execute:true(); message:true()"
    max_concurrent_tasks: 1
    timeout: 300
 
  coder:
    inherits: [base_agent]
    capabilities: [code, read, search, test]
    locks: "execute:role(junior); deploy:role(senior); control:owner()"
    max_concurrent_tasks: 2
    timeout: 600
    queue: engineering
 
  senior_coder:
    inherits: [coder]
    capabilities: [deploy, review, mentor]    # appended
    max_concurrent_tasks: 4
    max_tokens: 16000
 
  rate_limited_coder:
    inherits: [rate_limited, senior_coder]    # senior_coder wins on conflict
    timeout: 120
 
  rate_limited:
    rate_limit: "100/60s"
    rate_limit_strategy: sliding_window
 
  dynamic_worker:
    inherits: [base_agent]
    timeout: "callable:hermes.templates.dynamic_timeout()"
    capabilities: "callable:hermes.templates.resolve_caps(${task_type})"

Template Engine

class TemplateEngine:
    """
    Parse templates, resolve inheritance, evaluate callables.
    Cached in Redis Hash hermes:template:{name}.
    """
 
    def resolve(self, template_name: str) -> dict:
        """Resolve template with inheritance into flat config dict."""
        ...
 
    def _resolve_inheritance(self, template: dict) -> dict:
        """
        Multiple inheritance: parents are applied left to right, so the
        rightmost parent wins on conflict.
        Capabilities lists: deduplication merge (not replace).
        Scalar values: child overrides parent.
        """
        ...
 
    def evaluate_callables(self, template: dict, context: dict | None = None) -> dict:
        """
        Evaluate callable fields at spawn time.
        "callable:module.func()" -> call function, return value.
        "${var}" -> substitute from context dict.
        """
        ...
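
A hedged sketch of evaluate_callables. To avoid importing arbitrary dotted paths, it assumes an explicit allow-list (REGISTRY) in place of dynamic imports; that choice, the sample function bodies, and their return values are illustrative and not part of the design. Arguments are passed to the function as strings.

```python
import re

_VAR = re.compile(r"\$\{(\w+)\}")
_CALLABLE = re.compile(r"^callable:([\w.]+)\((.*)\)$")


def dynamic_timeout():
    return 450                              # illustrative value


def resolve_caps(task_type):
    return {"build": ["code", "test"]}.get(task_type, ["status"])


# Assumed allow-list of callables that templates may reference.
REGISTRY = {
    "hermes.templates.dynamic_timeout": dynamic_timeout,
    "hermes.templates.resolve_caps": resolve_caps,
}


def evaluate_callables(template: dict, context=None) -> dict:
    context = context or {}
    resolved = {}
    for key, value in template.items():
        if isinstance(value, str):
            # 1. Substitute ${var} from the context (KeyError if missing).
            value = _VAR.sub(lambda m: str(context[m.group(1)]), value)
            # 2. Call "callable:path(args)" through the allow-list.
            match = _CALLABLE.match(value)
            if match:
                func = REGISTRY[match.group(1)]
                args = [a.strip() for a in match.group(2).split(",") if a.strip()]
                value = func(*args)
        resolved[key] = value
    return resolved


config = evaluate_callables(
    {
        "timeout": "callable:hermes.templates.dynamic_timeout()",
        "capabilities": "callable:hermes.templates.resolve_caps(${task_type})",
        "queue": "engineering",
    },
    context={"task_type": "build"},
)
```

Values that are not strings, or strings without the callable: prefix, pass through unchanged.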

Merge Rules

Field Type     Merge Strategy
capabilities   Deduplication union
locks          Override (child wins)
Other lists    Deduplication union
Dicts          Recursive merge
Scalars        Override (child wins)
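
The merge rules and the inheritance order can be sketched as two small functions. The templates are given as a Python dict that mirrors part of the YAML above, so no YAML parser is needed; the function names merge and resolve and the cycle check are assumptions of this sketch.

```python
def merge(parent: dict, child: dict) -> dict:
    """Apply the merge rules: lists union, dicts recurse, everything else overrides."""
    out = dict(parent)
    for key, value in child.items():
        if key == "inherits":
            continue
        base = out.get(key)
        if isinstance(base, list) and isinstance(value, list):
            out[key] = list(dict.fromkeys(base + value))   # ordered, deduplicated
        elif isinstance(base, dict) and isinstance(value, dict):
            out[key] = merge(base, value)
        else:
            out[key] = value                               # scalars and locks
    return out


def resolve(name: str, templates: dict, _seen: tuple = ()) -> dict:
    if name in _seen:
        raise ValueError(f"inheritance cycle at {name}")
    template = templates[name]
    flat: dict = {}
    # Parents are applied left to right, so the rightmost parent wins.
    for parent in template.get("inherits", []):
        flat = merge(flat, resolve(parent, templates, _seen + (name,)))
    return merge(flat, template)                           # child wins last


TEMPLATES = {
    "base_agent": {"capabilities": ["status", "message"],
                   "locks": "execute:true(); message:true()",
                   "max_concurrent_tasks": 1, "timeout": 300},
    "coder": {"inherits": ["base_agent"],
              "capabilities": ["code", "read", "search", "test"],
              "locks": "execute:role(junior); deploy:role(senior); control:owner()",
              "max_concurrent_tasks": 2, "timeout": 600, "queue": "engineering"},
    "senior_coder": {"inherits": ["coder"],
                     "capabilities": ["deploy", "review", "mentor"],
                     "max_concurrent_tasks": 4, "max_tokens": 16000},
    "rate_limited": {"rate_limit": "100/60s", "rate_limit_strategy": "sliding_window"},
    "rate_limited_coder": {"inherits": ["rate_limited", "senior_coder"], "timeout": 120},
}

flat = resolve("rate_limited_coder", TEMPLATES)
```

Here flat carries the capabilities of all three ancestors, the coder lock string, the rate limit fields, and the child's own timeout of 120.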

Spawn Flow

spawn_agent(template_name, context, overrides):
  1. resolve(template_name)              # parse inheritance
  2. evaluate_callables(config, context) # resolve callables and variables
  3. apply overrides                     # runtime overrides
  4. create Redis state (Agent Hash)     # hermes:agent:{id}
  5. init CapabilitySet from template    # grant_source("template", caps)
  6. init Locks from template            # set_locks(lock_string)
  7. auto-subscribe channels             # join auto_subscribe list
  8. at_creation() hook
  9. tag with template_key               # hermes:agent:{id}:template_key
  10. transition("active")

Diff-Based Batch Update

When a template changes, find all spawned agents and apply incremental diffs:

def diff_template(old_config, new_config) -> dict:
    """Generate incremental operations: capabilities add/remove, locks changed, etc."""
    ...
 
async def update_agents_from_template(template_name: str):
    """Apply diff to all agents spawned from this template."""
    ...
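
One possible shape for diff_template is sketched below. The operation names (capabilities_add, capabilities_remove, locks_set, fields_set, fields_unset) are invented for illustration; the design only says that the diff covers capability additions and removals and lock changes.

```python
def diff_template(old_config: dict, new_config: dict) -> dict:
    """Return only the operations needed to move an agent from old to new."""
    ops: dict = {}

    old_caps = old_config.get("capabilities", [])
    new_caps = new_config.get("capabilities", [])
    added = [c for c in new_caps if c not in old_caps]
    removed = [c for c in old_caps if c not in new_caps]
    if added:
        ops["capabilities_add"] = added
    if removed:
        ops["capabilities_remove"] = removed

    if old_config.get("locks") != new_config.get("locks"):
        ops["locks_set"] = new_config.get("locks")

    # Remaining fields: report changed values and fields that disappeared.
    skip = {"capabilities", "locks", "inherits"}
    changed = {k: v for k, v in new_config.items()
               if k not in skip and old_config.get(k) != v}
    dropped = [k for k in old_config if k not in skip and k not in new_config]
    if changed:
        ops["fields_set"] = changed
    if dropped:
        ops["fields_unset"] = dropped
    return ops


old = {"capabilities": ["code", "read", "debug"], "locks": "execute:role(junior)",
       "timeout": 600, "queue": "engineering"}
new = {"capabilities": ["code", "read", "deploy"], "locks": "execute:role(senior)",
       "timeout": 300}
ops = diff_template(old, new)
```

An empty result means the agent is already up to date, so update_agents_from_template could skip it without touching Redis.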

Section 7: CLI Command Layer

Inspired by Evennia Command matching + disambiguation + REPL continuous input. Exposes all subsystems as MUD-style commands.

Two Interaction Modes

# Mode 1: Single command (enhanced existing)
hermes spawn coder
hermes tell coder-3 status?
hermes status
 
# Mode 2: REPL (MUD-style continuous session)
$ hermes repl
[hermes] > spawn coder
[hermes] > look
[hermes] > say hello everyone
[hermes] > quit

Command Base Class

class Command:
    name: str
    aliases: list[str]
    help_text: str
    lock: str               # e.g. "use:role(admin)"
 
    def at_pre_cmd(self, ctx) -> bool: ...   # return True to abort the command
    def parse(self, ctx): ...
    def func(self, ctx): ...                  # main logic
    def at_post_cmd(self, ctx): ...
 
    def match(self, input_str: str) -> bool:
        """Prefix matching, longest alias first."""
        ...

Command Matching and Disambiguation

1. Exact match
2. Unique prefix match
3. Multiple matches -> prompt for disambiguation

MUD shorthand mapping:

'hello     -> say hello
:thinking  -> emote thinking
2-spawn    -> select 2nd matching command
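
The matching order and the shorthand mapping above can be sketched as follows. The command list is a subset of the commands in this design, the tuple return shape is an assumption, and the N- prefix is interpreted here as "pick the Nth candidate among prefix matches".

```python
import re

COMMANDS = ["spawn", "stop", "status", "say", "tell", "emote", "look", "help", "quit"]


def expand_shorthand(raw: str) -> str:
    if raw.startswith("'"):
        return "say " + raw[1:]
    if raw.startswith(":"):
        return "emote " + raw[1:]
    return raw


def match_command(raw: str, commands=COMMANDS):
    """Return (status, command_or_candidates, remaining_args)."""
    word, _, rest = expand_shorthand(raw.strip()).partition(" ")
    pick = None
    numbered = re.fullmatch(r"(\d+)-(\w+)", word)
    if numbered:
        pick, word = int(numbered.group(1)), numbered.group(2)
    if not word:
        return ("unknown", word, rest)
    hits = [c for c in commands if c.startswith(word)]
    if pick is not None:                       # explicit choice, e.g. "2-s"
        if 1 <= pick <= len(hits):
            return ("ok", hits[pick - 1], rest)
        return ("unknown", word, rest)
    if word in commands:                       # 1. exact match
        return ("ok", word, rest)
    if len(hits) == 1:                         # 2. unique prefix
        return ("ok", hits[0], rest)
    if hits:                                   # 3. ambiguous: caller prompts
        return ("ambiguous", hits, rest)
    return ("unknown", word, rest)
```

For instance, match_command("s") reports an ambiguity among spawn, stop, status, and say, and match_command("2-s") then selects stop.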

Command Execution Pipeline

execute_command(raw_input):
  1. Match command (with MUD shorthand fallback)
  2. Permission check (lock string)
  3. at_pre_cmd() hook (can abort)
  4. parse() arguments
  5. func() execution
  6. at_post_cmd() cleanup
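
Steps 2 through 6 of the pipeline can be sketched once a command has been matched. The has_access callable stands in for the lock check, ctx is a plain dict, and running at_post_cmd in a finally block is a design choice of this sketch, not something the design prescribes.

```python
class Command:
    name = ""
    lock = "use:true()"

    def at_pre_cmd(self, ctx) -> bool:
        return False                          # True aborts before parse()

    def parse(self, ctx):
        ctx["args"] = ctx["raw_args"].split()

    def func(self, ctx):
        raise NotImplementedError

    def at_post_cmd(self, ctx):
        ctx["cleaned_up"] = True


class Say(Command):
    name = "say"

    def at_pre_cmd(self, ctx) -> bool:
        return not ctx["raw_args"].strip()    # nothing to say: abort

    def func(self, ctx):
        return f'{ctx["caller"]} says: {" ".join(ctx["args"])}'


def execute_command(cmd: Command, ctx: dict, has_access):
    if not has_access(cmd.lock, ctx["caller"]):   # 2. permission check
        return "denied"
    if cmd.at_pre_cmd(ctx):                       # 3. pre-hook can abort
        return "aborted"
    cmd.parse(ctx)                                # 4. parse arguments
    try:
        return cmd.func(ctx)                      # 5. main logic
    finally:
        cmd.at_post_cmd(ctx)                      # 6. cleanup, even on error


def allow_all(lock, caller):
    return True


ctx = {"caller": "user:1", "raw_args": "hello  everyone"}
result = execute_command(Say(), ctx, allow_all)
```

A denied or aborted command never reaches parse(), so at_post_cmd only runs for commands that actually started executing.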

Command List

Category                Commands
Agent management        spawn, stop, pause, resume, restart, status
Communication           say, tell, emote, channel, channels, history
Capability/Permission   grant, revoke, capabilities
Template                template list, template show, template apply
Task                    task list, task submit, task show
Info                    look (l), help, quit

Redis Data Layout Summary

# Agent state
hermes:agent:{id}                          Hash    agent metadata
hermes:agent:{id}:lifecycle                Hash    pause metadata
hermes:agent:{id}:template_key             String  template name

# Capability sets
hermes:agent:{id}:cap_sources              ZSet    (source, priority)
hermes:agent:{id}:caps:{source}            Hash    cap_name -> metadata_json
hermes:agent:{id}:caps_merged              Hash    cached merge result (TTL 60s)

# ACL
hermes:agent:{id}:locks                    Hash    access_type -> lock_string

# Channels
hermes:channel:{name}                      Hash    channel metadata
hermes:channel:{name}:subscribers          Set     agent_ids
hermes:channel:{name}:banlist              Set     banned agent_ids
hermes:channel:{name}:mutelist             Set     muted agent_ids
hermes:channel:{name}:stream               Stream  message history (maxlen=1000)
hermes:channel:registry                    Set     all channel names

# Templates
hermes:template:{name}                     Hash    resolved flat config

# Ticker pool
hermes:ticker:{interval}                   Set     subscriber JSON entries

# Template→instance index
hermes:template:{name}:instances           Set     spawned agent_ids

Evennia Pattern Mapping

Hermes Component   Evennia Source         Key Adaptation
Agent Entity       Typeclass + IDMapper   Redis Hash replaces Django model; simple dict cache replaces metaclass
Capability Set     CmdSet merge engine    Same 4 set operations; capabilities replace commands
ACL                Lock system            Same string format + safe eval; lock functions bind to Redis state
Channel            Channel/Comms system   Redis pub-sub + Stream replaces Django Msg; same hook chain
Lifecycle          Script state machine   ARQ worker states replace Twisted LoopingCall
Template           Prototype spawning     YAML + callable syntax; same inheritance semantics
CLI                Command + cmdparser    click + REPL; same match/disambiguate pipeline

Implementation Priority

Suggested build order (each builds on the previous):

  1. Agent Entity Model — foundation, everything depends on it
  2. ACL System — small, self-contained, immediately useful
  3. Capability Set — builds on Agent, enables permission-aware dispatch
  4. Channel System — real-time collaboration, builds on ACL for access control
  5. Template System — builds on Capability + ACL for template-driven spawning
  6. CLI Command Layer — ties everything together into MUD-style interface
  7. Lifecycle Management — cross-cutting, can be added incrementally