python-3.x - How best to parallelise grakn queries with Python?


I run Windows 10 and Python 3.7, and have a 6-core CPU. A single Python thread on my machine submits about 1,000 inserts per second to grakn. I would like to parallelise my code so that it inserts and matches faster. How are people doing this?

My only experience with parallelisation is from another project, where I submit a custom function to a dask distributed client to generate thousands of tasks. Right now, that same approach fails whenever the custom function receives or generates a grakn transaction object/handle. I get errors like:

Traceback (most recent call last):
  File "C:\Users\dvyd\.conda\envs\activefiction\lib\site-packages\distributed\protocol\pickle.py", line 41, in dumps
    return cloudpickle.dumps(x, protocol=pickle.HIGHEST_PROTOCOL)
  ...
  File "stringsource", line 2, in grpc._cython.cygrpc.Channel.__reduce_cython__
TypeError: no default __reduce__ due to non-trivial __cinit__
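
For reference, this is roughly the pattern that fails for me (a minimal sketch; the keyspace and the insert query are placeholders). dask has to pickle every argument it ships to a worker, and the grakn session wraps a grpc channel, which is where the TypeError above comes from:

from dask.distributed import Client
from grakn.client import GraknClient

def insert_one(session, query):
    tx = session.transaction().write()
    tx.query(query)
    tx.commit()

dask_client = Client()  # local dask distributed cluster
grakn_client = GraknClient(uri="localhost:48555")
session = grakn_client.session(keyspace="grakn")

# dask tries to pickle `session` (and its grpc channel) to send it to a worker
future = dask_client.submit(insert_one, session, 'insert $p isa person, has name "a";')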

I have never used Python's multiprocessing module directly. What are other people doing to parallelise their queries to grakn?

Best answer

The easiest way I have found to execute a batch of queries is to pass a Grakn session to each thread in a ThreadPool. Within each thread you can manage transactions and, of course, some more complex logic. Because the threads all live in one process and share the session directly, nothing needs to be pickled, which sidesteps the grpc error above:

from grakn.client import GraknClient
from multiprocessing.dummy import Pool as ThreadPool
from functools import partial

def write_query_batch(session, batch):
    tx = session.transaction().write()
    for query in batch:
        tx.query(query)
    tx.commit()

def multi_thread_write_query_batches(session, query_batches, num_threads=8):
    pool = ThreadPool(num_threads)
    pool.map(partial(write_query_batch, session), query_batches)
    pool.close()
    pool.join()

def generate_query_batches(my_data_entries_list, batch_size):
    batch = []
    for index, data_entry in enumerate(my_data_entries_list):
        batch.append(data_entry)
        if index % batch_size == 0 and index != 0:
            yield batch
            batch = []
    if batch:
        yield batch


# (Part 2) Somewhere in your application open a client and a session
client = GraknClient(uri="localhost:48555")
session = client.session(keyspace="grakn")

query_batches_iterator = generate_query_batches(my_data_entries_list, batch_size)
multi_thread_write_query_batches(session, query_batches_iterator, num_threads=8)

session.close()
client.close()
The above is the generic approach. As a concrete example, you can use it (omitting Part 2) to parallelise batches of insert statements coming from two files. Appending this to the above should work:
import time

files = [
    {
        "file_path": "/path/to/your/file.gql",
    },
    {
        "file_path": "/path/to/your/file2.gql",
    }
]

KEYSPACE = "grakn"
URI = "localhost:48555"
BATCH_SIZE = 10
NUM_BATCHES = 1000

# Entry point where migration starts
def migrate_graql_files():
    start_time = time.time()

    for file in files:
        print('==================================================')
        print(f'Loading from {file["file_path"]}')
        print('==================================================')

        open_file = open(file["file_path"], "r")  # Here we are assuming you have 1 Graql query per line!
        batches = generate_query_batches(open_file.readlines(), BATCH_SIZE)

        with GraknClient(uri=URI) as client:  # Using `with` auto-closes the client
            with client.session(KEYSPACE) as session:  # Using `with` auto-closes the session
                multi_thread_write_query_batches(session, batches, num_threads=16)  # Pick `num_threads` according to your machine

        elapsed = time.time() - start_time
        print(f'Time elapsed {elapsed:.1f} seconds')

    elapsed = time.time() - start_time
    print(f'Time elapsed {elapsed:.1f} seconds')

if __name__ == "__main__":
    migrate_graql_files()
You should also be able to see how to load from a csv, or any other file type, in the same way: take the values you find in that file and substitute them into Graql query string templates. Take a look at the migration example in the docs for more on this.
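
As a rough sketch of that idea (the csv column, the schema, and the file name here are hypothetical), each row becomes one Graql insert string, which you can then batch and write with the same functions as above:

import csv

def csv_to_insert_queries(csv_path):
    # Hypothetical schema: a `person` entity with a `name` attribute
    queries = []
    with open(csv_path) as csv_file:
        for row in csv.DictReader(csv_file):
            queries.append(f'insert $p isa person, has name "{row["name"]}";')
    return queries

# Reusing the functions defined earlier:
# batches = generate_query_batches(csv_to_insert_queries("people.csv"), BATCH_SIZE)
# multi_thread_write_query_batches(session, batches, num_threads=8)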

Regarding "python-3.x - How best to parallelise grakn queries with Python?", we found a similar question on Stack Overflow: https://stackoverflow.com/questions/59822987/
