Kafka producer'da key ve value için bir Serializer ister. Eğer bir nesneyi json tipinde saklamak istiyorsanız bunun için bir Serializer vermeniz gerekir. Aşağıdak gibi bir Serializer kullanılabilir :
import org.apache.kafka.common.serialization.Serializer;
import com.fasterxml.jackson.databind.ObjectMapper;
public class MyKafkaJsonSerializer implements Serializer<MyObject>{
@Override
public byte[] serialize(String topic, MyObject data) {
byte[] byteData = null;
ObjectMapper objectMapper = new ObjectMapper();
try {
byteData = objectMapper.writeValueAsBytes(data);
} catch (Exception e) {
e.printStackTrace();
}
return byteData;
}
}
Bu sınıf aşağıdaki gibi kullanılabilir :
properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, MyKafkaJsonSerializer.class.getName());
Object'yi Json'a çevirmek için jackson kütüphanesi kullanılmıştır. MyObject verilen herhangi tipte bir nesnedir. Bu nesne jackson kütüphanesi ile Json'a çevrilecektir.
Alırken ise tersinin yapılması gerekir. Bunun için Deserializer kullanılır. Aşağıdaki gibi bir sınıf yapılabiliriz :
import org.apache.kafka.common.serialization.Deserializer;
import com.fasterxml.jackson.databind.ObjectMapper;
public class MyKafkaJsonDeserializer implements Deserializer<MyObject>{
@Override
public MyObject deserialize(String topic, byte[] data) {
ObjectMapper mapper = new ObjectMapper();
MyObjectobj = null;
try {
obj = mapper.readValue(data, MyObject.class);
} catch (Exception e) {
e.printStackTrace();
}
return obj;
}
}
Burada kafkadab gelen json formatındaki veri MyObject sınıfına çevrilmektedir. Consumer'a aşağıdaki gibi verilmelidir :
properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, MyKafkaJsonDeserializer.class.getName());
Bu şekilde gelen mesaj MyObject tipinde olacaktır :
while (true) {
ConsumerRecords<String, MyObject> records = consumer.poll(Duration.ofMillis(1000));
if(records.count()>0) {
for (ConsumerRecord<String, MyObject> record : records) {
System.out.println(record.topic() + "-" + record.key() +" : "+ record.value().getInfo());
}
}
}
Kafka'da Stream bir topic'ten aldığı mesajları başka topiclere yazmak için kullanılır. Eğer sizin yarattığınız bir object'i alıp diğer topiclere yazmasını istiyorsanız bunun için Serde tipinde bir sınıf yaratılmalı, serializer ve deserializer içerisinde belirtilmeli :
import java.util.Map;
import org.apache.kafka.common.serialization.Deserializer;
import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.serialization.Serializer;
public class MyObjectSerde implements Serde<MyObject> {
private KafkaJsonSerializer serializer = new KafkaJsonSerializer();
private KafkaJsonDeserializer deserializer = new KafkaJsonDeserializer();
@Override
public void configure(Map<String, ?> configs, boolean isKey) {
serializer.configure(configs, isKey);
deserializer.configure(configs, isKey);
}
@Override
public void close() {
serializer.close();
deserializer.close();
}
@Override
public Serializer<MyObject> serializer() {
return serializer;
}
@Override
public Deserializer<MyObject> deserializer() {
return deserializer;
}
}
Bu şekilde yaratılan MyObjectSerde sınıfı stream'e aşağıdaki gibi verilebilir :
streamsConfiguration.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
streamsConfiguration.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG,
MyObjectSerde.class.getName());