使用c#将csv文件数据插入控制台应用程序中的clickhouse数据库

igsr9ssn  于 2023-01-22  发布在  C#
关注(0)|答案(1)|浏览(205)

我需要使用C#将csv文件数据插入到控制台应用程序中的clickhouse数据库中。在SQL数据库中执行时,数据添加到表中,而在clickhouse中,它不会将数据添加到表中。
数据无法添加到clickhouse数据库中,我也没有收到任何错误

static void Main()
    {
        string csv_file_path = @"C:\Users\thummala.naveen\Downloads\Employee.csv";
       
      InsertDataIntoSQLServerUsingSQLBulkCopy();

    }
    
    public static void InsertDataIntoSQLServerUsingSQLBulkCopy()
    {

        
        SqlConnection con = new SqlConnection(@"data source=10.1.230.49\standard2019;initial catalog=thummala.naveen@hcl.com;uid=sa;password=dbnms#123;integrated security=false;MultipleActiveResultSets=True;App=EntityFramework");
        
        string filepath = "C:\\Users\\thummala.naveen\\Downloads\\Employee.csv";
        StreamReader sr = new StreamReader(filepath);
        string line = sr.ReadLine();
        string[] value = line.Split(',');
        DataTable dt = new DataTable();
        DataRow row;
        foreach (string dc in value)
        {
            dt.Columns.Add(new DataColumn(dc));
        }

        while (!sr.EndOfStream)
        {
            value = sr.ReadLine().Split(',');
            if (value.Length == dt.Columns.Count)
            {
                row = dt.NewRow();
                row.ItemArray = value;
                dt.Rows.Add(row);
            }
        }
        
        ClickHouse.Client.ADO.ClickHouseConnection conn = new ClickHouse.Client.ADO.ClickHouseConnection(@"Compress=True;CheckCompressedHash=False;Compressor=lz4;Host=10.1.162.59;Port=8123;User=default;Password=clickhouse@123;SocketTimeout=600000;Database=naveentest;");
        ClickHouseBulkCopy bcs = new ClickHouseBulkCopy(conn.ConnectionString);
        bcs.DestinationTableName = "Emp";
        bcs.BatchSize = dt.Rows.Count;
        using var csvs = CsvDataReader.Create("C:\\Users\\thummala.naveen\\Downloads\\Employee.csv");

        bcs.WriteToServerAsync(csvs);
        conn.Close();
    }


}

}

p8h8hvxi

p8h8hvxi1#

虽然从技术上讲这不是答案,但这应该有助于您自己找到问题:在单元测试框架中重写你的代码,会非常容易测试。我可以建议使用xunit吗?但是任何单元测试框架都可以。
这个结构并不完美,但应该会引导您找到正确的路径,让您的代码更易于测试(因此更易于调试)。
有了这个结构,你可以“模拟”(即moq框架)你的个人行为,并编写覆盖测试和白盒测试,并精确地找出你的问题所在。
对于你的主方法类,默认名称program,我想这就可以了:

using System.Data;

namespace StackDemoConsoleApp
{
    public class Program
    {
        private static readonly IRowCountHandler _rowCountHandler = new RowCountHandler(new EmployeeFileHandler(), new DataTablePopulator(), new DataTablePreparer());
        private static readonly IPersistenceWriter _persistenceWriter = new PersistenceWriterClickHouse();

        public static void Main(string[] args)
        {
            DataTable dataTableForRowCount = _rowCountHandler.FindRowCount();
            _persistenceWriter.WriteToPersistence(dataTableForRowCount);
        }
    }
}

其余的是接口或类,您应该在项目资源管理器中为它们单独命名文件:

using System.Data;
    using System.IO;
    
    namespace StackDemoConsoleApp
    {
        public class DataTablePopulator : IDataTablePopulator
        {
            public DataTable PopulateDataTable(StreamReader streamReader, ref string[] value, DataTable dataTableForRowCount)
            {
                while (!streamReader.EndOfStream)
                {
                    value = streamReader.ReadLine().Split(',');
                    if (value.Length == dataTableForRowCount.Columns.Count)
                    {
                        var row = dataTableForRowCount.NewRow();
                        row.ItemArray = value;
                        dataTableForRowCount.Rows.Add(row);
                    }
                }
    
                return dataTableForRowCount;
            }
        }
    }

    using System.Data;
    
    namespace StackDemoConsoleApp
    {
        public class DataTablePreparer : IDataTablePreparer
        {
            public DataTable PrepareDataTableStructure(string[] value)
            {
                DataTable dataTableForRowCount = new DataTable();
                foreach (string dc in value)
                {
                    dataTableForRowCount.Columns.Add(new DataColumn(dc));
                }
    
                return dataTableForRowCount;
            }
        }
    }

    using System.IO;
    
    namespace StackDemoConsoleApp
    {
        public class EmployeeFileHandler : IEmployeeFileHandler
        {
            public void ReadEmployeeFile(out StreamReader sr, out string[] value)
            {
                string filepath = "C:\\Users\\thummala.naveen\\Downloads\\Employee.csv";
                sr = new StreamReader(filepath);
                string line = sr.ReadLine();
                value = line.Split(',');
            }
        }
    }

    using System.Data;
    using System.IO;
    
    namespace StackDemoConsoleApp
    {
        public interface IDataTablePopulator
        {
            DataTable PopulateDataTable(StreamReader sr, ref string[] value, DataTable dataTableForRowCount)
        }
    }

using System.Data;
using System.IO;

namespace StackDemoConsoleApp
{
    public interface IDataTablePopulator
    {
        DataTable PopulateDataTable(StreamReader sr, ref string[] value, DataTable dataTableForRowCount)
    }
}

using System.IO;

namespace StackDemoConsoleApp
{
    public interface IEmployeeFileHandler
    {
        void ReadEmployeeFile(out StreamReader sr, out string[] value);
    }
}

using System.Data;

namespace StackDemoConsoleApp
{
    public interface IPersistenceWriter
    {
        void WriteToPersistence(DataTable dataTableForRowCount);
    }
}

using System.Data;

namespace StackDemoConsoleApp
{
    public interface IRowCountHandler
    {
        DataTable FindRowCount();
    }
}

using System.Data;

namespace StackDemoConsoleApp
{
    public class PersistenceWriterClickHouse : IPersistenceWriter
    {
        public void WriteToPersistence(DataTable dataTableForRowCount)
        {
            ClickHouse.Client.ADO.ClickHouseConnection conn = new ClickHouse.Client.ADO.ClickHouseConnection(@"REDACTED");
            ClickHouseBulkCopy bcs = new ClickHouseBulkCopy(conn.ConnectionString);
            bcs.DestinationTableName = "Emp";
            bcs.BatchSize = dataTableForRowCount.Rows.Count;
            using var csvs = CsvDataReader.Create("C:\\Users\\thummala.naveen\\Downloads\\Employee.csv");
            bcs.WriteToServerAsync(csvs);
            conn.Close();
        }
    }
}

using System.Data;
using System.IO;

namespace StackDemoConsoleApp
{
    public class RowCountHandler : IRowCountHandler
    {
        private readonly IEmployeeFileHandler _employeeFileHandler;
        private readonly IDataTablePreparer _dataTablePreparer;
        private readonly IDataTablePopulator _dataTablePopulator;

        public RowCountHandler(IEmployeeFileHandler employeeFileHandler, IDataTablePopulator dataTablePopulator, IDataTablePreparer dataTablePreparer)
        {
            _employeeFileHandler = employeeFileHandler;
            _dataTablePopulator = dataTablePopulator;
            _dataTablePreparer = dataTablePreparer;
        }

        public DataTable FindRowCount()
        {
            StreamReader sr;
            string[] value;
            _employeeFileHandler.ReadEmployeeFile(out sr, out value);
            DataTable dataTableForRowCount= _dataTablePreparer.PrepareDataTableStructure(value);
            return _dataTablePopulator.PopulateDataTable(sr, ref value, dataTableForRowCount);
        }
    }
}

在这个结构中,fx.很容易看出:SqlConnection con =新的SqlConnection(@“已编辑”);
什么都不做。
代码现在更容易阅读和理解了。
由于代码现在被拆分为更小的方法,因此您请求帮助变得更容易,因为您向其他人展示的代码库要小得多,并且更专注于实际问题。
实际上,用这种方式编写代码有很多好处,但这篇文章已经太长了。
为了清楚起见,我遗漏了大量的重命名,因为,说真的,我不想费心去纠正所有的重命名。在这段代码中有太多的“不要做”...
但是请通过DI、SOLID和Clean代码使您的代码可测试。
这将使您的代码质量更好,使其他人能够更快地帮助您,帮助您自己,理解和调试您自己的代码,并使您的代码可测试。

相关问题