
Connecting Flink to Kafka
First, create the Hive catalog:
CREATE CATALOG flink_hive WITH (
'type' = 'hive',
'default-database' = 'imods',
'hive-conf-dir' = '/home/admin/flink/conf'
);
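Once the catalog is created, a quick check from the SQL client confirms it is registered and the imods database is visible (a minimal sketch using standard Flink SQL statements):
SHOW CATALOGS;
USE CATALOG flink_hive;
SHOW DATABASES;
SHOW TABLES;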
Next, create the Kafka source table:
use catalog flink_hive;
-- Create the Kafka source table
CREATE TABLE IF NOT EXISTS kafka_table (
vin string,
age int,
...
)
-- The WITH clause carries the connection info and connector options
WITH (
'connector' = 'kafka',
'topic' = 'your_topic',
'properties.group.id' = 'your_group_id',
'properties.bootstrap.servers' = 'host1:port1,host2:port2',
'properties.security.protocol' = 'SASL_PLAINTEXT',
'properties.sasl.mechanism' = 'PLAIN',
'properties.sasl.jaas.config' = 'org.apache.kafka.common.security.plain.PlainLoginModule required username="your_username" password="your_password";', -- SASL username and password
'properties.key.deserializer' = 'org.apache.kafka.common.serialization.StringDeserializer',
'properties.value.deserializer' = 'org.apache.kafka.common.serialization.StringDeserializer',
--'scan.startup.mode' = 'latest-offset', -- one of five options, all explained below
--'scan.startup.mode' = 'earliest-offset',
'scan.startup.mode' = 'group-offsets',
'json.fail-on-missing-field' = 'false', -- do not fail when a field is missing from a record
'json.ignore-parse-errors' = 'true', -- skip records that cannot be parsed instead of failing
'format' = 'json' -- format of the incoming messages
);
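With the table registered, the source connection and JSON parsing can be smoke-tested with a plain query from the SQL client (a sketch; cancel the job once a few rows appear):
select vin, age from kafka_table;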
The scan.startup.mode options referenced in the DDL behave as follows:
group-offsets: start from the offsets committed to ZK / the Kafka brokers for the given consumer group (properties.group.id must be set).
earliest-offset: start from the earliest offset available.
latest-offset: start from the latest offset; only messages produced after the job starts are consumed.
timestamp: start from a user-supplied timestamp for each partition.
specific-offsets: start from user-supplied offsets for each partition.
The last two modes take their start position from an extra connector option, as shown in the sketch below.
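A minimal sketch of the two parameterized modes, assuming the standard Kafka connector option names scan.startup.timestamp-millis and scan.startup.specific-offsets (the timestamp and offsets here are hypothetical):
-- start from an epoch-milliseconds timestamp
'scan.startup.mode' = 'timestamp',
'scan.startup.timestamp-millis' = '1651234567890',
-- or start from explicit per-partition offsets
'scan.startup.mode' = 'specific-offsets',
'scan.startup.specific-offsets' = 'partition:0,offset:42;partition:1,offset:300',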
Then create the corresponding Hive target table:
-- Create the Hive target table
set table.sql-dialect=hive;
create table if not exists hive_table
-- The column names, types, and order must match the Kafka table exactly
(
vin string,
age int,
...
)
comment 'This is the Hive table'
partitioned by (dt string) -- optional
stored as parquet -- optional
TBLPROPERTIES (
'sink.rolling-policy.file-size'='128MB',
'sink.rolling-policy.rollover-interval'='30 min',
'sink.partition-commit.policy.kind'='metastore,success-file', -- commit finished partitions to the metastore and write a _SUCCESS file
'auto-compaction'='true', -- merge small files automatically
'compaction.file-size'='128MB',
'sink.shuffle-by-partition.enable'='true'
);
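Two settings often paired with the table above, shown as a hedged sketch assuming the standard Flink filesystem/Hive sink option names (the values are illustrative):
'sink.partition-commit.trigger'='process-time', -- extra TBLPROPERTIES entry; 'partition-time' is an alternative when the source defines a watermark
'sink.partition-commit.delay'='0 s', -- extra TBLPROPERTIES entry; how long to wait before committing a partition
Also note that the streaming Hive sink only finalizes files and commits partitions on checkpoints, so checkpointing must be enabled in the session, for example (exact SET syntax depends on the Flink version):
set execution.checkpointing.interval=60s;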
-- Run the INSERT with dynamic partition insertion
set pipeline.name=your_job_name; -- set a job name (in English); no quotes needed
set table.sql-dialect=default;
insert into hive_table
select
vin,
age,
...
from kafka_table;
-- A CASE WHEN worth keeping for timestamp conversion; it goes at the end of the select list above to produce the dt partition column:
case when CHARACTER_LENGTH(cast(eventTime as string)) = 13
then from_unixtime(cast(substr(cast(eventTime as string), 1, 10) as BIGINT), 'yyyyMMdd')
else '19700101'
end as dt
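To sanity-check the dt expression on its own, it can be run against a literal value (a sketch; the epoch-milliseconds literal is hypothetical and the resulting date depends on the session time zone):
select
case when CHARACTER_LENGTH(cast(eventTime as string)) = 13
then from_unixtime(cast(substr(cast(eventTime as string), 1, 10) as BIGINT), 'yyyyMMdd')
else '19700101'
end as dt
from (values (cast(1651234567890 as bigint))) as t(eventTime);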