Ruminations of idle rants and ramblings of a code monkey

StreamInsight Dynamic Filter Query Patterns

StreamInsight | Code Sample

A little while ago, I wrote a post about doing AdHoc Query Filters in StreamInsight. In the sample code, I accomplished the filter by using a user-defined function. This caused quite a bit of ugliness that I didn’t like but it did work. Since then, I’ve reworked and extended the filtering and I wanted to revisit it here. I’ve moved all of the query logic out of the user-defined function and into the StreamInsight engine, bypassing things like Split() and Join() as well as improving the overall performance characteristics.

The requirements are a bit expanded. While we still have a base DataItem, we’ve added another field to the key – AdapterId. So a data item’s uniqueness is defined by the combination of the ItemId and AdapterId. DataItem now looks like the following:

/// <summary>
/// Base class for all query data points
/// </summary>
public class DataItem
{
    /// <summary>
    /// ID of the source item
    /// </summary>
    public string ItemId { get; set; }

    /// <summary>
    /// Identifier for the source adapter.
    /// </summary>
    public string AdapterId { get; set; }

    /// <summary>
    /// Time item was received/calculated
    /// </summary>
    public DateTime SourceTimestamp;
}

This is the base data type for all of our query data types. When we do dynamic filters, we want to be able to include the AdapterId in the filter query. If present, it should match the item’s AdapterId exactly. If missing, all AdapterIds should match. A single filter could also have a number of groups included … some items from Adapter1, some items from Adapter2 and some from all adapters, which adds a ton of flexibility but also some complexity. The filters are defined by a FilterDefinition class, which has a property for the filter operation (equals, starts with, ends with, contains) and an array of ItemKey classes. Each ItemKey has the FilterValue (for the item id), the AdapterId (which can be null) and a GroupId. The group id is really only needed for the Contains query. You’ll see why in a bit.

Our filter definition classes look like the following:

public enum FilterOperation
{
    // Members are elided in the original listing; the operations are
    // equals, starts with, ends with and contains.
}

/// <summary>
/// Defines a filter
/// </summary>
public class FilterDefinition
{
    /// <summary>
    /// Operation to perform
    /// </summary>
    public FilterOperation Operation { get; set; }

    /// <summary>
    /// List of item keys.
    /// </summary>
    public ItemKey[] ItemKeys { get; set; }
}

public class ItemKey
{
    /// <summary>
    /// Source adapter for the item
    /// </summary>
    public string AdapterId { get; set; }

    /// <summary>
    /// ID for the item.
    /// </summary>
    public string FilterValue { get; set; }

    /// <summary>
    /// Gets or sets the group id.
    /// </summary>
    /// <value>The group id.</value>
    public int GroupId { get; set; }
}
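
To make the shape of a filter a bit more concrete, here is a small sketch of a key list that mixes adapter-specific keys with an all-adapters key. ItemKey is redeclared so the snippet stands alone; the SampleFilters class, the adapter names and the filter values are all made up for illustration and are not from the sample project.

```csharp
using System;

public class ItemKey
{
    public string AdapterId { get; set; }
    public string FilterValue { get; set; }
    public int GroupId { get; set; }
}

public static class SampleFilters
{
    // Hypothetical keys: group 1 targets Adapter1, group 2 targets Adapter2,
    // and group 3 leaves AdapterId null so it applies to every adapter.
    public static ItemKey[] BuildKeys()
    {
        return new[]
        {
            new ItemKey { AdapterId = "Adapter1", FilterValue = "Pump", GroupId = 1 },
            new ItemKey { AdapterId = "Adapter1", FilterValue = "Temp", GroupId = 1 },
            new ItemKey { AdapterId = "Adapter2", FilterValue = "Valve", GroupId = 2 },
            new ItemKey { AdapterId = null, FilterValue = "Alarm", GroupId = 3 }
        };
    }
}
```

For the OR-style operations the GroupId values are just along for the ride; they only start to matter once a group has to match as a whole.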

So far, pretty straightforward. The code for the equals, starts with and ends with operations is easy to write using an equijoin. The starts with version is below. Equals and ends with look pretty much the same.

private static CepStream<T> StartsWithFilter<T>(CepStream<T> source, CepStream<ItemKey> filterStream)
    where T : DataItem
{
    var filtered =
        from s in source
        from f in filterStream
        where s.ItemId.StartsWith(f.FilterValue)
        // A missing AdapterId on the filter matches every adapter.
        where String.IsNullOrWhiteSpace(f.AdapterId) || f.AdapterId == s.AdapterId
        select s;
    return filtered;
}
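
If you want to poke at the matching rules without a StreamInsight server, the same query shape can be run over in-memory lists. This is only a sketch with plain LINQ to Objects: it ignores event time entirely, and the OrFilterSketch class and the trimmed-down DataItem and ItemKey types are stand-ins declared here for illustration.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

public class DataItem
{
    public string ItemId { get; set; }
    public string AdapterId { get; set; }
}

public class ItemKey
{
    public string AdapterId { get; set; }
    public string FilterValue { get; set; }
    public int GroupId { get; set; }
}

public static class OrFilterSketch
{
    // In-memory stand-in for the join: an item is returned once for every
    // key whose FilterValue it starts with and whose AdapterId is either
    // empty (matches all adapters) or equal to the item's AdapterId.
    public static List<DataItem> StartsWithFilter(
        IEnumerable<DataItem> source, IEnumerable<ItemKey> keys)
    {
        var keyList = keys.ToList();
        var filtered =
            from s in source
            from f in keyList
            where s.ItemId.StartsWith(f.FilterValue, StringComparison.Ordinal)
            where String.IsNullOrWhiteSpace(f.AdapterId) || f.AdapterId == s.AdapterId
            select s;
        return filtered.ToList();
    }
}
```

Because this is a join, an item that matches two keys comes back twice, which is exactly the "OR" behavior: one matching key is enough to let an item through.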

Implementing contains, however, was a bit more challenging. Since Equals, Starts With and Ends With are all logical “OR” filters, a simple equijoin works. With the Contains operation, however, it should be an “AND” operation … all of the FilterValues in a particular group should match, not just one. Because of that, an equijoin, which would return every item that matched at least one of the filter values, doesn’t work. This query is a bit more complex and takes several steps but it’s still very doable. There are a couple of ways to do it but I’ll go step by step through the way that I’m showing in the demo.

  1. Filter the source query to find all candidate matches. Note that these won’t be our final matches; this returns all items that match any one of the filter values. Since they need to match all of them, we have more work to do.
  2. Group the filter stream to determine how many matches each group should have. You will need to make sure that the FilterValues are distinct within a specific group for this to work properly. In a real-world application, the filter values would most likely come from an input adapter, which would have to ensure the uniqueness. In the demo, I’m using Array.ToPointStream() … so I can use the LINQ Distinct() to ensure this.
  3. Filter the initial candidate matches to pull out only the items that match all of the filter values in a specific grouping.

//Get all initial candidate matches.
var initialMatches = from s in source
                     from v in filterStream
                     where s.ItemId.Contains(v.FilterValue)
                     where s.AdapterId == v.AdapterId || String.IsNullOrWhiteSpace(v.AdapterId)
                     select new
                     {
                         MatchedValue = v.FilterValue,
                         GroupId = v.GroupId,
                         ItemId = s.ItemId,
                         AdapterId = s.AdapterId
                     };

//Get the required number of matches per group.
var requiredMatchCounts = from v in filterStream
                          group v by v.GroupId
                          into adapterGroup
                          from a in adapterGroup.SnapshotWindow(SnapshotWindowOutputPolicy.Clip)
                          select new
                          {
                              GroupId = adapterGroup.Key,
                              CountRequired = a.Count()
                          };

//Count the number of matches.
var countMatches = from i in initialMatches
                   group i by new { i.ItemId, i.GroupId, i.AdapterId } into c
                   from v in c.SnapshotWindow(SnapshotWindowOutputPolicy.Clip)
                   select new
                   {
                       AdapterId = c.Key.AdapterId,
                       GroupId = c.Key.GroupId,
                       ItemId = c.Key.ItemId,
                       Count = v.Count()
                   };

var matched =
    from c in countMatches
    from r in requiredMatchCounts
    where c.GroupId == r.GroupId
    where c.Count == r.CountRequired
    //Select
    select new { Adapter = c.AdapterId, ItemId = c.ItemId };

var final = from g in matched
            from s in source
            where g.Adapter == s.AdapterId
            where g.ItemId == s.ItemId
            select s;
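
The three steps are easier to reason about when you can run them over plain lists. Here is a minimal sketch with LINQ to Objects that follows the same candidate / required count / compare structure. It is not StreamInsight code: there are no snapshot windows or event times, and the ContainsFilterSketch class and the trimmed-down DataItem and ItemKey types are assumptions declared here for illustration. Unlike the stream query, it also de-duplicates the matched keys so that each source item comes back once.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

public class DataItem
{
    public string ItemId { get; set; }
    public string AdapterId { get; set; }
}

public class ItemKey
{
    public string AdapterId { get; set; }
    public string FilterValue { get; set; }
    public int GroupId { get; set; }
}

public static class ContainsFilterSketch
{
    public static List<DataItem> ContainsAll(
        IEnumerable<DataItem> source, IEnumerable<ItemKey> keys)
    {
        var sourceList = source.ToList();

        // Precondition for step 2: keys must be distinct within a group.
        var distinctKeys = keys
            .GroupBy(k => new { k.GroupId, k.AdapterId, k.FilterValue })
            .Select(g => g.First())
            .ToList();

        // Step 1: candidates that match at least one filter value.
        var initialMatches =
            from s in sourceList
            from v in distinctKeys
            where s.ItemId.Contains(v.FilterValue)
            where s.AdapterId == v.AdapterId || String.IsNullOrWhiteSpace(v.AdapterId)
            select new { v.GroupId, s.ItemId, s.AdapterId };

        // Step 2: how many matches each group requires.
        var required = distinctKeys
            .GroupBy(k => k.GroupId)
            .ToDictionary(g => g.Key, g => g.Count());

        // Step 3: keep only item/group pairs that hit every value in the group.
        var matched = initialMatches
            .GroupBy(m => new { m.ItemId, m.GroupId, m.AdapterId })
            .Where(g => g.Count() == required[g.Key.GroupId])
            .Select(g => new { g.Key.AdapterId, g.Key.ItemId })
            .Distinct()
            .ToList();

        // Join back to the source to return the full items.
        return sourceList
            .Where(s => matched.Contains(new { s.AdapterId, s.ItemId }))
            .ToList();
    }
}
```

With a group made of "Pump" and "Temp" on one adapter, an item named "PumpTemp01" on that adapter passes, while "Pump02" does not because it only hits one of the two values.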

Finally, the implementation uses an extension method on the standard StreamInsight Query object. Since this is designed specifically to use dynamic query composition (DQC), it is very simple and straightforward to use.

Query sourceQuery = cepApplication.Queries["SourceQuery"];
CepStream<ValueDataItem<float>> filteredSourceStream =
    sourceQuery.Filter<ValueDataItem<float>>(filterDefinition);
Query filteredSourceQuery = filteredSourceStream.ToQuery(cepApplication, "FilteredSourceQuery", "Filtered Source Query",
    typeof (TracerFactory), GetTracerConfig("Source"), EventShape.Point,


You can download the sample project from MSDN Code Samples.