
Two independent jobs, A and B, could run in parallel, but under Spark's default behavior A and B execute sequentially.
The adjustments below change the code so that they run concurrently.
The test case is as follows.
The code was executed in a Windows 10 VM with 6 CPU cores.
package myspark.core

import org.apache.spark.SparkConf
import org.apache.spark.sql.{DataFrame, SparkSession}

import java.util.Properties
import java.util.concurrent.{Callable, Executors, TimeUnit}
import scala.concurrent.duration.{Duration, MINUTES}
import scala.concurrent.{Await, ExecutionContext, ExecutionContextExecutorService, Future}

object testAsyncExecJob {

  def getLocalSparkSession() = {
    // Load project configuration from the classpath (not otherwise used in this demo)
    val properties = new Properties
    properties.load(getClass.getClassLoader.getResourceAsStream("conf.properties"))
    val conf = new SparkConf()
      .setAppName("testAsyncExecJob")
      .setMaster("local[*]")
      .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
      .set("spark.scheduler.mode", "FIFO")
      .set("spark.sql.parquet.binaryAsString", "true")
      .set("spark.sql.crossJoin.enabled", "true")
      .set("spark.debug.maxToStringFields", "1000")
      .set("spark.local.dir", "./SparkTempFile")
    val spark = SparkSession.builder()
      .config(conf)
      .getOrCreate()
    spark
  }

  def main(args: Array[String]): Unit = {
    val spark = getLocalSparkSession()
    import spark.implicits._
    // Two independent DataFrames; repartition(2) caps each job at 2 tasks per stage
    val list1 = (0 until 1000).toDF().repartition(2)
    val list2 = (1001 until 2000).toDF().repartition(2)
    // The job-submission variants discussed below replace this body
  }

  // a and b wrap the job in a Future, so it is submitted asynchronously
  def a(df: DataFrame)(implicit xc: ExecutionContext) = Future {
    df.foreach(x => {
      Thread.sleep(1000)
      println("a:" + x)
    })
  }

  def b(df: DataFrame)(implicit xc: ExecutionContext) = Future {
    df.foreach(x => {
      Thread.sleep(1000)
      println("b:" + x)
    })
  }

  // c and d run the job synchronously on the calling thread
  def c(df: DataFrame) = {
    df.foreach(x => {
      Thread.sleep(1000)
      println("c:" + x)
    })
  }

  def d(df: DataFrame) = {
    df.foreach(x => {
      Thread.sleep(1000)
      println("d:" + x)
    })
  }
}
Failed attempt 1
Within a single SparkSession, change nothing except the scheduler mode, spark.scheduler.mode=FAIR, and call the jobs one after another from the driver thread:
c(list1)
d(list2)
The jobs still execute sequentially. FAIR scheduling only matters once jobs are actually submitted concurrently from different threads; here both actions run one after another on the driver thread, each blocking until its job completes.
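For reference, the only code change in this attempt is the scheduler setting; a minimal sketch, reconstructed from the description above:
// Failed attempt 1: switch the job scheduler from FIFO to FAIR.
// FAIR only shares cores among jobs submitted from *different* threads,
// so sequential blocking actions on the driver thread are unaffected.
val conf = new SparkConf()
  .setAppName("testAsyncExecJob")
  .setMaster("local[*]")
  .set("spark.scheduler.mode", "FAIR") // was FIFO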
Working approach 1: submit the jobs asynchronously with Future
def main(args: Array[String]): Unit = {
  ...
  ...
  // Create a new thread pool with a fixed number of threads
  val executors = Executors.newFixedThreadPool(10)
  // Implicitly declare an ExecutionContext backed by the existing thread pool
  implicit val xc: ExecutionContextExecutorService = ExecutionContext.fromExecutorService(executors)
  // Queue up the pending jobs a and b; the ExecutionContext submits them at the same time,
  // and whether a job actually starts running depends on whether CPU and memory are available.
  // Wait for both jobs to finish before exiting the app.
  Await.result(Future.sequence(Seq(a(list1), b(list2))), Duration(1, MINUTES))
}
...
...
You can see the two jobs executing at the same time.
Checking the output, lines prefixed a: and b: are interleaved.
If repartition(2) is commented out, the two jobs are still submitted at the same time, but one of them monopolizes all the cores and runs while the other waits: with local[*] on this 6-core VM, each job's stage has enough tasks by default to occupy every core, so under FIFO the first job fills the cluster before the second gets a slot.
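Capping partitions is one fix; if the jobs should instead share cores dynamically, the FAIR scheduler becomes useful here, because the Futures submit the jobs from different threads. A minimal sketch under that assumption, with spark.scheduler.mode=FAIR set in the SparkConf; the helper aPooled and the pool name poolA are illustrative, not from the original test:
// Assumption: with FAIR mode, pools get a fair share of cores, so pinning
// each job to its own pool stops the first job from monopolizing the cluster.
def aPooled(df: DataFrame)(implicit xc: ExecutionContext) = Future {
  // setLocalProperty applies to the thread that runs the action, which is
  // this Future's pool thread, so it must be set here, not on the driver.
  df.sparkSession.sparkContext.setLocalProperty("spark.scheduler.pool", "poolA")
  df.foreach(x => println("a:" + x))
}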
Working approach 2: use the Callable or Runnable interface, override its call or run method, put the job to be executed inside call (or run), and submit it to a thread pool.
The difference between Callable and Runnable is that the former's call returns a value while the latter's run does not (a Runnable sketch follows the Callable example below).
def main(args: Array[String]): Unit = {
  ...
  ...
  ...
  // Create a new thread pool with a fixed number of threads
  val executors = Executors.newFixedThreadPool(10)
  //-----------------------------
  // Job 1
  val task1 = executors.submit(new Callable[String] {
    override def call(): String = {
      // computation code
      c(list1)
      "job c finished"
    }
  })
  // Job 2
  val task2 = executors.submit(new Callable[String] {
    override def call(): String = {
      // computation code
      d(list2)
      "job d finished"
    }
  })
  // Shut the pool down first; otherwise awaitTermination blocks for the
  // full hour even after both jobs have finished.
  executors.shutdown()
  executors.awaitTermination(1, TimeUnit.HOURS)
  println(task1.get() + task2.get())
  //-----------------------------//
}
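For contrast, the same job wrapped in a Runnable; this is an illustrative sketch of the distinction noted above, not part of the original test. Since run returns nothing, the Future obtained from submit only signals completion:
// Assumption: Runnable variant of job c; submit(Runnable) returns a
// java.util.concurrent.Future[_] whose get() yields null on completion.
val task3 = executors.submit(new Runnable {
  override def run(): Unit = {
    c(list1) // any result must be communicated via side effects
  }
})
task3.get() // blocks until the job finishes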
References
Running multiple Spark jobs concurrently with multithreading: http://bcxw.net/article/810.html
On running multiple jobs in parallel in Spark: https://bruce.blog.csdn.net/article/details/88349295
Asynchronous Spark jobs: https://blog.csdn.net/weixin_30793643/article/details/97128340
The difference between submit and execute in ExecutorService: https://www.cnblogs.com/wanqieddy/p/3853863.html