您的位置:首页 > 教育 > 培训 > 吉林省吉林市天气预报_从零开始学习网络营销_广州seo托管_青岛seo网站推广

吉林省吉林市天气预报_从零开始学习网络营销_广州seo托管_青岛seo网站推广

2025/5/4 8:43:14 来源:https://blog.csdn.net/weixin_64726356/article/details/144131155  浏览:    关键词:吉林省吉林市天气预报_从零开始学习网络营销_广州seo托管_青岛seo网站推广
吉林省吉林市天气预报_从零开始学习网络营销_广州seo托管_青岛seo网站推广

目录

一、维表介绍

二、预加载维表

(一)实现方式

(二)优缺点

(三)改进尝试与局限

三、使用本地缓存(HashMap)加载维表

(一)实现方式

第一版

第二版

(二)优缺点

四、热存储维表

(一)常规方式问题剖析

(二)使用 cache 减轻访问压力(以 Guava Cache 为例)

(三)Guava Cache 优势与场景适配

五、总结


        在 Flink 大数据处理框架的实际应用场景里,常常会碰到这样的需求:进入 Flink 的实时数据,需要关联存储在外部设备(像 MySQL、HBase 等)中的数据(也就是维表),以此来得出完整准确的计算结果。本文将深入探讨 Flink 中维表 Join 的多种实现方式及其优缺点,助力大家在不同业务场景下做出合理抉择。

一、维表介绍

所谓的维表Join: 进入Flink的数据,需要关联另外一些存储设备的数据,才能计算出来结果,那么存储在外部设备上的表称之为维表,可能存储在mysql也可能存储在hbase 等。维表一般的特点是变化比较慢。

需求:kafka输入的数据格式: 姓名,城市编号 例如 zhangsan,1001。

期望输出的数据: 姓名,城市编号,城市名称 例如 zhangsan,1001,北京

在MySQL创建城市表:

create table city(city_id varchar(50) primary key,city_name varchar(50) 
);
insert into city values('1001','北京'),('1002','上海'),('1003','郑州') ;

二、预加载维表

(一)实现方式

        借助定义一个类实现 RichMapFunction,在其 open()方法里读取维表数据加载至内存,后续在 kafka 流 map()方法中和维表数据关联。以代码示例来说

import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.util.HashMap;
import java.util.Map;public class _04PreLoadDataDemo {public static void main(String[] args) throws Exception {//1. env-准备环境StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC);//2. source-加载数据DataStreamSource<String> dataStreamSource = env.socketTextStream("localhost", 9999);//3. transformation-数据处理转换dataStreamSource.map(new RichMapFunction<String, Tuple3<String,Integer,String>>() {Map<Integer,String> cityMap = new HashMap<Integer,String>();Connection connection;PreparedStatement statement;@Overridepublic void open(Configuration parameters) throws Exception {// 将mysql的数据加载到map中connection = DriverManager.getConnection("jdbc:mysql://localhost:3306/test1","root","123456");statement = connection.prepareStatement("select * from city");ResultSet resultSet = statement.executeQuery();while(resultSet.next()){int cityId =  resultSet.getInt("city_id");String cityName =  resultSet.getString("city_name");cityMap.put(cityId,cityName);}}@Overridepublic void close() throws Exception {statement.close();connection.close();}// zhangsan,1001@Overridepublic Tuple3<String, Integer, String> map(String s) throws Exception {String[] arr = s.split(",");System.out.println("+++++++++++++++" +cityMap);String cityName = cityMap.get(Integer.valueOf(arr[1]));return Tuple3.of(arr[0],Integer.valueOf(arr[1]),cityName);}}).print();//4. sink-数据输出//5. execute-执行env.execute();}
}

测试

在黑窗口输入:
张三,1001
李四,1001
王五,1002

_04PreLoadDataDemo类中:

  1. 环境准备与数据源加载:先通过StreamExecutionEnvironment.getExecutionEnvironment()获取执行环境,设置运行模式为AUTOMATIC,再利用env.socketTextStream("localhost", 9999)从本地端口读取数据作为数据源(这里只是示例,实际可替换为 kafka 等数据源)。
  2. 维表数据加载到内存:在open方法里,用DriverManager.getConnection连接本地 MySQL 数据库,执行查询语句select * from city获取维表数据,将城市编号与城市名称以cityMap.put(cityId,cityName)形式存入HashMap
  3. 数据关联与输出map方法里拆分输入数据(格式如“zhangsan,1001”),依据城市编号从cityMap获取城市名称,组合成期望输出格式(“zhangsan,1001,北京”)并打印输出。

(二)优缺点

优点

        实现逻辑较为简易,上手快,对于简单场景和开发初期能快速搭建功能。

缺点

        数据存于内存,仅适合数据量小且维表更新频率低场景。虽说可在open中设定时器定时更新维表,可仍难避免更新不及时状况,毕竟维表只是变化慢并非一成不变。

(三)改进尝试与局限

那如果数据多了怎么办,数据更新了怎么办

import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.util.HashMap;
import java.util.Map;public class _05SelectDBDemo {public static void main(String[] args) throws Exception {//1. env-准备环境StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC);//2. source-加载数据DataStreamSource<String> dataStreamSource = env.socketTextStream("localhost", 9999);//3. transformation-数据处理转换dataStreamSource.map(new RichMapFunction<String, Tuple3<String,Integer,String>>() {Connection connection;PreparedStatement statement;@Overridepublic void open(Configuration parameters) throws Exception {// 将mysql的数据加载到map中connection = DriverManager.getConnection("jdbc:mysql://localhost:3306/test1","root","123456");statement = connection.prepareStatement("select city_name from city where city_id = ? ");}@Overridepublic void close() throws Exception {statement.close();connection.close();}// zhangsan,1001@Overridepublic Tuple3<String, Integer, String> map(String s) throws Exception {String[] arr = s.split(",");statement.setInt(1,Integer.valueOf(arr[1]));ResultSet resultSet = statement.executeQuery();String cityName = null;if(resultSet.next()){cityName = resultSet.getString("city_name");}return Tuple3.of(arr[0],Integer.valueOf(arr[1]),cityName);}}).print();//4. sink-数据输出//5. execute-执行env.execute();}
}

 

         _05SelectDBDemo类做出改进,每次map操作都查询数据库获取最新维表数据,优点是能及时获取更新后数据,可一旦数据量庞大(像 kafka 高频大量输入数据),频繁查询数据库会致使效率低下,性能损耗严重。

三、使用本地缓存(HashMap)加载维表

(一)实现方式

 以上两个版本使用的是socket进行演示的,以下是使用kafka演示的,不太一样。

第一版

import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.util.Properties;public class Demo03 {/*** 需求:kafka输入的数据格式: 姓名,城市编号 例如 zhangsan,1001。*  期望输出的数据: 姓名,城市编号,城市名称 例如 zhangsan,1001,北京**  每一次都从数据库中查询一下:*  优点是:假如数据库中的数据有更新,每次都可以查询到最新的数据*  缺点是:每次都查询数据库,假如kafka中的数据特别多,就会查询数据库多次,效率低*/public static void main(String[] args) throws Exception {//1. env-准备环境StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC);Properties properties = new Properties();properties.setProperty("bootstrap.servers","bigdata01:9092");properties.setProperty("group.id","g4");//2. source-加载数据FlinkKafkaConsumer<String> kafkaConsumer = new FlinkKafkaConsumer<String>("kafka-01",new SimpleStringSchema(),properties);//3. transformation-数据处理转换DataStreamSource<String> streamSource = env.addSource(kafkaConsumer);streamSource.map(new RichMapFunction<String, String>() {Connection connection= null;PreparedStatement statement =null;@Overridepublic void open(Configuration parameters) throws Exception {connection = DriverManager.getConnection("jdbc:mysql://localhost:3306/kettle", "root", "root");statement = connection.prepareStatement("select * from city where city_id = ? ");}@Overridepublic void close() throws Exception {if(statement != null){statement.close();}if(connection != null) {connection.close();}}@Overridepublic String map(String value) throws Exception {// zhangsan,1001String[] arr = value.split(",");String name = arr[0];String cityCode = arr[1];statement.setString(1,cityCode);ResultSet resultSet = statement.executeQuery();String cityName = "";if(resultSet.next()){cityName = resultSet.getString("city_name");}return name+","+cityCode+","+cityName;}}).print();//4. sink-数据输出//5. execute-执行env.execute();}
}

第二版

import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.util.HashMap;
import java.util.Map;
import java.util.Properties;public class Demo04 {/*** 需求:kafka输入的数据格式: 姓名,城市编号 例如 zhangsan,1001。*  期望输出的数据: 姓名,城市编号,城市名称 例如 zhangsan,1001,北京**  使用hashmap*  将数据库中的数据只查询一次,放入map集合,map号称本地缓存*  优点:查询数据库只查询一次,每次数据都可以直接从map中获取,效率高*  缺点:假如数据库中的数据更新了,map缓存的数据是没有办法更新的,而且假如数据库中的数据特别多,全部加载到map中会导致堆内存爆炸 OOM*/public static void main(String[] args) throws Exception {//1. env-准备环境StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC);Properties properties = new Properties();properties.setProperty("bootstrap.servers","bigdata01:9092");properties.setProperty("group.id","g4");//2. source-加载数据FlinkKafkaConsumer<String> kafkaConsumer = new FlinkKafkaConsumer<String>("kafka-01",new SimpleStringSchema(),properties);//3. transformation-数据处理转换DataStreamSource<String> streamSource = env.addSource(kafkaConsumer);streamSource.map(new RichMapFunction<String, String>() {Connection connection= null;PreparedStatement statement =null;Map<String,String> hashMap = new HashMap<String,String>();@Overridepublic void open(Configuration parameters) throws Exception {connection = DriverManager.getConnection("jdbc:mysql://localhost:3306/kettle", "root", "root");statement = connection.prepareStatement("select * from city");ResultSet resultSet = statement.executeQuery();while(resultSet.next()){String cityCode = resultSet.getString("city_id");String cityName = resultSet.getString("city_name");hashMap.put(cityCode,cityName);}}@Overridepublic void close() throws Exception {if(statement != null){statement.close();}if(connection != null) {connection.close();}}@Overridepublic String map(String value) throws Exception {// zhangsan,1001String[] arr = value.split(",");String name = arr[0];String cityCode = arr[1];String cityName = hashMap.get(cityCode);return name+","+cityCode+","+cityName;}}).print();//4. sink-数据输出//5. execute-执行env.execute();}
}

(二)优缺点

优点

        仅查询数据库一次将数据存入HashMap,后续数据处理直接从本地缓存获取,效率颇高,减少数据库交互开销。

缺点

        数据库数据更新时,HashMap缓存无法同步更新,易造成数据不一致;且若维表数据海量,全加载进HashMap易引发堆内存溢出(OOM)问题。

四、热存储维表

(一)常规方式问题剖析

        以前的方式是将维表数据存储在Redis、HBase、MySQL等外部存储中,实时流在关联维表数据的时候实时去外部存储中查询,这种方式特点如下:

  • 优点:维度数据量不受内存限制,可以存储很大的数据量。
  • 缺点:因为维表数据在外部存储中,读取速度受制于外部存储的读取速度;另外维表的同步也有延迟。

(二)使用 cache 减轻访问压力(以 Guava Cache 为例)

示例代码

package com.bigdata.day06;import avro.shaded.com.google.common.cache.*;
import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.util.HashMap;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.TimeUnit;public class Demo05 {/*** 需求:kafka输入的数据格式: 姓名,城市编号 例如 zhangsan,1001。*  期望输出的数据: 姓名,城市编号,城市名称 例如 zhangsan,1001,北京**  第三个版本:使用guawaCache [google的技术]*/public static void main(String[] args) throws Exception {//1. env-准备环境StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC);env.setParallelism(1);Properties properties = new Properties();properties.setProperty("bootstrap.servers","bigdata01:9092");properties.setProperty("group.id","g4");//2. source-加载数据FlinkKafkaConsumer<String> kafkaConsumer = new FlinkKafkaConsumer<String>("kafka-01",new SimpleStringSchema(),properties);//3. transformation-数据处理转换DataStreamSource<String> streamSource = env.addSource(kafkaConsumer);System.out.println(streamSource.getParallelism());streamSource.map(new RichMapFunction<String, String>() {Connection connection= null;PreparedStatement statement =null;// 定义一个CacheLoadingCache<String, String> cache;@Overridepublic void open(Configuration parameters) throws Exception {connection = DriverManager.getConnection("jdbc:mysql://localhost:3306/kettle", "root", "root");statement = connection.prepareStatement("select * from city where city_id = ?");cache = CacheBuilder.newBuilder()//最多缓存个数,超过了就根据最近最少使用算法来移除缓存 LRU.maximumSize(1000)//在更新后的指定时间后就回收// 不会自动调用,而是当过期后,又用到了过期的key值数据才会触发的。.expireAfterWrite(100, TimeUnit.SECONDS)//指定移除通知/*.removalListener(new RemovalListener<Integer, String>() {@Overridepublic void onRemoval(RemovalNotification<Integer, String> removalNotification) {// 在这个需求中,我们没有要删除的数据,所以这个代码可以删除掉,没有意义,但是为了学习,保留了。System.out.println(removalNotification.getKey() + "被移除了,值为:" + removalNotification.getValue());}})*/.build(//指定加载缓存的逻辑new CacheLoader<String, String>() {// 假如缓存中没有数据,会触发该方法的执行,并将结果自动保存到缓存中@Overridepublic String load(String cityId) throws Exception {System.out.println("进入数据库查询啦。。。。。。。");statement.setString(1,cityId);ResultSet resultSet = statement.executeQuery();String cityName = null;if(resultSet.next()){System.out.println("进入到了if中.....");cityName = resultSet.getString("city_name");}return cityName;}});}@Overridepublic void close() throws Exception {if(statement != null){statement.close();}if(connection != null) {connection.close();}}@Overridepublic String map(String value) throws Exception {// zhangsan,1001String[] arr = value.split(",");String name = arr[0];String cityCode = arr[1];String cityName = cache.get(cityCode);return name+","+cityCode+","+cityName;}}).print();//4. sink-数据输出//5. execute-执行env.execute();}
}

_06GuavaCacheDemo类中:

        环境准备阶段,设置并行度为 1(方便查看缓存效果,因 Guava Cache 是各分区独立缓存不共享),加载本地端口数据作为源。

        open方法构建 Guava Cache,设置最大缓存个数(maximumSize(1000))、过期时间(expireAfterWrite(10, TimeUnit.SECONDS))及移除监听器(可跟踪缓存移除情况),并指定缓存加载逻辑,即缓存无数据时查询数据库获取维表数据并保存进缓存。

        map方法按输入数据城市编号从缓存获取城市名称组装输出,若缓存无对应数据则触发数据库查询填充缓存再获取。Demo05类是基于 kafka 数据源的类似实现,只是适配 kafka 消费配置,原理一致。

(三)Guava Cache 优势与场景适配

        Guava Cache 优势在于平衡内存使用与数据更新及时性,对于频繁访问且数据有一定时效性、更新不太频繁的维表数据场景,既能减少数据库查询次数提升性能,又能借助过期机制保障数据在一定时间后更新,避免数据太过陈旧。

五、总结

        Flink 维表 Join 多种实现各有利弊,预加载维表简单但受限内存与更新及时性;本地缓存高效却有更新与内存容量隐患;热存储维表灵活支持大数据量但有读写速度与同步延迟问题,引入 Guava Cache 等缓存机制可优化。实际应用要依据数据量、更新频率、实时性要求等业务特性权衡选择,精心设计维表 Join 方案,让 Flink 大数据处理流程更稳健高效。

版权声明:

本网仅为发布的内容提供存储空间,不对发表、转载的内容提供任何形式的保证。凡本网注明“来源:XXX网络”的作品,均转载自其它媒体,著作权归作者所有,商业转载请联系作者获得授权,非商业转载请注明出处。

我们尊重并感谢每一位作者,均已注明文章来源和作者。如因作品内容、版权或其它问题,请及时与我们联系,联系邮箱:809451989@qq.com,投稿邮箱:809451989@qq.com