Flink sink Elasticsearch 防止任务中断

Flink sink Elasticsearch 防止任务中断,第1张

Flink sink Elasticsearch 防止任务中断 前言

从半年现在从0开始搭建Flink实时计算平台,部分存储层用到了Elasticsearch,从零开始接触Flink,这半年来遇到了好多坑,由传统的开发转变成了大数据开发,Elasticsearch内含有多种熔断器,为了防止OOM。由于目前业务查询的方式会造成成本很高,(可以看一下allow_expensive_querys),某次查询可能会引起服务的熔断,这时候有可能引起实时任务 sink Elasticsearch请求也会被熔断。
当然 Flink Connector 提供了几种失败处理机制

  1. IgnoringFailureHandler: 会忽略所有 sink elasticsearch Connector的异常 ;
  2. NoOpFailureHandler: 不处理任何异常,只输出异常栈信息(默认);
  3. RetryRejectedExecutionFailureHandler: 遇到特定异常时会进行重试 包涵 EsRejectedExecutionException类以及他的子类。

当我们遇到更新比较多频繁的时候,用IgnoringFailureHandler当写入ES失败时不影响Flink任务,当然遇到比较敏感统计时,我们需要对失败的结果集进行重试,
需要配合RetryRejectedExecutionFailureHandler 来进行处理,源码中只会处理EsRejectedExecutionException类以及他的子类,当然熔断类型的异常归属于ElasticsearchStatusException 异常,两者并没有关系。为防止Flink因elasticsearch集群熔断导致挂掉,我们需要做特定的处理,重写ActionRequestFailureHandler。

重写处理类 策略

为了可以更好地扩展,我们首先定义一个策略类ElasticsearchExceptionHandlerStrategy代码如下:


@Getter
public enum ElasticsearchExceptionHandlerStrategy {

    
    DEFAULT(Lists.newArrayList()),

    
    ALL_EXCEPTION(Lists.newArrayList(Throwable.class)),

    
    ELASTICSEARCH_EXCEPTION(Lists.newArrayList(ElasticsearchException.class)),

    
    ELASTICSEARCH_STATUS_AND_REJECTED_EXCEPTION(Lists.newArrayList(org.elasticsearch.ElasticsearchStatusException.class,
                                                EsRejectedExecutionException .class)),;

    final List> exceptionClass;

    ElasticsearchExceptionHandlerStrategy(List> exceptionClass) {
        this.exceptionClass = exceptionClass;
    }

}

定义了四种策略

  1. ALL_EXCEPTION 全部异常
  2. ELASTICSEARCH_EXCEPTION ELASTICSEARCH_EXCEPTION elasticsearch全部异常
  3. ELASTICSEARCH_STATUS_AND__EXCEPTION EsRejectedExecutionException 和 ElasticsearchStatusException 异常
  4. DEFAULT 默认空不处理或默认使用父类, 由handler来决定实现

可根据实际业务去扩展ElasticsearchExceptionHandlerStrategy 枚举类。

重写异常处理类
  • RetryExecutionFailureHandler: 特定的异常失败重试 如果策略为DEFAULT时 会交由父类去处理(RetryRejectedExecutionFailureHandler) 代码如下:

@Slf4j
public class RetryExecutionFailureHandler extends RetryRejectedExecutionFailureHandler {

    private static final long serialVersionUID = -1;

    private ElasticsearchExceptionHandlerStrategy strategy;

    @Nullable
    public RetryExecutionFailureHandler(ElasticsearchExceptionHandlerStrategy strategy) {
        this.strategy = strategy;
    }

    @Override
    public void onFailure(ActionRequest action, Throwable failure, int restStatusCode, RequestIndexer indexer) throws Throwable {

        if (Objects.isNull(strategy) || CollectionUtils.isEmpty(strategy.getExceptionClass())) {
            super.onFailure(action, failure, restStatusCode, indexer);
            return;
        }

        log.error("Failed Elasticsearch item request: {}", failure.getMessage(), failure);
        for (Class exceptionClass : strategy.getExceptionClass()) {
            if (ExceptionUtils.findThrowable(failure, exceptionClass).isPresent()) {
                indexer.add(action);
                return;
            }
        }
        // rethrow all other failures
        throw failure;

    }
}

  • IgnoringExceptionFailureHandler: 特定的异常忽略 如果策略为DEFAULT时 类似于IgnoringFailureHandler处理代码如下:

@Slf4j
public class IgnoringExceptionFailureHandler implements ActionRequestFailureHandler {

    private static final long serialVersionUID = -1;

    private ElasticsearchExceptionHandlerStrategy strategy;

    @Override
    public void onFailure(ActionRequest action, Throwable failure, int restStatusCode, RequestIndexer indexer) throws Throwable {

        if (Objects.isNull(strategy) || CollectionUtils.isEmpty(strategy.getExceptionClass())) {
            return;
        }
        log.error("Failed Elasticsearch item request: {}", failure.getMessage(), failure);
        for (Class exceptionClass : strategy.getExceptionClass()) {
            if (ExceptionUtils.findThrowable(failure, exceptionClass).isPresent()) {
                return;
            }
        }
        // rethrow all other failures
        throw failure;

    }
}

Sink ES 代码详细配置

伪代码如下:

ElasticsearchSink.Builder builder = new ElasticsearchSink.Builder(httpHosts,
                new ElasticsearchSinkFunction(){...});
//配置批量提交
builder.setBulkFlushBackoff(true);
//设置重试次数
builder.setBulkFlushBackoffRetries(2);
//设置重试间隔
builder.setBulkFlushBackoffDelay(2000L);
//设置重试策略CONSTANT: 常数 eg: 重试间隔为2s 重试3次 会在2s->4s->6s进行; EXPONENTIAL:指数 eg:  重试间隔为2s 重试3次 会在2s->4s->8s进行
builder.setBulkFlushBackoffType(ElasticsearchSinkbase.FlushBackoffType.CONSTANT);
//设置批量提交最大数据量
builder.setBulkFlushMaxSizeMb(10);
//设置批量提交间隔
builder.setBulkFlushInterval(2000L);
//设置批量提交的最大条数
builder.setBulkFlushMaxActions(1000);
//设置重试机制
builder.Builder.setFailureHandler(new RetryExecutionFailureHandler(ElasticsearchExceptionHandlerStrategy.DEFAULT));

 

Elasticsearch失败重试机制依赖于checkpoint 可参看源码:ElasticsearchSinkbase类

总结

以上拙见,毕竟才入坑,欢迎交流~ 推荐一波Flink 的发布平台。切记:没有最优的公共配置,需要根据特定场景才能达到相应的效果。

欢迎分享,转载请注明来源:内存溢出

原文地址:https://54852.com/zaji/5654236.html

(0)
打赏 微信扫一扫微信扫一扫 支付宝扫一扫支付宝扫一扫
上一篇 2022-12-16
下一篇2022-12-16

发表评论

登录后才能评论

评论列表(0条)