Veri

Kafka'da Bir Consumer İçin Auto Commit Özelliğin Kapatılması

Kafka'da Consumer'lar varsayılan olarak autocommit'tir. Yani poll() yöntemi ile bir partition'dan mesaj çektiğinizde offset otomatik olarak artırılır. Bunu kapatmak için aşağıdaki gibi bir property verilir:
properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG,false);
Artık mesajı aldıktan sonra sizin commit etmeniz gerekir :
consumer.commitSync();
Eğer mesaj işlemede hata var ise offset'i artırmak istemiyorsanız :
consumer.seek(new TopicPartition(record.topic(), record.partition()), record.offset());
ile offset'i son mesajın offset değerine koyabilirsiniz. Bu şekilde başarısız işlemde mesaj kaybı oluşmaz.
Örnek kod aşağıdaki gibidir :
public class TestConsumer {
	
	public final static String TOPIC_NAME = "testtopic";
	
	public static void main(String[] args) {
		
		Properties properties = new Properties();
		properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
		properties.put(ConsumerConfig.GROUP_ID_CONFIG, "test_consumer_group");
		properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
 StringDeserializer.class.getName());
		properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
	
		properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG,false);
		
		KafkaConsumer<String, String> consumer=new KafkaConsumer<String, String>(properties);
		
		consumer.subscribe(Collections.singletonList(TOPIC_NAME));
		
        try {
                        
            while (true) {
            	
				ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1000));
				for (ConsumerRecord<String, String> record : records) {
											
					try {
						
						// islem basarili ise
						
						consumer.commitSync();	
					} catch (Exception e) {		
						// hata var offset'i geri koy
						
						consumer.seek(new TopicPartition(record.topic(), record.partition()), record.offset());
						break;
					}					   
				}
	
            }
        } finally {
            consumer.close();
        }		
	}
}
zafer.teker , 14.05.2020

Bu Sayfayı Paylaş:

Fibiler Üyelerinin Yorumları


Tüm üyeler içeriklere yorum ekleyerek katkıda bulunabilir : Yorum Gir

Misafir Yorumları




Bu Sayfayı Paylaş:

İletişim Bilgileri

Takip Et

Her Hakkı Saklıdır. Bu sitede yayınlanan tüm bilgi ve fikirlerin kullanımından fibiler.com sorumlu değildir. Bu sitede üretilmiş , derlenmiş içerikleri, fibiler.com'u kaynak göstermek koşuluyla kendi sitenizde kullanılabilirsiniz. Ancak telif hakkı olan içeriklerin hakları sahiplerine aittir