**Abstract:** FlinkServer, Huawei's self-developed visual Flink development platform, provides stronger enterprise-grade features than the native Flink SQL interface, such as centralized job management, visual development, and multi-data-source configuration.

This article is shared from the Huawei Cloud community post 《华为FusionInsight MRS实战 - Flink增强特性之可视化开发平台FlinkServer开发学习》 ("Huawei FusionInsight MRS in Action: Flink's Enhanced Feature, the FlinkServer Visual Development Platform"), by 晋红轻.
As stream computing has evolved, the challenge is no longer just data volume and compute load; business logic itself keeps getting more complex. Raising developer productivity and lowering the barrier to entry are therefore critical for the adoption of real-time computing.

SQL is the most widely used language in data processing and lets users express business logic concisely. Flink, a unified stream and batch engine, introduced the Flink SQL feature in version 1.7.2 and has kept evolving it since. Where users once had to write hundreds of lines of business code, a few lines of SQL can now often do the job.

Putting Flink SQL into real production, however, still runs into problems when jobs are developed against the native API: the barrier to entry is high, usability is low, and the SQL code is hard to maintain. Business staff hand new requirements to IT staff, who schedule the development; the long cycle from requirement to launch means the best market window for new business is missed. Meanwhile, IT staff are burdened with large numbers of near-identical Flink jobs, which offers little sense of accomplishment.
Below, we take Kafka as an example and develop the same job twice, once with the native API and once with FlinkServer, to highlight FlinkServer's advantages by comparison.

See the previously published forum post 《华为FusionInsight MRS FlinkSQL 复杂嵌套Json解析最佳实践》 ("Huawei FusionInsight MRS FlinkSQL: Best Practices for Parsing Complex Nested JSON").

The task: use Flink SQL to receive complex nested JSON data produced by CDL from a source Kafka topic, parse it, and send the parsed data to another Kafka topic.
```bash
# Load the MRS client environment and authenticate with Kerberos
source /opt/hadoopclient/bigdata_env
kinit developuser

# Start a Flink session on YARN; -t ssl/ ships the SSL configuration directory
cd /opt/hadoopclient/Flink/flink
./bin/yarn-session.sh -t ssl/

# Launch the Flink SQL Client in embedded mode with the default settings file
cd /opt/hadoopclient/Flink/flink/bin
./sql-client.sh embedded -d ./../conf/sql-client-defaults.yaml
```
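Before submitting any SQL, it can be worth confirming that the session is actually up. A minimal check, assuming the same client environment as above (the exact output format depends on your Hadoop version):

```bash
# The Flink session should appear in state RUNNING
yarn application -list
```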
```sql
-- Source table: reads the raw nested CDL JSON from Kafka.
-- `schema` describes the field metadata; `payload` carries the actual record.
CREATE TABLE huditableout_source (
  `schema` ROW<`fields` ARRAY<ROW<type STRING, optional BOOLEAN, field STRING>>>,
  payload ROW<`TIMESTAMP` BIGINT, `data` ROW<
    uid INT,
    uname VARCHAR(32),
    age INT,
    sex VARCHAR(30),
    mostlike VARCHAR(30),
    lastview VARCHAR(30),
    totalcost INT>>,
  -- Computed columns flatten the nested structure; Flink SQL array indexes are 1-based
  type1 AS `schema`.`fields`[1].type,
  optional1 AS `schema`.`fields`[1].optional,
  field1 AS `schema`.`fields`[1].field,
  type2 AS `schema`.`fields`[2].type,
  optional2 AS `schema`.`fields`[2].optional,
  field2 AS `schema`.`fields`[2].field,
  ts AS payload.`TIMESTAMP`,
  uid AS payload.`data`.uid,
  uname AS payload.`data`.uname,
  age AS payload.`data`.age,
  sex AS payload.`data`.sex,
  mostlike AS payload.`data`.mostlike,
  lastview AS payload.`data`.lastview,
  totalcost AS payload.`data`.totalcost,
  localts AS LOCALTIMESTAMP
) WITH (
  'connector' = 'kafka',
  'topic' = 'huditableout',
  'properties.bootstrap.servers' = '172.16.9.113:21007,172.16.9.117:21007,172.16.9.118:21007',
  'properties.group.id' = 'example',
  'scan.startup.mode' = 'latest-offset',
  'format' = 'json',
  'json.fail-on-missing-field' = 'false',
  'json.ignore-parse-errors' = 'true',
  'properties.sasl.kerberos.service.name' = 'kafka',
  'properties.security.protocol' = 'SASL_PLAINTEXT',
  'properties.kerberos.domain.name' = 'hadoop.hadoop.com'
);
```
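For reference, a hand-written sketch of the kind of nested CDL-style message this table is meant to parse; the field values below are invented for illustration and only the structure matters:

```json
{
  "schema": {
    "fields": [
      {"type": "int32", "optional": false, "field": "uid"},
      {"type": "string", "optional": true, "field": "uname"}
    ]
  },
  "payload": {
    "TIMESTAMP": 1640995200000,
    "data": {
      "uid": 1,
      "uname": "alice",
      "age": 30,
      "sex": "female",
      "mostlike": "sports",
      "lastview": "news",
      "totalcost": 100
    }
  }
}
```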
```sql
-- Sink table: the flattened result, written to a second Kafka topic as plain JSON
CREATE TABLE huditableout (
  type1 VARCHAR(32),
  optional1 BOOLEAN,
  field1 VARCHAR(32),
  type2 VARCHAR(32),
  optional2 BOOLEAN,
  field2 VARCHAR(32),
  ts BIGINT,
  uid INT,
  uname VARCHAR(32),
  age INT,
  sex VARCHAR(30),
  mostlike VARCHAR(30),
  lastview VARCHAR(30),
  totalcost INT,
  localts TIMESTAMP
) WITH (
  'connector' = 'kafka',
  'topic' = 'huditableout2',
  'properties.bootstrap.servers' = '172.16.9.113:21007,172.16.9.117:21007,172.16.9.118:21007',
  'properties.group.id' = 'example',
  'scan.startup.mode' = 'latest-offset',
  'format' = 'json',
  'json.fail-on-missing-field' = 'false',
  'json.ignore-parse-errors' = 'true',
  'properties.sasl.kerberos.service.name' = 'kafka',
  'properties.security.protocol' = 'SASL_PLAINTEXT',
  'properties.kerberos.domain.name' = 'hadoop.hadoop.com'
);
```
```sql
-- Continuous job: flatten each nested source record into the sink topic
INSERT INTO huditableout
SELECT
  type1,
  optional1,
  field1,
  type2,
  optional2,
  field2,
  ts,
  uid,
  uname,
  age,
  sex,
  mostlike,
  lastview,
  totalcost,
  localts
FROM huditableout_source;
```
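Given an input message like the sketch above, the job would emit a flattened record along these lines (again purely illustrative; localts is filled in by LOCALTIMESTAMP at processing time):

```json
{"type1": "int32", "optional1": false, "field1": "uid", "type2": "string", "optional2": true, "field2": "uname", "ts": 1640995200000, "uid": 1, "uname": "alice", "age": 30, "sex": "female", "mostlike": "sports", "lastview": "news", "totalcost": 100, "localts": "2022-01-01 08:00:00"}
```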
Consume the data in the source Kafka topic (generated by CDL).
Consume the parsed data in the target Kafka topic (the result produced by the Flink SQL job), e.g. via the console consumer sketched below.
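A minimal way to tail the two topics is the Kafka console consumer shipped with the client. This sketch assumes the MRS Kafka client is installed under /opt/hadoopclient/Kafka/kafka and that the Kerberos/SASL settings shown in the table definitions are present in config/consumer.properties (both paths are illustrative):

```bash
cd /opt/hadoopclient/Kafka/kafka
# Tail the parsed output topic; swap in 'huditableout' to watch the source side
bin/kafka-console-consumer.sh \
  --bootstrap-server 172.16.9.113:21007 \
  --topic huditableout2 \
  --consumer.config config/consumer.properties
```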
You can also log in to the native Flink web UI to view the job.

First run `set execution.result-mode=tableau;` in the SQL Client so that query results are printed directly in the terminal, then query the stream table created above:

```sql
set execution.result-mode=tableau;
select * from huditableout;
```

Note: because huditableout is a Kafka stream table, the query only shows data written to the topic after the SELECT job starts.
Note: when developing a Flink SQL job on the FlinkServer web UI, you can set the size of the Flink cluster yourself.
The SQL entered in the FlinkServer SQL editor is exactly the same as in the native-API example above: the huditableout_source source table, the huditableout sink table, and the INSERT INTO ... SELECT statement. Submit the job, then verify it the same way: consume the data in the source Kafka topic (generated by CDL) and the parsed data in the target Kafka topic (the result produced by the Flink SQL job).
Copyright notice: this article is a repost; copyright belongs to the original author.
Original link: https://huaweicloud.blog.csdn.net/article/details/122230918