
项目地址:https://gitee.com/cch-bigdata/spark-process.git 输入数据
subject,name,score 数学,张三,88 语文,张三,92 英语,张三,77 数学,王五,65 语文,王五,87 英语,王五,90 数学,李雷,67 语文,李雷,33 英语,李雷,24 数学,宫九,77 语文,宫九,87 英语,宫九,90输出数据
+-------+----+-----+ |subject|name|score| +-------+----+-----+ | 语文|张三| 92| | 英语|王五| 90| | 英语|宫九| 90| | 数学|张三| 88| | 语文|王五| 87| | 语文|宫九| 87| | 数学|宫九| 77| | 英语|张三| 77| | 数学|李雷| 67| | 数学|王五| 65| | 语文|李雷| 33| | 英语|李雷| 24| +-------+----+-----+程序代码
package com.cch.bigdata.spark.process.sort
import com.cch.bigdata.spark.process.AbstractTransform
import org.apache.spark.sql.{Column, Dataframe}
import scala.collection.mutable.ListBuffer
//排序算子
class Sorter extends AbstractTransform{
//需要排序的列
private val columns = Array[String]("score")
//指定升序还是降序
private val sorts = Array[String]("desc")
override def process(): Unit = {
if(columns.isEmpty){
throw new RuntimeException("排序列未指定")
}
if(sorts.isEmpty){
throw new RuntimeException("排序方式未指定:[asc/desc]")
}
if(columns.length!=sorts.length){
throw new RuntimeException("排序字段和排序规则不匹配")
}
//获取输入流
val df: Dataframe = loadCsv("src/main/resources/csv/score.csv",spark)
//循环下标,用于迭代排序规则
var index = 0
val list:ListBuffer[Column] = ListBuffer()
columns.foreach(c=>{
val sort: String = sorts(index)
sort match {
case "asc" =>{
list.append(df.col(c).asc)
}
case "desc" =>{
list.append(df.col(c).desc)
}
case _=>{
throw new RuntimeException("排序规则只支持asc/desc")
}
}
index+=1
})
df.orderBy(list.map(c=>{c}):_*).show()
}
override def getAppName(): String = "排序"
}
object Sorter{
def main(args: Array[String]): Unit = {
new Sorter().process()
}
}
参数解释
columns:需要排序的列,字符串数组sorts:排序类型,字符串数组
欢迎分享,转载请注明来源:内存溢出
微信扫一扫
支付宝扫一扫
评论列表(0条)