Skip to main content

amqp-worker is a Python-based multi-threaded RabbitMQ consumer framework

Project description

🐰amqp-worker

English | 简体中文

amqp-worker is a Python-based multi-threaded RabbitMQ consumer framework. It allows you to consume messages more efficiently and stably.

Features

  • Batch consumption: process messages in batches, improve consumption efficiency.
  • Automatic reconnection: when RabbitMQ service disconnects, amqp-worker will automatically reconnect, ensuring uninterrupted consumption.
  • Customizable consumption mode: freely decide to use multi-threading and coroutines in the consumption function.
  • Configurable message acknowledgment mode: support automatic acknowledgment and manual acknowledgment modes, configure according to your consumption needs.
  • Configurable exception handling: support global configuration of message exception consumption mode, re-enter queue, re-insert, consume message.

Installation

You can use pip tool to install amqp-worker:

pip install amqp-workers

Usage

First, you need to import the amqp_worker module in your Python code:

from amqpworker.app import App

Then, you need to instantiate an App object, and the App object depends on the AMQPConnection object:

from amqpworker.connections import AMQPConnection
amqp_conn = AMQPConnection(hostname='127.0.0.1', username='guest', password='guest', port=5672)

app = App(connections=[amqp_conn])

Next, you need to define the consumption function:

@app.amqp.consume(
    ['test'],
    options=AMQPRouteOptions(bulk_size=1024 * 8, bulk_flush_interval=2)
)
def _handler(msgs: List[RabbitMQMessage]):
    print(f"Recv {len(msgs)} {datetime.now().isoformat()}")

In the above code we give the consumption function a decorator, giving the consumption queue, the number of consumption per batch, it is worth noting that the parameter type of the consumption function is List[RabbitMQMessage]

Finally, just call the run method to start consuming:

app.run()

Example code

Below is a simple example code that will consume messages from a queue named test:

from datetime import datetime
from typing import List

from amqpworker.app import App
from amqpworker.connections import AMQPConnection
from amqpworker.rabbitmq import RabbitMQMessage
from amqpworker.routes import AMQPRouteOptions

amqp_conn = AMQPConnection(hostname='127.0.0.1', username='guest', password='guest', port=5672)
app = App(connections=[amqp_conn])

@app.amqp.consume(
    ['test'],
    options=AMQPRouteOptions(bulk_size=1024 * 8, bulk_flush_interval=2)
)
def _handler(msgs: List[RabbitMQMessage]):
    print(f"Recv {len(msgs)} {datetime.now().isoformat()}")

app.run()

Contributors

License

amqp-worker uses MIT license. Please refer to LICENSE file for details.

Project details


Download files

Download the file for your platform. If you're not sure which to choose, learn more about installing packages.

Source Distribution

amqp_workers-0.1.5.tar.gz (20.2 kB view hashes)

Uploaded Source

Built Distribution

amqp_workers-0.1.5-py3-none-any.whl (28.5 kB view hashes)

Uploaded Python 3

Supported by

AWS AWS Cloud computing and Security Sponsor Datadog Datadog Monitoring Fastly Fastly CDN Google Google Download Analytics Microsoft Microsoft PSF Sponsor Pingdom Pingdom Monitoring Sentry Sentry Error logging StatusPage StatusPage Status page