How to investigate a LINQ AsParallel() limitation that throws "An item with the same key has already been added"

Posted by ht4b089n on 2023-11-14 in Other

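I'm trying to find a way to investigate a LINQ error that occurs when running AsParallel(). Because of the nature of the code I can't share all of it, but I'm running something like this: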

```csharp
ParallelQuery<MyResults> myObjectResults = null;
myObjectResults = myObjects.AsParallel().AsOrdered().Select(myObject => Foo(myObject));
```

This is the error I get:

```
System.ArgumentException: An item with the same key has already been added.
   at Foo(Object MyObject)
   at System.Linq.Parallel.PartitionedDataSource`1.ListContiguousIndexRangeEnumerator.MoveNext(T& currentElement, Int32& currentKey)
   at System.Linq.Parallel.OrderPreservingPipeliningSpoolingTask`2.SpoolingWork()
   at System.Linq.Parallel.SpoolingTaskBase.Work()
   at System.Linq.Parallel.QueryTask.BaseWork(Object unused)
   at System.Threading.Tasks.Task.Execute()
   --- End of inner exception stack trace ---
   at System.Linq.Parallel.QueryTaskGroupState.QueryEnd(Boolean userInitiatedDispose)
   at System.Linq.Parallel.DefaultMergeHelper`2.System.Linq.Parallel.IMergeHelper<TInputOutput>.Execute()
   at System.Linq.Parallel.MergeExecutor`1.Execute[TKey](PartitionedStream`2 partitions, Boolean ignoreOutput, ParallelMergeOptions options, TaskScheduler taskScheduler, Boolean isOrdered, CancellationState cancellationState, Int32 queryId)
   at System.Linq.Parallel.PartitionedStreamMerger`1.Receive[TKey](PartitionedStream`2 partitionedStream)
   at System.Linq.Parallel.ForAllOperator`1.WrapPartitionedStream[TKey](PartitionedStream`2 inputStream, IPartitionedStreamRecipient`1 recipient, Boolean preferStriping, QuerySettings settings)
   at System.Linq.Parallel.UnaryQueryOperator`2.UnaryQueryOperatorResults.ChildResultsRecipient.Receive[TKey](PartitionedStream`2 inputStream)
   at System.Linq.Parallel.ScanQueryOperator`1.ScanEnumerableQueryOperatorResults.GivePartitionedStream(IPartitionedStreamRecipient`1 recipient)
   at System.Linq.Parallel.UnaryQueryOperator`2.UnaryQueryOperatorResults.GivePartitionedStream(IPartitionedStreamRecipient`1 recipient)
   at System.Linq.Parallel.QueryOperator`1.GetOpenedEnumerator(Nullable`1 mergeOptions, Boolean suppressOrder, Boolean forEffect, QuerySettings querySettings)
   at System.Linq.Parallel.ForAllOperator`1.RunSynchronously()
   at System.Linq.Parallel.ArrayMergeHelper`1.Execute()
   at System.Linq.Parallel.QueryOperator`1.ExecuteAndGetResultsAsArray()
   at System.Linq.ParallelEnumerable.ToList[TSource](ParallelQuery`1 source)
```
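The top frame of this trace is `Foo` itself, so the `ArgumentException` is raised inside the selector and PLINQ only rethrows it wrapped in an outer exception (hence "End of inner exception stack trace"). "An item with the same key has already been added" is the message `Dictionary<TKey,TValue>.Add` uses for a duplicate key, so one plausible cause, assumed here because `Foo` is not shown, is dictionary state shared between calls and mutated from several PLINQ worker threads at once. Such a race depends on timing and core count, which would be consistent with it not reproducing in tests. The sketch below is hypothetical throughout (`FooCache`, `FooSafe`, `Compute`, and `int` inputs standing in for `MyObject`) and swaps a check-then-`Add` pattern for `ConcurrentDictionary.GetOrAdd`.

```csharp
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Linq;

// Hypothetical stand-in for the question's Foo, which is not shown.
// Assumption: Foo memoizes work in a cache shared by all PLINQ worker threads.
public static class FooCache
{
    // The racy version would look like this (plain Dictionary, check-then-Add):
    //     if (!cache.ContainsKey(key)) cache.Add(key, Compute(key));
    // Two threads can both see ContainsKey == false for the same key; the second
    // Add then throws ArgumentException ("An item with the same key has already
    // been added"). Unsynchronized writes can also corrupt the Dictionary itself.

    // Thread-safe replacement: GetOrAdd stores at most one value per key.
    // Under contention the factory may still run more than once for a key,
    // so Compute should be free of side effects.
    private static readonly ConcurrentDictionary<int, string> Cache =
        new ConcurrentDictionary<int, string>();

    public static string FooSafe(int key) => Cache.GetOrAdd(key, k => Compute(k));

    // Placeholder for the real (unknown) work done by Foo.
    private static string Compute(int key) => "result-" + key;

    // Same query shape as in the question: ordered PLINQ Select materialized with ToList.
    public static List<string> Run(IEnumerable<int> inputs) =>
        inputs.AsParallel().AsOrdered().Select(x => FooSafe(x)).ToList();
}
```

A `lock` around the whole check-and-add would also work with a plain `Dictionary`; either way, the point is that whatever `Foo` touches must be safe to use from several threads at once.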


I can't reproduce this problem in my tests, so I'm hoping to learn how AsParallel works in detail, or to find suggestions for avoiding the problem.
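Two PLINQ behaviours help narrow down an intermittent failure like this. Exceptions thrown by the selector surface as inner exceptions of an `AggregateException`, and `WithDegreeOfParallelism(n)` caps the number of concurrently executing tasks used by the query; if the error disappears at 1, shared state inside `Foo` becomes the main suspect. The helper below is a sketch, not a library API: `PlinqDiagnostics`, `SelectWithContext`, and `Unwrap` are hypothetical names. It tags each selector failure with the input item and thread ID so the failing input shows up in the log.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

// Hypothetical diagnostic helper (not part of PLINQ): runs an ordered PLINQ Select,
// tagging any selector failure with the input item and managed thread ID.
public static class PlinqDiagnostics
{
    public static List<TResult> SelectWithContext<TSource, TResult>(
        IEnumerable<TSource> items,
        Func<TSource, TResult> selector,
        int? degreeOfParallelism = null)
    {
        ParallelQuery<TSource> query = items.AsParallel().AsOrdered();

        // Passing 1 caps the query at one concurrently executing task: if the
        // error goes away, concurrent access inside the selector is the suspect.
        if (degreeOfParallelism.HasValue)
            query = query.WithDegreeOfParallelism(degreeOfParallelism.Value);

        return query.Select(item =>
        {
            try
            {
                return selector(item);
            }
            catch (Exception ex)
            {
                // Keep the original exception as InnerException so its stack trace survives.
                throw new InvalidOperationException(
                    $"Selector failed for item '{item}' on thread {Environment.CurrentManagedThreadId}.", ex);
            }
        }).ToList();
    }

    // PLINQ rethrows worker exceptions inside an AggregateException; flatten it for logging.
    public static IReadOnlyList<Exception> Unwrap(Exception ex) =>
        ex is AggregateException agg
            ? agg.Flatten().InnerExceptions.ToList()
            : new List<Exception> { ex };
}
```

In the question's setting this would be called with `myObjects` and `Foo` inside a `try`/`catch`, logging each entry of `Unwrap(ex)`.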

Answer 1

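You could try rewriting the parallel part in a different way, which may give you a chance to catch the error. This is just an example of two parallel approaches: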

```csharp
// LINQPad "C# Program": Dump() is a LINQPad extension method, and
// AsEnumerable()/Field<T>() come from System.Data.DataSetExtensions.
void Main()
{
    DataSet ds = new DataSet();
    DataTable dt = new DataTable();
    dt.Columns.Add("ItemID", typeof(int));
    dt.Columns.Add("test", typeof(string));
    dt.Columns.Add("Email", typeof(string));
    dt.Columns.Add("Date", typeof(DateTime));
    dt.Columns.Add("TaskData", typeof(string));
    dt.Rows.Add(1, "Aaaaaa", "[email protected]", DateTime.Now.AddMinutes(-20), "<?xml version=\"1.0\" encoding=\"utf-16\"?><TaskItem><Email>[email protected]</Email><DeploymentID>1545377</DeploymentID></TaskItem>");
    dt.Rows.Add(2, "Bbbbbb", "[email protected]", DateTime.Now, "<?xml version=\"1.0\" encoding=\"utf-16\"?><TaskItem><Email>[email protected]</Email><DeploymentID>1545377</DeploymentID></TaskItem>");
    dt.Rows.Add(3, "Cccccc", "[email protected]", DateTime.Now, "<?xml version=\"1.0\" encoding=\"utf-16\"?><TaskItem><Email>[email protected]</Email><DeploymentID>1545377</DeploymentID></TaskItem>");
    dt.Rows.Add(4, "Dddddd", "[email protected]", DateTime.Now.AddMinutes(-10), "<?xml version=\"1.0\" encoding=\"utf-16\"?><TaskItem><Email>[email protected]</Email><DeploymentID>1545377</DeploymentID></TaskItem>");
    dt.Rows.Add(5, "Eeeeee", "[email protected]", DateTime.Now.AddMinutes(10), "<?xml version=\"1.0\" encoding=\"utf-16\"?><TaskItem><Email>[email protected]</Email><DeploymentID>1545377</DeploymentID></TaskItem>");
    ds.Tables.Add(dt);

    // Approach 1: Parallel.ForEach, using at most half of the processor cores.
    var options = new ParallelOptions();
    options.MaxDegreeOfParallelism = Math.Max(Environment.ProcessorCount / 2, 1);

    // Sort by Email, then Date (a second OrderBy would discard the first sort key).
    // Parallel.ForEach does not guarantee the order in which the body runs,
    // so the printed lines can still appear in any order.
    ParallelLoopResult x = Parallel.ForEach(
        dt.AsEnumerable()
          .OrderBy(d => d.Field<string>("Email"))
          .ThenBy(d => d.Field<DateTime>("Date")),
        options,
        drow =>
        {
            (drow["ItemID"] + " " + drow["Email"] + drow["Date"]).Dump();
        });
    "..............".Dump();

    // Approach 2: PLINQ with AsOrdered(), which returns the rows in source order.
    List<DataRow> OrderedData = dt.AsEnumerable().AsParallel()
                                  .AsOrdered()
                                  .WithDegreeOfParallelism(2)
                                  .ToList();
    OrderedData.ForEach(drow =>
    {
        drow["ItemID"].Dump();
        drow["Email"].Dump();
    });
    "..............".Dump();
}
```
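Because `Dump()` only exists in LINQPad, here is a sketch of the same two approaches outside it, with plain integers standing in for the `DataTable` rows (the `Approaches` class and its method names are illustrative). `Parallel.ForEach` runs the body on several threads with no guaranteed order, so results go into a thread-safe collection; `AsParallel().AsOrdered().ToList()` may also process items concurrently, but its merged output keeps source order.

```csharp
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;

// Illustrative names; plain ints stand in for the DataTable rows.
public static class Approaches
{
    // Approach 1: Parallel.ForEach with a capped degree of parallelism.
    // The order of side effects is not defined, so results are collected
    // into a thread-safe ConcurrentQueue.
    public static List<int> ForEachSquares(IEnumerable<int> items)
    {
        var options = new ParallelOptions
        {
            MaxDegreeOfParallelism = Math.Max(Environment.ProcessorCount / 2, 1)
        };
        var results = new ConcurrentQueue<int>();
        Parallel.ForEach(items, options, item => results.Enqueue(item * item));
        return results.ToList(); // completion order, which can differ from input order
    }

    // Approach 2: PLINQ with AsOrdered; the merged output keeps source order.
    public static List<int> PlinqSquares(IEnumerable<int> items) =>
        items.AsParallel()
             .AsOrdered()
             .WithDegreeOfParallelism(2)
             .Select(item => item * item)
             .ToList();
}
```

Neither approach makes the per-item work thread-safe by itself: both call it from several threads concurrently.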

In addition, inside the process you can add the results to a ConcurrentBag so that you can inspect them after the run.
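A minimal sketch of that idea, assuming one record per input is enough (the `Outcome` and `BagDiagnostics` types are hypothetical): each worker adds the input, its result or exception, and its thread ID to a `ConcurrentBag<T>`. Per-item exceptions are caught so one failure does not abort the loop, and afterwards you can see exactly which inputs failed. `ConcurrentBag<T>` is unordered, so sort the entries afterwards if order matters.

```csharp
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;

// Hypothetical record type: the item, its result or error, and the worker thread.
public sealed class Outcome<TSource, TResult>
{
    public Outcome(TSource item, TResult result, Exception error, int threadId)
    {
        Item = item;
        Result = result;
        Error = error;
        ThreadId = threadId;
    }

    public TSource Item { get; }
    public TResult Result { get; }
    public Exception Error { get; }   // null when the call succeeded
    public int ThreadId { get; }
}

// Hypothetical helper: runs selector over every item with Parallel.ForEach,
// catching per-item exceptions so one failure does not stop the loop.
public static class BagDiagnostics
{
    public static List<Outcome<TSource, TResult>> Run<TSource, TResult>(
        IEnumerable<TSource> items, Func<TSource, TResult> selector, int maxDegreeOfParallelism)
    {
        var bag = new ConcurrentBag<Outcome<TSource, TResult>>();
        var options = new ParallelOptions { MaxDegreeOfParallelism = maxDegreeOfParallelism };

        Parallel.ForEach(items, options, item =>
        {
            int threadId = Environment.CurrentManagedThreadId;
            try
            {
                TResult result = selector(item);
                bag.Add(new Outcome<TSource, TResult>(item, result, null, threadId));
            }
            catch (Exception ex)
            {
                bag.Add(new Outcome<TSource, TResult>(item, default(TResult), ex, threadId));
            }
        });

        return bag.ToList(); // unordered snapshot of all outcomes
    }
}
```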
