gpt4 book ai didi

python - pySpark addfile 选项,执行器中的工作人员会发生什么

转载 作者:太空宇宙 更新时间:2023-11-03 13:58:49 26 4
gpt4 key购买 nike

很明显,为了更好地分发小型查找数据,使用广播变量。

假设我们在yarn客户端模式下从主节点运行pySpark代码(spark提交)。因此应用程序驱动程序将始终在主节点上创建。我们从主节点上的本地路径读取文件。

with open('/tmp/myfile.txt', 'r') as f:
lookup = {}
for line in f.readlines():
line = parse(line) # Method parse uses re and return dict
lookup[line['name']] = line['age']

然后我们创建广播变量并使用它:

lookupBC = sc.broadcast(lookup)

output = sc.textFile('/path/to/hdfs/')\
.map(lambda e: (lookupBC.value.get(e, e), 1))\
.collect()

在我们的例子中,这个 bc var 是在驱动程序(主节点)上创建的,并且 Spark 在集群中的所有数据节点之间复制这个 var,其中创建了执行器,并将其保存在这些节点的内存中。因此文件将被读取一次,然后分发给执行者。

如果我们使用addFile选项会发生什么?

sc.addFile('/tmp/myfile.txt')

with open(SparkFiles.get('/tmp/myfile.txt')) as f:
lookup = {}
for line in f.readlines():
line = parse(line) # Method parse uses re and return dict
lookup[line['name']] = line['age']

output = sc.textFile('/path/to/hdfs/')\
.map(lambda e: (lookup.get(e, e), 1))\
.collect()

Spark 会将文件 '/tmp/myfile.txt' 复制到每个节点,在该节点上创建执行器。然后:

  1. 文件将被读取多少次?特定节点上每个执行程序一次?或者每个任务一次?
  2. 具体步骤是什么,代码将如何在执行器上处理?
  3. addFile 和 bc var 哪个更好用?
  4. spark 会基于 pyspark 代码进行任何优化并创建隐式 bc 变量吗?

在执行程序日志中,我看到有关 bc 变量的信息,但我没有在代码中使用任何信息:

18/03/21 15:36:27 INFO util.Utils: Fetching spark://172.25.235.201:36478/files/myfile.txt to /data/disk01/yarn/nm/usercache/testuser/appcache/application_1520754626920_6227/spark-f3d19076-0642-4db8-961d-99daae0dfaff/fetchFileTemp230224632617642846.tmp
18/03/21 15:36:27 INFO util.Utils: Copying /data/disk01/yarn/nm/usercache/testuser/appcache/application_1520754626920_6227/spark-f3d19076-0642-4db8-961d-99daae0dfaff/-17884647971521635771454_cache to /data/disk01/yarn/nm/usercache/testuser/appcache/application_1520754626920_6227/container_1520754626920_6227_01_000002/./myfile.txt
18/03/21 15:36:28 INFO broadcast.TorrentBroadcast: Started reading broadcast variable 1
18/03/21 15:36:28 INFO client.TransportClientFactory: Successfully created connection to strt01we.ebb.er.com/172.25.235.216:43791 after 4 ms (0 ms spent in bootstraps)
18/03/21 15:36:28 INFO memory.MemoryStore: Block broadcast_1_piece0 stored as bytes in memory (estimated size 6.3 KB, free 366.3 MB)
18/03/21 15:36:28 INFO broadcast.TorrentBroadcast: Reading broadcast variable 1 took 551 ms

最佳答案

广播变量似乎被加载到内存中,直到它们被显式销毁。相比之下,sc.addFile似乎正在创建一个副本到磁盘(对于每个执行程序)。所以我会猜测SparkFiles.get()每次调用时都会将文件加载到内存中。

  • 因此,在上面的示例中,它将加载一次。
  • 但是如果您调用 SparkFiles.get()里面.map() ,它会尝试为 RDD 中的每个条目重新加载文件。

最后回答一下大家的问题

How many times the file will be read? One time per executor on particular node? or one time per task?

取决于,哪里 .get被调用,如上所述。

What will be the steps, how the code will be processed on executor?

我不明白这部分。

What to use better addFile or bc var?

这些是不同的用例。例如,考虑我们有 1GB sqliteDB 转储的情况。 Spark 可以通过 JDBC 连接到该数据库对象。它实际上并不需要将整个对象加载到内存中。

Will spark do any optimizations based on pyspark code and create implicit bc vars?

不确定,但我不这么认为。

关于python - pySpark addfile 选项,执行器中的工作人员会发生什么,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/49408272/

26 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com