- html - 出于某种原因,IE8 对我的 Sass 文件中继承的 html5 CSS 不友好?
- JMeter 在响应断言中使用 span 标签的问题
- html - 在 :hover and :active? 上具有不同效果的 CSS 动画
- html - 相对于居中的 html 内容固定的 CSS 重复背景?
我正在使用 Kafka Connect 从 Kafka Broker (v0.10.2) 获取消息,然后将其同步到下游服务。
目前,我在 SinkTask#put
中有代码将处理 SinkRecord
& 然后将其持久化到下游服务。
几个关键要求,
SinkTask#flush
通过抛出异常或告诉 Connect 不要提交偏移量,而是在下一次轮询中重试,有效地退出为接收到的消息的特定轮询/周期提交偏移量。
flush
实际上是基于时间的 & 或多或少独立于民意调查 & 当它达到某个时间阈值时,它将提交偏移量。
SinkTask#preCommit
被引入,所以我们认为我们可以将它用于我们的目的。但文档中没有提到
SinkTask#put
之间存在 1:1 的关系。 &
SinkTask#preCommit
.
commit offsets
尽快下单
put succeeds
.同样,
不是 提交偏移量,如果是特定的
put
失败的。
SinkTask#preCommit
如何做到这一点?
最佳答案
正确地将数据传入和传出 Kafka 可能具有挑战性,而 Kafka Connect 使这变得更容易,因为它使用最佳实践并隐藏了许多复杂性。对于接收器连接器,Kafka Connect 从主题读取消息,将它们发送到您的连接器,然后定期提交已读取和处理的各种主题分区的最大偏移量。
请注意,“将它们发送到您的连接器”对应于 put(Collection<SinkRecord>)
方法,并且在 Kafka Connect 提交偏移量之前可能会多次调用此方法。您可以控制 Kafka Connect 提交偏移量的频率,但 Kafka Connect 确保它只会在连接器成功处理该消息时提交该消息的偏移量。
当连接器正常运行时,一切都很好,即使定期提交偏移量,您的连接器也会看到每条消息一次。但是,如果连接器失败,那么当它重新启动时,连接器将从上次提交的偏移量开始。这可能意味着您的连接器会看到它在崩溃前处理的一些相同的消息。如果您仔细编写连接器以使其至少具有一次语义,这通常不是问题。
为什么 Kafka Connect 会定期提交偏移量而不是每条记录?因为它可以节省大量工作,并且在名义上进行时并不重要。只有当出现问题时,偏移滞后才重要。即便如此,如果您让 Kafka Connect 处理偏移量,您的连接器至少需要准备好处理一次消息。恰好一次是可能的,但您的连接器必须做更多的工作(见下文)。
写作记录
您在编写连接器时有很大的灵活性,这很好,因为在很大程度上取决于它所写入的外部系统的功能。让我们看看不同的实现方式 put
和 flush
.
如果系统支持事务或可以处理一批更新,您的连接器的 put(Collection<SinkRecord>)
可以使用单个事务/批处理写入该集合中的所有记录,根据需要重试多次,直到事务/批处理完成或最终抛出错误之前。在这种情况下,put
做所有的工作,要么成功,要么失败。如果成功,那么 Kafka Connect 知道所有记录都得到了正确处理,因此可以(在某个时候)提交偏移量。如果您的 put
调用失败,然后 Kafka Connect 假定不知道是否处理了任何记录,因此它不会更新其偏移量并停止您的连接器。您的连接器 flush(...)
什么都不用做,因为 Kafka Connect 正在处理所有的偏移量。
如果系统不支持事务,而您一次只能提交一个项目,则您可能拥有连接器的 put(Collection<SinkRecord>)
尝试单独写出每个记录,阻塞直到它成功并在抛出错误之前根据需要重试每个记录。再次,put
完成所有工作,flush
方法可能不需要做任何事情。
到目前为止,我的示例完成了 put
中的所有工作。 .您始终可以选择拥有 put
简单地缓冲记录并改为在 flush
中完成写入外部服务的所有工作或 preCommit
.您可能这样做的原因之一是您的写入是基于时间的,就像 flush
一样。和 preCommit
.如果您不希望您的写入是基于时间的,您可能不想在 flush
中执行写入操作。或 preCommit
.
记录偏移或不记录
如上所述,默认情况下 Kafka Connect 会定期记录偏移量,以便在重新启动时连接器可以从上次停止的地方开始。
但是,有时需要连接器记录外部系统中的偏移量,尤其是当可以原子方式完成时。当这样的连接器启动时,它可以查看外部系统以找出最后写入的偏移量,然后可以告诉 Kafka Connect 它想要从哪里开始读取。使用这种方法,您的连接器可以只处理一次消息。
当接收器连接器这样做时,它们实际上根本不需要 Kafka Connect 来提交任何偏移量。 flush
方法只是让连接器知道 Kafka Connect 正在为您提交哪些偏移量的机会,并且由于它不返回任何内容,因此无法修改这些偏移量或告诉 Kafka Connect 连接器正在处理哪些偏移量。
这是preCommit
的地方方法进来了,真的是flush
的替代品(它实际上采用与 flush
相同的参数),除了预计返回 Kafka Connect 应该提交的偏移量。默认情况下,preCommit
只需拨打 flush
然后返回传递给 preCommit
的相同偏移量,这意味着 Kafka Connect 应该提交它通过 preCommit
传递给连接器的所有偏移量。 .但是如果您的 preCommit
返回一组空的偏移量,然后 Kafka Connect 将根本不记录任何偏移量。
因此,如果您的连接器要处理外部系统中的所有偏移并且不需要 Kafka Connect 来记录任何内容,那么您应该覆盖 preCommit
方法而不是 flush
,并返回一组空的偏移量。
关于apache-kafka - 在 SinkTask 中完成 "commit offsets"后立即使用 Kafka Connect HOWTO "put",我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/45200585/
使用 caret::train() 运行逻辑回归模型时出现问题。LR = caret::train(Satisfaction ~., data= log_train, method = "glm",
我正在尝试将nginx容器作为我所有网站和Web服务的主要入口点。我设法将portainer作为容器运行,并且可以从互联网上访问它。现在,我正在尝试访问由另一个Nginx容器托管的静态网站,但这样做失
我有一个在 Windows XP SP3 x86 上运行的 Visual Studio 2008 C# .NET 3.5 应用程序。在我的应用程序中,我有一个事件处理程序 OnSendTask 可以同
我在 Eclipse 中创建了作为独立程序执行的此类,它可以毫无问题地连接所有 http URL(例如:http://stackoverflow.com),但是当我尝试连接到 https(例如 htt
我在我的 nginx 错误日志中收到大量以下错误: connect() failed (111: Connection refused) while connecting to upstream 我的
我正在尝试将新的 log4j2 与 Socket Appender 一起使用,但我有点不走运。这是我的 XML 配置文件:
我目前正在尝试寻找 Android 应用程序后端的替代方案。目前,我使用 php servlet 来查询 Mysql 数据库。数据库(Mysql)托管在我大学的计算机上,因此我无法更改任何配置,因为我
类MapperExtension有一些方法,before_insert, before_update, ...都有一个参数connection. def before_insert(self, map
嗨,我正在尝试更改位于连接库 (v 5.5) 中的文档的文档所有者,我仍在等待 IBM 的回复,但对我来说可能需要太长时间,这就是我尝试的原因逆向工程。 我尝试使用标准编辑器 POST 请求将编辑器更
我在 nginx( http://52.xx.xx.xx/ )上访问我的 IP 时遇到 502 网关错误,日志只是这样说: 2015/09/18 13:03:37 [error] 32636#0: *
我要实现 Connected-Component Labeling但我不确定我应该以 4-connected 还是 8-connected 的方式来做。我已经阅读了大约 3 种 Material ,但
我在Resources ->JMS ->Connection Factories下有两个连接工厂。 1) 连接工厂 2)集成连接工厂 我想修改两个连接工厂下连接池的最大连接数。资源 ->JMS ->连
我在将 mongoengine 合并到我的 django 应用程序时遇到问题。以下是我收到的错误: Traceback (most recent call last): File "/home/d
上下文 我正在关注 tutorial on writing a TCP server last week in Real World Haskell .一切顺利,我的最终版本可以正常工作,并且能够在
我在访问我的域时遇到了这个问题:我看到了我的默认 http500 错误 django 模板正在显示。 我有 gunicorn 设置: command = '/usr/local/bin/gunicor
我更换了电脑,并重新安装了所有版本:tomcat 8 和 6、netbeans 8、jdk 1.7、hibernate 4.3.4,但是当我运行 Web 应用程序时,出现此错误。过去使用我的旧电脑时,
您好,我是这个项目的新手,我在 CentOS7 ec2 实例上托管它时遇到问题。当我访问我的域时出现此错误: 2017/02/17 05:53:35 [error] 27#27: *20 connec
在开始之前,我已经查看了所有我能找到的类似问题,但没有找到解决我的问题的方法。 我正在运行 2 个 docker 容器,1 个用于 nginx,1 个用于 nodejs api。我正在使用 nginx
使用 debian 包将 kaa -iot 平台配置为单节点时。我收到以下错误。 himanshu@himpc:~/kaa/deb$ sudo dpkg -i kaa-node-0.10.0.deb
我是我公司开发团队的成员,担任管理员角色。我可以通过 https://developer.apple.com/ 访问团队的成员(member)中心 但是,当我尝试在 https://itunescon
我是一名优秀的程序员,十分优秀!