您的位置:首页 > 文旅 > 美景 > 网站架构优化_黄页堆广_网站seo的优化怎么做_肇庆seo按天收费

网站架构优化_黄页堆广_网站seo的优化怎么做_肇庆seo按天收费

2025/11/7 2:48:20 来源:https://blog.csdn.net/z_ran/article/details/147333750  浏览:    关键词:网站架构优化_黄页堆广_网站seo的优化怎么做_肇庆seo按天收费
网站架构优化_黄页堆广_网站seo的优化怎么做_肇庆seo按天收费

FLinkSQL-Kafka

-- Flink SQL pipeline: stream JSON records from a Kafka topic into MySQL via JDBC.

-- Source: Kafka topic `aaa_mysql_kafka_test`, JSON-encoded employee rows.
CREATE TABLE `source_aaa_mysql_kafka_test` (
    `last_name`  STRING,
    `id`         INT,
    `hire_date`  DATE,
    `department` STRING,
    `salary`     FLOAT,
    `is_manager` INT,
    `first_name` STRING,
    `email`      STRING
) WITH (
    'connector'           = 'kafka',
    'value.format'        = 'json',
    'datasource.name'     = 'Kafka',
    'topic'               = 'aaa_mysql_kafka_test',
    'properties.group.id' = 'kafka_mysql_test',
    'scan.startup.mode'   = 'earliest-offset'
);

-- Sink: MySQL table `employees_0902`; rows are upserted on the `id` key.
CREATE TABLE `sink_zyztest_employees_0902` (
    `department` VARCHAR(50),
    `email`      VARCHAR(100),
    `is_manager` INT,
    `hire_date`  TIMESTAMP,
    `first_name` VARCHAR(50),
    `id`         INT,
    `last_name`  VARCHAR(50),
    `salary`     DECIMAL(10, 2),
    PRIMARY KEY (id) NOT ENFORCED
) WITH (
    'connector'       = 'jdbc',
    'datasource.name' = 'mysql5',
    'table-name'      = 'employees_0902',
    -- NOTE(review): [host:port] is a placeholder — substitute the real endpoint.
    'url'             = 'jdbc:mysql://[host:port]/zyztest?characterEncoding=UTF-8'
);

-- Pipeline: cast each source field to the sink's declared column type.
INSERT INTO `sink_zyztest_employees_0902`
SELECT
    CAST(`department` AS VARCHAR(50)) AS `department`,
    CAST(`email` AS VARCHAR(100))     AS `email`,
    `is_manager`                      AS `is_manager`,
    CAST(`hire_date` AS TIMESTAMP)    AS `hire_date`,
    CAST(`first_name` AS VARCHAR(50)) AS `first_name`,
    `id`                              AS `id`,
    CAST(`last_name` AS VARCHAR(50))  AS `last_name`,
    CAST(`salary` AS DECIMAL(10, 2))  AS `salary`
FROM `source_aaa_mysql_kafka_test`;

FLink-kafka-Hudi

-- Flink SQL: register a Hudi catalog backed by the Hive metastore and seed
-- two identical copy-on-write tables with the same sample rows.
CREATE CATALOG `hudi_catalog` WITH (
    'type'            = 'hudi',
    'datasource.name' = 'Hive_Cyber_Engine',
    'mode'            = 'hms'
);

-- create database if not EXISTS hudi_catalog.default;

CREATE TABLE IF NOT EXISTS hudi_catalog.`default`.studentinfo11 (
    uuid        INT PRIMARY KEY NOT ENFORCED,
    name        VARCHAR(10),
    age         INT,
    ts          TIMESTAMP(3),
    `partition` VARCHAR(20)
)
PARTITIONED BY (`partition`)
WITH (
    'connector'  = 'hudi',
    'table.type' = 'COPY_ON_WRITE'
);

CREATE TABLE IF NOT EXISTS hudi_catalog.`default`.studentinfo22 (
    uuid        INT PRIMARY KEY NOT ENFORCED,
    name        VARCHAR(10),
    age         INT,
    ts          TIMESTAMP(3),
    `partition` VARCHAR(20)
)
PARTITIONED BY (`partition`)
WITH (
    'connector'  = 'hudi',
    'table.type' = 'COPY_ON_WRITE'
);

-- Seed both tables with the same eight sample rows across four partitions.
INSERT INTO hudi_catalog.`default`.studentinfo11 VALUES
    (1, 'Danny',   23, TIMESTAMP '1970-01-01 00:00:01', 'par1'),
    (2, 'Stephen', 33, TIMESTAMP '1970-01-01 00:00:02', 'par1'),
    (3, 'Julian',  53, TIMESTAMP '1970-01-01 00:00:03', 'par2'),
    (4, 'Fabian',  31, TIMESTAMP '1970-01-01 00:00:04', 'par2'),
    (5, 'Sophia',  18, TIMESTAMP '1970-01-01 00:00:05', 'par3'),
    (6, 'Emma',    20, TIMESTAMP '1970-01-01 00:00:06', 'par3'),
    (7, 'Bob',     44, TIMESTAMP '1970-01-01 00:00:07', 'par4'),
    (8, 'Han',     56, TIMESTAMP '1970-01-01 00:00:08', 'par4');

INSERT INTO hudi_catalog.`default`.studentinfo22 VALUES
    (1, 'Danny',   23, TIMESTAMP '1970-01-01 00:00:01', 'par1'),
    (2, 'Stephen', 33, TIMESTAMP '1970-01-01 00:00:02', 'par1'),
    (3, 'Julian',  53, TIMESTAMP '1970-01-01 00:00:03', 'par2'),
    (4, 'Fabian',  31, TIMESTAMP '1970-01-01 00:00:04', 'par2'),
    (5, 'Sophia',  18, TIMESTAMP '1970-01-01 00:00:05', 'par3'),
    (6, 'Emma',    20, TIMESTAMP '1970-01-01 00:00:06', 'par3'),
    (7, 'Bob',     44, TIMESTAMP '1970-01-01 00:00:07', 'par4'),
    (8, 'Han',     56, TIMESTAMP '1970-01-01 00:00:08', 'par4');

FLink-Mysql-Hudi 

-- Flink SQL: stream JSON records from Kafka into an existing Hudi table
-- resolved through an HMS-backed Hudi catalog.
CREATE TABLE `source_Kafka_1008_3` (
    `partition` STRING,
    `name`      STRING,
    `uuid`      INT,
    `age`       INT,
    -- NOTE(review): declared as plain TIMESTAMP here while the Hudi target
    -- tables use TIMESTAMP(3) — confirm the precision conversion is intended.
    `ts`        TIMESTAMP
) WITH (
    'connector'         = 'kafka',
    'value.format'      = 'json',
    'datasource.name'   = 'kafka32',
    'topic'             = 'Kafka_1008_3',
    'scan.startup.mode' = 'earliest-offset'
);

CREATE CATALOG `sink_hudi_catalog` WITH (
    'type'            = 'hudi',
    'datasource.name' = 'Hive_Cluster_512',
    'mode'            = 'hms'
);

-- Pipeline: write into the catalog-managed table with an explicit column list.
INSERT INTO `sink_hudi_catalog`.`default`.`studentinfo22`
    (`name`, `partition`, `uuid`, `age`, `ts`)
SELECT
    `name`,
    `partition`,
    `uuid`,
    `age`,
    `ts`
FROM `source_Kafka_1008_3`;

FLink-Mysql-Kafka

-- Flink SQL: MySQL CDC source → upsert-kafka sink (changelog keyed on uuid).
CREATE TABLE `source_fctest_test` (
    uuid        INT PRIMARY KEY NOT ENFORCED,
    name        VARCHAR(10),
    age         INT,
    ts          TIMESTAMP(3),
    `partition` VARCHAR(20)
) WITH (
    'connector'                            = 'mysql-cdc',
    'database-name'                        = 'fctest',
    'table-name'                           = 'studentinfo22',
    'datasource.name'                      = 'Mysql_3306',
    'jdbc.properties.useSSL'               = 'false',
    'jdbc.properties.characterEncoding'    = 'UTF-8',
    'jdbc.properties.serverTimezone'       = 'Asia/Shanghai',
    'debezium.database.serverTimezone'     = 'Asia/Shanghai',
    'debezium.database.connectionTimeZone' = 'Asia/Shanghai',
    -- 'initial' = snapshot the table first, then tail the binlog.
    'scan.startup.mode'                    = 'initial',
    'server-id'                            = '815111232-814446237'
);

CREATE TABLE `sink_Kafka_1008_2` (
    uuid        INT PRIMARY KEY NOT ENFORCED,
    name        VARCHAR(10),
    age         INT,
    ts          TIMESTAMP(3),
    `partition` VARCHAR(20)
) WITH (
    'connector'       = 'upsert-kafka',
    'value.format'    = 'json',
    'datasource.name' = 'kafka32',
    -- NOTE(review): table is named ..._2 but writes topic Kafka_1008_4 — confirm.
    'topic'           = 'Kafka_1008_4'
);

-- Explicit column list (same order as both DDLs) instead of SELECT *.
INSERT INTO `sink_Kafka_1008_2`
SELECT
    uuid,
    name,
    age,
    ts,
    `partition`
FROM `source_fctest_test`;

FLink-application-k8s

-- Flink SQL smoke test: generate random orders and print them to the task log.
CREATE TABLE Orders (
    order_number BIGINT,
    price        DECIMAL(32, 2),
    buyer        ROW<first_name STRING, last_name STRING>,
    order_time   TIMESTAMP(3)
) WITH (
    'connector' = 'datagen'
);

CREATE TABLE Orders1 (
    order_number BIGINT,
    price        DECIMAL(32, 2),
    buyer        ROW<first_name STRING, last_name STRING>,
    order_time   TIMESTAMP(3)
) WITH (
    'connector' = 'print'
);

-- Explicit column list (same order as both DDLs) instead of SELECT *.
INSERT INTO Orders1
SELECT
    order_number,
    price,
    buyer,
    order_time
FROM Orders;

FLink-session-k8s

-- Flink SQL (session cluster on K8s): MySQL CDC source → Oracle JDBC sink.
CREATE TABLE `source_pipeline2_test1` (
    `name` VARCHAR(255),
    `id`   INT,
    `age`  VARCHAR(11),
    PRIMARY KEY (id) NOT ENFORCED
) WITH (
    'jdbc.properties.useSSL'               = 'false',
    'jdbc.properties.characterEncoding'    = 'UTF-8',
    'scan.incremental.snapshot.enabled'    = 'false',
    'connector'                            = 'mysql-cdc',
    'database-name'                        = 'pipeline2',
    'datasource.name'                      = 'mysql0720',
    'debezium.database.connectionTimeZone' = 'Asia/Shanghai',
    'table-name'                           = 'ccc_test1',
    'jdbc.properties.serverTimezone'       = 'Asia/Shanghai',
    -- 'initial' = snapshot the table first, then tail the binlog.
    'scan.startup.mode'                    = 'initial',
    'debezium.database.serverTimezone'     = 'Asia/Shanghai'
);

CREATE TABLE `sink_FLINKUSER_j_COPY` (
    `AGE`  STRING,
    `ID`   BIGINT,
    `NAME` STRING,
    PRIMARY KEY (ID) NOT ENFORCED
) WITH (
    'connector'       = 'jdbc',
    'datasource.name' = 'Oracle_chuan',
    'table-name'      = 'FLINKUSER."j_COPY"'
);

-- FIX: the SELECT previously qualified its columns with
-- `source_pipeline2_ccc_test1`, which is not the table registered in FROM
-- (`source_pipeline2_test1`); the unresolved qualifier fails SQL validation.
-- Columns are now qualified with the actual source table.
INSERT INTO `sink_FLINKUSER_j_COPY`
SELECT
    CAST(`source_pipeline2_test1`.`age`  AS STRING) AS `AGE`,
    CAST(`source_pipeline2_test1`.`id`   AS BIGINT) AS `ID`,
    CAST(`source_pipeline2_test1`.`name` AS STRING) AS `NAME`
FROM `source_pipeline2_test1`;

-- Ad-hoc check of the sink's contents.
SELECT * FROM `sink_FLINKUSER_j_COPY`;

FLink-application-yarn

-- Flink SQL (application mode on YARN): MySQL CDC source → Oracle JDBC sink.
CREATE TABLE `source_pipeline2_test1` (
    `name` VARCHAR(255),
    `id`   INT,
    `age`  VARCHAR(11),
    PRIMARY KEY (id) NOT ENFORCED
) WITH (
    'jdbc.properties.useSSL'               = 'false',
    'jdbc.properties.characterEncoding'    = 'UTF-8',
    'scan.incremental.snapshot.enabled'    = 'false',
    'connector'                            = 'mysql-cdc',
    'database-name'                        = 'pipeline2',
    'datasource.name'                      = 'mysql0720',
    'debezium.database.connectionTimeZone' = 'Asia/Shanghai',
    'table-name'                           = 'ccc_test1',
    'jdbc.properties.serverTimezone'       = 'Asia/Shanghai',
    -- 'initial' = snapshot the table first, then tail the binlog.
    'scan.startup.mode'                    = 'initial',
    'debezium.database.serverTimezone'     = 'Asia/Shanghai'
);

CREATE TABLE `sink_FLINKUSER_j_COPY` (
    `AGE`  STRING,
    `ID`   BIGINT,
    `NAME` STRING,
    PRIMARY KEY (ID) NOT ENFORCED
) WITH (
    'connector'       = 'jdbc',
    'datasource.name' = 'Oracle_chuan',
    'table-name'      = 'FLINKUSER."j_COPY"'
);

-- FIX: the SELECT previously qualified its columns with
-- `source_pipeline2_ccc_test1`, which is not the table registered in FROM
-- (`source_pipeline2_test1`); the unresolved qualifier fails SQL validation.
-- Columns are now qualified with the actual source table.
INSERT INTO `sink_FLINKUSER_j_COPY`
SELECT
    CAST(`source_pipeline2_test1`.`age`  AS STRING) AS `AGE`,
    CAST(`source_pipeline2_test1`.`id`   AS BIGINT) AS `ID`,
    CAST(`source_pipeline2_test1`.`name` AS STRING) AS `NAME`
FROM `source_pipeline2_test1`;

FLink-paimon

-- Flink SQL: register a Paimon catalog (Hive metastore) and create one
-- primary-keyed changelog table and one append-only table.
CREATE CATALOG `paimon_catalog` WITH (
    'type'      = 'paimon',
    'metastore' = 'hive',
    -- NOTE(review): 'port' is a placeholder — substitute the real thrift port.
    'uri'       = 'thrift://master2:port'
);

USE CATALOG paimon_catalog;

-- Changelog table keyed on (id, pt_admdvs); 'bucket' = '-1' selects
-- dynamic bucketing.
CREATE TABLE fctest.j_test_changelog (
    `id`           INT,
    `age`          INT,
    `name`         STRING,
    `school`       INT NOT NULL,
    `otl_biz_time` STRING COMMENT '离线同步时间',
    `rt_biz_time`  STRING COMMENT '实时同步时间',
    `deleted`      INT COMMENT '数据删除状态',
    `deleted_time` TIMESTAMP COMMENT '数据删除时间',
    `pt_admdvs`    STRING COMMENT '分区_医保区划代码',
    PRIMARY KEY (id, pt_admdvs) NOT ENFORCED
)
PARTITIONED BY (pt_admdvs)
WITH (
    'bucket'      = '-1',
    'file.format' = 'parquet'
);

-- Append table; 'row.kind.column' = 'op' carries the change kind per row.
CREATE TABLE fctest.j_test_append (
    `id`           INT,
    `age`          INT,
    `name`         STRING,
    `school`       INT NOT NULL,
    `otl_biz_time` STRING COMMENT '离线同步时间',
    `rt_biz_time`  STRING COMMENT '实时同步时间',
    `deleted`      INT COMMENT '数据删除状态',
    `deleted_time` TIMESTAMP COMMENT '数据删除时间',
    `pt_biz_date`  STRING COMMENT '分区_业务日期',
    `pt_admdvs`    STRING COMMENT '分区_医保区划代码',
    `op`           STRING
)
PARTITIONED BY (pt_biz_date, pt_admdvs)
WITH (
    'bucket'          = '-1',
    'row.kind.column' = 'op',
    'file.format'     = 'parquet'
);

版权声明:

本网仅为发布的内容提供存储空间,不对发表、转载的内容提供任何形式的保证。凡本网注明“来源:XXX网络”的作品,均转载自其它媒体,著作权归作者所有,商业转载请联系作者获得授权,非商业转载请注明出处。

我们尊重并感谢每一位作者,均已注明文章来源和作者。如因作品内容、版权或其它问题,请及时与我们联系,联系邮箱:809451989@qq.com,投稿邮箱:809451989@qq.com