Collecting output from Apache Beam pipeline and displaying it to console



I have been working with Apache Beam for a couple of days. I want to iterate quickly on the application I am working on and make sure the pipeline I am building is error-free. In Spark we can use sc.parallelize, and when we apply some action we get a value that we can inspect.

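For comparison, this is the kind of quick check I mean in Spark (a minimal PySpark sketch, assuming a SparkContext named sc is already set up):

# Build a small RDD, transform it, and pull the results back to the driver
words = sc.parallelize(["this is test", "this is another test"]) \
          .flatMap(lambda line: line.split(" ")) \
          .map(lambda w: (w, 1)) \
          .reduceByKey(lambda a, b: a + b)

# collect() is an action, so the values come back as a plain Python list
print(words.collect())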



Similarly, when I was reading about Apache Beam, I found that we can create a PCollection and work with it using the following syntax:




with beam.Pipeline() as pipeline:
    lines = pipeline | beam.Create(["this is test", "this is another test"])
    word_count = (lines
                  | "Word" >> beam.ParDo(lambda line: line.split(" "))
                  | "Pair of One" >> beam.Map(lambda w: (w, 1))
                  | "Group" >> beam.GroupByKey()
                  | "Count" >> beam.Map(lambda (w, o): (w, sum(o))))
    result = pipeline.run()


I actually want to print the result to the console, but I couldn't find any documentation on how to do that.





Is there a way to print the result to the console instead of saving it to a file each time?



More replies

I have the same question as this post. I'm working with Java and don't know how to print intermediate values to the console. I would appreciate it if anybody could help me out.


Recommended answers

You don't need the temp list. In Python 2.7 the following should be sufficient:




def print_row(row):
    print row

(pipeline
 | ...
 | "print" >> beam.Map(print_row)
)

result = pipeline.run()
result.wait_until_finish()


In Python 3.x, print is a function, so the following is sufficient:




(pipeline
 | ...
 | "print" >> beam.Map(print)
)

result = pipeline.run()
result.wait_until_finish()


After exploring further and understanding how I can write test cases for my application, I figured out a way to print the result to the console. Please note that right now I am running everything on a single-node machine, trying to understand the functionality Apache Beam provides and how I can adopt it without compromising industry best practices.

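As a side note on the test-case route mentioned above, Beam ships testing utilities that let you assert on a PCollection's contents without printing anything. A minimal sketch (assuming the DirectRunner) could look like this:

import apache_beam as beam
from apache_beam.testing.test_pipeline import TestPipeline
from apache_beam.testing.util import assert_that, equal_to

def test_word_count():
    with TestPipeline() as p:
        counts = (p
                  | beam.Create(["a b", "a"])
                  | beam.FlatMap(lambda line: line.split(" "))
                  | beam.Map(lambda w: (w, 1))
                  | beam.CombinePerKey(sum))
        # The assertion is checked as part of the pipeline run itself
        assert_that(counts, equal_to([("a", 2), ("b", 1)]))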



So, here is my solution. At the very last stage of our pipeline we can introduce a map function that will print the result to the console, or accumulate the result in a variable that we can print later to see the values.




import apache_beam as beam

# let's have some sample strings
data = ["this is sample data", "this is yet another sample data"]

# create a pipeline
pipeline = beam.Pipeline()
counts = (pipeline | "create" >> beam.Create(data)
          | "split" >> beam.ParDo(lambda row: row.split(" "))
          | "pair" >> beam.Map(lambda w: (w, 1))
          | "group" >> beam.CombinePerKey(sum))

# let's collect our result with a map transformation into the output array
output = []
def collect(row):
    output.append(row)
    return True

counts | "print" >> beam.Map(collect)

# Run the pipeline
result = pipeline.run()

# let's wait until the result is available
result.wait_until_finish()

# print the output
print output
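
For reference, with the sample data above the collected output should contain pairs such as ('this', 2), ('is', 2), ('sample', 2), ('data', 2), ('yet', 1) and ('another', 1), though the order of the elements is not guaranteed.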


Maybe logging info instead of print?



import logging

def _logging(elem):
    logging.info(elem)
    return elem

P | "logging info" >> beam.Map(_logging)


Following an example from PyCharm Edu:




import apache_beam as beam


class LogElements(beam.PTransform):
    class _LoggingFn(beam.DoFn):

        def __init__(self, prefix=''):
            super(LogElements._LoggingFn, self).__init__()
            self.prefix = prefix

        def process(self, element, **kwargs):
            print self.prefix + str(element)
            yield element

    def __init__(self, label=None, prefix=''):
        super(LogElements, self).__init__(label)
        self.prefix = prefix

    def expand(self, input):
        return input | beam.ParDo(self._LoggingFn(self.prefix))


class MultiplyByTenDoFn(beam.DoFn):

    def process(self, element):
        yield element * 10


p = beam.Pipeline()

(p | beam.Create([1, 2, 3, 4, 5])
   | beam.ParDo(MultiplyByTenDoFn())
   | LogElements())

p.run()


Output




10
20
30
40
50
Out[10]: <apache_beam.runners.portability.fn_api_runner.RunnerResult at 0x7ff41418a210>


with beam.Pipeline() as pipeline:
    lines = pipeline | beam.Create(["this is test", "this is another test"])
    word_count = (lines
                  | "Word" >> beam.ParDo(lambda line: line.split(" "))
                  | "Pair of One" >> beam.Map(lambda w: (w, 1))
                  | "Group" >> beam.GroupByKey()
                  | "Count" >> beam.Map(lambda o: (o[0], str(sum(o[1])))))
    word_count | beam.ParDo(lambda x: print(x))
    result = pipeline.run()

This works like a charm !



Output:
('this', '2')
('is', '2')
('test', '2')
('another', '1')
('this', '2')
('is', '2')
('test', '2')
('another', '1')




I know it isn't what you asked for, but why don't you store it in a text file? It's always better than printing it via stdout, and it isn't volatile.

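For example, a minimal sketch using Beam's text sink (the "counts" output prefix is just a placeholder):

import apache_beam as beam

with beam.Pipeline() as p:
    (p
     | beam.Create(["this is test", "this is another test"])
     | beam.FlatMap(lambda line: line.split(" "))
     | beam.Map(lambda w: (w, 1))
     | beam.CombinePerKey(sum)
     | beam.Map(str)                      # WriteToText expects string lines
     | beam.io.WriteToText("counts"))     # writes shards like counts-00000-of-00001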


More replies

Note that if you try to add this in the middle of your pipeline, you may get the error TypeError: 'NoneType' object is not subscriptable from your pipeline. This is because print returns None, which gets passed along to the following transforms. In this case you will need slightly different code that prints the value and then returns it, as sketched below.

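A minimal sketch of that pattern, where pcoll stands for whatever PCollection you want to inspect:

def print_and_return(element):
    print(element)
    return element      # pass the element along to the next transform

pcoll | "debug print" >> beam.Map(print_and_return)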

Nice idea, but this won't work if your pipeline is executed in a distributed manner, for instance on Apache YARN (Hadoop) or on Google Dataflow. There must be another way to collect the results, but I'm still searching for it.

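One approach that does survive distributed execution is Beam's metrics API: counters are aggregated by the runner and can be queried from the PipelineResult afterwards. A rough sketch (it counts elements rather than collecting their values, and the exact result fields may vary by Beam version):

import apache_beam as beam
from apache_beam.metrics import Metrics
from apache_beam.metrics.metric import MetricsFilter

class CountingFn(beam.DoFn):
    def __init__(self):
        self.counter = Metrics.counter(self.__class__, 'elements_seen')

    def process(self, element):
        self.counter.inc()   # aggregated across all workers by the runner
        yield element

p = beam.Pipeline()
(p | beam.Create([1, 2, 3]) | beam.ParDo(CountingFn()))
result = p.run()
result.wait_until_finish()

# Query the aggregated counter from the pipeline result
query = result.metrics().query(MetricsFilter().with_name('elements_seen'))
for counter in query['counters']:
    print(counter.committed)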

When I use pipeline.run() I get this error: 'PBegin' object has no attribute 'windowing'.


This is great for unit tests in a DirectRunner.


In the more general case of not printing, but having the value available at runtime, I do have a use case (although I might be using it wrong). In the context of TensorFlow and TensorFlow Transform, which I am dealing with, I wanted to compute a count during the transform phase, which uses Beam, and then use this value in operations during training. So keeping the count in memory is handier than saving it to a file and loading it again. But as I said, this is not printing.


This is more of a comment than an answer

