# Protocols

Custom **Protocols** define the core logic of your subnet — they are the building blocks that power how nodes communicate, coordinate, and perform general or distributed tasks.

As a subnet developer, you implement the **application-layer behavior** of your decentralized AI system in protocols. Whether you're building a model inference engine, a validator network, or a task delegation system, protocols are where that logic lives.

A subnet can have one or multiple protocols, and they can interact with each other.

Each protocol should have a parent process and a child process, which can be implemented with Python's built-in [multiprocessing](https://docs.python.org/library/multiprocessing.html) library.

{% hint style="info" %}
The template includes the [**DHT Protocol**](https://docs.hypertensor.org/subnet-template/dht#dhtprotocol) built in, and it should not be altered without P2P expertise. The DHT Protocol is the native protocol of the decentralized network and handles core security, storage, and communication logic, among other things.

Subnet builders will create custom protocols tailored to the specific use case of the subnet, such as inference or training.
{% endhint %}

[Skip to the example protocol](#protocol-example).

### What a Protocol Does

Protocols register RPC methods to the P2P network and expose them to other nodes, enabling nodes to call these RPC methods on each other in a decentralized manner.

{% hint style="info" %}
If a subnet's use case doesn't require peers to communicate with each other, protocols are not required to register RPC methods.
{% endhint %}

### **Structure of a Protocol**

Each protocol typically consists of two parts:

* **Parent Process (Main Process)** – Manages the core application logic, P2P interactions, and spawns child processes.
* **Child Process (Worker Process)** – Handles compute-intensive or blocking tasks (e.g., model inference), often run using Python's `multiprocessing` library to avoid blocking the event loop.
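The parent/child split can be illustrated with the standard library alone. In this minimal sketch, `worker` and `run_parent` are hypothetical names and squaring a number stands in for compute-heavy work such as model inference: the parent sends jobs over a `multiprocessing` pipe and the child handles them, so blocking work never runs in the parent. It uses the fork start method, matching the `mp.context.ForkProcess` base class in the example protocol below, so it assumes a Unix platform.

```python
import multiprocessing as mp
from multiprocessing.connection import Connection


def worker(conn: Connection) -> None:
    """Child process: handle blocking jobs until the parent sends None."""
    while True:
        job = conn.recv()
        if job is None:  # shutdown signal from the parent
            break
        conn.send(job * job)  # stand-in for compute-heavy work such as inference
    conn.close()


def run_parent(jobs: list[int]) -> list[int]:
    """Parent process: spawn the child, dispatch jobs, and collect results."""
    ctx = mp.get_context("fork")  # same start method as mp.context.ForkProcess
    parent_conn, child_conn = ctx.Pipe(duplex=True)
    child = ctx.Process(target=worker, args=(child_conn,), daemon=True)
    child.start()
    results = []
    for job in jobs:
        parent_conn.send(job)
        results.append(parent_conn.recv())
    parent_conn.send(None)  # ask the child to exit cleanly
    child.join(timeout=5)
    return results


if __name__ == "__main__":
    print(run_parent([1, 2, 3]))  # [1, 4, 9]
```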

### **RPC Registration**

Protocols register **RPC (Remote Procedure Call)** methods that allow other nodes in the subnet to interact with them.

* These RPC methods are registered with the DHT and can be called remotely using `get_stub()`.
  * All methods in the protocol class that start with `rpc_*` will be registered. See [RPC methods formatting](#rpc-methods-formatting) below for more information on proper formatting.
* The `ServicerBase` class provides the `get_stub()` utility, and protocol implementations can expose their own `get_server_stub()` for convenience.
* These RPCs can be:
  * **One-shot methods** that return data or acknowledge a command.
  * **Streaming methods** that yield results over time (async generators).

**Example use cases include, but are not limited to**:

* Submitting inference jobs
* Fetching model outputs or scores
* Notifying peers of new tasks or events
* Automating tasks
* Requesting data from a node
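The two RPC shapes differ only in how a handler returns data: a one-shot method is a coroutine that returns a single message, while a streaming method is an async generator that yields messages. The sketch below uses plain dataclasses as hypothetical stand-ins for Protobuf messages and calls the handlers locally, so it shows the handler shapes only, not the P2P transport.

```python
import asyncio
from dataclasses import dataclass
from typing import AsyncIterator


@dataclass
class EchoRequest:  # hypothetical stand-in for a Protobuf request
    text: str


@dataclass
class EchoResponse:  # hypothetical stand-in for a Protobuf response
    text: str


class EchoProtocol:
    async def rpc_echo(self, request: EchoRequest, context: object) -> EchoResponse:
        # One-shot: return a single response message.
        return EchoResponse(text=request.text)

    async def rpc_echo_words(
        self, request: EchoRequest, context: object
    ) -> AsyncIterator[EchoResponse]:
        # Streaming: yield one response per word as it becomes available.
        for word in request.text.split():
            await asyncio.sleep(0)  # simulate work between chunks
            yield EchoResponse(text=word)


async def main() -> tuple[str, list[str]]:
    protocol = EchoProtocol()
    single = await protocol.rpc_echo(EchoRequest("hello world"), context=None)
    streamed = [r.text async for r in protocol.rpc_echo_words(EchoRequest("hello world"), context=None)]
    return single.text, streamed


if __name__ == "__main__":
    print(asyncio.run(main()))  # ('hello world', ['hello', 'world'])
```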

***

## ServicerBase

Every protocol in a Hypertensor subnet must subclass `ServicerBase`, which acts as the foundation for defining and handling **P2P RPC methods**.

### **What `ServicerBase` Provides**

#### **`add_p2p_handlers()`**

This method registers all `rpc_*` methods defined in your protocol as RPC endpoints. It inspects the method signatures using type annotations to automatically infer the **Protobuf request and response types**.

Once registered, these methods become callable by other nodes in the subnet via the DHT-based P2P layer.

> 🔧 **Note**: You must call `add_p2p_handlers()` from within the **child process** where the protocol is running. Otherwise, the handlers won't be bound correctly.
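To illustrate how request and response types can be read from annotations, here is a simplified, hypothetical registry built on `typing.get_type_hints`. It is not `ServicerBase`'s actual implementation: it collects methods whose names start with `rpc_`, takes the request type from the first parameter after `self`, and unwraps an `AsyncIterator[...]` return annotation to mark the handler as streaming. Dataclasses stand in for Protobuf messages.

```python
import collections.abc
import inspect
import typing
from dataclasses import dataclass
from typing import AsyncIterator, Dict, NamedTuple


@dataclass
class PingRequest:  # hypothetical stand-in for a Protobuf request message
    count: int


@dataclass
class PingResponse:  # hypothetical stand-in for a Protobuf response message
    index: int


class HandlerSpec(NamedTuple):
    request_type: type
    response_type: type
    stream_output: bool


def collect_handlers(servicer_cls: type) -> Dict[str, HandlerSpec]:
    """Map each rpc_* method to the message types declared in its type hints."""
    specs = {}
    for name, func in inspect.getmembers(servicer_cls, predicate=inspect.isfunction):
        if not name.startswith("rpc_"):
            continue  # only rpc_* methods become endpoints
        hints = typing.get_type_hints(func)
        request_param = list(inspect.signature(func).parameters)[1]  # [0] is `self`
        response_type = hints["return"]
        # A streaming handler is annotated AsyncIterator[T]; unwrap it to get T.
        stream_output = typing.get_origin(response_type) is collections.abc.AsyncIterator
        if stream_output:
            response_type = typing.get_args(response_type)[0]
        specs[name] = HandlerSpec(hints[request_param], response_type, stream_output)
    return specs


class PingProtocol:
    async def rpc_ping(self, request: PingRequest, context: object) -> PingResponse:
        return PingResponse(index=request.count)

    async def rpc_ping_stream(
        self, request: PingRequest, context: object
    ) -> AsyncIterator[PingResponse]:
        for i in range(request.count):
            yield PingResponse(index=i)

    def helper(self) -> None:  # no rpc_ prefix, so it is not registered
        pass
```

Here `collect_handlers(PingProtocol)` returns entries for `rpc_ping` (unary) and `rpc_ping_stream` (streaming), and skips `helper`.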

#### **`get_stub(p2p, peer)`**

This method creates a stub object that exposes all the `rpc_*` methods of a remote peer.

* Calling a method on the stub translates into a remote RPC call to the target peer.
* Under the hood, it uses the same method names and protobuf interfaces as the local implementation.

> 🔐 **Best practice**: Always wrap `get_stub()` with an authentication layer (e.g., proof-of-stake, signature verification, etc.) to ensure the identity and trustworthiness of the peer you're interacting with.
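The stub pattern and the recommended authentication layer can be sketched without any networking. Everything here is hypothetical: `LocalTransport` stands in for the P2P layer and verifies each call on the receiving side, while `Stub` turns attribute access such as `stub.rpc_upper(...)` into a transport call under the same method name. For simplicity it authenticates with a pre-shared HMAC key from Python's `hmac` module; the template's `AuthRPCWrapperStreamer` uses its own authorizer and key scheme, so treat this only as an illustration of wrapping a stub.

```python
import asyncio
import hashlib
import hmac
from typing import Awaitable, Callable, Dict, Optional

Handler = Callable[[bytes], Awaitable[bytes]]


def sign(key: bytes, method: str, payload: bytes) -> bytes:
    """HMAC-SHA256 over the method name and the payload."""
    return hmac.new(key, method.encode() + b"\x00" + payload, hashlib.sha256).digest()


class LocalTransport:
    """Hypothetical stand-in for the P2P layer: verifies, then dispatches locally."""

    def __init__(self, handlers: Dict[str, Handler], key: bytes):
        self._handlers = handlers
        self._key = key

    async def call(self, method: str, payload: bytes, signature: Optional[bytes]) -> bytes:
        expected = sign(self._key, method, payload)
        if signature is None or not hmac.compare_digest(signature, expected):
            raise PermissionError(f"rejected unauthenticated call to {method}")
        return await self._handlers[method](payload)


class Stub:
    """Turns `stub.rpc_x(payload)` into a (signed) transport call named "rpc_x"."""

    def __init__(self, transport: LocalTransport, key: Optional[bytes] = None):
        self._transport = transport
        self._key = key

    def __getattr__(self, name: str):
        if not name.startswith("rpc_"):
            raise AttributeError(name)

        async def remote_call(payload: bytes) -> bytes:
            signature = sign(self._key, name, payload) if self._key else None
            return await self._transport.call(name, payload, signature)

        return remote_call


async def rpc_upper(payload: bytes) -> bytes:
    return payload.upper()


async def demo() -> bytes:
    key = b"shared-secret"  # hypothetical pre-shared key
    transport = LocalTransport({"rpc_upper": rpc_upper}, key)
    return await Stub(transport, key).rpc_upper(b"hello")


if __name__ == "__main__":
    print(asyncio.run(demo()))  # b'HELLO'
```

A `Stub` created without the key, or with the wrong key, has its calls rejected with `PermissionError`.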

### RPC Methods Formatting

When defining RPC methods for your protocol, they must follow a specific signature format that is compatible with `add_p2p_handlers()` from `ServicerBase`.

Each `rpc_*` method must:

1. Be `async def`
2. Accept two parameters:
   * A Protobuf request message
   * A `P2PContext` object (provides metadata about the calling peer)
3. Return either:
   * A single Protobuf response (`Awaitable[TOutputProtobuf]`), or
   * An async generator for streaming responses (`AsyncIterator[TOutputStream]`)

**Expected Signature Format**

```python
Union[TInputProtobuf, TInputStream], P2PContext
→
Union[Awaitable[TOutputProtobuf], AsyncIterator[TOutputStream]]
```

[See full examples below](#rpc-method-examples)
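A handler can be checked against these rules before registration with the standard `inspect` module. `check_rpc_signature` is a hypothetical helper, not part of the template; it only checks the shape (name prefix, `async def`, two parameters, a return annotation) and leaves type inference to `add_p2p_handlers()`.

```python
import inspect
from typing import Callable, List


def check_rpc_signature(func: Callable) -> List[str]:
    """Return the rule violations for an rpc_* handler (empty if it conforms)."""
    problems = []
    if not func.__name__.startswith("rpc_"):
        problems.append("name must start with 'rpc_'")
    if not (inspect.iscoroutinefunction(func) or inspect.isasyncgenfunction(func)):
        problems.append("must be defined with 'async def'")
    signature = inspect.signature(func)
    params = [p for p in signature.parameters if p != "self"]
    if len(params) != 2:
        problems.append(f"expected (request, context), got {params}")
    if signature.return_annotation is inspect.Signature.empty:
        problems.append("missing return annotation")
    return problems


class Example:
    async def rpc_good(self, request: bytes, context: object) -> bytes:
        return request

    def rpc_bad(self, request: bytes) -> bytes:  # not async, missing context
        return request
```

`check_rpc_signature(Example.rpc_good)` returns an empty list, while `Example.rpc_bad` is reported as not async and missing the `context` parameter.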

***

### Protobuf Message Examples

[Learn more about protobufs](https://docs.hypertensor.org/subnet-template/protobuf).

Every message exchanged between peers must be defined as a Protobuf message.

See [RPC Method Examples](#rpc-method-examples) for how to define a handler that performs a requested task and responds to the calling peer.

See [Peer-to-peer Communication](#peer-to-peer-communication) for how to request a peer to perform a task.

Here's an example of request/response Protobuf messages used in inference:

```protobuf
message InferenceRequest {
  string input = 1;
  int32 max_new_tokens = 2;
  Tensor tensor = 3;
}

message InferenceResponse {
  repeated Tensor tensors = 1;
}
```

***

Subnets should have mechanisms for nodes to interact with each other, ensuring that nodes can be verified by other nodes, run automated tasks, and make requests to each other, among other purposes.

To expose methods to the DHT for nodes to call on each other, prefix their names with `rpc_`. Each `rpc_*` method should have a corresponding function that calls it on a remote peer (see [Peer-to-peer Communication](#peer-to-peer-communication)).

### RPC Method Examples

Each `rpc_*` method in the protocol must follow one of the signature formats below.

#### Examples

**Unary (Single Request → Single Response)**

```python
async def rpc_something(
    self,
    request: inference_pb2.InferenceRequest,
    context: P2PContext
) -> inference_pb2.InferenceResponse:
    ...
```

**Streaming (Single Request → Streamed Responses)**

```python
async def rpc_do_stream(
    self,
    request: inference_pb2.InferenceRequest,
    context: P2PContext
) -> AsyncIterator[inference_pb2.InferenceResponse]:
    ...
```

**Streaming (Stream Request → Streamed Responses)**

```python
async def rpc_stream_something(
    self, 
    requests: AsyncIterator[runtime_pb2.StreamRequest], 
    context: P2PContext
) -> AsyncIterator[runtime_pb2.StreamRequest]:
    ...
```

> 💡 Ensure your Protobuf messages are imported and type-annotated correctly. The system relies on type hints to automatically register and route handlers via `add_p2p_handlers()`.
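For the stream-to-stream shape, the handler receives an async iterator of requests and can yield a response for each one while later requests are still arriving. In this minimal sketch integers stand in for Protobuf messages, the handler is written as a free function for brevity (in a protocol it would be a method taking `self`), and `requests_from` is a hypothetical helper that simulates a client's request stream.

```python
import asyncio
from typing import AsyncIterator, Iterable


async def rpc_running_total(
    requests: AsyncIterator[int], context: object
) -> AsyncIterator[int]:
    # Consume requests one at a time and emit a response for each.
    total = 0
    async for value in requests:
        total += value
        yield total


async def requests_from(values: Iterable[int]) -> AsyncIterator[int]:
    # Simulates a client streaming requests over time.
    for value in values:
        await asyncio.sleep(0)
        yield value


async def main() -> list[int]:
    return [t async for t in rpc_running_total(requests_from([1, 2, 3]), context=None)]


if __name__ == "__main__":
    print(asyncio.run(main()))  # [1, 3, 6]
```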

### Peer-to-peer Communication

To call a method on a peer, you must be a node in the P2P network (subnet).

#### Calling from within the protocol

```python
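async def call_something(
    self, peer: PeerID, data: str
) -> Optional[protocol_pb2.SomethingResponse]:  # placeholder name, like SomethingRequest
    request = protocol_pb2.SomethingRequest(something=data)
    try:
        async with self.rpc_semaphore:
            p2p = await self.dht.replicate_p2p()
            # Runs rpc_something on the remote peer and awaits its response
            return await self.get_stub(p2p, peer).rpc_something(request)
    except Exception as e:
        logger.error(f"rpc_something call to {peer} failed: {e}", exc_info=True)
        return None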
```
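The caller pattern above (acquire the semaphore, get a stub, await the RPC, handle failures) can be exercised without a network. In this sketch `FakeStub` is a hypothetical stand-in for `self.get_stub(p2p, peer)`, a semaphore of size 2 caps how many RPCs are in flight at once, and `asyncio.wait_for` adds a per-call timeout that the example above does not have.

```python
import asyncio
from typing import Optional


class FakeStub:
    """Hypothetical stand-in for a remote peer's stub; tracks concurrency."""

    def __init__(self):
        self.in_flight = 0
        self.max_in_flight = 0

    async def rpc_something(self, request: str) -> str:
        self.in_flight += 1
        self.max_in_flight = max(self.max_in_flight, self.in_flight)
        await asyncio.sleep(0.01)  # simulate network latency
        self.in_flight -= 1
        return request.upper()


async def call_something(
    stub: FakeStub, semaphore: asyncio.Semaphore, data: str, timeout: float = 1.0
) -> Optional[str]:
    try:
        async with semaphore:  # bound the number of concurrent RPCs
            return await asyncio.wait_for(stub.rpc_something(data), timeout)
    except Exception:  # network errors, timeouts, remote failures
        return None


async def main() -> tuple[list[Optional[str]], int]:
    stub, semaphore = FakeStub(), asyncio.Semaphore(2)
    results = await asyncio.gather(*(call_something(stub, semaphore, s) for s in "abcde"))
    return list(results), stub.max_in_flight


if __name__ == "__main__":
    print(asyncio.run(main()))  # (['A', 'B', 'C', 'D', 'E'], 2)
```

Five concurrent calls all succeed, but no more than two ever run at the same time.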

#### Calling from outside the protocol

To call a protocol's RPC methods from code outside the protocol class, the protocol should provide a `@classmethod` that returns a stub for a given peer, such as:

```python
@classmethod
def get_server_stub(
    cls,
    p2p: P2P,
    peer: PeerID,
    authorizer: Optional[AuthorizerBase] = None
) -> "MockProtocolStub":
    stub = super().get_stub(p2p, peer)
    return AuthRPCWrapperStreamer(stub, AuthRole.CLIENT, authorizer, service_public_key=None)
```

**Calling from outside the protocol example**:

```python
p2p = RemoteWorker.run_coroutine(dht.replicate_p2p())
stub = MockProtocol.get_server_stub(
    p2p,
    peer_id,
    authorizer
)
```
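`RemoteWorker` is not defined in this section; presumably its `run_coroutine` executes a coroutine on a background event loop and blocks until it finishes, so synchronous code can drive async P2P calls. Under that assumption, a minimal stand-in built on `asyncio.run_coroutine_threadsafe` could look like this (`BackgroundLoop` and `fetch_answer` are hypothetical names):

```python
import asyncio
import threading
from typing import Any, Coroutine, Optional


class BackgroundLoop:
    """Runs an asyncio event loop in a daemon thread for synchronous callers."""

    def __init__(self):
        self._loop = asyncio.new_event_loop()
        self._thread = threading.Thread(target=self._loop.run_forever, daemon=True)
        self._thread.start()

    def run_coroutine(self, coro: Coroutine[Any, Any, Any], timeout: Optional[float] = None) -> Any:
        # Schedule the coroutine on the background loop and block for its result.
        future = asyncio.run_coroutine_threadsafe(coro, self._loop)
        return future.result(timeout)

    def close(self) -> None:
        self._loop.call_soon_threadsafe(self._loop.stop)
        self._thread.join()
        self._loop.close()


async def fetch_answer() -> int:
    await asyncio.sleep(0.01)  # stands in for dht.replicate_p2p() or an RPC call
    return 42


if __name__ == "__main__":
    worker = BackgroundLoop()
    print(worker.run_coroutine(fetch_answer()))  # 42
    worker.close()
```

Reusing one long-lived loop, rather than calling `asyncio.run()` per request, likely matters here because async objects are generally bound to the loop that created them.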

***

## Protocol Example

[See the full example](https://github.com/hypertensor-blockchain).

Here is an example of a subnet that allows nodes to stream data from one another.

In this example, hosters serve the model and run inference on request. When a node calls `call_inference_stream` targeting a hoster peer, the method gets that peer's stub and calls `rpc_inference_stream` on it; the hoster then runs the inference and streams the results back to the calling node.

{% hint style="info" %}
The model is loaded in the child process to [avoid a deadlock](https://docs.pytorch.org/docs/stable/notes/multiprocessing.html#cuda-in-multiprocessing) with **CUDA**.
{% endhint %}

```python
from __future__ import annotations

import asyncio
import io
import multiprocessing as mp
from typing import AsyncIterator, Optional

import torch

import subnet
from subnet import DHT, get_dht_time
from subnet.compression.serialization import deserialize_torch_tensor, serialize_torch_tensor
from subnet.p2p import P2P, P2PContext, PeerID, ServicerBase
from subnet.proto import dht_pb2, inference_protocol_pb2, runtime_pb2
from subnet.app.protocols.inference_model import AsyncInferenceServer, InferenceModel
from subnet.app.utils.consensus import get_consensus_key
from subnet.app.utils.key import extract_rsa_peer_id, extract_rsa_peer_id_from_ssh
from subnet.substrate.chain_functions import Hypertensor
from subnet.utils import get_logger
from subnet.utils.asyncio import switch_to_uvloop
from subnet.utils.auth import AuthorizerBase, AuthRole, AuthRPCWrapperStreamer
from subnet.utils.mpfuture import MPFuture
from subnet.utils.serializer import MSGPackSerializer

logger = get_logger(__name__)


class InferenceProtocol(mp.context.ForkProcess, ServicerBase):

    _async_model: AsyncInferenceServer

    def __init__(
        self,
        dht: DHT,
        subnet_id: int,
        model_name: Optional[str] = None,
        balanced: bool = True,
        shutdown_timeout: float = 3,
        hypertensor: Optional[Hypertensor] = None,
        authorizer: Optional[AuthorizerBase] = None,
        start: bool = False,
    ):
        super().__init__()
        self.dht = dht
        self.subnet_id = subnet_id
        self.peer_id = dht.peer_id
        self.node_id = dht.node_id
        self.node_info = dht_pb2.NodeInfo(node_id=self.node_id.to_bytes())
        self.balanced, self.shutdown_timeout = balanced, shutdown_timeout
        self._p2p = None
        self.authorizer = authorizer
        self.ready = MPFuture()
        self.rpc_semaphore = asyncio.Semaphore(float("inf"))
        self._inner_pipe, self._outer_pipe = mp.Pipe(duplex=True)
        self.model_name = model_name
        self.daemon = True
        self.hypertensor = hypertensor

        if start:
            self.run_in_background(await_ready=True)

    def run(self):
        torch.set_num_threads(1)
        loop = switch_to_uvloop()
        stop = asyncio.Event()
        loop.add_reader(self._inner_pipe.fileno(), stop.set)

        async def _run():
            try:
                self._p2p = await self.dht.replicate_p2p()
                # Add rpc_* methods from this class to the P2P servicer
                logger.info("Adding P2P handlers")
                if self.authorizer is not None:
                    logger.info("Adding P2P handlers with authorizer")
                    await self.add_p2p_handlers(
                        self._p2p,
                        AuthRPCWrapperStreamer(self, AuthRole.SERVICER, self.authorizer),
                    )
                else:
                    await self.add_p2p_handlers(self._p2p, balanced=self.balanced)

                # Run PyTorch functions and classes in the child process. Read more:
                #   - https://stackoverflow.com/questions/22950047/cuda-initialization-error-after-fork/22950549#22950549
                #   - https://github.com/pytorch/pytorch/issues/17199

                if self.model_name is not None:
                    logger.info("Loading Inference Model")
                    model = InferenceModel(self.model_name)
                    logger.info("Setting Up Async Inference Server")
                    self._async_model = AsyncInferenceServer(model)
                    logger.info("Starting Async Inference Server Worker")
                    # Keep a reference so the worker task is not garbage-collected
                    self._worker_task = asyncio.create_task(self._async_model._worker())
                    logger.info("Async Inference Server Complete")

                self.ready.set_result(None)
            except Exception as e:
                logger.debug(e, exc_info=True)
                self.ready.set_exception(e)

            try:
                await stop.wait()
            finally:
                # self._p2p stays None if startup failed before replicate_p2p() returned
                if self._p2p is not None:
                    await self.remove_p2p_handlers(self._p2p)

        try:
            loop.run_until_complete(_run())
        except KeyboardInterrupt:
            logger.debug("Caught KeyboardInterrupt, shutting down")

    def run_in_background(self, await_ready: bool = True, timeout: Optional[float] = None) -> None:
        """
        Starts InferenceProtocol in a background process. If :await_ready:, this method will wait until
        it is ready to process incoming requests or for :timeout: seconds max.
        """
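        self.start()
        if await_ready:
            # Block until run() resolves self.ready; re-raises any startup error
            self.ready.result(timeout=timeout)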

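    def shutdown(self):
        if self.is_alive():
            # Writing to the pipe wakes the reader registered in run(), which sets `stop`
            self._outer_pipe.send("_shutdown")
            self.join(self.shutdown_timeout)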
            if self.is_alive():
                logger.warning(
                    "InferenceProtocol did not shut down within the grace period; terminating it the hard way"
                )
                self.terminate()
        else:
            logger.warning("InferenceProtocol shutdown had no effect, the process is already dead")

    def get_stub(self, p2p: P2P, peer: PeerID) -> AuthRPCWrapperStreamer:
        """
        Get a stub that sends requests to a given peer.

        It's important here to wrap the stub with an authentication wrapper, see AuthRPCWrapper
        """
        stub = super().get_stub(p2p, peer)
        return AuthRPCWrapperStreamer(stub, AuthRole.CLIENT, self.authorizer, service_public_key=None)

    @classmethod
    def get_server_stub(
        cls,
        p2p: P2P,
        peer: PeerID,
        authorizer: Optional[AuthorizerBase] = None
    ) -> "InferenceProtocolStub":  # type: ignore # noqa: F821
        """
        Get a stub that sends requests to a given peer.

        This function can be used to get the RPC methods from this protocol outside of this class.

        This is useful for client-side requests.
        """

        stub = super().get_stub(p2p, peer)
        return AuthRPCWrapperStreamer(stub, AuthRole.CLIENT, authorizer, service_public_key=None)

    async def rpc_info(self, request: runtime_pb2.Empty, context: P2PContext) -> runtime_pb2.NodeData:
        """Return metadata about this node: version, DHT client mode, and role"""

        result = {
            "version": subnet.__version__,
            "dht_client_mode": self.dht.client_mode,
            "role": "hoster" if self.model_name is not None else "validator"
        }

        return runtime_pb2.NodeData(serialized_info=MSGPackSerializer.dumps(result))

    async def call_inference_stream(
        self, peer: PeerID, prompt: str, tensor: torch.Tensor
    ) -> AsyncIterator[torch.Tensor]:
        """
        Call another peer to perform an inference stream on `tensor`.

        Output tensors are yielded one at a time as the peer streams them back.
        """
        input_stream = inference_protocol_pb2.InferenceRequestAuth(
            input=prompt,
            max_new_tokens=5,
            tensor=serialize_torch_tensor(tensor),
        )

        try:
            async with self.rpc_semaphore:
                p2p = await self.dht.replicate_p2p()
                response_stream = await self.get_stub(p2p, peer).rpc_inference_stream(input_stream)
                async for response in response_stream:
                    for tensor_proto in response.tensors:
                        yield deserialize_torch_tensor(tensor_proto)
        except Exception as e:
            logger.error(f"InferenceProtocol failed to stream from {peer}: {e}", exc_info=True)
            return

    def should_process_inference(self, tensor: torch.Tensor) -> bool:
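        """
        Validation hook for incoming inference requests. Accepts everything by default;
        override it to reject invalid requests.
        """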
        return True

    async def rpc_inference_stream(
        self, request: inference_protocol_pb2.InferenceRequestAuth, context: P2PContext
    ) -> AsyncIterator[inference_protocol_pb2.InferenceResponseAuth]:
        """
        A peer wants us to perform an inference stream
        """
        if self.model_name is None:
            # Validators don't load a model, so self._async_model is never set
            raise ValueError("This node does not host a model")

        tensor = deserialize_torch_tensor(request.tensor)

        if self.authorizer is not None:
            caller_peer_id = extract_rsa_peer_id_from_ssh(request.auth.client_access_token.public_key)
            # Don't allow other hosters to call inference on us if the tensor matches
            # the current epoch's random consensus tensors
            if caller_peer_id != self.peer_id:
                # Don't bother pinging the decentralized storage unless we have to
                if not self.should_process_inference(tensor):
                    raise ValueError("Invalid request")

        async for token_tensor in await self._async_model.submit(tensor):
            yield inference_protocol_pb2.InferenceResponseAuth(
                peer=self.node_info,
                dht_time=get_dht_time(),
                output=str(token_tensor.item()),
                tensors=[serialize_torch_tensor(token_tensor)]
            )
```
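The example's `run()` method waits on an `asyncio.Event` that is set when data arrives on `_inner_pipe`, so a graceful shutdown requires the parent to write to `_outer_pipe` before joining. The sketch below isolates that mechanism in a hypothetical `PipeStoppedProcess`, without any P2P or model code. Like the example, it relies on the fork start method and on `loop.add_reader` for pipes, so it assumes a Unix platform.

```python
import asyncio
import multiprocessing as mp


class PipeStoppedProcess(mp.context.ForkProcess):
    """Hypothetical minimal protocol process that runs until the parent writes to a pipe."""

    def __init__(self, shutdown_timeout: float = 3.0):
        super().__init__(daemon=True)
        self.shutdown_timeout = shutdown_timeout
        self.ready = mp.Event()
        self._inner_pipe, self._outer_pipe = mp.Pipe(duplex=True)

    def run(self) -> None:  # executes in the child process
        loop = asyncio.new_event_loop()
        asyncio.set_event_loop(loop)
        stop = asyncio.Event()
        fd = self._inner_pipe.fileno()
        loop.add_reader(fd, stop.set)  # any write from the parent sets `stop`

        async def _run() -> None:
            self.ready.set()  # the real protocol registers its RPC handlers first
            await stop.wait()  # the real protocol removes its handlers after this

        try:
            loop.run_until_complete(_run())
        finally:
            loop.remove_reader(fd)
            loop.close()

    def shutdown(self) -> None:  # called from the parent process
        if self.is_alive():
            self._outer_pipe.send("_shutdown")  # wakes the reader registered in run()
            self.join(self.shutdown_timeout)
            if self.is_alive():
                self.terminate()  # last resort if the child ignored the signal


if __name__ == "__main__":
    proc = PipeStoppedProcess()
    proc.start()
    proc.ready.wait(timeout=5)
    proc.shutdown()
    print(proc.exitcode)  # 0 after a graceful exit
```

If nothing were written to the pipe, `join()` would simply time out and the process would fall through to `terminate()`.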
