Ruminations of J.net idle rants and ramblings of a code monkey

Baton Rouge Sql Saturday Content

Code Sample | Community | Events | StreamInsight
I’ve just posted this to the SQL Saturday web site. All of the content that was presented is there, including the PowerPoint and the code. There’s quite a bit of stuff in the code that we simply didn’t have time to show. There are two text files in there with several different query patterns that you can copy and paste into place. This sample does require StreamInsight 1.2, so make sure that you have at least the evaluation version installed.

StreamInsight 1.2: Extension Method for Perf Counters

StreamInsight | Code Sample
One of the new features in StreamInsight 1.2 is performance counters. These give you a very robust and easy way to test and monitor the performance of your StreamInsight application and were much needed. The counters are enabled at the StreamInsight Server and Application level all the time but you need to specify and enable the individual queries that you want monitored as well. Enabling a query for performance counters will also hook up both the input and the output adapters, which helps you understand whether you have bottlenecks or poor performance in them as well – though there is a (relatively minimal) performance hit for this.

Very cool stuff. But … like the query state … it’s a bit warty (IMHO) in some of the details. Fortunately, it’s nothing that we can’t make at least a little better through the beauty that is extension methods. I’ll go into more detail on the perf counters in a future post but I wanted to share this little piece of code to help y’all get to using these a bit quicker.

    /// <summary>
    /// Enables and disables performance counters for a query.
    /// </summary>
    /// <param name="query">The query to change.</param>
    /// <param name="enabled">true to enable, false to disable.</param>
    public static void SetPerformanceCounters(this Query query, bool enabled)
    {
        Uri queryUri = query.Name;
        DiagnosticSettings settings =
            query.Application.Server.GetDiagnosticSettings(queryUri);
        if (enabled)
        {
            settings.Aspects |= DiagnosticAspect.PerformanceCounters;
        }
        else
        {
            // Clear only the PerformanceCounters bit; leave the other aspects alone.
            settings.Aspects &= ~DiagnosticAspect.PerformanceCounters;
        }
        query.Application.Server.SetDiagnosticSettings(queryUri, settings);
    }
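A quick aside on the flag arithmetic in the disable branch. To switch off one bit of a [Flags] enum while leaving the others alone, the usual C# idiom is value &= ~flag; a plain value &= flag keeps only that one bit and throws the rest away. The sketch below shows the difference using a made-up DemoAspects enum. It is a stand-in for illustration only, not StreamInsight's DiagnosticAspect, and its members and values are assumptions.

```csharp
using System;

// Hypothetical stand-in for a flags enum; not the real DiagnosticAspect.
[Flags]
public enum DemoAspects
{
    None = 0,
    Memory = 1,
    PerformanceCounters = 2,
    Tracing = 4
}

public static class FlagDemo
{
    // Returns 'current' with 'flag' switched on or off; other bits are untouched.
    public static DemoAspects Set(DemoAspects current, DemoAspects flag, bool enabled)
    {
        return enabled ? current | flag : current & ~flag;
    }

    public static void Main()
    {
        DemoAspects aspects = DemoAspects.Memory | DemoAspects.PerformanceCounters;

        // AND without the complement keeps ONLY the flag we meant to remove.
        DemoAspects wrong = aspects & DemoAspects.PerformanceCounters;

        // AND with the complement removes just that flag.
        DemoAspects right = Set(aspects, DemoAspects.PerformanceCounters, false);

        Console.WriteLine(wrong);  // PerformanceCounters
        Console.WriteLine(right);  // Memory
    }
}
```

The same Set helper covers both branches of an enable/disable method, which keeps the bit twiddling in one place.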

StreamInsight Templates–New Build/Release

StreamInsight | Open Source
I just posted this last night on CodePlex. There is nothing new in there but there was some general cleaning up and some fixes to a couple of issues based on feedback. Some of the changes:

- Added System.ServiceModel to the console project. I could have sworn it was there before …
- Fix for the project versions. They now all (properly) show up as Fx 4.0, no client profile.
- In the catch statements, the exception types/variables are commented out. This prevents compiler warnings.
- Renamed variables in a couple of places to make a little more sense.
- All input adapters now have a ProduceEvents() method.
- All output adapters now have a skeleton for ConsumeEvents() that also looks for the Stopping() state.
- Factory classes don’t return the new adapters immediately. Instead, a temp/return variable is set and then returned after the switch statement. This makes it easier to insert custom logic for creation vs. reuse of input adapters.
- Fix for names … some adapters didn’t have the proper file names.

I am working on templates for extensibility points and have started the UDA item template but that wasn’t quite ready for this build. There will also be templates for UDO’s and UDSO’s. I think that covers it. As with the last release, please provide feedback.

Visual Studio Project and Item Templates for StreamInsight

I’ve just published – and created an initial release for – a set of Visual Studio project and item templates for StreamInsight. The release dropped yesterday is a beta primarily for feedback on the templates, particularly the naming conventions, comments and sample boilerplate code. The items are placed in a “StreamInsight” grouping in the Add New dialogues so they’re very easy to find. There are four project templates:

- A console application project template that creates a basic app based on the Simple StreamInsight Application that I posted about 2 months ago.
- A StreamInsight library project template that creates a blank DLL project with the StreamInsight references added. Unlike the built-in DLL project template, this template does not include “Class1.cs” … which has always gotten on my nerves.
- Item templates for both input and output adapters, typed and untyped. These are multi-file item templates and add the factory, a configuration class and adapters for edge, point and interval events. The typed adapters add a basic/simple type class as well. Each adapter is placed in its own folder and has its own namespace.

Because of how the replacement parameters work with the templates, the file names have the format [BaseItemName][Direction].[ItemType]. The base item name will be a short name for the source/destination for the adapter. For example, a WCF input adapter for edge events would be WcfInput.Edge. When naming this in the Add New Item dialogue, you would use “Wcf” only.

[Screenshots: Project Templates; Item Templates; New Item in Solution Explorer]

In the near future, I’m planning on adding item templates for the various StreamInsight-specific extension points such as UDA’s and, when version 1.2 is released, UDSO’s. In the longer term, I’d like to make this a wizard rather than simple item templates. A wizard may allow me to get away from the “.” in the file names and it will allow users to select existing classes for the typed input adapters rather than creating a “phony” one, thereby creating a better user experience. Once I do that, the VSIX will need to be installed via MSI, which would also allow for the addition of Visual Studio snippets for common query patterns and other code chunks. Ambitious goals, perhaps, but those are the only ones worth having, right?

Creating the templates themselves was a royal pain in the a$$. Not so much because of the actual process but because the documentation for creating templates sucks so badly. Some of the documented replacement parameters are simply wrong – as in, they don’t get replaced! Others aren’t documented at all, which makes discovery of these things far more difficult. Considering the number of templates that the Microsoft folks create (and especially the Visual Studio group), one would hope that the documentation would be better than the suckage that exists on MSDN. I wound up digging around in the Visual Studio templates installed on my machine to figure out some of this stuff.

Of course, because I am a big fan of .NET Open Source software, it is available on CodePlex … and you, dear reader, are welcome to sign up if you’d like to help!

How StreamInsight Data is Different – Part II

StreamInsight
In a previous post, I outlined how StreamInsight adds an additional dimension to data – the dimension of time – and how events in StreamInsight correlate to events that we see and experience every day. I’ll now dig a little more into this topic.

As things happen in the world around us, seconds, minutes, hours go by and our brain processes and interprets these events. All the while things are happening, time is passing by at a regular, known pace … but that’s not how we experience it. Sometimes time flies by (having a good, interesting discussion over lunch with our friend) and sometimes it drags (waiting around to be seated at a very busy restaurant when you’re hungry). If, however, we use a video camera to record the lunch (because we’re strange like that), the camera will see the events at the regular, known pace of seconds. So, depending on how we are getting the information in, time may “pass” differently. It may be based on information from our senses that we are processing or it may be external and ticking by regularly like the video camera.

Now, back to lunch. My friend and I have placed our orders and our waiter brings our food out and places our orders in front of us. Did he place them down in front of us at the same time? It depends on how we are looking at it and what our time reference is for “at the same time”. Technically, unless he is some sort of super-waiter that can handle an order in each hand and perfectly synchronize his hands to put them both down at the exact same nanosecond, they aren’t happening at the same time. But we don’t usually split hairs like that and we don’t experience, mentally, simultaneity like that. Still, that’s a pretty limited time window. If the waiter brings out, say, my order first (mmmm … I has cheezburger) and then my friend’s order 5 minutes later (ewwww … chicken livers), we aren’t going to say that he brought our orders out at the same time.
In StreamInsight, Current Time Increments (CTIs) provide a way to handle time windows and to understand what “at the same time” means within the context of the application. Like our experience, CTIs may or may not happen at regular intervals and they may move quickly or slowly. They may be controlled programmatically, or declaratively by the input adapter (IDeclareAdvanceTimeSettings), the query (AdvanceTimeGenerationSettings) or even another stream by importing CTIs. Our experience of time really isn’t much different. It is not uncommon to say that two events happened “at the same time” when, in fact, they were off by a few seconds (or more). When we say “at the same time”, we really mean that the events happened within a window that we understand to be simultaneous. And it is the CTIs that allow us to define that in StreamInsight.

When StreamInsight does joins and unions, it joins and unions those events that are happening at the same time … that is, events that are valid within the current CTI window. For our lunch example, if we had a stream of the “open hours” interval events, a stream for our “starting/ending lunch” edge event and a stream for the individual point events of our lunch, they would all participate in the join or union. If the events’ valid times are outside the CTI window, they will not participate in the join or union. Those valid times are determined by the start and end times on the events. Furthermore, these joins are happening on the data as it is happening … in a way very much like we perceive events happening in time during our daily experience. The human brain is, after all, a massively parallel complex event processing engine.

Going back to lunch, let’s also say that there is a thunderstorm outside during lunch. It’s raining cats and dogs, lightning flashing, thunder rolling. We’re all familiar with first seeing the lightning and then hearing the thunder.
We know that these events occur at the same time but we don’t “get” them at the same time. Instead, the thunder has latency when compared to the lightning; the event is actually simultaneous with the lightning but its arrival is some time later. In StreamInsight, we have ways of handling this. First, when you handle your CTIs declaratively, either using IDeclareAdvanceTimeSettings or using AdvanceTimeGenerationSettings, you can specify a delay for late-arriving events that then allows these events to be processed with the appropriate, on-time arriving events. For events that arrive even later than that, you can specify dropping or adjusting those events. An example of IDeclareAdvanceTimeSettings is below. In this example, we’re saying “Everything that happens in a 5 second window is considered simultaneous. Wait for a second before that window is closed. Anything that comes in late should be adjusted to the current application time.”

    var advanceTimeSettings = new AdapterAdvanceTimeSettings(
        new AdvanceTimeGenerationSettings(
            TimeSpan.FromSeconds(5),   // Frequency of the CTI
            TimeSpan.FromSeconds(1)),  // Delay for late-arriving events
        AdvanceTimePolicy.Adjust);

One caveat with using IDeclareAdvanceTimeSettings … if your adapter is not enqueuing events, the StreamInsight engine will not keep enqueuing CTIs. The CTIs are enqueued only while your adapter is sending events into the queue. So … don’t think that it’s going to keep happily chugging away on, say, a reference stream adapter when you aren’t pushing events in. If you are programmatically enqueuing your CTI events, you have even more control – you can really do whatever you want/need/desire to do since you enqueue the CTI time yourself.

Now … we’ve been talking a lot about time and you may think that it’s always happening now and related to the system clock. While that can be true, it’s not necessarily true. StreamInsight runs with application time, not system clock time. What does that mean?
Well, it means, first of all, that events don’t have to be enqueued with a timestamp of DateTimeOffset.Now. You can enqueue events with any time that you want. You could, for example, re-run events from a log file using the original timestamp … and you would need to enqueue your CTIs with the proper (in the past) timestamps. You could call this “playback” mode … you’re reviewing events in the past as they happened; for example, watching a video of an event in your life. Like that video, you don’t have to watch it at 1x speed … you can fast forward through events. Your application time does not have to be now … it can be 10 years ago. When you enqueue your events and CTIs, you enqueue them with the appropriate timestamp 10 years ago as well.
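To make the ideas of application time, delay and late events a bit more concrete, here is a toy model in plain C#. To be clear about the assumptions: ToyTimeline, LatePolicy and Enqueue are names I made up for this sketch and none of it is the StreamInsight API or the engine's actual algorithm. It ignores the CTI frequency and event end times entirely. It simply lets "current time" trail the newest event by a fixed delay, and either drops a late event or moves it up to the current time.

```csharp
using System;

// Hypothetical policy names for this sketch only.
public enum LatePolicy { Drop, Adjust }

// Toy model: application time is driven by event timestamps, never the system clock.
public sealed class ToyTimeline
{
    private readonly TimeSpan _delay;
    private readonly LatePolicy _policy;
    private DateTimeOffset? _currentCti;   // null until the first event arrives

    public ToyTimeline(TimeSpan delay, LatePolicy policy)
    {
        _delay = delay;
        _policy = policy;
    }

    public DateTimeOffset? CurrentCti { get { return _currentCti; } }

    // Returns the timestamp the event is accepted with, or null if it is dropped.
    public DateTimeOffset? Enqueue(DateTimeOffset eventTime)
    {
        DateTimeOffset? accepted = eventTime;
        if (_currentCti.HasValue && eventTime < _currentCti.Value)
        {
            // Late event: move it up to the current time, or discard it.
            accepted = _policy == LatePolicy.Adjust ? _currentCti : (DateTimeOffset?)null;
        }

        // Current time only moves forward, trailing the newest event by the delay.
        DateTimeOffset candidate = eventTime - _delay;
        if (!_currentCti.HasValue || candidate > _currentCti.Value)
        {
            _currentCti = candidate;
        }
        return accepted;
    }
}

public static class Program
{
    public static void Main()
    {
        // "Playback" of old events: every timestamp is in the past.
        var start = new DateTimeOffset(2001, 6, 1, 12, 0, 0, TimeSpan.Zero);
        var timeline = new ToyTimeline(TimeSpan.FromSeconds(1), LatePolicy.Adjust);

        timeline.Enqueue(start);
        timeline.Enqueue(start.AddSeconds(10));   // current time is now start + 9s

        DateTimeOffset? late = timeline.Enqueue(start.AddSeconds(5));
        Console.WriteLine(late == start.AddSeconds(9));  // True
    }
}
```

Note that nothing in the sketch ever reads the system clock. That is the whole point of application time: the timestamps on the events are the clock.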

StreamInsight : What edition am I running?

Code Sample | StreamInsight
It is sometimes helpful to know what edition and/or version of StreamInsight you are using. This can be especially helpful as part of your application startup, where you can log this information for future/current troubleshooting. Unfortunately, there isn’t a good way to get that information from the StreamInsight API. Sure, you can use one of the Diagnostic Views – the scheduler diagnostic view – that will tell you how many schedulers are running, which will give you an idea of which edition you are running. But you’ll just know whether you are running Standard (1 scheduler) or one of Developer, DataCenter or Evaluation (1 scheduler / core); it doesn’t provide things like the processor architecture installed (x86 vs. x64) nor will it tell you the exact edition of the instance. Oh … and that diagnostic view isn’t there in version 1.2. So here’s a little code snippet that will give you some more information:

    /// <summary>
    /// Gets information about the current StreamInsight service edition
    /// </summary>
    /// <param name="instanceName">Name of the StreamInsight instance.</param>
    /// <returns>String with edition information</returns>
    [System.Security.Permissions.RegistryPermission(
        SecurityAction.Demand,
        Read = "HKLM\\SOFTWARE\\Microsoft\\Microsoft StreamInsight")]
    public string GetStreamInsightEditionInformation(string instanceName)
    {
        try
        {
            var streamInsightRegistry =
                Microsoft.Win32.Registry.LocalMachine.OpenSubKey(
                    @"SOFTWARE\Microsoft\Microsoft StreamInsight", false);
            var instanceKeyName = "MSSI." + instanceName;
            var instanceRegistryKey =
                streamInsightRegistry.OpenSubKey(instanceKeyName, false);

            StringBuilder sb = new StringBuilder();
            sb.Append("StreamInsight Version:");
            sb.AppendLine(instanceRegistryKey.GetValue("Version").ToString());
            sb.Append("Edition:");
            sb.AppendLine(instanceRegistryKey.GetValue("Edition").ToString());
            sb.Append("Platform:");
            sb.AppendLine(instanceRegistryKey.GetValue("PlatformId").ToString());
            return sb.ToString();
        }
        catch
        {
            // A missing key or value ends up here too (OpenSubKey/GetValue return null).
            return "Could not get StreamInsight information";
        }
    }

Some caveats: this will only work on the same machine where StreamInsight is running. So if you are connecting to StreamInsight remotely using the Management Service (for example, to a remote instance of the StreamInsight service), this won’t work for you. Also, it tells you the version of StreamInsight that the instance was installed with – which may not necessarily be the version that you are running!!! For example, you can have a 1.0 instance, a 1.1 instance and a 1.2 instance. If you use Server.Create("1.0 Instance"), you will actually be running StreamInsight 1.2! Fortunately, the Server object does have a Version property that will tell you which version you are running.

StreamInsight Dynamic Filter Query Patterns

StreamInsight | Code Sample
A little while ago, I wrote a post about doing AdHoc Query Filters in StreamInsight. In the sample code, I accomplished the filter by using a user-defined function. This caused quite a bit of ugliness that I didn’t like but it did work. Since then, I’ve reworked and extended the filtering and I wanted to revisit it here. I’ve moved all of the query logic out of the user-defined function and into the StreamInsight engine, bypassing things like Split() and Join() as well as improving the overall performance characteristics.

The requirements are a bit expanded. While we still have a base DataItem, we’ve added another field to the key – AdapterId. So a data item’s uniqueness is defined by the combination of the ItemId and AdapterId. DataItem now looks like the following:

    /// <summary>
    /// Base class for all query data points
    /// </summary>
    public class DataItem
    {
        /// <summary>
        /// ID of the source item
        /// </summary>
        public string ItemId { get; set; }

        /// <summary>
        /// Identifier for the source adapter.
        /// </summary>
        public string AdapterId { get; set; }

        /// <summary>
        /// Time item was received/calculated
        /// </summary>
        public DateTime SourceTimestamp;
    }

This is the base data type for all of our query data types. When we do dynamic filters, we want to be able to include the AdapterId in the filter query. If present, then it should match the item’s AdapterId exactly. If missing, all AdapterIds should match. A single filter could also have a number of groups included … some items from Adapter1, some items from Adapter2 and some from all adapters, which adds a ton of flexibility but also some complexity.

The filters are defined by a FilterDefinition class, which has a property for the filter operation (equals, starts with, ends with, contains) and then an array of ItemKey classes. Each ItemKey class has the FilterValue (for the item id), the AdapterId (which can be null) and then a GroupId. The group id is really only needed for the Contains query. You’ll see why in a bit. Our filter definition classes look like the following:

    public enum FilterOperation
    {
        Equals,
        StartsWith,
        EndsWith,
        Contains
    }

    /// <summary>
    /// Defines a filter
    /// </summary>
    public class FilterDefinition
    {
        /// <summary>
        /// Operation to perform
        /// </summary>
        public FilterOperation Operation { get; set; }

        /// <summary>
        /// List of item keys.
        /// </summary>
        public ItemKey[] ItemKeys { get; set; }
    }

    public class ItemKey
    {
        /// <summary>
        /// Source adapter for the item
        /// </summary>
        public string AdapterId { get; set; }

        /// <summary>
        /// ID for the item.
        /// </summary>
        public string FilterValue { get; set; }

        /// <summary>
        /// Gets or sets the group id.
        /// </summary>
        /// <value>The group id.</value>
        public int GroupId { get; set; }
    }

So far, pretty straightforward. Doing the code for the equals, starts with and ends with operations is straightforward and easy using an equijoin. The starts with version is below. Equals and ends with look pretty much the same.

    private static CepStream<T> StartsWithFilter<T>(CepStream<T> source,
        CepStream<ItemKey> filterStream) where T : DataItem
    {
        var filtered = from s in source
                       from f in filterStream
                       where s.ItemId.StartsWith(f.FilterValue)
                       where String.IsNullOrWhiteSpace(f.AdapterId)
                             || f.AdapterId == s.AdapterId
                       select s;
        return filtered;
    }

Implementing contains, however, was a bit more challenging. Since Equals, Starts With and Ends With were all logical “OR” filters, doing a simple equi-join would work. With the Contains operation, however, it should be an “AND” operation … all of the FilterValues in a particular group should match, not just one. Because of that, an equi-join, which would return every item that matched at least one of the filter values, doesn’t work. This query is a bit more complex and takes several steps but it’s still very do-able. There are a couple of ways to do it but I’ll take the way that I’m showing in the demo step by step.

1. Filter the source query to find all candidate matches. Note that these won’t be our final matches; it will return all items that match any 1 of the filter values. Since they need to match all, we have more work to do.
2. Group the filter stream to determine how many matches each group should have. You will need to make sure that the FilterValues are distinct within a specific group for this to work properly. In a real-world application, this would (most likely) come from the input adapter, which will have to ensure the uniqueness. In the demo, I’m using Array.ToPointStream() … so I can use the Linq Distinct() to ensure this.
3. Filter the initial candidate matches to pull out only the items that match all of the filter values in a specific grouping.

    //Get all initial candidate matches.
    var initialMatches = from s in source
                         from v in filterStream
                         where s.ItemId.Contains(v.FilterValue)
                         where s.AdapterId == v.AdapterId
                               || String.IsNullOrWhiteSpace(v.AdapterId)
                         select new
                         {
                             MatchedValue = v.FilterValue,
                             GroupId = v.GroupId,
                             ItemId = s.ItemId,
                             AdapterId = s.AdapterId
                         };

    //Get the required number of matches per group.
    var requiredMatchCounts = from v in filterStream
                              group v by v.GroupId into adapterGroup
                              from a in adapterGroup.SnapshotWindow(SnapshotWindowOutputPolicy.Clip)
                              select new
                              {
                                  GroupId = adapterGroup.Key,
                                  CountRequired = a.Count()
                              };

    //Count the number of matches.
    var countMatches = from i in initialMatches
                       group i by new { i.ItemId, i.GroupId, i.AdapterId } into c
                       from v in c.SnapshotWindow(SnapshotWindowOutputPolicy.Clip)
                       select new
                       {
                           AdapterId = c.Key.AdapterId,
                           GroupId = c.Key.GroupId,
                           ItemId = c.Key.ItemId,
                           Count = v.Count()
                       };

    //Keep only the items whose match count equals the count required for the group.
    var matched = from c in countMatches
                  from r in requiredMatchCounts
                  where c.GroupId == r.GroupId
                  where c.Count == r.CountRequired
                  select new { Adapter = c.AdapterId, ItemId = c.ItemId };

    //Join back to the source to get the full events.
    var final = from g in matched
                from s in source
                where g.Adapter == s.AdapterId
                where g.ItemId == s.ItemId
                select s;

Finally, the implementation uses an extension method on the standard StreamInsight Query object. Since this is designed specifically to utilize dynamic query composition (DQC), this makes it very simple and straightforward to use.

    Query sourceQuery = cepApplication.Queries["SourceQuery"];
    CepStream<ValueDataItem<float>> filteredSourceStream =
        sourceQuery.Filter<ValueDataItem<float>>(filterDefinition);
    Query filteredSourceQuery = filteredSourceStream.ToQuery(
        cepApplication,
        "FilteredSourceQuery",
        "Filtered Source Query",
        typeof(TracerFactory),
        GetTracerConfig("Source"),
        EventShape.Point,
        StreamEventOrder.FullyOrdered);

You can download the sample project from MSDN Code Samples.
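If you want to play with the Contains logic (count the matches per group, then keep the items whose count equals the number required) without firing up a StreamInsight server, the same idea can be sketched in plain LINQ to Objects. This is only an illustration of the matching rule, not the StreamInsight query: there are no windows or event lifetimes here, and the Item, Key and ContainsAllSketch types are stand-ins I made up for the sketch.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

// Hypothetical stand-ins for DataItem and ItemKey.
public sealed class Item { public string ItemId; public string AdapterId; }
public sealed class Key { public string AdapterId; public string FilterValue; public int GroupId; }

public static class ContainsAllSketch
{
    public static List<Item> Filter(IEnumerable<Item> source, IEnumerable<Key> keys)
    {
        List<Key> keyList = keys.ToList();

        // Required matches per group = number of distinct filter values in the group.
        Dictionary<int, int> required = keyList
            .GroupBy(k => k.GroupId)
            .ToDictionary(g => g.Key,
                          g => g.Select(k => k.FilterValue).Distinct().Count());

        return source.Where(item =>
            keyList
                // Candidate matches: the value is contained and the adapter
                // matches (a null/blank AdapterId on the key matches any adapter).
                .Where(k => item.ItemId.Contains(k.FilterValue)
                            && (string.IsNullOrWhiteSpace(k.AdapterId)
                                || k.AdapterId == item.AdapterId))
                .GroupBy(k => k.GroupId)
                // "AND" semantics: some group must have ALL of its values matched.
                .Any(g => g.Select(k => k.FilterValue).Distinct().Count() == required[g.Key]))
            .ToList();
    }

    public static void Main()
    {
        var keys = new[]
        {
            new Key { GroupId = 1, FilterValue = "Pump", AdapterId = null },
            new Key { GroupId = 1, FilterValue = "Temp", AdapterId = null },
            new Key { GroupId = 2, FilterValue = "Valve", AdapterId = "A2" }
        };
        var items = new[]
        {
            new Item { ItemId = "Pump1.Temp", AdapterId = "A1" },      // both group 1 values
            new Item { ItemId = "Pump1.Pressure", AdapterId = "A1" },  // only one group 1 value
            new Item { ItemId = "Valve7", AdapterId = "A2" },          // group 2, right adapter
            new Item { ItemId = "Valve7", AdapterId = "A1" }           // group 2, wrong adapter
        };

        foreach (Item match in Filter(items, keys))
        {
            Console.WriteLine(match.AdapterId + ": " + match.ItemId);
        }
    }
}
```

With the sample data, only the first and third items come back. The second matches just one of the two values in group 1, which is exactly the case that a simple join would let through.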

Why I will always have my 39th birthday …

Idle Babbling | StreamInsight
I’ve discovered that StreamInsight holds the key to eternal “youth”!

    var newLifeStream = from e in lifeStream
                            .AlterEventLifetime(
                                //Events greater than 39 years ago get moved forward a year
                                e => e.StartTime.AddYears(39) <= DateTime.Now
                                    ? e.StartTime.AddYears(1)
                                    : e.StartTime,
                                e => e.EndTime - e.StartTime)
                        select e;

It may need a little tweaking, particularly the event duration, but I still have two and a half months before “go-live.”

MURA StreamInsight Whitepaper Released

StreamInsight
MURA – the Microsoft Upstream Reference Architecture initiative – just released a whitepaper called “Event-Driven Solutions for Oil & Gas Industry” that provides an overview of the scenarios for and value of StreamInsight in drilling, completions and production in O&G. Along with Paul Nguyen and Roman Schindlauer from Microsoft, I worked on this whitepaper and am pretty excited to see it published. It was quite an effort. While it does focus on upstream O&G scenarios, there is certainly information on StreamInsight in there that is applicable across multiple industries. Check it out – and additional information about MURA – at www.microsoft.com/mura.

Simple StreamInsight App

StreamInsight | Code Sample
In a previous post, I talked about how to change one of the StreamInsight sample apps to run using a remote StreamInsight server. After a discussion with the person that asked the original question, I’ve put together a full sample app that can run either in process or connect to a remote server. Most of the code is completely identical … input adapters, output adapters, queries … as that part of the programming model is the same. There are, however, some differences between what you need to do for this kind of app and what you typically see in the samples. I’ve tried to include most, if not all, of the server/application bells and whistles to make it easy to play with.

Let’s start with the common settings for the sample application. First, the streamInsightInstanceName should point to the active and installed instance. If starting out of process, this is used to build the url to connect to the management service. It probably would be better to have the full url there when you are running remote … but I didn’t do it. Next, you need to specify the streamInsightAppName, which determines the name of the application to create or connect to.

Creating the Server – In Process

First, set the runInProcess key in the app.config to true. If you want to use the metadata service that was introduced in 1.1, set useMetadata to true. When creating the server, you need to specify the metadata service at startup. If using the metadata overload, it cannot be null. A final note: in StreamInsight 1.2, a new feature called checkpointing is being introduced. This feature allows for higher availability of StreamInsight instances; certain queries can have their entire state written out to disk at some interval, which allows long-running queries to be reconstituted on startup (instead of losing all that information). Checkpointing relies on the metadata services.

    Server cepServer = AppSettings.Current.UseMetadata
        ? Server.Create(AppSettings.Current.StreamInsightInstanceName, GetMetadataService())
        : Server.Create(AppSettings.Current.StreamInsightInstanceName);

When running in process, you can also fire up the management service, which allows the Event Flow Debugger (and other tools) to connect to your StreamInsight instance. If a value is present in the managementServiceUrl settings key, this will be created. At this time, only the WSHttpBinding is supported. Note: you will need to run Visual Studio as admin -or- reserve whatever url you are going to use with WCF. See this blog post for details on that. I, personally, run VS as admin since it makes my life easier in this regard.

    ServiceHost managementServiceHost =
        new ServiceHost(cepServer.CreateManagementService());

    //Adding the service host.
    //This allows remote clients to access this application, including the Query Debugger.
    managementServiceHost.AddServiceEndpoint(
        typeof(IManagementService),
        new WSHttpBinding(SecurityMode.Message),
        AppSettings.Current.ManagementServiceUrl);
    managementServiceHost.Open();

Creating the Server – Remote

Since there are fewer knobs and switches for a remote instance, connecting to a running server is much simpler. With the sample app, it will connect to the standard url for the StreamInsight instance name specified in the config file. However, if you have the management service started on a custom in-process instance of StreamInsight, you will also be able to connect to that. By the way, the model used for the recently announced Azure StreamInsight service is the remote model.

    var endpointAddress = new System.ServiceModel.EndpointAddress(
        @"http://localhost/StreamInsight/" + AppSettings.Current.StreamInsightInstanceName);
    Server cepServer = Server.Connect(endpointAddress);

Getting the Application

In most of the demos/samples out there, not only is it running in process, it’s also running without metadata services. In those cases, you always need to create the application. However, you don’t always need to do this. If connecting to a remote instance or if you are using the metadata service, it is very possible that the application is already there. So we need to check before we grab the application object.

    private static Application GetApplication(Server cepServer)
    {
        if (!cepServer.Applications.ContainsKey(AppSettings.Current.StreamInsightAppName))
        {
            Console.WriteLine("Creating new Cep Application");
            return cepServer.CreateApplication(AppSettings.Current.StreamInsightAppName);
        }
        else
        {
            Console.WriteLine("Connecting to existing Cep Application");
            return cepServer.Applications[AppSettings.Current.StreamInsightAppName];
        }
    }

Creating the Queries

Creating the queries and attaching the adapters is no different whether you are running in process or out of process. However, we need to take into account that, like with the application object, the queries may already be there. With the remote instance, this is pretty clear. When using metadata, the queries will be created when you create the server. However, they will not be started – so you do need to start them. When starting a query, you need to check to see if it’s already running – you will get an exception if you try to start a query that is already started.

There are 3 queries that are created and run using a technique called Dynamic Query Composition. This is actually pretty important to understand – but a topic for another post. One is the raw, source query, one is a hopping window aggregate and one is a snapshot window aggregate. It’s interesting to see the differences between the output of the hopping window aggregate and the snapshot window aggregate. The only input adapter used is the random data generator that is included in the StreamInsight Team Samples. Each query is attached to an Async CSV output adapter, again from the StreamInsight Team Samples (not required – you can have a standing, running query without an output adapter).
I’ve uploaded the sample to MSDN Code Samples. You can download it here.