octomachinery.routing.routers module

Octomachinery event dispatchers collection.

class octomachinery.routing.routers.ConcurrentRouter(*other_routers: Router)

Bases: GidgetHubRouterBase

GitHub event router invoking event handlers simultaneously.

async dispatch(event: GitHubEvent, *args: Any, **kwargs: Any) → None

Invoke coroutine callbacks for the given event together.
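
As an illustration of what invoking callbacks "together" means, here is a minimal standalone sketch built on `asyncio.gather`. It does not use octomachinery; the `dispatch_concurrently` helper and the two demo handlers are hypothetical names, and the actual router may be implemented differently.

```python
import asyncio


async def dispatch_concurrently(handlers, event):
    """Start every handler for the event at once and wait for all of them."""
    # Hypothetical helper, not part of octomachinery.
    await asyncio.gather(*(handler(event) for handler in handlers))


async def _demo():
    order = []

    async def slow(event):
        await asyncio.sleep(0.02)
        order.append("slow")

    async def fast(event):
        await asyncio.sleep(0)
        order.append("fast")

    # "slow" is listed first, but both handlers run at the same time.
    await dispatch_concurrently([slow, fast], {"action": "opened"})
    return order


concurrent_order = asyncio.run(_demo())
print(concurrent_order)
```

Because the handlers overlap, the faster one finishes first even though it was listed second.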

class octomachinery.routing.routers.GidgetHubRouterBase(*other_routers: Router)

Bases: Router, OctomachineryRouterBase

GidgetHub-based router exposing callback matching separately.

async dispatch(event: Union[GidgetHubWebhookEvent, Event], *args: Any, **kwargs: Any) → None

Invoke handler tasks for the given event sequentially.
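
For contrast with concurrent dispatch, sequential invocation can be sketched as a plain loop that awaits each handler before starting the next. This is a standalone illustration under assumed names (`dispatch_sequentially` and the demo handlers are hypothetical), not the library's source.

```python
import asyncio


async def dispatch_sequentially(handlers, event):
    """Await each handler in turn; the next one starts only after the previous ends."""
    # Hypothetical helper, not part of octomachinery.
    for handler in handlers:
        await handler(event)


async def _demo():
    order = []

    async def slow(event):
        await asyncio.sleep(0.02)
        order.append("slow")

    async def fast(event):
        await asyncio.sleep(0)
        order.append("fast")

    await dispatch_sequentially([slow, fast], {"action": "opened"})
    return order


sequential_order = asyncio.run(_demo())
print(sequential_order)
```

Here completion order follows registration order, regardless of how long each handler takes.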

emit_routes_for(event_name: str, event_payload: Any) → Iterator[Callable[[...], Awaitable[None]]]

Emit callbacks that match given event and payload.

Parameters:
  • event_name (str) – name of the GitHub event

  • event_payload (Any) – details of the GitHub event

Yields:

coroutine event handlers
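
The idea of yielding matching callbacks separately from invoking them can be sketched with a toy registry and a generator. Everything below (`ROUTES`, `register`, `emit_matching_handlers`, and the handler names) is hypothetical and only mimics the general pattern; it is not the octomachinery or gidgethub implementation.

```python
from typing import Any, Awaitable, Callable, Dict, Iterator, List, Tuple

Handler = Callable[..., Awaitable[None]]

# Hypothetical registry: event name -> list of (payload filters, handler) pairs.
ROUTES: Dict[str, List[Tuple[Dict[str, Any], Handler]]] = {}


def register(event_name: str, **filters: Any) -> Callable[[Handler], Handler]:
    """Record a handler for an event name, optionally narrowed by payload fields."""
    def decorator(handler: Handler) -> Handler:
        ROUTES.setdefault(event_name, []).append((filters, handler))
        return handler
    return decorator


def emit_matching_handlers(
        event_name: str, event_payload: Dict[str, Any],
) -> Iterator[Handler]:
    """Yield handlers whose filters all match the payload, without calling them."""
    for filters, handler in ROUTES.get(event_name, []):
        if all(event_payload.get(key) == value for key, value in filters.items()):
            yield handler


@register("issues", action="opened")
async def on_issue_opened(event): ...


@register("issues", action="closed")
async def on_issue_closed(event): ...


@register("issues")
async def on_any_issue(event): ...


matched = [h.__name__ for h in emit_matching_handlers("issues", {"action": "opened"})]
print(matched)
```

Keeping matching separate from invocation lets a caller decide how to run the yielded coroutine functions: one after another, all at once, or as background tasks.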

class octomachinery.routing.routers.NonBlockingConcurrentRouter(*args, **kwargs)

Bases: ConcurrentRouter

Non-blocking GitHub event router scheduling handler tasks.

async dispatch(event: GitHubEvent, *args: Any, **kwargs: Any) → None

Schedule coroutine callbacks for the given event together.
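
The difference between scheduling and invoking can be shown with `asyncio.create_task`: the dispatching coroutine returns as soon as the tasks are created, and the handlers finish later. This is a standalone sketch under assumed names (`schedule_handlers` and `_background_tasks` are hypothetical); how the actual router tracks its tasks is not stated here.

```python
import asyncio

# Hypothetical bookkeeping: keep references so pending tasks are not garbage-collected.
_background_tasks = set()


async def schedule_handlers(handlers, event):
    """Create a task per handler and return without waiting for any of them."""
    for handler in handlers:
        task = asyncio.create_task(handler(event))
        _background_tasks.add(task)
        task.add_done_callback(_background_tasks.discard)


async def _demo():
    log = []

    async def handler(event):
        await asyncio.sleep(0.01)
        log.append("handled")

    await schedule_handlers([handler], {"action": "opened"})
    log.append("dispatch returned")  # recorded before the handler has finished

    # Only the demo waits here, so that the event loop does not close early.
    await asyncio.gather(*_background_tasks)
    return log


nonblocking_log = asyncio.run(_demo())
print(nonblocking_log)
```

A caller using this pattern has to handle errors raised inside the tasks itself, since nothing awaits them at dispatch time.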