如何使用kafka控制台生成器将json文件数据插入kafka主题?每个json数据集都可以存储为消息吗?
示例-
{
"id": 1,
"first_name": "John",
"last_name": "Lindt",
"email": "jlindt@gmail.com",
"gender": "Male",
"ip_address": "1.2.3.4"
}
如果你使用这个命令-
cat sampledata.json|kafka-console-producer --broker-list localhost:9092 --topic stream-test-topic
每一行都作为一条单独的消息。
正确的方法是什么?
谢谢!
ps公司-
这个主题正在被ElasticSearch阅读。示例json消息文件-
[{
"id": 1,
"first_name": "John",
"last_name": "Lindt",
"email": "jlindt@gmail.com",
"gender": "Male",
"ip_address": "1.2.3.4"
}, {
"id": 2,
"first_name": "Peter",
"last_name": "Friz",
"email": "Friz3@gmail.com",
"gender": "Male",
"ip_address": "4.5.6.7"
}, {
"id": 3,
"first_name": "Dell",
"last_name": "Chang",
"email": "Dellc@gmail.com",
"gender": "Female",
"ip_address": "8.9.10.11"
}, {
"id": 4,
"first_name": "Lolita",
"last_name": "John",
"email": "LolitaJ@gmail.com",
"gender": "Female",
"ip_address": "12.13.14.15"
}, {
"id": 5,
"first_name": "Pele",
"last_name": "Wang",
"email": "Pele@gmail.com",
"gender": "Male",
"ip_address": "16.17.18.19"
}, {
"id": 6,
"first_name": "Rene",
"last_name": "Charm",
"email": "Rene3@gmail.com",
"gender": "Male",
"ip_address": "20.21.22.23"
4条答案
按热度按时间dwthyt8l1#
从Kafka的观点来看,每条消息都是字节数组。这取决于客户的应用程序(生产者、消费者等),以及如何处理它。kafka生产者、消费者使用反序列化程序、序列化程序将字节数组转换为业务对象(字符串、pojo)
您面临的问题是kafka控制台生产者从标准输入读取消息的方式。默认使用
LineMessageReader
,它将每行视为新消息。您可以实现自己的,或者在发送之前将json中的每一个新行字符翻译成其他空白。例如,可以使用以下命令:
jq -rc . sampledata.json | kafka-console-producer --broker-list localhost:9092 --topic stream-test-topic
nimxete22#
如果文件中有json消息,可以使用以下方式写入kafka主题:
Kafka生产者使用默认值逐行读取消息
LineMessageReader
. 默认的键和值序列化程序是StringSerializer
. 它不会验证是否存在正确的json,而是将其视为原始字符串对象,如发布到kafka主题。但如果您想验证,可以在console producer命令中定义以下配置。例子:
在消费者方面,您可以采用类似的方法。使用jsondeserializer读取数据。
kx7yvsdv3#
我也是Kafka的新手,和你有同样的用例。经过一些研究和开发,我找到了一个简短的答案,可能会帮助你。你可以这样写:
有关详细信息,请单击此处
mzaanser4#
您可以通过管道将json传递到主题: