Linguistics Student Linguistics Student - 9 months ago 69
Python Question

Running Time Estimate for Stream Twitter with Location Filter in Tweepy

PROBLEM SOLVED, SEE SOLUTION AT THE END OF THE POST

I need help to estimate running time for my tweepy program calling Twitter Stream API with location filter.

After I kicked it off, it has run for over 20 minutes, which is longer than what I expected. I am new to Twitter Stream API, and have only worked with REST API for couple of days. It looks to me that REST API will give me 50 tweets in a few seconds, easy. But this Stream request is taking a lot more time. My program hasn't died on me or given any error. So I don't know if there's anything wrong with it. If so, please do point out.

In conclusion, if you think my code is correct, could you provide an estimate for the running time? If you think my code is wrong, could you help me to fix it?

Thank you in advance!

Here's the code:

# Import Tweepy, sys, sleep, credentials.py
import tweepy, sys
from time import sleep
from credentials import *

# Access and authorize our Twitter credentials from credentials.py
auth = tweepy.OAuthHandler(consumer_key, consumer_secret)
auth.set_access_token(access_token, access_token_secret)
api = tweepy.API(auth)

box = [-86.33,41.63,-86.20,41.74]

class CustomStreamListener(tweepy.StreamListener):
def on_error(self, status_code):
print >> sys.stderr, 'Encountered error with status code:', status_code
return True # Don't kill the stream
def on_timeout(self):
print >> sys.stderr, 'Timeout...'
return True # Don't kill the stream

stream = tweepy.streaming.Stream(auth, CustomStreamListener()).filter(locations=box).items(50)
stream


I tried the method from http://docs.tweepy.org/en/v3.4.0/auth_tutorial.html#auth-tutorial Apparently it is not working for me... Here is my code below. Would you mind giving any input? Let me know if you have some working code. Thanks!

# Import Tweepy, sys, sleep, credentials.py
import tweepy, sys
from time import sleep
from credentials import *

# Access and authorize our Twitter credentials from credentials.py
auth = tweepy.OAuthHandler(consumer_key, consumer_secret)
auth.set_access_token(access_token, access_token_secret)
api = tweepy.API(auth)

# Assign coordinates to the variable
box = [-74.0,40.73,-73.0,41.73]

import tweepy
#override tweepy.StreamListener to add logic to on_status
class MyStreamListener(tweepy.StreamListener):

def on_status(self, status):
print(status.text)
def on_error(self, status_code):
if status_code == 420:
#returning False in on_data disconnects the stream
return False

myStreamListener = MyStreamListener()
myStream = tweepy.Stream(auth = api.auth, listener=myStreamListener())
myStream.filter(track=['python'], locations=(box), async=True)


Here is the error message:

Traceback (most recent call last):
File "test.py", line 26, in <module>
myStream = tweepy.Stream(auth = api.auth, listener=myStreamListener())
TypeError: 'MyStreamListener' object is not callable





PROBLEM SOLVED! SEE SOLUTION BELOW

After another round of debug, here is the solution for one who may have interest in the same topic:

# Import Tweepy, sys, sleep, credentials.py
try:
import json
except ImportError:
import simplejson as json
import tweepy, sys
from time import sleep
from credentials import *

# Access and authorize our Twitter credentials from credentials.py
auth = tweepy.OAuthHandler(consumer_key, consumer_secret)
auth.set_access_token(access_token, access_token_secret)
api = tweepy.API(auth)

# Assign coordinates to the variable
box = [-74.0,40.73,-73.0,41.73]

import tweepy
#override tweepy.StreamListener to add logic to on_status
class MyStreamListener(tweepy.StreamListener):

def on_status(self, status):
print(status.text.encode('utf-8'))
def on_error(self, status_code):
if status_code == 420:
#returning False in on_data disconnects the stream
return False

myStreamListener = MyStreamListener()
myStream = tweepy.Stream(api.auth, listener=myStreamListener)
myStream.filter(track=['NYC'], locations=(box), async=True)

Answer Source

Core Problem: I think you're misunderstanding what the Stream is here.

Tl;dr: Your code is working, you're just not doing anything with the data that gets back.

The rest API call is a single call for information. You make a request, Twitter sends back some information, which gets assigned to your variable.

The StreamObject (which you've created as stream) from Tweepy opens a connection to twitter with your search parameters, and Twitter, well, streams Tweets to it. Forever.

From the Tweepy docs:

The streaming api is quite different from the REST api because the REST api is used to pull data from twitter but the streaming api pushes messages to a persistent session. This allows the streaming api to download more data in real time than could be done using the REST API.

So, you need to build a handler (streamListener, in tweepy's terminology), like this one that prints out the tweets..

Additional

Word of warning, from bitter experience - if you're going to try and save the tweets to a database: Twitter can, and will, stream objects to you much faster than you can save them to the database. This will result in your Stream being disconnected, because the tweets back up at Twitter, and over a certain level of backed-up-ness (not an actual phrase), they'll just disconnect you.

I handled this by using django-rq to put save jobs into a jobqueue - this way, I could handle hundreds of tweets a second (at peak), and it would smooth out. You can see how I did this below. Python-rq would also work if you're not using django as a framework round it. The read both method is just a function that reads from the tweet and saves it to a postgres database. In my specific case, I did that via the Django ORM, using the django_rq.enqueue function.

__author__ = 'iamwithnail'

from django.core.management.base import BaseCommand, CommandError
from django.db.utils import DataError
from harvester.tools import read_both
import django_rq

class Command(BaseCommand):

    args = '<search_string search_string>'
    help = "Opens a listener to the Twitter stream, and tracks the given string or list" \
           "of strings, saving them down to the DB as they are received."


    def handle(self, *args, **options):
        try:
            import urllib3.contrib.pyopenssl
            urllib3.contrib.pyopenssl.inject_into_urllib3()
        except ImportError:
            pass

        consumer_key = '***'
        consumer_secret = '****'
        access_token='****'
        access_token_secret_var='****'
        import tweepy
        import json

        # This is the listener, responsible for receiving data
        class StdOutListener(tweepy.StreamListener):
            def on_data(self, data):
                decoded = json.loads(data)
                try:
                    if decoded['lang'] == 'en':
                        django_rq.enqueue(read_both, decoded)
                    else:
                        pass
                except KeyError,e:
                    print "Error on Key", e
                except DataError, e:
                    print "DataError", e
                return True


            def on_error(self, status):
                print status


        l = StdOutListener()
        auth = tweepy.OAuthHandler(consumer_key, consumer_secret)
        auth.set_access_token(access_token, access_token_secret_var)
        stream = tweepy.Stream(auth, l)
stream.filter(track=args)

Edit: Your subsequent problem is caused by calling the listener wrongly.

myStreamListener = MyStreamListener() #creates an instance of your class

Where you have this:

myStream = tweepy.Stream(auth = api.auth, listener=myStreamListener())

You're trying to call the listener as a function when you use the (). So it should be:

myStream = tweepy.Stream(auth = api.auth, listener=myStreamListener)

And in fact, can probably just be more succinctly written as:

myStream = tweepy.Stream(api.auth,myStreamListener)