Source code for crawly

# -*- encoding: utf8 -*-
"Crawly is a small library to crawl websites."
import sys
import json
import getopt
import pprint
import inspect
import urlparse
import logging.config
import itertools as it
from functools import partial
from datetime import datetime
from collections import Counter

import lxml
import lxml.html

import gevent
from gevent import monkey
from gevent.pool import Pool, Group
from gevent.queue import Queue, Empty
from gevent.event import AsyncResult

# Patch std library before importing requests.
monkey.patch_all(thread=False, select=False)

from requests import Session


__author__ = 'Mouad Benchchaoui'
__version__ = '0.1b'
__copyright__ = 'Copyright © 2012, Mouad Benchchaoui'
__license__ = 'BSD license'


__all__ = ['Pagination', 'WebPage', 'WebSite', 'XPath', 'HTML', 'runner']


# Global Configuration.
_CONFIG = {
    'timeout': 15,
    # Requests configuration: http://tinyurl.com/dyvdj57
    'requests': {
        'base_headers': {
            'Accept': '*/*',
            'Accept-Encoding': 'identity, deflate, compress, gzip',
            'User-Agent': 'crawly/' + __version__
        },
        'danger_mode': False,
        'encode_uri': True,
        'keep_alive': True,
        'max_redirects': 30,
        # FIXME: Test max retries !?
        'max_retries': 3,
        'pool_connections': 10,
        'pool_maxsize': 10,
        'safe_mode': True,   # Default to False.
        'strict_mode': False,
        'trust_env': True,
        'verbose': False
    },
    # Logging configuration: http://tinyurl.com/crt6rkw
    'logging': {
        'version': 1,
        'formatters': {
            'standard': {
                'format': '%(asctime)s [%(levelname)s] %(name)s: %(message)s'
            }
        },
        'handlers': {
            'console': {
                'formatter': 'standard',
                'class': 'logging.StreamHandler',
            }
        },
        'loggers': {
            '': {
                'handlers': ['console'],
                'level': 'DEBUG',
                'propagate': False,
            }
        }
    }
}
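
# Illustrative note (not part of the library): the defaults above can be
# replaced at run time by passing ``--config path/to/file.json`` on the
# command line (see the ``_Runner.config`` property below).  The file is
# merged with a shallow ``dict.update``, so overriding a top-level key such
# as 'requests' replaces that whole sub-dictionary.  For example, a file
# containing ``{"timeout": 30}`` only changes the request timeout:
#
#     import json
#     overrides = json.loads('{"timeout": 30}')
#     config = _CONFIG.copy()
#     config.update(overrides)   # config['timeout'] == 30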


class ExtractionError(Exception):
    "Error raised when extracting data from HTML fails."

class _AsyncRequest(AsyncResult):
    """Asynchronous request.

    This class is a wrapper around ``requests.Request`` and at the same
    time a subclass of ``gevent.event.AsyncResult``, which means that
    accessing ``request.response`` actually performs a
    ``gevent.event.AsyncResult.get``, which blocks and triggers a context
    switch; before blocking it sends the request to fetch the page
    content, and when the response arrives it wakes up all the greenlets
    waiting for it.

    Arguments:
        *args, **kws: Same arguments as the ``requests.Session.request``
            method.

    Example:
        >>> request = _AsyncRequest('GET', 'http://google.com')
        >>> request.response
        <Response [200]>

    """
    def __init__(self, *args, **kws):
        super(_AsyncRequest, self).__init__()
        kws.setdefault('hooks', {}).update({'response': self.set})
        self._request = runner.build_request(*args, **kws)

    def _send(self):
        "Send this request in its own greenlet (using the ``runner`` pool)."
        return runner.fetch(self)

    @property
    def pretty_url(self):
        "Get a readable URL representation in the form <(method: data) url>."
        method = self._request.method
        if self._request.data:
            method = '(%s: %s)' % (method, self._request.data)
        return '<%s %s>' % (method, self._request.full_url)

    def __getattr__(self, attr):
        "Return a ``requests.Session.Request`` instance attribute."
        return getattr(self._request, attr)

    @property
    def response(self):
        """Get the response, sending the request if it wasn't already sent.

        If the request wasn't sent yet, accessing this attribute first
        triggers sending the request and then blocks until the response is
        available.  Blocking means that if this attribute is accessed in a
        greenlet, a context switch is triggered and the current greenlet is
        woken up when the response is received.
        """
        if not self.sent:
            self._send()
        return self.get()

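# Illustrative sketch (not part of the library): requests are lazy, i.e.
# nothing is sent until ``.response`` is first accessed, and any keyword
# accepted by ``requests.Session.request`` can be passed through.  The URL
# below is a placeholder:
#
#     req = _AsyncRequest('POST', 'http://example.com/search',
#                         data={'q': 'python'})
#     req.pretty_url   # "<(POST: {'q': 'python'}) http://example.com/search>"
#     req.response     # sends the request, then blocks until it completes
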
class XPath(object):
    r"""Callable class that defines an XPath query with callbacks.

    Arguments:
        - xpath: A string representing the XPath query.
        - callbacks: Functions to call in order (first to last) over the
          result returned by ``lxml.etree.XPath``.  This class also has a
          ``callbacks`` class variable that subclasses can set, which takes
          priority over the callbacks passed in this argument, i.e. the
          callbacks passed here are called after the class variable
          callbacks.

    Illustration ::

        XPath("...", callback1, callback2, callback3)
            <=>
        callback3(callback2(callback1(XPath("..."))))

    Raise:
        ExtractionError if extraction failed.

    Example ::

        >>> import string
        >>> x = XPath('//div/h2/text()', string.strip)
        >>> x('<html><body><div><h2>\r\ntest\n</h2></div></body></html>')
        'test'
        >>> x = XPath('//ul/li/text()', lambda ls: map(int, ls))
        >>> x('<html><body><ul><li>1</li><li>2</li></ul></body></html>')
        [1, 2]

    """
    callbacks = []

    def __init__(self, xpath, *callbacks):
        self._xpath = lxml.etree.XPath(xpath)
        self._callbacks = self.callbacks + list(callbacks)

    def __call__(self, html, *args, **kws):
        if isinstance(html, basestring):
            html = lxml.html.fromstring(html)
        result = self._xpath(html, *args, **kws)
        if isinstance(result, list):
            if len(result) == 1:
                result = result[0]
            elif len(result) == 0:
                result = ''
        try:
            for i, func in enumerate(self._callbacks):
                result = func(result)
        except Exception as ex:
            # Catch all exceptions and raise a more meaningful one.
            raise ExtractionError('Callback #%d failed: %s' % (i, ex))
        return result

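# Illustrative sketch (not part of the library): the ``callbacks`` class
# variable lets a subclass bundle post-processing with the query; callbacks
# passed to ``__init__`` run after the class-level ones.  The class below is
# hypothetical.
class _ExampleStrippedXPath(XPath):
    "Hypothetical XPath subclass whose class-level callback strips whitespace."
    callbacks = [lambda result: result.strip()]

# >>> x = _ExampleStrippedXPath('//div/h2/text()', lambda s: s.upper())
# >>> x('<html><body><div><h2>\n test \n</h2></div></body></html>')
# 'TEST'
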
class HTML(object):
    """Class that represents HTML code.

    This class is a wrapper around the ``lxml.html.HtmlElement`` class, so
    you can interact with instances of this class the same way you do with
    ``lxml.html.HtmlElement`` instances, with the addition of a new
    ``extract`` method that allows extracting data from the HTML by
    supplying callables (or a dictionary of callables).

    Example ::

        >>> html = HTML('<html><body><div><h2>test</h2></div></body></html>')
        >>> html.extract('//div/h2/text()')
        'test'

    """
    def __init__(self, html):
        self._html = lxml.html.fromstring(html)

    def __getattr__(self, attr):
        "Get an ``lxml.html.HtmlElement`` instance attribute."
        return getattr(self._html, attr)

    def extract(self, extractor):
        """Extract the given data from this HTML.

        Argument:
            extractor: Either a dictionary in the form
                {'name': <callable> or <string>}, or a single callable that
                accepts an ``lxml.html.HtmlElement``, e.g. an ``XPath``
                instance, or a string, in which case the string is
                automatically transformed into an ``XPath`` instance.

        Return:
            The extracted data as a dictionary if the ``extractor``
            argument was a dictionary, else a list or a string depending on
            the extractor callbacks.

        Raise:
            ExtractionError if extraction failed.

        """
        if not isinstance(extractor, dict):
            # A string is transformed into an XPath.
            if isinstance(extractor, basestring):
                extractor = XPath(extractor)
            return extractor(self._html)
        result = {}
        try:
            for key, ext in extractor.iteritems():
                if isinstance(ext, basestring):
                    ext = XPath(ext)
                result[key] = ext(self._html)
        except Exception as ex:
            # Catch all exceptions and raise a more meaningful one.
            raise ExtractionError('Extracting "%s" failed: %s' % (key, ex))
        return result

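# Illustrative sketch (not part of the library): ``HTML.extract`` also
# accepts a dictionary of extractors, mixing plain XPath strings and
# ``XPath`` objects, and returns a dictionary with the same keys.  The
# helper below is hypothetical.
def _example_extract_dict():
    "Hypothetical helper showing dictionary-based extraction."
    html = HTML('<html><body><div>'
                '<h2>Title</h2><p>Body text</p>'
                '</div></body></html>')
    return html.extract({
        'title': '//div/h2/text()',                            # plain string
        'body': XPath('//div/p/text()', lambda s: s.lower()),  # with callback
    })

# >>> _example_extract_dict()['title']
# 'Title'
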
class WebPage(object):
    """Class that represents a web site page, from which data can be
    extracted and links to follow can be gathered.

    Extract data from the page ::

        >>> class PythonJobs(WebPage):
        ...     toextract = {
        ...         'title': '//div[5]/div/div/div[2]/h2/a/text()'
        ...     }
        ...
        >>> page = PythonJobs('http://www.python.org/community/jobs/')
        >>> page.extract()   # doctest: +ELLIPSIS
        {'title': ...}

    Extract links to follow ::

        >>> class MyWebPage(WebPage):
        ...     tofollow = XPath('//div[5]/div/div/div[2]/h2/a/@href')
        ...
        >>> page = MyWebPage('http://www.python.org/community/jobs/')
        >>> list(page.follow_links())   # doctest: +SKIP
        [...]

    Arguments:
        - url_or_request: A string representing the URL of this page, or,
          for better customization, a request object.
        - parent: A ``WebPage`` or ``WebSite`` instance representing the
          parent site/page of this one.
        - initial: Initial data related to this page.

    """
    # Extractor callbacks for the data to extract from this web page.
    toextract = None
    # XPath (or an extractor callback) of links to follow.
    tofollow = None
    # Class that will wrap new web pages generated by following links.
    WebPageCls = None

    def __init__(self, url_or_request, parent=None, initial=None):
        if isinstance(url_or_request, basestring):
            self._request = self._build_request(url_or_request)
        else:
            self._request = url_or_request
        self._html = None
        self._data = initial or {}
        self._parent = parent
        # Set to False when self.data is first accessed.
        self.__firstime = True

    def __str__(self):
        return '%s: %s' % (self.__class__, self.url)

    __repr__ = __str__

    @property
    def url(self):
        "Get a pretty URL of this page in the form <(method: data) url>."
        return self._request.pretty_url

    @property
    def request(self):
        "Get the request used by this page."
        return self._request

    @property
    def html(self):
        "Get the HTML of this page as an ``HTML`` class instance."
        if self._html is None:
            # Compute only once.
            response = self._request.response
            self._html = HTML(response.text)
        return self._html

    def extract(self, toextract=None, update=True):
        """Extract the data given by sections.

        Arguments:
            - toextract: Same argument as accepted by the ``HTML.extract``
              method.
            - update: Boolean that enables updating the internal data
              holder when set to True (default); else the extracted data is
              returned without updating the internal data holder.

        Return:
            Extracted data using ``toextract``.

        Raise:
            - ``ExtractionError`` if extraction failed.
            - ``ValueError`` if the argument didn't follow the documented
              guidelines.

        """
        toextract = toextract or self.toextract
        if toextract is None:
            raise ValueError('No sections were given')
        data = self.html.extract(toextract)
        if update:
            self._data.update(data)
        return data

    def _build_request(self, url, method='GET'):
        "Build a request to ``url`` using HTTP ``method``."
        return _AsyncRequest(method, url)

    def _getdata(self):
        """Method meant to be overridden by third parties; it should return
        extra data, as a dictionary, to update the extracted data with.

        This method is called only once, the first time the ``data``
        attribute is accessed.
        """
        return NotImplemented

    @property
    def data(self):
        """Get extracted data.

        **WARNING**: This property recomputes the data to return each time
        it is accessed, so be careful about side effects: if you override
        this property and the new method defines a value that changes on
        each call, e.g. ``datetime.now()``, you will get inconsistencies in
        your data.  If inconsistency is a problem, override the
        ``_getdata`` method instead to define any extra data; it is
        computed only the first time this property is accessed.
        """
        if self.__firstime:
            extra = self._getdata()
            if extra is not NotImplemented:
                self._data.update(extra)
            self.__firstime = False
        return self._data

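# Illustrative sketch (not part of the library): ``_getdata`` is the hook
# for extra per-page data that should be computed only once, e.g. a crawl
# timestamp; the ``data`` property merges it with the extracted data on
# first access.  The class below is hypothetical.
class _ExampleTimestampedPage(WebPage):
    "Hypothetical page that records when its data was first read."
    toextract = {'title': '//title/text()'}

    def _getdata(self):
        return {'crawled_at': datetime.now()}
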
class WebSite(object):
    """An abstract super class that represents a website.

    Classes inheriting from this class must set the ``url`` class variable,
    else instantiation will raise an exception.

    Examples ::

        >>> class PythonQuestions(WebSite):
        ...     url = "http://stackoverflow.com/question/tagged/python"
        ...     Pagination = Pagination(
        ...         'http://stackoverflow.com/questions/tagged/python',
        ...         data={'page': '{page}'},
        ...         end=4
        ...     )
        ...
        >>> [page.url for page in PythonQuestions().pages]   # doctest: +NORMALIZE_WHITESPACE
        ['<GET http://stackoverflow.com/questions/tagged/python?page=1>',
         '<GET http://stackoverflow.com/questions/tagged/python?page=2>',
         '<GET http://stackoverflow.com/questions/tagged/python?page=3>',
         '<GET http://stackoverflow.com/questions/tagged/python?page=4>']

    """
    WebPageCls = WebPage   # Class to return for each extracted page.
    Pagination = None      # Set to a Pagination class instance.
    url = None             # Website URL.

    def __new__(cls, *args, **kws):
        if cls.url is None:
            raise Exception("'url' wasn't set for the website class.")
        return super(WebSite, cls).__new__(cls)

    @property
    def pages(self):
        """Get pages from the website.

        If the ``Pagination`` class variable was set, return the pages
        yielded by the pagination, else return a list containing only this
        ``WebSite`` wrapped in a ``WebPage``.
        """
        WebPageCls = partial(self.WebPageCls, parent=self)
        if self.Pagination is not None:
            return it.imap(WebPageCls, self.Pagination)
        return [WebPageCls(self.url)]

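# Illustrative sketch (not part of the library): without a ``Pagination``
# instance, ``pages`` yields a single page wrapping ``url`` with the
# configured ``WebPageCls``.  The classes below are hypothetical.
class _ExampleJobsSite(WebSite):
    "Hypothetical single-page website definition."
    url = 'http://www.python.org/community/jobs/'

    class WebPageCls(WebPage):
        "Page class used to wrap the site's single page."
        toextract = {'title': '//title/text()'}

# >>> len(list(_ExampleJobsSite().pages))
# 1
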
class _Runner(object):
    """Class that manages running all requests concurrently and extracting
    data from each website page.
    """
    def _init(self):
        # Configure logging.
        logging.config.dictConfig(self.config['logging'])
        # Hold greenlets that fetch pages, limiting concurrent requests.
        self._requests = Pool(self.config['requests']['pool_maxsize'])
        # Hold greenlets that extract data.
        self._extractors = Group()
        # Hold greenlets that write to pipelines.
        self._pipes = Group()
        # Hold web pages to process.
        self._toprocess = Queue(-1)
        # Hold pipeline functions to call after extraction.
        self._pipelines = []
        # Predicate that decides when to stop crawling.
        self._takewhile = lambda *args: True
        # Predicate that defines which URLs to skip.
        self._filter = lambda *args: False
        # Request session.
        self._session = Session(config=self.config['requests'])
        # Report data.
        self._report = {
            'CRAWLED URLS': 0,
            'EXTRACTED DATA': 0,
            'EXCEPTIONS COUNTER': Counter(),
            'START TIME': None,
            'FINISH TIME': None,
            'SHUTDOWN REASON': 'FINISH CRAWLING'
        }
        # Function to call on any greenlet exception.
        self._exception_func = None
        # Function to call when crawling finishes.
        self._finish_func = None

    @property
    def config(self):
        """Read configuration from the command line.

        Command line argument:
            --config: Configuration file in JSON format whose values update
                the default configuration taken from the module-level
                ``_CONFIG`` dictionary.
        """
        # TODO: sys.argv[1:] is hardcoded.
        try:
            return self._config
        except AttributeError:
            self._config = _CONFIG.copy()
            opts, _ = getopt.getopt(sys.argv[1:], '', ['config='])
            for opt, value in opts:
                if opt == '--config':
                    with open(value) as fp:
                        self._config.update(json.load(fp))
                else:
                    sys.exit('Unknown option: %s' % opt)
            return self._config

    def set_website(self, website):
        """Set the website to crawl; the ``website`` argument can be an
        instance of, or a class that inherits from, the ``WebSite`` class.

        Return:
            ``self`` to allow the "Fluent Interface" creation pattern.
        """
        if inspect.isclass(website):
            website = website()
        self._website = website
        return self

    def add_pipeline(self, pipeline):
        """Add a pipeline, a callable that accepts a ``WebPage`` instance,
        to which each page is passed after all its data has been extracted.

        Return:
            ``self`` to allow the "Fluent Interface" creation pattern.
        """
        self._pipelines.append(pipeline)
        return self

    def takewhile(self, predicate):
        """Add a predicate to take URLs only while it returns True.

        Argument:
            predicate: A function that accepts a page as argument and
                returns a boolean; when the predicate returns False, all
                URLs after this one in the website will not be fetched.

        Return:
            ``self`` to allow the "Fluent Interface" creation pattern.

        WARNING: The page passed to ``predicate`` has not been fetched yet,
        so no data has been extracted from it at that point.
        """
        self._takewhile = predicate
        return self

    def filter(self, predicate):
        """Add a predicate to filter out pages (URLs): pages for which the
        predicate returns True are skipped.

        The difference between this method and ``takewhile`` is that
        ``filter`` only skips individual URLs, while ``takewhile`` stops at
        a given URL as soon as the predicate returns False, so that no URL
        coming after it is crawled.

        Return:
            ``self`` to allow the "Fluent Interface" creation pattern.
        """
        self._filter = predicate
        return self

    def on_finish(self, func):
        """Add a function to be executed when the crawler finishes crawling
        and all the greenlets have been joined.

        Argument:
            func: A function that accepts no arguments.

        Return:
            ``self`` to allow the "Fluent Interface" creation pattern.
        """
        self._finish_func = func
        return self

    def on_exception(self, func):
        """Add a function to be executed when the crawler hits an exception.

        Argument:
            func: A function that accepts two arguments, ``runner`` and
                ``greenlet``, which hold respectively this runner instance
                and the greenlet that raised the exception.

        Return:
            ``self`` to allow the "Fluent Interface" creation pattern.
        """
        self._exception_func = func
        return self

    def build_request(self, method, url, **kws):
        """Build a request from the configured ``requests.Session``.

        Arguments:
            The same as the ``requests.Session.request`` method.

        Return:
            An **unsent** request, unless ``return_response`` was
            explicitly set to True.
        """
        kws['timeout'] = self.config['timeout']   # Request timeout.
        kws.setdefault('return_response', False)  # Don't send the request.
        return self._session.request(method, url, **kws)

    def fetch(self, request):
        "Fetch a request in a greenlet of a pool with limited size."
        # Use an inner function so the logging message only shows when the
        # pool actually starts fetching the request, because the pool has a
        # limited size.
        def _send(*args, **kws):
            self.log('Fetching %s' % request.pretty_url)
            request.send(*args, **kws)

        g = self._requests.spawn(_send, prefetch=False)
        # Increment the report key when the URL was successfully fetched.
        g.link_value(self._on_fetched)
        g.link_exception(self._on_exception)

    def log(self, msg, level=logging.INFO):
        "Log a message under ``level``, defaulting to INFO."
        logging.log(level, msg)

    def start(self):
        "Start/launch crawling."
        self.log('Start Crawling ...')
        self._init()
        self._report['START TIME'] = datetime.now()
        error = None
        try:
            for page in self._website.pages:
                self._toprocess.put(page)
            gevent.spawn(self._loop).join()
        except:
            error = sys.exc_info()[1]
            self._report['EXCEPTIONS COUNTER'][error] += 1
            raise
        finally:
            self._finish(reason=error)

    def _loop(self):
        """Main loop that fetches every page and extracts data from it,
        each action in its own greenlet, until all pages are treated.
        """
        while 1:
            try:
                page = self._toprocess.get_nowait()
                if page is StopIteration:
                    break
            except Empty:
                # Wait until all requests and all extractors are done.
                if len(self._requests) == len(self._extractors) == 0:
                    break
                gevent.sleep()
            else:
                g = self._extractors.spawn(self._extract, page)
                g.link_exception(self._on_exception)
                gevent.sleep()

    def _on_exception(self, greenlet):
        """Callback called when an exception is raised in a greenlet
        spawned by this class, which includes:

        * greenlets responsible for fetching URLs.
        * greenlets responsible for extracting data from HTML.
        * greenlets responsible for writing into pipelines.

        This method reports the exception.
        """
        if self._exception_func:
            self._exception_func(self, greenlet)
        self._report['EXCEPTIONS COUNTER'][
            greenlet.exception.__class__.__name__] += 1

    def _on_fetched(self, greenlet):
        """Callback called when a URL was fetched.

        This method reports the fetched URL.
        """
        self.report['CRAWLED URLS'] += 1

    def _extract(self, page):
        """Extract data from a crawled web page.

        If the ``page`` contains links to follow, they are added to the
        queue of web pages to crawl.
        """
        # Extract data from this page.
        if page.toextract:
            try:
                page.extract()
            except ExtractionError:
                logging.error("Extraction from %s failed:", page.url)
                raise
            else:
                self.log(
                    'Scraped Data from %s:\n %s' % (page.url, page.data),
                    logging.DEBUG
                )
                self._report['EXTRACTED DATA'] += 1
            # Pass the page to pipelines after extraction.
            for pipeline in self._pipelines:
                g = self._pipes.spawn(pipeline, page)
                g.link_exception(self._on_exception)
        # Add new pages to crawl from links to follow.
        if page.tofollow:
            empty = True
            for new_page in page.follow_links():
                empty = False
                self.log('Follow: %s' % new_page.url, logging.DEBUG)
                self._add_to_queue(new_page)
            if empty:
                self.log('No links to follow in %s' % page.url,
                         logging.WARNING)

    def _add_to_queue(self, page):
        """Add a page to the queue of pages to fetch."""
        if not self._takewhile(page):
            self.log(
                "Take-while predicate returned False for %s" % page.url,
                logging.DEBUG
            )
            self._toprocess.put(StopIteration)
        elif self._filter(page):
            self.log("Filter url %s" % page.url, logging.DEBUG)
        else:
            self._toprocess.put(page)

    def _finish(self, reason=None):
        """Clean up, then set and show the report."""
        self.log('Shutting down ...')
        try:
            # Join all the greenlets.
            self._extractors.join()
            self._requests.join()
            self._pipes.join()
            # Execute the finish function after all greenlets have finished.
            if self._finish_func:
                self._finish_func()
        finally:
            # Add report information.
            self._report['FINISH TIME'] = datetime.now()
            self._report['TOTAL TIME'] = str(
                self._report['FINISH TIME'] - self._report['START TIME']
            )
            if reason:
                self._report['SHUTDOWN REASON'] = reason
            pprint.pprint(self._report)

    @property
    def report(self):
        """Get the execution report.

        The report contains the following fields:

        - CRAWLED URLS: Count of crawled URLs.
        - EXTRACTED DATA: Count of extracted data passed to pipelines.
        - EXCEPTIONS COUNTER: Count of exceptions raised.
        - START TIME: Date and time when the crawler started.
        - FINISH TIME: Date and time when the crawler finished.
        - TOTAL TIME: The total time spent crawling.
        - SHUTDOWN REASON: Why the crawler finished, i.e. the exception
          that made the crawler stop if there was one, else
          'FINISH CRAWLING', which means the crawler finished normally.
        """
        return self._report


# Singleton default runner.
runner = _Runner()
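
# Illustrative sketch (not part of the library): a typical script wires a
# ``WebSite`` subclass, pipelines and predicates through the fluent
# interface of the ``runner`` singleton.  ``_ExampleJobsSite`` and the
# helpers below are hypothetical.
def _print_pipeline(page):
    "Hypothetical pipeline that prints each page's extracted data."
    pprint.pprint(page.data)


def _example_main():
    "Hypothetical entry point showing the fluent interface."
    (runner
        .set_website(_ExampleJobsSite)              # class or instance
        .add_pipeline(_print_pipeline)              # called for each page
        .filter(lambda page: 'login' in page.url)   # skip matching URLs
        .takewhile(lambda page: True)               # never stop early
        .on_finish(lambda: runner.log('All done.'))
        .start())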