Protocols

Protocols define the core logic of your subnet — they are the building blocks that power how nodes communicate, coordinate, and perform distributed tasks.

As a subnet developer, protocols are where you implement the application-layer behavior for your decentralized AI system. Whether you're building a model inference engine, a validator network, or a task delegation system, protocols are where the logic lives.

Each protocol should run as a parent and a child process, which can be accomplished using Python's built-in multiprocessing library.

What a Protocol Does

Protocols register RPC methods with the P2P network and expose them to other nodes, enabling nodes to call these methods on each other in a decentralized manner.

Structure of a Protocol

Each protocol typically consists of two parts:

  • Parent Process (Main Process) – Manages the core application logic, P2P interactions, and spawns child processes.

  • Child Process (Worker Process) – Handles compute-intensive or blocking tasks (e.g., model inference), often run using Python's multiprocessing library to avoid blocking the event loop.
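A minimal skeleton of this two-process layout might look like the following. This is a sketch rather than a definitive implementation: MyProtocol is an illustrative name, and the full InferenceProtocol example at the end of this page fills in the details.

import multiprocessing as mp

from mesh import DHT
from mesh.p2p import ServicerBase


class MyProtocol(mp.context.ForkProcess, ServicerBase):
    def __init__(self, dht: DHT, start: bool = False):
        super().__init__()
        self.dht = dht
        self.daemon = True
        if start:
            self.start()  # parent process: forks the child (worker) process

    def run(self):
        # Child process entry point: register RPC handlers and run
        # compute-intensive or blocking work here
        ...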

RPC Registration

Protocols register RPC (Remote Procedure Call) methods that allow other nodes in the subnet to interact with them.

  • These RPC methods are registered with the DHT and can be called remotely using get_stub().

  • The ServicerBase class provides the get_stub() utility, and protocol implementations expose their own get_server_stub() for convenience.

  • These RPCs can be:

    • One-shot methods that return data or acknowledge a command.

    • Streaming methods that yield results over time (async generators).

Example use cases include, but are not limited to:

  • Submitting inference jobs

  • Fetching model outputs or scores

  • Notifying peers of new tasks or events

  • Automating tasks

  • Requesting data from a node


ServicerBase

Every protocol in a Hypertensor subnet must subclass ServicerBase, which acts as the foundation for defining and handling P2P RPC methods.

What ServicerBase Provides

add_p2p_handlers()

This method registers all rpc_* methods defined in your protocol as RPC endpoints. It inspects the method signatures using type annotations to automatically infer the Protobuf request and response types.

Once registered, these methods become callable by other nodes in the subnet via the DHT-based P2P layer.

🔧 Note: You must call add_p2p_handlers() from within the child process where the protocol is running. Otherwise, the handlers won't be bound correctly.
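For instance, the child process entry point might bind handlers roughly like this (a trimmed sketch of the pattern used in the full InferenceProtocol example at the end of this page):

from mesh.utils.asyncio import switch_to_uvloop

def run(self):
    # Child process entry point (a method on a protocol subclassing ServicerBase)
    loop = switch_to_uvloop()

    async def _run():
        # Reuse the DHT's P2P instance inside the child process
        self._p2p = await self.dht.replicate_p2p()
        # Register every rpc_* method on this class as a P2P endpoint
        await self.add_p2p_handlers(self._p2p)

    loop.run_until_complete(_run())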

get_stub(p2p, peer)

This method creates a stub object that exposes all the rpc_* methods of a remote peer.

  • Calling a method on the stub translates into a remote RPC call to the target peer.

  • Under the hood, it uses the same method names and protobuf interfaces as the local implementation.

🔐 Best practice: Always wrap get_stub() with an authentication layer (e.g., proof-of-stake or signature verification) to ensure the identity and trustworthiness of the peer you're interacting with.
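As a sketch, a caller method inside a protocol might use the stub like this (rpc_ping and its protocol_pb2 messages are illustrative names):

async def call_ping(self, peer: PeerID) -> protocol_pb2.PingResponse:
    p2p = await self.dht.replicate_p2p()
    stub = self.get_stub(p2p, peer)  # ideally wrapped with an authorizer, as noted above
    return await stub.rpc_ping(protocol_pb2.PingRequest())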

RPC Method Formatting

When defining RPC methods for your protocol, they must follow a specific signature format that is compatible with add_p2p_handlers() from ServicerBase.

Each rpc_* method must:

  1. Be async def

  2. Accept two parameters:

    • A Protobuf request message

    • A P2PContext object (provides metadata about the calling peer)

  3. Return either:

    • A single Protobuf response (Awaitable[TOutputProtobuf]), or

    • An async generator for streaming responses (AsyncIterator[TOutputStream])

Expected Signature Format

Parameters: request: Union[TInputProtobuf, TInputStream], context: P2PContext

Returns: Union[Awaitable[TOutputProtobuf], AsyncIterator[TOutputStream]]

Protobuf Message Examples

Learn more about protobufs.

Here’s an example of request/response Protobuf messages used in inference:

message InferenceRequest {
  string input = 1;
  int32 max_new_tokens = 2;
  Tensor tensor = 3;
}

message InferenceResponse {
  repeated Tensor tensors = 1;
}
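As a rough sketch, the generated messages could then be constructed like this on the Python side. The module name inference_pb2 is an assumption about how the .proto file compiles, and the Tensor field is filled with mesh's serialize_torch_tensor helper, as in the full example below:

import torch

from mesh.compression.serialization import serialize_torch_tensor
from mesh.proto import inference_pb2  # assumed name of the generated module

request = inference_pb2.InferenceRequest(
    input="Hello, world",
    max_new_tokens=16,
    tensor=serialize_torch_tensor(torch.zeros(1, 8)),
)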

Subnets should give nodes mechanisms to interact with each other, so that nodes can verify one another, run automated tasks, and make requests to each other, among other purposes.

To expose methods to the DHT for nodes to call on each other, prefix those functions with rpc_. Each rpc_* method should have a corresponding caller function that invokes it (see the examples below).

RPC Method Examples

Each rpc_* method in the protocol must follow the signature format described above.

Examples

Unary (Single Request → Single Response)

async def rpc_something(
    self,
    request: inference_pb2.InferenceRequest,
    context: P2PContext
) -> inference_pb2.InferenceResponse:
    ...

Streaming (Single Request → Streamed Responses)

async def rpc_do_stream(
    self,
    request: inference_pb2.InferenceRequest,
    context: P2PContext
) -> AsyncIterator[inference_pb2.InferenceResponse]:
    ...

Streaming (Stream Request → Streamed Responses)

async def rpc_stream_something(
    self,
    requests: AsyncIterator[runtime_pb2.StreamRequest],
    context: P2PContext
) -> AsyncIterator[runtime_pb2.StreamResponse]:
    ...

💡 Ensure your Protobuf messages are imported and type-annotated correctly. The system relies on type hints to automatically register and route handlers via add_p2p_handlers().

Peer-to-peer Communication

To call a method on a peer, you must be a node in the P2P network.

Calling from within the protocol

async def call_something(
    self, peer: PeerID, data: str
) -> Optional[protocol_pb2.SomethingResponse]:
    request = protocol_pb2.SomethingRequest(something=data)
    try:
        async with self.rpc_semaphore:
            p2p = await self.dht.replicate_p2p()
            return await self.get_stub(p2p, peer).rpc_something(request)
    except Exception as e:
        logger.error(f"Failed to call rpc_something on {peer}: {e}")
        return None

Calling from outside the protocol

To call a protocol's exposed RPC methods from outside the protocol itself, the protocol should provide a @classmethod that returns a stub exposing the peer's RPC methods.

Each protocol that needs to be accessed from outside its own class should have a method for getting the server's stub, such as:

@classmethod
def get_server_stub(
    cls,
    p2p: P2P,
    peer: PeerID,
    authorizer: Optional[AuthorizerBase] = None
) -> "MockProtocolStub":
    stub = super().get_stub(p2p, peer)
    return AuthRPCWrapperStreamer(stub, AuthRole.CLIENT, authorizer, service_public_key=None)
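A client outside the protocol can then reach a peer's RPC methods through that classmethod. A minimal sketch, where MockProtocol, protocol_pb2, and rpc_something are illustrative names:

async def query_peer(dht: DHT, peer_id: PeerID, authorizer: AuthorizerBase) -> str:
    # Reuse the DHT's P2P instance, then build an authenticated stub for the peer
    p2p = await dht.replicate_p2p()
    stub = MockProtocol.get_server_stub(p2p, peer_id, authorizer=authorizer)
    response = await stub.rpc_something(protocol_pb2.SomethingRequest(something="ping"))
    return response.something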

Protocol Example

See the full example.

Here is an example of a subnet that allows nodes to stream data from one another:

In this example, hosters serve the model and run inference when other nodes use call_inference_stream on a hoster peer. call_inference_stream gets the stub of the hoster peer and calls rpc_inference_stream on it; the hoster then performs an inference stream and streams the results back to the calling node.

The model is loaded in the child process to avoid a deadlock with CUDA.

from __future__ import annotations

import asyncio
import io
import multiprocessing as mp
from typing import AsyncIterator, Optional

import torch

import mesh
from mesh import DHT, get_dht_time
from mesh.compression.serialization import deserialize_torch_tensor, serialize_torch_tensor
from mesh.p2p import P2P, P2PContext, PeerID, ServicerBase
from mesh.proto import dht_pb2, inference_protocol_pb2, runtime_pb2
from mesh.subnet.protocols.inference_model import AsyncInferenceServer, InferenceModel
from mesh.subnet.utils.consensus import get_consensus_key
from mesh.subnet.utils.key import extract_rsa_peer_id, extract_rsa_peer_id_from_ssh
from mesh.substrate.chain_functions import Hypertensor
from mesh.utils import get_logger
from mesh.utils.asyncio import switch_to_uvloop
from mesh.utils.auth import AuthorizerBase, AuthRole, AuthRPCWrapperStreamer
from mesh.utils.mpfuture import MPFuture
from mesh.utils.serializer import MSGPackSerializer

logger = get_logger(__name__)


class InferenceProtocol(mp.context.ForkProcess, ServicerBase):

    _async_model: AsyncInferenceServer

    def __init__(
        self,
        dht: DHT,
        subnet_id: int,
        model_name: Optional[str] = None,
        balanced: bool = True,
        shutdown_timeout: float = 3,
        hypertensor: Optional[Hypertensor] = None,
        authorizer: Optional[AuthorizerBase] = None,
        start: bool = False,
    ):
        super().__init__()
        self.dht = dht
        self.subnet_id = subnet_id
        self.peer_id = dht.peer_id
        self.node_id = dht.node_id
        self.node_info = dht_pb2.NodeInfo(node_id=self.node_id.to_bytes())
        self.balanced, self.shutdown_timeout = balanced, shutdown_timeout
        self._p2p = None
        self.authorizer = authorizer
        self.ready = MPFuture()
        self.rpc_semaphore = asyncio.Semaphore(float("inf"))
        self._inner_pipe, self._outer_pipe = mp.Pipe(duplex=True)
        self.model_name = model_name
        self.daemon = True
        self.hypertensor = hypertensor

        if start:
            self.run_in_background(await_ready=True)

    def run(self):
        torch.set_num_threads(1)
        loop = switch_to_uvloop()
        stop = asyncio.Event()
        loop.add_reader(self._inner_pipe.fileno(), stop.set)

        async def _run():
            try:
                self._p2p = await self.dht.replicate_p2p()
                """Add rpc_* methods from this class to the P2P servicer"""
                logger.info("Adding P2P handlers")
                if self.authorizer is not None:
                    logger.info("Adding P2P handlers with authorizer")
                    await self.add_p2p_handlers(
                        self._p2p,
                        AuthRPCWrapperStreamer(self, AuthRole.SERVICER, self.authorizer),
                    )
                else:
                    await self.add_p2p_handlers(self._p2p, balanced=self.balanced)

                """
                Run pytorch functions and classes in the child process
                Read more:
                    - https://stackoverflow.com/questions/22950047/cuda-initialization-error-after-fork/22950549#22950549
                    - https://github.com/pytorch/pytorch/issues/17199
                """

                if self.model_name is not None:
                    logger.info("Loading Inference Model")
                    model = InferenceModel(self.model_name)
                    logger.info("Setting Up Async Inference Server")
                    self._async_model = AsyncInferenceServer(model)
                    logger.info("Starting Async Inference Server Worker")
                    asyncio.create_task(self._async_model._worker())
                    logger.info("Async Inference Server Complete")

                self.ready.set_result(None)
            except Exception as e:
                logger.debug(e, exc_info=True)
                self.ready.set_exception(e)

            try:
                await stop.wait()
            finally:
                await self.remove_p2p_handlers(self._p2p)

        try:
            loop.run_until_complete(_run())
        except KeyboardInterrupt:
            logger.debug("Caught KeyboardInterrupt, shutting down")

    def run_in_background(self, await_ready: bool = True, timeout: Optional[float] = None) -> None:
        """
        Starts InferenceProtocol in a background process. If :await_ready:, this method will wait until
        it is ready to process incoming requests or for :timeout: seconds max.
        """
        self.start()
        if await_ready:
            self.ready.result(timeout=timeout)

    def shutdown(self):
        if self.is_alive():
            self.join(self.shutdown_timeout)
            if self.is_alive():
                logger.warning(
                    "InferenceProtocol did not shut down within the grace period; terminating it the hard way"
                )
                self.terminate()
        else:
            logger.warning("InferenceProtocol shutdown had no effect, the process is already dead")

    def get_stub(self, p2p: P2P, peer: PeerID) -> AuthRPCWrapperStreamer:
        """
        Get a stub that sends requests to a given peer.

        It's important here to wrap the stub with an authentication wrapper, see AuthRPCWrapper
        """
        stub = super().get_stub(p2p, peer)
        return AuthRPCWrapperStreamer(stub, AuthRole.CLIENT, self.authorizer, service_public_key=None)

    @classmethod
    def get_server_stub(
        cls,
        p2p: P2P,
        peer: PeerID,
        authorizer: Optional[AuthorizerBase] = None
    ) -> "InferenceProtocolStub":  # type: ignore # noqa: F821
        """
        Get a stub that sends requests to a given peer.

        This function can be used to get the RPC methods from this protocol outside of this class.

        This is useful for client-side requests.
        """

        stub = super().get_stub(p2p, peer)
        return AuthRPCWrapperStreamer(stub, AuthRole.CLIENT, authorizer, service_public_key=None)

    async def rpc_info(self, request: runtime_pb2.Empty, context: P2PContext) -> runtime_pb2.NodeData:
        """Return metadata about this node, such as its version, DHT mode, and role"""

        result = {
            "version": mesh.__version__,
            "dht_client_mode": self.dht.client_mode,
            "role": "hoster" if self.model_name is not None else "validator"
        }

        return runtime_pb2.NodeData(serialized_info=MSGPackSerializer.dumps(result))

    async def call_inference_stream(
        self, peer: PeerID, prompt: str, tensor: torch.Tensor
    ) -> AsyncIterator[torch.Tensor]:
        """
        Call another peer to perform an inference stream on the `tensor`

        The inference output will be returned as a stream of tensors
        """
        input_stream = inference_protocol_pb2.InferenceRequestAuth(
            input=prompt,
            max_new_tokens=5,
            tensor=serialize_torch_tensor(tensor),
        )

        try:
            async with self.rpc_semaphore:
                p2p = await self.dht.replicate_p2p()
                response_stream = await self.get_stub(p2p, peer).rpc_inference_stream(input_stream)
                async for response in response_stream:
                    for tensor_bytes in response.tensors:
                        tensor = deserialize_torch_tensor(tensor_bytes)
                        yield tensor
        except Exception as e:
            logger.error(f"InferenceProtocol failed to stream from {peer}: {e}", exc_info=True)
            return

    def should_process_inference(self, tensor: torch.Tensor) -> bool:
        """
        Add RPC validation
        """
        return True

    async def rpc_inference_stream(
        self, requests: inference_protocol_pb2.InferenceRequestAuth, context: P2PContext
    ) -> AsyncIterator[inference_protocol_pb2.InferenceResponseAuth]:
        """
        A peer wants us to perform an inference stream
        """
        tensor = deserialize_torch_tensor(requests.tensor)

        caller_peer_id = extract_rsa_peer_id_from_ssh(requests.auth.client_access_token.public_key)
        """
        Don't allow other hosters to call inference on me if it matches
        the current epochs random consensus tensors
        """
        if self.authorizer is not None and not caller_peer_id.__eq__(self.peer_id):
            # Don't bother pinging the decentralized storage unless we have to
            run_inference = self.should_process_inference(tensor)
            if run_inference is False:
                raise ValueError("Invalid request. ")

        async for token_tensor in await self._async_model.submit(tensor):
            yield inference_protocol_pb2.InferenceResponseAuth(
                peer=self.node_info,
                dht_time=get_dht_time(),
                output=str(token_tensor.item()),
                tensors=[serialize_torch_tensor(token_tensor)]
            )
