How to create a state store with HashMap as value in Kafka streams? -
i need create state store string key hashmap value. tried below 2 methods.
// first method statestoresupplier avgstorenew = stores.create("avgsnew") .withkeys(serdes.string()) .withvalues(hashmap.class) .persistent() .build(); // second method hashmap<string ,double> h = new hashmap<string ,double>(); statestoresupplier avgstore1 = stores.create("avgs") .withkeys(serdes.string()) .withvalues(serdes.serdefrom(h.getclass())) .persistent() .build();
the code compiles fine without error, runtime error
io.confluent.examples.streams.wordcountprocessorapiexception in thread "main" java.lang.illegalargumentexception: unknown class built-in serializer
can suggest me correct way create state store?
if want create state store, need provide serializer , deserializer class type want use. in kafka stream, there single abstraction called serde wraps serializer , deserializer in single class.
if use .withvalues(class<k> keyclass)
must hold that
@param keyclass class keys, must 1 of types kafka has built-in serdes
because there no built-in serdes
hashmap
need implement 1 first (maybe called hashmapserde
) , give class method .withvalues(serde<k> keyserde)
. furhtermore, must implement actual serializer , deserializer hashmap
, too. if know generic types of hashmap, should specify them (what make implementation of serializer , deserializer simpler.
something (just sketch; generic types omitted):
import org.apache.kafka.common.serialization.serde; import org.apache.kafka.common.serialization.serializer; import org.apache.kafka.common.serialization.deserializer; public class hashmapserde implements serde<hashmap> { void configure(map<string, ?> configs, boolean iskey) { /* put code here */ } void close() { /* put code here */ } serializer<hashmap> serializer() { return new serializer<hashmap>() { public void configure(map<string, ?> configs, boolean iskey) { /* put code here */ } public byte[] serialize(string topic, t data) { /* put code here */ } public void close() { /* put code here */ } }; } deserializer<hashmap> deserializer() { return new deserializer<hashmap>() { public void configure(map<string, ?> configs, boolean iskey) { /* put code here */ } public t deserialize(string topic, byte[] data) { /* put code here */ } public void close() { /* put code here */ } }; } }
if want see examples how implement (de)serializers , serde
, have https://github.com/apache/kafka/tree/trunk/clients/src/main/java/org/apache/kafka/common/serialization , https://github.com/apache/kafka/blob/trunk/clients/src/main/java/org/apache/kafka/common/serialization/serdes.java
Comments
Post a Comment