Ruminations of J.net idle rants and ramblings of a code monkey

Dual Mode Data Sources-Part III

StreamInsight | Code Sample

In the two previous postings, we would through creating the data sources and exposing them to both the new Reactive-centric API and the “legacy” adapter-centric API. While we’ve accomplished that, I also said that we’d revisit the 2.1 code to add a layer of consistency to the APIs; we’re going to “bring back” factories and integrate these into our API. Why on earth would I do that? Didn’t we just get rid of them to make things simpler? Well, yes, we did just do away with the requirement of a factory but that doesn’t mean that they aren’t a good idea. Implementing the factory pattern will hide the details of creating the actual data source – our current one is pretty simple but others may get quite complex – and provide a layer to “check” requirements before we try to start the query (we’re already doing this with the adapter factory). I also happen to like consistent APIs and keeping consistency whenever practical and possible.

We’ll start by creating an interface. We may – and likely will – also wind up creating an abstract base class that implements the interface and handles common functionality but that will be refactored later after we write some more sources and get a better feel of how to best define the base class. And … having both an interface and a base class gives us the greatest level of flexibility when implementing later on; because our API is based on interfaces, we can inherit from other, existing code and/or components and just add the interface. If we based our API exclusively on base classes, the single-inheritance rule would limit what that. This interface will be pretty simple and only have a single method.

ISourceFactory
/// <summary>
/// Interface for data source factories.
/// </summary>
public interface ISourceFactory
{
/// <summary>
/// Creates an observable source.
/// </summary>
/// <typeparam name="TPayload">Type of the payload.</typeparam>
/// <param name="config">Configuration class.</param>
/// <param name="eventShape">Shape of the events</param>
/// <returns></returns>
IObservable<StreamInputEvent<TPayload>> CreateObservableSource<TPayload>(object config, EventShape eventShape);
}

You may notice a slight different between this and the input adapter factory interface – config is not strongly typed. Yes, we could do that … define a generic interface and then use reflection to call it. But, to be honest, I really didn’t want to go through all of the contortions to make that happen. So the config is an object.

Now, when we implement this on our existing factory, we also refactor things a bit to maximize code reuse.

Adapter + Source Factory
public sealed class TestDataInputFactory : ITypedInputAdapterFactory<TestDataInputConfig>, ISourceFactory
{

[SuppressMessage("Microsoft.Design", "CA1004:GenericMethodsShouldProvideTypeParameter", Justification = "By Design")]
public InputAdapterBase Create<TPayload>(TestDataInputConfig configInfo, EventShape eventShape)
{
CheckPayloadType<TPayload>();
return new ObservableTypedPointInputAdapter<TestDataEvent, TestDataInputConfig>(CreateProducer(configInfo, eventShape));
}

public IObservable<StreamInputEvent<TPayload>> CreateObservableSource<TPayload>(object config, EventShape eventShape)
{
//Check the payload type.
CheckPayloadType<TPayload>();
//Check the config class for the proper type.
TestDataInputConfig typedConfig = config as TestDataInputConfig;
if (typedConfig == null)
{
//Invalid cast
throw new ArgumentException("Configuration class must be of type TestDataInputConfig");
}
return (IObservable<StreamInputEvent<TPayload>>)CreateProducer(typedConfig, eventShape);
}

private static void CheckPayloadType<TPayload>()
{
//Check the payload.
if (typeof(TPayload) != typeof(TestDataEvent))
{
//this won't work.
//throw an exception.
throw new InvalidOperationException("Specified type must be of " + typeof(TestDataEvent).FullName);
}
}


private TestDataProducer CreateProducer(TestDataInputConfig config, EventShape eventShape)
{
switch (eventShape)
{
case EventShape.Point:
//Create the publisher.
return new TestDataProducer(config);
default:
throw new ArgumentException(string.Format(
System.Globalization.CultureInfo.InvariantCulture,
"TestDataInputFactory cannot instantiate adapter with event shape {0}",
eventShape.ToString()));
}
}

public void Dispose()
{
}
}

You’ll notice that both methods … source and adapter … use all of the same validation logic and code. Using this in our existing code isn’t too difficult. Our new “RunProcess()” now looks like the following:

Run Process
private static void RunProcess(Application cepApplication)
{
var config = new TestDataInputConfig (){
NumberOfItems=20,
RefreshInterval=TimeSpan.FromMilliseconds(500)
};


var data = cepApplication.DefineObservable(
() => new TestDataInputFactory().CreateObservableSource<TestDataEvent>(config,EventShape.Point ))
.ToPointStreamable(e => e.GetPointEvent());


var sink = cepApplication.DefineObserver(() => Observer.Create<TestDataEvent>(
e => Console.WriteLine("TestEvent ItemId:{0} Run:{1} Value{2}", e.ItemId, e.RunNumber, e.Value)));

data.Bind(sink).Run();

}

We can still make it better, though, and make make it look more like the pre-2.1 API. We’ll start by creating an RxStream class and add methods for creating observables. We’ll also want to make sure that these methods are remotable – so we can work with either a local or a remote instance without any code changes. This was a challenge with the pre-2.1 API; you could pretty easily get yourself into a situation where your code work work on a local, in-process instance but not work at all when you were connecting to a remote instance. With DefineObservable, however, it’s actually defining a remote function that returns an observable and that’s what we call … whether in process or remote. If it works in-process then it’ll work out of process. However, you may wind up getting yourself into a situation with mysterious serialization errors … you need to make sure that whatever you pass to your methods if fully serializable. That’s why our configuration class has the DataContract attribute. With our CreateObservable method, we’ll first check to see if we have a reference to the function (and notice that it is of type Func<>). If not, we check to see if it’s been created and deployed. If not, we create and deploy it. We could also put this same code in a static constructor – it wouldn’t make much difference. The actual work is done by an InstantiateObservable private method and that’s what our defined Observable actually calls.

RxStream Observables
private static Func<Type, object, EventShape, IQbservable<StreamInputEvent<TPayload>>> _observable;

public static IQbservable<StreamInputEvent<TPayload>> CreateObservable(Application cepApplication,
Type sourceFactoryType,
object configInfo,
EventShape eventShape)
{
string entityName = "Observable:" + typeof (TPayload).FullName;
if (_observable == null)
{
//Check the application.
if (cepApplication.Entities.ContainsKey(entityName))
{
_observable =
cepApplication.GetObservable<Type, object, EventShape, StreamInputEvent<TPayload>>(entityName);
}
else
{
//Define and deploy.
_observable = cepApplication.DefineObservable(
(Type t, object c, EventShape e) => InstantiateObservable(t, c, e));
_observable.Deploy(entityName);
}

}
return _observable.Invoke(sourceFactoryType, configInfo, eventShape);
}

private static IObservable<StreamInputEvent<TPayload>> InstantiateObservable(Type sourceFactoryType,
object configInfo,
EventShape eventShape)
{
var sourceFactory = Activator.CreateInstance(sourceFactoryType) as ISourceFactory;
if (sourceFactory == null)
{
throw new ArgumentException("Specified type is not a source factory.");
}
return sourceFactory.CreateObservableSource<TPayload> (configInfo, eventShape);
}

Now all that we have left to do is to create methods to also create the streams, rather than just the observables. This will be our typical use case but we’ll still keep the observable method public as well to give us the most flexibility when we are actually using this in anger.

RxStream Create
public static IQStreamable<TPayload> Create(
Application cepApplication, Type sourceFactoryType, object configInfo, EventShape eventShape)
{
return Create(cepApplication, sourceFactoryType, configInfo, eventShape, null);
}

public static IQStreamable<TPayload> Create(
Application cepApplication, Type sourceFactoryType, object configInfo, EventShape eventShape, AdvanceTimeSettings advanceTimeSettings)
{

var observable = CreateObservable(cepApplication, sourceFactoryType, configInfo, eventShape);

switch (eventShape)
{
case EventShape.Interval:
return observable.ToIntervalStreamable(e => e.GetIntervalEvent(), advanceTimeSettings);

case EventShape.Edge:
return observable.ToEdgeStreamable(e => e.GetEdgeEvent(), advanceTimeSettings);

case EventShape.Point:
return observable.ToPointStreamable(e => e.GetPointEvent(), advanceTimeSettings);

default:
throw new ArgumentOutOfRangeException("eventShape");
}
}

Once this is in place, the code to create our stream look remarkably similar to the adapter-centric code:

Creating the Stream
var config = new TestDataInputConfig (){
NumberOfItems=20,
RefreshInterval=TimeSpan.FromMilliseconds(500)
};


var data = RxStream<TestDataEvent>.Create(cepApplication, typeof (TestDataInputFactory), config, EventShape.Point);

There’s still more that we’ll need to do … for example, we’ll need to create overloads to import CTIs. But, for now, we’re done with the core API for our input adapters and will be moving on to output adapters/sinks in the next article.

You can download the current from SkyDrive

Comments are closed