Ruminations of J.net idle rants and ramblings of a code monkey

Cool StreamInsight query: turning alarms into edge events


One of the use cases for StreamInsight is detecting alarm conditions in streaming, real-time data. That in itself isn’t all that special … many of the process control systems (PCS) out there already do something like it. What makes StreamInsight different is that it can be a lot smarter about deciding when an alarm condition really exists. For example … one of the things that apparently happened with Macondo (aka Deepwater Horizon), as I understand it, is that the PCS alarms went off constantly: an alarm fired every single time any one value was out of range. There were so many false alerts that they simply turned the system off. That isn’t all that unreasonable … many of these sensors will temporarily go out of range without indicating a problem. A single out-of-range value could just be a random bad reading or a transient condition that isn’t really a cause for alarm at all. However, if you start to get multiple out-of-range values from the same sensor or device within a particular time period, then you may really have an issue.

You also don’t want to raise an alarm with every bad reading; instead, you want to raise one when the alarm starts and one when it finishes. An alarm has a definite start … but at the start, you have no idea when it will end. It also has a definite end … but you only know it when things come back to normal. That’s a perfect fit for an edge event.

Part of this can be found in the StreamInsight samples for LinqPad – a most righteously awesome tool that every StreamInsight query developer should have and use. It’s in the “Alarm Floods and Transients” sample under 101 StreamInsight Queries, section 2_QueryPatterns. I will start with that and describe the queries step-by-step.

First … our scenario. We have a data stream with events that have a “Status” field. If this field is 0, the reading is good; if it’s 1, it’s bad. In the real world, you’d arrive at this value somehow … through previous queries that do analytics, or perhaps directly from your data source. For our purposes, that’s irrelevant; we’re interested in what we do with it. In our case, we can get false alerts (of course), so we want to trigger an alarm only when we get multiple alerts within a specific time frame. We then want the alarm expressed as an edge event. Finally, if an alarm lasts longer than a specific amount of time, we want to repeat it. We’ll take several steps to do this. First, here’s our source data; I added a little to what was in the sample. (Notes: these queries are all set up to run in LinqPad. Also, we don’t differentiate the events by any sort of ID … you’ll need to do that in a real app.)

var sourceData = new []
{
    new { Status = 0, TimeStamp = DateTime.Parse("10/23/2009 4:10:00 PM") },
    new { Status = 0, TimeStamp = DateTime.Parse("10/23/2009 4:11:00 PM") },
    // False alert @ 4:12
    new { Status = 1, TimeStamp = DateTime.Parse("10/23/2009 4:12:00 PM") },
    new { Status = 0, TimeStamp = DateTime.Parse("10/23/2009 4:13:00 PM") },
    new { Status = 0, TimeStamp = DateTime.Parse("10/23/2009 4:14:00 PM") },
    // Real alert @ 4:15
    new { Status = 1, TimeStamp = DateTime.Parse("10/23/2009 4:15:00 PM") },
    new { Status = 1, TimeStamp = DateTime.Parse("10/23/2009 4:16:00 PM") },
    new { Status = 1, TimeStamp = DateTime.Parse("10/23/2009 4:18:00 PM") },
    new { Status = 0, TimeStamp = DateTime.Parse("10/23/2009 4:19:00 PM") },
    new { Status = 0, TimeStamp = DateTime.Parse("10/23/2009 4:20:00 PM") },
    // Real alert @ 4:21 - longer alert that repeats
    new { Status = 1, TimeStamp = DateTime.Parse("10/23/2009 4:21:00 PM") },
    new { Status = 1, TimeStamp = DateTime.Parse("10/23/2009 4:22:00 PM") },
    new { Status = 1, TimeStamp = DateTime.Parse("10/23/2009 4:23:00 PM") },
    new { Status = 1, TimeStamp = DateTime.Parse("10/23/2009 4:24:00 PM") },
    new { Status = 1, TimeStamp = DateTime.Parse("10/23/2009 4:25:00 PM") },
    new { Status = 1, TimeStamp = DateTime.Parse("10/23/2009 4:26:00 PM") },
    new { Status = 1, TimeStamp = DateTime.Parse("10/23/2009 4:27:00 PM") },
    new { Status = 0, TimeStamp = DateTime.Parse("10/23/2009 4:28:00 PM") },
    new { Status = 0, TimeStamp = DateTime.Parse("10/23/2009 4:29:00 PM") },
    // False alert @ 4:30
    new { Status = 1, TimeStamp = DateTime.Parse("10/23/2009 4:30:00 PM") },
    new { Status = 0, TimeStamp = DateTime.Parse("10/23/2009 4:31:00 PM") },
    new { Status = 0, TimeStamp = DateTime.Parse("10/23/2009 4:32:00 PM") },
    new { Status = 0, TimeStamp = DateTime.Parse("10/23/2009 4:33:00 PM") },
};

var source = sourceData.ToPointStream(Application, ev =>
    PointEvent.CreateInsert(ev.TimeStamp.ToLocalTime(), ev),
    AdvanceTimeSettings.StrictlyIncreasingStartTime);

// Window used to decide whether an alarm is transient
var timeout = TimeSpan.FromMinutes(2);
// Each initial alarm is extended to this length
var alarmTimeout = TimeSpan.FromMinutes(5);

Now that we have our source data and variables, let’s create two streams, one with valid items and one with all alerts:

var validData = source.Where(e => e.Status == 0);
var alarmEvents = source.Where(e => e.Status == 1);

Next, we need to remove the false alerts … alerts that are followed by a good reading within the timeout. We do this by taking the good events, moving each one’s start time back by the timeout and setting its duration to the timeout, so that each good event covers the timeout window just before it. If an alarm joins with one of these shifted events, a “good” event arrived within the timeout after it, so the alarm is transient. We filter those alarms out with a Left Anti-Semi Join (LASJ), which keeps only the alarms that have no match.
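To make the LASJ condition concrete, here is a rough sketch of the same filter in plain C# with LINQ to objects. It does not use StreamInsight at all; it assumes times are whole minutes after 4:00 PM, and the class and method names (`TransientFilterSketch`, `NonTransientAlarms`) are hypothetical. Each good reading at g is modeled as the half-open window [g - timeout, g), and an alarm inside any such window is treated as transient.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

// Hypothetical plain LINQ-to-objects model of the LASJ step; this is not the StreamInsight API.
// Times are whole minutes after 4:00 PM; Status 0 = good reading, 1 = alarm.
public static class TransientFilterSketch
{
    // The same readings as the LinqPad source data in this post.
    public static readonly (int Time, int Status)[] Readings =
    {
        (10, 0), (11, 0), (12, 1), (13, 0), (14, 0), (15, 1), (16, 1), (18, 1),
        (19, 0), (20, 0), (21, 1), (22, 1), (23, 1), (24, 1), (25, 1), (26, 1),
        (27, 1), (28, 0), (29, 0), (30, 1), (31, 0), (32, 0), (33, 0)
    };

    // A good reading at g is shifted to the window [g - timeout, g). An alarm at a is
    // transient if it falls inside any such window, i.e. a good reading arrives in (a, a + timeout].
    public static List<int> NonTransientAlarms(IEnumerable<(int Time, int Status)> readings, int timeout)
    {
        var all = readings.ToList();
        var goodTimes = all.Where(r => r.Status == 0).Select(r => r.Time).ToList();
        return all.Where(r => r.Status == 1)
                  // Anti-semi join: keep the alarm only when no shifted good window contains it.
                  .Where(a => !goodTimes.Any(g => g - timeout <= a.Time && a.Time < g))
                  .Select(a => a.Time)
                  .ToList();
    }
}
```

With the sample readings and a 2-minute timeout, `NonTransientAlarms(TransientFilterSketch.Readings, 2)` returns 15, 16, 21, 22, 23, 24 and 25 in this model. The alarms at 4:12, 4:18, 4:26, 4:27 and 4:30 each have a good reading no more than two minutes after them, so they are dropped.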

// Take all alarm events that are not followed by a non-alarm event
// within the timeout
var nonTransientAlarms = from alarm in alarmEvents
                         where (from nextevent in source
                                    .AlterEventLifetime(
                                        e => e.StartTime.Subtract(timeout),
                                        e => timeout)
                                where nextevent.Status == 0
                                select nextevent).IsEmpty()
                         select alarm;

// Show the non-transient alarms
(from p in nonTransientAlarms.ToIntervalEnumerable()
 where p.EventKind == EventKind.Insert
 select p).Dump("Non-transient alarms");

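The output shows all of the alarm events that do not have a “good” event within the timeout. You’ll notice that there is still a flood of events … we need to filter it so that we get only the initial alarm event. We do that by stretching each remaining alarm over the timeout and counting the alarms in each snapshot window. The snapshots with a count of 1 belong to the initial alarms, so we reduce those to points and join them with the original stream: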

// Expand all alarm events to the timeout and count over snapshots
var counts = from win in nonTransientAlarms
                 .Where(e => e.Status == 1)
                 // Assumes the same 2-minute timeout used for the transient filter
                 .AlterEventDuration(e => timeout)
                 .SnapshotWindow(SnapshotWindowOutputPolicy.Clip)
             select new { count = win.Count() };

// Those snapshots with a count of 1 belong to the initial alarms.
// Reduce to points and join with the original stream.
var initialAlarm = from c in counts
                       .Where(e => e.count == 1)
                       .ToPointEventStream()
                   from e in source
                   select e;
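The snapshot-count idea can also be sketched in plain C# (again a hypothetical LINQ-to-objects model with made-up names, not the StreamInsight API, with times in whole minutes after 4:00 PM). Each alarm is stretched to [t, t + duration). The snapshot boundaries are all the distinct start and end times, and a snapshot holding exactly one alarm yields its start time. That start time is then kept only if a reading occurs at the same instant, mimicking the point-to-point join.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

// Hypothetical plain LINQ-to-objects model of SnapshotWindow + Count(); not the StreamInsight API.
// Times are whole minutes after 4:00 PM; an alarm at t stretched by 'duration' covers [t, t + duration).
public static class SnapshotCountSketch
{
    // Start times of the snapshots that contain exactly one stretched alarm.
    public static List<int> SingleAlarmSnapshotStarts(IEnumerable<int> alarmTimes, int duration)
    {
        var intervals = alarmTimes.Select(t => (Start: t, End: t + duration)).ToList();
        // Snapshot boundaries are all distinct start and end times, in order.
        var bounds = intervals.SelectMany(i => new[] { i.Start, i.End })
                              .Distinct()
                              .OrderBy(b => b)
                              .ToList();
        var starts = new List<int>();
        for (int k = 0; k + 1 < bounds.Count; k++)
        {
            int lo = bounds[k], hi = bounds[k + 1];
            // No boundary lies strictly inside [lo, hi), so an interval covers all of it or none of it.
            int count = intervals.Count(i => i.Start <= lo && i.End >= hi);
            if (count == 1) starts.Add(lo);
        }
        return starts;
    }

    // Mirrors the point-to-point join: a snapshot start survives only if a reading occurs at that instant.
    public static List<int> InitialAlarms(IEnumerable<int> snapshotStarts, IEnumerable<int> readingTimes)
    {
        var readings = new HashSet<int>(readingTimes);
        return snapshotStarts.Where(s => readings.Contains(s)).ToList();
    }
}
```

Assuming the non-transient alarms are at 15, 16, 21, 22, 23, 24 and 25 with a 2-minute stretch, this model finds single-alarm snapshots starting at 15, 17, 21 and 26. Joining with the sample reading times drops 17 (there is no reading at 4:17), leaving 15, 21 and 26. The 26 comes from the tail of the 4:21 flood, where only the 4:25 alarm is still active.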

So far, we haven’t done much that isn’t in the original sample. Now we go off that path and turn these into edge events that represent the start and end of the actual alarm. We start by turning the valid data into a signal: each valid event is extended indefinitely and then clipped at the next initial alarm. We also extend our initial alarms to the alarm timeout. From there, another LASJ with the valid data signal gives us edge events that start and end with the alarm. Because we have one alarm that lasts longer than the alarm timeout, that alarm generates two edge events; together, these edge events cover the entire alarm time frame:

var validDataSignal = from v in validData
                          .AlterEventDuration(e => TimeSpan.MaxValue)
                          .ClipEventDuration(initialAlarm, (e1, e2) => true)
                      select v;

// Since we have the initial alarms, how long does the alarm last?
var alarmTimeline = from i in initialAlarm.AlterEventDuration(e => alarmTimeout)
                    where (from v in validDataSignal select v).IsEmpty()
                    select i;

(from p in alarmTimeline.ToEdgeEnumerable()
 where p.EventKind == EventKind.Insert
 select p).Dump("Alarm timeframe");
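Here is a rough plain-C# model of this last step. The names are hypothetical, it is not the StreamInsight API, and times are whole minutes after 4:00 PM. Each good reading becomes a signal that lasts until the next initial alarm after it, and each initial alarm is stretched to the alarm timeout. The anti-semi join then keeps the parts of each alarm window that no signal covers. Each resulting (start, end) pair stands for the start and end edge of one edge event.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

// Hypothetical plain LINQ-to-objects model of the final step; this is not the StreamInsight API.
// Times are whole minutes after 4:00 PM; int.MaxValue stands in for the open-ended
// TimeSpan.MaxValue duration used to build the signal.
public static class AlarmTimelineSketch
{
    // Signal: each good reading lasts until the next initial alarm strictly after it (or forever),
    // approximating AlterEventDuration(TimeSpan.MaxValue) followed by ClipEventDuration.
    public static List<(int Start, int End)> ValidSignal(IEnumerable<int> validTimes, IEnumerable<int> initialAlarms)
    {
        var alarms = initialAlarms.OrderBy(a => a).ToList();
        return validTimes
            .Select(v => (Start: v, End: alarms.Where(a => a > v).DefaultIfEmpty(int.MaxValue).First()))
            .ToList();
    }

    // Each initial alarm covers [a, a + alarmTimeout); the anti-semi join keeps only the
    // parts of that window not covered by the signal. Each pair is one edge event (start, end).
    public static List<(int Start, int End)> AlarmTimeline(
        IEnumerable<int> initialAlarms, IEnumerable<int> validTimes, int alarmTimeout)
    {
        var alarmList = initialAlarms.OrderBy(a => a).ToList();
        var signal = ValidSignal(validTimes, alarmList);
        var result = new List<(int Start, int End)>();
        foreach (int start in alarmList)
        {
            int end = start + alarmTimeout;
            // Points inside the alarm window where signal coverage can change.
            var bounds = signal.SelectMany(s => new[] { s.Start, s.End })
                               .Where(b => b > start && b < end)
                               .Concat(new[] { start, end })
                               .Distinct()
                               .OrderBy(b => b)
                               .ToList();
            var segments = new List<(int Start, int End)>();
            for (int k = 0; k + 1 < bounds.Count; k++)
            {
                int lo = bounds[k], hi = bounds[k + 1];
                if (signal.Any(s => s.Start <= lo && s.End >= hi)) continue; // covered by good data
                // Merge with the previous uncovered piece of the same alarm when they touch.
                if (segments.Count > 0 && segments[segments.Count - 1].End == lo)
                    segments[segments.Count - 1] = (segments[segments.Count - 1].Start, hi);
                else
                    segments.Add((lo, hi));
            }
            result.AddRange(segments);
        }
        return result;
    }
}
```

Assume the initial alarms come out at 4:15, 4:21 and 4:26. Fed those, the good readings from the sample and a 5-minute alarm timeout, this model returns (15, 19), (21, 26) and (26, 28). That is one window for the first alarm, plus two back-to-back windows that together cover the longer alarm from 4:21 until the good reading at 4:28.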

And that should do it …