Ruminations of idle rants and ramblings of a code monkey

Dual Mode Data Sources–Part I


When StreamInsight 2.1 came out, the Big New Thing for the release was the new model for data ingress and egress – sources and sinks. Based on standard interfaces like IObservable and IObserver and the Reactive Extensions, it makes getting data into and out of StreamInsight a whole lot easier. The previous model works (or worked) well enough but, honestly, it was hard … and harder than it really needed to be. Getting the adapter lifetime just right was not a trivial undertaking, and getting it wrong meant either hanging on shutdown or an ObjectDisposedException. It took me … and my team … some time to get all of this nailed down to a science. The new model handles all of this for you … you just subscribe and publish. The rest is invisible. Very nice.

But, as with any new API that replaces an old one, there is a time of transition from one to the other. There's a time when developers or solution providers – like me – need to be able to support both APIs. For us, we actually still (sadly) need to support StreamInsight 1.2 and 2.0 in some cases, so we can't go over to the new model entirely. But we also don't want to rewrite all of our adapters or maintain two codebases of adapters. Instead, we want to support both from a single code base. Fortunately, this is actually pretty easy to accomplish with a little planning and good architectural practices. I'll walk you through this – in its simplest form – and this code will also be the base for a series of additional articles that I've been working on and have planned. In fact, I plan to walk through creating a StreamInsight application step by step, discussing the technical details as we go, so, by the end, you'll have built a simple StreamInsight application right along with me.

Oh … and let me make one thing clear though. I've heard it said (from Microsoft field technical sales-type folks) that "you don't have to write adapters anymore." That's not entirely accurate. You don't have to use the "legacy" adapter API anymore. But you will have to write code to get data into and out of StreamInsight. You may use a different API now (IObserver/IObservable) but the necessity of writing code for this task is still there. Call it what you want 'cuz now we're just arguing semantics.

Sources/Input Adapters

Source is the new name for input adapter. With StreamInsight 2.1, you'll want to implement IObservable. I won't go over the basics; you'll find them in the documentation. But there are some things that you need to consider that aren't in the documentation. First … CTIs (and I'll be talking more about these in a future post or two). While you can use AdvanceTimeSettings to have StreamInsight generate your CTIs for you, this isn't always appropriate to do. So we need a way to have CTIs created and enqueued by the adapters … and it needs to be optional. So we'll start with a wrapper class that allows a data source to create both events (with payloads) and CTIs. This will be our StreamInputEvent class. It will have all of the appropriate temporal properties and event shape metadata as well as the payload.

public class StreamInputEvent<TPayloadType>
{
    public TPayloadType Payload { get; set; }
    public DateTimeOffset Start { get; set; }
    public DateTimeOffset End { get; set; }
    public EdgeType EdgeType { get; set; }
    public EventKind EventKind { get; set; }
}

We’ll also add some helper methods to create the different event shapes as well as a constructor or two:

Helpers & Constructors
public PointEvent<TPayloadType> GetPointEvent()
{
    if (this.EventKind == EventKind.Insert)
    {
        return PointEvent<TPayloadType>.CreateInsert(this.Start, Payload);
    }
    return PointEvent<TPayloadType>.CreateCti(Start);
}

// More helpers for Edge and Interval

public StreamInputEvent(DateTimeOffset ctiDateTime)
{
    EventKind = EventKind.Cti;
    Start = ctiDateTime;
    EdgeType = EdgeType.Start;
    End = DateTimeOffset.MaxValue;
    Payload = default(TPayloadType);
}

public StreamInputEvent(TPayloadType payload)
{
    Start = DateTimeOffset.MinValue;
    End = DateTimeOffset.MaxValue;
    EdgeType = default(EdgeType);
    Payload = payload;
    EventKind = EventKind.Insert;
}

public StreamInputEvent(TPayloadType payload, DateTimeOffset startTime, DateTimeOffset endTime, EdgeType edgeType)
{
    Start = startTime;
    End = endTime;
    EdgeType = edgeType;
    Payload = payload;
    EventKind = EventKind.Insert;
}

Now, you'll note that there isn't an EventShape specified. The shape can be determined from the event times and we've found it useful to have data sources that can adapt to whatever shape we deem fit for a particular use case. But we'll still have a factory (regardless of how we do it) that allows us to limit a particular data source to specific shapes. And the helpers make it really, really easy when we create the IQStreamable for the temporal processing.
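The "More helpers" comment in the snippet above elides the Interval and Edge variants. Assuming the standard IntervalEvent<T> and EdgeEvent<T> factory methods from the 2.1 API, a sketch of those helpers might look like this:

```csharp
// Sketch only – mirrors GetPointEvent() for the other two event shapes.
public IntervalEvent<TPayloadType> GetIntervalEvent()
{
    if (this.EventKind == EventKind.Insert)
    {
        return IntervalEvent<TPayloadType>.CreateInsert(this.Start, this.End, this.Payload);
    }
    return IntervalEvent<TPayloadType>.CreateCti(this.Start);
}

public EdgeEvent<TPayloadType> GetEdgeEvent()
{
    if (this.EventKind == EventKind.Cti)
    {
        return EdgeEvent<TPayloadType>.CreateCti(this.Start);
    }
    // A Start edge doesn't know its end time yet; an End edge carries both times.
    return this.EdgeType == EdgeType.Start
        ? EdgeEvent<TPayloadType>.CreateStart(this.Start, this.Payload)
        : EdgeEvent<TPayloadType>.CreateEnd(this.Start, this.End, this.Payload);
}
```

This is exactly why the shape isn't baked into StreamInputEvent: the same wrapped event can be projected into whichever shape the query needs.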

Now that we have that done, let's take a step back and look at this. We have a data source of some type and we need to get the data into StreamInsight. The wonderful thing about StreamInsight 2.1's focus on IObservable is that it lets us really step back for a second and look at how we produce data. In fact, it allows us to "hide" our producers behind a standard interface … and that's what we need to do. At the lowest level, the code closest to the data source, we'll create a "data producer" that takes the data and publishes it via IObservable – so there's our "native" 2.1 interface. To support the legacy adapter model, we will just need to subscribe to this with the concrete Input Adapter class.

So let’s get started by creating a base EventProducer class to handle subscribers and disposal. We’ll also need to make sure that we can specify the configuration for the event producer – just like we were forced to do with the legacy adapter model. The reality is simple … unless you are doing a really, incredibly simple demo, you need to have a way to specify configuration. Having a configuration class that is passed to the event producer from the application just makes sense.

public abstract class StreamEventProducer<TPayloadType, TConfigType>
    : IObservable<StreamInputEvent<TPayloadType>>, IDisposable
{
    public TConfigType Configuration { get; protected set; }

    protected StreamEventProducer(TConfigType configuration)
    {
        this.Configuration = configuration;
    }

    protected abstract void Start();

    private IObserver<StreamInputEvent<TPayloadType>> _observer;

    public virtual IDisposable Subscribe(IObserver<StreamInputEvent<TPayloadType>> observer)
    {
        this._observer = observer;
        Start();
        return this;
    }

    protected void PublishException(Exception ex)
    {
        _observer.OnError(ex);
    }

    protected void PublishEvent(StreamInputEvent<TPayloadType> newEvent)
    {
        _observer.OnNext(newEvent);
    }

    public void Dispose()
    {
        Dispose(true);
        GC.SuppressFinalize(this);
    }

    protected virtual void Dispose(bool disposing)
    {
        if (disposing)
        {
            _observer.OnCompleted();
        }
    }
}

If you look at this, you'll see that it's pretty simple. You can certainly get more sophisticated with this – by supporting multiple observers to share event producers between multiple sources and/or adapters – but I want to keep it simple and clear. Our concrete event producer will inherit from this class, implement Start() (called when a subscriber actually subscribes) and call PublishException and PublishEvent as appropriate. Our event producer will also want to override Dispose(bool disposing) to get notified when it's time to shut down.
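For the curious, here's one possible sketch of that multi-observer variant (hypothetical – it's not in the sample code): the single _observer field becomes a list, the first subscriber starts the producer, and each subscriber gets its own unsubscriber.

```csharp
// Hypothetical multi-observer version of Subscribe/PublishEvent.
private readonly List<IObserver<StreamInputEvent<TPayloadType>>> _observers =
    new List<IObserver<StreamInputEvent<TPayloadType>>>();

public virtual IDisposable Subscribe(IObserver<StreamInputEvent<TPayloadType>> observer)
{
    bool isFirstSubscriber;
    lock (_observers)
    {
        _observers.Add(observer);
        isFirstSubscriber = (_observers.Count == 1);
    }
    if (isFirstSubscriber)
    {
        Start();   // only the first subscriber starts the producer
    }
    return new Unsubscriber(_observers, observer);
}

protected void PublishEvent(StreamInputEvent<TPayloadType> newEvent)
{
    // Snapshot under the lock so publishing doesn't race with (un)subscribes.
    IObserver<StreamInputEvent<TPayloadType>>[] snapshot;
    lock (_observers)
    {
        snapshot = _observers.ToArray();
    }
    foreach (var observer in snapshot)
    {
        observer.OnNext(newEvent);
    }
}

private sealed class Unsubscriber : IDisposable
{
    private readonly List<IObserver<StreamInputEvent<TPayloadType>>> _observers;
    private readonly IObserver<StreamInputEvent<TPayloadType>> _observer;

    public Unsubscriber(
        List<IObserver<StreamInputEvent<TPayloadType>>> observers,
        IObserver<StreamInputEvent<TPayloadType>> observer)
    {
        _observers = observers;
        _observer = observer;
    }

    public void Dispose()
    {
        lock (_observers)
        {
            _observers.Remove(_observer);
        }
    }
}
```

You'd also want to decide what happens when the last observer unsubscribes (stop the producer, or keep it warm) – that's a policy decision that depends on your source.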

So let's go ahead now and create our first event producer. For our sample here, it will be a very simple random data generator because, of course, everyone needs to analyze completely random data in StreamInsight. Well, no, of course not, but it's easy and simple. We'll start with the configuration class.

[DataContract()]
public class TestDataInputConfig
{
    public TestDataInputConfig()
    {
        RefreshInterval = TimeSpan.FromMilliseconds(500);
        NumberOfItems = 10;
        TimestampIncrement = TimeSpan.FromMinutes(5);
        StartDateTime = DateTimeOffset.Now.AddMonths(-5);
    }

    [DataMember]
    public DateTimeOffset StartDateTime { get; set; }

    [DataMember]
    public TimeSpan RefreshInterval { get; set; }

    [DataMember]
    public TimeSpan TimestampIncrement { get; set; }

    [DataMember]
    public int NumberOfItems { get; set; }
}
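Since the class is a data contract, a quick sanity check (not part of the sample) is to round-trip an instance through the standard DataContractSerializer from System.Runtime.Serialization and make sure the values survive:

```csharp
// Round-trip the config through DataContractSerializer.
var serializer = new DataContractSerializer(typeof(TestDataInputConfig));
using (var stream = new MemoryStream())
{
    serializer.WriteObject(stream, new TestDataInputConfig() { NumberOfItems = 42 });
    stream.Position = 0;
    var copy = (TestDataInputConfig)serializer.ReadObject(stream);
    Console.WriteLine(copy.NumberOfItems);   // 42
}
```

It's worth doing this in a unit test for every configuration class; it's much nicer to find a non-serializable member there than at deployment time.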

Note that it is marked as DataContract and each property is marked with the DataMember attribute … this is very important. The configuration classes may – and probably will – end up being serialized; this was also true in the legacy adapter model, where all of your configuration classes needed to be serializable. You can also use [Serializable] but I've found that, at times, the serializer goes somewhat insane when you do that and you get exceptions when serializing the configuration classes.

Now for our payload class. We have an ItemId and a Value of type double and, since this class is for testing and we want to be able to "see" what's going on (and for other reasons), we also have some properties on here to help with that – RunNumber (which generation "run" created it) and EnqueueTimestamp (the time it was actually created, which may be different from the event's start time). Finally, we also have a static helper method that'll create a bunch of these test events for enqueuing for the "run".

public class TestDataEvent
{
    private static Random rdm = new Random();

    public static List<TestDataEvent> CreateNext(TestDataInputConfig config, int runNumber)
    {
        List<TestDataEvent> newReferenceData =
            new List<TestDataEvent>(config.NumberOfItems);

        for (int i = 0; i < config.NumberOfItems; i++)
        {
            newReferenceData.Add(new TestDataEvent()
            {
                ItemId = "Item" + i.ToString(),
                RunNumber = runNumber,
                EnqueueTimestamp = DateTime.Now,
                Value = rdm.NextDouble() * 10
            });
        }
        return newReferenceData;
    }

    public string ItemId { get; set; }
    public int RunNumber { get; set; }
    public DateTime EnqueueTimestamp { get; set; }
    public double Value { get; set; }
}

Now that we have all of the foundational building blocks completed, we are finally ready to create our producer. For now, we’ll have our producer enqueue the CTIs after each “batch” of events; this keeps all of the events in the same “run” within the same CTI span.

public class TestDataProducer : StreamEventProducer<TestDataEvent, TestDataInputConfig>
{
    private DateTimeOffset _nextStartTime;
    private Timer _enqueueTimer;
    private int _runNumber = 0;

    public TestDataProducer(TestDataInputConfig config) : base(config)
    {
        _enqueueTimer = new Timer(ProduceEvents, null, Timeout.Infinite, Timeout.Infinite);
        this._nextStartTime = config.StartDateTime;
    }

    protected override void Start()
    {
        // Change the timer to start it.
        _enqueueTimer.Change(TimeSpan.Zero, Configuration.RefreshInterval);
    }

    /// <summary>
    /// Main driver to read events from source and enqueue them.
    /// </summary>
    private void ProduceEvents(object state)
    {
        _runNumber++;

        var newEvents = TestDataEvent.CreateNext(Configuration, _runNumber);
        var eventTimestamp = _nextStartTime;

        var publishEvents = (from e in newEvents
                             select new StreamInputEvent<TestDataEvent>(e)
                             {
                                 Start = eventTimestamp
                             }).ToList();

        foreach (var publishedEvent in publishEvents)
        {
            this.PublishEvent(publishedEvent);
        }

        // Enqueue our CTI.
        this.PublishEvent(new StreamInputEvent<TestDataEvent>(eventTimestamp.AddTicks(1)));
        _nextStartTime += Configuration.TimestampIncrement;
    }

    protected override void Dispose(bool disposing)
    {
        if (disposing)
        {
            if (_enqueueTimer != null)
            {
                _enqueueTimer.Dispose();
                _enqueueTimer = null;
            }
        }
        base.Dispose(disposing);
    }
}
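Before we wire this up with the 2.1 APIs, it's worth sketching the other half of the dual-mode story: on StreamInsight 1.2/2.0, a legacy typed input adapter can simply subscribe to this same producer. The sketch below is hypothetical and simplified – a production adapter also has to handle EnqueueOperationResult.Full and the suspend/resume handshake – but it shows the shape of the bridge:

```csharp
// Hypothetical legacy-side bridge: the typed point input adapter
// is just another observer of the event producer.
public class TestDataPointInputAdapter
    : TypedPointInputAdapter<TestDataEvent>,
      IObserver<StreamInputEvent<TestDataEvent>>
{
    private readonly TestDataProducer _producer;
    private IDisposable _subscription;

    public TestDataPointInputAdapter(TestDataInputConfig config)
    {
        _producer = new TestDataProducer(config);
    }

    public override void Start()
    {
        _subscription = _producer.Subscribe(this);
    }

    public override void Resume()
    {
        // Nothing special to do for this simple source.
    }

    public void OnNext(StreamInputEvent<TestDataEvent> publishedEvent)
    {
        if (AdapterState == AdapterState.Stopping)
        {
            _subscription.Dispose();
            Stopped();
            return;
        }
        if (publishedEvent.EventKind == EventKind.Cti)
        {
            EnqueueCtiEvent(publishedEvent.Start);
            return;
        }
        var pointEvent = publishedEvent.GetPointEvent();
        Enqueue(ref pointEvent);
    }

    public void OnError(Exception error)
    {
        // Log and shut down; omitted for brevity.
    }

    public void OnCompleted()
    {
        Stopped();
    }
}
```

All of the source-specific logic stays in the producer; the adapter is nothing but plumbing, which is exactly what lets one code base serve both APIs.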

At this point, we are actually ready to start using the StreamInsight 2.1 APIs to get data from our new producer! Of course, we aren't done, but let's see how we could do this. This is where the GetPointEvent() helper method is really, really handy. It makes the code very readable and it happens to be highly reusable as well, both of which are Very Good Things™.

Running the Process ...
var config = new TestDataInputConfig()
{
    NumberOfItems = 20,
    RefreshInterval = TimeSpan.FromMilliseconds(500)
};

var data = cepApplication.DefineObservable(
    () => new TestDataProducer(config)).ToPointStreamable(e => e.GetPointEvent());

var sink = cepApplication.DefineObserver(() => Observer.Create<TestDataEvent>(
    e => Console.WriteLine("TestEvent ItemId:{0} Run:{1} Value:{2}", e.ItemId, e.RunNumber, e.Value)));

data.Bind(sink).Run();

And when we do that, we get our data flowing. You can download the code from my SkyDrive.