我一直在努力解决使用Kafka连接和s3接收器的问题。
首先是结构:
{
Partition: number
Offset: number
Key: string
Message: json string
Timestamp: timestamp
}
通常在向Kafka发帖时,时间戳应由制作人设置。不幸的是,似乎有这样的情况没有发生。这意味着时间戳有时可能是 null
要提取此时间戳,连接器被设置为以下值: "timestamp.extractor":"Record"
.
现在可以肯定的是 Message
字段本身也总是包含时间戳。 Message
:
{
timestamp: "2019-04-02T06:27:02.667Z"
metadata: {
creationTimestamp: "1554186422667"
}
}
但问题是,现在,我想用这个字段 timestamp.extractor
我以为这样就够了,但这似乎行不通:
"timestamp.extractor":"RecordField",
"timestamp.field":"message.timestamp",
这也会导致空指针。
关于如何使用kafka消息负载本身的时间戳,而不是为kafka v0.10设置的默认时间戳字段,有什么想法吗+
编辑:完整配置:
{ "name": "<name>",
"config": {
"connector.class":"io.confluent.connect.s3.S3SinkConnector",
"tasks.max":"4",
"topics":"<topic>",
"flush.size":"100",
"s3.bucket.name":"<bucket name>",
"s3.region": "<region>",
"s3.part.size":"<partition size>",
"rotate.schedule.interval.ms":"86400000",
"key.converter": "org.apache.kafka.connect.storage.StringConverter",
"value.converter": "org.apache.kafka.connect.json.JsonConverter",
"key.converter.schemas.enable": "false",
"value.converter.schemas.enable": "false",
"storage.class":"io.confluent.connect.s3.storage.S3Storage",
"format.class":"io.confluent.connect.s3.format.json.JsonFormat",
"locale":"ENGLISH",
"timezone":"UTC",
"schema.generator.class":"io.confluent.connect.storage.hive.schema.TimeBasedSchemaGenerator",
"partitioner.class":"io.confluent.connect.storage.partitioner.TimeBasedPartitioner",
"partition.duration.ms": "3600000",
"path.format": "'year'=YYYY/'month'=MM/'day'=dd",
"timestamp.extractor":"RecordField",
"timestamp.field":"message.timestamp",
"max.poll.interval.ms": "600000",
"request.timeout.ms": "610000",
"heartbeat.interval.ms": "6000",
"session.timeout.ms": "20000",
"s3.acl.canned":"bucket-owner-full-control"
}
}
编辑2:kafka消息有效负载结构:
{
"reference": "",
"clientId": "",
"gid": "",
"timestamp": "2019-03-19T15:27:55.526Z",
}
编辑3:
{
"transforms": "convert_op_creationDateTime",
"transforms.convert_op_creationDateTime.type": "org.apache.kafka.connect.transforms.TimestampConverter$Value",
"transforms.convert_op_creationDateTime.target.type": "Timestamp",
"transforms.convert_op_creationDateTime.field": "timestamp",
"transforms.convert_op_creationDateTime.format": "yyyy-MM-dd'T'HH:mm:ss.SSSXXX"
}
所以我试着在物体上做了一个变换,但好像我又被困在这个东西上了。这种模式似乎是无效的。环顾互联网,这似乎是一个有效的简化模式。它似乎在抱怨 'T'
. 也更新了消息架构。
2条答案
按热度按时间kq0g1dla1#
如果数据是一个字符串,那么connect将尝试解析为毫秒-这里是源代码。
无论如何,
message.timestamp
假设数据看起来像{ "message" : { "timestamp": ... } }
,所以timestamp
是正确的。而且嵌套字段在过去是不可能的,所以您可能需要澄清您使用的是哪个版本的connect。我不太确定你会怎么做
instanceof Date
在使用json转换器时,即使设置了schema.enable = true
,那么在代码中,也可以看到只有数字和字符串的模式类型才有条件,但仍然假设它是毫秒。您可以尝试使用timestampconverter转换来转换日期字符串。
ijnw1ujt2#
根据您共享的架构,您应该设置:
i、 e.不
message
时间戳字段名称的前缀。