gpt4 book ai didi

python - 使用 pytest 测试 Spark - 无法在本地模式下运行 Spark

转载 作者:行者123 更新时间:2023-11-28 21:44:15 24 4
gpt4 key购买 nike

我正在尝试使用来自该站点的 pytest 运行 wordcount 测试 - Unit testing Apache Spark with py.test .问题是我无法启动 spark 上下文。我用来运行 Spark 上下文的代码:

@pytest.fixture(scope="session")
def spark_context(request):
""" fixture for creating a spark context
Args:
request: pytest.FixtureRequest object
"""
conf = (SparkConf().setMaster("local[2]").setAppName("pytest-pyspark-local-testing"))
sc = SparkContext(conf=conf)
request.addfinalizer(lambda: sc.stop())

quiet_py4j()
return sc

我使用命令执行此代码:

#first way
pytest spark_context_fixture.py

#second way
python spark_context_fixture.py

输出:

platform linux2 -- Python 2.7.5, pytest-3.0.4, py-1.4.31, pluggy-0.4.0
rootdir: /home/mgr/test, inifile:
collected 0 items

然后我想使用 pytest 运行 wordcount 测试。

pytestmark = pytest.mark.usefixtures("spark_context")

def test_do_word_counts(spark_context):
""" test word couting
Args:
spark_context: test fixture SparkContext
"""
test_input = [
' hello spark ',
' hello again spark spark'
]

input_rdd = spark_context.parallelize(test_input, 1)
results = wordcount.do_word_counts(input_rdd)

expected_results = {'hello':2, 'spark':3, 'again':1}
assert results == expected_results

但是输出是:

________ ERROR at setup of test_do_word_counts _________
file /home/mgrabowski/test/wordcount_test.py, line 5
def test_do_word_counts(spark_context):
E fixture 'spark_context' not found
> available fixtures: cache, capfd, capsys, doctest_namespace, monkeypatch, pytestconfig, record_xml_property, recwarn, tmpdir, tmpdir_factory
> use 'pytest --fixtures [testpath]' for help on them.

有谁知道这个问题的原因是什么?

最佳答案

我做了一些研究,终于找到了解决方案。我使用 Spark 1.6。

首先,我在 .bashrc 文件中添加了两行。

export SPARK_HOME=/usr/hdp/2.5.0.0-1245/spark
export PYTHONPATH=$SPARK_HOME/python/:$SPARK_HOME/python/lib/py4j-0.9-src.zip:$PYTHONPA‌​TH

然后我创建了文件“conftest.py”。文件名非常重要,您不应该更改它,否则您会看到 spark_context 错误。如果您在本地模式下使用 Spark 并且不使用 YARN,conftest.py 应该如下所示:

import logging
import pytest

from pyspark import HiveContext
from pyspark import SparkConf
from pyspark import SparkContext
from pyspark.streaming import StreamingContext

def quiet_py4j():
logger = logging.getLogger('py4j')
logger.setLevel(logging.WARN)

@pytest.fixture(scope="session")
def spark_context(request):
conf = (SparkConf().setMaster("local[2]").setAppName("pytest-pyspark-local-testing"))
request.addfinalizer(lambda: sc.stop())

sc = SparkContext(conf=conf)
quiet_py4j()
return sc

@pytest.fixture(scope="session")
def hive_context(spark_context):
return HiveContext(spark_context)

@pytest.fixture(scope="session")
def streaming_context(spark_context):
return StreamingContext(spark_context, 1)

现在您可以使用简单的 pytest 命令运行测试。 Pytest 应该运行 Spark 并最终停止它。

如果您使用 YARN,您可以将 conftest.py 更改为:

    import logging
import pytest

from pyspark import HiveContext
from pyspark import SparkConf
from pyspark import SparkContext
from pyspark.streaming import StreamingContext

def quiet_py4j():
""" turn down spark logging for the test context """
logger = logging.getLogger('py4j')
logger.setLevel(logging.WARN)

@pytest.fixture(scope="session",
params=[pytest.mark.spark_local('local'),
pytest.mark.spark_yarn('yarn')])
def spark_context(request):
if request.param == 'local':
conf = (SparkConf()
.setMaster("local[2]")
.setAppName("pytest-pyspark-local-testing")
)
elif request.param == 'yarn':
conf = (SparkConf()
.setMaster("yarn-client")
.setAppName("pytest-pyspark-yarn-testing")
.set("spark.executor.memory", "1g")
.set("spark.executor.instances", 2)
)
request.addfinalizer(lambda: sc.stop())

sc = SparkContext(conf=conf)
return sc

@pytest.fixture(scope="session")
def hive_context(spark_context):
return HiveContext(spark_context)

@pytest.fixture(scope="session")
def streaming_context(spark_context):
return StreamingContext(spark_context, 1)

现在您可以通过调用 py.test -m spark_local 在本地模式下运行测试,在 YARN 模式下通过调用 py.test -m spark_yarn 运行测试。

字数统计示例

在同一个文件夹中创建三个文件:conftest.py(上面),wordcount.py:

def do_word_counts(lines):
counts = (lines.flatMap(lambda x: x.split())
.map(lambda x: (x, 1))
.reduceByKey(lambda x, y: x+y)
)
results = {word: count for word, count in counts.collect()}
return results

和 wordcount_test.py:

import pytest
import wordcount

pytestmark = pytest.mark.usefixtures("spark_context")

def test_do_word_counts(spark_context):
test_input = [
' hello spark ',
' hello again spark spark'
]

input_rdd = spark_context.parallelize(test_input, 1)
results = wordcount.do_word_counts(input_rdd)

expected_results = {'hello':2, 'spark':3, 'again':1}
assert results == expected_results

现在您可以通过调用 pytest 来运行测试。

关于python - 使用 pytest 测试 Spark - 无法在本地模式下运行 Spark,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/40975360/

24 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com