- html - 出于某种原因,IE8 对我的 Sass 文件中继承的 html5 CSS 不友好?
- JMeter 在响应断言中使用 span 标签的问题
- html - 在 :hover and :active? 上具有不同效果的 CSS 动画
- html - 相对于居中的 html 内容固定的 CSS 重复背景?
我有 Java 8 应用程序与 Apache Kafka 2.11-0.10.1.0 一起使用。我需要使用 seek
功能来poll
来自分区的旧消息。但是,我遇到了没有当前分区分配
的异常,每次我尝试seekByOffset
时都会发生这种情况。这是我的类,它负责seek
将主题搜索到指定的时间戳:
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.OffsetAndTimestamp;
import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.TopicPartition;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.util.CollectionUtils;
import java.time.Instant;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.function.Function;
import java.util.regex.Pattern;
import java.util.stream.Collectors;
/**
* The main purpose of this class is to move fetching point for each partition of the {@link KafkaConsumer}
* to some offset which is determined either by timestamp or by offset number.
*/
public class KafkaSeeker {
public static final long APP_STARTUP_TIME = Instant.now().toEpochMilli();
private final Logger LOGGER = LoggerFactory.getLogger(this.getClass());
private final KafkaConsumer<String, String> kafkaConsumer;
private ConsumerRecords<String, String> polledRecords;
public KafkaSeeker(KafkaConsumer<String, String> kafkaConsumer) {
this.kafkaConsumer = kafkaConsumer;
this.polledRecords = new ConsumerRecords<>(Collections.emptyMap());
}
/**
* For each assigned or subscribed topic {@link org.apache.kafka.clients.consumer.KafkaConsumer#seek(TopicPartition, long)}
* fetching pointer to the specified {@code timestamp}.
* If no messages were found in each partition for a topic,
* then {@link org.apache.kafka.clients.consumer.KafkaConsumer#seekToEnd(Collection)} will be called.
*
* Due to {@link KafkaConsumer#subscribe(Pattern)} and {@link KafkaConsumer#assign(Collection)} laziness
* method needs to execute dummy {@link KafkaConsumer#poll(long)} method. All {@link ConsumerRecords} which were
* polled from buffer are swallowed and produce warning logs.
*
* @param timestamp is used to find proper offset to seek to
* @param topics are used to seek only specific topics. If not specified or empty, all subscribed topics are used.
*/
public Map<TopicPartition, OffsetAndTimestamp> seek(long timestamp, Collection<String> topics) {
this.polledRecords = kafkaConsumer.poll(0);
Collection<TopicPartition> topicPartitions;
if (CollectionUtils.isEmpty(topics)) {
topicPartitions = kafkaConsumer.assignment();
} else {
topicPartitions = topics.stream()
.map(it -> {
List<Integer> partitions = kafkaConsumer.partitionsFor(it).stream()
.map(PartitionInfo::partition).collect(Collectors.toList());
return partitions.stream().map(partition -> new TopicPartition(it, partition));
})
.flatMap(it -> it)
.collect(Collectors.toList());
}
if (topicPartitions.isEmpty()) {
throw new IllegalStateException("Kafka consumer doesn't have any subscribed topics.");
}
Map<TopicPartition, Long> timestampsByTopicPartitions = topicPartitions.stream()
.collect(Collectors.toMap(Function.identity(), topicPartition -> timestamp));
Map<TopicPartition, Long> beginningOffsets = kafkaConsumer.beginningOffsets(topicPartitions);
Map<TopicPartition, OffsetAndTimestamp> offsets = kafkaConsumer.offsetsForTimes(timestampsByTopicPartitions);
for (Map.Entry<TopicPartition, OffsetAndTimestamp> entry : offsets.entrySet()) {
TopicPartition topicPartition = entry.getKey();
if (entry.getValue() != null) {
LOGGER.info("Kafka seek topic:partition [{}:{}] from [{} offset] to [{} offset].",
topicPartition.topic(),
topicPartition.partition(),
beginningOffsets.get(topicPartition),
entry.getValue());
kafkaConsumer.seek(topicPartition, entry.getValue().offset());
} else {
LOGGER.info("Kafka seek topic:partition [{}:{}] from [{} offset] to the end of partition.",
topicPartition.topic(),
topicPartition.partition());
kafkaConsumer.seekToEnd(Collections.singleton(topicPartition));
}
}
return offsets;
}
public ConsumerRecords<String, String> getPolledRecords() {
return polledRecords;
}
}
在调用该方法之前,我让消费者订阅了一个主题,例如 consumer.subscribe(singletonList(kafkaTopic));
。当我得到kafkaConsumer.assignment()
时,它返回分配的零TopicPartition
。但是,如果我指定主题并获取其分区,那么我就有有效的 TopicPartition
,尽管它们在 seek
调用上失败并出现标题中的错误。我忘记了什么?
最佳答案
可靠地查找和检查当前分配的正确方法是在订阅后等待 onPartitionsAssigned()
回调。在新创建的(尚未连接)消费者上,调用 poll()
一次并不能保证它将立即连接并分配分区。
作为一个基本示例,请参阅下面的代码,该代码订阅主题,并在分配的回调中寻找所需的位置。最后,您会注意到轮询循环正确地只能看到来自查找位置的记录,而不是来自先前提交或重置偏移量的记录。
public static final Map<TopicPartition, Long> offsets = Map.of(new TopicPartition("testtopic", 0), 5L);
public static void main(String args[]) {
Properties props = new Properties();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
props.put(ConsumerConfig.GROUP_ID_CONFIG, "test");
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
try (Consumer<String, String> consumer = new KafkaConsumer<>(props)) {
consumer.subscribe(Collections.singletonList("testtopic"), new ConsumerRebalanceListener() {
@Override
public void onPartitionsRevoked(Collection<TopicPartition> partitions) {}
@Override
public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
System.out.println("Assigned " + partitions);
for (TopicPartition tp : partitions) {
OffsetAndMetadata oam = consumer.committed(tp);
if (oam != null) {
System.out.println("Current offset is " + oam.offset());
} else {
System.out.println("No committed offsets");
}
Long offset = offsets.get(tp);
if (offset != null) {
System.out.println("Seeking to " + offset);
consumer.seek(tp, offset);
}
}
}
});
for (int i = 0; i < 10; i++) {
System.out.println("Calling poll");
ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100L));
for (ConsumerRecord<String, String> r : records) {
System.out.println("record from " + r.topic() + "-" + r.partition() + " at offset " + r.offset());
}
}
}
}
关于java - 即使在 Kafka 中进行轮询后,当前也不会发生分区分配,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/54480715/
我有一个应用程序,它会抛出 GKSession 并在各种条件下(连接超时、 session 失败等)创建一个新的 GKSession。不过,我遇到了内存泄漏问题,并且有时会在重新连接几次循环后崩溃。
比如我在宿主代码中有一个浮点指针 float *p 是否可以确定他指向的内存类型(设备/主机)? 最佳答案 在 UVA system 中, 运行时 API 函数 cudaPointerGetAttri
我已将项目转换为 .Net 4.0 并且以下代码不起作用: typeof(RuntimeTypeHandle).GetMethod("Allocate", BindingFlags.Instance
当我声明 char ch = 'ab' 时,ch 只包含 'b',为什么它不存储 'a'? #include int main() { char ch = 'ab'; printf("%c"
我对 Disk Sector 和 Block 有疑问。扇区是一个单位,通常为 512 字节或 1k、2k、4k 等取决于硬件。文件系统 block 大小是一组扇区大小。 假设我正在存储一个 5KB 的
假设我有 8 个人和5000 个苹果。 我想将所有苹果分发给所有 8 个人,这样我就没有苹果了。 但每个人都应该得到不同数量 将它们全部分发出去的最佳方式是什么? 我是这样开始的: let peopl
我正在构建的网站顶部有一个搜索栏。与 Trello 或 Gmail 类似,我希望当用户按下“/”键时,他们的焦点就会转到该搜索框。 我的 JavaScript 看起来像这样: document.onk
我有一小段代码: if (PZ_APP.dom.isAnyDomElement($textInputs)){ $textInputs.on("focus", function(){
我观察到以下行为。 接受了两个属性变量。 @property (nonatomic, retain) NSString *stringOne; @property (nonatomic, assign
我正在解决这样的问题 - 实现一个计算由以下内容组成的表达式的函数以下操作数:“(”、“)”、“+”、“-”、“*”、“/”。中的每个数字表达式可能很大(与由字符串表示的一样大)1000 位)。 “/
我有一组主机和一组任务。 每个主机都有 cpu、mem 和任务容量,每个任务都有 cpu、mem 要求。 每个主机都属于一个延迟类别,并且可以与具有特定延迟值的其他主机通信。 每个任务可能需要以等于或
该程序的作用:从文件中读取一个包含 nrRows 行和 nrColomns 列的矩阵(二维数组)。矩阵的所有元素都是 [0,100) 之间的整数。程序必须重新排列矩阵内的所有元素,使每个元素等于其所在
世界!我有个问题。今天我尝试创建一个代码,它可以找到加泰罗尼亚语号码。但是在我的程序中可以是长数字。我找到了分子和分母。但我不能分割长数字!此外,只有标准库必须在此程序中使用。请帮帮我。这是我的代码
我确定我遗漏了一些明显的东西,但我想在 Objective C 中创建一个 NSInteger 指针的实例。 -(NSInteger*) getIntegerPointer{ NSInteger
这个问题在这里已经有了答案: Difference between self.ivar and ivar? (4 个答案) 关闭 9 年前。
我如何将 v[i] 分配给一系列整数(v 的类型是 vector )而无需最初填充 最佳答案 你的意思是将 std::vector 初始化为一系列整数? int i[] = {1, 2, 3, 4,
我想寻求分配方面的帮助....我把这个作业带到了学校......我必须编写程序来加载一个 G 矩阵和第二个 G 矩阵,并搜索第二个 G 矩阵以获取存在数第一个 G 矩阵的......但是,当我尝试运行
我必须管理资源。它基本上是一个唯一的编号,用于标识交换机中的第 2 层连接。可以有 16k 个这样的连接,因此每次用户希望配置连接时,他/她都需要分配一个唯一索引。同样,当用户希望删除连接时,资源(号
是否有任何通用的命名约定来区分已分配和未分配的字符串?我正在寻找的是希望类似于 us/s 来自 Making Wrong Code Look Wrong ,但我宁愿使用常见的东西也不愿自己动手。 最佳
我需要读取一个 .txt 文件并将文件中的每个单词分配到一个结构中,该结构从结构 vector 指向。我将在下面更好地解释。 感谢您的帮助。 我的程序只分配文件的第一个字... 我知道问题出在函数 i
我是一名优秀的程序员,十分优秀!