@@ -36,6 +36,7 @@ dependencies { | |||||
implementation group: 'org.apache.tika', name: 'tika-core', version: '2.4.1' | implementation group: 'org.apache.tika', name: 'tika-core', version: '2.4.1' | ||||
implementation group: 'org.springframework.integration', name: 'spring-integration-mqtt', version: '6.0.2' | implementation group: 'org.springframework.integration', name: 'spring-integration-mqtt', version: '6.0.2' | ||||
implementation 'net.sf.jasperreports:jasperreports:6.19.1' | implementation 'net.sf.jasperreports:jasperreports:6.19.1' | ||||
implementation group: 'com.google.code.gson', name: 'gson', version: '2.10.1' | |||||
implementation group: 'org.springdoc', name: 'springdoc-openapi-ui', version: '1.6.9' | implementation group: 'org.springdoc', name: 'springdoc-openapi-ui', version: '1.6.9' | ||||
@@ -4,7 +4,13 @@ import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken; | |||||
import org.eclipse.paho.client.mqttv3.MqttCallback; | import org.eclipse.paho.client.mqttv3.MqttCallback; | ||||
import org.eclipse.paho.client.mqttv3.MqttMessage; | import org.eclipse.paho.client.mqttv3.MqttMessage; | ||||
import com.ffii.towngas.pojo.MqttRawData; | |||||
import com.google.gson.Gson; | |||||
public class MqttConsumerCallBack implements MqttCallback { | public class MqttConsumerCallBack implements MqttCallback { | ||||
private Gson gson = new Gson(); | |||||
@Override | @Override | ||||
public void connectionLost(Throwable throwable) { | public void connectionLost(Throwable throwable) { | ||||
System.out.println("Disconnected with server"); | System.out.println("Disconnected with server"); | ||||
@@ -12,10 +18,14 @@ public class MqttConsumerCallBack implements MqttCallback { | |||||
@Override | @Override | ||||
public void messageArrived(String topic, MqttMessage message) throws Exception { | public void messageArrived(String topic, MqttMessage message) throws Exception { | ||||
System.out.println(String.format("Topic: %s", topic)); | |||||
System.out.println(String.format("Qos : %d", message.getQos())); | |||||
System.out.println(String.format("Content: %s", new String(message.getPayload()))); | |||||
System.out.println(String.format("Retained : %b", message.isRetained())); | |||||
String payload = new String(message.getPayload()); | |||||
MqttRawData mqttRawData = gson.fromJson(payload, MqttRawData.class); | |||||
mqttRawData.setTopic(topic); | |||||
mqttRawData.setQos(message.getQos()); | |||||
mqttRawData.setPayload(payload); | |||||
System.out.println(mqttRawData); | |||||
MqttDataWriter.insertMqttRawData(mqttRawData); | |||||
} | } | ||||
@Override | @Override | ||||
@@ -0,0 +1,28 @@ | |||||
package com.ffii.towngas.config; | |||||
import org.springframework.stereotype.Component; | |||||
import com.ffii.core.support.JdbcDao; | |||||
import com.ffii.towngas.pojo.MqttRawData; | |||||
@Component | |||||
public class MqttDataWriter { | |||||
private static JdbcDao jdbcDao; | |||||
private static final String INSERT_SQL = "INSERT INTO mqtt_source_data (topic, qos, payload, customerId, facilityId, equipmentType, equipmentId, gasRating, pilot, `timestamp`) VALUES(:topic, :qos, :payload, :customerId, :facilityId, :equipmentType, :equipmentId, :gasRating, :pilot, :timestamp);"; | |||||
public MqttDataWriter(JdbcDao jdbcDao) { | |||||
this.jdbcDao = jdbcDao; | |||||
} | |||||
public static Integer insertMqttRawData(MqttRawData mqttRawData) { | |||||
try { | |||||
return jdbcDao.executeUpdate(INSERT_SQL, mqttRawData.toMap()); | |||||
} catch (Exception e) { | |||||
e.printStackTrace(); | |||||
return null; | |||||
} | |||||
} | |||||
} |
@@ -0,0 +1,121 @@ | |||||
package com.ffii.towngas.pojo; | |||||
import java.util.HashMap; | |||||
import java.util.Map; | |||||
public class MqttRawData { | |||||
private String topic; | |||||
private Integer qos; | |||||
private String payload; | |||||
private String customerID; | |||||
private String facilityID; | |||||
private String equipmentType; | |||||
private String equipmentID; | |||||
private Integer gasrating; | |||||
private Integer pilot; | |||||
private Long timestamp; | |||||
public String getTopic() { | |||||
return topic; | |||||
} | |||||
public void setTopic(String topic) { | |||||
this.topic = topic; | |||||
} | |||||
public Integer getQos() { | |||||
return qos; | |||||
} | |||||
public void setQos(Integer qos) { | |||||
this.qos = qos; | |||||
} | |||||
public String getPayload() { | |||||
return payload; | |||||
} | |||||
public void setPayload(String payload) { | |||||
this.payload = payload; | |||||
} | |||||
public String getCustomerID() { | |||||
return customerID; | |||||
} | |||||
public void setCustomerID(String customerID) { | |||||
this.customerID = customerID; | |||||
} | |||||
public String getFacilityID() { | |||||
return facilityID; | |||||
} | |||||
public void setFacilityID(String facilityID) { | |||||
this.facilityID = facilityID; | |||||
} | |||||
public String getEquipmentType() { | |||||
return equipmentType; | |||||
} | |||||
public void setEquipmentType(String equipmentType) { | |||||
this.equipmentType = equipmentType; | |||||
} | |||||
public String getEquipmentID() { | |||||
return equipmentID; | |||||
} | |||||
public void setEquipmentID(String equipmentID) { | |||||
this.equipmentID = equipmentID; | |||||
} | |||||
public Integer getGasrating() { | |||||
return gasrating; | |||||
} | |||||
public void setGasrating(Integer gasrating) { | |||||
this.gasrating = gasrating; | |||||
} | |||||
public Integer getPilot() { | |||||
return pilot; | |||||
} | |||||
public void setPilot(Integer pilot) { | |||||
this.pilot = pilot; | |||||
} | |||||
public Long getTimestamp() { | |||||
return timestamp; | |||||
} | |||||
public void setTimestamp(Long timestamp) { | |||||
this.timestamp = timestamp; | |||||
} | |||||
@Override | |||||
public String toString() { | |||||
return "MqttRawData [topic=" + topic + ", qos=" + qos + ", payload=" + payload + ", customerID=" + customerID | |||||
+ ", facilityID=" + facilityID + ", equipmentType=" + equipmentType + ", equipmentID=" + equipmentID | |||||
+ ", gasrating=" + gasrating + ", pilot=" + pilot + ", timestamp=" + timestamp + "]"; | |||||
} | |||||
public Map<String, Object> toMap() { | |||||
Map<String, Object> map = new HashMap<>(); | |||||
map.put("topic", topic); | |||||
map.put("qos", qos); | |||||
map.put("payload", payload); | |||||
map.put("customerId", customerID); | |||||
map.put("facilityId", facilityID); | |||||
map.put("equipmentType", equipmentType); | |||||
map.put("equipmentId", equipmentID); | |||||
map.put("gasRating", gasrating); | |||||
map.put("pilot", pilot); | |||||
map.put("timestamp", timestamp); | |||||
return map; | |||||
} | |||||
} |
@@ -0,0 +1,28 @@ | |||||
--liquibase formatted sql | |||||
--changeset matthew:create_mqtt_table | |||||
CREATE TABLE `mqtt_source_data` ( | |||||
`id` int(11) NOT NULL AUTO_INCREMENT, | |||||
`topic` varchar(30) DEFAULT NULL, | |||||
`qos` int(11) NOT NULL DEFAULT '0', | |||||
`payload` varchar(1000) DEFAULT NULL, | |||||
PRIMARY KEY (`id`) | |||||
); | |||||
--changeset matthew:create_mqtt_tablev2 | |||||
DROP TABLE IF EXISTS `mqtt_source_data`; | |||||
CREATE TABLE `mqtt_source_data` ( | |||||
`id` int(11) NOT NULL AUTO_INCREMENT, | |||||
`created` datetime NOT NULL DEFAULT CURRENT_TIMESTAMP, | |||||
`topic` varchar(30) DEFAULT NULL, | |||||
`qos` int(11) NOT NULL DEFAULT '0', | |||||
`payload` varchar(1000) DEFAULT NULL, | |||||
`customerId` varchar(30) DEFAULT NULL, | |||||
`facilityId` varchar(30) DEFAULT NULL, | |||||
`equipmentType` varchar(30) DEFAULT NULL, | |||||
`equipmentId` varchar(30) DEFAULT NULL, | |||||
`gasRating` FLOAT DEFAULT NULL, | |||||
`pilot` FLOAT DEFAULT NULL, | |||||
`timestamp` BIGINT DEFAULT NULL, | |||||
PRIMARY KEY (`id`) | |||||
); |