我的问题是,我们是否有任何类似的实现流巨大的xml异步和存储到oracle clob列库?尝试将通用XML流读写写入Oracle XML CLOB
我已经搜索了很多,但我没有看到任何库可以做到这一点,仍然需要一些改进的代码,使它更新clob列,而不是插入新的记录,为每一个读取。
static async Task Main()
{
const string connectionString = "<your connection string>";
const string tableName = "<your table name>";
const int chunkSize = 100000;
using var connection = new OracleConnection(connectionString);
await connection.OpenAsync();
using var transaction = connection.BeginTransaction();
using var command = connection.CreateCommand();
command.Transaction = transaction;
command.CommandText = $"INSERT INTO {tableName} (xml_clob) VALUES (:clob)";
var parameter = new OracleParameter("clob", OracleDbType.Clob);
const string filePath = "large-file.xml";
using var reader = XmlReader.Create(filePath);
var semaphore = new SemaphoreSlim(50);
var tasks = new List<Task<int>>();
var settings = new XmlWriterSettings
{
Indent = true,
IndentChars = " "
};
while (await reader.ReadAsync())
{
switch (reader.NodeType)
{
case XmlNodeType.Element:
using var ms = new MemoryStream();
using var writer = XmlWriter.Create(ms, settings);
writer.WriteStartElement(reader.Name);
if (reader.HasAttributes)
{
while (reader.MoveToNextAttribute())
{
writer.WriteAttributeString(reader.Name, reader.Value);
}
reader.MoveToElement();
}
writer.Flush();
ms.Seek(0, SeekOrigin.Begin);
parameter.Value = new OracleClob(connection, true);
using var clobWriter = new StreamWriter(parameter.Value, Encoding.Unicode);
await ms.CopyToAsync(clobWriter.BaseStream);
await clobWriter.FlushAsync();
var task = command.ExecuteNonQueryAsync();
tasks.Add(task);
break;
case XmlNodeType.EndElement:
break;
case XmlNodeType.Text:
using var ms2 = new MemoryStream();
using var writer2 = XmlWriter.Create(ms2);
while (!reader.EOF)
{
char[] buffer = new char[chunkSize];
int charsRead = await reader.ReadValueChunkAsync(buffer, 0, buffer.Length);
if (charsRead == 0)
{
break;
}
await writer2.WriteCharsAsync(buffer, 0, charsRead);
}
writer2.Flush();
ms2.Seek(0, SeekOrigin.Begin);
parameter.Value = new OracleClob(connection, true);
using var clobWriter2 = new StreamWriter(parameter.Value, Encoding.Unicode);
await ms2.CopyToAsync(clobWriter2.BaseStream);
await clobWriter2.FlushAsync();
var task2 = command.ExecuteNonQueryAsync();
tasks.Add(task2);
break;
}
}
await Task.WhenAll(tasks);
await transaction.CommitAsync();
Console.WriteLine($"Finished processing XML file '{filePath}'");
}
也正在计划这种方法,使正确的更快
using Oracle.ManagedDataAccess.Client;
using System;
using System.IO;
using System.Threading.Tasks;
public async Task SaveFileToOracleClobParallel(string filePath, string connectionString)
{
const int bufferSize = 81920; // 80KB buffer size
const int maxDegreeOfParallelism = 4; // maximum number of tasks to run in parallel
try
{
using (var connection = new OracleConnection(connectionString))
{
await connection.OpenAsync();
using (var transaction = connection.BeginTransaction())
using (var command = connection.CreateCommand())
{
command.Transaction = transaction;
command.CommandText = "INSERT INTO MyTable (MyClobColumn) VALUES (:clob)";
using (var clob = new OracleClob(connection))
{
await clob.OpenAsync(OracleDbType.Clob, System.Data.ConnectionState.Open);
var tasks = new List<Task>();
using (var stream = new FileStream(filePath, FileMode.Open, FileAccess.Read, FileShare.Read, bufferSize, FileOptions.SequentialScan))
{
for (int i = 0; i < maxDegreeOfParallelism; i++)
{
tasks.Add(ReadAndWriteChunkAsync(stream, clob, bufferSize));
}
await Task.WhenAll(tasks);
}
await clob.CloseAsync();
command.Parameters.Add(new OracleParameter("clob", OracleDbType.Clob) { Value = clob });
await command.ExecuteNonQueryAsync();
await transaction.CommitAsync();
}
}
}
}
catch (Exception ex)
{
Console.WriteLine($"Error: {ex.Message}");
// Handle the exception as appropriate
}
}
private async Task ReadAndWriteChunkAsync(Stream stream, OracleClob clob, int bufferSize)
{
var buffer = new byte[bufferSize];
int bytesRead;
while ((bytesRead = await stream.ReadAsync(buffer, 0, buffer.Length)) > 0)
{
await clob.WriteAsync(buffer, 0, bytesRead);
}
}
1条答案
按热度按时间wswtfjt71#
您可以使用
CopyToAsync
直接传递文件