node-rdkafka-commit-manager
A simple helper for controlling when an offset is ready to be committed via node-rdkafka.
Goals
This package is intended to help you implement at-least-once processing without making a network call for every message you process.
When messages arrive frequently, the commit manager commits offsets at most once per configurable commit interval.
When messages arrive infrequently, the commit manager commits immediately whenever it has seen a period of inactivity longer than the configurable commit interval.
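One plausible reading of these two behaviors is a single rule: commit only when at least one commit interval has elapsed since the last commit. The sketch below illustrates that rule. It is an assumption made for illustration, not this package's actual implementation, and the names `shouldCommit` and `simulateCommits` are hypothetical.

```typescript
// Hypothetical sketch of interval-based commit throttling.
// NOT the package's real implementation; names are invented for illustration.

// True when at least one full commit interval has passed since the last commit.
function shouldCommit(
  nowMs: number,
  lastCommitMs: number,
  commitIntervalMs: number
): boolean {
  return nowMs - lastCommitMs >= commitIntervalMs;
}

// Given the times (in ms) at which messages finish processing, return the
// times at which a commit would be issued. The clock is assumed to start at 0.
function simulateCommits(finishedAtMs: number[], commitIntervalMs: number): number[] {
  const commits: number[] = [];
  let lastCommitMs = 0;
  for (const nowMs of finishedAtMs) {
    if (shouldCommit(nowMs, lastCommitMs, commitIntervalMs)) {
      commits.push(nowMs);
      lastCommitMs = nowMs;
    }
  }
  return commits;
}
```

Under this rule, a steady stream of messages triggers one commit per interval, while a message that arrives after a long quiet period triggers a commit right away.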
useCommitManager
The commit manager is exposed via a hook-style function. You don't need to be using React in order to use it.
const { readyToCommit, onRebalance } = useCommitManager(consumer, commitIntervalMs);
Arguments
- consumer: this must be a KafkaConsumer object from node-rdkafka. It must be configured with auto-commit turned off.
- commitIntervalMs: the number of milliseconds the commit manager should wait between commits.
  - Optional; defaults to 5000.
Returned Functions
- readyToCommit(data)
  - data: this must be an object containing the properties "topic", "partition", and "offset" from the Kafka message.
  - Call this whenever you are finished with a Kafka message.
- onRebalance()
  - Call this whenever your KafkaConsumer has its partitions revoked.
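To illustrate the `data` argument, the sketch below picks the three required properties out of a message-like object. The `CommitData` type and `toCommitData` helper are hypothetical names introduced here, and the numeric types for `partition` and `offset` are assumed from the shape of node-rdkafka's message objects.

```typescript
// Hypothetical type describing what readyToCommit(data) needs.
interface CommitData {
  topic: string;
  partition: number;
  offset: number;
}

// Keep only the properties required for committing and drop everything else
// (payload, key, timestamp, and so on).
function toCommitData<T extends CommitData>(message: T): CommitData {
  const { topic, partition, offset } = message;
  return { topic, partition, offset };
}

// A message-like object with an extra "value" property standing in for the payload.
const example = toCommitData({
  topic: "sample.test.topic",
  partition: 0,
  offset: 42,
  value: "payload",
});
```

Passing the whole message object works just as well; a helper like this is only useful if you want to release the payload before the offset is committed.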
Usage Example
This package can be used from JavaScript or TypeScript.
The examples below illustrate a few key points about using this commit manager:
- Disable the auto-commit feature on your KafkaConsumer.
  - Otherwise, the commit manager will be competing with node-rdkafka's auto-commit behavior.
- Use non-flowing mode on your KafkaConsumer.
  - Otherwise, node-rdkafka will provide batches of messages, which may be handled out of order.
  - Since non-flowing mode only allows for one message at a time, it significantly slows the rate at which a single consumer can process messages, so additional horizontal scaling is necessary to compensate.
- Implement a rebalance callback function which calls the commit manager's onRebalance function any time your KafkaConsumer's partitions are revoked.
  - Otherwise, the commit manager may later try to commit offsets for partitions which are no longer assigned to it.
  - The rebalance callback in the examples below covers only the minimum responsibilities of such a function. See the node-rdkafka and/or librdkafka documentation for more details.
- Call the commit manager's readyToCommit function for each Kafka message you process.
  - Only call readyToCommit when you have finished processing the message.
  - Since the data objects provided by node-rdkafka's KafkaConsumer already have all of the necessary properties, you can pass them directly.
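To show why the onRebalance call matters, here is a minimal sketch of the kind of bookkeeping a commit manager might do: remember the highest finished offset per topic-partition, and forget everything when partitions are revoked. This is an assumption for illustration, not this package's internals; `createPendingOffsets`, `markReady`, `drain`, and `clear` are hypothetical names.

```typescript
// Hypothetical bookkeeping sketch; NOT the package's real implementation.
interface CommitData {
  topic: string;
  partition: number;
  offset: number;
}

function createPendingOffsets() {
  // One entry per topic-partition, holding the highest finished offset seen so far.
  const pending = new Map<string, CommitData>();

  return {
    // Record that a message has been fully processed.
    markReady(data: CommitData): void {
      const key = `${data.topic}:${data.partition}`;
      const current = pending.get(key);
      if (!current || data.offset > current.offset) {
        pending.set(key, {
          topic: data.topic,
          partition: data.partition,
          offset: data.offset,
        });
      }
    },
    // Hand back everything that is ready to commit and reset the state.
    drain(): CommitData[] {
      const ready = Array.from(pending.values());
      pending.clear();
      return ready;
    },
    // On partition revocation: drop offsets for partitions we may no longer own.
    clear(): void {
      pending.clear();
    },
  };
}
```

Without the `clear` step, offsets recorded before a rebalance could still be committed afterwards, which is the failure mode the rebalance callback is meant to prevent.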
TypeScript
import { useCommitManager } from "node-rdkafka-commit-manager";
import { CODES, KafkaConsumer } from "node-rdkafka";

const CONSUME_TIMEOUT_MS = 1000;

// Request one message right away, then keep requesting one message per interval.
const consumeNonFlowing = (consumer: KafkaConsumer, consumeTimeout: number) => {
  consumer.consume(1);
  return setInterval(function() {
    consumer.consume(1);
  }, consumeTimeout);
};

const stopConsuming = (consumeInterval?: NodeJS.Timeout) => {
  if (consumeInterval) {
    clearInterval(consumeInterval);
  }
};

const rebalanceCallback = (err: any, assignments: any) => {
  if (err.code === CODES.ERRORS.ERR__ASSIGN_PARTITIONS) {
    consumer.assign(assignments);
  } else if (err.code === CODES.ERRORS.ERR__REVOKE_PARTITIONS) {
    consumer.unassign();
    // Tell the commit manager that its partitions have been revoked.
    onRebalance();
  } else {
    console.error(`Kafka rebalance error : ${err}`);
  }
};

const consumer = new KafkaConsumer(
  {
    "enable.auto.commit": false,
    rebalance_cb: rebalanceCallback,
    // <Your global config here (ex: authentication, consumer group, etc.)>
  },
  {
    // <Your topic config here>
  }
);

const { readyToCommit, onRebalance } = useCommitManager(consumer);
let consumeInterval: NodeJS.Timeout;

consumer
  .on("ready", function() {
    consumer.subscribe(["sample.test.topic"]);
    consumer.setDefaultConsumeTimeout(CONSUME_TIMEOUT_MS);
    consumeInterval = consumeNonFlowing(consumer, CONSUME_TIMEOUT_MS);
  })
  .on("data", function(data: any) {
    // Pause polling while this message is being processed.
    stopConsuming(consumeInterval);
    // <Process the Kafka message here.>
    readyToCommit(data);
    consumeInterval = consumeNonFlowing(consumer, CONSUME_TIMEOUT_MS);
  })
  .connect();
JavaScript
const { useCommitManager } = require("node-rdkafka-commit-manager");
const { CODES, KafkaConsumer } = require("node-rdkafka");

const CONSUME_TIMEOUT_MS = 1000;

// Request one message right away, then keep requesting one message per interval.
const consumeNonFlowing = (consumer, consumeTimeout) => {
  consumer.consume(1);
  return setInterval(function() {
    consumer.consume(1);
  }, consumeTimeout);
};

const stopConsuming = (consumeInterval) => {
  if (consumeInterval) {
    clearInterval(consumeInterval);
  }
};

const rebalanceCallback = (err, assignments) => {
  if (err.code === CODES.ERRORS.ERR__ASSIGN_PARTITIONS) {
    consumer.assign(assignments);
  } else if (err.code === CODES.ERRORS.ERR__REVOKE_PARTITIONS) {
    consumer.unassign();
    // Tell the commit manager that its partitions have been revoked.
    onRebalance();
  } else {
    console.error(`Kafka rebalance error : ${err}`);
  }
};

const consumer = new KafkaConsumer(
  {
    "enable.auto.commit": false,
    rebalance_cb: rebalanceCallback,
    // <Your global config here (ex: authentication, consumer group, etc.)>
  },
  {
    // <Your topic config here>
  }
);

const { readyToCommit, onRebalance } = useCommitManager(consumer);
let consumeInterval;

consumer
  .on("ready", function() {
    consumer.subscribe(["sample.test.topic"]);
    consumer.setDefaultConsumeTimeout(CONSUME_TIMEOUT_MS);
    consumeInterval = consumeNonFlowing(consumer, CONSUME_TIMEOUT_MS);
  })
  .on("data", function(data) {
    // Pause polling while this message is being processed.
    stopConsuming(consumeInterval);
    // <Process the Kafka message here.>
    readyToCommit(data);
    consumeInterval = consumeNonFlowing(consumer, CONSUME_TIMEOUT_MS);
  })
  .connect();