Writing extensions for Wisdom is super easy if you know Java. An extension can be a Window
, Source
, Sink
, or Mapper
. In this section, I explain how to create a new sink to write events to a text file. All existing windows, sources, sinks, and mappers are written as extensions following the same technique.
Create New Sink Extension
Step 1: Create a new Maven project in your favorite IDE.
Step 2: Open pom.xml
file and add wisdom-core
dependency as show below:
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>com.javahelps</groupId>
<artifactId>wisdom-java-api</artifactId>
<version>1.0-SNAPSHOT</version>
<build>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
<configuration>
<source>11</source>
<target>11</target>
</configuration>
</plugin>
</plugins>
</build>
<properties>
<wisdom.version>0.0.1</wisdom.version>
</properties>
<dependencies>
<dependency>
<groupId>com.javahelps.wisdom</groupId>
<artifactId>wisdom-core</artifactId>
<version>${wisdom.version}</version>
</dependency>
</dependencies>
</project>
Step 3: Create a new class com.javahelps.wisdom.extensions.file.sink.TextFileSink
and extend com.javahelps.wisdom.core.stream.output.Sink
. Depending on your extension type, you may need to override different classes:
- Window -
com.javahelps.wisdom.core.window.Window
- Source -
com.javahelps.wisdom.core.stream.input.Source
- Sink -
com.javahelps.wisdom.core.stream.output.Sink
- Mapper -
com.javahelps.wisdom.core.map.Mapper
package com.javahelps.wisdom.extensions.file.sink;
import com.javahelps.wisdom.core.stream.output.Sink;
public class TextFileSink extends Sink {
}
Step 4: Annotate the class using WisdomExtension
annotation and define namespace as file.text
which will be used to identify this sink later in Wisdom query. This step is common for all Wisdom extensions.
package com.javahelps.wisdom.extensions.file.sink;
import com.javahelps.wisdom.core.extension.WisdomExtension;
import com.javahelps.wisdom.core.stream.output.Sink;
@WisdomExtension("file.text")
public class TextFileSink extends Sink {
}
Step 5: Override all required methods. For text file sink, overriding publish
method is enough. You may need start
, init
and stop
if your sink is complex as Kafka sink.
package com.javahelps.wisdom.extensions.file.sink;
import com.javahelps.wisdom.core.WisdomApp;
import com.javahelps.wisdom.core.event.Event;
import com.javahelps.wisdom.core.exception.WisdomAppValidationException;
import com.javahelps.wisdom.core.extension.WisdomExtension;
import com.javahelps.wisdom.core.stream.output.Sink;
import java.io.FileWriter;
import java.io.IOException;
import java.io.PrintWriter;
import java.util.List;
import java.util.Map;
@WisdomExtension("file.text")
public class TextFileSink extends Sink {
private final String path;
public TextFileSink(Map<String, ?> properties) {
super(properties);
this.path = (String) properties.get("path");
if (this.path == null) {
throw new WisdomAppValidationException("Required property 'path' for TextFile sink not found");
}
}
@Override
public void start() {
// Do nothing
}
@Override
public void init(WisdomApp wisdomApp, String streamId) {
}
@Override
public void publish(List<Event> events) throws IOException {
try (PrintWriter writer = new PrintWriter(new FileWriter(this.path, true))) {
for (Event event : events) {
writer.println(event);
}
}
}
@Override
public void stop() {
// Do nothing
}
}
Now you are ready to use this sink in your Wisdom app.
Use Extension in Java API
Step 1: Create a new test class in the same project com.javahelps.wisdom.extensions.file.sink.TestTextFileSink
.
Step 2: Create a static initialization block and import the custom extension.
package com.javahelps.wisdom.extensions.file.sink;
import com.javahelps.wisdom.core.extension.ImportsManager;
public class TestTextFileSink {
static {
ImportsManager.INSTANCE.use(TextFileSink.class);
}
}
We use ImportsManager
to import selected extension, instead of searching the complete classpath to avoid unnecessary delays. It also reduces unnecessary complexities in Android applications.
Step 3: Create a new Wisdom app using the file.text
sink. Note that we are using the namespace file.text
to create this sink.
WisdomApp wisdomApp = new WisdomApp();
wisdomApp.defineStream("StockStream");
wisdomApp.defineStream("OutputStream");
wisdomApp.defineQuery("query1")
.from("StockStream")
.select("symbol", "price")
.insertInto("OutputStream");
wisdomApp.addSink("OutputStream", Sink.create("file.text", Map.of("path", "output.log")));
wisdomApp.start();
InputHandler stockStreamInputHandler = wisdomApp.getInputHandler("StockStream");
stockStreamInputHandler.send(EventGenerator.generate("symbol", "IBM", "price", 50.0, "volume", 10));
stockStreamInputHandler.send(EventGenerator.generate("symbol", "WSO2", "price", 60.0, "volume", 15));
Use Extension in Wisdom Query
Above sink can be used in a Wisdom query as given below:
def stream StockStream;
@sink(type='file.text', path='output.log')
def stream OutputStream;
from StockStream
select symbol, price
insert into OutputStream;JAR
Deploy in Wisdom Server
Step 1: Build the jar file containing com.javahelps.wisdom.extensions.file.sink.TextFileSink
.
mvn clean package
Step 2: Copy and paste the target/xxx.jar
file into WISDOM_HOME/lib
directory.
Step 3: Restart running Wisdom services.