
众所周知,kafka中存储的数据是经过BASE64加密后的jsonObject,因此从kafka中读取的数据经过base64解码,得到的是json串,利用JSONObect的方法可以对json串进行解析,拿到对应的数据。那么要如何将scala对象或者java对象转换为JsonObject对象或JSONObject对象呢?(注意:JsonObject对象和JSONObject对象不同,调用的API也不一样)
三种转换方式依赖的包源码都是用JAVA编写,所以构建Map对象时完全使用java对象,不会发生错误。构建过程如下:
三种将java对象转换为jsonObject对象的开源包有:
1、google提供的Genson是一个完全的Java和JSON转换的类库,提供了全面的数据绑定、流 *** 作等。基于Apache 20协议发布。转换结果为
JsonObject对象。
使用需要先导入Jar包:import comgooglegson{Gson, JsonParser}
引入依赖:这里选用版本为:224,具体版本可以根据业务需求选择。
<dependency>
<groupId>comgooglecodegson</groupId>
<artifactId>gson</artifactId>
<version>224</version>
</dependency>
2、Fastjson 是一个 Java 库,可以将 Java 对象转换为 JSON 格式,当然它也可以将 JSON 字符串转换为 Java 对象。
导入jar包:import comalibabafastjsonJSON
引入依赖:
<dependency>
<groupId>comalibaba</groupId>
<artifactId>fastjson</artifactId>
<version>128</version>
</dependency>
3、netsfjson-lib方式
导入jar包:import netsfjsonJSONObject
引入依赖:
<dependency>
<groupId>netsfjson-lib</groupId>
<artifactId>json-lib-ext-spring</artifactId>
<version>102</version>
</dependency>
List list是一种处理一组有序项目的数据结构,也就是说,您可以在一个列表中存储一系列项目。 元组是Scala语言中非常有用的容器对象。元组非常类似于列表,但是元组是不可变的。列表控件可以在四种不同的视图中显示项目,而元组适用于许多场景。List list是一种处理有序项目集的数据结构,也就是说,您可以在一个列表中存储一系列项目。列表中的项目应该用方括号括起来,这样python就知道您在指示一个列表。创建列表后,您可以添加、删除或搜索列表中的项目。由于可以添加或删除项目,所以我们说列表是可变数据类型,也就是说,这种类型是可以改变的。列表可以嵌套。元组非常类似于列表,但是元组是不可变的。也就是不能修改元组。元组由括号中逗号分隔的项定义。元组通常用于使语句或用户定义的函数能够安全地采用一组值,也就是说,所使用的元组的值不会改变。元组可以嵌套。列表控件可以在四种不同的视图中显示项目。您可以将项目分组到有或没有列标题的列中,并显示附带的图标和文本。例如,ListView控件用于将名为ListItem对象的列表项组织成以下四种不同视图之一:1 大(标准)图标2。小图标3。清单4。“报表视图”属性确定控件使用哪个视图来显示列表中的项。比如组件[1]的列表控件,有完整的列表框、多栏下拉列表框、拆分样式等。它可以管理列表中项目的排序方法和选定项目的外观。Tuple是Scala语言中非常有用的容器对象。像列表一样,元组是不可变的;但是与列表不同,元组可以包含不同类型的元素。比如一个list只能写成List[Int]或者List[String],但是一个tuple可以同时有Int和String。元组适用于许多场景,例如,如果您需要在一个方法中返回多个对象。Java中的做法是创建一个已经包含多个返回值的JavaBean,Scala只能返回元组。而且做起来也很简单;把元组实例化需要的对象放在括号里,用逗号隔开就行了。元组实例化后,可以通过点、下划线和基于1的索引来访问其中的元素。
如何使用scala+spark读写Hbase
软件版本如下:
scala2118
spark210
hbase120
公司有一些实时数据处理的项目,存储用的是hbase,提供实时的检索,当然hbase里面存储的数据模型都是简单的,复杂的多维检索的结果是在es里面存储的,公司也正在引入Kylin作为OLAP的数据分析引擎,这块后续有空在研究下。
接着上面说的,hbase存储着一些实时的数据,前两周新需求需要对hbase里面指定表的数据做一次全量的update以满足业务的发展,平时 *** 作hbase都是单条的curd,或者插入一个批量的list,用的都是hbase的java api比较简单,但这次涉及全量update,所以如果再用原来那种单线程的 *** 作api,势必速度回慢上许多。
关于批量 *** 作Hbase,一般我们都会用MapReduce来 *** 作,这样可以大大加快处理效率,原来也写过MR *** 作Hbase,过程比较繁琐,最近一直在用scala做spark的相关开发,所以就直接使用scala+spark来搞定这件事了,当然底层用的还是Hbase的TableOutputFormat和TableOutputFormat这个和MR是一样的,在spark里面把从hbase里面读取的数据集转成rdd了,然后做一些简单的过滤,转化,最终在把结果写入到hbase里面。
整个流程如下:
(1)全量读取hbase表的数据
(2)做一系列的ETL
(3)把全量数据再写回hbase
核心代码如下:
//获取conf
val conf=HBaseConfigurationcreate() //设置读取的表
confset(TableInputFormatINPUT_TABLE,tableName) //设置写入的表
confset(TableOutputFormatOUTPUT_TABLE,tableName)//创建sparkConf
val sparkConf=new SparkConf() //设置spark的任务名
sparkConfsetAppName("read and write for hbase ") //创建spark上下文
val sc=new SparkContext(sparkConf)
//为job指定输出格式和输出表名
val newAPIJobConfiguration1 = JobgetInstance(conf)
newAPIJobConfiguration1getConfiguration()set(TableOutputFormatOUTPUT_TABLE, tableName)
newAPIJobConfiguration1setOutputFormatClass(classOf[TableOutputFormat[ImmutableBytesWritable]])
//全量读取hbase表
val rdd=scnewAPIHadoopRDD(conf,classOf[TableInputFormat]
,classOf[ImmutableBytesWritable]
,classOf[Result]
)
//过滤空数据,然后对每一个记录做更新,并转换成写入的格式
val final_rdd= rddfilter(checkNotEmptyKs)map(forDatas)
//转换后的结果,再次做过滤
val save_rdd=final_rddfilter(checkNull)
//最终在写回hbase表
save_rddsaveAsNewAPIHadoopDataset(newAPIJobConfiguration1getConfiguration)
scstop()
从上面的代码可以看出来,使用spark+scala *** 作hbase是非常简单的。下面我们看一下,中间用到的几个自定义函数:
第一个:checkNotEmptyKs
作用:过滤掉空列簇的数据
def checkNotEmptyKs(f:((ImmutableBytesWritable,Result))):Boolean={ val r=f_2 val rowkey=BytestoString(rgetRow) val map:scalacollectionmutableMap[Array[Byte],Array[Byte]]= rgetFamilyMap(BytestoBytes("ks"))asScala if(mapisEmpty) false else true
}
第二个:forDatas
作用:读取每一条数据,做update后,在转化成写入 *** 作
def forDatas(f: (ImmutableBytesWritable,Result)): (ImmutableBytesWritable,Put)={ val r=f_2 //获取Result
val put:Put=new Put(rgetRow) //声明put
val ks=BytestoBytes("ks") //读取指定列簇
val map:scalacollectionmutableMap[Array[Byte],Array[Byte]]= rgetFamilyMap(ks)asScala
mapforeach(kv=>{//遍历每一个rowkey下面的指定列簇的每一列的数据做转化
val kid= BytestoString(kv_1)//知识点id
var value=BytestoString(kv_2)//知识点的value值
value="修改后的value"
putaddColumn(ks,kv_1,BytestoBytes(value)) //放入put对象
}
) if(putisEmpty) null else (new ImmutableBytesWritable(),put)
}
第三个:checkNull 作用:过滤最终结果里面的null数据
def checkNull(f:((ImmutableBytesWritable,Put))):Boolean={ if(f==null) false else true
}
上面就是整个处理的逻辑了,需要注意的是对hbase里面的无效数据作过滤,跳过无效数据即可,逻辑是比较简单的,代码量也比较少。
除了上面的方式,还有一些开源的框架,也封装了相关的处理逻辑,使得spark *** 作hbase变得更简洁,有兴趣的朋友可以了解下,github链接如下:
大数据技术的体系庞大且复杂,基础的技术包含数据的采集、数据预处理、分布式存储、数据库、数据仓库、机器学习、并行计算、可视化等。
1、数据采集与预处理:FlumeNG实时日志收集系统,支持在日志系统中定制各类数据发送方,用于收集数据;Zookeeper是一个分布式的,开放源码的分布式应用程序协调服务,提供数据同步服务。
2、数据存储:Hadoop作为一个开源的框架,专为离线和大规模数据分析而设计,HDFS作为其核心的存储引擎,已被广泛用于数据存储。HBase,是一个分布式的、面向列的开源数据库,可以认为是hdfs的封装,本质是数据存储、NoSQL数据库。
3、数据清洗:MapReduce作为Hadoop的查询引擎,用于大规模数据集的并行计算。
4、数据查询分析:Hive的核心工作就是把SQL语句翻译成MR程序,可以将结构化的数据映射为一张数据库表,并提供HQL(HiveSQL)查询功能。Spark启用了内存分布数据集,除了能够提供交互式查询外,它还可以优化迭代工作负载。
5、数据可视化:对接一些BI平台,将分析得到的数据进行可视化,用于指导决策服务。
以上就是关于如何构建json串,并将map转为jsonObject对象的三种方式(scala)全部的内容,包括:如何构建json串,并将map转为jsonObject对象的三种方式(scala)、python的元组和列表的区别、如何使用scala+spark读写hbase等相关内容解答,如果想了解更多相关内容,可以关注我们,你们的支持是我们更新的动力!
欢迎分享,转载请注明来源:内存溢出
微信扫一扫
支付宝扫一扫
评论列表(0条)