- html - 出于某种原因,IE8 对我的 Sass 文件中继承的 html5 CSS 不友好?
- JMeter 在响应断言中使用 span 标签的问题
- html - 在 :hover and :active? 上具有不同效果的 CSS 动画
- html - 相对于居中的 html 内容固定的 CSS 重复背景?
我对 NiFi 比较陌生,不确定如何正确执行以下操作。我想使用 ExecuteScript
处理器(脚本引擎:python)来执行以下操作(请仅在 python 中):
1) 有一个包含以下信息的CSV文件(第一行是标题):
first,second,third
1,4,9
7,5,2
3,8,7
2) 我想找到各行的总和并生成一个带有修改后的标题的最终文件。最终文件应如下所示:
first,second,third,total
1,4,9,14
7,5,2,14
3,8,7,18
对于python脚本,我这样写:
def summation(first,second,third):
numbers = first + second + third
return numbers
flowFile = session.get()
if (flowFile != None):
flowFile = session.write(flowFile, summation())
但它不起作用,我不确定如何解决这个问题。谁能告诉我如何解决这个问题?
谢谢
最佳答案
您的脚本没有执行您希望它执行的操作。有几种方法可以解决这个问题:
我将对您的脚本进行更改以立即处理整个流文件内容;您可以阅读有关 Record*
处理器的更多信息 here , here , 和 here .
这是一个执行您期望的操作的脚本。请注意差异以查看我在哪里进行了更改(此脚本当然可以变得更加高效和简洁;演示正在发生的事情是冗长的,而且我不是 Python 专家)。
import json
from java.io import BufferedReader, InputStreamReader
from org.apache.nifi.processor.io import StreamCallback
# This PyStreamCallback class is what the processor will use to ingest and output the flowfile content
class PyStreamCallback(StreamCallback):
def __init__(self):
pass
def process(self, inputStream, outputStream):
try:
# Get the provided inputStream into a format where you can read lines
reader = BufferedReader(InputStreamReader(inputStream))
# Set a marker for the first line to be the header
isHeader = True
try:
# A holding variable for the lines
lines = []
# Loop indefinitely
while True:
# Get the next line
line = reader.readLine()
# If there is no more content, break out of the loop
if line is None:
break
# If this is the first line, add the new column
if isHeader:
header = line + ",total"
# Write the header line and the new column
lines.append(header)
# Set the header flag to false now that it has been processed
isHeader = False
else:
# Split the line (a string) into individual elements by the ',' delimiter
elements = self.extract_elements(line)
# Get the sum (this method is unnecessary but shows where your "summation" method would go)
sum = self.summation(elements)
# Write the output of this line
newLine = ",".join([line, str(sum)])
lines.append(newLine)
# Now out of the loop, write the output to the outputStream
output = "\n".join([str(l) for l in lines])
outputStream.write(bytearray(output.encode('utf-8')))
finally:
if reader is not None:
reader.close()
except Exception as e:
log.warn("Exception in Reader")
log.warn('-' * 60)
log.warn(str(e))
log.warn('-' * 60)
raise e
session.transfer(flowFile, REL_FAILURE)
def extract_elements(self, line):
# This splits the line on the ',' delimiter and converts each element to an integer, and puts them in a list
return [int(x) for x in line.split(',')]
# This method replaces your "summation" method and can accept any number of inputs, not just 3
def summation(self, list):
# This returns the sum of all items in the list
return sum(list)
flowFile = session.get()
if (flowFile != None):
flowFile = session.write(flowFile,PyStreamCallback())
session.transfer(flowFile, REL_SUCCESS)
我的流程的结果(使用您在 GenerateFlowFile
处理器中的输入):
2018-07-20 13:54:06,772 INFO [Timer-Driven Process Thread-5] o.a.n.processors.standard.LogAttribute LogAttribute[id=b87f0c01-0164-1000-920e-799647cb9b48] logging for flow file StandardFlowFileRecord[uuid=de888571-2947-4ae1-b646-09e61c85538b,claim=StandardContentClaim [resourceClaim=StandardResourceClaim[id=1532106928567-1, container=default, section=1], offset=2499, length=51],offset=0,name=470063203212609,size=51]
--------------------------------------------------
Standard FlowFile Attributes
Key: 'entryDate'
Value: 'Fri Jul 20 13:54:06 EDT 2018'
Key: 'lineageStartDate'
Value: 'Fri Jul 20 13:54:06 EDT 2018'
Key: 'fileSize'
Value: '51'
FlowFile Attribute Map Content
Key: 'filename'
Value: '470063203212609'
Key: 'path'
Value: './'
Key: 'uuid'
Value: 'de888571-2947-4ae1-b646-09e61c85538b'
--------------------------------------------------
first,second,third,total
1,4,9,14
7,5,2,14
3,8,7,18
关于apache-nifi - 如何使用ExecuteScript(以python为脚本引擎)进行加法练习? [尝试学习NiFi的新手用户],我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/51430116/
我的问题是关于脚本文件运行的顺序(顺序)。 我调用一个名为“aaa.js”的脚本文件,并作为 executeScript 的回调,我有文件脚本“bbb.js”,如下所示是以下草图代码: chrome.
我正在使用 Ionic 和 ngCordova 创建构建混合应用程序。在应用程序登录工作流程中,我将一个页面从服务器返回到 InAppBrowser 插件。我需要从 Ionic 应用程序的返回页面中读
我正在编写一个Chrome扩展程序,需要根据某些给定参数修改特定域中的页面,这需要XSS才能获取,因此简单地使用内容脚本似乎是不可能的。因此,我决定使用 tabs.executeScript 注入(i
我正在尝试将值放入新选项卡上的元素中,然后单击“提交”登录。我尝试了很多变体,但似乎无法让它发挥作用。 function injectScript(){ var script = ' var e =
如何将参数传递到 browser.executeScript static sortableDragAndDropByClassName(dragElementClassName: string,
目前我正在使用以下代码将警报语句注入(inject)网页,打印出我传入的变量。脚本如下: //Inject statement setting the variable "count" in th
我正在编写一个 JavaFX 应用程序,它使用 WebView 和 WebEngine(.executeScript() 方法)与 JavaScript 交互。 在这里,我有来自 Medow.java
所以我有一个正在编写的扩展程序,我试图在用户单击 pageAction 图标时执行脚本。单击图标时,该方法调用 chrome.tabs.executeScript(...)。问题是 chrome.ta
我正在尝试从后台脚本向选定的选项卡上下文中注入(inject)一些代码,但我遇到了一些权限问题。 list .json { "manifest_version": 2, "name": "pr
是否可以将匿名函数传递给 chrome API executeScript 调用?目前我有以下代码: chrome.tabs.executeScript(tab.id, {code: "documen
我有一个应用程序,它使用 WebView 及其支持的 WebEngine 来显示具有一些 JavaScript 函数的简单网页。我还有另一个线程,有时会从互联网接收消息。我希望它们使用 webengi
我想为我的单元测试加载 SQL 脚本文件。当我使用 Spring 2.5.2 时,我 decided使用 SimpleJdbcTestUtils.executeScript()使用以下代码加载我的脚本
我希望在测试期间使用 browser.executescript 动态设置一些数据。像这样的东西: var x; browser.executeScript(function () { var s
我正在尝试使用 Groovy 登录 ExecuteScript 组件。每当我调用 log.info 时,没有任何内容写入 nifi-app.log,但是当我调用 log.errorit 时。是否有一些
为什么 Chrome 的这个功能不起作用?我正在尝试这个例子: https://developer.chrome.com/docs/extensions/mv3/content_scripts/#pr
我在点击事件中有这个代码 let browser = new InAppBrowser('http://example.com/test', '_self'); browser.executeScri
我正在尝试编写一个 chrome 扩展,它有一个名为“btn3”的按钮。当我在 chrome 扩展 (popup.html) 中点击那个按钮时,它会点击网页上的一个按钮。网页上的按钮具有以下 id:“
我正在尝试学习一些有关 javascript 的知识,并遇到了这个问题。基本上我想做的是获取 url ,重定向到代码中的站点并将其放在文本框中并激活按钮并不是说所有这些都是为了获得答案,而是作为我的目
这个问题已经有答案了: "Cannot read property of undefined" when using chrome.tabs or other chrome API in conten
我一直在尝试从 nifi 的 ExecuteScript 处理器中提取数据并将其作为属性附加到流文件。我尝试了很多来源,尤其是 Matt Burgess 的 funnifi 博客上的一个。 以下是我的
我是一名优秀的程序员,十分优秀!