Örnek

Bir Topic'e Eklenen Mesajlardaki Kelimelerin Kaç Kere Geçtiğini Sayan Stream Örneği

Bu örnekte bir topic'e mesajlar ekleniyor. Stream ise eklenen mesajları alıyor kelimelerin kaç kere geçtiğini buluyor. Ve son durumu saklıyor ve yeni mesajlar geldikçe kelime ve kaç kere geçtiği listesi güncelleniyor.
import java.util.Arrays;
import java.util.Properties;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.utils.Bytes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.KTable;
import org.apache.kafka.streams.kstream.KeyValueMapper;
import org.apache.kafka.streams.kstream.Materialized;
import org.apache.kafka.streams.kstream.Produced;
import org.apache.kafka.streams.kstream.ValueMapper;
import org.apache.kafka.streams.state.KeyValueStore;
public class StreamApp {
	public static void main(String[] args) {
		Properties props = new Properties();
		props.put(StreamsConfig.APPLICATION_ID_CONFIG, "wordcount-application");
		props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
		props.put(StreamsConfig.STATE_DIR_CONFIG, "C:\\Users\\testuser\\AppData\\Local\\Temp");
		props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
		props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
		StreamsBuilder builder = new StreamsBuilder();
		KStream<String, String> textLines = builder.stream("stream-test-topic1",
				Consumed.with(Serdes.String(), Serdes.String()));
		KTable<String, Long> wordCounts = textLines.flatMapValues(new ValueMapper<String, Iterable<String>>() {
			@Override
			public Iterable<String> apply(String textLine) {
				return Arrays.asList(textLine.toLowerCase().split("\\W+"));
			}
		}).groupBy(new KeyValueMapper<String, String, String>() {
			@Override
			public String apply(String key, String word) {
				return word;
			}
		}).count(Materialized.<String, Long, KeyValueStore<Bytes, byte[]>>as("counts-store"));
		wordCounts.toStream().to("stream-test-word-count-topic", Produced.with(Serdes.String(), Serdes.Long()));
		KafkaStreams streams = new KafkaStreams(builder.build(), props);
		streams.start();
		
		Runtime.getRuntime().addShutdownHook(new Thread(streams::close));
	}
}
Bu stream state tutmasından dolayı Kafka'ya bu stream için state'lerin nerede saklaması gerektiği veriliyor :
props.put(StreamsConfig.STATE_DIR_CONFIG, "C:\\Users\\testuser\\AppData\\Local\\Temp");
builder.stream() ile yeni bir stream yaratılıyor. stream-test-topic1 ile stream'de okunacak topic'in adı veriliyor. Stream sürekli bu topic'e eklenen mesajları okuyacakır.
flatMapValues() yöntemi ile .split("\\W+") kullanılarak mesajlar kelimelerine ayrılır. groupBy() yöntemi ile aynı kelimeler gruplanır ve count() yöntemi ile her grubun sayısı alınır.
KTable wordCounts ile sonuçlar KTable formatında tutulurlar.
wordCounts.toStream().to("stream-test-word-count-topic", Produced.with(Serdes.String(), Serdes.Long()));
ile üretilen sonuç stream-test-word-count-topic adlı topic'e yazılır.
Bir uygulama yazıp stream-test-topic1 topic'e mesaj eklerseniz stream word - count şeklinde sayacak ve onu stream-test-word-count-topic adlı topic'e ekleyecektir. Başka bir uygulama yapıp bu topic'den consume edilirse kelime-sayı şeklinde bilgileri alabilirsiniz.
Aşağıdaki gibi consume edilip basılabilir :
while (true) {
            	
	ConsumerRecords<String, Long> records = consumer.poll(Duration.ofMillis(1000));
	
	if(records.count()>0) {
		for (ConsumerRecord<String, Long> record : records) {						
			System.out.println(record.key() +" : "+  record.value());
		}
	}
}

Bu şekilde consume edildiğinde şu ana kadar her kelimenin toplam kaç kere geçtiği ekranda görülecektir.
zafer.teker , 14.05.2020

Bu Sayfayı Paylaş:

Fibiler Üyelerinin Yorumları


Tüm üyeler içeriklere yorum ekleyerek katkıda bulunabilir : Yorum Gir

Misafir Yorumları




Bu Sayfayı Paylaş:

İletişim Bilgileri

Takip Et

Her Hakkı Saklıdır. Bu sitede yayınlanan tüm bilgi ve fikirlerin kullanımından fibiler.com sorumlu değildir. Bu sitede üretilmiş , derlenmiş içerikleri, fibiler.com'u kaynak göstermek koşuluyla kendi sitenizde kullanılabilirsiniz. Ancak telif hakkı olan içeriklerin hakları sahiplerine aittir