Ruminations of J.net idle rants and ramblings of a code monkey

How long did that edge event take–Reprise

StreamInsight | Code Sample

In my last post, I talked about how to get the duration of an edge event using subjects. The only challenge with that is you don’t get the result until the end of the edge. While this does allow you to do very interesting things … for example, determine which events took longer more than 2 standard deviations greater than the average time … it doesn’t give you notification as soon as the item starts to take too long. That’s because it can’t calculate the duration until the end of the edge. But it’s also useful to get the notification as soon as an edge goes over a specific event duration. This is actually easier to do than I initially thought; I must have been having a brain-fart moment. (They’re rare but they do happen). So, this morning, I quickly whipped up a sample in LinqPad that does exactly this … as soon as an edge event goes over a pre-configured duration, you get notification. Now, while you can’t do anything really cool like dynamically determining the timeout from the event payload, that’s not always necessary. So it’s another option, just with different limitations and uses.

LinqPad Sample
void Main()
{
DateTimeOffset startTime = new DateTimeOffset(2013, 7,1,12,0,0,TimeSpan.FromHours(-6));
SourceItem[] items = new SourceItem[]{
new SourceItem("Item1", 5, 0),
new SourceItem("Item2", 6, 0),
new SourceItem("Item3", 8, 2),
new SourceItem("Item1", 5, 0, 3),
new SourceItem("Item4", 2, 3),
new SourceItem("Item2", 6, 0, 4),
new SourceItem("Item3", 8, 2, 8),
new SourceItem("Item4", 2, 3, 10),

};

var source = items.ToEdgeStream(Application, e => e.GetEvent(startTime), AdvanceTimeSettings.IncreasingStartTime);

//Shift the start time of the events to our timeout value.
//In this case, our timeout is 5 minutes; any event that
//has a duration longer than 5 minutes should be output to the sink.
var shifted = source.ShiftEventTime(e => TimeSpan.FromMinutes(5));

//Join the shifted items back to the source.
//The join will be applied only when the shifted time
//overlaps the original edge event.
//If the edge event has an end before the shifted event
//it won't be output to the sink as the join won't be applied.
var tooLong = from src in source
from shft in shifted
where src.Id == shft.Id
select shft;

//Output as the different event shapes.
//Note the relationship between the event times and the CTIs.
//Point is a good option.
tooLong.ToPointEnumerable().Dump("Timed Out Items - Point");
//Interval isn't a good option. Doesn't get output until the end.
tooLong.ToIntervalEnumerable().Dump("Timed Out Items - Interval");
//Edge is also a good option. You get the start and the end.
tooLong.ToEdgeEnumerable().Dump("Timed Out Items - Edge");

}




// Define other methods and classes here
class SourceItem:DataItem
{
public SourceItem(string id, int value, int startTimeOffset){
Id = id;
Value = value;
StartTimeOffset = startTimeOffset;
EndTimeOffset = null;
}
public SourceItem(string id, int value, int startTimeOffset, int endTimeOffset){
Id = id;
Value = value;
StartTimeOffset = startTimeOffset;
EndTimeOffset = endTimeOffset;
}
public EdgeEvent<DataItem> GetEvent(DateTimeOffset startTime){
if(!EndTimeOffset.HasValue){
return EdgeEvent<DataItem>.CreateStart(startTime.AddMinutes(StartTimeOffset),
this);
}
return EdgeEvent<DataItem>.CreateEnd(startTime.AddMinutes(StartTimeOffset),
startTime.AddMinutes(EndTimeOffset.Value),
this);
}
public int StartTimeOffset;
public int? EndTimeOffset;
}
class DataItem{
public string Id;
public int Value;
}

When looking at the output, note where the events sit between the CTIs … that shows, without question, that the event is released as soon as the application time moves past the relevant timeout. You do need to use Point events for this; if you use an Interval, the final event won’t be output until the end is reached, which would defeat the purpose of what we’re trying to do. You could use Edges as output; in this case, you’d get a start as soon as it “takes too long” and then an end when the end edge for the original item in enqueued into the stream. With this, you could then feed it into the subject from my previous post and determine how much longer it took than the timeout, should that be interesting to you.