我在使用elasticsearchSpark连接器时遇到了一些问题,如下所述:https://www.elastic.co/guide/en/elasticsearch/hadoop/master/spark.html. 我甚至无法在他们的页面上找到使用ElasticSearch7.4.0的普通示例的例子,而我是通过下载和启动ElasticSearch7.4.0的
<downloadDir>/bin/elasticsearch
这是我为跑步所做的。我通过命令启动了spark:
spark-shell --packages "org.elasticsearch:elasticsearch-hadoop:7.4.0"
然后,我输入了上述文档页面上给出的代码行:
import org.apache.spark.SparkContext // attempt 1
import org.apache.spark.SparkContext._
import org.elasticsearch.spark._
val numbers = Map("one" -> 1, "two" -> 2, "three" -> 3)
val airports = Map("arrival" -> "Otopeni", "SFO" -> "San Fran")
spark.sparkContext.makeRDD( Seq(numbers, airports)).saveToEs("spark/docs")
我收到一些奇怪的错误,表明es连接到的不是默认的主节点[127.0.0.1:9200],然后即使使用该节点也会失败:
[Stage 0:> (0 + 12) / 12]20/10/13 19:39:21 ERROR NetworkClient: Node [172.20.0.3:9200] failed (org.apache.commons.httpclient.ConnectTimeoutException: The host did not accept the connection within timeout of 60000 ms); selected next node [127.0.0.1:9200]
20/10/13 19:39:21 ERROR NetworkClient: Node [172.20.0.3:9200] failed (org.apache.commons.httpclient.ConnectTimeoutException: The host did not accept the connection within timeout of 60000 ms); selected next node [127.0.0.1:9200]
注意如果我输入http://127.0.0.1:9200/在我的浏览器url栏中,我得到一个json文档,指示集群已启动localhost:9200. 所以,我被难住了!非常感谢您的指导。
更新
我尝试了mikalai建议的答案(必须通过rdd调用savetoes,而不是dataframe,因为它由于某种原因无法编译)。不幸的是,我得到了同样的结果。
import org.apache.spark.rdd.RDD // attempt 2
import org.apache.spark.sql.SparkSession
import org.elasticsearch.spark._
object classes {
case class AlbumIndex(group: String, year: Int, title: String)
}
object App extends App {
import classes._
val spark = SparkSession .builder() .appName("writetoes") .master("local[*]") .config("spark.es.nodes","localhost").config("spark.es.port","9200").getOrCreate()
val indexDocuments: Seq[AlbumIndex] = Seq(
AlbumIndex("Led Zeppelin",1969,"Led Zeppelin"),
AlbumIndex("Boston",1976,"Boston"),
AlbumIndex("Fleetwood Mac", 1979,"Tusk")
)
val rdd: RDD[AlbumIndex] = spark.sparkContext.makeRDD( indexDocuments)
rdd.saveToEs("demoindex/albumindex")
}
3条答案
按热度按时间7cwmlq891#
所以,问题是我在另一个窗口中有另一个elasticsearch示例监听同一个端口。总是以奇怪的方式处理事情。所以。。这个适配器完全没有问题。问题是我。
dfuffjeb2#
你需要配置elasticsearch的端口和ip在哪里运行请找到下面的我想这会对你有所帮助。
3pmvbmvn3#
请注意,172.0.0.0网络空间是ip的专用网络范围。很可能您的elasticsearch节点将其中一个地址作为绑定地址而不是127.0.0.1。es hadoop/spark尝试在对集群进行任何写入之前“发现”集群。发现过程的一部分涉及从给定的节点列表中随机联系一个节点,并要求它提供集群中所有节点的ip地址。很可能您的elasticsearch节点认为它应该可以在172.x.x.x上访问,并且连接器正在将该地址作为发现过程的一部分进行提取,并尝试将其用于将来的所有通信,即使无法建立到该地址的ip连接(出于任何原因)。
您应该能够为这些本地运行禁用节点发现。这将切换es hadoop/spark连接器,使其不尝试查找集群中尚未在中指定的任何节点
es.nodes
设置。可以通过设置es.nodes.discovery
属性设置为false。在spark中,您需要在它前面加上spark.
否则斯帕克会把房子扔掉。