Ruminations of J.net – idle rants and ramblings of a code monkey

Real-Time Web Analytics Presentation and Demo from HDNUG

Code Sample | StreamInsight
Here are all of the materials from the presentation that I did at the Houston .NET User’s Group on March 13. Some of the things that I updated from the version for Baton Rouge SQL Saturday include:

Lock around the active requests HashSet: Added a lock block around adding and removing items from the _activeRequest HashSet. Since HashSet isn’t thread-safe and our source is very highly multi-threaded, we need to make sure that operations that modify the internal array are thread-safe. This eliminated some random IndexOutOfRangeExceptions in the source that would halt the StreamInsight process.

Checks in the StandardDeviation UDA: Added checks for the number of values as well as the result. If the set for the standard deviation has one value or fewer, the standard deviation is always 0. Also, after the calculation, there’s an additional check on the result to make sure it’s not NaN. This eliminated some random exceptions in the queries that were calculating the standard deviation that would halt the StreamInsight process.

Both cases highlight the need to make sure that your custom code running in StreamInsight is tight and solid. They were pretty difficult to track down as well … both would happen randomly. IntelliTrace was absolutely essential to identifying and resolving the issues. After fixing them, I was able to run for hours without a problem.

Notes on reproducing the demo: I’m not including the site that I used when running the demo. You can get this from the nopCommerce site on CodePlex. I used the out-of-the-box site with sample data. Keep in mind that the module used for the demo forces some of the requests to take 3 seconds – these are our “Bad Actors” – so it’s in no way representative of the performance of nopCommerce. You’ll need to set it up on a local site on port 81 if you want to use the Visual Studio load tests.
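The guard logic described for the StandardDeviation UDA can be sketched in plain C#, outside of the StreamInsight UDA plumbing. This is an illustrative sketch only – the class and method names here are mine, not the ones in the download:

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

public static class StandardDeviationGuard
{
    // Sketch of the two checks described above: a count guard up front
    // and a NaN guard on the result, so bad inputs can't fault the query.
    public static double Compute(IList<double> values)
    {
        // With one value or fewer there is no spread to measure.
        if (values == null || values.Count <= 1)
            return 0.0;

        double mean = values.Average();
        double sumOfSquares = values.Sum(v => (v - mean) * (v - mean));
        double result = Math.Sqrt(sumOfSquares / (values.Count - 1));

        // Guard against NaN leaking out of the calculation.
        return double.IsNaN(result) ? 0.0 : result;
    }
}
```

The same pattern – validate the input set, then validate the result – applies to any custom aggregate you host in the engine.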
From there, you need to copy the WebSiteMonitor.Contracts and WebSiteMonitor.Module assemblies into the \bin folder of the site and add the following into the web.config. Under system.webServer, add the module:

<modules runAllManagedModulesForAllRequests="true">
  <add name="MonitorModule" type="WebSiteMonitor.Module.WebSiteMonitorHttpModule"/>
</modules>

Under system.serviceModel, add the WCF configuration:

<system.serviceModel>
  <serviceHostingEnvironment aspNetCompatibilityEnabled="true" multipleSiteBindingsEnabled="true" />
  <bindings>
    <netTcpBinding>
      <binding name="streamedNetTcpBinding" transferMode="Streamed" />
    </netTcpBinding>
  </bindings>
  <client>
    <endpoint address="net.tcp://localhost/eventService" binding="netTcpBinding"
      bindingConfiguration="streamedNetTcpBinding" contract="WebSiteMonitor.Contracts.IRequestService"
      name="Client" />
  </client>
</system.serviceModel>

You may (probably will) need to specify the StreamInsight 2.1 instance name that you are using. This is in the app.config for the WebSiteMonitor.WinForms project under “applicationSettings”. The setting name is “StreamInsightInstance” (very creative, I know). You’ll want to run the client app “As Administrator” or reserve the URLs for non-administrator users and accounts. If you are running from Visual Studio, run Visual Studio as Administrator. I tend to run as Administrator when testing and running the demo. In the real world, you’d reserve the URLs. The TestWebSite project in the solution is a “New Web Site” template from Visual Studio and helps make sure that everything is set up properly. It also has the configuration settings.

Decoupling Queries from Input and Output

Code Sample | StreamInsight
There’s something that really disturbs me about how all of the StreamInsight samples are put together – everything is very, very tightly coupled. Streams are tightly bound to the source and the target of the data and there’s no good guidance on how to break all that stuff apart. I understand why – it certainly simplifies things – but that’s Bad Mojo™ and not how we want to build enterprise applications.

Let’s take a simple scenario. Let’s say that you are developing queries to detect key events for, say, an offshore well. You have a good idea what these events look like – what follows what – and you even have some recordings that you can play back at high speed to test your algorithms. All of your query code is built much like the demos are, so you’re pretty tightly bound to the source (the database) and your output (the console, just for giggles).

Now … how do you hook this up to your production sources? And you certainly won’t be using a console output for the events that you detect in production. So do you go in, real quick-like, and change all of your code before compiling it for production? If so … let me know how that goes for you. And don’t call me when things start falling apart while you are doing last-minute compiles and emergency deployments and the like.

This, however, is how all of the samples are written. Granted … they are samples and aren’t meant to be “ready for the real world”. But, sadly, there’s precious little guidance out there on a better way to do this in StreamInsight. With that said, however, this isn’t a new problem in software development. Oh, sure, the technology may be new and/or different, but the pattern of the problem certainly isn’t. What we need to do is abstract the specification of the consumer or producer from its usage when creating the streams.
Fortunately, we’ve already defined a pretty well-abstracted factory pattern for constructing our producers and consumers, even in the context of Reactive-based streams, so this helps (and was part of the method to my madness!). In addition to abstracting the producers and consumers, we also need to have a pattern for reuse of query logic. Take this scenario as an example: we’re monitoring offshore platforms in real time. We have a set of queries that we use to process the sensor readings from the platforms and this logic should be applied to each of the platforms – it detects the key events that we’re concerned with. The only difference between each set of queries is the source of the data (different connections), the metadata associated with the queries and, possibly, the outputs for detected events (though, using subjects, we can aggregate the output events into a single output stream).

Enter the Stream Builder

This becomes our unit of query re-use. Using a defined interface and implementing classes based on this interface, we can encapsulate the query logic into a reusable “chunk” that gets built together. We can use this to start and stop the queries/process (depending on which model we are using) as well as to provide configuration for different parameters and the producers and consumers. Let’s start with the base class for configuration.
StreamBuilderConfiguration

public class StreamBuilderConfiguration : IConfiguration
{
    private readonly Dictionary<string, EventComponentDefinition> _eventProducers =
        new Dictionary<string, EventComponentDefinition>();

    public IDictionary<string, EventComponentDefinition> EventProducers
    {
        get { return _eventProducers; }
    }

    private readonly Dictionary<string, EventComponentDefinition> _eventConsumers =
        new Dictionary<string, EventComponentDefinition>();

    public IDictionary<string, EventComponentDefinition> EventConsumers
    {
        get { return _eventConsumers; }
    }

    public string Name { get; set; }

    public virtual EventComponentDefinition DefaultConsumer
    {
        get
        {
            return new EventComponentDefinition()
            {
                ComponentType = typeof(NullDataConsumerConfig),
                Configuration = new NullDataConsumerConfig()
            };
        }
    }
}

First, you’ll notice that we define dictionaries of EventComponentDefinition. What is this? Well, keep in mind that we need a factory and a configuration to create our producers and consumers. So … this is what the EventComponentDefinition class encapsulates.

EventComponentDefinition

public class EventComponentDefinition
{
    public object Configuration { get; set; }
    public Type ComponentType { get; set; }
}

In this case, the type for “ComponentType” is the producer/consumer factory class. So … now we have a way to abstract the definition of the consumers and producers (in the configuration) as well as a way to find them. In case you haven’t guessed yet, this provides an inversion of control that uses a dictionary lookup to locate the appropriate service. Now, the producer and/or consumer must still handle the type of the payload for the stream and we don’t have anything (yet) that checks and/or ensures that this is actually correct and compatible, but we now have a contract for specifying these items and a contract for creation.
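The dictionary-lookup inversion of control can be sketched in isolation: resolve a named EventComponentDefinition from a registry, then instantiate its factory type with Activator.CreateInstance. The ConsoleOutputFactory stand-in here is a toy, not the one from the download:

```csharp
using System;
using System.Collections.Generic;

// Toy copies of the definition class and a factory, so the lookup
// pattern can be shown without the rest of the library.
public class EventComponentDefinition
{
    public object Configuration { get; set; }
    public Type ComponentType { get; set; }
}

public class ConsoleOutputFactory
{
}

public static class ComponentResolver
{
    // The core of the IoC: name -> definition -> factory instance.
    public static object CreateComponent(
        IDictionary<string, EventComponentDefinition> registry, string name)
    {
        EventComponentDefinition definition;
        if (!registry.TryGetValue(name, out definition))
            throw new InvalidOperationException("No component named " + name);

        // The definition carries the factory *type*, not an instance,
        // so callers stay decoupled from the concrete classes.
        return Activator.CreateInstance(definition.ComponentType);
    }
}
```

The configuration object travels alongside the type and is handed to the created factory when the stream is built.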
Finally, so that we have an interface that we can bind to without having to worry about generics, we’ll extract the Start and the Stop methods into an interface. It’s all nicely abstracted now and ready for us to create our stream builder.

StreamBuilder

public interface IStreamBuilder
{
    void Start(Microsoft.ComplexEventProcessing.Application cepApplication);

    void Stop();
}

public abstract class StreamBuilder<TConfiguration> : IStreamBuilder
    where TConfiguration : StreamBuilderConfiguration
{
    protected StreamBuilder(TConfiguration configuration)
    {
        this.Configuration = configuration;
    }

    public TConfiguration Configuration { get; private set; }

    protected Application CurrentApplication { get; set; }

    public abstract void Start(Microsoft.ComplexEventProcessing.Application cepApplication);

    public abstract void Stop();

    protected EventComponentDefinition GetConsumer(string name)
    {
        if (Configuration.EventConsumers.ContainsKey(name))
        {
            return Configuration.EventConsumers[name];
        }
        if (Configuration.DefaultConsumer != null)
        {
            return Configuration.DefaultConsumer;
        }
        throw new InvalidOperationException(string.Format(ExceptionMessages.CONSUMER_NOT_FOUND, name));
    }

    protected EventComponentDefinition GetProducer(string name)
    {
        if (Configuration.EventProducers.ContainsKey(name))
        {
            return Configuration.EventProducers[name];
        }
        throw new InvalidOperationException(string.Format(ExceptionMessages.PRODUCER_NOT_FOUND, name));
    }
}

We also have nice “helper” methods to get the producers and the consumers. Since a consumer isn’t really required, we also have a default consumer – the null consumer – in case it’s not specified. However, the producer is absolutely required, so if it’s not found (not provided), we throw an exception.
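The asymmetry between the two helpers – consumers fall back to a null consumer, producers fail hard – is worth pinning down. A minimal sketch of that contract, using plain strings in place of EventComponentDefinition (names here are illustrative, not from the library):

```csharp
using System;
using System.Collections.Generic;

public class LookupWithFallback
{
    private readonly Dictionary<string, string> _consumers = new Dictionary<string, string>();
    private readonly Dictionary<string, string> _producers = new Dictionary<string, string>();

    public string DefaultConsumer = "null-consumer";

    public void AddConsumer(string name, string value) { _consumers[name] = value; }
    public void AddProducer(string name, string value) { _producers[name] = value; }

    // Consumers are optional: unknown names fall back to the default.
    public string GetConsumer(string name)
    {
        string found;
        if (_consumers.TryGetValue(name, out found)) return found;
        if (DefaultConsumer != null) return DefaultConsumer;
        throw new InvalidOperationException("Consumer not found: " + name);
    }

    // Producers are required: an unknown name is a hard error.
    public string GetProducer(string name)
    {
        string found;
        if (_producers.TryGetValue(name, out found)) return found;
        throw new InvalidOperationException("Producer not found: " + name);
    }
}
```

A stream with nowhere to send its output is still a valid (if quiet) stream; a stream with no input is meaningless, hence the exception.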
And since the interface has both a Start and a Stop, we can use this nice layer of abstraction to manage our running queries in a consistent way, regardless of which API we are using for the queries and without touching the underlying StreamInsight API directly. From here, we’ll create more specific implementations for the Reactive and the Query models.

ProcessBuilder Implementation

With the Reactive model introduced in StreamInsight 2.1, the process is the unit of execution. Related queries are started, executed and stopped as a single unit. You can have multiple bindings in each process for both input and output and they all start and stop together. So … it makes sense that our ProcessBuilder will build a single process with all of the related streams bound together in that process. We’ll also abstract the code that we need to write for every source stream and for binding every output stream. The trick for handling the bindings is simple … have them added to a list that gets built up and then, when it’s time, bind them all together using With and run them in a single process. Of course, abstracting the bindings also allows us to do this pretty easily. Overriding the StreamBuilder’s Start method allows us to wrap up all of the necessary housekeeping to get started as well as to bind all of the streams together and run them in a single process. We’ll also define a “CreateStreams” method (as abstract) … our concrete implementations will override this method to do the work of creating the streams.
ProcessBuilder

public abstract class ProcessBuilder<TConfiguration> : StreamBuilder<TConfiguration>
    where TConfiguration : StreamBuilderConfiguration
{
    protected ProcessBuilder(TConfiguration configuration) : base(configuration)
    {
    }

    private List<IRemoteBinding> _bindings;
    private IDisposable _runningBindings;

    public override void Start(Application cepApplication)
    {
        _bindings = new List<IRemoteBinding>();
        CurrentApplication = cepApplication;
        CreateStreams();

        var bindingList = _bindings.Skip(1).Aggregate(_bindings.First(),
            (current, source) => current.With(source));
        _runningBindings = bindingList.Run(Configuration.Name);
    }

    public override void Stop()
    {
        if (_runningBindings != null)
        {
            _runningBindings.Dispose();
        }
    }

    protected IQStreamable<TPayload> CreateStream<TPayload>(string producerName, EventShape eventShape)
    {
        var producer = Configuration.EventProducers[producerName];
        return RxStream<TPayload>.Create(CurrentApplication, producer.ComponentType,
            producer.Configuration, eventShape);
    }

    protected IQStreamable<TPayload> CreateStream<TPayload>(string producerName, EventShape eventShape,
        AdvanceTimeSettings advanceTimeSettings)
    {
        var producer = Configuration.EventProducers[producerName];
        return RxStream<TPayload>.Create(CurrentApplication, producer.ComponentType,
            producer.Configuration, eventShape, advanceTimeSettings);
    }

    protected void Bind<TPayload>(IQStreamable<TPayload> stream, string consumerName, EventShape eventShape)
    {
        var consumer = GetConsumer(consumerName);
        _bindings.Add(stream.ToBinding(CurrentApplication, consumer, eventShape));
    }

    protected abstract void CreateStreams();
}

Creating a new process builder is super-simple and really cuts down on the amount of code that you need to create and bind your streams.
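The one dense line in Start is the Skip(1)/Aggregate call: a left fold that chains every binding onto the first one via With, producing a single combined binding to Run. The same fold can be shown with a toy binding type (Binding and its With here are stand-ins for IRemoteBinding, purely for illustration):

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

// Toy stand-in for IRemoteBinding: With() just records the combination.
public class Binding
{
    public string Name;
    public Binding(string name) { Name = name; }

    public Binding With(Binding other)
    {
        return new Binding(Name + "+" + other.Name);
    }
}

public static class BindingFold
{
    public static Binding Combine(List<Binding> bindings)
    {
        // Left fold: seed with the first binding, then With() each
        // remaining one onto the accumulated result.
        return bindings.Skip(1).Aggregate(bindings.First(),
            (current, source) => current.With(source));
    }
}
```

With a single binding, Skip(1) yields nothing and Aggregate simply returns the seed, so the one-stream case needs no special handling.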
Both your event producers and consumers are now named resources – the process builder has no idea what the sources or the targets are and it doesn’t need to know.

SampleProcessBuilder

public class SampleProcessBuilder : ProcessBuilder<SampleStreamBuilderConfig>
{
    public SampleProcessBuilder(SampleStreamBuilderConfig configuration)
        : base(configuration)
    {
    }

    protected override void CreateStreams()
    {
        var data = CreateStream<TestDataEvent>("SourceData",
                        EventShape.Point,
                        Configuration.GetAdvanceTimeSettings());

        Bind(data, "Output", EventShape.Point);

        var aggregate = from d in data
                        group d by d.ItemId into itemGroups
                        from i in itemGroups.HoppingWindow(TimeSpan.FromSeconds(10), TimeSpan.FromSeconds(2))
                        select new
                        {
                            ItemId = itemGroups.Key,
                            Average = i.Avg(e => e.Value)
                        };

        Bind(aggregate, "Aggregate", EventShape.Point);
    }
}

CepStreamBuilder Implementation

In keeping with our principle to keep the APIs as close as possible, we’ll also have a CepStreamBuilder that looks and acts the same way as our ProcessBuilder. However, we have a little extra work to do, since the pre-2.1 model has no concept of a process with multiple inputs and outputs that run as a single unit of execution. Prior to the Reactive model, the query was the unit of execution – it could have multiple input streams but only one output. Each query was independent of all other queries as well and there was no way to conveniently run related queries together. Reuse of query outputs was done through a technique called Dynamic Query Composition (DQC) that re-enqueued the output of one query back into a new stream, which then fed another query.
If you looked at the details of how this was done, it was a special input/output adapter pair. In essence, it was a subject without the flexibility or extensibility that we have with subjects in the Reactive model. Finally, we’ll also need to take into consideration the potential for name clashes when we name our queries – not something that we needed to worry about when working with the process container – and a way to “group” our related queries together – something the process container does inherently but not something that the query model did for you.

QueryBuilder

public abstract class QueryBuilder<TConfiguration> : StreamBuilder<TConfiguration>
    where TConfiguration : StreamBuilderConfiguration
{
    private List<Query> _queries;

    protected QueryBuilder(TConfiguration configuration) : base(configuration)
    {
    }

    public override void Start(Application cepApplication)
    {
        _queries = new List<Query>();
        CurrentApplication = cepApplication;
        CreateStreams();

        //Start all of the queries.
        foreach (var query in _queries)
        {
            query.Start();
        }
    }

    public override void Stop()
    {
        foreach (var query in _queries)
        {
            query.Stop();
        }
        foreach (var query in _queries)
        {
            query.Delete();
        }
        _queries = null;
    }

    protected abstract void CreateStreams();

    protected CepStream<TPayload> CreateStream<TPayload>(string producerName, EventShape eventShape)
    {
        var producer = Configuration.EventProducers[producerName];
        return CepStream<TPayload>.Create(CurrentApplication,
            GetSourceStreamName(producerName), producer.ComponentType,
            producer.Configuration, eventShape);
    }

    protected CepStream<TPayload> CreateStream<TPayload>(string producerName, EventShape eventShape,
        AdvanceTimeSettings advanceTimeSettings)
    {
        var producer = Configuration.EventProducers[producerName];
        return CepStream<TPayload>.Create(CurrentApplication,
            GetSourceStreamName(producerName), producer.ComponentType,
            producer.Configuration, eventShape, advanceTimeSettings);
    }

    protected void Bind<TPayload>(CepStream<TPayload> stream, string consumerName, EventShape eventShape)
    {
        var consumer = GetConsumer(consumerName);
        _queries.Add(stream.ToQuery(CurrentApplication, GetQueryName(consumerName),
            GetQueryDescription(consumerName), consumer.ComponentType,
            consumer.Configuration, eventShape, StreamEventOrder.FullyOrdered));
    }

    private string GetQueryDescription(string consumerName)
    {
        return "Query from QueryBuilder [" + Configuration.Name + "] for consumer [" + consumerName + "]";
    }

    protected string GetQueryName(string consumerName)
    {
        return Configuration.Name + "." + consumerName;
    }

    protected string GetSourceStreamName(string producerName)
    {
        return Configuration.Name + "." + producerName + ".Stream";
    }
}

And now, creating a query builder is, like the process builder, super-simple. In fact, we can copy and paste the code from the process builder into the query builder. The only difference is the name of the base class.

SampleQueryBuilder

public class SampleQueryBuilder : QueryBuilder<SampleStreamBuilderConfig>
{
    public SampleQueryBuilder(SampleStreamBuilderConfig configuration) : base(configuration)
    {
    }

    protected override void CreateStreams()
    {
        var data = CreateStream<TestDataEvent>("SourceData",
                        EventShape.Point,
                        Configuration.GetAdvanceTimeSettings());

        Bind(data, "Output", EventShape.Point);

        var aggregate = from d in data
                        group d by d.ItemId into itemGroups
                        from i in itemGroups.HoppingWindow(TimeSpan.FromSeconds(10), TimeSpan.FromSeconds(2))
                        select new
                        {
                            ItemId = itemGroups.Key,
                            Average = i.Avg(e => e.Value)
                        };

        Bind(aggregate, "Aggregate", EventShape.Point);
    }
}

Using the Stream Builders

Again, this is pretty simple. You create the configuration. You create the builder. You start the builder. You stop the builder. Regardless of whether it’s a QueryBuilder or a ProcessBuilder, the code to do this is the same and it binds to the StreamBuilder base class, rather than the specific implementations. Most of the code is now, actually, in creating the configuration to pass to the builder. Right now this is hard-coded but, in time, it won’t be; we’re pretty close to having all of the core, foundational pieces in place to further abstract even the configuration.
RunBuilder

private static void RunBuilder(Application cepApplication, Type builderType)
{
    var builderConfig = GetSampleStreamBuilderConfig();
    var builder = (IStreamBuilder) Activator.CreateInstance(builderType, builderConfig);
    builder.Start(cepApplication);

    Console.WriteLine("Builder is running. Press ENTER to stop.");
    Console.ReadLine();
    builder.Stop();
}

private static SampleStreamBuilderConfig GetSampleStreamBuilderConfig()
{
    //Create the configuration.
    var streamProviderConfig = new SampleStreamBuilderConfig()
    {
        Name = "Provider1",
        CtiDelay = TimeSpan.FromMilliseconds(750),
        CtiInterval = TimeSpan.FromMilliseconds(250)
    };

    //Add the producer.
    streamProviderConfig.EventProducers.Add("SourceData", new EventComponentDefinition()
    {
        Configuration = new TestDataInputConfig()
        {
            NumberOfItems = 20,
            RefreshInterval = TimeSpan.FromMilliseconds(500),
            TimestampIncrement = TimeSpan.FromMilliseconds(500),
            AlwaysUseNow = true,
            EnqueueCtis = false
        },
        ComponentType = typeof(TestDataInputFactory)
    });

    //Add the consumers.
    streamProviderConfig.EventConsumers.Add("Output", new EventComponentDefinition()
    {
        Configuration = new ConsoleOutputConfig()
        {
            ShowCti = true,
            CtiEventColor = ConsoleColor.Blue,
            InsertEventColor = ConsoleColor.Green
        },
        ComponentType = typeof(ConsoleOutputFactory)
    });
    streamProviderConfig.EventConsumers.Add("Aggregate", new EventComponentDefinition()
    {
        Configuration = new ConsoleOutputConfig()
        {
            ShowCti = true,
            CtiEventColor = ConsoleColor.White,
            InsertEventColor = ConsoleColor.Yellow
        },
        ComponentType = typeof(ConsoleOutputFactory)
    });
    return streamProviderConfig;
}

Wrapping Up

We’ve made a good deal of progress with this revision.
We focused first on creating abstractions for the producers and consumers – they’re the beginning and the end, after all, of your StreamInsight application – and now added a framework for the chewy middle. I had a couple of questions from folks as to why I made some of the architectural decisions, particularly with having factories for the Reactive-model sinks and producers. Hopefully some of that is a little clearer now but, as we move forward, there will be additional things that we may do with the factories.

There are also some changes and tweaks included in this download that aren’t described in the blog, primarily around the serialization/deserialization for the event consumers and producers, that were done while I was building the demo for Baton Rouge SQL Saturday. There’s still some work to do around these to optimize the performance, particularly for the serialization side of the equation, that I’ll get to sometime later. I was, however, quite proud that I was able to shave about 3 ticks off the deserialization performance. Now, this doesn’t sound like much (and it’s not) but when you realize that you may be doing this process tens of thousands of times per second, it adds up really quickly.

So, what’s next? There are two key pieces that we’ll need to put into place … abstracting the application hosting mode and then a configuration system. And then lots to build out … common query patterns in Linq macros, some aggregates and operators and building out more of the stream builder classes/model, to name a couple. Oh, and let’s not forget that we’ll need to have some of the basic, common event producers and consumers.

Last, but not least, you can download the code from here:

How long did that edge event take? – The Sequel

Code Sample | StreamInsight
This is beginning to become a recurring theme around here. It turns out that there is yet another way to get a composite event that will tell you how long a particular event “took”. This goes back to the initial post, where I was using a subject to create a new event that included the original payload but with a Duration property, allowing you to get the total milliseconds for a specific event. You see, subjects have become one of my mostest favoritest features with the whole Reactive model in StreamInsight 2.1; there’s just so much that they let you do that you couldn’t do before. But this particular scenario was possible without subjects and it really should have smacked me in the face. Rather than using a subject, we can use a UDSO (specifically, an edge stream operator) to achieve the exact same thing but with fewer moving parts. Oh … and using a UDSO doesn’t create an independent timeline, whereas using the subject does create a new timeline in the process. Overall, the UDSO is far simpler and easier to understand, and you don’t have to worry about any CTI violations or funkiness thereabouts. With that, here’s the operator:

EventDurationOperator

public class DurationEvent<TPayload>
{
    public TPayload Payload;
    public double TotalMilliseconds;
}

[DataContract]
public sealed class EventDurationOperator<TPayload> : CepEdgeStreamOperator<TPayload, DurationEvent<TPayload>>
{
    public override bool IsEmpty
    {
        get { return true; }
    }

    public override DateTimeOffset? NextCti
    {
        get { return null; }
    }

    public override IEnumerable<DurationEvent<TPayload>> ProcessEvent(EdgeEvent<TPayload> inputEvent)
    {
        if (inputEvent.EdgeType == EdgeType.End)
        {
            //Create the new duration event.
            yield return new DurationEvent<TPayload>
            {
                Payload = inputEvent.Payload,
                TotalMilliseconds = (inputEvent.EndTime - inputEvent.StartTime).TotalMilliseconds
            };
        }
    }
}

As you can see, the output here is identical to the output from the previous solution but the code is far, far simpler. Using the operator is simpler than the subject as well.

Using the Operator

var source = items.ToEdgeStreamable(e => e.GetEvent(startTime), AdvanceTimeSettings.IncreasingStartTime);
var results = source.Scan(() => new EventDurationOperator<DataItem>());

That makes the operator, rather than the subject, the winner here … it keeps true to that Golden Rule of Programming – the KISS principle. (That’s Keep It Simple, Stupid, in case you were wondering.)
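The heart of the operator – skip start edges, yield a duration event for each end edge – runs fine outside the engine. Here is that core as a plain iterator over a toy edge-event type (ToyEdgeEvent is a stand-in for StreamInsight’s EdgeEvent<T>, which can’t be constructed directly):

```csharp
using System;
using System.Collections.Generic;

public enum EdgeType { Start, End }

// Toy stand-in for EdgeEvent<T>: only End edges carry a valid EndTime.
public class ToyEdgeEvent<TPayload>
{
    public EdgeType EdgeType;
    public DateTimeOffset StartTime;
    public DateTimeOffset EndTime;
    public TPayload Payload;
}

public class DurationEvent<TPayload>
{
    public TPayload Payload;
    public double TotalMilliseconds;
}

public static class DurationLogic
{
    // Same shape as ProcessEvent: start edges produce nothing,
    // end edges produce one duration event.
    public static IEnumerable<DurationEvent<TPayload>> Process<TPayload>(
        IEnumerable<ToyEdgeEvent<TPayload>> edges)
    {
        foreach (var edge in edges)
        {
            if (edge.EdgeType == EdgeType.End)
            {
                yield return new DurationEvent<TPayload>
                {
                    Payload = edge.Payload,
                    TotalMilliseconds = (edge.EndTime - edge.StartTime).TotalMilliseconds
                };
            }
        }
    }
}
```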

Dual-Mode Data Sinks - Part I

Code Sample | StreamInsight
Now that we have the input completed, we need to start working on the output adapters. As with the input adapters/sources, we’ll create an architecture that allows you to use the same core code whether you are using the pre-2.1 adapter model or the 2.1-and-later sink model. As with our StreamInputEvent, we’ll create an abstraction that allows us to handle any event shape with the same code. This StreamOutputEvent should have properties that express all of the possible variations in event shapes as well as a generic property to hold the payload.

Now, if you look at several of the StreamInsight samples, you’ll notice that they only send the payload to the sink. Certainly, that makes a couple of things easier but I don’t think that it’s really the best way to do things. A key aspect of everything StreamInsight is the temporal properties and you lose that if you don’t send the full event shape to your sink. And … the shape is really only a way of expressing the event. Internally, all events, regardless of shape, have start and end times that control their lifetime in the query engine. The shape really becomes a way of how you want to “see” the events and handle them in your output. There’s nothing that stops you from expressing the same query as an edge, a point or an interval. It will impact when the event gets “released” to the output adapters/sink by the engine but really, it’s just a matter of how you want to see the event and when you want to get it to your output. But, before I go any further, here’s our StreamOutputEvent:

StreamOutputEvent

public class StreamOutputEvent<TPayload>
{
    /// <summary>
    /// Creates an output event from a source event.
    /// </summary>
    /// <param name="sourceEvent">The source event.</param>
    /// <returns></returns>
    public static StreamOutputEvent<TPayload> Create(PointEvent<TPayload> sourceEvent)
    {
        var outputEvent = new StreamOutputEvent<TPayload>()
        {
            StartTime = sourceEvent.StartTime,
            EventKind = sourceEvent.EventKind,
            EventShape = EventShape.Point
        };
        if (sourceEvent.EventKind == EventKind.Insert)
        {
            outputEvent.Payload = sourceEvent.Payload;
        }
        return outputEvent;
    }

    /// <summary>
    /// Creates an output event from a source event.
    /// </summary>
    /// <param name="sourceEvent">The source event.</param>
    /// <returns></returns>
    public static StreamOutputEvent<TPayload> Create(IntervalEvent<TPayload> sourceEvent)
    {
        var outputEvent = new StreamOutputEvent<TPayload>()
        {
            StartTime = sourceEvent.StartTime,
            EventKind = sourceEvent.EventKind,
            EventShape = EventShape.Interval
        };
        if (sourceEvent.EventKind == EventKind.Insert)
        {
            outputEvent.EndTime = sourceEvent.EndTime;
            outputEvent.Payload = sourceEvent.Payload;
        }
        return outputEvent;
    }

    /// <summary>
    /// Creates an output event from a source event.
    /// </summary>
    /// <param name="sourceEvent">The source event.</param>
    /// <returns></returns>
    public static StreamOutputEvent<TPayload> Create(EdgeEvent<TPayload> sourceEvent)
    {
        var outputEvent = new StreamOutputEvent<TPayload>()
        {
            StartTime = sourceEvent.StartTime,
            EventKind = sourceEvent.EventKind,
            EventShape = EventShape.Edge
        };
        if (sourceEvent.EventKind == EventKind.Insert)
        {
            outputEvent.Payload = sourceEvent.Payload;
            outputEvent.EdgeType = sourceEvent.EdgeType;
            if (sourceEvent.EdgeType == Microsoft.ComplexEventProcessing.EdgeType.End)
            {
                outputEvent.EndTime = sourceEvent.EndTime;
            }
        }
        return outputEvent;
    }

    public DateTimeOffset StartTime { get; private set; }
    public EventKind EventKind { get; private set; }
    public DateTimeOffset? EndTime { get; private set; }
    public EventShape EventShape { get; private set; }
    public EdgeType? EdgeType { get; private set; }
    public TPayload Payload { get; private set; }
}

With factory methods for each different event shape, this is something that we can easily create from an event stream and then send to a single set of code that handles the outbound event. When creating the sink and hooking it to the stream, it’s a matter of how you create your observer and the type specified for TElement. Very simply, the key thing that dictates what StreamInsight “sends” to your sink is the type of the observer’s generic class parameter. If you specify IObserver<TPayload>, you’ll only get the payload. However, if you specify IObserver<PointEvent<TPayload>>, you’ll get a point event (and so on for intervals and edges). Since our event consumer should be able to consume events of any shape, we will actually need to implement the observer interface for each of the shapes.
While it may be tempting to try to implement one interface based on TypedEvent<T>, it won't work. Yes, I tried. StreamInsight requires that your sinks specify either the payload type or one of the TypedEvent<T> child classes as the observer's type parameter. If you don't specify an event shape, StreamInsight will handle the events and release them to the output adapter as though they were point events. For some very basic scenarios, this works. But when you start getting into some of the more interesting scenarios for StreamInsight, you'll want to do a lot more with your output than view it as a point. But … that's for the next post. Let's get back to our implementation. As we did with our input producer, we'll create a common, abstract base class for all of our event consumers. Our concrete consumers will inherit from this class and handle whatever is necessary to write the data to the target, whatever it may be. Again, as with the producer, we'll specify a configuration class; while the reactive StreamInsight API no longer requires one, the reality is that you will want a configuration class for your sinks – you don't want to hard-code things like database connection strings or web service target URIs in your event consumers. The key thing that our base class will do is implement the observer interface for each of the event shapes, translate the incoming events to our StreamOutputEvent, and then pass them on to the concrete event consumer's code. 
StreamEventConsumer

public abstract class StreamEventConsumer<TPayloadType, TConfigType> :
    IObserver<PointEvent<TPayloadType>>,
    IObserver<EdgeEvent<TPayloadType>>,
    IObserver<IntervalEvent<TPayloadType>>
{
    protected StreamEventConsumer(TConfigType configuration)
    {
        this.Configuration = configuration;
    }

    public TConfigType Configuration { get; private set; }

    public abstract void Completed();

    public abstract void Error(Exception error);

    public abstract void EventReceived(StreamOutputEvent<TPayloadType> outputEvent);

    public void OnNext(PointEvent<TPayloadType> value)
    {
        EventReceived(StreamOutputEvent<TPayloadType>.Create(value));
    }

    public void OnNext(IntervalEvent<TPayloadType> value)
    {
        EventReceived(StreamOutputEvent<TPayloadType>.Create(value));
    }

    public void OnNext(EdgeEvent<TPayloadType> value)
    {
        EventReceived(StreamOutputEvent<TPayloadType>.Create(value));
    }

    public void OnCompleted()
    {
        Completed();
    }

    public void OnError(Exception error)
    {
        Error(error);
    }
}

From here, we'll continue to build the architecture much the same way that we built the event producers. For each consumer, we'll have a factory that handles the details of creating and starting the consumer based on a common interface.

ISinkFactory

interface ISinkFactory
{
    IObserver<PointEvent<TPayload>> CreatePointObserverSink<TPayload>(object config);
    IObserver<EdgeEvent<TPayload>> CreateEdgeObserverSink<TPayload>(object config);
    IObserver<IntervalEvent<TPayload>> CreateIntervalObserverSink<TPayload>(object config);
}

Between StreamEventConsumer and ISinkFactory, it's now a small step to create a concrete factory and consumer – as well as the adapters. For simplicity's sake, we'll use a console consumer. 
Console Data Consumer

public class ConsoleDataConsumer<TPayloadType> : StreamEventConsumer<TPayloadType, ConsoleOutputConfig>
{
    public ConsoleDataConsumer(ConsoleOutputConfig configuration) : base(configuration)
    {
    }

    public override void Completed()
    {
        //Nothing necessary.
    }

    public override void Error(Exception error)
    {
        Console.WriteLine("Error occurred:" + error.ToString());
    }

    public override void EventReceived(StreamOutputEvent<TPayloadType> outputEvent)
    {
        if (outputEvent.EventKind == EventKind.Insert)
        {
            Console.ForegroundColor = Configuration.InsertEventColor;
            Console.WriteLine("Insert Event Received at " + outputEvent.StartTime);
        }
        else if (Configuration.ShowCti)
        {
            Console.ForegroundColor = Configuration.CtiEventColor;
            Console.WriteLine("CTI event received at " + outputEvent.StartTime);
        }
        Console.ResetColor();
    }
}

Our factory handles the dirty details of hooking up to our source streams as well as our output adapters. By forcing the factory to implement methods for each event shape, we ensure that we get the interface we need when creating the sink and tying it to our streams, and we also get the opportunity to state, in code, that a particular output sink doesn't support specific shapes, should that be appropriate. 
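The ConsoleOutputConfig class itself isn't shown in these listings. A minimal version consistent with the properties the consumer actually uses (ShowCti, InsertEventColor, CtiEventColor) might look like the following – treat this as a sketch rather than the original source:

public class ConsoleOutputConfig
{
    // Whether CTI (Current Time Increment) events are echoed to the console.
    public bool ShowCti { get; set; }

    // Console color used for Insert events.
    public ConsoleColor InsertEventColor { get; set; }

    // Console color used for CTI events.
    public ConsoleColor CtiEventColor { get; set; }
}

Since it travels across the DefineObserver expression, keeping it a simple property bag with no behavior is the safe choice.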
ConsoleOutputFactory

public class ConsoleOutputFactory : ISinkFactory, ITypedOutputAdapterFactory<ConsoleOutputConfig>
{
    public IObserver<PointEvent<TPayload>> CreatePointObserverSink<TPayload>(object config)
    {
        return new ConsoleDataConsumer<TPayload>((ConsoleOutputConfig)config);
    }

    public IObserver<EdgeEvent<TPayload>> CreateEdgeObserverSink<TPayload>(object config)
    {
        return new ConsoleDataConsumer<TPayload>((ConsoleOutputConfig)config);
    }

    public IObserver<IntervalEvent<TPayload>> CreateIntervalObserverSink<TPayload>(object config)
    {
        return new ConsoleDataConsumer<TPayload>((ConsoleOutputConfig)config);
    }

    public OutputAdapterBase Create<TPayload>(ConsoleOutputConfig configInfo, EventShape eventShape)
    {
        switch (eventShape)
        {
            case EventShape.Interval:
                return new ObserverTypedIntervalOutputAdapter<TPayload>(CreateIntervalObserverSink<TPayload>(configInfo));
            case EventShape.Edge:
                return new ObserverTypedEdgeOutputAdapter<TPayload>(CreateEdgeObserverSink<TPayload>(configInfo));
            case EventShape.Point:
                return new ObserverTypedPointOutputAdapter<TPayload>(CreatePointObserverSink<TPayload>(configInfo));
            default:
                throw new ArgumentOutOfRangeException("eventShape");
        }
    }

    public void Dispose()
    {
        //throw new NotImplementedException();
    }
}

We've not touched on the output adapters yet, so now's the time to introduce them; they are, after all, already referenced in our factory. As we did with our producers, we have a single factory that supports both the 2.1+ reactive model and the legacy adapter model. 
As with our input adapters, the output adapters are relatively thin wrappers around our event consumers that handle the details of adapter lifetime. Unlike the input adapters, the output adapters may well receive some data after our "stop" event, and we want to make sure that we dequeue all events before shutting down. To control this a little better, we use Monitor.Enter and Monitor.Exit directly rather than the basic lock{} block provided by C#. The lock block, by the way, creates a Monitor.Enter/Monitor.Exit pair behind the scenes. Using the Monitor class directly allows us to minimize the possibility of deadlocks if we are actively dequeuing events when we get a Resume call. By using Monitor.TryEnter(), we can attempt to enter the dequeuing code from other threads without blocking: if the lock has already been acquired, we don't need to spin up another thread to dequeue, and we certainly don't need to block waiting for a lock that we won't actually need once we get it. Our dequeue thread will continue to dequeue one event at a time until nothing is left in the queue. We also need to make sure that the dequeue operation is synchronized – only one thread can dequeue at a time anyway. Adding more threads to the dequeue operation typically won't help us, and we want to keep all available threads free to process actual query results. Now … once we've dequeued, we may want to use techniques to multi-thread sending the results to the final target, but the actual dequeue from each query/process should be single-threaded. Keep in mind, however, that you'll have multiple, single-threaded output sinks in most real-world applications, so you will still be multi-threaded; have no worries there. And calls into our event consumers can come from any thread, which is why we need locks to make sure that we're properly synchronized. This is particularly important when our output adapter is stopped. 
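As an aside, the two locking styles mentioned above look like this side by side. This is a standalone sketch of the pattern, not the adapter code itself; the compiler-generated form of lock{} shown here is the .NET 4.0+ expansion:

object gate = new object();

// What lock (gate) { /* work */ } compiles down to, roughly:
bool lockTaken = false;
try
{
    Monitor.Enter(gate, ref lockTaken);
    // ... drain the queue ...
}
finally
{
    if (lockTaken) Monitor.Exit(gate);
}

// The non-blocking variant used by the adapter: if another thread is
// already draining the queue, walk away instead of waiting on the lock.
if (Monitor.TryEnter(gate))
{
    try
    {
        // ... drain the queue ...
    }
    finally
    {
        Monitor.Exit(gate);
    }
}

The first form always waits for the lock; the second returns false immediately when the lock is held, which is exactly the "don't block, someone else is already dequeuing" behavior we want.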
After Stop() is called, we'll get one more chance to empty our queue. We use the monitor to make sure that we empty all available events from the queue before calling Stopped(). This ensures that we'll have a nice, clean shutdown with no hangs and no ObjectDisposedExceptions.

Point Output Adapter

public class ObserverTypedPointOutputAdapter<TPayloadType>
    : TypedPointOutputAdapter<TPayloadType>
{
    private readonly IObserver<PointEvent<TPayloadType>> _sinkObserver;

    public ObserverTypedPointOutputAdapter(IObserver<PointEvent<TPayloadType>> sinkObserver)
    {
        _sinkObserver = sinkObserver;
    }

    public override void Stop()
    {
        try
        {
            Monitor.Enter(_monitorObject);
            //One last round to dequeue
            EmptyQueue();
            //Completed
            _sinkObserver.OnCompleted();
        }
        finally
        {
            Monitor.Exit(_monitorObject);
        }
        base.Stop();
        Stopped();
    }

    public override void Resume()
    {
        System.Threading.Thread thd = new Thread(DequeueEvents);
        thd.Start();
    }

    public override void Start()
    {
        System.Threading.Thread thd = new Thread(DequeueEvents);
        thd.Start();
    }

    private object _monitorObject = new object();

    private void DequeueEvents()
    {
        if (this.AdapterState != AdapterState.Running)
        {
            return;
        }
        //Ensures only 1 thread is dequeuing and no other threads are blocked.
        if (Monitor.TryEnter(_monitorObject))
        {
            try
            {
                EmptyQueue();
            }
            catch (Exception ex)
            {
                _sinkObserver.OnError(ex);
            }
            finally
            {
                Monitor.Exit(_monitorObject);
                this.Ready();
            }
        }
    }

    private void EmptyQueue()
    {
        PointEvent<TPayloadType> dequeuedEvent;

        while (this.Dequeue(out dequeuedEvent) == DequeueOperationResult.Success)
        {
            _sinkObserver.OnNext(dequeuedEvent);
        }
    }
}

Now that we have all of the core pieces in place, let's take a look at what we need to do to hook our sink up to the console output. It's actually very simple.

Hooking up to a stream

private static void RunProcess(Application cepApplication)
{
    var config = new TestDataInputConfig()
    {
        NumberOfItems = 20,
        RefreshInterval = TimeSpan.FromMilliseconds(500)
    };
    var data = RxStream<TestDataEvent>.Create(cepApplication, typeof(TestDataInputFactory),
        config, EventShape.Point);
    var factory = new ConsoleOutputAdapter.ConsoleOutputFactory();
    var sink = cepApplication.DefineObserver(() => factory.CreatePointObserverSink<TestDataEvent>(
        new ConsoleOutputAdapter.ConsoleOutputConfig()
        {
            ShowCti = true,
            CtiEventColor = ConsoleColor.Blue,
            InsertEventColor = ConsoleColor.Green
        }));
    data.Bind(sink).Run();
}

There's one thing that you may notice … the 
sink needs to know all of the details of the data class. This is far from ideal … and avoiding it was one of the things that I found so powerful about the untyped adapter model – you weren't tied to the schema of your data classes. There are various ways that we can handle this, but that's a topic for the next entry. Until then, you can download the code from my SkyDrive.