scala - Apache Flink: Creating a Lagged DataStream
I am starting out with Apache Flink using Scala. Can anyone please tell me how to create a lagged stream (lagged by k events or k units of time) from the current DataStream that I have?
Basically, I want to implement an autoregression model (linear regression of the stream on a time-lagged version of itself) on a data stream. So, a method is needed that is similar to the following pseudocode.
    val ds: DataStream = ...
    val laggedDS: DataStream = ds.map(lag _)

    def lag(ds: DataStream, k: Time): DataStream = { }
I expect the sample input and output to look like this if every event is spaced at a 1 second interval and there is a 2 second lag.
input : 1, 2, 3, 4, 5, 6, 7...
output: na, na, 1, 2, 3, 4, 5...
Given that I got your requirements right, I would implement this as a FlatMapFunction with a FIFO queue. The queue buffers k events and emits the head whenever a new event arrives. In case you need a fault-tolerant streaming application, the queue must be registered as state. Flink will then take care of checkpointing the state (i.e., the queue) and restore it in case of a failure.
The FlatMapFunction could look like this:
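    import org.apache.flink.api.common.functions.FlatMapFunction
    import org.apache.flink.streaming.api.checkpoint.Checkpointed
    import org.apache.flink.util.Collector
    import scala.collection.mutable

    // X is a placeholder for the event type of the stream
    class Lagger(val k: Int)
        extends FlatMapFunction[X, X]
        with Checkpointed[mutable.Queue[X]] {

      var fifo: mutable.Queue[X] = new mutable.Queue[X]()

      override def flatMap(value: X, out: Collector[X]): Unit = {
        // add new element to queue
        fifo.enqueue(value)
        if (fifo.size == k + 1) {
          // remove head element and emit
          out.collect(fifo.dequeue())
        }
      }

      // restore state
      override def restoreState(state: mutable.Queue[X]) = { fifo = state }

      // get state to checkpoint
      override def snapshotState(cid: Long, cts: Long): mutable.Queue[X] = fifo
    }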
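To see what the queue logic in the class above does to the sample from the question, here is a minimal sketch in plain Scala, without any Flink dependency. The names `LagByCountSketch` and `lagByCount` are hypothetical and only serve the illustration; the loop body mirrors the enqueue/dequeue step of the flatMap method.

```scala
import scala.collection.mutable

object LagByCountSketch {
  // Hypothetical helper: runs the FIFO logic over a finite sequence
  // instead of a Flink stream.
  def lagByCount[A](events: Seq[A], k: Int): Seq[A] = {
    val fifo = mutable.Queue.empty[A]
    val out = Seq.newBuilder[A]
    for (e <- events) {
      // add the new element to the queue
      fifo.enqueue(e)
      if (fifo.size == k + 1) {
        // the queue holds k + 1 elements: emit the oldest one
        out += fifo.dequeue()
      }
    }
    out.result()
  }

  def main(args: Array[String]): Unit = {
    // Sample from the question: lag of 2 events
    println(lagByCount(Seq(1, 2, 3, 4, 5, 6, 7), 2))
  }
}
```

For the input 1 to 7 and k = 2 this emits 1, 2, 3, 4, 5. Note that nothing is emitted for the first k events, so the two leading na placeholders in the question's sample output do not appear; if they are needed, the function would have to emit a placeholder while the queue is still filling up.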
Returning elements with a time lag is more involved. This would require timer threads for the emission, because the function is only called when a new element arrives.
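The limitation can be illustrated with a small sketch in plain Scala (again without Flink). It assumes that every event carries a timestamp in milliseconds and that events arrive in timestamp order; `Event`, `TimeLagBuffer` and `onEvent` are hypothetical names, not Flink API. Buffered events are released only when a later event arrives, which is exactly why a timer would be needed for a true time lag.

```scala
import scala.collection.mutable

object LagByTimeSketch {
  // Hypothetical event type: a value plus its timestamp in milliseconds.
  final case class Event[A](timestampMs: Long, value: A)

  // Buffers events and, on each arrival, releases every buffered event
  // that is at least lagMs older than the arriving one.
  // Assumes events arrive in timestamp order.
  final class TimeLagBuffer[A](lagMs: Long) {
    private val fifo = mutable.Queue.empty[Event[A]]

    def onEvent(e: Event[A]): List[Event[A]] = {
      fifo.enqueue(e)
      val ready = List.newBuilder[Event[A]]
      while (fifo.nonEmpty && fifo.head.timestampMs + lagMs <= e.timestampMs) {
        ready += fifo.dequeue()
      }
      ready.result()
    }
  }

  def main(args: Array[String]): Unit = {
    val buffer = new TimeLagBuffer[Int](2000L)
    // Events 1..7 spaced one second apart, as in the question
    for (i <- 1 to 7) {
      val released = buffer.onEvent(Event(i * 1000L, i)).map(_.value)
      println(s"event $i releases $released")
    }
  }
}
```

With evenly spaced events this behaves like the count-based version. If the stream pauses, however, the buffered events stay in the queue until the next arrival: after events at 1 s and 2 s followed by one at 10 s, the first two are released together at 10 s instead of at 3 s and 4 s.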