Ruminations of idle rants and ramblings of a code monkey

Dual-Mode Data Sinks - Part I

Code Sample | StreamInsight
Now that we have the input completed, we need to start working on the output adapters. As with the input adapters/sources, we’ll create an architecture that allows you to use the same core code whether you are using the pre-2.1 adapter model or the 2.1-and-later sink model. As with our StreamInputEvent, we’ll create an abstraction that allows us to handle any event shape with the same code. This StreamOutputEvent should have properties that express all of the possible variations in event shapes as well as a generic property to hold the payload. Now, if you look at several of the StreamInsight samples, you’ll notice that they only send the payload to the sink. Certainly, that makes a couple of things easier but I don’t think that it’s really the best way to do things. A key aspect of everything StreamInsight is the temporal properties, and you lose those if you don’t send the full event shape to your sink. And … the shape is really only a way of expressing the event. Internally, all events, regardless of shape, have start and end times that control their lifetime in the query engine. The shape really becomes a matter of how you want to “see” the events and handle them in your output. There’s nothing that stops you from expressing the same query as an edge, a point or an interval. It will impact when the event gets “released” to the output adapters/sink by the engine but, really, it’s just a matter of how you want to see the event and when you want to get it to your output. But, before I go any further, here’s our StreamOutputEvent:

StreamOutputEvent

```csharp
public class StreamOutputEvent<TPayload>
{
    /// <summary>
    /// Creates an output event from a source event.
    /// </summary>
    /// <param name="sourceEvent">The source event.</param>
    /// <returns></returns>
    public static StreamOutputEvent<TPayload> Create(PointEvent<TPayload> sourceEvent)
    {
        var outputEvent = new StreamOutputEvent<TPayload>()
        {
            StartTime = sourceEvent.StartTime,
            EventKind = sourceEvent.EventKind,
            EventShape = EventShape.Point
        };
        if (sourceEvent.EventKind == EventKind.Insert)
        {
            outputEvent.Payload = sourceEvent.Payload;
        }
        return outputEvent;
    }

    /// <summary>
    /// Creates an output event from a source event.
    /// </summary>
    /// <param name="sourceEvent">The source event.</param>
    /// <returns></returns>
    public static StreamOutputEvent<TPayload> Create(IntervalEvent<TPayload> sourceEvent)
    {
        var outputEvent = new StreamOutputEvent<TPayload>()
        {
            StartTime = sourceEvent.StartTime,
            EventKind = sourceEvent.EventKind,
            EventShape = EventShape.Interval
        };
        if (sourceEvent.EventKind == EventKind.Insert)
        {
            outputEvent.EndTime = sourceEvent.EndTime;
            outputEvent.Payload = sourceEvent.Payload;
        }
        return outputEvent;
    }

    /// <summary>
    /// Creates an output event from a source event.
    /// </summary>
    /// <param name="sourceEvent">The source event.</param>
    /// <returns></returns>
    public static StreamOutputEvent<TPayload> Create(EdgeEvent<TPayload> sourceEvent)
    {
        var outputEvent = new StreamOutputEvent<TPayload>()
        {
            StartTime = sourceEvent.StartTime,
            EventKind = sourceEvent.EventKind,
            EventShape = EventShape.Edge
        };
        if (sourceEvent.EventKind == EventKind.Insert)
        {
            outputEvent.Payload = sourceEvent.Payload;
            outputEvent.EdgeType = sourceEvent.EdgeType;
            if (sourceEvent.EdgeType == Microsoft.ComplexEventProcessing.EdgeType.End)
            {
                outputEvent.EndTime = sourceEvent.EndTime;
            }
        }
        return outputEvent;
    }

    public DateTimeOffset StartTime { get; private set; }
    public EventKind EventKind { get; private set; }
    public DateTimeOffset? EndTime { get; private set; }
    public EventShape EventShape { get; private set; }
    public EdgeType? EdgeType { get; private set; }
    public TPayload Payload { get; private set; }
}
```

With a Create factory method for each event shape, this is something that we can easily create from an event stream and then send to a single set of code that handles the outbound event. When creating the sink and hooking it to the stream, it’s a matter of how you create your observer and the type specified for TElement. Very simply, the key thing that dictates what StreamInsight “sends” to your sink is the type of the observer’s generic class parameter. If you specify IObserver<TPayload>, you’ll only get the payload. However, if you specify IObserver<PointEvent<TPayload>>, you’ll get a point event (and so on for intervals and edges). Since our event consumer should be able to consume events of any shape, we will actually need to implement the observer interface for each of the shapes.
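To make the difference concrete, here is a minimal sketch (not code from the download; TestDataEvent stands in for your payload type) of two observers that receive different things from the same stream, purely because of their type parameters:

```csharp
// Sketch only - TestDataEvent is a placeholder payload type.
// Payload only: the sink sees just the data, no temporal header.
var payloadSink = cepApplication.DefineObserver(
    () => Observer.Create<TestDataEvent>(
        e => Console.WriteLine("Payload received")));

// Full point events: the sink also sees StartTime and the CTIs.
var pointSink = cepApplication.DefineObserver(
    () => Observer.Create<PointEvent<TestDataEvent>>(
        e => Console.WriteLine("{0} at {1}", e.EventKind, e.StartTime)));
```

The second form is what we want here; it keeps the temporal properties available all the way to the consumer.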
While it may be tempting to try to implement one interface based on TypedEvent<T>, it won’t work. Yes, I tried. StreamInsight requires that your sinks specify either the payload type or one of the TypedEvent<T> child classes as the observer’s type parameter. If you don’t specify an event shape, StreamInsight will handle the events and release them to the output adapter as though they were point events. For some very basic scenarios, this works. But when you start getting into some of the more interesting scenarios for StreamInsight, you’ll want to do a lot more with your output than view it as a point. But … that’s for the next post. Let’s get back to our implementation. As we did with our input producer, we’ll create a common, abstract base class for all of our event consumers. Our concrete consumers will inherit from this class and handle whatever is necessary to write the data to our target, whatever it may be. Again, as with the producer, we’ll specify a configuration class; while the Reactive StreamInsight API no longer requires it, the reality is that you will want a configuration class for your sinks; you don’t want to hard-code things like database connection strings or web service target URIs in your event consumers. But the key thing that our base class will do is implement the interface for each of the event shapes, translate the events to our StreamOutputEvent and then send them on to our actual event consumer’s code.
StreamEventConsumer

```csharp
public abstract class StreamEventConsumer<TPayloadType, TConfigType> :
    IObserver<PointEvent<TPayloadType>>,
    IObserver<EdgeEvent<TPayloadType>>,
    IObserver<IntervalEvent<TPayloadType>>
{
    protected StreamEventConsumer(TConfigType configuration)
    {
        this.Configuration = configuration;
    }

    public TConfigType Configuration { get; private set; }

    public abstract void Completed();

    public abstract void Error(Exception error);

    public abstract void EventReceived(StreamOutputEvent<TPayloadType> outputEvent);

    public void OnNext(PointEvent<TPayloadType> value)
    {
        EventReceived(StreamOutputEvent<TPayloadType>.Create(value));
    }

    public void OnNext(IntervalEvent<TPayloadType> value)
    {
        EventReceived(StreamOutputEvent<TPayloadType>.Create(value));
    }

    public void OnNext(EdgeEvent<TPayloadType> value)
    {
        EventReceived(StreamOutputEvent<TPayloadType>.Create(value));
    }

    public void OnCompleted()
    {
        Completed();
    }

    public void OnError(Exception error)
    {
        Error(error);
    }
}
```

From here, we’ll continue to build the architecture much the same way that we built the event producers. For each consumer, we’ll have a factory that handles the details of creating and starting the consumer based on a common interface.

ISinkFactory

```csharp
public interface ISinkFactory
{
    IObserver<PointEvent<TPayload>> CreatePointObserverSink<TPayload>(object config);
    IObserver<EdgeEvent<TPayload>> CreateEdgeObserverSink<TPayload>(object config);
    IObserver<IntervalEvent<TPayload>> CreateIntervalObserverSink<TPayload>(object config);
}
```

Between StreamEventConsumer and ISinkFactory, it’s now a small step to create a concrete factory and consumer – as well as the adapters. For simplicity’s sake, we’ll use a console consumer.
Console Data Consumer

```csharp
public class ConsoleDataConsumer<TPayloadType> : StreamEventConsumer<TPayloadType, ConsoleOutputConfig>
{
    public ConsoleDataConsumer(ConsoleOutputConfig configuration) : base(configuration)
    {
    }

    public override void Completed()
    {
        //Nothing necessary.
    }

    public override void Error(Exception error)
    {
        Console.WriteLine("Error occurred:" + error.ToString());
    }

    public override void EventReceived(StreamOutputEvent<TPayloadType> outputEvent)
    {
        if (outputEvent.EventKind == EventKind.Insert)
        {
            Console.ForegroundColor = Configuration.InsertEventColor;
            Console.WriteLine("Insert Event Received at " + outputEvent.StartTime);
        }
        else if (Configuration.ShowCti)
        {
            Console.ForegroundColor = Configuration.CtiEventColor;
            Console.WriteLine("CTI event received at " + outputEvent.StartTime);
        }
        Console.ResetColor();
    }
}
```

Our factory handles the dirty details of hooking up to our source streams as well as our output adapters. By forcing the factory to implement methods for each event shape, we ensure that we get the interface we need when creating the sink and tying it to our streams, and we also get the opportunity to state, in code, that a particular output sink doesn’t support specific shapes, should that be appropriate.
ConsoleOutputFactory

```csharp
public class ConsoleOutputFactory : ISinkFactory, ITypedOutputAdapterFactory<ConsoleOutputConfig>
{
    public IObserver<PointEvent<TPayload>> CreatePointObserverSink<TPayload>(object config)
    {
        return new ConsoleDataConsumer<TPayload>((ConsoleOutputConfig)config);
    }

    public IObserver<EdgeEvent<TPayload>> CreateEdgeObserverSink<TPayload>(object config)
    {
        return new ConsoleDataConsumer<TPayload>((ConsoleOutputConfig)config);
    }

    public IObserver<IntervalEvent<TPayload>> CreateIntervalObserverSink<TPayload>(object config)
    {
        return new ConsoleDataConsumer<TPayload>((ConsoleOutputConfig)config);
    }

    public OutputAdapterBase Create<TPayload>(ConsoleOutputConfig configInfo, EventShape eventShape)
    {
        switch (eventShape)
        {
            case EventShape.Interval:
                return new ObserverTypedIntervalOutputAdapter<TPayload>(CreateIntervalObserverSink<TPayload>(configInfo));
            case EventShape.Edge:
                return new ObserverTypedEdgeOutputAdapter<TPayload>(CreateEdgeObserverSink<TPayload>(configInfo));
            case EventShape.Point:
                return new ObserverTypedPointOutputAdapter<TPayload>(CreatePointObserverSink<TPayload>(configInfo));
            default:
                throw new ArgumentOutOfRangeException("eventShape");
        }
    }

    public void Dispose()
    {
    }
}
```

We’ve not touched on the output adapters yet so now’s the time to introduce them; they are, after all, already referenced in our factory. As before, we have a single factory for our consumers covering both the 2.1+ Reactive model and the legacy adapter model.
As with our input adapters, the output adapters are relatively thin wrappers around our event consumers that handle the details of lifetime. Unlike the input adapters, with the output adapters we may well get some data after our “stop” event, and we want to make sure that we dequeue all events before shutting down. To control this a little better, we use Monitor.Enter and Monitor.Exit directly rather than the lock{} block provided by C#. The lock block, by the way, compiles down to a Monitor.Enter/Monitor.Exit pair behind the scenes. Using the Monitor directly, however, allows us to minimize the possibility of deadlocks if we get into a scenario where we are actively dequeuing events when we get a Resume call. By using Monitor.TryEnter(), we can attempt to enter our dequeuing code from other threads without blocking. If the lock has already been acquired, we don’t need to spin up another thread to dequeue and we certainly don’t need to block waiting for a lock that we won’t actually need once we get it. Our dequeue thread will continue to dequeue one event at a time until nothing is left in the queue. And we need to make sure that the dequeue operation is synchronized – only one thread can dequeue at a time anyway. Adding multiple threads to the dequeue operation typically won’t help us and we want to make sure that we have all available threads free to process actual query results. Now … once we’ve dequeued, we may want to use techniques to multi-thread sending the results to the final target. But … our actual dequeue from each query/process should be single-threaded. Keep in mind, however, that you’ll have multiple, single-threaded output sinks in most real-world applications. You will be multi-threaded; have no worries there. And calls into our event consumers can come from any thread, which is why we need to use locks to make sure that we’re properly synchronized. This is particularly important when our output adapter is stopped.
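Stripped of the adapter plumbing, the non-blocking dequeue pattern described above looks like this (a minimal, self-contained sketch using a plain ConcurrentQueue in place of the adapter’s event queue):

```csharp
using System.Collections.Concurrent;
using System.Threading;

class DequeueWorker
{
    private readonly object _monitorObject = new object();
    private readonly ConcurrentQueue<int> _queue = new ConcurrentQueue<int>();

    public void Enqueue(int item) { _queue.Enqueue(item); }

    // Called from any thread; returns immediately if another thread
    // already holds the lock and is draining the queue.
    public void DequeueEvents()
    {
        if (!Monitor.TryEnter(_monitorObject)) return; // someone else is dequeuing
        try
        {
            int item;
            while (_queue.TryDequeue(out item))
            {
                // process the item here...
            }
        }
        finally
        {
            Monitor.Exit(_monitorObject);
        }
    }
}
```

The key property: a caller never blocks waiting for a lock whose work will already be done by the thread holding it.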
After Stop is called, we’ll get one more chance to empty our queue. We use the monitor to make sure that we do empty all available events from the queue before calling Stopped(). This ensures that we’ll have a nice, clean shutdown with no hangs and no ObjectDisposedExceptions.

Point Output Adapter

```csharp
public class ObserverTypedPointOutputAdapter<TPayloadType>
    : TypedPointOutputAdapter<TPayloadType>
{
    private readonly IObserver<PointEvent<TPayloadType>> _sinkObserver;
    private object _monitorObject = new object();

    public ObserverTypedPointOutputAdapter(IObserver<PointEvent<TPayloadType>> sinkObserver)
    {
        _sinkObserver = sinkObserver;
    }

    public override void Stop()
    {
        try
        {
            Monitor.Enter(_monitorObject);
            //One last round to dequeue
            EmptyQueue();
            //Completed
            _sinkObserver.OnCompleted();
        }
        finally
        {
            Monitor.Exit(_monitorObject);
        }
        base.Stop();
        Stopped();
    }

    public override void Resume()
    {
        var thd = new Thread(DequeueEvents);
        thd.Start();
    }

    public override void Start()
    {
        var thd = new Thread(DequeueEvents);
        thd.Start();
    }

    private void DequeueEvents()
    {
        if (this.AdapterState != AdapterState.Running)
        {
            return;
        }
        //Ensures only one thread is dequeuing and no other threads are blocked.
        if (Monitor.TryEnter(_monitorObject))
        {
            try
            {
                EmptyQueue();
            }
            catch (Exception ex)
            {
                _sinkObserver.OnError(ex);
            }
            finally
            {
                Monitor.Exit(_monitorObject);
                this.Ready();
            }
        }
    }

    private void EmptyQueue()
    {
        PointEvent<TPayloadType> dequeuedEvent;
        while (this.Dequeue(out dequeuedEvent) == DequeueOperationResult.Success)
        {
            _sinkObserver.OnNext(dequeuedEvent);
        }
    }
}
```

Now that we have all of the core pieces in place, let’s take a look at what we need to do to hook our sink up to the console output. It’s actually very simple.

Hooking up to a stream

```csharp
private static void RunProcess(Application cepApplication)
{
    var config = new TestDataInputConfig()
    {
        NumberOfItems = 20,
        RefreshInterval = TimeSpan.FromMilliseconds(500)
    };
    var data = RxStream<TestDataEvent>.Create(cepApplication,
        typeof(TestDataInputFactory), config, EventShape.Point);
    var factory = new ConsoleOutputAdapter.ConsoleOutputFactory();
    var sink = cepApplication.DefineObserver(() => factory.CreatePointObserverSink<TestDataEvent>(
        new ConsoleOutputAdapter.ConsoleOutputConfig()
        {
            ShowCti = true,
            CtiEventColor = ConsoleColor.Blue,
            InsertEventColor = ConsoleColor.Green
        }));
    data.Bind(sink).Run();
}
```

There’s one thing that you may notice … the sink needs to know all of the details about the data class. This is far from ideal … and one of the things that I found so powerful about the untyped adapter model was that you weren’t tied to the schema of your data classes. There are various ways that we can handle this but that’s a topic for the next entry. Until then, you can download the code from my SkyDrive.

Dual-Mode Data Sources - Part III

StreamInsight | Code Sample
In the two previous postings, we walked through creating the data sources and exposing them to both the new Reactive-centric API and the “legacy” adapter-centric API. While we’ve accomplished that, I also said that we’d revisit the 2.1 code to add a layer of consistency to the APIs; we’re going to “bring back” factories and integrate them into our API. Why on earth would I do that? Didn’t we just get rid of them to make things simpler? Well, yes, we did just do away with the requirement for a factory, but that doesn’t mean that factories aren’t a good idea. Implementing the factory pattern will hide the details of creating the actual data source – our current one is pretty simple but others may get quite complex – and provide a layer to “check” requirements before we try to start the query (we’re already doing this with the adapter factory). I also happen to like consistent APIs and keeping consistency whenever practical and possible. We’ll start by creating an interface. We may – and likely will – also wind up creating an abstract base class that implements the interface and handles common functionality, but that will be refactored later, after we write some more sources and get a better feel for how to best define the base class. And … having both an interface and a base class gives us the greatest level of flexibility when implementing later on; because our API is based on interfaces, we can inherit from other, existing code and/or components and just add the interface. If we based our API exclusively on base classes, the single-inheritance rule would limit that flexibility. This interface will be pretty simple and only have a single method.

ISourceFactory

```csharp
/// <summary>
/// Interface for data source factories.
/// </summary>
public interface ISourceFactory
{
    /// <summary>
    /// Creates an observable source.
    /// </summary>
    /// <typeparam name="TPayload">Type of the payload.</typeparam>
    /// <param name="config">Configuration class.</param>
    /// <param name="eventShape">Shape of the events.</param>
    /// <returns></returns>
    IObservable<StreamInputEvent<TPayload>> CreateObservableSource<TPayload>(object config, EventShape eventShape);
}
```

You may notice a slight difference between this and the input adapter factory interface – config is not strongly typed. Yes, we could do that … define a generic interface and then use reflection to call it. But, to be honest, I really didn’t want to go through all of the contortions to make that happen. So the config is an object. Now, when we implement this on our existing factory, we also refactor things a bit to maximize code reuse.

Adapter + Source Factory

```csharp
public sealed class TestDataInputFactory : ITypedInputAdapterFactory<TestDataInputConfig>, ISourceFactory
{
    [SuppressMessage("Microsoft.Design", "CA1004:GenericMethodsShouldProvideTypeParameter", Justification = "By Design")]
    public InputAdapterBase Create<TPayload>(TestDataInputConfig configInfo, EventShape eventShape)
    {
        CheckPayloadType<TPayload>();
        return new ObservableTypedPointInputAdapter<TestDataEvent, TestDataInputConfig>(CreateProducer(configInfo, eventShape));
    }

    public IObservable<StreamInputEvent<TPayload>> CreateObservableSource<TPayload>(object config, EventShape eventShape)
    {
        //Check the payload type.
        CheckPayloadType<TPayload>();
        //Check the config class for the proper type.
        TestDataInputConfig typedConfig = config as TestDataInputConfig;
        if (typedConfig == null)
        {
            //Invalid cast
            throw new ArgumentException("Configuration class must be of type TestDataInputConfig");
        }
        return (IObservable<StreamInputEvent<TPayload>>)CreateProducer(typedConfig, eventShape);
    }

    private static void CheckPayloadType<TPayload>()
    {
        //Check the payload.
        if (typeof(TPayload) != typeof(TestDataEvent))
        {
            //This won't work; throw an exception.
            throw new InvalidOperationException("Specified type must be of " + typeof(TestDataEvent).FullName);
        }
    }

    private TestDataProducer CreateProducer(TestDataInputConfig config, EventShape eventShape)
    {
        switch (eventShape)
        {
            case EventShape.Point:
                //Create the publisher.
                return new TestDataProducer(config);
            default:
                throw new ArgumentException(string.Format(
                    System.Globalization.CultureInfo.InvariantCulture,
                    "TestDataInputFactory cannot instantiate adapter with event shape {0}",
                    eventShape.ToString()));
        }
    }

    public void Dispose()
    {
    }
}
```

You’ll notice that both methods … source and adapter … use all of the same validation logic and code. Using this in our existing code isn’t too difficult.
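One detail worth calling out before we use this: the TestDataInputConfig class itself needs to be fully serializable so it can cross process boundaries. A sketch of what that looks like – the property names mirror the usage in these posts, and the attributes are the standard data-contract ones:

```csharp
using System;
using System.Runtime.Serialization;

// Marked as a data contract so it survives the trip to a remote
// StreamInsight instance; without these attributes you can hit
// serialization errors when connecting out of process.
[DataContract]
public class TestDataInputConfig
{
    [DataMember]
    public int NumberOfItems { get; set; }

    [DataMember]
    public TimeSpan RefreshInterval { get; set; }
}
```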
Our new RunProcess() now looks like the following:

Run Process

```csharp
private static void RunProcess(Application cepApplication)
{
    var config = new TestDataInputConfig()
    {
        NumberOfItems = 20,
        RefreshInterval = TimeSpan.FromMilliseconds(500)
    };
    var data = cepApplication.DefineObservable(
            () => new TestDataInputFactory().CreateObservableSource<TestDataEvent>(config, EventShape.Point))
        .ToPointStreamable(e => e.GetPointEvent());
    var sink = cepApplication.DefineObserver(() => Observer.Create<TestDataEvent>(
        e => Console.WriteLine("TestEvent ItemId:{0} Run:{1} Value{2}", e.ItemId, e.RunNumber, e.Value)));
    data.Bind(sink).Run();
}
```

We can still make it better, though, and make it look more like the pre-2.1 API. We’ll start by creating an RxStream class and add methods for creating observables. We’ll also want to make sure that these methods are remotable – so we can work with either a local or a remote instance without any code changes. This was a challenge with the pre-2.1 API; you could pretty easily get yourself into a situation where your code would work on a local, in-process instance but not work at all when you were connecting to a remote instance. With DefineObservable, however, you’re actually defining a remote function that returns an observable, and that’s what we call … whether in-process or remote. If it works in-process then it’ll work out of process. However, you may wind up getting yourself into a situation with mysterious serialization errors … you need to make sure that whatever you pass to your methods is fully serializable. That’s why our configuration class has the DataContract attribute. With our CreateObservable method, we’ll first check to see if we have a reference to the function (and notice that it is of type Func<>). If not, we check to see if it’s been created and deployed. If not, we create and deploy it.
We could also put this same code in a static constructor – it wouldn’t make much difference. The actual work is done by an InstantiateObservable private method and that’s what our defined observable actually calls.

RxStream Observables

```csharp
private static Func<Type, object, EventShape, IQbservable<StreamInputEvent<TPayload>>> _observable;

public static IQbservable<StreamInputEvent<TPayload>> CreateObservable(Application cepApplication,
    Type sourceFactoryType,
    object configInfo,
    EventShape eventShape)
{
    string entityName = "Observable:" + typeof(TPayload).FullName;
    if (_observable == null)
    {
        //Check the application.
        if (cepApplication.Entities.ContainsKey(entityName))
        {
            _observable =
                cepApplication.GetObservable<Type, object, EventShape, StreamInputEvent<TPayload>>(entityName);
        }
        else
        {
            //Define and deploy.
            _observable = cepApplication.DefineObservable(
                (Type t, object c, EventShape e) => InstantiateObservable(t, c, e));
            _observable.Deploy(entityName);
        }
    }
    return _observable.Invoke(sourceFactoryType, configInfo, eventShape);
}

private static IObservable<StreamInputEvent<TPayload>> InstantiateObservable(Type sourceFactoryType,
                                                                             object configInfo,
                                                                             EventShape eventShape)
{
    var sourceFactory = Activator.CreateInstance(sourceFactoryType) as ISourceFactory;
    if (sourceFactory == null)
    {
        throw new ArgumentException("Specified type is not a source factory.");
    }
    return sourceFactory.CreateObservableSource<TPayload>(configInfo, eventShape);
}
```

Now all that we have left to do is to create methods to also create the streams, rather than just the observables.
This will be our typical use case but we’ll still keep the observable method public as well, to give us the most flexibility when we are actually using this in anger.

RxStream Create

```csharp
public static IQStreamable<TPayload> Create(
    Application cepApplication, Type sourceFactoryType, object configInfo, EventShape eventShape)
{
    return Create(cepApplication, sourceFactoryType, configInfo, eventShape, null);
}

public static IQStreamable<TPayload> Create(
    Application cepApplication, Type sourceFactoryType, object configInfo, EventShape eventShape,
    AdvanceTimeSettings advanceTimeSettings)
{
    var observable = CreateObservable(cepApplication, sourceFactoryType, configInfo, eventShape);
    switch (eventShape)
    {
        case EventShape.Interval:
            return observable.ToIntervalStreamable(e => e.GetIntervalEvent(), advanceTimeSettings);
        case EventShape.Edge:
            return observable.ToEdgeStreamable(e => e.GetEdgeEvent(), advanceTimeSettings);
        case EventShape.Point:
            return observable.ToPointStreamable(e => e.GetPointEvent(), advanceTimeSettings);
        default:
            throw new ArgumentOutOfRangeException("eventShape");
    }
}
```

Once this is in place, the code to create our stream looks remarkably similar to the adapter-centric code:

Creating the Stream

```csharp
var config = new TestDataInputConfig()
{
    NumberOfItems = 20,
    RefreshInterval = TimeSpan.FromMilliseconds(500)
};
var data = RxStream<TestDataEvent>.Create(cepApplication, typeof(TestDataInputFactory), config, EventShape.Point);
```

There’s still more that we’ll need to do … for example, we’ll need to create overloads to import CTIs. But, for now, we’re done with the core API for our input adapters and will be moving on to output adapters/sinks in the next article. You can download the current code from SkyDrive.
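As a taste of what those CTI settings will look like when we do pass them in, here’s a rough sketch using what I believe are the built-in AdvanceTimeSettings helpers from the 2.1 API (treat the exact constructor signatures as assumptions, not code from the download):

```csharp
// Issue a CTI after every event, stamped at the event's start time:
var ats = AdvanceTimeSettings.IncreasingStartTime;

// Or spell it out: generate a CTI after each event, no delay,
// and drop any event that would violate an issued CTI.
var explicitAts = new AdvanceTimeSettings(
    new AdvanceTimeGenerationSettings(1, TimeSpan.Zero),
    null,
    AdvanceTimePolicy.Drop);

// Passed through the overload we just defined:
var stream = RxStream<TestDataEvent>.Create(
    cepApplication, typeof(TestDataInputFactory), config, EventShape.Point, ats);
```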

Using Subjects as a “Feedback Loop”

Code Sample | StreamInsight
OK, I’m going to switch gears for a minute here. Yes, I’ll be back to building the app but this is just too cool not to share. It’s something that I’ve thought about quite a bit off and on, and a forum post got it going again, so I put together a sample last night that shows how it can be done. The question … how do I take the output of a query and feed it back in to the query itself? This has a number of uses; in the case of the forum post, the developer wants to use this to update running totals and balances as orders are processed because these results impact the rest of the events. When working with sensors, this is also how you would do a “deadband” … an output isn’t produced unless the new value is more than a certain percentage change from the previous. For the deadband use case, you can’t just compare to the previous reading; if you do this, slow changes can accumulate but a new value is never reported because the change between any two values is less than the deadband threshold. So you need to compare the current reading value to the last reported value. I’ve done this with a UDO that maintains state (in a dictionary) for each of the items but it’s not as elegant as I’d like. And this is cooler anyway.   So … what is a subject? A subject is an observer and an observable; it will republish events that it observes (via IObserver) to others that are observing it (via IObservable). For a little primer on subjects, check out this blog entry from Colin on the StreamInsight team; it gives you a hint of what you can do with these. But that’s just the tip of the iceberg; subjects have become one of my favorite little tricks in StreamInsight because there’s so much that you can do with them. As Colin mentions, observers can come and go – so you can hook up multiple sinks to a single output query (something very difficult prior to 2.1).
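If that dual nature is new to you, plain Rx shows it in just a few lines (a standalone Rx sample, not StreamInsight; assumes the Rx libraries are referenced):

```csharp
using System;
using System.Reactive.Subjects;

class Program
{
    static void Main()
    {
        var subject = new Subject<int>();

        // Two observers hooked to one subject...
        subject.Subscribe(x => Console.WriteLine("Sink A saw " + x));
        subject.Subscribe(x => Console.WriteLine("Sink B saw " + x));

        // ...and anything pushed into the subject (it's an IObserver too)
        // is republished to both of them.
        subject.OnNext(42);
        subject.OnCompleted();
    }
}
```

StreamInsight’s subjects add deployment, naming and remoting on top of this same idea.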
What Colin doesn’t mention is that your sources (observables) can come and go also … you can hook up multiple source queries that then get fed to one or more sinks. Subjects are created and deployed independently of their sources and sinks – you can create a subject that you are using as a source before you actually hook it up to a sink. Or vice versa. Subjects also allow you to share query results across processes – similar to what you would do with dynamic query composition but far, far, far more powerful and flexible. Now, because a subject is both a source and a sink, you can use them to take the output results of a query and feed them back in to the source query … creating a feedback loop. Or a time warp. Creating a subject is pretty simple. There isn’t much code required. You just need to know what type you’ll be using. You can bind the subject to the payload type only – you only get the payload and no temporal header – or to the TypedEvent (i.e. PointEvent, IntervalEvent, etc.), in which case you will get the payload and the temporal header. Let’s start there.

GetOrCreateSubject

```csharp
private static object _subjectLock = new object();

private static IRemoteSubject<TSubject, TSubject> GetOrCreateSubject<TSubject>(Application cepApplication, string subjectName)
{
    lock (_subjectLock)
    {
        bool subjectExists = cepApplication.Subjects.Any(s => s.Key == subjectName);
        if (subjectExists)
        {
            return cepApplication.GetSubject<TSubject, TSubject>(subjectName);
        }
        else
        {
            return cepApplication.CreateSubject(subjectName, () => new Subject<TSubject>());
        }
    }
}
```

As the name says, this method will get or create a subject, depending on whether it already exists. If you are wondering about the _subjectLock, that’s there to ensure that only one thread is in the middle of this operation at any time.
If you are multi-threading calls to this, it is entirely possible to get into a situation where you are calling the same method with the same arguments on different threads and wind up with exceptions as multiple threads try to create the same subject at the same time. Now, let’s get data running through our test app. I’m using the same app from my recent posts so this will look familiar. We’ll also get a reference to the subject.

Getting Data

```csharp
var config = new TestDataInputConfig()
{
    NumberOfItems = 20,
    RefreshInterval = TimeSpan.FromMilliseconds(500)
};

var data = cepApplication.DefineObservable(
    () => new TestDataProducer(config)).ToPointStreamable(e => e.GetPointEvent());

var lastReportedSubject = GetOrCreateSubject<PointEvent<TestDataEvent>>(cepApplication, "LastKnown");
```

We aren’t sending data to the subject just yet … that will come from the results later … but we do need to get a stream (IQStreamable) from it. But let’s stop for a moment and discuss some of the potential gotchas. First, remember that a stream in StreamInsight is a temporal stream. All of the events exist in time and the stream moves forward based on CTIs. We also know that we’ll be joining this feedback stream with the data stream and, when you do that, StreamInsight will sync to the slowest stream … the joined stream will move forward only when CTIs from both source streams move past the same timestamp. If our feedback loop simply publishes the CTIs that are generated from our result stream, it will never move forward. Why not? Because it’ll be waiting for CTIs to move past a timestamp from the result stream but the result stream can’t move forward because there is no CTI coming from our feedback stream. Did that make sense? It makes my brain hurt thinking about it too much. Anyway, what we need to do is to directly import the CTIs from the data stream. But that gives us another challenge.
If we do that, we now have to worry about CTI violations from the data (insert) events being published from the results. You see, the events produced will have start times that are before the last-issued CTI; they must be or they wouldn’t be in the output stream. So we need to account for this when we enqueue the new events by shifting the start time forward so that it is after the last-issued CTI. So let’s get started. To handle the CTIs, we’ll first filter the subject observable source so that it only produces Insert events. Then we will Merge these results with the CTIs from the data stream. The result will be an observable that we can then use as the source for the feedback stream. A note, however, on using the Merge technique to import CTIs … it will give you no protection at all from CTI violations. If you try to create/enqueue an event that violates the CTI, you will get an exception that you can’t really catch and it will cause your query to abort.

Feedback Source

```csharp
var lastReportedObservableSource = lastReportedSubject
    //Get only the inserts from the subject, dropping the CTIs.
    .Where(e => e.EventKind == EventKind.Insert)
    //Merge with the CTIs from the data stream.
    .Merge(data.ToPointObservable().Where(e => e.EventKind == EventKind.Cti));
```

Now that we have our observable, we need to make it a stream. While it is still an observable (IQbservable), we don’t have to follow the temporal/CTI rules because it’s not a temporal stream yet. However, when it’s a stream (IQStreamable), it is a temporal stream and we do have to follow the temporal/CTI rules … so we have to shift the timestamps of our point events so that they don’t violate the CTI. With the data source that we are using, we know that the CTIs are 1 tick past the “batch” of events so shifting the timestamp 1 tick will ensure that we’re good.
In the real world, you may need to do something a bit more sophisticated … see my previous blog article on importing CTIs in 2.1 for a couple of tips; I will be revisiting this in my app-building series.

Feedback Stream

```csharp
var lastReportedSourceStream = lastReportedObservableSource
    .ToPointStreamable(e => e.EventKind == EventKind.Cti
        ? e
        : PointEvent<TestDataEvent>.CreateInsert(e.StartTime.AddTicks(1), e.Payload));
```

The hard part is done. Now that we have the streams, the rest is a series of StreamInsight queries. First, we want to make sure that we always have the last-known value available in the last reported value stream so we’ll use the (very) common ToSignal pattern. Next, we need to make sure that the initial values reported always go through. If we just join the streams, we’ll never get output because the initial events won’t be in the feedback stream. So we’ll do a LeftAntiJoin so that the events in the data stream with no matches in the feedback stream go through. (Depending on your scenario, it may be perfectly appropriate to “seed” the feedback stream with initial events … in the case of the forum post, these could be current balances when the application starts. If that’s the case, you can probably skip this step.) Then we calculate the percentage change from the last reported value for the current value; in this step, I’m including both the current and the last reported value in the stream (we don’t really need to) so that it’s easier to check the results using the event flow debugger and “see” everything that’s happening. From the calculation, we then select only those items that have changed more than a specified amount. Since we’re using random, highly variable data in this sample, I’ve put the “deadband” at 100% but real-world values will be somewhat lower; anywhere from 1% – 10%, depending on the source. Union the two queries together and we’re done! (with the queries, at least).
The Queries

```csharp
//Make sure that we have the last reported value always available.
var lastReportedStream = lastReportedSourceStream.ToSignal((e1, e2) => e1.ItemId == e2.ItemId);

//Make sure that our initial values always get reported in the output using LeftAntiJoin.
var firstValues =
    from i in data.LeftAntiJoin(lastReportedStream, (e1, e2) => e1.ItemId == e2.ItemId)
    select i;

//Calculate the percentage change from the last reported value.
var calcStream = from d in data
                 join lr in lastReportedStream
                     on d.ItemId equals lr.ItemId
                 select new
                 {
                     Value = d,
                     LastReported = lr,
                     PctChange = Math.Abs((d.Value - lr.Value) / lr.Value)
                 };

//Select only those that have changed by more than 100%.
var changed = from c in calcStream
              where c.PctChange > 1.0
              select c.Value;

//Union the first values with the changed values.
var final = firstValues.Union(changed);
```

All that’s left is to bind the output to the subject, creating the feedback loop, as well as a console writer sink so that we see results. Then we run the process.

Bind and Run

```csharp
var sink = cepApplication.DefineObserver(() => Observer.Create<TestDataEvent>(
    e => Console.WriteLine("TestEvent ItemId:{0} RunNumber:{1} Value:{2}",
        e.ItemId, e.RunNumber, e.Value)));

//Bind to the subject and the console sink so that we see results, then run the process.
final.Bind(lastReportedSubject).With(final.Bind(sink)).Run();
```

I do have to note that when I run this, StreamInsight hangs on shutdown – not a good sign. I’m not exactly sure why this is happening but if I figure it out, I will update this post. Or … if you figure it out, you can send me a note using the “Contact” link on this blog. You can download the code from my SkyDrive (of course!).

Dual Mode Data Sources–Part II

Code Sample | StreamInsight
In my previous post, I walked through creating a basic structure for StreamInsight data sources using the Reactive model introduced with version 2.1. There was quite a bit of plumbing work to get done but now that it’s finished, we’re ready to move on to the next step … taking what we just created and making it available to the pre-2.1 API. I didn’t mention this in the previous post but these articles are the first in what’s going to be a series of articles that walk through building a StreamInsight application. In these articles, we’re going through the process to create our data sources – or, more accurately, the architectural framework for our data sources. We’ll do the same with sinks/output adapters and then we’ll start pulling in some real(istic) data and doing something more interesting. Each article will build on the last so you can follow along. Now, back to our input. Since we already have the data flowing via the IObservable interface, it’s really easy to subscribe to this interface and implement an input adapter that uses the exact same code to generate events. All we need to do is subscribe to our producer and enqueue the event when OnNext is called:

On Next

```csharp
public void OnNext(StreamInputEvent<TPayloadType> publishedEvent)
{
    if (AdapterState == AdapterState.Running)
    {
        var newEvent = publishedEvent.GetPointEvent();
        this.Enqueue(ref newEvent);
    }
}
```

It really is that simple. Well, almost, but before we get into that, let’s step back for a second. We have an event producer that creates our events. This producer handles all of the details of connecting to the data source and packaging the data for StreamInsight. Traditionally, we did all of this in the adapter itself but since we’ve already separated it, the adapter really doesn’t need to do much except enqueue. Looking at it that way, why do we need to have separate adapters for each type that we have? Well, the truth is: we don’t.
With a little planning, we can have a single set of adapters that subscribe to a producer’s IObservable interface and handle enqueuing the events into StreamInsight as well as managing lifetime. These will be our adapters and they’ll wind up being very thin. So we’ll start by creating a new point typed input adapter. Like the base class, the payload type is a generic argument for the class – there’s no reason why we can’t use this for any type. Because Start() and Resume() are abstract methods on the base class, we have to implement them. We’ll also want to override Stop() and Dispose(bool disposing).

Adapter Skeleton

```csharp
public class ObservableTypedPointInputAdapter<TPayloadType>
    : TypedPointInputAdapter<TPayloadType>
{
    public override void Start()
    {
        throw new NotImplementedException();
    }

    public override void Resume()
    {
        throw new NotImplementedException();
    }

    public override void Stop()
    {
        base.Stop();
    }

    protected override void Dispose(bool disposing)
    {
        base.Dispose(disposing);
    }
}
```

So let’s get started. First, we need to get our producer. That’s simple enough; we know that we need to have an adapter factory so that’s where we’ll create it and pass it in as an argument of the constructor. We pop this into a private class field – we won’t be calling this until we subscribe. Speaking of subscribe, this is exactly what we need to do in the Start() method. For the producer’s OnCompleted action, we’ll call the adapter’s Stop() method. Yes, you can call Stop() in an adapter and not stop the queries that are getting events from the stream – this is something that may happen with a read-once reference data adapter. It’s not commonly seen in the wild but we’ll make sure that we handle it correctly.
One thing that is important to understand here … if you are having StreamInsight generate your CTIs for you with AdvanceTimeSettings, one of your options is AdvanceToInfinityOnShutdown. This is directly impacted by the adapter calling Stopped() (or the producer, in fact, calling OnCompleted()). When the source shuts down, StreamInsight will enqueue a CTI with a timestamp of DateTimeOffset.MaxValue, which serves to “clear” the stream of any pending events. It also happens to work very nicely if you happen to have a read-once data source of reference data.

Constructor & Start

```csharp
public ObservableTypedPointInputAdapter(IObservable<StreamInputEvent<TPayloadType>> eventProducer)
{
    this._eventProducer = eventProducer;
}

public override void Start()
{
    _subscription = _eventProducer.Subscribe(OnNext,
        () =>
        {
            if (AdapterState == AdapterState.Running)
            {
                Stop();
            }
        });
}
```

Start vs. Resume

In a lot of the samples, you’ll see these implemented the exact same way. However, in the real world, you may want to put a little more thought into how you handle Resume(). Odds are pretty good, especially if you are using Premium Edition, that you will never need to actually implement Resume(). It gets called after your adapter is put into a suspended state because the input queue is full. Yes, it is possible to fill up the input queue – you need about 200,000 unprocessed events that StreamInsight can’t push through your queries. I have made this happen – but it took some effort. I actually had to continue enqueueing events just as fast as I could on multiple threads. Even with some convoluted queries, it still took some time to fill up the input queue. Considering that I’ve seen StreamInsight process over 30,000 events/sec on a dual core i7 laptop and over 100,000 events/sec on a low-to-mid level server with CPU cycles to spare, you will have to work at it. But it can happen.
Now, if you are using Standard Edition, you won’t be able to get to that level of throughput (and I haven’t done any testing on Standard’s throughput) but you can still expect to push a goodly number of events through Standard Edition. Now, when the input queue fills up, StreamInsight will return Full from the enqueue. Your adapter will need to call Ready() and then StreamInsight will call Resume() when you can start enqueuing events again. If you are using the StreamInsight 2.1 Reactive model, you won’t get any notification if the input queue fills. Instead, StreamInsight will simply ignore your events until it’s ready to receive events again. This is what I’m going to do in our exercises but it may not be appropriate, depending on your source and your needs. For example, if you are simply pushing recorded data through the engine, it would be easy enough to pause and then start back up again when Resume() is called. If, however, it’s live data, you may need to do something else. You will want to exercise some caution in buffering up events that you miss – you have no way of knowing when the adapter will start back up again and you could eat up FAR more memory than you want to. It is a potentially difficult issue but, fortunately, one that is pretty tough to hit in the real world. Especially if you are using Premium Edition, you’ll saturate network bandwidth (for inbound data) before you fill up the queue. In our exercise, we’re going to do the same thing that StreamInsight 2.1 does … we’re just going to ignore incoming events until Resume() is called. Back to the code … in the snippet above, I enqueued directly from the OnNext function. While this works – if everything goes right – it doesn’t really represent what we’ll need to do in a real app to handle things like exceptions, queue full and the like.
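If you do decide to buffer during a pause, one option is a buffer with a hard cap that drops the oldest events, so a long suspension can’t eat unbounded memory. This is a sketch of mine, not part of the sample code; the `BoundedReplayBuffer` class and its names are hypothetical:

```csharp
using System.Collections.Generic;

//A bounded buffer for events that arrive while the adapter is paused.
//When the cap is hit, the oldest event is discarded so memory stays bounded.
public class BoundedReplayBuffer<T>
{
    private readonly int _capacity;
    private readonly Queue<T> _buffer = new Queue<T>();

    public BoundedReplayBuffer(int capacity)
    {
        _capacity = capacity;
    }

    public void Add(T item)
    {
        if (_buffer.Count >= _capacity)
        {
            _buffer.Dequeue();  //drop the oldest rather than grow without bound
        }
        _buffer.Enqueue(item);
    }

    //Drain the buffered events, e.g. when Resume() is called.
    public IEnumerable<T> Drain()
    {
        while (_buffer.Count > 0)
        {
            yield return _buffer.Dequeue();
        }
    }
}
```

Whether dropping the oldest (or the newest, or sampling) is acceptable depends entirely on your source; the point is that the cap is decided up front rather than discovered at 3 AM when the process runs out of memory.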
Also, while this class is designed to be a stand-alone input adapter that can be used with any observable source, there may be special-case adapters – for example, specific handling of a pause/resume scenario – where we want to override specific behaviors with an inherited class, so we want to make sure that we provide the right extension points to override. So we’ll take the enqueue operation and create our own method. In here, we do all of our state checks to make sure that we can enqueue, handle enqueue exceptions and queue full as well as keep track of our last-enqueued CTI. Finally, we will also include a critical section lock around the actual enqueue to make sure that we handle lifetime correctly and don’t report Stopped() when we’re in the middle of an enqueue. It is the same method described in my previous article on output adapter lifetime but applied to input adapters.

Enqueue Event

```csharp
protected virtual void EnqueueEvent(StreamInputEvent<TPayloadType> publishedEvent)
{
    if (!_canEnqueue) return;

    if (publishedEvent.Start < _lastCti)
    {
        return;
    }
    lock (_lockObject)
    {
        if (this.AdapterState != AdapterState.Running)
        {
            return;
        }
        var point = publishedEvent.GetPointEvent();
        try
        {
            var enqueueResult = this.Enqueue(ref point);
            if (enqueueResult == EnqueueOperationResult.Success && publishedEvent.EventKind == EventKind.Cti)
            {
                _lastCti = point.StartTime;
            }
            if (enqueueResult == EnqueueOperationResult.Full)
            {
                //Queue full! Pause enqueuing.
                _canEnqueue = false;
                ReleaseEvent(ref point);
                //Let StreamInsight know we're ready to resume.
                Ready();
            }
        }
        catch
        {
            ReleaseEvent(ref point);
            throw;
        }
    }
}
```

Our last step is to create our adapter factory.
In our Create method, we’ll do a couple of checks to make sure that our arguments are valid – the payload is the proper type and, since our random data source only supports points, that we are creating the correct shape.

Input Adapter Factory

```csharp
public sealed class TestDataInputFactory : ITypedInputAdapterFactory<TestDataInputConfig>
{
    [SuppressMessage("Microsoft.Design", "CA1004:GenericMethodsShouldProvideTypeParameter",
        Justification = "By Design")]
    public InputAdapterBase Create<TPayload>(TestDataInputConfig configInfo, EventShape eventShape)
    {
        if (typeof(TPayload) != typeof(TestDataEvent))
        {
            //This won't work ... throw an exception.
            throw new InvalidOperationException("Specified type must be of " + typeof(TestDataEvent).FullName);
        }

        switch (eventShape)
        {
            case EventShape.Point:
                //Create the adapter, passing in a new producer.
                return new ObservableTypedPointInputAdapter<TestDataEvent>(
                    new TestDataProducer(configInfo));
            default:
                throw new ArgumentException(string.Format(
                    System.Globalization.CultureInfo.InvariantCulture,
                    "TestDataInputFactory cannot instantiate adapter with event shape {0}",
                    eventShape.ToString()));
        }
    }

    /// <summary>
    /// Dispose method.
    /// </summary>
    public void Dispose()
    {
    }
}
```

Once we have the adapter factory, we’re ready to go. Because we’ve not done any output adapters yet, we’ll be using the ToPointObservable() method to write to the console. Keep in mind that this works only when hosting StreamInsight in-process; it’s not at all like the IObservable support in 2.1. But it works well enough to show that we have some data flowing through and that our adapter now supports both models from the same data producer!
Run Query

```csharp
private static void RunQuery(Application cepApplication)
{
    var config = new TestDataInputConfig()
    {
        NumberOfItems = 20,
        RefreshInterval = TimeSpan.FromMilliseconds(500)
    };

    var data = CepStream<TestDataEvent>.Create(cepApplication,
        "TestData", typeof(TestDataInputFactory), config, EventShape.Point);

    var observable = data.ToPointObservable().Subscribe(
        e =>
        {
            if (e.EventKind == EventKind.Insert)
            {
                Console.WriteLine("TestEvent ItemId:{0} Run:{1} Value:{2}",
                    e.Payload.ItemId, e.Payload.RunNumber, e.Payload.Value);
            }
            else
            {
                Console.WriteLine("CTI @ {0}", e.StartTime);
            }
        });
}
```

Using the same pattern, we can also create the adapters for interval and edge events pretty quickly. Unfortunately, a lot of it is copy-paste-tweak. I say unfortunately because it leads to a lot of redundant and repetitive code that I, for one, would prefer to create and maintain in a single place. However, our key method – enqueue – isn’t defined in any common base class; we have to inherit from the shape-specific adapters and there’s no way around it. While we could probably deal with this with another layer of abstraction, there isn’t, IMHO, a whole lot of value in that. Should that change, we’ll take a look at refactoring a bit then. Looking at the input adapter factory, you’ll notice that it does some checking to make sure things are valid before creation; factories are good for that. There’s a bunch of other stuff that could be done in the factory as well and having a standard, generic factory interface is a good way to provide yourself with a clean, clear and decoupled way to create what may be complex underlying objects.
And there’s no reason why we can’t revisit our existing 2.1-based code to add the factory in there. It also provides a level of consistency between the two different APIs that can make our life simpler, reduce the amount of debugging that we need to do and promote code sharing. That, however, will have to wait for the next post – I am trying to do this in manageable pieces so I’m not writing a novel with each post. In the meantime, you can download the current project from my SkyDrive.

Dual Mode Data Sources–Part I

Code Sample | StreamInsight
When StreamInsight 2.1 came out, the Big New Thing for the release was the new model for data ingress and egress – sources and sinks. Based on standard interfaces like IObservable and IObserver and the Reactive Extensions, it makes getting data into and out of StreamInsight a whole lot easier. The previous model works (or worked) well enough but, honestly, it was hard … and harder than it really needed to be. Getting the adapter lifetime just right was not a trivial undertaking and getting it wrong meant either hanging on shutdown or an ObjectDisposedException. It took me … and my team … some time to get all of this nailed down to a science. The new model handles all of this for you … you just subscribe and publish. The rest is invisible. Very nice. But, as when any new API replaces an old API, there is a time of transition from one to the other. There’s a time when developers or solution providers – like me – need to be able to support both APIs. For us, we actually still (sadly) need to support StreamInsight 1.2 and 2.0 in some cases so we can’t go over to the new model. But we also don’t want to rewrite all of our adapters or maintain two codebases of adapters. Instead, we want to support both from a single code base. Fortunately, this is actually pretty easy to accomplish with a little planning and good architectural practices. I’ll walk you through this – in its simplest form – and this code will also be the base for a number of additional articles that I’ve been working on and have planned. In fact, I plan to walk through creating a StreamInsight application step by step and discussing the technical details as we go so, by the end, you’ll have walked with me to build a simple StreamInsight application. Oh … and let me make one thing clear though. I’ve heard it said (from Microsoft field technical sales-type folks) that “you don’t have to write adapters anymore.” That’s not entirely accurate. You don’t have to use the “legacy” adapter API anymore.
But you will have to write code to get data in to and out of StreamInsight. You may use a different API now (IObserver/IObservable) but the necessity of writing code for this task is still there. Call it what you want ‘cuz now we’re just arguing semantics.

Sources/Input Adapters

Source is the new name for input adapter. With StreamInsight 2.1, you’ll want to implement IObservable. I won’t go over the basics; you’ll find them in the documentation. But there are some things that you need to consider that aren’t in the documentation. First … CTIs (and I’ll be talking more about these in a future post or two). While you can use AdvanceTimeSettings to have StreamInsight generate your CTIs for you, this isn’t always appropriate to do. So we need to have a way to have CTIs created and enqueued by the adapters … and it needs to be optional. So we’ll start with a wrapper class that allows a data source to create both events (with payloads) and CTIs. This will be our StreamInputEvent class. It will have all of the appropriate temporal properties and event shape metadata as well as the payload.
StreamInputEvent

```csharp
public class StreamInputEvent<TPayloadType>
{
    public TPayloadType Payload { get; set; }
    public DateTimeOffset Start { get; set; }
    public DateTimeOffset End { get; set; }
    public EdgeType EdgeType { get; set; }
    public EventKind EventKind { get; set; }
}
```

We’ll also add some helper methods to create the different event shapes as well as a constructor or two:

Helpers & Constructors

```csharp
public PointEvent<TPayloadType> GetPointEvent()
{
    if (this.EventKind == EventKind.Insert)
    {
        return PointEvent<TPayloadType>.CreateInsert(this.Start, Payload);
    }
    return PointEvent<TPayloadType>.CreateCti(Start);
}

//More helpers for Edge and Interval.

public StreamInputEvent(DateTimeOffset ctiDateTime)
{
    EventKind = EventKind.Cti;
    Start = ctiDateTime;
    EdgeType = EdgeType.Start;
    End = DateTimeOffset.MaxValue;
    Payload = default(TPayloadType);
}

public StreamInputEvent(TPayloadType payload)
{
    Start = DateTimeOffset.MinValue;
    End = DateTimeOffset.MaxValue;
    EdgeType = default(EdgeType);
    Payload = payload;
    EventKind = EventKind.Insert;
}

public StreamInputEvent(TPayloadType payload, DateTimeOffset startTime, DateTimeOffset endTime, EdgeType edgeType)
{
    Start = startTime;
    End = endTime;
    EdgeType = edgeType;
    Payload = payload;
    EventKind = EventKind.Insert;
}
```

Now, you’ll note that there isn’t an EventShape specified. The shape can be determined from the event times and we’ve found it useful to have data sources that can adapt to whatever shape we deem fit for a particular use case. But we’ll still have a factory (regardless of how we do it) that allows us to limit a particular data source to specific shapes. And the helpers make it really, really easy when we create the IQStreamable for the temporal processing. Now that we have that done, let’s take a step back and look at this.
We have a data source of some type and we need to get the data into StreamInsight. The wonderful thing about StreamInsight 2.1’s focus on IObservable is that it lets us really step back for a second and look at how we produce data. In fact, it allows us to “hide” our producers behind a standard interface … and that’s what we need to do. At the lowest level, the code closest to the data source, we’ll create a “data producer” that takes the data and publishes it via IObservable – so there’s our ‘native’ 2.1 interface. To support the legacy adapter model, we will just need to subscribe to this with the concrete input adapter class. So let’s get started by creating a base event producer class to handle subscribers and disposal. We’ll also need to make sure that we can specify the configuration for the event producer – just like we were forced to do with the legacy adapter model. The reality is simple … unless you are doing a really, incredibly simple demo, you need to have a way to specify configuration. Having a configuration class that is passed to the event producer from the application just makes sense.
StreamEventProducer

```csharp
public abstract class StreamEventProducer<TPayloadType, TConfigType>
    : IObservable<StreamInputEvent<TPayloadType>>, IDisposable
{
    public TConfigType Configuration { get; protected set; }

    protected StreamEventProducer(TConfigType configuration)
    {
        this.Configuration = configuration;
    }

    protected abstract void Start();

    private IObserver<StreamInputEvent<TPayloadType>> _observer;

    public virtual IDisposable Subscribe(IObserver<StreamInputEvent<TPayloadType>> observer)
    {
        this._observer = observer;
        Start();
        return this;
    }

    protected void PublishException(Exception ex)
    {
        _observer.OnError(ex);
    }

    protected void PublishEvent(StreamInputEvent<TPayloadType> newEvent)
    {
        _observer.OnNext(newEvent);
    }

    protected virtual void Dispose(bool disposing)
    {
        if (disposing)
        {
            _observer.OnCompleted();
        }
    }
}
```

If you look at this, you’ll see that it’s pretty simple. You can certainly get more sophisticated with this – by supporting multiple observers to share event producers between multiple sources and/or adapters – but I want to keep it simple and clear. Our concrete event producer will inherit from this class, implement Start() (called when a subscriber actually subscribes) and call PublishException and PublishEvent as appropriate. Our event producer will also want to override Dispose(bool disposing) to get notification when it’s time to shut down. So let’s go ahead now and create our first event producer. For our sample here, it will be a very simple random data generator because, of course, everyone needs to analyze completely random data in StreamInsight. Well, no, of course not, but it’s easy and simple. We’ll start with the configuration class.
TestDataInputConfig

```csharp
[DataContract()]
public class TestDataInputConfig
{
    public TestDataInputConfig()
    {
        RefreshInterval = TimeSpan.FromMilliseconds(500);
        NumberOfItems = 10;
        TimestampIncrement = TimeSpan.FromMinutes(5);
        StartDateTime = DateTimeOffset.Now.AddMonths(-5);
    }

    [DataMember]
    public DateTimeOffset StartDateTime { get; set; }

    [DataMember]
    public TimeSpan RefreshInterval { get; set; }

    [DataMember]
    public TimeSpan TimestampIncrement { get; set; }

    [DataMember]
    public int NumberOfItems { get; set; }
}
```

Note that it is marked as DataContract and each property is marked with the DataMember attribute … this is very important. The configuration classes may – and probably will – end up being serialized. This was also true in the adapter model; all of your configuration classes need to be able to be serialized. Now you can also use Serializable() but I’ve found that, at times, the C# compiler goes somewhat insane when you do that and you get exceptions when serializing the configuration classes. Now for our payload class. We have an ItemId and a Value of type double and, since this class is for testing and we want to be able to “see” what’s going on (and for other reasons), we also have some properties on here to help with that – RunNumber (which generation “run” created it) and EnqueueTimestamp (the time it was actually created, which may be different from the event’s start time). Finally, we also have a static helper method that’ll create a bunch of these test events for enqueuing for the “run”.
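As a quick illustration of why the attributes matter, a config class can be round-tripped with the DataContractSerializer; a property that is missing [DataMember] will silently come back with its default value. This is a sketch of mine, not part of the project – the `SampleConfig` and `ConfigSerialization` names are hypothetical:

```csharp
using System.IO;
using System.Runtime.Serialization;

[DataContract]
public class SampleConfig
{
    [DataMember]
    public int NumberOfItems { get; set; }

    //No [DataMember] - this property will NOT survive serialization.
    public string ScratchValue { get; set; }
}

public static class ConfigSerialization
{
    //Round-trips a config instance the way it may be serialized when it is
    //handed off to an adapter factory.
    public static SampleConfig RoundTrip(SampleConfig config)
    {
        var serializer = new DataContractSerializer(typeof(SampleConfig));
        using (var stream = new MemoryStream())
        {
            serializer.WriteObject(stream, config);
            stream.Position = 0;
            return (SampleConfig)serializer.ReadObject(stream);
        }
    }
}
```

Round-tripping an instance with NumberOfItems = 20 and ScratchValue = "temp" returns a copy where NumberOfItems is still 20 but ScratchValue is null – exactly the kind of quiet data loss the DataMember attributes protect you from.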
TestDataEvent

```csharp
public class TestDataEvent
{
    private static Random rdm = new Random();

    public static List<TestDataEvent> CreateNext(TestDataInputConfig config, int runNumber)
    {
        List<TestDataEvent> newReferenceData =
            new List<TestDataEvent>(config.NumberOfItems);

        for (int i = 0; i < config.NumberOfItems; i++)
        {
            newReferenceData.Add(new TestDataEvent()
            {
                ItemId = "Item" + i.ToString(),
                RunNumber = runNumber,
                EnqueueTimestamp = DateTime.Now,
                Value = rdm.NextDouble() * 10
            });
        }
        return newReferenceData;
    }

    public string ItemId { get; set; }
    public int RunNumber { get; set; }
    public DateTime EnqueueTimestamp { get; set; }
    public double Value { get; set; }
}
```

Now that we have all of the foundational building blocks completed, we are finally ready to create our producer. For now, we’ll have our producer enqueue the CTIs after each “batch” of events; this keeps all of the events in the same “run” within the same CTI span.

TestDataEventProducer

```csharp
public class TestDataProducer : StreamEventProducer<TestDataEvent, TestDataInputConfig>
{
    private DateTimeOffset _nextStartTime;
    private Timer _enqueueTimer;
    private int _runNumber = 0;

    public TestDataProducer(TestDataInputConfig config) : base(config)
    {
        _enqueueTimer = new Timer(ProduceEvents, null, Timeout.Infinite, Timeout.Infinite);
        this._nextStartTime = config.StartDateTime;
    }

    protected override void Start()
    {
        // Change the timer to start it.
        _enqueueTimer.Change(TimeSpan.Zero, Configuration.RefreshInterval);
    }

    /// <summary>
    /// Main driver to read events from the source and publish them.
    /// </summary>
    private void ProduceEvents(object state)
    {
        _runNumber++;

        var newEvents = TestDataEvent.CreateNext(Configuration, _runNumber);
        var eventTimestamp = _nextStartTime;

        var publishEvents = (from e in newEvents
                             select new StreamInputEvent<TestDataEvent>(e)
                             {
                                 Start = eventTimestamp
                             }).ToList();

        foreach (var publishedEvent in publishEvents)
        {
            this.PublishEvent(publishedEvent);
        }

        //Publish our CTI, 1 tick after the batch.
        this.PublishEvent(new StreamInputEvent<TestDataEvent>(eventTimestamp.AddTicks(1)));
        _nextStartTime += Configuration.TimestampIncrement;
    }

    protected override void Dispose(bool disposing)
    {
        if (disposing)
        {
            if (_enqueueTimer != null)
            {
                _enqueueTimer.Dispose();
                _enqueueTimer = null;
            }
        }
        base.Dispose(disposing);
    }
}
```

At this point, we are actually ready to start using the StreamInsight 2.1 APIs to get data from our new producer! Of course, we aren’t done but let’s see how we could do this. It’s here that the GetPointEvent() helper method is really, really handy. It makes the code very readable and it happens to be highly reusable as well, both of which are Very Good Things™.

Running the Process
```csharp
var config = new TestDataInputConfig()
{
    NumberOfItems = 20,
    RefreshInterval = TimeSpan.FromMilliseconds(500)
};

var data = cepApplication.DefineObservable(
    () => new TestDataProducer(config)).ToPointStreamable(e => e.GetPointEvent());

var sink = cepApplication.DefineObserver(() => Observer.Create<TestDataEvent>(
    e => Console.WriteLine("TestEvent ItemId:{0} Run:{1} Value:{2}", e.ItemId, e.RunNumber, e.Value)));

data.Bind(sink).Run();
```

And when we do that, we get our data flowing. You can download the code from my SkyDrive.
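The GetPointEvent() helper that feeds ToPointStreamable above isn't reproduced in this post, so here's a hedged sketch of what it might look like on StreamInputEvent&lt;TPayload&gt;. The member names (Start, Payload, EventKind) are assumptions based on how the helper is used, not the actual implementation from the sample:

```csharp
using Microsoft.ComplexEventProcessing;

// Hypothetical sketch of the GetPointEvent() helper on StreamInputEvent<TPayload>.
// Member names are assumed from the surrounding code: a CTI input event becomes
// a CTI point event; anything else becomes an Insert stamped with its start time.
public partial class StreamInputEvent<TPayload>
{
    public PointEvent<TPayload> GetPointEvent()
    {
        if (EventKind == EventKind.Cti)
        {
            return PointEvent<TPayload>.CreateCti(Start);
        }
        return PointEvent<TPayload>.CreateInsert(Start, Payload);
    }
}
```

Centralizing the shape mapping in one place like this is what keeps the DefineObservable call so clean.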

Where does StreamInsight data come from?

I got this question at Sql Saturday BI Edition in Dallas not too long ago. My answer was the usual one … “Well, you write an input adapter or (in 2.1) a source in .NET …”.  “Yes, I know that,” the asker replied, “but where does it come from?” “Ummm …” I was getting a little flustered. “Anything you can get to with .NET.” “You aren’t helping,” he replied … and proceeded to tell me that I’m giving him the same answer that everyone has given him and it’s not really an answer. Now, this wasn’t just some muffin-eater that you see at free conferences sometimes where you find yourself wondering if they should be allowed to use a computer at all. No, it came from fellow MVP and Microsoft Certified Master Sean McCown of Midnight DBA fame. I’d sat in on Sean’s sessions before and knew that he was a little smarter (at least) than your average bear. So I knew I was missing something. I scratched my head. And then went into a discussion … and whiteboarding … of how OSISoft’s PI System Adapters work. It’s been some time since then and I’ve been meaning to write this follow up. You see, it finally dawned on me as Sean was ranting about the answer not being an answer that, well, it wasn’t really an answer. The whole getting-data-to-StreamInsight thing has been something like the Underwear Gnomes from South Park. Step 1: Get Data Source. Step 2 … … … Step 3: Write Adapter! What’s missing is step 2 … how does the data get from your data source into the adapter/source code. In reality, it does vary pretty widely, depending on what your data source is. Determining the Data You Want Your first step is to determine the data that you want to get into StreamInsight. You’ll have two primary types of data: fast, stream data for analysis and slow-moving, reference data. For your stream data, you will typically want to get it as close to the original source of this data as possible – within reason. As we get further into this, it’ll make more sense what I’m talking about here. 
You also want to keep this data as lightweight as possible. There’s no point in sending extra bits over the wire that you really don’t need with every update. Other data, the stuff that doesn’t change (for example, with a pressure sensor you may have units of measure, device type, subsystem, etc.), is reference data. You’ll still need it but you won’t want it with every single individual update. Depending on the data rates and the number of individual data points that you want, it’s entirely conceivable that you’ll run out of network bandwidth before you run out of StreamInsight processing power. You’ll also want as few sources/input adapters as possible, so if you can get readings from multiple items/devices in a single adapter, all the better. With that said, let’s talk about some of the options for data sources.

Raw Device Connections

Yup, you can do this. Well … as long as you have a way to get to it. Connecting directly to the device will be the fastest way to get the highest speed data possible. Now, exactly how you do that depends on the device. Some process control system sensors will transmit over dedicated and special-purpose protocols like Modbus or Profibus. They may ride on TCP, UDP, serial ports or even dedicated network connections that require special hardware. Or you may have a completely custom and proprietary protocol. At any rate, you’re getting data direct from the device in its most raw form. Depending on the device and its capabilities, this may be a push or it may be a poll. And, as a result, you’ll need to take the bytes off the wire and deserialize them into events for StreamInsight. Let me give you an example of something that we’re currently working on. In this system, we’re actually getting data from building sensors via a push … things like fire alarms and door access devices. There is a common protocol for these devices but it’s all binary.
The first byte gives you a header that tells you what kind of message it is and a timestamp. From there, you can determine how to read the rest of the bytes. The structure is documented and it’s a matter of taking the bytes, rearranging them and then enqueuing them. It’s not hard but it is tedious and you really have to pay attention to little details since you don’t have a nice object API to work with.

Device Aggregation Systems With “Push”

No, that’s not the formal name but it’s about the best “bucket” that I can come up with. Again, this covers a wide variety of types of systems that will aggregate raw device signals and then provide notifications of updates. It’s a layer above the raw device connection so you’ll lose a little bit of latency but the advantages are huge. Since you can aggregate multiple devices into a single connection, you minimize the number of adapters that you need to have running at any one time. Nor will you have to worry about fiddling with byte arrays and deserializing them into usable data – these systems will have APIs that do a lot of the heavy lifting for you. I’d put things like OPC and OSISoft’s PI Adapters for StreamInsight into this bucket. OPC has the advantage of being very widely deployed and accepted. Unfortunately, the most common implementation is the (very) old OPC-DA, which is based on (the horror!) DCOM. However … it does work very well. With OPC-DA, you create a connection to the OPC DCOM server and then subscribe for callbacks via old school COM events. As new readings come in, the OPC server sends your adapter/source the updates, which are then enqueued. OSISoft’s PI Adapters are similar except they are all .NET. Now … if you are familiar with PI, you also know that it’s a historian – it stores event data long-term and, to do this efficiently, uses compression algorithms that amount to downsampling. (You can, in fact, use OLEDB to get to the PI archive for historical data.)
However, the adapters don’t read from the archive. As the data comes into PI from the individual sensors, it goes into the Snapshot. Every bit of data that the PI Server gets goes into the Snapshot, whether it’s actually stored or not. It’s after the Snapshot that it goes into the compression algorithms for efficient storage. The PI adapters connect to the Snapshot and register for notifications; as data comes into PI and goes into the Snapshot, it is then sent to the input adapter and then to StreamInsight.

Polling a Store (Store-And-Forward)

You don’t want to do this if you can avoid it. But you can’t always avoid it. Sometimes you will want to sit alongside an existing system that collects data in a database and analyze it with StreamInsight temporal operators or correlate it with other real-time events. Sometimes the architecture and nature of the systems involved really require this. Standards like WITSML and PRODML come to mind when mentioning this. You will want to balance the amount of polling that you do with your latency requirements and you’ll have to keep in mind that there will be latency involved. As with other data, you want to get only the minimum amount of data, so your source will need to have some way for you to select everything since the last poll. It’s actually the easiest to do but the least desirable from a StreamInsight perspective. SQL Server Service Broker is one area that may make some of this a little better from a Sql Server perspective and one that, honestly, I need to really spend some time looking at.

Reference Data

Your reference data is going to come from some sort of durable store like Sql Server, Oracle or web services. This is the metadata that you use in your queries but that doesn’t change very often. You’ll want to set this up as a poll but with a very long interval. Exactly how long will vary based on your use cases … every few minutes or every hour or even longer.
Depending on the amount of data that you have, you may also want to look at only getting changes since last poll … again, minimizing the amount of data being pulled across the wire is always a Very Good Thing™. The pattern for reference data is pretty well defined and hasn’t changed at all … I still refer folks to Mark Simms’ post of 2 years ago. With StreamInsight 2.1, some things have changed with importing CTIs (which I will look at again later) but the end result is still the same. So … hopefully I’ve fleshed out and clarified that mysterious step 2. In the end, it really depends on how you can get to the data that you want to analyze. The aggregation with push is what I try to lean towards … it’s relatively low latency and gives you a lot of bang for the buck. In the real world, you don’t want to be writing adapters for every device under the sun, especially when you can connect with OPC or OSISoft PI and leverage the extensive connectivity that you’ll have with these established platforms.
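To make the raw-device discussion above a little more concrete, here's an illustrative sketch of the kind of byte-wrangling involved. The frame layout (one type byte followed by an 8-byte little-endian tick count) is completely made up for illustration; real protocols like Modbus or the building-sensor format mentioned earlier define their own layouts:

```csharp
using System;

// Parsed header for a hypothetical binary device frame.
public sealed class FrameHeader
{
    public byte MessageType;
    public DateTimeOffset Timestamp;
}

public static class FrameParser
{
    // Illustrative only: byte 0 is the message type, bytes 1-8 are a
    // little-endian tick count (assumes a little-endian host). Real device
    // protocols define their own layouts.
    public static FrameHeader ParseHeader(byte[] frame)
    {
        if (frame == null || frame.Length < 9)
        {
            throw new ArgumentException("Frame too short for header.", "frame");
        }

        long ticks = BitConverter.ToInt64(frame, 1);
        return new FrameHeader
        {
            MessageType = frame[0],
            Timestamp = new DateTimeOffset(ticks, TimeSpan.Zero)
        };
    }
}
```

Once the header tells you the message type, you dispatch to a per-type body parser and enqueue the result as an event.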
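And for the store-and-forward case, the usual shape is a watermark poll: remember the highest timestamp you've seen and ask only for newer rows. A sketch, with the table and column names (SensorReadings, ReadingTime) invented purely for illustration:

```csharp
using System;
using System.Data.SqlClient;

// Sketch of a watermark-based poll. Table/column names are hypothetical;
// the point is that each poll fetches only rows newer than the last one seen.
public class PollingReader
{
    private DateTime _watermark = DateTime.MinValue;

    public void PollOnce(string connectionString, Action<string, double, DateTime> enqueue)
    {
        using (var connection = new SqlConnection(connectionString))
        using (var command = new SqlCommand(
            "SELECT ItemId, Value, ReadingTime FROM SensorReadings " +
            "WHERE ReadingTime > @watermark ORDER BY ReadingTime", connection))
        {
            command.Parameters.AddWithValue("@watermark", _watermark);
            connection.Open();
            using (var reader = command.ExecuteReader())
            {
                while (reader.Read())
                {
                    // Enqueue the row, then advance the watermark.
                    enqueue(reader.GetString(0), reader.GetDouble(1), reader.GetDateTime(2));
                    _watermark = reader.GetDateTime(2);
                }
            }
        }
    }
}
```

The polling interval then becomes your latency floor, which is exactly the trade-off described above.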

Houston TechFest Sessions

Community | StreamInsight | .NET Stuff | User Groups
Houston TechFest is tomorrow at the Reliant Center. And, of course, I will be speaking. This year, I am doing two sessions: Back to Basics: .NET Essentials You May Have Forgotten (3:40 PM – 4:40 PM) Now that the .NET Framework has been out for 10 years and so many tools are available to make things easier, many developers seem to have forgotten some of the core principles of .NET. In this presentation, I'll review the fundamentals of the .NET Framework and what makes all of that magic happen. From the CLR to IL to memory management and garbage collection, we'll touch on these core concepts that every developer should know. I originally did this presentation at HDNUG and it was very well received – better than I had thought it would be. However, because the timeframe is a bit more limited at TechFest vs. HDNUG, I’ll need to pare this down a little bit so I don’t go over. Introducing StreamInsight 2.1 (4:50 PM – 5:50 PM) StreamInsight provides real-time analytics on large volumes of data with a response time measured in milliseconds, bringing a new level of capabilities to the Sql Server platform. This session will provide an overview of StreamInsight, its capabilities and use cases. It will also provide details on StreamInsight 2.1, what's been added and how it makes your real-time applications easier and more robust. I originally delivered this at Sql Saturday in Baton Rouge. I’ve tweaked and honed it a bit. Some will be familiar to my other StreamInsight intro presentations – how it works, etc. There are some updates on more use cases that we’ve seen “out in the wild” in the past year or so. StreamInsight is coming into its own now too … we are seeing more and more interest and adoption as folks get a better understanding of where it can add to their business. I hope to see you there!

Importing CTIs with IQStreamable/IQbservable in StreamInsight 2.1

Code Sample | StreamInsight
This came up on the StreamInsight forum … actually from one of the guys on my team. This is pretty important … since StreamInsight syncs to the slowest stream, if you have a slow-moving reference stream (as described so well here), you need to import the CTIs from your faster data stream into the slower stream to keep things moving along at the proper pace. We use this pattern all the time and ran into this when we were converting an existing demo to use the 2.1 model. Here’s what you need to do. First, you have your reference stream implemented and exposed as a simple Observable … you can use the .NET 4.0 IObservable interface for this; nothing special is needed. Your data stream should be exposed as an IQbservable or IObservable (you can get to IQStreamable from there) and the IObservable should include the CTIs. This means that your data source for the data stream needs to enqueue CTIs … you won’t really be able to use AdvanceTimeImportSettings.IncreasingStartTime/StrictlyIncreasingStartTime (since the CTIs aren’t exposed) nor will you be able to return an IQStreamable and then switch it to an IQbservable (you’ll only get the payload). The first thing that we’ll need to do is to create a wrapper class that has both the payload and the temporal information and can also represent a CTI.
Here’s an example:

Typed Event Class

```csharp
public struct PublishedEvent<TPayloadType>
{
    public TypedEvent<TPayloadType> GetEvent(EventShape eventShape)
    {
        switch (eventShape)
        {
            case EventShape.Interval:
                return GetIntervalEvent();

            case EventShape.Edge:
                return GetEdgeEvent();

            case EventShape.Point:
                return GetPointEvent();

            default:
                throw new ArgumentOutOfRangeException("eventShape");
        }
    }

    public PointEvent<TPayloadType> GetPointEvent()
    {
        if (!this.IsCti)
        {
            return PointEvent<TPayloadType>.CreateInsert(this.Start, Payload);
        }
        return PointEvent<TPayloadType>.CreateCti(Start);
    }

    public IntervalEvent<TPayloadType> GetIntervalEvent()
    {
        if (!this.IsCti)
        {
            return IntervalEvent<TPayloadType>.CreateInsert(this.Start, this.End, Payload);
        }
        return IntervalEvent<TPayloadType>.CreateCti(Start);
    }

    public EdgeEvent<TPayloadType> GetEdgeEvent()
    {
        if (this.IsCti)
        {
            return EdgeEvent<TPayloadType>.CreateCti(this.Start);
        }

        if (this.EdgeType == EdgeType.Start)
        {
            return EdgeEvent<TPayloadType>.CreateStart(this.Start, this.Payload);
        }
        return EdgeEvent<TPayloadType>.CreateEnd(this.Start, this.End, this.Payload);
    }

    public PublishedEvent(DateTimeOffset ctiDateTime)
    {
        IsCti = true;
        Start = ctiDateTime;
        EdgeType = EdgeType.Start;
        End = DateTimeOffset.MaxValue;
        Payload = default(TPayloadType);
    }

    /// <summary>
    /// Initializes a new instance of the <see cref="PublishedEvent&lt;TEventType&gt;"/> struct.
    /// </summary>
    /// <param name="payload">The Payload.</param>
    /// <remarks>
    /// This sets the start to <see cref="DateTimeOffset">DateTimeOffset.MinValue</see>,
    /// the end to <see cref="DateTimeOffset">DateTimeOffset.MaxValue</see>
    /// and the edge type to the default value.<br/>
    /// The subscriber is responsible for explicitly setting these parameters.
    /// </remarks>
    public PublishedEvent(TPayloadType payload)
    {
        Start = DateTimeOffset.MinValue;
        End = DateTimeOffset.MaxValue;
        EdgeType = default(EdgeType);
        Payload = payload;
        IsCti = false;
    }

    /// <summary>
    /// Initializes a new instance of the <see cref="PublishedEvent&lt;TEventType&gt;"/> struct.
    /// </summary>
    /// <param name="payload">The event payload.</param>
    /// <param name="startTime">The start time.</param>
    /// <param name="endTime">The end time.</param>
    /// <param name="edgeType">Type of the edge.</param>
    /// <remarks>
    /// Use when the event publisher "knows" the appropriate start time, end time and/or the edge type.<br/>
    /// This is useful when publishing payloads that carry this information.
    /// </remarks>
    public PublishedEvent(TPayloadType payload, DateTimeOffset startTime, DateTimeOffset endTime, EdgeType edgeType)
    {
        Start = startTime;
        End = endTime;
        EdgeType = edgeType;
        Payload = payload;
        IsCti = false;
    }

    /// <summary>
    /// Gets or sets the Payload.
    /// </summary>
    public TPayloadType Payload;
    public DateTimeOffset Start;
    public DateTimeOffset End;
    public EdgeType EdgeType;
    public bool IsCti;
}
```

There are “helper” methods on the class to create the appropriate events from the wrapper for convenience. It makes the ToStreamable method a little simpler and clearer.
Now that we have that, we need to define an event publisher. This exposes IObservable and provides the events to StreamInsight. It’s pretty simple and, like an adapter, has a configuration that is passed in. In our case, the configuration has the number of items to create and a refresh interval … how frequently new events are created. This also allows us to use one publisher for both the fast and slow moving events.

Configuration class

```csharp
[DataContract()]
public class TestDataInputConfig
{
    public TestDataInputConfig()
    {
        RefreshInterval = TimeSpan.FromMilliseconds(500);
        NumberOfItems = 10;
    }

    [DataMember]
    public string Name { get; set; }

    [DataMember]
    public TimeSpan RefreshInterval { get; set; }

    [DataMember]
    public int NumberOfItems { get; set; }
}
```

In the sample, this is a test data generator … it creates a payload that contains an Item Id, a run number (which iteration of the input) and a string so that we know a) where it came from and b) some payload info. Again, very simple but it helps us validate that we are getting the results that we expect. And by implementing IObservable, we can publish this to StreamInsight using the new DefineObservable method. Since each published event has the payload and temporal header information, it’s easy to then convert the IQbservable to an IQStreamable and get it going with temporal properties.
Test Data Event

```csharp
public class TestDataEvent
{
    public static List<TestDataEvent> CreateNext(TestDataInputConfig config, int runNumber)
    {
        List<TestDataEvent> newReferenceData =
            new List<TestDataEvent>(config.NumberOfItems);

        for (int i = 0; i < config.NumberOfItems; i++)
        {
            newReferenceData.Add(new TestDataEvent()
            {
                Source = config.Name,
                ItemId = "Item" + i.ToString(),
                RunNumber = runNumber,
                EventText = "Text for Item " + i.ToString() + " Run Number " + runNumber
            });
        }
        return newReferenceData;
    }

    public string Source { get; set; }
    public string ItemId { get; set; }
    public int RunNumber { get; set; }
    public string EventText { get; set; }
}
```

The publisher implements IObservable and creates events based on a timer. And it doesn’t start the timer until Subscribe is called … if we start producing them in the constructor, we’ll miss the first round of events and we don’t want to do that.

Data Publisher

```csharp
public class TestDataPublisher :
    IObservable<PublishedEvent<TestDataEvent>>,
    IDisposable
{
    private Timer _enqueueTimer;
    private int _runNumber = 0;
    private readonly TestDataInputConfig _config;

    private readonly Dictionary<string, IObserver<PublishedEvent<TestDataEvent>>>
        _instances = new Dictionary<string, IObserver<PublishedEvent<TestDataEvent>>>(50);

    public TestDataPublisher(TestDataInputConfig config)
    {
        _config = config;
        _enqueueTimer = new Timer(ProduceEvents, null, Timeout.Infinite, Timeout.Infinite);
    }

    /// <summary>
    /// Main driver to read events from source and enqueue them.
    /// </summary>
    private void ProduceEvents(object state)
    {
        _runNumber++;
        var newEvents =
            TestDataEvent.CreateNext(_config, _runNumber);

        var publishEvents = (from e in newEvents
                             select new PublishedEvent<TestDataEvent>(e)
                             {
                                 Start = DateTimeOffset.Now
                             }).ToList();

        foreach (var observer in _instances.Values.AsParallel())
        {
            foreach (var publishedEvent in publishEvents)
            {
                observer.OnNext(publishedEvent);
            }
            observer.OnNext(new PublishedEvent<TestDataEvent>(DateTimeOffset.Now.AddTicks(1)));
        }
    }

    /// <summary>
    /// Notifies the provider that an observer is to receive notifications.
    /// </summary>
    /// <returns>
    /// A reference to an interface that allows observers to stop receiving notifications before the provider has finished sending them.
    /// </returns>
    /// <param name="observer">The object that is to receive notifications.</param>
    public IDisposable Subscribe(IObserver<PublishedEvent<TestDataEvent>> observer)
    {
        string instanceId = Guid.NewGuid().ToString();
        _instances.Add(instanceId, observer);
        observer.OnNext(new PublishedEvent<TestDataEvent>(DateTimeOffset.Now.AddMonths(-5)));
        if (_instances.Count == 1)
        {
            _enqueueTimer.Change(TimeSpan.FromSeconds(0), _config.RefreshInterval);
        }
        return Disposable.Create(() => Unsubscribe(instanceId));
    }

    public void Unsubscribe(string id)
    {
        _instances.Remove(id);
    }

    #region Disposable Pattern
    // (removed for brevity)
    #endregion
}
```

The magic, though, happens in the AdvanceTimeImporter. It’s kind of a subject, except that it observes both streams.
It produces a single observable that has events from the event stream (with reference data) and the CTI stream. For the CTI stream, it is just an IObservable of DateTimeOffsets … so the CTIs don’t necessarily need to come from the moving data stream; they could be generated on a “clock”. The event stream doesn’t need to publish CTIs but, if it does, we exclude them (they’d just muddy the waters). We also have the option (as with the AdvanceTimeImportSettings) to adjust or drop events from the event stream that violate CTI rules. However, there’s one little implementation exception … when the value is Adjust, we adjust all of the event start times, not just those intervals that cross into the CTI span, ensuring that all of our data events get into the output (merged) stream.

Advance Time Importer

```csharp
// NOTE: need to put some thought around how to handle OnCompleted/OnError from the observables.
public class AdvanceTimeImporter<TDataStreamEventType> : IDisposable,
    IObservable<PublishedEvent<TDataStreamEventType>>
{
    private IObservable<PublishedEvent<TDataStreamEventType>> _dataStreamObservable;
    private IObserver<PublishedEvent<TDataStreamEventType>> _observer;
    private readonly IDisposable _eventStreamDisposable, _ctiStreamDisposable;
    private readonly TimeSpan _dataStreamRefreshInterval;
    private readonly AdvanceTimePolicy _importPolicy;

    public AdvanceTimeImporter(
        IObservable<PublishedEvent<TDataStreamEventType>> dataStream,
        IObservable<DateTimeOffset> ctiStream,
        AdvanceTimePolicy importPolicy)
    {
        _dataStreamObservable = dataStream;
        _ctiStreamDisposable = ctiStream.Subscribe(NextCtiItem);
        _importPolicy = importPolicy;
        _eventStreamDisposable = _dataStreamObservable.Where(e => !e.IsCti).Subscribe(NextDataItem);
    }

    private DateTimeOffset _lastCtiTimestamp = DateTimeOffset.MinValue;

    private void NextDataItem(PublishedEvent<TDataStreamEventType> dataEvent)
    {
        if (dataEvent.IsCti)
        {
            // Don't publish the CTIs from the data stream.
            return;
        }
        // Check for CTI violations but not for edge end.
        if (dataEvent.Start >= _lastCtiTimestamp && dataEvent.EdgeType != EdgeType.End)
        {
            PublishOnNext(dataEvent);
            System.Diagnostics.Debug.WriteLine("Data ... no CTI Violation");
        }
        else if (_importPolicy == AdvanceTimePolicy.Adjust)
        {
            // Adjust ... but not for end edges.
            if (dataEvent.Start <= _lastCtiTimestamp)
            {
                dataEvent.Start = _lastCtiTimestamp.AddTicks(1);
            }
            PublishOnNext(dataEvent);
            System.Diagnostics.Debug.WriteLine("Data ... adjusted for CTI Violation");
        }
    }

    public IDisposable Subscribe(IObserver<PublishedEvent<TDataStreamEventType>> observer)
    {
        _observer = observer;
        if (_importPolicy == AdvanceTimePolicy.Adjust)
        {
            // Add a starter CTI.
            NextCtiItem(_lastCtiTimestamp);
        }

        // Release our reference to the observable.
        _dataStreamObservable = null;
        return this;
    }

    private void NextCtiItem(DateTimeOffset cti)
    {
        if (cti >= _lastCtiTimestamp)
        {
            _lastCtiTimestamp = cti;
            PublishOnNext(new PublishedEvent<TDataStreamEventType>(cti));
        }
    }

    private void PublishOnNext(PublishedEvent<TDataStreamEventType> publishedEvent)
    {
        if (_observer != null)
        {
            _observer.OnNext(publishedEvent);
        }
    }

    #region IDisposable implementation
    // (removed for brevity)
    #endregion
}
```

That’s really it. I also created an extension method for IObservable that makes this a bit simpler and clearer to call.
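The extension method itself isn't reproduced in the post, so here's a hedged sketch of how it might wire the pieces above together. The signature, name (ImportCTIs) and the use of a time-windowed ReplaySubject are assumptions drawn from how it's called later — the real method is in the downloadable sample:

```csharp
using System;
using System.Reactive.Subjects;
using Microsoft.ComplexEventProcessing;

// Hypothetical sketch of the ImportCTIs extension method. It wraps the
// reference stream in an AdvanceTimeImporter driven by the data stream's
// CTIs, then buffers recent events in a ReplaySubject so that late
// subscribers still see the last batch of reference data.
public static class AdvanceTimeImportExtensions
{
    public static IObservable<PublishedEvent<T>> ImportCTIs<T>(
        this IObservable<PublishedEvent<T>> referenceStream,
        IObservable<DateTimeOffset> ctiStream,
        TimeSpan replayWindow,
        AdvanceTimePolicy importPolicy)
    {
        var importer = new AdvanceTimeImporter<T>(referenceStream, ctiStream, importPolicy);
        var replay = new ReplaySubject<PublishedEvent<T>>(replayWindow);
        importer.Subscribe(replay);
        return replay;
    }
}
```

The replay window would typically be tied to the reference stream's refresh interval so that at least one full batch is always buffered.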
One thing that I do in the extension method is to use the Reactive Extensions ReplaySubject to make sure that latecomers to the Observable get the last batch of data that was published. Implementing the queries is pretty straightforward now.

Import CTI Queries

```csharp
var inputConfig = new TestDataInputConfig()
{
    Name = "DataStream",
    RefreshInterval = TimeSpan.FromMilliseconds(500)
};
var referenceConfig = new TestDataInputConfig()
{
    Name = "ReferenceStream",
    RefreshInterval = TimeSpan.FromSeconds(15)
};

var dataSource = myApp.DefineObservable(() => new TestDataPublisher(inputConfig));

var ctis = dataSource.Where(e => e.IsCti).Select(e => e.Start);

var referenceStream = myApp.DefineObservable(() =>
        new TestDataPublisher(referenceConfig).ImportCTIs(ctis, referenceConfig.RefreshInterval,
        AdvanceTimePolicy.Adjust))
    .ToIntervalStreamable(e => e.GetIntervalEvent());

var dataStream = dataSource.ToPointStreamable(e => e.GetPointEvent());

var outputStream = from d in dataStream
                   from r in referenceStream
                        .AlterEventDuration(e => TimeSpan.MaxValue)
                        .ClipEventDuration(referenceStream, (r1, r2) => r1.ItemId == r2.ItemId)
                   where d.ItemId == r.ItemId
                   select new ResultClass()
                   {
                       DataRunNumber = d.RunNumber,
                       ItemId = d.ItemId,
                       ReferenceString = r.EventText,
                       DataSource = d.Source,
                       ReferenceSource = r.Source
                   };

var outputDataSink = GetOutputDataSink(myApp);
outputStream.Bind(outputDataSink).Run("AdvanceTimeImport.Test");
```

In the sample, I’m using the old-school Tracer Adapter defined as a sink … that’s easy enough to change if you want to. Here’s the sample:

Speaking at Sql Saturday in Baton Rouge

Community | User Groups | StreamInsight
Next Saturday, August 4th, I will be speaking at Sql Saturday #150 in Baton Rouge. As usual, I will be talking about StreamInsight. In this case, I’ll be talking about StreamInsight 2.1 and the new adapter model based on Observables and the Reactive Extensions. I hope to see you there! In other notes, I know I’ve been bad about blogging lately. I also know that I haven’t kept up to my intended schedule. I do have a lot of stuff queued up to write about and I’m working on the first one. I’m planning a series of posts that will build on each other and explore several different topics with StreamInsight. I’m not going to spoil the surprise though … you’ll have to come back to read them but I do promise that they will be good, in-depth articles that detail pieces of a real-world architecture of a StreamInsight application.

Listing installed StreamInsight instances …

StreamInsight | Code Sample
This is one of those things that I’ve always had some manner of difficulty with as I move between computers writing StreamInsight code. You see, I have … let’s see … four different computers that I write code on and I will move projects between them. (Yes, I do have an instance of TFS running at home for some personal stuff, which certainly makes things easier as I hop between the computers.) The problem, of course, boils down to remembering what instances of StreamInsight are installed and what versions each instance is. Sure, I can go to Add/Remove Programs and do the maintenance setup for StreamInsight to see what instances are there … but that’s kinda painful. So, after some digging, I discovered that all of the instances are listed in the registry.   So … as any developer would do, I fired up Visual Studio and threw together a little app that would list all of the installed instances of StreamInsight, what edition they are, what version and what architecture (x86 or x64). It was really quite simple, actually. Now … I’m not going to say that the UI is the prettiest thing in the world (it’s not) but it does work and it does what it needs to do. Below is a screen shot: You can download the project from my SkyDrive. All you need is Visual Studio to build … there are no dependencies on StreamInsight. It’s all read from the registry.
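For the curious, here's a sketch of the registry read involved. The key path and value names are assumptions based on what I've seen on my own machines — verify them against your install (and build the full project from SkyDrive for the real thing):

```csharp
using System;
using Microsoft.Win32;

// Sketch: enumerate installed StreamInsight instances from the registry.
// The key path ("SOFTWARE\Microsoft\Microsoft StreamInsight") and the
// "Version" value name are assumptions; check them on your own machine.
public static class InstanceLister
{
    public static void ListInstances()
    {
        using (var root = Registry.LocalMachine.OpenSubKey(
            @"SOFTWARE\Microsoft\Microsoft StreamInsight"))
        {
            if (root == null)
            {
                Console.WriteLine("No StreamInsight instances found.");
                return;
            }

            foreach (var instanceKeyName in root.GetSubKeyNames())
            {
                using (var instanceKey = root.OpenSubKey(instanceKeyName))
                {
                    Console.WriteLine("{0}: version {1}",
                        instanceKeyName,
                        instanceKey.GetValue("Version", "unknown"));
                }
            }
        }
    }
}
```

On a 64-bit machine you may also need to check the Wow6432Node branch for x86 instances, which is how the tool distinguishes architectures.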