# Merkle Directed Acyclic Graphs (DAGs)

A Merkle Directed Acyclic Graph (DAG) is built into the template so that the network can maintain a chain of events, similar to a blockchain.

This is an advanced feature of the subnet template. Use it when peers need historical data, for example to score other peers based on multiple epochs of data, or when newly joined peers need to sync historical data, similar to a blockchain archive node.

This page gives a high-level overview of setting up the DAG, syncing peers, and publishing nodes to the DAG.

The DAG can be used in place of using GossipSub directly. It is built on top of GossipSub, but instead of standalone messages, the published data forms a DAG chain, similar to a blockchain.
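To make the "chain of events" idea concrete, here is a minimal, conceptual sketch of Merkle linking. It assumes a node ID is the SHA-256 hash of the node's payload together with its parent IDs. The `DagNode` class and its hashing scheme are hypothetical and are not the template's actual node format; they only show why each node commits to its history.

```python
import hashlib
import json
from dataclasses import dataclass, field


@dataclass(frozen=True)
class DagNode:
    """Hypothetical, simplified DAG node: a payload plus links to parent node IDs."""

    payload: dict
    parents: tuple[str, ...] = ()
    cid: str = field(init=False)

    def __post_init__(self) -> None:
        # The ID hashes the payload *and* the parent IDs, so a node commits
        # to its whole ancestry (the "Merkle" property).
        body = json.dumps(
            {"payload": self.payload, "parents": sorted(self.parents)},
            sort_keys=True,
            separators=(",", ":"),
        )
        object.__setattr__(self, "cid", hashlib.sha256(body.encode()).hexdigest())


genesis = DagNode({"epoch": 1, "state": "ONLINE"})
child = DagNode({"epoch": 2, "state": "ONLINE"}, parents=(genesis.cid,))

# Rewriting history changes the ancestor's ID, so it no longer matches
# the link stored in the child.
tampered = DagNode({"epoch": 1, "state": "OFFLINE"})
print(child.parents[0] == genesis.cid)   # True
print(child.parents[0] == tampered.cid)  # False
```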

To set up a DAG, there are two main concepts: the **DAG receiver** and the **DAG publisher**.

## DAG Receiver

Only four classes are needed to set up the Merkle DAG receiver logic, which can handle multiple payloads:

### MerkleDagSyncProtocol

The MerkleDagSyncProtocol handles syncing the Merkle DAG between peers. For example, when a peer joins the subnet, it will handle the communication between peers to get the joining peer up to date on the latest DAG node.
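Conceptually, syncing means walking back from a remote peer's latest node until reaching nodes the joining peer already has, then applying the missing nodes parents-first. The sketch below assumes the remote history is available as a simple `{node_id: [parent_ids]}` map; `missing_nodes` is a hypothetical helper, not part of the MerkleDagSyncProtocol API.

```python
from collections import deque


def missing_nodes(
    remote_parents: dict[str, list[str]],
    remote_head: str,
    local_known: set[str],
) -> list[str]:
    """Hypothetical helper: node IDs the local peer lacks, ordered parents-first."""
    # 1. Walk backwards from the remote head, stopping at nodes we already have.
    needed: set[str] = set()
    queue = deque([remote_head])
    while queue:
        cid = queue.popleft()
        if cid in local_known or cid in needed:
            continue
        needed.add(cid)
        queue.extend(remote_parents.get(cid, []))

    # 2. Topologically sort the missing subgraph (Kahn's algorithm) so every
    #    parent is applied before any of its children.
    children: dict[str, list[str]] = {cid: [] for cid in needed}
    pending: dict[str, int] = {}
    for cid in needed:
        in_needed = [p for p in remote_parents.get(cid, []) if p in needed]
        pending[cid] = len(in_needed)
        for parent in in_needed:
            children[parent].append(cid)

    ready = deque(sorted(cid for cid, count in pending.items() if count == 0))
    ordered: list[str] = []
    while ready:
        cid = ready.popleft()
        ordered.append(cid)
        for child in children[cid]:
            pending[child] -= 1
            if pending[child] == 0:
                ready.append(child)
    return ordered
```

Ordering parents before children matters because a node cannot be attached until its parents are present.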

### SyncProtocolPeerRequestClient

The SyncProtocolPeerRequestClient handles calling the MerkleDagSyncProtocol.

### DagPeerSetProvider

The DagPeerSetProvider supplies the peers that the MerkleDagSyncProtocol can request Merkle DAG syncs from.
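This page doesn't describe how the provider chooses peers. As an illustration only, a provider might exclude the local peer and sample a few connected peers so that sync requests are spread out. `select_sync_peers` and its parameters are hypothetical.

```python
import random
from typing import Optional


def select_sync_peers(
    connected: list[str],
    local_peer_id: str,
    max_peers: int = 3,
    rng: Optional[random.Random] = None,
) -> list[str]:
    """Hypothetical: pick up to `max_peers` distinct remote peers to sync from."""
    # Deduplicate and never ask ourselves for a sync.
    candidates = sorted({p for p in connected if p != local_peer_id})
    rng = rng or random.Random()
    # Random sampling spreads sync load instead of always hitting the same peers.
    return rng.sample(candidates, k=min(max_peers, len(candidates)))
```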

### DagGossipSystem

The DagGossipSystem is the base layer for all Merkle DAG logic, including receiving gossip messages from other peers to form a DAG.
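A receiver generally has to check each gossiped node before attaching it to its local DAG. The sketch below assumes a JSON message carrying `cid`, `payload`, and `parents`, and a hypothetical SHA-256 content ID. It rejects nodes whose ID does not match their contents and holds back orphans (nodes whose parents are not yet known). The message format and `ReceiverSketch` are assumptions, not the DagGossipSystem's real behavior; the topic configuration later on this page exposes a `skip_if_orphans` option, whose exact semantics are not specified here.

```python
import hashlib
import json


def node_id(payload: dict, parents: list[str]) -> str:
    """Hypothetical content ID: SHA-256 over canonical JSON of payload and parents."""
    body = json.dumps(
        {"payload": payload, "parents": sorted(parents)},
        sort_keys=True,
        separators=(",", ":"),
    )
    return hashlib.sha256(body.encode()).hexdigest()


class ReceiverSketch:
    """Simplified receive path: verify the node, then store it or hold it as an orphan."""

    def __init__(self) -> None:
        self.store: dict[str, dict] = {}
        self.orphans: dict[str, dict] = {}

    def on_gossip(self, message: bytes) -> str:
        msg = json.loads(message)
        cid, payload, parents = msg["cid"], msg["payload"], msg["parents"]
        # Reject messages whose claimed ID does not match their contents.
        if node_id(payload, parents) != cid:
            return "rejected"
        # An orphan cannot be attached until its missing history is synced.
        if any(p not in self.store for p in parents):
            self.orphans[cid] = msg
            return "orphan"
        self.orphans.pop(cid, None)
        self.store[cid] = msg
        return "stored"
```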

***

## DAG Publisher

The DAG publishing logic comes with a template called the DagPublisherTemplate. You subclass this template to set up how your payload is built and published to the DAG.

When creating a Merkle DAG specific to your subnet's use case, you need to create a payload that will be announced to the DAG. You can also create multiple payloads; by default, each payload has its own line in the Merkle DAG.
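One way to picture "each payload has its own line" is to track a separate head per payload schema, so a new node links only to the previous node of the same schema. `MultiLineDagSketch` is a hypothetical in-memory model of that idea, not the template's storage layer.

```python
import hashlib
import json


class MultiLineDagSketch:
    """Hypothetical: each schema ID gets its own line of linked nodes."""

    def __init__(self) -> None:
        self.heads: dict[str, str] = {}       # schema_id -> latest node ID
        self.nodes: dict[str, dict] = {}      # node ID -> node body

    def publish(self, schema_id: str, payload: dict) -> str:
        # Link only to the previous node of the *same* schema.
        parent = self.heads.get(schema_id)
        parents = [parent] if parent else []
        body = {"schema_id": schema_id, "payload": payload, "parents": parents}
        cid = hashlib.sha256(json.dumps(body, sort_keys=True).encode()).hexdigest()
        self.nodes[cid] = body
        self.heads[schema_id] = cid  # advance only this schema's line
        return cid
```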

Publishing a node to the DAG can be as simple as a function that publishes to the DagGossipSystem at a fixed interval.

***

## Usage

Start with the data that will be published to the DAG.

### Payload & Schema

First, create the payload. Payloads are built on a Pydantic model template:

#### DagPayloadTemplate

`DagPayloadTemplate` is the base class that your custom payload class inherits from. It provides helpers to serialize the payload to DAG metadata or JSON and to deserialize it back.

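```python
from typing import Any, TypeVar

from pydantic import BaseModel

# Type variable so the classmethods return the calling subclass type.
DagPayloadT = TypeVar("DagPayloadT", bound="DagPayloadTemplate")


class DagPayloadTemplate(BaseModel):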
    """Pydantic payload base with the standard DAG metadata/JSON helpers."""

    def to_metadata(self) -> dict[str, Any]:
        """Serialize this payload to JSON-compatible DAG metadata."""
        return self.model_dump(mode="json")

    def to_json(self) -> str:
        """Serialize this payload to a JSON string."""
        return self.model_dump_json()

    @classmethod
    def from_metadata(cls: type[DagPayloadT], metadata: Any) -> DagPayloadT:
        """Deserialize this payload from DAG metadata or a DAG body payload."""
        return cls.model_validate(metadata)

    @classmethod
    def from_json(cls: type[DagPayloadT], payload: str | bytes | bytearray) -> DagPayloadT:
        """Deserialize this payload from a JSON string or bytes."""
        return cls.model_validate_json(payload)
```

#### Custom Payload

Create the custom payload. In this example, each peer will publish a peer state payload including its role and state.

```python
from enum import Enum
from typing import Any

from subnet.utils.dag.dag_publisher_template import (
    DagPayloadTemplate,
    DagPublisherTemplateSchema,
)


class ServerState(Enum):
    OFFLINE = 0
    JOINING = 1
    ONLINE = 2


class PeerRole(Enum):
    VALIDATOR = 0


class PeerStateData(DagPayloadTemplate):
    """Application-level peer status stored in DAG node metadata."""

    uid: str
    epoch: int
    subnet_id: int
    subnet_node_id: int
    state: ServerState
    role: PeerRole
    # Stored as a string so it round-trips through JSON DAG metadata
    # (the publisher below passes a `str`).
    multiaddr: str | None = None

    def model_post_init(self, __context: Any) -> None:
        assert self.subnet_id > 0, "Subnet ID must be greater than 0"
        assert self.subnet_node_id > 0, "Subnet node ID must be greater than 0"
```
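As a quick check of how such a payload serializes, the self-contained sketch below copies the two relevant helpers from `DagPayloadTemplate` into a local `Payload` base class (assuming Pydantic v2) and round-trips a trimmed-down peer state. `Payload` and `PeerState` are hypothetical stand-ins for the classes above. In JSON mode, enums are dumped as their values, and under Pydantic's default configuration unknown keys are ignored when validating back.

```python
from enum import Enum
from typing import Any, Optional, TypeVar

from pydantic import BaseModel

T = TypeVar("T", bound="Payload")


class Payload(BaseModel):
    """Hypothetical stand-in mirroring two DagPayloadTemplate helpers."""

    def to_metadata(self) -> dict[str, Any]:
        return self.model_dump(mode="json")

    @classmethod
    def from_metadata(cls: type[T], metadata: Any) -> T:
        return cls.model_validate(metadata)


class ServerState(Enum):
    OFFLINE = 0
    JOINING = 1
    ONLINE = 2


class PeerState(Payload):
    uid: str
    epoch: int
    state: ServerState
    multiaddr: Optional[str] = None


original = PeerState(uid="abc", epoch=7, state=ServerState.ONLINE)
metadata = original.to_metadata()
print(metadata)  # {'uid': 'abc', 'epoch': 7, 'state': 2, 'multiaddr': None}

# An extra key (like the `peer_id` the publisher adds) is ignored on validation.
restored = PeerState.from_metadata({"peer_id": "peer-x", **metadata})
print(restored == original)  # True
```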

## Initializing Merkle DAG

Next, initialize the `DagGossipSystem`, which handles all DAG-related tasks.

The ServerBase template handles the following logic for you, so all of the P2P arguments, such as `dht`, `pubsub`, and `gossipsub`, are created automatically.

```python
from subnet.protocols.dag_sync_protocol import (
    DagPeerSetProvider,
    MerkleDagSyncProtocol,
    SyncProtocolPeerRequestClient,
)
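from subnet.utils.dag.dag_gossip_system import DagGossipSystem, DagGossipTopicConfig
from subnet.utils.dag.dag_publisher_template import DagPublisherTemplateSchema

# `host`, `dht`, `pubsub`, `gossipsub`, `termination_event`, `nursery`, `self.db`, and
# `self.key_pair` are assumed to come from the surrounding ServerBase context.
# `Libp2pKeyPairSigner`, `PEER_STATE_TOPIC`, `DAG_NAMESPACE`, and
# `PEER_STATE_SCHEMA_ID` must be imported or defined by your subnet.

# Sync protocol used to bring peers up to date on the latest DAG nodes.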
sync_protocol = MerkleDagSyncProtocol(
    host=host,
    db=self.db,
    dht=dht,
    pubsub=pubsub,
    gossipsub=gossipsub,
)
request_client = SyncProtocolPeerRequestClient(sync_protocol)
peer_provider = DagPeerSetProvider(sync_protocol)

dag_system = DagGossipSystem(
    pubsub=pubsub,
    termination_event=termination_event,
    db=self.db,
    local_peer_id=host.get_id(),
    topics=[
        DagGossipTopicConfig(
            topic=PEER_STATE_TOPIC,
            namespace=DAG_NAMESPACE,
            payload_schemas=[DagPublisherTemplateSchema(PEER_STATE_SCHEMA_ID, PeerStateData)],
            schema_id=PEER_STATE_SCHEMA_ID,
            author=host.get_id().to_string(),
            parent_schema_id=PEER_STATE_SCHEMA_ID,
            signer=Libp2pKeyPairSigner(self.key_pair),
            skip_if_orphans=True,
            request_client=request_client,
            peer_provider=peer_provider,
            latest_node_snapshot_db_key=PEER_STATE_TOPIC,
        ),
    ],
)
sync_protocol.set_request_handler(dag_system.handle_sync_request_bytes)
nursery.start_soon(dag_system.run)
```
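The last two lines wire things together: the sync protocol carries raw bytes, and the DAG system's `handle_sync_request_bytes` interprets them. Below is a synchronous, hypothetical sketch of that registration pattern. The real handler's signature and wire format aren't shown on this page, so the `{"want": [...]}` request format and both classes are assumptions.

```python
import json
from typing import Callable, Optional


class SyncProtocolSketch:
    """Hypothetical stand-in: the protocol moves bytes; the DAG system interprets them."""

    def __init__(self) -> None:
        self._handler: Optional[Callable[[bytes], bytes]] = None

    def set_request_handler(self, handler: Callable[[bytes], bytes]) -> None:
        self._handler = handler

    def handle_incoming(self, request: bytes) -> bytes:
        if self._handler is None:
            raise RuntimeError("no request handler registered")
        return self._handler(request)


class DagStoreSketch:
    """Hypothetical DAG side: answers requests for node IDs it holds."""

    def __init__(self, nodes: dict[str, dict]) -> None:
        self.nodes = nodes

    def handle_sync_request_bytes(self, request: bytes) -> bytes:
        # Assumed request format: {"want": [node IDs]}; unknown IDs are skipped.
        wanted = json.loads(request)["want"]
        found = {cid: self.nodes[cid] for cid in wanted if cid in self.nodes}
        return json.dumps({"nodes": found}).encode()
```

Keeping the protocol byte-oriented lets the transport stay unaware of DAG semantics, which is consistent with registering the handler after both objects exist.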

## Initializing Merkle DAG Publisher

Next, create the publisher class, which configures how the payload is built and published to the DAG.

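```python
import logging
import secrets
from typing import Any

import trio

from subnet.utils.dag.dag_gossip_system import DagGossipSystem
# Assumed to live alongside DagPayloadTemplate in the same module.
from subnet.utils.dag.dag_publisher_template import DagPublisherTemplate

# PeerStateData, ServerState, and PeerRole come from the payload module above.
# DagPublishResult, Hypertensor, LocalMockHypertensor, and Telemetry are also
# provided by the subnet template; import them from their respective modules.


class PeerStateDagPublisher(DagPublisherTemplate[PeerStateData]):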
    def __init__(
        self,
        dag_system: DagGossipSystem,
        start_state: ServerState,
        start_role: PeerRole,
        subnet_id: int,
        subnet_node_id: int,
        hypertensor: LocalMockHypertensor | Hypertensor,
        schema_id: str,
        *,
        namespace: str,
        multiaddr: str,
        dag_topic: str | None = None,
        telemetry: Telemetry | None = None,
        termination_event: trio.Event | None = None,
        publish_interval_seconds: float = 20.0,
        log_level: int = logging.DEBUG,
    ) -> None:
        super().__init__(
            dag_system=dag_system,
            namespace=namespace,
            schema_id=schema_id,
            snapshot_db_key=dag_topic,
            telemetry=telemetry,
            log_level=log_level,
        )
        if publish_interval_seconds <= 0:
            raise ValueError("PeerStateDagPublisher publish_interval_seconds must be greater than zero")
        self.state = start_state
        self.role = start_role
        self.subnet_id = subnet_id
        self.subnet_node_id = subnet_node_id
        self.hypertensor = hypertensor
        self.multiaddr = multiaddr
        self.termination_event = termination_event if termination_event is not None else dag_system.termination_event
        self.publish_interval_seconds = publish_interval_seconds

    async def run(self) -> None:
        """Continuously publish the current peer state until shutdown or cancellation."""
        while not self.termination_event.is_set():
            await self.publish()
            await trio.sleep(self.publish_interval_seconds)

    def build_payload(self) -> PeerStateData:
        """Build the current local peer state for periodic/template publishing."""
        current_epoch = self.hypertensor.get_subnet_epoch_data(self.hypertensor.get_subnet_slot(self.subnet_id)).epoch
        return PeerStateData(
            uid=secrets.token_hex(16),
            epoch=current_epoch,
            subnet_id=self.subnet_id,
            subnet_node_id=self.subnet_node_id,
            state=self.state,
            role=self.role,
            multiaddr=self.multiaddr,
        )

    async def build_metadata(self, payload: PeerStateData) -> dict[str, Any]:
        """Store dynamic fields in metadata too, so lightweight indexes remain useful."""
        return payload.to_metadata()

    async def after_publish(self, payload: PeerStateData, result: DagPublishResult) -> None:
        """Emit peer-state telemetry after the template stores the latest-node cache."""
        await super().after_publish(payload, result)

    def dag_payload_from_data(self, data: PeerStateData) -> dict[str, Any]:
        """Convert app-level peer state into the canonical DAG body payload."""
        return {
            "peer_id": self.local_peer_id,
            **data.to_metadata(),
        }

    def data_from_dag_payload(self, payload: Any) -> PeerStateData:
        """Convert a stored DAG body payload back into app-level peer state."""
        return PeerStateData.from_metadata(payload)

    async def publish_peer_state(
        self,
        data: PeerStateData,
        *,
        created_at_ms: int | None = None,
    ) -> DagPublishResult | None:
        """Publish caller-supplied peer state from external application logic."""
        return await self.publish_payload(
            data,
            metadata=data.to_metadata(),
            created_at_ms=created_at_ms,
        )

    async def trigger_publish_payload(
        self,
        data: PeerStateData,
        *,
        created_at_ms: int | None = None,
    ) -> DagPublishResult | None:
        """Backward-compatible alias for publishing caller-supplied peer state."""
        return await self.publish_peer_state(data, created_at_ms=created_at_ms)
```

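`run`

Publishes the current peer state to the DAG every `publish_interval_seconds` (20 seconds by default) until the termination event is set.

`build_payload`

Builds the payload to be published to the DAG: a `PeerStateData` with a random `uid`, the current subnet epoch from Hypertensor, and the peer's state, role, and multiaddr.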
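The publisher follows a template-method pattern: the subclass supplies hooks such as `build_payload`, `dag_payload_from_data`, and `after_publish`, and the template's `publish` ties them together. The synchronous sketch below assumes a simplified pipeline (build, convert, link to the previous head, hash, store, then call `after_publish`). `PublisherTemplateSketch` and `CounterPublisher` are hypothetical; the real `DagPublisherTemplate.publish` is asynchronous and likely also handles metadata, signing, and gossip.

```python
import hashlib
import json
from typing import Any, Generic, Optional, TypeVar

P = TypeVar("P")


class PublisherTemplateSketch(Generic[P]):
    """Hypothetical template: subclasses supply hooks, publish() does the rest."""

    def __init__(self) -> None:
        self.head: Optional[str] = None
        self.nodes: dict[str, dict[str, Any]] = {}

    def build_payload(self) -> P:
        raise NotImplementedError

    def dag_payload_from_data(self, data: P) -> dict[str, Any]:
        raise NotImplementedError

    def after_publish(self, data: P, cid: str) -> None:
        pass  # hook for telemetry, caches, etc.

    def publish(self) -> str:
        data = self.build_payload()
        body = {
            "payload": self.dag_payload_from_data(data),
            "parents": [self.head] if self.head else [],
        }
        cid = hashlib.sha256(json.dumps(body, sort_keys=True).encode()).hexdigest()
        self.nodes[cid] = body
        self.head = cid  # the next node links to this one
        self.after_publish(data, cid)
        return cid


class CounterPublisher(PublisherTemplateSketch[int]):
    """Hypothetical subclass that publishes an increasing epoch number."""

    def __init__(self) -> None:
        super().__init__()
        self.epoch = 0
        self.published: list[str] = []

    def build_payload(self) -> int:
        self.epoch += 1
        return self.epoch

    def dag_payload_from_data(self, data: int) -> dict[str, Any]:
        return {"epoch": data}

    def after_publish(self, data: int, cid: str) -> None:
        self.published.append(cid)
```

Subclasses only describe *what* to publish; the linking logic stays in one place, which is the point of using a template class.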


