How to create a state store with HashMap as value in Kafka Streams?


I need to create a state store with a String key and a HashMap value. I tried the two methods below.

    // First method
    StateStoreSupplier avgStoreNew = Stores.create("avgsnew")
            .withKeys(Serdes.String())
            .withValues(HashMap.class)
            .persistent()
            .build();

    // Second method
    HashMap<String, Double> h = new HashMap<String, Double>();

    StateStoreSupplier avgStore1 = Stores.create("avgs")
            .withKeys(Serdes.String())
            .withValues(Serdes.serdeFrom(h.getClass()))
            .persistent()
            .build();

The code compiles without errors, but at runtime I get this error:

    io.confluent.examples.streams.WordCountProcessorAPIException in thread "main" java.lang.IllegalArgumentException: Unknown class for built-in serializer

Can someone suggest the correct way to create this state store?

If you want to create a state store, you need to provide a serializer and a deserializer class for the type you want to use. In Kafka Streams, there is a single abstraction called Serde that wraps the serializer and the deserializer in one class.

If you use .withValues(Class<K> keyClass), the following must hold:

@param keyClass the class for the keys, which must be one of the types for which Kafka has built-in serdes

Because there is no built-in Serde for HashMap, you need to implement one first (maybe called HashMapSerde) and pass an instance of that class to the method .withValues(Serde<K> keySerde). Furthermore, you must implement the actual serializer and deserializer for HashMap, too. If you know the generic types of your HashMap, you should specify them (which makes the implementation of the serializer and deserializer much simpler).

Something like this (just a sketch; generic types omitted):

    import java.util.HashMap;
    import java.util.Map;

    import org.apache.kafka.common.serialization.Deserializer;
    import org.apache.kafka.common.serialization.Serde;
    import org.apache.kafka.common.serialization.Serializer;

    public class HashMapSerde implements Serde<HashMap> {

        public void configure(Map<String, ?> configs, boolean isKey) {
            /* put your code here */
        }

        public void close() {
            /* put your code here */
        }

        public Serializer<HashMap> serializer() {
            return new Serializer<HashMap>() {
                public void configure(Map<String, ?> configs, boolean isKey) {
                    /* put your code here */
                }

                // Turn the map into bytes; must return a byte[]
                public byte[] serialize(String topic, HashMap data) {
                    /* put your code here */
                }

                public void close() {
                    /* put your code here */
                }
            };
        }

        public Deserializer<HashMap> deserializer() {
            return new Deserializer<HashMap>() {
                public void configure(Map<String, ?> configs, boolean isKey) {
                    /* put your code here */
                }

                // Rebuild the map from bytes; must return a HashMap
                public HashMap deserialize(String topic, byte[] data) {
                    /* put your code here */
                }

                public void close() {
                    /* put your code here */
                }
            };
        }
    }
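To make the serialize and deserialize placeholders in the sketch more concrete, here is one possible way to fill them in, assuming the map is a HashMap<String, Double> with no null keys or values. HashMapCodec and its byte layout (entry count, then each key and its double) are hypothetical and not part of Kafka; the code uses only the JDK, so it can be tried without Kafka on the classpath.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.HashMap;
import java.util.Map;

// Hypothetical helper (not a Kafka class). Layout: [int entry count], then for
// each entry [key via writeUTF][8-byte double]. Assumes no null keys or values.
final class HashMapCodec {

    private HashMapCodec() {
    }

    // Candidate body for Serializer.serialize(topic, data).
    static byte[] serialize(HashMap<String, Double> data) {
        if (data == null) {
            return null; // pass null through unchanged
        }
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (DataOutputStream out = new DataOutputStream(bytes)) {
            out.writeInt(data.size());
            for (Map.Entry<String, Double> entry : data.entrySet()) {
                out.writeUTF(entry.getKey()); // writeUTF limits a key to 65535 encoded bytes
                out.writeDouble(entry.getValue());
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return bytes.toByteArray();
    }

    // Candidate body for Deserializer.deserialize(topic, data).
    static HashMap<String, Double> deserialize(byte[] data) {
        if (data == null) {
            return null;
        }
        try (DataInputStream in = new DataInputStream(new ByteArrayInputStream(data))) {
            int size = in.readInt();
            HashMap<String, Double> result = new HashMap<>();
            for (int i = 0; i < size; i++) {
                String key = in.readUTF();
                result.put(key, in.readDouble());
            }
            return result;
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }
}
```

Inside the serde, serialize(String topic, HashMap<String, Double> data) could simply return HashMapCodec.serialize(data), and deserialize could do the reverse. Knowing the generic types is what keeps this short: a raw HashMap would need a general-purpose format (for example JSON or Java serialization) instead of a fixed layout.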

If you want to see examples of how to implement (de)serializers and a Serde, have a look at https://github.com/apache/kafka/tree/trunk/clients/src/main/java/org/apache/kafka/common/serialization and https://github.com/apache/kafka/blob/trunk/clients/src/main/java/org/apache/kafka/common/serialization/Serdes.java

