Ruminations of J.net idle rants and ramblings of a code monkey

Cool StreamInsight query–Point input to Edge Output

StreamInsight | Code Sample

It’s easy to get individual events as points in StreamInsight. We can then take those events and write them directly out to our output adapter. There’s nothing to this. But … and this is a common scenario … what do we do if the values don’t change very often? Do we really want to write an hour’s worth of data that hasn’t changed, especially if we are reading the value every second or even several times a second? Probably not. In many cases, in fact, we’re only interested in saving the values when they have changed. Of course, this can be done in StreamInsight through a little bit of query majick (or I wouldn’t be writing this blog entry, now would I?).

So … what we are going to do here is take a point stream, evaluate it, select only the items that have changed since the previous value and then send these to output as edge events. Each edge starts when the value changes and ends when it changes again. Simple enough, right?

As with the previous example, this was done using LINQPad. In this case, I borrowed (or stole, as the case may be) a LINQ macro from one of the samples: the FoldPairs macro. I also borrowed/stole the ToSignal extension from the StreamInsight blog and turned it into a LINQ macro. Both of these are going to be darn handy in this example.

public static CepStream<TResult> FoldPairs<TStream, TResult>(
    CepStream<TStream> input,
    Expression<Func<TStream, TStream, bool>> predicate,
    TimeSpan timeout,
    Expression<Func<TStream, TStream, TResult>> resultSelector)
{
    // Stretch each event up to the timeout, then clip it when the next
    // event matching the predicate arrives.
    var signal = input
        .AlterEventDuration(e => timeout)
        .ClipEventDuration(input, (f, s) => predicate.Compile()(f, s));

    // Shift by one tick so each stretched event overlaps the next matching
    // point event, then join the two into a pair.
    return from l in signal.ShiftEventTime(e => TimeSpan.FromTicks(1))
           from r in input
           where predicate.Compile()(l, r)
           select resultSelector.Compile()(l, r);
}

public static CepStream<T> ToSignal<T, K>(CepStream<T> inputstream, Expression<Func<T, K>> keySelector)
{
    // Extend each event indefinitely, then clip it at the next event with the same key.
    return inputstream
        .AlterEventDuration(e => TimeSpan.MaxValue)
        .ClipEventDuration(inputstream, (e1, e2) => (keySelector.Compile()(e1)).Equals(keySelector.Compile()(e2)));
}

And now for our data. I’ve added comments to the data to show where we have value changes. We’ll take this and convert it to a point stream for evaluation.

Func<int, int, DateTimeOffset> t = (h, m) => new DateTimeOffset(2011, 1, 25, 0, 0, 0, TimeSpan.Zero).AddHours(h).AddMinutes(m);

var sourceData = new []
{
    //Initial event @ 4:12
    new { SourceId = "A", Value = 22, TimeStamp = t(4, 12) },
    new { SourceId = "A", Value = 22, TimeStamp = t(4, 13) },
    new { SourceId = "A", Value = 22, TimeStamp = t(4, 14) },
    //A: New event @ 4:15
    new { SourceId = "A", Value = 67, TimeStamp = t(4, 15) },
    //A: New event @ 4:16
    new { SourceId = "A", Value = 54, TimeStamp = t(4, 16) },
    new { SourceId = "A", Value = 54, TimeStamp = t(4, 17) },
    new { SourceId = "A", Value = 54, TimeStamp = t(4, 18) },
    new { SourceId = "A", Value = 54, TimeStamp = t(4, 19) },
    new { SourceId = "A", Value = 54, TimeStamp = t(4, 20) },
    new { SourceId = "A", Value = 54, TimeStamp = t(4, 21) },
    //A: New event @ 4:22
    new { SourceId = "A", Value = 87, TimeStamp = t(4, 22) },
    //B: Initial Event @ 4:12
    new { SourceId = "B", Value = 24, TimeStamp = t(4, 12) },
    new { SourceId = "B", Value = 24, TimeStamp = t(4, 13) },
    //B: New Event @ 4:14
    new { SourceId = "B", Value = 31, TimeStamp = t(4, 14) },
    new { SourceId = "B", Value = 31, TimeStamp = t(4, 15) },
    new { SourceId = "B", Value = 31, TimeStamp = t(4, 16) },
    new { SourceId = "B", Value = 31, TimeStamp = t(4, 17) },
    new { SourceId = "B", Value = 31, TimeStamp = t(4, 18) },
    //B: New Event @ 4:19
    new { SourceId = "B", Value = 50, TimeStamp = t(4, 19) },
    new { SourceId = "B", Value = 50, TimeStamp = t(4, 20) },
    new { SourceId = "B", Value = 50, TimeStamp = t(4, 21) },
    new { SourceId = "B", Value = 50, TimeStamp = t(4, 22) }
};

var source = sourceData.OrderBy(e => e.TimeStamp).ToPointStream(
    Application,
    ev => PointEvent.CreateInsert(ev.TimeStamp.ToLocalTime(), new { ev.SourceId, ev.Value }),
    AdvanceTimeSettings.IncreasingStartTime);

So far, pretty straightforward and nothing special. Next we need to calculate the change between two consecutive values. We do this using the borrowed FoldPairs macro. This will provide us with an anonymous type with our item identifier (SourceId) and the difference (delta) between two consecutive values.

var delta = FoldPairs(source,
(a, b) => a.SourceId == b.SourceId,
TimeSpan.MaxValue,
(a, b) => new { a.SourceId, diff = b.Value - a.Value });
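
If the temporal pairing is hard to picture, here is a rough sketch of the same idea in plain LINQ to Objects, with no StreamInsight involved. The names DeltaSketch, Deltas and Minute are made up for illustration, and the sketch assumes each delta is stamped with the time of the later reading in the pair.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

public static class DeltaSketch
{
    // Hypothetical helper: pairs each reading with the next reading from the
    // same source and returns the difference, stamped at the later reading.
    public static List<(string SourceId, int Minute, int Diff)> Deltas(
        IEnumerable<(string SourceId, int Minute, int Value)> readings)
    {
        return readings
            .GroupBy(r => r.SourceId)
            .SelectMany(g =>
            {
                var ordered = g.OrderBy(r => r.Minute).ToList();
                // Zip each reading with its successor within the same source.
                return ordered.Zip(ordered.Skip(1),
                    (prev, next) => (prev.SourceId, next.Minute, next.Value - prev.Value));
            })
            .ToList();
    }
}
```

Fed the first few "A" readings from the data above (22, 22, 22, 67), this should give diffs of 0, 0 and 45, with only the last one being interesting to us.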

Now that we have the deltas, it’s easy enough to join this back to the original source query, selecting only those source items where the delta is not equal to 0.

var changesOnly = from r in delta
                  join s in source
                  on r.SourceId equals s.SourceId
                  where r.diff != 0
                  select s;

So far so good. If you run what we have so far and write this to the output, you’ll see that you only get those point events that change since the previous value. But … you’ll also see that we’re missing something: our very first, initial event. That’s because no delta is calculated for this event as it has no previous event. Well, if we are going to be a real application, we can’t have the first one disappearing on us all the time.

So now we need to get the first event, the event that has nothing preceding it. To do this, we use the ToSignal macro and convert our initial source stream to a signal stream … each individual reading (whether changed or not) becomes an interval. We then shift the event time to create an overlap between one event and the next one. Where we don’t have an overlap, we have the very first point event: a left anti semi-join. We can then take this and union it with the stream of changed events, providing a stream with the changed events AND our first event.

var initialEvent = from s in source
                   where (
                       from s2 in ToSignal(source, e => e.SourceId)
                           .ShiftEventTime(e => TimeSpan.FromTicks(1))
                       where s.SourceId == s2.SourceId
                       select s2
                   ).IsEmpty()
                   select s;

var final = changesOnly.Union(initialEvent);
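
As a sanity check on what the final stream should contain, here is a minimal non-temporal sketch of the same rule in plain C#: keep a reading if it is the first one for its source or if its value differs from the previous reading. ChangeFilterSketch, ChangesWithInitial and Minute are hypothetical names, and this is only a way to reason about the expected result, not a replacement for the StreamInsight query.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

public static class ChangeFilterSketch
{
    // Hypothetical helper: keeps a reading when it is the first one seen for
    // its source or when its value differs from that source's previous reading.
    public static List<(string SourceId, int Minute, int Value)> ChangesWithInitial(
        IEnumerable<(string SourceId, int Minute, int Value)> readings)
    {
        var lastValue = new Dictionary<string, int>();
        var kept = new List<(string SourceId, int Minute, int Value)>();

        foreach (var r in readings.OrderBy(x => x.Minute))
        {
            bool isFirst = !lastValue.TryGetValue(r.SourceId, out int previous);
            if (isFirst || previous != r.Value)
                kept.Add(r);
            lastValue[r.SourceId] = r.Value; // remember the latest value per source
        }
        return kept;
    }
}
```

For the sample data above, this rule keeps A at 4:12, 4:15, 4:16 and 4:22 and B at 4:12, 4:14 and 4:19, which matches the comments in the data.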

This, in itself, is actually useful … you may not want to take the next step and convert them into edge events as this query will give you a point event for each value change. But … we’re not quite done with the scenario that we want to accomplish. To create edge events with a start and end time that represent value changes, we simply use the ToSignal() LINQ macro on our final point stream. If you want these as intervals, you’ll get, in effect, the same data except that you won’t “see” the interval until the end time. If they are edge events, you’ll get a start as soon as the value changes and the end before the next change.

var signalEdges = ToSignal(final, e=> e.SourceId);
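
To make the shape of that output concrete, here is a small plain C# sketch of the point-to-interval idea: each change starts at its own timestamp and ends at the next change for the same source. EdgeSketch, ToIntervals and Minute are hypothetical names, and using a null End for the latest value is my assumption for "the end isn’t known yet".

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

public static class EdgeSketch
{
    // Hypothetical helper: turns change points into intervals. Each value
    // starts at its own timestamp and ends at the next change for the same
    // source; End is null for the latest value, whose end is not yet known.
    public static List<(string SourceId, int Value, int Start, int? End)> ToIntervals(
        IEnumerable<(string SourceId, int Minute, int Value)> changes)
    {
        var result = new List<(string SourceId, int Value, int Start, int? End)>();
        foreach (var g in changes.GroupBy(c => c.SourceId))
        {
            var ordered = g.OrderBy(c => c.Minute).ToList();
            for (int i = 0; i < ordered.Count; i++)
            {
                int? end = i + 1 < ordered.Count ? ordered[i + 1].Minute : (int?)null;
                result.Add((ordered[i].SourceId, ordered[i].Value, ordered[i].Minute, end));
            }
        }
        return result;
    }
}
```

So a change to 22 at 4:12 followed by a change to 67 at 4:15 becomes one interval from 4:12 to 4:15 carrying 22, plus an open-ended one starting at 4:15 carrying 67.
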

And … it really is just that simple … it also helps that the LINQ macros really reduce the amount of LINQ query statements that we have to write. You could, if you wanted, also add dead zones … where you take this up a notch and only produce an event when the value changes by a certain amount or percentage. But I’ll leave that as an exercise for the reader. Can’t take all the fun out of it, can I?

 
