Ruminations of J.net: idle rants and ramblings of a code monkey

Importing CTIs with IQStreamable/IQbservable in StreamInsight 2.1

This came up on the StreamInsight forum … actually from one of the guys on my team. It’s pretty important … since StreamInsight syncs application time to the slowest input stream, if you have a slow-moving reference stream (as described so well here), you need to import the CTIs from your faster data stream into the slower stream to keep things moving along at the proper pace. We use this pattern all the time and ran into this when we were converting an existing demo to use the 2.1 model.
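To make the “slowest stream” point concrete, here’s a deliberately simplified model (not how StreamInsight is actually implemented): assume a binary operator, like the join at the end of this post, can only advance application time to the earlier of the latest CTIs it has seen on its two inputs. JoinClock and its members are hypothetical names, just for illustration.

```csharp
using System;

// Hypothetical, simplified model: a binary operator (such as a join) can only
// advance its output application time to the earlier of the latest CTIs seen
// on its two inputs. Not StreamInsight's real implementation.
public sealed class JoinClock
{
    private DateTimeOffset _leftCti = DateTimeOffset.MinValue;
    private DateTimeOffset _rightCti = DateTimeOffset.MinValue;

    // Left input: the fast-moving data stream.
    public void OnLeftCti(DateTimeOffset cti)
    {
        if (cti > _leftCti) _leftCti = cti; // CTIs never move time backwards
    }

    // Right input: the slow-moving reference stream.
    public void OnRightCti(DateTimeOffset cti)
    {
        if (cti > _rightCti) _rightCti = cti;
    }

    // The output is held back by whichever input is further behind.
    public DateTimeOffset OutputTime
    {
        get { return _leftCti < _rightCti ? _leftCti : _rightCti; }
    }
}
```

In this model, a reference stream that only issues its own CTIs every 15 seconds holds the output back by up to 15 seconds, no matter how often the data stream ticks; feeding it the data stream’s CTIs lets the output keep pace.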

Here’s what you need to do. First, you have your reference stream implemented and exposed as a simple Observable … the .NET 4.0 IObservable interface works fine for this; nothing special is needed. Your data stream should be exposed as an IQbservable or IObservable (you can get to an IQStreamable from there), and that IObservable needs to include the CTIs. This means that the data source for your data stream has to enqueue CTIs itself … you won’t really be able to use AdvanceTimeSettings.IncreasingStartTime/StrictlyIncreasingStartTime (since the CTIs they generate aren’t exposed), nor will you be able to return an IQStreamable and then switch it to an IQbservable (you’ll only get the payloads).

The first thing that we’ll need to do is create a wrapper type that holds both the payload and the temporal information and can also represent a CTI. Here’s an example:

Typed Event Class
```csharp
using System;
using Microsoft.ComplexEventProcessing;

/// <summary>
/// Wraps a payload together with its temporal header; can also represent a CTI.
/// </summary>
public struct PublishedEvent<TPayloadType>
{
    public TypedEvent<TPayloadType> GetEvent(EventShape eventShape)
    {
        switch (eventShape)
        {
            case EventShape.Interval:
                return GetIntervalEvent();

            case EventShape.Edge:
                return GetEdgeEvent();

            case EventShape.Point:
                return GetPointEvent();

            default:
                throw new ArgumentOutOfRangeException("eventShape");
        }
    }

    public PointEvent<TPayloadType> GetPointEvent()
    {
        if (!this.IsCti)
        {
            return PointEvent<TPayloadType>.CreateInsert(this.Start, this.Payload);
        }
        return PointEvent<TPayloadType>.CreateCti(this.Start);
    }

    public IntervalEvent<TPayloadType> GetIntervalEvent()
    {
        if (!this.IsCti)
        {
            return IntervalEvent<TPayloadType>.CreateInsert(this.Start, this.End, this.Payload);
        }
        return IntervalEvent<TPayloadType>.CreateCti(this.Start);
    }

    public EdgeEvent<TPayloadType> GetEdgeEvent()
    {
        if (this.IsCti)
        {
            return EdgeEvent<TPayloadType>.CreateCti(this.Start);
        }

        if (this.EdgeType == EdgeType.Start)
        {
            return EdgeEvent.CreateStart(this.Start, this.Payload);
        }
        // An end edge carries the original start time plus the end time.
        return EdgeEvent.CreateEnd(this.Start, this.End, this.Payload);
    }

    /// <summary>
    /// Initializes a new instance of the <see cref="PublishedEvent&lt;TEventType&gt;"/> struct that represents a CTI.
    /// </summary>
    /// <param name="ctiDateTime">The CTI timestamp.</param>
    public PublishedEvent(DateTimeOffset ctiDateTime)
    {
        IsCti = true;
        Start = ctiDateTime;
        EdgeType = EdgeType.Start;
        End = DateTimeOffset.MaxValue;
        Payload = default(TPayloadType);
    }

    /// <summary>
    /// Initializes a new instance of the <see cref="PublishedEvent&lt;TEventType&gt;"/> struct.
    /// </summary>
    /// <param name="payload">The Payload.</param>
    /// <remarks>
    /// This sets the start to <see cref="DateTimeOffset">DateTimeOffset.MinValue</see>,
    /// the end to <see cref="DateTimeOffset">DateTimeOffset.MaxValue</see>
    /// and the edge type to the default value.<br/>
    /// The subscriber is responsible for explicitly setting these parameters.
    /// </remarks>
    public PublishedEvent(TPayloadType payload)
    {
        Start = DateTimeOffset.MinValue;
        End = DateTimeOffset.MaxValue;
        EdgeType = default(EdgeType);
        Payload = payload;
        IsCti = false;
    }

    /// <summary>
    /// Initializes a new instance of the <see cref="PublishedEvent&lt;TEventType&gt;"/> struct.
    /// </summary>
    /// <param name="payload">The event payload.</param>
    /// <param name="startTime">The start time.</param>
    /// <param name="endTime">The end time.</param>
    /// <param name="edgeType">Type of the edge.</param>
    /// <remarks>
    /// Use when the event publisher "knows" the appropriate start time, end time and/or the edge type.
    /// <br/>
    /// This is useful when publishing payloads that carry this information.
    /// </remarks>
    public PublishedEvent(TPayloadType payload, DateTimeOffset startTime, DateTimeOffset endTime, EdgeType edgeType)
    {
        Start = startTime;
        End = endTime;
        EdgeType = edgeType;
        Payload = payload;
        IsCti = false;
    }

    /// <summary>
    /// The Payload.
    /// </summary>
    public TPayloadType Payload;
    public DateTimeOffset Start;
    public DateTimeOffset End;
    public EdgeType EdgeType;
    public bool IsCti;
}
```

There are “helper” methods on the wrapper that create the appropriate StreamInsight event from it; they make the ToStreamable calls a little simpler and clearer. Now that we have that, we need to define an event publisher. It implements IObservable and provides the events to StreamInsight. It’s pretty simple and, like an adapter, takes a configuration that is passed in. In our case, the configuration has the number of items to create and a refresh interval … how frequently new events are created. This also allows us to use one publisher for both the fast- and slow-moving events.

Configuration class
```csharp
using System;
using System.Runtime.Serialization;

[DataContract()]
public class TestDataInputConfig
{
    public TestDataInputConfig()
    {
        RefreshInterval = TimeSpan.FromMilliseconds(500);
        NumberOfItems = 10;
    }

    [DataMember]
    public string Name { get; set; }

    [DataMember]
    public TimeSpan RefreshInterval { get; set; }

    [DataMember]
    public int NumberOfItems { get; set; }
}
```

In the sample, this is a test data generator … it creates a payload that contains an item ID, a run number (which iteration of the input produced it) and a string, so that we know a) where it came from and b) some payload info. Again, very simple, but it helps us validate that we are getting the results that we expect. And because the publisher implements IObservable, we can publish it to StreamInsight using the new DefineObservable method. Since each published event carries both the payload and the temporal header information, it’s then easy to convert the IQbservable to an IQStreamable and get it going with temporal properties.

Test Data Event
```csharp
using System.Collections.Generic;

public class TestDataEvent
{
    public static List<TestDataEvent> CreateNext(TestDataInputConfig config, int runNumber)
    {
        List<TestDataEvent> newReferenceData =
            new List<TestDataEvent>(config.NumberOfItems);

        for (int i = 0; i < config.NumberOfItems; i++)
        {
            newReferenceData.Add(new TestDataEvent()
            {
                Source = config.Name,
                ItemId = "Item" + i.ToString(),
                RunNumber = runNumber,
                EventText = "Text for Item " + i.ToString() + " Run Number " + runNumber
            });
        }
        return newReferenceData;
    }

    public string Source { get; set; }
    public string ItemId { get; set; }
    public int RunNumber { get; set; }
    public string EventText { get; set; }
}
```

The publisher implements IObservable and creates events on a timer. It doesn’t start the timer until Subscribe is called … if we started producing events in the constructor, the first round would be published before anyone had subscribed and we’d miss it, and we don’t want to do that.

Data Publisher
```csharp
using System;
using System.Collections.Generic;
using System.Linq;
using System.Reactive.Disposables;   // Disposable.Create (Reactive Extensions)
using System.Threading;

public class TestDataPublisher :
    IObservable<PublishedEvent<TestDataEvent>>,
    IDisposable
{
    private Timer _enqueueTimer;
    private int _runNumber = 0;
    private readonly TestDataInputConfig _config;

    // Subscribe/Unsubscribe run on the caller's thread while ProduceEvents runs
    // on a thread-pool (timer) thread, so access to _instances is locked.
    private readonly object _sync = new object();
    private readonly Dictionary<string, IObserver<PublishedEvent<TestDataEvent>>>
        _instances = new Dictionary<string, IObserver<PublishedEvent<TestDataEvent>>>(50);

    public TestDataPublisher(TestDataInputConfig config)
    {
        _config = config;
        // Create the timer stopped; the first Subscribe call starts it.
        _enqueueTimer = new Timer(ProduceEvents, null, Timeout.Infinite, Timeout.Infinite);
    }

    /// <summary>
    /// Main driver to read events from source and enqueue them.
    /// </summary>
    private void ProduceEvents(object state)
    {
        _runNumber++;
        var newEvents = TestDataEvent.CreateNext(_config, _runNumber);

        var publishEvents = (from e in newEvents
                             select new PublishedEvent<TestDataEvent>(e)
                             {
                                 Start = DateTimeOffset.Now
                             }).ToList();

        // Snapshot the observers so a concurrent (un)subscribe can't break the enumeration.
        IObserver<PublishedEvent<TestDataEvent>>[] observers;
        lock (_sync)
        {
            observers = _instances.Values.ToArray();
        }

        foreach (var observer in observers)
        {
            foreach (var publishedEvent in publishEvents)
            {
                observer.OnNext(publishedEvent);
            }
            // Follow each batch with a CTI just after it.
            observer.OnNext(new PublishedEvent<TestDataEvent>(DateTimeOffset.Now.AddTicks(1)));
        }
    }

    /// <summary>
    /// Notifies the provider that an observer is to receive notifications.
    /// </summary>
    /// <returns>
    /// A reference to an interface that allows observers to stop receiving notifications before the provider has finished sending them.
    /// </returns>
    /// <param name="observer">The object that is to receive notifications.</param>
    public IDisposable Subscribe(IObserver<PublishedEvent<TestDataEvent>> observer)
    {
        string instanceId = Guid.NewGuid().ToString();
        // Send the initial CTI before registering the observer, so it can't
        // arrive after a newer, timer-driven CTI.
        observer.OnNext(new PublishedEvent<TestDataEvent>(DateTimeOffset.Now.AddMonths(-5)));
        lock (_sync)
        {
            _instances.Add(instanceId, observer);
            if (_instances.Count == 1)
            {
                // First subscriber: produce a batch now, then every RefreshInterval.
                _enqueueTimer.Change(TimeSpan.Zero, _config.RefreshInterval);
            }
        }
        return Disposable.Create(() => Unsubscribe(instanceId));
    }

    public void Unsubscribe(string id)
    {
        lock (_sync)
        {
            _instances.Remove(id);
        }
    }

    #region Disposable Pattern
    // (removed for brevity)
    #endregion
}
```

The magic, though, happens in the AdvanceTimeImporter. It’s kind of like a subject, except that it observes two streams: the event stream (here, the reference data) and the CTI stream. It produces a single observable that contains the events from the event stream plus the CTIs from the CTI stream. The CTI stream is just an IObservable of DateTimeOffsets … so the CTIs don’t necessarily need to come from the moving data stream; they could be generated on a “clock”. The event stream doesn’t need to publish CTIs but, if it does, we exclude them (they’d just muddy the waters). As with the AdvanceTimeImportSettings, we also have the option to adjust or drop events from the event stream that violate CTI rules. There’s one little implementation difference, though … when the policy is Adjust, we adjust the start time of every violating event, not just the intervals that cross into the CTI span, ensuring that all of our data events get into the output (merged) stream.
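Stripped of the Rx plumbing, the policy check boils down to a small pure function. This is an illustration, not the importer itself: ImportPolicy is a hypothetical stand-in for AdvanceTimePolicy, edge events are ignored, and CrossingOnlyStart is an assumed model of an adjust policy that only moves intervals crossing the CTI, included purely for contrast.

```csharp
using System;

// Hypothetical stand-in for StreamInsight's AdvanceTimePolicy.
public enum ImportPolicy { Adjust, Drop }

public static class CtiRules
{
    // Mirrors the importer's rule: returns the start time to publish, or null
    // to drop. Non-violating events pass through unchanged; under Adjust every
    // violating event is moved to just after the CTI, whatever its end time.
    public static DateTimeOffset? ImporterStart(
        DateTimeOffset start, DateTimeOffset lastCti, ImportPolicy policy)
    {
        if (start >= lastCti) return start;
        if (policy == ImportPolicy.Adjust) return lastCti.AddTicks(1);
        return null;
    }

    // Assumed "crossing intervals only" adjust, for contrast: an interval that
    // still extends past the CTI is moved to it; one that ends at or before
    // the CTI is dropped.
    public static DateTimeOffset? CrossingOnlyStart(
        DateTimeOffset start, DateTimeOffset end, DateTimeOffset lastCti)
    {
        if (start >= lastCti) return start;
        if (end > lastCti) return lastCti;
        return null;
    }
}
```

Note that ImporterStart never looks at the end time: an interval that ended long before the CTI still gets moved, which works here because the publisher leaves End at DateTimeOffset.MaxValue for these events.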

Advance Time Importer
```csharp
using System;
using System.Reactive.Linq;             // Where/Subscribe over IObservable (Reactive Extensions)
using Microsoft.ComplexEventProcessing; // AdvanceTimePolicy, EdgeType

//NOTE: need to put some thought around how to handle OnCompleted/OnError from the observables.
public class AdvanceTimeImporter<TDataStreamEventType> : IDisposable,
    IObservable<PublishedEvent<TDataStreamEventType>>
{
    private IObservable<PublishedEvent<TDataStreamEventType>> _dataStreamObservable;
    private IObserver<PublishedEvent<TDataStreamEventType>> _observer;
    private readonly IDisposable _eventStreamDisposable, _ctiStreamDisposable;
    private readonly AdvanceTimePolicy _importPolicy;

    // Data events and CTIs usually arrive on different threads. This lock makes
    // "check against the last CTI, then publish" atomic and keeps calls to the
    // downstream observer from overlapping.
    private readonly object _gate = new object();
    private DateTimeOffset _lastCtiTimestamp = DateTimeOffset.MinValue;

    public AdvanceTimeImporter(
        IObservable<PublishedEvent<TDataStreamEventType>> dataStream,
        IObservable<DateTimeOffset> ctiStream,
        AdvanceTimePolicy importPolicy)
    {
        // Set the policy before subscribing, in case a source pushes synchronously.
        _importPolicy = importPolicy;
        _dataStreamObservable = dataStream;
        _ctiStreamDisposable = ctiStream.Subscribe(NextCtiItem);
        _eventStreamDisposable = _dataStreamObservable.Where(e => !e.IsCti).Subscribe(NextDataItem);
    }

    private void NextDataItem(PublishedEvent<TDataStreamEventType> dataEvent)
    {
        if (dataEvent.IsCti)
        {
            //Don't publish the CTIs from the data stream.
            return;
        }
        lock (_gate)
        {
            // End edges pass through unchecked (their start time must match the
            // start edge); everything else is checked against the last CTI.
            if (dataEvent.EdgeType == EdgeType.End || dataEvent.Start >= _lastCtiTimestamp)
            {
                PublishOnNext(dataEvent);
                System.Diagnostics.Debug.WriteLine("Data ... no CTI Violation");
            }
            else if (_importPolicy == AdvanceTimePolicy.Adjust)
            {
                // Move the start just past the last CTI.
                dataEvent.Start = _lastCtiTimestamp.AddTicks(1);
                PublishOnNext(dataEvent);
                System.Diagnostics.Debug.WriteLine("Data ... adjusted for CTI Violation");
            }
            // Otherwise (AdvanceTimePolicy.Drop) the violating event is discarded.
        }
    }

    public IDisposable Subscribe(IObserver<PublishedEvent<TDataStreamEventType>> observer)
    {
        lock (_gate)
        {
            _observer = observer;
            if (_importPolicy == AdvanceTimePolicy.Adjust)
            {
                //add a starter CTI.
                NextCtiItem(_lastCtiTimestamp);
            }
        }

        //Release our reference to the observable.
        _dataStreamObservable = null;
        return this;
    }

    private void NextCtiItem(DateTimeOffset cti)
    {
        lock (_gate)
        {
            // Only ever move time forward.
            if (cti >= _lastCtiTimestamp)
            {
                _lastCtiTimestamp = cti;
                PublishOnNext(new PublishedEvent<TDataStreamEventType>(cti));
            }
        }
    }

    private void PublishOnNext(PublishedEvent<TDataStreamEventType> publishedEvent)
    {
        if (_observer != null)
        {
            _observer.OnNext(publishedEvent);
        }
    }

    #region IDisposable implementation
    // (removed for brevity)
    #endregion
}
```

That’s really it. I also created an extension method for IObservable (ImportCTIs, used in the queries below) that makes this a bit simpler and clearer to call. One thing that I do in the extension method is use the Reactive Extensions Replay subject to make sure that latecomers to the Observable get the last batch of data that was published.
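In Rx, the time-windowed flavor of this comes from the Replay(TimeSpan window) overload, which returns an IConnectableObservable that has to be Connect()ed; with a window about as long as the publisher’s refresh interval, a new subscriber gets roughly the last batch. To show the idea without an Rx dependency, here’s a hand-rolled, single-threaded stand-in. WindowReplay<T> and its injected clock are hypothetical and not part of the original sample; in real code you’d use the Rx operator.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical stand-in for Rx's time-windowed Replay: values younger than
// `window` are replayed to each new subscriber, then live values are forwarded.
// Not thread-safe; a sketch only.
public sealed class WindowReplay<T> : IObservable<T>, IObserver<T>
{
    private readonly TimeSpan _window;
    private readonly Func<DateTimeOffset> _clock;
    private readonly List<KeyValuePair<DateTimeOffset, T>> _buffer =
        new List<KeyValuePair<DateTimeOffset, T>>();
    private readonly List<IObserver<T>> _observers = new List<IObserver<T>>();

    public WindowReplay(TimeSpan window, Func<DateTimeOffset> clock)
    {
        _window = window;
        _clock = clock;
    }

    public void OnNext(T value)
    {
        // Remember when each value arrived, then forward it to live subscribers.
        _buffer.Add(new KeyValuePair<DateTimeOffset, T>(_clock(), value));
        foreach (var observer in _observers.ToArray()) observer.OnNext(value);
    }

    public void OnError(Exception error)
    {
        foreach (var observer in _observers.ToArray()) observer.OnError(error);
    }

    public void OnCompleted()
    {
        foreach (var observer in _observers.ToArray()) observer.OnCompleted();
    }

    public IDisposable Subscribe(IObserver<T> observer)
    {
        // Discard anything older than the window, then replay the rest.
        DateTimeOffset cutoff = _clock() - _window;
        _buffer.RemoveAll(entry => entry.Key < cutoff);
        foreach (var entry in _buffer) observer.OnNext(entry.Value);
        _observers.Add(observer);
        return new Unsubscriber(() => _observers.Remove(observer));
    }

    private sealed class Unsubscriber : IDisposable
    {
        private Action _dispose;
        public Unsubscriber(Action dispose) { _dispose = dispose; }
        public void Dispose()
        {
            if (_dispose != null) { _dispose(); _dispose = null; }
        }
    }
}
```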

Implementing the queries is pretty straightforward now.

Import CTI Queries
```csharp
var inputConfig = new TestDataInputConfig()
{
    Name = "DataStream",
    RefreshInterval = TimeSpan.FromMilliseconds(500)
};
var referenceConfig = new TestDataInputConfig()
{
    Name = "ReferenceStream",
    RefreshInterval = TimeSpan.FromSeconds(15)
};

var dataSource = myApp.DefineObservable(() => new TestDataPublisher(inputConfig));

var ctis = dataSource.Where(e => e.IsCti).Select(e => e.Start);

var referenceStream = myApp.DefineObservable(() =>
        new TestDataPublisher(referenceConfig).ImportCTIs(ctis, referenceConfig.RefreshInterval,
            AdvanceTimePolicy.Adjust))
    .ToIntervalStreamable(e => e.GetIntervalEvent());

var dataStream = dataSource.ToPointStreamable(e => e.GetPointEvent());

var outputStream = from d in dataStream
                   from r in referenceStream
                       .AlterEventDuration(e => TimeSpan.MaxValue)
                       .ClipEventDuration(referenceStream, (r1, r2) => r1.ItemId == r2.ItemId)
                   where d.ItemId == r.ItemId
                   select new ResultClass()
                   {
                       DataRunNumber = d.RunNumber,
                       ItemId = d.ItemId,
                       ReferenceString = r.EventText,
                       DataSource = d.Source,
                       ReferenceSource = r.Source
                   };

var outputDataSink = GetOutputDataSink(myApp);
outputStream.Bind(outputDataSink).Run("AdvanceTimeImport.Test");
```
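The AlterEventDuration/ClipEventDuration pair is what turns the reference stream into “current value per item”: every reference event is stretched to infinity and then clipped at the next reference event with the same ItemId. Here’s a plain LINQ-to-objects sketch of the lifetimes that produces, and of the lookup the join effectively performs. RefEvent, ReferenceLifetimes, ToLifetimes and Lookup are hypothetical helpers, DateTimeOffset.MaxValue stands in for “forever”, and the sketch assumes two reference events for the same item never share a start time.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

// Hypothetical, simplified reference event (not the sample's TestDataEvent).
public sealed class RefEvent
{
    public string ItemId;
    public DateTimeOffset Start;
    public string EventText;
}

public static class ReferenceLifetimes
{
    // Each reference event lives from its start until the next event for the
    // same ItemId (or "forever" if there is none), mimicking
    // AlterEventDuration(MaxValue) followed by ClipEventDuration on ItemId.
    public static List<Tuple<RefEvent, DateTimeOffset>> ToLifetimes(IEnumerable<RefEvent> events)
    {
        var result = new List<Tuple<RefEvent, DateTimeOffset>>();
        foreach (var group in events.GroupBy(e => e.ItemId))
        {
            var ordered = group.OrderBy(e => e.Start).ToList();
            for (int i = 0; i < ordered.Count; i++)
            {
                DateTimeOffset end = i + 1 < ordered.Count
                    ? ordered[i + 1].Start
                    : DateTimeOffset.MaxValue;
                result.Add(Tuple.Create(ordered[i], end));
            }
        }
        return result;
    }

    // The join pairs a data point with the reference event whose lifetime
    // [Start, End) contains the point's timestamp; null if there is none.
    public static string Lookup(
        List<Tuple<RefEvent, DateTimeOffset>> lifetimes, string itemId, DateTimeOffset at)
    {
        return lifetimes
            .Where(l => l.Item1.ItemId == itemId && l.Item1.Start <= at && at < l.Item2)
            .Select(l => l.Item1.EventText)
            .SingleOrDefault();
    }
}
```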

In the sample, I’m using the old-school Tracer Adapter defined as a sink … that’s easy enough to change if you want to. Here’s the sample: