scala - Apache Flink: Creating a Lagged DataStream


I am starting out with Apache Flink using Scala. Can anyone please tell me how to create a lagged stream (lagged by k events or by k units of time) from the current DataStream that I have?

Basically, I want to implement an autoregression model (linear regression of the stream on a time-lagged version of itself) on a DataStream. So, a method is needed that is similar to the following pseudo code.

    val ds: DataStream = ...
    val laggedDs: DataStream = ds.map(lag _)
    def lag(ds: DataStream, k: Time): DataStream = {
    }

I expect the sample input and output to look like this if every event is spaced at a 1 second interval and there is a 2 second lag.

input : 1, 2, 3, 4, 5, 6, 7...
output: na, na, 1, 2, 3, 4, 5...

Given that I understood your requirements correctly, I would implement this as a FlatMapFunction with a FIFO queue. The queue buffers k events and emits its head whenever a new event arrives. If you need a fault-tolerant streaming application, the queue must be registered as state. Flink will then take care of checkpointing the state (i.e., the queue) and restore it in case of a failure.

The FlatMapFunction could look like this:

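    import org.apache.flink.api.common.functions.FlatMapFunction
    import org.apache.flink.streaming.api.checkpoint.Checkpointed
    import org.apache.flink.util.Collector
    import scala.collection.mutable

    // X stands for the event type of the stream
    class Lagger(val k: Int)
        extends FlatMapFunction[X, X]
        with Checkpointed[mutable.Queue[X]] {

      var fifo: mutable.Queue[X] = new mutable.Queue[X]()

      override def flatMap(value: X, out: Collector[X]): Unit = {
        // add the new element to the queue
        fifo.enqueue(value)
        if (fifo.size == k + 1) {
          // remove the head element and emit it
          out.collect(fifo.dequeue())
        }
      }

      // restore the state after a failure
      override def restoreState(state: mutable.Queue[X]) = { fifo = state }

      // return the state to checkpoint
      override def snapshotState(cid: Long, cts: Long): mutable.Queue[X] = fifo
    }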
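The queue logic can be tried out without a Flink runtime. The following is a minimal sketch in plain Scala; `CountLag` and `CountLagDemo` are hypothetical names used only for illustration and are not part of Flink. Unlike the function above, which emits nothing for the first k events, this variant returns `None` for them so that the result lines up with the "na" entries in the expected output.

```scala
import scala.collection.mutable

// Hypothetical helper, not a Flink class: the same FIFO logic as above,
// but returning None while fewer than k + 1 events have been seen.
final class CountLag[A](k: Int) {
  private val fifo = mutable.Queue.empty[A]

  // Feed one event; get back the event that arrived k events earlier, if any.
  def push(value: A): Option[A] = {
    fifo.enqueue(value)
    if (fifo.size == k + 1) Some(fifo.dequeue()) else None
  }
}

object CountLagDemo {
  // Runs the sample input from the question through a lag of 2 events.
  def run(): List[Option[Int]] = {
    val lag = new CountLag[Int](2)
    List(1, 2, 3, 4, 5, 6, 7).map(lag.push)
  }
}
```

With k = 2, `CountLagDemo.run()` yields `None, None, Some(1), Some(2), Some(3), Some(4), Some(5)`. Note that the queue never holds more than k + 1 elements, so the state to checkpoint stays small.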

Returning elements with a time lag is more involved. It would require timer threads for the emission, because the function is only called when a new element arrives.
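To illustrate why a separate time source is needed, here is a rough sketch of the buffering side of a time-based lag in plain Scala. `TimeLag`, `add`, and `advanceTo` are hypothetical names, not Flink APIs, and the sketch assumes that events arrive with non-decreasing timestamps in milliseconds. Something outside the class has to call `advanceTo`; in a real job that would be a timer (for instance the timer service of a ProcessFunction in Flink versions that provide one), and that wiring is not shown here.

```scala
import scala.collection.mutable

// Hypothetical helper, not a Flink API: buffers (timestamp, value) pairs and
// releases each value once the supplied clock reaches timestamp + lagMillis.
final class TimeLag[A](lagMillis: Long) {
  private val buffer = mutable.Queue.empty[(Long, A)]

  // Store an event together with its timestamp (assumed non-decreasing).
  def add(timestamp: Long, value: A): Unit =
    buffer.enqueue((timestamp, value))

  // Called by whatever drives time (a timer callback in a real job).
  // Returns all buffered values that are due at time `now`, oldest first.
  def advanceTo(now: Long): List[A] = {
    val due = mutable.ListBuffer.empty[A]
    while (buffer.nonEmpty && buffer.head._1 + lagMillis <= now) {
      due += buffer.dequeue()._2
    }
    due.toList
  }
}
```

For example, with a lag of 2000 ms and events added at 1000, 2000, and 3000 ms, nothing is due at `advanceTo(2000)`, and the first event is released at `advanceTo(3000)`. Emission depends only on the clock, not on the arrival of further events, which is exactly what a plain FlatMapFunction cannot provide.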

