我的问题有点像这里描述的那样。部分代码(实际上取自apache站点)如下所示
val httpHosts = new java.util.ArrayList[HttpHost]
httpHosts.add(new HttpHost("127.0.0.1", 9200, "http"))
httpHosts.add(new HttpHost("10.2.3.1", 9200, "http"))
val esSinkBuilder = new ElasticsearchSink.Builder[String](
httpHosts,
new ElasticsearchSinkFunction[String] {
def createIndexRequest(element: String): IndexRequest = {
val json = new java.util.HashMap[String, String]
json.put("data", element)
return Requests.indexRequest()
.index("my-index")
.`type`("my-type")
.source(json)
如果我加上这三个语句,我得到的错误如下
import org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSinkFunction
import org.apache.flink.streaming.connectors.elasticsearch.RequestIndexer
import org.apache.flink.streaming.connectors.elasticsearch6.ElasticsearchSink
我犯了个错误
object elasticsearch is not a member of package org.apache.flink.streaming.connectors
object elasticsearch6 is not a member of package org.apache.flink.streaming.connectors
如果我不添加那些import语句,我会得到如下错误
Compiling 1 Scala source to E:\sar\scala\practice\readstbdata\target\scala-2.11\classes ...
[error] E:\sar\scala\practice\readstbdata\src\main\scala\example\readcsv.scala:35:25: not found: value ElasticsearchSink
[error] val esSinkBuilder = new ElasticsearchSink.Builder[String](
[error] ^
[error] E:\sar\scala\practice\readstbdata\src\main\scala\example\readcsv.scala:37:7: not found: type ElasticsearchSinkFunction
[error] new ElasticsearchSinkFunction[String] {
[error] ^
[error] two errors found
[error] (Compile / compileIncremental) Compilation failed
[error] Total time: 1 s, completed 10 Feb, 2020 2:15:04 PM
上面提到的堆栈流问题,有些函数已经扩展了。我的理解是,flink.streaming.connectors.elasticsearch必须扩展到rest库中。1) 我的理解是正确的2)如果是,我可以有完整的扩展3)如果我的理解是错误的,请给我一个解决方案。
注意:我在build.sbt中添加了以下语句
libraryDependencies += "org.elasticsearch.client" % "elasticsearch-rest-high-level-client" % "7.5.2" ,
libraryDependencies += "org.elasticsearch" % "elasticsearch" % "7.5.2",
2条答案
按热度按时间wgmfuz8q1#
流连接器不是flink二进制发行版的一部分。你必须把它们和你的申请一起打包。
为了
elasticsearch6
你需要加上flink-connector-elasticsearch6_2.11
,你可以这样做一旦这个jar成为构建的一部分,编译器就会找到缺少的组件。但是,我不知道这个es6客户机是否可以与7.5.2版一起使用。
11dmarpk2#
flink elasticsearch接头7
请看我在这里提供的工作和详细的答案,这是用scala写的。