Spark MLlib + Kafka + Flask/RxJS/GridJS Lab

Screenshot of the application frontend

Overview

This lab showcases an end-to-end Docker Compose application featuring Spark MLlib, Kafka and a simple Python Flask frontend to collect, categorize and display sports news items from various RSS feeds.

The main task implemented by the application, using Spark, is to train a news classifier model and apply it to the collected news items. We start from some selected DailyMail feeds for specific sports (e.g., football, tennis, racing, etc.): they provide us with some text, in the form of news titles and short abstracts, along with a category associated with each specific feed (10 categories total). We use a batch of this data from Kafka to train a multiclass classifier (folders train-java, train-python, which you will have to edit), and then apply this model to classify the stream of RSS data, reading from and writing back to Kafka. A Python Flask frontend then consumes the classified news, offering a simple web search UI. While we don't do online learning (there is no support for that in the framework), the model can in principle be retrained periodically, and the code is already set up to switch to the newest model for the predictions.

The following figure (svg version) depicts the application architecture in terms of components (e.g., internal Docker containers), data flows (solid links) and other dependencies (dotted links):

flowchart LR
    rss("RSS feeds"):::compext
    ui("HTML/JS web frontend\nrunning in the browser"):::compext
    train("Spark offline training code\n(Python notebook/script or Java)"):::compext
    model[("\nClassifier Model\n(directory ./model)")]:::compext
    subgraph app [Docker Compose application]
        zookeeper("ZooKeeper"):::comp
        broker("Kafka Broker"):::comp
        connect("Kafka Connect\nwith RSS source connector"):::comp
        subgraph spark ["Spark cluster"]
            direction LR
            sparkmaster("Spark Master"):::comp
            sparkworker("Spark Worker"):::comp
            sparkdriver("Spark processor\n(driver submitting job)"):::comp
        end
        flask("Python Flask\nweb server"):::comp
        kafkaui("Kafka UI"):::comp
    end
    rss --> connect
    connect --> broker
    broker <--> spark
    broker --> flask
    flask ------> ui
    broker -.- zookeeper
    kafkaui -.- broker
    sparkdriver -.- sparkmaster
    sparkmaster -.- sparkworker
    broker --> train
    train --> model
    model --> sparkdriver
    classDef scope fill:#fff,stroke:#333,stroke-width:1px,stroke-dasharray: 5 5,color:#444,font-size:10pt;
    classDef comp fill:#fafafa,stroke:#333,stroke-width:1.5px,font-size:10pt;
    classDef compsub fill:#eee,stroke:#333,stroke-width:1.5px,font-size:10pt;
    classDef compext fill:#fff,stroke:#333,stroke-width:1.5px,font-size:10pt;
    classDef dataext fill:#fff,stroke:#333,stroke-width:1.5px,font-size:10pt;
    classDef none fill:#fff,stroke:#fff,stroke-width:0px,font-size:0pt;
    class app scope
    class spark scope
    class connect comp

Macro-components:

  • RSS feeds - they include 10 feeds from DailyMail, each one associated with a specific sport category, which we use for training, plus 9 additional uncategorized feeds from other sport news sources, whose content we are going to classify
  • Kafka Connect - runs one RSS source connector to inject RSS news items into the Kafka topic rss
  • Kafka + ZooKeeper + Kafka UI - stores the input rss topic as well as the classified output topic rss_categorized, which is then read by the frontend; Kafka UI is available at http://localhost:28080/
  • Spark offline training code - reads all the categorized data in Kafka, trains a text classification model and saves it to ./model
  • Spark processor - loads the classifier model from ./model (once available, reloading it if it changes) and uses it to read news items from the rss topic, classify them, and write the result into rss_categorized (see the sketch after this list)
  • Python Flask server - uses the Kafka consumer API to collect data from topic rss_categorized, keeping the last 72 hours of news and producing an HTML/JS UI that polls the server for news every minute, displaying them in a searchable table
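
To make the data flow concrete, the following is a minimal PySpark sketch of what the Spark processor does conceptually: read news items from the rss topic, apply the saved model, and write the classified items to rss_categorized. It is not the actual processor code: the broker address, JSON schema and column names are illustrative assumptions.

    from pyspark.ml import PipelineModel
    from pyspark.sql import SparkSession
    from pyspark.sql.functions import col, from_json, struct, to_json
    from pyspark.sql.types import StringType, StructType

    spark = SparkSession.builder.appName("processor-sketch").getOrCreate()

    # illustrative schema: the actual layout of the rss topic may differ
    schema = StructType().add("title", StringType()).add("text", StringType())

    news = (spark.readStream
        .format("kafka")
        .option("kafka.bootstrap.servers", "broker:9092")  # assumption: broker address inside the Compose network
        .option("subscribe", "rss")
        .load()
        .select(from_json(col("value").cast("string"), schema).alias("n"))
        .select("n.*"))

    model = PipelineModel.load("model")  # the real processor also reloads the model when it changes

    (model.transform(news)
        .select(to_json(struct("title", "text", "prediction")).alias("value"))  # keep only plain columns
        .writeStream
        .format("kafka")
        .option("kafka.bootstrap.servers", "broker:9092")
        .option("topic", "rss_categorized")
        .option("checkpointLocation", "/tmp/checkpoint")  # required by the Kafka sink
        .start()
        .awaitTermination())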

How to Run the Application

This is a Docker Compose application that involves building custom images for its Java/Python processor components.

  • make sure you have both Docker and Java 11 installed (even if you work in Python); for Python itself, you need version 3.7 or later. If you run this on Windows, you will also need a native Hadoop library: you may follow the instructions here and, when asked to install winutils, use the binaries (.dll and .exe) for Spark 3.3.1 from here

  • clone this repository and copy .env.example into .env, possibly editing it (just creating .env is enough):

    $  git clone git@gitlab.inf.unibz.it:rtbdp/labs/lab_spark_rss.git
    $  cd lab_spark_rss
    $  cp .env.example .env
  • in a terminal, use Docker Compose to start or stop all the required components (in this case they include the supplied Spark processor, that will remain idle waiting for a model):

    $  docker compose up                        # to start the application
    $  docker compose down -v --remove-orphans  # to stop the application, deleting all data (-v) and orphan containers (--remove-orphans)

    where --remove-orphans (optional) is useful to make sure that enabling/disabling the Spark containers does not result in some stopped Spark container remaining around.

  • once the application is started, the web UIs of its components can be accessed (e.g., the Kafka UI at http://localhost:28080/)

  • to develop in Python, you need to create a virtual environment and install the dependencies:

    $ cd train-python
    $ python -m venv myenv
    $ source myenv/bin/activate
    $ pip install -U pip
    $ pip install -r requirements.txt

    then you can either edit (and run when ready) the script train.py or, better, start jupyter-lab, open train.ipynb and edit/execute it one cell at a time; either way, make sure JAVA_HOME is set and points to a Java 11 installation beforehand

  • to develop in Java, open the Maven project under train-java with your favorite IDE (e.g., IntelliJ IDEA), and then edit (and run when ready) class it.unibz.inf.rtbdp.Train.

  • the supplied skeletons of class Train.java and script train.py / train.ipynb only contain the definition of a dummy classifier that always predicts the first category (label 0.0, not even the majority class), with very bad results. It is there just to let you run the script/notebook/class, get some 'model' and see it being used by the processor and eventually the frontend: you will later have to edit the code and define a proper classifier (script train_solution.{py,ipynb} and class TrainSolution.java contain such a classifier definition, but they represent the solutions of this lab, will be provided later, and you should not look at them right now). You should be ready to develop now.

  • the train code always saves the model under ./model; the processor then picks it up and restarts computing predictions: if the model is 'broken' (i.e., it fails at prediction time), the processor should in principle cope with that and keep retrying until the model is fixed, but if this does not work and the processor crashes, you may restart it with docker compose up -d processor (after fixing the model!).
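
    For reference, saving and loading the model in PySpark looks roughly as follows (a sketch, assuming model is the fitted pyspark.ml Pipeline result; the supplied skeleton already contains the actual save call, and the processor takes care of the reload):

    # training side: persist the fitted pipeline, overwriting any previous model on disk
    model.write().overwrite().save("model")   # ends up under ./model

    # processor side: load the newest saved model before computing predictions
    from pyspark.ml import PipelineModel
    reloaded = PipelineModel.load("model")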

What You Have to Do

You have to go through the following exercises 0, 1 and 2, which will eventually give you a working news classifier applied in a streaming fashion (solutions provided later). After that, you may try the bonus exercise, which consists in using Spark NLP for the same text classification task (note: it is computationally heavy and may not work out of the box on your OS/architecture).

Exercise 0

Start the Docker Compose application and run the supplied train code as is. This will produce a dummy classification model that will be used to generate dummy predictions for the news not coming with a category (i.e., all news from sources different from DailyMail). If everything works properly, you should be able to see a populated news table in the web frontend (where most news will have the category matching label 0.0, typically 'football' but this is not deterministic).

Exercise 1

You have to modify the definition of the classification pipeline in train.ipynb / train.py / Train.java by defining proper feature extraction from the text and using a proper classifier.

The instructions in the code indicate a possible solution based on a standard TF-IDF text representation and a simple Logistic Regression classifier, but feel free to play with alternatives. While developing, you may find it useful to check the reference documentation of Spark MLlib for feature extraction/transformation and for supervised classification. Also check the examples provided during the lecture, e.g., the one for Python.
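
A minimal PySpark sketch of such a TF-IDF + Logistic Regression pipeline is shown below; the text and label column names, as well as the parameter values, are assumptions to adapt to the skeleton's actual DataFrame schema:

    from pyspark.ml import Pipeline
    from pyspark.ml.classification import LogisticRegression
    from pyspark.ml.feature import HashingTF, IDF, StopWordsRemover, Tokenizer

    # assumption: the training DataFrame train_df has a 'text' column and a numeric 'label' column
    tokenizer = Tokenizer(inputCol="text", outputCol="words")
    remover = StopWordsRemover(inputCol="words", outputCol="filtered")
    tf = HashingTF(inputCol="filtered", outputCol="tf", numFeatures=1 << 16)
    idf = IDF(inputCol="tf", outputCol="features")
    lr = LogisticRegression(featuresCol="features", labelCol="label", maxIter=50)

    pipeline = Pipeline(stages=[tokenizer, remover, tf, idf, lr])
    model = pipeline.fit(train_df)  # the fitted model is what gets saved under ./model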

Once done (and at any time), run the train code in order to generate a model, which will be immediately used for online predictions and whose performance on the test set will be displayed at the end of the training procedure.

Exercise 2

Here you are asked to tune the regParam of the Logistic Regression classifier (if you used a different classifier, adapt the code to pick the hyper-parameter(s) of that classifier) via grid search using 3-fold cross validation. That is, for each candidate hyper-parameter value combination (here, 5 combinations), the training data is split into 3 folds and the model is trained on 2 folds and tested on the remaining one, rotating the folds each time; an average performance metric is computed (here and by default, micro-averaged F1) and the hyper-parameter value that maximizes it is picked. Follow the instructions in the code, then run the train code again to see this process at work and generate a new model.
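
A possible PySpark sketch of this grid search, assuming the pipeline and lr stages from the previous exercise and the same train_df training DataFrame (the candidate regParam values are illustrative):

    from pyspark.ml.evaluation import MulticlassClassificationEvaluator
    from pyspark.ml.tuning import CrossValidator, ParamGridBuilder

    # 5 candidate values for regParam, evaluated via 3-fold cross validation
    grid = ParamGridBuilder() \
        .addGrid(lr.regParam, [0.001, 0.01, 0.1, 0.3, 1.0]) \
        .build()

    evaluator = MulticlassClassificationEvaluator(labelCol="label", metricName="f1")

    cv = CrossValidator(estimator=pipeline, estimatorParamMaps=grid,
                        evaluator=evaluator, numFolds=3)
    cv_model = cv.fit(train_df)      # refits the best configuration on all the training data
    best_model = cv_model.bestModel  # this is the model to save under ./model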

Bonus Exercise

If you finish the previous exercises, you may consider trying Spark NLP for this text classification task. Note that you will need a further 5 GB of free disk space and 16 GB of RAM or more, and that Spark NLP might have problems running under Windows or under Mac OS X on ARM, due to its reliance on native code (TensorFlow): the following steps have been tested on Linux and Mac (both Intel and ARM) but not on Windows. You should:

  • edit .env, uncommenting the following lines (see instructions there as well)

    PROCESSOR_SUBMIT_ARGS=--driver-memory 4g --conf spark.executor.memory=4g                                            # allocate enough RAM to Spark processes
    PROCESSOR_PACKAGES=org.apache.spark:spark-sql-kafka-0-10_2.12:3.3.2,com.johnsnowlabs.nlp:spark-nlp_2.12:4.2.8       # for Intel/AMD
    #PROCESSOR_PACKAGES=org.apache.spark:spark-sql-kafka-0-10_2.12:3.3.2,com.johnsnowlabs.nlp:spark-nlp-m1_2.12:4.2.8   # for Apple M1 (incl. M1 pro)
  • restart the Docker Compose application

    docker compose down  # no need to specify -v, as we can reuse the existing data
    docker compose up
  • in Python, you need to include the necessary options and JAR files when creating the SparkSession:

    spark = SparkSession \
        .builder \
        .appName("processor") \
        .config("spark.sql.shuffle.partitions", 4) \
        .config("spark.driver.memory", "4G") \
        .config("spark.kryoserializer.buffer.max", "2000M") \
        .config("spark.driver.maxResultSize", "0") \
        .config("spark.jars.packages", "org.apache.spark:spark-sql-kafka-0-10_2.12:3.3.2,com.johnsnowlabs.nlp:spark-nlp_2.12:4.2.8") \
        .getOrCreate()
  • in Java, you need to include the necessary options as shown below when creating the SparkSession; the JARs are already imported as Maven dependencies for Intel/AMD, whereas for Apple M1 you need to edit pom.xml, commenting out the base Spark NLP dependency and uncommenting the Spark NLP dependency with the -m1 qualifier

    SparkSession spark = SparkSession
        .builder()
        .appName("processor")
        .config("spark.sql.shuffle.partitions", 4)
        .config("spark.driver.memory", "4G")
        .config("spark.kryoserializer.buffer.max", "2000M")
        .config("spark.driver.maxResultSize", "0")
        .getOrCreate();
  • after the previous setup, you are ready to revise the pipeline including Spark NLP components, such as:

    • a DocumentAssembler transformer, to map text to a document annotation;
    • a UniversalSentenceEncoder transformer, to map the document annotation to a 512-element embeddings annotation;
    • a ClassifierDLApproach estimator, i.e., a densely connected neural classifier trained on embeddings / label and producing a prediction_annotation;
    • a SQLTransformer transformer, to extract the prediction class number via SQL expression SELECT *, CAST(element_at(prediction_annotation, 1).result AS DOUBLE) AS prediction FROM __THIS__;
    • an IndexToString transformer from Spark MLlib, as before (a full pipeline sketch is shown after this list). Note that CrossValidator is apparently not compatible with Spark NLP (it triggers training for the different folds / grid parameter candidates, but there are issues consuming Spark NLP annotations), so do some hyper-parameter fine tuning manually (e.g., on the learning rate and dropout; you may start from 0.01 and 0.1, respectively)
  • once you save the model, it will be automatically imported by the processor in the Docker Compose application, which will require ~10 GB RAM (be sure to have it available, or the machine will start swapping)
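
A minimal PySpark sketch of such a Spark NLP pipeline is shown below. The text/label column names, the training parameters and the label_indexer variable (the StringIndexer model fitted earlier, whose labels IndexToString needs) are assumptions to adapt to the skeleton, and the SparkSession must have been created with the Spark NLP package as shown above:

    from pyspark.ml import Pipeline
    from pyspark.ml.feature import IndexToString, SQLTransformer
    from pyspark.sql.functions import col
    from sparknlp.annotator import ClassifierDLApproach, UniversalSentenceEncoder
    from sparknlp.base import DocumentAssembler

    # assumption: train_df has a 'text' column and a numeric 'label' column; ClassifierDLApproach
    # expects a string label, so we feed it the stringified label, which lets the SQLTransformer
    # below cast the predicted result back to a double for IndexToString
    train_nlp_df = train_df.withColumn("label_str", col("label").cast("string"))

    document = DocumentAssembler() \
        .setInputCol("text") \
        .setOutputCol("document")

    use = UniversalSentenceEncoder.pretrained() \
        .setInputCols(["document"]) \
        .setOutputCol("embeddings")

    # learning rate and dropout tuned manually (CrossValidator is not usable here)
    classifier = ClassifierDLApproach() \
        .setInputCols(["embeddings"]) \
        .setOutputCol("prediction_annotation") \
        .setLabelColumn("label_str") \
        .setMaxEpochs(20) \
        .setLr(0.01) \
        .setDropout(0.1)

    extract = SQLTransformer(statement="SELECT *, CAST(element_at(prediction_annotation, 1).result AS DOUBLE) AS prediction FROM __THIS__")

    to_category = IndexToString(inputCol="prediction", outputCol="predictedCategory",
                                labels=label_indexer.labels)  # assumption: label_indexer is the fitted StringIndexer model

    nlp_pipeline = Pipeline(stages=[document, use, classifier, extract, to_category])
    nlp_model = nlp_pipeline.fit(train_nlp_df)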