About the author

J Sawyer is a developer based in Houston, TX who absolutely loves to write code. After spending 9 years at Microsoft, he moved on to other things and is currently the Lead Developer for the RealTime Data Management team at Logica US. He spends his days building Really Cool Things around StreamInsight and having a blast doing it.

He has been involved with HDNUG, one of the oldest and largest .NET-focused user groups in the US, since its inception in 2001 and has watched it grow from 5-10 technologists meeting around a conference table to a thriving community of over 5000 with regular meeting attendance averaging 100 attendees. He currently serves as the Vice President. You can join him at HDNUG on the second Thursday of every month at the Houston Microsoft office.

He also loves to ride his Yamaha FZ1. And sometimes his Ninja 650. And also his Honda XR-400 dirt bike. But he doesn't code and ride at the same time. That would be bad.

Listing installed StreamInsight instances …

April 16, 2012 10:13 PM

This is one of those things that I’ve always had some manner of difficulty with as I move between computers writing StreamInsight code. You see, I have … let’s see … four different computers that I write code on and I will move projects between them. (Yes, I do have an instance of TFS running at home for some personal stuff, which certainly makes things easier as I hop between the computers.) The problem, of course, boils down to remembering which instances of StreamInsight are installed and which version each one is. Sure, I can go to Add/Remove Programs and do the maintenance setup for StreamInsight to see what instances are there … but that’s kinda painful. So, after some digging, I discovered that all of the instances are listed in the registry.

So … as any developer would do, I fired up Visual Studio and threw together a little app that would list all of the installed instances of StreamInsight, what edition they are, what version and what architecture (x86 or x64). It was really quite simple, actually. Now … I’m not going to say that the UI is the prettiest thing in the world (it’s not) but it does work and it does what it needs to do. Below is a screen shot:

Instance Lister

You can download the project from my SkyDrive. All you need is Visual Studio to build … there are no dependencies on StreamInsight. It’s all read from the registry.

Tags:

StreamInsight | Code Sample

Where is my PID for StreamInsight????

April 13, 2012 6:05 PM

This is one of the things that I’ve found a little frustrating when installing StreamInsight … entering the PID (product ID). You see, I have an MSDN subscription (every developer should!) and, therefore, I have all of the SQL Server editions … including SQL Server Developer Edition, which includes StreamInsight Premium (though for development purposes only … you’re not allowed to put it in production). On some products on MSDN, you can request the product key and it’s right there on your subscription page to copy and paste/retype … but not for any of the SQL Server DVDs! The MSDN SQL Server versions are “pre-PID’d” … the product ID is already in them. While this makes it easier to install the full SQL Server engine, when you need to enter the product key for StreamInsight … then you have to go digging to find it. Since I’m a developer and, therefore, lazy, I put this off and usually just choose the evaluation edition … only to get kerfluffled when the eval expires.

So … for your reference (and mine) … here’s where you can find your PID:

  1. Open the SQL Server DVD in Windows Explorer.
  2. Navigate to the folder for your favorite architecture (x86 or x64). It doesn’t matter which one … both PIDs are the same.
  3. Find a file called “DefaultSetup.ini”. Open this in Notepad.
  4. Right there at the top is your PID. Copy and paste it into the StreamInsight setup.
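
If you'd rather not open Notepad every time, the steps above can be sketched as a tiny console app. This is only a sketch under a couple of assumptions: that the file holds the key on its own line in the form PID="…", and that the DVD path is passed on the command line. The PidFinder class and the default path are made up for illustration.

```csharp
using System;
using System.IO;

public static class PidFinder
{
    // Returns the value of the first "PID=..." entry, or null if none is found.
    // Assumption: the entry sits on its own line and may be wrapped in quotes.
    public static string FindPid(string iniText)
    {
        foreach (var rawLine in iniText.Split('\n'))
        {
            var line = rawLine.Trim();

            // Skip blank lines, ";" comments and "[section]" headers.
            if (line.Length == 0 || line[0] == ';' || line[0] == '[')
                continue;

            int separator = line.IndexOf('=');
            if (separator < 0)
                continue;

            var key = line.Substring(0, separator).Trim();
            if (key.Equals("PID", StringComparison.OrdinalIgnoreCase))
                return line.Substring(separator + 1).Trim().Trim('"');
        }
        return null;
    }

    public static void Main(string[] args)
    {
        // Hypothetical default path; pass the real one as the first argument.
        var path = args.Length > 0 ? args[0] : @"D:\x64\DefaultSetup.ini";
        if (!File.Exists(path))
        {
            Console.WriteLine("File not found: " + path);
            return;
        }
        Console.WriteLine(FindPid(File.ReadAllText(path)) ?? "No PID entry found.");
    }
}
```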

Tags:

StreamInsight

Awarded MVP …

April 4, 2012 7:28 AM

I am honored and not a little happy to have been recognized by Microsoft as a Most Valuable Professional (MVP) for SQL Server in recognition of my involvement on the forums, this blog and my expertise in StreamInsight. I got the official notification on Sunday … April Fool’s Day. But it was no joke. I am very much looking forward to continuing what I’ve been doing as well as participating in the MVP community in the coming year.

I’d like to thank Torsten Grabs, Roman Schindlauer, Rafael Moctezuma and the rest of the StreamInsight team as well as Zain Naboulsi for their support with my nomination and award.

 MVP Logo

Query Logic and Operator Reuse in StreamInsight (DQC)

April 3, 2012 6:25 AM

Way back in 2010, Mark Simms posted a blog entry called StreamInsight: Understanding dynamic query composition where he detailed how you can use dynamic query composition (DQC) to prevent multiple instantiation of your adapters. And yes, it is important … very important … for this reason but there is more to the story here that Mark didn’t cover.

Let’s start, however, with a set of requirements for our StreamInsight queries.

First, we need to have a data source. For our example, we’ll be using a random number generator; the specific source is really irrelevant, but it gets things started. Next, we need to calculate a 10-second rolling average every 2 seconds. This is easy enough … that’s a hopping window. The last step is to take the results of our hopping window aggregate and calculate the difference between the averages with every hop … providing a rate-of-change for the calculated average. Again, this is a pretty common, well-known pattern that’s in the StreamInsight samples for LINQPad, where it’s called “FoldPairs” (Alter/Clip/Shift). Each one of these steps needs to be a query since we need to send each result to an output adapter.
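
To make the arithmetic in those requirements concrete, here is a rough sketch in plain C# with no StreamInsight involved. It works on an in-memory list with whole-second timestamps, and every name in it (RollingAverageSketch, Reading, WindowAverage) is made up for illustration. It also assumes windows of the form [end - size, end) that end on multiples of the hop size, which is a simplification and not a statement about how the engine aligns its windows.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

public static class RollingAverageSketch
{
    public sealed class Reading
    {
        public int Second;      // timestamp, in whole seconds from the start (>= 0)
        public double Value;
    }

    public sealed class WindowAverage
    {
        public int WindowEnd;   // exclusive end of the window, in seconds
        public double Average;
    }

    // One average per hop, each covering the readings in [end - windowSize, end).
    public static List<WindowAverage> HoppingAverages(
        IList<Reading> readings, int windowSize, int hopSize)
    {
        var result = new List<WindowAverage>();
        if (readings.Count == 0) return result;

        int last = readings.Max(r => r.Second);
        for (int end = hopSize; end - windowSize <= last; end += hopSize)
        {
            var inWindow = readings
                .Where(r => r.Second >= end - windowSize && r.Second < end)
                .ToList();
            if (inWindow.Count == 0) continue;   // empty windows produce nothing

            result.Add(new WindowAverage
            {
                WindowEnd = end,
                Average = inWindow.Average(r => r.Value)
            });
        }
        return result;
    }

    // Rate of change: each average minus the one from the previous hop.
    public static List<double> Differences(IList<WindowAverage> averages)
    {
        var differences = new List<double>();
        for (int i = 1; i < averages.Count; i++)
            differences.Add(averages[i].Average - averages[i - 1].Average);
        return differences;
    }

    public static void Main()
    {
        var random = new Random(42);
        var readings = Enumerable.Range(0, 30)
            .Select(s => new Reading { Second = s, Value = random.NextDouble() * 100 })
            .ToList();

        var averages = HoppingAverages(readings, 10, 2);
        var differences = Differences(averages);
        for (int i = 0; i < differences.Count; i++)
            Console.WriteLine("t={0}s avg={1:F2} change={2:F2}",
                averages[i + 1].WindowEnd, averages[i + 1].Average, differences[i]);
    }
}
```

The StreamInsight queries that follow do the same two things, just per device and over a live stream instead of a list.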

So let’s get started. We’ve read Mark’s article and taken it to heart so we’ll use DQC with the first, source query to make sure that there is only one instance of the input adapter.

var initialStream = CepStream<GeneratedEvent>.Create(cepApplication,
                                     "generatedStream", typeof(GeneratorFactory),
                                     config, EventShape.Point);


Query initialQuery = CreateAndStartQuery<GeneratedEvent>(
    initialStream, "initialQuery", "Initial query from generator",
    EventShape.Point, csvConfig, cepApplication);

//Taking this query and converting to a stream allows us to build on it further. 
//This is called Dynamic Query Composition (DQC)
var sourceStream = initialQuery.ToStream<GeneratedEvent>("sourceStream");

Our next step is to create the hopping window for our aggregate, using sourceStream as the basis.

var hoppingWindowStream = from s in sourceStream.AlterEventDuration(e=> TimeSpan.FromTicks(1))
                          group s by s.DeviceId into aggregateGroup
                          from item in aggregateGroup.HoppingWindow(
                            TimeSpan.FromSeconds(10), TimeSpan.FromSeconds(2),
                            HoppingWindowOutputPolicy.PointAlignToWindowEnd)
                          select new AggregateItem
                          {
                              DeviceId = aggregateGroup.Key,
                              Average = item.Avg(e => e.Value),
                              Count = item.Count(),
                              Sum = item.Sum(e => e.Value)
                          };


//Create the query and send to output adapter.
Query hoppingWindowQuery = CreateAndStartQuery<AggregateItem>(hoppingWindowStream,
    "hoppingWindowAggregate", "Hopping aggregate query from generator",
    EventShape.Point, csvConfig, cepApplication);

Now we need to calculate the rate-of-change for the hopping aggregate. Since we’ve already got the hoppingWindowStream, we can just use that.

var aggregateDifferenceSourceStream = hoppingWindowStream;

var aggregateDifference = from current in aggregateDifferenceSourceStream
                          from previous in aggregateDifferenceSourceStream
                              .AlterEventDuration(e => TimeSpan.MaxValue)
                              .ClipEventDuration(aggregateDifferenceSourceStream,
                                                 (e1, e2) => e1.DeviceId == e2.DeviceId)
                              .ShiftEventTime(e => TimeSpan.FromTicks(1))
                          where current.DeviceId == previous.DeviceId
                          select new ItemDifference()
                                     {
                                         CurrentValue = current.Average,
                                         DeviceId = current.DeviceId,
                                         PreviousValue = previous.Average,
                                         Difference = current.Average - previous.Average
                                     };

Query aggregateDifferenceQuery = CreateAndStartQuery(aggregateDifference, "AggregateDifference",
                                                     "Difference between two aggregate values",
                                                     EventShape.Point,
                                                     csvConfig, 
                                                     cepApplication); 

When we run this, we get the results that we expect. Pretty straightforward, right? Well … not exactly. Let’s take a look at the queries in the event flow debugger. First, we see the hopping window stream and the grouping and aggregating operators. It’s pretty simple and straightforward.

Hopping Window

Now, let’s take a look at the query for the rate-of-change in the aggregates. It’s a bit longer and, if you look at the area that’s highlighted in red, it has the exact same operators that we see in the aggregate query.

diffFromAvg - No DQC

So … the operators that did the group and aggregate are actually repeated for both queries! What’s going on here? What you need to understand is that the operator tree for a StreamInsight query isn’t built until you call ToQuery() … and the entire operator tree is built! If your tree goes all the way back to the source, you’ll get two instances of the same input adapter, as Mark described in his blog. But you’ll also get any other operators repeated. In our little sample here, it’s really not a big deal but in a large application, this can lead to some pretty substantial overhead. Using DQC, you can reduce this overhead and reuse the results of operators without having them rebuilt in the entire tree. When you get to a result that is then reused by different streams, you can create a query from it, convert the query back to a stream with ToStream() and then write your additional LINQ expressions. You do not need to have an output adapter with every query either … that is actually optional. Here’s how the code for the aggregate difference query would look using DQC:

var aggregateDifferenceSourceStream = hoppingWindowQuery.ToStream<AggregateItem>();

var aggregateDifference = from current in aggregateDifferenceSourceStream
                          from previous in aggregateDifferenceSourceStream
                              .AlterEventDuration(e => TimeSpan.MaxValue)
                              .ClipEventDuration(aggregateDifferenceSourceStream,
                                                 (e1, e2) => e1.DeviceId == e2.DeviceId)
                              .ShiftEventTime(e => TimeSpan.FromTicks(1))
                          where current.DeviceId == previous.DeviceId
                          select new ItemDifference()
                                     {
                                         CurrentValue = current.Average,
                                         DeviceId = current.DeviceId,
                                         PreviousValue = previous.Average,
                                         Difference = current.Average - previous.Average
                                     };

Query aggregateDifferenceQuery = CreateAndStartQuery(aggregateDifference, "AggregateDifference",
                                                     "Difference between two aggregate values",
                                                     EventShape.Point,
                                                     csvConfig, 
                                                     cepApplication);

Note that the only difference here (the first line) is that we convert the hoppingWindowQuery back into a stream and use that for our source. The difference in the tree of query operators, however, is telling. I’ve outlined in red where the aggregate query source now is imported from the published query with the results of the aggregates.

diffFromAvg - DQC

The operators that are unique to this query – those that calculate the rate of change in the rolling average – are still here. The operators that are already necessary for a previous query, however, are not. This is a very simple example, to be sure, and any benefits probably wouldn’t be noticeable except at a very high volume. However, when you have more complex queries (as you would in the real world), the difference can be huge.

There is, however, one pretty big limitation that you need to be aware of when using DQC … you can’t use checkpoints (high availability) when your source is from a published query (DQC). So if you need this, then you need to very carefully plan how and where you use DQC, how you get your source data and how you compose your queries.

Tags:

StreamInsight | Code Sample

Bug in StreamInsight 1.2 …

March 9, 2012 5:11 PM

This came up recently on the forums and I’ve been meaning to blog about it and, finally, I’m doing it.

Before I get going let me just say the following things:

  1. There are a couple of ways to work around the bug.
  2. It is fixed in StreamInsight 2.0. It does not apply to StreamInsight 1.1. Only 1.2.
  3. The scenario that triggers the bug is, I feel, a pretty narrow and uncommon one.

Symptoms

Since this happens in the Simple StreamInsight App that I posted on MSDN Samples, that’s what we’ll use. In certain cases, a StreamInsight query will crash on the initial event. You will be able to stop and restart the (now aborted) query from the Query Debugger or any custom tools that use the StreamInsight API to restart queries and it will run fine. This only happens on the initial events. The query debugger shows that the exception for the aborted query is:

Microsoft.ComplexEventProcessing.Engine.OperatorExecutionException: An exception happened when operator 'Grouping.1.1.Aggregate.1.1' was processing event, check inner exception for more details. ---> System.ArgumentOutOfRangeException: The added or subtracted value results in an un-representable DateTime.
Parameter name: value
   at System.DateTime.AddTicks(Int64 value)
   at Microsoft.ComplexEventProcessing.Engine.DateTimeExtensions.AddNoOverflow(DateTime dateTime, TimeSpan timeSpan)
   at Microsoft.ComplexEventProcessing.Engine.DateTimeExtensions.SubtractThrowIfOverflow(DateTime dateTime, TimeSpan timeSpan)
   at Microsoft.ComplexEventProcessing.Engine.SynopsisManager.SingleHoppingWindowManager.PreviousWindowVs(DateTime timestamp)
   at Microsoft.ComplexEventProcessing.Engine.SynopsisManager.SingleHoppingWindowManager.OverrideCtiPushBackward(DateTime ctiTimestamp)
   at Microsoft.ComplexEventProcessing.Engine.ExecutionOperatorWindowBasedOrderPreservingMinus.ProcessSmartCti(EventReference& eventReference, Int64 stimulusTicks)
   at Microsoft.ComplexEventProcessing.Engine.ExecutionOperatorStateful.DoProcessEvent(EventReference& eventReference, Int64 stimulusTicks)
   at Microsoft.ComplexEventProcessing.Engine.QueryExecutionOperator.ProcessEvent(Int32 streamNo, EventReference& eventReference, Int64 stimulusTicks, Int64 enqueueSequenceNumber)
   --- End of inner exception stack trace ---
   at Microsoft.ComplexEventProcessing.Diagnostics.Exceptions.Throw(Exception exception)
   at Microsoft.ComplexEventProcessing.Engine.QueryExecutionOperator.ProcessEvent(Int32 streamNo, EventReference& eventReference, Int64 stimulusTicks, Int64 enqueueSequenceNumber)
   at Microsoft.ComplexEventProcessing.Engine.ExecutionOperatorStateful.ProcessEvent(Int32 streamNo, EventReference& eventReference, Int64 stimulusTicks, Int64 enqueueSequenceNumber)
   at Microsoft.ComplexEventProcessing.Engine.QuerySegmentInputStrategy.DispatchEvents(SchedulingPolicy policy)
   at Microsoft.ComplexEventProcessing.Engine.SchedulingPolicy.DispatchEvents()
   at Microsoft.ComplexEventProcessing.Engine.DataflowTask.OnRun()
   at Microsoft.ComplexEventProcessing.StreamOS.Task.Run()
   at Microsoft.ComplexEventProcessing.StreamOS.Scheduler.Main()

The most interesting (and, for me, perplexing) part is the message “The added or subtracted value results in an un-representable DateTime”. Huh? How is that happening? Since this happens right at startup, we don’t have time to connect to the query and record the events as it starts, so we need to use trace.cmd (see here, about halfway down) to set up the query trace when the application starts. Once we do that and open the trace for the query, we see that the only event in the query is a CTI with a time of negative infinity … or DateTime.MinValue.

image
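
You can reproduce the same exception in plain .NET without StreamInsight at all. The stack trace above ends in DateTime.AddTicks, so this sketch simply tries to step back one hop from DateTime.MinValue. The class and method names are made up for illustration, and it makes no claim about what the engine does beyond what the trace shows.

```csharp
using System;

public static class MinValueCtiSketch
{
    // Returns true if stepping back one hop from the timestamp
    // cannot be represented as a DateTime.
    public static bool PreviousHopOverflows(DateTime timestamp, TimeSpan hopSize)
    {
        try
        {
            timestamp.AddTicks(-hopSize.Ticks);
            return false;
        }
        catch (ArgumentOutOfRangeException)
        {
            // Same exception type as the inner exception in the stack trace.
            return true;
        }
    }

    public static void Main()
    {
        var hop = TimeSpan.FromSeconds(2);

        // A CTI at DateTime.MinValue has nothing before it to step back to.
        Console.WriteLine(PreviousHopOverflows(DateTime.MinValue, hop));            // True

        // Any ordinary timestamp is fine.
        Console.WriteLine(PreviousHopOverflows(new DateTime(2012, 3, 9), hop));     // False
    }
}
```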

Conditions

This will only happen when you are using IDeclareAdvanceTimeProperties in your adapter factory or AdvanceTimeSettings when you create the stream, and specify that CTIs are created by event count, not by timespan. The delay and the AdvanceTimePolicy don’t seem to make any difference. Next, the query needs to have a HoppingWindow and no other query operators that alter the event lifetime or duration. Tumbling and count-by-start-time windows don’t have any problems.

Fixes/Workarounds

There are a few ways that you can work around this. Any one of the following methods will work.

  1. Specify that CTIs are created by timespan rather than event count.
  2. Add a temporal operator into the stream before the window. For example, adding AlterEventDuration(e => TimeSpan.FromSeconds(0)) will work.
    var hoppingWindowStream = from s in sourceStream
                .AlterEventDuration(e=>TimeSpan.FromSeconds(0))
            group s by s.DeviceId into aggregateGroup
            from item in aggregateGroup.HoppingWindow(
            TimeSpan.FromSeconds(10), TimeSpan.FromSeconds(2),
            HoppingWindowOutputPolicy.ClipToWindowEnd)
            select new AggregateItem
            {
                DeviceId = aggregateGroup.Key,
                Average = item.Avg(e => e.Value),
                Count = item.Count(),
                Sum = item.Sum(e => e.Value)
            };
  3. Enqueue a CTI from your adapter. This can have any date/time for the value, as long as it is greater than DateTimeOffset.MinValue + [WindowHopSize]. You can, for example, use new DateTimeOffset(1776, 7, 4, 0, 0, 0, TimeSpan.FromTicks(0)).
  4. Upgrade to StreamInsight 2.0 now that it’s released.
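
For the third workaround, a quick way to sanity-check a candidate CTI timestamp is to compare it against DateTimeOffset.MinValue plus the hop size, exactly as the rule is stated in the list. This is a plain .NET sketch; the helper name IsSafeInitialCti is made up, and actually enqueuing the CTI from your adapter is not shown.

```csharp
using System;

public static class InitialCtiSketch
{
    // True if the timestamp is strictly later than MinValue + hop size.
    public static bool IsSafeInitialCti(DateTimeOffset cti, TimeSpan hopSize)
    {
        return cti > DateTimeOffset.MinValue + hopSize;
    }

    public static void Main()
    {
        var hop = TimeSpan.FromSeconds(2);
        var candidate = new DateTimeOffset(1776, 7, 4, 0, 0, 0, TimeSpan.FromTicks(0));

        Console.WriteLine(IsSafeInitialCti(candidate, hop));                 // True
        Console.WriteLine(IsSafeInitialCti(DateTimeOffset.MinValue, hop));   // False
    }
}
```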

Tags:

StreamInsight