val hbaseInput: DataSet[String] = ...
val labelCOODS = hbaseInput.flatMap {
line =>
// remove all comments which start with a '#'
val commentFreeLine = line.takeWhile(_ != '#').trim
if(commentFreeLine.nonEmpty) {
val splits = commentFreeLine.split(' ')
val label = splits.head.toDouble
val sparseFeatures = splits.tail
val coos = sparseFeatures.map {
str =>
val pair = str.split(':')
require(
pair.length == 2,
"Each feature entry has to have the form <feature>:<value>")
// libSVM index is 1-based, but we expect it to be 0-based
val index = pair(0).toInt - 1
val value = pair(1).toDouble
(index, value)
}
Some((label, coos))
} else {
None
}
// Calculate maximum dimension of vectors
val dimensionDS = labelCOODS.map {
labelCOO =>
labelCOO._2.map( _._1 + 1 ).max
}.reduce(scala.math.max(_, _))
val labeledVectors: DataSet[LabeledVector] =
labelCOODS.map{ new RichMapFunction[(Double, Array[(Int, Double)]), LabeledVector] {
var dimension = 0
override def open(configuration: Configuration): Unit = {
dimension = getRuntimeContext.getBroadcastVariable(DIMENSION).get(0)
}
override def map(value: (Double, Array[(Int, Double)])): LabeledVector = {
new LabeledVector(value._1, SparseVector.fromCOO(dimension, value._2))
}
}}.withBroadcastSet(dimensionDS, DIMENSION)
1条答案
按热度按时间sqyvllje1#
不需要将数据写入磁盘,然后用
MLUtils.readLibSVM
. 原因如下。MLUtils.readLibSVM
需要一个文本文件,其中每一行都是稀疏特征向量及其相关标签。它使用以下格式表示标签特征向量对:哪里
<feature>
是后续项目的索引value
在特征向量中。MLUtils.readLibSVM
可以读取此格式的文件,并在LabeledVector
示例。因此,您将获得DataSet[LabeledVector]
在读取了libsvm文件之后。这正是你需要的输入格式SVM
以及MultipleLinearRegression
预测器。但是,根据从hbase获得的数据格式,首先必须将数据转换为
libSVM
格式。否则,MLUtils.readLibSVM
无法读取写入的文件。如果您转换数据,那么您也可以直接将数据转换为DataSet[LabeledVector]
并将其用作flink的ml算法的输入。这样可以避免不必要的磁盘周期。如果您从hbase a获得
DataSet[String]
每根弦都有libSVM
格式(请参阅上面的规范),然后您可以应用map
hbase上的操作DataSet
具有以下Map功能。这将把libsvm格式的数据转换成
LabeledVectors
.