apache-flink-svm预测

yx2lnoni  于 2021-06-24  发布在  Flink
关注(0)|答案(1)|浏览(653)

我正在使用apache flink预测来自twitter的流。
代码是用scala实现的
我的问题是,我从dataset api训练的svm模型需要一个数据集作为predict()方法的输入。
我在这里已经看到一个问题,一个用户说,您需要编写一个自己的mapfunction,在作业开始时读取模型(参考:flink中使用scala的实时流预测)
但我不能写/理解这段代码。
即使我在streamingmap函数中得到模型。我仍然需要一个数据集作为预测结果的参数。
我真的希望有人能向我展示/解释这是怎么做到的。
flink版本:1.9 scala版本:2.11 flink ml:2.11

val strEnv = StreamExecutionEnvironment.getExecutionEnvironment
val env = ExecutionEnvironment.getExecutionEnvironment

//this is my Model including all the terms to calculate the tfidf-values and to create a libsvm
val featureVectorService = new FeatureVectorService
        featureVectorService.learnTrainingData(labeledData, false)

//reads the created libsvm
val trainingData: DataSet[LabeledVector] = MLUtils.readLibSVM(env, "...")
        val svm = SVM()
                .setBlocks(env.getParallelism)
                .setIterations(100)
                .setRegularization(0.001)
                .setStepsize(0.1)
                .setSeed(42)
//learning
svm.fit(trainingData)

//this is my twitter stream - text should be predicted later
val streamSource: DataStream[String] = strEnv.addSource(new TwitterSource(params.getProperties))

//the texts i want to transform to tfidf using the service upon and give it the svm to predict
val tweets: DataStream[(String, String)] = streamSource
                .flatMap(new SelectEnglishTweetWithCreatedAtFlatMapper)
yqhsw0fo

yqhsw0fo1#

所以,目前的flinkml SVM 是的一部分,不支持流式api。这就是为什么 SVM 仅接受 DataSet . 我们的想法不是使用flinkml,而是使用scala或java中的一些svm库。然后你可以从文件中读取模型。问题是您必须自己实现大部分逻辑。
你提到的帖子中的评论或多或少是在说同样的事情。

相关问题