- c - 在位数组中找到第一个零
- linux - Unix 显示有关匹配两种模式之一的文件的信息
- 正则表达式替换多个文件
- linux - 隐藏来自 xtrace 的命令
我有一个 PySpark 作业可以更新 HBase 中的一些对象(Spark v1.6.0;happybase v0.9)。
如果我为每一行打开/关闭一个 HBase 连接,它会有点工作:
def process_row(row):
conn = happybase.Connection(host=[hbase_master])
# update HBase record with data from row
conn.close()
my_dataframe.foreach(process_row)
几千次更新插入后,我们开始看到这样的错误:
TTransportException: Could not connect to [hbase_master]:9090
显然,为每个更新插入打开/关闭连接是低效的。这个函数实际上只是一个正确解决方案的占位符。
然后我尝试创建一个使用连接池的 process_row
函数版本:
pool = happybase.ConnectionPool(size=20, host=[hbase_master])
def process_row(row):
with pool.connection() as conn:
# update HBase record with data from row
由于某种原因,此函数的连接池版本返回错误(参见 complete error message ):
TypeError: can't pickle thread.lock objects
你能看出我做错了什么吗?
我看到了this post并怀疑我遇到了同样的问题:Spark 尝试序列化 pool
对象并将其分发给每个执行程序,但不能在多个执行程序之间共享此连接池对象。
听起来我需要将数据集分成多个分区,并为每个分区使用一个连接(参见 design patterns for using foreachrdd)。我根据文档中的示例尝试了此操作:
def persist_to_hbase(dataframe_partition):
hbase_connection = happybase.Connection(host=[hbase_master])
for row in dataframe_partition:
# persist data
hbase_connection.close()
my_dataframe.foreachPartition(lambda dataframe_partition: persist_to_hbase(dataframe_partition))
不幸的是,它仍然返回“无法腌制 thread.lock 对象”错误。
最佳答案
从根本上说,happybase 连接只是 tcp 连接,因此它们不能在进程之间共享。连接池主要用于多线程应用程序,也证明对单线程应用程序有用,单线程应用程序可以将池用作具有连接重用的全局“连接工厂”,这可以简化代码,因为不需要传递“连接”对象大约。它还使错误恢复更容易一些。
在任何情况下,池(只是一组连接)都不能在进程之间共享。出于这个原因,尝试序列化它没有意义。 (池使用导致序列化失败的锁,但这只是一个症状。)
也许您可以使用一个助手来有条件地创建一个池(或连接)并将其存储为模块局部变量,而不是在导入时实例化它,例如
_pool = None
def get_pool():
global _pool
if _pool is None:
_pool = happybase.ConnectionPool(size=1, host=[hbase_master])
return pool
def process(...)
with get_pool().connection() as connection:
connection.table(...).put(...)
这会在首次使用时而不是在导入时实例化池/连接。
关于python - 带有 HappyBase 连接池的 PySpark dataframe.foreach() 返回 'TypeError: can' t pickle thread.lock 对象',我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/36455624/
我对这个错误很困惑: Cannot implicitly convert type 'System.Func [c:\Program Files (x86)\Reference Assemblies\
考虑这段代码: pub trait Hello { fn hello(&self); } impl Hello for Any { fn hello(&self) {
问题很简单。是否可以构造这样一个类型 T,对于它下面的两个变量声明会产生不同的结果? T t1 = {}; T t2{}; 我已经研究 cppreference 和标准一个多小时了,我了解以下内容:
Intellij idea 给我这个错误:“Compare (T, T) in Comparator cannot be applied to (T, T)” 对于以下代码: public class
任何人都可以告诉我 : n\t\t\t\t\n\t\t\t 在以下来自和 dwr 服务的响应中的含义和用途是什么. \r\n\t\t\t \r\n\t\t\t
让 T 成为一个 C++ 类。 下面三个指令在行为上有什么区别吗? T a; T a(); T a = T(); T 为不带参数的构造函数提供了显式定义这一事实是否对问题有任何改变? 后续问题:如果
Rust中的智能指针是什么 智能指针(smart pointers)是一类数据结构,是拥有数据所有权和额外功能的指针。是指针的进一步发展 指针(pointer)是一个包含内存地
比如我有一个 vector vector > v={{true,1},{true,2},{false,3},{false,4},{false,5},{true,6},{false,7},{true,8
我有一个来自 .xls 电子表格的数据框,我打印了 print(df.columns.values) 列,输出包含一个名为:Poll Responses\n\t\t\t\t\t。 我查看了 Excel
This question already has answers here: What are good reasons for choosing invariance in an API like
指针类型作为类型前缀与在类型前加斜杠作为后缀有什么区别。斜线到底是什么意思? 最佳答案 语法 T/~ 和 T/& 基本上已被弃用(我什至不确定编译器是否仍然接受它)。在向新向量方案过渡的初始阶段,[T
我正在尝试找到一种方法来获取模板参数的基类。 考虑以下类: template class Foo { public: Foo(){}; ~Foo(){};
这是一个让我感到困惑的小问题。我不知道如何描述它,所以只看下面的代码: struct B { B() {} B(B&) { std::cout ::value #include
为什么有 T::T(T&) 而 T::T(const T&) 更适合 copy ? (大概是用来实现move语义的???) 原始描述(被melpomene证明是错误的): 在C++11中,支持了一种新
在 Java 7 中使用 eclipse 4.2 并尝试实现 List 接口(interface)的以下方法时,我收到了警告。 public T[] toArray(T[] a) { ret
假设有三个函数: def foo[T](a:T, b:T): T = a def test1 = foo(1, "2") def test2 = foo(List(), ListBuffer()) 虽
我对柯里化(Currying)和非柯里化(Currying)泛型函数之间类型检查的差异有点困惑: scala> def x[T](a: T, b: T) = (a == b) x: [T](a: T,
考虑一个类A,我如何编写一个具有与相同行为的模板 A& pretty(A& x) { /* make x pretty */ return x; } A pretty(A&& x) {
Eclipse 表示由于泛型类型橡皮擦,类型参数不允许使用 instanceof 操作。 我同意在运行时不会保留任何类型信息。但是请考虑以下类的通用声明: class SomeClass{ T
在 C++14 中: 对于任何整数或枚举类型 T 以及对于任何表达式 expr: 有没有区别: struct S { T t { expr }; }; 和 struct S { T t = { exp
我是一名优秀的程序员,十分优秀!