KSQL: SELECT on a table shows nothing

Posted by dauxcl2d on 2021-06-06 in Kafka

I created a source topic named subscriber, with input messages like the following:

{
  "ip_router": "",
  "ip_lan": "",
  "isdn": "2046573688",
  "end_datetime": "",
  "shop_code": "1000405100",
  "reg_type_id": "5131615",
  "contract_id": "",
  "update_datetime": "20170801171355",
  "project": "",
  "telecom_service_id": "2",
  "local_speed": "",
  "password": "",
  "price_plan": "",
  "vip": "",
  "local_price_plan": "",
  "sub_id": "1083168000",
  "sta_datetime": "20090511152847",
  "update_number_1": "1",
  "act_status": "000",
  "network_class": "",
  "limit_usage": "",
  "num_reset_zone": "",
  "deposit": "",
  "create_user": "TUDV_POPBGG",
  "num_of_computer": "",
  "cust_id": "10922428129",
  "status": "2",
  "active_datetime": "20090511152102",
  "ip_view": "",
  "channel_type_id": "",
  "ip_wan": "",
  "imsi": "452049760887694",
  "infrastructure_type": "",
  "product_code": "HPN03",
  "expire_datetime": "",
  "speed": "",
  "private_ip": "",
  "update_user": "MIGRATE",
  "ip_static": "",
  "vlan": "",
  "sub_type": "",
  "create_datetime": "20090511152102",
  "is_info_completed": "1",
  "pay_type": "2",
  "up_link": "",
  "promotion_code": "",
  "technology": "",
  "offer_id": "400001035",
  "dev_staff_id": "",
  "account_id": "",
  "deploy_accept_date": "",
  "serial": "8984049767000887694",
  "group_id": "",
  "ip_gateway": "",
  "first_connect": "",
  "org_product_code": "MIGRATE",
  "start_money": "100000",
  "keep_alive": "",
  "account": ""
}

Then I created a stream and a table on top of it:

CREATE STREAM str_subscriber_json (sub_id VARCHAR,contract_id VARCHAR,cust_id VARCHAR,account_id VARCHAR,telecom_service_id VARCHAR,isdn VARCHAR,imsi VARCHAR,serial VARCHAR,status VARCHAR,product_code VARCHAR,offer_id VARCHAR,act_status VARCHAR,sta_datetime BIGINT,active_datetime BIGINT,sub_type VARCHAR,end_datetime BIGINT,expire_datetime BIGINT,shop_code VARCHAR,dev_staff_id VARCHAR,promotion_code VARCHAR,vip VARCHAR,account VARCHAR,create_datetime BIGINT,create_user VARCHAR,update_datetime BIGINT,update_user VARCHAR,deposit VARCHAR,limit_usage VARCHAR,password VARCHAR,org_product_code VARCHAR,num_reset_zone VARCHAR,start_money VARCHAR,is_info_completed VARCHAR,channel_type_id VARCHAR,first_connect VARCHAR,speed VARCHAR,keep_alive VARCHAR,price_plan VARCHAR,local_price_plan VARCHAR,project VARCHAR,local_speed VARCHAR,technology VARCHAR,network_class VARCHAR,infrastructure_type VARCHAR,deploy_accept_date BIGINT,group_id VARCHAR,ip_static VARCHAR,ip_view VARCHAR,ip_lan VARCHAR,ip_wan VARCHAR,ip_gateway VARCHAR,ip_router VARCHAR,num_of_computer VARCHAR,vlan VARCHAR,up_link VARCHAR,private_ip VARCHAR,pay_type VARCHAR,update_number_1 VARCHAR,reg_type_id VARCHAR,type VARCHAR,update_number VARCHAR) WITH (KAFKA_TOPIC='subscriber', VALUE_FORMAT='JSON');

CREATE TABLE tbl_subscriber_json (sub_id VARCHAR,contract_id VARCHAR,cust_id VARCHAR,account_id VARCHAR,telecom_service_id VARCHAR,isdn VARCHAR,imsi VARCHAR,serial VARCHAR,status VARCHAR,product_code VARCHAR,offer_id VARCHAR,act_status VARCHAR,sta_datetime BIGINT,active_datetime BIGINT,sub_type VARCHAR,end_datetime BIGINT,expire_datetime BIGINT,shop_code VARCHAR,dev_staff_id VARCHAR,promotion_code VARCHAR,vip VARCHAR,account VARCHAR,create_datetime BIGINT,create_user VARCHAR,update_datetime BIGINT,update_user VARCHAR,deposit VARCHAR,limit_usage VARCHAR,password VARCHAR,org_product_code VARCHAR,num_reset_zone VARCHAR,start_money VARCHAR,is_info_completed VARCHAR,channel_type_id VARCHAR,first_connect VARCHAR,speed VARCHAR,keep_alive VARCHAR,price_plan VARCHAR,local_price_plan VARCHAR,project VARCHAR,local_speed VARCHAR,technology VARCHAR,network_class VARCHAR,infrastructure_type VARCHAR,deploy_accept_date BIGINT,group_id VARCHAR,ip_static VARCHAR,ip_view VARCHAR,ip_lan VARCHAR,ip_wan VARCHAR,ip_gateway VARCHAR,ip_router VARCHAR,num_of_computer VARCHAR,vlan VARCHAR,up_link VARCHAR,private_ip VARCHAR,pay_type VARCHAR,update_number_1 VARCHAR,reg_type_id VARCHAR,type VARCHAR,update_number VARCHAR) WITH (KAFKA_TOPIC='subscriber', VALUE_FORMAT='JSON', KEY = 'sub_id' );

I tried testing with KSQL:

SELECT * FROM str_subscriber_json;

(prints results when I put new JSON into the subscriber topic)

SELECT * FROM tbl_subscriber_json;

(shows nothing when I add new JSON to the subscriber topic)
Could you please clarify what is going wrong in this case?
Thank you very much.

Answer #1 (bxjv4tth)

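Summary

Your messages need to be keyed. Without a message key, table semantics make no sense, because a table shows the latest value for each key and there is no key to show a value for.
I reproduced your example, using kafkacat to produce messages with and without a key.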
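
To make the point about keys concrete, here is a rough conceptual model in shell. It is not how KSQL is implemented; it only mimics the idea that a table holds the latest value per key and that a record with no key has nowhere to go. The function name materialize_table and the tab-separated record layout are made up for this illustration.

```shell
# Conceptual model only (not KSQL internals): a table holds the latest
# value per key, and records that have no key are skipped.
# Input: one record per line as "<key><TAB><value>"; an empty key stands in for null.
materialize_table() {
  awk -F'\t' '
    $1 != "" { latest[$1] = $2 }                        # upsert by key
    END      { for (k in latest) print k "\t" latest[k] }
  ' | sort
}

# Four records: one unkeyed, two for key 1, one for key 2.
table=$(printf '%s\t%s\n' \
  ''  'status=9' \
  '1' 'status=1' \
  '1' 'status=2' \
  '2' 'status=3' | materialize_table)

printf '%s\n' "$table"
```

Only keys 1 and 2 survive, each with its most recent value; the unkeyed record never appears, which matches the empty table output in the question.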

Test 1 - no key

Produce the test message

$ echo '{ "ip_router": "", "ip_lan": "", "isdn": "2046573688", "end_datetime": "", "shop_code": "1000405100", "reg_type_id": "5131615", "contract_id": "", "update_datetime": "20170801171355", "project": "", "telecom_service_id": "2", "local_speed": "", "password": "", "price_plan": "", "vip": "", "local_price_plan": "", "sub_id": "1083168000", "sta_datetime": "20090511152847", "update_number_1": "1", "act_status": "000", "network_class": "", "limit_usage": "", "num_reset_zone": "", "deposit": "", "create_user": "TUDV_POPBGG", "num_of_computer": "", "cust_id": "10922428129", "status": "2", "active_datetime": "20090511152102", "ip_view": "", "channel_type_id": "", "ip_wan": "", "imsi": "452049760887694", "infrastructure_type": "", "product_code": "HPN03", "expire_datetime": "", "speed": "", "private_ip": "", "update_user": "MIGRATE", "ip_static": "", "vlan": "", "sub_type": "", "create_datetime": "20090511152102", "is_info_completed": "1", "pay_type": "2", "up_link": "", "promotion_code": "", "technology": "", "offer_id": "400001035", "dev_staff_id": "", "account_id": "", "deploy_accept_date": "", "serial": "8984049767000887694", "group_id": "", "ip_gateway": "", "first_connect": "", "org_product_code": "MIGRATE", "start_money": "100000", "keep_alive": "", "account": "" }' \
| kafkacat -b localhost:9092 -P -t subscriber

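Stream output

Note the null in the second column: this is the key (the first column is the message timestamp; the remaining columns are the fields declared for the message)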

ksql> select * from str_subscriber_json;
1528368689380 | null | 1083168000 |  | 10922428129 |  | 2 | 2046573688 | 452049760887694 | 8984049767000887694 | 2 | HPN03 | 400001035 | 000 | 20090511152847 | 20090511152102 |  | 0 | 0 | 1000405100 |  |  |  |  | 20090511152102 | TUDV_POPBGG | 20170801171355 | MIGRATE |  |  |  | MIGRATE |  | 100000 | 1 |  |  |  |  |  |  |  |  |  |  |  | 0 |  |  |  |  |  |  |  |  |  |  |  | 2 | 1 | 5131615 | null | null

Table output

ksql> SELECT * FROM tbl_subscriber_json;

(no output)

Test 2 - with key set

Here the key is arbitrarily set to 1, using kafkacat's -K flag to specify : as the key/value separator.

$ echo '1:{ "ip_router": "", "ip_lan": "", "isdn": "2046573688", "end_datetime": "", "shop_code": "1000405100", "reg_type_id": "5131615", "contract_id": "", "update_datetime": "20170801171355", "project": "", "telecom_service_id": "2", "local_speed": "", "password": "", "price_plan": "", "vip": "", "local_price_plan": "", "sub_id": "1083168000", "sta_datetime": "20090511152847", "update_number_1": "1", "act_status": "000", "network_class": "", "limit_usage": "", "num_reset_zone": "", "deposit": "", "create_user": "TUDV_POPBGG", "num_of_computer": "", "cust_id": "10922428129", "status": "2", "active_datetime": "20090511152102", "ip_view": "", "channel_type_id": "", "ip_wan": "", "imsi": "452049760887694", "infrastructure_type": "", "product_code": "HPN03", "expire_datetime": "", "speed": "", "private_ip": "", "update_user": "MIGRATE", "ip_static": "", "vlan": "", "sub_type": "", "create_datetime": "20090511152102", "is_info_completed": "1", "pay_type": "2", "up_link": "", "promotion_code": "", "technology": "", "offer_id": "400001035", "dev_staff_id": "", "account_id": "", "deploy_accept_date": "", "serial": "8984049767000887694", "group_id": "", "ip_gateway": "", "first_connect": "", "org_product_code": "MIGRATE", "start_money": "100000", "keep_alive": "", "account": "" }' \
| kafkacat -b localhost:9092 -P -t subscriber -K:

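Stream output

Note the 1 in the second column: this is the key (the first column is the message timestamp; the remaining columns are the fields declared for the message)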

1528368781916 | 1 | 1083168000 |  | 10922428129 |  | 2 | 2046573688 | 452049760887694 | 8984049767000887694 | 2 | HPN03 | 400001035 | 000 | 20090511152847 | 20090511152102 |  | 0 | 0 | 1000405100 |  |  |  |  | 20090511152102 | TUDV_POPBGG | 20170801171355 | MIGRATE |  |  |  | MIGRATE |  | 100000 | 1 |  |  |  |  |  |  |  |  |  |  |  | 0 |  |  |  |  |  |  |  |  |  |  |  | 2 | 1 | 5131615 | null | null

Table output

1528368781916 | 1 | 1083168000 |  | 10922428129 |  | 2 | 2046573688 | 452049760887694 | 8984049767000887694 | 2 | HPN03 | 400001035 | 000 | 20090511152847 | 20090511152102 |  | 0 | 0 | 1000405100 |  |  |  |  | 20090511152102 | TUDV_POPBGG | 20170801171355 | MIGRATE |  |  |  | MIGRATE |  | 100000 | 1 |  |  |  |  |  |  |  |  |  |  |  | 0 |  |  |  |  |  |  |  |  |  |  |  | 2 | 1 | 5131615 | null | null
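
Test 2 used an arbitrary key of 1. Since the table is declared with KEY = 'sub_id', a producer would more plausibly use the sub_id value itself as the message key. The following is only a sketch of how such a key:value line could be built before piping it to kafkacat with -K:. The sed pattern assumes the payload is a single line of JSON formatted like the sample above, and the variable names and shortened payload are illustrative.

```shell
# Shortened payload for illustration; the real message has many more fields.
payload='{ "isdn": "2046573688", "sub_id": "1083168000", "status": "2" }'

# Pull out the sub_id value (assumes single-line JSON with no escaped quotes in the value).
key=$(printf '%s\n' "$payload" | sed -E 's/.*"sub_id": *"([^"]*)".*/\1/')

# Build the "key:value" line in the same form as the Test 2 input.
keyed_line=$(printf '%s:%s' "$key" "$payload")
printf '%s\n' "$keyed_line"

# To actually produce it (needs a running broker, as in the tests above):
# printf '%s\n' "$keyed_line" | kafkacat -b localhost:9092 -P -t subscriber -K:
```

For anything beyond a quick test, a real JSON parser would be a safer way to extract the key than a regular expression.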

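Re-keying a topic automatically with KSQL

You can use KSQL to repartition a topic. For example, taking your source subscriber topic, here is how to repartition it with KSQL so that the message key is set: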

ksql> CREATE STREAM SUBSCRIBER_KEYED AS SELECT * FROM str_subscriber_json PARTITION BY sub_id;

 Message
----------------------------
 Stream created and running
----------------------------
ksql>

This populates a Kafka topic (SUBSCRIBER_KEYED) on which the table can then be defined:

CREATE TABLE subscriber_table (sub_id VARCHAR,contract_id VARCHAR,cust_id VARCHAR,account_id VARCHAR,telecom_service_id VARCHAR,isdn VARCHAR,imsi VARCHAR,serial VARCHAR,status VARCHAR,product_code VARCHAR,offer_id VARCHAR,act_status VARCHAR,sta_datetime BIGINT,active_datetime BIGINT,sub_type VARCHAR,end_datetime BIGINT,expire_datetime BIGINT,shop_code VARCHAR,dev_staff_id VARCHAR,promotion_code VARCHAR,vip VARCHAR,account VARCHAR,create_datetime BIGINT,create_user VARCHAR,update_datetime BIGINT,update_user VARCHAR,deposit VARCHAR,limit_usage VARCHAR,password VARCHAR,org_product_code VARCHAR,num_reset_zone VARCHAR,start_money VARCHAR,is_info_completed VARCHAR,channel_type_id VARCHAR,first_connect VARCHAR,speed VARCHAR,keep_alive VARCHAR,price_plan VARCHAR,local_price_plan VARCHAR,project VARCHAR,local_speed VARCHAR,technology VARCHAR,network_class VARCHAR,infrastructure_type VARCHAR,deploy_accept_date BIGINT,group_id VARCHAR,ip_static VARCHAR,ip_view VARCHAR,ip_lan VARCHAR,ip_wan VARCHAR,ip_gateway VARCHAR,ip_router VARCHAR,num_of_computer VARCHAR,vlan VARCHAR,up_link VARCHAR,private_ip VARCHAR,pay_type VARCHAR,update_number_1 VARCHAR,reg_type_id VARCHAR,type VARCHAR,update_number VARCHAR) \
WITH (KAFKA_TOPIC='SUBSCRIBER_KEYED', VALUE_FORMAT='JSON', KEY = 'sub_id' );

Now when you send a message to subscriber, even without a key set, the table will work:

ksql> select * from subscriber_table;
1528369407576 | 1083168000 | 1083168000 |  | 10922428129 |  | 2 | 2046573688 | 452049760887694 | 8984049767000887694 | 2 | HPN03 | 400001035 | 000 | 20090511152847 | 20090511152102 |  | 0 | 0 | 1000405100 |  |  |  |  | 20090511152102 | TUDV_POPBGG | 20170801171355 | MIGRATE |  |  |  | MIGRATE |  | 100000 | 1 |  |  |  |  |  |  |  |  |  |  |  | 0 |  |  |  |  |  |  |  |  |  |  |  | 2 | 1 | 5131615 | null | null
