Ruminations of J.net – idle rants and ramblings of a code monkey

Getting started with IoT

.NET Stuff | IoT | StreamInsight
So … I’ve started playing with the whole Internet-of-Things stuff that Microsoft has been talking about lately. It’s funny … I’ve been doing this kind of thing for several years, with Microsoft’s own product, and it’s finally getting traction at Microsoft. Largely, I think, because of two things: first, Windows 10 IoT edition (which is pretty cool) and second, Azure Event Hubs. And, honestly, I can’t say that I’m just “getting started” with IoT – I’ve been doing it for a while. But I am hopping on the current wave coming out of Redmond and, overall, I like what I’m seeing.

If you aren’t familiar with Event Hubs, it’s a CEP service in the cloud. It scales to millions of events per second (they claim). In that, it has some of the same functionality as StreamInsight. However, getting StreamInsight to scale to millions of events per second would, admittedly, be pretty tough … that’s a lot of raw throughput. But part of how Event Hubs accomplishes this is by limiting the functionality provided – the sources and sinks are strictly limited to those the team has elected to include and there’s no extensibility. It also doesn’t seem to provide the number of options that you have with StreamInsight. Not that this is a bad thing … depending on what you are trying to do, this may be absolutely fine. Also, the integration of Event Hubs with Azure Machine Learning is pretty cool – though you can do that with a custom UDO or UDF in StreamInsight.

And it’s not an either-or thing … you don’t have to use simply one or the other. In fact, there are, I think, far more uses for a combination scenario, with StreamInsight on-prem feeding Event Hubs in the cloud. First, since there’s a lot more that you can do with StreamInsight locally (due to its extensibility), there’s quite a bit of processing you can add to the data that you are streaming, including far more advanced analytics as well as downsampling and data cleansing. Also, with local connectivity to other devices, you can respond immediately to events and conditions by issuing control commands to other devices on your local network. Rather than an “Internet of Things”, it’s an “Intranet of Things”. That locally processed information can then be forwarded to the cloud Event Hub if you so desire.

Think, for example, of facilities management. You have various sensors around the facility measuring all kinds of things – temperature, humidity, air quality (including CO, CO2, volatile organics, flammable gases and more) – and taking appropriate action or raising alerts. That data would then be downsampled (using, say, a hopping window in StreamInsight) and streamed to the cloud-based Event Hub for multi-facility monitoring and tracking.
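To make that downsampling idea concrete, here’s a minimal sketch of the kind of query involved. The stream and payload names (sensorReadings, SensorId, Value) are placeholders rather than anything from an actual demo – the point is just a grouped hopping window producing a per-sensor average to forward to the cloud.

// Hypothetical names: sensorReadings is an IQStreamable<SensorReading> of point events.
// Emit a 5-minute average for each sensor every minute before publishing onward.
var downsampled = from r in sensorReadings
                  group r by r.SensorId into perSensor
                  from win in perSensor.HoppingWindow(TimeSpan.FromMinutes(5), TimeSpan.FromMinutes(1))
                  select new
                  {
                      SensorId = perSensor.Key,
                      Average = win.Avg(e => e.Value)
                  };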
Naturally, I had to go get myself a bunch of devices. I’ve gotten an Arduino Uno R3, a Netduino 3 WiFi and a Raspberry Pi and started playing with them. My first task: duplicate the Raspberry Pi IoT Weather Station demo. Done … but it took a little longer than expected and included a complete brain-fart where I hooked the 5V wire from the RasPi to the ground on the weather shield. Surprisingly, nothing appears damaged but, needless to say, it didn’t work either … and they both got quite warm. But first … now that I’ve had these for about a week, here are some thoughts.

Arduino Uno R3: This is a neat little device and it’s really, really inexpensive. When it comes to embedded, build-it-yourself devices, this is the one that has, by far, the most options and, most importantly (especially when you are trying to figure the thing out), the most sample code out there for using sensors and devices. It’s also open source and many of the components are as well. The controller is a 16MHz ATmega. Yup, that’s right … only 16MHz. The same clock speed as the first 386s – but it’s RISC-based, so it’s a different architecture. It’s also got 32KB of flash and 2KB of SRAM … so it’s actually very limited when it comes to program size (in flash) and working set (SRAM). There are various flavors of the Arduino, including “clones” (for lack of a better term), and there are also sew-on (wearable) Arduino-based systems (called LilyPad). The biggest downside to the Arduino is that the development environment really makes you appreciate Visual Studio. It’s bad … really bad. And if you leave it running too long – or maybe it’s through a sleep/resume cycle – it’ll start chewing up CPU and memory. The programming language is C/C++-based. (Late-breaking development … apparently there is a Visual Studio plug-in for Arduino. I will have to check it out.) The debugger is non-existent. To debug, you’ll be writing out to the serial port and reading it; the IDE has a handy tool to monitor your serial/USB port to show the output. Still, it’s remarkably capable and powerful for the things it was designed for, and the LilyPad stuff is quite intriguing. (Example: GPS embedded in a kid’s backpack, with cellular uplink to report location periodically. Kick it up a notch to record UV exposure as well.) Power requirements are quite low … it’ll run just fine from a USB connection (500mA). It can be had for as little as $10.

Netduino 3 WiFi: This is an Arduino-shield-compatible microcontroller that runs the .NET Micro Framework – a mini version of .NET for embedded systems. This latest rev of the Netduino has a 168MHz processor, 1408KB of flash and 164KB of RAM. As the name implies, it also has 802.11 Wi-Fi built in, including support for 802.11n – though only on the 2.4GHz band. That – right there – makes this, for me, a far more interesting and useful device than the Arduino, especially for the journey upon which I am embarking. Yes, there are WiFi and Ethernet shields for the Arduino, but this is baked into the board. It also has a unique (as far as I can tell) Go Bus that allows you to quickly and easily plug in additional devices, including additional Arduino shields. The board itself will run on a USB connection but once you start adding things to it, you’ll need supplemental power. Like the Arduino, it’s a truly embedded system and has no built-in support for any kind of display – though you can add Arduino-based LCDs or OLEDs for a simple display. Development is exclusively with Visual Studio using C# or VB.NET and you get the full debugging experience right in Visual Studio. More powerful than the Arduino, it’s also more expensive at $70.

Raspberry Pi 2 Model B: The second full version of the open-source Raspberry Pi. This is a full computer that’s the size of a credit card. It sports a 900MHz quad-core ARM processor with 1GB of RAM, has a dedicated camera and display interface, HDMI port, 4 USB 2.0 ports, microSD and Ethernet all on-board, and runs various flavors of (ported) Linux including Ubuntu, OpenELEC, Pidora and a flavor just for the RasPi called Raspbian, as well as Windows 10 IoT edition.
Development is done with Python, Scratch, C++ or various other environments (Sonic Pi looks like it’d be really cool for my son as he’s into music) or – if you install Windows 10 IoT edition on it (as I did) – you can use Visual Studio 2015. Unlike the ‘duino-based boards above, it won’t reliably run just from a USB connection (500mA for USB 2.0, 900mA for USB 3.0), so you’ll want to get a higher-powered micro USB power supply. Also, unlike both of the ‘duino boards above, it doesn’t have a built-in analog-to-digital converter (ADC), so if you want to use any analog sensors (and there are plenty of them), you’ll need to wire in an ADC. That said, of the three, this is, by far, the most powerful – but that’s not a fair comparison. The RasPi is a general-purpose computer that someone with modest needs could use as their personal computer – or as a media center – or as a full web server/DNS server/DHCP server … you get the idea. It’s also a very, very affordable $40. There are others out there as well. I’m also particularly interested in the Intel Galileo boards as they are Arduino-shield compatible but, like the RasPi, will run Windows 10 IoT edition (at least R1, right now … R2 runs Windows Embedded).

The Project: Weather Station, Intra/Internet of Things Edition

Over the next … well, however long, but I hope to be done by the end of the summer … I’ll be building out a duplicate and extension of the Raspberry Pi IoT Weather Station demo mentioned above with the Netduino, as well as extending the RasPi/Win10 version. We’ll set up the weather station to stream to an Intranet-of-Things server running StreamInsight that will publish the data internally using a modified version of Microsoft’s Connect The Dots and also publish a cleansed, down-sampled stream to the Azure Event Hub. By the end of it, I hope to have a full weather station, including wind and rain meters as well as GPS, in my back yard … and, if all goes as I hope, it’ll incorporate feeds from Weather Underground for other weather stations in my area (for comparison and alerts of incoming fronts/weather changes) as well as publish to them. I’ll also hopefully have a couple of these things scattered around the house measuring things like air quality, light and presence in addition to temperature and humidity. For the Intranet-of-Things server, I’ll probably be using my recently purchased Asus VivoMini that’s running Windows Server 2012 R2. Just today, I got all of the code running on the Netduino to access the built-in sensors of the SparkFun Weather Shield – light, temperature, humidity and pressure. There were several challenges that came out of this … and a pretty nice I2C device bus class for Netduino as well. I’ll be covering that in my next post.
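As a teaser for that post, here’s a minimal sketch of what talking to an I2C sensor looks like on the Netduino using the .NET Micro Framework’s I2CDevice class. This is not the I2C bus class mentioned above, and the address/register values are whatever the caller passes in – the actual Weather Shield sensor details will come in the follow-up post.

using Microsoft.SPOT.Hardware;

public static class I2CSketch
{
    // Read 'count' bytes starting at 'register' from a device at 'address'.
    // Address and register values here are up to the caller; nothing is
    // specific to the SparkFun Weather Shield.
    public static byte[] ReadRegister(ushort address, byte register, int count)
    {
        using (var device = new I2CDevice(new I2CDevice.Configuration(address, 100))) // 100kHz clock
        {
            var buffer = new byte[count];
            var transactions = new I2CDevice.I2CTransaction[]
            {
                I2CDevice.CreateWriteTransaction(new byte[] { register }), // select the register
                I2CDevice.CreateReadTransaction(buffer)                    // read the response
            };
            device.Execute(transactions, 1000); // 1-second timeout
            return buffer;
        }
    }
}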

RealTime Web Analytics Presentation And Demo From HDNUG

Code Sample | StreamInsight
Here are all of the materials from the presentation that I did at the Houston .NET User’s Group on March 13. Some of the things that I updated from the version for Baton Rouge Sql Saturday include:

Lock around active requests HashSet: Added a lock block around adding and removing items from the _activeRequest HashSet. Since the HashSet isn’t thread-safe and our source is very highly multi-threaded, we need to make sure that operations that modify the internal array are thread-safe. This eliminated some random “IndexOutOfBoundsException” errors in the source that would halt the StreamInsight process.

Checks in StandardDeviation UDA: Added checks for the number of values as well as the result. If the number of values in the set for the standard deviation is less than one, the standard deviation is always 0. Also, after the calculation, there’s an additional check on the result to make sure it’s not NaN. This eliminated some random exceptions in the queries that were calculating the standard deviation that would halt the StreamInsight process.

Both cases highlight the need to make sure that your custom code running in StreamInsight is tight and solid. They were pretty difficult to track down as well … both would happen randomly. IntelliTrace was absolutely essential to identifying and resolving the issues. After fixing them, I was able to run for hours without a problem.

Notes on reproducing the demo: I’m not including the site that I used when running the demo. You can get it from the NopCommerce site on CodePlex; I used the out-of-the-box site with sample data. Keep in mind that the module used for the demo forces some of the requests to take 3 seconds – these are our “Bad Actors” – so it’s in no way representative of the performance of nopCommerce. You’ll need to set it up on a local site on port 81 if you want to use the Visual Studio load tests. From there, you need to copy the WebSiteMonitor.Contracts and WebSiteMonitor.Module assemblies into the \bin folder of the site and add the following to the web.config:

Under system.webServer, add the module:

<modules runAllManagedModulesForAllRequests="true">
  <add name="MonitorModule" type="WebSiteMonitor.Module.WebSiteMonitorHttpModule"/>
</modules>

Under system.serviceModel, add the WCF configuration:

<system.serviceModel>
  <serviceHostingEnvironment aspNetCompatibilityEnabled="true" multipleSiteBindingsEnabled="true" />
  <bindings>
    <netTcpBinding>
      <binding name="streamedNetTcpBinding" transferMode="Streamed" />
    </netTcpBinding>
  </bindings>
  <client>
    <endpoint address="net.tcp://localhost/eventService" binding="netTcpBinding"
              bindingConfiguration="streamedNetTcpBinding" contract="WebSiteMonitor.Contracts.IRequestService"
              name="Client" />
  </client>
</system.serviceModel>

You may (probably will) need to specify the StreamInsight 2.1 instance name that you are using. This is in the app.config for the WebSiteMonitor.WinForms project under “applicationSettings”; the setting name is “StreamInsightInstance” (very creative, I know). You’ll want to run the client app “As Administrator” or reserve the URLs for non-administrator users and accounts. If you are running from Visual Studio, run Visual Studio as Administrator. I tend to run as Administrator when testing and running the demo; in the real world, you’d reserve the URLs. The TestWebSite project in the solution is a “New Web Site” template from Visual Studio and helps make sure that everything is set up properly. It also has the configuration settings.
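For illustration, here’s roughly what those two fixes look like. This is a hedged sketch rather than the actual demo code – the member names and the sample-standard-deviation math are mine – but it shows the shape of the guards: serialize every mutation of the shared HashSet, and return 0 from the aggregate when there’s nothing meaningful to compute or the result comes back NaN.

using System;
using System.Collections.Generic;
using System.Linq;

public class ActiveRequestTracker
{
    // HashSet<T> is not thread-safe, and the HTTP module is highly multi-threaded,
    // so every operation that mutates the set takes the same lock.
    private readonly HashSet<Guid> _activeRequests = new HashSet<Guid>();
    private readonly object _syncRoot = new object();

    public void Add(Guid requestId)
    {
        lock (_syncRoot) { _activeRequests.Add(requestId); }
    }

    public void Remove(Guid requestId)
    {
        lock (_syncRoot) { _activeRequests.Remove(requestId); }
    }

    // Sketch of the guards in the standard deviation calculation: too few values
    // yields 0, and a NaN result is swallowed rather than thrown into the query.
    public static double GuardedStandardDeviation(IEnumerable<double> values)
    {
        var list = values.ToList();
        if (list.Count <= 1)
        {
            return 0.0;
        }

        double mean = list.Average();
        double variance = list.Sum(v => (v - mean) * (v - mean)) / (list.Count - 1);
        double result = Math.Sqrt(variance);

        return double.IsNaN(result) ? 0.0 : result;
    }
}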

Decoupling Queries from Input and Output

Code Sample | StreamInsight
There’s something that really disturbs me about how all of the StreamInsight samples are put together – everything is very, very tightly coupled. Streams are tightly bound to the source and the target of the data and there’s no good guidance on how to break all that stuff apart. I understand why – it certainly simplifies things – but that’s Bad Mojo™ and not how we want to build enterprise applications.

Let’s take a simple scenario. Let’s say that you are developing queries to detect key events for, say, an offshore well. You have a good idea what these events look like – what follows what – and you even have some recordings that you can play back at high speed to test your algorithms. All of your query code is built much like the demos are, so you’re pretty tightly bound to the source (the database) and your output (the console, just for giggles). Now … how do you hook this up to your production sources? And you certainly won’t be using a console output for the events that you detect in production. So do you go in, real quick-like, and change all of your code before compiling it for production? If so … let me know how that goes for you. And don’t call me when things start falling apart while you are doing last-minute compiles and emergency deployments and the like. This, however, is how all of the samples are written. Granted … they are samples and aren’t meant to be “ready for the real world”. But, sadly, there’s precious little guidance out there on a better way to do this in StreamInsight.

With that said, however, this isn’t a new problem in software development. Oh, sure, the technology may be new and/or different, but the pattern of the problem certainly isn’t. What we need to do is abstract the specification of the consumer or producer from its usage when creating the streams. Fortunately, we’ve already defined a pretty well-abstracted factory pattern for constructing our producers and consumers, even in the context of Reactive-based streams, so this helps (and was part of the method to my madness!). In addition to abstracting the producers and consumers, we also need a pattern for reuse of query logic. Take this scenario as an example: we’re monitoring offshore platforms in real time. We have a set of queries that we use to process the sensor readings from the platforms and this logic should be applied to each of the platforms – it detects the key events that we’re concerned with. The only difference between each set of queries is the source of the data (different connections), the metadata associated with the queries and, possibly, the outputs for detected events (though, using subjects, we can aggregate the output events into a single output stream).

Enter the Stream Builder

This becomes our unit of query reuse. Using a defined interface and classes implementing that interface, we can encapsulate the query logic into a reusable “chunk” that gets built together. We can use this to start and stop the queries/process (depending on which model we are using) as well as to provide configuration for different parameters and for the producers and consumers. Let’s start with the base class for configuration.
StreamBuilderConfiguration

public class StreamBuilderConfiguration : IConfiguration
{
    private readonly Dictionary<string, EventComponentDefinition> _eventProducers =
        new Dictionary<string, EventComponentDefinition>();

    public IDictionary<string, EventComponentDefinition> EventProducers
    {
        get { return _eventProducers; }
    }

    private readonly Dictionary<string, EventComponentDefinition> _eventConsumers =
        new Dictionary<string, EventComponentDefinition>();

    public IDictionary<string, EventComponentDefinition> EventConsumers
    {
        get { return _eventConsumers; }
    }

    public string Name
    {
        get; set;
    }

    public virtual EventComponentDefinition DefaultConsumer
    {
        get
        {
            return new EventComponentDefinition()
            {
                ComponentType = typeof(NullDataConsumerConfig),
                Configuration = new NullDataConsumerConfig()
            };
        }
    }
}

First, you’ll notice that we define dictionaries of EventComponentDefinitions. What is this? Well, keep in mind that we need a factory and a configuration to create our producers and consumers. So … this is what the EventComponentDefinition class encapsulates.

EventComponentDefinition

public class EventComponentDefinition
{
    public object Configuration { get; set; }
    public Type ComponentType { get; set; }
}

In this case, the type for “ComponentType” is the producer/consumer factory class. So … now we have a way to abstract the definition of the consumers and producers (in the configuration) as well as a way to find them. In case you haven’t guessed yet, this provides an inversion of control that uses the dictionary lookup to locate the appropriate service. Now, the producer and/or consumer must still handle the type of the payload for the stream and we don’t have anything (yet) that checks and/or ensures that this is actually correct and compatible, but we now have a contract for specifying these items and a contract for creation. Finally, so that we have an interface that we can bind to without having to worry about generics, we’ll extract the Start and the Stop methods into an interface. It’s all nicely abstracted now and ready for us to create our stream builder.
StreamBuilder

public interface IStreamBuilder
{
    void Start(Microsoft.ComplexEventProcessing.Application cepApplication);

    void Stop();
}

public abstract class StreamBuilder<TConfiguration> : IStreamBuilder
    where TConfiguration : StreamBuilderConfiguration
{
    protected StreamBuilder(TConfiguration configuration)
    {
        this.Configuration = configuration;
    }

    public TConfiguration Configuration
    {
        get;
        private set;
    }

    protected Application CurrentApplication
    {
        get;
        set;
    }

    public abstract void Start(Microsoft.ComplexEventProcessing.Application cepApplication);

    public abstract void Stop();

    protected EventComponentDefinition GetConsumer(string name)
    {
        if (Configuration.EventConsumers.ContainsKey(name))
        {
            return Configuration.EventConsumers[name];
        }
        if (Configuration.DefaultConsumer != null)
        {
            return Configuration.DefaultConsumer;
        }
        throw new InvalidOperationException(string.Format(ExceptionMessages.CONSUMER_NOT_FOUND, name));
    }

    protected EventComponentDefinition GetProducer(string name)
    {
        if (Configuration.EventProducers.ContainsKey(name))
        {
            return Configuration.EventProducers[name];
        }

        throw new InvalidOperationException(string.Format(ExceptionMessages.PRODUCER_NOT_FOUND, name));
    }
}

We also have nice “helper” methods to get the producers and the consumers. Since a consumer isn’t strictly required, we also have a default consumer – the null consumer – in case one isn’t specified. The producer, however, is absolutely required, so if it’s not found (not provided), we throw an exception. And since the interface has both a Start and a Stop, we can use this layer of abstraction to manage our running queries in a consistent way, regardless of which underlying StreamInsight API we are using for the queries. From here, we’ll create more specific implementations for the Reactive and the Query models.

ProcessBuilder Implementation

With the Reactive model introduced in StreamInsight 2.1, the process is the unit of execution. Related queries are started, executed and stopped as a single unit. You can have multiple bindings in each process for both input and output and they all start and stop together. So … it makes sense that our ProcessBuilder will build a single process with all of the related streams bound together in that process. We’ll also abstract the code that we need to write for every source stream and for binding every output stream. The trick for handling the bindings is simple … have them added to a list that gets built up and then, when it’s time, bind them all together using With and run them in a single process. Of course, abstracting the bindings also allows us to do this pretty easily. Overriding the StreamBuilder’s Start method allows us to wrap up all of the necessary housekeeping to get started as well as to bind all of the streams together and run them in a single process. We’ll also define a “CreateStreams” method (as abstract) … our concrete implementations will override this method to do the work of creating the streams.
ProcessBuilder

public abstract class ProcessBuilder<TConfiguration> : StreamBuilder<TConfiguration>
    where TConfiguration : StreamBuilderConfiguration
{
    protected ProcessBuilder(TConfiguration configuration) : base(configuration)
    {
    }

    private List<IRemoteBinding> _bindings;
    private IDisposable _runningBindings;

    public override void Start(Application cepApplication)
    {
        _bindings = new List<IRemoteBinding>();
        CurrentApplication = cepApplication;
        CreateStreams();

        var bindingList = _bindings.Skip(1).Aggregate(_bindings.First(),
            (current, source) => current.With(source));
        _runningBindings = bindingList.Run(Configuration.Name);
    }

    public override void Stop()
    {
        if (_runningBindings != null)
        {
            _runningBindings.Dispose();
        }
    }

    protected IQStreamable<TPayload> CreateStream<TPayload>(string producerName, EventShape eventShape)
    {
        var producer = Configuration.EventProducers[producerName];
        return RxStream<TPayload>.Create(CurrentApplication, producer.ComponentType, producer.Configuration, eventShape);
    }

    protected IQStreamable<TPayload> CreateStream<TPayload>(string producerName, EventShape eventShape,
        AdvanceTimeSettings advanceTimeSettings)
    {
        var producer = Configuration.EventProducers[producerName];
        return RxStream<TPayload>.Create(CurrentApplication, producer.ComponentType, producer.Configuration, eventShape, advanceTimeSettings);
    }

    protected void Bind<TPayload>(IQStreamable<TPayload> stream, string consumerName, EventShape eventShape)
    {
        var consumer = GetConsumer(consumerName);
        _bindings.Add(stream.ToBinding(CurrentApplication, consumer, eventShape));
    }

    protected abstract void CreateStreams();
}

Creating a new process builder is super-simple and really cuts down on the amount of code that you need to create and bind your streams. Both your event producers and consumers are now named resources – the process builder has no idea what the sources or the targets are and it doesn’t need to know.

SampleProcessBuilder

public class SampleProcessBuilder : ProcessBuilder<SampleStreamBuilderConfig>
{
    public SampleProcessBuilder(SampleStreamBuilderConfig configuration)
        : base(configuration)
    {
    }

    protected override void CreateStreams()
    {
        var data = CreateStream<TestDataEvent>("SourceData",
                        EventShape.Point,
                        Configuration.GetAdvanceTimeSettings());

        Bind(data, "Output", EventShape.Point);

        var aggregate = from d in data
                        group d by d.ItemId into itemGroups
                        from i in itemGroups.HoppingWindow(TimeSpan.FromSeconds(10), TimeSpan.FromSeconds(2))
                        select new
                        {
                            ItemId = itemGroups.Key,
                            Average = i.Avg(e => e.Value)
                        };
        Bind(aggregate, "Aggregate", EventShape.Point);
    }
}

CepStreamBuilder Implementation

In keeping with our principle of keeping the APIs as close as possible, we’ll also have a CepStreamBuilder that looks and acts the same way as our ProcessBuilder. However, since the pre-2.1 model has no concept of a process with multiple inputs and outputs running as a single unit of execution, we have a little extra work to do.
Prior to the Reactive model, the query was the unit of execution – it could have multiple input streams but only one output. Each query was independent of all other queries and there was no way to conveniently run related queries together. Reuse of query outputs was done through a technique called Dynamic Query Composition (DQC), which re-enqueued the output of one query into a new stream that then served as input to another query. If you looked at the details of how this was done, it was a special input/output adapter pair – in essence, a subject without the flexibility or extensibility that we have with subjects in the Reactive model. Finally, we’ll also need to take into consideration the potential for name clashes when we name our queries – not something that we needed to worry about with the process container – and a way to “group” our related queries together – something the process container does inherently but that the query model didn’t do for you.

QueryBuilder

public abstract class QueryBuilder<TConfiguration> : StreamBuilder<TConfiguration>
    where TConfiguration : StreamBuilderConfiguration
{
    public List<Query> _queries;

    protected QueryBuilder(TConfiguration configuration) : base(configuration)
    {
    }

    public override void Start(Application cepApplication)
    {
        _queries = new List<Query>();
        CurrentApplication = cepApplication;
        CreateStreams();

        //Start all of the queries.
        foreach (var query in _queries)
        {
            query.Start();
        }
    }

    public override void Stop()
    {
        foreach (var query in _queries)
        {
            query.Stop();
        }
        foreach (var query in _queries)
        {
            query.Delete();
        }
        _queries = null;
    }

    protected abstract void CreateStreams();

    protected CepStream<TPayload> CreateStream<TPayload>(string producerName, EventShape eventShape)
    {
        var producer = Configuration.EventProducers[producerName];
        return CepStream<TPayload>.Create(CurrentApplication,
            GetSourceStreamName(producerName), producer.ComponentType,
            producer.Configuration, eventShape);
    }

    protected CepStream<TPayload> CreateStream<TPayload>(string producerName, EventShape eventShape,
        AdvanceTimeSettings advanceTimeSettings)
    {
        var producer = Configuration.EventProducers[producerName];
        return CepStream<TPayload>.Create(CurrentApplication,
            GetSourceStreamName(producerName), producer.ComponentType,
            producer.Configuration, eventShape, advanceTimeSettings);
    }

    protected void Bind<TPayload>(CepStream<TPayload> stream, string consumerName, EventShape eventShape)
    {
        var consumer = GetConsumer(consumerName);
        _queries.Add(stream.ToQuery(CurrentApplication, GetQueryName(consumerName),
            GetQueryDescription(consumerName), consumer.ComponentType,
            consumer.Configuration, eventShape, StreamEventOrder.FullyOrdered));
    }

    private string GetQueryDescription(string consumerName)
    {
        return "Query from QueryBuilder [" + Configuration.Name + "] for consumer [" + consumerName + "]";
    }

    protected string GetQueryName(string consumerName)
    {
        return Configuration.Name + "." + consumerName;
    }

    protected string GetSourceStreamName(string producerName)
    {
        return Configuration.Name + "." + producerName + ".Stream";
    }
}
And now, creating a query builder is, like the process builder, super-simple. In fact, we can copy and paste the code from the process builder into the query builder; the only difference is the name of the base class.

SampleQueryBuilder

public class SampleQueryBuilder : QueryBuilder<SampleStreamBuilderConfig>
{
    public SampleQueryBuilder(SampleStreamBuilderConfig configuration) : base(configuration)
    {
    }

    protected override void CreateStreams()
    {
        var data = CreateStream<TestDataEvent>("SourceData",
                        EventShape.Point,
                        Configuration.GetAdvanceTimeSettings());

        Bind(data, "Output", EventShape.Point);

        var aggregate = from d in data
                        group d by d.ItemId into itemGroups
                        from i in itemGroups.HoppingWindow(TimeSpan.FromSeconds(10), TimeSpan.FromSeconds(2))
                        select new
                        {
                            ItemId = itemGroups.Key,
                            Average = i.Avg(e => e.Value)
                        };
        Bind(aggregate, "Aggregate", EventShape.Point);
    }
}

Using the Stream Builders

Again, this is pretty simple. You create the configuration. You create the builder. You start the builder. You stop the builder. Regardless of whether it’s a QueryBuilder or a ProcessBuilder, the code to do this is the same and it’s bound to the StreamBuilder base class rather than the specific implementations. Most of the code is now, actually, in creating the configuration to pass to the builder. Right now this is hard-coded but, in time, it won’t be; we’re pretty close to having all of the core, foundational pieces in place to further abstract even the configuration.

RunBuilder

private static void RunBuilder(Application cepApplication, Type builderType)
{
    var builderConfig = GetSampleStreamBuilderConfig();
    var builder = (IStreamBuilder) Activator.CreateInstance(builderType, builderConfig);
    builder.Start(cepApplication);

    Console.WriteLine("Builder is running. Press ENTER to stop.");
    Console.ReadLine();
    builder.Stop();
}

private static SampleStreamBuilderConfig GetSampleStreamBuilderConfig()
{
    //Create the configuration.
    var streamProviderConfig = new SampleStreamBuilderConfig()
    {
        Name = "Provider1",
        CtiDelay = TimeSpan.FromMilliseconds(750),
        CtiInterval = TimeSpan.FromMilliseconds(250)
    };

    //Add the producer.
    streamProviderConfig.EventProducers.Add("SourceData", new EventComponentDefinition()
    {
        Configuration = new TestDataInputConfig()
        {
            NumberOfItems = 20,
            RefreshInterval = TimeSpan.FromMilliseconds(500),
            TimestampIncrement = TimeSpan.FromMilliseconds(500),
            AlwaysUseNow = true,
            EnqueueCtis = false
        },
        ComponentType = typeof(TestDataInputFactory)
    });

    //Add the consumers.
    streamProviderConfig.EventConsumers.Add("Output", new EventComponentDefinition()
    {
        Configuration = new ConsoleOutputConfig()
        {
            ShowCti = true,
            CtiEventColor = ConsoleColor.Blue,
            InsertEventColor = ConsoleColor.Green
        },
        ComponentType = typeof(ConsoleOutputFactory)
    });
    streamProviderConfig.EventConsumers.Add("Aggregate", new EventComponentDefinition()
    {
        Configuration = new ConsoleOutputConfig()
        {
            ShowCti = true,
            CtiEventColor = ConsoleColor.White,
            InsertEventColor = ConsoleColor.Yellow
        },
        ComponentType = typeof(ConsoleOutputFactory)
    });
    return streamProviderConfig;
}

Wrapping Up

We’ve made a good deal of progress with this revision. We focused first on creating abstractions for the producers and consumers – they’re the beginning and the end, after all, of your StreamInsight application – and have now added a framework for the chewy middle. I had a couple of questions from folks as to why I made some of the architectural decisions, particularly with having factories for the Reactive-model sinks and producers. Hopefully some of that is a little clearer now but, as we move forward, there will be additional things that we may do with the factories. There are also some changes and tweaks included in this download that aren’t described in the blog, primarily around the serialization/deserialization for the event consumers and producers, that were done while I was building the demo for Baton Rouge Sql Saturday. There’s still some work to do around these to optimize the performance, particularly on the serialization side of the equation, that I’ll get to sometime later. I was, however, quite proud that I was able to shave about 3 ticks off the deserialization performance. Now, this doesn’t sound like much (and it’s not) but when you realize that you may be doing this process tens of thousands of times per second, it adds up really quickly.

So, what’s next? There are two key pieces that we’ll need to put into place … abstracting the application hosting model and then a configuration system. And then lots to build out … common query patterns in Linq macros, some aggregates and operators and building out more of the stream builder classes/model, to name a couple. Oh, and let’s not forget that we’ll need some of the basic, common event producers and consumers. Last, but not least, you can download the code from here:

How long did that edge event take?–The Sequel

Code Sample | StreamInsight
This is beginning to become a recurring theme around here. It turns out that there is yet another way to get a composite event that will tell you how long a particular event “took”. This goes back to the initial post, where I was using a subject to create a new event that included the original payload but with a Duration property, allowing you to get the total milliseconds for a specific event. You see, subjects have become one of my mostest favoritest features with the whole Reactive model in StreamInsight 2.1; there’s just so much that they let you do that you couldn’t do before. But this particular scenario was possible without subjects and it really should have smacked me in the face. Rather than using a subject, we can use a UDSO (specifically, an edge stream operator) to achieve the exact same thing but with fewer moving parts. Oh … and using a UDSO doesn’t create an independent timeline, whereas using the subject does create a new timeline in the process. Overall, the UDSO is far simpler, easier to understand and you don’t have to worry about any CTI violations or funkiness thereabouts. With that, here’s the operator:

EventDurationOperator

public class DurationEvent<TPayload>
{
    public TPayload Payload;
    public double TotalMilliseconds;
}

[DataContract]
public sealed class EventDurationOperator<TPayload> : CepEdgeStreamOperator<TPayload, DurationEvent<TPayload>>
{
    public override bool IsEmpty
    {
        get { return true; }
    }

    public override DateTimeOffset? NextCti
    {
        get { return null; }
    }

    public override IEnumerable<DurationEvent<TPayload>> ProcessEvent(EdgeEvent<TPayload> inputEvent)
    {
        if (inputEvent.EdgeType == EdgeType.End)
        {
            //Create the new duration event.
            yield return new DurationEvent<TPayload>
            {
                Payload = inputEvent.Payload,
                TotalMilliseconds = (inputEvent.EndTime - inputEvent.StartTime).TotalMilliseconds
            };
        }
    }
}

As you can see, the output here is identical to the output from the previous solution but the code is far, far simpler. Using the operator is simpler than the subject as well.

Using the Operator

var source = items.ToEdgeStreamable(e => e.GetEvent(startTime), AdvanceTimeSettings.IncreasingStartTime);
var results = source.Scan(() => new EventDurationOperator<DataItem>());

That makes the operator, rather than the subject, the winner here … it keeps true to that Golden Rule of Programming – the KISS principle. (That’s Keep It Simple, Stupid, in case you were wondering.)

Multiple Timelines From a Single (Input) Source

StreamInsight | Code Sample
This is an interesting challenge that I’ve come across a couple of times – the last time being on the StreamInsight forum. How do you handle the case where you have a single inbound event source but multiple timelines for the events? If you are wondering how this can happen, it’s actually pretty simple – you’ll have some kind of event aggregator that combines events from multiple downstream sources and each of these sources has different timestamps – due to unsynchronized system clocks at the event source – and/or different latencies – due to a store/forward/batch-update type of system.

“But,” you are saying, “you can just hook into the events at the initial source and then you don’t have any problem.” Sure … if you can. But that’s not always possible, for any number of reasons. In Oil & Gas, probably the most common reason is that you don’t own the source system and you get your events (and sensor readings) from another company that actually operates the equipment. Or, even worse, from a service provider company that isn’t even the operator of the equipment but only provides data services. And yes, this is very common.

What you’ll see in this scenario is a “loss of events”. You do get output but it’ll be from the one source with the “latest” timeline; everything else gets dropped. Now … you will only see this if you are using declarative CTIs with AdvanceTimeGenerationSettings. If you are enqueuing your CTIs in your source, you’ll get CTI violation exceptions. And specifying Drop for the AdvanceTimePolicy doesn’t help with point events; Drop only affects interval events and only interval events whose duration extends past the last-issued CTI.

So how do you deal with this situation? First, we need to get the initial events in. They have different timestamps and we know that. To simulate this, we’ll have an event source that enqueues items every second. It implements IObservable, of course, so we can easily use it as an event source. The event generation looks like the following:

Data Source

for (int i = 0; i < 5; i++)
{
    var evt = new SourceEventData()
    {
        SourceTimeUtc = DateTimeOffset.Now.AddHours(i).UtcDateTime,
        SourceId = i,
        ItemId = "Item" + i.ToString(),
        Value = i * (_rnd.NextDouble() * 10)
    };
    PublishEvent(evt);
}

So … each “source” has a unique identifier. You will need to have some kind of unique identifier (besides the timestamp) to identify the source. If, for some reason, it’s not available on the initial event, you’ll still have some kind of ID that identifies the item … you can then use a reference stream to identify the source/target timeline for the item. In this sample, each source is offset from the current time by 0 to 4 hours … so we have multiple timelines. Now, let’s create a stream from this:

Aggregator Stream

var eventAggregatorSource = cepApplication.DefineObservable(() => new StreamSource());
var eventAggregatorSourceStream =
    eventAggregatorSource.ToPointStreamable(e => PointEvent<SourceEventData>.CreateInsert(DateTimeOffset.UtcNow, e),
    AdvanceTimeSettings.IncreasingStartTime);

In this case, we are using the event arrival time for the point event start time. In our sample, we’re not doing anything with this stream except publishing it, but we can do some analytics here. For example, you might want to have a query that detects when you aren’t getting events from one of your sources. Or you could include reference data that helps you “reset” the tagged system time to the real time. For example, if you know that one of your sources has a system clock that is 3 days behind, you can add that to the SourceTimeUtc to handle it. One caveat … if you want to be able to detect whether the entire source stream is offline, you’ll need to add CTIs to the stream from a timer rather than generating them. When generating CTIs, new CTIs are generated only when you have events flowing through, so when you don’t have any events (the whole thing is offline), you won’t get any offline alerts.
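As a rough illustration of that “source went quiet” query, here’s a minimal sketch. It assumes a hypothetical reference stream, knownSources, with one long-lived event per expected SourceId – that stream, its payload and the five-minute timeout are mine, not part of the sample project. Each arriving reading is stretched to cover the allowed silence window; a known source with no overlapping reading shows up in the output.

// Stretch each reading so it "covers" the allowed silence window (5 minutes here).
var covered = eventAggregatorSourceStream.AlterEventDuration(e => TimeSpan.FromMinutes(5));

// Hypothetical knownSources stream: one long-lived event per expected source id.
// Sources with no reading overlapping their window are considered offline.
var offlineSources = from src in knownSources.LeftAntiJoin(covered, (k, r) => k.SourceId == r.SourceId)
                     select src;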
So … we have a single stream with timestamps by arrival time. For our analytics, we need to have them in the timeline of the underlying source of the data. So … how do we do this? It’s easy … you use a subject. We’ll take the original source stream, enqueued with the arrival time, and publish it to a subject. Then, we’ll subscribe to the subject for each of our source IDs. This will be the stream in the source system’s timeline and each timeline will be completely independent of the other timelines. Once that’s done, we’ll define and then deploy an observable that will filter by a specific source ID. Deploying the observable allows us to easily reuse it without needing to define it again. Note, however, that it’s not required to deploy an observable and it doesn’t always make sense to. But since this is something that we’ll reuse over and over again, it makes absolute sense to do it here.

Subject + Deployed Observable

var publishSubject = cepApplication.CreateSubject("EventSourceSubject",
    () => new Subject<SourceEventData>());

eventAggregatorSourceStream.Bind(publishSubject).Run("EventAggregatorProcess");

var sourceObservable = cepApplication.DefineObservable((int sourceId) =>
    cepApplication.GetSubject<SourceEventData, SourceEventData>("EventSourceSubject")
        .Where(e => e.SourceId == sourceId));
//Deploy the observable; we can just use this ...
sourceObservable.Deploy("EventsForSourceId");

To wrap it all up, all we need to do now is get our deployed observable, invoke the method and then convert to a point stream. When we create the point stream, we’ll use the SourceTimeUtc property as the start time for the point, converting it to the timeline of the downstream source.

Event Source Stream

private static void CreateStreamForEventSource(Application cepApplication, string observableId, int sourceId)
{
    var sourceStream = cepApplication.GetObservable<int, SourceEventData>(observableId)
        .Invoke(sourceId)
        .ToPointStreamable(
            e => PointEvent<EventData>.CreateInsert(new DateTimeOffset(e.SourceTimeUtc), (EventData) e),
            AdvanceTimeSettings.IncreasingStartTime);

    sourceStream
        .Bind(cepApplication.DefineObserver(() => Observer.Create<PointEvent<EventData>>(WriteOutput)))
        .Run("EventStream" + sourceId.ToString());
}

At the end of it, we have a single process for the source events coming from our event aggregator. This process is synched to the local system clock and uses event arrival time for the event timestamp. Then, we have a process for each of our individual sources and each of those processes has a completely different, independent timeline … each process has its own clock. This is probably one of the clearest examples that I’ve seen of what’s meant by “application time” … something that I found to be a difficult concept to fully get my head around when I first started doing StreamInsight way back long ago. You can download the sample project that shows all of this and the multiple timelines from my SkyDrive. Enjoy!

How long did that edge event take–Reprising the Reprise

Code Sample | StreamInsight
So … I just posted a little bit ago. I thought that the solution that I came up with was pretty decent … and certainly an alternative to the subjects … but I still wanted to be able to determine a dynamic timeout based on the payload. There had to be a way to do that, so I put that whole thought process on a background thread while I did other stuff. And when it came to me … the solution was pretty simple, actually. In fact, this is a technique very similar to the standard queries that we use to detect offline sensors but with a slightly different twist. Here’s what you do: rather than shifting the event start time (as we did before), you simply alter the duration of the source edge event. You see, when edge events are enqueued but don’t have an end, the event in the engine has an end time of DateTimeOffset.MaxValue (you can see this in the Event Flow Debugger). When the end edge is enqueued, the initial item is retracted and the new one is inserted in the stream. Now, we used this behavior in the previous example to do our join to see which events were still running after their timeout expired. But we can twist this a bit … rather than shifting the event time, we can set the event duration to a specified end time. AlterEventDuration, unlike ShiftEventTime, provides access to the payload, allowing us to pass in a timeout duration as a part of the payload. We can then do a left anti-semi join between the source stream and the altered stream to see which items in the source stream are no longer in the altered stream. This gives us the events that have exceeded their allowed duration based on the payload. So … here’s the LinqPad sample:

LinqPad Sample

void Main()
{
    DateTimeOffset startTime = new DateTimeOffset(2013, 7, 1, 12, 0, 0, TimeSpan.FromHours(-6));
    var items = Application.DefineEnumerable(() => new SourceItem[]{
            new SourceItem("Item1", 4, 0),
            new SourceItem("Item2", 6, 0),
            new SourceItem("Item3", 3, 2),
            new SourceItem("Item1", 4, 0, 3),
            new SourceItem("Item4", 5, 3),
            new SourceItem("Item2", 6, 0, 4),
            new SourceItem("Item3", 3, 2, 8),
            new SourceItem("Item4", 5, 3, 10),
        });

    var source = items.ToEdgeStreamable(e => e.GetEvent(startTime), AdvanceTimeSettings.IncreasingStartTime);

    //AlterEventDuration works with edges ... it sets an end to the edge.
    //With AlterEventDuration, we get the payload (unlike ShiftEventTime).
    //We'll use the payload as our max allowed duration.
    var altered = source.AlterEventDuration(e => TimeSpan.FromMinutes(e.Payload.Value));

    //Now, let's get the ones "taking too long".
    //What we want are src items *with no match* in the altered stream.
    //This will give us edges that are still "live"
    //but whose max allowed duration has passed.
    var tooLong = from src in source.LeftAntiJoin(altered, (s, a) => s.Id == a.Id)
                  select src;

    //Output as the different event shapes.
    //Note the relationship between the event times and the CTIs.
    //Point is a good option.
    tooLong.ToPointEnumerable().Dump("Timed Out Items - Point");
    //Interval isn't a good option. Doesn't get output until the end.
    tooLong.ToIntervalEnumerable().Dump("Timed Out Items - Interval");
    //Edge is also a good option. You get the start and the end.
    tooLong.ToEdgeEnumerable().Dump("Timed Out Items - Edge");
}

// Define other methods and classes here
class SourceItem : DataItem
{
    public SourceItem(string id, int value, int startTimeOffset)
    {
        Id = id;
        Value = value;
        StartTimeOffset = startTimeOffset;
        EndTimeOffset = null;
    }

    public SourceItem(string id, int value, int startTimeOffset, int endTimeOffset)
    {
        Id = id;
        Value = value;
        StartTimeOffset = startTimeOffset;
        EndTimeOffset = endTimeOffset;
    }

    public EdgeEvent<DataItem> GetEvent(DateTimeOffset startTime)
    {
        if (!EndTimeOffset.HasValue)
        {
            return EdgeEvent<DataItem>.CreateStart(startTime.AddMinutes(StartTimeOffset),
                this);
        }
        return EdgeEvent<DataItem>.CreateEnd(startTime.AddMinutes(StartTimeOffset),
            startTime.AddMinutes(EndTimeOffset.Value),
            this);
    }

    public int StartTimeOffset;
    public int? EndTimeOffset;
}

class DataItem
{
    public string Id;
    public int Value;
}

I do love it when a query comes together …

How long did that edge event take–Reprise

StreamInsight | Code Sample
In my last post, I talked about how to get the duration of an edge event using subjects. The only challenge with that is that you don’t get the result until the end of the edge. While this does allow you to do very interesting things … for example, determine which events took more than 2 standard deviations longer than the average time … it doesn’t give you a notification as soon as an item starts to take too long. That’s because it can’t calculate the duration until the end of the edge. But it’s also useful to get a notification as soon as an edge goes over a specific event duration. This is actually easier to do than I initially thought; I must have been having a brain-fart moment. (They’re rare but they do happen.) So, this morning, I quickly whipped up a sample in LinqPad that does exactly this … as soon as an edge event goes over a pre-configured duration, you get a notification. Now, while you can’t do anything really cool like dynamically determining the timeout from the event payload, that’s not always necessary. So it’s another option, just with different limitations and uses.

LinqPad Sample

void Main()
{
    DateTimeOffset startTime = new DateTimeOffset(2013, 7, 1, 12, 0, 0, TimeSpan.FromHours(-6));
    SourceItem[] items = new SourceItem[]{
            new SourceItem("Item1", 5, 0),
            new SourceItem("Item2", 6, 0),
            new SourceItem("Item3", 8, 2),
            new SourceItem("Item1", 5, 0, 3),
            new SourceItem("Item4", 2, 3),
            new SourceItem("Item2", 6, 0, 4),
            new SourceItem("Item3", 8, 2, 8),
            new SourceItem("Item4", 2, 3, 10),
        };

    var source = items.ToEdgeStream(Application, e => e.GetEvent(startTime), AdvanceTimeSettings.IncreasingStartTime);

    //Shift the start time of the events to our timeout value.
    //In this case, our timeout is 5 minutes; any event that
    //has a duration longer than 5 minutes should be output to the sink.
    var shifted = source.ShiftEventTime(e => TimeSpan.FromMinutes(5));

    //Join the shifted items back to the source.
    //The join will be applied only when the shifted time
    //overlaps the original edge event.
    //If the edge event has an end before the shifted event
    //it won't be output to the sink as the join won't be applied.
    var tooLong = from src in source
                  from shft in shifted
                  where src.Id == shft.Id
                  select shft;

    //Output as the different event shapes.
    //Note the relationship between the event times and the CTIs.
    //Point is a good option.
    tooLong.ToPointEnumerable().Dump("Timed Out Items - Point");
    //Interval isn't a good option. Doesn't get output until the end.
    tooLong.ToIntervalEnumerable().Dump("Timed Out Items - Interval");
    //Edge is also a good option. You get the start and the end.
    tooLong.ToEdgeEnumerable().Dump("Timed Out Items - Edge");
}

// Define other methods and classes here
class SourceItem : DataItem
{
    public SourceItem(string id, int value, int startTimeOffset)
    {
        Id = id;
        Value = value;
        StartTimeOffset = startTimeOffset;
        EndTimeOffset = null;
    }

    public SourceItem(string id, int value, int startTimeOffset, int endTimeOffset)
    {
        Id = id;
        Value = value;
        StartTimeOffset = startTimeOffset;
        EndTimeOffset = endTimeOffset;
    }

    public EdgeEvent<DataItem> GetEvent(DateTimeOffset startTime)
    {
        if (!EndTimeOffset.HasValue)
        {
            return EdgeEvent<DataItem>.CreateStart(startTime.AddMinutes(StartTimeOffset),
                this);
        }
        return EdgeEvent<DataItem>.CreateEnd(startTime.AddMinutes(StartTimeOffset),
            startTime.AddMinutes(EndTimeOffset.Value),
            this);
    }

    public int StartTimeOffset;
    public int? EndTimeOffset;
}

class DataItem
{
    public string Id;
    public int Value;
}

When looking at the output, note where the events sit between the CTIs … that shows, without question, that the event is released as soon as the application time moves past the relevant timeout. You do need to use point events for this; if you use an interval, the final event won’t be output until the end is reached, which would defeat the purpose of what we’re trying to do. You could use edges as output; in this case, you’d get a start as soon as it “takes too long” and then an end when the end edge for the original item is enqueued into the stream. With this, you could then feed it into the subject from my previous post and determine how much longer it took than the timeout, should that be interesting to you.

How long did that edge event take?

Code Sample | StreamInsight
With StreamInsight, you have three different event shapes – point, interval and edge. With a point, the duration is easy; it’s a single tick. Intervals are also easy since you know both the start and end times when you enqueue them. But what about edge events? Edge events are different because you know the start time, but not the end time, when you enqueue them. As I’m working on my presentation for Sql Saturday Baton Rouge, it occurred to me that this would be a very interesting bit of information to know. However, with just queries, it was difficult to do. (I say difficult; that doesn’t mean that it can’t be done, I just couldn’t figure out how.) But … once again, subjects come to the rescue. Since a subject acts as a sink and a source, I can attach it to the edge stream, evaluate the events as they are published and, when an end comes along, calculate the duration and then publish that to another stream. Even if your inbound events aren’t edges, remember that you can jump between the different shapes by manipulating your event’s temporal headers easily enough. So here goes.

First, let’s declare the class and implement ISubject. I’ve kept it so this subject can be used with any payload class.

Class Declaration

public class EdgeEventDurationSubject<TPayload> : ISubject<EdgeEvent<TPayload>, PointEvent<EventDuration<TPayload>>>
{
    private List<IObserver<PointEvent<EventDuration<TPayload>>>> _observers =
        new List<IObserver<PointEvent<EventDuration<TPayload>>>>();
}

You’ll see that it publishes point events of type EventDuration<TPayload>. This generic event allows us to “hook up” to any stream and then publish the event duration with the payload. The query writer can then join this back to the original stream (if desired) or just continue on; after all, they have the entire payload.

EventDuration Class

public class EventDuration<TPayload>
{
    public double Duration { get; set; }

    public TPayload Payload { get; set; }
}

Note that we support a list of observers – so we can publish to multiple sinks. Next, let’s implement the IObservable side of the subject.

IObservable Implementation

public IDisposable Subscribe(IObserver<PointEvent<EventDuration<TPayload>>> observer)
{
    lock (this)
    {
        if (!_observers.Contains(observer))
        {
            _observers.Add(observer);
        }
        return Disposable.Create(() => RemoveObserver(observer));
    }
}

private void RemoveObserver(IObserver<PointEvent<EventDuration<TPayload>>> observer)
{
    lock (this)
    {
        if (_observers.Contains(observer))
        {
            _observers.Remove(observer);
        }
    }
}

Note that we are cheating a bit and using Disposable.Create from the Reactive Extensions. But it’s easy and it works well, so I don’t feel a bit bad about it. Now, for the interesting part – implementing the IObserver side of the subject. We know that we’ll be consuming edge events because of our implementation of ISubject and all of our work will be done in the OnNext method.

OnNext

public void OnNext(EdgeEvent<TPayload> value)
{
    if (_observers.Count == 0)
    {
        //No one is listening.
OnNext
public void OnNext(EdgeEvent<TPayload> value)
{
    if (_observers.Count == 0)
    {
        //No one is listening.
        return;
    }
    if (value.EventKind == EventKind.Cti)
    {
        PublishEvent(PointEvent<EventDuration<TPayload>>.CreateCti(value.StartTime.AddTicks(1)));
        return;
    }

    if (value.EdgeType == EdgeType.End)
    {
        var payload = value.Payload;
        var result = new EventDuration<TPayload>
        {
            Duration = (value.EndTime - value.StartTime).TotalMilliseconds,
            Payload = payload
        };
        PublishEvent(PointEvent<EventDuration<TPayload>>.CreateInsert(value.EndTime.AddTicks(-1), result));
    }
}

private void PublishEvent(PointEvent<EventDuration<TPayload>> newEvent)
{
    lock (this)
    {
        foreach (var observer in _observers)
        {
            observer.OnNext(newEvent);
        }
    }
}

We take the CTIs and simply republish them, adding a tick. This pretty much guarantees that our CTIs will come after any events that we publish. If it’s not a CTI, it’s an Insert, so we check the edge type. Only End edges have an end time and, besides, that’s the idea behind edges … you don’t know the end when you know the start. So we have to wait for the end to determine the duration. That’s easy enough … EndTime minus StartTime, get the total milliseconds (because TimeSpan isn’t valid in a StreamInsight stream) and publish the event to the observers. By setting the point’s start time to a tick less than the edge event’s end time, we make sure that we can join back to the original stream without shifting if we so desire. Using this in our queries is very simple.

Using the Subject
var timeEvalSubject = cepApplication.CreateSubject("TimeEval",
    () => new EdgeEventDurationSubject<WebRequestPayload>());

var requestExecutionTimeStream = timeEvalSubject.ToPointStreamable(e => e);

All you need to do now is make sure that the subject is bound to the other streamable bindings using With so they all run in the same process.
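To make that last step concrete, here’s a rough sketch that borrows the console sink and the ToBinding extension from the Dual Mode Data Sinks posts further down the page; webRequestEdgeStream is a hypothetical edge-shaped source for the web request events:

// Sketch: everything sharing the subject has to run in one process, so the
// bindings are combined with With() before Run().
var toSubject = webRequestEdgeStream.Bind(timeEvalSubject);   // edge stream -> subject (sink side)

var sinkConfig = new ConsoleOutputConfig
{
    ShowCti = true,
    CtiEventColor = ConsoleColor.Blue,
    InsertEventColor = ConsoleColor.Green
};
var toConsole = requestExecutionTimeStream.ToBinding(         // subject (source side) -> console sink
    cepApplication, typeof(ConsoleOutputFactory), sinkConfig, EventShape.Point);

toSubject.With(toConsole).Run("RequestTiming");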

Where does StreamInsight fit?

StreamInsight | Idle Babbling
I’ve been working with StreamInsight for over 2 years now and this is one of those questions that I get all the time. Over that time, I’ve refined where I see StreamInsight fitting into an enterprise architecture … and that has included significantly expanding the potential use cases. Typically, when people look at StreamInsight – or other CEP tools from other vendors – they think of monitoring sensors or financial markets. Of course, StreamInsight can do this, and it’s very good at it, but there’s a lot more that it can do and more value that the technology can provide to the enterprise. Based on the number of forum posts and the increasing variety of users posting on the forums, it seems that others are beginning to experiment in this area as well and that adoption is picking up. So, in the past couple of months, I’ve put a lot of thought into where StreamInsight fits outside of the traditional use cases and wanted to share that.

The Paradigm Shift

StreamInsight looks at and handles data in a fundamentally different way than we, as developers, are used to. This is something that everyone getting into StreamInsight struggles with, myself included. Traditionally, we look at data that is in some kind of durable store … whether that be a file, a traditional RDBMS or an OLAP cube. We look at what has happened and was recorded for posterity. It’s stable and static. Time, in traditional data, is an attribute – a field value – that describes the data we are looking at but is not an integral dimension of the data. Time doesn’t inherently impact how our joins work, how we calculate aggregates or how we select unless we use it in our WHERE clause. It’s not part of the SELECT or FROM clauses that actually define the shape and structure of the data set. It’s static, relative to the dataset, and references some time in the past, much like a history book’s timeline.

For StreamInsight, it’s very different. In StreamInsight, time is an integral dimension of the data, effectively part of the FROM clause that we are familiar with. You don’t specify it in any of your LINQ queries but it’s there, an invisible dimension that impacts and affects everything that you do. It’s also the thing that’s hardest for developers to get their heads around because it is so radically different. Many of the query-focused questions on the forums deal with trying to understand how all of this temporality works and how timelines, CTIs and temporal headers interact with events and queries. The things this allows you to do are difficult in traditional systems. Certainly, they can be done, but not without a TON of code that navigates back and forth, keeping track of time attributes and processing in a loop. Even WINDOW functions don’t come close (I’ve been asked this) and, while they may provide some capabilities for things like running averages, something like “calculate the 30-minute rolling average every 5 seconds” – which is very easy in StreamInsight – is pretty difficult to accomplish. A native, inherent understanding of the order of events (previous vs. current vs. next), or of holes in the data, is also difficult – that’s back to cursoring and ORDER BY clauses with a whole lot of looping in the mix. Yet, with StreamInsight’s temporal characteristics, these things are relatively simple to do.
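To show just how easy, here’s a minimal sketch of that rolling average – the sourceStream and the SensorReading payload are hypothetical:

// Sketch: 30-minute rolling average of a sensor value, recalculated every 5 seconds.
// "sourceStream" (an IQStreamable<SensorReading>) and its payload are hypothetical.
var rollingAverage =
    from win in sourceStream.HoppingWindow(
        TimeSpan.FromMinutes(30),     // window size
        TimeSpan.FromSeconds(5))      // hop: how often a new result is produced
    select new { AverageValue = win.Avg(e => e.Value) };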
More sophisticated things, like deadbands and rate of change, are even more difficult with traditional data stores but absolutely doable in StreamInsight with extension points like user-defined operators and aggregates.

One comparison I like to make is to talk about driving. The traditional data paradigm would have you driving with a digital camera, taking a picture every x amount of time and then using the display of the picture to navigate and drive. Could you actually do this? Maybe. Probably, if you were good and careful, your camera was fast enough, traffic wasn’t heavy and people actually drove intelligently. But you’d miss a whole lot of things that happen in between snapshots, you’d have a more difficult time understanding where things are going and there’d be a latency in your reaction time. StreamInsight, however, is more similar to how we actually drive and take in our surroundings … our senses provide our brains with a continuous stream of information in a temporal context. We are constantly evaluating the road, other vehicles, their relationship to our current position and where we are going. StreamInsight does similar things with data, though not quite as efficiently as we do without even thinking about it. Our brains are, essentially, a massively parallel CEP system on steroids.

Beyond that, StreamInsight’s understanding of time isn’t necessarily tied to the system clock, another thing that took me some time to get my head wrapped around. Instead, the clock is controlled and moved forward by the application, independent of the system clock. This allows use cases where you can use the temporal capabilities to analyze stored data – essentially replaying the dataset on super-fast-forward. An example of this was a POC that we did for a customer. They had a set of recorded sensor data, with readings for 175 sensors every 5 minutes, that represented about 3 months of data before and shortly after an equipment failure event. They gave us the data and some information about the equipment involved and asked us to find patterns that were predictive of an impending failure. Analyzing the dataset using traditional SQL queries got us nowhere … but when we started doing some (relatively) basic analysis by running the dataset through StreamInsight, several of the patterns quickly became apparent. In doing this, we used the original timestamps but enqueued the data every 50ms – so 50ms of real-world time equaled 5 minutes of application time. At that rate, the entire dataset could be compressed down to less than a half hour of processing. Now, if we had more powerful laptops than the dual-core i7’s with 8 GB of RAM that we were using at the time, we could have done it even faster. Our real limitation wound up being the disk – we were reading the data from a locally installed Sql Server instance and writing to local CSV files to look at the results in Excel. In the end, we were able to determine that, by looking at 2 different values and their relative rates-of-change and variability over a specific time period, we could eliminate false alerts for things like equipment shutdown and pick up the impending equipment failure about a month before it actually happened. If we had a better understanding of the physics and engineering involved, we could probably have increased the warning time – but that wasn’t too bad for a couple of developers without the full specs of the equipment, with very little (or no) engineering background, basically shooting in the dark.
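To illustrate the extensibility point mentioned above: a user-defined aggregate is just a class derived from CepAggregate plus a marker extension method that the LINQ provider recognizes. This is my own minimal sketch – the SensorReading payload and the max-minus-min “variability” measure are stand-ins, not the actual POC logic:

using System.Collections.Generic;
using System.Linq;
using Microsoft.ComplexEventProcessing.Extensibility;
using Microsoft.ComplexEventProcessing.Linq;

// Hypothetical payload for the sketch.
public class SensorReading
{
    public string SensorId;
    public double Value;
}

// User-defined aggregate: a crude "variability" measure (max minus min) over a window.
public class VariabilityAggregate : CepAggregate<SensorReading, double>
{
    public override double GenerateOutput(IEnumerable<SensorReading> payloads)
    {
        var values = payloads.Select(p => p.Value).ToList();
        return values.Count == 0 ? 0d : values.Max() - values.Min();
    }
}

// Extension method that surfaces the aggregate in LINQ queries; the body is never
// executed - the engine replaces the call with the aggregate class above.
public static class SensorAggregates
{
    [CepUserDefinedAggregate(typeof(VariabilityAggregate))]
    public static double Variability(this CepWindow<SensorReading> window)
    {
        throw CepUtility.DoNotCall();
    }
}

In a query, it’s then called like any built-in aggregate – win.Variability() inside a windowed select.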
In tests, we’ve pushed about 30,000 events per second – randomly generated and without any analytics – through StreamInsight on our laptops, and over 100,000 events/second (with analytics and a remote Sql Server data source) on a commodity server-class machine (dual quad-core XEON with 24 GB RAM) at an average of 35% CPU utilization.

The Three V’s – Big Data

“Big Data” is a very hot topic these days. Everyone’s all excited about the new capabilities provided by technologies like Hadoop/MapReduce and Massively Parallel Processing (MPP), and with good reason. These are ground-breaking technologies that allow us to more effectively get information from large amounts of data. But these are still technologies in the traditional paradigm of data – capture, store, retrieve and process. There is a latency involved with this that simply can’t be overcome due to the store/retrieve part of the cycle. No matter how fast the capture and process steps are, the disk is the bottleneck of the system. While SSDs reduce this latency, they can only do so much and are still the slowest part of the entire system. When talking about Big Data, the “three V’s” often come up: Velocity, Volume and Variety. Hadoop and MPP deal – very well – with the massive volumes of data, and Hadoop adds capabilities around variety. But they have trouble – because of the paradigm – with velocity, the frequency with which data is generated and captured. And, let’s face it, velocity is a critical piece these days. Ten years ago, we talked about “moving at Internet speed” and the agility that the fast pace of change required businesses to have. Today, what we used to call “Internet speed” seems a snail’s pace. We’ve even coined new terms to describe it; “going viral” comes immediately to mind. I’ve come to call it “moving at Twitterspeed”, and enterprises need to become even more agile to keep up, especially when it comes to marketing. The impact of social media – particularly Facebook and Twitter – has really driven this fundamental change in the market, and companies have, more than once, found themselves completely blindsided by viral explosions across Facebook and Twitter. This velocity, coupled with an understanding of increasing or decreasing velocity, combined with the sheer volume, is becoming a critical business capability that companies are struggling with and, in some cases, failing at spectacularly. With StreamInsight, handling the velocity of “Twitterspeed” and understanding how things are trending is absolutely doable. Imagine a corporate marketing department being able to hook into Twitter and other social media streams, analyzing for specific keywords (or hashtags) and highlighting increasing (or decreasing) trends in these keywords … as they are happening. Within minutes, they can then begin to get on top of trends as they are just beginning to “go viral” and formulate an intelligent, coherent response while there’s still time to get ahead of it. It used to be that these trends weren’t readily apparent for days or weeks, but now it’s down to hours or minutes when things are moving at Twitterspeed. Now, add in geo-location analytics and customers can begin to understand not only what is going on, but where. From here, we can get into more effective and meaningful targeting of marketing messages that may have relevance in one area but not another.
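Just to illustrate the shape of such a query (my sketch; the Tweet payload, field names and window sizes are all made up):

// Sketch: mentions per hashtag over a 15-minute window, refreshed every minute.
// The Tweet payload and "tweetStream" are made up for illustration.
var hashtagCounts =
    from t in tweetStream
    group t by t.Hashtag into perTag
    from win in perTag.HoppingWindow(TimeSpan.FromMinutes(15), TimeSpan.FromMinutes(1))
    select new { Hashtag = perTag.Key, Mentions = win.Count() };
// Turning raw counts into a trend signal means comparing each window to the previous
// one - for example, by shifting the count stream and joining it back to itself.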
Outside of social media, we also have the increasing interest in the “Internet of Things” – smart devices that capture and report data. These have the potential to take both volume and velocity to a whole new level that makes even “Twitterspeed” look sluggish. Even now, as the IoT is in its earliest stages, there are billions of devices participating, ranging from the smart phones that we carry with us everywhere to RFID, smart meters, smart roads and other device sensors to shoes and wristbands and everything in between. We are just entering an age of truly ubiquitous computing and connectivity, allowing us to capture data from a broad range of sources, both traditional and non-traditional. In many of these cases, even if the velocity isn’t fast, the volume is simply mind-boggling. With an estimated 8 billion or so connected devices today, volumes get very big, very fast, even if the individual values aren’t changing rapidly. And the number of these devices is increasing exponentially, with a projected 50 billion devices by 2020.

StreamInsight is designed to handle both volume and velocity. Because it doesn’t require storage of data but, instead, does its analytics in memory, it’s bound by CPU and memory speeds, not by disk. As a result, it can handle data velocity and volume that would simply overwhelm disk-oriented systems. This is especially the case when data is required to be continuously updated and analyzed … to do this with traditional technologies you have to poll the data store, and you’ll have to be really careful doing that because you’ll very quickly overwhelm the system. But because StreamInsight pushes data all the way through, polling – and the latency and scalability issues associated with it – isn’t a significant problem (unless that’s how you get your source data, but that’s a completely different issue). You will, however, want to downsample the data before you send it to a sink/output adapter, and in the vast majority of cases this is actually desirable. Storing every piece of data from the Internet of Things is, quite simply, cost-prohibitive from a storage perspective.

That brings us to the third “V” – Variety. This is a mixed story with StreamInsight. Individual input sources must have a strongly-typed schema; this is your payload. That limits what you can do with the unstructured data that is becoming more prevalent these days. That said, StreamInsight is very good at bringing together multiple (strongly-typed) sources, synchronizing them within the application timeline and then performing analytics across all of them within a temporal context. Take, for example, real-time web server analytics (I’m doing a presentation on this at Sql Saturday Baton Rouge, by the way). On one hand, you have performance counters – we’ve done a good job with these and there are tools aplenty to monitor them. But how do they relate to the executing pages? What pages, with what parameters, are executing when the CPU spikes? Are there specific pages that take too long to execute and wind up causing our requests to queue? This requires not only perfmon counters but also some hooks into the ASP.NET pipeline. From there, StreamInsight can take these two very different (and differently structured) data sources and merge them together, synchronizing them in time. BUT … the individual data sources are still highly structured.
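A sketch of what that merge might look like as a query – the CpuSample and WebRequest payloads here are made-up stand-ins for the perfmon and ASP.NET pipeline sources, not the actual presentation code:

// Sketch: which requests were executing while the CPU was above 80%?
// CpuSample, WebRequest and both streams are hypothetical stand-ins.
var busyCpu =
    (from c in cpuCounterStream
     where c.PercentProcessorTime > 80
     select c)
    .AlterEventDuration(e => TimeSpan.FromSeconds(1));  // stretch each sample over its sampling interval

var requestsUnderLoad =
    from r in requestStream    // one interval event per executing request
    from c in busyCpu          // temporal cross join: only overlapping lifetimes match
    select new { r.Url, r.DurationMs, c.PercentProcessorTime };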
Bringing it Together

Don’t take any of this to mean that I’m discounting traditional data paradigms. They are … and always will be … very important. They provide a view into the past that CEP technologies (like StreamInsight) just can’t provide – and really aren’t the right tools for anyway. And these traditional paradigms, with their historical information, can and should be used as “reference data” (or metadata) that further informs real-time analytics. It’s the old axiom … to understand the present, you also need to understand the past and how it relates to the present. So it’s not an either-or discussion but a question of how these technologies fit into the continuum of data and analytics. There’s a lot of focus on Big Data from the traditional paradigm, but there’s also a significant amount of value to be found in the data that’s on its way to storage, at the capture point of the process. Downsampling at this stage can also optimize storage costs and overall read performance from Big Data stores, as well as providing analytics in near-real-time. StreamInsight expands our capabilities for business intelligence and shortens the timeframe for getting actionable information from the volume of rapidly changing data that is becoming increasingly important – even critical – to businesses faced with things moving at Twitterspeed.

Dual Mode Data Sinks–Part II

StreamInsight | Code Sample
Well, it’s been a while. Far too long, in fact. I’ve been on a project that’s been pretty intense and has consumed much of my time. So … my apologies for taking so long to continue. With that out of the way, let’s pick up with our data sinks. There were two major things left undone in the last post. First, the API. I left it without an abstracted, cleaner API that was similar to the adapter model. That’s the first thing to add. Now, we don’t want to go “all the way” with this and produce a runnable process – that would defeat one of the coolest features of a process: the ability to contain multiple sources and sinks in one clean block that starts and stops as a whole. This also allows us to send individual queries to multiple sinks (without DQC or subjects) and makes reuse of the sources across multiple sinks simpler than in the adapter model with DQC. And, to keep it consistent with the adapter model, we want to attach our sink to the stream directly from the IQStreamable interface – and that means an extension method. (I’ll confess that I have come to deeply love extension methods, by the way … I really like the clean API that they create.) Our extension method will encapsulate creating the observer and then binding the stream to the observer. So that we can include multiple bindings within a process, we’ll just return the streamable binding that we create. Our extension method looks like this:

ToBinding Extension Method
public static IRemoteStreamableBinding ToBinding<TPayload>(
    this IQStreamable<TPayload> stream,
    Application cepApplication,
    Type consumerFactoryType,
    object configInfo,
    EventShape eventShape)
{
    var factory = Activator.CreateInstance(consumerFactoryType) as ISinkFactory;

    if (factory == null)
    {
        throw new ArgumentException("Factory cannot be created or does not implement ISinkFactory");
    }

    switch (eventShape)
    {
        case EventShape.Interval:
            var intervalObserver = cepApplication.DefineObserver(
                () => factory.CreateIntervalObserverSink<TPayload>(configInfo));
            return stream.Bind(intervalObserver);

        case EventShape.Edge:
            var edgeObserver = cepApplication.DefineObserver(
                () => factory.CreateEdgeObserverSink<TPayload>(configInfo));
            return stream.Bind(edgeObserver);

        case EventShape.Point:
            var pointObserver = cepApplication.DefineObserver(
                () => factory.CreatePointObserverSink<TPayload>(configInfo));
            return stream.Bind(pointObserver);

        default:
            throw new ArgumentOutOfRangeException("eventShape");
    }
}

We won’t call it ToQuery(); doing so, while consistent, wouldn’t be descriptive at all. A binding that then gets run in a process is an inherently different thing from a query. With the adapter model, a query is a single runnable unit of logic; a binding is not. A binding can only run within the context of a process – it is the process, not the binding, that is the closest corollary to the query. But the naming and the arguments are similar enough.
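For reference, ISinkFactory comes from Part I; reconstructed from how ToBinding uses it above, it presumably looks something like this (an assumption on my part, not the actual definition):

// Assumed shape of ISinkFactory, inferred from the calls in ToBinding above;
// the actual definition is in Part I of this series.
public interface ISinkFactory
{
    IObserver<PointEvent<TPayload>> CreatePointObserverSink<TPayload>(object configInfo);
    IObserver<IntervalEvent<TPayload>> CreateIntervalObserverSink<TPayload>(object configInfo);
    IObserver<EdgeEvent<TPayload>> CreateEdgeObserverSink<TPayload>(object configInfo);
}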
Now, when we want to bind a stream to a sink, it looks like this:

Binding a Stream to a Sink
var sinkConfig = new ConsoleOutputConfig()
{
    ShowCti = true,
    CtiEventColor = ConsoleColor.Blue,
    InsertEventColor = ConsoleColor.Green
};

var binding = data.ToBinding(cepApplication, typeof(ConsoleOutputFactory), sinkConfig, EventShape.Point);
binding.Run("Hello");

You’ll also notice, if you haven’t noticed before, that this API forces you to use the entire event, not just the payload, when sending to the sink. While you can send just the payload to the sink and ignore the event itself, that seems quite pointless and can also cause quite a bit of confusion. I’ve seen a lot of folks put timestamps in the payload; I typically don’t. In the vast majority of cases, I rely on – and use – the timestamps on the event itself. These are the timestamps that actually matter; these are the timestamps that the StreamInsight engine is using when evaluating the event. These are the timestamps of the event within the application timeline. Anything in the payload is just an attribute of the event; the start time and end time are a key part of its definition. Also, if you only send the payload to your sink, you won’t get any CTIs at all. CTIs are kinda important, I think, for a sink for a couple of reasons. First, even if you have no events coming through but you have CTIs, you know that the engine is running and pumping data through. This is particularly useful when you aren’t getting any data at all in your sink. Without the CTIs, you won’t have a good idea whether your query is actually getting processed through the engine or whether you have some other error in your logic. Second … CTIs let you know when you can/should write to a durable store in a batch. Here’s the thing – Insert events don’t get released to your sink until there is a CTI. That CTI also tells you that all of the events up to that point in the application time have been released to your sink. So that’s the perfect time to batch up any writes that you have to a durable store (say, Sql Server) and write them in one shot. Batched updates/inserts are going to scale far better than single writes, and that’s always a very good thing in a StreamInsight application. You always need to remember that, in many cases, the sink is going to be the biggest potential bottleneck in the application because it usually involves some sort of I/O. And that I/O is always going to be slower than the raw CPU and memory-bound performance that you can get from your queries.
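Here’s a minimal sketch of that batch-on-CTI pattern – the BatchingSink class and its WriteBatch placeholder are mine, not part of the sample solution:

using System;
using System.Collections.Generic;
using Microsoft.ComplexEventProcessing;

// Sketch: a point-event sink that buffers Insert payloads and flushes them to a
// durable store whenever a CTI arrives. WriteBatch() is a placeholder.
public class BatchingSink<TPayload> : IObserver<PointEvent<TPayload>>
{
    private readonly List<TPayload> _buffer = new List<TPayload>();

    public void OnNext(PointEvent<TPayload> evt)
    {
        if (evt.EventKind == EventKind.Insert)
        {
            _buffer.Add(evt.Payload);
        }
        else // CTI: everything up to this application time has been released, so flush.
        {
            if (_buffer.Count > 0)
            {
                WriteBatch(_buffer);   // e.g., a single batched INSERT to Sql Server
                _buffer.Clear();
            }
        }
    }

    public void OnCompleted()
    {
        if (_buffer.Count > 0) { WriteBatch(_buffer); _buffer.Clear(); }
    }

    public void OnError(Exception error) { /* log and/or rethrow as appropriate */ }

    private void WriteBatch(IEnumerable<TPayload> items)
    {
        // Placeholder for the actual durable-store write (bulk copy, batched command, etc.).
    }
}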
The next thing on our list is a bit tougher. I mentioned it in my previous post and it’s the one thing that I really didn’t like about the entire reactive model when I first saw it. You see, I’m a big fan of untyped adapters, especially output adapters. Yes, they are pretty useful on the input side as well, but on the output side they are absolutely essential. You can’t always know what your payload is going to look like and you don’t want to be writing a sink for every different payload that you dream up. For input … in a lot of cases, you can do OK since you’ll have a good idea of what the schema is going to look like. This is something that also came up recently on the StreamInsight forum, so I know I’m not the only one that misses it. Fortunately, the .NET Framework gives us a way to make this happen. It’s more work than a simple untyped adapter but, with reflection, we can get the same kind of flexibility that we had with untyped adapters in our sinks.

What we want to do is to have, in the end, something similar to what an output adapter provides for us – name/value pairs where nested classes have a [ParentProperty].[ChildProperty]-style name. While we’re at it, we’ll simplify it a bit; it’ll be a simple, straightforward dictionary with the name and the value, rather than having a separate event definition and then matching the index with the name, as in an untyped adapter. At the highest level, we’ll have a method called GetPropertyValues that takes the object and handles all of the details for us. Of course, it’ll be an extension method.

GetPropertyValues
/// <summary>
/// Returns the values of properties as name/value pairs.
/// </summary>
/// <param name="source">The object to read properties from</param>
/// <returns></returns>
public static Dictionary<string, object> GetPropertyValues(this object source)
{
    var propertyDictionary = new Dictionary<string, object>();
    //Get all of the properties.
    AppendPropertyValues(source, string.Empty, propertyDictionary);
    return propertyDictionary;
}

The real work is in the AppendPropertyValues method. This takes the dictionary, the object and a property name prefix and appends the properties and their values to the dictionary. To do this, first we get a list of the public instance properties on the object. From there, we loop over them. If one of them is a class, we recurse into AppendPropertyValues, adding the source property name as the prefix. After that, we also get the fields with the same binding flags; GetProperties won’t return the fields and there isn’t a single reflection method that I could find that would give me both in a single call.

AppendPropertyValues
private static void AppendPropertyValues(object source, string prefix, Dictionary<string, object> propertyDictionary)
{
    // _validProp is presumably the set of types treated as simple values (the primitives,
    // string, DateTime and the like); it's defined elsewhere in the sample solution.
    var properties = source.GetType().GetProperties(BindingFlags.Public | BindingFlags.Instance);
    foreach (var propertyInfo in properties)
    {
        if (_validProp.Contains(propertyInfo.PropertyType))
        {
            var method = propertyInfo.GetGetMethod();
            object value = method.Invoke(source, null);
            propertyDictionary.Add(prefix + propertyInfo.Name, value);
        }
        else if (propertyInfo.PropertyType.IsClass)
        {
            var method = propertyInfo.GetGetMethod();
            object value = method.Invoke(source, null);
            AppendPropertyValues(value, prefix + propertyInfo.Name + ".", propertyDictionary);
        }
    }
    var fields = source.GetType().GetFields(BindingFlags.Public | BindingFlags.Instance);
    foreach (var fieldInfo in fields)
    {
        if (_validProp.Contains(fieldInfo.FieldType))
        {
            object value = fieldInfo.GetValue(source);
            propertyDictionary.Add(prefix + fieldInfo.Name, value);
        }
        else if (fieldInfo.FieldType.IsClass)
        {
            var value = fieldInfo.GetValue(source);
            AppendPropertyValues(value, prefix + fieldInfo.Name + ".", propertyDictionary);
        }
    }
}

With this now in place, we have the same capabilities in our sinks that we had in our output adapters. Using this, we can change our ConsoleDataConsumer to display the payload properties, rather than just some basic header info.
Code Snippet
if (outputEvent.EventKind == EventKind.Insert)
{
    Console.ForegroundColor = Configuration.InsertEventColor;

    var eventValues = outputEvent.Payload.GetPropertyValues();
    StringBuilder output = new StringBuilder(2048);
    foreach (var eventValue in eventValues)
    {
        output.Append(eventValue.Key).Append(":").Append(eventValue.Value.ToString()).Append("\t");
    }
    Console.WriteLine("Insert Event Received at " + outputEvent.StartTime + "\t" + output.ToString());
}
else if (Configuration.ShowCti)
{
    Console.ForegroundColor = Configuration.CtiEventColor;
    Console.WriteLine("CTI event received at " + outputEvent.StartTime);
}
Console.ResetColor();

Now … we could certainly do some optimizations on this. For example, we could cache the property names and definitions for each of the types. I’ll leave that as an exercise for later … or for you, dear blog reader. You can download the solution from my SkyDrive. As you look, you’ll also see that I’ve done some refactoring on the project names and namespaces …