# GossipSub

## Introduction

GossipSub is a publish/subscribe messaging protocol for distributed networks of computers. It provides efficient, scalable communication in which a single peer can send data or requests to many or all peers.

**Nodes in a GossipSub network can “subscribe” to a topic to receive messages that are “published” to it**. A key feature of GossipSub is that it is self-optimizing: it aims to deliver each message to subscribers along short paths through the network.

Setting up a GossipSub system is simple using the **GossipSub template**. All that is required is logic for receiving messages (which also handles subscribing to a topic) and a way for peers to publish messages. Once a peer publishes a message, all other peers subscribed to that topic receive it via epidemic-style messaging.
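To make "epidemic-style messaging" concrete, here is a minimal sketch of message flooding with duplicate suppression. It is a simplified model, not GossipSub's actual mesh algorithm, and the `NEIGHBOURS` topology and `flood` function are hypothetical names used only for illustration.

```python
from collections import deque

# Hypothetical topology: each peer lists the neighbours it is connected to.
NEIGHBOURS = {
    "a": ["b", "c"],
    "b": ["a", "d"],
    "c": ["a", "d"],
    "d": ["b", "c", "e"],
    "e": ["d"],
}


def flood(origin: str) -> dict[str, int]:
    """Return the hop count at which each peer first sees the message."""
    first_seen = {origin: 0}
    queue = deque([origin])
    while queue:
        peer = queue.popleft()
        for neighbour in NEIGHBOURS[peer]:
            # A peer that has already seen the message drops the duplicate.
            if neighbour not in first_seen:
                first_seen[neighbour] = first_seen[peer] + 1
                queue.append(neighbour)
    return first_seen
```

Calling `flood("a")` shows that every peer receives the message once, even though peer `d` is reachable through both `b` and `c`.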

Which peers may publish or forward messages on a topic can be role-based and validated via the `topic_validator`.

**For example**, if a subnet has a *"trainer"* role and a *"validator"* role, a `topic_validator` can be set up to ensure that only the roles that should publish or forward messages are allowed to do so.
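A role check like this could be sketched as follows. This is an illustration under stated assumptions, not the template's implementation: `Role`, `ROLES`, `FakeMessage`, and `make_role_validator` are hypothetical, and peer IDs are plain `bytes` here rather than libp2p `ID` objects.

```python
from dataclasses import dataclass
from enum import Enum


class Role(Enum):
    TRAINER = "trainer"
    VALIDATOR = "validator"


@dataclass(frozen=True)
class FakeMessage:
    """Stand-in for rpc_pb2.Message with only the fields used here."""

    from_id: bytes
    data: bytes


# Hypothetical role registry: peer ID bytes -> role.
ROLES: dict[bytes, Role] = {
    b"peer-a": Role.TRAINER,
    b"peer-b": Role.VALIDATOR,
}


def make_role_validator(allowed_publishers: set[Role]):
    """Build a validator that accepts messages only from allowed roles."""

    def validator(forwarder_peer_id: bytes, msg: FakeMessage) -> bool:
        # Reject forwarders that are not known to the registry.
        if forwarder_peer_id not in ROLES:
            return False
        # The originator is carried in the message itself.
        return ROLES.get(msg.from_id) in allowed_publishers

    return validator


only_trainers = make_role_validator({Role.TRAINER})
```

Here `only_trainers` accepts a message that a trainer published and a validator forwarded, and rejects one that a validator published.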

***

**To build a gossip system, set up the following:**

* How received gossip messages are handled
* How and where messages are published to a topic

***

## Receiving Messages

Gossip topics are set up **using a configuration class** whose arguments are passed to the gossip template:

1. **A topic to subscribe to**
   1. Subscribing to a topic to receive all messages published to this topic.
2. **A handler function to handle the message after receiving it**
   1. This can be used to store the data for later use.
3. **A validator for the messages**
   1. Create a validator for all messages, such as:
      1. Pydantic validation
      2. Ensuring data is valid
      3. Ensuring only specific roles can publish messages to a topic
      4. Ensuring only specific roles can forward messages in a topic
      5. Any other checks the topic requires

```python
@dataclass(frozen=True, slots=True)
class GossipTopicConfig:
    """
    Configuration for one GossipSub topic.

    Args:
        topic: Topic string passed to ``Pubsub.subscribe``.
        topic_handler: Function called as ``handler(from_peer_id, message)``.
            Handlers may be synchronous or asynchronous.
        topic_validator: py-libp2p topic validator called by ``Pubsub`` before
            messages are delivered to subscribers.
        is_async_topic_validator: Set to ``True`` when ``topic_validator`` is an
            async validator.

    """

    topic: str
    topic_handler: GossipMessageHandler
    topic_validator: ValidatorFn | None
    is_async_topic_validator: bool = False
```

#### `topic`

The name of the topic to subscribe to.

#### `topic_handler`

The function that will be called after receiving the topic message. The function can be non-async or async, and the format is:

```python
Callable[[ID, rpc_pb2.Message], Awaitable[None] | None]
```

Where `ID` is the ID of the peer the message is from, and `rpc_pb2.Message` is the protobuf message. This looks like:

```python
async def fn(from_peer_id: ID, message: rpc_pb2.Message) -> None:
```

The `from_peer_id` is the peer that originated the message (the publisher), not the peer that forwarded it to you.
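Because handlers may be synchronous or asynchronous, the caller has to await the result only when there is something to await. The sketch below shows one common way to do that with `inspect.isawaitable`. It is an assumption about how such dispatch can work, not the template's actual code; `call_handler` is a hypothetical helper, and it uses `asyncio` with plain `str`/`bytes` arguments only so that it runs with the standard library (the template itself runs under `trio`).

```python
import asyncio
import inspect
from typing import Awaitable, Callable, Union

Handler = Callable[[str, bytes], Union[Awaitable[None], None]]


async def call_handler(handler: Handler, from_peer_id: str, message: bytes) -> None:
    """Call a handler and await the result only if it is awaitable."""
    result = handler(from_peer_id, message)
    if inspect.isawaitable(result):
        await result


received: list[tuple[str, bytes]] = []


def sync_handler(from_peer_id: str, message: bytes) -> None:
    received.append((from_peer_id, message))


async def async_handler(from_peer_id: str, message: bytes) -> None:
    await asyncio.sleep(0)  # yield once to show this really is a coroutine
    received.append((from_peer_id, message))


async def demo() -> None:
    await call_handler(sync_handler, "peer-a", b"one")
    await call_handler(async_handler, "peer-b", b"two")


asyncio.run(demo())
```

Both handlers end up recording their message, so a topic can be configured with either style.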

#### `topic_validator`

The validator function that validates each gossip message as it propagates, before it is delivered to subscribers.

The validator can be either a synchronous or an asynchronous function:

* ```python
  Callable[[ID, rpc_pb2.Message], bool]
  ```
* ```python
  Callable[[ID, rpc_pb2.Message], Awaitable[bool]]
  ```

Where the ID is the `forwarder_peer_id` (not the peer that originated the publishing of the message, but the peer that forwarded it to you). This looks like:

```python
def fn(self, forwarder_peer_id: ID, msg: rpc_pb2.Message) -> bool:
```

```python
async def fn(self, forwarder_peer_id: ID, msg: rpc_pb2.Message) -> bool:
```

#### Message Protobuf

The message protobuf `rpc_pb2.Message` is the following:

```protobuf
message Message {
    optional bytes from_id = 1;
    optional bytes data = 2;
    optional bytes seqno = 3;
    repeated string topicIDs = 4;
    optional bytes signature = 5;
    optional bytes key = 6;
}
```

`from_id` is the ID of the peer that originally published the message, which is then forwarded to all peers subscribed to the topic.

`data` is the payload.
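Since `data` is raw bytes, a validator usually has to decode it and check its shape before trusting it. The following is a minimal standard-library sketch of that step, assuming a JSON-encoded payload. The `Vote` fields and the `parse_vote` helper are hypothetical and stand in for whatever schema (for example, a Pydantic model) the subnet actually uses.

```python
import json
from dataclasses import dataclass
from typing import Optional


@dataclass(frozen=True)
class Vote:
    """Hypothetical vote payload; the field names are illustrative."""

    epoch: int
    choice: str


def parse_vote(data: bytes) -> Optional[Vote]:
    """Return a Vote if ``data`` is valid UTF-8 JSON with the expected fields."""
    try:
        raw = json.loads(data.decode("utf-8"))
    except (UnicodeDecodeError, json.JSONDecodeError):
        return None
    if not isinstance(raw, dict):
        return None
    epoch, choice = raw.get("epoch"), raw.get("choice")
    # bool is a subclass of int, so exclude it explicitly.
    if not isinstance(epoch, int) or isinstance(epoch, bool):
        return None
    if not isinstance(choice, str):
        return None
    return Vote(epoch=epoch, choice=choice)
```

A validator could return `False` whenever `parse_vote(msg.data)` returns `None`, so malformed payloads are rejected before they reach the handler.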

### Example

This is an example where peers publish a vote and a peer state. For the "votes" topic, we create the function `handle_vote`, which stores the message, and `validate_vote`, which checks that the peer forwarding the message has a proof of stake, that the publishing peer is not a validator (because only the "trainers" role can gossip votes), and that the data matches the Pydantic schema.

```python
async def handle_vote(from_peer_id: ID, msg: rpc_pb2.Message) -> None:
    """
    The data has been validated, so we now handle the message.
    """
    # store to database
    db.store(ID(msg.from_id), msg.data.decode("utf-8"))

def validate_vote(forwarder_peer_id: ID, msg: rpc_pb2.Message) -> bool:
    """
    Only allow votes that are forwarded by staked peers and published by
    non-validator ("trainer") nodes.
    """
    # validate the forwarder has a proof of stake
    # ``pos.proof_of_stake`` is an example function
    if not pos.proof_of_stake(
        peer_id=forwarder_peer_id,
    ):
        return False

    # validate that the peer the message is from is in the correct role
    # ``is_validator`` is an example function
    from_peer_id = ID(msg.from_id)
    if is_validator(from_peer_id):
        return False
    ...
    # validate message data Pydantic schema
    try:
        data = Vote.from_json(msg.data.decode("utf-8"))
    except Exception:
        return False
    # check if voted before
    ...
    return True

async def handle_peer_state(from_peer_id: ID, message: rpc_pb2.Message) -> None:
    # store to database
    ...

# Initialize the Gossip receiver template
termination_event = trio.Event()
receiver = GossipReceiverTemplate(
    pubsub=pubsub,
    termination_event=termination_event,
    topics_config=[
        GossipTopicConfig(
            topic="votes",
            topic_handler=handle_vote,
            topic_validator=validate_vote,
        ),
        GossipTopicConfig(
            topic="peer_state",
            topic_handler=handle_peer_state,
            topic_validator=None,
        ),
    ],
)

async def main() -> None:
    async with trio.open_nursery() as nursery:
        nursery.start_soon(receiver.run)
        ...
        # Signal the receiver to stop
        termination_event.set()
```

Once this is set up, any peer running this code will listen and wait for other peers to publish messages to the topic or topics created in the GossipSub.

## Publishing Messages

Once the receiver is set up, you can begin publishing messages to the GossipSub.

Publishing messages to peers subscribed to a topic is as simple as calling `await self.pubsub.publish(topic, message_bytes)` from any class or function that has access to the `Pubsub` instance.

A basic peer state heartbeat publisher can look like:

```python
import secrets
from enum import Enum

import trio


class ServerState(Enum):
    OFFLINE = 0
    JOINING = 1
    ONLINE = 2


class PeerRole(Enum):
    """
    Add custom roles, e.g., miner, validator, trainer, etc.

    This logic can be stored and reused for role-permission-based logic.
    """
    
    VALIDATOR = 0
    

class PeerStateData(BaseModel):
    uid: str
    epoch: int
    subnet_id: int
    subnet_node_id: int
    state: ServerState
    role: PeerRole

async def publish_peer_state_heartbeat_loop(
    pubsub: Pubsub,
    topic: str,
    state: ServerState,
    role: PeerRole,
    subnet_id: int,
    subnet_node_id: int,
    hypertensor: LocalMockHypertensor | Hypertensor,
    termination_event: trio.Event | None = None,
) -> None:
    # Publish until the optional termination event is set
    while termination_event is None or not termination_event.is_set():
        slot = hypertensor.get_subnet_slot(subnet_id)
        current_epoch = hypertensor.get_subnet_epoch_data(slot).epoch
        message = PeerStateData(
            uid=secrets.token_hex(16),
            epoch=current_epoch,
            subnet_id=subnet_id,
            subnet_node_id=subnet_node_id,
            state=state,
            role=role,
        )

        # Publish peer state heartbeat to all peers subscribed to the topic
        await pubsub.publish(topic, message.to_bytes())
        await trio.sleep(20)
```
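The publisher above calls `message.to_bytes()`, and the receiving side has to turn those bytes back into a typed object. The source does not show how that helper is implemented, so the following is only a sketch of one possible JSON round trip. `PeerStateSketch` is a hypothetical, reduced stand-in for `PeerStateData`, built on a standard-library dataclass so that it runs without extra dependencies.

```python
import json
from dataclasses import asdict, dataclass
from enum import Enum


class ServerState(Enum):
    OFFLINE = 0
    JOINING = 1
    ONLINE = 2


@dataclass(frozen=True)
class PeerStateSketch:
    uid: str
    epoch: int
    state: ServerState

    def to_bytes(self) -> bytes:
        payload = asdict(self)
        # Enums are not JSON serializable, so store the integer value.
        payload["state"] = self.state.value
        return json.dumps(payload, sort_keys=True).encode("utf-8")

    @classmethod
    def from_bytes(cls, data: bytes) -> "PeerStateSketch":
        raw = json.loads(data.decode("utf-8"))
        # ServerState(...) raises ValueError for an unknown state value.
        return cls(uid=raw["uid"], epoch=raw["epoch"], state=ServerState(raw["state"]))
```

A topic validator can call `from_bytes` inside a `try`/`except` and reject the message if decoding fails, which keeps malformed heartbeats out of the handler.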

## Membership Validator

The above logic in the GossipReceiverTemplate handles both validating who can forward messages and who can originate messages in a GossipSub. Still, it doesn't handle which peers can listen to the topics.

This is what the topic membership validator template solves.

{% hint style="info" %}
This should rarely be needed. Use it only when gossip messaging for one role must be isolated from other roles.
{% endhint %}

## Application

```python
from subnet.hypertensor.chain_functions import Hypertensor
from subnet.hypertensor.mock.local_chain_functions import LocalMockHypertensor
from subnet.server.server_template import ServerBase

class MyApplication(ApplicationBase):
    def __init__(
        self,
        hypertensor: Hypertensor | LocalMockHypertensor,
        role: PeerRole,
        subnet_id: int,
        subnet_node_id: int
    ):
        self.hypertensor = hypertensor
        self.role = role
        self.subnet_id = subnet_id
        self.subnet_node_id = subnet_node_id
        self.receiver = None
        self.telemetry = None

    async def setup(self, context: P2PNetworkContext) -> None:
        if self.telemetry is not None:
            context.nursery.start_soon(self.telemetry.run)
        
    async def start_application(self, context: P2PNetworkContext) -> None:
        self.receiver = GossipReceiverTemplate(
            pubsub=context.pubsub,
            termination_event=trio.Event(),
            topics_config=[
                GossipTopicConfig(
                    topic="votes",
                    topic_handler=handle_vote,
                    topic_validator=validate_vote,
                ),
                GossipTopicConfig(
                    topic="peer_state",
                    topic_handler=handle_peer_state,
                    topic_validator=None,
                ),
            ],
        )
        context.nursery.start_soon(self.receiver.run)
        
        # Start peer state heartbeat publisher interval
        context.nursery.start_soon(
            publish_peer_state_heartbeat_loop,
            context.pubsub,
            "peer_state",
            ServerState.ONLINE,
            self.role,
            self.subnet_id,
            self.subnet_node_id,
            self.hypertensor,
        )
        
        # Start vote logic
        # Attestor(pubsub=context.pubsub)
        
class MyServer(ServerBase):
    def __init__(
        self,
        *,
        ip: str | None = None,
        port: int,
        bootstrap_addrs: Sequence[str] | None = None,
        key_pair: KeyPair,
        db: RocksDB,
        subnet_id: int,
        subnet_slot: int = 3,
        subnet_node_id: int,
        hypertensor: Hypertensor | LocalMockHypertensor,
        is_bootstrap: bool = False,
        telemetry: Telemetry | None = None,
        enable_mDNS: bool = False,
        enable_upnp: bool = False,
        enable_autotls: bool = False,
        resource_manager: ResourceManager | None = None,
        psk: str | None = None,
    ) -> None:
        application = MyApplication(
            hypertensor=hypertensor,
            role=PeerRole.VALIDATOR,  # example role; use this node's actual role
            subnet_id=subnet_id,
            subnet_node_id=subnet_node_id,
        )

        super().__init__(
            ip=ip or "0.0.0.0",
            port=port,
            application=application,
            key_pair=key_pair,
            bootstrap_addrs=bootstrap_addrs,
            use_available_interfaces=True,
            enable_pubsub=True,
            enable_random_walk=True,
            enable_mDNS=enable_mDNS,
            enable_upnp=enable_upnp,
            enable_autotls=enable_autotls,
            resource_manager=resource_manager,
            psk=psk,
            max_connections_per_peer=6,
            enable_ping=True,
            enable_proof_of_stake=True,
            db=db,
            subnet_id=subnet_id,
            subnet_slot=subnet_slot,
            subnet_node_id=subnet_node_id,
            hypertensor=hypertensor,
            is_bootstrap=is_bootstrap,
            enable_subnet_info_tracker=True,
            enable_connection_maintenance=True,
            strict_maintain_connections=True,
            telemetry=telemetry,
            maintain_connections_log_level=logging.DEBUG,
        )
```

## GossipSub Research

<https://research.protocol.ai/blog/2019/a-new-lab-for-resilient-networks-research/PL-TechRep-gossipsub-v0.1-Dec30.pdf>

