PubSub

See the official gcp-pubsub API documentation here: link.

class bibt.gcp.pubsub.classes.Client(credentials=None)[source]
A Client may be used to interact with the GCP PubSub API using the same session / credentials.

Parameters:

credentials (google.oauth2.credentials.Credentials) – the credentials object to use when making API calls. If omitted, the inferred gcloud account is used.

send_pubsub(topic_uri, payload)[source]

Publishes a pubsub message to the specified topic. Executing account must have pubsub publisher permissions on the topic or in the project.

import os
from bibt.gcp.pubsub import Client

def main(event, context):
    client = Client()
    topic_uri = (
        f'projects/{os.environ["GOOGLE_PROJECT"]}'
        f'/topics/{os.environ["NEXT_TOPIC"]}'
    )
    client.send_pubsub(
        topic_uri=topic_uri,
        payload={'favorite_color': 'blue'}
    )
Parameters:
  • topic_uri (str) – the topic on which to publish. Topic URI format: 'projects/{project_name}/topics/{topic_name}'

  • payload (dict, list, OR str) – the pubsub payload. It is converted to bytes before sending.
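The conversion of the payload to bytes could look roughly like the sketch below. This is an illustration only, not the library's actual implementation: the helper name `to_pubsub_bytes` is hypothetical, and JSON serialization of dicts and lists is an assumption.

```python
import json


def to_pubsub_bytes(payload):
    """Hypothetical helper: encode a dict, list, or str as UTF-8 bytes."""
    if isinstance(payload, (dict, list)):
        # Assumption: structured payloads are serialized to JSON text first.
        payload = json.dumps(payload)
    if not isinstance(payload, str):
        raise TypeError(f"Unsupported payload type: {type(payload).__name__}")
    # Pub/Sub message data is always transmitted as bytes.
    return payload.encode("utf-8")
```

Under this assumption, a subscriber that receives the message would reverse the step with `json.loads(data.decode("utf-8"))`.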

bibt.gcp.pubsub.methods.process_event(event: CloudEvent, timeout_secs=600, decode_bytes=True)[source]
Checks the timestamp of the triggering CloudEvent; catches infinite retry loops on ‘retry on fail’ cloud functions. It's good practice to always call this function first in a Cloud Function. Additionally, be sure to wrap the call to this function in a try/except block where the except block returns normally. This ensures that an exception raised here does not result in an infinite retry loop.

If (and only if) the event has a payload, it will be returned. Otherwise, this function returns None.

import json
import logging
from bibt.gcp.pubsub import process_event
from cloudevents.http import CloudEvent
import functions_framework

_LOGGER = logging.getLogger(__name__)

@functions_framework.event
def main(event: CloudEvent):
    try:
        payload = process_event(event)
        if not payload:
            raise IOError('No payload in triggering pubsub!')
        payload = json.loads(payload)
    except Exception as e:
        _LOGGER.critical(
            f'Exception while processing trigger: {type(e).__name__}: {e}'
        )
        return
Parameters:
  • event (CloudEvent) – the triggering CloudEvent.

  • timeout_secs (int) – (Optional) the number of seconds to consider as the timeout threshold from the original trigger time. Defaults to 600.

  • decode_bytes (bool) – (Optional) whether to attempt to decode the bytes in the event payload as “utf-8” or simply return the bytes.

Return type:

str OR bytes OR None

Returns:

the pubsub payload, if present.
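The timestamp check described above can be pictured with the following minimal sketch. It is not the library's code: the helper name `is_expired` is hypothetical, and it assumes the event time has already been parsed into a timezone-aware `datetime`.

```python
from datetime import datetime, timedelta, timezone


def is_expired(event_time, timeout_secs=600, now=None):
    """Hypothetical helper: True if the event is older than timeout_secs."""
    # Allow the caller to inject "now" so the check is easy to test.
    now = now or datetime.now(timezone.utc)
    return now - event_time > timedelta(seconds=timeout_secs)
```

A function that finds the event expired would raise, and the surrounding try/except would then return normally so that the platform stops retrying.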

bibt.gcp.pubsub.methods.process_trigger(context, event=None, timeout_secs=600, decode_bytes=True)[source]
Checks the timestamp of the triggering event; catches infinite retry loops on ‘retry on fail’ cloud functions. It's good practice to always call this function first in a Cloud Function. Additionally, be sure to wrap the call to this function in a try/except block where the except block returns normally. This ensures that an exception raised here does not result in an infinite retry loop.

If (and only if) the triggering pubsub’s event is also passed and has a payload, it will be returned. Otherwise, this function returns None.

import json
import logging
from bibt.gcp.pubsub import process_trigger

_LOGGER = logging.getLogger(__name__)

def main(event, context):
    try:
        payload = process_trigger(context, event=event)
        if not payload:
            raise IOError('No payload in triggering pubsub!')
        payload = json.loads(payload)
    except Exception as e:
        _LOGGER.critical(
            f'Exception while processing trigger: {type(e).__name__}: {e}'
        )
        return
Parameters:
  • context (google.cloud.functions.Context) – the triggering pubsub’s context.

  • event (dict) – (Optional) the triggering pubsub’s event. Defaults to None.

  • timeout_secs (int) – (Optional) the number of seconds to consider as the timeout threshold from the original trigger time. Defaults to 600.

  • decode_bytes (bool) – (Optional) whether to attempt to decode the bytes in the event payload as “utf-8” or simply return the bytes.

Return type:

str OR bytes OR None

Returns:

the pubsub payload, if present.
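To illustrate what decode_bytes changes, here is a small sketch of extracting a payload from a Pub/Sub-triggered event dict, where the message body arrives base64-encoded under the 'data' key. The helper name `extract_payload` is hypothetical, and this is an assumption about the general approach rather than the library's implementation.

```python
import base64


def extract_payload(event, decode_bytes=True):
    """Hypothetical helper: return the event payload as str, bytes, or None."""
    if not event or "data" not in event:
        # No event passed, or the event carries no payload.
        return None
    # Pub/Sub-triggered events carry the message body base64-encoded.
    raw = base64.b64decode(event["data"])
    return raw.decode("utf-8") if decode_bytes else raw
```

With decode_bytes left at True the result can be passed straight to `json.loads`; setting it to False is useful when the payload is binary and not valid UTF-8.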