High Zulu CPU when using async futures in module

Hi,

I am writing a history provider. This history provider can have multiple instances, each with a Tag History Provider and a Data Sink. During the startup of my module, I am looping through each Tag Provider and caching all tag properties. The call to grab all tags using a history provider is slow, so I am doing all this work on a future. Though inefficient, I have to loop through all tags as seen in this example. Everything in my browse function is async, and I will eventually have to send all the tags for each instance to my historian through its web API.

The problem I am having is that whenever I restart my module in the gateway, startup takes a long time because Ignition uses almost all of my CPU. I did some research and saw that Zulu does not handle futures well, so I tried updating my code to use thread pools, but this still doesn’t help.

During startup I essentially need to do the following:

  1. Find all tags
  2. Pull each tag’s configuration (since browseNodeAsync does not grab this)
  3. Cache the tag if the tag’s history provider matches one of my instances
  4. Send all the cached tags to my API, per instance

Any help would be appreciated!

    private CompletableFuture<List<TagPath>> browseNodeAsync(TagProvider provider, TagPath parentPath) {
        return provider.browseAsync(parentPath, BrowseFilter.NONE)
                .thenComposeAsync(results -> {
                    if (results.getResultQuality().isNotGood()) {
                        return CompletableFuture.failedFuture(new Exception("Bad quality results: " + results.getResultQuality()));
                    }

                    Collection<NodeDescription> nodes = results.getResults();
                    List<CompletableFuture<List<TagPath>>> childFutures = new ArrayList<>();
                    List<TagPath> paths = new ArrayList<>();
                    if (nodes != null){
                        for (NodeDescription node : nodes) {
                            TagPath fullPath = node.getFullPath();
                            paths.add(fullPath);

                            if (node.hasChildren() && node.getDataType() != DataType.Document) {
                                TagPath childPath = parentPath.getChildPath(node.getName());
                                childFutures.add(browseNodeAsync(provider, childPath));
                            }
                        }
                    }

                    return CompletableFuture.allOf(childFutures.toArray(new CompletableFuture[0]))
                            .thenApply(v -> childFutures.stream()
                                    .map(CompletableFuture::join)
                                    .flatMap(List::stream)
                                    .collect(Collectors.toCollection(() -> paths)));
                });
    }

    private CompletableFuture<List<QualityCode>> initializeCacheForProviderAsync(TagProvider provider, Set<String> names) {
        TagPath root;
        try {
            root = TagPathParser.parse("");
        } catch (IOException e) {
            return CompletableFuture.failedFuture(e);
        }

        if (root != null){
            return browseNodeAsync(provider, root)
                    .thenCompose(paths -> provider.getTagConfigsAsync(paths, true, false))
                    .thenCompose(tagConfigs -> {
                        List<TagConfiguration> tagConfigsToUpdate = new ArrayList<>();

                        for (TagConfiguration tagConfig : tagConfigs) {
                            var historyProvider = tagConfig.getOrDefault(WellKnownTagProps.HistoryProvider);
                            if (tagConfig.getType() != TagObjectType.Folder && historyProvider != null && names.contains(historyProvider)) {
                                var tagId = tagConfig.get(MyModule.MyProperty);
                                if (tagId == null) {
                                    tagConfig.set(PropertyValue.of(MyModule.MyProperty, UUID.randomUUID().toString()));
                                    tagConfigsToUpdate.add(tagConfig);
                                }

                                var propertiesByTagPath = PropertiesByInstanceThenByTagPath.get(historyProvider);
                                propertiesByTagPath.put(tagConfig.getPath(), tagConfig);
                            }
                        }

                        if (!tagConfigsToUpdate.isEmpty()) {
                            return provider.saveTagConfigsAsync(tagConfigsToUpdate, CollisionPolicy.MergeOverwrite);
                        } else {
                            return CompletableFuture.completedFuture(null);
                        }
                    });
        }
        else{
            return CompletableFuture.completedFuture(null);
        }
    }

    public void startup(LicenseState activationState) {

        List<CompletableFuture<Void>> instanceInitFutures = new ArrayList<>();

        startedFuture = CompletableFuture
                .allOf(context.getTagManager().getTagProviders().stream()
                        .map(provider -> initializeCacheForProviderAsync(provider, names))
                        .toArray(CompletableFuture[]::new))
                .thenComposeAsync(v -> {
                    // Initialize instances in parallel
                    for (MyPersistentRecord record : records) {
                        CompletableFuture<Void> instanceFuture = instanceManager.addAsync(record);
                        instanceInitFutures.add(instanceFuture);
                    }
                    return CompletableFuture.allOf(instanceInitFutures.toArray(new CompletableFuture[0]));
                }, ASYNC_EXECUTOR)
                .whenComplete((result, ex) -> {
                    if (ex == null) {
                        log.info("My module started successfully.");
                    } else {
                        log.error("My module failed to start.", ex);
                    }
                });
    }

You should not be doing this tag browse at all.

When your historian is chosen for a given tag's history, your sink will begin receiving storage requests for that tag. Do not browse for tags using your provider. Just let the history sink be notified naturally.

If there is some aspect of the history API that doesn't fit your requirements, consider using the tag actor API instead. Again, registering a tag actor factory with suitable properties and conditions will cause actor instances to be created when tags are configured with the desired properties. Again, no browsing required.


Why wouldn't it?

You are browsing all providers, concurrently, recursively, and with unbounded parallelism at each recursive step (or at least, bounded only by whatever executor ends up being used).
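If a full walk really is unavoidable, one way to bound that fan-out is to drain a work queue in fixed-size batches instead of recursing into every child at once. The following is only a sketch in plain Java with no Ignition classes: `BoundedBrowse`, `walk`, and the `browse` function are hypothetical stand-ins (the real call would be something like the `browseAsync` shown in the question), and paths are plain strings.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.function.Function;

// Hypothetical helper: breadth-first walk with at most maxInFlight browse calls outstanding.
class BoundedBrowse {

    static CompletableFuture<List<String>> walk(
            String root,
            Function<String, CompletableFuture<List<String>>> browse,
            int maxInFlight) {
        Deque<String> pending = new ArrayDeque<>();
        pending.add(root);
        return drain(pending, new ArrayList<>(), browse, maxInFlight);
    }

    private static CompletableFuture<List<String>> drain(
            Deque<String> pending,
            List<String> found,
            Function<String, CompletableFuture<List<String>>> browse,
            int maxInFlight) {
        if (pending.isEmpty()) {
            return CompletableFuture.completedFuture(found);
        }
        // Start one batch; nothing else is issued until the whole batch has completed.
        List<CompletableFuture<List<String>>> batch = new ArrayList<>();
        while (!pending.isEmpty() && batch.size() < maxInFlight) {
            batch.add(browse.apply(pending.poll()));
        }
        return CompletableFuture.allOf(batch.toArray(new CompletableFuture[0]))
                .thenCompose(v -> {
                    // All futures in the batch are done here, so join() does not block.
                    for (CompletableFuture<List<String>> f : batch) {
                        for (String child : f.join()) {
                            found.add(child);
                            pending.add(child); // real code would queue only nodes that have children
                        }
                    }
                    // Note: if browse completes synchronously, this recursion nests on the stack.
                    return drain(pending, found, browse, maxInFlight);
                });
    }

    public static void main(String[] args) {
        // Toy tree standing in for a tag provider.
        Map<String, List<String>> tree = Map.of(
                "", List.of("a", "b"),
                "a", List.of("a/x", "a/y"));
        List<String> all = walk(
                "",
                path -> CompletableFuture.supplyAsync(() -> tree.getOrDefault(path, List.of())),
                2).join();
        System.out.println(all);
    }
}
```

Batching is the simplest form of back-pressure; a semaphore or a small dedicated executor would keep the pipeline fuller, at the cost of more bookkeeping. Either way the limit applies across the whole walk rather than per recursion level.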


I am using a Tag Actor for tag creation and updates (though this does not work when copying/pasting a tag). Regardless, my historian requires that a tag be added before any writes. If the data sink begins writing before the tag has been added, my historian rejects the write and Ignition will quarantine the data. I have adjusted my historian to store a local copy of the tag, so now I no longer need to browse during restarts. Still, I need to fill my module's caches on startup from my historian.

Excuse my ignorance with Zulu/Java; I am a C# developer trying to get this solution working.

I very carefully wrote:

"consider using the tag actor API instead." (Emphasis on "instead" added.)

Pick one or the other. Don't try to use both tag actors and data sinks.

This is an unexpected constraint on the historian API. Accept the write to new tags. Delay the write during startup until your metadata is filled from disk (or wherever), but don't reject new tags to write. Add them on the fly.
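A rough sketch of that shape, in plain Java and not tied to the actual Ignition sink interfaces: writes are chained behind a "metadata loaded" future, and a tag that is still unknown once loading finishes gets registered on the spot instead of being rejected. `LazyTagRegistry`, `finishLoading`, and `store` are hypothetical names, and the comment marks where the historian's web API call would go.

```java
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical registry: defers writes until metadata is loaded, then adds unknown tags on the fly.
class LazyTagRegistry {
    private final Map<String, String> idsByPath = new ConcurrentHashMap<>();
    private final CompletableFuture<Void> loaded = new CompletableFuture<>();

    /** Call once at startup, after known tags have been read from disk or the historian. */
    void finishLoading(Map<String, String> knownIdsByPath) {
        idsByPath.putAll(knownIdsByPath);
        loaded.complete(null); // releases any writes that arrived early
    }

    /** Accepts a write at any time; the returned future completes with the tag id used. */
    CompletableFuture<String> store(String tagPath, Object value) {
        return loaded.thenApply(v -> {
            // Unknown tag: create its id now rather than rejecting the write.
            String id = idsByPath.computeIfAbsent(tagPath, p -> UUID.randomUUID().toString());
            // Assumption: the (id, value) pair would be sent to the historian here.
            return id;
        });
    }

    public static void main(String[] args) {
        LazyTagRegistry registry = new LazyTagRegistry();
        CompletableFuture<String> early = registry.store("prov/known", 1.0); // arrives before load
        registry.finishLoading(Map.of("prov/known", "id-known"));
        System.out.println(early.join());
        System.out.println(registry.store("prov/brand-new", 2.0).join());
    }
}
```

With this arrangement nothing needs to be browsed up front: the cache fills from stored metadata, and everything else is discovered as writes arrive.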

You should strongly consider tossing out the mess you've made, instead of digging a deeper hole.

(Getting the first try right, while also learning Java, is probably too much to expect. FWIW, I have a historian concept module on the to-do list, and I hit these same hurdles. I started it over.)