您的位置:首页 > 财经 > 产业 > 无代码网站_网站布局设计排版_关键词seo服务_nba西部最新排名

无代码网站_网站布局设计排版_关键词seo服务_nba西部最新排名

2025/5/12 13:51:50 来源:https://blog.csdn.net/qq_41519442/article/details/143443537  浏览:    关键词:无代码网站_网站布局设计排版_关键词seo服务_nba西部最新排名
无代码网站_网站布局设计排版_关键词seo服务_nba西部最新排名

工作中遇到这样的一个业务,业务方给的是一个视图,查了一下文档视图不能监听,这个时候想着要不要用datastream去自定义,然后发现flinksql也是可以实现
创建对应数据库和表

-- 创建班级表 tb_class
CREATE TABLE tb_class (class_id INT PRIMARY KEY AUTO_INCREMENT,  -- 主键class_name VARCHAR(50) NOT NULL           -- 班级名称
);-- 创建学生表 tb_student
CREATE TABLE tb_student (student_id INT PRIMARY KEY AUTO_INCREMENT,  -- 主键student_name VARCHAR(50) NOT NULL,          -- 姓名class_id INT,                               -- 班级IDFOREIGN KEY (class_id) REFERENCES tb_class(class_id)  -- 外键关联到 tb_class
);
-- 向 tb_class 表中插入数据
INSERT INTO tb_class (class_name) VALUES ('Class A');
INSERT INTO tb_class (class_name) VALUES ('Class B');
INSERT INTO tb_class (class_name) VALUES ('Class C');-- 向 tb_student 表中插入数据
INSERT INTO tb_student (student_name, class_id) VALUES ('Alice', 1);
INSERT INTO tb_student (student_name, class_id) VALUES ('Bob', 2);
INSERT INTO tb_student (student_name, class_id) VALUES ('Charlie', 3);
INSERT INTO tb_student (student_name, class_id) VALUES ('Diana', 1);-- 创建视图 tb_student_view
CREATE VIEW tb_student_view AS
SELECT s.student_id AS student_id,       -- 主键s.student_name AS student_name,   -- 学生姓名c.class_name AS class_name        -- 班级名称
FROM tb_student s
JOIN tb_class c ON s.class_id = c.class_id;select * from tb_student_view;-- 创建 ads_student 表
CREATE TABLE ads_student (student_id INT PRIMARY KEY AUTO_INCREMENT,  -- 主键student_name VARCHAR(50) NOT NULL,          -- 学生姓名class_name VARCHAR(50) NOT NULL             -- 班级名称
);

编写对应的flinksql代码,这里没有flinksql客户端,只能在idea上完成了

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
public class FlinkSQLJoinExample {public static void main(String[] args) throws Exception {// 设置流执行环境StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();// 创建 Table 环境EnvironmentSettings settings = EnvironmentSettings.newInstance().inStreamingMode().build();StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env, settings);env.setParallelism(2);  // 将并行度设置为2,根据需要调整// 定义 tb_student 表tableEnv.executeSql("CREATE TABLE tb_student (" +"   student_id INT, " +"   student_name STRING, " +"   class_id INT ," +"   PRIMARY KEY (student_id) NOT ENFORCED" +") WITH (" +"   'connector' = 'mysql-cdc', " +  // 使用 datagen 连接器模拟数据,开发测试用"    'hostname' = 'localhost'," +"  'port' = '3307', " +"   'username' = 'root', " +"   'password' = '123456'," +"    'database-name' = 't2', " +"    'scan.incremental.snapshot.enabled' = 'false', " +"   'table-name' = 'tb_student' " +")");// 定义 tb_class 表tableEnv.executeSql("CREATE TABLE tb_class (" +"   class_id INT, " +"   class_name STRING, " +"   PRIMARY KEY (class_id) NOT ENFORCED" +") WITH (" +"  'connector' = 'mysql-cdc', " +  // 使用 datagen 连接器模拟数据,开发测试用"    'hostname' = 'localhost'," +"  'port' = '3307', " +"   'username' = 'root', " +"   'password' = '123456'," +"    'database-name' = 't2', " +"    'scan.incremental.snapshot.enabled' = 'false', " +"   'table-name' = 'tb_class' " +")");// 定义 ads_student 表,使用 jdbc 连接器作为目标表tableEnv.executeSql("CREATE TABLE ads_student (" +"   student_id INT, " +"   student_name STRING, " +"   class_name STRING, " +"   PRIMARY KEY (student_id) NOT ENFORCED" +") WITH (" +"   'connector' = 'jdbc', " +"   'url' = 'jdbc:mysql://localhost:3307/t2', " +"   'table-name' = 'ads_student', " +"   'username' = 'root', " +"   'password' = '123456' " +")");// 执行 JOIN 查询并插入到 ads_student 表tableEnv.executeSql("INSERT INTO ads_student " +"SELECT s.student_id, s.student_name, c.class_name " +"FROM tb_student AS s " +"JOIN tb_class AS c ON s.class_id = c.class_id");// 启动任务env.execute("Flink SQL Join Example");}
}

注意要加上额外的pom依赖

# sink 使用的是jdbc连接的方式
<dependency><groupId>org.apache.flink</groupId><artifactId>flink-connector-jdbc</artifactId><version>3.2.0-1.19</version></dependency><!-- https://mvnrepository.com/artifact/org.apache.flink/flink-connector-base -->
<dependency><groupId>org.apache.flink</groupId><artifactId>flink-connector-base</artifactId><version>1.15.0</version>
</dependency>

版权声明:

本网仅为发布的内容提供存储空间,不对发表、转载的内容提供任何形式的保证。凡本网注明“来源:XXX网络”的作品,均转载自其它媒体,著作权归作者所有,商业转载请联系作者获得授权,非商业转载请注明出处。

我们尊重并感谢每一位作者,均已注明文章来源和作者。如因作品内容、版权或其它问题,请及时与我们联系,联系邮箱:809451989@qq.com,投稿邮箱:809451989@qq.com