
import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}
/**
 * Walkthrough of basic Spark RDD operators on small in-memory collections.
 *
 * First exercises set-like operators (union, distinct, filter, subtract,
 * intersection, cartesian) on two integer RDDs, then key/value operators
 * (cogroup and the inner / left / right / full outer joins) on two pair RDDs.
 * Every result is written to standard output.
 */
object Demo01 {

  /**
   * Creates a SparkContext on the "local" master, runs the walkthrough and
   * always stops the context afterwards.
   *
   * @param args command-line arguments; not used
   */
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("stone").setMaster("local")
    val sc = new SparkContext(conf)
    try {
      sc.setLogLevel("ERROR")
      demoSetOperators(sc)
      demoPairOperators(sc)
    } finally {
      // Release the context even when one of the jobs above fails.
      sc.stop()
    }
  }

  /**
   * Prints every element of `rdd` on its own line.
   *
   * The elements are collected to the driver first so the output appears on
   * the driver's console regardless of where the tasks ran. Only suitable for
   * small RDDs such as the ones in this demo.
   */
  private def printLines[T](rdd: RDD[T]): Unit =
    rdd.collect().foreach(println)

  /**
   * Prints all elements of `rdd` back to back on a single line and ends the
   * line, so the following banner starts on a fresh line.
   */
  private def printInline[T](rdd: RDD[T]): Unit =
    println(rdd.collect().mkString)

  /** Set-like operators on two RDD[Int] built from in-memory lists. */
  private def demoSetOperators(sc: SparkContext): Unit = {
    val v1 = sc.parallelize(List(1, 2, 3, 4))
    val v2 = sc.parallelize(List(6, 5, 3, 4))
    printLines(v1)
    printLines(v2)

    // union keeps duplicates; distinct removes them.
    val merged = v1.union(v2)
    println("--------------union-----------")
    printLines(merged)

    val unique = merged.distinct()
    println("--------------distinct-----------")
    printLines(unique)

    println("--------------fileter-----------")
    printInline(unique.filter(_ >= 3))

    // Banner text means "difference": elements of v1 that are not in v2.
    println("--------------差集-----------")
    printInline(v1.subtract(v2))

    // Banner text means "intersection": elements present in both RDDs.
    println("--------------交集-----------")
    printInline(v1.intersection(v2))

    // "dikaer" = Cartesian product: every (a, b) pair with a from v1, b from v2.
    println("--------------dikaer-----------")
    printInline(v1.cartesian(v2))

    // Number of partitions backing v1.
    println(v1.partitions.length)
  }

  /** Key/value operators (cogroup and joins) on two RDD[(String, Int)]. */
  private def demoPairOperators(sc: SparkContext): Unit = {
    println("--------------=====-----------")
    val kv1: RDD[(String, Int)] = sc.parallelize(List(
      ("zhang", 11),
      ("zhangsan", 12),
      ("lisi", 13),
      ("wangwu", 14)
    ))
    val kv2: RDD[(String, Int)] = sc.parallelize(List(
      ("zhan", 21),
      ("zhang", 22),
      ("lisi", 23),
      ("zhaoliu", 28)
    ))

    // cogroup: for each key, the values from kv1 and the values from kv2.
    val grouped: RDD[(String, (Iterable[Int], Iterable[Int]))] = kv1.cogroup(kv2)
    printLines(grouped)

    // Inner join: only keys present on both sides.
    val inner: RDD[(String, (Int, Int))] = kv1.join(kv2)
    printLines(inner)

    // Left outer join: every key of kv1; the kv2 side is optional.
    println("--------------=====-----------")
    val leftOuter: RDD[(String, (Int, Option[Int]))] = kv1.leftOuterJoin(kv2)
    printLines(leftOuter)

    // Right outer join: every key of kv2; the kv1 side is optional.
    println("--------------=====-----------")
    val rightOuter: RDD[(String, (Option[Int], Int))] = kv1.rightOuterJoin(kv2)
    printLines(rightOuter)

    // Full outer join: keys from either side; both sides are optional.
    println("--------------=====-----------")
    val fullOuter: RDD[(String, (Option[Int], Option[Int]))] = kv1.fullOuterJoin(kv2)
    printLines(fullOuter)
  }
}
// Sharing is welcome; when reposting, please credit the source: 内存溢出
// Scan with WeChat
// Scan with Alipay
// Comments (0)