KafkaStreams 不是 运行。状态为错误
KafkaStreams is not running. State is ERROR
我有一个 Kafka 消费者 class,它监听事件并执行事件(订单、客户)之间的连接并将它们存储在物化视图中。当收到 REST 调用时,我创建了另一个 class 来访问状态存储。但我越来越
java.lang.IllegalStateException:KafkaStreams 不是 运行。 State is ERROR. 我试着赋值 application.server 属性,没成功。 Class EventsListener joins events and class StateStoreService queries events. Code uploaded to Github 供参考。
基于此讨论 https://confluentcommunity.slack.com/archives/C48AHTCUQ/p1500079616414627 和下面 Matt 的注释,我删除了 application.server 属性。
The error means, that a consumer subscribed to topic "source-topic" but got partition "test-name-0" (ie, partition 0 from topic "test-name") assigned. Of course, the consumer did not expect to get a partition of topic "test-name" as it never subscribed to it and this is fails.
In Streams, this can happen if you have two application instances of the same application, and both subscribe to different topics (what is not allowed).
Ie, all instanced of an application must have the same processing logic and must subscribe to the same topics. This implies that if you want to consume 2 topics, all instances need to read both topics.
但我仍然无法从状态存储中读取。代码停留在 thread.sleep() 方法。
异常:
Exception in thread "cqrs-streams-5d77fdb3-f75e-443d-b7b7-fabedcbc483f-StreamThread-1" java.lang.IllegalArgumentException: Assigned partition order-0 for non-subscribed topic regex pattern; subscription pattern is
at org.apache.kafka.clients.consumer.internals.SubscriptionState.assignFromSubscribed(SubscriptionState.java:195)
at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.onJoinComplete(ConsumerCoordinator.java:225)
at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.joinGroupIfNeeded(AbstractCoordinator.java:367)
at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureActiveGroup(AbstractCoordinator.java:316)
at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.poll(ConsumerCoordinator.java:295)
at org.apache.kafka.clients.consumer.KafkaConsumer.pollOnce(KafkaConsumer.java:1146)
at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1111)
at org.apache.kafka.streams.processor.internals.StreamThread.pollRequests(StreamThread.java:851)
at org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:808)
at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:774)
at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:744)
java.lang.IllegalStateException: KafkaStreams is not running. State is ERROR.
at org.apache.kafka.streams.KafkaStreams.validateIsRunning(KafkaStreams.java:312)
at org.apache.kafka.streams.KafkaStreams.allMetadata(KafkaStreams.java:934)
at com.kafkastream.service.StateStoreService.waitUntilStoreIsQueryable(StateStoreService.java:105)
at com.kafkastream.service.StateStoreService.getCustomerOrders(StateStoreService.java:59)
at com.kafkastream.web.EventsController.getCustomerOrders(EventsController.java:101)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at org.springframework.web.method.support.InvocableHandlerMethod.doInvoke(InvocableHandlerMethod.java:209)
at org.springframework.web.method.support.InvocableHandlerMethod.invokeForRequest(InvocableHandlerMethod.java:136)
at org.springframework.web.servlet.mvc.method.annotation.ServletInvocableHandlerMethod.invokeAndHandle(ServletInvocableHandlerMethod.java:102)
at org.springframework.web.servlet.mvc.method.annotation.RequestMappingHandlerAdapter.invokeHandlerMethod(RequestMappingHandlerAdapter.java:877)
at org.springframework.web.servlet.mvc.method.annotation.RequestMappingHandlerAdapter.handleInternal(RequestMappingHandlerAdapter.java:783)
at org.springframework.web.servlet.mvc.method.AbstractHandlerMethodAdapter.handle(AbstractHandlerMethodAdapter.java:87)
at org.springframework.web.servlet.DispatcherServlet.doDispatch(DispatcherServlet.java:991)
at org.springframework.web.servlet.DispatcherServlet.doService(DispatcherServlet.java:925)
at org.springframework.web.servlet.FrameworkServlet.processRequest(FrameworkServlet.java:974)
at org.springframework.web.servlet.FrameworkServlet.doGet(FrameworkServlet.java:866)
at javax.servlet.http.HttpServlet.service(HttpServlet.java:635)
at org.springframework.web.servlet.FrameworkServlet.service(FrameworkServlet.java:851)
at javax.servlet.http.HttpServlet.service(HttpServlet.java:742)
at org.apache.catalina.core.ApplicationFilterChain.internalDoFilter(ApplicationFilterChain.java:231)
at org.apache.catalina.core.ApplicationFilterChain.doFilter(ApplicationFilterChain.java:166)
at org.apache.tomcat.websocket.server.WsFilter.doFilter(WsFilter.java:52)
at org.apache.catalina.core.ApplicationFilterChain.internalDoFilter(ApplicationFilterChain.java:193)
at org.apache.catalina.core.ApplicationFilterChain.doFilter(ApplicationFilterChain.java:166)
at org.springframework.web.filter.RequestContextFilter.doFilterInternal(RequestContextFilter.java:99)
at org.springframework.web.filter.OncePerRequestFilter.doFilter(OncePerRequestFilter.java:107)
at org.apache.catalina.core.ApplicationFilterChain.internalDoFilter(ApplicationFilterChain.java:193)
at org.apache.catalina.core.ApplicationFilterChain.doFilter(ApplicationFilterChain.java:166)
at org.springframework.web.filter.HttpPutFormContentFilter.doFilterInternal(HttpPutFormContentFilter.java:109)
at org.springframework.web.filter.OncePerRequestFilter.doFilter(OncePerRequestFilter.java:107)
at org.apache.catalina.core.ApplicationFilterChain.internalDoFilter(ApplicationFilterChain.java:193)
at org.apache.catalina.core.ApplicationFilterChain.doFilter(ApplicationFilterChain.java:166)
at org.springframework.web.filter.HiddenHttpMethodFilter.doFilterInternal(HiddenHttpMethodFilter.java:81)
at org.springframework.web.filter.OncePerRequestFilter.doFilter(OncePerRequestFilter.java:107)
at org.apache.catalina.core.ApplicationFilterChain.internalDoFilter(ApplicationFilterChain.java:193)
at org.apache.catalina.core.ApplicationFilterChain.doFilter(ApplicationFilterChain.java:166)
at org.springframework.web.filter.CharacterEncodingFilter.doFilterInternal(CharacterEncodingFilter.java:200)
at org.springframework.web.filter.OncePerRequestFilter.doFilter(OncePerRequestFilter.java:107)
at org.apache.catalina.core.ApplicationFilterChain.internalDoFilter(ApplicationFilterChain.java:193)
at org.apache.catalina.core.ApplicationFilterChain.doFilter(ApplicationFilterChain.java:166)
at org.apache.catalina.core.StandardWrapperValve.invoke(StandardWrapperValve.java:198)
at org.apache.catalina.core.StandardContextValve.invoke(StandardContextValve.java:96)
at org.apache.catalina.authenticator.AuthenticatorBase.invoke(AuthenticatorBase.java:496)
at org.apache.catalina.core.StandardHostValve.invoke(StandardHostValve.java:140)
at org.apache.catalina.valves.ErrorReportValve.invoke(ErrorReportValve.java:81)
at org.apache.catalina.core.StandardEngineValve.invoke(StandardEngineValve.java:87)
at org.apache.catalina.connector.CoyoteAdapter.service(CoyoteAdapter.java:342)
at org.apache.coyote.http11.Http11Processor.service(Http11Processor.java:803)
at org.apache.coyote.AbstractProcessorLight.process(AbstractProcessorLight.java:66)
at org.apache.coyote.AbstractProtocol$ConnectionHandler.process(AbstractProtocol.java:790)
at org.apache.tomcat.util.net.NioEndpoint$SocketProcessor.doRun(NioEndpoint.java:1459)
at org.apache.tomcat.util.net.SocketProcessorBase.run(SocketProcessorBase.java:49)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at org.apache.tomcat.util.threads.TaskThread$WrappingRunnable.run(TaskThread.java:61)
at java.lang.Thread.run(Thread.java:748)
问题是由错误的方法引起的。 Rest Proxy 应该由消费者 class 启动,第二个 class 应该从 REST API 由码头服务器
启动获取数据
- 从 EventsListener 启动 REST 代理 class(只需执行 EventsListener class)
- 应该可以从 http://localhost:8095/customer-orders/all
访问所有客户订单
- 启动SpringBootclass并尝试访问http://localhost:8090/customerOrders/CU9978。这应该连接到由 EventsListener class.
启动的 REST 代理
“KafkaStreams 不是 运行。状态错误”
我收到此错误是因为我没有定义我的流媒体应用程序的接收器主题。当我定义接收器主题时。我能够解决此错误。
我有一个 Kafka 消费者 class,它监听事件并执行事件(订单、客户)之间的连接并将它们存储在物化视图中。当收到 REST 调用时,我创建了另一个 class 来访问状态存储。但我越来越 java.lang.IllegalStateException:KafkaStreams 不是 运行。 State is ERROR. 我试着赋值 application.server 属性,没成功。 Class EventsListener joins events and class StateStoreService queries events. Code uploaded to Github 供参考。
基于此讨论 https://confluentcommunity.slack.com/archives/C48AHTCUQ/p1500079616414627 和下面 Matt 的注释,我删除了 application.server 属性。
The error means, that a consumer subscribed to topic "source-topic" but got partition "test-name-0" (ie, partition 0 from topic "test-name") assigned. Of course, the consumer did not expect to get a partition of topic "test-name" as it never subscribed to it and this is fails.
In Streams, this can happen if you have two application instances of the same application, and both subscribe to different topics (what is not allowed).
Ie, all instanced of an application must have the same processing logic and must subscribe to the same topics. This implies that if you want to consume 2 topics, all instances need to read both topics.
但我仍然无法从状态存储中读取。代码停留在 thread.sleep() 方法。
异常:
Exception in thread "cqrs-streams-5d77fdb3-f75e-443d-b7b7-fabedcbc483f-StreamThread-1" java.lang.IllegalArgumentException: Assigned partition order-0 for non-subscribed topic regex pattern; subscription pattern is
at org.apache.kafka.clients.consumer.internals.SubscriptionState.assignFromSubscribed(SubscriptionState.java:195)
at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.onJoinComplete(ConsumerCoordinator.java:225)
at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.joinGroupIfNeeded(AbstractCoordinator.java:367)
at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureActiveGroup(AbstractCoordinator.java:316)
at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.poll(ConsumerCoordinator.java:295)
at org.apache.kafka.clients.consumer.KafkaConsumer.pollOnce(KafkaConsumer.java:1146)
at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1111)
at org.apache.kafka.streams.processor.internals.StreamThread.pollRequests(StreamThread.java:851)
at org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:808)
at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:774)
at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:744)
java.lang.IllegalStateException: KafkaStreams is not running. State is ERROR.
at org.apache.kafka.streams.KafkaStreams.validateIsRunning(KafkaStreams.java:312)
at org.apache.kafka.streams.KafkaStreams.allMetadata(KafkaStreams.java:934)
at com.kafkastream.service.StateStoreService.waitUntilStoreIsQueryable(StateStoreService.java:105)
at com.kafkastream.service.StateStoreService.getCustomerOrders(StateStoreService.java:59)
at com.kafkastream.web.EventsController.getCustomerOrders(EventsController.java:101)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at org.springframework.web.method.support.InvocableHandlerMethod.doInvoke(InvocableHandlerMethod.java:209)
at org.springframework.web.method.support.InvocableHandlerMethod.invokeForRequest(InvocableHandlerMethod.java:136)
at org.springframework.web.servlet.mvc.method.annotation.ServletInvocableHandlerMethod.invokeAndHandle(ServletInvocableHandlerMethod.java:102)
at org.springframework.web.servlet.mvc.method.annotation.RequestMappingHandlerAdapter.invokeHandlerMethod(RequestMappingHandlerAdapter.java:877)
at org.springframework.web.servlet.mvc.method.annotation.RequestMappingHandlerAdapter.handleInternal(RequestMappingHandlerAdapter.java:783)
at org.springframework.web.servlet.mvc.method.AbstractHandlerMethodAdapter.handle(AbstractHandlerMethodAdapter.java:87)
at org.springframework.web.servlet.DispatcherServlet.doDispatch(DispatcherServlet.java:991)
at org.springframework.web.servlet.DispatcherServlet.doService(DispatcherServlet.java:925)
at org.springframework.web.servlet.FrameworkServlet.processRequest(FrameworkServlet.java:974)
at org.springframework.web.servlet.FrameworkServlet.doGet(FrameworkServlet.java:866)
at javax.servlet.http.HttpServlet.service(HttpServlet.java:635)
at org.springframework.web.servlet.FrameworkServlet.service(FrameworkServlet.java:851)
at javax.servlet.http.HttpServlet.service(HttpServlet.java:742)
at org.apache.catalina.core.ApplicationFilterChain.internalDoFilter(ApplicationFilterChain.java:231)
at org.apache.catalina.core.ApplicationFilterChain.doFilter(ApplicationFilterChain.java:166)
at org.apache.tomcat.websocket.server.WsFilter.doFilter(WsFilter.java:52)
at org.apache.catalina.core.ApplicationFilterChain.internalDoFilter(ApplicationFilterChain.java:193)
at org.apache.catalina.core.ApplicationFilterChain.doFilter(ApplicationFilterChain.java:166)
at org.springframework.web.filter.RequestContextFilter.doFilterInternal(RequestContextFilter.java:99)
at org.springframework.web.filter.OncePerRequestFilter.doFilter(OncePerRequestFilter.java:107)
at org.apache.catalina.core.ApplicationFilterChain.internalDoFilter(ApplicationFilterChain.java:193)
at org.apache.catalina.core.ApplicationFilterChain.doFilter(ApplicationFilterChain.java:166)
at org.springframework.web.filter.HttpPutFormContentFilter.doFilterInternal(HttpPutFormContentFilter.java:109)
at org.springframework.web.filter.OncePerRequestFilter.doFilter(OncePerRequestFilter.java:107)
at org.apache.catalina.core.ApplicationFilterChain.internalDoFilter(ApplicationFilterChain.java:193)
at org.apache.catalina.core.ApplicationFilterChain.doFilter(ApplicationFilterChain.java:166)
at org.springframework.web.filter.HiddenHttpMethodFilter.doFilterInternal(HiddenHttpMethodFilter.java:81)
at org.springframework.web.filter.OncePerRequestFilter.doFilter(OncePerRequestFilter.java:107)
at org.apache.catalina.core.ApplicationFilterChain.internalDoFilter(ApplicationFilterChain.java:193)
at org.apache.catalina.core.ApplicationFilterChain.doFilter(ApplicationFilterChain.java:166)
at org.springframework.web.filter.CharacterEncodingFilter.doFilterInternal(CharacterEncodingFilter.java:200)
at org.springframework.web.filter.OncePerRequestFilter.doFilter(OncePerRequestFilter.java:107)
at org.apache.catalina.core.ApplicationFilterChain.internalDoFilter(ApplicationFilterChain.java:193)
at org.apache.catalina.core.ApplicationFilterChain.doFilter(ApplicationFilterChain.java:166)
at org.apache.catalina.core.StandardWrapperValve.invoke(StandardWrapperValve.java:198)
at org.apache.catalina.core.StandardContextValve.invoke(StandardContextValve.java:96)
at org.apache.catalina.authenticator.AuthenticatorBase.invoke(AuthenticatorBase.java:496)
at org.apache.catalina.core.StandardHostValve.invoke(StandardHostValve.java:140)
at org.apache.catalina.valves.ErrorReportValve.invoke(ErrorReportValve.java:81)
at org.apache.catalina.core.StandardEngineValve.invoke(StandardEngineValve.java:87)
at org.apache.catalina.connector.CoyoteAdapter.service(CoyoteAdapter.java:342)
at org.apache.coyote.http11.Http11Processor.service(Http11Processor.java:803)
at org.apache.coyote.AbstractProcessorLight.process(AbstractProcessorLight.java:66)
at org.apache.coyote.AbstractProtocol$ConnectionHandler.process(AbstractProtocol.java:790)
at org.apache.tomcat.util.net.NioEndpoint$SocketProcessor.doRun(NioEndpoint.java:1459)
at org.apache.tomcat.util.net.SocketProcessorBase.run(SocketProcessorBase.java:49)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at org.apache.tomcat.util.threads.TaskThread$WrappingRunnable.run(TaskThread.java:61)
at java.lang.Thread.run(Thread.java:748)
问题是由错误的方法引起的。 Rest Proxy 应该由消费者 class 启动,第二个 class 应该从 REST API 由码头服务器
启动获取数据- 从 EventsListener 启动 REST 代理 class(只需执行 EventsListener class)
- 应该可以从 http://localhost:8095/customer-orders/all 访问所有客户订单
- 启动SpringBootclass并尝试访问http://localhost:8090/customerOrders/CU9978。这应该连接到由 EventsListener class. 启动的 REST 代理
“KafkaStreams 不是 运行。状态错误” 我收到此错误是因为我没有定义我的流媒体应用程序的接收器主题。当我定义接收器主题时。我能够解决此错误。