Hadoop yarn源码分析(十一) NodeHealthCheck源码解析 2021SC@SDUSC

Hadoop yarn源码分析(十一) NodeHealthCheck源码解析 2021SC@SDUSC,第1张

Hadoop yarn源码分析(十一) NodeHealthCheck源码解析 2021SC@SDUSC

2021SC@SDUSC

Hadoop yarn源码分析(十一) NodeHealthCheck源码解析 2021SC@SDUSC
  • 一、简介
  • 二、NodeHealthCheckerService类
    • 2.1 基本属性
    • 2.2 基本方法
      • 2.2.1 构造方法
      • 2.2.2 初始化方法
      • 2.2.3 获取健康状态报告
      • 2.2.4 获取健康状态和时间
  • 三、NodeHealthscriptRunner类
    • 3.1 简介
    • 3.1 基本属性
    • 3.2 类的构建
    • 3.3 构造方法
  • 四、 NodeHealthMonitorExecutor类
    • 4.1 构造方法
    • 4.2 重要方法
      • 4.2.1 run()方法
      • 4.2.2 reportHealthStatus()方法
      • 4.2.3 hasErrors()方法
    • 4.3 HealthCheckerExitStatus
      • 4.3.1 属性
      • 4.3.2 serviceStart()方法
      • 4.3.3 脚本示例

一、简介

节点健康状况检测是NodeMananger自带的健康状况诊断机制,通过该机制,NodeMananger可以时刻掌握自己的健康状况,并及时汇报给ResoureMananger,RM则根据每个NM的健康状况适当调整分配任务的数目。当NM认为自己健康状况不好时,会通知RM不再为其分配新的任务,等待健康状况好转再分配新的任务。该机制可以及时发现存在问题的NM,避免不必要的任务分配,也可以用于动态升级。

二、NodeHealthCheckerService类

NodeHealthCheckerService类提供了监测节点运行状态的方法,并提交检查程序报告的服务。NM上有一个服务专门用于判断所在节点的健康状况,主要通过两种策略判断:一种是通过管理员自定义的Shell脚本(NodeManager上专门有一个周期性任务执行该脚本, 一旦该脚本输出以"ERROR"开头的字符串, 则认为节点处于不健康状态);另一种是判断磁盘好坏(NodeManager上专门有一个周期性任务检测磁盘的好坏, 如果坏磁盘数目达到一定的比例, 则认为节点处于不健康状态)。

2.1 基本属性

共有6个基本属性。主要用来记录节点运行简况状况,并记录对应的异常信息。

//org.apache.hadoop.yarn.server.nodemanager.health.NodeHealthCheckerService.Java
  //定义日志信息
  public static final Logger LOG =
      LoggerFactory.getLogger(NodeHealthCheckerService.class);
  //常量,最大脚本数为4个
  private static final int MAX_scriptS = 4;
  //健康状况报告列表
  private List reporters;
  //提供检查节点本地目录运行状态的类
  private LocalDirsHandlerService dirsHandler;
  //异常信息
  private ExceptionReporter exceptionReporter;
  //分隔符
  public static final String SEPARATOR = ";";
2.2 基本方法 2.2.1 构造方法

在NodeManager里面已经进行过实例化, 其中核心的scriptRunner和dirHandlerService都是入参。

  public NodeHealthCheckerService(
      LocalDirsHandlerService dirHandlerService) {
    super(NodeHealthCheckerService.class.getName());

    this.reporters = new ArrayList<>();
    this.dirsHandler = dirHandlerService;
    this.exceptionReporter = new ExceptionReporter();
  }
2.2.2 初始化方法

serviceInit()方法,将nodeHealthscriptRunner 和 dirsHandler 加入到service里面, 等待启动

@Override
  protected void serviceInit(Configuration conf) throws Exception {
    reporters.add(exceptionReporter);
    addHealthReporter(dirsHandler);
    //脚本运行状态
    String[] configuredscripts = conf.getTrimmedStrings(
        YarnConfiguration.NM_HEALTH_CHECK_scriptS,
        YarnConfiguration.DEFAULT_NM_HEALTH_CHECK_scriptS);
    if (configuredscripts.length > MAX_scriptS) {
      throw new IllegalArgumentException("Due to performance reasons " +
          "running more than " + MAX_scriptS + "scripts is not allowed.");
    }
    for (String configuredscript : configuredscripts) {
      addHealthReporter(NodeHealthscriptRunner.newInstance(
          configuredscript, conf));
    }
    super.serviceInit(conf);
  }
2.2.3 获取健康状态报告

getHealthReport()方法,该类的核心方法,获取健康状态检查的报告,并以分号(;)拼接,并返回。

//加入附属服务的健康报告
  @Override
  public String getHealthReport() {
    //列表
    ArrayList reports = reporters.stream()
        //获取健康状态的映射和集合
        .map(reporter -> Strings.emptyToNull(reporter.getHealthReport()))
        .collect(Collectors.toCollection(ArrayList::new));
    //返回关于健康状况的字符串,并以分号(;)分隔
    return Joiner.on(SEPARATOR).skipNulls().join(reports);
  }

2.2.4 获取健康状态和时间

主要包含两个方法:isHealthy 和 getLastHealthReportTime,调用健康检查脚本和目录检查,查看是否有异常信息,并返回其异常时间。

  //判断是否健康
  @Override
  public boolean isHealthy() {
    return reporters.stream().allMatch(HealthReporter::isHealthy);
  }

//获取上一次健康报告的时间
  @Override
  public long getLastHealthReportTime() {
    Optional max = reporters.stream()
        .map(HealthReporter::getLastHealthReportTime).max(Long::compareTo);
    return max.orElse(0L);
  }
三、NodeHealthscriptRunner类 3.1 简介

NodeHealthscriptRunner服务主要工作是周期性执行节点健康状况检测脚本。
该服务允许管理员配置一个“健康监测脚本”以检查节点健康状况,且管理员可在该脚本中添加任何检查语句作为节点是否健康运行的依据。若脚本检测到该节点处于不健康状态,它需要在标准输出中打印一条以字符串"ERROR"开头的输出语句。NodeHealthscriptRunner服务周期性调用健康监测脚本并检查其输出,一旦发现脚本输出是以"ERROR"开头的字符串,则认为节点处于不健康状态, 进而将其标注为"unhealthy"并通过心跳告诉ResourceManager。而ResourceManager得知节点状态变为"unhealthy"后,会将其加入黑名单,此后不再为它分配新任务。需要注意的是,只要NodeManager服务是活着的,该线程就会一直运行该脚本 一旦发现节点又变为"healthy", ResourceManager会立刻将其从黑名单中移除,从而又可以为它分配任务。主要有两个优点:
1、可作为节点负载的反馈。 当前yarn仅对CPU和内存两种资源进行了隔离, 其他资源, 比如网络和磁盘IO等, 尚未有任何隔离措施, 这使得不同任务之间仍会有干扰。 而健康脚本检测的方式可从一定程度上缓解该问题, 比如, 可让健康检测脚本检查网络、 磁盘、 文件系统等运行状况, 一旦发现特殊情况, 比如网络拥塞、 磁盘空间不足或者文件系统出现问题, 可将健康状况为"unhealthy", 暂时不接收新的任务, 待它们恢复正常后再继续接收新任务。
2、人为暂时维护NodeManager。 如果发现NodeManager所在节点出现故障, 可通过控制脚本输出暂时让该NodeManager停止接收新任务以便进行维护, 待维护完成后, 修改脚本输出以让NodeManager继续接收新任务。

3.1 基本属性

五个重要属性,都用来检测健康状况

//org.apache.hadoop.yarn.server.nodemanager.health.NodeHealthscriptRunner.Java
private static final Logger LOG =
      LoggerFactory.getLogger(NodeHealthscriptRunner.class);

  //健康监测脚本的绝对路径
  private String nodeHealthscript;
  //脚本应该超时的时间
  private long scriptTimeout;
  //用于执行监视脚本的ShellCommandExecutor
  private ShellCommandExecutor commandExecutor = null;

  //用于在节点运行状况脚本的输出中搜索的模式
  private static final String ERROR_PATTERN = "ERROR";

  //超时错误消息
  static final String NODE_HEALTH_script_TIMED_OUT_MSG =
      "Node health script timed out";
3.2 类的构建
public static NodeHealthscriptRunner getNodeHealthscriptRunner(Configuration conf) {

    // 健康检查脚本所在的绝对路径
   String nodeHealthscript =  conf.get(YarnConfiguration.NM_HEALTH_CHECK_script_PATH);

   // 脚本是否有效 [是否存在/可执行]
    if(!NodeHealthscriptRunner.shouldRun(nodeHealthscript)) {
      LOG.info("Node Manager health check script is not available "
          + "or doesn't have execute permission, so not "
          + "starting the node health script runner.");
      return null;
    }

    // 健康检查脚本检测周期
    // yarn.nodemanager.health-checker.interval-ms : 10min
    long nmCheckintervalTime = conf.getLong(
        YarnConfiguration.NM_HEALTH_CHECK_INTERVAL_MS,
        YarnConfiguration.DEFAULT_NM_HEALTH_CHECK_INTERVAL_MS);
    //健康检查脚本超时时间
    // yarn.nodemanager.health-checker.script.timeout-ms : 20 min
    long scriptTimeout = conf.getLong(
        YarnConfiguration.NM_HEALTH_CHECK_script_TIMEOUT_MS,
        YarnConfiguration.DEFAULT_NM_HEALTH_CHECK_script_TIMEOUT_MS);

    // 健康检查脚本参数
    // yarn.nodemanager.health-checker.script.opts
    String[] scriptArgs = conf.getStrings(
        YarnConfiguration.NM_HEALTH_CHECK_script_OPTS, new String[] {});
    return new NodeHealthscriptRunner(nodeHealthscript,
        nmCheckintervalTime, scriptTimeout, scriptArgs);
  }
3.3 构造方法
private NodeHealthscriptRunner(String scriptName, long checkInterval,
      long timeout, String[] scriptArgs, boolean runBeforeStartup) {
    super(NodeHealthscriptRunner.class.getName(), checkInterval,
        runBeforeStartup);
    this.nodeHealthscript = scriptName;
    this.scriptTimeout = timeout;
    setTimerTask(new NodeHealthMonitorExecutor(scriptArgs));
  }

四、 NodeHealthMonitorExecutor类

NodeHealthMonitorExecutor 继承了TimerTask .,用于周期性的执行健康检查脚本。该类只有一个exceptionStackTrace属性,用于存放报错的信息。

4.1 构造方法

对健康检查的脚本进行设置,构造ShellCommandExecutor

public NodeHealthMonitorExecutor(String[] args) {
      ArrayList execscript = new ArrayList();
      //添加脚本
      execscript.add(nodeHealthscript);
      if (args != null) {
        execscript.addAll(Arrays.asList(args));
      }
      shexec = new ShellCommandExecutor(execscript
          .toArray(new String[execscript.size()]), null, null, scriptTimeout);
    }
4.2 重要方法 4.2.1 run()方法

该类的核心方法,用于执行健康检查的脚本。

@Override
    public void run() {
      HealthCheckerExitStatus status = HealthCheckerExitStatus.SUCCESS;
      try {
        //执行健康检查脚本
        shexec.execute();
      } catch (ExitCodeException e) {
        //忽略脚本的退出代码
        status = HealthCheckerExitStatus.FAILED_WITH_EXIT_CODE;
        //在Windows上,对于超时事件,不会命中标准输出缓冲读取器引发的流关闭IOException
        if (Shell.WINDOWS && shexec.isTimedOut()) {
          status = HealthCheckerExitStatus.TIMED_OUT;
        }
      } catch (Exception e) {
        LOG.warn("Caught exception : " + e.getMessage());
        if (!shexec.isTimedOut()) {
          status = HealthCheckerExitStatus.FAILED_WITH_EXCEPTION;
        } else {
          status = HealthCheckerExitStatus.TIMED_OUT;
        }
        exceptionStackTrace = StringUtils.stringifyException(e);
      } finally {
        if (status == HealthCheckerExitStatus.SUCCESS) {
          //处理输出,是否已"ERROR" 开头的错误信息
          if (hasErrors(shexec.getOutput())) {
            status = HealthCheckerExitStatus.FAILED;
          }
        }
        //报告健康状态
        reportHealthStatus(status);
      }
    }
4.2.2 reportHealthStatus()方法

根据健康检查的返回状态 , 处理 NodeManager的状态。该方法用于解析节点运行状况监视器和发送到报告地址,超时脚本或导致IOException输出的脚本被忽略。如果发生以下情况,则该节点被标记为不正常:
节点运行状况脚本超时
节点运行状况脚本输出有一行以ERROR开头
执行脚本时引发异常
如果脚本抛出{@link IOException}或{@link ExitCodeException},则输出将被忽略,节点将保持正常状态,因为脚本可能存在语法错误

    void reportHealthStatus(HealthCheckerExitStatus status) {
      long now = System.currentTimeMillis();
      switch (status) {
      case SUCCESS:
        setHealthStatus(true, "", now);
        break;
      case TIMED_OUT:
        setHealthStatus(false, NODE_HEALTH_script_TIMED_OUT_MSG);
        break;
      case FAILED_WITH_EXCEPTION:
        setHealthStatus(false, exceptionStackTrace);
        break;
      case FAILED_WITH_EXIT_CODE:
        setHealthStatus(true, "", now);
        break;
      case FAILED:
        setHealthStatus(false, shexec.getOutput());
        break;
      }
    }
4.2.3 hasErrors()方法

检查脚本输出是否"ERROR" 字符开头的错误信息,若代码写死了, 错误信息必须以: “ERROR” 开头。

private boolean hasErrors(String output) {
      String[] splits = output.split("n");
      for (String split : splits) {
        if (split.startsWith(ERROR_PATTERN)) {
          return true;
        }
      }
      return false;
    }
4.3 HealthCheckerExitStatus

一共就五种状态:

  private enum HealthCheckerExitStatus {
    SUCCESS,                // 成功
    TIMED_OUT,              // 超时
    FAILED_WITH_EXIT_CODE,  // 已知的错误, 有错误码
    FAILED_WITH_EXCEPTION,  // 未知错误
    FAILED                  // 失败
  }
4.3.1 属性

多种属性,用于检查脚本健康状态

  // 健康检查脚本绝对路径
  private String nodeHealthscript;
  // 健康检查脚本执行间隔, 默认10min
  private long intervalTime;
  
  //健康检查脚本超时时间
  private long scriptTimeout;
  
  //定时器
  private Timer nodeHealthscriptScheduler;

  //执行健康检查脚本的工具类
  ShellCommandExecutor shexec = null;

  //异常信息的开头定义: ERROR
  static private final String ERROR_PATTERN = "ERROR";

  //健康检查脚本超时提示信息
  public static final String NODE_HEALTH_script_TIMED_OUT_MSG = "Node health script timed out";

  //是否健康
  private boolean isHealthy;

  //健康检查脚本报告
  private String healthReport;

  //健康检查 最后一次时间戳
  private long lastReportedTime;

  //定时器 task
  private TimerTask timer;
4.3.2 serviceStart()方法

设置定时Timer以及具体的TimerTask ,默认执行周期为10min, 超时时间为2*10min

  @Override
  protected void serviceStart() throws Exception {
    nodeHealthscriptScheduler = new Timer("NodeHealthMonitor-Timer", true);
    // Start the timer task immediately and
    // then periodically at interval time.
    nodeHealthscriptScheduler.scheduleAtFixedRate(timer, 0, intervalTime);
    super.serviceStart();
  }
4.3.3 脚本示例

在这个Shell脚本中, 当一个节点上的空闲内存量低于总内存量的10%时, 将打印以"ERROR"开头的字符串, 这样该节点将不再向ResourceManager请求新任务。

#!/bin/bash
MEMORY_RATIO=0.1
freeMem=`grep MemFree /proc/meminfo | awk '{print }'`
totalMem=`grep MemTotal /proc/meminfo | awk '{print }'`
limitMem=`echo | awk '{print int("'$totalMem'"*"'$MEMORY_RATIO'")}'`
if [ $freeMem -lt $limitMem ];then
echo "ERROR, totalMem=$totalMem, freeMem=$freeMem, limitMem=$limitMem"
else
echo "OK, totalMem=$totalMem, freeMem=$freeMem, limitMem=$limitMem"
fi

欢迎分享,转载请注明来源:内存溢出

原文地址:https://54852.com/zaji/5678724.html

(0)
打赏 微信扫一扫微信扫一扫 支付宝扫一扫支付宝扫一扫
上一篇 2022-12-17
下一篇2022-12-16

发表评论

登录后才能评论

评论列表(0条)

    保存