我目前正在编写一个由生产者和消费者组成的scala应用程序。制作人从外部获取一些数据,并将它们写入Kafka内部。消费者阅读Kafka的内容,然后写信给elasticsearch。
消费者基于spark流媒体,每5秒从kafka获取新消息并将其写入elasticsearch。问题是我无法写入es,因为我会遇到很多错误,如以下错误:
错误][2015-04-24 11:21:14734][org.apache.spark.taskcontextimpl]:taskcompletionlistener org.elasticsearch.hadoop.eshadoopexception中出错:无法写入所有条目[3/26560](可能es过载?)。救出。。。在org.elasticsearch.hadoop.rest.restrepository.flush(restrepository。java:225)~[elasticsearch-spark\ 2.10-2.1.0.beta3。jar:2.1.0.beta3]在org.elasticsearch.hadoop.rest.restrepository.close(restrepository。java:236)~[elasticsearch-spark\ 2.10-2.1.0.beta3。jar:2.1.0.beta3]在org.elasticsearch.hadoop.rest.restservice$partitionwriter.close(restservice。java:125)~[elasticsearch-spark\ 2.10-2.1.0.beta3。jar:2.1.0.beta3]在org.elasticsearch.spark.rdd.esrddriter$$anonfun$write$1.apply$mcv$sp(esrddriter。scala:33)~[elasticsearch-spark\ 2.10-2.1.0.beta3。jar:2.1.0.beta3]在org.apache.spark.taskcontextimpl$$anon$2.ontaskcompletion(taskcontextimpl。scala:57)[Spark芯2.10-1.2.1。jar:1.2.1]在org.apache.spark.taskcontextimpl$$anonfun$marktaskcompleted$1.apply(taskcontextimpl。scala:68)[Spark芯2.10-1.2.1。jar:1.2.1]在org.apache.spark.taskcontextimpl$$anonfun$marktaskcompleted$1.apply(taskcontextimpl。scala:66) [Spark芯2.10-1.2.1。jar:1.2.1]在scala.collection.mutable.resizablearray$class.foreach(resizablearray。scala:59)[na:na]在scala.collection.mutable.arraybuffer.foreach(arraybuffer。scala:47)[na:na]位于org.apache.spark.taskcontextimpl.marktaskcompleted(taskcontextimpl。scala:66)[Spark芯2.10-1.2.1。jar:1.2.1]在org.apache.spark.scheduler.task.run(任务。scala:58)[Spark芯2.10-1.2.1。jar:1.2.1]在org.apache.spark.executor.executor$taskrunner.run(executor。scala:200)[Spark芯2.10-1.2.1。jar:1.2.1]位于java.util.concurrent.threadpoolexecutor.runworker(threadpoolexecutor。java:1145)[na:1.7.0_]在java.util.concurrent.threadpoolexecutor$worker.run(threadpoolexecutor。java:615)[na:1.7.0_]at java.lang.thread.run(线程。java:745)[na:1.7.0キ65]
假设制作人每15秒写6条消息,所以我真的不明白这种“过载”是怎么发生的(我甚至清理了主题并刷新了所有旧消息,我认为这与偏移量问题有关)。spark streaming每5秒执行一次的任务可以用以下代码总结:
val result = KafkaUtils.createStream[String, Array[Byte], StringDecoder, DefaultDecoder](ssc, kafkaParams, Map("wasp.raw" -> 1), StorageLevel.MEMORY_ONLY_SER_2)
val convertedResult = result.map(k => (k._1 ,AvroToJsonUtil.avroToJson(k._2)))
//TO-DO : Remove resource (yahoo/yahoo) hardcoded parameter
log.info(s"***EXECUTING SPARK STREAMING TASK + ${java.lang.System.currentTimeMillis()}***")
convertedResult.foreachRDD(rdd => {
rdd.map(data => data._2).saveToEs("yahoo/yahoo", Map("es.input.json" -> "true"))
})
如果我尝试打印消息而不是发送给es,一切都很好,实际上我只看到6条消息。为什么我不能给她写信?
为了完整起见,我使用这个库编写了es:elasticsearch-spark_.10的最新beta版本。
6条答案
按热度按时间t3irkdon1#
正如前面提到的,这是一个文档写入冲突。
你的
convertedResult
数据流包含多个具有相同id的记录。当作为同一批的一部分写入elastic时,会产生上述错误。可能的解决方案:
为每个记录生成唯一的id。根据您的用例,可以用几种不同的方法来完成。例如,一种常见的解决方案是通过组合id和lastmodifieddate字段来创建一个新字段,并在写入时将该字段用作id。
根据id执行重复数据消除-仅选择一个具有特定id的记录并丢弃其他重复项。根据您的用例,这可能是最新的记录(基于时间戳字段)、最完整的记录(大多数字段包含数据)等。
1解决方案将存储流中收到的所有记录。
2解决方案将根据重复数据消除逻辑仅存储特定id的唯一记录。此结果与设置相同
"es.batch.size.entries" -> "1"
,除非您不会通过一次写入一条记录来限制性能。ifmq2ha22#
其中一种可能性是集群/碎片状态为红色。请解决这个问题,这可能是由于未分配的副本。一旦状态变为绿色,api调用就成功了。
k97glaaz3#
当我把dataframe推到spark上的es时,我收到了相同的错误消息。即使有
"es.batch.size.entries" -> "1"
配置,我也有同样的错误。一旦我在es中增加了线程池,我就可以解决这个问题。例如,
大容量池
jexiocij4#
这是文档写入冲突。
例如:
多个文档为elasticsearch指定相同的\u id。
这些文档位于不同的分区中。
spark同时向es写入多个分区。
结果是elasticsearch一次从多个来源/通过多个节点/包含不同数据的单个文档接收多个更新
“我每批丢失1到3条消息。”
批量大小大于1时失败次数波动
批量写入大小为“1”时成功
llycmphe5#
只是为这个错误添加了另一个可能的原因,希望它能帮助某些人。如果elasticsearch索引包含子文档,则:
如果您使用的是自定义路由字段(而不是\u id),则根据文档,文档的唯一性不能得到保证。这可能会导致从spark更新时出现问题。
如果您使用的是标准的\u id,则将保留唯一性,但是您需要确保在从spark写入elasticsearch时提供以下选项:
es.mapping.join连接
es.Map.路由
d4so4syb6#
在多次尝试之后,我发现了一种写入elasticsearch而不会出错的方法。基本上是传递参数
"es.batch.size.entries" -> "1"
用savetoes方法解决了这个问题。我不明白为什么使用默认或任何其他批处理大小会导致上述错误,因为如果我尝试写入的内容超过允许的最大批处理大小,而不是更少,我会收到错误消息。此外,我还注意到,实际上我是在给es写信,但不是所有的消息,我每批丢失1到3条消息。