etha.tensor_bus.client#

Tensor Bus Client (Host-side).

Host processes use TensorBusClient to register pairs and communicate with Agents.

Attributes#

logger

Classes#

BatchHandler

Handler for batch tensor operations across multiple pairs.

TensorBusClient

Host-side Tensor Bus Client.

Functions#

generate_semaphore_name(command_type, context_id)

Module Contents#

class etha.tensor_bus.client.BatchHandler(client: TensorBusClient, batch_id: str, pair_names: list[str])#

Handler for batch tensor operations across multiple pairs.

Initialize BatchHandler.

Parameters:
  • client – TensorBusClient instance

  • batch_id – Unique identifier for this batch

  • pair_names – List of pair names managed by this handler

close(blocking: bool = True, timeout: float = 30.0)#

Explicitly clean up batch state in the Agent.

Sends a CleanupBatch command to free the resources associated with this batch. After close() has been called, the handler must not be used again.

Parameters:
  • blocking – If True, block until cleanup completes

  • timeout – Timeout in seconds
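
Because both arguments of close() have defaults, a handler can be wrapped in contextlib.closing so that cleanup still runs if the transfer raises. The following is a minimal sketch, not the library's prescribed usage. StubHandler is a hypothetical stand-in that only records calls, so the example runs without an Agent; send_and_cleanup is likewise an illustrative name.

```python
from contextlib import closing


class StubHandler:
    """Hypothetical stand-in for BatchHandler; records calls instead of contacting an Agent."""

    def __init__(self):
        self.calls = []

    def transfer(self, transfer_type, blocking=False, timeout=30.0):
        self.calls.append(("transfer", transfer_type, blocking, timeout))

    def close(self, blocking=True, timeout=30.0):
        self.calls.append(("close", blocking, timeout))


def send_and_cleanup(handler):
    # closing() calls handler.close() with no arguments on exit,
    # so the documented defaults (blocking=True, timeout=30.0) apply.
    with closing(handler) as h:
        h.transfer("send", blocking=True)


stub = StubHandler()
send_and_cleanup(stub)
print(stub.calls)
```

With a real BatchHandler, only the body of send_and_cleanup would be needed.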

query_transfer_signal(blocking: bool = True, timeout: float = 30.0) bool#

Query transfer signal status for this batch.

Returns:

Transfer signal status (True if sender has completed transfer)
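
A receiver may want to wait until the sender has signalled completion. The sketch below polls query_transfer_signal() until it returns True or an overall deadline passes. It assumes each call returns the current status rather than waiting for the signal itself, which the reference does not state. StubHandler, wait_for_sender, poll_interval and deadline are hypothetical names used for illustration.

```python
import time


class StubHandler:
    """Hypothetical stand-in: reports False for the first `ready_after` queries."""

    def __init__(self, ready_after):
        self.remaining = ready_after

    def query_transfer_signal(self, blocking=True, timeout=30.0):
        self.remaining -= 1
        return self.remaining < 0


def wait_for_sender(handler, poll_interval=0.5, deadline=60.0):
    """Poll until the transfer signal is set; return False if the deadline passes first."""
    end = time.monotonic() + deadline
    while True:
        if handler.query_transfer_signal():
            return True
        if time.monotonic() >= end:
            return False
        time.sleep(poll_interval)


ready = wait_for_sender(StubHandler(ready_after=2), poll_interval=0.01)
timed_out = wait_for_sender(StubHandler(ready_after=10**6), poll_interval=0.01, deadline=0.05)
print(ready, timed_out)
```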

transfer(transfer_type: Literal['send', 'recv'], blocking: bool = False, timeout: float = 30.0) posix_ipc.Semaphore#

Transfer all tensors in this batch atomically.

Sends a single Transfer command to execute all pairs in the batch simultaneously using flattened chunks/buckets for optimal performance.

Parameters:
  • transfer_type – "send" or "recv"

  • blocking – If True, block until transfer completes

  • timeout – Timeout in seconds

Returns:

Semaphore for operation completion
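
With blocking=False the caller is responsible for waiting on the returned semaphore. The sketch below shows one way to do that, assuming the Agent releases the semaphore when the transfer finishes (implied by the return description but not spelled out). It relies on posix_ipc.Semaphore.acquire accepting an optional timeout in seconds and raising posix_ipc.BusyError on expiry. StubSemaphore and StubBusyError are hypothetical stand-ins so the example runs without posix_ipc installed; wait_for_transfer is an illustrative helper.

```python
class StubBusyError(Exception):
    """Hypothetical stand-in for posix_ipc.BusyError."""


class StubSemaphore:
    """Hypothetical stand-in for posix_ipc.Semaphore with the same acquire(timeout) shape."""

    def __init__(self, value=0):
        self.value = value

    def acquire(self, timeout=None):
        # A real semaphore would wait up to `timeout` seconds; the stub fails at once.
        if self.value <= 0:
            raise StubBusyError("semaphore not released in time")
        self.value -= 1


def wait_for_transfer(semaphore, timeout, busy_error):
    """Return True if the semaphore was acquired within `timeout` seconds."""
    try:
        semaphore.acquire(timeout)
    except busy_error:
        return False
    return True


done = wait_for_transfer(StubSemaphore(value=1), 5.0, StubBusyError)
pending = wait_for_transfer(StubSemaphore(value=0), 5.0, StubBusyError)
print(done, pending)
```

Against a real handler, the call would look like wait_for_transfer(handler.transfer("recv"), 30.0, posix_ipc.BusyError).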

batch_id#

property client: TensorBusClient#

Get client from weak reference.

Raises:

RuntimeError – If client has been garbage collected
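
The weak-reference behaviour described above can be illustrated with plain Python. This is a generic sketch of the pattern, not the actual BatchHandler code: Owner and Handle are hypothetical classes, and the error message is invented for the example.

```python
import gc
import weakref


class Owner:
    """Hypothetical stand-in for the client object."""


class Handle:
    """Hypothetical handler that holds its owner through a weak reference."""

    def __init__(self, owner):
        self._owner_ref = weakref.ref(owner)

    @property
    def owner(self):
        obj = self._owner_ref()  # returns None once the referent is gone
        if obj is None:
            raise RuntimeError("owner has been garbage collected")
        return obj


owner = Owner()
handle = Handle(owner)
alive = handle.owner is owner

del owner     # drop the only strong reference
gc.collect()  # make collection explicit on interpreters without reference counting
try:
    handle.owner
    message = None
except RuntimeError as exc:
    message = str(exc)
print(alive, message)
```

The practical consequence is that the caller should keep its own reference to the client for as long as any handler created from it is in use.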

pair_names#

class etha.tensor_bus.client.TensorBusClient(agent_rank: int, lmdb_command_queue_path: str | None = None, agent_state_lmdb_path: str | None = None, connection_timeout: float = 30.0)#

Host-side Tensor Bus Client.

Initialize TensorBusClient.

Parameters:
  • lmdb_command_queue_path – Path to Agent’s CommandQueue LMDB

  • agent_state_lmdb_path – Path to Agent’s State LMDB

  • connection_timeout – Max time to wait for Agent connection (seconds)

Raises:

close()#

Clean up resources.

init_pair(pair_name: str, local_name: str, remote_name: str, expected_world_size: int, device_mesh: torch.distributed.device_mesh.DeviceMesh | None = None, placements: tuple[torch.distributed.tensor.placement_types.Placement, Ellipsis] | None = None, blocking: bool = True, timeout: float = 30.0) None#

Register a pair for communication.

Parameters:
  • pair_name – Unique identifier for this pair

  • local_name – Name of local peer

  • remote_name – Name of remote peer

  • expected_world_size – Number of ranks for local peer

  • device_mesh – Local device mesh configuration

  • placements – Local tensor placement strategy. Shard, Replicate and Partial are supported on the source side; Partial is collapsed to Replicate via an all-reduce on the source mesh sub-group before send. Partial on the target side is rejected.

Blocks until:
  • Agent registers to TCPStore

  • Remote peer registers

  • Pair is matched
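
Since a pair is only matched once the remote peer registers, both sides need consistent arguments. The sketch below builds the keyword arguments for each side, assuming the remote peer calls init_pair with the same pair_name and with local_name and remote_name swapped. That mirroring is an assumption inferred from the parameter descriptions. The names "weights", "trainer" and "rollout", the world sizes, and the helper pair_args are all hypothetical.

```python
def pair_args(pair_name, me, peer, world_size):
    """Build init_pair keyword arguments for one side of a pair."""
    return {
        "pair_name": pair_name,
        "local_name": me,
        "remote_name": peer,
        "expected_world_size": world_size,  # number of ranks of the local peer
    }


# Each side names itself as local and the other side as remote.
trainer_side = pair_args("weights", me="trainer", peer="rollout", world_size=8)
rollout_side = pair_args("weights", me="rollout", peer="trainer", world_size=4)
print(trainer_side)
print(rollout_side)
```

Each process would then call client.init_pair(**trainer_side) or client.init_pair(**rollout_side) on its own TensorBusClient.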

query_transfer_signal(batch_id: str, blocking: bool = True, timeout: float = 30.0) bool#

register_tensors(batch_id: str, tensors: list[tuple[torch.Tensor, str]], bucket_size: int | None = None, timeout: float = 30.0) BatchHandler#

Register multiple tensors across pairs.

This operation always blocks until registration completes, ensuring the returned BatchHandler is immediately usable.

Parameters:
  • batch_id – Unique identifier for this batch (must be same on both send/recv sides)

  • tensors – List of (tensor, pair_name) tuples

  • bucket_size – Optional bucket size, in bytes, used for bucketing

  • timeout – Timeout in seconds

Returns:

BatchHandler for managing the registered tensors
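
The sketch below shows one way to prepare the arguments for register_tensors: deriving a batch_id that both sides can compute independently, and flattening a per-pair mapping into the documented list of (tensor, pair_name) tuples. StubClient is a hypothetical stand-in that records its arguments, the placeholder strings stand in for torch.Tensor objects, and the helper names, the "weights-step-N" naming scheme and the 64 MiB bucket size are illustrative assumptions.

```python
class StubClient:
    """Hypothetical stand-in for TensorBusClient; records the registration request."""

    def register_tensors(self, batch_id, tensors, bucket_size=None, timeout=30.0):
        self.last_request = {
            "batch_id": batch_id,
            "pair_names": [pair for _, pair in tensors],
            "bucket_size": bucket_size,
        }
        return self.last_request  # a real client returns a BatchHandler


def make_batch_id(step):
    # Sender and receiver must use the same batch_id, so derive it from shared state.
    return f"weights-step-{step}"


def flatten_for_registration(tensors_by_pair):
    """Turn {pair_name: [tensor, ...]} into the list of (tensor, pair_name) tuples."""
    return [(t, pair) for pair, ts in tensors_by_pair.items() for t in ts]


tensors_by_pair = {"policy": ["w0", "w1"], "value": ["v0"]}  # placeholders for tensors
batch = flatten_for_registration(tensors_by_pair)

client = StubClient()
request = client.register_tensors(
    make_batch_id(step=12),
    batch,
    bucket_size=64 * 1024 * 1024,  # 64 MiB, an arbitrary illustrative value
)
print(request)
```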

transfer(batch_id: str, transfer_type: Literal['send', 'recv'], blocking: bool = False, timeout: float = 30.0) posix_ipc.Semaphore#

agent_rank#

agent_state_lmdb_path = None#

command_queue#

state_db = None#

state_env: lmdb.Environment | None = None#

etha.tensor_bus.client.generate_semaphore_name(command_type: str, context_id: str) str#

etha.tensor_bus.client.logger#