Bu örnekte bir topic'e mesajlar ekleniyor. Stream ise eklenen mesajları alıyor kelimelerin kaç kere geçtiğini buluyor. Ve son durumu saklıyor ve yeni mesajlar geldikçe kelime ve kaç kere geçtiği listesi güncelleniyor.
import java.util.Arrays;
import java.util.Properties;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.utils.Bytes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.KTable;
import org.apache.kafka.streams.kstream.KeyValueMapper;
import org.apache.kafka.streams.kstream.Materialized;
import org.apache.kafka.streams.kstream.Produced;
import org.apache.kafka.streams.kstream.ValueMapper;
import org.apache.kafka.streams.state.KeyValueStore;
public class StreamApp {
public static void main(String[] args) {
Properties props = new Properties();
props.put(StreamsConfig.APPLICATION_ID_CONFIG, "wordcount-application");
props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
props.put(StreamsConfig.STATE_DIR_CONFIG, "C:\\Users\\testuser\\AppData\\Local\\Temp");
props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
StreamsBuilder builder = new StreamsBuilder();
KStream<String, String> textLines = builder.stream("stream-test-topic1",
Consumed.with(Serdes.String(), Serdes.String()));
KTable<String, Long> wordCounts = textLines.flatMapValues(new ValueMapper<String, Iterable<String>>() {
@Override
public Iterable<String> apply(String textLine) {
return Arrays.asList(textLine.toLowerCase().split("\\W+"));
}
}).groupBy(new KeyValueMapper<String, String, String>() {
@Override
public String apply(String key, String word) {
return word;
}
}).count(Materialized.<String, Long, KeyValueStore<Bytes, byte[]>>as("counts-store"));
wordCounts.toStream().to("stream-test-word-count-topic", Produced.with(Serdes.String(), Serdes.Long()));
KafkaStreams streams = new KafkaStreams(builder.build(), props);
streams.start();
Runtime.getRuntime().addShutdownHook(new Thread(streams::close));
}
}
Bu stream state tutmasından dolayı Kafka'ya bu stream için state'lerin nerede saklaması gerektiği veriliyor :
props.put(StreamsConfig.STATE_DIR_CONFIG, "C:\\Users\\testuser\\AppData\\Local\\Temp");
builder.stream() ile yeni bir stream yaratılıyor. stream-test-topic1 ile stream'de okunacak topic'in adı veriliyor. Stream sürekli bu topic'e eklenen mesajları okuyacakır.
flatMapValues() yöntemi ile .split("\\W+") kullanılarak mesajlar kelimelerine ayrılır. groupBy() yöntemi ile aynı kelimeler gruplanır ve count() yöntemi ile her grubun sayısı alınır.
KTable
wordCounts ile sonuçlar KTable formatında tutulurlar.
wordCounts.toStream().to("stream-test-word-count-topic", Produced.with(Serdes.String(), Serdes.Long()));
ile üretilen sonuç stream-test-word-count-topic adlı topic'e yazılır.
Bir uygulama yazıp stream-test-topic1 topic'e mesaj eklerseniz stream word - count şeklinde sayacak ve onu stream-test-word-count-topic adlı topic'e ekleyecektir. Başka bir uygulama yapıp bu topic'den consume edilirse kelime-sayı şeklinde bilgileri alabilirsiniz.
Aşağıdaki gibi consume edilip basılabilir :
while (true) {
ConsumerRecords<String, Long> records = consumer.poll(Duration.ofMillis(1000));
if(records.count()>0) {
for (ConsumerRecord<String, Long> record : records) {
System.out.println(record.key() +" : "+ record.value());
}
}
}
Bu şekilde consume edildiğinde şu ana kadar her kelimenin toplam kaç kere geçtiği ekranda görülecektir.