无法从ksqldb中的kafka主题读取消息

woobm2wo  于 2021-06-04  发布在  Kafka
关注(0)|答案(1)|浏览(304)
{
 "event": {
  "header":{ 
   "name":"abc",
   "version":"1.0",
   "producer":"123",
   "channel":"lab",
   "countryCode":"US"
  },
  "body":{
   "customerIdentifiers":[ 
    {"customerIdentifier":"1234","customerIdType":"cc"},
    {"customerIdentifier":"234", "customerIdType":"id"}
   ],
   "accountIdentifiers":[
    {"accountIdentifier":"123",    "accountIdType":"no"}, 
    {"accountIdentifier":"Primary","accountIdType":"da"}
   ],
   "eventDetails":{
    "transactionDateTime":"2019-03-26 05:28:13.000",
    "transactionDate":"2019-03-26",
    "monthAverage":"188",
    "dailyAverage":"7"
   }
  }
 }
}

已为上述json创建流:

CREATE STREAM STREAM_NAME(
  event STRUCT<
    header STRUCT<
      name VARCHAR,
      version VARCHAR,
      producer VARCHAR,
      channel VARCHAR,
      countryCode VARCHAR
      eventTimeStamp VARCHAR
    >,
    body STRUCT<
      customerIdentifiers STRUCT<
         customerIdentifier VARCHAR,
         customerIdType VARCHAR
      >,
      accountIdentifiers STRUCT<
         accountIdentifier VARCHAR,
         accountIdType VARCHAR
      >,                            
      eventDetails STRUCT<
         transactionDateTime VARCHAR,
         transactionDate VARCHAR,
         productDescription VARCHAR,
         monthAverage VARCHAR,
         dailyAverage VARCHAR
      >
    >
  >
) WITH (
  KAFKA_TOPIC = 'TOPIC1',
  VALUE_FORMAT = 'JSON',
  PARTITIONS = 1
  ;

我无法从流中读取消息:

select * from STREAM_NAME emit changes;

有什么建议吗?

yh2wf1be

yh2wf1be1#

您可能会遇到反序列化错误,因为您创建的流的架构与数据的架构不匹配。
重新格式化示例数据和sql后,错误会变得更加明显:
这个 $.event.body.customerIdentifiers 元素是一个结构数组,但ddl将其定义为一个结构。
这个 $.event.body.accountIdentifiers 元素是一个结构数组,但ddl将其定义为一个结构。
应该起作用的ddl是:

CREATE STREAM STREAM_NAME(
  event STRUCT<
    header STRUCT<
      name VARCHAR,
      version VARCHAR,
      producer VARCHAR,
      channel VARCHAR,
      countryCode VARCHAR
      eventTimeStamp VARCHAR
    >,
    body STRUCT<
      customerIdentifiers ARRAY<STRUCT<
         customerIdentifier VARCHAR,
         customerIdType VARCHAR
      >>,
      accountIdentifiers ARRAY<STRUCT<
         accountIdentifier VARCHAR,
         accountIdType VARCHAR
      >>,                            
      eventDetails STRUCT<
         transactionDateTime VARCHAR,
         transactionDate VARCHAR,
         productDescription VARCHAR,
         monthAverage VARCHAR,
         dailyAverage VARCHAR
      >
    >
  >
) WITH (
  KAFKA_TOPIC = 'TOPIC1',
  VALUE_FORMAT = 'JSON',
  PARTITIONS = 1
  ;

另外请注意,数据不包含ddl中的以下内容(尽管这不会导致任何问题,因为字段只是 NULL ):
$.event.header.eventTimeStamp $.event.body.eventDetails.productDescription

相关问题