octomachinery.routing.routers module

Octomachinery event dispatchers collection.

class octomachinery.routing.routers.GidgetHubRouterBase(*other_routers: gidgethub.routing.Router)

Bases: gidgethub.routing.Router, octomachinery.routing.abc.OctomachineryRouterBase

GidgetHub-based router exposing callback matching separately.

async dispatch(event: Union[octomachinery.github.models.events.GidgetHubWebhookEvent, gidgethub.sansio.Event], *args: Any, **kwargs: Any) → None

Invoke handler tasks for the given event sequentially.
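A minimal sketch of what sequential invocation means in practice, using only `asyncio`. It is not the library's implementation: `dispatch_sequentially`, `run_sequential_demo`, and the two demo handlers are hypothetical names introduced for illustration.

```python
import asyncio
from typing import Any, Awaitable, Callable, Iterable, List

# Assumed handler shape: an async callable that receives the event.
Handler = Callable[..., Awaitable[None]]


async def dispatch_sequentially(handlers: Iterable[Handler], event: Any) -> None:
    """Await each handler in turn; the next one starts only after the previous ends."""
    for handler in handlers:
        await handler(event)


def run_sequential_demo() -> List[str]:
    """Run two hypothetical handlers and record the order of their steps."""
    log: List[str] = []

    async def slow(event: Any) -> None:
        log.append("slow:start")
        await asyncio.sleep(0.01)  # simulate I/O
        log.append("slow:end")

    async def fast(event: Any) -> None:
        log.append("fast")

    asyncio.run(dispatch_sequentially([slow, fast], {"action": "opened"}))
    return log
```

In this sketch the slow handler finishes before the fast one starts, so one long-running handler delays every handler registered after it.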

emit_routes_for(event_name: str, event_payload: Any) → Iterator[Callable[[…], Awaitable[None]]]

Emit callbacks that match given event and payload.

Parameters
  • event_name (str) – name of the GitHub event

  • event_payload (Any) – details of the GitHub event

Yields

coroutine event handlers
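The matching step can be pictured as a generator that yields every registered coroutine handler whose conditions fit the event. The sketch below is an assumption-laden illustration, not the library's code: the `Registry` layout, `emit_matching_handlers`, and both handlers are hypothetical, and the payload is assumed to be a mapping.

```python
from typing import Any, Awaitable, Callable, Dict, Iterator, List, Mapping, Tuple

Handler = Callable[..., Awaitable[None]]
# Hypothetical registry: event name -> [(handler, payload fields that must match)]
Registry = Dict[str, List[Tuple[Handler, Dict[str, Any]]]]


def emit_matching_handlers(
    registry: Registry, event_name: str, event_payload: Mapping[str, Any]
) -> Iterator[Handler]:
    """Yield handlers registered for the event whose required fields all match."""
    for handler, required in registry.get(event_name, []):
        if all(event_payload.get(key) == value for key, value in required.items()):
            yield handler


async def on_any_issue(event: Any) -> None:
    """Hypothetical handler for every `issues` event."""


async def on_opened_issue(event: Any) -> None:
    """Hypothetical handler for `issues` events with action `opened`."""


DEMO_REGISTRY: Registry = {
    "issues": [
        (on_any_issue, {}),                      # no payload condition
        (on_opened_issue, {"action": "opened"}),  # payload must match
    ],
}
```

Because the lookup is separate from invocation, a caller can decide how to run the yielded handlers: one after another, together, or as background tasks.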

class octomachinery.routing.routers.ConcurrentRouter(*other_routers: gidgethub.routing.Router)

Bases: octomachinery.routing.routers.GidgetHubRouterBase

GitHub event router invoking event handlers simultaneously.

async dispatch(event: octomachinery.github.models.events.GitHubEvent, *args: Any, **kwargs: Any) → None

Invoke coroutine callbacks for the given event together.
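One common way to run coroutine callbacks together is `asyncio.gather`. Whether this router uses it internally is an assumption; the sketch only illustrates the behavior described, and `dispatch_concurrently` and `run_concurrent_demo` are hypothetical names.

```python
import asyncio
from typing import Any, Awaitable, Callable, Iterable, List

Handler = Callable[..., Awaitable[None]]


async def dispatch_concurrently(handlers: Iterable[Handler], event: Any) -> None:
    """Start all handlers at once and wait until every one of them has finished."""
    await asyncio.gather(*(handler(event) for handler in handlers))


def run_concurrent_demo() -> List[str]:
    """Run two hypothetical handlers together and record the order of their steps."""
    log: List[str] = []

    async def slow(event: Any) -> None:
        log.append("slow:start")
        await asyncio.sleep(0.01)  # simulate I/O; other handlers run meanwhile
        log.append("slow:end")

    async def fast(event: Any) -> None:
        log.append("fast")

    asyncio.run(dispatch_concurrently([slow, fast], {"action": "opened"}))
    return log
```

Here the fast handler completes while the slow one is still waiting, yet the dispatch call itself returns only after both are done.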

class octomachinery.routing.routers.NonBlockingConcurrentRouter(*args, **kwargs)

Bases: octomachinery.routing.routers.ConcurrentRouter

Non-blocking GitHub event router scheduling handler tasks.

async dispatch(event: octomachinery.github.models.events.GitHubEvent, *args: Any, **kwargs: Any) → None

Schedule coroutine callbacks for the given event together.
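Scheduling differs from invoking in that the dispatch call returns before the handlers have run. A minimal sketch with `asyncio.create_task`, assuming (not confirmed by this page) that handlers become background tasks; `schedule_dispatch`, the `background` set, and `run_scheduling_demo` are hypothetical.

```python
import asyncio
from typing import Any, Awaitable, Callable, Iterable, List, Set

Handler = Callable[..., Awaitable[None]]


async def schedule_dispatch(
    handlers: Iterable[Handler], event: Any, background: Set["asyncio.Task[None]"]
) -> None:
    """Schedule each handler as a task and return without waiting for any of them."""
    for handler in handlers:
        task = asyncio.create_task(handler(event))
        # Keep a strong reference so the task is not garbage-collected early.
        background.add(task)
        task.add_done_callback(background.discard)


def run_scheduling_demo() -> List[str]:
    """Show that dispatch returns before the scheduled handler has run."""
    log: List[str] = []

    async def handler(event: Any) -> None:
        log.append("handler ran")

    async def main() -> None:
        background: Set["asyncio.Task[None]"] = set()
        await schedule_dispatch([handler], {"action": "opened"}, background)
        log.append("dispatch returned")
        # Only the demo waits here, so the event loop does not close too soon.
        await asyncio.gather(*background)

    asyncio.run(main())
    return log
```

With this approach a slow handler does not hold up the caller, but errors raised inside scheduled tasks no longer propagate to the dispatch call and need to be handled separately.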