Skip to content

彩虹鱼SQL语法汇总

基本语法

彩虹鱼的基本语法来自于Flink SQL, 具体可以参考Flink SQL的官方文档

彩虹鱼的SQL目前主要支持的是CREATE TABLE, DROP TABLE, INSERT INTO SELECT和SET语法,其他语法暂时未经验证。

CREATE TABLE语句

Flink的CREATE TABLE主要是用于定义输入源或者输出源的数据格式,语法如下:

CREATE TABLE table1(
  `id` BIGINT,
  `name` STRING
) WITH (
  'connector' = 'jdbc',
  'property' = 'value'
);
  • 每个源的定义的属性都不一样, 具体参考Connecter里的具体描述。
  • CREATE TABLE 在不同的SQL文件里面是不能共享的,在不同的sql文件里面需要重复定义表结构才行。

DROP TABLE语句

主要用于在相同的任务里面删除定义的表结构。

INSERT INTO语句

INSERT INTO 主要用于数据写入,语法和普通的SQL里的类似,可以直接指定VALUES也可以指定一个查询语句。

  • INSERT INTO可以是批处理,也可以是流处理,取决于后面跟的SELECT源是批还是流

SELECT语句

主要用于查询源数据

SET语句

SET语句主要用于设置全局的环境变量,如并行度,checkpoint间隔等。参考官方文档

彩虹鱼内置变量

彩虹鱼平台内置了一些变量,可以通过#{['变量名称'].method(arg1)}调用。

注意:彩虹鱼的内置变量只有在彩虹鱼平台运行任务时才能生效,本地执行时不生效。

date 变量

date 变量用于获取时间相关功能,

  • #{['date'].lastRun()}, 获取最后一次任务运行成功的运行的时间戳
  • #{['date'].currentRun()}, 获取本次运行的时间戳
  • lastRun和currentRun都支持一个时间格式参数,用于格式化时间输出,如#

数据源变量

除了上面的内置变量外,其他的变量都是数据源变量,数据源变量用于统一管理flink的数据源,防止在每个sql任务里面维护数据源连接地址,用户名和密码等信息。 数据源可以根据环境配置不同的连接地址,任务在执行时会自动查询当前环境的数据源地址。

  • #{['数据源名称']}, 获取数据源连接字符串
CREATE TABLE jc_mall_goods_evaluation
(
    id INTEGER,
	order_id STRING,
	goods_id INTEGER,
	goods_name STRING,
	goods_sku STRING,
	scores INTEGER,
	content STRING,
	image STRING,
	evaluation_time TIMESTAMP,
	PRIMARY KEY (id) NOT ENFORCED
) WITH (
    'connector' = 'jdbc',
    #{['jc_qc_service']},
    'table-name' = 'jc_mall_goods_evaluation'
);

上述的jc_qc_service就定义了一个数据源,在彩虹鱼平台数据源管理里面建立一个jc_qc_service名称的数据源即可。

支持的Connectors

Source Connector

jdbc

jdbc Connector主要用于连接mysql和oracle,具体请参考官方文档

CREATE TABLE MyUserTable (
  id BIGINT,
  name STRING,
  age INT,
  status BOOLEAN,
  PRIMARY KEY (id) NOT ENFORCED
) WITH (
   'connector' = 'jdbc',
   'url' = 'jdbc:mysql://localhost:3306/mydatabase',
   'table-name' = 'users'
);

elasticsearch-source

elasticsearch-source是一个自定义的connector,用于使用rest api读取elasticsearch里面的index的数据。

基本用法
CREATE TABLE gauss_access_log (
  `id`  STRING,
  ip STRING,
  `appId` STRING,
  `type` STRING,
  `data` STRING,
  `timestamp` BIGINT
) WITH (
  'connector' = 'elasticsearch-source',
  'url' = 'http://localhost:9200',
  'query' = '{"query_string":{"query": "type: s*"}}',
  'format'='json',
  'index'='gauss_access_log',
  'mode'='searchAfter'
);

如果数据是复杂类型,可以使用一下语法定义。

CREATE TABLE GAUSS_ACCESS_LOG_TMP (
    id STRING,
    ip STRING,
    `timestamp` STRING,
    appId STRING,
    `data` ROW<event ROW<event STRING, properties STRING>>
) WITH (
  'connector' = 'elasticsearch-source',
  'url' = 'http://localhost:9200',
  'query' = '{"query_string":{"query": "type: s*"}}',
  'format'='json',
  'index'='gauss_access_log',
  'mode'='searchAfter'
);
支持的模式

可以支持两种模式的es全量拉取方式

  1. scroll: 用于后台同步数据,性能比较好
  2. searchAfter:用于实时查询数据
sql
DROP TABLE IF EXISTS device;
CREATE TABLE device (
    gbid  STRING,
    status INT
) WITH (
  'connector' = 'elasticsearch-source',
  'url' = 'http://localhost:9200',
  'query' = '{"match_all":{}}',
  'format'='json',
  'index'='kibana_sample_data_flights',
  'mode'='searchAfter'
);
参数列表
  • url: es服务器的地址
  • query: 服务端过滤条件
  • index: 索引/别名名称
  • mode: scroll或者searchAfter

rest-source

rest-source是一个自定义的connector,用于读取rest api的数据。

CREATE TABLE test (
    `id`  STRING,
    `workNo` STRING
) WITH (
    'connector' = 'rest-source',
    'url' = 'https://cloud-inner.tineco.com/dataway-api/api/public/listSignature',
    'format'='json',
    'query'='page',
    'body'='{}',
    'pageArg'='pageNumber',
    'dataPath'='$.value.pageData',
    'method'='get'
);
支持的模式

可以支持两种模式的es全量拉取方式

  1. page: 分页读取数据
  2. list:全量读取数据
参数列表
  • url: http请求的地址
  • method: 请求方式
  • query: 查询方式page或者list
  • body: 请求体
  • pageArg: 分页参数
  • dataPath: 响应里面的列表的jsonpath

hbase-source

hbase-source是一个自定义的connector,用于读取Hbase里的数据。注意,此连接器和官方的hbase-2.2的处理方式不一样,此连接器支持服务端的过滤,官方的连接器只能做客户端过滤,性能上要好于官方连接器。

CREATE TABLE ticket_deal_info (
    rowkey STRING,
    info ROW<replyTime STRING,
    ticketId STRING,
    companyId STRING,
    replyTitle STRING,
    replyContent STRING>,
    PRIMARY KEY (rowkey) NOT ENFORCED
) WITH (
    'connector' = 'hbase-source',
    'zookeeper.quorum' = 'zk1,zk2,zk3',
    'filter' = 'SingleColumnValueFilter("info","replyTime",>=,"binary:1656216649") AND SingleColumnValueFilter("info","replyTime",<,"binary:1656303049")',
    'format'='json',
    'table-name'='ticket_deal_info'
);
参数列表
  • zookeeper.quorum: 集群的zk地址
  • filter: 自定义过滤器
  • table-name: Hbase表名

kafka

kafka是流处理,请参考官方文档

CREATE TABLE KafkaTable (
  `partition_id` BIGINT METADATA FROM 'partition' VIRTUAL,  -- from Kafka connector
  `offset` BIGINT METADATA VIRTUAL,  -- from Kafka connector
  `user_id` BIGINT,
  `item_id` BIGINT,
  `behavior` STRING
) WITH (
  'connector' = 'kafka',
  'topic' = 'user_behavior',
  'properties.bootstrap.servers' = 'localhost:9092',
  'properties.group.id' = 'testGroup',
  'scan.startup.mode' = 'earliest-offset',
  'value.format' = 'json'
);

mysql-cdc

mysql-cdc用的是第三方的cdc连接器,也是流处理类型的,请参考官方文档

CREATE TABLE if not exists mysql_odb_order_goods_cdc (
  `order_goods_id` int  ,
  `order_id` string ,
  `shop_id` bigint ,
  `shop_name` string  ,
  PRIMARY KEY(`order_goods_id`) NOT ENFORCED
) WITH (
  'connector' = 'mysql-cdc',
  'hostname' = 'localhost',
  'port' = '3306',
  'username' = 'test',
  'password' = 'test',
  'database-name' = 'java_order',
  'table-name' = 'odb_order_goods'
);

Sink Connector

jdbc

jdbc Connector主要用于连接mysql和oracle,具体请参考官方文档

CREATE TABLE ODS_HBASE_TICKET_DEAL_INFO (
	ID STRING,
	REPLYTIME TIMESTAMP,
	REPLYTYPE STRING,
	STARTTYPE STRING,
	TICKETID STRING,
	DEALUSERID STRING,
    PRIMARY KEY (ID) NOT ENFORCED
) WITH (
    'connector' = 'jdbc',
    'url' = 'jdbc:oracle:thin:@localhost:1521:ORCL',
    'username' = 'test',
    'password' = 'test',
    'table-name' = 'ODS_HBASE_TICKET_DEAL_INFO'
);

elasticsearch

elasticsearch是用于写入elasticsearch,也是官方组件,请参考官方文档官方文档

CREATE TABLE csm_ticket_deal_info
(
    rowkey STRING,
    ticketId STRING,
    replyContent STRING,
    replyTitle STRING,
    replyTime bigint,
    PRIMARY KEY (rowkey) NOT ENFORCED
) WITH (
    'connector' = 'elasticsearch-6',
    'hosts' = 'http://es.10.108.110.33.nip.io:80',
    'index' = 'csm_ticket_deal_info',
    'document-type' = '_doc',
    'format' = 'json'
);

kafka

kafka的Sink Connector也是官方组件,请参考官方文档

print

print connector主要用于将数据打印到控制台,一般用于本地调试场景。

CREATE TABLE print_table (
 id INT,
 grade INT,
 name STRING,
 score DOUBLE
) WITH (
 'connector' = 'print'
)