Source code for bibt.gcp.pubsub.classes
import json
import logging
import google.auth.transport.requests
from google.cloud import pubsub_v1
_LOGGER = logging.getLogger(__name__)
[docs]
class Client:
    """A Client may be used to interact with the GCP PubSub API using the
    same session / credentials.

    Wraps a single ``pubsub_v1.PublisherClient`` which is reused for every
    message published through this object.

    :type credentials: ``google.oauth2.credentials.Credentials``
    :param credentials: the credentials object to use when making the API call, if
        not to use the inferred gcloud account.
    """

    def __init__(self, credentials=None):
        self._client = pubsub_v1.PublisherClient(credentials=credentials)

    def _ensure_valid_client(self):
        """Refresh the wrapped client's credentials when they are not usable.

        The credentials object is looked up first on ``self._client._credentials``
        and then on ``self._client._transport._credentials``. If neither attribute
        exists, an error is logged and the method returns without refreshing, so
        the subsequent API call is still attempted.

        A refresh is performed when the credentials report ``valid`` as false or
        have no ``expiry`` set.
        """
        try:
            credentials = self._client._credentials
        except AttributeError:
            try:
                credentials = self._client._transport._credentials
            except AttributeError:
                _LOGGER.error("Could not verify credentials in client.")
                return

        # NOTE(review): credentials that are valid but carry no expiry are
        # refreshed on every call by this condition -- confirm that is intended.
        needs_refresh = not credentials.valid or not credentials.expiry
        if not needs_refresh:
            _LOGGER.debug(
                "Token is valid: [%s] expires: [%s]",
                credentials.valid,
                credentials.expiry,
            )
            return

        _LOGGER.info(
            "Refreshing client credentials, token expired: [%s]",
            credentials.expiry,
        )
        request = google.auth.transport.requests.Request()
        credentials.refresh(request=request)
        _LOGGER.info("New expiration: [%s]", credentials.expiry)

    def send_pubsub(self, topic_uri, payload):
        """
        Publishes a pubsub message to the specified topic. Executing account
        must have pubsub publisher permissions on the topic or in the project.

        .. code:: python

            import os

            from bibt.gcp.pubsub import Client

            def main(event, context):
                client = Client()
                topic_uri = (
                    f'projects/{os.environ["GOOGLE_PROJECT"]}'
                    f'/topics/{os.environ["NEXT_TOPIC"]}'
                )
                client.send_pubsub(
                    topic_uri=topic_uri,
                    payload={'favorite_color': 'blue'}
                )

        :type topic_uri: :py:class:`str`
        :param topic_uri: the topic on which to publish.
            topic uri format: ``'projects/{project_name}/topics/{topic_name}'``

        :type payload: :py:class:`dict` :py:class:`list` :py:class:`str`
            OR :py:class:`bytes`
        :param payload: the pubsub payload. a ``dict`` or ``list`` is serialized
            to JSON (values JSON cannot encode are passed through ``str``) and
            then UTF-8 encoded; a ``str`` is UTF-8 encoded; ``bytes`` or
            ``bytearray`` are sent unchanged.

        :return: the message ID obtained from the publish future's result.
        :raises AttributeError: if ``payload`` is of another type that has no
            ``encode`` method.
        """
        _LOGGER.info("Sending pubsub to topic: [%s]", topic_uri)
        _LOGGER.debug("Payload: %s", payload)

        # Convert to bytes, then publish the message.
        if isinstance(payload, (bytes, bytearray)):
            # Already binary: forward as-is rather than failing on .encode().
            payload_bytes = bytes(payload)
        else:
            if isinstance(payload, (dict, list)):
                payload = json.dumps(payload, default=str)
            payload_bytes = payload.encode("utf-8")

        self._ensure_valid_client()
        future = self._client.publish(topic=topic_uri, data=payload_bytes)
        msg_id = future.result()
        _LOGGER.info("PubSub sent successfully, pubsub message ID: [%s]", msg_id)
        return msg_id