On HDP 2.6.5.0 with Druid 0.10.1, we use the druid-kafka indexing service to ingest data from a Kafka topic into Druid. We have noticed that metric values of 0 or 0.0 are stored as null, and the response we get when retrieving them through Superset or the Druid API is null. I would appreciate advice on whether we are missing something.
Superset error: {"status": "failed", "error_type": "warning", "error": "unsupported operand type(s) for +: 'int' and 'NoneType'"}
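This error is Python-side aggregation hitting a null value: adding an int and None is not supported. A minimal sketch reproducing the same TypeError (the list stands in for the null metric column Druid returned; where exactly Superset sums it is an assumption):

import traceback

values = [1, 2, None]  # None stands in for the null metric value Druid returned
try:
    total = sum(values)  # sum() eventually adds int + NoneType and fails
except TypeError as e:
    print(e)  # unsupported operand type(s) for +: 'int' and 'NoneType'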
The ingestion spec file is as follows:
{
  "type": "kafka",
  "dataSchema": {
    "dataSource": "data-source",
    "parser": {
      "type": "string",
      "parseSpec": {
        "format": "json",
        "timestampSpec": {
          "column": "datetime",
          "format": "YYYYMMdd_HHmmss"
        },
        "columns": [
          "created_date",
          "s_type",
          "datetime",
          "ds_ser",
          "ven",
          "cou_name",
          "c_name",
          "d_name",
          "dv_name",
          "p_name",
          "redTime",
          "wrTime",
          "tRate",
          "MTRate"
        ],
        "dimensionsSpec": {
          "dimensions": [
            "created_date",
            "s_type",
            "datetime",
            "ds_ser",
            "ven",
            "cou_name",
            "c_name",
            "d_name",
            "dv_name",
            "p_name"
          ]
        }
      }
    },
    "metricsSpec": [
      {
        "name": "count",
        "type": "count"
      },
      {
        "type": "doubleMax",
        "name": "redTime",
        "fieldName": "redTime"
      },
      {
        "type": "doubleMax",
        "name": "wrTime",
        "fieldName": "wrTime"
      },
      {
        "type": "longMax",
        "name": "tRate",
        "fieldName": "tRate"
      },
      {
        "type": "longMax",
        "name": "MTRate",
        "fieldName": "MTRate"
      }
    ],
    "granularitySpec": {
      "type": "uniform",
      "segmentGranularity": "HOUR",
      "queryGranularity": "NONE"
    }
  },
  "tuningConfig": {
    "type": "kafka",
    "maxRowsPerSegment": 5000000
  },
  "ioConfig": {
    "topic": "ptopic",
    "useEarliestOffset": true,
    "consumerProperties": {
      "bootstrap.servers": "host:port"
    },
    "taskCount": 1,
    "replicas": 1,
    "taskDuration": "PT5M"
  }
}
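For reference, a supervisor spec like this is submitted to the Overlord's supervisor endpoint. A minimal Python sketch, assuming the spec above is saved as kafka-supervisor.json and the Overlord listens on its default port 8090 (file name, host, and port are placeholders):

import json
import requests  # third-party; pip install requests

# Load the supervisor spec from disk; json.load() also catches
# plain JSON syntax errors such as trailing commas in lists.
with open("kafka-supervisor.json") as f:
    spec = json.load(f)

# POST the spec to the Overlord to start (or update) the supervisor.
resp = requests.post("http://host:8090/druid/indexer/v1/supervisor", json=spec)
resp.raise_for_status()
print(resp.json())  # e.g. {"id": "data-source"}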
Querying using Druid's REST API: http://host:port/druid/v2?pretty
Body:
{
  "queryType": "groupBy",
  "dataSource": "data-source",
  "granularity": "all",
  "dimensions": ["ds_ser"],
  "aggregations": [
    {"type": "doubleMax", "name": "redTime", "fieldName": "redTime"},
    {"type": "doubleMax", "name": "wrTime", "fieldName": "wrTime"},
    {"type": "longMax", "name": "tRate", "fieldName": "tRate"},
    {"type": "longMax", "name": "MTRate", "fieldName": "MTRate"}
  ],
  "intervals": ["2019-01-02T00:00/2019-01-02T23:59"]
}
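A minimal sketch of issuing this groupBy query programmatically with Python's requests, assuming the Broker listens on its default port 8082 (host and port are placeholders):

import requests

query = {
    "queryType": "groupBy",
    "dataSource": "data-source",
    "granularity": "all",
    "dimensions": ["ds_ser"],
    "aggregations": [
        {"type": "doubleMax", "name": "redTime", "fieldName": "redTime"},
        {"type": "doubleMax", "name": "wrTime", "fieldName": "wrTime"},
        {"type": "longMax", "name": "tRate", "fieldName": "tRate"},
        {"type": "longMax", "name": "MTRate", "fieldName": "MTRate"},
    ],
    "intervals": ["2019-01-02T00:00/2019-01-02T23:59"],
}

# POST the native query to the Broker's query endpoint.
resp = requests.post("http://host:8082/druid/v2?pretty", json=query)
resp.raise_for_status()
for row in resp.json():
    print(row["event"])  # metrics that were never ingested come back as null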
Druid's response:
[
  {
    "version": "v1",
    "timestamp": "2019-01-02T00:00:00.000Z",
    "event": {
      "redTime": null,
      "ds_ser": "240163",
      "wrTime": null,
      "tRate": null,
      "MTRate": null
    }
  },
  {
    "version": "v1",
    "timestamp": "2019-01-02T00:00:00.000Z",
    "event": {
      "redTime": null,
      "ds_ser": "443548",
      "wrTime": null,
      "tRate": 0,
      "MTRate": null
    }
  }
]
Kafka data:
> {"created_date":"2019-02-03T18:35:59.514Z","s_type":"BLOCK","datetime":"20181121_070000","ds_ser":"443551","ven":"abc","cou_name":"USA","c_name":"Piscataway","d_name":"Piscataway","dv_name":"USPSCG","p_name":"443551-CK","redTime":0.0,"wrTime":0.0,"tRate":0,"MTRate":0}
> {"created_date":"2019-02-03T18:35:59.514Z","s_type":"BLOCK","datetime":"20181121_070000","ds_ser":"443551","ven":"abc","cou_name":"USA","c_name":"Piscataway","d_name":"Piscataway","dv_name":"USPSCG4","p_name":"443551-CF","redTime":0.0,"wrTime":0.0,"tRate":0,"MTRate":0}
1 Answer
I have found the answer to my own question.
I made a mistake while writing the Druid Kafka indexer JSON: I did not know that these field names are case-sensitive. The JSON snippets posted here are made up, so the field names match; but in my actual production code and JSON files the fields did not match, so Druid assumed they were new fields and assigned null as their value during ingestion. Example below:
Kafka JSON:
{"created_date":"2019-02-03T18:35:59.514Z","s_type":"BLOCK","datetime":"20181121_070000","ds_ser":"443551","ven":"","cou_name":"USA","c_name":"Piscataway","d_name":"Piscataway","dv_name":"USPSCG","p_name":"443551-CK","redTime":0.0,"wrTime":0.0,"tRate":0,"MTRate":0}
The columns in the Druid indexer JSON were as follows: "columns": [ "created_date", "s_type", "datetime", "ds_ser", "ven", "cou_name", "c_name", "d_name", "dv_name", "p_name", "redTime", "wrtime", "trate", "MTRate" ],
If you look closely, there is a mismatch above: wrTime --> wrtime and tRate --> trate. So for me this was the root cause; once the names were corrected, Druid started ingesting the correct values.
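A quick way to catch such mismatches is to compare the spec's columns against the keys of a sample Kafka message. A minimal sketch; the comparison is case-sensitive, just like Druid's field matching:

import json

# One sample Kafka message, as posted above.
sample = json.loads(
    '{"created_date":"2019-02-03T18:35:59.514Z","s_type":"BLOCK",'
    '"datetime":"20181121_070000","ds_ser":"443551","ven":"",'
    '"cou_name":"USA","c_name":"Piscataway","d_name":"Piscataway",'
    '"dv_name":"USPSCG","p_name":"443551-CK",'
    '"redTime":0.0,"wrTime":0.0,"tRate":0,"MTRate":0}'
)

# The columns declared in the (mismatched) indexer spec.
columns = ["created_date", "s_type", "datetime", "ds_ser", "ven",
           "cou_name", "c_name", "d_name", "dv_name", "p_name",
           "redTime", "wrtime", "trate", "MTRate"]

# Spec columns with no matching message key are ingested as null.
print(set(columns) - set(sample))  # {'wrtime', 'trate'}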