Ruminations of J.net idle rants and ramblings of a code monkey

Using Subjects as a “Feedback Loop”

Code Sample | StreamInsight

OK, I’m going to switch gears for a minute here. Yes, I’ll be back to building the app but this is just too cool to not share. It’s something that I’ve thought about quite a bit off and on and a forum post got it going again so I put together a sample last night that shows how it can be done. The question … how do I take the output of a query and feed it back in to the query itself? This has a number of uses; in the case of the forum post, the developer wants to use this to update running totals and balances as orders are processed because these results impact the rest of the events. When working with sensors, this is also how you would do a “deadband” … an output isn’t produced unless the new value is more than a certain percentage change from the previous. For the deadband use case, you can’t just compare to the previous reading; if you do this, slow changes can accumulate but a new value is never reported because the change between any two values is less than the deadband threshold. So you need to compare the current reading value to the last reported value. I’ve done with with a UDO that maintains state (in a dictionary) for each of the items but it’s not as elegant as I’d like. And this is cooler anyway. Smile 

So … what is a subject? A subject is an observer and and observable; it will republish events that it observes (via IObserver) to others that are observing it (via IObservable). For a little primer on subjects, check out this blog entry from Colin on the StreamInsight team; it gives you a hint of what you can do with these. But that’s just the tip of the iceberg; subjects have become one of my favorite little tricks in StreamInsight because there’s so much that you can do with them. As Colin mentions, observers can come and go – so you can hook up multiple sinks to a single output query (something very difficult prior to 2.1). What Colin doesn’t mention is that your sources (observables) can come and go also … you can hook up multiple source queries that then get fed to one or more sinks.

Subjects are created and deployed independently of their sources and sinks – you can create a subject that you are using as a source before you actually hook it up to a sink. Or vice-versa. Subjects also allow you to share query results across processes – similar to what you would do with dynamic query composition but far, far, far more powerful and flexible.

Now, because a subject is both a source and a sink, you can use them to take the output results of a query and feed them back in to the source query … creating a feedback loop. Or a time warp.

Creating a subject is pretty simple. There isn’t much code required. You just need to know what type you’ll be using. You can bind the subject to the payload type only – you only get the payload and no temporal header – or to the TypedEvent (i.e. PointEvent, IntervalEvent, etc), in which case you will get the payload and the temporal header. Let’s start there.

GetOrCreateSubject
privatestaticobject _subjectLock = newobject();
privatestaticIRemoteSubject<TSubject, TSubject> GetOrCreateSubject<TSubject>(Application cepApplication, string subjectName)
{
    lock (_subjectLock)
    {
        bool subjectExists = cepApplication.Subjects.Any(s => s.Key == subjectName);
        if (subjectExists)
        {
            return cepApplication.GetSubject<TSubject, TSubject>(subjectName);
        }

        else
        {
            return cepApplication.CreateSubject(subjectName, () => newSubject<TSubject>());
        }
    }
}

As the name says, this method will get or create a subject, depending on if it’s already created or not. If you are wondering about the _subjectLock, that’s there to ensure that only one thread is in the middle of this operation at any time. If you are multi-threading calls to this, it is entirely possible to get into a situation where you are calling the same method with the same arguments on different threads and wind up with exceptions as multiple threads try to create the same subject at the same time.

Now, let’s get data running through our test app. I’m using the same app from my recent posts so this will look familiar. We’ll also get a reference to the subject.

Getting Data
var config = newTestDataInputConfig ()
            {
                NumberOfItems=20,
                RefreshInterval=TimeSpan.FromMilliseconds(500)
            };

            var data = cepApplication.DefineObservable(
                    () => newTestDataProducer(config)).ToPointStreamable(e => e.GetPointEvent());


            var lastReportedSubject = GetOrCreateSubject<PointEvent<TestDataEvent>>(cepApplication, "LastKnown");

We aren’t sending data to the subject just yet … that will be from the results later … but we do need to get a stream (IQStreamable) from it. But let’s stop for a moment and discuss some of the potential gotchas. First, remember that a stream in StreamInsight is a temporal stream. All of the events exist in time and the stream moves forward based on CTIs. We also know that we’ll be joining this feedback stream with the data stream and, when you do that, StreamInsight will synch to the slowest stream … the joined stream will move forward only when CTIs from both source streams move past the same timestamp. If our feedback loop simply publishes the CTIs that are generated from our result stream, it will never move forward. Why not? Because it’ll be waiting for CTIs to move past a timestamp from the result stream but the result stream can’t move forward because there is no CTI coming from our feedback stream. Did that make sense? It makes my brain hurt thinking about it too much. Anyway, what we need to do is to directly import the CTIs from the data stream. But that gives us another challenge. If we do that, we now have to worry about CTI violations from the data (insert) events being published from the results. You see, the events produced will have start times that are before the last-issued CTI; they must be or they wouldn’t be in the output stream. So we need to account for this when we enqueue the new events by shifting the start time forward so that it is after the last-issued CTI.

So let’s get started. To handle the CTIs, we’ll first filter the subject observable source so that it only produces Insert events. Then we will Merge these results it the CTIs from the data stream. The result will be an observable that we can then use as the source for the feedback stream. A note, however, on using the Merge technique to import CTIs … it will give you no protection at all from CTI violations. If you try to create/enqueue an event that violates the CTI, you will get an exception that you can’t really catch and it will cause your query to abort.

Feedback Source
var lastReportedObservableSource = lastReportedSubject
                                    //Get only the inserts from the subject, dropping the CTIs
                                    .Where(e => e.EventKind == EventKind.Insert)
                                    //Merge with the CTIs from the data stream.
                                    .Merge(data.ToPointObservable().Where(e => e.EventKind == EventKind.Cti));

Now that we have our observable, we need to make it a stream. While it is still an observable (IQbservable), we don’t have to follow the temporal/CTI rules because it’s not a temporal stream yet. However, when it’s a stream (IQStreamable), it is a temporal stream and we do have to follow the temporal/CTI rules … so we have to shift the timestamps of our point events so that they don’t violate the CTI. With the data source that we are using, we know that the CTIs are 1 tick past the “batch” of events so shifting the timestamp 1 tick will ensure that we’re good. In the real world, you may need to something a bit more sophisticated … see my previous blog article on importing CTIs in 2.1 for a couple of tips; I will be revisiting this in my app-building series.

Feedback Stream
var lastReportedSourceStream = lastReportedObservableSource
                                    .ToPointStreamable(e => e.EventKind == EventKind.Cti ? e :
                                        PointEvent<TestDataEvent>.CreateInsert(e.StartTime.AddTicks(1), e.Payload));

The hard part is done. Now that we have the streams, the rest is a series of StreamInsight queries. First, we want to make sure that we always have the last-known value available in the last reported value stream so we’ll use the (very) common ToSignal pattern. Next, we need to make sure that the initial values reported always go through. If we just join the streams, we’ll never get output because the initial events won’t be in the feedback stream. So we’ll do a LeftAntiJoin so that the events in the data stream with no matches in the feedback stream go through. (Depending on your scenario, it may be perfectly appropriate to “seed” the feedback stream with initial events … in the case of the forum post, these could be current balances when the application starts. If that’s the case, you can probably skip this step.) Then we calculate the percentage change from the last reported value for the current value; in this step, I’m including both the current and the last reported value in the stream (we don’t really need to) so that it’s easier to check the results using the event flow debugger and “see” everything that’s happening. From the calculation, we then select only those items that have changed more than a specified amount. Since we’re using random, very variable data in this sample, I’ve put the “deadband” at 100% but real-world will be somewhat less; anywhere from 1% – 10%, depending on the source. Union the two queries together and we’re done! (with the queries, at least).

The Queries
//Make sure that we have the last reported value always available.
var lastReportedStream = lastReportedSourceStream.ToSignal((e1, e2) => e1.ItemId == e2.ItemId);
    
//Make sure that our initial values always get reported in the output using LeftAntiJoin.
var firstValues =
    from i in data.LeftAntiJoin(lastReportedStream, (e1, e2) => e1.ItemId == e2.ItemId)
    select i;

//Calculate the change from the previous.
var calcStream = from d in data
                 join
                     lr in lastReportedStream
                     on d.ItemId equals lr.ItemId
                 selectnew
                     {
                         Value = d,
                         LastReported = lr,
                         PctChange = Math.Abs(1 - ((d.Value - lr.Value)/lr.Value))
                     };

//Select only those that have changed more than 100%
var changed = from c in calcStream
              where c.PctChange > 1.0
              select c.Value;

//Union first values with changed values
var final = firstValues.Union(changed);

All that’s left is to bind the output to the subject, creating the feedback loop as well as a console writer sink so that we see results. Then we run the process.

Bind and Run
var sink = cepApplication.DefineObserver(() => Observer.Create<TestDataEvent>(
    e => Console.WriteLine("TestEvent ItemId:{0} RunNumber:{1} Value{2}", e.ItemId, e.RunNumber, e.Value)));

//Bind to subject and console sink so that we see results.
//Run the process.
final.Bind(lastReportedSubject).With(final.Bind(sink)).Run();

I do have to note that when I run this, StreamInsight hangs on shutdown – not a good sign. I’m not exactly sure why this is happening but if I figure it out, I will update this post. Or … if you figure it out, you can send me a note using the “Contact” link on this blog.

You can download the code from my SkyDrive (of course!).