
import json
from kafka import KafkaConsumer
from kafka import TopicPartition
consumer = KafkaConsumer(bootstrap_servers='xxx:9092', group_id="group_test")
partition = TopicPartition('topic_name', 4)
offsetStart = 30570
end = 34387
consumer.assign([partition])
consumer.seek(partition, offsetStart)
i = 0
for msg in consumer:
print(f"msg.offset: {msg.offset} mgs:{msg.value} i: {i}")
if msg.offset > end:
break
i += 1
task_info = msg.value.decode('utf-8')
task_info = json.loads(task_info)
print(task_info)
欢迎分享,转载请注明来源:内存溢出
微信扫一扫
支付宝扫一扫
评论列表(0条)