# Signature Validators

### SignatureValidator (RSA and Ed25519)

Introduces the notion of ***protected records*** whose key/subkey contains the substring "\[owner:ssh-ed25519 ...]" or "\[owner:ssh-rsa...]" with an Ed25519 or RSA public key of the owner.

If this validator is used, changes to such records must always be signed with the corresponding private key (so only the owner can change them).

This ensures no other peers can update the records of other peers.

{% hint style="info" %}
**Signature Validators should always have top priority (see** [**RecordValidatorBase**](/subnet-template/dht-records/record-validator.md#recordvalidatorbase)**).**
{% endhint %}

```python
class SignatureValidator(RecordValidatorBase):
    """
    Introduces a notion of *protected records* whose key/subkey contains substring

    `Ed25519`
    "[owner:ssh-ed25519 ...]" with an Ed25519 public key of the owner.

    `RSA`
    "[owner:ssh-rsa ...]" with an RSA public key of the owner.

    If this validator is used, changes to such records always must be signed with
    the corresponding private key (so only the owner can change them).
    """

    PUBLIC_KEY_FORMAT = b"[owner:_key_]"
    SIGNATURE_FORMAT = b"[signature:_value_]"

    PUBLIC_KEY_REGEX = re.escape(PUBLIC_KEY_FORMAT).replace(b"_key_", rb"(.+?)")
    _PUBLIC_KEY_RE = re.compile(PUBLIC_KEY_REGEX)
    _SIGNATURE_RE = re.compile(re.escape(SIGNATURE_FORMAT).replace(b"_value_", rb"(.+?)"))

    _cached_private_key = None

    def __init__(self, private_key: RSAPrivateKey | Ed25519PrivateKey):
        self._private_key = private_key
        self._key_type = KeyType.RSA if isinstance(private_key, RSAPrivateKey) else KeyType.Ed25519
        serialized_public_key = private_key.get_public_key().to_bytes()
        self._local_public_key = self.PUBLIC_KEY_FORMAT.replace(b"_key_", serialized_public_key)

    @property
    def local_public_key(self) -> bytes:
        return self._local_public_key

    def validate(self, record: DHTRecord, type: DHTRecordRequestType) -> bool:
        public_keys = self._PUBLIC_KEY_RE.findall(record.key)
        if record.subkey is not None:
            public_keys += self._PUBLIC_KEY_RE.findall(record.subkey)
        if not public_keys:
            return True  # The record is not protected with a public key

        if len(set(public_keys)) > 1:
            logger.debug(f"Key and subkey can't contain different public keys in {record}")
            return False

        public_key = load_public_key_from_bytes(public_keys[0])

        signatures = self._SIGNATURE_RE.findall(record.value)
        if len(signatures) != 1:
            logger.debug(f"Record should have exactly one signature in {record}")
            return False
        signature = signatures[0]

        stripped_record = dataclasses.replace(record, value=self.strip_value(record))
        if not public_key.verify(self._serialize_record(stripped_record), signature):
            logger.debug(f"Signature is invalid in {record}")
            return False
        return True

    def sign_value(self, record: DHTRecord) -> bytes:
        if self._local_public_key not in record.key and self._local_public_key not in record.subkey:
            return record.value

        signature = self._private_key.sign(self._serialize_record(record))
        return record.value + self.SIGNATURE_FORMAT.replace(b"_value_", signature)

    def strip_value(self, record: DHTRecord) -> bytes:
        return self._SIGNATURE_RE.sub(b"", record.value)

    def _serialize_record(self, record: DHTRecord) -> bytes:
        return MSGPackSerializer.dumps(dataclasses.astuple(record))

    @property
    def priority(self) -> int:
        # On validation, this validator must be executed before validators
        # that deserialize the record
        return 10

    def merge_with(self, other: RecordValidatorBase) -> bool:
        if not isinstance(other, SignatureValidator):
            return False

        # Ignore another SignatureValidator instance (it doesn't make sense to have several
        # instances of this class) and report successful merge
        return True

```


---

# Agent Instructions: Querying This Documentation

If you need additional information that is not directly available in this page, you can query the documentation dynamically by asking a question.

Perform an HTTP GET request on the current page URL with the `ask` query parameter:

```
GET https://docs.hypertensor.org/subnet-template/dht-records/record-validator/signature-validators.md?ask=<question>
```

The question should be specific, self-contained, and written in natural language.
The response will contain a direct answer to the question and relevant excerpts and sources from the documentation.

Use this mechanism when the answer is not explicitly present in the current page, you need clarification or additional context, or you want to retrieve related documentation sections.
