Ruminations of J.net: idle rants and ramblings of a code monkey

Dual-Mode Data Sinks - Part I

Code Sample | StreamInsight
Now that we have the input completed, we need to start working on the output adapters. As with the input adapters/sources, we'll create an architecture that allows you to use the same core code whether you are using the pre-2.1 adapter model or the 2.1 and later sink model. As with our StreamInputEvent, we'll create an abstraction that allows us to handle any event shape with the same code. This StreamOutputEvent should have properties that express all of the possible variations in event shapes as well as a generic property to hold the payload.

Now, if you look at several of the StreamInsight samples, you'll notice that they only send the payload to the sink. Certainly, that makes a couple of things easier but I don't think that it's really the best way to do things. A key aspect of everything StreamInsight is the temporal properties and you lose those if you don't send the full event shape to your sink. And … the shape is really only a way of expressing the event. Internally, all events, regardless of shape, have start and end times that control their lifetime in the query engine. The shape really becomes a way of expressing how you want to "see" the events and handle them in your output. There's nothing that stops you from expressing the same query as an edge, a point or an interval. It will impact when the event gets "released" to the output adapters/sink by the engine but really, it's just a matter of how you want to see the event and when you want to get it to your output. But, before I go any further, here's our StreamOutputEvent:

StreamOutputEvent

    public class StreamOutputEvent<TPayload>
    {
        /// <summary>
        /// Creates an output event from a source event.
        /// </summary>
        /// <param name="sourceEvent">The source event.</param>
        /// <returns></returns>
        public static StreamOutputEvent<TPayload> Create(PointEvent<TPayload> sourceEvent)
        {
            var outputEvent = new StreamOutputEvent<TPayload>()
            {
                StartTime = sourceEvent.StartTime,
                EventKind = sourceEvent.EventKind,
                EventShape = EventShape.Point
            };
            if (sourceEvent.EventKind == EventKind.Insert)
            {
                outputEvent.Payload = sourceEvent.Payload;
            }
            return outputEvent;
        }

        /// <summary>
        /// Creates an output event from a source event.
        /// </summary>
        /// <param name="sourceEvent">The source event.</param>
        /// <returns></returns>
        public static StreamOutputEvent<TPayload> Create(IntervalEvent<TPayload> sourceEvent)
        {
            var outputEvent = new StreamOutputEvent<TPayload>()
            {
                StartTime = sourceEvent.StartTime,
                EventKind = sourceEvent.EventKind,
                EventShape = EventShape.Interval
            };
            if (sourceEvent.EventKind == EventKind.Insert)
            {
                outputEvent.EndTime = sourceEvent.EndTime;
                outputEvent.Payload = sourceEvent.Payload;
            }
            return outputEvent;
        }

        /// <summary>
        /// Creates an output event from a source event.
        /// </summary>
        /// <param name="sourceEvent">The source event.</param>
        /// <returns></returns>
        public static StreamOutputEvent<TPayload> Create(EdgeEvent<TPayload> sourceEvent)
        {
            var outputEvent = new StreamOutputEvent<TPayload>()
            {
                StartTime = sourceEvent.StartTime,
                EventKind = sourceEvent.EventKind,
                EventShape = EventShape.Edge
            };
            if (sourceEvent.EventKind == EventKind.Insert)
            {
                outputEvent.Payload = sourceEvent.Payload;
                outputEvent.EdgeType = sourceEvent.EdgeType;
                if (sourceEvent.EdgeType == Microsoft.ComplexEventProcessing.EdgeType.End)
                {
                    outputEvent.EndTime = sourceEvent.EndTime;
                }
            }
            return outputEvent;
        }

        public DateTimeOffset StartTime { get; private set; }
        public EventKind EventKind { get; private set; }
        public DateTimeOffset? EndTime { get; private set; }
        public EventShape EventShape { get; private set; }
        public EdgeType? EdgeType { get; private set; }
        public TPayload Payload { get; private set; }
    }

With factory methods for each event shape, this is something that we can easily create from an event stream and then send to a single set of code that handles the outbound event. When creating the sink and hooking it to the stream, it's a matter of how you create your observer and the type specified for TElement. Very simply, the key thing that dictates what StreamInsight "sends" to your sink is the type for the observer's generic class parameter. If you specify IObserver<TPayload>, you'll only get the payload. However, if you specify IObserver<PointEvent<TPayload>>, you'll get a point event (and so on for intervals and edges). Since our event consumer should be able to consume events of any shape, we will actually need to implement the observer interface for each of the shapes. While it may be tempting to try to implement one interface based on TypedEvent<T>, it won't work. Yes, I tried. But StreamInsight requires that your sinks specify the payload as the type for the observer or one of the TypedEvent<T> child classes. If you don't specify an event shape, StreamInsight will handle the events and release them to the output adapter as though they were point events. For some very basic scenarios, this works. But when you start getting into some of the more interesting scenarios for StreamInsight, you'll want to do a lot more with your output than to view it as a point. But … this is for the next post.
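To make that binding distinction concrete, here's a minimal sketch (hedged: cepApplication and the TestDataEvent payload are carried over from the data sources posts in this series, and Observer.Create comes from the Reactive Extensions):

    //A payload-only sink: StreamInsight delivers just TestDataEvent instances,
    //no temporal header and no CTIs.
    var payloadSink = cepApplication.DefineObserver(
        () => Observer.Create<TestDataEvent>(e => Console.WriteLine(e.Value)));

    //A shaped sink: StreamInsight delivers full PointEvent<TestDataEvent> instances,
    //including the temporal header (StartTime) and CTI events.
    var pointSink = cepApplication.DefineObserver(
        () => Observer.Create<PointEvent<TestDataEvent>>(
            e => Console.WriteLine("{0} @ {1}", e.EventKind, e.StartTime)));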
Let's get back to our implementation. As we did with our input producer, we'll create a common, abstract base class for all of our event consumers. Our concrete consumers will inherit from this class and handle whatever is necessary to write the data to our target, whatever it may be. Again, as with the producer, we'll specify a configuration class; while the reactive StreamInsight API no longer requires it, the reality is that you will want to have a configuration class for your sinks; you don't want to hard-code things like database connection strings or web service target URIs in your event consumers. But the key thing that our base class will do is to implement the observer interface for each of the event shapes, translate the events to our StreamOutputEvent and then send them on to our actual event consumer's code.

StreamEventConsumer

    public abstract class StreamEventConsumer<TPayloadType, TConfigType> :
        IObserver<PointEvent<TPayloadType>>,
        IObserver<EdgeEvent<TPayloadType>>,
        IObserver<IntervalEvent<TPayloadType>>
    {
        protected StreamEventConsumer(TConfigType configuration)
        {
            this.Configuration = configuration;
        }

        public TConfigType Configuration { get; private set; }

        public abstract void Completed();

        public abstract void Error(Exception error);

        public abstract void EventReceived(StreamOutputEvent<TPayloadType> outputEvent);

        public void OnNext(PointEvent<TPayloadType> value)
        {
            EventReceived(StreamOutputEvent<TPayloadType>.Create(value));
        }

        public void OnNext(IntervalEvent<TPayloadType> value)
        {
            EventReceived(StreamOutputEvent<TPayloadType>.Create(value));
        }

        public void OnNext(EdgeEvent<TPayloadType> value)
        {
            EventReceived(StreamOutputEvent<TPayloadType>.Create(value));
        }

        public void OnCompleted()
        {
            Completed();
        }

        public void OnError(Exception error)
        {
            Error(error);
        }
    }

From here, we'll continue to build the architecture much the same way that we built the event producers. For each consumer, we'll have a factory that handles the details of creating and starting the consumer based on a common interface.

ISinkFactory

    public interface ISinkFactory
    {
        IObserver<PointEvent<TPayload>> CreatePointObserverSink<TPayload>(object config);
        IObserver<EdgeEvent<TPayload>> CreateEdgeObserverSink<TPayload>(object config);
        IObserver<IntervalEvent<TPayload>> CreateIntervalObserverSink<TPayload>(object config);
    }

Between StreamEventConsumer and ISinkFactory, it's now a small step to create a concrete factory and consumer – as well as the adapters. For simplicity's sake, we'll use a console consumer.

Console Data Consumer

    public class ConsoleDataConsumer<TPayloadType> : StreamEventConsumer<TPayloadType, ConsoleOutputConfig>
    {
        public ConsoleDataConsumer(ConsoleOutputConfig configuration) : base(configuration)
        {
        }

        public override void Completed()
        {
            //Nothing necessary.
        }

        public override void Error(Exception error)
        {
            Console.WriteLine("Error occurred:" + error.ToString());
        }

        public override void EventReceived(StreamOutputEvent<TPayloadType> outputEvent)
        {
            if (outputEvent.EventKind == EventKind.Insert)
            {
                Console.ForegroundColor = Configuration.InsertEventColor;
                Console.WriteLine("Insert Event Received at " + outputEvent.StartTime);
            }
            else if (Configuration.ShowCti)
            {
                Console.ForegroundColor = Configuration.CtiEventColor;
                Console.WriteLine("CTI event received at " + outputEvent.StartTime);
            }
            Console.ResetColor();
        }
    }
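The ConsoleOutputConfig class isn't shown in the post; here's a minimal sketch consistent with how it's used above and in the hookup code later (the three property names come from the original code; the DataContract/DataMember attributes are an assumption, by analogy with the input configuration class from the earlier posts):

    [DataContract]
    public class ConsoleOutputConfig
    {
        //Whether CTI events should be written to the console.
        [DataMember]
        public bool ShowCti { get; set; }

        //Console colors for the two kinds of output.
        [DataMember]
        public ConsoleColor CtiEventColor { get; set; }

        [DataMember]
        public ConsoleColor InsertEventColor { get; set; }
    }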
Our factory handles the dirty details of hooking up to our source streams as well as our output adapters. By forcing the factory to implement methods for each event shape, we ensure that we get the interface that we need when creating the sink and tying it to our streams, and we also get the opportunity to say, in code, that a particular output sink doesn't support specific shapes, should that be appropriate.

ConsoleOutputFactory

    public class ConsoleOutputFactory : ISinkFactory, ITypedOutputAdapterFactory<ConsoleOutputConfig>
    {
        public IObserver<PointEvent<TPayload>> CreatePointObserverSink<TPayload>(object config)
        {
            return new ConsoleDataConsumer<TPayload>((ConsoleOutputConfig)config);
        }

        public IObserver<EdgeEvent<TPayload>> CreateEdgeObserverSink<TPayload>(object config)
        {
            return new ConsoleDataConsumer<TPayload>((ConsoleOutputConfig)config);
        }

        public IObserver<IntervalEvent<TPayload>> CreateIntervalObserverSink<TPayload>(object config)
        {
            return new ConsoleDataConsumer<TPayload>((ConsoleOutputConfig)config);
        }

        public OutputAdapterBase Create<TPayload>(ConsoleOutputConfig configInfo, EventShape eventShape)
        {
            switch (eventShape)
            {
                case EventShape.Interval:
                    return new ObserverTypedIntervalOutputAdapter<TPayload>(CreateIntervalObserverSink<TPayload>(configInfo));
                case EventShape.Edge:
                    return new ObserverTypedEdgeOutputAdapter<TPayload>(CreateEdgeObserverSink<TPayload>(configInfo));
                case EventShape.Point:
                    return new ObserverTypedPointOutputAdapter<TPayload>(CreatePointObserverSink<TPayload>(configInfo));
                default:
                    throw new ArgumentOutOfRangeException("eventShape");
            }
        }

        public void Dispose()
        {
        }
    }
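Because the factory implements both ISinkFactory and ITypedOutputAdapterFactory<ConsoleOutputConfig>, the same class serves the legacy API too. A hedged sketch of pre-2.1 usage follows; the ToQuery overload here is recalled from the 1.2/2.0 API, so check the exact parameters against your version, and myStream is a hypothetical CepStream<TestDataEvent> built with the legacy API:

    var query = myStream.ToQuery(
        cepApplication,
        "ConsoleOutputQuery",
        "Writes results to the console",
        typeof(ConsoleOutputFactory),
        new ConsoleOutputConfig() { ShowCti = true },
        EventShape.Point,
        StreamEventOrder.FullyOrdered);
    query.Start();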
We've not touched on the output adapters yet so now's the time to introduce them; they are, after all, already referenced in our factory. As before, we have a single factory for our producers using the 2.1+ Reactive model as well as our legacy adapter model. As with our input adapters, the output adapters are relatively thin wrappers around our event consumers that handle the details of lifetime. Unlike the input adapters, with the output adapters we may well get some data after our "stop" event and we want to make sure that we dequeue all events before shutting down. To control this a little better, we use Monitor.Enter and Monitor.Exit directly rather than the basic lock{} block provided by C#. (The lock block, by the way, compiles down to a Monitor.Enter/Monitor.Exit pair behind the scenes.) Using the Monitor directly allows us to minimize the possibility of deadlocks if we get into a scenario where we are actively dequeuing events when we get a Resume call. By using Monitor.TryEnter(), we can attempt to enter our dequeuing thread from other threads without blocking. If the lock has already been acquired, we don't need to spin up another thread to dequeue and we certainly don't need to block waiting for a lock that we won't actually need once we get it. Our dequeue thread will continue to dequeue one event at a time until nothing is left in the queue. And we need to make sure that the dequeue operation is synchronized – only one thread can dequeue at a time anyway. Adding multiple threads to the dequeue operation typically won't help us and we want to keep all available threads free to process actual query results. Now … once we've dequeued, we may want to use techniques to multi-thread sending the results to the final target. But … the actual dequeue from each query/process should be single-threaded.

Keep in mind, however, that you'll have multiple, single-threaded output sinks in most real-world applications. You will be multi-threaded; have no worries there. And calls into our event consumers can come from any thread, which is why we need to use locks to make sure that we're properly synchronized. This is particularly important when our output adapter is stopped. After Stop() is called, we'll get one more chance to empty our queue. We use the monitor to make sure that we do empty all available events from the queue before calling Stopped(). This ensures that we'll have a nice, clean shutdown with no hangs and no ObjectDisposedExceptions.

Point Output Adapter

    public class ObserverTypedPointOutputAdapter<TPayloadType>
        : TypedPointOutputAdapter<TPayloadType>
    {
        private readonly IObserver<PointEvent<TPayloadType>> _sinkObserver;

        public ObserverTypedPointOutputAdapter(IObserver<PointEvent<TPayloadType>> sinkObserver)
        {
            _sinkObserver = sinkObserver;
        }

        public override void Stop()
        {
            try
            {
                Monitor.Enter(_monitorObject);
                //One last round to dequeue
                EmptyQueue();
                //Completed
                _sinkObserver.OnCompleted();
            }
            finally
            {
                Monitor.Exit(_monitorObject);
            }
            base.Stop();
            Stopped();
        }

        public override void Resume()
        {
            System.Threading.Thread thd = new Thread(DequeueEvents);
            thd.Start();
        }

        public override void Start()
        {
            System.Threading.Thread thd = new Thread(DequeueEvents);
            thd.Start();
        }

        private object _monitorObject = new object();

        private void DequeueEvents()
        {
            if (this.AdapterState != AdapterState.Running)
            {
                return;
            }
            //Ensures only 1 thread is dequeuing and no other threads are blocked.
            if (Monitor.TryEnter(_monitorObject))
            {
                try
                {
                    EmptyQueue();
                }
                catch (Exception ex)
                {
                    _sinkObserver.OnError(ex);
                }
                finally
                {
                    Monitor.Exit(_monitorObject);
                    this.Ready();
                }
            }
        }

        private void EmptyQueue()
        {
            PointEvent<TPayloadType> dequeuedEvent;
            while (this.Dequeue(out dequeuedEvent) == DequeueOperationResult.Success)
            {
                _sinkObserver.OnNext(dequeuedEvent);
            }
        }
    }
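The interval and edge adapters that the factory references follow exactly the same pattern; only the base class and the dequeued event type change. A hedged sketch of the interval variant (same namespaces as the point adapter above; the Stop() handling, which would mirror the point adapter, is omitted here for brevity):

    public class ObserverTypedIntervalOutputAdapter<TPayloadType>
        : TypedIntervalOutputAdapter<TPayloadType>
    {
        private readonly IObserver<IntervalEvent<TPayloadType>> _sinkObserver;
        private readonly object _monitorObject = new object();

        public ObserverTypedIntervalOutputAdapter(IObserver<IntervalEvent<TPayloadType>> sinkObserver)
        {
            _sinkObserver = sinkObserver;
        }

        public override void Start() { new Thread(DequeueEvents).Start(); }
        public override void Resume() { new Thread(DequeueEvents).Start(); }

        //Identical single-dequeuer logic; only the event type differs.
        private void DequeueEvents()
        {
            if (this.AdapterState != AdapterState.Running) return;
            if (Monitor.TryEnter(_monitorObject))
            {
                try { EmptyQueue(); }
                catch (Exception ex) { _sinkObserver.OnError(ex); }
                finally { Monitor.Exit(_monitorObject); this.Ready(); }
            }
        }

        private void EmptyQueue()
        {
            IntervalEvent<TPayloadType> dequeuedEvent;
            while (this.Dequeue(out dequeuedEvent) == DequeueOperationResult.Success)
            {
                _sinkObserver.OnNext(dequeuedEvent);
            }
        }
    }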
Now that we have all of the core pieces in place, let's take a look at what we need to do to hook our sink up to the console output. It's actually very simple.

Hooking up to a stream

    private static void RunProcess(Application cepApplication)
    {
        var config = new TestDataInputConfig()
        {
            NumberOfItems = 20,
            RefreshInterval = TimeSpan.FromMilliseconds(500)
        };
        var data = RxStream<TestDataEvent>.Create(cepApplication, typeof(TestDataInputFactory), config, EventShape.Point);

        var factory = new ConsoleOutputAdapter.ConsoleOutputFactory();
        var sink = cepApplication.DefineObserver(() => factory.CreatePointObserverSink<TestDataEvent>(
            new ConsoleOutputAdapter.ConsoleOutputConfig()
            {
                ShowCti = true,
                CtiEventColor = ConsoleColor.Blue,
                InsertEventColor = ConsoleColor.Green
            }));
        data.Bind(sink).Run();
    }

There's one thing that you may notice … the sink needs to know all of the details about the data class. This is far from ideal … and it highlights one of the things that I found so powerful about the untyped adapter model – you weren't tied to the schema of your data classes. There are various ways that we can handle this but that's a topic for the next entry. Until then, you can download the code from my SkyDrive.

Dual Mode Data Sources-Part III

StreamInsight | Code Sample
In the two previous postings, we walked through creating the data sources and exposing them to both the new Reactive-centric API and the "legacy" adapter-centric API. While we've accomplished that, I also said that we'd revisit the 2.1 code to add a layer of consistency to the APIs; we're going to "bring back" factories and integrate them into our API. Why on earth would I do that? Didn't we just get rid of them to make things simpler? Well, yes, we did just do away with the requirement of a factory but that doesn't mean that they aren't a good idea. Implementing the factory pattern will hide the details of creating the actual data source – our current one is pretty simple but others may get quite complex – and provide a layer to "check" requirements before we try to start the query (we're already doing this with the adapter factory). I also happen to like consistent APIs and keeping consistency whenever practical and possible. We'll start by creating an interface. We may – and likely will – also wind up creating an abstract base class that implements the interface and handles common functionality, but that will be refactored later, after we write some more sources and get a better feel for how to best define the base class. And … having both an interface and a base class gives us the greatest level of flexibility when implementing later on; because our API is based on interfaces, we can inherit from other, existing code and/or components and just add the interface. If we based our API exclusively on base classes, the single-inheritance rule would limit that. This interface will be pretty simple and only have a single method.

ISourceFactory

    /// <summary>
    /// Interface for data source factories.
    /// </summary>
    public interface ISourceFactory
    {
        /// <summary>
        /// Creates an observable source.
        /// </summary>
        /// <typeparam name="TPayload">Type of the payload.</typeparam>
        /// <param name="config">Configuration class.</param>
        /// <param name="eventShape">Shape of the events.</param>
        /// <returns></returns>
        IObservable<StreamInputEvent<TPayload>> CreateObservableSource<TPayload>(object config, EventShape eventShape);
    }

You may notice a slight difference between this and the input adapter factory interface – config is not strongly typed. Yes, we could do that … define a generic interface and then use reflection to call it. But, to be honest, I really didn't want to go through all of the contortions to make that happen. So the config is an object. Now, when we implement this on our existing factory, we also refactor things a bit to maximize code reuse.

Adapter + Source Factory

    public sealed class TestDataInputFactory : ITypedInputAdapterFactory<TestDataInputConfig>, ISourceFactory
    {
        [SuppressMessage("Microsoft.Design", "CA1004:GenericMethodsShouldProvideTypeParameter", Justification = "By Design")]
        public InputAdapterBase Create<TPayload>(TestDataInputConfig configInfo, EventShape eventShape)
        {
            CheckPayloadType<TPayload>();
            return new ObservableTypedPointInputAdapter<TestDataEvent>(CreateProducer(configInfo, eventShape));
        }

        public IObservable<StreamInputEvent<TPayload>> CreateObservableSource<TPayload>(object config, EventShape eventShape)
        {
            //Check the payload type.
            CheckPayloadType<TPayload>();
            //Check the config class for the proper type.
            TestDataInputConfig typedConfig = config as TestDataInputConfig;
            if (typedConfig == null)
            {
                //Invalid cast
                throw new ArgumentException("Configuration class must be of type TestDataInputConfig");
            }
            return (IObservable<StreamInputEvent<TPayload>>)CreateProducer(typedConfig, eventShape);
        }

        private static void CheckPayloadType<TPayload>()
        {
            //Check the payload.
            if (typeof(TPayload) != typeof(TestDataEvent))
            {
                //This won't work; throw an exception.
                throw new InvalidOperationException("Specified type must be of " + typeof(TestDataEvent).FullName);
            }
        }

        private TestDataProducer CreateProducer(TestDataInputConfig config, EventShape eventShape)
        {
            switch (eventShape)
            {
                case EventShape.Point:
                    //Create the publisher.
                    return new TestDataProducer(config);
                default:
                    throw new ArgumentException(string.Format(
                        System.Globalization.CultureInfo.InvariantCulture,
                        "TestDataInputFactory cannot instantiate adapter with event shape {0}",
                        eventShape.ToString()));
            }
        }

        public void Dispose()
        {
        }
    }

You'll notice that both methods … source and adapter … use the same validation logic and code. Using this in our existing code isn't too difficult. Our new RunProcess() now looks like the following:

Run Process

    private static void RunProcess(Application cepApplication)
    {
        var config = new TestDataInputConfig()
        {
            NumberOfItems = 20,
            RefreshInterval = TimeSpan.FromMilliseconds(500)
        };
        var data = cepApplication.DefineObservable(
            () => new TestDataInputFactory().CreateObservableSource<TestDataEvent>(config, EventShape.Point))
            .ToPointStreamable(e => e.GetPointEvent());
        var sink = cepApplication.DefineObserver(() => Observer.Create<TestDataEvent>(
            e => Console.WriteLine("TestEvent ItemId:{0} Run:{1} Value{2}", e.ItemId, e.RunNumber, e.Value)));
        data.Bind(sink).Run();
    }

We can still make it better, though, and make it look more like the pre-2.1 API. We'll start by creating an RxStream class and add methods for creating observables. We'll also want to make sure that these methods are remotable – so we can work with either a local or a remote instance without any code changes. This was a challenge with the pre-2.1 API; you could pretty easily get yourself into a situation where your code would work on a local, in-process instance but not work at all when you were connecting to a remote instance. With DefineObservable, however, you are actually defining a remote function that returns an observable and that's what we call … whether in-process or remote. If it works in-process then it'll work out of process. However, you may wind up getting yourself into a situation with mysterious serialization errors … you need to make sure that whatever you pass to your methods is fully serializable. That's why our configuration class has the DataContract attribute. With our CreateObservable method, we'll first check to see if we have a reference to the function (and notice that it is of type Func<>). If not, we check to see if it's been created and deployed. If not, we create and deploy it.
We could also put this same code in a static constructor – it wouldn't make much difference. The actual work is done by an InstantiateObservable private method and that's what our defined observable actually calls.

RxStream Observables

    private static Func<Type, object, EventShape, IQbservable<StreamInputEvent<TPayload>>> _observable;

    public static IQbservable<StreamInputEvent<TPayload>> CreateObservable(Application cepApplication,
        Type sourceFactoryType,
        object configInfo,
        EventShape eventShape)
    {
        string entityName = "Observable:" + typeof(TPayload).FullName;
        if (_observable == null)
        {
            //Check the application.
            if (cepApplication.Entities.ContainsKey(entityName))
            {
                _observable =
                    cepApplication.GetObservable<Type, object, EventShape, StreamInputEvent<TPayload>>(entityName);
            }
            else
            {
                //Define and deploy.
                _observable = cepApplication.DefineObservable(
                    (Type t, object c, EventShape e) => InstantiateObservable(t, c, e));
                _observable.Deploy(entityName);
            }
        }
        return _observable.Invoke(sourceFactoryType, configInfo, eventShape);
    }

    private static IObservable<StreamInputEvent<TPayload>> InstantiateObservable(Type sourceFactoryType,
        object configInfo,
        EventShape eventShape)
    {
        var sourceFactory = Activator.CreateInstance(sourceFactoryType) as ISourceFactory;
        if (sourceFactory == null)
        {
            throw new ArgumentException("Specified type is not a source factory.");
        }
        return sourceFactory.CreateObservableSource<TPayload>(configInfo, eventShape);
    }
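As a quick sanity check, the deployed function can also be used directly before we add the stream helpers below; a sketch reusing the config and factory types from earlier in this series:

    var config = new TestDataInputConfig() { NumberOfItems = 20 };
    //Invoke the deployed observable definition and turn it into a temporal point stream.
    var observable = RxStream<TestDataEvent>.CreateObservable(
        cepApplication, typeof(TestDataInputFactory), config, EventShape.Point);
    var data = observable.ToPointStreamable(e => e.GetPointEvent());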
Now all that we have left to do is to create methods that also create the streams, rather than just the observables. This will be our typical use case but we'll still keep the observable method public as well to give us the most flexibility when we are actually using this in anger.

RxStream Create

    public static IQStreamable<TPayload> Create(
        Application cepApplication, Type sourceFactoryType, object configInfo, EventShape eventShape)
    {
        return Create(cepApplication, sourceFactoryType, configInfo, eventShape, null);
    }

    public static IQStreamable<TPayload> Create(
        Application cepApplication, Type sourceFactoryType, object configInfo, EventShape eventShape,
        AdvanceTimeSettings advanceTimeSettings)
    {
        var observable = CreateObservable(cepApplication, sourceFactoryType, configInfo, eventShape);
        switch (eventShape)
        {
            case EventShape.Interval:
                return observable.ToIntervalStreamable(e => e.GetIntervalEvent(), advanceTimeSettings);
            case EventShape.Edge:
                return observable.ToEdgeStreamable(e => e.GetEdgeEvent(), advanceTimeSettings);
            case EventShape.Point:
                return observable.ToPointStreamable(e => e.GetPointEvent(), advanceTimeSettings);
            default:
                throw new ArgumentOutOfRangeException("eventShape");
        }
    }

Once this is in place, the code to create our stream looks remarkably similar to the adapter-centric code:

Creating the Stream

    var config = new TestDataInputConfig()
    {
        NumberOfItems = 20,
        RefreshInterval = TimeSpan.FromMilliseconds(500)
    };
    var data = RxStream<TestDataEvent>.Create(cepApplication, typeof(TestDataInputFactory), config, EventShape.Point);

There's still more that we'll need to do … for example, we'll need to create overloads to import CTIs. But, for now, we're done with the core API for our input adapters and will be moving on to output adapters/sinks in the next article. You can download the current code from SkyDrive.

Using Subjects as a “Feedback Loop”

Code Sample | StreamInsight
OK, I'm going to switch gears for a minute here. Yes, I'll be back to building the app but this is just too cool not to share. It's something that I've thought about quite a bit off and on, and a forum post got it going again, so I put together a sample last night that shows how it can be done. The question … how do I take the output of a query and feed it back in to the query itself? This has a number of uses; in the case of the forum post, the developer wants to use this to update running totals and balances as orders are processed because these results impact the rest of the events. When working with sensors, this is also how you would do a "deadband" … an output isn't produced unless the new value is more than a certain percentage change from the previous. For the deadband use case, you can't just compare to the previous reading; if you do that, slow changes can accumulate but a new value is never reported because the change between any two values is less than the deadband threshold. So you need to compare the current reading value to the last reported value. I've done this with a UDO that maintains state (in a dictionary) for each of the items but it's not as elegant as I'd like. And this is cooler anyway.

So … what is a subject? A subject is an observer and an observable; it will republish events that it observes (via IObserver) to others that are observing it (via IObservable). For a little primer on subjects, check out this blog entry from Colin on the StreamInsight team; it gives you a hint of what you can do with these. But that's just the tip of the iceberg; subjects have become one of my favorite little tricks in StreamInsight because there's so much that you can do with them. As Colin mentions, observers can come and go – so you can hook up multiple sinks to a single output query (something very difficult prior to 2.1). What Colin doesn't mention is that your sources (observables) can come and go also … you can hook up multiple source queries that then get fed to one or more sinks. Subjects are created and deployed independently of their sources and sinks – you can create a subject that you are using as a source before you actually hook it up to a sink. Or vice-versa. Subjects also allow you to share query results across processes – similar to what you would do with dynamic query composition but far, far, far more powerful and flexible. Now, because a subject is both a source and a sink, you can use them to take the output results of a query and feed them back in to the source query … creating a feedback loop. Or a time warp.
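If subjects are new to you, here's a minimal, plain-Rx illustration of that observer-and-observable duality (nothing StreamInsight-specific; assumes the System.Reactive package):

    using System;
    using System.Reactive.Subjects;

    var subject = new Subject<int>();
    //Two observers subscribe to the subject ...
    subject.Subscribe(x => Console.WriteLine("Observer A: " + x));
    subject.Subscribe(x => Console.WriteLine("Observer B: " + x));
    //... and anything the subject observes is republished to both.
    subject.OnNext(42);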
Creating a subject is pretty simple; there isn't much code required. You just need to know what type you'll be using. You can bind the subject to the payload type only – in which case you get the payload and no temporal header – or to the TypedEvent (i.e. PointEvent, IntervalEvent, etc.), in which case you will get the payload and the temporal header. Let's start there.

GetOrCreateSubject

    private static object _subjectLock = new object();

    private static IRemoteSubject<TSubject, TSubject> GetOrCreateSubject<TSubject>(Application cepApplication, string subjectName)
    {
        lock (_subjectLock)
        {
            bool subjectExists = cepApplication.Subjects.Any(s => s.Key == subjectName);
            if (subjectExists)
            {
                return cepApplication.GetSubject<TSubject, TSubject>(subjectName);
            }
            else
            {
                return cepApplication.CreateSubject(subjectName, () => new Subject<TSubject>());
            }
        }
    }

As the name says, this method will get or create a subject, depending on whether it already exists. If you are wondering about the _subjectLock, that's there to ensure that only one thread is in the middle of this operation at any time. If you are multi-threading calls to this, it is entirely possible to get into a situation where you are calling the same method with the same arguments on different threads and wind up with exceptions as multiple threads try to create the same subject at the same time. Now, let's get data running through our test app. I'm using the same app from my recent posts so this will look familiar. We'll also get a reference to the subject.

Getting Data

    var config = new TestDataInputConfig()
    {
        NumberOfItems = 20,
        RefreshInterval = TimeSpan.FromMilliseconds(500)
    };
    var data = cepApplication.DefineObservable(
        () => new TestDataProducer(config)).ToPointStreamable(e => e.GetPointEvent());
    var lastReportedSubject = GetOrCreateSubject<PointEvent<TestDataEvent>>(cepApplication, "LastKnown");

We aren't sending data to the subject just yet … that will come from the results later … but we do need to get a stream (IQStreamable) from it. But let's stop for a moment and discuss some of the potential gotchas. First, remember that a stream in StreamInsight is a temporal stream. All of the events exist in time and the stream moves forward based on CTIs. We also know that we'll be joining this feedback stream with the data stream and, when you do that, StreamInsight will sync to the slowest stream … the joined stream will move forward only when CTIs from both source streams move past the same timestamp. If our feedback loop simply publishes the CTIs that are generated from our result stream, it will never move forward. Why not? Because it'll be waiting for CTIs to move past a timestamp from the result stream but the result stream can't move forward because there is no CTI coming from our feedback stream. Did that make sense? It makes my brain hurt thinking about it too much. Anyway, what we need to do is to directly import the CTIs from the data stream. But that gives us another challenge. If we do that, we now have to worry about CTI violations from the data (insert) events being published from the results. You see, the events produced will have start times that are before the last-issued CTI; they must be or they wouldn't be in the output stream. So we need to account for this when we enqueue the new events by shifting the start time forward so that it is after the last-issued CTI. So let's get started. To handle the CTIs, we'll first filter the subject observable source so that it only produces Insert events. Then we will Merge these results with the CTIs from the data stream. The result will be an observable that we can then use as the source for the feedback stream.
A note, however, on using the Merge technique to import CTIs … it will give you no protection at all from CTI violations. If you try to create/enqueue an event that violates the CTI, you will get an exception that you can't really catch and it will cause your query to abort.

Feedback Source

    var lastReportedObservableSource = lastReportedSubject
        //Get only the inserts from the subject, dropping the CTIs.
        .Where(e => e.EventKind == EventKind.Insert)
        //Merge with the CTIs from the data stream.
        .Merge(data.ToPointObservable().Where(e => e.EventKind == EventKind.Cti));

Now that we have our observable, we need to make it a stream. While it is still an observable (IQbservable), we don't have to follow the temporal/CTI rules because it's not a temporal stream yet. However, when it's a stream (IQStreamable), it is a temporal stream and we do have to follow the temporal/CTI rules … so we have to shift the timestamps of our point events so that they don't violate the CTI. With the data source that we are using, we know that the CTIs are 1 tick past the "batch" of events, so shifting the timestamp 1 tick will ensure that we're good. In the real world, you may need to do something a bit more sophisticated … see my previous blog article on importing CTIs in 2.1 for a couple of tips; I will be revisiting this in my app-building series.

Feedback Stream

    var lastReportedSourceStream = lastReportedObservableSource
        .ToPointStreamable(e => e.EventKind == EventKind.Cti ? e :
            PointEvent<TestDataEvent>.CreateInsert(e.StartTime.AddTicks(1), e.Payload));

The hard part is done. Now that we have the streams, the rest is a series of StreamInsight queries. First, we want to make sure that we always have the last-known value available in the last reported value stream so we'll use the (very) common ToSignal pattern (sketched below). Next, we need to make sure that the initial values reported always go through. If we just join the streams, we'll never get output because the initial events won't be in the feedback stream. So we'll do a LeftAntiJoin so that the events in the data stream with no matches in the feedback stream go through. (Depending on your scenario, it may be perfectly appropriate to "seed" the feedback stream with initial events … in the case of the forum post, these could be current balances when the application starts. If that's the case, you can probably skip this step.) Then we calculate the percentage change from the last reported value for the current value; in this step, I'm including both the current and the last reported value in the stream (we don't really need to) so that it's easier to check the results using the event flow debugger and "see" everything that's happening. From the calculation, we then select only those items that have changed more than a specified amount. Since we're using random, very variable data in this sample, I've put the "deadband" at 100% but real-world will be somewhat less; anywhere from 1% – 10%, depending on the source. Union the two queries together and we're done! (with the queries, at least).
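ToSignal isn't a built-in operator; it's an extension-method pattern that shows up throughout the StreamInsight samples. A minimal sketch of the formulation assumed here: stretch every event to infinity, then clip it when the next matching event arrives.

    using System;
    using System.Linq.Expressions;
    using Microsoft.ComplexEventProcessing.Linq;

    public static class StreamableExtensions
    {
        //Turns a stream of discrete readings into a continuous "signal":
        //each event lasts until the next event that satisfies matchCondition.
        public static IQStreamable<T> ToSignal<T>(this IQStreamable<T> source,
            Expression<Func<T, T, bool>> matchCondition)
        {
            return source
                .AlterEventDuration(e => TimeSpan.MaxValue)
                .ClipEventDuration(source, matchCondition);
        }
    }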
The Queries

    //Make sure that we have the last reported value always available.
    var lastReportedStream = lastReportedSourceStream.ToSignal((e1, e2) => e1.ItemId == e2.ItemId);

    //Make sure that our initial values always get reported in the output using LeftAntiJoin.
    var firstValues =
        from i in data.LeftAntiJoin(lastReportedStream, (e1, e2) => e1.ItemId == e2.ItemId)
        select i;

    //Calculate the change from the previous.
    var calcStream = from d in data
                     join lr in lastReportedStream
                         on d.ItemId equals lr.ItemId
                     select new
                     {
                         Value = d,
                         LastReported = lr,
                         PctChange = Math.Abs(1 - (d.Value / lr.Value))
                     };

    //Select only those that have changed more than 100%.
    var changed = from c in calcStream
                  where c.PctChange > 1.0
                  select c.Value;

    //Union first values with changed values.
    var final = firstValues.Union(changed);

All that's left is to bind the output to the subject, creating the feedback loop, as well as a console writer sink so that we see results. Then we run the process.

Bind and Run

    var sink = cepApplication.DefineObserver(() => Observer.Create<TestDataEvent>(
        e => Console.WriteLine("TestEvent ItemId:{0} RunNumber:{1} Value{2}", e.ItemId, e.RunNumber, e.Value)));

    //Bind to the subject and the console sink so that we see results, then run the process.
    final.Bind(lastReportedSubject).With(final.Bind(sink)).Run();

I do have to note that when I run this, StreamInsight hangs on shutdown – not a good sign. I'm not exactly sure why this is happening but if I figure it out, I will update this post. Or … if you figure it out, you can send me a note using the "Contact" link on this blog. You can download the code from my SkyDrive (of course!).

Dual Mode Data Sources–Part II

Code Sample | StreamInsight
In my previous post, I walked through creating a basic structure for StreamInsight data sources using the Reactive model introduced with version 2.1. There was quite a bit of plumbing work to get done but now that it's finished, we're ready to move on to the next step … taking what we just created and making it available to the pre-2.1 API. I didn't mention this in the previous post but these articles are the first in what's going to be a series of articles that walk through building a StreamInsight application. In these articles, we're going through the process to create our data sources – or, more accurately, the architectural framework for our data sources. We'll do the same with sinks/output adapters and then we'll start pulling in some real(istic) data and doing something more interesting. Each article will build on the last so you can follow along. Now, back to our input. Since we already have the data flowing via the IObservable interface, it's really easy to subscribe to this interface and implement an input adapter that uses the exact same code to generate events. All we need to do is subscribe to our producer and enqueue the event when OnNext is called:

On Next

    public void OnNext(StreamInputEvent<TPayloadType> publishedEvent)
    {
        if (AdapterState == AdapterState.Running)
        {
            var newEvent = publishedEvent.GetPointEvent();
            this.Enqueue(ref newEvent);
        }
    }

It really is that simple. Well, almost … but before we get into that, let's step back for a second. We have an event producer that creates our events. This producer handles all of the details of connecting to the data source and packaging the data for StreamInsight. Traditionally, we did all of this in the adapter itself but since we've already separated it, the adapter really doesn't need to do much except enqueue. Looking at it that way, why do we need to have separate adapters for each type that we have? Well, the truth is: we don't. With a little planning, we can have a single set of adapters that subscribe to a producer's IObservable interface and handle enqueuing the events into StreamInsight as well as managing lifetime. These will be our adapters and they'll wind up being very thin. So we'll start by creating a new point typed input adapter. Like the base class, the payload type is a generic argument for the class – there's no reason why we can't use this for any type. Because Start() and Resume() are abstract methods on the base class, we have to implement them. We'll also want to override Stop() and Dispose(bool disposing).

Adapter Skeleton

    public class ObservableTypedPointInputAdapter<TPayloadType>
        : TypedPointInputAdapter<TPayloadType>
    {
        public override void Start()
        {
            throw new NotImplementedException();
        }

        public override void Resume()
        {
            throw new NotImplementedException();
        }

        public override void Stop()
        {
            base.Stop();
        }

        protected override void Dispose(bool disposing)
        {
            base.Dispose(disposing);
        }
    }

So let's get started. First, we need to get our producer. That's simple enough; we know that we need to have an adapter factory so that's where we'll create it and pass it in as an argument of the constructor. We pop this into a private class field – we won't be calling this until we subscribe. Speaking of subscribe, this is exactly what we need to do in the Start() method.
For the producer's OnCompleted action, we'll call the adapter's Stop() method. Yes, you can call Stop() in an adapter and not stop the queries that are getting events from the stream – this is something that may happen with a read-once reference data adapter. It's not commonly seen in the wild but we'll make sure that we handle it correctly. One thing that is important to understand here … if you are having StreamInsight generate your CTIs for you with AdvanceTimeSettings, one of your options is to AdvanceToInfinityOnShutdown. This is directly impacted by the adapter calling Stopped() (or the producer, in fact, calling Complete()). When the source shuts down, StreamInsight will enqueue a CTI with a timestamp of DateTimeOffset.MaxValue, which serves to "clear" the stream of any pending events. It also happens to work very nicely if you happen to have a read-once data source of reference data.

Constructor & Start

    public ObservableTypedPointInputAdapter(IObservable<StreamInputEvent<TPayloadType>> eventProducer)
    {
        this._eventProducer = eventProducer;
    }

    public override void Start()
    {
        _subscription = _eventProducer.Subscribe(OnNext,
            () =>
            {
                if (AdapterState == AdapterState.Running)
                {
                    Stop();
                }
            });
    }

Start vs. Resume

In a lot of the samples, you'll see these implemented the exact same way. However, in the real world, you may want to put a little more thought into how you handle Resume(). Odds are pretty good, especially if you are using Premium Edition, that you will never need to actually implement Resume(). It gets called after your adapter is put into a suspended state because the input queue is full. Yes, it is possible to fill up the input queue – you need about 200,000 unprocessed events backed up that StreamInsight hasn't been able to push through your queries. I have made this happen – but it took some effort. I actually had to continue enqueueing events just as fast as I could on multiple threads. Even with some convoluted queries, it still took some time to fill up the input queue. Considering that I've seen StreamInsight process over 30,000 events/sec on a dual-core i7 laptop and over 100,000 events/sec on a low-to-mid level server with CPU cycles to spare, you will have to work at it. But it can happen. Now, if you are using Standard Edition, you won't be able to get to that level of throughput (and I haven't done any testing on Standard's throughput) but you can still expect to push a goodly number of events through Standard Edition. Now, when the input queue fills up, StreamInsight will return Full from the enqueue. Your adapter will need to call Ready() and then StreamInsight will call Resume() when you can start enqueuing events again. If you are using the StreamInsight 2.1 Reactive model, you won't get any notification if the input queue fills. Instead, StreamInsight will simply ignore your events until it's ready to receive events again. This is what I'm going to do in our exercises but it may not be appropriate, depending on your source and your needs. For example, if you are simply pushing recorded data through the engine, it would be easy enough to pause and then start back up again when Resume() is called. If, however, it's live data, you may need to do something else. You will want to exercise some caution in buffering up events that you miss – you have no way of knowing when the adapter will start back up again and you could eat up FAR more memory than you want to.
It is a potentially difficult issue but, fortunately, one that is pretty tough to hit in the real world. Especially if you are using Premium Edition, you'll saturate network bandwidth (for inbound data) before you fill up the queue. In our exercise, we're going to do the same thing that StreamInsight 2.1 does … we're just going to ignore incoming events until Resume() is called. Back to the code … in the snippet above, I enqueued directly from the OnNext function. While this works – if everything goes right – it doesn't really represent what we'll need to do in a real app to handle things like exceptions, queue full and the like. Also, while this class is designed to be a stand-alone input adapter that can be used with any observable source, there may be special-case adapters – for example, specific handling of a pause/resume scenario – where we want to override specific behaviors with an inherited class, so we want to make sure that we provide the right points to override. So we'll take the enqueue operation and create our own method. In here, we do all of our state checks to make sure that we can enqueue, handle enqueue exceptions and queue full, as well as keep track of our last-enqueued CTI. Finally, we will also include a critical section lock around the actual enqueue to make sure that we handle lifetime correctly and don't report Stopped() when we're in the middle of an enqueue. It is the same method described in my previous article on output adapter lifetime but applied to input adapters.

Enqueue Event

    protected virtual void EnqueueEvent(StreamInputEvent<TPayloadType> publishedEvent)
    {
        if (!_canEnqueue) return;
        if (publishedEvent.Start < _lastCti)
        {
            return;
        }
        lock (_lockObject)
        {
            if (this.AdapterState != AdapterState.Running)
            {
                return;
            }
            var point = publishedEvent.GetPointEvent();
            try
            {
                var enqueueResult = this.Enqueue(ref point);
                if (enqueueResult == EnqueueOperationResult.Success && publishedEvent.EventKind == EventKind.Cti)
                {
                    _lastCti = point.StartTime;
                }
                if (enqueueResult == EnqueueOperationResult.Full)
                {
                    //Queue full!! Pause enqueuing.
                    _canEnqueue = false;
                    ReleaseEvent(ref point);
                    //Let StreamInsight know we're ready to resume.
                    Ready();
                }
            }
            catch
            {
                ReleaseEvent(ref point);
                throw;
            }
        }
    }

Our last step is to create our adapter factory. In our Create method, we'll do a couple of checks to make sure that our arguments are valid – the payload is the proper type and, since our random data source only supports points, that we are using the correct shape.

Input Adapter Factory

    public sealed class TestDataInputFactory : ITypedInputAdapterFactory<TestDataInputConfig>
    {
        [SuppressMessage("Microsoft.Design", "CA1004:GenericMethodsShouldProvideTypeParameter", Justification = "By Design")]
        public InputAdapterBase Create<TPayload>(TestDataInputConfig configInfo, EventShape eventShape)
        {
            if (typeof(TPayload) != typeof(TestDataEvent))
            {
                //This won't work; throw an exception.
                throw new InvalidOperationException("Specified type must be of " + typeof(TestDataEvent).FullName);
            }
            switch (eventShape)
            {
                case EventShape.Point:
                    //Create the publisher.
                    return new ObservableTypedPointInputAdapter<TestDataEvent>(
                        new TestDataProducer(configInfo));
                default:
                    throw new ArgumentException(string.Format(
                        System.Globalization.CultureInfo.InvariantCulture,
                        "TestDataInputFactory cannot instantiate adapter with event shape {0}",
                        eventShape.ToString()));
            }
        }

        /// <summary>
        /// Dispose method.
        /// </summary>
        public void Dispose()
        {
        }
    }

Once we have the adapter factory, we're ready to go. Because we've not done any output adapters yet, we'll be using the ToPointObservable() method to write to the console. Keep in mind that this works only when hosting StreamInsight in-process; it's not at all like the IObservable support in 2.1. But it works well enough to show that we have some data flowing through and that our adapter now supports both models from the same data producer!

Run Query

    private static void RunQuery(Application cepApplication)
    {
        var config = new TestDataInputConfig()
        {
            NumberOfItems = 20,
            RefreshInterval = TimeSpan.FromMilliseconds(500)
        };

        var data = CepStream<TestDataEvent>.Create(cepApplication,
            "TestData", typeof(TestDataInputFactory), config, EventShape.Point);

        var observable = data.ToPointObservable().Subscribe(
            e =>
            {
                if (e.EventKind == EventKind.Insert)
                {
                    Console.WriteLine("TestEvent ItemId:{0} Run:{1} Value{2}",
                        e.Payload.ItemId, e.Payload.RunNumber, e.Payload.Value);
                }
                else
                {
                    Console.WriteLine("CTI @ {0}", e.StartTime);
                }
            });
    }

Using the same pattern, we can also create the adapters for interval and edge events pretty quickly. Unfortunately, a lot of it is copy-paste-tweak. I say unfortunately because it leads to a lot of redundant and repetitive code that I, for one, would prefer to create and maintain in a single place. However, our key method – enqueue – isn't defined in any common base class; we have to inherit from the shape-specific adapters and there's no way around it. While we could probably deal with this with another layer of abstraction, there isn't, IMHO, a whole lot of value to that. Should that change, we'll take a look at refactoring a bit then. Looking at the input adapter factory, you'll notice that it does some checking to make sure things are valid before creation; factories are good for that. There's a bunch of other stuff that could be done in the factory as well, and having a standard, generic factory interface is a good way to provide yourself with a clean, clear and decoupled way to create what may be complex underlying objects. And there's no reason why we can't revisit our existing 2.1-based code to add the factory in there. It also provides a level of consistency between the two different APIs that can make our life simpler, reduce the amount of debugging that we need to do and promote code sharing. That, however, will have to wait for the next post – I am trying to do this in manageable pieces so I'm not writing a novel with each post.
In the meantime, you can download the current project from my SkyDrive.

Dual Mode Data Sources–Part I

Code Sample | StreamInsight
When StreamInsight 2.1 came out, the Big New Thing for the release was the new model for data ingress and egress – sources and sinks. Based on standard interfaces like IObservable and IObserver and the Reactive Extensions, it makes getting data into and out of StreamInsight a whole lot easier. The previous model works (or worked) well enough but, honestly, it was hard … and harder than it really needed to be. Getting the adapter lifetime just right was not a trivial undertaking and getting it wrong meant either hanging on shutdown or an ObjectDisposedException. It took me … and my team … some time to get all of this nailed down to a science. The new model handles all of this for you … you just subscribe and publish. The rest is invisible. Very nice. But, as when any new API replaces an old API, there is a time of transition from one to the other. There's a time when developers or solution providers – like me – need to be able to support both APIs. For us, we actually still (sadly) need to support StreamInsight 1.2 and 2.0 in some cases so we can't go over to the new model. But we also don't want to rewrite all of our adapters or maintain two codebases of adapters. Instead, we want to support both from a single code base. Fortunately, this is actually pretty easy to accomplish with a little planning and good architectural practices. I'll walk you through this – in its simplest form – and this code will also be the base for a series of additional articles that I've been working on and have planned. In fact, I plan to walk through creating a StreamInsight application step by step, discussing the technical details as we go so, by the end, you'll have walked with me through building a simple StreamInsight application. Oh … and let me make one thing clear though. I've heard it said (from Microsoft field technical sales-type folks) that "you don't have to write adapters anymore." That's not entirely accurate. You don't have to use the "legacy" adapter API anymore. But you will have to write code to get data in to and out of StreamInsight. You may use a different API now (IObserver/IObservable) but the necessity of writing code for this task is still there. Call it what you want 'cuz now we're just arguing semantics.

Sources/Input Adapters

Source is the new name for input adapter. With StreamInsight 2.1, you'll want to implement IObservable. I won't go over the basics; you'll find them in the documentation. But there are some things that you need to consider that aren't in the documentation. First … CTIs (and I'll be talking more about these in a future post or two). While you can use AdvanceTimeSettings to have StreamInsight generate your CTIs for you, this isn't always appropriate to do. So we need to have a way to have CTIs created and enqueued by the adapters … and it needs to be optional. So we'll start with a wrapper class that allows a data source to create both events (with payloads) and CTIs. This will be our StreamInputEvent class. It will have all of the appropriate temporal properties and event shape metadata as well as the payload.
StreamInputEvent
public class StreamInputEvent<TPayloadType>
{
    public TPayloadType Payload { get; set; }
    public DateTimeOffset Start { get; set; }
    public DateTimeOffset End { get; set; }
    public EdgeType EdgeType { get; set; }
    public EventKind EventKind { get; set; }
}

We’ll also add some helper methods to create the different event shapes as well as a constructor or two:

Helpers & Constructors
public PointEvent<TPayloadType> GetPointEvent()
{
    if (this.EventKind == EventKind.Insert)
    {
        return PointEvent<TPayloadType>.CreateInsert(this.Start, Payload);
    }
    return PointEvent<TPayloadType>.CreateCti(Start);
}

//More helpers for Edge and Interval

public StreamInputEvent(DateTimeOffset ctiDateTime)
{
    EventKind = EventKind.Cti;
    Start = ctiDateTime;
    EdgeType = EdgeType.Start;
    End = DateTimeOffset.MaxValue;
    Payload = default(TPayloadType);
}

public StreamInputEvent(TPayloadType payload)
{
    Start = DateTimeOffset.MinValue;
    End = DateTimeOffset.MaxValue;
    EdgeType = default(EdgeType);
    Payload = payload;
    EventKind = EventKind.Insert;
}

public StreamInputEvent(TPayloadType payload, DateTimeOffset startTime, DateTimeOffset endTime, EdgeType edgeType)
{
    Start = startTime;
    End = endTime;
    EdgeType = edgeType;
    Payload = payload;
    EventKind = EventKind.Insert;
}

Now, you’ll note that there isn’t an EventShape specified. The shape can be determined from the event times and we’ve found it useful to have data sources that can adapt to whatever shape we deem fit for a particular use case. But we’ll still have a factory (regardless of how we do it) that allows us to limit a particular data source to specific shapes. And the helpers make it really, really easy when we create the IQStreamable for the temporal processing. Now that we have that done, let’s take a step back and look at this. We have a data source of some type and we need to get the data into StreamInsight. The wonderful thing about StreamInsight 2.1’s focus on IObservable is that it lets us really step back for a second and look at how we produce data. In fact, it allows us to “hide” our producers behind a standard interface … and that’s what we need to do. At the lowest level, the code closest to the data source, we’ll create a “data producer” that takes the data and publishes it via IObservable – so there’s our ‘native’ 2.1 interface. To support the legacy adapter model, we will just need to subscribe to this with the concrete Input Adapter class. So let’s get started by creating a base EventProducer class to handle subscribers and disposal. We’ll also need to make sure that we can specify the configuration for the event producer – just like we were forced to do with the legacy adapter model. The reality is simple … unless you are doing a really, incredibly simple demo, you need to have a way to specify configuration. Having a configuration class that is passed to the event producer from the application just makes sense.
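A quick aside: I elided the edge and interval helpers above. They follow exactly the same pattern as GetPointEvent(); here’s my reconstruction of them – check the download for the real thing:

public IntervalEvent<TPayloadType> GetIntervalEvent()
{
    if (this.EventKind == EventKind.Insert)
    {
        return IntervalEvent<TPayloadType>.CreateInsert(this.Start, this.End, Payload);
    }
    return IntervalEvent<TPayloadType>.CreateCti(Start);
}

public EdgeEvent<TPayloadType> GetEdgeEvent()
{
    if (this.EventKind == EventKind.Cti)
    {
        return EdgeEvent<TPayloadType>.CreateCti(this.Start);
    }
    if (this.EdgeType == EdgeType.Start)
    {
        return EdgeEvent<TPayloadType>.CreateStart(this.Start, this.Payload);
    }
    return EdgeEvent<TPayloadType>.CreateEnd(this.Start, this.End, this.Payload);
}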
StreamEventProducer
public abstract class StreamEventProducer<TPayloadType, TConfigType>
    : IObservable<StreamInputEvent<TPayloadType>>, IDisposable
{
    public TConfigType Configuration { get; protected set; }

    protected StreamEventProducer(TConfigType configuration)
    {
        this.Configuration = configuration;
    }

    protected abstract void Start();

    private IObserver<StreamInputEvent<TPayloadType>> _observer;

    public virtual IDisposable Subscribe(IObserver<StreamInputEvent<TPayloadType>> observer)
    {
        this._observer = observer;
        Start();
        return this;
    }

    protected void PublishException(Exception ex)
    {
        _observer.OnError(ex);
    }

    protected void PublishEvent(StreamInputEvent<TPayloadType> newEvent)
    {
        _observer.OnNext(newEvent);
    }

    protected virtual void Dispose(bool disposing)
    {
        if (disposing)
        {
            _observer.OnCompleted();
        }
    }
}

If you look at this, you’ll see that it’s pretty simple. You can certainly get more sophisticated with this – say, by supporting multiple observers to share event producers between multiple sources and/or adapters – but I want to keep it simple and clear. Our concrete event producer will inherit from this class, implement Start() (called when a subscriber actually subscribes) and call PublishException and PublishEvent as appropriate. Our event producer will also want to override Dispose(bool disposing) to get notified when it’s time to shut down. So let’s go ahead now and create our first event producer. For our sample here, it will be a very simple random data generator because, of course, everyone needs to analyze completely random data in StreamInsight. Well, no, of course not, but it’s easy and simple. We’ll start with the configuration class.

TestDataInputConfig
[DataContract()]
public class TestDataInputConfig
{
    public TestDataInputConfig()
    {
        RefreshInterval = TimeSpan.FromMilliseconds(500);
        NumberOfItems = 10;
        TimestampIncrement = TimeSpan.FromMinutes(5);
        StartDateTime = DateTimeOffset.Now.AddMonths(-5);
    }

    [DataMember]
    public DateTimeOffset StartDateTime { get; set; }

    [DataMember]
    public TimeSpan RefreshInterval { get; set; }

    [DataMember]
    public TimeSpan TimestampIncrement { get; set; }

    [DataMember]
    public int NumberOfItems { get; set; }
}

Note that it is marked as DataContract and each property is marked with the DataMember attribute … this is very important. The configuration classes may – and probably will – end up being serialized. This was also true in the adapter model; all of your configuration classes need to be able to be serialized. Now, you can also use [Serializable] but I’ve found that, at times, the C# compiler goes somewhat insane when you do that and you get exceptions when serializing the configuration classes. Now for our payload class. We have an ItemId and a Value of type double and, since this class is for testing and we want to be able to “see” what’s going on (and for other reasons), we also have some properties on here to help with that – RunNumber (which generation “run” created it) and EnqueueTimestamp (the time it was actually created, which may be different from the event’s start time). Finally, we also have a static helper method that’ll create a bunch of these test events for enqueuing for the “run”.
TestDataEvent
public class TestDataEvent
{
    private static Random rdm = new Random();

    public static List<TestDataEvent> CreateNext(TestDataInputConfig config, int runNumber)
    {
        List<TestDataEvent> newReferenceData =
            new List<TestDataEvent>(config.NumberOfItems);

        for (int i = 0; i < config.NumberOfItems; i++)
        {
            newReferenceData.Add(new TestDataEvent()
                {
                    ItemId = "Item" + i.ToString(),
                    RunNumber = runNumber,
                    EnqueueTimestamp = DateTime.Now,
                    Value = rdm.NextDouble() * 10
                });
        }
        return newReferenceData;
    }

    public string ItemId { get; set; }
    public int RunNumber { get; set; }
    public DateTime EnqueueTimestamp { get; set; }
    public double Value { get; set; }
}

Now that we have all of the foundational building blocks completed, we are finally ready to create our producer. For now, we’ll have our producer enqueue the CTIs after each “batch” of events; this keeps all of the events in the same “run” within the same CTI span.

TestDataEventProducer
public class TestDataProducer : StreamEventProducer<TestDataEvent, TestDataInputConfig>
{
    private DateTimeOffset _nextStartTime;
    private Timer _enqueueTimer;
    private int _runNumber = 0;

    public TestDataProducer(TestDataInputConfig config) : base(config)
    {
        _enqueueTimer = new Timer(ProduceEvents, null, Timeout.Infinite, Timeout.Infinite);
        this._nextStartTime = config.StartDateTime;
    }

    protected override void Start()
    {
        // Change the timer to start it.
        _enqueueTimer.Change(TimeSpan.Zero, Configuration.RefreshInterval);
    }

    /// <summary>
    /// Main driver to read events from source and enqueue them.
    /// </summary>
    private void ProduceEvents(object state)
    {
        _runNumber++;

        var newEvents =
            TestDataEvent.CreateNext(Configuration, _runNumber);

        var eventTimestamp = _nextStartTime;

        var publishEvents = (from e in newEvents
                             select new StreamInputEvent<TestDataEvent>(e)
                                        {
                                            Start = eventTimestamp
                                        }).ToList();

        foreach (var publishedEvent in publishEvents)
        {
            this.PublishEvent(publishedEvent);
        }

        //Enqueue our CTI
        this.PublishEvent(new StreamInputEvent<TestDataEvent>(eventTimestamp.AddTicks(1)));
        _nextStartTime += Configuration.TimestampIncrement;
    }

    protected override void Dispose(bool disposing)
    {
        if (disposing)
        {
            if (_enqueueTimer != null)
            {
                _enqueueTimer.Dispose();
                _enqueueTimer = null;
            }
        }
        base.Dispose(disposing);
    }
}

At this point, we are actually ready to start using the StreamInsight 2.1 APIs to get data from our new producer! Of course, we aren’t done but let’s see how we could do this. It’s here that the GetPointEvent() helper method is really, really handy.
It makes the code very readable and it happens to be highly reusable as well, both of which are Very Good Things™.

Running the Process
...
var config = new TestDataInputConfig()
{
    NumberOfItems = 20,
    RefreshInterval = TimeSpan.FromMilliseconds(500)
};

var data = cepApplication.DefineObservable(
    () => new TestDataProducer(config)).ToPointStreamable(e => e.GetPointEvent());

var sink = cepApplication.DefineObserver(() => Observer.Create<TestDataEvent>(
    e => Console.WriteLine("TestEvent ItemId:{0} Run:{1} Value:{2}", e.ItemId, e.RunNumber, e.Value)));

data.Bind(sink).Run();

And when we do that, we get our data flowing. You can download the code from my SkyDrive.
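One note on the legacy side before moving on. The ObservableTypedPointInputAdapter that the factory creates is really just an IObserver bridge over this same producer. Roughly – and this is a sketch with the shutdown and overflow handling pared way down, not the project code:

// Sketch of the legacy-model bridge: a typed point input adapter that simply
// subscribes to the StreamEventProducer and enqueues whatever it publishes.
public class ObservableTypedPointInputAdapter<TPayload, TConfig>
    : TypedPointInputAdapter<TPayload>, IObserver<StreamInputEvent<TPayload>>
{
    private readonly StreamEventProducer<TPayload, TConfig> _producer;
    private IDisposable _subscription;

    public ObservableTypedPointInputAdapter(StreamEventProducer<TPayload, TConfig> producer)
    {
        _producer = producer;
    }

    public override void Start()
    {
        // Subscribing starts the producer; from here on, OnNext does the work.
        _subscription = _producer.Subscribe(this);
    }

    public override void Resume()
    {
        // Nothing buffered on our side in this sketch; the producer keeps pushing.
    }

    public void OnNext(StreamInputEvent<TPayload> publishedEvent)
    {
        if (AdapterState != AdapterState.Running) return;
        var pointEvent = publishedEvent.GetPointEvent();
        if (Enqueue(ref pointEvent) == EnqueueOperationResult.Full)
        {
            // A real adapter would buffer the event and wait for Resume();
            // dropping those details here to keep the sketch short.
            ReleaseEvent(ref pointEvent);
            Ready();
        }
    }

    public void OnError(Exception error) { _subscription.Dispose(); Stopped(); }
    public void OnCompleted() { _subscription.Dispose(); Stopped(); }
}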

Importing CTIs with IQStreamable/IQbservable in StreamInsight 2.1

Code Sample | StreamInsight
This came up on the StreamInsight forum … actually from one of the guys on my team. This is pretty important … since StreamInsight syncs to the slowest stream, if you have a slow-moving reference stream (as described so well here), you need to import the CTIs from your faster data stream into the slower stream to keep things moving along at the proper pace. We use this pattern all the time and ran into this when we were converting an existing demo to use the 2.1 model. Here’s what you need to do. First, you have your reference stream implemented and exposed as a simple Observable … you can use the .NET 4.0 IObservable interface for this; no special thing is needed. Your data stream should be exposed as an IQbservable or IObservable (you can get to IQStreamable from there) and the IObservable should include the CTIs. This means that your data source for the data stream needs to enqueue CTIs … you won’t really be able to use AdvanceTimeImportSettings.IncreasingStartTime/StrictlyIncreasingStartTime (since the CTIs aren’t exposed), nor will you be able to return an IQStreamable and then switch it to an IQbservable (you’ll only get the payload). The first thing that we’ll need to do is to create a wrapper class that has both the payload and the temporal information and can also represent a CTI. Here’s an example:

Typed Event Class
    public TypedEvent<TPayloadType> GetEvent(EventShape eventShape)
    {
        switch (eventShape)
        {
            case EventShape.Interval:
                return GetIntervalEvent();

            case EventShape.Edge:
                return GetEdgeEvent();

            case EventShape.Point:
                return GetPointEvent();

            default:
                throw new ArgumentOutOfRangeException("eventShape");
        }
    }

    public PointEvent<TPayloadType> GetPointEvent()
    {
        if (!this.IsCti)
        {
            return PointEvent<TPayloadType>.CreateInsert(this.Start, Payload);
        }
        return PointEvent<TPayloadType>.CreateCti(Start);
    }

    public IntervalEvent<TPayloadType> GetIntervalEvent()
    {
        if (!this.IsCti)
        {
            return IntervalEvent<TPayloadType>.CreateInsert(this.Start, this.End, Payload);
        }
        return IntervalEvent<TPayloadType>.CreateCti(Start);
    }

    public EdgeEvent<TPayloadType> GetEdgeEvent()
    {
        if (this.IsCti)
        {
            return EdgeEvent<TPayloadType>.CreateCti(this.Start);
        }

        if (this.EdgeType == EdgeType.Start)
        {
            return EdgeEvent.CreateStart(this.Start, this.Payload);
        }
        return EdgeEvent.CreateEnd(this.Start, this.End, this.Payload);
    }

    public PublishedEvent(DateTimeOffset ctiDateTime)
    {
        IsCti = true;
        Start = ctiDateTime;
        EdgeType = EdgeType.Start;
        End = DateTimeOffset.MaxValue;
        Payload = default(TPayloadType);
    }

    /// <summary>
    /// Initializes a new instance of the <see cref="PublishedEvent&lt;TEventType&gt;"/> struct.
    /// </summary>
    /// <param name="payload">The Payload.</param>
    /// <remarks>
    /// This sets the start to <see cref="DateTimeOffset">DateTimeOffset.MinValue</see>,
    /// the end to <see cref="DateTimeOffset">DateTimeOffset.MaxValue</see>
    /// and the edge type to the default value.<br/>
    /// The subscriber is responsible for explicitly setting these parameters.
    /// </remarks>
    public PublishedEvent(TPayloadType payload)
    {
        Start = DateTimeOffset.MinValue;
        End = DateTimeOffset.MaxValue;
        EdgeType = default(EdgeType);
        Payload = payload;
        IsCti = false;
    }

    /// <summary>
    /// Initializes a new instance of the <see cref="PublishedEvent&lt;TEventType&gt;"/> struct.
    /// </summary>
    /// <param name="payload">The event payload.</param>
    /// <param name="startTime">The start time.</param>
    /// <param name="endTime">The end time.</param>
    /// <param name="edgeType">Type of the edge.</param>
    /// <remarks>
    /// Use when the event publisher "knows" the appropriate start time, end time and/or the edge type.<br/>
    /// This is useful when publishing payloads that carry this information.
    /// </remarks>
    public PublishedEvent(TPayloadType payload, DateTimeOffset startTime, DateTimeOffset endTime, EdgeType edgeType)
    {
        Start = startTime;
        End = endTime;
        EdgeType = edgeType;
        Payload = payload;
        IsCti = false;
    }

    /// <summary>
    /// Gets or sets the Payload.
    /// </summary>
    /// <value>
    /// The Payload.
    /// </value>
    public TPayloadType Payload;
    public DateTimeOffset Start;
    public DateTimeOffset End;
    public EdgeType EdgeType;
    public bool IsCti;
}

There are “helper” methods on the class to create the appropriate events from the wrapper for convenience. It makes the ToStreamable method a little simpler and clearer. Now that we have that, we need to define an event publisher. This exposes IObservable and provides the events to StreamInsight. It’s pretty simple and, like an adapter, has a configuration that is passed in. In our case, the configuration has the number of items to create and a refresh interval … how frequently new events are created. This also allows us to use one publisher for both the fast and slow moving events.

Configuration class
[DataContract()]
public class TestDataInputConfig
{
    public TestDataInputConfig()
    {
        RefreshInterval = TimeSpan.FromMilliseconds(500);
        NumberOfItems = 10;
    }

    [DataMember]
    public string Name { get; set; }

    [DataMember]
    public TimeSpan RefreshInterval { get; set; }

    [DataMember]
    public int NumberOfItems { get; set; }
}

In the sample, this is a test data generator … it creates a payload that contains an Item Id, a run number (which iteration of the input created it) and a string so that we know a) where it came from and b) some payload info. Again, very simple but it helps us validate that we are getting the results that we expect. And by implementing IObservable, we can publish this to StreamInsight using the new DefineObservable method. Since each published event has the payload and temporal header information, it’s easy to then convert the IQbservable to an IQStreamable and get it going with temporal properties.
Test Data Event
public class TestDataEvent
{
    public static List<TestDataEvent> CreateNext(TestDataInputConfig config, int runNumber)
    {
        List<TestDataEvent> newReferenceData =
            new List<TestDataEvent>(config.NumberOfItems);

        for (int i = 0; i < config.NumberOfItems; i++)
        {
            newReferenceData.Add(new TestDataEvent()
                                     {
                                         Source = config.Name,
                                         ItemId = "Item" + i.ToString(),
                                         RunNumber = runNumber,
                                         EventText = "Text for Item " + i.ToString() + " Run Number " + runNumber
                                     });
        }
        return newReferenceData;
    }

    public string Source { get; set; }
    public string ItemId { get; set; }
    public int RunNumber { get; set; }
    public string EventText { get; set; }
}

The publisher implements IObservable and creates events based on a timer. And it doesn’t start the timer until Subscribe is called … if we start producing them in the constructor, we’ll miss the first round of events and we don’t want to do that.

Data Publisher
public class TestDataPublisher :
    IObservable<PublishedEvent<TestDataEvent>>,
    IDisposable
{
    private Timer _enqueueTimer;
    private int _runNumber = 0;
    private readonly TestDataInputConfig _config;

    private readonly Dictionary<string, IObserver<PublishedEvent<TestDataEvent>>>
        _instances = new Dictionary<string, IObserver<PublishedEvent<TestDataEvent>>>(50);

    public TestDataPublisher(TestDataInputConfig config)
    {
        _config = config;
        _enqueueTimer = new Timer(ProduceEvents, null, Timeout.Infinite, Timeout.Infinite);
    }

    /// <summary>
    /// Main driver to read events from source and enqueue them.
    /// </summary>
    private void ProduceEvents(object state)
    {
        _runNumber++;
        var newEvents =
            TestDataEvent.CreateNext(_config, _runNumber);

        var publishEvents = (from e in newEvents
                             select new PublishedEvent<TestDataEvent>(e)
                                 {
                                     Start = DateTimeOffset.Now
                                 }).ToList();

        foreach (var observer in _instances.Values.AsParallel())
        {
            foreach (var publishedEvent in publishEvents)
            {
                observer.OnNext(publishedEvent);
            }
            observer.OnNext(new PublishedEvent<TestDataEvent>(DateTimeOffset.Now.AddTicks(1)));
        }
    }

    /// <summary>
    /// Notifies the provider that an observer is to receive notifications.
    /// </summary>
    /// <returns>
    /// A reference to an interface that allows observers to stop receiving notifications before the provider has finished sending them.
    /// </returns>
    /// <param name="observer">The object that is to receive notifications.</param>
    public IDisposable Subscribe(IObserver<PublishedEvent<TestDataEvent>> observer)
    {
        string instanceId = Guid.NewGuid().ToString();
        _instances.Add(instanceId, observer);
        observer.OnNext(new PublishedEvent<TestDataEvent>(DateTimeOffset.Now.AddMonths(-5)));
        if (_instances.Count == 1)
        {
            _enqueueTimer.Change(TimeSpan.FromSeconds(0), _config.RefreshInterval);
        }
        return Disposable.Create(() => Unsubscribe(instanceId));
    }

    public void Unsubscribe(string id)
    {
        _instances.Remove(id);
    }

    #region Disposable Pattern
    // (removed for brevity)
    #endregion
}

The magic, though, happens in the AdvanceTimeImporter. It’s kind of a subject, except that it observes both streams. It produces a single observable that has events from the event stream (with reference data) and the CTI stream. For the CTI stream, it is just an IObservable of DateTimeOffsets … so the CTIs don’t necessarily need to come from the moving data stream; they could be generated on a “clock”. The event stream doesn’t need to publish CTIs but, if it does, we exclude them (they’d just muddy the waters). We also have the option (as with the AdvanceTimeImportSettings) to adjust or drop events from the event stream that violate CTI rules. However, there’s one little implementation exception … when the value is Adjust, we adjust all of the event start times, not just those intervals that cross into the CTI span, ensuring that all of our data events get into the output (merged) stream.

Advance Time Importer
//NOTE: need to put some thought around how to handle OnCompleted/OnError from the observables.
public class AdvanceTimeImporter<TDataStreamEventType> : IDisposable,
    IObservable<PublishedEvent<TDataStreamEventType>>
{
    private IObservable<PublishedEvent<TDataStreamEventType>> _dataStreamObservable;
    private IObserver<PublishedEvent<TDataStreamEventType>> _observer;
    private readonly IDisposable _eventStreamDisposable, _ctiStreamDisposable;
    private readonly TimeSpan _dataStreamRefreshInterval;
    private readonly AdvanceTimePolicy _importPolicy;

    public AdvanceTimeImporter(
        IObservable<PublishedEvent<TDataStreamEventType>> dataStream,
        IObservable<DateTimeOffset> ctiStream,
        AdvanceTimePolicy importPolicy)
    {
        _dataStreamObservable = dataStream;
        _ctiStreamDisposable = ctiStream.Subscribe(NextCtiItem);
        _importPolicy = importPolicy;
        _eventStreamDisposable = _dataStreamObservable.Where(e => !e.IsCti).Subscribe(NextDataItem);
    }

    private DateTimeOffset _lastCtiTimestamp = DateTimeOffset.MinValue;

    private void NextDataItem(PublishedEvent<TDataStreamEventType> dataEvent)
    {
        if (dataEvent.IsCti)
        {
            //Don't publish the CTIs from the data stream.
            return;
        }
        //Check for CTI violations but not for edge end.
        if (dataEvent.Start >= _lastCtiTimestamp && dataEvent.EdgeType != EdgeType.End)
        {
            PublishOnNext(dataEvent);
            System.Diagnostics.Debug.WriteLine("Data ... no CTI Violation");
        }
        else if (_importPolicy == AdvanceTimePolicy.Adjust)
        {
            //Adjust ... but not for end edges.
            if (dataEvent.Start <= _lastCtiTimestamp)
            {
                dataEvent.Start = _lastCtiTimestamp.AddTicks(1);
            }
            PublishOnNext(dataEvent);
            System.Diagnostics.Debug.WriteLine("Data ... adjusted for CTI Violation");
        }
    }

    public IDisposable Subscribe(IObserver<PublishedEvent<TDataStreamEventType>> observer)
    {
        _observer = observer;
        if (_importPolicy == AdvanceTimePolicy.Adjust)
        {
            //add a starter CTI.
            NextCtiItem(_lastCtiTimestamp);
        }

        //Release our reference to the observable.
        _dataStreamObservable = null;
        return this;
    }

    private void NextCtiItem(DateTimeOffset cti)
    {
        if (cti >= _lastCtiTimestamp)
        {
            _lastCtiTimestamp = cti;
            PublishOnNext(new PublishedEvent<TDataStreamEventType>(cti));
        }
    }

    private void PublishOnNext(PublishedEvent<TDataStreamEventType> publishedEvent)
    {
        if (_observer != null)
        {
            _observer.OnNext(publishedEvent);
        }
    }

    #region IDisposable implementation
    // (removed for brevity)
    #endregion
}

That’s really it. I also created an extension method for IObservable that makes this a bit simpler and clearer to call. One thing that I do in the extension method is to use the Reactive Extensions ReplaySubject to make sure that latecomers to the Observable get the last batch of data that was published. Implementing the queries is pretty straightforward now.

Import CTI Queries
var inputConfig = new TestDataInputConfig()
    {
        Name = "DataStream",
        RefreshInterval = TimeSpan.FromMilliseconds(500)
    };
var referenceConfig = new TestDataInputConfig()
    {
        Name = "ReferenceStream",
        RefreshInterval = TimeSpan.FromSeconds(15)
    };

var dataSource = myApp.DefineObservable(() => new TestDataPublisher(inputConfig));

var ctis = dataSource.Where(e => e.IsCti).Select(e => e.Start);

var referenceStream = myApp.DefineObservable(() =>
        new TestDataPublisher(referenceConfig).ImportCTIs(ctis, referenceConfig.RefreshInterval,
        AdvanceTimePolicy.Adjust))
    .ToIntervalStreamable(e => e.GetIntervalEvent());

var dataStream = dataSource.ToPointStreamable(e => e.GetPointEvent());

var outputStream = from d in dataStream
                   from r in referenceStream
                        .AlterEventDuration(e => TimeSpan.MaxValue)
                        .ClipEventDuration(referenceStream, (r1, r2) => r1.ItemId == r2.ItemId)
                   where d.ItemId == r.ItemId
                   select new ResultClass()
                              {
                                  DataRunNumber = d.RunNumber,
                                  ItemId = d.ItemId,
                                  ReferenceString = r.EventText,
                                  DataSource = d.Source,
                                  ReferenceSource = r.Source
                              };

var outputDataSink = GetOutputDataSink(myApp);
outputStream.Bind(outputDataSink).Run("AdvanceTimeImport.Test");

In the sample, I’m using the old-school Tracer Adapter defined as a sink … that’s easy enough to change if you want to.
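And since the extension method is where the wiring happens, here’s roughly what ImportCTIs looks like. The signature is inferred from the call above and the ReplaySubject note; the download may differ in the details:

// Requires System.Reactive.Subjects for ReplaySubject.
public static class AdvanceTimeImporterExtensions
{
    public static IObservable<PublishedEvent<T>> ImportCTIs<T>(
        this IObservable<PublishedEvent<T>> dataStream,
        IObservable<DateTimeOffset> ctiStream,
        TimeSpan replayWindow,
        AdvanceTimePolicy importPolicy)
    {
        // Merge the data stream with the imported CTIs.
        var importer = new AdvanceTimeImporter<T>(dataStream, ctiStream, importPolicy);

        // Latecomers (like our slow reference stream) replay the last window's
        // worth of merged events and CTIs instead of starting from nothing.
        var replay = new ReplaySubject<PublishedEvent<T>>(replayWindow);
        importer.Subscribe(replay);
        return replay;
    }
}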

Listing installed StreamInsight instances …

StreamInsight | Code Sample
This is one of those things that I’ve always had some manner of difficulty with as I move between computers writing StreamInsight code. You see, I have … let’s see … four different computers that I write code on and I will move projects between them. (Yes, I do have an instance of TFS running at home for some personal stuff, which certainly makes things easier as I hop between the computers.) The problem, of course, boils down to remembering which instances of StreamInsight are installed and what version each instance is. Sure, I can go to Add/Remove Programs and do the maintenance setup for StreamInsight to see what instances are there … but that’s kinda painful. So, after some digging, I discovered that all of the instances are listed in the registry. So … as any developer would do, I fired up Visual Studio and threw together a little app that lists all of the installed instances of StreamInsight, what edition they are, what version and what architecture (x86 or x64). It was really quite simple, actually. Now … I’m not going to say that the UI is the prettiest thing in the world (it’s not) but it does work and it does what it needs to do. You can download the project from my SkyDrive. All you need is Visual Studio to build … there are no dependencies on StreamInsight. It’s all read from the registry.
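If you’d rather roll your own, the core of it is just a registry walk. Fair warning: the key path and value names below are what I see on my machines and may vary between StreamInsight versions, so treat them as assumptions and verify with regedit:

// A minimal sketch of the instance-listing trick.
using System;
using Microsoft.Win32;

internal static class StreamInsightInstances
{
    private const string KeyPath = @"SOFTWARE\Microsoft\Microsoft StreamInsight"; // assumption

    internal static void List()
    {
        // Use RegistryView.Registry32 as well if you want to catch x86 installs on x64.
        using (var root = RegistryKey.OpenBaseKey(RegistryHive.LocalMachine, RegistryView.Registry64)
                                     .OpenSubKey(KeyPath))
        {
            if (root == null)
            {
                Console.WriteLine("No StreamInsight instances found.");
                return;
            }
            foreach (var instanceKeyName in root.GetSubKeyNames())
            {
                using (var instanceKey = root.OpenSubKey(instanceKeyName))
                {
                    Console.WriteLine("{0}: version {1}, edition {2}",
                        instanceKeyName,
                        instanceKey.GetValue("Version", "unknown"),   // value names are assumptions
                        instanceKey.GetValue("Edition", "unknown"));
                }
            }
        }
    }
}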

Query Logic and Operator Reuse in StreamInsight (DQC)

StreamInsight | Code Sample
Way back in 2010, Mark Simms posted a blog entry called StreamInsight: Understanding dynamic query composition where he detailed how you can use dynamic query composition (DQC) to prevent multiple instantiation of your adapters. And yes, it is important … very important … for this reason, but there is more to the story here that Mark didn’t cover. Let’s start, however, with a set of requirements for our StreamInsight queries. First, we need to have a data source. For our example, we’ll be using a random number generator but that’s really irrelevant. But the data source gets it started. Next, we need to calculate a 10-second rolling average every 2 seconds. This is easy enough … that’s a hopping window. The last step is to take the results of our hopping window aggregate and calculate the difference between the averages with every hop … providing a rate-of-change for the calculated average. Again, this is a pretty common, well-known pattern that’s in the StreamInsight samples for LinqPad called “FoldPairs” (Alter/Clip/Shift). Each one of these steps needs to be a query since we need to send each to an output adapter. So let’s get started. We’ve read Mark’s article and taken it to heart, so we’ll use DQC with the first, source query to make sure that there is only one instance of the input adapter.

var initialStream = CepStream<GeneratedEvent>.Create(cepApplication,
    "generatedStream", typeof(GeneratorFactory), config, EventShape.Point);

Query initialQuery = CreateAndStartQuery<GeneratedEvent>(
    initialStream, "initialQuery", "Initial query from generator",
    EventShape.Point, csvConfig, cepApplication);

//Taking this query and converting to a stream allows us to build on it further.
//This is called Dynamic Query Composition (DQC)
var sourceStream = initialQuery.ToStream<GeneratedEvent>("sourceStream");

Our next step is to create the hopping window for our aggregate, using sourceStream as the basis.

var hoppingWindowStream = from s in sourceStream.AlterEventDuration(e => TimeSpan.FromTicks(1))
                          group s by s.DeviceId into aggregateGroup
                          from item in aggregateGroup.HoppingWindow(
                              TimeSpan.FromSeconds(10),
                              TimeSpan.FromSeconds(2),
                              HoppingWindowOutputPolicy.PointAlignToWindowEnd)
                          select new AggregateItem
                          {
                              DeviceId = aggregateGroup.Key,
                              Average = item.Avg(e => e.Value),
                              Count = item.Count(),
                              Sum = item.Sum(e => e.Value)
                          };

//Create the query and send to output adapter.
Query hoppingWindowQuery = CreateAndStartQuery<AggregateItem>(hoppingWindowStream,
    "hoppingWindowAggregate", "Hopping aggregate query from generator",
    EventShape.Point, csvConfig, cepApplication);

Now we need to calculate the rate-of-change for the hopping aggregate. Since we’ve already got the hoppingWindowStream, we can just use that.
var aggregateDifferenceSourceStream = hoppingWindowStream;

var aggregateDifference = from current in aggregateDifferenceSourceStream
                          from previous in aggregateDifferenceSourceStream
                              .AlterEventDuration(e => TimeSpan.MaxValue)
                              .ClipEventDuration(aggregateDifferenceSourceStream, (e1, e2) => e1.DeviceId == e2.DeviceId)
                              .ShiftEventTime(e => TimeSpan.FromTicks(1))
                          where current.DeviceId == previous.DeviceId
                          select new ItemDifference()
                          {
                              CurrentValue = current.Average,
                              DeviceId = current.DeviceId,
                              PreviousValue = previous.Average,
                              Difference = current.Average - previous.Average
                          };

Query aggregateDifferenceQuery = CreateAndStartQuery(aggregateDifference,
    "AggregateDifference", "Difference between two aggregate values",
    EventShape.Point, csvConfig, cepApplication);

When we run this, we get the results that we expect. Pretty straightforward, right? Well … not exactly. Let’s take a look at the queries in the event flow debugger. First, we see the hopping window stream and the grouping and aggregating operators. It’s pretty simple and straightforward. Now, let’s take a look at the query for the rate-of-change in the aggregates. It’s a bit longer and, if you look at the area that’s highlighted in red, it has the exact same operators that we see in the aggregate query. So … the operators that did the group and aggregate are actually repeated for both queries! What’s going on here? What you need to understand is that the operator tree for a StreamInsight query isn’t built until you call ToQuery() … and the entire operator tree is built! If your tree goes all the way back to the source, you’ll get two instances of the same input adapter, as Mark described in his blog. But you’ll also get any other operators repeated. In our little sample here, it’s really not a big deal but, in a large application, this can lead to some pretty substantial overhead. Using DQC, you can reduce this overhead and reuse the results of operators without having them rebuilt in the entire tree. When you get to a result that is then reused with different streams, you can create a query from it, convert the query back to a stream with ToStream() and then write your additional Linq expressions. You do not need to have an output adapter with every query either … that is actually optional. Here’s how the code for the aggregate difference query would look using DQC:

var aggregateDifferenceSourceStream = hoppingWindowQuery.ToStream<AggregateItem>();

var aggregateDifference = from current in aggregateDifferenceSourceStream
                          from previous in aggregateDifferenceSourceStream
                              .AlterEventDuration(e => TimeSpan.MaxValue)
                              .ClipEventDuration(aggregateDifferenceSourceStream, (e1, e2) => e1.DeviceId == e2.DeviceId)
                              .ShiftEventTime(e => TimeSpan.FromTicks(1))
                          where current.DeviceId == previous.DeviceId
                          select new ItemDifference()
                          {
                              CurrentValue = current.Average,
                              DeviceId = current.DeviceId,
                              PreviousValue = previous.Average,
                              Difference = current.Average - previous.Average
                          };

Query aggregateDifferenceQuery = CreateAndStartQuery(aggregateDifference,
    "AggregateDifference", "Difference between two aggregate values",
    EventShape.Point, csvConfig, cepApplication);

Note that the only difference here is that we convert the hoppingWindowQuery back into a stream and use that for our source. The difference in the tree of query operators, however, is telling. I’ve outlined in red where the aggregate query source now is imported from the published query with the results of the aggregates.
The operators that are unique to this query – those that calculate the rate of change in the rolling average – are still here. The operators that are already necessary for a previous query, however, are not. This is a very simple example, to be sure, and any benefits probably wouldn’t be noticeable except at a very high volume. However, when you have more complex queries (as you would in the real world), the difference can be huge. There is, however, one pretty big limitation that you need to be aware of when using DQC … you can’t use checkpoints (high availability) when your source is from a published query (DQC). So if you need this, then you need to very carefully plan how and where you use DQC, how you get your source data and how you compose your queries.
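Oh, and since a few people have asked: the CreateAndStartQuery helper I keep calling isn’t anything special – it’s a thin wrapper over ToQuery() and Query.Start(). Something along these lines (a sketch; the CSV adapter types stand in for whatever output adapter you’re binding):

private static Query CreateAndStartQuery<TPayload>(
    CepStream<TPayload> stream,
    string queryName,
    string queryDescription,
    EventShape outputShape,
    CsvOutputConfig csvConfig,      // assumption: the sample's CSV sink config - swap in your own
    Application cepApplication)
{
    // Remember: this is the call that actually builds the operator tree.
    Query query = stream.ToQuery(
        cepApplication,
        queryName,
        queryDescription,
        typeof(CsvOutputFactory),   // assumption: a CSV output adapter factory
        csvConfig,
        outputShape,
        StreamEventOrder.FullyOrdered);
    query.Start();
    return query;
}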

StreamInsight Output Adapter Lifetime

Code Sample | StreamInsight
[Download Sample Code] I know it’s been a while since I’ve posted … things have been pretty busy here at the office with a major POC that we were doing. Since that’s just about wrapped up, I’m going to get back to blogging. My goal is to have a new post every week. Most, of course, will be on StreamInsight but I’ve got some other things that I’m thinking about as well. For this post, I’ll be talking about the lifetime of a StreamInsight output adapter. There is some documentation on this on MSDN but it doesn’t have all of the details … and the product team samples don’t always work the way you would want in a real production application. I will detail why that’s the case and what you can do to make your adapters more robust. Adapters … both input and output … are probably the most difficult thing to get exactly right in StreamInsight. This is due to the heavily multithreaded and asynchronous model of StreamInsight – and that’s something that developers have always struggled to get right. Humans have a hard time really visualizing the code paths in multithreaded applications; our brains just don’t work that way. Now, back in the days when most processors were single-threaded, it wasn’t quite as difficult to do synchronization properly. In those days, we might spawn multiple threads but only one thread at a time was scheduled to run on the CPU … they weren’t truly concurrent. However, first with hyperthreading and now with multi-core processors (not to mention hyper-threaded multicore processors), multiple threads may very well be executing at the exact same instant. Of course, the exact timing is non-deterministic, which makes it that much more challenging to debug. They are Heisenbugs. There are two major symptoms that tell you you’ve got your adapter lifetime wrong. You’ll either get random “ObjectDisposedExceptions” when shutting down or shutdown of StreamInsight will hang for a while. We will talk about why this happens. We’ll also be using the sample Sql Output Adapter from the product team samples since it does show some less-than-friendly shutdown behavior. By the way, the testing was done with the Sql Server output adapter from the product team samples on CodePlex but using the Simple StreamInsight app sample on MSDN code samples. First, the easy stuff …

Startup
Your output adapter is started when the query that it is bound to is started. In fact, your adapter is created when the query is started … not when ToQuery() is called. Also, keep in mind that the call to Query::Start is asynchronous … the actual startup work occurs on a thread separate from the one that called Query::Start(). Still, it is very important that you limit your constructor and start methods to the bare minimum amount of code that you need there and then spawn a new thread to do your actual dequeue processing. If, for some insane reason, you immediately go into a while(true) { } loop in your Start, Query::Start will return but the query will, for all intents and purposes, be hung in limbo. In fact, I just tried this by putting an infinite loop with a sleep inside an adapter’s Start method and the query’s Start method still returned. However, the StreamInsight management service – and, therefore, the debugger – will hang. Below is a little table from some logging code that shows the order of events on query and output adapter startup and includes the managed thread id.
Note that we have the “Started Query” message while the adapter is still in the Start method and we even get to creating the next query before Start actually exits. It’s also worthwhile to note the number of different threads being used in the process. The factory’s Create is called on a separate thread and then the adapter’s Start is called on still a different thread. The call to consume events is synchronous in the start … though I’m not sure that’s actually necessary. Later calls to consume events are on different threads.

Source | Query | ThreadId | Time | Message
MAIN | initialQuery | 9 | 1418 | Creating query
MAIN | initialQuery | 9 | 848464 | Created query
MAIN | initialQuery | 9 | 905283 | Starting query
SqlOutputAdapterFactory | initialQuery | 18 | 1944604 | Creating adapter
SqlOutputPoint | initialQuery | 18 | 1961683 | Constructor called
SqlOutputPoint | initialQuery | 18 | 1972490 | Constructor exit
SqlOutputAdapterFactory | initialQuery | 18 | 1980381 | Created adapter
SqlOutputPoint | initialQuery | 21 | 2134443 | Start called
SqlOutputAdapter | initialQuery | 21 | 2149574 | Start Enter.
MAIN | initialQuery | 9 | 2158461 | Started query
MAIN | hoppingWindowAggregate | 9 | 2235857 | Creating query
SqlOutputAdapter | initialQuery | 21 | 2536294 | ConsumeEvents Enter
SqlOutputAdapter | initialQuery | 21 | 2553473 | ConsumeEvent Exit - Queue is empty. Items dequeued: 0 - CTIs dequeued: 0
SqlOutputAdapter | initialQuery | 21 | 2558886 | Start Exit
SqlOutputPoint | initialQuery | 21 | 2567542 | Start exit

Dequeuing Events
This is the core of your output adapter … after all, if you don’t dequeue your events, there’s no purpose to having an output adapter, is there? So your code may look something like the following (taken from the Sql Output Adapter sample):

/// <summary>
/// Dequeues each event from StreamInsight, and writes it as a row into the SQL sink
/// </summary>
private void ConsumeEvents()
{
    TEvent currentEvent = default(TEvent);
    while (true)
    {
        try
        {
            // if the engine asked the adapter to stop
            if (this.outputAdapter.AdapterState == AdapterState.Stopping)
            {
                // clean up state
                this.Cleanup();
                // inform the engine that the adapter has stopped
                this.outputAdapter.Stopped();
                // and exit worker thread
                return;
            }
            // Dequeue the event
            DequeueOperationResult result = this.outputAdapter.Dequeue(out currentEvent);
            // if the engine does not have any events, the adapter is Suspended; so do this ..
            if (result == DequeueOperationResult.Empty)
            {
                // inform the engine that adapter is ready to be resumed
                this.outputAdapter.Ready();
                // exit the worker thread
                return;
            }
            // write out event to output table
            this.CreateRowFromEvent(currentEvent);
        }
        finally
        {
            // IMPORTANT: Release the event always
            if (currentEvent != null)
            {
                this.outputAdapter.ReleaseEvent(ref currentEvent);
            }
        }
    }
}

Events will come in “bunches” based on your CTIs; in fact, events are only “released” to the output adapter when there is a CTI. Exactly how this works varies by the event shape. For point events, if the start time is within the last CTI span (the time span between CTIs), the event will be released to the output adapter. For an interval, the event is released to the output adapter only when the end time is in the last CTI span. (This creates some pretty serious complications if you want to have intervals go from StreamInsight to StreamInsight as intervals. In fact, it’s pretty much impossible to do it correctly.) For an edge, you get two events in the output adapter. The start edge is released when the start time is within the last CTI span. The end edge gets released when the end time is within the last CTI span.
It is important to note that the events are not released one at a time as they pass through the engine but only when there is a CTI. From a functionality perspective, the CTI becomes a nice marker for doing batching to your output destination. In fact, in the Sql adapter that we have in our framework, we batch the events and then, on CTI, do a bulk insert into Sql Server of all the events from that CTI span rather than doing inserts one-at-a-time like the sample does. (I’ve sketched what that batched loop can look like at the end of this post.) When you get DequeueOperationResult.Empty as your result, it’s time for your adapter to take a little nap … there is nothing left for it to do. At this point, you should exit whatever processing method you have … so, return directly from ConsumeEvents, as you see in the adapter code above. Additional attempts to dequeue will be exercises in futility … there’s nothing there for you to dequeue. Before you exit your ConsumeEvents method, you need to make sure that you call Adapter::Ready to let the engine know that you are taking a break but are ready for more action when it is. This is also a good time to let go of any expensive resources that you may be holding, or at least configure a timer to let go of them if you aren’t resumed within a period of time. That’s where Resume comes into play. Resume is StreamInsight’s method to “wake up” your output adapter from its little snooze and start dequeuing events again because there are events there for you to dequeue. If you look at the code sample above, you’ll see that Ready() is called right after the result of the dequeue operation is “Empty”. You must do that. If you don’t call Ready() after an empty dequeue, StreamInsight will never call Resume() for you to resume dequeuing events. You may also find yourself in a situation where you never get Empty as the result of a dequeue operation and, because of that, you don’t actually need to worry about Resume(). But don’t be thinking that you’ve gotten off scot-free here. If you never empty your queue, it means that your output adapter isn’t keeping up with the events coming in from the query, which is a completely different issue in and of itself. You want to get to a point … sometime before the end of the application … where you have an empty queue and need to be resumed because, if that’s happening, you are keeping up with the events that are coming through from the StreamInsight engine. You will see this happening if you enable performance counters for the query and look at the “# Events in output queue” counter. This same counter is also at the application level and provides the total number of events in the output queues, regardless of whether the queries are instrumented for performance counters or not. The table below shows some of the logging events when I overloaded the output adapter with just too many events for it to keep up with:

Source | Query | ThreadId | Stopwatch | Message
SqlOutputPoint | initialQuery | 20 | 5284643 | Resume called
SqlOutputAdapter | initialQuery | 20 | 5294055 | Resume called.
SqlOutputAdapter | initialQuery | 20 | 5302828 | ConsumeEvents Enter
SqlOutputAdapter | initialQuery | 20 | 21442634 | Still consuming events - 496 Inserts and 4 CTIs.
SqlOutputAdapter | initialQuery | 20 | 36501939 | Still consuming events - 991 Inserts and 9 CTIs.
SqlOutputAdapter | initialQuery | 20 | 51355765 | Still consuming events - 1486 Inserts and 14 CTIs.
SqlOutputAdapter | initialQuery | 20 | 66693981 | Still consuming events - 1981 Inserts and 19 CTIs.
SqlOutputAdapter | initialQuery | 20 | 83590260 | Still consuming events - 2476 Inserts and 24 CTIs.
SqlOutputAdapter | initialQuery | 20 | 99175549 | Still consuming events - 2971 Inserts and 29 CTIs.
MAIN | initialQuery | 9 | 109556687 | Stopping query
SqlOutputAdapter | initialQuery | 20 | 113608268 | Still consuming events - 3466 Inserts and 34 CTIs.
SqlOutputAdapter | initialQuery | 20 | 116769647 | ConsumeEvent Exit - Queue is empty. Items dequeued: 3600 - CTIs dequeued: 36
SqlOutputPoint | initialQuery | 29 | 116769669 | Resume called
SqlOutputAdapter | initialQuery | 29 | 116817259 | Resume called.
SqlOutputAdapter | initialQuery | 29 | 116822197 | ConsumeEvents Enter
SqlOutputAdapter | initialQuery | 29 | 116827716 | Cleanup enter
SqlOutputPoint | initialQuery | 31 | 116770289 | Stop called

One thing that you may notice here is that the query started stopping while events were being dequeued. In fact, when the query is stopped, the input adapter is immediately stopped but the output adapter can continue to run until such time as it is able to completely empty its queue of events. Also, note that the output adapter’s Stop isn’t called until its queue is empty. This can make it appear that the application is hanging on shutdown when, in fact, it’s simply trying to finish dequeuing the events. And no, this isn’t the reason for the hanging shutdown symptom that I mentioned above … that’s a completely different animal.

Shutdown
Now this is where it gets really tricky. In fact, I would say that this is probably one of the toughest parts of StreamInsight development to get exactly right. First, you have two ways of knowing that it’s time to shut down … you have your AdapterState (Stopping) and you have your Stop method. I’ve dug around in this stuff a bit in Reflector and, from what I could gather, there is no guarantee that the adapter state will be set to Stopping before the call to Adapter::Stop. From what I could see, the internal operation to set the property and the internal operation to call Adapter::Stop run on different threads. While I’ve never seen this to be the case, I’ve also not looked for it all that closely either. But … don’t rely on your adapter state being Stopping when your Stop method is called. And, if it’s not, it may well change before you actually leave your Stop method. Now, let’s take a look at a couple of the log writes from shutdown time:

Source | Query | ThreadId | Stopwatch | Message
SqlOutputPoint | hoppingWindowAggregate | 30 | 55414039 | Calling Stopped
SqlOutputAdapter | hoppingWindowAggregate | 28 | 55363008 | ConsumeEvents Enter
SqlOutputAdapter | hoppingWindowAggregate | 28 | 55424376 | Cleanup enter
SqlOutputAdapter | hoppingWindowAggregate | 28 | 55429802 | Cleanup exit
SqlOutputAdapter | hoppingWindowAggregate | 28 | 55435118 | ConsumeEvents Exit - Stopping
SqlOutputPoint | hoppingWindowAggregate | 30 | 55441926 | Dispose called - disposing = True
SqlOutputAdapter | hoppingWindowAggregate | 30 | 55446727 | Dispose called
SqlOutputAdapter | hoppingWindowAggregate | 30 | 55451689 | Cleanup enter
SqlOutputPoint | hoppingWindowAggregate | 30 | 55459592 | Stopped Called
SqlOutputAdapter | hoppingWindowAggregate | 28 | 66790307 | ConsumeEvent Exit - Exception in ConsumeEvent -> ObjectDisposedException: Cannot access a disposed object.
MAIN | snapshotWindowQuery | 9 | 71025431 | Stopping query
SqlOutputAdapter | initialQuery | 27 | 71197933 | ConsumeEvents Exit - Stopping
SqlOutputPoint | initialQuery | 40 | 71274224 | Dispose called - disposing = True
SqlOutputAdapter | initialQuery | 40 | 71367034 | Dispose called
SqlOutputAdapter | initialQuery | 40 | 71372717 | Cleanup enter
SqlOutputPoint | initialQuery | 40 | 71378355 | Stopped Called
SqlOutputAdapter | initialQuery | 27 | 76953367 | ConsumeEvent Exit - Exception in ConsumeEvent -> ObjectDisposedException: Cannot access a disposed object.
SqlOutputAdapter | snapshotWindowQuery | 28 | 81816540 | ConsumeEvents Exit - Stopping
SqlOutputPoint | snapshotWindowQuery | 28 | 81823836 | Dispose called - disposing = True
SqlOutputPoint | snapshotWindowQuery | 42 | 81742317 | Stop called
SqlOutputAdapter | snapshotWindowQuery | 42 | 81836699 | Cleanup enter
SqlOutputPoint | snapshotWindowQuery | 42 | 81843947 | Calling Stopped
SqlOutputAdapter | snapshotWindowQuery | 28 | 81828773 | Dispose called
SqlOutputAdapter | snapshotWindowQuery | 28 | 81856597 | Cleanup enter
SqlOutputPoint | snapshotWindowQuery | 42 | 86922959 | Exception in Stop -> ObjectDisposedException: Cannot access a disposed object.

Note the exceptions from all three instances of the adapter. They are all the same type of exception – ObjectDisposedException – but two of them come from ConsumeEvent and one comes from Stop. I’ve done runs of this demo/sample several times and there is no consistency on which method is going to throw the object disposed exception. I can say, with certainty, that just about every time I’ve run this adapter, I’ve gotten that same ObjectDisposedException from either ConsumeEvent or Stop. What’s happening is that Stopped() is being called twice on the adapter and, since calling Stopped() calls your IDisposable::Dispose method, the second call gives you an object disposed exception. In order to prevent this, you need to make sure that Stopped() is only called once. The challenge here is that Stop() and ConsumeEvents() are always going to be on different threads. So, you may say, just call Stopped() in the Stop method and not in the ConsumeEvents method. While that would ensure that Stopped() is only called once, you may still be inside ConsumeEvents() – which would cause another ObjectDisposedException should you access anything on the adapter base class – like Dequeue. OK, you say … let’s only call Stopped() from our ConsumeEvents(). Well … your issue here may be that, if you aren’t getting any events in the pipeline and your adapter is paused waiting for a call to Resume(), Stopped() will never get called. And if Stopped() never gets called, your shutdown will hang while StreamInsight waits for your adapter to call Stopped(), signalling that it has completed everything necessary to clean up resources. Eventually … I think after 3 minutes … StreamInsight will terminate the adapter with extreme prejudice. And that’s the other symptom of getting your adapter lifetime wrong. So … let’s review what the lifetime order-of-events is for shutdown. First, Stop will be called and the adapter’s state will be set to Stopping, though both won’t necessarily happen at the same time. This provides the adapter with an opportunity to shut down any open resources and do any other cleanup that may be necessary. When you are done with all of this and ready to be completely shut down, you must call Stopped() to notify the engine that you are all done cleaning up after yourself. So how do we fix this, now that we understand the problem? First, we need to make sure that Stopped() is only called once.
We are guaranteed that Stop() will be called when all is done, so that’s where we’ll put the single call. Since Stop() is only called after the queue is empty and the query is stopped, that’s probably all that we need to do. But I like to put an extra layer of protection in place to make sure that we aren’t in the middle of the dequeue process when Stop() calls Stopped(). In order to do this, we’ll need to use some sort of thread synchronization mechanism. In this case, we’ll create a lock object that both methods use and then utilize C#’s lock block to make sure that only one of those methods will run at a time (remember … they are called on separate threads). This lock, on the output adapter, probably isn’t necessary but, like I said, it’s that extra level of protection against Heisenbugs. Here’s what our log looks like for shutdown of one of the adapters now:

Source | Query | ThreadId | Stopwatch | Message
SqlOutputAdapter | snapshotWindowQuery | 24 | 31526005 | ConsumeEvent Exit - Queue is empty. Items dequeued: 206 - CTIs dequeued: 2
SqlOutputPoint | snapshotWindowQuery | 28 | 31526014 | Stop called
SqlOutputPoint | snapshotWindowQuery | 26 | 31527422 | Resume called
SqlOutputAdapter | snapshotWindowQuery | 24 | 31537450 | Consume event -> exiting lock
SqlOutputAdapter | snapshotWindowQuery | 26 | 31545272 | Resume called.
SqlOutputPoint | snapshotWindowQuery | 28 | 31547511 | Stop -> Entering lock
SqlOutputAdapter | snapshotWindowQuery | 28 | 31551377 | Cleanup enter
SqlOutputAdapter | snapshotWindowQuery | 28 | 31553290 | Cleanup exit
SqlOutputAdapter | snapshotWindowQuery | 26 | 31549564 | ConsumeEvents Enter
SqlOutputPoint | snapshotWindowQuery | 28 | 31555071 | Calling Stopped
SqlOutputPoint | snapshotWindowQuery | 28 | 31558768 | Dispose called - disposing = True
SqlOutputAdapter | snapshotWindowQuery | 28 | 31560581 | Dispose called
SqlOutputAdapter | snapshotWindowQuery | 28 | 31562515 | Cleanup enter
SqlOutputAdapter | snapshotWindowQuery | 26 | 31557017 | ConsumeEvents Exit - Stopping/Stopped
SqlOutputPoint | snapshotWindowQuery | 28 | 31566929 | Stopped returned
SqlOutputPoint | snapshotWindowQuery | 28 | 31571420 | Stop -> Exiting lock
SqlOutputPoint | snapshotWindowQuery | 28 | 31573536 | Stop Exiting

There are no exceptions. There is no hang on shutdown except for any “hanging” while the output adapter empties the queue. The log above also clearly shows where the lock is entered and exited. You will notice that ConsumeEvents still gets called once during the shutdown process but exits immediately. How was this done? First, I created a lock object in SqlOutputAdapter.cs … the class that does the actual writing for the adapter. Then, while in the while(true) loop when dequeuing events, I still check the adapter state before every event is dequeued but, instead of calling Stopped() if the adapter is stopping, I simply exit – I let Stop() take care of that. Then I enter the lock, which surrounds the entire dequeue process, ensuring that Stop() doesn’t call Cleanup() or Stopped() in the middle of the process. In the Point/Interval/Edge output adapters, there is a lock around the shutdown code in the Stop() method, ensuring that Cleanup() and Stopped() aren’t called while I’m in a dequeue process. Could we get fancier and use other thread synchronization mechanisms? Sure, but I’ve not seen any real performance benefit in it (we did test it) and … well … this method is simple, clear and easy to see what’s happening – it follows the KISS principle.
Here’s what the updated code looks like:

SqlOutputAdapter.cs:
/// <summary>
/// Lock object to ensure that we aren't calling Stopped() when inside ConsumeEvents
/// </summary>
public readonly object LockObject = new Object();

/// <summary>
/// Dequeues each event from StreamInsight and writes it as a row into the SQL sink
/// </summary>
private void ConsumeEvents()
{
    TEvent currentEvent = default(TEvent);
    while (true)
    {
        // if the engine asked the adapter to stop
        // Note that this check is *outside* the lock
        if (this.outputAdapter.AdapterState == AdapterState.Stopping ||
            this.outputAdapter.AdapterState == AdapterState.Stopped)
        {
            //exit worker thread
            return;
        }
        //Lock during our dequeue process but only then.
        //Note that we check and acquire the lock *only* when absolutely necessary.
        lock (LockObject)
        {
            try
            {
                // Dequeue the event
                DequeueOperationResult result = this.outputAdapter.Dequeue(out currentEvent);
                // if the engine does not have any events, the adapter is Suspended; so do this ..
                if (result == DequeueOperationResult.Empty)
                {
                    // inform the engine that adapter is ready to be resumed
                    this.outputAdapter.Ready();
                    // exit the worker thread
                    return;
                }
                // write out event to output table
                this.CreateRowFromEvent(currentEvent);
            }
            catch (Exception)
            {
                // a real adapter should log this before bailing out
                return;
            }
            finally
            {
                // IMPORTANT: Release the event always
                if (currentEvent != null)
                {
                    this.outputAdapter.ReleaseEvent(ref currentEvent);
                }
            }
        }
    }
}

SqlOutputPoint.cs:
/// <summary>
/// Notifies the adapter to stop as a result of stopping or aborting the query.
/// </summary>
public override void Stop()
{
    try
    {
        //Ensure that we aren't dequeuing right now.
        lock (this.outputAdapter.LockObject)
        {
            this.outputAdapter.Cleanup();
            this.Stopped();
        }
    }
    catch (Exception)
    {
        //log the failure
    }
}

That’s all there is to it. I did remove the logging code that produced the tables above for clarity but you can download the revised Simple StreamInsight App with the fixed Sql Output adapter and scripts for the databases from my SkyDrive here.
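One last thing: earlier I mentioned that our framework’s Sql adapter batches events and does a single bulk insert per CTI span. The dequeue loop for that looks roughly like the following – a sketch, where CopyRowFromEvent and BulkInsert are hypothetical helpers (the SqlBulkCopy plumbing is omitted). The important detail is copying the values out before releasing the event:

private readonly List<object[]> _pendingRows = new List<object[]>();

private void ConsumeEventsBatched()
{
    TEvent currentEvent = default(TEvent);
    while (true)
    {
        if (this.outputAdapter.AdapterState == AdapterState.Stopping ||
            this.outputAdapter.AdapterState == AdapterState.Stopped)
        {
            return;
        }
        lock (LockObject)
        {
            if (this.outputAdapter.Dequeue(out currentEvent) == DequeueOperationResult.Empty)
            {
                this.outputAdapter.Ready();
                return;
            }
            try
            {
                if (currentEvent.EventKind == EventKind.Cti)
                {
                    // The CTI marks the end of a span: flush everything we've
                    // accumulated as one bulk insert instead of row-at-a-time.
                    this.BulkInsert(_pendingRows);   // hypothetical SqlBulkCopy wrapper
                    _pendingRows.Clear();
                }
                else
                {
                    // Copy the values out *before* the event is released below.
                    _pendingRows.Add(this.CopyRowFromEvent(currentEvent));   // hypothetical helper
                }
            }
            finally
            {
                this.outputAdapter.ReleaseEvent(ref currentEvent);
            }
        }
    }
}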

Specifying CTIs with LinqPad

StreamInsight | Code Sample
I've said it before and I'll say it again: LinqPad is an essential tool for anyone doing StreamInsight applications. And don't just settle for the free version; get the Pro version (at least), since it has Intellisense. I'm not ashamed to admit that I am completely, totally addicted to Intellisense (which I find somewhat amusing at times, because it annoyed me to no end when it first came out in VB 5.0 - but then I got used to it and descended into my current addiction).

With that said (again), one thing that I've found a little ... oh ... less-than-perfect has to do not with LinqPad but with the way all of the StreamInsight samples create their streams, which also happened to be how I was creating my streams. Until recently, that is. You see, AdvanceTimeSettings.IncreasingStartTime doesn't always mirror how we'll see data in the real world. It also doesn't let you show how CTIs can be used to handle little issues like latency in the source data. To do that, you really need to specify your own CTIs so that you can control, and others can see, exactly where the CTI is issued in relation to the enqueued Insert events. Nor can you test or prototype query scenarios where you have multiple events with the same identifier in a single CTI span, or no events within a CTI span. Both of these scenarios can, and do, happen in the real world. And, depending on the adapter, you may want to handle CTIs in your input adapter itself rather than relying on AdvanceTimeSettings. It turns out, however, that it's really not that difficult.

Let's start with how we typically do it. First, we have some source data in an array and a function to create our event timestamps. Then we create the point stream from the array using ToPointStream (or ToIntervalStream or ToEdgeStream).
Here's a code example and the results from LinqPad:

void Main()
{
    Func<int, DateTimeOffset> t = (s) =>
        new DateTimeOffset(2011, 1, 11, 8, 0, 0, TimeSpan.Zero).AddSeconds(s);

    var values = new []
    {
        new {Item="Variable1", Value=92, Timestamp=0},
        new {Item="Variable2", Value=60, Timestamp=0},
        new {Item="Variable1", Value=93, Timestamp=2},
        new {Item="Variable2", Value=75, Timestamp=2},
        new {Item="Variable1", Value=88, Timestamp=3},
        new {Item="Variable2", Value=81, Timestamp=3},
        new {Item="Variable1", Value=93, Timestamp=5},
        new {Item="Variable2", Value=82, Timestamp=5}
    };

    var valueStream = values.ToPointStream(Application,
        e => PointEvent.CreateInsert(t(e.Timestamp), new {Item = e.Item, Value = e.Value}),
        AdvanceTimeSettings.IncreasingStartTime);

    valueStream.ToPointEnumerable().Dump("Results");
}

Results - IEnumerable<PointEvent<>> (13 items):

EventKind  StartTime               Payload
Insert     1/11/2011 8:00:00 AM    { Item = Variable1, Value = 92 }
Cti        1/11/2011 8:00:00 AM    null
Insert     1/11/2011 8:00:00 AM    { Item = Variable2, Value = 60 }
Insert     1/11/2011 8:00:02 AM    { Item = Variable1, Value = 93 }
Cti        1/11/2011 8:00:02 AM    null
Insert     1/11/2011 8:00:02 AM    { Item = Variable2, Value = 75 }
Insert     1/11/2011 8:00:03 AM    { Item = Variable1, Value = 88 }
Cti        1/11/2011 8:00:03 AM    null
Insert     1/11/2011 8:00:03 AM    { Item = Variable2, Value = 81 }
Insert     1/11/2011 8:00:05 AM    { Item = Variable1, Value = 93 }
Cti        1/11/2011 8:00:05 AM    null
Insert     1/11/2011 8:00:05 AM    { Item = Variable2, Value = 82 }
Cti        12/31/9999 11:59:59 PM  null

Most of the samples filter the CTIs out of the dump, but I like to see them (always). And since this post is about CTIs, we definitely need to see them. If you take a look at the results, you'll see that the CTIs aren't exactly where you might expect them to be. When you use IncreasingStartTime, the engine "watches" for a new start time to be enqueued with an event and then enqueues a CTI with that new event's start time. The next event, with the same start time, lands in the next CTI span. So each CTI span has events with two different start times! Let's change it around a bit. There is an overload of ToPointStream that takes an AdvanceTimeSettings, which gives you more control over your CTIs.
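As an aside, if you do want the CTIs filtered out of the dump the way most of the samples show it, it's just a Where over the enumerable. A quick sketch:

// Filtering the CTIs out of the dump, sample-style (I prefer to keep them in).
valueStream.ToPointEnumerable()
           .Where(e => e.EventKind == EventKind.Insert)
           .Dump("Inserts only");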
Changing the code around a bit, we certainly get different results:

AdvanceTimeSettings ats = new AdvanceTimeSettings(
    new AdvanceTimeGenerationSettings(
        TimeSpan.FromSeconds(2),
        TimeSpan.FromSeconds(0)),
    null,
    AdvanceTimePolicy.Drop);

var valueStream = values.ToPointStream(Application,
    e => PointEvent.CreateInsert(t(e.Timestamp), new {Item = e.Item, Value = e.Value}),
    ats,
    "Values");

Values - IEnumerable<PointEvent<>> (11 items):

EventKind  StartTime              Payload
Insert     1/11/2011 8:00:00 AM   { Item = Variable1, Value = 92 }
Cti        1/11/2011 8:00:00 AM   null
Insert     1/11/2011 8:00:00 AM   { Item = Variable2, Value = 60 }
Insert     1/11/2011 8:00:02 AM   { Item = Variable1, Value = 93 }
Cti        1/11/2011 8:00:02 AM   null
Insert     1/11/2011 8:00:02 AM   { Item = Variable2, Value = 75 }
Insert     1/11/2011 8:00:03 AM   { Item = Variable2, Value = 81 }
Insert     1/11/2011 8:00:03 AM   { Item = Variable1, Value = 88 }
Insert     1/11/2011 8:00:05 AM   { Item = Variable1, Value = 93 }
Cti        1/11/2011 8:00:05 AM   null
Insert     1/11/2011 8:00:05 AM   { Item = Variable2, Value = 82 }

It is different, and it's also close to what I want. But there are still events with mixed start times within the same CTI window. The example above isn't quite fair, though ... if I add a touch of a delay into the AdvanceTimeSettings (the second argument to AdvanceTimeGenerationSettings), the results begin to look more like what I expect. But even then, we aren't getting the CTIs every 2 seconds. We still have a patch of events with different start times and, notice, the CTIs don't come every 2 seconds like clockwork; instead, a CTI is enqueued only after an event's start time changes within the span. The only way to resolve this is to take complete control over the CTIs, so we add them into the source data. We don't have to specify any AdvanceTimeGenerationSettings since, of course, we are enqueuing the CTIs manually. That gives us the following code:

void Main()
{
    Func<int, DateTimeOffset> t = (s) =>
        new DateTimeOffset(2011, 1, 11, 8, 0, 0, TimeSpan.Zero).AddSeconds(s);

    var values = new []
    {
        new {Item="Variable2", Value=60, Timestamp=0},
        new {Item="CTI", Value=60, Timestamp=0},
        new {Item="Variable1", Value=93, Timestamp=2},
        new {Item="Variable2", Value=75, Timestamp=2},
        new {Item="Variable1", Value=88, Timestamp=3},
        new {Item="Variable2", Value=81, Timestamp=3},
        new {Item="CTI", Value=60, Timestamp=3},
        new {Item="Variable1", Value=93, Timestamp=5},
        new {Item="Variable2", Value=82, Timestamp=5},
        new {Item="CTI", Value=60, Timestamp=5}
    };

    var sourceData = values.ToPointStream(Application,
        e => e.Item != "CTI"
            ? PointEvent.CreateInsert(t(e.Timestamp), new Payload(){Item=e.Item, Value=e.Value})
            : PointEvent<Payload>.CreateCti(t(e.Timestamp).AddTicks(1)));

    sourceData.ToPointEnumerable().Dump("Results");
}

// Define other methods and classes here
public struct Payload
{
    public string Item;
    public int Value;
}

Note that we aren't using an anonymous type for the stream's payload; we can't. You'll get a compile error if you do. Also, the method that we're using isn't very reusable; we'll wind up writing the same thing over and over again, tweaked to whatever we're doing at the moment. Finally, I'm not really thrilled about the clarity. But we can kick this up a notch and use a fully reusable method that handles creating the events and can use anonymous types, thanks to the wonderful goodness that is lambdas.
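Before we get to that, here's roughly what the "touch of a delay" variant mentioned above would look like. This is my own sketch, and my reading of the API is that the second constructor argument backs the CTI timestamp off from the most recent event time, which is what gives slightly late events a chance to arrive:

// My sketch of the delayed variant: CTIs are still generated on a 2-second
// frequency, but stamped with a 1-second delay so that slightly late events
// aren't dropped by the Drop policy.
AdvanceTimeSettings delayedAts = new AdvanceTimeSettings(
    new AdvanceTimeGenerationSettings(
        TimeSpan.FromSeconds(2),    // CTI generation frequency
        TimeSpan.FromSeconds(1)),   // delay applied to the CTI timestamp
    null,
    AdvanceTimePolicy.Drop);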
Now for that reusable method. Check it out:

public static PointEvent<TPayload> GetPointEvent<TPayload, TSource>(
    TSource source,
    Func<TSource, bool> ctiSelectExpression,
    Func<TSource, TPayload> payloadSelectExpression,
    Func<TSource, DateTimeOffset> eventTimeExpression)
{
    bool isCti = ctiSelectExpression.Invoke(source);
    if (isCti)
    {
        return PointEvent<TPayload>.CreateCti(eventTimeExpression.Invoke(source));
    }
    return PointEvent<TPayload>.CreateInsert(
        eventTimeExpression.Invoke(source),
        payloadSelectExpression.Invoke(source));
}

We can then use this in ToPointStream:

var sourceData = values.ToPointStream(Application,
    e => GetPointEvent(e,
        i => i.Item == "CTI",
        i => new {Item = i.Item, Value = i.Value},
        i => t(i.Timestamp)));

The output is the same as with the previous method but, in this case, it is more reusable and I find it a touch simpler.
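To close the loop, here's a quick sketch of a downstream query you could hang off sourceData to watch the hand-placed CTIs do their job. This is my own illustration, not part of the original walkthrough; the point is that each result is released only when the covering CTI is enqueued:

// My illustration: a per-item average over snapshot windows. With the manual
// CTIs above, each average is released exactly when its covering CTI arrives.
var averages = from e in sourceData
               group e by e.Item into itemGroup
               from win in itemGroup.SnapshotWindow(SnapshotWindowOutputPolicy.Clip)
               select new { Item = itemGroup.Key, Average = win.Avg(x => x.Value) };

averages.ToPointEnumerable().Dump("Averages");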