
- 前言
- 一、样例数据
- 二、样例源码
- 三、总结
- 思路
- 问题
前言
以一个小例子:统计每个品类下最受欢迎的sku,来记录下spark分组排序的思路。
此代码是参考多易教育spark课程编写
一、样例数据
cate1,sku01 cate1,sku02 cate1,sku01 cate1,sku01 cate1,sku03 cate1,sku04 cate1,sku04 cate2,sku05 cate2,sku06 cate2,sku07 cate2,sku05 cate2,sku07 cate2,sku07 cate2,sku08 cate2,sku09 cate2,sku07 cate2,sku05 cate3,sku10 cate3,sku11 cate3,sku11 cate3,sku11 cate3,sku12 cate3,sku12 cate3,sku12二、样例源码
object Demo1 {
def main(args: Array[String]): Unit = {
val conf = new SparkConf().setMaster("local[*]").setAppName("aa")
val sc = new SparkContext(conf)
// 1. 加载数据
val rdd1 = sc.textFile("./data/sku")
// 2. 转换数据格式,字符串变元组: cate,sku => ((cate,sku), 1)
val rdd2: RDD[((String, String), Int)] = rdd1.map(line => {
((line.split(",")(0), line.split(",")(1)), 1)
})
// 3. 统计每个sku的销量
val rdd3: RDD[((String, String), Int)] = rdd2.reduceByKey(_ + _)
// 4. 由于需要计算每个品类最受欢迎的sku,所以先得按品类分组
val rdd4 = rdd3.groupBy(_._1._1)
// 5. 将每个组的元素按照倒序排序,取出第一个元素
val rdd5 = rdd4.flatMapValues( _.toList.sortBy(-_._2).take(1))
// 6. 调整数据的输出格式,没有实际计算逻辑
val rdd6 = rdd5.map(x => (x._1, x._2._1._2, x._2._2))
rdd6.foreach(println)
}
}
三、总结
思路
- 按照二次排序的key进行求和, 得到形如(k1,k2,value)的数据
- 按照一次排序的key进行分组,得到形如(k1,Array(k2 value))的数据
- 组内排序。组内排序需要将数据转化为scala的数组,然后在executor本地排序
val rdd5 = rdd4.flatMapValues(_.toList.sortBy(-_._2).take(1))
这行代码有点问题。_.toList直接将数据全部放到内存,可能会产生oom。如何修改,请看spark分区排序二
欢迎分享,转载请注明来源:内存溢出
微信扫一扫
支付宝扫一扫
评论列表(0条)