在spark任务中将数据保存到elasticsearch

ne5o7dgx  于 2021-06-07  发布在  Kafka
关注(0)|答案(2)|浏览(452)

在通过kafka和spark处理avro消息流时,我将处理后的数据保存为elasticsearch索引中的文档。下面是代码(简化):

directKafkaStream.foreachRDD(rdd ->{

        rdd.foreach(avroRecord -> {
            byte[] encodedAvroData = avroRecord._2;
            MyType t = deserialize(encodedAvroData);

    // Creating the ElasticSearch Transport client
    Settings settings = Settings.builder()
            .put("client.transport.ping_timeout", 5, TimeUnit.SECONDS).build();
    TransportClient client = new PreBuiltTransportClient(settings)
            .addTransportAddress(new TransportAddress(InetAddress.getByName("localhost"), 9300));

    IndexRequest indexRequest = new IndexRequest("index", "item", id)
            .source(jsonBuilder()
                    .startObject()
                    .field("name", name)
                    .field("timestamp", new Timestamp(System.currentTimeMillis()))
                    .endObject());

    UpdateRequest updateRequest = new UpdateRequest("index", "item", id)
            .doc(jsonBuilder()
                    .startObject()
                    .field("name", name)
                    .field("timestamp", new Timestamp(System.currentTimeMillis()))
                    .endObject())
            .upsert(indexRequest);

    client.update(updateRequest).get();

    client.close();

一切正常;唯一的问题是性能:保存到es需要一些时间,我想这是因为我为每个rdd打开/关闭了es传输客户机。spark文档表明这种方法是非常正确的:据我所知,唯一可能的优化是使用rdd.foreachpartition,但我只有一个分区,所以我不确定这是否有益。有没有其他解决方案可以实现更好的性能?

ocebsuys

ocebsuys1#

我会将处理过的消息流回到一个单独的kafka主题,然后使用kafka connect将它们放到elasticsearch上。这将使特定于spark的处理与将数据导入elasticsearch分离。
它在行动中的例子:https://www.confluent.io/blog/blogthe-simplest-useful-kafka-connect-data-pipeline-in-the-world-or-thereabouts-part-2/

gstyhher

gstyhher2#

因为只要处理rdd的记录,就创建新的connect。所以,我想用 foreachPartition 无论只使用一个分区,都可以获得更好的性能,因为它可以帮助您将es连接示例带到外部,并在循环中重用它。

相关问题