.net 并行不适用于实体框架

q43xntqr  于 2023-01-22  发布在  .NET
关注(0)|答案(4)|浏览(140)

我有一个ID列表,需要对每个ID运行几个存储过程。
当我使用标准的foreach循环时,它工作正常,但是当我有很多记录时,它工作得相当慢。
我想转换代码以使用EF,但遇到了一个异常:
打开基础提供程序失败。
我在Parallel.ForEach中使用以下代码:

using (XmlEntities osContext = new XmlEntities())
{
    //The code
}

但它仍然抛出异常。
我知道如何在EF中使用Parallel吗?我需要为我运行的每个过程创建一个新的上下文吗?我有大约10个过程,所以我认为为每个过程创建10个上下文是非常糟糕的。

vsikbqxv

vsikbqxv1#

The underlying database connections that the Entity Framework are using are not thread-safe。您 * 将 * 需要为您要在另一个线程上执行的每个操作创建一个新的上下文。
您对如何并行化操作的关注是有道理的;很多上下文的打开和关闭都是很昂贵的。
相反,您可能需要颠倒一下您对代码并行化的想法,看起来您正在循环处理许多项,然后为每个项串行调用存储过程。
如果可以的话,为每个 procedure 创建一个新的Task<TResult>(或者Task,如果不需要结果的话),然后在那个Task<TResult>中,打开一个上下文,循环所有的项,然后执行存储过程。这样,你就只有与你并行运行的存储过程相同数量的上下文。
假设您有一个MyDbContext,它包含两个存储过程DoSomething1DoSomething2,这两个存储过程都使用类MyItem的示例。
实现上面的代码类似于:

// You'd probably want to materialize this into an IList<T> to avoid
// warnings about multiple iterations of an IEnumerable<T>.
// You definitely *don't* want this to be an IQueryable<T>
// returned from a context.
IEnumerable<MyItem> items = ...;

// The first stored procedure is called here.
Task t1 = Task.Run(() => { 
    // Create the context.
    using (var ctx = new MyDbContext())
    // Cycle through each item.
    foreach (MyItem item in items)
    {
        // Call the first stored procedure.
        // You'd of course, have to do something with item here.
        ctx.DoSomething1(item);
    }
});

// The second stored procedure is called here.
Task t2 = Task.Run(() => { 
    // Create the context.
    using (var ctx = new MyDbContext())
    // Cycle through each item.
    foreach (MyItem item in items)
    {
        // Call the first stored procedure.
        // You'd of course, have to do something with item here.
        ctx.DoSomething2(item);
    }
});

// Do something when both of the tasks are done.

如果您 * 不能 * 并行执行存储过程(每个存储过程都依赖于以特定顺序运行),那么您仍然可以并行执行操作,只是稍微复杂一些。
你可以在你的项目上查看creating custom partitions(在Partitioner class上使用静态Create method),这将给你提供获得IEnumerator<T>实现的方法(注意,这不是IEnumerable<T>,所以你不能在它上面使用foreach)。
对于返回的每个IEnumerator<T>示例,您将创建一个新的Task<TResult>(如果需要结果),并且在Task<TResult>主体中,您将创建上下文,然后循环通过IEnumerator<T>返回的项,按顺序调用存储过程。
它看起来像这样:

// Get the partitioner.
OrdinalPartitioner<MyItem> partitioner = Partitioner.Create(items);

// Get the partitions.
// You'll have to set the parameter for the number of partitions here.
// See the link for creating custom partitions for more
// creation strategies.
IList<IEnumerator<MyItem>> paritions = partitioner.GetPartitions(
    Environment.ProcessorCount);

// Create a task for each partition.
Task[] tasks = partitions.Select(p => Task.Run(() => { 
        // Create the context.
        using (var ctx = new MyDbContext())
        // Remember, the IEnumerator<T> implementation
        // might implement IDisposable.
        using (p)
        // While there are items in p.
        while (p.MoveNext())
        {
            // Get the current item.
            MyItem current = p.Current;

            // Call the stored procedures.  Process the item
            ctx.DoSomething1(current);
            ctx.DoSomething2(current);
        }
    })).
    // ToArray is needed (or something to materialize the list) to
    // avoid deferred execution.
    ToArray();
kdfy810k

kdfy810k2#

EF不是线程安全的,因此不能使用Parallel。
看一看Entity Framework and Multi threading
和这个article

bwleehnv

bwleehnv3#

这是我使用的,而且效果很好。它还支持处理错误异常,并且有一个调试模式,这使得跟踪事情变得容易得多

public static ConcurrentQueue<Exception> Parallel<T>(this IEnumerable<T> items, Action<T> action, int? parallelCount = null, bool debugMode = false)
{
    var exceptions = new ConcurrentQueue<Exception>();
    if (debugMode)
    {
        foreach (var item in items)
        {
            try
            {
                action(item);
            }
            // Store the exception and continue with the loop.                     
            catch (Exception e)
            {
                exceptions.Enqueue(e);
            }
        }
    }
    else
    {
        var partitions = Partitioner.Create(items).GetPartitions(parallelCount ?? Environment.ProcessorCount).Select(partition => Task.Factory.StartNew(() =>
        {
            while (partition.MoveNext())
            {
                try
                {
                    action(partition.Current);
                }
                // Store the exception and continue with the loop.                     
                catch (Exception e)
                {
                    exceptions.Enqueue(e);
                }
            }
        }));
        Task.WaitAll(partitions.ToArray());
    }
    return exceptions;
}

可以像下面这样使用它,其中,as db是原始DbContext,db.CreateInstance()使用相同的连接字符串创建一个新示例。

var batch = db.Set<SomeListToIterate>().ToList();
        var exceptions = batch.Parallel((item) =>
        {
            using (var batchDb = db.CreateInstance())
            {
                var batchTime = batchDb.GetDBTime();
                var someData = batchDb.Set<Permission>().Where(x=>x.ID = item.ID).ToList();
                //do stuff to someData
                item.WasMigrated = true; //note that this record is attached to db not batchDb and will only be saved when db.SaveChanges() is called
                batchDb.SaveChanges();        
            }                
        });
        if (exceptions.Count > 0)
        {
            logger.Error("ContactRecordMigration : Content: Error processing one or more records", new AggregateException(exceptions));
            throw new AggregateException(exceptions); //optionally throw an exception
        }
        db.SaveChanges(); //save the item modifications
wnavrhmk

wnavrhmk4#

这是我解决我的问题的方法。我试着在我的Parallel中得到一个订单。对于与EF有唯一连接的每个,这是我以为的,但不是真的,我每次都有相同的最后一个订单。在操作结束时删除使用并关闭连接后,它工作得很好。
简而言之,如果你使用了using,不要在Parallel中的操作结束时关闭连接。
我确实换了这个

using ApplicationDbContext Context = CubiCommon.GetContext<ApplicationDbContext>("APIConnection");

由此

ApplicationDbContext Context = CubiCommon.GetContext<ApplicationDbContext>("APIConnection");

并在手术结束时添加了这个

Context.Dispose();
Context = null;

也许这可以解决类似的问题

相关问题