
[Java Tutorial] Web Crawler Basics — Part 02


1. Introduction


    In the previous post, we walked through a crawler's basic workflow, implemented a simple crawler, and briefly analyzed its remaining problems at the end. This post addresses those problems and improves the crawler in the following areas.

2. Improvements


(1) Bloom Filter

    First, we use a Bloom filter to improve the visitedSet in UrlQueue.

    In the previous post we used visitedSet (a HashSet) to hold the URLs that had already been visited. We chose a HashSet because we constantly insert URLs into visitedSet and frequently need to check whether a given URL is already in it, and with a hash table all dictionary operations take O(1) time on average (for the detailed analysis see 散列表(hash table)——算法导论(13)). The drawback is that maintaining the hash table costs a lot of memory. Can we reduce its space requirements?

    Start from what visitedSet is actually for: it only needs to answer whether a given URL is contained in it, nothing more. So there is no need to keep the full URL; a fingerprint is enough, which immediately suggests digest algorithms such as MD5 and SHA-1. But even after compressing the URL this way, we still have to store the digest. Can we do better? This is where the Bloom filter comes in (an introduction to and implementation of Bloom filters can be found in the last section of 散列表(hash table)——算法导论(13)).
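
    That BloomFilter class is not reproduced in this post. For readers who just want a feel for the idea, here is a minimal, hypothetical sketch (k hash positions over a bit set, derived from hashCode by double hashing; the class name and hashing scheme are illustrative only, not the implementation used later):

import java.util.BitSet;

// Minimal illustrative Bloom filter: each element sets k positions in a bit set.
// contains() may report false positives, but never false negatives.
public class SimpleBloomFilter<T> {

  private final BitSet bits;
  private final int size; // number of bits
  private final int k;    // number of hash positions per element

  public SimpleBloomFilter(int size, int k) {
    this.bits = new BitSet(size);
    this.size = size;
    this.k = k;
  }

  // Derive the i-th position from the element's hashCode (double hashing).
  private int position(T t, int i) {
    int h1 = t.hashCode();
    int h2 = (h1 >>> 16) | 1; // force odd so the k positions differ
    return Math.abs((h1 + i * h2) % size);
  }

  public void add(T t) {
    for (int i = 0; i < k; i++) {
      bits.set(position(t, i));
    }
  }

  public boolean contains(T t) {
    for (int i = 0; i < k; i++) {
      if (!bits.get(position(t, i))) {
        return false; // definitely never added
      }
    }
    return true; // probably added (possible false positive)
  }
}

    The space cost is a fixed bit array instead of one stored entry per URL, at the price of a small, configurable false-positive rate.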

(2) Berkeley DB

    Next, we use Berkeley DB to improve the unvisitedList in UrlQueue.

    Berkeley DB is an embedded database system: simple, small, and fast (simple and small are beyond doubt; as for the performance claim, I have not verified it). For downloads and usage instructions, see the official site: http://www.oracle.com/technetwork/database/database-technologies/berkeleydb/overview/index.html

    With Berkeley DB in place, URLs parsed out of a page are written straight into the DB, and unvisitedList only acts as a buffer pool for reading URLs back out. In other words, a dedicated thread periodically reads a batch of URLs from the DB into unvisitedList, while the threads that request pages still take their URLs from unvisitedList; a stripped-down sketch of this refill loop is given below.
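
    The following is only a conceptual sketch of that buffer-pool idea, written against the BloomQueue, DBHelper and Url classes defined in section 3 (the class name RefillLoop is made up here); the real Feeder class below additionally consults the Bloom filter to skip duplicates and handles shutdown:

import java.util.concurrent.TimeUnit;

// Conceptual refill loop: periodically move URLs from the DB into the in-memory buffer queue.
public class RefillLoop implements Runnable {

  private final DBHelper<Url> dbHelper;   // DB wrapper, defined in section 3 (2)
  private final BloomQueue<Url> urlQueue; // bounded buffer, defined in section 3 (1)
  private long nextId = 1;                // next primary key to read from the DB
  private volatile boolean running = true;

  public RefillLoop(DBHelper<Url> dbHelper, BloomQueue<Url> urlQueue) {
    this.dbHelper = dbHelper;
    this.urlQueue = urlQueue;
  }

  @Override
  public void run() {
    while (running) {
      if (!urlQueue.isFull()) {
        Url url = dbHelper.get(nextId); // null when that id has not been stored yet
        if (url != null) {
          urlQueue.enqueue(url);        // hand it to the fetcher threads
          nextId++;
        }
      }
      try {
        TimeUnit.MILLISECONDS.sleep(10); // poll the DB at a fixed interval
      } catch (InterruptedException e) {
        Thread.currentThread().interrupt();
        return;
      }
    }
  }

  public void stop() {
    running = false;
  }
}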

(3) Multithreading

    Finally, we introduce multithreading to improve the crawler's throughput.

    The key issues in multithreading are synchronization and communication between threads; a minimal example is sketched below, and readers unfamiliar with these topics should look them up first.
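
    As a small, self-contained illustration (the class name and loop bounds are made up for this demo), here is a producer and a consumer communicating through a LinkedBlockingQueue, which is also the structure BloomQueue wraps internally: the queue does the locking, and put()/take() block instead of busy-waiting:

import java.util.concurrent.LinkedBlockingQueue;

// A producer and a consumer communicating through a blocking queue.
public class ProducerConsumerDemo {

  public static void main(String[] args) {
    LinkedBlockingQueue<String> queue = new LinkedBlockingQueue<>(100);

    Thread producer = new Thread(() -> {
      try {
        for (int i = 0; i < 10; i++) {
          queue.put("url-" + i);     // blocks when the queue is full
        }
      } catch (InterruptedException e) {
        Thread.currentThread().interrupt();
      }
    });

    Thread consumer = new Thread(() -> {
      try {
        for (int i = 0; i < 10; i++) {
          String url = queue.take(); // blocks when the queue is empty
          System.out.println("fetched " + url);
        }
      } catch (InterruptedException e) {
        Thread.currentThread().interrupt();
      }
    });

    producer.start();
    consumer.start();
  }
}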

The overall structure after these improvements is shown below:

[Image: structure diagram of the improved crawler]

3. Implementation


(1) Code

Here is the improved code:

① First is the improved UrlQueue.java, renamed to BloomQueue.java (the BloomFilter class it uses can be found in 散列表(hash table)——算法导论(13)).

import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

public class BloomQueue<T> {

  private BloomFilter<T> bloomFilter;
  private LinkedBlockingQueue<T> visitedList;
  private AtomicInteger flowedCount;
  private int queueCapacity;

  public BloomQueue() {
    this(0.000001, 10000000, 500);
  }

  public BloomQueue(double falsePositiveProbability, int filterCapacity, int queueCapacity) {
    this.queueCapacity = queueCapacity;
    bloomFilter = new BloomFilter<>(falsePositiveProbability, filterCapacity);
    visitedList = new LinkedBlockingQueue<>(queueCapacity);
    flowedCount = new AtomicInteger(0);
  }

  /**
   * Enqueue (blocks for up to 3 seconds by default when the element cannot be queued)
   *
   * @param t
   * @return
   */
  public boolean enqueue(T t) {
    return enqueue(t, 3000);
  }

  /**
   * Enqueue
   *
   * @param t
   * @param timeout
   *      in milliseconds
   */
  public boolean enqueue(T t, long timeout) {
    try {
      boolean result = visitedList.offer(t, timeout, TimeUnit.MILLISECONDS);
      if (result) {
        bloomFilter.add(t);
        flowedCount.getAndIncrement();
      }
      return result;
    } catch (InterruptedException e) {
      e.printStackTrace();
    }
    return false;
  }

  /**
   * Dequeue (blocks for up to 3 seconds by default when the queue is empty)
   *
   * @return
   */
  public T dequeue() {
    return dequeue(3000);
  }

  /**
   * Dequeue
   *
   * @param timeout
   *      in milliseconds
   * @return
   */
  public T dequeue(long timeout) {
    try {
      return visitedList.poll(timeout, TimeUnit.MILLISECONDS);
    } catch (InterruptedException e) {
    }
    return null;
  }

  /**
   * Is the element currently in the queue?
   *
   * @return
   */
  public boolean contains(T t) {
    return visitedList.contains(t);
  }

  /**
   * Has the element ever passed through the queue?
   *
   * @param t
   * @return
   */
  public boolean contained(T t) {
    return bloomFilter.contains(t);
  }

  public boolean isEmpty() {
    return visitedList.isEmpty();
  }

  public boolean isFull() {
    return visitedList.size() == queueCapacity;
  }

  public int size() {
    return visitedList.size();
  }

  public int flowedCount() {
    return flowedCount.get();
  }

  @Override
  public String toString() {
    return visitedList.toString();
  }
}
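
    A quick usage sketch (the URL is just an example) showing the difference between contains, which asks whether an element is currently buffered, and contained, which asks whether it has ever passed through (answered by the Bloom filter):

public class BloomQueueDemo {

  public static void main(String[] args) {
    BloomQueue<String> queue = new BloomQueue<>();
    queue.enqueue("http://www.cnblogs.com/artech/");

    System.out.println(queue.contains("http://www.cnblogs.com/artech/"));  // true: still buffered
    System.out.println(queue.contained("http://www.cnblogs.com/artech/")); // true: recorded by the Bloom filter

    String url = queue.dequeue();             // take it out for fetching
    System.out.println(queue.contains(url));  // false: no longer buffered
    System.out.println(queue.contained(url)); // still true: it has flowed through once
  }
}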

② Next, a simple wrapper around Berkeley DB to make it easier to use.

import java.io.File;

import com.sleepycat.je.Environment;
import com.sleepycat.je.EnvironmentConfig;
import com.sleepycat.persist.EntityCursor;
import com.sleepycat.persist.EntityStore;
import com.sleepycat.persist.PrimaryIndex;
import com.sleepycat.persist.StoreConfig;

public class DBHelper<T> {

  public static final String DEFAULT_DB_DIR = "C:/Users/Administrator/Desktop/db/";
  public static final String DEFAULT_Entity_Store = "EntityStore";

  public Environment myEnv;
  public EntityStore store;
  public PrimaryIndex<Long, T> primaryIndex;

  public DBHelper(Class<T> clazz) {
    this(clazz, DEFAULT_DB_DIR, DEFAULT_Entity_Store, false);
  }

  public DBHelper(Class<T> clazz, String dbDir, String storeName, boolean isRead) {
    File dir = new File(dbDir);
    if (!dir.exists()) {
      dir.mkdirs();
    }
    EnvironmentConfig envConfig = new EnvironmentConfig();
    envConfig.setAllowCreate(!isRead);
    // Environment
    myEnv = new Environment(dir, envConfig);
    // StoreConfig
    StoreConfig storeConfig = new StoreConfig();
    storeConfig.setAllowCreate(!isRead);
    // store
    store = new EntityStore(myEnv, storeName, storeConfig);
    // PrimaryIndex
    primaryIndex = store.getPrimaryIndex(Long.class, clazz);
  }

  public void put(T t) {
    primaryIndex.put(t);
    store.sync();
    myEnv.sync();
  }

  public EntityCursor<T> entities() {
    return primaryIndex.entities();
  }

  public T get(long key) {
    return primaryIndex.get(key);
  }

  public void close() {
    if (store != null) {
      store.close();
    }
    if (myEnv != null) {
      myEnv.cleanLog();
      myEnv.close();
    }
  }
}
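
    A brief usage sketch of the wrapper (it uses the Url entity defined in ③ below; the demo class name is made up, and reading the first record back with key 1 assumes the primary-key sequence starts at 1, which is also what CrawlerEngine assumes):

public class DBHelperDemo {

  public static void main(String[] args) {
    DBHelper<Url> dbHelper = new DBHelper<>(Url.class);

    // Store a URL; the @PrimaryKey sequence assigns the id on put().
    dbHelper.put(new Url("http://www.cnblogs.com/artech/"));

    // Read the first stored URL back by its primary key.
    Url first = dbHelper.get(1);
    System.out.println(first);

    dbHelper.close();
  }
}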

③ Then a Url entity class, so URLs can be stored conveniently.

import com.sleepycat.persist.model.Entity;
import com.sleepycat.persist.model.PrimaryKey;

@Entity
public class Url {

  @PrimaryKey(sequence = "Sequence_Namespace")
  private long id;

  private String url;

  public Url() {
  }

  public Url(String url) {
    super();
    this.url = url;
  }

  public long getId() {
    return id;
  }

  public void setId(long id) {
    this.id = id;
  }

  public String getUrl() {
    return url;
  }

  public void setUrl(String url) {
    this.url = url;
  }

  @Override
  public String toString() {
    return url;
  }

  public boolean isEmpty() {
    return url == null || url.isEmpty();
  }
}

④ Finally, the core class CrawlerEngine. It has two inner classes, Feeder and Fetcher: the Feeder is responsible for adding URLs to urlQueue, and the Fetcher for taking URLs out of urlQueue, requesting the pages, and parsing them. The JsoupDownloader class used here is the same as in the previous post and stays unchanged.

import java.util.Set;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

import org.jsoup.nodes.Document;

public class CrawlerEngine {

  public static final String DEFAULT_SAVE_DIR = "C:/Users/Administrator/Desktop/html/";
  private static final long FEEDER_SLEEP_TIME = 10;
  // Maximum time the feeder will wait when no url can be read from the DB
  // (if the DB is still empty after this time, the feeder stops working)
  private static final long FEEDER_MAX_WAIT_TIME = 3 * 1000;
  private static final int FEEDER_MAX_WAIT_COUNT = (int) (FEEDER_MAX_WAIT_TIME / FEEDER_SLEEP_TIME);
  private static final boolean LOG = false;

  private BloomQueue<Url> urlQueue;
  private ExecutorService fetcherPool;
  private int fetcherCount;
  private boolean running;
  private DBHelper<Url> dbHelper;
  private JsoupDownloader downloader;
  private String parseRegex;
  private String saveRegex;
  private String saveDir;
  private String saveName;
  private long maxCount = 1000;
  private long startTime;
  private long endTime = Long.MAX_VALUE;

  public CrawlerEngine() {
    this(20, DEFAULT_SAVE_DIR, null);
  }

  public CrawlerEngine(int fetcherCount, String saveDir, String saveName) {
    this.fetcherCount = fetcherCount;
    urlQueue = new BloomQueue<>();
    fetcherPool = Executors.newFixedThreadPool(fetcherCount);
    dbHelper = new DBHelper<>(Url.class);
    downloader = JsoupDownloader.getInstance();
    this.saveDir = saveDir;
    this.saveName = saveName;
  }

  public void startUp(String[] seeds) {
    if (running) {
      return;
    }
    running = true;
    startTime = System.currentTimeMillis();
    for (String seed : seeds) {
      Url url = new Url(seed);
      urlQueue.enqueue(url);
    }
    for (int i = 0; i < fetcherCount; i++) {
      fetcherPool.execute(new Fetcher());
    }
    new Feeder().start();
  }

  public void shutdownNow() {
    running = false;
    fetcherPool.shutdown();
  }

  public void shutdownAtTime(long time) {
    if (time > startTime) {
      endTime = time;
    }
  }

  public void shutdownDelayed(long delayed) {
    shutdownAtTime(startTime + delayed);
  }

  public void shutdownAtCount(long count) {
    maxCount = count;
  }

  private boolean isEnd() {
    return urlQueue.flowedCount() > maxCount || System.currentTimeMillis() > endTime;
  }

  private long currId = 1;
  private int currWaitCount;

  /**
   * Feeder
   * <p>
   * Reads a batch of urls from the DB into the queue
   * </p>
   *
   * @author D.K
   */
  private class Feeder extends Thread {
    @Override
    public void run() {
      while (!isEnd() && running && currWaitCount != FEEDER_MAX_WAIT_COUNT) {
        try {
          sleep(FEEDER_SLEEP_TIME);
          if (urlQueue.isFull()) {
            log("Feeder", "queue is full");
            continue;
          }
          Url url = dbHelper.get(currId);
          if (url == null) {
            currWaitCount++;
            log("Feeder", "url is null, currWaitCount = " + currWaitCount);
          } else {
            while (urlQueue.contained(url)) {
              currId++;
              url = dbHelper.get(currId);
            }
            if (url != null) {
              log("Feeder", "about to enqueue url");
              urlQueue.enqueue(url);
              currId++;
              log("Feeder", "url enqueued, currId = " + currId);
              currWaitCount = 0;
            }
          }
        } catch (Exception e) {
          e.printStackTrace();
        }
      }
      log("Feeder", "finished...");
      while (true) {
        try {
          sleep(100);
          log("Feeder", "waiting for Fetchers to finish...");
        } catch (InterruptedException e) {
        }
        if (urlQueue.isEmpty()) {
          shutdownNow();
          System.out.println(">>>>>>>>>>>> Crawl finished: requested " + urlQueue.flowedCount() + " pages in " + (System.currentTimeMillis() - startTime) + " ms <<<<<<<<<<<<");
          return;
        }
      }
    }
  }

  /**
   * Fetcher
   * <p>
   * Takes urls from the queue, downloads and parses the pages, and stores the newly parsed urls into the DB
   * </p>
   *
   * @author D.K
   */
  private class Fetcher implements Runnable {
    @Override
    public void run() {
      while (!isEnd() && (running || !urlQueue.isEmpty())) {
        log("Fetcher", "taking a url from the queue, size=" + urlQueue.size());
        Url url = urlQueue.dequeue();
        if (url == null) {
          log("Fetcher", "url is null");
          continue;
        }
        log("Fetcher", "got a url");
        Document doc = downloader.downloadPage(url.getUrl());
        Set<String> urlSet = downloader.parsePage(doc, parseRegex);
        downloader.savePage(doc, saveDir, saveName, saveRegex);
        for (String str : urlSet) {
          Url u = new Url(str);
          if (!urlQueue.contained(u)) {
            dbHelper.put(u);
          }
        }
      }
    }
  }

  private void log(String talker, String content) {
    if (LOG) {
      System.out.println("[" + talker + "] " + content);
    }
  }

  public String getParseRegex() {
    return parseRegex;
  }

  public void setParseRegex(String parseRegex) {
    this.parseRegex = parseRegex;
  }

  public String getSaveRegex() {
    return saveRegex;
  }

  public void setSaveRegex(String saveRegex) {
    this.saveRegex = saveRegex;
  }

  public void setSavePath(String saveDir, String saveName) {
    this.saveDir = saveDir;
    this.saveName = saveName;
  }
}

(2) Testing

    We run the same test case as in the previous post to check the effect of the optimizations. Here is the test code:

public class Test {

  public static void main(String[] args) throws InterruptedException {
    CrawlerEngine crawlerEngine = new CrawlerEngine();
    crawlerEngine.setParseRegex("(http://www.cnblogs.com/artech/p|http://www.cnblogs.com/artech/default|http://www.cnblogs.com/artech/archive/\\d{4}/\\d{2}/\\d{2}/).*");
    crawlerEngine.setSaveRegex("(http://www.cnblogs.com/artech/p|http://www.cnblogs.com/artech/archive/\\d{4}/\\d{2}/\\d{2}/).*");
    crawlerEngine.startUp(new String[] { "http://www.cnblogs.com/artech/" });
    crawlerEngine.shutdownAtCount(1000);
  }
}

The run output is shown below:

[Images: screenshots of the run output]

4. Summary


    Compared with the 61 s the same test took in the previous post, the improved crawler finishes in 14 s, a clear gain in efficiency.

    In the next post we will make further small optimizations and polish some details, such as handling HTTP response status codes and extracting a few interfaces to reduce coupling between components and increase flexibility.