Hadoop MapReduce execution explained in detail https://my.oschina.net/itblog/blog/275294
Reference: http://www.cnblogs.com/yjmyzz/p/how-to-remote-debug-hadoop-with-eclipse-and-intellij-idea.html
1. Word count
A: Set the environment variables:
HADOOP_HOME=/home/pandy/hadoop-2.7.3
HADOOP_BIN_PATH=$HADOOP_HOME/bin
HADOOP_PREFIX=/home/pandy/hadoop-2.7.3
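On Linux these are typically exported from the shell profile; a minimal sketch, assuming bash and the install path above:

# ~/.bashrc (assumed location; adjust for your shell)
export HADOOP_HOME=/home/pandy/hadoop-2.7.3
export HADOOP_PREFIX=$HADOOP_HOME
export HADOOP_BIN_PATH=$HADOOP_HOME/bin
export PATH=$PATH:$HADOOP_BIN_PATH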
B: Program source:
package com.first;

import java.io.IOException;
import java.util.StringTokenizer;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;

public class WordCount {

    public static class TokenizerMapper extends Mapper<Object, Text, Text, IntWritable> {
        private final static IntWritable one = new IntWritable(1);
        private Text word = new Text();

        public void map(Object key, Text value, Context context) throws IOException, InterruptedException {
            // Emit (word, 1) for every whitespace-separated token in the line.
            StringTokenizer itr = new StringTokenizer(value.toString());
            while (itr.hasMoreTokens()) {
                word.set(itr.nextToken());
                context.write(word, one);
            }
        }
    }

    public static class IntSumReducer extends Reducer<Text, IntWritable, Text, IntWritable> {
        private IntWritable result = new IntWritable();

        public void reduce(Text key, Iterable<IntWritable> values, Context context)
                throws IOException, InterruptedException {
            // Sum the counts collected for this word.
            int sum = 0;
            for (IntWritable val : values) {
                sum += val.get();
            }
            result.set(sum);
            context.write(key, result);
        }
    }

    /**
     * Arguments: hdfs://192.168.0.31:8020/tmp/input/words_01.txt hdfs://192.168.0.31:8020/tmp/output/
     *
     * @param args
     * @throws Exception
     */
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();
        if (otherArgs.length < 2) {
            System.err.println("Usage: wordcount <in> [<in>...] <out>");
            System.exit(2);
        }
        Job job = Job.getInstance(conf, "word count");
        job.setJarByClass(WordCount.class);
        job.setMapperClass(TokenizerMapper.class);
        job.setCombinerClass(IntSumReducer.class);
        job.setReducerClass(IntSumReducer.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);
        for (int i = 0; i < otherArgs.length - 1; ++i) {
            FileInputFormat.addInputPath(job, new Path(otherArgs[i]));
        }
        FileOutputFormat.setOutputPath(job, new Path(otherArgs[otherArgs.length - 1]));
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}
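A note on the combiner: reusing IntSumReducer as the combiner is safe here because summation is associative and commutative, so partial sums computed on the map side cannot change the final counts. The averaging job in section 2 is the counterexample: there the reducer must not double as a combiner.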
C: Run configuration: in Eclipse, right-click the class and open Run Configurations -> Arguments, then enter the two lines below. Note that both point at the HDFS server; the parameters are fs paths.
hdfs://192.168.0.31:8020/tmp/input/words_01.txt
hdfs://192.168.0.31:8020/tmp/output/
D: Run the job.
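After the run, the result can be inspected with the same HDFS commands the later sections use (the output path is the second argument above; part-r-00000 is the usual name of a single reducer's output file):

hadoop fs -ls -R /tmp/output
hadoop fs -cat /tmp/output/part-r-00000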
2. Averaging scores http://snaile.blog.51cto.com/8061810/1564051
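The mapper below reads whitespace-separated name/score pairs, so each subject file (AvgSorceLocalTest-china.txt and so on) is assumed to hold lines like this hypothetical sample:

zhangsan 78
lisi 89
wangwu 96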
package com.pandy.hadoop.maprender;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;

import java.io.IOException;
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.StringTokenizer;

/**
 * Project: workspace
 * Author: Pandy (panyongzheng@163.com, 1453261799@qq.com)
 * Created: 2016-12-08, 12:51 PM
 */
public class AvgSorceLocalTest {

    public static class MyMap extends Mapper<Object, Text, Text, IntWritable> {
        /**
         * Emit one (name, score) pair per token pair in the line.
         */
        public void map(Object key, Text value, Context context) throws IOException, InterruptedException {
            String line = value.toString();
            StringTokenizer tokenizer = new StringTokenizer(line);
            while (tokenizer.hasMoreElements()) {
                String strName = tokenizer.nextToken();
                String strScore = tokenizer.nextToken();
                System.out.println(line);
                context.write(new Text(strName), new IntWritable(Integer.parseInt(strScore)));
            }
        }
    }

    public static class MyReduce extends Reducer<Text, IntWritable, Text, IntWritable> {
        public void reduce(Text key, Iterable<IntWritable> values, Context context)
                throws IOException, InterruptedException {
            int sum = 0;
            int num = 0;
            for (IntWritable score : values) {
                sum += score.get();
                num++;
            }
            System.out.println(key.toString() + " total: " + sum);
            // Integer division: the average is truncated.
            context.write(key, new IntWritable(sum / num));
        }
    }

    /*
    1. Copy the data files to the server:
       scp /home/pandy/workspace/HadoopApp/AvgSorceLocalTest-china.txt root@192.168.0.31:/tmp/AvgSorceLocalTest-china.txt
       scp /home/pandy/workspace/HadoopApp/AvgSorceLocalTest-english.txt root@192.168.0.31:/tmp/AvgSorceLocalTest-english.txt
       scp /home/pandy/workspace/HadoopApp/AvgSorceLocalTest-math.txt root@192.168.0.31:/tmp/AvgSorceLocalTest-math.txt
    2. From the server, put them into HDFS:
       hadoop fs -rm -f /tmp/input/AvgSorceLocalTest-china.txt
       hadoop fs -rm -f /tmp/input/AvgSorceLocalTest-english.txt
       hadoop fs -rm -f /tmp/input/AvgSorceLocalTest-math.txt
       hadoop fs -put /tmp/AvgSorceLocalTest-china.txt /tmp/input/AvgSorceLocalTest-china.txt
       hadoop fs -put /tmp/AvgSorceLocalTest-english.txt /tmp/input/AvgSorceLocalTest-english.txt
       hadoop fs -put /tmp/AvgSorceLocalTest-math.txt /tmp/input/AvgSorceLocalTest-math.txt
    3. List the input files:
       hadoop fs -ls -R /tmp/input
    4. After the job runs, check the result:
       hadoop fs -ls -R /tmp/output
       hadoop fs -cat /tmp/output/AvgSorceLocalTest_20161208131412/part-r-00000   <-- the successful output listed by the command above
    */
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        SimpleDateFormat format = new SimpleDateFormat("yyyyMMddHHmmss");
        Job job = Job.getInstance(conf, "avg score");
        job.setJarByClass(AvgSorceLocalTest.class);
        job.setMapperClass(MyMap.class);
        // No combiner: an average of averages is not the overall average, so MyReduce must not double as a combiner.
        job.setReducerClass(MyReduce.class);
        job.setInputFormatClass(TextInputFormat.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);
        job.setOutputFormatClass(TextOutputFormat.class);
        // Remote files, hard-coded:
        FileInputFormat.addInputPath(job, new Path("hdfs://192.168.0.31:9000/tmp/input/AvgSorceLocalTest-china.txt"));
        FileInputFormat.addInputPath(job, new Path("hdfs://192.168.0.31:9000/tmp/input/AvgSorceLocalTest-english.txt"));
        FileInputFormat.addInputPath(job, new Path("hdfs://192.168.0.31:9000/tmp/input/AvgSorceLocalTest-math.txt"));
        FileOutputFormat.setOutputPath(job, new Path("hdfs://192.168.0.31:9000/tmp/output/AvgSorceLocalTest_" + format.format(new Date())));
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}
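The reduce writes sum / num with integer division, so fractional averages are truncated. If exact averages are wanted, one option (a sketch, not part of the original post) is to emit a DoubleWritable the way the Top N example in section 4 does; the reducer's fourth type parameter and job.setOutputValueClass would have to change to DoubleWritable as well:

// In MyReduce, re-declared as Reducer<Text, IntWritable, Text, DoubleWritable>:
context.write(key, new DoubleWritable((double) sum / num));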
3. Finding each year's highest and lowest temperature https://my.oschina.net/itblog/blog/275294
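Judging from the substring(0, 4) and substring(8) calls and the sample prints in the code (Before Mapper: 0, 2000010115 / After Mapper: 2000, 15), each line of TemperatureTestData.txt is a yyyyMMdd date immediately followed by a temperature, e.g. these hypothetical lines:

2000010115
2000010223
2000011299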
package com.pandy.hadoop.maprender;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;

import java.io.IOException;
import java.text.SimpleDateFormat;
import java.util.Date;

/**
 * Project: workspace
 * Author: Pandy (panyongzheng@163.com, 1453261799@qq.com)
 * Created: 2016-12-08, 12:51 PM
 */
public class YearMaxTemperatureLocalTest {

    public static class MyMap extends Mapper<Object, Text, Text, IntWritable> {
        /**
         * Parse each line into (year, temperature).
         */
        public void map(Object key, Text value, Context context) throws IOException, InterruptedException {
            // Sample print: Before Mapper: 0, 2000010115
            System.out.print("Before Mapper: " + key + ", " + value);
            String line = value.toString();
            String year = line.substring(0, 4);
            int temperature = Integer.parseInt(line.substring(8)); // split out the year and the temperature
            context.write(new Text(year), new IntWritable(temperature));
            // Sample print: After Mapper:2000, 15
            System.out.println("======" + "After Mapper:" + new Text(year) + ", " + new IntWritable(temperature));
        }
    }

    public static class MyReduce extends Reducer<Text, IntWritable, Text, IntWritable> {
        public void reduce(Text key, Iterable<IntWritable> values, Context context)
                throws IOException, InterruptedException {
            int maxValue = Integer.MIN_VALUE;
            int minValue = Integer.MAX_VALUE;
            StringBuffer sb = new StringBuffer();
            // Track the maximum (and minimum) of the values.
            for (IntWritable value : values) {
                maxValue = Math.max(maxValue, value.get());
                minValue = Math.min(minValue, value.get());
                sb.append(value).append(", ");
            }
            // Sample print: Before Reduce: 2000, 15, 23, 99, 12, 22,
            System.out.print("Before Reduce: " + key + ", " + sb.toString());
            // Writing a second record per key is disabled: because MyReduce also runs as the
            // combiner, emitting rewritten keys from the combine pass would corrupt the result.
            //Text minKey = new Text();
            //minKey.set(key + " MIN");
            //context.write(minKey, new IntWritable(minValue));
            //Text maxKey = new Text();
            //maxKey.set(key + " MAX");
            context.write(key, new IntWritable(maxValue));
            // Sample print: After Reduce: 2000, 99
            System.out.println("======" + "After Reduce: " + key + ", " + maxValue);
        }
    }

    /*
    1. Copy the data file to the server:
       scp /home/pandy/workspace/HadoopApp/TemperatureTestData.txt root@192.168.0.31:/tmp/TemperatureTestData.txt
    2. From the server, put it into HDFS:
       hadoop fs -rm -f /tmp/input/TemperatureTestData.txt
       hadoop fs -put /tmp/TemperatureTestData.txt /tmp/input/TemperatureTestData.txt
    3. List the input files:
       hadoop fs -ls -R /tmp/input
    4. After the job runs, check the result:
       hadoop fs -ls -R /tmp/output
       hadoop fs -cat /tmp/output/YearMaxTemperatureLocalTest20161208151223/part-r-00000   <-- the successful output listed by the command above
    */
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        SimpleDateFormat format = new SimpleDateFormat("yyyyMMddHHmmss");
        Job job = Job.getInstance(conf, "max temperature");
        job.setJarByClass(YearMaxTemperatureLocalTest.class);
        job.setMapperClass(MyMap.class);
        job.setCombinerClass(MyReduce.class); // safe: max is associative and commutative
        job.setReducerClass(MyReduce.class);
        job.setInputFormatClass(TextInputFormat.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);
        job.setOutputFormatClass(TextOutputFormat.class);
        // Remote file, hard-coded:
        FileInputFormat.addInputPath(job, new Path("hdfs://192.168.0.31:9000/tmp/input/TemperatureTestData.txt"));
        FileOutputFormat.setOutputPath(job, new Path("hdfs://192.168.0.31:9000/tmp/output/YearMaxTemperatureLocalTest" + format.format(new Date())));
        Date d1 = new Date();
        // Exit only after printing the elapsed time; calling System.exit() first would skip the print.
        int status = job.waitForCompletion(true) ? 0 : 1;
        Date d2 = new Date();
        System.out.println("Elapsed: " + ((d2.getTime() - d1.getTime()) * 1.0 / 1000) + " s");
        System.exit(status);
    }
}
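To also report each year's minimum, the idea in the commented-out code can be made safe by dropping the combiner and writing one record per statistic under a distinct key; a sketch of the changed reduce lines, assuming job.setCombinerClass(...) is removed:

// In MyReduce.reduce(): emit both statistics with their own keys.
context.write(new Text(key + " MIN"), new IntWritable(minValue));
context.write(new Text(key + " MAX"), new IntWritable(maxValue));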
4. The Top N problem http://blog.csdn.net/xiaojimanman/article/details/41117357 (multiple outputs; writing to custom files or directories)
package com.pandy.hadoop.maprender;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.*;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.LazyOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.MultipleOutputs;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;

import java.io.IOException;
import java.text.SimpleDateFormat;
import java.util.Date;

/**
 * Project: workspace
 * Features:
 *   1. multiple named outputs
 *   2. grade-banded (pass/fail) output
 *   3. top-N output
 * Author: Pandy (panyongzheng@163.com, 1453261799@qq.com)
 * Created: 2016-12-12, 9:55 AM
 */
public class AvgTopLocalTest {

    /**
     * Map function: each input line is "student-name score", e.g. "zs 90".
     * @Author:lulei
     */
    public static class Map extends Mapper<LongWritable, Text, Text, IntWritable> {
        private MultipleOutputs mos;

        @Override
        protected void setup(Context context) throws IOException, InterruptedException {
            super.setup(context);
            mos = new MultipleOutputs(context);
        }

        @Override
        protected void cleanup(Context context) throws IOException, InterruptedException {
            super.cleanup(context);
            mos.close();
        }

        @Override
        protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
            Line line = new Line(value.toString());
            if (line.isRight()) {
                // Pass the pair on to the reducer.
                context.write(line.getKey(), line.getValue());
                // Band by grade and write each band to its own output file.
                if (line.getValue().get() < 40) {
                    mos.write(line.getKey(), line.getValue(), "F");
                } else if (line.getValue().get() < 60) {
                    mos.write(line.getKey(), line.getValue(), "E");
                } else if (line.getValue().get() < 70) {
                    mos.write(line.getKey(), line.getValue(), "D");
                } else if (line.getValue().get() < 80) {
                    mos.write(line.getKey(), line.getValue(), "C");
                } else if (line.getValue().get() < 90) {
                    mos.write(line.getKey(), line.getValue(), "B");
                } else {
                    mos.write(line.getKey(), line.getValue(), "A");
                }
            }
        }
    }

    /**
     * Reduce function: computes the average and the top N.
     * @Author:lulei
     */
    public static class Reduce extends Reducer<Text, IntWritable, Text, DoubleWritable> {
        private double[] topN;
        private int N = 1;
        // Handle for the extra named outputs.
        private MultipleOutputs mos;

        @Override
        protected void setup(Context context) throws IOException, InterruptedException {
            try {
                N = Integer.parseInt(context.getConfiguration().get("N"));
            } catch (Exception e) {
                N = 1;
            }
            topN = new double[N];
            mos = new MultipleOutputs(context);
        }

        /**
         * Insert avg into the descending topN array.
         */
        private void addTopN(double avg) {
            if (avg > topN[N - 1]) {
                int i = 0;
                for (i = 0; i < N && avg < topN[i]; i++)
                    ;
                if (i < N) {
                    for (int j = N - 1; j > i; j--) {
                        topN[j] = topN[j - 1];
                    }
                    topN[i] = avg;
                }
            }
        }

        /**
         * A different directory per value range is possible here; a single TOPN output is enough.
         */
        private String generateFileName(double value) {
            /*if (value >= 60d) {
                return "GT60/";
            } else {
                return "LT60/";
            }*/
            return "TOPN";
        }

        /**
         * Write out the top N values.
         */
        private void print() throws IOException, InterruptedException {
            System.out.println("======================================");
            for (double n : topN) {
                System.out.print(n);
                System.out.print("->");
                mos.write(new Text("N_" + n), new DoubleWritable(n), generateFileName(n));
            }
            System.out.println("======================================");
        }

        @Override
        protected void reduce(Text key, Iterable<IntWritable> values, Context context)
                throws IOException, InterruptedException {
            int count = 0;
            int sum = 0;
            for (IntWritable value : values) {
                count++;
                sum += value.get();
            }
            // Compute the average for this student.
            double avg = (sum * 1.0D) / count;
            // Feed it into the top-N array.
            addTopN(avg);
            context.write(key, new DoubleWritable(avg));
        }

        @Override
        protected void cleanup(Context context) throws IOException, InterruptedException {
            // Flush the top N before closing.
            print();
            super.cleanup(context);
            mos.close();
        }
    }

    public static void main(String[] args) throws Exception {
        SimpleDateFormat format = new SimpleDateFormat("yyyyMMddHHmmss");
        String f1 = "AvgTop.txt";
        String hdfsUrl = "hdfs://192.168.0.31:9000";
        HDFSTool.delete(hdfsUrl, "/tmp/input/" + f1);
        HDFSTool.upload(hdfsUrl, "/home/pandy/workspace/HadoopApp/" + f1, "/tmp/input/" + f1);
        try {
            Configuration conf = new Configuration();
            conf.set("N", "5");
            @SuppressWarnings("deprecation")
            Job job = new Job(conf);
            job.setJobName("avg&topn");
            job.setInputFormatClass(TextInputFormat.class);
            job.setMapOutputKeyClass(Text.class);
            job.setMapOutputValueClass(IntWritable.class);
            // TextOutputFormat wrapped in LazyOutputFormat, so empty default part files are not created.
            //job.setOutputFormatClass(TextOutputFormat.class);
            LazyOutputFormat.setOutputFormatClass(job, TextOutputFormat.class);
            job.setOutputKeyClass(Text.class); // the reducer emits Text keys
            job.setOutputValueClass(DoubleWritable.class);
            // Mapper and Reducer.
            job.setMapperClass(Map.class);
            job.setReducerClass(Reduce.class);
            // Input and output paths.
            FileInputFormat.addInputPath(job, new Path(hdfsUrl + "/tmp/input/" + f1));
            FileOutputFormat.setOutputPath(job, new Path(hdfsUrl + "/tmp/output/AvgTop_" + format.format(new Date())));
            Date d1 = new Date();
            int status = job.waitForCompletion(true) ? 0 : 1;
            Date d2 = new Date();
            System.out.println("Elapsed: " + ((d2.getTime() - d1.getTime()) * 1.0 / 1000) + " s");
            System.exit(status);
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}
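The listing calls two helper classes that are not shown, Line and HDFSTool (the CSDN post linked above defines its own Line). The sketches below are reconstructions from how they are used in the listing; the parsing rules and method bodies are assumptions, and each class would live in its own file in the same package:

package com.pandy.hadoop.maprender;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;

import java.net.URI;

// Sketch of the Line helper assumed by the mapper: parses a "name score" line
// and flags lines that do not parse, so the mapper can skip them.
class Line {
    private final Text key = new Text();
    private final IntWritable value = new IntWritable();
    private boolean right = true;

    Line(String textLine) {
        String[] parts = textLine.trim().split("\\s+");
        if (parts.length != 2) {
            right = false;
            return;
        }
        try {
            key.set(parts[0]);
            value.set(Integer.parseInt(parts[1]));
        } catch (NumberFormatException e) {
            right = false;
        }
    }

    boolean isRight() { return right; }
    Text getKey() { return key; }
    IntWritable getValue() { return value; }
}

// Sketch of the HDFSTool helper assumed by main(): thin wrappers over the FileSystem API.
class HDFSTool {
    static void delete(String hdfsUrl, String path) throws Exception {
        FileSystem fs = FileSystem.get(URI.create(hdfsUrl), new Configuration());
        fs.delete(new Path(path), true); // recursive; a no-op if the path is absent
    }

    static void upload(String hdfsUrl, String localPath, String hdfsPath) throws Exception {
        FileSystem fs = FileSystem.get(URI.create(hdfsUrl), new Configuration());
        fs.copyFromLocalFile(new Path(localPath), new Path(hdfsPath));
    }
}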
5. MapReduce secondary sort https://my.oschina.net/xiaoluobutou/blog/807362
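The link covers the full technique; its core is a composite key, so the shuffle sorts on both fields while a custom Partitioner and grouping comparator (job.setPartitionerClass / job.setGroupingComparatorClass) partition and group only on the first field. A minimal sketch of such a key follows; the class and field names are illustrative, not taken from the linked post:

import org.apache.hadoop.io.WritableComparable;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

// Composite key: partition and group on 'first', sort within a group by 'second'.
public class IntPair implements WritableComparable<IntPair> {
    private int first;
    private int second;

    public void set(int first, int second) {
        this.first = first;
        this.second = second;
    }

    public int getFirst() { return first; }
    public int getSecond() { return second; }

    @Override
    public void write(DataOutput out) throws IOException {
        out.writeInt(first);
        out.writeInt(second);
    }

    @Override
    public void readFields(DataInput in) throws IOException {
        first = in.readInt();
        second = in.readInt();
    }

    @Override
    public int compareTo(IntPair o) {
        // This is the ordering the shuffle applies: by first, then by second.
        if (first != o.first) {
            return Integer.compare(first, o.first);
        }
        return Integer.compare(second, o.second);
    }
}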