Source code for octomachinery.routing.routers

"""Octomachinery event dispatchers collection."""

import asyncio
from contextlib import suppress
from typing import Any, Iterator, Set, Union

from gidgethub.routing import AsyncCallback
from gidgethub.routing import Router as _GidgetHubRouter

from ..github.models.events import (
    GidgetHubWebhookEvent, GitHubEvent, _GidgetHubEvent,
)
from ..utils.asynctools import aio_gather
from .abc import OctomachineryRouterBase


# Names exported by ``from <this module> import *``; all three are the
# router classes defined below.
__all__ = (
    'GidgetHubRouterBase',
    'ConcurrentRouter',
    'NonBlockingConcurrentRouter',
)


class GidgetHubRouterBase(_GidgetHubRouter, OctomachineryRouterBase):
    """Router built on GidgetHub that exposes route matching on its own.

    Callback lookup lives in :py:meth:`emit_routes_for` so that
    :py:meth:`dispatch` (and any subclass overriding it) only has to
    decide *how* the matched callbacks get invoked.
    """

    def emit_routes_for(
            self, event_name: str, event_payload: Any,
    ) -> Iterator[AsyncCallback]:
        """Yield every registered callback matching the event.

        Callbacks found under ``event_name`` in ``self._shallow_routes``
        are produced first. After that, ``self._deep_routes`` is
        consulted: for each payload key registered there, callbacks are
        produced when the payload holds that key with a registered value.

        :param str event_name: name of the GitHub event
        :param event_payload: details of the GitHub event; it is probed
                              with ``in`` and indexed by key
        :yields: coroutine event handlers
        """
        # A missing entry simply means no payload-independent handlers.
        with suppress(KeyError):
            yield from self._shallow_routes[event_name]

        try:
            routes_by_key = self._deep_routes[event_name]
        except KeyError:
            # Nothing is registered against payload contents for this event.
            return

        for key, handlers_by_value in routes_by_key.items():
            if key in event_payload:
                actual_value = event_payload[key]
                if actual_value in handlers_by_value:
                    yield from handlers_by_value[actual_value]

    async def dispatch(
            self, event: Union[GidgetHubWebhookEvent, _GidgetHubEvent],
            *args: Any, **kwargs: Any,
    ) -> None:
        """Await the matching handlers for the event one after another.

        A raw GidgetHub event is first converted with
        ``GidgetHubWebhookEvent.from_gidgethub``. Each handler is called
        as ``handler(event, *args, **kwargs)`` and awaited before the
        next one is looked up.

        :param event: the event to route
        :param args: extra positional arguments passed to each handler
        :param kwargs: extra keyword arguments passed to each handler
        """
        if isinstance(event, _GidgetHubEvent):
            event = GidgetHubWebhookEvent.from_gidgethub(event)

        for handler in self.emit_routes_for(event.name, event.payload):
            await handler(event, *args, **kwargs)
class ConcurrentRouter(GidgetHubRouterBase):
    """GitHub event router that runs all matching handlers as one batch."""

    async def dispatch(
            self, event: GitHubEvent,
            *args: Any, **kwargs: Any,
    ) -> None:
        """Call every matching handler and await them via ``aio_gather``.

        Each handler is called as ``handler(event, *args, **kwargs)``;
        the resulting awaitables are passed to ``aio_gather`` together.

        :param event: the event to route; its ``name`` and ``payload``
                      attributes select the handlers
        :param args: extra positional arguments passed to each handler
        :param kwargs: extra keyword arguments passed to each handler
        """
        matching_handlers = self.emit_routes_for(event.name, event.payload)
        pending = [
            handler(event, *args, **kwargs)
            for handler in matching_handlers
        ]
        await aio_gather(*pending)
class NonBlockingConcurrentRouter(ConcurrentRouter):
    """Non-blocking GitHub event router scheduling handler tasks.

    Matching handlers are wrapped into tasks with
    :py:func:`asyncio.create_task` and :py:meth:`dispatch` returns
    without awaiting them.
    """

    def __init__(self, *args, **kwargs):
        """Initialize NonBlockingConcurrentRouter.

        All arguments are forwarded unchanged to the parent initializer.
        """
        super().__init__(*args, **kwargs)
        # Strong references to the handler tasks that are still running,
        # so they cannot be garbage-collected before they finish.
        # NOTE: For some reason, mypy doesn't accept anything except Any here:
        self._event_handler_tasks: Set[Any] = set()

    async def dispatch(
            self, event: GitHubEvent,
            *args: Any, **kwargs: Any,
    ) -> None:
        """Schedule coroutine callbacks for the given event together.

        Each matching handler is called as
        ``handler(event, *args, **kwargs)`` and scheduled as a task.
        The task is tracked in ``self._event_handler_tasks`` only while
        it is in flight: once it completes it is dropped from the set,
        so the set does not grow with every event ever dispatched.

        :param event: the event to route; its ``name`` and ``payload``
                      attributes select the handlers
        :param args: extra positional arguments passed to each handler
        :param kwargs: extra keyword arguments passed to each handler
        """
        in_flight = self._event_handler_tasks
        for handler in self.emit_routes_for(event.name, event.payload):
            handler_task = asyncio.create_task(
                handler(event, *args, **kwargs),
            )
            in_flight.add(handler_task)
            # Release the reference as soon as the task is done; without
            # this, finished tasks (and their results or exceptions)
            # would be retained for the lifetime of the router.
            handler_task.add_done_callback(in_flight.discard)