hadoop—使用安装在同一集群上的apachespark将hdfs文件中的数据插入到另一集群中的dse cassandra表中

4jb9z9bj  于 2021-06-02  发布在  Hadoop
关注(0)|答案(0)|浏览(161)

我们已经在hadoop cluster 2.8上安装了spark 1.6。我们有一个单独的datastax cassandra cluster 4.8.6(c*3.x和spark 1.4.2)。我们需要将hdfs文件中的数据复制到dse cassandra表中。在我们当前的作业中,我们在hdfs+spark cluster上提交一个spark作业(spark submit in standalone mode),它将数据加载到dse cassandra表(单独的集群)中。在这项工作中,我们是连接到Cassandra集群和使用可用的Spark集群那里。我们正试图找到一种方法来使用hadoop集群上可用的spark资源(spark提交Yarn模式),并将其写入cassandra表。是否可以在hadoop集群中使用spark集群并加载到外部cassandra集群中?类似于使用hadoopspark将数据加载到rdd或dataframe,然后连接到cassandra来加载数据?根据我目前的理解和研究,这是不可能的。有什么意见吗---
相同的示例代码(不起作用)-

class LoadData() extends Serializable {

def ReadHDFSFile() {

val HDFSUrl = "hdfs://hadoopcluster"
val hdfsFile = HDFSUrl+"/testHDFSFile"

// Connecting to Spark on HDFS+Spark cluster
val conf = new SparkConf()
conf.setAppName("DataScala")
val sc = new SparkContext(conf)
val sqlContext = new SQLContext(sc)
val df = sqlContext.read.format("com.databricks.spark.csv").option("header", "false").option("inferSchema", "true").option("delimiter", "\u0001").load(hdfsFile)
df.registerTempTable("testTable")

// now connect to external cassandra cluster
try{

  val cluster = Cluster.builder().addContactPoints(CassandraNodes).withQueryOptions(new QueryOptions().setConsistencyLevel(ConsistencyLevel.ONE)).withCredentials(CassandraUser, CassandraPassword).build
  val session= cluster.connect()
   // is it possible to use DataFrame from different sc to load data into cassandra?
   session.execute("INSERT INTO CassandraTableName SELECT 1, C3, C4, C5, C6, C7, null, C8, C9, null FROM testTable")

} catch  {
  case e: Exception => println(e.fillInStackTrace())

}

暂无答案!

目前还没有任何答案,快来回答吧!

相关问题