csv - How do I determine the size of an IEnumerable in bytes to get a good batch size?

yr9zkbsy, posted 2023-03-10

I write records in CSV format so they can be uploaded as a file through an external API, which puts a limit on the size of uploaded files. The method I use to write the records to memory looks like this:

using System.Globalization;
using System.IO;
using CsvHelper;

public async Task<byte[]> WriteToMemoryAsync<T>(IEnumerable<T> recordsToWrite) where T : class
{
    using (var memoryStream = new MemoryStream())
    {
        using (var writer = new StreamWriter(memoryStream))
        using (var csv = new CsvWriter(writer, new CultureInfo("sv-SE")))
        {
            await csv.WriteRecordsAsync(recordsToWrite);
        }
        return memoryStream.ToArray();
    }
}

My current batching method looks like this:

public async Task<Dictionary<int, byte[]>> BatchWriteToMemoryAsync<T>(IEnumerable<T> recordsToWrite) where T : class
{
    var maxBatchSize = 50_000;

    var nrOfBatches = (int)Math.Ceiling((double)recordsToWrite.Count() / maxBatchSize);

    Dictionary<int, byte[]> records = new();
    for (int batchNr = 0; batchNr < nrOfBatches; batchNr++)
    {
        var batch = recordsToWrite.Skip(batchNr * maxBatchSize).Take(maxBatchSize);
        records.Add(batchNr, await WriteToMemoryAsync<T>(batch));
    }

    return records;
}

The problem is that the batch size is arbitrary and depends on how large the objects of type T are, so this can fail.
Is there any way to get the size of an IEnumerable<T> in bytes, so that I can approximate the resulting file size and base the number of batches on that?
Is there some other way to solve this?
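One practical way to approximate the file size, rather than measuring the IEnumerable<T> itself, is to serialize a small sample of records through the same CSV pipeline and extrapolate. This is a rough sketch of my own (the EstimateBatchSizeAsync name, the default sample size, and the 10% headroom are invented for illustration); it reuses the WriteToMemoryAsync method above and needs `using System.Linq;`:

```csharp
// Sketch: serialize a sample through the real CSV pipeline and extrapolate.
public async Task<int> EstimateBatchSizeAsync<T>(
    IEnumerable<T> records, long fileSizeLimitBytes, int sampleSize = 100) where T : class
{
    var sample = records.Take(sampleSize).ToList();
    if (sample.Count == 0)
        return 0;

    // Measure what the sample actually costs in bytes, header row included.
    var sampleBytes = (await WriteToMemoryAsync(sample)).Length;
    var bytesPerRecord = (double)sampleBytes / sample.Count;

    // Leave ~10% headroom, since record sizes vary around the sample average.
    return Math.Max(1, (int)(fileSizeLimitBytes * 0.9 / bytesPerRecord));
}
```

This remains an approximation: a batch that happens to contain unusually large records can still exceed the limit, so a hard guarantee needs a per-record check while writing.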

Edit

I have now implemented the first solution Magnus suggested. However, there is a problem with flushing the written records before checking the stream's length: by that point the stream has already exceeded the size limit.
I created a test that reproduces the issue; it fails because the first batch comes out at 1009 B.

[Fact]
public void WhenWritingToMemoryInBatches_ShouldEnsureFileSizeDoesNotExceedLimit()
{
    //Arrange
    var records = GenerateTestRecords(100);

    var fileSizeLimit = 1_000;  //1000B limit

    var csvHandler = new CsvHandler();

    //Act
    var file = csvHandler.BatchWriteToMemory(records, fileSizeLimit);

    //Assert
    Assert.All(file, f => Assert.True(f.Length < fileSizeLimit, $"Expected fileSize to be less than {fileSizeLimit}. Actual fileSize was {f.Length}."));
}

private IEnumerable<TestRecord> GenerateTestRecords(int amountOfRecords)
{
    List<TestRecord> records = new();
    for (int i = 0; i < amountOfRecords; i++)
    {
        records.Add(new TestRecord
        {
            StringType = $"String {i}",
            IntType = 1,
        });
    }
    return records;
}

private class TestRecord
{
    public string? StringType { get; set; }
    public int IntType { get; set; }
}
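One way to make a test like the one above pass with a hard guarantee is to serialize each record on its own first, and only append it to the current batch if it still fits. This is a sketch of my own, not the answer below: the SerializeSingleRecord helper is hypothetical, and note that CsvHelper's WriteRecord (unlike WriteRecords) does not emit a header row, so header handling is omitted here.

```csharp
public IEnumerable<byte[]> BatchWriteToMemory<T>(IEnumerable<T> records, long fileSizeLimit)
{
    var batch = new MemoryStream();
    foreach (var record in records)
    {
        var line = SerializeSingleRecord(record);
        // Close the current batch *before* it would exceed the limit.
        if (batch.Length > 0 && batch.Length + line.Length > fileSizeLimit)
        {
            yield return batch.ToArray();
            batch = new MemoryStream();
        }
        batch.Write(line, 0, line.Length);
    }
    if (batch.Length > 0)
        yield return batch.ToArray();
}

// Hypothetical helper: one record rendered to CSV bytes, no header row.
private static byte[] SerializeSingleRecord<T>(T record)
{
    using var ms = new MemoryStream();
    using (var writer = new StreamWriter(ms))
    using (var csv = new CsvWriter(writer, new CultureInfo("sv-SE")))
    {
        csv.WriteRecord(record);
        csv.NextRecord();
    }
    return ms.ToArray();
}
```

Serializing each record into its own buffer costs extra allocations, but it is the only way to know a record's exact size before committing it to a batch.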

ssgvzors #1

Send all the items to WriteToMemory and check the size of the stream as you go; once it reaches the desired size, return the items written so far and initialize a new stream for the next batch. WriteToMemory then returns a sequence of batches. You probably don't want to flush after every written record; find a suitable flush interval.

public static IEnumerable<byte[]> WriteToMemory<T>(IEnumerable<T> recordsToWrite)
{
    var memoryStream = new MemoryStream();
    var writer = new StreamWriter(memoryStream);
    var csv = new CsvWriter(writer, new CultureInfo("sv-SE"));

    try
    {
        foreach (var r in recordsToWrite)
        {
            csv.WriteRecord(r);
            csv.NextRecord();
            csv.Flush(); //might want to flush after every x items instead of each.
            if (memoryStream.Length >= 1024)
            {
                // Dispose the writers first so everything is flushed into the
                // stream, then copy the buffer before disposing the stream.
                csv.Dispose();
                writer.Dispose();
                yield return memoryStream.ToArray();
                memoryStream.Dispose();

                memoryStream = new MemoryStream();
                writer = new StreamWriter(memoryStream);
                csv = new CsvWriter(writer, new CultureInfo("sv-SE"));
            }
        }
        
        csv.Flush();
        writer.Flush(); // make sure the StreamWriter's buffer reaches the stream
        if (memoryStream.Length > 0)
            yield return memoryStream.ToArray();
    }
    finally
    {
        csv.Dispose();
        writer.Dispose();
        memoryStream.Dispose();
    }
}

To avoid holding a large set of byte arrays in memory, you could instead have the method accept a delegate that creates a stream (for example a file on disk).

public class Program
{
    private static int count = 0;
    public static async Task Main()
    {
        await WriteToStreamAsync(Enumerable.Range(0, 10_000), () => File.Create($"C:\\temp\\files\\file{count++}.csv"));
    }

    public static async Task WriteToStreamAsync<T>(IEnumerable<T> recordsToWrite, Func<Stream> createFile)
    {
        var stream = createFile();
        var writer = new StreamWriter(stream);
        var csv = new CsvWriter(writer, new CultureInfo("sv-SE"));

        try
        {
            var i = 0;
            foreach (var r in recordsToWrite)
            {
                csv.WriteRecord(r);
                csv.NextRecord();
                if (++i % 100 == 0) //Find some good interval
                    await csv.FlushAsync();
                if (stream.Length >= 1024)
                {
                    await csv.DisposeAsync();
                    await writer.DisposeAsync();
                    await stream.DisposeAsync();
                    
                    stream = createFile();
                    writer = new StreamWriter(stream);
                    csv = new CsvWriter(writer, new CultureInfo("sv-SE"));
                }
            }
        }
        finally
        {
            await csv.DisposeAsync();
            await writer.DisposeAsync();
            await stream.DisposeAsync();
        }
    }
}
