gpt4 book ai didi

org.apache.flink.yarn.highavailability.YarnIntraNonHaMasterServices类的使用及代码示例

转载 作者:知者 更新时间:2024-03-16 13:47:31 28 4
gpt4 key购买 nike

本文整理了Java中org.apache.flink.yarn.highavailability.YarnIntraNonHaMasterServices类的一些代码示例,展示了YarnIntraNonHaMasterServices类的具体用法。这些代码示例主要来源于Github/Stackoverflow/Maven等平台,是从一些精选项目中提取出来的代码,具有较强的参考意义,能在一定程度帮忙到你。YarnIntraNonHaMasterServices类的具体详情如下:
包路径:org.apache.flink.yarn.highavailability.YarnIntraNonHaMasterServices
类名称:YarnIntraNonHaMasterServices

YarnIntraNonHaMasterServices介绍

[英]These YarnHighAvailabilityServices are for the Application Master in setups where there is one ResourceManager that is statically configured in the Flink configuration.

Handled failure types

  • User code & operator failures: Failed operators are recovered from checkpoints.
  • Task Manager Failures: Failed Task Managers are restarted and their tasks are recovered from checkpoints.

Non-recoverable failure types

  • Application Master failures: These failures cannot be recovered, because TaskManagers have no way to discover the new Application Master's address.

Internally, these services put their recovery data into YARN's working directory, except for checkpoints, which are in the configured checkpoint directory. That way, checkpoints can be resumed with a new job/application, even if the complete YARN application is killed and cleaned up.

Because ResourceManager and JobManager run both in the same process (Application Master), they use an embedded leader election service to find each other.

A typical YARN setup that uses these HA services first starts the ResourceManager inside the ApplicationMaster and puts its RPC endpoint address into the configuration with which the TaskManagers are started. Because of this static addressing scheme, the setup cannot handle failures of the JobManager and ResourceManager, which are running as part of the Application Master.
[中]这些YarnHighAvailabilityServices适用于设置中的应用程序主机,其中有一个ResourceManager是在Flink配置中静态配置的。
####处理故障类型
*用户代码和操作员故障:从检查点恢复失败的操作员。
*任务管理器失败:重新启动失败的任务管理器,并从检查点恢复其任务。
####不可恢复故障类型
*应用程序主机故障:这些故障无法恢复,因为TaskManager无法发现新的应用程序主机地址。
在内部,这些服务将其恢复数据放入Thread的工作目录,但检查点除外,检查点位于已配置的检查点目录中。这样,即使完整的纱线应用程序被终止和清理,也可以使用新的作业/应用程序恢复检查点。
因为ResourceManager和JobManager都在同一个进程(应用程序主进程)中运行,所以它们使用嵌入式领导人选举服务来寻找彼此。
使用这些HA服务的典型Thread设置首先在ApplicationMaster中启动ResourceManager,并将其RPC端点地址放入启动TaskManager的配置中。由于这种静态寻址方案,安装程序无法处理JobManager和ResourceManager的故障,它们作为应用程序主机的一部分运行。

代码示例

代码示例来源:origin: apache/flink

@Override
public LeaderElectionService getResourceManagerLeaderElectionService() {
  enter();
  try {
    return resourceManagerLeaderElectionService;
  }
  finally {
    exit();
  }
}

代码示例来源:origin: apache/flink

@Override
  public void close() throws Exception {
    if (enterUnlessClosed()) {
      try {
        try {
          // this class' own cleanup logic
          resourceManagerLeaderElectionService.shutdown();
          dispatcher.shutdownNow();
        }
        finally {
          // in any case must we call the parent cleanup logic
          super.close();
        }
      }
      finally {
        exit();
      }
    }
  }
}

代码示例来源:origin: apache/flink

/**
 * Creates the high-availability services for a single-job Flink YARN application, to be
 * used in the Application Master that runs both ResourceManager and JobManager.
 *
 * @param flinkConfig  The Flink configuration.
 * @param hadoopConfig The Hadoop configuration for the YARN cluster.
 *
 * @return The created high-availability services.
 *
 * @throws IOException Thrown, if the high-availability services could not be initialized.
 */
public static YarnHighAvailabilityServices forSingleJobAppMaster(
    Configuration flinkConfig,
    org.apache.hadoop.conf.Configuration hadoopConfig) throws IOException {
  checkNotNull(flinkConfig, "flinkConfig");
  checkNotNull(hadoopConfig, "hadoopConfig");
  final HighAvailabilityMode mode = HighAvailabilityMode.fromConfig(flinkConfig);
  switch (mode) {
    case NONE:
      return new YarnIntraNonHaMasterServices(flinkConfig, hadoopConfig);
    case ZOOKEEPER:
      throw  new UnsupportedOperationException("to be implemented");
    default:
      throw new IllegalConfigurationException("Unrecognized high availability mode: " + mode);
  }
}

代码示例来源:origin: apache/flink

@Test
public void testRepeatedClose() throws Exception {
  final Configuration flinkConfig = new Configuration();
  final YarnHighAvailabilityServices services = new YarnIntraNonHaMasterServices(flinkConfig, hadoopConfig);
  services.closeAndCleanupAllData();
  // this should not throw an exception
  services.close();
}

代码示例来源:origin: apache/flink

@Override
public LeaderElectionService getDispatcherLeaderElectionService() {
  enter();
  try {
    return dispatcherLeaderElectionService;
  } finally {
    exit();
  }
}

代码示例来源:origin: org.apache.flink/flink-yarn_2.11

@Override
  public void close() throws Exception {
    if (enterUnlessClosed()) {
      try {
        try {
          // this class' own cleanup logic
          resourceManagerLeaderElectionService.shutdown();
          dispatcher.shutdownNow();
        }
        finally {
          // in any case must we call the parent cleanup logic
          super.close();
        }
      }
      finally {
        exit();
      }
    }
  }
}

代码示例来源:origin: apache/flink

@Test
public void testClosingReportsToLeader() throws Exception {
  final Configuration flinkConfig = new Configuration();
  try (YarnHighAvailabilityServices services = new YarnIntraNonHaMasterServices(flinkConfig, hadoopConfig)) {
    final LeaderElectionService elector = services.getResourceManagerLeaderElectionService();
    final LeaderRetrievalService retrieval = services.getResourceManagerLeaderRetriever();
    final LeaderContender contender = mockContender(elector);
    final LeaderRetrievalListener listener = mock(LeaderRetrievalListener.class);
    elector.start(contender);
    retrieval.start(listener);
    // wait until the contender has become the leader
    verify(listener, timeout(1000L).times(1)).notifyLeaderAddress(anyString(), any(UUID.class));
    // now we can close the election service
    services.close();
    verify(contender, timeout(1000L).times(1)).handleError(any(Exception.class));
  }
}

代码示例来源:origin: apache/flink

@Override
public LeaderRetrievalService getJobManagerLeaderRetriever(JobID jobID, String defaultJobManagerAddress) {
  enter();
  try {
    throw new UnsupportedOperationException("needs refactoring to accept default address");
  }
  finally {
    exit();
  }
}

代码示例来源:origin: org.apache.flink/flink-yarn

@Override
  public void close() throws Exception {
    if (enterUnlessClosed()) {
      try {
        try {
          // this class' own cleanup logic
          resourceManagerLeaderElectionService.shutdown();
          dispatcher.shutdownNow();
        }
        finally {
          // in any case must we call the parent cleanup logic
          super.close();
        }
      }
      finally {
        exit();
      }
    }
  }
}

代码示例来源:origin: org.apache.flink/flink-yarn_2.11

/**
 * Creates the high-availability services for a single-job Flink YARN application, to be
 * used in the Application Master that runs both ResourceManager and JobManager.
 *
 * @param flinkConfig  The Flink configuration.
 * @param hadoopConfig The Hadoop configuration for the YARN cluster.
 *
 * @return The created high-availability services.
 *
 * @throws IOException Thrown, if the high-availability services could not be initialized.
 */
public static YarnHighAvailabilityServices forSingleJobAppMaster(
    Configuration flinkConfig,
    org.apache.hadoop.conf.Configuration hadoopConfig) throws IOException {
  checkNotNull(flinkConfig, "flinkConfig");
  checkNotNull(hadoopConfig, "hadoopConfig");
  final HighAvailabilityMode mode = HighAvailabilityMode.fromConfig(flinkConfig);
  switch (mode) {
    case NONE:
      return new YarnIntraNonHaMasterServices(flinkConfig, hadoopConfig);
    case ZOOKEEPER:
      throw  new UnsupportedOperationException("to be implemented");
    default:
      throw new IllegalConfigurationException("Unrecognized high availability mode: " + mode);
  }
}

代码示例来源:origin: apache/flink

@Override
public LeaderRetrievalService getWebMonitorLeaderRetriever() {
  enter();
  try {
    throw new UnsupportedOperationException();
  }
  finally {
    exit();
  }
}

代码示例来源:origin: org.apache.flink/flink-yarn

/**
 * Creates the high-availability services for a single-job Flink YARN application, to be
 * used in the Application Master that runs both ResourceManager and JobManager.
 *
 * @param flinkConfig  The Flink configuration.
 * @param hadoopConfig The Hadoop configuration for the YARN cluster.
 *
 * @return The created high-availability services.
 *
 * @throws IOException Thrown, if the high-availability services could not be initialized.
 */
public static YarnHighAvailabilityServices forSingleJobAppMaster(
    Configuration flinkConfig,
    org.apache.hadoop.conf.Configuration hadoopConfig) throws IOException {
  checkNotNull(flinkConfig, "flinkConfig");
  checkNotNull(hadoopConfig, "hadoopConfig");
  final HighAvailabilityMode mode = HighAvailabilityMode.fromConfig(flinkConfig);
  switch (mode) {
    case NONE:
      return new YarnIntraNonHaMasterServices(flinkConfig, hadoopConfig);
    case ZOOKEEPER:
      throw  new UnsupportedOperationException("to be implemented");
    default:
      throw new IllegalConfigurationException("Unrecognized high availability mode: " + mode);
  }
}

代码示例来源:origin: apache/flink

@Override
public LeaderElectionService getJobManagerLeaderElectionService(JobID jobID) {
  enter();
  try {
    throw new UnsupportedOperationException("needs refactoring to accept default address");
  }
  finally {
    exit();
  }
}

代码示例来源:origin: apache/flink

@Override
public LeaderElectionService getWebMonitorLeaderElectionService() {
  enter();
  try {
    throw new UnsupportedOperationException();
  }
  finally {
    exit();
  }
}

代码示例来源:origin: apache/flink

@Override
public LeaderRetrievalService getJobManagerLeaderRetriever(JobID jobID) {
  enter();
  try {
    throw new UnsupportedOperationException("needs refactoring to accept default address");
  }
  finally {
    exit();
  }
}

代码示例来源:origin: apache/flink

@Override
public LeaderRetrievalService getResourceManagerLeaderRetriever() {
  enter();
  try {
    return resourceManagerLeaderElectionService.createLeaderRetrievalService();
  }
  finally {
    exit();
  }
}

代码示例来源:origin: apache/flink

@Override
public LeaderRetrievalService getDispatcherLeaderRetriever() {
  enter();
  try {
    return dispatcherLeaderElectionService.createLeaderRetrievalService();
  } finally {
    exit();
  }
}

代码示例来源:origin: org.apache.flink/flink-yarn

@Override
public LeaderElectionService getDispatcherLeaderElectionService() {
  enter();
  try {
    return dispatcherLeaderElectionService;
  } finally {
    exit();
  }
}

代码示例来源:origin: org.apache.flink/flink-yarn

@Override
public LeaderElectionService getResourceManagerLeaderElectionService() {
  enter();
  try {
    return resourceManagerLeaderElectionService;
  }
  finally {
    exit();
  }
}

代码示例来源:origin: org.apache.flink/flink-yarn_2.11

@Override
public LeaderElectionService getDispatcherLeaderElectionService() {
  enter();
  try {
    return dispatcherLeaderElectionService;
  } finally {
    exit();
  }
}

28 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com