From cfbc9182fe187286cbce02b6a11936941b948c6e Mon Sep 17 00:00:00 2001 From: rozer16 Date: Tue, 16 May 2023 00:41:06 +0530 Subject: [PATCH] Create Test. java Signed-off-by: rozer16 --- Test. java | 45 +++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 45 insertions(+) create mode 100644 Test. java diff --git a/Test. java b/Test. java new file mode 100644 index 00000000..e44a27fd --- /dev/null +++ b/Test. java @@ -0,0 +1,45 @@ +import org.apache.kafka.clients.consumer.*; +import org.apache.kafka.common.*; +import org.springframework.kafka.core.*; + +import java.util.*; + +public class KafkaPartitionReader { + + private final KafkaTemplate kafkaTemplate; + + public KafkaPartitionReader(KafkaTemplate kafkaTemplate) { + this.kafkaTemplate = kafkaTemplate; + } + + public void readFromPartition(String topic, int partition) { + Properties consumerProps = new Properties(); + consumerProps.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); + consumerProps.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class); + consumerProps.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class); + consumerProps.put(ConsumerConfig.GROUP_ID_CONFIG, "my-consumer-group"); + + KafkaConsumer kafkaConsumer = new KafkaConsumer<>(consumerProps); + + TopicPartition topicPartition = new TopicPartition(topic, partition); + kafkaConsumer.assign(Collections.singletonList(topicPartition)); + + kafkaConsumer.seekToBeginning(Collections.singletonList(topicPartition)); + + while (true) { + ConsumerRecords records = kafkaConsumer.poll(Duration.ofMillis(100)); + + for (ConsumerRecord record : records) { + System.out.printf("Partition: %d, Offset: %d, Key: %s, Value: %s\n", + record.partition(), record.offset(), record.key(), record.value()); + } + } + } + + public static void main(String[] args) { + KafkaTemplate kafkaTemplate = new KafkaTemplate<>(new DefaultKafkaProducerFactory<>(new HashMap<>())); + + KafkaPartitionReader reader = new KafkaPartitionReader(kafkaTemplate); + reader.readFromPartition("my-topic", 0); // Specify your topic and partition number here + } +}