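I am writing an Azure Durable Function, and I would like to write some unit tests for the whole Azure Function.
I tried to trigger the Client function (the "Start" function, as it is often called), but I cannot make it work.
I am doing this for two reasons:
Here is the code of my Client function, which is mostly boilerplate code: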
import logging

import azure.functions as func
import azure.durable_functions as df


async def main(req: func.HttpRequest, starter: str) -> func.HttpResponse:
    # 'starter' seems to contain the JSON data about
    # the URLs to monitor, stop, etc, the Durable Function
    client = df.DurableOrchestrationClient(starter)

    # The Client function knows which orchestrator to call
    # according to 'function_name'
    function_name = req.route_params["functionName"]

    # This part fails with a ClientConnectorError
    # with the message: "Cannot connect to host 127.0.0.1:17071 ssl:default"
    instance_id = await client.start_new(function_name, None, None)

    logging.info(f"Orchestration '{function_name}' started with ID = '{instance_id}'.")

    return client.create_check_status_response(req, instance_id)
Then I tried to write some code to trigger this Client function, as I did for some "classic" Azure Functions:
import asyncio
import json

import azure.functions as func

# Assumes main() from the Client function above is in scope.

if __name__ == "__main__":
    # Build a simple request to trigger the Client function
    req = func.HttpRequest(
        method="GET",
        body=None,
        url="don't care?",
        # What orchestrator do you want to trigger?
        route_params={"functionName": "FooOrchestrator"},
    )

    # I copy pasted the data that I obtained when I ran the Durable Function
    # with "func host start"
    starter = {
        "taskHubName": "TestHubName",
        "creationUrls": {
            "createNewInstancePostUri": "http://localhost:7071/runtime/webhooks/durabletask/orchestrators/{functionName}[/{instanceId}]?code=aakw1DfReOkYCTFMdKPaA1Q6bSfnHZ/0lzvKsS6MVXCJdp4zhHKDJA==",
            "createAndWaitOnNewInstancePostUri": "http://localhost:7071/runtime/webhooks/durabletask/orchestrators/{functionName}[/{instanceId}]?timeout={timeoutInSeconds}&pollingInterval={intervalInSeconds}&code=aakw1DfReOkYCTFMdKPaA1Q6bSfnHZ/0lzvKsS6MVXCJdp4zhHKDJA==",
        },
        "managementUrls": {
            "id": "INSTANCEID",
            "statusQueryGetUri": "http://localhost:7071/runtime/webhooks/durabletask/instances/INSTANCEID?taskHub=TestHubName&connection=Storage&code=aakw1DfReOkYCTFMdKPaA1Q6bSfnHZ/0lzvKsS6MVXCJdp4zhHKDJA==",
            "sendEventPostUri": "http://localhost:7071/runtime/webhooks/durabletask/instances/INSTANCEID/raiseEvent/{eventName}?taskHub=TestHubName&connection=Storage&code=aakw1DfReOkYCTFMdKPaA1Q6bSfnHZ/0lzvKsS6MVXCJdp4zhHKDJA==",
            "terminatePostUri": "http://localhost:7071/runtime/webhooks/durabletask/instances/INSTANCEID/terminate?reason={text}&taskHub=TestHubName&connection=Storage&code=aakw1DfReOkYCTFMdKPaA1Q6bSfnHZ/0lzvKsS6MVXCJdp4zhHKDJA==",
            "rewindPostUri": "http://localhost:7071/runtime/webhooks/durabletask/instances/INSTANCEID/rewind?reason={text}&taskHub=TestHubName&connection=Storage&code=aakw1DfReOkYCTFMdKPaA1Q6bSfnHZ/0lzvKsS6MVXCJdp4zhHKDJA==",
            "purgeHistoryDeleteUri": "http://localhost:7071/runtime/webhooks/durabletask/instances/INSTANCEID?taskHub=TestHubName&connection=Storage&code=aakw1DfReOkYCTFMdKPaA1Q6bSfnHZ/0lzvKsS6MVXCJdp4zhHKDJA==",
            "restartPostUri": "http://localhost:7071/runtime/webhooks/durabletask/instances/INSTANCEID/restart?taskHub=TestHubName&connection=Storage&code=aakw1DfReOkYCTFMdKPaA1Q6bSfnHZ/0lzvKsS6MVXCJdp4zhHKDJA==",
        },
        "baseUrl": "http://localhost:7071/runtime/webhooks/durabletask",
        "requiredQueryStringParameters": "code=aakw1DfReOkYCTFMdKPaA1Q6bSfnHZ/0lzvKsS6MVXCJdp4zhHKDJA==",
        "rpcBaseUrl": "http://127.0.0.1:17071/durabletask/",
    }

    # I need to use async methods because the "main" of the Client
    # uses async.
    response = asyncio.run(main(req, starter=json.dumps(starter)))
Unfortunately, the Client function still fails at the await client.start_new(function_name, None, None) call.
How can I write unit tests for my Durable Azure Functions in Python?
Best answer
Not sure if this helps, but here is Microsoft's official documentation on unit testing for what you are looking for: https://github.com/kemurayama/durable-functions-for-python-unittest-sample
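The connection error in the question most likely comes from start_new making a real HTTP call to the Functions host (the pasted payload points at 127.0.0.1:17071), which is not running when the script is launched directly. One common way around this in a unit test is to replace DurableOrchestrationClient with a mock, so no network call happens. The following is only a minimal sketch of that idea, not the linked sample's code. To keep it runnable without the Azure SDK installed, `df` is a stand-in namespace and `main` is a trimmed copy of the Client function; `run_starter_with_mock`, "fake-instance-id" and "fake-response" are hypothetical names introduced for illustration. In a real project you would presumably patch "azure.durable_functions.DurableOrchestrationClient" and pass a real func.HttpRequest instead.

```python
import asyncio
from types import SimpleNamespace
from unittest.mock import AsyncMock, MagicMock, patch

# ASSUMPTION: stand-in for `import azure.durable_functions as df`, so this
# sketch runs without the SDK. In real code, patch the SDK class instead.
df = SimpleNamespace(DurableOrchestrationClient=None)


async def main(req, starter):
    # Trimmed copy of the Client function from the question (logging removed).
    client = df.DurableOrchestrationClient(starter)
    function_name = req.route_params["functionName"]
    instance_id = await client.start_new(function_name, None, None)
    return client.create_check_status_response(req, instance_id)


def run_starter_with_mock(function_name):
    """Run main() against a mocked client; no HTTP request is sent."""
    fake_client = MagicMock()
    # start_new is awaited in main(), so it has to be an AsyncMock.
    fake_client.start_new = AsyncMock(return_value="fake-instance-id")
    fake_client.create_check_status_response.return_value = "fake-response"

    # Hypothetical minimal request: only route_params is read by main().
    req = SimpleNamespace(route_params={"functionName": function_name})

    # While the patch is active, constructing the client returns fake_client.
    with patch.object(
        df, "DurableOrchestrationClient", return_value=fake_client
    ) as ctor:
        response = asyncio.run(main(req, starter="{}"))
    return response, fake_client, ctor


if __name__ == "__main__":
    response, fake_client, _ = run_starter_with_mock("FooOrchestrator")
    # The mock records how the Client function used the orchestration client.
    fake_client.start_new.assert_awaited_once_with("FooOrchestrator", None, None)
    print(response)
```

A test written this way only checks that the Client function asks for the right orchestrator and returns whatever the client builds; it does not execute the orchestrator itself, which would need its own test.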
Source: a similar question on Stack Overflow, "python - How to write unit tests for Durable Azure Functions?": https://stackoverflow.com/questions/70619833/