gpt4 book ai didi

java - 在 Nifi 中创建自定义处理器并发送到 PublishKafka

转载 作者:行者123 更新时间:2023-11-30 01:40:59 26 4
gpt4 key购买 nike

有人可以为我提供一个简单的示例,用于在自定义 Nifi 处理器中设置流文件,以便可以通过 PublishKafka 处理器发送有效负载吗?

我有一个旧的消息传递协议(protocol),我为其编写了一个自定义处理器。结构非常简单,只有一个 MessageID(字符串)和 MessageBody(字节[])。我的自定义处理器可以正常接收消息来处理输入。我现在尝试将这些数据放入流文件中,以便可以将其发送到publishKafka处理器,但我在网上找不到任何有关如何执行此操作的资源。这是我当前相关部分的代码片段:

    try {
this.getLogger().info("[INFO - ListenMW] - Message Received: " +
data.getMsgID().toString() + " Size: " +
data.getMsgData().length);
this.currentSession.adjustCounter("MW Counter", 1, true);

// Setup the flowfile to transfer
FlowFile flowfile = this.currentSession.create();
flowfile = this.currentSession.putAttribute(flowfile, "key",data.getMsgID().toString());
flowfile = this.currentSession.putAttribute(flowfile, "value", new String(data.getMsgData(),StandardCharsets.UTF_8));

this.currentSession.transfer(flowfile, SUCCESS);


}catch(Exception e) {
this.getLogger().error("[INFO - ListenMW] - "+e.getMessage());
this.currentSession.adjustCounter("MW Failure", 1, true);
}

我无法确定 msgID 和 msgData 使用哪些属性,因此我现在创建了自己的属性。我看到一篇文章,其中有人建议构建您自己的 json 结构并将其作为您的有效负载发送,但是您会再次发送哪个属性,以便它将正确映射到 kafka 消息?我对卡夫卡还很陌生,到目前为止只尝试过基本的测试用例,所以请原谅我对任何错误假设的无知。

感谢您的指导!我使用的是 Kafka2.0.1 和 PublishKafka_2.0 处理器。

最佳答案

根据您分享的内容,您没有将任何内容发布到 Kafka 的主要原因似乎是您实际上没有向流文件内容写入任何内容。作为引用点,here is a copy of the javadocs对于 NiFi(也, here are the processor docs )。你应该做的是这样的:

flowFile = session.write(flowFile, outStream -> {
outStream.write("some string here".getBytes());
});

我使用 PublishKafkaRecord,但 PublishKafka 处理器在概念上非常相似。您可以按照您在此处执行的方式设置消息的键,但需要通过将其写入流文件正文来设置值。

如果不知道这里更广泛的用例,您似乎可以使用 ExecuteScript 完成您需要做的事情。请参阅this作为具有多种脚本语言引用的 ExecuteScript 的起点。

如果您需要进一步帮助,我们有多种选择 here为你。

关于java - 在 Nifi 中创建自定义处理器并发送到 PublishKafka,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/59993719/

26 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com