카테고리 없음
mqtt
Ynghan
2023. 6. 30. 11:49
MyMqtt_Pub_Client
package basic;
import org.eclipse.paho.client.mqttv3.MqttClient;
import org.eclipse.paho.client.mqttv3.MqttException;
import org.eclipse.paho.client.mqttv3.MqttMessage;
/**
* MQTT 통신을 통해서 메시지를 broker로 전송하기 위한 객체
* 1. broker에 접속
* 2. broker에 메시지를 전송
*/
public class MyMqtt_Pub_Client {
//MQTT통신에서 Publisher와 Subscriber의 역할을 하기 위한 기능을 가지고 있는 객체
private MqttClient client;
public MyMqtt_Pub_Client() throws MqttException {
//broker와 MQTT 통신을 하며 메시지를 전송할 클라이언트 객체를 만들고 접속
client = new MqttClient("tcp://192.168.63.95:1883", "myid");
client.connect(); //broker 접속
}
//메시지 전송을 위한 메소드
public boolean send(String topic, String msg) throws MqttException {
//broker로 전송할 메시지를 생성 - MqttMessage
MqttMessage message = new MqttMessage();
message.setPayload(msg.getBytes()); //실제 broker로 전송할 메시지
client.publish(topic, message);
return true;
}
//종료
public void close() throws MqttException {
if(client != null) {
client.disconnect();
client.close();
}
}
public static void main(String[] args) throws MqttException {
MyMqtt_Pub_Client sender = new MyMqtt_Pub_Client();
new Thread(new Runnable() {
@Override
public void run() {
int i = 1;
String msg="";
while(true) {
if(i == 5) {
break;
} else {
if(i%2 == 1) {
msg = "t:24";
} else {
msg = "error";
}
}
try {
sender.send("iot", msg);
} catch (MqttException e) {
throw new RuntimeException(e);
}
i++;
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}
try {
sender.close();//작업완료되면 종료하기
} catch (MqttException e) {
throw new RuntimeException(e);
}
}
}
).start();
}
}
MyMqtt_Sub_Client
package basic;
import org.eclipse.paho.client.mqttv3.*;
/**
* MQTT 클라이언트 작성 - broker에 메시지를 전달받기 위해 구독신청을 하고 대기하는 객체
* 1. MqttCallback인터페이스를 상속
* 2. MqttCallback인터페이스의 abstract메소드를 오버라이딩
*/
public class MyMqtt_Sub_Client implements MqttCallback {
//broker와 통신하는 역할 - subscriber, publishier의 역할
private MqttClient mqttclient;
//MQTT프로토콜을 이용해서 broker에 연결하면서 연결정보를 설정할 수 있는 객체
private MqttConnectOptions mqttOption;
//clientId는 broker가 클라이언트를 식별하기 위한 문자열 - 고유
public MyMqtt_Sub_Client init(String server, String clientId) throws MqttException {
mqttOption = new MqttConnectOptions();
mqttOption.setCleanSession(true);
mqttOption.setKeepAliveInterval(30);
//broker에 subscribe하기 위한 클라이언트객체 생성
mqttclient = new MqttClient(server, clientId);
//클라이언트객체에 MqttCallback을 등록 - 구독신청 후 적절한 시점에 처리하고 싶은 기능을 구현하고
//메소드가 자동으로 그 시점에 호출되도록 할 수 있다.
mqttclient.setCallback(this);
mqttclient.connect(mqttOption);
return this;
}
//커넥션이 종료되면 호출 - 통신오류로 연결이 끊어지는 경우 호출
@Override
public void connectionLost(Throwable throwable) {
}
//메시지가 도착하면 호출되는 메소드
@Override
public void messageArrived(String topic, MqttMessage message) throws Exception {
System.out.println("==================메시지도착==================");
System.out.println(message);
System.out.println("topic:" + topic + ", id:" + message.getId()
+ ", payload:" + new String(message.getPayload()));
}
//메시지의 배달이 완료되면 호출
@Override
public void deliveryComplete(IMqttDeliveryToken iMqttDeliveryToken) {
}
//구독 신청
public boolean subscribe(String topic) {
boolean result = true;
try {
if(topic != null) {
//topic과 Qos를 전달
//Qos는 메시지가 도착하기 위한 품질에 값을 설정 - 서비스 품질
//0,1,2를 설정
mqttclient.subscribe(topic, 0);
}
} catch (MqttException e) {
e.printStackTrace();
result = false;
}
return result;
}
public static void main(String[] args) throws MqttException {
MyMqtt_Sub_Client subobj = new MyMqtt_Sub_Client();
subobj.init("tcp://192.168.63.95:1883", "myid2").subscribe("iot");
}
}
MyMqtt_Sub_Client