
# __author__ = '小红帽'
# Date: 2020-05-11
"""Naval Fate.
Usage:
py_kafka_protobuf_consume.py --bootstrap-servers=<host:port,host2:port2..>--groupId=<groupId>--topic=<topic_name>--es-servers=<host:port>--index=<schema>--type=<doc>--id=<order_id>
py_kafka_protobuf_consume.py -h | --help
py_kafka_protobuf_consume.py --version
Options:
-h --help 打印帮助信息.
--bootstrap_servers=<host:port,host2:port2..> kafka servers
--groupId=<groupId> kafka消费组
--topic=<topic_name> topic名称
--es-servers=<host:port> ES 地址
--index=<index_name> ES 索引
--type=<doc>ES type
--id=<order_id>指定id主键,快速更新
"""
import json
from kafka import KafkaConsumer
from docopt import docopt
from elasticsearch import Elasticsearch
from elasticsearch import helpers
class Kafka_consumer():
def __init__(self,args):
self.topic = args['--topic']
self.bootstrapServers = args['--bootstrap-servers']
self.groupId = args['--groupId']
self.id = args['--id']
self.es_host = args['--es-servers'].split(':')[0]
self.es_port = args['--es-servers'].split(':')[1]
self.es_index = args['--index']
self.es_type = args['--type']
self.consumer = KafkaConsumer(
bootstrap_servers=self.bootstrapServers,
group_id=self.groupId,
enable_auto_commit = True,
auto_commit_interval_ms=5000,
consumer_timeout_ms=5000
)
def consume_data_es(self):
while True:
try:
es = Elasticsearch([{'host': self.es_host, 'port': self.es_port}], timeout=3600)
self.consumer.subscribe([self.topic])
actions=[]
for message in self.consumer:
if message is not None:
query = json.loads(message.value)['data'][0]
action = {
"_index": self.es_index,
"_type": self.es_type,
"_id": json.loads(message.value)['data'][0][self.id],
"_source": query
}
actions.append(action)
if len(actions) >50:
helpers.bulk(client=es, actions=actions)
print("插入es %s 条数据" % len(actions))
actions = []
if len(actions) >0:
helpers.bulk(client=es, actions=actions)
print("等待超时时间,插入es %s 条数据" % len(actions))
actions=[]
except BaseException as e:
print(e)
if __name__ == '__main__':
arguments = docopt(__doc__,version='sbin 1.0')
consumer = Kafka_consumer(arguments)
consumer.consume_data_es()
更详细的可以参考: https://zhuanlan.zhihu.com/p/279784873https://blog.csdn.net/weixin_35688430/article/details/111292744
生产者:producer
生产者kafka有对应的三方库可以支撑去进行信息发送。
消费者:consumer
消费者,同样可以通过topic和服务器名称,去获取对应的数据。
使用kafka
连接 kafka 的库有两种类型,一种是直接连接 kafka 的,存储 offset 的事情要自己在客户端完成。还有一种是先连接 zookeeper 然后再通过 zookeeper 获取 kafka 的 brokers 信息, offset 存放在 zookeeper 上面,由 zookeeper 来协调。我现在使用 samsa 这个 highlevel 库
Producer示例
from kazoo.client import KazooClientfrom samsa.cluster import Clusterzookeeper = KazooClient()zookeeper.start()cluster = Cluster(zookeeper)topic = cluster.topics['topicname']topic.publish('msg')
** Consumer示例 **
from kazoo.client import KazooClientfrom samsa.cluster import Clusterzookeeper = KazooClient()zookeeper.start()cluster = Cluster(zookeeper)topic = cluster.topics['topicname']consumer = topic.subscribe('groupname')for msg in consumer:
print msg
Tip
consumer 必需在 producer 向 kafka 的 topic 里面提交数据后才能连接,否则会出错。
在 Kafka 中一个 consumer 需要指定 groupname , groue 中保存着 offset 等信息,新开启一个 group 会从 offset 0 的位置重新开始获取日志。
kafka 的配置参数中有个 partition ,默认是 1 ,这个会对数据进行分区,如果多个 consumer 想连接同个 group 就必需要增加 partition , partition 只能大于 consumer 的数量,否则多出来的 consumer 将无法获取到数据。
欢迎分享,转载请注明来源:内存溢出
微信扫一扫
支付宝扫一扫
评论列表(0条)