gpt4 book ai didi

python - 如何为 Durable Azure Functions 编写单元测试?

转载 作者:行者123 更新时间:2023-12-02 06:37:47 25 4
gpt4 key购买 nike

我正在编写一个 Azure Durable Function,并且我想为整个 Azure Function 编写一些单元测试。

我尝试触发客户端功能(通常称为“开始”功能),但无法使其工作。

我这样做有两个原因:

  1. 通过运行“func host start”(或按 F5)然后转到我的浏览器,找到正确的选项卡,转到 http://localhost:7071/api/orchestrators/FooOrchestrator 来运行 Azure Function 代码是令人沮丧的然后返回 VS Code 调试我的代码。
  2. 我想编写一些单元测试来确保项目代码的质量。因此我愿意接受建议,也许只测试 Activity 函数的执行会更容易。

客户端功能代码

这是我的客户端函数的代码,主要是样板代码,如 this one

import logging
import azure.functions as func
import azure.durable_functions as df

async def main(req: func.HttpRequest, starter: str) -> func.HttpResponse:
# 'starter' seems to contains the JSON data about
# the URLs to monitor, stop, etc, the Durable Function
client = df.DurableOrchestrationClient(starter)

# The Client function knows which orchestrator to call
# according to 'function_name'
function_name = req.route_params["functionName"]

# This part fails with a ClientConnectorError
# with the message: "Cannot connect to host 127.0.0.1:17071 ssl:default"
instance_id = await client.start_new(function_name, None, None)

logging.info(f"Orchestration '{function_name}' starter with ID = '{instance_id}'.")

return client.create_check_status_response(req, instance_id)

单元测试尝试

然后我尝试编写一些代码来触发此客户端函数,就像我对某些“经典”Azure Functions 所做的那样:

import asyncio
import json

if __name__ == "__main__":
# Build a simple request to trigger the Client function
req = func.HttpRequest(
method="GET",
body=None,
url="don't care?",
# What orchestrator do you want to trigger?
route_params={"functionName": "FooOrchestrator"},
)

# I copy pasted the data that I obtained when I ran the Durable Function
# with "func host start"
starter = {
"taskHubName": "TestHubName",
"creationUrls": {
"createNewInstancePostUri": "http://localhost:7071/runtime/webhooks/durabletask/orchestrators/{functionName}[/{instanceId}]?code=aakw1DfReOkYCTFMdKPaA1Q6bSfnHZ/0lzvKsS6MVXCJdp4zhHKDJA==",
"createAndWaitOnNewInstancePostUri": "http://localhost:7071/runtime/webhooks/durabletask/orchestrators/{functionName}[/{instanceId}]?timeout={timeoutInSeconds}&pollingInterval={intervalInSeconds}&code=aakw1DfReOkYCTFMdKPaA1Q6bSfnHZ/0lzvKsS6MVXCJdp4zhHKDJA==",
},
"managementUrls": {
"id": "INSTANCEID",
"statusQueryGetUri": "http://localhost:7071/runtime/webhooks/durabletask/instances/INSTANCEID?taskHub=TestHubName&connection=Storage&code=aakw1DfReOkYCTFMdKPaA1Q6bSfnHZ/0lzvKsS6MVXCJdp4zhHKDJA==",
"sendEventPostUri": "http://localhost:7071/runtime/webhooks/durabletask/instances/INSTANCEID/raiseEvent/{eventName}?taskHub=TestHubName&connection=Storage&code=aakw1DfReOkYCTFMdKPaA1Q6bSfnHZ/0lzvKsS6MVXCJdp4zhHKDJA==",
"terminatePostUri": "http://localhost:7071/runtime/webhooks/durabletask/instances/INSTANCEID/terminate?reason={text}&taskHub=TestHubName&connection=Storage&code=aakw1DfReOkYCTFMdKPaA1Q6bSfnHZ/0lzvKsS6MVXCJdp4zhHKDJA==",
"rewindPostUri": "http://localhost:7071/runtime/webhooks/durabletask/instances/INSTANCEID/rewind?reason={text}&taskHub=TestHubName&connection=Storage&code=aakw1DfReOkYCTFMdKPaA1Q6bSfnHZ/0lzvKsS6MVXCJdp4zhHKDJA==",
"purgeHistoryDeleteUri": "http://localhost:7071/runtime/webhooks/durabletask/instances/INSTANCEID?taskHub=TestHubName&connection=Storage&code=aakw1DfReOkYCTFMdKPaA1Q6bSfnHZ/0lzvKsS6MVXCJdp4zhHKDJA==",
"restartPostUri": "http://localhost:7071/runtime/webhooks/durabletask/instances/INSTANCEID/restart?taskHub=TestHubName&connection=Storage&code=aakw1DfReOkYCTFMdKPaA1Q6bSfnHZ/0lzvKsS6MVXCJdp4zhHKDJA==",
},
"baseUrl": "http://localhost:7071/runtime/webhooks/durabletask",
"requiredQueryStringParameters": "code=aakw1DfReOkYCTFMdKPaA1Q6bSfnHZ/0lzvKsS6MVXCJdp4zhHKDJA==",
"rpcBaseUrl": "http://127.0.0.1:17071/durabletask/",
}

# I need to use async methods because the "main" of the Client
# uses async.
reponse = asyncio.get_event_loop().run_until_complete(
main(req, starter=json.dumps(starter))
)

但不幸的是,Client 函数在 await client.start_new(function_name, None, None) 部分仍然失败。

如何使用 Python 为我的 Durable Azure Functions 编写一些单元测试?

技术信息

  • Python版本:3.9
  • Azure Functions 核心工具版本 4.0.3971
  • 函数运行时版本:4.0.1.16815

最佳答案

不确定这是否有帮助,这是 Microsoft 关于您正在寻找的内容的单元测试的官方文档 - https://github.com/kemurayama/durable-functions-for-python-unittest-sample

关于python - 如何为 Durable Azure Functions 编写单元测试?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/70619833/

25 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com