我的工作假设是LINQ在与System.Collections.Concurrent集合(包括ConcurrentDictionary)一起使用时是线程安全的。
(其他溢出帖子似乎也同意:link)
但是,对LINQOrderBy扩展方法实现的检查表明,对于实现ICollection的并发集合子集(例如,ConcurrentDictionary),它似乎不是线程安全的。
- OrderedEnumerable****GetEnumerator**(此处为源代码)构造Buffer结构(此处为源代码)的示例,该示例尝试将集合强制转换为ICollection(ConcurrentDictionary实现),然后使用初始化为集合大小的数组执行collection.CopyTo。
因此,如果ConcurrentDictionary(在本例中为具体的ICollection)在OrderBy操作期间(初始化数组和复制到数组之间)大小增加,则此操作将引发。
以下测试代码显示了此异常:
(Note:我知道在线程安全的集合上执行OrderBy并没有什么意义,但我认为它不应该引发)
using System;
using System.Collections.Concurrent;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;
namespace Program
{
class Program
{
static void Main(string[] args)
{
try
{
int loop = 0;
while (true) //Run many loops until exception thrown
{
Console.WriteLine($"Loop: {++loop}");
_DoConcurrentDictionaryWork().Wait();
}
}
catch (Exception ex)
{
Console.WriteLine(ex);
}
}
private static async Task _DoConcurrentDictionaryWork()
{
var concurrentDictionary = new ConcurrentDictionary<int, object>();
var keyGenerator = new Random();
var tokenSource = new CancellationTokenSource();
var orderByTaskLoop = Task.Run(() =>
{
var token = tokenSource.Token;
while (token.IsCancellationRequested == false)
{
//Keep ordering concurrent dictionary on a loop
var orderedPairs = concurrentDictionary.OrderBy(x => x.Key).ToArray(); //THROWS EXCEPTION HERE
//...do some more work with ordered snapshot...
}
});
var updateDictTaskLoop = Task.Run(() =>
{
var token = tokenSource.Token;
while (token.IsCancellationRequested == false)
{
//keep mutating dictionary on a loop
var key = keyGenerator.Next(0, 1000);
concurrentDictionary[key] = new object();
}
});
//Wait for 1 second
await Task.Delay(TimeSpan.FromSeconds(1));
//Cancel and dispose token
tokenSource.Cancel();
tokenSource.Dispose();
//Wait for orderBy and update loops to finish (now token cancelled)
await Task.WhenAll(orderByTaskLoop, updateDictTaskLoop);
}
}
}
- OrderBy**抛出异常会导致以下几个可能的结论之一:
1)我关于LINQ对于并发集合是线程安全的假设是不正确的,只有在LINQ查询过程中不发生变化的集合(无论是否并发)上执行LINQ才是安全的
2)LINQOrderBy的实现存在缺陷,该实现尝试将源集合强制转换为ICollection并尝试执行集合复制是不正确的(它应直接进入其迭代IEnumerable的默认行为)。
3)我误解了这里发生的事...
非常感谢!
1条答案
按热度按时间shyt4zoc1#
没有任何地方声明
OrderBy
(或其他LINQ方法)应该始终使用源IEnumerable
的GetEnumerator
,或者它应该在并发收集上是线程安全的。根据键以升序对序列的元素进行排序。
ConcurrentDictionary
在某种全局意义上也不是线程安全的,它对于在其上执行的其他操作是线程安全的。ConcurrentDictionary的所有公共和受保护成员都是线程安全的,可以从多个线程并发使用。**但是,通过ConcurrentDictionary实现的接口之一访问的成员,包括扩展*方法,不能保证是线程安全的,可能需要由调用方同步。
所以你的理解是正确的(
OrderBy
将看到您传递给它的IEnumerable
实际上是ICollection
,然后将获取该集合的长度,分配该大小的缓冲区,然后将调用ICollection.CopyTo
,当然这在任何类型的集合上都不是线程安全的),但这不是OrderBy
中的bug,因为OrderBy
和ConcurrentDictionary
都没有承诺您所假设的内容。如果你想在
ConcurrentDictionary
上以线程安全的方式执行OrderBy
,你需要依赖那些被承诺为线程安全的方法。如果不想分配额外的数组(与
ToArray
),您可以使用Select(c => c)
,它将在这种情况下工作,但后来我们又一次在模拟领域,并依赖于一些东西是安全的使用情况下,它没有承诺(Select
也不会总是枚举你的集合。如果集合是数组或列表-它会快捷方式并使用索引器代替)。所以你可以这样创建扩展方法:如果你想安全并且不想分配数组,可以这样使用它:
在本例中,我们强制枚举
ConcurrentDictionary
(我们从文档中知道这是安全的),然后将其传递给OrderBy
,知道它不会对那个纯IEnumerable
造成任何伤害。注意,正如mjwills在注解中正确指出的那样,这与ToArray
并不完全相同,因为ToArray
生成快照(锁集合防止生成数组时进行修改),并且Select
`yield不获取任何锁(因此,当枚举正在进行时,可能会添加/删除项目)。虽然我怀疑在执行问题中描述的操作时是否重要-在两种情况下都是在
OrderBy`完成后-您不知道排序的结果是否反映了当前的收集状态。