Flink流处理StreamExecutionEnvironment用法

Flink流处理StreamExecutionEnvironment用法,第1张

Flink流处理StreamExecutionEnvironment用法

大家好,我是瓜哥,今天进行了Flink相关内容的总结和学习。Flink可以处理实时流式数据处理。

具体的资料文档可以登录flink官方文档直接进行在线学习。flink官方文档

直接看官方文档对flink的一个架构图:

Apache Flink 功能强大,支持开发和运行多种不同种类的应用程序。它的主要特性包括:批流一体化、精密的状态管理、事件时间支持以及精确一次的状态一致性保障等。Flink 不仅可以运行在包括 YARN、 Mesos、Kubernetes 在内的多种资源管理框架上,还支持在裸机集群上独立部署。在启用高可用选项的情况下,它不存在单点失效问题。事实证明,Flink 已经可以扩展到数千核心,其状态可以达到 TB 级别,且仍能保持高吞吐、低延迟的特性。世界各地有很多要求严苛的流处理应用都运行在 Flink 之上。

今天我们来说说StreamExecutionEnvironment的用法

首先我们先来看下StreamExecutionEnvironment的UML图

StreamExecutionEnvironment是一个任务的启动的入口

下面总共有5个实现类

分别是

  • LocalStreamEnvironment
    StreamPlanEnvironment
    StreamContextEnvironment
    RemoteStreamEnvironment
    LegacyLocalStreamEnvironment

下面通过在Linux客户端开启一个监听端口,然后通过StreamExecutionEnvironment来对客户端的流式数据进行读取。

1、首先创建guage-flink工程入下图

 

2、在pom文件中引入以下依赖




	4.0.0

	cn.com.guage
	guage-flink
	1.0-SNAPSHOT

	

		
			org.apache.flink
			flink-clients_2.11
			1.6.1
		

		
			org.apache.flink
			flink-streaming-java_2.11
			1.6.1
		


		
			org.apache.flink
			flink-streaming-scala_2.11

			1.6.1
		

		
			org.apache.flink
			flink-connector-wikiedits_2.11
			1.4.1
		
		
			org.slf4j
			slf4j-log4j12
			1.7.21
			test
		

		
		
			log4j
			log4j
			1.2.17
		
	

3、创建StreamWordCount.java类

package cn.com.guage.flink;

import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;


public class StreamWordCount {
    public static void main(String[] args) throws Exception{
    	
    	//linux里面执行nc -lk  9527
        //创建流处理执行环境
		StreamExecutionEnvironment streamEnv = StreamExecutionEnvironment.getExecutionEnvironment();

        //从socket文本流读取数据
        DataStream inputDataStream = streamEnv.socketTextStream("192.168.0.128",9528);

        //基于数据流进行转换计算
        DataStream> resultSum = inputDataStream.flatMap(new MyFlatMapper()).keyBy(0).sum(1);
        resultSum.print();
        streamEnv.execute();
    }
    
    //自定义类,实现FlatMapFunction接口
    public static class MyFlatMapper implements FlatMapFunction>{

        
		private static final long serialVersionUID = -6332218868988874763L;

		public void flatMap(String s, Collector> collector){
            //按空格分词
            String[] words = s.split(" ");
            //遍历所有word 加工成二元组
            for(String word:words){
                collector.collect(new Tuple2(word,1));
            }
        }
    }
}

4、开启linux监听客户端

客户端监听是用国产的深度 *** 作系统,大家可以在虚拟机中搭建类似的测试环境。如下图:

本人利用SecureCRT直接连接虚拟机,IP地址为:192.168.0.128

在命令行窗口执行如下命令:

nc -lk  9527

   如下图

 

5、在IDE开发工具中导入guage-flink工程,并启动StreamWordCount

说明:

DataStream inputDataStream =streamEnv.socketTextStream("192.168.0.128",9527);

中的IP地址和port要和linux启动的IP地址端口一致。这样流处理程序就会等待客户端那边的数据,只要数据出现,就会不断对接收到的数据进行处理。

6、客户端数据输入

在9527端口客户端输入以下字符:

然后在console窗口就可以看到:

如下图

欢迎分享,转载请注明来源:内存溢出

原文地址:https://54852.com/zaji/5116441.html

(0)
打赏 微信扫一扫微信扫一扫 支付宝扫一扫支付宝扫一扫
上一篇 2022-11-17
下一篇2022-11-17

发表评论

登录后才能评论

评论列表(0条)

    保存