Flink (6) Stream Processing: DataStream API Data Sources and Transformation Operators


DataStream API stream processing
    • Basic stream-processing flow
    • UDF (User-Defined Functions) programming
    • Data Source operators
      • How it works
      • Simple data sources
      • Custom data sources
    • Transform Operators
      • Basic transformation operators
      • Basic grouping operators
      • Simple time-window operators
          • Tumbling Windows
          • Sliding Windows
          • Session Windows
      • Stream-merging operators
    • Source code

Basic stream-processing flow

UDF (User-Defined Functions) programming

UDFs are everywhere in Flink: nearly every API is built on the Function interface and accepts lambda expressions, anonymous classes, or custom function classes. Most operations require a user-defined function.

There are four ways to specify a user-defined function in Flink (the first three are sketched below):

  1. Implementing an interface: implement one of Flink's function interfaces
  2. Anonymous classes
  3. Java 8 lambdas
  4. Rich functions: extend one of Flink's rich function classes
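
For instance, the first three styles applied to the same map operation (an illustrative sketch; the UdfStyles and UpperCaseMapper names are hypothetical):

    import org.apache.flink.api.common.functions.MapFunction;
    import org.apache.flink.streaming.api.datastream.DataStream;

    public class UdfStyles {

        // 1. Implementing an interface
        public static class UpperCaseMapper implements MapFunction<String, String> {
            @Override
            public String map(String value) {
                return value.toUpperCase();
            }
        }

        public static void apply(DataStream<String> stream) {
            // 1. Implementing an interface
            stream.map(new UpperCaseMapper());

            // 2. Anonymous class
            stream.map(new MapFunction<String, String>() {
                @Override
                public String map(String value) {
                    return value.toUpperCase();
                }
            });

            // 3. Java 8 lambda
            stream.map(line -> line.toUpperCase());
        }
    }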

Rich functions provide, in addition to the user-defined function (map, reduce, etc), four methods: open, close, getRuntimeContext, and setRuntimeContext. These are useful for parameterizing the function (see Passing Parameters to Functions), creating and finalizing local state, accessing broadcast variables (see Broadcast Variables), and for accessing runtime information such as accumulators and counters (see Accumulators and Counters), and information on iterations (see Iterations).
In short, rich functions allow (see the sketch after this list):

  • Parameterizing the function class, i.e. passing parameters into it
  • Creating and finalizing local state
  • Accessing broadcast variables
  • Accessing runtime information such as accumulators and counters
  • Accessing information on iterations
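
A minimal RichMapFunction sketch showing these hooks (illustrative; CountingMapper is a hypothetical name, not from the repo):

    import org.apache.flink.api.common.accumulators.IntCounter;
    import org.apache.flink.api.common.functions.RichMapFunction;
    import org.apache.flink.configuration.Configuration;

    public class CountingMapper extends RichMapFunction<String, String> {

        private IntCounter lineCounter;

        @Override
        public void open(Configuration parameters) throws Exception {
            // called once per parallel instance before any map() call:
            // initialize local state and register runtime objects here
            lineCounter = new IntCounter();
            getRuntimeContext().addAccumulator("lines", lineCounter);
        }

        @Override
        public String map(String value) throws Exception {
            lineCounter.add(1);
            return value.trim();
        }

        @Override
        public void close() throws Exception {
            // called once after the last record: release local resources here
        }
    }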
Data Source operators

How it works

Core components: Splits, SplitEnumerator, and SourceReader.

Splits
A split represents a portion of the data the source consumes. It is the granularity at which the source assigns work and parallelizes reads.

For example, each file in a directory can serve as a split, as can a partition of a Kafka topic.

SplitEnumerator
Generates splits and assigns them to SourceReaders. The SplitEnumerator runs as a single instance on the JobManager.

SourceReader
Requests splits and processes the data they contain. SourceReaders run in parallel inside the SourceOperators on the TaskManagers (each SourceReader in its own task slot) and together produce the parallel stream of events.
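
As a concrete example, the FLIP-27 KafkaSource maps these concepts onto Kafka: each topic partition becomes a split, and the SplitEnumerator discovers partitions and assigns them to readers. A minimal sketch, assuming the newer flink-connector-kafka artifact (the Kafka example below uses the older FlinkKafkaConsumer API instead; broker, topic, and group names here are placeholders):

    import org.apache.flink.api.common.eventtime.WatermarkStrategy;
    import org.apache.flink.api.common.serialization.SimpleStringSchema;
    import org.apache.flink.connector.kafka.source.KafkaSource;
    import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;
    import org.apache.flink.streaming.api.datastream.DataStream;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

    StreamExecutionEnvironment environment = StreamExecutionEnvironment.getExecutionEnvironment();

    KafkaSource<String> source = KafkaSource.<String>builder()
            .setBootstrapServers("broker:9092")     // hypothetical broker address
            .setTopics("user-click-topic")          // hypothetical topic
            .setGroupId("demo-group")               // hypothetical consumer group
            .setStartingOffsets(OffsetsInitializer.earliest())
            .setValueOnlyDeserializer(new SimpleStringSchema())
            .build();

    // every partition of the topic becomes a split; SourceReaders on the
    // TaskManagers consume the splits assigned to them in parallel
    DataStream<String> stream = environment.fromSource(
            source, WatermarkStrategy.noWatermarks(), "KafkaSource");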
Simple data sources
  1. From a collection

    DataStream<String> streamSource =
            environment.fromCollection(Arrays.asList(···));

    environment.fromElements(···);
    
  2. From a file

    StreamExecutionEnvironment environment =
            StreamExecutionEnvironment.getExecutionEnvironment();

    DataStream<String> streamSource = environment
            .readTextFile("filePath");
    
  3. From a socket

    DataStreamSource<String> streamSourceFromSocket = executionEnvironment
            .socketTextStream("192.168.116.100", 9999);
    
  4. From Kafka

    
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-connector-kafka_${scala.binary.version}</artifactId>
        <version>${flink.version}</version>
    </dependency>

    Properties properties = new Properties();
    properties.setProperty("bootstrap.servers", kafkaServers);
    properties.setProperty("group.id", groupId);

    DataStream<String> streamSource = environment.addSource(
        new FlinkKafkaConsumer<>(topic, new SimpleStringSchema(), properties));

Sources 1, 2, and 3 above are built into Flink; source 4 requires the Kafka connector dependency. Other connectors are used in the same way.

Custom data sources

Purpose: finer-grained control over the data source.
Applications:

  • Simulating real unbounded stream input, commonly used to build test data sources (a sketch follows this list)
  • Controlling source partitioning
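
For reference, a simplified sketch of such a source (an assumed shape of the repo's MyUserClickSource, which reappears in the stream-merging example below; the emitted field values are placeholders):

    import java.util.Random;
    import java.util.UUID;

    import org.apache.flink.streaming.api.functions.source.SourceFunction;

    // simplified sketch of a custom SourceFunction emitting one record per second
    public static class MyUserClickSource implements SourceFunction<UserClickRecord> {

        private volatile boolean running = true;

        @Override
        public void run(SourceContext<UserClickRecord> ctx) throws Exception {
            Random random = new Random();
            while (running) {
                // field values are placeholders; the real generator lives in the repo
                UserClickRecord record = new UserClickRecord(
                        UUID.randomUUID().toString(),
                        String.valueOf(System.currentTimeMillis()),
                        "1", "1", "1",
                        String.valueOf(random.nextInt(2)));
                ctx.collect(record);
                Thread.sleep(1000L);
            }
        }

        @Override
        public void cancel() {
            running = false;
        }
    }

    // used as: environment.addSource(new MyUserClickSource());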
Transform Operators

Basic transformation operators
1. map
2. flatMap
3. filter
    import java.util.ArrayList;

    import org.apache.flink.api.common.functions.FilterFunction;
    import org.apache.flink.api.common.functions.FlatMapFunction;
    import org.apache.flink.api.common.functions.MapFunction;
    import org.apache.flink.api.common.typeinfo.TypeHint;
    import org.apache.flink.api.java.utils.ParameterTool;
    import org.apache.flink.streaming.api.datastream.DataStream;
    import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

    public class BasicTransformTest {

        public static void main(String[] args) throws Exception {
            ParameterTool parameterTool = ParameterTool.fromArgs(args);
            String testInputFilePath = parameterTool.get("file.path");
            testInputFilePath = testInputFilePath == null ?
                    "E:\\Projects\\bigdata\\flink\\flink-study\\demo01\\src\\main\\resources\\userclickrecordtempdata.txt" :
                    testInputFilePath;

            StreamExecutionEnvironment executionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment();

            DataStream<String> dataSource = executionEnvironment.readTextFile(testInputFilePath);

            // map: convert each CSV line into a UserClickRecord POJO
            SingleOutputStreamOperator<UserClickRecord> mapOutputStreamOperator = dataSource.map((MapFunction<String, UserClickRecord>) line -> {
                String[] lineItems = line.split(",");
                UserClickRecord record = new UserClickRecord(lineItems[0],
                        lineItems[1],
                        lineItems[2],
                        lineItems[3],
                        lineItems[4]);
                record.setLabel("0");
                return record;
            });

            mapOutputStreamOperator.print("mapOpt");

            // flatMap: one input line may emit several elements, here the goods
            // features and the user features as two separate lists
            SingleOutputStreamOperator<ArrayList<Double>> flatMapOutputStream = dataSource.flatMap((FlatMapFunction<String, ArrayList<Double>>) (line, out) -> {
                String[] lineItems = line.split(",");
                UserClickRecord record = new UserClickRecord(lineItems[0], lineItems[1], lineItems[2], lineItems[3], lineItems[4]);

                String features = record.getFeatures();
                String[] featureArr = features.split(" ");
                ArrayList<Double> goodsFeatures = new ArrayList<>();
                ArrayList<Double> userFeatures = new ArrayList<>();
                for (int i = 0; i < featureArr.length; i++) {
                    if (i < 5) {
                        goodsFeatures.add(Double.parseDouble(featureArr[i]));
                    } else {
                        userFeatures.add(Double.parseDouble(featureArr[i]));
                    }
                }
                out.collect(goodsFeatures);
                out.collect(userFeatures);
                // lambdas lose generic type information to erasure, so give Flink an explicit hint
            }).returns(new TypeHint<ArrayList<Double>>() {});

            flatMapOutputStream.print("flatOpt");

            // filter: keep only records whose label is "1"
            SingleOutputStreamOperator<String> filterOutputStreamOperator = dataSource.filter((FilterFunction<String>) line -> {
                String[] lineItems = line.split(",");
                UserClickRecord record = new UserClickRecord(lineItems[0], lineItems[1], lineItems[2], lineItems[3], lineItems[4], lineItems[5]);
                return "1".equals(record.getLabel());
            });
            filterOutputStreamOperator.print("filterOpt");

            executionEnvironment.execute("BasicTransformTestJob");
        }

    }

UserClickRecord is a custom data type; it can be found in the source repository linked at the end.
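
Its approximate shape, reconstructed from how it is used in these examples (field names other than uuid, features, and label are guesses; the authoritative version is in the repo):

    // approximate shape of UserClickRecord; field names beyond uuid, features,
    // and label are guesses based on how the constructors are called
    public class UserClickRecord {
        private String uuid;
        private String timestamp;
        private String userId;
        private String goodsId;
        private String features;
        private String label;

        public UserClickRecord() {
            // Flink POJO types need a public no-argument constructor
        }

        public UserClickRecord(String uuid, String timestamp, String userId,
                               String goodsId, String features) {
            this(uuid, timestamp, userId, goodsId, features, null);
        }

        public UserClickRecord(String uuid, String timestamp, String userId,
                               String goodsId, String features, String label) {
            this.uuid = uuid;
            this.timestamp = timestamp;
            this.userId = userId;
            this.goodsId = goodsId;
            this.features = features;
            this.label = label;
        }

        public String getUuid() { return uuid; }
        public void setUuid(String uuid) { this.uuid = uuid; }
        public String getFeatures() { return features; }
        public String getLabel() { return label; }
        public void setLabel(String label) { this.label = label; }

        @Override
        public String toString() {
            return String.join(",", uuid, timestamp, userId, goodsId, features, label);
        }
    }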

Basic grouping operators
1. KeyBy
2. Reduce
3. Window / WindowAll - WindowApply / WindowReduce
    import org.apache.flink.api.common.functions.MapFunction;
    import org.apache.flink.api.common.functions.ReduceFunction;
    import org.apache.flink.api.java.functions.KeySelector;
    import org.apache.flink.streaming.api.datastream.DataStreamSource;
    import org.apache.flink.streaming.api.datastream.KeyedStream;
    import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
    import org.apache.flink.streaming.api.datastream.WindowedStream;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    import org.apache.flink.streaming.api.functions.windowing.WindowFunction;
    import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows;
    import org.apache.flink.streaming.api.windowing.time.Time;
    import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
    import org.apache.flink.util.Collector;

    public class GroupTransformTest {
        public static void main(String[] args) throws Exception {
            StreamExecutionEnvironment executionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment();

            DataStreamSource<String> inputStream = executionEnvironment
                    .readTextFile("E:\\Projects\\bigdata\\flink\\flink-study\\demo01\\src\\main\\resources\\userclickrecordtempdata.txt");

            // use a basic map transform to convert each input line into a POJO
            SingleOutputStreamOperator<UserClickRecord> objectStreamSource = inputStream.map((MapFunction<String, UserClickRecord>) line -> {
                String[] lineItems = line.split(",");
                return new UserClickRecord(lineItems[0],
                        lineItems[1],
                        lineItems[2],
                        lineItems[3],
                        lineItems[4],
                        lineItems[5]);
            });

            // keyBy: partition the stream by label
            KeyedStream<UserClickRecord, String> keyedStream = objectStreamSource.keyBy(
                    (KeySelector<UserClickRecord, String>) value -> value.getLabel());

            keyedStream.print("UserClickRecordKeyedStream");

            // reduce: rolling aggregation per key
            SingleOutputStreamOperator<UserClickRecord> reduceResultSource = keyedStream.reduce((ReduceFunction<UserClickRecord>) (value1, value2) -> {
                // value1 is the record accumulated so far, value2 is the newly arrived one
                String uuid1 = value1.getUuid();
                value1.setUuid(uuid1 + "," + value2.getUuid());
                return value1;
            });
            reduceResultSource.print("reduceResultSource");

            // window + apply: collect the uuids seen per key within each 1-second window
            WindowedStream<UserClickRecord, String, TimeWindow> windowStream = keyedStream.window(TumblingProcessingTimeWindows.of(Time.seconds(1)));
            SingleOutputStreamOperator<String> windowApplyStream = windowStream.apply(new WindowFunction<UserClickRecord, String, String, TimeWindow>() {

                @Override
                public void apply(String key, TimeWindow window, Iterable<UserClickRecord> input, Collector<String> out) throws Exception {
                    StringBuffer uuids = new StringBuffer();
                    for (UserClickRecord record : input) {
                        uuids.append(",").append(record.getUuid());
                    }
                    out.collect(uuids.toString());
                }
            });
            windowApplyStream.print("windApply");

            executionEnvironment.execute("GroupTransformTestJob");

        }
    }
     
Simple time-window operators
    
Tumbling Windows

Characteristics:

1. Each element appears in exactly one window
2. Windows do not overlap
3. Only one parameter is needed: the window size

A tumbling window is a special case of a sliding window in which the window size equals the slide.

Sliding Windows

Characteristics:

1. Windows may overlap
2. An element may be assigned to multiple windows, roughly window-size / window-slide of them (e.g., a 10 s window sliding every 5 s assigns each element to 10 / 5 = 2 windows)
3. Two parameters are needed: the window size and the slide
Session Windows

Characteristics:

1. Windows do not overlap
2. Window time spans vary
3. One parameter is needed: the session gap, i.e. the window closes once no data has arrived within the gap (a sketch follows this list)
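
A minimal session-window sketch (a hypothetical addition; the example class below exercises only tumbling and sliding windows):

    import org.apache.flink.api.common.functions.MapFunction;
    import org.apache.flink.api.common.functions.ReduceFunction;
    import org.apache.flink.api.java.functions.KeySelector;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    import org.apache.flink.streaming.api.windowing.assigners.ProcessingTimeSessionWindows;
    import org.apache.flink.streaming.api.windowing.time.Time;

    public class SessionWindowTest {
        public static void main(String[] args) throws Exception {
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

            env.socketTextStream("192.168.250.55", 9999)
                    .map((MapFunction<String, UserClickRecord>) line -> {
                        String[] lineItems = line.split(",");
                        return new UserClickRecord(lineItems[0], lineItems[1], lineItems[2],
                                lineItems[3], lineItems[4], lineItems[5]);
                    })
                    .keyBy((KeySelector<UserClickRecord, String>) record -> record.getLabel())
                    // the session for a key closes once it sees no data for 30 seconds
                    .window(ProcessingTimeSessionWindows.withGap(Time.seconds(30)))
                    .reduce((ReduceFunction<UserClickRecord>) (value1, value2) -> {
                        value1.setUuid(value1.getUuid() + "," + value2.getUuid());
                        return value1;
                    })
                    .print("sessionWindowStream");

            env.execute("SessionWindowTestJob");
        }
    }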
    import org.apache.flink.api.common.functions.MapFunction;
    import org.apache.flink.api.common.functions.ReduceFunction;
    import org.apache.flink.api.java.functions.KeySelector;
    import org.apache.flink.streaming.api.datastream.DataStreamSource;
    import org.apache.flink.streaming.api.datastream.KeyedStream;
    import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
    import org.apache.flink.streaming.api.datastream.WindowedStream;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    import org.apache.flink.streaming.api.functions.windowing.WindowFunction;
    import org.apache.flink.streaming.api.windowing.assigners.SlidingProcessingTimeWindows;
    import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows;
    import org.apache.flink.streaming.api.windowing.time.Time;
    import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
    import org.apache.flink.util.Collector;

    public class OpenWindowOperatorTest {
        public static void main(String[] args) throws Exception {
            StreamExecutionEnvironment executionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment();
            DataStreamSource<String> socketDataSource = executionEnvironment.socketTextStream("192.168.250.55", 9999);

            // mapStream: convert each CSV line into a UserClickRecord POJO
            SingleOutputStreamOperator<UserClickRecord> mapStream = socketDataSource.map((MapFunction<String, UserClickRecord>) line -> {
                String[] lineItems = line.split(",");
                return new UserClickRecord(lineItems[0], lineItems[1], lineItems[2], lineItems[3], lineItems[4], lineItems[5]);
            });

            // KeyedStream
            KeyedStream<UserClickRecord, String> userClickRecordKeyedStream =
                    mapStream.keyBy((KeySelector<UserClickRecord, String>) record -> record.getLabel());

            // tumbling processing-time window, size 10 s
            WindowedStream<UserClickRecord, String, TimeWindow> tumblWindowStream =
                    userClickRecordKeyedStream.window(TumblingProcessingTimeWindows.of(Time.seconds(10)));
            // sliding processing-time window, size 10 s, slide 5 s
            // (the size must exceed the slide for the windows to actually overlap)
            WindowedStream<UserClickRecord, String, TimeWindow> slidWindowStream =
                    userClickRecordKeyedStream.window(SlidingProcessingTimeWindows.of(Time.seconds(10), Time.seconds(5)));

            // apply: collect the uuids per key for each sliding window
            SingleOutputStreamOperator<String> windowAppliedStream = slidWindowStream.apply(new WindowFunction<UserClickRecord, String, String, TimeWindow>() {
                @Override
                public void apply(String key, TimeWindow window, Iterable<UserClickRecord> input, Collector<String> out) throws Exception {
                    StringBuffer uuids = new StringBuffer();
                    for (UserClickRecord record : input) {
                        uuids.append(",").append(record.getUuid());
                    }
                    out.collect(uuids.toString());
                }
            });

            windowAppliedStream.print("windowAppliedStream");

            // reduce: rolling aggregation within each tumbling window
            SingleOutputStreamOperator<UserClickRecord> windowReducedStream = tumblWindowStream.reduce((ReduceFunction<UserClickRecord>) (valueOld, valueNew) -> {
                String newEventUuid = valueNew.getUuid();
                valueOld.setUuid(valueOld.getUuid() + "," + newEventUuid);
                return valueOld;
            });

            windowReducedStream.print("windowReducedStream");

            executionEnvironment.execute();
        }
    }
    
     
Stream-merging operators

1. Union
2. Connect - CoMap, CoFlatMap
3. Iterate
    import java.util.Arrays;

    import org.apache.flink.api.common.functions.FilterFunction;
    import org.apache.flink.api.common.functions.MapFunction;
    import org.apache.flink.streaming.api.datastream.ConnectedStreams;
    import org.apache.flink.streaming.api.datastream.DataStream;
    import org.apache.flink.streaming.api.datastream.DataStreamSource;
    import org.apache.flink.streaming.api.datastream.IterativeStream;
    import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    import org.apache.flink.streaming.api.functions.co.CoMapFunction;

    public class StreamMergeOperatorTest {
        public static void main(String[] args) throws Exception {
            String filePath = "E:\\Projects\\bigdata\\flink\\flink-study\\demo01\\src\\main\\resources\\userclickrecordtempdata.txt";
            StreamExecutionEnvironment executionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment();

            DataStreamSource<UserClickRecord> streamSourceFromMyUserClickSource = executionEnvironment.addSource(new UserDefineDataSourceTest.MyUserClickSource());

            DataStreamSource<String> streamSourceFromSocket = executionEnvironment.socketTextStream("192.168.116.100", 9999);

            DataStreamSource<String> streamSourceFromTextFile = executionEnvironment.readTextFile(filePath);

            DataStream<String> streamSourceFromCollection = executionEnvironment
                    .fromCollection(Arrays.asList(
                        new UserClickRecord("1", "1111", "1", "1", "1", "1").toString(),
                        new UserClickRecord("2", "222", "1", "1", "1", "1").toString(),
                        new UserClickRecord("3", "33", "1", "1", "1", "1").toString(),
                        new UserClickRecord("4", "44", "1", "1", "1", "1").toString(),
                        new UserClickRecord("5", "55", "1", "1", "1", "1").toString(),
                        new UserClickRecord("6", "66", "1", "1", "1", "1").toString()
                    ));

            // union: merge any number of streams of the same type into one stream
            DataStream<String> unionStringDataStream =
                    streamSourceFromSocket.union(streamSourceFromTextFile, streamSourceFromCollection);

            unionStringDataStream.print("unionSocketAndFile");

            // connect: combine two streams of possibly different types;
            // a CoMapFunction handles each side separately
            ConnectedStreams<UserClickRecord, String> connectedStream =
                    streamSourceFromMyUserClickSource.connect(streamSourceFromSocket);
            SingleOutputStreamOperator<UserClickRecord> connectedStreamCoMap =
                    connectedStream.map(new CoMapFunction<UserClickRecord, String, UserClickRecord>() {
                @Override
                public UserClickRecord map1(UserClickRecord value) throws Exception {
                    value.setUuid("MyUserClickSource-" + value.getUuid());
                    return value;
                }

                @Override
                public UserClickRecord map2(String value) throws Exception {
                    String[] lineItems = value.split(",");
                    return new UserClickRecord("SocketSource-" + lineItems[0],
                            lineItems[1],
                            lineItems[2],
                            lineItems[3],
                            lineItems[4],
                            lineItems[5]);
                }
            });
            connectedStreamCoMap.print("connectedStreamCoMap");

            // iterate: feed part of a stream back to the head of the iteration
            IterativeStream<UserClickRecord> iteration = streamSourceFromMyUserClickSource.iterate();
            SingleOutputStreamOperator<UserClickRecord> iterationBody =
                    iteration.map((MapFunction<UserClickRecord, UserClickRecord>) value -> {
                        if ("1".equals(value.getLabel())) {
                            value.setUuid("abnormal click: " + value.getUuid()); // process records fed back into the iteration
                            value.setLabel("3"); // re-label them
                        }
                        return value;
                    });
            DataStream<UserClickRecord> feedback = iterationBody.filter(
                    (FilterFunction<UserClickRecord>) value -> "1".equals(value.getLabel()))
                    .setParallelism(1); // streamSourceFromMyUserClickSource runs with parallelism 1, so the feedback stream's parallelism must match
            iteration.closeWith(feedback);

            DataStream<UserClickRecord> out = iterationBody.filter(
                    (FilterFunction<UserClickRecord>) value -> !"1".equals(value.getLabel()));

            ConnectedStreams<UserClickRecord, UserClickRecord> connectOutWithFeedBack = out.connect(feedback);
            SingleOutputStreamOperator<String> connectedFeedBackStream = connectOutWithFeedBack.map(new CoMapFunction<UserClickRecord, UserClickRecord, String>() {
                @Override
                public String map1(UserClickRecord value) throws Exception {
                    return value.toString();
                }

                @Override
                public String map2(UserClickRecord value) throws Exception {
                    value.setUuid("abnormal click: " + value.getUuid());
                    return value.toString();
                }
            });

            connectedFeedBackStream.print("connectedFeedBackStream");

            executionEnvironment.execute("unionStringDataStream");
        }
    }
    
    
Source code

flink-study
