
A Hands-On MapReduce Basic Programming Project

2025/7/1 · Source: https://blog.csdn.net/weixin_66158110/article/details/139510818

I. Project Overview

The accounts.txt file stores one record per line: the first column is the amount, the second marks income or expense (0 for income, 1 for expense), and the third is the month. Using MapReduce we compute each month's balance (income minus expenses) and partition the results by quarter: months 1-3 go to partition 1, months 4-6 to partition 2, months 7-9 to partition 3, and months 10-12 to partition 4. For example, the record 123.45,1,1 is a January expense of 123.45, so it reduces January's balance, and January's result lands in partition 1. The quarter rule is illustrated by the standalone sketch right after this paragraph.
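Before diving into the project code, here is a minimal self-contained sketch of the quarter rule (QuarterDemo is our own illustration, not part of the project; the actual partitioner in section II uses an equivalent if/else chain):

// Hypothetical illustration, not part of the original project:
// the quarter each month belongs to, written as integer arithmetic
// equivalent to the partitioner's if/else chain.
public class QuarterDemo {
    public static void main(String[] args) {
        for (int month = 1; month <= 12; month++) {
            int quarter = (month - 1) / 3 + 1; // 1..4
            System.out.println("month " + month + " -> quarter " + quarter);
        }
    }
}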

The contents of accounts.txt:

123.45,1,1  
56.78,0,2  
89.12,1,3  
45.67,0,4  
34.56,1,5  
78.90,0,6  
67.89,1,7  
23.45,0,8  
98.76,1,9  
12.34,0,10  
56.78,1,11  
43.21,0,12  
87.65,1,1  
34.56,0,2  
76.54,1,3  
65.43,0,4  
54.32,1,5  
43.21,0,6  
32.10,1,7  
21.98,0,8  
10.98,1,9  
98.76,0,10  
76.54,1,11  
65.43,0,12
68.23,1,7  
34.56,0,10  
98.76,1,5  
23.45,0,1  
56.78,1,9  
78.90,0,12  
45.67,1,6  
89.12,0,4  
12.34,1,3  
34.56,0,11  
27.89,1,8  
65.43,0,2  
76.54,1,1  
98.76,0,7  
43.21,1,10  
56.78,0,5  
34.56,1,12  
23.45,0,6  
89.12,1,4  
67.89,0,3  
15.67,1,9  
45.32,0,1  
78.90,1,11  
23.45,0,8  
56.78,1,2  
98.76,0,10  
34.56,1,7  
67.89,0,5  
45.67,1,12  
89.12,0,1  
32.10,1,6  
76.54,0,9  
43.21,1,4  
56.78,0,8  
23.45,1,3  
98.76,0,11  
67.89,1,2  
34.56,0,7  
12.34,1,10  
56.78,0,1  
78.90,1,5  
45.67,0,12  
89.12,1,8  
23.45,0,4  
67.89,1,11  
34.56,0,10  
12.34,1,9  
56.78,0,6  
98.76,1,7  
34.56,0,3  

II. The Code

1. AccountBean

package org.example.maperduce.model;

import org.apache.hadoop.io.Writable;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

public class AccountBean implements Writable {
    // Amount
    private Float spend;
    // Record type: 0 = income, 1 = expense
    private Integer type;
    // Month the amount was recorded in
    private Integer month;

    @Override
    public String toString() {
        return spend + " " + month;
    }

    // Writable requires a public no-arg constructor for deserialization
    public AccountBean() {
    }

    public AccountBean(Float spend, Integer type, Integer month) {
        this.spend = spend;
        this.type = type;
        this.month = month;
    }

    // Serialization: write the fields in a fixed order
    @Override
    public void write(DataOutput out) throws IOException {
        out.writeFloat(spend);
        out.writeInt(type);
        out.writeInt(month);
    }

    // Deserialization: read the fields in the same order they were written
    @Override
    public void readFields(DataInput in) throws IOException {
        this.spend = in.readFloat();
        this.type = in.readInt();
        this.month = in.readInt();
    }

    public Integer getMonth() { return month; }
    public void setMonth(Integer month) { this.month = month; }
    public Float getSpend() { return spend; }
    public void setSpend(Float spend) { this.spend = spend; }
    public Integer getType() { return type; }
    public void setType(Integer type) { this.type = type; }
}
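Because write and readFields must stay symmetric, a quick way to sanity-check the bean is a serialization round trip with plain Java streams. A minimal sketch (the WritableRoundTrip class is our own scaffold, not part of the project):

package org.example.maperduce.model;

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;

// Hypothetical scaffold, not part of the original project: round-trips an
// AccountBean through its Writable methods to verify the field order.
public class WritableRoundTrip {
    public static void main(String[] args) throws IOException {
        AccountBean in = new AccountBean(123.45f, 1, 1);

        // Serialize into an in-memory buffer
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        in.write(new DataOutputStream(bytes));

        // Deserialize into a fresh bean
        AccountBean out = new AccountBean();
        out.readFields(new DataInputStream(new ByteArrayInputStream(bytes.toByteArray())));

        // Expect: 123.45 1 / 1
        System.out.println(out + " / " + out.getType());
    }
}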
2. AccountMapper

package org.example.maperduce.account;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import org.example.maperduce.model.AccountBean;

import java.io.IOException;

public class AccountMapper extends Mapper<LongWritable, Text, IntWritable, AccountBean> {
    // Reused AccountBean instance for the output value; safe because
    // context.write serializes the value immediately
    private AccountBean valueOut = new AccountBean();
    // Reused IntWritable instance for the output key
    private IntWritable keyOut = new IntWritable();

    @Override
    protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
        // Read one line of input
        String line = value.toString();
        // Split on the comma separator
        String[] accountData = line.split(",");
        // Pull the fields out by position
        String spend = accountData[0];
        String type = accountData[1];
        String month = accountData[2];
        // Populate the output value (trim to drop stray whitespace)
        valueOut.setSpend(Float.parseFloat(spend.trim()));
        valueOut.setType(Integer.parseInt(type.trim()));
        valueOut.setMonth(Integer.parseInt(month.trim()));
        // The month is the output key
        keyOut.set(Integer.parseInt(month.trim()));
        // Emit (month, AccountBean)
        context.write(keyOut, valueOut);
    }
}
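To see what the mapper emits for one record without spinning up a job, the same parse-and-populate steps can be run standalone (MapperParseCheck is our own scaffold, not part of the project):

package org.example.maperduce.account;

import org.apache.hadoop.io.IntWritable;
import org.example.maperduce.model.AccountBean;

// Hypothetical scaffold, not part of the original project: replicates the
// mapper's parsing on one sample line and prints the (key, value) pair.
public class MapperParseCheck {
    public static void main(String[] args) {
        String line = "123.45,1,1";
        String[] accountData = line.split(",");
        AccountBean valueOut = new AccountBean(
                Float.parseFloat(accountData[0].trim()),
                Integer.parseInt(accountData[1].trim()),
                Integer.parseInt(accountData[2].trim()));
        IntWritable keyOut = new IntWritable(valueOut.getMonth());
        // Expect: key=1, value=123.45 1
        System.out.println("key=" + keyOut + ", value=" + valueOut);
    }
}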
3. AccountReducer

package org.example.maperduce.account;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.mapreduce.Reducer;
import org.example.maperduce.model.AccountBean;

import java.io.IOException;

public class AccountReducer extends Reducer<IntWritable, AccountBean, IntWritable, AccountBean> {
    // Reused AccountBean instance for the output value
    private AccountBean valueOut = new AccountBean();

    @Override
    protected void reduce(IntWritable key, Iterable<AccountBean> values, Context context) throws IOException, InterruptedException {
        // Running balance for one month
        Float totalSumSpend = 0f;
        Integer month = 0;
        // Add income (type 0) and subtract expenses (type 1)
        for (AccountBean accountBean : values) {
            Float spend = accountBean.getSpend();
            Integer type = accountBean.getType();
            month = accountBean.getMonth();
            if (type == 0) {
                totalSumSpend += spend;
            } else {
                totalSumSpend -= spend;
            }
        }
        // Populate and emit the month's balance
        valueOut.setSpend(totalSumSpend);
        valueOut.setMonth(month);
        context.write(key, valueOut);
    }
}

Note that Hadoop reuses a single AccountBean instance while iterating over values, so the loop copies the fields it needs into local variables rather than holding references to the beans themselves.
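The reduce step is plain accumulation and can be checked on made-up numbers. With one income of 100.00, one expense of 30.00, and another income of 50.00, the balance is 100.00 - 30.00 + 50.00 = 120.00. A sketch (ReduceLogicCheck and its data are our own, not from the project):

// Hypothetical scaffold, not part of the original project: mirrors
// AccountReducer's accumulation on hardcoded sample values.
public class ReduceLogicCheck {
    public static void main(String[] args) {
        float[] spends = {100.00f, 30.00f, 50.00f};
        int[] types = {0, 1, 0}; // 0 = income, 1 = expense
        float balance = 0f;
        for (int i = 0; i < spends.length; i++) {
            balance += (types[i] == 0) ? spends[i] : -spends[i];
        }
        System.out.println("balance = " + balance); // 120.0
    }
}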
4. SpendPartitioner

package org.example.maperduce.account;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.mapreduce.Partitioner;
import org.example.maperduce.model.AccountBean;

public class SpendPartitioner extends Partitioner<IntWritable, AccountBean> {
    @Override
    public int getPartition(IntWritable key, AccountBean accountBean, int numPartitions) {
        // Read the month from the record
        int month = accountBean.getMonth();
        // Map months to partitions: 1-3 -> 0, 4-6 -> 1, 7-9 -> 2, 10-12 -> 3
        int partitionNum;
        if (month < 4) {
            partitionNum = 0;
        } else if (month < 7) {
            partitionNum = 1;
        } else if (month < 10) {
            partitionNum = 2;
        } else {
            partitionNum = 3;
        }
        return partitionNum;
    }
}
5. AccountDriver

package org.example.maperduce.account;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.example.maperduce.model.AccountBean;

import java.io.IOException;

public class AccountDriver {
    public static void main(String[] args) throws IOException, InterruptedException, ClassNotFoundException {
        // 1. Create the configuration and job objects
        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf);
        // 2. Associate the driver class
        job.setJarByClass(AccountDriver.class);
        // 3. Set the Mapper and Reducer classes
        job.setMapperClass(AccountMapper.class);
        job.setReducerClass(AccountReducer.class);
        // 4. Set the Mapper's output key/value types
        job.setMapOutputKeyClass(IntWritable.class);
        job.setMapOutputValueClass(AccountBean.class);
        // 5. Set the final (Reducer) output key/value types
        job.setOutputKeyClass(IntWritable.class);
        job.setOutputValueClass(AccountBean.class);
        // 6. Set the input path and the output path for the results
        FileInputFormat.setInputPaths(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));
        // 7. Use the custom partitioner, with one reduce task per partition
        job.setPartitionerClass(SpendPartitioner.class);
        job.setNumReduceTasks(4);
        // 8. Submit the job and wait for completion
        boolean result = job.waitForCompletion(true);
        System.out.println(result ? "Job succeeded" : "Job failed");
    }
}

Note that setNumReduceTasks(4) must be at least the number of distinct partitions SpendPartitioner can return; with fewer reduce tasks a record mapped to an out-of-range partition fails the job at runtime.
6. pom.xml

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>org.example</groupId>
    <artifactId>hdfs_api</artifactId>
    <version>1.0-SNAPSHOT</version>

    <properties>
        <maven.compiler.source>8</maven.compiler.source>
        <maven.compiler.target>8</maven.compiler.target>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
    </properties>

    <dependencies>
        <dependency>
            <groupId>junit</groupId>
            <artifactId>junit</artifactId>
            <version>4.11</version>
        </dependency>
        <!-- https://mvnrepository.com/artifact/org.apache.hadoop/hadoop-common -->
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-common</artifactId>
            <version>3.1.3</version>
        </dependency>
        <!-- https://mvnrepository.com/artifact/org.apache.hadoop/hadoop-hdfs -->
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-hdfs</artifactId>
            <version>3.1.3</version>
        </dependency>
        <!-- https://mvnrepository.com/artifact/org.apache.hadoop/hadoop-client -->
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-client</artifactId>
            <version>3.1.3</version>
        </dependency>
    </dependencies>

    <build>
        <plugins>
            <plugin>
                <artifactId>maven-compiler-plugin</artifactId>
                <version>3.6.1</version>
                <configuration>
                    <source>1.8</source>
                    <target>1.8</target>
                </configuration>
            </plugin>
            <plugin>
                <artifactId>maven-assembly-plugin</artifactId>
                <configuration>
                    <descriptorRefs>
                        <descriptorRef>jar-with-dependencies</descriptorRef>
                    </descriptorRefs>
                </configuration>
                <executions>
                    <execution>
                        <id>make-assembly</id>
                        <phase>package</phase>
                        <goals>
                            <goal>single</goal>
                        </goals>
                    </execution>
                </executions>
            </plugin>
        </plugins>
    </build>
</project>

III. Run Configuration

(1) Choose Edit Configurations.

(2) Click the "+" button and choose Application.

(3) Fill in the fields (shown as a screenshot in the original post): the main class is org.example.maperduce.account.AccountDriver, and the two program arguments are the input file path and the output directory, matching args[0] and args[1] in the driver. Click Apply, then OK.

IV. Results

Note: before each run, make sure the output directory does not already exist at the target path; Hadoop refuses to start a job whose output directory is present. If you prefer to clear it programmatically, see the sketch below.
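A minimal sketch using the standard FileSystem API (the auto-delete is our own suggestion, not part of the original driver, and it silently discards previous results; the lines belong in AccountDriver.main before the output path is set):

// Optional convenience, not in the original driver: delete a pre-existing
// output directory so repeated runs don't fail. Requires
// import org.apache.hadoop.fs.FileSystem;
Path output = new Path(args[1]);
FileSystem fs = FileSystem.get(conf);
if (fs.exists(output)) {
    fs.delete(output, true); // true = delete recursively
}
FileOutputFormat.setOutputPath(job, output);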

1. Running in IDEA

(1) Console output (shown as a screenshot in the original post)

(2) Contents of the output folder (shown as a screenshot in the original post)

An /output directory appears at the configured path. With four reduce tasks it contains four result files, part-r-00000 through part-r-00003, one per quarter, plus a _SUCCESS marker.

Open any of the part files to view the monthly balances.

2. Running on a cluster

For packaging the project in IDEA and running it on a cluster, see the companion post: 打包idea代码至集群上运行-CSDN博客 (packaging IDEA code to run on a cluster). The launch command is typically of the form hadoop jar hdfs_api-1.0-SNAPSHOT-jar-with-dependencies.jar org.example.maperduce.account.AccountDriver <input path> <output path>, with the exact paths depending on your HDFS layout.
