
parquet文件链接:https://pan.baidu.com/s/1dmugj-ty47Hgi6WLAPaiGQ?pwd=yyds
提取码:yyds
--来自百度网盘超级会员V2的分享
======> 转换后的完整代码如下:
package com.sz.table_ddl.test
import org.apache.spark.SparkConf
import org.apache.spark.sql.{SaveMode, SparkSession}
import org.apache.spark.sql.functions.{struct, to_json}
object col_json {

  /** Entry point: reads a parquet file, projects selected columns with Spark SQL,
    * packs two of them into a single JSON string column via `to_json(struct(...))`,
    * and appends the result to a MySQL table over JDBC.
    */
  def main(args: Array[String]): Unit = {
    // 1. Establish the connection to the Spark framework (local mode, all cores).
    val conf: SparkConf = new SparkConf().setMaster("local[*]").setAppName("SparkSQL")
    val spark: SparkSession = SparkSession.builder().config(conf).getOrCreate()

    // Disable broadcast joins (a threshold of -1 turns the optimization off).
    spark.sql("set spark.sql.autoBroadcastJoinThreshold=-1")

    // 2. Read the parquet file.
    // FIX: backslashes must be escaped in Scala string literals — the original
    // "D:\a\users.parquet" does not compile ("\a" is an illegal escape sequence).
    val parquetDF = spark.read.format("parquet").load("D:\\a\\users.parquet")

    // Register a temporary view so the data can be queried with SQL.
    parquetDF.createOrReplaceTempView("users")
    // Preview the first rows of the parquet data.
    parquetDF.show(5)

    // 3. Compute the projection. Aliasing the columns in SQL makes the alias
    //    names ("color", "numbers") available to to_json/selectExpr below.
    val rs = spark.sql(
      """
        |select
        |s.name as name,
        |s.favorite_color as color,
        |s.favorite_numbers as numbers
        |from users s
        |""".stripMargin)
    rs.show()

    // Pack the aliased columns into one JSON string column.
    // (requires: import org.apache.spark.sql.functions.{struct, to_json})
    val finalDF = rs.withColumn("Newcol", to_json(struct("color", "numbers")))

    // selectExpr picks the wanted columns; show(false) avoids truncating the JSON.
    finalDF.selectExpr("Newcol", "numbers").show(false)

    // Write the result to MySQL over JDBC.
    // NOTE(review): the target table tab_json02 must already exist; the column
    // type for `numbers` was never tested by the original author.
    finalDF.selectExpr("Newcol", "numbers").write.format("jdbc")
      // The four required JDBC options: url, dbtable, user, password.
      .option("url", "jdbc:mysql://localhost:3306/names?&useUnicode=true&characterEncoding=utf8")
      .option("dbtable", "tab_json02")
      .option("user", "root")
      .option("password", "root")
      // Use SaveMode.Overwrite instead to replace the table rather than append.
      .mode(SaveMode.Append)
      .save()

    // Release Spark resources.
    spark.stop()
  }
}
3. 代码运行结果
参考:
Spark SQL指定特定的列转换为Json_呼呼的小窝-CSDN博客_spark sql 结果转为json
欢迎分享,转载请注明来源:内存溢出
微信扫一扫
支付宝扫一扫
评论列表(0条)