
python - PySpark switching between Linked Services in Synapse


I have two linked services set up in my Synapse workspace, one for each of two AzureDataLakeStorage-Gen2 accounts:

  • one_linked_service_name
  • two_linked_service_name

I need to copy (and transform) data from one ADLS to the other.

I have a list of Parquet directories to copy. Sometimes the code runs through without a problem, but then it seems to crash at a random point in the middle of the list. The maddening part is that I cannot reliably reproduce the error.

At first I thought I might have to collect the data between the steps, since I imagined that lazy execution meant PySpark had not actually retrieved the data before I switched the linkedServiceName, but the error persists (still at random).

I have run out of ideas. Any help would be greatly appreciated.

Code:

for filepath_source in filepaths:
    # Authenticate via the SAS token provided by the Synapse linked service.
    spark.conf.set("fs.azure.account.auth.type", "SAS")
    spark.conf.set("fs.azure.sas.token.provider.type",
                   "com.microsoft.azure.synapse.tokenlibrary.LinkedServiceBasedSASProvider")

    # Read from the /raw/ zone using the first linked service.
    spark.conf.set("spark.storage.synapse.linkedServiceName", one_linked_service_name)
    print("Switched to", spark.conf.get("spark.storage.synapse.linkedServiceName"))
    df = spark.read.option("forwardSparkAzureStorageCredentials", "true") \
        .parquet(ZONE_RAW_CONNECTION_STR + "/" + filepath_source)

    _ = df.collect()  # test, but did not change the outcome.

    # Store into /refined/ zone using the second linked service.
    spark.conf.set("spark.storage.synapse.linkedServiceName", two_linked_service_name)
    print("Switched to", spark.conf.get("spark.storage.synapse.linkedServiceName"))
    df.write \
        .mode("overwrite") \
        .option("forwardSparkAzureStorageCredentials", "true") \
        .option("compression", "snappy") \
        .parquet(ZONE_REFINED_CONNECTION_STR + "/" + filepath_target)

Error:

Caused by: java.nio.file.AccessDeniedException: Operation failed: 
"Server failed to authenticate the request.
Make sure the value of Authorization header is formed correctly including the signature.", 403, HEAD,
https://mydatastorename.dfs.core.windows.net/some/path?upn=false&action=getStatus&timeout=90&sv=2020-02-10&ss=bf&srt=sco&se=2022-02-17T17%3A13%3A26Z&sp=rwdl&sig=XXXX
at org.apache.hadoop.fs.azurebfs.services.AbfsRestOperation.execute(AbfsRestOperation.java:199)
at org.apache.hadoop.fs.azurebfs.services.AbfsClient.getPathStatus(AbfsClient.java:560)
at org.apache.hadoop.fs.azurebfs.AzureBlobFileSystemStore.openFileForRead(AzureBlobFileSystemStore.java:627)
at org.apache.hadoop.fs.azurebfs.AzureBlobFileSystem.open(AzureBlobFileSystem.java:196)
... 26 more
Traceback (most recent call last):

File "/opt/spark/python/lib/pyspark.zip/pyspark/sql/dataframe.py", line 677, in collect
sock_info = self._jdf.collectToPython()

File "/home/trusted-service-user/cluster-env/env/lib/python3.8/site-packages/py4j/java_gateway.py", line 1304, in __call__
return_value = get_return_value(

File "/opt/spark/python/lib/pyspark.zip/pyspark/sql/utils.py", line
111, in deco
return f(*a, **kw)

File "/home/trusted-service-user/cluster-env/env/lib/python3.8/site-packages/py4j/protocol.py", line 326, in get_return_value
raise Py4JJavaError(

Best answer

Instead of doing a collect(), why not save the dataframe to a temporary folder such as DBFS /tmp/data/, and then write it out to the other ADLS from there? That should work.
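A minimal sketch of that staging approach, assuming a hypothetical intermediate location TMP_CONNECTION_STR that the session can reach without switching linked services, and assuming filepath_target is derived per file as in the question. The intermediate write materializes the source data before the linked service is switched, so the final write no longer reads anything from the first account:

# Sketch only: TMP_CONNECTION_STR is a hypothetical staging path (e.g. on the
# workspace's default storage) reachable without switching linked services.
TMP_CONNECTION_STR = "abfss://temp@workspacedefaultstorage.dfs.core.windows.net/staging"

for filepath_source in filepaths:
    spark.conf.set("fs.azure.account.auth.type", "SAS")
    spark.conf.set("fs.azure.sas.token.provider.type",
                   "com.microsoft.azure.synapse.tokenlibrary.LinkedServiceBasedSASProvider")

    # 1. Read from the raw zone with the first linked service.
    spark.conf.set("spark.storage.synapse.linkedServiceName", one_linked_service_name)
    df = spark.read.parquet(ZONE_RAW_CONNECTION_STR + "/" + filepath_source)

    # 2. Materialize the data by writing it to the staging path.
    tmp_path = TMP_CONNECTION_STR + "/" + filepath_source
    df.write.mode("overwrite").parquet(tmp_path)

    # 3. Switch to the second linked service and copy the staged data into the
    #    refined zone; nothing is read from the first account after the switch.
    spark.conf.set("spark.storage.synapse.linkedServiceName", two_linked_service_name)
    spark.read.parquet(tmp_path) \
        .write \
        .mode("overwrite") \
        .option("compression", "snappy") \
        .parquet(ZONE_REFINED_CONNECTION_STR + "/" + filepath_target)

The extra copy costs I/O, but it keeps each read and each write inside a single credential context instead of relying on the lazily evaluated plan picking up the right SAS token.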

Regarding python - PySpark switching between Linked Services in Synapse, we found a similar question on Stack Overflow: https://stackoverflow.com/questions/71202061/
