# Source code for resyndicator.fetchers.content
import requests
from copy import copy
from datetime import datetime
from urllib.parse import urlparse
from lxml.etree import LxmlError
from sqlalchemy.sql import or_
from readability import Document
from readability.readability import Unparseable
from ..utils import decode_response
from ..utils.logger import logger
from ..models import Entry, DefaultSession
from .. import settings
class ContentFetcher(object):
    """
    Fetcher class for retrieval and extraction of content from websites.

    This fetcher incrementally downloads and extracts (using Readability)
    the content from any pages associated with entries that don't already
    have long-form content associated with them.
    """

    # Entry model class; subclasses may override to fetch a different model.
    Entry = Entry

    def __init__(self, session=DefaultSession, past=None, **kwargs):
        """
        :param session: session factory; called once to create the DB session.
        :param past: optional timedelta; when set, only entries updated more
            recently than ``datetime.utcnow() - past`` are considered.
        :param kwargs: extra keyword arguments forwarded to ``requests.get``
            (a default user-agent header and timeout are filled in).
        """
        self.past = past
        self.session = session()
        self.stub_entries = []  # entries still lacking content
        self.entries = []       # entries enriched during the current cycle
        self.kwargs = kwargs
        self.kwargs.setdefault('headers', {})
        self.kwargs['headers'].setdefault('user-agent', settings.USER_AGENT)
        self.kwargs.setdefault('timeout', settings.TIMEOUT)

    def update(self):
        """Replenish the in-memory list of entries to process."""
        # .is_()/.isnot() are SQLAlchemy's canonical NULL tests; they render
        # the same IS NULL / IS NOT NULL SQL as the ``== None`` idiom.
        query = self.session.query(self.Entry) \
            .filter(or_(self.Entry.content.is_(None), self.Entry.content == ''),
                    self.Entry.link.isnot(None))
        if self.past:
            query = query.filter(
                self.Entry.updated > datetime.utcnow() - self.past)
        # Sorted newest first so that if content vanishes we at least get to
        # fetch some of it. Minimal risk rather than all or nothing.
        self.stub_entries = query.order_by(self.Entry.updated.desc()).all()

    @staticmethod
    def get_hostname(entry):
        """
        Wrapper for extracting the hostname from entry links.

        (Another ContentFetcher might need to remove the ``www.``.)
        """
        # For subclassing for different entry types
        return urlparse(entry.link).hostname

    def select(self):
        """
        Yield entries, at most one per host, so as to maximize the time
        between consecutive requests to the same host.

        Selected entries are removed from ``self.stub_entries``; entries
        sharing a host with an already-selected one remain queued for a
        later cycle.
        """
        known_hosts = set()
        # Iterate over a copy because we mutate self.stub_entries below.
        for entry in copy(self.stub_entries):
            hostname = self.get_hostname(entry)
            if hostname not in known_hosts:
                known_hosts.add(hostname)
                self.stub_entries.remove(entry)
                yield entry

    def _fetch(self, url):
        """Retrieve the unprocessed web page.

        :raises requests.RequestException: on network/HTTP errors.
        """
        response = requests.get(url, **self.kwargs)
        response.raise_for_status()
        return response

    def _decode(self, response):
        """Decode the response body to text."""
        return decode_response(response)

    def _extract(self, html, url):
        """Use Readability to extract the main content of the page."""
        return Document(html, url=url)

    def _enrich(self, entry, document):
        """Extend the entry with content from the extraction."""
        entry.content = document.summary()
        entry.content_type = 'html'
        # Fall back to the extracted title when the entry has none.
        if not entry.title or not entry.title.strip():
            entry.title = document.short_title()
        return entry

    def fetch(self):
        """
        Run one full fetching cycle. This is the main entry point for the
        content fetching process.
        """
        if not self.stub_entries:
            self.update()
        if not self.stub_entries:
            return
        logger.info('%s entries queued', len(self.stub_entries))
        for entry in self.select():
            logger.info('Fetching %s', entry.link)
            try:
                response = self._fetch(entry.link)
            except (IOError, requests.RequestException) as excp:
                # Record the failure on the entry so it is not retried
                # forever; it no longer matches the empty-content filter.
                logger.error('Request exception %r', excp)
                entry.content = 'Request exception {!r}'.format(excp)
                entry.content_type = 'text'
                continue
            try:
                html = self._decode(response)
                document = self._extract(html, url=response.url)
                entry = self._enrich(entry, document)  # Exceptions were typically raised here
            except (LxmlError, Unparseable) as excp:
                logger.error('Parser exception %r', excp)
                entry.content = 'Parser exception {!r}'.format(excp)
                entry.content_type = 'text'
                continue
            self.entries.append(entry)

    def persist(self):
        """Commit the updates to the entries."""
        # The modified entries are already attached to the session, so a
        # plain commit persists them; no need to access self.entries again.
        self.session.commit()
        self.entries = []