akka-http elasticsearch {"error":"no handler found for uri [https://localhost:9200/myindex/_bulk] and method [POST]"}

Posted by ftf50wuq on 2023-05-16 in ElasticSearch

I am using Elasticsearch 8.7.0. I created the index 'myindex':

curl --insecure --user "elastic:password" -X PUT "https://localhost:9200/myindex"

I can add JSON documents to Elasticsearch 8.7.0 manually with curl:

curl --insecure --user "elastic:password" -X POST "https://localhost:9200/myindex/_bulk" -H 'Content-Type: application/json' -d'{"some": "json"}'

But when I try to do the same programmatically in Scala 3 with akka-http, using the following code:

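import akka.actor.ActorSystem
import akka.http.scaladsl.{ConnectionContext, Http}
import akka.http.scaladsl.model.{ContentTypes, HttpEntity, HttpMethods, HttpRequest, Uri, headers}
import akka.http.scaladsl.model.headers.BasicHttpCredentials
import akka.stream.scaladsl.{Keep, Sink, Source}

import java.security.SecureRandom
import java.security.cert.X509Certificate
import javax.net.ssl.{KeyManager, SSLContext, X509TrustManager}
import scala.concurrent.ExecutionContextExecutor
import scala.concurrent.duration._
import scala.language.postfixOps

@main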
def main(args: String*): Unit = {
  val records = Seq(
    """{ "index":{ "_index" : "myindex", "_id" : "1" } }
      |{"title":"title1"}""".stripMargin,
    """{ "index":{ "_index" : "myindex", "_id" : "2" } }
      |{"title":"title2"}""".stripMargin,
    """{ "index":{ "_index" : "myindex", "_id" : "3" } }
      |{"title":"title3"}""".stripMargin,
    """{ "index":{ "_index" : "myindex", "_id" : "4" } }
      |{"title":"title4"}""".stripMargin)

  implicit val system: ActorSystem = ActorSystem("elastic")
  implicit val ec: ExecutionContextExecutor = system.dispatcher

  val trustfulSslContext: SSLContext = {
    // https://github.com/raboof/akka-http-connect-without-checking-cert/blob/0ca55a06afb83b587f592dd97358970502f7b834/Main.scala
    object NoCheckX509TrustManager extends X509TrustManager {
      override def checkClientTrusted(chain: Array[X509Certificate], authType: String): Unit = ()
      override def checkServerTrusted(chain: Array[X509Certificate], authType: String): Unit = ()
      override def getAcceptedIssuers: Array[X509Certificate] = Array[X509Certificate]()
    }
    val context: SSLContext = SSLContext.getInstance("TLS")
    context.init(Array[KeyManager](), Array(NoCheckX509TrustManager), new SecureRandom())
    context
  }

  Source
    .fromIterator[String]( () => records.iterator )
    .grouped(2)
    .map(_.mkString("\n"))
    .map { payload =>
      val entity = HttpEntity(ContentTypes.`application/json`, payload)
      val request = HttpRequest(
        HttpMethods.POST,
        uri = Uri("https://localhost:9200/myindex/_bulk"),
        headers = Seq(headers.Authorization(BasicHttpCredentials("elastic", "password"))),
        entity = entity
      )
      println(s"request: $request")
      request
    }
    .via(Http().outgoingConnectionHttps(host = "localhost", port = 9200, connectionContext = ConnectionContext.httpsClient(trustfulSslContext)))
    .map(response => {
      println(s"response: ${response.entity.toStrict(1 second).map(_.data.utf8String)}")
      response
    })
    .toMat(Sink.ignore)(Keep.none)
    .run()
}

I get the following output:

request: HttpRequest(HttpMethod(POST),https://localhost:9200/myindex/_bulk,List(Authorization),HttpEntity.Strict(application/json,137 bytes total),HttpProtocol(HTTP/1.1))
response: FulfilledFuture({"error":"no handler found for uri [https://localhost:9200/myindex/_bulk] and method [POST]"})
request: HttpRequest(HttpMethod(POST),https://localhost:9200/myindex/_bulk,List(Authorization),HttpEntity.Strict(application/json,137 bytes total),HttpProtocol(HTTP/1.1))
response: FulfilledFuture({"error":"no handler found for uri [https://localhost:9200/myindex/_bulk] and method [POST]"})

What am I missing?

Answer #1 (gajydyqb)

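Apparently, the URI in the HttpRequest must contain only the path, not the complete URL. The connection created by Http().outgoingConnectionHttps already fixes the host and port, and the error message echoes the full URL, which suggests Elasticsearch received it as the request path.
Replacing Uri("https://localhost:9200/myindex/_bulk") with Uri("/myindex/_bulk") in the HttpRequest fixes the problem.
In addition, the payload must end with \n, so I also had to replace .map(_.mkString("\n")) with .map(_.mkString("", "\n", "\n")) to make it work.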
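To make the two changes concrete, here is a minimal sketch in plain Scala (no akka-http needed) of how the request path and the newline-terminated payload could be derived. The helper names requestTarget and bulkPayload are hypothetical, made up for illustration; they are not part of akka-http or of the original code.

```scala
import java.net.URI

// Hypothetical helper: keep only the path (plus query, if any) of a full URL.
// Host and port belong to the connection, not to the request.
def requestTarget(fullUrl: String): String = {
  val uri = new URI(fullUrl)
  Option(uri.getRawQuery).fold(uri.getRawPath)(q => s"${uri.getRawPath}?$q")
}

// Hypothetical helper: join the bulk lines and append the trailing newline
// (empty prefix, "\n" separator, "\n" suffix).
def bulkPayload(lines: Seq[String]): String =
  lines.mkString("", "\n", "\n")

// Two records from the question, each an action line plus a document line.
val records = Seq(
  """{ "index":{ "_index" : "myindex", "_id" : "1" } }
    |{"title":"title1"}""".stripMargin,
  """{ "index":{ "_index" : "myindex", "_id" : "2" } }
    |{"title":"title2"}""".stripMargin
)

val target  = requestTarget("https://localhost:9200/myindex/_bulk")
val payload = bulkPayload(records)
```

In the stream from the question, target would be what goes into Uri(...), and payload corresponds to the result of the .map(_.mkString("", "\n", "\n")) stage.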
