kafka connects3 sink-如何使用消息本身的时间戳[timestamp extractor]

qni6mghb  于 2021-06-06  发布在  Kafka
关注(0)|答案(2)|浏览(415)

我一直在努力解决使用Kafka连接和s3接收器的问题。
首先是结构:

{
   Partition: number
   Offset: number
   Key: string
   Message: json string
   Timestamp: timestamp
}

通常在向Kafka发帖时,时间戳应由制作人设置。不幸的是,似乎有这样的情况没有发生。这意味着时间戳有时可能是 null 要提取此时间戳,连接器被设置为以下值: "timestamp.extractor":"Record" .
现在可以肯定的是 Message 字段本身也总是包含时间戳。 Message :

{
   timestamp: "2019-04-02T06:27:02.667Z"
   metadata: {
     creationTimestamp: "1554186422667"
   }
}

但问题是,现在,我想用这个字段 timestamp.extractor 我以为这样就够了,但这似乎行不通:

"timestamp.extractor":"RecordField",
"timestamp.field":"message.timestamp",

这也会导致空指针。
关于如何使用kafka消息负载本身的时间戳,而不是为kafka v0.10设置的默认时间戳字段,有什么想法吗+
编辑:完整配置:

{ "name": "<name>",
  "config": {
    "connector.class":"io.confluent.connect.s3.S3SinkConnector",
    "tasks.max":"4",
    "topics":"<topic>",
    "flush.size":"100",
    "s3.bucket.name":"<bucket name>",
    "s3.region": "<region>",
    "s3.part.size":"<partition size>",
    "rotate.schedule.interval.ms":"86400000",
    "key.converter": "org.apache.kafka.connect.storage.StringConverter",
    "value.converter": "org.apache.kafka.connect.json.JsonConverter",
    "key.converter.schemas.enable": "false",
    "value.converter.schemas.enable": "false",
    "storage.class":"io.confluent.connect.s3.storage.S3Storage",
    "format.class":"io.confluent.connect.s3.format.json.JsonFormat",
    "locale":"ENGLISH",
    "timezone":"UTC",
    "schema.generator.class":"io.confluent.connect.storage.hive.schema.TimeBasedSchemaGenerator",
    "partitioner.class":"io.confluent.connect.storage.partitioner.TimeBasedPartitioner",
    "partition.duration.ms": "3600000",
    "path.format": "'year'=YYYY/'month'=MM/'day'=dd",
    "timestamp.extractor":"RecordField",
    "timestamp.field":"message.timestamp",
    "max.poll.interval.ms": "600000",
    "request.timeout.ms": "610000",
    "heartbeat.interval.ms": "6000",
    "session.timeout.ms": "20000",
    "s3.acl.canned":"bucket-owner-full-control"
  }
}

编辑2:kafka消息有效负载结构:

{
  "reference": "",
  "clientId": "",
  "gid": "",
  "timestamp": "2019-03-19T15:27:55.526Z",
}

编辑3:

{
"transforms": "convert_op_creationDateTime",
"transforms.convert_op_creationDateTime.type": "org.apache.kafka.connect.transforms.TimestampConverter$Value",
"transforms.convert_op_creationDateTime.target.type": "Timestamp",
"transforms.convert_op_creationDateTime.field": "timestamp",
"transforms.convert_op_creationDateTime.format": "yyyy-MM-dd'T'HH:mm:ss.SSSXXX"
}

所以我试着在物体上做了一个变换,但好像我又被困在这个东西上了。这种模式似乎是无效的。环顾互联网,这似乎是一个有效的简化模式。它似乎在抱怨 'T' . 也更新了消息架构。

kq0g1dla

kq0g1dla1#

如果数据是一个字符串,那么connect将尝试解析为毫秒-这里是源代码。
无论如何, message.timestamp 假设数据看起来像 { "message" : { "timestamp": ... } } ,所以 timestamp 是正确的。而且嵌套字段在过去是不可能的,所以您可能需要澄清您使用的是哪个版本的connect。
我不太确定你会怎么做 instanceof Date 在使用json转换器时,即使设置了 schema.enable = true ,那么在代码中,也可以看到只有数字和字符串的模式类型才有条件,但仍然假设它是毫秒。
您可以尝试使用timestampconverter转换来转换日期字符串。

ijnw1ujt

ijnw1ujt2#

根据您共享的架构,您应该设置:

"timestamp.extractor":"RecordField",
    "timestamp.field":"timestamp",

i、 e.不 message 时间戳字段名称的前缀。

相关问题