Ruminations of J.net idle rants and ramblings of a code monkey

Decoupling Queries from Input and Output

Code Sample | StreamInsight

There’s something that really disturbs me about how all of the StreamInsight samples are put together – everything is very, very tightly coupled. Streams are tightly bound to the source and the target of the data and there’s no good guidance on how to break all that stuff apart. I understand why – it certainly simplifies things – but that’s Bad Mojo™ and not how we want to build enterprise applications.

Let’s take a simple scenario. Let’s say that you are developing queries to detect key events for, say, an offshore well. You have a good idea what these events look like – what follows what – and you even have some recordings that you can play back at high speed to test your algorithms. All of your query code is built much like the demos are, so you’re pretty tightly bound to the source (the database) and your output (the console, just for giggles). Now … how do you hook this up to your production sources? And you certainly won’t be using a console output for the events that you detect in production. So do you go in, real quick-like, and change all of your code before compiling it for production? If so … let me know how that goes for you. And don’t call me when things start falling apart while you are doing last-minute compiles and emergency deployments and the like. This, however, is how all of the samples are written.

Granted … they are samples and aren’t meant to be “ready for the real-world”. But, sadly, there’s precious little guidance out there on a better way to do this in StreamInsight. With that said, however, this isn’t a new problem in software development. Oh, sure, the technology may be new and/or different, but the pattern of the problem certainly isn’t. What we need to do is abstract the specification of the consumer or producer from its usage when creating the streams. Fortunately, we’ve already defined a pretty well-abstracted factory pattern for constructing our producers and consumers, even in the context of Reactive-based streams, so this helps (and was part of the method to my madness!).

In addition to abstracting the producers and consumers, we also need to have a pattern for reuse of query logic. Take this scenario as an example: we’re monitoring offshore platforms in real-time. We have a set of queries that we use to process the sensor readings from the platforms and this logic should be applied to each of the platforms – it detects the key events that we’re concerned with. The only difference between each set of queries is the source of the data (different connections), the metadata associated with the queries and, possibly, the outputs for detected events (though, using subjects, we can aggregate the output events for a single output stream).
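The reuse idea can be made concrete with a minimal sketch in plain C# (top-level statements, C# 9 or later). It has no StreamInsight dependency. The detection logic is written once and applied to each platform's source, and the detected events are merged into a single output, much as a subject would merge them. The platform names, the threshold and the readings are all hypothetical.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

// Hypothetical detection rule, written once: flag any reading above the threshold.
IEnumerable<string> DetectHighPressure(string platform, IEnumerable<double> readings, double threshold) =>
    readings.Where(r => r > threshold).Select(r => $"{platform}: high pressure ({r})");

// Hypothetical per-platform sources; only the source differs between platforms.
var sources = new Dictionary<string, double[]>
{
    ["Platform1"] = new[] { 90.0, 120.0, 95.0 },
    ["Platform2"] = new[] { 130.0, 140.0, 80.0 },
};

// Apply the same logic to every source and merge the detected events into one output.
var detected = sources
    .SelectMany(kv => DetectHighPressure(kv.Key, kv.Value, threshold: 100.0))
    .ToList();

foreach (var evt in detected)
{
    Console.WriteLine(evt);
}
```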

Enter the Stream Builder

This becomes our unit of query re-use. Using a defined interface and implementing classes based on this interface, we can encapsulate the query logic into a reusable “chunk” that gets built together. We can use this to start and stop the queries/process (depending on which model we are using) as well as to provide configuration for different parameters and the producers and consumers. Let’s start with the base class for configuration.

StreamBuilderConfiguration
    public class StreamBuilderConfiguration : IConfiguration
    {
        // Producer definitions, keyed by the name the stream builder asks for.
        private readonly Dictionary<string, EventComponentDefinition> _eventProducers =
            new Dictionary<string, EventComponentDefinition>();

        public IDictionary<string, EventComponentDefinition> EventProducers
        {
            get { return _eventProducers; }
        }

        // Consumer definitions, keyed by the name the stream builder binds to.
        private readonly Dictionary<string, EventComponentDefinition> _eventConsumers =
            new Dictionary<string, EventComponentDefinition>();

        public IDictionary<string, EventComponentDefinition> EventConsumers
        {
            get { return _eventConsumers; }
        }

        public string Name
        {
            get; set;
        }

        // Used when a requested consumer isn't configured: the null consumer.
        public virtual EventComponentDefinition DefaultConsumer
        {
            get
            {
                return new EventComponentDefinition()
                {
                    ComponentType = typeof(NullDataConsumerConfig),
                    Configuration = new NullDataConsumerConfig()
                };
            }
        }
    }

First, you’ll notice that we define dictionaries of EventComponentDefinitions, one for producers and one for consumers. What is this? Well, keep in mind that we need a factory and a configuration to create our producers and consumers. So … this is what the EventComponentDefinition class encapsulates.

EventComponentDefinition
    public class EventComponentDefinition
    {
        // The configuration object that is handed to the factory.
        public object Configuration { get; set; }

        // The producer/consumer factory type.
        public Type ComponentType { get; set; }
    }

In this case, the type for “ComponentType” is the producer/consumer factory class. So … now we have a way to abstract the definition of the consumers and producers (in the configuration) as well as a way to find them. In case you haven’t guessed yet, this provides an inversion of control that uses the dictionary lookup to locate the appropriate service. Now, the producer and/or consumer must still handle the payload type for the stream, and we don’t have anything (yet) that checks that this type is actually correct and compatible. But we now have a contract for specifying these items and a contract for creating them. Finally, so that we have an interface that we can bind to without having to worry about generics, we’ll extract the Start and the Stop methods into an interface. It’s all nicely abstracted now and ready for us to create our stream builder.
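The payload-compatibility check mentioned above doesn't exist in this code yet. One possible shape for it is a minimal reflection sketch (C# top-level statements) that asks whether a component type implements a given generic interface closed over the stream's payload type.

The real factory interfaces aren't shown in this post, so `IEnumerable<>` and `List<int>` stand in for the factory interface and a factory type. That substitution, and the helper name `SupportsPayload`, are hypothetical and for illustration only.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

// Hypothetical check: does componentType implement openInterface<payloadType>?
// In the real framework, openInterface would be the producer/consumer factory interface.
bool SupportsPayload(Type componentType, Type openInterface, Type payloadType) =>
    componentType.GetInterfaces().Any(i =>
        i.IsGenericType &&
        i.GetGenericTypeDefinition() == openInterface &&
        i.GetGenericArguments()[0] == payloadType);

// Stand-ins: List<int> plays the role of a "factory" for int payloads.
bool matches = SupportsPayload(typeof(List<int>), typeof(IEnumerable<>), typeof(int));
bool mismatches = SupportsPayload(typeof(List<int>), typeof(IEnumerable<>), typeof(string));

Console.WriteLine($"int payload: {matches}, string payload: {mismatches}");
```

A builder could run a check like this in `CreateStream<TPayload>` before handing the definition to StreamInsight. It would then fail fast with a clear message rather than when the query starts.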

StreamBuilder
    public interface IStreamBuilder
    {
        void Start(Microsoft.ComplexEventProcessing.Application cepApplication);

        void Stop();
    }

    public abstract class StreamBuilder<TConfiguration> : IStreamBuilder
        where TConfiguration : StreamBuilderConfiguration
    {
        protected StreamBuilder(TConfiguration configuration)
        {
            this.Configuration = configuration;
        }

        public TConfiguration Configuration
        {
            get;
            private set;
        }

        protected Application CurrentApplication
        {
            get;
            set;
        }

        public abstract void Start(Microsoft.ComplexEventProcessing.Application cepApplication);

        public abstract void Stop();

        // Consumers are optional: fall back to the configured default (the null consumer).
        protected EventComponentDefinition GetConsumer(string name)
        {
            EventComponentDefinition consumer;
            if (Configuration.EventConsumers.TryGetValue(name, out consumer))
            {
                return consumer;
            }
            // Read the virtual property once; it may build a new definition on each call.
            var defaultConsumer = Configuration.DefaultConsumer;
            if (defaultConsumer != null)
            {
                return defaultConsumer;
            }
            throw new InvalidOperationException(string.Format(ExceptionMessages.CONSUMER_NOT_FOUND, name));
        }

        // Producers are required: a missing producer is a configuration error.
        protected EventComponentDefinition GetProducer(string name)
        {
            EventComponentDefinition producer;
            if (Configuration.EventProducers.TryGetValue(name, out producer))
            {
                return producer;
            }

            throw new InvalidOperationException(string.Format(ExceptionMessages.PRODUCER_NOT_FOUND, name));
        }
    }

We also have nice “helper” methods to get the producers and the consumers. Since a consumer isn’t really required, we also have a default consumer – the null consumer – that is used when one isn’t specified. However, the producer is absolutely required, so if it’s not found (not provided), we throw an exception. And since the interface has both a Start and a Stop, we can use this layer of abstraction to manage our running queries consistently, regardless of which StreamInsight API (the Reactive model or the query model) the queries are built on. From here, we’ll create more specific implementations for the Reactive and the query models.
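The lookup rules are easy to pin down in isolation. This sketch mirrors the behavior of GetConsumer and GetProducer in C# top-level statements. It simplifies by using plain dictionaries of strings in place of EventComponentDefinition, and "NullConsumer" is a hypothetical stand-in for the null consumer definition.

```csharp
using System;
using System.Collections.Generic;

var producers = new Dictionary<string, string> { ["SourceData"] = "TestDataInputFactory" };
var consumers = new Dictionary<string, string> { ["Output"] = "ConsoleOutputFactory" };
const string defaultConsumer = "NullConsumer"; // hypothetical stand-in for the null consumer

// Consumers are optional: fall back to the default when the name isn't configured.
string GetConsumer(string name) =>
    consumers.TryGetValue(name, out var consumer) ? consumer : defaultConsumer;

// Producers are required: a missing name is a configuration error.
string GetProducer(string name) =>
    producers.TryGetValue(name, out var producer)
        ? producer
        : throw new InvalidOperationException($"Producer '{name}' not found.");

Console.WriteLine(GetConsumer("Output"));     // the configured consumer
Console.WriteLine(GetConsumer("Aggregate"));  // not configured, so the default is returned
Console.WriteLine(GetProducer("SourceData")); // the configured producer
```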

ProcessBuilder Implementation

With the Reactive model introduced in StreamInsight 2.1, the process is the unit of execution. Related queries are started, executed and stopped as a single unit. You can have multiple bindings in each process for both input and output, and they all start and stop together. So … it makes sense that our ProcessBuilder will build a single process with all of the related streams bound together in that process. We’ll also abstract the code that we need to write for every source stream and for binding every output stream. The trick for handling the bindings is simple … add them to a list as they are created and then, when it’s time, combine them all using With and run them in a single process. Of course, abstracting the bindings also makes this pretty easy. Overriding the StreamBuilder’s Start method allows us to wrap up all of the necessary housekeeping to get started, as well as to bind all of the streams together and run them in a single process. We’ll also define an abstract “CreateStreams” method … our concrete implementations will override this method to do the work of creating the streams.

ProcessBuilder
    public abstract class ProcessBuilder<TConfiguration> : StreamBuilder<TConfiguration>
        where TConfiguration : StreamBuilderConfiguration
    {
        protected ProcessBuilder(TConfiguration configuration) : base(configuration)
        {
        }

        private List<IRemoteBinding> _bindings;
        private IDisposable _runningBindings;

        public override void Start(Application cepApplication)
        {
            _bindings = new List<IRemoteBinding>();
            CurrentApplication = cepApplication;
            CreateStreams();

            // Combine every binding with With() so they all run (and stop) as one process.
            // First() throws on an empty list, so CreateStreams must bind at least one stream.
            var bindingList = _bindings.Skip(1).Aggregate(_bindings.First(),
                (current, source) => current.With(source));
            _runningBindings = bindingList.Run(Configuration.Name);
        }

        public override void Stop()
        {
            if (_runningBindings != null)
            {
                // Disposing the process stops all of its bindings together.
                _runningBindings.Dispose();
                _runningBindings = null;
            }
        }

        protected IQStreamable<TPayload> CreateStream<TPayload>(string producerName, EventShape eventShape)
        {
            // GetProducer throws a descriptive exception if the producer isn't configured.
            var producer = GetProducer(producerName);
            return RxStream<TPayload>.Create(CurrentApplication, producer.ComponentType, producer.Configuration, eventShape);
        }

        protected IQStreamable<TPayload> CreateStream<TPayload>(string producerName, EventShape eventShape,
            AdvanceTimeSettings advanceTimeSettings)
        {
            var producer = GetProducer(producerName);
            return RxStream<TPayload>.Create(CurrentApplication, producer.ComponentType, producer.Configuration, eventShape, advanceTimeSettings);
        }

        // Bindings are collected here and only run when Start combines them.
        protected void Bind<TPayload>(IQStreamable<TPayload> stream, string consumerName, EventShape eventShape)
        {
            var consumer = GetConsumer(consumerName);
            _bindings.Add(stream.ToBinding(CurrentApplication, consumer, eventShape));
        }

        protected abstract void CreateStreams();
    }
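The `Skip(1).Aggregate(First(), ...)` expression in Start is a left fold. It seeds the accumulator with the first binding and then calls With once for each remaining binding. This sketch shows the same fold over strings in C# top-level statements, so the nesting is visible.

The `With` local function is a hypothetical stand-in for `IRemoteBinding.With`, and the binding names are made up. The sketch also shows why at least one stream must be bound: `First()` throws InvalidOperationException on an empty list.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

// Hypothetical stand-in for IRemoteBinding.With: records how bindings are combined.
string With(string current, string next) => $"({current} with {next})";

var bindings = new List<string> { "Output", "Aggregate", "Alerts" };

// Same shape as ProcessBuilder.Start: seed with the first binding, fold in the rest.
string combined = bindings.Skip(1).Aggregate(bindings.First(), (current, next) => With(current, next));
Console.WriteLine(combined);

// With no bindings, First() throws before Run could ever be called.
var empty = new List<string>();
bool emptyThrew = false;
try
{
    empty.Skip(1).Aggregate(empty.First(), (current, next) => With(current, next));
}
catch (InvalidOperationException ex)
{
    emptyThrew = true;
    Console.WriteLine($"Empty binding list: {ex.Message}");
}
```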

Creating a new process builder is super-simple and really cuts down on the amount of code that you need to write to create and bind your streams. Both your event producers and consumers are now named resources – the process builder has no idea what the sources or the targets are, and it doesn’t need to know.

SampleProcessBuilder
    public class SampleProcessBuilder : ProcessBuilder<SampleStreamBuilderConfig>
    {
        public SampleProcessBuilder(SampleStreamBuilderConfig configuration)
            : base(configuration)
        {
        }

        protected override void CreateStreams()
        {
            // "SourceData", "Output" and "Aggregate" are names resolved from the configuration.
            var data = CreateStream<TestDataEvent>("SourceData",
                EventShape.Point,
                Configuration.GetAdvanceTimeSettings());

            Bind(data, "Output", EventShape.Point);

            // Average value per item over a 10-second window that hops every 2 seconds.
            var aggregate = from d in data
                            group d by d.ItemId into itemGroups
                            from i in itemGroups.HoppingWindow(TimeSpan.FromSeconds(10), TimeSpan.FromSeconds(2))
                            select new
                            {
                                ItemId = itemGroups.Key,
                                Average = i.Avg(e => e.Value)
                            };
            Bind(aggregate, "Aggregate", EventShape.Point);
        }
    }
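To see what the aggregate query computes, here is a rough LINQ-to-Objects approximation in C# top-level statements: a 10-second hopping window that advances every 2 seconds, grouped by item and averaged. It is not StreamInsight. It assumes timestamps in whole seconds, windows aligned at t = 0 and treated as half-open [start, start + 10), and hypothetical event data. Under those assumptions, each point event lands in 10 / 2 = 5 overlapping windows.

```csharp
using System;
using System.Linq;

const int size = 10, hop = 2; // window size and hop, in seconds

// Hypothetical point events: (item, timestamp in seconds, value).
(string Item, int T, double Value)[] events =
{
    ("A", 0, 10.0), ("A", 4, 20.0), ("A", 9, 30.0), ("B", 1, 5.0)
};

// Window starts are multiples of hop; enumerate every window that could contain an event.
int FloorToHop(int t) => (int)Math.Floor((double)t / hop) * hop;
int firstStart = FloorToHop(events.Min(e => e.T) - size) + hop;
int lastStart = FloorToHop(events.Max(e => e.T));

// For each window, group the events that fall inside it by item and average their values.
var averages =
    (from start in Enumerable.Range(0, (lastStart - firstStart) / hop + 1).Select(k => firstStart + k * hop)
     from g in events.Where(e => e.T >= start && e.T < start + size).GroupBy(e => e.Item)
     select (WindowStart: start, ItemId: g.Key, Average: g.Average(e => e.Value)))
    .ToList();

foreach (var a in averages)
{
    Console.WriteLine($"[{a.WindowStart}, {a.WindowStart + size}) item {a.ItemId}: {a.Average}");
}
```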

CepStreamBuilder Implementation

In keeping with our principle of keeping the APIs as close as possible, we’ll also have a CepStreamBuilder that looks and acts the same way as our ProcessBuilder. However, since the pre-2.1 model has no concept of a process with multiple inputs and outputs that run as a single unit of execution, we have a little extra work to do. Prior to the Reactive model, the query was the unit of execution – it could have multiple input streams but only one output. Each query was also independent of all other queries, and there was no way to conveniently run related queries together. Reuse of query outputs was done through a technique called Dynamic Query Composition (DQC), which re-enqueued the output of one query into a new stream that then served as the input to another query. If you looked at the details of how this was done, it was a special input/output adapter pair. In essence, it was a subject without the flexibility or extensibility that we have with subjects in the Reactive model. Finally, we’ll also need to take into consideration the potential for name clashes when we name our queries – not something that we needed to worry about when working with the process container – and a way to “group” our related queries together – something the process container does inherently but the query model did not do for you.

QueryBuilder
    public abstract class QueryBuilder<TConfiguration> : StreamBuilder<TConfiguration>
        where TConfiguration : StreamBuilderConfiguration
    {
        // Queries created by CreateStreams; the builder starts and stops them as a group.
        private List<Query> _queries;

        protected QueryBuilder(TConfiguration configuration) : base(configuration)
        {
        }

        public override void Start(Application cepApplication)
        {
            _queries = new List<Query>();
            CurrentApplication = cepApplication;
            CreateStreams();

            // Start all of the queries.
            foreach (var query in _queries)
            {
                query.Start();
            }
        }

        public override void Stop()
        {
            // Nothing to do if the builder was never started (or was already stopped).
            if (_queries == null)
            {
                return;
            }
            foreach (var query in _queries)
            {
                query.Stop();
            }
            foreach (var query in _queries)
            {
                query.Delete();
            }
            _queries = null;
        }

        protected abstract void CreateStreams();

        protected CepStream<TPayload> CreateStream<TPayload>(string producerName, EventShape eventShape)
        {
            // GetProducer throws a descriptive exception if the producer isn't configured.
            var producer = GetProducer(producerName);
            return CepStream<TPayload>.Create(CurrentApplication,
                GetSourceStreamName(producerName), producer.ComponentType,
                producer.Configuration, eventShape);
        }

        protected CepStream<TPayload> CreateStream<TPayload>(string producerName, EventShape eventShape,
            AdvanceTimeSettings advanceTimeSettings)
        {
            var producer = GetProducer(producerName);
            return CepStream<TPayload>.Create(CurrentApplication,
                GetSourceStreamName(producerName), producer.ComponentType,
                producer.Configuration, eventShape, advanceTimeSettings);
        }

        // In the pre-2.1 model each bound output becomes its own query.
        protected void Bind<TPayload>(CepStream<TPayload> stream, string consumerName, EventShape eventShape)
        {
            var consumer = GetConsumer(consumerName);
            _queries.Add(stream.ToQuery(CurrentApplication, GetQueryName(consumerName),
                GetQueryDescription(consumerName), consumer.ComponentType,
                consumer.Configuration, eventShape, StreamEventOrder.FullyOrdered));
        }

        private string GetQueryDescription(string consumerName)
        {
            return "Query from QueryBuilder [" + Configuration.Name + "] for consumer [" + consumerName + "]";
        }

        // Prefixing names with the builder name avoids clashes between builder instances.
        protected string GetQueryName(string consumerName)
        {
            return Configuration.Name + "." + consumerName;
        }

        protected string GetSourceStreamName(string producerName)
        {
            return Configuration.Name + "." + producerName + ".Stream";
        }
    }
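The naming helpers are what keep two instances of the same builder from colliding inside one StreamInsight application. This sketch reproduces the GetQueryName and GetSourceStreamName scheme as C# local functions. It then builds the names for two hypothetical builder instances, "Platform1" and "Platform2", that share the same producer and consumer names. It assumes, as the QueryBuilder does, that each builder's Name is unique within the application.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

// Same naming scheme as QueryBuilder.GetQueryName and GetSourceStreamName.
string GetQueryName(string builderName, string consumerName) => builderName + "." + consumerName;
string GetSourceStreamName(string builderName, string producerName) => builderName + "." + producerName + ".Stream";

var builderNames = new[] { "Platform1", "Platform2" }; // hypothetical builder instances
var consumerNames = new[] { "Output", "Aggregate" };

var queryNames = (from b in builderNames
                  from c in consumerNames
                  select GetQueryName(b, c)).ToList();
var streamNames = builderNames.Select(b => GetSourceStreamName(b, "SourceData")).ToList();

// Every name is distinct even though the builders share consumer and producer names.
bool allUnique = new HashSet<string>(queryNames.Concat(streamNames)).Count == queryNames.Count + streamNames.Count;

queryNames.ForEach(Console.WriteLine);
streamNames.ForEach(Console.WriteLine);
Console.WriteLine($"All names unique: {allUnique}");
```

If two builders were given the same Name, their query names would collide. Choosing a distinct Name per instance is therefore part of the configuration contract.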

And now, creating a query builder is, like the process builder, super-simple. In fact, we can copy and paste the code from the process builder into the query builder. The only difference is the name of the base class.

SampleQueryBuilder
    public class SampleQueryBuilder : QueryBuilder<SampleStreamBuilderConfig>
    {
        public SampleQueryBuilder(SampleStreamBuilderConfig configuration) : base(configuration)
        {
        }

        protected override void CreateStreams()
        {
            var data = CreateStream<TestDataEvent>("SourceData",
                EventShape.Point,
                Configuration.GetAdvanceTimeSettings());

            Bind(data, "Output", EventShape.Point);
            var aggregate = from d in data
                            group d by d.ItemId into itemGroups
                            from i in itemGroups.HoppingWindow(TimeSpan.FromSeconds(10), TimeSpan.FromSeconds(2))
                            select new
                            {
                                ItemId = itemGroups.Key,
                                Average = i.Avg(e => e.Value)
                            };
            Bind(aggregate, "Aggregate", EventShape.Point);
        }
    }

Using the Stream Builders

Again, this is pretty simple. You create the configuration. You create the builder. You start the builder. You stop the builder. Regardless of whether it’s a QueryBuilder or a ProcessBuilder, the code to do this is the same, and it is bound to the IStreamBuilder interface rather than to the specific implementations. Most of the code is now, actually, in creating the configuration to pass to the builder. Right now this is hard-coded but, in time, it won’t be; we’re pretty close to having all of the core, foundational pieces in place to further abstract even the configuration.

RunBuilder
    private static void RunBuilder(Application cepApplication, Type builderType)
    {
        var builderConfig = GetSampleStreamBuilderConfig();
        var builder = (IStreamBuilder)Activator.CreateInstance(builderType, builderConfig);
        builder.Start(cepApplication);

        Console.WriteLine("Builder is running. Press ENTER to stop.");
        Console.ReadLine();
        builder.Stop();
    }

    private static SampleStreamBuilderConfig GetSampleStreamBuilderConfig()
    {
        //Create the configuration.
        var streamProviderConfig = new SampleStreamBuilderConfig()
        {
            Name = "Provider1",
            CtiDelay = TimeSpan.FromMilliseconds(750),
            CtiInterval = TimeSpan.FromMilliseconds(250)
        };

        //Add the producer.
        streamProviderConfig.EventProducers.Add("SourceData", new EventComponentDefinition()
        {
            Configuration = new TestDataInputConfig()
            {
                NumberOfItems = 20,
                RefreshInterval = TimeSpan.FromMilliseconds(500),
                TimestampIncrement = TimeSpan.FromMilliseconds(500),
                AlwaysUseNow = true,
                EnqueueCtis = false
            },
            ComponentType = typeof(TestDataInputFactory)
        });

        //Add the consumer.
        streamProviderConfig.EventConsumers.Add("Output", new EventComponentDefinition()
        {
            Configuration = new ConsoleOutputConfig()
            {
                ShowCti = true,
                CtiEventColor = ConsoleColor.Blue,
                InsertEventColor = ConsoleColor.Green
            },
            ComponentType = typeof(ConsoleOutputFactory)
        });
        streamProviderConfig.EventConsumers.Add("Aggregate", new EventComponentDefinition()
        {
            Configuration = new ConsoleOutputConfig()
            {
                ShowCti = true,
                CtiEventColor = ConsoleColor.White,
                InsertEventColor = ConsoleColor.Yellow
            },
            ComponentType = typeof(ConsoleOutputFactory)
        });
        return streamProviderConfig;
    }
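Because RunBuilder takes a Type and binds only to IStreamBuilder, the builder type itself could eventually come from configuration. Here is a minimal sketch of that idea in C# top-level statements, assuming the builder's type name is stored as a string. Type.GetType resolves the name, and Activator.CreateInstance calls the matching one-argument constructor, just as RunBuilder does with the configuration object.

System.Text.StringBuilder stands in for a stream builder here only because it is always available and has a one-argument constructor. In a real setup the string would be an assembly-qualified builder type name. This is an illustration, not part of the download.

```csharp
using System;
using System.Text;

// Hypothetical configuration value; a real one would be an assembly-qualified builder type name.
string builderTypeName = "System.Text.StringBuilder";
object builderConfig = "Provider1"; // stand-in for the builder configuration object

// Resolve the type (throwing if it can't be found) and invoke its one-argument constructor.
Type builderType = Type.GetType(builderTypeName, throwOnError: true)!;
object builder = Activator.CreateInstance(builderType, builderConfig)!;

Console.WriteLine($"{builderType.Name} created with: {builder}");
```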

Wrapping Up

We’ve made a good deal of progress with this revision. We focused first on creating abstractions for the producers and consumers – they’re the beginning and the end, after all, of your StreamInsight application – and have now added a framework for the chewy middle. I had a couple of questions from folks as to why I made some of the architectural decisions, particularly having factories for the Reactive-model sinks and producers. Hopefully some of that is a little clearer now but, as we move forward, there will be additional things that we may do with the factories. There are also some changes and tweaks included in this download that aren’t described in the blog, primarily around the serialization/deserialization for the event consumers and producers, that were done while I was building the demo for Baton Rouge Sql Saturday. There’s still some work to do around these to optimize the performance, particularly on the serialization side of the equation, that I’ll get to sometime later. I was, however, quite proud that I was able to shave about 3 ticks off the deserialization time. Now, this doesn’t sound like much (and it’s not) but when you realize that you may be doing this process tens of thousands of times per second, it adds up really quickly.
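Here is a quick back-of-the-envelope check of why a few ticks matter. It assumes "ticks" means .NET TimeSpan ticks of 100 nanoseconds each; Stopwatch ticks depend on Stopwatch.Frequency and may differ. It also picks 50,000 events per second as a hypothetical rate within "tens of thousands".

```csharp
using System;

const long ticksSavedPerEvent = 3;   // "about 3 ticks" from the text; TimeSpan ticks (100 ns) assumed
const long eventsPerSecond = 50_000; // hypothetical rate

long ticksSavedPerSecond = ticksSavedPerEvent * eventsPerSecond;
long millisecondsSavedPerSecond = ticksSavedPerSecond / TimeSpan.TicksPerMillisecond;
double fractionOfOneCore = (double)ticksSavedPerSecond / TimeSpan.TicksPerSecond;

Console.WriteLine($"{ticksSavedPerSecond} ticks = {millisecondsSavedPerSecond} ms saved every second");
Console.WriteLine($"That is {fractionOfOneCore:P1} of one core's time.");
```

At that rate the saving is 15 ms of CPU time per second, roughly 1.5% of one core, and it scales linearly with the event rate.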

So, what’s next? There are two key pieces that we’ll need to put into place … abstracting the application hosting mode and then a configuration system. And then there’s lots to build out … common query patterns in LINQ macros, some aggregates and operators, and more of the stream builder classes/model, to name a few. Oh, and let’s not forget that we’ll need to have some of the basic, common event producers and consumers.

Last, but not least, you can download the code from here:
