# Source code for resyndicator.fetchers.twitter
import html
import json
import time
from datetime import datetime
from operator import attrgetter, itemgetter
from random import randrange

from birdy.twitter import UserClient, StreamClient
from dateutil.parser import parse as parse_date
from dateutil.tz import tzutc
from utilofies.stdlib import itertimeout, sub_slices, cached_property

from .. import settings
from .base import BaseFetcher, BaseEntryInterface
from ..models import Entry, DefaultSession
from ..utils.logger import logger
class TweetInterface(BaseEntryInterface):
    """Mapping for individual tweets.

    Every property reads the raw tweet dictionary from ``self.raw_entry``
    and exposes one field of the resulting entry.
    """

    Entry = Entry

    @property
    def tweet_text(self):
        """Assemble a presentable text representation of the tweet.

        For a retweet, the text of the retweeted status is used and prefixed
        with ``RT @screen_name: ``. URL and media entities are replaced by
        their display URL, falling back to the expanded URL; entities that
        provide neither are left as they appear in the tweet text.
        """
        if 'retweeted_status' in self.raw_entry:
            tweet = self.raw_entry['retweeted_status']
            prefix = 'RT @{screen_name}: '.format(
                screen_name=tweet['user']['screen_name'])
        else:
            tweet = self.raw_entry
            prefix = ''
        entities = tweet['entities']
        replacements = {}
        for link in list(entities['urls']) + list(entities.get('media', [])):
            shown = link.get('display_url') or link.get('expanded_url')
            # Entities without any usable URL are skipped so that the
            # original text at those indices stays untouched.
            if shown:
                replacements[tuple(link['indices'])] = shown
        return prefix + sub_slices(tweet['text'], replacements)

    @property
    def tweet_html(self):
        """Assemble a presentable HTML representation of the tweet.

        URLs, hashtags, user mentions and media are turned into links, and
        image URLs are appended as ``<img>`` elements in a second paragraph.
        User names and URLs are HTML-escaped before being placed into
        attributes or link text.

        NOTE(review): the tweet text itself (and slices of it) is inserted
        without escaping, exactly as before; presumably the API delivers it
        already entity-encoded -- TODO confirm.
        """
        # TODO: Embed replies
        if 'retweeted_status' in self.raw_entry:
            tweet = self.raw_entry['retweeted_status']
            prefix = (
                'RT <a href="https://twitter.com/{screen_name}"'
                ' title="{name}">@{screen_name}</a>: ').format(
                    screen_name=tweet['user']['screen_name'],
                    name=html.escape(tweet['user']['name'] or ''))
        else:
            tweet = self.raw_entry
            prefix = ''
        entities = tweet['entities']
        images = []
        replacements = {}
        for url in entities['urls']:
            expanded_url = url.get('expanded_url')
            if not expanded_url:
                # Nothing to link to: keep the original text instead of
                # emitting a link whose target is the string "None".
                continue
            replacements[tuple(url['indices'])] = (
                '<a href="{expanded_url}">{display_url}</a>'.format(
                    expanded_url=html.escape(expanded_url),
                    display_url=html.escape(
                        url.get('display_url') or expanded_url)))
            if expanded_url.endswith(
                    ('.png', '.jpg', '.jpeg', '.gif', '.svg')):
                images.append(expanded_url)
        for hashtag in entities['hashtags']:
            replacements[tuple(hashtag['indices'])] = (
                ('<a href="https://twitter.com/#!/search/'
                 '?q=%23{hashtag}&src=hash">#{hashtag}</a>').format(
                     hashtag=hashtag['text']))
        for mention in entities['user_mentions']:
            # The mention is matched case-insensitively, so the link text is
            # taken verbatim from the tweet rather than from the entity.
            verbatim = tweet['text'][slice(*mention['indices'])]
            replacements[tuple(mention['indices'])] = (
                ('<a href="https://twitter.com/{screen_name}" title="{name}">'
                 '{verbatim}</a>').format(
                     screen_name=mention['screen_name'],
                     name=html.escape(mention['name'] or ''),
                     verbatim=verbatim))
        for medium in entities.get('media', []):
            expanded_url = medium.get('expanded_url')
            if expanded_url:
                replacements[tuple(medium['indices'])] = (
                    '<a href="{expanded_url}">{display_url}</a>'.format(
                        expanded_url=html.escape(expanded_url),
                        display_url=html.escape(
                            medium.get('display_url') or expanded_url)))
            if medium['type'] == 'photo':
                images.append(medium['media_url'])
        text = prefix + sub_slices(tweet['text'], replacements)
        images = '<br />'.join(
            '<img src="{url}" alt="" />'.format(url=html.escape(url))
            for url in images)
        return '<p>{text}</p><p>{images}</p>'.format(text=text, images=images)

    @property
    def id(self):
        """The tweet ID as string."""
        return str(self.raw_entry['id'])

    @property
    def fetched(self):
        """Time the tweet was fetched, as a naive datetime in UTC."""
        # Same value as the deprecated ``datetime.utcnow()``.
        return datetime.now(tzutc()).replace(tzinfo=None)

    @property
    def updated(self):
        """Time the tweet was created, as a naive datetime in UTC."""
        date = parse_date(self.raw_entry['created_at'])
        return date.astimezone(tzutc()).replace(tzinfo=None)

    @property
    def title(self):
        """Optionally shortened representation of the tweet text.

        If ``settings.TWITTER_TITLE_LENGTH`` is set and the text is longer,
        the text is cut so that, including the trailing ellipsis, it is
        exactly that many characters long.
        """
        tweet_text = self.tweet_text
        max_length = settings.TWITTER_TITLE_LENGTH
        if max_length and len(tweet_text) > max_length:
            return tweet_text[:max_length - 1] + '…'
        return tweet_text

    @property
    def author(self):
        """The tweep, formatted as ``screen_name (name)``."""
        return '{screen_name} ({name})'.format(
            screen_name=self.raw_entry['user']['screen_name'],
            name=self.raw_entry['user']['name'])

    @property
    def link(self):
        """The URL of the tweet."""
        return 'https://twitter.com/{screen_name}/statuses/{id}'.format(
            screen_name=self.raw_entry['user']['screen_name'],
            id=self.raw_entry['id'])

    @property
    def content(self):
        """The HTML representation of the tweet."""
        return self.tweet_html

    @property
    def source_id(self):
        """URN to identify the tweep."""
        return 'urn:twitter:user:{id}'.format(id=self.raw_entry['user']['id'])

    @property
    def source_title(self):
        """Some generated source title based on the author name."""
        return '{author} on Twitter'.format(author=self.author)

    @property
    def source_link(self):
        """Link to the Twitter account of the author."""
        return 'https://twitter.com/{screen_name}'.format(
            screen_name=self.raw_entry['user']['screen_name'])

    @property
    def entry(self):
        """The SQLAlchemy entry representing the tweet.

        Extends the entry built by the base class with the source fields and,
        when ``entry.created`` is truthy, with the publication time.
        """
        entry = super().entry
        # NOTE(review): ``created`` presumably marks an entry that did not
        # exist before -- verify against BaseEntryInterface.
        if entry.created:
            entry.published = self.updated
        entry.source_id = self.source_id
        entry.source_title = self.source_title
        entry.source_link = self.source_link
        return entry
class TwitterFetcher(BaseFetcher):
    """Fetcher that retrieves the home timeline of one Twitter account."""

    EntryInterface = TweetInterface
    count = 200
    url = 'https://twitter.com/'  # For consistency

    def __init__(self, oauth_token, oauth_secret, interval,
                 session=DefaultSession, default_tz=tzutc, defaults=None):
        """Set up the API client, the polling interval and the DB session.

        ``interval`` is reduced by a random amount of up to a tenth of its
        value so that several fetchers do not all update at the same moment.
        """
        self.client = UserClient(
            settings.CONSUMER_KEY, settings.CONSUMER_SECRET,
            oauth_token, oauth_secret)
        self.defaults = defaults if defaults else {}
        # Fuzziness to spread updates out more evenly
        jitter = randrange(interval // 10 + 1)
        self.interval = interval - jitter
        self.last_check = time.time() + self.interval
        self.default_tz = default_tz
        self.session = session()

    @cached_property
    def profile(self):
        """The profile of the Twitter user."""
        credentials = self.client.api.account.verify_credentials
        return credentials.get(skip_status=True).data

    def __str__(self):
        screen_name = self.profile.screen_name
        return '@{} on Twitter'.format(screen_name)

    def update(self):
        """Retrieve the current set of tweets into ``self.raw_entries``."""
        timeline = self.client.api.statuses.home_timeline
        response = timeline.get(
            count=self.count, exclude_replies=False, include_entities=True)
        self.raw_entries = response.data
class TwitterStreamer:
    """
    A Twitter streaming client that doesn’t work at the moment due to an error in
    the Birdy library. Please use the TwitterFetcher in the meantime.

    The streamer listens to the user stream, stores tweets by followed
    accounts, and periodically re-reads the home timeline to pick up tweets
    the stream may have missed.
    """

    # Interface used to turn raw tweets into entries (same as TwitterFetcher).
    EntryInterface = TweetInterface

    def __init__(self, oauth_token, oauth_secret, session=DefaultSession, timeout=0, **kwargs):
        """Create the streaming and REST clients and open a DB session.

        Both clients are authenticated with the given ``oauth_token`` and
        ``oauth_secret`` together with the consumer credentials from the
        settings.
        """
        self.oauth_token = oauth_token
        self.oauth_secret = oauth_secret
        self.timeout = timeout
        self.kwargs = kwargs
        # Starts empty (not None) so that tweets or follow events arriving
        # before the friends list do not crash ``run``.
        self.friends = []
        self.client = StreamClient(
            settings.CONSUMER_KEY,
            settings.CONSUMER_SECRET,
            oauth_token,
            oauth_secret)
        # REST client for the home timeline catch-up fetches.
        self.rest = UserClient(
            settings.CONSUMER_KEY,
            settings.CONSUMER_SECRET,
            oauth_token,
            oauth_secret)
        self.session = session()

    def store(self, entries, new=False):
        """Merge ``entries`` into the session and commit.

        Unless ``new`` is true, entries whose ID is already in the database
        are dropped first. Returns the entries that were actually stored.
        The session is rolled back and the error re-raised on failure.
        """
        if not new:
            entry_ids = [entry.id for entry in entries]
            existing_ids = set()
            if entry_ids:
                model = self.EntryInterface.Entry
                rows = self.session.query(model.id).filter(
                    model.id.in_(entry_ids))
                # A set, not a one-shot iterator: it is tested once per entry.
                existing_ids = {row[0] for row in rows}
            entries = [entry for entry in entries
                       if entry.id not in existing_ids]
        try:
            for entry in entries:
                self.session.merge(entry)
            self.session.commit()
        except BaseException:
            self.session.rollback()
            raise
        return entries

    def retrieve_home_timeline(self, count=200):
        """Return up to ``count`` raw tweets from the home timeline."""
        response = self.rest.api.statuses.home_timeline.get(
            count=count, exclude_replies=False, include_entities=True)
        return response.data

    def run(self):
        """Consume the user stream and yield lists of newly stored entries.

        A list is yielded for each streamed object that led to at least one
        stored entry, either from the stream itself or from a catch-up fetch.
        """
        # NOTE(review): the response object is iterated directly, as before;
        # birdy's documented usage appears to iterate ``response.stream()``
        # -- confirm before relying on this method.
        for i, tweet in enumerate(itertimeout(
                self.client.userstream.user.get(**{'replies': 'all', 'with': 'followings'}),
                timeout=self.timeout)):
            fresh_entries = []
            if i % settings.TWITTER_STREAM_GET_INTERVAL == 0:
                # i == 0: Initial fetch in case we missed something
                # i != 0: Additional fetch in case the streaming API
                #         missed something.
                try:
                    get_tweets = self.retrieve_home_timeline()
                except Exception as excp:
                    logger.exception('Error during initial fetch: %r', excp)
                else:
                    entries = [self.EntryInterface(get_tweet).entry
                               for get_tweet in get_tweets]
                    fresh_entries = self.store(entries)
                    logger.info(
                        'Stored %s missed tweets by %s',
                        len(fresh_entries),
                        ', '.join(entry.author for entry in fresh_entries)
                        or 'everyone')
            logger.debug(json.dumps(tweet, indent=4))
            if 'id' in tweet:
                if tweet['user']['id'] in self.friends:
                    entry = self.EntryInterface(tweet).entry
                    self.store([entry], new=True)
                    logger.info('Stored tweet %s by %s',
                                tweet['id'], tweet['user']['screen_name'])
                    fresh_entries.append(entry)
                else:
                    # Unfortunately Twitter also streams replies to tweets by
                    # people we’re following regardless of whether we’re
                    # following the author.
                    logger.info('Skipping foreign tweet by %s',
                                tweet['user']['screen_name'])
            elif 'friends' in tweet:
                # Should be the first item to be streamed
                self.friends = list(tweet['friends'])
                logger.info('Received list of %s friends',
                            len(tweet['friends']))
            elif tweet.get('event') == 'follow':
                self.friends.append(tweet['target']['id'])
                logger.info('Received follow event for %s',
                            tweet['target']['screen_name'])
            else:
                logger.info('Skipping weird object: %s', tweet)
            # Final yield
            if fresh_entries:
                yield fresh_entries