Kafka'da Consumer'lar varsayılan olarak autocommit'tir. Yani poll() yöntemi ile bir partition'dan mesaj çektiğinizde offset otomatik olarak artırılır. Bunu kapatmak için aşağıdaki gibi bir property verilir:
properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG,false);
Artık mesajı aldıktan sonra sizin commit etmeniz gerekir :
consumer.commitSync();
Eğer mesaj işlemede hata var ise offset'i artırmak istemiyorsanız :
consumer.seek(new TopicPartition(record.topic(), record.partition()), record.offset());
ile offset'i son mesajın offset değerine koyabilirsiniz. Bu şekilde başarısız işlemde mesaj kaybı oluşmaz.
Örnek kod aşağıdaki gibidir :
public class TestConsumer {
public final static String TOPIC_NAME = "testtopic";
public static void main(String[] args) {
Properties properties = new Properties();
properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
properties.put(ConsumerConfig.GROUP_ID_CONFIG, "test_consumer_group");
properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
StringDeserializer.class.getName());
properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG,false);
KafkaConsumer<String, String> consumer=new KafkaConsumer<String, String>(properties);
consumer.subscribe(Collections.singletonList(TOPIC_NAME));
try {
while (true) {
ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1000));
for (ConsumerRecord<String, String> record : records) {
try {
// islem basarili ise
consumer.commitSync();
} catch (Exception e) {
// hata var offset'i geri koy
consumer.seek(new TopicPartition(record.topic(), record.partition()), record.offset());
break;
}
}
}
} finally {
consumer.close();
}
}
}