gpt4 book ai didi

org.apache.flink.streaming.connectors.kafka.internals.ZookeeperOffsetHandler.prepareAndCommitOffsets()方法的使用及代码示例

转载 作者:知者 更新时间:2024-03-19 15:25:31 25 4
gpt4 key购买 nike

本文整理了Java中org.apache.flink.streaming.connectors.kafka.internals.ZookeeperOffsetHandler.prepareAndCommitOffsets()方法的一些代码示例,展示了ZookeeperOffsetHandler.prepareAndCommitOffsets()的具体用法。这些代码示例主要来源于Github/Stackoverflow/Maven等平台,是从一些精选项目中提取出来的代码,具有较强的参考意义,能在一定程度上帮助到你。ZookeeperOffsetHandler.prepareAndCommitOffsets()方法的具体详情如下:
包路径:org.apache.flink.streaming.connectors.kafka.internals.ZookeeperOffsetHandler
类名称:ZookeeperOffsetHandler
方法名:prepareAndCommitOffsets

ZookeeperOffsetHandler.prepareAndCommitOffsets介绍

[英]Commits offsets for Kafka partitions to ZooKeeper. The offsets passed to this method should be the offsets of the last processed records; this method takes care of incrementing them by 1 before committing, so that the offsets committed to ZooKeeper represent the next record to process.
[中]将Kafka分区的偏移量提交给ZooKeeper。传给此方法的偏移量应该是最后处理的记录的偏移量;此方法会在提交之前将偏移量加1,以便提交给ZooKeeper的偏移量代表下一条要处理的记录。

代码示例

代码示例来源:origin: apache/flink

/**
 * Periodic commit loop: while {@code running} is set, sleeps for {@code commitInterval}
 * (passed to {@link Thread#sleep(long)}), snapshots the current offset of every partition
 * state and hands the snapshot to the offset handler.
 *
 * <p>Any {@link Throwable} ends the loop; it is forwarded to the error handler only if
 * {@code running} is still set at that point.
 */
@Override
public void run() {
  try {
    while (running) {
      Thread.sleep(commitInterval);

      // snapshot the current offset of each partition into a fresh map
      HashMap<KafkaTopicPartition, Long> snapshot = new HashMap<>(partitionStates.size());
      for (KafkaTopicPartitionState<?> state : partitionStates) {
        snapshot.put(state.getKafkaTopicPartition(), state.getOffset());
      }

      offsetHandler.prepareAndCommitOffsets(snapshot);
    }
  }
  catch (Throwable t) {
    // failures observed after 'running' was cleared are not reported
    if (!running) {
      return;
    }
    final String message = "The periodic offset committer encountered an error: " + t.getMessage();
    errorHandler.reportError(new Exception(message, t));
  }
}

代码示例来源:origin: apache/flink

/**
 * Commits the given offsets through the ZooKeeper offset handler (when one is set) and then
 * records them as the committed offsets on the subscribed partition states.
 *
 * <p>If the commit (or the success callback) throws while {@code running} is set, the
 * callback is notified and the exception is rethrown. If {@code running} is no longer set,
 * the method returns silently without touching the partition states.
 *
 * @param offsets offsets to commit, keyed by partition
 * @param commitCallback callback notified of commit success or failure
 * @throws Exception the commit failure, rethrown while still running
 */
@Override
protected void doCommitInternalOffsetsToKafka(
    Map<KafkaTopicPartition, Long> offsets,
    @Nonnull KafkaCommitCallback commitCallback) throws Exception {

  // work on a local reference to the handler field
  final ZookeeperOffsetHandler handler = this.zookeeperOffsetHandler;

  if (handler != null) {
    try {
      // incrementing the offsets by 1 before committing is done by the ZK handler
      handler.prepareAndCommitOffsets(offsets);
      commitCallback.onSuccess();
    }
    catch (Exception e) {
      if (!running) {
        return;
      }
      commitCallback.onException(e);
      throw e;
    }
  }

  // reflect the committed offsets in the state of each subscribed partition
  for (KafkaTopicPartitionState<TopicAndPartition> state : subscribedPartitionStates()) {
    final Long committed = offsets.get(state.getKafkaTopicPartition());
    if (committed == null) {
      continue;
    }
    state.setCommittedOffset(committed);
  }
}

代码示例来源:origin: org.apache.flink/flink-connector-kafka-0.8_2.11

/**
 * Runs the periodic offset commit: as long as {@code running} holds, waits
 * {@code commitInterval} via {@link Thread#sleep(long)}, collects the current offset of
 * each partition state into a new map and passes it to the offset handler.
 *
 * <p>A {@link Throwable} terminates the loop and is reported to the error handler only
 * when {@code running} is still set.
 */
@Override
public void run() {
  try {
    while (running) {
      Thread.sleep(commitInterval);

      // collect the offsets as they are right now
      HashMap<KafkaTopicPartition, Long> currentOffsets = new HashMap<>(partitionStates.size());
      for (KafkaTopicPartitionState<?> ps : partitionStates) {
        KafkaTopicPartition partition = ps.getKafkaTopicPartition();
        currentOffsets.put(partition, ps.getOffset());
      }

      offsetHandler.prepareAndCommitOffsets(currentOffsets);
    }
  }
  catch (Throwable t) {
    if (running) {
      Exception wrapped =
          new Exception("The periodic offset committer encountered an error: " + t.getMessage(), t);
      errorHandler.reportError(wrapped);
    }
  }
}

代码示例来源:origin: org.apache.flink/flink-connector-kafka-0.8

/**
 * Body of the periodic offset committer. Each iteration sleeps for {@code commitInterval}
 * (see {@link Thread#sleep(long)}), copies the current offset of every partition state
 * into a new map, and commits that map through the offset handler. The loop continues
 * while {@code running} is set.
 *
 * <p>Any {@link Throwable} stops the loop; it is wrapped and reported to the error handler
 * unless {@code running} has already been cleared.
 */
@Override
public void run() {
  try {
    while (running) {
      Thread.sleep(commitInterval);

      // copy the offsets so the commit works on a stable view
      final HashMap<KafkaTopicPartition, Long> toCommit = new HashMap<>(partitionStates.size());
      for (KafkaTopicPartitionState<?> entry : partitionStates) {
        toCommit.put(entry.getKafkaTopicPartition(), entry.getOffset());
      }

      offsetHandler.prepareAndCommitOffsets(toCommit);
    }
  }
  catch (Throwable t) {
    // nothing is reported once 'running' is no longer set
    if (!running) {
      return;
    }
    errorHandler.reportError(
        new Exception("The periodic offset committer encountered an error: " + t.getMessage(), t));
  }
}

代码示例来源:origin: org.apache.flink/flink-connector-kafka-0.8_2.10

/**
 * Periodic commit loop over the {@code partitionStates} array: while {@code running} is
 * set, sleeps for {@code commitInterval} (passed to {@link Thread#sleep(long)}), copies
 * each partition's current offset into a new map and commits the map via the offset
 * handler.
 *
 * <p>Any {@link Throwable} ends the loop and is reported to the error handler only if
 * {@code running} is still set.
 */
@Override
public void run() {
  try {
    while (running) {
      Thread.sleep(commitInterval);

      // take a copy of the offsets as they currently stand
      HashMap<KafkaTopicPartition, Long> snapshot = new HashMap<>(partitionStates.length);
      for (KafkaTopicPartitionState<?> state : partitionStates) {
        snapshot.put(state.getKafkaTopicPartition(), state.getOffset());
      }

      offsetHandler.prepareAndCommitOffsets(snapshot);
    }
  }
  catch (Throwable t) {
    if (!running) {
      return;
    }
    final String message = "The periodic offset committer encountered an error: " + t.getMessage();
    errorHandler.reportError(new Exception(message, t));
  }
}

代码示例来源:origin: org.apache.flink/flink-connector-kafka-0.8

/**
 * Pushes the given offsets to ZooKeeper via the offset handler, if one is set, and
 * afterwards stores them as committed offsets on the subscribed partition states.
 *
 * <p>On a failure inside the commit step: while {@code running} is set the callback's
 * {@code onException} is invoked and the exception propagates; otherwise the method
 * returns early and the partition states are left unchanged.
 *
 * @param offsets offsets to commit, keyed by partition
 * @param commitCallback receives the outcome of the commit
 * @throws Exception the failure from the commit step, while still running
 */
@Override
protected void doCommitInternalOffsetsToKafka(
    Map<KafkaTopicPartition, Long> offsets,
    @Nonnull KafkaCommitCallback commitCallback) throws Exception {

  final ZookeeperOffsetHandler offsetHandler = this.zookeeperOffsetHandler;

  if (offsetHandler != null) {
    try {
      // the handler itself increments the offsets by 1 before committing
      offsetHandler.prepareAndCommitOffsets(offsets);
      commitCallback.onSuccess();
    }
    catch (Exception e) {
      if (!running) {
        return;
      }
      commitCallback.onException(e);
      throw e;
    }
  }

  // update the committed offset held by each subscribed partition state
  for (KafkaTopicPartitionState<TopicAndPartition> partitionState : subscribedPartitionStates()) {
    final Long committedOffset = offsets.get(partitionState.getKafkaTopicPartition());
    if (committedOffset != null) {
      partitionState.setCommittedOffset(committedOffset);
    }
  }
}

代码示例来源:origin: org.apache.flink/flink-connector-kafka-0.8_2.10

/**
 * Commits the given offsets through the ZooKeeper offset handler (when one is set) and
 * then marks them as committed on every subscribed partition state.
 *
 * <p>If the commit or the success callback throws while {@code running} is set, the
 * callback is told about the exception and it is rethrown; once {@code running} is
 * cleared the method instead returns without updating any partition state.
 *
 * @param offsets offsets to commit, keyed by partition
 * @param commitCallback callback notified of commit success or failure
 * @throws Exception the commit failure, rethrown while still running
 */
@Override
public void commitInternalOffsetsToKafka(
    Map<KafkaTopicPartition, Long> offsets,
    @Nonnull KafkaCommitCallback commitCallback) throws Exception {

  final ZookeeperOffsetHandler handler = this.zookeeperOffsetHandler;

  if (handler != null) {
    try {
      // offsets are incremented by 1 inside the ZK handler before the commit
      handler.prepareAndCommitOffsets(offsets);
      commitCallback.onSuccess();
    }
    catch (Exception e) {
      if (!running) {
        return;
      }
      commitCallback.onException(e);
      throw e;
    }
  }

  // write the committed offsets back into the partition states
  for (KafkaTopicPartitionState<TopicAndPartition> state : subscribedPartitionStates()) {
    final Long committed = offsets.get(state.getKafkaTopicPartition());
    if (committed == null) {
      continue;
    }
    state.setCommittedOffset(committed);
  }
}

代码示例来源:origin: org.apache.flink/flink-connector-kafka-0.8_2.11

/**
 * Commits the supplied offsets using the ZooKeeper offset handler, when present, then
 * sets them as the committed offsets of the subscribed partition states.
 *
 * <p>A failure in the commit step is passed to the callback and rethrown as long as
 * {@code running} is set. If {@code running} is not set, the failure is dropped and the
 * method returns before any partition state is updated.
 *
 * @param offsets offsets to commit, keyed by partition
 * @param commitCallback notified via {@code onSuccess} or {@code onException}
 * @throws Exception the commit failure, rethrown while still running
 */
@Override
protected void doCommitInternalOffsetsToKafka(
    Map<KafkaTopicPartition, Long> offsets,
    @Nonnull KafkaCommitCallback commitCallback) throws Exception {

  // local reference to the handler field, used for the null check and the call
  final ZookeeperOffsetHandler zk = this.zookeeperOffsetHandler;

  if (zk != null) {
    try {
      // adding 1 to the offsets before committing is the ZK handler's job
      zk.prepareAndCommitOffsets(offsets);
      commitCallback.onSuccess();
    }
    catch (Exception e) {
      if (!running) {
        return;
      }
      commitCallback.onException(e);
      throw e;
    }
  }

  // store the committed offset on each subscribed partition that has one
  for (KafkaTopicPartitionState<TopicAndPartition> subscribed : subscribedPartitionStates()) {
    final Long newCommitted = offsets.get(subscribed.getKafkaTopicPartition());
    if (newCommitted != null) {
      subscribed.setCommittedOffset(newCommitted);
    }
  }
}

25 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com