AMQP(RabbitMQ) in Ignition

Hi,

I’ve been trying to use AMQP in Ignition, using the amqp-client-5.14.2.jar library.
For now, I’ve just added the library to the folder in Ignition, maybe later, I will build a separate module for it.

I have used the following code:

from com.rabbitmq.client import Connection
from com.rabbitmq.client import Channel
from com.rabbitmq.client import Consumer
from com.rabbitmq.client import DefaultConsumer
from com.rabbitmq.client import ConnectionFactory

userName = 'guest'
password = 'guest' 
virtualHost = "/"
hostName = "rabbitmq"
portNumber = 5672  #15672

exchangeName = 'EXCH'
queueName = 'Ignition'
routingKey = 'Ignition'

def connect():
	factory = ConnectionFactory()
	#"guest"/"guest" by default, limited to localhost connections
	factory.setUsername(userName)
	factory.setPassword(password)
	factory.setVirtualHost(virtualHost)
	factory.setHost(hostName)
	factory.setPort(portNumber)
	
	conn = factory.newConnection()
	channel = conn.createChannel()
	
	channel.queueBind(queueName, exchangeName, routingKey)
	autoAck = False
	channel.basicConsume(queueName, autoAck, "Ignition_Consumer", DefaultConsumer(channel))
	

This gives me a connection to the broker:

Now I lack a bit of knowledge to proceed. Everytime I run the script, I logically get a new connection.

In the same script, I have this function:

def close():
	system.perspective.print('closing connection ...')
	channel.close()
	conn.close()

But this function is not connected anymore to the connection I first created, so it gives an error.

How do I make sure there is always one connection from my platform to the AMQP broker?
And how do I manage that active connection without ?
I assume I need something like persistent classes? @pturmel, I’ve seen you talk allot about classes an persistent classes, would you know how I can proceed?

Thanks!

I’d definitely recommend doing some digging on the forums for “prior art” here. One other thing to note: system.util.globals / system.util.getGlobals() is a dictionary that persists across the lifetime of the local JVM (so as long as the gateway is up).

I did do some digging here

and here

But I think you guys are a few levels ahead of me.
I simply want one always open connection (I need a class for this I think) when Ignition is running and want to speak to this connection when necessary.
How can I use system.util.getGlobals() to store a class? Is there any example on this?

And also, I'm a bit confused about system.util.getGlobals(). Are these globals something on your gateway? Because I see they are different when I open a new designer session.

*Edit: Ah, it is the same when you run it in a gateway scope, I see... Now I just need to find how I can store my connections in it.

The point of those topics is that you CAN’T just “store a class”. You will leak gobs of gateway memory if you try to do it naively. On this topic, you will simply have to study and test until you understand it. I strongly recommend testing on a non-production gateway.

Oh, ok… So referring back to my original code:

conn = factory.newConnection()

How would I typically refer back to a connection made in a script? Would the connection have some identifier, which I would need to store in globals and use later?

Ofcourse, I’m testing all new things in local containers.

We are heading down the same path. Currently working on a module which wraps the latest AMQP Client library.
You can reference the same global variables within each of your functions:

def close():
	global conn			// Added
	global channel		// Added
	system.perspective.print('closing connection ...')
	channel.close()
	conn.close()

Have you made progress on a ‘unique connection identifier’?
Found this related StackOverflow thread:

Hi Chris,

Yes I did. I store my connection in getGlobals, and this seems to work across sessions.

def connect():
	factory = ConnectionFactory()
	factory.setUsername(userName)
	factory.setPassword(password)
	factory.setVirtualHost(virtualHost)
	factory.setHost(hostName)
	factory.setPort(portNumber)
	g = system.util.getGlobals()
	if not "AMQP" in g:
		g["AMQP"] = {}
	gAMQP = g["AMQP"]
	if not "conn" in gAMQP:
		conn = factory.newConnection()
		gAMQP["conn"] = conn
	else:
		conn = gAMQP["conn"]
	if not "channel" in gAMQP:
		channel = conn.createChannel()
		gAMQP["channel"] = channel
		channel.queueBind(queueName, exchangeName, routingKey)
		#with basicConsume, we become a consumer, so we will be taking all messages from the queue and process it
		#Keep in mind this is pub/sub, but will automatically read all messages in queue on connect
		tag = channel.basicConsume(queueName, autoAck, newMessage, cancel, shutdown)
		gAMQP["consumerTag"] = tag
	else:
		channel = gAMQP["channel"]

Yes, we will be working on an AMQP module as well, somewhere in the future.

Racy. Try this:

import thread

# Returns channel.  If was a new channel, consumerTag is created too.
def conditionalConnect():
	g = system.util.getGlobals()
	# Ensure gAMQP exists in a thread-safe manner
	gAMQP = g.setdefault("AMQP", {})
	# Attempt to get the channel unlocked for speed
	channel = gAMQP.get("channel")
	if channel:
		return channel
	# Obtain and use lock
	channelLock = _g.get('channelLock', None)
	if channelLock is None:
		channelLock = _g.setdefault('channelLock', thread.allocate_lock())
	with channelLock:
		# Retry under the lock in case another thread was already in the
		# creation sequence.
		channel = gAMQP.get("channel")
		if channel:
			return channel
		# Nope. Connect and create and consume.
		factory = ConnectionFactory()
		factory.setUsername(userName)
		factory.setPassword(password)
		factory.setVirtualHost(virtualHost)
		factory.setHost(hostName)
		factory.setPort(portNumber)
		conn = factory.newConnection()
		channel = newConn.createChannel()
		channel.queueBind(queueName, exchangeName, routingKey)
		#with basicConsume, we become a consumer, so we will be taking all messages from the queue and process it
		#Keep in mind this is pub/sub, but will automatically read all messages in queue on connect
		tag = channel.basicConsume(queueName, autoAck, newMessage, cancel, shutdown)
		gAMQP['conn'] = conn
		gAMQP['channel'] = channel
		gAMQP["consumerTag"] = tag
		return channel
1 Like

Thanks for the upgrade!