linq 如何将IEnumerable分块化< T>,而在失败的情况下不丢失/丢弃项?

oo7oh9g9  于 2022-12-06  发布在  其他
关注(0)|答案(4)|浏览(166)

我有一个生产者-消费者场景,其中生产者是一个可枚举的项目序列(IEnumerable<Item>)。我希望以10个项目为一组的块/批处理这些项目。因此我决定使用新的(.NET 6)Chunk LINQ运算符,如以下问题中所建议的:是的。
我的问题是,有时生产者会失败,在这种情况下,块化序列的消费者会收到错误,而不是首先收到错误前生产的最后一个项目的块。因此,如果生产者生成了15个项目,然后失败,消费者将得到一个包含1-10项的块,然后将得到一个异常。11-15项将丢失!下面是一个演示这种不良行为的最小示例:

static IEnumerable<int> Produce()
{
    int i = 0;
    while (true)
    {
        i++;
        Console.WriteLine($"Producing #{i}");
        yield return i;
        if (i == 15) throw new Exception("Oops!");
    }
}

// Consume
foreach (int[] chunk in Produce().Chunk(10))
{
    Console.WriteLine($"Consumed: [{String.Join(", ", chunk)}]");
}

输出量:

Producing #1
Producing #2
Producing #3
Producing #4
Producing #5
Producing #6
Producing #7
Producing #8
Producing #9
Producing #10
Consumed: [1, 2, 3, 4, 5, 6, 7, 8, 9, 10]
Producing #11
Producing #12
Producing #13
Producing #14
Producing #15
Unhandled exception. System.Exception: Oops!
   at Program.<Main>g__Produce|0_0()+MoveNext()
   at System.Linq.Enumerable.ChunkIterator[TSource](IEnumerable`1 source, Int32 size)+MoveNext()
   at Program.Main()

Online demo中的一个。
理想的行为是在获取异常之前获取值为[11, 12, 13, 14, 15]的块。

**我的问题是:**是否有任何方法可以配置Chunk运算符,使其优先考虑发出的数据而不是异常?如果没有,如何实现具有所需行为的自定义LINQ运算符,例如名为ChunkNonDestructive的运算符?

public static IEnumerable<TSource[]> ChunkNonDestructive<TSource>(
    this IEnumerable<TSource> source, int size);

**注意:**除了System.Linq.Chunk操作符之外,我还试验了System.Interactive包中的Buffer操作符,以及MoreLinq包中的Batch操作符。显然,它们的行为都是相同的(破坏性的)。
**Update:**以下是上述示例的理想输出:

Producing #1
Producing #2
Producing #3
Producing #4
Producing #5
Producing #6
Producing #7
Producing #8
Producing #9
Producing #10
Consumed: [1, 2, 3, 4, 5, 6, 7, 8, 9, 10]
Producing #11
Producing #12
Producing #13
Producing #14
Producing #15
Consumed: [11, 12, 13, 14, 15]
Unhandled exception. System.Exception: Oops!
   at Program.<Main>g__Produce|0_0()+MoveNext()
   at System.Linq.Enumerable.ChunkIterator[TSource](IEnumerable`1 source, Int32 size)+MoveNext()
   at Program.Main()

不同之处在于Consumed: [11, 12, 13, 14, 15]线,它在实际输出中不存在。

pkln4tw6

pkln4tw61#

如果对源代码进行预处理,使其在遇到异常时停止,则可以按原样使用Chunk()
第一个
我觉得这样可以很好地将责任分开。如果你想要一个抛出异常的帮助器,而不是自己捕获它,你可以将它作为一个组件来使用,以简化编写帮助器的过程:

public static IEnumerable<T[]> ChunkUntilFirstException<T>(this IEnumerable<T> source, int size)
    {
        Exception? e = null;
        var result = source.UntilFirstException(thrown => e = thrown).Chunk(size);
        foreach (var element in result)
        {
            yield return element;
        }
        if (e != null)
        {
            throw new InvalidOperationException("source threw an exception", e);
        }
    }

请注意,这将引发一个与生成器发出的异常不同的异常。这使您可以保留与原始异常关联的堆栈跟踪,而throw e将覆盖该堆栈跟踪。
您可以根据需要对此进行调整。如果您需要捕获您希望生成器发出的特定类型的异常,使用带有模式匹配的when上下文关键字非常简单。

try
    {
        foreach (int[] chunk in Produce().ChunkUntilFirstException(10))
        {
            Console.WriteLine($"Consumed: [{String.Join(", ", chunk)}]");
        }
    }
    catch (InvalidOperationException e) when (e.InnerException is {Message: "Oops!"})
    {
        Console.WriteLine(e.InnerException.ToString());
    }
eh57zj3b

eh57zj3b2#

首先,语义问题。ChunkBuffer或其他任何东西都没有破坏性,它只是从一个可枚举的源中读取项目,直到它结束或抛出异常。代码中唯一破坏性的事情是抛出异常,它的行为与预期的一样(例如,将堆栈从生成器、Linq函数中展开,并放入代码中的catch(如果存在))。
同样显而易见的是,每个Linq函数在异常方面的行为都是一样的。事实上,这就是异常的工作方式,而围绕它们来支持用例的成本相对较高:您需要为您生成的每一项接受异常。在我看来,这是一个非常糟糕的设计,如果您为我工作并这样做,您将被当场解雇。
这样一来,编写这样的BadDesignChunk就很简单了(如果代价很高的话):

public static IEnumerable<IEnumerable<TSource>> BadDesignChunk<TSource>(this IEnumerable<TSource> source, int size)
{
    Exception caughtException = default;
    var chunk = new List<TSource>();
    using var enumerator = source.GetEnumerator();
    
    while(true)
    {
        while(chunk.Count < size)
        {
            try
            {
                if(!enumerator.MoveNext())
                {
                    // end of the stream, send what we have and finish
                    goto end;
                }
            }
            catch(Exception ex)
            {
                // exception, send what we have and finish
                caughtException = ex;
                goto end;
            }
            
            chunk.Add(enumerator.Current);
        }
        
        // chunk full, send it
        yield return chunk;
        chunk.Clear();
    }
    
    end:
    if(chunk.Count > 0)
        yield return chunk;
    if(caughtException is not null)
        throw caughtException;
}

See it in action here.

vxqlmq5t

vxqlmq5t3#

我受到StripingWarrior的answer的启发,它基于一个我最初并不理解的想法。这个想法是重用现有的Chunk实现,并围绕它而不是通过它传播异常。基于这个想法,我编写了一个泛型方法DeferErrorUntilCompletion,它根据以下规则鲁棒性¹所有类型的LINQ操作符或操作符组合:

  • 如果input序列失败,则在生成output序列的所有元素后传播错误。*
private static IEnumerable<TOutput> DeferErrorUntilCompletion<TInput, TOutput>(
    IEnumerable<TInput> input,
    Func<IEnumerable<TInput>, IEnumerable<TOutput>> conversion)
{
    ExceptionDispatchInfo edi = null;
    IEnumerable<TInput> InputIterator()
    {
        using var enumerator = input.GetEnumerator();
        while (true)
        {
            TInput item;
            try
            {
                if (!enumerator.MoveNext()) break;
                item = enumerator.Current;
            }
            catch (Exception ex)
            {
                edi = ExceptionDispatchInfo.Capture(ex);
                break;
            }
            yield return item;
        }
    }
    IEnumerable<TOutput> output = conversion(InputIterator());
    foreach (TOutput item in output) yield return item;
    edi?.Throw();
}

然后我使用DeferErrorUntilCompletion方法实现ChunkNonDestructive运算符,如下所示:

/// <summary>
/// Splits the elements of a sequence into chunks of the specified size.
/// In case the sequence fails and there are buffered elements, a last chunk
/// that contains these elements is emited before propagating the error.
/// </summary>
public static IEnumerable<TSource[]> ChunkNonDestructive<TSource>(
    this IEnumerable<TSource> source, int size)
{
    ArgumentNullException.ThrowIfNull(source);
    if (size < 1) throw new ArgumentOutOfRangeException(nameof(size));
    return DeferErrorUntilCompletion(source, s => s.Chunk(size));
}

是的。
通过阅读代码可能不会很明显,但是DeferErrorUntilCompletion序列的所有枚举数都获得了它们自己的edi状态。没有并排创建两个枚举数的风险,这会相互干扰。
尽管重用现有的内置实现有一定的价值(简单性、一致性、健壮性),但也有缺点。在核心功能之上添加两个额外的枚举可能会导致不可忽略的开销。Charlieface的implementation在生成块方面的速度大约是这个实现的两倍。因此,对于吞吐量非常高的生产者-消费者场景(每秒数千个块),我可能更喜欢使用Charlieface的实现,而不是这个。
¹ LINQ操作符需要健壮化的想法可能听起来很奇怪,甚至有些傲慢。请注意,这个答案的上下文非常具体:这是生产者-消费者场景。在这些场景中,多个生产者和消费者可能并行运行,偶尔会出现异常,并且弹性机制已准备就绪以应对此类异常,因此通常要避免由于错误而到处丢失消息。

ia2d9nvy

ia2d9nvy4#

你不能捕获一个异常,然后yield再重新抛出,因为你不能在catch中包含yield。(原因很明显:一旦你屈服了,你就不再是catch了。)
我认为保留原始异常 * 和 * 原始堆栈跟踪的唯一解决方案是使用ExceptionDispatchInfo.Capture

private static IEnumerable<IList<TSource>> ChunkIterator<TSource>(this IEnumerable<TSource> source, int size)
{
    using var e = source.GetEnumerator();

    var chunk = new List<TSource>(size);
    ExceptionDispatchInfo exDispatch = null;
    try
    {
        while(true)
        {
            try
            {
                while(e.MoveNext())
                {
                    chunk.Add(e.Current);
                    if (chunk.Count == size)
                        break;
                }
            }
            catch(Exception ex)
            {
                exDispatch = ExceptionDispatchInfo.Capture(ex);
            }

            if(chunk.Count > 0)
                yield return chunk.ToArray();

            var exDispatch2 = exDispatch;
            exDispatch = null;
            exDispatch2?.Throw();

            if(chunk.Count > 0)
                chunk.Clear();
            else
                yield break;
        }
    }
    finally
    {
        exDispatch?.Throw();
    }
}

您的foreach将始终接收最后一块项,并且只在 next 迭代中抛出。

相关问题