6.1.8 HBase

1. MapReduce can operate on HBase: write the MapReduce job in Java, package it as a jar, and run it on Hadoop.

Each map task corresponds to one region. HBase data cannot be split directly on HDFS (part of it may still sit in the MemStore), so the job does a full table scan, using a Scan to fetch the data. The map input key is the row key; the value is a Result holding all cells of one row.
You cannot read the data with TextInputFormat; use TableInputFormat to connect to HBase and read it.
To write results to HDFS use TextOutputFormat; to write them back to HBase use TableOutputFormat.

2. Maven setup: add the hbase-server dependency, and also the plugins below. Without them the Java MR job cannot run: the plugin configuration in pom.xml is not picked up, and the HBase classes needed to read HBase data directly are not bundled into the jar.

        <dependency>
            <groupId>org.apache.hbase</groupId>
            <artifactId>hbase-server</artifactId>
            <version>1.4.6</version>
        </dependency>

        <plugin>
            <groupId>org.apache.maven.plugins</groupId>
            <artifactId>maven-compiler-plugin</artifactId>
            <version>3.1</version>
            <configuration>
                <source>1.8</source>
                <target>1.8</target>
            </configuration>
        </plugin>

        <plugin>
            <artifactId>maven-assembly-plugin</artifactId>
            <configuration>
                <descriptorRefs>
                    <descriptorRef>jar-with-dependencies</descriptorRef>
                </descriptorRefs>
            </configuration>
            <executions>
                <execution>
                    <id>make-assembly</id>
                    <phase>package</phase>
                    <goals>
                        <goal>single</goal>
                    </goals>
                </execution>
            </executions>
        </plugin>

3. Examples

Example 1: count the number of students in each class and write the result to HDFS.

//Count the number of students in each class and write the result to HDFS
import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
import org.apache.hadoop.hbase.mapreduce.TableMapper;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class Demo01MrRead {

    public static class MrMap extends TableMapper<Text, IntWritable> {
        @Override
        protected void map(ImmutableBytesWritable key, Result value, Context context) throws IOException, InterruptedException {
            //key: the row key; value: a Result holding every cell of one row
            String rowKey = Bytes.toString(key.get());
            String clazz = Bytes.toString(value.getValue("info".getBytes(), "clazz".getBytes()));
            context.write(new Text(clazz), new IntWritable(1));
        }
    }

    public static class MrReduce extends Reducer<Text, IntWritable, Text, IntWritable> {
        @Override
        protected void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {
            //Same contract as plain MapReduce: the map output k-v is the reduce input k-v
            //key: the clazz emitted by the map; values: the ones to sum into the class size
            int sum = 0;
            for (IntWritable value : values) {
                sum += value.get();
            }
            context.write(key, new IntWritable(sum));
        }
    }

    public static void main(String[] args) throws Exception {
        //new Configuration() would also work here
        Configuration conf = HBaseConfiguration.create();
        //1. Create a Job
        Job job = Job.getInstance(conf);
        job.setJobName("Hbase_mr1 students per class");
        //2. Set the job's jar
        job.setJarByClass(Demo01MrRead.class);
        //3. Set the map task with the TableMapReduceUtil helper class:
        //   since the input comes from HBase, configure the table to scan;
        //   the new Scan() can also carry filters and other settings
        TableMapReduceUtil.initTableMapperJob(TableName.valueOf("students"),
                new Scan(), MrMap.class, Text.class, IntWritable.class, job);
        //4. Set the reduce task
        job.setReducerClass(MrReduce.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);
        //5. Set the output path
        FileOutputFormat.setOutputPath(job, new Path("/hbaseMr/clazz_num"));
        //6. Run
        job.waitForCompletion(true);
    }
}
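What the mapper and reducer compute together can be sketched in plain Java, without the Hadoop runtime: the map step emits a (clazz, 1) pair per row, the shuffle groups pairs by class, and the reduce step sums the ones. The class names below are made-up sample data, not values from the real students table.

```java
import java.util.*;

public class ClazzCountSketch {
    // map step: each row contributes (clazz, 1); shuffle groups by clazz;
    // reduce step: sum the ones per class
    public static Map<String, Integer> countByClazz(List<String> clazzColumn) {
        Map<String, Integer> counts = new TreeMap<>();
        for (String clazz : clazzColumn) {
            counts.merge(clazz, 1, Integer::sum);
        }
        return counts;
    }

    public static void main(String[] args) {
        List<String> rows = Arrays.asList("class1", "class2", "class1", "class1");
        System.out.println(countByClazz(rows)); // prints {class1=3, class2=1}
    }
}
```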

Example 2: count the number of students in each class and write the result back to HBase. Create the output table first, then use TableMapReduceUtil to point the reducer's output at the HBase table.

//Count the number of students in each class and store the result in an HBase table
import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
import org.apache.hadoop.hbase.mapreduce.TableMapper;
import org.apache.hadoop.hbase.mapreduce.TableReducer;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;

public class Demo02MrRead {

    public static class MrMapper extends TableMapper<Text, IntWritable> {
        @Override
        protected void map(ImmutableBytesWritable key, Result value, Context context) throws IOException, InterruptedException {
            String clazz = Bytes.toString(value.getValue("info".getBytes(), "clazz".getBytes()));
            context.write(new Text(clazz), new IntWritable(1));
        }
    }

    public static class MrReduce extends TableReducer<Text, IntWritable, NullWritable> {
        @Override
        protected void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {
            int count = 0;
            for (IntWritable value : values) {
                count += value.get();
            }
            //Use the class name (the reduce key) as the row key.
            //Note: Text.getBytes() returns the padded backing array, so convert via toString()
            Put put = new Put(Bytes.toBytes(key.toString()));
            put.addColumn("info".getBytes(), "num".getBytes(), Bytes.toBytes(count));
            context.write(NullWritable.get(), put);
        }
    }

    public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
        Configuration conf = HBaseConfiguration.create();
        conf.set("hbase.zookeeper.quorum", "master,node1,node2:2181");
        Job job = Job.getInstance(conf);
        job.setJarByClass(Demo02MrRead.class);

        TableMapReduceUtil.initTableMapperJob("students",
                new Scan(), MrMapper.class,
                Text.class,
                IntWritable.class,
                job);

        //The output table mr_res must already exist before the job runs
        TableMapReduceUtil.initTableReducerJob(
                "mr_res",
                MrReduce.class,
                job
        );

        job.waitForCompletion(true);
    }
}
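One detail worth noting in the reducer above: Bytes.toBytes(count) stores the count as a 4-byte big-endian integer, not as a printable string, which is why a shell `get` on mr_res shows the value as hex. A minimal sketch of that encoding using only the JDK (ByteBuffer defaults to big-endian, which matches HBase's Bytes utility):

```java
import java.nio.ByteBuffer;
import java.util.Arrays;

public class IntBytesSketch {
    // What Bytes.toBytes(int) produces: a 4-byte big-endian encoding
    public static byte[] toBytes(int n) {
        return ByteBuffer.allocate(4).putInt(n).array();
    }

    // The inverse, as Bytes.toInt would decode it
    public static int toInt(byte[] b) {
        return ByteBuffer.wrap(b).getInt();
    }

    public static void main(String[] args) {
        byte[] b = toBytes(87); // e.g. a class of 87 students
        System.out.println(Arrays.toString(b)); // prints [0, 0, 0, 87]
        System.out.println(toInt(b)); // prints 87
    }
}
```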

Original source: https://54852.com/zaji/3982492.html