gpt4 book ai didi

java - kafka connect - 审计 - 任务完成时触发事件

转载 作者:行者123 更新时间:2023-12-02 08:31:23 27 4
gpt4 key购买 nike

我们正在使用 kafka 构建异常管理工具。将有源连接器 - 它将从物理文件中提取记录。另一方面,将会有接收器连接(mongodb-sinkconnect),它将从主题中拉取记录并将其推送到 mongoDb。一切正常。

我们需要在不同的主题中捕获事件(出于审计目的)。事件例如,

  1. 源任务(文件轮询任务)启动事件 例如,如果收到文件 A
  2. 源任务(文件轮询任务)结束事件 例如,如果文件 A 已完全处理
  3. Sink Task(推送记录到mongodb任务)启动事件 例如,文件 A 的记录开始由 mongodb-connect 处理
  4. Sink Task(推送记录到 mongodb 任务)结束事件 例如,文件 A 的记录完全推送到 MongoDB

我有几个问题:1.我们可以通过在SourceTask中实例化KafkaProducer来将事件发送到不同的主题,一旦文件被完全处理,我们就发送一个事件

public class FileSourceTask extends SourceTask {
private Producer<Key, Event> auditProducer;

public void start(Map<String, String> props) {
auditProducer = new KafkaProducer<Key, Event>(auditProps);
}

public List<SourceRecord> poll() {
List<SourceRecord> results = this.filePoller.poll();
if(results.isEmpty() && eventNotSentForCurrentFile) {
Event event = new Event();
auditProducer.send(
new ProducerRecord<Key, Event>(this.props.get("event.topic"), key, event));

}
// futher processing
}

上述方法正确吗?

  • 上述解决方案工作正常 - 因为它运行一个任务 (maxTasks = 1),但对于我们的用例来说,在接收器任务 (mongoDB connect) 中实现这一目标非常困难。由于这个topic是分区的,所以会创建很多task。我们无法跟踪接收器任务的开始事件和结束事件
  • 请提出解决此问题的方法。

    非常感谢。

    最佳答案

    我认为,你可以围绕 Kafka-connect ReST API 构建一些东西

    https://docs.confluent.io/current/connect/restapi.html#get--connectors-(string-name)-status

    但是这样,您需要持续关注连接器状态,一旦连接器的所有任务完成,您就可以采取操作。

    关于java - kafka connect - 审计 - 任务完成时触发事件,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/44042195/

    27 4 0
    Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
    广告合作:1813099741@qq.com 6ren.com