大数据应用技术 | 使用 Docker搭建 Flink 集群 | FlinkStreaming实时数据流处理程序实现词频统计

大数据应用技术 | 使用 Docker搭建 Flink 集群 | FlinkStreaming实时数据流处理程序实现词频统计,第1张

大数据应用技术 | 使用 Docker搭建 Flink 集群 | FlinkStreaming实时数据流处理程序实现词频统计

文章目录
  • 运行环境
  • 1. 安装、配置 Flink
    • 1.1使用Xftp工具将Flink资源传入虚拟机
    • 1.2 在虚拟机中将Flink资源传输到docker容器
    • 1.3 解压Flink资源并配置环境变量
  • 2.Flink单节点测试
    • 2.1 在hadoop101节点启动zookeeper 和 hadoop dfs 和 yarn
    • 2.2 以 Yarn方式启动 Flink Scala
  • 3.配置 flink集群
  • 4. 外部测试FlinkStreaming词频统计程序
    • 4.1 配置pom.xml 依赖
    • 4.2 编写词频统计的Flink程序
    • 4.3 启动 Flink集群 ,测试词频统计程序
  • 5. 本地测试 将打包的FlinkStream程序 提交到Flink集群
    • 5.1 在IDEA使用Maven打包Flink项目
    • 5.2 在Flink集群节点执行jar包

运行环境
  • VMware
  • Centos7 虚拟机 *** 作系统
  • Docker
  • Jdk8
  • Hadoop3.1.3
  • scala 2.11.8
  • flink-1.9.1
  • Maven3.6.3
  • IDEA

注:在搭建Flink集群时需有Hadoop环境,尽管案例里没有用到HDFS,但通过资源调度管理的yarn组件启动Flink-shell 会有更好的效果。

关于Docker搭建Hadoop集群可以参考这篇博文 基于Docker搭建完全分布式集群

1. 安装、配置 Flink
1.1使用Xftp工具将Flink资源传入虚拟机

1.2 在虚拟机中将Flink资源传输到docker容器
$ sudo docker cp flink-1.9.1-bin-scala_2.11.tgz hadoop101:/opt/download

1.3 解压Flink资源并配置环境变量
tar -zxvf flink-1.9.1-bin-scala_2.12.tgz -C ./

mv flink-1.9.2 flink

vim /etc/profile.d/home.sh

source /etc/profile

2.Flink单节点测试
2.1 在hadoop101节点启动zookeeper 和 hadoop dfs 和 yarn
zkServer.sh start

start-dfs.sh

start-yarn.sh

2.2 以 Yarn方式启动 Flink Scala
./start-cluster.sh
./start-scala-shell.sh yarn

测试完毕后, :q 退出

3.配置 flink集群

在flink根目录下的conf文件夹中修改 masters 文件的内容为

hadoop101:8081

然后再修改workers文件夹的内容为

Hadoop101
Hadoop102
Hadoop103

配置 flink-conf.yaml文件:
将 jobmanager.rpc.address 的属性值设置为 hadoop101
再添加一行 taskmanager.tmp.dirs: /opt/module/flink/tmp 指定flink任务的缓存目录

接下来,分发配置好的 Flink资源到其他节点,即 hadoop102 和 hadoop103

scp -r ./flink hadoop103:/opt/module/flink
scp -r ./flink hadoop102:/opt/module/flink

4. 外部测试FlinkStreaming词频统计程序

创建IDEA项目,需下载 scala 插件,给项目配置scala的SDK,笔者这里的版本号是2.11.8

4.1 配置pom.xml 依赖

pom.xml



        4.0.0
        com.ycc
        wordcount
        WordCount
        jar
        1.0
        
            
                alimaven
                aliyun maven
                http://maven.aliyun.com/nexus/content/groups/public/
            
	        
	            maven1
	            maven1
	            http://repo.maven.apache.org/maven2
	        
        

    
        1.9.1
    
    
            
                org.apache.flink
                flink-scala_2.11
                ${flink.version}
            
            
                org.apache.flink
                flink-streaming-scala_2.11
                ${flink.version}
            
            
                org.apache.flink
                flink-clients_2.11
                ${flink.version}
            

            
                org.scala-lang
                scala-library
                2.11.8
            
    
    
        src/main/scala
        
        
            
                net.alchim31.maven
                scala-maven-plugin
                
                    
                        compile-scala
                        process-resources
                        
                            add-source
                            compile
                        
                    
                    
                        scala-compile-first
                        compile
                        
                            add-source
                            testCompile
                        
                    
                
                
                    2.11.8
                
            
        
    

4.2 编写词频统计的Flink程序

WordCount.scala

这个程序主要就是使用 Flink 的API方法进行词频统计,通过flatMap转换 *** 作将每一行的数据源根据空格进行分隔,并把所有字母统一为小写,之后再使用map *** 作使其变为key-value键值对,即(单词, 出现频数),最后在进行分组、求和,就得出了最终的词频统计结果 。当在IDEA中执行时,使用的是IDEA本地环境,没有发布到Flink集群环境上,故可直接在IDEA中进行测试。

package com.ycc

import org.apache.flink.api.scala._

object WordCount {
  def main(args: Array[String]): Unit = {

    //第1步:建立执行环境
    val env = ExecutionEnvironment.getExecutionEnvironment

    //第2步:创建数据源
    val text = env.fromElements(
      "hello, world!",
      "hello, world!",
      "hello, world!")

    //第3步:对数据集指定转换 *** 作
    val counts = text.flatMap { _.toLowerCase.split(" ") }
      .map { (_, 1) }
      .groupBy(0)
      .sum(1)

    // 第4步:输出结果
    counts.print()
  }
}

StreamWordCount.scala

与上个程序的不同是其数据源来自于IP:端口号的形式,通过在Flink集群的节点,即hadoop101节点向一个端口发送消息,然后程序向这个端口获取数据,获取后进行词频套机的 *** 作,不同于之前,它用到了timeWindow窗口,而且最终有使用execute方法提交这个Job任务。

package com.ycc

import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.streaming.api.windowing.time.Time

object StreamWordCount{
  def main(args: Array[String]): Unit = {

    //第1步:建立执行环境
    val env = StreamExecutionEnvironment.getExecutionEnvironment

    //第2步:创建数据源
    val source = env.socketTextStream("hadoop101",9999,'n')

    //第3步:对数据集指定转换 *** 作逻辑
    val dataStream = source.flatMap(_.split(" "))
      .map((_,1))
      .keyBy(0)
      .timeWindow(Time.seconds(2),Time.seconds(2))
      .sum(1)

    //第4步:指定计算结果输出位置
    dataStream.print()

    //第5步:指定名称并触发流计算
    env.execute("Flink Streaming Word Count")
  }
}
4.3 启动 Flink集群 ,测试词频统计程序

第一个程序,IDEA中的执行结果


测试第二个程序前需在hadoop101节点监听9999端口号 并发送消息

nc -lk 9999


第二个程序, FlinkStreaming程序在IDEA中的执行结果

5. 本地测试 将打包的FlinkStream程序 提交到Flink集群
5.1 在IDEA使用Maven打包Flink项目

5.2 在Flink集群节点执行jar包

在代码中,需监听hadoop101节点的端口,故在hadoop102节点提交Flink包
测试第一个程序的结果:

flink run --class com.ycc.WordCount ./wordcount-1.0.jar


测试第二个程序出现报错


通过访问Flink web 页面,地址: http://hadoop101:8081/查看报错信息


根据报错提示,发现出错原因是端口占用,可能是因为之前测试9999端口时,没有正常退出,现尝试将StreamWordCount.scala程序val source = env.socketTextStream(“hadoop101”,9999,’n’)
这部分中的9999端口号改为12345,并重新打包,传输到docker容器,在提交任务前,需先打开hadoop101节点的监听端口 nc -lk 12345,再提交jar包,命令如下:

nc -lk 12345

flink run --class com.ycc.StreamWordCount ./wordcount-1.0.jar

查询词频统计输出的结果

tail -f ./log/flink*.out

因为没有使用后台执行的参数,现使用第三个节点通过ssh命令远程登陆hadoop102查看结果,最后的词频统计结果如下:

至此,已完成了 Flink集群的搭建以及 FlinkStreaming *** 作进行词频统计。

欢迎分享,转载请注明来源:内存溢出

原文地址:https://54852.com/zaji/5687785.html

(0)
打赏 微信扫一扫微信扫一扫 支付宝扫一扫支付宝扫一扫
上一篇 2022-12-17
下一篇2022-12-17

发表评论

登录后才能评论

评论列表(0条)

    保存