gpt4 book ai didi

python - TypeError:在 Pandas DataFrame 上使用 dask 时无法序列化(pickle)_thread._local 对象

转载 作者:行者123 更新时间:2023-12-03 16:45:10 25 4
gpt4 key购买 nike

我有一个巨大的 DataFrame,我想使用 dask 处理它以节省时间。问题是刚一开始运行就抛出了 TypeError: can't pickle _thread._local objects 错误。有人能帮我吗?

我编写了一个函数,该函数根据其行处理存储在 DF 中的数据,并使用

# Row-wise apply over the pandas DataFrame (progress_apply = pandas apply
# with a tqdm progress bar). `df_query` and `run` are defined elsewhere
# in the question; this single-process version works fine.
out = df_query.progress_apply(lambda row: run(row), axis=1)

它运行良好。

由于这需要很多时间,我开始使用 dask:
# Split the pandas DataFrame into 3 dask partitions.
ddata = dd.from_pandas(df_query, npartitions=3)
# Apply `run` row-wise inside each partition. With scheduler='processes',
# dask must pickle each task (via cloudpickle) to ship it to a worker
# process -- this serialization step is where the TypeError below is raised
# when `run` closes over an unpicklable object (e.g. a live DB connection).
out = ddata.map_partitions(lambda df: df.apply((lambda row: run(row)), axis=1)).compute(scheduler='processes')

问题是,一旦处理开始,我就会收到这个错误(完整的回溯见下文):TypeError: can't pickle _thread._local objects。run(...) 函数会执行一些数据操作,包括对数据库的查询。

这是完整的回溯:
---------------------------------------------------------------------------
TypeError Traceback (most recent call last)
<ipython-input-14-aefae1f00437> in <module>
----> 1 out = ddata.map_partitions(lambda df: df.apply((lambda row: run(row)), axis=1)).compute(scheduler='processes')

~/anaconda3/envs/testenv/lib/python3.7/site-packages/dask/base.py in compute(self, **kwargs)
154 dask.base.compute
155 """
--> 156 (result,) = compute(self, traverse=False, **kwargs)
157 return result
158

~/anaconda3/envs/testenv/lib/python3.7/site-packages/dask/base.py in compute(*args, **kwargs)
396 keys = [x.__dask_keys__() for x in collections]
397 postcomputes = [x.__dask_postcompute__() for x in collections]
--> 398 results = schedule(dsk, keys, **kwargs)
399 return repack([f(r, *a) for r, (f, a) in zip(results, postcomputes)])
400

~/anaconda3/envs/testenv/lib/python3.7/site-packages/dask/multiprocessing.py in get(dsk, keys, num_workers, func_loads, func_dumps, optimize_graph, pool, **kwargs)
190 get_id=_process_get_id, dumps=dumps, loads=loads,
191 pack_exception=pack_exception,
--> 192 raise_exception=reraise, **kwargs)
193 finally:
194 if cleanup:

~/anaconda3/envs/testenv/lib/python3.7/site-packages/dask/local.py in get_async(apply_async, num_workers, dsk, result, cache, get_id, rerun_exceptions_locally, pack_exception, raise_exception, callbacks, dumps, loads, **kwargs)
447 # Seed initial tasks into the thread pool
448 while state['ready'] and len(state['running']) < num_workers:
--> 449 fire_task()
450
451 # Main loop, wait on tasks to finish, insert new ones

~/anaconda3/envs/testenv/lib/python3.7/site-packages/dask/local.py in fire_task()
441 # Submit
442 apply_async(execute_task,
--> 443 args=(key, dumps((dsk[key], data)),
444 dumps, loads, get_id, pack_exception),
445 callback=queue.put)

~/anaconda3/envs/testenv/lib/python3.7/site-packages/dask/multiprocessing.py in _dumps(x)
24
25 def _dumps(x):
---> 26 return cloudpickle.dumps(x, protocol=pickle.HIGHEST_PROTOCOL)
27
28

~/anaconda3/envs/testenv/lib/python3.7/site-packages/cloudpickle/cloudpickle.py in dumps(obj, protocol)
950 try:
951 cp = CloudPickler(file, protocol=protocol)
--> 952 cp.dump(obj)
953 return file.getvalue()
954 finally:

~/anaconda3/envs/testenv/lib/python3.7/site-packages/cloudpickle/cloudpickle.py in dump(self, obj)
265 self.inject_addons()
266 try:
--> 267 return Pickler.dump(self, obj)
268 except RuntimeError as e:
269 if 'recursion' in e.args[0]:

~/anaconda3/envs/testenv/lib/python3.7/pickle.py in dump(self, obj)
435 if self.proto >= 4:
436 self.framer.start_framing()
--> 437 self.save(obj)
438 self.write(STOP)
439 self.framer.end_framing()

~/anaconda3/envs/testenv/lib/python3.7/pickle.py in save(self, obj, save_persistent_id)
502 f = self.dispatch.get(t)
503 if f is not None:
--> 504 f(self, obj) # Call unbound method with explicit self
505 return
506

~/anaconda3/envs/testenv/lib/python3.7/pickle.py in save_tuple(self, obj)
769 if n <= 3 and self.proto >= 2:
770 for element in obj:
--> 771 save(element)
772 # Subtle. Same as in the big comment below.
773 if id(obj) in memo:

~/anaconda3/envs/testenv/lib/python3.7/pickle.py in save(self, obj, save_persistent_id)
502 f = self.dispatch.get(t)
503 if f is not None:
--> 504 f(self, obj) # Call unbound method with explicit self
505 return
506

~/anaconda3/envs/testenv/lib/python3.7/pickle.py in save_tuple(self, obj)
769 if n <= 3 and self.proto >= 2:
770 for element in obj:
--> 771 save(element)
772 # Subtle. Same as in the big comment below.
773 if id(obj) in memo:

~/anaconda3/envs/testenv/lib/python3.7/pickle.py in save(self, obj, save_persistent_id)
547
548 # Save the reduce() output and finally memoize the object
--> 549 self.save_reduce(obj=obj, *rv)
550
551 def persistent_id(self, obj):

~/anaconda3/envs/testenv/lib/python3.7/pickle.py in save_reduce(self, func, args, state, listitems, dictitems, obj)
636 else:
637 save(func)
--> 638 save(args)
639 write(REDUCE)
640

~/anaconda3/envs/testenv/lib/python3.7/pickle.py in save(self, obj, save_persistent_id)
502 f = self.dispatch.get(t)
503 if f is not None:
--> 504 f(self, obj) # Call unbound method with explicit self
505 return
506

~/anaconda3/envs/testenv/lib/python3.7/pickle.py in save_tuple(self, obj)
784 write(MARK)
785 for element in obj:
--> 786 save(element)
787
788 if id(obj) in memo:

~/anaconda3/envs/testenv/lib/python3.7/pickle.py in save(self, obj, save_persistent_id)
502 f = self.dispatch.get(t)
503 if f is not None:
--> 504 f(self, obj) # Call unbound method with explicit self
505 return
506

~/anaconda3/envs/testenv/lib/python3.7/pickle.py in save_dict(self, obj)
854
855 self.memoize(obj)
--> 856 self._batch_setitems(obj.items())
857
858 dispatch[dict] = save_dict

~/anaconda3/envs/testenv/lib/python3.7/pickle.py in _batch_setitems(self, items)
885 k, v = tmp[0]
886 save(k)
--> 887 save(v)
888 write(SETITEM)
889 # else tmp is empty, and we're done

~/anaconda3/envs/testenv/lib/python3.7/pickle.py in save(self, obj, save_persistent_id)
502 f = self.dispatch.get(t)
503 if f is not None:
--> 504 f(self, obj) # Call unbound method with explicit self
505 return
506

~/anaconda3/envs/testenv/lib/python3.7/pickle.py in save_tuple(self, obj)
784 write(MARK)
785 for element in obj:
--> 786 save(element)
787
788 if id(obj) in memo:

~/anaconda3/envs/testenv/lib/python3.7/pickle.py in save(self, obj, save_persistent_id)
502 f = self.dispatch.get(t)
503 if f is not None:
--> 504 f(self, obj) # Call unbound method with explicit self
505 return
506

~/anaconda3/envs/testenv/lib/python3.7/pickle.py in save_tuple(self, obj)
769 if n <= 3 and self.proto >= 2:
770 for element in obj:
--> 771 save(element)
772 # Subtle. Same as in the big comment below.
773 if id(obj) in memo:

~/anaconda3/envs/testenv/lib/python3.7/pickle.py in save(self, obj, save_persistent_id)
502 f = self.dispatch.get(t)
503 if f is not None:
--> 504 f(self, obj) # Call unbound method with explicit self
505 return
506

~/anaconda3/envs/testenv/lib/python3.7/pickle.py in save_list(self, obj)
814
815 self.memoize(obj)
--> 816 self._batch_appends(obj)
817
818 dispatch[list] = save_list

~/anaconda3/envs/testenv/lib/python3.7/pickle.py in _batch_appends(self, items)
838 write(MARK)
839 for x in tmp:
--> 840 save(x)
841 write(APPENDS)
842 elif n:

~/anaconda3/envs/testenv/lib/python3.7/pickle.py in save(self, obj, save_persistent_id)
502 f = self.dispatch.get(t)
503 if f is not None:
--> 504 f(self, obj) # Call unbound method with explicit self
505 return
506

~/anaconda3/envs/testenv/lib/python3.7/pickle.py in save_list(self, obj)
814
815 self.memoize(obj)
--> 816 self._batch_appends(obj)
817
818 dispatch[list] = save_list

~/anaconda3/envs/testenv/lib/python3.7/pickle.py in _batch_appends(self, items)
838 write(MARK)
839 for x in tmp:
--> 840 save(x)
841 write(APPENDS)
842 elif n:

~/anaconda3/envs/testenv/lib/python3.7/pickle.py in save(self, obj, save_persistent_id)
502 f = self.dispatch.get(t)
503 if f is not None:
--> 504 f(self, obj) # Call unbound method with explicit self
505 return
506

~/anaconda3/envs/testenv/lib/python3.7/site-packages/cloudpickle/cloudpickle.py in save_function(self, obj, name)
393 or getattr(obj.__code__, 'co_filename', None) == '<stdin>'
394 or themodule is None):
--> 395 self.save_function_tuple(obj)
396 return
397 else:

~/anaconda3/envs/testenv/lib/python3.7/site-packages/cloudpickle/cloudpickle.py in save_function_tuple(self, func)
592 if hasattr(func, '__qualname__'):
593 state['qualname'] = func.__qualname__
--> 594 save(state)
595 write(pickle.TUPLE)
596 write(pickle.REDUCE) # applies _fill_function on the tuple

~/anaconda3/envs/testenv/lib/python3.7/pickle.py in save(self, obj, save_persistent_id)
502 f = self.dispatch.get(t)
503 if f is not None:
--> 504 f(self, obj) # Call unbound method with explicit self
505 return
506

~/anaconda3/envs/testenv/lib/python3.7/pickle.py in save_dict(self, obj)
854
855 self.memoize(obj)
--> 856 self._batch_setitems(obj.items())
857
858 dispatch[dict] = save_dict

~/anaconda3/envs/testenv/lib/python3.7/pickle.py in _batch_setitems(self, items)
880 for k, v in tmp:
881 save(k)
--> 882 save(v)
883 write(SETITEMS)
884 elif n:

~/anaconda3/envs/testenv/lib/python3.7/pickle.py in save(self, obj, save_persistent_id)
502 f = self.dispatch.get(t)
503 if f is not None:
--> 504 f(self, obj) # Call unbound method with explicit self
505 return
506

~/anaconda3/envs/testenv/lib/python3.7/pickle.py in save_dict(self, obj)
854
855 self.memoize(obj)
--> 856 self._batch_setitems(obj.items())
857
858 dispatch[dict] = save_dict

~/anaconda3/envs/testenv/lib/python3.7/pickle.py in _batch_setitems(self, items)
885 k, v = tmp[0]
886 save(k)
--> 887 save(v)
888 write(SETITEM)
889 # else tmp is empty, and we're done

~/anaconda3/envs/testenv/lib/python3.7/pickle.py in save(self, obj, save_persistent_id)
502 f = self.dispatch.get(t)
503 if f is not None:
--> 504 f(self, obj) # Call unbound method with explicit self
505 return
506

~/anaconda3/envs/testenv/lib/python3.7/site-packages/cloudpickle/cloudpickle.py in save_function(self, obj, name)
393 or getattr(obj.__code__, 'co_filename', None) == '<stdin>'
394 or themodule is None):
--> 395 self.save_function_tuple(obj)
396 return
397 else:

~/anaconda3/envs/testenv/lib/python3.7/site-packages/cloudpickle/cloudpickle.py in save_function_tuple(self, func)
592 if hasattr(func, '__qualname__'):
593 state['qualname'] = func.__qualname__
--> 594 save(state)
595 write(pickle.TUPLE)
596 write(pickle.REDUCE) # applies _fill_function on the tuple

~/anaconda3/envs/testenv/lib/python3.7/pickle.py in save(self, obj, save_persistent_id)
502 f = self.dispatch.get(t)
503 if f is not None:
--> 504 f(self, obj) # Call unbound method with explicit self
505 return
506

~/anaconda3/envs/testenv/lib/python3.7/pickle.py in save_dict(self, obj)
854
855 self.memoize(obj)
--> 856 self._batch_setitems(obj.items())
857
858 dispatch[dict] = save_dict

~/anaconda3/envs/testenv/lib/python3.7/pickle.py in _batch_setitems(self, items)
880 for k, v in tmp:
881 save(k)
--> 882 save(v)
883 write(SETITEMS)
884 elif n:

~/anaconda3/envs/testenv/lib/python3.7/pickle.py in save(self, obj, save_persistent_id)
502 f = self.dispatch.get(t)
503 if f is not None:
--> 504 f(self, obj) # Call unbound method with explicit self
505 return
506

~/anaconda3/envs/testenv/lib/python3.7/pickle.py in save_dict(self, obj)
854
855 self.memoize(obj)
--> 856 self._batch_setitems(obj.items())
857
858 dispatch[dict] = save_dict

~/anaconda3/envs/testenv/lib/python3.7/pickle.py in _batch_setitems(self, items)
880 for k, v in tmp:
881 save(k)
--> 882 save(v)
883 write(SETITEMS)
884 elif n:

~/anaconda3/envs/testenv/lib/python3.7/pickle.py in save(self, obj, save_persistent_id)
502 f = self.dispatch.get(t)
503 if f is not None:
--> 504 f(self, obj) # Call unbound method with explicit self
505 return
506

~/anaconda3/envs/testenv/lib/python3.7/site-packages/cloudpickle/cloudpickle.py in save_function(self, obj, name)
393 or getattr(obj.__code__, 'co_filename', None) == '<stdin>'
394 or themodule is None):
--> 395 self.save_function_tuple(obj)
396 return
397 else:

~/anaconda3/envs/testenv/lib/python3.7/site-packages/cloudpickle/cloudpickle.py in save_function_tuple(self, func)
592 if hasattr(func, '__qualname__'):
593 state['qualname'] = func.__qualname__
--> 594 save(state)
595 write(pickle.TUPLE)
596 write(pickle.REDUCE) # applies _fill_function on the tuple

~/anaconda3/envs/testenv/lib/python3.7/pickle.py in save(self, obj, save_persistent_id)
502 f = self.dispatch.get(t)
503 if f is not None:
--> 504 f(self, obj) # Call unbound method with explicit self
505 return
506

~/anaconda3/envs/testenv/lib/python3.7/pickle.py in save_dict(self, obj)
854
855 self.memoize(obj)
--> 856 self._batch_setitems(obj.items())
857
858 dispatch[dict] = save_dict

~/anaconda3/envs/testenv/lib/python3.7/pickle.py in _batch_setitems(self, items)
880 for k, v in tmp:
881 save(k)
--> 882 save(v)
883 write(SETITEMS)
884 elif n:

~/anaconda3/envs/testenv/lib/python3.7/pickle.py in save(self, obj, save_persistent_id)
502 f = self.dispatch.get(t)
503 if f is not None:
--> 504 f(self, obj) # Call unbound method with explicit self
505 return
506

~/anaconda3/envs/testenv/lib/python3.7/pickle.py in save_dict(self, obj)
854
855 self.memoize(obj)
--> 856 self._batch_setitems(obj.items())
857
858 dispatch[dict] = save_dict

~/anaconda3/envs/testenv/lib/python3.7/pickle.py in _batch_setitems(self, items)
880 for k, v in tmp:
881 save(k)
--> 882 save(v)
883 write(SETITEMS)
884 elif n:

~/anaconda3/envs/testenv/lib/python3.7/pickle.py in save(self, obj, save_persistent_id)
547
548 # Save the reduce() output and finally memoize the object
--> 549 self.save_reduce(obj=obj, *rv)
550
551 def persistent_id(self, obj):

~/anaconda3/envs/testenv/lib/python3.7/pickle.py in save_reduce(self, func, args, state, listitems, dictitems, obj)
660
661 if state is not None:
--> 662 save(state)
663 write(BUILD)
664

~/anaconda3/envs/testenv/lib/python3.7/pickle.py in save(self, obj, save_persistent_id)
502 f = self.dispatch.get(t)
503 if f is not None:
--> 504 f(self, obj) # Call unbound method with explicit self
505 return
506

~/anaconda3/envs/testenv/lib/python3.7/pickle.py in save_dict(self, obj)
854
855 self.memoize(obj)
--> 856 self._batch_setitems(obj.items())
857
858 dispatch[dict] = save_dict

~/anaconda3/envs/testenv/lib/python3.7/pickle.py in _batch_setitems(self, items)
880 for k, v in tmp:
881 save(k)
--> 882 save(v)
883 write(SETITEMS)
884 elif n:

~/anaconda3/envs/testenv/lib/python3.7/pickle.py in save(self, obj, save_persistent_id)
547
548 # Save the reduce() output and finally memoize the object
--> 549 self.save_reduce(obj=obj, *rv)
550
551 def persistent_id(self, obj):

~/anaconda3/envs/testenv/lib/python3.7/pickle.py in save_reduce(self, func, args, state, listitems, dictitems, obj)
660
661 if state is not None:
--> 662 save(state)
663 write(BUILD)
664

~/anaconda3/envs/testenv/lib/python3.7/pickle.py in save(self, obj, save_persistent_id)
502 f = self.dispatch.get(t)
503 if f is not None:
--> 504 f(self, obj) # Call unbound method with explicit self
505 return
506

~/anaconda3/envs/testenv/lib/python3.7/pickle.py in save_dict(self, obj)
854
855 self.memoize(obj)
--> 856 self._batch_setitems(obj.items())
857
858 dispatch[dict] = save_dict

~/anaconda3/envs/testenv/lib/python3.7/pickle.py in _batch_setitems(self, items)
880 for k, v in tmp:
881 save(k)
--> 882 save(v)
883 write(SETITEMS)
884 elif n:

~/anaconda3/envs/testenv/lib/python3.7/pickle.py in save(self, obj, save_persistent_id)
522 reduce = getattr(obj, "__reduce_ex__", None)
523 if reduce is not None:
--> 524 rv = reduce(self.proto)
525 else:
526 reduce = getattr(obj, "__reduce__", None)

TypeError: can't pickle _thread._local objects

最佳答案

您的 run 函数可能引用了其作用域之外的变量,这些变量会被捕获进闭包,并随任务一起被序列化。请确保所有文件句柄或数据库连接都在函数内部创建,而不是在函数外部创建后被闭包捕获。

坏的:

# BAD: `conn` lives in the enclosing scope, so `run`'s closure captures it.
# A live DB connection typically holds _thread._local state, which cannot
# be pickled when dask serializes the task for a worker process -- this is
# exactly what triggers "TypeError: can't pickle _thread._local objects".
conn = DBConn(...)
def run(row):
    return conn.do_stuff(row)

好的:
# GOOD: the connection is created inside `run`, i.e. in the worker process
# itself, so nothing unpicklable is captured in the closure that dask
# serializes. (Opening a connection per row has a cost; a per-partition
# connection would amortize it, but the pickling problem is solved.)
def run(row):
    conn = DBConn(...)
    return conn.do_stuff(row)

关于 python - TypeError:在 Pandas DataFrame 上使用 dask 时无法序列化(pickle)_thread._local 对象,我们在 Stack Overflow 上找到一个类似的问题: https://stackoverflow.com/questions/55708455/

25 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com