gpt4 book ai didi

python - Azure Databricks 中的多重处理

转载 作者:行者123 更新时间:2023-12-02 23:09:18 26 4
gpt4 key购买 nike

我最近的任务是将 JSON 响应摄取到 Databricks Delta-lake 上。我必须使用不同的参数访问 REST API 端点 URL 6500 次并提取响应。

我尝试了多处理库中的两个模块:ThreadPool 和 Pool,以使每次执行速度更快一些。

线程池:

  1. 当 Azure Databricks 群集设置为从 2 个工作节点自动缩放到 13 个工作节点时,如何选择 ThreadPool 的线程数?

现在,我已经设置了n_pool = multiprocessing.cpu_count(),如果集群自动扩展,会有什么不同吗?

  1. 当我使用 Pool 来使用处理器而不是线程时。我在每次执行时随机看到以下错误。好吧,我从错误中了解到 Spark Session/Conf 丢失,我需要从每个进程中设置它。但我在 Databricks 上启用了默认 Spark session ,那么为什么我会看到这些错误。
Py4JError: SparkConf does not exist in the JVM 
**OR**
py4j.protocol.Py4JError: org.apache.spark.api.python.PythonUtils.getEncryptionEnabled does not exist in the JVM
  • 最后,计划用“concurrent.futures.ProcessPoolExecutor”替换多处理。有什么区别吗?
  • 最佳答案

    如果您使用线程池,它们将仅在驱动程序节点上运行,执行程序将处于空闲状态。相反,您需要使用 Spark 本身来并行化请求。这通常是通过创建一个包含 URL 列表(如果基本 URL 相同则为 URL 参数)的数据框来完成的,然后使用 Spark user defined function做实际的请求。像这样的事情:

    import urllib

    df = spark.createDataFrame([("url1", "params1"), ("url2", "params2")],
    ("url", "params"))

    @udf("body string, status int")
    def do_request(url: str, params: str):
    full_url = url + "?" + params # adjust this as required
    with urllib.request.urlopen(full_url) as f:
    status = f.status
    body = f.read().decode("utf-8")

    return {'status': status, 'body': body}


    res = df.withColumn("result", do_requests(col("url"), col("params")))

    这将返回带有名为 result 的新列的数据帧,该列将有两个字段 - statusbody (JSON 答案作为字符串)。

    关于python - Azure Databricks 中的多重处理,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/71094840/

    26 4 0
    Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
    广告合作:1813099741@qq.com 6ren.com