- html - 出于某种原因,IE8 对我的 Sass 文件中继承的 html5 CSS 不友好?
- JMeter 在响应断言中使用 span 标签的问题
- html - 在 :hover and :active? 上具有不同效果的 CSS 动画
- html - 相对于居中的 html 内容固定的 CSS 重复背景?
我正在使用依赖项处理 SpringBootApplication - spring-cloud-stream-binder-kafka-streams
并尝试测试在发生 serdeError
时向 Dlq 发送错误消息。
@Slf4j
@Component
@EnableBinding(KafkaBinding.class)
public class AListener {
@StreamListener
public void sink(@Input(KafkaBinding.ABINDING) KStream<String, AnOrder> events) {
log.info("HERE_BEFORE");
events.foreach((k, v) -> {
log.info("HERE_AFTER value: {}", v.toString());
throw new RuntimeException("Failed, should land in dlq topic");
});
}
}
public interface KafkaBinding {
String ABINDING = "some.events";
@Input(ABINDING)
public KStream<String, AnOrder> incomingOrder();
}
应用程序.yml
spring:
application:
name: aprocessor
cloud:
stream:
kafka:
streams:
binder:
brokers: localhost:9092
serdeError: sendToDlq
configuration:
commit.interval.ms: 1000
default:
key.serde: org.apache.kafka.common.serialization.Serdes$StringSerde
value.serde: org.apache.kafka.common.serialization.Serdes$StringSerde
bindings:
input:
consumer:
enableDlq: true
dlqName: a-dlq
autoCommitOnError: true
autoCommitOffset: true
bindings:
input:
group: a-group
destination: some.events
pos:
destination: some.events
consumer.header-mode: raw
测试:
@Slf4j
@DirtiesContext
@SpringBootTest
@EmbeddedKafka(
partitions = 1,
topics = {"some.events"},
controlledShutdown = true,
brokerProperties = {
"listeners=PLAINTEXT://localhost:9092",
"port=9092",
"auto.create.topics.enable=${topics.autoCreate:false}",
"delete.topic.enable=${topic.delete:true}"
})
public class AListenerTest {
private KafkaTemplate<String, String> kafkaTemplate;
@Autowired private EmbeddedKafkaBroker embeddedKafka;
@SpyBean private AListener listener;
private static final String INPUT_TOPIC = "some.events";
@BeforeEach
public void setUp() {
Map<String, Object> senderProperties =
KafkaTestUtils.senderProps(embeddedKafka.getBrokersAsString());
ProducerFactory<String, String> producerFactory =
new DefaultKafkaProducerFactory<>(senderProperties);
kafkaTemplate = new KafkaTemplate<>(producerFactory);
kafkaTemplate.setDefaultTopic(INPUT_TOPIC);
}
@Test
public void whenExceptionInConsumer_thenLogToDLQ(){
String logme = "{\"body\":\"thor\"}";
kafkaTemplate.sendDefault(logme);
log.info("<<<<DATA>>>> {}", logme);
}
}
测试失败,堆栈跟踪如下:
Caused by: org.springframework.context.ApplicationContextException: Failed to start bean 'inputBindingLifecycle'; nested exception is java.lang.IllegalArgumentException: DLQ support is not available for anonymous subscriptions
at org.springframework.context.support.DefaultLifecycleProcessor.doStart(DefaultLifecycleProcessor.java:185)
at org.springframework.context.support.DefaultLifecycleProcessor.access$200(DefaultLifecycleProcessor.java:53)
at org.springframework.context.support.DefaultLifecycleProcessor$LifecycleGroup.start(DefaultLifecycleProcessor.java:360)
at org.springframework.context.support.DefaultLifecycleProcessor.startBeans(DefaultLifecycleProcessor.java:158)
at org.springframework.context.support.DefaultLifecycleProcessor.onRefresh(DefaultLifecycleProcessor.java:122)
at org.springframework.context.support.AbstractApplicationContext.finishRefresh(AbstractApplicationContext.java:893)
at org.springframework.context.support.AbstractApplicationContext.refresh(AbstractApplicationContext.java:552)
at org.springframework.boot.SpringApplication.refresh(SpringApplication.java:775)
at org.springframework.boot.SpringApplication.refreshContext(SpringApplication.java:397)
at org.springframework.boot.SpringApplication.run(SpringApplication.java:316)
at org.springframework.boot.test.context.SpringBootContextLoader.loadContext(SpringBootContextLoader.java:127)
at org.springframework.test.context.cache.DefaultCacheAwareContextLoaderDelegate.loadContextInternal(DefaultCacheAwareContextLoaderDelegate.java:99)
at org.springframework.test.context.cache.DefaultCacheAwareContextLoaderDelegate.loadContext(DefaultCacheAwareContextLoaderDelegate.java:117)
... 54 more
Caused by: java.lang.IllegalArgumentException: DLQ support is not available for anonymous subscriptions
at org.springframework.util.Assert.isTrue(Assert.java:118)
at org.springframework.cloud.stream.binder.kafka.provisioning.KafkaTopicProvisioner.doProvisionConsumerDestination(KafkaTopicProvisioner.java:186)
at org.springframework.cloud.stream.binder.kafka.provisioning.KafkaTopicProvisioner.provisionConsumerDestination(KafkaTopicProvisioner.java:161)
at org.springframework.cloud.stream.binder.kafka.streams.KafkaStreamsBinderUtils.prepareConsumerBinding(KafkaStreamsBinderUtils.java:53)
at org.springframework.cloud.stream.binder.kafka.streams.KStreamBinder.doBindConsumer(KStreamBinder.java:93)
at org.springframework.cloud.stream.binder.kafka.streams.KStreamBinder.doBindConsumer(KStreamBinder.java:51)
at org.springframework.cloud.stream.binder.AbstractBinder.bindConsumer(AbstractBinder.java:142)
at org.springframework.cloud.stream.binding.BindingService.doBindConsumer(BindingService.java:144)
at org.springframework.cloud.stream.binding.BindingService.bindConsumer(BindingService.java:112)
at org.springframework.cloud.stream.binding.BindableProxyFactory.createAndBindInputs(BindableProxyFactory.java:254)
at org.springframework.cloud.stream.binding.InputBindingLifecycle.doStartWithBindable(InputBindingLifecycle.java:58)
at java.base/java.util.LinkedHashMap$LinkedValues.forEach(LinkedHashMap.java:608)
at org.springframework.cloud.stream.binding.AbstractBindingLifecycle.start(AbstractBindingLifecycle.java:48)
at org.springframework.cloud.stream.binding.InputBindingLifecycle.start(InputBindingLifecycle.java:34)
at org.springframework.context.support.DefaultLifecycleProcessor.doStart(DefaultLifecycleProcessor.java:182)
... 66 more
我希望测试成功,控制台日志显示创建了一个 dlq/topic 并查询 dlq 以打印消息。是什么导致 KafkaTopicProvisioner 抛出“IllegalArgumentException:DLQ 支持不适用于匿名订阅”?
我已经尝试过此处帖子中提到的步骤 - “Correctly manage DLQ in Spring Cloud Stream Kafka”。
最佳答案
不允许匿名消费者使用DLQ;你需要一个持久的订阅。
匿名消费者是那些没有指定消费者组的消费者。
来自您引用的答案。
bindings:
input:
group: so51247113
此外,这是开源的,您可以查看 KafkaTopicProvisioner
的源代码......
boolean anonymous = !StringUtils.hasText(group);
Assert.isTrue(!anonymous || !properties.getExtension().isEnableDlq(),
"DLQ support is not available for anonymous subscriptions");
关于spring - 是什么导致 - "DLQ support is not available for anonymous subscriptions"?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/57000784/
目前我使用服务器接收 GCP Pub/Sub 消息,我想执行 Purchase.Subscriptions:acknowledge调用以确认在服务器收到通知时尚未确认的订阅。 这是场景... 用户购
在 SQL Server 中,在“订阅端”,如何知道表是否处于复制/订阅状态? 有什么想法吗? 最佳答案 我不确定这个问题是否有一个简单的答案,并且我认为答案可能会根据复制类型的不同而有所不同。我认为
我正在使用 Angular 5 并使用 subscribe() 订阅了一个 observable方法。我想知道是否只调用unsubscribe()订阅上的方法足以清理所有内容,或者我也应该调用 rem
我似乎无法解决这个问题。下面是我的代码: #include #include #include _Bool are_anagrams (const char *word1, const char *w
当我使用 gcc -Wall 运行程序时,我得到了 warning: array subscript has type ‘char’ 请帮我看看哪里出了问题。警告显示在第 20:7 和 21:7 行。
我有一个来自 BlockCollection 的 Observable,我像队列一样使用它 IObservable GetObservable() { _queue.GetConsum
错误 1:当我尝试从元数据中获取 stringValue 时,在 Swift3 中显示上述错误: let myMetadata: AVMetadataMachineReadableCodeObject
关闭。这个问题不符合Stack Overflow guidelines .它目前不接受答案。 这个问题似乎与 help center 中定义的范围内的编程无关。 . 关闭 9 年前。 Improve
我正在将Paypal的订阅整合到我的网络服务中。。我想知道当我收到onApprove回调(Java SDK)时,订阅是否被激活。。这也意味着onApprove回调是否等同于从WebHook接收通知事件
这个问题在这里已经有了答案: Warning: array subscript has type char (3 个答案) 关闭 3 年前。 问题是这个警告在 15 和 18 警告:数组下标的类型为
根据卷积定理,卷积运算在傅里叶域变为逐点乘法——这里我有'fft_x'的形状(批量大小,高度,宽度,in_channels)这是输入数据的fft和类似的形状(高度,宽度,in_channels,out
我正在按照该教程进行操作,我正在运行带有 cygwin 编译器的 WindowsXP 32 位,该教程要求我运行以下代码: #include #include // forward declara
我有一个网站,我想实现付费订阅服务。它是一项简单的服务,只有两种类型的计划。现在,生病只是使用 Paypal 。但是在开始之前我有点迷茫,主要是数据模型。 我现在的主要问题是,我需要为每个订阅保留哪些
我们可以仅依靠subscr_eot来激活/停用帐户吗? 假设我们有以下情形: 在9/16,客户使用Paypal支付每月定期费用 服务。 24小时后, Paypal (Paypal)发送“subscr_
我的应用程序已在我的模拟器上完成,但当我尝试在我的手机上使用时,我遇到了 2 个错误 “下标”的使用不明确 我正在阅读这篇文章,但无法修复它。 How to solve Ambiguous use o
我在尝试使用下标访问范围的第 n 个元素时遇到问题。代码 super 简单: var range = 0.. = 0..)[0] 但是这些都没有解决问题。有没有办法让 Xcode 消除对 subscr
我是 angualr 2 的新手。我正在做一个小的练习应用程序。我有一个用户登录时的身份验证服务。这是它的登录功能。 login(email:string, password:string){
我是 Swift 的初学者,对运算符没有任何高级知识。 我有以下类(class) class Container { var list: [Any] = []; } 我想实现运算符 subscr
我正在尝试从 99 张纸中复制第二行,并将它们一个接一个地添加到第 100 张纸中。 vba 代码如下,但是我不断收到以下错误: 运行时错误'9': 下标超出范围 Sub copyrow()
我正在尝试在 Excel 中的 VBA 中创建一个宏,以将一个工作表的内容复制到另一个工作表(最终将升级为将内容从一个工作簿复制到另一个工作簿,但需要先证明这个概念)但我一直遇到运行时错误'9':“下
我是一名优秀的程序员,十分优秀!