Ruminations of J.net idle rants and ramblings of a code monkey

Specifying CTIs with LinqPad

StreamInsight | Code Sample

I’ve said it before and I’ll say it again … LinqPad is an essential tool for anyone doing StreamInsight applications. And don’t just settle for the free version but get the Pro version (at least) since it has Intellisense. I’m not ashamed to admit that I am completely, totally addicted to Intellisense (which I find somewhat amusing at times because it annoyed me to no end when it first came out in VB 5.0 – but then I got used to it and descended into my current addiction).

With that said (again), one thing that I’ve found a little … oh … less-than-perfect doesn’t have to do with LinqPad but with the way that all of the StreamInsight samples create the streams, which also happened to be how I was creating my streams. Until recently, that is. You see, AdvanceTimeSettings.IncreasingStartTime doesn’t always mirror how we are going to see data in the real world. It also doesn’t allow you to show how CTIs can be used to handle little issues like latency from the source data. To do that, you really need to specify your own CTIs so that you can control (and others can see) exactly where the CTI is issued in relation to the enqueued Insert events. Without that control, you also can’t test/prototype query scenarios where you have multiple events with the same identifier in a single CTI span, or no events at all within a CTI span. Both of these scenarios can, and do, happen in the real world. And … this depends on the adapter … you may want to handle CTIs in your input adapter itself rather than relying on AdvanceTimeSettings. It turns out, however, that specifying your own CTIs is really not that difficult.

Let’s start with how we typically do it. First, we have some source data as an array and a function to create our event timestamp. Then we create the point stream from the array using ToPointStream (or ToIntervalStream or ToEdgeStream). Here’s a code example and the results from LinqPad:

void Main()
{
    Func<int, DateTimeOffset> t =
        (s) => new DateTimeOffset(2011, 1, 11, 8, 0, 0, TimeSpan.Zero).AddSeconds(s);

    var values = new []
    {
        new {Item="Variable1", Value=92, Timestamp=0},
        new {Item="Variable2", Value=60, Timestamp=0},
        new {Item="Variable1", Value=93, Timestamp=2},
        new {Item="Variable2", Value=75, Timestamp=2},
        new {Item="Variable1", Value=88, Timestamp=3},
        new {Item="Variable2", Value=81, Timestamp=3},
        new {Item="Variable1", Value=93, Timestamp=5},
        new {Item="Variable2", Value=82, Timestamp=5}
    };

    var valueStream = values.ToPointStream(Application,
        e => PointEvent.CreateInsert(t(e.Timestamp), new {Item = e.Item, Value = e.Value}),
        AdvanceTimeSettings.IncreasingStartTime);

    valueStream.ToPointEnumerable().Dump("Results");
}

Results

IEnumerable<PointEvent<>> (13 items)

EventKind   StartTime                 Payload
Insert      1/11/2011 8:00:00 AM      { Item = Variable1, Value = 92 }
Cti         1/11/2011 8:00:00 AM      null
Insert      1/11/2011 8:00:00 AM      { Item = Variable2, Value = 60 }
Insert      1/11/2011 8:00:02 AM      { Item = Variable1, Value = 93 }
Cti         1/11/2011 8:00:02 AM      null
Insert      1/11/2011 8:00:02 AM      { Item = Variable2, Value = 75 }
Insert      1/11/2011 8:00:03 AM      { Item = Variable1, Value = 88 }
Cti         1/11/2011 8:00:03 AM      null
Insert      1/11/2011 8:00:03 AM      { Item = Variable2, Value = 81 }
Insert      1/11/2011 8:00:05 AM      { Item = Variable1, Value = 93 }
Cti         1/11/2011 8:00:05 AM      null
Insert      1/11/2011 8:00:05 AM      { Item = Variable2, Value = 82 }
Cti         12/31/9999 11:59:59 PM    null

Most of the samples filter the CTIs out from the dump but I like to see them (always). Of course, since this post is about CTIs, we definitely need to see them. If you take a look at the results, you’ll see that the CTIs aren’t exactly where you might expect them to be. When you use IncreasingStartTime, the engine “watches” for a new start time to be enqueued with an event. It then enqueues a CTI with that new event’s start time. The next event – with the same start time – is in the next CTI span. So each CTI span has events with two different start times!
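
To make that rule a little more concrete, here is a tiny plain C# simulation with no StreamInsight involved at all. It is only a model of what the dump above shows, not how the engine is actually implemented, and the names CtiSimulation and Simulate are ones I made up for this sketch. It takes the event start times (in seconds) and places a CTI right after any event whose start time is newer than the last CTI. It leaves out the final CTI at 12/31/9999 that shows up when the stream shuts down.

```csharp
using System;
using System.Collections.Generic;

public static class CtiSimulation
{
    // Hypothetical helper, not part of StreamInsight: mimics the CTI placement
    // seen in the dump for a sequence of event start times (in seconds).
    public static List<string> Simulate(IEnumerable<int> startTimes)
    {
        var output = new List<string>();
        int? lastCti = null;

        foreach (int start in startTimes)
        {
            // The event itself is enqueued first ...
            output.Add("Insert " + start);

            // ... and a start time newer than the last CTI triggers a CTI
            // stamped with that same start time.
            if (lastCti == null || start > lastCti.Value)
            {
                output.Add("Cti " + start);
                lastCti = start;
            }
        }

        return output;
    }

    public static void Main()
    {
        // Same start times as the sample data: two events each at 0, 2, 3 and 5 seconds.
        foreach (string line in Simulate(new[] { 0, 0, 2, 2, 3, 3, 5, 5 }))
        {
            Console.WriteLine(line);
        }
    }
}
```

Run against the sample start times, each CTI lands between the two events that share a start time, which is the same interleaving you see in the dump.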

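Let’s change it around a bit. Instead of passing the prebuilt AdvanceTimeSettings.IncreasingStartTime, you can construct your own AdvanceTimeSettings and pass that to ToPointStream, which gives you more control over your CTIs. Here we ask for a CTI every 2 seconds with no delay. Changing the code around a bit, we certainly get different results: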

AdvanceTimeSettings ats = new AdvanceTimeSettings(
    new AdvanceTimeGenerationSettings(
        TimeSpan.FromSeconds(2), TimeSpan.FromSeconds(0)),
    null,
    AdvanceTimePolicy.Drop);

var valueStream = values.ToPointStream(Application,
    e => PointEvent.CreateInsert(t(e.Timestamp), new {Item = e.Item, Value = e.Value}),
    ats, "Values");

Values

IEnumerable<PointEvent<>> (11 items)

EventKind   StartTime                 Payload
Insert      1/11/2011 8:00:00 AM      { Item = Variable1, Value = 92 }
Cti         1/11/2011 8:00:00 AM      null
Insert      1/11/2011 8:00:00 AM      { Item = Variable2, Value = 60 }
Insert      1/11/2011 8:00:02 AM      { Item = Variable1, Value = 93 }
Cti         1/11/2011 8:00:02 AM      null
Insert      1/11/2011 8:00:02 AM      { Item = Variable2, Value = 75 }
Insert      1/11/2011 8:00:03 AM      { Item = Variable2, Value = 81 }
Insert      1/11/2011 8:00:03 AM      { Item = Variable1, Value = 88 }
Insert      1/11/2011 8:00:05 AM      { Item = Variable1, Value = 93 }
Cti         1/11/2011 8:00:05 AM      null
Insert      1/11/2011 8:00:05 AM      { Item = Variable2, Value = 82 }

It is different and it’s also close to what I want. But there are still events in there that have mixed start times within the same CTI window. Now, the example above isn’t quite fair … if I add a touch of a delay into the AdvanceTimeSettings, the CTIs begin to look more like what I expect. But … if you look above, we still have a patch of events with different start times. And, notice, the CTIs don’t come every 2 seconds like clockwork. Instead, a CTI is enqueued only when an event arrives with a start time far enough past the previous CTI (here the CTIs land at 0, 2 and 5 seconds). The only way to resolve it is to take complete control over the CTIs … so we add them into the source data. We don’t have to specify any AdvanceTimeGenerationSettings since, of course, we are enqueuing the CTIs manually. Which gives us the following code:

void Main()
{
    Func<int, DateTimeOffset> t =
        (s) => new DateTimeOffset(2011, 1, 11, 8, 0, 0, TimeSpan.Zero).AddSeconds(s);

    var values = new []
    {
        new {Item="Variable2", Value=60, Timestamp=0},
        new {Item="CTI", Value=60, Timestamp=0},
        new {Item="Variable1", Value=93, Timestamp=2},
        new {Item="Variable2", Value=75, Timestamp=2},
        new {Item="Variable1", Value=88, Timestamp=3},
        new {Item="Variable2", Value=81, Timestamp=3},
        new {Item="CTI", Value=60, Timestamp=3},
        new {Item="Variable1", Value=93, Timestamp=5},
        new {Item="Variable2", Value=82, Timestamp=5},
        new {Item="CTI", Value=60, Timestamp=5}
    };

    var sourceData = values.ToPointStream(Application,
        e => e.Item != "CTI"
            ? PointEvent.CreateInsert(t(e.Timestamp), new Payload(){Item=e.Item, Value=e.Value})
            : PointEvent<Payload>.CreateCti(t(e.Timestamp).AddTicks(1)));

    sourceData.ToPointEnumerable().Dump("Results");
}

// Define other methods and classes here
public struct Payload
{
    public string Item;
    public int Value;
}

Note that we aren’t using an anonymous type for the stream because we can’t. You’ll get a compile error if you do, since PointEvent<Payload>.CreateCti needs the payload type spelled out. Also, the method that we’re using isn’t very reusable; we’ll wind up writing the same thing over and over again, tweaked for whatever data we happen to have. Finally, I’m not really thrilled about the clarity. But we can kick this up a notch and use a fully reusable method that handles creating the events and can use anonymous types, thanks to the wonderful goodness that is lambdas. Check it out:

public static PointEvent<TPayload> GetPointEvent<TPayload, TSource>(
    TSource source,
    Func<TSource, bool> ctiSelectExpression,
    Func<TSource, TPayload> payloadSelectExpression,
    Func<TSource, DateTimeOffset> eventTimeExpression)
{
    bool isCti = ctiSelectExpression.Invoke(source);
    if (isCti)
    {
        return PointEvent<TPayload>.CreateCti(eventTimeExpression.Invoke(source));
    }
    return PointEvent<TPayload>.CreateInsert(eventTimeExpression.Invoke(source),
        payloadSelectExpression.Invoke(source));
}

We can then use this in ToPointStream:

var sourceData = values.ToPointStream(Application,
    e => GetPointEvent(e,
        i => i.Item == "CTI",
        i => new {Item = i.Item, Value = i.Value},
        i => t(i.Timestamp)));

The output is the same as the first method but, in this case, it is more reusable and I find it a touch simpler.
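
If typing the "CTI" marker rows into the source array by hand gets tedious, something like the following could generate them. This is just a sketch in plain C# and nothing in it comes from StreamInsight: Reading is a made-up named type standing in for the anonymous type above, and WithCtiMarkers is a hypothetical helper. It assumes the rows are already in time order and drops one marker after each run of rows that share a Timestamp, which is only one of the CTI layouts you might want to test.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical row type standing in for the anonymous type used in the post.
public class Reading
{
    public string Item;
    public int Value;
    public int Timestamp;
}

public static class CtiMarkers
{
    // Hypothetical helper: appends one "CTI" marker row after each run of
    // consecutive rows that share a Timestamp. Assumes rows are in time order.
    public static List<Reading> WithCtiMarkers(IEnumerable<Reading> rows)
    {
        var result = new List<Reading>();
        Reading previous = null;

        foreach (Reading row in rows)
        {
            // The timestamp changed, so close out the previous run with a marker.
            if (previous != null && row.Timestamp != previous.Timestamp)
            {
                result.Add(new Reading { Item = "CTI", Value = 0, Timestamp = previous.Timestamp });
            }
            result.Add(row);
            previous = row;
        }

        // Close out the final run, if there was any data at all.
        if (previous != null)
        {
            result.Add(new Reading { Item = "CTI", Value = 0, Timestamp = previous.Timestamp });
        }

        return result;
    }

    public static void Main()
    {
        var rows = new[]
        {
            new Reading { Item = "Variable1", Value = 93, Timestamp = 2 },
            new Reading { Item = "Variable2", Value = 75, Timestamp = 2 },
            new Reading { Item = "Variable1", Value = 88, Timestamp = 3 }
        };

        foreach (Reading r in WithCtiMarkers(rows))
        {
            Console.WriteLine(r.Item + " @ " + r.Timestamp);
        }
    }
}
```

The resulting list could then be handed to ToPointStream with GetPointEvent in the same way as the hand-written array, using i => i.Item == "CTI" to pick out the markers.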