gpt4 book ai didi

python-3.x - Dask:持续提交,处理所有提交的数据

转载 作者:行者123 更新时间:2023-12-03 17:12:02 25 4
gpt4 key购买 nike

拥有500个,持续增长DataFrames ,我想提交对(每个DataFrame独立的)数据的操作到dask .我的主要问题是:可以 dask保持不断提交的数据,所以我可以submit对所有提交的数据的功能 - 不仅仅是新提交的?

但是让我们用一个例子来解释它:

创建 dask_server.py :

from dask.distributed import Client, LocalCluster
HOST = '127.0.0.1'
SCHEDULER_PORT = 8711
DASHBOARD_PORT = ':8710'

def run_cluster():
cluster = LocalCluster(dashboard_address=DASHBOARD_PORT, scheduler_port=SCHEDULER_PORT, n_workers=8)
print("DASK Cluster Dashboard = http://%s%s/status" % (HOST, DASHBOARD_PORT))
client = Client(cluster)
print(client)
print("Press Enter to quit ...")
input()

if __name__ == '__main__':
run_cluster()

现在我可以从我的 my_stream.py 连接并开始到 submitgather数据:
DASK_CLIENT_IP = '127.0.0.1'
dask_con_string = 'tcp://%s:%s' % (DASK_CLIENT_IP, DASK_CLIENT_PORT)
dask_client = Client(self.dask_con_string)

def my_dask_function(lines):
return lines['a'].mean() + lines['b'].mean

def async_stream_redis_to_d(max_chunk_size = 1000):
while 1:

# This is a redis queue, but can be any queueing/file-stream/syslog or whatever
lines = self.queue_IN.get(block=True, max_chunk_size=max_chunk_size)

futures = []
df = pd.DataFrame(data=lines, columns=['a','b','c'])
futures.append(dask_client.submit(my_dask_function, df))

result = self.dask_client.gather(futures)
print(result)

time sleep(0.1)

if __name__ == '__main__':
max_chunk_size = 1000
thread_stream_data_from_redis = threading.Thread(target=streamer.async_stream_redis_to_d, args=[max_chunk_size])
#thread_stream_data_from_redis.setDaemon(True)
thread_stream_data_from_redis.start()
# Lets go

这按预期工作,而且速度非常快!!!

但接下来,我想实际上 append lines首先在计算发生之前 - 想知道这是否可能?所以在我们这里的例子中,我想计算 mean已提交的所有行,而不仅仅是最后提交的行。

问题/方法:
  • 这种累积计算可能吗?
  • 糟糕的选择 1:我
    在本地缓存所有行和 submit所有数据到集群
    每次新行到达时。这就像一个指数级的开销。试过了,可以用,就是很慢!
  • 黄金选择:Python
    程序 1 推送数据。比有可能与
    另一个客户端(来自另一个 python 程序)到累积数据
    并将分析逻辑从插入逻辑中移开。我想 Published DataSets是要走的路,但是否适用于这种高速附加?

  • 可能相关: Distributed Variables , Actors Worker

    最佳答案

    将 future 列表分配给已发布的数据集对我来说似乎是理想的。这是相对便宜的(一切都是元数据),您将在几毫秒内保持最新状态

    client.datasets["x"] = list_of_futures

    def worker_function(...):
    futures = get_client().datasets["x"]
    data = get_client.gather(futures)
    ... work with data

    正如您提到的,还有其他系统,例如 PubSub 或 Actors。虽然我怀疑 Futures + Published datasets 更简单,更实用的选择。

    关于python-3.x - Dask:持续提交,处理所有提交的数据,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/61776056/

    25 4 0
    Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
    广告合作:1813099741@qq.com 6ren.com