I am using Dataflow 0.5.5 with the Python SDK. Very simple code runs into an error on the line
print(len(row_list))
where row_list is a list. Exactly the same code, the same data and the same pipeline run perfectly fine on the DirectRunner, but throw the exception below on the DataflowRunner. What does it mean, and how can I fix it?
job name: `beamapp-root-0216042234-124125`
(f14756f20f567f62): Traceback (most recent call last):
File "/usr/local/lib/python2.7/dist-packages/dataflow_worker/batchworker.py", line 544, in do_work
work_executor.execute()
File "dataflow_worker/executor.py", line 973, in dataflow_worker.executor.MapTaskExecutor.execute (dataflow_worker/executor.c:30547)
with op.scoped_metrics_container:
File "dataflow_worker/executor.py", line 974, in dataflow_worker.executor.MapTaskExecutor.execute (dataflow_worker/executor.c:30495)
op.start()
File "dataflow_worker/executor.py", line 302, in dataflow_worker.executor.GroupedShuffleReadOperation.start (dataflow_worker/executor.c:12149)
def start(self):
File "dataflow_worker/executor.py", line 303, in dataflow_worker.executor.GroupedShuffleReadOperation.start (dataflow_worker/executor.c:12053)
with self.scoped_start_state:
File "dataflow_worker/executor.py", line 316, in dataflow_worker.executor.GroupedShuffleReadOperation.start (dataflow_worker/executor.c:11968)
with self.shuffle_source.reader() as reader:
File "dataflow_worker/executor.py", line 320, in dataflow_worker.executor.GroupedShuffleReadOperation.start (dataflow_worker/executor.c:11912)
self.output(windowed_value)
File "dataflow_worker/executor.py", line 152, in dataflow_worker.executor.Operation.output (dataflow_worker/executor.c:6317)
cython.cast(Receiver, self.receivers[output_index]).receive(windowed_value)
File "dataflow_worker/executor.py", line 85, in dataflow_worker.executor.ConsumerSet.receive (dataflow_worker/executor.c:4021)
cython.cast(Operation, consumer).process(windowed_value)
File "dataflow_worker/executor.py", line 766, in dataflow_worker.executor.BatchGroupAlsoByWindowsOperation.process (dataflow_worker/executor.c:25558)
self.output(wvalue.with_value((k, wvalue.value)))
File "dataflow_worker/executor.py", line 152, in dataflow_worker.executor.Operation.output (dataflow_worker/executor.c:6317)
cython.cast(Receiver, self.receivers[output_index]).receive(windowed_value)
File "dataflow_worker/executor.py", line 85, in dataflow_worker.executor.ConsumerSet.receive (dataflow_worker/executor.c:4021)
cython.cast(Operation, consumer).process(windowed_value)
File "dataflow_worker/executor.py", line 545, in dataflow_worker.executor.DoOperation.process (dataflow_worker/executor.c:18474)
with self.scoped_process_state:
File "dataflow_worker/executor.py", line 546, in dataflow_worker.executor.DoOperation.process (dataflow_worker/executor.c:18428)
self.dofn_receiver.receive(o)
File "apache_beam/runners/common.py", line 195, in apache_beam.runners.common.DoFnRunner.receive (apache_beam/runners/common.c:5137)
self.process(windowed_value)
File "apache_beam/runners/common.py", line 262, in apache_beam.runners.common.DoFnRunner.process (apache_beam/runners/common.c:7078)
self.reraise_augmented(exn)
File "apache_beam/runners/common.py", line 274, in apache_beam.runners.common.DoFnRunner.reraise_augmented (apache_beam/runners/common.c:7467)
raise type(exn), args, sys.exc_info()[2]
File "apache_beam/runners/common.py", line 258, in apache_beam.runners.common.DoFnRunner.process (apache_beam/runners/common.c:6967)
self._dofn_simple_invoker(element)
File "apache_beam/runners/common.py", line 198, in apache_beam.runners.common.DoFnRunner._dofn_simple_invoker (apache_beam/runners/common.c:5283)
self._process_outputs(element, self.dofn_process(element.value))
File "apache_beam/runners/common.py", line 286, in apache_beam.runners.common.DoFnRunner._process_outputs (apache_beam/runners/common.c:7678)
for result in results:
File "trip_augmentation_test.py", line 120, in get_osm_way
TypeError: object of type '_UnwindowedValues' has no len() [while running 'Pull way info from mapserver']
#!/usr/bin/env python
# coding: utf-8
from __future__ import absolute_import
import argparse
import logging
import json

import apache_beam as beam
from apache_beam.utils.options import PipelineOptions
from apache_beam.utils.options import SetupOptions


def get_osm_way(pairs_same_group):
    import requests
    from requests.adapters import HTTPAdapter
    from requests.packages.urllib3.exceptions import InsecureRequestWarning
    from multiprocessing.pool import ThreadPool
    import time

    # disable InsecureRequestWarning for a cleaner output
    requests.packages.urllib3.disable_warnings(InsecureRequestWarning)

    print('processing hardwareid={} trips'.format(pairs_same_group[0]))
    row_list = pairs_same_group[1]
    print(row_list)
    http_request_num = len(row_list)  ######### this line ran into the above error ##########

    with requests.Session() as s:
        s.mount('https://ip address', HTTPAdapter(pool_maxsize=http_request_num))  ##### a host name is needed for this http persistent connection
        pool = ThreadPool(processes=1)
        for row in row_list:
            hardwareid = row['HardwareId']
            tripid = row['TripId']
            latlonArr = row['LatLonStrArr'].split(',')
            print('gps points num: {}'.format(len(latlonArr)))
            cor_array = []
            for latlon in latlonArr:
                lat = latlon.split(';')[0]
                lon = latlon.split(';')[1]
                cor_array.append('{{"x":"{}","y":"{}"}}'.format(lon, lat))
            url = 'https://<ip address>/functionname?coordinates=[{}]'.format(','.join(cor_array))
            print(url)
            print("Requesting")
            r = pool.apply_async(thread_get, (s, url)).get()
            print("Got response")
            print(r)
            if r.status_code == 200:
                yield (hardwareid, tripid, r.text)
            else:
                yield (hardwareid, tripid, None)


def run(argv=None):
    parser = argparse.ArgumentParser()
    parser.add_argument('--input',
                        help=('Input BigQuery table to process specified as: '
                              'PROJECT:DATASET.TABLE or DATASET.TABLE.'))
    parser.add_argument(
        '--output',
        required=True,
        help=('Output BigQuery table for results specified as: PROJECT:DATASET.TABLE '
              'or DATASET.TABLE.'))
    known_args, pipeline_args = parser.parse_known_args(argv)

    pipeline_options = PipelineOptions(argv)
    pipeline_options.view_as(SetupOptions).save_main_session = True
    p = beam.Pipeline(options=pipeline_options)

    (p
     | 'Read trip from BigQuery' >> beam.io.Read(beam.io.BigQuerySource(query=known_args.input))
     | 'Convert' >> beam.Map(lambda row: (row['HardwareId'], row))
     | 'Group devices' >> beam.GroupByKey()
     | 'Pull way info from mapserver' >> beam.FlatMap(get_osm_way)
     | 'Map way info to dictionary' >> beam.FlatMap(convert_to_dict)
     | 'Save to BQ' >> beam.io.Write(beam.io.BigQuerySink(
         known_args.output, schema='HardwareId:INTEGER,TripId:INTEGER,OrderBy:INTEGER,IndexRatio:FLOAT,IsEstimate:BOOLEAN,IsOverRide:BOOLEAN,MaxSpeed:FLOAT,Provider:STRING,RoadName:STRING,WayId:STRING,LastEdited:TIMESTAMP,WayLatLons:STRING,BigDataComment:STRING',
         create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED,
         write_disposition=beam.io.BigQueryDisposition.WRITE_TRUNCATE))
    )

    # Run the pipeline (all operations are deferred until run() is called).
    p.run()


if __name__ == '__main__':
    logging.getLogger().setLevel(logging.INFO)
    run()
!python trip_augmentation_test.py \
--output 'my-project:my-dataset.mytable' \
--input 'SELECT HardwareId,TripId, LatLonStrArr FROM [my-project:my-dataset.mytable] ' \
--project 'my-project' \
--runner 'DataflowRunner' \ ### if just change this to DirectRunner, everything's fine
--temp_location 'gs://mybucket/tripway_temp' \
--staging_location 'gs://mybucket/tripway_staging' \
--worker_machine_type 'n1-standard-2' \
--profile_cpu True \
--profile_memory True
I checked the type of row_list: on the DataflowRunner it turns out to be
<class 'apache_beam.transforms.trigger._UnwindowedValues'>
while on the DirectRunner it is a plain list. Is this inconsistency expected?
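For reference, the type difference can be confirmed by logging it inside the DoFn; this is just a diagnostic one-liner, not part of the original code:

print(type(pairs_same_group[1]))  # list on DirectRunner, _UnwindowedValues on the DataflowRunner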
Best Answer
This kind of abstraction is necessary in big-data systems like Beam/Dataflow (and others): the number of elements in a group can be arbitrarily large. _UnwindowedValues provides an iterable interface for accessing that group of elements, which can be of any size and may not fit into memory as a whole.
The fact that the DirectRunner returns a list is an inconsistency that was fixed a couple of Beam versions later. On Dataflow, the result of GroupByKey does not come back as a list and does not support len(), but it is iterable.
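If the group only needs to be walked through once, the iterable interface is enough on its own and len() can be avoided entirely. A minimal sketch, assuming a hypothetical DoFn process_group that stands in for get_osm_way in the question:

def process_group(pairs_same_group):
    hardware_id, rows = pairs_same_group
    count = 0
    for row in rows:  # lazy iteration; works for a list and for _UnwindowedValues alike
        count += 1
        yield (hardware_id, row['TripId'])
    print('hardwareid={} had {} rows'.format(hardware_id, count))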
In short, before doing http_request_num = len(row_list), you can convert it to a type that supports len(), for example:
row_list = list(pairs_same_group[1])
http_request_num = len(row_list)
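Applied to the code in the question, that means materializing the grouped values at the top of get_osm_way before the length is taken. A sketch of just the changed lines; the rest of the function stays as posted:

def get_osm_way(pairs_same_group):
    # ... imports as in the question ...
    print('processing hardwareid={} trips'.format(pairs_same_group[0]))
    row_list = list(pairs_same_group[1])  # materialize so len() works on both runners
    http_request_num = len(row_list)
    # ... rest of the function unchanged ...

Note that list() pulls the whole group into worker memory, which is fine for moderate group sizes but gives up the lazy behaviour described above for very large groups.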
Regarding "google-cloud-dataflow - What does object of type '_UnwindowedValues' has no len() mean?", we found a similar question on Stack Overflow: https://stackoverflow.com/questions/42276520/