二次排序,从字面上可以理解为在对key排序的基础上对key所对应的值value排序,也叫辅助排序。一般情况下,MapReduce框架只对key排序,而不对key所对应的值排序,因此value的排序经常是不固定的。但是我们经常会遇到同时对key和value排序的需求,例如Hadoop权威指南中的求一年的高高气温,key为年份,value为最高气温,年份按照降序排列,气温按照降序排列。还有水果电商网站经常会有按天统计水果销售排行榜的需求等等,这些都是需要对key和value同时进行排序。如下图所示:

如何设计一个MapReduce程序解决对key和value同时排序的需求呢?这就需要用到组合键、分区、分组的概念。在这里又看到分区的影子,可知分区在MapReduce是多么的重要,一定要好好掌握,是优化的重点。

按照上图中数据流转的方向,我们首先设计一个Fruit类,有三个字段,分别是日期、水果名和销量,将日期、水果名和销量作为一个复合键;接着设计一个自定义Partition类,根据Fruit的日期字段分区,让相同日期的数据流向同一个partition分区中;最后定义一个分组类,实现同一个分区内的数据分组,然后按照销量字段进行二次排序。

具体实现思路:
1、定义Fruit类,实现WritableComparable接口,并且重写compareTo、equal和hashcode方法以及序列化和反序列化方法readFields和write方法。Java类要在网络上传输必须序列化和反序列化。在Map端的map函数中将Fruit对象当做key。compareTo方法用于比较两个key的大小,在本文中就是比较两个Fruit对象的排列顺序。

2、自定义第一次排序类,继承WritableComparable或者WritableComparator接口,重写compareTo或者compare方法,。就是在Map端对Fruit对象的第一个字段进行排序

3、自定义Partition类,实现Partitioner接口,并且重写getPartition方法,将日期相同的Fruit对象分发到同一个partition中。

4、定义分组类,继承WritableComparator接口,并且重写compare方法。用于比较同一分组内两个Fruit对象的排列顺序,根据销量字段比较。日期相同的Fruit对象会划分到同一个分组。通过setGroupingComparatorClass方法设置分组类。如果不设置分组类,则按照key默认的compare方法来对key进行排序。

代码如下:

  1. 1 import org.apache.hadoop.conf.Configured;
  2. 2 import org.apache.hadoop.io.WritableComparable;
  3. 3 import java.io.DataInput;
  4. 4 import java.io.DataOutput;
  5. 5 import java.io.IOException;
  6. 6 import org.apache.hadoop.io.*;
  7. 7 import org.apache.hadoop.mapreduce.Partitioner;
  8. 8 import org.apache.hadoop.mapreduce.Mapper;
  9. 9 import org.apache.hadoop.mapreduce.Reducer;
  10. 10 import org.apache.hadoop.conf.Configuration;
  11. 11 import org.apache.hadoop.fs.FileSystem;
  12. 12 import org.apache.hadoop.fs.Path;
  13. 13 import org.apache.hadoop.mapreduce.Job;
  14. 14 import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
  15. 15 import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
  16. 16 import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
  17. 17 import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
  18. 18 import org.apache.hadoop.util.Tool;
  19. 19 import org.apache.hadoop.util.ToolRunner;
  20. 20 import org.slf4j.Logger;
  21. 21 import org.slf4j.LoggerFactory;
  22. 22
  23. 23 public class SecondrySort extends Configured implements Tool {
  24. 24
  25. 25 static class Fruit implements WritableComparable<Fruit>{
  26. 26 private static final Logger logger = LoggerFactory.getLogger(Fruit.class);
  27. 27 private String date;
  28. 28 private String name;
  29. 29 private Integer sales;
  30. 30 public Fruit(){
  31. 31 }
  32. 32 public Fruit(String date,String name,Integer sales){
  33. 33 this.date = date;
  34. 34 this.name = name;
  35. 35 this.sales = sales;
  36. 36 }
  37. 37
  38. 38 public String getDate(){
  39. 39 return this.date;
  40. 40 }
  41. 41
  42. 42 public String getName(){
  43. 43 return this.name;
  44. 44 }
  45. 45
  46. 46 public Integer getSales(){
  47. 47 return this.sales;
  48. 48 }
  49. 49
  50. 50 @Override
  51. 51 public void readFields(DataInput in) throws IOException{
  52. 52 this.date = in.readUTF();
  53. 53 this.name = in.readUTF();
  54. 54 this.sales = in.readInt();
  55. 55 }
  56. 56
  57. 57 @Override
  58. 58 public void write(DataOutput out) throws IOException{
  59. 59 out.writeUTF(this.date);
  60. 60 out.writeUTF(this.name);
  61. 61 out.writeInt(sales);
  62. 62 }
  63. 63
  64. 64 @Override
  65. 65 public int compareTo(Fruit other) {
  66. 66 int result1 = this.date.compareTo(other.getDate());
  67. 67 if(result1 == 0) {
  68. 68 int result2 = this.sales - other.getSales();
  69. 69 if (result2 == 0) {
  70. 70 double result3 = this.name.compareTo(other.getName());
  71. 71 if(result3 > 0) return -1;
  72. 72 else if(result3 < 0) return 1;
  73. 73 else return 0;
  74. 74 }else if(result2 >0){
  75. 75 return -1;
  76. 76 }else if(result2 < 0){
  77. 77 return 1;
  78. 78 }
  79. 79 }else if(result1 > 0){
  80. 80 return -1;
  81. 81 }else{
  82. 82 return 1;
  83. 83 }
  84. 84 return 0;
  85. 85 }
  86. 86
  87. 87 @Override
  88. 88 public int hashCode(){
  89. 89 return this.date.hashCode() * 157 + this.sales + this.name.hashCode();
  90. 90 }
  91. 91
  92. 92 @Override
  93. 93 public boolean equals(Object object){
  94. 94 if (object == null)
  95. 95 return false;
  96. 96 if (this == object)
  97. 97 return true;
  98. 98 if (object instanceof Fruit){
  99. 99 Fruit r = (Fruit) object;
  100. 100 // if(r.getDate().toString().equals(this.getDate().toString())){
  101. 101 return r.getDate().equals(this.getDate()) && r.getName().equals(this.getName())
  102. 102 && this.getSales() == r.getSales();
  103. 103 }else{
  104. 104 return false;
  105. 105 }
  106. 106 }
  107. 107
  108. 108 public String toString() {
  109. 109 return this.date + " " + this.name + " " + this.sales;
  110. 110 }
  111. 111
  112. 112 }
  113. 113
  114. 114 static class FruitPartition extends Partitioner<Fruit, NullWritable>{
  115. 115 @Override
  116. 116 public int getPartition(Fruit key, NullWritable value,int numPartitions){
  117. 117 return Math.abs(Integer.parseInt(key.getDate()) * 127) % numPartitions;
  118. 118 }
  119. 119 }
  120. 120
  121. 121 public static class GroupingComparator extends WritableComparator{
  122. 122 protected GroupingComparator(){
  123. 123 super(Fruit.class, true);
  124. 124 }
  125. 125
  126. 126 @Override
  127. 127 public int compare(WritableComparable w1, WritableComparable w2){
  128. 128 Fruit f1 = (Fruit) w1;
  129. 129 Fruit f2 = (Fruit) w2;
  130. 130
  131. 131 if(!f1.getDate().equals(f2.getDate())){
  132. 132 return f1.getDate().compareTo(f2.getDate());
  133. 133 }else{
  134. 134 return f1.getSales().compareTo(f2.getSales());
  135. 135 }
  136. 136 }
  137. 137 }
  138. 138
  139. 139 public static class Map extends Mapper<LongWritable, Text, Fruit, NullWritable> {
  140. 140
  141. 141 public void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
  142. 142 String line = value.toString();
  143. 143 String str[] = line.split(" ");
  144. 144 Fruit fruit = new Fruit(str[0],str[1],new Integer(str[2]));
  145. 145 //Fruit fruit = new Fruit();
  146. 146 //fruit.set(str[0],str[1],new Integer(str[2]));
  147. 147 context.write(fruit, NullWritable.get());
  148. 148 }
  149. 149 }
  150. 150
  151. 151 public static class Reduce extends Reducer<Fruit, NullWritable, Text, NullWritable> {
  152. 152
  153. 153 public void reduce(Fruit key, Iterable<NullWritable> values, Context context) throws IOException, InterruptedException {
  154. 154 String str = key.getDate() + " " + key.getName() + " " + key.getSales();
  155. 155 context.write(new Text(str), NullWritable.get());
  156. 156 }
  157. 157 }
  158. 158
  159. 159 @Override
  160. 160 public int run(String[] args) throws Exception {
  161. 161 Configuration conf = new Configuration();
  162. 162 // 判断路径是否存在,如果存在,则删除
  163. 163 Path mypath = new Path(args[1]);
  164. 164 FileSystem hdfs = mypath.getFileSystem(conf);
  165. 165 if (hdfs.isDirectory(mypath)) {
  166. 166 hdfs.delete(mypath, true);
  167. 167 }
  168. 168
  169. 169 Job job = Job.getInstance(conf, "Secondry Sort app");
  170. 170 // 设置主类
  171. 171 job.setJarByClass(SecondrySort.class);
  172. 172
  173. 173 // 输入路径
  174. 174 FileInputFormat.setInputPaths(job, new Path(args[0]));
  175. 175 // 输出路径
  176. 176 FileOutputFormat.setOutputPath(job, new Path(args[1]));
  177. 177
  178. 178 // Mapper
  179. 179 job.setMapperClass(Map.class);
  180. 180 // Reducer
  181. 181 job.setReducerClass(Reduce.class);
  182. 182
  183. 183 // 分区函数
  184. 184 job.setPartitionerClass(FruitPartition.class);
  185. 185
  186. 186 // 分组函数
  187. 187 job.setGroupingComparatorClass(GroupingComparator.class);
  188. 188
  189. 189 // map输出key类型
  190. 190 job.setMapOutputKeyClass(Fruit.class);
  191. 191 // map输出value类型
  192. 192 job.setMapOutputValueClass(NullWritable.class);
  193. 193
  194. 194 // reduce输出key类型
  195. 195 job.setOutputKeyClass(Text.class);
  196. 196 // reduce输出value类型
  197. 197 job.setOutputValueClass(NullWritable.class);
  198. 198
  199. 199 // 输入格式
  200. 200 job.setInputFormatClass(TextInputFormat.class);
  201. 201 // 输出格式
  202. 202 job.setOutputFormatClass(TextOutputFormat.class);
  203. 203
  204. 204 return job.waitForCompletion(true) ? 0 : 1;
  205. 205 }
  206. 206
  207. 207 public static void main(String[] args) throws Exception{
  208. 208 int exitCode = ToolRunner.run(new SecondrySort(), args);
  209. 209 System.exit(exitCode);
  210. 210 }
  211. 211 }

测试数据:

20180906 Apple 200
20180904 Apple 200
20180905 Banana 100
20180906 Orange 300
20180906 Banana 400
20180904 Orange 100
20180905 Apple 400
20180904 Banana 300
20180905 Orange 500

运行结果:

20180906 Banana 400
20180906 Orange 300
20180906 Apple 200
20180905 Orange 500
20180905 Apple 400
20180905 Banana 100
20180904 Banana 300
20180904 Apple 200
20180904 Orange 100

 

总结:

1、在使用自定义比较器时,必须有一个无参的构造函数。
2、readFields和write方法中处理字段的顺序必须一致,否则会报MapReduce Error: java.io.EOFException at java.io.DataInputStream.readFully(DataInputStream.java:197)的错误。

 

了解更多大数据的知识请关注我的微信公众号:summer_bigdata

欢迎可以扫码关注本人的公众号:

 

版权声明:本文为airnew原创文章,遵循 CC 4.0 BY-SA 版权协议,转载请附上原文出处链接和本声明。
本文链接:https://www.cnblogs.com/airnew/p/9631718.html