sparkstream 自定义map累加器 支持删除kv对

sparkstream 自定义map累加器 支持删除kv对,第1张

sparkstream 自定义map累加器 支持删除kv对

删除kv对方法: 先用add方法设置value为-1,然后用reset清除掉value=-1的值.

import org.apache.spark.SparkContext
import org.apache.spark.streaming.Time
import org.apache.spark.util.AccumulatorV2

import scala.collection.mutable
import scala.collection.mutable.ListBuffer


object AccumulatorIfDataOut {

    // 设计单例模式用来初始化 map累加器 这里主要是针对从checkpoint恢复,累加器会丢失的现象来处理.
    @volatile private var instance: AccumulatorIfDataOut[(Time, Int)] = null

    def getInstance(sc: SparkContext) = {
        if (instance == null) {
            synchronized {
                if (instance == null) {
                    instance = new AccumulatorIfDataOut[(Time, Int)]()
                }
            }
        }
        // 累加器需要在spark上下文中注册才能使用; sc必须是rdd.sparkContext 而不是 ssc.sparkContext
        if (!instance.isRegistered) {
            sc.register(instance)
        }
        instance
    }

}



class AccumulatorIfDataOut[T] private extends AccumulatorV2[T, mutable.Map[Time, Int]]

欢迎分享,转载请注明来源:内存溢出

原文地址:https://54852.com/zaji/5116514.html

(0)
打赏 微信扫一扫微信扫一扫 支付宝扫一扫支付宝扫一扫
上一篇 2022-11-17
下一篇2022-11-17

发表评论

登录后才能评论

评论列表(0条)

    保存