- html - 出于某种原因,IE8 对我的 Sass 文件中继承的 html5 CSS 不友好?
- JMeter 在响应断言中使用 span 标签的问题
- html - 在 :hover and :active? 上具有不同效果的 CSS 动画
- html - 相对于居中的 html 内容固定的 CSS 重复背景?
升级到 Spring Boot 2、Reactor 3.5、kafka-binder 2.0.0 RELEASE 和 kafka-client 1.0.1 后,其中一个模块无法正常工作。我花了 5 天时间制作它并阅读了所有相关主题,但找不到这种行为的原因。
主类:
@Slf4j
@EnableI18N
@EnableSideBar
@ComponentScan
@SpringBootConfiguration
@EnableConfigurationProperties
@EnableAspectJAutoProxy(proxyTargetClass = true)
public class View
{
public static void main(String[] args)
{
new SpringApplicationBuilder(View.class)
.profiles("production")
.bannerMode(Banner.Mode.OFF)
.headless(true)
.application()
.run(args);
log.info("\nhttp://localhost:8083/\n");
}
}
配置标记:
@Configuration
@Profile("production")
@EnableBinding(OffersChannel.class)
class ProductionOffersConfiguration
{
}
channel 界面:
public interface OffersChannel
{
String OFFERS_OBTAIN = "offersObtain";
String OFFERS_OBTAIN_REQUEST= "offersObtainRequest";
@Input(OFFERS_OBTAIN)
SubscribableChannel offersChannel();
@Output(OFFERS_OBTAIN_REQUEST)
MessageChannel offersRequestChannel();
}
AdminUI 类,在更新依赖项之前重要的是,当初始化此类时,我能够在控制台中看到一个日志,上面写着我订阅了 channel ,现在没有任何反应:
@Slf4j
@Push
@Theme("${view.default-theme}")
@SpringUI(path = WebsiteMapping.ADMIN)
@RequiredArgsConstructor
public class AdminUI extends UI
{
MessageChannel offersObtainRequest;
Grid<Ad> adGrid = createAdGrid();
private List<Ad> ads = new LinkedList<>();
ConnectionService connectionService;
@Override
@SneakyThrows
protected void init(VaadinRequest vaadinRequest)
{
setContent(splitPane());
adGrid.setItems(ads);
}
private VerticalSplitPanel splitPane()
{
VerticalSplitPanel verticalSplitPanel = new VerticalSplitPanel();
verticalSplitPanel.setSplitPosition(10, Unit.PERCENTAGE);
verticalSplitPanel.setFirstComponent(buttonsLayout());
verticalSplitPanel.setSecondComponent(adGrid);
return verticalSplitPanel;
}
private Layout buttonsLayout()
{
HorizontalLayout layout = new HorizontalLayout();
layout.setMargin(true);
layout.addComponent(requestMoreOffersButton());
ThemeSelectorComboBox themeSelectorComboBox = new ThemeSelectorComboBox();
layout.addComponent(themeSelectorComboBox);
layout.setComponentAlignment(themeSelectorComboBox, Alignment.MIDDLE_RIGHT);
return layout;
}
@PostConstruct
private void createGridProperties()
{
adGrid.setSizeFull();
adGrid.addColumn(Ad::getTitle).setCaption("Title");
adGrid.addColumn(Ad::getLocation).setCaption("Location");
adGrid.addColumn(Ad::getHref).setCaption("Href");
}
@StreamListener
public void fetchAdsFrom(@Input(OffersChannel.OFFERS_OBTAIN) Flux<Ad> fluxAd)
{
fluxAd.subscribe(this::displayOfferInGrid);
}
private void displayOfferInGrid(Ad ad)
{
ads.add(ad);
adGrid.setItems(ads);
}
private Button requestMoreOffersButton()
{
return new Button("Request 10 more offers", this::requestMoreOffers);
}
private Button startServiceButton(String caption, String url, String message)
{
return new Button(caption, buttonClickedEvent -> startService(url, message));
}
private void startService(String url, String message)
{
ConnectionRequest startProviderRequest = ConnectionRequest
.builder()
.url(url)
.build();
connectionService
.getForHtml(startProviderRequest)
.thenAccept(serviceStarted -> Notification.show(message));
}
private void requestMoreOffers(Event event)
{
offersObtainRequest.send(new GenericMessage(new AdBroadcastRequest(ads.size(),10)));
}
private Grid<Ad> createAdGrid()
{
return new Grid<>();
}
}
应用程序.yml:
spring:
cloud:
stream:
bindings:
offersObtainRequest:
destination: adsBroadcastRequestes
binder: kafka
group: adsBroadcastRequestsProducer
offersObtain:
destination: adsBroadcast
binder: kafka
group: adsConsumer
堆栈跟踪:
2018-04-07 11:27:12.010 DEBUG o.s.retry.support.RetryTemplate : Retry: count=0
2018-04-07 11:27:12.010 DEBUG o.s.integration.channel.DirectChannel : preSend on channel 'offersObtain', message: GenericMessage [payload=byte[227], headers={kafka_offset=12, scst_nativeHeadersPresent=true, kafka_consumer=org.apache.kafka.clients.consumer.KafkaConsumer@11d6acc8, deliveryAttempt=1, kafka_timestampType=CREATE_TIME, kafka_receivedMessageKey=null, kafka_receivedPartitionId=0, contentType=application/json, kafka_receivedTopic=adsBroadcast, kafka_receivedTimestamp=1523093225984}]
2018-04-07 11:27:12.010 DEBUG o.s.r.backoff.ExponentialBackOffPolicy : Sleeping for 1000
2018-04-07 11:27:13.011 DEBUG o.s.retry.support.RetryTemplate : Checking for rethrow: count=1
2018-04-07 11:27:13.011 DEBUG o.s.retry.support.RetryTemplate : Retry: count=1
2018-04-07 11:27:13.011 DEBUG o.s.integration.channel.DirectChannel : preSend on channel 'offersObtain', message: GenericMessage [payload=byte[227], headers={kafka_offset=12, scst_nativeHeadersPresent=true, kafka_consumer=org.apache.kafka.clients.consumer.KafkaConsumer@11d6acc8, deliveryAttempt=2, kafka_timestampType=CREATE_TIME, kafka_receivedMessageKey=null, kafka_receivedPartitionId=0, contentType=application/json, kafka_receivedTopic=adsBroadcast, kafka_receivedTimestamp=1523093225984}]
2018-04-07 11:27:13.011 DEBUG o.s.r.backoff.ExponentialBackOffPolicy : Sleeping for 2000
2018-04-07 11:27:13.909 DEBUG o.a.k.c.c.internals.AbstractCoordinator : [Consumer clientId=consumer-2, groupId=adsConsumer] Sending Heartbeat request to coordinator Kacper-PC:9092 (id: 2147483647 rack: null)
2018-04-07 11:27:14.138 DEBUG o.a.k.c.c.internals.AbstractCoordinator : [Consumer clientId=consumer-2, groupId=adsConsumer] Received successful Heartbeat response
2018-04-07 11:27:15.011 DEBUG o.s.retry.support.RetryTemplate : Checking for rethrow: count=2
2018-04-07 11:27:15.011 DEBUG o.s.retry.support.RetryTemplate : Retry: count=2
2018-04-07 11:27:15.011 DEBUG o.s.integration.channel.DirectChannel : preSend on channel 'offersObtain', message: GenericMessage [payload=byte[227], headers={kafka_offset=12, scst_nativeHeadersPresent=true, kafka_consumer=org.apache.kafka.clients.consumer.KafkaConsumer@11d6acc8, deliveryAttempt=3, kafka_timestampType=CREATE_TIME, kafka_receivedMessageKey=null, kafka_receivedPartitionId=0, contentType=application/json, kafka_receivedTopic=adsBroadcast, kafka_receivedTimestamp=1523093225984}]
2018-04-07 11:27:15.011 DEBUG o.s.retry.support.RetryTemplate : Checking for rethrow: count=3
2018-04-07 11:27:15.011 DEBUG o.s.retry.support.RetryTemplate : Retry failed last attempt: count=3
2018-04-07 11:27:15.012 DEBUG o.s.i.h.a.ErrorMessageSendingRecoverer : Sending ErrorMessage: failedMessage: GenericMessage [payload=byte[227], headers={kafka_offset=12, scst_nativeHeadersPresent=true, kafka_consumer=org.apache.kafka.clients.consumer.KafkaConsumer@11d6acc8, deliveryAttempt=3, kafka_timestampType=CREATE_TIME, kafka_receivedMessageKey=null, kafka_receivedPartitionId=0, contentType=application/json, kafka_receivedTopic=adsBroadcast, kafka_receivedTimestamp=1523093225984}]
org.springframework.messaging.MessageDeliveryException: Dispatcher has no subscribers for channel 'application.offersObtain'.; nested exception is org.springframework.integration.MessageDispatchingException: Dispatcher has no subscribers, failedMessage=GenericMessage [payload=byte[227], headers={kafka_offset=12, scst_nativeHeadersPresent=true, kafka_consumer=org.apache.kafka.clients.consumer.KafkaConsumer@11d6acc8, deliveryAttempt=3, kafka_timestampType=CREATE_TIME, kafka_receivedMessageKey=null, kafka_receivedPartitionId=0, contentType=application/json, kafka_receivedTopic=adsBroadcast, kafka_receivedTimestamp=1523093225984}]
at org.springframework.integration.channel.AbstractSubscribableChannel.doSend(AbstractSubscribableChannel.java:77) ~[spring-integration-core-5.0.3.RELEASE.jar:5.0.3.RELEASE]
at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:463) ~[spring-integration-core-5.0.3.RELEASE.jar:5.0.3.RELEASE]
at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:407) ~[spring-integration-core-5.0.3.RELEASE.jar:5.0.3.RELEASE]
at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:181) ~[spring-messaging-5.0.4.RELEASE.jar:5.0.4.RELEASE]
at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:160) ~[spring-messaging-5.0.4.RELEASE.jar:5.0.4.RELEASE]
at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:47) ~[spring-messaging-5.0.4.RELEASE.jar:5.0.4.RELEASE]
at org.springframework.messaging.core.AbstractMessageSendingTemplate.send(AbstractMessageSendingTemplate.java:108) ~[spring-messaging-5.0.4.RELEASE.jar:5.0.4.RELEASE]
at org.springframework.integration.endpoint.MessageProducerSupport.sendMessage(MessageProducerSupport.java:203) ~[spring-integration-core-5.0.3.RELEASE.jar:5.0.3.RELEASE]
at org.springframework.integration.kafka.inbound.KafkaMessageDrivenChannelAdapter.access$300(KafkaMessageDrivenChannelAdapter.java:70) ~[spring-integration-kafka-3.0.3.RELEASE.jar:3.0.3.RELEASE]
at org.springframework.integration.kafka.inbound.KafkaMessageDrivenChannelAdapter$IntegrationRecordMessageListener.onMessage(KafkaMessageDrivenChannelAdapter.java:387) ~[spring-integration-kafka-3.0.3.RELEASE.jar:3.0.3.RELEASE]
at org.springframework.integration.kafka.inbound.KafkaMessageDrivenChannelAdapter$IntegrationRecordMessageListener.onMessage(KafkaMessageDrivenChannelAdapter.java:364) ~[spring-integration-kafka-3.0.3.RELEASE.jar:3.0.3.RELEASE]
at org.springframework.kafka.listener.adapter.RetryingMessageListenerAdapter.lambda$onMessage$0(RetryingMessageListenerAdapter.java:120) [spring-kafka-2.1.4.RELEASE.jar:2.1.4.RELEASE]
at org.springframework.retry.support.RetryTemplate.doExecute(RetryTemplate.java:287) [spring-retry-1.2.2.RELEASE.jar:na]
at org.springframework.retry.support.RetryTemplate.execute(RetryTemplate.java:211) [spring-retry-1.2.2.RELEASE.jar:na]
at org.springframework.kafka.listener.adapter.RetryingMessageListenerAdapter.onMessage(RetryingMessageListenerAdapter.java:114) [spring-kafka-2.1.4.RELEASE.jar:2.1.4.RELEASE]
at org.springframework.kafka.listener.adapter.RetryingMessageListenerAdapter.onMessage(RetryingMessageListenerAdapter.java:40) [spring-kafka-2.1.4.RELEASE.jar:2.1.4.RELEASE]
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.doInvokeRecordListener(KafkaMessageListenerContainer.java:1001) [spring-kafka-2.1.4.RELEASE.jar:2.1.4.RELEASE]
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.doInvokeWithRecords(KafkaMessageListenerContainer.java:981) [spring-kafka-2.1.4.RELEASE.jar:2.1.4.RELEASE]
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeRecordListener(KafkaMessageListenerContainer.java:932) [spring-kafka-2.1.4.RELEASE.jar:2.1.4.RELEASE]
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeListener(KafkaMessageListenerContainer.java:801) [spring-kafka-2.1.4.RELEASE.jar:2.1.4.RELEASE]
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.run(KafkaMessageListenerContainer.java:689) [spring-kafka-2.1.4.RELEASE.jar:2.1.4.RELEASE]
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) [na:1.8.0_151]
at java.util.concurrent.FutureTask.run(FutureTask.java:266) [na:1.8.0_151]
at java.lang.Thread.run(Thread.java:748) [na:1.8.0_151]
Caused by: org.springframework.integration.MessageDispatchingException: Dispatcher has no subscribers
at org.springframework.integration.dispatcher.UnicastingDispatcher.doDispatch(UnicastingDispatcher.java:138) ~[spring-integration-core-5.0.3.RELEASE.jar:5.0.3.RELEASE]
at org.springframework.integration.dispatcher.UnicastingDispatcher.dispatch(UnicastingDispatcher.java:105) ~[spring-integration-core-5.0.3.RELEASE.jar:5.0.3.RELEASE]
at org.springframework.integration.channel.AbstractSubscribableChannel.doSend(AbstractSubscribableChannel.java:73) ~[spring-integration-core-5.0.3.RELEASE.jar:5.0.3.RELEASE]
... 23 common frames omitted
最佳答案
好吧,里面发生了很多事情。我的意思是所有非 Spring-Cloud-Stream 的东西。 . .让人难以理解。在任何情况下,您的 @StreamListener
都没有在任何 Spring 管理的配置类中定义,因此它不会被获取。您可以将其移动到 ProductionOffersConfiguration
或 View
或任何其他 Spring 管理的配置类。
此外,请考虑阅读此快速教程 https://docs.spring.io/spring-cloud-stream/docs/Elmhurst.RELEASE/reference/htmlsingle/#_quick_start (5 分钟)更好地理解 spring-cloud-stream 的机制
关于apache-kafka - 调度程序没有 channel 订阅者 - spring-cloud-stream-kafka,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/49706189/
我有一个带有一些功能的perl对象。每个功能从主程序中调用一次。我想并行运行某些功能以节省时间。由于某些功能取决于先前功能的结果,因此我无法将它们全部一起运行。 我想到了这样的事情: 对于每个函数,保
首先,我的代码在这里: import schedule # see https://github.com/dbader/schedule import crawler def job(): p
从 11 月 1 日开始,我必须使用quartz调度程序每4个月安排一次任务。我使用 cronExpression 来实现同样的目的。但 cronExpression 每年都会重置。所以我的任务将在
我有以下代码块,它调用两个请求,但略有延迟。 final ActorRef actor1 = getContext().actorOf( ActorClass.prop
考虑到 Linux 的情况,我们为每个用户堆栈都有一个内核堆栈,据我所知,每当发生上下文切换时,我们都会切换到当前进程的内核模式。 这里我们保存当前进程的当前状态,寄存器,程序数据等,然后调度器(不确
我有将东西移植到 OpenBSD 的奇怪爱好。我知道它有 pthreads 问题,但在 2013 年 5 月发布版本之前我不会升级。我使用的是 5.0,我对 pthreads 还很陌生。我已经学习了
给定一组任务: T1(20,100) T2(30,250) T3(100,400) (execution time, deadline=peroid) 现在我想将截止日期限制为 Di = f * Pi
使用 Django 开发一个小型日程安排 Web 应用程序,在该应用程序中,人们被分配特定的时间与他们的上级会面。员工存储为模型,与表示时间范围和他们有空的星期几的模型具有 OneToMany 关系。
我想了解贪婪算法调度问题的工作原理。 所以我一直在阅读和谷歌搜索一段时间,因为我无法理解贪心算法调度问题。 我们有 n 个作业要安排在单个资源上。作业 (i) 有一个请求的开始时间 s(i) 和结束时
这是流行的 El Goog 问题的变体。 考虑以下调度问题:有 n 个作业,i = 1..n。有 1 台 super 计算机和无限的 PC。每个作业都需要先经过 super 计算机的预处理,然后再在P
假设我有一个需要运行多次的蜘蛛 class My_spider(Scrapy.spider): #spider def 我想做这样的事 while True: runner = Cra
我已将 podAntiAffinity 添加到我的 DeploymentConfig 模板中。 但是,pod 被安排在我预计会被规则排除的节点上。 我如何查看 kubernetes 调度程序的日志以了
我已经使用 React - Redux - Typescript 堆栈有一段时间了,到目前为止我很喜欢它。但是,由于我对 Redux 很陌生,所以我一直在想这个特定的话题。 调度 Redux 操作(和
我想按照预定的计划(例如,周一至周五,美国东部时间晚上 9 点至 5 点)运行单个 Azure 实例以减少账单,并且想知道最好的方法是什么。 问题的两个部分: 能否使用服务管理 API [1] 按预定
假设最小模块安装(为了简单起见),Drupal 的 index.php 中两个顶级功能的核心“职责”是什么? ? drupal_bootstrap(DRUPAL_BOOTSTRAP_FULL); me
我正在尝试使用 Racket(以前称为 PLT Scheme)连接 URL 调度。我查看了教程和服务器文档。我不知道如何将请求路由到相同的 servlet。 具体例子: #lang 方案 (需要网络服
我想在 Airflow (v1.9.0) 上运行计划。 我的DAG需要在每个月底运行,但我不知道如何编写设置。 my_dag = DAG(dag_id=DAG_ID, cat
我正在尝试在“httpTrigger”类型函数的 function.json 中设置计划字段,但计时器功能似乎未运行。我的目标是拥有一个甚至可以在需要时进行调度和手动启动的功能,而不必仅为了调度而添加
我正在尝试制定每周、每月的 Airflow 计划,但不起作用。有人可以报告可能发生的情况吗?如果我每周、每月进行安排,它就会保持静止,就好像它被关闭一样。没有错误信息,只是不执行。我发送了一个代码示例
我希望每两周自动更新一次我的表格。我希望我的函数能够被 firebase 调用。 这可能吗? 我正在使用 Angular 2 Typescript 和 Firebase。 最佳答案 仅通过fireba
我是一名优秀的程序员,十分优秀!