Aşağıdaki Kafka'da bir topic'e üç mesaj ekliyoruz :
import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.Serdes;
import com.thy.mercury.medusa.typeb.ValidationException;
public class ProcuderApp {
public final static String TOPIC_NAME = "test-topic";
public static void main(String[] args) throws ValidationException {
Properties properties = new Properties();
properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, Serdes.String().serializer().getClass().getName());
properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, Serdes.String().serializer().getClass().getName());
KafkaProducer<String, String> producer = new KafkaProducer<String, String>(properties);
produceMessage(producer, "TEST");
produceMessage(producer, "TEST 2");
produceMessage(producer, "Test 3");
producer.close();
}
private static void produceMessage(KafkaProducer<String, String> producer,
String message) {
ProducerRecord<String, String> record =
new ProducerRecord<>(TOPIC_NAME, "key1", message);
producer.send(record);
System.out.println(message + " sent");
}
}
localhost:9092 Kafka sunucunun adresidir.
properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,
"10.11.11.11:6667,10.11.11.12:6667,10.11.11.13:6667");
şeklinde birden fazla node var ise verilebilir.
Key ve value olarak string kullanılacağını belirtiyoruz. KafkaProducer yaratıp yönteme veriyor ve o yöntem de mesajları insert ediyor. En son producer'ı kapatıyoruz.