Azure Stream Analytics Lag (and LAG)

Recently, I started to look at Azure PaaS model for MI/BI/Analytics solutions for one of my low latency\high throughput based projects here at work. After much thought and considering options like Storm, Redis Cache and Stream Analytics etc. I came to the conclusion that the latter is the most suitable technology for my requirements. So off I went and started to prototype it, experience has been largely pleasant except a few things which I will mention shortly here in this blog.

Ok, my key requirement for this was to analyse the data stream as it flows through and highlight the confidence of the applicant who is giving the driving theory test (for IPR reasons I’ve changed the example here), my objective is to find the applicants who are not confident of their answers when sitting for the theory driving test. Now this analysis in itself can be a major subject of discussion but I will leave it for another day and focus of the technology side of the things here rather. After looking at the examples on the web for this (like here) I was convinced that I could use inner join in the query to achieve what I wanted to achieve for my requirements (which are described below) until I bumped into another function called LAG which is discussed later.

Here are the requirements-

Data is received in the JSON format as described below-

{
 "ActivityId": "c8c64b10-f029-4dc6-93c5-31373ed72a31",
 "Timestamp": "2015-05-15T08:04:26.9445222Z",
 "ProcessingServer": "RD000D3A21C503",
 “ApplicantId”: “C0001”,
 "Attributes": {
 “BrakingDistanceAt20MPH”: “40”,
 “BrakingDistanceAt30MPH”: “75”
 }, 
{
 "ActivityId": "c8c64b10-f029-4dc6-93c5-31373ed72a31",
 "Timestamp": "2015-05-15T08:04:26.9445222Z",
 "ProcessingServer": "RD000D3A21C503",
 “ApplicantId”: “C0001”,
 "Attributes": {
 “BrakingDistanceAt20MPH”: “40”,
 “BrakingDistanceAt30MPH”: “80”
 }

Now in the above example if someone is changing the answers often as he\she is not sure of it then we would like to highlight that or even dynamically load questions which are more focused on this specific area. Technically, we want to emit an event every time an applicant changes the answer within certain time window (lets say within an hour in our case). That event will then be stored in cache along with the past events to provide the single applicant view at any point in time.

One way to achieve this in Stream Analytics is to use inner self join like this (inspired by this blog)

SELECT
 System.Timestamp as ReceivedOn,
 QMM1.ApplicantID,
 QMM1.Attributes.BrakingDistanceAt20MPH AS FromBrakingDistanceAt20MPH,
 QMM2.Attributes.BrakingDistanceAt20MPH AS ToBrakingDistanceAt20MPH,
 QMM1.Attributes.BrakingDistanceAt30MPH AS FromBrakingDistanceAt30MPH,
 QMM2.Attributes.BrakingDistanceAt30MPH AS ToBrakingDistanceAt30MPH
 FROM Input QMM1 TIMESTAMP BY [TimeStamp]
 JOIN
 Input QMM2 TIMESTAMP BY [TimeStamp]
 ON
 QMM1.ApplicantID = QMM2.ApplicantID
 AND
 DATEDIFF(ss, QMM1, QMM2) BETWEEN 1 AND 3600 -- for 1 hour
 WHERE
 QMM1.Attributes.BrakingDistanceAt20MPH != QMM2.Attributes.BrakingDistanceAt20MPH
 OR 
 QMM1.Attributes.BrakingDistanceAt30MPH != QMM2.Attributes.BrakingDistanceAt30MPH

This will work as expected but there are two minor issues with this-

  1. Output is not relative i.e. it shows the state change from initial state to final state i.e. 75->80, 75->85 for 30mph braking distance (ideally I would need 75->80, 80-85 mph, see the output below)
  2. Output is not real-time (reason is not known, possibly optimisation), I could only see the emitted events in the sink after 2-3 minutes which would defeat the purpose of using Stream Analytics\Event Hub for my scenario. This is recently explained by Zhong Chen here.

Output from the above query looks like this-

receivedon
applicantid
frombrakingdistanceat20mph
tobrakingdistanceat20mph
frombrakingdistanceat30mph
tobrakingdistanceat30mph
2015-05-15T08:05:26.944Z C0001 40 40 75 80
2015-05-15T08:05:26.944Z C0001 40 40 75 85

Here comes the help from another analytic function in Stream Analytics called LAG. This function compares your record for certain field(s) with the previous record in the defined time window and emits the event when the difference is found. This is exactly what I wanted  and the syntax is very concise as well so started to get on with it. Here’s the query-

WITH FlatInput
 AS
 (
 SELECT ApplicantID, 
 Attributes.BrakingDistanceAt20MPH AS BrakingDistanceAt20MPH, 
 Attributes.BrakingDistanceAt30MPH AS BrakingDistanceAt30MPH
 FROM Input
 )
SELECT ApplicantID, BrakingDistanceAt20MPH AS ToBrakingDistanceAt20MPH,
  LAG(BrakingDistanceAt20MPH) OVER (LIMIT DURATION(ss, 3600)) as FromBrakingDistanceAt20MPH,
  BrakingDistanceAt30MPH AS ToBrakingDistanceAt30MPH,
  LAG(BrakingDistanceAt30MPH) OVER (LIMIT DURATION(ss, 3600)) as FromBrakingDistanceAt30MPH
 FROM FlatInput

This was not as simple as it seems here, problem is that LAG function does not like qualified fields (or nested fields) in JSON i.e. I could not use Attributes.BrakingDistanceAt20MPH is LAG function above directly, it will either throw the error saying invalid field name or give you a compile time error. This error is a bit misleading though as the field name is perfectly valid qualified name so I spoke to the MS CSA (Rupert Benbrook) who then asked me to try flattening the rows first using WITH and then use the aliases in LAG function as a second query (as defined above)  and guess what it started to work. So this is something to keep in mind when you use LAG function.

Here is the output of the above query, displaying the relative changes in output (30mph) now-

applicantid
tobrakingdistanceat20mph
frombrakingdistanceat20mph
tobrakingdistanceat30mph
frombrakingdistanceat30mph
C0001 40 75
C0001 40 40 80 75
C0001 40 40 85 80

In the end, I resorted to LAG function which is a much cleaner way to emit the events as per my requirements. Hope this helps someone who is running with the similar activity.