etha.tensor_bus.client
Tensor Bus Client (Host-side).
Host processes use TensorBusClient to register pairs and communicate with Agents.
Attributes
logger
Classes
BatchHandler – Handler for batch tensor operations across multiple pairs.
TensorBusClient – Host-side Tensor Bus Client.
Functions
Module Contents
- class etha.tensor_bus.client.BatchHandler(client: TensorBusClient, batch_id: str, pair_names: list[str])
Handler for batch tensor operations across multiple pairs.
Initialize BatchHandler.
- Parameters:
client – TensorBusClient instance
batch_id – Unique identifier for this batch
pair_names – List of pair names managed by this handler
- close(blocking: bool = True, timeout: float = 30.0)
Explicitly clean up batch state in the Agent.
Sends a CleanupBatch command to free resources associated with this batch. After calling close(), this handler should not be used.
- Parameters:
blocking – If True, block until cleanup completes
timeout – Timeout in seconds
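The close() contract above (send CleanupBatch once, then stop using the handler) can be enforced on the caller's side with a small guard. The sketch below is an assumption, not etha's implementation. ClosableHandler and its send_command callback are hypothetical stand-ins for BatchHandler and the Agent command queue. The documented BatchHandler only says it should not be used after close(). It does not say that it raises.

```python
from typing import Callable


class ClosableHandler:
    """Minimal sketch: a batch handler that rejects use after close()."""

    def __init__(self, batch_id: str, send_command: Callable[[str, str], None]) -> None:
        self.batch_id = batch_id
        # Hypothetical callback standing in for the Agent command queue.
        self._send_command = send_command
        self._closed = False

    def _check_open(self) -> None:
        # Refuse any operation once cleanup has been requested.
        if self._closed:
            raise RuntimeError(f"batch {self.batch_id!r} has already been closed")

    def transfer(self, transfer_type: str) -> None:
        self._check_open()
        self._send_command(f"Transfer:{transfer_type}", self.batch_id)

    def close(self) -> None:
        if self._closed:
            return  # a second close() is a no-op
        self._send_command("CleanupBatch", self.batch_id)
        self._closed = True
```

Making close() idempotent lets it sit safely in a finally block even when an earlier code path already closed the handler.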
- query_transfer_signal(blocking: bool = True, timeout: float = 30.0) → bool
Query transfer signal status for this batch.
- Returns:
Transfer signal status: True if the sender has completed the transfer.
- transfer(transfer_type: Literal['send', 'recv'], blocking: bool = False, timeout: float = 30.0) → posix_ipc.Semaphore
Transfer all tensors in this batch atomically.
Sends a single Transfer command that executes all pairs in the batch together, using flattened chunks/buckets for better performance.
- Parameters:
transfer_type – “send” or “recv”
blocking – If True, block until transfer completes
timeout – Timeout in seconds
- Returns:
Semaphore that signals operation completion
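With blocking=False, the returned semaphore lets the host overlap other work with the transfer and wait later. The sketch below shows that pattern under two assumptions. First, the caller waits by acquiring the semaphore. Second, threading.Semaphore stands in for posix_ipc.Semaphore, so the example runs without an Agent. start_transfer is a hypothetical stand-in for a non-blocking transfer() call. Note that posix_ipc reports a timeout differently: its Semaphore.acquire raises posix_ipc.BusyError, while threading's returns False.

```python
import threading
import time


def start_transfer(duration: float) -> threading.Semaphore:
    """Hypothetical stand-in for a non-blocking transfer(...) call.

    Returns a semaphore that starts at 0 and is released once the
    simulated background work finishes.
    """
    done = threading.Semaphore(0)

    def worker() -> None:
        time.sleep(duration)  # simulate the Agent performing the transfer
        done.release()        # signal completion to the caller

    threading.Thread(target=worker, daemon=True).start()
    return done


sem = start_transfer(0.05)
# ... overlap other host-side work here ...
completed = sem.acquire(timeout=30.0)  # True once the work signals, False on timeout
```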
- batch_id
- property client: TensorBusClient
Get client from weak reference.
- Raises:
RuntimeError – If client has been garbage collected
- pair_names
- class etha.tensor_bus.client.TensorBusClient(agent_rank: int, lmdb_command_queue_path: str | None = None, agent_state_lmdb_path: str | None = None, connection_timeout: float = 30.0)
Host-side Tensor Bus Client.
Initialize TensorBusClient.
- Parameters:
lmdb_command_queue_path – Path to Agent’s CommandQueue LMDB
agent_state_lmdb_path – Path to Agent’s State LMDB
connection_timeout – Max time to wait for Agent connection (seconds)
- Raises:
ValueError – If agent_state_lmdb_path is not provided
ConnectionError – If Agent is not found or not responding
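The constructor's documented failure modes are a ValueError for a missing state LMDB path and a ConnectionError when the Agent does not answer within connection_timeout. Both map onto a validate-then-poll-with-deadline loop. The sketch below is a hypothetical illustration of that behavior, not the client's actual connection logic. connect_with_timeout, probe, and poll_interval are assumed names.

```python
import time
from typing import Callable, Optional


def connect_with_timeout(
    agent_state_lmdb_path: Optional[str],
    probe: Callable[[], bool],
    connection_timeout: float = 30.0,
    poll_interval: float = 0.1,
) -> None:
    # Mirror the documented ValueError: the state LMDB path is required.
    if agent_state_lmdb_path is None:
        raise ValueError("agent_state_lmdb_path must be provided")
    # time.monotonic() is unaffected by wall-clock changes, so the deadline is reliable.
    deadline = time.monotonic() + connection_timeout
    while time.monotonic() < deadline:
        if probe():  # hypothetical liveness check against the Agent
            return
        time.sleep(poll_interval)
    raise ConnectionError(f"Agent did not respond within {connection_timeout} s")
```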
- close()
Clean up resources.
- init_pair(pair_name: str, local_name: str, remote_name: str, expected_world_size: int, device_mesh: torch.distributed.device_mesh.DeviceMesh | None = None, placements: tuple[torch.distributed.tensor.placement_types.Placement, ...] | None = None, blocking: bool = True, timeout: float = 30.0) → None
Register a pair for communication.
- Parameters:
pair_name – Unique identifier for this pair
local_name – Name of local peer
remote_name – Name of remote peer
expected_world_size – Number of ranks in the local peer
device_mesh – Local device mesh configuration
placements – Local tensor placement strategy.
Shard, Replicate, and Partial are supported on the source side; Partial is collapsed to Replicate via an all-reduce on the source mesh sub-group before sending. Partial on the target side is rejected.
- Blocks until:
The Agent registers with the TCPStore
The remote peer registers
The pair is matched
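Collapsing Partial to Replicate works because a Partial placement means each rank holds one contribution, and the full value is their reduction. After an all-reduce, every rank holds that full value, which is exactly a Replicate layout. The pure-Python simulation below assumes a sum reduction, which is the default reduce_op for PyTorch's Partial placement. How etha handles other reduce ops is not documented here. all_reduce_sum is a hypothetical helper. In PyTorch itself, the same effect comes from redistributing a DTensor to [Replicate()], for example with DTensor.redistribute.

```python
from typing import List


def all_reduce_sum(partials: List[List[float]]) -> List[List[float]]:
    """Simulate an all-reduce (sum) over a mesh sub-group.

    partials[r] is rank r's local partial tensor (flattened). After the
    all-reduce every rank holds the element-wise sum, i.e. a Replicate layout.
    """
    total = [sum(vals) for vals in zip(*partials)]
    # Each rank gets its own copy of the reduced result.
    return [list(total) for _ in partials]


# Three ranks, each holding a partial contribution to a 2-element tensor.
partials = [[1.0, 2.0], [3.0, 4.0], [5.0, 6.0]]
replicated = all_reduce_sum(partials)
```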
- register_tensors(batch_id: str, tensors: list[tuple[torch.Tensor, str]], bucket_size: int | None = None, timeout: float = 30.0) → BatchHandler
Register multiple tensors across pairs.
This operation always blocks until registration completes, ensuring the returned BatchHandler is immediately usable.
- Parameters:
batch_id – Unique identifier for this batch (must be the same on both the send and recv sides)
tensors – List of (tensor, pair_name) tuples
bucket_size – Optional bucket size in bytes used for bucketization
timeout – Timeout in seconds
- Returns:
BatchHandler for managing the registered tensors
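The bucket_size parameter is not described further here. One common way to apply a byte budget is greedy bucketing, and the sketch below assumes that approach: tensors are packed in order into buckets whose total size stays within bucket_size, and a tensor larger than the budget gets its own bucket. greedy_buckets is a hypothetical helper, not part of etha, and the Agent may bucket differently. Sizes are plain byte counts. For a torch.Tensor, they would be tensor.numel() * tensor.element_size().

```python
from typing import List, Sequence


def greedy_buckets(sizes: Sequence[int], bucket_size: int) -> List[List[int]]:
    """Group tensor indices into buckets whose total byte size stays <= bucket_size.

    A tensor larger than bucket_size is placed in a bucket of its own.
    """
    buckets: List[List[int]] = []
    current: List[int] = []
    current_bytes = 0
    for i, nbytes in enumerate(sizes):
        # Close the current bucket if adding this tensor would exceed the budget.
        if current and current_bytes + nbytes > bucket_size:
            buckets.append(current)
            current, current_bytes = [], 0
        current.append(i)
        current_bytes += nbytes
    if current:
        buckets.append(current)
    return buckets


buckets = greedy_buckets([400, 300, 500, 2000, 100], bucket_size=1000)
```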
- transfer(batch_id: str, transfer_type: Literal['send', 'recv'], blocking: bool = False, timeout: float = 30.0) → posix_ipc.Semaphore
- agent_rank
- agent_state_lmdb_path = None
- command_queue
- state_db = None
- etha.tensor_bus.client.logger