
项目地址:https://gitee.com/cch-bigdata/spark-process.git 输入数据集1
id,name,profession,enroll,score 1,庄劲聪,经济学类,北京理工大学,551 2,吴雅思,经济学类,北京理工大学,529 3,周育传,经济学类,北京理工大学,682 4,丁俊伟,通信工程,北京电子科技学院,708 5,庄逸琳,通信工程,北京电子科技学院,708 6,吴志发,通信工程,北京电子科技学院,578 7,肖妮娜,通信工程,北京电子科技学院,557 8,蔡建明,通信工程,北京电子科技学院,583 9,林逸翔,通信工程,北京电子科技学院,543输入数据集2
id,name,profession,enroll,score 1,曾凰妹,金融学,北京电子科技学院,637 2,谢德炜,金融学,北京电子科技学院,542 4,王丽云,金融学,北京电子科技学院,626 5,吴鸿毅,金融学,北京电子科技学院,591 6,施珊珊,经济学类,北京理工大学,581 7,柯祥坤,经济学类,北京理工大学,650 1,庄劲聪,经济学类,北京理工大学,551输出数据
+---+------+----------------+ | id| 名称| 学院| +---+------+----------------+ | 1|庄劲聪| null| | 2|吴雅思| null| | 3|周育传| null| | 4|丁俊伟| null| | 5|庄逸琳| null| | 6|吴志发| null| | 7|肖妮娜| null| | 8|蔡建明| null| | 9|林逸翔| null| | 1|曾凰妹|北京电子科技学院| | 2|谢德炜|北京电子科技学院| | 4|王丽云|北京电子科技学院| | 5|吴鸿毅|北京电子科技学院| | 6|施珊珊| 北京理工大学| | 7|柯祥坤| 北京理工大学| | 1|庄劲聪| 北京理工大学| +---+------+----------------+程序代码
package com.cch.bigdata.spark.process.union
import com.cch.bigdata.spark.process.AbstractTransform
import org.apache.spark.sql.functions.{col, lit}
import org.apache.spark.sql.{Column, Dataframe}
import scala.collection.mutable
import scala.collection.mutable.ListBuffer
//union合并 *** 作
class Union extends AbstractTransform{
//1->合并不去重 2->合并行去重
private val union_type_set = Set(1,2)
private val union_type = 1
//表1需要合并的列名
private val left_table_union_columns = Array[String]("id","name","aaa")
//表2需要合并的列名
private val right_table_union_columns = Array[String]("id","name","enroll")
//合并后的列的名称
//如果没有设置,则使用表1的列名
private val new_columns = Array[String]("id","名称","学院")
override def process(): Unit = {
if(!union_type_set.contains(union_type)){
throw new RuntimeException("合并类型错误!")
}
if(left_table_union_columns.isEmpty){
throw new RuntimeException("表1合并列不能为空!")
}
if(right_table_union_columns.isEmpty){
throw new RuntimeException("表2合并列不能为空!")
}
//处理表1列名称和别名映射
val nameMapping = new mutable.linkedHashMap[String, String]()
var index = 0
left_table_union_columns.foreach(c=>{
nameMapping(c) = new_columns(index)
index+=1
})
//第一个数据输入
val firstDF: Dataframe = loadCsv("src/main/resources/csv/admission_1.csv",spark)
//获取第一个数据输入的schema
val firstColumnSet:Set[String] = firstDF.schema.fieldNames.toSet
//第二个数据输入
val secondDF: Dataframe = loadCsv("src/main/resources/csv/admission_2.csv",spark)
//获取第二个数据输入的schema
val secondColumnSet: Set[String] = secondDF.schema.fieldNames.toSet
//构造表1的select参数
val leftColumnList: ListBuffer[Column] = new ListBuffer()
//表1需要添加的新列
val withLeftColumnList: ListBuffer[String] = new ListBuffer()
for ((k, v) <- nameMapping){
if(firstColumnSet.contains(k)){
//这个配置的union列是表1中的
leftColumnList.append(col(k).as(v))
}else{
//这个是个新列
withLeftColumnList.append(k)
}
}
var leftQueryDf: Dataframe = firstDF.select(leftColumnList.map(c => {
c
}): _*)
withLeftColumnList.foreach(c=>{
var asName: String = nameMapping(c)
if(asName.isEmpty){
asName = c
}
leftQueryDf = leftQueryDf.withColumn(asName,lit(null))
})
//构造表2的select参数
val rightColumnList: ListBuffer[Column] = new ListBuffer()
//表2需要添加的新列
val withRightColumnList: ListBuffer[String] = new ListBuffer()
right_table_union_columns.foreach(c=>{
if(secondColumnSet.contains(c)){
//这个配置的union列是表2中的
rightColumnList.append(col(c))
}else{
//这个是表2的新列
withRightColumnList.append(c)
}
})
var rightQueryDf: Dataframe = secondDF.select(rightColumnList.map(c => {
c
}): _*)
withRightColumnList.foreach(c=>{
rightQueryDf = rightQueryDf.withColumn(c,lit(null))
})
union_type match {
case 1 =>{
//不去重合并
leftQueryDf.union(rightQueryDf).show()
}
case 2 =>{
//去重合并
leftQueryDf.union(rightQueryDf).dropDuplicates().show()
}
}
}
override def getAppName(): String = "union合并"
}
object Union{
def main(args: Array[String]): Unit = {
new Union().process()
}
}
参数解释
union_type:1->合并不去重 2->合并行去重left_table_union_columns:指定左表需要合并的列right_table_union_columns:右表需要合并的列new_columns:合并结果列的名称
欢迎分享,转载请注明来源:内存溢出
微信扫一扫
支付宝扫一扫
评论列表(0条)