gpt4 book ai didi

testing - Foundry 转换的 Python 单元测试?

转载 作者:行者123 更新时间:2023-12-03 23:43:19 24 4
gpt4 key购买 nike

我想在我的转换为 Foundry 时设置测试,传递测试输入并检查输出是否是预期的。是否可以使用虚拟数据集(repo 中的 .csv 文件)调用转换,或者我应该在转换中创建函数以供测试调用(在代码中创建的数据)?

最佳答案

如果您查看 Code Repositories 下的平台文档-> Python Transforms -> Python Unit Tests ,您会在那里找到很多有用的资源。
特别是关于编写和运行测试的部分是您正在寻找的。

//开始文档
编写测试
完整的文档可以在 https://docs.pytest.org 找到
Pytest 在任何以 test_ 开头的 Python 文件中查找测试。
建议将你所有的测试放到你项目的 src 目录下的一个测试包中。
测试只是 Python 函数,它们也以 test_ 前缀命名,并且使用 Python 的 assert 语句进行断言。
PyTest 还将运行使用 Python 的内置 unittest 模块编写的测试。
例如,在 transforms-python/src/test/test_increment.py 中,一个简单的测试如下所示:

def increment(num):
return num + 1

def test_increment():
assert increment(3) == 5
运行此测试将导致检查失败,并显示如下所示的消息:
============================= test session starts =============================
collected 1 item

test_increment.py F [100%]

================================== FAILURES ===================================
_______________________________ test_increment ________________________________

def test_increment():
> assert increment(3) == 5
E assert 4 == 5
E + where 4 = increment(3)

test_increment.py:5: AssertionError
========================== 1 failed in 0.08 seconds ===========================
使用 PySpark 进行测试
PyTest 固定装置是一项强大的功能,只需添加同名参数即可将值注入(inject)测试函数。此功能用于提供 spark_session 固定装置以在您的测试功能中使用。例如:
def test_dataframe(spark_session):
df = spark_session.createDataFrame([['a', 1], ['b', 2]], ['letter', 'number'])
assert df.schema.names == ['letter', 'number']
//结束文档

如果您不想在代码中指定模式,也可以按照 How To 下文档中的说明读取存储库中的文件。 -> Read file in Python repository
//开始文档
读取 Python 存储库中的文件
您可以将存储库中的其他文件读取到转换上下文中。这可能有助于为您的转换代码设置参数以供引用。
首先,在您的 python 存储库中编辑 setup.py:
setup(
name=os.environ['PKG_NAME'],
# ...
package_data={
'': ['*.yaml', '*.csv']
}
)
这告诉 python 将 yaml 和 csv 文件捆绑到包中。然后在 python 转换旁边放置一个配置文件(例如 config.yaml,但也可以是 csv 或 txt)(例如 read_yml.py 见下文):
- name: tbl1
primaryKey:
- col1
- col2
update:
- column: col3
with: 'XXX'
您可以在转换 read_yml.py 中阅读它使用以下代码:
from transforms.api import transform_df, Input, Output
from pkg_resources import resource_stream
import yaml
import json

@transform_df(
Output("/Demo/read_yml")
)
def my_compute_function(ctx):
stream = resource_stream(__name__, "config.yaml")
docs = yaml.load(stream)
return ctx.spark_session.createDataFrame([{'result': json.dumps(docs)}])
所以你的项目结构是:
  • 一些文件夹
  • config.yaml
  • read_yml.py


  • 这将在您的数据集中输出一行,其中包含一列“结果”和内容:
    [{"primaryKey": ["col1", "col2"], "update": [{"column": "col3", "with": "XXX"}], "name": "tbl1"}]
    //结束文档

    关于testing - Foundry 转换的 Python 单元测试?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/64439809/

    24 4 0
    Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
    广告合作:1813099741@qq.com 6ren.com