PubSub
See the official gcp-pubsub API documentation here: link.
- class bibt.gcp.pubsub.classes.Client(credentials=None)
- A Client may be used to interact with the GCP PubSub API, reusing the same session and credentials across calls.
- Parameters:
credentials (google.oauth2.credentials.Credentials) – the credentials object to use when making API calls. If omitted, the inferred gcloud account is used.
- send_pubsub(topic_uri, payload)
Publishes a pubsub message to the specified topic. The executing account must have pubsub publisher permissions on the topic or on the project.

    import os

    from bibt.gcp.pubsub import Client

    def main(event, context):
        client = Client()
        # Build the fully qualified topic path from environment variables.
        topic_uri = (
            f'projects/{os.environ["GOOGLE_PROJECT"]}'
            f'/topics/{os.environ["NEXT_TOPIC"]}'
        )
        client.send_pubsub(
            topic_uri=topic_uri,
            payload={'favorite_color': 'blue'}
        )
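The topic_uri passed to send_pubsub has the form projects/<project>/topics/<topic>. As a minimal sketch, the string can be built and sanity-checked before publishing. The names build_topic_uri and topic_uri_from_env are hypothetical helpers for illustration only and are not part of bibt.gcp.pubsub; the environment variable names are the ones used in the send_pubsub example.

```python
import os
import re

# Hypothetical helpers for illustration; not part of bibt.gcp.pubsub.
_TOPIC_URI_PATTERN = re.compile(r"projects/[^/]+/topics/[^/]+")


def build_topic_uri(project_id: str, topic_name: str) -> str:
    """Return 'projects/<project_id>/topics/<topic_name>'.

    Raises ValueError if either part is empty or contains a slash.
    """
    uri = f"projects/{project_id}/topics/{topic_name}"
    if not _TOPIC_URI_PATTERN.fullmatch(uri):
        raise ValueError(f"Malformed topic URI: {uri!r}")
    return uri


def topic_uri_from_env(env=os.environ) -> str:
    """Build the topic URI from GOOGLE_PROJECT and NEXT_TOPIC."""
    return build_topic_uri(env["GOOGLE_PROJECT"], env["NEXT_TOPIC"])
```

Validating the string up front means a missing or malformed environment variable is reported as a clear local error rather than as a failure of the publish call.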
- bibt.gcp.pubsub.methods.process_event(event: CloudEvent, timeout_secs=600, decode_bytes=True)
- Checks the timestamp of the triggering CloudEvent; this catches infinite retry loops on ‘retry on fail’ cloud functions. It's good practice to always call this function first in a Cloud Function. Additionally, be sure to wrap the call to this function in a try/except block where the except block returns normally. This ensures that an exception raised here does not result in an infinite retry loop.
If (and only if) the event has a payload, it will be returned. Otherwise, this function returns None.
    import json
    import logging

    import functions_framework
    from bibt.gcp.pubsub import process_event
    from cloudevents.http import CloudEvent

    _LOGGER = logging.getLogger(__name__)

    # The Functions Framework decorator for CloudEvent handlers is cloud_event.
    @functions_framework.cloud_event
    def main(event: CloudEvent):
        try:
            payload = process_event(event)
            if not payload:
                raise IOError('No payload in triggering pubsub!')
            payload = json.loads(payload)
        except Exception as e:
            _LOGGER.critical(
                f'Exception while processing trigger: {type(e).__name__}: {e}'
            )
            # Return normally so the function is not retried.
            return
- Parameters:
event (cloudevents.http.CloudEvent) – the triggering CloudEvent.
timeout_secs (int) – (Optional) the number of seconds to consider as the timeout threshold from the original trigger time. Defaults to 600.
decode_bytes (bool) – (Optional) whether to attempt to decode the bytes in the event payload as “utf-8” or simply return the bytes. Defaults to True.
- Return type:
- Returns:
the pubsub payload, if present.
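The timestamp check that process_event describes can be pictured as a simple age comparison against timeout_secs. The following is only a sketch of that idea, not the library's implementation: is_stale is a hypothetical name, and it assumes the event time arrives as an RFC 3339 string with at most six fractional-second digits.

```python
from datetime import datetime, timezone
from typing import Optional


def is_stale(
    event_time: str,
    timeout_secs: int = 600,
    now: Optional[datetime] = None,
) -> bool:
    """Return True if the event is older than timeout_secs (hypothetical helper)."""
    # Older Python versions do not accept a trailing 'Z' in fromisoformat.
    ts = datetime.fromisoformat(event_time.replace("Z", "+00:00"))
    if ts.tzinfo is None:
        # Assumption: timestamps without an offset are in UTC.
        ts = ts.replace(tzinfo=timezone.utc)
    now = now or datetime.now(timezone.utc)
    return (now - ts).total_seconds() > timeout_secs


# Usage: an event first triggered 20 minutes ago exceeds a 600-second threshold.
reference = datetime(2024, 1, 1, 0, 20, 0, tzinfo=timezone.utc)
print(is_stale("2024-01-01T00:00:00Z", 600, now=reference))
```

A function with retries enabled keeps receiving the same event, so comparing against the original trigger time gives the retry loop a natural cutoff.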
- bibt.gcp.pubsub.methods.process_trigger(context, event=None, timeout_secs=600, decode_bytes=True)
- Checks the timestamp of the triggering event; this catches infinite retry loops on ‘retry on fail’ cloud functions. It's good practice to always call this function first in a Cloud Function. Additionally, be sure to wrap the call to this function in a try/except block where the except block returns normally. This ensures that an exception raised here does not result in an infinite retry loop.
If (and only if) the triggering pubsub’s event is also passed and has a payload, it will be returned. Otherwise, this function returns None.
    import json
    import logging

    from bibt.gcp.pubsub import process_trigger

    _LOGGER = logging.getLogger(__name__)

    def main(event, context):
        try:
            payload = process_trigger(context, event=event)
            if not payload:
                raise IOError('No payload in triggering pubsub!')
            payload = json.loads(payload)
        except Exception as e:
            _LOGGER.critical(
                f'Exception while processing trigger: {type(e).__name__}: {e}'
            )
            # Return normally so the function is not retried.
            return
- Parameters:
context (google.cloud.functions.Context) – the triggering pubsub’s context.
event (dict) – (Optional) the triggering pubsub’s event. Defaults to None.
timeout_secs (int) – (Optional) the number of seconds to consider as the timeout threshold from the original trigger time. Defaults to 600.
decode_bytes (bool) – (Optional) whether to attempt to decode the bytes in the event payload as “utf-8” or simply return the bytes. Defaults to True.
- Return type:
- Returns:
the pubsub payload, if present.
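To illustrate what the decode_bytes option means for the returned payload, here is a minimal sketch of payload extraction. It is not the library's code: extract_payload is a hypothetical name, and the sketch assumes the background-function event shape in which the message body is base64-encoded under the "data" key.

```python
import base64
import json


def extract_payload(event, decode_bytes=True):
    """Return the message body, or None if the event carries no payload.

    Hypothetical helper; assumes event["data"] is a base64-encoded string.
    """
    if not event or "data" not in event:
        return None
    raw = base64.b64decode(event["data"])
    # With decode_bytes=False the caller receives the raw bytes unchanged.
    return raw.decode("utf-8") if decode_bytes else raw


# Usage: build a sample event the way a pubsub trigger would encode it.
body = json.dumps({"favorite_color": "blue"}).encode("utf-8")
sample_event = {"data": base64.b64encode(body).decode("ascii")}
print(json.loads(extract_payload(sample_event)))
```

Returning the raw bytes is useful when the message body is not text, for example a compressed or binary-serialized payload.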