From 49a505a5cd03d0eb9ea286baf493107f14cc74cb Mon Sep 17 00:00:00 2001
From: Francesco Corcoglioniti <francesco.corcoglioniti@gmail.com>
Date: Wed, 8 May 2024 15:37:22 +0200
Subject: [PATCH] Further suggestions for exercise 2

---
 .env.example                                         |  2 +-
 README.md                                            |  2 +-
 .../src/main/java/it/unibz/inf/rtbdp/Processor.java  |  9 +++++++--
 processor-python/processor.py                        | 10 +++++++---
 4 files changed, 16 insertions(+), 7 deletions(-)

diff --git a/.env.example b/.env.example
index aba667c..13d7a4f 100644
--- a/.env.example
+++ b/.env.example
@@ -16,7 +16,7 @@ PROCESSOR_JAVA_CLASS=it.unibz.inf.rtbdp.Processor
 # - huobi(192 pairs, 96 trades/s),
 # - hitbtc(714 pairs, 1.3 trades/s), bitso(27 pairs, 0.8 trades/s)
 # - luno(23 pairs, 0.2 trades/s), gemini(68 pairs, 0.1 trades/s)
-EXCHANGE=bitso
+EXCHANGE=hitbtc
 
 # PostgreSQL DB parameters (no need to change them)
 USERID=user
diff --git a/README.md b/README.md
index 3fc7c01..ce4319a 100644
--- a/README.md
+++ b/README.md
@@ -59,7 +59,7 @@ Macro-components:
   - as a self-contained (Flink Mini Cluster) application outside Docker, which is what you can use for developing the processor code
   - as a Flink cluster (http://localhost:8081/) inside Docker comprising three containers: Job Manager, Task Manager, and a Flink client that submits the processor job to the Job Manager
 * *PostgreSQL + pgAdmin* (http://localhost:20080/) - stores time window data (prices and volumes) in two tables directly populated by Flink, with pgAdmin providing a web UI to browse DB content (may use `psql` as well)
-* *Grafana* (http://localhost:23000/) - displays a dashboard with candlesticks and volumes visualization for the data in PostgreSQL
+* *Grafana* (http://localhost:23000/) - displays a dashboard with [candlesticks](https://en.wikipedia.org/wiki/Candlestick_chart) and volumes visualization for the data in PostgreSQL
 
 Cryptocurrency trades obtained from *coincap* and injected into Kafka are JSON records like the following ones:
 ```json
diff --git a/processor-java/src/main/java/it/unibz/inf/rtbdp/Processor.java b/processor-java/src/main/java/it/unibz/inf/rtbdp/Processor.java
index c442624..9bfbeeb 100644
--- a/processor-java/src/main/java/it/unibz/inf/rtbdp/Processor.java
+++ b/processor-java/src/main/java/it/unibz/inf/rtbdp/Processor.java
@@ -104,7 +104,7 @@ public class Processor {
                 ") WITH (" + getOutputConnectorConfig(cmd, "candlesticks") + ")");
 
         // Define the query to populate table 'candlesticks'
-        Table candlesticks = tenv.sqlQuery("" +
+        Table candlesticksQuery = tenv.sqlQuery("" +
                 "SELECT `window_start` AS `ts_start`,\n" +
                 "       `window_end` AS `ts_end`,\n" +
                 "       `base` || ' / ' || `quote` AS `currency_pair`,\n" +
@@ -119,7 +119,7 @@ public class Processor {
                 "GROUP BY `base`, `quote`, `window_start`, `window_end`");
 
         // Enqueue an INSERT statement to populate table 'candlesticks' with the corresponding query
-        statementsToExecute.addInsert("candlesticks", candlesticks);
+        statementsToExecute.addInsert("candlesticks", candlesticksQuery);
 
         // Define sink table 'volumes' writing to PostgreSQL
         tenv.executeSql("" +
@@ -150,6 +150,11 @@ public class Processor {
         // more easily check whether your output is correct.
         //
 
+
+        // Table volumesQuery = ...  // compute via a SQL/Table API expression (use 'candlesticksQuery' above as a reference)
+
+        // statementsToExecute.addInsert("volumes", volumesQuery);  // Enqueue INSERT statement to populate table 'volumes'
+
         // <END EXERCISES>
 
         // Execute all statements as a single job
diff --git a/processor-python/processor.py b/processor-python/processor.py
index 288e8b8..7527370 100755
--- a/processor-python/processor.py
+++ b/processor-python/processor.py
@@ -83,7 +83,7 @@ def main():
     """)
 
     # Define the query to populate table 'candlesticks'
-    candlesticks = tenv.sql_query("""
+    candlesticks_query = tenv.sql_query("""
         SELECT `window_start` AS `ts_start`,
                `window_end` AS `ts_end`,
                `base` || ' / ' || `quote` AS `currency_pair`,
@@ -99,7 +99,7 @@ def main():
     """)
 
     # Enqueue an INSERT statement to populate table 'candlesticks' with the corresponding query
-    statements_to_execute.add_insert("candlesticks", candlesticks)
+    statements_to_execute.add_insert("candlesticks", candlesticks_query)
 
     # Define sink table 'volumes' writing to PostgreSQL
     tenv.execute_sql(f"""
@@ -127,10 +127,14 @@ def main():
     # of the rows you have to produce. Refer to the 'SELECT' query generating table `candlesticks` above as a template
     # of window-based query you may start from (remember also to enqueue an INSERT statement to statements_to_execute
     # as otherwise your code won't run). To check your solution, you are suggested to run this script with option
-    # '--dry-run' that will print computed rows to stdout (instead of writing them to PostgreSQL), so that you# can
+    # '--dry-run' that will print computed rows to stdout (instead of writing them to PostgreSQL), so that you can
     # more easily check whether your output is correct.
     #
 
+    # volumes_query: Table = ...  # compute via a SQL/Table API expression (use 'candlesticks_query' above as a reference)
+
+    # statements_to_execute.add_insert("volumes", volumes_query)  # Enqueue an INSERT statement to populate table 'volumes'
+
     # <END EXERCISES>
 
     # Execute all statements as a single job
--
GitLab
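
For readers tackling exercise 2, the sketch below shows one way the commented-out hint in Processor.java could be filled in. It is only an illustrative draft, not part of this patch: the source table name `trades`, its event-time column `ts`, the per-trade `volume` column, the 1-minute tumbling window, and the assumed 'volumes' sink schema (`ts_start`, `ts_end`, `currency_pair`, `volume`) are all placeholders to adapt to the definitions earlier in the file; only `tenv`, `statementsToExecute`, and the overall structure mirror the `candlesticksQuery` code shown in the diff above.

```java
// Hypothetical sketch for exercise 2 (to be placed where the hint comments sit in Processor.java).
// The names `trades`, `ts`, `volume` and the 1-minute window are assumptions; adapt them to the
// source table and window size actually used by 'candlesticksQuery'.
Table volumesQuery = tenv.sqlQuery("" +
        "SELECT `window_start` AS `ts_start`,\n" +
        "       `window_end` AS `ts_end`,\n" +
        "       `base` || ' / ' || `quote` AS `currency_pair`,\n" +
        "       SUM(`volume`) AS `volume`\n" +
        "FROM TABLE(TUMBLE(TABLE `trades`, DESCRIPTOR(`ts`), INTERVAL '1' MINUTE))\n" +
        "GROUP BY `base`, `quote`, `window_start`, `window_end`");

// Enqueue the INSERT, otherwise the job has nothing to write to sink table 'volumes'
statementsToExecute.addInsert("volumes", volumesQuery);
```

The Python version in processor.py follows the same pattern with `tenv.sql_query(...)` and `statements_to_execute.add_insert("volumes", volumes_query)`; as the patch notes, running the script with '--dry-run' prints the computed rows to stdout so the output can be checked before writing to PostgreSQL.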