Flink ProcessFucntion

Flink ProcessFucntion,第1张

Flink ProcessFucntion

文章目录
      • 1.为什么要学习底层 ProcessFuntion API
      • 2.Flink提供了哪些 ProcessFuntion
      • 3.这些 ProcessFuntion有什么不同

1.为什么要学习底层 ProcessFuntion API
  • 为了访问 时间戳 watermark以及注册定时事件
2.Flink提供了哪些 ProcessFuntion
  • ProcessFunction
  • KeyedProcessFunction: keyBy后调用
  • CoProcessFunction
  • ProcessJoinFunction
  • BroadcastProcessFunction
  • KeyedBroadcastProcessFunction
  • ProcessWindowFunction: 开窗之后调用
  • ProcessAllWindowFunction
public class ProcessTest1_KeyedProcessFunction {
    public static void main(String[] args) throws Exception{
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);

        // socket文本流
        DataStream inputStream = env.socketTextStream("localhost", 7777);

        // 转换成SensorReading类型
        DataStream dataStream = inputStream.map(line -> {
            String[] fields = line.split(",");
            return new SensorReading(fields[0], new Long(fields[1]), new Double(fields[2]));
        });

        // 测试KeyedProcessFunction,先分组然后自定义处理
        dataStream.keyBy("id")
                .process( new MyProcess() )
                .print();

        env.execute();
    }

    // 实现自定义的处理函数
    public static class MyProcess extends KeyedProcessFunction{
        ValueState tsTimerState;

        @Override
        public void open(Configuration parameters) throws Exception {
            tsTimerState =  getRuntimeContext().getState(new ValueStateDescriptor("ts-timer", Long.class));
        }


        @Override
        public void processElement(SensorReading value, Context ctx, Collector out) throws Exception {
            out.collect(value.getId().length());

            // context
            ctx.timestamp();
            ctx.getCurrentKey();
           ctx.output();
           // 创建定时器 定时器源码:eventTimeTimersQueue.add(new TimerHeapInternalTimer(Timer))
            ctx.timerService().currentProcessingTime();
            ctx.timerService().currentWatermark();
            ctx.timerService().registerProcessingTimeTimer( ctx.timerService().currentProcessingTime() + 5000L);
            tsTimerState.update(ctx.timerService().currentProcessingTime() + 1000L);
//            ctx.timerService().registerEventTimeTimer((value.getTimestamp() + 10) * 1000L);
//            ctx.timerService().deleteProcessingTimeTimer(tsTimerState.value());
        }


        @Override
        public void onTimer(long timestamp, OnTimerContext ctx, Collector out) throws Exception {
            System.out.println(timestamp + " 定时器触发");
            ctx.getCurrentKey();
//           ctx.output();
            ctx.timeDomain();
        }

        @Override
        public void close() throws Exception {
            tsTimerState.clear();
        }
    }
}

3.这些 ProcessFuntion有什么不同
  • 调用的地方不同

欢迎分享,转载请注明来源:内存溢出

原文地址:https://54852.com/zaji/5669549.html

(0)
打赏 微信扫一扫微信扫一扫 支付宝扫一扫支付宝扫一扫
上一篇 2022-12-16
下一篇2022-12-16

发表评论

登录后才能评论

评论列表(0条)

    保存