-- FlinkSQL-Kafka
-- Source: JSON records from Kafka topic `aaa_mysql_kafka_test`, consumed from
-- the earliest offset.
-- NOTE(review): 'datasource.name' is presumably resolved by the hosting
-- platform to the actual connection settings (no broker address is given
-- here) — confirm.
CREATE TABLE `source_aaa_mysql_kafka_test` (
    `last_name`  STRING,
    `id`         INT,
    `hire_date`  DATE,
    `department` STRING,
    `salary`     FLOAT,
    `is_manager` INT,
    `first_name` STRING,
    `email`      STRING
) WITH (
    'connector' = 'kafka',
    'value.format' = 'json',
    'datasource.name' = 'Kafka',
    'topic' = 'aaa_mysql_kafka_test',
    'properties.group.id' = 'kafka_mysql_test',
    'scan.startup.mode' = 'earliest-offset'
);

-- Sink: JDBC table `employees_0902`, keyed on `id`.
CREATE TABLE `sink_zyztest_employees_0902` (
    `department` VARCHAR(50),
    `email`      VARCHAR(100),
    `is_manager` INT,
    `hire_date`  TIMESTAMP,
    `first_name` VARCHAR(50),
    `id`         INT,
    `last_name`  VARCHAR(50),
    `salary`     DECIMAL(10, 2),
    PRIMARY KEY (id) NOT ENFORCED
) WITH (
    'connector' = 'jdbc',
    'datasource.name' = 'mysql5',
    'table-name' = 'employees_0902',
    'url' = 'jdbc:mysql://[host:port]/zyztest?characterEncoding=UTF-8'
);

-- Copy every source row into the sink. The select list follows the sink's
-- column order and casts each value to the sink column type where the two
-- declarations differ.
INSERT INTO `sink_zyztest_employees_0902`
SELECT
    CAST(`department` AS VARCHAR(50))  AS `department`,
    CAST(`email` AS VARCHAR(100))      AS `email`,
    `is_manager`,
    CAST(`hire_date` AS TIMESTAMP)     AS `hire_date`,
    CAST(`first_name` AS VARCHAR(50))  AS `first_name`,
    `id`,
    CAST(`last_name` AS VARCHAR(50))   AS `last_name`,
    CAST(`salary` AS DECIMAL(10, 2))   AS `salary`
FROM `source_aaa_mysql_kafka_test`;
-- Flink-Kafka-Hudi
-- Register a Hudi catalog in 'hms' mode.
-- NOTE(review): 'datasource.name' presumably points at a platform-registered
-- Hive connection — confirm.
CREATE CATALOG `hudi_catalog` WITH (
    'type' = 'hudi',
    'datasource.name' = 'Hive_Cyber_Engine',
    'mode' = 'hms'
);
-- Left disabled on purpose; enable only if the target database is missing:
-- create database if not EXISTS hudi_catalog.default;
-- Two Hudi tables with the same schema, both keyed on `uuid` and partitioned
-- by the `partition` column.
CREATE TABLE IF NOT EXISTS hudi_catalog.`default`.studentinfo11 (
    uuid        INT,
    name        VARCHAR(10),
    age         INT,
    ts          TIMESTAMP(3),
    `partition` VARCHAR(20),
    PRIMARY KEY (uuid) NOT ENFORCED
)
PARTITIONED BY (`partition`)
WITH (
    'connector' = 'hudi',
    'table.type' = 'COPY_ON_WRITE'
);

CREATE TABLE IF NOT EXISTS hudi_catalog.`default`.studentinfo22 (
    uuid        INT,
    name        VARCHAR(10),
    age         INT,
    ts          TIMESTAMP(3),
    `partition` VARCHAR(20),
    PRIMARY KEY (uuid) NOT ENFORCED
)
PARTITIONED BY (`partition`)
WITH (
    'connector' = 'hudi',
    'table.type' = 'COPY_ON_WRITE'
);
-- Seed both Hudi tables with the same eight sample rows (two rows for each of
-- the partitions par1..par4). The target columns are listed explicitly so the
-- statements do not depend on the column order of the table definitions.
INSERT INTO hudi_catalog.`default`.studentinfo11 (uuid, name, age, ts, `partition`)
VALUES
    (1, 'Danny',   23, TIMESTAMP '1970-01-01 00:00:01', 'par1'),
    (2, 'Stephen', 33, TIMESTAMP '1970-01-01 00:00:02', 'par1'),
    (3, 'Julian',  53, TIMESTAMP '1970-01-01 00:00:03', 'par2'),
    (4, 'Fabian',  31, TIMESTAMP '1970-01-01 00:00:04', 'par2'),
    (5, 'Sophia',  18, TIMESTAMP '1970-01-01 00:00:05', 'par3'),
    (6, 'Emma',    20, TIMESTAMP '1970-01-01 00:00:06', 'par3'),
    (7, 'Bob',     44, TIMESTAMP '1970-01-01 00:00:07', 'par4'),
    (8, 'Han',     56, TIMESTAMP '1970-01-01 00:00:08', 'par4');

INSERT INTO hudi_catalog.`default`.studentinfo22 (uuid, name, age, ts, `partition`)
VALUES
    (1, 'Danny',   23, TIMESTAMP '1970-01-01 00:00:01', 'par1'),
    (2, 'Stephen', 33, TIMESTAMP '1970-01-01 00:00:02', 'par1'),
    (3, 'Julian',  53, TIMESTAMP '1970-01-01 00:00:03', 'par2'),
    (4, 'Fabian',  31, TIMESTAMP '1970-01-01 00:00:04', 'par2'),
    (5, 'Sophia',  18, TIMESTAMP '1970-01-01 00:00:05', 'par3'),
    (6, 'Emma',    20, TIMESTAMP '1970-01-01 00:00:06', 'par3'),
    (7, 'Bob',     44, TIMESTAMP '1970-01-01 00:00:07', 'par4'),
    (8, 'Han',     56, TIMESTAMP '1970-01-01 00:00:08', 'par4');
-- Flink-MySQL-Hudi
-- Source: JSON records from Kafka topic `Kafka_1008_3`, consumed from the
-- earliest offset.
CREATE TABLE `source_Kafka_1008_3` (
    `partition` STRING,
    `name`      STRING,
    `uuid`      INT,
    `age`       INT,
    `ts`        TIMESTAMP
) WITH (
    'connector' = 'kafka',
    'value.format' = 'json',
    'datasource.name' = 'kafka32',
    'topic' = 'Kafka_1008_3',
    'scan.startup.mode' = 'earliest-offset'
);

-- Hudi catalog in 'hms' mode that holds the target table.
CREATE CATALOG `sink_hudi_catalog` WITH (
    'type' = 'hudi',
    'datasource.name' = 'Hive_Cluster_512',
    'mode' = 'hms'
);

-- Stream the Kafka rows into the existing Hudi table `studentinfo22`; the
-- select list is in the same order as the INSERT column list.
INSERT INTO `sink_hudi_catalog`.`default`.`studentinfo22` (
    `name`,
    `partition`,
    `uuid`,
    `age`,
    `ts`
)
SELECT
    `name`,
    `partition`,
    `uuid`,
    `age`,
    `ts`
FROM `source_Kafka_1008_3`;
-- Flink-MySQL-Kafka
-- Source: MySQL CDC on fctest.studentinfo22, starting with an initial
-- snapshot ('scan.startup.mode' = 'initial').
-- NOTE(review): 'server-id' is written as a start-end range, but here the
-- start (815111232) is larger than the end (814446237). Confirm the intended
-- bounds; the value is left unchanged.
CREATE TABLE `source_fctest_test` (
    uuid        INT,
    name        VARCHAR(10),
    age         INT,
    ts          TIMESTAMP(3),
    `partition` VARCHAR(20),
    PRIMARY KEY (uuid) NOT ENFORCED
) WITH (
    'connector' = 'mysql-cdc',
    'database-name' = 'fctest',
    'table-name' = 'studentinfo22',
    'datasource.name' = 'Mysql_3306',
    'jdbc.properties.useSSL' = 'false',
    'jdbc.properties.characterEncoding' = 'UTF-8',
    'jdbc.properties.serverTimezone' = 'Asia/Shanghai',
    'debezium.database.serverTimezone' = 'Asia/Shanghai',
    'debezium.database.connectionTimeZone' = 'Asia/Shanghai',
    'scan.startup.mode' = 'initial',
    'server-id' = '815111232-814446237'
);

-- Sink: upsert-kafka topic `Kafka_1008_4`, keyed on `uuid`.
-- 'key.format' is a required option of the upsert-kafka connector (it
-- serializes the primary key into the Kafka message key), so it is declared
-- explicitly alongside 'value.format'.
CREATE TABLE `sink_Kafka_1008_2` (
    uuid        INT,
    name        VARCHAR(10),
    age         INT,
    ts          TIMESTAMP(3),
    `partition` VARCHAR(20),
    PRIMARY KEY (uuid) NOT ENFORCED
) WITH (
    'connector' = 'upsert-kafka',
    'key.format' = 'json',
    'value.format' = 'json',
    'datasource.name' = 'kafka32',
    'topic' = 'Kafka_1008_4'
);

-- Forward the change stream to Kafka. Columns are named explicitly so a later
-- schema change on either table cannot silently shift the mapping.
INSERT INTO `sink_Kafka_1008_2` (uuid, name, age, ts, `partition`)
SELECT
    uuid,
    name,
    age,
    ts,
    `partition`
FROM `source_fctest_test`;
-- Flink-application-k8s
-- Source table backed by the 'datagen' connector.
CREATE TABLE Orders (
    order_number BIGINT,
    price        DECIMAL(32,2),
    buyer        ROW<first_name STRING, last_name STRING>,
    order_time   TIMESTAMP(3)
) WITH (
    'connector' = 'datagen'
);
-- Sink table backed by the 'print' connector; same schema as Orders.
CREATE TABLE Orders1 (
    order_number BIGINT,
    price        DECIMAL(32,2),
    buyer        ROW<first_name STRING, last_name STRING>,
    order_time   TIMESTAMP(3)
) WITH (
    'connector' = 'print'
);

-- Pipe the Orders rows into the print sink. Columns are listed explicitly
-- instead of SELECT * so the mapping does not depend on declaration order.
INSERT INTO Orders1 (order_number, price, buyer, order_time)
SELECT
    order_number,
    price,
    buyer,
    order_time
FROM Orders;
-- Flink-session-k8s
-- Source: MySQL CDC on pipeline2.ccc_test1, with incremental snapshot
-- disabled and startup mode 'initial'.
CREATE TABLE `source_pipeline2_test1` (
    `name` VARCHAR(255),
    `id`   INT,
    `age`  VARCHAR(11),
    PRIMARY KEY (id) NOT ENFORCED
) WITH (
    'connector' = 'mysql-cdc',
    'datasource.name' = 'mysql0720',
    'database-name' = 'pipeline2',
    'table-name' = 'ccc_test1',
    'scan.startup.mode' = 'initial',
    'scan.incremental.snapshot.enabled' = 'false',
    'jdbc.properties.useSSL' = 'false',
    'jdbc.properties.characterEncoding' = 'UTF-8',
    'jdbc.properties.serverTimezone' = 'Asia/Shanghai',
    'debezium.database.serverTimezone' = 'Asia/Shanghai',
    'debezium.database.connectionTimeZone' = 'Asia/Shanghai'
);
-- Sink: JDBC table FLINKUSER."j_COPY", keyed on ID.
CREATE TABLE `sink_FLINKUSER_j_COPY` (
    `AGE`  STRING,
    `ID`   BIGINT,
    `NAME` STRING,
    PRIMARY KEY (ID) NOT ENFORCED
) WITH (
    'connector' = 'jdbc',
    'datasource.name' = 'Oracle_chuan',
    'table-name' = 'FLINKUSER."j_COPY"'
);

-- Copy the CDC rows into the sink, casting to the sink column types.
-- Columns are qualified through the alias of the table that is actually in
-- the FROM clause; the previous qualifier `source_pipeline2_ccc_test1` named a
-- table that is not part of this query and could not be resolved.
INSERT INTO `sink_FLINKUSER_j_COPY` (`AGE`, `ID`, `NAME`)
SELECT
    CAST(src.`age` AS STRING)  AS `AGE`,
    CAST(src.`id` AS BIGINT)   AS `ID`,
    CAST(src.`name` AS STRING) AS `NAME`
FROM `source_pipeline2_test1` AS src;
-- Ad-hoc check: read back everything currently in the JDBC sink table.
SELECT *
FROM `sink_FLINKUSER_j_COPY`;
-- Flink-application-yarn
-- Source: MySQL CDC on pipeline2.ccc_test1, with incremental snapshot
-- disabled and startup mode 'initial'.
CREATE TABLE `source_pipeline2_test1` (
    `name` VARCHAR(255),
    `id`   INT,
    `age`  VARCHAR(11),
    PRIMARY KEY (id) NOT ENFORCED
) WITH (
    'connector' = 'mysql-cdc',
    'datasource.name' = 'mysql0720',
    'database-name' = 'pipeline2',
    'table-name' = 'ccc_test1',
    'scan.startup.mode' = 'initial',
    'scan.incremental.snapshot.enabled' = 'false',
    'jdbc.properties.useSSL' = 'false',
    'jdbc.properties.characterEncoding' = 'UTF-8',
    'jdbc.properties.serverTimezone' = 'Asia/Shanghai',
    'debezium.database.serverTimezone' = 'Asia/Shanghai',
    'debezium.database.connectionTimeZone' = 'Asia/Shanghai'
);
-- Sink: JDBC table FLINKUSER."j_COPY", keyed on ID.
CREATE TABLE `sink_FLINKUSER_j_COPY` (
    `AGE`  STRING,
    `ID`   BIGINT,
    `NAME` STRING,
    PRIMARY KEY (ID) NOT ENFORCED
) WITH (
    'connector' = 'jdbc',
    'datasource.name' = 'Oracle_chuan',
    'table-name' = 'FLINKUSER."j_COPY"'
);

-- Copy the CDC rows into the sink, casting to the sink column types.
-- Columns are qualified through the alias of the table that is actually in
-- the FROM clause; the previous qualifier `source_pipeline2_ccc_test1` named a
-- table that is not part of this query and could not be resolved.
INSERT INTO `sink_FLINKUSER_j_COPY` (`AGE`, `ID`, `NAME`)
SELECT
    CAST(src.`age` AS STRING)  AS `AGE`,
    CAST(src.`id` AS BIGINT)   AS `ID`,
    CAST(src.`name` AS STRING) AS `NAME`
FROM `source_pipeline2_test1` AS src;
-- Flink-Paimon
-- Register a Paimon catalog that uses a Hive metastore, then make it the
-- current catalog for the statements that follow.
-- NOTE(review): 'thrift://master2:port' looks like a placeholder — supply the
-- real metastore port before running.
CREATE CATALOG `paimon_catalog` WITH (
    'type' = 'paimon',
    'metastore' = 'hive',
    'uri' = 'thrift://master2:port'
);

USE CATALOG paimon_catalog;
-- Primary-key table, partitioned by pt_admdvs; the key is (id, pt_admdvs).
-- The COMMENT literals are stored as column metadata and are kept verbatim;
-- an English translation follows each one.
CREATE TABLE fctest.j_test_changelog (
    `id`           INT,
    `age`          INT,
    `name`         STRING,
    `school`       INT NOT NULL,
    `otl_biz_time` STRING COMMENT '离线同步时间',          -- offline sync time
    `rt_biz_time`  STRING COMMENT '实时同步时间',          -- real-time sync time
    `deleted`      INT COMMENT '数据删除状态',             -- data deletion status
    `deleted_time` TIMESTAMP COMMENT '数据删除时间',       -- data deletion time
    `pt_admdvs`    STRING COMMENT '分区_医保区划代码',     -- partition: medical-insurance administrative division code
    PRIMARY KEY (id, pt_admdvs) NOT ENFORCED
)
PARTITIONED BY (pt_admdvs)
WITH (
    'bucket' = '-1',
    'file.format' = 'parquet'
);

-- Table without a primary key, partitioned by (pt_biz_date, pt_admdvs).
-- NOTE(review): 'row.kind.column' = 'op' presumably marks `op` as the column
-- carrying the row kind — confirm against the Paimon build in use.
CREATE TABLE fctest.j_test_append (
    `id`           INT,
    `age`          INT,
    `name`         STRING,
    `school`       INT NOT NULL,
    `otl_biz_time` STRING COMMENT '离线同步时间',          -- offline sync time
    `rt_biz_time`  STRING COMMENT '实时同步时间',          -- real-time sync time
    `deleted`      INT COMMENT '数据删除状态',             -- data deletion status
    `deleted_time` TIMESTAMP COMMENT '数据删除时间',       -- data deletion time
    `pt_biz_date`  STRING COMMENT '分区_业务日期',         -- partition: business date
    `pt_admdvs`    STRING COMMENT '分区_医保区划代码',     -- partition: medical-insurance administrative division code
    `op`           STRING
)
PARTITIONED BY (pt_biz_date, pt_admdvs)
WITH (
    'bucket' = '-1',
    'row.kind.column' = 'op',
    'file.format' = 'parquet'
);