Joining a stream and a table using ksqlDB

Posted by pkln4tw6 on 2021-06-04 in Kafka

I am trying to join the following ksql table:

CREATE TABLE devices
    ("current" STRUCT<
        "device" VARCHAR, 
        "group" VARCHAR, 
        "inventoryState" VARCHAR, 
        "location" STRUCT<
            "geo" STRUCT<
                "latitude" DOUBLE, 
                "longitude" DOUBLE>, 
            "address" STRUCT<
                "city" VARCHAR, 
                "postalCode" VARCHAR, 
                "street" VARCHAR, 
                "houseNumber" VARCHAR, 
                "floor" VARCHAR, 
                "company" VARCHAR, 
                "country" VARCHAR, 
                "reference" VARCHAR, 
                "timeZone" VARCHAR, 
                "region" VARCHAR, 
                "district" VARCHAR>
            >
        >)
WITH (KAFKA_TOPIC='device', VALUE_FORMAT='JSON');

... with the following ksql stream:

CREATE STREAM "events" (
    "device" VARCHAR,
    "event" VARCHAR,
    "firstOccurenceTime" VARCHAR,
    "lastOccurenceTime" VARCHAR,
    "occurenceCount" INTEGER,
    "receiveTime" VARCHAR,
    "persistTime" VARCHAR,
    "state" VARCHAR,
    "context" MAP < VARCHAR, VARCHAR >) 
WITH (KAFKA_TOPIC = 'device-event', VALUE_FORMAT = 'JSON');

... in order to include the location struct in a new stream (enriched events).
This is the ksql SELECT statement I am running to test the new stream:

SELECT devices."current"->"device" AS "device",
    devices."current"->"location" AS "location",
    "event",
    "firstOccurenceTime",
    "lastOccurenceTime",
    "receiveTime",
    "persistTime",
    "state",
    "context"
FROM "events"
INNER JOIN devices ON "events".ROWKEY = devices.ROWKEY
EMIT CHANGES;

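I get no data, even after setting auto.offset.reset to earliest.
I have checked that both the device table and the device-event stream are populated with data.
What am I doing wrong?
Update
Sample data from the devices table (sensitive values redacted per company policy):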

{
  "persistTime" : "2020-10-12T11:48:23.384Z",
  "previous" : {
    "device" : "REDACTED",
    "type" : "REDACTED",
    "group" : "REDACTED",
    "inventoryState" : "unknown",
    "managementState" : "connected",
    "communicationId" : "REDACTED",
    "manufacturer" : "",
    "description" : "",
    "model" : "",
    "location" : {
      "geo" : {
        "latitude" : "REDACTED",
        "longitude" : "REDACTED"
      },
      "address" : {
        "city" : "",
        "postalCode" : "",
        "street" : "",
        "houseNumber" : "",
        "floor" : "",
        "company" : "",
        "country" : "",
        "reference" : "",
        "timeZone" : "",
        "region" : "",
        "district" : ""
      },
      "logicalInstallationPoint" : ""
    },
    "tags" : [ ]
  },
  "current" : {
    "device" : "REDACTED",
    "type" : "REDACTED",
    "group" : "REDACTED",
    "inventoryState" : "unknown",
    "managementState" : "connected",
    "communicationId" : "REDACTED",
    "manufacturer" : "",
    "description" : "",
    "model" : "",
    "location" : {
      "geo" : {
        "latitude" : "REDACTED",
        "longitude" : "REDACTED"
      },
      "address" : {
        "city" : "",
        "postalCode" : "",
        "street" : "",
        "houseNumber" : "",
        "floor" : "",
        "company" : "",
        "country" : "",
        "reference" : "",
        "timeZone" : "",
        "region" : "",
        "district" : ""
      },
      "logicalInstallationPoint" : ""
    },
    "tags" : [ ]
  }
}
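
Since the join condition compares ROWKEY on both sides, it can only match when the Kafka message keys of the two topics are identical, so the keys may be worth inspecting. The following is a minimal diagnostic sketch, not a confirmed fix. It assumes a ksqlDB version that still exposes the implicit ROWKEY column (as the query above does) and that the message key of the device topic holds the device identifier; neither is confirmed by the post. The final SELECT is a hypothetical variant of the test query that joins on the event's "device" column instead of the stream's ROWKEY.

```sql
-- Read from the earliest offset for queries started in this session.
SET 'auto.offset.reset' = 'earliest';

-- Print a few raw records from each topic, including their message keys,
-- to see whether the keys of the two topics can ever be equal.
PRINT 'device' FROM BEGINNING LIMIT 3;
PRINT 'device-event' FROM BEGINNING LIMIT 3;

-- Hypothetical variant: join on the event's "device" column
-- (assumes the table's key is the device identifier).
SELECT devices."current"->"device" AS "device",
    devices."current"->"location" AS "location",
    "events"."event" AS "event"
FROM "events"
INNER JOIN devices ON "events"."device" = devices.ROWKEY
EMIT CHANGES;
```

If the PRINT output shows null or unrelated keys on either topic, an inner join on ROWKEY would be expected to produce no rows.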
