Integration SSE events (Server Sent Event) with Ignition perspective applications

Hi Team, I am from John Deere and we are trying to integrate Server sent events into our perspective apps.
Its not feasible with system.net.httpGet due to its requrement of persistent connection to receive the response from server.

I need to understand what is the appropriate way to accomplish this. I donot want to develop a middlelayer because that will increase the ,complexity for our users. I tried using hosting http page and embedding into inline frame and it worked but not a feasible solution i think.

Please respond on this. thanks.

Are you trying to consume SSE or produce SSE data?

If you’re producing SSE data, I’ve got a module that does that.

APIs are normally implemented on the Ignition side with the WebDev module. So your server can make an API call to Ignition for whatever purposes you implement.

(Or use Ben's module.)

Thanks for your quick replies!
@bmusson I am trying to consume SSE event APIs. There is external server which is emitting the records over SSE APIs and we are trying to consume in Ignition as a client application.

@pturmel I tried using mount folder approach where i kept html file in the server and used that. But this approach is quite complex from customer standpoint. We are the platform owners and we are trying to provide a way consumers can consume the SSE APIs.

Ha, you might want to find a name other than "EventStream" / "event-stream" or things are going to get confusing quick once 8.3 is out.

1 Like

Yes I know :man_facepalming:

Case of bad timing, I released the first version like a month before y’all announced your Event Streams feature.

It’ll probably get named Embr-SSE. Also, I’m interested in trying to make an SSE source/connector for Event Streams, which I think is exactly the feature this OP is looking for.

1 Like

Yeah, that sounds like a good fit.

2 Likes

Totally unfit for APIs. Just use jython endpoints and script your API behavior.

2 Likes

@pturmel Can you please share some reference/resource/example for the same?

I’d recommend some sort of thread for your SSE client to live on, a queue for it to put received messages into, and then a way to process the messages in the queue.

Not a trivial task, but there are resources around the forum (mostly from Phil) on how to do those pieces individually.

I know nothing about SSE itself, but the basic flow of a jython API endpoint is:

  • Decode the request payload (WebDev will make jython dictionaries for you if a proper JSON payload),
  • Identify the operation requested and dispatch to the appropriate library function,
  • ... which returns a response payload,
  • Encode and send the response payload. If a jython dictionary compatible with JSON, WebDev can encode that for you.

WebDev is naturally multithreaded--no need for a queue if just doing the above.

They want the opposite of what you're trying to help them with. They just need to consume SSE from another server. This won't involve WebDev.

3 Likes

There's a very verbose, of dubious quality example of a Java SSE client based on Java's built in HTTP client here. That same HTTP client is what's backing the system.net.httpClient function, and is directly accessible via system.net.httpClient().javaClient.

In theory, you could translate the code in the link above to Jython, provide system.net.httpClient().javaClient as the input HTTP client, and use that as your baseline.

More fruitful might be reading up on the SSE spec, Java's built in HTTP client and how it works, and writing the code from scratch. :man_shrugging:

Whichever way you go is going to take some engineering time, there's no turn-key solution.

EDIT: Actually, I like some of the other links on this SO thread better:

2 Likes

The following has only been tested against a really simple python flask app to generate sse on the same machine but not internal to ignition, I have not tested this against a production endpoint yet. this will probably need to be tweaked but should serve as a decent starting point for someone looking to consume external server sent events. posting this here because this post is the top hit when searching the forum for implementing a consumer for server sent events and I needed it for a project of my own. I would appreciate any suggestions for improvement as I plan to incorporate into my own project soon. hopefully this helps out.

Basic Flask App Run Outside of Ignition to Generate Server Sent Events

from flask import Flask, Response, stream_with_context
import time

app = Flask(__name__)

@app.route('/stream')
def stream():
    def event_stream():
        while True:
            time.sleep(5)
            yield f"data: The current server time is {time.ctime()}\n\n"
    return Response(stream_with_context(event_stream()), mimetype="text/event-stream")

@app.route('/')
def index():
    return '''
        <html>
            <body>
                <h1>SSE Test</h1>
                <div id="sse-data"></div>
                <script type="text/javascript">
                    var source = new EventSource("/stream");
                    source.onmessage = function(event) {
                        document.getElementById("sse-data").innerHTML += event.data + "<br>";
                    };
                </script>
            </body>
        </html>
    '''

if __name__ == "__main__":
    app.run(threaded=True, port=5000)

Server Sent Event Listener Within Ignition Project Scripting Library

from java.net import URI
from java.net.http import HttpRequest, HttpResponse
from java.io import BufferedReader, InputStreamReader

logger = system.util.getLogger("SSE Client")
		
class SSEClient(object):
	
	# Given A Streaming Url, This Class With Listen for Server Sent Events and Write Data Recieved to Tags
	
	def __init__(self, url="http://localhost:5000/stream", headers={}):
		self._url = url
		self._headers = headers
		self._running = False
		self._logger = logger
		self._threadDescription = "SSE-Client-" + str(hash(self))
		self._thread = None
		# Spawn its own http Client because it Leverages underlying java client
		# See https://forum.inductiveautomation.com/t/integration-sse-events-server-sent-event-with-ignition-perspective-applications/102528/13
		self._http_client = system.net.httpClient().javaClient
		#
	def start(self):
        # Start listener client in a background thread.
		if not self._running:
			self._running = True
			self._logger.info("Starting " + self._threadDescription)
			self._thread = system.util.invokeAsynchronous(function=self._run, description=self._threadDescription)

	def stop(self):
		#Stop listening.
		self._logger.info("Stopping " + self._threadDescription)
		try:
			self._running = False
			self._thread.interrupt()
		except Exception as e:
			self._logger.error("Failed to Stop and Interrupt Thread: " +str(e))

	def _run(self):
        #Run the Listener and Perform Reconnect on Failure, Also Interrupt While Loop If Thread Interrupt Detected 
        #https://docs.oracle.com/en/java/javase/11/docs/api/java.base/java/lang/Thread.html
		while self._running == True and self._thread.interrupted() == False:
			try:
				self._logger.info("Connecting to SSE server at: %s" % self._url)
				#Build the Header per https://docs.oracle.com/en/java/javase/11/docs/api/java.net.http/java/net/http/HttpRequest.html
				request_builder = HttpRequest.newBuilder().uri(URI.create(self._url))
				#Add Additional Header Arguments Passed into Class
				#https://docs.oracle.com/en/java/javase/11/docs/api/java.net.http/java/net/http/HttpRequest.Builder.html#header(java.lang.String,java.lang.String)
				for key, value in self._headers.items():
				    request_builder.header(str(key), str(value))
				request_builder.header("Accept", "text/event-stream")
				#Set Request to Get and Return an HttpRequest Object
				request = request_builder.GET().build()
				#Example Get Request Header We Are Trying to Build, No Body
				#GET /stream HTTP/1.1
				#User-Agent: ~
				#Host: http://localhost:5000
				#Accept: text/event-stream
				
				
				# Because we are on a Seperate Thread, HTTPClient.send blocks until request has been sent and response has been recieved.
				# Experiment with Async if this becomes an issue. https://docs.oracle.com/en/java/javase/17/docs/api/java.net.http/java/net/http/HttpClient.html
				# https://docs.oracle.com/en/java/javase/11/docs/api/java.net.http/java/net/http/HttpResponse.BodyHandlers.html#ofInputStream()
				response = self._http_client.send(request, HttpResponse.BodyHandlers.ofInputStream())
				
				# If Non 200 Code Recieved, May Need to Open Up If We Get False Positives on statusCode in the 200 Range
				if response.statusCode() >= 300:
				    self._logger.error("Failed to connect: HTTP %d" % response.statusCode())
				    continue
				
				#Read Response into Input Stream, Then Buffered Reader and Parse
				#https://docs.oracle.com/javase/8/docs/api/java/io/BufferedReader.html
				#https://www.geeksforgeeks.org/java-io-bufferedreader-class-java/
				#Use Default Buffer Size to Start
				reader = BufferedReader(InputStreamReader(response.body()))
				self._parse_stream(reader)
		
			except Exception as e:
				self._logger.error("Error in SSE client: %s" % str(e))
				try:
					self._thread.interrupt()
					reader.close()
				except:
					pass

	def _parse_stream(self, reader):
		#Parse incoming SSE events
		event_name = "message"
		data_lines = []

		while self._running:
        	# Get the Latest SSE Message, See Buffered Reader Class Docs
			line = reader.readLine()
			if line is None:
				self._logger.warn("SSE connection closed by server.")
				break

			#Remove Leading/Trailing White Space
			line = line.strip()
			# If Line is Blank, Serves as Delimiter for End of Event Data
			if not line:
                # Combine All Data Lines Up To Blank Line
				if data_lines:
					full_data = "\n".join(data_lines)
					#Perform Work on the Data Recieved
					self._dispatch_event(event_name, full_data)
					event_name = "message"
					data_lines = []
				continue

			#Retrieve Info From SSE Message, Default to Message if No Event String Recieved, strip leading line identifiers
			if line.startswith("event:"):
				event_name = line[len("event:"):].strip()
			elif line.startswith("data:"):
				data_lines.append(line[len("data:"):].strip())

	def _dispatch_event(self, event_name, data):
		#Handle incoming events, write datastring to tag
		self._logger.info("Received Event: [%s] Data: [%s]" % (event_name, data))
		system.tag.writeAsync(["[default]SSE_Client/EventData"], [data])
        
sse_client_lite = SSEClient()

Within Gateway Event Scripts add the following startup and update script

{root_script_location}.sse_client_lite.start()

Within Gateway Event Scripts add the following shutdown script

{root_script_location}.sse_client_lite.stop()

Create Tag to Write Responses To, Update dispatch_event