티스토리 뷰

mqtt 통신 시 Publisher, Subscriber의 동작을 살펴보고자 RabbitMQ를 이용하여 메시지 브로커 서버를 구축해보았다.

 

[환경]

Ubuntu 18.04, RabbitMQ 3.9.1, Erlang 24.0.5

 

1. RabbitMQ 환경구축

1) RabbitMQ 설치

해당 방법으로 설치 시 최신 버전이 설치되지 않으므로, 아래 추가된 방법으로 설치하는 것을 권장함.

$ apt-get update
$ apt install rabbitmq-server

// RabbitMQ 서비스가 자동으로 시작하도록 구성
$ systemctl enable rabbitmq-server

// RabbitMQ 서비스 구동
$ systemctl start rabbitmq-server

// 서비스 확인
$ netstat -tnlp | grep 5672
root@ubuntu:/home/ubuntu# netstat -tnlp | grep 5672
tcp        0      0 0.0.0.0:25672           0.0.0.0:*               LISTEN      3720/beam.smp       
tcp6       0      0 :::5672                 :::*                    LISTEN      3720/beam.smp

 

+) 추가 (Ubuntu 18.04에서 최신 버전의 Erlang과 RabbitMQ 설치하기)

참고: 데비안과 우분투에 설치 — RabbitMQ

// RabbitMQ 리포지토리 서명 키 추가
## Team RabbitMQ's main signing key
$ sudo apt-key adv --keyserver "hkps://keys.openpgp.org" --recv-keys "0x0A9AF2115F4687BD29803A206B73A36E6026DFCA"

## Launchpad PPA that provides modern Erlang releases
$ sudo apt-key adv --keyserver "keyserver.ubuntu.com" --recv-keys "F77F1EDA57EBB1CC"

## PackageCloud RabbitMQ repository
$ curl -1sLf 'https://packagecloud.io/rabbitmq/rabbitmq-server/gpgkey' | sudo apt-key add -

// 소스 목록 파일 추가
$ sudo tee /etc/apt/sources.list.d/rabbitmq.list <<EOF
## Provides modern Erlang/OTP releases
deb http://ppa.launchpad.net/rabbitmq/rabbitmq-erlang/ubuntu bionic main
deb-src http://ppa.launchpad.net/rabbitmq/rabbitmq-erlang/ubuntu bionic main

## Provides RabbitMQ
deb https://packagecloud.io/rabbitmq/rabbitmq-server/ubuntu/ bionic main
deb-src https://packagecloud.io/rabbitmq/rabbitmq-server/ubuntu/ bionic main
EOF

// 패키지 설치
$ sudo apt update -y

## Install Erlang packages
$ sudo apt-get install -y erlang-base \
                        erlang-asn1 erlang-crypto erlang-eldap erlang-ftp erlang-inets \
                        erlang-mnesia erlang-os-mon erlang-parsetools erlang-public-key \
                        erlang-runtime-tools erlang-snmp erlang-ssl \
                        erlang-syntax-tools erlang-tftp erlang-tools erlang-xmerl

## Install rabbitmq-server and its dependencies
$ sudo apt-get install rabbitmq-server -y --fix-missing

// RabbitMQ 서비스가 자동으로 시작하도록 구성
$ systemctl enable rabbitmq-server

// RabbitMQ 서비스 구동
$ systemctl start rabbitmq-server

// 서비스 확인
$ netstat -tnlp | grep 5672
root@ubuntu:/home/ubuntu# netstat -tnlp | grep 5672
tcp        0      0 0.0.0.0:25672           0.0.0.0:*               LISTEN      3720/beam.smp       
tcp6       0      0 :::5672                 :::*                    LISTEN      3720/beam.smp

 

2) RabbitMQ 관리자 계정 생성

// 계정 생성
$ rabbitmqctl add_user {id} {password}

// 생성한 계정에 관리자 권한 부여
$ rabbitmqctl set_user_tags {id} administrator

 

3) 관리용 플러그인 활성화

$ rabbitmq-plugins enable rabbitmq_management
$ service rabbitmq-server restart

 

4) 관리 페이지 접속 확인

 

2. MQTT 설정

1) MQTT 활성화

$ rabbitmq-plugins enable rabbitmq_mqtt
$ service rabbitmq-server restart

 

2) pika 설치

mqtt publish/subscribe 메시지를 생성할 python 모듈을 설치한다.

$ python --version
Python 2.7.17

// pip 설치
$ apt install python-pip

// pika 설치
$ pip install pika

 

3. pika를 이용한 publisher/consumer 테스트

1) publisher 코드 작성

import pika, os, logging
logging.basicConfig()

# Parse CPOUDMQP_URL (fallback to localhost)
url = os.environ.get('TEST_AMQP_URL', 'amqp://tester:tester@localhost/%2f')
params = pika.URLParameters(url)
params.socket_timeout = 5

connection = pika.BlockingConnection(params) # Connect to CloudAMQP
channel = connection.channel() # start a channel
channel.queue_declare(queue='test_queue') # Declare a queue

# send a message
channel.basic_publish(exchange="amq.direct", routing_key='test_key', body='User information')
print ("[x] Message sent to consumer")
connection.close()

 

2) consumer 코드 작성

import pika, os, time

def test_queue_function(msg):
    print(" [x] Received " + str(msg))

    time.sleep(5) # delays for 5 seconds
    print(" test queue finished")
    return;

# Access the CLODUAMQP_URL environment variable and parse it (fallback to localhost)
url = os.environ.get('TEST_AMQP_URL', 'amqp://test:test@localhost:5672/%2f')
params = pika.URLParameters(url)
connection = pika.BlockingConnection(params)
channel = connection.channel() # start a channel
channel.queue_declare(queue='test_queue') # Declare a queue
channel.queue_bind(queue='test_queue', exchange="amq.direct", routing_key='test_key') # set routing key

# create a function which is called on incoming messages
def callback(ch, method, properties, body):
    test_queue_function(body)

# set up subscription on the queue
channel.basic_consume('test_queue', callback, auto_ack=True)

# start consuming (blocks)
channel.start_consuming()
connection.close()

 

3) publisher , consumer 실행

// publisher 실행
$ python publisher.py

// consumer 실행
$ python consumer.py

 

 

연결되어 publisher에서 생성한 message를 consumer가 가져가는 것을 확인할 수 있다.

 

4) UI를 이용하여 publish 메시지 생성

- Exchanges에서 사용하고자 하는 exchanges를 클릭 (direct로 테스트)

 

- routing key를 입력하고 payload에 메시지를 작성한 뒤 'publish message' 버튼을 클릭

 

- 기존에 실행시켰던 consumer에서 생성한 publish 메시지를 받아오는 것을 확인할 수 있음

 

- consumer.py를 실행하지 않은 상태에서 publish 메시지 생성

 

- Queues에서 queue를 선택하여 클릭한 후 'Get Message'를 클릭하여 publish 메시지를 확인할 수 있음

publisher가 보낸 메세지가 queue에 저장이 되고, consumer가 queue에 저장된 메세지를 읽는 것을 확인하였다.

 

 

 

 

참고 사이트

http://Hybridego.net/ :: RabbitMQ 설치 테스트 해보기

 

 

 

 

 

댓글
최근에 올라온 글
최근에 달린 댓글
Total
Today
Yesterday