Developing a custom module to publish tags to Confluent Kafka

Hi Community.

I am currently developing a custom module by following the Ignition SDK Programmer's Guide (Gradle, Groovy, Java). The goal of the module is to subscribe to tags/tag paths and publish their data to Confluent Kafka topics.

These are the Kafka dependencies in the gateway module's build.gradle file:

modlImplementation(group: 'io.confluent', name: 'kafka-avro-serializer', version: '7.7.0')
modlImplementation(group: 'org.apache.kafka', name: 'kafka-clients', version: '7.7.0-ccs')

I'm following this post to write the client: Kafka Client Quick Start for Confluent Cloud | Confluent Documentation

At the moment I have an interesting obstacle. When I build the module and install it in Ignition, I get this error:

Caused by: org.apache.kafka.common.KafkaException: javax.security.auth.login.LoginException: No LoginModule found for org.apache.kafka.common.security.plain.PlainLoginModule

Does anybody know how to solve this? Should I configure something in Ignition?

Environment:
Ignition version: 8.1.42 (trial license)
macOS, M1
Java version 17.0.11+9-LTS

It seems like you're just missing some transitive/runtime dependency of the Kafka client lib you're attempting to use. So, a module build problem, not an Ignition problem.

Note also that we're going to have first-party Kafka support in 8.3.0, the next major version (releasing early next year).

Thanks, Paul, for the quick answer.

It's great to hear that a Kafka module is being developed and will be released next year. Is there a way to follow its progress?

Regarding my question, I don't understand what's missing. Which dependency could it be?

Inspect the module file that gets built (it's just a zip file) and see if all of the dependencies you expect to be there are present.
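Since the module file is a plain zip archive, the check above can also be scripted. The following is only a minimal sketch using the JDK's java.util.zip API; the class name ModlInspector and its method are made up for illustration and are not part of the Ignition SDK.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.file.Path;
import java.util.List;
import java.util.stream.Collectors;
import java.util.zip.ZipEntry;
import java.util.zip.ZipFile;

// Hypothetical helper (not part of the Ignition SDK): lists the jars packaged in a .modl file.
final class ModlInspector {
    private ModlInspector() {}

    /** Returns the sorted names of all entries ending in ".jar" inside the given zip archive. */
    static List<String> listJars(Path modlFile) {
        // A .modl file is a zip archive, so the standard ZipFile API can read it.
        try (ZipFile zip = new ZipFile(modlFile.toFile())) {
            return zip.stream()
                    .map(ZipEntry::getName)
                    .filter(name -> name.endsWith(".jar"))
                    .sorted()
                    .collect(Collectors.toList());
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }
}
```

Calling ModlInspector.listJars(Path.of("path/to/your-module.modl")) (the path is a placeholder) should show kafka-clients among the results, since that is the jar that contains PlainLoginModule. If it is listed, the class is packaged and the problem is more likely in how it gets loaded at runtime.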

Hi @Kevin.Herron, yes, I inspected it and all the dependencies I need are present.
I also created a jaas.conf file that contains:

KafkaClient {
    org.apache.kafka.common.security.plain.PlainLoginModule required
    username="USERNAME"
    password="PASSWORD";
};

and added this line to ignition.conf:

wrapper.java.additional.7=-Djava.security.auth.login.config=config/jaas.conf

I am also setting

put("sasl.jaas.config", "org.apache.kafka.common.security.plain.PlainLoginModule required username='USERNAME' password='PASSWORD';");

in the code.
I have no clue why it behaves like this. The same code works fine in a simple Java project, but when I install the module in Ignition it starts throwing errors.

PlainLoginModule is an SPI implementation of javax.security.auth.spi.LoginModule - I have no idea whether our module classloading situation allows/supports SPI, and I don't know if anyone's ever tried.
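One thing that may be worth trying here: the JDK's LoginContext resolves LoginModule classes through the calling thread's context class loader. If the thread that runs startup() carries a context class loader that cannot see the module's jars (an assumption about Ignition's module classloading, not something confirmed in this thread), the lookup would fail even though kafka-clients is packaged in the module. A minimal sketch of the usual workaround is to swap the context class loader temporarily. The helper name ContextClassLoaders is made up for illustration.

```java
import java.util.function.Supplier;

// Hypothetical helper: runs an action with a specific thread context class loader.
final class ContextClassLoaders {
    private ContextClassLoaders() {}

    /**
     * Sets the given loader as the current thread's context class loader,
     * runs the action, and always restores the previous loader afterwards.
     */
    static <T> T callWith(ClassLoader loader, Supplier<T> action) {
        Thread thread = Thread.currentThread();
        ClassLoader previous = thread.getContextClassLoader();
        thread.setContextClassLoader(loader);
        try {
            return action.get();
        } finally {
            // Restore even if the action throws, so the calling thread is left unchanged.
            thread.setContextClassLoader(previous);
        }
    }
}
```

In a gateway hook this might be used as ContextClassLoaders.callWith(getClass().getClassLoader(), () -> new KafkaProducer<String, String>(props)), so that the SASL login performed while the producer is constructed can see the classes loaded for the module. Whether this resolves the error in Ignition is untested here.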

What's the full stacktrace of the error message you're getting?

Sure, @paul-griffith.
Here is my code:

    // Imports needed by this method (the rest of the gateway hook class is omitted):
    // import java.util.Properties;
    // import java.util.Random;
    // import org.apache.kafka.clients.producer.KafkaProducer;
    // import org.apache.kafka.clients.producer.Producer;
    // import org.apache.kafka.clients.producer.ProducerRecord;
    // import org.apache.kafka.common.serialization.StringSerializer;

    @Override
    public void startup(LicenseState activationState) {
        // Connection settings taken from the Confluent Cloud quick start
        final Properties props = new Properties();
        props.put("bootstrap.servers", "pkc-xxxxx.eastus.azure.confluent.cloud:9092");
        props.put("sasl.jaas.config", "org.apache.kafka.common.security.plain.PlainLoginModule required username='USERNAME' password='PASSWORD';");
        props.put("client.dns.lookup", "use_all_dns_ips"); // Required for correctness in Apache Kafka clients prior to 2.6
        props.put("session.timeout.ms", 45000);
        // Fixed properties
        props.put("key.serializer", StringSerializer.class);
        props.put("value.serializer", StringSerializer.class);
        props.put("acks", "all");
        props.put("security.protocol", "SASL_SSL"); // SASL_PLAINTEXT
        props.put("sasl.mechanism", "PLAIN");

        final String topic = "test-topic";
        String[] users = {"eabara", "jsmith", "sgarcia", "jbernard", "htanaka", "awalther"};
        String[] items = {"book", "alarm clock", "t-shirts", "gift card", "batteries"};

        // The producer is closed (and flushed) automatically at the end of the try block
        try (final Producer<String, String> producer = new KafkaProducer<>(props)) {
            final Random rnd = new Random();
            final int numMessages = 10;
            for (int i = 0; i < numMessages; i++) {
                String user = users[rnd.nextInt(users.length)];
                String item = items[rnd.nextInt(items.length)];

                // Send asynchronously; the callback reports success or failure per record
                producer.send(
                        new ProducerRecord<>(topic, user, item),
                        (event, ex) -> {
                            if (ex != null) {
                                logger.error("Failed to produce event", ex);
                            } else {
                                logger.info(String.format("Produced event to topic %s: key = %-10s value = %s", topic, user, item));
                            }
                        });
            }
            logger.info(String.format("%s events were produced to topic %s", numMessages, topic));
        } catch (Exception e) {
            logger.error(e.getMessage(), e);
        }
    }

Here is the error log:

org.apache.kafka.common.KafkaException: Failed to construct kafka producer
    at org.apache.kafka.clients.producer.KafkaProducer.<init>(KafkaProducer.java:439)
    at org.apache.kafka.clients.producer.KafkaProducer.<init>(KafkaProducer.java:290)
    at org.apache.kafka.clients.producer.KafkaProducer.<init>(KafkaProducer.java:317)
    at org.apache.kafka.clients.producer.KafkaProducer.<init>(KafkaProducer.java:302)
    at com.mrzslr.gateway.ConfkaGatewayHook.startup(ConfkaGatewayHook.java:220)
    at com.inductiveautomation.ignition.gateway.modules.ModuleManagerImpl$LoadedModule.startup(ModuleManagerImpl.java:2446)
    at com.inductiveautomation.ignition.gateway.modules.ModuleManagerImpl.startupModule(ModuleManagerImpl.java:1188)
    at com.inductiveautomation.ignition.gateway.modules.ModuleManagerImpl$2.call(ModuleManagerImpl.java:734)
    at com.inductiveautomation.ignition.gateway.modules.ModuleManagerImpl.executeModuleOperation(ModuleManagerImpl.java:913)
    at com.inductiveautomation.ignition.gateway.modules.ModuleManagerImpl.installModuleInternal(ModuleManagerImpl.java:700)
    at com.inductiveautomation.ignition.gateway.modules.ModuleManagerImpl$InstallCommand.doExecute(ModuleManagerImpl.java:1915)
    at com.inductiveautomation.ignition.gateway.modules.ModuleManagerImpl$AbstractModuleCommand.execute(ModuleManagerImpl.java:1864)
    at com.inductiveautomation.ignition.gateway.modules.ModuleManagerImpl$Receiver.receiveCall(ModuleManagerImpl.java:1820)
    at com.inductiveautomation.ignition.gateway.redundancy.QueueableMessageReceiver.receiveCall(QueueableMessageReceiver.java:47)
    at com.inductiveautomation.ignition.gateway.redundancy.RedundancyManagerImpl.dispatchMessage(RedundancyManagerImpl.java:1044)
    at com.inductiveautomation.ignition.gateway.redundancy.RedundancyManagerImpl$ExecuteTask.run(RedundancyManagerImpl.java:1112)
    at com.inductiveautomation.ignition.common.execution.impl.BasicExecutionEngine$ThrowableCatchingRunnable.run(BasicExecutionEngine.java:550)
    at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Unknown Source)
    at java.base/java.util.concurrent.FutureTask.run(Unknown Source)
    at java.base/java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(Unknown Source)
    at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source)
    at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source)
    at java.base/java.lang.Thread.run(Unknown Source)
Caused by: org.apache.kafka.common.KafkaException: javax.security.auth.login.LoginException: No LoginModule found for org.apache.kafka.common.security.plain.PlainLoginModule
    at org.apache.kafka.common.network.SaslChannelBuilder.configure(SaslChannelBuilder.java:184)
    at org.apache.kafka.common.network.ChannelBuilders.create(ChannelBuilders.java:192)
    at org.apache.kafka.common.network.ChannelBuilders.clientChannelBuilder(ChannelBuilders.java:81)
    at org.apache.kafka.clients.ClientUtils.createChannelBuilder(ClientUtils.java:105)
    at org.apache.kafka.clients.producer.KafkaProducer.newSender(KafkaProducer.java:447)
    at org.apache.kafka.clients.producer.KafkaProducer.<init>(KafkaProducer.java:428)
    ... 22 common frames omitted
Caused by: javax.security.auth.login.LoginException: No LoginModule found for org.apache.kafka.common.security.plain.PlainLoginModule
    at java.base/javax.security.auth.login.LoginContext.invoke(Unknown Source)
    at java.base/javax.security.auth.login.LoginContext$4.run(Unknown Source)
    at java.base/javax.security.auth.login.LoginContext$4.run(Unknown Source)
    at java.base/java.security.AccessController.doPrivileged(Unknown Source)
    at java.base/javax.security.auth.login.LoginContext.invokePriv(Unknown Source)
    at java.base/javax.security.auth.login.LoginContext.login(Unknown Source)
    at org.apache.kafka.common.security.authenticator.AbstractLogin.login(AbstractLogin.java:60)
    at org.apache.kafka.common.security.authenticator.LoginManager.<init>(LoginManager.java:62)
    at org.apache.kafka.common.security.authenticator.LoginManager.acquireLoginManager(LoginManager.java:105)
    at org.apache.kafka.common.network.SaslChannelBuilder.configure(SaslChannelBuilder.java:170)