Thomas N. Thomas N. - 8 days ago 6
Python Question

RxPy - Turn Live Twitter Stream into Rx Observable?

I followed this great tutorial on consuming a live Twitter stream in Python using tweepy. The code below prints, in real time, tweets that mention RxJava, RxPy, RxScala, or ReactiveX.

from tweepy.streaming import StreamListener
from tweepy import OAuthHandler
from tweepy import Stream
from rx import Observable, Observer

# Credentials used to access the Twitter API (the values shown are placeholders).
consumer_key = "CONFIDENTIAL"
consumer_secret = "CONFIDENTIAL"
access_token = "CONFIDENTIAL"
access_token_secret = "CONFIDENTIAL"


#This is a basic listener that just prints received tweets to stdout.
class TweetObserver(StreamListener):
    """Stream listener that prints everything it receives to stdout."""

    def on_data(self, data):
        """Print the received payload and return True."""
        print(data)
        return True

    def on_error(self, status):
        """Print the error status reported by the stream."""
        print(status)



if __name__ == '__main__':

    # This handles Twitter authentication and the connection to the Twitter Streaming API
    listener = TweetObserver()
    auth = OAuthHandler(consumer_key, consumer_secret)
    auth.set_access_token(access_token, access_token_secret)
    stream = Stream(auth, listener)

    # Filter the Twitter stream to capture data by the keywords:
    # 'rxjava', 'rxpy', 'reactivex', 'rxscala'
    stream.filter(track=['rxjava', 'rxpy', 'reactivex', 'rxscala'])


This is the perfect candidate to turn into a ReactiveX Observable via RxPy. But how exactly do I turn this into a hot source `Observable`? I cannot seem to find documentation anywhere on how to perform an `Observable.create()`...

Answer

I figured this out some time ago. You have to define a function that takes an Observer as its argument and pushes events to it (by calling its on_next and on_error methods). Then you pass that function to Observable.create().

from tweepy.streaming import StreamListener
from tweepy import OAuthHandler
from tweepy import Stream
import json
from rx import Observable

# Credentials used to access the Twitter API; replace each placeholder
# with your own value.
consumer_key = "PUT YOURS HERE"
consumer_secret = "PUT YOURS HERE"
access_token = "PUT YOURS HERE"
access_token_secret = "PUT YOURS HERE"


def tweets_for(topics):
    """Return a shared Observable of tweets matching *topics*.

    Args:
        topics: Keywords handed to the Twitter stream filter as ``track``.

    Returns:
        The result of ``Observable.create(...).share()``. Each emitted item
        is the payload the stream listener receives in ``on_data``; an error
        status reported by the stream is delivered through ``on_error``.
    """
    def observe_tweets(observer):
        """Open the Twitter stream and forward its events to *observer*."""

        class TweetListener(StreamListener):
            """Bridge tweepy listener callbacks to the Rx observer."""

            def on_data(self, data):
                observer.on_next(data)
                return True

            def on_error(self, status):
                observer.on_error(status)
                # on_error terminates the Rx sequence, so the stream must
                # stop as well. Returning False tells tweepy to disconnect
                # rather than keep feeding an observer that is finished.
                return False

        # This handles Twitter authentication and the connection to the Twitter Streaming API
        listener = TweetListener()
        auth = OAuthHandler(consumer_key, consumer_secret)
        auth.set_access_token(access_token, access_token_secret)
        stream = Stream(auth, listener)
        # NOTE(review): filter() presumably blocks the subscribing thread
        # until the stream disconnects - confirm for the tweepy version used.
        stream.filter(track=topics)

    return Observable.create(observe_tweets).share()


# Keywords to follow on the live stream.
topics = ['Britain', 'France']

# Decode each payload from JSON, then print every tweet and any error.
(
    tweets_for(topics)
    .map(lambda d: json.loads(d))
    .subscribe(on_next=lambda s: print(s), on_error=lambda e: print(e))
)