etha.tensor_bus.command_queue#

LMDB-based Command Queue for Tensor Bus.

Attributes#

Exceptions#

QueueFullError

Raised when trying to enqueue to a full CommandQueue with block=False.

Classes#

CommandQueue

LMDB based command queue.

Module Contents#

exception etha.tensor_bus.command_queue.QueueFullError#

Bases: Exception

Raised when trying to enqueue to a full CommandQueue with block=False.


class etha.tensor_bus.command_queue.CommandQueue(lmdb_path: str = '/tmp/tensor_bus.lmdb', capacity: int = 50000)#

LMDB based command queue.

clear(delete_commands: bool = True)#

Clear queue.

close(destroy: bool = True)#

Close queue connection.

Parameters:

destroy – If True (default), completely remove all resources (LMDB files, semaphores). If False, only close handles in this process (safe for multi-process).

Default behavior (destroy=True):
  • Most common case: single process or final cleanup

  • Unlinks semaphores from the system

  • Deletes LMDB database files

  • Ensures no resource leaks

Multi-process mode (destroy=False):
  • Use when other processes are still using the same queue

  • Only closes handles in this process

  • Resources remain available for other processes

  • Last process should call close() or close(destroy=True) to clean up

Examples

    # Single process (default)
    q = CommandQueue('/tmp/test.lmdb')
    q.enqueue(msg)
    q.close()  # Automatically cleans up everything

    # Multi-process
    # Worker process
    q = CommandQueue('/tmp/bus.lmdb')
    q.enqueue(msg)
    q.close(destroy=False)  # Don't delete, other processes need it

    # Main process (last one)
    q = CommandQueue('/tmp/bus.lmdb')
    q.close()  # Final cleanup, removes all resources
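The close() call is easy to skip when an exception interrupts a worker. One way to guard against that is a small context manager, sketched below. The helper name open_queue and the _StubQueue class are hypothetical and not part of etha; the stub only records how close() was called so the sketch runs without LMDB. In real use the factory would be something like lambda: CommandQueue('/tmp/bus.lmdb').

```python
from contextlib import contextmanager


@contextmanager
def open_queue(factory, *, destroy):
    """Create a queue with factory() and always close it with the given destroy flag.

    Hypothetical helper, not part of etha.
    """
    q = factory()
    try:
        yield q
    finally:
        # Runs on normal exit and on exceptions alike.
        q.close(destroy=destroy)


class _StubQueue:
    """In-memory stand-in for CommandQueue (assumption: only close() is modelled)."""

    def __init__(self):
        self.closed_with = None

    def close(self, destroy=True):
        self.closed_with = destroy


# Worker process: release local handles only, leave shared resources in place.
with open_queue(_StubQueue, destroy=False) as worker_q:
    pass

# Last process: final cleanup.
with open_queue(_StubQueue, destroy=True) as main_q:
    pass
```

Making destroy a required keyword argument forces each call site to state whether it is the last user of the queue, rather than inheriting the destructive default by accident.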

dequeue(*, block: bool = False, timeout: float | None = None) -> etha.tensor_bus.commands.Message | None#

Dequeue a message.

Returns:

The dequeued Message (its concrete type is detected automatically), or None if the queue is empty.
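Because dequeue() returns None for an empty queue, a consumer can drain the queue with a simple loop. The sketch below illustrates that pattern. StubQueue is a hypothetical in-memory stand-in that mimics only the documented dequeue() signature, and drain is an illustrative helper, not an etha function.

```python
from collections import deque


class StubQueue:
    """In-memory stand-in for CommandQueue (assumption: never blocks)."""

    def __init__(self, items=()):
        self._items = deque(items)

    def dequeue(self, *, block=False, timeout=None):
        # Mirrors the documented contract: None when nothing is queued.
        return self._items.popleft() if self._items else None


def drain(queue, handle):
    """Call handle(msg) for each message until dequeue() returns None.

    Returns the number of messages handled.
    """
    count = 0
    while (msg := queue.dequeue()) is not None:
        handle(msg)
        count += 1
    return count


seen = []
processed = drain(StubQueue(["a", "b", "c"]), seen.append)
```

Comparing against None explicitly, rather than relying on truthiness, keeps the loop correct even if a message object happens to evaluate as false.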

dequeue_batch(max_count: int = 32) -> list[etha.tensor_bus.commands.Message]#

Dequeue up to max_count messages in a single call to improve throughput.

Parameters:

max_count – Maximum number of messages to dequeue

Returns:

List of messages
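A batch consumer typically calls dequeue_batch() repeatedly until nothing comes back. The sketch below assumes that an empty queue yields an empty list, which the reference above does not state explicitly. StubQueue is a hypothetical in-memory stand-in and consume_in_batches is an illustrative helper.

```python
from collections import deque


class StubQueue:
    """In-memory stand-in for CommandQueue (assumption: empty queue gives [])."""

    def __init__(self, items=()):
        self._items = deque(items)

    def dequeue_batch(self, max_count=32):
        batch = []
        while self._items and len(batch) < max_count:
            batch.append(self._items.popleft())
        return batch


def consume_in_batches(queue, max_count=32):
    """Yield non-empty batches until dequeue_batch() returns an empty list."""
    while True:
        batch = queue.dequeue_batch(max_count)
        if not batch:
            return
        yield batch


# 70 queued items with max_count=32 come back as batches of 32, 32 and 6.
batch_sizes = [len(b) for b in consume_in_batches(StubQueue(range(70)), max_count=32)]
```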

enqueue(msg: etha.tensor_bus.commands.Message, *, block: bool = True, timeout: float | None = None) -> int#

Enqueue a message.

Parameters:
  • msg – Any Message type (PutCommand, GetCommand, etc.)

  • block – If True, block when queue is full. If False, raise QueueFullError.

  • timeout – Timeout in seconds for blocking wait (None = infinite)

Returns:

Message ID (queue position)

Raises:

QueueFullError – If the queue is full and block=False, or if timeout expires during a blocking wait.
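A producer that must not stall can enqueue with block=False and handle QueueFullError itself. The sketch below retries with exponential backoff and gives up after a few attempts. Everything in it is a stand-in: the local QueueFullError and StubQueue classes only mimic the documented behaviour, the 0-based message IDs are an assumption, and try_enqueue is a hypothetical helper rather than an etha function.

```python
import time


class QueueFullError(Exception):
    """Local stand-in for etha.tensor_bus.command_queue.QueueFullError."""


class StubQueue:
    """In-memory stand-in for CommandQueue (assumption: IDs start at 0)."""

    def __init__(self, capacity):
        self.capacity = capacity
        self._items = []
        self._next_id = 0

    def enqueue(self, msg, *, block=True, timeout=None):
        # The stub never blocks; it raises as soon as the queue is full.
        if len(self._items) >= self.capacity:
            raise QueueFullError("queue is full")
        self._items.append(msg)
        msg_id = self._next_id
        self._next_id += 1
        return msg_id


def try_enqueue(queue, msg, retries=3, delay=0.01):
    """Enqueue without blocking; back off and retry on QueueFullError.

    Returns the message ID, or None if every attempt found the queue full.
    """
    for attempt in range(retries):
        try:
            return queue.enqueue(msg, block=False)
        except QueueFullError:
            time.sleep(delay * (2 ** attempt))  # exponential backoff
    return None


q = StubQueue(capacity=2)
ids = [try_enqueue(q, m, retries=2, delay=0.001) for m in ("a", "b", "c")]
```

With the real queue, passing block=True together with a timeout achieves a similar bound on waiting time without a manual retry loop; the explicit loop is mainly useful when the producer has other work to do between attempts.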

is_empty() -> bool#

Check if queue is empty.

peek() -> etha.tensor_bus.commands.Message | None#

Return the message at the front of the queue without removing it.

size() -> int#

Return queue length.

capacity = 50000#
lmdb_path#
ready_name#
sem_name#
space_sem_name#
etha.tensor_bus.command_queue.logger#