gpt4 book ai didi

csv - Apache NiFi : How to compare multiple rows in a csv and create new column

转载 作者:行者123 更新时间:2023-12-03 19:23:25 30 4
gpt4 key购买 nike

我有一个看起来像这样的 csv。

Jc,TXF,timer,alpha,beta
15,44,55,12,33
18,87,33,111
9,87,61,29,77

Alpha 和 Beta 组合构成了一个城市代码。我想将城市名称作为新列添加到 csv 中。
Jc,TXF,timer,alpha,beta,city
15,44,55,12,33,York
18,87,33,111,London
9,87,61,29,77,Sydney

我有另一个只有列的 csv alpha,beta,city .看起来像这样:
alpha,beta,city
12,33,York
33,111,London
29,77,Sydney

我如何使用 Apache NiFi 实现这一点。请提出实现此目标所需的处理器和工作流程。

最佳答案

我看到有两种方法可以解决这个问题。

首先使用 CsvLookupService .然而CsvLookupService只支持一个键,但你有两个,alpha 和 beta。因此,要使用此解决方案,您必须将两个键连接成一个键,例如 12_33。

第二次使用 ExecuteScript处理器。这个更好,因为您不必修改源数据。战略:

  • 将 CSV 文本分成几行
  • 通过在映射文件
  • 中查找 alpha 和 beta 键,用 city 列丰富每一行
  • 将各个行合并到一个 CSV 文件中。

  • 总流量:

    enter image description here

    生成流文件:

    enter image description here

    拆分文本:

    enter image description here

    套装 header line count到 1 以在拆分内容中包含标题行。对于 ExecuteScript处理器将 python 设置为 scripting engine并提供以下 script body :
    from org.apache.commons.io import IOUtils
    from java.nio.charset import StandardCharsets
    from org.apache.nifi.processor.io import StreamCallback
    import csv

    # Define a subclass of StreamCallback for use in session.write()
    class PyStreamCallback(StreamCallback):
    def __init__(self):
    pass
    def process(self, inputStream, outputStream):
    # fetch the mapping CSV file
    with open('/home/nifi/mapping.csv', 'r') as mapping:
    # read the mapping file
    mappingContent = csv.reader(mapping, delimiter=',')
    # flowfile content is CSV text with two lines, header and actual content
    # split by newline to get access to each inidvidual line
    lines = IOUtils.toString(inputStream, StandardCharsets.UTF_8).split('\n')
    # the result will contain the header line
    # the result will have the additional city column
    result = lines[0] + ',city\n'
    # take the second line and split it
    # to get access to alpha, beta and city values
    lineSplit = lines[1].split(',')

    # Go through the mapping file
    # item[0] -> alpha
    # item[1] -> beta
    # item[2] -> city
    # See if you find alpha and beta on the line content
    for item in mappingContent:
    if item[0] == lineSplit[3] and item[1] == lineSplit[4]:
    result += lines[1] + ',' + item[2]
    break

    if result is None:
    raise Exception('No matching found.')
    else:
    outputStream.write(bytearray(result.encode('utf-8')))
    # end class

    flowFile = session.get()
    if(flowFile != None):
    try:
    flowFile = session.write(flowFile, PyStreamCallback())
    session.transfer(flowFile, REL_SUCCESS)
    except Exception as e:
    session.transfer(flowFile, REL_FAILURE)

    有关脚本的详细说明,请参阅注释。 /home/nifi/mapping.csv必须在您的 NiFi 实例上可用。如果您想了解更多关于 ExecuteScript处理器,请引用 ExecuteScript Cookbook .最后,您将所有行合并到一个 CSV 文件中:

    enter image description here

    设置 CSV 读取器和写入器。保留其默认属性。调整 MergeContent属性来控制每个生成的 CSV 文件中应该有多少行。结果:

    enter image description here

    关于csv - Apache NiFi : How to compare multiple rows in a csv and create new column,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/58620599/

    30 4 0
    Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
    广告合作:1813099741@qq.com 6ren.com