etha.tensor_bus.client
======================

.. py:module:: etha.tensor_bus.client

.. autoapi-nested-parse::

   Tensor Bus Client (Host-side).

   Host processes use TensorBusClient to register pairs and communicate with Agents.



Attributes
----------

.. autoapisummary::

   etha.tensor_bus.client.logger


Classes
-------

.. autoapisummary::

   etha.tensor_bus.client.BatchHandler
   etha.tensor_bus.client.TensorBusClient


Functions
---------

.. autoapisummary::

   etha.tensor_bus.client.generate_semaphore_name


Module Contents
---------------

.. py:class:: BatchHandler(client: TensorBusClient, batch_id: str, pair_names: list[str])

   Handler for batch tensor operations across multiple pairs.

   Initialize BatchHandler.

   :param client: TensorBusClient instance
   :param batch_id: Unique identifier for this batch
   :param pair_names: List of pair names managed by this handler


   .. py:method:: close(blocking: bool = True, timeout: float = 30.0)

      Explicitly cleanup batch state in agent.

      Sends a CleanupBatch command to free resources associated with this batch.
      After calling close(), this handler should not be used.

      :param blocking: If True, block until cleanup completes
      :param timeout: Timeout in seconds



   .. py:method:: query_transfer_signal(blocking: bool = True, timeout: float = 30.0) -> bool

      Query transfer signal status for this batch.

      :returns: Transfer signal status (True if sender has completed transfer)



   .. py:method:: transfer(transfer_type: Literal['send', 'recv'], blocking: bool = False, timeout: float = 30.0) -> posix_ipc.Semaphore

      Transfer all tensors in this batch atomically.

      Sends a single Transfer command to execute all pairs in the batch simultaneously
      using flattened chunks/buckets for optimal performance.

      :param transfer_type: "send" or "recv"
      :param blocking: If True, block until transfer completes
      :param timeout: Timeout in seconds

      :returns: Semaphore for operation completion



   .. py:attribute:: batch_id


   .. py:property:: client
      :type: TensorBusClient


      Get client from weak reference.

      :raises RuntimeError: If client has been garbage collected


   .. py:attribute:: pair_names


.. py:class:: TensorBusClient(agent_rank: int, lmdb_command_queue_path: str | None = None, agent_state_lmdb_path: str | None = None, connection_timeout: float = 30.0)

   Host-side Tensor Bus Client.

   Initialize TensorBusClient.

   :param lmdb_command_queue_path: Path to Agent's CommandQueue LMDB
   :param agent_state_lmdb_path: Path to Agent's State LMDB
   :param connection_timeout: Max time to wait for Agent connection (seconds)

   :raises ValueError: If agent_state_lmdb_path is not provided
   :raises ConnectionError: If Agent is not found or not responding


   .. py:method:: close()

      Cleanup resources.



   .. py:method:: init_pair(pair_name: str, local_name: str, remote_name: str, expected_world_size: int, device_mesh: torch.distributed.device_mesh.DeviceMesh | None = None, placements: tuple[torch.distributed.tensor.placement_types.Placement, Ellipsis] | None = None, blocking: bool = True, timeout: float = 30.0) -> None

      Register a pair for communication.

      :param pair_name: Unique identifier for this pair
      :param local_name: Name of local peer
      :param remote_name: Name of remote peer
      :param expected_world_size: Number of ranks for local peer
      :param device_mesh: Local device mesh configuration
      :param placements: Local tensor placement strategy. ``Shard``, ``Replicate``
                         and ``Partial`` are supported on the source side; ``Partial`` is
                         collapsed to ``Replicate`` via an all-reduce on the source mesh
                         sub-group before send. ``Partial`` on the target side is rejected.

      Blocks until:
          - Agent registers to TCPStore
          - Remote peer registers
          - Pair is matched



   .. py:method:: query_transfer_signal(batch_id: str, blocking: bool = True, timeout: float = 30.0) -> bool


   .. py:method:: register_tensors(batch_id: str, tensors: list[tuple[torch.Tensor, str]], bucket_size: int | None = None, timeout: float = 30.0) -> BatchHandler

      Register multiple tensors across pairs.

      This operation always blocks until registration completes,
      ensuring the returned BatchHandler is immediately usable.

      :param batch_id: Unique identifier for this batch (must be same on both send/recv sides)
      :param tensors: list of (tensor, pair_name) tuples
      :param bucket_size: optional bucket size in bytes for bucketization optimization
      :param timeout: timeout in seconds

      :returns: BatchHandler for managing the registered tensors



   .. py:method:: transfer(batch_id: str, transfer_type: Literal['send', 'recv'], blocking: bool = False, timeout: float = 30.0) -> posix_ipc.Semaphore


   .. py:attribute:: agent_rank


   .. py:attribute:: agent_state_lmdb_path
      :value: None



   .. py:attribute:: command_queue


   .. py:attribute:: state_db
      :value: None



   .. py:attribute:: state_env
      :type:  lmdb.Environment | None
      :value: None



.. py:function:: generate_semaphore_name(command_type: str, context_id: str) -> str

.. py:data:: logger

