gpt4 book ai didi

java - 是否可以对 GlobalKTable 进行 ReKey?

转载 作者:搜寻专家 更新时间:2023-11-01 03:15:29 24 4
gpt4 key购买 nike

我想 ReKey 一个 GlobalKTable(可能是在初始化它的时候,因为我相信它们一旦创建就只读)。

这可能吗?

我在 Spring/Java Kafka Streams 应用程序中处理了两个主题。第一个没有压缩,第二个是。两者都使用 Avro 作为它们的键和值。

应用程序从第一个(未压缩的)主题流式传输记录,并通过 KStream#leftJoin 附加来自压缩主题的其他数据。压缩主题已作为 GlobalKTable 引入应用程序,通过 StreamsBuilder#globalTable() 创建并且需要保持这种方式(我需要主题的所有分区中的每条记录在每个实例中可用应用程序)。

我知道有人谈论支持非主键连接 (https://issues.apache.org/jira/browse/KAFKA-3705),但据我所知,我还不能这样做......

@Configuration
@EnableKafkaStreams
public class StreamsConfig {

@Autowired
private MyCustomSerdes serdes;

@Bean
public KStream<AvroKeyOne, AvroValueOne> reKeyJoin(StreamsBuilder streamsBuilder) {

GlobalKTable<AvroKeyOne, AvroValueOne> globalTable = streamsBuilder.globalTable("topicOne", Consumed.with(
serdes.getAvroKeyOne()
serdes.getAvroValueOne()
));

KStream<AvroKeyTwo, AvroValueOne> kStream = streamsBuilder.stream("topicTwo", Consumed.with(
serdes.getAvroKeyTwo(),
serdes.getAvroValueOne()
));

kStream.join(
globalTable,
/**
* the KeyValueMapper. I need to rekey the Global table as well to the
* corresponding String (which it's data will have) if I want this join
* to return results
*/
(streamKey, streamValue) -> {return streamKey.getNewStringKey()},
(/**ValueJoiner Deal**/)
);
}

}

最佳答案

I want to ReKey a GlobalKTable (probably while initializing it, as I believe they are read only once created).

Is this possible?

目前没有对此的直接支持。您已经提到了即将开展的工作,例如添加 support to global tables for non-primary-key joins , 但这还不可用。

您今天可以做什么:您可以将原始 Kafka 主题重新键入(重新分区)为一个新主题,然后将重新键入的主题读入您的全局 KTable。也许这是您的一个选择。

关于java - 是否可以对 GlobalKTable 进行 ReKey?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/55718395/

24 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com