
Notes:
- A JSON file can be read in either of two ways (both appear in the example in section 1 below).
- df.show() displays the first 20 rows by default.
- The native DataFrame API can also be used to operate on a DataFrame directly, without registering a view (a short sketch follows this list).
- When a DataFrame is registered as a temporary view, its columns are displayed in ASCII order by default.
- A DataFrame can be viewed as an RDD whose elements are of type Row.
The view-registration methods (a global temp view must be queried through the global_temp database):
df.createTempView("mytable")
df.createOrReplaceTempView("mytable")
df.createGlobalTempView("mytable")
df.createOrReplaceGlobalTempView("mytable")
session.sql("select * from global_temp.mytable").show()
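A minimal sketch illustrating the notes above. It assumes the JSON file at data/jsondata from section 1 below; the object name DataFrameApiSketch is illustrative and not from the original, while the view name mytable follows the note above.

package sparkSql

import org.apache.spark.sql.{DataFrame, SparkSession}

object DataFrameApiSketch {
  def main(args: Array[String]): Unit = {
    val session = SparkSession.builder().master("local").appName("sketch").getOrCreate()
    session.sparkContext.setLogLevel("Error")
    val df: DataFrame = session.read.json("data/jsondata")

    // Native DataFrame API: no temporary view is required
    df.select("name", "age").where("age is not null").show() // show() prints 20 rows by default
    df.show(5)                                                // pass a number to show more or fewer rows

    // Session-scoped view: visible only to this SparkSession
    df.createOrReplaceTempView("mytable")
    session.sql("select * from mytable").show()

    // Global view: shared across SparkSessions, queried through the global_temp database
    df.createOrReplaceGlobalTempView("mytable")
    session.sql("select * from global_temp.mytable").show()
  }
}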
1. Read a JSON file to create a DataFrame
JSON data source:
{"name":"科比","age":24}
{"name":"詹姆斯","age":23}
{"name":"杜兰特","age":25}
{"name":"保罗","age":26}
{"name":"库里","age":27}
{"name":"加索尔","age":28}
{"name":"朗多","age":29}
{"name":"皮尔斯"}
{"name":"雷阿伦"}
{"name":"奥多姆"}
{"name":"拜纳姆","age":24}
{"name":"科比","age":24}
{"name":"詹姆斯","age":23}
{"name":"杜兰特","age":25}
{"name":"保罗","age":26}
{"name":"库里","age":27}
{"name":"加索尔","age":28}
{"name":"朗多","age":29}
{"name":"皮尔斯"}
{"name":"雷阿伦"}
{"name":"奥多姆"}
{"name":"拜纳姆","age":24}
package sparkSql

import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{Row, SparkSession}

object ReadJsonDataToDF2 {
  def main(args: Array[String]): Unit = {
    val session = SparkSession.builder().master("local").appName("name").getOrCreate()
    session.sparkContext.setLogLevel("Error")
    // Way 1: read the JSON file directly
    // val frame = session.read.json("data/jsondata")
    // Way 2: specify the format explicitly, then load
    val frame = session.read.format("json").load("data/jsondata")
    frame.createTempView("t")
    val rdd: RDD[Row] = session.sql("select name,age from t where age is not null").rdd
    rdd.foreach(row => {
      val name = row.getAs[String]("name")
      val age = row.getAs[Long]("age")
      println(s"name:$name,age:$age")
    })
    // rdd.foreach(println)
    // frame.show()
  }
}
2. Create a DataFrame from a JSON-format RDD
package sparkSql

import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{DataFrame, Dataset, SparkSession}

object ReadJsonRDDToDF {
  def main(args: Array[String]): Unit = {
    val session = SparkSession.builder().master("local").appName("name").getOrCreate()
    session.sparkContext.setLogLevel("Error")
    val jsonArr = Array[String](
      """{"name":"科比","age":24}""",
      """{"name":"詹姆斯","age":23}""",
      """{"name":"杜兰特","age":35}""",
      """{"name":"保罗","age":3}"""
    )
    import session.implicits._
    // Convert the local collection to a Dataset[String] and let Spark parse it as JSON
    val jsonDataSet: Dataset[String] = jsonArr.toList.toDS()
    val frame = session.read.json(jsonDataSet)
    frame.createTempView("t")
    session.sql("select name,age from t").show()
    // Alternative: parallelize to an RDD[String] and read it as JSON
    // (this overload of read.json is deprecated in newer Spark versions)
    // val context = session.sparkContext
    // val jsonRDD: RDD[String] = context.parallelize(jsonArr)
    // val frame: DataFrame = session.read.json(jsonRDD)
    // frame.show()
  }
}
3. Create a DataFrame from a non-JSON RDD
1. Convert a non-JSON RDD to a DataFrame using reflection (not recommended)
Data source:
1,科比,24,99
2,詹姆斯,6,100
3,杜兰特,35,100
4,哈登,13,80
5,乔丹,23,90
package sparkSql

import org.apache.spark.SparkContext
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{DataFrame, SparkSession}

// The case class supplies column names and types via reflection
case class PersonInfo(id: Int, name: String, num: Int, score: Int)

object ReadRDDToDF {
  def main(args: Array[String]): Unit = {
    val session: SparkSession = SparkSession.builder().appName("test").master("local").getOrCreate()
    val context: SparkContext = session.sparkContext
    context.setLogLevel("Error")
    val person: RDD[String] = context.textFile("data/person")
    val personRDD: RDD[PersonInfo] = person.map(line => {
      val arr: Array[String] = line.split(",")
      val id: Int = arr(0).toInt
      val name: String = arr(1)
      val num: Int = arr(2).toInt
      val score: Int = arr(3).toInt
      PersonInfo(id, name, num, score)
    })
    import session.implicits._
    val frame: DataFrame = personRDD.toDF()
    frame.createTempView("t")
    val frame1: DataFrame = session.sql("select id,name,num,score from t")
    frame1.show()
    frame1.printSchema()
  }
}
2. Convert a non-JSON RDD to a DataFrame by dynamically creating a schema
package sparkSql

import org.apache.spark.SparkContext
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.types.{DataTypes, StructField, StructType}
import org.apache.spark.sql.{DataFrame, Row, SparkSession}

object ReadRDDtoDF2 {
  def main(args: Array[String]): Unit = {
    val session: SparkSession = SparkSession.builder().master("local").appName("test1").getOrCreate()
    val context: SparkContext = session.sparkContext
    context.setLogLevel("Error")
    val personRDD: RDD[String] = context.textFile("data/person")
    // Map each text line to a Row; the field order must match the schema below
    val rowRDD: RDD[Row] = personRDD.map(line => {
      val arr: Array[String] = line.split(",")
      val id: Int = arr(0).toInt
      val name: String = arr(1)
      val num: Int = arr(2).toInt
      val score: Double = arr(3).toDouble
      Row(id, name, num, score)
    })
    // Build the schema dynamically
    val struct = StructType(List[StructField](
      StructField("id", DataTypes.IntegerType, true),
      StructField("name", DataTypes.StringType, true),
      StructField("num", DataTypes.IntegerType, true),
      StructField("score", DataTypes.DoubleType, true)
    ))
    val frame: DataFrame = session.createDataFrame(rowRDD, struct)
    frame.show()
    frame.printSchema()
    frame.createTempView("t")
    val frame1: DataFrame = session.sql(
      """
        |select id,name,num,score from t
        |""".stripMargin)
    frame1.show()
  }
}
4. Read a parquet file to create a DataFrame
package sparkSql

import org.apache.spark.SparkContext
import org.apache.spark.sql.{DataFrame, SaveMode, SparkSession}

object ReadParquetFileToDF {
  def main(args: Array[String]): Unit = {
    val session: SparkSession = SparkSession.builder().appName("ttt").master("local").getOrCreate()
    val context: SparkContext = session.sparkContext
    context.setLogLevel("Error")
    // First write the JSON data out as parquet, then read it back
    val frame: DataFrame = session.read.json("data/jsondata")
    frame.write.mode(SaveMode.Overwrite).parquet("data/parquet")
    val frame1: DataFrame = session.read.parquet("data/parquet")
    frame1.show(22)
    println(frame1.count())
    // frame1.write.json("data/json")
  }
}
5. Read CSV data to load a DataFrame
CSV data source:
id,name,age,score
1,科比,40,100
2,詹姆斯,37,100
3,乔丹,55,100
4,杜兰特,33,99
5,库里,34,99
package sparkSql

import org.apache.spark.sql.{DataFrame, SparkSession}

object ReadCSVDataToDF {
  def main(args: Array[String]): Unit = {
    val session: SparkSession = SparkSession.builder().master("local").appName("eee").getOrCreate()
    session.sparkContext.setLogLevel("Error")
    // "header" -> true tells Spark to use the first line as column names
    val frame: DataFrame = session.read.option("header", true).csv("data/data.csv")
    frame.show()
  }
}
6. Load a DataFrame from a Dataset of tuples
The data source is large, so it is not pasted here.
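From the parsing code below, each line is assumed to contain seven tab-separated fields: ip, local, date, ts, uid, site, operator. A purely hypothetical (made-up) record might look like the following; the real file is not shown in the original.

192.168.80.1	beijing	2020-03-23	1584945600	uid_0001	www.taobao.com	chrome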
package sparkSql

import org.apache.spark.sql.{DataFrame, Dataset, SparkSession}

object ReadTupleDatasetToDF {
  def main(args: Array[String]): Unit = {
    val session = SparkSession.builder().appName("rrr").master("local").getOrCreate()
    session.sparkContext.setLogLevel("Error")
    val dt: Dataset[String] = session.read.textFile("data/pvuvdata")
    import session.implicits._
    // Split each tab-separated line into a 7-field tuple
    val value: Dataset[(String, String, String, String, String, String, String)] = dt.map(line => {
      val arr: Array[String] = line.split("\t")
      val ip = arr(0)
      val local = arr(1)
      val date = arr(2)
      val ts = arr(3)
      val uid = arr(4)
      val site = arr(5)
      val operator = arr(6)
      (ip, local, date, ts, uid, site, operator)
    })
    val frame: DataFrame = value.toDF("ip", "local", "date", "ts", "uid", "site", "operator")
    frame.createTempView("t")
    // PV: total visits per site
    session.sql(
      """
        |select site,count(*) as site_count from t group by site order by site_count
        |""".stripMargin).show()
    // UV: distinct visitors (by ip) per site
    session.sql(
      """
        |select site,count(*) uv from (select distinct ip,site from t) t1 group by site order by uv
        |""".stripMargin).show()
  }
}