Flume采集日志到Kafka经典案例

Flume采集日志到Kafka经典案例,第1张

Flume采集日志到Kafka经典案例 环境准备:

涉及到的技术有flume,Kafka,zookeeper。

*** 作步骤:

1、构建agent

train.sources=trainSource
train.channels=trainChannel
train.sinks=trainSink

train.sources.trainSource.type=spooldir
train.sources.trainSource.spoolDir=/opt/kb15tmp/flumelogfile/train
train.sources.trainSource.deserializer=LINE
train.sources.trainSource.deserializer.maxLineLength=320000
train.sources.trainSource.includePattern=train_[0-9]{4}-[0-9]{2}-[0-9]{2}.csv
train.sources.trainSource.interceptors=head_filter
train.sources.trainSource.interceptors.head_filter.type=regex_filter
train.sources.trainSource.interceptors.head_filter.regex=^user*
train.sources.trainSource.interceptors.head_filter.excludeEvents=true

train.channels.trainChannel.type=file
train.channels.trainChannel.checkpointDir=/opt/kb15tmp/checkpoint/train
train.channels.trainChannel.dataDirs=/opt/kb15tmp/checkpoint/data/train

train.sinks.trainSink.type=org.apache.flume.sink.kafka.KafkaSink
train.sinks.trainSink.batchSize=640
train.sinks.trainSink.brokerList=192.168.91.180:9092
train.sinks.trainSink.topic=train

train.sources.trainSource.channels=trainChannel
train.sinks.trainSink.channel=trainChannel

2、启动Kafka和zookeeper
启动zookeeperzkServer.sh start
启动Kafkanohup kafka-server-start.sh /opt/soft/kafka211/config/server.properties &

3、启动消费者进行消费
首先先创建主题,kafka-topics.sh --create --zookeeper 192.168.91.180:2181 --topic train --partitions 1 --replication-factor 1
消费:
kafka-console-consumer.sh --bootstrap-server 192.168.91.180:9092 --topic train --from-beginning

4、启动flume
./bin/flume-ng agent --name train --conf conf/ --conf-file conf/KB15conf/train.conf -Dflume.root.logger=INFO,console

5、将需要消费的日志文件拷贝到指定的文件夹下
cp train.csv /opt/kb15tmp/flumelogfile/train/train_2021-12-27.csv

欢迎分享,转载请注明来源:内存溢出

原文地址:https://54852.com/zaji/5696098.html

(0)
打赏 微信扫一扫微信扫一扫 支付宝扫一扫支付宝扫一扫
上一篇 2022-12-17
下一篇2022-12-17

发表评论

登录后才能评论

评论列表(0条)

    保存