
1.先使kafka积压 ,写一个定时任务 来使kafka一直发送消息造成积压
@Component
public class MyTask {
@Autowired
KafkaTemplate kafkaTemplate;
//定时扫描日志文件
//发送给kafka
//读文件
@Scheduled(cron = "0 21 11 * * ?")
public void readLog() throws IOException {
System.out.println("定时任务启动了");
String path="d:/logs/";
readFile(new File(path));
}
private void readFile(File file) throws IOException {
File[] files = file.listFiles();
if (files!=null&&files.length>0){
for (File file1 : files) {
if (file1.isFile()){
BufferedReader reader = new BufferedReader(new FileReader(file1));
String str="";
while ((str=reader.readLine())!=null){
//一直读取消息
kafkaTemplate.send("test","import", UUID.randomUUID()+str);
}
}else {
this.readFile(file1);
}
}
}
}
}
上面的conpont注解是必须的 ,定时任务的注解也需要写
2.
public class MyListenerACK implements AcknowledgingMessageListener{ @Autowired RedisTemplate redisTemplate; @Autowired LogRep logRep; //队列 linkedBlockingQueue > lbq = new linkedBlockingQueue<>(); int i=0; @Override public void onMessage(ConsumerRecord data, Acknowledgment acknowledgment) { //开启线程 用来把消息放入队列 new Thread(new Runnable() { @Override public void run() { //把数据放入线程 try { lbq.put(data); } catch (InterruptedException e) { e.printStackTrace(); } } }).start(); //在开启一个线程来消费消息 new Thread(new Runnable() { @Override public void run() { //从队列取出消息 ConsumerRecord data = lbq.poll(); if (null != data) { myWork(data, acknowledgment); } } }).start(); } private void myWork(ConsumerRecord data, Acknowledgment acknowledgment) { ReentrantLock lock = new ReentrantLock(true); //加锁 lock.lock(); String key = data.key(); String value = data.value(); System.out.println("获取的数据key是"+key+"value是"+value); if (null!=key){ if (key.equals("FLOOR_ADD")){ //添加 //转换 TbFloor floor = JSON.parseObject(value, TbFloor.class); //防止提交的时候重复消费 if (redisTemplate.opsForValue().setIfAbsent(floor.getToken(),1,1, TimeUnit.DAYS)){ //存入redis redisTemplate.opsForList().rightPush("addFloors",floor); } //手动提交 acknowledgment.acknowledge(); //解锁 lock.unlock(); }else if (key.equals("logs")){ //把文件写入硬盘 FileUtil.writeFile("d:\logs\",data.value()); acknowledgment.acknowledge(); //解锁 lock.unlock(); }else if (key.equals("import")){ //截取前36位 String uuid = value.substring(0, 35); if (redisTemplate.opsForValue().setIfAbsent(uuid,1,7,TimeUnit.DAYS)){ //为了造成数据积压,所以把写入和手动提交暂时关闭 //读取日志文件 logRep.save(new LogInfo(i++,value)); acknowledgment.acknowledge(); } //解锁 lock.unlock(); System.out.println("log已经存入es"); } } } }
欢迎分享,转载请注明来源:内存溢出
微信扫一扫
支付宝扫一扫
评论列表(0条)