Flink queryable state not working
I am running Flink from the IDE. Storing data in queryable state works, but when I query it, it throws an exception.
Exception:
Failure(akka.actor.ActorNotFound: Actor not found for: ActorSelection[Anchor(akka.tcp://flink@127.0.0.1:6123/), Path(/user/jobmanager)])
My code:
    config.setString(ConfigConstants.JOB_MANAGER_IPC_ADDRESS_KEY, "localhost")
    config.setString(ConfigConstants.JOB_MANAGER_IPC_PORT_KEY, "6123")

    @throws[Throwable]
    def recover(failure: Throwable): Future[Array[Byte]] =
      if (failure.isInstanceOf[AssertionError]) return Futures.failed(failure)
      else {
        // At startup some failures are expected
        // due to races. Make sure that they don't
        // fail this test.
        return Patterns.after(retryDelay, TEST_ACTOR_SYSTEM.scheduler, TEST_ACTOR_SYSTEM.dispatcher,
          new Callable[Future[Array[Byte]]]() {
            @throws[Exception]
            def call: Future[Array[Byte]] = return getKvStateWithRetries(queryName, key, serializedKey)
          })
      }
    }

    @SuppressWarnings(Array("unchecked"))
    private def getKvStateWithRetries(queryName: String, keyHash: Int, serializedKey: Array[Byte]): Future[Array[Byte]] = {
      val kvState = client.getKvState(jobID, queryName, keyHash, serializedKey)
      kvState.recoverWith(recover(queryName, keyHash, serializedKey))
    }

    def onSuccess = new OnSuccess[Array[Byte]]() {
      @throws(classOf[Throwable])
      override def onSuccess(result: Array[Byte]): Unit = {
        println("found record ")
        val value = KvStateRequestSerializer.deserializeValue(result, valueSerializer)
        println(value)
      }
    }

    override def invoke(query: QueryMetaData): Unit = {
      println("getting inside querystore" + query.record)
      val serializedResult = flinkQuery.getResult(query.record, queryName)
      serializedResult.onSuccess(onSuccess)
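The recover / getKvStateWithRetries pair above follows a retry-after-delay pattern: a failed lookup is tried again after a pause, except when the failure is an AssertionError. As a rough illustration only, here is that pattern in plain Java using CompletableFuture instead of the Akka and Flink futures in the question. The class name RetryingQuery, the method withRetries and the retry limit are hypothetical and not part of Flink; CompletableFuture.delayedExecutor and failedFuture assume Java 9 or newer.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Supplier;

// Hypothetical helper, not a Flink class: retries an asynchronous query after a delay.
class RetryingQuery {

    // Runs `query`; on failure retries up to `maxRetries` more times,
    // waiting `delayMillis` before each retry.
    static <T> CompletableFuture<T> withRetries(Supplier<CompletableFuture<T>> query,
                                                int maxRetries, long delayMillis) {
        CompletableFuture<T> result = new CompletableFuture<>();
        attempt(query, maxRetries, delayMillis, result);
        return result;
    }

    private static <T> void attempt(Supplier<CompletableFuture<T>> query, int retriesLeft,
                                    long delayMillis, CompletableFuture<T> result) {
        query.get().whenComplete((value, failure) -> {
            if (failure == null) {
                result.complete(value);
            } else if (failure instanceof AssertionError || retriesLeft == 0) {
                // AssertionError is treated as fatal, like in the question's recover();
                // this check only sees exceptions that are not wrapped by another stage.
                result.completeExceptionally(failure);
            } else {
                // Schedule the next attempt after the delay.
                Executor delayed = CompletableFuture.delayedExecutor(delayMillis, TimeUnit.MILLISECONDS);
                delayed.execute(() -> attempt(query, retriesLeft - 1, delayMillis, result));
            }
        });
    }

    public static void main(String[] args) {
        AtomicInteger calls = new AtomicInteger();
        // Stand-in for a state lookup that fails twice during startup, then succeeds.
        Supplier<CompletableFuture<String>> flakyQuery = () -> {
            if (calls.incrementAndGet() < 3) {
                return CompletableFuture.failedFuture(new IllegalStateException("not ready yet"));
            }
            return CompletableFuture.completedFuture("found record");
        };
        String value = withRetries(flakyQuery, 5, 100).join();
        System.out.println(value + " after " + calls.get() + " attempts");
    }
}
```

Unlike the code in the question, this sketch caps the number of retries, so a lookup that can never succeed (for example because the JobManager is unreachable) eventually fails instead of retrying forever.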
I am not spawning a new mini-cluster or using cluster.submit as in https://github.com/apache/flink/blob/master/flink-tests/src/test/java/org/apache/flink/test/query/queryablestateitcase.java because I want to do this in the same cluster, in the same environment where the main app is running with env.execute. Is that step necessary?
From the documentation, by default Flink runs at localhost:6123. Is there a problem with the connection? Do I need to submit the job in a separate cluster?
After a lot of googling I found a solution.
I was using LocalStreamEnvironment and getting the same error, until I found the thread "RemoteEnv connect failed". The error described there was for a different setup (not local), but the gist example in that thread, which was used for testing, creates a LocalFlinkMiniCluster with the parameter "useSingleActorSystem" set to false.
Looking at the implementation of LocalStreamEnvironment, the mini cluster there is created with "useSingleActorSystem" set to true.
I created a class LocalQueryableStreamEnvironment extending LocalStreamEnvironment, in which the mini cluster is created with "useSingleActorSystem" set to false, and now everything is working from inside the IDE.
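The exception in the question points at akka.tcp://flink@127.0.0.1:6123, so one quick way to tell a connection problem apart from a code problem is to check whether anything is listening on that port while the job runs. Below is a minimal sketch in plain Java; the class name JobManagerPortProbe and the method isReachable are hypothetical, and the assumption that a mini cluster with a single actor system does not expose the JobManager over TCP is my reading of the behaviour described above, not something taken from the Flink documentation.

```java
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.Socket;

// Hypothetical diagnostic helper, not part of Flink.
class JobManagerPortProbe {

    // Returns true if a TCP connection to host:port can be opened within timeoutMillis.
    static boolean isReachable(String host, int port, int timeoutMillis) {
        try (Socket socket = new Socket()) {
            socket.connect(new InetSocketAddress(host, port), timeoutMillis);
            return true;
        } catch (IOException e) {
            // Connection refused, timeout, or unknown host.
            return false;
        }
    }

    public static void main(String[] args) {
        // 6123 is the JobManager port from the exception message in the question.
        boolean reachable = isReachable("localhost", 6123, 1000);
        System.out.println("localhost:6123 reachable: " + reachable);
    }
}
```

Run it from a second process while env.execute is active. If the port is not reachable, the client cannot find the JobManager actor however the query code is written.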
Now my code is as follows.
Configuration:
    Configuration config = new Configuration();
    config.setLong(TaskManagerOptions.MANAGED_MEMORY_SIZE, 6);
    config.setBoolean(ConfigConstants.LOCAL_START_WEBSERVER, true);
    config.setInteger(JobManagerOptions.WEB_PORT, JobManagerOptions.WEB_PORT.defaultValue());
    config.setBoolean(QueryableStateOptions.SERVER_ENABLE, true);
    config.setString(JobManagerOptions.ADDRESS, "localhost");
    config.setInteger(JobManagerOptions.PORT, JobManagerOptions.PORT.defaultValue());
    // Important: see the note below.
    config.setInteger(ConfigConstants.LOCAL_NUMBER_TASK_MANAGER, 2);
Note: queryable state only works if the config LOCAL_NUMBER_TASK_MANAGER is set to a value greater than 1!
Instantiate and execute the environment:
    LocalQueryableStreamEnvironment env = LocalQueryableStreamEnvironment.createLocalEnvironment(3, config);
    ...
    env.addSource(anySource)
       .keyBy(anyAtribute)
       .flatMap(new UpdateMyStateToBeQueriedLaterMapper())
       .addSink(..); // etc.
    ...
    env.execute("jobnamehere");
And create the client:
    final Configuration config = new Configuration();
    config.setString(JobManagerOptions.ADDRESS, "localhost");
    config.setInteger(JobManagerOptions.PORT, JobManagerOptions.PORT.defaultValue());

    HighAvailabilityServices highAvailabilityServices = HighAvailabilityServicesUtils
            .createHighAvailabilityServices(
                    config,
                    Executors.newSingleThreadScheduledExecutor(),
                    HighAvailabilityServicesUtils.AddressResolution.TRY_ADDRESS_RESOLUTION
            );

    return new QueryableStateClient(config, highAvailabilityServices);
For more info, see:
Queryable States in ApacheFlink - Implementation
Queryable State Client with 1.3.0-rc0
My dependencies:
    compile group: 'org.apache.flink', name: 'flink-java', version: '1.3.1'
    compile group: 'org.apache.flink', name: 'flink-jdbc', version: '1.3.1'
    compile group: 'org.apache.flink', name: 'flink-streaming-java_2.11', version: '1.3.1'
    compile group: 'org.apache.flink', name: 'flink-clients_2.11', version: '1.3.1'
    compile group: 'org.apache.flink', name: 'flink-cep_2.11', version: '1.3.1'
    compile group: 'org.apache.flink', name: 'flink-connector-kafka-0.10_2.11', version: '1.3.1'
    compile 'org.apache.flink:flink-runtime-web_2.11:1.3.1'