Transmitting Ignition Device Data To Kafka Through Community Module

Hello,

I am working on a simple project to transmit simulated device data generated by Ignition (on Docker) to a Kafka broker that I have running locally. I am trying to leverage the Ignition Kafka Community Module repo to do this.

I have two questions about getting this module set up so far:

1. Has anyone been able to use this module successfully in this manner, and if so, could you share some thoughts on how to troubleshoot the 404 error below?

So far, these are the steps I have taken:

  • Built the module using mvn package and signed it to generate a signed .modl file
  • Added the .modl file to a running instance of the Ignition Gateway (running on Docker)
  • I now see the module added to the gateway and running successfully
  • I set up a Kafka producer and consumer using Confluent Platform commands like so:
➜  bin ./kafka-console-producer --topic ignition-consumer-test --bootstrap-server localhost:9092
➜  bin ./kafka-console-consumer --topic ignition-consumer-test --bootstrap-server localhost:9092
  • I am able to see messages flow from the producer to the consumer after running these commands

  • I then opened a Script Console in the Designer to run the consumer snippet found in the repo README. To keep things simple, I started with the non-SSL getConsumer method, as seen below:

serverPath = 'server:9093'
# consumer = system.kafka.getSSLConsumer(serverPath,'topicname','groupname') # if SSL is desired
consumer = system.kafka.getConsumer(serverPath,'topicname','groupname') # If ssl is not required
for record in consumer:
	print record["value"], record["timestamp"],record["offset"],record["key"],record["partition"]
  • Some other notes: I have also tried a few different serverPath values: host.docker.internal:9092 and server:9092. I also tried port 9093 for each of these, as specified in the snippet above.

The resulting stack trace shows a 404 error:

Java Traceback:
Traceback (most recent call last):
  File "<input>", line 2, in <module>
	at com.sun.proxy.$Proxy61.RPCGetConsumer(Unknown Source)
	at org.ignitionmdc.apache.kafka.Kafka_Com.getConsumer(Kafka_Com.java:13)
	at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(Unknown Source)
	at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(Unknown Source)
	at java.base/java.lang.reflect.Method.invoke(Unknown Source)
java.lang.reflect.UndeclaredThrowableException: java.lang.reflect.UndeclaredThrowableException

	at org.python.core.Py.JavaError(Py.java:552)
	at org.python.core.Py.JavaError(Py.java:543)
	at org.python.core.PyReflectedFunction.__call__(PyReflectedFunction.java:190)
	at org.python.core.PyReflectedFunction.__call__(PyReflectedFunction.java:206)
	at org.python.core.PyObject.__call__(PyObject.java:515)
	at org.python.core.PyObject.__call__(PyObject.java:519)
	at org.python.pycode._pyx122.f$0(<input>:3)
	at org.python.pycode._pyx122.call_function(<input>)
	at org.python.core.PyTableCode.call(PyTableCode.java:171)
	at org.python.core.PyCode.call(PyCode.java:18)
	at org.python.core.Py.runCode(Py.java:1614)
	at org.python.core.Py.exec(Py.java:1658)
	at org.python.util.PythonInterpreter.exec(PythonInterpreter.java:276)
	at org.python.util.InteractiveInterpreter.runcode(InteractiveInterpreter.java:131)
	at com.inductiveautomation.ignition.designer.gui.tools.jythonconsole.JythonConsole$ConsoleWorker.doInBackground(JythonConsole.java:605)
	at com.inductiveautomation.ignition.designer.gui.tools.jythonconsole.JythonConsole$ConsoleWorker.doInBackground(JythonConsole.java:593)
	at java.desktop/javax.swing.SwingWorker$1.call(Unknown Source)
	at java.base/java.util.concurrent.FutureTask.run(Unknown Source)
	at java.desktop/javax.swing.SwingWorker.run(Unknown Source)
	at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source)
	at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source)
	at java.base/java.lang.Thread.run(Unknown Source)
Caused by: java.lang.reflect.UndeclaredThrowableException
	at com.sun.proxy.$Proxy61.RPCGetConsumer(Unknown Source)
	at org.ignitionmdc.apache.kafka.Kafka_Com.getConsumer(Kafka_Com.java:13)
	at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(Unknown Source)
	at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(Unknown Source)
	at java.base/java.lang.reflect.Method.invoke(Unknown Source)
	at org.python.core.PyReflectedFunction.__call__(PyReflectedFunction.java:188)
	... 19 more
Caused by: com.inductiveautomation.ignition.client.gateway_interface.GatewayException: Post Error, error code = 404
	at com.inductiveautomation.ignition.client.gateway_interface.GatewayInterface.newGatewayException(GatewayInterface.java:351)
	at com.inductiveautomation.ignition.client.gateway_interface.GatewayInterface.newGatewayException(GatewayInterface.java:347)
	at com.inductiveautomation.ignition.client.gateway_interface.GatewayInterface.getResponse(GatewayInterface.java:468)
	at com.inductiveautomation.ignition.client.gateway_interface.GatewayInterface.sendMessage(GatewayInterface.java:283)
	at com.inductiveautomation.ignition.client.gateway_interface.GatewayInterface.sendMessage(GatewayInterface.java:278)
	at com.inductiveautomation.ignition.client.gateway_interface.GatewayInterface.moduleInvokeSafe(GatewayInterface.java:908)
	at com.inductiveautomation.ignition.client.gateway_interface.ModuleRPCFactory$DynamicRPCHandler.invoke(ModuleRPCFactory.java:53)
	... 26 more

2. Has anyone tried to extend this module to produce messages and transmit them through the Kafka broker for downstream consumption? The repo README claims this is relatively simple, but I’m not sure how to get started. If so, could you share thoughts on this as well?

Thank you for your time!

Have you tried 127.0.0.1:9092 or server_ip:9092 as the server address? I'm sure this module works well. If Ignition is not installed on the same server as Kafka, you may also need to add a firewall rule on the Kafka server to allow TCP port 9092 in. Please also check Kafka's configuration in server.properties:

listeners=PLAINTEXT://0.0.0.0:9092
advertised.listeners=PLAINTEXT://192.168.xxx.xxxx:9092

res = system.kafka.getConsumer("192.168.xxx.xxx:9092","firstEvent","1")
for item in res:
	print item

It’s been a while since I originally published it, but my test scenario used a separate VM for the producer. The serverPath should be the DNS name/IP of the producer; it should be pingable, and manually creating a consumer from the same relative location should be able to communicate with the producer.
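
For example, as a rough sketch (not part of the module; the host and port below are placeholders), a basic TCP reachability check run from wherever the Gateway itself runs:

# Rough sketch: basic TCP check of the broker address from the Gateway's point of view.
# The module opens the Kafka connection on the Gateway, not in the Designer, so run this
# in a Gateway-scoped script. Replace the host/port with your broker address.
import socket
sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
sock.settimeout(5)
try:
	sock.connect(('host.docker.internal', 9092))
	print 'broker port is reachable'
except Exception, e:
	print 'cannot reach broker:', e
finally:
	sock.close()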

@akhil I’m working on the same thing right now, i.e. adding producer functions to the community library.

I’m essentially starting from zero with regard to Java, so it’s going to be a learning journey.

I was also wondering whether anyone out there had already added the producer functions, so if you come across that, please let us know. Likewise, if I get something working I will share it.

Nick

Regarding this comment in the community GitHub repo:

The producer could easily be extended following the existing implementation.

My experience with Java programming is short enough to count the days on one hand. Following publicly available training and documentation, here is how I added producer functions as a proof of concept. To test, I installed Kafka on my local machine and worked from the IntelliJ IDE.

Kafka_Com

I added three functions. The create-producer functions return a string, which is the key used to access the producer object stored in memory. "transmitData" retrieves a producer object from memory, builds a producer record, and then calls the producer's send method.

    public static String createProducer(String address){
        KafkaRPC rpc = ModuleRPCFactory.create("org.ignitionmdc.apache.kafka.kafka", KafkaRPC.class);
        return rpc.RPCCreateProducer(address);
    }

    public static String createSSLProducer(String address){
        KafkaRPC rpc = ModuleRPCFactory.create("org.ignitionmdc.apache.kafka.kafka", KafkaRPC.class);
        return rpc.RPCCreateSSLProducer(address);
    }

    public static boolean transmitData(String producerKey, String topic, String data){
        KafkaRPC rpc = ModuleRPCFactory.create("org.ignitionmdc.apache.kafka.kafka", KafkaRPC.class);
        return rpc.RPCTransmitData(producerKey, topic, data);
    }

KafkaRPC

public interface KafkaRPC {
    List<Map> RPCGetSSLConsumer(String address, String topic, String groupname);
    List<Map> RPCGetConsumer(String address, String topic, String groupname);
    String RPCCreateProducer(String address);
    String RPCCreateSSLProducer(String address);
    boolean RPCTransmitData(String producerKey, String topic, String data);
}

GatewayHook

Add a HashMap variable similar to the one used for the consumer; this is where the producer object(s) will be stored.

public static Map<String, Producer> ProducerHashMap = new HashMap<>();

Kafka

In addition to adding the functions, the code that adds SSL settings to the properties is broken out into a separate function so it can be used by both the consumer and producer creation functions.

The two get-producer-object functions differ only in the useSSL value they pass to "buildProducerProps".

    public static Producer<String, String> getProducerObj(String address){
        logger.debug("Creating a new non-ssl producer object");

        Thread.currentThread().setContextClassLoader(null);
        Properties props = buildProducerProps(address, false);
        final Producer<String, String> producer = new KafkaProducer<String, String>(props);

        return producer;
    }

    public static Producer<String, String> getSSLProducerObj(String address){
        logger.debug("Creating a new ssl encrypted producer object");

        Thread.currentThread().setContextClassLoader(null);
        Properties props = buildProducerProps(address, true);
        final Producer<String, String> producer = new KafkaProducer<String, String>(props);

        return producer;
    }

The function that builds the properties is very minimal for now and would need more advanced options added. You can see how it selects whether or not to add the SSL parameters.

    private static Properties buildProducerProps(String address, boolean useSSL){

        Properties props = new Properties();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, address);
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());

        if (useSSL){
            return getSSLProps(props);
        }
        else{
            return props;
        }
    }

Here is the function that adds the SSL properties. It takes the Properties object, adds to it, and returns it. This is the exact code from the original library, just broken out into a separate function.

    private static Properties getSSLProps(Properties props){
        String homePath = getGatewayHome();
        String sep = File.separator;

        logger.debug("homepath = " + homePath);
        props.put(SslConfigs.SSL_PROTOCOL_CONFIG,"SSL");
        props.put("security.protocol","SSL");
        props.put(SslConfigs.SSL_ENDPOINT_IDENTIFICATION_ALGORITHM_CONFIG,"");

        // Keystore settings
        logger.debug("SSL Keystore Path: "+ homePath+String.format("%swebserver%sssl.key",sep,sep,sep));

        props.put(SslConfigs.SSL_KEYSTORE_LOCATION_CONFIG,homePath+String.format("%swebserver%sssl.key",sep,sep,sep));
        props.put(SslConfigs.SSL_KEYSTORE_PASSWORD_CONFIG,"ignition");
        props.put(SslConfigs.SSL_KEY_PASSWORD_CONFIG,"ignition");

        // Truststore settings
        String truststorePath = homePath+String.format("%sdata%scertificates%struststore.jks",sep,sep,sep);
        if (fileExists(truststorePath)) {
            props.put(SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG, truststorePath);
            props.put(SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG, "ignition");
        }
        return props;
    }

Here the user-facing function names are linked to the RPC functions:

    public String createProducer(String address){
        return RPCCreateProducer(address);
    }
    public String createSSLProducer(String address){
        return RPCCreateSSLProducer(address);
    }
    public boolean transmitData(String producerKey, String topic, String data){
        return RPCTransmitData(producerKey, topic, data);
    }

Here are the actual definitions of the RPC functions. One key part to note is that the producer object is created and then stored in memory, not returned to the Jython script that will use these functions. Likewise, when "transmitData" is called, it retrieves the producer object from memory by its key.

    public String RPCCreateProducer(String address){
        String keyName = String.format("%s",address);

        if (!GatewayHook.ProducerHashMap.containsKey(keyName)) {
            Producer pObj = getProducerObj(address);
            GatewayHook.ProducerHashMap.put(keyName, pObj);
        }
        return(keyName);
    }

    public String RPCCreateSSLProducer(String address){
        String keyName = String.format("%s",address);

        if (!GatewayHook.ProducerHashMap.containsKey(keyName)) {
            Producer pObj = getSSLProducerObj(address);
            GatewayHook.ProducerHashMap.put(keyName, pObj);
        }
        return(keyName);
    }

    public boolean RPCTransmitData(String producerKey, String topic, String data){
        // Get the producer object from memory
        try{
            Producer pObj = GatewayHook.ProducerHashMap.get(producerKey);

            ProducerRecord<String, String> record =
                    new ProducerRecord<String, String>(topic, data);

            pObj.send(record);
            pObj.flush();
            return true;
        }
        catch(Exception e){
            return false;
        }
    }

Finally, here is how it is used from a Jython script:
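
A minimal sketch (assuming the new functions are exposed under the same system.kafka namespace as the existing consumer functions; the broker address and topic are placeholders):

# Assumes the producer functions are registered under system.kafka,
# like the existing getConsumer/getSSLConsumer functions.
brokerAddress = 'localhost:9092'  # placeholder: your broker address
producerKey = system.kafka.createProducer(brokerAddress)  # returns the key for the stored producer

# transmitData returns True if the send succeeded, False otherwise
ok = system.kafka.transmitData(producerKey, 'ignition-consumer-test', 'hello from Ignition')
print ok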

Cheers,

Nick

Consider making a pull request. GitHub with a permissive license is for collaborative development, after all.

I have done so:
