Unable to read a JSON topic and write to a Cassandra table that has UDT-typed columns (my topic and table have many columns, so I used UDT columns). I get the warning below, and the Flatten transform does not help either — no transformation happens. If I remove value.call.onnet from the mapping, the connector works.
[2020-12-08 00:17:12,415] WARN Error decoding/mapping Kafka record SinkRecord{kafkaOffset=358, timestampType=CreateTime} ConnectRecord{topic='account_usage', kafkaPartition=0, key={"number":943821834,"usageDate":20201108}, keySchema=Schema{STRING}, value={"startTime":20201108001019,"endTime":20201108160902,"count":142,"call": { "onnet": { "volume": 3, "unit": "SECOND", "amount": 12.5 }, "offnet": { "volume": 1, "unit": "SECOND", "amount": 2.0 } }, "message": { "roaming": { "volume": 1, "unit": "MSG", "amount": 1.5 }, "local": { "volume": 12, "unit": "MSG", "amount": 3.0 } }, valueSchema=Schema{STRING}, timestamp=1607370857363, headers=ConnectHeaders(headers=)}: Required field 'value.call.onnet' (mapped to column onnet) was missing from record (or may refer to an invalid function). Please remove it from the mapping. (com.datastax.oss.kafka.sink.CassandraSinkTask)
Sample Kafka topic - account_usage
Key
{
"number": 943821834,
"usageDate": 20201108
}
Value
{
"startTime": 20201108001019,
"endTime": 20201108160902,
"count": 142,
"call": {
"onnet": {
"volume": 3,
"unit": "SECOND",
"amount": 12.5
},
"offnet": {
"volume": 1,
"unit": "SECOND",
"amount": 2.0
}
},
"message": {
"roaming": {
"volume": 1,
"unit": "MSG",
"amount": 1.5
},
"local": {
"volume": 12,
"unit": "MSG",
"amount": 3.0
}
}
}
Cassandra UDT (user-defined type) definition
CREATE TYPE ks_usage.usage_type (
volume bigint,
amount decimal
);
Cassandra table definition
CREATE TABLE ks_usage.usage_call
(
number bigint,
usage_date int,
entry_date timeuuid,
onnet usage_type,
offnet usage_type,
primary key (number, usage_date)
)
WITH CLUSTERING ORDER BY (usage_date DESC);
Connector mapping
POST /connectors HTTP/1.1
Host: localhost:8083
Content-Type: application/json
Content-Length: 568
{
"name": "usage-sink",
"config": {
"connector.class": "com.datastax.oss.kafka.sink.CassandraSinkConnector",
"tasks.max": "1",
"topics": "account_usage",
"contactPoints": "10.0.153.27",
"loadBalancing.localDc": "datacenter1",
"port": 9042,
"auth.provider": "PLAIN",
"auth.username": "testusr",
"auth.password": "test",
"topic.account_usage.ks_usage.usage_call.mapping": "number=key.number, usage_date=key.usageDate, entry_date=now(), onnet=value.call.onnet, offnet=value.call.offnet"
}
}
Connector configuration - connect-distributed.properties
key.converter=org.apache.kafka.connect.json.JsonConverter
value.converter=org.apache.kafka.connect.json.JsonConverter
# key.converter=org.apache.kafka.connect.storage.StringConverter
# value.converter=org.apache.kafka.connect.storage.StringConverter
key.converter.schemas.enable=false
value.converter.schemas.enable=false
transforms=flatten
transforms.flatten.type=org.apache.kafka.connect.transforms.Flatten$Value
transforms.flatten.delimiter=.
1 Answer
Thanks for the detailed description. I was debugging your problem, and it seems that Connect does not inspect (propagate) the inner fields. The record sent to the Kafka connector contains only the top-level fields of your JSON value (startTime, endTime, count, call, message).

You may notice that there is a call field, but nothing like call.onnet or call.offnet. Because of that, there is nothing to flatten: those inner fields are not present as separate fields, so they cannot be flattened. To solve your problem, you could consider either of the following:

Move both onnet and offnet one level higher and remove the call wrapper. If you do that, the record will contain value.onnet and value.offnet, and you will be able to use the ks_usage.usage_type UDT as it is defined today.

Create a call UDT that contains both onnet and offnet. That way, you will have a single column holding all the call values. Next, in the mapping, you can use call=value.call.