大家在学习Hadoop的MapReduce的时候,90%的人写的第一个程序都是WordCount,所以在这里分享一下我的第二个MapReduce程序。对于学习编程的人来说,有时候代码是最好的沟通方式之一。

 1 package com.zhongxin.mr;
 2 
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;

import java.io.IOException;
import java.math.BigDecimal;
import java.math.RoundingMode;
import java.util.regex.Pattern;
18 
19 /**
20  * 用户已收本息
21  * Created by DingYS on 2017/11/21.
22  */
23 public class UserReceiveAmount {
24 
25     public static class Map extends Mapper<LongWritable,Text,Text,Text>{
26         private Text outKey = new Text();
27         private Text outValue = new Text();
28         private Pattern pattern = Pattern.compile(",");
29 
30 
31         @Override
32         public void map(LongWritable key,Text value,Context context) throws IOException,InterruptedException{
33             // 利息
34             BigDecimal interest = new BigDecimal(0);
35             // 本金
36             BigDecimal capital = new BigDecimal(0);
37             String splits[] = pattern.split(String.valueOf(value));
38             String onwerType = splits[2];
39             String fundsDirection = splits[6];
40             String tradeType = splits[5];
41             String penaltyAmount = splits[15];
42             String tradeAmount = splits[7];
43             String tradeShare = splits[8];
44             String ownerCustomNo = splits[1];
45             if("USER".equals(onwerType) && "INCR".equals(fundsDirection) && !Pattern.matches("CURRENT_.*?",tradeType)){
46                 if("INTEREST".equals(tradeType) && ("null".equals(penaltyAmount) || "".equals(penaltyAmount) ||"0.00".equals(penaltyAmount))){
47                     interest =new BigDecimal(Double.parseDouble(tradeAmount)).setScale(2,BigDecimal.ROUND_HALF_UP);
48                 }else{
49                         interest = new BigDecimal(Double.parseDouble(tradeAmount)).subtract(new BigDecimal(Double.parseDouble(tradeShare))).setScale(2,BigDecimal.ROUND_HALF_UP);
50                         capital = new BigDecimal(Double.parseDouble(tradeShare)).setScale(2,BigDecimal.ROUND_HALF_UP);
51                 }
52                 outKey.set(ownerCustomNo);
53                 outValue.set(String.valueOf(interest) + pattern + String.valueOf(capital));
54                 context.write(outKey,outValue);
55             }
56         }
57     }
58 
59     public static class Reduce extends Reducer<Text,Text,Text,Text>{
60 
61         public void reduce(Text key,Iterable<Text> values,Context context) throws IOException,InterruptedException{
62             Text outValue = new Text();
63             BigDecimal interest = new BigDecimal(0);
64             BigDecimal capital = new BigDecimal(0);
65             for(Text value:values){
66                 String[] splits = value.toString().split(",");
67                 interest = interest.add(new BigDecimal(Double.parseDouble(splits[0]))).setScale(2,BigDecimal.ROUND_HALF_UP);
68                 capital = capital.add(new BigDecimal(Double.parseDouble(splits[1]))).setScale(2,BigDecimal.ROUND_HALF_UP);
69             }
70             outValue.set(String.valueOf(interest) + "\t" + String.valueOf(capital));
71             context.write(key,outValue);
72         }
73     }
74 
75     public static void main(String[] args) throws Exception{
76         Configuration config = new Configuration();
77         Job job = Job.getInstance(config);
78         job.setJobName("userReceiveAmount");
79         job.setJarByClass(UserReceiveAmount.class);
80 
81         job.setOutputKeyClass(Text.class);
82         job.setOutputValueClass(Text.class);
83 
84         job.setMapperClass(Map.class);
85         job.setReducerClass(Reduce.class);
86 
87         job.setInputFormatClass(TextInputFormat.class);
88         job.setOutputFormatClass(TextOutputFormat.class);
89 
90         FileInputFormat.addInputPath(job,new Path(args[0]));
91         FileOutputFormat.setOutputPath(job,new Path(args[1]));
92 
93         job.waitForCompletion(true);
94 
95     }
96 }

看懂这个MapReduce程序有一个非常关键的点:map每次读取一行数据,相同key的数据会进入同一个reduce中进行汇总。

版权声明:本文为Smilence1024原创文章,遵循 CC 4.0 BY-SA 版权协议,转载请附上原文出处链接和本声明。
本文链接:http://www.cnblogs.com/Smilence1024/p/7884679.html