DHT
The code is organized as follows:
DHT and DHTNode
DHT
class dht.DHT(initial_peers: Optional[Sequence[Union[Multiaddr, str]]] = None, *, start: bool, p2p: Optional[P2P] = None, daemon: bool = True, num_workers: int = 4, record_validators: Iterable[RecordValidatorBase] = (), shutdown_timeout: float = 3, await_ready: bool = True, **kwargs)
Parameters
run()→ None
run_in_background(await_ready: bool = True, timeout: Optional[float] = None)→ None
shutdown()→ None
get(key: Any, latest: bool = False, return_future: bool = False, **kwargs)→ Union[ValueWithExpiration[Any], None, MPFuture]
store(key: Any, value: Any, expiration_time: float, subkey: Optional[Any] = None, return_future: bool = False, **kwargs)→ Union[bool, MPFuture]
Parameters
Returns
run_coroutine(coro: Callable[[DHT, DHTNode], Awaitable[ReturnType]], return_future: bool = False)→ Union[ReturnType, MPFuture[ReturnType]]
Parameters
Returns
Note
Note
Note
get_visible_maddrs(latest: bool = False)→ List[Multiaddr]
async replicate_p2p()→ P2P
DHTNode
class dht.DHTNode(*, _initialized_with_create=False)
Note
Similar to the Kademlia RPC protocol, DHT has 3 RPCs:
A DHTNode follows the following contract:
DHTNode also features several (optional) caching policies:
async classmethod create(p2p: Optional[P2P] = None, node_id: Optional[DHTID] = None, initial_peers: Optional[Sequence[Union[Multiaddr, str]]] = None, bucket_size: int = 20, num_replicas: int = 5, depth_modulo: int = 5, parallel_rpc: int = None, wait_timeout: float = 3, refresh_timeout: Optional[float] = None, bootstrap_timeout: Optional[float] = None, cache_locally: bool = True, cache_nearest: int = 1, cache_size=None, cache_refresh_before_expiry: float = 5, cache_on_store: bool = True, reuse_get_requests: bool = True, num_workers: int = 4, chunk_size: int = 16, blacklist_time: float = 5.0, backoff_rate: float = 2.0, client_mode: bool = False, record_validator: Optional[RecordValidatorBase] = None, authorizer: Optional[AuthorizerBase] = None, ensure_bootstrap_success: bool = True, strict: bool = True, **kwargs)→ DHTNode
Parameters
async shutdown()
async find_nearest_nodes(queries: Collection[DHTID], k_nearest: Optional[int] = None, beam_size: Optional[int] = None, num_workers: Optional[int] = None, node_to_peer_id: Optional[Dict[DHTID, PeerID]] = None, exclude_self: bool = False, **kwargs)→ Dict[DHTID, Dict[DHTID, PeerID]]
Parameters
Returns
async store(key: Any, value: Any, expiration_time: float, subkey: Optional[Any] = None, **kwargs)→ bool
async store_many(keys: List[Any], values: List[Any], expiration_time: Union[float, List[float]], subkeys: Optional[Union[Any, List[Optional[Any]]]] = None, exclude_self: bool = False, await_all_replicas=True, **kwargs)→ Dict[Any, bool]
async get(key: Any, latest=False, **kwargs)→ Optional[ValueWithExpiration[Any]]
async get_many(keys: Collection[Any], sufficient_expiration_time: Optional[float] = None, **kwargs)→ Dict[Any, Union[ValueWithExpiration[Any], None, Awaitable[Optional[ValueWithExpiration[Any]]]]]
Parameters
Returns
Note
async get_many_by_id(key_ids: Collection[DHTID], sufficient_expiration_time: Optional[float] = None, num_workers: Optional[int] = None, beam_size: Optional[int] = None, return_futures: bool = False, _is_refresh=False)→ Dict[DHTID, Union[ValueWithExpiration[Any], None, Awaitable[Optional[ValueWithExpiration[Any]]]]]
Parameters
Returns
Note
DHT communication protocol
DHTProtocol
class dht.protocol.DHTProtocol(*, _initialized_with_create=False)
serializer
async classmethod create(p2p: P2P, node_id: DHTID, bucket_size: int, depth_modulo: int, num_replicas: int, wait_timeout: float, parallel_rpc: Optional[int] = None, cache_size: Optional[int] = None, client_mode: bool = False, record_validator: Optional[RecordValidatorBase] = None, authorizer: Optional[AuthorizerBase] = None)→ DHTProtocol
Note
get_stub(peer: PeerID)→ AuthRPCWrapper
async call_ping(peer: PeerID, validate: bool = False, strict: bool = True)→ Optional[DHTID]
Returns
async rpc_ping(request: PingRequest, context: P2PContext)→ PingResponse
async call_store(peer: PeerID, keys: Sequence[DHTID], values: Sequence[Union[bytes, DictionaryDHTValue]], expiration_time: Union[float, Sequence[float]], subkeys: Optional[Union[Any, Sequence[Optional[Any]]]] = None, in_cache: Optional[Union[bool, Sequence[bool]]] = None)→ Optional[List[bool]]
Note
Returns
async rpc_store(request: StoreRequest, context: P2PContext)→ StoreResponse
async call_find(peer: PeerID, keys: Collection[DHTID])→ Optional[Dict[DHTID, Tuple[Optional[ValueWithExpiration[Union[bytes, DictionaryDHTValue]]], Dict[DHTID, PeerID]]]]
Returns
async rpc_find(request: FindRequest, context: P2PContext)→ FindResponse
async update_routing_table(node_id: Optional[DHTID], peer_id: PeerID, responded=True)
Routing Tables
Parameters
Note
get_bucket_index(node_id: DHTID)→ int
add_or_update_node(node_id: DHTID, peer_id: PeerID)→ Optional[Tuple[DHTID, PeerID]]
Returns
Note
split_bucket(index: int)→ None
get(*, node_id: Optional[DHTID] = None, peer_id: Optional[PeerID] = None, default=None)
get_nearest_neighbors(query_id: DHTID, k: int, exclude: Optional[DHTID] = None)→ List[Tuple[DHTID, PeerID]]
Parameters
Returns
KBucket
class dht.routing.KBucket(lower: int, upper: int, size: int, depth: int = 0)
has_in_range(node_id: DHTID)
add_or_update_node(node_id: DHTID, peer_id: PeerID)→ bool
Parameters
Note
request_ping_node()→ Optional[Tuple[DHTID, PeerID]]
Returns
split()→ Tuple[KBucket, KBucket]
DHTID
class dht.routing.DHTID(value: int)
classmethod generate(source: Optional[Any] = None, nbits: int = 255)
Parameters
xor_distance(other: Union[DHTID, Sequence[DHTID]])→ Union[int, List[int]]
Parameters
Returns
to_bytes(length=20, byteorder='big', *, signed=False)→ bytes
classmethod from_bytes(raw: bytes, byteorder='big', *, signed=False)→ DHTID
Traverse (crawl) DHT
async dht.traverse.simple_traverse_dht(query_id: DHTID, initial_nodes: Collection[DHTID], beam_size: int, get_neighbors: Callable[[DHTID], Awaitable[Tuple[Collection[DHTID], bool]]], visited_nodes: Collection[DHTID] = ())→ Tuple[Tuple[DHTID], Set[DHTID]]
Note
Parameters
Returns
async dht.traverse.traverse_dht(queries: Collection[DHTID], initial_nodes: List[DHTID], beam_size: int, num_workers: int, queries_per_call: int, get_neighbors: Callable[[DHTID, Collection[DHTID]], Awaitable[Dict[DHTID, Tuple[Tuple[DHTID], bool]]]], found_callback: Optional[Callable[[DHTID, List[DHTID], Set[DHTID]], Awaitable[Any]]] = None, await_all_tasks: bool = True, visited_nodes: Optional[Dict[DHTID, Set[DHTID]]] = ())→ Tuple[Dict[DHTID, List[DHTID]], Dict[DHTID, Set[DHTID]]]
Parameters
Note
Returns
Last updated