Hi, today we are going to learn about RabbitMQ, a message broker software.
What is Message Borker?
Message broker is an intermediary program module that translates a message from the formal messaging protocol of the sender to the formal messaging protocol of the receive
– source wikipedia
It act as an intermediate between sender and receiver.
What is RabbitMQ?
RabbitMQ is an open source message broker software which implements Advanced Message Queuing Protocol (AMQP). It can act as pub-sub, routing and queuing system.
RabbitMQ Message Flow
Producer: Creates a message to send it to exchange
Broker: It consist of three parts
- Exchange: Receive published messages and send it to appropriate queue on the basis of routing key.
- Binding: It is a link between queue and exchange
- Routing key: It helps in selecting queue i.e. which message is will be send to which queue.
Consumer: Consumer consumes the message from the queue they are bind.
Installation
Development Environment
- Ubuntu 14.04 LTS
- Python 2.7
- PIP (python package manager)
Follow the steps to start using RabbitMQ
# Create Directory and enter it
1 2 3 4 |
// create directory mkdir rmq // enter directory cd ./rmq |
# Create and activate virtual environment
Don’t know how to install virtual environment, check the official doc at docs.python-guide.org.
1 2 3 4 |
// create virtual environment venv virtualenv venv // activate virtual environment source venv/bin/activate |
Download requirements.txt file to install all packages in your virtual environment using following command or you can go step by step with the tutorial.
1 2 |
// put requirements.txt in your root folder(rmq) pip install -r requirements.txt |
# Install RabbitMQ
1 |
sudo apt-get install rabbitmq-server |
Once you install rabbitmq server, it will starts running automatically
# Enable RabbiMQ management plugin
1 |
rabbitmq-plugins enable rabbitmq_management |
It will give you a WebUI to manage and monitor rabbitmq. By default it runs on port 15672, you can access it in your browser by going to url http://127.0.0.1:15672/. Default username and password for WebUI is guest and guest respectively.
# Install pika
Pika is a pure-Python implementation of the AMQP 0-9-1 protocol that tries to stay fairly independent of the underlying network support library.
– source python.org
1 |
pip install pika |
# Create Producer (message.py)
Producer sends a string message to “messages” queue.
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 |
import pika # creating connection with rabbitmq-server at localhost connection = pika.BlockingConnection(pika.ConnectionParameters( host='localhost')) channel = connection.channel() # setting queue to insert the message channel.queue_declare(queue='messages', durable=True) # message string message = "HI we are learning rabbitmq" # sending message to rabbitmq with a routing key channel.basic_publish(exchange='', routing_key='messages', body=message, properties=pika.BasicProperties( delivery_mode = 2, )) print "######\nSending new message\n######" print message connection.close() |
# Create Consumer (worker.py)
Consumer will receive message and process it. Here, consume is listening on “messages” queue.
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 |
import pika connection = pika.BlockingConnection(pika.ConnectionParameters( host='localhost')) channel = connection.channel() channel.queue_declare(queue='messages', durable=True) print('>> Waiting for messages') def callback(ch, method, properties, body): print(">> Received message " % body) ch.basic_ack(delivery_tag = method.delivery_tag) # prefetch+count = 1 tells consumer to recieve 1 message at a time channel.basic_qos(prefetch_count=1) # teling consumer to consume message from messages queue and then send a callback for further operations channel.basic_consume(callback, queue='messages') # consuming queue starts channel.start_consuming() |
# Run Producer and Consumer
Run producer and consumer in two different terminal in order to see sending and receiving of messages simultaneously.
1 2 |
// producer running in terminal 1 python ./message.py |
When we run message.py, message got inserted into queue but since there is no consumer listening on that queue therefor it shows one message is ready for publishing as “Ready” equals to 1.
Click on messages in above UI and we can get stats about individual queues. As you can see 1 message which we sent earlier is still in queue.
# Start consumer
1 2 |
// consumer running in terminal 2 python ./worker.py |
As soon as we start our consumer worker.py, it will receive message from queue and now you can see ready count is 0.
Sending Multiple Messages
Create a loop to send 10 bulk message in queue.
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 |
import pika connection = pika.BlockingConnection(pika.ConnectionParameters( host='localhost')) channel = connection.channel() channel.queue_declare(queue='messages', durable=True) for i in range(1, 10): message = "This is message "+ str(i) channel.basic_publish(exchange='', routing_key='messages', body=message, properties=pika.BasicProperties( delivery_mode = 2, )) print "Sending Message >> "+message connection.close() |
Now start two instances of your worker.py
When you start two instances of worker.py, you can see message got evenly distributed on the basis of Round Robin Algorithm.Also, same message is not delivered to multiple workers.
It’s a simple example, now we send dict as message.
Create a new file dict-message.py
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 |
import pika import json connection = pika.BlockingConnection(pika.ConnectionParameters( host='localhost')) channel = connection.channel() channel.queue_declare(queue='dict_messages', durable=True) for i in range(1, 10): message = { 'id': i, 'msg': 'message '+ str(i) } channel.basic_publish(exchange='', routing_key='dict_messages', body=json.dumps(message), # to send dict as a message in json properties=pika.BasicProperties( delivery_mode = 2, )) print "Sending Message >> " + str(message) connection.close() |
In order to read message now, we have to change our worker a little bit.
Create a new file dict-message-worker.py
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 |
import pika import json connection = pika.BlockingConnection(pika.ConnectionParameters( host='localhost')) channel = connection.channel() channel.queue_declare(queue='dict_messages', durable=True) for i in range(1, 10): message = { 'id': i, 'msg': 'message '+ str(i) } channel.basic_publish(exchange='', routing_key='dict_messages', body=json.dumps(message), # to send dict as a message in json properties=pika.BasicProperties( delivery_mode = 2, )) print "Sending Message >> " + str(message) connection.close() |
We use json to send and receive message. As you can see when we send dict as message in a new queue “dict_messages” , consumer listening to that queue start receiving message instantly.
Now you have fair amount of knowledge of RabbiMQ you can use it in any appropriate project like for send notification, storing logs, sending bulk emails and many more.
For further reading check rabbitmq official docs at rabbitmq.com.
That’s all for today, hope you enjoy and do try at home.
Download source code Up-and-Running-with-RabbitMQ-A-Message-Broker-tusharsharma.in.zip