如何使用datastax-cassandra-core-api将数据插入到4节点cassandra集群中?

gcuhipw9  于 2022-09-27  发布在  Cassandra
关注(0)|答案(1)|浏览(177)

我有一个单节点(DataStax)Casandra集群,在这个集群中,我必须从一个文件中插入大约10gb的数据。我编写了一个java程序来读取文件并将数据存储为以下内容:

import java.io.BufferedReader;
 import java.io.FileReader;
 import java.io.IOException;
 import java.util.Date;
 import com.datastax.driver.core.BoundStatement;
 import com.datastax.driver.core.Cluster;
 import com.datastax.driver.core.PreparedStatement;
 import com.datastax.driver.core.Session;

 public class Xb {

//cluster and session for cassandra connection
private static Cluster cluster;
private static Session session;

//variables for storing file elements
private static String taxid;
private static String geneid;
private static String status;
private static String rna_version;
private static String rna_gi;

private static String protein_version;
private static String protein_gi;
private static String gen_nuc_ver;

private static String gen_nuc_gi;
private static String start_gen_acc;
private static String end_gen_acc;

private static String orientation;
private static String assembly;

     private static String mature_ver;

     private static String mature_gi;

     private static String symbol;

    //Connecting the cassandra node(local host)
    public static Cluster connect(String node){
    return Cluster.builder().addContactPoint(node).build();
   }
    public static void main(String[] args) {
    private static String symbol;
    long lStartTime = new Date().getTime();
    // TODO Auto-generated method stub
    //call connect by passing localhost 
    cluster =connect("localhost");
    session = cluster.connect();
    //session.execute("CREATE KEYSPACE test1 WITH REPLICATION =" +"{'class':'SimpleStrategy','replication_factor':3}");
    //session.createtable('genomics');
    //use test1 : triggers the use of test1 keyspace
    session.execute("USE test1");
    //for counting the lines in the file
    int lineCount=0;

    try
    {
        //Reading the file
        FileReader fr = new FileReader("/home/syedammar/gene2refseq/gene2refseq");
        BufferedReader bf = new BufferedReader(fr);
        String line;
        //iterating over each line in file
        while((line= bf.readLine())!=null){
                lineCount++;
                //splitting the line based on tab spaces
                String[] a =line.split("\\s+");
                System.out.println("Line Count now is ->"+lineCount);
                //System.out.println("This is content"+line+" OVER HERE");
                /*for(int i =0;i<a.length;i++){
                System.out.println(i+"->"+a[i]);
              }*/
                //assigning the values to the corresponding variables
                taxid =a[0];
                geneid=a[1];
                status=a[2];
                rna_version=a[3];
                rna_gi=a[4];
                protein_version=a[5];
                protein_gi=a[6]; 
                gen_nuc_ver=a[7];
                gen_nuc_gi=a[8];
                start_gen_acc=a[9];
                end_gen_acc=a[10];
                orientation=a[11];
                assembly=a[12];
                mature_ver=a[13];
                mature_gi=a[14];
                symbol=a[15];

            //Writing the insert query
            PreparedStatement statement = session.prepare(
            "INSERT INTO test.genomics " +
            "(taxid, " +
            "geneid, " +
            "status, " +
            "rna_version, " +
            "rna_gi, " +
            "protein_version, " +
            "protein_gi, " +
            "gen_nuc_ver, " +
            "gen_nuc_gi, " +
            "start_gen_acc, " +
            "end_gen_acc, " +
            "orientation, " +
            "assembly, " +
            "mature_ver, " +
            "mature_gi," +
            "symbol" + 
            ") VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?);"); 

            //create the bound statement and initialise it with your prepared statement
            BoundStatement boundStatement = new BoundStatement(statement); 

            session.execute( // this is where the query is executed
            boundStatement.bind( // here you are binding the 'boundStatement'
            taxid,geneid,status,rna_version,rna_gi,protein_version,protein_gi,gen_nuc_ver,gen_nuc_gi,start_gen_acc,end_gen_acc,orientation,assembly,mature_ver,mature_gi,symbol));
    }//end of while
} //end of try
    catch(IOException e){
        e.printStackTrace();
    }   
        long lEndTime = new Date().getTime(); 
        long difference = lEndTime - lStartTime;
        int seconds = (int) (difference / 1000) % 60 ; //converting milliseconds to seconds
        System.out.println("Elapsed seconds: " + seconds);
        System.out.println("No of lines read are :"+ lineCount);
        System.out.println("Record's entered into cassandra successfully");

        session.close();
        cluster.close();http://stackoverflow.com/editing-help

    }//end of m}// end of class

这很好,我把记录保存在Cassandra。
现在我已经建立了一个4节点的Cassandra集群,我想做同样的任务,读取相同的文件并将其内容存储到4节点集群中。
我的问题是我该如何做到这一点,我需要把这个程序送到哪个节点。我该怎么做?
我的问题是,我将如何与4节点集群建立连接,我必须在上面的代码中进行哪些更改。好像这部分会有一些变化

public static Cluster connect(String node){
    return Cluster.builder().addContactPoint(node).build();
}

会有什么变化,N我将这个程序发送到哪个节点?我不清楚这将如何发生。另外,请告诉我,在4节点集群中插入整个数据所需的时间与在单个节点上插入数据所需时间相同,还是更快。
谢谢

gmol1639

gmol16391#

有关如何使用DataStax java驱动程序将数据最佳加载到Cassandra的一个好例子(参考程序),请看Brian Hess's Cassandra-loader
我需要在哪个节点输入这个程序
所有cassandra节点都是相等的,并且都可以进行写操作。然而,司机会为您处理这件事。只需给它一些节点作为端点,当它建立连接时,它就会知道存在哪些节点。它还将知道哪些节点拥有哪些数据,并相应地执行写入操作。
在4节点集群中插入整个数据所需的时间与在单个节点上插入数据所需时间相同还是更快。
考虑复制因素后,集群将随着节点的添加而线性扩展。因此,您将能够线性增加吞吐量。i、 e.如果3个节点RF3可以进行X次写入,则6个节点RF2可以进行~2X次写入。

相关问题