
删除键值对的方法：先用 add 方法将该键的 value 设为 -1，再调用 reset 清除所有 value 为 -1 的条目。
import org.apache.spark.SparkContext
import org.apache.spark.streaming.Time
import org.apache.spark.util.AccumulatorV2
import scala.collection.mutable
import scala.collection.mutable.ListBuffer
object AccumulatorIfDataOut {
  // Lazily created singleton holding the map accumulator. It is created and registered on
  // demand because accumulators are lost when a streaming job is recovered from a checkpoint.
  @volatile private var instance: AccumulatorIfDataOut[(Time, Int)] = null

  /** Returns the shared accumulator, creating it and registering it with `sc` if needed.
    *
    * An accumulator must be registered with a SparkContext before it can be used. Per the
    * original author, `sc` must be `rdd.sparkContext`, not `ssc.sparkContext`.
    *
    * @param sc the SparkContext to register the accumulator with when it is not yet registered
    * @return the singleton accumulator, guaranteed registered when this method returns
    */
  def getInstance(sc: SparkContext): AccumulatorIfDataOut[(Time, Int)] = {
    // Fast path: no locking once the accumulator exists and is registered.
    val current = instance
    if (current != null && current.isRegistered) current
    else synchronized {
      if (instance == null) {
        instance = new AccumulatorIfDataOut[(Time, Int)]()
      }
      // Register while holding the lock. AccumulatorV2.register throws if called twice, so
      // an unlocked isRegistered check lets two racing threads both attempt registration.
      if (!instance.isRegistered) {
        sc.register(instance)
      }
      instance
    }
  }
}
class AccumulatorIfDataOut[T] private extends AccumulatorV2[T, mutable.Map[Time, Int]]欢迎分享,转载请注明来源:内存溢出
微信扫一扫
支付宝扫一扫
评论列表(0条)