Cassandra: temp table not found in Zeppelin

6fe3ivhb asked on 2022-11-05 in Cassandra

I get an error when I try to select from the temp table. Can anyone help me?

object StreamingLinReg extends java.lang.Object {

  val conf = new SparkConf(true)
    .set("spark.cassandra.connection.host", "127.0.0.1")
    .setAppName("Streaming Liniar Regression")
    .set("spark.cassandra.connection.port", "9042")
    .set("spark.driver.allowMultipleContexts", "true")
    .set("spark.streaming.receiver.writeAheadLog.enable", "true")

  val sc = new SparkContext(conf)
  val ssc = new StreamingContext(sc, Seconds(1))

  val sqlContext = new org.apache.spark.sql.SQLContext(sc)

  import sqlContext.implicits._

  val trainingData = ssc.cassandraTable[String]("features", "consumodata")
    .select("consumo", "consumo_mensal", "soma_pf", "tempo_gasto")
    .map(LabeledPoint.parse)

  trainingData.toDF.registerTempTable("training")

  val dstream = new ConstantInputDStream(ssc, trainingData)

  val numFeatures = 100
  val model = new StreamingLinearRegressionWithSGD()
    .setInitialWeights(Vectors.zeros(numFeatures))
    .setNumIterations(1)
    .setStepSize(0.1)
    .setMiniBatchFraction(1.0)

  model.trainOn(dstream)
  model.predictOnValues(dstream.map(lp => (lp.label, lp.features))).foreachRDD { rdd =>
    val metrics = new RegressionMetrics(rdd)
    val MSE = metrics.meanSquaredError        // mean squared error
    val RMSE = metrics.rootMeanSquaredError   // root mean squared error
    val MAE = metrics.meanAbsoluteError       // mean absolute error
    val Rsquared = metrics.r2
    //val explainedVariance = metrics.explainedVariance
    rdd.toDF.registerTempTable("liniarRegressionModel")
  }

  ssc.start()
  ssc.awaitTermination()
}

 %sql
 select * from liniarRegressionModel limit 10

When I select from the temp table I get the error message below. I run the first paragraph and, after it finishes, I execute the select on the temp table.

org.apache.spark.sql.AnalysisException: Table not found: liniarRegressionModel; line 1 pos 14
    at org.apache.spark.sql.catalyst.analysis.package$AnalysisErrorAt.failAnalysis(package.scala:42)
    at org.apache.spark.sql.catalyst.analysis.Analyzer$ResolveRelations$.getTable(Analyzer.scala:305)
    at org.apache.spark.sql.catalyst.analysis.Analyzer$ResolveRelations$$anonfun$apply$9.applyOrElse(Analyzer.scala:314)
    at org.apache.spark.sql.catalyst.analysis.Analyzer$ResolveRelations$$anonfun$apply$9.applyOrElse(Analyzer.scala:309)
    at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan$$anonfun$resolveOperators$1.apply(LogicalPlan.scala:57)
    at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan$$anonfun$resolveOperators$1.apply(LogicalPlan.scala:57)
    at org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(TreeNode.scala:69)
    at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.resolveOperators(LogicalPlan.scala:56)
    at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan$$anonfun$1.apply(LogicalPlan.scala:54)
    at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan$$anonfun$1.apply(LogicalPlan.scala:54)
    at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$4.apply(TreeNode.scala:281)
    at scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
    at scala.collection.Iterator$class.foreach(Iterator.scala:727)
    at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
    at scala.collection.generic.Growable$class.$plus$plus$eq(Growable.scala:48)
    at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:103)
    at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:47)
    at scala.collection.TraversableOnce$class.to(TraversableOnce.scala:273)
    at scala.collection.AbstractIterator.to(Iterator.scala:1157)
    at scala.collection.TraversableOnce$class.toBuffer(TraversableOnce.scala:265)
    at scala.collection.AbstractIterator.toBuffer(Iterator.scala:1157)
    at scala.collection.TraversableOnce$class.toArray(TraversableOnce.scala:252)
    at scala.collection.AbstractIterator.toArray(Iterator.scala:1157)

j8ag8udp (answer 1)

Output after executing the code:

import java.lang.Object
 import org.apache.spark.{SparkConf, SparkContext}
 import org.apache.spark.SparkContext._
 import org.apache.spark.sql.cassandra._
 import org.apache.spark.sql.SaveMode
 import org.apache.spark.sql
 import org.apache.spark.streaming._
 import org.apache.spark.streaming.dstream.DStream
 import org.apache.spark.streaming.StreamingContext._
 import com.datastax.spark.connector.streaming._
 import org.apache.spark.{SparkConf, SparkContext}
 import org.apache.spark.mllib.linalg.{Vector, Vectors}
 import org.apache.spark.mllib.regression.LabeledPoint
 import org.apache.spark.mllib.regression.StreamingLinearRegressionWithSGD
 import org.apache.spark.streaming.{Seconds, StreamingContext}
 import org.apache.spark.streaming.dstream.ConstantInputDStream
 import org.apache.spark.mllib.evaluation.RegressionMetrics
 defined module StreamingLinReg

 FINISHED
 Took 15 seconds
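
The output above only reports "defined module StreamingLinReg", which suggests the object was compiled but its body never ran: a Scala object is initialized lazily, on first reference, so none of the streaming code executed and registerTempTable("liniarRegressionModel") was never called, which would explain why the %sql paragraph cannot find the table. Below is a minimal, self-contained sketch of that behavior in a Zeppelin Scala paragraph; the names LazyObjectDemo and registered are illustrative, not from the question.

// Minimal sketch, assuming a Zeppelin %spark (Scala REPL) paragraph.
// An object's body does not run when the object is defined,
// only when the object is first referenced.
object LazyObjectDemo {
  println("object body executed")   // stands in for registerTempTable(...)
  val registered = true
}
// At this point the paragraph only reports "defined module LazyObjectDemo",
// just like "defined module StreamingLinReg" above; nothing has run yet.

// Referencing the object triggers initialization and runs the body:
println(LazyObjectDemo.registered)  // prints "object body executed", then "true"

Applied to the question, the StreamingLinReg object has to be referenced (for example by wrapping the code in a method and calling it, or by moving it out of the object into the paragraph itself) before the %sql paragraph runs; otherwise the temp tables are never registered.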
