Ruminations of J.net idle rants and ramblings of a code monkey

Dual-Mode Data Sinks - Part I

Code Sample | StreamInsight

Now that we have the input completed, we need to start working on the output adapters. As with the input adapters/sources, we’ll create an architecture that allows you to use the same core code whether you are using the pre-2.1 adapter model or the 2.1 and later sink model.

As with our StreamInputEvent, we’ll create an abstraction that allows us to handle any event shape with the same code. This StreamOutputEvent should have properties that express all of the possible variations in event shapes as well as a generic property to hold the payload. Now, if you look at several of the StreamInsight samples, you’ll notice that they only send the payload to the sink. Certainly, that makes a couple of things easier, but I don’t think that it’s really the best way to do things. The temporal properties are a key aspect of everything in StreamInsight, and you lose them if you don’t send the full event shape to your sink. And … the shape is really only a way of expressing the event. Internally, all events, regardless of shape, have start and end times that control their lifetime in the query engine. The shape is simply how you want to “see” the events and handle them in your output. There’s nothing that stops you from expressing the same query as an edge, a point or an interval. The choice will impact when the event gets “released” to the output adapters/sink by the engine but really, it’s just a matter of how you want to see the event and when you want to get it to your output. But, before I go any further, here’s our StreamOutputEvent:

StreamOutputEvent
public class StreamOutputEvent<TPayload>
{

    ///<summary>
    /// Creates an output event from a source event.
    ///</summary>
    ///<param name="sourceEvent">The source event.</param>
    ///<returns></returns>
    public static StreamOutputEvent<TPayload> Create(PointEvent<TPayload> sourceEvent)
    {
        var outputEvent = new StreamOutputEvent<TPayload>()
            {
                StartTime = sourceEvent.StartTime,
                EventKind = sourceEvent.EventKind,
                EventShape = EventShape.Point
            };

        if (sourceEvent.EventKind == EventKind.Insert)
        {
            outputEvent.Payload = sourceEvent.Payload;
        }
        return outputEvent;
    }

    ///<summary>
    /// Creates an output event from a source event.
    ///</summary>
    ///<param name="sourceEvent">The source event.</param>
    ///<returns></returns>
    public static StreamOutputEvent<TPayload> Create(IntervalEvent<TPayload> sourceEvent)
    {
        var outputEvent = new StreamOutputEvent<TPayload>()
        {
            StartTime = sourceEvent.StartTime,
            EventKind = sourceEvent.EventKind,
            EventShape = EventShape.Interval
        };

        if (sourceEvent.EventKind == EventKind.Insert)
        {
            outputEvent.EndTime = sourceEvent.EndTime;
            outputEvent.Payload = sourceEvent.Payload;
        }
        return outputEvent;
    }

    ///<summary>
    /// Creates an output event from a source event.
    ///</summary>
    ///<param name="sourceEvent">The source event.</param>
    ///<returns></returns>
    public static StreamOutputEvent<TPayload> Create(EdgeEvent<TPayload> sourceEvent)
    {
        var outputEvent = new StreamOutputEvent<TPayload>()
        {
            StartTime = sourceEvent.StartTime,
            EventKind = sourceEvent.EventKind,
            EventShape = EventShape.Edge
        };

        if (sourceEvent.EventKind == EventKind.Insert)
        {
            outputEvent.Payload = sourceEvent.Payload;
            outputEvent.EdgeType = sourceEvent.EdgeType;
            if (sourceEvent.EdgeType == Microsoft.ComplexEventProcessing.EdgeType.End)
            {
                outputEvent.EndTime = sourceEvent.EndTime;
            }
        }
        return outputEvent;
    }

    public DateTimeOffset StartTime { get; private set; }
    public EventKind EventKind { get; private set; }
    public DateTimeOffset? EndTime { get; private set; }
    public EventShape EventShape { get; private set; }
    public EdgeType? EdgeType { get; private set; }
    public TPayload Payload { get; private set; }
}

With a static Create overload for each event shape, we can easily build one of these from any event stream and then send it to a single set of code that handles the outbound event.

When creating the sink and hooking it to the stream, it’s a matter of how you create your observer and the type specified for the TElement. Very simply, the key thing that dictates what StreamInsight “sends” to your sink is the type for the observer’s generic class parameter. If you specify IObserver<TPayload>, you’ll only get the payload. However, if you specify IObserver<PointEvent<TPayload>>, you’ll get a point event (and so on for intervals and edges). Since our event consumer should be able to consume events of any shape, we will actually need to implement the observer interface for each of the shapes. While it may be tempting to try to implement one interface based on TypedEvent<T>, it won’t work. Yes, I tried. But StreamInsight requires that your sinks specify the payload as the type for the observer or one of the TypedEvent<T> child classes. If you don’t specify an event shape, StreamInsight will handle the events and release them to the output adapter as though they were point events. For some very basic scenarios, this works. But when you start getting into some of the more interesting scenarios for StreamInsight, you’ll want to do a lot more with your output than to view it as a point. But … this is for the next post. Let’s get back to our implementation.

As we did with our input producer, we’ll create a common, abstract base class for all of our event consumers. Our concrete consumers will inherit from this class and handle whatever is necessary to write the data to our target, whatever it may be. Again, as with the producer, we’ll specify a configuration class; while the reactive StreamInsight API no longer requires it, the reality is that you will want to have a configuration class for your sinks; you don’t want to hard-code things like database connection strings, web service target URIs or anything like that in your event consumers. But the key thing that our base class will do is to implement the interface for each of the event shapes, translate them to our StreamOutputEvent and then send to our actual event consumer’s code.

StreamEventConsumer
public abstract class StreamEventConsumer<TPayloadType, TConfigType> :
    IObserver<PointEvent<TPayloadType>>,
    IObserver<EdgeEvent<TPayloadType>>,
    IObserver<IntervalEvent<TPayloadType>>
{
    protected StreamEventConsumer(TConfigType configuration)
    {
        this.Configuration = configuration;
    }

    public TConfigType Configuration { get; private set; }

    // Implemented by the concrete consumers.
    public abstract void Completed();

    public abstract void Error(Exception error);

    public abstract void EventReceived(StreamOutputEvent<TPayloadType> outputEvent);

    // Each OnNext overload wraps the shaped event and forwards it to EventReceived.
    public void OnNext(PointEvent<TPayloadType> value)
    {
        EventReceived(StreamOutputEvent<TPayloadType>.Create(value));
    }

    public void OnNext(IntervalEvent<TPayloadType> value)
    {
        EventReceived(StreamOutputEvent<TPayloadType>.Create(value));
    }

    public void OnNext(EdgeEvent<TPayloadType> value)
    {
        EventReceived(StreamOutputEvent<TPayloadType>.Create(value));
    }

    public void OnCompleted()
    {
        Completed();
    }

    public void OnError(Exception error)
    {
        Error(error);
    }
}
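
If you want to see the "one class, several observer interfaces" idea on its own, here is a minimal sketch that runs without StreamInsight. PointEventStub, IntervalEventStub, RecordingConsumer and ObserverDemo are hypothetical stand-ins that I made up for the illustration; they are not StreamInsight types.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical stand-ins for the StreamInsight event types.
public class PointEventStub<T> { public T Payload; }
public class IntervalEventStub<T> { public T Payload; }

// One class implements IObserver<T> once per shape and funnels
// every OnNext overload into a single handler.
public class RecordingConsumer<T> :
    IObserver<PointEventStub<T>>,
    IObserver<IntervalEventStub<T>>
{
    public readonly List<string> Received = new List<string>();

    public void OnNext(PointEventStub<T> value) { Handle("Point", value.Payload); }
    public void OnNext(IntervalEventStub<T> value) { Handle("Interval", value.Payload); }

    // A single OnCompleted/OnError pair satisfies both interfaces.
    public void OnCompleted() { Received.Add("Completed"); }
    public void OnError(Exception error) { Received.Add("Error: " + error.Message); }

    private void Handle(string shape, T payload)
    {
        Received.Add(shape + ": " + payload);
    }
}

public static class ObserverDemo
{
    public static List<string> Run()
    {
        var consumer = new RecordingConsumer<int>();

        // The same object, viewed through two different observer interfaces.
        IObserver<PointEventStub<int>> pointView = consumer;
        IObserver<IntervalEventStub<int>> intervalView = consumer;

        pointView.OnNext(new PointEventStub<int> { Payload = 1 });
        intervalView.OnNext(new IntervalEventStub<int> { Payload = 2 });
        pointView.OnCompleted();
        return consumer.Received;
    }
}
```

The interface type that the caller holds decides which OnNext overload is called, which is the same mechanism that lets a single consumer instance be handed out as a point, edge or interval observer.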

From here, we’ll continue to build the architecture much the same way that we built the event producers. For each consumer, we’ll have a factory that handles the details of creating and starting the consumer based on a common interface.

ISinkFactory
interface ISinkFactory
{
    IObserver<PointEvent<TPayload>> CreatePointObserverSink<TPayload>(object config);
    IObserver<EdgeEvent<TPayload>> CreateEdgeObserverSink<TPayload>(object config);
    IObserver<IntervalEvent<TPayload>> CreateIntervalObserverSink<TPayload>(object config);
}

Between StreamEventConsumer and ISinkFactory, it’s now a small step to create a concrete factory and consumer – as well as the adapters. For simplicity’s sake, we’ll use a console consumer.

Console Data Consumer
public class ConsoleDataConsumer<TPayloadType> : StreamEventConsumer<TPayloadType, ConsoleOutputConfig>
{
    public ConsoleDataConsumer(ConsoleOutputConfig configuration) : base(configuration)
    {
    }

    public override void Completed()
    {
        // Nothing necessary.
    }

    public override void Error(Exception error)
    {
        Console.WriteLine("Error occurred: " + error.ToString());
    }

    public override void EventReceived(StreamOutputEvent<TPayloadType> outputEvent)
    {
        if (outputEvent.EventKind == EventKind.Insert)
        {
            Console.ForegroundColor = Configuration.InsertEventColor;
            Console.WriteLine("Insert Event Received at " + outputEvent.StartTime);
        }
        else if (Configuration.ShowCti)
        {
            // Anything that is not an insert is a CTI.
            Console.ForegroundColor = Configuration.CtiEventColor;
            Console.WriteLine("CTI event received at " + outputEvent.StartTime);
        }
        Console.ResetColor();
    }
}

Our factory handles the dirty details of hooking up to our source streams as well as our output adapters. Forcing the factory to implement a method for each event shape does two things: it ensures that we get the interface that we need when creating the sink and tying it to our streams, and it gives us the opportunity to say, in code, that a particular output sink doesn’t support specific shapes, should that be appropriate.

ConsoleOutputFactory
public class ConsoleOutputFactory : ISinkFactory, ITypedOutputAdapterFactory<ConsoleOutputConfig>
{
    public IObserver<PointEvent<TPayload>> CreatePointObserverSink<TPayload>(object config)
    {
        return new ConsoleDataConsumer<TPayload>((ConsoleOutputConfig)config);
    }

    public IObserver<EdgeEvent<TPayload>> CreateEdgeObserverSink<TPayload>(object config)
    {
        return new ConsoleDataConsumer<TPayload>((ConsoleOutputConfig)config);
    }

    public IObserver<IntervalEvent<TPayload>> CreateIntervalObserverSink<TPayload>(object config)
    {
        return new ConsoleDataConsumer<TPayload>((ConsoleOutputConfig)config);
    }

    // Legacy adapter model: wrap the matching observer sink in an output adapter.
    public OutputAdapterBase Create<TPayload>(ConsoleOutputConfig configInfo, EventShape eventShape)
    {
        switch (eventShape)
        {
            case EventShape.Interval:
                return new ObserverTypedIntervalOutputAdapter<TPayload>(CreateIntervalObserverSink<TPayload>(configInfo));
            case EventShape.Edge:
                return new ObserverTypedEdgeOutputAdapter<TPayload>(CreateEdgeObserverSink<TPayload>(configInfo));
            case EventShape.Point:
                return new ObserverTypedPointOutputAdapter<TPayload>(CreatePointObserverSink<TPayload>(configInfo));
            default:
                throw new ArgumentOutOfRangeException("eventShape");
        }
    }

    public void Dispose()
    {
        // Nothing to dispose.
    }
}
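
The console factory supports every shape, so it never has to refuse one. As a rough sketch of what "saying no in code" could look like, here is a cut-down factory that only accepts point events. Everything in it (PointShape, EdgeShape, IShapeSinkFactory, NullPointObserver, PointOnlySinkFactory) is a hypothetical stand-in so that the sample compiles without StreamInsight, and NotSupportedException is simply my choice of exception.

```csharp
using System;

// Hypothetical stand-ins for the StreamInsight event types.
public class PointShape<T> { }
public class EdgeShape<T> { }

// Cut-down, stand-in version of the ISinkFactory idea.
public interface IShapeSinkFactory
{
    IObserver<PointShape<T>> CreatePointObserverSink<T>(object config);
    IObserver<EdgeShape<T>> CreateEdgeObserverSink<T>(object config);
}

// A do-nothing observer, just so the factory has something to return.
public class NullPointObserver<T> : IObserver<PointShape<T>>
{
    public void OnNext(PointShape<T> value) { }
    public void OnCompleted() { }
    public void OnError(Exception error) { }
}

public class PointOnlySinkFactory : IShapeSinkFactory
{
    public IObserver<PointShape<T>> CreatePointObserverSink<T>(object config)
    {
        return new NullPointObserver<T>();
    }

    public IObserver<EdgeShape<T>> CreateEdgeObserverSink<T>(object config)
    {
        // The interface forces us to implement this, so refuse explicitly.
        throw new NotSupportedException("This sink only accepts point events.");
    }
}
```

The caller finds out at the point where the sink is created, rather than somewhere deep inside a running query.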

We’ve not touched on the output adapters yet, so now’s the time to introduce them; they are, after all, already referenced in our factory. As before, a single factory serves both the 2.1+ reactive model and the legacy adapter model. As with our input adapters, the output adapters are relatively thin wrappers around our event consumers that handle the details of lifetime. Unlike the input adapters, the output adapters may well get some data after our “stop” event, and we want to make sure that we dequeue all events before shutting down.

To control this a little better, we use Monitor.Enter and Monitor.Exit directly rather than the basic lock{} block provided by C#. The lock block, by the way, creates a Monitor.Enter/Monitor.Exit pair behind the scenes. Using Monitor directly lets us minimize the possibility of deadlocks if we are actively dequeuing events when we get a Resume call. With Monitor.TryEnter(), a thread can attempt to enter the dequeuing code without blocking. If the lock has already been acquired, we don’t need to spin up another thread to dequeue and we certainly don’t need to block waiting for a lock that we won’t actually need once we get it.

Our dequeue thread will continue to dequeue 1 event at a time until nothing is left in the queue. The dequeue operation has to be synchronized; only 1 thread can dequeue at a time anyway. Adding multiple threads to the dequeue operation typically won’t help us, and we want to keep as many threads as possible available to process actual query results. Now … once we’ve dequeued, we may want to use techniques to multi-thread sending the results to the final target. But … our actual dequeue from each query/process should be single threaded. Keep in mind, however, that you’ll have multiple, single-threaded output sinks in most real-world applications. You will be multi-threaded, have no worries there.

Calls into our event consumers can come from any thread, which is why we need to use locks to make sure that we’re properly synchronized. This is particularly important when our output adapter is stopped. After Stop is called, we get one more chance to empty our queue. We use the monitor to make sure that we empty all available events from the queue before calling Stopped(). This ensures that we’ll have a nice, clean shutdown with no hangs and no ObjectDisposedExceptions.

Point Output Adapter
public class ObserverTypedPointOutputAdapter<TPayloadType>
    : TypedPointOutputAdapter<TPayloadType>
{
    private readonly IObserver<PointEvent<TPayloadType>> _sinkObserver;
    private readonly object _monitorObject = new object();

    public ObserverTypedPointOutputAdapter(IObserver<PointEvent<TPayloadType>> sinkObserver)
    {
        _sinkObserver = sinkObserver;
    }

    public override void Stop()
    {
        // Acquire the lock before the try block so that the finally block
        // only ever releases a lock that we actually hold.
        Monitor.Enter(_monitorObject);
        try
        {
            // One last round to dequeue.
            EmptyQueue();
            // Completed.
            _sinkObserver.OnCompleted();
        }
        finally
        {
            Monitor.Exit(_monitorObject);
        }
        base.Stop();
        Stopped();
    }

    public override void Resume()
    {
        System.Threading.Thread thd = new Thread(DequeueEvents);
        thd.Start();
    }

    public override void Start()
    {
        System.Threading.Thread thd = new Thread(DequeueEvents);
        thd.Start();
    }

    private void DequeueEvents()
    {
        if (this.AdapterState != AdapterState.Running)
        {
            return;
        }
        // Ensures only 1 thread is dequeuing and no other threads are blocked.
        if (Monitor.TryEnter(_monitorObject))
        {
            try
            {
                EmptyQueue();
            }
            catch (Exception ex)
            {
                _sinkObserver.OnError(ex);
            }
            finally
            {
                Monitor.Exit(_monitorObject);
                this.Ready();
            }
        }
    }

    private void EmptyQueue()
    {
        PointEvent<TPayloadType> dequeuedEvent;

        // Forward events to the sink until the queue is empty.
        while (this.Dequeue(out dequeuedEvent) == DequeueOperationResult.Success)
        {
            _sinkObserver.OnNext(dequeuedEvent);
        }
    }
}
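
The TryEnter pattern in DequeueEvents is easier to poke at outside of an adapter. Here is a small sketch using only System.Threading; TryEnterDemo, TryDrain and Run are names I made up for the illustration. Where lock(x) { ... } always waits for the lock, TryEnter returns false right away when another thread already holds it.

```csharp
using System;
using System.Threading;

public static class TryEnterDemo
{
    private static readonly object Gate = new object();

    // Returns true if this call did the work, false if another
    // thread already held the lock and we walked away.
    public static bool TryDrain(Action drain)
    {
        if (!Monitor.TryEnter(Gate))
        {
            return false; // someone else is draining; do not wait
        }
        try
        {
            drain();
            return true;
        }
        finally
        {
            Monitor.Exit(Gate);
        }
    }

    public static bool[] Run()
    {
        bool second = true;
        bool first = TryDrain(() =>
        {
            // While this thread holds the lock, a second thread tries
            // to drain. Monitor is re-entrant on the owning thread, so
            // the attempt has to come from a different thread.
            var other = new Thread(() => { second = TryDrain(() => { }); });
            other.Start();
            other.Join();
        });
        return new[] { first, second };
    }
}
```

Run() returns true for the first caller and false for the second: the second thread finds the lock taken and returns without blocking, which is the behavior we want from a Resume call that arrives while a dequeue is already in progress.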

Now that we have all of the core pieces in place, let’s take a look at what we need to do to hook our sink up to the console output. It’s actually very simple.

Hooking up to a stream

private static void RunProcess(Application cepApplication)
{
    var config = new TestDataInputConfig()
    {
        NumberOfItems = 20,
        RefreshInterval = TimeSpan.FromMilliseconds(500)
    };

    var data = RxStream<TestDataEvent>.Create(cepApplication, typeof(TestDataInputFactory), config, EventShape.Point);

    var factory = new ConsoleOutputAdapter.ConsoleOutputFactory();

    var sink = cepApplication.DefineObserver(() => factory.CreatePointObserverSink<TestDataEvent>
                                                       (new ConsoleOutputAdapter.ConsoleOutputConfig()
                                                           {
                                                               ShowCti = true,
                                                               CtiEventColor = ConsoleColor.Blue,
                                                               InsertEventColor = ConsoleColor.Green
                                                           }));

    data.Bind(sink).Run();
}

There’s one thing that you may notice … the sink needs to know all of the details about the data class. This is far from ideal … and one of the things that I found so powerful about the untyped adapter model – you weren’t tied to the schema of your data classes. There are various ways that we can handle this but that’s a topic for the next entry. Until then, you can download the code from my SkyDrive.