
从复杂json中提取关心的字段数据,利用ROW的方式, 可以让复杂的json转变为可 *** 作的schema,然后可以通过 field as xxxxxx 来使用
version flink 1130
参考
>
Flink消费集成kerberos认证的kafka集群时,需要做一些配置才可以正常执行。
Flink版本:18;kafka版本:201;Flink模式:Standalone
//指示是否从 Kerberos ticket 缓存中读取
securitykerberosloginuse-ticket-cache: false1
//Kerberos 密钥表文件的绝对路径
securitykerberosloginkeytab: /data/home/keytab/flinkkeytab
//认证主体名称
securitykerberosloginprincipal: flink@datacom
//Kerberos登陆contexts
securitykerberoslogincontexts: Client,KafkaClient
val properties: Properties =new Properties()
propertiessetProperty("bootstrapservers","broker:9092")
propertiessetProperty("groupid","testKafka")
propertiessetProperty("securityprotocol","SASL_PLAINTEXT")
propertiessetProperty("saslmechanism","GSSAPI")
propertiessetProperty("saslkerberosservicename","kafka")
consumer =new FlinkKafkaConsumer[String]("flink",new SimpleStringSchema(), properties)
参数说明 :securityprotocol
运行参数可以配置为PLAINTEXT(可不配置)/SASL_PLAINTEXT/SSL/SASL_SSL四种协议,分别对应Fusion Insight Kafka集群的21005/21007/21008/21009端口。 如果配置了SASL,则必须配置saslkerberosservicename为kafka,并在conf/flink-confyaml中配置securitykerberoslogin相关配置项。如果配置了SSL,则必须配置ssltruststorelocation和ssltruststorepassword,前者表示truststore的位置,后者表示truststore密码。
以上就是关于flink sql kafka 解析复杂json全部的内容,包括:flink sql kafka 解析复杂json、基于flink sql构建实时数据仓库、Flink kafka kerberos的配置等相关内容解答,如果想了解更多相关内容,可以关注我们,你们的支持是我们更新的动力!
欢迎分享,转载请注明来源:内存溢出
微信扫一扫
支付宝扫一扫
评论列表(0条)