kuyruk's People

Contributors

bitdeli-chef, cenkalti, frol, h4yfans, khorolets, muraty, refik, ybrs


kuyruk's Issues

[AMQP deprecation] The .frame_writer attribute on the connection was accessed before the connection was established

Please resolve this AMQP deprecation warning:

/usr/lib/python3.5/site-packages/amqp/connection.py:304: AMQPDeprecationWarning: The .frame_writer attribute on the connection was accessed before
the connection was established.  This is supported for now, but will
be deprecated in amqp 2.2.0.

Since amqp 2.0 you have to explicitly call Connection.connect()
before using the connection.

  W_FORCE_CONNECT.format(attr=attr)))
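For context, here is a minimal sketch of the explicit connect the warning asks for, using py-amqp directly (connection parameters are illustrative; the real fix would live wherever Kuyruk creates its connection):

import amqp

# Explicitly establish the connection before using it, as required since amqp 2.0
# (credentials and host below are placeholders).
conn = amqp.Connection(host="localhost:5672", userid="guest", password="guest")
conn.connect()  # without this call, touching conn.frame_writer triggers the warning
channel = conn.channel()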

Check if a worker is already launched

Hi,

First, thanks for your work! I've used it successfully in my Python project, but I'm facing a problem: I need some control over the worker (for example, to see if one worker is already launched, to avoid launching two). Is there any way to do that?
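As a workaround sketch (Kuyruk itself does not appear to expose this): take an exclusive lock on a file before starting the worker, so a second instance exits immediately. The lock path is illustrative and this is POSIX-only:

import fcntl
import sys

lock_file = open("/tmp/kuyruk-worker.lock", "w")  # path is illustrative
try:
    fcntl.flock(lock_file, fcntl.LOCK_EX | fcntl.LOCK_NB)
except BlockingIOError:
    sys.exit("A worker is already running.")
# ... start the worker here; the lock is released when the process exits.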

Run multiple workers

Hi,

I found an old document (v3.0-beta) that mentions a Master subcommand. If I understand it correctly, it could run multiple workers at the same time.

Is there an option like this in the latest version of Kuyruk?

Kuyruk inactive after Python sys.exit

Hi,

In my Python program I call sys.exit() in some cases. When sys.exit() is reached, my service (which runs Kuyruk) becomes inactive. Is that normal? Is there any way to keep the service active while exiting the Python script?
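A likely explanation: sys.exit() raises SystemExit, which propagates out of the task and terminates the worker process. A minimal workaround sketch, assuming the exit happens inside task code (names are placeholders):

import sys

from kuyruk import Kuyruk

kuyruk = Kuyruk()

def run_legacy_script(path):
    sys.exit(0)  # placeholder for the real code that may call sys.exit()

@kuyruk.task()
def process(path):
    try:
        run_legacy_script(path)
    except SystemExit as e:
        return e.code  # end only the task, keeping the worker alive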

Core dumped

Hi,

Some users of my project (https://github.com/edissyum/opencaptureforinvoices/) have the following error while running Kuyruk:

opt/OpenCaptureForInvoices/bin/scripts/service_workerOC.sh: line 24: 23377 Illegal instruction (core dumped) /usr/local/bin/kuyruk --app bin.src.main.OCforInvoices_worker worker --queue invoices

This error happens just by starting the worker... Any ideas?

Thanks in advance

Batch tasks sending

As far as I can see from the code and from my experiments, Kuyruk creates a new connection to send every single task. This easily becomes a bottleneck when you generate a list of tasks in a loop:

import tasks

for i in range(1000):
    tasks.echo("Hello Kuyruk %d" % i)

It seems that if I were able to pass an external channel to Task.send_to_queue, this would enable reusing an existing connection:

import tasks

with tasks.echo.kuyruk.channel() as channel:
    for i in range(1000):
        tasks.echo.send_to_queue(args=("Hello Kuyruk %d" % i, ), channel=channel)

Ideally, it would be great to have a reusable connection (with auto-reconnection on connection errors) by default.

Currently, I work around this limitation by using Dask to run the tasks in threads:

import dask.bag

import tasks

indexes = dask.bag.from_sequence(range(1000))

indexes.map(lambda i: tasks.echo("Hello Kuyruk %d" % i)).compute()

With a ping to the RabbitMQ server of around 18 ms, I can only schedule ~2 tasks/second with a simple loop, ~37 tasks/second using Dask, ~40 tasks/second when I reuse the channel (with a manually patched send_to_queue), and ~238 tasks/second when I combine Dask with a reusable connection:

import dask.bag

import tasks

indexes = dask.bag.from_sequence(range(1000), npartitions=40)

def send_in_batch(indexes):
    with tasks.echo.kuyruk.channel() as channel:
        for i in indexes:
            tasks.echo.send_to_queue(args=["Hello Kuyruk %d" % i], ch=channel)

indexes.map_partitions(send_in_batch).compute()

Another approach might also be valid: define a helper for batch scheduling, so the API would look something like this:

kuyruk.send_tasks(
    (task.echo, ["Hello Kuyruk %d" % i])
    for i in range(1000)
)

or add a helper method on the Task (I am not a fan of Celery, but this is how they handle it):

kuyruk.send_tasks(
    task.echo.subtask("Hello Kuyruk %d" % i)
    for i in range(1000)
)

It would be great to hear other thoughts on this issue.
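For reference, here is a sketch of such a batch sender built on the channel reuse shown above; it assumes the hypothetical channel keyword on send_to_queue discussed in this issue, which is not in the released API:

import tasks

def send_in_batch(task, arg_lists):
    # Open one channel and reuse it for the whole batch instead of
    # opening a new connection per task.
    with task.kuyruk.channel() as channel:
        for args in arg_lists:
            task.send_to_queue(args=args, channel=channel)  # patched kwarg

send_in_batch(tasks.echo, (["Hello Kuyruk %d" % i] for i in range(1000)))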

About forking child processes

Continuing the discussion started in #30.

As far as I understand, the worker's master process spawns child threads with start_daemon_thread and kills them if a task has been processing for more than X seconds (please correct me if I am wrong about the implementation).

What about this:

i. The master starts a real child process with subprocess.Popen(['/bin/python...
ii. The child and master processes communicate through a channel (maybe popen.communicate, mmap, or something else). The child sends a message when it gets a message from RabbitMQ and starts processing; when it has finished, it sends a "done" message.
iii. The master sends SIGKILL if a task has been processing for more than X seconds (a sketch of this timeout logic follows below).

This way, the master and child are loosely coupled, so we won't need any fork or config-loading implementations, and workers don't even need to be implemented in Python at all; we could have workers in Go, C, etc. One could also sidestep the supervisor process and use another supervisor like supervisord or god if they don't care about the worker's processing time.

I believe uWSGI has a similar implementation, by the way: it connects to child processes with mmap and doesn't fork itself.

I'm curious what you all think about such an implementation.
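A rough sketch of steps i and iii under the proposal above (the child script path and the limit are illustrative; step ii's message channel is omitted for brevity):

import subprocess

MAX_TASK_SECONDS = 60  # illustrative per-task limit

# i. the master starts a real child process
child = subprocess.Popen(["/usr/bin/python3", "child_worker.py"])

# iii. kill the child if it runs past the limit
try:
    child.wait(timeout=MAX_TASK_SECONDS)
except subprocess.TimeoutExpired:
    child.kill()  # SIGKILL on POSIX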

Windows support

Trying to start Kuyruk on Windows ends up with an error due to missing signal support on Windows:

Traceback (most recent call last):
  File "c:\project\tasks\app\run.py", line 26, in run
    app.run()
  File "c:\Anaconda\Win64\envs\env\lib\site-packages\kuyruk\worker.py", line 97, in run
    signal.signal(signal.SIGHUP, self._handle_sighup)
AttributeError: module 'signal' has no attribute 'SIGHUP'

I am going to try commenting out all the signal usage, but even if that works, I believe it will miss quite a few features... Any ideas on how to work around this on Windows?

P.S. Is Windows support something that you are interested in?
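A minimal sketch of the kind of guard that would avoid the crash itself, registering POSIX-only signals only where they exist (the handler here is a placeholder, not Kuyruk's):

import signal

def handle_sighup(signum, frame):
    pass  # placeholder handler

if hasattr(signal, "SIGHUP"):  # SIGHUP does not exist on Windows
    signal.signal(signal.SIGHUP, handle_sighup)

Features built on SIGHUP/SIGUSR1 would simply be unavailable on Windows, which matches the concern above about missing features.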

Logging level

Hi,

I want to know if it's possible to modify the logging level. I'm using the Python logging lib in my project, and all the logs are shown in the Kuyruk worker.

I want to display only the error logs in Kuyruk; is that possible?

Thanks
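One direction using only the standard logging module, assuming Kuyruk does not reconfigure your loggers itself (the logger name is illustrative):

import logging

logging.getLogger().setLevel(logging.ERROR)             # root logger
logging.getLogger("myproject").setLevel(logging.ERROR)  # or only your own package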

Kuyruk worker won't stop

Hi,

I made a Python project (https://gitlab.com/edissyum/opencapture/opencaptureformaarch) and I use Kuyruk to enqueue processes.

To make it run on startup I created a systemd service; here is the service file on Debian:

[Unit]
Description=Daemon for OpenCapture for Maarch

[Service]
Type=simple

User=edissyum
Group=edissyum
UMask=0022

ExecStart=/opt/maarch/OpenCapture/scripts/service.sh

Restart=on-failure

TimeoutStopSec=300

[Install]
WantedBy=multi-user.target

I had an issue where the processes weren't handled by Kuyruk. I didn't have any error logs, and I also couldn't stop the service. In the syslog I just had:

Nov 26 14:52:01 sauvegarde service.sh[2062]: W kuyruk.worker.shutdown:363 - Shutdown requested

Do you have any idea why Kuyruk doesn't handle the processes?

EDIT: After a long time, the service stopped, and when I restarted it, all the enqueued processes were handled. Why do I need to restart it?

Thanks in advance

Kuyruk stopped working after X processes

Hi,

If I send a lot of processes to my Kuyruk worker, at some point it blocks. There is no error from the kuyruk command (/home/nathan/miniconda3/envs/OC/bin/kuyruk --app src.main.OCforMaarch worker), but it doesn't take new jobs.

I checked the RabbitMQ logs; no errors at all.

Do you have any idea where I can look to find the issue?

Thanks

AttributeError: 'function' object has no attribute 'apply'

I have an issue with the Kuyruk Python worker. Here is how I call it:

from kuyruk import Kuyruk

OCforInvoices = Kuyruk()

@OCforInvoices.task()
def launch(args):
    pass  # do something

Then I launch the following command to start the worker:

kuyruk --app bin.src.main.OCforInvoices worker

worker refers to the worker.py file that handles the arguments when launching the command.

So, once the worker is launched, I use the following command to enqueue a task:

python3 worker.py -c config.ini -f file.pdf

After that, Kuyruk returns the following error:


I kuyruk.kuyruk.connection:85 - Connected to RabbitMQ
I kuyruk.kuyruk.channel:60 - Opened new channel
I kuyruk.worker._consume_messages:125 - Consumer started
I kuyruk.worker._process_message:189 - Processing task: {'id': '6f050168f72111e9a4aa00e04c68cd8c', 'args': [{'path': '/home/nathan/PycharmProjects/oc_invoices_flask/instance/upload/', 'config': '/home/nathan/PycharmProjects/oc_invoices_flask/instance/config.ini'}], 'kwargs': {}, 'module': 'bin.src.main', 'function': 'launch', 'sender_hostname': 'nathan', 'sender_pid': 508, 'sender_cmd': 'python3 worker.py -c config.ini -f file.pdf', 'sender_timestamp': '2019-10-25T12:17:38'}

E kuyruk.worker._process_task:248 - Task raised an exception
E kuyruk.worker._process_task:250 - Traceback (most recent call last):
  File "/home/nathan/miniconda3/envs/OC/lib/python3.7/site-packages/kuyruk/worker.py", line 301, in _apply_task
    return task.apply(*args, **kwargs)
AttributeError: 'function' object has no attribute 'apply'

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/home/nathan/miniconda3/envs/OC/lib/python3.7/site-packages/kuyruk/worker.py", line 223, in _process_task
    result = self._run_task(message.channel.connection, task, args, kwargs)
  File "/home/nathan/miniconda3/envs/OC/lib/python3.7/site-packages/kuyruk/worker.py", line 279, in _run_task
    return self._apply_task(task, args, kwargs)
  File "/home/nathan/miniconda3/envs/OC/lib/python3.7/site-packages/kuyruk/worker.py", line 304, in _apply_task
    logger.info("%s finished in %i seconds." % (task.name, delta))
AttributeError: 'function' object has no attribute 'name'

Any ideas?

Thanks in advance

SSL/TLS support

Hi Cenk, thanks for a great little project. Would you please also enable SSL on the AMQP connections? It's literally a two-line change:

RABBIT_SSL = True in the config, and

ssl=self.config.RABBIT_SSL in the amqp.Connection call.
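For illustration, a runnable sketch of what the proposal amounts to, using py-amqp directly; the host is illustrative and RABBIT_SSL is the suggested new option, not an existing Kuyruk setting:

import amqp

use_ssl = True  # would come from the proposed config.RABBIT_SSL

# py-amqp accepts ssl=True or a dict of SSL options
conn = amqp.Connection(host="rabbitmq.example.com:5671", ssl=use_ssl)
conn.connect()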

Best Regards

Weird behavior with two Kuyruk workers

Hi,
On my project I created two workers.
In main.py, I have the @OCforInvoices.task decorator.
In main_splitter.py, I have the @OCforInvoices_Sep.task decorator.

When I launch a process using main.py, if the OCforInvoices worker is busy, it will use OCforInvoices_Sep, and vice versa. Is that normal?

How could I avoid that?
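If both apps leave the queue at its default (kuyruk), both workers consume from the same queue, which would explain this behavior. A hedged sketch pinning each app to its own queue, using the queue parameter and --queue flag that appear elsewhere in this tracker (queue names are illustrative):

from kuyruk import Kuyruk

OCforInvoices = Kuyruk()
OCforInvoices_Sep = Kuyruk()

@OCforInvoices.task(queue="invoices")
def launch(args):
    pass  # invoice processing

@OCforInvoices_Sep.task(queue="splitter")
def launch_separator(args):
    pass  # splitter processing

# Start each worker pinned to its queue:
#   kuyruk --app main.OCforInvoices worker --queue invoices
#   kuyruk --app main_splitter.OCforInvoices_Sep worker --queue splitter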

Tasks prefetching

What is the recommended way of dealing with a heap of tasks?

It seems like there is a hard-coded 1-second delay somewhere (please notice the logging timestamps between "finished in 0 seconds" and "Task is successful")...

2016-10-18 14:07:33,638 [INFO] [kuyruk.worker] Processing task: {'function': 'extract_dataset_info', 'sender_cmd': 'client.py', 'id': 'fe7f44be953b11e692320242ac110012', 'queue': 'kuyruk', 'sender_hostname': 'ab1b9d5de6b2', 'module': 'common', 'args': ['qq'], 'sender_pid': 110, 'sender_timestamp': '2016-10-18T14:05:58', 'kwargs': {}}
HEy!
2016-10-18 14:07:33,639 [INFO] [kuyruk.task] common:extract_dataset_info finished in 0 seconds.
2016-10-18 14:07:34,640 [INFO] [kuyruk.worker] Task is successful
2016-10-18 14:07:34,643 [INFO] [kuyruk.worker] Processing task: {'function': 'extract_dataset_info', 'sender_cmd': 'client.py', 'id': 'fe813d8a953b11e6ab700242ac110012', 'queue': 'kuyruk', 'sender_hostname': 'ab1b9d5de6b2', 'module': 'common', 'args': ['qq'], 'sender_pid': 110, 'sender_timestamp': '2016-10-18T14:05:58', 'kwargs': {}}
HEy!
2016-10-18 14:07:34,644 [INFO] [kuyruk.task] common:extract_dataset_info finished in 0 seconds.
2016-10-18 14:07:35,645 [INFO] [kuyruk.worker] Task is successful
2016-10-18 14:07:35,647 [INFO] [kuyruk.worker] Processing task: {'function': 'extract_dataset_info', 'sender_cmd': 'client.py', 'id': 'fe8382b0953b11e6aca90242ac110012', 'queue': 'kuyruk', 'sender_hostname': 'ab1b9d5de6b2', 'module': 'common', 'args': ['qq'], 'sender_pid': 110, 'sender_timestamp': '2016-10-18T14:05:58', 'kwargs': {}}
HEy!
2016-10-18 14:07:35,649 [INFO] [kuyruk.task] common:extract_dataset_info finished in 0 seconds.
2016-10-18 14:07:36,650 [INFO] [kuyruk.worker] Task is successful
2016-10-18 14:07:36,652 [INFO] [kuyruk.worker] Processing task: {'function': 'extract_dataset_info', 'sender_cmd': 'client.py', 'id': 'fe85b51a953b11e685560242ac110012', 'queue': 'kuyruk', 'sender_hostname': 'ab1b9d5de6b2', 'module': 'common', 'args': ['qq'], 'sender_pid': 110, 'sender_timestamp': '2016-10-18T14:05:58', 'kwargs': {}}
HEy!
2016-10-18 14:07:36,653 [INFO] [kuyruk.task] common:extract_dataset_info finished in 0 seconds.
2016-10-18 14:07:37,654 [INFO] [kuyruk.worker] Task is successful

Auto-retry failing tasks with rescheduling

As a follow-up to #50, I want to share the code snippet I ended up using to implement infinite auto-retry for crashing tasks, rescheduling them back through the RabbitMQ queue; this may help if the task just needs to get scheduled to a non-failing worker. I could not find a way to implement non-infinite auto-retry with rescheduling using RabbitMQ alone, since you cannot update an existing task with meta information (e.g. the number of failed attempts). Using an external service (e.g. Redis or Memcached), you can modify this code to implement any retry logic you want, so I leave adapting the snippet as an exercise for the reader.

import functools
import logging

from kuyruk import Kuyruk as BaseKuyruk, Task


log = logging.getLogger(__name__)


def safe_kuyruk_task(func, retry_delay):
    """
    Args:
        func (callable): the wrapped function task.
        retry_delay (float): time in seconds between retries on crash.
    """
    @functools.wraps(func)
    def decorator(*args, **kwargs):
        try:
            return func(*args, **kwargs)
        except (Kuyruk.Discard, Kuyruk.Reject):
            raise
        except Exception:
            log.exception("Task has crashed... Retrying in %g seconds.", retry_delay)
            raise Kuyruk.Reject()

    return decorator


class Kuyruk(BaseKuyruk):

    def task(self, queue='kuyruk', retry_delay=60, **kwargs):
        """
        Wrap functions with this decorator to convert them to *tasks*.
        After wrapping, calling the function will send a message to
        a queue instead of running the function.

        Args:
            queue: Queue name for the tasks.
            retry_delay (float): time in seconds between retries on crash.
            kwargs: Keyword arguments will be passed to :class:`~kuyruk.Task`
                constructor.

        Returns:
            function: Callable :class:`~kuyruk.Task` object wrapping the
            original function.
        """
        def inner(func):
            return Task(
                    safe_kuyruk_task(func, retry_delay),
                    self,
                    queue,
                    reject_delay=retry_delay,
                    **kwargs)

        return inner


kuyruk = Kuyruk()

I had to override the inner implementation of the Kuyruk class to pass my safe_kuyruk_task adapter, which enables the auto-retry logic for all tasks automatically (thus no changes to the existing task codebase are needed beyond this change to Kuyruk initialization).

@cenkalti I couldn't find a good way to generalize this, so I won't send a PR, but I decided to share my code since others may wonder how to do it. It would be great to see an upstream solution to this problem, either as a built-in implementation or as an example in the documentation.
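For completeness, usage on top of the snippet above looks like this (do_work is a placeholder):

def do_work(message):
    print("working on", message)  # placeholder for the real work

@kuyruk.task(retry_delay=30)
def flaky(message):
    do_work(message)  # any crash re-queues the task after ~30 seconds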

Werkzeug package issue when running worker

Hi,

I have the following error when running /usr/local/bin/kuyruk --app src.main.OCforMaarch worker:

Traceback (most recent call last):
  File "/usr/local/bin/kuyruk", line 6, in <module>
    from kuyruk.main import main
  File "/usr/local/lib/python3.7/dist-packages/kuyruk/__init__.py", line 4, in <module>
    from kuyruk.kuyruk import Kuyruk
  File "/usr/local/lib/python3.7/dist-packages/kuyruk/kuyruk.py", line 10, in <module>
    from kuyruk.config import Config
  File "/usr/local/lib/python3.7/dist-packages/kuyruk/config.py", line 126, in <module>
    for k, v in entry_point.load().items():
  File "/usr/lib/python3/dist-packages/pkg_resources/__init__.py", line 2410, in load
    self.require(*args, **kwargs)
  File "/usr/lib/python3/dist-packages/pkg_resources/__init__.py", line 2433, in require
    items = working_set.resolve(reqs, env, installer, extras=self.extras)
  File "/usr/lib/python3/dist-packages/pkg_resources/__init__.py", line 786, in resolve
    raise DistributionNotFound(req, requirers)
pkg_resources.DistributionNotFound: The 'Werkzeug>=0.14' distribution was not found and is required by Flask

I'm running Ubuntu 19.04

When I do pip3 freeze, I see for Werkzeug and Kuyruk:

Kuyruk==9.1.0
Kuyruk-Manager==1.5.2
Werkzeug==0.15.2

Add a delay between retries when task crashes

Currently, it seems that there is no delay at all between retries when my task raises an exception. Sometimes bugs occur in the task logic and tasks simply raise. I don't want to lose any of the tasks, so I tried setting retries=2**16, but it retries right away.

What would be the recommended way of handling this use case?

Catching all exceptions inside the tasks and sleeping doesn't make a lot of sense to me, as it wastes time sleeping instead of processing other tasks...
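A sketch using the task options that appear in the next issue in this tracker (retry and reject_delay; note that issue reports their delay handling as buggy, so the exact semantics should be checked against Kuyruk's docs):

from kuyruk import Kuyruk

kuyruk = Kuyruk()

def handle(message):
    print("handling", message)  # placeholder for the real work

@kuyruk.task(retry=5, reject_delay=10000)
def fragile(message):
    handle(message)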

Reject/Fail delays are not respected

@cenkalti We have just realized that the implementation for #49 is buggy.

Here is a simple reproduction:

import logging
logging.basicConfig(level=logging.DEBUG, format='%(asctime)s %(threadName)s %(message)s')

from kuyruk import Kuyruk

kuyruk = Kuyruk()

@kuyruk.task(retry=5, fail_delay=10000, reject_delay=10000)
def echo(message):
    logging.info("ECHO: %s", message)
    if message == 'raise an exception':
        raise Exception()

Here is how I run it:

>>> import tasks
>>> tasks.echo('raise an exception')

And here is the output (notice that all the retries happen immediately, with no delays, and the traceback is printed at the very end after the retries are exceeded):

2017-03-13 15:03:02,510 MainThread Start from server, version: 0.9, properties: {'platform': 'Erlang/OTP', 'product': 'RabbitMQ', 'information': 'Licensed under the MPL.  See http://www.rabbitmq.com/', 'cluster_name': 'rabbit@211f2b25e5f9', 'copyright': 'Copyright (C) 2007-2016 Pivotal Software, Inc.', 'capabilities': {'consumer_cancel_notify': True, 'direct_reply_to': True, 'publisher_confirms': True, 'consumer_priorities': True, 'basic.nack': True, 'connection.blocked': True, 'authentication_failure_close': True, 'per_consumer_qos': True, 'exchange_exchange_bindings': True}, 'version': '3.6.6'}, mechanisms: ['PLAIN', 'AMQPLAIN'], locales: ['en_US']
2017-03-13 15:03:02,511 MainThread Open OK!
2017-03-13 15:03:02,511 MainThread Connected to RabbitMQ
2017-03-13 15:03:02,511 MainThread using channel_id: 1
2017-03-13 15:03:02,512 MainThread Channel open
2017-03-13 15:03:02,513 MainThread Opened new channel
2017-03-13 15:03:02,513 MainThread queue_declare: kuyruk
2017-03-13 15:03:02,514 MainThread basic_consume: kuyruk
2017-03-13 15:03:02,514 MainThread Consumer started

2017-03-13 15:03:10,875 MainThread Processing task: {'sender_cmd': '/mnt/storage/miniconda3/bin/ipython', 'id': '6903605007ed11e7a58928b2bd03b9d9', 'args': ['raise an exception'], 'module': 'tasks', 'sender_timestamp': '2017-03-13T13:03:10', 'function': 'echo', 'sender_pid': 26218, 'sender_hostname': 'redatopa', 'kwargs': {}}
2017-03-13 15:03:10,875 MainThread Importing module: tasks
2017-03-13 15:03:10,875 MainThread Applying <Task of 'tasks:echo'>, args=('raise an exception',), kwargs={}
2017-03-13 15:03:10,876 MainThread ECHO: raise an exception
2017-03-13 15:03:10,876 MainThread ECHO: raise an exception
2017-03-13 15:03:10,876 MainThread ECHO: raise an exception
2017-03-13 15:03:10,876 MainThread ECHO: raise an exception
2017-03-13 15:03:10,876 MainThread ECHO: raise an exception
2017-03-13 15:03:10,876 MainThread ECHO: raise an exception
2017-03-13 15:03:10,877 MainThread tasks:echo finished in 0 seconds.
2017-03-13 15:03:10,877 MainThread Task raised an exception
2017-03-13 15:03:10,877 MainThread Traceback (most recent call last):
  File "/mnt/storage/miniconda3/lib/python3.5/site-packages/kuyruk/worker.py", line 217, in _process_task
    task, args, kwargs)
  File "/mnt/storage/miniconda3/lib/python3.5/site-packages/kuyruk/worker.py", line 266, in _run_task
    return self._apply_task(task, args, kwargs)
  File "/mnt/storage/miniconda3/lib/python3.5/site-packages/kuyruk/worker.py", line 288, in _apply_task
    return task.apply(*args, **kwargs)
  File "/mnt/storage/miniconda3/lib/python3.5/site-packages/kuyruk/task.py", line 166, in apply
    return self.f(*args, **kwargs)
  File "/mnt/storage/experiments/tasks.py", line 13, in echo
    raise Exception()
Exception

2017-03-13 15:03:10,878 MainThread Task is processed

/cc @khorolets

Class tasks

Class methods should be able to be wrapped with the task decorator.
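Until that is supported, a common workaround sketch is a module-level task that delegates to the class (all names are illustrative):

from kuyruk import Kuyruk

kuyruk = Kuyruk()

class Report:
    @classmethod
    def generate(cls, path):
        print("generating", path)  # placeholder work

@kuyruk.task()
def generate_report(path):
    # The worker imports and runs this plain function, which delegates
    # to the class method.
    Report.generate(path)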

Kuyruk queue with variable name

Hi,

I'm currently making a web app that can load different configurations based on the URL. I wonder if I can use Kuyruk to specify a queue for each instance.

For example, for the URL http://localhost/edissyum/dist/, each process I want to queue should use a task with queue id edissyum.

The problem is that it could be edissyum or something else; it has to be dynamic. Is there any solution I can imagine with Kuyruk? Like declaring my decorator and then changing the queue name (a possible direction is sketched below)?

Thanks in advance
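One possible, untested direction, building on the fact that the task decorator accepts a queue argument: create and cache one Task per queue name at send time. Whether the worker resolves these dynamically wrapped functions correctly would need verifying; all names are illustrative:

from kuyruk import Kuyruk

kuyruk = Kuyruk()
_tasks = {}

def process(args):
    print("processing", args)  # placeholder for the real work

def task_for_queue(queue_name):
    # kuyruk.task(queue=...) returns a decorator; applying it by hand
    # yields a Task bound to that queue.
    if queue_name not in _tasks:
        _tasks[queue_name] = kuyruk.task(queue=queue_name)(process)
    return _tasks[queue_name]

# e.g. for http://localhost/edissyum/dist/ :
task_for_queue("edissyum")({"path": "/some/file.pdf"})

# Each tenant then needs a worker consuming its queue:
#   kuyruk --app myapp.kuyruk worker --queue edissyum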

Tune performance

I am very interested in getting the maximum available performance out of RabbitMQ, as I need to schedule a heap of tasks (100k tasks in a reasonable time, a few minutes). Given my old experiments reported in #51 (238 tasks/second), it would take ~400 seconds (~7 minutes) to schedule 100k tasks.

I think the current bottleneck is in the amqp library, and it seems that a client with CFFI bindings to the native librabbitmq library would work better.

This issue is just a placeholder for the related experiments, yet if it is doable, it is better to optimize sooner rather than later.
