Ynghan 2023. 6. 30. 11:49

MyMqtt_Pub_Client

package basic;

import org.eclipse.paho.client.mqttv3.MqttClient;
import org.eclipse.paho.client.mqttv3.MqttException;
import org.eclipse.paho.client.mqttv3.MqttMessage;

/**
 * MQTT 통신을 통해서 메시지를 broker로 전송하기 위한 객체
 * 1. broker에 접속
 * 2. broker에 메시지를 전송
 */
public class MyMqtt_Pub_Client {
    //MQTT통신에서 Publisher와 Subscriber의 역할을 하기 위한 기능을 가지고 있는 객체
    private MqttClient client;
    public MyMqtt_Pub_Client() throws MqttException {

            //broker와 MQTT 통신을 하며 메시지를 전송할 클라이언트 객체를 만들고 접속
            client = new MqttClient("tcp://192.168.63.95:1883", "myid");
            client.connect(); //broker 접속


    }

    //메시지 전송을 위한 메소드
    public boolean send(String topic, String msg) throws MqttException {
        //broker로 전송할 메시지를 생성 - MqttMessage
        MqttMessage message = new MqttMessage();
        message.setPayload(msg.getBytes()); //실제 broker로 전송할 메시지
        client.publish(topic, message);
        return true;
    }

    //종료
    public void close() throws MqttException {
        if(client != null) {
            client.disconnect();
            client.close();
        }
    }

    public static void main(String[] args) throws MqttException {
        MyMqtt_Pub_Client sender = new MyMqtt_Pub_Client();
        new Thread(new Runnable() {
                @Override
                public void run() {
                    int i = 1;
                    String msg="";
                    while(true) {
                        if(i == 5) {
                            break;
                        } else {
                            if(i%2 == 1) {
                                msg = "t:24";
                            } else {
                                msg = "error";
                            }
                        }
                        try {
                            sender.send("iot", msg);
                        } catch (MqttException e) {
                            throw new RuntimeException(e);
                        }
                        i++;
                        try {
                            Thread.sleep(1000);
                        } catch (InterruptedException e) {
                            throw new RuntimeException(e);
                        }
                    }

                    try {
                        sender.close();//작업완료되면 종료하기
                    } catch (MqttException e) {
                        throw new RuntimeException(e);
                    }
                }
            }
        ).start();
    }
}

MyMqtt_Sub_Client

package basic;

import org.eclipse.paho.client.mqttv3.*;

/**
 * MQTT 클라이언트 작성 - broker에 메시지를 전달받기 위해 구독신청을 하고 대기하는 객체
 * 1. MqttCallback인터페이스를 상속
 * 2. MqttCallback인터페이스의 abstract메소드를 오버라이딩
 */
public class MyMqtt_Sub_Client implements MqttCallback {

    //broker와 통신하는 역할 - subscriber, publishier의 역할
    private MqttClient mqttclient;
    //MQTT프로토콜을 이용해서 broker에 연결하면서 연결정보를 설정할 수 있는 객체
    private MqttConnectOptions mqttOption;

    //clientId는 broker가 클라이언트를 식별하기 위한 문자열 - 고유
    public MyMqtt_Sub_Client init(String server, String clientId) throws MqttException {
        mqttOption = new MqttConnectOptions();
        mqttOption.setCleanSession(true);
        mqttOption.setKeepAliveInterval(30);
        //broker에 subscribe하기 위한 클라이언트객체 생성
        mqttclient = new MqttClient(server, clientId);
        //클라이언트객체에 MqttCallback을 등록 - 구독신청 후 적절한 시점에 처리하고 싶은 기능을 구현하고
        //메소드가 자동으로 그 시점에 호출되도록 할 수 있다.
        mqttclient.setCallback(this);
        mqttclient.connect(mqttOption);

        return this;
    }

    //커넥션이 종료되면 호출 - 통신오류로 연결이 끊어지는 경우 호출
    @Override
    public void connectionLost(Throwable throwable) {

    }

    //메시지가 도착하면 호출되는 메소드
    @Override
    public void messageArrived(String topic, MqttMessage message) throws Exception {
        System.out.println("==================메시지도착==================");
        System.out.println(message);
        System.out.println("topic:" + topic + ", id:" + message.getId()
                + ", payload:" + new String(message.getPayload()));
    }

    //메시지의 배달이 완료되면 호출
    @Override
    public void deliveryComplete(IMqttDeliveryToken iMqttDeliveryToken) {

    }

    //구독 신청
    public boolean subscribe(String topic) {

        boolean result = true;
        try {
            if(topic != null) {
                //topic과 Qos를 전달
                //Qos는 메시지가 도착하기 위한 품질에 값을 설정 - 서비스 품질
                //0,1,2를 설정
                mqttclient.subscribe(topic, 0);
            }
        } catch (MqttException e) {
            e.printStackTrace();
            result = false;
        }
        return result;
    }



    public static void main(String[] args) throws MqttException {
        MyMqtt_Sub_Client subobj = new MyMqtt_Sub_Client();
        subobj.init("tcp://192.168.63.95:1883", "myid2").subscribe("iot");
    }
}

MyMqtt_Sub_Client