Source code for octomachinery.routing.default_router
"""Default GitHub event dispatcher."""
from functools import wraps
from .routers import ConcurrentRouter
__all__ = (
'dispatch_event',
'process_event',
'process_event_actions',
'WEBHOOK_EVENTS_ROUTER',
)
WEBHOOK_EVENTS_ROUTER = ConcurrentRouter()
"""An event dispatcher for webhooks."""
dispatch_event = WEBHOOK_EVENTS_ROUTER.dispatch # pylint: disable=invalid-name
process_event = WEBHOOK_EVENTS_ROUTER.register # pylint: disable=invalid-name
[docs]def process_event_actions(event_name, actions=None):
"""Subscribe to multiple events."""
if actions is None:
actions = []
def decorator(original_function):
def wrapper(*args, **kwargs):
return original_function(*args, **kwargs)
if not actions:
wrapper = process_event(event_name)(wrapper)
for action in actions:
wrapper = process_event(event_name, action=action)(wrapper)
return wraps(original_function)(wrapper)
return decorator