
背景报错截图:分析&解决:
背景学习Flink时,java程序启动报错:Caused by: java.net.ConnectException: Connection refused: connect
报错截图: 分析&解决:报错为连接异常,检查错位为端口未启动,没有数据,应该先启动端口,
启动后再运行java程序,问题解决。
附上java程序:
public class FlinkWordCountJobWithAnonymous {
public static void main(String[] args) throws Exception {
//1 获取flink运行环境
StreamExecutionEnvironment environment = StreamExecutionEnvironment.getExecutionEnvironment();
//2.加载数据源为dataStream ,绑定hadoop10的9999端口,将这个网络端口发送的数据加载为dataStream
DataStreamSource dataStream = environment.socketTextStream("hadoop10", 9999);
//3.执行多个转换算子 ,SingleOutputStreamOperator是DataStreamSource子类
SingleOutputStreamOperator> result = dataStream.flatMap(new FlatMapFunction() {
@Override
//value:表示一个待处理的数据,在这里就是一行字符串
//out: 用于输出结果的工具对象
public void flatMap(String value, Collector out) throws Exception {
//拆分value,通过out输出结果
String[] words = value.split("//s+"); //去除一个或多个空格
for (String word : words) {
out.collect(word);
}
}
}) //执行一行字符串拆分为多个单词
.map(new MapFunction>() {
@Override
public Tuple2 map(String value) throws Exception {
return Tuple2.of(value, 1);
}
}) //将多个单词转换为(单词,1) 这种tuple2对象
.keyBy(0) //根据单词为key分组,0表示tuple2中的第一个属性,也就是单词
.sum(1);//统计每组单词的个数, 1表示tuple2中第2个属性,也就是次数
//4.通过sink算子输出结果
result.print();
//5.发布执行
environment.execute("flinkWordCount"); //为任务起别名
}
}
打印结果: (前面数字为调用第几个cpu)
5> (world,1)
3> (hello,1)
2> (lgy,1)
3> (hello,2)
欢迎分享,转载请注明来源:内存溢出
微信扫一扫
支付宝扫一扫
评论列表(0条)