
psq's Introduction

psq - Cloud Pub/Sub Task Queue for Python.


Note

This is not an official Google product, experimental or otherwise, and is provided without support. It is intended as a sample library that demonstrates a set of use cases for Google Cloud Pub/Sub. Production applications should use the official Pub/Sub client library.

psq is an example Python implementation of a simple distributed task queue using Google Cloud Pub/Sub.

psq requires minimal configuration and relies on Cloud Pub/Sub to provide scalable and reliable messaging.

psq is intentionally similar to rq and simpleq, and takes some inspiration from celery and this blog post.

Installation

Install via pip:

pip install psq

Prerequisites

  • A project on the Google Developers Console.
  • The Google Cloud SDK installed locally.
  • You will need the Cloud Pub/Sub API enabled on your project. The link will walk you through enabling the API.
  • You will need to run gcloud auth before running these examples so that authentication to Google Cloud Platform services is handled transparently.

Usage

First, create a task in a module that the worker can import, such as tasks.py:

def adder(a, b):
    return a + b

Then, create Pub/Sub publisher and subscriber clients and a queue:

from google.cloud import pubsub_v1
import psq


project = 'your-project-id'

publisher = pubsub_v1.PublisherClient()
subscriber = pubsub_v1.SubscriberClient()

q = psq.Queue(publisher, subscriber, project)

Now you can enqueue tasks:

from tasks import adder

q.enqueue(adder, 1, 2)  # adder(1, 2) will run on a worker

In order to get task results, you have to configure storage:

from google.cloud import pubsub_v1
from google.cloud import datastore
import psq


project = 'your-project-id'

publisher = pubsub_v1.PublisherClient()
subscriber = pubsub_v1.SubscriberClient()
ds_client = datastore.Client()

q = psq.Queue(
    publisher, subscriber, project,
    storage=psq.DatastoreStorage(ds_client))

With storage configured, you can get the result of a task:

r = q.enqueue(adder, 5, 6)
r.result() # -> 11

You can also define multiple queues:

fast = psq.Queue(publisher, subscriber, project, 'fast')
slow = psq.Queue(publisher, subscriber, project, 'slow')

Things to note

Because psq is largely similar to rq, similar rules around tasks apply. You can put any Python function call on a queue, provided:

  • The function is importable by the worker. This means the __module__ that the function lives in must be importable. Notably, you can't enqueue functions that are declared in the __main__ module, such as tasks defined in a file that is run directly with python or via the interactive interpreter.
  • The function can be given as a string, but it must be the full, importable dotted path to a function that the worker can import (for example 'tasks.adder'). Otherwise, the task will fail.
  • The worker and the applications queuing tasks must share exactly the same source code.
  • The function can't depend on global context such as global variables, current_request, etc. Pass any needed context into the worker at queue time.
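To make the importability rules concrete, here is a small sketch of what a worker has to do with a task reference: turn a dotted path back into a function. `resolve_task` and `task_path` are hypothetical helpers written for this example using only `importlib`; they are not psq's API, and psq's actual serialization may work differently.

```python
import importlib
import json


def resolve_task(path):
    """Import and return the callable named by a dotted path such as 'tasks.adder'.

    Hypothetical helper illustrating what a worker must be able to do with a
    string task reference; not psq's implementation.
    """
    module_name, _, attr = path.rpartition(".")
    if not module_name:
        raise ValueError("expected a dotted path like 'module.function': %r" % path)
    module = importlib.import_module(module_name)
    return getattr(module, attr)


def task_path(func):
    """Build the dotted path a separate worker process would need to re-import func."""
    if func.__module__ == "__main__":
        # The worker has its own __main__, so this path would not point
        # at the same function there.
        raise ValueError("%s is defined in __main__ and cannot be imported by a worker"
                         % func.__name__)
    return "%s.%s" % (func.__module__, func.__qualname__)


# A standard-library function stands in for a task defined in tasks.py.
path = task_path(json.dumps)
print(path)                              # json.dumps
print(resolve_task(path) is json.dumps)  # True
```

A function defined at the top level of a script you run directly has `__module__ == "__main__"`, which is why such functions cannot be enqueued.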

Delivery guarantees

Pub/Sub guarantees at-least-once delivery of your tasks to the workers, but psq doesn't presently guarantee that a task completes execution, nor does it provide exactly-once semantics, though it does allow you to provide your own mechanisms for this. This is similar to Celery's default configuration.

Task completion guarantees can be provided via late ack support. Late ack is possible with Cloud Pub/Sub, but it is not currently implemented in this library. See CONTRIBUTING.md.
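The difference between acknowledging early and acknowledging late can be sketched with a toy, in-memory model. `FakeSubscription` and `run_once` are hypothetical names, and the simulated crash and "deadline expiry" are simplifications of what Pub/Sub does when a message is not acknowledged in time; this is not psq or Pub/Sub client code.

```python
from collections import deque


class FakeSubscription:
    """Hypothetical in-memory stand-in for a Pub/Sub subscription.

    Messages that are pulled but never acked become deliverable again,
    roughly like a real message whose ack deadline expires.
    """

    def __init__(self, messages):
        self.pending = deque(messages)
        self.outstanding = set()

    def pull(self):
        msg = self.pending.popleft()
        self.outstanding.add(msg)
        return msg

    def ack(self, msg):
        self.outstanding.discard(msg)

    def expire_deadlines(self):
        # Every unacked message is queued for redelivery.
        self.pending.extend(sorted(self.outstanding))
        self.outstanding.clear()


def run_once(sub, task, late_ack):
    msg = sub.pull()
    if not late_ack:
        sub.ack(msg)   # early ack: acknowledged before the task runs
    try:
        task(msg)
    except RuntimeError:
        return         # simulated worker crash: nothing else happens
    if late_ack:
        sub.ack(msg)   # late ack: acknowledged only after success


def crashing_task(msg):
    raise RuntimeError("worker died mid-task")


for late in (False, True):
    sub = FakeSubscription(["task-1"])
    run_once(sub, crashing_task, late_ack=late)
    sub.expire_deadlines()
    print("late_ack=%s -> redelivered: %s" % (late, list(sub.pending)))
# late_ack=False -> redelivered: []
# late_ack=True -> redelivered: ['task-1']
```

With early ack the crashed task is simply lost; with late ack it becomes deliverable again, at the cost of possibly running the task more than once.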

There are many approaches to exactly-once semantics, such as distributed locks, which can be built on systems such as ZooKeeper and Redis.
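One common pattern is to have each delivery try to "claim" the task ID before running it. The sketch below uses an in-process set guarded by a `threading.Lock` as a stand-in for a shared store; `InMemoryClaimStore` and `execute_once` are hypothetical, and a real multi-worker setup would need an atomic operation in ZooKeeper or Redis instead.

```python
import threading


class InMemoryClaimStore:
    """Hypothetical stand-in for a distributed lock/claim store.

    claim(task_id) returns True only for the first caller. This set is only
    visible inside one process; separate worker machines would need an atomic
    operation in a shared store instead.
    """

    def __init__(self):
        self._lock = threading.Lock()
        self._claimed = set()

    def claim(self, task_id):
        with self._lock:
            if task_id in self._claimed:
                return False
            self._claimed.add(task_id)
            return True


def execute_once(store, task_id, func, *args):
    """Run func only if no earlier delivery of task_id has claimed it."""
    if not store.claim(task_id):
        return None  # duplicate delivery: skip it
    return func(*args)


store = InMemoryClaimStore()
calls = []


def record(value):
    calls.append(value)
    return value


# At-least-once delivery may hand the same task to a worker twice.
execute_once(store, "task-42", record, "a")
execute_once(store, "task-42", record, "a")
print(calls)  # ['a']
```

Claiming before running means that a task whose worker crashes after the claim will not run again; whether that trade-off is acceptable depends on the task.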

Running a worker

Execute psqworker in the same directory where your tasks are defined, passing it the importable path to your queue object (here, a queue named q in config.py):

psqworker config.q

psqworker only operates on one queue at a time. If you want a server to listen to multiple queues, use something like supervisord to run multiple psqworker processes.

Broadcast queues

A normal queue will send a single task to a single worker, spreading your tasks over all workers listening to the same queue. There are also broadcast queues, which will deliver a copy of the task to every worker. This is useful in situations where you want every worker to execute the same task, such as installing or upgrading software on every server.

from subprocess import call

broadcast_q = psq.BroadcastQueue(publisher, subscriber, project)

def restart_apache_task():
    call(["apachectl", "restart"])

broadcast_q.enqueue(restart_apache_task)

Broadcast queues provide an implementation of the solution described in Reliable Task Scheduling on Google Compute Engine.

Note: broadcast queues do not currently support any form of storage and do not support return values.
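The difference between the two queue types can be modelled in a few lines. This is a toy model, not psq code: it assumes that a normal queue's workers share one subscription (so each task reaches one worker) and that a broadcast queue gives each worker its own subscription (so each worker gets a copy), which is how Pub/Sub fans a message out to multiple subscriptions on a topic. The round-robin assignment is only for illustration; Pub/Sub does not promise any particular distribution.

```python
from collections import defaultdict


def deliver(tasks, workers, broadcast):
    """Toy model of normal vs. broadcast delivery (hypothetical, not psq code)."""
    received = defaultdict(list)
    for i, task in enumerate(tasks):
        if broadcast:
            # One subscription per worker: every worker receives a copy.
            for worker in workers:
                received[worker].append(task)
        else:
            # One shared subscription: each task goes to a single worker.
            received[workers[i % len(workers)]].append(task)
    return dict(received)


workers = ["w1", "w2", "w3"]
tasks = ["t1", "t2", "t3"]
print(deliver(tasks, workers, broadcast=False))
# {'w1': ['t1'], 'w2': ['t2'], 'w3': ['t3']}
print(deliver(tasks, workers, broadcast=True))
```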

Retries

Raising psq.Retry in your task will cause it to be retried.

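import logging

import requests
from psq import Retry

def retry_if_fail():
    try:
        r = requests.get('http://some.flaky.service.com')
        r.raise_for_status()  # treat HTTP error responses as failures too
    except requests.RequestException as e:
        logging.error(e)
        raise Retry()
    return r.text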
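Conceptually, a worker treats `Retry` as "run this task again". The sketch below defines a local `Retry` class so it runs without psq installed, and retries in-process with an attempt cap; both the loop and the cap are assumptions for illustration, not a description of how psq schedules retries.

```python
class Retry(Exception):
    """Local stand-in for psq.Retry so this sketch runs without psq."""


def run_with_retries(func, max_attempts=5):
    """Hypothetical worker-side loop: re-run func whenever it raises Retry."""
    for attempt in range(1, max_attempts + 1):
        try:
            return func()
        except Retry:
            if attempt == max_attempts:
                raise  # give up and surface the failure


attempts = []


def flaky():
    attempts.append(1)
    if len(attempts) < 3:
        raise Retry()  # the first two calls "fail"
    return "ok"


print(run_with_retries(flaky), len(attempts))  # ok 3
```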

Flask & other contexts

You can bind an extra context manager to the queue.

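from flask import Flask

app = Flask(__name__)

# Pass the method itself (app.app_context), not app.app_context().
q = psq.Queue(publisher, subscriber, project, extra_context=app.app_context)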

This will ensure that the context is available in your tasks, which is useful for things such as database connections, etc.:

from flask import current_app

def flasky_task():
    backend = current_app.config['BACKEND']

Bypassing workers for testing

During unit tests you most likely don't want to spin up workers, but instead want to execute the enqueued functions immediately and synchronously. To do this, pass asynchronous=False to the Queue's constructor (the default is True). In this case you don't need to provide publisher, subscriber or project arguments; just pass None for all of them.

q = psq.Queue(None, None, project=None, asynchronous=False)
r = q.enqueue(adder, 1, 2) # Will be run immediately
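The idea behind asynchronous=False can be shown with a toy queue: in synchronous mode, enqueue simply calls the function in the caller's process instead of publishing it for a worker. `TinyQueue` is hypothetical and returns the task's value directly for brevity; psq's own return value in this mode may differ.

```python
class TinyQueue:
    """Hypothetical toy queue illustrating asynchronous=False (not psq's class)."""

    def __init__(self, asynchronous=True):
        self.asynchronous = asynchronous
        self.pending = []

    def enqueue(self, func, *args, **kwargs):
        if not self.asynchronous:
            # Testing mode: run the task right now, in this process.
            return func(*args, **kwargs)
        # Normal mode: record the call for a worker to pick up later.
        self.pending.append((func, args, kwargs))
        return None


def adder(a, b):
    return a + b


sync_q = TinyQueue(asynchronous=False)
print(sync_q.enqueue(adder, 1, 2))  # 3

async_q = TinyQueue()
async_q.enqueue(adder, 1, 2)
print(len(async_q.pending))  # 1
```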

Ideas for improvements

  • Some sort of storage solution for broadcast queues.
  • Memcache/Redis value store.
  • @task decorator that adds a delay/defer function.
  • Task chaining / groups / chords.
  • Late ack.
  • Gevent worker.
  • Batch support for queueing.

Contributing changes

Licensing

psq's Issues

Dependency Dashboard

This issue lists Renovate updates and detected dependencies. Read the Dependency Dashboard docs to learn more.

Ignored or Blocked

These are blocked by an existing closed PR and will not be recreated unless you click a checkbox below.

Detected dependencies

pip_setup
setup.py
  • google-cloud-pubsub >=0.35.2
  • google-cloud-datastore >=1.0.0,<=2.0.0
  • werkzeug >=0.10.0,<1.0.0
  • click >=4.0
  • colorlog >=2.6.0,<3.0.0


Worker receives the task but does not execute it

I'm using psq to run tasks periodically, which works fine. But every few weeks (usually 2 to 3) the worker receives a task but does not do anything with it, and from then on no further tasks get executed. I have checked the logs: there is no error message, and the health check is OK. For now my only solution is to restart the instance, which isn't a very good workaround.

Logs screenshot (notice psq.worker received the task):
[Screenshot of logs, 2017-09-23]

requirements.txt:
Flask==0.12.2
google-cloud==0.27.0
gunicorn==19.7.1
oauth2client==4.0.0
six==1.10.0
requests==2.18.3
honcho==1.0.1
psq==0.5.0
google-api-python-client==1.6.2

beautifulsoup4>=4.5.1
arrow==0.10.0
feedparser==5.2.1

Any advice is highly appreciated.
Thanks

Wondering when psq-master (with most recent commit) will be available for pip install on PyPI

pip install is grabbing the May 30, 2018 release of psq. I'm wondering when the latest commit (3c51307, Sep 17, 2018) will be available via pip install. When I pip install psq, I get a version that is not compatible with Python 3.7: it still uses "async" instead of "asynchronous". Apologies if this is a dumb question. I honestly don't know how new releases are made available on PyPI (I'm pretty new to all of this). Thanks for your help.

DEADLINE_EXCEEDED on empty queue

Hi,

It seems that if there are no tasks in the queue psqworker crashes after approximately 5 minutes with

google.gax.errors.GaxError: GaxError(RPC failed, caused by <_Rendezvous of RPC that terminated with (StatusCode.DEADLINE_EXCEEDED, Deadline Exceeded)>)

it seems that this is caused by googleapis/google-cloud-python#2574

As far as I can tell there's no way to catch this exception in user supplied code so it renders psqworker unusable in a situation where there may be an empty queue.

Here's the stack trace in case it's helpful:

Traceback (most recent call last):
  File "/usr/local/bin/psqworker", line 11, in <module>
    sys.exit(main())
  File "/usr/local/lib/python2.7/dist-packages/click/core.py", line 664, in __call__
    return self.main(*args, **kwargs)
  File "/usr/local/lib/python2.7/dist-packages/click/core.py", line 644, in main
    rv = self.invoke(ctx)
  File "/usr/local/lib/python2.7/dist-packages/click/core.py", line 837, in invoke
    return ctx.invoke(self.callback, **ctx.params)
  File "/usr/local/lib/python2.7/dist-packages/click/core.py", line 464, in invoke
    return callback(*args, **kwargs)
  File "/usr/local/lib/python2.7/dist-packages/psq/psqworker.py", line 116, in main
    worker.listen()
  File "/usr/local/lib/python2.7/dist-packages/psq/worker.py", line 94, in listen
    super(MultiprocessWorker, self).listen()
  File "/usr/local/lib/python2.7/dist-packages/psq/worker.py", line 53, in listen
    tasks = self._safe_dequeue()
  File "/usr/local/lib/python2.7/dist-packages/psq/worker.py", line 46, in _safe_dequeue
    return inner()
  File "/usr/local/lib/python2.7/dist-packages/retrying.py", line 49, in wrapped_f
    return Retrying(*dargs, **dkw).call(f, *args, **kw)
  File "/usr/local/lib/python2.7/dist-packages/retrying.py", line 212, in call
    raise attempt.get()
  File "/usr/local/lib/python2.7/dist-packages/retrying.py", line 247, in get
    six.reraise(self.value[0], self.value[1], self.value[2])
  File "/usr/local/lib/python2.7/dist-packages/retrying.py", line 200, in call
    attempt = Attempt(fn(*args, **kwargs), attempt_number, False)
  File "/usr/local/lib/python2.7/dist-packages/psq/worker.py", line 45, in inner
    return self.queue.dequeue(max=self.tasks_per_poll, block=True)
  File "/usr/local/lib/python2.7/dist-packages/psq/queue.py", line 104, in dequeue
    return_immediately=not block, max_messages=max)
  File "/usr/local/lib/python2.7/dist-packages/google/cloud/pubsub/subscription.py", line 328, in pull
    self.full_name, return_immediately, max_messages)
  File "/usr/local/lib/python2.7/dist-packages/google/cloud/pubsub/_gax.py", line 414, in subscription_pull
    return_immediately=return_immediately)
  File "/usr/local/lib/python2.7/dist-packages/google/cloud/gapic/pubsub/v1/subscriber_api.py", line 545, in pull
    return self._pull(request, options)
  File "/usr/local/lib/python2.7/dist-packages/google/gax/api_callable.py", line 481, in inner
    return api_caller(api_call, this_settings, request)
  File "/usr/local/lib/python2.7/dist-packages/google/gax/api_callable.py", line 469, in base_caller
    return api_call(*args)
  File "/usr/local/lib/python2.7/dist-packages/google/gax/api_callable.py", line 434, in inner
    errors.create_error('RPC failed', cause=exception))
  File "/usr/local/lib/python2.7/dist-packages/google/gax/api_callable.py", line 430, in inner
    return a_func(*args, **kwargs)
  File "/usr/local/lib/python2.7/dist-packages/google/gax/api_callable.py", line 64, in inner
    return a_func(*updated_args, **kwargs)
  File "/usr/local/lib/python2.7/dist-packages/grpc/_channel.py", line 481, in __call__
    return _end_unary_response_blocking(state, False, deadline)
  File "/usr/local/lib/python2.7/dist-packages/grpc/_channel.py", line 432, in _end_unary_response_blocking
    raise _Rendezvous(state, None, None, deadline)
google.gax.errors.GaxError: GaxError(RPC failed, caused by <_Rendezvous of RPC that terminated with (StatusCode.DEADLINE_EXCEEDED, Deadline Exceeded)>)
E1208 20:06:37.135646157 14219 network_status_tracker.c:48] Memory leaked as all network endpoints were not shut down

Ready for production?

We're currently looking into migrating to gke from a managed vserver, where we're running celery on rabbitmq and redis. We don't have very elaborate requirements for delayed task execution: the usual delayed tasks, a few cron jobs and a couple of long running background tasks.

PSQ looks very interesting as thin layer above pub/sub. My question is: is it considered ready for production? Is anyone using it already?

Also related: What's the best way to use it in CI for testing? In RQ for example, we can declare a queue as synchronous with

Queue('low', async=False)

and thus don't need to handle any workers, which seems handy (I haven't used it, though). Or should I just use the gcloud emulator, which should work if I'm not mistaken, but brings its fair set of complications to testing, as we'd need to spin up some workers.

Thanks!

Make psq Python3.7 compatible

 File "task.py", line 4, in <module>
    from psq import Queue
  File "/Library/Frameworks/Python.framework/Versions/3.7/lib/python3.7/site-packages/psq/__init__.py", line 24, in <module>
    from .broadcast_queue import BroadcastQueue
  File "/Library/Frameworks/Python.framework/Versions/3.7/lib/python3.7/site-packages/psq/broadcast_queue.py", line 20, in <module>
    from . import queue
  File "/Library/Frameworks/Python.framework/Versions/3.7/lib/python3.7/site-packages/psq/queue.py", line 37
    name='default', storage=None, extra_context=None, async=True):

async and await are now reserved keywords.
https://docs.python.org/3/whatsnew/3.7.html
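The failure is easy to reproduce without psq: on Python 3.7 and later, a function signature with a parameter named async is rejected at compile time, while renaming the parameter (psq later switched to asynchronous, as mentioned elsewhere on this page) compiles fine. The snippet uses only the keyword module and the built-in compile.

```python
import keyword
import sys

# Since Python 3.7, 'async' and 'await' are reserved keywords.
print(sys.version_info >= (3, 7), keyword.iskeyword("async"))

OLD_SIGNATURE = "def __init__(self, name='default', async=True):\n    pass\n"
NEW_SIGNATURE = "def __init__(self, name='default', asynchronous=True):\n    pass\n"

try:
    compile(OLD_SIGNATURE, "<queue.py>", "exec")
except SyntaxError as exc:
    print("old signature rejected:", exc.msg)

compile(NEW_SIGNATURE, "<queue.py>", "exec")  # compiles without error
print("new signature OK")
```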

Stackdriver logging not showing in Console with psq task

I have created the following task and called it from the main app, but I am not seeing this log in the Google Console logging.

from flask import current_app
from google.cloud import pubsub

import google.cloud.logging
import psq

def logging_task():
    logging_client = google.cloud.logging.Client(app.config['PROJECT_ID'])
    logger = logging_client.logger("some-log")

    logger.log_text("This is a logging test", severity='INFO')

My logs viewer is filled with the following, and I am trying to filter this out by creating a logger:

18:15:24.000
22:15:24 worker.1 | 172.18.0.3 - - [03/May/2017 22:15:24] "GET /_ah/health HTTP/1.1" 200 -

bypassing workers not working (master branch)

bypassing workers is not working as mentioned in the readme docs : https://github.com/GoogleCloudPlatform/psq#bypassing-workers-for-testing

I am getting this error:

20:14:14 worker.1 | Traceback (most recent call last):
20:14:14 worker.1 |   File "/Users/kushgoyal/.virtualenvs/myproject/bin/psqworker", line 11, in <module>
20:14:14 worker.1 |     load_entry_point('psq==0.5.0', 'console_scripts', 'psqworker')()
20:14:14 worker.1 |   File "/Users/kushgoyal/.virtualenvs/myproject/lib/python3.6/site-packages/click/core.py", line 664, in __call__
20:14:14 worker.1 |     return self.main(*args, **kwargs)
20:14:14 worker.1 |   File "/Users/kushgoyal/.virtualenvs/myproject/lib/python3.6/site-packages/click/core.py", line 644, in main
20:14:14 worker.1 |     rv = self.invoke(ctx)
20:14:14 worker.1 |   File "/Users/kushgoyal/.virtualenvs/myproject/lib/python3.6/site-packages/click/core.py", line 837, in invoke
20:14:14 worker.1 |     return ctx.invoke(self.callback, **ctx.params)
20:14:14 worker.1 |   File "/Users/kushgoyal/.virtualenvs/myproject/lib/python3.6/site-packages/click/core.py", line 464, in invoke
20:14:14 worker.1 |     return callback(*args, **kwargs)
20:14:14 worker.1 |   File "/Users/kushgoyal/.virtualenvs/myproject/lib/python3.6/site-packages/psq/psqworker.py", line 116, in main
20:14:14 worker.1 |     worker.listen()
20:14:14 worker.1 |   File "/Users/kushgoyal/.virtualenvs/myproject/lib/python3.6/site-packages/psq/worker.py", line 94, in listen
20:14:14 worker.1 |     super(MultiprocessWorker, self).listen()
20:14:14 worker.1 |   File "/Users/kushgoyal/.virtualenvs/myproject/lib/python3.6/site-packages/psq/worker.py", line 53, in listen
20:14:14 worker.1 |     tasks = self._safe_dequeue()
20:14:14 worker.1 |   File "/Users/kushgoyal/.virtualenvs/myproject/lib/python3.6/site-packages/psq/worker.py", line 46, in _safe_dequeue
20:14:14 worker.1 |     return inner()
20:14:14 worker.1 |   File "/Users/kushgoyal/.virtualenvs/myproject/lib/python3.6/site-packages/retrying.py", line 49, in wrapped_f
20:14:14 worker.1 |     return Retrying(*dargs, **dkw).call(f, *args, **kw)
20:14:14 worker.1 |   File "/Users/kushgoyal/.virtualenvs/myproject/lib/python3.6/site-packages/retrying.py", line 212, in call
20:14:14 worker.1 |     raise attempt.get()
20:14:14 worker.1 |   File "/Users/kushgoyal/.virtualenvs/myproject/lib/python3.6/site-packages/retrying.py", line 247, in get
20:14:14 worker.1 |     six.reraise(self.value[0], self.value[1], self.value[2])
20:14:14 worker.1 |   File "/Users/kushgoyal/.virtualenvs/myproject/lib/python3.6/site-packages/six.py", line 686, in reraise
20:14:14 worker.1 |     raise value
20:14:14 worker.1 |   File "/Users/kushgoyal/.virtualenvs/myproject/lib/python3.6/site-packages/retrying.py", line 200, in call
20:14:14 worker.1 |     attempt = Attempt(fn(*args, **kwargs), attempt_number, False)
20:14:14 worker.1 |   File "/Users/kushgoyal/.virtualenvs/myproject/lib/python3.6/site-packages/psq/worker.py", line 45, in inner
20:14:14 worker.1 |     return self.queue.dequeue(max=self.tasks_per_poll, block=True)
20:14:14 worker.1 |   File "/Users/kushgoyal/.virtualenvs/myproject/lib/python3.6/site-packages/psq/queue.py", line 114, in dequeue
20:14:14 worker.1 |     self.subscription = self._get_or_create_subscription()
20:14:14 worker.1 |   File "/Users/kushgoyal/.virtualenvs/myproject/lib/python3.6/site-packages/psq/queue.py", line 73, in _get_or_create_subscription
20:14:14 worker.1 |     subscription_name, topic=self.topic)
20:14:14 worker.1 | AttributeError: 'Queue' object has no attribute 'topic'

[discussion] Late ack

We should support late acknowledgement of tasks and a way to support extending the ack deadline:

If a long timeout is set for a long-running task but the task is interrupted prior to completion, it won't be restarted for some time. At the other extreme, if a short timeout is set, a long-running task will be considered incomplete and re-started with another worker. Support for extending timeouts during task execution exists in PubSub, this library should have a way to perform this late_ack.

We should figure out what we want this to look like from a user perspective.
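One possible shape for this, sketched under assumptions: a background thread keeps extending the message's ack deadline while the task runs, and the caller acknowledges the message only after the task returns. `run_with_lease_extension` and the `extend_deadline` callback are hypothetical; with the Pub/Sub client, the callback would wrap a ModifyAckDeadline request for the message's ack ID.

```python
import threading
import time


def run_with_lease_extension(task, extend_deadline, interval_seconds):
    """Run task() while a background thread calls extend_deadline() periodically.

    Hypothetical sketch of late ack: the message stays leased while the task
    runs, and the caller acks it only after this function returns.
    """
    done = threading.Event()

    def heartbeat():
        # Event.wait returns False on timeout and True once done is set.
        while not done.wait(interval_seconds):
            extend_deadline()

    thread = threading.Thread(target=heartbeat, daemon=True)
    thread.start()
    try:
        return task()
    finally:
        done.set()
        thread.join()


extensions = []
result = run_with_lease_extension(
    task=lambda: time.sleep(0.35) or "done",  # a "long-running" task
    extend_deadline=lambda: extensions.append(time.monotonic()),
    interval_seconds=0.1,
)
print(result, len(extensions))  # "done", with roughly 3 extensions
```

Keeping the extension interval well below the ack deadline leaves room for a missed heartbeat; if the worker dies, the extensions stop and Pub/Sub redelivers the message once the deadline passes.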

psqworker dies on 503 error

It looks like psqworker dies after completing a task if it gets a google.cloud.exceptions.ServiceUnavailable: 503 exception. AFAIK it's always safe to retry on a 503 (and 503 seems to be VERY common in GCP). This isn't addressed by psq.Retry because it happens outside of the task context (the task succeeds, but the instance won't pick up another one).

Here's the traceback from psqworker in case it's helpful:

Traceback (most recent call last):
  File "/usr/local/bin/psqworker", line 11, in <module>
    sys.exit(main())
  File "/usr/local/lib/python2.7/dist-packages/click/core.py", line 664, in __call__
    return self.main(*args, **kwargs)
  File "/usr/local/lib/python2.7/dist-packages/click/core.py", line 644, in main
    rv = self.invoke(ctx)
  File "/usr/local/lib/python2.7/dist-packages/click/core.py", line 837, in invoke
    return ctx.invoke(self.callback, **ctx.params)
  File "/usr/local/lib/python2.7/dist-packages/click/core.py", line 464, in invoke
    return callback(*args, **kwargs)
  File "/usr/local/lib/python2.7/dist-packages/psq/psqworker.py", line 116, in main
    worker.listen()
  File "/usr/local/lib/python2.7/dist-packages/psq/worker.py", line 60, in listen
    self.run_task(task)
  File "/usr/local/lib/python2.7/dist-packages/psq/worker.py", line 69, in run_task
    task.execute(self.queue)
  File "/usr/local/lib/python2.7/dist-packages/psq/task.py", line 96, in execute
    queue.storage.put_task(self)
  File "/usr/local/lib/python2.7/dist-packages/psq/datastore_storage.py", line 64, in put_task
    self.datastore.put(entity)
  File "/usr/local/lib/python2.7/dist-packages/google/cloud/datastore/client.py", line 335, in put
    self.put_multi(entities=[entity])
  File "/usr/local/lib/python2.7/dist-packages/google/cloud/datastore/client.py", line 362, in put_multi
    current.commit()
  File "/usr/local/lib/python2.7/dist-packages/google/cloud/datastore/batch.py", line 265, in commit
    self._commit()
  File "/usr/local/lib/python2.7/dist-packages/google/cloud/datastore/batch.py", line 242, in _commit
    self.project, self._commit_request, self._id)
  File "/usr/local/lib/python2.7/dist-packages/google/cloud/datastore/_http.py", line 628, in commit
    response = self._datastore_api.commit(project, request)
  File "/usr/local/lib/python2.7/dist-packages/google/cloud/datastore/_http.py", line 356, in commit
    return self._stub.Commit(request_pb)
  File "/usr/lib/python2.7/contextlib.py", line 35, in __exit__
    self.gen.throw(type, value, traceback)
  File "/usr/local/lib/python2.7/dist-packages/google/cloud/datastore/_http.py", line 260, in _grpc_catch_rendezvous
    raise error_class(exc.details())
google.cloud.exceptions.ServiceUnavailable: 503 {"created":"@1484350714.247229709","description":"Secure read failed","file":"src/core/lib/security/transport/secure_endpoint.c","file_line":157,"grpc_status":14,"referenced_errors":[{"created":"@1484350714.247155580","description":"EOF","file":"src/core/lib/iomgr/tcp_posix.c","file_line":235}]}

Thanks for taking a look!
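A generic way to make worker-side calls survive transient errors is capped exponential backoff with jitter. The sketch is not psq code: `TransientError` is a hypothetical placeholder, and in a real worker you would catch the client library's ServiceUnavailable exception (the one in the traceback above) instead. The sleep parameter exists so the example can run without actually waiting.

```python
import random
import time


class TransientError(Exception):
    """Hypothetical placeholder for retryable failures such as a 503."""


def call_with_backoff(func, max_attempts=5, base_delay=0.5, max_delay=30.0,
                      sleep=time.sleep):
    """Call func(), retrying on TransientError with capped exponential backoff."""
    for attempt in range(max_attempts):
        try:
            return func()
        except TransientError:
            if attempt == max_attempts - 1:
                raise  # out of attempts: let the error propagate
            delay = min(max_delay, base_delay * (2 ** attempt))
            sleep(random.uniform(0, delay))  # "full jitter"


failures = iter([TransientError("503"), TransientError("503")])


def put_task():
    err = next(failures, None)
    if err is not None:
        raise err
    return "stored"


print(call_with_backoff(put_task, sleep=lambda seconds: None))  # stored
```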

Request had invalid authentication credentials

I fairly frequently see errors like "Request had invalid authentication credentials" when I have a worker running for more than a few hours. Here is a full stack-trace:

Traceback (most recent call last):
  File "./turbiniactl", line 271, in
    elif args.command == 'psqworker':
  File "/usr/local/google/home/aaronpeterson/src/psq-env/local/lib/python2.7/site-packages/psq/worker.py", line 94, in listen
    super(MultiprocessWorker, self).listen()
  File "/usr/local/google/home/aaronpeterson/src/psq-env/local/lib/python2.7/site-packages/psq/worker.py", line 53, in listen
    tasks = self._safe_dequeue()
  File "/usr/local/google/home/aaronpeterson/src/psq-env/local/lib/python2.7/site-packages/psq/worker.py", line 46, in _safe_dequeue
    return inner()
  File "/usr/local/google/home/aaronpeterson/src/psq-env/local/lib/python2.7/site-packages/retrying.py", line 49, in wrapped_f
    return Retrying(*dargs, **dkw).call(f, *args, **kw)
  File "/usr/local/google/home/aaronpeterson/src/psq-env/local/lib/python2.7/site-packages/retrying.py", line 212, in call
    raise attempt.get()
  File "/usr/local/google/home/aaronpeterson/src/psq-env/local/lib/python2.7/site-packages/retrying.py", line 247, in get
    six.reraise(self.value[0], self.value[1], self.value[2])
  File "/usr/local/google/home/aaronpeterson/src/psq-env/local/lib/python2.7/site-packages/retrying.py", line 200, in call
    attempt = Attempt(fn(*args, **kwargs), attempt_number, False)
  File "/usr/local/google/home/aaronpeterson/src/psq-env/local/lib/python2.7/site-packages/psq/worker.py", line 45, in inner
    return self.queue.dequeue(max=self.tasks_per_poll, block=True)
  File "/usr/local/google/home/aaronpeterson/src/psq-env/local/lib/python2.7/site-packages/psq/queue.py", line 104, in dequeue
    return_immediately=not block, max_messages=max)
  File "/usr/local/google/home/aaronpeterson/src/psq-env/local/lib/python2.7/site-packages/google/cloud/pubsub/subscription.py", line 365, in pull
    self.full_name, return_immediately, max_messages)
  File "/usr/local/google/home/aaronpeterson/src/psq-env/local/lib/python2.7/site-packages/google/cloud/pubsub/_gax.py", line 437, in subscription_pull
    return_immediately=return_immediately)
  File "/usr/local/google/home/aaronpeterson/src/psq-env/local/lib/python2.7/site-packages/google/cloud/gapic/pubsub/v1/subscriber_client.py", line 706, in pull
    return self._pull(request, options)
  File "/usr/local/google/home/aaronpeterson/src/psq-env/local/lib/python2.7/site-packages/google/gax/api_callable.py", line 428, in inner
    return api_caller(api_call, this_settings, request)
  File "/usr/local/google/home/aaronpeterson/src/psq-env/local/lib/python2.7/site-packages/google/gax/api_callable.py", line 416, in base_caller
    return api_call(*args)
  File "/usr/local/google/home/aaronpeterson/src/psq-env/local/lib/python2.7/site-packages/google/gax/api_callable.py", line 376, in inner
    return a_func(*args, **kwargs)
  File "/usr/local/google/home/aaronpeterson/src/psq-env/local/lib/python2.7/site-packages/google/gax/retry.py", line 127, in inner
    ' classified as transient', exception)
google.gax.errors.RetryError: GaxError(Exception occurred in retry method that was not classified as transient, caused by <_Rendezvous of RPC that terminated with (StatusCode.UNAUTHENTICATED, Request had invalid authentication credentials. Expected OAuth 2 access token, login cookie or other valid authentication credential. See https://developers.google.com/identity/sign-in/web/devconsole-project.)>)

When using storage, it goes into a death spiral

I have the following code

import psq
from google.cloud import datastore, pubsub
PROJECT_ID='thisisavalidproject'

def adder(a, b):
  return a + b

if __name__ == '__main__':
  pubsub_client = pubsub.Client(project=PROJECT_ID)
  ds_client = datastore.Client(PROJECT_ID)
  q = psq.Queue(pubsub_client, storage=psq.DatastoreStorage(ds_client))
  r = q.enqueue(adder, 5, 6)
  print(r.result())

with the following requirements.txt

google-cloud==0.20.0
psq==0.5.0

When I run this, it just dies. Having inspected the traffic a little, it just keeps doing storage lookups.

Executing a task hangs and fails quietly

I have an async task that I am trying to perform using psq. When I run the task locally, it takes about 5 minutes to execute.

When I deploy it to gcloud and attempt to run it, I can see in the logs that it starts executing, but it never finishes and there is no log entry explaining why it failed. Health checks still pass, even though the task never finishes.

Code example for testing produces an error

When testing to enqueue a task using the code example in the README.md

    q = psq.Queue(None, async=False)
    r = q.enqueue(adder, 1, 2) # Will be run immediately

produces the following error

    q = psq.Queue(None, async=False)
TypeError: __init__() missing 2 required positional arguments: 'subscriber_client' and 'project'

the following code works for me

    q = psq.Queue(None, None, project=None, async=False)
    r = q.enqueue(adder, 1, 2) # Will be run immediately

Working around abysmal google.cloud.pubsub.Subscription.pull() delays?

So I'm using psq in a light-demand environment, and am encountering abysmal latency on the workers pulling jobs off the queue. As in 10 minutes when things are "speedy", and up to 25 minutes in some cases. I see that the code depends on the behaviour of google.cloud.pubsub.Subscription.pull() in blocking mode. Would modifying the psq code to do its own polling on the unblocked pull() improve that, or would the underlying subscription service still be taking its own sweet time to deliver messages even if we were banging on it every 10 seconds? I can dig into this, but thought you might already have some insights on this problem.

KeyboardInterrupt not working in Worker

Whenever I start a worker, it can't be killed with Ctrl-C, so I end up pressing Ctrl-Z and killing the worker. This started happening sometime after #34 was submitted (wherein a lot of other signal processing got removed). I'm not sure where the KeyboardInterrupt is going, since the 'except KeyboardInterrupt' stanza in the Worker never seems to get called, and setting an explicit signal handler in the calling code doesn't seem to be triggered either.

Here is the code I'm using to call the Worker:
https://github.com/google/turbinia/blob/1f70f2aaef409e33710f5f181ad91b5009b95445/turbinia/client.py#L386

psqworker.py:command not found

This is more that I don't really understand how I should run the worker.
I cloned the repository and installed psq with pip install psq. Before running run.py, I tried to run psqworker inside the examples directory, but I get an error saying -bash: psqworker.py: command not found

Please help me, how should I run psqworker?

Graceful shutdown for in-process Worker

I think it would be sensible to use the graceful shutdown scheme used in MultiprocessWorker (stop gracefully when receiving a SIGINT, finishing a running task, stop forcefully on second signal) also for the in-process Worker.

I could make a PR for that as well.
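The proposed behaviour could look roughly like this: the first SIGINT only sets a flag that the loop checks between tasks, so the current task finishes, and a second SIGINT raises KeyboardInterrupt to stop immediately. `GracefulShutdown` and `worker_loop` are hypothetical names, and this sketches the scheme described in this issue rather than MultiprocessWorker's actual code.

```python
import signal


class GracefulShutdown:
    """Hypothetical SIGINT handler: first signal stops gracefully, second forcefully."""

    def __init__(self):
        self.stop_requested = False

    def handle(self, signum, frame):
        if self.stop_requested:
            raise KeyboardInterrupt  # second Ctrl-C: stop immediately
        self.stop_requested = True   # first Ctrl-C: finish the current task

    def install(self):
        # signal.signal may only be called from the main thread.
        signal.signal(signal.SIGINT, self.handle)


def worker_loop(get_task, shutdown):
    """Run tasks until get_task() returns None or a graceful stop is requested."""
    completed = 0
    while not shutdown.stop_requested:
        task = get_task()
        if task is None:
            break
        task()  # a running task is always allowed to finish
        completed += 1
    return completed


# Simulate a Ctrl-C arriving while the second task runs.
shutdown = GracefulShutdown()
tasks = iter([
    lambda: None,
    lambda: shutdown.handle(signal.SIGINT, None),
    lambda: None,
])
completed = worker_loop(lambda: next(tasks, None), shutdown)
print(completed)  # 2
```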

Support for google-cloud-pubsub==0.29.0

It seems that PSQ does not yet support the latest version of the PubSub library which separates the SubscriberClient and the PublisherClient objects (googleapis/google-cloud-python#3859). Are there any plans to do so?

In [1]: from google.cloud import pubsub

In [2]: import psq

In [3]: pclient = pubsub.PublisherClient()

In [4]: sclient = pubsub.SubscriberClient()

In [7]: try:
   ...:     q = psq.Queue(pclient)
   ...: except AttributeError as e:
   ...:     print 'ERROR: {0!s}'.format(e)
   ...:
ERROR: 'Client' object has no attribute 'topic'

In [8]: try:
   ...:     q = psq.Queue(sclient)
   ...: except AttributeError as e:
   ...:     print 'ERROR: {0!s}'.format(e)
   ...:
ERROR: 'Client' object has no attribute 'topic'
