Ruminations of J.net idle rants and ramblings of a code monkey

How long did that edge event take–Reprising the Reprise

Code Sample | StreamInsight

So … I just posted a little bit ago. I thought that the solution that I came up with was pretty decent … and certainly an alternative to the subjects … but I still wanted to be able to determine a dynamic timeout based on payload. There had to be a way to do that so I put that whole thought process on a background thread while I did other stuff. And when it came to me … the solution was pretty simple, actually. In fact, this is a technique very similar to the standard queries that we use to detect offline sensors but with a slightly different twist. Here’s what you do: rather than shifting the event start time (as we did before), you simply alter the duration of the source edge event. You see, when edge events are enqueued but don’t have an end, the event in the engine has an end time of DateTimeOffset.MaxValue (you can see this in the Event Flow Debugger). When the end edge is enqueued, the initial item is retracted and the new one is inserted in the stream. Now, we used this behavior in the previous example to do our join to see which events still running after their timeout expired. But we can twist this a bit … rather than shifting the event time, we can set the event duration to a specified end time. AlterEventDuration, unlike ShiftEventTime, provides access to the payload, allowing us to pass in a timeout duration as a part of the payload. We can then do a Left Anti-Semi Join between the source stream and the altered stream to see which items in the source stream are no longer in the altered stream. This then gives us the events that have exceeded their allowed duration based on the payload. So … here’s the LinqPad sample:

LinqPad Sample
void Main()
{
DateTimeOffset startTime = new DateTimeOffset(2013, 7,1,12,0,0,TimeSpan.FromHours(-6));
var items = Application.DefineEnumerable(() => new SourceItem[]{
new SourceItem("Item1", 4, 0),
new SourceItem("Item2", 6, 0),
new SourceItem("Item3", 3, 2),
new SourceItem("Item1", 4, 0, 3),
new SourceItem("Item4", 5, 3),
new SourceItem("Item2", 6, 0, 4),
new SourceItem("Item3", 3, 2, 8),
new SourceItem("Item4", 5, 3, 10),

});

var source = items.ToEdgeStreamable(e => e.GetEvent(startTime), AdvanceTimeSettings.IncreasingStartTime);

//AlterEventDuration works with edges ... it sets an end to the edge.
//With AlterEventDuration, we get the payload (unlike ShiftEventTime)
//We'll use the payload as our max allowed duration
var altered = source.AlterEventDuration(e => TimeSpan.FromMinutes(e.Payload.Value));

//Now, let's get the ones "taking too long".
//What we want are src items *with no match* in the altered
//This will give us edges that are still "live"
//but their max allowed duration has passed.
var tooLong = from src in source.LeftAntiJoin(altered, (s, a) => s.Id == a.Id)
select src;

//Output as the different event shapes.
//Note the relationship between the event times and the CTIs.
//Point is a good option.
tooLong.ToPointEnumerable().Dump("Timed Out Items - Point");
//Interval isn't a good option. Doesn't get output until the end.
tooLong.ToIntervalEnumerable().Dump("Timed Out Items - Interval");
//Edge is also a good option. You get the start and the end.
tooLong.ToEdgeEnumerable().Dump("Timed Out Items - Edge");
}

// Define other methods and classes here
class SourceItem:DataItem
{
public SourceItem(string id, int value, int startTimeOffset){
Id = id;
Value = value;
StartTimeOffset = startTimeOffset;
EndTimeOffset = null;
}
public SourceItem(string id, int value, int startTimeOffset, int endTimeOffset){
Id = id;
Value = value;
StartTimeOffset = startTimeOffset;
EndTimeOffset = endTimeOffset;
}
public EdgeEvent<DataItem> GetEvent(DateTimeOffset startTime){
if(!EndTimeOffset.HasValue){
return EdgeEvent<DataItem>.CreateStart(startTime.AddMinutes(StartTimeOffset),
this);
}
return EdgeEvent<DataItem>.CreateEnd(startTime.AddMinutes(StartTimeOffset),
startTime.AddMinutes(EndTimeOffset.Value),
this);
}
public int StartTimeOffset;
public int? EndTimeOffset;
}
class DataItem{
public string Id;
public int Value;
}

I do love it when a query comes together …