Git Product home page Git Product logo

dramatiq's Introduction

dramatiq

Build Status PyPI version Documentation Discuss

A fast and reliable distributed task processing library for Python 3.


Changelog: https://dramatiq.io/changelog.html
Community: https://groups.io/g/dramatiq-users
Documentation: https://dramatiq.io


Sponsors


Franz: Desktop Client for Apache Kafka

Installation

If you want to use it with RabbitMQ

pip install 'dramatiq[rabbitmq, watch]'

or if you want to use it with Redis

pip install 'dramatiq[redis, watch]'

Quickstart

Make sure you've got RabbitMQ running, then create a new file called example.py:

import dramatiq
import requests
import sys


@dramatiq.actor
def count_words(url):
    response = requests.get(url)
    count = len(response.text.split(" "))
    print(f"There are {count} words at {url!r}.")


if __name__ == "__main__":
    count_words.send(sys.argv[1])

In one terminal, run your workers:

dramatiq example

In another, start enqueueing messages:

python example.py http://example.com
python example.py https://github.com
python example.py https://news.ycombinator.com

Check out the user guide to learn more!

License

dramatiq is licensed under the LGPL. Please see COPYING and COPYING.LESSER for licensing details.

dramatiq's People

Contributors

aberres avatar andrecimander avatar andruskutt avatar asavoy avatar benekastah avatar bersace avatar bogdanp avatar caspervdw avatar davidt99 avatar evstratbg avatar finnlidbetter avatar gdvalle avatar jenstroeger avatar kurtmckee avatar miksyn avatar najamansari avatar orsinium avatar rakanalh avatar rouge8 avatar ryanhiebert avatar ryansm1 avatar staticdev avatar swidoff avatar synweap15 avatar takhs91 avatar thomazthz avatar timdrijvers avatar timgates42 avatar viiicky avatar wsantos avatar

Stargazers

 avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar

Watchers

 avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar

dramatiq's Issues

actor declare on a method or classmethod

Hi,

This is not a bug but a wish, I would use the decorator actor on a method or a classmethod have you a solution ?

exemple:


class foo:
    @dramatiq.actor()
    def count_worlds(url):
          pass

A license related recommendation/query

I was originally going to send you an email about this, but I think posting this as a public issue here on github makes it easier to potentially include others in the discussion, and also allow you in future a place you can link to for anyone interested in the outcome.

As an up front comment, I really like this library, what it does, how it does it, and the issues it solves in similar tools. That said, I can't use this library, despite really wanting to, even though almost all the code I write is opensource.

The license you've chosen is the strongest level of protection possible, and I understand your reasons for choosing it based on your comments[1][2] but I think you haven't entirely considering the implications and I hope I can change your mind.

The problem is that the license you've picked is incompatible with every other opensource license other than AGPL itself, and GPLv3[3], and even there the interaction is not great. If this was a final product rather than a library, that would be fine, since it would encourage that code is always written against the core codebase and no one does any customization without sharing it back. I would personally use GPL and AGPL for end service projects. But as a library you are limiting who can use it so heavily that no one easily can, or will do so without realising they are breaking your own license in doing so, and actually need to change their license to one that is compatible (GPLv3 or AGPL). Not to mention that even with the GPL license, the interaction with AGPL requires that the part of their code which runs on a network that uses your AGPL library must be opensource and is itself compatible with the AGPL license or is AGPL itself [4]. This means they really need to break the part that uses your library out into something they can easily share (or make AGPL), or just make their whole project using you library AGPL outright.

What this actually does is make it so that only people also using AGPL, and sort of GPLv3, can use this library if they are an opensource project. No MIT, BSD, Apache, or LGPL licensed projects can ever use this. Not even GPLv2. And they can't/won't even pay for your commercial license since they don't ever intend to close source their work. The commercial license doesn't make sense for those projects.

In my particular case I do a bunch of work in the OpenStack community, and the company I work for is heavily focused on opensource first, with large amounts contributions and even donations/support to the opensource projects we use heavily. I live and breath opensource as much as I can, but because everything in OpenStack is Apache licensed I can never touch this library[5].

I have a piece of work I need to do that splits my program into API and a worker system. Dramatiq is my first choice, my other options are mostly Celery, Kuyruk, or writing a smaller thing myself. I can't use Huey or RQ because OpenStack tends towards RabbitMQ over redis, and our own cloud uses RabbitMQ (so running redis for a single service is a no).

Dramatiq is my best option, and has features I want that don't entirely exist as nicely in Celery or Kuyruk, but despite how much I want to use it, I can't, and won't given the license. The OpenStack foundation itself nor the company I work for will be buying a commercial license because it doesn't make sense (our code is opensource), and that license won't apply to anyone forking our code.

So now that you understand the why behind this post, here comes the suggestion, and do know I'm doing this because I want Dramatiq to succeed over the likes of Celery. Please reconsider using a license that is more applicable to a library such as this and is compatible with the majority of opensource licenses in use at present.

Changing your library to something like LGPLv3 still gives you a fairly strong amount of protection, and the library itself if modified must remain opensource, but can be used as is (without modification) by anyone. Yes this means some people can use your library and make money without you ever getting a cent, but it also encourages others to use your software, gives you more people contributing back, and nothing stops you from still offering a commercial license for anyone who wants to modify your code and not share their changes. This gives you most of what you want, while giving people like me working on opensource projects in the more permissive (and now more common) license space a chance to use it and help you make it better.

If you look at the trends in opensource licenses, most are moving towards permissive ones, and of the projects out there, VERY few are using licenses that are compatible with your library[6]. Only 7% are using GPLv3, and less than 1% is on AGPL. This means the majority can't use this library.

Then if we look at the licenses for all the competitor libraries to Dramatiq, all of them use VERY permissive licenses (BSD, or MIT):
https://github.com/rq/rq/blob/master/LICENSE
https://github.com/celery/celery/blob/master/LICENSE
https://github.com/coleifer/huey/blob/master/LICENSE
https://github.com/cenkalti/kuyruk/blob/master/LICENSE

While the GNU foundation docs themselves recommend using GPL over LGPL for new libraries, that is only if there isn't something that already does the same or similar things. Yes Dramatiq is better (imo) than the above alternatives, but not enough to justify using it because of the license it has. In the cases where there are alternatives GNU recommends LGPL:
Using the ordinary GPL is not advantageous for every library. There are reasons that can make it better to use the Lesser GPL in certain cases. The most common case is when a free library's features are readily available for proprietary software through other libraries. In that case, the library cannot give free software any particular advantage, so it is better to use the Lesser GPL for that library.[7]

I really want to use this library, I'd probably even help fix bugs that I run into, or issues that affect me and gladly contribute back features that make sense. I'd likely even promote it to others that are sick of Celery, or want something a little cleaner. Yet I can't do any of that with the license you've picked, and I bet there will be others who look at Dramatiq and make the same choice, or not realise the license and then run like hell and refactor when they do.

Please reconsider the AGPL, and if you are open to having a discussion about this, lets have one here. Or if that's never going to happen, and you aren't willing to at least have a discussion about that as an option, lets at least have this as a documented confirmation of your choice for anyone else who asks this.

Please let me and others be able to use your library. :)

Foot notes:
  1. https://dramatiq.io/commercial.html
  2. https://news.ycombinator.com/item?id=15681066
  3. https://en.wikipedia.org/wiki/License_compatibility
  4. https://www.gnu.org/licenses/gpl-faq.html#IfLibraryIsGPL
  5. https://governance.openstack.org/tc/reference/licensing.html
  6. https://redmonk.com/sogrady/2017/01/13/the-state-of-open-source-licensing/
  7. https://www.gnu.org/licenses/why-not-lgpl.en.html

Task composition (ie: Celery Canvas)

What is your viewpoint and this projects direction in regards to task composition?

I currently use Celery because it allows me to work with tasks on a high level (chain, group, etc). I know Dramatiq supports callbacks but for as far I can see they are rather limited.

Currently with Dramatiq I cannot trivially:

  • Chain more than 2 Actors.
  • Pass arguments to a chained Actor during composition (unless I pass them through the first task).
  • Composite a more complex task/group and pass it to some other part of my application to have it 'send' (like Celery signatures).

I understand this project might be in it's infancy and you might not want to introduce more complexity then needed, but I'm curious about your roadmap for this project and the use cases.

Worker keeps restarting

Hello,

This caught my eye as i've been running a worker for a couple of days and i saw that this happens quite frequently:

[2017-12-30 19:50:13,310] [PID 2608] [Thread-1] [dramatiq.MainProcess] [INFO] Sending 'SIGHUP' to worker processes...
[2017-12-30 19:50:13,310] [PID 28917] [DummyThread-11] [dramatiq.WorkerProcess(0)] [INFO] Stopping worker process...
[2017-12-30 19:50:13,724] [PID 28917] [MainThread] [dramatiq.worker.Worker] [INFO] Shutting down...
[2017-12-30 19:50:14,734] [PID 28917] [MainThread] [dramatiq.worker.Worker] [INFO] Worker has been shut down.
[2017-12-30 19:50:16,574] [PID 2608] [MainThread] [dramatiq.MainProcess] [INFO] Dramatiq '0.15.1' is booting up.
[2017-12-30 19:50:17,382] [PID 28926] [MainThread] [dramatiq.WorkerProcess(0)] [INFO] Worker process is ready for action.

I didn't take the time to look into it, Is there a reason this happens? or is it a bug?

Task states

When running multiple tasks you often want to know about the tasks state or show some kind of progress to an end user. I believe it would be beneficial for Dramatiq to include such functionality, just like Celery does, which you can read more about here.

If Dramatiq includes some kind of chain or group functionality, it would be great to get the current state of the chain/group.

Handling task failures+retries during testing

As someone new to dramatiq, the retry middleware may cause task failures during testing to appear as if the test worker is hanging indefinitely or is otherwise failing to execute tasks. What actually happened:

  • Some of my application task code failed during testing, causing the retry middleware to kick in.
  • The retry middleware defaults to a 7-day maximum backoff.
  • pytest tends to capture stdout/stderr, and terminating the test run causes that output to be lost.

Given the lack of output and apparent hang, my initial reaction was confusion and to assume that I had somehow misconfigured the test worker. The reality was that everything was working correctly, and pytest was just eating the output.

The unit testing guide should probably mention that apparent hangs may be the result of the retry middleware, and that they should try temporarily setting max_retries=0 or consider disabling the retry middleware during testing.

how to handle timeout?

seems that after fn finished, dramatiq would check the timeout.

#dramatiq.actor.Actor.__call__
def __call__(self, *args, **kwargs):

    try:
        self.logger.info("Received args=%r kwargs=%r.", args, kwargs)
        start = time.perf_counter()
       # call self.fn
        return self.fn(*args, **kwargs)
    finally:
        # finally, we would wait for self.fn finished
        delta = time.perf_counter() - start
        self.logger.info("Completed after %.02fms.", delta * 1000)

but what if i want to kill a task before the task finished when the task timeout?
could i do that?

Performance comparison with RQ, Celery and redis blpop

Hi, how does dramatiq compare against RQ, Celery and redis blocking list pop and possibly others?

I implemented some RQ some time ago and got rid of it in favour of a simpler and much more performant solution: redis blocking list pop.

Cancel long running tasks

Hi there,
is it possible, to revoke enqueued and cancel long running tasks? For example based on the message id used for starting the task? If not, are there any plans in introducing this in a future version?
Cheers
Lars

Hard-coded virtual env name

I am not quite sure what is happening, but I am guessing this is due to something in the build/release process?

When I run the dramatiq-gevent binary, I get the following error:

dramatiq-gevent
zsh: /Users/michael/Envs/planet-ingest/bin/dramatiq-gevent: bad interpreter: /Users/bogdan/.virtualenvs/dramatiq/bin/python: no such file or directory

Class-based actor

Hello,

I've been looking for a celery alternative for quite a while, thanks for taking the time to make this.

A suggestion i'd like to make is to add the ability of defining class-based actor where grouping common functionality among different "tasks" or "actors" is possible.

Thank you.

Handling of task broker made unavailable?

If the task broker becomes unavailable the dramatiq log gets completely flooded with messages like this (about 10 per second or so):

[2018-01-16 14:40:24,146] [PID 22] [Thread-4] [dramatiq.worker.ConsumerThread(default)] [ERROR] Consumer encountered a connection error.
Traceback (most recent call last):
  File "/usr/local/lib/python3.6/site-packages/dramatiq/worker.py", line 197, in run
    for message in self.consumer:
  File "/usr/local/lib/python3.6/site-packages/dramatiq/brokers/redis.py", line 330, in __next__
    raise ConnectionClosed(e) from None
dramatiq.errors.ConnectionClosed: Error -2 connecting to redis:6379. Name does not resolve.

When sending logs externally this can put quite a strain on the log receiver. Perhaps the retry frequency should be limited, preferably with a delay argument to the Broker instance? 1 second would be a good default I think.

Dramatiq fails if rabbitmq hasn't been started

I noticed this when running dramatiq and django_dramatiq in docker while rabbitmq is still starting. Would it make sense to implement a retry with exponential backoff?

Traceback (most recent call last):
  File "/opt/project/app/venv/lib/python3.6/site-packages/dramatiq/brokers/rabbitmq.py", line 160, in declare_queue
    self._declare_queue(queue_name)
  File "/opt/project/app/venv/lib/python3.6/site-packages/dramatiq/brokers/rabbitmq.py", line 179, in _declare_queue
    return self.channel.queue_declare(queue=queue_name, durable=True, arguments={
  File "/opt/everyprojectlist/app/venv/lib/python3.6/site-packages/dramatiq/brokers/rabbitmq.py", line 95, in channel
    channel = self.state.channel = self.connection.channel()
  File "/opt/project/app/venv/lib/python3.6/site-packages/dramatiq/brokers/rabbitmq.py", line 75, in connection
    parameters=self.parameters)
  File "/opt/project/app/venv/lib/python3.6/site-packages/pika/adapters/blocking_connection.py", line 374, in __init__
    self._process_io_for_connection_setup()
  File "/opt/project/app/venv/lib/python3.6/site-packages/pika/adapters/blocking_connection.py", line 414, in _process_io_for_connection_setup
    self._open_error_result.is_ready)
  File "/opt/project/app/venv/lib/python3.6/site-packages/pika/adapters/blocking_connection.py", line 468, in _flush_output
    raise exceptions.ConnectionClosed(maybe_exception)
pika.exceptions.ConnectionClosed: Connection to 172.17.0.2:5672 failed: [Errno 111] Connection refused

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "manage.py", line 10, in <module>
    execute_from_command_line(sys.argv)
  File "/opt/project/app/venv/lib/python3.6/site-packages/django/core/management/__init__.py", line 367, in execute_from_command_line
    utility.execute()
  File "/opt/project/app/venv/lib/python3.6/site-packages/django/core/management/__init__.py", line 341, in execute
    django.setup()
  File "/opt/project/app/venv/lib/python3.6/site-packages/django/__init__.py", line 27, in setup
    apps.populate(settings.INSTALLED_APPS)
  File "/opt/project/app/venv/lib/python3.6/site-packages/django/apps/registry.py", line 85, in populate
    app_config = AppConfig.create(entry)
  File "/opt/project/app/venv/lib/python3.6/site-packages/django/apps/config.py", line 116, in create
    mod = import_module(mod_path)
  File "/usr/local/lib/python3.6/importlib/__init__.py", line 126, in import_module
    return _bootstrap._gcd_import(name[level:], package, level)
  File "<frozen importlib._bootstrap>", line 978, in _gcd_import
  File "<frozen importlib._bootstrap>", line 961, in _find_and_load
  File "<frozen importlib._bootstrap>", line 950, in _find_and_load_unlocked
  File "<frozen importlib._bootstrap>", line 655, in _load_unlocked
  File "<frozen importlib._bootstrap_external>", line 678, in exec_module
  File "<frozen importlib._bootstrap>", line 205, in _call_with_frames_removed
  File "/opt/django_dramatiq/django_dramatiq/apps.py", line 3, in <module>
    from .tasks import broker  # noqa
  File "/opt/django_dramatiq/django_dramatiq/tasks.py", line 39, in <module>
    def delete_old_tasks(max_task_age=86400):
  File "/opt/project/app/venv/lib/python3.6/site-packages/dramatiq/actor.py", line 74, in actor
    return decorator(fn)
  File "/opt/project/app/venv/lib/python3.6/site-packages/dramatiq/actor.py", line 69, in decorator
    priority=priority, broker=broker, options=options,
  File "/opt/project/app/venv/lib/python3.6/site-packages/dramatiq/actor.py", line 89, in __init__
    self.broker.declare_actor(self)
  File "/opt/project/app/venv/lib/python3.6/site-packages/dramatiq/broker.py", line 158, in declare_actor
    self.declare_queue(actor.queue_name)
  File "/opt/project/app/venv/lib/python3.6/site-packages/dramatiq/brokers/rabbitmq.py", line 174, in declare_queue
    del self.consumer
AttributeError: consumer

Retrieve task results by id

Hi, first I want to thank you for this great package. I just started using it and it seems to really work well for me so far.

In my flask based application (built upon your flask example), I want use the results backend to offer an API endpoint that allows to return the results of a given job.

From the celery FAQ, it seems like you can retrieve the results given the unique task id:

result = MyTask.AsyncResult(task_id)
result.get()

Now, how would I implement such a functionality with dramatiq?

question: how to avoid processing the same message multiple times

the contribution guidelines says to ask on SO, but mine is more a knowledge question and not a real problem. From docs I understand that messages are acked after they're processed. It's not clear to me tho how I can avoid to have a message grabbed by multiple workers (do I have to implement myself a distributed lock or something?). @Bogdanp do you have an IRC channel/gitter? If no it could be a nice place where people can ask this kind of questions maybe. Anyway good job, I'm very interested in your project and I'm planning to give it a go to see if we can replace the celery stuff we have now

Dramatiq probably doesn't do at-least-once delivery

Hi there!

It looks like there are two backend stores for Dramatiq: Redis and RabbitMQ. Neither of these systems provides at-least-once delivery; both can lose committed updates when the network fails, nodes pause, or, in some cases, when processes crash. If you're relying on these systems to provide durable queuing (instead of, say, implementing your own replication mechanism across independent RabbitMQs or Redices), it might be a bit optimistic to claim that Dramatiq delivers messages at least once.

Dramatiq worker stuck at Dramatiq '0.14.0' is booting up.

I am experiencing a strange behavior when running dramatiq worker using django:

python manage.py rundramatiq --use-gevent

The log prints out:

[2017-12-02 20:36:53,804] [PID 15625] [MainThread] [dramatiq.MainProcess] [INFO] Dramatiq '0.14.0' is booting up.

But no "workers" announce their readiness (according to the code, they should).
Pressing ctrl-c causes the worker to become ready.

[2017-12-02 20:36:53,804] [PID 15625] [MainThread] [dramatiq.MainProcess] [INFO] Dramatiq '0.14.0' is booting up.
^CException in thread Thread-3:
Traceback (most recent call last):
  File "/home/vagrant/.pyenv/versions/3.6.2/lib/python3.6/threading.py", line 916, in _bootstrap_inner
    self.run()
  File "/home/vagrant/.pyenv/versions/3.6.2/envs/project/lib/python3.6/site-packages/watchdog/observers/inotify_buffer.py", line 59, in run
    inotify_events = self._inotify.read_events()
  File "/home/vagrant/.pyenv/versions/3.6.2/envs/project/lib/python3.6/site-packages/watchdog/observers/inotify_c.py", line 296, in read_events
    event_buffer = os.read(self._inotify_fd, event_buffer_size)
KeyboardInterrupt

[2017-12-02 20:36:54,680] [PID 15664] [MainThread] [dramatiq.WorkerProcess(0)] [INFO] Worker process is ready for action.

Note: i have 1088 tasks in my database some of them are being retried while others are queued for processing. Not sure if that helps debugging this issue.

Obscure error when dramatiq is put in the requirement

Hi,

I don't understand why the beaviour is different between

(venv) jssuzanne:tools-api_sensee$ pip install dramatiq
Collecting dramatiq
  Using cached dramatiq-0.13.0-py3-none-any.whl
Collecting prometheus-client==0.0.20 (from dramatiq)
Installing collected packages: prometheus-client, dramatiq
Successfully installed dramatiq-0.13.0 prometheus-client-0.0.20

and dramatiq in the requirement

requirements = [
    'anyblok',
    'dramatiq>=0.12.0',
]
---------------------------
Processing dependencies for toolsapi==0.1.0
Searching for dramatiq>=0.12.0
Reading https://pypi.python.org/simple/dramatiq/
No local packages or working download links found for dramatiq>=0.12.0
error: Could not find suitable distribution for Requirement.parse('dramatiq>=0.12.0')

If I install dramatiq first (pip install dramatiq) and after install my package (pip install .) It's work. But when I do only pip install . I get the version 0.0.4

Have you any solution ?

Message metadata

For tracing purposes I would like to pass some additional metadata with each message.

Task routing

does dramatiq has anything like celery about task routing.

I didn't find any api to send task with queue_name, but I can build message myself to send task to specific queue.

 Message(
            queue_name="only_i_can_receive",
            actor_name="count_words",
            args=("https://www.google.com", ),
        )
broker.enqueue(message, delay=delay)

but i can't start worker only consumer for specific queue like celery.

celery worker tasks -Q tasks.to.nodes.1

Example test fixtures / broker.flush_all is not sufficient after test failure

The suggested broker/worker fixtures (used by โ€ฆ:_dramatiq) look like:

from yourapp import broker

@pytest.fixture()
def stub_broker():
  broker.flush_all()
  return broker

@pytest.fixture()
def stub_worker():
  worker = Worker(broker, worker_timeout=100)
  worker.start()
  yield worker
  worker.stop()

A test failure before the broker/worker can be joined may cause another test to hang indefinitely. eg,

@dramatiq.actor()
def example():
     ...

def test_a(broker, worker):
    example.send()
    
    raises_unintended_exception()

    # never reached
    broker.join(example.queue_name)
    worker.join()

def test_b(broker, worker):
    example.send()
   
    # hangs indefinitely
    broker.join(example.queue_name)
    worker.join()

While test_a fails and exits, the broker is in a bad state when it reaches test_b and will hang on join.

Note that this is not an issue with dramatiq's own stub_broker fixture, as a new broker is created and closed for each test. In contrast, django_dramatiq's broker is globally created in the apps config, and can only issue a flush_all() between tests. Test case:

def test_stub_broker_can_join_after_flush_after_not_joining(stub_broker, stub_worker):
    # Given that I have an actor
    @dramatiq.actor
    def do_work():
        pass

    # "test a"
    stub_broker.flush_all()
    do_work.send()
    # test failure / no joining

    # "test b"
    stub_broker.flush_all()
    do_work.send()
    
    stub_broker.join(do_work.queue_name)
    stub_worker.join()

Issues with recovering from closed Pika connections

Here is the traceback:

ERROR:pika.adapters.base_connection:Read empty data, calling disconnect
ERROR:pika.adapters.blocking_connection:Connection close detected; result=BlockingConnection__OnClosedArgs(connection=<SelectConnection CLOSED socket=None params=<URLParameters host=rabbitmq.farmlogs port=5672 virtual_host=planet ssl=False>>, reason_code=-1, reason_text='EOF')
Traceback (most recent call last):
  File "bin/cron", line 79, in <module>
    schedule.run_pending()
  File "/usr/local/lib/python3.6/dist-packages/schedule/__init__.py", line 452, in run_pending
    default_scheduler.run_pending()
  File "/usr/local/lib/python3.6/dist-packages/schedule/__init__.py", line 75, in run_pending
    self._run_job(job)
  File "/usr/local/lib/python3.6/dist-packages/schedule/__init__.py", line 129, in _run_job
    ret = job.run()
  File "/usr/local/lib/python3.6/dist-packages/schedule/__init__.py", line 377, in run
    ret = self.job_func()
  File "bin/cron", line 63, in check_and_process_new_flags
    process_enterprise.send(enterprise_id)
  File "/usr/local/lib/python3.6/dist-packages/dramatiq/actor.py", line 104, in send
    return self.send_with_options(args=args, kwargs=kwargs)
  File "/usr/local/lib/python3.6/dist-packages/dramatiq/actor.py", line 129, in send_with_options
    return self.broker.enqueue(message, delay=delay)
  File "/usr/local/lib/python3.6/dist-packages/dramatiq/brokers/rabbitmq.py", line 206, in enqueue
    raise ConnectionClosed(e) from None
dramatiq.errors.ConnectionClosed: (-1, 'EOF') 

I'm wondering if this has something to do with the ELB that I have in front of my RabbitMQ hosts. These connection failures happen all the time and then work gets dropped on the floor. I'm not sure if this is an issue I need to try and patch at the Pika level or the Dramatiq level.

Here is a thread from the Pika issue tracker about some similar problems that describes their attitude towards the problem: pika/pika#856

Long story short it looks like they don't intend to make this a concern of Pika and it's a responsibility of whatever uses Pika.

Any thoughts here? At this point I am struggling with this issue in some heavy production workloads. I'd ideally like to fix the root issue but if not, I am comfortable abandoning RabbitMQ entirely and jumping to Redis if this is likely less of an issue in that environment.

How to use authentication

Hi,

I don't found the authentication in your documentation,
How to protect it whatever the broker (rabbitmq, redis, ...)? What parameter ? where ?

Task flows and AWS SQS support

Is it possible to use dramatiq to model task flows - run tasks that have dependencies between them? Compared to Celery which has primitives I didn't find anything about this capability in docs (considering this is not possible) - also Celery is quite limited with some advanced task flows. I'm quite interesting in this as we were dealing with this for quite some time so finally I decided to implement selinon.

Also, is there support for AWS SQS?

Can I use time_limit in send_with_options/message_with_options?

Use case: I have an actor that sometimes takes longer than 10 minutes to run. I can predict this. Let's say I want to double the default time_limit. If I use the decorator @actor(time_limit=1_200_000) then the actor will always have the 20-minute time_limit. I only want to increase the time_limit when I know I'm handing the actor a larger amount of work. Usually, I want it to have the default time_limit.

quirky_actor.message_with_options(args=(config.get('whats_my_motivation'),), time_limit=1_000*60*20)

I'm having a hard time figuring out if this even works. Should it work?

No stacktrace with critical logs from worker

I'm getting the error message "Consumer encountered an error: 'NoneType' object has no attribute 'decode'", but there's no stack trace. How do you feel about adding exc_info=True into the logger.critical call in dramatiq.worker line 217?

As to why the error is happening, I deleted some jobs from the delayed queue and I'm guessing there's still something in redis that knows about them, but I don't know what.

ConcurrentRateLimiter limit=1, with no retries?

First of all - Big thanks @Bogdanp for creating a truly pythonic task processor! Celery has always given me a kind of bitter after taste..

I'm working towards a backend system that can't handle concurrent writes to the same database table. On top of that the task writing to this system might fail half-way and then we have no way to know where to resume when the task is retried (unless I keep state in the app which I'd like to avoid to keep things simple), so I've passed max_retries=0 to dramatic.actor and made the task notify me of any failures.

So my thought was that dramatiq.rate_limits.ConcurrentRateLimiter would be a perfect fit here. In the example in the cookbook though you're using it inside the task function. But I guess this wouldn't work in my scenario as the task will never be retried... Is there any way to have the mutex apply to the task itself so that it never even starts if another instance of that task is already executing?

In my head an API like this would do it for my scenario:

import dramatiq
import time

from dramatiq.rate_limits import ConcurrentRateLimiter
from dramatiq.rate_limits.backends import RedisBackend

backend = RedisBackend()
DISTRIBUTED_MUTEX = ConcurrentRateLimiter(backend, "distributed-mutex", limit=1)

@dramatiq.actor(rate_limiter=DISTRIBUTED_MUTEX, max_retries=0)
def one_at_a_time():
  print("I'll never run concurrently! And never twice!")

Or maybe I'm going all wrong with this?

Any input would be greatly appreciated!

dramatiq should work with python >= 3.4.

Hi,

Actually dramatiq works only with python 3.6 and 3.7a. the linux distribution haven't got theses versions of python. Why don't you keep the capability to use use Python 3.4 and highter

Improve Message.encode / Message.decode to add adapter

Hi,

I try to send uuid in the arguments and I don't want cast this value when I call my actor.

dramatiq should add some hook to add adapter for the json (uuid, datetime, date, ...) to let the dev to implement their adapter.

dramatiq dependencies

I noticed that by default, dramatiq installs the dependencies for rabbitmq and the watch feature. I think this is due to the following lines:

dramatiq/setup.py

Lines 44 to 47 in f6cea0d

extra_dependencies[""] = list(set(
extra_dependencies["rabbitmq"] +
extra_dependencies["watch"]
))

Executing pip install dramatiq[redis] will still install in the rabbitmq and watch dependencies.

Licensing question

Not sure if this is the correct place to ask, but I didn't see any information regarding licensing questions, just a form for license requests.

I am working on integrating Dramatiq into my commercial Django application. Currently, no Dramatiq code is in production. You could say that I am at the "evaluation" stage.

Now, there are large parts of my codebase that I would have no problem open sourcing. However, there are a couple of proprietary ranking algorithms that absolutely cannot be open sourced. So, my question is, how much of my codebase needs to be open sourced in order to qualify for the open source license?

More information can be provided upon request through more private communication channels.

Suggestion: unify message/message_with_options

Minor suggestion - message and message_with_options could be combined into a single method with a simplified signature. Currently, args/kwargs are passed differently between the two methods, with message_with_options fairly verbose, as args/kwargs are passed as keyword-only arguments.

Suggested changes:

  • args and kwargs should always be passed as values (list, dict)
  • Only options should be required to be passed by name.
def message(self, args=None, kwargs=None, **options):
     ...

# similarly, send/send_with_options can be combined 
def send(self, args=None, kwargs=None, *, delay=None, **options):
    ...

# eg,
pipeline(
    actor_a.message([1, 2], pipe_ignore=True),
    actor_b.message([3, 4], pipe_ignore=True),
)

# instead of 
pipeline(
    actor_a.message_with_options(args=[1, 2], pipe_ignore=True),
    actor_b.message_with_options(args=[3, 4], pipe_ignore=True),
)

Again, just a minor suggestion. Not sure if it's worth it to introduce a breaking API change.

task routing problem

Hi

I have

server A,which start dramatiq like this:
dramatiq --path . --processes 4 --threads 4 tasks

server B,start like this:
dramatiq --path . --processes 4 --threads 4 tasks --queues t1

and in my tasks.py,there is a function like this:
@dramatiq.actor(max_retries=0, queue_name="t1")
def func():
print(122)

now, I run:
func.send()

and this job is sent to server A.

but I wish it sends to server B.

Is there anything wrong with my work?

Thanks:)

Recommend Projects

  • React photo React

    A declarative, efficient, and flexible JavaScript library for building user interfaces.

  • Vue.js photo Vue.js

    ๐Ÿ–– Vue.js is a progressive, incrementally-adoptable JavaScript framework for building UI on the web.

  • Typescript photo Typescript

    TypeScript is a superset of JavaScript that compiles to clean JavaScript output.

  • TensorFlow photo TensorFlow

    An Open Source Machine Learning Framework for Everyone

  • Django photo Django

    The Web framework for perfectionists with deadlines.

  • D3 photo D3

    Bring data to life with SVG, Canvas and HTML. ๐Ÿ“Š๐Ÿ“ˆ๐ŸŽ‰

Recommend Topics

  • javascript

    JavaScript (JS) is a lightweight interpreted programming language with first-class functions.

  • web

    Some thing interesting about web. New door for the world.

  • server

    A server is a program made to process requests and deliver data to clients.

  • Machine learning

    Machine learning is a way of modeling and interpreting data that allows a piece of software to respond intelligently.

  • Game

    Some thing interesting about game, make everyone happy.

Recommend Org

  • Facebook photo Facebook

    We are working to build community through open source technology. NB: members must have two-factor auth.

  • Microsoft photo Microsoft

    Open source projects and samples from Microsoft.

  • Google photo Google

    Google โค๏ธ Open Source for everyone.

  • D3 photo D3

    Data-Driven Documents codes.