SparkStreaming和elasticsearch无法写入所有条目

pgky5nke  于 2021-06-08  发布在  Kafka
关注(0)|答案(6)|浏览(621)

我目前正在编写一个由生产者和消费者组成的scala应用程序。制作人从外部获取一些数据,并将它们写入Kafka内部。消费者阅读Kafka的内容,然后写信给elasticsearch。
消费者基于spark流媒体,每5秒从kafka获取新消息并将其写入elasticsearch。问题是我无法写入es,因为我会遇到很多错误,如以下错误:
错误][2015-04-24 11:21:14734][org.apache.spark.taskcontextimpl]:taskcompletionlistener org.elasticsearch.hadoop.eshadoopexception中出错:无法写入所有条目[3/26560](可能es过载?)。救出。。。在org.elasticsearch.hadoop.rest.restrepository.flush(restrepository。java:225)~[elasticsearch-spark\ 2.10-2.1.0.beta3。jar:2.1.0.beta3]在org.elasticsearch.hadoop.rest.restrepository.close(restrepository。java:236)~[elasticsearch-spark\ 2.10-2.1.0.beta3。jar:2.1.0.beta3]在org.elasticsearch.hadoop.rest.restservice$partitionwriter.close(restservice。java:125)~[elasticsearch-spark\ 2.10-2.1.0.beta3。jar:2.1.0.beta3]在org.elasticsearch.spark.rdd.esrddriter$$anonfun$write$1.apply$mcv$sp(esrddriter。scala:33)~[elasticsearch-spark\ 2.10-2.1.0.beta3。jar:2.1.0.beta3]在org.apache.spark.taskcontextimpl$$anon$2.ontaskcompletion(taskcontextimpl。scala:57)[Spark芯2.10-1.2.1。jar:1.2.1]在org.apache.spark.taskcontextimpl$$anonfun$marktaskcompleted$1.apply(taskcontextimpl。scala:68)[Spark芯2.10-1.2.1。jar:1.2.1]在org.apache.spark.taskcontextimpl$$anonfun$marktaskcompleted$1.apply(taskcontextimpl。scala:66) [Spark芯2.10-1.2.1。jar:1.2.1]在scala.collection.mutable.resizablearray$class.foreach(resizablearray。scala:59)[na:na]在scala.collection.mutable.arraybuffer.foreach(arraybuffer。scala:47)[na:na]位于org.apache.spark.taskcontextimpl.marktaskcompleted(taskcontextimpl。scala:66)[Spark芯2.10-1.2.1。jar:1.2.1]在org.apache.spark.scheduler.task.run(任务。scala:58)[Spark芯2.10-1.2.1。jar:1.2.1]在org.apache.spark.executor.executor$taskrunner.run(executor。scala:200)[Spark芯2.10-1.2.1。jar:1.2.1]位于java.util.concurrent.threadpoolexecutor.runworker(threadpoolexecutor。java:1145)[na:1.7.0_]在java.util.concurrent.threadpoolexecutor$worker.run(threadpoolexecutor。java:615)[na:1.7.0_]at java.lang.thread.run(线程。java:745)[na:1.7.0キ65]
假设制作人每15秒写6条消息,所以我真的不明白这种“过载”是怎么发生的(我甚至清理了主题并刷新了所有旧消息,我认为这与偏移量问题有关)。spark streaming每5秒执行一次的任务可以用以下代码总结:

val result = KafkaUtils.createStream[String, Array[Byte], StringDecoder, DefaultDecoder](ssc, kafkaParams, Map("wasp.raw" -> 1), StorageLevel.MEMORY_ONLY_SER_2)
  val convertedResult = result.map(k => (k._1 ,AvroToJsonUtil.avroToJson(k._2)))

  //TO-DO : Remove resource (yahoo/yahoo) hardcoded parameter
  log.info(s"***EXECUTING SPARK STREAMING TASK  + ${java.lang.System.currentTimeMillis()}***")

  convertedResult.foreachRDD(rdd => {
      rdd.map(data => data._2).saveToEs("yahoo/yahoo", Map("es.input.json" -> "true"))

  })

如果我尝试打印消息而不是发送给es,一切都很好,实际上我只看到6条消息。为什么我不能给她写信?
为了完整起见,我使用这个库编写了es:elasticsearch-spark_.10的最新beta版本。

t3irkdon

t3irkdon1#

正如前面提到的,这是一个文档写入冲突。
你的 convertedResult 数据流包含多个具有相同id的记录。当作为同一批的一部分写入elastic时,会产生上述错误。
可能的解决方案:
为每个记录生成唯一的id。根据您的用例,可以用几种不同的方法来完成。例如,一种常见的解决方案是通过组合id和lastmodifieddate字段来创建一个新字段,并在写入时将该字段用作id。
根据id执行重复数据消除-仅选择一个具有特定id的记录并丢弃其他重复项。根据您的用例,这可能是最新的记录(基于时间戳字段)、最完整的记录(大多数字段包含数据)等。

1解决方案将存储流中收到的所有记录。

2解决方案将根据重复数据消除逻辑仅存储特定id的唯一记录。此结果与设置相同 "es.batch.size.entries" -> "1" ,除非您不会通过一次写入一条记录来限制性能。

ifmq2ha2

ifmq2ha22#

其中一种可能性是集群/碎片状态为红色。请解决这个问题,这可能是由于未分配的副本。一旦状态变为绿色,api调用就成功了。

k97glaaz

k97glaaz3#

当我把dataframe推到spark上的es时,我收到了相同的错误消息。即使有 "es.batch.size.entries" -> "1" 配置,我也有同样的错误。一旦我在es中增加了线程池,我就可以解决这个问题。
例如,

大容量池

threadpool.bulk.type: fixed
threadpool.bulk.size: 600
threadpool.bulk.queue_size: 30000
jexiocij

jexiocij4#

这是文档写入冲突。
例如:
多个文档为elasticsearch指定相同的\u id。
这些文档位于不同的分区中。
spark同时向es写入多个分区。
结果是elasticsearch一次从多个来源/通过多个节点/包含不同数据的单个文档接收多个更新
“我每批丢失1到3条消息。”
批量大小大于1时失败次数波动
批量写入大小为“1”时成功

llycmphe

llycmphe5#

只是为这个错误添加了另一个可能的原因,希望它能帮助某些人。如果elasticsearch索引包含子文档,则:
如果您使用的是自定义路由字段(而不是\u id),则根据文档,文档的唯一性不能得到保证。这可能会导致从spark更新时出现问题。
如果您使用的是标准的\u id,则将保留唯一性,但是您需要确保在从spark写入elasticsearch时提供以下选项:
es.mapping.join连接
es.Map.路由

d4so4syb

d4so4syb6#

在多次尝试之后,我发现了一种写入elasticsearch而不会出错的方法。基本上是传递参数 "es.batch.size.entries" -> "1" 用savetoes方法解决了这个问题。我不明白为什么使用默认或任何其他批处理大小会导致上述错误,因为如果我尝试写入的内容超过允许的最大批处理大小,而不是更少,我会收到错误消息。
此外,我还注意到,实际上我是在给es写信,但不是所有的消息,我每批丢失1到3条消息。

相关问题