Örnek

Kafka'da Topic'lerle İlgili AdminClient Örnekleri

AdminClient sınıfı Kafka için topic listesini alma, topic yaratma, silme gibi yönetim ile ilgili işlemler yapmaktadır. Aşağıdaki sınıfta topic listesi, topic yaratılması ve silinmesi gibi örnek kodlar bulunmaktadır :
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ExecutionException;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.CreateTopicsResult;
import org.apache.kafka.clients.admin.KafkaAdminClient;
import org.apache.kafka.clients.admin.ListTopicsResult;
import org.apache.kafka.clients.admin.NewTopic;
import org.apache.kafka.common.errors.TopicExistsException;
public class AdminClientTest {
	//private static String bootstrapServers="localhost:9092";
	
	public void createTopic(final String topicName, final int partitions) {
		final short replicationFactor = 1;
		Map<String, Object> defaultClientConfig = new HashMap<String, Object>();
		defaultClientConfig.put("bootstrap.servers", bootstrapServers);
		// Create admin client
		try (final AdminClient adminClient = KafkaAdminClient.create(defaultClientConfig)) {
			try {
				// Define topic
				final NewTopic newTopic = new NewTopic(topicName, partitions, replicationFactor);
				// Create topic, which is async call.
				final CreateTopicsResult createTopicsResult = adminClient.createTopics(Collections.singleton(newTopic));
				// Since the call is Async, Lets wait for it to complete.
				createTopicsResult.values().get(topicName).get();
				
				System.out.println("created");
			} catch (InterruptedException | ExecutionException e) {
				if (!(e.getCause() instanceof TopicExistsException)) {
					throw new RuntimeException(e.getMessage(), e);
				}
				System.out.println("TopicExistsException - Swallow this exception, just means the topic already exists");
				
			}
		}
	}
	
	public void listTopic() {
		
		Map<String, Object> defaultClientConfig = new HashMap<String, Object>();
		defaultClientConfig.put("bootstrap.servers", bootstrapServers);
		// Create admin client
		try (final AdminClient adminClient = KafkaAdminClient.create(defaultClientConfig)) {
			try {
				
				ListTopicsResult listTopicsResult=adminClient.listTopics();
				
				Set<String> currentTopicList = listTopicsResult.names().get();
				
				for (String string : currentTopicList) {
					
					System.out.println(string);			
				}								
			} catch (InterruptedException | ExecutionException e) {
				e.printStackTrace();				
			}
		}
	}
	
	private void checkAndCreateTopic(final String topicNameTest) {
		
		Map<String, Object> defaultClientConfig = new HashMap<String, Object>();
		defaultClientConfig.put("bootstrap.servers", bootstrapServers);
		
		try (final AdminClient adminClient = KafkaAdminClient.create(defaultClientConfig)) {
			try {
				boolean topicExists = adminClient.listTopics().names().get().stream().anyMatch(
						topicName -> topicName.equalsIgnoreCase(topicNameTest));
				if(!topicExists) { // topic yok 
					
					System.out.println("topic yok");
					
					NewTopic newTopic = new NewTopic(topicNameTest, 1, (short)3); 
					CreateTopicsResult createTopicsResult = adminClient.createTopics(Collections.singleton(newTopic));
					createTopicsResult.values().get(topicNameTest).get();
					
				}else {
					
					System.out.println("topic var. Yaratilmadi");
					
				}
			} catch (InterruptedException | ExecutionException e) {
				e.printStackTrace();
			}	
		}
	
	}	
	public void deleteTopics() {
		
		Map<String, Object> defaultClientConfig = new HashMap<String, Object>();
		defaultClientConfig.put("bootstrap.servers", bootstrapServers);
		try (final AdminClient adminClient = KafkaAdminClient.create(defaultClientConfig)) {
			try {
				
				ListTopicsResult listTopicsResult=adminClient.listTopics();
				
				Set<String> currentTopicList = listTopicsResult.names().get();
				
				adminClient.deleteTopics(currentTopicList);
					
			} catch (Exception e) {
				e.printStackTrace();				
			}
		}
		
	}
	public void listConsumerGroups() {
			
		Map<String, Object> defaultClientConfig = new HashMap<String, Object>();
		defaultClientConfig.put("bootstrap.servers", bootstrapServers);
		try (final AdminClient adminClient = KafkaAdminClient.create(defaultClientConfig)) {
			
			List<ConsumerGroupListing> collectList;
			try {
				collectList = (((KafkaAdminClient) adminClient).listConsumerGroups()).all().get().stream()
						.filter(consumerGroupListing -> consumerGroupListing.groupId().startsWith("myconsumer_"))
						.collect(Collectors.toList());
				
				for (ConsumerGroupListing consumerGroupListing2 : collectList) {
					System.out.println(consumerGroupListing2.groupId());
				}
				
			} catch (InterruptedException | ExecutionException e) {
				e.printStackTrace();	
			}
		}
	}
	public void dumpConsumerTopicOffsets(String groupid) {
		
		Map<String, Object> defaultClientConfig = new HashMap<String, Object>();
		defaultClientConfig.put("bootstrap.servers", bootstrapServers);
		
		try (final AdminClient adminClient = KafkaAdminClient.create(defaultClientConfig)) {
			
			ListConsumerGroupOffsetsResult result = adminClient.listConsumerGroupOffsets(groupid);
			try {
				
				Map<TopicPartition, OffsetAndMetadata> offsetAndMetadata=result.partitionsToOffsetAndMetadata().get();
				
				for (Map.Entry<TopicPartition, OffsetAndMetadata> entry : offsetAndMetadata.entrySet()) {
					
					System.out.println(entry.getKey().topic()+"."+entry.getKey().partition()+":"+
							entry.getValue().offset());
					
				}
				
			} catch (InterruptedException e) {
				e.printStackTrace();
			} catch (ExecutionException e) {
				e.printStackTrace();
			}		
		}
	}
	public void dumpTopicOffsets(String topic) {
		
		Properties propertiesConsumer = new Properties();
		propertiesConsumer.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
		propertiesConsumer.put(ConsumerConfig.GROUP_ID_CONFIG, "adminclientestclientgroupid");
		propertiesConsumer.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
		propertiesConsumer.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
		propertiesConsumer.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
		propertiesConsumer.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
		KafkaConsumer kafkaConsumer = new KafkaConsumer(propertiesConsumer);
		
		List<PartitionInfo> partitions = kafkaConsumer.partitionsFor(topic);
		
		for (PartitionInfo partitionInfo : partitions) {
			
			TopicPartition topicPartition = new TopicPartition(partitionInfo.topic(),
					partitionInfo.partition());
						
			Map<TopicPartition, Long> mapEndOffset = kafkaConsumer
					.endOffsets(Collections.singleton(topicPartition));
			
			long lastOffset = mapEndOffset.get(topicPartition);
			
			System.out.println(partitionInfo.topic()+"."+partitionInfo.partition()+" last offset : "+lastOffset);
			
		}
		
		kafkaConsumer.close();
	}
	public static void main(String[] args) {
		AdminClientTest adminClientTest=new AdminClientTest();
		
		//adminClientTest.createTopic("adminclient_test_topic", 1);
		//adminClientTest.listTopic();
		
		//adminClientTest.checkAndCreateTopic("adminclient_test_topic_check");
		//adminClientTest.listTopic();
		
		//adminClientTest.deleteTopics();
		//System.out.println("Topics deleted...");
		
		//adminClientTest.listTopic();
		//adminClientTest.listConsumerGroups();
		// adminClientTest.dumpConsumerTopicOffsets("test_client_group");
		adminClientTest.dumpTopicOffsets("mytopic");	
		
	}

}

AdminClient KafkaAdminClient.create(defaultClientConfig)) ile yaratılmaktadır.
createTopic() yöntemi ile topic yaratılmakta (varsayılan olarak dinamik topic yaratılamaz. Bunun için Kafka'da bir ayar yapılması gerekir).
listTopic() ile topic'ler listelenmektedir.
deleteTopics() ile topicsler silinmektedir.
checkAndCreateTopic() ile eğer topic yok ise yaratılmaktadır.
listConsumerGroups() yöntemi ise KAfka'ya bağlanan group id'si myconsumer_ ile başlayan tüm client group'ların bilgisini verir.
dumpConsumerTopicOffsets() ise bir clientgroup'un offset durumlarını basar. topic-partition:offset şeklinde. Bu şekilde bir consomer'un bir topic ve partitioan için hangi offset değelerlerinde kaldığı elde edilebilir.
dumpTopicOffsets() ise bir topic'in partitioan'larının son offset durumunu verir.
dumpConsumerTopicOffsets ile dumpTopicOffsets den gelen sonuç karşılaştırarak bir clientgroup'u için ne kadar mesajın beklediğini çıkarılabilirsiniz. topic offset - consumer topic offset o cunsumer için bekleyen mesaj adedini verir.
zafer.teker , 16.05.2020

Bu Sayfayı Paylaş:

Fibiler Üyelerinin Yorumları


Tüm üyeler içeriklere yorum ekleyerek katkıda bulunabilir : Yorum Gir

Misafir Yorumları




Bu Sayfayı Paylaş:

İletişim Bilgileri

Takip Et

Her Hakkı Saklıdır. Bu sitede yayınlanan tüm bilgi ve fikirlerin kullanımından fibiler.com sorumlu değildir. Bu sitede üretilmiş , derlenmiş içerikleri, fibiler.com'u kaynak göstermek koşuluyla kendi sitenizde kullanılabilirsiniz. Ancak telif hakkı olan içeriklerin hakları sahiplerine aittir