gpt4 book ai didi

json - Apache NiFi ExecuteScript : Groovy script to replace Json values via a mapping file

转载 作者:行者123 更新时间:2023-12-02 05:19:31 26 4
gpt4 key购买 nike

我正在使用 Groovy 脚本上的 Apache NiFi 0.5.1 以将传入的 Json 值替换为映射文件中包含的值。映射文件如下所示(它是一个简单的 .txt):

Header1;Header2;Header3
A;some text;A2

我从以下开始:
import groovy.json.JsonBuilder 
import groovy.json.JsonSlurper
import java.nio.charset.StandardCharsets

def flowFile = session.get();
if (flowFile == null) {
return;
}

flowFile = session.write(flowFile,
{ inputStream, outputStream ->

def content = """
{
"field1": "A"
"field2": "A",
"field3": "A"

}"""

def slurped = new JsonSlurper().parseText(content)
def builder = new JsonBuilder(slurped)
builder.content.field1 = "A"
builder.content.field2 = "some text"
builder.content.field3 = "A2"
outputStream.write(builder.toPrettyString().getBytes(StandardCharsets.UTF_8))
} as StreamCallback)
session.transfer(flowFile, ExecuteScript.REL_SUCCESS)

这第一步工作得很好,尽管它是硬编码的,而且远非理想。我最初的想法是使用 ReplaceTextWithMapping 来执行替换,但是它不适用于复杂的映射文件(例如多列)。我想更进一步,但我不知道如何去做。首先,我不想传入整个硬编码的 JSON,而是想读取传入的流文件。 NiFi怎么可能?在将脚本作为 ExecuteScript 的一部分运行之前,我已经通过 UpdateAttribute 输出了一个包含内容的 .Json 文件,其中文件名 = myResultingJSON.json。此外,我知道如何使用 Groovy ( String mappingContent= new File('/path/to/file').getText('UTF-8' ) 加载 .txt 文件,但是如何使用加载的文件执行替换,以便生成的 JSON 如下所示:
{ 
"field1": "A"
"field2": "some text",
"field3": "A2"
}

感谢您的帮助,

一世。

编辑:

对脚本的第一次修改确实允许我从 InputStream 中读取:
import groovy.json.JsonBuilder
import groovy.json.JsonSlurper

import java.nio.charset.StandardCharsets

def flowFile = session.get();
if (flowFile == null) {
return;
}

flowFile = session.write(flowFile,
{ inputStream, outputStream ->

def content = org.apache.commons.io.IOUtils.toString(inputStream, java.nio.charset.StandardCharsets.UTF_8)

def slurped = new JsonSlurper().parseText(content)
def builder = new JsonBuilder(slurped)
builder.content.field1 = "A"
builder.content.field2 = "some text"
builder.content.field3 = "A2"
outputStream.write(builder.toPrettyString().getBytes(StandardCharsets.UTF_8))
} as StreamCallback)
session.transfer(flowFile, ExecuteScript.REL_SUCCESS)

然后,我开始使用 ConfigSlurper 测试该方法,并在将逻辑注入(inject) Groovy ExecuteScript 之前编写了一个通用类:
class TestLoadingMappings {

static void main(String[] args) {

def content = '''
{"field2":"A",
"field3": "A"
}
'''

println "This is the content of the JSON file" + content

def slurped = new JsonSlurper().parseText(content)
def builder = new JsonBuilder(slurped)

println "This is the content of my builder " + builder

def propertiesFile = new File("D:\\myFile.txt")
Properties props = new Properties()
props.load(new FileInputStream(propertiesFile))
def config = new ConfigSlurper().parse(props).flatten()

println "This is the content of my config " + config

config.each { k, v ->
if (builder[k]) {
builder[k] = v
}
}
println(builder.toPrettyString())
}

}

我返回一个 groovy.lang.MissinPropertyException ,这是因为映射不是那么简单。所有字段/属性(从字段 1 到字段 3)以相同的值(例如)进入 InpuStream,这意味着每次字段 2 具有该值时,您都可以确定它对其他两个属性有效。但是,我不能有映射“field2”:“someText”的映射字段,因为实际映射是由映射文件中的第一个值驱动的。这里有一个例子:
{ 
"field1": "A"
"field2": "A",
"field3": "A"

}

在我的映射文件中,我有:
A;some text;A2

但是,如果您愿意,field1 需要映射到 A(文件中的第一个值)或保持不变。 Field2 需要映射到最后一列 (A2) 中的值,最后 Field3 需要映射到中间列中的“一些文本”。

你能帮忙吗?这是我可以用 Groovy 和 ExecuteScript 实现的吗?如果需要,我可以将配置文件分成两个。

另外,我快速浏览了另一个选项(PutDistributedMapCache),我不确定我是否了解如何将键值对加载到分布式 map 缓存中。看起来您需要一个 DistributedMapCacheClient ,但我不确定这是否易于实现。

谢谢!

编辑2:

其他一些进展,我现在映射工作,但不知道为什么它在读取属性文件的第二行时失败:
"A" someText
"A2" anotherText

class TestLoadingMappings {

static void main(String[] args) {

def content = '''
{"field2":"A",
"field3":"A"
}
'''

println "This is the content of the JSON file" + content

def slurper = new JsonSlurper().parseText(content)
def builder = new JsonBuilder(slurper)

println "This is the content of my builder " + builder

assert builder.content.field2 == "A"
assert builder.content.field3 == "A"

def propertiesFile = new File('D:\\myTest.txt')
Properties props = new Properties()
props.load(new FileInputStream(propertiesFile))
println "This is the content of the properties " + props
def config = new ConfigSlurper().parse(props).flatten()

config.each { k, v ->
if (builder.content.field2) {

builder.content.field2 = config[k]
}
if (builder.content.field3) {

builder.content.field3 = config[k]
}

println(builder.toPrettyString())
println "This is my builder " + builder
}
}
}

我返回: This is my builder {"field2":"someText","field3":"someText"}
知道为什么吗?

非常感谢

编辑 3(从下面移动)

我写了以下内容:
    import groovy.json.JsonBuilder
import groovy.json.JsonSlurper

class TestLoadingMappings {

static void main(String[] args) {

def content =
'''
{"field2":"A",
"field3":"A"
}
'''
def slurper = new JsonSlurper().parseText(content)
def builder = new JsonBuilder(slurper)

println "This is the content of my builder " + builder

def propertiesFile = new File('D:\\properties.txt')
Properties props = new Properties()
props.load(new FileInputStream(propertiesFile))
def conf = new ConfigSlurper().parse(props).flatten()

conf.each { k, v ->
if (builder.content[k]) {
builder.content[k] = v
}
println("This prints the resulting JSON :" + builder.toPrettyString())
}
}
}

但是,我必须更改映射文件的结构,如下所示:
"field1"="substitutionText"
"field2"="substitutionText2"

然后我将 ConfigSlurper “合并”到 ExecuteScript 脚本中,如下所示:
import groovy.json.JsonBuilder
import groovy.json.JsonSlurper
import org.apache.commons.io.IOUtils
import org.apache.nifi.processor.io.StreamCallback

import java.nio.charset.StandardCharsets

def flowFile = session.get();
if (flowFile == null) {
return;
}

flowFile = session.write(flowFile,
{ inputStream, outputStream ->

def content = IOUtils.toString(inputStream, StandardCharsets.UTF_8)

def slurped = new JsonSlurper().parseText(content)
def builder = new JsonBuilder(slurped)
outputStream.write(builder.toPrettyString().getBytes(StandardCharsets.UTF_8))

def propertiesFile = new File(''D:\\properties.txt')
Properties props = new Properties()
props.load(new FileInputStream(propertiesFile))
def conf = new ConfigSlurper().parse(props).flatten();

conf.each { k, v ->
if (builder.content[k]) {
builder.content[k] = v
}
}
outputStream.write(content.toString().getBytes(StandardCharsets.UTF_8))
} as StreamCallback)
session.transfer(flowFile, ExecuteScript.REL_SUCCESS)

问题似乎是这样一个事实,即我无法通过使用类似于在我的 TestLoadingMappings 中创建的逻辑来真正复制原始映射文件中的逻辑。正如我之前的评论/编辑中提到的,映射应该以这种方式工作:

field2 = if A then 替换为“一些文本”

field3 = 如果 A 则替换为 A2

...

field2 = B 然后替换为“其他一些文本”

field3 = B 然后替换为 B2

和儿子。

简而言之,映射由 InputStream 中的传入值驱动(变化),根据 JSON 属性有条件地映射到不同的值。您能否推荐一种通过 Groovy/ExecuteScript 实现此映射的更好方法?我可以灵活地修改映射文件,您能看到我可以更改它以实现所需映射的方法吗?

谢谢

最佳答案

我有一些关于如何读取包含 JSON 的流文件的示例:

http://funnifi.blogspot.com/2016/02/executescript-explained-split-fields.html
http://funnifi.blogspot.com/2016/05/validating-json-in-nifi-with.html
http://funnifi.blogspot.com/2016/02/executescript-processor-replacing-flow.html

你在上面得到了正确的结构;基本上你可以在闭包中使用“inputStream”变量来读取传入的流文件内容。如果您想一次全部阅读(您可能需要为 JSON 执行此操作),您可以使用 IOUtils.toString() 后跟 JsonSlurper,如上面链接中的示例中所做的那样。

对于您的映射文件,特别是如果您的 JSON 是“平面”的,您可以有一个 Java 属性文件,将字段名称映射到新值:

field2=一些文本

字段3=A2

查看 ConfigSlurper用于读取属性文件。

一旦你读入了传入的 JSON 文件并读入了映射文件,你就可以使用数组表示法而不是直接成员表示法来获取 JSON 的各个字段。因此,假设我将属性读入 ConfigSlurper,并且我想用属性文件中的属性覆盖输入 JSON(例如称为“json”)中的任何现有属性。这可能如下所示:

config.parse(props).flatten().each { k,v ->
if(json[k]) {
json[k] = v
}
}

然后您可以继续使用您的 outputStream.write()。

除了从文件中读取映射之外,您还可以通过 PutDistributedMapCache 将其加载到分布式缓存中。处理器。您可以从 ExecuteScript 中的 DistributedCacheMapServer 读取数据,这里有一个示例:

http://funnifi.blogspot.com/2016/04/inspecting-your-nifi.html

如果您的映射很复杂,您可能需要使用 TransformJSON 处理器,它将在 NiFi 的下一版本(0.7.0)中提供。相关的 Jira 案例在这里:

https://issues.apache.org/jira/browse/NIFI-361

编辑 :

在回应您的编辑时,我没有意识到您对各种值有多个规则。在这种情况下,属性文件可能不是表示映射的最佳方式。相反,您可以使用 JSON:
{
"field2": {
"A": "some text",
"B": "some other text"
},
"field3": {
"A": "A2",
"B": "B2"
}
}

然后您可以使用 JSONSlurper 读取映射文件。以下是使用上述映射文件的示例:
import groovy.json.JsonBuilder
import groovy.json.JsonSlurper
import org.apache.commons.io.IOUtils
import org.apache.nifi.processor.io.StreamCallback

import java.nio.charset.StandardCharsets

def flowFile = session.get();
if (flowFile == null) {
return;
}

def mappingJson = new File('/Users/mburgess/mappings.json').text

flowFile = session.write(flowFile, { inputStream, outputStream ->

def content = IOUtils.toString(inputStream, StandardCharsets.UTF_8)
def inJson = new JsonSlurper().parseText(content)
def mappings = new JsonSlurper().parseText(mappingJson)

inJson.each {k,v ->
inJson[k] = mappings[k][v]
}
outputStream.write(inJson.toString().getBytes(StandardCharsets.UTF_8))
} as StreamCallback)

session.transfer(flowFile, REL_SUCCESS)

关于json - Apache NiFi ExecuteScript : Groovy script to replace Json values via a mapping file,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/37577453/

26 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com