- c - 在位数组中找到第一个零
- linux - Unix 显示有关匹配两种模式之一的文件的信息
- 正则表达式替换多个文件
- linux - 隐藏来自 xtrace 的命令
这里是重现的步骤。下面的程序使用 .Net Core 控制台应用程序和 EF Core 将 10,000 行从一个 SQL 表复制到另一个。该程序分 100 批插入记录,并且(这很重要!)它为每次插入创建一个新的 DbContext 实例。
1) 创建 SQL Server 数据库,以及“Froms”和“Tos”表:
create table Froms (
Id int identity(1, 1) not null,
Guid [uniqueidentifier] not null,
constraint [PK_Froms] primary key clustered (Id asc)
)
go
create table Tos (
Id int not null,
Guid [uniqueidentifier] not null,
constraint [PK_Tos] primary key clustered (Id asc)
)
go
2) 填充“发件人”表:
set nocount on
declare @i int = 0
while @i < 10000
begin
insert Froms (Guid)
values (newid())
set @i += 1
end
go
3) 创建名为 TestForEachAsync
的 .Net Core 控制台应用程序项目。将 C# 版本更改为 7.1 或更高版本(async Main
需要)。添加 Microsoft.EntityFrameworkCore.SqlServer
nuget 包。
4) 创建类:
数据库实体
using System;
namespace TestForEachAsync
{
public class From
{
public int Id { get; set; }
public Guid Guid { get; set; }
}
}
using System;
namespace TestForEachAsync
{
public class To
{
public int Id { get; set; }
public Guid Guid { get; set; }
}
}
DbContext
using Microsoft.EntityFrameworkCore;
namespace TestForEachAsync
{
public class Context : DbContext
{
public DbSet<From> Froms { get; set; }
public DbSet<To> Tos { get; set; }
protected override void OnConfiguring(DbContextOptionsBuilder optionsBuilder)
{
optionsBuilder.UseSqlServer("YOUR_CONNECTION_STRING");
}
}
}
主要
using System;
using System.Linq;
using System.Threading.Tasks;
using Microsoft.EntityFrameworkCore;
namespace TestForEachAsync
{
internal class Program
{
private static async Task Main(string[] args)
{
//Get the "froms"
var selectContext = new Context();
var froms = selectContext.Froms.Select(f => new { f.Id, f.Guid });
int count = 0;
Task<int> saveChangesTask = null;
Context insertContext = new Context();
Context prevInsertContext = null;
//Iterate through "froms"
await froms.ForEachAsync(
async f =>
{
//Add instace of "to" to the context
var to = new To { Id = f.Id, Guid = f.Guid };
await insertContext.Tos.AddAsync(to);
count++;
//If another 100 of "to"s has been added to the context...
if (count % 100 == 0)
{
//Wait for the previous 100 "to"s to finish saving to the database
if (saveChangesTask != null)
{
await saveChangesTask;
}
//Start saving the next 100 "to"s
saveChangesTask = insertContext.SaveChangesAsync();
//Dispose of the context that was used to save previous 100 "to"s
prevInsertContext?.Dispose();
//Reassign the context used to save the current 100 "to"s to a "prev" variable,
//and set context variable to the new Context instance.
prevInsertContext = insertContext;
insertContext = new Context();
}
}
);
//Wait for second last 100 "to"s to finish saving to the database
if (saveChangesTask != null)
{
await saveChangesTask;
}
//Save the last 100 "to"s to the database
await insertContext.SaveChangesAsync();
insertContext.Dispose();
Console.WriteLine("Done");
Console.ReadKey();
}
}
}
5) 运行应用程序 - 您会收到异常 The connection does not support MultipleActiveResultSets
。看起来正在对 insertContext
启动多个操作,但我不明白为什么。
6) 我找到了两种解决问题的方法:
await froms.ForEachAsync(...)
循环替换为“正常”循环 foreach (var f in froms) {...}
,或<await saveChangesTask;
替换为 saveChangesTask.Wait();
但是有人能解释一下为什么原始代码不能按我的预期工作吗?
注意:如果您多次运行该应用,请不要忘记在每次运行前截断“Tos”表。
最佳答案
您陷入了将异步 lambda 传递给期望返回 void 的委托(delegate)的方法的典型陷阱(在这种特殊情况下为 Action<T>
),如 Stephen Toub 所述在 Potential pitfalls to avoid when passing around async lambdas .这实际上相当于使用 async void
有它的陷阱,因为你的异步代码根本不是 await
-ed,从而打破了它的内部逻辑。
像往常一样,解决方案是一个特殊的重载,它接受 Func<T, Task>
而不是 Action<T>
.可能它应该由 EF Core 提供(您可以考虑为此发布一个请求),但现在您可以使用类似这样的东西自己实现它:
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;
using Microsoft.EntityFrameworkCore.Extensions.Internal;
namespace Microsoft.EntityFrameworkCore
{
public static class AsyncExtensions
{
public static Task ForEachAsync<T>(this IQueryable<T> source, Func<T, Task> action, CancellationToken cancellationToken = default) =>
source.AsAsyncEnumerable().ForEachAsync(action, cancellationToken);
public static async Task ForEachAsync<T>(this IAsyncEnumerable<T> source, Func<T, Task> action, CancellationToken cancellationToken = default)
{
using (var asyncEnumerator = source.GetEnumerator())
while (await asyncEnumerator.MoveNext(cancellationToken))
await action(asyncEnumerator.Current);
}
}
}
这基本上是 EF Core implementation添加了 await
的 action
.
完成后,您的代码将解析为该方法,一切都应按预期工作。
关于c# - Microsoft.EntityFrameworkCore.EntityFrameworkQueryableExtensions.ForEachAsync<T>() 的意外行为,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/50920636/
我预计此代码需要 1 秒才能执行: public async void Test() { DateTime start = DateTime.Now; await Parallel.F
如果我在下面的代码中使用 ForEachAsync,我应该使用列表还是并发集合? var images = new List(); // or a concurrency collection? aw
下面是 ForEachAsync 的实现 written by Stephen Toub . public static Task ForEachAsync(this IEnumerable sour
在 .NET 5 中,我们有 Parallel.ForEach,您可以使用 ParallelLoopState.Break() 方法来停止处理额外的迭代。允许当前的完成处理。 但是新的 .NET 6
我必须对项目列表运行一个函数。我正在使用 Azure Durable Functions,并且可以使用它们的 fan out/fan in 并行运行这些项目。策略。 但是,我想知道这样做与在单个 Ac
我必须对项目列表运行一个函数。我正在使用 Azure Durable Functions,并且可以使用它们的 fan out/fan in 并行运行这些项目。策略。 但是,我想知道这样做与在单个 Ac
我有代码可以从 SQL 流式传输数据并将其写入不同的存储。代码大概是这样的: using (var cmd = new SqlCommand("select * from MyTable", conn
我最近发现下面的代码可以有效地运行大量 I/O 绑定(bind)任务: Implementing a simple ForEachAsync, part 2 我的印象是以下内容是真实的: 这比使用 P
这里是重现的步骤。下面的程序使用 .Net Core 控制台应用程序和 EF Core 将 10,000 行从一个 SQL 表复制到另一个。该程序分 100 批插入记录,并且(这很重要!)它为每次插入
我正在试验如何跳出 ForEachAsync 循环。 break 不起作用,但我可以在 CancellationTokenSource 上调用 Cancel。 ForEachAsync 的签名有两个标
下面应该返回“C”,但它返回“B” using System.Data.Entity; //... var state = "A"; var qry = (from f in db.myTable s
在 .NET 6 项目中,我必须调用偏移分页(页面/每页)的 Web API,并且我希望尽可能使 n 个调用并行。 这是使用给定页码一次调用 API 的方法: private Task CallApi
我是一名优秀的程序员,十分优秀!