Bu Sayfayı Paylaş:

Kavram

Apache Kafka

Tanım: Uygulamalar arasında veri akışını için kullanılan bir streaming platformu. Bir producer ürettiği mesajı Kafka'ya ekler ve bir veya daha fazla Consumer mesajları buradan sıralı bir şekilde alır. Rabbit MQ, Active MQ gibi ürünlere mesajlaşma amacıyla da kullanılabilir.

Kavram

Kafka Java Client

Tanım: Kafka'ya erişmek için kullanılan bir Java kütüphanesi. Producer, consumer, stream ve adminclient yaratılabilir.

Örnek

Basit Bir Producer Örneği

Aşağıdaki Kafka'da bir topic'e üç mesaj ekliyoruz :
import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.Serdes;
import com.thy.mercury.medusa.typeb.ValidationException;
public class ProcuderApp {
	public final static String TOPIC_NAME = "test-topic";
	
	public static void main(String[] args) throws ValidationException {
		Properties properties = new Properties();
		properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
		properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, Serdes.String().serializer().getClass().getName());
		properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, Serdes.String().serializer().getClass().getName());
		KafkaProducer<String, String> producer = new KafkaProducer<String, String>(properties);
		produceMessage(producer, "TEST");
		produceMessage(producer, "TEST 2");
		produceMessage(producer, "Test 3");
		
		producer.close();
	}
	private static void produceMessage(KafkaProducer<String, String> producer, 
String message) {
		ProducerRecord<String, String> record =
 new ProducerRecord<>(TOPIC_NAME, "key1",	message);
		producer.send(record);
		
		System.out.println(message + " sent");
	}
}
localhost:9092 Kafka sunucunun adresidir.
properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,
"10.11.11.11:6667,10.11.11.12:6667,10.11.11.13:6667");

şeklinde birden fazla node var ise verilebilir.
Key ve value olarak string kullanılacağını belirtiyoruz. KafkaProducer yaratıp yönteme veriyor ve o yöntem de mesajları insert ediyor. En son producer'ı kapatıyoruz.

Örnek

Basit Bir Consumer Örneği

Aşağıdaki Kafka'da bir topic'e gelen mesajları alan consumer örneği var :
import java.time.Duration;
import java.util.Collections;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;
public class ConsumerTopic {
	
	public final static String TOPIC_NAME = "topic1";
	
	public static void main(String[] args) {
		
		Properties properties = new Properties();
		properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
		
		properties.put(ConsumerConfig.GROUP_ID_CONFIG, "test_consumer_group");
		properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
StringDeserializer.class.getName());
		properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
		
		KafkaConsumer<String, String>  consumer=new KafkaConsumer<String, String>(properties);
		consumer.subscribe(Collections.singletonList(TOPIC_NAME));
		
        try {                       
            while (true) {            	
				ConsumerRecords<String, String> records = 
					consumer.poll(Duration.ofMillis(10000));
				
				if(records.count()>0) {
					for (ConsumerRecord<String, String> record : records) {											    
						System.out.println("text : "+record.value());					    
					}					
				}
            }            
        } finally {
            consumer.close();
        }	
	}
}
localhost:9092 Kafka sunucunun adresidir.
properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,
"10.11.11.11:9092,10.11.11.12:9092,10.11.11.13:9092");
Client Group id olarak test_consumer_group veriyoruz. KafkaConsumer yaratıp topic1 topic'ine abone oluyoruz.
Sonsuz döngü açıyoruz consumer.poll() yöntemi ile mesajları sürekli çekiyoruz. Eğer mesaj yok ise 10 sn beklemesini söylüyoruz. Mesaj gelirse alt satıra geçiyoruz ve döngü açıp gelen mesajları ekrana bastırıyoruz.
Bu program hiç sonlanmaz. Dışarıdan kapatılması gerekir.
Consumer thread safe değildir. Dışarıdan kapatılması için aşağıdaki gibi yapılmalıdır :
 public class KafkaConsumerRunner implements Runnable {
     private final AtomicBoolean closed = new AtomicBoolean(false);
     private final KafkaConsumer consumer;
     public void run() {
         try {
             consumer.subscribe(Arrays.asList("topic"));
             while (!closed.get()) {
                 ConsumerRecords records = consumer.poll(10000);
                 // Handle new records
             }
         } catch (WakeupException e) {
             // Ignore exception if closing
             if (!closed.get()) throw e;
         } finally {
             consumer.close();
         }
     }
     // Shutdown hook which can be called from a separate thread
     public void shutdown() {
         closed.set(true);
         consumer.wakeup();
     }
 }

Bu konuda açıklama için : kafka.apache.org/10/javadoc/org/apache/kafka/clients/consumer/KafkaConsumer.html#multithreaded

Örnek

Bir Topic'e Eklenen Mesajlardaki Kelimelerin Kaç Kere Geçtiğini Sayan Stream Örneği

Bu örnekte bir topic'e mesajlar ekleniyor. Stream ise eklenen mesajları alıyor kelimelerin kaç kere geçtiğini buluyor. Ve son durumu saklıyor ve yeni mesajlar geldikçe kelime ve kaç kere geçtiği listesi güncelleniyor.
import java.util.Arrays;
import java.util.Properties;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.utils.Bytes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.KTable;
import org.apache.kafka.streams.kstream.KeyValueMapper;
import org.apache.kafka.streams.kstream.Materialized;
import org.apache.kafka.streams.kstream.Produced;
import org.apache.kafka.streams.kstream.ValueMapper;
import org.apache.kafka.streams.state.KeyValueStore;
public class StreamApp {
	public static void main(String[] args) {
		Properties props = new Properties();
		props.put(StreamsConfig.APPLICATION_ID_CONFIG, "wordcount-application");
		props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
		props.put(StreamsConfig.STATE_DIR_CONFIG, "C:\\Users\\testuser\\AppData\\Local\\Temp");
		props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
		props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
		StreamsBuilder builder = new StreamsBuilder();
		KStream<String, String> textLines = builder.stream("stream-test-topic1",
				Consumed.with(Serdes.String(), Serdes.String()));
		KTable<String, Long> wordCounts = textLines.flatMapValues(new ValueMapper<String, Iterable<String>>() {
			@Override
			public Iterable<String> apply(String textLine) {
				return Arrays.asList(textLine.toLowerCase().split("\\W+"));
			}
		}).groupBy(new KeyValueMapper<String, String, String>() {
			@Override
			public String apply(String key, String word) {
				return word;
			}
		}).count(Materialized.<String, Long, KeyValueStore<Bytes, byte[]>>as("counts-store"));
		wordCounts.toStream().to("stream-test-word-count-topic", Produced.with(Serdes.String(), Serdes.Long()));
		KafkaStreams streams = new KafkaStreams(builder.build(), props);
		streams.start();
		
		Runtime.getRuntime().addShutdownHook(new Thread(streams::close));
	}
}
Bu stream state tutmasından dolayı Kafka'ya bu stream için state'lerin nerede saklaması gerektiği veriliyor :
props.put(StreamsConfig.STATE_DIR_CONFIG, "C:\\Users\\testuser\\AppData\\Local\\Temp");
builder.stream() ile yeni bir stream yaratılıyor. stream-test-topic1 ile stream'de okunacak topic'in adı veriliyor. Stream sürekli bu topic'e eklenen mesajları okuyacakır.
flatMapValues() yöntemi ile .split("\\W+") kullanılarak mesajlar kelimelerine ayrılır. groupBy() yöntemi ile aynı kelimeler gruplanır ve count() yöntemi ile her grubun sayısı alınır.
KTable wordCounts ile sonuçlar KTable formatında tutulurlar.
wordCounts.toStream().to("stream-test-word-count-topic", Produced.with(Serdes.String(), Serdes.Long()));
ile üretilen sonuç stream-test-word-count-topic adlı topic'e yazılır.
Bir uygulama yazıp stream-test-topic1 topic'e mesaj eklerseniz stream word - count şeklinde sayacak ve onu stream-test-word-count-topic adlı topic'e ekleyecektir. Başka bir uygulama yapıp bu topic'den consume edilirse kelime-sayı şeklinde bilgileri alabilirsiniz.
Aşağıdaki gibi consume edilip basılabilir :
while (true) {
            	
	ConsumerRecords<String, Long> records = consumer.poll(Duration.ofMillis(1000));
	
	if(records.count()>0) {
		for (ConsumerRecord<String, Long> record : records) {						
			System.out.println(record.key() +" : "+  record.value());
		}
	}
}

Bu şekilde consume edildiğinde şu ana kadar her kelimenin toplam kaç kere geçtiği ekranda görülecektir.

Örnek

Kafka'da Json Şeklinde Mesaj Saklamak İçin Kullanılan Serileştirme Örneği

Kafka producer'da key ve value için bir Serializer ister. Eğer bir nesneyi json tipinde saklamak istiyorsanız bunun için bir Serializer vermeniz gerekir. Aşağıdak gibi bir Serializer kullanılabilir :
import org.apache.kafka.common.serialization.Serializer;
import com.fasterxml.jackson.databind.ObjectMapper;
public class MyKafkaJsonSerializer implements Serializer<MyObject>{
	@Override
	public byte[] serialize(String topic, MyObject data) {
		
		byte[] byteData = null;
        ObjectMapper objectMapper = new ObjectMapper();
        try {
            byteData = objectMapper.writeValueAsBytes(data);
        } catch (Exception e) {
            e.printStackTrace(); 
        }
        return byteData;
	}
}
Bu sınıf aşağıdaki gibi kullanılabilir :
properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, MyKafkaJsonSerializer.class.getName());
Object'yi Json'a çevirmek için jackson kütüphanesi kullanılmıştır. MyObject verilen herhangi tipte bir nesnedir. Bu nesne jackson kütüphanesi ile Json'a çevrilecektir.
Alırken ise tersinin yapılması gerekir. Bunun için Deserializer kullanılır. Aşağıdaki gibi bir sınıf yapılabiliriz :
import org.apache.kafka.common.serialization.Deserializer;
import com.fasterxml.jackson.databind.ObjectMapper;
public class MyKafkaJsonDeserializer implements Deserializer<MyObject>{
	@Override
	public MyObject deserialize(String topic, byte[] data) {
		
		ObjectMapper mapper = new ObjectMapper();
		MyObjectobj = null;
        try {
            obj = mapper.readValue(data, MyObject.class);
        } catch (Exception e) {
        	e.printStackTrace(); 
        }
        return obj;
	}
}
Burada kafkadab gelen json formatındaki veri MyObject sınıfına çevrilmektedir. Consumer'a aşağıdaki gibi verilmelidir :
properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, MyKafkaJsonDeserializer.class.getName());
Bu şekilde gelen mesaj MyObject tipinde olacaktır :
while (true) {
	ConsumerRecords<String, MyObject> records = consumer.poll(Duration.ofMillis(1000));
	if(records.count()>0) {
		for (ConsumerRecord<String, MyObject> record : records) {								
			System.out.println(record.topic() + "-" + record.key() +" : "+  record.value().getInfo());
		}
	}
}
Kafka'da Stream bir topic'ten aldığı mesajları başka topiclere yazmak için kullanılır. Eğer sizin yarattığınız bir object'i alıp diğer topiclere yazmasını istiyorsanız bunun için Serde tipinde bir sınıf yaratılmalı, serializer ve deserializer içerisinde belirtilmeli :
import java.util.Map;
import org.apache.kafka.common.serialization.Deserializer;
import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.serialization.Serializer;
public class MyObjectSerde implements Serde<MyObject> {
    private KafkaJsonSerializer serializer = new KafkaJsonSerializer();
    private KafkaJsonDeserializer deserializer = new KafkaJsonDeserializer();
    @Override
    public void configure(Map<String, ?> configs, boolean isKey) {
        serializer.configure(configs, isKey);
        deserializer.configure(configs, isKey);
    }
    @Override
    public void close() {
        serializer.close();
        deserializer.close();
    }
    @Override
    public Serializer<MyObject> serializer() {
        return serializer;
    }
    @Override
    public Deserializer<MyObject> deserializer() {
        return deserializer;
    }
}
Bu şekilde yaratılan MyObjectSerde sınıfı stream'e aşağıdaki gibi verilebilir :
streamsConfiguration.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
streamsConfiguration.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, 
MyObjectSerde.class.getName());

Veri

Kafka'da Bir Consumer İçin Auto Commit Özelliğin Kapatılması

Kafka'da Consumer'lar varsayılan olarak autocommit'tir. Yani poll() yöntemi ile bir partition'dan mesaj çektiğinizde offset otomatik olarak artırılır. Bunu kapatmak için aşağıdaki gibi bir property verilir:
properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG,false);
Artık mesajı aldıktan sonra sizin commit etmeniz gerekir :
consumer.commitSync();
Eğer mesaj işlemede hata var ise offset'i artırmak istemiyorsanız :
consumer.seek(new TopicPartition(record.topic(), record.partition()), record.offset());
ile offset'i son mesajın offset değerine koyabilirsiniz. Bu şekilde başarısız işlemde mesaj kaybı oluşmaz.
Örnek kod aşağıdaki gibidir :
public class TestConsumer {
	
	public final static String TOPIC_NAME = "testtopic";
	
	public static void main(String[] args) {
		
		Properties properties = new Properties();
		properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
		properties.put(ConsumerConfig.GROUP_ID_CONFIG, "test_consumer_group");
		properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
 StringDeserializer.class.getName());
		properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
	
		properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG,false);
		
		KafkaConsumer<String, String> consumer=new KafkaConsumer<String, String>(properties);
		
		consumer.subscribe(Collections.singletonList(TOPIC_NAME));
		
        try {
                        
            while (true) {
            	
				ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1000));
				for (ConsumerRecord<String, String> record : records) {
											
					try {
						
						// islem basarili ise
						
						consumer.commitSync();	
					} catch (Exception e) {		
						// hata var offset'i geri koy
						
						consumer.seek(new TopicPartition(record.topic(), record.partition()), record.offset());
						break;
					}					   
				}
	
            }
        } finally {
            consumer.close();
        }		
	}
}

Örnek

Kafka'da Topic'lerle İlgili AdminClient Örnekleri

AdminClient sınıfı Kafka için topic listesini alma, topic yaratma, silme gibi yönetim ile ilgili işlemler yapmaktadır. Aşağıdaki sınıfta topic listesi, topic yaratılması ve silinmesi gibi örnek kodlar bulunmaktadır :
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ExecutionException;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.CreateTopicsResult;
import org.apache.kafka.clients.admin.KafkaAdminClient;
import org.apache.kafka.clients.admin.ListTopicsResult;
import org.apache.kafka.clients.admin.NewTopic;
import org.apache.kafka.common.errors.TopicExistsException;
public class AdminClientTest {
	//private static String bootstrapServers="localhost:9092";
	
	public void createTopic(final String topicName, final int partitions) {
		final short replicationFactor = 1;
		Map<String, Object> defaultClientConfig = new HashMap<String, Object>();
		defaultClientConfig.put("bootstrap.servers", bootstrapServers);
		// Create admin client
		try (final AdminClient adminClient = KafkaAdminClient.create(defaultClientConfig)) {
			try {
				// Define topic
				final NewTopic newTopic = new NewTopic(topicName, partitions, replicationFactor);
				// Create topic, which is async call.
				final CreateTopicsResult createTopicsResult = adminClient.createTopics(Collections.singleton(newTopic));
				// Since the call is Async, Lets wait for it to complete.
				createTopicsResult.values().get(topicName).get();
				
				System.out.println("created");
			} catch (InterruptedException | ExecutionException e) {
				if (!(e.getCause() instanceof TopicExistsException)) {
					throw new RuntimeException(e.getMessage(), e);
				}
				System.out.println("TopicExistsException - Swallow this exception, just means the topic already exists");
				
			}
		}
	}
	
	public void listTopic() {
		
		Map<String, Object> defaultClientConfig = new HashMap<String, Object>();
		defaultClientConfig.put("bootstrap.servers", bootstrapServers);
		// Create admin client
		try (final AdminClient adminClient = KafkaAdminClient.create(defaultClientConfig)) {
			try {
				
				ListTopicsResult listTopicsResult=adminClient.listTopics();
				
				Set<String> currentTopicList = listTopicsResult.names().get();
				
				for (String string : currentTopicList) {
					
					System.out.println(string);			
				}								
			} catch (InterruptedException | ExecutionException e) {
				e.printStackTrace();				
			}
		}
	}
	
	private void checkAndCreateTopic(final String topicNameTest) {
		
		Map<String, Object> defaultClientConfig = new HashMap<String, Object>();
		defaultClientConfig.put("bootstrap.servers", bootstrapServers);
		
		try (final AdminClient adminClient = KafkaAdminClient.create(defaultClientConfig)) {
			try {
				boolean topicExists = adminClient.listTopics().names().get().stream().anyMatch(
						topicName -> topicName.equalsIgnoreCase(topicNameTest));
				if(!topicExists) { // topic yok 
					
					System.out.println("topic yok");
					
					NewTopic newTopic = new NewTopic(topicNameTest, 1, (short)3); 
					CreateTopicsResult createTopicsResult = adminClient.createTopics(Collections.singleton(newTopic));
					createTopicsResult.values().get(topicNameTest).get();
					
				}else {
					
					System.out.println("topic var. Yaratilmadi");
					
				}
			} catch (InterruptedException | ExecutionException e) {
				e.printStackTrace();
			}	
		}
	
	}	
	public void deleteTopics() {
		
		Map<String, Object> defaultClientConfig = new HashMap<String, Object>();
		defaultClientConfig.put("bootstrap.servers", bootstrapServers);
		try (final AdminClient adminClient = KafkaAdminClient.create(defaultClientConfig)) {
			try {
				
				ListTopicsResult listTopicsResult=adminClient.listTopics();
				
				Set<String> currentTopicList = listTopicsResult.names().get();
				
				adminClient.deleteTopics(currentTopicList);
					
			} catch (Exception e) {
				e.printStackTrace();				
			}
		}
		
	}
	public void listConsumerGroups() {
			
		Map<String, Object> defaultClientConfig = new HashMap<String, Object>();
		defaultClientConfig.put("bootstrap.servers", bootstrapServers);
		try (final AdminClient adminClient = KafkaAdminClient.create(defaultClientConfig)) {
			
			List<ConsumerGroupListing> collectList;
			try {
				collectList = (((KafkaAdminClient) adminClient).listConsumerGroups()).all().get().stream()
						.filter(consumerGroupListing -> consumerGroupListing.groupId().startsWith("myconsumer_"))
						.collect(Collectors.toList());
				
				for (ConsumerGroupListing consumerGroupListing2 : collectList) {
					System.out.println(consumerGroupListing2.groupId());
				}
				
			} catch (InterruptedException | ExecutionException e) {
				e.printStackTrace();	
			}
		}
	}
	public void dumpConsumerTopicOffsets(String groupid) {
		
		Map<String, Object> defaultClientConfig = new HashMap<String, Object>();
		defaultClientConfig.put("bootstrap.servers", bootstrapServers);
		
		try (final AdminClient adminClient = KafkaAdminClient.create(defaultClientConfig)) {
			
			ListConsumerGroupOffsetsResult result = adminClient.listConsumerGroupOffsets(groupid);
			try {
				
				Map<TopicPartition, OffsetAndMetadata> offsetAndMetadata=result.partitionsToOffsetAndMetadata().get();
				
				for (Map.Entry<TopicPartition, OffsetAndMetadata> entry : offsetAndMetadata.entrySet()) {
					
					System.out.println(entry.getKey().topic()+"."+entry.getKey().partition()+":"+
							entry.getValue().offset());
					
				}
				
			} catch (InterruptedException e) {
				e.printStackTrace();
			} catch (ExecutionException e) {
				e.printStackTrace();
			}		
		}
	}
	public void dumpTopicOffsets(String topic) {
		
		Properties propertiesConsumer = new Properties();
		propertiesConsumer.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
		propertiesConsumer.put(ConsumerConfig.GROUP_ID_CONFIG, "adminclientestclientgroupid");
		propertiesConsumer.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
		propertiesConsumer.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
		propertiesConsumer.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
		propertiesConsumer.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
		KafkaConsumer kafkaConsumer = new KafkaConsumer(propertiesConsumer);
		
		List<PartitionInfo> partitions = kafkaConsumer.partitionsFor(topic);
		
		for (PartitionInfo partitionInfo : partitions) {
			
			TopicPartition topicPartition = new TopicPartition(partitionInfo.topic(),
					partitionInfo.partition());
						
			Map<TopicPartition, Long> mapEndOffset = kafkaConsumer
					.endOffsets(Collections.singleton(topicPartition));
			
			long lastOffset = mapEndOffset.get(topicPartition);
			
			System.out.println(partitionInfo.topic()+"."+partitionInfo.partition()+" last offset : "+lastOffset);
			
		}
		
		kafkaConsumer.close();
	}
	public static void main(String[] args) {
		AdminClientTest adminClientTest=new AdminClientTest();
		
		//adminClientTest.createTopic("adminclient_test_topic", 1);
		//adminClientTest.listTopic();
		
		//adminClientTest.checkAndCreateTopic("adminclient_test_topic_check");
		//adminClientTest.listTopic();
		
		//adminClientTest.deleteTopics();
		//System.out.println("Topics deleted...");
		
		//adminClientTest.listTopic();
		//adminClientTest.listConsumerGroups();
		// adminClientTest.dumpConsumerTopicOffsets("test_client_group");
		adminClientTest.dumpTopicOffsets("mytopic");	
		
	}

}

AdminClient KafkaAdminClient.create(defaultClientConfig)) ile yaratılmaktadır.
createTopic() yöntemi ile topic yaratılmakta (varsayılan olarak dinamik topic yaratılamaz. Bunun için Kafka'da bir ayar yapılması gerekir).
listTopic() ile topic'ler listelenmektedir.
deleteTopics() ile topicsler silinmektedir.
checkAndCreateTopic() ile eğer topic yok ise yaratılmaktadır.
listConsumerGroups() yöntemi ise KAfka'ya bağlanan group id'si myconsumer_ ile başlayan tüm client group'ların bilgisini verir.
dumpConsumerTopicOffsets() ise bir clientgroup'un offset durumlarını basar. topic-partition:offset şeklinde. Bu şekilde bir consomer'un bir topic ve partitioan için hangi offset değelerlerinde kaldığı elde edilebilir.
dumpTopicOffsets() ise bir topic'in partitioan'larının son offset durumunu verir.
dumpConsumerTopicOffsets ile dumpTopicOffsets den gelen sonuç karşılaştırarak bir clientgroup'u için ne kadar mesajın beklediğini çıkarılabilirsiniz. topic offset - consumer topic offset o cunsumer için bekleyen mesaj adedini verir.



Bu Sayfayı Paylaş:

İletişim Bilgileri

Takip Et

Her Hakkı Saklıdır. Bu sitede yayınlanan tüm bilgi ve fikirlerin kullanımından fibiler.com sorumlu değildir. Bu sitede üretilmiş , derlenmiş içerikleri, fibiler.com'u kaynak göstermek koşuluyla kendi sitenizde kullanılabilirsiniz. Ancak telif hakkı olan içeriklerin hakları sahiplerine aittir