python 消费kafka 写入es 小记

python 消费kafka 写入es 小记,第1张

# -*- coding: utf8 -*-

# __author__ = '小红帽'

# Date: 2020-05-11

"""Naval Fate.

Usage:

        py_kafka_protobuf_consume.py --bootstrap-servers=<host:port,host2:port2..>--groupId=<groupId>--topic=<topic_name>--es-servers=<host:port>--index=<schema>--type=<doc>--id=<order_id>

        py_kafka_protobuf_consume.py -h | --help

        py_kafka_protobuf_consume.py --version

Options:

        -h --help                                      打印帮助信息.

        --bootstrap_servers=<host:port,host2:port2..>  kafka servers

        --groupId=<groupId>                            kafka消费组

        --topic=<topic_name>                            topic名称

        --es-servers=<host:port>                        ES 地址

        --index=<index_name>                            ES 索引

        --type=<doc>ES type

        --id=<order_id>指定id主键,快速更新

"""

import json

from kafka import KafkaConsumer

from docopt import docopt

from elasticsearch import Elasticsearch

from elasticsearch import helpers

class Kafka_consumer():

    def __init__(self,args):

        self.topic = args['--topic']

        self.bootstrapServers = args['--bootstrap-servers']

        self.groupId = args['--groupId']

        self.id = args['--id']

        self.es_host = args['--es-servers'].split(':')[0]

        self.es_port = args['--es-servers'].split(':')[1]

        self.es_index = args['--index']

        self.es_type = args['--type']

        self.consumer = KafkaConsumer(

            bootstrap_servers=self.bootstrapServers,

            group_id=self.groupId,

            enable_auto_commit = True,

            auto_commit_interval_ms=5000,

            consumer_timeout_ms=5000

        )

    def consume_data_es(self):

        while True:

            try:

                es = Elasticsearch([{'host': self.es_host, 'port': self.es_port}], timeout=3600)

                self.consumer.subscribe([self.topic])

                actions=[]

                for message in self.consumer:

                    if message is not None:

                        query = json.loads(message.value)['data'][0]

                        action = {

                            "_index": self.es_index,

                            "_type": self.es_type,

                            "_id": json.loads(message.value)['data'][0][self.id],

                            "_source": query

                        }

                        actions.append(action)

                        if len(actions) >50:

                            helpers.bulk(client=es, actions=actions)

                            print("插入es %s 条数据" % len(actions))

                            actions = []

                if len(actions) >0:

                    helpers.bulk(client=es, actions=actions)

                    print("等待超时时间,插入es %s 条数据" % len(actions))

                    actions=[]

            except BaseException as e:

                print(e)

if __name__ == '__main__':

    arguments = docopt(__doc__,version='sbin 1.0')

    consumer = Kafka_consumer(arguments)

    consumer.consume_data_es()

更详细的可以参考: https://zhuanlan.zhihu.com/p/279784873

https://blog.csdn.net/weixin_35688430/article/details/111292744

生产者:producer

生产者kafka有对应的三方库可以支撑去进行信息发送。

消费者:consumer

消费者,同样可以通过topic和服务器名称,去获取对应的数据。

使用kafka

连接 kafka 的库有两种类型,一种是直接连接 kafka 的,存储 offset 的事情要自己在客户端完成。还有一种是先连接 zookeeper 然后再通过 zookeeper 获取 kafka 的 brokers 信息, offset 存放在 zookeeper 上面,由 zookeeper 来协调。

我现在使用 samsa 这个 highlevel 库

Producer示例

from kazoo.client import KazooClientfrom samsa.cluster import Clusterzookeeper = KazooClient()zookeeper.start()cluster = Cluster(zookeeper)topic = cluster.topics['topicname']topic.publish('msg')

** Consumer示例 **

from kazoo.client import KazooClientfrom samsa.cluster import Clusterzookeeper = KazooClient()zookeeper.start()cluster = Cluster(zookeeper)topic = cluster.topics['topicname']consumer = topic.subscribe('groupname')for msg in consumer:

print msg

Tip

consumer 必需在 producer 向 kafka 的 topic 里面提交数据后才能连接,否则会出错。

在 Kafka 中一个 consumer 需要指定 groupname , groue 中保存着 offset 等信息,新开启一个 group 会从 offset 0 的位置重新开始获取日志。

kafka 的配置参数中有个 partition ,默认是 1 ,这个会对数据进行分区,如果多个 consumer 想连接同个 group 就必需要增加 partition , partition 只能大于 consumer 的数量,否则多出来的 consumer 将无法获取到数据。


欢迎分享,转载请注明来源:内存溢出

原文地址:https://54852.com/sjk/6774705.html

(0)
打赏 微信扫一扫微信扫一扫 支付宝扫一扫支付宝扫一扫
上一篇 2023-03-28
下一篇2023-03-28

发表评论

登录后才能评论

评论列表(0条)

    保存