
演示的日志文件为 op.log,内容为
按照日志格式进行切割
val conf = new SparkConf().set("spark.testing.memory","2147480000").setMaster("local[*]").setAppName("sparkDemo1")
val sc = SparkContext.getOrCreate(conf)
val spark = SparkSession.builder().appName("SparkJson").master("local[*]").getOrCreate()
import spark.implicits._
import org.apache.spark.sql.functions._
val rdd: RDD[String] = sc.textFile("in/op.log")
val df: Dataframe = rdd.map(x=>x.split('|')).map(x=>(x(0),x(1))).toDF()
// df.printSchema()
// df.show()
//修改字段名
val df2: Dataframe = df.withColumnRenamed("_1","id").withColumnRenamed("_2","value")
// df2.printSchema()
// df2.show()
接下来查看每个字段里面的每一个字段代表的内容
这是json解析后的内容,先查询value下面的ap,cm,et
//查询value下第一层字段内容,
val df3: Dataframe = df2.select($"id", get_json_object($"value", "$.ap").as("ap"),
get_json_object($"value", "$.cm").as("cm"),
get_json_object($"value", "$.et").as("et"))
df3.printSchema()
df3.show(2,false)
接着查询cm下面的所有内容,因为ap里面就一个,就无需查询
//查询df3下,cm字段里面的所有内容
val df4: Dataframe = df3.select($"id", $"ap", get_json_object($"cm", "$.ln").as("ln"), get_json_object($"cm", "$.sv").as("sv"),
get_json_object($"cm", "$.os").as("os"), get_json_object($"cm", "$.g").as("g"), get_json_object($"cm", "$.mid").as("mid"),
get_json_object($"cm", "$.nw").as("nw"), get_json_object($"cm", "$.l").as("l"), get_json_object($"cm", "$.vc").as("vc"),
get_json_object($"cm", "$.hw").as("hw"), get_json_object($"cm", "$.ar").as("ar"), get_json_object($"cm", "$.uid").as("uid"),
get_json_object($"cm", "$.t").as("t"), get_json_object($"cm", "$.la").as("la"), get_json_object($"cm", "$.md").as("md"),
get_json_object($"cm", "$.vn").as("vn"), get_json_object($"cm", "$.ba").as("ba"), get_json_object($"cm", "$.sr").as("sr"), $"et")
df4.printSchema()
df4.show()
接下来处理et,et是个数组,最好进行单独处理
//因为et类型比较复杂,这里可以用复杂类型进行schema应用,这里用from_json方法取字段
val schema: ArrayType = ArrayType(StructType(StructField("ett",StringType)::StructField("en",StringType)::StructField("kv",StringType)::Nil))
val df6: Dataset[Row] = df5.select(from_json($"et",schema).as("et"))
df6.printSchema()
df6.show(2,false)
//此时et输出来是一个二维数组,下面进行降维
val df7: Dataframe = df6.withColumn("et",explode(col("et")))
df7.printSchema()
df7.show(2,false)
df7.select($"et.ett",$"et.en",$"et.kv").show()
最后综合处理
//综合处理
val df8: Dataframe = df4.select($"id", $"ap", $"ln", $"sv", $"os",
$"g", $"mid", $"nw", $"l",
$"vc", $"hw", $"ar", $"uid",
$"t", $"la", $"md", $"vn",
$"ba", $"sr", from_json($"et", schema).as("et"))
.withColumn("et", explode(col("et")))
.select($"id", $"ap", $"ln", $"sv", $"os",
$"g", $"mid", $"nw", $"l",
$"vc", $"hw", $"ar", $"uid",
$"t", $"la", $"md", $"vn",
$"ba", $"sr", $"et.ett", $"et.en", $"et.kv")
df8.printSchema()
df8.show(2)
欢迎分享,转载请注明来源:内存溢出
微信扫一扫
支付宝扫一扫
评论列表(0条)