Örnek

Basit Bir Consumer Örneği

Aşağıdaki Kafka'da bir topic'e gelen mesajları alan consumer örneği var :
import java.time.Duration;
import java.util.Collections;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;
public class ConsumerTopic {
	
	public final static String TOPIC_NAME = "topic1";
	
	public static void main(String[] args) {
		
		Properties properties = new Properties();
		properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
		
		properties.put(ConsumerConfig.GROUP_ID_CONFIG, "test_consumer_group");
		properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
StringDeserializer.class.getName());
		properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
		
		KafkaConsumer<String, String>  consumer=new KafkaConsumer<String, String>(properties);
		consumer.subscribe(Collections.singletonList(TOPIC_NAME));
		
        try {                       
            while (true) {            	
				ConsumerRecords<String, String> records = 
					consumer.poll(Duration.ofMillis(10000));
				
				if(records.count()>0) {
					for (ConsumerRecord<String, String> record : records) {											    
						System.out.println("text : "+record.value());					    
					}					
				}
            }            
        } finally {
            consumer.close();
        }	
	}
}
localhost:9092 Kafka sunucunun adresidir.
properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,
"10.11.11.11:9092,10.11.11.12:9092,10.11.11.13:9092");
Client Group id olarak test_consumer_group veriyoruz. KafkaConsumer yaratıp topic1 topic'ine abone oluyoruz.
Sonsuz döngü açıyoruz consumer.poll() yöntemi ile mesajları sürekli çekiyoruz. Eğer mesaj yok ise 10 sn beklemesini söylüyoruz. Mesaj gelirse alt satıra geçiyoruz ve döngü açıp gelen mesajları ekrana bastırıyoruz.
Bu program hiç sonlanmaz. Dışarıdan kapatılması gerekir.
Consumer thread safe değildir. Dışarıdan kapatılması için aşağıdaki gibi yapılmalıdır :
 public class KafkaConsumerRunner implements Runnable {
     private final AtomicBoolean closed = new AtomicBoolean(false);
     private final KafkaConsumer consumer;
     public void run() {
         try {
             consumer.subscribe(Arrays.asList("topic"));
             while (!closed.get()) {
                 ConsumerRecords records = consumer.poll(10000);
                 // Handle new records
             }
         } catch (WakeupException e) {
             // Ignore exception if closing
             if (!closed.get()) throw e;
         } finally {
             consumer.close();
         }
     }
     // Shutdown hook which can be called from a separate thread
     public void shutdown() {
         closed.set(true);
         consumer.wakeup();
     }
 }

Bu konuda açıklama için : kafka.apache.org/10/javadoc/org/apache/kafka/clients/consumer/KafkaConsumer.html#multithreaded
zafer.teker , 14.05.2020

Bu Sayfayı Paylaş:

Fibiler Üyelerinin Yorumları


Tüm üyeler içeriklere yorum ekleyerek katkıda bulunabilir : Yorum Gir

Misafir Yorumları




Bu Sayfayı Paylaş:

İletişim Bilgileri

Takip Et

Her Hakkı Saklıdır. Bu sitede yayınlanan tüm bilgi ve fikirlerin kullanımından fibiler.com sorumlu değildir. Bu sitede üretilmiş , derlenmiş içerikleri, fibiler.com'u kaynak göstermek koşuluyla kendi sitenizde kullanılabilirsiniz. Ancak telif hakkı olan içeriklerin hakları sahiplerine aittir