gpt4 book ai didi

python - 使用 Nifi ExecuteScript 处理器生成多个流文件

转载 作者:太空宇宙 更新时间:2023-11-03 13:33:14 29 4
gpt4 key购买 nike

我正在处理一个 Nifi 流,我在其中获取一个包含多个键值对的 JSON 文档。我将 ExecuteScript 处理器与 python 一起使用。

我的目标是基于 JSON keys 创建各种 URLS。键是数字的,它们看起来像这样:

keys = [10200, 10201, 10202, ...]

我想要的 URL 有 3 种类型,它们应该如下所示:

http://google.com/10200
http://bing.com/10200
http://yahoo.com/10200

我正在尝试遍历我的 keys[] 并为其包含的每个数字键创建 3 个特定的 url。我在尝试执行以下代码:

从列表中读取一个数字键 --> 创建 3 个 URL --> 吐出一个流文件。

......并读取列表中的下一个数字键并继续循环......

我有以下代码,但是当我给它一个 JSON 流文件时,它现在没有做任何事情。有人可以告诉我我做错了什么吗?

import json
import java.io
from org.apache.commons.io import IOUtils
from java.nio.charset import StandardCharsets
from org.apache.nifi.processor.io import StreamCallback

class ModJSON(StreamCallback):

def __init__(self):
self.parentFlowFile = None
pass
def process(self, inputStream, outputStream):
text = IOUtils.toString(inputStream, StandardCharsets.UTF_8)
obj = json.loads(text)
flowfiles_list = []

outputStream.write(bytearray(json.dumps(obj.keys(), indent=4).encode('utf-8')))


for numerical_key in obj.keys():
# create 1 flowfile for each numerical_key. Each flow file should have 3 url attributes
flowFile = session.create(self.parentFlowFile)
if (flowFile != None):
flowFile = session.write(flowFile, "Does not matter")
flowFile = session.putAttribute(flowFile, "google", "http://google.com/"+ numerical_key)

flowFile = session.putAttribute(flowFile, "google", "http://bing.com/"+ numerical_key)

flowFile = session.putAttribute(flowFile, "google", "http://yahoo.com/"+ numerical_key)
flowfiles_list.append(flowFile)

for flow in flowfiles_list:
session.transfer(flow, REL_SUCCESS)

最佳答案

问得好,这是流文件 API 回调方法的细微差别。您已创建 StreamCallback 的子类,但尚未检索输入流文件或使用它通过类的实例覆盖内容。

在你的 ModJSON 类定义之后试试这个:

originalFlowFile = session.get()
if(originalFlowFile != None):
originalFlowFile = session.write(flowFile, ModJSON())
session.remove(originalFlowFile)

这将获得一个输入流文件(或等待一个出现),然后调用您的 StreamCallback 以覆盖您的流文件的内容。在我的例子中你会丢弃你的输入流文件,所以如果这对你的用例来说是正确的行为,那么你可以只扩展 InputStreamCallback 而不是 StreamCallback 并删除 outputStream.write(),如果你没有将 outputStream 用于任何事情.为此,将 StreamCallback 替换为 InputStreamCallback 并从 process() 方法中删除“outputStream”参数。

在你的例子中,一旦你在上面添加了我的代码片段,你就会用你的 json.dumps() 命令覆盖输入内容,以及创建和传输新文件,所有这些都是相同的关系(成功),这样就可以如果它们的格式不同(这就是我添加 session.remove() 的原因),则会导致问题。如果您需要原始流文件与其他流文件建立不同的关系,请考虑 InvokeScriptedProcessor而不是执行脚本。如果您不关心处理后的输入流文件(添加 URL 属性已完成),请按照我上面的建议进行操作。如果他们都能走出同样的关系(成功),那么用

替换我的session.remove()
session.transfer(originalFlowFile, REL_SUCCESS)

查看我的 ExecuteScript 食谱文章(part 2,共 3 篇),以获取 Jython(和其他语言)中这些用例的更多示例:)

关于python - 使用 Nifi ExecuteScript 处理器生成多个流文件,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/43105561/

29 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com