gpt4 book ai didi

java - KafkaStreams 未运行。状态为错误

转载 作者:行者123 更新时间:2023-12-05 06:34:46 26 4
gpt4 key购买 nike

我有一个 Kafka 消费者类,它监听事件并执行事件(订单、客户)之间的连接并将它们存储在物化 View 中。当收到 REST 调用时,我创建了另一个类来访问状态存储。但我越来越java.lang.IllegalStateException:KafkaStreams 未运行。状态错误。 我尝试分配 application.server 属性,但没有成功。类 EventsListener参加 Activity 和类(class) StateStoreService查询事件。代码上传到Github供引用。

基于此讨论 https://confluentcommunity.slack.com/archives/C48AHTCUQ/p1500079616414627在 Matt 的下面的注释中,我删除了 application.server 属性。

The error means, that a consumer subscribed to topic "source-topic" but got partition "test-name-0" (ie, partition 0 from topic "test-name") assigned. Of course, the consumer did not expect to get a partition of topic "test-name" as it never subscribed to it and this is fails.
In Streams, this can happen if you have two application instances of the same application, and both subscribe to different topics (what is not allowed).
Ie, all instanced of an application must have the same processing logic and must subscribe to the same topics. This implies that if you want to consume 2 topics, all instances need to read both topics.

但我仍然无法从状态存储中读取。代码停留在 thread.sleep() 方法。

异常(exception):

Exception in thread "cqrs-streams-5d77fdb3-f75e-443d-b7b7-fabedcbc483f-StreamThread-1" java.lang.IllegalArgumentException: Assigned partition order-0 for non-subscribed topic regex pattern; subscription pattern is 
at org.apache.kafka.clients.consumer.internals.SubscriptionState.assignFromSubscribed(SubscriptionState.java:195)
at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.onJoinComplete(ConsumerCoordinator.java:225)
at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.joinGroupIfNeeded(AbstractCoordinator.java:367)
at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureActiveGroup(AbstractCoordinator.java:316)
at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.poll(ConsumerCoordinator.java:295)
at org.apache.kafka.clients.consumer.KafkaConsumer.pollOnce(KafkaConsumer.java:1146)
at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1111)
at org.apache.kafka.streams.processor.internals.StreamThread.pollRequests(StreamThread.java:851)
at org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:808)
at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:774)
at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:744)
java.lang.IllegalStateException: KafkaStreams is not running. State is ERROR.
at org.apache.kafka.streams.KafkaStreams.validateIsRunning(KafkaStreams.java:312)
at org.apache.kafka.streams.KafkaStreams.allMetadata(KafkaStreams.java:934)
at com.kafkastream.service.StateStoreService.waitUntilStoreIsQueryable(StateStoreService.java:105)
at com.kafkastream.service.StateStoreService.getCustomerOrders(StateStoreService.java:59)
at com.kafkastream.web.EventsController.getCustomerOrders(EventsController.java:101)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at org.springframework.web.method.support.InvocableHandlerMethod.doInvoke(InvocableHandlerMethod.java:209)
at org.springframework.web.method.support.InvocableHandlerMethod.invokeForRequest(InvocableHandlerMethod.java:136)
at org.springframework.web.servlet.mvc.method.annotation.ServletInvocableHandlerMethod.invokeAndHandle(ServletInvocableHandlerMethod.java:102)
at org.springframework.web.servlet.mvc.method.annotation.RequestMappingHandlerAdapter.invokeHandlerMethod(RequestMappingHandlerAdapter.java:877)
at org.springframework.web.servlet.mvc.method.annotation.RequestMappingHandlerAdapter.handleInternal(RequestMappingHandlerAdapter.java:783)
at org.springframework.web.servlet.mvc.method.AbstractHandlerMethodAdapter.handle(AbstractHandlerMethodAdapter.java:87)
at org.springframework.web.servlet.DispatcherServlet.doDispatch(DispatcherServlet.java:991)
at org.springframework.web.servlet.DispatcherServlet.doService(DispatcherServlet.java:925)
at org.springframework.web.servlet.FrameworkServlet.processRequest(FrameworkServlet.java:974)
at org.springframework.web.servlet.FrameworkServlet.doGet(FrameworkServlet.java:866)
at javax.servlet.http.HttpServlet.service(HttpServlet.java:635)
at org.springframework.web.servlet.FrameworkServlet.service(FrameworkServlet.java:851)
at javax.servlet.http.HttpServlet.service(HttpServlet.java:742)
at org.apache.catalina.core.ApplicationFilterChain.internalDoFilter(ApplicationFilterChain.java:231)
at org.apache.catalina.core.ApplicationFilterChain.doFilter(ApplicationFilterChain.java:166)
at org.apache.tomcat.websocket.server.WsFilter.doFilter(WsFilter.java:52)
at org.apache.catalina.core.ApplicationFilterChain.internalDoFilter(ApplicationFilterChain.java:193)
at org.apache.catalina.core.ApplicationFilterChain.doFilter(ApplicationFilterChain.java:166)
at org.springframework.web.filter.RequestContextFilter.doFilterInternal(RequestContextFilter.java:99)
at org.springframework.web.filter.OncePerRequestFilter.doFilter(OncePerRequestFilter.java:107)
at org.apache.catalina.core.ApplicationFilterChain.internalDoFilter(ApplicationFilterChain.java:193)
at org.apache.catalina.core.ApplicationFilterChain.doFilter(ApplicationFilterChain.java:166)
at org.springframework.web.filter.HttpPutFormContentFilter.doFilterInternal(HttpPutFormContentFilter.java:109)
at org.springframework.web.filter.OncePerRequestFilter.doFilter(OncePerRequestFilter.java:107)
at org.apache.catalina.core.ApplicationFilterChain.internalDoFilter(ApplicationFilterChain.java:193)
at org.apache.catalina.core.ApplicationFilterChain.doFilter(ApplicationFilterChain.java:166)
at org.springframework.web.filter.HiddenHttpMethodFilter.doFilterInternal(HiddenHttpMethodFilter.java:81)
at org.springframework.web.filter.OncePerRequestFilter.doFilter(OncePerRequestFilter.java:107)
at org.apache.catalina.core.ApplicationFilterChain.internalDoFilter(ApplicationFilterChain.java:193)
at org.apache.catalina.core.ApplicationFilterChain.doFilter(ApplicationFilterChain.java:166)
at org.springframework.web.filter.CharacterEncodingFilter.doFilterInternal(CharacterEncodingFilter.java:200)
at org.springframework.web.filter.OncePerRequestFilter.doFilter(OncePerRequestFilter.java:107)
at org.apache.catalina.core.ApplicationFilterChain.internalDoFilter(ApplicationFilterChain.java:193)
at org.apache.catalina.core.ApplicationFilterChain.doFilter(ApplicationFilterChain.java:166)
at org.apache.catalina.core.StandardWrapperValve.invoke(StandardWrapperValve.java:198)
at org.apache.catalina.core.StandardContextValve.invoke(StandardContextValve.java:96)
at org.apache.catalina.authenticator.AuthenticatorBase.invoke(AuthenticatorBase.java:496)
at org.apache.catalina.core.StandardHostValve.invoke(StandardHostValve.java:140)
at org.apache.catalina.valves.ErrorReportValve.invoke(ErrorReportValve.java:81)
at org.apache.catalina.core.StandardEngineValve.invoke(StandardEngineValve.java:87)
at org.apache.catalina.connector.CoyoteAdapter.service(CoyoteAdapter.java:342)
at org.apache.coyote.http11.Http11Processor.service(Http11Processor.java:803)
at org.apache.coyote.AbstractProcessorLight.process(AbstractProcessorLight.java:66)
at org.apache.coyote.AbstractProtocol$ConnectionHandler.process(AbstractProtocol.java:790)
at org.apache.tomcat.util.net.NioEndpoint$SocketProcessor.doRun(NioEndpoint.java:1459)
at org.apache.tomcat.util.net.SocketProcessorBase.run(SocketProcessorBase.java:49)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at org.apache.tomcat.util.threads.TaskThread$WrappingRunnable.run(TaskThread.java:61)
at java.lang.Thread.run(Thread.java:748)

最佳答案

“KafkaStreams 未运行。状态为错误”我收到此错误是因为我没有定义我的流媒体应用程序的接收器主题。当我定义接收器主题时。我能够解决此错误。

关于java - KafkaStreams 未运行。状态为错误,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/50107514/

26 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com