鉴于在Web抓取服务和文本挖掘之句子向量中对权重值的计算需要,本文基于MapReduce计算模型实现了PageRank算法。为验证本文算法的有效性,本文采用177万余条源URL到目标URL链接的数据集,并迭代101次来展开测试,测试结果表明:对上述数据集进行测试,总计耗时40.29分钟。但是,本文作者的意图并不是为了实现该算法, 而是将该算法的设计思想引入后续Web抓取服务的优化与改进之中,以及后续文本挖掘中对权重值计算的需要之中。

刘  勇  Email:lyssym@sina.com

  鉴于在Web抓取服务和文本挖掘之句子向量中对权重值的计算需要,本文基于MapReduce计算模型实现了PageRank算法。为验证本文算法的有效性,本文采用177万余条源URL到目标URL链接的数据集,并迭代101次来展开测试,测试结果表明:对上述数据集进行测试,总计耗时40.29分钟。因此,在权重评定的算法设计与实现中引入该思想,具有较好的现实意义。

  在Web抓取服务中,由于采用多个定向爬虫对网页进行抓取,因此其面临2个重要问题,1)爬虫的调度问题,不同的爬虫的抓取频率决定了获取该站点的信息数量;2)爬虫的深度问题,在某个站点内抓取越深,其获取的信息越陈旧,而爬虫设计者的本意是及时回头,即到达一定深度后,返回站点首页或者回退至上一步。因此,基于上述现实问题,本文作者拟将Google的PageRank算法的排名思想引入至该应用中,通过其PageRank值来确定各站点的抓取频率,但是采用这种设计的结果,则是设计一个全网爬虫。

  在文本挖掘研究中,对句子权重的研究中,拟引入PageRank思想来计算句子向量的权重中,主要是基于句子相互间存在语义关联。

  鉴于上述原因,结合Web数据规模日益扩展的需要,采用MapReduce计算模型实现PageRank算法。

  1) 有向图

  互联网中的网页可以视为有向图,每一个网页可以视为一个节点,若网页A中有一个链接到B,则存在一条有向边AàB。如图1所示,为一个简单的网络链接有向图。

 

图1 网络链接有向图

  根据图1对PageRank排名思想解释如下:对于A节点,其存在3条链出(A–>B,A–>C,A–>D),因此B、C、D分别获得A节点PageRank(PR)值的1/3;但是,节点A又存在2条链入(B–> A,C–>A),即节点B的PR的1/2和节点C的PR对A有贡献。其它节点,与之类同,在此不做赘述。

  根据上述分析,很明显PageRank算法需要做多次迭代,以期使各节点的PR值趋于稳定(最终达到收敛)。本文作者设计时,拟采用100为最大迭代次数。

  2) 终止与陷阱

  若将Web抓取服务(网络爬虫)视为马尔科夫链过程,则上述收敛问题,必须满足有向图是强关联的。但是现实网络世界里,强关联有向图不太现实,即有些站点(网址)中的信息中根本没有链接或者链接已失效,即面临节点终止问题,如图2所示。

 

图2网络链接有向图

  在图2中,节点C没有链出,即Web抓取服务在C节点面临终止,其造成PR值在多次迭代之后收敛于0。按照网络爬虫设计的本意,当面临节点终止时,应该跳转至其它的节点,最常见的做法为跳转至站点首页或者返回上一级。

  此外,有些站点(网址)为了方便向用户提供服务,会在网页醒目位置多次嵌入本网页的链接,如门户网站。但是,若某一个站点(网址)只嵌入本网页的链接,则必然造成网络爬虫掉入陷阱中,进入死循环,即面临节点陷阱问题,如图3所示。

 

图3 网络链接有向图

  在图3中,节点C只存在自身的链出,即Web抓取服务在C节点陷入死循环,其造成PR值在多次迭代之后,C节点的PR值为1,其它节点的PR值均为0。按照网络爬虫设计的本意,当面节点陷阱时,应该跳转至其它的节点,最常见的做法为跳转至站点首页或者返回上一级。

       因此,本文作者综合上述2种特殊情形,引入以下策略,每个节点,都有权力选择进入下一个节点,或者回退至上一个节点。具体数学表达式如公式1所示:

          (公式1)

       如公式1所示,α为影响因子, 和分别为当前迭代中某节点的PR值和上一次迭代中某节点的PR。鉴于每个节点均有权利回退至上一个节点,由于表征上一次迭代中所有链入之和,故此引入该参数来表征回退至上一次的可能性。

  3) 算法设计

  本文在设计中,采用两级MapReduce计算模型来实现,第一级MapReduce生成网络链接的有向图;第二级MapReduce用于迭代PR,以确定PR是否收敛。需要指出,本文在设计过程中,Reducer均设定为5个。

  针对第一级MapReduce,对其Mapper和Reducer简要描述如下:

  Mapper:完成源URL和目标URL的标识,如A–>B;

  Reducer: 根据源URL(有向图),实现某一节点至其所有链出的标识,如A–>B–>C–>D,需要指出,上述标识表示,A–>B,A–>C,A–>D。采用该设计主要是为了节省节点存储空间。

  针对第二级MapReduce,对其Mapper和Reducer简要描述如下:

  Mapper: 完成链出节点及其PR获取,如B节点存在B–>A,B–>D,则A和D均分B的PR值;

  Reducer:针对Mapper中每个节点及其PR,对该节点PR值求和,并采用公式1进行量化。

  其中第二级MapReduce采用多次迭代,若迭代过程中,节点的PR已收敛,则退出迭代。

  4)  测试结果

  本文测试环境描述如下,采用10台物理机组成Hadoop集群,CPU:Intel(R) Core(TM) i5-4440 CPU @ 3.10GHz,内存:4G,Hadoop:2.7.1,以上描述为集群的大概配置,其中某个节点的配置可能不一致,本文作者也并未对每个节点进行详细确认。本文测试数据集采用177万余条链接,迭代101次(迭代次数当时控制失误,本意为100次,作者比较懒就没有重新再次迭代了),总计耗时为40.29分钟。从整体而言,其处理速率在1小时内,还是能够接受的。

  本文对基于MapReduce的PageRank算法进行研究与实现,经过实际数据集进行测试,测试结果表明,该测试结果处理速率还是能够接受的。但是,本文作者的意图并不是为了实现该算法,而是将该算法的设计思想引入后续Web抓取服务的优化与改进之中,以及后续文本挖掘中对权重值计算的需要之中。

  程序源代码:

  1. 1 import java.io.IOException;
  2. 2 import org.apache.hadoop.io.NullWritable;
  3. 3 import org.apache.hadoop.io.Text;
  4. 4 import org.apache.hadoop.mapreduce.Mapper;
  5. 5 import org.apache.hadoop.mapreduce.Reducer;
  6. 6 import org.apache.logging.log4j.LogManager;
  7. 7 import org.apache.logging.log4j.Logger;
  8. 8
  9. 9 public class GraphSet {
  10. 10
  11. 11 public static class GraphMapper extends Mapper<Object, Text, Text, Text> {
  12. 12 public static Logger logger = LogManager.getLogger(GraphMapper.class);
  13. 13
  14. 14 public void map(Object key, Text value, Context context)
  15. 15 {
  16. 16 String[] link = value.toString().split("\t");
  17. 17 try {
  18. 18 context.write(new Text(link[0]), new Text(link[1]));
  19. 19 } catch (IOException e) {
  20. 20 e.printStackTrace();
  21. 21 } catch (InterruptedException e) {
  22. 22 e.printStackTrace();
  23. 23 }
  24. 24 }
  25. 25 }
  26. 26
  27. 27
  28. 28 public static class GraphReducer extends Reducer<Text, Text, NullWritable, Text> {
  29. 29 public static Logger logger = LogManager.getLogger(GraphReducer.class);
  30. 30 public void reduce(Text key, Iterable<Text> values, Context context)
  31. 31 {
  32. 32 StringBuilder sb = new StringBuilder();
  33. 33 sb.append(key.toString());
  34. 34 for (Text e : values) {
  35. 35 sb.append("\t");
  36. 36 sb.append(e.toString());
  37. 37 }
  38. 38
  39. 39 try {
  40. 40 context.write(NullWritable.get(), new Text(sb.toString()));
  41. 41 } catch (IOException e1) {
  42. 42 e1.printStackTrace();
  43. 43 } catch (InterruptedException e1) {
  44. 44 e1.printStackTrace();
  45. 45 }
  46. 46 }
  47. 47 }
  48. 48
  49. 49 }

Class GraphSet

  1. 1 import java.io.IOException;
  2. 2 import java.util.Map;
  3. 3 import java.util.HashMap;
  4. 4
  5. 5 import org.apache.hadoop.io.NullWritable;
  6. 6 import org.apache.hadoop.io.Text;
  7. 7 import org.apache.hadoop.io.DoubleWritable;
  8. 8 import org.apache.hadoop.mapreduce.Mapper;
  9. 9 import org.apache.hadoop.mapreduce.Reducer;
  10. 10 import org.apache.logging.log4j.LogManager;
  11. 11 import org.apache.logging.log4j.Logger;
  12. 12
  13. 13 public class PageRank {
  14. 14 public static double alpha = 0.8;
  15. 15
  16. 16 public static class PageRankMapper extends Mapper<Object, Text, Text, DoubleWritable> {
  17. 17 public static Map<String, Double> mapPR = new HashMap<String, Double>();
  18. 18 public static Logger logger = LogManager.getLogger(PageRankMapper.class);
  19. 19
  20. 20 public void map(Object key, Text value, Context context) {
  21. 21 String[] str = value.toString().split("\t");
  22. 22 int size = str.length;
  23. 23 double linkOut = mapPR.get(str[0]);
  24. 24 try {
  25. 25 for (int i = 0; i < size-1; i++) {
  26. 26 DoubleWritable dw = new DoubleWritable();
  27. 27 dw.set(linkOut / (size-1)); // count the output of links
  28. 28 context.write(new Text(str[i+1]), dw);
  29. 29 }
  30. 30 } catch (Exception e) {
  31. 31 e.printStackTrace();
  32. 32 }
  33. 33 }
  34. 34 }
  35. 35
  36. 36
  37. 37 public static class PageRankReducer extends Reducer<Text, DoubleWritable, NullWritable, Text> {
  38. 38 public static Logger logger = LogManager.getLogger(PageRankReducer.class);
  39. 39 private int numberOfUrl = 0;
  40. 40
  41. 41 public void setup(Context context) {
  42. 42 numberOfUrl = context.getConfiguration().getInt("numberOfUrl", Integer.MAX_VALUE);
  43. 43 }
  44. 44
  45. 45
  46. 46 public void reduce(Text key, Iterable<DoubleWritable> values, Context context) {
  47. 47 double factor = 0;
  48. 48 double sum = 0;
  49. 49 for (DoubleWritable d : values)
  50. 50 sum += d.get();
  51. 51
  52. 52 if (PageRankMapper.mapPR.containsKey(key.toString()))
  53. 53 factor = PageRankMapper.mapPR.get((key.toString()));
  54. 54 else
  55. 55 factor = (double)1/(2*numberOfUrl);
  56. 56
  57. 57 sum = alpha*sum + (1-alpha)*factor;
  58. 58 String ret = key.toString() + "\t" + String.valueOf(sum);
  59. 59 try {
  60. 60 context.write(NullWritable.get(), new Text(ret));
  61. 61 } catch (IOException e) {
  62. 62 e.printStackTrace();
  63. 63 } catch (InterruptedException e) {
  64. 64 e.printStackTrace();
  65. 65 }
  66. 66 }
  67. 67 }
  68. 68
  69. 69 }

Class PageRank

  1. 1 import java.io.BufferedReader;
  2. 2 import java.io.IOException;
  3. 3 import java.io.InputStreamReader;
  4. 4 import java.net.URI;
  5. 5 import java.util.Map;
  6. 6 import java.util.HashMap;
  7. 7 import java.util.Set;
  8. 8 import java.util.TreeSet;
  9. 9
  10. 10 import org.apache.hadoop.conf.Configuration;
  11. 11 import org.apache.hadoop.fs.FSDataInputStream;
  12. 12 import org.apache.hadoop.fs.FileSystem;
  13. 13 import org.apache.hadoop.fs.Path;
  14. 14 import org.apache.hadoop.io.DoubleWritable;
  15. 15 import org.apache.hadoop.io.NullWritable;
  16. 16 import org.apache.hadoop.io.Text;
  17. 17 import org.apache.hadoop.mapreduce.Job;
  18. 18 import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
  19. 19 import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
  20. 20 import org.apache.logging.log4j.LogManager;
  21. 21 import org.apache.logging.log4j.Logger;
  22. 22
  23. 23 import com.gta.graph.GraphSet;
  24. 24 import com.gta.graph.GraphSet.GraphMapper;
  25. 25 import com.gta.graph.GraphSet.GraphReducer;
  26. 26 import com.gta.pagerank.PageRank.PageRankMapper;
  27. 27 import com.gta.pagerank.PageRank.PageRankReducer;
  28. 28
  29. 29 public class PR {
  30. 30 public static final int MAX = 100;
  31. 31 public static final int TASK = 5;
  32. 32 public static final double THRESHOLD = 1e-10;
  33. 33 public static final String INPUT_PATH = "hdfs://10.1.130.10:9000/user/hadoop/pagerank/input/";
  34. 34 public static final String OUTPUT_PATH = "hdfs://10.1.130.10:9000/user/hadoop/pagerank/output/";
  35. 35 public static final String TMP_PATH = "hdfs://10.1.130.10:9000/user/hadoop/pagerank/tmp/";
  36. 36 public static Logger logger = LogManager.getLogger(PR.class);
  37. 37 private Configuration conf = null;
  38. 38 private int numberOfUrl = 0;
  39. 39
  40. 40 public PR()
  41. 41 {
  42. 42 conf = new Configuration();
  43. 43 }
  44. 44
  45. 45
  46. 46 public void initGraph() throws IOException, InterruptedException, ClassNotFoundException
  47. 47 {
  48. 48 Job job = Job.getInstance(conf, "Init Graph");
  49. 49 job.setJarByClass(GraphSet.class);
  50. 50 job.setMapperClass(GraphMapper.class);
  51. 51 job.setMapOutputKeyClass(Text.class);
  52. 52 job.setMapOutputValueClass(Text.class);
  53. 53 job.setNumReduceTasks(TASK);
  54. 54 job.setReducerClass(GraphReducer.class);
  55. 55 job.setOutputKeyClass(NullWritable.class);
  56. 56 job.setOutputValueClass(Text.class);
  57. 57 FileInputFormat.addInputPath(job, new Path(INPUT_PATH));
  58. 58 FileOutputFormat.setOutputPath(job, new Path(TMP_PATH));
  59. 59 job.waitForCompletion(true);
  60. 60 initPRMap(TMP_PATH);
  61. 61 conf.setInt("numberOfUrl", numberOfUrl);
  62. 62 }
  63. 63
  64. 64
  65. 65 public void pageRank() throws IOException, InterruptedException, ClassNotFoundException
  66. 66 {
  67. 67 int iter = 0;
  68. 68 while (iter <= MAX) {
  69. 69 Job job = Job.getInstance(conf, "PageRank");
  70. 70 job.setJarByClass(PageRank.class);
  71. 71 job.setMapperClass(PageRankMapper.class);
  72. 72 job.setMapOutputKeyClass(Text.class);
  73. 73 job.setMapOutputValueClass(DoubleWritable.class);
  74. 74 job.setNumReduceTasks(TASK);
  75. 75 job.setReducerClass(PageRankReducer.class);
  76. 76 job.setOutputKeyClass(NullWritable.class);
  77. 77 job.setOutputValueClass(Text.class);
  78. 78 FileInputFormat.addInputPath(job, new Path(TMP_PATH));
  79. 79 FileOutputFormat.setOutputPath(job, new Path(OUTPUT_PATH + iter));
  80. 80 job.waitForCompletion(true);
  81. 81 Map<String, Double> newPR = getNewPageRank(OUTPUT_PATH + iter);
  82. 82 if (compare(PageRankMapper.mapPR, newPR))
  83. 83 break;
  84. 84 else {
  85. 85 for (String key : newPR.keySet())
  86. 86 PageRankMapper.mapPR.put(key, newPR.get(key));
  87. 87 }
  88. 88
  89. 89 iter++;
  90. 90 }
  91. 91 }
  92. 92
  93. 93
  94. 94 public void initPRMap(String filePath)
  95. 95 {
  96. 96 String fileName = filePath + "/part-r-0000";
  97. 97 Set<String> set = new TreeSet<String>();
  98. 98 try {
  99. 99 for (int i = 0; i < TASK; i++) {
  100. 100 FileSystem fs = FileSystem.get(URI.create((fileName+i)), conf);
  101. 101 FSDataInputStream is = fs.open(new Path((fileName+i).toString()));
  102. 102 BufferedReader br = new BufferedReader(new InputStreamReader(is, "UTF-8"));
  103. 103 String s = null;
  104. 104 while ((s = br.readLine()) != null) {
  105. 105 String[] str = s.split("\t");
  106. 106 if (str.length == 0)
  107. 107 break;
  108. 108 set.add(str[0]);
  109. 109 }
  110. 110 br.close();
  111. 111 }
  112. 112
  113. 113 numberOfUrl = set.size();
  114. 114 for (String s : set)
  115. 115 PageRankMapper.mapPR.put(s, (double)1/numberOfUrl);
  116. 116
  117. 117 } catch (IOException e) {
  118. 118 e.printStackTrace();
  119. 119 }
  120. 120 }
  121. 121
  122. 122
  123. 123 public Map<String, Double> getNewPageRank(String filePath)
  124. 124 {
  125. 125 Map<String, Double> newPR = new HashMap<String, Double>();
  126. 126 String fileName = filePath + "/part-r-0000";
  127. 127 try {
  128. 128 for (int i = 0; i < TASK; i++) {
  129. 129 FileSystem fs = FileSystem.get(URI.create((fileName+i)), conf);
  130. 130 FSDataInputStream is = fs.open(new Path((fileName+i).toString()));
  131. 131 BufferedReader br = new BufferedReader(new InputStreamReader(is, "UTF-8"));
  132. 132 String s = null;
  133. 133 while ((s = br.readLine()) != null) {
  134. 134 String[] elements = s.split("\t");
  135. 135 newPR.put(elements[0], Double.parseDouble(elements[1]));
  136. 136 }
  137. 137 br.close();
  138. 138 }
  139. 139 } catch (IOException e) {
  140. 140 e.printStackTrace();
  141. 141 }
  142. 142 return newPR;
  143. 143 }
  144. 144
  145. 145
  146. 146 public boolean compare(Map<String, Double> oldPR, Map<String, Double> newPR) {
  147. 147 boolean ret = false;
  148. 148 int newPRSize = newPR.size();
  149. 149 int oldPRSize = oldPR.size();
  150. 150 if (oldPRSize == newPRSize) {
  151. 151 int count = 0;
  152. 152 for (String key : oldPR.keySet()) {
  153. 153 if (newPR.containsKey(key)) {
  154. 154 if (Math.abs(newPR.get(key) - oldPR.get(key)) <= THRESHOLD)
  155. 155 count++;
  156. 156 }
  157. 157 }
  158. 158
  159. 159 if (count == newPRSize)
  160. 160 ret = true;
  161. 161 }
  162. 162 return ret;
  163. 163 }
  164. 164
  165. 165
  166. 166 public static void main(String[] args) {
  167. 167 PR pr = new PR();
  168. 168 try {
  169. 169 long start = System.currentTimeMillis();
  170. 170 pr.initGraph();
  171. 171 pr.pageRank();
  172. 172 long end = System.currentTimeMillis();
  173. 173 PR.logger.info("共耗时: " + (end-start));
  174. 174 } catch (ClassNotFoundException e) {
  175. 175 e.printStackTrace();
  176. 176 } catch (IOException e) {
  177. 177 e.printStackTrace();
  178. 178 } catch (InterruptedException e) {
  179. 179 e.printStackTrace();
  180. 180 }
  181. 181 }
  182. 182
  183. 183 }

Class PR

   本文插图参考:http://www.cnblogs.com/fengfenggirl/p/pagerank-introduction.html

 

 


  作者:志青云集
  出处:http://www.cnblogs.com/lyssym
  如果,您认为阅读这篇博客让您有些收获,不妨点击一下右下角的【推荐】。
  如果,您希望更容易地发现我的新博客,不妨点击一下左下角的【关注我】。
  如果,您对我的博客所讲述的内容有兴趣,请继续关注我的后续博客,我是【志青云集】。
  本文版权归作者和博客园共有,欢迎转载,但未经作者同意必须保留此段声明,且在文章页面明显位置给出原文连接。


 

版权声明:本文为lyssym原创文章,遵循 CC 4.0 BY-SA 版权协议,转载请附上原文出处链接和本声明。
本文链接:https://www.cnblogs.com/lyssym/p/4969197.html