Module media_analyzer.streams.twitter_stream
Expand source code
# twitter streaming module
import os
import tweepy
from twitter_analyzer.models import Tweet
from queue import Queue
import os
"""
Inherit from Tweepy's StreamingClinet to override filter, on_tweet, on_connect, etc.
"""
class TwitterStream(tweepy.StreamingClient):
"""Subclass of Tweepy's StreamingClient. Overrides important method and used as an interface for
getting and processing actual tweets."""
def __init__(self):
"""Initialize the Stream."""
# subscribe filters
self.subscription = {}
self.worker = None
self.timeline: "Queue[Tweet]" = Queue()
# True when stream is connected, false when disconnected.
self.is_connected = False
# True when stream is paused.
self.is_paused = True
# Initialize class with authorization
super().__init__(
bearer_token=os.getenv("BEAR_TOKEN")
)
"""
Get status
0 means stopped
1 means running
-1 means paused
"""
def get_status(self):
"""Get the current status of the stream- whether it's running, paused, or connected."""
if self.is_connected:
return 1 if not self.is_paused else -1
else:
return 0
def toggle_module(self):
"""
Connect to stream if not connected. Disconnect the stream otherwise.
"""
self.is_connected = not self.is_connected
# create worker and start the stream
if self.is_connected:
self.is_paused = False
self.worker = self.sample(threaded=True)
# join thread and stop stream
else:
self.is_paused = True
if self.worker is not None:
self.worker.join()
def on_tweet(self, tweet):
"""Put a tweet into the timeline from the stream.
Automatically called when stream gets a tweet."""
# put into queue if stream is not paused
if not self.is_paused:
self.timeline.put(Tweet(tweet))
# end stream
if not self.is_connected:
self.disconnect()
# things to do when connect to stream
def on_connect(self):
"""Run when the stream connects.
Log a message to let us know we're connected."""
print("connected to stream\n")
# things to do when disconnect to stream
def on_disconnect(self):
"""Run when the stream disconnects.
Log a message to let us know we're disonnected."""
print("connected to stream\n")
self.is_connected = False
print("disconnected")
def on_exception(self):
"""Run when the stream throws an exception.
Disconnect the stream and set appropriate flags."""
self.disconnect()
self.is_connected = False
print("Disconnected by Twitter.")
def pause_resume(self):
"""Toggles whether stream is paused or not."""
self.is_paused = not self.is_paused
def result_generator(self):
"""
Gets all tweets from the current timeline and returns them as serialized objects.
"""
# pour tweets into list
raw_tweets = []
while not self.timeline.empty():
raw_tweets.append(self.timeline.get())
self.timeline.task_done()
results = []
for tweet in raw_tweets:
results.append([tweet.get_id(), tweet.get_content()])
return results
stream = TwitterStream()
Classes
class TwitterStream
-
Subclass of Tweepy's StreamingClient. Overrides important method and used as an interface for getting and processing actual tweets.
Initialize the Stream.
Expand source code
class TwitterStream(tweepy.StreamingClient): """Subclass of Tweepy's StreamingClient. Overrides important method and used as an interface for getting and processing actual tweets.""" def __init__(self): """Initialize the Stream.""" # subscribe filters self.subscription = {} self.worker = None self.timeline: "Queue[Tweet]" = Queue() # True when stream is connected, false when disconnected. self.is_connected = False # True when stream is paused. self.is_paused = True # Initialize class with authorization super().__init__( bearer_token=os.getenv("BEAR_TOKEN") ) """ Get status 0 means stopped 1 means running -1 means paused """ def get_status(self): """Get the current status of the stream- whether it's running, paused, or connected.""" if self.is_connected: return 1 if not self.is_paused else -1 else: return 0 def toggle_module(self): """ Connect to stream if not connected. Disconnect the stream otherwise. """ self.is_connected = not self.is_connected # create worker and start the stream if self.is_connected: self.is_paused = False self.worker = self.sample(threaded=True) # join thread and stop stream else: self.is_paused = True if self.worker is not None: self.worker.join() def on_tweet(self, tweet): """Put a tweet into the timeline from the stream. Automatically called when stream gets a tweet.""" # put into queue if stream is not paused if not self.is_paused: self.timeline.put(Tweet(tweet)) # end stream if not self.is_connected: self.disconnect() # things to do when connect to stream def on_connect(self): """Run when the stream connects. Log a message to let us know we're connected.""" print("connected to stream\n") # things to do when disconnect to stream def on_disconnect(self): """Run when the stream disconnects. Log a message to let us know we're disonnected.""" print("connected to stream\n") self.is_connected = False print("disconnected") def on_exception(self): """Run when the stream throws an exception. Disconnect the stream and set appropriate flags.""" self.disconnect() self.is_connected = False print("Disconnected by Twitter.") def pause_resume(self): """Toggles whether stream is paused or not.""" self.is_paused = not self.is_paused def result_generator(self): """ Gets all tweets from the current timeline and returns them as serialized objects. """ # pour tweets into list raw_tweets = [] while not self.timeline.empty(): raw_tweets.append(self.timeline.get()) self.timeline.task_done() results = [] for tweet in raw_tweets: results.append([tweet.get_id(), tweet.get_content()]) return results
Ancestors
- tweepy.streaming.StreamingClient
- tweepy.client.BaseClient
- tweepy.streaming.BaseStream
Methods
def get_status(self)
-
Get the current status of the stream- whether it's running, paused, or connected.
Expand source code
def get_status(self): """Get the current status of the stream- whether it's running, paused, or connected.""" if self.is_connected: return 1 if not self.is_paused else -1 else: return 0
def on_connect(self)
-
Run when the stream connects. Log a message to let us know we're connected.
Expand source code
def on_connect(self): """Run when the stream connects. Log a message to let us know we're connected.""" print("connected to stream\n")
def on_disconnect(self)
-
Run when the stream disconnects. Log a message to let us know we're disonnected.
Expand source code
def on_disconnect(self): """Run when the stream disconnects. Log a message to let us know we're disonnected.""" print("connected to stream\n") self.is_connected = False print("disconnected")
def on_exception(self)
-
Run when the stream throws an exception. Disconnect the stream and set appropriate flags.
Expand source code
def on_exception(self): """Run when the stream throws an exception. Disconnect the stream and set appropriate flags.""" self.disconnect() self.is_connected = False print("Disconnected by Twitter.")
def on_tweet(self, tweet)
-
Put a tweet into the timeline from the stream. Automatically called when stream gets a tweet.
Expand source code
def on_tweet(self, tweet): """Put a tweet into the timeline from the stream. Automatically called when stream gets a tweet.""" # put into queue if stream is not paused if not self.is_paused: self.timeline.put(Tweet(tweet)) # end stream if not self.is_connected: self.disconnect()
def pause_resume(self)
-
Toggles whether stream is paused or not.
Expand source code
def pause_resume(self): """Toggles whether stream is paused or not.""" self.is_paused = not self.is_paused
def result_generator(self)
-
Gets all tweets from the current timeline and returns them as serialized objects.
Expand source code
def result_generator(self): """ Gets all tweets from the current timeline and returns them as serialized objects. """ # pour tweets into list raw_tweets = [] while not self.timeline.empty(): raw_tweets.append(self.timeline.get()) self.timeline.task_done() results = [] for tweet in raw_tweets: results.append([tweet.get_id(), tweet.get_content()]) return results
def toggle_module(self)
-
Connect to stream if not connected. Disconnect the stream otherwise.
Expand source code
def toggle_module(self): """ Connect to stream if not connected. Disconnect the stream otherwise. """ self.is_connected = not self.is_connected # create worker and start the stream if self.is_connected: self.is_paused = False self.worker = self.sample(threaded=True) # join thread and stop stream else: self.is_paused = True if self.worker is not None: self.worker.join()