Server
Server Base
Application
Application Base
class ApplicationBase:
def __init__(self, *args: object, **kwargs: object) -> None:
self.args = args
self.kwargs = kwargs
async def setup(self, context: P2PNetworkContext) -> None:
"""Called once after host, DHT, and optional pubsub are ready."""
async def start_application(self, context: P2PNetworkContext) -> None:
"""Called after bootstrap and optional connection maintenance startup."""
async def run(self, context: P2PNetworkContext) -> None:
"""Block until the app or server should stop."""
await context.termination_event.wait()
async def cleanup(self, context: P2PNetworkContext) -> None:
"""Called once while the network context is still available."""Application
Server
Last updated