Appearance
彩虹鱼SQL语法汇总
基本语法
彩虹鱼的基本语法来自于Flink SQL, 具体可以参考Flink SQL的官方文档。
彩虹鱼的SQL目前主要支持的是CREATE TABLE, DROP TABLE, INSERT INTO SELECT和SET语法,其他语法暂时未经验证。
CREATE TABLE语句
Flink的CREATE TABLE主要是用于定义输入源或者输出源的数据格式,语法如下:
CREATE TABLE table1(
`id` BIGINT,
`name` STRING
) WITH (
'connector' = 'jdbc',
'property' = 'value'
);- 每个源的定义的属性都不一样, 具体参考Connecter里的具体描述。
- CREATE TABLE 在不同的SQL文件里面是不能共享的,在不同的sql文件里面需要重复定义表结构才行。
DROP TABLE语句
主要用于在相同的任务里面删除定义的表结构。
INSERT INTO语句
INSERT INTO 主要用于数据写入,语法和普通的SQL里的类似,可以直接指定VALUES也可以指定一个查询语句。
- INSERT INTO可以是批处理,也可以是流处理,取决于后面跟的SELECT源是批还是流
SELECT语句
主要用于查询源数据
SET语句
SET语句主要用于设置全局的环境变量,如并行度,checkpoint间隔等。参考官方文档
彩虹鱼内置变量
彩虹鱼平台内置了一些变量,可以通过#{['变量名称'].method(arg1)}调用。
注意:彩虹鱼的内置变量只有在彩虹鱼平台运行任务时才能生效,本地执行时不生效。
date 变量
date 变量用于获取时间相关功能,
- #{['date'].lastRun()}, 获取最后一次任务运行成功的运行的时间戳
- #{['date'].currentRun()}, 获取本次运行的时间戳
- lastRun和currentRun都支持一个时间格式参数,用于格式化时间输出,如#
数据源变量
除了上面的内置变量外,其他的变量都是数据源变量,数据源变量用于统一管理flink的数据源,防止在每个sql任务里面维护数据源连接地址,用户名和密码等信息。 数据源可以根据环境配置不同的连接地址,任务在执行时会自动查询当前环境的数据源地址。
- #{['数据源名称']}, 获取数据源连接字符串
CREATE TABLE jc_mall_goods_evaluation
(
id INTEGER,
order_id STRING,
goods_id INTEGER,
goods_name STRING,
goods_sku STRING,
scores INTEGER,
content STRING,
image STRING,
evaluation_time TIMESTAMP,
PRIMARY KEY (id) NOT ENFORCED
) WITH (
'connector' = 'jdbc',
#{['jc_qc_service']},
'table-name' = 'jc_mall_goods_evaluation'
);上述的jc_qc_service就定义了一个数据源,在彩虹鱼平台数据源管理里面建立一个jc_qc_service名称的数据源即可。
支持的Connectors
Source Connector
jdbc
jdbc Connector主要用于连接mysql和oracle,具体请参考官方文档。
CREATE TABLE MyUserTable (
id BIGINT,
name STRING,
age INT,
status BOOLEAN,
PRIMARY KEY (id) NOT ENFORCED
) WITH (
'connector' = 'jdbc',
'url' = 'jdbc:mysql://localhost:3306/mydatabase',
'table-name' = 'users'
);elasticsearch-source
elasticsearch-source是一个自定义的connector,用于使用rest api读取elasticsearch里面的index的数据。
基本用法
CREATE TABLE gauss_access_log (
`id` STRING,
ip STRING,
`appId` STRING,
`type` STRING,
`data` STRING,
`timestamp` BIGINT
) WITH (
'connector' = 'elasticsearch-source',
'url' = 'http://localhost:9200',
'query' = '{"query_string":{"query": "type: s*"}}',
'format'='json',
'index'='gauss_access_log',
'mode'='searchAfter'
);如果数据是复杂类型,可以使用一下语法定义。
CREATE TABLE GAUSS_ACCESS_LOG_TMP (
id STRING,
ip STRING,
`timestamp` STRING,
appId STRING,
`data` ROW<event ROW<event STRING, properties STRING>>
) WITH (
'connector' = 'elasticsearch-source',
'url' = 'http://localhost:9200',
'query' = '{"query_string":{"query": "type: s*"}}',
'format'='json',
'index'='gauss_access_log',
'mode'='searchAfter'
);支持的模式
可以支持两种模式的es全量拉取方式
- scroll: 用于后台同步数据,性能比较好
- searchAfter:用于实时查询数据
sql
DROP TABLE IF EXISTS device;
CREATE TABLE device (
gbid STRING,
status INT
) WITH (
'connector' = 'elasticsearch-source',
'url' = 'http://localhost:9200',
'query' = '{"match_all":{}}',
'format'='json',
'index'='kibana_sample_data_flights',
'mode'='searchAfter'
);参数列表
- url: es服务器的地址
- query: 服务端过滤条件
- index: 索引/别名名称
- mode: scroll或者searchAfter
rest-source
rest-source是一个自定义的connector,用于读取rest api的数据。
CREATE TABLE test (
`id` STRING,
`workNo` STRING
) WITH (
'connector' = 'rest-source',
'url' = 'https://cloud-inner.tineco.com/dataway-api/api/public/listSignature',
'format'='json',
'query'='page',
'body'='{}',
'pageArg'='pageNumber',
'dataPath'='$.value.pageData',
'method'='get'
);支持的模式
可以支持两种模式的es全量拉取方式
- page: 分页读取数据
- list:全量读取数据
参数列表
- url: http请求的地址
- method: 请求方式
- query: 查询方式page或者list
- body: 请求体
- pageArg: 分页参数
- dataPath: 响应里面的列表的jsonpath
hbase-source
hbase-source是一个自定义的connector,用于读取Hbase里的数据。注意,此连接器和官方的hbase-2.2的处理方式不一样,此连接器支持服务端的过滤,官方的连接器只能做客户端过滤,性能上要好于官方连接器。
CREATE TABLE ticket_deal_info (
rowkey STRING,
info ROW<replyTime STRING,
ticketId STRING,
companyId STRING,
replyTitle STRING,
replyContent STRING>,
PRIMARY KEY (rowkey) NOT ENFORCED
) WITH (
'connector' = 'hbase-source',
'zookeeper.quorum' = 'zk1,zk2,zk3',
'filter' = 'SingleColumnValueFilter("info","replyTime",>=,"binary:1656216649") AND SingleColumnValueFilter("info","replyTime",<,"binary:1656303049")',
'format'='json',
'table-name'='ticket_deal_info'
);参数列表
- zookeeper.quorum: 集群的zk地址
- filter: 自定义过滤器
- table-name: Hbase表名
kafka
kafka是流处理,请参考官方文档
CREATE TABLE KafkaTable (
`partition_id` BIGINT METADATA FROM 'partition' VIRTUAL, -- from Kafka connector
`offset` BIGINT METADATA VIRTUAL, -- from Kafka connector
`user_id` BIGINT,
`item_id` BIGINT,
`behavior` STRING
) WITH (
'connector' = 'kafka',
'topic' = 'user_behavior',
'properties.bootstrap.servers' = 'localhost:9092',
'properties.group.id' = 'testGroup',
'scan.startup.mode' = 'earliest-offset',
'value.format' = 'json'
);mysql-cdc
mysql-cdc用的是第三方的cdc连接器,也是流处理类型的,请参考官方文档
CREATE TABLE if not exists mysql_odb_order_goods_cdc (
`order_goods_id` int ,
`order_id` string ,
`shop_id` bigint ,
`shop_name` string ,
PRIMARY KEY(`order_goods_id`) NOT ENFORCED
) WITH (
'connector' = 'mysql-cdc',
'hostname' = 'localhost',
'port' = '3306',
'username' = 'test',
'password' = 'test',
'database-name' = 'java_order',
'table-name' = 'odb_order_goods'
);Sink Connector
jdbc
jdbc Connector主要用于连接mysql和oracle,具体请参考官方文档
CREATE TABLE ODS_HBASE_TICKET_DEAL_INFO (
ID STRING,
REPLYTIME TIMESTAMP,
REPLYTYPE STRING,
STARTTYPE STRING,
TICKETID STRING,
DEALUSERID STRING,
PRIMARY KEY (ID) NOT ENFORCED
) WITH (
'connector' = 'jdbc',
'url' = 'jdbc:oracle:thin:@localhost:1521:ORCL',
'username' = 'test',
'password' = 'test',
'table-name' = 'ODS_HBASE_TICKET_DEAL_INFO'
);elasticsearch
elasticsearch是用于写入elasticsearch,也是官方组件,请参考官方文档官方文档
CREATE TABLE csm_ticket_deal_info
(
rowkey STRING,
ticketId STRING,
replyContent STRING,
replyTitle STRING,
replyTime bigint,
PRIMARY KEY (rowkey) NOT ENFORCED
) WITH (
'connector' = 'elasticsearch-6',
'hosts' = 'http://es.10.108.110.33.nip.io:80',
'index' = 'csm_ticket_deal_info',
'document-type' = '_doc',
'format' = 'json'
);kafka
kafka的Sink Connector也是官方组件,请参考官方文档
print
print connector主要用于将数据打印到控制台,一般用于本地调试场景。
CREATE TABLE print_table (
id INT,
grade INT,
name STRING,
score DOUBLE
) WITH (
'connector' = 'print'
)