- html - 出于某种原因,IE8 对我的 Sass 文件中继承的 html5 CSS 不友好?
- JMeter 在响应断言中使用 span 标签的问题
- html - 在 :hover and :active? 上具有不同效果的 CSS 动画
- html - 相对于居中的 html 内容固定的 CSS 重复背景?
我正在尝试做一些相当简单的事情,从传入的 FlowFile 中读取 i9 PDF 表单,将其中的名字和姓氏解析为 JSON,然后将 JSON 输出到传出的 FlowFile。
我没有找到关于如何执行此操作的官方文档,但有人写了几个 cookbooks on doing things in several scripting languages in NiFi here.这看起来很简单,我很确定我正在做那里写的,但我什至不确定是否正在阅读 PDF。每次它只是将未修改的 PDF 传递给 REL_SUCCESS。
import java.nio.charset.StandardCharsets
import org.apache.pdfbox.io.IOUtils
import org.apache.pdfbox.pdmodel.PDDocument
import org.apache.pdfbox.util.PDFTextStripperByArea
import java.awt.Rectangle
import org.apache.pdfbox.pdmodel.PDPage
import com.google.gson.Gson
import java.nio.charset.StandardCharsets
def flowFile = session.get()
flowFile = session.write(flowFile, { inputStream, outputStream ->
try {
//Load Flowfile contents
PDDocument document = PDDocument.load(inputStream)
PDFTextStripperByArea stripper = new PDFTextStripperByArea()
//Get the first page
List<PDPage> allPages = document.getDocumentCatalog().getAllPages()
PDPage page = allPages.get(0)
//Define the areas to search and add them as search regions
stripper = new PDFTextStripperByArea()
Rectangle lname = new Rectangle(25, 226, 240, 15)
stripper.addRegion("lname", lname)
Rectangle fname = new Rectangle(276, 226, 240, 15)
stripper.addRegion("fname", fname)
//Load the results into a JSON
def boxMap = [:]
stripper.setSortByPosition(true)
stripper.extractRegions(page)
regions = stripper.getRegions()
for (String region : regions) {
String box = stripper.getTextForRegion(region)
boxMap.put(region, box)
}
Gson gson = new Gson()
//Remove random noise from the output
json = gson.toJson(boxMap, LinkedHashMap.class)
json = json.replace('\\n', '')
json = json.replace('\\r', '')
json = json.replace(',"', ',\n"')
//Overwrite flowfile contents with JSON
outputStream.write(json.getBytes(StandardCharsets.UTF_8))
} catch (Exception e){
System.out.println(e.getMessage())
session.transfer(flowFile, REL_FAILURE)
}
} as StreamCallback)
session.transfer(flowFile, REL_SUCCESS)
编辑:能够通过插入一个 txt 文件来确认 flowFile 对象正在被正确读取。所以问题似乎是 inputStream 永远不会被移交给 PDDocument 或者当它发生时发生了什么。我编辑了代码,尝试先将其读入 File 对象,但这导致了错误:
FlowFileHandlingException:null 在此 session 中未知
编辑编辑:通过移动我的 try/catch 解决。我似乎不明白它是如何工作的,我上面的代码已经过编辑并且可以正常工作。
最佳答案
session.get
可以返回null,所以一定要在if(!flowFile) return
后面加一行。还要将 try/catch 放在 session.write
之外,这样您就可以将 session.transfer(flowFile, REL_SUCCESS) 放在 session.write(在 try 内)和 catch< 之后
可以转移到失败。
此外,我无法从代码中看出 PDFTextStripperByArea 如何从传入文档中获取信息。看起来所有文档内容都在 try 中,因此 PDFTextStripper 不可用(并且未传入)。
这些都不能解释为什么您在 success
关系上获得原始流文件,但也许有一些我没有看到的东西会被上面的更改神奇地修复 :)
此外,如果您使用 log.info()
或 log.error()
而不是 System.out.println,您将在 NiFi 日志中看到输出(对于错误,它会向处理器发布公告,如果将鼠标悬停在处理器的右上角(如果存在公告,则为红色方 block ),您可以看到该消息。
关于java - 使用 Groovy 覆盖 NiFi 中的 FlowFile,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/51770565/
例如,如果我有一个指定为隔离的 GetFile 处理器,那么来自该处理器的流文件如何分布在集群节点上? 是否需要添加任何额外的工作/处理器? 最佳答案 在今天的 Apache NiFi 中,跨集群的负
我正在使用 Apache NiFi 来摄取和预处理一些 CSV 文件,但是在长时间运行时,它总是失败。错误总是一样的: FlowFile Repository failed to update 在日志
是否可以在 @OnStopped 注释上发送流文件? 基本上,我想编写自定义处理器,它可以在处理器停止时在 flowFile 中发送一个属性。 有什么建议吗? 我在下面尝试: ProcessSessi
我需要从 NiFi 执行 HTTP Post,但我不希望/不需要请求携带所有 FlowFile 的内容。 有没有办法传递 FlowFile 的属性但不完整内容? 最佳答案 如果你的Http Post的
NiFi 新手! 我想知道是否有办法在 NiFi 中发送带有流文件属性的空流文件?我想用它作为触发器来指示某种类型的事件已经开始。 在 NiFi 中,我还有其他方法可以指示一组事件已经开始和结束吗?例
我需要执行类似的操作:sed '1d' simple.tsv > noHeader.tsv 这将从我的大流文件(> 1 GB)中删除第一行。 问题是 - 我需要在我的流程文件上执行它,所以它是: se
我试图将原始流文件作为输入发送到下一个处理器,但最终收到错误,我对 Nifi 很陌生,并且在 Java 方面也有一些经验。 public void onTrigger(ProessorContext
我有一个工作流程,其中两个或多个输入已对其执行集合操作(并集、补集等)以生成单个输出。我希望自己编写一个处理器来执行设置逻辑,但是否有可能同时处理不同来源的多个流文件并同时处理它们? 最佳答案 N
我在 Centos 7 上使用 Apache NiFi 0.4.1 和 Java 7,并尝试根据一些传入数据创建流文件。由于生成的数据量很大,我遇到了“OutOfMemoryError:Java 堆空
我正在尝试做一些相当简单的事情,从传入的 FlowFile 中读取 i9 PDF 表单,将其中的名字和姓氏解析为 JSON,然后将 JSON 输出到传出的 FlowFile。 我没有找到关于如何执行此
nifi 术语和流文件处理的新手。 找到了一种方便的方法来处理传入的 xml 并使用 XmlSlurper 对其进行解析,但是对于传入流文件的以下 GroovyScript 收到警告- 流程: 处理器
有没有办法在 NiFi 的自定义处理器中同时写入不同的流?例如,我有第三方库使用像这样工作的 API 进行重要处理: public void process(InputStream in, Outpu
我有名为 (1,3,4,5 等) 的流文件,我使用这个 ${filename} 属性来调用在线服务,然后我得到了很大的响应并将其逐行拆分,但最后我需要合并我的流文件根据他们的名字,我认为合并内容无法正
我是 NIFI 的新手,想将数据从 Kafka 推送到 S3 存储桶。我正在使用 PutS3Object 处理器,如果我将 Bucket 值硬编码为 mphdf/orderEvent,则可以将数据推送
我正在从以下 JSON/AVRO 架构生成随机数据: { "type" : "record", "namespace" : "test", "name" : "metro_data",
我正在从以下 JSON/AVRO 架构生成随机数据: { "type" : "record", "namespace" : "test", "name" : "metro_data",
我在我的自定义 NiFi 处理器中从远程 API 服务获取不同时间范围的数据。 我有时间范围全局计数器,每次迭代都会更新(我正在使用计时器驱动的调度策略)。 当计数器大于最大值时,我想仅从请求(ses
我试图在我的 python 脚本中获取流文件的属性,我已经完成了以下操作: class TransformCallback(StreamCallback): def __init__(self
这是关于Nifi中MergeContent处理器的问题。目前,我需要一次性将所有流文件与一个特定属性结合起来。 但是发生的事情是因为有太多的流文件具有相同的属性,处理器生成几个与属性合并的不同流文件,
我有一个创建一些新属性/内容的流程。我想从流文件中获取一个属性,并将其添加到文件的 JSON 内容中。 我可以使用AttributesToJSON,但这只会覆盖文件内容。 我的流程示例如下: Upda
我是一名优秀的程序员,十分优秀!