Ruminations of J.net: idle rants and ramblings of a code monkey

Where is my PID for StreamInsight????

StreamInsight
This is one of the things that I’ve found a little frustrating when installing StreamInsight … entering the PID (product ID). You see, I have an MSDN subscription (every developer should!) and, therefore, I have versions of all of the Sql Server editions … including Sql Server Developer Edition, which includes StreamInsight Premium (though for development purposes only … you’re not allowed to put it in production). On some products on MSDN, you can request the product key and it’s right there on your subscription page to copy and paste/retype … but not for any of the Sql Server DVDs! The MSDN Sql Server versions are “pre-PID’d” … the product ID is already in them. While this makes it easier to install the full Sql Server engine, when you need to enter the product key for StreamInsight you have to go digging to find it. Since I’m a developer and, therefore, lazy, I put this off and usually just choose the evaluation edition … only to get kerfluffled when the eval expires. So … for your reference (and mine) … here’s where you can find your PID:

1. Open the Sql Server DVD in Windows Explorer.
2. Navigate to the folder for your favorite architecture (x86 or x64). It doesn’t matter which one … both PIDs are the same.
3. Find a file called “DefaultSetup.ini” and open it in Notepad.
4. Right there at the top is your PID. Copy and paste it into the StreamInsight setup.
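For reference, DefaultSetup.ini is a plain INI file. The exact section header varies by Sql Server version and edition, so treat the snippet below as an illustration only (the key is obviously a placeholder):

; DefaultSetup.ini (illustrative only - the section name differs by version/edition)
[SQLSERVER2008R2]
PID="XXXXX-XXXXX-XXXXX-XXXXX-XXXXX"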

Query Logic and Operator Reuse in StreamInsight (DQC)

StreamInsight | Code Sample
Way back in 2010, Mark Simms posted a blog entry called StreamInsight: Understanding dynamic query composition where he detailed how you can use dynamic query composition (DQC) to prevent multiple instantiation of your adapters. And yes, it is important … very important … for this reason but there is more to the story here that Mark didn’t cover. Let’s start, however, with a set of requirements for our StreamInsight queries. First, we need to have a data source. For our example, we’ll be using a random number generator but that’s really irrelevant … the data source just gets it started. Next, we need to calculate a 10-second rolling average every 2 seconds. This is easy enough … that’s a hopping window. The last step is to take the results of our hopping window aggregate and calculate the difference between the averages with every hop … providing a rate-of-change for the calculated average. Again, this is a pretty common, well-known pattern that’s in the StreamInsight samples for LinqPad called “FoldPairs” (Alter/Clip/Shift). Each one of these steps needs to be a query since we need to send each one to an output adapter. So let’s get started. We’ve read Mark’s article and taken it to heart, so we’ll use DQC with the first, source query to make sure that there is only one instance of the input adapter.

var initialStream = CepStream<GeneratedEvent>.Create(cepApplication,
    "generatedStream", typeof(GeneratorFactory), config, EventShape.Point);

Query initialQuery = CreateAndStartQuery<GeneratedEvent>(
    initialStream,
    "initialQuery",
    "Initial query from generator",
    EventShape.Point,
    csvConfig,
    cepApplication);

//Taking this query and converting to a stream allows us to build on it further.
//This is called Dynamic Query Composition (DQC)
var sourceStream = initialQuery.ToStream<GeneratedEvent>("sourceStream");

Our next step is to create the hopping window for our aggregate, using sourceStream as the basis.

var hoppingWindowStream = from s in sourceStream.AlterEventDuration(e => TimeSpan.FromTicks(1))
                          group s by s.DeviceId into aggregateGroup
                          from item in aggregateGroup.HoppingWindow(
                              TimeSpan.FromSeconds(10),
                              TimeSpan.FromSeconds(2),
                              HoppingWindowOutputPolicy.PointAlignToWindowEnd)
                          select new AggregateItem
                          {
                              DeviceId = aggregateGroup.Key,
                              Average = item.Avg(e => e.Value),
                              Count = item.Count(),
                              Sum = item.Sum(e => e.Value)
                          };

//Create the query and send to output adapter.
Query hoppingWindowQuery = CreateAndStartQuery<AggregateItem>(
    hoppingWindowStream,
    "hoppingWindowAggregate",
    "Hopping aggregate query from generator",
    EventShape.Point,
    csvConfig,
    cepApplication);

Now we need to calculate the rate-of-change for the hopping aggregate. Since we’ve already got the hoppingWindowStream, we can just use that.
var aggregateDifferenceSourceStream = hoppingWindowStream;
var aggregateDifference = from current in aggregateDifferenceSourceStream
                          from previous in aggregateDifferenceSourceStream
                              .AlterEventDuration(e => TimeSpan.MaxValue)
                              .ClipEventDuration(aggregateDifferenceSourceStream, (e1, e2) => e1.DeviceId == e2.DeviceId)
                              .ShiftEventTime(e => TimeSpan.FromTicks(1))
                          where current.DeviceId == previous.DeviceId
                          select new ItemDifference()
                          {
                              CurrentValue = current.Average,
                              DeviceId = current.DeviceId,
                              PreviousValue = previous.Average,
                              Difference = current.Average - previous.Average
                          };

Query aggregateDifferenceQuery = CreateAndStartQuery(aggregateDifference,
    "AggregateDifference",
    "Difference between two aggregate values",
    EventShape.Point,
    csvConfig,
    cepApplication);

When we run this, we get the results that we expect. Pretty straightforward, right? Well … not exactly. Let’s take a look at the queries in the event flow debugger. First, we see the hopping window stream and the grouping and aggregating operators. It’s pretty simple and straightforward. Now, let’s take a look at the query for the rate-of-change in the aggregates. It’s a bit longer and, if you look at the area that’s highlighted in red, it has the exact same operators that we see in the aggregate query. So … the operators that did the group and aggregate are actually repeated for both queries! What’s going on here? What you need to understand is that the operator tree for a StreamInsight query isn’t built until you call ToQuery() … and the entire operator tree is built! If your tree goes all the way back to the source, you’ll get two instances of the same input adapter, as Mark described in his blog. But you’ll also get any other operators repeated. In our little sample here, it’s really not a big deal but in a large application, this can lead to some pretty substantial overhead. Using DQC, you can reduce this overhead and reuse the results of operators without having them rebuilt in the entire tree. When you get to a result that is then reused with different streams, you can create a query from it, convert the query back to a stream with ToStream() and then write your additional Linq expressions. You do not need to have an output adapter with every query either … that is actually optional. Here’s how the code for the aggregate difference query would look using DQC:

var aggregateDifferenceSourceStream = hoppingWindowQuery.ToStream<AggregateItem>();
var aggregateDifference = from current in aggregateDifferenceSourceStream
                          from previous in aggregateDifferenceSourceStream
                              .AlterEventDuration(e => TimeSpan.MaxValue)
                              .ClipEventDuration(aggregateDifferenceSourceStream, (e1, e2) => e1.DeviceId == e2.DeviceId)
                              .ShiftEventTime(e => TimeSpan.FromTicks(1))
                          where current.DeviceId == previous.DeviceId
                          select new ItemDifference()
                          {
                              CurrentValue = current.Average,
                              DeviceId = current.DeviceId,
                              PreviousValue = previous.Average,
                              Difference = current.Average - previous.Average
                          };

Query aggregateDifferenceQuery = CreateAndStartQuery(aggregateDifference,
    "AggregateDifference",
    "Difference between two aggregate values",
    EventShape.Point,
    csvConfig,
    cepApplication);

Note that the only difference here (in bold) is that we convert the hoppingWindowQuery back into a stream and use that for our source. The difference in the tree of query operators is telling, though. I’ve outlined in red where the aggregate query source is now imported from the published query with the results of the aggregates.
The operators that are unique to this query – those that calculate the rate of change in the rolling average – are still here. The operators that are already necessary for a previous query, however, are not. This is a very simple example, to be sure, and any benefits probably wouldn’t be noticeable except at a very high volume. However, when you have more complex queries (as you would in the real world), the difference can be huge. There is, however, one pretty big limitation that you need to be aware of when using DQC … you can’t use checkpoints (high availability) when your source is from a published query (DQC). So if you need this, then you need to very carefully plan how and where you use DQC, how you get your source data and how you compose your queries.

Bug in StreamInsight 1.2 …

StreamInsight
This came up recently on the forums and I’ve been meaning to blog about it and, finally, I’m doing it. Before I get going, let me just say the following things:

- There are a couple of ways to work around the bug.
- It is fixed in StreamInsight 2.0.
- It does not apply to StreamInsight 1.1 … only 1.2.
- The scenario that triggers the bug is, I feel, a pretty narrow and uncommon one.

Symptoms

Since this happens in the Simple StreamInsight App that I posted on MSDN Samples, that’s what we’ll use. In certain cases, a StreamInsight query will crash on the initial event. You will be able to stop and restart the (now aborted) query from the Query Debugger or any custom tools that use the StreamInsight API to restart queries and it will run fine. This only happens on the initial events. The query debugger shows that the exception for the aborted query is:

Microsoft.ComplexEventProcessing.Engine.OperatorExecutionException: An exception happened when operator 'Grouping.1.1.Aggregate.1.1' was processing event, check inner exception for more details. ---> System.ArgumentOutOfRangeException: The added or subtracted value results in an un-representable DateTime.
Parameter name: value
   at System.DateTime.AddTicks(Int64 value)
   at Microsoft.ComplexEventProcessing.Engine.DateTimeExtensions.AddNoOverflow(DateTime dateTime, TimeSpan timeSpan)
   at Microsoft.ComplexEventProcessing.Engine.DateTimeExtensions.SubtractThrowIfOverflow(DateTime dateTime, TimeSpan timeSpan)
   at Microsoft.ComplexEventProcessing.Engine.SynopsisManager.SingleHoppingWindowManager.PreviousWindowVs(DateTime timestamp)
   at Microsoft.ComplexEventProcessing.Engine.SynopsisManager.SingleHoppingWindowManager.OverrideCtiPushBackward(DateTime ctiTimestamp)
   at Microsoft.ComplexEventProcessing.Engine.ExecutionOperatorWindowBasedOrderPreservingMinus.ProcessSmartCti(EventReference& eventReference, Int64 stimulusTicks)
   at Microsoft.ComplexEventProcessing.Engine.ExecutionOperatorStateful.DoProcessEvent(EventReference& eventReference, Int64 stimulusTicks)
   at Microsoft.ComplexEventProcessing.Engine.QueryExecutionOperator.ProcessEvent(Int32 streamNo, EventReference& eventReference, Int64 stimulusTicks, Int64 enqueueSequenceNumber)
   --- End of inner exception stack trace ---
   at Microsoft.ComplexEventProcessing.Diagnostics.Exceptions.Throw(Exception exception)
   at Microsoft.ComplexEventProcessing.Engine.QueryExecutionOperator.ProcessEvent(Int32 streamNo, EventReference& eventReference, Int64 stimulusTicks, Int64 enqueueSequenceNumber)
   at Microsoft.ComplexEventProcessing.Engine.ExecutionOperatorStateful.ProcessEvent(Int32 streamNo, EventReference& eventReference, Int64 stimulusTicks, Int64 enqueueSequenceNumber)
   at Microsoft.ComplexEventProcessing.Engine.QuerySegmentInputStrategy.DispatchEvents(SchedulingPolicy policy)
   at Microsoft.ComplexEventProcessing.Engine.SchedulingPolicy.DispatchEvents()
   at Microsoft.ComplexEventProcessing.Engine.DataflowTask.OnRun()
   at Microsoft.ComplexEventProcessing.StreamOS.Task.Run()
   at Microsoft.ComplexEventProcessing.StreamOS.Scheduler.Main()

The most interesting – and, for me, most perplexing – part is “The added or subtracted value results in an un-representable DateTime”. Huh? How is that happening? Since this happens right at startup, we don’t have time to connect to the query and record the events as it starts, so we need to use trace.cmd (see here, about halfway down) to set up the query trace when the application starts.
Once we do that and open the trace for the query, we see that the only event in the query is a CTI with a time of negative infinity … or DateTime.MinValue.

Conditions

This will only happen when you are using IDeclareAdvanceTimeProperties in your adapter factory or AdvanceTimeSettings when you create the stream, and specify that CTIs are created by event count, not by timespan. The delay and the AdvanceTimePolicy don’t seem to make any difference. Next, the query needs to have a HoppingWindow and no other query operators that alter the event lifetime or duration. Tumbling and count-by-start-time windows don’t have any problems.

Fixes/Workarounds

There are a few ways that you can work around this. Any one of the following methods will work.

1. Specify that CTIs are created by timespan rather than event count.

2. Add a temporal operator into the stream before the window. For example, adding AlterEventDuration(e => TimeSpan.FromSeconds(0)) will work:

var hoppingWindowStream = from s in sourceStream
                              .AlterEventDuration(e => TimeSpan.FromSeconds(0))
                          group s by s.DeviceId into aggregateGroup
                          from item in aggregateGroup.HoppingWindow(
                              TimeSpan.FromSeconds(10),
                              TimeSpan.FromSeconds(2),
                              HoppingWindowOutputPolicy.ClipToWindowEnd)
                          select new AggregateItem
                          {
                              DeviceId = aggregateGroup.Key,
                              Average = item.Avg(e => e.Value),
                              Count = item.Count(),
                              Sum = item.Sum(e => e.Value)
                          };

3. Enqueue a CTI from your adapter (see the sketch after this list). This can have any date/time for the value, as long as it is greater than DateTimeOffset.MinValue + [WindowHopSize]. You can, for example, use new DateTimeOffset(1776, 7, 4, 0, 0, 0, TimeSpan.FromTicks(0)).

4. Upgrade to StreamInsight 2.0 now that it’s released.
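To make the third workaround a little more concrete, here’s a minimal sketch of seeding that CTI from inside a typed point input adapter. EnqueueCtiEvent is the standard adapter method for manually enqueueing a CTI; exactly where you call it in your enqueue loop is up to you, and the 1776 date is just the example value from above:

// Inside your input adapter's enqueue loop, before the first Insert event goes in.
// Any timestamp safely later than DateTimeOffset.MinValue + the window hop size will do.
DateTimeOffset seedCti = new DateTimeOffset(1776, 7, 4, 0, 0, 0, TimeSpan.Zero);

if (EnqueueCtiEvent(seedCti) == EnqueueOperationResult.Full)
{
    // The engine is suspending the adapter; signal Ready() and wait for Resume() as usual.
    Ready();
    return;
}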

StreamInsight Output Adapter Lifetime

Code Sample | StreamInsight
[Download Sample Code]

I know it’s been a while since I’ve posted … things have been pretty busy here at the office with a major POC that we were doing. Since that’s just about wrapped up, I’m going to get back to blogging. My goal is to have a new post every week. Most, of course, will be on StreamInsight but I’ve got some other things that I’m thinking about as well. For this post, I’ll be talking about the lifetime of a StreamInsight output adapter. There is some documentation on this on MSDN but it doesn’t have all of the details … and the samples provided in the StreamInsight product team samples don’t always work the way you would want in a real production application. I will detail why that’s the case and what you can do to make your adapters more robust.

Adapters … both input and output … are probably the most difficult thing to get exactly right in StreamInsight. This is due to the heavily multithreaded and asynchronous model of StreamInsight – and that’s something that developers have always struggled to get right. Humans have a hard time really visualizing the code paths in multithreaded applications; our brains just don’t work that way. Back in the days when most processors were single-threaded, it wasn’t quite as difficult to do synchronization properly. In those days, we might spawn multiple threads but only one thread at a time was scheduled to run on the CPU … they weren’t truly concurrent. However, first with hyperthreading and now with multi-core processors (not to mention hyperthreaded multicore processors), multiple threads may very well be executing at the exact same instant. Of course, the exact timing is non-deterministic, which makes it that much more challenging to debug. They are Heisenbugs.

There are two major symptoms that indicate you’ve gotten your adapter lifetime wrong: you’ll either get random ObjectDisposedExceptions when shutting down or shutdown of StreamInsight will hang for a while. We will talk about why this happens. We’ll also be using the sample Sql Output Adapter from the product team samples since it does show some less-than-friendly shutdown behavior. By the way, the testing was done with the Sql Server output adapter from the product team samples on CodePlex but using the Simple StreamInsight app sample on MSDN code samples.

First, the easy stuff …

Startup

Your output adapter is started when the query that it is bound to is started. In fact, your adapter is created when the query is started … not when ToQuery() is called. Also, keep in mind that adapter startup is asynchronous … it occurs on a separate thread from the one that called Query::Start(). Still, it is very important that you limit your constructor and Start methods to the bare minimum amount of code that you need there and then spawn a new thread to do your actual dequeue processing. If, for some insane reason, you immediately go into a while(true) { } loop in your Start, Query::Start will return but the query will, for all intents and purposes, be hung in limbo. In fact, I just tried this by putting an infinite loop with a sleep inside an adapter’s Start method and the query’s Start method still returned. However, the StreamInsight management service – and, therefore, the debugger – will hang. Below is a little table from some logging code that shows the order of events on query and output adapter startup and includes the managed thread id.
Source | Query | ThreadId | Time | Message
MAIN | initialQuery | 9 | 1418 | Creating query
MAIN | initialQuery | 9 | 848464 | Created query
MAIN | initialQuery | 9 | 905283 | Starting query
SqlOutputAdapterFactory | initialQuery | 18 | 1944604 | Creating adapter
SqlOutputPoint | initialQuery | 18 | 1961683 | Constructor called
SqlOutputPoint | initialQuery | 18 | 1972490 | Constructor exit
SqlOutputAdapterFactory | initialQuery | 18 | 1980381 | Created adapter
SqlOutputPoint | initialQuery | 21 | 2134443 | Start called
SqlOutputAdapter | initialQuery | 21 | 2149574 | Start Enter.
MAIN | initialQuery | 9 | 2158461 | Started query
MAIN | hoppingWindowAggregate | 9 | 2235857 | Creating query
SqlOutputAdapter | initialQuery | 21 | 2536294 | ConsumeEvents Enter
SqlOutputAdapter | initialQuery | 21 | 2553473 | ConsumeEvent Exit - Queue is empty. Items dequeued: 0 - CTIs dequeued: 0
SqlOutputAdapter | initialQuery | 21 | 2558886 | Start Exit
SqlOutputPoint | initialQuery | 21 | 2567542 | Start exit

Note that we have the “Started query” message while the adapter is still in the Start method and we even get to creating the next query before Start actually exits. It’s also worthwhile to note the number of different threads being used in the process. The factory’s Create is called on a separate thread and then the adapter’s Start is called on still a different thread. The call to consume events is synchronous in the Start … though I’m not sure if that’s actually necessary to do. Later calls to consume events are on different threads.

Dequeuing Events

This is the core of your output adapter … after all, if you don’t dequeue your events, there’s no purpose to having an output adapter, is there? So your code may look something like the following (taken from the Sql Output Adapter sample):

/// <summary>
/// Dequeues each event from StreamInsight and writes it as a row into the SQL sink
/// </summary>
private void ConsumeEvents()
{
    TEvent currentEvent = default(TEvent);
    while (true)
    {
        try
        {
            // if the engine asked the adapter to stop
            if (this.outputAdapter.AdapterState == AdapterState.Stopping)
            {
                // clean up state
                this.Cleanup();

                // inform the engine that the adapter has stopped
                this.outputAdapter.Stopped();

                // and exit worker thread
                return;
            }

            // Dequeue the event
            DequeueOperationResult result = this.outputAdapter.Dequeue(out currentEvent);

            // if the engine does not have any events, the adapter is Suspended; so do this ..
            if (result == DequeueOperationResult.Empty)
            {
                // inform the engine that adapter is ready to be resumed
                this.outputAdapter.Ready();

                // exit the worker thread
                return;
            }

            // write out event to output table
            this.CreateRowFromEvent(currentEvent);
        }
        finally
        {
            // IMPORTANT: Release the event always
            if (currentEvent != null)
            {
                this.outputAdapter.ReleaseEvent(ref currentEvent);
            }
        }
    }
}

Events will come in “bunches” based on your CTIs; in fact, events are only “released” to the output adapter when there is a CTI. Exactly how this works varies by the event shape. For point events, if the start time is within the last CTI span (the time span between CTIs), the event will be released to the output adapter. For an interval, the event is released to the output adapter only when the end time is in the last CTI span. (This creates some pretty serious complications if you want to have intervals go from StreamInsight to StreamInsight as intervals. In fact, it’s pretty much impossible to do it correctly.) For an edge, you get two events in the output adapter: the start edge is released when the start time is within the last CTI span and the end edge is released when the end time is within the last CTI span.
It is important to note that the events are not released one at a time as they pass through the engine, but only when there is a CTI. From a functionality perspective, the CTI becomes a nice marker for doing batching to your output destination. In fact, in the Sql adapter that we have in our framework, we batch the events and then, on CTI, do a bulk insert into Sql Server of all the events from that CTI span rather than doing inserts one-at-a-time like the sample does.

When you get DequeueOperationResult.Empty as your result, it’s time for your adapter to take a little nap … there is nothing left for it to do. At this point, you should exit whatever processing method you have … so, return directly from ConsumeEvents, as you see in the adapter code above. Additional attempts to dequeue will be exercises in futility … there’s nothing there for you to dequeue. Before you exit your ConsumeEvents method, you need to make sure that you call Adapter::Ready to let the engine know that you are taking a break but are ready for more action when it is. This is also a good time to let go of any expensive resources that you may be holding, or at least configure a timer to let go of them if you aren’t resumed within a period of time. That’s where Resume comes into play. Resume is StreamInsight’s method to “wake up” your output adapter from its little snooze and start dequeuing events again because there are events there for you to dequeue. If you look at the code sample above, you’ll see that Ready() is called right after the result of the dequeue operation is “Empty”. You must do that. If you don’t call Ready() after an empty dequeue, StreamInsight will never call Resume() for you to resume dequeueing events.

You may also find yourself in a situation where you never get Empty as the result of a dequeue operation and, because of that, you don’t actually need to worry about Resume(). But don’t be thinking that you’ve gotten off scot-free here. If you never empty your queue, it means that your output adapter isn’t keeping up with the events coming in from the query, which is a completely different issue in and of itself. You want to get to a point … sometime before the end of the application … where you have an empty queue and need to be resumed because, if that’s happening, you are keeping up with the events that are coming through from the StreamInsight engine. You will see this happening if you enable performance counters for the query and look at the “# Events in output queue” counter. This same counter is also at the application level and provides the total number of events in the output queues, regardless of whether the queries are instrumented for performance counters or not. The table below shows some of the logging events when I overloaded the output adapter with just too many events for it to keep up with:

Source | Query | ThreadId | Stopwatch | Message
SqlOutputPoint | initialQuery | 20 | 5284643 | Resume called
SqlOutputAdapter | initialQuery | 20 | 5294055 | Resume called.
SqlOutputAdapter | initialQuery | 20 | 5302828 | ConsumeEvents Enter
SqlOutputAdapter | initialQuery | 20 | 21442634 | Still consuming events - 496 Inserts and 4 CTIs.
SqlOutputAdapter | initialQuery | 20 | 36501939 | Still consuming events - 991 Inserts and 9 CTIs.
SqlOutputAdapter | initialQuery | 20 | 51355765 | Still consuming events - 1486 Inserts and 14 CTIs.
SqlOutputAdapter | initialQuery | 20 | 66693981 | Still consuming events - 1981 Inserts and 19 CTIs.
SqlOutputAdapter | initialQuery | 20 | 83590260 | Still consuming events - 2476 Inserts and 24 CTIs.
SqlOutputAdapter | initialQuery | 20 | 99175549 | Still consuming events - 2971 Inserts and 29 CTIs.
MAIN | initialQuery | 9 | 109556687 | Stopping query
SqlOutputAdapter | initialQuery | 20 | 113608268 | Still consuming events - 3466 Inserts and 34 CTIs.
SqlOutputAdapter | initialQuery | 20 | 116769647 | ConsumeEvent Exit - Queue is empty. Items dequeued: 3600 - CTIs dequeued: 36
SqlOutputPoint | initialQuery | 29 | 116769669 | Resume called
SqlOutputAdapter | initialQuery | 29 | 116817259 | Resume called.
SqlOutputAdapter | initialQuery | 29 | 116822197 | ConsumeEvents Enter
SqlOutputAdapter | initialQuery | 29 | 116827716 | Cleanup enter
SqlOutputPoint | initialQuery | 31 | 116770289 | Stop called

One thing that you may notice here is that the query started stopping while events were still being dequeued. In fact, when the query is stopped, the input adapter is immediately stopped but the output adapter can continue to run until such time as it is able to completely empty its queue of events. Also, note that the output adapter’s Stop isn’t called until its queue is empty. This can make it appear that the application is hanging on shutdown when, in fact, it’s simply trying to finish dequeuing the events. And no, this isn’t the reason for the hanging shutdown symptom that I mentioned above … that’s a completely different animal.

Shutdown

Now this is where it gets really tricky. In fact, I would say that this is probably one of the toughest parts of StreamInsight development to get exactly right. First, you have two ways of knowing that it’s time to shut down … you have your AdapterState (Stopping) and you have your Stop method. I’ve dug around in this stuff a bit in Reflector and, from what I could gather, there is no guarantee that the adapter state will be set to Stopping before the call to Adapter::Stop. From what I could see, the internal operation to set the property and the internal operation to call Adapter::Stop run on different threads. While I’ve never seen this to be the case, I’ve also not looked for it all that closely either. But … don’t rely on your adapter state being Stopping when your Stop method is called. And, if it’s not, it may well change before you actually leave your Stop method. Now, let’s take a look at a couple of the log writes from shutdown time:

Source | Query | ThreadId | Stopwatch | Message
SqlOutputPoint | hoppingWindowAggregate | 30 | 55414039 | Calling Stopped
SqlOutputAdapter | hoppingWindowAggregate | 28 | 55363008 | ConsumeEvents Enter
SqlOutputAdapter | hoppingWindowAggregate | 28 | 55424376 | Cleanup enter
SqlOutputAdapter | hoppingWindowAggregate | 28 | 55429802 | Cleanup exit
SqlOutputAdapter | hoppingWindowAggregate | 28 | 55435118 | ConsumeEvents Exit - Stopping
SqlOutputPoint | hoppingWindowAggregate | 30 | 55441926 | Dispose called - disposing = True
SqlOutputAdapter | hoppingWindowAggregate | 30 | 55446727 | Dispose called
SqlOutputAdapter | hoppingWindowAggregate | 30 | 55451689 | Cleanup enter
SqlOutputPoint | hoppingWindowAggregate | 30 | 55459592 | Stopped Called
SqlOutputAdapter | hoppingWindowAggregate | 28 | 66790307 | ConsumeEvent Exit - Exception in ConsumeEvent -> ObjectDisposedException: Cannot access a disposed object.
MAIN | snapshotWindowQuery | 9 | 71025431 | Stopping query
SqlOutputAdapter | initialQuery | 27 | 71197933 | ConsumeEvents Exit - Stopping
SqlOutputPoint | initialQuery | 40 | 71274224 | Dispose called - disposing = True
SqlOutputAdapter | initialQuery | 40 | 71367034 | Dispose called
SqlOutputAdapter | initialQuery | 40 | 71372717 | Cleanup enter
SqlOutputPoint | initialQuery | 40 | 71378355 | Stopped Called
SqlOutputAdapter | initialQuery | 27 | 76953367 | ConsumeEvent Exit - Exception in ConsumeEvent -> ObjectDisposedException: Cannot access a disposed object.
SqlOutputAdapter | snapshotWindowQuery | 28 | 81816540 | ConsumeEvents Exit - Stopping
SqlOutputPoint | snapshotWindowQuery | 28 | 81823836 | Dispose called - disposing = True
SqlOutputPoint | snapshotWindowQuery | 42 | 81742317 | Stop called
SqlOutputAdapter | snapshotWindowQuery | 42 | 81836699 | Cleanup enter
SqlOutputPoint | snapshotWindowQuery | 42 | 81843947 | Calling Stopped
SqlOutputAdapter | snapshotWindowQuery | 28 | 81828773 | Dispose called
SqlOutputAdapter | snapshotWindowQuery | 28 | 81856597 | Cleanup enter
SqlOutputPoint | snapshotWindowQuery | 42 | 86922959 | Exception in Stop -> ObjectDisposedException: Cannot access a disposed object.

I’ve highlighted the exceptions from all three instances of the adapter. They are all the same type of exception – ObjectDisposedException – but two of them come from ConsumeEvent and one comes from Stop. I’ve done runs of this demo/sample several times and there is no consistency on which method is going to throw the ObjectDisposedException. I can say, with certainty, that just about every time I’ve run this adapter, I’ve gotten that same ObjectDisposedException from either ConsumeEvent or Stop. What’s happening is that Stopped() is being called twice on the adapter and, since calling Stopped() calls your IDisposable::Dispose method, the second time gives you an ObjectDisposedException. In order to prevent this, you need to make sure that Stopped() is only called once. The challenge here is that Stop() and ConsumeEvents() are always going to be on different threads. So, you may say, just call Stopped() in the Stop method and not in the ConsumeEvents method. While that would ensure that Stopped() is only called once, you may still be inside ConsumeEvents() – which would cause another ObjectDisposedException should you access anything on the adapter base class, like Dequeue. OK, you say … let’s only call Stopped() from our ConsumeEvents(). Well … your issue here may be that, if you aren’t getting any events in the pipeline and your adapter is paused waiting for a call to Resume(), Stopped() will never get called. And if Stopped() never gets called, your shutdown will hang while StreamInsight waits for your adapter to call Stopped(), signalling that it has completed everything necessary to clean up resources. Eventually … I think after 3 minutes … StreamInsight will terminate the adapter with extreme prejudice. And that’s the other symptom of getting your adapter lifetime wrong.

So … let’s review what the lifetime order-of-events is for shutdown. First, Stop will be called and the adapter’s state will be set to Stopping, though both won’t necessarily happen at the same time. This provides the adapter with an opportunity to shut down any open resources and do any other cleanup that may be necessary. When you are done with all of this and ready to be completely shut down, you must call Stopped() to notify the engine that you are all done cleaning up after yourself. So how do we fix this, now that we understand the problem? First, we need to make sure that Stopped() is only called once.
We are guaranteed that Stop() will be called when all is done, so that’s where we’ll put the single call. Since Stop() is only called after the queue is empty and the query is stopped, that’s probably all that we need to do. But I like to put an extra layer of protection in place to make sure that we aren’t in the middle of a dequeue process when Stop() calls Stopped(). In order to do this, we’ll need to use some sort of thread synchronization mechanism. In this case, we’ll create a lock object that both methods use and then utilize C#’s lock block to make sure that only one of those methods will run at a time (remember … they are called on separate threads). This lock, on the output adapter, probably isn’t necessary but, like I said, it’s that extra level of protection against Heisenbugs. Here’s what our log looks like for shutdown of one of the adapters now:

Source | Query | ThreadId | Stopwatch | Message
SqlOutputAdapter | snapshotWindowQuery | 24 | 31526005 | ConsumeEvent Exit - Queue is empty. Items dequeued: 206 - CTIs dequeued: 2
SqlOutputPoint | snapshotWindowQuery | 28 | 31526014 | Stop called
SqlOutputPoint | snapshotWindowQuery | 26 | 31527422 | Resume called
SqlOutputAdapter | snapshotWindowQuery | 24 | 31537450 | Consume event -> exiting lock
SqlOutputAdapter | snapshotWindowQuery | 26 | 31545272 | Resume called.
SqlOutputPoint | snapshotWindowQuery | 28 | 31547511 | Stop -> Entering lock
SqlOutputAdapter | snapshotWindowQuery | 28 | 31551377 | Cleanup enter
SqlOutputAdapter | snapshotWindowQuery | 28 | 31553290 | Cleanup exit
SqlOutputAdapter | snapshotWindowQuery | 26 | 31549564 | ConsumeEvents Enter
SqlOutputPoint | snapshotWindowQuery | 28 | 31555071 | Calling Stopped
SqlOutputPoint | snapshotWindowQuery | 28 | 31558768 | Dispose called - disposing = True
SqlOutputAdapter | snapshotWindowQuery | 28 | 31560581 | Dispose called
SqlOutputAdapter | snapshotWindowQuery | 28 | 31562515 | Cleanup enter
SqlOutputAdapter | snapshotWindowQuery | 26 | 31557017 | ConsumeEvents Exit - Stopping/Stopped
SqlOutputPoint | snapshotWindowQuery | 28 | 31566929 | Stopped returned
SqlOutputPoint | snapshotWindowQuery | 28 | 31571420 | Stop -> Exiting lock
SqlOutputPoint | snapshotWindowQuery | 28 | 31573536 | Stop Exiting

There are no exceptions. There is no hang on shutdown except for any “hanging” while the output adapter empties the queue. The log above also clearly shows where the lock is entered and exited. You will notice that ConsumeEvents still gets called once during the shutdown process but exits immediately. How was this done? First, I created a lock object in SqlOutputAdapter.cs … this is the single writer for the adapter. Then, while in the while(true) loop when dequeuing events, I still check for adapter state before every event is dequeued but, instead of calling Stopped() if the adapter is stopping, I simply exit – I let Stop() take care of that. Then I enter the lock, which surrounds the entire dequeue process, ensuring that Stop() doesn’t call Cleanup() or Stopped() in the middle of the process. In the Point/Interval/Edge output adapters, there is a lock around the shutdown code in the Stop() method, ensuring that Cleanup() and Stopped() aren’t called while I’m in a dequeue process. Could we get fancier and use other thread synchronization mechanisms? Sure, but I’ve not seen any real performance benefit in it (we did test it) and … well … this method is simple, clear and easy to see what’s happening – it follows the KISS principle.
Here’s what the updated code looks like:

SqlOutputAdapter.cs:

/// <summary>
/// Lock object to ensure that we aren't calling Stopped() when inside ConsumeEvents
/// </summary>
public readonly object LockObject = new Object();

/// <summary>
/// Dequeues each event from StreamInsight and writes it as a row into the SQL sink
/// </summary>
private void ConsumeEvents()
{
    TEvent currentEvent = default(TEvent);
    while (true)
    {
        // if the engine asked the adapter to stop
        // Note that this check is *outside* the lock
        if (this.outputAdapter.AdapterState == AdapterState.Stopping ||
            this.outputAdapter.AdapterState == AdapterState.Stopped)
        {
            //exit worker thread
            return;
        }

        //Lock during our dequeue process but only then.
        //Note that we check and acquire the lock *only* when absolutely necessary.
        lock (LockObject)
        {
            try
            {
                // Dequeue the event
                DequeueOperationResult result = this.outputAdapter.Dequeue(out currentEvent);

                // if the engine does not have any events, the adapter is Suspended; so do this ..
                if (result == DequeueOperationResult.Empty)
                {
                    // inform the engine that adapter is ready to be resumed
                    this.outputAdapter.Ready();

                    // exit the worker thread
                    return;
                }

                // write out event to output table
                this.CreateRowFromEvent(currentEvent);
            }
            catch (Exception ex)
            {
                return;
            }
            finally
            {
                // IMPORTANT: Release the event always
                if (currentEvent != null)
                {
                    this.outputAdapter.ReleaseEvent(ref currentEvent);
                }
            }
        }
    }
}

SqlOutputPoint.cs:

/// <summary>
/// Notifies the adapter to stop as a result of stopping or aborting the query.
/// </summary>
public override void Stop()
{
    try
    {
        //Ensure that we aren't dequeuing right now.
        lock (this.outputAdapter.LockObject)
        {
            this.outputAdapter.Cleanup();
            this.Stopped();
        }
    }
    catch (Exception ex)
    {
        //log the failure
    }
}

That’s all there is to it. I did remove the logging code that produced the tables above for clarity but you can download the revised Simple StreamInsight App with the fixed Sql Output adapter and scripts for the databases from my SkyDrive here.

Specifying CTIs with LinqPad

StreamInsight | Code Sample
I’ve said it before and I’ll say it again … LinqPad is an essential tool for anyone doing StreamInsight applications. And don’t just settle for the free version but get the Pro version (at least) since it has Intellisense. I’m not ashamed to admit that I am completely, totally addicted to Intellisense (which I find somewhat amusing at times because it annoyed me to no end when it first came out in VB 5.0 – but then I got used to it and descended into my current addiction). With that said (again), one thing that I’ve found a little … oh … less-than-perfect doesn’t have to do with LinqPad but with the way that all of the StreamInsight samples create the streams, which also happened to be how I was creating my streams. Until recently, that is. You see, AdvanceTimeSettings.IncreasingStartTime doesn’t always mirror how we are going to see data in the real world. It also doesn’t allow you to show how CTIs can be used to handle little issues like latency from the source data. To do that, you really need to specify your own CTIs so that you can control – and others can see – exactly where the CTI is issued in relation to the enqueued Insert events. You also can’t test/prototype query scenarios where you have multiple events with the same identifier in a single CTI span – or no events within a CTI span. Both of these scenarios can – and do – happen in the real world. But … and this depends on the adapter … you may want to handle CTIs in your input adapter itself rather than relying on AdvanceTimeSettings. It turns out, however, that it’s really not that difficult. Let’s start with how we typically do it. First, we have some source data as an array and a function to create our event timestamp. Then we create the point stream from the array using ToPointStream (or ToIntervalStream or ToEdgeStream).
Here’s a code example and the results from LinqPad:

void Main()
{
    Func<int, DateTimeOffset> t = (s) =>
        new DateTimeOffset(2011, 1, 11, 8, 0, 0, TimeSpan.Zero).AddSeconds(s);

    var values = new [] {
        new {Item="Variable1", Value=92, Timestamp=0},
        new {Item="Variable2", Value=60, Timestamp=0},
        new {Item="Variable1", Value=93, Timestamp=2},
        new {Item="Variable2", Value=75, Timestamp=2},
        new {Item="Variable1", Value=88, Timestamp=3},
        new {Item="Variable2", Value=81, Timestamp=3},
        new {Item="Variable1", Value=93, Timestamp=5},
        new {Item="Variable2", Value=82, Timestamp=5}
    };

    var valueStream = values.ToPointStream(Application,
        e => PointEvent.CreateInsert(t(e.Timestamp), new {Item = e.Item, Value = e.Value}),
        AdvanceTimeSettings.IncreasingStartTime);

    valueStream.ToPointEnumerable().Dump("Results");
}

Results - IEnumerable<PointEvent<>> (13 items)

EventKind | StartTime | Payload
Insert | 1/11/2011 8:00:00 AM | { Item = Variable1, Value = 92 }
Cti | 1/11/2011 8:00:00 AM | null
Insert | 1/11/2011 8:00:00 AM | { Item = Variable2, Value = 60 }
Insert | 1/11/2011 8:00:02 AM | { Item = Variable1, Value = 93 }
Cti | 1/11/2011 8:00:02 AM | null
Insert | 1/11/2011 8:00:02 AM | { Item = Variable2, Value = 75 }
Insert | 1/11/2011 8:00:03 AM | { Item = Variable1, Value = 88 }
Cti | 1/11/2011 8:00:03 AM | null
Insert | 1/11/2011 8:00:03 AM | { Item = Variable2, Value = 81 }
Insert | 1/11/2011 8:00:05 AM | { Item = Variable1, Value = 93 }
Cti | 1/11/2011 8:00:05 AM | null
Insert | 1/11/2011 8:00:05 AM | { Item = Variable2, Value = 82 }
Cti | 12/31/9999 11:59:59 PM | null

Most of the samples filter the CTIs out from the dump but I like to see them (always). Of course, since this post is about CTIs, we definitely need to see them. If you take a look at the results, you’ll see that the CTIs aren’t exactly where you might expect them to be. When you use IncreasingStartTime, the engine “watches” for a new start time to be enqueued with an event. It then enqueues a CTI with that new event’s start time. The next event – with the same start time – is in the next CTI span. So each CTI span has events with two different start times! Let’s change it around a bit. There is an overload of ToPointStream that takes an AdvanceTimeSettings, which gives you more control over your CTIs.
Changing the code around a bit, we certainly get different results:

AdvanceTimeSettings ats = new AdvanceTimeSettings(
    new AdvanceTimeGenerationSettings(
        TimeSpan.FromSeconds(2),
        TimeSpan.FromSeconds(0)),
    null,
    AdvanceTimePolicy.Drop);

var valueStream = values.ToPointStream(Application,
    e => PointEvent.CreateInsert(t(e.Timestamp), new {Item = e.Item, Value = e.Value}),
    ats,
    "Values");

Values - IEnumerable<PointEvent<>> (11 items)

EventKind | StartTime | Payload
Insert | 1/11/2011 8:00:00 AM | { Item = Variable1, Value = 92 }
Cti | 1/11/2011 8:00:00 AM | null
Insert | 1/11/2011 8:00:00 AM | { Item = Variable2, Value = 60 }
Insert | 1/11/2011 8:00:02 AM | { Item = Variable1, Value = 93 }
Cti | 1/11/2011 8:00:02 AM | null
Insert | 1/11/2011 8:00:02 AM | { Item = Variable2, Value = 75 }
Insert | 1/11/2011 8:00:03 AM | { Item = Variable2, Value = 81 }
Insert | 1/11/2011 8:00:03 AM | { Item = Variable1, Value = 88 }
Insert | 1/11/2011 8:00:05 AM | { Item = Variable1, Value = 93 }
Cti | 1/11/2011 8:00:05 AM | null
Insert | 1/11/2011 8:00:05 AM | { Item = Variable2, Value = 82 }

It is different and it’s also closer to what I want. But there are still events in there that have mixed start times within the same CTI window. The example above isn’t quite fair, though … if I add a touch of a delay into the AdvanceTimeSettings, the results begin to look more like what I expect. But even then we aren’t getting CTIs every 2 seconds. We still have a patch of events with different start times. And – notice – the CTIs don’t come every 2 seconds like clockwork. Instead, they are enqueued only after an event with a new start time is enqueued. The only way to resolve it is to take complete control over the CTIs … so we add them into the source data. We don’t have to specify any AdvanceTimeGenerationSettings since, of course, we are enqueueing them manually. Which gives us the following code and output:

void Main()
{
    Func<int, DateTimeOffset> t = (s) =>
        new DateTimeOffset(2011, 1, 11, 8, 0, 0, TimeSpan.Zero).AddSeconds(s);

    var values = new [] {
        new {Item="Variable2", Value=60, Timestamp=0},
        new {Item="CTI", Value=60, Timestamp=0},
        new {Item="Variable1", Value=93, Timestamp=2},
        new {Item="Variable2", Value=75, Timestamp=2},
        new {Item="Variable1", Value=88, Timestamp=3},
        new {Item="Variable2", Value=81, Timestamp=3},
        new {Item="CTI", Value=60, Timestamp=3},
        new {Item="Variable1", Value=93, Timestamp=5},
        new {Item="Variable2", Value=82, Timestamp=5},
        new {Item="CTI", Value=60, Timestamp=5}
    };

    var sourceData = values.ToPointStream(Application,
        e => e.Item != "CTI"
            ? PointEvent.CreateInsert(t(e.Timestamp), new Payload(){Item=e.Item, Value=e.Value})
            : PointEvent<Payload>.CreateCti(t(e.Timestamp).AddTicks(1)));

    sourceData.ToPointEnumerable().Dump("Results");
}

// Define other methods and classes here
public struct Payload
{
    public string Item;
    public int Value;
}

Note that we aren’t using an anonymous type for the stream – we can’t. You’ll get a compile error if you do. Also, the method that we’re using isn’t very reusable and we’ll wind up writing the same thing over and over again, tweaked to whatever we’re doing. Finally, I’m not really thrilled about the clarity. But we can kick this up a notch and use a fully reusable method that handles creating the events and can use anonymous types, thanks to the wonderful goodness that is lambdas.
Check it out:

public static PointEvent<TPayload> GetPointEvent<TPayload, TSource>(
    TSource source,
    Func<TSource, bool> ctiSelectExpression,
    Func<TSource, TPayload> payloadSelectExpression,
    Func<TSource, DateTimeOffset> eventTimeExpression)
{
    bool isCti = ctiSelectExpression.Invoke(source);
    if (isCti)
    {
        return PointEvent<TPayload>.CreateCti(eventTimeExpression.Invoke(source));
    }
    return PointEvent<TPayload>.CreateInsert(
        eventTimeExpression.Invoke(source),
        payloadSelectExpression.Invoke(source));
}

We can then use this in ToPointStream:

var sourceData = values.ToPointStream(Application,
    e => GetPointEvent(e,
        i => i.Item == "CTI",
        i => new {Item = i.Item, Value = i.Value},
        i => t(i.Timestamp)));

The output is the same as the first method but, in this case, it is more reusable and I find it a touch simpler.

Output Adapter –> Input Adapter Communications : Follow up

StreamInsight
Just a quick note to follow up on my previous post on Output Adapter –> Input Adapter Communications : Event Shapes – specifically about Edge output to Edge input scenarios. While this scenario works just fine in an ideal world, we all know that we don’t live in an ideal world. Instead, there are potential communication breakdowns between the StreamInsight servers from things like reboots (for whatever reason … like Windows Updates), network outages or – if you are using an unreliable protocol like UDP – dropped packets and messages. In an edge-to-edge scenario, it is possible for the hub StreamInsight server to get a start edge … but never an end. In this case, you have an event that is in the engine and participating in analysis, joins, unions, aggregates, etc. that is no longer valid. But … since the end never “came in”, you have no way of knowing that the event is no longer valid and its end date is, essentially, the end of time. Over a long-running process, this can build up if you have several starts without a corresponding end. On the other end of the spectrum, you could get an end event without a corresponding start. StreamInsight won’t let you enqueue such a beastie – it will raise an exception – but the problem is deeper than that. As with the never-ending start, you’ll have data consistency issues. In this case, rather than having an extra event that is part of your analysis, you are missing an event that should be a part of your analysis. Again, the result is that you have inconsistent output. Some of this … particularly issues with reboots … can be handled, to some extent, with checkpointing and adapters that understand and properly handle high water marks. But there’s nothing that you can do about communications outages or dropped/undelivered packets. Translating these incoming events to points simplifies these issues but doesn’t completely resolve all of them. If you enqueue a point on start, you can use the ToSignal() macro that’s in the LinqPad samples with a timeout of TimeSpan.MaxValue to get the same effect (Edge Start/End) in your output. And, while you can still have events living longer than they should, they will only live until you get an updated value for the item rather than living forever, which minimizes the impact and prevents orphaned starts from building up. Whether you are enqueuing only starts or only ends, you still may miss some events but that is a potential problem regardless of your event shapes. So … the edge output to edge input scenario isn’t quite as simple as it appeared at first blush. In a test/lab scenario, it will usually work just fine, especially when following the “happy path”. However, there are other challenges that come into play in a real-world scenario where things go wrong and, with these challenges in mind, the edge-to-edge scenario is more challenging. At the end of it, point inputs, regardless of the source event shape, provide the simplest use case and present only those challenges that are due to the very nature of a distributed system. Using something like MSMQ for the transport would resolve a lot of this as well … but it comes at a (pretty significant) cost of throughput and latency.
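For reference, here’s roughly what that point-to-signal conversion looks like; it’s the same ToSignal pattern used in the LinqPad samples (the incomingPoints stream and the ItemId key are just placeholders for this sketch):

// Turn each incoming point into a "signal": stretch it out to (at most) maxAge and
// clip it when the next point for the same item arrives.
TimeSpan maxAge = TimeSpan.MaxValue;   // or a finite timeout so orphaned values eventually expire

var signal = incomingPoints
    .AlterEventDuration(e => maxAge)
    .ClipEventDuration(incomingPoints, (older, newer) => older.ItemId == newer.ItemId);

With a finite maxAge, a start that never gets a matching update simply ages out instead of living to the end of time, which is the trade-off discussed above.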

Output Adapter –> Input Adapter Communications : Event Shapes

StreamInsight
One use case for StreamInsight in an enterprise environment is a hub and spoke architecture. In such an architecture, you would have multiple downstream StreamInsight instances (the spokes) that sit close to the data source to do event detection and processing on very high speed data that, very simply, can’t be sent to a centralized server due to network latency and limited bandwidth. This server would also downsample and (likely) filter data that is of interest to a centralized operations center where there is an aggregating StreamInsight server (the hub). In the case of an “interesting” event or set of events, additional information can be “turned on” at the downstream server to add to the existing feed, thereby optimizing the use of limited bandwidth while preserving the ability to view and collect data that is of critical interest. The aggregating StreamInsight server can then provide end-user interfaces with data as well as do additional aggregated analysis across all of the spokes. Two example use cases for this:

Oil & Gas Production: In O&G, you would have a downstream StreamInsight server that sits at the drill site, whether that be an offshore rig or an onshore platform. In both of these scenarios, it is entirely likely that there are very limited pipes back to a central operations center. Furthermore, since these installations can have thousands or tens of thousands of sensors, not all of this information will be useful – or even desirable – in the onshore operations center but still needs to be processed and analyzed onsite. Aggregated, filtered data and even calculated data based on the raw sensor feeds would be forwarded to a central StreamInsight server back “on the beach” for analysis across platforms and monitoring. Cross-platform aggregation would be interesting and useful when multiple platforms are using the same pipelines to make sure that there is capacity in the pipeline for current production as well as to optimize capacity usage.

Utilities – SmartGrid: This scenario provides an even better use case for this kind of architecture. A single utility company will have millions of smart meters installed across their service area. Utility companies are also installing “smart transformers” that provide data related to transformer performance. StreamInsight may be fast (it is) and may be able to handle a lot of data at high frequency (it can) but it can only do so much. Having a single StreamInsight server processing the data from all of a utility company’s smart meters and transformers simply isn’t realistic. Like the O&G scenario above, downstream StreamInsight servers would collect information from individual meters and transformers, downsample and aggregate, and then send to a centralized server. In fact, with utilities, there may be a couple of layers of this, depending on the size of the service area and utility provider. Initial aggregation (at the source) by substation that is then fed to the hub server would be useful and interesting. From there, aggregation across substations can provide the information required to ensure that there is enough capacity on the grid for current usage as well as to optimize the grid’s current capacity so that excess capacity isn’t kept online when it isn’t required. Aggregation by substation – or, for example, zip code – can help utility providers optimize and target certain areas for rolling brown/blackouts when necessary, with the minimum impact required to keep the grid balanced.
My team has developed some adapters that are specifically designed for and intended to be used in these scenarios. As we’ve been doing this, we’ve also had a lot of discussion around how these should work, especially in cases where the inbound stream has a different shape than the target input adapter (e.g. Edge output –> Point input). How are these translated from one to the other? What kind of event shapes are actually valid in these scenarios? Here’s what we’ve come up with (output/source adapter shape –> input/target adapter shape):

Edge –> Edge: Valid. Events should be enqueued as they arrive. No translation should be done unless the Start Edge time is before the last issued CTI.

Edge –> Interval: Invalid. Typically, this is invalid. With an inbound interval from a downstream StreamInsight server, the start time is just about guaranteed to be before the last issued CTI. Because interval events aren’t released to the output adapter until the end time, it is also possible for there to be different start times in one inbound package.

Edge –> Point: Valid. The point input adapter should only enqueue the End Edge, with an event timestamp equal to the EndTime of the end edge (assuming that end time isn’t before the last issued CTI). Alternatively, you could enqueue a point at both the start and the end, or just the start, using the corresponding timestamp. If these alternatives are required use cases, it should be configurable.

Interval –> Edge: Invalid. The only way this would be possible is to enqueue on the end edge event, as it is only then that the end time (and total interval) is known. Since the start time is virtually guaranteed to be before the last issued CTI, and since it is likely that different edge events that arrive together also have completely different start times, it is impossible to enqueue them correctly in the application timeline. We have determined, therefore, that this is an invalid use case.

Interval –> Interval: Invalid. While it seems that there would be no translation required, that is not the case. As with the edge target above, the start time of the interval is virtually guaranteed to be before the last issued CTI. We have determined, therefore, that this is an invalid use case.

Interval –> Point: Valid. Again, the start time is virtually guaranteed to be before the last issued CTI and events in the same “group” will likely have different start times. Therefore, the point should be enqueued with the End Time of the inbound interval event.

Point –> Point: Valid. No translation necessary. Enqueue the point with the original timestamp.

Point –> Interval: Invalid. While possible, it doesn’t really make much sense. You could enqueue the interval with an end time 1 tick past the start time but what would be the point?

Point –> Edge: Invalid. Just like with the interval target above, there isn’t a very good way to handle this that makes sense logically. Having a start edge and then an end edge with a 1 tick difference between start and end time defeats the purpose of an edge event.

As you can see above, it’s not as straightforward as one would initially think. Because of their very nature, interval events are particularly problematic when coming in from a downstream StreamInsight server because their start times are typically in the past according to application time. Point events as a destination are universally valid use cases. What about CTIs? As I’m sure you are aware, CTIs advance application time and are not necessarily based on the system clock – though they certainly can be if that is desired.
In the use cases above, CTIs would be generated at the source (downstream) servers, and the destination (upstream) server would advance application time based on the source servers. Due to latency, this would likely be a touch behind the system clock. Depending on the protocol used, it is possible that events may arrive out of order so, in some cases, this CTI should have a configurable time span to account for this. You may need to issue a CTI for, say, 10:00:00 at 10:00:02 – but using the 10:00:00 timestamp. This will also ensure that any queries that span multiple downstream, input servers are synchronized. One thing to also note with all of the timestamps – it may be necessary, in some cases, to account for differences between the source server’s application clock and the target server’s application clock. This can happen in cases where the source application clock is advanced according to the source system clock and the application clocks across the source servers aren’t fully synchronized. In both the O&G and Utilities scenarios above, this is an entirely possible use case. If that is the case, the target adapter will need to do any translation necessary between the systems – the target, upstream server will need to do this to fully and accurately coordinate the clocks between the multiple downstream servers.
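To make the interval-to-point translation from the table above a little more concrete, here’s a minimal sketch of what the hub-side input adapter might do with an inbound interval. CreateInsertEvent, Enqueue, ReleaseEvent and Ready are the standard typed point input adapter calls; the SensorReading payload and the InboundInterval DTO are hypothetical names for this illustration:

// Sketch: a method inside a class deriving from TypedPointInputAdapter<SensorReading>.
// The inbound interval is enqueued as a point stamped with the interval's end time,
// since its start time is almost certainly already behind the hub's last CTI.
private void EnqueueFromInterval(InboundInterval interval)   // hypothetical DTO from the wire
{
    PointEvent<SensorReading> evt = CreateInsertEvent();
    if (evt == null)
    {
        Ready();                                 // engine is suspending us; wait for Resume()
        return;
    }

    evt.StartTime = interval.EndTime;            // point timestamp = inbound interval's end time
    evt.Payload = new SensorReading              // hypothetical payload mapping
    {
        SourceId = interval.SourceId,
        Value = interval.Value
    };

    if (Enqueue(ref evt) == EnqueueOperationResult.Full)
    {
        ReleaseEvent(ref evt);                   // engine is pushing back; release and back off
        Ready();
    }
}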

Cool StreamInsight query–Point input to Edge Output

StreamInsight | Code Sample
It’s easy to get individual events as points in StreamInsight. We can then take those events and write them directly out to our output adapter. There’s nothing to this. But … and this is a common scenario … what do we do if the values don’t change very often? Do we really want to write an hour’s worth of data that hasn’t changed, especially if we are reading the value every second or even several times a second? Probably not. And in many cases, we don’t need to. In many cases, in fact, we’re only interested in saving the values when they have changed. Of course, this can be done in StreamInsight through a little bit of query majick (or I wouldn’t be writing this blog entry, now would I?). So … what we are going to do here is to take a point stream, evaluate it, select only the items that have changed since the previous value and then send these to output as edge events. Each edge will have a start and end when the value changes. Simple enough, right? As with the previous example, this was done using LinqPad. In this case, I borrowed (or stole, as the case may be) a Linq macro from one of the samples – the FoldPairs macro. I also borrowed/stole the ToSignal extension from the StreamInsight blog and turned it into a Linq macro. Both of these are going to be darn handy in this example.

public static CepStream<TResult> FoldPairs<TStream, TResult>(
    CepStream<TStream> input,
    Expression<Func<TStream, TStream, bool>> predicate,
    TimeSpan timeout,
    Expression<Func<TStream, TStream, TResult>> resultSelector)
{
    var signal = input
        .AlterEventDuration(e => timeout)
        .ClipEventDuration(input, (f, s) => predicate.Compile()(f, s));

    return from l in signal.ShiftEventTime(e => TimeSpan.FromTicks(1))
           from r in input
           where predicate.Compile()(l, r)
           select resultSelector.Compile()(l, r);
}

public static CepStream<T> ToSignal<T, K>(
    CepStream<T> inputstream,
    Expression<Func<T, K>> keySelector)
{
    return inputstream
        .AlterEventDuration(e => TimeSpan.MaxValue)
        .ClipEventDuration(inputstream,
            (e1, e2) => (keySelector.Compile()(e1)).Equals(keySelector.Compile()(e2)));
}

And now for our data. I’ve added comments to the data to show where we have value changes. We’ll take this and convert it to a point stream for evaluation.
Func<int, int, DateTimeOffset> t = (h, m) =>
    new DateTimeOffset(2011, 1, 25, 0, 0, 0, TimeSpan.Zero).AddHours(h).AddMinutes(m);

var sourceData = new []
{
    //Initial event @ 4:12
    new { SourceId = "A", Value = 22, TimeStamp = t(4, 12) },
    new { SourceId = "A", Value = 22, TimeStamp = t(4, 13) },
    new { SourceId = "A", Value = 22, TimeStamp = t(4, 14) },
    //A: New event @ 4:15
    new { SourceId = "A", Value = 67, TimeStamp = t(4, 15) },
    //A: New event @ 4:16
    new { SourceId = "A", Value = 54, TimeStamp = t(4, 16) },
    new { SourceId = "A", Value = 54, TimeStamp = t(4, 17) },
    new { SourceId = "A", Value = 54, TimeStamp = t(4, 18) },
    new { SourceId = "A", Value = 54, TimeStamp = t(4, 19) },
    new { SourceId = "A", Value = 54, TimeStamp = t(4, 20) },
    new { SourceId = "A", Value = 54, TimeStamp = t(4, 21) },
    //A: New event @ 4:22
    new { SourceId = "A", Value = 87, TimeStamp = t(4, 22) },
    //B: Initial Event @ 4:12
    new { SourceId = "B", Value = 24, TimeStamp = t(4, 12) },
    new { SourceId = "B", Value = 24, TimeStamp = t(4, 13) },
    //B: New Event @ 4:14
    new { SourceId = "B", Value = 31, TimeStamp = t(4, 14) },
    new { SourceId = "B", Value = 31, TimeStamp = t(4, 15) },
    new { SourceId = "B", Value = 31, TimeStamp = t(4, 16) },
    new { SourceId = "B", Value = 31, TimeStamp = t(4, 17) },
    new { SourceId = "B", Value = 31, TimeStamp = t(4, 18) },
    //B: New Event @ 4:19
    new { SourceId = "B", Value = 50, TimeStamp = t(4, 19) },
    new { SourceId = "B", Value = 50, TimeStamp = t(4, 20) },
    new { SourceId = "B", Value = 50, TimeStamp = t(4, 21) },
    new { SourceId = "B", Value = 50, TimeStamp = t(4, 22) }
};

var source = sourceData.OrderBy(e => e.TimeStamp).ToPointStream(
    Application,
    ev => PointEvent.CreateInsert(ev.TimeStamp.ToLocalTime(),
        new { ev.SourceId, ev.Value }),
    AdvanceTimeSettings.IncreasingStartTime);

So far, pretty straightforward and nothing special. Next we need to calculate the change between two consecutive values. We do this using the borrowed FoldPairs macro. This gives us an anonymous type with our item identifier (SourceId) and the difference (delta) between two consecutive values.

var delta = FoldPairs(source,
    (a, b) => a.SourceId == b.SourceId,
    TimeSpan.MaxValue,
    (a, b) => new { a.SourceId, diff = b.Value - a.Value });

Now that we have the deltas, it’s easy enough to join this back to the original source stream, selecting only those source items where the delta is not equal to 0.

var changesOnly = from r in delta
                  join s in source on r.SourceId equals s.SourceId
                  where r.diff != 0
                  select s;

So far so good. If you run what we have so far and write it to the output, you’ll see that you only get the point events that have changed since the previous value. But … you’ll also see that we’re missing something – our very first, initial event. That’s because no delta is calculated for it; there is no previous event to compare against. Well, if we are going to be a real application, we can’t have the first one disappearing on us all the time. So now we need to get the first event – the event that has nothing preceding it. To do this, we use the ToSignal macro to convert our initial source stream to a signal stream … each individual reading (whether changed or not) becomes an interval. We then shift the event time to create an overlap between one event and the next. Where we don’t have an overlap, we have the very first point event – a left anti semi-join. We can then take this and union it with the stream of changed events, giving us a stream with the changed events AND our first event.
var initialEvent = from s in source
                   where (from s2 in ToSignal(source, e => e.SourceId)
                              .ShiftEventTime(e => TimeSpan.FromTicks(1))
                          where s.SourceId == s2.SourceId
                          select s2).IsEmpty()
                   select s;

var final = changesOnly.Union(initialEvent);

This, in itself, is actually useful … you may not want to take the next step and convert these into edge events, since this query already gives you a point event for each value change. But … we’re not quite done with the scenario that we want to accomplish. To create edge events with a start and end time that represent value changes, we simply use the ToSignal() Linq macro on our final point stream. If you want these as intervals instead, you’ll get, in effect, the same data except that you won’t “see” the interval until its end time. If they are edge events, you’ll get a start as soon as the value changes and the end just before the next change.

var signalEdges = ToSignal(final, e => e.SourceId);

And … it really is just that simple … it also helps that the Linq macros really cut down on the number of Linq query statements that we have to write. You could, if you wanted, also add dead zones … taking this up a notch so that you only produce an event when the value changes by a certain amount or percentage. But I’ll leave that as an exercise for the reader. Can’t take all the fun out of it, can I?
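If you want a head start on that exercise, here’s a rough sketch of one way a dead zone could look. This isn’t from the original sample; it just tweaks the changesOnly query above, and the deadband value is made up.

//Hypothetical dead zone: only treat a reading as "changed" when it moves by
//at least the deadband since the previous reading. Everything else stays the same.
var deadband = 5;

var significantChangesOnly = from r in delta
                             join s in source on r.SourceId equals s.SourceId
                             where Math.Abs(r.diff) >= deadband
                             select s;

//Union with the initial events and convert to a signal, exactly as before.
var finalWithDeadband = significantChangesOnly.Union(initialEvent);
var signalEdgesWithDeadband = ToSignal(finalWithDeadband, e => e.SourceId);

One thing to watch: this compares each reading against the immediately previous reading, not against the last reported value, so a slow drift would never produce an event. Handling that properly takes a slightly different pattern.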

Cool StreamInsight query–turning alarms into Edge events

Code Sample | StreamInsight
One of the use cases for StreamInsight is to detect certain alarm conditions on streaming, real-time data. This, in itself, isn’t all that special … many of the process control systems out there already do something like that. What makes StreamInsight different is that it can be a lot smarter about determining these alarm conditions. For example … one of the things that apparently happened (as I understand it) with Macondo (aka Deepwater Horizon) is that the PCS alarms went off constantly … they would trigger every time a single value went out of range. There were so many false alerts that they simply turned the system off. It really isn’t all that unreasonable … many of these sensors will temporarily go out of range and it’s not indicative of a problem. In fact, it could just be a random bad reading or a transient condition that isn’t really a cause for alarm at all. However, if you start to have multiple values from the same sensor/device out of range within a particular time period, then you may really have an issue. You also don’t want to issue an alarm with every bad reading … but, instead, issue one at the start and one at the finish. Because, with an alarm, you have a definite start to the event … but at the start, you have no idea when it will end. You also have a definite end to the event … but you only know that when things come back to normal. It’s a perfect fit for an edge event. Part of this can be found in the StreamInsight samples for LinqPad – a most righteously awesome tool that every StreamInsight query developer should have and use. It’s in the “Alarm Floods and Transients” sample under 101 StreamInsight Queries, section 2_QueryPatterns. I will start with that and describe the queries step-by-step. First … our scenario. We have a data stream with events that have a “Status” field. If this field is a “0”, it’s good; if it’s a “1”, it’s bad. Now … in the real world, you’d get to this value somehow … through previous queries that do analytics or perhaps even from your data source. For our purposes, that is irrelevant. We’re interested in what we do with it. Now, in our case, we can actually get false alerts (of course) so we want to trigger an alarm only when we get multiple alerts within a specific time frame. We then want the alarm expressed as an edge event. Finally, if we have an alarm that lasts longer than a specific amount of time, we want to repeat the alarm. We’ll have several steps to do this. First, here’s our source data; I added a little to what was there. (Notes: these are all set up to run in LinqPad. Also, we don’t differentiate the events by any sort of ID … you’ll need to do that in a real app; there’s a quick sketch of one way to approach it at the end of this post.)
var sourceData = new []
{
    new { Status = 0, TimeStamp = DateTime.Parse("10/23/2009 4:10:00 PM") },
    new { Status = 0, TimeStamp = DateTime.Parse("10/23/2009 4:11:00 PM") },
    //False alert @ 4:12
    new { Status = 1, TimeStamp = DateTime.Parse("10/23/2009 4:12:00 PM") },
    new { Status = 0, TimeStamp = DateTime.Parse("10/23/2009 4:13:00 PM") },
    new { Status = 0, TimeStamp = DateTime.Parse("10/23/2009 4:14:00 PM") },
    //Real alert @ 4:15
    new { Status = 1, TimeStamp = DateTime.Parse("10/23/2009 4:15:00 PM") },
    new { Status = 1, TimeStamp = DateTime.Parse("10/23/2009 4:16:00 PM") },
    new { Status = 1, TimeStamp = DateTime.Parse("10/23/2009 4:18:00 PM") },
    new { Status = 0, TimeStamp = DateTime.Parse("10/23/2009 4:19:00 PM") },
    new { Status = 0, TimeStamp = DateTime.Parse("10/23/2009 4:20:00 PM") },
    //Real alert @ 4:21 - longer alert that repeats
    new { Status = 1, TimeStamp = DateTime.Parse("10/23/2009 4:21:00 PM") },
    new { Status = 1, TimeStamp = DateTime.Parse("10/23/2009 4:22:00 PM") },
    new { Status = 1, TimeStamp = DateTime.Parse("10/23/2009 4:23:00 PM") },
    new { Status = 1, TimeStamp = DateTime.Parse("10/23/2009 4:24:00 PM") },
    new { Status = 1, TimeStamp = DateTime.Parse("10/23/2009 4:25:00 PM") },
    new { Status = 1, TimeStamp = DateTime.Parse("10/23/2009 4:26:00 PM") },
    new { Status = 1, TimeStamp = DateTime.Parse("10/23/2009 4:27:00 PM") },
    new { Status = 0, TimeStamp = DateTime.Parse("10/23/2009 4:28:00 PM") },
    new { Status = 0, TimeStamp = DateTime.Parse("10/23/2009 4:29:00 PM") },
    //False alert @ 4:30
    new { Status = 1, TimeStamp = DateTime.Parse("10/23/2009 4:30:00 PM") },
    new { Status = 0, TimeStamp = DateTime.Parse("10/23/2009 4:31:00 PM") },
    new { Status = 0, TimeStamp = DateTime.Parse("10/23/2009 4:32:00 PM") },
    new { Status = 0, TimeStamp = DateTime.Parse("10/23/2009 4:33:00 PM") },
};

var source = sourceData.ToPointStream(Application,
    ev => PointEvent.CreateInsert(ev.TimeStamp.ToLocalTime(), ev),
    AdvanceTimeSettings.StrictlyIncreasingStartTime);

var timeout = TimeSpan.FromMinutes(2);
var alarmTimeout = TimeSpan.FromMinutes(5);

Now that we have our source data and variables, let’s create two streams, one with valid items and one with all alerts:

var validData = source.Where(e => e.Status == 0);
var alarmEvents = source.Where(e => e.Status == 1);

Next, we need to remove the false alerts … alerts that aren’t followed by another alert. We do this by taking the valid items, moving their start time back by the timeout and setting the event duration to the timeout. If there is a successful join, then there is a “good” event within the timeout and we filter that alarm out using a Left Anti-Semi Join.

// take all alarm events that are not followed by a non-alarm event
// within the timeout
var nonTransientAlarms = from alarm in alarmEvents
                         where (from nextevent in source
                                    .AlterEventLifetime(
                                        e => e.StartTime.Subtract(timeout),
                                        e => timeout)
                                where nextevent.Status == 0
                                select nextevent).IsEmpty()
                         select alarm;

//Show the non-transient alarms
(from p in nonTransientAlarms.ToIntervalEnumerable()
 where p.EventKind == EventKind.Insert
 select p).Dump("Non-transient alarms");

The output shows all of the alarm events that do not have a “good” event within the timeout. You’ll notice that there is still a flood of events … we’ll need to filter this down to just the initial alarm event.
// Expand all alarm events to the timeout and count over snapshots
var counts = from win in nonTransientAlarms
                 .Where(e => e.Status == 1)
                 .AlterEventDuration(e => timeout)
                 .SnapshotWindow(SnapshotWindowOutputPolicy.Clip)
             select new { count = win.Count() };

// Those snapshots with a count of 1 belong to the initial alarms.
// Reduce to points and join with the original stream.
var initialAlarm = from c in counts
                       .Where(e => e.count == 1)
                       .ToPointEventStream()
                   from e in source
                   select e;

So far, we’ve not done much that’s not in the original sample. Now we go off that path and turn these into edges that represent the start and end of the actual alarm. We start by turning the valid data into a signal. We also extend our initial alarms to the alarm timeout. From there, doing a LASJ again with the valid data signal provides us with edge events that start and end with the alarm. Because we have one alarm that extends past the alarm timeout, this will generate two edge events. Together, these edge events will cover the entire alarm timeframe:

var validDataSignal = from v in validData
                          .AlterEventDuration(e => TimeSpan.MaxValue)
                          .ClipEventDuration(initialAlarm, (e1, e2) => true)
                      select v;

//Since we have the initial alarms, how long does each alarm last?
var alarmTimeline = from i in initialAlarm.AlterEventDuration(e => alarmTimeout)
                    where (from v in validDataSignal
                           select v).IsEmpty()
                    select i;

(from p in alarmTimeline.ToEdgeEnumerable()
 where p.EventKind == EventKind.Insert
 select p).Dump("Alarm timeframe");

And that should do it …
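As the note at the start of this post says, a real application would need to separate these events by some sort of ID. Here’s a rough sketch of what that might look like for the transient filter, assuming a hypothetical DeviceId field on the events; this is my own addition, not part of the original sample.

//Hypothetical: assumes the source events carry a DeviceId in addition to Status.
var nonTransientAlarmsByDevice =
    from alarm in alarmEvents
    where (from nextevent in source
               .AlterEventLifetime(
                   e => e.StartTime.Subtract(timeout),
                   e => timeout)
           where nextevent.Status == 0
              && nextevent.DeviceId == alarm.DeviceId   //only this device's good readings count
           select nextevent).IsEmpty()
    select alarm;

The same DeviceId correlation would need to carry through the rest of the queries as well: the snapshot counting would need a group-by on DeviceId and the valid-data signal would need to clip per device. Otherwise one device’s readings would suppress or end another device’s alarms.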

Serialization and StreamInsight Adapter Config Classes

StreamInsight
It is essential that you mark your StreamInsight adapter configuration classes with either the [Serializable()] or [DataContract()] attributes. Keep in mind that the StreamInsight engine can be run either in-process or remotely … and, if remote, these classes have to be serialized on the wire. The Serializable attribute is easier, but the DataContract attribute - through the [DataMember()] and [IgnoreDataMember()] attributes - allows you to be more explicit about which members actually go on the wire. I looked (quickly) and didn’t see this documented, but that doesn’t mean that it isn’t somewhere. Now … what can happen if you don’t do this? Bad things. This happened to me yesterday and simply reinforced why I shave my head (so I can’t rip my hair out). So … everything had been working fine. I was making some changes to an adapter to try to resolve a Heisenbug caused by a race condition during the adapter teardown (I’ll discuss some of these challenges later, I think). When I ran it to test, I started getting exceptions when starting the query. It was an XmlException with the message: Name cannot begin with the '<' character, hexadecimal value 0x3C. Huh? I checked through everything and there were no members that began with a “<”. In fact, the C# compiler won’t let you declare one, even if you use the “@” character (rightfully so … I mean, why on earth would you do that??). Examining the full stack trace, I could see that it was happening during stream creation while serializing something. Once I loaded the debug symbols, I found that the offending member name was “<LogName>k__BackingField”, which certainly does begin with a “<”. Of course, there was no such member declared in my code … there’s no way that I would even try to declare such a thing. It turns out, however, that the C# compiler declares the backing fields for automatic properties like this and, for some reason, it chose yesterday to start blowing up on me. I finally found that the configuration class for one specific adapter was not marked as serializable (and I know better than this) and, once I changed it, all worked just fine. I gotta tell you, I do so hate these mysterious errors.
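For reference, here’s a minimal sketch of what a config class marked up for remoting might look like. The class and member names are made up for illustration; the attributes come from System.Runtime.Serialization (for DataContract) and from System itself (for Serializable).

using System;
using System.Runtime.Serialization;

//Hypothetical adapter config class - explicit about what goes on the wire.
[DataContract()]
public class CsvOutputAdapterConfig
{
    //Serialized when the engine is hosted remotely.
    [DataMember()]
    public string OutputFolder { get; set; }

    [DataMember()]
    public string Delimiter { get; set; }

    //Explicitly excluded (with [DataContract], anything without [DataMember]
    //is skipped anyway - this attribute matters more on plain POCO types).
    [IgnoreDataMember()]
    public object LocalOnlyState { get; set; }
}

//The quick-and-dirty alternative: mark the whole class and everything,
//including the compiler-generated backing fields, gets serialized.
[Serializable()]
public class GeneratorAdapterConfig
{
    public int MinValue { get; set; }
    public int MaxValue { get; set; }
}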