BigObject
Python Demo of RabbitMQ Topics Pub/Sub mode
RabbitMQ
Topics mode:
Publishers
can publish messages with different topics to MQ. A subscriber can
subscribe multiple topics of messages from MQ.
The
bo_emit_stooq.py program sends all instruments from BigObject, and the
bo_receive_stooq.py program only receives messages with 'ibm.us'
topic.
Environment
Python
Install and configure components¶
- Install the package:
- # apt-get install rabbitmq-server
- Add the remoteguest user:
- # sudo rabbitmqctl add_user remoteguest remoteguest
- Permit configuration, write, and read access for the remoteguest user:
- # sudo rabbitmqctl set_permissions remoteguest ".*" ".*" ".*"
# local connection can use the user guest/guest
#
but a remote connection needs to add a new user
remoteguest/remoteguest
#**************************
#
bo_receive_stooq.py
#**************************
#!/usr/bin/env
python
import
pika
import
sys
#localhost
#connection
=
pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))#localhost
#remote
server
parameters
=
pika.URLParameters("amqp://remoteguest:remoteguest@192.168.1.163/%2f")
connection
= pika.BlockingConnection(parameters)
channel
= connection.channel()
channel.exchange_declare(exchange='topic_logs',
type='topic')
result
= channel.queue_declare(exclusive=True)
queue_name
= result.method.queue
channel.queue_bind(exchange='topic_logs',
queue=queue_name,
routing_key='ibm.us')
print('
[*] Waiting for logs. To exit press CTRL+C')
def
callback(ch, method, properties, body):
print("
[x] %r:%r" % (method.routing_key, body))
channel.basic_consume(callback,
queue=queue_name,
no_ack=True)
channel.start_consuming()
#***********************
#
bo_emit_stooq.py
#***********************
#!/usr/bin/env
python
import
datetime
import
time
import
mysql.connector
import
random
import
pickle
import
os
from
time import sleep
import
pika
import
sys
#localhost
#connection
=
pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))#localhost
#remote
server
parameters
=
pika.URLParameters("amqp://remoteguest:remoteguest@192.168.1.163/%2f")
connection
= pika.BlockingConnection(parameters)
channel
= connection.channel()
channel.exchange_declare(exchange='topic_logs',
type='topic')
cnx
= mysql.connector.connect(user='scott',
password='tiger',host='192.168.1.163')
cursor
= cnx.cursor()
sql="select
rowid() as rowid,Date,Inst,Open,High,Low,Close from stooqseq"
cursor.execute(sql)
#
Fetch a single row using fetchone() method.
#data
= cursor.fetchone()
cnt=0
for
(rowid,Date,Inst,Open,High,Low,Close) in cursor:
if
cnt%10==0:
print
rowid,Date,Inst,Open,High,Low,Close
routing_key
= Inst.encode('ascii')
message
= str(rowid)+','+
Date+','+Inst+','+str(Open)+','+str(High)+','+str(Low)+','+str(Close)
message=message.encode('ascii')
channel.basic_publish(exchange='topic_logs',
routing_key=routing_key,
body=message)
#print("
[x] Sent %r:%r" % (routing_key, message))
cnt=cnt+1
#sleep(0.1)
#
Make sure data is committed to the database
cnx.commit()
cursor.close()
cnx.close()
connection.close()