Spark MLlib + Kafka + Flask/RxJS/GridJS Lab
Overview
This lab showcases an end-to-end Docker Compose application featuring Spark MLlib along with Kafka and a simple Python Flask frontend, to collect, categorize and display sport news items from various RSS feeds.
The main task implemented by the application, using Spark, is to train a news classifier model and apply it to the collected news items. We start from some selected DailyMail feeds for specific sports (e.g., football, tennis, racing, etc.): they provide us with some text, in the form of news titles and short abstracts, along with a category associated to each specific feed (10 categories total). We use a batch of this data from Kafka to train a multiclass classifier (`train-java`, `train-python` folders, which you will have to edit), and then apply this model to classify the stream of RSS data, reading from and writing back to Kafka. A Python Flask frontend then consumes the classified news, offering a simple web search UI. While we don't do online learning (no support for that in the framework), the model can in principle be retrained periodically, and the code is already set up to switch to the newest model for the predictions.
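For a concrete picture of the training input, here is a minimal sketch (in Python) of how the categorized feed items could be read from Kafka as a batch DataFrame; the broker address, JSON field names and message schema are assumptions for illustration, and the actual skeleton code may read and parse the topic differently:

```python
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, from_json
from pyspark.sql.types import StructType, StructField, StringType

# requires the spark-sql-kafka package on the classpath
spark = SparkSession.builder.appName("train").getOrCreate()

# hypothetical schema of the RSS items produced by the connector; adapt to the real one
schema = StructType([
    StructField("title", StringType()),
    StructField("description", StringType()),
    StructField("category", StringType()),
])

raw = (spark.read.format("kafka")
       .option("kafka.bootstrap.servers", "localhost:9092")  # assumed broker address
       .option("subscribe", "rss")
       .option("startingOffsets", "earliest")                 # read the whole topic as a batch
       .load())

news = (raw.selectExpr("CAST(value AS STRING) AS json")
        .select(from_json(col("json"), schema).alias("item"))
        .select("item.*")
        .where(col("category").isNotNull()))                  # keep only categorized (DailyMail) items
```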
The following figure (svg version) depicts the application architecture in terms of components (e.g., internal Docker containers), data flows (solid links) and other dependencies (dotted links):
```mermaid
flowchart LR
rss("RSS feeds"):::compext
ui("HTML/JS web frontend\nrunning in the browser"):::compext
train("Spark offline training code\n(Python notebook/script or Java)"):::compext
model[("\nClassifier Model\n(directory ./model)")]:::compext
subgraph app [Docker Compose application]
zookeeper("ZooKeeper"):::comp
broker("Kafka Broker"):::comp
connect("Kafka Connect\nwith RSS source connector"):::comp
subgraph spark ["Spark cluster"]
direction LR
sparkmaster("Spark Master"):::comp
sparkworker("Spark Worker"):::comp
sparkdriver("Spark processor\n(driver submitting job)"):::comp
end
flask("Python Flask\nweb server"):::comp
kafkaui("Kafka UI"):::comp
end
rss --> connect
connect --> broker
broker <--> spark
broker --> flask
flask ------> ui
broker -.- zookeeper
kafkaui -.- broker
sparkdriver -.- sparkmaster
sparkmaster -.- sparkworker
broker --> train
train --> model
model --> sparkdriver
classDef scope fill:#fff,stroke:#333,stroke-width:1px,stroke-dasharray: 5 5,color:#444,font-size:10pt;
classDef comp fill:#fafafa,stroke:#333,stroke-width:1.5px,font-size:10pt;
classDef compsub fill:#eee,stroke:#333,stroke-width:1.5px,font-size:10pt;
classDef compext fill:#fff,stroke:#333,stroke-width:1.5px,font-size:10pt;
classDef dataext fill:#fff,stroke:#333,stroke-width:1.5px,font-size:10pt;
classDef none fill:#fff,stroke:#fff,stroke-width:0px,font-size:0pt;
class app scope
class spark scope
class connect comp
```
Macro-components:
- RSS feeds - they include 10 feeds from DailyMail, each associated with a specific sport category, which we use for training, plus 9 additional uncategorized feeds from other sport news sources, whose content we are going to classify
- Kafka Connect - runs one RSS source connector to inject RSS news items into Kafka topic `rss`
- Kafka + ZooKeeper + Kafka UI - stores the input `rss` topic as well as the classified output topic `rss_categorized`, which is then read by the frontend; Kafka UI is available at http://localhost:28080/
- Spark offline training code - dumps all categorized data from Kafka, trains a text classification model and saves it to `./model`
- Spark processor - loads the classifier model from `./model` (once available, reloading it if it changes) and uses it to read news items from the `rss` topic, classify them, and write the result into `rss_categorized`
- Python Flask server - uses the Kafka consumer API to collect data from topic `rss_categorized`, keeping the last 72 hours of news and serving an HTML/JS UI that polls the server for news every minute, displaying them in a searchable table
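To make the role of the Spark processor more concrete, the following is a minimal sketch, not the supplied implementation, of how a saved model can be applied to the `rss` stream and written back to `rss_categorized`; it reuses the `spark` session and hypothetical `schema` from the sketch above, and the broker address and checkpoint location are likewise assumptions:

```python
from pyspark.ml import PipelineModel
from pyspark.sql.functions import col, from_json

# load the latest model saved under ./model (the supplied processor also reloads it when it changes)
model = PipelineModel.load("model")

stream = (spark.readStream.format("kafka")
          .option("kafka.bootstrap.servers", "localhost:9092")   # assumed broker address
          .option("subscribe", "rss")
          .load()
          .selectExpr("CAST(value AS STRING) AS json")
          .select(from_json(col("json"), schema).alias("item"))  # same hypothetical schema as above
          .select("item.*"))

classified = model.transform(stream)                             # adds the prediction columns

query = (classified
         .selectExpr("to_json(struct(*)) AS value")
         .writeStream.format("kafka")
         .option("kafka.bootstrap.servers", "localhost:9092")
         .option("topic", "rss_categorized")
         .option("checkpointLocation", "checkpoint")             # any writable directory works here
         .start())
```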
How to Run the Application
This is a Docker Compose application that involves building custom images for its Java/Python processor components.
- make sure you have both Docker and Java 11 installed (even if you work in Python), while for Python you need version 3.7 or later. If you run this on Windows, you will need some native library related to HDFS: you may follow the instructions here and, when asked to install winutils, use the binaries (`.dll` and `.exe`) for Spark 3.3.1 from here
- clone this repository and copy `.env.example` into `.env`, possibly editing it (just creating `.env` is enough):

  ```
  $ git clone git@gitlab.inf.unibz.it:rtbdp/labs/lab_spark_rss.git
  $ cd lab_spark_rss
  $ cp .env.example .env
  ```
- in a terminal, use Docker Compose to start or stop all the required components (in this case they include the supplied Spark processor, which will remain idle waiting for a model):

  ```
  $ docker compose up                        # to start the application
  $ docker compose down -v --remove-orphans  # to stop the application, deleting all data and dangling containers (--remove-orphans)
  ```

  where `--remove-orphans` (optional) is useful to make sure that enabling/disabling the Spark containers does not result in some stopped Spark container remaining around.
- once the application is started, the following web UIs can be accessed:
  - Kafka UI: http://localhost:28080/ (initially showing just the `changes` and `scores` topics)
  - Web frontend: http://localhost:8880/ (empty table until a classifier model is generated)
  - Spark master UI: http://localhost:8080/ (no particular need to access it)
  - Spark worker UI: http://localhost:8081/ (no particular need to access it)
  - Spark application UI: http://localhost:4040/ (UI maintained by the Spark processor, no need to access it)
- to develop in Python, you need to create a virtual environment and install dependencies:

  ```
  $ cd train-python
  $ python -m venv myenv
  $ source myenv/bin/activate
  $ pip install -U pip
  $ pip install -r requirements.txt
  ```

  then you can either edit (and run when ready) script `train.py`, or better, you can start `jupyter-lab`, open `train.ipynb` and edit/execute it one cell at a time; either way, make sure `JAVA_HOME` is set and points to a Java 11 installation beforehand
- to develop in Java, open the Maven project under `train-java` with your favorite IDE (e.g., IntelliJ IDEA), and then edit (and run when ready) class `it.unibz.inf.rtbdp.Train`.
- the supplied skeletons of class `Train.java` and script `train.py` / `train.ipynb` only contain the definition of a dummy classifier that always predicts the first category (label 0.0, not even the majority class), with very bad results. It's there just to let you run the script/notebook/class, get some 'model' and see it being used by the processor and eventually the frontend: you will later have to edit the code and define a proper classifier (script `train_solution.{py,ipynb}` and class `TrainSolution.java` contain such a classifier definition, but they represent the solutions of this lab and will be provided later, so do not look at them right now). You should be ready to develop now.
- the train code always saves the model under `./model`, which is then picked up by the processor, which restarts computing predictions: if the model is 'broken' (i.e., it fails at prediction time), the processor should in principle cope with that and try again later until the model is fixed, but if this does not work and the processor crashes, you may restart it with `docker compose up -d processor` (after fixing the model!).
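As a reference for the last point, saving a fitted MLlib `PipelineModel` so that the processor can pick it up boils down to a single call; the variable name `model` and the exact destination path are whatever your train code uses (the Compose setup expects the model under `./model` at the repository root):

```python
# overwrite any previously saved model; the processor notices the change and reloads it
model.write().overwrite().save("model")
```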
What You Have to Do
You have to go through the following exercises 0, 1, 2, which will eventually provide you with a working news classifier applied in a streaming fashion (solutions provided later); after that, you may try the bonus exercise, which consists in using Spark NLP for the same text classification task (note: it's computationally heavy and may not work out of the box on your OS/architecture).
Exercise 0
Start the Docker Compose application and run the supplied train code as is. This will produce a dummy classification model that will be used to generate dummy predictions for the news not coming with a category (i.e., all news from sources different from DailyMail). If everything works properly, you should be able to see a populated news table in the web frontend (where most news will have the category matching label 0.0, typically 'football' but this is not deterministic).
Exercise 1
You have to modify the definition of the classification pipeline in `train.ipynb` / `train.py` / `Train.java` by defining proper feature extraction from text and using a proper classifier.
The instructions in the code indicate a possible solution based on a standard TF-IDF text representation and a simple Logistic Regression classifier, but feel free to play with alternatives. While developing, you may find it useful to check the reference documentation of Spark MLlib for feature extraction/transformation and for supervised classification. Also check the example provided during the lecture, e.g., the one for Python.
Once done (and at any time), run the train code in order to generate a model, which will be immediately used for online predictions and whose performance on the test set will be displayed at the end of the train procedure.
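For orientation, a minimal Python sketch of such a TF-IDF + Logistic Regression pipeline is shown below; the column names `text` and `label` and the specific stages/parameters are assumptions for illustration, so adapt them to the schema and instructions of the actual skeleton:

```python
from pyspark.ml import Pipeline
from pyspark.ml.feature import RegexTokenizer, StopWordsRemover, HashingTF, IDF
from pyspark.ml.classification import LogisticRegression

# assumed columns: 'text' (title + abstract) and 'label' (numeric category index)
tokenizer = RegexTokenizer(inputCol="text", outputCol="words", pattern="\\W+")
remover = StopWordsRemover(inputCol="words", outputCol="filtered")
tf = HashingTF(inputCol="filtered", outputCol="tf", numFeatures=10000)
idf = IDF(inputCol="tf", outputCol="features")
lr = LogisticRegression(featuresCol="features", labelCol="label", maxIter=50)

pipeline = Pipeline(stages=[tokenizer, remover, tf, idf, lr])
# model = pipeline.fit(train_df)   # train_df: the categorized news used for training
```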
Exercise 2
Here you are asked to tune the `regParam` of the Logistic Regression classifier (if you used a different classifier, then adapt the code, picking the hyper-parameter(s) of that classifier) via grid search using 3-fold cross validation. I.e., for each candidate hyper-parameter value combination (here, 5 combinations), the training data will be split into 3 folds and the model will be trained on 2 folds and tested on the remaining one, rotating it each time, computing an average performance metric (here and by default, micro-averaged F1) and picking the hyper-parameter value that maximizes it. Follow the instructions in the code, then run the train code again to see this process at work and generate a new model.
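Building on the hypothetical `pipeline` and `lr` objects from the previous sketch, the grid search could look roughly as follows; the candidate `regParam` values are placeholders, not the ones prescribed by the skeleton:

```python
from pyspark.ml.tuning import CrossValidator, ParamGridBuilder
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

# 5 candidate regularization strengths (illustrative values)
grid = (ParamGridBuilder()
        .addGrid(lr.regParam, [0.001, 0.01, 0.05, 0.1, 0.5])
        .build())

# 'f1' is the evaluator's default metric
evaluator = MulticlassClassificationEvaluator(labelCol="label", metricName="f1")

cv = CrossValidator(estimator=pipeline, estimatorParamMaps=grid,
                    evaluator=evaluator, numFolds=3)
# cv_model = cv.fit(train_df)        # best pipeline: cv_model.bestModel
```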
Bonus Exercise
If you finish the previous exercises, you may consider trying Spark NLP for this text classification task. Note that you would need a further 5 GB of free disk space and 16 GB of RAM or more, and that Spark NLP might have problems running under Windows or Mac OS X on ARM, due to its reliance on native code (TensorFlow): the following steps have been tested on Linux and Mac (both Intel/ARM) but not on Windows. You should:
- edit `.env`, uncommenting the following lines (see instructions there as well):

  ```
  PROCESSOR_SUBMIT_ARGS=--driver-memory 4g --conf spark.executor.memory=4g  # allocate enough RAM to Spark processes
  PROCESSOR_PACKAGES=org.apache.spark:spark-sql-kafka-0-10_2.12:3.3.2,com.johnsnowlabs.nlp:spark-nlp_2.12:4.2.8  # for Intel/AMD
  #PROCESSOR_PACKAGES=org.apache.spark:spark-sql-kafka-0-10_2.12:3.3.2,com.johnsnowlabs.nlp:spark-nlp-m1_2.12:4.2.8  # for Apple M1 (incl. M1 Pro)
  ```
- restart the Docker Compose application:

  ```
  docker compose down   # no need to specify -v as we can reuse existing data
  docker compose up
  ```
- in Python, you need to include the necessary options and JAR files when creating the SparkSession:

  ```python
  spark = SparkSession \
      .builder \
      .appName("processor") \
      .config("spark.sql.shuffle.partitions", 4) \
      .config("spark.driver.memory", "4G") \
      .config("spark.kryoserializer.buffer.max", "2000M") \
      .config("spark.driver.maxResultSize", "0") \
      .config("spark.jars.packages", "org.apache.spark:spark-sql-kafka-0-10_2.12:3.3.2,com.johnsnowlabs.nlp:spark-nlp_2.12:4.2.8") \
      .getOrCreate()
  ```
- in Java, you need to include the necessary options as shown below when creating the SparkSession; JARs are already imported as Maven dependencies for Intel/AMD, whereas for Apple M1 you need to edit `pom.xml`, commenting the base dependency for Spark NLP and uncommenting the Spark NLP one with the `-m1` qualifier:

  ```java
  SparkSession spark = SparkSession
      .builder()
      .appName("processor")
      .config("spark.sql.shuffle.partitions", 4)
      .config("spark.driver.memory", "4G")
      .config("spark.kryoserializer.buffer.max", "2000M")
      .config("spark.driver.maxResultSize", "0")
      .getOrCreate();
  ```
- after the previous setup, you are ready to revise the pipeline including Spark NLP components, such as:
  - a `DocumentAssembler` transformer, to map `text` to a `document` annotation;
  - a `UniversalSentenceEncoder` transformer, to map the `document` annotation to a 512-element `embeddings` annotation;
  - a `ClassifierDLApproach` estimator, i.e., a densely connected neural classifier trained on `embeddings` / `label` and producing a `prediction_annotation`;
  - a `SQLTransformer` transformer, to extract the `prediction` class number via the SQL expression `SELECT *, CAST(element_at(prediction_annotation, 1).result AS DOUBLE) AS prediction FROM __THIS__`;
  - an `IndexToString` transformer from Spark MLlib, as before

  Note that `CrossValidator` is apparently not compatible with Spark NLP (it triggers training for the different folds / grid parameter candidates, but there are issues consuming Spark NLP annotations), so do some hyper-parameter fine tuning manually (e.g., learning rate and dropout; you may start from 0.01 and 0.1). A sketch of such a pipeline is given after this list.
- once you save the model, it will be automatically imported by the processor in the Docker Compose application, which will require ~10 GB of RAM (be sure to have it available, or the machine will start swapping)
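As announced above, here is a minimal Python sketch of such a Spark NLP pipeline; the column names (`text`, `label`), the hyper-parameter values and the `categories` list for `IndexToString` are assumptions for illustration, not the values prescribed by the lab code:

```python
from pyspark.ml import Pipeline
from pyspark.ml.feature import SQLTransformer, IndexToString
from sparknlp.base import DocumentAssembler
from sparknlp.annotator import UniversalSentenceEncoder, ClassifierDLApproach

document = DocumentAssembler().setInputCol("text").setOutputCol("document")

# downloads the pretrained Universal Sentence Encoder on first use (hence the extra disk space)
use = (UniversalSentenceEncoder.pretrained()
       .setInputCols(["document"]).setOutputCol("embeddings"))

classifier = (ClassifierDLApproach()
              .setInputCols(["embeddings"]).setOutputCol("prediction_annotation")
              .setLabelColumn("label")
              .setLr(0.01).setDropout(0.1)   # manual tuning starting points, as suggested above
              .setMaxEpochs(10))

extract = SQLTransformer(statement=
    "SELECT *, CAST(element_at(prediction_annotation, 1).result AS DOUBLE) AS prediction FROM __THIS__")

# hypothetical label list: use the labels of the StringIndexer fitted on the category column
categories = ["football", "tennis", "racing"]
to_category = IndexToString(inputCol="prediction", outputCol="predicted_category", labels=categories)

nlp_pipeline = Pipeline(stages=[document, use, classifier, extract, to_category])
# nlp_model = nlp_pipeline.fit(train_df)
```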