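I've been using KSQL, and so far it has worked great. But now I'd like to sink the output to BigQuery via Kafka Connect, and for that I need to attach a JSON schema. I'm having trouble figuring out how to do this. My query is as follows: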
CREATE STREAM tweets_original (
CreatedAt BIGINT,
Id BIGINT,
Text VARCHAR,
Source VARCHAR,
GeoLocation VARCHAR,
User STRUCT<Id BIGINT, Name VARCHAR, Description VARCHAR, ScreenName VARCHAR, URL VARCHAR, FollowersCount BIGINT, FriendsCount BIGINT>
)
WITH (kafka_topic='tweets', value_format='JSON');
CREATE STREAM tweets_new
WITH (kafka_topic='tweets-new') AS
SELECT
CreatedAt as created_at,
Id as tweet_id,
Text as tweet_text,
Source as source,
GeoLocation as geo_location,
User->Id as user_id,
User->Name as user_name,
User->Description as user_description,
User->ScreenName as user_screenname
FROM tweets_original ;
Here is an example of a record written to the output topic (tweets-new):
{
"CREATED_AT": 1535036410000,
"TWEET_ID": 1032643668614819800,
"TWEET_TEXT": "Sample text",
"SOURCE": "<a href=\"http://twitter.com\" rel=\"nofollow\">Twitter Web Client</a>",
"GEO_LOCATION": null,
"USER_ID": 123,
"USER_NAME": "John Smith",
"USER_DESCRIPTION": "Developer in Chief",
"USER_SCREENNAME": "newphonewhodis"
}
However, for Kafka Connect to sink these records into BigQuery, I need to attach a schema, like so:
{
"schema": {
"type": "struct",
"fields": [
{
"type": "int64",
"optional": false,
"field": "CREATED_AT"
},
{
"type": "int64",
"optional": false,
"field": "TWEET_ID"
},
{
"type": "string",
"optional": false,
"field": "TWEET_TEXT"
}
...
],
"optional": false,
"name": "foobar"
},
"payload": {...}
}
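Either way, I haven't seen anything in the docs that shows how I might approach this (maybe I'm looking in the wrong place). Any help would be greatly appreciated!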
1 Answer
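This is easy to solve in KSQL: just change the second stream to write Avro instead of JSON (set value_format='AVRO' in its WITH clause).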
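A minimal sketch of that change, assuming the rest of the query stays exactly as in the question: only the WITH clause of the second stream gains value_format='AVRO'. With Avro output, KSQL registers the value schema in Confluent Schema Registry (this assumes ksql.schema.registry.url is set on the KSQL server), so no schema has to be embedded in each message.

```sql
-- Same projection as the original query; only the output format changes.
-- Writing Avro makes KSQL register the value schema in Schema Registry
-- (assumes ksql.schema.registry.url is configured on the KSQL server).
CREATE STREAM tweets_new
  WITH (kafka_topic='tweets-new', value_format='AVRO') AS
  SELECT
    CreatedAt AS created_at,
    Id AS tweet_id,
    Text AS tweet_text,
    Source AS source,
    GeoLocation AS geo_location,
    User->Id AS user_id,
    User->Name AS user_name,
    User->Description AS user_description,
    User->ScreenName AS user_screenname
  FROM tweets_original;
```

If tweets_new already exists, it has to be dropped (after terminating its persistent query) before it can be re-created with the new format. Since tweets-new already holds JSON records, writing the Avro output to a fresh topic name may also avoid deserialization errors in the sink.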
Then, in the Kafka Connect configuration, use the AvroConverter; this also lets the connector handle schema evolution/management in Google BigQuery.
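For illustration, here is a sketch of the sink side written as a ksqlDB CREATE SINK CONNECTOR statement. That statement exists only in newer ksqlDB releases with Connect integration, not in the KSQL version used in the question; on a standalone Connect worker the same key/value pairs go into the connector's configuration. The connector name sink_bigquery and the Schema Registry URL are hypothetical, and the BigQuery-specific settings (project, dataset, credentials) are deliberately left out because their property names vary between connector versions.

```sql
-- Hypothetical connector name and Schema Registry URL; adjust for your setup.
CREATE SINK CONNECTOR sink_bigquery WITH (
  'connector.class'                     = 'com.wepay.kafka.connect.bigquery.BigQuerySinkConnector',
  'topics'                              = 'tweets-new',
  -- Deserialize Avro values, fetching their schema from Schema Registry
  -- instead of expecting a JSON "schema"/"payload" envelope in each message.
  'value.converter'                     = 'io.confluent.connect.avro.AvroConverter',
  'value.converter.schema.registry.url' = 'http://schema-registry:8081'
  -- BigQuery project, dataset, and credential settings go here; check the
  -- documentation for your connector version for the exact property names.
);
```

The key point is the converter pair: because the schema travels via Schema Registry, the JSON envelope shown in the question is no longer needed.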