
I have something like the code below, which works well, but I would prefer to check health without sending any message (and not only by checking the socket connection). I know Kafka has something like KafkaHealthIndicator out of the box; does anyone have experience with it or an example of using it?

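    import java.util.concurrent.ExecutionException;
    import java.util.concurrent.TimeUnit;
    import java.util.concurrent.TimeoutException;

    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;
    import org.springframework.boot.actuate.health.Health;
    import org.springframework.boot.actuate.health.HealthIndicator;
    import org.springframework.kafka.core.KafkaTemplate;

    public class KafkaHealthIndicator implements HealthIndicator {
        private final Logger log = LoggerFactory.getLogger(KafkaHealthIndicator.class);

        private final KafkaTemplate<String, String> kafka;

        public KafkaHealthIndicator(KafkaTemplate<String, String> kafka) {
            this.kafka = kafka;
        }

        @Override
        public Health health() {
            try {
                // Send a probe message and wait at most 100 ms for the broker's acknowledgement.
                kafka.send("kafka-health-indicator", "❥").get(100, TimeUnit.MILLISECONDS);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt(); // restore the interrupt flag
                return Health.down(e).build();
            } catch (ExecutionException | TimeoutException e) {
                return Health.down(e).build();
            }
            return Health.up().build();
        }
    }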
Edited by Michal Foksa; asked by icecool09
  • I have seen developers in my organisation just copy this code without any change. Whenever you send a Kafka message to a topic, you should include a processing time. This topic could also be used by multiple services in a microservice system, so it is better to send the service name as well. Something like: `kafka.send("kafka-health-indicator", "ProcessingTime : " + LocalDateTime.now(ZoneOffset.UTC) + " , Service : myService");` – akash777.sharma May 13 '22 at 06:20
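The comment's suggestion could be sketched as a small payload builder. `HealthPayload` and its `build` methods are hypothetical names, not part of Spring Kafka; the `Clock` parameter is an assumption added only so the timestamp can be pinned in tests.

```java
import java.time.Clock;
import java.time.LocalDateTime;
import java.time.ZoneOffset;

// Hypothetical helper: builds the health-probe payload suggested in the comment,
// carrying a UTC processing time and the name of the sending service.
final class HealthPayload {

    private HealthPayload() {
    }

    // Taking a Clock is an assumption made for testability.
    static String build(String serviceName, Clock clock) {
        LocalDateTime processingTime = LocalDateTime.now(clock.withZone(ZoneOffset.UTC));
        return "ProcessingTime : " + processingTime + " , Service : " + serviceName;
    }

    // Convenience overload using the system clock in UTC.
    static String build(String serviceName) {
        return build(serviceName, Clock.systemUTC());
    }
}
```

In the question's `health()` method the probe would then become `kafka.send("kafka-health-indicator", HealthPayload.build("myService")).get(100, TimeUnit.MILLISECONDS);`, so whoever reads the topic can tell which service sent each probe and when.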

2 Answers


`kafkaAdminClient.describeCluster(..)` is the point where Kafka availability is tested.

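    import org.apache.kafka.clients.admin.AdminClient;
    import org.apache.kafka.clients.admin.DescribeClusterOptions;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.boot.actuate.health.AbstractHealthIndicator;
    import org.springframework.boot.actuate.health.Health;
    import org.springframework.boot.actuate.health.HealthIndicator;
    import org.springframework.context.annotation.Bean;
    import org.springframework.context.annotation.Configuration;
    import org.springframework.kafka.core.KafkaAdmin;

    @Configuration
    public class KafkaConfig {

        @Autowired
        private KafkaAdmin kafkaAdmin;

        @Bean
        public AdminClient kafkaAdminClient() {
            return AdminClient.create(kafkaAdmin.getConfigurationProperties());
        }

        @Bean
        public HealthIndicator kafkaHealthIndicator(AdminClient kafkaAdminClient) {
            final DescribeClusterOptions options = new DescribeClusterOptions()
                .timeoutMs(1000);

            return new AbstractHealthIndicator() {
                @Override
                protected void doHealthCheck(Health.Builder builder) throws Exception {
                    // describeCluster() returns futures immediately and does not throw by itself.
                    // Waiting on one makes an unreachable cluster fail after the 1000 ms timeout;
                    // AbstractHealthIndicator turns that exception into a DOWN status.
                    kafkaAdminClient.describeCluster(options).clusterId().get();

                    builder.up();
                }
            };
        }

    }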

For a more verbose probe, add:

    // The get() calls wait for the broker's answer, so an unreachable cluster
    // makes them throw and the indicator reports DOWN.
    DescribeClusterResult clusterDesc = kafkaAdminClient.describeCluster(options);
    builder.up()
        .withDetail("clusterId", clusterDesc.clusterId().get())
        .withDetail("nodeCount", clusterDesc.nodes().get().size());
Answered by Michal Foksa
  • Please, I can't access the `options` variable in this line: `kafkaAdminClient.describeCluster(options);` – James Jul 26 '21 at 14:56
  • @James There were a few typos. Have a look now. – Michal Foksa Jul 26 '21 at 15:20
  • Thank you, it's OK now. But I'm getting the error `TimeoutException: Timed out waiting to send the call`. – James Jul 26 '21 at 15:55
  • @James I guess you are not connected to Kafka. – Michal Foksa Jul 26 '21 at 18:59
  • Foksa, I think it's an authentication problem. Do you know which authentication properties I should provide to `DescribeClusterOptions`? – James Jul 27 '21 at 13:31
  • @James There is no need to provide anything to the options. Kafka must be configured correctly in the Spring Boot context (application.yaml/properties) through the `spring.kafka.*` properties; exactly what is needed depends on your setup. Once the application connects to Kafka, the health indicator will too. – Michal Foksa Jul 27 '21 at 14:21
  • Let us [continue this discussion in chat](https://chat.stackoverflow.com/rooms/235343/discussion-between-james-and-michal-foksa). – James Jul 27 '21 at 14:40
  • I'm not sure whether this solution is version-specific, but in my case it doesn't work: if you stop Kafka after the Spring Boot service has started, the indicator keeps returning `UP`. I'm not sure why (I'm guessing it is related to the admin client always working with `KafkaFuture` objects?). So I used `kafkaAdminClient.listTopics(new ListTopicsOptions().timeoutMs(5000)).names().get(5000, MILLISECONDS)` to force-wait for the result of the `KafkaFuture`, and now it throws a `TimeoutException` whenever the Kafka service is down. – emrekgn Nov 16 '21 at 15:23
  • `describeCluster()` no longer seems to throw exceptions, so the only way to make that call fail is to retrieve the results from its futures and have that retrieval fail. – Aithusa May 13 '22 at 15:51
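Both of the last two comments come down to the same point: the admin calls return futures, and only waiting on them surfaces a failure. Since Kafka's `KafkaFuture` implements `java.util.concurrent.Future`, that bounded wait could be factored into a plain-JDK helper. `FutureProbe` below is a hypothetical class, not a Spring or Kafka API: an empty result means the call completed, anything else is the failure to report.

```java
import java.util.Optional;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

// Hypothetical helper: waits a bounded time on any Future (KafkaFuture included)
// and returns the reason it failed, if it did.
final class FutureProbe {

    private FutureProbe() {
    }

    static Optional<Throwable> failure(Future<?> future, long timeoutMs) {
        try {
            future.get(timeoutMs, TimeUnit.MILLISECONDS);
            return Optional.empty(); // the call completed: treat as UP
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // keep the interrupt visible to callers
            return Optional.of(e);
        } catch (ExecutionException e) {
            // Unwrap so the health details show the underlying cause.
            return Optional.of(e.getCause() != null ? e.getCause() : e);
        } catch (TimeoutException e) {
            return Optional.of(e); // no answer within timeoutMs
        }
    }
}
```

Inside `doHealthCheck` it might be called as `FutureProbe.failure(kafkaAdminClient.describeCluster(options).clusterId(), 1000)`, passing a present result to `builder.down(...)` and calling `builder.up()` otherwise.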

Use the AdminClient API to check the health of the cluster by describing the cluster and/or the topic(s) you'll be interacting with and, for example, verifying that those topics have the required number of in-sync replicas.
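The in-sync replica check could be sketched as the plain-JDK decision step below. The class and method names are hypothetical. The map is assumed to be filled from `AdminClient.describeTopics(...)`, taking `partition()` and `isr().size()` from each `TopicPartitionInfo` in `TopicDescription.partitions()`; `minIsr` is assumed to be whatever minimum you require, for instance the topic's `min.insync.replicas`.

```java
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

// Hypothetical helper: given the in-sync replica count of each partition of a topic,
// lists the partitions that fall below the required minimum.
final class IsrCheck {

    private IsrCheck() {
    }

    static List<Integer> partitionsBelowMinIsr(Map<Integer, Integer> isrSizeByPartition, int minIsr) {
        return isrSizeByPartition.entrySet().stream()
                .filter(entry -> entry.getValue() < minIsr)
                .map(Map.Entry::getKey)
                .sorted()
                .collect(Collectors.toList());
    }

    // Healthy only if every partition has at least minIsr in-sync replicas.
    static boolean healthy(Map<Integer, Integer> isrSizeByPartition, int minIsr) {
        return partitionsBelowMinIsr(isrSizeByPartition, minIsr).isEmpty();
    }
}
```

Returning the offending partitions rather than just a boolean lets a health indicator report them via `withDetail(...)`.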

> Kafka has something like KafkaHealthIndicator out of the box

It doesn't. Spring's Kafka integration might.

Answered by OneCricketeer