我正在尝试对以下 ksql 表进行连接(join):
-- Table backed by the 'device' topic (JSON values).
-- Only the "current" struct of each message is declared here.
CREATE TABLE devices (
    "current" STRUCT<
        "device" VARCHAR,
        "group" VARCHAR,
        "inventoryState" VARCHAR,
        "location" STRUCT<
            "geo" STRUCT<
                "latitude" DOUBLE,
                "longitude" DOUBLE
            >,
            "address" STRUCT<
                "city" VARCHAR,
                "postalCode" VARCHAR,
                "street" VARCHAR,
                "houseNumber" VARCHAR,
                "floor" VARCHAR,
                "company" VARCHAR,
                "country" VARCHAR,
                "reference" VARCHAR,
                "timeZone" VARCHAR,
                "region" VARCHAR,
                "district" VARCHAR
            >
        >
    >
)
WITH (
    KAFKA_TOPIC = 'device',
    VALUE_FORMAT = 'JSON'
);
... 与以下 ksql 流进行连接:
-- Stream backed by the 'device-event' topic (JSON values).
-- All time fields are declared as VARCHAR, not as timestamp types.
CREATE STREAM "events" (
    "device" VARCHAR,
    "event" VARCHAR,
    "firstOccurenceTime" VARCHAR,
    "lastOccurenceTime" VARCHAR,
    "occurenceCount" INTEGER,
    "receiveTime" VARCHAR,
    "persistTime" VARCHAR,
    "state" VARCHAR,
    "context" MAP<VARCHAR, VARCHAR>
)
WITH (
    KAFKA_TOPIC = 'device-event',
    VALUE_FORMAT = 'JSON'
);
... 目的是把 `location` 结构包含到一个新的流(丰富后的事件流)中。
这是我正在执行的ksql select语句,用于测试新流:
-- Push query: enrich each event with the device id and location taken from the devices table.
-- NOTE(review): the join condition compares ROWKEY on both sides, i.e. the Kafka message keys
-- of the two topics. It presumably matches only when both topics are keyed by the same value
-- (e.g. the device id) -- TODO confirm the keys of 'device' and 'device-event'.
SELECT
    devices."current"->"device" AS "device",
    devices."current"->"location" AS "location",
    "event",
    "firstOccurenceTime",
    "lastOccurenceTime",
    "receiveTime",
    "persistTime",
    "state",
    "context"
FROM "events"
INNER JOIN devices
    ON "events".ROWKEY = devices.ROWKEY
EMIT CHANGES;
即使把 `auto.offset.reset` 设置为 `earliest`,我也没有得到任何数据。
我已经确认过,`device` 表和 `device-event` 流中都有数据。
我做错什么了?
更新
`devices` 表的示例数据(敏感值已根据公司政策做了脱敏处理):
{
"persistTime" : "2020-10-12T11:48:23.384Z",
"previous" : {
"device" : "REDACTED",
"type" : "REDACTED",
"group" : "REDACTED",
"inventoryState" : "unknown",
"managementState" : "connected",
"communicationId" : "REDACTED",
"manufacturer" : "",
"description" : "",
"model" : "",
"location" : {
"geo" : {
"latitude" : "REDACTED",
"longitude" : "REDACTED"
},
"address" : {
"city" : "",
"postalCode" : "",
"street" : "",
"houseNumber" : "",
"floor" : "",
"company" : "",
"country" : "",
"reference" : "",
"timeZone" : "",
"region" : "",
"district" : ""
},
"logicalInstallationPoint" : ""
},
"tags" : [ ]
},
"current" : {
"device" : "REDACTED",
"type" : "REDACTED",
"group" : "REDACTED",
"inventoryState" : "unknown",
"managementState" : "connected",
"communicationId" : "REDACTED",
"manufacturer" : "",
"description" : "",
"model" : "",
"location" : {
"geo" : {
"latitude" : "REDACTED",
"longitude" : "REDACTED"
},
"address" : {
"city" : "",
"postalCode" : "",
"street" : "",
"houseNumber" : "",
"floor" : "",
"company" : "",
"country" : "",
"reference" : "",
"timeZone" : "",
"region" : "",
"district" : ""
},
"logicalInstallationPoint" : ""
},
"tags" : [ ]
}
}
暂无答案!
目前还没有任何答案,快来回答吧!