.net C# LINQ OrderBy在与ConcurrentDictionary〈Tkey,TValue>一起使用时是线程安全的吗?

6g8kf2rb  于 2023-02-06  发布在  .NET
关注(0)|答案(1)|浏览(215)

我的工作假设是LINQ在与System.Collections.Concurrent集合(包括ConcurrentDictionary)一起使用时是线程安全的。
(其他溢出帖子似乎也同意:link
但是,对LINQOrderBy扩展方法实现的检查表明,对于实现ICollection的并发集合子集(例如,ConcurrentDictionary),它似乎不是线程安全的。

    • OrderedEnumerable****GetEnumerator**(此处为源代码)构造Buffer结构(此处为源代码)的示例,该示例尝试将集合强制转换为ICollectionConcurrentDictionary实现),然后使用初始化为集合大小的数组执行collection.CopyTo。

因此,如果ConcurrentDictionary(在本例中为具体的ICollection)在OrderBy操作期间(初始化数组和复制到数组之间)大小增加,则此操作将引发。
以下测试代码显示了此异常:
(Note:我知道在线程安全的集合上执行OrderBy并没有什么意义,但我认为它不应该引发)

using System;
using System.Collections.Concurrent;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;

namespace Program
{
    class Program
    {
        static void Main(string[] args)
        {
            try
            {
                int loop = 0;
                while (true) //Run many loops until exception thrown
                {
                    Console.WriteLine($"Loop: {++loop}");

                    _DoConcurrentDictionaryWork().Wait();
                }
            }
            catch (Exception ex)
            {
                Console.WriteLine(ex);
            }
        }

        private static async Task _DoConcurrentDictionaryWork()
        {
            var concurrentDictionary = new ConcurrentDictionary<int, object>();
            var keyGenerator = new Random();
            var tokenSource = new CancellationTokenSource();

            var orderByTaskLoop = Task.Run(() =>
            {
                var token = tokenSource.Token;
                while (token.IsCancellationRequested == false)
                {
                    //Keep ordering concurrent dictionary on a loop
                    var orderedPairs = concurrentDictionary.OrderBy(x => x.Key).ToArray(); //THROWS EXCEPTION HERE

                    //...do some more work with ordered snapshot...
                }
            });

            var updateDictTaskLoop = Task.Run(() =>
            {
                var token = tokenSource.Token;
                while (token.IsCancellationRequested == false)
                {
                    //keep mutating dictionary on a loop
                    var key = keyGenerator.Next(0, 1000);
                    concurrentDictionary[key] = new object();
                }
            });

            //Wait for 1 second
            await Task.Delay(TimeSpan.FromSeconds(1));

            //Cancel and dispose token
            tokenSource.Cancel();
            tokenSource.Dispose();

            //Wait for orderBy and update loops to finish (now token cancelled)
            await Task.WhenAll(orderByTaskLoop, updateDictTaskLoop);
        }
    }
}
    • OrderBy**抛出异常会导致以下几个可能的结论之一:

1)我关于LINQ对于并发集合是线程安全的假设是不正确的,只有在LINQ查询过程中不发生变化的集合(无论是否并发)上执行LINQ才是安全的
2)LINQOrderBy的实现存在缺陷,该实现尝试将源集合强制转换为ICollection并尝试执行集合复制是不正确的(它应直接进入其迭代IEnumerable的默认行为)。
3)我误解了这里发生的事...
非常感谢!

shyt4zoc

shyt4zoc1#

没有任何地方声明OrderBy(或其他LINQ方法)应该始终使用源IEnumerableGetEnumerator,或者它应该在并发收集上是线程安全的。
根据键以升序对序列的元素进行排序。
ConcurrentDictionary在某种全局意义上也不是线程安全的,它对于在其上执行的其他操作是线程安全的。
ConcurrentDictionary的所有公共和受保护成员都是线程安全的,可以从多个线程并发使用。**但是,通过ConcurrentDictionary实现的接口之一访问的成员,包括扩展*方法,不能保证是线程安全的,可能需要由调用方同步。
所以你的理解是正确的(OrderBy将看到您传递给它的IEnumerable实际上是ICollection,然后将获取该集合的长度,分配该大小的缓冲区,然后将调用ICollection.CopyTo,当然这在任何类型的集合上都不是线程安全的),但这不是OrderBy中的bug,因为OrderByConcurrentDictionary都没有承诺您所假设的内容。
如果你想在ConcurrentDictionary上以线程安全的方式执行OrderBy,你需要依赖那些被承诺为线程安全的方法。

// note: this is NOT IEnumerable.ToArray()
// but public ToArray() method of ConcurrentDictionary itself
// it is guaranteed to be thread safe with respect to other operations
// on this dictionary
var snapshot = concurrentDictionary.ToArray();
// we are working on snapshot so no one other thread can modify it
// of course at this point real contents of dictionary might not be
// the same as our snapshot
var sorted = snapshot.OrderBy(c => c.Key);

如果不想分配额外的数组(与ToArray),您可以使用Select(c => c),它将在这种情况下工作,但后来我们又一次在模拟领域,并依赖于一些东西是安全的使用情况下,它没有承诺(Select也不会总是枚举你的集合。如果集合是数组或列表-它会快捷方式并使用索引器代替)。所以你可以这样创建扩展方法:

public static class Extensions {
    public static IEnumerable<T> ForceEnumerate<T>(this ICollection<T> collection) {
        foreach (var item in collection)
            yield return item;
    }
}

如果你想安全并且不想分配数组,可以这样使用它:

concurrentDictionary.ForceEnumerate().OrderBy(c => c.Key).ToArray();

在本例中,我们强制枚举ConcurrentDictionary(我们从文档中知道这是安全的),然后将其传递给OrderBy,知道它不会对那个纯IEnumerable造成任何伤害。注意,正如mjwills在注解中正确指出的那样,这与ToArray并不完全相同,因为ToArray生成快照(锁集合防止生成数组时进行修改),并且Select`yield不获取任何锁(因此,当枚举正在进行时,可能会添加/删除项目)。虽然我怀疑在执行问题中描述的操作时是否重要-在两种情况下都是在OrderBy`完成后-您不知道排序的结果是否反映了当前的收集状态。

相关问题