Ruminations of idle rants and ramblings of a code monkey

Dual Mode Data Sources–Part II


In my previous post, I walked through creating a basic structure for StreamInsight data sources using the Reactive model introduced with version 2.1. There was quite a bit of plumbing work to get done, but now that it's finished, we're ready to move on to the next step: taking what we just created and making it available to the pre-2.1 adapter API. I didn't mention this in the previous post, but these articles are the first in a series that walks through building a StreamInsight application. In these articles, we're going through the process of creating our data sources or, more accurately, the architectural framework for our data sources. We'll do the same with sinks/output adapters, and then we'll start pulling in some real(istic) data and doing something more interesting. Each article builds on the previous one, so you can follow along.

Now, back to our input. Since we already have the data flowing via the IObservable interface, it's really easy to subscribe to it and implement an input adapter that uses the exact same code to generate events. All we need to do is subscribe to our producer and enqueue the event when OnNext is called:

On Next
  public void OnNext(StreamInputEvent<TPayloadType> publishedEvent)
  {
      if (AdapterState == AdapterState.Running)
      {
          var newEvent = publishedEvent.GetPointEvent();
          this.Enqueue(ref newEvent);
      }
  }

It really is that simple. Well, almost. But before we get into that, let's step back for a second. We have an event producer that creates our events. This producer handles all of the details of connecting to the data source and packaging the data for StreamInsight. Traditionally, we did all of this in the adapter itself, but since we've already separated it out, the adapter really doesn't need to do much except enqueue. Looking at it that way, why do we need a separate adapter for each payload type? Well, the truth is: we don't. With a little planning, we can have a single set of adapters that subscribe to a producer's IObservable interface, enqueue the events into StreamInsight and manage lifetime. These will be our adapters, and they'll wind up being very thin.

So we'll start by creating a new typed point input adapter. As with the base class, the payload type is a generic argument of the class; there's no reason why we can't use it for any type. Because Start() and Resume() are abstract methods on the base class, we have to implement them. We'll also want to override Stop() and Dispose(bool disposing).

Adapter Skeleton
  public class ObservableTypedPointInputAdapter<TPayloadType>
      : TypedPointInputAdapter<TPayloadType>
  {
      public override void Start()
      {
          throw new NotImplementedException();
      }

      public override void Resume()
      {
          throw new NotImplementedException();
      }

      public override void Stop()
      {
          base.Stop();
      }

      protected override void Dispose(bool disposing)
      {
          base.Dispose(disposing);
      }
  }

So let's get started. First, we need to get our producer. That's simple enough: we know that we need an adapter factory, so that's where we'll create the producer and pass it in as a constructor argument. We store it in a private field; we won't use it until we subscribe. Speaking of subscribing, that's exactly what we need to do in the Start() method. For the producer's OnCompleted action, we'll call the adapter's Stop() method. Yes, you can call Stop() from within an adapter without stopping the queries that are getting events from the stream; this can happen, for example, with a read-once reference data adapter. It's not commonly seen in the wild, but we'll make sure that we handle it correctly. One thing is important to understand here: if you are having StreamInsight generate your CTIs for you with AdvanceTimeSettings, one of your options is AdvanceToInfinityOnShutdown. This is directly affected by the adapter calling Stopped() (or, in fact, by the producer calling OnCompleted()). When the source shuts down, StreamInsight will enqueue a CTI with a timestamp of DateTimeOffset.MaxValue, which serves to "clear" the stream of any pending events. It also works very nicely if you have a read-once source of reference data.

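Constructor & Start
  private readonly IObservable<StreamInputEvent<TPayloadType>> _eventProducer;
  private IDisposable _subscription;

  public ObservableTypedPointInputAdapter(IObservable<StreamInputEvent<TPayloadType>> eventProducer)
  {
      // Keep the producer; nothing flows until Start() subscribes.
      this._eventProducer = eventProducer;
  }

  public override void Start()
  {
      // Subscribe(onNext, onCompleted) is the Rx extension-method overload.
      _subscription = _eventProducer.Subscribe(OnNext,
          () =>
          {
              // The producer is finished (e.g. a read-once source), so shut the adapter down.
              if (AdapterState == AdapterState.Running)
              {
                  Stop();
              }
          });
  }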
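To make the lifetime wiring easier to poke at without a StreamInsight server, here is a minimal, self-contained sketch of the same idea in plain C#. Everything in it is hypothetical: SketchAdapterState stands in for AdapterState, ListProducer<T> for a real producer, a list for Enqueue(), and DelegateObserver<T> plays the role of the Rx Subscribe(onNext, onCompleted) overload so that only the BCL's IObservable<T> and IObserver<T> are needed. Like the adapter above, the sketch is generic over the payload type.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical stand-in for StreamInsight's AdapterState (only the states used here).
public enum SketchAdapterState { Created, Running, Stopped }

// Hypothetical producer: pushes a fixed list synchronously, then completes.
public sealed class ListProducer<T> : IObservable<T>
{
    private readonly IEnumerable<T> _items;

    public ListProducer(IEnumerable<T> items) { _items = items; }

    public IDisposable Subscribe(IObserver<T> observer)
    {
        foreach (var item in _items) observer.OnNext(item);
        observer.OnCompleted();
        return new NoopSubscription();
    }

    private sealed class NoopSubscription : IDisposable { public void Dispose() { } }
}

// Wraps onNext/onCompleted delegates as an IObserver<T> (what the Rx Subscribe overload does for you).
public sealed class DelegateObserver<T> : IObserver<T>
{
    private readonly Action<T> _onNext;
    private readonly Action _onCompleted;

    public DelegateObserver(Action<T> onNext, Action onCompleted)
    {
        _onNext = onNext;
        _onCompleted = onCompleted;
    }

    public void OnNext(T value) { _onNext(value); }
    public void OnCompleted() { _onCompleted(); }
    public void OnError(Exception error) { throw new InvalidOperationException("Producer faulted.", error); }
}

// Generic over the payload, like ObservableTypedPointInputAdapter<TPayloadType>.
public sealed class ObservableSourceSketch<T>
{
    private readonly IObservable<T> _eventProducer;
    private IDisposable _subscription;

    public ObservableSourceSketch(IObservable<T> eventProducer) { _eventProducer = eventProducer; }

    public SketchAdapterState State { get; private set; } = SketchAdapterState.Created;

    // Stands in for the events handed to Enqueue().
    public List<T> Enqueued { get; } = new List<T>();

    public void Start()
    {
        State = SketchAdapterState.Running;
        var subscription = _eventProducer.Subscribe(new DelegateObserver<T>(
            OnNext,
            () => { if (State == SketchAdapterState.Running) Stop(); }));

        // A synchronous producer may already have completed (and run Stop()) inside Subscribe().
        if (State == SketchAdapterState.Stopped) subscription.Dispose();
        else _subscription = subscription;
    }

    public void Stop()
    {
        State = SketchAdapterState.Stopped;
        if (_subscription != null) _subscription.Dispose();
        _subscription = null;
    }

    private void OnNext(T item)
    {
        if (State == SketchAdapterState.Running) Enqueued.Add(item);
    }
}
```

Starting new ObservableSourceSketch<int>(new ListProducer<int>(new[] { 1, 2, 3 })) enqueues all three items and leaves the sketch Stopped, because the producer's completion drives Stop() just as OnCompleted does in the adapter. One wrinkle the sketch surfaces: a producer that completes synchronously calls Stop() before Start() has stored the subscription, so Start() disposes the handle itself in that case.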

Start vs. Resume

In a lot of the samples, you'll see these two implemented exactly the same way. In the real world, however, you may want to put a little more thought into how you handle Resume(). Odds are pretty good, especially if you are using Premium Edition, that you will never actually need to implement it. Resume() gets called after your adapter has been suspended because the input queue is full. Yes, it is possible to fill up the input queue: you need about 200,000 unprocessed events that StreamInsight can't push through your queries. I have made this happen, but it took some effort. I had to keep enqueuing events as fast as I could on multiple threads, and even with some convoluted queries it still took a while to fill the queue. Considering that I've seen StreamInsight process over 30,000 events/sec on a dual-core i7 laptop and over 100,000 events/sec on a low-to-mid-level server with CPU cycles to spare, you will have to work at it. But it can happen. If you are using Standard Edition, you won't get to that level of throughput (I haven't done any testing on Standard's throughput), but you can still expect to push a good number of events through it.

When the input queue fills up, Enqueue() returns EnqueueOperationResult.Full. Your adapter then needs to call Ready(), and StreamInsight will call Resume() when you can start enqueuing events again. If you are using the StreamInsight 2.1 Reactive model, you won't get any notification when the input queue fills; instead, StreamInsight will simply ignore your events until it's ready to receive them again. That is what I'm going to do in our exercises, but it may not be appropriate, depending on your source and your needs. For example, if you are simply pushing recorded data through the engine, it would be easy enough to pause and then start back up again when Resume() is called.

If, however, it's live data, you may need to do something else. Exercise some caution in buffering up events that you miss: you have no way of knowing when the adapter will start back up again, and you could eat up FAR more memory than you want to. It is a potentially difficult issue but, fortunately, one that is pretty tough to hit in the real world. Especially with Premium Edition, you'll saturate network bandwidth (for inbound data) before you fill up the queue. In our exercise, we're going to do the same thing that StreamInsight 2.1 does: we're just going to ignore incoming events until Resume() is called.
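The drop-until-Resume() policy can be modelled in a few lines. This is a sketch under stated assumptions: BoundedInputQueue<T> is a toy stand-in for the engine's input queue with an arbitrary capacity, SketchEnqueueResult mirrors EnqueueOperationResult, and none of these names are StreamInsight API. The point is the behaviour: once a Full result comes back, events are counted as dropped rather than buffered, until Resume() is called.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical stand-in for EnqueueOperationResult.
public enum SketchEnqueueResult { Success, Full }

// Toy input queue with a fixed capacity (the real queue is internal to the engine).
public sealed class BoundedInputQueue<T>
{
    private readonly Queue<T> _queue = new Queue<T>();
    private readonly int _capacity;

    public BoundedInputQueue(int capacity) { _capacity = capacity; }

    public int Count => _queue.Count;

    public SketchEnqueueResult Enqueue(T item)
    {
        if (_queue.Count >= _capacity) return SketchEnqueueResult.Full;
        _queue.Enqueue(item);
        return SketchEnqueueResult.Success;
    }

    // Simulates the engine pushing the queued events through the queries.
    public void Drain() => _queue.Clear();
}

// Drop-while-suspended policy: after a Full result, ignore events until Resume().
public sealed class DropWhileSuspendedSource<T>
{
    private readonly BoundedInputQueue<T> _engine;
    private bool _canEnqueue = true;

    public DropWhileSuspendedSource(BoundedInputQueue<T> engine) { _engine = engine; }

    public int Dropped { get; private set; }

    public void OnNext(T item)
    {
        if (!_canEnqueue)
        {
            Dropped++;          // suspended: the event is lost, not buffered
            return;
        }
        if (_engine.Enqueue(item) == SketchEnqueueResult.Full)
        {
            _canEnqueue = false; // the real adapter would ReleaseEvent() and call Ready() here
            Dropped++;           // the event that hit the full queue is lost too
        }
    }

    // In the real adapter, StreamInsight calls Resume() once the queue has room again.
    public void Resume() => _canEnqueue = true;
}
```

For a recorded source, the pause-and-restart alternative mentioned above would replace the Dropped++ paths with pausing the producer and restarting it from Resume().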

Back to the code. In the OnNext snippet at the start of this post, I enqueued directly from the OnNext method. While this works (if everything goes right), it doesn't represent what we'll need to do in a real app to handle exceptions, a full queue and the like. Also, while this class is designed to be a stand-alone input adapter that can be used with any observable source, there may be special-case adapters (for example, specific handling of a pause/resume scenario) where we want to override specific behaviors in an inherited class, so we want to make sure that we provide the right override points. So we'll move the enqueue operation into its own method. In it, we do all of our state checks to make sure that we can enqueue, handle enqueue exceptions and a full queue, and keep track of our last-enqueued CTI. Finally, we also include a critical-section lock around the actual enqueue to make sure that we handle lifetime correctly and don't report Stopped() in the middle of an enqueue. It is the same approach described in my previous article on output adapter lifetime, applied to input adapters.

Enqueue Event
  protected virtual void EnqueueEvent(StreamInputEvent<TPayloadType> publishedEvent)
  {
      // Suspended after a Full result: ignore events until Resume() is called.
      if (!_canEnqueue) return;

      // An event that starts before the last CTI would violate it, so skip it.
      if (publishedEvent.Start < _lastCti)
      {
          return;
      }

      // Hold the lock across the enqueue so Stop() can't report Stopped() in the middle of it.
      lock (_lockObject)
      {
          if (this.AdapterState != AdapterState.Running)
          {
              return;
          }

          var point = publishedEvent.GetPointEvent();
          EnqueueOperationResult enqueueResult;
          try
          {
              enqueueResult = this.Enqueue(ref point);
          }
          catch
          {
              // Release the event before rethrowing.
              ReleaseEvent(ref point);
              throw;
          }

          if (enqueueResult == EnqueueOperationResult.Full)
          {
              // Queue full!! Pause enqueuing.
              _canEnqueue = false;
              // The rejected event is still ours, so release it.
              ReleaseEvent(ref point);
              // Let StreamInsight know we're ready to resume.
              Ready();
          }
          else if (publishedEvent.EventKind == EventKind.Cti)
          {
              // Read the timestamp from the source event; after a successful
              // enqueue the event object belongs to the engine.
              _lastCti = publishedEvent.Start;
          }
      }
  }
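Why the lock matters is easier to see with several producer threads racing a Stop(). The following sketch is hypothetical and StreamInsight-free: a counter stands in for Enqueue(), a flag for Stopped(), and EnqueuedAfterStopped counts exactly the race the lock is there to prevent, an event accepted after the adapter has reported itself stopped. Because the state check, the enqueue and Stop() all take the same lock object, that counter stays at zero however the threads interleave.

```csharp
using System;

// Hypothetical model of the lifetime lock in EnqueueEvent; not StreamInsight API.
public sealed class LifetimeLockSketch
{
    private readonly object _lockObject = new object();
    private bool _running = true;
    private bool _stoppedReported;
    private int _enqueued;
    private int _enqueuedAfterStopped;

    public int Enqueued { get { lock (_lockObject) return _enqueued; } }
    public int EnqueuedAfterStopped { get { lock (_lockObject) return _enqueuedAfterStopped; } }

    // State check and "enqueue" happen under one lock, as in EnqueueEvent.
    public bool EnqueueEvent()
    {
        lock (_lockObject)
        {
            if (!_running) return false;
            _enqueued++;                                    // stands in for Enqueue(ref point)
            if (_stoppedReported) _enqueuedAfterStopped++;  // only reachable if the lock were missing
            return true;
        }
    }

    // Stop() takes the same lock, so it waits for any in-flight enqueue before reporting.
    public void Stop()
    {
        lock (_lockObject)
        {
            _running = false;
            _stoppedReported = true;                        // stands in for Stopped()
        }
    }
}
```

Without the lock, a thread could pass the running check, lose the CPU to Stop(), and then enqueue after Stopped() had already been reported.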

Our last step is to create our adapter factory. In the Create method, we'll do a couple of checks to make sure that our arguments are valid: that the payload is the proper type and, since our random data source only supports points, that the requested event shape is Point.

Input Adapter Factory
  using System;
  using System.Diagnostics.CodeAnalysis;
  using Microsoft.ComplexEventProcessing;
  using Microsoft.ComplexEventProcessing.Adapters;

  public sealed class TestDataInputFactory : ITypedInputAdapterFactory<TestDataInputConfig>
  {
      [SuppressMessage("Microsoft.Design", "CA1004:GenericMethodsShouldProvideTypeParameter", Justification = "By Design")]
      public InputAdapterBase Create<TPayload>(TestDataInputConfig configInfo, EventShape eventShape)
      {
          if (typeof(TPayload) != typeof(TestDataEvent))
          {
              // The producer only emits TestDataEvent payloads, so anything else won't work.
              throw new InvalidOperationException("Specified type must be of " + typeof(TestDataEvent).FullName);
          }

          switch (eventShape)
          {
              case EventShape.Point:
                  // Create the producer and hand it to the generic point adapter.
                  return new ObservableTypedPointInputAdapter<TestDataEvent>(
                      new TestDataProducer(configInfo));
              default:
                  throw new ArgumentException(string.Format(
                      System.Globalization.CultureInfo.InvariantCulture,
                      "TestDataInputFactory cannot instantiate adapter with event shape {0}",
                      eventShape.ToString()));
          }
      }

      /// <summary>
      /// Dispose method.
      /// </summary>
      public void Dispose()
      {
      }
  }
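Because the factory's checks don't depend on the engine, they can be exercised in isolation. Here is a sketch that reproduces just the two validations with hypothetical stand-ins (SketchEventShape for EventShape, SketchAdapterBase and SketchPointAdapter<T> for the adapter types, TestPayload for TestDataEvent); it models the logic only and is not the StreamInsight factory interface.

```csharp
using System;

// Hypothetical stand-ins for EventShape, InputAdapterBase, the point adapter and TestDataEvent.
public enum SketchEventShape { Point, Interval, Edge }
public abstract class SketchAdapterBase { }
public sealed class SketchPointAdapter<TPayload> : SketchAdapterBase { }
public sealed class TestPayload { }

public static class SketchFactory
{
    // Same two checks as TestDataInputFactory.Create: payload type first, then event shape.
    public static SketchAdapterBase Create<TPayload>(SketchEventShape eventShape)
    {
        if (typeof(TPayload) != typeof(TestPayload))
        {
            throw new InvalidOperationException("Specified type must be of " + typeof(TestPayload).FullName);
        }

        switch (eventShape)
        {
            case SketchEventShape.Point:
                return new SketchPointAdapter<TestPayload>();
            default:
                throw new ArgumentException(
                    "SketchFactory cannot instantiate adapter with event shape " + eventShape,
                    nameof(eventShape));
        }
    }
}
```

With the checks isolated like this, a unit test can confirm that Create<string>(SketchEventShape.Point) throws InvalidOperationException and Create<TestPayload>(SketchEventShape.Edge) throws ArgumentException without starting a server.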

Once we have the adapter factory, we’re ready to go. Because we’ve not done any output adapters yet, we’ll be using the ToPointObservable() method to write to the console. Keep in mind that this works only when hosting StreamInsight in-process; it’s not at all like the IObservable support in 2.1. But it works well enough to show that we have some data flowing through and that our adapter now supports both models from the same data producer!

Run Query
  private static IDisposable RunQuery(Application cepApplication)
  {
      var config = new TestDataInputConfig()
      {
          NumberOfItems = 20,
          RefreshInterval = TimeSpan.FromMilliseconds(500)
      };

      var data = CepStream<TestDataEvent>.Create(cepApplication,
          "TestData", typeof(TestDataInputFactory), config, EventShape.Point);

      // Keep the subscription so the caller can dispose it when finished.
      var subscription = data.ToPointObservable().Subscribe(
          e =>
          {
              if (e.EventKind == EventKind.Insert)
              {
                  Console.WriteLine("TestEvent ItemId:{0} Run:{1} Value:{2}",
                      e.Payload.ItemId, e.Payload.RunNumber, e.Payload.Value);
              }
              else
              {
                  // CTIs carry no payload; just report the timestamp.
                  Console.WriteLine("CTI @ {0}", e.StartTime);
              }
          });
      return subscription;
  }

Using the same pattern, we can also create the adapters for interval and edge events pretty quickly. Unfortunately, a lot of it is copy-paste-tweak. I say unfortunately because it leads to a lot of redundant, repetitive code that I, for one, would prefer to create and maintain in a single place. However, our key method, Enqueue(), isn't defined in any common base class; we have to inherit from the shape-specific adapters and there's no way around it. While we could probably deal with this with another layer of abstraction, there isn't, IMHO, a whole lot of value in that. Should that change, we'll take a look at refactoring then.
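For the curious, here is one shape that "another layer of abstraction" could take; it is not what this series does. The idea is a shape-neutral helper that holds the checks, the lock, CTI tracking and Full handling, with each shape-specific adapter passing in its own enqueue, release and Ready() calls as delegates. The by-ref delegates mirror the Enqueue(ref ...) and ReleaseEvent(ref ...) calls used above; EnqueueGate, RefEnqueue<T>, RefRelease<T> and SketchEnqueueResult are hypothetical names, and the sketch runs on plain values rather than real events.

```csharp
using System;

// Hypothetical stand-in for EnqueueOperationResult.
public enum SketchEnqueueResult { Success, Full }

// By-ref delegates, mirroring Enqueue(ref evt) and ReleaseEvent(ref evt).
public delegate SketchEnqueueResult RefEnqueue<TEvent>(ref TEvent evt);
public delegate void RefRelease<TEvent>(ref TEvent evt);

// Hypothetical shape-neutral helper: the logic each point/interval/edge adapter would otherwise copy.
public sealed class EnqueueGate
{
    private readonly object _lockObject = new object();
    private readonly Action _ready;
    private DateTimeOffset _lastCti = DateTimeOffset.MinValue;
    private bool _canEnqueue = true;

    public EnqueueGate(Action ready) { _ready = ready; }

    // Stands in for AdapterState == AdapterState.Running.
    public bool Running { get; set; } = true;

    public void EnqueueEvent<TEvent>(DateTimeOffset start, bool isCti, Func<TEvent> createEvent,
        RefEnqueue<TEvent> enqueue, RefRelease<TEvent> release)
    {
        lock (_lockObject)
        {
            // Suspended, stopped, or the event would violate the last CTI: drop it.
            if (!_canEnqueue || !Running || start < _lastCti) return;

            // Create the shape-specific event only once every check has passed.
            TEvent evt = createEvent();
            SketchEnqueueResult result;
            try
            {
                result = enqueue(ref evt);
            }
            catch
            {
                release(ref evt);
                throw;
            }

            if (result == SketchEnqueueResult.Full)
            {
                _canEnqueue = false;   // ignore events until Resume()
                release(ref evt);      // the rejected event is still ours to release
                _ready();
            }
            else if (isCti)
            {
                _lastCti = start;
            }
        }
    }

    public void Resume()
    {
        lock (_lockObject) { _canEnqueue = true; }
    }
}
```

In a real adapter the delegates would use EnqueueOperationResult and the shape's own event type, and each subclass would pass its own Enqueue and ReleaseEvent methods. The repetition doesn't vanish; it moves into that delegate wiring, which is why whether the indirection pays for itself is a judgment call.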

Looking at the input adapter factory, you'll notice that it does some checking to make sure things are valid before creation; factories are good for that. There's a bunch of other stuff that could be done in the factory as well, and a standard, generic factory interface gives you a clean, clear and decoupled way to create what may be complex underlying objects. There's also no reason why we can't revisit our existing 2.1-based code and add the factory there. Doing so provides a level of consistency between the two APIs that can make our life simpler, reduce the amount of debugging we need to do and promote code sharing. That, however, will have to wait for the next post; I'm trying to do this in manageable pieces so I'm not writing a novel with each post. In the meantime, you can download the current project from my SkyDrive.