- html - 出于某种原因,IE8 对我的 Sass 文件中继承的 html5 CSS 不友好?
- JMeter 在响应断言中使用 span 标签的问题
- html - 在 :hover and :active? 上具有不同效果的 CSS 动画
- html - 相对于居中的 html 内容固定的 CSS 重复背景?
我正在尝试使用带有“replyingKafkaTemplate”的kafka消费者发布一些消息。我的主要目的是订阅消息、修改消息并发回修改后的消息。我尝试增加replykafkaTemplate的replyTimeout。但即便如此,我也没有得到订阅者的回应。生产者控制台显示以下内容。
我尝试增加事务超时、请求超时。但对我来说没有任何作用。任何帮助将不胜感激。
提前致谢
这些是我的配置 bean:
@Bean
public Map<String, Object> producerConfigs() {
Map<String, Object> properties = new HashMap<>();
properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapAddress);
properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class);
properties.put(ProducerConfig.MAX_BLOCK_MS_CONFIG, 1000);
return properties;
}
@Bean
public Map<String, Object> consumerConfigs() {
Map<String, Object> properties = new HashMap<>();
properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapAddress);
properties.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, JsonDeserializer.class);
properties.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
return properties;
}
@Bean
public ReplyingKafkaTemplate<String, User, User> replyKafkaTemplate(ProducerFactory<String, User> pf,
KafkaMessageListenerContainer<String, User> container) {
ReplyingKafkaTemplate<String, User, User> replyTemplate = new ReplyingKafkaTemplate<>(pf, container);
replyTemplate.setReplyTimeout(30000);
return replyTemplate;
}
这是我的消费者:
@KafkaListener(topics = "user",containerFactory="kafkaListenerContainerFactory")
@SendTo
public User listen(User user) throws InterruptedException {
System.out.println("************* message published *************");
user.setName("myName");
return user;
}
WARN 8088 --- [TaskScheduler-1] o.s.k.r.ReplyingKafkaTemplate:回复超时:ProducerRecord(topic=user,partition=null, headers=RecordHeaders(headers = [RecordHeader(key = kafka_replyTopic, value = [ 117, 115, 101, 114]), RecordHeader(key = kafka_correlationId, value = [85, 92, 37, -119, 89, 32, 77, -1, -75, -107, 106, 42, 68, 12 , -124, -105]), RecordHeader(key = __TypeId__, value = [99, 111, 109, 46, 107, 97, 102, 107, 97, 46, 109, 111, 100, 101, 108, 46, 85, 115, 101, 114])], isReadOnly = true), key=null, value=com.kafka.model.User@71178924, timestamp=null) 与correlationId: [113462832283699872744219122180807230615]
最佳答案
您可能需要在配置ReplyingKafkaTemplate中添加sharedReplyTopic:
@Bean
public ReplyingKafkaTemplate<String, User, User> replyKafkaTemplate(ProducerFactory<String, User> pf,
KafkaMessageListenerContainer<String, User> container) {
ReplyingKafkaTemplate<String, User, User> replyTemplate = new ReplyingKafkaTemplate<>(pf, container);
replyTemplate.setReplyTimeout(30000);
replyTemplate.setSharedReplyTopic(true);
return replyTemplate;
}
这是我的完整示例完整配置:
@Override
@Bean
public Map<String, Object> consumerConfigs() {
Map<String, Object> props = new HashMap<>();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, autoOffsetReset);
props.put(ConsumerConfig.CLIENT_ID_CONFIG, "record-reader");
props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, true);
props.put("ssl.endpoint.identification.algorithm", ``sslEndpointIdentificationAlgorithm);
props.put("sasl.mechanism", saslMechanism);
props.put("request.timeout.ms", requestTimeoutMs);
props.put("retry.backoff.ms", retryBackoffMs);
props.put("security.protocol", securityProtocol);
props.put("sasl.jaas.config", saslJaasConfig);
return props;
}
@Override
@Bean
public ConsumerFactory<String, String> consumerFactory() {
return new DefaultKafkaConsumerFactory<>(consumerConfigs(), new StringDeserializer(),
new StringDeserializer());
}
@Override
@Bean
public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory() {
ConcurrentKafkaListenerContainerFactory<String, String> factory =
new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(consumerFactory());
factory.setReplyTemplate(kafkaTemplate);
return factory;
}
@Override
@Bean
public KafkaMessageListenerContainer<String, String> replyContainer() {
ContainerProperties containerProperties = new ContainerProperties(customerIndexTopic,customerReplyTopic);
return new KafkaMessageListenerContainer<>(consumerFactory(), containerProperties);
}
@Override
@Bean
public Map<String, Object> producerConfigs() {
Map<String, Object> props = new HashMap<>();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
props.put("ssl.endpoint.identification.algorithm", sslEndpointIdentificationAlgorithm);
props.put("sasl.mechanism", saslMechanism);
props.put("request.timeout.ms", requestTimeoutMs);
props.put("retry.backoff.ms", retryBackoffMs);
props.put("security.protocol", securityProtocol);
props.put("sasl.jaas.config", saslJaasConfig);
return props;
}
@Override
@Bean
public ProducerFactory<String, String> producerFactory() {
return new DefaultKafkaProducerFactory<>(producerConfigs());
}
@Override
@Bean
public KafkaTemplate<String, String> kafkaTemplate() {
return new KafkaTemplate<>(producerFactory());
}
@Override
@Bean
public ReplyingKafkaTemplate<String, String, String> replyKafkaTemplate(
KafkaMessageListenerContainer<String, String> container) {
ReplyingKafkaTemplate<String, String, String> replyingKafkaTemplate = new ReplyingKafkaTemplate<>(producerFactory(),container);
replyingKafkaTemplate.setSharedReplyTopic(true);
replyingKafkaTemplate.setReplyTimeout(10000);
return replyingKafkaTemplate;
}
关于java - o.s.k.r.ReplyingKafkaTemplate : Reply timed out for: ProducerRecord,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/56161248/
在尝试 time 的 python 执行时,我发现在一条语句中两次调用 time.time() 时出现奇怪的行为。在语句执行期间获取time.time() 有一个非常小的处理延迟。 例如time.ti
我要疯了。对于我的生活,我无法弄清楚为什么以下代码会导致 Unity 在我按下播放键后立即卡住。这是一个空的项目,脚本附加到一个空的游戏对象。在控制台中,什么也没有出现,甚至没有出现初始的 Debug
我要疯了。对于我的生活,我无法弄清楚为什么以下代码会导致 Unity 在我按下播放键后立即卡住。这是一个空的项目,脚本附加到一个空的游戏对象。在控制台中,什么也没有出现,甚至没有出现初始的 Debug
我不明白为什么下面的结果是一样的。我预计第一个结果是指针地址。 func print(t *time.Time) { fmt.Println(t) // 2009-11-10 23:00:00
Python 3.6.4 (v3.6.4:d48eceb, Dec 19 2017, 06:54:40) [MSC v.1900 64 bit (AMD64)] on win32 Type "help
当我有一个time.Time时: // January, 29th t, _ := time.Parse("2006-01-02", "2016-01-29") 如何获得代表 1 月 31 日的 ti
首先,我意识到不推荐使用 time with time zone。我要使用它是因为我将多个 time with time zone 值与我当前的系统时间进行比较,而不管是哪一天。 IE。用户说每天 0
长期以来,在 Rust 中精确测量时间的标准方法是 time crate 及其 time::precise_time_ns功能。但是,time crate 现在已被弃用,std 库有 std::tim
我正在我学校的一个科学集群上运行我的有限差分程序。该程序使用 openmpi 来并行化代码。 当程序连续运行时,我得到: real 78m40.592s user 78m34.920s s
尽管它们已被弃用并且有比 time 更好的模块(即 timeit),但我想知道这两个函数 time 之间的区别.clock() 和 time.time()。 从后者 (time.time()) 开始,
这个问题在这里已经有了答案: Python's time.clock() vs. time.time() accuracy? (16 个答案) 关闭 6 年前。 我认为两者都衡量时间量?但是他们返回
我正在尝试测试 http 请求处理代码块在我的 Flask Controller 中需要多长时间,这是我使用的示例代码: cancelled = [] t0 = time.time() t1 = ti
运行 python 的计算机时钟(Windows 或 Linux)时会发生什么自动更改并调用 time.time()? 我读到,当时钟手动更改为过去的某个值时,time.time() 的值会变小。 最
我有一个结构可能无法在其字段之一上设置 time.Time 值。测试无效性时,我不能使用 nil 或 0。time.Unix(0,0) 也不相同。我想到了这个: var emptyTime time.
我有一个打算用数据库记录填充的结构,其中一个日期时间列可以为空: type Reminder struct { Id int CreatedAt time.Time
问题陈述:通过匹配其百分比随机执行各种命令。比如执行 CommandA 50% 的时间和 commandB 25% 的时间和 commandC 15% 的时间等等,总百分比应该是 100%。 我的问题
我正在使用 laravel 6。我在同一个应用程序中有类似的 Controller 和类似的 View ,它工作正常。对比之后还是找不到错误。 Facade\Ignition\Exceptions\V
我需要用 ("%m/%d/%Y %H:%M:%S") 格式表示时间,我得到的浮点值是 time.time(). 我已经有了一个 time.time() 形式的值。例如,我已经有一个值,我每 0.3 秒
我正在使用以下方法获取 utc 日期时间: import datetime import time from pytz import timezone now_utc = datetime.datet
我在 Ubuntu 上使用 time.clock 和 time.time 为一段 python 代码计时: clock elapsed time: 8.770 s time elapsed time
我是一名优秀的程序员,十分优秀!