如何使用kafka connect将json消息从kinesis发送到msk,然后发送到ElasticSearch

fquxozlt  于 2021-06-06  发布在  Kafka
关注(0)|答案(1)|浏览(416)

我已经准备好了,流程也在运行。我正在使用lambda函数将我的数据从kinesis流发送到msk,消息的格式如下

{
        "data": {
                "RequestID":    517082653,
                "ContentTypeID":        9,
                "OrgID":        16145,
                "UserID":       4,
                "PromotionStartDateTime":       "2019-12-14T16:06:21Z",
                "PromotionEndDateTime": "2019-12-14T16:16:04Z",
                "SystemStartDatetime":  "2019-12-14T16:17:45.507000000Z"
        },
        "metadata":     {
                "timestamp":    "2019-12-29T10:37:31.502042Z",
                "record-type":  "data",
                "operation":    "insert",
                "partition-key-type":   "schema-table",
                "schema-name":  "dbo",
                "table-name":   "TRFSDIQueue"
        }
}

这个json消息我发送到Kafka主题如下

props.put("acks", "all");
            props.put("retries", 0);
            props.put("batch.size", 16384);
            props.put("linger.ms", 1);
            props.put("buffer.memory", 33554432);
            props.put("producer.type", "async");
            props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
            props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
            Producer<String, String> producer = new KafkaProducer<String, String>(props);
            System.out.println("Inside loop successfully");
            try {
                producer.send(
                        new ProducerRecord<String, String>(topicName, new String(rec.getKinesis().getData().array())));
                Thread.sleep(1000);

                System.out.println("Message sent successfully");
            } catch (Exception e) {
                System.out.println("------------Exception message=-------------" + e.toString());
            }

            finally {
                producer.flush();
                producer.close();
            }

当我启动kafka连接进行ElasticSearch时,我得到了如下错误

DataException: Converting byte[] to Kafka Connect data failed due to serialization error

我还修改了quickstart-elasticsearch.properties,并将键值序列化程序更改为字符串。
当它是json时,它抛出了错误。
我可以看到索引是用ElasticSearch中的kafka主题名创建的,但没有记录。
所以请帮我解决一些困惑。1我是否正确地从制作人kinesis流发送消息?我正在使用

props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
            props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

或者我应该在这里使用json,但是这里没有json。
或者我必须在中使用json序列化程序 quickstart-elasticsearch.properties ?
如果事件是insert,那么它将在elastci search中插入记录delete and update呢,kafka connect在elastic search中为我们处理delete and update?
提前谢谢

mm5n2pyu

mm5n2pyu1#

对于30天的免费试用,您可以使用kinesis源连接器,也可以学习如何编写自己的源连接器,并将其部署在elasticsearch接收器旁边,而不是使用lambda。。。
第二,逆向工作。你能创建一个假主题并在lambda之外发送相同格式的记录吗?最后会在Kafka吗?elasticsearch怎么样?把kibana也从方程中去掉,如果你在用它,但它不起作用的话
然后关注lambda集成
回答你的问题
1) 您正在以字符串形式发送json。不需要单独的json序列化程序,除非在序列化程序接口中发送Map到json字符串的pojo类。
你正在发送json记录,所以你应该使用jsonconverter,在connect中,是的。但是,我不认为elasticsearchMap会自动创建,除非您有一个模式和负载,因此简单的解决方法是提前创建es索引Map(但是如果您已经知道,那么您已经设计了一个模式,因此,生产者代码最终有责任发送正确的记录)。
如果您提前定义Map,您应该能够在connect中简单地使用stringconverter
关于您的生产者代码,我唯一想更改的是重试次数大于0。并使用try with resources,而不是显式关闭producer

//... parse input 
try (Producer<String, String> producer = new KafkaProducer<>(props)) {
//... send record 
}

2) 您可以在github问题中搜索连接器,但上次我检查时,它执行完整的文档更新和插入,没有部分更新或任何删除

相关问题