
大家好,我是瓜哥,今天进行了Flink相关内容的总结和学习。Flink可以处理实时流式数据处理。
具体的资料文档可以登录flink官方文档直接进行在线学习。flink官方文档
直接看官方文档对flink的一个架构图:
Apache Flink 功能强大,支持开发和运行多种不同种类的应用程序。它的主要特性包括:批流一体化、精密的状态管理、事件时间支持以及精确一次的状态一致性保障等。Flink 不仅可以运行在包括 YARN、 Mesos、Kubernetes 在内的多种资源管理框架上,还支持在裸机集群上独立部署。在启用高可用选项的情况下,它不存在单点失效问题。事实证明,Flink 已经可以扩展到数千核心,其状态可以达到 TB 级别,且仍能保持高吞吐、低延迟的特性。世界各地有很多要求严苛的流处理应用都运行在 Flink 之上。
今天我们来说说StreamExecutionEnvironment的用法
首先我们先来看下StreamExecutionEnvironment的UML图
StreamExecutionEnvironment是一个任务的启动的入口
下面总共有5个实现类
分别是
-
LocalStreamEnvironment StreamPlanEnvironment StreamContextEnvironment RemoteStreamEnvironment LegacyLocalStreamEnvironment
下面通过在Linux客户端开启一个监听端口,然后通过StreamExecutionEnvironment来对客户端的流式数据进行读取。
1、首先创建guage-flink工程入下图2、在pom文件中引入以下依赖
3、创建StreamWordCount.java类4.0.0 cn.com.guage guage-flink1.0-SNAPSHOT org.apache.flink flink-clients_2.111.6.1 org.apache.flink flink-streaming-java_2.111.6.1 org.apache.flink flink-streaming-scala_2.111.6.1 org.apache.flink flink-connector-wikiedits_2.111.4.1 org.slf4j slf4j-log4j121.7.21 test log4j log4j1.2.17
package cn.com.guage.flink;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;
public class StreamWordCount {
public static void main(String[] args) throws Exception{
//linux里面执行nc -lk 9527
//创建流处理执行环境
StreamExecutionEnvironment streamEnv = StreamExecutionEnvironment.getExecutionEnvironment();
//从socket文本流读取数据
DataStream inputDataStream = streamEnv.socketTextStream("192.168.0.128",9528);
//基于数据流进行转换计算
DataStream> resultSum = inputDataStream.flatMap(new MyFlatMapper()).keyBy(0).sum(1);
resultSum.print();
streamEnv.execute();
}
//自定义类,实现FlatMapFunction接口
public static class MyFlatMapper implements FlatMapFunction>{
private static final long serialVersionUID = -6332218868988874763L;
public void flatMap(String s, Collector> collector){
//按空格分词
String[] words = s.split(" ");
//遍历所有word 加工成二元组
for(String word:words){
collector.collect(new Tuple2(word,1));
}
}
}
}
4、开启linux监听客户端
客户端监听是用国产的深度 *** 作系统,大家可以在虚拟机中搭建类似的测试环境。如下图:
本人利用SecureCRT直接连接虚拟机,IP地址为:192.168.0.128
在命令行窗口执行如下命令:
nc -lk 9527
如下图
说明:
DataStream
中的IP地址和port要和linux启动的IP地址端口一致。这样流处理程序就会等待客户端那边的数据,只要数据出现,就会不断对接收到的数据进行处理。
6、客户端数据输入在9527端口客户端输入以下字符:
然后在console窗口就可以看到:
如下图
欢迎分享,转载请注明来源:内存溢出
微信扫一扫
支付宝扫一扫
评论列表(0条)