gpt4 book ai didi

azure - 从 Durable Function 运行未知数量的事件函数

转载 作者:行者123 更新时间:2023-12-03 06:10:01 26 4
gpt4 key购买 nike

我正在使用 .NET Core 3.1

我有一个需要分成几部分的文件列表

我想为每个部分运行一个事件函数

例如,如果我有 100 个文件名,并且我想将它们分成 10 个部分,那么我最终会得到 10 个部分

我想针对每个批处理执行我的事件函数

由于批处理数量会有所不同,我必须在运行时添加它们

每个部分完全独立,即彼此没有影响

如何使用 Durable Functions 来并行运行每个部分

我当前的代码如下

当针对一个简单的列表运行时,它很好,但这里有更多的部分

这是因为我正在分析一个包含很多文件的文件夹

using System;
using System.Collections.Generic;
using System.IO;
using System.Linq;
using System.Net.Http;
using System.Threading.Tasks;
using Microsoft.Azure.WebJobs;
using Microsoft.Azure.WebJobs.Extensions.DurableTask;
using Microsoft.Azure.WebJobs.Extensions.Http;
using Microsoft.Extensions.Logging;

namespace Functions
{
public static class FunctionTestParallel
{
[FunctionName("FunctionTestParallel")]
public static async Task<List<string>> RunOrchestrator(
[OrchestrationTrigger] IDurableOrchestrationContext context)
{
string directoryPath = @"C:\temp\All Data 3 - Copy";
string searchPattern = "*.*";

using var client = new HttpClient();
string[] files = Directory.GetFiles(directoryPath, searchPattern, SearchOption.AllDirectories);

var hashSet = new HashSet<string>();
var uniqueFiles = new List<string>();

foreach (var file in files)
{
var json = await File.ReadAllTextAsync(file);
if (hashSet.Contains(json) == false)
{
uniqueFiles.Add(file);
hashSet.Add(json);
}
}

var batchSize = 10;

var tranches = BatchList(uniqueFiles, batchSize);

var tasks = new List<Task<string>>();

var enumerable = tranches.ToList();
for (var index = 0; index < enumerable.Count(); index++)
{
var tranche = enumerable.ElementAt(index).ToList();
var dto = new TrancheDataDto
{
Number = index + 1,
Filenames = tranche
};

tasks.Add(context.CallActivityAsync<string>("SayHello", dto));
}

await Task.WhenAll(tasks);

var outputs = new List<string>();
foreach (var task in tasks)
{
outputs.Add(task.Result);
}
return outputs;
}

[FunctionName("SayHello")]
public static async Task<string> SayHello([ActivityTrigger] TrancheDataDto dto, ILogger log)
{
log.LogInformation($"Processing Tranche {dto.Number} with {dto.Filenames.Count} files");

foreach (var filename in dto.Filenames)
{

log.LogInformation($"Processing file: {filename}");

await Task.Delay(100);
}

return $"Processed Tranche {dto.Number}";
}

private static IEnumerable<IEnumerable<T>> BatchList<T>(List<T> source, int batchSize)
{
for (int i = 0; i < source.Count; i += batchSize)
{
yield return source.Skip(i).Take(batchSize);
}
}


[FunctionName("Function1_HttpStart")]
public static async Task<HttpResponseMessage> HttpStart(
[HttpTrigger(AuthorizationLevel.Anonymous, "get", "post")]
HttpRequestMessage req,
[DurableClient] IDurableOrchestrationClient starter,
ILogger log)
{
// Function input comes from the request content.
string instanceId = await starter.StartNewAsync("FunctionTestParallel", null);

log.LogInformation($"Started orchestration with ID = '{instanceId}'.");

return starter.CreateCheckStatusResponse(req, instanceId);
}
}

public class TrancheDataDto
{
public int Number { get; set; }
public List<string> Filenames { get; set; }
}
}

这是错误(与我一直遇到的错误相同)

enter image description here保罗

最佳答案

持久函数是处理长时间运行和可并行工作流程的好方法。但是,您的代码需要进行一些调整,以确保每个部分都是并行且独立处理的。

首先,让我们确保您的事件函数异步运行并返回一个值。以下是如何修改代码以实现每个部分的并行处理:

using System.Collections.Generic;
using System.Linq;
using System.Net.Http;
using System.Threading.Tasks;
using Microsoft.Azure.WebJobs;
using Microsoft.Azure.WebJobs.Extensions.DurableTask;
using Microsoft.Azure.WebJobs.Extensions.Http;
using Microsoft.Azure.WebJobs.Host;
using Microsoft.Extensions.Logging;

namespace _2._0
{
public class TrancheDataDto
{
public int Number { get; set; }
public List<string>? Filenames { get; set; }
}
public static class Function1
{
[FunctionName("Function1")]
public static async Task<List<string>> RunOrchestrator(
[OrchestrationTrigger] IDurableOrchestrationContext context)
{
var uniqueFilenames = new List<string>
{
"myfile.txt", "yourfile.txt", "therefile.txt", /* ... */ "file100.txt"
};
var batchSize = 10;

var tranches = BatchList(uniqueFilenames, batchSize);

var tasks = new List<Task<string>>();

for (var index = 0; index < tranches.Count(); index++)
{
var tranche = tranches.ElementAt(index).ToList();
var dto = new TrancheDataDto
{
Number = index + 1,
Filenames = tranche
};

tasks.Add(context.CallActivityAsync<string>("SayHello", dto));
}

await Task.WhenAll(tasks);

var outputs = new List<string>();
foreach (var task in tasks)
{
outputs.Add(task.Result);
}
return outputs;
}

[FunctionName(nameof(SayHello))]
public static async Task<string> SayHello([ActivityTrigger] TrancheDataDto dto, ILogger log)
{
log.LogInformation($"Processing Tranche {dto.Number} with {dto.Filenames.Count} files");

foreach (var filename in dto.Filenames)
{

log.LogInformation($"Processing file: {filename}");

await Task.Delay(100);
}

return $"Processed Tranche {dto.Number}";
}
private static IEnumerable<IEnumerable<T>> BatchList<T>(List<T> source, int batchSize)
{
for (int i = 0; i < source.Count; i += batchSize)
{
yield return source.Skip(i).Take(batchSize);
}
}
}
}

结果 enter image description here

关于azure - 从 Durable Function 运行未知数量的事件函数,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/76916711/

26 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com