Skip to main content

This package is a wrapper for REST API requests to Kafka Proxy.

Project description

Kafka REST API

Installation

If you are installing from a remote or public repository, run: pip install kafka-rest-api. If you are installing from a wheel file in the local directory, run: pip install {filename}.whl, and replace {filename} with the name of the .whl file.

Getting Started

Interactions with a Kafka cluster can be performed on a Producer/Consumer paradigm. As such there are two classes that can be imported and used to publish and subscribe to topics in Kafka: Producer and Consumer.

Configuration

When using this package to access Merck API Gateway, you can define the following environment variables:

  • KAFKA_REST_API_URL: Target Kafka REST API URL. In alternative, you can pass the argument kafka_rest_api_url to the Producer and the Consumer constructor.
  • X_API_KEY: The authorization token to validate API requests to API Gateway. In alternative, you can pass a dictionary with the format {"x-api-key": "your-api-key", "other-header-key": "other-header-value", etc...} to the key parameter auth_headers in both the Producer and the Consumer constructors.
  • TOPIC_ID: MSK Topic ID assigned to the user. In alternative, you can pass a string with the topic ID to the key argument topic_id in both the Producer and the Consumer constructors.

Producer

Produce json data

In the snippet below the topic pke is used as example. The pattern for the producer is the following:

from kafka_rest import Producer
producer = Producer()
new_keys = producer.produce(messages_to_pke_endpoint, "pke")

Please note that each message in the list of messages to the target endpoint should correspond to the payload that is expected by that endpoint that would otherwise be a JSON object.

For example, a valid message to the pke endpoint is:

{
  "text": "Genome engineering is a powerful tool.", 
  "algorithm": "TopicRank", 
  "n_best": 10
}

To know which message format you should use for each endpoint, please consult the documentation for NLP API.

The Producer.produce method automatically generates a unique key (UUID) for each message. Optionally, you can provide your unique keys as well, passing a list of keys (strings) to the argument keys.

Produce files

To produce files as inputs to a given endpoint, you can use the method produce_files. The required arguments for this method are:

  • files which consists of a list of absolute or relative paths to the input files;
  • endpoint target endpoint (pdf2text, for example);
from kafka_rest import Producer
producer = Producer()
new_keys = producer.produce_files(files=list_of_files, endpoint="pdf2text")

Consumer

Pattern 1 - Iterator

Arguably, the most useful way of consuming messages with the Consumer class is as follows:

from kafka_rest import Consumer
with Consumer() as consumer:
    for data, remaining_keys in consumer.consume(keys):
        print((data, remaining_keys))

Pattern 2 - Step-by-step instantiation (Chain)

You can also opt to do a step-by-step instantiation and have a finer control of each request sent by the Consumer to the NLP API:

from kafka_rest import Consumer
consumer = Consumer()
consumer.create()
consumer.subscribe()
# or consumer.create().subscribe().consume(keys)

for data, remaining_keys in consumer.consume(keys):
        print((data, remaining_keys))

consumer.delete()

Pattern 3 - Consume all

Optionally, the Consumer can just return when all keys were exhausted, i.e.: when all messages were consumed. For that, please use the consume_all method.

from kafka_rest import Consumer
with Consumer() as consumer:
    data = consumer.consume_all(keys)

Full example

Produce files as inputs and consume outputs.

from kafka_rest import Producer, Consumer

producer = Producer()
new_keys = producer.produce_files(["files/file1.pdf", "files/file2.pdf"], "pdf2text")

with Consumer() as consumer:
    for data, remaining_keys in enumerate(consumer.consume(new_keys)):
        if data:
            print(f"Data: {data} | Remaining Keys: {remaining_keys}")

For more snippets, please check the example in the file kafka_rest/snippets in this repo.

Project details


Download files

Download the file for your platform. If you're not sure which to choose, learn more about installing packages.

Source Distributions

No source distribution files available for this release.See tutorial on generating distribution archives.

Built Distribution

nlp_kafka_rest_api-1.0.0-py3-none-any.whl (9.5 kB view hashes)

Uploaded Python 3

Supported by

AWS AWS Cloud computing and Security Sponsor Datadog Datadog Monitoring Fastly Fastly CDN Google Google Download Analytics Microsoft Microsoft PSF Sponsor Pingdom Pingdom Monitoring Sentry Sentry Error logging StatusPage StatusPage Status page