Python kafka consumer set offset

How do I adjust my offset to consume from this point forward? #145

Suppose I am a consumer that has just come back to life, perhaps after being down for a long time. How can I set up a Consumer to subscribe to a given topic, with a given consumer group, and tell the broker: "Advance my offset to the latest. I am not interested in anything produced up to right now, so don't send me any stale data"? I had thought that auto.offset.reset would do it, but this makes me think otherwise:
http://stackoverflow.com/questions/32390265/what-determines-kafka-consumer-offset
Given that my client is confluent-kafka-python, what can I do at init time to advance my offset to the present moment's offset?
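
One commonly used approach, sketched here rather than given as the maintainers' definitive answer, is to override the start offset in the rebalance callback passed to subscribe. auto.offset.reset only takes effect when the group has no valid committed offset, whereas setting each assigned partition to OFFSET_END before calling assign skips whatever was produced earlier. The helper names make_on_assign and consume_from_now, and the handle callback, are hypothetical and introduced only for illustration.

```python
def make_on_assign(start_offset):
    """Build a rebalance callback that starts every assigned partition at start_offset."""
    def on_assign(consumer, partitions):
        for tp in partitions:
            tp.offset = start_offset  # override the committed offset, if any
        consumer.assign(partitions)   # apply the modified assignment
    return on_assign


def consume_from_now(conf, topics, handle):
    """Subscribe to topics and pass only newly produced messages to handle()."""
    # Imported here so make_on_assign stays usable without the client installed.
    from confluent_kafka import Consumer, OFFSET_END

    consumer = Consumer(conf)
    consumer.subscribe(topics, on_assign=make_on_assign(OFFSET_END))
    try:
        while True:
            msg = consumer.poll(1.0)
            if msg is None or msg.error():
                continue  # nothing received, or an error event
            handle(msg)
    finally:
        consumer.close()
```

Note that the callback runs on every rebalance, not only at startup, so with this sketch the consumer jumps to the end of each partition whenever partitions are reassigned.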

Kafka — Playing with Consumer API using python library

First, let's start with sample code that produces a message.

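    import pickle
    from confluent_kafka import Producer

    def delivery_report(err, msg):
        # Called once per message to report delivery success or failure
        if err is not None:
            print('Delivery failed:', err)

    # Fill in the broker address for your cluster
    P = Producer({'bootstrap.servers': ''})

    my_dat = 'data that needs to be send to consumer'
    P.produce(
        'my_topic',
        pickle.dumps(my_dat),
        callback=delivery_report,
    )
    P.flush()  # wait until the message has been delivered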

pickle is used to serialize the data. This is not necessary if you are working with strings or bytes, but timestamps and other complex objects have to be serialized before they are sent.
Note: A common practice is to use Apache Avro, which is widely used in combination with Kafka.
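
As a small illustration of the serialization step, the sketch below round-trips a dict containing a timestamp through pickle, which is what the producer and consumer sides would each do with the message value. The serialize and deserialize helper names and the sample event are hypothetical.

```python
import pickle
from datetime import datetime, timezone


def serialize(obj):
    """Producer side: turn a Python object into bytes for produce()."""
    return pickle.dumps(obj)


def deserialize(payload):
    """Consumer side: rebuild the object from msg.value().

    Only unpickle data from producers you trust, because unpickling
    can execute arbitrary code.
    """
    return pickle.loads(payload)


event = {'id': 7, 'created': datetime(2020, 1, 1, tzinfo=timezone.utc)}
payload = serialize(event)
restored = deserialize(payload)
```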

Create a consumer and consume data

Initialize a consumer, subscribe to topics, then poll the consumer until data is found and consume it.

    from confluent_kafka import Consumer

    cfg = {
        'bootstrap.servers': '',
        'group.id': '',
        'auto.offset.reset': 'earliest',
    }
    C = Consumer(cfg)
    C.subscribe(['kafka-topic-1', 'kafka-topic-2', ])

    for _ in range(10):
        msg = C.poll(0.05)
        if msg:
            dat = {
                'msg_value': msg.value(),  # This is the actual content of the message
                'msg_headers': msg.headers(),  # Headers of the message
                'msg_key': msg.key(),  # Message Key
                'msg_partition': msg.partition(),  # Partition id from which the message was extracted
                'msg_topic': msg.topic(),  # Topic in which Producer posted the message to
            }
            print(dat)

A consumer that subscribes to a topic receives messages from all of the partitions assigned to it, so there is no need to poll each partition separately.
poll returns None if no message arrives within the timeout, so a simple truth check tells us whether the poll fetched a message from the broker.
A valid message carries more than its payload: it also has methods such as key(), headers(), partition() and topic() that let us inspect where it came from.
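
The checks described above can be collected into one helper. This is a minimal sketch: message_to_dict is a hypothetical name, and it additionally checks msg.error(), since poll can return a message object that carries an error instead of a record.

```python
def message_to_dict(msg):
    """Return the message's fields as a dict, or None if there is nothing to process."""
    if msg is None:
        # poll() timed out without receiving anything
        return None
    if msg.error():
        # The object carries an error rather than a normal record
        print('Consumer error:', msg.error())
        return None
    return {
        'value': msg.value(),
        'key': msg.key(),
        'headers': msg.headers(),
        'topic': msg.topic(),
        'partition': msg.partition(),
        'offset': msg.offset(),
    }
```

A poll loop would then call message_to_dict(C.poll(0.05)) and skip the iteration whenever the result is None.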

Read from multiple partitions of different topics

  • Read from partition 1 of topic 1 starting with offset value 36
  • Read from partition 3 of topic 2 starting with offset value 35
  • Read from partition 2 of topic 1 starting with offset value 39
    from confluent_kafka import Consumer, TopicPartition

    cfg = {
        'bootstrap.servers': '172.16.18.187:9092',
        'group.id': 'hb-events-1',
        'auto.offset.reset': 'earliest',
    }
    C = Consumer(cfg)
    # Start each partition at an explicit offset
    C.assign(
        [
            TopicPartition(topic='kafka-topic-1', partition=1, offset=36),
            TopicPartition(topic='kafka-topic-2', partition=3, offset=35),
            TopicPartition(topic='kafka-topic-1', partition=2, offset=39),
        ]
    )

    no_msg_counter = 0
    while True:
        msg = C.poll(0)
        if msg:
            no_msg_counter = 0
            dat = {
                'msg_val': msg.value(),
                'msg_partition': msg.partition(),
                'msg_topic': msg.topic(),
            }
            print(dat)
        elif no_msg_counter > 10000:
            print('No messages found for a long time')
            break  # leave the loop so the consumer can be closed
        else:
            no_msg_counter += 1
    C.close()

To read from specific partitions of a topic, use assign instead of subscribe.
The assign method accepts a list of TopicPartition objects. Each TopicPartition identifies one partition of one topic and can carry the offset to start reading from.

Rewind Topic(partition) offsets

    from confluent_kafka import Consumer, TopicPartition

    # cfg is the same configuration dict used in the earlier examples
    C = Consumer(cfg)
    partition_topic_offsets = [
        TopicPartition('kafka-topic-1', partition=1, offset=5),
        TopicPartition('kafka-topic-2', partition=3, offset=0),
    ]
    # Commit the new offsets synchronously for this consumer group.
    # Older client versions named this keyword `async`, which is now
    # a reserved word in Python.
    C.commit(offsets=partition_topic_offsets, asynchronous=False)
    C.close()

    C = Consumer(cfg)
    C.subscribe(['kafka-topic-1', 'kafka-topic-2', ])

    no_msg_counter = 0
    while True:
        msg = C.poll(0.05)
        if msg:
            no_msg_counter = 0
            print({
                'msg_value': msg.value(),
                'partition': msg.partition(),
                'headers': msg.headers(),
                'key': msg.key(),
                'offset': msg.offset(),
            })
        elif no_msg_counter > 10000:
            break
        else:
            no_msg_counter += 1
    C.close()

Regardless of a partition's current offset, we can rewind or reset it.
The new offsets apply only to the consumer group id that was used to commit them; the offsets of other consumer groups are unaffected.

How did we achieve this?

  • Create a list of TopicPartitions, each with the offset to reset to
  • Commit these offsets to the broker
  • When a consumer in the same group subscribes to these topics and polls, it gets data starting from the newly committed offsets

Only retained messages are retrieved

  • Only messages within the retention period are retrieved when you reset or rewind the offset.
  • If you have lost, or never recorded, the last successful offset, use OFFSET_BEGINNING, which fetches data from the current beginning of the partition.
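
To make the retention point concrete, the sketch below clamps a requested offset to the range the broker still holds, using the consumer's get_watermark_offsets method, which returns the low and high offsets of a partition. The helper name clamp_to_retained and the 10 second timeout are assumptions chosen for illustration.

```python
def clamp_to_retained(consumer, tp, timeout=10.0):
    """Adjust tp.offset so that it falls within the offsets the broker still holds."""
    if tp.offset < 0:
        # Logical offsets such as OFFSET_BEGINNING are negative; leave them alone
        return tp
    watermarks = consumer.get_watermark_offsets(tp, timeout=timeout)
    if watermarks is None:
        # The broker did not answer in time; leave tp untouched
        return tp
    low, high = watermarks
    if tp.offset < low:
        tp.offset = low    # the requested data has already been deleted
    elif tp.offset > high:
        tp.offset = high   # nothing has been produced that far yet
    return tp
```

Each TopicPartition could be passed through this helper before it is handed to assign or commit.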

Use REST API

  • If you frequently run into issues and need to rewind, it is advisable to periodically record the last successful offset in a database table, for example one structured like an events table.
  • How frequently should we record? That depends on the business case. Recording every offset involves a DB call for each message, which may slow down the service.
  • Create a wrapper REST API that can update the table values.
  • Modify the consumers in each group to get the last offset from the table.
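
The offsets table described above could look roughly like the following sketch, which uses SQLite from the standard library as a stand-in for whatever database the service uses. The table name consumer_offsets, its columns, and the helper functions are all hypothetical; the REST wrapper would call record_offset, and consumers would call resume_points at startup.

```python
import sqlite3


def open_store(path=':memory:'):
    """Open the offsets table, creating it if needed."""
    db = sqlite3.connect(path)
    db.execute(
        'CREATE TABLE IF NOT EXISTS consumer_offsets ('
        ' group_id TEXT, topic TEXT, partition_id INTEGER, last_offset INTEGER,'
        ' PRIMARY KEY (group_id, topic, partition_id))'
    )
    return db


def record_offset(db, group_id, topic, partition_id, offset):
    """Store the last successfully processed offset for one partition."""
    db.execute(
        'INSERT OR REPLACE INTO consumer_offsets VALUES (?, ?, ?, ?)',
        (group_id, topic, partition_id, offset),
    )
    db.commit()


def resume_points(db, group_id):
    """Return {(topic, partition_id): next offset to read} for a group."""
    rows = db.execute(
        'SELECT topic, partition_id, last_offset FROM consumer_offsets'
        ' WHERE group_id = ?',
        (group_id,),
    )
    # Resume one past the last offset that was processed successfully
    return {(topic, part): last + 1 for topic, part, last in rows}
```

The values returned by resume_points can be used to build the TopicPartition list that is passed to assign or commit.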

Usage

There are many configuration options for the consumer class. See KafkaConsumer API documentation for more details.
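
For the offset question this page is concerned with, a minimal kafka-python sketch might look like the following. It relies on the KafkaConsumer methods assign, seek_to_end and seek; the helper names, the group id 'my-group' and the choice of partition 0 are assumptions made for illustration.

```python
def start_at_latest(consumer, partitions):
    """Manually assign partitions and skip everything already produced."""
    consumer.assign(partitions)
    consumer.seek_to_end(*partitions)


def start_at(consumer, partition, offset):
    """Manually assign one partition and begin reading at a given offset."""
    consumer.assign([partition])
    consumer.seek(partition, offset)


def latest_only_consumer(bootstrap_servers, topic):
    """Build a consumer positioned at the end of partition 0 of topic."""
    # Imported here so the helpers above can be used without the library installed.
    from kafka import KafkaConsumer, TopicPartition

    consumer = KafkaConsumer(bootstrap_servers=bootstrap_servers, group_id='my-group')
    start_at_latest(consumer, [TopicPartition(topic, 0)])
    return consumer
```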

KafkaProducer

    import json
    import logging

    import msgpack
    from kafka import KafkaProducer
    from kafka.errors import KafkaError

    log = logging.getLogger(__name__)

    producer = KafkaProducer(bootstrap_servers=['broker1:1234'])

    # Asynchronous by default
    future = producer.send('my-topic', b'raw_bytes')

    # Block for 'synchronous' sends
    try:
        record_metadata = future.get(timeout=10)
    except KafkaError:
        # Decide what to do if produce request failed.
        log.exception('Produce request failed')
    else:
        # Successful result returns assigned partition and offset
        print(record_metadata.topic)
        print(record_metadata.partition)
        print(record_metadata.offset)

    # produce keyed messages to enable hashed partitioning
    producer.send('my-topic', key=b'foo', value=b'bar')

    # encode objects via msgpack
    producer = KafkaProducer(value_serializer=msgpack.dumps)
    producer.send('msgpack-topic', {'key': 'value'})

    # produce json messages
    producer = KafkaProducer(value_serializer=lambda m: json.dumps(m).encode('ascii'))
    producer.send('json-topic', {'key': 'value'})

    # produce asynchronously
    for _ in range(100):
        producer.send('my-topic', b'msg')

    def on_send_success(record_metadata):
        print(record_metadata.topic)
        print(record_metadata.partition)
        print(record_metadata.offset)

    def on_send_error(excp):
        log.error('I am an errback', exc_info=excp)
        # handle exception

    # produce asynchronously with callbacks
    producer.send('my-topic', b'raw_bytes').add_callback(on_send_success).add_errback(on_send_error)

    # block until all async messages are sent
    producer.flush()

    # configure multiple retries
    producer = KafkaProducer(retries=5)

© Copyright 2016 — Dana Powers, David Arthur, and Contributors Revision 34dc36d7 .
