yelp_data

Solution overview:

Data from the Yelp dataset (JSON files) is loaded by Kafka console producers into the appropriate Kafka topics. Python Kafka consumers based on Spark Structured Streaming read that data, shape it according to the Cassandra schema (using advanced built-in types such as counter, set and map, as well as UDTs, i.e. user-defined types, plus parsing and typecasting), and write it into the appropriate tables.
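As a minimal illustration of the shaping step, here is a plain-Python sketch of how one raw business record could be parsed and typecast before writing to Cassandra (field names follow the Yelp business JSON; the real consumer does the equivalent with Spark typed columns):

```python
import json

def shape_business(raw_line):
    """Shape one raw Yelp business JSON line to match the Cassandra schema:
    parse, typecast is_open to boolean, split categories into a set."""
    record = json.loads(raw_line)
    return {
        "business_id": record["business_id"],
        "is_open": bool(record.get("is_open", 0)),  # integer -> boolean
        "stars": float(record.get("stars", 0.0)),   # ensure numeric type
        # "Food, Bars" -> {"Food", "Bars"}
        "categories": set(
            c.strip() for c in (record.get("categories") or "").split(",") if c.strip()
        ),
    }

row = shape_business(
    '{"business_id": "b1", "is_open": 1, "stars": 4.5, "categories": "Food, Bars"}'
)
```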

Decision log

  • Initial attempt to use Confluent's Kafka Connect failed miserably: for some reason it stopped publishing messages into the Kafka topic after approximately 10k messages
  • additionally, Kafka Connect wraps each event in a nested structure
  • console producers were able to publish all messages in less than 10 minutes
  • Kafka topic replication & partitions are set to fixed values
  • Cassandra replication factor is set to 1
  • the number of nodes in the setup was decreased so that the whole setup can run on a single workstation
  • events with a mismatched schema are not published to the corresponding errors topics
  • testing is very limited, just enough to demonstrate how it can be done
  • for validation of the approach and exploratory data analysis, see the section How to get number of businesses per category from cassandra table

Repository layout

  • conf - external config files for the docker containers
  • consumer - python module that consumes data from kafka and publishes it into cassandra
  • data - yelp_dataset.tar MUST be extracted into this folder (all bootstrap scripts expect it to be there)
  • deploy - docker-compose files for cassandra, kafka and spark
  • cql_schemas - cassandra schema and deploy script
  • main.py - driver program that orchestrates the streaming logic
  • bootstrap.sh - deploys and starts all services
  • stop_all.sh - stops all docker containers and CLEANS container data and metadata
  • start_consumers.sh - starts all streaming jobs as background processes

Prerequisites

  • java 8 (see at the bottom how to use the sdk tool to add an additional java version)
  • bash shell
  • python 2.7
  • docker && docker-compose
  • the HOSTNAME environment variable is set within the shell

How to start

  1. copy yelp_dataset.tar to the ./data folder
cp yelp_dataset.tar ./data
  2. create a virtual environment and install the package with all dependencies:
virtualenv -p /usr/bin/python2.7 venv && source ./venv/bin/activate
python setup.py install

  3. update the value of KAFKA_ADVERTISED_HOST_NAME in deploy/kafka.yml to be an IP address (not a loopback address such as 127.0.0.1). It will work as-is on Linux, but not on Mac. Alternatively, you may explicitly export HOSTNAME:

export HOSTNAME
  4. start all services, upload data into Kafka and spawn spark streaming jobs to write data into Cassandra
./bootstrap.sh

Alternatively, you may specify the location of yelp_dataset.tar:

./bootstrap.sh -d /path/to/data/yelp_dataset.tar

NOTE: sometimes Cassandra takes more time to start properly; in that case, wait several minutes and simply re-run ./bootstrap.sh
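The slow Cassandra start can also be detected programmatically instead of waiting blindly; a sketch of a port-polling helper (the host and port 9042 are assumptions matching the default Cassandra setup):

```python
import socket
import time

def wait_for_port(host, port, timeout=120.0, interval=2.0):
    """Poll until a TCP port accepts connections (e.g. Cassandra on 9042).

    Returns True as soon as a connection succeeds, False on timeout."""
    deadline = time.time() + timeout
    while time.time() < deadline:
        try:
            with socket.create_connection((host, port), timeout=interval):
                return True
        except OSError:
            time.sleep(interval)
    return False
```

For example, `wait_for_port("127.0.0.1", 9042)` before re-running ./bootstrap.sh.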

How to run single streaming job

  1. create a virtualenv and activate it:
virtualenv -p /usr/bin/python2.7 venv && source ./venv/bin/activate
  2. install requirements:
python setup.py develop

or

pip install -r requirements.txt
  3. edit ./consumer/1.cfg if it is necessary to adjust the IP addresses of core services
  4. assuming data is already in the corresponding kafka topics (supported topics: [business, review, user, checkin, tip]):
python main.py --cfg consumer/1.cfg --tid business
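The driver's command line above can be handled with argparse; a hypothetical sketch (the flag names match the invocation above, and the choices mirror the supported topic list, but the actual main.py may differ):

```python
import argparse

# Topics the streaming jobs know how to consume
SUPPORTED_TOPICS = ["business", "review", "user", "checkin", "tip"]

def parse_args(argv):
    """Parse the driver's CLI: consumer config path and target topic id."""
    parser = argparse.ArgumentParser(description="yelp_data streaming job")
    parser.add_argument("--cfg", required=True,
                        help="path to consumer config, e.g. consumer/1.cfg")
    parser.add_argument("--tid", required=True, choices=SUPPORTED_TOPICS,
                        help="kafka topic to consume")
    return parser.parse_args(argv)

args = parse_args(["--cfg", "consumer/1.cfg", "--tid", "business"])
```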

How to run all streaming jobs

./start_consumers.sh

How to run tests for streaming job

  1. create a virtualenv and activate it:
virtualenv venv && source ./venv/bin/activate
  2. install test requirements:
pip install -r test-requirements.txt
  3. execute the tests
tox

How to get number of businesses per category from cassandra table

In case you want to run the query with this module available, assuming you have already installed all requirements:

cd consumer && ./make_dist.sh
sudo docker exec -it spark-master /spark/bin/pyspark \
--packages org.apache.spark:spark-sql-kafka-0-10_2.11:2.4.0,com.datastax.spark:spark-cassandra-connector_2.11:2.4.0 \
--conf spark.cassandra.connection.host=192.168.0.9 \
--conf spark.cassandra.connection.port=9042 \
--conf spark.cassandra.output.consistency.level=ONE \
--py-files /consumer/dependencies.zip

Then, inside the pyspark shell:

from pyspark.sql.functions import explode, col, countDistinct
df = spark.read\
.format("org.apache.spark.sql.cassandra")\
.options(table="business", keyspace="yelp_data")\
.load()
df.printSchema()
df.count()
df = df.select(col("business_id"), explode("categories").alias("category"))
df = df.groupBy("category").agg(countDistinct("business_id"))
df.show()

Data schema modeling:

General consideration: a Cassandra schema is usually defined based on how users will query the data. As part of this exercise, the tables of the form <entity_name> are just snapshots of the information as-is, aka fact tables:

  • yelp_data.business, yelp_data.business_review_count
  • yelp_data.review, yelp_data.review_reactions
  • yelp_data.checkin
  • yelp_data.tip, yelp_data.tip_compliment_count

Just to illustrate how the desired tables can be derived, I've added as an example the schema yelp_data.business_by_location (without populating it, though) with a reversed index. It is designed to answer questions like: show me businesses by location, with a particular category and rating. For that, a dedicated table with appropriate indexes was created, i.e. the PK will be (geohash(lat, long): String, category, rating). NOTE: as a result the data will be redundant: for every entry in business, the multiplication factor for this table equals the number of category entries in the original row.
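The geohash part of that partition key can be computed without external libraries; a sketch of the standard base32 geohash encoder (the precision value is a design choice, not something the schema dictates):

```python
# Standard geohash base32 alphabet (no a, i, l, o)
_BASE32 = "0123456789bcdefghjkmnpqrstuvwxyz"

def geohash(lat, lon, precision=11):
    """Encode (lat, lon) as a geohash string by alternating binary
    subdivision of the longitude and latitude ranges."""
    lat_lo, lat_hi = -90.0, 90.0
    lon_lo, lon_hi = -180.0, 180.0
    bits = []
    even = True  # even bit positions refine longitude
    while len(bits) < precision * 5:
        if even:
            mid = (lon_lo + lon_hi) / 2
            if lon >= mid:
                bits.append(1)
                lon_lo = mid
            else:
                bits.append(0)
                lon_hi = mid
        else:
            mid = (lat_lo + lat_hi) / 2
            if lat >= mid:
                bits.append(1)
                lat_lo = mid
            else:
                bits.append(0)
                lat_hi = mid
        even = not even
    # Pack every 5 bits into one base32 character
    chars = []
    for i in range(0, len(bits), 5):
        idx = 0
        for b in bits[i:i + 5]:
            idx = (idx << 1) | b
        chars.append(_BASE32[idx])
    return "".join(chars)
```

Rows whose geohash prefixes match are geographically close, which is what makes it usable as the leading partition key component.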

Data adjustments

Business:

  • postal code - the initial intention was to cast it to an integer, but a look at Wikipedia reveals that for some countries it may contain letters
  • is_open -> from integer to boolean
  • attributes -> from String to map (only programmatic filtering)
  • hours -> from String to UDT (only programmatic filtering)

Checkin:

  • date (as a space-separated String) -> set

Tip:

  • introduce an artificial uuid-based PK
  • date (string) -> date
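The Checkin and Tip adjustments above can be sketched in plain Python (the real consumer does the equivalent with Spark typed columns; the space-separated checkin format and the "%Y-%m-%d" tip date format follow the notes above and are assumptions about the raw data):

```python
import uuid
from datetime import datetime

def checkin_dates_to_set(raw):
    """Checkin: space-separated date String -> set (duplicates collapse)."""
    return set(raw.split())

def tip_pk():
    """Artificial uuid-based primary key for tip rows."""
    return uuid.uuid4()

def tip_date(raw):
    """Tip: date string -> datetime.date."""
    return datetime.strptime(raw, "%Y-%m-%d").date()
```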

In real life, the computed stars for businesses, reviews and users would reside in a dedicated column family, or be periodically re-computed in an in-memory cache synced with the cassandra table.

Troubleshooting

It usually helps to clean the data folders of the services:

rm -rf ./deploy/cassandra1/
rm -rf ./deploy/kafka_data/
rm -rf ./deploy/zoo_data/

In case of network issues it may be worth checking 1.cfg; additionally, modify KAFKA_ADVERTISED_HOST_NAME in deploy/kafka.yml

Various

How to run cqlsh:

sudo docker exec -it cassandra1 cqlsh cassandra1

How to run pyspark:

sudo docker exec -it spark-master /spark/bin/pyspark \
--packages org.apache.spark:spark-sql-kafka-0-10_2.11:2.4.0,com.datastax.spark:spark-cassandra-connector_2.11:2.4.0 \
--conf spark.cassandra.connection.host=192.168.0.9 \
--conf spark.cassandra.connection.port=9042 \
--conf spark.cassandra.output.consistency.level=ONE

How to install java 8:

curl -s "https://get.sdkman.io" | bash
source "$HOME/.sdkman/bin/sdkman-init.sh"
sdk install java 8.0.252-amzn
sdk use java 8.0.252-amzn

Time to push the biggest file into kafka:

time kafka-console-producer.sh --broker-list kafka:9092 --topic review < /raw_data/yelp_academic_dataset_review.json 
real	5m40.304s
user	2m2.007s
sys	1m7.300s

Some useful commands:

export PYSPARK_PYTHON=python3
export PYTHONPATH=$PYTHONPATH:/consumer/
pyspark --py-files dependencies.zip

Contributors

  • kruglov-dmitry