
celery-singleton's Introduction

Celery-Singleton

Duplicate tasks clogging up your message broker? Do time-based rate limits make you feel icky? Look no further! This is a base class for celery tasks that ensures only one instance of the task can be queued or running at any given time. It uses the task's name and arguments to determine uniqueness.

Prerequisites

celery-singleton uses the JSON representation of a task's delay() or apply_async() arguments to generate a unique lock and stores it in redis. By default it uses the redis server of the celery result backend. If you use a different result backend (or none), or want to use a different redis server for celery-singleton, refer to the configuration section for how to customize the redis connection. To use something other than redis, refer to the section on backends.

In short:

  1. Make sure all your task arguments are JSON serializable
  2. Make sure your celery app is configured with a redis result backend, or specify another redis (or compatible) backend in your config

If you're already using a redis backend and a mostly default celery config, you're all set!

Quick start

$ pip install celery-singleton

import time
from celery_singleton import Singleton
from somewhere import celery_app

@celery_app.task(base=Singleton)
def do_stuff(*args, **kwargs):
	time.sleep(4)
	return 'I just woke up'

# run the task as normal
async_result = do_stuff.delay(1, 2, 3, a='b')
async_result2 = do_stuff.delay(1, 2, 3, a='b')

assert async_result == async_result2  # These are the same, task is only queued once

That's it! Your task is a singleton and calls to do_stuff.delay() will either queue a new task or return an AsyncResult for the currently queued/running instance of the task.

How does it work?

The Singleton class overrides apply_async() of the base task implementation to only queue a task if an identical task is not already running. Two tasks are considered identical if both have the same name and same arguments.

This is achieved by using redis for distributed locking.

When you call delay() or apply_async() on a singleton task it first attempts to acquire a lock in redis, using a hash of [task_name+arguments] as a key and a new task ID as a value. SETNX is used for this to prevent race conditions. If a lock is successfully acquired, the task is queued as normal with the apply_async method of the base class. If another run of the task already holds a lock, we fetch its task ID instead and return an AsyncResult for it. This way it works seamlessly with a standard celery setup: there are no "duplicate exceptions" you need to handle and no timeouts. delay() always returns an AsyncResult as expected, either for the task you just spawned or for the task that acquired the lock before it. So continuing on with the "Quick start" example:

a = do_stuff.delay(1, 2, 3)
b = do_stuff.delay(1, 2, 3)

assert a == b  # Both are AsyncResult for the same task

c = do_stuff.delay(4, 5, 6)

assert a != c  # c has different arguments so it spawns a new task

The lock is released only when the task has finished running, using either the on_success or on_failure handler, after which you're free to start another identical run.

# wait for a to finish
a.get()

# Now we can spawn a duplicate of it
d = do_stuff.delay(1, 2, 3)

assert a != d
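
Under the hood, the flow described above boils down to something like the following sketch. This is illustrative only, not the library's actual code: the helper names (make_lock_key, queue_once), the redis URL, and the exact hashing scheme are assumptions made for clarity; only the SETNX-based semantics match the description above.

import hashlib
import json
import uuid

from redis import Redis

redis_client = Redis.from_url("redis://localhost:6379")  # assumed redis location

def make_lock_key(task_name, args, kwargs):
    # Hash the JSON representation of the task name + arguments.
    # The real key format is an implementation detail of the library.
    payload = json.dumps([task_name, list(args), kwargs], sort_keys=True)
    return "SINGLETONLOCK_" + hashlib.md5(payload.encode()).hexdigest()

def queue_once(task, args=(), kwargs=None):
    """Queue `task` unless an identical one already holds the lock."""
    kwargs = kwargs or {}
    task_id = str(uuid.uuid4())
    key = make_lock_key(task.name, args, kwargs)
    # SETNX succeeds only if no identical task currently holds the lock
    if redis_client.setnx(key, task_id):
        return task.apply_async(args=args, kwargs=kwargs, task_id=task_id)
    # Lock already held: return an AsyncResult for the existing run instead
    existing_id = redis_client.get(key).decode()
    return task.AsyncResult(existing_id)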

Handling deadlocks

Since the task locks are only released when the task has actually finished running (on success or on failure), you can sometimes end up in a situation where the lock remains but there's no task available to release it. This can happen, for example, if your celery worker crashes before it can release the lock.

A convenience method is included to clear all existing locks, you can run it on celery worker startup or any other celery signal like so:

from celery.signals import worker_ready
from celery_singleton import clear_locks
from somewhere import celery_app

@worker_ready.connect
def unlock_all(**kwargs):
    clear_locks(celery_app)

An alternative is to set a lock expiry time in the task or app config. This makes it so that locks are always released after a given time.
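
For example, assuming your celery app object is called celery_app (as in the quick start), an app-wide expiry can be set like this; the per-task lock_expiry option is described further down:

celery_app.conf.singleton_lock_expiry = 300  # all singleton locks expire after 5 minutes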

Backends

Redis is the default storage backend for celery singleton. Task locks are stored there so they can be accessed across celery workers. A custom redis url can be set using the singleton_backend_url config variable in the celery config. By default Celery Singleton attempts to use the redis url of the celery result backend and, if that fails, the celery broker.

If you don't want to use redis you can implement a custom storage backend. An abstract base class to inherit from is included as celery_singleton.backends.BaseBackend, and the source code of RedisBackend serves as an example implementation. Once you have your backend implemented, set the singleton_backend_class configuration variable to point to your class.
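
As a rough illustration only (not the library's verified API: only the lock() call with signature lock(lock, task_id, expiry=...) is visible in the tracebacks quoted later on this page, and the other method names and the "myproject.backends" path below are assumptions), a minimal custom backend might look something like this:

from celery_singleton.backends import BaseBackend  # path as documented above

class DictBackend(BaseBackend):
    """Toy backend that keeps locks in a process-local dict.
    Only useful as an illustration: it cannot share locks across workers."""

    def __init__(self, url=None, **kwargs):
        # celery-singleton passes singleton_backend_url and
        # singleton_backend_kwargs to the backend class
        self._locks = {}

    def lock(self, lock, task_id, expiry=None):
        # Return True only if the lock was not already held (SETNX semantics);
        # expiry is ignored in this toy example
        if lock in self._locks:
            return False
        self._locks[lock] = task_id
        return True

    def unlock(self, lock):  # method name assumed, check BaseBackend in the source
        self._locks.pop(lock, None)

    def get(self, lock):  # method name assumed, check BaseBackend in the source
        return self._locks.get(lock)

    def clear(self, key_prefix):  # method name assumed, check BaseBackend in the source
        self._locks.clear()

# Then point celery-singleton at it in your celery config:
# app.conf.singleton_backend_class = "myproject.backends.DictBackend"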

Task configuration

unique_on

This can be used to make celery-singleton only consider certain arguments when deciding whether two tasks are identical. (By default, two tasks are considered identical to each other if their name and all arguments are the same).

For example, this task allows only one instance per username, other arguments don't matter:

@app.task(base=Singleton, unique_on=['username', ])
def do_something(username, otherarg=None):
    time.sleep(5)


task1 = do_something.delay(username='bob', otherarg=99)
task2 = do_something.delay(username='bob', otherarg=100)  # this is a duplicate of task1
assert task1 == task2

Specify an empty list to consider the task name only.
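
For example, a minimal sketch with a hypothetical task name, allowing only one instance of the task regardless of its arguments:

@app.task(base=Singleton, unique_on=[])
def refresh_cache(force=False):
    # Only one instance of this task at a time, no matter the arguments
    time.sleep(5)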

raise_on_duplicate

When this option is enabled the task's delay and apply_async methods will raise a DuplicateTaskError exception when attempting to spawn a duplicate task, instead of returning the existing task's AsyncResult. This is useful when you want only one of a particular task at a time, but want more control over what happens on duplicate attempts.

from celery_singleton import Singleton, DuplicateTaskError


@app.task(base=Singleton, raise_on_duplicate=True)
def do_something(username):
    time.sleep(5)

task1 = do_something.delay('bob')
try:
    task2 = do_something.delay('bob')
except DuplicateTaskError as e:
    print("You tried to create a duplicate of task with ID", e.task_id)

This option can also be applied globally to all Singleton tasks by setting singleton_raise_on_duplicate in the app config. The task level option always overrides the app config when supplied.
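
For example, assuming your celery app object is called app, the global setting and a per-task override might look like this (best_effort_task is a hypothetical task name):

app.conf.singleton_raise_on_duplicate = True  # all Singleton tasks raise DuplicateTaskError on duplicates

@app.task(base=Singleton, raise_on_duplicate=False)  # task-level option overrides the app config
def best_effort_task():
    ...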

lock_expiry

Number of seconds until the task lock expires. This is useful when you want a max of one task queued within a given time frame rather than strictly one at a time. This also adds some safety to your application as it guarantees that locks will eventually be released in case of worker crashes and network failures. For this use case it's recommended to set the lock expiry to a value slightly longer than the expected task duration.

Example

@app.task(base=Singleton, lock_expiry=10)
def runs_for_12_seconds():
    time.sleep(12)


task1 = runs_for_12_seconds.delay()
time.sleep(11)
task2 = runs_for_12_seconds.delay()

assert task1 != task2  # These are two separate task instances

This option can be applied globally in the app config with singleton_lock_expiry. Task option supersedes the app config.

App Configuration

Celery singleton supports the following configuration options. These should be added to your Celery app config. Note: if using an old-style celery config with uppercase variables and a namespace, make sure the singleton config matches, e.g. CELERY_SINGLETON_BACKEND_URL instead of singleton_backend_url.

Key | Default | Description
--- | --- | ---
singleton_backend_url | celery_backend_url | The URL of the storage backend. If using the default backend implementation, this should be a redis URL. It is passed as the first argument to the backend class.
singleton_backend_class | celery_singleton.backends.RedisBackend | The full import path of a backend class as a string, or a reference to the class.
singleton_backend_kwargs | {} | Passed as keyword arguments to the backend class.
singleton_json_encoder_class | None (json.JSONEncoder) | Optional JSON encoder class for generating locks. Useful for task arguments where objects can be reliably marshalled to string (such as uuid.UUID).
singleton_key_prefix | SINGLETONLOCK_ | Locks are stored as <key_prefix><lock>. Use to prevent collisions with other keys in your database.
singleton_raise_on_duplicate | False | When True, an attempt to queue a duplicate task will raise a DuplicateTaskError. The default behavior is to return the AsyncResult for the existing task.
singleton_lock_expiry | None (never expires) | Lock expiry time in seconds for singleton task locks. When a lock expires, identical tasks are allowed to run regardless of whether the locked task has finished or not.
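
For example, a new-style configuration might look like this (the values here are illustrative):

app.conf.update(
    singleton_backend_url="redis://localhost:6379/1",
    singleton_key_prefix="SINGLETONLOCK_",
    singleton_raise_on_duplicate=False,
    singleton_lock_expiry=600,
)

# Old-style equivalent of the first key (uppercase, CELERY_ namespace):
# CELERY_SINGLETON_BACKEND_URL = "redis://localhost:6379/1"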

Testing

Tests are located in the /tests directory and can be run with pytest:

pip install -r dev-requirements.txt
python -m pytest

Some of the tests require a running redis server on redis://localhost. To use a redis server on a different url/host, set the env variable CELERY_SINGLETON_TEST_REDIS_URL.
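
For example, to point the tests at a redis server on a different host (the URL below is a placeholder):

CELERY_SINGLETON_TEST_REDIS_URL=redis://some-other-host:6379 python -m pytest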

Contribute

Please open an issue if you encounter a bug, have any questions or suggestions for improvements or run into any trouble at all using this package.

celery-singleton's People

Contributors

dmartin, lpsinger, reorx, steinitzu, suor, tony, wangsha


celery-singleton's Issues

Exception when Celery uses Redis Sentinel

If my celery broker_url is configured for Redis Sentinel (per celery documentation):

app.conf.broker_url = 'sentinel://:password@localhost:26379;sentinel://:password@localhost:26380;sentinel://:password@localhost:26381'
app.conf.broker_transport_options = { 'master_name': "cluster1", 'sentinel_kwargs': {'password': 'password'} }

I get the following exception:

  File "venv/lib/python3.9/site-packages/celery_singleton/singleton.py", line 106, in apply_async
    task = self.lock_and_run(**run_args)
  File "venv/lib/python3.9/site-packages/celery_singleton/singleton.py", line 119, in lock_and_run
    lock_aquired = self.aquire_lock(lock, task_id)
  File "venv/lib/python3.9/site-packages/celery_singleton/singleton.py", line 51, in aquire_lock
    return self.singleton_backend.lock(lock, task_id, expiry=expiry)
  File "venv/lib/python3.9/site-packages/celery_singleton/singleton.py", line 42, in singleton_backend
    self._singleton_backend = get_backend(self.singleton_config)
  File "venv/lib/python3.9/site-packages/celery_singleton/backends/__init__.py", line 22, in get_backend
    _backend = klass(url, **kwargs)
  File "venv/lib/python3.9/site-packages/celery_singleton/backends/redis.py", line 11, in __init__
    self.redis = Redis.from_url(*args, decode_responses=True, **kwargs)
  File "venv/lib/python3.9/site-packages/redis/client.py", line 696, in from_url
    connection_pool = ConnectionPool.from_url(url, db=db, **kwargs)
  File "venv/lib/python3.9/site-packages/redis/connection.py", line 1051, in from_url
    raise ValueError('Redis URL must specify one of the following '
ValueError: Redis URL must specify one of the following schemes (redis://, rediss://, unix://)

Task loses Flask app context

I am trying to implement Singleton with Flask and it seems like the celery base is overridden by Singleton, which essentially runs the task without context.

Flask docs advise that Celery is configured as below:

def make_celery(app):
    celery = Celery(
        app.import_name,
        backend=app.config['CELERY_RESULT_BACKEND'],
        broker=app.config['CELERY_BROKER_URL']
    )
    celery.conf.update(app.config)

    class ContextTask(celery.Task):
        def __call__(self, *args, **kwargs):
            with app.app_context():
                return self.run(*args, **kwargs)

    celery.Task = ContextTask
    return celery

Is there any way of using Singleton and not overriding the app Context when a task is run?

Multiple tasks do not generate singleton locks

I'm running two EC2 instances connected to 1 Redis cache. Each EC2 instance is running a celery beat and 4 workers of celery 4.4.7 and celery-singleton 0.3.1.
I'm trying to use celery-singleton to only queue/run one copy of the task from whichever celery beat schedules first.
I have 14 tasks configured with base=Singleton and lock_expiry > task time.
I see only 10 singleton lock keys in my Redis.
Out of the 14 tasks, 9 of them run twice.
Out of the 14 tasks, 3 have parameters. One of those runs once, the other 2 run twice.

One of the tasks is configured with @app.task(base=Singleton, lock_expiry=60, raise_on_duplicate=True) and it still runs two tasks and does not raise an exception.

What could be going wrong so that my 14 configured CELERY_BEAT_SCHEDULE tasks are generating 23 tasks and 10 singleton locks? Why isn't raise_on_duplicate triggering?

Status check on if you're around?

Hi @steinitzu, just wanted to check in! Sorry for any interruption and hope things are okay!

For this project:

Are you open to co-maintainers or anything to keep the project maintained? Do you still want to lead this project?

Do you have any ambitions of coming back to the project possibly, in time? If none, are you open to moving it to @jazzband?

The reason I ask is that I've had a few PRs open in Pull requests, and a few others do as well.

What do you think? What's your disposition with the project? (And maybe you will end up coming back)

Thank you!

Automatic lock release on task apply_async

I am using celery-singleton on a project and it saved us a bunch of headaches.
Thank you !

We encountered some operational trouble when restarting some workers and have implemented a way to challenge a singleton lock every time a task is submitted.
In doing so, an implementation for scanning celery worker state (see #17 ) and releasing the lock (see #3 ) has been made.
Since the two previous issues are related subjects, this new behavior might be merged. Of course, such behavior has no guarantee of a predictable implementation, as the worker state cannot be known for sure. We used it on regularly scheduled tasks, where the chances of a schedule and a worker or scheduler restart happening simultaneously were very low.

support for callable unique_on taking args, kwargs

This is useful if you have a task which takes a complex object, but in fact uniqueness is only done based on single field of it, such as tasks taking User, but uniqueness check only needs user.id.

Of course you could argue that you shouldn't have celery tasks with such complex objects as params, but I think it should be left to the user to decide if it is right for them or not.

At the same time it seems very easy to implement - for now I'm using custom subclass:

import inspect

import celery_singleton
from celery_singleton import util


class UniqueFunctionSingleton(celery_singleton.Singleton):
    def generate_lock(self, task_name, task_args=None, task_kwargs=None):
        unique_on = self.unique_on
        task_args = task_args or []
        task_kwargs = task_kwargs or {}
        if callable(unique_on):
            unique_args = unique_on(*task_args, **task_kwargs)
            unique_kwargs = None
        else:
            if unique_on:
                if isinstance(unique_on, str):
                    unique_on = [unique_on]
                sig = inspect.signature(self.run)
                bound = sig.bind(*task_args, **task_kwargs).arguments

                unique_args = []
                unique_kwargs = {key: bound[key] for key in unique_on}
            else:
                unique_args = task_args
                unique_kwargs = task_kwargs
        return util.generate_lock(
            task_name,
            unique_args,
            unique_kwargs,
            key_prefix=self.singleton_config.key_prefix,
        )

Option to release lock on task acceptance

My app watches for data changes, and then schedules a singleton task for cache regeneration. The singleton is very helpful, because I'm not queueing the same task multiple times if data changes in succession; any additional data changes will be reflected whenever the task is finally run.

However, I'm dealing with a race condition: if a task has been accepted, and data has changed after the task has accessed the database but before the task is completed (which is when the lock is released), the latest changes will not be reflected in the result, and any attempts to re-schedule the task during that time will fail because the lock still exists. I think that in my case, I need the lock to be released when the task is accepted, not when it's finished. Sure, I will have a case where the same task is being run simultaneously, but the first task's results are already going to be stale before the task is completed anyway.

Celery deals with these different scenarios with acks_late, so maybe one approach is to release the lock in conjunction with task acknowledgement. Otherwise, an additional setting is desirable, so I can have the lock released during the early acknowledgment, rather than after the task is completed.

Ability to chain tasks

I tried with many task dedupe libs including yours and this doesn't work:

(task1.si(42) | task2.s(1)).delay()
(task1.si(42) | task2.s(2)).delay()

I want task1(42) to be deduped and then both task2(1) and task2(2) to be run after it finishes. Is this possible?

Singleton Running, Singleton Queued

As a toy case, assume we're updating a standard deviation field for a specific bucket of values:

  • We add a value to the DB
  • We queue that bucket for calculation
  • The task starts and loads the bucket (and starts work)
  • We add a value to the DB
  • We try to queue the bucket (but singleton blocks it)
  • The task finishes (without incorporating the latest value)

Am I correct in my understanding of how this package works? It would block the requeue so our value at the end of this sequence isn't fully updated.

If so, is it hard to adjust this package to only singleton on queued-but-not-running tasks?

Running into an issue with DatabaseBackend

I have my settings configured to use redis as my broker but I'm running into this when I run task.delay()

  File "/Users/leothelion/virtualenv/lib/python3.6/site-packages/celery_singleton/singleton.py", line 22, in aquire_lock
    return app.backend.client.setnx(lock, task_id)
AttributeError: 'DatabaseBackend' object has no attribute 'client'

This is what my settings.py looks like

CELERY_BROKER_URL = 'redis://localhost:6379'
CELERY_BACKEND_URL = 'amqp://'
CELERY_ACCEPT_CONTENT = ['application/json', 'pickle']
CELERY_TASK_SERIALIZER = 'pickle'
CELERY_RESULT_SERIALIZER = 'pickle'
CELERY_RESULT_BACKEND = 'django-db'

Are there any configs that I have to change to make celery-singleton work? Thanks

Inconsistency with countdown

I have a task:

@shared_task(bind=True, base=Singleton, lock_expiry=10, raise_on_duplicate=True)
def encode_posts(self):
    # do some things, which might take longer than 15s, in which case call encode_posts.apply_async() else:
    encode_posts.apply_async(countdown=15)

The problem is, sometimes the code runs as I want it, i.e. it runs every 15s. Other times it raises the duplicate exception.

What I want is to release the lock before calling encode_posts.apply_async(countdown=15)

celery 5 compatibility release

I've tested on my project. Celery 5 has no breaking changes. Would love to have a new release that is celery-5 compatible; currently there are dependency conflicts.

AttributeError when get_existing_task_id runs for a lock which no longer exists

I think this is a race condition, which happens when a lock doesn't exist any longer.

django.request [ERROR] [exception:handle_uncaught_exception] Internal Server Error: /mypath
Traceback (most recent call last):
  File "/usr/local/lib/python3.5/site-packages/django/core/handlers/exception.py", line 41, in inner
    response = get_response(request)
  [...]
  File "/myapp/views.py", line 149, in my_view
    my_task.apply_async(args=(force_text(foo),), priority=9)
  File "/usr/local/lib/python3.5/site-packages/celery_singleton/singleton.py", line 69, in apply_async
    existing_task_id = self.get_existing_task_id(lock)
  File "/usr/local/lib/python3.5/site-packages/celery_singleton/singleton.py", line 26, in get_existing_task_id
    return app.backend.client.get(lock).decode()
AttributeError: 'NoneType' object has no attribute 'decode'

This could be a condition which only happens in my own implementation, which looks like this:

class MySingleton(Singleton):
    """ Implementation of the celery Singleton which leaves lock releasing to the task.
    """
    abstract = True

    def on_failure(self, *args, **kwargs):
        pass

    def on_success(self, *args, **kwargs):
        pass

@shared_task(ignore_result=True, bind=True, base=MySingleton)
def my_task(self, my_arg):
    self.release_lock(my_arg)
    # [...]

Include parse-url support for rabbitmq (amqp://) scheme

Problem: the Singleton base is not supported if a rabbitmq broker is used to handle the tasks.
Error: Redis URL must specify one of the following schemes (redis://, rediss://, unix://)

If there is another way to use singleton with the rabbitmq broker, I would be very grateful to know. Thanks!

Recap: the problem is that when I use rabbitmq as a broker, I can't use Singleton as the base in the task decorator, because the parse_url method doesn't support the rabbitmq scheme.

env.config file:

CELERY_BROKER_HOST=redis://127.0.0.1:6379

tasks.py file:

@app.task(base=Singleton, name="create_or_update_user")
def create_or_update(user_id: str):
    data = async_to_sync(update_or_create_user)(user_id=user_id)

    return data

create_user.py file:

data = create_or_update.apply_async(
       args=[user_id],
       countdown=CELERY_TASKS_COUNTDOWN, 
)

Logging

To aid debugging, I would propose we add logging to the package:

  1. job name, args, kwargs + generating ID
  2. checking for lock
  3. creating lock
  4. clearing lock

By default, they'd be at a super-low logging level, e.g. DEBUG (or NOTSET?)

Additionally / optionally:

  • pure python example of setting the logging levels, to stdout, etc.

    (this treads a bit into helping the user do what they could themselves as it's standard library, but I think that logging is actually configured/tweaked rarely enough that it's helpful to the user to see an example)

  • setting a global baseline, e.g. CELERY_SINGLETON_LOG_LEVEL, comparable to DJANGO_LOG_LEVEL

lock by task name only

Hi, loving this project. I had a unique use case that I don't believe, through my testing, is covered here just yet.

The summary is pretty simple: I just wanted to lock a task on its task name only. I tested with the unique decorator arg but I couldn't get it to work as expected.

I did end up getting something to work by forking + modifying. Essentially two minor changes, I added a unique_on keyword such that if unique_on == 'none' then it'll run a task hash for only the task name (no args or kwargs). I then set a self.lock_id variable on the object and use this in the release, unlock, etc functions versus rehashing the key.

Lock isn't deleted if task killed by hard-time limit

If I set a hard time limit and the task exceeds that limit, the task gets killed. But Celery Singleton doesn't delete the lock, so it keeps new tasks from being created.

Proof:
Hard time limit exceeded:

[2021-10-01 07:33:44,026: ERROR/MainProcess] Hard time limit (300s) exceeded for tasks.<some_task>[04c8d60c-c7c7-43f0-ad8c-0d478da732a0]
[2021-10-01 07:33:44,138: ERROR/MainProcess] Process 'ForkPoolWorker-3' pid:3944596 exited with 'signal 9 (SIGKILL)'

Then celery says this task still exists:

Traceback (most recent call last):
File "/home/build/dev/ansible-integration/project_overview/env/lib/python3.6/site-packages/celery/app/trace.py", line 515, in trace_task
priority=task_priority
File "/home/build/dev/ansible-integration/project_overview/env/lib/python3.6/site-packages/celery/canvas.py", line 219, in apply_async
return _apply(args, kwargs, **options)
File "/home/build/dev/ansible-integration/project_overview/env/lib/python3.6/site-packages/celery_singleton/singleton.py", line 116, in apply_async
return self.on_duplicate(existing_task_id)
File "/home/build/dev/ansible-integration/project_overview/env/lib/python3.6/site-packages/celery_singleton/singleton.py", line 141, in on_duplicate
task_id=existing_task_id,
celery_singleton.exceptions.CelerySingletonException: Attempted to queue a duplicate of task ID 04c8d60c-c7c7-43f0-ad8c-0d478da732a0

For now I'll try using soft time limits, which should solve my problem altogether; that way my tasks can finish up normally (so the lock is deleted from the celery-singleton storage). I don't think this is really that big of an issue, just please address this somewhere so people know.

Document usage of Redis + SSL

Ran into some issues getting set up with SSL using celery-singleton.

As I couldn't find a working way to pass the necessary args to the redis instance used, I ended up creating a RedisSSLBackend class and configuring celery with the following:

app = Celery(
    __name__,
    broker=config.REDIS_URI,
    singleton_backend_class=RedisSSLBackend,
    broker_use_ssl={
        "ssl_cert_reqs": SSL_VERIFY_MODE,
    },
)
import ssl
from typing import Any
from urllib.parse import urlparse

import redis
from celery_singleton.backends.redis import RedisBackend

from app import config

SSL_VERIFY_MODE: ssl.VerifyMode = ssl.CERT_NONE

class RedisSSLBackend(RedisBackend):
    def __init__(self, *_args: Any, **_kwargs: Any) -> None:
        redis_uri = urlparse(config.REDIS_URI)
        self.redis = redis.Redis(
            host=redis_uri.hostname,
            port=redis_uri.port,
            ssl=True,
            ssl_cert_reqs=SSL_VERIFY_MODE,
        )

unique_on when all parameters are non unique

I have a task

@app.task(bind=True, base=Singleton)
def do_something(self, otherarg=None):
    time.sleep(5)

I would like otherarg to not be significant in the sense of task uniqueness. How do I express this using unique_on? The option unique_on=[] didn't work.

RabbitMQ

Hi, can I use RabbitMQ with this lib?

Singleton killing worker when trying to retry a task

Singleton killing worker when trying to retry.

  • celery-singleton==0.3.1
  • celery==5.2.2
@shared_task(base=base_singleton, raise_on_duplicate=False, lock_expiry=21300, autoretry_for=(Exception,), max_retries=2)

If a task raises an exception, the worker is killed. This error appears in the logs.
celery.exceptions.Reject: (DuplicateTaskError('Attempted to queue a duplicate of task ID 2deba45b-f683-4f7a-bef1-f1df22b32895'), False)
[2023-10-11 14:15:05,872: CRITICAL/MainProcess] Unrecoverable error: TypeError("__init__() missing 1 required positional argument: 'task_id'")

Unable to use uuid object

Hello,

Since I'm using celery-singleton I run into the following problem.

Call Task:
do_something = allocate_something.apply_async(kwargs={"user": user.pk})

Task itself:


@app.task(name="Allocate something", base=Singleton, ignore_result=False)
def allocate_something(user):
    user = User.objects.get(pk=user)

I always get back the following TypeError: Object of type UUID is not JSON serializable

Can somebody help?

Celery will delete SINGLETONLOCK key when running task

Environment:
celery==4.1.0
celery-singleton==0.1.3
redis==2.10.6

Celery will delete the SINGLETONLOCK key when running a task; adding a 'celery' prefix to the key will fix the issue.
I think using a separate redis backend will be better.

Default `kwargs=None` cannot be unpacked

A task called without kwargs results in a TypeError because the kwargs=None default can't be unpacked with **. For example:

mytask.apply_async(args=(1,), priority=3)

Here's the traceback:

Traceback (most recent call last):
[...]
  File "/usr/local/lib/python2.7/site-packages/celery_singleton/singleton.py", line 55, in apply_async
    lock = self.generate_lock(self.name, *args, **kwargs)
TypeError: generate_lock() argument after ** must be a mapping, not NoneType

It seems that None needs to be converted to an empty dictionary first. Perhaps args would need to be converted as well.

Support for celery send_task()

Hi,
Does this support task invocation via send_task()?
Use-case: I have remote workers and we use send_task to start the tasks

Doesn't work with bind=True and self arg

I get TypeError missing a required argument: 'user_pk', where user_pk is the last argument of the task:

@shared_task(bind=True, base=Singleton, unique_on=["some_pk"], lock_expiry=15 * 60, acks_late=True)
def do_stuff(self, some_pk, user_pk):
    # ...

The bug is in:

celery_singleton/singleton.py in generate_lock at line 64
            bound = sig.bind(*task_args, **task_kwargs).arguments

Is singleton working for 'send_task' method?

Hi,

Hope you are doing good.

I am using send_task('task_name', args) to send celery tasks. When defining the celery task I also added the base=Singleton parameter, but it seems it is not working.

I saw from the docs that it is suggested to use the delay or apply_async method, but in my case I will need to use the send_task method since it passes the task name. So do we really support the send_task method?

Thanks,
Victor

Queue assignment via decorator apply_async doesn't work

Hi, another small issue here, but the apply_async function in the custom task decorator doesn't properly apply routing if a queue is defined. Tested and verified locally. I believe this could be as simple as adding a queue=None to the apply_async object method.
