Writing Kafka Data into HBase: A Case Study

Preparation:

1. Create the table
Start HBase with the command start-hbase.sh.
In the HBase shell, create a namespace and a table with one column family (eu):
Namespace: create_namespace 'events_db'
train table: create 'events_db:train', 'eu'

2. Prepare the source data
Use Flume to write the log data into a Kafka topic (named train here).
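The Flume side is not shown in the original article; a minimal sketch of an agent that tails a log file into the train topic might look like the following (the agent name, file path, and broker address are assumptions about the environment):

```properties
# Sketch of a Flume agent: tail a CSV log file into the "train" Kafka topic.
# Agent name (a1), source file path, and broker address are assumptions.
a1.sources = r1
a1.channels = c1
a1.sinks = k1

a1.sources.r1.type = exec
a1.sources.r1.command = tail -F /opt/data/train.csv
a1.sources.r1.channels = c1

a1.channels.c1.type = memory
a1.channels.c1.capacity = 10000

a1.sinks.k1.type = org.apache.flume.sink.kafka.KafkaSink
a1.sinks.k1.kafka.bootstrap.servers = 192.168.91.180:9092
a1.sinks.k1.kafka.topic = train
a1.sinks.k1.channel = c1
```

Start it with `flume-ng agent -n a1 -f <conf-file>` and the records will begin flowing into the topic.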

Steps:

The code is split into four modules: handler, worker, writer, and a test module.
1. handler module: parses the consumed Kafka records into a list of HBase Put objects.
Interface:

import java.util.List;
import org.apache.hadoop.hbase.client.Put;
import org.apache.kafka.clients.consumer.ConsumerRecords;

public interface IParseRecord {
    List<Put> parse(ConsumerRecords<String, String> records);
}

Implementation:

import java.util.ArrayList;
import java.util.List;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;

public class TrainHandler implements IParseRecord {
    @Override
    public List<Put> parse(ConsumerRecords<String, String> records) {
        List<Put> datas = new ArrayList<>();
        for (ConsumerRecord<String, String> record : records) {
            System.out.println(record.value());
            String[] split = record.value().split(",");
            // Row key: user id concatenated with event id
            Put put = new Put(Bytes.toBytes(split[0] + split[1]));
            put.addColumn("eu".getBytes(), "user".getBytes(), split[0].getBytes());
            put.addColumn("eu".getBytes(), "event".getBytes(), split[1].getBytes());
            put.addColumn("eu".getBytes(), "invited".getBytes(), split[2].getBytes());
            put.addColumn("eu".getBytes(), "timestamp".getBytes(), split[3].getBytes());
            put.addColumn("eu".getBytes(), "interested".getBytes(), split[4].getBytes());
            put.addColumn("eu".getBytes(), "not_interested".getBytes(), split[5].getBytes());
            datas.add(put);
        }
        return datas;
    }
}
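The column mapping above can be checked without Kafka or HBase by splitting a sample line the same way TrainHandler does; the record below is a made-up example in the expected six-field CSV format (user, event, invited, timestamp, interested, not_interested):

```java
public class ParsePreview {
    public static void main(String[] args) {
        // A made-up record in the train-topic format
        String value = "3044012,1918771225,0,2012-10-02 15:53:05.754000+00:00,0,0";
        String[] split = value.split(",");
        // Row key is user + event, matching the Put constructor in TrainHandler
        String rowKey = split[0] + split[1];
        System.out.println("rowkey=" + rowKey);
        System.out.println("user=" + split[0] + " event=" + split[1]
                + " invited=" + split[2] + " interested=" + split[4]);
    }
}
```

This confirms the row key for the sample line is the two ids concatenated (30440121918771225) and that the timestamp field, which contains no comma, survives the split intact.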

2. worker module: configures the Kafka consumer and drives the writer.
Interface:

public interface IWorker {
    void fillData(String targetTableName);
}

Implementation (note that with ENABLE_AUTO_COMMIT set to false the offsets must be committed by hand, which the original code omitted):

import java.time.Duration;
import java.util.Collections;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;

public class Worker implements IWorker {
    private final KafkaConsumer<String, String> consumer;
    private final IWriter writer;

    public Worker(String topicName, String groupId, IWriter writer) {
        this.writer = writer;
        // Kafka consumer properties
        Properties prop = new Properties();
        prop.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.91.180:9092");
        prop.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        prop.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        // false = commit offsets manually, true = commit automatically
        prop.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
        prop.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
        // earliest / latest / none
        prop.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");

        consumer = new KafkaConsumer<>(prop);
        consumer.subscribe(Collections.singleton(topicName));
    }

    @Override
    public void fillData(String targetTableName) {
        long num = 0;
        while (true) {
            ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
            int count = writer.write(targetTableName, records);
            num += count;
            System.out.println("------------num-----------" + num);
            // Auto-commit is disabled, so commit the offsets after a successful write
            consumer.commitSync();
        }
    }
}

3. writer module: writes the parsed records into the target HBase table.
Interface:

import org.apache.kafka.clients.consumer.ConsumerRecords;

public interface IWriter {
    int write(String targetTableName, ConsumerRecords<String, String> records);
}

Implementation (the original had typos in the HBase class and constant names, and leaked the Table handle when no records arrived; both are fixed below):

import java.io.IOException;
import java.util.List;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Table;
import org.apache.kafka.clients.consumer.ConsumerRecords;

// Writes records into the target HBase table
public class HbaseWriter implements IWriter {
    private Connection connection;
    private final IParseRecord handler;

    public HbaseWriter(IParseRecord handler) {
        this.handler = handler;
        Configuration conf = HBaseConfiguration.create();
        conf.set(HConstants.HBASE_DIR, "hdfs://192.168.91.180:9000/hbase");
        conf.set(HConstants.ZOOKEEPER_QUORUM, "192.168.91.180");
        conf.set(HConstants.ZOOKEEPER_CLIENT_PORT, "2181");
        try {
            connection = ConnectionFactory.createConnection(conf);
        } catch (IOException e) {
            e.printStackTrace();
        }
    }

    @Override
    public int write(String targetTableName, ConsumerRecords<String, String> records) {
        // try-with-resources closes the Table even when there is nothing to write
        try (Table table = connection.getTable(TableName.valueOf(targetTableName))) {
            List<Put> datas = handler.parse(records);
            if (datas != null && !datas.isEmpty()) {
                table.put(datas);
                return datas.size();
            }
        } catch (IOException e) {
            e.printStackTrace();
        }
        return 0;
    }
}

4. Test module: wires the three pieces together and starts consuming.

public class Train {
    public static void main(String[] args) {
        IParseRecord handler = new TrainHandler();
        IWriter writer = new HbaseWriter(handler);
        IWorker worker = new Worker("train","train_group",writer);
        worker.fillData("events_db:train");
    }
}
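To exercise the pipeline end to end, sample records can be pushed into the topic by hand and the result checked in the HBase shell. The broker address below matches the one hard-coded in Worker; the exact producer flag depends on your Kafka version (`--broker-list` on older releases, `--bootstrap-server` on newer ones):

```
# Feed test records into the "train" topic, one CSV line per record
kafka-console-producer.sh --bootstrap-server 192.168.91.180:9092 --topic train
> 3044012,1918771225,0,2012-10-02 15:53:05.754000+00:00,0,0

# Verify the rows landed in HBase
hbase shell
> scan 'events_db:train', {LIMIT => 5}
```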

Run the main method and the records from the train topic will be written into events_db:train.

Source: 内存溢出 (https://54852.com/zaji/5700125.html)