- html - 出于某种原因,IE8 对我的 Sass 文件中继承的 html5 CSS 不友好?
- JMeter 在响应断言中使用 span 标签的问题
- html - 在 :hover and :active? 上具有不同效果的 CSS 动画
- html - 相对于居中的 html 内容固定的 CSS 重复背景?
我开始使用 Apache Kakfa,使用 Java 编写一个简单的生产者、消费者应用程序。我正在使用kafka-clients
版本0.10.0.1
并在 Mac 上运行它。
我创建了一个名为 replicated_topic_partitioned
的主题有 3 个分区,复制因子为 3。
我在端口 2181 启动了 Zookeeper。我分别在端口 9092、9093 和 9094 上启动了 id 为 1、2 和 3 的三个代理。
这是描述命令的输出
kafka_2.12-2.3.0/bin/kafka-topics.sh --describe --topic replicated_topic_partitioned --bootstrap-server localhost:9092
Topic:replicated_topic_partitioned PartitionCount:3 ReplicationFactor:3 Configs:segment.bytes=1073741824
Topic: replicated_topic_partitioned Partition: 0 Leader: 3 Replicas: 3,1,2 Isr: 3,1,2
Topic: replicated_topic_partitioned Partition: 1 Leader: 1 Replicas: 1,2,3 Isr: 1,2,3
Topic: replicated_topic_partitioned Partition: 2 Leader: 2 Replicas: 2,3,1 Isr: 2,3,1
我编写了一个简单的生产者和消费者代码。生产者成功运行并发布了消息。但是当我启动消费者时,轮询调用只是无限期地等待。在调试时,我发现它在 ConsumerNetworkClient 上的awaitMetadataUpdate 方法处不断循环。
这是生产者和消费者的代码
Properties properties = new Properties();
properties.put("bootstrap.servers", "localhost:9092");
properties.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
properties.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
KafkaProducer<String, String> myProducer = new KafkaProducer<>(properties);
DateFormat dtFormat = new SimpleDateFormat("yyyy/MM/dd HH:mm:ss:SSS");
String topic = "replicated_topic_partitioned";
int numberOfRecords = 10;
try {
for (int i = 0; i < numberOfRecords; i++) {
String message = String.format("Message: %s sent at %s", Integer.toString(i), dtFormat.format(new Date()));
System.out.println("Sending " + message);
myProducer.send(new ProducerRecord<String, String>(topic, message));
}
} catch (Exception e) {
e.printStackTrace();
} finally {
myProducer.close();
}
消费者.java
Properties properties = new Properties();
properties.put("bootstrap.servers", "localhost:9092");
properties.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
properties.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
properties.put("group.id", UUID.randomUUID().toString());
properties.put("auto.offset.reset", "earliest");
KafkaConsumer<String, String> myConsumer = new KafkaConsumer<>(properties);
String topic = "replicated_topic_partitioned";
myConsumer.subscribe(Collections.singletonList(topic));
try {
while (true){
ConsumerRecords<String, String> records = myConsumer.poll(1000);
printRecords(records);
}
} finally {
myConsumer.close();
}
从 server.properties
添加一些关键字段
broker.id=1
host.name=localhost
num.network.threads=3
num.io.threads=8
socket.send.buffer.bytes=102400
socket.receive.buffer.bytes=102400
socket.request.max.bytes=104857600
log.dirs=/tmp/kafka-logs-1
num.partitions=1
num.recovery.threads.per.data.dir=1
transaction.state.log.replication.factor=1
transaction.state.log.min.isr=1
zookeeper.connection.timeout.ms=6000
group.initial.rebalance.delay.ms=0
另外两个代理的 server.properties 是上述代理的副本,其中 Broker.id、端口和 log.dirs 已更改。
这对我不起作用: Kafka 0.9.0.1 Java Consumer stuck in awaitMetadataUpdate()
<小时/>但是,如果我从传递分区的命令行启动使用者,它会成功读取该分区的消息。但当仅指定主题时,它不会收到任何消息。
作品:
kafka_2.12-2.3.0/bin/kafka-console-consumer.sh --topic replicated_topic_partitioned --bootstrap-server localhost:9092
--from-beginning --partition 1
不起作用:
kafka_2.12-2.3.0/bin/kafka-console-consumer.sh --topic replicated_topic_partitioned --bootstrap-server localhost:9092
--from-beginning
注意:上述消费者非常适合复制因子等于 1 的主题。
问题:
为什么 Java Producer 不读取复制因子大于 1 的主题的任何消息(即使将其分配给分区)(如 myConsumer.assign(Collections.singletonList(new TopicPartition(topic, 2)
)?
为什么控制台使用者仅在传递分区时才读取消息(同样适用于复制因子为 1 的主题)
最佳答案
因此,您要发送 10 条记录,但所有 10 条记录都具有相同的 key :
for (int i = 0; i < numberOfRecords; i++) {
String message = String.format("Message: %s sent at %s", Integer.toString(i), dtFormat.format(new Date()));
System.out.println("Sending " + message);
myProducer.send(new ProducerRecord<String, String>(topic, message)); <--- KEY=topic
}
除非另有说明(通过直接在 ProducerRecord
上设置分区),记录传送到的分区由以下内容确定:
partition = murmur2(serialize(key)) % numPartitions
所以相同的键意味着相同的分区。
您是否尝试过在分区 0 和 2 上搜索 10 条记录?
如果您希望记录在分区之间更好地“分布”,请使用空键(您将获得循环)或可变键。
关于java - Apache Kafka Java 消费者未收到复制因子大于 1 的主题的消息,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/57596903/
我不太确定为什么较大字符串(“cat”和“dog”)的答案不一致。我正在用链接列表和模板的使用做一些事情。我的好奇心促使我修改模板和函数重载。如果有人能解释发生了什么,我将不胜感激。谢谢你。 #inc
目前我必须编写这样的查询 SELECT * FROM table WHERE value1 > 5000 OR value2 > 5000 OR value3 > 5000 OR value4 > 5
我想创建一个如下所示的查询,但我不确定如何正确编码, 我希望它在开始时间的 1 小时内返回所有预订,这是我想出的: SELECT BookingId, StartTime FROM Booking W
这个问题已经有答案了: How to check if a number is between two values? (12 个回答) 已关闭 6 年前。 我目前正在 Codecademy 上学习
我想验证用户输入。如果用户输入的数字大于 3,则应抛出错误“Too high”,如果小于 0.15,则应抛出“Too low”错误。如果它在 3 到 0.15 之间,那么它应该显示“好的”。 我的代码
我有一个拖动脚本,我在其中拖动 div.slider,我正在跟踪 div.slider 的“左”值,并在它大于 68 时让它淡出,但问题是它当它达到 6 而不是 68 时淡出。如果我将数字更改为 85
是否有一种常见的模式如何在数据库(postgresql)中存储这样的条件,然后以简单的方式从数据库中获取这些数据,并在前端将其与我们在前端的值 SE 进行比较(以获得正确的“值” "): condit
如何大于/小于内部工作 如果我将 5 与 100 与 5 与 2,147,483,647 (Integer.MAX_VALUE) 进行比较,性能会受到多大影响 5 < 100 and 5 < Inte
当我运行此查询时它有效 SELECT sum( amount ) AS balance FROM balance WHERE amount >= 100 但是当我想过滤用户 ID 时,它返回 NULL
我有下表: account(id, balance, bank_branch) 我想选择账户余额大于其 bank_branch 平均余额的所有账户 我试过了 Select id from accoun
你们有没有人知道如何搜索所有大于指定数字的数字? 例如:所有单据编号>65 我试过这样:documentNumber: [65 TO *] 但我收到异常,因为 lucene 期望解析一个没有 * 的数
我正在使用 Prolog 算法,并且有一个生成抽象语法树的程序,例如 plus(num(1),num(2))这只是 1+2 .这是通过使用 DCG 来完成的。在这个例子中 plus(num(1),nu
是否使用 Sin(720) 或 Cos(1440)(以度为单位的角度)? 无论是在计算机编程中还是在任何其他情况下? 一般来说,是否有任何角度的 Sin/Cosine/Tan 使用 大于360? 在物
我发现了一些与此相关的问题,但没有一个真正回答了我的问题。 我有一个像这样的表格文件: 2 10610 0 0 0 0.0105292 2 10649 0 0 0
我是 Prolog 的新手,我正在尝试解决这个练习: Define a predicate greater_than/2 that takes two numerals in the notation
我想选择具有出现次数的不同键,此查询似乎有效: SELECT ItemMetaData.KEY, ItemMetaData.VALUE, count(*) FROM ItemMetaData GROU
我需要存储和使用大于 ULLONG_MAX 的数值。 我需要对这些值进行算术运算,所以我认为存储为 char** 不是一个选项。 在这些情况下,有没有办法动态创建额外的 long 前缀? 谢谢大家。根
我是 Prolog 的新手,我正在尝试解决这个练习: Define a predicate greater_than/2 that takes two numerals in the notation
处理已知大小但大于 64 位的位掩码(即执行所有位操作)的最有效的数据结构是什么? 字节[]? 大整数?完全是别的东西吗? 需要与 Java 7 兼容,并且对于诸如此类的事情应该很快(或者至少与合理预
编辑:抱歉进行了许多编辑。我自己都忘记写了什么了。 我使用 JPanel,将 BoxLayout 作为 JFrame 的根面板。我向此根面板添加了另外两个面板:带有 FlowLayou 的 Butto
我是一名优秀的程序员,十分优秀!