gpt4 book ai didi

c# - 使用 "ConcurrentDictionary.GetOrAdd()"时避免陈旧(逻辑上损坏)数据,包括重现代码

转载 作者:太空狗 更新时间:2023-10-29 22:27:41 26 4
gpt4 key购买 nike

The bottom of this article描述了使用 GetOrAdd 如何导致(如果我理解正确的话)损坏/意外结果。

剪断/

ConcurrentDictionary is designed for multithreaded scenarios. You do not have to use locks in your code to add or remove items from the collection. However, it is always possible for one thread to retrieve a value, and another thread to immediately update the collection by giving the same key a new value.

Also, although all methods of ConcurrentDictionary are thread-safe, not all methods are atomic, specifically GetOrAdd and AddOrUpdate. The user delegate that is passed to these methods is invoked outside of the dictionary's internal lock. (This is done to prevent unknown code from blocking all threads.) Therefore it is possible for this sequence of events to occur:

1) threadA calls GetOrAdd, finds no item and creates a new item to Add by invoking the valueFactory delegate.

2) threadB calls GetOrAdd concurrently, its valueFactory delegate is invoked and it arrives at the internal lock before threadA, and so its new key-value pair is added to the dictionary.

3) threadA's user delegate completes, and the thread arrives at the lock, but now sees that the item exists already

4) threadA performs a "Get", and returns the data that was previously added by threadB.

Therefore, it is not guaranteed that the data that is returned by GetOrAdd is the same data that was created by the thread's valueFactory. A similar sequence of events can occur when AddOrUpdate is called.

问题

验证数据并重试更新的正确方法是什么?一个不错的方法是基于旧值的内容尝试/重试此操作的扩展方法。

这将如何实现?我可以依赖结果 (verify) 作为有效的结束状态,还是必须重试并使用不同的方法重新检索值?

代码

以下代码在更新值时存在竞争条件。所需的行为是 AddOrUpdateWithoutRetrieving() 将以不同的方式递增各种值(使用 ++Interlocked.Increment())。

我还想在单个单元中执行多个字段操作,如果之前的更新由于竞争条件而没有“执行”,则重试更新。

运行代码,您会看到控制台中显示的每个值开始增加 1,但每个值都会漂移,有些会提前/落后几次迭代。

namespace DictionaryHowTo
{
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading;
using System.Threading.Tasks;

// The type of the Value to store in the dictionary:
class FilterConcurrentDuplicate
{
// Create a new concurrent dictionary.
readonly ConcurrentDictionary<int, TestData> eventLogCache =
new ConcurrentDictionary<int, TestData>();

static void Main()
{
FilterConcurrentDuplicate c = new FilterConcurrentDuplicate();

c.DoRace(null);
}

readonly ConcurrentDictionary<int, TestData> concurrentCache =
new ConcurrentDictionary<int, TestData>();
void DoRace(string[] args)
{
int max = 1000;

// Add some key/value pairs from multiple threads.
Task[] tasks = new Task[3];

tasks[0] = Task.Factory.StartNew(() =>
{

System.Random RandNum = new System.Random();
int MyRandomNumber = RandNum.Next(1, 500);

Thread.Sleep(MyRandomNumber);
AddOrUpdateWithoutRetrieving();

});

tasks[1] = Task.Factory.StartNew(() =>
{
System.Random RandNum = new System.Random();
int MyRandomNumber = RandNum.Next(1, 1000);

Thread.Sleep(MyRandomNumber);

AddOrUpdateWithoutRetrieving();

});

tasks[2] = Task.Factory.StartNew(() =>
{
AddOrUpdateWithoutRetrieving();

});
// Output results so far.
Task.WaitAll(tasks);

AddOrUpdateWithoutRetrieving();

Console.WriteLine("Press any key.");
Console.ReadKey();
}
public class TestData : IEqualityComparer<TestData>
{
public string aStr1 { get; set; }
public Guid? aGud1 { get; set; }
public string aStr2 { get; set; }
public int aInt1 { get; set; }
public long? aLong1 { get; set; }

public DateTime aDate1 { get; set; }
public DateTime? aDate2 { get; set; }

//public int QueryCount { get; set; }
public int QueryCount = 0;//

public string zData { get; set; }
public bool Equals(TestData x, TestData y)
{
return x.aStr1 == y.aStr1 &&
x.aStr2 == y.aStr2 &&
x.aGud1 == y.aGud1 &&
x.aStr2 == y.aStr2 &&
x.aInt1 == y.aInt1 &&
x.aLong1 == y.aLong1 &&
x.aDate1 == y.aDate1 &&
x.QueryCount == y.QueryCount ;
}

public int GetHashCode(TestData obj)
{
TestData ci = (TestData)obj;
// http://stackoverflow.com/a/263416/328397
return
new {
A = ci.aStr1,
Aa = ci.aStr2,
B = ci.aGud1,
C = ci.aStr2,
D = ci.aInt1,
E = ci.aLong1,
F = ci.QueryCount ,
G = ci.aDate1}.GetHashCode();
}
}
private void AddOrUpdateWithoutRetrieving()
{
// Sometime later. We receive new data from some source.
TestData ci = new TestData()
{
aStr1 = "Austin",
aGud1 = new Guid(),
aStr2 = "System",
aLong1 = 100,
aInt1 = 1000,
QueryCount = 0,
aDate1 = DateTime.MinValue
};

TestData verify = concurrentCache.AddOrUpdate(123, ci,
(key, existingVal) =>
{
existingVal.aStr2 = "test1" + existingVal.QueryCount;
existingVal.aDate1 = DateTime.MinValue;
Console.WriteLine
("Thread:" + Thread.CurrentThread.ManagedThreadId +
" Query Count A:" + existingVal.QueryCount);
Interlocked.Increment(ref existingVal.QueryCount);
System.Random RandNum = new System.Random();
int MyRandomNumber = RandNum.Next(1, 1000);

Thread.Sleep(MyRandomNumber);
existingVal.aInt1++;
existingVal.aDate1 =
existingVal.aDate1.AddSeconds
(existingVal.aInt1);
Console.WriteLine(
"Thread:" + Thread.CurrentThread.ManagedThreadId +
" Query Count B:" + existingVal.QueryCount);
return existingVal;
});


// After each run, every value here should be ++ the previous value
Console.WriteLine(
"Thread:"+Thread.CurrentThread.ManagedThreadId +
": Query Count returned:" + verify.QueryCount +
" eid:" + verify.aInt1 + " date:" +
verify.aDate1.Hour + " " + verify.aDate1.Second +
" NAME:" + verify.aStr2
);
}

}
}

输出

Thread:12: Query Count returned:0 eid:1000 date:0 0 NAME:System

Thread:12 Query Count A:0
Thread:13 Query Count A:1
Thread:12 Query Count B:2
Thread:12: Query Count returned:2 eid:1001 date:0 41 NAME:test11

Thread:12 Query Count A:2
Thread:13 Query Count B:3
Thread:13: Query Count returned:3 eid:1002 date:0 42 NAME:test12

Thread:13 Query Count A:3
Thread:11 Query Count A:4
Thread:11 Query Count B:5
Thread:11: Query Count returned:5 eid:1003 date:0 43 NAME:test14

Thread:11 Query Count A:5
Thread:13 Query Count B:6
Thread:13: Query Count returned:6 eid:1004 date:0 44 NAME:test15

....

Thread:11  Query Count A:658
Thread:11 Query Count B:659
Thread:11: Query Count returned:659 eid:1656 date:0 36 NAME:test1658

Thread:11 Query Count A:659
Thread:11 Query Count B:660
Thread:11: Query Count returned:660 eid:1657 date:0 37 NAME:test1659

Thread:11 Query Count A:660
Thread:11 Query Count B:661
Thread:11: Query Count returned:661 eid:1658 date:0 38 NAME:test1660

Thread:11 Query Count A:661
Thread:11 Query Count B:662
Thread:11: Query Count returned:662 eid:1659 date:0 39 NAME:test1661

在此代码中,“eid”应始终比查询计数多 1,000,但在迭代过程中,两者之间的差异从 1 到 7 不等。这种不一致可能会导致某些应用程序失败或报告不正确的数据。

最佳答案

本次提交是基于对“How to: Add and Remove Items from a ConcurrentDictionary”一文底部备注的错误理解 http://msdn.microsoft.com/en-us/library/dd997369.aspx以及一个基本的并发错误——共享对象的并发非原子修改。

首先,让我们澄清链接文章的真正含义。我将使用 AddOrUpdate 作为示例,但 GetOrAdd 的推理是等效的。

例如,您从多个线程调用 AddOrUpdate 并指定相同的键。假设具有该键的条目已经存在。每个线程都会出现,请注意已经有一个具有指定键的条目,并且 AddOrUpdate 的更新部分是相关的。这样做时,没有线程会锁定字典。相反,它会使用一些互锁指令来自动检查入口 key 是否存在。

因此,我们的几个线程都注意到 key 存在并且需要调用 updateValueFactory。该委托(delegate)被传递给 AddOrUpdate;它引用现有的键和值并返回更新值。现在,所有涉及的线程都将同时调用工厂。它们都将以一些以前未知的顺序完成,并且每个线程都将尝试使用原子操作(使用互锁指令)将现有值替换为它刚刚计算的值。没有办法知道哪个线程会“获胜”。获胜的线程将存储其计算值。其他人会注意到字典中的值不再是作为参数传递给他们的 updateValueFactory 的值。为了响应这种认识,他们将放弃操作并丢弃刚刚计算的值。这正是您想要发生的事情。

接下来,让我们澄清一下为什么在运行此处列出的代码示例时会得到奇怪的值:

回想一下,传递给 AddOrUpdate 的 updateValueFactory 委托(delegate)采用现有键和值的 REFERENCES 并返回更新值。AddOrUpdateWithoutRetrieving() 方法中的代码示例开始直接对该引用执行操作。它不是创建一个新的替换值并修改它,而是修改 existingVal 的实例成员值——一个已经在字典中的对象——然后简单地返回该引用。它不是以原子方式执行的——它读取一些值,更新一些值,读取更多,更新更多。当然,我们在上面已经看到,这在多个线程上同时发生——它们都修改同一个对象。难怪结果是在任何时候(当代码示例调用 WriteLine 时),该对象包含来自不同线程的成员实例值。

字典与此无关——代码只是简单地修改一个在线程之间非原子共享的对象。这是最常见的并发错误之一。两种最常见的解决方法取决于具体情况。要么使用共享锁使整个对象修改原子化,要么先原子地复制整个对象,然后修改本地副本。

对于后者,尝试将其添加到 TestData 类中:

private Object _copyLock = null;

private Object GetLock() {

if (_copyLock != null)
return _copyLock;

Object newLock = new Object();
Object prevLock = Interlocked.CompareExchange(ref _copyLock, newLock, null);
return (prevLock == null) ? newLock : prevLock;
}

public TestData Copy() {

lock (GetLock()) {
TestData copy = new TestData();
copy.aStr1 = this.aStr1;
copy.aStr2 = this.aStr2;
copy.aLong1 = this.aLong1;
copy.aInt1 = this.aInt1;
copy.QueryCount = this.QueryCount;
copy.aDate1 = this.aDate1;
copy.aDate2 = this.aDate2;
copy.zData = this.zData;

return copy;
}
}

然后修改工厂如下:

TestData verify = concurrentCache.AddOrUpdate(123, ci,
(key, existingVal) =>
{
TestData newVal = existingVal.Copy();
newVal.aStr2 = "test1" + newVal.QueryCount;
newVal.aDate1 = DateTime.MinValue;
Console.WriteLine("Thread:" + Thread.CurrentThread.ManagedThreadId + " Query Count A:" + newVal.QueryCount);
Interlocked.Increment(ref newVal.QueryCount);
System.Random RandNum = new System.Random();
int MyRandomNumber = RandNum.Next(1, 1000);

Thread.Sleep(MyRandomNumber);
newVal.aInt1++;
newVal.aDate1 = newVal.aDate1.AddSeconds(newVal.aInt1);
Console.WriteLine("Thread:" + Thread.CurrentThread.ManagedThreadId + " Query Count B:" + newVal.QueryCount);
return newVal;
});

希望对您有所帮助。

关于c# - 使用 "ConcurrentDictionary.GetOrAdd()"时避免陈旧(逻辑上损坏)数据,包括重现代码,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/10666284/

26 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com