
python - How to make a unit test's mock take effect on every Spark worker node (process)


I have three files:

  1. spark_mock_dependency.py provides a user() method that reads /etc/user,
  2. spark_mock.py creates an Env class that uses the user() method to determine who the user is,
  3. spark_mock_test.py contains the unit tests for the Env class.

There is no /etc/user in my environment, so I need to mock the user() method to fake it. The unit test test_env_without_spark passes, but test_env_with_spark fails. It looks like the mock only takes effect on the driver node; I cannot mock a class or method on the worker nodes (processes). See my code and the error below.
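
To illustrate the separation, here is a minimal sketch (my own demo, using the same local[2] setup as the test below): the collected PIDs differ from the driver's PID, which is presumably why a patch installed in the driver interpreter never reaches the task code.

import os
from pyspark import SparkConf, SparkContext

sc = SparkContext(conf=SparkConf().setMaster("local[2]").setAppName("pid-demo"))
print("driver pid:", os.getpid())
# Each task runs in a separate Python worker process, so the PIDs collected
# here differ from the driver PID printed above.
print(sc.parallelize([1, 2], 2).map(lambda _: os.getpid()).collect())
sc.stop()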

Does anyone know how to mock a method on all worker nodes (processes)?

spark_mock_dependency.py

def user():
    with open('/etc/user') as f:
        return f.readline().strip()

spark_mock.py

from pkgname.spark_mock_dependency import user


class Env:
    user = user()  # evaluated at import time, in every process that imports this module

spark_mock_test.py

import unittest
from unittest.mock import patch
from pyspark import SparkConf, SparkContext


class EnvTest(unittest.TestCase):
    sc = None

    @classmethod
    def setUpClass(cls) -> None:
        conf = SparkConf().setMaster("local[2]").setAppName("testing")
        cls.sc = SparkContext(conf=conf)

    @patch('pkgname.spark_mock_dependency.user')
    def test_env_with_spark(self, user_mocker):
        user_mocker.return_value = 'anyone'

        from pkgname.spark_mock import Env
        rdd = self.__class__.sc.parallelize([1, 2])
        results = rdd.map(lambda _: f'{Env.user}').collect()
        self.assertTrue(all(res == 'anyone' for res in results))

    @patch('pkgname.spark_mock_dependency.user')
    def test_env_without_spark(self, user_mocker):
        user_mocker.return_value = 'anyone'
        from pkgname.spark_mock import Env
        self.assertEqual('anyone', Env.user)

Error message

py4j.protocol.Py4JJavaError: An error occurred while calling z:org.apache.spark.api.python.PythonRDD.collectAndServe.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 0.0 failed 1 times, most recent failure: Lost task 0.0 in stage 0.0 (TID 0) (executor driver): org.apache.spark.api.python.PythonException: Traceback (most recent call last):
  File "lib/python3.6/site-packages/pyspark/python/lib/pyspark.zip/pyspark/worker.py", line 586, in main
    func, profiler, deserializer, serializer = read_command(pickleSer, infile)
  File "lib/python3.6/site-packages/pyspark/python/lib/pyspark.zip/pyspark/worker.py", line 69, in read_command
    command = serializer._read_with_length(file)
  File "lib/python3.6/site-packages/pyspark/python/lib/pyspark.zip/pyspark/serializers.py", line 160, in _read_with_length
    return self.loads(obj)
  File "lib/python3.6/site-packages/pyspark/python/lib/pyspark.zip/pyspark/serializers.py", line 430, in loads
    return pickle.loads(obj, encoding=encoding)
  File "lib/python3.6/site-packages/pkgname/spark_mock.py", line 4, in <module>
    class Env:
  File "lib/python3.6/site-packages/pkgname/spark_mock.py", line 5, in Env
    user = user()
  File "lib/python3.6/site-packages/pkgname/spark_mock_dependency.py", line 2, in user
    with open('/etc/user') as f:
FileNotFoundError: [Errno 2] No such file or directory: '/etc/user'

Best Answer

You can import user into your unit test and mock it on the module where it is used, i.e. patch pkgname.spark_mock.user rather than pkgname.spark_mock_dependency.user. The traceback shows the worker fails while re-importing pkgname.spark_mock during task deserialization, at the point where the Env class body calls user().

Check the code below:

@patch('pkgname.spark_mock.user')
def test_env_with_spark(self, user_mocker):
    user_mocker.return_value = 'anyone'
    from pkgname.spark_mock import Env
    rdd = self.__class__.sc.parallelize([1, 2])
    results = rdd.map(lambda _: f'{Env.user}').collect()
    self.assertTrue(all(res == 'anyone' for res in results))
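
If the mock must also be active inside each worker process, a hedged workaround (my sketch, not part of the answer above) is to install the patch inside the function that the workers execute, so that the first import of pkgname.spark_mock in each worker already sees the patched user():

from unittest.mock import patch

def resolve_user(_):
    # Runs in the worker process. The patch must be in place before the first
    # import of pkgname.spark_mock there, because Env's class body calls user()
    # at import time; later imports reuse the cached module.
    with patch('pkgname.spark_mock_dependency.user', return_value='anyone'):
        from pkgname.spark_mock import Env
        return Env.user

results = rdd.map(resolve_user).collect()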

As for python - How to make a unit test's mock take effect on every Spark worker node (process), we found a similar question on Stack Overflow: https://stackoverflow.com/questions/67207586/
