gpt4 book ai didi

apache-kafka - 如何在 Kafka 流中获取当前的 Kafka 主题?

转载 作者:行者123 更新时间:2023-12-04 14:24:39 26 4
gpt4 key购买 nike

我的场景是我使用了很多共享前缀的 Kafka 主题(例如 house.door, house.room )
并使用 Kafka 流正则表达式主题模式 API 使用所有主题。
一切看起来都不错,我得到了数据的 key 和消息。

为了处理数据,我需要主题名称,以便我可以根据主题名称进行连接,
但我不知道如何在 Kafka 流 DSL 中获取主题名称。

解决我的问题的一种可能方法是在我的消息中保存主题名称。
但是如果能直接得到主题名称就更好了。

那么,如何在 Kafka 流中获取当前的 Kafka 主题?

最佳答案

为了添加到 Matthias J. Sax 点,我附上了示例代码来展示它是如何完成的。

public static void main(final String[] args) {
try {
final Properties props = new Properties();
props.put(StreamsConfig.APPLICATION_ID_CONFIG, "streamProcessor");
props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
props.put(StreamsConfig.STATE_DIR_CONFIG, "state-store");
props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
props.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 10 * 1000);
props.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 0);
StreamsBuilder streamsBuilder = new StreamsBuilder();
final KStream<String, String> textLines = streamsBuilder.stream(inputTopicList);
final KStream<String, String> textLines = builder.stream(inputTopiclist);
textLines.transform(getTopicDetailsTransformer::new)
.foreach(new ForeachAction<String, String>() {
public void apply(String key, String value) {
System.out.println(key + ": " + value);
}
});
textLines.to(outputTopic);
} catch (Exception e) {
System.out.println(e);
}
}
private static class getTopicDetailsTransformer implements Transformer<String, String, KeyValue<String, String>> {

private ProcessorContext context;

@Override
public void init(final ProcessorContext context) {
this.context = context;
}

public KeyValue<String, String> transform(final String recordKey, final String recordValue) {

//here i am returning key as topic name.
return KeyValue.pair(context.topic(), recordValue);
}

@Override
public void close() {
// Not needed.
}

}

关于apache-kafka - 如何在 Kafka 流中获取当前的 Kafka 主题?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/48333706/

26 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com