gpt4 book ai didi

java - 为什么我只能看到这个 Kafka 示例中的所有其他消息?

转载 作者:行者123 更新时间:2023-12-02 09:36:46 27 4
gpt4 key购买 nike

我正在尝试修改 spring cloud stream samples 之一我得到的结果令人困惑 - 尽管我只为我的 channel 注册了一个流监听器,但我只收到每秒一条消息。我怀疑这是由单个 kafka 分区的默认负载平衡引起的,但我不知道如何确认这一点。

docker ps 仅显示一个正在启动的 kafka 代理实例

CONTAINER ID        IMAGE                    COMMAND                  CREATED             STATUS              PORTS                                                NAMES
e058697a3bb2 wurstmeister/kafka "start-kafka.sh" 5 minutes ago Up 5 minutes 0.0.0.0:9092->9092/tcp kafka-uppercase-tx
d001389ddfa4 wurstmeister/zookeeper "/bin/sh -c '/usr/sb…" 5 minutes ago Up 5 minutes 22/tcp, 2888/tcp, 3888/tcp, 0.0.0.0:2181->2181/tcp uppercasetransformer_zookeeper_1

检查 kafka 控制台消费者也会生成单一类型的响应,但这次只是 BAR:

/opt/kafka_2.12-2.1.0/bin # ./kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic output
BAR
BAR
BAR
BAR
BAR

检查消费者组描述和成员并没有显示任何其他消费者,所以我的负载平衡理论在这里失败了:

/opt/kafka_2.12-2.1.0/bin # ./kafka-consumer-groups.sh --bootstrap-server localhost:9092 --describe --group anonymous.0559a5b9-7876-4523-8e6c-74eb03d305a3

TOPIC PARTITION CURRENT-OFFSET LOG-END-OFFSET LAG CONSUMER-ID HOST CLIENT-ID
input 0 0 0 0 consumer-2-955118a4-3597-48a3-8fdd-3823fc366068 /172.22.0.1 consumer-2
/opt/kafka_2.12-2.1.0/bin # ./kafka-consumer-groups.sh --bootstrap-server localhost:9092 --describe --group anonymous.0559a5b9-7876-4523-8e6c-74eb03d305a3 --members

CONSUMER-ID HOST CLIENT-ID #PARTITIONS
consumer-2-955118a4-3597-48a3-8fdd-3823fc366068 /172.22.0.1 consumer-2 1

我也看不出主题描述有什么问题:

/opt/kafka_2.12-2.1.0/bin # ./kafka-topics.sh --zookeeper zookeeper:2181 --describe --topic output
Topic:output PartitionCount:1 ReplicationFactor:1 Configs:
Topic: output Partition: 0 Leader: 1001 Replicas: 1001 Isr: 1001

为什么只有每隔一条消息才会传送到我的 output channel ?我如何自行检查这一点?

kafka-demo.java:

package demo;

import java.util.concurrent.atomic.AtomicLong;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.annotation.StreamListener;
import org.springframework.cloud.stream.messaging.Processor;
import org.springframework.context.annotation.Bean;
import org.springframework.integration.annotation.InboundChannelAdapter;
import org.springframework.integration.annotation.Poller;
import org.springframework.integration.annotation.ServiceActivator;
import org.springframework.integration.core.MessageSource;
import org.springframework.messaging.support.GenericMessage;

@EnableBinding(Processor.class)
public class UppercaseTransformer {

private static Logger logger = LoggerFactory.getLogger(UppercaseTransformer.class);

@ServiceActivator(inputChannel = Processor.INPUT, outputChannel = Processor.OUTPUT)
public String transform(String payload) {
logger.info("transforming payload {}", payload);
return payload.toUpperCase();
}

static class TestSource {
private AtomicLong longSemaphore = new AtomicLong(0L);

@Bean
@InboundChannelAdapter(channel = "input", poller = @Poller(fixedDelay = "1000"))
public MessageSource<String> sendTestData() {
return () ->
{
final long semaphoreValue = longSemaphore.getAndIncrement();
final boolean condition = semaphoreValue % 2 == 0;
final String foobar = condition ? "foo" : "bar";
logger.info("semaphoreValue: {}, condition: {}, foobar: {}", semaphoreValue, condition, foobar);
return new GenericMessage<>(foobar);
};

}

@StreamListener(Processor.OUTPUT)
public void receive(String payload) {
logger.info("Data received: {}", payload);
}
}
}

日志:

2019-08-05 22:48:02.971  INFO 28843 --- [ask-scheduler-2] demo.UppercaseTransformer                : semaphoreValue: 2, condition: true, foobar: foo
2019-08-05 22:48:02.972 INFO 28843 --- [ask-scheduler-2] demo.UppercaseTransformer : transforming payload foo
2019-08-05 22:48:02.972 INFO 28843 --- [ask-scheduler-2] demo.UppercaseTransformer : Data received: FOO
2019-08-05 22:48:03.973 INFO 28843 --- [ask-scheduler-1] demo.UppercaseTransformer : semaphoreValue: 3, condition: false, foobar: bar
2019-08-05 22:48:03.974 INFO 28843 --- [ask-scheduler-1] demo.UppercaseTransformer : transforming payload bar
2019-08-05 22:48:04.976 INFO 28843 --- [ask-scheduler-3] demo.UppercaseTransformer : semaphoreValue: 4, condition: true, foobar: foo
2019-08-05 22:48:04.977 INFO 28843 --- [ask-scheduler-3] demo.UppercaseTransformer : transforming payload foo
2019-08-05 22:48:04.977 INFO 28843 --- [ask-scheduler-3] demo.UppercaseTransformer : Data received: FOO
2019-08-05 22:48:05.978 INFO 28843 --- [ask-scheduler-2] demo.UppercaseTransformer : semaphoreValue: 5, condition: false, foobar: bar
2019-08-05 22:48:05.978 INFO 28843 --- [ask-scheduler-2] demo.UppercaseTransformer : transforming payload bar
2019-08-05 22:48:06.980 INFO 28843 --- [ask-scheduler-4] demo.UppercaseTransformer : semaphoreValue: 6, condition: true, foobar: foo
2019-08-05 22:48:06.981 INFO 28843 --- [ask-scheduler-4] demo.UppercaseTransformer : transforming payload foo
2019-08-05 22:48:06.982 INFO 28843 --- [ask-scheduler-4] demo.UppercaseTransformer : Data received: FOO
2019-08-05 22:48:07.982 INFO 28843 --- [ask-scheduler-1] demo.UppercaseTransformer : semaphoreValue: 7, condition: false, foobar: bar
2019-08-05 22:48:07.983 INFO 28843 --- [ask-scheduler-1] demo.UppercaseTransformer : transforming payload bar

应用程序本地.yml:

spring:
cloud:
stream:
# bindings:
# output:
# destination: xformed
# test-sink:
# destination: xformed
# input:
# destination: testtock
# test-source:
# destination: testtock
default-binder: kafka

最佳答案

您在output channel 上有两个使用者 - 到主题的绑定(bind)和您的receive() 服务激活器。

默认的循环处理会交替向您的服务激活器和主题发送消息。

关于java - 为什么我只能看到这个 Kafka 示例中的所有其他消息?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/57443978/

27 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com