
- 运行环境
- 1. 安装、配置 Flink
- 1.1使用Xftp工具将Flink资源传入虚拟机
- 1.2 在虚拟机中将Flink资源传输到docker容器
- 1.3 解压Flink资源并配置环境变量
- 2.Flink单节点测试
- 2.1 在hadoop101节点启动zookeeper 和 hadoop dfs 和 yarn
- 2.2 以 Yarn方式启动 Flink Scala
- 3.配置 flink集群
- 4. 外部测试FlinkStreaming词频统计程序
- 4.1 配置pom.xml 依赖
- 4.2 编写词频统计的Flink程序
- 4.3 启动 Flink集群 ,测试词频统计程序
- 5. 本地测试 将打包的FlinkStream程序 提交到Flink集群
- 5.1 在IDEA使用Maven打包Flink项目
- 5.2 在Flink集群节点执行jar包
- VMware
- Centos7 虚拟机 *** 作系统
- Docker
- Jdk8
- Hadoop3.1.3
- scala 2.11.8
- flink-1.9.1
- Maven3.6.3
- IDEA
注:在搭建Flink集群时需有Hadoop环境,尽管案例里没有用到HDFS,但通过资源调度管理的yarn组件启动Flink-shell 会有更好的效果。
关于Docker搭建Hadoop集群可以参考这篇博文 基于Docker搭建完全分布式集群
1. 安装、配置 Flink1.1使用Xftp工具将Flink资源传入虚拟机 1.2 在虚拟机中将Flink资源传输到docker容器
$ sudo docker cp flink-1.9.1-bin-scala_2.11.tgz hadoop101:/opt/download1.3 解压Flink资源并配置环境变量
tar -zxvf flink-1.9.1-bin-scala_2.12.tgz -C ./ mv flink-1.9.2 flink vim /etc/profile.d/home.sh source /etc/profile2.Flink单节点测试
2.1 在hadoop101节点启动zookeeper 和 hadoop dfs 和 yarn
zkServer.sh start start-dfs.sh start-yarn.sh2.2 以 Yarn方式启动 Flink Scala
./start-cluster.sh ./start-scala-shell.sh yarn
测试完毕后, :q 退出
3.配置 flink集群在flink根目录下的conf文件夹中修改 masters 文件的内容为
hadoop101:8081
然后再修改workers文件夹的内容为
Hadoop101 Hadoop102 Hadoop103
配置 flink-conf.yaml文件:
将 jobmanager.rpc.address 的属性值设置为 hadoop101
再添加一行 taskmanager.tmp.dirs: /opt/module/flink/tmp 指定flink任务的缓存目录
接下来,分发配置好的 Flink资源到其他节点,即 hadoop102 和 hadoop103
scp -r ./flink hadoop103:/opt/module/flink scp -r ./flink hadoop102:/opt/module/flink4. 外部测试FlinkStreaming词频统计程序
创建IDEA项目,需下载 scala 插件,给项目配置scala的SDK,笔者这里的版本号是2.11.8
4.1 配置pom.xml 依赖pom.xml
4.2 编写词频统计的Flink程序4.0.0 com.ycc wordcountWordCount jar 1.0 alimaven aliyun maven http://maven.aliyun.com/nexus/content/groups/public/ maven1 maven1 http://repo.maven.apache.org/maven2 1.9.1 org.apache.flink flink-scala_2.11${flink.version} org.apache.flink flink-streaming-scala_2.11${flink.version} org.apache.flink flink-clients_2.11${flink.version} org.scala-lang scala-library2.11.8 src/main/scala net.alchim31.maven scala-maven-plugincompile-scala process-resources add-source compile scala-compile-first compile add-source testCompile 2.11.8
WordCount.scala
这个程序主要就是使用 Flink 的API方法进行词频统计,通过flatMap转换 *** 作将每一行的数据源根据空格进行分隔,并把所有字母统一为小写,之后再使用map *** 作使其变为key-value键值对,即(单词, 出现频数),最后在进行分组、求和,就得出了最终的词频统计结果 。当在IDEA中执行时,使用的是IDEA本地环境,没有发布到Flink集群环境上,故可直接在IDEA中进行测试。
package com.ycc
import org.apache.flink.api.scala._
object WordCount {
def main(args: Array[String]): Unit = {
//第1步:建立执行环境
val env = ExecutionEnvironment.getExecutionEnvironment
//第2步:创建数据源
val text = env.fromElements(
"hello, world!",
"hello, world!",
"hello, world!")
//第3步:对数据集指定转换 *** 作
val counts = text.flatMap { _.toLowerCase.split(" ") }
.map { (_, 1) }
.groupBy(0)
.sum(1)
// 第4步:输出结果
counts.print()
}
}
StreamWordCount.scala
与上个程序的不同是其数据源来自于IP:端口号的形式,通过在Flink集群的节点,即hadoop101节点向一个端口发送消息,然后程序向这个端口获取数据,获取后进行词频套机的 *** 作,不同于之前,它用到了timeWindow窗口,而且最终有使用execute方法提交这个Job任务。
package com.ycc
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.streaming.api.windowing.time.Time
object StreamWordCount{
def main(args: Array[String]): Unit = {
//第1步:建立执行环境
val env = StreamExecutionEnvironment.getExecutionEnvironment
//第2步:创建数据源
val source = env.socketTextStream("hadoop101",9999,'n')
//第3步:对数据集指定转换 *** 作逻辑
val dataStream = source.flatMap(_.split(" "))
.map((_,1))
.keyBy(0)
.timeWindow(Time.seconds(2),Time.seconds(2))
.sum(1)
//第4步:指定计算结果输出位置
dataStream.print()
//第5步:指定名称并触发流计算
env.execute("Flink Streaming Word Count")
}
}
4.3 启动 Flink集群 ,测试词频统计程序
第一个程序,IDEA中的执行结果
测试第二个程序前需在hadoop101节点监听9999端口号 并发送消息
nc -lk 9999
第二个程序, FlinkStreaming程序在IDEA中的执行结果
5.1 在IDEA使用Maven打包Flink项目 5.2 在Flink集群节点执行jar包
在代码中,需监听hadoop101节点的端口,故在hadoop102节点提交Flink包
测试第一个程序的结果:
flink run --class com.ycc.WordCount ./wordcount-1.0.jar
测试第二个程序出现报错
通过访问Flink web 页面,地址: http://hadoop101:8081/查看报错信息
根据报错提示,发现出错原因是端口占用,可能是因为之前测试9999端口时,没有正常退出,现尝试将StreamWordCount.scala程序val source = env.socketTextStream(“hadoop101”,9999,’n’)
这部分中的9999端口号改为12345,并重新打包,传输到docker容器,在提交任务前,需先打开hadoop101节点的监听端口 nc -lk 12345,再提交jar包,命令如下:
nc -lk 12345 flink run --class com.ycc.StreamWordCount ./wordcount-1.0.jar
查询词频统计输出的结果
tail -f ./log/flink*.out
因为没有使用后台执行的参数,现使用第三个节点通过ssh命令远程登陆hadoop102查看结果,最后的词频统计结果如下:
至此,已完成了 Flink集群的搭建以及 FlinkStreaming *** 作进行词频统计。
欢迎分享,转载请注明来源:内存溢出
微信扫一扫
支付宝扫一扫
评论列表(0条)