我正在尝试从一些kakfa消息创建一个json格式的流,比如:
"beat": {
"name": "xxxxxxx",
"hostname": "xxxxxxxxxx",
"version": "zzzzz"
},
"log_instance": "forwarder-2",
"type": "prod",
"message": "{ ... json string.... }",
"@timestamp": "2020-06-14T23:31:33.925Z",
"input_type": "log",
"@version": "1"
}
我试过用
CREATE STREAM S (
beat STRUCT<
name VARCHAR,
hostname VARCHAR,
version VARCHAR
>,
log_instance VARCHAR,
type VARCHAR,
message VARCHAR, # for brevity - I also have this with a struct
@timestamp VARCHAR,
input_type VARCHAR,
@version` VARCHAR )
WITH (KAFKA_TOPIC='some_topic', VALUE_FORMAT='JSON');
但是我得到一个错误:
Caused by: line 10:5: extraneous input '@' expecting ....
我把引用和前面的下划线连在一起,但运气不好。我也尝试过在注册表中创建一个条目,但是我不能用这种方式创建legit avro。
如何将主题“绑定”到注册表模式?
谢谢。
1条答案
按热度按时间d6kp6zgx1#
如果您使用的是足够新的ksqldb版本,那么简单地用无效字符引用列名就可以了:
如果上述方法不起作用,那么很可能您使用的是旧版本的ksqldb。升级应该可以解决这个问题。
如何将主题“绑定”到注册表模式?
如果您使用
JSON_SR
格式,而不仅仅是JSON
. 后者只支持从模式注册表读取模式。如果您更想知道如何在sr中为现有主题注册模式。。。那你最好去看医生。注意,ksqldb只支持
TopicNameStrategy
命名策略。值模式具有主题{topic-name}-value
,例如,下面为test
主题的价值观。有关更多信息,请参见sr教程:https://docs.confluent.io/current/schema-registry/schema_registry_tutorial.html
我也尝试过在注册表中创建一个条目,但是我不能用这种方式创建legit avro。
avro不允许
@
在其字段名称中。但是,看起来您的数据是json格式的,这允许@
. 请参阅上面的curl示例,了解如何注册json模式。