From 4ad8e899f541b72f0e4b813090d018332f784dcc Mon Sep 17 00:00:00 2001 From: "matthew.ng" Date: Tue, 14 Mar 2023 17:43:31 +0800 Subject: [PATCH] write data into db --- build.gradle | 1 + .../towngas/config/MqttConsumerCallBack.java | 18 ++- .../ffii/towngas/config/MqttDataWriter.java | 28 ++++ .../com/ffii/towngas/pojo/MqttRawData.java | 121 ++++++++++++++++++ .../01_create_mqtt_table.sql | 28 ++++ 5 files changed, 192 insertions(+), 4 deletions(-) create mode 100644 src/main/java/com/ffii/towngas/config/MqttDataWriter.java create mode 100644 src/main/java/com/ffii/towngas/pojo/MqttRawData.java create mode 100644 src/main/resources/db/changelog/changes/20220314_01_matthew/01_create_mqtt_table.sql diff --git a/build.gradle b/build.gradle index 2c4a18f..bc5664e 100644 --- a/build.gradle +++ b/build.gradle @@ -36,6 +36,7 @@ dependencies { implementation group: 'org.apache.tika', name: 'tika-core', version: '2.4.1' implementation group: 'org.springframework.integration', name: 'spring-integration-mqtt', version: '6.0.2' implementation 'net.sf.jasperreports:jasperreports:6.19.1' + implementation group: 'com.google.code.gson', name: 'gson', version: '2.10.1' implementation group: 'org.springdoc', name: 'springdoc-openapi-ui', version: '1.6.9' diff --git a/src/main/java/com/ffii/towngas/config/MqttConsumerCallBack.java b/src/main/java/com/ffii/towngas/config/MqttConsumerCallBack.java index d4815a1..ea3628d 100644 --- a/src/main/java/com/ffii/towngas/config/MqttConsumerCallBack.java +++ b/src/main/java/com/ffii/towngas/config/MqttConsumerCallBack.java @@ -4,7 +4,13 @@ import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken; import org.eclipse.paho.client.mqttv3.MqttCallback; import org.eclipse.paho.client.mqttv3.MqttMessage; +import com.ffii.towngas.pojo.MqttRawData; +import com.google.gson.Gson; + public class MqttConsumerCallBack implements MqttCallback { + + private Gson gson = new Gson(); + @Override public void connectionLost(Throwable throwable) { System.out.println("Disconnected with server"); @@ -12,10 +18,14 @@ public class MqttConsumerCallBack implements MqttCallback { @Override public void messageArrived(String topic, MqttMessage message) throws Exception { - System.out.println(String.format("Topic: %s", topic)); - System.out.println(String.format("Qos : %d", message.getQos())); - System.out.println(String.format("Content: %s", new String(message.getPayload()))); - System.out.println(String.format("Retained : %b", message.isRetained())); + String payload = new String(message.getPayload()); + + MqttRawData mqttRawData = gson.fromJson(payload, MqttRawData.class); + mqttRawData.setTopic(topic); + mqttRawData.setQos(message.getQos()); + mqttRawData.setPayload(payload); + System.out.println(mqttRawData); + MqttDataWriter.insertMqttRawData(mqttRawData); } @Override diff --git a/src/main/java/com/ffii/towngas/config/MqttDataWriter.java b/src/main/java/com/ffii/towngas/config/MqttDataWriter.java new file mode 100644 index 0000000..9f756e8 --- /dev/null +++ b/src/main/java/com/ffii/towngas/config/MqttDataWriter.java @@ -0,0 +1,28 @@ +package com.ffii.towngas.config; + +import org.springframework.stereotype.Component; + +import com.ffii.core.support.JdbcDao; +import com.ffii.towngas.pojo.MqttRawData; + +@Component +public class MqttDataWriter { + + private static JdbcDao jdbcDao; + private static final String INSERT_SQL = "INSERT INTO mqtt_source_data (topic, qos, payload, customerId, facilityId, equipmentType, equipmentId, gasRating, pilot, `timestamp`) VALUES(:topic, :qos, :payload, :customerId, :facilityId, :equipmentType, :equipmentId, :gasRating, :pilot, :timestamp);"; + + public MqttDataWriter(JdbcDao jdbcDao) { + this.jdbcDao = jdbcDao; + } + + public static Integer insertMqttRawData(MqttRawData mqttRawData) { + try { + return jdbcDao.executeUpdate(INSERT_SQL, mqttRawData.toMap()); + } catch (Exception e) { + e.printStackTrace(); + return null; + } + + } + +} diff --git a/src/main/java/com/ffii/towngas/pojo/MqttRawData.java b/src/main/java/com/ffii/towngas/pojo/MqttRawData.java new file mode 100644 index 0000000..0786d51 --- /dev/null +++ b/src/main/java/com/ffii/towngas/pojo/MqttRawData.java @@ -0,0 +1,121 @@ +package com.ffii.towngas.pojo; + +import java.util.HashMap; +import java.util.Map; + +public class MqttRawData { + + private String topic; + private Integer qos; + private String payload; + private String customerID; + private String facilityID; + private String equipmentType; + private String equipmentID; + private Integer gasrating; + private Integer pilot; + private Long timestamp; + + public String getTopic() { + return topic; + } + + public void setTopic(String topic) { + this.topic = topic; + } + + public Integer getQos() { + return qos; + } + + public void setQos(Integer qos) { + this.qos = qos; + } + + public String getPayload() { + return payload; + } + + public void setPayload(String payload) { + this.payload = payload; + } + + public String getCustomerID() { + return customerID; + } + + public void setCustomerID(String customerID) { + this.customerID = customerID; + } + + public String getFacilityID() { + return facilityID; + } + + public void setFacilityID(String facilityID) { + this.facilityID = facilityID; + } + + public String getEquipmentType() { + return equipmentType; + } + + public void setEquipmentType(String equipmentType) { + this.equipmentType = equipmentType; + } + + public String getEquipmentID() { + return equipmentID; + } + + public void setEquipmentID(String equipmentID) { + this.equipmentID = equipmentID; + } + + public Integer getGasrating() { + return gasrating; + } + + public void setGasrating(Integer gasrating) { + this.gasrating = gasrating; + } + + public Integer getPilot() { + return pilot; + } + + public void setPilot(Integer pilot) { + this.pilot = pilot; + } + + public Long getTimestamp() { + return timestamp; + } + + public void setTimestamp(Long timestamp) { + this.timestamp = timestamp; + } + + @Override + public String toString() { + return "MqttRawData [topic=" + topic + ", qos=" + qos + ", payload=" + payload + ", customerID=" + customerID + + ", facilityID=" + facilityID + ", equipmentType=" + equipmentType + ", equipmentID=" + equipmentID + + ", gasrating=" + gasrating + ", pilot=" + pilot + ", timestamp=" + timestamp + "]"; + } + + public Map toMap() { + Map map = new HashMap<>(); + map.put("topic", topic); + map.put("qos", qos); + map.put("payload", payload); + map.put("customerId", customerID); + map.put("facilityId", facilityID); + map.put("equipmentType", equipmentType); + map.put("equipmentId", equipmentID); + map.put("gasRating", gasrating); + map.put("pilot", pilot); + map.put("timestamp", timestamp); + return map; + } + +} diff --git a/src/main/resources/db/changelog/changes/20220314_01_matthew/01_create_mqtt_table.sql b/src/main/resources/db/changelog/changes/20220314_01_matthew/01_create_mqtt_table.sql new file mode 100644 index 0000000..bec55df --- /dev/null +++ b/src/main/resources/db/changelog/changes/20220314_01_matthew/01_create_mqtt_table.sql @@ -0,0 +1,28 @@ +--liquibase formatted sql + +--changeset matthew:create_mqtt_table +CREATE TABLE `mqtt_source_data` ( + `id` int(11) NOT NULL AUTO_INCREMENT, + `topic` varchar(30) DEFAULT NULL, + `qos` int(11) NOT NULL DEFAULT '0', + `payload` varchar(1000) DEFAULT NULL, + PRIMARY KEY (`id`) +); + +--changeset matthew:create_mqtt_tablev2 +DROP TABLE IF EXISTS `mqtt_source_data`; +CREATE TABLE `mqtt_source_data` ( + `id` int(11) NOT NULL AUTO_INCREMENT, + `created` datetime NOT NULL DEFAULT CURRENT_TIMESTAMP, + `topic` varchar(30) DEFAULT NULL, + `qos` int(11) NOT NULL DEFAULT '0', + `payload` varchar(1000) DEFAULT NULL, + `customerId` varchar(30) DEFAULT NULL, + `facilityId` varchar(30) DEFAULT NULL, + `equipmentType` varchar(30) DEFAULT NULL, + `equipmentId` varchar(30) DEFAULT NULL, + `gasRating` FLOAT DEFAULT NULL, + `pilot` FLOAT DEFAULT NULL, + `timestamp` BIGINT DEFAULT NULL, + PRIMARY KEY (`id`) +);