I would like to preprocess text data from an Elasticsearch index with Apache Spark. Using elasticsearch-hadoop I retrieve the index into Spark and get an RDD of type RDD[(String, scala.collection.Map[String, AnyRef])].
The first element looks like this:
document: (String, scala.collection.Map[String, AnyRef]) = (file:document_id, Map(created -> Mon Jan 20 11:50:35 CET 2014, changed -> Fri Oct 23 12:46:40 CEST 2015, indexed -> Fri Mar 25 18:05:37 CET 2016, mimetype -> application/pdf, content -> plain text of the document))
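Just to show how I get at the content field of such a tuple, here is a small untested snippet (esRDD is the RDD returned by sc.esRDD, as in my code below; getOrElse is only there so a missing field does not break the job, in the full code I simply index into the map):
// each element of esRDD is (documentId, fieldMap)
val idToContent: RDD[(String, String)] = esRDD.map { case (docId, fields) =>
  (docId, fields.getOrElse("content", "").toString)
}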
The crucial part now is to run the content field above through an NLP toolkit and store the result back in Elasticsearch. The first half works: following a similar question here on Stack Overflow I use Stanford CoreNLP (unfortunately Spark itself does not ship an NLP pipeline, and I cannot retrieve the tokens directly from Elasticsearch). So I end up with the tokens of each document as an RDD[Seq[String]], but I do not know how to get them back into Elasticsearch.
Obviously I need an output RDD that connects each document with its tokens, something like Map("document_id_1" -> "tokens for id_1", "document_id_2" -> "tokens for id_2"). Maybe someone can give me a hint on how to get there, or has a better idea for solving this. Any help is much appreciated. Here is my code so far (a rough sketch of the output shape I have in mind follows after it):
import org.apache.spark._
import org.apache.spark.rdd.RDD
import scala.collection.mutable.ArrayBuffer
import org.elasticsearch.spark._
import org.elasticsearch.spark.rdd._
import edu.stanford.nlp.pipeline._
import edu.stanford.nlp.ling.CoreAnnotations._
import scala.collection.JavaConversions._
import java.util.Properties
object Stemming {
  def main(args: Array[String]) {
    val conf = new SparkConf()
      .setMaster("local[*]")
      .setAppName("SparkLemma")
      .set("es.nodes", "hadoop-m:9200")
      .set("es.write.operation", "upsert")
      .set("es.mapping.id", "id")
    val esIndex = "elastic/documents"
    val sc = new SparkContext(conf)

    // Read data from ES: each element of esRDD is (documentId, fieldMap)
    val esRDD = sc.esRDD(esIndex)
    val id: RDD[String] = esRDD.map(_._1.toString)
    val content: RDD[String] = esRDD.map(_._2("content").toString)
    val plainText: RDD[(String, String)] = id.zip(content)

    // Broadcast the stop-word list; it is read with stopWords.value inside the closures
    val stopWords = sc.broadcast(scala.io.Source.fromFile("stopwords.txt").getLines().toSet)

    def createNLPPipeline(): StanfordCoreNLP = {
      val props = new Properties()
      props.put("annotators", "tokenize, ssplit, pos, lemma")
      new StanfordCoreNLP(props)
    }

    // Prepends the document id as the first element of the result so that
    // writeTokensToES can recover it later via row.head
    def plainTextToLemmas(id: String, content: String, stopWords: Set[String],
                          nlp: StanfordCoreNLP): Seq[String] = {
      val doc = new Annotation(content)
      nlp.annotate(doc)
      val lemmas = new ArrayBuffer[String]()
      val sentences = doc.get(classOf[SentencesAnnotation])
      for (sentence <- sentences;
           token <- sentence.get(classOf[TokensAnnotation])) {
        val lemma = token.get(classOf[LemmaAnnotation])
        if (lemma.length > 3 && !stopWords.contains(lemma)) {
          if (lemmas.isEmpty) {
            lemmas += id += lemma.toLowerCase
          } else {
            lemmas += lemma.toLowerCase
          }
        }
      }
      lemmas
    }

    // One CoreNLP pipeline per partition; the id travels as the first element of each Seq
    val lemmatized: RDD[Seq[String]] = plainText.mapPartitions { strings =>
      val nlp = createNLPPipeline()
      strings.map { case (id, content) => plainTextToLemmas(id, content, stopWords.value, nlp) }
    }

    // Build the document to upsert; getDate() is a small timestamp helper (not shown here)
    def writeTokensToES(row: Seq[String]): Map[String, String] = {
      val tokens = row.drop(1).mkString(" ")
      Map("id" -> row.head, "content" -> tokens, "last-run" -> getDate())
    }

    val outputRDD = lemmatized.map(row => writeTokensToES(row))
    EsSpark.saveToEs(outputRDD, esIndex)
    sc.stop()
  }
}
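To make the output shape from above concrete, here is a rough, untested sketch of the keyed variant I have in mind. It reuses plainText, createNLPPipeline, stopWords, esIndex and plainTextToLemmas from the code above; the drop(1) only strips the document id that plainTextToLemmas currently prepends:
// Untested sketch: carry the document id as the RDD key instead of hiding it in the lemma list
val lemmasById: RDD[(String, Seq[String])] = plainText.mapPartitions { strings =>
  val nlp = createNLPPipeline()
  strings.map { case (docId, text) =>
    (docId, plainTextToLemmas(docId, text, stopWords.value, nlp).drop(1))
  }
}

// one Map per document; "id" matches es.mapping.id, so the upsert targets the right document
val output = lemmasById.map { case (docId, lemmas) =>
  Map("id" -> docId, "content" -> lemmas.mkString(" "))
}
EsSpark.saveToEs(output, esIndex)
Is something like this a reasonable way to connect documents and tokens, or is there a more idiomatic approach with elasticsearch-hadoop?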