The following Coral Actors for stream transformations are defined:
name | class | description |
---|---|---|
cassandra |
CassandraActor | connect to a Cassandra datasource |
fsm |
FsmActor | select a state according to key |
generator |
GeneratorActor | generate data based on a JSON template and distribution definitions |
group |
GroupByActor | partition the stream |
httpbroadcast |
HttpBroadcastActor | pass (HTTP supplied) JSON to other actors |
httpclient |
HttpClientActor | post to a service URL |
json |
JsonActor | transform an input JSON |
kafka-consumer |
KafkaConsumerActor | reads data from Kafka |
kafka-producer |
KafkaProducerActor | writes data to Kafka |
log |
LogActor | logs data to a file |
linearregression |
LinearRegressionActor | performs prediction on streaming data |
lookup |
LookupActor | find data for a key value |
sample |
SampleActor | emits only a fraction of the supplied trigger JSON |
stats |
StatsActor | accumulate some basic statistics |
threshold |
ThresholdActor | emit only when a specified field value exceeds a threshold |
window |
WindowActor | collect input objects and emit only when reaching a certain number or a certain time |
zscore |
ZscoreActor | determine if a value is an outlier according to the Z-score statistic |
The JSON to create a Coral Actor conforms to JSON API. The attributes in the JSON to create a Coral Actor contain the following fields:
field | type | required | description |
---|---|---|---|
type |
string | yes | the name of the actor |
params |
json | yes | depends on the actor |
timeout |
json | no | timeout JSON |
group |
json | no | group by JSON |
The timeout JSON is defined as follows:
field | type | required | description |
---|---|---|---|
mode |
string | yes | “continue” for continuing timer events; “exit” for act-once timer behavior |
duration |
number | yes | the duration of the timer in seconds |
The timeout definition will determine how the timer function of an actor (if defined) is triggered.
The group by JSON is defined with a single field:
field | type | required | description |
---|---|---|---|
by |
string | yes | the value is the name of a field in the stream JSON |
Having a group by field defined will invoke the (GroupByActor)[/coral/docs/Actors-groupActor] to partition the stream and control the creation of underlying actors for each partition.
Create the statistics actor with a timer action every 60 seconds.
Group the statistics by the value of the trigger-field tag
.
{
"data": {
"type": "actors",
"attributes": {
"type":"stats",
"timeout": {
"duration": 60,
"mode": "continue"
},
"params": {
"field": "amount"
},
"group": {
"by": "tag"
}
}
}
}
Note: the data
, attributes
and "type": "actors"
are used because of the conformation to JSON API.
After defining an actor, you can connect it to another actor by setting the trigger and/or collect.
{
"data": {
"type": "actors",
"id": "1",
"attributes": {
"input": {
"trigger": {
"in": {
"type": "actor",
"source": 2
}
}
}
}
}
}
When the provided Coral actors or a combination of them don’t provide the functionality you need, you can write your own Coral actor and use it. To use your own Coral actor, you need to create a factory that extends the ActorPropFactory
trait.
See for an example the DefaultActorPropFactory
, which contains all the provided Coral actors.
Next, you create a jar with your Coral actor(s) and your factory and add this jar to the classpath. To configure the Coral platform to use your factory, add the following to the application.conf:
injections {
actorPropFactories = ["mypackage.MyFactory"]
}
You can define more than one factory. When looking for a Coral actor by name, the search order is the order in which the factories are given. Before searching in user defined factories, the default factory with the provided Coral actors is searched for a given Coral actor name, so it is not possible to redefine one of the provided actors.