.net oracle clob的通用xml流编写器

fcg9iug3  于 2023-05-08  发布在  .NET
关注(0)|答案(1)|浏览(126)

我的问题是,我们是否有任何类似的实现流巨大的xml异步和存储到oracle clob列库?尝试将通用XML流读写写入Oracle XML CLOB
我已经搜索了很多,但我没有看到任何库可以做到这一点,仍然需要一些改进的代码,使它更新clob列,而不是插入新的记录,为每一个读取。

static async Task Main()
{
    const string connectionString = "<your connection string>";
    const string tableName = "<your table name>";
    const int chunkSize = 100000;

    using var connection = new OracleConnection(connectionString);
    await connection.OpenAsync();
    using var transaction = connection.BeginTransaction();
    using var command = connection.CreateCommand();
    command.Transaction = transaction;
    command.CommandText = $"INSERT INTO {tableName} (xml_clob) VALUES (:clob)";
    var parameter = new OracleParameter("clob", OracleDbType.Clob);

    const string filePath = "large-file.xml";
    using var reader = XmlReader.Create(filePath);
    var semaphore = new SemaphoreSlim(50);

    var tasks = new List<Task<int>>();
    var settings = new XmlWriterSettings
    {
        Indent = true,
        IndentChars = "    "
    };

    while (await reader.ReadAsync())
    {
        switch (reader.NodeType)
        {
            case XmlNodeType.Element:
                using var ms = new MemoryStream();
                using var writer = XmlWriter.Create(ms, settings);
                writer.WriteStartElement(reader.Name);
                if (reader.HasAttributes)
                {
                    while (reader.MoveToNextAttribute())
                    {
                        writer.WriteAttributeString(reader.Name, reader.Value);
                    }

                    reader.MoveToElement();
                }
                writer.Flush();
                ms.Seek(0, SeekOrigin.Begin);
                parameter.Value = new OracleClob(connection, true);
                using var clobWriter = new StreamWriter(parameter.Value, Encoding.Unicode);
                await ms.CopyToAsync(clobWriter.BaseStream);
                await clobWriter.FlushAsync();
                var task = command.ExecuteNonQueryAsync();
                tasks.Add(task);
                break;

            case XmlNodeType.EndElement:
                break;

            case XmlNodeType.Text:
                using var ms2 = new MemoryStream();
                using var writer2 = XmlWriter.Create(ms2);
                while (!reader.EOF)
                {
                    char[] buffer = new char[chunkSize];
                    int charsRead = await reader.ReadValueChunkAsync(buffer, 0, buffer.Length);

                    if (charsRead == 0)
                    {
                        break;
                    }

                    await writer2.WriteCharsAsync(buffer, 0, charsRead);
                }

                writer2.Flush();
                ms2.Seek(0, SeekOrigin.Begin);
                parameter.Value = new OracleClob(connection, true);
                using var clobWriter2 = new StreamWriter(parameter.Value, Encoding.Unicode);
                await ms2.CopyToAsync(clobWriter2.BaseStream);
                await clobWriter2.FlushAsync();
                var task2 = command.ExecuteNonQueryAsync();
                tasks.Add(task2);
                break;
        }
    }

    await Task.WhenAll(tasks);
    await transaction.CommitAsync();
    Console.WriteLine($"Finished processing XML file '{filePath}'");
}

也正在计划这种方法,使正确的更快

using Oracle.ManagedDataAccess.Client;
using System;
using System.IO;
using System.Threading.Tasks;

public async Task SaveFileToOracleClobParallel(string filePath, string connectionString)
{
    const int bufferSize = 81920; // 80KB buffer size
    const int maxDegreeOfParallelism = 4; // maximum number of tasks to run in parallel

    try
    {
        using (var connection = new OracleConnection(connectionString))
        {
            await connection.OpenAsync();

            using (var transaction = connection.BeginTransaction())
            using (var command = connection.CreateCommand())
            {
                command.Transaction = transaction;
                command.CommandText = "INSERT INTO MyTable (MyClobColumn) VALUES (:clob)";

                using (var clob = new OracleClob(connection))
                {
                    await clob.OpenAsync(OracleDbType.Clob, System.Data.ConnectionState.Open);

                    var tasks = new List<Task>();

                    using (var stream = new FileStream(filePath, FileMode.Open, FileAccess.Read, FileShare.Read, bufferSize, FileOptions.SequentialScan))
                    {
                        for (int i = 0; i < maxDegreeOfParallelism; i++)
                        {
                            tasks.Add(ReadAndWriteChunkAsync(stream, clob, bufferSize));
                        }

                        await Task.WhenAll(tasks);
                    }

                    await clob.CloseAsync();
                    command.Parameters.Add(new OracleParameter("clob", OracleDbType.Clob) { Value = clob });

                    await command.ExecuteNonQueryAsync();
                    await transaction.CommitAsync();
                }
            }
        }
    }
    catch (Exception ex)
    {
        Console.WriteLine($"Error: {ex.Message}");
        // Handle the exception as appropriate
    }
}

private async Task ReadAndWriteChunkAsync(Stream stream, OracleClob clob, int bufferSize)
{
    var buffer = new byte[bufferSize];
    int bytesRead;

    while ((bytesRead = await stream.ReadAsync(buffer, 0, buffer.Length)) > 0)
    {
        await clob.WriteAsync(buffer, 0, bytesRead);
    }
}
wswtfjt7

wswtfjt71#

您可以使用CopyToAsync直接传递文件

static async Task Main()
{
    const string connectionString = "<your connection string>";
    const string query = "INSERT INTO <your table name> (xml_clob) VALUES (:clob)"

    await using var connection = new OracleConnection(connectionString);
    await connection.OpenAsync();
    using var command = new OracleCommand(query, connection, transaction);
    var parameter = new OracleParameter(":clob", OracleDbType.Clob).Value = new OracleClob(connection, true);

    const string filePath = "large-file.xml";
    await using var file = File.Open(filePath, FileMode.Open, FileAccess.Read);
    await ms.CopyToAsync(parameter.Value);
    await clobWriter.FlushAsync();
    await command.ExecuteNonQueryAsync();
    Console.WriteLine($"Finished processing XML file '{filePath}'");
}

相关问题