Ruminations of J.net: idle rants and ramblings of a code monkey

Cool StreamInsight query–Point input to Edge Output

StreamInsight | Code Sample
It’s easy to get individual events as points in StreamInsight. We can then take those events and write them directly out to our output adapter. There’s nothing to this. But … and this is a common scenario … what do we do if the values don’t change very often? Do we really want to write an hour’s worth of data that hasn’t changed, especially if we are reading the value every second or even several times a second? Probably not. And in many cases, we don’t need to. In many cases, in fact, we’re only interested in saving the values when they have changed. Of course, this can be done in StreamInsight through a little bit of query majick (or I wouldn’t be writing this blog entry, now would I?). So … what we are going to do here is take a point stream, evaluate it, select only the items that have changed since the previous value and then send these to output as edge events. Each edge will have a start and an end where the value changes. Simple enough, right?

As with the previous example, this was done using LinqPad. In this case, I borrowed (or stole, as the case may be) a Linq macro from one of the samples – the FoldPairs macro. I also borrowed/stole the ToSignal extension from the StreamInsight blog and turned it into a Linq macro. Both of these are going to be darn handy in this example.

public static CepStream<TResult> FoldPairs<TStream, TResult>(
    CepStream<TStream> input,
    Expression<Func<TStream, TStream, bool>> predicate,
    TimeSpan timeout,
    Expression<Func<TStream, TStream, TResult>> resultSelector)
{
    var signal = input
        .AlterEventDuration(e => timeout)
        .ClipEventDuration(input, (f, s) => predicate.Compile()(f, s));

    return from l in signal.ShiftEventTime(e => TimeSpan.FromTicks(1))
           from r in input
           where predicate.Compile()(l, r)
           select resultSelector.Compile()(l, r);
}

public static CepStream<T> ToSignal<T, K>(
    CepStream<T> inputstream,
    Expression<Func<T, K>> keySelector)
{
    return inputstream
        .AlterEventDuration(e => TimeSpan.MaxValue)
        .ClipEventDuration(inputstream,
            (e1, e2) => (keySelector.Compile()(e1)).Equals(keySelector.Compile()(e2)));
}

And now for our data. I’ve added comments to the data to show where we have value changes. We’ll take this and convert it to a point stream for evaluation.
Func<int, int, DateTimeOffset> t = (h, m) =>
    new DateTimeOffset(2011, 1, 25, 0, 0, 0, TimeSpan.Zero).AddHours(h).AddMinutes(m);

var sourceData = new [] {
    //A: Initial event @ 4:12
    new { SourceId = "A", Value = 22, TimeStamp = t(4, 12) },
    new { SourceId = "A", Value = 22, TimeStamp = t(4, 13) },
    new { SourceId = "A", Value = 22, TimeStamp = t(4, 14) },
    //A: New event @ 4:15
    new { SourceId = "A", Value = 67, TimeStamp = t(4, 15) },
    //A: New event @ 4:16
    new { SourceId = "A", Value = 54, TimeStamp = t(4, 16) },
    new { SourceId = "A", Value = 54, TimeStamp = t(4, 17) },
    new { SourceId = "A", Value = 54, TimeStamp = t(4, 18) },
    new { SourceId = "A", Value = 54, TimeStamp = t(4, 19) },
    new { SourceId = "A", Value = 54, TimeStamp = t(4, 20) },
    new { SourceId = "A", Value = 54, TimeStamp = t(4, 21) },
    //A: New event @ 4:22
    new { SourceId = "A", Value = 87, TimeStamp = t(4, 22) },
    //B: Initial event @ 4:12
    new { SourceId = "B", Value = 24, TimeStamp = t(4, 12) },
    new { SourceId = "B", Value = 24, TimeStamp = t(4, 13) },
    //B: New event @ 4:14
    new { SourceId = "B", Value = 31, TimeStamp = t(4, 14) },
    new { SourceId = "B", Value = 31, TimeStamp = t(4, 15) },
    new { SourceId = "B", Value = 31, TimeStamp = t(4, 16) },
    new { SourceId = "B", Value = 31, TimeStamp = t(4, 17) },
    new { SourceId = "B", Value = 31, TimeStamp = t(4, 18) },
    //B: New event @ 4:19
    new { SourceId = "B", Value = 50, TimeStamp = t(4, 19) },
    new { SourceId = "B", Value = 50, TimeStamp = t(4, 20) },
    new { SourceId = "B", Value = 50, TimeStamp = t(4, 21) },
    new { SourceId = "B", Value = 50, TimeStamp = t(4, 22) }
};

var source = sourceData.OrderBy(e => e.TimeStamp).ToPointStream(
    Application,
    ev => PointEvent.CreateInsert(
        ev.TimeStamp.ToLocalTime(),
        new { ev.SourceId, ev.Value }),
    AdvanceTimeSettings.IncreasingStartTime);

So far, pretty straightforward and nothing special. Next we need to calculate the change between two consecutive values. We do this using the borrowed FoldPairs macro. This will provide us with an anonymous type with our item identifier (SourceId) and the difference (delta) between two consecutive values.

var delta = FoldPairs(source,
    (a, b) => a.SourceId == b.SourceId,
    TimeSpan.MaxValue,
    (a, b) => new { a.SourceId, diff = b.Value - a.Value });

Now that we have the deltas, it’s easy enough to join this back to the original source query, selecting only those source items where the delta is not equal to 0.

var changesOnly = from r in delta
                  join s in source on r.SourceId equals s.SourceId
                  where r.diff != 0
                  select s;

So far so good. If you run what we have so far and write it to the output, you’ll see that you only get the point events that changed since the previous value. But … you’ll also see that we’re missing something – our very first, initial event. That’s because no delta is calculated for this event, as it has no previous event. Well, if we are going to be a real application, we can’t have the first one disappearing on us all the time. So now we need to get the first event – the event that has nothing preceding it. To do this, we use the ToSignal macro and convert our initial source stream to a signal stream … the points become an interval for each individual reading (whether changed or not). We then shift the event time to create an overlap between one event and the next one. Where we don’t have an overlap, we have the very first point event – a left anti semi-join. We can then take this and union it with the stream of changed events, providing a stream with the changed events AND our first event.
var initialEvent = from s in source
                   where (from s2 in ToSignal(source, e => e.SourceId)
                              .ShiftEventTime(e => TimeSpan.FromTicks(1))
                          where s.SourceId == s2.SourceId
                          select s2).IsEmpty()
                   select s;

var final = changesOnly.Union(initialEvent);

This, in itself, is actually useful … you may not want to take the next step and convert them into edge events, as this query will give you a point event for each value change. But … we’re not quite done with the scenario that we want to accomplish. To create edge events with a start and end time that represent value changes, we simply use the ToSignal() Linq macro on our final point stream. If you want these as intervals, you’ll get, in effect, the same data except that you won’t “see” the interval until the end time. If they are edge events, you’ll get a start as soon as the value changes and the end just before the next change.

var signalEdges = ToSignal(final, e => e.SourceId);

And … it really is just that simple … it also helps that the Linq macros really reduce the number of Linq query statements that we have to write. You could, if you wanted, also add dead zones … where you take this up a notch and only produce an event when the value changes by a certain amount or percentage. But I’ll leave that as an exercise for the reader. Can’t take all the fun out of it, can I?
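OK, here’s a small head start on that dead zone anyway. This is just a minimal sketch (the threshold value and the variable names below are mine, not from the sample): the dead zone is nothing more than a different predicate on the delta stream, and everything downstream – the initial-event union and the ToSignal conversion – stays exactly the same.

//Illustrative threshold; a percentage test would work the same way.
var deadZone = 5;

//Only keep source items whose value moved by more than the dead zone.
var significantChangesOnly = from r in delta
                             join s in source on r.SourceId equals s.SourceId
                             where Math.Abs(r.diff) > deadZone
                             select s;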

Cool StreamInsight query–turning alarms into Edge events

Code Sample | StreamInsight
One of the use cases for StreamInsight is to detect certain alarm conditions on streaming real time data. This, in itself, isn’t all that special … many of the process control systems out there already do something like that. What makes StreamInsight different is that it can be a lot smarter about determining these alarm conditions. For example … one of the things that apparently happened (as I understand it) with Macondo (aka Deepwater Horizon) is that the PCS alarms went off constantly … they would trigger every time a single value was out of range. So … there were so many false alerts that they simply turned the system off. It really isn’t all that unreasonable … many of these sensors will temporarily go out of range and it’s not indicative of a problem. In fact, it could just be a random bad reading or a transient condition that isn’t really a cause for alarm at all. However, if you start to have multiple values from the same sensor/device out of range within a particular time period, then you may really have an issue.

You also don’t want to issue an alarm with every bad reading … but, instead, issue one at start and at finish. Because, with an alarm, you have a definite start of the event … but at the start, you have no idea when it will end. You also have a definite end to the event … but you know that only when things come back to normal. It’s a perfect fit for an edge event.

Part of this can be found in the StreamInsight samples for LinqPad – a most righteously awesome tool that every StreamInsight query developer should have and use. It’s in the “Alarm Floods and Transients” sample under 101 StreamInsight Queries, section 2_QueryPatterns. I will start with that and describe the queries step by step.

First … our scenario. We have a data stream with events that have a “Status” field. If this field is a “0”, it’s good; if it’s a “1”, it’s bad. Now … in the real world, you’d get to this value somehow … through previous queries that do analytics or perhaps even from your data source. For our purposes, that is irrelevant. We’re interested in what we do with it. Now, in our case, we can actually get false alerts (of course), so we want to trigger an alarm only when we get multiple alerts within a specific time frame. We then want the alarm expressed as an edge event. Finally, if we have an alarm that crosses a specific amount of time, we want to repeat the alarm. We’ll have several steps to do this.

First, here’s our source data; I added a little to what was there. (Notes: These are all set up to run in LinqPad. Also, we don’t differentiate the events by any sort of ID … you’ll need to do this in a real app.)
var sourceData = new [] {
    new { Status = 0, TimeStamp = DateTime.Parse("10/23/2009 4:10:00 PM") },
    new { Status = 0, TimeStamp = DateTime.Parse("10/23/2009 4:11:00 PM") },
    //False alert @ 4:12
    new { Status = 1, TimeStamp = DateTime.Parse("10/23/2009 4:12:00 PM") },
    new { Status = 0, TimeStamp = DateTime.Parse("10/23/2009 4:13:00 PM") },
    new { Status = 0, TimeStamp = DateTime.Parse("10/23/2009 4:14:00 PM") },
    //Real alert @ 4:15
    new { Status = 1, TimeStamp = DateTime.Parse("10/23/2009 4:15:00 PM") },
    new { Status = 1, TimeStamp = DateTime.Parse("10/23/2009 4:16:00 PM") },
    new { Status = 1, TimeStamp = DateTime.Parse("10/23/2009 4:18:00 PM") },
    new { Status = 0, TimeStamp = DateTime.Parse("10/23/2009 4:19:00 PM") },
    new { Status = 0, TimeStamp = DateTime.Parse("10/23/2009 4:20:00 PM") },
    //Real alert @ 4:21 - Longer alert that repeats
    new { Status = 1, TimeStamp = DateTime.Parse("10/23/2009 4:21:00 PM") },
    new { Status = 1, TimeStamp = DateTime.Parse("10/23/2009 4:22:00 PM") },
    new { Status = 1, TimeStamp = DateTime.Parse("10/23/2009 4:23:00 PM") },
    new { Status = 1, TimeStamp = DateTime.Parse("10/23/2009 4:24:00 PM") },
    new { Status = 1, TimeStamp = DateTime.Parse("10/23/2009 4:25:00 PM") },
    new { Status = 1, TimeStamp = DateTime.Parse("10/23/2009 4:26:00 PM") },
    new { Status = 1, TimeStamp = DateTime.Parse("10/23/2009 4:27:00 PM") },
    new { Status = 0, TimeStamp = DateTime.Parse("10/23/2009 4:28:00 PM") },
    new { Status = 0, TimeStamp = DateTime.Parse("10/23/2009 4:29:00 PM") },
    //False alert @ 4:30
    new { Status = 1, TimeStamp = DateTime.Parse("10/23/2009 4:30:00 PM") },
    new { Status = 0, TimeStamp = DateTime.Parse("10/23/2009 4:31:00 PM") },
    new { Status = 0, TimeStamp = DateTime.Parse("10/23/2009 4:32:00 PM") },
    new { Status = 0, TimeStamp = DateTime.Parse("10/23/2009 4:33:00 PM") },
};

var source = sourceData.ToPointStream(
    Application,
    ev => PointEvent.CreateInsert(ev.TimeStamp.ToLocalTime(), ev),
    AdvanceTimeSettings.StrictlyIncreasingStartTime);

var timeout = TimeSpan.FromMinutes(2);
var alarmTimeout = TimeSpan.FromMinutes(5);

Now that we have our source data and variables, let’s create two streams, one with valid items and one with all alerts:

var validData = source.Where(e => e.Status == 0);
var alarmEvents = source.Where(e => e.Status == 1);

Next, we need to remove the false alerts … alerts that aren’t followed by another alert. We do this by taking the valid items, moving the start time back by the timeout and then extending the event duration by the timeout. If there is a successful join, then there is a “good” event within the timeout. In that case, we filter the alarm out using a Left Anti-Semi Join.

// take all alarm events that are not followed by a non-alarm event
// within the timeout
var nonTransientAlarms = from alarm in alarmEvents
                         where (from nextevent in source
                                    .AlterEventLifetime(
                                        e => e.StartTime.Subtract(timeout),
                                        e => timeout)
                                where nextevent.Status == 0
                                select nextevent).IsEmpty()
                         select alarm;

//Show the Non-transient alarms
(from p in nonTransientAlarms.ToIntervalEnumerable()
 where p.EventKind == EventKind.Insert
 select p).Dump("Non-transient alarms");

The output shows all of the alarm events that do not have a “good” event within the timeout. You’ll notice that there is a flood of events … we’ll need to filter this out so that we have the initial alarm event.
// Expand all alarm events to the timeout and count over snapshots
var counts = from win in nonTransientAlarms
                 .Where(e => e.Status == 1)
                 .AlterEventDuration(e => timeout)
                 .SnapshotWindow(SnapshotWindowOutputPolicy.Clip)
             select new { count = win.Count() };

// Those snapshots with a count of 1 belong to the initial alarms.
// Reduce to points and join with the original stream.
var initialAlarm = from c in counts
                       .Where(e => e.count == 1)
                       .ToPointEventStream()
                   from e in source
                   select e;

So far, we’ve not done much that’s not in the original sample. Now we go off that path and get these changed into Edges that represent the start and end of the actual alarm. We start by turning the valid data into a signal. We will also extend our initial alarms to the alarm timeout. From there, doing a LASJ again with the valid data signal will provide us with edge events that start and end with the alarm. Because we have 1 alarm that extends past the alarm timeout, this will generate 2 edge events. Together, these edge events will cover the entire alarm timeframe:

var validDataSignal = from v in validData
                          .AlterEventDuration(e => TimeSpan.MaxValue)
                          .ClipEventDuration(initialAlarm, (e1, e2) => true)
                      select v;

//Since we have the initial alarms, how long does the alarm last?
var alarmTimeline = from i in initialAlarm.AlterEventDuration(e => alarmTimeout)
                    where (from v in validDataSignal
                           select v).IsEmpty()
                    select i;

(from p in alarmTimeline.ToEdgeEnumerable()
 where p.EventKind == EventKind.Insert
 select p).Dump("Alarm timeframe");

And that should do it …
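One last note: as mentioned at the top, these queries don’t differentiate events by any sort of ID. As a rough sketch of what that would look like (assuming a hypothetical DeviceId field on the payload, which this sample data does not have), the transient-alarm filter would also need to correlate on that ID so that one device’s good readings don’t cancel another device’s alarms. The same correlation would then carry through the grouping and LASJ steps above.

//Hypothetical: the payload carries a DeviceId field identifying the sensor/device.
var nonTransientAlarmsPerDevice = from alarm in alarmEvents
                                  where (from nextevent in source
                                             .AlterEventLifetime(
                                                 e => e.StartTime.Subtract(timeout),
                                                 e => timeout)
                                         where nextevent.Status == 0
                                         //Only let a good reading from the SAME device
                                         //cancel the alarm.
                                         where nextevent.DeviceId == alarm.DeviceId
                                         select nextevent).IsEmpty()
                                  select alarm;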

TFS Work Item Batch Updater

Code Sample | TFS | TFS 2010
TFS is a great tool – don’t get me wrong – but there are some things conspicuously missing from TFS out-of-the-box and the TFS Power Tools. One of these is a way to update work items published to a large number of projects. Certainly, you can use the witadmin command-line tool to import work items or, if the command line frightens you, you can use the Work Item Import tool in the TFS Power Tools. While both of these work fine, they only import a single work item into a single project. And here’s where TFS fails the TFS administrator who is responsible for an implementation with many projects – you could spend all day importing a single work item across 100 projects. And if you have to import multiple work items? Well, there goes your vacation.

So … let’s break this down. You have a highly repetitive task that differs by only a couple of variables and that you want to do over and over and over again. Sounds like a good application of our old friend, the looping construct, right? In fact, it certainly is. Let’s start with what we need to do to import a single work item into a single project:

private void ImportWorkItem(string workItemFilePath,
    Microsoft.TeamFoundation.WorkItemTracking.Client.Project project)
{
    //Need to get the raw XML for the work item.
    var fileInfo = new System.IO.FileInfo(workItemFilePath);
    string workItemXml = String.Empty;
    using (var reader = fileInfo.OpenText())
    {
        workItemXml = reader.ReadToEnd();
        reader.Close();
    }

    //Import ...
    project.WorkItemTypes.Import(workItemXml);
}

Really … it is that simple. The fun part comes when you want to take this simple concept and wrap it up in a nice pretty bow that allows a TFS Admin to update multiple work items across multiple projects. It’s not hard … except for the fact that I am UI-challenged and anything that is functional and looks somewhat usable is not my strong suit. But … I did manage to get it done.

Running the tool is pretty straightforward. Start the EXE. It will then prompt you to select a Team Project Collection, which gets you to the main screen. Use “Browse…” to select the work item files that you want to import. These should be in the same format as you would use in the project template. Then you select the projects that you want them imported into. Hit OK and away it goes. It will take a few minutes, so go get yourself some coffee. Or Mountain Dew. Then relax and surf the web a bit … after all, you did tell the boss that this was a tedious, time-consuming process, right? You can download this tool on MSDN Code Samples.
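For reference, the batch part of the tool really is just that single-item import wrapped in a nested loop. Here’s a minimal sketch (the method name and parameters below are mine; the actual tool adds the UI, progress reporting and error handling around it):

private void ImportWorkItems(
    IEnumerable<string> workItemFilePaths,
    IEnumerable<Microsoft.TeamFoundation.WorkItemTracking.Client.Project> projects)
{
    foreach (var project in projects)
    {
        foreach (var workItemFilePath in workItemFilePaths)
        {
            //Same single-item import as above, repeated for every selected
            //work item file in every selected project.
            ImportWorkItem(workItemFilePath, project);
        }
    }
}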

Baton Rouge Sql Saturday Content

Code Sample | Community | Events | StreamInsight
I’ve just posted this to the SQL Saturday web site. All of the content that was presented is there, including the PowerPoint and the code. There’s quite a bit of stuff in the code that we simply didn’t have time to show. There are two text files in there with several different query patterns that you can copy and paste into place. This sample does require StreamInsight 1.2, so make sure that you have at least the evaluation version installed.

StreamInsight 1.2: Extension Method for Perf Counters

StreamInsight | Code Sample
One of the new features in StreamInsight 1.2 is performance counters. These give you a very robust and easy way to test and monitor performance of your StreamInsight application, and they were much needed. The counters are enabled at the StreamInsight Server and Application level all the time, but you need to specify and enable the individual queries that you want monitored as well. Enabling a query for performance counters will also hook up both the input and the output adapters, which helps you understand if you have bottlenecks or poor performance in them as well – though there is a (relatively minimal) performance hit for this. Very cool stuff. But … like the query state … it’s a bit warty (IMHO) in some of the details. Fortunately, it’s nothing that we can’t make at least a little better through the beauty that is extension methods. I’ll go into more detail on the perf counters in a future post, but I wanted to share this little piece of code to help y’all get to using these a bit quicker.

/// <summary>
/// Enables and disables performance counters for a query.
/// </summary>
/// <param name="query">Query to enable or disable counters for.</param>
/// <param name="enabled">true to enable, false to disable.</param>
public static void SetPerformanceCounters(this Query query, bool enabled)
{
    Uri queryUri = query.Name;
    DiagnosticSettings settings = query.Application.Server.GetDiagnosticSettings(queryUri);
    if (enabled)
    {
        settings.Aspects |= DiagnosticAspect.PerformanceCounters;
    }
    else
    {
        settings.Aspects &= ~DiagnosticAspect.PerformanceCounters;
    }
    query.Application.Server.SetDiagnosticSettings(queryUri, settings);
}
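Using it is then a one-liner per query. For example, a quick sketch of turning counters on for every query in an application (cepApplication here stands in for whatever Application reference you already have):

//Enable performance counters for every query in the application.
foreach (Query query in cepApplication.Queries.Values)
{
    query.SetPerformanceCounters(true);
}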

StreamInsight: What edition am I running?

Code Sample | StreamInsight
It is sometimes helpful to know what edition and/or version of StreamInsight you are using. This can be especially helpful as part of your application startup, where you can log this information for future/current troubleshooting. Unfortunately, there isn’t a good way to get that information from the StreamInsight API. Sure, you can use one of the Diagnostic Views – the scheduler diagnostic view – to see how many schedulers are running, which gives you an idea of which edition you are running. But it doesn’t tell you the processor architecture installed (x86 vs. x64), nor will it tell you the exact edition of the instance. Oh … and that diagnostic view isn’t there in version 1.2. You’ll just know whether you are running Standard (1 scheduler) or one of Developer, DataCenter or Evaluation (1 scheduler / core). So here’s a little code snippet that will give you some more information:

/// <summary>
/// Gets information about the current StreamInsight service edition
/// </summary>
/// <param name="instanceName">Name of the StreamInsight instance.</param>
/// <returns>String with edition information</returns>
[System.Security.Permissions.RegistryPermission(
    SecurityAction.Demand,
    Read = "HKLM\\SOFTWARE\\Microsoft\\Microsoft StreamInsight")]
public string GetStreamInsightEditionInformation(string instanceName)
{
    try
    {
        var streamInsightRegistry = Microsoft.Win32.Registry.LocalMachine.OpenSubKey(
            @"SOFTWARE\Microsoft\Microsoft StreamInsight", false);
        var instanceKeyName = "MSSI." + instanceName;
        var instanceRegistryKey = streamInsightRegistry.OpenSubKey(instanceKeyName, false);

        StringBuilder sb = new StringBuilder();
        sb.Append("StreamInsight Version:");
        sb.AppendLine(instanceRegistryKey.GetValue("Version").ToString());
        sb.Append("Edition:");
        sb.AppendLine(instanceRegistryKey.GetValue("Edition").ToString());
        sb.Append("Platform:");
        sb.AppendLine(instanceRegistryKey.GetValue("PlatformId").ToString());
        return sb.ToString();
    }
    catch
    {
        return "Could not get StreamInsight information";
    }
}

Some caveats: this will only work on the same machine where StreamInsight is running. So if you are connecting to StreamInsight remotely using the Management Service (for example, to a remote instance of the StreamInsight service), this won’t work for you. Also, it tells you the version of StreamInsight that the instance was installed with – which may not necessarily be the version that you are running!!! For example, you can have a 1.0 instance, a 1.1 instance and a 1.2 instance. If you use Server.Create(“1.0 Instance”), you will actually be running StreamInsight 1.2! Fortunately, the Server object does have a Version property that will tell you which version you are running.
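As a quick usage sketch (assumptions: the method lives on whatever startup/logging class you call it from, and the instance is named “Default”), you could log both pieces of information side by side at startup – the registry tells you what was installed, and, as noted above, the Server object’s Version property reflects what is actually running:

using (Server cepServer = Server.Create("Default"))
{
    //Edition/version the instance was installed with (from the registry).
    Console.WriteLine(GetStreamInsightEditionInformation("Default"));

    //Version actually running, per the Server object.
    Console.WriteLine("Running version: " + cepServer.Version);
}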

StreamInsight Dynamic Filter Query Patterns

StreamInsight | Code Sample
A little while ago, I wrote a post about doing AdHoc Query Filters in StreamInsight. In the sample code, I accomplished the filter by using a user-defined function. This caused quite a bit of ugliness that I didn’t like, but it did work. Since then, I’ve reworked and extended the filtering and I wanted to revisit it here. I’ve moved all of the query logic out of the user-defined function and into the StreamInsight engine, bypassing things like Split() and Join() as well as improving the overall performance characteristics.

The requirements are a bit expanded. While we still have a base DataItem, we’ve added another field to the key – AdapterId. So a data item’s uniqueness is defined by the combination of the ItemId and AdapterId. DataItem now looks like the following:

/// <summary>
/// Base class for all query data points
/// </summary>
public class DataItem
{
    /// <summary>
    /// ID of the source item
    /// </summary>
    public string ItemId { get; set; }

    /// <summary>
    /// Identifier for the source adapter.
    /// </summary>
    public string AdapterId { get; set; }

    /// <summary>
    /// Time item was received/calculated
    /// </summary>
    public DateTime SourceTimestamp;
}

This is the base data type for all of our query data types. When we do dynamic filters, we want to be able to include the AdapterId in the filter query. If present, it should match the item’s AdapterId exactly. If missing, all AdapterIds should match. A single filter could also have a number of groups included … some items from Adapter1, some items from Adapter2 and some from all adapters, which adds a ton of flexibility but also some complexity.

The filters are defined by a FilterDefinition class, which has a property for the filter operation (equals, starts with, ends with, contains) and then an array of ItemKey classes. Each ItemKey class has the FilterValue (for the item id), the AdapterId (which can be null) and then a GroupId. The group id is really only needed for the Contains query. You’ll see why in a bit. Our filter definition classes look like the following:

public enum FilterOperation
{
    Equals,
    StartsWith,
    EndsWith,
    Contains
}

/// <summary>
/// Defines a filter
/// </summary>
public class FilterDefinition
{
    /// <summary>
    /// Operation to perform
    /// </summary>
    public FilterOperation Operation { get; set; }

    /// <summary>
    /// List of item keys.
    /// </summary>
    public ItemKey[] ItemKeys { get; set; }
}

public class ItemKey
{
    /// <summary>
    /// Source adapter for the item
    /// </summary>
    public string AdapterId { get; set; }

    /// <summary>
    /// ID for the item.
    /// </summary>
    public string FilterValue { get; set; }

    /// <summary>
    /// Gets or sets the group id.
    /// </summary>
    /// <value>The group id.</value>
    public int GroupId { get; set; }
}

So far, pretty straightforward. Doing the code for the equals, starts with and ends with operations is straightforward and easy using an equijoin. The starts with version is below. Equals and ends with look pretty much the same.

private static CepStream<T> StartsWithFilter<T>(
    CepStream<T> source,
    CepStream<ItemKey> filterStream) where T : DataItem
{
    var filtered = from s in source
                   from f in filterStream
                   where s.ItemId.StartsWith(f.FilterValue)
                   where String.IsNullOrWhiteSpace(f.AdapterId) || f.AdapterId == s.AdapterId
                   select s;
    return filtered;
}

Implementing contains, however, was a bit more challenging. Since Equals, Starts With and Ends With were all logical “OR” filters, doing a simple equi-join would work.
With the Contains operation, however, it should be an “AND” operation … all of the FilterValues in a particular group should match, not just one. Because of that, an equi-join, which would return every item that matched at least one of the filter values, doesn’t work. This query is a bit more complex and takes several steps, but it’s still very do-able. There are a couple of ways to do it, but I’ll take the way that I’m showing in the demo step by step.

1. Filter the source query to find all candidate matches. Note that these won’t be our final matches; this returns all items that match any one of the filter values. Since they need to match all, we have more work to do.
2. Group the filter stream to determine how many matches each group should have. You will need to make sure that the FilterValues are distinct within a specific group for this to work properly. In a real-world application, this would (most likely) come from the input adapter, which will have to ensure the uniqueness. In the demo, I’m using Array.ToPointStream() … so I can use the Linq Distinct() to ensure this.
3. Filter the initial candidate matches to pull out only the items that match all of the filter values in a specific grouping.

//Get all initial candidate matches.
var initialMatches = from s in source
                     from v in filterStream
                     where s.ItemId.Contains(v.FilterValue)
                     where s.AdapterId == v.AdapterId || String.IsNullOrWhiteSpace(v.AdapterId)
                     select new
                     {
                         MatchedValue = v.FilterValue,
                         GroupId = v.GroupId,
                         ItemId = s.ItemId,
                         AdapterId = s.AdapterId
                     };

//Get the required number of matches per group.
var requiredMatchCounts = from v in filterStream
                          group v by v.GroupId into adapterGroup
                          from a in adapterGroup.SnapshotWindow(SnapshotWindowOutputPolicy.Clip)
                          select new
                          {
                              GroupId = adapterGroup.Key,
                              CountRequired = a.Count()
                          };

//Count the number of matches.
var countMatches = from i in initialMatches
                   group i by new { i.ItemId, i.GroupId, i.AdapterId } into c
                   from v in c.SnapshotWindow(SnapshotWindowOutputPolicy.Clip)
                   select new
                   {
                       AdapterId = c.Key.AdapterId,
                       GroupId = c.Key.GroupId,
                       ItemId = c.Key.ItemId,
                       Count = v.Count()
                   };

var matched = from c in countMatches
              from r in requiredMatchCounts
              where c.GroupId == r.GroupId
              where c.Count == r.CountRequired
              select new
              {
                  Adapter = c.AdapterId,
                  ItemId = c.ItemId
              };

var final = from g in matched
            from s in source
            where g.Adapter == s.AdapterId
            where g.ItemId == s.ItemId
            select s;

Finally, the implementation uses an extension method on the standard StreamInsight Query object. Since this is designed specifically to utilize dynamic query composition (DQC), this makes it very simple and straightforward to use.

Query sourceQuery = cepApplication.Queries["SourceQuery"];
CepStream<ValueDataItem<float>> filteredSourceStream =
    sourceQuery.Filter<ValueDataItem<float>>(filterDefinition);
Query filteredSourceQuery = filteredSourceStream.ToQuery(
    cepApplication,
    "FilteredSourceQuery",
    "Filtered Source Query",
    typeof(TracerFactory),
    GetTracerConfig("Source"),
    EventShape.Point,
    StreamEventOrder.FullyOrdered);

You can download the sample project from MSDN Code Samples.
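To give an idea of how the pieces hang together, here’s a rough sketch of a dispatcher that picks the right implementation based on the requested operation. Only the StartsWithFilter shown earlier exists in this post; EqualsFilter, EndsWithFilter and ContainsFilter are placeholder names for the analogous implementations, and the real extension method in the sample also takes care of turning the FilterDefinition’s ItemKeys into the filter stream.

private static CepStream<T> ApplyFilter<T>(
    CepStream<T> source,
    CepStream<ItemKey> filterStream,
    FilterOperation operation) where T : DataItem
{
    //Dispatch to the implementation that matches the filter operation.
    switch (operation)
    {
        case FilterOperation.Equals:
            return EqualsFilter(source, filterStream);
        case FilterOperation.StartsWith:
            return StartsWithFilter(source, filterStream);
        case FilterOperation.EndsWith:
            return EndsWithFilter(source, filterStream);
        case FilterOperation.Contains:
            return ContainsFilter(source, filterStream);
        default:
            throw new ArgumentOutOfRangeException("operation");
    }
}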

Simple StreamInsight App

StreamInsight | Code Sample
In a previous post, I talked about how to change one of the StreamInsight sample apps to run using a remote StreamInsight server. After a discussion with the person that asked the original question, I’ve put together a full sample app that can run either in process or connect to a remote server. Most of the code is completely identical … input adapters, output adapters, queries … as that part of the programming model is the same. There are, however, some differences between what you need to do for this kind of app and what you typically see in the samples. I’ve tried to include most, if not all, of the server/application bells and whistles to make it easy to play with.

Let’s start with the common settings for the sample application. First, the streamInsightInstanceName should point to the active and installed instance. If starting out of process, this is used to build the url to connect to the management service. It probably would be better to have the full url there when you are running remote … but I didn’t do it. Next, you need to specify the streamInsightAppName, which determines the name of the application to create or connect to.

Creating the Server – In Process

First, set the runInProcess key in the app.config to true. If you want to use the metadata service that was introduced in 1.1, set useMetadata to true. When creating the server, you need to specify the metadata service at startup. If using the metadata overload, it cannot be null. A final note: in StreamInsight 1.2, a new feature called checkpointing is being introduced. This feature allows for higher availability of StreamInsight instances; certain queries can have their entire state written out to disk at some interval, which allows long-running queries to be reconstituted on startup (instead of losing all that information). Checkpointing relies on the metadata services.

Server cepServer = AppSettings.Current.UseMetadata
    ? Server.Create(AppSettings.Current.StreamInsightInstanceName, GetMetadataService())
    : Server.Create(AppSettings.Current.StreamInsightInstanceName);

When running in process, you can also fire up the management service, which allows the Event Flow Debugger (and other tools) to connect to your StreamInsight instance. If a value is present in the managementServiceUrl settings key, this will be created. At this time, only the WSHttpBinding is supported. Note: you will need to run Visual Studio as admin -or- reserve whatever url you are going to use with WCF. See this blog post for details on that. I, personally, run VS as admin since it makes my life easier in this regard.

ServiceHost managementServiceHost = new ServiceHost(cepServer.CreateManagementService());

//Adding the service host.
//This allows remote clients to access this application, including the Query Debugger.
managementServiceHost.AddServiceEndpoint(
    typeof(IManagementService),
    new WSHttpBinding(SecurityMode.Message),
    AppSettings.Current.ManagementServiceUrl);
managementServiceHost.Open();

Creating the Server – Remote

Since there are fewer knobs and switches for a remote instance, connecting to a running server is much simpler. With the sample app, it will connect to the standard url for the StreamInsight instance name specified in the config file. However, if you have the management service started on a custom in-process instance of StreamInsight, you will also be able to connect to that. By the way, the model used for the recently announced Azure StreamInsight service is the remote model.
var endpointAddress = new System.ServiceModel.EndpointAddress(
    @"http://localhost/StreamInsight/" + AppSettings.Current.StreamInsightInstanceName);
Server cepServer = Server.Connect(endpointAddress);

Getting the Application

In most of the demos/samples out there, not only is it running in process, it’s also running without metadata services. In those cases, you always need to create the application. However, you don’t always need to do this. If connecting to a remote instance or if you are using the metadata service, it is very possible that the application is already there. So we need to check before we grab the application object.

private static Application GetApplication(Server cepServer)
{
    if (!cepServer.Applications.ContainsKey(AppSettings.Current.StreamInsightAppName))
    {
        Console.WriteLine("Creating new Cep Application");
        return cepServer.CreateApplication(AppSettings.Current.StreamInsightAppName);
    }
    else
    {
        Console.WriteLine("Connecting to existing Cep Application");
        return cepServer.Applications[AppSettings.Current.StreamInsightAppName];
    }
}

Creating the Queries

Creating the queries and attaching the adapters is no different whether you are running in process or out of process. However, we need to take into account that, like with the application object, the queries may already be there. With the remote instance, this is pretty clear. When using metadata, the queries will be created when you create the server. However, they will not be started – so you do need to start them. When starting a query, you need to check to see if it’s already running – you will get an exception if you try to start a query that is already started.

There are 3 queries that are created and running using a technique called Dynamic Query Composition. This is actually pretty important to understand – but a topic for another post. One is the raw, source query, one is a hopping window aggregate and one is a snapshot window aggregate. It’s interesting to see the differences between the output of the hopping window aggregate and the snapshot window aggregate. The only input adapter used is the random data generator that is included in the StreamInsight Team Samples. Each query is attached to an Async CSV output adapter, again from the StreamInsight Team Samples (not required – you can have a standing, running query without an output adapter).

I’ve uploaded the sample to MSDN Code Samples. You can download it here.
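Pulling the two startup paths together, the decision ends up being a simple branch on the config flags described above. This is just a sketch: RunInProcess mirrors the runInProcess key and isn’t shown elsewhere in this post, while the rest of the calls are the ones you’ve already seen.

Server cepServer;
if (AppSettings.Current.RunInProcess)
{
    //In process: create the server, with or without metadata services.
    cepServer = AppSettings.Current.UseMetadata
        ? Server.Create(AppSettings.Current.StreamInsightInstanceName, GetMetadataService())
        : Server.Create(AppSettings.Current.StreamInsightInstanceName);
}
else
{
    //Remote: connect to the management service of the running instance.
    var endpointAddress = new System.ServiceModel.EndpointAddress(
        @"http://localhost/StreamInsight/" + AppSettings.Current.StreamInsightInstanceName);
    cepServer = Server.Connect(endpointAddress);
}

//Get or create the application, as shown above.
Application cepApplication = GetApplication(cepServer);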

StreamInsight: Simple sample of adapters and Server.Connect

.NET Stuff | StreamInsight | Code Sample
This is in response to a question on the StreamInsight forum where a developer was asking for a sample that uses Server.Connect. Before I get into the details, some notes:

- I used the PatternDetector sample from the StreamInsight Product Team Samples on CodePlex. Rather than write something from scratch, I just converted the existing project to VS 2010 and tweaked it.
- Add a reference to System.ServiceModel to the project.
- You need to make sure that you defined an instance for StreamInsight in the setup and installed the service. And start it. Starting it helps.
- I had to change the creds for the StreamInsight service to use my credentials instead of Network Service because I got a funky error (that I’ve never seen before) with the Event Flow Debugger. It may have something to do with the fact that I’m running on a domain controller (don’t say it … I know … but it’s just a dev machine and I need AD for some Hyper-V VMs that I run). I’d recommend trying to do it with Network Service but if it doesn’t work, you’ve been warned. The error is below:
  Security Support Provider Interface (SSPI) authentication failed. The server may not be running in an account with identity 'DEVBIKER\J Sawyer'. If the server is running in a service account (Network Service for example), specify the account's ServicePrincipalName as the identity in the EndpointAddress for the server. If the server is running in a user account, specify the account's UserPrincipalName as the identity in the EndpointAddress for the server.
- You need to make sure that you copy the relevant DLLs to the StreamInsight service host folder. By default, this is C:\Program Files\Microsoft StreamInsight 1.1\Host. These can be found in the \bin folder for the PatternDetector project and are: StreamInsight.Samples.Adapters.SimpleTextFileReader, StreamInsight.Samples.Adapters.SimpleTextFileWriter and StreamInsight.Samples.UserExtensions.Afa.
- Create the input and output folders. Actually, if you are running under your own account, this won’t be necessary. If you are running under Network Service, you will. And make sure that you give Network Service appropriate permissions.
- The User Defined Operator in the sample that I chose caused an issue when running remotely. This surprised me … usually things run pretty much unchanged. I didn’t have the time or energy to debug that, so I just changed it so that it was no longer necessary.

So … the changes. In void Main, I changed the startup code to:

if (inProc)
{
    using (Server server = Server.Create(streamInsightInstanceName))
    {
        RunApp(server);
    }
}
else
{
    using (Server server = Server.Connect(
        new System.ServiceModel.EndpointAddress(
            "http://localhost/StreamInsight/" + streamInsightInstanceName)))
    {
        RunApp(server);
    }
}

As you can see, I refactored everything in the using block to a new method called “RunApp”. This made it cleaner (I thought). I suppose that I could have done it another way, but this just seemed right. In RunApp, I changed the code that creates the application to check for the existence of the application and, based on that, get or create the application object. This is below:

// Create application in the server. The application will serve
// as a container for actual CEP objects and queries.
Console.WriteLine("Creating CEP Application"); Application application; if (!server.Applications.ContainsKey(appName)) { application = server.CreateApplication(appName); } else { application = server.Applications[appName]; } And that is (typically) all that you need to do to change a StreamInsight app to run either in proc or remote. As I mentioned before, I did have to change the query and remove the reference to the UDO … which was very strange because I’ve never had issues with extensions before. I’m guessing that it had something to do with the implementation of the UDO. You can download it below Program.cs (the only changed file) for the Pattern Detector sample below:  

StreamInsight User Defined Aggregate: Standard Deviation

.NET Stuff | StreamInsight | Code Sample
Here’s a quick UDA to do standard deviation of a window. I found it interesting that I had to take the IEnumerable<double> source and call ToArray(). If I didn’t, it would throw a NullReferenceException, although why is something of a mystery. It would be nice if I could pass in the Average from the query since that’s already calculated by the StreamInsight engine but no dice. Note: I’ve not done any performance testing … it was copied from MSDN. Use at your own risk … /// <summary> /// Static class with UDA extensions for standard deviation /// </summary> public static class StandardDeviation { /// <summary> /// Extension method for the UDA. /// </summary> /// <typeparam name="T">Payload type of the stream.</typeparam> /// <param name="window">Window to be passed to the UDA</param> /// <param name="map">Mapping from the payload to a float field in the payload.</param> /// <returns>Aggregation result.</returns> [CepUserDefinedAggregate(typeof(StdDevDouble))] public static double StdDev<T>(this CepWindow<T> window, Expression<Func<T, double>> map) { throw CepUtility.DoNotCall(); } } /// <summary> /// A UDA to calculate the standard deviation of a window /// </summary> public class StdDevDouble : CepAggregate<double, double> { /// <summary> /// Computes the aggregation over a window. /// </summary> /// <param name="source">Set of events contained in the window.</param> /// <returns>Aggregation result.</returns> public override double GenerateOutput(IEnumerable<double> source) { double[] values = source.ToArray(); double mean = values.AsParallel().Average(); double standardDev = values.AsParallel().Aggregate( 0.0, // do this on each thread (subtotal, item) => subtotal + Math.Pow((item - mean), 2), // aggregate results after all threads are done. (total, thisThread) => total + thisThread, // perform standard deviation calc on the aggregated result. (finalSum) => Math.Sqrt((finalSum / (source.Count() - 1))) ); return standardDev; } } Using it in a query is simple – just make sure that you add a using statement for the namespace containing the aggregate’s definition. var stdDeviation = from e in sourceQueryStream group e by e.ItemId into eachGroup from window in eachGroup.HoppingWindow( TimeSpan.FromSeconds(10), TimeSpan.FromSeconds(1), HoppingWindowOutputPolicy.ClipToWindowEnd) select new { ItemId = eachGroup.Key, StdDev = window.StdDev(e => e.Value), Avg = window.Avg(e => e.Value), Min = window.Avg(e => e.Value), Max = window.Avg(e => e.Value), Count = window.Count() };