Collecting output from an Apache Beam pipeline and displaying it on the console

Reposted. Author: bug小助手. Updated: 2023-10-28 10:07:53
I have been working with Apache Beam for a couple of days. I want to iterate quickly on the application I am working on and make sure the pipeline I am building is error-free. In Spark we can use sc.parallelize, and when we apply an action we get a value that we can inspect.


Similarly, when I was reading about Apache Beam, I found that we can create a PCollection and work with it using the following syntax:

with beam.Pipeline() as pipeline:
    lines = pipeline | beam.Create(["this is test", "this is another test"])
    word_count = (lines
                  | "Word" >> beam.ParDo(lambda line: line.split(" "))
                  | "Pair of One" >> beam.Map(lambda w: (w, 1))
                  | "Group" >> beam.GroupByKey()
                  | "Count" >> beam.Map(lambda kv: (kv[0], sum(kv[1]))))
    result = pipeline.run()

I want to print the result to the console, but I couldn't find any documentation on how to do it.


Is there a way to print the result to the console instead of saving it to a file each time?


I have the same question as this post. I'm working with Java and don't know how to print intermediate values to the console. I would appreciate it if anybody could help me out.



You don't need the temp list. In Python 2.7 the following should be sufficient:


def print_row(row):
    print row

| ...
| "print" >> beam.Map(print_row)

result = pipeline.run()

In Python 3.x, print is a function, so the following is sufficient:


| ...
| "print" >> beam.Map(print)

result = pipeline.run()

After exploring further and understanding how to write test cases for my application, I figured out a way to print the result to the console. Please note that I am currently running everything on a single-node machine, trying to understand the functionality Apache Beam provides and how I can adopt it without compromising industry best practices.

So, here is my solution. At the very last stage of the pipeline we can add a map function that either prints each result to the console or accumulates the results in a variable, which we can print afterwards to see the values.


import apache_beam as beam

# sample input strings
data = ["this is sample data", "this is yet another sample data"]

# create a pipeline
pipeline = beam.Pipeline()
counts = (pipeline | "create" >> beam.Create(data)
          | "split" >> beam.ParDo(lambda row: row.split(" "))
          | "pair" >> beam.Map(lambda w: (w, 1))
          | "group" >> beam.CombinePerKey(sum))

# collect the results into the output list with a map transformation
output = []
def collect(row):
    output.append(row)
    return True

counts | "print" >> beam.Map(collect)

# run the pipeline
result = pipeline.run()

# wait until the result is available
result.wait_until_finish()

# print the output
print(output)

Maybe use logging.info instead of print?


import logging

def _logging(elem):
    logging.info(elem)
    return elem

P | "logging info" >> beam.Map(_logging)

Following an example from PyCharm Edu:

import apache_beam as beam

class LogElements(beam.PTransform):
    class _LoggingFn(beam.DoFn):

        def __init__(self, prefix=''):
            super(LogElements._LoggingFn, self).__init__()
            self.prefix = prefix

        def process(self, element, **kwargs):
            print(self.prefix + str(element))
            yield element

    def __init__(self, label=None, prefix=''):
        super(LogElements, self).__init__(label)
        self.prefix = prefix

    def expand(self, input):
        return input | beam.ParDo(self._LoggingFn(self.prefix))

class MultiplyByTenDoFn(beam.DoFn):

    def process(self, element):
        yield element * 10

p = beam.Pipeline()

(p | beam.Create([1, 2, 3, 4, 5])
   | beam.ParDo(MultiplyByTenDoFn())
   | LogElements())

p.run()

Out[10]: <apache_beam.runners.portability.fn_api_runner.RunnerResult at 0x7ff41418a210>

with beam.Pipeline() as pipeline:
    lines = pipeline | beam.Create(["this is test", "this is another test"])
    word_count = (lines
                  | "Word" >> beam.ParDo(lambda line: line.split(" "))
                  | "Pair of One" >> beam.Map(lambda w: (w, 1))
                  | "Group" >> beam.GroupByKey()
                  | "Count" >> beam.Map(lambda o: (o[0], str(sum(o[1])))))
    word_count | beam.ParDo(lambda x: print(x))
    result = pipeline.run()

This works like a charm!


('this', '2')
('is', '2')
('test', '2')
('another', '1')
('this', '2')
('is', '2')
('test', '2')
('another', '1')


I know it isn't what you asked for, but why don't you store it in a text file? It's always better than printing to stdout, and it isn't volatile.



Note that if you add this in the middle of your pipeline, you may get the error TypeError: 'NoneType' object is not subscriptable. This is because print returns None, which gets passed along to the following steps. In that case you need slightly different code that prints the value and then returns it.
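
To illustrate, here is a small sketch that uses Python's built-in map() as a stand-in for beam.Map, so it runs without a pipeline. That substitution is an assumption made for brevity, and print_and_pass is a made-up name. In a real pipeline the helper would be applied as beam.Map(print_and_pass).

```python
def print_and_pass(element):
    """Print an element, then return it so the next step still receives it."""
    print(element)
    return element


pairs = [("this", 2), ("is", 2), ("test", 2), ("another", 1)]

# Built-in map() stands in for beam.Map here (an assumption, see above).
after_print = list(map(print, pairs))          # print returns None each time
after_pass = list(map(print_and_pass, pairs))  # elements survive the step

# A later step that indexes into each element fails on the None values...
try:
    [kv[0] for kv in after_print]
    downstream_error = None
except TypeError as exc:
    downstream_error = str(exc)

# ...but works on the passed-through elements.
words = [kv[0] for kv in after_pass]
```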


Nice idea, but this won't work if your pipeline is executed in a distributed manner, for instance on Apache YARN (Hadoop) or Google Dataflow. There must be another way to collect the results, but I'm still searching for it.

When I use this, I get this error: 'PBegin' object has no attribute 'windowing'


This is great for unit tests in a DirectRunner.


In the more general case of not printing but having the value available at runtime, I do have a use case (although I might be using it wrong). In the context of TensorFlow and TensorFlow Transform, which I am working with, I wanted to compute a count inside the transform step, which uses Beam, and then use that value in operations during training. Keeping the count in memory is handier than saving it to a file and loading it again. But as I said, this is not printing.


This is more of a comment than an answer

