By the end of this tutorial, we will have data flowing from an MQTT broker into a PostgreSQL table.
We’ll use 2 connectors:
- Inbound MQTT connector
- Outbound SQL connector
We will also show an example of combining multiple SmartModules, known as SmartModule chaining.
The Outbound connector will be using a PostgreSQL database. It will listen to the topic for new records and insert them into a table.
You can use your own PostgreSQL instance if it can be reached over the internet. Otherwise, you can still follow along by creating a PostgreSQL database at a hosting service, such as ElephantSQL.
The MQTT connector expects JSON input from the MQTT broker, on an MQTT topic named ag-mqtt-topic. These parameters are reflected in the final JSON payload that gets produced to the Fluvio topic mqtt-topic: the MQTT topic name is embedded next to the original message.
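As a rough illustration of the record shape described above, here is a minimal Python sketch of the wrapping step. It assumes, based on the consumer output shown later in this tutorial, that the connector parses the MQTT message body as JSON and nests it under a `payload` key next to an `mqtt_topic` key. The function name `wrap_mqtt_message` is hypothetical; this is not the connector's actual implementation.

```python
import json


def wrap_mqtt_message(mqtt_topic: str, body: bytes) -> str:
    """Hypothetical sketch: wrap an MQTT message body the way the
    inbound connector's output appears in the Fluvio topic."""
    record = {
        "mqtt_topic": mqtt_topic,     # name of the source MQTT topic
        "payload": json.loads(body),  # body parsed as JSON (payload_output_type: json)
    }
    # Compact separators match the consumer output shown in this tutorial
    return json.dumps(record, separators=(",", ":"))


print(wrap_mqtt_message("ag-mqtt-topic", b'{"device": {"device_id":17, "name":"device17"}}'))
```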
MQTT connector config: mqtt.yml
# mqtt.yml
apiVersion: 0.1.0
meta:
  version: 0.2.1
  name: fluvio-mqtt-connector
  type: mqtt-source
  topic: mqtt-topic
  direction: source
  create-topic: true
mqtt:
  url: "mqtt://test.mosquitto.org/"
  topic: "ag-mqtt-topic"
  timeout:
    secs: 30
    nanos: 0
  payload_output_type: json
$ fluvio cloud connector create --config mqtt.yml
First, install mosquitto so you can follow the later steps for sending JSON to our test MQTT broker.
You can start one or both of the following connectors:
- Connector with no transformation
  - Download the SmartModule for the example
  - Example connector config
  - Start the connector
- Connector with extra JSON to JSON transformation
  - Download the SmartModules for the example
  - Example connector config
  - Start the connector
Download the json-sql SmartModule
Example output
$ fluvio hub download infinyon/json-sql@0.1.0
downloading infinyon/json-sql@0.1.0 to infinyon-json-sql-0.1.0.ipkg
... downloading complete
... checking package
trying connection to fluvio router.dev.infinyon.cloud:9003
... cluster smartmodule install complete
SQL Connector with no transformation config
# sql.yml
apiVersion: 0.1.0
meta:
  name: fluvio-sql-connector
  type: sql-sink
  version: 0.2.1
  topic: mqtt-topic
  create-topic: true
sql:
  url: "postgres://user:password@db.postgreshost.example/dbname"
transforms:
  - uses: infinyon/json-sql@0.1.0
    with:
      invoke: insert
      mapping:
        table: "topic_message"
        map-columns:
          "device_id":
            json-key: "payload.device.device_id"
            value:
              type: "int"
              default: "0"
              required: true
          "record":
            json-key: "$"
            value:
              type: "jsonb"
              required: true
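To make the `map-columns` section more concrete, the following Python sketch mimics how a mapping like this could turn one topic record into one row. It assumes that a dot-separated `json-key` walks nested objects, that `$` selects the whole record, that `default` is used when the key is missing, and that `required` rejects a column that still has no value. These semantics are our reading of the config, not the json-sql SmartModule's documented behavior, and `lookup`, `map_record`, and `MAP_COLUMNS` are hypothetical names.

```python
import json
from typing import Any

MISSING = object()  # sentinel meaning "key not found in the record"


def lookup(record: Any, json_key: str) -> Any:
    """Resolve a json-key such as "payload.device.device_id" or "$"."""
    if json_key == "$":
        return record  # "$" selects the whole record
    node = record
    for part in json_key.split("."):
        if not isinstance(node, dict) or part not in node:
            return MISSING
        node = node[part]
    return node


def map_record(record: dict, map_columns: dict) -> dict:
    """Hypothetical sketch of map-columns: one record in, one row out."""
    row = {}
    for column, spec in map_columns.items():
        value_spec = spec["value"]
        value = lookup(record, spec["json-key"])
        if value is MISSING:
            value = value_spec.get("default", MISSING)  # fall back to the default
        if value is MISSING:
            if value_spec.get("required", False):
                raise ValueError(f"required column {column!r} has no value")
            row[column] = None  # optional column left empty (NULL)
            continue
        if value_spec["type"] == "int":
            row[column] = int(value)  # e.g. the default "0" becomes 0
        elif value_spec["type"] == "jsonb":
            row[column] = json.dumps(value)  # whole document as JSON text
        else:
            row[column] = value
    return row


# The map-columns section of sql.yml, expressed as a Python dict
MAP_COLUMNS = {
    "device_id": {
        "json-key": "payload.device.device_id",
        "value": {"type": "int", "default": "0", "required": True},
    },
    "record": {"json-key": "$", "value": {"type": "jsonb", "required": True}},
}

topic_record = {"mqtt_topic": "ag-mqtt-topic",
                "payload": {"device": {"device_id": 17, "name": "device17"}}}
print(map_record(topic_record, MAP_COLUMNS)["device_id"])  # -> 17
```

Under this reading, a record that lacks `payload.device.device_id` would still be inserted, with `device_id` set to 0 by the default.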
Start the SQL connector with no transformation
$ fluvio cloud connector create --config sql.yml
Download the jolt and json-sql SmartModules used by this example connector
Example output
$ fluvio hub download infinyon/json-sql@0.1.0
downloading infinyon/json-sql@0.1.0 to infinyon-json-sql-0.1.0.ipkg
... downloading complete
... checking package
trying connection to fluvio router.infinyon.cloud:9003
... cluster smartmodule install complete
$ fluvio hub download infinyon/jolt@0.1.0
downloading infinyon/jolt@0.1.0 to infinyon-jolt-0.1.0.ipkg
... downloading complete
... checking package
trying connection to fluvio router.infinyon.cloud:9003
... cluster smartmodule install complete
Connector with JSON to JSON transformation config
# sql-chain.yml
apiVersion: 0.1.0
meta:
  name: fluvio-sql-connector-chain
  type: sql-sink
  version: 0.2.1
  topic: mqtt-topic
  create-topic: true
sql:
  url: "postgres://user:password@db.postgreshost.example/dbname"
  rust_log: "sql_sink=INFO,sqlx=WARN"
transforms:
  - uses: infinyon/jolt@0.1.0
    with:
      spec:
        - operation: shift
          spec:
            payload:
              device: "device"
        - operation: default
          spec:
            device:
              type: "mobile"
  - uses: infinyon/json-sql@0.1.0
    with:
      invoke: insert
      mapping:
        table: "topic_message"
        map-columns:
          "device_id":
            json-key: "device.device_id"
            value:
              type: "int"
              default: "0"
              required: true
          "record":
            json-key: "$"
            value:
              type: "jsonb"
              required: true
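The `transforms` list runs in order, so the output of `jolt` becomes the input of `json-sql`. The Python sketch below imitates the two jolt operations used in this config with hand-written code for this specific spec only. It is not a general Jolt implementation, and `shift_payload_device`, `default_device_type`, and `run_chain` are hypothetical names.

```python
import json
from functools import reduce
from typing import Callable, Sequence

Record = dict
Step = Callable[[Record], Record]


def shift_payload_device(record: Record) -> Record:
    """shift with spec {payload: {device: "device"}}: copy payload.device to
    a top-level "device" key; keys not named in the spec are dropped."""
    out: Record = {}
    device = record.get("payload", {}).get("device")
    if device is not None:
        out["device"] = device
    return out


def default_device_type(record: Record) -> Record:
    """default with spec {device: {type: "mobile"}}: add device.type only if absent."""
    out = dict(record)
    device = dict(out.get("device", {}))  # copy so the input is not mutated
    device.setdefault("type", "mobile")
    out["device"] = device
    return out


def run_chain(record: Record, steps: Sequence[Step]) -> Record:
    """Apply each step to the previous step's output, in config order."""
    return reduce(lambda acc, step: step(acc), steps, record)


topic_record = {"mqtt_topic": "ag-mqtt-topic",
                "payload": {"device": {"device_id": 17, "name": "device17"}}}
result = run_chain(topic_record, [shift_payload_device, default_device_type])
print(json.dumps(result))
# {"device": {"device_id": 17, "name": "device17", "type": "mobile"}}
```

This reshaping is why the chained connector's json-sql mapping reads `device.device_id`, without the `payload.` prefix used in sql.yml.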
Start SQL connector with JSON transformation
$ fluvio cloud connector create --config sql-chain.yml
Install pgcli to follow the later DB validation steps: https://www.pgcli.com
📋 Using the example JSON, this is the sequence of events that will occur:
- (user) Publish JSON to the MQTT broker
- (Inbound MQTT connector) Produce data to the Fluvio topic mqtt-topic
  - Produce a JSON object that wraps the original message and embeds the name of the MQTT topic
- (Outbound SQL connector) Consume the inbound record from the topic mqtt-topic
  - Apply transformations to the record (JSON to JSON connector only)
  - Insert the record into the DB
- (user) Validate the JSON record in the PostgreSQL database
This is what our input JSON to MQTT looks like
example JSON (formatted)
{
  "device": {
    "device_id": 17,
    "name": "device17"
  }
}
Run the following to send a test JSON message to the demo MQTT broker with mosquitto:
Command:
$ mosquitto_pub -h test.mosquitto.org -t ag-mqtt-topic -m '{"device": {"device_id":17, "name":"device17"}}'
Produced data in topic:
$ fluvio consume mqtt-topic -B
Consuming records from the beginning of topic 'mqtt-topic'
{"mqtt_topic":"ag-mqtt-topic","payload":{"device":{"device_id":17,"name":"device17"}}}
Run the following to connect to the PostgreSQL DB with pgcli and examine the database:
$ pgcli -U user -h db.postgreshost.example -p 5432 dbname
Check that the JSON from MQTT has been inserted into the table:
select * from topic_message;
Example output from both connectors
+-----------+-----------------------------------------------------------------------------------------------+
| device_id | record |
|-----------+-----------------------------------------------------------------------------------------------|
| 17 | {"payload": {"device": {"name": "device17", "device_id": 17}}, "mqtt_topic": "ag-mqtt-topic"} |
| 17 | {"device": {"name": "device17", "type": "mobile", "device_id": 17}} |
+-----------+-----------------------------------------------------------------------------------------------+
SELECT 2
Time: 0.080s
Output explanation:
In both cases, we’ve used the device_id key in the MQTT JSON as the value in the column of the same name. The first row is from our No Transformation connector. The record data appears unchanged from what we saw in the topic.
Resulting record
{
  "payload": {
    "device": {
      "name": "device17",
      "device_id": 17
    }
  },
  "mqtt_topic": "ag-mqtt-topic"
}
The second row is from our JSON to JSON transformation connector. We’ve shifted the topic JSON data so that it more closely resembles the original JSON. Then we enrich the payload by adding the .device.type key with the value mobile before inserting it into the DB:
{
  "device": {
    "name": "device17",
    "type": "mobile",
    "device_id": 17
  }
}
Transformations in the transforms section of the SQL connector config are deliberately decoupled from connectors. We can move a SmartModule between Inbound and Outbound connectors and accomplish the same result. The decision depends on the shape of the data you want to store in a topic. For Inbound connectors, the data is transformed before it is sent to the Fluvio topic, while for Outbound connectors, it is transformed after the data is stored in the Fluvio topic but before it is sent to the connector.
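The claim that placement does not change the stored result can be checked with a toy model: apply the same transformation either before the record is written to the topic (inbound) or after it is read back (outbound). In this Python sketch the topic is just a list, and `to_device_record` and `run_pipeline` are hypothetical, hand-written stand-ins for the jolt spec and the connector pipeline used in this tutorial.

```python
import json


def to_device_record(record: dict) -> dict:
    """Stand-in for the jolt spec: lift payload.device, then default its type."""
    device = dict(record["payload"]["device"])
    device.setdefault("type", "mobile")
    return {"device": device}


def run_pipeline(messages: list, transform_inbound: bool):
    """Return (records held in the topic, rows stored by the sink)."""
    if transform_inbound:
        topic = [to_device_record(m) for m in messages]  # MQTT side transforms
        stored = [json.dumps(r, sort_keys=True) for r in topic]
    else:
        topic = list(messages)  # topic keeps the raw shape
        stored = [json.dumps(to_device_record(r), sort_keys=True) for r in topic]
    return topic, stored


messages = [{"mqtt_topic": "ag-mqtt-topic",
             "payload": {"device": {"device_id": 17, "name": "device17"}}}]
inbound_topic, inbound_rows = run_pipeline(messages, transform_inbound=True)
outbound_topic, outbound_rows = run_pipeline(messages, transform_inbound=False)
assert inbound_rows == outbound_rows    # the same rows reach the database
assert inbound_topic != outbound_topic  # but the topic holds different shapes
```

The model deliberately ignores where the work runs; `sort_keys=True` only gives a stable text form for comparing the stored rows.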
Let’s try it.
Modify our mqtt.yml config to add the transformation that we are moving from the SQL connector:
# mqtt.yml
apiVersion: 0.1.0
meta:
  version: 0.2.1
  name: fluvio-mqtt-connector
  type: mqtt-source
  topic: mqtt-topic
  direction: source
  create-topic: true
mqtt:
  url: "mqtt://test.mosquitto.org/"
  topic: "ag-mqtt-topic"
  timeout:
    secs: 30
    nanos: 0
  payload_output_type: json
transforms:
  - uses: infinyon/jolt@0.1.0
    with:
      spec:
        - operation: shift
          spec:
            payload:
              device: "device"
        - operation: default
          spec:
            device:
              type: "mobile"
We no longer need this transformation in the SQL connector, so remove it from the sql-chain.yml file:
# sql-chain.yml
apiVersion: 0.1.0
meta:
  name: fluvio-sql-connector-chain
  type: sql-sink
  version: 0.2.1
  topic: mqtt-topic
  create-topic: true
sql:
  url: "postgres://user:password@db.postgreshost.example/dbname"
transforms:
  - uses: infinyon/json-sql@0.1.0
    with:
      invoke: insert
      mapping:
        table: "topic_message"
        map-columns:
          "device_id":
            json-key: "device.device_id"
            value:
              type: "int"
              default: "0"
              required: true
          "record":
            json-key: "$"
            value:
              type: "jsonb"
              required: true
We need to re-create the connectors:
$ fluvio cloud connector delete fluvio-mqtt-connector
$ fluvio cloud connector create --config mqtt.yml
Also, we delete the now-obsolete SQL connector and re-create the chained one without the transformation that we moved to the MQTT connector:
$ fluvio cloud connector delete fluvio-sql-connector-chain
$ fluvio cloud connector delete fluvio-sql-connector
$ fluvio cloud connector create --config sql-chain.yml
Now, if we publish the same message again:
$ mosquitto_pub -h test.mosquitto.org -t ag-mqtt-topic -m '{"device": {"device_id":17, "name":"device17"}}'
The new record differs from what we saw previously:
$ fluvio consume mqtt-topic -B
Consuming records from the beginning of topic 'mqtt-topic'
{"mqtt_topic":"ag-mqtt-topic","payload":{"device":{"device_id":17,"name":"device17"}}}
{"device":{"device_id":17,"name":"device17","type":"mobile"}}
We can see that the record was transformed before it was produced to the Fluvio cluster.
However, in the database table, the new record is identical to the previous one.
+-----------+-----------------------------------------------------------------------------------------------+
| device_id | record |
|-----------+-----------------------------------------------------------------------------------------------|
| 17 | {"payload": {"device": {"name": "device17", "device_id": 17}}, "mqtt_topic": "ag-mqtt-topic"} |
| 17 | {"device": {"name": "device17", "type": "mobile", "device_id": 17}} |
| 17 | {"device": {"name": "device17", "type": "mobile", "device_id": 17}} |
+-----------+-----------------------------------------------------------------------------------------------+
SELECT 3
Time: 0.080s
Although the final result is the same (the same records end up in the SQL database with the same content), choosing the side of the pipeline where transformations reside may significantly affect performance on high volumes of data.
After setting up our end-to-end MQTT to SQL scenario, we were able to send JSON data to the MQTT broker and track the data to the PostgreSQL table.
We saw the results of inserting the JSON into the table unchanged with the json-sql SmartModule.
Using SmartModule chaining with the jolt and json-sql SmartModules, we observed that the resulting JSON was successfully transformed.
We can choose which side of the pipeline transforms our data without materially changing the result.