Git Product home page Git Product logo

ms-sio's Introduction

ms-sio

Cours 1: spark

  • Objectifs:
  • manipuler le dsl de spark (Dataframe API)
  • comprendre le concept de RDD
  • comprendre le modele de memoire
  • voir comment configurer un job spark
  • voir comment lancer un job spark
  • preparer le dataset:

télécharger le dataset via cette page: https://www.kaggle.com/datasnaek/youtube-new

le fichier zip télécharger doit etre a ce path : ~/Downloads/youtube-new.zip.

make prepare-dataset
  • lancer pyspark :
make run-pyspark

Spark est un outil permettant de manipuler les données via différentes "API": RDD, Dataframe, SQL et differents languages (essentiellement Scala et Python)

Dans ce cours on utlisera l'API Dataframe via son implémentation en Python.

Pour faciliter l'introspection dans un dataset ou pour modéliser un traitement ou simplement le tester Spark offre des REPL, dans ce cours on a utilisé pyspark qui est la version en python.

  • je charge mon dataset, pour cela on utilise le concept de Datasource.
videos = spark.read.option('header','true').option('inferSchema','true').csv('/data/raw/FRvideos.csv')
  • je decouvre mon dataset.
videos.count()

videos.show()

videos.show(50)

videos.show(10,False)
  • Chaque dataframe possede un schema:
videos.printSchema()
  • chercher et corriger les erreurs d'inference de schema:
videos.select('category_id').show()
videos.select('category_id').sample(0.1).distinct().show()
videos.select('category_id').filter(videos.category_id.rlike('\\d*')).count()

videos = videos.filter(videos.category_id.rlike('\\d*')).withColumn('category_id',videos.category_id.cast('integer'))

Remarque ici on a fait que des operations de type transformation (select, filter, where, ...). Spark va juste les prendre en compte sans les éxécuter immédiatement. Pour provoquer l'execution d'un job, il faut soumettre une action (write, save, count, show).

En general une transformation prend une RDD/DataFrame en entrée et donne une autre en sortie. Tandis qu'une action donne un resultat autre qu'un RDD/DataFrame. Par exemple:

  • count() = f(RDD) => Long

  • show() = f(RDD) => String

  • write.parquet() = f(RDD => "Fichiers.parquet"

  • lire le dataset en json.

categories = spark.read.option('multiline','true').json('/data/raw/FR_category_id.json')

categories.count()

categories.show()

categories.printSchema()

Attention dans le monde spark quand on parle de JSON on s'attend a avoir du Json Lines

  • je transforme les données pour qu'elles soient plus manipuable:
from pyspark.sql.functions import *

categories = categories.select(explode('items'))

categories = categories.select('col.id','col.snippet.title')

categories = categories.withColumnRenamed('title','category_title')
  • Je fait une jointure entre les deux datasets:
df = videos.join(categories.hint('broadcast'),videos.category_id == categories.id,'inner')

Je me sert du fait que le dataset des catégories a une petite taille pouvant etre supportée par la mémoire du Driver et envoyée a tous les executors (via le réseau!!) assez rapidement; pour faire une optimisation de la jointure dites: Broadcast join

Dans le cas ou je fait une jointure de deux datasets de tailles importantes il faut travailler sur la definition des partitions des deux datasets (co-partitionning) qui doit pouvoir limiter l'impact des opération d'echanges de données via le reseau (shuffle). Un bon choix de partitionnement necessite une bonne connaissance des datasets (nature, connaissance du domaine, tailles) et aussi des ressources a dispositions;

voir cette article: https://techmagie.wordpress.com/2015/12/19/understanding-spark-partitioning/

comprendre la gestion de la mémoire:

 df.cache() 
 df.count() # action pour forcer la mise en cache
  • voir l'onglet storage.

  • autre facon de mettre en cache:

df.persist(pyspark.StorageLevel.MEMORY_ONLY) # voir https://spark.apache.org/docs/latest/rdd-programming-guide.html#rdd-persistence

A retenir

  • La memoire est l'une des ressources avec laquelle on inter-agit le plus en utilisant/optimisant spark.
  • Quand on parle de mémoire ne surtout pas confondre:
    • La Memoire des machines sur lesquelles tournent mes executeurs spark. Cette memoire ne doit pas etre entierement donner a spark.
    • La Memoire totale du process JVM qui représente l'executeur spark, qui ne peut etre entierement dédiée a nos traitements;
    • La fraction de mémoire gérer par le Memory Manager de Spark
    • Cette derniere est divisée en deux parties l'une pour l'execution (jointure, sort, aggregation,...) et l'autre pour le stockage (cache).
    • C'est sur cette fraction qu'on va le plus souvent agir pour optimiser l'usage de nos resources.

Lire cet article qui détaille bien le modele de mémoire de spark: Spark Memory Management

Mais aussi:

A NE JAMAIS JAMAIS JAMAIS OUBLIER : Spark n'est ni conçu ni pensé pour charger ENTIEREMENT un dataset en mémoire. Un dataset est représenté par une RDD qui est partitionnée en plusieurs blocks/partitions. Il faut qu'au moins la taille "d'un seul" block/partition "puisse tenir en mémoire".

  • Comment je vais sauvegarder mon resultat:
    df.write.parquet('/data/my-joined-dataset')

Je choisis de sauvegarder mon dataset en parquet car c'est un format de stockage trés optimisé pour un usage classique de spark.

  • Comment je lance mon job en production?

voir d'abord comment j'ai ré-écrit mes traitements sous forme d'un programme python plus structuré que les commandes que j'ai jusqu'ici lancé avec pyspark.

Pour lancer un job spark j'utilise la commande spark-submit et ceux quelque soit le language utilisé.

Le lancement d'un job consiste concrétement a demander au scheduler du cluster spark de lancer le traitement soumis.

Le scheduler le plus utilisé pour le moment en production est YARN

   docker run --rm -ti -v ~/cours-hdp2/datasets:/data -v $(pwd)/data-ingestion-job/src:/src  --entrypoint bash stebourbi/sio:pyspark
   
   /spark-2.4.4-bin-hadoop2.7/bin/spark-submit /src/process.py -v /data/raw/FRvideos.csv -c /data/raw/FR_category_id.json -o /data/output00

Je peux aussi changer certaines configurations au lancement de mon job, par exemple celle de la mémoire:

    /spark-2.4.4-bin-hadoop2.7/bin/spark-submit --conf spark.executor.memory=8G /src/process.py -v /data/raw/FRvideos.csv -c /data/raw/FR_category_id.json -o /data/output00

D'autres parametres "importants" sont souvent passé au lancement d'un job:

    /spark-2.4.4-bin-hadoop2.7/bin/spark-submit --name videos-integration \
     --master yarn \ # YARN reste le scheduler le plus utilisé pour des cluster de production
     --deploy-mode cluster \ # quand on lance en mode `cluster` le driver sera lancé sur une machine du cluster et gérer par le scheduler. Contrairement au mode `client` 
     --conf spark.executor.memory=12G \
     --conf spark.executor.cores=4 \ # nombre de cpu par executor ce qui lui permet d'executer 4 taches en paralleles qui se partagerons la memoire d'execution. 
     --conf spark.serializer=org.apache.spark.serializer.KryoSerializer \
     --conf spark.shuffle.service.enabled=true \
     --conf spark.dynamicAllocation.enabled=true \
      /src/process.py -v /data/raw/FRvideos.csv -c /data/raw/FR_category_id.json -o /data/output00

Voici la liste des configurations. Attention certaines configurations dependent du scheduler utilisé!

a voir/aller plus loin

Cours 2: streaming

Objectifs:

  • comprendre les concepts
  • manipuler un backend de stream : kafka
  • faire des traitement en streaming avec spark structured streaming

kafka:

lancer une instance:

docker-compose -f zk-single-kafka-single.yml up

s'y connecter et faire des manips:

kafka-topics --bootstrap-server localhost:9092 --create --topic topic03 --partitions 3 --replication-factor 1

kafka-console-producer --broker-list localhost:9092  --topic topic03

kafka-console-consumer --bootstrap-server  localhost:9092  --topic topic03

lancer un spark-shell (pyspark) sur un container docker qui voit le broker kafka

docker run --rm -ti -v ~/cours-hdp2/datasets:/data --network docker_default -p 4040:4040 stebourbi/sio:pyspark --packages org.apache.spark:spark-sql-kafka-0-10_2.11:2.4.4

Lire depuis le stream/topic kafka

df = spark.readStream.format("kafka").option("kafka.bootstrap.servers", "kafka1:19092").option("subscribe", "topic01").load()

df.isStreaming
df.printSchema()

df.count()
df.show()

query = df.writeStream.outputMode("update").format("console").start()

query.awaitTermination()

inserer des messages dans le topic

cat <<EOF >data
2020-01-26 10:12:15,ligne1,1
2020-01-26 10:13:12,ligne2,2
2020-01-26 10:13:13,ligne3,3
2020-01-26 10:14:16,ligne4,4
EOF

kafka-console-producer --broker-list localhost:9092  --topic topic01 < data

Les questions qu'on se pose pour un traitement en streaming:

  • Quel traitement?

  • A quelle periode/fenetre s'applique le traitement?

  • Quand est ce qu'on emet le resultat?

  • Comment on restitue le resultat?

  • ajouter le temps de capture de la données

df = spark.readStream.format("kafka").option("kafka.bootstrap.servers", "kafka1:19092").option("subscribe", "topic01").option("startingOffsets", "latest").load()
df = df.select(df.value.cast("string"),df.timestamp.alias('capture_time'))
query = df.writeStream.outputMode("update").format("console").option('truncate', 'false').start()
query.awaitTermination()
  • ajouter le temps de l'evenement
from pyspark.sql.functions import *
df = spark.readStream.format("kafka").option("kafka.bootstrap.servers", "kafka1:19092").option("subscribe", "topic01").option("startingOffsets", "latest").load()
df = df.select(df.value.cast("string"),df.timestamp.alias('capture_time'))
df = df.withColumn('event_time',to_timestamp(split(df.value,',')[0]))
query = df.writeStream.outputMode("update").format("console").option('truncate', 'false').start()
query.awaitTermination()
  • ajouter le temps de traitement
from pyspark.sql.functions import *
df = spark.readStream.format("kafka").option("kafka.bootstrap.servers", "kafka1:19092").option("subscribe", "topic01").option("startingOffsets", "latest").load()
df = df.select(df.value.cast("string"),df.timestamp.alias('capture_time'))
df = df.withColumn('processing_time',current_timestamp())
df = df.withColumn('event_time',to_timestamp(split(df.value,',')[0]))
query = df.writeStream.outputMode("update").format("console").option('truncate', 'false').start()
query.awaitTermination()
  • sortir la valeur de l'evenement
from pyspark.sql.functions import *
df = spark.readStream.format("kafka").option("kafka.bootstrap.servers", "kafka1:19092").option("subscribe", "topic01").option("startingOffsets", "latest").load()
df = df.select(df.value.cast("string"),df.timestamp.alias('capture_time'))
df = df.withColumn('processing_time',current_timestamp())
df = df.withColumn('event_time',to_timestamp(split(df.value,',')[0]))
df = df.withColumn('value',split(df.value,',')[1])
query = df.writeStream.outputMode("update").format("console").option('truncate', 'false').start()
query.awaitTermination()
  • traiter par fenetre de temps:
from pyspark.sql.functions import *
df = spark.readStream.format("kafka").option("kafka.bootstrap.servers", "kafka1:19092").option("subscribe", "topic01").option("startingOffsets", "latest").load()
df = df.select(df.value.cast("string"),df.timestamp.alias('capture_time'))
df = df.withColumn('processing_time',current_timestamp())
df = df.withColumn('event_time',to_timestamp(split(df.value,',')[0]))
df = df.withColumn('value',split(df.value,',')[1])
df = df.groupBy(window(df.capture_time,"10 minutes","10 minutes"),df.value).count()
query = df.writeStream.outputMode("update").format("console").option('truncate', 'false').start()
query.awaitTermination()
  • gerer les evenement en retard:
from pyspark.sql.functions import *
df = spark.readStream.format("kafka").option("kafka.bootstrap.servers", "kafka1:19092").option("subscribe", "topic03").option("startingOffsets", "latest").load()
df = df.select(df.value.cast("string"),df.timestamp.alias('capture_time'))
df = df.withColumn('processing_time',current_timestamp())
df = df.withColumn('event_time',to_timestamp(split(df.value,',')[0]))
df = df.withColumn('value',split(df.value,',')[1])
df = df.withWatermark("capture_time", "15 minutes") 
df = df.groupBy(window(df.capture_time,"10 minutes","10 minutes"),df.value).count()
query = df.writeStream.outputMode("update").format("console").option('truncate', 'false').start()
query.awaitTermination()

___ a retenir__

windowing:

  • fixed window: simple decoupage en periode egale (sliding avec pas == periode)

  • sliding window: definie par un pas et une periode (souvent pas >= periode)

  • session: depend de la presence ou pas d'un pattern dans la donée le windowing peut se baser sur :

  • event time

  • processing time

  • capture time

  • decider du temps de traitement:

from pyspark.sql.functions import *
df = spark.readStream.format("kafka").option("kafka.bootstrap.servers", "kafka1:19092").option("subscribe", "topic3").option("startingOffsets", "latest").load()
df = df.select(df.value.cast("string"),df.timestamp.alias('capture_time'))
df = df.withColumn('processing_time',current_timestamp())
df = df.withColumn('event_time',to_timestamp(split(df.value,',')[0]))
df = df.withColumn('value',split(df.value,',')[1])
df = df.withWatermark("capture_time", "15 minutes") 
df = df.groupBy(window(df.capture_time,"10 minutes","10 minutes"),df.value).count()
query = df.writeStream.trigger(processingTime='30 seconds').outputMode("complete").format("console").option('truncate', 'false').start()
query.awaitTermination()

a retenir: la notion completude de donées par rapport au frontiere des windows:

  • watermark: la limite aprés laquelle on considere/suppose que tte les données sont parvenues
  • triggering: quand on considere que les events/données relative a une window sont constituée et prete au processing la restitution du resultat:
    • complete
    • 'append'
    • mise a jour

ms-sio's People

Contributors

stebourbi avatar

Watchers

 avatar  avatar  avatar

Forkers

s0n

Recommend Projects

  • React photo React

    A declarative, efficient, and flexible JavaScript library for building user interfaces.

  • Vue.js photo Vue.js

    🖖 Vue.js is a progressive, incrementally-adoptable JavaScript framework for building UI on the web.

  • Typescript photo Typescript

    TypeScript is a superset of JavaScript that compiles to clean JavaScript output.

  • TensorFlow photo TensorFlow

    An Open Source Machine Learning Framework for Everyone

  • Django photo Django

    The Web framework for perfectionists with deadlines.

  • D3 photo D3

    Bring data to life with SVG, Canvas and HTML. 📊📈🎉

Recommend Topics

  • javascript

    JavaScript (JS) is a lightweight interpreted programming language with first-class functions.

  • web

    Some thing interesting about web. New door for the world.

  • server

    A server is a program made to process requests and deliver data to clients.

  • Machine learning

    Machine learning is a way of modeling and interpreting data that allows a piece of software to respond intelligently.

  • Game

    Some thing interesting about game, make everyone happy.

Recommend Org

  • Facebook photo Facebook

    We are working to build community through open source technology. NB: members must have two-factor auth.

  • Microsoft photo Microsoft

    Open source projects and samples from Microsoft.

  • Google photo Google

    Google ❤️ Open Source for everyone.

  • D3 photo D3

    Data-Driven Documents codes.