Ruminations of J.net: idle rants and ramblings of a code monkey

A couple of helpful StreamInsight Query extension methods

StreamInsight
When you are working with StreamInsight queries, there is a certain amount of information that isn't available from the Query object but should be … or, that is, I think should be and, since this is my blog, that's all that really matters, right? The biggest one is the query's current state. You can't tell, from the Query object, whether it is running, stopped, stopping or anything else. Instead, you have to get the diagnostic view and then dig into that. I'm sorry … I am really loving StreamInsight but I find that just a little on the warty side. And, since you need the DiagnosticView to get this, it's ideal to just add that one on there while we are at it. There are a bunch of things available in the diagnostic view that can be helpful, but I find the query state to be the most important.

First, an enum for the various query states. This isn't defined in the StreamInsight assemblies; you get a string from the diagnostic view.

/// <summary>
/// Enum for the query's current state.
/// </summary>
public enum QueryState
{
    /// <summary>
    /// The query has been suspended
    /// </summary>
    Suspended,
    /// <summary>
    /// The query is in the process of stopping
    /// </summary>
    Stopping,
    /// <summary>
    /// The query is running and processing events
    /// </summary>
    Running,
    /// <summary>
    /// The query has been aborted due to an error
    /// </summary>
    Aborted,
    /// <summary>
    /// The state cannot be determined or the query has been deleted
    /// </summary>
    Unknown
}

Next, the extension methods. I've used Get in the names because these aren't really properties; the value has to be fetched each time, so I feel it's best and proper to have that clearly indicated in the method name. They are both quite simple and, I feel, help make working with the Query object a bit smoother.

/// <summary>
/// Class with extension methods for queries.
/// </summary>
public static class QueryExtensionMethods
{
    /// <summary>
    /// Returns the diagnostic view for the query.
    /// </summary>
    /// <param name="query">Query object</param>
    /// <returns>The diagnostic view for the query.</returns>
    public static DiagnosticView GetDiagnosticView(this Query query)
    {
        DiagnosticView diagnosticView =
            query.Application.Server.GetDiagnosticView(query.Name);
        return diagnosticView;
    }

    /// <summary>
    /// Returns the current state of the query.
    /// </summary>
    /// <param name="query">Query object</param>
    /// <returns>The current state of the query.</returns>
    public static QueryState GetQueryState(this Query query)
    {
        QueryState queryState = QueryState.Unknown;
        try
        {
            DiagnosticView diagnosticView = query.GetDiagnosticView();
            if (!Enum.TryParse<QueryState>(
                diagnosticView[DiagnosticViewProperty.QueryState].ToString(),
                true,
                out queryState))
            {
                queryState = QueryState.Unknown;
            }
        }
        catch (ManagementException)
        {
            // The query has been deleted or the view is unavailable;
            // fall through and report Unknown.
        }
        return queryState;
    }
}

And … a shout-out to Tony, one of the developers on my team, for coming up with this idea.
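Here's a quick usage sketch, assuming you already have an Application object in hand; the query name and the polling interval are hypothetical:

// Stop a query and poll until it has wound down. "MyQuery" is made up.
Query query = application.Queries["MyQuery"];
query.Stop();

while (query.GetQueryState() == QueryState.Stopping)
{
    System.Threading.Thread.Sleep(500);
}

Console.WriteLine("Query is now: {0}", query.GetQueryState());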

StreamInsight: Simple sample of adapters and Server.Connect

.NET Stuff | StreamInsight | Code Sample
This is in response to a question on the StreamInsight forum where a developer was asking for a sample that uses Server.Connect. Before I get into the details, some notes:

- I used the PatternDetector sample from the StreamInsight Product Team Samples on CodePlex. Rather than write something from scratch, I just converted the existing project to VS 2010 and tweaked it.
- Add a reference to System.ServiceModel to the project.
- You need to make sure that you have defined an instance for StreamInsight during setup and installed the service. And start it. Starting it helps.
- I had to change the credentials for the StreamInsight service to use my account instead of Network Service because I got a funky error (that I've never seen before) with the Event Flow Debugger. It may have something to do with the fact that I'm running on a domain controller (don't say it … I know … but it's just a dev machine and I need AD for some Hyper-V VMs that I run). I'd recommend trying it with Network Service first but, if it doesn't work, you've been warned. The error is below:

  Security Support Provider Interface (SSPI) authentication failed. The server may not be running in an account with identity 'DEVBIKER\J Sawyer'. If the server is running in a service account (Network Service for example), specify the account's ServicePrincipalName as the identity in the EndpointAddress for the server. If the server is running in a user account, specify the account's UserPrincipalName as the identity in the EndpointAddress for the server.

- You need to make sure that you copy the relevant DLLs to the StreamInsight service host folder. By default, this is C:\Program Files\Microsoft StreamInsight 1.1\Host. These can be found in the \bin folder for the PatternDetector project and are:
  - StreamInsight.Samples.Adapters.SimpleTextFileReader
  - StreamInsight.Samples.Adapters.SimpleTextFileWriter
  - StreamInsight.Samples.UserExtensions.Afa
- Create the input and output folders. Actually, if you are running under your own account, this won't be necessary. If you are running under Network Service, it will … and make sure that you give Network Service appropriate permissions.
- The user-defined operator in the sample that I chose caused an issue when running remotely. This surprised me … usually things run pretty much unchanged. I didn't have the time or energy to debug that, so I just changed the query so that it was no longer necessary.

So … the changes. In Main, I changed the startup code to:

if (inProc)
{
    using (Server server = Server.Create(streamInsightInstanceName))
    {
        RunApp(server);
    }
}
else
{
    using (Server server = Server.Connect(
        new System.ServiceModel.EndpointAddress(
            "http://localhost/StreamInsight/" + streamInsightInstanceName)))
    {
        RunApp(server);
    }
}

As you can see, I refactored everything in the using block to a new method called RunApp. This made it cleaner (I thought). I suppose that I could have done it another way, but this just seemed right. In RunApp, I changed the code that creates the application to first check for the existence of the application and, based on that, get or create the application object. This is below:

// Create the application in the server. The application will serve
// as a container for the actual CEP objects and queries.
Console.WriteLine("Creating CEP Application");
Application application;
if (!server.Applications.ContainsKey(appName))
{
    application = server.CreateApplication(appName);
}
else
{
    application = server.Applications[appName];
}

And that is (typically) all that you need to do to change a StreamInsight app to run either in-proc or remote. As I mentioned before, I did have to change the query and remove the reference to the UDO … which was very strange because I've never had issues with extensions before. I'm guessing that it had something to do with the implementation of the UDO. You can download Program.cs (the only changed file) for the PatternDetector sample below:

StreamInsight User Defined Aggregate: Standard Deviation

.NET Stuff | StreamInsight | Code Sample
Here's a quick UDA to do standard deviation over a window. I found it interesting that I had to take the IEnumerable<double> source and call ToArray(). If I didn't, it would throw a NullReferenceException, although why is something of a mystery. It would be nice if I could pass in the average from the query, since that's already calculated by the StreamInsight engine, but no dice. Note: I've not done any performance testing … the parallel calculation was copied from MSDN. Use at your own risk …

/// <summary>
/// Static class with UDA extensions for standard deviation.
/// </summary>
public static class StandardDeviation
{
    /// <summary>
    /// Extension method for the UDA.
    /// </summary>
    /// <typeparam name="T">Payload type of the stream.</typeparam>
    /// <param name="window">Window to be passed to the UDA.</param>
    /// <param name="map">Mapping from the payload to a double field in the payload.</param>
    /// <returns>Aggregation result.</returns>
    [CepUserDefinedAggregate(typeof(StdDevDouble))]
    public static double StdDev<T>(this CepWindow<T> window, Expression<Func<T, double>> map)
    {
        throw CepUtility.DoNotCall();
    }
}

/// <summary>
/// A UDA to calculate the standard deviation of a window.
/// </summary>
public class StdDevDouble : CepAggregate<double, double>
{
    /// <summary>
    /// Computes the aggregation over a window.
    /// </summary>
    /// <param name="source">Set of events contained in the window.</param>
    /// <returns>Aggregation result.</returns>
    public override double GenerateOutput(IEnumerable<double> source)
    {
        // Materialize the source first; enumerating it directly throws
        // a NullReferenceException (see above).
        double[] values = source.ToArray();
        double mean = values.AsParallel().Average();
        double standardDev = values.AsParallel().Aggregate(
            0.0,
            // do this on each thread
            (subtotal, item) => subtotal + Math.Pow((item - mean), 2),
            // aggregate results after all threads are done
            (total, thisThread) => total + thisThread,
            // perform the standard deviation calc on the aggregated result;
            // values.Length avoids re-enumerating source (see note above)
            (finalSum) => Math.Sqrt(finalSum / (values.Length - 1)));
        return standardDev;
    }
}

Using it in a query is simple – just make sure that you add a using statement for the namespace containing the aggregate's definition.

var stdDeviation = from e in sourceQueryStream
                   group e by e.ItemId into eachGroup
                   from window in eachGroup.HoppingWindow(
                       TimeSpan.FromSeconds(10),
                       TimeSpan.FromSeconds(1),
                       HoppingWindowOutputPolicy.ClipToWindowEnd)
                   select new
                   {
                       ItemId = eachGroup.Key,
                       StdDev = window.StdDev(e => e.Value),
                       Avg = window.Avg(e => e.Value),
                       Min = window.Min(e => e.Value),
                       Max = window.Max(e => e.Value),
                       Count = window.Count()
                   };
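For the record, what the UDA computes is the sample standard deviation – note the n − 1 in the denominator, which matches the (values.Length - 1) in the code:

s = \sqrt{ \frac{1}{n-1} \sum_{i=1}^{n} (x_i - \bar{x})^2 }, \qquad \bar{x} = \frac{1}{n} \sum_{i=1}^{n} x_i

If you want the population standard deviation instead, divide by n rather than n − 1.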

How StreamInsight Data Is Different – Part I

StreamInsight
One of the key challenges that developers have (from what I've seen) when getting started with StreamInsight is in dealing with and understanding how data is handled. The concept of input and output adapters is actually relatively simple and familiar to developers everywhere, so that part is easy. The language used by SI to handle the data – LINQ – is also pretty familiar to developers … in this case, deceptively familiar. But once they get into writing the queries, things become more difficult.

Let's start first with what developers are used to – 3-dimensional data. Data is identified and "found" using 3 pieces of information: source, row and column. This tells you where it is. We're used to seeing things in 3 dimensions; it's how our world is shaped. Given 3 spatial values, you can determine either size or location. Yes, these things may change – new buildings are constructed, existing buildings are reconfigured or renovated, wrecking balls take buildings out. But finding something now only requires those 3 pieces of information. And using this information, you can go to a restaurant. Or you can go on walkabout and explore to see what's there right now. It is information that is stored and then retrieved on demand. From an experiential perspective, it's memory and recall.

StreamInsight is different. There is an additional dimension added to things – the dimension of time. If, say, you are meeting a friend for lunch, you need to know more than just where to meet him. You need to know when to meet him. You don't need the "when" dimension to find the restaurant. And this is exactly the dimension that StreamInsight adds to our data. But it's actually more than that. Saying when to meet our friend is still using time as an attribute, not as a dimension. Instead, it is like how we actually experience the lunch as it is occurring, how our brain processes the events and happenings around us. Like our own daily experience of events in time, data comes to us and passes by. Once a moment has passed, that moment is gone and lost to history. We may remember things, write them down, take a picture and so on … but then we are back to stored data that is a snapshot of time, but is not happening in time. As things (events) are happening, our brain processes them, correlates them, stores some away in memory and tosses some of them out. StreamInsight, as a CEP engine, does much the same thing that our brains do approaching the world in time. It wouldn't, IMHO, be unreasonable to say that our brains are the most complex CEP engine that there is.

Let's take this a little further and use it to explain some of the core StreamInsight concepts. Like the real world, SI has events, and these events have different temporal characteristics. Let's go back to meeting our friend for lunch. We go to the restaurant at a specific time to meet our friend. When we go into the restaurant, we open the door and walk in. It is an event in time but it's not something that we'd typically remember later, nor would we care to. This is a point event; there and then gone. If there is something unusual about this event … say, for example, the door falls off of its hinges when we open it … it becomes something worth remembering, but we'll touch on that a little later. This restaurant that we're going to isn't open 24 hours; it is open from 10:00 AM to 11:00 PM on some days and from 10:00 AM to 2:00 AM on others. But we know what those hours are going to be, and the restaurant will have a status of "open" during this time period. This is an interval event: the start and end are both known up front.

While the restaurant is open, other things happen … customers come in, place orders and pay their bills. At the end of the day, we have a total number of customers, a total number of orders and a total amount of money that passed through the register. This, too, is an interval with associated data, one that is just like an aggregate database query. We can have both of these types of intervals in StreamInsight – one where the data associated with it is known at the beginning and one where the data is calculated over a time window. Either way, there is a known and definite start and end.

Now, back to our lunch. We know we are at lunch and we know who our friend is when it starts, but we don't know, at the time we start, when it's going to end. We know it will, but there are a lot of factors that come into play – how quickly the kitchen is running, how busy our waitress is, a lengthy discussion with our friend about the kids and wives or yesterday's baseball game. Once our lunch is done, we can mark down the end time and move on with our day. This is the edge event. You have a start and, sometime later, an end, but you don't know when that end will be when the event itself starts.

If you want to get technical, everything happens over a time span, even if it's a very short one. Likewise, StreamInsight handles all of these events internally as interval events. Point events have a time span of one tick, and edge events start out with a time span of infinity and have the end time set when it finally arrives. Interval events are, I think, self-explanatory.

Where do all of these events come from? In our daily experience, we learn about things that happen through our 5 senses. Or, perhaps more accurately, we can get them through our 5 senses. We could close our eyes or plug our ears. Our senses are our input adapters. Most of these events come and go with little or even no memory of their occurring (it's a MASSIVE amount of data that our brains routinely handle) but unusual or extraordinary events are remembered, written down, photographed or video recorded. We can then review these events later, whether it's to relive them or to analyze them more deeply. And this is exactly what our output adapters are for. But, just like the events that occur around us, not all events are sent to output adapters – only those events that meet certain conditions. In StreamInsight, we use LINQ queries to make this determination. You won't, in many cases, want to store these events and, besides, StreamInsight can handle and analyze far more events than we could reasonably send to a disk. A traditional database is always a replay of the past … you select things from tables that have already happened, you analyze them in cubes, create reports, etc. StreamInsight can help you get the appropriate, important information into these storage-based data sources. In the next article, I'll go further on this and talk about how events are joined and unioned in time.
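To ground the analogy in code: below is a minimal sketch, written from memory of the StreamInsight 1.x typed adapter API, of a point input adapter that turns each "door opened" occurrence into a point event. The DoorEvent payload and the enqueue-one-event flow are illustrative assumptions; check the method names against the SDK samples before leaning on them.

// Hypothetical payload for the "door opened" point event.
public class DoorEvent
{
    public string DoorId { get; set; }
}

// Sketch of a typed point input adapter; a real one would also need a
// factory and a configuration class.
public class DoorInputAdapter : TypedPointInputAdapter<DoorEvent>
{
    public override void Start()
    {
        ProduceEvents();
    }

    public override void Resume()
    {
        ProduceEvents();
    }

    private void ProduceEvents()
    {
        if (AdapterState == AdapterState.Stopping)
        {
            Stopped();
            return;
        }

        // In a real adapter this would be fed by the outside world (our
        // "senses"); here we enqueue a single illustrative event.
        PointEvent<DoorEvent> evt = CreateInsertEvent();
        if (evt == null)
        {
            return; // engine can't take events right now; wait for Resume()
        }

        evt.StartTime = DateTimeOffset.Now;
        evt.Payload = new DoorEvent { DoorId = "FrontDoor" };

        if (Enqueue(ref evt) == EnqueueOperationResult.Full)
        {
            // The engine pushed back; release the event and signal that
            // we're ready to be resumed later.
            ReleaseEvent(ref evt);
            Ready();
        }
    }
}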

Race Conditions in StreamInsight Query Startup?

StreamInsight
A bit of explanation is due here. The platform that we are building at Logica is intended to be reusable, configurable and flexible … from adapters to queries (or, more properly, sets of related queries). A part of the architecture is what we call the StreamingQueryProvider. This is the centerpiece of everything; after all, everything in StreamInsight revolves around the queries. Initial startup instantiates and starts the configured StreamingQueryProviders based on configuration. We could do this serially and, in a lot of ways, this would be MUCH easier, but there is a potentially huge cost in startup time when multiple query providers need to be started. This is exacerbated when the input adapters have an expensive startup as well … something that we see regularly when starting our OPC-DA adapter. This is because a) our adapter is connecting to and registering for 3000+ item subscriptions and b) OPC-DA is the old standard and is based on (the horror!) DCOM. It's not something that we can really optimize at the adapter level; it's inherent in the beast. Each adapter typically takes somewhere between 20 and 45 seconds to initialize, register and start. When you have multiple input adapters registering different subscriptions, you begin to talk about some serious startup costs. In a real-world deployment scenario, we expect that there could be several OPC sources with even more than 3000 subscriptions each. Now, once the events get into StreamInsight, it handles them like a charm. It's just the startup.

We'll be working on OPC-UA and, possibly, OPC .NET 3.0 (OPC-Xi – a server-side COM wrapper for OPC-DA/HDA/AE that exposes a WCF-based interface to clients) that I suspect will perform better, but there will still be limitations due to the sheer number of subscriptions – a typical offshore rig can have up to 70,000 individual sensors, depending on the age of the platform and the operator. It is likely that the number of sensors will grow even more after the Macondo accident.

To help optimize startup, we've multithreaded the query provider startup – and the queries within each query provider – using the .NET thread pool (System.Threading.ThreadPool.QueueUserWorkItem). This has helped to cut startup time significantly. With our typical test setup, we've gone from 1:30 to 2 minutes of startup time to about 20 seconds – a big and very noticeable improvement. But, in doing so, we've also run into some race conditions that require careful handling.

To understand this, I'll explain what StreamInsight does when a query and input adapter are started. If you look at the interface of the Microsoft.ComplexEventProcessing.Application class, you'll see several collections. The ones of particular interest are InputAdapters, OutputAdapters and EventTypes. These collections cache instances of your adapter factories and the definitions of your CepEventTypes. When you call Microsoft.ComplexEventProcessing.Linq.CepStream.ToQuery, the StreamInsight engine will first check these collections and, if the items are not there, will create the cached objects – one per unique type. When multithreading your startup queries, you can run into a race condition where you have two threads checking and creating the same cached item. Between the check and the creation on one thread, the other thread creates the cached instance. You will then get one of my absolute favorite exceptions: the TargetInvocationException.

Depending on where the condition happens in the code path, it may be the InputAdapter, the EventType or the OutputAdapter. The exception message gives you the necessary details to understand what went wrong but … what do you then do about it? You can trap the exception and retry the operation – which then succeeds without exception – but trapping exceptions and using them for control flow is just nasty-ugly. And it's not like it's something that you can simply reproduce at will – it's a classic race condition that sometimes happens but mostly doesn't. We're currently using a dependency list to deal with this issue – making sure that the race condition doesn't happen – but, as I write this entry, it occurs to me that there may be some better ways to handle and avoid these conditions. I won't detail these possible resolutions until I do some further testing on them.
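Since I'm not ready to show the real fix, here's just an illustrative sketch of the fan-out startup itself, plus the bluntest possible guard (a single lock around query creation) to show where the race lives. StreamingQueryProvider's CreateQueries/StartQueries methods are hypothetical stand-ins for our actual provider API, and this is emphatically not the dependency-list approach mentioned above:

// Hypothetical fan-out startup: one thread pool work item per provider.
// CountdownEvent lets the caller wait for all providers to finish starting.
using (var pending = new CountdownEvent(providers.Count))
{
    foreach (StreamingQueryProvider provider in providers)
    {
        StreamingQueryProvider current = provider; // avoid modified-closure capture
        ThreadPool.QueueUserWorkItem(_ =>
        {
            try
            {
                // Coarse guard: serialize ToQuery calls so two threads can't
                // race on the Application's cached adapter/event type
                // collections. Correct, but it throttles the very
                // parallelism we wanted.
                lock (application)
                {
                    current.CreateQueries(application);
                }
                current.StartQueries(); // adapter startup can safely run in parallel
            }
            finally
            {
                pending.Signal();
            }
        });
    }
    pending.Wait();
}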

AdHoc Query Filters with StreamInsight

StreamInsight
It's been a while – things have been VERY busy getting ready for the big public unveiling at the Microsoft Global Energy Forum. That was yesterday and I do think that it went very well; we had a lot of interesting conversations with customers around StreamInsight, what it can do and, specifically, Logica's capabilities to deliver a solution on the StreamInsight platform. That said, I have some ideas and thoughts backed up to blog about.

So … here's your scenario. You have standing queries that you start with StreamInsight and process. With some output adapters, you want all of the results for these queries. However, that's not always the case, especially when the output adapter is feeding a UI. When you have 7,000–8,000 individual data points coming into StreamInsight, you don't want to be feeding all of this down to the UI. And, since most people can't process more than a few individual data items at a time, there's really no point in doing so. (Keep in mind that this is the demo feed; real-world scenarios in Oil & Gas can easily have over 50K data points coming in.) Now, you could spend all of your time writing every possible permutation of query filters and, I suppose, if very long-term job security is your goal, that might be your path. Personally, I'd find such a thing boring to no end and, since I'm an inherently lazy developer, I'd much prefer to do it once in a flexible manner and move on to more interesting things. So … here it goes.

First, you need to define a common data class for your query result items; you need to know, at the very least, what field you want to filter on and what the resulting datatype is – for typing the stream and query. For a large implementation, you'd want to do this anyway, and part of your analysis at the beginning would be to determine the core datatypes required for your queries. You will want these datatypes to inherit from a core base class that defines the attributes to include in every concrete data class. Here's the one that we'll be using in this example:

/// <summary>
/// Base class for all query data points
/// </summary>
public class DataItem
{
    /// <summary>
    /// ID of the source item
    /// </summary>
    public string ItemId { get; set; }

    /// <summary>
    /// Time item was received/calculated
    /// </summary>
    public DateTimeOffset Timestamp;
}

As you can see, it's pretty basic and, though it's not marked abstract, its main use is to be inherited from and to allow us to write the query filters generically. I didn't mark it abstract to allow for any potential cases where this would be all we need … though I cannot, for the life of me, think of a single such case. For a value, we'll use a derived ValueDataItem class:

/// <summary>
/// Class for data items with a single value
/// </summary>
/// <typeparam name="T">Type of the value</typeparam>
public class ValueDataItem<T> : DataItem
{
    /// <summary>
    /// Single value for this item.
    /// </summary>
    public T Value { get; set; }
}

This class will suffice for most raw point events, especially those that come from sensor readings. And, since it's a generic type, it can hold any value that we need. Note: you may be tempted, as I was, to create typed input adapters that use this kind of generic class. DO NOT DO IT. Just say no. Something funky goes on with the way the typed input adapters are handled that will lead you down a rat hole into Oblivion. You will start trying to force it and violate the KISS principle every step of the way into an endless abyss of reflection and dynamic compilation. But that's a discussion for another time …
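Back to the data classes: just to make the payload shape concrete, here's what a sensor reading wrapped in ValueDataItem might look like (the item ID and values are made up):

// Hypothetical sensor reading as a query payload.
var reading = new ValueDataItem<double>
{
    ItemId = "Rig1/Sensor/42",
    Timestamp = DateTimeOffset.Now,
    Value = 98.6
};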
To show the flexibility of the methodology, I'm also going to introduce one more DataItem class, the AggregateDataItem.

/// <summary>
/// Data item for aggregate query results
/// </summary>
/// <typeparam name="T">Type of the raw source data values.</typeparam>
class AggregateDataItem<T> : DataItem
{
    /// <summary>
    /// Sum of the aggregate
    /// </summary>
    public T Sum { get; set; }

    /// <summary>
    /// Count of the aggregate
    /// </summary>
    public uint Count { get; set; }

    /// <summary>
    /// Minimum value of the aggregate
    /// </summary>
    public T Min { get; set; }

    /// <summary>
    /// Maximum value of the aggregate
    /// </summary>
    public T Max { get; set; }

    /// <summary>
    /// Average value of the aggregate
    /// </summary>
    public double Average { get; set; }
}

Now that we have the basics down, let's take a look at what we need to do to evaluate our filter. First, we have a class – QueryFilterDefinition – with properties for the ItemIds to compare (as a string array) and an int for the type of comparison; a sketch of what it might look like is at the end of this post. While it's tempting (and natural) to make the comparison mode an enumeration, enums aren't supported datatypes when calling custom methods/functions from a running StreamInsight query, so I've made them old-school constants. Also, while I am using an array for the ItemIds, StreamInsight doesn't support passing string arrays into a method from a query either, so I have to join the array and then split it later. Ugly, but it works. The methods to call and to do the comparison are below:

public static bool Matches(string value, string itemIdList, int comparisonMode)
{
    string[] itemIds = itemIdList.Split(';');
    return MatchesItem(value, itemIds, comparisonMode);
}

/// <summary>
/// Evaluates for a match
/// </summary>
/// <param name="value">Value of item to match</param>
/// <param name="itemIds">Array of ItemIds for comparison</param>
/// <param name="comparisonMode">One of the COMPARISON_* constants</param>
/// <returns>True if the value matches the filter</returns>
/// <remarks>CONTAINS is an "AND" match; all others are an "OR"</remarks>
public static bool MatchesItem(string value, string[] itemIds, int comparisonMode)
{
    switch (comparisonMode)
    {
        case COMPARISON_NONE:
            return true;
        case COMPARISON_EQUALS:
            return itemIds.Any(value.Equals);
        case COMPARISON_STARTS_WITH:
            return itemIds.Any(value.StartsWith);
        case COMPARISON_ENDS_WITH:
            return itemIds.Any(value.EndsWith);
        case COMPARISON_CONTAINS:
            return itemIds.All(value.Contains);
    }
    return false;
}

Now, the code to create the filtered query from a standing query and attach an output adapter to it:
/// <summary>
/// Creates a filtered query from an existing query.
/// </summary>
/// <typeparam name="T">Type of the query's output data item</typeparam>
/// <param name="cepApplication">Hosting CEP application</param>
/// <param name="queryName">Name of the standing query</param>
/// <param name="filterDefinition">Filter definition</param>
/// <param name="outputAdapterFactoryType">Type of the output adapter factory.</param>
/// <param name="outputAdapterSettings">Settings for the output adapter.</param>
/// <param name="eventShape">Event shape for the resulting query</param>
/// <param name="eventOrder">Event order for the resulting query.</param>
/// <returns>Newly created query.</returns>
/// <remarks>Query is not started on return.</remarks>
public static Query CreateFilterQuery<T>(
    Application cepApplication,
    string queryName,
    QueryFilterDefinition filterDefinition,
    Type outputAdapterFactoryType,
    object outputAdapterSettings,
    EventShape eventShape,
    StreamEventOrder eventOrder) where T : DataItem
{
    // Get the source query.
    Query sourceQuery = cepApplication.Queries[queryName];
    Guid newQueryId = Guid.NewGuid();

    // Convert to a stream to monkey with it.
    CepStream<T> cepStream = sourceQuery.ToStream<T>();
    string itemIdList = String.Join(";", filterDefinition.ItemIds);
    var filteredStream = from d in cepStream
                         where QueryFilterDefinition.Matches(
                             d.ItemId, itemIdList, filterDefinition.ComparisonMode)
                         select d;

    // Create the query.
    Query filteredQuery = filteredStream.ToQuery(
        cepApplication,
        newQueryId.ToString(),
        "Filtered " + queryName,
        outputAdapterFactoryType,
        outputAdapterSettings,
        eventShape,
        eventOrder);
    return filteredQuery;
}

Note the use of the generic T in the method call as well as the generic constraint. This is why we started with the base class defining ItemId; we need it to provide StreamInsight with the schema for the query, and the constraint ensures that we have an ItemId to use for the filter. If the generic type specified when you create the filtered query doesn't match the type of the standing query, you will get an error when you create the query. Creating the filtered query is as simple as the following:

Query filteredAggregate = QueryFilterDefinition.CreateFilterQuery<AggregateDataItem<float>>(
    cepApplication,
    "Aggregate",
    new QueryFilterDefinition()
    {
        ComparisonMode = QueryFilterDefinition.COMPARISON_CONTAINS,
        ItemIds = new string[] { "1", "2" }
    },
    typeof(TracerFactory),
    tracerConfig,
    EventShape.Interval,
    StreamEventOrder.FullyOrdered);
filteredAggregate.Start();

Yes, I've left a LOT out … I didn't feel that it would be necessary to show all of the set-up, etc. But you can download and review the demo solution to see the details.
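As promised above, here's a hedged sketch of what the elided QueryFilterDefinition class might look like, reconstructed from the way it's used in this post; the specific constant values are my own assumption and the real class in the demo solution may differ:

/// <summary>
/// Sketch of the filter definition class used above, reconstructed from usage.
/// </summary>
public class QueryFilterDefinition
{
    // Old-school constants instead of an enum, since enums can't be passed
    // to user code from a running StreamInsight query. Values are assumptions.
    public const int COMPARISON_NONE = 0;
    public const int COMPARISON_EQUALS = 1;
    public const int COMPARISON_STARTS_WITH = 2;
    public const int COMPARISON_ENDS_WITH = 3;
    public const int COMPARISON_CONTAINS = 4;

    /// <summary>ItemIds to compare against.</summary>
    public string[] ItemIds { get; set; }

    /// <summary>One of the COMPARISON_* constants.</summary>
    public int ComparisonMode { get; set; }

    // The static Matches/MatchesItem helpers and CreateFilterQuery<T>
    // shown earlier in the post also live on this class.
}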