I have a top-level function that receives a tuple containing the path of a Parquet file and a column name.
The function loads only that column from the file, converts it to pandas, and then packs/serializes it into a standard form. Something like this:
import pyarrow as pa
import pyarrow.parquet as pq
from multiprocessing import Pool

def binarizer(file_data_tuple):
    '''Read a Parquet column from a file, binarize and return it'''
    path, col_name, col_meta, native = file_data_tuple
    if not native:
        # Either this, or using a top-level hdfs_con
        hdfs_con = pa.hdfs.connect(params)
    read_pq = pq.read_table if native else hdfs_con.read_parquet
    arrow_col = read_pq(path, columns=(col_name,))
    bin_col = imported_binarizing_function(arrow_col)
    return bin_col

def read_binarize_parallel(filepaths):
    '''Set up parallel reading and binarizing of a Parquet file'''
    # list of tuples containing the filepath, column name, meta, and mode
    pool_params = [(), ...]
    pool = Pool()
    for filepath in filepaths:
        bin_cols = pool.map(binarizer, pool_params)
        chunk = b''.join(bin_cols)
        send_over_socket(chunk)
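For context, the elided pool_params is one tuple per column, in exactly the shape that binarizer unpacks. A hypothetical construction (the path, column names, and metadata below are illustrative, not from the original post):

# Hypothetical example of the elided pool_params: one tuple per column,
# matching the (path, col_name, col_meta, native) unpacking in binarizer.
filepath = '/data/example.parquet'   # illustrative path
col_names = ['col_a', 'col_b']       # illustrative column names
native = True                        # True: local filesystem; False: HDFS
pool_params = [(filepath, name, None, native) for name in col_names]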
This works when I use the native mode, i.e. reading the files from the local filesystem.
However, if I try to read from HDFS, I get strange (to me) Arrow errors, both when I open a connection in each process and when I try to reuse the same connection. Here is a condensed version of the error:
[libprotobuf ERROR google/protobuf/message_lite.cc:123] Can't parse message of type "Hdfs.Internal.RpcResponseHeaderProto" because it is missing required fields: callId, status
[... the libprotobuf line above is repeated four times ...]

2018-01-09 21:41:47.939006, p10007, th139965275871040, ERROR Failed to invoke RPC call "getFileInfo" on server "192.168.0.101:9000":
RpcChannel.cpp: 703: HdfsRpcException: RPC channel to "192.168.0.101:9000" got protocol mismatch: RPC channel cannot find pending call: id = 3.
    @ Unknown
    @ arrow::io::HadoopFileSystem::GetPathInfo(std::string const&, arrow::io::HdfsPathInfo*)
    @ __pyx_f_7pyarrow_3lib_16HadoopFileSystem__path_info(__pyx_obj_7pyarrow_3lib_HadoopFileSystem*, _object*, arrow::io::HdfsPathInfo*)
    @ __pyx_pw_7pyarrow_3lib_16HadoopFileSystem_15isfile(_object*, _object*)
    @ Unknown
2018-01-09 21:41:47.939103, p10007, th139965275871040, INFO Retry idempotent RPC call "getFileInfo" on server "192.168.0.101:9000"

2018-01-09 21:41:47.939357, p10010, th139965275871040, ERROR Failed to invoke RPC call "getFileInfo" on server "192.168.0.101:9000":
RpcChannel.cpp: 780: HdfsRpcException: RPC channel to "192.168.0.101:9000" got protocol mismatch: RPC channel cannot parse response header.
    @ Unknown
    @ arrow::io::HadoopFileSystem::GetPathInfo(std::string const&, arrow::io::HdfsPathInfo*)
    @ __pyx_f_7pyarrow_3lib_16HadoopFileSystem__path_info(__pyx_obj_7pyarrow_3lib_HadoopFileSystem*, _object*, arrow::io::HdfsPathInfo*)
    @ __pyx_pw_7pyarrow_3lib_16HadoopFileSystem_13isdir(_object*, _object*)
    @ Unknown
[... the same "cannot parse response header" error and the "Retry idempotent RPC call" INFO line repeat for worker processes p10008, p10009, p10012, p10013 and p10014, interleaved with the script's own output: binarizing process filepath: /parquet_430mb/5e6.parquet ...]

2018-01-09 21:41:47.944352, p10011, th139965275871040, ERROR Failed to invoke RPC call "getFileInfo" on server "192.168.0.101:9000":
RpcChannel.cpp: 393: HdfsRpcException: Failed to invoke RPC call "getFileInfo" on server "192.168.0.101:9000"
    @ Unknown
    @ arrow::io::HadoopFileSystem::GetPathInfo(std::string const&, arrow::io::HdfsPathInfo*)
    @ __pyx_f_7pyarrow_3lib_16HadoopFileSystem__path_info(__pyx_obj_7pyarrow_3lib_HadoopFileSystem*, _object*, arrow::io::HdfsPathInfo*)
    @ __pyx_pw_7pyarrow_3lib_16HadoopFileSystem_13isdir(_object*, _object*)
Caused by
TcpSocket.cpp: 127: HdfsNetworkException: Write 124 bytes failed to "192.168.0.101:9000": (errno: 32) Broken pipe
2018-01-09 21:41:47.944519, p10011, th139965275871040, INFO Retry idempotent RPC call "getFileInfo" on server "192.168.0.101:9000"

---------------------------------------------------------------------------
ArrowIOError                              Traceback (most recent call last)
/home/parquet_sender.pyc in insert_files_parallel(self)
    374         # print ('372 sqparquet filepath:', filepath)
    375         params_with_path_and_mode = [col_params+(filepath, native) for col_params in pool_params]
--> 376         bin_col = self.pool.map(read_binarize, params_with_path_and_mode)
    377         got ('map complete')
    378         num_rows = bin_col[0][2]

/usr/lib/python2.7/multiprocessing/pool.pyc in map(self, func, iterable, chunksize)
    249         '''
    250         assert self._state == RUN
--> 251         return self.map_async(func, iterable, chunksize).get()
    252
    253     def imap(self, func, iterable, chunksize=1):

/usr/lib/python2.7/multiprocessing/pool.pyc in get(self, timeout)
    556             return self._value
    557         else:
--> 558             raise self._value
    559
    560     def _set(self, i, obj):

ArrowIOError: HDFS: GetPathInfo failed
I would appreciate any feedback about what causes this error, and about how I should be doing parallel Parquet loading.
Best answer
This is a bug related to the serialization details of multiprocessing. I have opened a bug report here: https://issues.apache.org/jira/browse/ARROW-1986
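Since the answer attributes the failure to multiprocessing serialization details, one way to sidestep the problem is to create the HDFS connection inside each worker process after the fork, e.g. with a Pool initializer, so the connection object is never pickled or shared. A minimal sketch, not from the original answer: the host and port are taken from the error log, and the binarizing step is stubbed out.

import pyarrow as pa
from multiprocessing import Pool

# One connection per worker process, created after the fork so that the
# connection object never crosses the pickle boundary.
hdfs_con = None

def init_worker(host, port):
    '''Pool initializer: runs once inside each freshly spawned worker.'''
    global hdfs_con
    hdfs_con = pa.hdfs.connect(host, port)

def binarizer(file_data_tuple):
    '''Read one Parquet column through this worker's own connection.'''
    path, col_name = file_data_tuple
    table = hdfs_con.read_parquet(path, columns=[col_name])
    # Stand-in for the question's binarizing step: serialize to bytes.
    return table.to_pandas()[col_name].to_string().encode()

if __name__ == '__main__':
    # Host and port from the error log; adjust for your cluster.
    pool = Pool(initializer=init_worker, initargs=('192.168.0.101', 9000))
    params = [('/parquet_430mb/5e6.parquet', 'some_column')]  # hypothetical
    chunk = b''.join(pool.map(binarizer, params))

The key point is that initargs contains only picklable primitives (a string and an int); the connection itself is built on the far side of the fork, once per worker.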
Regarding "python - Using multiprocessing with pyarrow's HdfsClient", we found a similar question on Stack Overflow: https://stackoverflow.com/questions/48177823/