gpt4 book ai didi

python - 使用 python 对 pyspark 代码进行单元测试

转载 作者:太空狗 更新时间:2023-10-30 00:42:38 26 4
gpt4 key购买 nike

我在 pyspark 中有脚本,如下所示。我想在此脚本中对 函数 进行单元测试。

def rename_chars(column_name):
chars = ((' ', '_&'), ('.', '_$'))
new_cols = reduce(lambda a, kv: a.replace(*kv), chars, column_name)
return new_cols


def column_names(df):
changed_col_names = df.schema.names
for cols in changed_col_names:
df = df.withColumnRenamed(cols, rename_chars(cols))
return df

我写了一个 unittest 来测试这个功能。

但是我不知道如何提交unittest。我已经完成了 spark-submit,但它什么也没做。

import unittest
from my_script import column_names

from pyspark import SparkContext, SparkConf
from pyspark.sql import HiveContext
conf = SparkConf()
sc = SparkContext(conf=conf)
sqlContext = HiveContext(sc)

cols = ['ID', 'NAME', 'last.name', 'abc test']
val = [(1, 'Sam', 'SMITH', 'eng'), (2, 'RAM', 'Reddy', 'turbine')]
df = sqlContext.createDataFrame(val, cols)


class RenameColumnNames(unittest.TestCase):
def test_column_names(self):
df1 = column_names(df)
result = df1.schema.names
expected = ['ID', 'NAME', 'last_$name', 'abc_&test']
self.assertEqual(result, expected)

如何将此脚本集成为 unittest

我可以在安装了 pyspark 的节点上运行什么?

最佳答案

Pyspark 单元测试指南

1.你需要download从站点进行 Spark 分发并解压缩。或者,如果您已经拥有 Spark 和 Python 的有效发行版,只需安装 pyspark:pip install pyspark

2.如果需要,像这样设置系统变量:

export SPARK_HOME="/home/eugene/spark-1.6.0-bin-hadoop2.6"
export PYTHONPATH="$SPARK_HOME/python/:$SPARK_HOME/python/lib/py4j-0.9-src.zip:$PYTHONPATH"
export PATH="SPARK_HOME/bin:$PATH"

我在主目录的 .profile 中添加了它。 如果您已经有 Spark 的工作分布,则可以设置此变量。

3.另外你可能需要设置:

PYSPARK_SUBMIT_ARGS="--jars path/to/hive/jars/jar.jar,path/to/other/jars/jar.jar --conf spark.driver.userClassPathFirst=true --master local[*] pyspark-shell"
PYSPARK_PYTHON="/home/eugene/anaconda3/envs/ste/bin/python3"

Python 和 jars?是的。 Pyspark uses py4j与 Spark 的 java 部分进行通信。如果你想解决更复杂的情况,比如 run Kafka server with tests in Python或使用 Scala 中的 TestHiveContext,就像在您应该指定 jars 的示例中一样。我是通过 Idea 运行配置环境变量来完成的。

4.你可以使用pyspark/tests.py, pyspark/streaming/tests.py, pyspark/sql/tests.pypyspark/ml/tests.pypyspark/mllib/tests.py脚本,其中包含各种 TestCase 类和用于测试 pyspark 应用程序的示例。在您的情况下,您可以这样做(来自 pyspark/sql/tests.py 的示例):

class HiveContextSQLTests(ReusedPySparkTestCase):

@classmethod
def setUpClass(cls):
ReusedPySparkTestCase.setUpClass()
cls.tempdir = tempfile.NamedTemporaryFile(delete=False)
try:
cls.sc._jvm.org.apache.hadoop.hive.conf.HiveConf()
except py4j.protocol.Py4JError:
cls.tearDownClass()
raise unittest.SkipTest("Hive is not available")
except TypeError:
cls.tearDownClass()
raise unittest.SkipTest("Hive is not available")
os.unlink(cls.tempdir.name)
_scala_HiveContext =\
cls.sc._jvm.org.apache.spark.sql.hive.test.TestHiveContext(cls.sc._jsc.sc())
cls.sqlCtx = HiveContext(cls.sc, _scala_HiveContext)
cls.testData = [Row(key=i, value=str(i)) for i in range(100)]
cls.df = cls.sc.parallelize(cls.testData).toDF()

@classmethod
def tearDownClass(cls):
ReusedPySparkTestCase.tearDownClass()
shutil.rmtree(cls.tempdir.name, ignore_errors=True)

但您需要在 PYSPARK_SUBMIT_ARGS 中指定 --jars 和 Hive 库,如前所述

或者没有 Hive:

class SQLContextTests(ReusedPySparkTestCase):
def test_get_or_create(self):
sqlCtx = SQLContext.getOrCreate(self.sc)
self.assertTrue(SQLContext.getOrCreate(self.sc) is sqlCtx)

据我所知,如果 pyspark 已通过 pip 安装,则您没有示例中描述的 tests.py。在这种情况下,只需从 Spark 站点下载分发版,然后复制代码示例。

现在您可以正常运行您的测试用例了:python -m unittest test.py

更新:由于 spark 2.3 使用 HiveContext 和 SqlContext 已被弃用。您可以使用 SparkSession Hive API。

关于python - 使用 python 对 pyspark 代码进行单元测试,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/49420660/

26 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com