- html - 出于某种原因,IE8 对我的 Sass 文件中继承的 html5 CSS 不友好?
- JMeter 在响应断言中使用 span 标签的问题
- html - 在 :hover and :active? 上具有不同效果的 CSS 动画
- html - 相对于居中的 html 内容固定的 CSS 重复背景?
我正在尝试修改 spring cloud stream samples 之一我得到的结果令人困惑 - 尽管我只为我的 channel 注册了一个流监听器,但我只收到每秒一条消息。我怀疑这是由单个 kafka 分区的默认负载平衡引起的,但我不知道如何确认这一点。
docker ps
仅显示一个正在启动的 kafka 代理实例
CONTAINER ID IMAGE COMMAND CREATED STATUS PORTS NAMES
e058697a3bb2 wurstmeister/kafka "start-kafka.sh" 5 minutes ago Up 5 minutes 0.0.0.0:9092->9092/tcp kafka-uppercase-tx
d001389ddfa4 wurstmeister/zookeeper "/bin/sh -c '/usr/sb…" 5 minutes ago Up 5 minutes 22/tcp, 2888/tcp, 3888/tcp, 0.0.0.0:2181->2181/tcp uppercasetransformer_zookeeper_1
检查 kafka 控制台消费者也会生成单一类型的响应,但这次只是 BAR
:
/opt/kafka_2.12-2.1.0/bin # ./kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic output
BAR
BAR
BAR
BAR
BAR
检查消费者组描述和成员并没有显示任何其他消费者,所以我的负载平衡理论在这里失败了:
/opt/kafka_2.12-2.1.0/bin # ./kafka-consumer-groups.sh --bootstrap-server localhost:9092 --describe --group anonymous.0559a5b9-7876-4523-8e6c-74eb03d305a3
TOPIC PARTITION CURRENT-OFFSET LOG-END-OFFSET LAG CONSUMER-ID HOST CLIENT-ID
input 0 0 0 0 consumer-2-955118a4-3597-48a3-8fdd-3823fc366068 /172.22.0.1 consumer-2
/opt/kafka_2.12-2.1.0/bin # ./kafka-consumer-groups.sh --bootstrap-server localhost:9092 --describe --group anonymous.0559a5b9-7876-4523-8e6c-74eb03d305a3 --members
CONSUMER-ID HOST CLIENT-ID #PARTITIONS
consumer-2-955118a4-3597-48a3-8fdd-3823fc366068 /172.22.0.1 consumer-2 1
我也看不出主题描述有什么问题:
/opt/kafka_2.12-2.1.0/bin # ./kafka-topics.sh --zookeeper zookeeper:2181 --describe --topic output
Topic:output PartitionCount:1 ReplicationFactor:1 Configs:
Topic: output Partition: 0 Leader: 1001 Replicas: 1001 Isr: 1001
为什么只有每隔一条消息才会传送到我的 output
channel ?我如何自行检查这一点?
kafka-demo.java:
package demo;
import java.util.concurrent.atomic.AtomicLong;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.annotation.StreamListener;
import org.springframework.cloud.stream.messaging.Processor;
import org.springframework.context.annotation.Bean;
import org.springframework.integration.annotation.InboundChannelAdapter;
import org.springframework.integration.annotation.Poller;
import org.springframework.integration.annotation.ServiceActivator;
import org.springframework.integration.core.MessageSource;
import org.springframework.messaging.support.GenericMessage;
@EnableBinding(Processor.class)
public class UppercaseTransformer {
private static Logger logger = LoggerFactory.getLogger(UppercaseTransformer.class);
@ServiceActivator(inputChannel = Processor.INPUT, outputChannel = Processor.OUTPUT)
public String transform(String payload) {
logger.info("transforming payload {}", payload);
return payload.toUpperCase();
}
static class TestSource {
private AtomicLong longSemaphore = new AtomicLong(0L);
@Bean
@InboundChannelAdapter(channel = "input", poller = @Poller(fixedDelay = "1000"))
public MessageSource<String> sendTestData() {
return () ->
{
final long semaphoreValue = longSemaphore.getAndIncrement();
final boolean condition = semaphoreValue % 2 == 0;
final String foobar = condition ? "foo" : "bar";
logger.info("semaphoreValue: {}, condition: {}, foobar: {}", semaphoreValue, condition, foobar);
return new GenericMessage<>(foobar);
};
}
@StreamListener(Processor.OUTPUT)
public void receive(String payload) {
logger.info("Data received: {}", payload);
}
}
}
日志:
2019-08-05 22:48:02.971 INFO 28843 --- [ask-scheduler-2] demo.UppercaseTransformer : semaphoreValue: 2, condition: true, foobar: foo
2019-08-05 22:48:02.972 INFO 28843 --- [ask-scheduler-2] demo.UppercaseTransformer : transforming payload foo
2019-08-05 22:48:02.972 INFO 28843 --- [ask-scheduler-2] demo.UppercaseTransformer : Data received: FOO
2019-08-05 22:48:03.973 INFO 28843 --- [ask-scheduler-1] demo.UppercaseTransformer : semaphoreValue: 3, condition: false, foobar: bar
2019-08-05 22:48:03.974 INFO 28843 --- [ask-scheduler-1] demo.UppercaseTransformer : transforming payload bar
2019-08-05 22:48:04.976 INFO 28843 --- [ask-scheduler-3] demo.UppercaseTransformer : semaphoreValue: 4, condition: true, foobar: foo
2019-08-05 22:48:04.977 INFO 28843 --- [ask-scheduler-3] demo.UppercaseTransformer : transforming payload foo
2019-08-05 22:48:04.977 INFO 28843 --- [ask-scheduler-3] demo.UppercaseTransformer : Data received: FOO
2019-08-05 22:48:05.978 INFO 28843 --- [ask-scheduler-2] demo.UppercaseTransformer : semaphoreValue: 5, condition: false, foobar: bar
2019-08-05 22:48:05.978 INFO 28843 --- [ask-scheduler-2] demo.UppercaseTransformer : transforming payload bar
2019-08-05 22:48:06.980 INFO 28843 --- [ask-scheduler-4] demo.UppercaseTransformer : semaphoreValue: 6, condition: true, foobar: foo
2019-08-05 22:48:06.981 INFO 28843 --- [ask-scheduler-4] demo.UppercaseTransformer : transforming payload foo
2019-08-05 22:48:06.982 INFO 28843 --- [ask-scheduler-4] demo.UppercaseTransformer : Data received: FOO
2019-08-05 22:48:07.982 INFO 28843 --- [ask-scheduler-1] demo.UppercaseTransformer : semaphoreValue: 7, condition: false, foobar: bar
2019-08-05 22:48:07.983 INFO 28843 --- [ask-scheduler-1] demo.UppercaseTransformer : transforming payload bar
应用程序本地.yml:
spring:
cloud:
stream:
# bindings:
# output:
# destination: xformed
# test-sink:
# destination: xformed
# input:
# destination: testtock
# test-source:
# destination: testtock
default-binder: kafka
最佳答案
您在output
channel 上有两个使用者 - 到主题的绑定(bind)和您的receive()
服务激活器。
默认的循环处理会交替向您的服务激活器和主题发送消息。
关于java - 为什么我只能看到这个 Kafka 示例中的所有其他消息?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/57443978/
我想做的是让 JTextPane 在 JPanel 中占用尽可能多的空间。对于我使用的 UpdateInfoPanel: public class UpdateInfoPanel extends JP
我在 JPanel 中有一个 JTextArea,我想将其与 JScrollPane 一起使用。我正在使用 GridBagLayout。当我运行它时,框架似乎为 JScrollPane 腾出了空间,但
我想在 xcode 中实现以下功能。 我有一个 View Controller 。在这个 UIViewController 中,我有一个 UITabBar。它们下面是一个 UIView。将 UITab
有谁知道Firebird 2.5有没有类似于SQL中“STUFF”函数的功能? 我有一个包含父用户记录的表,另一个表包含与父相关的子用户记录。我希望能够提取用户拥有的“ROLES”的逗号分隔字符串,而
我想使用 JSON 作为 mirth channel 的输入和输出,例如详细信息保存在数据库中或创建 HL7 消息。 简而言之,输入为 JSON 解析它并输出为任何格式。 最佳答案 var objec
通常我会使用 R 并执行 merge.by,但这个文件似乎太大了,部门中的任何一台计算机都无法处理它! (任何从事遗传学工作的人的附加信息)本质上,插补似乎删除了 snp ID 的 rs 数字,我只剩
我有一个以前可能被问过的问题,但我很难找到正确的描述。我希望有人能帮助我。 在下面的代码中,我设置了varprice,我想添加javascript变量accu_id以通过rails在我的数据库中查找记
我有一个简单的 SVG 文件,在 Firefox 中可以正常查看 - 它的一些包装文本使用 foreignObject 包含一些 HTML - 文本包装在 div 中:
所以我正在为学校编写一个 Ruby 程序,如果某个值是 1 或 3,则将 bool 值更改为 true,如果是 0 或 2,则更改为 false。由于我有 Java 背景,所以我认为这段代码应该有效:
我做了什么: 我在这些账户之间创建了 VPC 对等连接 互联网网关也连接到每个 VPC 还配置了路由表(以允许来自双方的流量) 情况1: 当这两个 VPC 在同一个账户中时,我成功测试了从另一个 La
我有一个名为 contacts 的表: user_id contact_id 10294 10295 10294 10293 10293 10294 102
我正在使用 Magento 中的新模板。为避免重复代码,我想为每个产品预览使用相同的子模板。 特别是我做了这样一个展示: $products = Mage::getModel('catalog/pro
“for”是否总是检查协议(protocol)中定义的每个函数中第一个参数的类型? 编辑(改写): 当协议(protocol)方法只有一个参数时,根据该单个参数的类型(直接或任意)找到实现。当协议(p
我想从我的 PHP 代码中调用 JavaScript 函数。我通过使用以下方法实现了这一点: echo ' drawChart($id); '; 这工作正常,但我想从我的 PHP 代码中获取数据,我使
这个问题已经有答案了: Event binding on dynamically created elements? (23 个回答) 已关闭 5 年前。 我有一个动态表单,我想在其中附加一些其他 h
我正在尝试找到一种解决方案,以在 componentDidMount 中的映射项上使用 setState。 我正在使用 GraphQL连同 Gatsby返回许多 data 项目,但要求在特定的 pat
我在 ScrollView 中有一个 View 。只要用户按住该 View ,我想每 80 毫秒调用一次方法。这是我已经实现的: final Runnable vibrate = new Runnab
我用 jni 开发了一个 android 应用程序。我在 GetStringUTFChars 的 dvmDecodeIndirectRef 中得到了一个 dvmabort。我只中止了一次。 为什么会这
当我到达我的 Activity 时,我调用 FragmentPagerAdapter 来处理我的不同选项卡。在我的一个选项卡中,我想显示一个 RecyclerView,但他从未出现过,有了断点,我看到
当我按下 Activity 中的按钮时,会弹出一个 DialogFragment。在对话框 fragment 中,有一个看起来像普通 ListView 的 RecyclerView。 我想要的行为是当
我是一名优秀的程序员,十分优秀!