How to implement an exactly-once scenario in a Kafka consumer and producer

I am trying to implement an exactly-once scenario using transactions.

properties.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "127.0.0.1:9092");
properties.setProperty(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
properties.setProperty(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
// Setting a transactional.id enables transactions (and idempotence) for this producer.
properties.setProperty(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "prod-1");

Producer<String, String> producer = new KafkaProducer<>(properties);
producer.initTransactions();

try {
    producer.beginTransaction();
    for (int i = 0; i < 100; i++) {
        String record = Arrays.asList(names).get(rand.nextInt(2));
        System.out.println(record);
        producer.send(newRandomTransaction(record));
    }
    producer.commitTransaction();
} catch (ProducerFencedException e) {
    // Fatal: another producer with the same transactional.id took over, so this one can only be closed.
    producer.close();
} catch (KafkaException e) {
    // Any other error: abort so the records of this transaction never reach read_committed consumers.
    producer.abortTransaction();
}
producer.close();

However, when I start my application, no messages are written to the Kafka topic ....
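The code above covers only the producer half of the scenario. For the consumer half, a minimal sketch of the settings usually paired with a transactional producer might look like the following. It is an illustration under stated assumptions, not a diagnosis of the missing messages: the class name `ReadCommittedConsumerConfig`, the method `build`, and the group id `demo-group` are hypothetical, and only the configuration keys and values are real Kafka consumer settings. The block uses plain string keys so that it runs without the Kafka client on the classpath; the resulting `Properties` would be passed to `new KafkaConsumer<>(...)`.

```java
import java.util.Properties;

// Hypothetical helper (not from the original post): builds consumer settings
// for reading the output of a transactional producer.
final class ReadCommittedConsumerConfig {

    static Properties build(String bootstrapServers, String groupId) {
        Properties props = new Properties();
        props.setProperty("bootstrap.servers", bootstrapServers);
        props.setProperty("group.id", groupId);
        props.setProperty("key.deserializer",
                "org.apache.kafka.common.serialization.StringDeserializer");
        props.setProperty("value.deserializer",
                "org.apache.kafka.common.serialization.StringDeserializer");
        // The default is read_uncommitted, which also returns records from
        // open or aborted transactions. read_committed returns transactional
        // records only after their transaction has committed.
        props.setProperty("isolation.level", "read_committed");
        // Offsets are not auto-committed here; in a consume-transform-produce
        // loop they would be committed as part of the producer's transaction.
        props.setProperty("enable.auto.commit", "false");
        return props;
    }

    public static void main(String[] args) {
        // "demo-group" is an assumed group id for illustration only.
        Properties props = build("127.0.0.1:9092", "demo-group");
        System.out.println(props.getProperty("isolation.level"));
    }
}
```

In a consume-transform-produce loop, the consumed offsets would then be committed through the producer's `sendOffsetsToTransaction` call inside the same transaction, which is why auto-commit is switched off in this sketch.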


By: StackOverFlow - 6 days ago
