Source code for octomachinery.routing.abc

"""Octomachinery router base interface definitions."""

from abc import ABCMeta, abstractmethod
from typing import TYPE_CHECKING, Any, Iterator

from gidgethub.routing import AsyncCallback


if TYPE_CHECKING:
    from ..github.models.events import GitHubEvent


__all__ = ('OctomachineryRouterBase',)


[docs]class OctomachineryRouterBase(metaclass=ABCMeta): """Octomachinery router base.""" # pylint: disable=unused-argument
[docs] @abstractmethod def emit_routes_for( self, event_name: str, event_payload: Any, ) -> Iterator[AsyncCallback]: """Emit callbacks that match given event and payload. :param str event_name: name of the GitHub event :param str event_payload: details of the GitHub event :yields: coroutine event handlers """
# pylint: disable=unused-argument
[docs] @abstractmethod async def dispatch( self, event: 'GitHubEvent', *args: Any, **kwargs: Any, ) -> None: """Invoke coroutine handler tasks for the given event."""