# Source code for resyndicator.fetchers.base
import socket
import time
import requests
from random import randrange
from datetime import datetime
from dateutil.tz import tzutc
from utilofies.stdlib import canonicalized
from .. import settings
from ..utils.logger import logger
from ..models import Entry, DefaultSession
# Process-wide default timeout (in seconds) for newly created sockets that do
# not set their own, following Twitter's advice on detecting stalled streams:
# https://dev.twitter.com/docs/streaming-apis/connecting#Stalls
socket.setdefaulttimeout(90.0)
class UnchangedException(Exception):
    """
    Signals that a feed or sitemap is unchanged since it was last retrieved.

    ``BaseFetcher.retrieve`` raises it when the server answers HTTP 304.
    """
class BaseEntryInterface(object):
    """
    Base class for entries.

    Subclass this to provide a unified interface to any type of entry you
    want to import. The supplier-facing properties (``id``, ``title``,
    ``summary``, ...) return ``None`` here and are meant to be overridden;
    the ``entry`` property turns their values into an SQLAlchemy row.
    """

    # Model class used to look up and create rows; subclasses may override it.
    Entry = Entry

    def __init__(self, fetcher, raw_entry):
        """
        :param fetcher: the fetcher that produced *raw_entry*. Its ``session``
            is used for lookups, and its ``id``, ``title``, ``link``, ``url``
            and ``author`` serve as source metadata and fallbacks.
        :param raw_entry: the supplier-specific entry object, kept as
            ``self.raw_entry`` for subclasses to read.
        """
        self.fetcher = fetcher
        self.raw_entry = raw_entry

    @property
    def id(self):
        """
        A globally unique ID for internal deduplication
        and identification by feed readers
        """
        return None

    @property
    def summary(self):
        """Summary or description of the entry"""
        return None

    @property
    def summary_type(self):
        """Either `text` or `html` (important for the Atom output)"""
        return None

    @property
    def content(self):
        """Full content of the entry if given"""
        return None

    @property
    def content_type(self):
        """Either `text` or `html` (important for the Atom output)"""
        return None

    @property
    def source(self):
        """
        Optional field to insert any entry source code into the
        content field. This can be set through `settings.INCLUDE_SOURCE`.
        """
        return None

    @property
    def title(self):
        """Entry title"""
        return None

    @property
    def fetched(self):
        """Fetch time (set to `datetime.datetime.utcnow()` by default)"""
        return datetime.utcnow()

    @property
    def updated(self):
        """Time the entry was updated on the supplier side"""
        return None

    @property
    def published(self):
        """Time the entry was published"""
        return None

    @property
    def link(self):
        """Entry link"""
        return None

    @property
    def author(self):
        """Entry author"""
        return None

    @staticmethod
    def _append_source(text, source):
        """
        Return *source* appended to *text* after a blank line.

        If *text* is empty or ``None``, return *source* alone, so that a
        missing field is not rendered as the literal text ``None``.
        """
        if not text:
            return source
        return '{}\n\n{}'.format(text, source)

    def _new_entry(self):
        """
        Create new entry

        The methodology is such that an SQLAlchemy entry is created for each
        entry from the start. If it is a new entry, a new SQLAlchemy entry is
        created for it in memory and added to the fetcher's session, but not
        yet committed. If it is an existing one, the mapped SQLAlchemy entry is
        loaded. A `created` attribute indicates which one is the case. This
        allows us to handle creations and updates almost the same without
        having to have two complete separate control flows for the cases.
        """
        entry = self.fetcher.session.query(self.Entry).filter_by(
            id=self.id).one_or_none()
        if entry is None:
            entry = self.Entry()
            self.fetcher.session.add(entry)
            entry.created = True  # Marker for later updating
        return entry

    @property
    def entry(self):
        """
        Return the SQLAlchemy entry.

        If an entry with this ``id`` already exists, it is returned unchanged.
        Otherwise a new one is added to the fetcher's session (not committed).
        It is initialized with the values from the supplier, falling back to
        the fetcher's metadata where the supplier gives none.

        When ``settings.INCLUDE_SOURCE`` is set and ``source`` is non-empty,
        the source is appended to the content. If there is no content, it is
        appended to the summary instead. The type of the field that receives
        the source becomes ``html``.
        """
        summary, summary_type = self.summary, self.summary_type
        content, content_type = self.content, self.content_type
        if settings.INCLUDE_SOURCE:
            source = self.source
            # Skip empty sources; formatting None would insert the text "None".
            if source:
                if content:
                    content = self._append_source(content, source)
                    content_type = 'html'
                else:
                    # It is recommended that summary be nonempty
                    # if there is no content. This is also useful
                    # for the separate content fetching.
                    summary = self._append_source(summary, source)
                    summary_type = 'html'
        entry = self._new_entry()
        # NOTE(review): rows loaded from the database must report a falsy
        # ``created`` (presumably a model-level default) — confirm in ..models.
        if entry.created:
            entry.id = self.id
            entry.title = self.title or self.fetcher.title
            entry.fetched = self.fetched
            entry.updated = self.updated or datetime.utcnow()
            entry.published = self.published
            entry.link = self.link or self.fetcher.link or self.fetcher.url
            entry.summary = summary
            entry.summary_type = summary_type
            entry.content = content
            entry.content_type = content_type
            entry.author = self.author or self.fetcher.author
            entry.source_id = self.fetcher.id
            entry.source_title = self.fetcher.title
            entry.source_link = self.fetcher.url
        return entry
class BaseFetcher(object):
    """
    Base class for fetchers.

    Subclass this to implement your own custom types of fetchers. ``retrieve``
    downloads the data source, ``parse`` must be implemented by subclasses,
    and ``persist`` stores the entries built from ``self.raw_entries``.
    ``self.raw_entries`` is not set here; presumably subclasses fill it in
    (e.g. while parsing) — confirm in the concrete fetchers.
    """

    # Maps MIME types to text/html/xhtml type names, presumably for the Atom
    # ``type`` attribute.
    type_mapping = {
        'text/plain': 'text',
        'text/html': 'html',
        'application/xhtml+xml': 'xhtml'}

    # Wrapper class used by ``entries`` to turn raw entries into model rows.
    EntryInterface = BaseEntryInterface

    def __init__(self, url, interval, session=DefaultSession, default_tz=tzutc,
                 defaults=None, **kwargs):
        """
        :param url: URL of the data source.
        :param interval: nominal number of seconds between checks. Up to 10%
            is subtracted at random to spread updates out.
        :param session: session factory; called once to create
            ``self.session``.
        :param default_tz: stored as ``self.default_tz`` for subclasses.
        :param defaults: mapping of attribute values applied by ``entries`` to
            entries whose attribute is missing or falsy.
        :param kwargs: extra keyword arguments for ``requests.get``.
            ``headers`` is copied, not shared. ``user-agent`` and ``timeout``
            default to ``settings.USER_AGENT`` and ``settings.TIMEOUT``.
        """
        self.defaults = defaults or {}
        self.url = url
        # Fuzziness to spread updates out more evenly; int() keeps randrange
        # working when a float interval is given.
        self.interval = interval - randrange(int(interval) // 10 + 1)
        # NOTE(review): last_check starts one interval in the future, so
        # ``needs_update`` first becomes true after about two intervals —
        # confirm this delay is intended.
        self.last_check = time.time() + self.interval
        self.default_tz = default_tz
        self.kwargs = kwargs
        # Work on a private copy: ``retrieve`` writes per-source conditional
        # headers into this dict, which must not leak into a dict the caller
        # (or another fetcher) still holds.
        headers = kwargs.get('headers')
        self.kwargs['headers'] = headers.copy() if headers else {}
        self.kwargs['headers'].setdefault('user-agent', settings.USER_AGENT)
        self.kwargs.setdefault('timeout', settings.TIMEOUT)
        self.response_headers = {}
        self.session = session()

    def __str__(self):
        return self.url

    @property
    def id(self):
        """A unique ID for the data source, e.g., the feed"""
        return None

    @property
    def title(self):
        """Title of the data source"""
        return None

    @property
    def subtitle(self):
        """Subtitle of the data source"""
        return None

    @property
    def link(self):
        """Link to the data source"""
        return None

    @property
    def hub(self):
        """Optionally the endpoint of a hub such as PubSubHubbub"""
        return None

    @property
    def author(self):
        """The author of the data source"""
        return None

    @property
    def generator(self):
        """The generator of the data source"""
        return None

    def __hash__(self):
        # Hash by URL; equality stays identity-based (no __eq__ defined).
        return hash(self.url)

    def retrieve(self):
        """
        Retrieve the data source.

        This is by default a wrapper around the requests library. It sends
        ``If-Modified-Since``/``If-None-Match`` built from the previous
        response's ``Last-Modified``/``ETag`` headers, so that servers can
        indicate that the content hasn't changed since the last retrieval.
        It also specifies a custom user agent and timeout by default.

        :returns: the ``requests`` response.
        :raises requests.HTTPError: for error statuses (``raise_for_status``).
        :raises UnchangedException: if the server answers HTTP 304.
        """
        # NOTE(review): assumes ``canonicalized`` drops None values so that
        # absent validators are not sent — confirm in utilofies.
        self.kwargs['headers'].update(canonicalized({
            'if-modified-since': self.response_headers.get('last-modified'),
            'if-none-match': self.response_headers.get('etag')}))
        response = requests.get(self.url, **self.kwargs)
        response.raise_for_status()
        if response.url != self.url:
            logger.info('Redirects to %s', response.url)
        self.response_headers = response.headers
        if response.status_code == 304:
            raise UnchangedException
        return response

    def parse(self, response):
        """
        Implement this function to convert your data source to something the
        `update` method can work with.
        """
        raise NotImplementedError()

    @property
    def needs_update(self):
        """Return whether the source is ripe for an update."""
        return self.next_check < time.time()

    @property
    def next_check(self):
        """The time of the next update (``last_check`` plus ``interval``)."""
        return self.last_check + self.interval

    def touch(self):
        """Set the time of the last check of the source to the current time."""
        self.last_check = time.time()

    def clean(self):
        """Reset the fetcher by dropping ``source`` and ``raw_entries``."""
        self.source = None
        self.raw_entries = None

    def is_valid(self, entry):
        """Test the validity of an entry. Only returns `True` by default."""
        return True

    @property
    def entries(self):
        """
        Yield the SQLAlchemy entries after setting any default values.

        Only raw entries whose interface passes ``is_valid`` are yielded. Each
        key in ``self.defaults`` is set on the entry when the entry's
        attribute is missing or falsy. Nothing is yielded when
        ``raw_entries`` is ``None`` (e.g. after ``clean``).
        """
        for raw_entry in self.raw_entries or ():
            interface = self.EntryInterface(fetcher=self, raw_entry=raw_entry)
            if not self.is_valid(interface):
                continue
            entry = interface.entry  # From EntryInterface to models.BaseEntry
            for key, value in self.defaults.items():
                if not getattr(entry, key, None):
                    setattr(entry, key, value)
            yield entry

    def persist(self):
        """
        Commit the entries or any updates to them to the database and return
        the entries that have been created.
        """
        entries = list(self.entries)  # Entries are created in the generator
        self.session.commit()
        fresh_entries = [entry for entry in entries if entry.created]
        if fresh_entries:
            logger.info('%s new entries', len(fresh_entries))
        return fresh_entries