
flink-demo
--src
--main
--java
--com.bob.demo
--App
--DemoRichSink
--FlinkDemo
--resources
--consumer.properties
--start.sh
--pom.xml
2.具体代码
2.1 pom.xml
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0">
    <modelVersion>4.0.0</modelVersion>
    <groupId>com.bob</groupId>
    <artifactId>flink-demo</artifactId>
    <version>1.0-SNAPSHOT</version>
    <properties>
        <maven.compiler.source>8</maven.compiler.source>
        <maven.compiler.target>8</maven.compiler.target>
        <flink.version>1.11.2</flink.version>
    </properties>
    <dependencies>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-streaming-java_2.11</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-connector-kafka_2.11</artifactId>
            <version>${flink.version}</version>
        </dependency>
    </dependencies>
</project>
2.2 配置文件consumer.properties
bootstrap.servers=192.168.1.1:9092,192.168.1.2:9092,192.168.1.3:9092
kafka.topic=flinkdemo
group.id=demo001
key.deserializer=org.apache.kafka.common.serialization.StringDeserializer
value.deserializer=org.apache.kafka.common.serialization.StringDeserializer
sasl.jaas.config=org.apache.kafka.common.security.scram.ScramLoginModule required username="demo" password="demo";
security.protocol=SASL_PLAINTEXT
sasl.mechanism=SCRAM-SHA-256
2.3 启动脚本start.sh --提交到yarn运行
#!/bin/bash
set -x

# Obtain a Kerberos ticket before submitting to the secured YARN cluster.
kinit -kt /home/tomcat/keytab-util/demo.keytab demo

Flink_JAR=flink-demo.jar
Flink_HPATH=/u01/app/flink

# Submit the job to YARN in detached (-d) per-job mode.
# NOTE: the original script was missing the trailing backslashes, so every
# option line ran as a separate (broken) shell command.
# NOTE: -Dtaskmanager.memory.flink.size needs a unit; a bare "8192" is parsed
# as bytes by Flink's MemorySize. It also conflicts with -ytm 1024m — confirm
# which TaskManager size is actually intended.
"${Flink_HPATH}/bin/flink" run \
  -m yarn-cluster -yqu root.job \
  -ynm flink_demo -ys 5 \
  -ytm 1024m -Dtaskmanager.memory.flink.size=8192m \
  -yjm 1024m -d -c com.bob.demo.App \
  /u01/app/flink_demo/${Flink_JAR} \
  --flink_config_path ./consumer.properties
2.4 启动类App
package com.bob.demo;
import org.apache.flink.api.java.utils.ParameterTool;
/**
 * Job entry point: resolves the configuration file from the program
 * arguments ({@code --flink_config_path <file>}) and starts the
 * streaming job defined in {@link FlinkDemo}.
 */
public class App {
    /**
     * @param args program arguments forwarded by {@code flink run};
     *             must contain {@code --flink_config_path}
     * @throws Exception if the argument is missing, the properties file
     *                   cannot be read, or the job fails to start — the
     *                   original version swallowed every exception with
     *                   printStackTrace(), so a failed startup still
     *                   exited with status 0 on YARN
     */
    public static void main(String[] args) throws Exception {
        // 从启动参数获取配置文件 — getRequired fails fast with a clear
        // message instead of passing null into fromPropertiesFile.
        ParameterTool parameters = ParameterTool.fromArgs(args);
        String configPath = parameters.getRequired("flink_config_path");
        // 获取配置参数,如kafka redis
        ParameterTool parameterTool = ParameterTool.fromPropertiesFile(configPath);
        FlinkDemo flinkDemo = new FlinkDemo();
        flinkDemo.runApp(parameterTool);
    }
}
2.5 具体实现FlinkDemo
package com.bob.demo;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.api.java.utils.ParameterTool;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.Properties;
/**
 * Builds and runs the streaming topology: Kafka source -> DemoRichSink.
 */
public class FlinkDemo {
    private static final Logger logger = LoggerFactory.getLogger(FlinkDemo.class);

    /**
     * Assembles the job graph and submits it for execution.
     *
     * @param parameterTool configuration loaded from consumer.properties
     * @throws Exception if the job cannot be built or executed
     */
    public void runApp(ParameterTool parameterTool) throws Exception {
        // kafka参数 — key fixed to "kafka.topic": that is the key defined in
        // consumer.properties; the original read "topic.name", which is never
        // set, so the consumer got a null topic.
        String topicName = parameterTool.get("kafka.topic");
        Properties properties = initKafkaProperties(parameterTool);
        // 创建flink运行环境
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // 设置全局变量 — set before building operators so every rich function
        // can read the config via getGlobalJobParameters().
        env.getConfig().setGlobalJobParameters(parameterTool);
        // 构建数据源source (typed <String>; the original used raw types)
        DataStreamSource<String> stream =
                env.addSource(new FlinkKafkaConsumer<>(topicName, new SimpleStringSchema(), properties));
        // 构建sink1 — the original wrapped this in try/catch with
        // printStackTrace(), silently swallowing graph-construction errors.
        stream.addSink(new DemoRichSink());
        // 构建sink2
        // The original never called execute(): the job graph was built but
        // the job was never actually run.
        env.execute("flink_demo");
    }

    /**
     * Copies every entry of the loaded configuration into a Properties
     * object for the Kafka consumer (bootstrap.servers, group.id,
     * deserializers, SASL settings, ...).
     *
     * <p>The original returned an EMPTY Properties object, so the consumer
     * had no bootstrap.servers and could never connect.
     */
    public Properties initKafkaProperties(ParameterTool parameterTool) {
        Properties properties = new Properties();
        properties.putAll(parameterTool.getProperties());
        return properties;
    }
}
2.6 sink逻辑DemoRichSink
package com.bob.demo; import org.apache.flink.api.java.utils.ParameterTool; import org.apache.flink.streaming.api.functions.sink.RichSinkFunction; import java.io.Serializable; public class DemoRichSink extends RichSinkFunctionimplements Serializable { @Override public void invoke(String value, Context context) throws Exception { // 获取全局变量 ParameterTool globalParams = (ParameterTool) getRuntimeContext().getExecutionConfig().getGlobalJobParameters(); // TODO 具体逻辑,写hdfs,hbase,es,socket..... } }
欢迎分享,转载请注明来源:内存溢出
微信扫一扫
支付宝扫一扫
评论列表(0条)