etha.kvstore.tcp

TorchTCPStore implementation wrapping torch.distributed.TCPStore.

Attributes

POLL_INTERVAL

logger

Classes

TorchTCPStore

KVStore implementation backed by torch.distributed.TCPStore.

Module Contents

class etha.kvstore.tcp.TorchTCPStore(host: str, port: int, world_size: int, is_master: bool, timeout: float = 3600.0, wait_for_workers: bool = True, namespace: str = 'default', component: str = 'tensorbus')

Bases: etha.kvstore.base.KVStore

KVStore implementation backed by torch.distributed.TCPStore.

This is a fallback implementation for when etcd is not available. It uses polling for wait_for_keys, which is less efficient than etcd’s watch.

Initialize TorchTCPStore.

Parameters:
  • host – TCPStore server host

  • port – TCPStore server port

  • world_size – Total number of processes

  • is_master – Whether this process is the master (server)

  • timeout – Connection timeout in seconds

  • wait_for_workers – Whether master should wait for all workers

  • namespace – Namespace for key isolation

  • component – Default component name
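This reference does not specify how namespace and component are combined with a key. As a rough illustration of how such prefixes can isolate keys in a flat store, the sketch below joins them into a single string. The `/` separator, the prefix order, and the helpers `full_key` and `strip_prefix` are all hypothetical and are not taken from the library.

```python
def full_key(key: str, namespace: str = "default", component: str = "tensorbus") -> str:
    """Build a flat store key from namespace, component and user key.

    Hypothetical layout: "<namespace>/<component>/<key>".
    """
    return f"{namespace}/{component}/{key}"


def strip_prefix(stored_key: str, namespace: str = "default", component: str = "tensorbus") -> str:
    """Recover the user-facing key from a prefixed store key."""
    prefix = f"{namespace}/{component}/"
    if not stored_key.startswith(prefix):
        raise ValueError(f"{stored_key!r} does not belong to {prefix!r}")
    return stored_key[len(prefix):]


# Two components writing the same user key end up on different store keys.
key_a = full_key("rank0/ready", component="tensorbus")
key_b = full_key("rank0/ready", component="scheduler")
```

Under this assumed layout, passing `component=` to a single call would only change the prefix used for that call.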

close(cleanup: bool = True) → None

Close the store.

Note: TCPStore doesn’t have an explicit close method.

Parameters:

cleanup – Ignored for TCPStore (no persistent storage)

delete(key: str, *, component: str | None = None) → bool

Delete a key.

Note: TCPStore doesn’t support delete, so the value is set to an empty string instead.
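The note above describes delete as an overwrite with an empty string. The following minimal sketch shows what such "tombstone" semantics can look like, using an in-memory dict as a stand-in for the real store. The class `TombstoneStore` is hypothetical, and treating an empty value as "missing" in `get` and `exists` is an assumption about how the rest of the API might interpret the tombstone.

```python
from typing import Dict, Optional


class TombstoneStore:
    """In-memory stand-in for a store where delete is emulated by an overwrite."""

    def __init__(self) -> None:
        self._data: Dict[str, bytes] = {}

    def set(self, key: str, value: str) -> None:
        self._data[key] = value.encode()

    def get(self, key: str) -> Optional[bytes]:
        value = self._data.get(key)
        # Assumption: an empty value is a tombstone and reads as "missing".
        return value if value else None

    def exists(self, key: str) -> bool:
        return bool(self._data.get(key))

    def delete(self, key: str) -> bool:
        if not self.exists(key):
            return False  # nothing to delete
        self._data[key] = b""  # overwrite instead of removing the entry
        return True
```

One consequence of this design is that a deliberately stored empty string cannot be told apart from a deleted key.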

exists(key: str, *, component: str | None = None) → bool

Check if a key exists.

get(key: str, *, component: str | None = None) → bytes | None

Get value for a key.

get_bytes(key: str, *, component: str | None = None) → bytes | None

Retrieve binary data with base64 decoding.

set(key: str, value: str, *, component: str | None = None) → None

Set a key-value pair.

set_bytes(key: str, data: bytes, *, component: str | None = None) → None

Store binary data with base64 encoding (TCPStore only accepts strings).
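The base64 round trip behind set_bytes and get_bytes can be sketched with the standard library alone. The helper names `encode_bytes` and `decode_bytes` are hypothetical. The sketch shows only the encoding step, not how the library passes the result to the store.

```python
import base64


def encode_bytes(data: bytes) -> str:
    """Turn arbitrary bytes into an ASCII string that a string-only store can hold."""
    return base64.b64encode(data).decode("ascii")


def decode_bytes(text: bytes) -> bytes:
    """Reverse encode_bytes; accepts the stored value as bytes or ASCII str."""
    return base64.b64decode(text)


payload = bytes(range(256))          # includes NUL and non-UTF-8 bytes
stored = encode_bytes(payload)       # safe to pass to a str-valued set()
restored = decode_bytes(stored.encode("ascii"))
```

Base64 emits 4 characters for every 3 input bytes, so stored values are roughly one third larger than the raw data.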

wait_for_key(key: str, timeout: float = 3600.0, *, component: str | None = None) → bytes

Wait for a key to exist and return its value.

Uses polling since TCPStore doesn’t support watch.
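A polling wait of this kind can be sketched as a loop that reads the key, sleeps for the poll interval, and gives up at a deadline. The function `poll_for_key` is hypothetical, a plain dict stands in for the store, and raising `TimeoutError` on expiry is an assumption, since this page does not state which exception the library raises.

```python
import threading
import time
from typing import Callable, Optional

POLL_INTERVAL = 0.001  # seconds; same value as the module-level constant on this page


def poll_for_key(get: Callable[[str], Optional[bytes]], key: str, timeout: float = 3600.0) -> bytes:
    """Poll `get` until it returns a value for `key` or the timeout expires."""
    deadline = time.monotonic() + timeout
    while True:
        value = get(key)
        if value is not None:
            return value
        if time.monotonic() >= deadline:
            # Assumption: the real method may signal a timeout differently.
            raise TimeoutError(f"key {key!r} was not set within {timeout} s")
        time.sleep(POLL_INTERVAL)


# Usage: another thread publishes the key shortly after the wait starts.
store: dict = {}
threading.Timer(0.05, store.__setitem__, args=("ready", b"1")).start()
result = poll_for_key(store.get, "ready", timeout=5.0)
```

Compared with a watch, each waiting process issues a read roughly every poll interval, which is the source of the inefficiency mentioned in the class description.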

wait_for_keys(key_pattern: str, expected_count: int, value: str = '1', timeout: float = 3600.0, candidate_keys: list[str] | None = None, *, component: str | None = None) → list[str]

Wait for keys matching pattern using polling.

Since TCPStore doesn’t support prefix listing or watch, we need candidate_keys to know which keys to check.
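Because the store cannot enumerate its keys, the pattern can only be applied to the candidate list supplied by the caller. The sketch below illustrates that idea with a hypothetical `poll_for_keys` function and a dict as the store. It uses `fnmatch.fnmatchcase` for the `*` wildcard, which also treats `?` and `[...]` as special, so it is an approximation of whatever matching the library performs. The `TimeoutError` on expiry is likewise an assumption.

```python
import fnmatch
import time
from typing import Callable, List, Optional


def poll_for_keys(
    get: Callable[[str], Optional[bytes]],
    key_pattern: str,
    expected_count: int,
    candidate_keys: List[str],
    value: str = "1",
    timeout: float = 3600.0,
    poll_interval: float = 0.001,
) -> List[str]:
    """Poll candidate keys until enough of them match the pattern and hold `value`."""
    deadline = time.monotonic() + timeout
    # Filter once: only candidates that match the pattern are worth polling.
    candidates = [k for k in candidate_keys if fnmatch.fnmatchcase(k, key_pattern)]
    expected = value.encode()
    while True:
        matched = [k for k in candidates if get(k) == expected]
        if len(matched) >= expected_count:
            return matched
        if time.monotonic() >= deadline:
            raise TimeoutError(
                f"only {len(matched)} of {expected_count} keys matched {key_pattern!r}"
            )
        time.sleep(poll_interval)


store = {
    "worker/0/ready": b"1",
    "worker/1/ready": b"1",
    "worker/2/ready": b"0",   # matches the pattern but not the expected value
    "other/0": b"1",          # holds the expected value but not the pattern
}
matched = poll_for_keys(store.get, "worker/*/ready", 2, list(store), timeout=1.0)
```

Each polling round costs one read per remaining candidate, so a tight candidate list keeps the wait cheap.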

Parameters:
  • key_pattern – Pattern with ‘*’ wildcard

  • expected_count – Number of matching keys to wait for

  • value – Expected value for matching keys

  • timeout – Maximum time to wait in seconds

  • candidate_keys – List of keys to check (required for TCPStore)

  • component – Override default component for this call

Returns:

List of matched keys (without namespace prefix)

Raises:

wait_for_value(key: str, expected: str, timeout: float = 3600.0, *, component: str | None = None) → bytes

Wait for a key to have a specific value using polling.

host

is_master

port

world_size

etha.kvstore.tcp.POLL_INTERVAL = 0.001

etha.kvstore.tcp.logger