Protocols
Protocols define the core logic of your subnet — they are the building blocks that power how nodes communicate, coordinate, and perform distributed tasks.
As a subnet developer, protocols are where you implement the application-layer behavior for your decentralized AI system. Whether you're building a model inference engine, a validator network, or a task delegation system, protocols are where the logic lives.
Each protocol should have a parent and a child process, which can be accomplished using Python's built-in `multiprocessing` library.
What a Protocol Does
Protocols register RPC methods with the P2P network and expose them to other nodes, enabling nodes to call these methods on each other in a decentralized manner.
Structure of a Protocol
Each protocol typically consists of two parts:
Parent Process (Main Process) – Manages the core application logic and P2P interactions, and spawns child processes.
Child Process (Worker Process) – Handles compute-intensive or blocking tasks (e.g., model inference), often run using Python's `multiprocessing` library to avoid blocking the event loop.
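A minimal sketch of this split (illustrative names only; the full pattern appears in the InferenceProtocol example at the end of this page):

```python
import multiprocessing as mp

class MyProtocol(mp.context.ForkProcess):
    """Illustrative skeleton of the parent/child split (names are placeholders)."""

    def __init__(self, start: bool = False):
        super().__init__()
        self.daemon = True  # the child exits when the parent does
        if start:
            self.start()  # parent process: fork the child, which executes run()

    def run(self):
        # Child process: register RPC handlers and run compute-heavy or
        # blocking work here, keeping the parent responsive.
        ...
```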
RPC Registration
Protocols register RPC (Remote Procedure Call) methods that allow other nodes in the subnet to interact with them.
These RPC methods are registered with the DHT and can be called remotely using `get_stub()`. The `ServicerBase` class provides the `get_stub()` utility, and protocol implementations expose their own `get_server_stub()` for convenience. These RPCs can be:
One-shot methods that return data or acknowledge a command.
Streaming methods that yield results over time (async generators).
Example use cases include, but are not limited to:
Submitting inference jobs
Fetching model outputs or scores
Notifying peers of new tasks or events
Automating tasks
Requesting data from a node
ServicerBase
Every protocol in a Hypertensor subnet must subclass `ServicerBase`, which acts as the foundation for defining and handling P2P RPC methods.
What ServicerBase Provides
add_p2p_handlers()
This method registers all `rpc_*` methods defined in your protocol as RPC endpoints. It inspects the method signatures using type annotations to automatically infer the Protobuf request and response types.
Once registered, these methods become callable by other nodes in the subnet via the DHT-based P2P layer.
🔧 Note: You must call `add_p2p_handlers()` from within the child process where the protocol is running. Otherwise, the handlers won't be bound correctly.
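For orientation, this is roughly how that looks inside the child process's run() method, abbreviated from the InferenceProtocol example at the end of this page:

```python
def run(self):
    # Executed in the child process once start() is called from the parent
    loop = switch_to_uvloop()  # from mesh.utils.asyncio in the example below

    async def _run():
        p2p = await self.dht.replicate_p2p()
        # Register every rpc_* method on this class as a P2P endpoint.
        # This must happen here, inside the child process.
        await self.add_p2p_handlers(p2p)

    loop.run_until_complete(_run())
```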
get_stub(p2p, peer)
This method creates a stub object that exposes all the `rpc_*` methods of a remote peer.
Calling a method on the stub translates into a remote RPC call to the target peer.
Under the hood, it uses the same method names and protobuf interfaces as the local implementation.
🔐 Best practice: Always wrap `get_stub()` with an authentication layer (e.g., proof-of-stake, signature verification) to verify the identity and trustworthiness of the peer you're interacting with.
RPC Method Formatting
RPC methods defined in your protocol must follow a specific signature format that is compatible with `add_p2p_handlers()` from `ServicerBase`.
Each `rpc_*` method must:
Be `async def`
Accept two parameters:
A Protobuf request message (or an async iterator of request messages, for streamed requests)
A `P2PContext` object (provides metadata about the calling peer)
Return either:
A single Protobuf response (`Awaitable[TOutputProtobuf]`), or
An async generator for streaming responses (`AsyncIterator[TOutputStream]`)
Expected Signature Format
(Union[TInputProtobuf, TInputStream], P2PContext) → Union[Awaitable[TOutputProtobuf], AsyncIterator[TOutputStream]]
Protobuf Message Examples
Here’s an example of request/response Protobuf messages used in inference:
```protobuf
message InferenceRequest {
  string input = 1;
  int32 max_new_tokens = 2;
  Tensor tensor = 3;
}

message InferenceResponse {
  repeated Tensor tensors = 1;
}
```
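For reference, such a request can be built in Python roughly as follows, assuming the compiled messages are importable as inference_pb2 and that serialize_torch_tensor (used in the full example below) produces the Tensor message referenced above:

```python
import torch

from mesh.compression.serialization import serialize_torch_tensor
from mesh.proto import inference_pb2  # assumed location of the compiled messages

request = inference_pb2.InferenceRequest(
    input="Hello, world",
    max_new_tokens=16,
    tensor=serialize_torch_tensor(torch.randn(1, 8)),
)
```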
Subnets should have mechanisms for nodes to interact with each other, ensuring that nodes can be verified by other nodes, run automated tasks, and make requests to each other, among other purposes.
To expose methods to the DHT for nodes to call on each other, prefix those functions with `rpc_`. Each `rpc_*` method should have a corresponding caller function that invokes it (see the examples below).
RPC Method Examples
Each `rpc_*` method in the protocol must follow one of the signature formats below.
Examples
Unary (Single Request → Single Response)
```python
async def rpc_something(
    self,
    request: inference_pb2.InferenceRequest,
    context: P2PContext
) -> inference_pb2.InferenceResponse:
    ...
```
Streaming (Single Request → Streamed Responses)
```python
async def rpc_do_stream(
    self,
    request: inference_pb2.InferenceRequest,
    context: P2PContext
) -> AsyncIterator[inference_pb2.InferenceResponse]:
    ...
```
Streaming (Stream Request → Streamed Responses)
```python
async def rpc_stream_something(
    self,
    requests: AsyncIterator[runtime_pb2.StreamRequest],
    context: P2PContext
) -> AsyncIterator[runtime_pb2.StreamRequest]:
    ...
```
💡 Ensure your Protobuf messages are imported and type-annotated correctly. The system relies on type hints to automatically register and route handlers via `add_p2p_handlers()`.
Peer-to-peer Communication
To call a method on a peer, you must be a node in the P2P network.
Calling from within the protocol
```python
async def call_something(
    self, peer: PeerID, data: str
) -> str:
    something = protocol_pb2.SomethingRequest(something=data)
    try:
        async with self.rpc_semaphore:
            p2p = await self.dht.replicate_p2p()
            response = await self.get_stub(p2p, peer).rpc_something(something)
            # Extract the relevant field from the response message as needed
            return response
    except Exception as e:
        logger.error(f"Failed to call rpc_something on {peer}: {e}", exc_info=True)
        return ""
```
Calling from outside the protocol
To call a protocol's exposed RPC methods from outside the protocol, the protocol itself should have a @classmethod that returns a stub exposing the peer's RPC methods.
Each protocol that needs to be accessed from outside the protocol should have a method for getting the server's stub, such as:
```python
@classmethod
def get_server_stub(
    cls,
    p2p: P2P,
    peer: PeerID,
    authorizer: Optional[AuthorizerBase] = None
) -> "MockProtocolStub":
    stub = super().get_stub(p2p, peer)
    return AuthRPCWrapperStreamer(stub, AuthRole.CLIENT, authorizer, service_public_key=None)
```
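A hypothetical client-side call using such a stub might look like this (MockProtocol, the DHT instance, peer ID, and request message are placeholders):

```python
# Inside an async function on the client side (names are illustrative):
p2p = await dht.replicate_p2p()
stub = MockProtocol.get_server_stub(p2p, peer_id, authorizer=authorizer)
response = await stub.rpc_something(protocol_pb2.SomethingRequest(something="ping"))
```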
Protocol Example
Here is an example of a subnet that allows nodes to stream data from one another:
In this example, hosters serve the model and run inference when other nodes call call_inference_stream on a hoster peer. call_inference_stream gets the stub of the hoster peer and calls rpc_inference_stream on it; the hoster then performs an inference stream and streams the results back to the calling node.
```python
from __future__ import annotations
import asyncio
import io
import multiprocessing as mp
from typing import AsyncIterator, Optional
import torch
import mesh
from mesh import DHT, get_dht_time
from mesh.compression.serialization import deserialize_torch_tensor, serialize_torch_tensor
from mesh.p2p import P2P, P2PContext, PeerID, ServicerBase
from mesh.proto import dht_pb2, inference_protocol_pb2, runtime_pb2
from mesh.subnet.protocols.inference_model import AsyncInferenceServer, InferenceModel
from mesh.subnet.utils.consensus import get_consensus_key
from mesh.subnet.utils.key import extract_rsa_peer_id, extract_rsa_peer_id_from_ssh
from mesh.substrate.chain_functions import Hypertensor
from mesh.utils import get_logger
from mesh.utils.asyncio import switch_to_uvloop
from mesh.utils.auth import AuthorizerBase, AuthRole, AuthRPCWrapperStreamer
from mesh.utils.mpfuture import MPFuture
from mesh.utils.serializer import MSGPackSerializer
logger = get_logger(__name__)

class InferenceProtocol(mp.context.ForkProcess, ServicerBase):
    _async_model: AsyncInferenceServer

    def __init__(
        self,
        dht: DHT,
        subnet_id: int,
        model_name: Optional[str] = None,
        balanced: bool = True,
        shutdown_timeout: float = 3,
        hypertensor: Optional[Hypertensor] = None,
        authorizer: Optional[AuthorizerBase] = None,
        start: bool = False,
    ):
        super().__init__()
        self.dht = dht
        self.subnet_id = subnet_id
        self.peer_id = dht.peer_id
        self.node_id = dht.node_id
        self.node_info = dht_pb2.NodeInfo(node_id=self.node_id.to_bytes())
        self.balanced, self.shutdown_timeout = balanced, shutdown_timeout
        self._p2p = None
        self.authorizer = authorizer
        self.ready = MPFuture()
        self.rpc_semaphore = asyncio.Semaphore(float("inf"))
        self._inner_pipe, self._outer_pipe = mp.Pipe(duplex=True)
        self.model_name = model_name
        self.daemon = True
        self.hypertensor = hypertensor

        if start:
            self.run_in_background(await_ready=True)

    def run(self):
        torch.set_num_threads(1)
        loop = switch_to_uvloop()
        stop = asyncio.Event()
        loop.add_reader(self._inner_pipe.fileno(), stop.set)

        async def _run():
            try:
                self._p2p = await self.dht.replicate_p2p()
                """Add rpc_* methods from this class to the P2P servicer"""
                logger.info("Adding P2P handlers")
                if self.authorizer is not None:
                    logger.info("Adding P2P handlers with authorizer")
                    await self.add_p2p_handlers(
                        self._p2p,
                        AuthRPCWrapperStreamer(self, AuthRole.SERVICER, self.authorizer),
                    )
                else:
                    await self.add_p2p_handlers(self._p2p, balanced=self.balanced)

                """
                Run pytorch functions and classes in the child process
                Read more:
                - https://stackoverflow.com/questions/22950047/cuda-initialization-error-after-fork/22950549#22950549
                - https://github.com/pytorch/pytorch/issues/17199
                """
                if self.model_name is not None:
                    logger.info("Loading Inference Model")
                    model = InferenceModel(self.model_name)
                    logger.info("Setting Up Async Inference Server")
                    self._async_model = AsyncInferenceServer(model)
                    logger.info("Starting Async Inference Server Worker")
                    asyncio.create_task(self._async_model._worker())
                    logger.info("Async Inference Server Complete")

                self.ready.set_result(None)
            except Exception as e:
                logger.debug(e, exc_info=True)
                self.ready.set_exception(e)

            try:
                await stop.wait()
            finally:
                await self.remove_p2p_handlers(self._p2p)

        try:
            loop.run_until_complete(_run())
        except KeyboardInterrupt:
            logger.debug("Caught KeyboardInterrupt, shutting down")

    def run_in_background(self, await_ready: bool = True, timeout: Optional[float] = None) -> None:
        """
        Starts InferenceProtocol in a background process. If :await_ready:, this method will wait until
        it is ready to process incoming requests or for :timeout: seconds max.
        """
        self.start()
        if await_ready:
            self.ready.result(timeout=timeout)

    def shutdown(self):
        if self.is_alive():
            self.join(self.shutdown_timeout)
            if self.is_alive():
                logger.warning(
                    "InferenceProtocol did not shut down within the grace period; terminating it the hard way"
                )
                self.terminate()
        else:
            logger.warning("InferenceProtocol shutdown had no effect, the process is already dead")

    def get_stub(self, p2p: P2P, peer: PeerID) -> AuthRPCWrapperStreamer:
        """
        Get a stub that sends requests to a given peer.

        It's important here to wrap the stub with an authentication wrapper, see AuthRPCWrapper
        """
        stub = super().get_stub(p2p, peer)
        return AuthRPCWrapperStreamer(stub, AuthRole.CLIENT, self.authorizer, service_public_key=None)

    @classmethod
    def get_server_stub(
        cls,
        p2p: P2P,
        peer: PeerID,
        authorizer: Optional[AuthorizerBase] = None
    ) -> "InferenceProtocolStub":  # type: ignore # noqa: F821
        """
        Get a stub that sends requests to a given peer.

        This function can be used to get the RPC methods from this protocol outside of this class.
        This is useful for client-side requests.
        """
        stub = super().get_stub(p2p, peer)
        return AuthRPCWrapperStreamer(stub, AuthRole.CLIENT, authorizer, service_public_key=None)

    async def rpc_info(self, request: runtime_pb2.Empty, context: P2PContext) -> runtime_pb2.NodeData:
        """Return metadata about this node, such as its version and role"""
        result = {
            "version": mesh.__version__,
            "dht_client_mode": self.dht.client_mode,
            "role": "hoster" if self.model_name is not None else "validator"
        }
        return runtime_pb2.NodeData(serialized_info=MSGPackSerializer.dumps(result))

    async def call_inference_stream(
        self, peer: PeerID, prompt: str, tensor: torch.Tensor
    ) -> AsyncIterator[torch.Tensor]:
        """
        Call another peer to perform an inference stream on the `tensor`.
        The results are returned as a stream of tensors.
        """
        input_stream = inference_protocol_pb2.InferenceRequestAuth(
            input=prompt,
            max_new_tokens=5,
            tensor=serialize_torch_tensor(tensor),
        )

        try:
            async with self.rpc_semaphore:
                p2p = await self.dht.replicate_p2p()
                response_stream = await self.get_stub(p2p, peer).rpc_inference_stream(input_stream)
                async for response in response_stream:
                    for tensor_bytes in response.tensors:
                        tensor = deserialize_torch_tensor(tensor_bytes)
                        yield tensor
        except Exception as e:
            logger.error(f"InferenceProtocol failed to stream from {peer}: {e}", exc_info=True)
            return

    def should_process_inference(self, tensor: torch.Tensor) -> bool:
        """
        Add RPC validation here
        """
        return True

    async def rpc_inference_stream(
        self, requests: inference_protocol_pb2.InferenceRequestAuth, context: P2PContext
    ) -> AsyncIterator[inference_protocol_pb2.InferenceResponseAuth]:
        """
        A peer wants us to perform an inference stream
        """
        tensor = deserialize_torch_tensor(requests.tensor)
        caller_peer_id = extract_rsa_peer_id_from_ssh(requests.auth.client_access_token.public_key)

        """
        Don't allow other hosters to call inference on me if it matches
        the current epoch's random consensus tensors
        """
        if self.authorizer is not None and caller_peer_id != self.peer_id:
            # Don't bother pinging the decentralized storage unless we have to
            run_inference = self.should_process_inference(tensor)
            if run_inference is False:
                raise ValueError("Invalid request")

        async for token_tensor in await self._async_model.submit(tensor):
            yield inference_protocol_pb2.InferenceResponseAuth(
                peer=self.node_info,
                dht_time=get_dht_time(),
                output=str(token_tensor.item()),
                tensors=[serialize_torch_tensor(token_tensor)]
            )
```
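A hypothetical way to wire this up (the DHT instance, hoster peer ID, and prompt are placeholders; this is a sketch, not part of the protocol above):

```python
# Hypothetical wiring: assumes a running DHT and a known hoster peer.
protocol = InferenceProtocol(dht=dht, subnet_id=1, model_name=None, start=True)

async def stream_from_hoster(hoster: PeerID, prompt: str, tensor: torch.Tensor):
    async for token_tensor in protocol.call_inference_stream(hoster, prompt, tensor):
        print(token_tensor)  # each streamed token tensor as it arrives
```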