我有一个生产者-消费者场景,其中生产者是一个可枚举的项目序列(IEnumerable<Item>
)。我希望以10个项目为一组的块/批处理这些项目。因此我决定使用新的(.NET 6)Chunk
LINQ运算符,如以下问题中所建议的:是的。
我的问题是,有时生产者会失败,在这种情况下,块化序列的消费者会收到错误,而不是首先收到错误前生产的最后一个项目的块。因此,如果生产者生成了15个项目,然后失败,消费者将得到一个包含1-10项的块,然后将得到一个异常。11-15项将丢失!下面是一个演示这种不良行为的最小示例:
static IEnumerable<int> Produce()
{
int i = 0;
while (true)
{
i++;
Console.WriteLine($"Producing #{i}");
yield return i;
if (i == 15) throw new Exception("Oops!");
}
}
// Consume
foreach (int[] chunk in Produce().Chunk(10))
{
Console.WriteLine($"Consumed: [{String.Join(", ", chunk)}]");
}
输出量:
Producing #1
Producing #2
Producing #3
Producing #4
Producing #5
Producing #6
Producing #7
Producing #8
Producing #9
Producing #10
Consumed: [1, 2, 3, 4, 5, 6, 7, 8, 9, 10]
Producing #11
Producing #12
Producing #13
Producing #14
Producing #15
Unhandled exception. System.Exception: Oops!
at Program.<Main>g__Produce|0_0()+MoveNext()
at System.Linq.Enumerable.ChunkIterator[TSource](IEnumerable`1 source, Int32 size)+MoveNext()
at Program.Main()
Online demo中的一个。
理想的行为是在获取异常之前获取值为[11, 12, 13, 14, 15]
的块。
**我的问题是:**是否有任何方法可以配置Chunk
运算符,使其优先考虑发出的数据而不是异常?如果没有,如何实现具有所需行为的自定义LINQ运算符,例如名为ChunkNonDestructive
的运算符?
public static IEnumerable<TSource[]> ChunkNonDestructive<TSource>(
this IEnumerable<TSource> source, int size);
**注意:**除了System.Linq.Chunk
操作符之外,我还试验了System.Interactive包中的Buffer
操作符,以及MoreLinq包中的Batch
操作符。显然,它们的行为都是相同的(破坏性的)。
**Update:**以下是上述示例的理想输出:
Producing #1
Producing #2
Producing #3
Producing #4
Producing #5
Producing #6
Producing #7
Producing #8
Producing #9
Producing #10
Consumed: [1, 2, 3, 4, 5, 6, 7, 8, 9, 10]
Producing #11
Producing #12
Producing #13
Producing #14
Producing #15
Consumed: [11, 12, 13, 14, 15]
Unhandled exception. System.Exception: Oops!
at Program.<Main>g__Produce|0_0()+MoveNext()
at System.Linq.Enumerable.ChunkIterator[TSource](IEnumerable`1 source, Int32 size)+MoveNext()
at Program.Main()
不同之处在于Consumed: [11, 12, 13, 14, 15]
线,它在实际输出中不存在。
4条答案
按热度按时间pkln4tw61#
如果对源代码进行预处理,使其在遇到异常时停止,则可以按原样使用
Chunk()
。第一个
我觉得这样可以很好地将责任分开。如果你想要一个抛出异常的帮助器,而不是自己捕获它,你可以将它作为一个组件来使用,以简化编写帮助器的过程:
请注意,这将引发一个与生成器发出的异常不同的异常。这使您可以保留与原始异常关联的堆栈跟踪,而
throw e
将覆盖该堆栈跟踪。您可以根据需要对此进行调整。如果您需要捕获您希望生成器发出的特定类型的异常,使用带有模式匹配的
when
上下文关键字非常简单。eh57zj3b2#
首先,语义问题。
Chunk
或Buffer
或其他任何东西都没有破坏性,它只是从一个可枚举的源中读取项目,直到它结束或抛出异常。代码中唯一破坏性的事情是抛出异常,它的行为与预期的一样(例如,将堆栈从生成器、Linq函数中展开,并放入代码中的catch(如果存在))。同样显而易见的是,每个Linq函数在异常方面的行为都是一样的。事实上,这就是异常的工作方式,而围绕它们来支持用例的成本相对较高:您需要为您生成的每一项接受异常。在我看来,这是一个非常糟糕的设计,如果您为我工作并这样做,您将被当场解雇。
这样一来,编写这样的
BadDesignChunk
就很简单了(如果代价很高的话):See it in action here.
vxqlmq5t3#
我受到StripingWarrior的answer的启发,它基于一个我最初并不理解的想法。这个想法是重用现有的
Chunk
实现,并围绕它而不是通过它传播异常。基于这个想法,我编写了一个泛型方法DeferErrorUntilCompletion
,它根据以下规则鲁棒性¹所有类型的LINQ操作符或操作符组合:input
序列失败,则在生成output
序列的所有元素后传播错误。*然后我使用
DeferErrorUntilCompletion
方法实现ChunkNonDestructive
运算符,如下所示:是的。
通过阅读代码可能不会很明显,但是
DeferErrorUntilCompletion
序列的所有枚举数都获得了它们自己的edi
状态。没有并排创建两个枚举数的风险,这会相互干扰。尽管重用现有的内置实现有一定的价值(简单性、一致性、健壮性),但也有缺点。在核心功能之上添加两个额外的枚举可能会导致不可忽略的开销。Charlieface的implementation在生成块方面的速度大约是这个实现的两倍。因此,对于吞吐量非常高的生产者-消费者场景(每秒数千个块),我可能更喜欢使用Charlieface的实现,而不是这个。
¹ LINQ操作符需要健壮化的想法可能听起来很奇怪,甚至有些傲慢。请注意,这个答案的上下文非常具体:这是生产者-消费者场景。在这些场景中,多个生产者和消费者可能并行运行,偶尔会出现异常,并且弹性机制已准备就绪以应对此类异常,因此通常要避免由于错误而到处丢失消息。
ia2d9nvy4#
你不能捕获一个异常,然后yield再重新抛出,因为你不能在
catch
中包含yield
。(原因很明显:一旦你屈服了,你就不再是catch
了。)我认为保留原始异常 * 和 * 原始堆栈跟踪的唯一解决方案是使用
ExceptionDispatchInfo.Capture
。您的
foreach
将始终接收最后一块项,并且只在 next 迭代中抛出。