- html - 出于某种原因,IE8 对我的 Sass 文件中继承的 html5 CSS 不友好?
- JMeter 在响应断言中使用 span 标签的问题
- html - 在 :hover and :active? 上具有不同效果的 CSS 动画
- html - 相对于居中的 html 内容固定的 CSS 重复背景?
我想将 Spring Kafka 与事务一起使用,但我真的不明白它应该如何配置以及如何工作。
这是我的配置
props.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, "true");
props.put(ProducerConfig.RETRIES_CONFIG, String.valueOf(Integer.MAX_VALUE));
props.put(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION, 1);
props.put(ProducerConfig.ACKS_CONFIG, "all");
此配置用于具有事务 ID 前缀的 DefaultKafkaProducerFactory:
defaultKafkaProducerFactory.setTransactionIdPrefix("my_app.");
问题 1:
我应该如何选择这个交易 ID 前缀?如果我理解正确的话,这个前缀被 spring 用来为每个创建的生产者生成一个事务 id。
为什么我们不能只使用“UUID.randomUUID()”?
问题 2:
如果生产者被销毁,它会生成一个新的交易ID。因此,如果应用程序崩溃,在重新启动时它将重用旧的事务 ID。
这正常吗???
问题 3:
我正在使用部署在云上的应用程序,它可以自动向上/向下扩展。这意味着我的前缀无法修复,因为每个实例上的所有生产者都会有冲突的事务 ID。
我应该在其中添加一个随机部分吗?当实例缩小/放大或崩溃并重新启动时,我是否需要恢复相同的前缀?
问题 4:
最后但同样重要的是,我们正在为我们的 Kafka 使用凭据。这似乎不起作用:
Current ACLs for resource `TransactionalId:my_app*`:
User:CN... has Allow permission for operations: All from hosts: *
在知道我的交易 ID 已生成的情况下,我应该如何设置我的 ACL?
编辑 1
进一步阅读后,如果我理解正确。
如果您有一个从 P0(分区)读取的 C0(消费者)。如果代理开始消费者再平衡。P0 可以分配给另一个消费者 C1。这个消费者 C1 应该使用与之前的 C0 相同的交易 ID 以防止重复(僵尸围栏)?
你如何在 spring-kafka 中实现这一点?事务 ID 似乎与消费者无关,因此与读取的分区无关。
谢谢
最佳答案
由于僵尸防护,您不能使用随机 TID - 如果服务器崩溃,您可能会在主题中有一个部分事务,该事务永远不会完成,并且不会从任何分区使用写入消耗更多内容对于该交易。
出于上述原因,这是设计使然。
同样,您不能随机化;出于上述原因。
例如,Cloud Foundry 有一个指示实例索引的环境变量。如果您使用的云平台不包含类似的东西,您将不得不以某种方式模拟它。然后,在事务 id 中使用它:
spring.kafka.producer.transaction-id-prefix=foo-${instance.index}-
ACL - 我无法回答这个问题;我对kafka权限不熟悉;可能最好为此问一个单独的问题。
我认为我们需要向 Spring 添加一些逻辑以确保相同的事务 ID 始终用于特定的主题/分区。
https://github.com/spring-projects/spring-kafka/issues/800#issuecomment-419501929
编辑
自从这个答案 (KIP-447) 以来,情况发生了变化;如果您的经纪人是 2.5.0 或更高版本 - 请参阅。 https://docs.spring.io/spring-kafka/docs/2.5.5.RELEASE/reference/html/#exactly-once和 https://docs.spring.io/spring-kafka/docs/2.6.0-SNAPSHOT/reference/html/#exactly-once
关于Spring Kafka 和事务,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/52220225/
我在 Windows 机器上启动 Kafka-Server 时出现以下错误。我已经从以下链接下载了 Scala 2.11 - kafka_2.11-2.1.0.tgz:https://kafka.ap
关于Apache-Kafka messaging queue . 我已经从 Kafka 下载页面下载了 Apache Kafka。我已将其提取到 /opt/apache/installed/kafka
假设我有 Kafka 主题 cars。 我还有一个消费者组 cars-consumers 订阅了 cars 主题。 cars-consumers 消费者组当前位于偏移量 89。 当我现在删除 cars
我想知道什么最适合我:Kafka 流或 Kafka 消费者 api 或 Kafka 连接? 我想从主题中读取数据,然后进行一些处理并写入数据库。所以我编写了消费者,但我觉得我可以编写 Kafka 流应
我曾研究过一些 Kafka 流应用程序和 Kafka 消费者应用程序。最后,Kafka流不过是消费来自Kafka的实时事件的消费者。因此,我无法弄清楚何时使用 Kafka 流或为什么我们应该使用
Kafka Acknowledgement 和 Kafka 消费者 commitSync() 有什么区别 两者都用于手动偏移管理,并希望两者同步工作。 请协助 最佳答案 使用 spring-kafka
如何在 Kafka 代理上代理 Apache Kafka 生产者请求,并重定向到单独的 Kafka 集群? 在我的特定情况下,无法更新写入此集群的客户端。这意味着,执行以下操作是不可行的: 更新客户端
我需要在 Kafka 10 中命名我的消费者,就像我在 Kafka 8 中所做的一样,因为我有脚本可以嗅出并进一步使用这些信息。 显然,consumer.id 的默认命名已更改(并且现在还单独显示了
1.概述 我们会看到zk的数据中有一个节点/log_dir_event_notification/,这是一个序列号持久节点 这个节点在kafka中承担的作用是: 当某个Broker上的LogDir出现
我正在使用以下命令: bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test.topic --property
我很难理解 Java Spring Boot 中的一些 Kafka 概念。我想针对在服务器上运行的真实 Kafka 代理测试消费者,该服务器有一些生产者已将数据写入/已经将数据写入各种主题。我想与服务
我的场景是我使用了很多共享前缀的 Kafka 主题(例如 house.door, house.room ) 并使用 Kafka 流正则表达式主题模式 API 使用所有主题。 一切看起来都不错,我得到了
有没有办法以编程方式获取kafka集群的版本?例如,使用AdminClient应用程序接口(interface)。 我想在消费者/生产者应用程序中识别 kafka 集群的版本。 最佳答案 目前无法检索
每当我尝试重新启动 kafka 时,它都会出现以下错误。一旦我删除/tmp/kafka-logs 它就会得到解决,但它也会删除我的主题。 有办法解决吗? ERROR Error while
我是 Apache Kafka 的新用户,我仍在了解内部结构。 在我的用例中,我需要从 Kafka Producer 客户端动态增加主题的分区数。 我发现了其他类似的 questions关于增加分区大
正如 Kafka 文档所示,一种方法是通过 kafka.tools.MirrorMaker 来实现这一点。但是,我需要将一个主题(比如 测试 带 1 个分区)(其内容和元数据)从生产环境复制到没有连接
我已经在集群中配置了 3 个 kafka,我正在尝试与 spring-kafka 一起使用。 但是在我杀死 kafka 领导者之后,我无法将其他消息发送到队列中。 我将 spring.kafka.bo
我的 kafka sink 连接器从多个主题(配置了 10 个任务)读取,并处理来自所有主题的 300 条记录。根据每个记录中保存的信息,连接器可以执行某些操作。 以下是触发器记录中键值对的示例: "
我有以下 kafka 流代码 public class KafkaStreamHandler implements Processor{ private ProcessorConte
当 kafka-streams 应用程序正在运行并且 Kafka 突然关闭时,应用程序进入“等待”模式,发送警告日志的消费者和生产者线程无法连接,当 Kafka 回来时,一切都应该(理论上)去恢复正常
我是一名优秀的程序员,十分优秀!