
从半年现在从0开始搭建Flink实时计算平台,部分存储层用到了Elasticsearch,从零开始接触Flink,这半年来遇到了好多坑,由传统的开发转变成了大数据开发,Elasticsearch内含有多种熔断器,为了防止OOM。由于目前业务查询的方式会造成成本很高,(可以看一下allow_expensive_querys),某次查询可能会引起服务的熔断,这时候有可能引起实时任务 sink Elasticsearch请求也会被熔断。
当然 Flink Connector 提供了几种失败处理机制
- IgnoringFailureHandler: 会忽略所有 sink elasticsearch Connector的异常 ;
- NoOpFailureHandler: 不处理任何异常,只输出异常栈信息(默认);
- RetryRejectedExecutionFailureHandler: 遇到特定异常时会进行重试 包涵 EsRejectedExecutionException类以及他的子类。
当我们遇到更新比较多频繁的时候,用IgnoringFailureHandler当写入ES失败时不影响Flink任务,当然遇到比较敏感统计时,我们需要对失败的结果集进行重试,
需要配合RetryRejectedExecutionFailureHandler 来进行处理,源码中只会处理EsRejectedExecutionException类以及他的子类,当然熔断类型的异常归属于ElasticsearchStatusException 异常,两者并没有关系。为防止Flink因elasticsearch集群熔断导致挂掉,我们需要做特定的处理,重写ActionRequestFailureHandler。
为了可以更好地扩展,我们首先定义一个策略类ElasticsearchExceptionHandlerStrategy代码如下:
@Getter
public enum ElasticsearchExceptionHandlerStrategy {
DEFAULT(Lists.newArrayList()),
ALL_EXCEPTION(Lists.newArrayList(Throwable.class)),
ELASTICSEARCH_EXCEPTION(Lists.newArrayList(ElasticsearchException.class)),
ELASTICSEARCH_STATUS_AND_REJECTED_EXCEPTION(Lists.newArrayList(org.elasticsearch.ElasticsearchStatusException.class,
EsRejectedExecutionException .class)),;
final List> exceptionClass;
ElasticsearchExceptionHandlerStrategy(List> exceptionClass) {
this.exceptionClass = exceptionClass;
}
}
定义了四种策略
- ALL_EXCEPTION 全部异常
- ELASTICSEARCH_EXCEPTION ELASTICSEARCH_EXCEPTION elasticsearch全部异常
- ELASTICSEARCH_STATUS_AND__EXCEPTION EsRejectedExecutionException 和 ElasticsearchStatusException 异常
- DEFAULT 默认空不处理或默认使用父类, 由handler来决定实现
重写异常处理类可根据实际业务去扩展ElasticsearchExceptionHandlerStrategy 枚举类。
- RetryExecutionFailureHandler: 特定的异常失败重试 如果策略为DEFAULT时 会交由父类去处理(RetryRejectedExecutionFailureHandler) 代码如下:
@Slf4j
public class RetryExecutionFailureHandler extends RetryRejectedExecutionFailureHandler {
private static final long serialVersionUID = -1;
private ElasticsearchExceptionHandlerStrategy strategy;
@Nullable
public RetryExecutionFailureHandler(ElasticsearchExceptionHandlerStrategy strategy) {
this.strategy = strategy;
}
@Override
public void onFailure(ActionRequest action, Throwable failure, int restStatusCode, RequestIndexer indexer) throws Throwable {
if (Objects.isNull(strategy) || CollectionUtils.isEmpty(strategy.getExceptionClass())) {
super.onFailure(action, failure, restStatusCode, indexer);
return;
}
log.error("Failed Elasticsearch item request: {}", failure.getMessage(), failure);
for (Class extends Throwable> exceptionClass : strategy.getExceptionClass()) {
if (ExceptionUtils.findThrowable(failure, exceptionClass).isPresent()) {
indexer.add(action);
return;
}
}
// rethrow all other failures
throw failure;
}
}
- IgnoringExceptionFailureHandler: 特定的异常忽略 如果策略为DEFAULT时 类似于IgnoringFailureHandler处理代码如下:
@Slf4j
public class IgnoringExceptionFailureHandler implements ActionRequestFailureHandler {
private static final long serialVersionUID = -1;
private ElasticsearchExceptionHandlerStrategy strategy;
@Override
public void onFailure(ActionRequest action, Throwable failure, int restStatusCode, RequestIndexer indexer) throws Throwable {
if (Objects.isNull(strategy) || CollectionUtils.isEmpty(strategy.getExceptionClass())) {
return;
}
log.error("Failed Elasticsearch item request: {}", failure.getMessage(), failure);
for (Class extends Throwable> exceptionClass : strategy.getExceptionClass()) {
if (ExceptionUtils.findThrowable(failure, exceptionClass).isPresent()) {
return;
}
}
// rethrow all other failures
throw failure;
}
}
Sink ES 代码详细配置
伪代码如下:
ElasticsearchSink.Builder
总结Elasticsearch失败重试机制依赖于checkpoint 可参看源码:ElasticsearchSinkbase类
以上拙见,毕竟才入坑,欢迎交流~ 推荐一波Flink 的发布平台。切记:没有最优的公共配置,需要根据特定场景才能达到相应的效果。
欢迎分享,转载请注明来源:内存溢出
微信扫一扫
支付宝扫一扫
评论列表(0条)