I’ve been trying to use AMQP in Ignition, using the amqp-client-5.14.2.jar library.
For now, I’ve just added the library to the folder in Ignition; maybe later I will build a separate module for it.
But this function is not connected anymore to the connection I first created, so it gives an error.
How do I make sure there is always one connection from my platform to the AMQP broker?
And how do I manage that active connection without ?
I assume I need something like persistent classes? @pturmel, I’ve seen you talk a lot about classes and persistent classes, would you know how I can proceed?
I’d definitely recommend doing some digging on the forums for “prior art” here. One other thing to note: system.util.globals / system.util.getGlobals() is a dictionary that persists across the lifetime of the local JVM (so as long as the gateway is up).
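To make the dictionary idea concrete, here is a minimal sketch of keeping one named sub-dictionary inside a persistent dictionary. It assumes that in a gateway-scoped script you would pass `system.util.getGlobals()` as `store`; a plain dict stands in for it here so the snippet runs anywhere. The helper name `get_namespace` and the host value are made up for illustration.

```python
def get_namespace(store, name):
    """Return the sub-dictionary kept under `name`, creating it on first use."""
    # setdefault does the lookup-or-insert in a single call
    return store.setdefault(name, {})


# Stand-in for system.util.getGlobals() (assumption: gateway scope in Ignition)
store = {}

amqp = get_namespace(store, "AMQP")
amqp["host"] = "broker.example.local"  # hypothetical value

# A later call sees the very same dictionary object, not a copy
same = get_namespace(store, "AMQP")
print(same is amqp)
print(same["host"])
```

Anything put into that sub-dictionary stays reachable for as long as the outer dictionary lives, which is also why whatever you store there is never garbage collected until you remove it yourself.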
But I think you guys are a few levels ahead of me.
I simply want one always-open connection (I think I need a class for this) while Ignition is running, and I want to talk to this connection when necessary.
How can I use system.util.getGlobals() to store a class? Is there any example on this?
And also, I'm a bit confused about system.util.getGlobals(). Are these globals something on your gateway? Because I see they are different when I open a new designer session.
*Edit: Ah, it is the same when you run it in a gateway scope, I see... Now I just need to find how I can store my connections in it.
The point of those topics is that you CAN’T just “store a class”. You will leak gobs of gateway memory if you try to do it naively. On this topic, you will simply have to study and test until you understand it. I strongly recommend testing on a non-production gateway.
How would I typically refer back to a connection made in a script? Would the connection have some identifier, which I would need to store in globals and use later?
Of course, I’m testing all new things in local containers.
We are heading down the same path. Currently working on a module which wraps the latest AMQP Client library.
You can reference the same global variables within each of your functions:
def close():
    global conn      # Added
    global channel   # Added
    system.perspective.print('closing connection ...')
    channel.close()
    conn.close()
Have you made progress on a ‘unique connection identifier’?
Found this related StackOverflow thread:
Yes I did. I store my connection in getGlobals, and this seems to work across sessions.
from com.rabbitmq.client import ConnectionFactory

# userName, password, virtualHost, hostName, portNumber, queueName, exchangeName,
# routingKey, autoAck and the three callbacks are defined elsewhere in the script.
def connect():
    factory = ConnectionFactory()
    factory.setUsername(userName)
    factory.setPassword(password)
    factory.setVirtualHost(virtualHost)
    factory.setHost(hostName)
    factory.setPort(portNumber)
    g = system.util.getGlobals()
    if "AMQP" not in g:
        g["AMQP"] = {}
    gAMQP = g["AMQP"]
    # Reuse the stored connection if there is one, otherwise open and store it
    if "conn" not in gAMQP:
        conn = factory.newConnection()
        gAMQP["conn"] = conn
    else:
        conn = gAMQP["conn"]
    # Same for the channel; the consumer is only registered for a new channel
    if "channel" not in gAMQP:
        channel = conn.createChannel()
        gAMQP["channel"] = channel
        channel.queueBind(queueName, exchangeName, routingKey)
        # With basicConsume we become a consumer, so we take all messages from the queue and process them.
        # Keep in mind this is pub/sub, but it will automatically read all messages in the queue on connect.
        tag = channel.basicConsume(queueName, autoAck, newMessage, cancel, shutdown)
        gAMQP["consumerTag"] = tag
    else:
        channel = gAMQP["channel"]
Yes, we will be working on an AMQP module as well, somewhere in the future.
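A matching teardown could look roughly like the sketch below. It assumes the same "AMQP" sub-dictionary layout as the connect function above and takes the persistent dictionary as a parameter (in the gateway you would pass `system.util.getGlobals()`). The function name `disconnect` and the `FakeCloseable` class are made up; the only thing the code relies on is that the stored objects have a `close()` method.

```python
class FakeCloseable(object):
    """Hypothetical stand-in for the stored connection and channel."""

    def __init__(self):
        self.closed = False

    def close(self):
        self.closed = True


def disconnect(store):
    """Remove the stored AMQP objects and close them. Returns True if a connection was closed."""
    amqp = store.get("AMQP")
    if not amqp:
        return False
    # Pop first, so later callers no longer see the stale objects
    channel = amqp.pop("channel", None)
    conn = amqp.pop("conn", None)
    amqp.pop("consumerTag", None)
    try:
        if channel is not None:
            channel.close()
    finally:
        # Close the connection even if closing the channel raised
        if conn is not None:
            conn.close()
    return conn is not None


# Plain dict standing in for system.util.getGlobals()
store = {"AMQP": {"conn": FakeCloseable(), "channel": FakeCloseable(), "consumerTag": "tag-1"}}
print(disconnect(store))
print(store["AMQP"])
```

Removing the entries before closing means the next call to the connect function sees an empty sub-dictionary and builds a fresh connection instead of reusing a closed one.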
import thread
from com.rabbitmq.client import ConnectionFactory

# Returns the channel. If a new channel had to be created, the consumerTag is created too.
def conditionalConnect():
    g = system.util.getGlobals()
    # Ensure gAMQP exists in a thread-safe manner
    gAMQP = g.setdefault("AMQP", {})
    # Attempt to get the channel without the lock, for speed
    channel = gAMQP.get("channel")
    if channel:
        return channel
    # Obtain and use the lock
    channelLock = g.get('channelLock', None)
    if channelLock is None:
        channelLock = g.setdefault('channelLock', thread.allocate_lock())
    with channelLock:
        # Retry under the lock in case another thread was already in the
        # creation sequence.
        channel = gAMQP.get("channel")
        if channel:
            return channel
        # Nope. Connect and create and consume.
        factory = ConnectionFactory()
        factory.setUsername(userName)
        factory.setPassword(password)
        factory.setVirtualHost(virtualHost)
        factory.setHost(hostName)
        factory.setPort(portNumber)
        conn = factory.newConnection()
        channel = conn.createChannel()
        channel.queueBind(queueName, exchangeName, routingKey)
        # With basicConsume we become a consumer, so we take all messages from the queue and process them.
        # Keep in mind this is pub/sub, but it will automatically read all messages in the queue on connect.
        tag = channel.basicConsume(queueName, autoAck, newMessage, cancel, shutdown)
        gAMQP['conn'] = conn
        gAMQP['channel'] = channel
        gAMQP["consumerTag"] = tag
        return channel
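The check, lock, check-again pattern in that function can be tried outside Ignition with a generic helper. This is only a sketch: `get_or_create`, `make_channel` and the plain dict are made-up stand-ins, and it uses the `threading` module rather than `thread` so it also runs on a regular Python install.

```python
import threading


def get_or_create(store, key, factory, lock):
    """Return store[key], calling factory() at most once even with concurrent callers."""
    # Fast path: no lock needed once the value exists
    value = store.get(key)
    if value is not None:
        return value
    with lock:
        # Check again: another thread may have created it while we waited
        value = store.get(key)
        if value is None:
            value = factory()
            store[key] = value
        return value


store = {}                # stand-in for the persistent dictionary
lock = threading.Lock()
calls = []                # records how often the factory actually ran


def make_channel():
    calls.append(1)
    return object()       # stand-in for the real channel


results = []


def worker():
    results.append(get_or_create(store, "channel", make_channel, lock))


threads = [threading.Thread(target=worker) for _ in range(8)]
for t in threads:
    t.start()
for t in threads:
    t.join()

print(len(calls))                                 # number of factory runs
print(all(r is results[0] for r in results))      # every thread got the same object
```

The second lookup inside the `with` block is what keeps two threads that both missed on the fast path from each opening their own connection.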