Ruminations of J.net idle rants and ramblings of a code monkey

Multiple Timelines From a Single (Input) Source

StreamInsight | Code Sample

This is an interesting challenge that I’ve come across a couple of times, most recently on the StreamInsight Forum. How do you handle the case where you have a single inbound event source but multiple timelines for the events? If you are wondering how this can happen, it’s actually pretty simple. You have some kind of event aggregator that combines events from multiple underlying sources, and each of these sources has different timestamps (due to unsynchronized system clocks at the event sources) and/or different latencies (due to a store/forward/batch update type of system). “But,” you are saying, “you can just hook into the events at the initial source and then you don’t have any problem.” Sure … if you can. But that’s not always possible, for any number of reasons. In Oil & Gas, probably the most common reason is that you don’t own the source system and you get your events (and sensor readings) from another company that actually operates the equipment. Or, even worse, from a service provider company that isn’t even the operator of the equipment but only provides data services. And yes, this is very common.

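What you’ll see in this scenario is a “loss of events”. You do get output, but it’ll be from the one source with the “latest” timeline; everything else gets dropped. That happens because all of the sources share one stream and therefore one application timeline: the CTIs follow the source whose clock is furthest ahead, so events from every source whose clock is behind arrive with timestamps earlier than the last CTI. Now … you will only see silently dropped events if you are using declarative CTIs with AdvanceTimeGenerationSettings. If you are enqueuing CTIs yourself in your source, you’ll get CTI violation exceptions instead. And changing the AdvanceTimePolicy doesn’t help with point events: Drop discards the violating events, and Adjust can only rescue interval events whose end time extends past the last-issued CTI.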
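To see why only the “latest” source survives, here is a deliberately simplified model in plain C# (no StreamInsight types). It assumes one stream with one clock that only moves forward and simply tracks the latest timestamp seen; an event stamped earlier than that clock is treated as a CTI violation and dropped. This is not how StreamInsight generates CTIs internally, just a sketch of the effect.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

// Simplified model (NOT StreamInsight's implementation): one stream, one clock.
// The clock stands in for the last-issued CTI; it only moves forward, and an
// event stamped earlier than the clock is treated as a violation and dropped.
var baseTime = new DateTime(2013, 1, 1, 12, 0, 0, DateTimeKind.Utc);

// Five sources; source i stamps its events i hours ahead, as in the sample.
// Ten rounds of arrivals, interleaved: 0, 1, 2, 3, 4, 0, 1, ...
var arrivals = new List<(int SourceId, DateTime SourceTimeUtc)>();
for (int round = 0; round < 10; round++)
    for (int i = 0; i < 5; i++)
        arrivals.Add((i, baseTime.AddSeconds(round).AddHours(i)));

DateTime sharedClock = DateTime.MinValue;
var acceptedPerSource = new int[5];
foreach (var evt in arrivals)
{
    if (evt.SourceTimeUtc < sharedClock)
        continue;                      // behind the shared clock: dropped
    acceptedPerSource[evt.SourceId]++;
    sharedClock = evt.SourceTimeUtc;   // the clock follows the latest timestamp seen
}

// After the first round, only source 4 (the furthest-ahead clock) gets through.
Console.WriteLine("Accepted per source: " + string.Join(", ", acceptedPerSource));
// Accepted per source: 1, 1, 1, 1, 10
```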

So how do you deal with this situation? First, we need to get the initial events in, knowing that they carry timestamps from different timelines. To simulate this, we’ll use an event source that publishes a batch of items every second, one item per simulated source. It implements IObservable<T>, of course, so we can easily use it as an event source. The event generation looks like the following:

Data Source
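  // Each pass publishes one event per simulated source (IDs 0 to 4).
  // Source i stamps its events i hours ahead of the current time,
  // which gives us five independent timelines.
  for (int i = 0; i < 5; i++)
  {
      var evt = new SourceEventData()
      {
          SourceTimeUtc = DateTimeOffset.Now.AddHours(i).UtcDateTime,
          SourceId = i,
          ItemId = "Item" + i.ToString(),
          Value = i * (_rnd.NextDouble() * 10)   // random reading
      };
      PublishEvent(evt);   // hand the event to the source's observers
  }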

So … each “source” has a unique identifier. You will need some kind of unique identifier (besides the timestamp) to identify the source. If, for some reason, it’s not available on the initial event, you’ll still have some kind of ID that identifies the item, and you can then use a reference stream to look up the source (and therefore the target timeline) for that item. In this sample, each source is offset from the current time by 0 to 4 hours … so we have five timelines.
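If the source ID has to come from reference data, the lookup itself is simple. This is a minimal sketch in plain C#, assuming the reference data fits in an in-memory dictionary; in a StreamInsight query you would join against a reference stream instead, and the dictionary and `ResolveSource` helper here are hypothetical.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical reference data mapping each item to the source (and therefore the
// timeline) it belongs to. In StreamInsight this would come from a reference stream.
var itemToSource = new Dictionary<string, int>
{
    ["Item0"] = 0,
    ["Item1"] = 1,
    ["Item2"] = 2,
};

// Resolves the source for an event that only carries an item ID;
// returns null when the item is not in the reference data.
int? ResolveSource(string itemId) =>
    itemToSource.TryGetValue(itemId, out var sourceId) ? sourceId : (int?)null;

Console.WriteLine(ResolveSource("Item2"));                            // 2
Console.WriteLine(ResolveSource("Item9")?.ToString() ?? "unmapped");  // unmapped
```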

Now, let’s create a stream from this:

Aggregator Stream
  // Define the observable source and turn it into a point stream,
  // stamping each event with its arrival time (the local system clock).
  var eventAggregatorSource = cepApplication.DefineObservable(() => new StreamSource());
  var eventAggregatorSourceStream = eventAggregatorSource.ToPointStreamable(
      e => PointEvent<SourceEventData>.CreateInsert(DateTimeOffset.UtcNow, e),
      AdvanceTimeSettings.IncreasingStartTime);

In this case, we are using the event arrival time as the point event start time. In our sample, we’re not doing anything with this stream except publishing it, but this is a good place for some analytics. For example, you might want a query that detects when you aren’t getting events from one of your sources. Or you could include reference data that helps you “reset” the tagged source time to the real time: if you know that one of your sources has a system clock that is 3 days behind, you can add 3 days to its SourceTimeUtc. One caveat … if you want to be able to detect that the entire source stream is offline, you’ll need to enqueue CTIs from a timer rather than relying on declarative CTI generation. Generated CTIs are produced only when events flow through the stream, so when there are no events at all (the whole thing is offline), application time stops advancing and you won’t get any offline alerts.
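The timer caveat can be illustrated without StreamInsight at all. In this sketch (plain C#, hypothetical names), the last arrival time per source is tracked, and the staleness check is run with a “now” supplied by a timer tick rather than by an arriving event. If the check only ran when events arrived, a completely silent feed would never trigger it.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

// Hypothetical: last arrival time (system clock) seen for each source.
var lastSeenUtc = new Dictionary<int, DateTime>();

void OnEventArrived(int sourceId, DateTime arrivalUtc) => lastSeenUtc[sourceId] = arrivalUtc;

// Meant to be called from a timer tick, NOT from event arrival, so it still
// runs when no events are flowing at all. Never-seen sources count as stale.
List<int> StaleSources(IEnumerable<int> expectedSources, DateTime nowUtc, TimeSpan threshold) =>
    expectedSources
        .Where(id => !lastSeenUtc.TryGetValue(id, out var seen) || nowUtc - seen > threshold)
        .OrderBy(id => id)
        .ToList();

var t0 = new DateTime(2013, 1, 1, 12, 0, 0, DateTimeKind.Utc);
OnEventArrived(0, t0);
OnEventArrived(1, t0.AddSeconds(50));

// Timer fires at t0 + 60 s with a 30 s threshold.
var stale = StaleSources(Enumerable.Range(0, 3), t0.AddSeconds(60), TimeSpan.FromSeconds(30));
Console.WriteLine(string.Join(", ", stale)); // 0, 2
```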

So … we have a single stream with timestamps by arrival time. For our analytics, though, we need the events in the timeline of the underlying source of the data. How do we do this? It’s easy … you use a subject. We’ll take the original source stream, enqueued with the arrival time, and publish it to a subject. Then we’ll subscribe to the subject once for each of our source IDs; each subscription becomes a stream in that source system’s timeline, completely independent of the other timelines. To make those per-source subscriptions easy, we’ll define and then deploy an observable that filters the subject by a specific source ID. Deploying the observable allows us to reuse it without defining it again. Note, however, that deploying an observable isn’t required and doesn’t always make sense. But since this is something we’ll reuse over and over again, here it does.
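The reason the subject fixes the problem is that each subscriber gets its own clock. Here is a rough plain-C# analogy, assuming a subject behaves like a list of callbacks that every published event is pushed to; the `Publish` helper and the per-subscriber “clock” are hypothetical stand-ins, not StreamInsight APIs. The same interleaved arrivals that lost most events in a single shared timeline are all accepted here.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

// Hypothetical stand-in for a subject: one publish point, many filtered subscribers.
var subscribers = new List<Action<(int SourceId, DateTime SourceTimeUtc)>>();
void Publish((int SourceId, DateTime SourceTimeUtc) e)
{
    foreach (var onNext in subscribers) onNext(e);
}

// One subscription per source ID; each keeps its OWN clock (last accepted timestamp).
var acceptedPerSource = new Dictionary<int, int>();
var droppedPerSource = new Dictionary<int, int>();
for (int id = 0; id < 5; id++)
{
    int sourceId = id;                  // copy the loop variable for the closure
    DateTime clock = DateTime.MinValue; // a fresh clock per subscription
    acceptedPerSource[sourceId] = 0;
    droppedPerSource[sourceId] = 0;
    subscribers.Add(evt =>
    {
        if (evt.SourceId != sourceId) return;  // the "Where" filter
        if (evt.SourceTimeUtc < clock) { droppedPerSource[sourceId]++; return; }
        clock = evt.SourceTimeUtc;             // advances this timeline only
        acceptedPerSource[sourceId]++;
    });
}

// Same arrivals as before: source i is i hours ahead, ten interleaved rounds.
var baseTime = new DateTime(2013, 1, 1, 12, 0, 0, DateTimeKind.Utc);
for (int round = 0; round < 10; round++)
    for (int i = 0; i < 5; i++)
        Publish((i, baseTime.AddSeconds(round).AddHours(i)));

Console.WriteLine("Accepted: " + string.Join(", ", acceptedPerSource.OrderBy(kv => kv.Key).Select(kv => kv.Value)));
// Accepted: 10, 10, 10, 10, 10
```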

Subject + Deployed Observable
  // Create the subject that the aggregator process publishes into.
  var publishSubject = cepApplication.CreateSubject("EventSourceSubject",
      () => new Subject<SourceEventData>());

  // Run the aggregator stream (arrival-time timeline) into the subject.
  eventAggregatorSourceStream.Bind(publishSubject).Run("EventAggregatorProcess");

  // Parameterized observable: all events from the subject for one source ID.
  var sourceObservable = cepApplication.DefineObservable((int sourceId) =>
      cepApplication.GetSubject<SourceEventData, SourceEventData>("EventSourceSubject")
          .Where(e => e.SourceId == sourceId));

  // Deploy it so it can be retrieved by name and reused for every source.
  sourceObservable.Deploy("EventsForSourceId");

To wrap it all up, all we need to do now is get our deployed observable, invoke it with a source ID and convert the result to a point stream. When we create the point stream, we’ll use the SourceTimeUtc property as the start time for the point, so the stream runs on the timeline of the originating source.
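One detail worth checking in that conversion: `new DateTimeOffset(DateTime)` takes its offset from `DateTime.Kind`. A `Utc` value (which is what `DateTimeOffset.UtcDateTime` returns in the data source above) gets a zero offset, but an `Unspecified` value, which is common after parsing data from someone else’s system, is treated as local time. A small sketch, assuming the incoming timestamp is known to be UTC; the `ToStartTime` helper is hypothetical.

```csharp
using System;

// Builds the point start time from a source timestamp that is known to be UTC.
// Assumption: the upstream system really sends UTC. SpecifyKind only relabels the
// value; it does not convert, so don't use it on timestamps that are local time.
DateTimeOffset ToStartTime(DateTime sourceTimeUtc) =>
    new DateTimeOffset(DateTime.SpecifyKind(sourceTimeUtc, DateTimeKind.Utc));

// A UTC timestamp that lost its Kind, e.g. after being parsed from another system's feed.
var fromFeed = new DateTime(2013, 1, 1, 12, 0, 0, DateTimeKind.Unspecified);
var start = ToStartTime(fromFeed);

Console.WriteLine(start.Offset);                   // 00:00:00
Console.WriteLine(start.UtcDateTime == fromFeed);  // True
```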

Event Source Stream
  private static void CreateStreamForEventSource(Application cepApplication, string observableId, int sourceId)
  {
      // Retrieve the deployed observable and invoke it for this source ID.
      // The point start time is the source's own timestamp, so this stream
      // runs on that source's timeline rather than on arrival time.
      var sourceStream = cepApplication.GetObservable<int, SourceEventData>(observableId)
          .Invoke(sourceId)
          .ToPointStreamable(
              e => PointEvent<EventData>.CreateInsert(new DateTimeOffset(e.SourceTimeUtc), (EventData)e),
              AdvanceTimeSettings.IncreasingStartTime);

      // One process per source; each process has its own application clock.
      sourceStream
          .Bind(cepApplication.DefineObserver(() => Observer.Create<PointEvent<EventData>>(WriteOutput)))
          .Run("EventStream" + sourceId.ToString());
  }

At the end of it, we have a single process for the source events coming from our event aggregator. This process is synced to the local system clock and uses event arrival time for the event timestamp. Then we have a process for each of our individual sources, and each of those processes has a completely different, independent timeline … each process has its own clock. This is probably one of the clearest examples that I’ve seen of what’s meant by “application time” … something that I found difficult to fully get my head around when I first started doing StreamInsight way back long ago.

You can download the sample project that shows all of this and the multiple timelines from my SkyDrive. Enjoy!
