我是nifi的新手,在我的例子中,必须使用kafka的json主题,并希望将其转换为csv文件,在该文件中,我只需要选择几个标量和一些嵌套字段。
我需要做以下事情:
Consume topic - Done
Json to CSV
Include header in the CSV file
Merge into single file (if its split)
Give a proper filename with date
点击以下链接:https://community.hortonworks.com/articles/64069/converting-a-large-json-file-into-csv.html
但不确定这是否是正确的方法,也不知道如何制作单个文件。我正在使用 NiFi (1.8) and schema is stored in Confluent Schema Registry
json示例:
{
"type" : "record",
"name" : "Customer",
"namespace" : "namespace1"
"fields" : [ {
"name" : "header",
"type" : {
"type" : "record",
"name" : "CustomerDetails",
"namespace" : "namespace1"
"fields" : [ {
"name" : "Id",
"type" : "string"
}, {
"name" : "name",
"type" : "string"
}, {
"name" : "age",
"type" : [ "null", "int" ],
"default" : null
}, {
"name" : "comm",
"type" : [ "null", "int" ],
"default" : null
} ]
},
"doc" : ""
}, {
"name" : "data",
"type" : {
"type" : "record",
"name" : "CustomerData"
"fields" : [ {
"name" : "tags",
"type" : {
"type" : "map",
"values" : "string"
}
}, {
"name" : "data",
"type" : [ "null", "bytes" ]
"default" : null
} ]
},
"doc" : ""
} ]
}
请引导我。
1条答案
按热度按时间3ks5zfa01#
使用jsonterereader和csvrecordsetwriter尝试convertrecord。可以将writer配置为包含标题行,而不需要拆分/合并。updateattribute可用于设置
filename
属性,它是流文件的关联文件名(例如,由putfile使用)。如果convertrecord不能提供所需的输出,您能否详细说明它所提供的内容与您期望的内容之间的区别?