Direct P2P communication, in which a stream is opened between two peers, can be achieved by building a custom P2P protocol.
To call a specific peer, its multiaddress is required. Although all peers are connected through algorithms that use a Kademlia DHT (distributed hash table), not all peers are directly connected. In a P2P network, “connected” does not mean “directly connected”; it means reachable through other peers. With GossipSub, a peer can receive a message without the sender knowing its multiaddress, but direct P2P communication requires the multiaddress.
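As a rough illustration of what a multiaddress carries, the sketch below pulls the peer ID out of an address string using only the standard library. It assumes the common libp2p convention that the address contains a /p2p/<peer-id> component; the helper name peer_id_from_multiaddr and the example address are hypothetical, and a real node would normally rely on the multiaddr library rather than string splitting.

```python
from typing import Optional


def peer_id_from_multiaddr(addr: str) -> Optional[str]:
    """Return the peer ID from the last /p2p/<peer-id> component, if any."""
    parts = [part for part in addr.split("/") if part]
    peer_id = None
    # Scan adjacent (name, value) pairs and keep the last "p2p" match, so a
    # relayed address resolves to the final destination peer.
    for name, value in zip(parts, parts[1:]):
        if name == "p2p":
            peer_id = value
    return peer_id


# Hypothetical address: the IP (documentation range) and peer ID are placeholders.
EXAMPLE_ADDR = "/ip4/203.0.113.7/tcp/4001/p2p/12D3KooWExamplePeer"
```

An address without a /p2p/ component identifies a network location but not a peer, which is why the helper returns None in that case.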
Acquiring A Multiaddress
A peer's multiaddress can be acquired in three ways. First, it can be stored on-chain and fetched from there (publishing a multiaddress on-chain is optional for a node, but a subnet can require it before admitting the node into the subnet's P2P network). Second, the subnet can set up a heartbeat system over GossipSub that shares each peer's multiaddress. Third, a peer can ask a directly connected peer for another peer's multiaddress, which that peer supplies from its own records or from its neighbors.
Once a multiaddress is acquired, the peer can reach out to the desired peer.
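One way to combine the sources described above is to try them in order and stop at the first hit. The following is a minimal sketch under stated assumptions: the names resolve_multiaddr, dict_lookup, and SOURCES are hypothetical, and the in-memory dictionaries merely stand in for an on-chain query, a heartbeat cache, and a request to a directly connected peer.

```python
import asyncio
from typing import Awaitable, Callable, Dict, Optional, Sequence

# A lookup takes a peer ID and returns its multiaddress, or None if unknown.
Lookup = Callable[[str], Awaitable[Optional[str]]]


async def resolve_multiaddr(peer_id: str, lookups: Sequence[Lookup]) -> Optional[str]:
    """Try each source in order and return the first multiaddress found."""
    for lookup in lookups:
        addr = await lookup(peer_id)
        if addr is not None:
            return addr
    return None


def dict_lookup(table: Dict[str, str]) -> Lookup:
    """Build a lookup backed by a dictionary (a stand-in for a real source)."""

    async def lookup(peer_id: str) -> Optional[str]:
        return table.get(peer_id)

    return lookup


# Placeholder data for the three sources, in priority order (all hypothetical).
on_chain = dict_lookup({})
heartbeats = dict_lookup({"peerB": "/ip4/203.0.113.7/tcp/4001/p2p/peerB"})
neighbors = dict_lookup({"peerC": "/ip4/203.0.113.9/tcp/4001/p2p/peerC"})
SOURCES = [on_chain, heartbeats, neighbors]
```

Calling asyncio.run(resolve_multiaddr("peerB", SOURCES)) falls through the empty on-chain table and returns the address from the heartbeat table. The ordering is a design choice; a subnet that requires on-chain registration might consult only the first source.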
Building a Custom Protocol
A custom P2P protocol for the subnet's use case (or use cases) is built by inheriting from the ProtocolBase template class.
Generate the Python protobuf module from the repository root:
This creates subnet/protocols/pb/task_protocol_pb2.py and, when mypy-protobuf is installed, subnet/protocols/pb/task_protocol_pb2.pyi. If you are not generating type stubs, omit the --mypy_out=. option. To regenerate the module through the Makefile, add the proto path to the PB variable.
Create a protocol class
This is achieved by subclassing ProtobufRequestResponseProtocolTemplate:
import logging

from libp2p.abc import IHost
from multiaddr import Multiaddr

from subnet.protocols.pb.task_protocol_pb2 import (
    TaskRequest,
    TaskResponse,
)
from subnet.protocols.protocol_base import (
    IncomingRequestContext,
    ProtobufRequestResponseProtocolTemplate,
)

logger = logging.getLogger("task_protocol/1.0.0")

PROTOCOL_ID = "/subnet/task_protocol/1.0.0"


async def run_task(payload: bytes) -> bytes:
    """
    Example function that performs a task when a peer requests one.
    This can be a class or a function.
    """
    return payload.upper()


class TaskProtocol(
    ProtobufRequestResponseProtocolTemplate[TaskRequest, TaskResponse],
):
    def __init__(self, host: IHost) -> None:
        super().__init__(
            host=host,
            protocol_id=PROTOCOL_ID,
            request_message_type=TaskRequest,
            response_message_type=TaskResponse,
            log=logger,
        )

    async def call_remote(
        self,
        destination: Multiaddr | str,
        task_id: str,
        payload: bytes,
    ) -> TaskResponse:
        """Call out to a peer to perform a task."""
        request = TaskRequest(task_id=task_id, payload=payload)
        return await super().call_remote(destination, request)

    async def handle_request(
        self,
        context: IncomingRequestContext,
        request: TaskRequest,
    ) -> TaskResponse:
        """Handle a request from a peer asking us to perform a task."""
        logger.info(
            "Received task %s from %s",
            request.task_id,
            context.peer_id,
        )
        result = await run_task(request.payload)
        return TaskResponse(accepted=True, result=result)
Instantiate the protocol
The base class registers the stream handler automatically.
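The calling side can be sketched as follows. This is an illustration under stated assumptions rather than the project's own code: FakeTaskResponse and FakeTaskProtocol are hypothetical stand-ins so the snippet runs without a libp2p host, and request_task is a hypothetical helper. The stand-in mirrors the call_remote(destination, task_id, payload) signature and the accepted and result fields used by TaskProtocol.

```python
import asyncio
from dataclasses import dataclass


@dataclass
class FakeTaskResponse:
    """Hypothetical stand-in for the generated TaskResponse message."""

    accepted: bool
    result: bytes


class FakeTaskProtocol:
    """Hypothetical stand-in for TaskProtocol(host); no networking involved."""

    async def call_remote(self, destination, task_id, payload):
        # Mimics the remote handler, which upper-cases the payload.
        return FakeTaskResponse(accepted=True, result=payload.upper())


async def request_task(protocol, destination: str, task_id: str, payload: bytes) -> bytes:
    """Send a task to a peer and return its result, failing if it was rejected."""
    response = await protocol.call_remote(destination, task_id=task_id, payload=payload)
    if not response.accepted:
        raise RuntimeError(f"task {task_id} was rejected by {destination}")
    return response.result


# Placeholder multiaddress for the destination peer (hypothetical values).
DESTINATION = "/ip4/203.0.113.7/tcp/4001/p2p/12D3KooWExamplePeer"
```

On a real node, the stand-in would be replaced by protocol = TaskProtocol(host), where host is the node's libp2p host, and request_task would be awaited with the destination peer's multiaddress.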