Git Product home page Git Product logo

jasyncq's Introduction

jasyncq

PyPI version

Asynchronous task queue using mysql

You should know

  • Dispatcher's fetch_scheduled_tasks and fetch_pending_tasks method takes scheduled job and concurrently update their status as WORK IN PROGRESS in same transaction
  • Most of tasks that queued in jasyncq would run in exactly once by fetch_scheduled_tasks BUT, some cases job disappeared because of worker shutdown while working. It could be restored by fetch_pending_tasks (that can check how long worker tolerate WIP-ed but not Completed(deleted row))

How to use

1. Create aiomysql connection pool

import asyncio
import logging

import aiomysql

loop = asyncio.get_event_loop()

pool = await aiomysql.create_pool(
    host='127.0.0.1',
    port=3306,
    user='root',
    db='test',
    loop=loop,
    autocommit=False,
)

2. Generate topic (table) with initialize and inject repository to dispatcher

from jasyncq.dispatcher.tasks import TasksDispatcher
from jasyncq.repository.tasks import TaskRepository

repository = TaskRepository(pool=pool, topic_name='test_topic')
await repository.initialize()
dispatcher = TasksDispatcher(repository=repository)

3. Enjoy queue

  • Publish tasks
await dispatcher.apply_tasks(
    tasks=[...list of jasyncq.dispatcher.model.task.TaskIn...],
)
  • Consume tasks
scheduled_tasks = await dispatcher.fetch_scheduled_tasks(queue_name='QUEUE_TEST', limit=10)
pending_tasks = await dispatcher.fetch_pending_tasks(
    queue_name='QUEUE_TEST',
    limit=10,
    check_term_seconds=60,
)
tasks = [*pending_tasks, *scheduled_tasks]
# ...RUN JOBS WITH tasks

4. Complete tasks

task_ids = [str(task.uuid) for task in tasks]
await dispatcher.complete_tasks(task_ids=task_ids)

Other features

Apply tasks with dependency

genesis = TaskIn(task={}, queue_name=queue_name)
dependent = TaskIn(task={}, queue_name=queue_name, depend_on=task.uuid)
# 'dependent' task might fetched after 'genesis' task is completed
await dispatcher.apply_tasks(tasks=[genesis, dependent])

Apply delayed task(scheduled task)

scheduled_at = time.time() + 60
task = TaskIn(task={}, queue_name=queue_name, scheduled_at=scheduled_at)
# 'task' task might fetched after 60 seconds from now
await dispatcher.apply_tasks(tasks=[task])

Apply urgent task (priority)

normal = TaskIn(task={}, queue_name=queue_name)
urgent = TaskIn(task={}, queue_name=queue_name, is_urgent=True)
# 'urgent' task might fetched earlier than 'normal' task if queue was already fulled
await dispatcher.apply_tasks(tasks=[normal, urgent])

Fetching with ignoring dependency

scheduled_tasks = await dispatcher.fetch_scheduled_tasks(
    queue_name='QUEUE_TEST',
    limit=10,
    ignore_dependency=True,
)
pending_tasks = await dispatcher.fetch_pending_tasks(
    queue_name='QUEUE_TEST',
    limit=10,
    check_term_seconds=60,
    ignore_dependency=True,
)
tasks = [*pending_tasks, *scheduled_tasks]
# ...RUN JOBS WITH tasks

Example

  • Consumer: /example/consumer.py
  • Producer: /example/producer.py

Run example scripts

$ docker run --name test_db -p 3306:3306 -e MYSQL_ALLOW_EMPTY_PASSWORD=true -d mysql:8.0.17
$ docker exec -it test_db bash -c 'mysql -u root -e "create database test;"'
$ python3 -m example.producer
$ python3 -m example.consumer

Build

$ python3 setup.py sdist
$ python3 -m pip install ./dist/jasyncq-*

Deploy

$ twine upload ./dist/jasyncq-{version}.tar.gz

Test

$ docker run --name test_db -p 3306:3306 -e MYSQL_ALLOW_EMPTY_PASSWORD=true -d mysql:8.0.17
$ docker exec -it test_db bash -c 'mysql -u root -e "create database test;"'
$ python3 -m pip install pytest==6.2.3
$ pytest

jasyncq's People

Contributors

pjongy avatar

Recommend Projects

  • React photo React

    A declarative, efficient, and flexible JavaScript library for building user interfaces.

  • Vue.js photo Vue.js

    ๐Ÿ–– Vue.js is a progressive, incrementally-adoptable JavaScript framework for building UI on the web.

  • Typescript photo Typescript

    TypeScript is a superset of JavaScript that compiles to clean JavaScript output.

  • TensorFlow photo TensorFlow

    An Open Source Machine Learning Framework for Everyone

  • Django photo Django

    The Web framework for perfectionists with deadlines.

  • D3 photo D3

    Bring data to life with SVG, Canvas and HTML. ๐Ÿ“Š๐Ÿ“ˆ๐ŸŽ‰

Recommend Topics

  • javascript

    JavaScript (JS) is a lightweight interpreted programming language with first-class functions.

  • web

    Some thing interesting about web. New door for the world.

  • server

    A server is a program made to process requests and deliver data to clients.

  • Machine learning

    Machine learning is a way of modeling and interpreting data that allows a piece of software to respond intelligently.

  • Game

    Some thing interesting about game, make everyone happy.

Recommend Org

  • Facebook photo Facebook

    We are working to build community through open source technology. NB: members must have two-factor auth.

  • Microsoft photo Microsoft

    Open source projects and samples from Microsoft.

  • Google photo Google

    Google โค๏ธ Open Source for everyone.

  • D3 photo D3

    Data-Driven Documents codes.