Ruminations of J.net: idle rants and ramblings of a code monkey

StreamInsight User Defined Aggregate: Standard Deviation

.NET Stuff | StreamInsight | Code Sample

Here’s a quick user-defined aggregate (UDA) that computes the standard deviation of a window. Interestingly, I had to call ToArray() on the IEnumerable<double> source. If I didn’t, it threw a NullReferenceException, although why is something of a mystery. It would be nice if I could pass in the average from the query, since the StreamInsight engine already calculates it, but no dice.
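For reference, the UDA computes the sample standard deviation (note the n - 1 denominator) of the n values x_1, ..., x_n in a window. The mean has to be known before the squared deviations can be summed, which is why the code makes one pass for the average and a second pass for the sum:

$$
\bar{x} = \frac{1}{n}\sum_{i=1}^{n} x_i, \qquad
s = \sqrt{\frac{1}{n-1}\sum_{i=1}^{n}\left(x_i - \bar{x}\right)^2}
$$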

Note: I haven’t done any performance testing, and the code was copied from MSDN. Use at your own risk.

using System;
using System.Collections.Generic;
using System.Linq;
using System.Linq.Expressions;
using Microsoft.ComplexEventProcessing;
using Microsoft.ComplexEventProcessing.Extensibility;
using Microsoft.ComplexEventProcessing.Linq;

/// <summary>
/// Static class with UDA extensions for standard deviation.
/// </summary>
public static class StandardDeviation
{
    /// <summary>
    /// Extension method for the UDA.
    /// </summary>
    /// <typeparam name="T">Payload type of the stream.</typeparam>
    /// <param name="window">Window to be passed to the UDA.</param>
    /// <param name="map">Mapping from the payload to a double field in the payload.</param>
    /// <returns>Aggregation result.</returns>
    [CepUserDefinedAggregate(typeof(StdDevDouble))]
    public static double StdDev<T>(this CepWindow<T> window, Expression<Func<T, double>> map)
    {
        throw CepUtility.DoNotCall();
    }
}

/// <summary>
/// A UDA to calculate the standard deviation of a window.
/// </summary>
public class StdDevDouble : CepAggregate<double, double>
{
    /// <summary>
    /// Computes the aggregation over a window.
    /// </summary>
    /// <param name="source">Set of events contained in the window.</param>
    /// <returns>Aggregation result.</returns>
    public override double GenerateOutput(IEnumerable<double> source)
    {
        // Materialize the window once; enumerating source directly threw a NullReferenceException.
        double[] values = source.ToArray();
        // Sample standard deviation is undefined for fewer than two events
        // (and Average() throws on an empty sequence).
        if (values.Length < 2) return double.NaN;
        double mean = values.AsParallel().Average();
        double standardDev = values.AsParallel().Aggregate(
            0.0,
            // do this on each thread
            (subtotal, item) => subtotal + Math.Pow(item - mean, 2),
            // aggregate results after all threads are done
            (total, thisThread) => total + thisThread,
            // perform the standard deviation calc on the aggregated result;
            // values.Length avoids enumerating source a second time
            finalSum => Math.Sqrt(finalSum / (values.Length - 1)));
        return standardDev;
    }
}
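Because the UDA needs the StreamInsight runtime to execute, it can be handy to check the arithmetic on a plain array first. The sketch below is a hypothetical helper (StdDevCheck is not part of StreamInsight or the original post). TwoPass mirrors the PLINQ approach above: Average() for the mean, then the four-argument ParallelEnumerable.Aggregate overload (seed, per-partition update, combine, result selector). Welford is a swapped-in alternative, Welford's online algorithm, which updates the mean and the running sum of squared deviations in a single pass. That is one way around needing the engine's average, although it is not what the post or MSDN uses.

```csharp
using System;
using System.Linq;

// Hypothetical helper for checking the standard deviation math outside StreamInsight.
public static class StdDevCheck
{
    // Two-pass sample standard deviation, same shape as StdDevDouble.GenerateOutput.
    public static double TwoPass(double[] values)
    {
        if (values.Length < 2) return double.NaN; // undefined for n < 2
        double mean = values.AsParallel().Average();
        return values.AsParallel().Aggregate(
            0.0,
            (subtotal, item) => subtotal + (item - mean) * (item - mean), // per partition
            (total, partial) => total + partial,                          // combine partitions
            finalSum => Math.Sqrt(finalSum / (values.Length - 1)));       // sample std dev
    }

    // Welford's online algorithm: one pass, no separate Average() call.
    public static double Welford(double[] values)
    {
        if (values.Length < 2) return double.NaN; // undefined for n < 2
        int n = 0;
        double mean = 0.0; // running mean
        double m2 = 0.0;   // running sum of squared deviations from the mean
        foreach (double x in values)
        {
            n++;
            double delta = x - mean;
            mean += delta / n;
            m2 += delta * (x - mean); // uses the updated mean
        }
        return Math.Sqrt(m2 / (n - 1));
    }
}
```

For the values 2, 4, 4, 4, 5, 5, 7, 9 the mean is 5 and the squared deviations sum to 32, so both methods should return sqrt(32/7), about 2.138 (the parallel version may differ in the last few bits because partition order affects floating-point rounding). Whether the single-pass version is actually faster inside a UDA hasn’t been measured.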

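Using it in a query is simple: just make sure that you add a using directive for the namespace containing the aggregate’s definition. The query below groups events by ItemId and computes statistics over a 10-second hopping window that advances every second.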

var stdDeviation = from e in sourceQueryStream
                   group e by e.ItemId into eachGroup
                   from window in eachGroup.HoppingWindow(
                       TimeSpan.FromSeconds(10),
                       TimeSpan.FromSeconds(1),
                       HoppingWindowOutputPolicy.ClipToWindowEnd)
                   select new
                   {
                       ItemId = eachGroup.Key,
                       StdDev = window.StdDev(e => e.Value),
                       Avg = window.Avg(e => e.Value),
                       Min = window.Min(e => e.Value),
                       Max = window.Max(e => e.Value),
                       Count = window.Count()
                   };