Event Streams API Preview

Could somebody point me towards the classes we'd be using in 8.3 to publish events for the upcoming event streams feature? I'm working on a feature right now that would probably use this in the future, so I'd like to plan ahead if possible :slight_smile:

You can look around in here, though I don't see anything about it yet.

Yeah, I guess since there isn't anything here yet: is there any way of knowing what Inductive's API for publishing a new event will look like? I'm assuming we'll just need a namespace/topic and a payload?

It's not showing up because the event streams feature hasn't been merged in yet. It's going to be a new module, so once you've marked it as a dependency and pulled it in you'll be able to compile against it.

On the gateway, you'll have to register your source and/or handler with a central registry, which will call a startup method on your hook that delivers you an EventStreamSource.Subscriber that you submit events to whenever your code deems it appropriate:

/**
 * Provides {@link EventPayload} to the Event Stream through the {@link Subscriber} interface.
 */
public interface EventStreamSource {

    /**
     * Registered with {@link EventStreamRegistry} to create concrete implementation of
     * {@link EventStreamSource} from the provided configuration.
     */
    @FunctionalInterface
    interface Factory {

        /**
         * Called once when Event Stream is first created.
         * @param context {@link EventStreamContext}
         * @param config {@link EventStreamSource} configuration that defines its behavior
         * @return {@link EventStreamSource}
         */
        EventStreamSource create(EventStreamContext context, JsonObject config);

    }

    /**
     * Interface for submitting events to the Event Stream.
     */
    interface Subscriber {

        /**
         * Notifies Event Stream that events have arrived and are ready to process.
         * <br>
         * This call will never block. If blocking is desired, use the returned {@link CompletionStage} to wait for
         * the event to be processed.
         *
         * @param events List of events to pass Event Stream to start processing.
         *
         * @return Future of results that completes once all {@code events} have been processed.
         */
        CompletionStage<EventResultList> submitEvents(List<EventPayload> events);

        /**
         * Notifies Event Stream that an event has arrived and is ready to process. If multiple events are available,
         * {@link Subscriber#submitEvents} should be used.
         * <br>
         * This call will never block. If blocking is desired, use the returned {@link CompletionStage} to wait for
         * the event to be processed.
         *
         * @param event Event to pass Event Stream to start processing.
         *
         * @return Future of the result that completes once {@code event} has been processed.
         */
        default CompletionStage<EventResult> submitEvent(EventPayload event) {
            return submitEvents(List.of(event)).thenApply(results -> results.results().get(0));
        }

        /**
         * Notifies Event Stream an error occurred. This is primarily used for diagnostics; the Event Stream will still
         * be considered running until Shutdown. If the error is resolved, events can continue to be submitted.
         *
         * @param throwable Throwable that caused the error.
         */
        void onError(Throwable throwable);
    }

    /**
     * Called on Event Stream startup. Events can safely be sent to {@link Subscriber#submitEvents} or
     * {@link Subscriber#submitEvent}.
     */
    void onStartup(Subscriber subscriber);

    /**
     * Called on {@link EventStreamSource} shutdown. No more events should be sent through {@link Subscriber}. If
     * events are submitted after, they'll immediately complete with Error result.
     */
    void onShutdown();
}
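To make the lifecycle above a bit more concrete, here is a rough sketch of what a source implementation might look like. Everything except the method names taken from the interface above is an assumption: the `EventPayload`, `EventResult`, and `EventResultList` records are minimal stand-ins so the example compiles on its own (their real shapes aren't shown here), and `MessageSource` and `publish` are hypothetical names. The `Factory` and registry registration are left out because their surrounding types aren't available yet.

```java
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletionStage;

// Stand-ins so this sketch compiles by itself. The real types would come from
// the event streams module; these shapes are assumptions, not the actual API.
record EventPayload(Object data, Map<String, Object> metadata) {}
record EventResult(boolean ok) {}
record EventResultList(List<EventResult> results) {}

// Trimmed copy of the interface posted above (Factory omitted).
interface EventStreamSource {
    interface Subscriber {
        CompletionStage<EventResultList> submitEvents(List<EventPayload> events);

        default CompletionStage<EventResult> submitEvent(EventPayload event) {
            return submitEvents(List.of(event)).thenApply(r -> r.results().get(0));
        }

        void onError(Throwable throwable);
    }

    void onStartup(Subscriber subscriber);

    void onShutdown();
}

/** Hypothetical source that forwards messages handed to it by other module code. */
class MessageSource implements EventStreamSource {
    // volatile because startup/shutdown and publish may run on different threads
    private volatile Subscriber subscriber;

    @Override
    public void onStartup(Subscriber subscriber) {
        // Hold on to the subscriber; events may be submitted from now on.
        this.subscriber = subscriber;
    }

    @Override
    public void onShutdown() {
        // Drop the reference so nothing is submitted after shutdown.
        this.subscriber = null;
    }

    /** Submits one event; returns false if the stream is not currently running. */
    public boolean publish(String message) {
        Subscriber current = subscriber;
        if (current == null) {
            return false;
        }
        // The returned CompletionStage is ignored here; a real source might
        // inspect it to react to failed events.
        current.submitEvent(new EventPayload(message, Map.of("origin", "MessageSource")));
        return true;
    }
}
```

The main design point is that the source only holds the `Subscriber` between `onStartup` and `onShutdown`, so the rest of your module can call `publish` at any time without caring whether the stream is running.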

You'll separately have to register different configuration (mostly for the sake of UI) in the designer to have a way to actually configure whatever settings make sense for your source, much like the way Perspective handles binding configuration.

Disclaimer: This code isn't merged yet and obviously we're pre-public-beta here, so this API is potentially subject to change still.

Thank you Paul! What's in an EventPayload if you don't mind me asking?

EventPayload is just a wrapper class that contains an arbitrary Object and optional 'metadata' in the form of a Map<String, Object>. Pretty maximally flexible.
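For planning purposes, a wrapper like that could be modeled roughly as below. This is only a guess at the shape based on the description: `PayloadSketch`, its accessor names, and the `of` helper are all hypothetical, and the real `EventPayload` class may look quite different.

```java
import java.util.Map;

/** Hypothetical stand-in for the described wrapper; not the real EventPayload. */
record PayloadSketch(Object data, Map<String, Object> metadata) {

    PayloadSketch {
        // Metadata is optional: treat null as empty, and copy the map so later
        // changes made by the caller do not leak into the payload.
        metadata = metadata == null ? Map.of() : Map.copyOf(metadata);
    }

    /** Convenience factory for a payload without metadata. */
    static PayloadSketch of(Object data) {
        return new PayloadSketch(data, Map.of());
    }
}
```

Note that `Map.copyOf` rejects null keys and values, so this sketch assumes metadata entries are always non-null.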