Flink kafka kerberos的配置

Flink kafka kerberos的配置,第1张

Flink消费集成kerberos认证的kafka集群时,需要做一些配置才可以正常执行。

    Flink版本:1.8;kafka版本:2.0.1;Flink模式:Standalone

    //指示是否从 Kerberos ticket 缓存中读取

    security.kerberos.login.use-ticket-cache: false1

   //Kerberos 密钥表文件的绝对路径

    security.kerberos.login.keytab: /data/home/keytab/flink.keytab

   //认证主体名称

    security.kerberos.login.principal: flink@data.com

    //Kerberos登陆contexts

    security.kerberos.login.contexts: Client,KafkaClient

  val properties: Properties =new Properties()

  properties.setProperty("bootstrap.servers","broker:9092")

  properties.setProperty("group.id","testKafka")

  properties.setProperty("security.protocol","SASL_PLAINTEXT")

  properties.setProperty("sasl.mechanism","GSSAPI")

  properties.setProperty("sasl.kerberos.service.name","kafka")

  consumer =new   FlinkKafkaConsumer[String]("flink",new SimpleStringSchema(), properties)

    参数说明 :security.protocol 

    运行参数可以配置为PLAINTEXT(可不配置)/SASL_PLAINTEXT/SSL/SASL_SSL四种协议,分别对应Fusion Insight Kafka集群的21005/21007/21008/21009端口。 如果配置了SASL,则必须配置sasl.kerberos.service.name为kafka,并在conf/flink-conf.yaml中配置security.kerberos.login相关配置项。如果配置了SSL,则必须配置ssl.truststore.location和ssl.truststore.password,前者表示truststore的位置,后者表示truststore密码。

1、什么是状态

2、Flink状态类型有哪几种?

3、状态有什么作用?

4、如何使用状态,实现什么样的API?

5、什么是checkpoint与savepoint?

6、如何使用checkpoint与savepoint?

7、checkpoint原理是什么?

8、checkpint存储到hdfs上又是什么意思?

<1>增量计算

聚合 *** 作、机器学习训练模型迭代运算时保存当前模型等等

<2>容错

Job故障重启、升级

定义: 某task或者operator 在某一时刻的在内存中的状态。

而checkpoint是,对于这个中间结果进行一次快照。

作用:State是可以被记录的,在失败的情况下可以恢复。

checkpoint则表示了一个Flink Job,在一个特定时刻的一份全局状态快照,即包含了一个job下所有task/operator某时刻的状态。

比如任务挂掉的时候或被手动停止的时候,可以从挂掉的点重新继续消费。

基本类型:Operator state、Keyed state

特殊的 Broadcast State

适用场景:

增量计算:

<1>聚合 *** 作

<2>机器学习训练模型迭代运算时保存当前模型

等等

容错:

Job故障重启

使用状态,必须使用RichFunction,因为状态是使用RuntimeContext访问的,只能在RichFunction中访问

假设现在存在输入源数据格式为(EventID,Value)

输出数据,直接flatMap即可,无状态。

如果要输出某EventID最大值/最小值等,HashMap是否可以?

程序一旦Crash,如何恢复?

答案:Flink提供了一套状态保存的方法,不需要借助第三方存储系统来解决状态存储问题。

Operator State跟一个特定operator的一个并发实例绑定,整个operator只对应一个state。相比较而言,在一个operator上,可能有很多个key,从而对应多个keyed state。

所以一个并行度为4的source,即有4个实例,那么就会有4个状态

举例:Flink中的Kafka Connector,就使用了operator state。有几个并行度,就会有几个connector实例,消费的分区不一样,它会在每个connector实例中,保存该实例中消费topic的所有(partition,offset)映射。

数据结构:ListState<T>

一般编码过程:实现CheckpointedFunction接口,必须实现两个函数,分别是:

initializeState和snapshotState

如何保存状态?

通常是定义一个private transient ListState<Long>checkPointList

注意:使用Operator State最好不要在keyBy之后使用,另外不要将太大的state存放到这个里面。

是基于KeyStream之上的状态,keyBy之后的Operator State。

那么,一个并行度为3的keyed Opreator有几个状态,这个就不一定是3了,这里有几个状态是由keyby之后有几个key所决定的。

案例:有一个事件流Tuple2[eventId,val],求不同的事件eventId下,相邻3个val的平均值,事件流如下:

(1,4),(2,3),(3,1),(1,2),(3,2),(1,2),(2,2),(2,9)

那么事件1:8/3=2

那么事件2:14/3=4

Keyed State的数据结构类型有:

ValueState<T>:update(T)

ListState<T>:add(T)、get(T)和clear(T)

ReducingState<T>:add(T)、reduceFunction()

MapState<UK,UV>:put(UK,UV)、putAll(Map<UK,UV>)、get(UK)

FlatMapFunction是无状态函数;RichFlatMapFunction是有状态函数

这里没有实现CheckpointedFunction接口,而是直接调用方法 getRuntimeContext(),然后使用getState方法来获取状态值。

特殊场景: 来自一个流的一些数据需要广播到所有下游任务,在这些任务中,这些数据被本地存储并且用于处理另一个流上的所有处理元素 。例如:一个低吞吐量流,其中包含一组规则,我们希望对来自另一个流的所有元素按照规则进行计算

典型应用:常规事件流.connect(规则流)

常规事件流.connect(配置流)

<1>创建常规事件流DataStream或者KeyedDataStream

<2>创建BroadcastedStream:创建规则流/配置流(低吞吐)并广播

<3>连接两个Stream并实现计算处理

process(可以是BroadcastProcessFunction 或者 KeyedBroadcastProcessFunction )

BroadcastProcessFunction:

processElement(...):负责处理非广播流中的传入元素

processBroadcastElement(...):负责处理广播流中的传入元素(如规则),一般广播流的元素添加到状态里去备用,processElement处理业务数据时就可以使用

ReadOnlyContext和Context:

ReadOnlyContext对Broadcast State只有只读权限,Conetxt有写权限

KeyedBroadcastProcessFunction:

注意:

<1>Flink之间没有跨Task的通信

<2>每个任务的广播状态的元素顺序有可能不一样

<3>Broadcast State保存在内存中(并不在RocksDB)


欢迎分享,转载请注明来源:内存溢出

原文地址:https://54852.com/bake/11344361.html

(0)
打赏 微信扫一扫微信扫一扫 支付宝扫一扫支付宝扫一扫
上一篇 2023-05-15
下一篇2023-05-15

发表评论

登录后才能评论

评论列表(0条)

    保存