- html - 出于某种原因,IE8 对我的 Sass 文件中继承的 html5 CSS 不友好?
- JMeter 在响应断言中使用 span 标签的问题
- html - 在 :hover and :active? 上具有不同效果的 CSS 动画
- html - 相对于居中的 html 内容固定的 CSS 重复背景?
我目前正在构建一个应用程序,它写入一个 kafka 主题并监听同一个主题以从中生成一个 ktable
并将其具体化到一个商店中。我正在运行的代码基于以下 sample .我几乎复制了其中的大部分内容(除了 PageViewEventSource 之外的所有内容)并将名称重构到我的用例中。我还使用示例中使用的 key 更新了我的 application.properties
。
运行应用程序时出现以下错误:
2020-02-12 17:54:31.982 ERROR 69005 --- [-StreamThread-1] o.a.k.s.p.i.InternalTopicManager : stream-thread [restartedMain] Unexpected error during topic creation for pairing-events-pcmv-changelog.
Error message was: org.apache.kafka.common.errors.PolicyViolationException: Topic replication factor must be 3
2020-02-12 17:54:31.986 ERROR 69005 --- [-StreamThread-1] o.a.k.s.p.internals.StreamThread : stream-thread [pairing-events-2d949c85-df82-4961-a849-6c6e079edf9c-StreamThread-1] Encountered the following unexpected Kafka exception during processing, this usually indicate Streams internal errors:
org.apache.kafka.streams.errors.StreamsException: Could not create topic pairing-events-pcmv-changelog.
at org.apache.kafka.streams.processor.internals.InternalTopicManager.makeReady(InternalTopicManager.java:148) ~[kafka-streams-2.3.1.jar:na]
at org.apache.kafka.streams.processor.internals.StreamsPartitionAssignor.prepareTopic(StreamsPartitionAssignor.java:1049) ~[kafka-streams-2.3.1.jar:na]
at org.apache.kafka.streams.processor.internals.StreamsPartitionAssignor.assign(StreamsPartitionAssignor.java:635) ~[kafka-streams-2.3.1.jar:na]
at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.performAssignment(ConsumerCoordinator.java:424) ~[kafka-clients-2.3.1.jar:na]
at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.onJoinLeader(AbstractCoordinator.java:622) ~[kafka-clients-2.3.1.jar:na]
at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.access$1100(AbstractCoordinator.java:107) ~[kafka-clients-2.3.1.jar:na]
at org.apache.kafka.clients.consumer.internals.AbstractCoordinator$JoinGroupResponseHandler.handle(AbstractCoordinator.java:544) ~[kafka-clients-2.3.1.jar:na]
at org.apache.kafka.clients.consumer.internals.AbstractCoordinator$JoinGroupResponseHandler.handle(AbstractCoordinator.java:527) ~[kafka-clients-2.3.1.jar:na]
at org.apache.kafka.clients.consumer.internals.AbstractCoordinator$CoordinatorResponseHandler.onSuccess(AbstractCoordinator.java:978) ~[kafka-clients-2.3.1.jar:na]
at org.apache.kafka.clients.consumer.internals.AbstractCoordinator$CoordinatorResponseHandler.onSuccess(AbstractCoordinator.java:958) ~[kafka-clients-2.3.1.jar:na]
at org.apache.kafka.clients.consumer.internals.RequestFuture$1.onSuccess(RequestFuture.java:204) ~[kafka-clients-2.3.1.jar:na]
at org.apache.kafka.clients.consumer.internals.RequestFuture.fireSuccess(RequestFuture.java:167) ~[kafka-clients-2.3.1.jar:na]
at org.apache.kafka.clients.consumer.internals.RequestFuture.complete(RequestFuture.java:127) ~[kafka-clients-2.3.1.jar:na]
at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient$RequestFutureCompletionHandler.fireCompletion(ConsumerNetworkClient.java:578) ~[kafka-clients-2.3.1.jar:na]
at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.firePendingCompletedRequests(ConsumerNetworkClient.java:388) ~[kafka-clients-2.3.1.jar:na]
at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:294) ~[kafka-clients-2.3.1.jar:na]
at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:233) ~[kafka-clients-2.3.1.jar:na]
at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:212) ~[kafka-clients-2.3.1.jar:na]
at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.joinGroupIfNeeded(AbstractCoordinator.java:415) ~[kafka-clients-2.3.1.jar:na]
at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureActiveGroup(AbstractCoordinator.java:358) ~[kafka-clients-2.3.1.jar:na]
at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.poll(ConsumerCoordinator.java:353) ~[kafka-clients-2.3.1.jar:na]
at org.apache.kafka.clients.consumer.KafkaConsumer.updateAssignmentMetadataIfNeeded(KafkaConsumer.java:1251) ~[kafka-clients-2.3.1.jar:na]
at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1216) ~[kafka-clients-2.3.1.jar:na]
at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1201) ~[kafka-clients-2.3.1.jar:na]
at org.apache.kafka.streams.processor.internals.StreamThread.pollRequests(StreamThread.java:963) ~[kafka-streams-2.3.1.jar:na]
at org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:859) ~[kafka-streams-2.3.1.jar:na]
at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:819) ~[kafka-streams-2.3.1.jar:na]
at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:788) ~[kafka-streams-2.3.1.jar:na]
Caused by: org.apache.kafka.common.errors.PolicyViolationException: Topic replication factor must be 3
2020-02-12 17:54:31.986 INFO 69005 --- [-StreamThread-1] o.a.k.s.p.internals.StreamThread : stream-thread [pairing-events-2d949c85-df82-4961-a849-6c6e079edf9c-StreamThread-1] State transition from PARTITIONS_REVOKED to PENDING_SHUTDOWN
2020-02-12 17:54:31.986 INFO 69005 --- [-StreamThread-1] o.a.k.s.p.internals.StreamThread : stream-thread [pairing-events-2d949c85-df82-4961-a849-6c6e079edf9c-StreamThread-1] Shutting down
2020-02-12 17:54:31.986 INFO 69005 --- [-StreamThread-1] o.a.k.clients.consumer.KafkaConsumer : [Consumer clientId=pairing-events-2d949c85-df82-4961-a849-6c6e079edf9c-StreamThread-1-restore-consumer, groupId=null] Unsubscribed all topics or patterns and assigned partitions
2020-02-12 17:54:31.986 INFO 69005 --- [-StreamThread-1] o.a.k.clients.producer.KafkaProducer : [Producer clientId=pairing-events-2d949c85-df82-4961-a849-6c6e079edf9c-StreamThread-1-producer] Closing the Kafka producer with timeoutMillis = 9223372036854775807 ms.
2020-02-12 17:54:31.991 INFO 69005 --- [-StreamThread-1] o.a.k.s.p.internals.StreamThread : stream-thread [pairing-events-2d949c85-df82-4961-a849-6c6e079edf9c-StreamThread-1] State transition from PENDING_SHUTDOWN to DEAD
2020-02-12 17:54:31.991 INFO 69005 --- [-StreamThread-1] org.apache.kafka.streams.KafkaStreams : stream-client [pairing-events-2d949c85-df82-4961-a849-6c6e079edf9c] State transition from REBALANCING to ERROR
2020-02-12 17:54:31.991 ERROR 69005 --- [-StreamThread-1] org.apache.kafka.streams.KafkaStreams : stream-client [pairing-events-2d949c85-df82-4961-a849-6c6e079edf9c] All stream threads have died. The instance will be in error state and should be closed.
2020-02-12 17:54:31.991 INFO 69005 --- [-StreamThread-1] o.a.k.s.p.internals.StreamThread : stream-thread [pairing-events-2d949c85-df82-4961-a849-6c6e079edf9c-StreamThread-1] Shutdown complete
Exception in thread "pairing-events-2d949c85-df82-4961-a849-6c6e079edf9c-StreamThread-1" org.apache.kafka.streams.errors.StreamsException: Could not create topic pairing-events-pcmv-changelog.
at org.apache.kafka.streams.processor.internals.InternalTopicManager.makeReady(InternalTopicManager.java:148)
at org.apache.kafka.streams.processor.internals.StreamsPartitionAssignor.prepareTopic(StreamsPartitionAssignor.java:1049)
at org.apache.kafka.streams.processor.internals.StreamsPartitionAssignor.assign(StreamsPartitionAssignor.java:635)
at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.performAssignment(ConsumerCoordinator.java:424)
at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.onJoinLeader(AbstractCoordinator.java:622)
at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.access$1100(AbstractCoordinator.java:107)
at org.apache.kafka.clients.consumer.internals.AbstractCoordinator$JoinGroupResponseHandler.handle(AbstractCoordinator.java:544)
at org.apache.kafka.clients.consumer.internals.AbstractCoordinator$JoinGroupResponseHandler.handle(AbstractCoordinator.java:527)
at org.apache.kafka.clients.consumer.internals.AbstractCoordinator$CoordinatorResponseHandler.onSuccess(AbstractCoordinator.java:978)
at org.apache.kafka.clients.consumer.internals.AbstractCoordinator$CoordinatorResponseHandler.onSuccess(AbstractCoordinator.java:958)
at org.apache.kafka.clients.consumer.internals.RequestFuture$1.onSuccess(RequestFuture.java:204)
at org.apache.kafka.clients.consumer.internals.RequestFuture.fireSuccess(RequestFuture.java:167)
at org.apache.kafka.clients.consumer.internals.RequestFuture.complete(RequestFuture.java:127)
at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient$RequestFutureCompletionHandler.fireCompletion(ConsumerNetworkClient.java:578)
at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.firePendingCompletedRequests(ConsumerNetworkClient.java:388)
at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:294)
at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:233)
at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:212)
at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.joinGroupIfNeeded(AbstractCoordinator.java:415)
at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureActiveGroup(AbstractCoordinator.java:358)
at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.poll(ConsumerCoordinator.java:353)
at org.apache.kafka.clients.consumer.KafkaConsumer.updateAssignmentMetadataIfNeeded(KafkaConsumer.java:1251)
at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1216)
at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1201)
at org.apache.kafka.streams.processor.internals.StreamThread.pollRequests(StreamThread.java:963)
at org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:859)
at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:819)
at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:788)
Caused by: org.apache.kafka.common.errors.PolicyViolationException: Topic replication factor must be 3
任何修复方法的线索?
最佳答案
您的代理设置要求最小复制因子为 3。
您可以为绑定(bind)设置 ... topic.replication-factor
属性。
参见 Consumer Properties在 Binder 文档中。
关于apache-kafka - 卡夫卡流 : PolicyViolationException: Topic replication factor must be 3,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/60194656/
根据槌documentation ,可以逐步训练主题模型: "-output-model [FILENAME] This option specifies a file to write a seri
下面我创建了一个完整的可重现示例来计算给定 DataFrame 的主题模型。 import numpy as np import pandas as pd data = pd.DataFrame({
在启动 Kafka 时出现以下错误。 KeeperErrorCode = NoNode for /brokers/topics/test-topic/partitions 任何帮助将不胜感激。 最佳答
我是这个范例的新手,所以请容忍我的愚蠢。我开始阅读这个主题是因为我正在构建一个物联网系统,该系统将使用消息队列来允许设备相互通信。 我目前正在使用 Azure 的服务总线,但是我相信我的问题通常适用于
我有三个 Kafka 经纪人( kafka_2.11-0.10.0.0 )每个经纪人的安全配置如下, listeners=PLAINTEXT://xxxx:9093,SASL_PLAINTEXT://
我使用apache-rocketmq发送消息,但出现异常。我尝试了csdn上的很多解决方案,但不起作用。现在我不知道如何处理它。 这是一个 Linux 服务器,运行 Rocketmq 4.2.0、ja
我收集了一些文档,其中大部分都是关于同一主题的,其余的基本上都是随机主题。我希望将文档分类为它们是关于“多数主题”还是这些随机“少数主题”之一。如果我在这个只有 2 个主题的语料库上使用主题建模算法会
我正在使用 gensim 包中的 LDA 算法来查找给定文本中的主题。 我被问到生成的主题将包含每个主题的不同词,例如,如果主题 A 中包含“猴子”一词,那么其他主题的列表中不应包含“猴子”一词。 到
我想使用 htaccess 更改我网站的 URL URL currently: example.com/index.php?p=my-topic-title I want to be like : e
我正在学习 Spring Boot 并制作了一个演示,但是当我发布一个添加对象的请求时,它不起作用! 错误信息是: { "timestamp": 1516897619316, "sta
背景 我们公司有由 Zookeeper 管理的 Apache Kafka。我们的 Spring Boot 应用程序之一需要检查所有可用主题的列表,并列出哪些主题启用了日志压缩 (cleanup.pol
在我的工作场所,我偶然发现了以下需要我解决的问题。尽管不是绝对需要,但首选解决方案。 有一个包含一组故事的数据库,每个故事都有一组与之关联的主题。主题存储在单独的表中,格式为 (storyid, to
我有一个由主题名称、描述组成的 Pandas 数据框 我正在尝试将其转换为主题名称、单词(出现在描述列中)及其频率 我已经试过了,但它只给了我描述的单词和频率,而不是主题名称级别。 import nl
我尝试使用以下命令删除现有主题(我使用 kafka 管理控制台检查过); #./kafka-topics.sh --zookeeper zookeeper.xx.com:2181/chroot --d
试图在一台机器上运行 Kafka 生产者,在另一台机器上运行消费者。 设置以下属性: 广告.host.name 广告端口 但是在控制台消费者上收到以下错误: bin/kafka-console-con
我正在使用 Gmail API,并尝试使用 Python 3.9 设置推送通知。当我尝试在 Gmail 收件箱上调用 watch() 时,出现错误,即使我已遵循针对类似问题给出的所有建议。错误如下:
我希望构建一个主题 map 来对内容进行分类。 例如,主题“艺术”可能有“艺术史”、“绘画”、“雕塑”等子类别。 我已经抓取了一些在线资源,但遇到了一个与我希望如何使用层次结构有关的问题。 我有很多内
我想在我的 Java 中编译 mallet(而不是使用命令行),所以我将 jar 包含在我的项目中,并引用了来自以下示例的代码:http://mallet.cs.umass.edu/topics-de
我正在尝试根据父级中的记录内容从kafka中的一个主题(父级)写入另一个主题(子级)。如果我从父主题消费,则示例记录为 {"date":{"string":"2017-03-20"},"time":{
我有一个 Kafka 生产者类,运行良好。生产者填充了 Kafka 主题。其代码如下: public class kafka_test { private final static String TO
我是一名优秀的程序员,十分优秀!