Ruminations of J.net – idle rants and ramblings of a code monkey

Real-Time Web Analytics Presentation and Demo from HDNUG

Code Sample | StreamInsight
Here’s all of the materials from the presentation that I did at the Houston .NET User’s Group on March 13. Some of the things that I updated from the version for Baton Rouge Sql Saturday include:

Lock around active requests HashSet: Added a lock block around adding and removing items from the _activeRequest HashSet. Since the HashSet isn’t thread-safe and our source is very highly multi-threaded, we need to make sure that operations that modify the internal array are thread-safe. This eliminated some random IndexOutOfRangeExceptions in the source that would halt the StreamInsight process.

Checks in StandardDeviation UDA: Added checks for the number of values as well as the result. If the set for the standard deviation has fewer than two values, the standard deviation is always 0. Also, after the calculation, there’s an additional check on the result to make sure it’s not NaN. This eliminated some random exceptions in the queries that were calculating the standard deviation that would halt the StreamInsight process.

Both cases highlight the need to make sure that the custom code running in StreamInsight is tight and solid. They were pretty difficult to track down as well … both would happen randomly. IntelliTrace was absolutely essential to identifying and resolving the issues. After fixing them, I was able to run for hours without a problem. (A sketch of both guards is at the end of this post.)

Notes on reproducing the demo:

I’m not including the site that I used when running the demo. You can get it from the nopCommerce site on CodePlex; I used the out-of-the-box site with sample data. Keep in mind that the module used for the demo forces some of the requests to take 3 seconds – these are our “Bad Actors” – so it’s in no way representative of the performance of nopCommerce. You’ll need to set it up on a local site on port 81 if you want to use the Visual Studio load tests. From there, you need to copy the WebSiteMonitor.Contracts and WebSiteMonitor.Module assemblies into the \bin folder of the site and add the following to the web.config.

Under system.webServer, add the module:

Code Snippet
<modules runAllManagedModulesForAllRequests="true">
  <add name="MonitorModule" type="WebSiteMonitor.Module.WebSiteMonitorHttpModule"/>
</modules>

Under system.serviceModel, add the WCF configuration:

Code Snippet
<system.serviceModel>
  <serviceHostingEnvironment aspNetCompatibilityEnabled="true" multipleSiteBindingsEnabled="true" />
  <bindings>
    <netTcpBinding>
      <binding name="streamedNetTcpBinding" transferMode="Streamed" />
    </netTcpBinding>
  </bindings>
  <client>
    <endpoint address="net.tcp://localhost/eventService" binding="netTcpBinding"
              bindingConfiguration="streamedNetTcpBinding" contract="WebSiteMonitor.Contracts.IRequestService"
              name="Client" />
  </client>
</system.serviceModel>

You may (probably will) need to specify the StreamInsight 2.1 instance name that you are using. This is in the app.config for the WebSiteMonitor.WinForms project under “applicationSettings”; the setting name is “StreamInsightInstance” (very creative, I know).

You’ll want to run the client app “As Administrator” or reserve the URLs for non-administrator users and accounts. If you are running from Visual Studio, run Visual Studio as Administrator. I tend to run as Administrator when testing and running the demo; in the real world, you’d reserve the URLs.

The TestWebSite project in the solution is a “New Web Site” template from Visual Studio and helps make sure that everything is set up properly. It also has the configuration settings.
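As a footnote on those two fixes: here’s a minimal sketch of both guards. The member names and the aggregate shape are assumptions for illustration – the actual code is in the download above.

Code Snippet
private readonly HashSet<Guid> _activeRequest = new HashSet<Guid>();
private readonly object _activeRequestLock = new object();

//HashSet<T> isn't thread-safe, so serialize every mutation behind a single lock.
private void AddRequest(Guid requestId)
{
    lock (_activeRequestLock)
    {
        _activeRequest.Add(requestId);
    }
}

private void RemoveRequest(Guid requestId)
{
    lock (_activeRequestLock)
    {
        _activeRequest.Remove(requestId);
    }
}

//In the StandardDeviation UDA: guard both the input count and the result.
public class StandardDeviation : CepAggregate<double, double>
{
    public override double GenerateOutput(IEnumerable<double> eventData)
    {
        var values = eventData.ToList();
        if (values.Count < 2)
        {
            //Fewer than two values: the standard deviation is 0 by definition.
            return 0.0;
        }
        double average = values.Average();
        double sumOfSquares = values.Sum(v => (v - average) * (v - average));
        double result = Math.Sqrt(sumOfSquares / (values.Count - 1));
        //Guard against NaN so a bad window can't halt the StreamInsight process.
        return double.IsNaN(result) ? 0.0 : result;
    }
}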

Decoupling Queries from Input and Output

Code Sample | StreamInsight
There’s something that really disturbs me about how all of the StreamInsight samples are put together – everything is very, very tightly coupled. Streams are tightly bound to the source and the target of the data and there’s no good guidance on how to break all that stuff apart. I understand why – it certainly simplifies things – but that’s Bad Mojo™ and not how we want to build enterprise applications.

Let’s take a simple scenario. Let’s say that you are developing queries to detect key events for, say, an offshore well. You have a good idea what these events look like – what follows what – and you even have some recordings that you can play back at high speed to test your algorithms. All of your query code is built much like the demos are, so you’re pretty tightly bound to the source (the database) and your output (the console, just for giggles). Now … how do you hook this up to your production sources? And you certainly won’t be using a console output for the events that you detect in production. So do you go in, real quick-like, and change all of your code before compiling it for production? If so … let me know how that goes for you. And don’t call me when things start falling apart while you are doing last-minute compiles and emergency deployments and the like.

This, however, is how all of the samples are written. Granted … they are samples and aren’t meant to be “ready for the real world”. But, sadly, there’s precious little guidance out there on a better way to do this in StreamInsight. With that said, this isn’t a new problem in software development. Oh, sure, the technology may be new and/or different, but the pattern of the problem certainly isn’t. What we need to do is abstract the specification of the consumer or producer from its usage when creating the streams. Fortunately, we’ve already defined a pretty well-abstracted factory pattern for constructing our producers and consumers, even in the context of Reactive-based streams, so this helps (and was part of the method to my madness!).

In addition to abstracting the producers and consumers, we also need a pattern for reuse of query logic. Take this scenario as an example: we’re monitoring offshore platforms in real time. We have a set of queries that we use to process the sensor readings from the platforms and this logic should be applied to each of the platforms – it detects the key events that we’re concerned with. The only difference between each set of queries is the source of the data (different connections), the metadata associated with the queries and, possibly, the outputs for detected events (though, using subjects, we can aggregate the output events for a single output stream).

Enter the Stream Builder

This becomes our unit of query reuse. Using a defined interface and classes implementing that interface, we can encapsulate the query logic into a reusable “chunk” that gets built together. We can use this to start and stop the queries/process (depending on which model we are using) as well as to provide configuration for different parameters and for the producers and consumers. Let’s start with the base class for configuration.
StreamBuilderConfiguration
public class StreamBuilderConfiguration : IConfiguration
{
    private readonly Dictionary<string, EventComponentDefinition> _eventProducers =
        new Dictionary<string, EventComponentDefinition>();

    public IDictionary<string, EventComponentDefinition> EventProducers
    {
        get { return _eventProducers; }
    }

    private readonly Dictionary<string, EventComponentDefinition> _eventConsumers =
        new Dictionary<string, EventComponentDefinition>();

    public IDictionary<string, EventComponentDefinition> EventConsumers
    {
        get { return _eventConsumers; }
    }

    public string Name { get; set; }

    public virtual EventComponentDefinition DefaultConsumer
    {
        get
        {
            return new EventComponentDefinition()
            {
                ComponentType = typeof (NullDataConsumerConfig),
                Configuration = new NullDataConsumerConfig()
            };
        }
    }
}

First, you’ll notice that we define dictionaries of EventComponentDefinition. What is this? Well, keep in mind that we need a factory and a configuration to create our producers and consumers. So … that is what the EventComponentDefinition class encapsulates.

EventComponentDefinition
public class EventComponentDefinition
{
    public object Configuration { get; set; }
    public Type ComponentType { get; set; }
}

In this case, the type for “ComponentType” is the producer/consumer factory class. So … now we have a way to abstract the definition of the consumers and producers (in the configuration) as well as a way to find them. In case you haven’t guessed yet, this provides an inversion of control that uses a dictionary lookup to locate the appropriate service. Now, the producer and/or consumer must still handle the type of the payload for the stream, and we don’t have anything (yet) that checks and/or ensures that this is actually correct and compatible, but we now have a contract for specifying these items and a contract for creation.
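To make the lookup concrete, here’s roughly what resolving a named definition looks like at runtime. This is just a sketch – ISinkFactory and its Create…Sink methods show up later in this series, and “configuration” here stands for any StreamBuilderConfiguration instance, so treat the exact signatures as assumptions.

Code Snippet
//Look up the named consumer definition ... this is the IoC piece.
EventComponentDefinition definition = configuration.EventConsumers["Output"];

//The ComponentType is the factory, not the sink itself ...
var factory = (ISinkFactory) Activator.CreateInstance(definition.ComponentType);

//... and the factory gets the configuration object it was registered with.
var sink = factory.CreatePointObserverSink<TestDataEvent>(definition.Configuration);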
Finally, so that we have an interface that we can bind to without having to worry about generics, we’ll extract the Start and Stop methods into an interface. It’s all nicely abstracted now and ready for us to create our stream builder.

StreamBuilder
public interface IStreamBuilder
{
    void Start(Microsoft.ComplexEventProcessing.Application cepApplication);

    void Stop();
}

public abstract class StreamBuilder<TConfiguration> : IStreamBuilder
    where TConfiguration : StreamBuilderConfiguration
{
    protected StreamBuilder(TConfiguration configuration)
    {
        this.Configuration = configuration;
    }

    public TConfiguration Configuration { get; private set; }

    protected Application CurrentApplication { get; set; }

    public abstract void Start(Microsoft.ComplexEventProcessing.Application cepApplication);

    public abstract void Stop();

    protected EventComponentDefinition GetConsumer(string name)
    {
        if (Configuration.EventConsumers.ContainsKey(name))
        {
            return Configuration.EventConsumers[name];
        }
        if (Configuration.DefaultConsumer != null)
        {
            return Configuration.DefaultConsumer;
        }
        throw new InvalidOperationException(string.Format(ExceptionMessages.CONSUMER_NOT_FOUND, name));
    }

    protected EventComponentDefinition GetProducer(string name)
    {
        if (Configuration.EventProducers.ContainsKey(name))
        {
            return Configuration.EventProducers[name];
        }

        throw new InvalidOperationException(string.Format(ExceptionMessages.PRODUCER_NOT_FOUND, name));
    }
}

We also have nice “helper” methods to get the producers and the consumers. Since a consumer isn’t really required, we also have a default consumer – the null consumer – in case it’s not specified. However, the producer is absolutely required, so if it’s not found (not provided), we throw an exception. And since the interface has both a Start and a Stop, we can use this layer of abstraction to manage our running queries in a consistent way, regardless of the underlying StreamInsight API that we are using for the queries. From here, we’ll create more specific implementations for the Reactive and the Query models.

ProcessBuilder Implementation

With the Reactive model introduced in StreamInsight 2.1, the process is the unit of execution. Related queries are started, executed and stopped as a single unit. You can have multiple bindings in each process for both input and output and they all start and stop together. So … it makes sense that our ProcessBuilder will build a single process, with all of the related streams bound together in that process. We’ll also abstract the code that we need to write for every source stream and for binding every output stream. The trick for handling the bindings is simple … have them added to a list that gets built up and then, when it’s time, bind them all together using With and run them in a single process. Of course, abstracting the bindings also allows us to do this pretty easily. Overriding the StreamBuilder’s Start method allows us to wrap up all of the necessary housekeeping to get started as well as to bind all of the streams together and run them in a single process. We’ll also define a “CreateStreams” method (as abstract) … our concrete implementations will override this method to do the work of creating the streams.
ProcessBuilder
public abstract class ProcessBuilder<TConfiguration> : StreamBuilder<TConfiguration>
    where TConfiguration : StreamBuilderConfiguration
{
    protected ProcessBuilder(TConfiguration configuration) : base(configuration)
    {
    }

    private List<IRemoteBinding> _bindings;
    private IDisposable _runningBindings;

    public override void Start(Application cepApplication)
    {
        _bindings = new List<IRemoteBinding>();
        CurrentApplication = cepApplication;
        CreateStreams();

        var bindingList = _bindings.Skip(1).Aggregate(_bindings.First(),
            (current, source) => current.With(source));
        _runningBindings = bindingList.Run(Configuration.Name);
    }

    public override void Stop()
    {
        if (_runningBindings != null)
        {
            _runningBindings.Dispose();
        }
    }

    protected IQStreamable<TPayload> CreateStream<TPayload>(string producerName, EventShape eventShape)
    {
        var producer = Configuration.EventProducers[producerName];
        return RxStream<TPayload>.Create(CurrentApplication, producer.ComponentType,
            producer.Configuration, eventShape);
    }

    protected IQStreamable<TPayload> CreateStream<TPayload>(string producerName, EventShape eventShape,
        AdvanceTimeSettings advanceTimeSettings)
    {
        var producer = Configuration.EventProducers[producerName];
        return RxStream<TPayload>.Create(CurrentApplication, producer.ComponentType,
            producer.Configuration, eventShape, advanceTimeSettings);
    }

    protected void Bind<TPayload>(IQStreamable<TPayload> stream, string consumerName, EventShape eventShape)
    {
        var consumer = GetConsumer(consumerName);
        _bindings.Add(stream.ToBinding(CurrentApplication, consumer, eventShape));
    }

    protected abstract void CreateStreams();
}

Creating a new process builder is super-simple and really cuts down on the amount of code that you need to create and bind your streams. Both your event producers and consumers are now named resources – the process builder has no idea what the sources or the targets are and it doesn’t need to know.

SampleProcessBuilder
public class SampleProcessBuilder : ProcessBuilder<SampleStreamBuilderConfig>
{
    public SampleProcessBuilder(SampleStreamBuilderConfig configuration)
        : base(configuration)
    {
    }

    protected override void CreateStreams()
    {
        var data = CreateStream<TestDataEvent>("SourceData",
            EventShape.Point,
            Configuration.GetAdvanceTimeSettings());

        Bind(data, "Output", EventShape.Point);

        var aggregate = from d in data
                        group d by d.ItemId into itemGroups
                        from i in itemGroups.HoppingWindow(TimeSpan.FromSeconds(10), TimeSpan.FromSeconds(2))
                        select new
                        {
                            ItemId = itemGroups.Key,
                            Average = i.Avg(e => e.Value)
                        };
        Bind(aggregate, "Aggregate", EventShape.Point);
    }
}
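And because the producers and consumers are just named entries in the configuration, retargeting a builder is a configuration change rather than a code change. In this sketch, SqlServerOutputFactory and SqlServerOutputConfig are hypothetical stand-ins for whatever sink you’d swap in, and GetSampleStreamBuilderConfig is the helper you’ll see further down:

Code Snippet
//Same builder, same queries ... different target for the "Output" stream.
var config = GetSampleStreamBuilderConfig();
config.EventConsumers["Output"] = new EventComponentDefinition()
{
    //Hypothetical factory/config pair for a database sink.
    ComponentType = typeof (SqlServerOutputFactory),
    Configuration = new SqlServerOutputConfig()
    {
        ConnectionString = "Data Source=.;Initial Catalog=Telemetry;Integrated Security=True"
    }
};

var builder = new SampleProcessBuilder(config);
builder.Start(cepApplication);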
CepStreamBuilder Implementation

In keeping with our principle to keep the APIs as close as possible, we’ll also have a CepStreamBuilder that looks and acts the same way as our ProcessBuilder. However, there’s a bit of extra work involved, since the pre-2.1 model has no concept of a process with multiple inputs and outputs that runs as a single unit of execution. Prior to the Reactive model, the query was the unit of execution – it could have multiple input streams but only one output. Each query was independent of all other queries and there was no way to conveniently run related queries together. Reuse of query outputs was done through a technique called Dynamic Query Composition (DQC) that re-enqueued the output of one query back into a new stream, which then fed another query. If you looked at the details of how this was done, it was a special input/output adapter pair. In essence, it was a subject without the flexibility or extensibility that we have with subjects in the Reactive model. Finally, we’ll also need to take into consideration the potential for name clashes when we name our queries – not something that we needed to worry about with the process container – and a way to “group” our related queries together – something the process container does inherently but the query model didn’t do for you.

QueryBuilder
public abstract class QueryBuilder<TConfiguration> : StreamBuilder<TConfiguration>
    where TConfiguration : StreamBuilderConfiguration
{
    public List<Query> _queries;

    protected QueryBuilder(TConfiguration configuration) : base(configuration)
    {
    }

    public override void Start(Application cepApplication)
    {
        _queries = new List<Query>();
        CurrentApplication = cepApplication;
        CreateStreams();

        //Start all of the queries.
        foreach (var query in _queries)
        {
            query.Start();
        }
    }

    public override void Stop()
    {
        foreach (var query in _queries)
        {
            query.Stop();
        }
        foreach (var query in _queries)
        {
            query.Delete();
        }
        _queries = null;
    }

    protected abstract void CreateStreams();

    protected CepStream<TPayload> CreateStream<TPayload>(string producerName, EventShape eventShape)
    {
        var producer = Configuration.EventProducers[producerName];
        return CepStream<TPayload>.Create(CurrentApplication,
            GetSourceStreamName(producerName), producer.ComponentType,
            producer.Configuration, eventShape);
    }

    protected CepStream<TPayload> CreateStream<TPayload>(string producerName, EventShape eventShape,
        AdvanceTimeSettings advanceTimeSettings)
    {
        var producer = Configuration.EventProducers[producerName];
        return CepStream<TPayload>.Create(CurrentApplication,
            GetSourceStreamName(producerName), producer.ComponentType,
            producer.Configuration, eventShape, advanceTimeSettings);
    }

    protected void Bind<TPayload>(CepStream<TPayload> stream, string consumerName, EventShape eventShape)
    {
        var consumer = GetConsumer(consumerName);
        _queries.Add(stream.ToQuery(CurrentApplication, GetQueryName(consumerName),
            GetQueryDescription(consumerName), consumer.ComponentType,
            consumer.Configuration, eventShape, StreamEventOrder.FullyOrdered));
    }

    private string GetQueryDescription(string consumerName)
    {
        return "Query from QueryBuilder [" + Configuration.Name + "] for consumer [" + consumerName + "]";
    }

    protected string GetQueryName(string consumerName)
    {
        return Configuration.Name + "." + consumerName;
    }

    protected string GetSourceStreamName(string producerName)
    {
        return Configuration.Name + "." + producerName + ".Stream";
    }
}

And now, creating a query builder is, like the process builder, super-simple. In fact, we can copy and paste the code from the process builder into the query builder; the only difference is the name of the base class.

SampleQueryBuilder
public class SampleQueryBuilder : QueryBuilder<SampleStreamBuilderConfig>
{
    public SampleQueryBuilder(SampleStreamBuilderConfig configuration) : base(configuration)
    {
    }

    protected override void CreateStreams()
    {
        var data = CreateStream<TestDataEvent>("SourceData",
            EventShape.Point,
            Configuration.GetAdvanceTimeSettings());

        Bind(data, "Output", EventShape.Point);

        var aggregate = from d in data
                        group d by d.ItemId into itemGroups
                        from i in itemGroups.HoppingWindow(TimeSpan.FromSeconds(10), TimeSpan.FromSeconds(2))
                        select new
                        {
                            ItemId = itemGroups.Key,
                            Average = i.Avg(e => e.Value)
                        };
        Bind(aggregate, "Aggregate", EventShape.Point);
    }
}

Using the Stream Builders

Again, this is pretty simple. You create the configuration. You create the builder. You start the builder. You stop the builder. Regardless of whether it’s a QueryBuilder or a ProcessBuilder, the code to do this is the same and it binds to the StreamBuilder base class, rather than the specific implementations. Most of the code is now, actually, in creating the configuration to pass to the builder. Right now this is hard-coded but, in time, it won’t be; we’re pretty close to having all of the core, foundational pieces in place to further abstract even the configuration.

RunBuilder
private static void RunBuilder(Application cepApplication, Type builderType)
{
    var builderConfig = GetSampleStreamBuilderConfig();
    var builder = (IStreamBuilder) Activator.CreateInstance(builderType, builderConfig);
    builder.Start(cepApplication);

    Console.WriteLine("Builder is running. Press ENTER to stop.");
    Console.ReadLine();
    builder.Stop();
}

private static SampleStreamBuilderConfig GetSampleStreamBuilderConfig()
{
    //Create the configuration.
    var streamProviderConfig = new SampleStreamBuilderConfig()
    {
        Name = "Provider1",
        CtiDelay = TimeSpan.FromMilliseconds(750),
        CtiInterval = TimeSpan.FromMilliseconds(250)
    };

    //Add the producer.
    streamProviderConfig.EventProducers.Add("SourceData", new EventComponentDefinition()
    {
        Configuration = new TestDataInputConfig()
        {
            NumberOfItems = 20,
            RefreshInterval = TimeSpan.FromMilliseconds(500),
            TimestampIncrement = TimeSpan.FromMilliseconds(500),
            AlwaysUseNow = true,
            EnqueueCtis = false
        },
        ComponentType = typeof (TestDataInputFactory)
    });

    //Add the consumer.
    streamProviderConfig.EventConsumers.Add("Output", new EventComponentDefinition()
    {
        Configuration = new ConsoleOutputConfig()
        {
            ShowCti = true,
            CtiEventColor = ConsoleColor.Blue,
            InsertEventColor = ConsoleColor.Green
        },
        ComponentType = typeof (ConsoleOutputFactory)
    });

    streamProviderConfig.EventConsumers.Add("Aggregate", new EventComponentDefinition()
    {
        Configuration = new ConsoleOutputConfig()
        {
            ShowCti = true,
            CtiEventColor = ConsoleColor.White,
            InsertEventColor = ConsoleColor.Yellow
        },
        ComponentType = typeof (ConsoleOutputFactory)
    });

    return streamProviderConfig;
}

Wrapping Up

We’ve made a good deal of progress with this revision. We focused first on creating abstractions for the producers and consumers – they’re the beginning and the end, after all, of your StreamInsight application – and have now added a framework for the chewy middle. I had a couple of questions from folks as to why I made some of the architectural decisions, particularly with having factories for the Reactive-model sinks and producers. Hopefully some of that is a little clearer now but, as we move forward, there will be additional things that we may do with the factories.

There are also some changes and tweaks included in this download that aren’t described in the blog, primarily around the serialization/deserialization for the event consumers and producers, that were done while I was building the demo for Baton Rouge Sql Saturday. There’s still some work to do around these to optimize the performance, particularly on the serialization side of the equation, that I’ll get to sometime later. I was, however, quite proud that I was able to shave about 3 ticks off the deserialization performance. Now, that doesn’t sound like much (and it’s not) but when you realize that you may be doing this tens of thousands of times per second, it adds up really quickly.

So, what’s next? There are two key pieces that we’ll need to put into place … abstracting the application hosting mode and then a configuration system. And then lots to build out … common query patterns in Linq macros, some aggregates and operators, and more of the stream builder classes/model, to name a couple. Oh, and let’s not forget that we’ll need some of the basic, common event producers and consumers. Last, but not least, you can download the code from here:

How long did that edge event take?–The Sequel

Code Sample | StreamInsight
This is beginning to become a recurring theme around here. It turns out that there is yet another way to get a composite event that will tell you how long a particular event “took”. This goes back to the initial post, where I was using a subject to create a new event that included the original payload but with a Duration property, allowing you to get the total milliseconds for a specific event. You see, subjects have become one of my mostest favoritest features of the whole Reactive model in StreamInsight 2.1; there’s just so much that they let you do that you couldn’t do before. But this particular scenario was possible without subjects and it really should have smacked me in the face. Rather than using a subject, we can use a UDSO (specifically, an edge stream operator) to achieve the exact same thing but with fewer moving parts. Oh … and using a UDSO doesn’t create an independent timeline, whereas using the subject does create a new timeline in the process. Overall, the UDSO is far simpler and easier to understand, and you don’t have to worry about any CTI violations or funkiness thereabouts. With that, here’s the operator:

EventDurationOperator
public class DurationEvent<TPayload>
{
    public TPayload Payload;
    public double TotalMilliseconds;
}

[DataContract]
public sealed class EventDurationOperator<TPayload> : CepEdgeStreamOperator<TPayload, DurationEvent<TPayload>>
{
    public override bool IsEmpty
    {
        get { return true; }
    }

    public override DateTimeOffset? NextCti
    {
        get { return null; }
    }

    public override IEnumerable<DurationEvent<TPayload>> ProcessEvent(EdgeEvent<TPayload> inputEvent)
    {
        if (inputEvent.EdgeType == EdgeType.End)
        {
            //Create the new duration event.
            yield return new DurationEvent<TPayload>
            {
                Payload = inputEvent.Payload,
                TotalMilliseconds = (inputEvent.EndTime - inputEvent.StartTime).TotalMilliseconds
            };
        }
    }
}

As you can see, the output here is identical to the output from the previous solution but the code is far, far simpler. Using the operator is simpler than the subject as well.

Using the Operator
var source = items.ToEdgeStreamable(e => e.GetEvent(startTime), AdvanceTimeSettings.IncreasingStartTime);
var results = source.Scan(() => new EventDurationOperator<DataItem>());

That makes the operator, rather than the subject, the winner here … it keeps true to that Golden Rule of Programming – the KISS principle. (That’s Keep It Simple Stupid, in case you were wondering.)
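One last note: outside of LinqPad, wiring the operator’s output to a quick console sink looks something like this. It’s a sketch using the 2.1 Reactive model – cepApplication is your Application instance, and the observer and process name are arbitrary choices of mine.

Code Snippet
var source = items.ToEdgeStreamable(e => e.GetEvent(startTime), AdvanceTimeSettings.IncreasingStartTime);
var results = source.Scan(() => new EventDurationOperator<DataItem>());

//Dump each duration event to the console and run the binding as a process.
var consoleObserver = cepApplication.DefineObserver(
    () => Observer.Create<DurationEvent<DataItem>>(
        e => Console.WriteLine("{0} took {1} ms", e.Payload.Id, e.TotalMilliseconds)));

results.Bind(consoleObserver).Run("EventDurationProcess");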

Houston TechFest

Code Sample | Community | Events
Wow … another year, another Houston TechFest is over. It was absolutely awesome, once again … Mike Steinberg has really built this into one heck of an event. It was good to see such a HUGE turnout of techies for a great day of learning and networking. It was especially exciting to talk with some of the kids … future programmers … who were also in attendance! Here are the links to my sessions: Introducing StreamInsight 2.1 and Back to Basics: .NET Essentials. Thanks to everyone who came, and especially those who showed up at my sessions to hear me ramble.

Multiple Timelines From a Single (Input) Source

StreamInsight | Code Sample
This is an interesting challenge that I’ve come across a couple of times – the last time being on the StreamInsight forum. How do you handle the case where you have a single inbound event source but multiple timelines for the events? If you are wondering how this can happen, it’s actually pretty simple – you’ll have some kind of event aggregator that combines events from multiple downstream sources, and each of these sources has different timestamps – due to unsynchronized system clocks at the event source – and/or different latencies – due to a store/forward/batch-update type of system.

“But,” you are saying, “you can just hook into the events at the initial source and then you don’t have any problem.” Sure … if you can. But that’s not always possible, for any number of reasons. In Oil & Gas, probably the most common reason is that you don’t own the source system and you get your events (and sensor readings) from another company that actually operates the equipment. Or, even worse, a service provider company that isn’t even the operator of the equipment but only provides data services. And yes, this is very common.

What you’ll see in this scenario is a “loss of events”. You do get output, but it’ll be from the one source with the “latest” timeline; everything else gets dropped. Now … you will only see this if you are using declarative CTIs with AdvanceTimeGenerationSettings. If you are enqueuing your CTIs in your source, you’ll get CTI violation exceptions. And specifying Drop for the AdvanceTimePolicy doesn’t help with point events; Drop only affects interval events, and only interval events whose duration extends past the last-issued CTI.

So how do you deal with this situation? First, we need to get the initial events in. They have different timestamps and we know that. To simulate this, we’ll have an event source that enqueues items every second. It implements IObservable, of course, so we can easily use it as an event source. The event generation looks like the following:

Data Source
for (int i = 0; i < 5; i++)
{
    var evt = new SourceEventData()
    {
        SourceTimeUtc = DateTimeOffset.Now.AddHours(i).UtcDateTime,
        SourceId = i,
        ItemId = "Item" + i.ToString(),
        Value = i * (_rnd.NextDouble() * 10)
    };
    PublishEvent(evt);
}

So … each “source” has a unique identifier. You will need some kind of unique identifier (besides the timestamp) to identify the source. If, for some reason, it’s not available on the initial event, you’ll still have some kind of ID that identifies the item … you can then use a reference stream to identify the source/target timeline for the item. In this sample, each source is offset from the current time by 0 to 4 hours … so we have multiple timelines. Now, let’s create a stream from this:

Aggregator Stream
var eventAggregatorSource = cepApplication.DefineObservable(() => new StreamSource());
var eventAggregatorSourceStream =
    eventAggregatorSource.ToPointStreamable(
        e => PointEvent<SourceEventData>.CreateInsert(DateTimeOffset.UtcNow, e),
        AdvanceTimeSettings.IncreasingStartTime);

In this case, we are using the event arrival time for the point event start time. In our sample, we’re not doing anything with this stream except publishing it, but we could do some analytics here. For example, you might want to have a query that detects when you aren’t getting events from one of your sources. Or you could include reference data that helps you “reset” the tagged system time with the real time.
For example, if you know that one of your sources has a system clock that is 3 days behind, you can add that to the SourceTimeUtc to handle it. One caveat … if you want to be able to detect whether the entire source stream is offline, you’ll need to add CTIs to the stream from a timer rather than generating them. Generated CTIs are issued only when you have events flowing through, so when you don’t have any events (the whole thing is offline), you won’t get any offline alerts.

So … we have a single stream with timestamps by arrival time. For our analytics, we need to have them in the timeline of the underlying source of the data. So … how do we do this? It’s easy … you use a subject. We’ll take the original source stream, enqueued with the arrival time, and publish it to a subject. Then, we’ll subscribe to the subject for each of our source IDs. This will be the stream in the source system’s timeline, and each timeline will be completely independent of the other timelines. Once that’s done, we’ll define and then deploy an observable that will filter by a specific source ID. Deploying the observable allows us to easily reuse it without needing to define it again. Note, however, that it’s not required to deploy an observable and it doesn’t always make sense to. But since this is something that we’ll reuse over and over again, it makes absolute sense to do it here.

Subject + Deployed Observable
var publishSubject = cepApplication.CreateSubject("EventSourceSubject",
    () => new Subject<SourceEventData>());

eventAggregatorSourceStream.Bind(publishSubject).Run("EventAggregatorProcess");

var sourceObservable = cepApplication.DefineObservable((int sourceId) =>
    cepApplication.GetSubject<SourceEventData, SourceEventData>("EventSourceSubject")
        .Where(e => e.SourceId == sourceId));

//Deploy the observable; we can just use this ...
sourceObservable.Deploy("EventsForSourceId");

To wrap it all up, all we need to do is get our deployed observable, invoke the method and then convert to a point stream. When we create the point stream, we’ll use the SourceTimeUtc property as the start time for the point, converting it to the timeline of the downstream source.

Event Source Stream
private static void CreateStreamForEventSource(Application cepApplication, string observableId, int sourceId)
{
    var sourceStream = cepApplication.GetObservable<int, SourceEventData>(observableId)
        .Invoke(sourceId)
        .ToPointStreamable(
            e => PointEvent<EventData>.CreateInsert(new DateTimeOffset(e.SourceTimeUtc), (EventData) e),
            AdvanceTimeSettings.IncreasingStartTime);

    sourceStream
        .Bind(cepApplication.DefineObserver(() => Observer.Create<PointEvent<EventData>>(WriteOutput)))
        .Run("EventStream" + sourceId.ToString());
}

At the end of it, we have a single process for the source events coming from our event aggregator. This process is synced to the local system clock and uses event arrival time for the event timestamp. Then, we have a process for each of our individual sources, and each of those processes has a completely different, independent timeline … each process has its own clock. This is probably one of the clearest examples that I’ve seen of what’s meant by “application time” … something that I found to be a difficult concept to fully get my head around when I first started doing StreamInsight way back long ago. You can download the sample project that shows all of this and the multiple timelines from my SkyDrive. Enjoy!
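A footnote on the timer caveat above: one way to keep application time moving when the source goes completely quiet is to take CTI generation into your own hands – merge the data events with CTIs driven by an Rx timer and skip the AdvanceTimeSettings entirely. This is a rough, untested sketch of the idea; depending on how your source is defined, you may need to do the merge on plain IObservables before handing the result to DefineObservable.

Code Snippet
var merged = cepApplication.DefineObservable(() =>
    new StreamSource()
        //Data events become inserts, timestamped on arrival as before ...
        .Select(e => PointEvent<SourceEventData>.CreateInsert(DateTimeOffset.UtcNow, e))
        //... and a wall-clock timer issues CTIs even when no data is flowing.
        .Merge(Observable.Interval(TimeSpan.FromSeconds(1))
            .Select(_ => PointEvent<SourceEventData>.CreateCti(DateTimeOffset.UtcNow))));

//No AdvanceTimeSettings here; the timer owns the CTIs now.
var eventAggregatorSourceStream = merged.ToPointStreamable(e => e);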

SQL Saturday Baton Rouge Presentations

Code Sample | Community | Events
It was absolutely awesome to be back in Baton Rouge for Sql Saturday. I do miss going out there … used to be I was out there regularly but now – well, not so much. I did already post them on the Sql Saturday site but for anyone else that’s interested, I’m posting them here. The first is not a new one but always a popular one … Back to Basics: .NET Essentials That You May Have Forgotten. I’ll also be doing this one at Houston TechFest in September. The other one is new: StreamInsight In Action: Real-Time Web Monitoring. This was inspired by a project that I was recently on where we were doing performance analysis and testing. Here was the challenge … we needed to know exactly which pages were executing (and what some of the form post parameters were) when we had spikes in various performance counters. Unfortunately, load testing tools won’t give you this level of detail. And trying to synchronize web logs and perfmon logs is an exercise in futility. And that’s where StreamInsight comes in … since we can take multiple sources of data and synchronize them in time, it’s actually pretty easy to do something like this in StreamInsight. And that’s just the tip of the iceberg; once you start, there’s a lot of other things that you can do as well. For example, if you are analyzing web farm performance, you can get early warning if one of the servers in the farm is starting to “go off the reservation”. Or if your servers are starting to perform poorly or are just beginning to get over-capacity. It’s all about enabling you to be more proactive in managing and operating your web site(s) rather than waiting for uncomfortable emails or calls. This also has a little WinForms/Grid sink so you can see (exactly) the different events as they are output.

How long did that edge event take–Reprising the Reprise

Code Sample | StreamInsight
So … I just posted a little bit ago. I thought that the solution that I came up with was pretty decent … and certainly an alternative to the subjects … but I still wanted to be able to determine a dynamic timeout based on the payload. There had to be a way to do that, so I put that whole thought process on a background thread while I did other stuff. And when it came to me … the solution was pretty simple, actually. In fact, this is a technique very similar to the standard queries that we use to detect offline sensors, just with a slightly different twist.

Here’s what you do: rather than shifting the event start time (as we did before), you simply alter the duration of the source edge event. You see, when edge events are enqueued but don’t have an end, the event in the engine has an end time of DateTimeOffset.MaxValue (you can see this in the Event Flow Debugger). When the end edge is enqueued, the initial item is retracted and the new one is inserted into the stream. Now, we used this behavior in the previous example to do our join to see which events were still running after their timeout expired. But we can twist this a bit … rather than shifting the event time, we can set the event duration to a specified end time. AlterEventDuration, unlike ShiftEventTime, provides access to the payload, allowing us to pass in a timeout duration as a part of the payload. We can then do a left anti-semi join between the source stream and the altered stream to see which items in the source stream are no longer in the altered stream. This gives us the events that have exceeded their allowed duration based on the payload. So … here’s the LinqPad sample:

LinqPad Sample
void Main()
{
    DateTimeOffset startTime = new DateTimeOffset(2013, 7, 1, 12, 0, 0, TimeSpan.FromHours(-6));
    var items = Application.DefineEnumerable(() => new SourceItem[]
    {
        new SourceItem("Item1", 4, 0),
        new SourceItem("Item2", 6, 0),
        new SourceItem("Item3", 3, 2),
        new SourceItem("Item1", 4, 0, 3),
        new SourceItem("Item4", 5, 3),
        new SourceItem("Item2", 6, 0, 4),
        new SourceItem("Item3", 3, 2, 8),
        new SourceItem("Item4", 5, 3, 10),
    });

    var source = items.ToEdgeStreamable(e => e.GetEvent(startTime), AdvanceTimeSettings.IncreasingStartTime);

    //AlterEventDuration works with edges ... it sets an end to the edge.
    //With AlterEventDuration, we get the payload (unlike ShiftEventTime).
    //We'll use the payload as our max allowed duration.
    var altered = source.AlterEventDuration(e => TimeSpan.FromMinutes(e.Payload.Value));

    //Now, let's get the ones "taking too long".
    //What we want are src items *with no match* in the altered stream.
    //This will give us edges that are still "live"
    //but whose max allowed duration has passed.
    var tooLong = from src in source.LeftAntiJoin(altered, (s, a) => s.Id == a.Id)
                  select src;

    //Output as the different event shapes.
    //Note the relationship between the event times and the CTIs.
    //Point is a good option.
    tooLong.ToPointEnumerable().Dump("Timed Out Items - Point");
    //Interval isn't a good option. Doesn't get output until the end.
    tooLong.ToIntervalEnumerable().Dump("Timed Out Items - Interval");
    //Edge is also a good option. You get the start and the end.
    tooLong.ToEdgeEnumerable().Dump("Timed Out Items - Edge");
}

// Define other methods and classes here
class SourceItem : DataItem
{
    public SourceItem(string id, int value, int startTimeOffset)
    {
        Id = id;
        Value = value;
        StartTimeOffset = startTimeOffset;
        EndTimeOffset = null;
    }

    public SourceItem(string id, int value, int startTimeOffset, int endTimeOffset)
    {
        Id = id;
        Value = value;
        StartTimeOffset = startTimeOffset;
        EndTimeOffset = endTimeOffset;
    }

    public EdgeEvent<DataItem> GetEvent(DateTimeOffset startTime)
    {
        if (!EndTimeOffset.HasValue)
        {
            return EdgeEvent<DataItem>.CreateStart(startTime.AddMinutes(StartTimeOffset), this);
        }
        return EdgeEvent<DataItem>.CreateEnd(startTime.AddMinutes(StartTimeOffset),
            startTime.AddMinutes(EndTimeOffset.Value),
            this);
    }

    public int StartTimeOffset;
    public int? EndTimeOffset;
}

class DataItem
{
    public string Id;
    public int Value;
}

I do love it when a query comes together …

How long did that edge event take–Reprise

StreamInsight | Code Sample
In my last post, I talked about how to get the duration of an edge event using subjects. The only challenge with that is you don’t get the result until the end of the edge. While this does allow you to do very interesting things … for example, determine which events took more than 2 standard deviations longer than the average time … it doesn’t give you notification as soon as an item starts to take too long. That’s because it can’t calculate the duration until the end of the edge. But it’s also useful to get a notification as soon as an edge goes over a specific event duration. This is actually easier to do than I initially thought; I must have been having a brain-fart moment. (They’re rare but they do happen.) So, this morning, I quickly whipped up a sample in LinqPad that does exactly this … as soon as an edge event goes over a pre-configured duration, you get a notification. Now, while you can’t do anything really cool like dynamically determining the timeout from the event payload, that’s not always necessary. So it’s another option, just with different limitations and uses.

LinqPad Sample
void Main()
{
    DateTimeOffset startTime = new DateTimeOffset(2013, 7, 1, 12, 0, 0, TimeSpan.FromHours(-6));
    SourceItem[] items = new SourceItem[]
    {
        new SourceItem("Item1", 5, 0),
        new SourceItem("Item2", 6, 0),
        new SourceItem("Item3", 8, 2),
        new SourceItem("Item1", 5, 0, 3),
        new SourceItem("Item4", 2, 3),
        new SourceItem("Item2", 6, 0, 4),
        new SourceItem("Item3", 8, 2, 8),
        new SourceItem("Item4", 2, 3, 10),
    };

    var source = items.ToEdgeStream(Application, e => e.GetEvent(startTime), AdvanceTimeSettings.IncreasingStartTime);

    //Shift the start time of the events to our timeout value.
    //In this case, our timeout is 5 minutes; any event that
    //has a duration longer than 5 minutes should be output to the sink.
    var shifted = source.ShiftEventTime(e => TimeSpan.FromMinutes(5));

    //Join the shifted items back to the source.
    //The join will be applied only when the shifted time
    //overlaps the original edge event.
    //If the edge event has an end before the shifted event,
    //it won't be output to the sink as the join won't be applied.
    var tooLong = from src in source
                  from shft in shifted
                  where src.Id == shft.Id
                  select shft;

    //Output as the different event shapes.
    //Note the relationship between the event times and the CTIs.
    //Point is a good option.
    tooLong.ToPointEnumerable().Dump("Timed Out Items - Point");
    //Interval isn't a good option. Doesn't get output until the end.
    tooLong.ToIntervalEnumerable().Dump("Timed Out Items - Interval");
    //Edge is also a good option. You get the start and the end.
    tooLong.ToEdgeEnumerable().Dump("Timed Out Items - Edge");
}

// Define other methods and classes here
class SourceItem : DataItem
{
    public SourceItem(string id, int value, int startTimeOffset)
    {
        Id = id;
        Value = value;
        StartTimeOffset = startTimeOffset;
        EndTimeOffset = null;
    }

    public SourceItem(string id, int value, int startTimeOffset, int endTimeOffset)
    {
        Id = id;
        Value = value;
        StartTimeOffset = startTimeOffset;
        EndTimeOffset = endTimeOffset;
    }

    public EdgeEvent<DataItem> GetEvent(DateTimeOffset startTime)
    {
        if (!EndTimeOffset.HasValue)
        {
            return EdgeEvent<DataItem>.CreateStart(startTime.AddMinutes(StartTimeOffset), this);
        }
        return EdgeEvent<DataItem>.CreateEnd(startTime.AddMinutes(StartTimeOffset),
            startTime.AddMinutes(EndTimeOffset.Value),
            this);
    }

    public int StartTimeOffset;
    public int? EndTimeOffset;
}

class DataItem
{
    public string Id;
    public int Value;
}

When looking at the output, note where the events sit between the CTIs … that shows, without question, that the event is released as soon as the application time moves past the relevant timeout. You do need to use Point events for this; if you use an Interval, the final event won’t be output until the end is reached, which would defeat the purpose of what we’re trying to do. You could use Edges as output; in this case, you’d get a start as soon as it “takes too long” and then an end when the end edge for the original item is enqueued into the stream. With this, you could then feed it into the subject from my previous post and determine how much longer it took than the timeout, should that be interesting to you.

How long did that edge event take?

Code Sample | StreamInsight
With StreamInsight, you have three different event shapes – point, interval and edge. With a point, the duration is easy; it’s a single tick. Intervals are also easy since you know both the start and end times when you enqueue them. But what about edge events? Edge events are different because you know the start time, but not the end time, when you enqueue them. As I was working on my presentation for Sql Saturday Baton Rouge, it occurred to me that this would be a very interesting bit of information to know. However, with just queries, it was difficult to do. (I say difficult; that doesn’t mean that it can’t be done, I just couldn’t figure out how.) But … once again, subjects come to the rescue. Since a subject acts as a sink and a source, I can attach it to the edge stream, evaluate the events as they are published and, when an end comes along, calculate the duration and then publish that to another stream. Even if your inbound events aren’t edges, remember that you can jump between the different shapes by manipulating your event’s temporal headers easily enough.

So here goes. First, let’s declare the class and implement ISubject. I’ve kept it so this subject can be used with any payload class.

Class Declaration
public class EdgeEventDurationSubject<TPayload> :
    ISubject<EdgeEvent<TPayload>, PointEvent<EventDuration<TPayload>>>
{
    private List<IObserver<PointEvent<EventDuration<TPayload>>>> _observers =
        new List<IObserver<PointEvent<EventDuration<TPayload>>>>();
}

You’ll see that it publishes point events of type EventDuration<TPayload>. This generic event allows us to “hook up” to any stream and then publish the event duration with the payload. The query writer can then join this back to the original stream (if desired) or just continue on; after all, they have the entire payload.

EventDuration Class
public class EventDuration<TPayload>
{
    public double Duration { get; set; }

    public TPayload Payload { get; set; }
}

Note that we support a list of observers – so we can publish to multiple sinks. Next, let’s implement the IObservable side of the subject.

IObservable Implementation
public IDisposable Subscribe(IObserver<PointEvent<EventDuration<TPayload>>> observer)
{
    lock (this)
    {
        if (!_observers.Contains(observer))
        {
            _observers.Add(observer);
        }
        return Disposable.Create(() => RemoveObserver(observer));
    }
}

private void RemoveObserver(IObserver<PointEvent<EventDuration<TPayload>>> observer)
{
    lock (this)
    {
        if (_observers.Contains(observer))
        {
            _observers.Remove(observer);
        }
    }
}

Note that we are cheating a bit and using Disposable.Create from the Reactive Extensions. But it’s easy and it works well, so I don’t feel a bit bad about it. Now, for the interesting part – implementing the IObserver side of the subject. We know that we’ll be consuming edge events because of our implementation of ISubject, and all of our work will be done in the OnNext method.

OnNext
public void OnNext(EdgeEvent<TPayload> value)
{
    if (_observers.Count == 0)
    {
        //No one is listening.
        return;
    }

    if (value.EventKind == EventKind.Cti)
    {
        PublishEvent(PointEvent<EventDuration<TPayload>>.CreateCti(value.StartTime.AddTicks(1)));
        return;
    }

    if (value.EdgeType == EdgeType.End)
    {
        var payload = value.Payload;
        var result = new EventDuration<TPayload>
        {
            Duration = (value.EndTime - value.StartTime).TotalMilliseconds,
            Payload = payload,
        };
        PublishEvent(PointEvent<EventDuration<TPayload>>.CreateInsert(value.EndTime.AddTicks(-1), result));
    }
}

private void PublishEvent(PointEvent<EventDuration<TPayload>> newEvent)
{
    lock (this)
    {
        foreach (var observer in _observers)
        {
            observer.OnNext(newEvent);
        }
    }
}

We take the CTIs and simply republish them, adding a tick. This pretty much guarantees that our CTIs will be after any events that we publish. If it’s not a CTI, it’s an Insert, so we check the edge type. Only End edges have an end time and, besides, that’s the idea behind edges … you don’t know the end when you know the start. So we have to wait for the end to determine the duration. That’s easy enough … EndTime – StartTime, get the total milliseconds (because TimeSpan isn’t valid in a StreamInsight stream) and publish the event to the observers. By setting the point’s start time to a tick less than the edge event’s end time, we make sure that we can join back to the original stream without shifting if we so desire. Using this in our queries is very simple.

Using the Subject
var timeEvalSubject = cepApplication.CreateSubject("TimeEval",
    () => new EdgeEventDurationSubject<WebRequestPayload>());

var requestExecutionTimeStream = timeEvalSubject.ToPointStreamable(e => e);

All you need to do now is make sure that the subject is bound to the other streamable bindings using With so they all run in the same process.
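That composition looks something like this – a sketch, assuming an edge-shaped source stream of WebRequestPayload (webRequestStream) and some output observer (outputObserver) that you’ve already defined:

Code Snippet
//Bind the edge stream into the subject and the subject's output to a sink,
//then run both bindings in a single process so they share a lifetime.
var sourceBinding = webRequestStream.Bind(timeEvalSubject);
var outputBinding = requestExecutionTimeStream.Bind(outputObserver);

sourceBinding.With(outputBinding).Run("RequestDurationProcess");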

Dual Mode Data Sinks–Part II

StreamInsight | Code Sample
Well, it’s been a while. Far too long, in fact. I’ve been on a project that’s been pretty intense and has consumed much of my time. So … my apologies for taking so long to continue. With that out of the way, let’s pick up with our data sinks.

There were two major things left undone with the last post. First, the API. I left it without an abstracted, cleaner API similar to the adapter model. That’s the first thing to add. Now, we don’t want to go “all the way” with this and produce a runnable process – that would defeat one of the coolest features of a process: the ability to contain multiple sources and sinks in one clean block that starts and stops as a whole. This also allows us to send individual queries to multiple sinks (without DQC or subjects) and makes reuse of the sources across multiple sinks simpler than in the adapter model with DQC. And, to keep it consistent with the adapter model, we want to attach our sink to the stream directly from the IQStreamable interface – and that means an extension method. (I’ll confess that I have come to deeply love extension methods, by the way … I really like the clean API that they create.)

Our extension method will encapsulate creating the observer and then binding the stream to the observer. So that we can include multiple bindings within a process, we’ll just return the streamable binding that we create. Our extension method looks like this:

ToBinding Extension Method
public static IRemoteStreamableBinding ToBinding<TPayload>(
    this IQStreamable<TPayload> stream,
    Application cepApplication,
    Type consumerFactoryType,
    object configInfo,
    EventShape eventShape)
{
    var factory = Activator.CreateInstance(consumerFactoryType) as ISinkFactory;

    if (factory == null)
    {
        throw new ArgumentException("Factory cannot be created or does not implement ISinkFactory");
    }

    switch (eventShape)
    {
        case EventShape.Interval:
            var intervalObserver = cepApplication.DefineObserver(
                () => factory.CreateIntervalObserverSink<TPayload>(configInfo));
            return stream.Bind(intervalObserver);

        case EventShape.Edge:
            var edgeObserver = cepApplication.DefineObserver(
                () => factory.CreateEdgeObserverSink<TPayload>(configInfo));
            return stream.Bind(edgeObserver);

        case EventShape.Point:
            var pointObserver = cepApplication.DefineObserver(
                () => factory.CreatePointObserverSink<TPayload>(configInfo));
            return stream.Bind(pointObserver);

        default:
            throw new ArgumentOutOfRangeException("eventShape");
    }
}

We won’t call it ToQuery(); doing so, while consistent, wouldn’t be descriptive at all. A binding that then gets run in a process is an inherently different thing from a query. With the adapter model, a query is a single runnable unit of logic; a binding is not. A binding can only run within the context of a process – it is the process, not the binding, that is the closest corollary to the query. But the naming and the arguments are similar enough.
Now, when we want to bind a stream to a sink, it looks like this:

Binding a Stream to a Sink
var sinkConfig = new ConsoleOutputConfig()
{
    ShowCti = true,
    CtiEventColor = ConsoleColor.Blue,
    InsertEventColor = ConsoleColor.Green
};

var binding = data.ToBinding(cepApplication, typeof (ConsoleOutputFactory), sinkConfig, EventShape.Point);
binding.Run("Hello");

You’ll also notice, if you haven’t noticed before, that this API forces you to use the entire event, not just the payload, when sending to the sink. While you can just send the payload to the sink and ignore the event itself, that seems quite pointless and can also cause quite a bit of confusion. I’ve seen a lot of folks put timestamps in the payload; I typically don’t. In the vast majority of cases, I rely on – and use – the timestamps on the event itself. These are the timestamps that actually matter; these are the timestamps that the StreamInsight engine is using when evaluating the event. These are the timestamps of the event within the application timeline. Anything in the payload is just an attribute of the event; the start time and end time are a key part of its definition.

Also, if you only send the payload to your sink, you won’t get any CTIs at all. CTIs are kinda important, I think, for a sink for a couple of reasons. First, even if you have no events coming through but you have CTIs, you know that the engine is running and pumping data through. This is particularly useful when you aren’t getting any data at all in your sink; without the CTIs, you won’t have a good idea whether your query is actually getting processed through the engine or you have some other error in your logic. Second … CTIs let you know when you can/should write to a durable store in a batch. Here’s the thing – insert events don’t get released to your sink until there is a CTI. That CTI tells you, also, that all of the events up to that point in application time have been released to your sink. So that’s the perfect time to batch up any writes that you have to a durable store (say, Sql Server) and write them in one shot. Batched updates/inserts are going to scale far better than single writes, and that’s always a very good thing in a StreamInsight application. You always need to remember that, in many cases, the sink is going to be the biggest potential bottleneck in the application because it usually involves some sort of I/O. And that I/O is always going to be slower than the raw CPU- and memory-bound performance that you can get from your queries.

The next thing on our list is a bit tougher. I mentioned it in my previous post and it’s the one thing that I really didn’t like about the entire reactive model when I first saw it. You see, I’m a big fan of untyped adapters, especially output adapters. Yes, they are pretty useful on the input side as well, but on the output they are absolutely essential. You can’t always know what your payload is going to look like and you don’t want to be writing a sink for every different payload that you dream up. For input … in a lot of cases, you can do OK since you’ll have a good idea of what the schema is going to look like. This is something that also came up recently on the StreamInsight forum, so I know I’m not the only one that misses it. Fortunately, the .NET Framework gives us a way to make this happen. It’s more work than a simple untyped adapter but, with reflection, we can get the same kind of flexibility that we had with untyped adapters in our sinks.
What we want to do is to have, at the end, something similar to what an output adapter provides for us – name/value pairs where nested classes have a [ParentProperty].[ChildProperty]-style name. While we’re at it, we’ll simplify it a bit; it’ll be a simple, straightforward dictionary with the name and the value, rather than having a separate event definition and then matching the index with the name, as in an untyped adapter. At the highest level, we’ll have a method called “GetPropertyValues” that takes the object and handles all of the details for us. Of course, it’ll be an extension method.

GetPropertyValues
/// <summary>
/// Returns the values of properties as name/value pairs.
/// </summary>
/// <param name="source">The object to read properties from</param>
/// <returns></returns>
public static Dictionary<string, object> GetPropertyValues(this object source)
{
    var propertyDictionary = new Dictionary<string, object>();
    //Get all of the properties.
    AppendPropertyValues(source, string.Empty, propertyDictionary);
    return propertyDictionary;
}

The real work is in the AppendPropertyValues method. This takes the dictionary, the object and a property name prefix and appends the properties and their values to the dictionary. To do this, first we get a list of the public instance properties on the object. From there, we loop over them. If one of them is a class, we recurse into AppendPropertyValues, adding the source property name as the prefix. After that, we also get the fields with the same binding flags; the GetProperties method won’t return the fields and there isn’t a single reflection method that I could find that would give me both in one call.

AppendPropertyValues
private static void AppendPropertyValues(object source, string prefix, Dictionary<string, object> propertyDictionary)
{
    var properties = source.GetType().GetProperties(BindingFlags.Public | BindingFlags.Instance);
    foreach (var propertyInfo in properties)
    {
        if (_validProp.Contains(propertyInfo.PropertyType))
        {
            var method = propertyInfo.GetGetMethod();
            object value = method.Invoke(source, null);
            propertyDictionary.Add(prefix + propertyInfo.Name, value);
        }
        else if (propertyInfo.PropertyType.IsClass)
        {
            var method = propertyInfo.GetGetMethod();
            object value = method.Invoke(source, null);
            AppendPropertyValues(value, prefix + propertyInfo.Name + ".", propertyDictionary);
        }
    }

    var fields = source.GetType().GetFields(BindingFlags.Public | BindingFlags.Instance);
    foreach (var fieldInfo in fields)
    {
        if (_validProp.Contains(fieldInfo.FieldType))
        {
            object value = fieldInfo.GetValue(source);
            propertyDictionary.Add(prefix + fieldInfo.Name, value);
        }
        else if (fieldInfo.FieldType.IsClass)
        {
            var value = fieldInfo.GetValue(source);
            AppendPropertyValues(value, prefix + fieldInfo.Name + ".", propertyDictionary);
        }
    }
}
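One thing the snippet above doesn’t show is _validProp: it’s simply the set of types that we treat as leaf values rather than recursing into them. Note that string must be in this set, since it’s a class and would otherwise be recursed into. The exact membership is a judgment call; something along these lines:

Code Snippet
//Types treated as simple, single-valued members (not recursed into).
private static readonly HashSet<Type> _validProp = new HashSet<Type>()
{
    typeof (string), typeof (bool), typeof (byte), typeof (short),
    typeof (int), typeof (long), typeof (float), typeof (double),
    typeof (decimal), typeof (Guid), typeof (DateTime), typeof (DateTimeOffset)
};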
With this now in place, we have the same capabilities in our sinks that we had in our output adapters. Using this, we can change our ConsoleDataConsumer to display the payload properties, rather than just some basic header info.

Code Snippet
if (outputEvent.EventKind == EventKind.Insert)
{
    Console.ForegroundColor = Configuration.InsertEventColor;

    var eventValues = outputEvent.Payload.GetPropertyValues();
    StringBuilder output = new StringBuilder(2048);
    foreach (var eventValue in eventValues)
    {
        output.Append(eventValue.Key).Append(":").Append(eventValue.Value.ToString()).Append("\t");
    }
    Console.WriteLine("Insert Event Received at " + outputEvent.StartTime + "\t" + output.ToString());
}
else if (Configuration.ShowCti)
{
    Console.ForegroundColor = Configuration.CtiEventColor;
    Console.WriteLine("CTI event received at " + outputEvent.StartTime);
}
Console.ResetColor();

Now … we could certainly do some optimizations on this. For example, we could cache the property names and definitions for each of the types. I’ll leave that as an exercise for later … or for you, dear blog reader; a starting point is sketched below. You can download the solution from my SkyDrive. As you look, you’ll also see that I’ve done some refactoring on the project names and namespaces …
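For that caching exercise, a minimal sketch (not in the download) would memoize the reflected members per type with a ConcurrentDictionary, so reflection runs once per payload type instead of once per event:

Code Snippet
//Cache the reflected properties per type; reflection is too slow to repeat per event.
private static readonly ConcurrentDictionary<Type, PropertyInfo[]> _propertyCache =
    new ConcurrentDictionary<Type, PropertyInfo[]>();

private static PropertyInfo[] GetCachedProperties(Type type)
{
    return _propertyCache.GetOrAdd(
        type,
        t => t.GetProperties(BindingFlags.Public | BindingFlags.Instance));
}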