I recently had to build a Kafka producer to send millions of messages to multiple Kafka topics. The exact number of messages sent depends on various factors.
Afterwards, I wanted to collect some metrics about my producer: specifically, the sum of the latest offsets across all partitions of each topic in a given list.
Here’s a Java snippet to get the sum of the offsets given a list of topics:
/**
 * Command-line utility that logs the latest offset of every partition of the
 * topics in {@link #TOPIC_NAMES}, followed by the per-topic sum of those offsets.
 *
 * <p>When {@link #USE_SASL} is enabled, the SCRAM credentials are read from the
 * {@code KAFKA_USERNAME} and {@code KAFKA_PASSWORD} environment variables.
 */
@Slf4j
public class KafkaTopicLatestOffsetChecker {
    private static final String BOOTSTRAP_SERVERS = "...";
    private static final Set<String> TOPIC_NAMES = Set.of("topic1", "topic2");
    private static final boolean USE_SASL = true;
    private static final String KAFKA_USERNAME = System.getenv("KAFKA_USERNAME");
    // The original read "KAKFA_PASSWORD" (misspelled), which is inconsistent with KAFKA_USERNAME.
    private static final String KAFKA_PASSWORD = System.getenv("KAFKA_PASSWORD");

    /**
     * Connects to the cluster, looks up the partitions of each configured topic,
     * fetches the latest offset of each partition and logs both the individual
     * offsets and their sum per topic.
     *
     * @param args ignored
     * @throws Exception if the offsets cannot be retrieved from the cluster
     */
    public static void main(String[] args) throws Exception {
        Map<String, Object> kafkaConnectionProperties = buildConnectionProperties();

        log.info("Checking the latest offset for the following topics: {}", TOPIC_NAMES);

        Map<String, Long> latestOffsetsSum = new HashMap<>();
        try (AdminClient kafkaClient = AdminClient.create(kafkaConnectionProperties)) {
            Map<TopicPartition, OffsetSpec> topicPartitionOffsetSpecMap = latestOffsetSpecs(kafkaClient);

            // get the latest offset for every topic partition in a single request
            ListOffsetsResult result = kafkaClient.listOffsets(topicPartitionOffsetSpecMap);
            result.all().get().forEach((topicPartition, offsetInfo) -> {
                latestOffsetsSum.merge(topicPartition.topic(), offsetInfo.offset(), Long::sum);
                log.info("Topic: {}, Partition: {}, Latest Offset: {}",
                        topicPartition.topic(), topicPartition.partition(), offsetInfo.offset());
            });
        }

        log.info("Sum of Latest offsets per topic: {}", latestOffsetsSum);
    }

    /**
     * Builds the client configuration: the bootstrap servers plus, when
     * {@link #USE_SASL} is set, the SASL_SSL / SCRAM-SHA-512 settings.
     *
     * @return a mutable map of client configuration properties
     * @throws IllegalStateException if SASL is enabled but a credential
     *         environment variable is not set
     */
    private static Map<String, Object> buildConnectionProperties() {
        Map<String, Object> properties = new HashMap<>();
        // "bootstrap.servers" is shared by all clients; use the common constant
        // rather than the producer-specific one for an admin client.
        properties.put(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_SERVERS);

        if (USE_SASL) {
            // Fail fast instead of sending the literal string "null" as a credential.
            if (KAFKA_USERNAME == null || KAFKA_PASSWORD == null) {
                throw new IllegalStateException(
                        "KAFKA_USERNAME and KAFKA_PASSWORD environment variables must be set when SASL is enabled");
            }
            properties.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, "SASL_SSL");
            // Both values sit inside double quotes in the JAAS line, so both are escaped;
            // the original escaped only the password.
            properties.put(
                    SaslConfigs.SASL_JAAS_CONFIG,
                    "org.apache.kafka.common.security.scram.ScramLoginModule required username=\""
                            + StringEscapeUtils.escapeJava(KAFKA_USERNAME)
                            + "\" password=\""
                            + StringEscapeUtils.escapeJava(KAFKA_PASSWORD)
                            + "\";");
            properties.put(SaslConfigs.SASL_MECHANISM, "SCRAM-SHA-512");
        }
        return properties;
    }

    /**
     * Describes every topic in {@link #TOPIC_NAMES} and maps each of its
     * partitions to {@link OffsetSpec#latest()}.
     *
     * @param kafkaClient the admin client used to describe the topics
     * @return one entry per topic partition, each requesting the latest offset
     * @throws RuntimeException if the description of a topic cannot be obtained
     */
    private static Map<TopicPartition, OffsetSpec> latestOffsetSpecs(AdminClient kafkaClient) {
        Map<TopicPartition, OffsetSpec> offsetSpecs = new HashMap<>();
        kafkaClient.describeTopics(TOPIC_NAMES).topicNameValues().forEach((topicName, topicDescription) -> {
            try {
                topicDescription.get().partitions().forEach(partition ->
                        offsetSpecs.put(new TopicPartition(topicName, partition.partition()), OffsetSpec.latest()));
            } catch (InterruptedException e) {
                // Restore the interrupt flag before surfacing the failure.
                Thread.currentThread().interrupt();
                throw new RuntimeException("Unable to get topic partitions for: " + topicName, e);
            } catch (java.util.concurrent.ExecutionException e) {
                throw new RuntimeException("Unable to get topic partitions for: " + topicName, e);
            }
        });
        return offsetSpecs;
    }
}