# Kafka Producer/Consumer API + Docker Lab

![Screenshot of the application frontend](screenshot.png)
In this lab we build a simple application using Kafka consisting of:
* a Kafka cluster consisting of a single broker, using ZooKeeper and with Kafka UI on top of it
* a producer (given, either Java or Python) that listens to real-time [Wikipedia edit events](https://wikitech.wikimedia.org/wiki/Event_Platform/EventStreams) and loads a subset of them into a Kafka `edits` topic;
* a processor (you will have to implement it, solution will be made available at the end of the lab) that consumes `edits` messages, computes statistics (# edits, total page length changes) for various Wikipedia chapters (en, it, de, fr...), storing statistics back to Kafka in a different `stats` topic
* a [Kafka WebSocket proxy](https://kpmeen.gitlab.io/kafka-websocket-proxy/) that enables JavaScript code in a browser to consume/produce Kafka topics
* a simple `index.html` + `index.js` web frontend that uses the proxy to consume events from `edits` (initially) and `stats` (after you implement the processor), showing statistics in the page

The following figure depicts the application architecture in terms of components (e.g., internal Docker containers), data flows (solid links) and other dependencies (dotted links):

```mermaid
flowchart LR
    wiki(Wikipedia SSE endpoint):::compext
    ui("browser showing\nRxJS web UI\n(index.html)"):::compext
    processorext("Java/Python processor\n(run outside Docker)"):::compext
    subgraph app [Docker Compose application]
        zookeeper("ZooKeeper"):::comp
        broker("Kafka Broker"):::comp
        producer("Java/Python\nproducer"):::comp
        processor("Java/Python processor\n(run as Docker container)"):::comp
        wsproxy(Kafka WebSocket Proxy):::comp
        webserver(Apache Web Server):::comp
        kafkaui("Kafka UI"):::comp
    end
    wiki --> producer
    producer --> broker
    broker <--> processor
    broker <--> processorext
    broker --> wsproxy
    wsproxy --> ui
    webserver --> ui
    broker -.- zookeeper
    kafkaui -.- broker
    classDef scope fill:#fff,stroke:#333,stroke-width:1px,stroke-dasharray: 5 5,color:#444,font-size:10pt;
    classDef comp fill:#fafafa,stroke:#333,stroke-width:1.5px,font-size:10pt;
    classDef compsub fill:#eee,stroke:#333,stroke-width:1.5px,font-size:10pt;
    classDef compext fill:#fff,stroke:#333,stroke-width:1.5px,font-size:10pt;
    classDef none fill:#fff,stroke:#fff,stroke-width:0px,font-size:0pt;
    class app scope
```
Note that:
* although very simple, the overall setup of the application is close to what you will likely implement in your course project (i.e., ingest some feed of real-time data using Kafka, process it, show/interact with results in a frontend)
* the Wikipedia EventStreams source can also be a reasonable source of real-time data for your project: events happening around the world are quickly reflected in Wikipedia through the collaborative editing activity of its users, so you may start from this lab application (e.g., also fetching page data from Wikipedia) to build something interesting; see [here](https://codepen.io/ottomata/pen/LYpPpxj) for real-time examples of data flowing through EventStreams
* the Kafka WebSocket proxy is a simple component that you may also use in your projects to avoid building a backend application whose sole purpose is to take result data from Kafka and send it to the UI (still, if you use this proxy you will have to implement JavaScript code in the UI). Note we use an older version 1.1.1 that works more reliably than more recent versions.

To set up the lab:
* clone this repository and (optionally, since the defaults are already good) set up your `.env` file for Docker Compose
  ```
  $  git clone https://gitlab.inf.unibz.it/rtbdp/labs/lab_kafka.git
  $  cd lab_kafka
  $  cp .env.example .env
  ```
* in a terminal, start Docker Compose to launch the application containers, including a Kafka broker and the Kafka UI
  ```
  $  docker-compose up
  ```
* open the Kafka UI at `http://localhost:28080` (if you did not change the port); while developing, you'll find it easier to check (and possibly wipe out) the content of Kafka using the UI
* if you want to develop in Java, open the Maven project in `/processor-java` in IntelliJ IDEA (or your Maven-aware IDE)
* if you want to develop in Python, enter directory `/processor-python`, then create a virtual environment and install dependencies
  ```
  $ python -m venv myenv
  $ source myenv/bin/activate
  $ pip install -U pip
  $ pip install -r requirements.txt
  ```
**Note for Apple ARM CPU users**: running the Python producer/processor as a Docker container used to fail because `confluent_kafka` is difficult to install on the ARM platform (aarch64) used by recent Apple machines. Specifically, `pip install confluent_kafka` on ARM requires compiling some C code against a recent librdkafka (the C client library for Kafka), which in practice means cloning the librdkafka source repository and compiling it (the usual `configure`, `make`, `make install` commands, after the required libraries have been installed as well). The Dockerfiles for the Python producer/processor have been updated accordingly and successfully tested on an ARM server environment. For issues related to installing `confluent_kafka` on ARM *outside* Docker, please refer to [`these instructions`](https://github.com/confluentinc/confluent-kafka-python/issues/1190#issuecomment-1195952767). These problems are related to how `confluent_kafka` is distributed, so `pip install confluent_kafka` on ARM might work out of the box with more recent versions of `confluent_kafka`.


## Starting Point
Initially, the processor computing statistics is missing (only a skeleton is provided, which does nothing if you run it), and the `index.html` UI can only display data from the `edits` Kafka topic.

Instead, the producer has been integrated into the Docker Compose application (you may look at `docker-compose.yml` and the `Dockerfile`s for the producer to see how this has been done), and it will immediately start fetching data from the Wikipedia SSE endpoint and injecting it into the `edits` topic of Kafka.

You can observe these `edits` messages in Kafka UI, and you can already open the frontend at `http://localhost:8888/` (if you haven't changed the port), select the `edits` topic in the top bar, and observe the raw data table in the webpage being updated as a result of the events fetched from the Kafka `edits` topic.
From Kafka UI (but also in the logs in the browser console) you can see examples of `edits` records like the following:

```json
{
	"$schema": "/mediawiki/recentchange/1.0.0",
	"meta": {
		"uri": "https://en.wikipedia.org/wiki/Jos%C3%A9_Luis_Gonz%C3%A1lez_(rugby_union)",
		"request_id": "f6e8c8a9-a8dd-4bc9-924c-2cec8ebb4ba8",
		"id": "4e9b4c3a-9e21-4425-bb50-c9f6f7c2956d",
		"dt": "2023-03-31T08:37:45Z",
		"domain": "en.wikipedia.org",
		"stream": "mediawiki.recentchange",
		"topic": "codfw.mediawiki.recentchange",
		"partition": 0,
		"offset": 559625004
	},
	"id": 1617455216,
	"type": "edit",
	"namespace": 0,
	"title": "José Luis González (rugby union)",
	"comment": "added [[Category:Expatriate rugby union players in France]] using [[WP:HC|HotCat]]",
	"timestamp": 1680251865,
	"user": "Crowsus",
	"bot": false,
	"minor": false,
	"length": {
		"old": 4015,
		"new": 4069
	},
	"revision": {
		"old": 1147488605,
		"new": 1147488622
	},
	"server_url": "https://en.wikipedia.org",
	"server_name": "en.wikipedia.org",
	"server_script_path": "/w",
	"wiki": "enwiki",
	"parsedcomment": "added <a href=\"/wiki/Category:Expatriate_rugby_union_players_in_France\" title=\"Category:Expatriate rugby union players in France\">Category:Expatriate rugby union players in France</a> using <a href=\"/wiki/Wikipedia:HC\" class=\"mw-redirect\" title=\"Wikipedia:HC\">HotCat</a>",
	"stats": [
		{
			"wiki": "enwiki",
			"num_edits": 1,
			"delta_len": 54
		}
	]
}
```
For this lab, pay attention to the final `"stats"` field, which is added by the producer and contains statistics for that particular edit event: the number of edits (1, by definition) and the change in page length `"delta_len"` (here, 54 characters were added to the page, but the value can also be negative). Note that `"stats"` contains an array of statistics objects, one per Wikipedia chapter; since here we are dealing with a single edit event on a specific chapter page, there is only one object under `"stats"`.
As we are not computing statistics yet, the UI will only display the content of the `stats` field in the last message consumed from the `edits` topic, which reflects the statistics for that individual change.
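
As a small illustration of the record layout, the following Python sketch parses a trimmed copy of the sample record and reads its `"stats"` entry. The helper name `extract_stats` is hypothetical, and treating a record without `"stats"` as contributing nothing is an assumption, not documented producer behaviour. For this sample, `"delta_len"` matches the difference between the new and old page lengths.

```python
import json

# Trimmed copy of the sample record above; only the fields used here are kept.
record_json = """
{
  "wiki": "enwiki",
  "length": {"old": 4015, "new": 4069},
  "stats": [{"wiki": "enwiki", "num_edits": 1, "delta_len": 54}]
}
"""

def extract_stats(raw):
    """Return the list of per-chapter statistics carried by one `edits` record."""
    record = json.loads(raw)
    # Assumption: a record without a "stats" field contributes nothing.
    return record.get("stats", [])

entry = extract_stats(record_json)[0]
record = json.loads(record_json)
# In this sample, delta_len equals new length minus old length.
computed_delta = record["length"]["new"] - record["length"]["old"]
print(entry["wiki"], entry["num_edits"], entry["delta_len"], computed_delta)
```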

## Exercise 1
Implement the processor either in Python or Java. Assume there will be only one instance of it running at any time.

The processor has to go through the `edits` messages in Kafka and aggregate the statistics for each Wikipedia chapter, summing the values under `"num_edits"` and `"delta_len"` so as to compute the total number of edits and the total page length change since we started receiving data from the Wikipedia SSE endpoint. The processor has to keep track of these aggregate numbers for all the Wikipedia chapters encountered so far (e.g., using a dictionary). Every time an `edits` message is processed, a corresponding *statistics* message has to be produced on the `stats` Kafka topic, containing the latest statistics for *all* the Wikipedia chapters encountered so far. Here is an example of a produced `stats` message:

```json
{
	"stats": [
		{
			"wiki": "enwiki",
			"num_edits": 517,
			"delta_len": 38270
		},
		{
			"wiki": "dewiki",
			"num_edits": 115,
			"delta_len": 13184
		},
		{
			"wiki": "frwiki",
			"num_edits": 147,
			"delta_len": 16365
		},
		{
			"wiki": "itwiki",
			"num_edits": 95,
			"delta_len": 5772
		},
    ...
	]
}
```
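
The aggregation step described above can be sketched in plain Python, independently of Kafka. This is only one possible approach, not the reference solution: the function names `update_stats` and `to_stats_message` and the choice of storing `[num_edits, delta_len]` pairs per chapter are assumptions made for illustration.

```python
import json

def update_stats(totals, edit_stats):
    """Add the per-edit statistics of one `edits` message to the running totals.

    `totals` maps a wiki name (e.g. "enwiki") to [num_edits, delta_len].
    """
    for entry in edit_stats:
        running = totals.setdefault(entry["wiki"], [0, 0])
        running[0] += entry["num_edits"]
        running[1] += entry["delta_len"]
    return totals

def to_stats_message(totals):
    """Serialize the totals as a `stats` message: an array under "stats"."""
    stats = [
        {"wiki": wiki, "num_edits": n, "delta_len": d}
        for wiki, (n, d) in totals.items()
    ]
    return json.dumps({"stats": stats})

totals = {}
update_stats(totals, [{"wiki": "enwiki", "num_edits": 1, "delta_len": 54}])
update_stats(totals, [{"wiki": "itwiki", "num_edits": 1, "delta_len": -10}])
update_stats(totals, [{"wiki": "enwiki", "num_edits": 1, "delta_len": 6}])
message = to_stats_message(totals)
print(message)
```

Keeping the aggregation in pure functions like these makes it easy to check the arithmetic before connecting the processor to Kafka.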

Once you have a working processor, you may (develop and) run it outside the Docker Compose application and then switch the web frontend to display data from your newly computed `stats` topic, by using the topic selector on the top of the web page.
For developing the processor, edit `processor-java/src/main/java/it/unibz/inf/rtbdp/Processor.java` (Java) or `/processor-python/processor.py` (Python) and follow the instructions contained there. For both languages, some utility code has been supplied to deal with topics and offset seeking (this is more useful for the next exercise).
Independently of the programming language, its general structure may be something like the following:

```
start with an empty statistics dictionary
create a Kafka producer
create a Kafka consumer
subscribe the consumer to `edits` topic
while there are messages coming from `edits`:
    update statistics dictionary based on the message
    send new statistics dictionary (as array) to `stats` topic
```

## Exercise 2
If you stop your processor, the next time you run it it will recompute statistics from scratch on new messages, losing the previous statistics, which can however still be found in the Kafka `stats` topic. Therefore, your task now is to modify your processor so as to load the previously computed statistics. The new general structure of your processor will be:
```
start with an empty statistics dictionary

create a Kafka producer

create a Kafka consumer

subscribe the consumer to `stats` topic
read the last message from `stats`
recover statistics dictionary based on last message
unsubscribe from `stats`

subscribe the consumer to `edits` topic

while there are messages coming from `edits`:

    update statistics dictionary based on the message
    send new statistics dictionary (as array) to `stats` topic
```
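
The structure above could be sketched in Python as follows. This is a hedged illustration rather than the lab solution: `recover_totals`, `apply_edit`, `process`, and the `max_messages` parameter are hypothetical names, and `consumer` and `producer` are only assumed to offer the `poll`, `produce`, and `flush` methods of `confluent_kafka`'s `Consumer` and `Producer`. Reading the last `stats` message is left out here and passed in as raw bytes.

```python
import json

def recover_totals(last_stats_payload):
    """Rebuild the statistics dictionary from the last `stats` message (bytes or None)."""
    if last_stats_payload is None:  # nothing has been written to `stats` yet
        return {}
    entries = json.loads(last_stats_payload)["stats"]
    return {e["wiki"]: {"num_edits": e["num_edits"], "delta_len": e["delta_len"]}
            for e in entries}

def apply_edit(totals, edit_payload):
    """Add the statistics of one `edits` message and return the new `stats` payload."""
    for e in json.loads(edit_payload).get("stats", []):
        t = totals.setdefault(e["wiki"], {"num_edits": 0, "delta_len": 0})
        t["num_edits"] += e["num_edits"]
        t["delta_len"] += e["delta_len"]
    stats = [{"wiki": w, **t} for w, t in totals.items()]
    return json.dumps({"stats": stats}).encode("utf-8")

def process(consumer, producer, totals, max_messages=None):
    """Consume `edits`, update the totals, and publish them to `stats`.

    `max_messages` is a hypothetical knob that makes the loop stoppable in a demo.
    """
    handled = 0
    while max_messages is None or handled < max_messages:
        msg = consumer.poll(1.0)          # wait up to one second for a message
        if msg is None or msg.error():    # timeout or error event: skip it
            continue
        producer.produce("stats", value=apply_edit(totals, msg.value()))
        producer.poll(0)                  # serve delivery callbacks
        handled += 1
    producer.flush()

# Recovery and update on sample payloads, without a broker.
totals = recover_totals(b'{"stats": [{"wiki": "enwiki", "num_edits": 517, "delta_len": 38270}]}')
apply_edit(totals, b'{"stats": [{"wiki": "enwiki", "num_edits": 1, "delta_len": 54}]}')
print(totals)
```

In a real run, `consumer` would be subscribed to `edits` after the recovery step, and both clients would be configured with the broker address used by your Docker Compose setup.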

A difficult step above is seeking in the `stats` stream so that you start consuming from the last written message (by default you receive only newly written messages, so you would not get the last one written before your code started). To ease this task, `commons.py` provides a function `on_assign_seek` that you may use in your code as `consumer.subscribe(["topic"], on_assign=on_assign_seek(before_end=1))` to seek to the end of the stream minus one, i.e., to re-read the last element. The function also allows seeking from the beginning (`after_start`: if 0, you start from the very first item of the stream; if greater, you skip some items). For Java, similar functionality is provided in `Util.java`.
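
To give an idea of what such a helper does, here is a hypothetical stand-in written against the `confluent_kafka` consumer interface (`get_watermark_offsets` and `assign`). It is not the code in `commons.py`, which may be implemented differently; the name `make_on_assign_seek` and the clamping to the partition's watermarks are assumptions. The small fake classes exist only so that the sketch runs without a broker.

```python
def make_on_assign_seek(before_end=None, after_start=None):
    """Build an `on_assign` callback that repositions each assigned partition.

    Hypothetical stand-in for the lab's `on_assign_seek` helper.
    """
    def on_assign(consumer, partitions):
        for p in partitions:
            # low: oldest available offset; high: offset of the next message to be written.
            low, high = consumer.get_watermark_offsets(p)
            if before_end is not None:
                p.offset = max(low, high - before_end)
            elif after_start is not None:
                p.offset = min(high, low + after_start)
        # Hand the partitions back with their modified starting offsets.
        consumer.assign(partitions)
    return on_assign

# Minimal fakes standing in for a TopicPartition and a Consumer.
class FakePartition:
    def __init__(self):
        self.offset = None

class FakeConsumer:
    def __init__(self, low, high):
        self.low, self.high, self.assigned = low, high, None
    def get_watermark_offsets(self, partition):
        return (self.low, self.high)
    def assign(self, partitions):
        self.assigned = partitions

consumer = FakeConsumer(low=10, high=25)
partition = FakePartition()
make_on_assign_seek(before_end=1)(consumer, [partition])
print(partition.offset)  # offset of the last written message in this fake partition
```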