
This article builds a quasi-real-time data warehouse on Flink SQL and Hudi. Once Flink ingests the data from Kafka, everything is stored in Hudi — all intermediate processing results as well as the final output. It builds on the earlier article 《实时数仓|基于Flink1.11的SQL构建实时数仓探索实践》, which described a real-time warehouse built on Flink SQL and Kafka; you may find it helpful to read that article alongside this one.
Using an e-commerce business as the example, this article walks through the data-processing flow of the quasi-real-time warehouse; the final result can be seen in the ADS-layer queries at the end.
Components and versions
Flink 1.13.3
Flink CDC 2.0.2
Hudi 0.10.0 (the latest release as of 2021-12-08: https://github.com/apache/hudi/releases/tag/release-0.10.0)
Hadoop 3.2.0
Zeppelin 0.10.0
MySQL 5.7 (with binlog enabled)
Kafka 2.5.0
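If you are unsure whether the MySQL instance is ready for CDC, a quick check in any MySQL client (a sketch) is:
SHOW VARIABLES LIKE 'log_bin';        -- must be ON
SHOW VARIABLES LIKE 'binlog_format';  -- Flink CDC requires ROW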
Because Zeppelin is so convenient, every job in this article is submitted through Zeppelin. If you have not used Zeppelin before, see: https://lrting-top.blog.csdn.net/article/details/120681666. Of course, submitting with the Flink SQL Client works just as well.
Checkpointing is enabled for this experiment, with the interval set to 60s.
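One way to set this — a sketch, assuming the interval is configured through Zeppelin's %flink.conf paragraph with the standard Flink configuration key — is:
%flink.conf
execution.checkpointing.interval 60000
The Hudi Flink writer commits data when a checkpoint completes, so newly written records become visible to readers only after a checkpoint; the 60s interval therefore also bounds the freshness of every Hudi table in this pipeline.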
Before running the steps below, make sure you have:
- Deployed Flink 1.13.3, packaged the Hudi Flink bundle jar and placed it in Flink's lib directory, and placed the Flink CDC connector jar in lib as well (see the sketch after this list).
- Deployed and started Zeppelin 0.10.0, with FLINK_HOME and HADOOP_CLASSPATH configured on Zeppelin's Flink interpreter.
- Started Hadoop, MySQL, and Kafka.
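For reference, the jar placement might look like this (a sketch; the exact bundle names depend on your Scala version and how you built Hudi):
cp hudi-flink-bundle_2.11-0.10.0.jar $FLINK_HOME/lib/
cp flink-sql-connector-mysql-cdc-2.0.2.jar $FLINK_HOME/lib/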
First, download the MySQL DDL and mock data from:
https://obs-githubhelper.obs.cn-east-3.myhuaweicloud.com/blog-images/category/bigdata/hudi/flink-hudi-realtime/realtime_table.zip
After downloading, log in to MySQL, create the realtime_dw_demo_1 database, switch to it, and load the script:
mysql -u root -p
create database realtime_dw_demo_1;
use realtime_dw_demo_1;
source realtime_table.sql;
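To confirm the import succeeded, a quick sanity check inside MySQL (a sketch) is:
show tables;                      -- expect the tables defined in realtime_table.sql
select count(*) from order_info;  -- expect a non-zero number of mock rows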
Syncing the MySQL tables to Kafka
We use Flink CDC to sync the MySQL data into Kafka. The relevant SQL statements follow.
Reading the MySQL source tables
%flink.ssql
drop table if exists base_category1;
drop table if exists base_category2;
drop table if exists base_category3;
drop table if exists base_province;
drop table if exists base_region;
drop table if exists base_trademark;
drop table if exists date_info;
drop table if exists holiday_info;
drop table if exists holiday_year;
drop table if exists order_detail;
drop table if exists order_info;
drop table if exists order_status_log;
drop table if exists payment_info;
drop table if exists sku_info;
drop table if exists user_info;
-- MySQL source tables
CREATE TABLE `base_category1` (
`id` bigint NOT NULL,
`name` string NOT NULL,
PRIMARY KEY (`id`) NOT ENFORCED
) with (
'connector' = 'mysql-cdc', 'hostname' = 'hostname', 'port' = '3306',
'username' = 'root', 'password' = 'Pass-123-root', 'server-time-zone' = 'Asia/Shanghai',
'debezium.snapshot.mode' = 'initial', 'database-name' = 'realtime_dw_demo_1', 'table-name' = 'base_category1'
);
CREATE TABLE `base_category2` (
`id` bigint NOT NULL COMMENT '编号',
`name` string NOT NULL COMMENT '二级分类名称',
`category1_id` bigint NULL COMMENT '一级分类编号',
PRIMARY KEY (`id`) NOT ENFORCED
) with (
'connector' = 'mysql-cdc', 'hostname' = 'hostname', 'port' = '3306',
'username' = 'root', 'password' = 'Pass-123-root', 'server-time-zone' = 'Asia/Shanghai',
'debezium.snapshot.mode' = 'initial', 'database-name' = 'realtime_dw_demo_1', 'table-name' = 'base_category2'
);
CREATE TABLE `base_category3` (
`id` bigint NOT NULL COMMENT '编号',
`name` string NOT NULL COMMENT '三级分类名称',
`category2_id` bigint NULL COMMENT '二级分类编号',
PRIMARY KEY (`id`) NOT ENFORCED
) with (
'connector' = 'mysql-cdc', 'hostname' = 'hostname', 'port' = '3306',
'username' = 'root', 'password' = 'Pass-123-root', 'server-time-zone' = 'Asia/Shanghai',
'debezium.snapshot.mode' = 'initial', 'database-name' = 'realtime_dw_demo_1', 'table-name' = 'base_category3'
);
CREATE TABLE `base_province` (
`id` int NULL COMMENT 'id',
`name` string NULL COMMENT '省名称',
`region_id` int NULL COMMENT '大区id',
`area_code` string NULL COMMENT '行政区位码',
PRIMARY KEY (`id`) NOT ENFORCED
) with (
'connector' = 'mysql-cdc', 'hostname' = 'hostname', 'port' = '3306',
'username' = 'root', 'password' = 'Pass-123-root', 'server-time-zone' = 'Asia/Shanghai',
'debezium.snapshot.mode' = 'initial', 'database-name' = 'realtime_dw_demo_1', 'table-name' = 'base_province'
);
CREATE TABLE `base_region` (
`id` int NOT NULL COMMENT '大区id',
`region_name` string NULL COMMENT '大区名称',
PRIMARY KEY (`id`) NOT ENFORCED
) with (
'connector' = 'mysql-cdc', 'hostname' = 'hostname', 'port' = '3306',
'username' = 'root', 'password' = 'Pass-123-root', 'server-time-zone' = 'Asia/Shanghai',
'debezium.snapshot.mode' = 'initial', 'database-name' = 'realtime_dw_demo_1', 'table-name' = 'base_region'
);
CREATE TABLE `base_trademark` (
`tm_id` string NULL COMMENT '品牌id',
`tm_name` string NULL COMMENT '品牌名称',
PRIMARY KEY (`tm_id`) NOT ENFORCED
) with (
'connector' = 'mysql-cdc', 'hostname' = 'hostname', 'port' = '3306',
'username' = 'root', 'password' = 'Pass-123-root', 'server-time-zone' = 'Asia/Shanghai',
'debezium.snapshot.mode' = 'initial', 'database-name' = 'realtime_dw_demo_1', 'table-name' = 'base_trademark'
);
CREATE TABLE `date_info` (
`date_id` int NOT NULL,
`week_id` int NULL,
`week_day` int NULL,
`day` int NULL,
`month` int NULL,
`quarter` int NULL,
`year` int NULL,
`is_workday` int NULL,
`holiday_id` int NULL,
PRIMARY KEY (`date_id`) NOT ENFORCED
) with (
'connector' = 'mysql-cdc', 'hostname' = 'hostname', 'port' = '3306',
'username' = 'root', 'password' = 'Pass-123-root', 'server-time-zone' = 'Asia/Shanghai',
'debezium.snapshot.mode' = 'initial', 'database-name' = 'realtime_dw_demo_1', 'table-name' = 'date_info'
);
CREATE TABLE `holiday_info` (
`holiday_id` int NOT NULL,
`holiday_name` string NULL,
PRIMARY KEY (`holiday_id`) NOT ENFORCED
) with (
'connector' = 'mysql-cdc', 'hostname' = 'hostname', 'port' = '3306',
'username' = 'root', 'password' = 'Pass-123-root', 'server-time-zone' = 'Asia/Shanghai',
'debezium.snapshot.mode' = 'initial', 'database-name' = 'realtime_dw_demo_1', 'table-name' = 'holiday_info'
);
CREATE TABLE `holiday_year` (
`year_id` int NULL,
`holiday_id` int NULL,
`start_date_id` int NULL,
`end_date_id` int NULL,
PRIMARY KEY (`end_date_id`) NOT ENFORCED
) with (
'connector' = 'mysql-cdc', 'hostname' = 'hostname', 'port' = '3306',
'username' = 'root', 'password' = 'Pass-123-root', 'server-time-zone' = 'Asia/Shanghai',
'debezium.snapshot.mode' = 'initial', 'database-name' = 'realtime_dw_demo_1', 'table-name' = 'holiday_year'
);
CREATE TABLE `order_detail` (
`id` bigint NOT NULL COMMENT '编号',
`order_id` bigint NULL COMMENT '订单编号',
`sku_id` bigint NULL COMMENT 'sku_id',
`sku_name` string NULL COMMENT 'sku名称(冗余)',
`img_url` string NULL COMMENT '图片名称(冗余)',
`order_price` decimal(10,2) NULL COMMENT '购买价格(下单时sku价格)',
`sku_num` string NULL COMMENT '购买个数',
`create_time` timestamp(3) NULL COMMENT '创建时间',
PRIMARY KEY (`id`) NOT ENFORCED
) with (
'connector' = 'mysql-cdc', 'hostname' = 'hostname', 'port' = '3306',
'username' = 'root', 'password' = 'Pass-123-root', 'server-time-zone' = 'Asia/Shanghai',
'debezium.snapshot.mode' = 'initial', 'database-name' = 'realtime_dw_demo_1', 'table-name' = 'order_detail'
);
CREATE TABLE `order_info` (
`id` bigint NOT NULL COMMENT '编号',
`consignee` string NULL COMMENT '收货人',
`consignee_tel` string NULL COMMENT '收件人电话',
`total_amount` decimal(10,2) NULL COMMENT '总金额',
`order_status` string NULL COMMENT '订单状态',
`user_id` bigint NULL COMMENT '用户id',
`payment_way` string NULL COMMENT '付款方式',
`delivery_address` string NULL COMMENT '送货地址',
`order_comment` string NULL COMMENT '订单备注',
`out_trade_no` string NULL COMMENT '订单交易编号(第三方支付用)',
`trade_body` string NULL COMMENT '订单描述(第三方支付用)',
`create_time` timestamp(3) NULL COMMENT '创建时间',
`operate_time` timestamp(3) NULL COMMENT '操作时间',
`expire_time` timestamp(3) NULL COMMENT '失效时间',
`tracking_no` string NULL COMMENT '物流单编号',
`parent_order_id` bigint NULL COMMENT '父订单编号',
`img_url` string NULL COMMENT '图片路径',
`province_id` int NULL COMMENT '地区',
PRIMARY KEY (`id`) NOT ENFORCED
) with (
'connector' = 'mysql-cdc', 'hostname' = 'hostname', 'port' = '3306',
'username' = 'root', 'password' = 'Pass-123-root', 'server-time-zone' = 'Asia/Shanghai',
'debezium.snapshot.mode' = 'initial', 'database-name' = 'realtime_dw_demo_1', 'table-name' = 'order_info'
);
CREATE TABLE `order_status_log` (
`id` int NOT NULL,
`order_id` int NULL,
`order_status` int NULL,
`operate_time` timestamp(3) NULL,
PRIMARY KEY (`id`) NOT ENFORCED
) with (
'connector' = 'mysql-cdc', 'hostname' = 'hostname', 'port' = '3306',
'username' = 'root', 'password' = 'Pass-123-root', 'server-time-zone' = 'Asia/Shanghai',
'debezium.snapshot.mode' = 'initial', 'database-name' = 'realtime_dw_demo_1', 'table-name' = 'order_status_log'
);
CREATE TABLE `payment_info` (
`id` bigint NOT NULL COMMENT '编号',
`out_trade_no` string NULL COMMENT '对外业务编号',
`order_id` string NULL COMMENT '订单编号',
`user_id` string NULL COMMENT '用户编号',
`alipay_trade_no` string NULL COMMENT '支付宝交易流水编号',
`total_amount` decimal(16,2) NULL COMMENT '支付金额',
`subject` string NULL COMMENT '交易内容',
`payment_type` string NULL COMMENT '支付方式',
`payment_time` string NULL COMMENT '支付时间',
PRIMARY KEY (`id`) NOT ENFORCED
) with (
'connector' = 'mysql-cdc', 'hostname' = 'hostname', 'port' = '3306',
'username' = 'root', 'password' = 'Pass-123-root', 'server-time-zone' = 'Asia/Shanghai',
'debezium.snapshot.mode' = 'initial', 'database-name' = 'realtime_dw_demo_1', 'table-name' = 'payment_info'
);
CREATE TABLE `sku_info` (
`id` bigint NOT NULL COMMENT 'skuid(itemID)',
`spu_id` bigint NULL COMMENT 'spuid',
`price` decimal(10,0) NULL COMMENT '价格',
`sku_name` string NULL COMMENT 'sku名称',
`sku_desc` string NULL COMMENT '商品规格描述',
`weight` decimal(10,2) NULL COMMENT '重量',
`tm_id` bigint NULL COMMENT '品牌(冗余)',
`category3_id` bigint NULL COMMENT '三级分类id(冗余)',
`sku_default_img` string NULL COMMENT '默认显示图片(冗余)',
`create_time` timestamp(3) NULL COMMENT '创建时间',
PRIMARY KEY (`id`) NOT ENFORCED
) with (
'connector' = 'mysql-cdc', 'hostname' = 'hostname', 'port' = '3306',
'username' = 'root', 'password' = 'Pass-123-root', 'server-time-zone' = 'Asia/Shanghai',
'debezium.snapshot.mode' = 'initial', 'database-name' = 'realtime_dw_demo_1', 'table-name' = 'sku_info'
);
CREATE TABLE `user_info` (
`id` bigint NOT NULL COMMENT '编号',
`login_name` string NULL COMMENT '用户名称',
`nick_name` string NULL COMMENT '用户昵称',
`passwd` string NULL COMMENT '用户密码',
`name` string NULL COMMENT '用户姓名',
`phone_num` string NULL COMMENT '手机号',
`email` string NULL COMMENT '邮箱',
`head_img` string NULL COMMENT '头像',
`user_level` string NULL COMMENT '用户级别',
`birthday` date NULL COMMENT '用户生日',
`gender` string NULL COMMENT '性别 M男,F女',
`create_time` timestamp(3) NULL COMMENT '创建时间',
PRIMARY KEY (`id`) NOT ENFORCED
) with (
'connector' = 'mysql-cdc', 'hostname' = 'hostname', 'port' = '3306',
'username' = 'root', 'password' = 'Pass-123-root', 'server-time-zone' = 'Asia/Shanghai',
'debezium.snapshot.mode' = 'initial', 'database-name' = 'realtime_dw_demo_1', 'table-name' = 'user_info'
);
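Before wiring up the Kafka sinks, you can smoke-test any of these CDC sources directly from Zeppelin (a sketch; type=update renders the continuously updating result, and the paragraph can be cancelled once rows appear):
%flink.ssql(type=update)
select * from base_category1;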
Kafka sink table DDL
%flink.ssql
drop table if exists base_category1_topic;
drop table if exists base_category2_topic;
drop table if exists base_category3_topic;
drop table if exists base_province_topic;
drop table if exists base_region_topic;
drop table if exists base_trademark_topic;
drop table if exists date_info_topic;
drop table if exists holiday_info_topic;
drop table if exists holiday_year_topic;
drop table if exists order_detail_topic;
drop table if exists order_info_topic;
drop table if exists order_status_log_topic;
drop table if exists payment_info_topic;
drop table if exists sku_info_topic;
drop table if exists user_info_topic;
CREATE TABLE `base_category1_topic` (
`id` bigint NOT NULL COMMENT '编号',
`name` string NOT NULL COMMENT '分类名称',
PRIMARY KEY (`id`) NOT ENFORCED
) with (
'connector' = 'kafka', 'topic' = 'my5.base_category1',
'properties.bootstrap.servers' = 'kafka-cluster:6667', 'format' = 'debezium-json'
);
CREATE TABLE `base_category2_topic` (
`id` bigint NOT NULL COMMENT '编号',
`name` string NOT NULL COMMENT '二级分类名称',
`category1_id` bigint NULL COMMENT '一级分类编号',
PRIMARY KEY (`id`) NOT ENFORCED
) with (
'connector' = 'kafka', 'topic' = 'my5.base_category2',
'properties.bootstrap.servers' = 'kafka-cluster:6667', 'format' = 'debezium-json'
);
CREATE TABLE `base_category3_topic` (
`id` bigint NOT NULL COMMENT '编号',
`name` string NOT NULL COMMENT '三级分类名称',
`category2_id` bigint NULL COMMENT '二级分类编号',
PRIMARY KEY (`id`) NOT ENFORCED
) with (
'connector' = 'kafka', 'topic' = 'my5.base_category3',
'properties.bootstrap.servers' = 'kafka-cluster:6667', 'format' = 'debezium-json'
);
CREATE TABLE `base_province_topic` (
`id` int NULL COMMENT 'id',
`name` string NULL COMMENT '省名称',
`region_id` int NULL COMMENT '大区id',
`area_code` string NULL COMMENT '行政区位码'
) with (
'connector' = 'kafka', 'topic' = 'my5.base_province',
'properties.bootstrap.servers' = 'kafka-cluster:6667', 'format' = 'debezium-json'
);
CREATE TABLE `base_region_topic` (
`id` int NOT NULL COMMENT '大区id',
`region_name` string NULL COMMENT '大区名称',
PRIMARY KEY (`id`) NOT ENFORCED
) with (
'connector' = 'kafka', 'topic' = 'my5.base_region',
'properties.bootstrap.servers' = 'kafka-cluster:6667', 'format' = 'debezium-json'
);
CREATE TABLE `base_trademark_topic` (
`tm_id` string NULL COMMENT '品牌id',
`tm_name` string NULL COMMENT '品牌名称',
PRIMARY KEY (`tm_id`) NOT ENFORCED
) with (
'connector' = 'kafka', 'topic' = 'my5.base_trademark',
'properties.bootstrap.servers' = 'kafka-cluster:6667', 'format' = 'debezium-json'
);
CREATE TABLE `date_info_topic` (
`date_id` int NOT NULL,
`week_id` int NULL,
`week_day` int NULL,
`day` int NULL,
`month` int NULL,
`quarter` int NULL,
`year` int NULL,
`is_workday` int NULL,
`holiday_id` int NULL,
PRIMARY KEY (`date_id`) NOT ENFORCED
) with (
'connector' = 'kafka', 'topic' = 'my5.date_info',
'properties.bootstrap.servers' = 'kafka-cluster:6667', 'format' = 'debezium-json'
);
CREATE TABLE `holiday_info_topic` (
`holiday_id` int NOT NULL,
`holiday_name` string NULL,
PRIMARY KEY (`holiday_id`) NOT ENFORCED
) with (
'connector' = 'kafka', 'topic' = 'my5.holiday_info',
'properties.bootstrap.servers' = 'kafka-cluster:6667', 'format' = 'debezium-json'
);
CREATE TABLE `holiday_year_topic` (
`year_id` int NULL,
`holiday_id` int NULL,
`start_date_id` int NULL,
`end_date_id` int NULL
) with (
'connector' = 'kafka', 'topic' = 'my5.holiday_year',
'properties.bootstrap.servers' = 'kafka-cluster:6667', 'format' = 'debezium-json'
);
CREATE TABLE `order_detail_topic` (
`id` bigint NOT NULL COMMENT '编号',
`order_id` bigint NULL COMMENT '订单编号',
`sku_id` bigint NULL COMMENT 'sku_id',
`sku_name` string NULL COMMENT 'sku名称(冗余)',
`img_url` string NULL COMMENT '图片名称(冗余)',
`order_price` decimal(10,2) NULL COMMENT '购买价格(下单时sku价格)',
`sku_num` string NULL COMMENT '购买个数',
`create_time` timestamp(3) NULL COMMENT '创建时间',
PRIMARY KEY (`id`) NOT ENFORCED
) with (
'connector' = 'kafka', 'topic' = 'my5.order_detail',
'properties.bootstrap.servers' = 'kafka-cluster:6667', 'format' = 'debezium-json'
);
CREATE TABLE `order_info_topic` (
`id` bigint NOT NULL COMMENT '编号',
`consignee` string NULL COMMENT '收货人',
`consignee_tel` string NULL COMMENT '收件人电话',
`total_amount` decimal(10,2) NULL COMMENT '总金额',
`order_status` string NULL COMMENT '订单状态',
`user_id` bigint NULL COMMENT '用户id',
`payment_way` string NULL COMMENT '付款方式',
`delivery_address` string NULL COMMENT '送货地址',
`order_comment` string NULL COMMENT '订单备注',
`out_trade_no` string NULL COMMENT '订单交易编号(第三方支付用)',
`trade_body` string NULL COMMENT '订单描述(第三方支付用)',
`create_time` timestamp(3) NULL COMMENT '创建时间',
`operate_time` timestamp(3) NULL COMMENT '操作时间',
`expire_time` timestamp(3) NULL COMMENT '失效时间',
`tracking_no` string NULL COMMENT '物流单编号',
`parent_order_id` bigint NULL COMMENT '父订单编号',
`img_url` string NULL COMMENT '图片路径',
`province_id` int NULL COMMENT '地区',
PRIMARY KEY (`id`) NOT ENFORCED
) with (
'connector' = 'kafka', 'topic' = 'my5.order_info',
'properties.bootstrap.servers' = 'kafka-cluster:6667', 'format' = 'debezium-json'
);
CREATE TABLE `order_status_log_topic` (
`id` int NOT NULL,
`order_id` int NULL,
`order_status` int NULL,
`operate_time` timestamp(3) NULL,
PRIMARY KEY (`id`) NOT ENFORCED
) with (
'connector' = 'kafka', 'topic' = 'my5.order_status_log',
'properties.bootstrap.servers' = 'kafka-cluster:6667', 'format' = 'debezium-json'
);
CREATE TABLE `payment_info_topic` (
`id` bigint NOT NULL COMMENT '编号',
`out_trade_no` string NULL COMMENT '对外业务编号',
`order_id` string NULL COMMENT '订单编号',
`user_id` string NULL COMMENT '用户编号',
`alipay_trade_no` string NULL COMMENT '支付宝交易流水编号',
`total_amount` decimal(16,2) NULL COMMENT '支付金额',
`subject` string NULL COMMENT '交易内容',
`payment_type` string NULL COMMENT '支付方式',
`payment_time` string NULL COMMENT '支付时间',
PRIMARY KEY (`id`) NOT ENFORCED
) with (
'connector' = 'kafka', 'topic' = 'my5.payment_info',
'properties.bootstrap.servers' = 'kafka-cluster:6667', 'format' = 'debezium-json'
);
CREATE TABLE `sku_info_topic` (
`id` bigint NOT NULL COMMENT 'skuid(itemID)',
`spu_id` bigint NULL COMMENT 'spuid',
`price` decimal(10,0) NULL COMMENT '价格',
`sku_name` string NULL COMMENT 'sku名称',
`sku_desc` string NULL COMMENT '商品规格描述',
`weight` decimal(10,2) NULL COMMENT '重量',
`tm_id` bigint NULL COMMENT '品牌(冗余)',
`category3_id` bigint NULL COMMENT '三级分类id(冗余)',
`sku_default_img` string NULL COMMENT '默认显示图片(冗余)',
`create_time` timestamp(3) NULL COMMENT '创建时间',
PRIMARY KEY (`id`) NOT ENFORCED
) with (
'connector' = 'kafka', 'topic' = 'my5.sku_info',
'properties.bootstrap.servers' = 'kafka-cluster:6667', 'format' = 'debezium-json'
);
CREATE TABLE `user_info_topic` (
`id` bigint NOT NULL COMMENT '编号',
`login_name` string NULL COMMENT '用户名称',
`nick_name` string NULL COMMENT '用户昵称',
`passwd` string NULL COMMENT '用户密码',
`name` string NULL COMMENT '用户姓名',
`phone_num` string NULL COMMENT '手机号',
`email` string NULL COMMENT '邮箱',
`head_img` string NULL COMMENT '头像',
`user_level` string NULL COMMENT '用户级别',
`birthday` date NULL COMMENT '用户生日',
`gender` varchar(1) NULL COMMENT '性别 M男,F女',
`create_time` timestamp(3) NULL COMMENT '创建时间',
PRIMARY KEY (`id`) NOT ENFORCED
) with (
'connector' = 'kafka', 'topic' = 'my5.user_info',
'properties.bootstrap.servers' = 'kafka-cluster:6667', 'format' = 'debezium-json'
);
INSERT statements that push the MySQL binlog data into the corresponding Kafka topics
%flink.ssql(runAsOne=true)
insert into base_category1_topic select * from base_category1;
insert into base_category2_topic select * from base_category2;
insert into base_category3_topic select * from base_category3;
insert into base_province_topic select * from base_province;
insert into base_region_topic select * from base_region;
insert into base_trademark_topic select * from base_trademark;
insert into date_info_topic select * from date_info;
insert into holiday_info_topic select * from holiday_info;
insert into holiday_year_topic select * from holiday_year;
insert into order_detail_topic select * from order_detail;
insert into order_info_topic select * from order_info;
insert into order_status_log_topic select * from order_status_log;
insert into payment_info_topic select * from payment_info;
insert into sku_info_topic select * from sku_info;
insert into user_info_topic select * from user_info;
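Once this job is running, you can confirm that change records are landing in Kafka with the console consumer (a sketch; the script location depends on your Kafka installation):
kafka-console-consumer.sh --bootstrap-server kafka-cluster:6667 --topic my5.base_category1 --from-beginning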
Importing the dimension tables into Hudi
Write the two region dimension tables, my5.base_province and my5.base_region, into Hudi COW tables.
%flink.ssql
drop table if exists base_province_topic_source;
drop table if exists base_province_hudi;
CREATE TABLE `base_province_topic_source` (
`id` int NULL COMMENT 'id',
`name` string NULL COMMENT '省名称',
`region_id` int NULL COMMENT '大区id',
`area_code` string NULL COMMENT '行政区位码'
) with (
'connector' = 'kafka', 'topic' = 'my5.base_province',
'properties.bootstrap.servers' = 'kafka-cluster:6667', 'format' = 'debezium-json',
'scan.startup.mode' = 'earliest-offset', 'properties.group.id' = 'hudiGroup'
);
CREATE TABLE `base_province_hudi` (
`id` int NULL COMMENT 'id',
`name` string NULL COMMENT '省名称',
`region_id` int NULL COMMENT '大区id',
`area_code` string NULL COMMENT '行政区位码',
PRIMARY KEY (`id`) NOT ENFORCED
) with (
'connector' = 'hudi', 'path' = 'hdfs://hadoop:8020/realtime-demo-2/base_province_hudi',
'table.type' = 'COPY_ON_WRITE', 'write.precombine.field' = 'id', 'read.streaming.enabled' = 'true'
);
%flink.ssql
insert into base_province_hudi select * from base_province_topic_source;
%flink.ssql
drop table if exists base_region_topic_source;
drop table if exists base_region_hudi;
CREATE TABLE `base_region_topic_source` (
`id` int NOT NULL COMMENT '大区id',
`region_name` string NULL COMMENT '大区名称',
PRIMARY KEY (`id`) NOT ENFORCED
) with (
'connector' = 'kafka', 'topic' = 'my5.base_region',
'properties.bootstrap.servers' = 'kafka-cluster:6667', 'format' = 'debezium-json',
'scan.startup.mode' = 'earliest-offset', 'properties.group.id' = 'hudiGroup'
);
CREATE TABLE `base_region_hudi` (
`id` int NOT NULL COMMENT '大区id',
`region_name` string NULL COMMENT '大区名称',
PRIMARY KEY (`id`) NOT ENFORCED
) with (
'connector' = 'hudi', 'path' = 'hdfs://hadoop:8020/realtime-demo-2/base_region_hudi',
'table.type' = 'COPY_ON_WRITE', 'write.precombine.field' = 'id', 'read.streaming.enabled' = 'true'
);
%flink.ssql
insert into base_region_hudi select * from base_region_topic_source;
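Because both Hudi tables set 'read.streaming.enabled' = 'true', selecting from them starts a continuous streaming read that keeps picking up new commits; the polling frequency can be tuned with the 'read.streaming.check-interval' option (seconds, defaults to 60 in Hudi 0.10). A quick check (a sketch):
%flink.ssql(type=update)
select * from base_region_hudi;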
Create the dim_province table from the two dimension tables above.
%flink.ssql
drop table if exists dim_province_hudi;
create table dim_province_hudi (
province_id INT,
province_name STRING,
area_code STRING,
region_id INT,
region_name STRING,
PRIMARY KEY (province_id) NOT ENFORCED
) with (
'connector' = 'hudi',
'path' = 'hdfs://hadoop:8020/realtime-demo-2/dim_province_hudi',
'table.type' = 'COPY_ON_WRITE',
'write.precombine.field' = 'province_id',
'read.streaming.enabled' = 'true'
);
%flink.ssql
insert into dim_province_hudi
SELECT
bp.id AS province_id,
bp.name AS province_name,
bp.area_code AS area_code,
br.id AS region_id,
br.region_name AS region_name
FROM base_region_hudi br
JOIN base_province_hudi bp ON br.id = bp.region_id
;
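Since dim_province_hudi declares streaming read in its DDL, an ad-hoc one-shot snapshot query can override that per query with a dynamic table option hint (a sketch; this assumes table.dynamic-table-options.enabled has been set to true in the Flink configuration):
%flink.ssql
select * from dim_province_hudi /*+ OPTIONS('read.streaming.enabled' = 'false') */;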
Next, write the product-category dimension tables my5.base_category1, my5.base_category2, and my5.base_category3 into Hudi COW tables.
%flink.ssql
drop table if exists base_category1_topic_source;
drop table if exists base_category1_hudi;
CREATE TABLE `base_category1_topic_source` (
`id` bigint NOT NULL COMMENT '编号',
`name` string NOT NULL COMMENT '分类名称',
PRIMARY KEY (`id`) NOT ENFORCED
) with (
'connector' = 'kafka', 'topic' = 'my5.base_category1',
'properties.bootstrap.servers' = 'kafka-cluster:6667', 'format' = 'debezium-json',
'scan.startup.mode' = 'earliest-offset', 'properties.group.id' = 'hudiGroup'
);
CREATE TABLE `base_category1_hudi` (
`id` bigint NOT NULL COMMENT '编号',
`name` string NOT NULL COMMENT '分类名称',
PRIMARY KEY (`id`) NOT ENFORCED
) with (
'connector' = 'hudi', 'path' = 'hdfs://hadoop:8020/realtime-demo-2/base_category1_hudi',
'table.type' = 'COPY_ON_WRITE', 'write.precombine.field' = 'id', 'read.streaming.enabled' = 'true'
);
%flink.ssql
insert into base_category1_hudi select * from base_category1_topic_source;
%flink.ssql
drop table if exists base_category2_topic_source;
drop table if exists base_category2_hudi;
CREATE TABLE `base_category2_topic_source` (
`id` bigint NOT NULL COMMENT '编号',
`name` string NOT NULL COMMENT '分类名称',
`category1_id` bigint NULL COMMENT '一级分类编号',
PRIMARY KEY (`id`) NOT ENFORCED
) with (
'connector' = 'kafka', 'topic' = 'my5.base_category2',
'properties.bootstrap.servers' = 'kafka-cluster:6667', 'format' = 'debezium-json',
'scan.startup.mode' = 'earliest-offset', 'properties.group.id' = 'hudiGroup'
);
CREATE TABLE `base_category2_hudi` (
`id` bigint NOT NULL COMMENT '编号',
`name` string NOT NULL COMMENT '分类名称',
`category1_id` bigint NULL COMMENT '一级分类编号',
PRIMARY KEY (`id`) NOT ENFORCED
) with (
'connector' = 'hudi', 'path' = 'hdfs://hadoop:8020/realtime-demo-2/base_category2_hudi',
'table.type' = 'COPY_ON_WRITE', 'write.precombine.field' = 'id', 'read.streaming.enabled' = 'true'
);
%flink.ssql
insert into base_category2_hudi select * from base_category2_topic_source;
%flink.ssql
drop table if exists base_category3_topic_source;
drop table if exists base_category3_hudi;
CREATE TABLE `base_category3_topic_source` (
`id` bigint NOT NULL COMMENT '编号',
`name` string NOT NULL COMMENT '分类名称',
`category2_id` bigint NULL COMMENT '二级分类编号',
PRIMARY KEY (`id`) NOT ENFORCED
) with (
'connector' = 'kafka', 'topic' = 'my5.base_category3',
'properties.bootstrap.servers' = 'kafka-cluster:6667', 'format' = 'debezium-json',
'scan.startup.mode' = 'earliest-offset', 'properties.group.id' = 'hudiGroup'
);
CREATE TABLE `base_category3_hudi` (
`id` bigint NOT NULL COMMENT '编号',
`name` string NOT NULL COMMENT '分类名称',
`category2_id` bigint NULL COMMENT '二级分类编号',
PRIMARY KEY (`id`) NOT ENFORCED
) with (
'connector' = 'hudi', 'path' = 'hdfs://hadoop:8020/realtime-demo-2/base_category3_hudi',
'table.type' = 'COPY_ON_WRITE', 'write.precombine.field' = 'id', 'read.streaming.enabled' = 'true'
);
%flink.ssql
insert into base_category3_hudi select * from base_category3_topic_source;
Importing the SKU table into Hudi
%flink.ssql
drop table if exists sku_info_topic_source;
drop table if exists sku_info_topic_hudi;
CREATE TABLE `sku_info_topic_source` (
`id` bigint NOT NULL COMMENT 'skuid(itemID)',
`spu_id` bigint NULL COMMENT 'spuid',
`price` decimal(10,0) NULL COMMENT '价格',
`sku_name` string NULL COMMENT 'sku名称',
`sku_desc` string NULL COMMENT '商品规格描述',
`weight` decimal(10,2) NULL COMMENT '重量',
`tm_id` bigint NULL COMMENT '品牌(冗余)',
`category3_id` bigint NULL COMMENT '三级分类id(冗余)',
`sku_default_img` string NULL COMMENT '默认显示图片(冗余)',
`create_time` timestamp(3) NULL COMMENT '创建时间',
PRIMARY KEY (`id`) NOT ENFORCED
) with (
'connector' = 'kafka', 'topic' = 'my5.sku_info',
'properties.bootstrap.servers' = 'kafka-cluster:6667', 'format' = 'debezium-json',
'scan.startup.mode' = 'earliest-offset', 'properties.group.id' = 'hudiGroup'
);
CREATE TABLE `sku_info_topic_hudi` (
`id` bigint NOT NULL COMMENT 'skuid(itemID)',
`spu_id` bigint NULL COMMENT 'spuid',
`price` decimal(10,0) NULL COMMENT '价格',
`sku_name` string NULL COMMENT 'sku名称',
`sku_desc` string NULL COMMENT '商品规格描述',
`weight` decimal(10,2) NULL COMMENT '重量',
`tm_id` bigint NULL COMMENT '品牌(冗余)',
`category3_id` bigint NULL COMMENT '三级分类id(冗余)',
`sku_default_img` string NULL COMMENT '默认显示图片(冗余)',
`create_time` timestamp(3) NULL COMMENT '创建时间',
PRIMARY KEY (`id`) NOT ENFORCED
) with (
'connector' = 'hudi', 'path' = 'hdfs://hadoop:8020/realtime-demo-2/sku_info_topic_hudi',
'table.type' = 'COPY_ON_WRITE', 'write.precombine.field' = 'id', 'read.streaming.enabled' = 'true'
);
%flink.ssql
insert into sku_info_topic_hudi select * from sku_info_topic_source;
With the steps above, the base product-dimension data is now in Hudi. Next we create the dim_sku_info view on top of these tables. Note that unlike dim_province_hudi, which is materialized as a Hudi table, dim_sku_info is a logical view and is re-evaluated by whatever query references it.
%flink.ssql
drop view if exists dim_sku_info;
CREATE VIEW dim_sku_info AS
SELECT
si.id AS id,
si.sku_name AS sku_name,
si.category3_id AS c3_id,
si.weight AS weight,
si.tm_id AS tm_id,
si.price AS price,
si.spu_id AS spu_id,
c3.name AS c3_name,
c2.id AS c2_id,
c2.name AS c2_name,
c1.id AS c1_id,
c1.name AS c1_name
FROM sku_info_topic_hudi si
JOIN base_category3_hudi c3 ON si.category3_id = c3.id
JOIN base_category2_hudi c2 ON c3.category2_id = c2.id
JOIN base_category1_hudi c1 ON c2.category1_id = c1.id;
DWD-layer processing
With the steps above, all the dimension tables we need are ready. Next we process the raw ODS data into a DWD-layer detail wide table.
%flink.ssql
drop table if exists ods_order_detail_topic;
drop table if exists ods_order_info_topic;
drop table if exists dwd_paid_order_detail_hudi;
CREATE TABLE `ods_order_detail_topic` (
`id` bigint NOT NULL COMMENT '编号',
`order_id` bigint NULL COMMENT '订单编号',
`sku_id` bigint NULL COMMENT 'sku_id',
`sku_name` string NULL COMMENT 'sku名称(冗余)',
`img_url` string NULL COMMENT '图片名称(冗余)',
`order_price` decimal(10,2) NULL COMMENT '购买价格(下单时sku价格)',
`sku_num` int NULL COMMENT '购买个数',
`create_time` timestamp(3) NULL COMMENT '创建时间',
PRIMARY KEY (`id`) NOT ENFORCED
)
with(
'connector' = 'kafka'
,'topic' = 'my5.order_detail'
,'properties.bootstrap.servers' = 'kafka-cluster:6667'
,'format' = 'debezium-json'
,'scan.startup.mode' = 'earliest-offset'
,'properties.group.id' = 'hudiGroup'
);
CREATE TABLE `ods_order_info_topic` (
`id` bigint NOT NULL COMMENT '编号',
`consignee` string NULL COMMENT '收货人',
`consignee_tel` string NULL COMMENT '收件人电话',
`total_amount` decimal(10,2) NULL COMMENT '总金额',
`order_status` string NULL COMMENT '订单状态',
`user_id` bigint NULL COMMENT '用户id',
`payment_way` string NULL COMMENT '付款方式',
`delivery_address` string NULL COMMENT '送货地址',
`order_comment` string NULL COMMENT '订单备注',
`out_trade_no` string NULL COMMENT '订单交易编号(第三方支付用)',
`trade_body` string NULL COMMENT '订单描述(第三方支付用)',
`create_time` timestamp(3) NULL COMMENT '创建时间',
`operate_time` timestamp(3) NULL COMMENT '操作时间',
`expire_time` timestamp(3) NULL COMMENT '失效时间',
`tracking_no` string NULL COMMENT '物流单编号',
`parent_order_id` bigint NULL COMMENT '父订单编号',
`img_url` string NULL COMMENT '图片路径',
`province_id` int NULL COMMENT '地区',
PRIMARY KEY (`id`) NOT ENFORCED
)
with(
'connector' = 'kafka'
,'topic' = 'my5.order_info'
,'properties.bootstrap.servers' = 'kafka-cluster:6667'
,'format' = 'debezium-json'
,'scan.startup.mode' = 'earliest-offset'
,'properties.group.id' = 'hudiGroup'
);
CREATE TABLE dwd_paid_order_detail_hudi
(
detail_id BIGINT,
order_id BIGINT,
user_id BIGINT,
province_id INT,
sku_id BIGINT,
sku_name STRING,
sku_num INT,
order_price DECIMAL(10,2),
create_time TIMESTAMP(3),
pay_time TIMESTAMP(3),
primary key (detail_id) not enforced
) WITH (
'connector' = 'hudi',
'path' = 'hdfs://hadoop:8020/realtime-demo-2/dwd_paid_order_detail_hudi',
'table.type' = 'MERGE_ON_READ',
'compaction.async.enabled' = 'false',
'read.streaming.enabled' = 'true'
);
%flink.ssql
insert into dwd_paid_order_detail_hudi
SELECT
od.id,
oi.id order_id,
oi.user_id,
oi.province_id,
od.sku_id,
od.sku_name,
od.sku_num,
od.order_price,
oi.create_time,
oi.operate_time
FROM
(
SELECT *
FROM ods_order_info_topic
WHERE order_status = '2' -- paid
) oi JOIN
(
SELECT *
FROM ods_order_detail_topic
) od
ON oi.id = od.order_id;
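Note that the MOR tables above disable async compaction ('compaction.async.enabled' = 'false'), so incoming records accumulate in Hudi's log files until a compaction is scheduled separately. One way to run it is Hudi's Flink offline compactor (a sketch; adjust the bundle jar name and table path to your environment):
flink run -c org.apache.hudi.sink.compact.HoodieFlinkCompactor $FLINK_HOME/lib/hudi-flink-bundle_2.11-0.10.0.jar --path hdfs://hadoop:8020/realtime-demo-2/dwd_paid_order_detail_hudi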
ADS-layer data
With the steps above, we created the dwd_paid_order_detail_hudi detail wide table and stored it in Hudi. Next we join this wide table with the dimension tables to produce the ADS application-layer data.
ads_province_index_hudi
%flink.ssql
drop table if exists ads_province_index_hudi;
drop table if exists tmp_province_index_hudi;
-- ---------------------------------
-- Create the ADS-layer tables in Hudi with DDL
-- Metrics: 1. daily order count per province
--          2. daily order amount per province
-- ---------------------------------
CREATE TABLE ads_province_index_hudi(
province_id INT,
area_code STRING,
province_name STRING,
region_id INT,
region_name STRING,
order_amount DECIMAL(10,2),
order_count BIGINT,
dt STRING,
PRIMARY KEY (province_id, dt) NOT ENFORCED
) WITH (
'connector' = 'hudi',
'path' = 'hdfs://hadoop:8020/realtime-demo-2/ads_province_index_hudi',
'table.type' = 'MERGE_ON_READ',
'compaction.async.enabled' = 'false',
'read.streaming.enabled' = 'true'
);
-- ---------------------------------
-- tmp_province_index_hudi
-- temporary order aggregation table
-- ---------------------------------
CREATE TABLE tmp_province_index_hudi(
province_id INT,
order_count BIGINT, -- order count
order_amount DECIMAL(10,2), -- order amount
pay_date DATE,
primary key(province_id) not enforced
)WITH (
'connector' = 'hudi',
'path' = 'hdfs://hadoop:8020/realtime-demo-2/tmp_province_index_hudi',
'table.type' = 'MERGE_ON_READ',
'compaction.async.enabled' = 'false',
'read.streaming.enabled' = 'true'
);
%flink.ssql
-- ---------------------------------
-- tmp_province_index_hudi
-- load the temporary aggregation table
-- ---------------------------------
INSERT INTO tmp_province_index_hudi
SELECT
province_id,
count(distinct order_id) order_count, -- order count
sum(order_price * sku_num) order_amount, -- order amount
TO_DATE(DATE_FORMAT(pay_time,'yyyy-MM-dd')) pay_date
FROM dwd_paid_order_detail_hudi
GROUP BY province_id, TO_DATE(DATE_FORMAT(pay_time,'yyyy-MM-dd'));
%flink.ssql
INSERT INTO ads_province_index_hudi
SELECT
pc.province_id,
dp.area_code,
dp.province_name,
dp.region_id,
dp.region_name,
pc.order_amount,
pc.order_count,
cast(pc.pay_date as VARCHAR)
FROM tmp_province_index_hudi pc
JOIN dim_province_hudi as dp ON dp.province_id = pc.province_id;
View the data in the ADS-layer ads_province_index_hudi table:
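For example, with a streaming query (a sketch; it runs continuously because the table was created with 'read.streaming.enabled' = 'true'):
%flink.ssql(type=update)
select * from ads_province_index_hudi;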
ads_sku_index_hudi
%flink.ssql
-- ---------------------------------
-- Create the ADS-layer table in Hudi with DDL
-- Metrics: 1. daily order count per SKU
--          2. daily order amount per SKU
--          3. daily quantity sold per SKU
-- ---------------------------------
drop table if exists ads_sku_index_hudi;
CREATE TABLE ads_sku_index_hudi
(
sku_id BIGINT,
sku_name VARCHAR,
weight DOUBLE,
tm_id BIGINT,
price DOUBLE,
spu_id BIGINT,
c3_id BIGINT,
c3_name VARCHAR ,
c2_id BIGINT,
c2_name VARCHAR,
c1_id BIGINT,
c1_name VARCHAR,
order_amount DOUBLE,
order_count BIGINT,
sku_count BIGINT,
dt varchar,
PRIMARY KEY (sku_id,dt) NOT ENFORCED
) with (
'connector' = 'hudi',
'path' = 'hdfs://hadoop:8020/realtime-demo-2/ads_sku_index_hudi',
'table.type' = 'MERGE_ON_READ',
'compaction.async.enabled' = 'false',
'read.streaming.enabled' = 'true'
);
-- ---------------------------------
-- tmp_sku_index_hudi
-- per-SKU metric aggregation
-- ---------------------------------
drop table if exists tmp_sku_index_hudi;
CREATE TABLE tmp_sku_index_hudi(
sku_id BIGINT,
order_count BIGINT, -- order count
order_amount DECIMAL(10,2), -- order amount
order_sku_num BIGINT,
pay_date DATE,
PRIMARY KEY (sku_id) NOT ENFORCED
)WITH (
'connector' = 'hudi',
'path' = 'hdfs://hadoop:8020/realtime-demo-2/tmp_sku_index_hudi',
'table.type' = 'MERGE_ON_READ',
'compaction.async.enabled' = 'false',
'read.streaming.enabled' = 'true'
);
%flink.ssql
-- ---------------------------------
-- tmp_sku_index_hudi
-- load the aggregation table
-- ---------------------------------
INSERT INTO tmp_sku_index_hudi
SELECT
sku_id,
count(distinct order_id) order_count, -- order count
sum(order_price * sku_num) order_amount, -- order amount
sum(sku_num) order_sku_num,
TO_DATE(DATE_FORMAT(pay_time,'yyyy-MM-dd')) pay_date
FROM dwd_paid_order_detail_hudi
GROUP BY sku_id, TO_DATE(DATE_FORMAT(pay_time,'yyyy-MM-dd'));
%flink.ssql
INSERT INTO ads_sku_index_hudi
SELECT
sku_id,
sku_name,
weight,
tm_id,
price,
spu_id,
c3_id,
c3_name,
c2_id,
c2_name,
c1_id,
c1_name,
sc.order_amount,
sc.order_count,
sc.order_sku_num,
cast(sc.pay_date as VARCHAR)
FROM tmp_sku_index_hudi sc
JOIN dim_sku_info ds ON ds.id = sc.sku_id;
%flink.ssql select * from ads_sku_index_hudi;