Ruminations of J.net idle rants and ramblings of a code monkey

StreamInsight Output Adapter Lifetime



I know it’s been a while since I’ve posted … things have been pretty busy here at the office with a major POC that we were doing. Since that’s just about wrapped up, I’m going to get back to blogging. My goal is to have a new post every week. Most, of course, will be on StreamInsight but I’ve got some other things that I’m thinking about as well.

For this post, I’ll be talking about the lifetime of a StreamInsight output adapter. There is some documentation on this on MSDN but it doesn’t have all of the details … and the samples provided in the StreamInsight product team samples don’t always work the way you would want in a real production application. I will detail why that’s the case and what you can do to make your adapters more robust.

Adapters … both input and output … are probably the most difficult thing to get exactly right in StreamInsight. This is due to StreamInsight’s heavily multithreaded and asynchronous model, which is something developers have always struggled to get right. Humans have a hard time really visualizing the code paths in multithreaded applications; our brains just don’t work that way. Now, back in the days when most machines had a single processor core, it wasn’t quite as difficult to do synchronization properly. In those days, we might spawn multiple threads but only one thread at a time was scheduled to run on the CPU … they weren’t truly concurrent. However, first with hyperthreading and now with multi-core processors (not to mention hyper-threaded multicore processors), multiple threads may very well be executing at the exact same instant. Of course, the exact timing is non-deterministic, which makes these bugs that much more challenging to debug. They are Heisenbugs.

There are two major symptoms that tell you your adapter lifetime is wrong: you’ll either get random ObjectDisposedExceptions when shutting down, or shutdown of StreamInsight will hang for a while. We will talk about why this happens. We’ll also be using the sample Sql Output Adapter from the product team samples since it does show some less-than-friendly shutdown behavior.

By the way, the testing was done with the Sql Server output adapter from the product team samples on CodePlex but using the Simple StreamInsight app sample on MSDN code samples.

First, the easy stuff …

Startup

Your output adapter is started when the query that it is bound to is started. In fact, your adapter is created when the query is started … not when ToQuery() is called. Also, keep in mind that the call to your adapter’s Start method is asynchronous … it occurs on a separate thread from Query::Start(). Still, it is very important that you limit your constructor and Start method to the bare minimum amount of code that you need there and then spawn a new thread to do your actual dequeue processing. If, for some insane reason, you immediately go into a while(true){ } loop in your Start, Query::Start will return but the query will, for all intents and purposes, be hung in limbo. In fact, I just tried this by putting an infinite loop with a sleep inside an adapter’s Start method and the query’s Start method still returned. However, the StreamInsight management service, and therefore the debugger, will hang.
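As a rough illustration of "do the bare minimum in Start and hand the real work to another thread", here is a minimal sketch. SketchAdapter is a hypothetical stand-in that does not derive from the real StreamInsight adapter base classes, so that it compiles on its own; the thread-id properties and WaitForWorker exist only to make the behavior observable.

```csharp
using System;
using System.Threading;

// Hypothetical stand-in for an output adapter. It does NOT use the real
// StreamInsight base classes; it only illustrates the threading shape.
public sealed class SketchAdapter
{
    private readonly ManualResetEventSlim workerRan = new ManualResetEventSlim(false);
    private Thread worker;

    public int StartThreadId { get; private set; }
    public int WorkerThreadId { get; private set; }

    // Start does as little as possible: spawn the worker and return at once.
    public void Start()
    {
        StartThreadId = Thread.CurrentThread.ManagedThreadId;
        worker = new Thread(ConsumeEvents) { IsBackground = true, Name = "dequeue-worker" };
        worker.Start();
    }

    // Runs on the worker thread; a real adapter would run its dequeue loop here.
    private void ConsumeEvents()
    {
        WorkerThreadId = Thread.CurrentThread.ManagedThreadId;
        workerRan.Set();
    }

    // Test helper (an assumption of this sketch): wait until the worker has run.
    public bool WaitForWorker(TimeSpan timeout)
    {
        return workerRan.Wait(timeout);
    }
}
```

Calling Start() on an instance returns immediately, and the dequeue work then proceeds on a thread of its own rather than on the thread the engine used to call Start.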

Below is a little table from some logging code that shows the order of events on query and output adapter startup and includes the managed thread id. Note that we have the “Started query” message while the adapter is still in the Start method and we even get to creating the next query before Start actually exits. It’s also worthwhile to note the number of different threads being used in the process. The factory’s Create is called on a separate thread and then the adapter’s Start is called on still a different thread. The first call to ConsumeEvents is made synchronously from Start … though I’m not sure if that’s actually necessary to do. Later calls to ConsumeEvents are on different threads.

Source                   Query                   Thread Id  Time     Message
MAIN                     initialQuery            9          1418     Creating query
MAIN                     initialQuery            9          848464   Created query
MAIN                     initialQuery            9          905283   Starting query
SqlOutputAdapterFactory  initialQuery            18         1944604  Creating adapter
SqlOutputPoint           initialQuery            18         1961683  Constructor called
SqlOutputPoint           initialQuery            18         1972490  Constructor exit
SqlOutputAdapterFactory  initialQuery            18         1980381  Created adapter
SqlOutputPoint           initialQuery            21         2134443  Start called
SqlOutputAdapter         initialQuery            21         2149574  Start Enter.
MAIN                     initialQuery            9          2158461  Started query
MAIN                     hoppingWindowAggregate  9          2235857  Creating query
SqlOutputAdapter         initialQuery            21         2536294  ConsumeEvents Enter
SqlOutputAdapter         initialQuery            21         2553473  ConsumeEvent Exit - Queue is empty. Items dequeued: 0 - CTIs dequeued:0
SqlOutputAdapter         initialQuery            21         2558886  Start Exit
SqlOutputPoint           initialQuery            21         2567542  Start exit

Dequeuing Events

This is the core of your output adapter … after all, if you don’t dequeue your events, there’s no purpose to having an output adapter, is there? So your code may look something like the following (taken from the Sql Output Adapter sample):

    /// <summary>
    /// Dequeues each event from StreamInsight, and writes it as a row into the SQL sink
    /// </summary>
    private void ConsumeEvents()
    {
        TEvent currentEvent = default(TEvent);

        while (true)
        {
            try
            {
                // if the engine asked the adapter to stop
                if (this.outputAdapter.AdapterState == AdapterState.Stopping)
                {
                    // clean up state
                    this.Cleanup();

                    // inform the engine that the adapter has stopped
                    this.outputAdapter.Stopped();

                    // and exit worker thread
                    return;
                }

                // Dequeue the event
                DequeueOperationResult result = this.outputAdapter.Dequeue(out currentEvent);

                // if the engine does not have any events, the adapter is Suspended; so do this ..
                if (result == DequeueOperationResult.Empty)
                {
                    // inform the engine that adapter is ready to be resumed
                    this.outputAdapter.Ready();

                    // exit the worker thread
                    return;
                }

                // write out event to output table
                this.CreateRowFromEvent(currentEvent);
            }
            finally
            {
                // IMPORTANT: Release the event always
                if (currentEvent != null)
                {
                    this.outputAdapter.ReleaseEvent(ref currentEvent);
                }
            }
        }
    }

Events will come in “bunches” based on your CTIs; in fact, events are only “released” to the output adapter when there is a CTI. Exactly how this works varies by the event shape. For point events, if the start time is within the last CTI span (the time span between CTIs), the event is released to the output adapter. For an interval, the event is released to the output adapter only when the end time is in the last CTI span. (This creates some pretty serious complications if you want to have intervals go from StreamInsight to StreamInsight as intervals. In fact, it’s pretty much impossible to do it correctly.) For an edge, you get two events in the output adapter. The start edge is released when the start time is within the last CTI span. The end edge is released when the end time is within the last CTI span. It is important to note that the events are not released one at a time as they pass through the engine but only when there is a CTI. From a functionality perspective, the CTI becomes a nice marker for doing batching to your output destination. In fact, in the Sql adapter that we have in our framework, we batch the events and then, on CTI, do a bulk insert into Sql Server of all the events from that CTI span rather than doing inserts one-at-a-time like the sample does.
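The batch-on-CTI idea can be sketched in a few lines. This is not the framework adapter mentioned above, just an illustration under stated assumptions: SketchEvent and its IsCti flag are hypothetical stand-ins for a dequeued StreamInsight event, and the flush delegate stands in for whatever bulk insert your sink supports.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical event shape for illustration only.
public sealed class SketchEvent
{
    public bool IsCti;      // true for a CTI, false for an insert
    public string Payload;  // row data for an insert
}

// Buffers inserts and hands them to the flush delegate once per CTI.
public sealed class CtiBatcher
{
    private readonly List<string> pending = new List<string>();
    private readonly Action<IReadOnlyList<string>> flush;

    public CtiBatcher(Action<IReadOnlyList<string>> flush)
    {
        this.flush = flush;
    }

    public void OnEvent(SketchEvent e)
    {
        if (!e.IsCti)
        {
            // Insert: just remember the row for the current CTI span.
            pending.Add(e.Payload);
            return;
        }

        // CTI: everything buffered belongs to the span that just closed.
        if (pending.Count == 0)
        {
            return; // nothing to write for an empty span
        }

        flush(pending.ToArray()); // hand over a copy, e.g. to a bulk insert
        pending.Clear();
    }
}
```

Inside a dequeue loop, each dequeued event would be passed to OnEvent; the sink is then touched once per CTI span instead of once per event.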

When you get DequeueOperationResult.Empty as your result, it’s time for your adapter to take a little nap … there is nothing left for it to do. At this point, you should exit whatever processing method that you have … so, return directly from ConsumeEvents, as you see in the adapter code above. Additional attempts to dequeue will be exercises in futility … there’s nothing there for you to dequeue. Before you exit your ConsumeEvents method, you need to make sure that you call Adapter::Ready to let the engine know that you are taking a break but are ready for more action when it is. This is also a good time to let go of any expensive resources that you may be holding or at least configure a timer to let go of them if you aren’t resumed within a period of time.

That’s where Resume comes into play. Resume is StreamInsight’s method to “wake up” your output adapter from its little snooze and start dequeuing events again because there are events there for you to dequeue. If you look at the code sample above, you’ll see that Ready() is called right after the result of the dequeue operation is “Empty”. You must do that. If you don’t call Ready() after an empty dequeue, StreamInsight will never call Resume() for you to resume dequeuing events.

You may also find yourself in a situation where you never get Empty as the result of a dequeue operation and, because of that, you don’t actually need to worry about Resume(). But don’t be thinking that you’ve gotten off scot-free here. If you never empty your queue, it means that your output adapter isn’t keeping up with the events coming in from the query, which is a completely different issue in and of itself. You want to get to a point … sometime before the end of the application … where you have an empty queue and need to be resumed because, if that’s happening, you are keeping up with the events that are coming through from the StreamInsight engine. You can check this by enabling performance counters for the query and looking at the “# Events in output queue” counter. This same counter is also available at the application level and provides the total number of events in the output queues, regardless of whether the queries are instrumented for performance counters or not. The table below shows some of the logging events when I overloaded the output adapter with just too many events for it to keep up with:
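The "timer to let go of expensive resources if you aren’t resumed" idea from the suspend discussion can be sketched as follows. IdleReleaser and its method names are hypothetical; the assumption is that you would call OnSuspended just before Ready() and OnResumed at the top of Resume(), and that the release delegate closes whatever is expensive (a connection, for example).

```csharp
using System;
using System.Threading;

// Hypothetical helper: runs a release action if the adapter stays
// suspended for longer than idleTimeout.
public sealed class IdleReleaser : IDisposable
{
    private readonly Timer timer;
    private readonly TimeSpan idleTimeout;

    public IdleReleaser(TimeSpan idleTimeout, Action release)
    {
        this.idleTimeout = idleTimeout;
        // The timer is created disarmed; it is only armed while suspended.
        this.timer = new Timer(_ => release(), null, Timeout.Infinite, Timeout.Infinite);
    }

    // Arm a one-shot timer when the adapter goes to sleep.
    public void OnSuspended()
    {
        timer.Change(idleTimeout, Timeout.InfiniteTimeSpan);
    }

    // Disarm when the engine wakes the adapter up again.
    public void OnResumed()
    {
        timer.Change(Timeout.Infinite, Timeout.Infinite);
    }

    public void Dispose()
    {
        timer.Dispose();
    }
}
```

The callback runs on a thread-pool thread and can race with a resume that arrives at the same moment, so the release action and the code that re-acquires the resource need their own synchronization.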

Source            Query         Thread Id  Stopwatch  Message
SqlOutputPoint    initialQuery  20         5284643    Resume called
SqlOutputAdapter  initialQuery  20         5294055    Resume called.
SqlOutputAdapter  initialQuery  20         5302828    ConsumeEvents Enter
SqlOutputAdapter  initialQuery  20         21442634   Still consuming events - 496 Inserts and 4 CTIs.
SqlOutputAdapter  initialQuery  20         36501939   Still consuming events - 991 Inserts and 9 CTIs.
SqlOutputAdapter  initialQuery  20         51355765   Still consuming events - 1486 Inserts and 14 CTIs.
SqlOutputAdapter  initialQuery  20         66693981   Still consuming events - 1981 Inserts and 19 CTIs.
SqlOutputAdapter  initialQuery  20         83590260   Still consuming events - 2476 Inserts and 24 CTIs.
SqlOutputAdapter  initialQuery  20         99175549   Still consuming events - 2971 Inserts and 29 CTIs.
MAIN              initialQuery  9          109556687  Stopping query
SqlOutputAdapter  initialQuery  20         113608268  Still consuming events - 3466 Inserts and 34 CTIs.
SqlOutputAdapter  initialQuery  20         116769647  ConsumeEvent Exit - Queue is empty. Items dequeued: 3600 - CTIs dequeued:36
SqlOutputPoint    initialQuery  29         116769669  Resume called
SqlOutputAdapter  initialQuery  29         116817259  Resume called.
SqlOutputAdapter  initialQuery  29         116822197  ConsumeEvents Enter
SqlOutputAdapter  initialQuery  29         116827716  Cleanup enter
SqlOutputPoint    initialQuery  31         116770289  Stop called

One thing that you may notice here is that the query started stopping while events were being dequeued. In fact, when the query is stopped, the input adapter is immediately stopped but the output adapter can continue to run until such time as it is able to completely empty its queue of events. Also, note that the output adapter’s Stop isn’t called until its queue is empty. This can make it appear that the application is hanging on shutdown when, in fact, it’s simply trying to finish dequeuing the events. And no, this isn’t the reason for the hanging shutdown symptom that I mentioned above … that’s a completely different animal.

Shutdown

Now this is where it gets really tricky. In fact, I would say that this is probably one of the toughest parts of StreamInsight development to get exactly right. First, you have two ways of knowing that it’s time to shut down … you have your AdapterState (Stopping) and you have your Stop method. I’ve dug around in this stuff a bit in Reflector and, from what I could gather, there is no guarantee that the adapter state will be set to Stopping before the call to Adapter::Stop. From what I could see, the internal operation to set the property and the internal operation to call Adapter::Stop run on different threads. I’ve never actually seen Stop called before the state changes, but I’ve also not looked for it all that closely. Still … don’t rely on your adapter state being Stopping when your Stop method is called. And, if it’s not, it may well change before you actually leave your Stop method.

Now, let’s take a look at a couple of the log writes from shutdown time:

Source            Query                   Thread Id  Stopwatch  Message
SqlOutputPoint    hoppingWindowAggregate  30         55414039   Calling Stopped
SqlOutputAdapter  hoppingWindowAggregate  28         55363008   ConsumeEvents Enter
SqlOutputAdapter  hoppingWindowAggregate  28         55424376   Cleanup enter
SqlOutputAdapter  hoppingWindowAggregate  28         55429802   Cleanup exit
SqlOutputAdapter  hoppingWindowAggregate  28         55435118   ConsumeEvents Exit - Stopping
SqlOutputPoint    hoppingWindowAggregate  30         55441926   Dispose called - disposing = True
SqlOutputAdapter  hoppingWindowAggregate  30         55446727   Dispose called
SqlOutputAdapter  hoppingWindowAggregate  30         55451689   Cleanup enter
SqlOutputPoint    hoppingWindowAggregate  30         55459592   Stopped Called
SqlOutputAdapter  hoppingWindowAggregate  28         66790307   ConsumeEvent Exit - Exception in ConsumeEvent -> ObjectDisposedException:Cannot access a disposed object.
MAIN              snapshotWindowQuery     9          71025431   Stopping query
SqlOutputAdapter  initialQuery            27         71197933   ConsumeEvents Exit - Stopping
SqlOutputPoint    initialQuery            40         71274224   Dispose called - disposing = True
SqlOutputAdapter  initialQuery            40         71367034   Dispose called
SqlOutputAdapter  initialQuery            40         71372717   Cleanup enter
SqlOutputPoint    initialQuery            40         71378355   Stopped Called
SqlOutputAdapter  initialQuery            27         76953367   ConsumeEvent Exit - Exception in ConsumeEvent -> ObjectDisposedException:Cannot access a disposed object.
SqlOutputAdapter  snapshotWindowQuery     28         81816540   ConsumeEvents Exit - Stopping
SqlOutputPoint    snapshotWindowQuery     28         81823836   Dispose called - disposing = True
SqlOutputPoint    snapshotWindowQuery     42         81742317   Stop called
SqlOutputAdapter  snapshotWindowQuery     42         81836699   Cleanup enter
SqlOutputPoint    snapshotWindowQuery     42         81843947   Calling Stopped
SqlOutputAdapter  snapshotWindowQuery     28         81828773   Dispose called
SqlOutputAdapter  snapshotWindowQuery     28         81856597   Cleanup enter
SqlOutputPoint    snapshotWindowQuery     42         86922959   Exception in Stop -> ObjectDisposedException:Cannot access a disposed object.

Note the exceptions from all three instances of the adapter. They are all the same type of exception (ObjectDisposedException) but two of them come from ConsumeEvent and one comes from Stop. I’ve done runs of this demo/sample several times and there is no consistency on which method is going to throw the ObjectDisposedException. I can say, with certainty, that just about every time that I’ve run this adapter, I’ve gotten that same ObjectDisposedException from either ConsumeEvent or Stop. What’s happening is that the adapter is calling Stopped() twice and, since calling Stopped() calls your IDisposable::Dispose method, the second call gives you an ObjectDisposedException. In order to prevent this, you need to make sure that Stopped() is only called once. The challenge here is that Stop() and ConsumeEvents() are always going to be on different threads. So, you may say, just call Stopped() in the Stop method and not in the ConsumeEvents method. While that would ensure that Stopped() is only called once, you may still be inside ConsumeEvents(), which would cause another ObjectDisposedException should you access anything on the adapter base class … like Dequeue. OK, you say … let’s only call Stopped() from our ConsumeEvents(). Well … your issue here may be that, if you aren’t getting any events in the pipeline and your adapter is paused waiting for a call to Resume(), Stopped() will never get called. And if Stopped() never gets called, your shutdown will hang while StreamInsight waits for your adapter to call Stopped(), signalling that it has completed everything necessary to clean up resources. Eventually … I think after 3 minutes … StreamInsight will terminate the adapter with extreme prejudice. And that’s the other symptom of getting your adapter lifetime wrong.

So … let’s review what the lifetime order-of-events is for shutdown. First, Stop will be called and the adapter’s state will be set to Stopping, though the two won’t necessarily happen at the same time. This provides the adapter with an opportunity to shut down any open resources and do any other clean up that may be necessary. When you are done with all of this and ready to be completely shut down, you must call Stopped() to notify the engine that you are all done cleaning up after yourself.

So how do we fix this, now that we understand the problem? First, we need to make sure that Stopped() is only called once. We are guaranteed that Stop() will be called when all is done, so that’s where we’ll put the single call. Since Stop() is only called after the queue is empty and the query is stopped, that’s probably all that we need to do. But I like to put an extra layer of protection in place to make sure that we aren’t in the middle of the dequeue process when Stop() calls Stopped(). In order to do this, we’ll need to use some sort of thread synchronization mechanism. In this case, we’ll create a lock object that both methods use and then utilize C#’s lock block to make sure that only one of those methods will run at a time (remember … they are called on separate threads). This lock, on the output adapter, probably isn’t necessary but, like I said, it’s that extra level of protection against Heisenbugs. Here’s what our log looks like for shutdown of one of the adapters now:

Source            Query                Thread Id  Stopwatch  Message
SqlOutputAdapter  snapshotWindowQuery  24         31526005   ConsumeEvent Exit - Queue is empty. Items dequeued: 206 - CTIs dequeued:2
SqlOutputPoint    snapshotWindowQuery  28         31526014   Stop called
SqlOutputPoint    snapshotWindowQuery  26         31527422   Resume called
SqlOutputAdapter  snapshotWindowQuery  24         31537450   Consume event -> exiting lock
SqlOutputAdapter  snapshotWindowQuery  26         31545272   Resume called.
SqlOutputPoint    snapshotWindowQuery  28         31547511   Stop -> Entering lock
SqlOutputAdapter  snapshotWindowQuery  28         31551377   Cleanup enter
SqlOutputAdapter  snapshotWindowQuery  28         31553290   Cleanup exit
SqlOutputAdapter  snapshotWindowQuery  26         31549564   ConsumeEvents Enter
SqlOutputPoint    snapshotWindowQuery  28         31555071   Calling Stopped
SqlOutputPoint    snapshotWindowQuery  28         31558768   Dispose called - disposing = True
SqlOutputAdapter  snapshotWindowQuery  28         31560581   Dispose called
SqlOutputAdapter  snapshotWindowQuery  28         31562515   Cleanup enter
SqlOutputAdapter  snapshotWindowQuery  26         31557017   ConsumeEvents Exit - Stopping/Stopped
SqlOutputPoint    snapshotWindowQuery  28         31566929   Stopped returned
SqlOutputPoint    snapshotWindowQuery  28         31571420   Stop -> Exiting lock
SqlOutputPoint    snapshotWindowQuery  28         31573536   Stop Exiting

There are no exceptions. There is no hang on shutdown except for any “hanging” while the output adapter empties the queue. The log above also clearly shows where the lock is entered and exited. You will notice that ConsumeEvents still gets called once during the shutdown process but exits immediately.

How was this done? First, I created a lock object in SqlOutputAdapter.cs … this is the single writer for the adapter. Then, while in the while(true) loop when dequeuing events, I still check for adapter state before every event is dequeued but, instead of calling Stopped() if the adapter is stopping, I simply exit and let Stop() take care of that. Then I enter the lock, which surrounds the entire dequeue process, ensuring that Stop() doesn’t call Cleanup() or Stopped() in the middle of the process. In the Point/Interval/Edge output adapters, there is a lock around the shutdown code in the Stop() method, ensuring that Cleanup() and Stopped() aren’t called while I’m in a dequeue process. Could we get fancier and use other thread synchronization mechanisms? Sure, but I’ve not seen any real performance benefit in it (we did test it) and … well … this method is simple, clear and easy to follow. It follows the KISS principle. Here’s what the updated code looks like:

SqlOutputAdapter.cs:

    /// <summary>
    /// Lock object to ensure that we aren't calling Stopped() when inside ConsumeEvents
    /// </summary>
    public readonly object LockObject = new Object();

    /// <summary>
    /// Dequeues each event from StreamInsight, and writes it as a row into the SQL sink
    /// </summary>
    private void ConsumeEvents()
    {
        TEvent currentEvent = default(TEvent);

        while (true)
        {
            // if the engine asked the adapter to stop
            // Note that this check is *outside* the lock
            if (this.outputAdapter.AdapterState == AdapterState.Stopping ||
                this.outputAdapter.AdapterState == AdapterState.Stopped)
            {
                // exit worker thread
                return;
            }

            // Lock during our dequeue process but only then.
            // Note that we check and acquire the lock *only* when absolutely necessary.
            lock (LockObject)
            {
                try
                {
                    // Dequeue the event
                    DequeueOperationResult result = this.outputAdapter.Dequeue(out currentEvent);

                    // if the engine does not have any events, the adapter is Suspended; so do this ..
                    if (result == DequeueOperationResult.Empty)
                    {
                        // inform the engine that adapter is ready to be resumed
                        this.outputAdapter.Ready();

                        // exit the worker thread
                        return;
                    }

                    // write out event to output table
                    this.CreateRowFromEvent(currentEvent);
                }
                catch (Exception)
                {
                    // Something went wrong while dequeuing or writing; exit the worker thread.
                    return;
                }
                finally
                {
                    // IMPORTANT: Release the event always
                    if (currentEvent != null)
                    {
                        this.outputAdapter.ReleaseEvent(ref currentEvent);
                    }
                }
            }
        }
    }

SqlOutputPoint.cs:

    /// <summary>
    /// Notifies the adapter to stop as a result of stopping or aborting the query.
    /// </summary>
    public override void Stop()
    {
        try
        {
            // Ensure that we aren't dequeuing right now.
            lock (this.outputAdapter.LockObject)
            {
                this.outputAdapter.Cleanup();
                this.Stopped();
            }
        }
        catch (Exception ex)
        {
            // log the failure
        }
    }
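If you want to convince yourself of the locking pattern without a StreamInsight server handy, the following self-contained simulation may help. It is only a sketch under stated assumptions: LifetimeSketch is a hypothetical class with no StreamInsight types, an in-memory queue stands in for the engine’s output queue, and a counter stands in for the call to Stopped(). Unlike the adapter code above, it re-checks the stopped flag inside the lock instead of relying on a catch block.

```csharp
using System;
using System.Collections.Generic;
using System.Threading;

// Hypothetical simulation of the shutdown pattern; no StreamInsight types.
public sealed class LifetimeSketch
{
    private readonly object lockObject = new object();
    private readonly Queue<int> queue = new Queue<int>(); // guarded by lockObject
    private bool stopped;                                 // guarded by lockObject

    public int StoppedCalls { get; private set; }
    public int Consumed { get; private set; }

    public LifetimeSketch(int eventCount)
    {
        for (int i = 0; i < eventCount; i++)
        {
            queue.Enqueue(i);
        }
    }

    // Worker-thread side: dequeue one event at a time under the lock.
    public void ConsumeEvents()
    {
        while (true)
        {
            lock (lockObject)
            {
                // Never touch the queue once shutdown has completed.
                if (stopped || queue.Count == 0)
                {
                    return; // a real adapter would call Ready() on an empty queue
                }

                queue.Dequeue();
                Consumed++;
            }
        }
    }

    // Engine-thread side: clean up and signal "stopped" exactly once.
    public void Stop()
    {
        lock (lockObject)
        {
            if (stopped)
            {
                return; // a second Stop() must not signal again
            }

            stopped = true;
            StoppedCalls++; // stands in for Cleanup() followed by Stopped()
        }
    }
}
```

Running ConsumeEvents on one thread and Stop on one or more other threads should always leave StoppedCalls at 1, however the threads interleave, because both the dequeue step and the shutdown step run under the same lock.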

That’s all there is to it. I did remove the logging code that produced the tables above for clarity but you can download the revised Simple StreamInsight App with the fixed Sql Output adapter and scripts for the databases from my SkyDrive here.