如何构建json串,并将map转为jsonObject对象的三种方式(scala)

如何构建json串,并将map转为jsonObject对象的三种方式(scala),第1张

众所周知,kafka中存储的数据是经过BASE64加密后的jsonObject,因此从kafka中读取的数据经过base64解码,得到的是json串,利用JSONObect的方法可以对json串进行解析,拿到对应的数据。那么要如何将scala对象或者java对象转换为JsonObject对象或JSONObject对象呢?(注意:JsonObject对象和JSONObject对象不同,调用的API也不一样)

三种转换方式依赖的包源码都是用JAVA编写,所以构建Map对象时完全使用java对象,不会发生错误。构建过程如下:

三种将java对象转换为jsonObject对象的开源包有:

1、google提供的Genson是一个完全的Java和JSON转换的类库,提供了全面的数据绑定、流 *** 作等。基于Apache 20协议发布。转换结果为

JsonObject对象。

使用需要先导入Jar包:import comgooglegson{Gson, JsonParser}

引入依赖:这里选用版本为:224,具体版本可以根据业务需求选择。

<dependency>

<groupId>comgooglecodegson</groupId>

<artifactId>gson</artifactId>

<version>224</version>

</dependency>

2、Fastjson 是一个 Java 库,可以将 Java 对象转换为 JSON 格式,当然它也可以将 JSON 字符串转换为 Java 对象。

导入jar包:import comalibabafastjsonJSON

引入依赖:

<dependency>

<groupId>comalibaba</groupId>

<artifactId>fastjson</artifactId>

<version>128</version>

</dependency>

3、netsfjson-lib方式

导入jar包:import netsfjsonJSONObject

引入依赖:

<dependency>

<groupId>netsfjson-lib</groupId>

<artifactId>json-lib-ext-spring</artifactId>

<version>102</version>

</dependency>

List list是一种处理一组有序项目的数据结构,也就是说,您可以在一个列表中存储一系列项目。 元组是Scala语言中非常有用的容器对象。元组非常类似于列表,但是元组是不可变的。列表控件可以在四种不同的视图中显示项目,而元组适用于许多场景。List list是一种处理有序项目集的数据结构,也就是说,您可以在一个列表中存储一系列项目。列表中的项目应该用方括号括起来,这样python就知道您在指示一个列表。创建列表后,您可以添加、删除或搜索列表中的项目。由于可以添加或删除项目,所以我们说列表是可变数据类型,也就是说,这种类型是可以改变的。列表可以嵌套。元组非常类似于列表,但是元组是不可变的。也就是不能修改元组。元组由括号中逗号分隔的项定义。元组通常用于使语句或用户定义的函数能够安全地采用一组值,也就是说,所使用的元组的值不会改变。元组可以嵌套。列表控件可以在四种不同的视图中显示项目。您可以将项目分组到有或没有列标题的列中,并显示附带的图标和文本。例如,ListView控件用于将名为ListItem对象的列表项组织成以下四种不同视图之一:1 大(标准)图标2。小图标3。清单4。“报表视图”属性确定控件使用哪个视图来显示列表中的项。比如组件[1]的列表控件,有完整的列表框、多栏下拉列表框、拆分样式等。它可以管理列表中项目的排序方法和选定项目的外观。Tuple是Scala语言中非常有用的容器对象。像列表一样,元组是不可变的;但是与列表不同,元组可以包含不同类型的元素。比如一个list只能写成List[Int]或者List[String],但是一个tuple可以同时有Int和String。元组适用于许多场景,例如,如果您需要在一个方法中返回多个对象。Java中的做法是创建一个已经包含多个返回值的JavaBean,Scala只能返回元组。而且做起来也很简单;把元组实例化需要的对象放在括号里,用逗号隔开就行了。元组实例化后,可以通过点、下划线和基于1的索引来访问其中的元素。

如何使用scala+spark读写Hbase

软件版本如下:

scala2118

spark210

hbase120

公司有一些实时数据处理的项目,存储用的是hbase,提供实时的检索,当然hbase里面存储的数据模型都是简单的,复杂的多维检索的结果是在es里面存储的,公司也正在引入Kylin作为OLAP的数据分析引擎,这块后续有空在研究下。

接着上面说的,hbase存储着一些实时的数据,前两周新需求需要对hbase里面指定表的数据做一次全量的update以满足业务的发展,平时 *** 作hbase都是单条的curd,或者插入一个批量的list,用的都是hbase的java api比较简单,但这次涉及全量update,所以如果再用原来那种单线程的 *** 作api,势必速度回慢上许多。

关于批量 *** 作Hbase,一般我们都会用MapReduce来 *** 作,这样可以大大加快处理效率,原来也写过MR *** 作Hbase,过程比较繁琐,最近一直在用scala做spark的相关开发,所以就直接使用scala+spark来搞定这件事了,当然底层用的还是Hbase的TableOutputFormat和TableOutputFormat这个和MR是一样的,在spark里面把从hbase里面读取的数据集转成rdd了,然后做一些简单的过滤,转化,最终在把结果写入到hbase里面。

整个流程如下:

(1)全量读取hbase表的数据

(2)做一系列的ETL

(3)把全量数据再写回hbase

核心代码如下:

//获取conf

val conf=HBaseConfigurationcreate() //设置读取的表

confset(TableInputFormatINPUT_TABLE,tableName) //设置写入的表

confset(TableOutputFormatOUTPUT_TABLE,tableName)//创建sparkConf

val sparkConf=new SparkConf() //设置spark的任务名

sparkConfsetAppName("read and write for hbase ") //创建spark上下文

val sc=new SparkContext(sparkConf)

//为job指定输出格式和输出表名

val newAPIJobConfiguration1 = JobgetInstance(conf)

newAPIJobConfiguration1getConfiguration()set(TableOutputFormatOUTPUT_TABLE, tableName)

newAPIJobConfiguration1setOutputFormatClass(classOf[TableOutputFormat[ImmutableBytesWritable]])

//全量读取hbase表

val rdd=scnewAPIHadoopRDD(conf,classOf[TableInputFormat]

,classOf[ImmutableBytesWritable]

,classOf[Result]

)

//过滤空数据,然后对每一个记录做更新,并转换成写入的格式

val final_rdd= rddfilter(checkNotEmptyKs)map(forDatas)

//转换后的结果,再次做过滤

val save_rdd=final_rddfilter(checkNull)

//最终在写回hbase表

save_rddsaveAsNewAPIHadoopDataset(newAPIJobConfiguration1getConfiguration)

scstop()

从上面的代码可以看出来,使用spark+scala *** 作hbase是非常简单的。下面我们看一下,中间用到的几个自定义函数:

第一个:checkNotEmptyKs

作用:过滤掉空列簇的数据

def checkNotEmptyKs(f:((ImmutableBytesWritable,Result))):Boolean={ val r=f_2 val rowkey=BytestoString(rgetRow) val map:scalacollectionmutableMap[Array[Byte],Array[Byte]]= rgetFamilyMap(BytestoBytes("ks"))asScala if(mapisEmpty) false else true

}

第二个:forDatas

作用:读取每一条数据,做update后,在转化成写入 *** 作

def forDatas(f: (ImmutableBytesWritable,Result)): (ImmutableBytesWritable,Put)={ val r=f_2 //获取Result

val put:Put=new Put(rgetRow) //声明put

val ks=BytestoBytes("ks") //读取指定列簇

val map:scalacollectionmutableMap[Array[Byte],Array[Byte]]= rgetFamilyMap(ks)asScala

mapforeach(kv=>{//遍历每一个rowkey下面的指定列簇的每一列的数据做转化

val kid= BytestoString(kv_1)//知识点id

var value=BytestoString(kv_2)//知识点的value值

value="修改后的value"

putaddColumn(ks,kv_1,BytestoBytes(value)) //放入put对象

}

) if(putisEmpty) null else (new ImmutableBytesWritable(),put)

}

第三个:checkNull 作用:过滤最终结果里面的null数据

def checkNull(f:((ImmutableBytesWritable,Put))):Boolean={ if(f==null) false else true

}

上面就是整个处理的逻辑了,需要注意的是对hbase里面的无效数据作过滤,跳过无效数据即可,逻辑是比较简单的,代码量也比较少。

除了上面的方式,还有一些开源的框架,也封装了相关的处理逻辑,使得spark *** 作hbase变得更简洁,有兴趣的朋友可以了解下,github链接如下:

大数据技术的体系庞大且复杂,基础的技术包含数据的采集、数据预处理、分布式存储、数据库、数据仓库、机器学习、并行计算、可视化等。

1、数据采集与预处理:FlumeNG实时日志收集系统,支持在日志系统中定制各类数据发送方,用于收集数据;Zookeeper是一个分布式的,开放源码的分布式应用程序协调服务,提供数据同步服务。

2、数据存储:Hadoop作为一个开源的框架,专为离线和大规模数据分析而设计,HDFS作为其核心的存储引擎,已被广泛用于数据存储。HBase,是一个分布式的、面向列的开源数据库,可以认为是hdfs的封装,本质是数据存储、NoSQL数据库。

3、数据清洗:MapReduce作为Hadoop的查询引擎,用于大规模数据集的并行计算。

4、数据查询分析:Hive的核心工作就是把SQL语句翻译成MR程序,可以将结构化的数据映射为一张数据库表,并提供HQL(HiveSQL)查询功能。Spark启用了内存分布数据集,除了能够提供交互式查询外,它还可以优化迭代工作负载。

5、数据可视化:对接一些BI平台,将分析得到的数据进行可视化,用于指导决策服务。

以上就是关于如何构建json串,并将map转为jsonObject对象的三种方式(scala)全部的内容,包括:如何构建json串,并将map转为jsonObject对象的三种方式(scala)、python的元组和列表的区别、如何使用scala+spark读写hbase等相关内容解答,如果想了解更多相关内容,可以关注我们,你们的支持是我们更新的动力!

欢迎分享,转载请注明来源:内存溢出

原文地址:https://54852.com/web/10080779.html

(0)
打赏 微信扫一扫微信扫一扫 支付宝扫一扫支付宝扫一扫
上一篇 2023-05-05
下一篇2023-05-05

发表评论

登录后才能评论

评论列表(0条)

    保存