我创建了一个源主题订阅服务器,其输入消息如下:
{
"ip_router": "",
"ip_lan": "",
"isdn": "2046573688",
"end_datetime": "",
"shop_code": "1000405100",
"reg_type_id": "5131615",
"contract_id": "",
"update_datetime": "20170801171355",
"project": "",
"telecom_service_id": "2",
"local_speed": "",
"password": "",
"price_plan": "",
"vip": "",
"local_price_plan": "",
"sub_id": "1083168000",
"sta_datetime": "20090511152847",
"update_number_1": "1",
"act_status": "000",
"network_class": "",
"limit_usage": "",
"num_reset_zone": "",
"deposit": "",
"create_user": "TUDV_POPBGG",
"num_of_computer": "",
"cust_id": "10922428129",
"status": "2",
"active_datetime": "20090511152102",
"ip_view": "",
"channel_type_id": "",
"ip_wan": "",
"imsi": "452049760887694",
"infrastructure_type": "",
"product_code": "HPN03",
"expire_datetime": "",
"speed": "",
"private_ip": "",
"update_user": "MIGRATE",
"ip_static": "",
"vlan": "",
"sub_type": "",
"create_datetime": "20090511152102",
"is_info_completed": "1",
"pay_type": "2",
"up_link": "",
"promotion_code": "",
"technology": "",
"offer_id": "400001035",
"dev_staff_id": "",
"account_id": "",
"deploy_accept_date": "",
"serial": "8984049767000887694",
"group_id": "",
"ip_gateway": "",
"first_connect": "",
"org_product_code": "MIGRATE",
"start_money": "100000",
"keep_alive": "",
"account": ""
}
然后我在上面创建了一个流和一个表:
CREATE STREAM str_subscriber_json (sub_id VARCHAR,contract_id VARCHAR,cust_id VARCHAR,account_id VARCHAR,telecom_service_id VARCHAR,isdn VARCHAR,imsi VARCHAR,serial VARCHAR,status VARCHAR,product_code VARCHAR,offer_id VARCHAR,act_status VARCHAR,sta_datetime BIGINT,active_datetime BIGINT,sub_type VARCHAR,end_datetime BIGINT,expire_datetime BIGINT,shop_code VARCHAR,dev_staff_id VARCHAR,promotion_code VARCHAR,vip VARCHAR,account VARCHAR,create_datetime BIGINT,create_user VARCHAR,update_datetime BIGINT,update_user VARCHAR,deposit VARCHAR,limit_usage VARCHAR,password VARCHAR,org_product_code VARCHAR,num_reset_zone VARCHAR,start_money VARCHAR,is_info_completed VARCHAR,channel_type_id VARCHAR,first_connect VARCHAR,speed VARCHAR,keep_alive VARCHAR,price_plan VARCHAR,local_price_plan VARCHAR,project VARCHAR,local_speed VARCHAR,technology VARCHAR,network_class VARCHAR,infrastructure_type VARCHAR,deploy_accept_date BIGINT,group_id VARCHAR,ip_static VARCHAR,ip_view VARCHAR,ip_lan VARCHAR,ip_wan VARCHAR,ip_gateway VARCHAR,ip_router VARCHAR,num_of_computer VARCHAR,vlan VARCHAR,up_link VARCHAR,private_ip VARCHAR,pay_type VARCHAR,update_number_1 VARCHAR,reg_type_id VARCHAR,type VARCHAR,update_number VARCHAR) WITH (KAFKA_TOPIC='subscriber', VALUE_FORMAT='JSON');
CREATE TABLE tbl_subscriber_json (sub_id VARCHAR,contract_id VARCHAR,cust_id VARCHAR,account_id VARCHAR,telecom_service_id VARCHAR,isdn VARCHAR,imsi VARCHAR,serial VARCHAR,status VARCHAR,product_code VARCHAR,offer_id VARCHAR,act_status VARCHAR,sta_datetime BIGINT,active_datetime BIGINT,sub_type VARCHAR,end_datetime BIGINT,expire_datetime BIGINT,shop_code VARCHAR,dev_staff_id VARCHAR,promotion_code VARCHAR,vip VARCHAR,account VARCHAR,create_datetime BIGINT,create_user VARCHAR,update_datetime BIGINT,update_user VARCHAR,deposit VARCHAR,limit_usage VARCHAR,password VARCHAR,org_product_code VARCHAR,num_reset_zone VARCHAR,start_money VARCHAR,is_info_completed VARCHAR,channel_type_id VARCHAR,first_connect VARCHAR,speed VARCHAR,keep_alive VARCHAR,price_plan VARCHAR,local_price_plan VARCHAR,project VARCHAR,local_speed VARCHAR,technology VARCHAR,network_class VARCHAR,infrastructure_type VARCHAR,deploy_accept_date BIGINT,group_id VARCHAR,ip_static VARCHAR,ip_view VARCHAR,ip_lan VARCHAR,ip_wan VARCHAR,ip_gateway VARCHAR,ip_router VARCHAR,num_of_computer VARCHAR,vlan VARCHAR,up_link VARCHAR,private_ip VARCHAR,pay_type VARCHAR,update_number_1 VARCHAR,reg_type_id VARCHAR,type VARCHAR,update_number VARCHAR) WITH (KAFKA_TOPIC='subscriber', VALUE_FORMAT='JSON', KEY = 'sub_id' );
我尝试使用ksql进行测试:
SELECT * FROM str_subscriber_json;
(当我将新的json放入订阅主题时打印结果)
SELECT * FROM tbl_subscriber_json;
(在订阅主题中添加新的json时,没有显示任何内容)
所以请澄清一下这个案子到底出了什么问题?
非常感谢你。
1条答案
按热度按时间bxjv4tth1#
摘要
你的信息需要加密。如果没有消息键,那么表的语义就没有任何意义(因为如果没有键,就不能显示键的值)。
我复制了你的例子,使用
kafkacat
产生有或没有钥匙的信息。测试1-无钥匙
生成测试消息
流输出
注意
null
在第二列-这是键(第一列是消息的时间戳;其余的列是消息中声明的字段)表格输出
(无输出)
测试2-带钥匙组
这里的键被任意设置为
1
,使用kafkacat
的-K
要指定的标志:
作为键/值分隔符。流输出
注意
1
在第二列-这是键(第一列是消息的时间戳;其余的列是消息中声明的字段)表格输出
使用ksql自动重新键入主题
可以使用ksql重新划分主题。例如,把你的消息来源
subscriber
主题,下面是如何使用ksql对其重新分区以设置密钥:这填充了一个Kafka主题(
SUBSCRIBER_KEYED
)然后可以在其上定义表:现在当你给
subscriber
,即使未设置键,该表也将工作: