Module media_analyzer.twitter_analyzer.views

Expand source code
from django.shortcuts import render
from streams.twitter_stream import stream
from django.http import JsonResponse
import json
import twitter_analyzer.tasks as tasks
from queue import Queue
from twitter_analyzer.scheduler import background_scheduler
scheduler = background_scheduler.background_scheduler
# module status
modules_status = {"stream": True, "sentiment": False,
                  "topic": False, "lang": False}
# simulated database
data_base = {}
# a stream cache refreshed on a 2-second period to reduce calls to the database
stream_cache = Queue()


"""
clear stream cache
input Queue: cache
"""


def cache_stream(stream_cache):
    """Gets the results from the stream and puts them into the cache."""
    stream_new_entries = stream.result_generator()
    for entry in stream_new_entries:
        entry_id = entry[0]
        entry_text = entry[1]
        stream_cache.put({"id": entry_id, "text": entry_text})


def clear_cache(stream_cache, db):
    """
    Drain the stream cache and copy its entries into the database.
    stream_cache: Queue
    db: a dictionary standing in for the database for now.
    """
    # save each cached entry to the db, keyed by tweet id
    while not stream_cache.empty():
        data = stream_cache.get()
        stream_cache.task_done()
        entry = db.setdefault(data["id"], {})
        for key, value in data.items():
            if key == "id":
                continue
            entry[key] = value


def schedule_result_by_category(category, stream_cache, ids, db):
    """
    Schedule a one-off, event-triggered processing job.
    category: task name
    stream_cache: Queue
    ids: ids of the individual tweets that need to be processed
    db: a dictionary standing in for the database for now
    """
    if category == "sentiment":
        scheduler.add_job(
            tasks.get_sentiment,
            kwargs={"stream_cache": stream_cache, "id": ids, "db": db},
        )


def schedule_job(scheduler):
    """
    scheduler for periodically scheduled jobs.
    """
    # print("running routine")
    clear_cache(stream_cache, data_base)
    cache_stream(stream_cache)

    if modules_status["sentiment"]:
        # cancel before reschedule to have a better performance
        if scheduler.get_job('stream_sentiment'):
            scheduler.remove_job('stream_sentiment')
        scheduler.add_job(
            tasks.get_sentiment,
            kwargs={"stream_cache": stream_cache, "id": None, "db": None}, id="stream_sentiment"
        )
    if modules_status["topic"]:
        if scheduler.get_job('stream_topic'):
            scheduler.remove_job('stream_topic')
        scheduler.add_job(
            tasks.get_topic,
            kwargs={"stream_cache": stream_cache, "ids": None, "db": None}, id="stream_topic"
        )
    if modules_status["lang"]:
        if scheduler.get_job('stream_lang'):
            scheduler.remove_job('stream_lang')
        scheduler.add_job(
            tasks.get_lang,
            kwargs={"stream_cache": stream_cache, "ids": None, "db": None}, id="stream_lang"
        )


def send_result(request):
    """
    API endpoint to return processed results and the stream.
    """
    if request.method == "POST":
        # deserialize the request body into a Python object
        packet = json.load(request)
        # TODO: consider converting ids and categories to lower case and removing duplicates for security
        categories = packet["category"]
        # toggle the requested modules on
        for category in categories:
            modules_status[category] = True
        ids = packet["id"]
        # both result fields are keyed by tweet id
        fetched_result = {"stream": {}, "inds": {}}
        # fetch results from the stream first if requested
        if "stream" in categories:
            fetched_result["stream"] = fetch_from_stream(categories)
        # then fetch results from the db
        fetched_result["inds"] = fetch_from_db(ids, categories)
        return JsonResponse(fetched_result)
    # non-POST requests are not supported
    return JsonResponse({"error": "only POST is supported"}, status=405)


def fetch_from_stream(categories):
    """
    fetch result from stream by categories
    stream data alwasy has text data,
    if data not exist, schedule to generate it
    """

    # remove stream from category
    categories.remove("stream")
    fetched_result = {}
    for _ in range(stream_cache.qsize()):
        data = stream_cache.get()
        id = data["id"]
        # text is always present for stream entries
        fetched_result[id] = {"text": data["text"]}
        for category in categories:
            # fetch the result if it already exists
            if category in data:
                fetched_result[id][category] = data[category]
            # otherwise schedule a job to generate it
            else:
                # a None value tells the frontend to try again later
                fetched_result[id][category] = None
                schedule_result_by_category(category, stream_cache, None, None)
        stream_cache.task_done()
        # put the entry back so the cache is preserved
        stream_cache.put(data)
    return fetched_result


def fetch_from_db(ids, categories):
    """
    fetch result from db,
    if data not exist, schedule to generate it
    """

    fetched_result = {}
    for id in ids:
        # check if database has this entry
        if id not in data_base:
            continue
        for category in categories:
            fetched_result[id] = {}
            # fetch if exist
            if category in data_base[id]:
                fetched_result[id][category] = data_base[id][category]
            # schedule to generate the result
            else:
                fetched_result[id][category] = None
                schedule_result_by_category(
                    category, stream_cache, id, data_base)
    return fetched_result


def index(request):
    """
    The only page for this application
    """

    results = []
    tweets = []
    for _ in range(stream_cache.qsize()):
        result = stream_cache.get()
        stream_cache.task_done()
        stream_cache.put(result)
        results.append(result)

    for result in results:
        tweets.append(result["text"])
    return render(request, "twitter_analyzer/index.html", {"tweets": tweets})


def rest_module():
    """
    Periodically shuts down the model to reduce consumption.
    """
    print("shutting down module")
    for key in modules_status.keys():
        modules_status[key] = False


if scheduler is not None:
    # schedule the periodic routine every 2 seconds
    scheduler.add_job(schedule_job, 'interval', seconds=2,
                      kwargs={'scheduler': scheduler})
    # scheduler.add_job(rest_module, 'interval', minutes=5)
    scheduler.start()
else:
    print("scheduler not initialized")

Global variables

var stream_cache

A short-lived cache of stream entries, drained to the database on each cycle.

Functions

def cache_stream(stream_cache)

Gets the results from the stream and puts them into the cache.

Expand source code
def cache_stream(stream_cache):
    """Gets the results from the stream and puts them into the cache."""
    stream_new_entries = stream.result_generator()
    for entry in stream_new_entries:
        entry_id = entry[0]
        entry_text = entry[1]
        stream_cache.put({"id": entry_id, "text": entry_text})
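
Each cached entry is a small dict keyed by "id" and "text". A minimal sketch of what ends up in the queue, assuming stream.result_generator() yields (id, text) pairs as the indexing above suggests (sample ids and texts are made up):

# sketch: the shape of cached stream entries
from queue import Queue

cache = Queue()
for entry_id, entry_text in [("1001", "first tweet"), ("1002", "second tweet")]:
    cache.put({"id": entry_id, "text": entry_text})

print(cache.get())  # {'id': '1001', 'text': 'first tweet'}
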
def clear_cache(stream_cache, db)

Drain the stream cache and copy its entries into the database. stream_cache: Queue db: a dictionary standing in for the database for now.

Expand source code
def clear_cache(stream_cache, db):
    """
    Drain the stream cache and copy its entries into the database.
    stream_cache: Queue
    db: a dictionary standing in for the database for now.
    """
    # save each cached entry to the db, keyed by tweet id
    while not stream_cache.empty():
        data = stream_cache.get()
        stream_cache.task_done()
        entry = db.setdefault(data["id"], {})
        for key, value in data.items():
            if key == "id":
                continue
            entry[key] = value
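
A quick illustration of the drain-and-merge behaviour, with a plain dict standing in for the database (ids and values are made up):

# sketch: draining two cached entries for the same tweet into a dict-backed db
from queue import Queue

cache, db = Queue(), {}
cache.put({"id": "1001", "text": "first tweet", "sentiment": "positive"})
cache.put({"id": "1001", "lang": "en"})  # a later entry for the same tweet

clear_cache(cache, db)
print(db)  # {'1001': {'text': 'first tweet', 'sentiment': 'positive', 'lang': 'en'}}
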
def fetch_from_db(ids, categories)

Fetch results from the db; if a result does not exist yet, schedule a job to generate it.

Expand source code
def fetch_from_db(ids, categories):
    """
    Fetch results from the db; if a result does not
    exist yet, schedule a job to generate it.
    """

    fetched_result = {}
    for id in ids:
        # skip ids the database does not know about
        if id not in data_base:
            continue
        fetched_result[id] = {}
        for category in categories:
            # fetch the result if it already exists
            if category in data_base[id]:
                fetched_result[id][category] = data_base[id][category]
            # otherwise schedule a job to generate it
            else:
                fetched_result[id][category] = None
                schedule_result_by_category(
                    category, stream_cache, id, data_base)
    return fetched_result
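
The returned mapping mirrors the database, with None marking results that are still being computed. A sketch with made-up ids, assuming "1001" already has a stored sentiment and "1002" is stored but not yet analyzed:

result = fetch_from_db(["1001", "1002", "9999"], ["sentiment"])
# {'1001': {'sentiment': 'positive'},
#  '1002': {'sentiment': None}}    # job scheduled; poll again later
# '9999' is absent from the result: the db does not know it
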
def fetch_from_stream(categories)

Fetch results from the stream by category. Stream entries always carry text; if a requested result does not exist yet, schedule a job to generate it.

Expand source code
def fetch_from_stream(categories):
    """
    Fetch results from the stream by category.
    Stream entries always carry text; if a requested result does not
    exist yet, schedule a job to generate it.
    """

    # remove "stream" from categories so it is not treated as a task
    categories.remove("stream")
    fetched_result = {}
    for _ in range(stream_cache.qsize()):
        data = stream_cache.get()
        id = data["id"]
        # text is always present for stream entries
        fetched_result[id] = {"text": data["text"]}
        for category in categories:
            # fetch the result if it already exists
            if category in data:
                fetched_result[id][category] = data[category]
            # otherwise schedule a job to generate it
            else:
                # a None value tells the frontend to try again later
                fetched_result[id][category] = None
                schedule_result_by_category(category, stream_cache, None, None)
        stream_cache.task_done()
        # put the entry back so the cache is preserved
        stream_cache.put(data)
    return fetched_result
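
The loop uses a rotate-to-snapshot pattern: every entry is popped and immediately re-queued, so the cache is read without being consumed. In isolation the pattern looks like this:

# sketch: reading a Queue non-destructively by rotating it once
from queue import Queue

q = Queue()
for item in ("a", "b", "c"):
    q.put(item)

snapshot = []
for _ in range(q.qsize()):
    item = q.get()
    q.task_done()
    q.put(item)          # re-queue so the cache is preserved
    snapshot.append(item)

print(snapshot, q.qsize())  # ['a', 'b', 'c'] 3
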
def index(request)

Render the only page of this application.

Expand source code
def index(request):
    """
    Render the only page of this application.
    """

    results = []
    tweets = []
    # rotate through the queue to snapshot it without losing entries
    for _ in range(stream_cache.qsize()):
        result = stream_cache.get()
        stream_cache.task_done()
        stream_cache.put(result)
        results.append(result)

    for result in results:
        tweets.append(result["text"])
    return render(request, "twitter_analyzer/index.html", {"tweets": tweets})
def rest_module()

Periodically shuts down all modules to reduce resource consumption.

Expand source code
def rest_module():
    """
    Periodically shuts down all modules to reduce resource consumption.
    """
    print("shutting down modules")
    for key in modules_status:
        modules_status[key] = False
def schedule_job(scheduler)

Routine for the periodically scheduled jobs: refresh the cache, then (re)schedule one job per enabled module.

Expand source code
def schedule_job(scheduler):
    """
    Routine for the periodically scheduled jobs: refresh the cache,
    then (re)schedule one job per enabled module.
    """
    clear_cache(stream_cache, data_base)
    cache_stream(stream_cache)

    if modules_status["sentiment"]:
        # cancel before rescheduling so duplicate jobs do not pile up
        if scheduler.get_job('stream_sentiment'):
            scheduler.remove_job('stream_sentiment')
        scheduler.add_job(
            tasks.get_sentiment,
            kwargs={"stream_cache": stream_cache, "id": None, "db": None}, id="stream_sentiment"
        )
    if modules_status["topic"]:
        if scheduler.get_job('stream_topic'):
            scheduler.remove_job('stream_topic')
        scheduler.add_job(
            tasks.get_topic,
            kwargs={"stream_cache": stream_cache, "ids": None, "db": None}, id="stream_topic"
        )
    if modules_status["lang"]:
        if scheduler.get_job('stream_lang'):
            scheduler.remove_job('stream_lang')
        scheduler.add_job(
            tasks.get_lang,
            kwargs={"stream_cache": stream_cache, "ids": None, "db": None}, id="stream_lang"
        )
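
The remove-then-add dance with a fixed job id keeps APScheduler at no more than one pending job per module. A stripped-down sketch of the pattern (the job id and function here are placeholders):

# sketch: cancel-before-reschedule with a stable APScheduler job id
from apscheduler.schedulers.background import BackgroundScheduler

sched = BackgroundScheduler()
sched.start()

def reschedule(job_id, func, **kwargs):
    if sched.get_job(job_id):       # drop any still-pending copy
        sched.remove_job(job_id)
    sched.add_job(func, kwargs=kwargs, id=job_id)  # runs once, as soon as possible
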
def schedule_result_by_category(category, stream_cache, ids, db)

Schedule a one-off, event-triggered processing job. category: task name stream_cache: Queue ids: ids of the individual tweets that need to be processed db: a dictionary standing in for the database for now

Expand source code
def schedule_result_by_category(category, stream_cache, ids, db):
    """
    Schedule a one-off, event-triggered processing job.
    category: task name
    stream_cache: Queue
    ids: ids of the individual tweets that need to be processed
    db: a dictionary standing in for the database for now
    """
    if category == "sentiment":
        scheduler.add_job(
            tasks.get_sentiment,
            kwargs={"stream_cache": stream_cache, "id": ids, "db": db},
        )
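
For example, a request for the sentiment of one stored tweet queues a one-off job (the tweet id is made up):

# hypothetical call: queue sentiment analysis for a single stored tweet
schedule_result_by_category("sentiment", stream_cache, "1001", data_base)
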
def send_result(request)

API endpoint to return processed results and the stream.

Expand source code
def send_result(request):
    """
    API endpoint to return processed results and the stream.
    """
    if request.method == "POST":
        # deserialize the request body into a Python object
        packet = json.load(request)
        # TODO: consider converting ids and categories to lower case and removing duplicates for security
        categories = packet["category"]
        # toggle the requested modules on
        for category in categories:
            modules_status[category] = True
        ids = packet["id"]
        # both result fields are keyed by tweet id
        fetched_result = {"stream": {}, "inds": {}}
        # fetch results from the stream first if requested
        if "stream" in categories:
            fetched_result["stream"] = fetch_from_stream(categories)
        # then fetch results from the db
        fetched_result["inds"] = fetch_from_db(ids, categories)
        return JsonResponse(fetched_result)
    # non-POST requests are not supported
    return JsonResponse({"error": "only POST is supported"}, status=405)
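
A sketch of the request/response contract, with made-up ids and values; the endpoint URL is assumed from the hypothetical urls.py wiring shown earlier:

# hypothetical client exchange with this endpoint
# POST body:     {"category": ["stream", "sentiment"], "id": ["1001"]}
# JSON response: {"stream": {"2001": {"text": "a live tweet", "sentiment": None}},
#                 "inds":   {"1001": {"sentiment": "positive"}}}
import json
import urllib.request

payload = json.dumps({"category": ["stream", "sentiment"], "id": ["1001"]}).encode()
req = urllib.request.Request("http://localhost:8000/result/", data=payload,
                             headers={"Content-Type": "application/json"}, method="POST")
print(json.load(urllib.request.urlopen(req)))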