Spark 在一个sparksession中并行的执行多个Job

Spark 在一个sparksession中并行的执行多个Job,第1张

Spark 在一个sparksession中并行的执行多个Job

有两个独立的job A和B可以并行执行,按spark默认的方式A和B是顺序执行的

在代码中进行如下调整

测试用例如下:

代码在win10虚拟机中执行
cpu核数为6

package myspark.core

import myspark.core.readFTP.{toCSVFile, toClickHouse}
import org.apache.hadoop.util.ThreadUtil.getResourceAsStream
import org.apache.spark.SparkConf
import org.apache.spark.sql.{Dataframe, SparkSession}

import java.util.Properties
import java.util.concurrent.{Callable, Executors, TimeUnit}
import scala.concurrent._
import scala.concurrent.duration._
import scala.concurrent.duration.{Duration, MINUTES}
import scala.concurrent.{Await, ExecutionContext, Future}

object testAsyncExecJob {

  def getLocalSparkSession() = {

    val properties = new Properties
    properties.load(getResourceAsStream("conf.properties"))

    val conf = new SparkConf()
      .setAppName("testAsyncExecJob")
      .setMaster("local[*]")
      .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
      .set("spark.scheduler.mode", "FIFO")
      .set("spark.sql.parquet.binaryAsString", "true")
      .set("spark.sql.crossJoin.enabled", "true")
      .set("spark.debug.maxToStringFields", "1000")
      .set("spark.local.dir", "./SparkTempFile")

    val spark = SparkSession.builder()
      .config(conf)
      .getOrCreate()

    spark
  }

  def main(args: Array[String]): Unit = {

    val spark = getLocalSparkSession()

    import spark.implicits._
    val list1 = (0 until 1000).toDF().repartition(2)
    val list2 = (1001 until 2000).toDF().repartition(2)
  }


  def a(df: Dataframe)(implicit xc: ExecutionContext) = Future {

    df.foreach(x => {
      Thread.sleep(1000)
      println("a:" + x)
    })

  }

  def b(df: Dataframe)(implicit xc: ExecutionContext) = Future {

    df.foreach(x => {
      Thread.sleep(1000)
      println("b:" + x)
    })

  }

  def c(df: Dataframe) = {

    df.foreach(x => {
      Thread.sleep(1000)
      println("c:" + x)
    })

  }

  def d(df: Dataframe) = {

    df.foreach(x => {
      Thread.sleep(1000)
      println("d:" + x)
    })

  }

}
失败尝试1

在一个sparksession中,仅修改任务调度模式为spark.scheduler.mode=FAIR

    c(list1)
    d(list2)

任务仍为顺序执行

方法1
def main(args: Array[String]): Unit = {

				...
				...

    // 创建一个新的线程池,并指定线程数量
    val executors = Executors.newFixedThreadPool(10)
    
    // 基于一个已有的线程池,隐式的生命一个ExecutionContext
    implicit val xc: ExecutionContextExecutorService = ExecutionContext.fromExecutorService(executors)
    
    // 创建一个预执行任务序列,将任务a,b放入队列中,ExecutionContext会将队列中的任务同时提交,任务是否开始执行取决于cpu、内存资源是否足够
	// 等待任务都结束后退出app
	Await.result(Future.sequence(Seq(a(list1), b(list2))), Duration(1, MINUTES))

}

				...
				...

可以看到两个job同时在执行

查看输出

注释掉repartiiton(2),发现两个job虽然同时提交,但是其中一个任务独占了全部资源处于运行状态

方法2

使用Callable或Runable类,重写类中的call方法或run方法,将要执行的job放入call后run方法中提交

Callable和Runable的区别是前者的call有返回值,后者的run无返回值

 def main(args: Array[String]): Unit = {
		...
		...
		...
    // 创建一个新的线程池,并指定线程数量
    val executors = Executors.newFixedThreadPool(10)

//-----------------------------
    //计算业务1
        val task1 = executors.submit(new Callable[String] {
          override def call(): String = {
            //计算代码
            c(list1)
            "业务c计算完成"
          }
        })

        //计算业务2
        val task2 = executors.submit(new Callable[String] {
          override def call(): String = {
            //计算代码
            d(list2)
            "业务d计算完成"
          }
        })

        executors.awaitTermination(1, TimeUnit.HOURS)
        print(task1.get()+task2.get())
    //-----------------------------//

参考资料

spark多线程并发执行job
http://bcxw.net/article/810.html

谈Spark下并行执行多个Job的问题
https://bruce.blog.csdn.net/article/details/88349295

Spark异步job
https://blog.csdn.net/weixin_30793643/article/details/97128340

ExecutorService中submit和execute的区别
https://www.cnblogs.com/wanqieddy/p/3853863.html

欢迎分享,转载请注明来源:内存溢出

原文地址:https://54852.com/zaji/5688677.html

(0)
打赏 微信扫一扫微信扫一扫 支付宝扫一扫支付宝扫一扫
上一篇 2022-12-17
下一篇2022-12-17

发表评论

登录后才能评论

评论列表(0条)

    保存