Flink Queryable state not working -


i running flink ide. storing data in queryable is working, somehow when query it, throws exception

exeception

failure(akka.actor.actornotfound: actor not found for: actorselection[anchor(akka.tcp://flink@127.0.0.1:6123/), path(/user/jobmanager)]) 

my code:

config.setstring(configconstants.job_manager_ipc_address_key,"localhost") config.setstring(configconstants.job_manager_ipc_port_key,"6123")  @throws[throwable] def recover(failure: throwable): future[array[byte]] = if (failure.isinstanceof[assertionerror]) return futures.failed(failure) else {   // @ startup failures expected   // due races. make sure don't   // fail test.   return patterns.after(retrydelay, test_actor_system.scheduler, test_actor_system.dispatcher, new callable[future[array[byte]]]() {     @throws[exception]     def call: future[array[byte]] = return getkvstatewithretries(queryname, key, serializedkey)   }) } }    @suppresswarnings(array("unchecked"))   private def getkvstatewithretries(queryname: string,                                 keyhash: int,                                 serializedkey: array[byte]): future[array[byte]] = {  val kvstate = client.getkvstate(jobid, queryname, keyhash, serializedkey) kvstate.recoverwith(recover(queryname, keyhash, serializedkey))   }  def onsuccess = new onsuccess[array[byte]]() { @throws(classof[throwable]) override def onsuccess(result: array[byte]): unit = {   println("found record ")   val value = kvstaterequestserializer.deserializevalue(result, valueserializer)   println(value)  } }   override def invoke(query: querymetadata): unit = { println("getting inside querystore"+query.record) val serializedresult = flinkquery.getresult(query.record, queryname) serializedresult.onsuccess(onsuccess)  

i not spawning new mini-cluster or cluster.submit https://github.com/apache/flink/blob/master/flink-tests/src/test/java/org/apache/flink/test/query/queryablestateitcase.java want in same cluster in same envrionment main app running env.execute. step neccessary.

from documentation deault flink runs @ localhost:6123 there problem connection. need submit job in seperate cluster.

after lot of googling found solution.

i using localstreamenvironment , getting same error, until found thread remoteenv connect failed. error described different setup(not locally) gist example contained in topic used testing creating localflinkminicluster parameter "usesingleactorsystem" set false.

looking @ implementation of localstreamenvironment minicluster created "usesingleactorsystem" set true.

i created class localqueryablestreamenvironment extending localstreamenvironment mini cluster created "usesingleactorsystem" set true, , working ide.

now code follow:

configuration:

configuration config = new configuration(); config.setlong(taskmanageroptions.managed_memory_size, 6); config.setboolean(configconstants.local_start_webserver, true); config.setinteger(jobmanageroptions.web_port, jobmanageroptions.web_port.defaultvalue()); config.setboolean(queryablestateoptions.server_enable, true); config.setstring(jobmanageroptions.address, "localhost"); config.setinteger(jobmanageroptions.port,jobmanageroptions.port.defaultvalue()); **config.setinteger(configconstants.local_number_task_manager, 2);** 

note: queryablestate works config local_number_task_manager set value more 1!

instantiate/execute environment:

localqueryablestreamenvironment env = localqueryablestreamenvironment.createlocalenvironment(3, config); ... env.addsource(anysource)    .keyby(anyatribute)    .flatmap(new updatemystatetobequeriedlatermapper())    .addsink(..); //etc ... env.execute("jobnamehere"); 

and create client:

final configuration config = new configuration(); config.setstring(jobmanageroptions.address, "localhost"); config.setinteger(jobmanageroptions.port, jobmanageroptions.port.defaultvalue());  highavailabilityservices highavailabilityservices = highavailabilityservicesutils     .createhighavailabilityservices(                    config,                     executors.newsinglethreadscheduledexecutor(),                    highavailabilityservicesutils.addressresolution.try_address_resolution     ); return new queryablestateclient(config,highavailabilityservices); 

for more info access:

queryable states in apacheflink - implementation

queryable state client 1.3.0-rc0

my dependencies:

compile group: 'org.apache.flink', name: 'flink-java', version: '1.3.1' compile group: 'org.apache.flink', name: 'flink-jdbc', version: '1.3.1' compile group: 'org.apache.flink', name: 'flink-streaming-java_2.11', version: '1.3.1' compile group: 'org.apache.flink', name: 'flink-clients_2.11', version: '1.3.1' compile group: 'org.apache.flink', name: 'flink-cep_2.11', version: '1.3.1' compile group: 'org.apache.flink', name: 'flink-connector-kafka-0.10_2.11', version: '1.3.1' compile 'org.apache.flink:flink-runtime-web_2.11:1.3.1' 

Comments

Popular posts from this blog

amazon web services - S3 Pre-signed POST validate file type? -

c# - Check Keyboard Input Winforms -