etha.tensor_bus.command_queue#
LMDB-based Command Queue for Tensor Bus.
Attributes#
Exceptions#
Raised when trying to enqueue to a full CommandQueue with block=False. |
Classes#
LMDB based command queue. |
Module Contents#
- exception etha.tensor_bus.command_queue.QueueFullError#
Bases:
ExceptionRaised when trying to enqueue to a full CommandQueue with block=False.
Initialize self. See help(type(self)) for accurate signature.
- class etha.tensor_bus.command_queue.CommandQueue(lmdb_path: str = '/tmp/tensor_bus.lmdb', capacity: int = 50000)#
LMDB based command queue.
- close(destroy: bool = True)#
Close queue connection.
- Parameters:
destroy – If True (default), completely remove all resources (LMDB files, semaphores). If False, only close handles in this process (safe for multi-process).
- Default behavior (destroy=True):
Most common case: single process or final cleanup
Unlinks semaphores from the system
Deletes LMDB database files
Ensures no resource leaks
- Multi-process mode (destroy=False):
Use when other processes are still using the same queue
Only closes handles in this process
Resources remain available for other processes
Last process should call close() or close(destroy=True) to clean up
Examples
# Single process (default) q = CommandQueue(‘/tmp/test.lmdb’) q.enqueue(msg) q.close() # Automatically cleans up everything
# Multi-process # Worker process q = CommandQueue(‘/tmp/bus.lmdb’) q.enqueue(msg) q.close(destroy=False) # Don’t delete, other processes need it
# Main process (last one) q = CommandQueue(‘/tmp/bus.lmdb’) q.close() # Final cleanup, removes all resources
- dequeue(*, block: bool = False, timeout: float | None = None) etha.tensor_bus.commands.Message | None#
Dequeue a message.
- Returns:
Message object (auto-detected type), or None if queue is empty
- dequeue_batch(max_count: int = 32) list[etha.tensor_bus.commands.Message]#
Batch dequeue (improve throughput).
- Parameters:
max_count – Maximum number of messages to dequeue
- Returns:
List of messages
- enqueue(msg: etha.tensor_bus.commands.Message, *, block: bool = True, timeout: float | None = None) int#
Enqueue a message.
- Parameters:
msg – Any Message type (PutCommand, GetCommand, etc.)
block – If True, block when queue is full. If False, raise QueueFullError.
timeout – Timeout in seconds for blocking wait (None = infinite)
- Returns:
Message ID (queue position)
- Raises:
QueueFullError – If queue is full and block=False, or timeout expires
- peek() etha.tensor_bus.commands.Message | None#
View front message (without dequeuing).
- capacity = 50000#
- lmdb_path#
- ready_name#
- sem_name#
- space_sem_name#
- etha.tensor_bus.command_queue.logger#