Örnek

Kafka'da Json Şeklinde Mesaj Saklamak İçin Kullanılan Serileştirme Örneği

Kafka producer'da key ve value için bir Serializer ister. Eğer bir nesneyi json tipinde saklamak istiyorsanız bunun için bir Serializer vermeniz gerekir. Aşağıdak gibi bir Serializer kullanılabilir :
import org.apache.kafka.common.serialization.Serializer;
import com.fasterxml.jackson.databind.ObjectMapper;
public class MyKafkaJsonSerializer implements Serializer<MyObject>{
	@Override
	public byte[] serialize(String topic, MyObject data) {
		
		byte[] byteData = null;
        ObjectMapper objectMapper = new ObjectMapper();
        try {
            byteData = objectMapper.writeValueAsBytes(data);
        } catch (Exception e) {
            e.printStackTrace(); 
        }
        return byteData;
	}
}
Bu sınıf aşağıdaki gibi kullanılabilir :
properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, MyKafkaJsonSerializer.class.getName());
Object'yi Json'a çevirmek için jackson kütüphanesi kullanılmıştır. MyObject verilen herhangi tipte bir nesnedir. Bu nesne jackson kütüphanesi ile Json'a çevrilecektir.
Alırken ise tersinin yapılması gerekir. Bunun için Deserializer kullanılır. Aşağıdaki gibi bir sınıf yapılabiliriz :
import org.apache.kafka.common.serialization.Deserializer;
import com.fasterxml.jackson.databind.ObjectMapper;
public class MyKafkaJsonDeserializer implements Deserializer<MyObject>{
	@Override
	public MyObject deserialize(String topic, byte[] data) {
		
		ObjectMapper mapper = new ObjectMapper();
		MyObjectobj = null;
        try {
            obj = mapper.readValue(data, MyObject.class);
        } catch (Exception e) {
        	e.printStackTrace(); 
        }
        return obj;
	}
}
Burada kafkadab gelen json formatındaki veri MyObject sınıfına çevrilmektedir. Consumer'a aşağıdaki gibi verilmelidir :
properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, MyKafkaJsonDeserializer.class.getName());
Bu şekilde gelen mesaj MyObject tipinde olacaktır :
while (true) {
	ConsumerRecords<String, MyObject> records = consumer.poll(Duration.ofMillis(1000));
	if(records.count()>0) {
		for (ConsumerRecord<String, MyObject> record : records) {								
			System.out.println(record.topic() + "-" + record.key() +" : "+  record.value().getInfo());
		}
	}
}
Kafka'da Stream bir topic'ten aldığı mesajları başka topiclere yazmak için kullanılır. Eğer sizin yarattığınız bir object'i alıp diğer topiclere yazmasını istiyorsanız bunun için Serde tipinde bir sınıf yaratılmalı, serializer ve deserializer içerisinde belirtilmeli :
import java.util.Map;
import org.apache.kafka.common.serialization.Deserializer;
import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.serialization.Serializer;
public class MyObjectSerde implements Serde<MyObject> {
    private KafkaJsonSerializer serializer = new KafkaJsonSerializer();
    private KafkaJsonDeserializer deserializer = new KafkaJsonDeserializer();
    @Override
    public void configure(Map<String, ?> configs, boolean isKey) {
        serializer.configure(configs, isKey);
        deserializer.configure(configs, isKey);
    }
    @Override
    public void close() {
        serializer.close();
        deserializer.close();
    }
    @Override
    public Serializer<MyObject> serializer() {
        return serializer;
    }
    @Override
    public Deserializer<MyObject> deserializer() {
        return deserializer;
    }
}
Bu şekilde yaratılan MyObjectSerde sınıfı stream'e aşağıdaki gibi verilebilir :
streamsConfiguration.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
streamsConfiguration.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, 
MyObjectSerde.class.getName());
zafer.teker , 14.05.2020

Bu Sayfayı Paylaş:

Fibiler Üyelerinin Yorumları


Tüm üyeler içeriklere yorum ekleyerek katkıda bulunabilir : Yorum Gir

Misafir Yorumları




Bu Sayfayı Paylaş:

İletişim Bilgileri

Takip Et

Her Hakkı Saklıdır. Bu sitede yayınlanan tüm bilgi ve fikirlerin kullanımından fibiler.com sorumlu değildir. Bu sitede üretilmiş , derlenmiş içerikleri, fibiler.com'u kaynak göstermek koşuluyla kendi sitenizde kullanılabilirsiniz. Ancak telif hakkı olan içeriklerin hakları sahiplerine aittir