Flink LinearRegression: how to load data (Scala)

iyr7buue posted on 2021-06-25 in Flink

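I'm starting to train a multiple linear regression algorithm in Flink. I'm following the excellent official documentation and quickstart, and I'm developing the code in Zeppelin.
If I load the data from a CSV file: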

//Read the file:
val data = benv.readCsvFile[(Int, Double, Double, Double)]("/.../quake.csv")
val mapped = data.map {x => new org.apache.flink.ml.common.LabeledVector (x._4, org.apache.flink.ml.math.DenseVector(x._1,x._2,x._3)) }

//Data created:
mapped: org.apache.flink.api.scala.DataSet[org.apache.flink.ml.common.LabeledVector] = org.apache.flink.api.scala.DataSet@7cb37ad3
    LabeledVector(6.7, DenseVector(33.0, -52.26, 28.3))
    LabeledVector(5.8, DenseVector(36.0, 45.53, 150.93))
    LabeledVector(5.8, DenseVector(57.0, 41.85, 142.78))

//Predict with the model created:
val predictions: DataSet[org.apache.flink.ml.common.LabeledVector] = mlr.predict(mapped)
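To make the column roles in the data.map call explicit, here is a minimal plain-Scala sketch of the same transformation: the first three tuple fields become the features and the fourth becomes the label. LabeledPoint, CsvToLabeled, fromTuple and fromLine are hypothetical names; LabeledPoint only stands in for Flink ML's LabeledVector so the snippet runs without Flink. The sample line "33,-52.26,28.3,6.7" is reconstructed from the first record printed above, assuming the column order of the tuple type.

```scala
// Hypothetical stand-in for org.apache.flink.ml.common.LabeledVector, so the
// mapping logic can run without Flink ML on the classpath.
final case class LabeledPoint(label: Double, features: Seq[Double])

object CsvToLabeled {
  // Same mapping as data.map above: fields 1-3 become the features,
  // field 4 becomes the label.
  def fromTuple(x: (Int, Double, Double, Double)): LabeledPoint =
    LabeledPoint(x._4, Seq(x._1.toDouble, x._2, x._3))

  // Parses one comma-separated line into the same tuple shape that
  // readCsvFile[(Int, Double, Double, Double)] is asked to produce.
  def fromLine(line: String): LabeledPoint = {
    val cols = line.split(",").map(_.trim)
    fromTuple((cols(0).toInt, cols(1).toDouble, cols(2).toDouble, cols(3).toDouble))
  }
}
```

Keeping the label as the last CSV column is what makes x._4 the label; if the file were laid out differently, only fromTuple would need to change.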

If I load the data from a libsvm file:

val testingDS: DataSet[(Vector, Double)] = MLUtils.readLibSVM(benv, "/home/borja/Desktop/bbb/quake.libsvm").map(x => (x.vector,   x.label))
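MLUtils.readLibSVM already parses the file into LabeledVectors (hence x.vector and x.label above). Purely to illustrate the libsvm line format (a label followed by 1-based index:value pairs, with unlisted indices meaning 0), here is a plain-Scala sketch. LibSvmLine and parse are hypothetical names, this is not how Flink ML implements its reader, and the dense array is a simplification chosen for readability.

```scala
object LibSvmLine {
  // Parses one libsvm-format line, "label idx:value idx:value ...", into
  // (dense feature array, label). libsvm indices are 1-based and any index
  // that is not listed is an implicit 0.0.
  def parse(line: String, numFeatures: Int): (Array[Double], Double) = {
    val tokens = line.trim.split("\\s+")
    val label = tokens.head.toDouble
    val features = new Array[Double](numFeatures) // all entries start at 0.0
    for (tok <- tokens.tail) {
      val sep = tok.indexOf(':')
      val index = tok.substring(0, sep).toInt
      val value = tok.substring(sep + 1).toDouble
      features(index - 1) = value // shift the 1-based index to 0-based
    }
    (features, label)
  }
}
```

For example, a hypothetical line "6.7 1:33 2:-52.26 3:28.3" carries the same values as the first CSV record above.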

But I get errors:
-> CSV format:

res13: org.apache.flink.api.scala.DataSet[org.apache.flink.ml.common.LabeledVector] = org.apache.flink.api.scala.DataSet@7cb37ad3
    <console>:89: error: type mismatch;
     found   : org.apache.flink.api.scala.DataSet[Any]
     required: org.apache.flink.api.scala.DataSet[org.apache.flink.ml.common.LabeledVector]
    Note: Any >: org.apache.flink.ml.common.LabeledVector, but class DataSet is invariant in type T.
    You may wish to define T as -T instead. (SLS 4.5)
    Error occurred in an application involving default arguments.
           val predictions:DataSet[org.apache.flink.ml.common.LabeledVector] = mlr.predict(mapped)

-> libsvm format:

<console>:111: error: type Vector takes type parameters
       val testingDS: DataSet[(Vector, Double)] = MLUtils.readLibSVM(benv, "/home/borja/Desktop/bbb/quake.libsvm").map(x => (x.vector,   x.label))

OK, so I wrote this new code:

val testingDS: DataSet[(Vector[org.apache.flink.ml.math.Vector], Double)] = MLUtils.readLibSVM(benv, "/home/borja/Desktop/bbb/quake.libsvm").map(x => (x.vector,   x.label))

New error:

<console>:111: error: type mismatch;
 found   : org.apache.flink.ml.math.Vector
 required: scala.collection.immutable.Vector[org.apache.flink.ml.math.Vector]
       val testingDS: DataSet[(Vector[org.apache.flink.ml.math.Vector], Double)] = MLUtils.readLibSVM(benv, "/home/borja/Desktop/bbb/quake.libsvm").map(x => (x.vector,   x.label))
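Both errors come from the same Scala rule: with no import, the bare name Vector means Scala's own generic collection scala.collection.immutable.Vector[A]. The sketch below shows this in plain Scala; VectorNames, MlVector and Dense are hypothetical names, with MlVector standing in for org.apache.flink.ml.math.Vector so the code runs without Flink.

```scala
object VectorNames {
  // Hypothetical stand-in for org.apache.flink.ml.math.Vector.
  trait MlVector { def size: Int }
  final case class Dense(data: Seq[Double]) extends MlVector {
    def size: Int = data.size
  }

  // With no import, Vector means scala.collection.immutable.Vector[A], a
  // generic collection: using the bare type Vector in an annotation gives
  // "type Vector takes type parameters".
  val plain: Vector[Double] = Vector(33.0, -52.26, 28.3)

  // Vector[MlVector] is a collection OF MlVector values, not an MlVector.
  val wrapped: Vector[MlVector] = Vector(Dense(Seq(33.0, -52.26, 28.3)))
  // val wrong: Vector[MlVector] = Dense(Seq(33.0)) // type mismatch, like the second error

  // A single MlVector is obtained by taking an element out of the collection.
  val single: MlVector = wrapped.head
}
```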

I'd really appreciate your help! :)

Answer #1, by flmtquvp:

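You should not use Scala's Vector class here: unless you import something else, the bare name Vector refers to Scala's built-in scala.collection.immutable.Vector. Flink ML comes with its own Vector (org.apache.flink.ml.math.Vector). This should work: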

val testingDS: DataSet[(org.apache.flink.ml.math.Vector, Double)] = MLUtils.readLibSVM(benv, "/home/borja/Desktop/bbb/quake.libsvm").map(x => (x.vector,   x.label))
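Instead of the fully qualified name, an explicit import such as import org.apache.flink.ml.math.Vector should also work, because explicit imports take precedence over the default scala.Vector alias. A renaming import (for example Vector => FlinkVector) keeps both types usable. The answer itself does not state this. The runnable sketch below demonstrates the scoping rule with a hypothetical mlmath object standing in for the Flink ML package; WithImport and WithAlias are likewise illustrative names.

```scala
// Hypothetical stand-in for the org.apache.flink.ml.math package.
object mlmath {
  trait Vector { def size: Int }
  final case class DenseVector(data: Seq[Double]) extends Vector {
    def size: Int = data.size
  }
}

object WithImport {
  // An explicit import shadows the default scala.Vector alias, so inside
  // this object the name Vector now means mlmath.Vector.
  import mlmath.{DenseVector, Vector}

  val pairs: Seq[(Vector, Double)] = Seq((DenseVector(Seq(33.0, -52.26, 28.3)), 6.7))
}

object WithAlias {
  // Renaming on import keeps Scala's own Vector available under its usual name.
  import mlmath.{DenseVector, Vector => MlVector}

  val pairs: Seq[(MlVector, Double)] = Seq((DenseVector(Seq(36.0, 45.53, 150.93)), 5.8))
  val scalaVec: Vector[Int] = Vector(1, 2, 3) // still scala.collection.immutable.Vector
}
```

The renaming form is handy in a notebook like Zeppelin, where Scala collections and Flink ML vectors often appear in the same paragraph.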
