Scala - Apache Flink: Creating a Lagged DataStream


I am starting out with Apache Flink using Scala. Can anyone please tell me how to create a lagged stream (lagged by k events or by k units of time) from the current DataStream I have?

Basically, I want to implement an auto-regression model (linear regression on the stream and a time-lagged version of itself) on the data stream. So I need a method similar to the following pseudo code:

    val ds: DataStream[Double] = ...
    val laggedDs: DataStream[Double] = lag(ds, Time.seconds(2))

    def lag[T](ds: DataStream[T], k: Time): DataStream[T] = {
      // to be implemented
    }

I expect the following sample input and output if every event is spaced at a 1-second interval and there is a 2-second lag:

input : 1, 2, 3, 4, 5, 6, 7...
output: NA, NA, 1, 2, 3, 4, 5...
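To pin down the expected semantics independently of Flink, here is a plain-Scala sketch on a finite sequence. `lagBy` and `fitLag` are hypothetical helper names, and "NA" is modeled as `None`. The second function is an assumption about how the lagged values would be used: an ordinary least-squares fit of x(t) = a + b * x(t - k), the simplest form of the auto-regression described above.

```scala
object LagSemantics {
  // Shift a finite sequence by k positions. The first k outputs have no
  // predecessor and are None (the "NA" entries in the example).
  def lagBy[A](xs: Seq[A], k: Int): Seq[Option[A]] =
    Seq.fill[Option[A]](math.min(k, xs.size))(None) ++ xs.dropRight(k).map(Some(_))

  // Ordinary least squares for x(t) = a + b * x(t - k), using only positions
  // where a lagged value exists. Returns (a, b). Assumes the lagged values
  // are not all equal (otherwise sxx is 0 and b is NaN).
  def fitLag(xs: Seq[Double], k: Int): (Double, Double) = {
    val pairs = xs.zip(xs.drop(k)) // (x(t - k), x(t))
    val n = pairs.size.toDouble
    val meanX = pairs.map(_._1).sum / n
    val meanY = pairs.map(_._2).sum / n
    val sxy = pairs.map { case (x, y) => (x - meanX) * (y - meanY) }.sum
    val sxx = pairs.map { case (x, _) => (x - meanX) * (x - meanX) }.sum
    val b = sxy / sxx
    (meanY - b * meanX, b)
  }
}
```

For the input 1, 2, ..., 7 with k = 2, `lagBy` reproduces the expected output (None, None, Some(1), ..., Some(5)), and `fitLag` pairs (1, 3), (2, 4), ..., (5, 7), giving a = 2.0 and b = 1.0.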

If I got your requirements right, you can implement a FlatMapFunction with a FIFO queue. The queue buffers k events and emits its head whenever a new event arrives (i.e., as soon as it would otherwise hold more than k events). In case you need a fault-tolerant streaming application, the queue must be registered as state. Flink will then take care of checkpointing the state (i.e., the queue) and restore it in case of a failure.

The FlatMapFunction could look like this:

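    import org.apache.flink.api.common.functions.FlatMapFunction
    import org.apache.flink.streaming.api.checkpoint.Checkpointed
    import org.apache.flink.util.Collector
    import scala.collection.mutable

    class Lagger[X](val k: Int)
        extends FlatMapFunction[X, X]
        with Checkpointed[mutable.Queue[X]] {

      var fifo: mutable.Queue[X] = new mutable.Queue[X]()

      override def flatMap(value: X, out: Collector[X]): Unit = {
        // add the new element to the queue
        fifo.enqueue(value)
        if (fifo.size == k + 1) {
          // remove the head element (now exactly k events old) and emit it
          out.collect(fifo.dequeue())
        }
      }

      // restore the state after a failure
      override def restoreState(state: mutable.Queue[X]): Unit = { fifo = state }

      // return the state to checkpoint
      override def snapshotState(cid: Long, cts: Long): mutable.Queue[X] = fifo
    }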
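Because the class above needs the Flink runtime, the sketch below models the same buffering and snapshot/restore logic in plain Scala so it can be checked in isolation. `LaggerModel` is a hypothetical stand-in: `flatMap` returns the emitted elements instead of passing them to a `Collector`, and `snapshot`/`restore` play the roles of `snapshotState`/`restoreState`. Copying the queue on snapshot is a choice made in this model, not something taken from the answer.

```scala
import scala.collection.mutable

// Flink-free model of the Lagger: same FIFO logic, but emitted elements are
// returned instead of being passed to a Collector.
class LaggerModel[X](val k: Int) {
  private var fifo: mutable.Queue[X] = mutable.Queue.empty[X]

  def flatMap(value: X): Seq[X] = {
    fifo.enqueue(value)
    // once k + 1 elements are buffered, the oldest one is exactly k events old
    if (fifo.size == k + 1) Seq(fifo.dequeue()) else Seq.empty
  }

  // Copy on snapshot so later processing cannot mutate the saved state.
  def snapshot(): List[X] = fifo.toList

  // Rebuild the queue from a previously saved snapshot.
  def restore(state: List[X]): Unit = { fifo = mutable.Queue(state: _*) }
}
```

With k = 2, feeding 1 to 4 emits 1 and 2 and leaves 3 and 4 buffered. Restoring that snapshot into a fresh instance and feeding 5 to 7 emits 3, 4 and 5, so the combined output matches the expected output from the question, except that nothing (rather than NA) is emitted for the first k events.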

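Returning elements with a time lag is more involved. It requires timer threads for the emission, because the function is only called when a new element arrives; without a timer, a buffered element could not be emitted on time if no further input follows.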
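One way to structure the time-based variant is to tag every element with the time at which it becomes due (arrival time plus the lag) and to release due elements from a timer callback instead of from the per-element function. The plain-Scala sketch below uses a hypothetical `TimeLagger` with times in milliseconds and drives the callback by hand. It assumes arrival timestamps are non-decreasing (as with processing time), so a FIFO queue stays ordered by due time. In a real job the callback would be fired by a timer; newer Flink releases offer timers through `ProcessFunction` (on keyed streams) rather than hand-rolled timer threads, but that wiring is not shown here.

```scala
import scala.collection.mutable

// Buffers elements until lagMillis has passed since their arrival.
// Assumes arrival timestamps are non-decreasing, so the FIFO queue is
// always ordered by due time.
class TimeLagger[A](lagMillis: Long) {
  private val pending = mutable.Queue.empty[(A, Long)] // (value, due time)

  // Called for every incoming element; never emits anything itself.
  def onElement(value: A, arrivalMillis: Long): Unit =
    pending.enqueue((value, arrivalMillis + lagMillis))

  // Called by a timer; releases every element whose due time has passed.
  def onTimer(nowMillis: Long): Seq[A] = {
    val out = mutable.ListBuffer.empty[A]
    while (pending.nonEmpty && pending.head._2 <= nowMillis)
      out += pending.dequeue()._1
    out.toList
  }
}
```

For example, with a 2000 ms lag and elements 1 to 7 arriving once per second, calling `onTimer` right after each arrival yields nothing, nothing, 1, 2, 3, 4, 5. A later timer at 9000 ms then releases 6 and 7 even though no new element arrives, which is exactly what a function that only runs on new input cannot do.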

