Skip to main content

High reliability asynchronous queue using mysql(lock)

Project description

jasyncq

PyPI version

Asynchronous task queue using mysql

You should know

  • Dispatcher's fetch_scheduled_tasks and fetch_pending_tasks method takes scheduled job and concurrently update their status as WORK IN PROGRESS in same transaction
  • Most of tasks that queued in jasyncq would run in exactly once by fetch_scheduled_tasks BUT, some cases job disappeared because of worker shutdown while working. It could be restored by fetch_pending_tasks (that can check how long worker tolerate WIP-ed but not Completed(deleted row))

How to use

1. Create aiomysql connection pool

import asyncio
import logging

import aiomysql

loop = asyncio.get_event_loop()

pool = await aiomysql.create_pool(
    host='127.0.0.1',
    port=3306,
    user='root',
    db='test',
    loop=loop,
    autocommit=False,
)

2. Generate topic (table) with initialize and inject repository to dispatcher

from jasyncq.dispatcher.tasks import TasksDispatcher
from jasyncq.repository.tasks import TaskRepository

repository = TaskRepository(pool=pool, topic_name='test_topic')
await repository.initialize()
dispatcher = TasksDispatcher(repository=repository)

3. Enjoy queue

  • Publish tasks
await dispatcher.apply_tasks(
    tasks=[...list of jasyncq.dispatcher.model.task.TaskIn...],
)
  • Consume tasks
scheduled_tasks = await dispatcher.fetch_scheduled_tasks(queue_name='QUEUE_TEST', limit=10)
pending_tasks = await dispatcher.fetch_pending_tasks(
    queue_name='QUEUE_TEST',
    limit=10,
    check_term_seconds=60,
)
tasks = [*pending_tasks, *scheduled_tasks]
# ...RUN JOBS WITH tasks

4. Complete tasks

task_ids = [str(task.uuid) for task in tasks]
await dispatcher.complete_tasks(task_ids=task_ids)

Other features

Apply tasks with dependency

genesis = TaskIn(task={}, queue_name=queue_name)
dependent = TaskIn(task={}, queue_name=queue_name, depend_on=task.uuid)
# 'dependent' task might fetched after 'genesis' task is completed
await dispatcher.apply_tasks(tasks=[genesis, dependent])

Apply delayed task(scheduled task)

scheduled_at = time.time() + 60
task = TaskIn(task={}, queue_name=queue_name, scheduled_at=scheduled_at)
# 'task' task might fetched after 60 seconds from now
await dispatcher.apply_tasks(tasks=[task])

Apply urgent task (priority)

normal = TaskIn(task={}, queue_name=queue_name)
urgent = TaskIn(task={}, queue_name=queue_name, is_urgent=True)
# 'urgent' task might fetched earlier than 'normal' task if queue was already fulled
await dispatcher.apply_tasks(tasks=[normal, urgent])

Fetching with ignoring dependency

scheduled_tasks = await dispatcher.fetch_scheduled_tasks(
    queue_name='QUEUE_TEST',
    limit=10,
    ignore_dependency=True,
)
pending_tasks = await dispatcher.fetch_pending_tasks(
    queue_name='QUEUE_TEST',
    limit=10,
    check_term_seconds=60,
    ignore_dependency=True,
)
tasks = [*pending_tasks, *scheduled_tasks]
# ...RUN JOBS WITH tasks

Example

  • Consumer: /example/consumer.py
  • Producer: /example/producer.py

Run example scripts

$ docker run --name test_db -p 3306:3306 -e MYSQL_ALLOW_EMPTY_PASSWORD=true -d mysql:8.0.17
$ docker exec -it test_db bash -c 'mysql -u root -e "create database test;"'
$ python3 -m example.producer
$ python3 -m example.consumer

Build

$ python3 setup.py sdist
$ python3 -m pip install ./dist/jasyncq-*

Deploy

$ twine upload ./dist/jasyncq-{version}.tar.gz

Test

$ docker run --name test_db -p 3306:3306 -e MYSQL_ALLOW_EMPTY_PASSWORD=true -d mysql:8.0.17
$ docker exec -it test_db bash -c 'mysql -u root -e "create database test;"'
$ python3 -m pip install pytest==6.2.3
$ pytest

Project details


Download files

Download the file for your platform. If you're not sure which to choose, learn more about installing packages.

Source Distribution

jasyncq-1.1.2.tar.gz (9.1 kB view hashes)

Uploaded Source

Supported by

AWS AWS Cloud computing and Security Sponsor Datadog Datadog Monitoring Fastly Fastly CDN Google Google Download Analytics Microsoft Microsoft PSF Sponsor Pingdom Pingdom Monitoring Sentry Sentry Error logging StatusPage StatusPage Status page