DataStax Apache Kafka Connector: UDT column mapping issue with nested JSON

gv8xihay · asked 2021-06-04 · in Kafka

I am unable to read from a JSON topic and write to a Cassandra table that has UDT-type columns (my topic and table have many columns, so I used UDT columns). I get the warning below, and the Flatten transform does not help either; nothing gets transformed. Removing value.call.onnet from the mapping makes the connector work.

[2020-12-08 00:17:12,415] WARN Error decoding/mapping Kafka record SinkRecord{kafkaOffset=358, timestampType=CreateTime} ConnectRecord{topic='account_usage', kafkaPartition=0, key={"number":943821834,"usageDate":20201108}, keySchema=Schema{STRING}, value={"startTime":20201108001019,"endTime":20201108160902,"count":142,"call": { "onnet": { "volume": 3, "unit": "SECOND", "amount": 12.5 }, "offnet": { "volume": 1, "unit": "SECOND", "amount": 2.0 } }, "message": { "roaming": { "volume": 1, "unit": "MSG", "amount": 1.5 }, "local": { "volume": 12, "unit": "MSG", "amount": 3.0 } }, valueSchema=Schema{STRING}, timestamp=1607370857363, headers=ConnectHeaders(headers=)}: Required field 'value.call.onnet' (mapped to column onnet) was missing from record (or may refer to an invalid function). Please remove it from the mapping. (com.datastax.oss.kafka.sink.CassandraSinkTask)

Sample Kafka topic - account_usage

Key

{
  "number": 943821834,
  "usageDate": 20201108
}

Value

{
  "startTime": 20201108001019,
  "endTime": 20201108160902,
  "count": 142,
  "call": {
    "onnet": {
      "volume": 3,
      "unit": "SECOND",
      "amount": 12.5
    },
    "offnet": {
      "volume": 1,
      "unit": "SECOND",
      "amount": 2.0
    }
  },
  "message": {
    "roaming": {
      "volume": 1,
      "unit": "MSG",
      "amount": 1.5
    },
    "local": {
      "volume": 12,
      "unit": "MSG",
      "amount": 3.0
    }
  }
}

Cassandra UDT (user-defined type) definition

CREATE TYPE ks_usage.usage_type (
    volume bigint,
    amount decimal
);

Cassandra table definition

CREATE TABLE ks_usage.usage_call
(
    number          bigint,
    usage_date      int,
    entry_date      timeuuid,
    onnet           usage_type,
    offnet          usage_type,
    primary key (number, usage_date)
)
WITH CLUSTERING ORDER BY (usage_date DESC);

Connector mapping

POST /connectors HTTP/1.1
Host: localhost:8083
Content-Type: application/json
Content-Length: 568

{
    "name": "usage-sink",
    "config": {
        "connector.class": "com.datastax.oss.kafka.sink.CassandraSinkConnector",
        "tasks.max": "1",
        "topics": "account_usage",
        "contactPoints": "10.0.153.27",
        "loadBalancing.localDc": "datacenter1",
        "port": 9042,
        "auth.provider": "PLAIN",
        "auth.username": "testusr",
        "auth.password": "test",
        "topic.account_usage.ks_usage.usage_call.mapping": "number=key.number, usage_date=key.usageDate, entry_date=now(), onnet=value.call.onnet, offnet=value.call.offnet"
    }
}

Connector configuration - connect-distributed.properties

key.converter=org.apache.kafka.connect.json.JsonConverter
value.converter=org.apache.kafka.connect.json.JsonConverter

# key.converter=org.apache.kafka.connect.storage.StringConverter
# value.converter=org.apache.kafka.connect.storage.StringConverter

key.converter.schemas.enable=false
value.converter.schemas.enable=false
transforms=flatten
transforms.flatten.type=org.apache.kafka.connect.transforms.Flatten$Value
transforms.flatten.delimiter=.
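For reference, when the Flatten transform does see nested structures, it renames fields by joining the path segments with the configured delimiter. A minimal Python approximation of that renaming (the function name `flatten_record` is illustrative, not part of Kafka Connect):

```python
def flatten_record(value, delimiter=".", prefix=""):
    """Flatten a nested dict by joining nested keys with `delimiter`,
    mimicking what Kafka Connect's Flatten$Value does to field names."""
    flat = {}
    for key, val in value.items():
        name = f"{prefix}{delimiter}{key}" if prefix else key
        if isinstance(val, dict):
            flat.update(flatten_record(val, delimiter, name))
        else:
            flat[name] = val
    return flat

record = {"count": 142, "call": {"onnet": {"volume": 3}}}
print(flatten_record(record))
# {'count': 142, 'call.onnet.volume': 3}
```

The answer below explains why this renaming never happened here: the connector did not see the nested fields in the first place.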

Answer #1 (bvjveswy)

Thanks for the detailed description. I was debugging your problem, and it seems that Connect does not inspect (propagate) the inner fields. The record sent to the Kafka connector contains the following fields:

value.count, key.usageDate, value.endTime, value.startTime, value.call, value.message, key.number

You may notice that there is a call field, but no nested fields like onnet or offnet. Because of that, there is nothing to flatten: those fields are not visible to the transform, so they cannot be flattened. To solve your problem, you can consider:

1. Moving both onnet and offnet one level higher and removing the call wrapper. If you do that, the record will contain value.onnet and value.offnet, and you will be able to use ks_usage.usage_type as before.
2. Creating a call UDT that contains both onnet and offnet. That way, you will have a single call column holding all the call values, and in the mapping you can use call=value.call.
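The second option could be sketched in CQL roughly as follows; the call_type name and the frozen modifier are assumptions for illustration, not taken from the original question:

```sql
-- Hypothetical wrapper UDT holding both legs of a call
CREATE TYPE ks_usage.call_type (
    onnet  frozen<usage_type>,
    offnet frozen<usage_type>
);

CREATE TABLE ks_usage.usage_call
(
    number      bigint,
    usage_date  int,
    entry_date  timeuuid,
    call        call_type,
    primary key (number, usage_date)
)
WITH CLUSTERING ORDER BY (usage_date DESC);
```

The connector mapping would then map the whole nested object in one step, e.g.:

```
"topic.account_usage.ks_usage.usage_call.mapping":
  "number=key.number, usage_date=key.usageDate, entry_date=now(), call=value.call"
```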
