# StreamDM Programming Guide
Everything in StreamDM is designed around tasks, which describe the flow of a streaming data mining or machine learning algorithm. In a nutshell, streaming data is read and parsed into the StreamDM internal representation, passed through learners, evaluated and then output to various places, such as console, files, or as streams for other tasks.
## Basic Data Structure
StreamDM processes multi-dimensional vectors, called instances, which are implemented as classes derived from a general `Instance` class. Depending on the implementation of the underlying data structure, an `Instance` can be:
Instance Type | Data Structure | Format
---|---|---
`DenseInstance` | array of `Double` | `val1,val2,...,valn`
`SparseInstance` | two arrays: one for indexes, one for values | `idx1:val1,idx2:val2,...`
`TextInstance` | map of key-value tuples; allows non-numeric keys | `key1:val1,key2:val2,...`
`NullInstance` | N/A | N/A
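
For instance, the following text lines (with illustrative values, and assuming 0-based sparse indexes) correspond to the first three instance types:

```
1.0,2.5,3.0           (DenseInstance: three numeric features)
0:1.0,2:3.0           (SparseInstance: non-zero values at indexes 0 and 2)
word:2.0,other:1.0    (TextInstance: non-numeric keys mapped to values)
```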
`Instance` provides useful operations for use in a `Learner`, such as the `dot` and `distanceTo` operations, but also `map` and `reduce` operations. Note that an `Instance` is always immutable; every operation returns a new `Instance` with the modifications. For full details, please refer to the `Instance` specification in the API documentation.
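
As a brief sketch of these operations (assuming the `DenseInstance` constructor takes the array of values shown in the table above, and that `distanceTo` computes a distance between the two vectors):

```scala
val a = new DenseInstance(Array(1.0, 2.0, 3.0))
val b = new DenseInstance(Array(0.5, 0.5, 0.5))

val d = a.dot(b)                // 1.0*0.5 + 2.0*0.5 + 3.0*0.5 = 3.0
val dist = a.distanceTo(b)      // distance between the two vectors
val scaled = a.map(x => 2 * x)  // returns a NEW Instance; a is unchanged
```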
The input/output data structure which is sent via `DStream` is the `Example`. This data structure wraps the input and output instances, along with a number representing its weight. The class signature is specified as:
```scala
class Example(inInstance: Instance, outInstance: Instance = new NullInstance,
              weightValue: Double = 1.0)
```
Its text format is the following (note that the weight and the output instance are optional):

```
<input_instance> [<output_instance>] [<weight>]
```

The `Example.parse` method allows StreamDM to create objects of type `Example` from text lines in the stream.
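
For illustration, an `Example` can be built directly from instances; the snippet below is a sketch, and it assumes that `Example.parse` takes the text line together with the input and output instance types:

```scala
// Direct construction: two input features, one output label, default weight
val ex = new Example(new DenseInstance(Array(1.0, 2.5)),
                     new DenseInstance(Array(1.0)))

// Parsing from a text line; the type arguments are an assumption
val parsed = Example.parse("1.0,2.5 1.0", "dense", "dense")
```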
By default, every value in an instance is a `Double`. In cases where the features have different types (for example, discrete integers), a helper data structure, `ExampleSpecification`, is used.
An `ExampleSpecification` is used in special cases: classifiers such as decision trees and Naive Bayes need to know the type of each feature in the instance. In the current implementation, we support two types of features, numeric and discrete. Numeric values are the default, and discrete features are strings (such as the colors Red, Green, Blue, etc.) which are internally represented as doubles, for space efficiency and compatibility with the `Instance` classes.
This information is stored in an `ExampleSpecification` object which, similarly to `Example`, contains two `InstanceSpecification` objects for the input and output instances. Each `InstanceSpecification` stores the names of the features and, for the discrete features, their original string descriptions in an associated `FeatureSpecification` object.
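
A purely illustrative sketch of how these pieces fit together follows; the setter names used here (`setFeatureSpecification`, `setName`) are hypothetical and may differ from the actual StreamDM API:

```scala
// One discrete input feature ("color") plus a discrete class label.
val colorSpec = new FeatureSpecification(Array("Red", "Green", "Blue"))

val inSpec = new InstanceSpecification()
inSpec.setFeatureSpecification(0, colorSpec) // feature 0 is discrete
inSpec.setName(0, "color")                   // hypothetical setter

val outSpec = new InstanceSpecification()
outSpec.setName(0, "class")

val exampleSpec = new ExampleSpecification(inSpec, outSpec)
```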
## Task Building Blocks
A `Task` is a sequential algorithm which connects to a `StreamingContext` and processes sequential operations. This is performed by using building blocks, which are generally classes derived from the following base classes:
Base Class | Purpose
---|---
`StreamReader` | reads and parses `Example` objects and creates a stream
`Learner` | provides the `train` method from an input stream
`Model` | data structure and set of methods used by a `Learner`
`Evaluator` | evaluation of predictions
`StreamWriter` | output of streams
The base classes above can also be extended for more specific use cases. For instance, `Learner` is currently extended to `Classifier`, which provides a `predict` method, and to `Clusterer`, which provides an `assign` method.
A `Task` will also contain a set of options, which use the JavaCLIParser library. These options allow specifying, at runtime, which learners, evaluators, writers, and readers are to be used, without the need to re-compile the task.
An illustration of how a `Task` can be programmed by using a combination of the above building blocks is the `EvaluatePrequential` example task, present in the source code of StreamDM.
## Case Study: EvaluatePrequential
Consider an (artificial) binary classification scenario, where the stream consists of single-label instances. The objective is to first predict each instance using the current linear model, and then to train (and update) the model using the true label. The evaluation is based on the difference between the predicted label and the true label. The code listing of the resulting `EvaluatePrequential` is:
```scala
class EvaluatePrequential extends Task {

  //Task options
  val learnerOption: ClassOption = new ClassOption("learner", 'l',
    "Learner to use", classOf[Classifier], "SGDLearner")

  val evaluatorOption: ClassOption = new ClassOption("evaluator", 'e',
    "Evaluator to use", classOf[Evaluator], "BasicClassificationEvaluator")

  val streamReaderOption: ClassOption = new ClassOption("streamReader", 's',
    "Stream reader to use", classOf[StreamReader], "SocketTextStreamReader")

  val resultsWriterOption: ClassOption = new ClassOption("resultsWriter", 'w',
    "Stream writer to use", classOf[StreamWriter], "PrintStreamWriter")

  //Run the task
  def run(ssc: StreamingContext): Unit = {
    //Parse options and init
    val reader: StreamReader = this.streamReaderOption.getValue()
    val learner: Classifier = this.learnerOption.getValue()
    learner.init(reader.getExampleSpecification())
    val evaluator: Evaluator = this.evaluatorOption.getValue()
    val writer: StreamWriter = this.resultsWriterOption.getValue()
    //Parse stream and get Examples
    val instances = reader.getExamples(ssc)
    //Predict
    val predPairs = learner.predict(instances)
    //Train
    learner.train(instances)
    //Evaluate and output
    writer.output(evaluator.addResult(predPairs))
  }
}
```
First, `EvaluatePrequential` is created by extending `Task` and implementing its `run` method. `run` takes a `StreamingContext` as an argument, and its objective is to process the streams in this context.
The first step is the processing of the options:
```scala
val reader: StreamReader = this.streamReaderOption.getValue()
val learner: Classifier = this.learnerOption.getValue()
learner.init(reader.getExampleSpecification())
val evaluator: Evaluator = this.evaluatorOption.getValue()
val writer: StreamWriter = this.resultsWriterOption.getValue()
```
Options specify the classes used for each of the components of a task; in this case, the type of stream reader, the learner, the evaluator, and the stream output. In addition, each class used, e.g., `SGDLearner`, can also have options, such as the parameters needed for the algorithms. For example, an `EvaluatePrequential` task parsing sparse instances and using SGD with a learning rate of 0.001 and hinge loss will use the command line options:

```
EvaluatePrequential -s (SocketTextStreamReader -t sparse) -l (SGDLearner -l 0.001)
```
Then, the instances get parsed by the reader:

```scala
val instances = reader.getExamples(ssc)
```
After the parsing, the evaluate-first-then-train cycle is performed. In this case, our learner is restricted to a `Classifier` so that the `predict` method is available:

```scala
//Predict
val predPairs = learner.predict(instances)
//Train
learner.train(instances)
```
Finally, the results are output. Here, the evaluator output is combined with the final output:

```scala
writer.output(evaluator.addResult(predPairs))
```
## Extending StreamDM
StreamDM is designed to be easily extensible. Its purpose is to allow users to run it, developers of real-world machine learning workflows to easily program tasks which are more complicated or even contain multiple layers of learning and evaluation, and researchers to easily include new learning algorithms in Spark Streaming.
### Adding Tasks
To define a new task, we have to extend `Task` and implement its `run` method. We illustrate with an example of a task which writes a string to the console:
```scala
class HelloWorldTask extends Task {

  val textOption: StringOption = new StringOption("text", 't',
    "Text to print", "Hello, World!")

  def run(ssc: StreamingContext): Unit = {
    print(textOption.getValue)
  }
}
```
To specify the text to print, we can use a `StringOption`, and then pass it from the command line:

```
./spark "HelloWorldTask -t Bye"
```
As a general note, tasks like `EvaluatePrequential` allow testing any learner which inherits from `Classifier`; there is no need to create a task for each classifier implemented and tested. In general, tasks should be designed so that they allow as many options as possible at runtime, without the need to re-compile.
### Adding Learners
To add a new learner, we only need to implement the `Learner` trait and its associated methods: `init`, for initializing the `Model` inside the learner, and `train`, for updating the model with the data from the stream. If the requirements for the learner are more specific, specialized traits need to be implemented instead. For example, the `Classifier` trait also contains a `predict` method which applies the model to a stream:
```scala
trait Classifier extends Learner with Serializable {

  /**
   * Predict the label of the Example stream, given the current Model.
   *
   * @param input the input Example stream
   * @return a stream of tuples containing the original instance and the
   * predicted value
   */
  def predict(input: DStream[Example]): DStream[(Example, Double)]
}
```
```scala
trait Learner extends Configurable with Serializable {

  type T <: Model

  /**
   * Init the model based on the algorithm implemented in the learner.
   *
   * @param exampleSpecification the ExampleSpecification of the input stream
   */
  def init(exampleSpecification: ExampleSpecification): Unit

  /**
   * Train the model based on the algorithm implemented in the learner,
   * from the stream of Examples given for training.
   *
   * @param input a stream of Examples
   */
  def train(input: DStream[Example]): Unit

  /**
   * Gets the current Model used for the Learner.
   *
   * @return the Model object used for training
   */
  def getModel: T
}
```
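
As a minimal sketch of what an implementation could look like, consider a trivial classifier that always predicts the most frequent label seen so far. The `MajorityClassModel` class and the `labelAt` accessor on `Example` are assumptions for illustration (and `Model` is treated as a simple marker trait here); the driver-side `collect()` is only workable for toy streams:

```scala
import org.apache.spark.streaming.dstream.DStream

// Illustrative model (not part of the StreamDM API): counts labels and
// exposes the current majority label.
class MajorityClassModel extends Model {
  private val counts = scala.collection.mutable.Map[Double, Long]()
  def observe(label: Double): Unit =
    counts(label) = counts.getOrElse(label, 0L) + 1
  def majority: Double =
    if (counts.isEmpty) 0.0 else counts.maxBy(_._2)._1
}

class MajorityClassLearner extends Classifier {
  type T = MajorityClassModel
  private var model: MajorityClassModel = _

  // No feature type information is needed for a majority-class predictor.
  def init(exampleSpecification: ExampleSpecification): Unit = {
    model = new MajorityClassModel
  }

  // Update the label counts on the driver from each micro-batch; labelAt(0)
  // is assumed to return the first output value of the Example.
  def train(input: DStream[Example]): Unit =
    input.foreachRDD { rdd =>
      rdd.collect().foreach(ex => model.observe(ex.labelAt(0)))
    }

  // Pair every Example with the current majority label.
  def predict(input: DStream[Example]): DStream[(Example, Double)] =
    input.map(ex => (ex, model.majority))

  def getModel: MajorityClassModel = model
}
```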