
Twitterstream Sentiment-Analysis with Kafka, Spark & Elasticsearch

Reading the Twitter stream from the Twitter API with Kafka and streaming it into a Spark cluster for processing

Things you need

Python Modules

  • Tweepy (plus your own set of Twitter API keys)
  • Kafka-Python
  • Pyspark
  • Elasticsearch
  • NLTK with VADER
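
All of these modules can be installed with pip (pip install tweepy kafka-python pyspark elasticsearch nltk). VADER additionally needs its lexicon, which NLTK downloads separately; a minimal one-time setup, assuming the default NLTK data path:

import nltk

# Download the VADER lexicon once; SentimentIntensityAnalyzer needs it at runtime.
nltk.download('vader_lexicon')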

Base Architecture

Let's dive into the Base Architecture to get an overview of the tools. It's not that complicated.


At the beginning of our journey we have Twitter users tweeting posts with hashtags. Rumor has it there are ~6,000 tweets per second. Twitter offers an API to query tweets from the past few days or to read the live stream.

With a Kafka producer written in Python (use Java or Scala if you prefer) we read the stream, and after some cleaning we send the relevant part of each tweet to a topic on the Kafka server.

While the tweets are being sent to the topic, the Spark consumer waits at the other end of the Kafka message queue. The Spark Streaming library ships with KafkaUtils to collect the messages from the Kafka server and turn them into Spark RDDs for processing.

Inside the Spark consumer we let the NLTK VADER package do the sentiment magic and add the result to the tweet's data.

At the end of each consuming loop we send the data to Elasticsearch, where we can build some fancy dashboards with Kibana to show the results of our sentiment analysis and prove that the tweets are being processed in real time.

Step 1/2 - Read the stream

To swim with the Twitter stream we need some tools:

  • your personal API token, which you can get from Twitter to call the API
  • the Python module Tweepy to access the stream from Python
  • the Kafka-Python module to send the tweets to the Kafka server

You can find the code in twitterstreamproducer.py.

First we define a StdOutListener that listens on the stream; its on_data() function defines what we want to do with every tweet we get. The function cleantweet() cleans each tweet by extracting just the user, date and text. After that the tweet is sent to the topic "tweets".

class StdOutListener(StreamListener):
    def on_data(self, data):
        # Reduce the raw tweet to the fields we need ...
        newdata = cleantweet(data)
        # ... and publish it to the "tweets" topic.
        producer.send("tweets", newdata)
        print(newdata)
        return True

    def on_error(self, status):
        print(status)
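
The cleantweet() helper isn't shown in the snippet above; a minimal sketch of what it could look like, assuming the raw status arrives as the JSON string Tweepy hands to on_data() and that we only keep user, date and text:

from datetime import datetime
import json

def cleantweet(data):
    # Hypothetical reduction of the raw status to the fields used downstream.
    tweet = json.loads(data)
    # The consumer later parses the date as "%Y-%m-%d %H:%M:%S",
    # so reformat Twitter's created_at timestamp accordingly.
    created = datetime.strptime(tweet["created_at"], "%a %b %d %H:%M:%S %z %Y")
    return {
        "user": tweet["user"]["screen_name"],
        "date": created.strftime("%Y-%m-%d %H:%M:%S"),
        "text": tweet["text"]
    }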

Besides the API token, we give the Tweepy Stream object our StdOutListener and set a filter on the hashtag we want to track. That's it.

# Producer that serializes every message as JSON.
producer = KafkaProducer(bootstrap_servers='YourHostWithKafka:9092',
    api_version=(0, 10, 1),
    value_serializer=lambda m: json.dumps(m).encode('ascii'))

# Authenticate against the Twitter API and filter the live stream for our hashtag.
l = StdOutListener()
auth = OAuthHandler(consumer_key, consumer_secret)
auth.set_access_token(access_token, access_token_secret)
stream = Stream(auth, l, tweet_mode='extended')
stream.filter(track=["#worldkidnessday"], languages=["en"])
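
To check that tweets are actually arriving, you can watch the topic with Kafka's console consumer while the producer runs, e.g. bin/kafka-console-consumer.sh --bootstrap-server YourHostWithKafka:9092 --topic tweets.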

Step 2/2 - Consume the tweets

To consume the tweets we can use a boring plain Python Kafka consumer or a cool PySpark consumer that processes the tweets inside a cluster. You can find the code for the second option in twittersparkconsumer.py.

First of all we create a SparkContext with our appName, then we create a StreamingContext from it with a batch interval of 3 seconds, so every 3 seconds it consumes the next batch of tweets. After that we use the StreamingContext to build a Kafka consumer, and for each RDD we get from the stream we call the function process(). As long as we don't kill the process, this code will run to infinity (and beyond).

sc = SparkContext(appName="PythonStreaming")
# StreamingContext with a micro-batch interval of 3 seconds.
ssc = StreamingContext(sc, 3)

# Subscribe to the "tweets" topic via ZooKeeper and parse each message as JSON.
kafkaStream = KafkaUtils.createStream(ssc, "YourHostWithKafka:2181", "consumer-group", {"tweets": 1})
lines = kafkaStream.map(lambda x: json.loads(x[1]))

# Run process() on every micro-batch.
lines.foreachRDD(process)

ssc.start()
ssc.awaitTermination()
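
Note that KafkaUtils.createStream comes from the external spark-streaming-kafka package, so the consumer is typically launched with something like spark-submit --packages org.apache.spark:spark-streaming-kafka-0-8_2.11:2.4.0 twittersparkconsumer.py, with the Scala and Spark versions adjusted to your cluster.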

Inside the process() function we process the tweets: for every tweet we call the dosentiment() function, passing it the text of the tweet. The result of this function is added to the data as the column "sentiment".

udf_func = udf(lambda x: dosentiment(x), returnType=StringType())
df = df.withColumn("sentiment", udf_func(df.text))
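
The DataFrame df in this snippet has to be built from the incoming RDD first; a minimal sketch of the surrounding process() function, assuming the cleaned tweets carry user, date and text fields:

from pyspark.sql import SparkSession

def process(rdd):
    # Skip empty micro-batches.
    if rdd.isEmpty():
        return
    # Turn the RDD of cleaned-tweet dicts into a DataFrame (columns: user, date, text).
    spark = SparkSession.builder.getOrCreate()
    df = spark.createDataFrame(rdd)
    # ... then add the "sentiment" column and ship the rows to Elasticsearch (see below).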

Inside the dosentiment() function we process the text of the tweet with the SentimentIntensityAnalyzer of NLTK VADER, which gives us scores for the positive, negative and neutral elements of the text, plus a calculated compound score ranging from -1 (negative) to 1 (positive). This information is returned and added to the tweet data.

def dosentiment(tweet):
    # Start from zeroed scores so the result always contains all four keys.
    scores = dict([('pos', 0), ('neu', 0), ('neg', 0), ('compound', 0)])
    sid = SentimentIntensityAnalyzer()
    ss = sid.polarity_scores(tweet)
    for k in sorted(ss):
        scores[k] += ss[k]

    return json.dumps(scores)
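
Called on a sample sentence, dosentiment() returns a compact JSON string; the numbers below come from the VADER documentation and may vary slightly with the lexicon version:

>>> dosentiment("VADER is smart, handsome, and funny.")
'{"pos": 0.746, "neu": 0.254, "neg": 0.0, "compound": 0.8316}'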

Still inside the process() function we can now send our results to Elasticsearch and build a dashboard with Kibana. The sth2elastic() function from my ownelastic.py will do the job for us, but don't forget to configure your Elasticsearch host in that file.

results = df.toJSON().map(lambda j: json.loads(j)).collect()
for result in results:
    result["date"] = datetime.strptime(result["date"], "%Y-%m-%d %H:%M:%S")
    result["sentiment"] = json.loads(result["sentiment"])
# Index the whole batch once, after every tweet has been converted.
sth2elastic(results, "tweets", "doc")
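
ownelastic.py isn't part of this walkthrough; a minimal sketch of what sth2elastic() might do, assuming the official elasticsearch Python client and its bulk helper:

from elasticsearch import Elasticsearch, helpers

# Hypothetical host config; in the repo this lives in ownelastic.py.
es = Elasticsearch(["YourElasticHost:9200"])

def sth2elastic(docs, index, doc_type):
    # Bulk-index the list of documents into the given index.
    actions = [{"_index": index, "_type": doc_type, "_source": doc} for doc in docs]
    helpers.bulk(es, actions)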

Now the tweets are stored in the "tweets" index of Elasticsearch, and you can build a dashboard that refreshes every few seconds.

