The following has only been tested against a really simple python flask app to generate sse on the same machine but not internal to ignition, I have not tested this against a production endpoint yet. this will probably need to be tweaked but should serve as a decent starting point for someone looking to consume external server sent events. posting this here because this post is the top hit when searching the forum for implementing a consumer for server sent events and I needed it for a project of my own. I would appreciate any suggestions for improvement as I plan to incorporate into my own project soon. hopefully this helps out.
Basic Flask App Run Outside of Ignition to Generate Server Sent Events
from flask import Flask, Response, stream_with_context
import time
app = Flask(__name__)
@app.route('/stream')
def stream():
def event_stream():
while True:
time.sleep(5)
yield f"data: The current server time is {time.ctime()}\n\n"
return Response(stream_with_context(event_stream()), mimetype="text/event-stream")
@app.route('/')
def index():
return '''
<html>
<body>
<h1>SSE Test</h1>
<div id="sse-data"></div>
<script type="text/javascript">
var source = new EventSource("/stream");
source.onmessage = function(event) {
document.getElementById("sse-data").innerHTML += event.data + "<br>";
};
</script>
</body>
</html>
'''
if __name__ == "__main__":
app.run(threaded=True, port=5000)
Server Sent Event Listener Within Ignition Project Scripting Library
from java.net import URI
from java.net.http import HttpRequest, HttpResponse
from java.io import BufferedReader, InputStreamReader
logger = system.util.getLogger("SSE Client")
class SSEClient(object):
# Given A Streaming Url, This Class With Listen for Server Sent Events and Write Data Recieved to Tags
def __init__(self, url="http://localhost:5000/stream", headers={}):
self._url = url
self._headers = headers
self._running = False
self._logger = logger
self._threadDescription = "SSE-Client-" + str(hash(self))
self._thread = None
# Spawn its own http Client because it Leverages underlying java client
# See https://forum.inductiveautomation.com/t/integration-sse-events-server-sent-event-with-ignition-perspective-applications/102528/13
self._http_client = system.net.httpClient().javaClient
#
def start(self):
# Start listener client in a background thread.
if not self._running:
self._running = True
self._logger.info("Starting " + self._threadDescription)
self._thread = system.util.invokeAsynchronous(function=self._run, description=self._threadDescription)
def stop(self):
#Stop listening.
self._logger.info("Stopping " + self._threadDescription)
try:
self._running = False
self._thread.interrupt()
except Exception as e:
self._logger.error("Failed to Stop and Interrupt Thread: " +str(e))
def _run(self):
#Run the Listener and Perform Reconnect on Failure, Also Interrupt While Loop If Thread Interrupt Detected
#https://docs.oracle.com/en/java/javase/11/docs/api/java.base/java/lang/Thread.html
while self._running == True and self._thread.interrupted() == False:
try:
self._logger.info("Connecting to SSE server at: %s" % self._url)
#Build the Header per https://docs.oracle.com/en/java/javase/11/docs/api/java.net.http/java/net/http/HttpRequest.html
request_builder = HttpRequest.newBuilder().uri(URI.create(self._url))
#Add Additional Header Arguments Passed into Class
#https://docs.oracle.com/en/java/javase/11/docs/api/java.net.http/java/net/http/HttpRequest.Builder.html#header(java.lang.String,java.lang.String)
for key, value in self._headers.items():
request_builder.header(str(key), str(value))
request_builder.header("Accept", "text/event-stream")
#Set Request to Get and Return an HttpRequest Object
request = request_builder.GET().build()
#Example Get Request Header We Are Trying to Build, No Body
#GET /stream HTTP/1.1
#User-Agent: ~
#Host: http://localhost:5000
#Accept: text/event-stream
# Because we are on a Seperate Thread, HTTPClient.send blocks until request has been sent and response has been recieved.
# Experiment with Async if this becomes an issue. https://docs.oracle.com/en/java/javase/17/docs/api/java.net.http/java/net/http/HttpClient.html
# https://docs.oracle.com/en/java/javase/11/docs/api/java.net.http/java/net/http/HttpResponse.BodyHandlers.html#ofInputStream()
response = self._http_client.send(request, HttpResponse.BodyHandlers.ofInputStream())
# If Non 200 Code Recieved, May Need to Open Up If We Get False Positives on statusCode in the 200 Range
if response.statusCode() >= 300:
self._logger.error("Failed to connect: HTTP %d" % response.statusCode())
continue
#Read Response into Input Stream, Then Buffered Reader and Parse
#https://docs.oracle.com/javase/8/docs/api/java/io/BufferedReader.html
#https://www.geeksforgeeks.org/java-io-bufferedreader-class-java/
#Use Default Buffer Size to Start
reader = BufferedReader(InputStreamReader(response.body()))
self._parse_stream(reader)
except Exception as e:
self._logger.error("Error in SSE client: %s" % str(e))
try:
self._thread.interrupt()
reader.close()
except:
pass
def _parse_stream(self, reader):
#Parse incoming SSE events
event_name = "message"
data_lines = []
while self._running:
# Get the Latest SSE Message, See Buffered Reader Class Docs
line = reader.readLine()
if line is None:
self._logger.warn("SSE connection closed by server.")
break
#Remove Leading/Trailing White Space
line = line.strip()
# If Line is Blank, Serves as Delimiter for End of Event Data
if not line:
# Combine All Data Lines Up To Blank Line
if data_lines:
full_data = "\n".join(data_lines)
#Perform Work on the Data Recieved
self._dispatch_event(event_name, full_data)
event_name = "message"
data_lines = []
continue
#Retrieve Info From SSE Message, Default to Message if No Event String Recieved, strip leading line identifiers
if line.startswith("event:"):
event_name = line[len("event:"):].strip()
elif line.startswith("data:"):
data_lines.append(line[len("data:"):].strip())
def _dispatch_event(self, event_name, data):
#Handle incoming events, write datastring to tag
self._logger.info("Received Event: [%s] Data: [%s]" % (event_name, data))
system.tag.writeAsync(["[default]SSE_Client/EventData"], [data])
sse_client_lite = SSEClient()
Within Gateway Event Scripts add the following startup and update script
{root_script_location}.sse_client_lite.start()
Within Gateway Event Scripts add the following shutdown script
{root_script_location}.sse_client_lite.stop()
Create Tag to Write Responses To, Update dispatch_event