Skip to content
Snippets Groups Projects
Commit 49a505a5 authored by Francesco Corcoglioniti (personal)
Browse files

Further suggestions for exercise 2

parent ddb09724
Branches master
No related tags found
No related merge requests found
......@@ -16,7 +16,7 @@ PROCESSOR_JAVA_CLASS=it.unibz.inf.rtbdp.Processor
# - huobi(192 pairs, 96 trades/s),
# - hitbtc(714 pairs, 1.3 trades/s), bitso(27 pairs, 0.8 trades/s)
# - luno(23 pairs, 0.2 trades/s), gemini(68 pairs, 0.1 trades/s)
EXCHANGE=bitso
EXCHANGE=hitbtc
# PostgreSQL DB parameters (no need to change them)
USERID=user
......
......@@ -59,7 +59,7 @@ Macro-components:
- as a self-contained (Flink Mini Cluster) application outside Docker, which is what you can use for developing the processor code
- as a Flink cluster (http://localhost:8081/) inside Docker comprising three containers: Job Manager, Task Manager, and a Flink client that submits the processor job to the Job Manager
* *PostgreSQL + pgAdmin* (http://localhost:20080/) - stores time window data (prices and volumes) in two tables directly populated by Flink, with pgAdmin providing a web UI to browse DB content (may use `psql` as well)
* *Grafana* (http://localhost:23000/) - displays a dashboard with candlesticks and volumes visualization for the data in PostgreSQL
* *Grafana* (http://localhost:23000/) - displays a dashboard with [candlesticks](https://en.wikipedia.org/wiki/Candlestick_chart) and volumes visualization for the data in PostgreSQL
Cryptocurrency trades obtained from *coincap* and injected into Kafka are JSON records like the following ones:
```json
......
......@@ -104,7 +104,7 @@ public class Processor {
") WITH (" + getOutputConnectorConfig(cmd, "candlesticks") + ")");
// Define the query to populate table 'candlesticks'
Table candlesticks = tenv.sqlQuery("" +
Table candlesticksQuery = tenv.sqlQuery("" +
"SELECT `window_start` AS `ts_start`,\n" +
" `window_end` AS `ts_end`,\n" +
" `base` || ' / ' || `quote` AS `currency_pair`,\n" +
......@@ -119,7 +119,7 @@ public class Processor {
"GROUP BY `base`, `quote`, `window_start`, `window_end`");
// Enqueue an INSERT statement to populate table 'candlesticks' with the corresponding query
statementsToExecute.addInsert("candlesticks", candlesticks);
statementsToExecute.addInsert("candlesticks", candlesticksQuery);
// Define sink table 'volumes' writing to PostgreSQL
tenv.executeSql("" +
......@@ -150,6 +150,11 @@ public class Processor {
// more easily check whether your output is correct.
//
// Table volumesQuery = ... // compute via SQL/Table API expression (see 'candlesticksQuery' as a reference)
// statementsToExecute.addInsert("volumes", volumesQuery); // Enqueue INSERT statement to populate table 'volumes'
// <END EXERCISES>
// Execute all statements as a single job
......
......@@ -83,7 +83,7 @@ def main():
""")
# Define the query to populate table 'candlesticks'
candlesticks = tenv.sql_query("""
candlesticks_query = tenv.sql_query("""
SELECT `window_start` AS `ts_start`,
`window_end` AS `ts_end`,
`base` || ' / ' || `quote` AS `currency_pair`,
......@@ -99,7 +99,7 @@ def main():
""")
# Enqueue an INSERT statement to populate table 'candlesticks' with the corresponding query
statements_to_execute.add_insert("candlesticks", candlesticks)
statements_to_execute.add_insert("candlesticks", candlesticks_query)
# Define sink table 'volumes' writing to PostgreSQL
tenv.execute_sql(f"""
......@@ -127,10 +127,14 @@ def main():
# of the rows you have to produce. Refer to the 'SELECT' query generating table `candlesticks` above as a template
# of window-based query you may start from (remember also to enqueue an INSERT statement to statements_to_execute
# as otherwise your code won't run). To check your solution, you are suggested to run this script with option
# '--dry-run' that will print computed rows to stdout (instead of writing them to PostgreSQL), so that you# can
# '--dry-run' that will print computed rows to stdout (instead of writing them to PostgreSQL), so that you can
# more easily check whether your output is correct.
#
# volumes_query: Table = ...  # compute via SQL/Table API expression (see 'candlesticks_query' as a reference)
# statements_to_execute.add_insert("volumes", volumes_query) # Enqueue an INSERT statement to populate table 'volumes'
# <END EXERCISES>
# Execute all statements as a single job
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment