etha.tensor_bus.commands#

Command (Host to Tensor Bus) Definitions and State Structures.

Attributes#

Classes#

BaseCommand

Base class for all Tensor Bus commands.

CleanupBatch

Clean up a batch's state in the agent.

InitPair

Initialize a pair that maps one (Device Mesh + Placement) to another (Device Mesh + Placement).

QueryStatus

Query status for a batch.

RegisterTensors

Register multiple tensors for zero-copy sharing between processes.

Transfer

Command to transfer tensors for a specific batch.

Module Contents#

class etha.tensor_bus.commands.BaseCommand#

Bases: msgspec.Struct

Base class for all Tensor Bus commands.

Features:
  • Auto-tagging: uses the class name as the type tag

  • Common timestamp field for all commands

  • Optional semaphore for completion notification

semaphore_name: str | None = None#
timestamp: float | None = None#
class etha.tensor_bus.commands.CleanupBatch#

Bases: BaseCommand

Clean up a batch’s state in the agent.

batch_id: str#
class etha.tensor_bus.commands.InitPair#

Bases: BaseCommand

Initialize a pair that maps one (Device Mesh + Placement) to another (Device Mesh + Placement).

Parameters:
  • pair_name – Unique identifier for this pair (e.g., “obs”, “action”)

  • local_name – Name of local peer (e.g., “inference”, “training”)

  • expected_world_size – Number of ranks for local peer

  • remote_name – Name of remote peer (explicit pairing)

  • mesh_shape_payload – Serialized mesh shape tuple as memoryview

  • placements_payload – Serialized placements tuple as memoryview

expected_world_size: int#
local_name: str#
mesh_shape_payload: memoryview | None = None#
pair_name: str#
placements_payload: memoryview | None = None#
remote_name: str#
class etha.tensor_bus.commands.QueryStatus#

Bases: BaseCommand

Query status for a batch.

batch_id: str#
state_name: str#
class etha.tensor_bus.commands.RegisterTensors#

Bases: BaseCommand

Register multiple tensors for zero-copy sharing between processes.

Creates a new batch with a unique batch_id. Multiple tensors can be registered across different pairs in a single batch, enabling efficient cross-pair execution via flattened chunks/buckets.

batch_id: str#
bucket_size: int | None = None#
tensors: list[tuple[str, memoryview]]#
class etha.tensor_bus.commands.Transfer#

Bases: BaseCommand

Command to transfer tensors for a specific batch.

batch_id: str#
transfer_type: Literal['send', 'recv']#
etha.tensor_bus.commands.Message#