- c - 在位数组中找到第一个零
- linux - Unix 显示有关匹配两种模式之一的文件的信息
- 正则表达式替换多个文件
- linux - 隐藏来自 xtrace 的命令
我有一个 PySpark 作业可以更新 HBase 中的一些对象(Spark v1.6.0;happybase v0.9)。
如果我为每一行打开/关闭一个 HBase 连接,它会有点工作:
def process_row(row):
conn = happybase.Connection(host=[hbase_master])
# update HBase record with data from row
conn.close()
my_dataframe.foreach(process_row)
几千次更新插入后,我们开始看到这样的错误:
TTransportException: Could not connect to [hbase_master]:9090
显然,为每个更新插入打开/关闭连接是低效的。这个函数实际上只是一个正确解决方案的占位符。
然后我尝试创建一个使用连接池的 process_row
函数版本:
pool = happybase.ConnectionPool(size=20, host=[hbase_master])
def process_row(row):
with pool.connection() as conn:
# update HBase record with data from row
由于某种原因,此函数的连接池版本返回错误(参见 complete error message ):
TypeError: can't pickle thread.lock objects
你能看出我做错了什么吗?
我看到了this post并怀疑我遇到了同样的问题:Spark 尝试序列化 pool
对象并将其分发给每个执行程序,但不能在多个执行程序之间共享此连接池对象。
听起来我需要将数据集分成多个分区,并为每个分区使用一个连接(参见 design patterns for using foreachrdd)。我根据文档中的示例尝试了此操作:
def persist_to_hbase(dataframe_partition):
hbase_connection = happybase.Connection(host=[hbase_master])
for row in dataframe_partition:
# persist data
hbase_connection.close()
my_dataframe.foreachPartition(lambda dataframe_partition: persist_to_hbase(dataframe_partition))
不幸的是,它仍然返回“无法腌制 thread.lock 对象”错误。
最佳答案
从根本上说,happybase 连接只是 tcp 连接,因此它们不能在进程之间共享。连接池主要用于多线程应用程序,也证明对单线程应用程序有用,单线程应用程序可以将池用作具有连接重用的全局“连接工厂”,这可以简化代码,因为不需要传递“连接”对象大约。它还使错误恢复更容易一些。
在任何情况下,池(只是一组连接)都不能在进程之间共享。出于这个原因,尝试序列化它没有意义。 (池使用导致序列化失败的锁,但这只是一个症状。)
也许您可以使用一个助手来有条件地创建一个池(或连接)并将其存储为模块局部变量,而不是在导入时实例化它,例如
_pool = None
def get_pool():
global _pool
if _pool is None:
_pool = happybase.ConnectionPool(size=1, host=[hbase_master])
return pool
def process(...)
with get_pool().connection() as connection:
connection.table(...).put(...)
这会在首次使用时而不是在导入时实例化池/连接。
关于python - 带有 HappyBase 连接池的 PySpark dataframe.foreach() 返回 'TypeError: can' t pickle thread.lock 对象',我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/36455624/
假设我有一个 A 类和一个派生自 A 的 B 类。我想 pickle/unpickle B 类的一个实例。A 和 B 都定义了 __getstate__/__setstate__ 方法(假设 A
似乎有两种方法可以将指标推向 Graphite /碳, 线路接收器 pickle 接收器 根据文档 http://graphite.readthedocs.org/en/1.0/feeding-car
Perforce命令行有一个特殊的开关-G,它使用python的“pickle”序列化格式可以使输出成为机器可读的。一般来说,实际上是这样吗? 例如,考虑p4 -G diff -duw3 的输出。
如何从 BytesIO 对象写入和读回 pickled 数据? 我尝试过: import io import cPickle as pickle s1 = "foo" bytes_io = io.By
我有两个文件: x.py class BF(object) def __init__(): . . def add(self,z): . . y.py from y
在 post昨天发帖,无意中发现改了__qualname__函数对 pickle 有意想不到的影响.通过运行更多测试,我发现在对函数进行 pickle 时,pickle不像我想的那样工作,改变 __q
为什么 pickle 重用现有的 Python 类“C”而不是从 pickle 字节重建类?有没有一种方法可以在没有副作用的情况下 pickle 和解 pickle ? 这是我的回复 session
我使用 mpi4py 将一些计算拆分到多个过程中。基本上我只是计算一些凸包的体积,这是我使用 tvtk 和 mayavi 创建的。 只有第一个过程导入这些库: ... if rank==0: f
我正在用 pygame 制作一个绘图程序,我想在其中为用户提供一个选项来保存程序的确切状态,然后在稍后重新加载它。在这一点上,我保存了我的全局字典的副本,然后遍历, pickle 每个对象。 pyga
所以,我有一个对象,里面有很多不可 pickle 的东西(pygame 事件、orderedDicts、时钟等),我需要将它保存到磁盘。 事情是,如果我可以让这个东西存储一个有进度的字符串(我只需要一
import pickle variety = ["sweet", "box", "cat"] shape = ["back","spear", "log"] pickleFile = open("
我有一个关于 gensim 的问题。我想知道在保存或加载模型(或多个模型)时是否建议或需要使用 pickle,因为我在 GitHub 上找到了可以使用的脚本。 mymodel = Doc2Vec(do
我正在使用 python3.6/。我使用 protocol=pickle.HIGHEST_PROTOCOL pickle 了我的文件 当我按如下方式加载时: with open('data.sav',
给定一个像这样的任意Pythonic对象: class ExampleObj(object): def __init__(self): self.a = 'a'
简介 我有一本具有以下格式的字典: dict_list = {'S0':[[list of int],[list of int]], 'S1':[[list of int],[list of int]
我想知道这个错误可能意味着什么: PicklingError: Can't pickle : attribute lookup __builtin__.function failed 我理解这与使用多
我对 python 变量持久性有点困惑,在我的代码中,我使用以下代码使模型参数在某些迭代期间持久化 with open('W_Hs_Hu_iter'+str(inx)+'.pickle', 'wb'
当对象通过其属性之一引用自身时,从带有插槽的类中挑选对象的正确方法是什么?这是一个简单的示例,使用我当前的实现,我不确定它是否 100% 正确: import weakref import pickl
我有数千个长 (8640) 整数列表元组。例如: type(l1) tuple len(l1) 2 l1[0][:10] [0, 31, 23, 0, 0, 0, 0, 0, 0, 0] l1[1][
我有一个对象 gui_project,它有一个属性 .namespace,这是一个命名空间字典。 (即从字符串到对象的字典。) (这在类似 IDE 的程序中使用,让用户在 Python shell 中
我是一名优秀的程序员,十分优秀!