
1、安装kafka的python包
pip install kafka-python
2、生产kafka数据
import json from kafka import KafkaProducer, KafkaConsumer producer = KafkaProducer(bootstrap_servers=['xx.xx.xx.xx:xxxx']) # 此处传入kafka的地址和端口 msg = json.dumps(data).encode() # 必须要编码为字节类型的数据,不可以用utf-8 producer.send(topic_name, value=msg) # 此处传入kafka的topic
3、消费kafka
import json
from kafka import KafkaProducer, KafkaConsumer
consumer = KafkaConsumer(topic_name,
bootstrap_servers=['xx.xx.xx.xx:xxxx'],
auto_offset_reset='earliest', # 设置偏移方式,当参数值为earliest时从上次未消费的地方开始消费;当参数值为latest时从最新数据开始消费
group_id='dev', consumer_timeout_ms=1000 # 该参数表示1000ms内如果没有新的数据产生则停止消费,否则会一直循环等待)
res = []
for msg in consumer:
msg = json.loads(msg.value)
res.append(msg)
欢迎分享,转载请注明来源:内存溢出
微信扫一扫
支付宝扫一扫
评论列表(0条)