elasticsearch hadoop spark连接器无法使用现成的es服务器设置和默认库设置进行连接/写入

eni9jsuy  于 2021-05-22  发布在  Spark
关注(0)|答案(3)|浏览(755)

我在使用elasticsearchSpark连接器时遇到了一些问题,如下所述:https://www.elastic.co/guide/en/elasticsearch/hadoop/master/spark.html. 我甚至无法在他们的页面上找到使用ElasticSearch7.4.0的普通示例的例子,而我是通过下载和启动ElasticSearch7.4.0的

<downloadDir>/bin/elasticsearch

这是我为跑步所做的。我通过命令启动了spark:

spark-shell --packages "org.elasticsearch:elasticsearch-hadoop:7.4.0"

然后,我输入了上述文档页面上给出的代码行:

import org.apache.spark.SparkContext        // attempt 1
import org.apache.spark.SparkContext._
import org.elasticsearch.spark._

val numbers = Map("one" -> 1, "two" -> 2, "three" -> 3)
val airports = Map("arrival" -> "Otopeni", "SFO" -> "San Fran")

spark.sparkContext.makeRDD( Seq(numbers, airports)).saveToEs("spark/docs")

我收到一些奇怪的错误,表明es连接到的不是默认的主节点[127.0.0.1:9200],然后即使使用该节点也会失败:

[Stage 0:>                                                        (0 + 12) / 12]20/10/13 19:39:21 ERROR NetworkClient: Node [172.20.0.3:9200] failed (org.apache.commons.httpclient.ConnectTimeoutException: The host did not accept the connection within timeout of 60000 ms); selected next node [127.0.0.1:9200]
20/10/13 19:39:21 ERROR NetworkClient: Node [172.20.0.3:9200] failed (org.apache.commons.httpclient.ConnectTimeoutException: The host did not accept the connection within timeout of 60000 ms); selected next node [127.0.0.1:9200]

注意如果我输入http://127.0.0.1:9200/在我的浏览器url栏中,我得到一个json文档,指示集群已启动localhost:9200. 所以,我被难住了!非常感谢您的指导。

更新

我尝试了mikalai建议的答案(必须通过rdd调用savetoes,而不是dataframe,因为它由于某种原因无法编译)。不幸的是,我得到了同样的结果。

import org.apache.spark.rdd.RDD   // attempt 2
import org.apache.spark.sql.SparkSession
import org.elasticsearch.spark._

object classes {
  case class AlbumIndex(group: String, year: Int,  title: String)

}
object App extends App {
  import classes._
  val spark = SparkSession .builder() .appName("writetoes") .master("local[*]") .config("spark.es.nodes","localhost").config("spark.es.port","9200").getOrCreate()
  val indexDocuments: Seq[AlbumIndex] = Seq(
      AlbumIndex("Led Zeppelin",1969,"Led Zeppelin"),
      AlbumIndex("Boston",1976,"Boston"),
      AlbumIndex("Fleetwood Mac", 1979,"Tusk")
  )
  val rdd: RDD[AlbumIndex] = spark.sparkContext.makeRDD( indexDocuments)
  rdd.saveToEs("demoindex/albumindex")
}
7cwmlq89

7cwmlq891#

所以,问题是我在另一个窗口中有另一个elasticsearch示例监听同一个端口。总是以奇怪的方式处理事情。所以。。这个适配器完全没有问题。问题是我。

dfuffjeb

dfuffjeb2#

你需要配置elasticsearch的端口和ip在哪里运行请找到下面的我想这会对你有所帮助。

val spark = SparkSession
    .builder()
    .appName("writetoes")
    .master("local[*]")
    .config("spark.es.nodes","localhost")//give your elastic node ip
    .config("spark.es.port","9200")//port where its running
    .getOrCreate()

import spark.implicits._

val indexDocuments = Seq(
    AlbumIndex("Led Zeppelin",1969,"Led Zeppelin"),
    AlbumIndex("Boston",1976,"Boston"),
    AlbumIndex("Fleetwood Mac", 1979,"Tusk")
).toDF

indexDocuments.saveToEs("demoindex/albumindex")
3pmvbmvn

3pmvbmvn3#

请注意,172.0.0.0网络空间是ip的专用网络范围。很可能您的elasticsearch节点将其中一个地址作为绑定地址而不是127.0.0.1。es hadoop/spark尝试在对集群进行任何写入之前“发现”集群。发现过程的一部分涉及从给定的节点列表中随机联系一个节点,并要求它提供集群中所有节点的ip地址。很可能您的elasticsearch节点认为它应该可以在172.x.x.x上访问,并且连接器正在将该地址作为发现过程的一部分进行提取,并尝试将其用于将来的所有通信,即使无法建立到该地址的ip连接(出于任何原因)。
您应该能够为这些本地运行禁用节点发现。这将切换es hadoop/spark连接器,使其不尝试查找集群中尚未在中指定的任何节点 es.nodes 设置。可以通过设置 es.nodes.discovery 属性设置为false。在spark中,您需要在它前面加上 spark. 否则斯帕克会把房子扔掉。

SparkSession.builder()
      .appName("my-app")
      .config("spark.es.nodes", "localhost")
      .config("spark.es.port", "9200")
      .config("spark.es.nodes.discovery", false)
      .getOrCreate()

相关问题