gpt4 book ai didi

java - 如何关闭 Apache Kafka 连接器任务?

转载 作者:行者123 更新时间:2023-11-30 07:07:22 29 4
gpt4 key购买 nike

现在我正在使用 Apache Kafka 并有任务:我们的目录中有一些 csv 文件,它是一个小批量文件,每个文件约为 25-30 mb。我所需要的只是解析文件并将其放入 kafka。

正如我所见,Kafka 有一些有趣的东西,比如 Connector。

我可以创建 Source-Connector 和 SourceTask,但我不明白一件事:当我处理文件时,如何停止或删除我的任务?

例如,我有虚拟连接器:

public class DummySourceConnector extends SourceConnector {
private static final Logger logger = LogManager.getLogger();

@Override
public String version() {
logger.info("version");

return "1";
}

@Override
public ConfigDef config() {
logger.info("config");

return null;
}

@Override
public Class<? extends Task> taskClass() {
return DummySourceTask.class;
}

@Override
public void start(Map<String, String> props) {
logger.info("start {}", props);
}

@Override
public void stop() {
logger.info("stop");
}

@Override
public List<Map<String, String>> taskConfigs(int maxTasks) {
logger.info("taskConfigs {}", maxTasks);

return ImmutableList.of(ImmutableMap.of("key", "value"));
}

和任务:

public class DummySourceTask extends SourceTask {
private static final Logger logger = LogManager.getLogger();

private long offset = 0;

@Override
public String version() {
logger.info("version");

return "1";
}

@Override
public void start(Map<String, String> props) {
logger.info("start {}", props);
}


@Override
public List<SourceRecord> poll() throws InterruptedException {
Thread.sleep(3000);

final String value = "Offset " + offset++ + " Timestamp " + Instant.now().toString();

logger.info("poll value {}", value);

return ImmutableList.of(new SourceRecord(
ImmutableMap.of("partition", 0),
ImmutableMap.of("offset", offset),
"topic-dummy",
SchemaBuilder.STRING_SCHEMA,
value
));
}

public void stop() {
logger.info("stop");
}

但是当任务全部完成后我如何才能关闭它呢?或者也许你可以帮助我为这项任务提供另一个想法。

感谢您的帮助!

最佳答案

首先,我鼓励您查看现有的连接器 here 。我觉得 spooldir 连接器会对你有帮助。您甚至可以只下载并安装它,而无需编写任何代码。

其次,如果我理解正确的话,你想停止一个任务。我相信this discussion就是你想要的。

关于java - 如何关闭 Apache Kafka 连接器任务?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/39849434/

29 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com