Source code for bibt.gcp.pubsub.methods

import base64
import logging
from datetime import datetime
from datetime import timezone

from cloudevents.http import CloudEvent
from dateutil.parser import parse

_LOGGER = logging.getLogger(__name__)


[docs] def process_event( event: CloudEvent, timeout_secs=600, decode_bytes=True, ): """Check timestamp of triggering CloudEvent; catches infinite retry loops on 'retry on fail' cloud functions. Its good practice to always call this function first in a Cloud Function. Additionally, **be sure to wrap the call to this function in a try/except block where the except block returns normally.** This ensures that an exception raised here does not result in an infinite rety loop. **If (and only if)** the event has a payload, it will be returned. Otherwise, this function returns ``None``. .. code-block:: python import json from bibt.gcp.pubsub import process_event from cloudevents.http import CloudEvent import functions_framework @functions_framework.event def main(event: CloudEvent): try: payload = process_event(event) if not payload: raise IOError('No payload in triggering pubsub!') payload = json.loads(payload) except Exception as e: _LOGGER.critical( f'Exception while processing trigger: {type(e).__name__}: {e}' ) return :type context: :class:`google.cloud.functions.Context` :param context: the triggering pubsub's context. :type event: :py:class:`dict` :param event: (Optional) the triggering pubsub's event. defaults to :py:class:`None`. :type timeout_secs: :py:class:`int` :param timeout_secs: (Optional) the number of seconds to consider as the timeout threshold from the original trigger time. Defaults to 1800. :type decode_bytes: :py:class:`bool` :param decode_bytes: (Optional) whether to attempt to decode the bytes in the event payload as "utf-8" or simply return the bytes. :rtype: :py:class:`str` OR :py:class:`bytes` OR :py:class:`None` :returns: the pubsub payload, if present. """ _LOGGER.info(f"Processing CloudEvent: {event.get('id')}") utctime = datetime.now(timezone.utc) eventtime = parse(event.get("time")) _LOGGER.debug(f"CloudEvent timestamp: [{eventtime}]") lapsed = utctime - eventtime lapsed = lapsed.total_seconds() _LOGGER.info(f"Lapsed time since triggering event: {lapsed:.5f} seconds") if lapsed > timeout_secs: raise TimeoutError( f"Threshold of {timeout_secs} seconds exceeded by " f"{lapsed-timeout_secs} seconds. Exiting." ) if event.data and "message" in event.data and "data" in event.data["message"]: _LOGGER.debug("Payload found, extracting & returning.") payload = base64.b64decode(event.data["message"]["data"]) if decode_bytes: _LOGGER.debug("Decoding as UTF-8 and returning a string...") return payload.decode("utf-8") _LOGGER.debug("Returning as raw bytes...") return payload return None
[docs] def process_trigger( context, event=None, timeout_secs=600, decode_bytes=True, ): """Check timestamp of triggering event; catches infinite retry loops on 'retry on fail' cloud functions. Its good practice to always call this function first in a Cloud Function. Additionally, **be sure to wrap the call to this function in a try/except block where the except block returns normally.** This ensures that an exception raised here does not result in an infinite rety loop. **If (and only if)** the triggering pubsub's event is also passed **and has a payload**, it will be returned. Otherwise, this function returns ``None``. .. code-block:: python import json from bibt.gcp.pubsub import process_trigger def main(event, context): try: payload = process_trigger(context, event=event) if not payload: raise IOError('No payload in triggering pubsub!') payload = json.loads(payload) except Exception as e: _LOGGER.critical( f'Exception while processing trigger: {type(e).__name__}: {e}' ) return :type context: :class:`google.cloud.functions.Context` :param context: the triggering pubsub's context. :type event: :py:class:`dict` :param event: (Optional) the triggering pubsub's event. defaults to :py:class:`None`. :type timeout_secs: :py:class:`int` :param timeout_secs: (Optional) the number of seconds to consider as the timeout threshold from the original trigger time. Defaults to 1800. :type decode_bytes: :py:class:`bool` :param decode_bytes: (Optional) whether to attempt to decode the bytes in the event payload as "utf-8" or simply return the bytes. :rtype: :py:class:`str` OR :py:class:`bytes` OR :py:class:`None` :returns: the pubsub payload, if present. """ _LOGGER.info(f"Processing PubSub: {context.event_id}") utctime = datetime.now(timezone.utc) eventtime = parse(context.timestamp) _LOGGER.debug(f"PubSub timestamp: [{eventtime}]") lapsed = utctime - eventtime lapsed = datetime.now(timezone.utc) - parse(context.timestamp) lapsed = lapsed.total_seconds() _LOGGER.info(f"Lapsed time since triggering event: {lapsed:.5f} seconds") if lapsed > timeout_secs: raise TimeoutError( f"Threshold of {timeout_secs} seconds exceeded by " f"{lapsed-timeout_secs} seconds. Exiting." ) if event is not None and "data" in event: _LOGGER.debug("Payload found, extracting & returning.") payload = base64.b64decode(event["data"]) if decode_bytes: _LOGGER.debug("Decoding as UTF-8 and returning a string...") return payload.decode("utf-8") _LOGGER.debug("Returning as raw bytes...") return payload return None