
文件系统监控,以肉眼可见的速度增大使用空间
查看当前日志保留策略配置,关键配置如下:
日志片大小设置为1G(logsegmentbytes默认大小也是1G),删除策略需要等到日志分片后才能被标记为删除,如没有设置分片时间的话,需要等写满日志片,或是等到7天(默认时间)没有数据写入后,才会自动分片。
可以适当降低logsegmentbytes 大小为512M,设置segmentbytes 大小为 512M,手动设置分片时间 logrollhours ,
参考: kafka 数据定时删除实验
1停止kafka服务
a) 找出kafka进程号,使用命令jps
b) 杀掉kafka进程,使用命令 kill -9 xxx
c) 查看当前kafka进程是否还存在,ps -ef|grep kafka jps
2修改配置
vi $KAFKA_HOME/config/serverproperties
修改配置:
logsegmentbytes=536870912
添加配置:
segmentbytes=536870912
logrollhours=12
3启动kafka服务
/bin/start_kafkash
4检查状态
ps -ef|grep kafka
jps
5依次 *** 作其它服务器
如果这里没有没有增加sleep,会发现数据写入丢失,因为程序快速执行完毕,JVM退出,导致一部分缓存的数据没有发送到kafka集群,从而导致丢失。虽然是一个小问题,但是希望大家注意。。。
Commit cannot be completed since the group has already rebalanced and assigned the partitions to another member This means that the time between subsequent calls to poll() was longer than the configured sessiontimeoutms, which typically implies that the poll loop is spending too much time message processing You can address this either by increasing the session timeout or by reducing the maximum size of batches returned in poll() with maxpollrecords
造成的问题:假如consumerproperties配置中maxpollrecords=40 (一次最多拉取40条数据) sessiontimeoutms=30000 (会话时间)
假设kafka此时一次拉取了40条数据,但在处理第31条的时候抛出了如上的异常,就会导致,本次offset不会提交,完了这40条消息都会在接下来的某刻被再次消费,这其中就包含了其实已经消费了的30条数据
原因:the poll loop is spending too much time message processing, the time between subsequent calls to poll() was longer than the configured sessiontimeoutms,好吧其实是一个意思!
意思就是说poll下来数据后,处理这些数据的时间比 sessiontimeoutms配置的时间要长,从而导致the group has already rebalanced
解决办法是最后一句话:You can address this either by increasing the session timeout or by reducing the maximum size of batches returned in poll() with maxpollrecords
即要不增大 sessiontimeoutms,要不减小maxpollrecords ,至于具体配置为多少,得看你处理一条消息花费多长时间 x,需要满足 x乘以maxpollrecords < sessiontimeoutms
另一种解决思路:
解决此类重复消费的方式:将能够唯一标识消息的信息存储在其他系统,比如redis,什么能够唯一标识消息呢?就是consumergroup+topic+partition+offset,更准确的应该是consumergroup+" "+topic+" "+partition+"_"+offset组成的key,value可以是处理时间存放在redis中,每次处理kafka消息时先从redis中根据key获取value,如果value为空,则表明该消息是第一次被消费,不为空则表示时已经被消费过的消息;
参考: >
以上就是关于kafka日志保留策略异常处理全部的内容,包括:kafka日志保留策略异常处理、kafka无法写入集群数据、kafka重复消费的问题等相关内容解答,如果想了解更多相关内容,可以关注我们,你们的支持是我们更新的动力!
欢迎分享,转载请注明来源:内存溢出
微信扫一扫
支付宝扫一扫
评论列表(0条)