Skip to content

Commit cfbc918

Browse files
authored
Create Test. java
Signed-off-by: rozer16 <[email protected]>
1 parent e33aa20 commit cfbc918

File tree

1 file changed

+45
-0
lines changed

1 file changed

+45
-0
lines changed

Test. java

Lines changed: 45 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,45 @@
1+
import org.apache.kafka.clients.consumer.*;
2+
import org.apache.kafka.common.*;
3+
import org.springframework.kafka.core.*;
4+
5+
import java.util.*;
6+
7+
public class KafkaPartitionReader {
8+
9+
private final KafkaTemplate<String, String> kafkaTemplate;
10+
11+
public KafkaPartitionReader(KafkaTemplate<String, String> kafkaTemplate) {
12+
this.kafkaTemplate = kafkaTemplate;
13+
}
14+
15+
public void readFromPartition(String topic, int partition) {
16+
Properties consumerProps = new Properties();
17+
consumerProps.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
18+
consumerProps.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
19+
consumerProps.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
20+
consumerProps.put(ConsumerConfig.GROUP_ID_CONFIG, "my-consumer-group");
21+
22+
KafkaConsumer<String, String> kafkaConsumer = new KafkaConsumer<>(consumerProps);
23+
24+
TopicPartition topicPartition = new TopicPartition(topic, partition);
25+
kafkaConsumer.assign(Collections.singletonList(topicPartition));
26+
27+
kafkaConsumer.seekToBeginning(Collections.singletonList(topicPartition));
28+
29+
while (true) {
30+
ConsumerRecords<String, String> records = kafkaConsumer.poll(Duration.ofMillis(100));
31+
32+
for (ConsumerRecord<String, String> record : records) {
33+
System.out.printf("Partition: %d, Offset: %d, Key: %s, Value: %s\n",
34+
record.partition(), record.offset(), record.key(), record.value());
35+
}
36+
}
37+
}
38+
39+
public static void main(String[] args) {
40+
KafkaTemplate<String, String> kafkaTemplate = new KafkaTemplate<>(new DefaultKafkaProducerFactory<>(new HashMap<>()));
41+
42+
KafkaPartitionReader reader = new KafkaPartitionReader(kafkaTemplate);
43+
reader.readFromPartition("my-topic", 0); // Specify your topic and partition number here
44+
}
45+
}

0 commit comments

Comments
 (0)