Developing custom module publish tags to Confluent Kafka

Hi Community.

I am currently developing a custom module using the Ignition SDK Programmer's Guide(Gradle, Groovy, Java). My goal for this module is to subscribe to tags/tag paths and publish data on Confluent Kafka topics.

this is the code for gateway's module build.gradle file:

modlImplementation(group: 'io.confluent', name: 'kafka-avro-serializer', version: '7.7.0')
    modlImplementation(group: 'org.apache.kafka', name: 'kafka-clients', version: '7.7.0-ccs')

and I'm following this post to write the client: Kafka Client Quick Start for Confluent Cloud | Confluent Documentation

at the moment, I have an interesting obstacle. when I build and add the module to the Ignition, getting this line of error:

Caused by: org.apache.kafka.common.KafkaException: javax.security.auth.login.LoginException: No LoginModule found for org.apache.kafka.common.security.plain.PlainLoginModule

Does anybody know how to solve this? should I configure something in Ignition?

Environment:
Ignition: Version: 8.1.42 Trial license
Mac OS, M1
Java version 17.0.11+9-LTS

It seems like you're just missing some transitive/runtime dependency of the Kafka client lib you're attempting to use. So, a module build problem, not an Ignition problem.

Note also that we're going to have first party Kafka support in 8.3.0, the next major version (releasing early next year).

1 Like

Thanks, Paul for the quick answer.

It's great to hear that the Kafka module is being developed and released next year. is there a way to see its progress?

regarding my question, I don't understand what's missing. what could be a potential missing part?

Inspect the module file that gets built (it's just a zip file) and see if all of the dependencies you expect to be there are present.

Hi @Kevin.Herron , Yes, I can inspect and all the dependencies I need are present there.
I also created a jaas.conf file contains:

KafkaClient {
    org.apache.kafka.common.security.plain.PlainLoginModule required
    username="USERNAME"
    password="PASSWORD";
  };

and add this line to ignition.conf:

wrapper.java.additional.7=-Djava.security.auth.login.config=config/jaas.conf

also using

put("sasl.jaas.config", "org.apache.kafka.common.security.plain.PlainLoginModule required username='USERNAME' password='PASSWORD';");

in the code.
have no clue why it's behaving like this. The same code in a simple java project works fine. but when adding the module to Ignition modules, it start throwing errors.

PlainLoginModule is an SPI implementation of javax.security.auth.spi.LoginModule - I have no idea whether our module classloading situation allows/supports SPI, and I don't know if anyone's ever tried.

What's the full stacktrace of the error message you're getting?

Sure @PGriffith
here is my code:

 @Override
    public void startup(LicenseState activationState) {

        final Properties props = new Properties() {{
            put("bootstrap.servers", "pkc-xxxxx.eastus.azure.confluent.cloud:9092");
            put("sasl.jaas.config", "org.apache.kafka.common.security.plain.PlainLoginModule required username='USERNAME' password='PASSWORD';");
            put("client.dns.lookup", "use_all_dns_ips"); // Required for correctness in Apache Kafka clients prior to 2.6
            put("session.timeout.ms", 45000);
            // Fixed properties
            put("key.serializer", StringSerializer.class);
            put("value.serializer", StringSerializer.class);
            put("acks", "all");
            put("security.protocol", "SASL_SSL"); // SASL_PLAINTEXT
            put("sasl.mechanism", "PLAIN");
   
        }};

        final String topic = "test-topic";
        String[] users = {"eabara", "jsmith", "sgarcia", "jbernard", "htanaka", "awalther"};
        String[] items = {"book", "alarm clock", "t-shirts", "gift card", "batteries"};
        
        try (final Producer<String, String> producer = new KafkaProducer<>(props)) {
            final Random rnd = new Random();
            final int numMessages = 10;
            for (int i = 0; i < numMessages; i++) {
                String user = users[rnd.nextInt(users.length)];
                String item = items[rnd.nextInt(items.length)];

                producer.send(
                        new ProducerRecord<>(topic, user, item),
                        (event, ex) -> {
                            if (ex != null)
                                ex.printStackTrace();
                            else
                                logger.info("Produced event to topic %s: key = %-10s value = %s%n", topic, user, item);
                        });
            }
            logger.info("%s events were produced to topic %s%n", numMessages, topic);
        } catch (Exception e) {
            logger.error(e.getMessage(), e);
        }
    }

here is the error log:

org.apache.kafka.common.KafkaException: Failed to construct kafka producer

at org.apache.kafka.clients.producer.KafkaProducer.(KafkaProducer.java:439)

at org.apache.kafka.clients.producer.KafkaProducer.(KafkaProducer.java:290)

at org.apache.kafka.clients.producer.KafkaProducer.(KafkaProducer.java:317)

at org.apache.kafka.clients.producer.KafkaProducer.(KafkaProducer.java:302)

at com.mrzslr.gateway.ConfkaGatewayHook.startup(ConfkaGatewayHook.java:220)

at com.inductiveautomation.ignition.gateway.modules.ModuleManagerImpl$LoadedModule.startup(ModuleManagerImpl.java:2446)

at com.inductiveautomation.ignition.gateway.modules.ModuleManagerImpl.startupModule(ModuleManagerImpl.java:1188)

at com.inductiveautomation.ignition.gateway.modules.ModuleManagerImpl$2.call(ModuleManagerImpl.java:734)

at com.inductiveautomation.ignition.gateway.modules.ModuleManagerImpl.executeModuleOperation(ModuleManagerImpl.java:913)

at com.inductiveautomation.ignition.gateway.modules.ModuleManagerImpl.installModuleInternal(ModuleManagerImpl.java:700)

at com.inductiveautomation.ignition.gateway.modules.ModuleManagerImpl$InstallCommand.doExecute(ModuleManagerImpl.java:1915)

at com.inductiveautomation.ignition.gateway.modules.ModuleManagerImpl$AbstractModuleCommand.execute(ModuleManagerImpl.java:1864)

at com.inductiveautomation.ignition.gateway.modules.ModuleManagerImpl$Receiver.receiveCall(ModuleManagerImpl.java:1820)

at com.inductiveautomation.ignition.gateway.redundancy.QueueableMessageReceiver.receiveCall(QueueableMessageReceiver.java:47)

at com.inductiveautomation.ignition.gateway.redundancy.RedundancyManagerImpl.dispatchMessage(RedundancyManagerImpl.java:1044)

at com.inductiveautomation.ignition.gateway.redundancy.RedundancyManagerImpl$ExecuteTask.run(RedundancyManagerImpl.java:1112)

at com.inductiveautomation.ignition.common.execution.impl.BasicExecutionEngine$ThrowableCatchingRunnable.run(BasicExecutionEngine.java:550)

at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Unknown Source)

at java.base/java.util.concurrent.FutureTask.run(Unknown Source)

at java.base/java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(Unknown Source)

at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source)

at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source)

at java.base/java.lang.Thread.run(Unknown Source)

Caused by: org.apache.kafka.common.KafkaException: javax.security.auth.login.LoginException: No LoginModule found for org.apache.kafka.common.security.plain.PlainLoginModule

at org.apache.kafka.common.network.SaslChannelBuilder.configure(SaslChannelBuilder.java:184)

at org.apache.kafka.common.network.ChannelBuilders.create(ChannelBuilders.java:192)

at org.apache.kafka.common.network.ChannelBuilders.clientChannelBuilder(ChannelBuilders.java:81)

at org.apache.kafka.clients.ClientUtils.createChannelBuilder(ClientUtils.java:105)

at org.apache.kafka.clients.producer.KafkaProducer.newSender(KafkaProducer.java:447)

at org.apache.kafka.clients.producer.KafkaProducer.(KafkaProducer.java:428)

... 22 common frames omitted

Caused by: javax.security.auth.login.LoginException: No LoginModule found for org.apache.kafka.common.security.plain.PlainLoginModule

at java.base/javax.security.auth.login.LoginContext.invoke(Unknown Source)

at java.base/javax.security.auth.login.LoginContext$4.run(Unknown Source)

at java.base/javax.security.auth.login.LoginContext$4.run(Unknown Source)

at java.base/java.security.AccessController.doPrivileged(Unknown Source)

at java.base/javax.security.auth.login.LoginContext.invokePriv(Unknown Source)

at java.base/javax.security.auth.login.LoginContext.login(Unknown Source)

at org.apache.kafka.common.security.authenticator.AbstractLogin.login(AbstractLogin.java:60)

at org.apache.kafka.common.security.authenticator.LoginManager.(LoginManager.java:62)

at org.apache.kafka.common.security.authenticator.LoginManager.acquireLoginManager(LoginManager.java:105)

at org.apache.kafka.common.network.SaslChannelBuilder.configure(SaslChannelBuilder.java:170)