
apache-kafka - Kafka Streams: Define multiple Kafka Streams using Spring Cloud Stream for each set of topics


I am trying to do a simple POC with Kafka Streams, but I hit an exception when starting the application. I am using Spring Kafka, Kafka Streams 2.5.1 and Spring Boot 2.3.5. Kafka Streams configuration:

@Configuration
public class KafkaStreamsConfig {

    private static final Logger log = LoggerFactory.getLogger(KafkaStreamsConfig.class);

    @Bean
    public Function<KStream<String, String>, KStream<String, String>> processAAA() {
        return input -> input.peek((key, value) -> log
                .info("AAA Cloud Stream Kafka Stream processing : {}", input.toString().length()));
    }

    @Bean
    public Function<KStream<String, String>, KStream<String, String>> processBBB() {
        return input -> input.peek((key, value) -> log
                .info("BBB Cloud Stream Kafka Stream processing : {}", input.toString().length()));
    }

    @Bean
    public Function<KStream<String, String>, KStream<String, String>> processCCC() {
        return input -> input.peek((key, value) -> log
                .info("CCC Cloud Stream Kafka Stream processing : {}", input.toString().length()));
    }

    /*
    @Bean
    public KafkaStreams kafkaStreams(KafkaProperties kafkaProperties) {
        final Properties props = new Properties();
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaProperties.getBootstrapServers());
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "groupId-1");
        props.put(StreamsConfig.PROCESSING_GUARANTEE_CONFIG, StreamsConfig.EXACTLY_ONCE);
        props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
        props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, JsonSerde.class);
        props.put(JsonDeserializer.VALUE_DEFAULT_TYPE, JsonNode.class);
        final KafkaStreams kafkaStreams = new KafkaStreams(kafkaStreamTopology(), props);
        kafkaStreams.start();
        return kafkaStreams;
    }

    @Bean
    public Topology kafkaStreamTopology() {
        final StreamsBuilder streamsBuilder = new StreamsBuilder();
        streamsBuilder.stream(Arrays.asList(AAATOPIC, BBBInputTOPIC, CCCInputTOPIC));
        return streamsBuilder.build();
    }
    */
}
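Each of these functional beans only logs via peek and passes the stream through unchanged; whatever the function returns is what gets written to the matching -out-0 binding. For illustration, if processAAA were rewritten to actually transform the value before it is forwarded, it could look like the sketch below (the uppercase mapping is just an assumed example, not part of the original setup):

    @Bean
    public Function<KStream<String, String>, KStream<String, String>> processAAA() {
        // peek only observes each record; mapValues produces the value that is
        // written to the topic bound to processAAA-out-0
        return input -> input
                .peek((key, value) -> log.info("AAA received key={} value={}", key, value))
                .mapValues(value -> value == null ? null : value.toUpperCase());
    }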

The application.yaml configuration is below. The idea is that I have 3 input and 3 output topics; the component takes its input from the input topics and delivers its output to the output topics.

spring:
  application.name: consumerapp-1
  cloud:
    function:
      definition: processAAA;processBBB;processCCC
    stream:
      kafka.binder:
        brokers: 127.0.0.1:9092
        autoCreateTopics: true
        auto-add-partitions: true
      kafka.streams.binder:
        configuration:
          commit.interval.ms: 1000
          default.key.serde: org.apache.kafka.common.serialization.Serdes$StringSerde
          default.value.serde: org.apache.kafka.common.serialization.Serdes$StringSerde
      bindings:
        processAAA-in-0:
          destination: aaaInputTopic
        processAAA-out-0:
          destination: aaaOutputTopic
        processBBB-in-0:
          destination: bbbInputTopic
        processBBB-out-0:
          destination: bbbOutputTopic
        processCCC-in-0:
          destination: cccInputTopic
        processCCC-out-0:
          destination: cccOutputTopic

The exception thrown is:

Caused by: java.lang.IllegalArgumentException: Trying to prepareConsumerBinding public abstract void org.apache.kafka.streams.kstream.KStream.to(java.lang.String,org.apache.kafka.streams.kstream.Produced)  but no delegate has been set.
at org.springframework.util.Assert.notNull(Assert.java:201)
at org.springframework.cloud.stream.binder.kafka.streams.KStreamBoundElementFactory$KStreamWrapperHandler.invoke(KStreamBoundElementFactory.java:134)

Can anyone help me with a Spring Cloud Stream / Kafka Streams code sample for handling multiple input and output topics?

Update: 21 January 2021

After removing all the kafkaStreams and kafkaStreamTopology bean configuration, I get the message below in an infinite loop. Message consumption still does not work. I have checked the subscriptions in application.yaml against the @Bean function definitions; they all look fine to me, but I still get this cross-wiring error. I have replaced application.properties with the application.yaml above.

    [consumerapp-1-75eec5e5-2772-4999-acf2-e9ef1e69f100-StreamThread-1] [Consumer clientId=consumerapp-1-75eec5e5-2772-4999-acf2-e9ef1e69f100-StreamThread-1-consumer, groupId=consumerapp-1] We received an assignment [cccParserTopic-0] that doesn't match our current subscription Subscribe(bbbParserTopic); it is likely that the subscription has changed since we joined the group. Will try re-join the group with current subscription
2021-01-21 14:12:43,336 WARN org.apache.kafka.clients.consumer.internals.ConsumerCoordinator [consumerapp-1-75eec5e5-2772-4999-acf2-e9ef1e69f100-StreamThread-1] [Consumer clientId=consumerapp-1-75eec5e5-2772-4999-acf2-e9ef1e69f100-StreamThread-1-consumer, groupId=consumerapp-1] We received an assignment [cccParserTopic-0] that doesn't match our current subscription Subscribe(bbbParserTopic); it is likely that the subscription has changed since we joined the group. Will try re-join the group with current subscription

Best Answer

I have managed to solve the problem, and I am writing this up for the benefit of others. If you want to include multiple streams in a single application jar, the key is to define a separate application ID per stream. I knew that all along; what I did not know was how to define it. The answer is something I eventually dug out of the SCSt documentation. Below is how the application.yaml can be defined:

spring:
  application.name: kafkaMultiStreamConsumer
  cloud:
    function:
      definition: processAAA; processBBB; processCCC  # needed for Imperative @StreamListener
    stream:
      kafka:
        binder:
          brokers: 127.0.0.1:9092
          min-partition-count: 3
          replication-factor: 2
          transaction:
            transaction-id-prefix: transaction-id-2000
          autoCreateTopics: true
          auto-add-partitions: true
        streams:
          binder:
            functions:
              # needed for the functional style
              processBBB:
                application-id: SampleBBBapplication
              processAAA:
                application-id: SampleAAAapplication
              processCCC:
                application-id: SampleCCCapplication
            configuration:
              commit.interval.ms: 1000
              default.key.serde: org.apache.kafka.common.serialization.Serdes$StringSerde
              default.value.serde: org.apache.kafka.common.serialization.Serdes$StringSerde
      bindings:
        # Below is for imperative-style programming using the
        # @StreamListener and @SendTo annotations in the .java classes
        inputAAA:
          destination: aaaInputTopic
        outputAAA:
          destination: aaaOutputTopic
        inputBBB:
          destination: bbbInputTopic
        outputBBB:
          destination: bbbOutputTopic
        inputCCC:
          destination: cccInputTopic
        outputCCC:
          destination: cccOutputTopic
        # Functional style using Function<KStream...>. Use either one of the two styles,
        # as both are not required. If you use both it is OK, but only one of them works;
        # from what I have seen, @StreamListener is always the one triggered.
        # Below is the functional style.
        processAAA-in-0:
          destination: aaaInputTopic
          group: processAAA-group
        processAAA-out-0:
          destination: aaaOutputTopic
          group: processAAA-group
        processBBB-in-0:
          destination: bbbInputTopic
          group: processBBB-group
        processBBB-out-0:
          destination: bbbOutputTopic
          group: processBBB-group
        processCCC-in-0:
          destination: cccInputTopic
          group: processCCC-group
        processCCC-out-0:
          destination: cccOutputTopic
          group: processCCC-group
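With the per-function application IDs in place, the binder gives each function its own Kafka Streams application (and therefore its own consumer group), so the three processors no longer share a single subscription and the cross-wiring warning from the update above disappears.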

Once the above is defined, we now need the individual Java classes that implement the stream-processing logic. Your Java class can look like the one below; create similar classes for the other two (or N) streams as your requirements dictate. One example, AAASampleStreamTask.java:

@Component
@EnableBinding(AAASampleChannel.class) // one channel interface corresponding to the in-topic and out-topic
public class AAASampleStreamTask {

    private static final Logger log = LoggerFactory.getLogger(AAASampleStreamTask.class);

    @StreamListener(AAASampleChannel.INPUT)
    @SendTo(AAASampleChannel.OUTPUT)
    public KStream<String, String> processAAA(KStream<String, String> input) {
        input.foreach((key, value) -> log.info("Annotation AAA *Sample* Cloud Stream Kafka Stream processing {}", String.valueOf(System.currentTimeMillis())));
        // ... do other business logic ...
        return input;
    }

    /**
     * Use either the method above or the bean below. The style below is the newer one,
     * starting from SCSt 3.0 if I am not mistaken. They are two different ways of consuming
     * Kafka Streams with SCSt. If both are present, the one above gets priority, as per my
     * observation.
     */
    /*
    @Bean
    public Function<KStream<String, String>, KStream<String, String>> processAAA() {
        return input -> input.peek((key, value) -> log.info(
                "Functional AAA *Sample* Cloud Stream Kafka Stream processing : {}", String.valueOf(System.currentTimeMillis())));
        // ... do other business logic ...
    }
    */
}
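In the functional style, the other streams need no channel interface at all; one bean per function is enough, and the processBBB-in-0 / processBBB-out-0 bindings above are wired to it by name. A minimal sketch (the class name and logging body are illustrative, not from the original answer):

@Configuration
public class BBBSampleStreamConfig {

    private static final Logger log = LoggerFactory.getLogger(BBBSampleStreamConfig.class);

    // Bound to bbbInputTopic / bbbOutputTopic via the processBBB-in-0 and
    // processBBB-out-0 bindings declared in application.yaml
    @Bean
    public Function<KStream<String, String>, KStream<String, String>> processBBB() {
        return input -> input.peek((key, value) ->
                log.info("Functional BBB processing key={} value={}", key, value));
    }
}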

If you want to use imperative programming instead of the functional style, then you need the channels. AAASampleChannel.java:

public interface AAASampleChannel {

    String INPUT = "inputAAA";
    String OUTPUT = "outputAAA";

    @Input(INPUT)
    KStream<String, String> inputAAA();

    @Output(OUTPUT)
    KStream<String, String> outputAAA();
}
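For completeness, nothing special is needed in the application class itself; under the configuration above, a plain Spring Boot entry point such as the sketch below (class name assumed) is typically enough, since the bindings are resolved from application.yaml:

@SpringBootApplication
public class KafkaMultiStreamConsumerApplication {

    public static void main(String[] args) {
        SpringApplication.run(KafkaMultiStreamConsumerApplication.class, args);
    }
}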

Regarding apache-kafka - Kafka Streams: Define multiple Kafka Streams using Spring Cloud Stream for each set of topics, we found a similar question on Stack Overflow: https://stackoverflow.com/questions/65792786/
