Kafka latest offsets sum

I recently had to build a Kafka producer to send millions of messages to multiple Kafka topics. The exact number of messages sent depends on various factors.


Once the producer had finished, I wanted to collect some metrics about it. Specifically, I wanted the sum of the latest offsets across all partitions of each topic in a given list.

Here’s a Java snippet to get the sum of the offsets given a list of topics:

/**
 * Command-line utility that queries a Kafka cluster for the latest offset of
 * every partition of the topics in {@link #TOPIC_NAMES}, logs each partition's
 * offset, and finally logs the per-topic sum of those offsets.
 *
 * <p>Connection settings are hard-coded in the constants below. When
 * {@link #USE_SASL} is {@code true}, the SCRAM credentials are read from the
 * {@code KAFKA_USERNAME} and {@code KAFKA_PASSWORD} environment variables.
 */
@Slf4j
public class KafkaTopicLatestOffsetChecker {

    private static final String BOOTSTRAP_SERVERS = "...";
    private static final Set<String> TOPIC_NAMES = Set.of("topic1", "topic2");
    private static final boolean USE_SASL = true;
    private static final String KAFKA_USERNAME = System.getenv("KAFKA_USERNAME");
    private static final String KAFKA_PASSWORD = System.getenv("KAFKA_PASSWORD");

    /**
     * Connects to the cluster, resolves the partitions of each configured topic,
     * fetches the latest offset of each partition and logs the per-topic sums.
     *
     * @param args ignored
     * @throws IllegalStateException if SASL is enabled and a credential
     *                               environment variable is not set
     * @throws RuntimeException      if the description of a topic cannot be retrieved
     * @throws Exception             if the offset lookup itself fails
     */
    public static void main(String[] args) throws Exception {
        Map<String, Object> kafkaConnectionProperties = new HashMap<>();
        // "bootstrap.servers" is a client-wide key; use the shared constant
        // rather than the producer-specific config class.
        kafkaConnectionProperties.put(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_SERVERS);

        if (USE_SASL) {
            // Fail fast: otherwise the literal text "null" would be sent as a credential.
            if (KAFKA_USERNAME == null || KAFKA_PASSWORD == null) {
                throw new IllegalStateException(
                        "KAFKA_USERNAME and KAFKA_PASSWORD environment variables must be set when SASL is enabled");
            }

            kafkaConnectionProperties.put(
                    CommonClientConfigs.SECURITY_PROTOCOL_CONFIG,
                    "SASL_SSL"
            );
            // NOTE(review): only the password is escaped; the username is inserted
            // verbatim, so a username containing a double quote would break this
            // JAAS entry. Confirm whether escapeJava matches the JAAS quoting rules.
            kafkaConnectionProperties.put(
                    SaslConfigs.SASL_JAAS_CONFIG,
                    "org.apache.kafka.common.security.scram.ScramLoginModule required username=\"" + KAFKA_USERNAME + "\" password=\"" + StringEscapeUtils.escapeJava(KAFKA_PASSWORD) + "\";"
            );
            kafkaConnectionProperties.put(
                    SaslConfigs.SASL_MECHANISM,
                    "SCRAM-SHA-512"
            );
        }

        log.info("Checking the latest offset for the following topics: {}", TOPIC_NAMES);

        Map<TopicPartition, OffsetSpec> topicPartitionOffsetSpecMap = new HashMap<>();
        Map<String, Long> latestOffsetsSum = new HashMap<>();

        try (AdminClient kafkaClient = AdminClient.create(kafkaConnectionProperties)) {
            // Discover every partition of every topic and request its latest offset.
            kafkaClient.describeTopics(TOPIC_NAMES).topicNameValues().forEach((topicName, topicDescription) -> {
                try {
                    topicDescription.get().partitions().forEach(partition ->
                            topicPartitionOffsetSpecMap.put(
                                    new TopicPartition(topicName, partition.partition()),
                                    OffsetSpec.latest()));
                } catch (InterruptedException e) {
                    // Preserve the interrupt status for callers further up the stack.
                    Thread.currentThread().interrupt();
                    throw new RuntimeException("Unable to get topic partitions for: " + topicName, e);
                } catch (Exception e) {
                    throw new RuntimeException("Unable to get topic partitions for: " + topicName, e);
                }
            });

            // Fetch the latest offset of every collected topic partition in one request.
            ListOffsetsResult result = kafkaClient.listOffsets(topicPartitionOffsetSpecMap);

            result.all().get().forEach((topicPartition, offsetAndMetadata) -> {
                // Accumulate the partition's offset into its topic's running total.
                latestOffsetsSum.merge(topicPartition.topic(), offsetAndMetadata.offset(), Long::sum);

                log.info("Topic: {}, Partition: {}, Latest Offset: {}", topicPartition.topic(), topicPartition.partition(), offsetAndMetadata.offset());
            });
        }

        log.info("Sum of Latest offsets per topic: {}", latestOffsetsSum);
    }
}
Scroll to Top
Share via
Copy link