kafka主题,其中变量嵌套json对象作为ksql db流

niknxzdl  于 2021-06-04  发布在  Kafka
关注(0)|答案(1)|浏览(273)

我正在尝试加入ksql中现有的两个kafka主题。kafka的一些数据样本(实际值因公司环境而修订):
设备主题:

{
  "persistTime" : "2020-10-06T13:30:25.373Z",
  "previous" : {
    "device" : "REDACTED",
    "type" : "REDACTED",
    "group" : "REDACTED",
    "inventoryState" : "unknown",
    "managementState" : "registered",
    "communicationId" : "REDACTED",
    "manufacturer" : "",
    "description" : "",
    "model" : "",
    "location" : {
      "geo" : {
        "latitude" : "REDACTED",
        "longitude" : "REDACTED"
      },
      "address" : {
        "city" : "",
        "postalCode" : "",
        "street" : "",
        "houseNumber" : "",
        "floor" : "",
        "company" : "",
        "country" : "",
        "reference" : "",
        "timeZone" : "",
        "region" : "",
        "district" : ""
      },
      "logicalInstallationPoint" : ""
    },
    "tags" : [ ]
  },
  "current" : {
    "device" : "REDACTED",
    "type" : "REDACTED",
    "group" : "REDACTED",
    "inventoryState" : "unknown",
    "managementState" : "registered",
    "communicationId" : "REDACTED",
    "manufacturer" : "",
    "description" : "",
    "model" : "",
    "location" : {
      "geo" : {
        "latitude" : "REDACTED",
        "longitude" : "REDACTED"
      },
      "address" : {
        "city" : "",
        "postalCode" : "",
        "street" : "",
        "houseNumber" : "",
        "floor" : "",
        "company" : "",
        "country" : "",
        "reference" : "",
        "timeZone" : "",
        "region" : "",
        "district" : ""
      },
      "logicalInstallationPoint" : ""
    },
    "tags" : [ ]
  }
}

设备事件主题(第一个示例):

{
  "device" : "REDACTED",
  "event" : "403151",
  "firstOccurrenceTime" : "2020-09-30T11:03:50.000Z",
  "lastOccurrenceTime" : "2020-09-30T11:03:50.000Z",
  "occurrenceCount" : 1,
  "receiveTime" : "2020-09-30T11:03:50.000Z",
  "persistTime" : "2020-09-30T14:32:59.580Z",
  "state" : "open",
  "context" : {
    "2" : "25",
    "3" : "0",
    "4" : "60",
    "1" : "REDACTED"
  }
}

设备事件主题(第二个示例):

{
  "device" : "REDACTED",
  "event" : "402004",
  "firstOccurrenceTime" : "2020-10-07T07:02:48Z",
  "lastOccurrenceTime" : "2020-10-07T07:02:48Z",
  "occurrenceCount" : 1,
  "receiveTime" : "2020-10-07T07:02:48Z",
  "persistTime" : "2020-10-07T07:15:28.533Z",
  "state" : "open",
  "context" : {
    "2" : "2020-10-07T07:02:48.0000000Z",
    "1" : "REDACTED"
  }
}

我所面临的问题是 context 在设备事件主题下。
我尝试了以下语句在ksqldb上创建事件流:

CREATE STREAM "events"\
("device" VARCHAR, \
"event" VARCHAR, \
"firstOccurenceTime" VARCHAR, \
"lastOccurenceTime" VARCHAR, \
"occurenceCount" INTEGER, \
"receiveTime" VARCHAR, \
"persistTime" VARCHAR, \
"state" VARCHAR, \
"context" ARRAY<STRING>) \
WITH (KAFKA_TOPIC='device-event', VALUE_FORMAT='JSON');
CREATE STREAM "events"\
("device" VARCHAR, \
"event" VARCHAR, \
"firstOccurenceTime" VARCHAR, \
"lastOccurenceTime" VARCHAR, \
"occurenceCount" INTEGER, \
"receiveTime" VARCHAR, \
"persistTime" VARCHAR, \
"state" VARCHAR, \
"context" STRUCT\
<"1" VARCHAR, \
"2" VARCHAR, \
"3" VARCHAR, \
"4" VARCHAR>) \
WITH (KAFKA_TOPIC='ext_device-event_10195', VALUE_FORMAT='JSON');

第二条语句只引入所有四个上下文变量都存在的数据(“1”、“2”、“3”和“4”)。
如何为设备事件kafka主题创建ksql等价流?

suzh9iv8

suzh9iv81#

你需要使用 MAP 而不是一个 STRUCT .
顺便说一句,你也不需要 \ 行分隔符:)
下面是一个使用ksqldb0.12的工作示例。
将示例数据加载到主题中

kafkacat -b localhost:9092 -P -t events <<EOF
{ "device" : "REDACTED", "event" : "403151", "firstOccurrenceTime" : "2020-09-30T11:03:50.000Z", "lastOccurrenceTime" : "2020-09-30T11:03:50.000Z", "occurrenceCount" : 1, "receiveTime" : "2020-09-30T11:03:50.000Z", "persistTime" : "2020-09-30T14:32:59.580Z", "state" : "open", "context" : { "2" : "25", "3" : "0", "4" : "60", "1" : "REDACTED" } }
{ "device" : "REDACTED", "event" : "402004", "firstOccurrenceTime" : "2020-10-07T07:02:48Z", "lastOccurrenceTime" : "2020-10-07T07:02:48Z", "occurrenceCount" : 1, "receiveTime" : "2020-10-07T07:02:48Z", "persistTime" : "2020-10-07T07:15:28.533Z", "state" : "open", "context" : { "2" : "2020-10-07T07:02:48.0000000Z", "1" : "REDACTED" } }
EOF

在ksqldb中,声明流:

CREATE STREAM "events" (
    "device" VARCHAR,
    "event" VARCHAR,
    "firstOccurenceTime" VARCHAR,
    "lastOccurenceTime" VARCHAR,
    "occurenceCount" INTEGER,
    "receiveTime" VARCHAR,
    "persistTime" VARCHAR,
    "state" VARCHAR,
    "context" MAP < VARCHAR, VARCHAR >
) WITH (KAFKA_TOPIC = 'events', VALUE_FORMAT = 'JSON');

查询流以检查工作情况:

ksql> SET 'auto.offset.reset' = 'earliest';
Successfully changed local property 'auto.offset.reset' to 'earliest'. Use the UNSET command to revert your change.

ksql> SELECT "device", "event", "receiveTime", "state", "context" FROM "events" EMIT CHANGES;
+----------+--------+--------------------------+--------+------------------------------------+
|device    |event   |receiveTime               |state   |context                             |
+----------+--------+--------------------------+--------+------------------------------------+
|REDACTED  |403151  |2020-09-30T11:03:50.000Z  |open    |{1=REDACTED, 2=25, 3=0, 4=60}       |
|REDACTED  |402004  |2020-10-07T07:02:48Z      |open    |{1=REDACTED, 2=2020-10-07T07:02:48.0|
|          |        |                          |        |000000Z}                            |

使用 [''] 访问Map中特定键的语法:

ksql> SELECT "device", "event", "context", "context"['1'] AS CONTEXT_1, "context"['3'] AS CONTEXT_3 FROM "events" EMIT CHANGES;
+-----------+--------+------------------------------------+-----------+-----------+
|device     |event   |context                             |CONTEXT_1  |CONTEXT_3  |
+-----------+--------+------------------------------------+-----------+-----------+
|REDACTED   |403151  |{1=REDACTED, 2=25, 3=0, 4=60}       |REDACTED   |0          |
|REDACTED   |402004  |{1=REDACTED, 2=2020-10-07T07:02:48.0|REDACTED   |null       |
|           |        |000000Z}                            |           |           |

相关问题