Ruminations of J.net idle rants and ramblings of a code monkey

Simple StreamInsight App

StreamInsight | Code Sample

In a previous post, I talked about how to change one of the StreamInsight sample apps to run using a remote StreamInsight server. After a discussion with the person that asked the original question, I’ve put together a full sample app that can run either in process or connect to a remote server. Most of the code is completely identical … input adapters, output adapters, queries … as that part of the programming model is the same. There are, however, some differences between what you need to do for this kind of app and what you typically see in the samples. I’ve tried to include most, if not all, of the server/application bells and whistles to make it easy to play with.

Let’s start with the common settings for the sample application. First, the streamInsightInstanceName should point to the active and installed instance. If starting out of process, this is used to build the url to connect to the management service. It probably would be better to have the full url there when you are running remote … but I didn’t do it. Next, you need to specify the streamInsightAppName, which determines the name of the application to create or connect to.

Creating the Server – In Process

First, set the runInProcess key in the app.config to true. If you want to use the metadata service that was introduced in 1.1, set useMetadata to true. When creating the server, you need to specify the metadata service at startup. If using the metadata overload, it cannot be null. A final note, in StreamInsight 1.2, a new feature called checkpointing is being introduced. This feature allows for higher availability of StreamInsight instances; certain queries can have their entire state written out to disk at some interval and allows for long-running queries to be reconstituted on startup (instead of losing all that information). Checkpointing relies on the metadata services.

Server cepServer = AppSettings.Current.UseMetadata ?
Server.Create(AppSettings.Current.StreamInsightInstanceName, GetMetadataService()) :Server.Create(AppSettings.Current.StreamInsightInstanceName)

When running in process, you can also fire up the management service, which allows the Event Flow Debugger (and other tools) to connect to your StreamInsight instance. If a value is present in the managementServiceUrl settings key, this will be created. At this time, only the WSHttpBinding is supported. Note: you will need to run Visual Studio as admin -or- reserve whatever url you are going to use with WCF. See this blog post for details on that. I, personally, run VS as admin since it makes my life easier in this regard.

ServiceHost managementServiceHost = new ServiceHost(cepServer.CreateManagementService());//Adding the service host. 
//This allows remote clients to access this application, including the Query Debugger.managementServiceHost.AddServiceEndpoint(
typeof(IManagementService),
new WSHttpBinding(SecurityMode.Message), AppSettings.Current.ManagementServiceUrl);
managementServiceHost.Open();

Creating the Server – Remote

Since there are fewer knobs and switches for a remote instance, connecting to a running server is much simpler. With the sample app, it will connect to the standard url for the StreamInsight instance name specified in the config file. However, if you have the management service started on a custom in-process instance of StreamInsight, you will also be able to connect to that. By the way, tThe model used for the recently announced Azure SteamInsight service is the remote model.

var endpointAddress = new System.ServiceModel.EndpointAddress(
@"http://localhost/StreamInsight/" + AppSettings.Current.StreamInsightInstanceName);Server cepServer = Server.Connect(endpointAddress);

Getting the Application

In most of the demos/samples out there, not only is it running in process, it’s also running without metadata services. In those cases, you always need to create the application. However, you don’t always need to do this. If connecting to a remote instance or if you are using the metadata service, it is very possible that the application is already there. So we need to check before we grab the application object.

private static Application GetApplication(Server cepServer)
{
if (!cepServer.Applications.ContainsKey(AppSettings.Current.StreamInsightAppName))
{
Console.WriteLine("Creating new Cep Application");return cepServer.CreateApplication(AppSettings.Current.StreamInsightAppName);
}
else{
Console.WriteLine("Connecting to existing Cep Application");return cepServer.Applications[AppSettings.Current.StreamInsightAppName];
}
}

Creating the Queries

Creating the queries and attaching the adapters is no different whether you are running in process or out of process. However, we need to take into account that, like with the application object, the queries may already be there. With the remote instance, this is pretty clear. When using metadata, the queries will be created when you create the server. However, they will not be started – so you do need to start them. When starting a query, you need to check to see if it’s already running – you will get an exception if you try to start a query that is already started. There are 3 queries that are created and running using a technique called Dynamic Query Composition. This is actually pretty important to understand – but a topic for another post. One is the raw, source query, one is a hopping window aggregate and one is a snapshot window aggregate. It’s interesting to see the differences between the output of the hopping window aggregate and the snapshot window aggregate. The only input adapter used is the random data generator that is included in the StreamInsight Team Samples. Each query is attached to an Async CSV output adapter, again from the StreamInsight Team Samples (not required – you can have a standing, running query without an output adapter).

I’ve uploaded the sample to MSDN Code Samples. You can download it here.