
python - Azure blob trigger Python function executes multiple times for each subfolder and creates multiple copies of the file

Reposted · Author: 行者123 · Updated: 2023-12-03 06:12:10

  1. Monitor the container input/landing
  2. .json files arrive in the format yy/mm/DD/myfile.json
  3. If the JSON file is valid --> move it to input/staging/.json
  4. If invalid --> copy it to input/rejected/.json

The function triggers multiple times, once for each subfolder, and the output folder ends up with 3 copies of the same file. How can I modify the function so that it triggers only once and copies the file only once?

My __init__.py:

import logging
import azure.functions as func
import json

def main(myblob: func.InputStream, inputBlob: bytes, outputBlob1: func.Out[bytes], outputBlob2: func.Out[bytes]):
    logging.info(f"Python blob trigger function processed blob \n"
                 f"Name: {myblob.name}\n"
                 f"Blob Size: {myblob.length} bytes")

    # Read the contents of the input blob
    blob_content = myblob.read()
    processed_file = validateJSON(blob_content)  # returns True or False

    # if pass json validation
    if processed_file:
        outputBlob1.set(myblob.read())
        logging.info(f"Blob copied to outputBlob1: {myblob.name}")
    else:
        outputBlob2.set(myblob.read())
        logging.info(f"Blob copied to outputBlob2: {myblob.name}")

# func to validate json data (not file!)
def validateJSON(jsonData):
    try:
        json.loads(jsonData)
    except ValueError as err:
        return False
    return True
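As a standalone sketch, the validation helper above can be exercised outside the Functions runtime; the sample payloads here are illustrative, and json.loads accepts both str and bytes:

```python
import json

def validateJSON(jsonData):
    # Returns True if the payload parses as JSON, False otherwise.
    # json.JSONDecodeError subclasses ValueError, so this catch covers it.
    try:
        json.loads(jsonData)
    except ValueError:
        return False
    return True

print(validateJSON(b'{"id": 1, "name": "myfile"}'))  # True
print(validateJSON(b'not valid json'))               # False
```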

My function.json file:

{
  "scriptFile": "__init__.py",
  "bindings": [
    {
      "name": "myblob",
      "type": "blobTrigger",
      "direction": "in",
      "path": "input/landing/{name}",
      "connection": "mystorageaccount"
    },
    {
      "name": "inputBlob",
      "type": "blob",
      "dataType": "binary",
      "direction": "in",
      "path": "input/landing/{name}",
      "connection": "mystorageaccount"
    },
    {
      "name": "outputBlob1",
      "type": "blob",
      "dataType": "binary",
      "direction": "out",
      "path": "input/staging/{rand-guid}.json",
      "connection": "mystorageaccount"
    },
    {
      "name": "outputBlob2",
      "type": "blob",
      "dataType": "binary",
      "direction": "out",
      "path": "input/regected/{rand-guid}.json",
      "connection": "mystorageaccount"
    }
  ]
}

My terminal output:

[2023-07-08T14:44:03.452Z] Host lock lease acquired by instance ID '000000000000000000000000FA91B3A1'.
[2023-07-08T14:46:27.618Z] Executing 'Functions.BlobTrigger1' (Reason='New blob detected(LogsAndContainerScan): input/landing/2023/07',

[2023-07-08T14:46:28.031Z] Python blob trigger function processed blob
Name: input/landing/2023/07
Blob Size: None bytes
[2023-07-08T14:46:28.164Z] Blob copied to outputBlob2: input/landing/2023/07
[2023-07-08T14:46:28.282Z] Executing 'Functions.BlobTrigger1' (Reason='New blob detected(LogsAndContainerScan): input/landing/2023/07/08',

[2023-07-08T14:46:28.485Z] Python blob trigger function processed blob
Name: input/landing/2023/07/08
Blob Size: None bytes
[2023-07-08T14:46:28.500Z] Blob copied to outputBlob2: input/landing/2023/07/08

[2023-07-08T14:46:28.991Z] Executed 'Functions.BlobTrigger1' (Succeeded, Id=6a6e5f58-b49e-46c9-a019-c8814c87e5fb, Duration=1656ms)
[2023-07-08T14:46:29.166Z] Executed 'Functions.BlobTrigger1' (Succeeded, Id=cfe1f858-fe5e-46cd-85fd-281fff7a0204, Duration=1057ms)
[2023-07-08T14:46:29.330Z] Executing 'Functions.BlobTrigger1' (Reason='New blob detected(LogsAndContainerScan): input/landing/2023/07/08/invalidJSON.json', Id=5a81c13f-b633-4be1-bdac-7281389f4403)

[2023-07-08T14:46:29.629Z] Python blob trigger function processed blob
Name: input/landing/2023/07/08/invalidJSON.json
Blob Size: None bytes
[2023-07-08T14:46:29.629Z] Blob copied to outputBlob2: input/landing/2023/07/08/invalidJSON.json
[2023-07-08T14:46:30.211Z] Executed 'Functions.BlobTrigger1' (Succeeded, Id=5a81c13f-b633-4be1-bdac-7281389f4403, Duration=1157ms)

Result: multiple copies


Best Answer

Azure blob trigger python function executes multiple times for each subfolder and creates multiple copies of the file

I have reproduced this in my environment, and below is the code that worked for me:

function.json:

{
  "bindings": [
    {
      "name": "myblob",
      "path": "samples-workitems/land/{name}",
      "connection": "AzureWebJobsStorage",
      "direction": "in",
      "type": "blobTrigger"
    },
    {
      "name": "outputBlob1",
      "direction": "out",
      "type": "blob",
      "connection": "AzureWebJobsStorage",
      "path": "samples-workitems/approved/{rand-guid}.json"
    },
    {
      "name": "outputBlob2",
      "direction": "out",
      "type": "blob",
      "connection": "AzureWebJobsStorage",
      "path": "samples-workitems/rejected/{rand-guid}.json"
    }
  ]
}
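A further option (my assumption, not part of the answer above, though blob name patterns with a fixed extension are a documented blob-trigger feature) is to put the .json extension in the trigger path itself, so that only blobs ending in .json fire the function and directory-like blob names such as 2023/07 are ignored. A hedged sketch of such a trigger binding:

```json
{
  "name": "myblob",
  "type": "blobTrigger",
  "direction": "in",
  "path": "samples-workitems/land/{name}.json",
  "connection": "AzureWebJobsStorage"
}
```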

__init__.py:

import logging
import azure.functions as func
import json

def main(myblob: func.InputStream, outputBlob1: func.Out[bytes], outputBlob2: func.Out[bytes]):
    logging.info(f"Python blob trigger function processed blob \n"
                 f"Name: {myblob.name}\n"
                 f"Blob Size: {myblob.length} bytes")

    blob_content1 = myblob.read()
    processed_file = validateJSON(blob_content1)  # returns True or False

    # if pass json validation
    if processed_file:
        outputBlob1.set(blob_content1)
        logging.info(f"Blob copied to outputBlob1: {myblob.name}")
    else:
        outputBlob2.set(blob_content1)
        logging.info(f"Blob copied to outputBlob2: {myblob.name}")

# func to validate json data (not file!)
def validateJSON(jsonData1):
    try:
        json.loads(jsonData1)
    except ValueError as err:
        return False
    return True
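One detail worth noting in the code above: the blob is read once into blob_content1 and that variable is reused for the output binding. A stream, like any file-like object, is exhausted after the first read(), so calling myblob.read() a second time (as the question's code does) would write empty content. A minimal illustration, with io.BytesIO standing in for the blob's InputStream:

```python
import io

# BytesIO stands in for the blob's InputStream here.
stream = io.BytesIO(b'{"id": 1}')

first = stream.read()   # b'{"id": 1}' - the full payload
second = stream.read()  # b'' - the stream is already consumed

print(first)
print(second)
```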

Output:

If successful:

If rejected:

This is the code and process that worked for me. Try changing your function.json (I observed 4 bindings; change it to 3) and your init file (why are you using inputBlob? In my view, remove it). Change your code accordingly and you will get the desired output.
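In addition to removing the extra binding, a defensive guard inside the function (my own suggestion, not part of the answer above) can skip blobs whose names do not end in .json, which would cover the directory-like names 2023/07 and 2023/07/08 seen in the log:

```python
def should_process(blob_name):
    # Skip directory-like blob names such as "input/landing/2023/07";
    # only names whose final path segment ends in ".json" are handled.
    last_segment = blob_name.rsplit("/", 1)[-1]
    return last_segment.endswith(".json")

print(should_process("input/landing/2023/07"))                     # False
print(should_process("input/landing/2023/07/08/invalidJSON.json")) # True
```

The guard would be called at the top of main, returning early when it is False.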

Regarding "python - Azure blob trigger Python function executes multiple times for each subfolder and creates multiple copies of the file", we found a similar question on Stack Overflow: https://stackoverflow.com/questions/76643565/
