将json文件数据转换成kafka主题

xpcnnkqh  于 2021-06-06  发布在  Kafka
关注(0)|答案(4)|浏览(796)

如何使用kafka控制台生成器将json文件数据插入kafka主题?每个json数据集都可以存储为消息吗?
示例-

{
  "id": 1,
  "first_name": "John",
  "last_name": "Lindt",
  "email": "jlindt@gmail.com",
  "gender": "Male",
  "ip_address": "1.2.3.4"
}

如果你使用这个命令-

cat sampledata.json|kafka-console-producer --broker-list localhost:9092 --topic  stream-test-topic

每一行都作为一条单独的消息。
正确的方法是什么?
谢谢!
ps公司-
这个主题正在被ElasticSearch阅读。示例json消息文件-

[{
"id": 1,
  "first_name": "John",
  "last_name": "Lindt",
  "email": "jlindt@gmail.com",
  "gender": "Male",
  "ip_address": "1.2.3.4"
}, {
  "id": 2,
  "first_name": "Peter",
  "last_name": "Friz",
  "email": "Friz3@gmail.com",
  "gender": "Male",
  "ip_address": "4.5.6.7"
}, {
  "id": 3,
  "first_name": "Dell",
  "last_name": "Chang",
  "email": "Dellc@gmail.com",
  "gender": "Female",
  "ip_address": "8.9.10.11"
}, {
"id": 4,
  "first_name": "Lolita",
  "last_name": "John",
  "email": "LolitaJ@gmail.com",
  "gender": "Female",
  "ip_address": "12.13.14.15"
}, {
"id": 5,
  "first_name": "Pele",
  "last_name": "Wang",
  "email": "Pele@gmail.com",
  "gender": "Male",
  "ip_address": "16.17.18.19"
}, {
  "id": 6,
  "first_name": "Rene",
  "last_name": "Charm",
  "email": "Rene3@gmail.com",
  "gender": "Male",
  "ip_address": "20.21.22.23"
dwthyt8l

dwthyt8l1#

从Kafka的观点来看,每条消息都是字节数组。这取决于客户的应用程序(生产者、消费者等),以及如何处理它。kafka生产者、消费者使用反序列化程序、序列化程序将字节数组转换为业务对象(字符串、pojo)
您面临的问题是kafka控制台生产者从标准输入读取消息的方式。默认使用 LineMessageReader ,它将每行视为新消息。您可以实现自己的,或者在发送之前将json中的每一个新行字符翻译成其他空白。
例如,可以使用以下命令: jq -rc . sampledata.json | kafka-console-producer --broker-list localhost:9092 --topic stream-test-topic

nimxete2

nimxete22#

如果文件中有json消息,可以使用以下方式写入kafka主题:

bin/kafka-console-producer.sh --broker-list localhost:9092 --topic user-timeline < samplerecords.json

Kafka生产者使用默认值逐行读取消息 LineMessageReader . 默认的键和值序列化程序是 StringSerializer . 它不会验证是否存在正确的json,而是将其视为原始字符串对象,如发布到kafka主题。但如果您想验证,可以在console producer命令中定义以下配置。

key.serializer
value.serializer

例子:

kafka-console-producer --broker-list localhost:9092 --topic testTopic--property value.serializer=custom.class.serialization.JsonSerializer

在消费者方面,您可以采用类似的方法。使用jsondeserializer读取数据。

kx7yvsdv

kx7yvsdv3#

我也是Kafka的新手,和你有同样的用例。经过一些研究和开发,我找到了一个简短的答案,可能会帮助你。你可以这样写:

bin/kafka-console-producer --broker-list localhost:9092 --topic blogpost
{"title":"The Matrix","year":1999,"cast":["Keanu Reeves","Laurence Fishburne","Carrie-Anne Moss","Hugo Weaving","Joe Pantoliano"],"genres":["Science Fiction"]}.

有关详细信息,请单击此处

mzaanser

mzaanser4#

您可以通过管道将json传递到主题:

echo '{"test": 1}' | bin/kafka-console-producer --broker-list localhost:9092 --topic test-topic

相关问题