import java.util.Properties;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.Serdes;

import com.thy.mercury.medusa.typeb.ValidationException;

/**
 * Minimal producer example: sends three String messages, all with the key
 * {@code "key1"}, to {@link #TOPIC_NAME} on the broker at {@code localhost:9092}.
 */
public class ProcuderApp {

    public final static String TOPIC_NAME = "test-topic";

    /**
     * Configures a String/String producer, sends the sample messages and closes the producer.
     *
     * @param args unused
     * @throws ValidationException declared by the original sample; nothing in this method throws it
     */
    public static void main(String[] args) throws ValidationException {
        Properties properties = new Properties();
        properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
                Serdes.String().serializer().getClass().getName());
        properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
                Serdes.String().serializer().getClass().getName());

        // try-with-resources guarantees close() runs even when a send throws.
        try (KafkaProducer<String, String> producer = new KafkaProducer<>(properties)) {
            produceMessage(producer, "TEST");
            produceMessage(producer, "TEST 2");
            produceMessage(producer, "Test 3");
        }
    }

    /**
     * Hands one message to the producer and logs it to stdout.
     *
     * @param producer the producer to send with
     * @param message  the record value
     */
    private static void produceMessage(KafkaProducer<String, String> producer, String message) {
        ProducerRecord<String, String> record = new ProducerRecord<>(TOPIC_NAME, "key1", message);
        // send() is asynchronous: the line below is printed once the record is handed over,
        // not once the broker has acknowledged it.
        producer.send(record);
        System.out.println(message + " sent");
    }
}
localhost:9092 Kafka sunucunun adresidir.
// Several brokers can be given as a comma-separated list of host:port pairs.
properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "10.11.11.11:6667,10.11.11.12:6667,10.11.11.13:6667");
import java.time.Duration;
import java.util.Collections;
import java.util.Properties;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;

/**
 * Minimal consumer example: subscribes to {@link #TOPIC_NAME} as a member of the
 * group {@code test_consumer_group} and prints the value of every record it receives.
 */
public class ConsumerTopic {

    public final static String TOPIC_NAME = "topic1";

    /**
     * Polls the topic in an endless loop; the consumer is closed when the loop is
     * left by an exception.
     *
     * @param args unused
     */
    public static void main(String[] args) {
        final String stringDeserializer = StringDeserializer.class.getName();

        Properties config = new Properties();
        config.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        config.put(ConsumerConfig.GROUP_ID_CONFIG, "test_consumer_group");
        config.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, stringDeserializer);
        config.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, stringDeserializer);

        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(config);
        consumer.subscribe(Collections.singletonList(TOPIC_NAME));
        try {
            for (;;) {
                ConsumerRecords<String, String> batch = consumer.poll(Duration.ofMillis(10000));
                // An empty batch simply yields no iterations.
                for (ConsumerRecord<String, String> entry : batch) {
                    System.out.println("text : " + entry.value());
                }
            }
        } finally {
            consumer.close();
        }
    }
}
localhost:9092 Kafka sunucunun adresidir.
// Consumer code should use the ConsumerConfig constant: the consumer sample imports only
// ConsumerConfig, so ProducerConfig would not resolve there.
properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "10.11.11.11:9092,10.11.11.12:9092,10.11.11.13:9092");
Client Group id olarak test_consumer_group veriyoruz. KafkaConsumer yaratıp topic1 topic'ine abone oluyoruz.
public class KafkaConsumerRunner implements Runnable { private final AtomicBoolean closed = new AtomicBoolean(false); private final KafkaConsumer consumer; public void run() { try { consumer.subscribe(Arrays.asList("topic")); while (!closed.get()) { ConsumerRecords records = consumer.poll(10000); // Handle new records } } catch (WakeupException e) { // Ignore exception if closing if (!closed.get()) throw e; } finally { consumer.close(); } } // Shutdown hook which can be called from a separate thread public void shutdown() { closed.set(true); consumer.wakeup(); } }
import java.util.Arrays;
import java.util.Properties;

import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.utils.Bytes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.KTable;
import org.apache.kafka.streams.kstream.KeyValueMapper;
import org.apache.kafka.streams.kstream.Materialized;
import org.apache.kafka.streams.kstream.Produced;
import org.apache.kafka.streams.kstream.ValueMapper;
import org.apache.kafka.streams.state.KeyValueStore;

/**
 * Word-count Streams example: reads text lines from {@code stream-test-topic1},
 * splits them into lower-cased words, counts each word and writes the counts to
 * {@code stream-test-word-count-topic}.
 */
public class StreamApp {

    /**
     * Builds the topology, starts the streams application and registers a
     * shutdown hook that closes it.
     *
     * @param args unused
     */
    public static void main(String[] args) {
        final String stringSerdeClass = Serdes.String().getClass().getName();

        Properties config = new Properties();
        config.put(StreamsConfig.APPLICATION_ID_CONFIG, "wordcount-application");
        config.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        // Directory where the state store of this application is kept.
        config.put(StreamsConfig.STATE_DIR_CONFIG, "C:\\Users\\testuser\\AppData\\Local\\Temp");
        config.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, stringSerdeClass);
        config.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, stringSerdeClass);

        StreamsBuilder topology = new StreamsBuilder();
        KStream<String, String> lines =
                topology.stream("stream-test-topic1", Consumed.with(Serdes.String(), Serdes.String()));

        KTable<String, Long> counts = lines
                // One input line becomes one record per word.
                .flatMapValues(line -> Arrays.asList(line.toLowerCase().split("\\W+")))
                // Re-key every record by the word itself so equal words are grouped together.
                .groupBy((key, word) -> word)
                .count(Materialized.<String, Long, KeyValueStore<Bytes, byte[]>>as("counts-store"));

        counts.toStream().to("stream-test-word-count-topic", Produced.with(Serdes.String(), Serdes.Long()));

        KafkaStreams streams = new KafkaStreams(topology.build(), config);
        streams.start();
        Runtime.getRuntime().addShutdownHook(new Thread(streams::close));
    }
}
Bu stream state tutmasından dolayı Kafka'ya bu stream için state'lerin nerede saklaması gerektiği veriliyor :
props.put(StreamsConfig.STATE_DIR_CONFIG, "C:\\Users\\testuser\\AppData\\Local\\Temp");builder.stream() ile yeni bir stream yaratılıyor. stream-test-topic1 ile stream'de okunacak topic'in adı veriliyor. Stream sürekli bu topic'e eklenen mesajları okuyacaktır.
wordCounts.toStream().to("stream-test-word-count-topic", Produced.with(Serdes.String(), Serdes.Long()));ile üretilen sonuç stream-test-word-count-topic adlı topic'e yazılır.
// Poll forever and print the key and value of every record received.
for (;;) {
    ConsumerRecords<String, Long> batch = consumer.poll(Duration.ofMillis(1000));
    // An empty batch simply yields no iterations, so no emptiness check is needed.
    for (ConsumerRecord<String, Long> entry : batch) {
        System.out.println(entry.key() + " : " + entry.value());
    }
}
import org.apache.kafka.common.serialization.Serializer;

import com.fasterxml.jackson.databind.ObjectMapper;

/**
 * Kafka value serializer that writes a {@code MyObject} as JSON bytes using Jackson.
 */
public class MyKafkaJsonSerializer implements Serializer<MyObject> {

    /** Shared mapper: building an ObjectMapper for every record is needlessly expensive. */
    private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();

    /**
     * Converts {@code data} to its JSON byte representation.
     *
     * @param topic topic the record is sent to (unused)
     * @param data  object to serialize
     * @return the JSON bytes, or {@code null} when serialization fails (the failure
     *         is printed to stderr)
     */
    @Override
    public byte[] serialize(String topic, MyObject data) {
        byte[] byteData = null;
        try {
            byteData = OBJECT_MAPPER.writeValueAsBytes(data);
        } catch (Exception e) {
            // Best effort, as in the original sample: report the failure and return null.
            e.printStackTrace();
        }
        return byteData;
    }
}
Bu sınıf aşağıdaki gibi kullanılabilir :
properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName()); properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, MyKafkaJsonSerializer.class.getName());Object'yi Json'a çevirmek için jackson kütüphanesi kullanılmıştır. MyObject verilen herhangi tipte bir nesnedir. Bu nesne jackson kütüphanesi ile Json'a çevrilecektir.
import org.apache.kafka.common.serialization.Deserializer;

import com.fasterxml.jackson.databind.ObjectMapper;

/**
 * Kafka value deserializer that reads JSON bytes into a {@code MyObject} using Jackson.
 */
public class MyKafkaJsonDeserializer implements Deserializer<MyObject> {

    /** Shared mapper: building an ObjectMapper for every record is needlessly expensive. */
    private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();

    /**
     * Parses {@code data} as JSON into a {@code MyObject}.
     *
     * @param topic topic the record was read from (unused)
     * @param data  JSON bytes of the record value
     * @return the parsed object, or {@code null} when parsing fails (the failure is
     *         printed to stderr)
     */
    @Override
    public MyObject deserialize(String topic, byte[] data) {
        MyObject obj = null;
        try {
            obj = OBJECT_MAPPER.readValue(data, MyObject.class);
        } catch (Exception e) {
            // Best effort, as in the original sample: report the failure and return null.
            e.printStackTrace();
        }
        return obj;
    }
}
Burada Kafka'dan gelen json formatındaki veri MyObject sınıfına çevrilmektedir. Consumer'a aşağıdaki gibi verilmelidir :
properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName()); properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, MyKafkaJsonDeserializer.class.getName());Bu şekilde gelen mesaj MyObject tipinde olacaktır :
while (true) { ConsumerRecords<String, MyObject> records = consumer.poll(Duration.ofMillis(1000)); if(records.count()>0) { for (ConsumerRecord<String, MyObject> record : records) { System.out.println(record.topic() + "-" + record.key() +" : "+ record.value().getInfo()); } } }Kafka'da Stream bir topic'ten aldığı mesajları başka topiclere yazmak için kullanılır. Eğer sizin yarattığınız bir object'i alıp diğer topiclere yazmasını istiyorsanız bunun için Serde tipinde bir sınıf yaratılmalı, serializer ve deserializer içerisinde belirtilmeli :
import java.util.Map;

import org.apache.kafka.common.serialization.Deserializer;
import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.serialization.Serializer;

/**
 * Serde for {@code MyObject} that pairs the JSON serializer and deserializer
 * defined in this article ({@code MyKafkaJsonSerializer} / {@code MyKafkaJsonDeserializer}).
 */
public class MyObjectSerde implements Serde<MyObject> {

    private final MyKafkaJsonSerializer serializer = new MyKafkaJsonSerializer();
    private final MyKafkaJsonDeserializer deserializer = new MyKafkaJsonDeserializer();

    /** Forwards the configuration to both the serializer and the deserializer. */
    @Override
    public void configure(Map<String, ?> configs, boolean isKey) {
        serializer.configure(configs, isKey);
        deserializer.configure(configs, isKey);
    }

    /** Closes both the serializer and the deserializer. */
    @Override
    public void close() {
        serializer.close();
        deserializer.close();
    }

    /** @return the JSON serializer for {@code MyObject} */
    @Override
    public Serializer<MyObject> serializer() {
        return serializer;
    }

    /** @return the JSON deserializer for {@code MyObject} */
    @Override
    public Deserializer<MyObject> deserializer() {
        return deserializer;
    }
}
Bu şekilde yaratılan MyObjectSerde sınıfı stream'e aşağıdaki gibi verilebilir :
streamsConfiguration.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName()); streamsConfiguration.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, MyObjectSerde.class.getName());
properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG,false);Artık mesajı aldıktan sonra sizin commit etmeniz gerekir :
consumer.commitSync();Eğer mesaj işlemede hata var ise offset'i artırmak istemiyorsanız :
consumer.seek(new TopicPartition(record.topic(), record.partition()), record.offset());ile offset'i son mesajın offset değerine koyabilirsiniz. Bu şekilde başarısız işlemde mesaj kaybı oluşmaz.
public class TestConsumer { public final static String TOPIC_NAME = "testtopic"; public static void main(String[] args) { Properties properties = new Properties(); properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); properties.put(ConsumerConfig.GROUP_ID_CONFIG, "test_consumer_group"); properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName()); properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName()); properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG,false); KafkaConsumer<String, String> consumer=new KafkaConsumer<String, String>(properties); consumer.subscribe(Collections.singletonList(TOPIC_NAME)); try { while (true) { ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1000)); for (ConsumerRecord<String, String> record : records) { try { // islem basarili ise consumer.commitSync(); } catch (Exception e) { // hata var offset'i geri koy consumer.seek(new TopicPartition(record.topic(), record.partition()), record.offset()); break; } } } } finally { consumer.close(); } } }
import java.util.Collections; import java.util.HashMap; import java.util.Map; import java.util.Set; import java.util.concurrent.ExecutionException; import org.apache.kafka.clients.admin.AdminClient; import org.apache.kafka.clients.admin.CreateTopicsResult; import org.apache.kafka.clients.admin.KafkaAdminClient; import org.apache.kafka.clients.admin.ListTopicsResult; import org.apache.kafka.clients.admin.NewTopic; import org.apache.kafka.common.errors.TopicExistsException; public class AdminClientTest { //private static String bootstrapServers="localhost:9092"; public void createTopic(final String topicName, final int partitions) { final short replicationFactor = 1; Map<String, Object> defaultClientConfig = new HashMap<String, Object>(); defaultClientConfig.put("bootstrap.servers", bootstrapServers); // Create admin client try (final AdminClient adminClient = KafkaAdminClient.create(defaultClientConfig)) { try { // Define topic final NewTopic newTopic = new NewTopic(topicName, partitions, replicationFactor); // Create topic, which is async call. final CreateTopicsResult createTopicsResult = adminClient.createTopics(Collections.singleton(newTopic)); // Since the call is Async, Lets wait for it to complete. 
createTopicsResult.values().get(topicName).get(); System.out.println("created"); } catch (InterruptedException | ExecutionException e) { if (!(e.getCause() instanceof TopicExistsException)) { throw new RuntimeException(e.getMessage(), e); } System.out.println("TopicExistsException - Swallow this exception, just means the topic already exists"); } } } public void listTopic() { Map<String, Object> defaultClientConfig = new HashMap<String, Object>(); defaultClientConfig.put("bootstrap.servers", bootstrapServers); // Create admin client try (final AdminClient adminClient = KafkaAdminClient.create(defaultClientConfig)) { try { ListTopicsResult listTopicsResult=adminClient.listTopics(); Set<String> currentTopicList = listTopicsResult.names().get(); for (String string : currentTopicList) { System.out.println(string); } } catch (InterruptedException | ExecutionException e) { e.printStackTrace(); } } } private void checkAndCreateTopic(final String topicNameTest) { Map<String, Object> defaultClientConfig = new HashMap<String, Object>(); defaultClientConfig.put("bootstrap.servers", bootstrapServers); try (final AdminClient adminClient = KafkaAdminClient.create(defaultClientConfig)) { try { boolean topicExists = adminClient.listTopics().names().get().stream().anyMatch( topicName -> topicName.equalsIgnoreCase(topicNameTest)); if(!topicExists) { // topic yok System.out.println("topic yok"); NewTopic newTopic = new NewTopic(topicNameTest, 1, (short)3); CreateTopicsResult createTopicsResult = adminClient.createTopics(Collections.singleton(newTopic)); createTopicsResult.values().get(topicNameTest).get(); }else { System.out.println("topic var. 
Yaratilmadi"); } } catch (InterruptedException | ExecutionException e) { e.printStackTrace(); } } } public void deleteTopics() { Map<String, Object> defaultClientConfig = new HashMap<String, Object>(); defaultClientConfig.put("bootstrap.servers", bootstrapServers); try (final AdminClient adminClient = KafkaAdminClient.create(defaultClientConfig)) { try { ListTopicsResult listTopicsResult=adminClient.listTopics(); Set<String> currentTopicList = listTopicsResult.names().get(); adminClient.deleteTopics(currentTopicList); } catch (Exception e) { e.printStackTrace(); } } } public void listConsumerGroups() { Map<String, Object> defaultClientConfig = new HashMap<String, Object>(); defaultClientConfig.put("bootstrap.servers", bootstrapServers); try (final AdminClient adminClient = KafkaAdminClient.create(defaultClientConfig)) { List<ConsumerGroupListing> collectList; try { collectList = (((KafkaAdminClient) adminClient).listConsumerGroups()).all().get().stream() .filter(consumerGroupListing -> consumerGroupListing.groupId().startsWith("myconsumer_")) .collect(Collectors.toList()); for (ConsumerGroupListing consumerGroupListing2 : collectList) { System.out.println(consumerGroupListing2.groupId()); } } catch (InterruptedException | ExecutionException e) { e.printStackTrace(); } } } public void dumpConsumerTopicOffsets(String groupid) { Map<String, Object> defaultClientConfig = new HashMap<String, Object>(); defaultClientConfig.put("bootstrap.servers", bootstrapServers); try (final AdminClient adminClient = KafkaAdminClient.create(defaultClientConfig)) { ListConsumerGroupOffsetsResult result = adminClient.listConsumerGroupOffsets(groupid); try { Map<TopicPartition, OffsetAndMetadata> offsetAndMetadata=result.partitionsToOffsetAndMetadata().get(); for (Map.Entry<TopicPartition, OffsetAndMetadata> entry : offsetAndMetadata.entrySet()) { System.out.println(entry.getKey().topic()+"."+entry.getKey().partition()+":"+ entry.getValue().offset()); } } catch (InterruptedException e) { 
e.printStackTrace(); } catch (ExecutionException e) { e.printStackTrace(); } } } public void dumpTopicOffsets(String topic) { Properties propertiesConsumer = new Properties(); propertiesConsumer.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers); propertiesConsumer.put(ConsumerConfig.GROUP_ID_CONFIG, "adminclientestclientgroupid"); propertiesConsumer.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName()); propertiesConsumer.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName()); propertiesConsumer.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false); propertiesConsumer.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); KafkaConsumer kafkaConsumer = new KafkaConsumer(propertiesConsumer); List<PartitionInfo> partitions = kafkaConsumer.partitionsFor(topic); for (PartitionInfo partitionInfo : partitions) { TopicPartition topicPartition = new TopicPartition(partitionInfo.topic(), partitionInfo.partition()); Map<TopicPartition, Long> mapEndOffset = kafkaConsumer .endOffsets(Collections.singleton(topicPartition)); long lastOffset = mapEndOffset.get(topicPartition); System.out.println(partitionInfo.topic()+"."+partitionInfo.partition()+" last offset : "+lastOffset); } kafkaConsumer.close(); } public static void main(String[] args) { AdminClientTest adminClientTest=new AdminClientTest(); //adminClientTest.createTopic("adminclient_test_topic", 1); //adminClientTest.listTopic(); //adminClientTest.checkAndCreateTopic("adminclient_test_topic_check"); //adminClientTest.listTopic(); //adminClientTest.deleteTopics(); //System.out.println("Topics deleted..."); //adminClientTest.listTopic(); //adminClientTest.listConsumerGroups(); // adminClientTest.dumpConsumerTopicOffsets("test_client_group"); adminClientTest.dumpTopicOffsets("mytopic"); } }