Crawling JD.com Building-Materials Data with WebMagic + RabbitMQ + ES

The data targeted by this crawler is JD.com's building-materials catalogue. While crawling JD we found that the site takes no anti-crawling measures against these pages, so the crawl went fairly smoothly.

Why WebMagic:

  • As a lightweight Java crawler framework, WebMagic greatly reduces the time needed to develop a crawler

Why a message queue (this project uses RabbitMQ, but any other MQ would do):

  • It decouples the modules, so the spiders run independently of one another
  • It makes the project robust: whether the project is stopped deliberately or by accident (a power failure, for example), it can resume simply by re-reading the data left in the MQ
  • It splits up the business logic, so each module is simpler and its code easier to write

Why Elasticsearch:

  • Convenient searching later on
  • A business requirement

 

Overall architecture of the project:

 

There are several spiders here. The spiders in the earlier levels each handle a different module of data and push the processed results into MQ, where the next-level spider picks them up.

The final pages crawled are the product detail pages, so the last-level spider stores the detail data into ES once it has finished crawling them.

spider1 handles the JD building-materials home page:

 

spider2 handles the JD pagination bar:

 

spider3 handles the JD product list pages:

spider4 handles the product detail pages:

 

From the framework diagram above we can see that every spider needs a connection to MQ: the first-level spider does not consume from MQ, the last-level spider does not produce into MQ, and every spider in between both consumes from and produces to MQ.

We therefore bind a consumer and a producer to each spider. A schematic of the framework is shown below:

 

WebMagic is a well-designed crawler framework with good extensibility, so we only need to extend the original framework slightly.

On top of the original WebMagic Spider we add an asynchronous consumer (the Consumer class simply wraps RabbitMQ's consume operations; it is fairly simple, so its code is not attached here — a rough sketch is given after this list). Its responsibilities:

  • Read the pending messages from MQ and add the pages to be crawled to the spider's request list
  • Keep track of every message that has been read; once the spider has finished processing a message, return an ack to MQ to signal that it has been consumed successfully
  • Read the number of messages remaining in the queue, which serves as one of the conditions for shutting the spider down
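
Since the Consumer code is omitted from the post, here is only a rough sketch of what it might look like, assuming it is a thin wrapper over the RabbitMQ Java client. The method names match the calls made by JcSpider below; the constructor and everything else are guesses.

package com.chinaredstar.jc.crawler.consumer;

import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.QueueingConsumer;

import java.io.IOException;

public class Consumer {

    private final Channel channel;
    private QueueingConsumer queueingConsumer;

    public Consumer(Channel channel) {
        this.channel = channel;
    }

    /** Number of messages still waiting in the queue (used as a stop condition). */
    public int getQueueMsgNum(String queueName) throws IOException {
        AMQP.Queue.DeclareOk ok = channel.queueDeclarePassive(queueName);
        return ok.getMessageCount();
    }

    /** Block until the next message arrives; auto-ack is off, the spider acks later. */
    public QueueingConsumer.Delivery getDeliveryMessage(String queueName)
            throws IOException, InterruptedException {
        if (queueingConsumer == null) {
            queueingConsumer = new QueueingConsumer(channel);
            channel.basicConsume(queueName, false, queueingConsumer);
        }
        return queueingConsumer.nextDelivery();
    }

    /** Acknowledge a message after the spider has finished with the page it carried. */
    public void ackMessage(QueueingConsumer.Delivery delivery) throws IOException {
        channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
    }
}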

We also give each spider a reference to a parent-level spider. Its purpose:

  • Together with the remaining-message count read by the consumer, it decides when to shut the spider down: if the parent spider does not exist or has already stopped, the current spider has finished consuming, and the queue holds no remaining messages, then the current spider can be closed.

The code is attached below:
package com.chinaredstar.jc.core.spider;

import com.chinaredstar.jc.core.page.CrawlerPage;
import com.chinaredstar.jc.crawler.consumer.Consumer;
import com.chinaredstar.jc.infras.utils.JSONUtil;
import com.rabbitmq.client.QueueingConsumer;
import org.apache.commons.collections.map.HashedMap;
import us.codecraft.webmagic.Request;
import us.codecraft.webmagic.Spider;
import us.codecraft.webmagic.SpiderListener;
import us.codecraft.webmagic.processor.PageProcessor;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Map;


/**
 * @author zhuangj
 * @date 2017/12/1
 */
public class JcSpider extends Spider {

    /**
     * Queue consumer that feeds data to this spider
     */
    private Consumer consumer;

    /**
     * Name of the queue this spider consumes from
     */
    private String consumerQueueName;

    /**
     * Tracks the MQ deliveries so they can be acked once processed
     */
    private Map<String, QueueingConsumer.Delivery> ackMap = new HashedMap();

    /**
     * Number of messages remaining in the queue
     */
    private Integer messageNum = 0;

    /**
     * Parent spider; a child spider may only stop after its parent has stopped
     */
    private JcSpider parentSpider;


    public JcSpider(PageProcessor pageProcessor) {
        super(pageProcessor);
        exitWhenComplete = false;
    }

    public Consumer getConsumer() {
        return consumer;
    }

    public void setConsumer(Consumer consumer) {
        this.consumer = consumer;
    }

    public JcSpider getParentSpider() {
        return parentSpider;
    }

    public void setParentSpider(JcSpider parentSpider) {
        this.parentSpider = parentSpider;
    }


    public Integer getMessageNum() {
        return messageNum;
    }

    public void setMessageNum(Integer messageNum) {
        this.messageNum = messageNum;
    }

    public String getConsumerQueueName() {
        return consumerQueueName;
    }

    public void setConsumerQueueName(String consumerQueueName) {
        this.consumerQueueName = consumerQueueName;
    }

    @Override
    protected void initComponent() {
        super.initComponent();
        this.setSpiderListeners(new ArrayList<>());
        this.requestMessageListen();
//        this.startConsumer(consumerQueueName);
    }

    public void startConsumer(String queueName) {
        if (consumer == null) {
            this.exitWhenComplete = true;
            return;
        }
        logger.info("queueName:{},startConsumer", queueName);
        JcSpider jcSpider = this;
        Runnable myRunnable = () -> {
            try {
                messageNum = consumer.getQueueMsgNum(queueName);

                Status parentStatus = Status.Stopped;
                if (parentSpider != null) {
                    parentStatus = parentSpider.getStatus();
                }

                while (!parentStatus.equals(Status.Stopped) || messageNum > 0) {
                    if (!jcSpider.getStatus().equals(Status.Running)) {
                        Thread.sleep(500);
                    }
                    QueueingConsumer.Delivery delivery = consumer.getDeliveryMessage(queueName);
                    String message = new String(delivery.getBody());
                    CrawlerPage crawlerPage = JSONUtil.toObject(message, CrawlerPage.class);
                    Request request = crawlerPage.translateRequest();

                    // register the delivery so it can be acked once the request succeeds or fails
                    ackMap.put(request.getUrl(), delivery);
                    jcSpider.addRequest(request);
                    messageNum = consumer.getQueueMsgNum(queueName);
                    if (messageNum == 0) {
                        Thread.sleep(500);
                    }
                    // refresh the parent's status, otherwise the loop would never notice it stopping
                    if (parentSpider != null) {
                        parentStatus = parentSpider.getStatus();
                    }
                }
                System.out.println("spider:" + getUUID() + ",consumer stop");
                if (parentSpider != null) {
                    System.out.println("parentStatus:" + parentSpider.getStatus().name());
                }
                System.out.println("messageNum:" + messageNum);
                // the parent has stopped and the queue is empty, so the spider may exit once its requests are done
                Thread.sleep(2000);
                this.exitWhenComplete = true;
            } catch (Exception e) {
                e.printStackTrace();
            }
        };
        Thread thread = new Thread(myRunnable);
        thread.start();
    }


    /**
     * Register a listener that acks the MQ message once its request has been handled
     */
    private void requestMessageListen() {
        this.getSpiderListeners().add(new SpiderListener() {
            @Override
            public void onSuccess(Request request) {
                ackMq(request);
            }

            @Override
            public void onError(Request request) {
                ackMq(request);
            }
        });
    }

    public void ackMq(Request request) {
        try {
            QueueingConsumer.Delivery delivery = ackMap.get(request.getUrl());
            if (delivery != null) {
                consumer.ackMessage(delivery);
                ackMap.remove(request.getUrl());
            }
        } catch (IOException e) {
            e.printStackTrace();
        }
    }

}
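
In the code above, each MQ message is deserialized into a CrawlerPage and turned back into a WebMagic Request through translateRequest(). The CrawlerPage class itself is not shown in this post; purely as an illustration, it might look roughly like the following (the url and extras fields are assumptions — only translateRequest() and JSON serializability are implied by the code above):

package com.chinaredstar.jc.core.page;

import us.codecraft.webmagic.Request;

import java.util.HashMap;
import java.util.Map;

/**
 * A guess at the JSON payload exchanged between spiders through MQ:
 * a URL plus any extra fields the next-level processor needs,
 * restored into a WebMagic Request on the consuming side.
 */
public class CrawlerPage {

    private String url;
    private Map<String, Object> extras = new HashMap<>();

    public String getUrl() {
        return url;
    }

    public void setUrl(String url) {
        this.url = url;
    }

    public Map<String, Object> getExtras() {
        return extras;
    }

    public void setExtras(Map<String, Object> extras) {
        this.extras = extras;
    }

    /** Rebuild a WebMagic Request from the data carried in the message. */
    public Request translateRequest() {
        Request request = new Request(url);
        for (Map.Entry<String, Object> entry : extras.entrySet()) {
            request.putExtra(entry.getKey(), entry.getValue());
        }
        return request;
    }
}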

 

Depending on its level, a spider is given either an MqPipeline or an EsPipeline, which pushes the processed data into MQ or into ES:

package com.chinaredstar.jc.core.pipeline;

import com.chinaredstar.jc.core.page.CrawlerPage;
import com.chinaredstar.jc.crawler.producer.Producer;
import com.chinaredstar.jc.infras.utils.JSONUtil;
import org.apache.commons.collections4.CollectionUtils;
import us.codecraft.webmagic.ResultItems;
import us.codecraft.webmagic.Task;
import us.codecraft.webmagic.pipeline.Pipeline;

import java.io.IOException;
import java.util.List;

/**
 * Pipeline that publishes results into the message queue
 * @author zhuangj
 * @date 2017/12/1
 */
public class MqPipeline implements Pipeline {

    private Producer producer;

    public MqPipeline(Producer producer) {
        this.producer = producer;
    }

    public Producer getProducer() {
        return producer;
    }

    public void setProducer(Producer producer) {
        this.producer = producer;
    }

    @Override
    public void process(ResultItems resultItems, Task task) {
        try {
            List<CrawlerPage> crawlerPageList= resultItems.get("nextPageList");
            if(CollectionUtils.isEmpty(crawlerPageList)){
                return;
            }
            for(CrawlerPage page:crawlerPageList){
//                System.out.println("into MQ:"+JSONUtil.toJSonString(page));
                producer.basicPublish(JSONUtil.toJSonString(page));
            }
        } catch (IOException e) {
            e.printStackTrace();
        }
    }
}
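
The Producer that MqPipeline publishes through is another thin wrapper whose code the post omits. A minimal sketch, assuming it publishes to a fixed queue through RabbitMQ's default exchange (the constructor and queue handling are assumptions, not the project's actual Exchange abstraction):

package com.chinaredstar.jc.crawler.producer;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.MessageProperties;

import java.io.IOException;
import java.nio.charset.StandardCharsets;

public class Producer {

    private final Channel channel;
    private final String queueName;

    public Producer(Channel channel, String queueName) {
        this.channel = channel;
        this.queueName = queueName;
    }

    /** Publish one message; the default exchange routes it to the queue with the same name. */
    public void basicPublish(String message) throws IOException {
        channel.basicPublish("", queueName,
                MessageProperties.PERSISTENT_TEXT_PLAIN,
                message.getBytes(StandardCharsets.UTF_8));
    }
}

The EsPipeline, which stores the final detail data into Elasticsearch, follows:
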
package com.chinaredstar.jc.core.pipeline;

import com.chinaredstar.jc.core.es.EsConnectionPool;
import com.chinaredstar.jc.infras.utils.json.JsonFormatter;
import org.elasticsearch.action.admin.indices.create.CreateIndexRequest;
import org.elasticsearch.action.admin.indices.exists.indices.IndicesExistsRequest;
import org.elasticsearch.action.admin.indices.exists.indices.IndicesExistsResponse;
import org.elasticsearch.action.bulk.BulkRequestBuilder;
import org.elasticsearch.action.bulk.BulkResponse;
import org.elasticsearch.action.index.IndexRequestBuilder;
import org.elasticsearch.client.transport.TransportClient;
import org.elasticsearch.common.xcontent.XContentType;
import us.codecraft.webmagic.ResultItems;
import us.codecraft.webmagic.Task;
import us.codecraft.webmagic.pipeline.Pipeline;

import java.io.IOException;
import java.net.UnknownHostException;

/**
 * Created by zhuangj on 2017/12/1.
 */
public class EsPipeline implements Pipeline {

    private String taskName;

    private EsConnectionPool pool=new EsConnectionPool(3);

    public EsPipeline(String taskName) {
        this.taskName=taskName;
    }

    @Override
    public void process(ResultItems resultItems, Task task) {
        try {
            TransportClient client=pool.get();
            this.createIndex(client,taskName);
            this.insertData(client,taskName,"crawler",null, JsonFormatter.toJsonAsString(resultItems.getAll()));
//            System.out.println("save ES:" + JsonFormatter.toJsonAsString(resultItems.getAll()));
            pool.returnToPool(client);
        } catch (IOException e) {
            e.printStackTrace();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }

    /**
     * Create the index if it does not already exist
     */
    private boolean createIndex(TransportClient client,String index) {
        try {
            if (isIndexExist(client,index)) {
                return true;
            }
            client.admin().indices().create(new CreateIndexRequest(index)).actionGet();
            return true;
        }catch (Exception e){
            return false;
        }
    }


    /**
     * Check whether the index exists
     * @param index
     * @return
     */
    private boolean isIndexExist(TransportClient client,String index) throws UnknownHostException, InterruptedException {
        IndicesExistsRequest request = new IndicesExistsRequest(index);
        IndicesExistsResponse response = client.admin().indices().exists(request).actionGet();
        return response.isExists();
    }


    /**
     * Insert data
     *
     * @param index
     * @param type
     * @param id
     * @param data
     * @throws IOException
     */
    private BulkResponse insertData(TransportClient client,String index, String type, String id, String data) throws IOException, InterruptedException {
        //BulkRequestBuilder is the core API for batching multiple JSON documents
        BulkRequestBuilder bulkRequest = client.prepareBulk();
        IndexRequestBuilder requestBuilder = client.prepareIndex(index, type, id).setSource(data, XContentType.JSON);
        bulkRequest.add(requestBuilder);
        //execute the bulk request to index the document into ES
        BulkResponse bulkRequestBuilder=bulkRequest.execute().actionGet();
        return bulkRequestBuilder;
    }

}

The EsPipeline above creates an EsConnectionPool, which pools and reuses ES connections to speed up writing data into ES.
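
The EsConnectionPool class is not shown in the post either. A minimal sketch, assuming a blocking queue of ES 5.x TransportClient instances (the host, port, and client settings below are placeholders, not the project's real configuration):

package com.chinaredstar.jc.core.es;

import org.elasticsearch.client.transport.TransportClient;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.transport.InetSocketTransportAddress;
import org.elasticsearch.transport.client.PreBuiltTransportClient;

import java.net.InetAddress;
import java.net.UnknownHostException;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

public class EsConnectionPool {

    private final BlockingQueue<TransportClient> pool;

    public EsConnectionPool(int size) {
        this.pool = new LinkedBlockingQueue<>(size);
        try {
            for (int i = 0; i < size; i++) {
                pool.add(createClient());
            }
        } catch (UnknownHostException e) {
            throw new IllegalStateException("cannot create ES transport client", e);
        }
    }

    private TransportClient createClient() throws UnknownHostException {
        // host/port are placeholders; the real values come from the project's configuration
        return new PreBuiltTransportClient(Settings.EMPTY)
                .addTransportAddress(new InetSocketTransportAddress(
                        InetAddress.getByName("localhost"), 9300));
    }

    /** Borrow a client; blocks until one is returned if all are in use. */
    public TransportClient get() throws InterruptedException {
        return pool.take();
    }

    /** Return a client to the pool after use. */
    public void returnToPool(TransportClient client) throws InterruptedException {
        pool.put(client);
    }
}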

 

 

In the business-layer code we create the spiders, recursively creating each child-level spider:

package com.chinaredstar.jc.crawler.biz.service.impl;

import com.chinaredstar.jc.core.downloader.JcHttpClientDownloader;
import com.chinaredstar.jc.core.page.CrawlerPage;
import com.chinaredstar.jc.core.pipeline.EsPipeline;
import com.chinaredstar.jc.core.pipeline.MqPipeline;
import com.chinaredstar.jc.core.processor.jd.JdProcessorLevelEnum;
import com.chinaredstar.jc.core.spider.JcSpider;
import com.chinaredstar.jc.crawler.biz.service.IJdService;
import com.chinaredstar.jc.crawler.channel.MqChannel;
import com.chinaredstar.jc.crawler.common.MqConnectionFactory;
import com.chinaredstar.jc.crawler.consumer.Consumer;
import com.chinaredstar.jc.crawler.exchange.DefaultExchange;
import com.chinaredstar.jc.crawler.exchange.Exchange;
import com.chinaredstar.jc.crawler.producer.Producer;
import org.springframework.stereotype.Service;
import us.codecraft.webmagic.processor.PageProcessor;

import java.io.IOException;
import java.util.List;
import java.util.concurrent.TimeoutException;

/**
 *
 * Service class for crawling JD.com data
 * @author zhuangj
 * @date 2017/11/29
 */
@Service
public class JdServiceImpl implements IJdService {

    @Override
    public List<CrawlerPage> startSpider(String url,String taskName,Integer maxLevel) throws IOException, TimeoutException, InterruptedException {
        for(int level=1;level<maxLevel;level++){
            createJcSpider(url, taskName, maxLevel,level,null);
        }
       return null;
    }

    @Override
    public List<CrawlerPage> startSpider(String url, String taskName, Integer level, Integer maxLevel) throws IOException, TimeoutException, InterruptedException {
        createJcSpider(url, taskName, maxLevel,level,null);
        return null;
    }



    private void createJcSpider(String url, String taskName, Integer maxLevel,Integer level,JcSpider parentSpider) throws IOException, TimeoutException, InterruptedException {
        PageProcessor pageProcessor= JdProcessorLevelEnum.getProcessorByLevel(level);
        JcSpider jcSpider=new JcSpider(pageProcessor);
        jcSpider.setUUID(taskName+level);
        jcSpider.setDownloader(new JcHttpClientDownloader());
        jcSpider.setParentSpider(parentSpider);


        String producerQueueName=taskName+level;
        String consumerQueueName=taskName+(level-1);

        MqChannel mqChannelProducer=createMqChannel(producerQueueName);
        Producer producer=createProduct(mqChannelProducer);

        MqChannel mqChannelConsumer=createMqChannel(consumerQueueName);
        Consumer consumer=createConsumer(mqChannelConsumer);
        jcSpider.setConsumer(consumer);
        jcSpider.setConsumerQueueName(consumerQueueName);



        //the last level writes straight into ES rather than MQ, so no MQ producer is needed
        if(level<maxLevel){
            jcSpider.addPipeline(new MqPipeline(producer));
        }else {
            jcSpider.addPipeline(new EsPipeline(taskName));
        }

        //the first level does not pull data from MQ, so it needs no consumer
        if(level==1){
            jcSpider.addUrl(url);
            jcSpider.setConsumer(null);
        }

        jcSpider.thread(10).start();
        jcSpider.startConsumer(consumerQueueName);




        //create the child-level spider
        if(level<maxLevel){
            //wait briefly for the parent spider and its consumer to start
            Thread.sleep(2000);
            createJcSpider(url,taskName,maxLevel,level+1,jcSpider);
        }
    }






    /**
     * Create the producer
     * @param mqChannel
     * @return
     * @throws IOException
     * @throws TimeoutException
     */
    private Producer createProduct(MqChannel mqChannel) throws IOException, TimeoutException {
        return mqChannel.createProducer();
    }

    /**
     * Create the consumer
     * @param mqChannel
     * @return
     * @throws IOException
     * @throws TimeoutException
     */
    private Consumer createConsumer(MqChannel mqChannel) throws IOException, TimeoutException {
        return mqChannel.createConsumer();
    }

    /**
     * Create the connection channel
     * @return
     * @throws IOException
     * @throws TimeoutException
     */
    private MqChannel createMqChannel(String queueName) throws IOException, TimeoutException {
        MqChannel mqChannel=MqConnectionFactory.getConnectionChannel();
        Exchange exchange=new DefaultExchange(mqChannel.getChannel(),queueName);
        mqChannel.setExchange(exchange);
        return  mqChannel;
    }

}
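
For completeness, the service would be invoked roughly like this (normally it is injected by Spring; the URL, task name, and level count are illustrative placeholders, since the post does not give the actual values):

IJdService jdService = new JdServiceImpl();
// four levels: home page -> pagination bar -> product list -> product detail
jdService.startSpider("<JD building-materials homepage URL>", "jdJianCai", 1, 4);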

 

Finally, start the project and run through it once:

The corresponding queues are created in RabbitMQ and data flows through them; the problem of messages remaining unacked has not been solved yet.

 

The data is stored into ES:

Because the page parsing is relatively simple, only part of the data was extracted.

 

If anything here is incorrect or could be improved, please point it out.

 

