Could somebody point me towards the classes we'd be using in 8.3 to publish events for the upcoming event streams feature? I'm working on a feature right now that would probably use this in the future, so I'd like to plan ahead if possible.
You can look around in here, though I don't see anything about it yet.
Yeah I guess since there isn't anything here, is there any way of knowing what the API will look like from Inductive to publish a new event? I'm assuming we'll just need a namespace/topic and a payload?
It's not showing up because the event streams feature hasn't been merged in yet. It's going to be a new module, so once you've marked it as a dependency and pulled it in you'll be able to compile against it.
On the gateway, you'll have to register your source and/or handler with a central registry, which will call a startup method on your hook that delivers you an EventStreamSource.Subscriber that you submit events to whenever your code deems it appropriate:
/**
 * Provides {@link EventPayload} to the Event Stream through the {@link Subscriber} interface.
 */
public interface EventStreamSource {
    /**
     * Registered with {@link EventStreamRegistry} to create concrete implementation of
     * {@link EventStreamSource} from the provided configuration.
     */
    @FunctionalInterface
    interface Factory {
        /**
         * Called once when Event Stream is first created.
         * @param context {@link EventStreamContext}
         * @param config {@link EventStreamSource} configuration that defines its behavior
         * @return {@link EventStreamSource}
         */
        EventStreamSource create(EventStreamContext context, JsonObject config);
    }

    /**
     * Interface for submitting events to the Event Stream.
     */
    interface Subscriber {
        /**
         * Notifies Event Stream that events have arrived and are ready to process.
         * <br>
         * This call will never block. If blocking is desired, use the returned {@link CompletionStage} to wait for
         * the events to be processed.
         *
         * @param events List of events to pass to the Event Stream to start processing.
         *
         * @return Future of results that completes once all {@code events} have been processed.
         */
        CompletionStage<EventResultList> submitEvents(List<EventPayload> events);

        /**
         * Notifies Event Stream that an event has arrived and is ready to process. If multiple events are available,
         * {@link Subscriber#submitEvents} should be used.
         * <br>
         * This call will never block. If blocking is desired, use the returned {@link CompletionStage} to wait for
         * the event to be processed.
         *
         * @param event Event to pass to the Event Stream to start processing.
         *
         * @return Future of the result that completes once {@code event} has been processed.
         */
        default CompletionStage<EventResult> submitEvent(EventPayload event) {
            return submitEvents(List.of(event)).thenApply(results -> results.results().get(0));
        }

        /**
         * Notifies Event Stream an error occurred. This is primarily used for diagnostics; the Event Stream will still
         * be considered running until shutdown. If the error is resolved, events can continue to be submitted.
         *
         * @param throwable Throwable that caused the error.
         */
        void onError(Throwable throwable);
    }

    /**
     * Called on Event Stream startup. Events can safely be sent to {@link Subscriber#submitEvents} or
     * {@link Subscriber#submitEvent}.
     */
    void onStartup(Subscriber subscriber);

    /**
     * Called on {@link EventStreamSource} shutdown. No more events should be sent through {@link Subscriber}. If
     * events are submitted after, they'll immediately complete with an Error result.
     */
    void onShutdown();
}
Separately, you'll have to register configuration in the Designer (mostly for the sake of the UI) so there's a way to actually configure whatever settings make sense for your source, much like the way Perspective handles binding configuration.
Disclaimer: This code isn't merged yet and we're obviously pre-public-beta here, so this API is still subject to change.
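For planning purposes, here is a rough sketch of how a source built against that interface might hang together. It only illustrates the lifecycle described above: hold on to the Subscriber from onStartup, submit through it, and stop at onShutdown. Everything outside the quoted interface is an assumption: the record shapes for EventPayload, EventResult, and EventResultList are stand-ins, OrderEventSource, RecordingSubscriber, and the "topic" metadata key are made up for illustration, and the Factory/registry side is left out entirely because that API hasn't been shown.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;

// Stand-ins for the unreleased module's types. Their shapes are guesses.
record EventPayload(Object data, Map<String, Object> metadata) {}
record EventResult(boolean ok) {}
record EventResultList(List<EventResult> results) {}

// Trimmed copy of the interface quoted above (Factory omitted).
interface EventStreamSource {
    interface Subscriber {
        CompletionStage<EventResultList> submitEvents(List<EventPayload> events);

        default CompletionStage<EventResult> submitEvent(EventPayload event) {
            return submitEvents(List.of(event)).thenApply(r -> r.results().get(0));
        }

        void onError(Throwable throwable);
    }

    void onStartup(Subscriber subscriber);

    void onShutdown();
}

// Hypothetical source: keeps the Subscriber only between startup and shutdown.
class OrderEventSource implements EventStreamSource {
    private volatile Subscriber subscriber;

    @Override
    public void onStartup(Subscriber subscriber) {
        this.subscriber = subscriber;
    }

    @Override
    public void onShutdown() {
        this.subscriber = null;
    }

    // Called from your own feature code whenever an event is worth publishing.
    CompletionStage<EventResult> publish(String topic, Object body) {
        Subscriber current = subscriber;
        if (current == null) {
            // Stream not running: report a failed result instead of throwing.
            return CompletableFuture.completedFuture(new EventResult(false));
        }
        return current.submitEvent(new EventPayload(body, Map.of("topic", topic)));
    }
}

// Fake stream for local testing; the real Subscriber would come from the gateway.
class RecordingSubscriber implements EventStreamSource.Subscriber {
    final List<EventPayload> received = new ArrayList<>();

    @Override
    public CompletionStage<EventResultList> submitEvents(List<EventPayload> events) {
        received.addAll(events);
        List<EventResult> results = events.stream().map(e -> new EventResult(true)).toList();
        return CompletableFuture.completedFuture(new EventResultList(results));
    }

    @Override
    public void onError(Throwable throwable) {
        // Diagnostics only in this sketch.
    }
}

class SourceDemo {
    public static void main(String[] args) {
        OrderEventSource source = new OrderEventSource();
        RecordingSubscriber stream = new RecordingSubscriber();
        source.onStartup(stream);
        EventResult result = source.publish("orders", "order-1001").toCompletableFuture().join();
        System.out.println("accepted=" + result.ok() + ", received=" + stream.received.size());
        source.onShutdown();
    }
}
```

Keeping the Subscriber behind a small publish method means the rest of your feature never touches the event streams types directly, which should limit the damage if the API shifts before release.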
Thank you Paul! What's in an EventPayload if you don't mind me asking?
EventPayload is just a wrapper class that contains an arbitrary Object and optional 'metadata' in the form of a Map<String, Object>. Pretty maximally flexible.
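Going only off that description, a wrapper along these lines is one way to picture it. This is a guess at the shape, not the real class: the accessor names data and metadata, the of helper, and the example keys are all invented here, and the actual EventPayload may well differ.

```java
import java.util.Map;

// Hypothetical stand-in for EventPayload: an arbitrary object plus optional metadata.
record EventPayload(Object data, Map<String, Object> metadata) {
    EventPayload {
        // Treat missing metadata as empty, and copy it so the payload stays immutable.
        metadata = metadata == null ? Map.of() : Map.copyOf(metadata);
    }

    // Convenience for the common case of no metadata.
    static EventPayload of(Object data) {
        return new EventPayload(data, Map.of());
    }
}

class PayloadDemo {
    public static void main(String[] args) {
        // The payload body can be anything: a primitive wrapper, a map, your own class.
        EventPayload bare = EventPayload.of(42);
        EventPayload tagged = new EventPayload(
                Map.of("orderId", 1001, "status", "SHIPPED"),
                Map.of("source", "order-module"));

        System.out.println(bare.metadata().isEmpty());
        System.out.println(tagged.metadata().get("source"));
    }
}
```

If you need a namespace or topic, the metadata map looks like the natural place for it, though that is an inference from the description rather than anything confirmed.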