Worker-Agent Connection Validation#
Overview#
This document describes the connection validation mechanism between Worker processes and Agent processes in the Tensor Bus system.
Design Level: Level 1 - Basic Validation + Error Handling
Philosophy: Simple, robust, following the KISS principle. Approximately 20 lines of core code.
Problem Statement#
Workers and Agents run in separate process groups, each launched by its own torchrun invocation. This raises four connection challenges:
Agent Existence: How does a Worker know that the Agent exists and is running?
Connection Verification: How can a Worker detect that the Agent has crashed or is unresponsive?
Error Handling: How can a failed connection produce a friendly error message?
Path Correctness: How can a Worker detect that it is connecting to the wrong agent_rank?
Architecture#
Communication Channels#
Worker → Agent: CommandQueue LMDB (commands)
Agent → Worker: State LMDB (pair states + heartbeat)
Data Structure#
The Agent maintains a heartbeat in the State LMDB, alongside the existing pair states:
# State LMDB keys
b"agent:heartbeat" -> b"1234567890.123" # Unix timestamp
b"pair:{name}:state" -> msgpack(PairState) # Existing pair states
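The key layout above can be illustrated with a few small helpers. This is a minimal sketch: the names `HEARTBEAT_KEY`, `encode_heartbeat`, `decode_heartbeat`, and `pair_state_key` are hypothetical and are not taken from the Tensor Bus codebase. It only mirrors the encoding shown in this document (timestamp as a decimal string, pair keys built from the pair name).

```python
HEARTBEAT_KEY = b"agent:heartbeat"  # key shown in the layout above


def encode_heartbeat(now: float) -> bytes:
    """Encode a Unix timestamp as the decimal string stored under HEARTBEAT_KEY."""
    return str(now).encode()


def decode_heartbeat(raw: bytes) -> float:
    """Decode a stored heartbeat value back into a Unix timestamp."""
    return float(raw.decode())


def pair_state_key(name: str) -> bytes:
    """Build the State LMDB key for a pair's state record (hypothetical helper)."""
    return f"pair:{name}:state".encode()
```

Storing the timestamp as text keeps the value readable with generic LMDB inspection tools, at the cost of a string conversion on every read and write.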
Connection Flow#
sequenceDiagram
participant W as Worker Process
participant LMDB as Agent State LMDB
participant CQ as CommandQueue LMDB
participant A as Agent Process
Note over A: Agent Startup
A->>LMDB: Create State LMDB
A->>LMDB: Write heartbeat (timestamp)
A->>CQ: Create CommandQueue LMDB
Note over A: Enter main loop
Note over W: Worker Startup (determines agent_rank)
W->>W: agent_rank = AGENT_RANK or LOCAL_RANK + OFFSET
Note over W: Connection Validation Phase
W->>LMDB: Check if State LMDB exists
alt LMDB does not exist
LMDB-->>W: FileNotFoundError
W->>W: Wait 100ms, retry
Note over W: Retry until timeout (30s)
W-->>W: Raise ConnectionError("Agent not found")
end
W->>LMDB: Read heartbeat
W->>W: Check if heartbeat fresh (< 5s old)
alt Heartbeat too old
W->>W: Wait 100ms, retry
Note over W: Retry until timeout (30s)
W-->>W: Raise ConnectionError("Agent not responding")
end
Note over W: Connection established ✓
W->>CQ: Enqueue RegisterPair command
Note over A: Main Loop Iteration
A->>LMDB: Update heartbeat (every iteration)
A->>CQ: Check for commands
CQ-->>A: RegisterPair message
A->>A: Process RegisterPair
A->>LMDB: Write PairState
Note over W: Wait for pair matching
loop Poll State LMDB
W->>LMDB: Read PairState
alt PairState.status == "matched"
LMDB-->>W: PairState
W->>W: Return PairHandler ✓
else Still waiting
W->>W: Sleep 10ms, continue polling
end
end
Note over W,A: Normal operation
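The final polling phase of the diagram can be sketched as a small loop. This is an illustration under assumptions, not the client's actual code: `wait_for_matched` and `read_status` are hypothetical names, the State LMDB read is abstracted behind a callable, and the overall timeout is an addition that the diagram does not specify.

```python
import time
from typing import Callable, Optional


def wait_for_matched(
    read_status: Callable[[], Optional[str]],
    timeout: float = 30.0,        # assumption: the diagram shows no timeout here
    poll_interval: float = 0.01,  # 10 ms, as in the diagram
) -> None:
    """Poll until read_status() returns "matched" or the timeout expires."""
    deadline = time.monotonic() + timeout
    while True:
        if read_status() == "matched":
            return
        if time.monotonic() >= deadline:
            raise TimeoutError("pair was not matched before the timeout")
        time.sleep(poll_interval)
```

In a real client, `read_status` would read and unpack the PairState record from the State LMDB; passing it in as a callable keeps the loop testable without an Agent.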
Implementation#
Agent Side: Heartbeat Mechanism#
File: src/etha/tensor_bus/agent.py
import logging
import time

import lmdb

logger = logging.getLogger(__name__)


class TensorBusAgent:
    def __init__(self, ..., lmdb_state_path: str | None = None):
        # ... existing initialization ...
        # Default to None so _update_heartbeat() is a no-op without a state path
        self.state_env = None
        self.state_db = None
        if lmdb_state_path:
            self.state_env = lmdb.open(lmdb_state_path, ...)
            self.state_db = self.state_env.open_db(b"pair_state")
            # Write initial heartbeat
            self._update_heartbeat()
            logger.info(f"Agent {rank}: Initial heartbeat written")

    def run(self):
        """Main loop: update heartbeat and process commands."""
        while True:
            # Update heartbeat every iteration (~1ms interval)
            self._update_heartbeat()
            # Process commands from CommandQueue
            if self.command_queue.size() != 0:
                msg = self.command_queue.dequeue()
                if msg is not None:
                    self._handle_command(msg)
            time.sleep(0.001)

    def _update_heartbeat(self):
        """Write current timestamp to State LMDB."""
        if self.state_env is not None and self.state_db is not None:
            with self.state_env.begin(write=True, db=self.state_db) as txn:
                txn.put(b"agent:heartbeat", str(time.time()).encode())
Worker Side: Connection Validation#
File: src/etha/tensor_bus/client.py
import logging
import os
import time

import lmdb

from .command_queue import CommandQueue

logger = logging.getLogger(__name__)


class TensorBusClient:
    def __init__(
        self,
        lmdb_command_queue_path: str,
        agent_state_lmdb_path: str,
    ):
        # Phase 1: Validate Agent is running
        self._validate_agent_connection(agent_state_lmdb_path, timeout=30.0)
        # Phase 2: Open connections (safe now)
        self.state_env = lmdb.open(agent_state_lmdb_path, readonly=True, ...)
        self.state_db = self.state_env.open_db(b"pair_state")
        self.command_queue = CommandQueue(lmdb_command_queue_path)

    def _validate_agent_connection(self, path: str, timeout: float):
        """Validate Agent is running and responsive.

        Checks:
        1. LMDB file exists
        2. Heartbeat is fresh (< 5 seconds old)

        Raises:
            ConnectionError: Agent not found or not responding
        """
        start_time = time.time()
        # Step 1: Wait for LMDB file to exist
        while not os.path.exists(path):
            if time.time() - start_time > timeout:
                raise ConnectionError(
                    f"Agent LMDB not found at {path}. "
                    "Please ensure Agent process is running. "
                    "Check AGENT_RANK environment variable."
                )
            time.sleep(0.1)
        # Step 2: Verify heartbeat is fresh
        env = lmdb.open(path, readonly=True, lock=False, subdir=False, max_dbs=2)
        try:
            db = env.open_db(b"pair_state")
            while True:
                # Read inside a short transaction; evaluate after it has ended
                with env.begin(db=db) as txn:
                    heartbeat_bytes = txn.get(b"agent:heartbeat")
                if heartbeat_bytes:
                    age = time.time() - float(heartbeat_bytes.decode())
                    if age < 5.0:  # Heartbeat is fresh
                        logger.info(f"Agent connection validated (heartbeat age: {age:.2f}s)")
                        return  # Success!
                if time.time() - start_time > timeout:
                    raise ConnectionError(
                        f"Agent at {path} is not responding. "
                        "Heartbeat age > 5s. Agent may have crashed."
                    )
                time.sleep(0.1)
        finally:
            # Close the temporary environment on every exit path,
            # and only after the read transaction has finished
            env.close()
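The freshness rule used in Step 2 can be isolated as a pure function, which makes it testable without LMDB or a running Agent. This is a minimal sketch with hypothetical names (`heartbeat_age`, `is_fresh`, `MAX_HEARTBEAT_AGE`). Treating a missing or unparsable value as "not fresh" is an assumption; the code above does not guard against a malformed value.

```python
from typing import Optional

MAX_HEARTBEAT_AGE = 5.0  # seconds, matching the threshold used above


def heartbeat_age(raw: Optional[bytes], now: float) -> Optional[float]:
    """Return the heartbeat age in seconds, or None if it is missing or malformed."""
    if not raw:
        return None
    try:
        return now - float(raw.decode())
    except ValueError:  # also covers UnicodeDecodeError, a ValueError subclass
        return None


def is_fresh(raw: Optional[bytes], now: float, max_age: float = MAX_HEARTBEAT_AGE) -> bool:
    """True if a heartbeat exists and is younger than max_age seconds."""
    age = heartbeat_age(raw, now)
    return age is not None and age < max_age
```

Because the heartbeat is a wall-clock timestamp, the comparison is only meaningful when Worker and Agent share a clock, which holds when both run on the same host.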
Error Scenarios#
| Scenario | Detection | Error Message | Resolution |
|---|---|---|---|
| Agent not started | LMDB file missing | ConnectionError: Agent LMDB not found at {path}. ... | Start Agent process first |
| Agent crashed | Heartbeat too old (> 5s) | ConnectionError: Agent at {path} is not responding. ... | Restart Agent process |
| Wrong AGENT_RANK | LMDB path incorrect | ConnectionError: Agent LMDB not found at {path}. ... | Fix AGENT_RANK value |
| LMDB permission error | Open fails | LMDB exception with file permissions | Fix file permissions |
Configuration#
Environment Variables#
# Worker side: Specify which Agent to connect to
AGENT_RANK=0 # Direct specification (priority 1)
# OR
AGENT_RANK_OFFSET=0 # Offset-based calculation (priority 2)
LOCAL_RANK=0 # From torchrun
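The priority order of these variables can be sketched as follows. `resolve_agent_rank` is a hypothetical helper, not the project's API. Falling back to 0 when LOCAL_RANK or AGENT_RANK_OFFSET is unset is an assumption made for this sketch.

```python
import os
from typing import Mapping, Optional


def resolve_agent_rank(env: Optional[Mapping[str, str]] = None) -> int:
    """Resolve the Agent rank: AGENT_RANK first, else LOCAL_RANK + AGENT_RANK_OFFSET."""
    env = os.environ if env is None else env
    # Priority 1: direct specification
    if "AGENT_RANK" in env:
        return int(env["AGENT_RANK"])
    # Priority 2: offset-based calculation (defaults of 0 are an assumption)
    local_rank = int(env.get("LOCAL_RANK", "0"))
    offset = int(env.get("AGENT_RANK_OFFSET", "0"))
    return local_rank + offset
```

For example, `resolve_agent_rank({"LOCAL_RANK": "2", "AGENT_RANK_OFFSET": "8"})` yields 10, while an explicit `AGENT_RANK` overrides both other variables.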
Benefits#
✅ Simple: ~20 lines of core code, minimal complexity
✅ Robust: Covers the Agent-not-found, Agent-crashed, and wrong-path scenarios
✅ Friendly: Clear error messages with actionable guidance
✅ Low overhead: Heartbeat updates reuse the main loop, so no extra threads are needed
✅ Non-intrusive: No changes to the existing RegisterPair protocol
Limitations (Future Work - Level 2)#
This Level 1 design does NOT prevent:
❌ Multiple Workers connecting to same Agent: No mutual exclusion mechanism
❌ Agent tracking connected Workers: Agent doesn’t know who’s connected
❌ Automatic reconnection: No recovery from connection loss during operation
These features require Level 2 implementation:
Explicit Connect/ACK handshake protocol
Connection state machine
Worker identity registration in Agent
Testing#
Test Cases#
# Test 1: Agent not running
AGENT_RANK=0 python worker.py
# Expected: ConnectionError after 30s timeout
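# Test 2: Agent crashes after connection
# - Start Agent
# - Worker connects successfully
# - Kill Agent
# - A new Worker connection should fail on the stale heartbeat after the timeout
#   (Level 1 validates only at connect time; a connected Worker is not notified)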
# Test 3: Wrong AGENT_RANK
AGENT_RANK=99 python worker.py # No such Agent
# Expected: ConnectionError with clear message
# Test 4: Normal operation
# - Start Agent rank 0
# - Start Worker with AGENT_RANK=0
# Expected: Connection succeeds, heartbeat validated
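Test 1 can be automated without waiting the full 30 seconds by exercising the file-wait step with a short timeout. The sketch below uses a stand-in function, `wait_for_path`, that mirrors Step 1 of the validation. It is hypothetical and does not import the real TensorBusClient, and the file names are made up for illustration.

```python
import os
import tempfile
import time


def wait_for_path(path: str, timeout: float, poll_interval: float = 0.1) -> None:
    """Stand-in for Step 1 of validation: wait until path exists or time out."""
    deadline = time.monotonic() + timeout
    while not os.path.exists(path):
        if time.monotonic() >= deadline:
            raise ConnectionError(f"Agent LMDB not found at {path}")
        time.sleep(poll_interval)


def test_agent_not_running() -> None:
    with tempfile.TemporaryDirectory() as tmp:
        missing = os.path.join(tmp, "agent_99_state.lmdb")  # hypothetical file name
        try:
            wait_for_path(missing, timeout=0.05, poll_interval=0.01)
        except ConnectionError as exc:
            assert missing in str(exc)
        else:
            raise AssertionError("expected ConnectionError")


def test_agent_present() -> None:
    with tempfile.TemporaryDirectory() as tmp:
        present = os.path.join(tmp, "agent_0_state.lmdb")  # hypothetical file name
        open(present, "wb").close()  # simulate the Agent having created its file
        wait_for_path(present, timeout=0.05, poll_interval=0.01)
```

Both functions follow the pytest naming convention, so they would be collected automatically if placed in a test module.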
Manual Testing#
# Terminal 1: Start Agent
torchrun --nproc_per_node=8 prototyping/pair_registration_demo/agent.py
# Terminal 2: Try to connect before Agent is ready
AGENT_RANK=0 python -c "from etha.tensor_bus import TensorBusClient; ..."
# Should retry and eventually succeed
# Terminal 3: Connect to non-existent Agent
AGENT_RANK=99 python -c "from etha.tensor_bus import TensorBusClient; ..."
# Should fail with clear error after timeout
References#
Bootstrap mechanism: docs/bootstrap-mechanism.md (convention-based path naming)
CommandQueue implementation: src/etha/tensor_bus/command_queue.py
Agent implementation: src/etha/tensor_bus/agent.py
Client implementation: src/etha/tensor_bus/client.py