gpt4 book ai didi

java - 当主题有多个分区时,KTable-KTable 外键连接不会产生所有消息

转载 作者:行者123 更新时间:2023-12-01 11:21:09 25 4
gpt4 key购买 nike

请参阅下面的更新以显示潜在的解决方法
我们的应用程序使用 2 个主题作为 KTables,执行左连接,并输出到一个主题。在测试期间,我们发现当我们的输出主题只有 1 个分区时,这可以正常工作。当我们增加分区数量时,我们注意到生成到输出主题的消息数量减少了。
在启动应用程序之前,我们使用多个分区配置测试了这个理论。使用 1 个分区,我们可以看到 100% 的消息。使用 2,我们会看到一些消息(少于 50%)。对于 10,我们几乎看不到任何东西(少于 10%)。
因为我们离开了,所以从主题 1 消费的每条消息都应该写入我们的输出主题,但我们发现这并没有发生。消息似乎卡在从 Ktables 的外键连接创建的“中间”主题中,但没有错误消息。
任何帮助将不胜感激!
服务.java

@Bean
public BiFunction<KTable<MyKey, MyValue>, KTable<MyOtherKey, MyOtherValue>, KStream<MyKey, MyEnrichedValue>> process() {

return (topicOne, topicTwo) ->
topicOne
.leftJoin(topicTwo,
value -> MyOtherKey.newBuilder()
.setFieldA(value.getFieldA())
.setFieldB(value.getFieldB())
.build(),
this::enrich)
.toStream();
}
build.gradle
plugins {
id 'org.springframework.boot' version '2.3.1.RELEASE'
id 'io.spring.dependency-management' version '1.0.9.RELEASE'
id 'com.commercehub.gradle.plugin.avro' version '0.9.1'
}

...

ext {
set('springCloudVersion', "Hoxton.SR6")
}

...

implementation 'org.springframework.cloud:spring-cloud-stream-binder-kafka-streams'
implementation 'io.confluent:kafka-streams-avro-serde:5.5.1'
注意:由于 spring-cloud-stream 中包含的版本中存在错误,我们排除了 org.apache.kafka 依赖项
application.yml
spring:
application:
name: app-name
stream:
bindings:
process-in-0:
destination: topic1
group: ${spring.application.name}
process-in-1:
destination: topic2
group: ${spring.application.name}
process-out-0:
destination: outputTopic
kafka:
streams:
binder:
applicationId: ${spring.application.name}
brokers: ${KAFKA_BROKERS}
configuration:
commit.interval.ms: 1000
producer:
acks: all
retries: 20
default:
key:
serde: io.confluent.kafka.streams.serdes.avro.SpecificAvroSerde
value:
serde: io.confluent.kafka.streams.serdes.avro.SpecificAvroSerde
min-partition-count: 2
测试场景:
举一个具体的例子,如果我将以下 3 条消息发布到主题 1:
{"fieldA": 1, "fieldB": 1},,{"fieldA": 1, "fieldB": 1}
{"fieldA": 2, "fieldB": 2},,{"fieldA": 2, "fieldB": 2}
{"fieldA": 3, "fieldB": 3},,{"fieldA": 3, "fieldB": 3}
{"fieldA": 4, "fieldB": 4},,{"fieldA": 4, "fieldB": 4}
输出主题只会收到 2 条消息。
{"fieldA": 2, "fieldB": 2},,{"fieldA": 2, "fieldB": 2}
{"fieldA": 3, "fieldB": 3},,{"fieldA": 3, "fieldB": 3}
其他2人怎么了?似乎某些键/值对无法写入输出主题。重试这些“丢失”的消息也不起作用。
更新:
通过将主题 1 用作 KStream 而不是 KTable 并调用 toTable(),我能够正常运行此功能。在继续进行 KTable-KTable 连接之前。我仍然不确定为什么我的原始解决方案不起作用,但希望这个解决方法可以对实际问题有所了解。
@Bean
public BiFunction<KStream<MyKey, MyValue>, KTable<MyOtherKey, MyOtherValue>, KStream<MyKey, MyEnrichedValue>> process() {

return (topicOne, topicTwo) ->
topicOne
.map(...)
.toTable()
.leftJoin(topicTwo,
value -> MyOtherKey.newBuilder()
.setFieldA(value.getFieldA())
.setFieldB(value.getFieldB())
.build(),
this::enrich)
.toStream();
}

最佳答案

根据问题的描述,(左)KTable 输入主题中的数据似乎没有按其键正确分区。对于单个分区的主题,嗯,只有一个分区,所有数据都转到这个分区,联接结果就完成了。
但是,对于多分区输入主题,您需要确保数据按键进行分区,否则,具有相同键的两条记录可能会在不同的分区中结束,从而连接失败(因为连接是在每个-分区基础)。
请注意,即使外键连接不需要两个输入主题共同分区,仍然需要每个输入主题本身按其键进行分区!
如果您使用 map().toTable()您基本上会触发数据的内部重新分区,以确保数据按键进行分区,这解决了问题。

关于java - 当主题有多个分区时,KTable-KTable 外键连接不会产生所有消息,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/62884230/

25 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com