Health checking for a Kafka application

Una de las características mas interesantes de Kubernetes es la posibilidad de determinar si una aplicación está o no operativa para servir operaciones, a través de los health checks. La mayoría de los frameworks proporcionan ya de caja, una serie funcionalidades, para comprobar no solo el estado de salud de la propia aplicación, sino también el de los sistemas con lo que interactúa, como, por ejemplo, bases de datos, sistemas de mensajería o caches.

En el caso de las aplicaciones que hacen uso de Apache Kafka, lograr una implementación fiable y eficiente que valide su correcto funcionamiento no es tarea baladí. Es por ello que en el presente articulo se pretende analizar algunas de las opciones más populares del mercado, así como dar algunas recomendaciones para desarrollar vuestra propia solución a medida.

Una ultima recomendación antes de emprender esta nueva aventura: en caso de no disponer de unos conocimientos básicos acerca de Kafka, echad un ojo al post escrito al respecto en este mismo blog.

Health Check

Antes de entrar lleno en los detalles de implementación y con el objetivo de tener todo el contexto, se realizará una breve introducción al concepto de los health checks en Kubernetes.

Los health checks, como su propio nombre indican, son un conjunto de mecanismos que Kubernetes proporciona para verificar que la aplicación, o para ser mas exactos, el contenedor que alberga la aplicación está sano y listo para atender peticiones.

Estos mecanismos se dividen en dos tipos de pruebas que el Kubelet realiza de forma periódica, sobre todos los contenedores desplegados en la plataforma.

Readiness probe

Prueba para determinar si un contenedor está preparado para recibir tráfico. En caso de que no lo estuviera, se elimina del balanceador de Kubernetes para dejar de recibir peticiones hasta que se encuentre operativo de nuevo.

Normalmente se establece una configuración mas cortoplacista que la del Liveness probe, con el objetivo de permitir que una aplicación se recupere por sí misma en caso de que estuviera saturada.

        readinessProbe:
          failureThreshold: 10
          httpGet:
            path: /health
            port: 8080
            scheme: HTTP
          initialDelaySeconds: 30
          periodSeconds: 5
          successThreshold: 1
          timeoutSeconds: 5

En la siguiente imagen se observa una representación de como funciona el Readiness probe.

Liveness probe

Prueba para determinar si un contenedor esta vivo. En caso de que no lo estuviera, se le aplica un reinicio para tratar de corregir posibles errores relacionados con un deadlock en la aplicación, que no le permitieran operar con normalidad.

Tal y como se describía anteriormente, se tiende a establecer una configuración mas largoplacista que la del Readyness probe, con la idea de que el contenedor deje de recibir peticiones y pueda recuperarse por si solo, antes de forzar un reinicio.

        livenessProbe:
          failureThreshold: 3
          httpGet:
            path: /health
            port: 8080
            scheme: HTTP
          initialDelaySeconds: 120
          periodSeconds: 60
          successThreshold: 1
          timeoutSeconds: 10

En la siguiente imagen se observa una representación de cómo funciona el Liveness probe.

Finalmente, comentar que, para poder llevar a cabo estas validaciones, las aplicaciones deben exponer de alguna de las siguientes vías su estado de salud:

  • HTTP: Kubernetes realiza una petición get al path y puerto especificado del contenedor, esperando un 200 como respuesta. Es la opción utilizada en la mayoría de los casos.
  • TCP Socket: Kubernetes establece una conexión TCP contra el puerto especificado del contenedor.
  • Exec: Kubernetes ejecuta el comando especificado en el contenedor, esperando un exit code 0 como respuesta.

Kafka

Entrando ya en materia y tal y como se detallaba en la introducción, los frameworks mas populares ofrecen ya de caja mecanismos para exponer un endpoint HTTP con el estado de la salud de la aplicación, en el que no solo se valida su situación, sino también el de los sistemas con lo que interactúa, como, por ejemplo, bases de datos, sistemas de mensajería o caches.

La relación es sencilla, si la aplicación requiere de forma indispensable de alguno de estos sistemas para funcionar, no tiene sentido que siga recibiendo peticiones que no podrá atender con éxito.

Dicho esto, es hora de analizar algunas de las soluciones mas populares de la actualidad.

Spring Actuator

Aunque a día de hoy el modulo para Kafka de Spring Actuator no se encuentre disponible, esta fue la implementación utilizada en su momento:

		try (AdminClient adminClient = AdminClient.create(this.kafkaAdmin.getConfig())) {
			DescribeClusterResult result = adminClient.describeCluster(
					this.describeOptions);
			String brokerId = result.controller().get().idString();
			int replicationFactor = getReplicationFactor(brokerId, adminClient);
			int nodes = result.nodes().get().size();
			Status status = nodes >= replicationFactor ? Status.UP : Status.DOWN;
			builder.status(status)
					.withDetail("clusterId", result.clusterId().get())
					.withDetail("brokerId", brokerId)
					.withDetail("nodes", nodes);
		}

Es, sin lugar a duda, una de las implementaciones más básicas que se pueden encontrar, en la que se pregunta a Kafka por el estado de cluster, con el que validar la conectividad entre ambos sistemas. Finalmente, se verifica que el numero de nodos es mayor al numero de replicas por partición, ya que no tendría sentido ubicar dos replicas de una misma partición en el mismo broker.

Esta solución es francamente pobre ya que no se está validando que el canal realmente funcione. Es decir, no hay un intercambio de mensajes entre ambos sistemas y tampoco ningún tipo de métrica que lo corrobore. Por otro lado, el hecho de validar si hay más brokers que replicas tampoco es una métrica acertada, pues cada tópico puede tener un factor de replicación distinto.

Micronaut

La implementación utilizada por micronaut es la siguiente:

        return controller.switchMap(node -> {
            String brokerId = node.idString();
            ConfigResource configResource = new ConfigResource(ConfigResource.Type.BROKER, brokerId);
            DescribeConfigsResult configResult = adminClient.describeConfigs(Collections.singletonList(configResource));
            Flowable<Map<ConfigResource, Config>> configs = Flowable.fromFuture(configResult.all());
            return configs.switchMap(resources -> {
                Config config = resources.get(configResource);
                ConfigEntry ce = config.get(REPLICATION_PROPERTY);
                int replicationFactor = Integer.parseInt(ce.value());
                return nodes.switchMap(nodeList -> clusterId.map(clusterIdString -> {
                    int nodeCount = nodeList.size();
                    HealthResult.Builder builder;
                    if (nodeCount >= replicationFactor) {
                        builder = HealthResult.builder(ID, HealthStatus.UP);
                    } else {
                        builder = HealthResult.builder(ID, HealthStatus.DOWN);
                    }
                    return builder
                            .details(CollectionUtils.mapOf(
                                    "brokerId", brokerId,
                                    "clusterId", clusterIdString,
                                    "nodes", nodeCount
                            )).build();
                }));
            });
        }).onErrorReturn(throwable ->
                HealthResult.builder(ID, HealthStatus.DOWN)
                        .exception(throwable).build()
        );

Micronaut sigue la senda de Spring y se decanta por el mismo modelo de solución, que, si bien puede servir para cumplir el expediente, no deja de ser una implementación francamente pobre. Es decir, únicamente se valida la conectividad entre ambos sistemas, por lo que sí el envío o recepción de mensajes dejara de funcionar por un problema no relacionado con la conectividad, no seria capaz de detectarlo.

Spring Cloud Stream Binder Kafka

La implementación utilizada por spring-cloud-stream-binder-kafka es la siguiente:

				Set<String> downMessages = new HashSet<>();
				final Map<String, KafkaMessageChannelBinder.TopicInformation> topicsInUse = KafkaBinderHealthIndicator.this.binder
						.getTopicsInUse();
				if (topicsInUse.isEmpty()) {
					try {
						this.metadataConsumer.listTopics(Duration.ofSeconds(this.timeout));
					}
					catch (Exception e) {
						return Health.down().withDetail("No topic information available",
								"Kafka broker is not reachable").build();
					}
					return Health.unknown().withDetail("No bindings found",
							"Kafka binder may not be bound to destinations on the broker").build();
				}
				else {
					for (String topic : topicsInUse.keySet()) {
						KafkaMessageChannelBinder.TopicInformation topicInformation = topicsInUse
								.get(topic);
						if (!topicInformation.isTopicPattern()) {
							List<PartitionInfo> partitionInfos = this.metadataConsumer
									.partitionsFor(topic);
							for (PartitionInfo partitionInfo : partitionInfos) {
								if (topicInformation.getPartitionInfos()
										.contains(partitionInfo)
										&& partitionInfo.leader().id() == -1) {
									downMessages.add(partitionInfo.toString());
								}
							}
						}
					}
				}
				if (downMessages.isEmpty()) {
					return Health.up().build();
				}
				else {
					return Health.down()
							.withDetail("Following partitions in use have no leaders: ",
									downMessages.toString())
							.build();
				}

En este modelo de solución se crea un nuevo consumidor para trata de confirmar que los tópicos utilizados por la aplicación existen en Kafka. Gracias a ello se logra verificar tanto la conectividad entre ambos sistemas, así como que la aplicación dispone de todos los tópicos necesarios para funcionar. Finalmente, se verifica que las particiones a consumir tienen al menos una replica líder, de lo contrario no podría atender las peticiones ni del productor ni del consumidor.

El mayor hándicap de esta implementación es que, de nuevo, y a pesar de que se comprueba que los tópicos requeridos por la aplicación existen, no sé esta validando que el canal realmente funciona. Por otro lado, el hecho de comprobar si cada partición tiene una replica líder tampoco es garantía de nada, siendo esto responsabilidad de un sistema de monitorización específico de infraestructura.

AspNetCore.Diagnostics.HealthChecks

La implementación de los health check de Kafka mas utilizada para ASP.NET Core es la siguiente:

            try
            {
                if (_producer == null)
                {
                    _producer = new ProducerBuilder<string, string>(_configuration).Build();
                }

                var message = new Message<string, string>()
                {
                    Key = "healthcheck-key",
                    Value = $"Check Kafka healthy on {DateTime.UtcNow}"
                };

                var result = await _producer.ProduceAsync(_topic, message);

                if (result.Status == PersistenceStatus.NotPersisted)
                {
                    return new HealthCheckResult(context.Registration.FailureStatus, description: $"Message is not persisted or a failure is raised on health check for kafka.");
                }

                return HealthCheckResult.Healthy();
            }
            catch (Exception ex)
            {
                return new HealthCheckResult(context.Registration.FailureStatus, exception: ex);
            }
        }

Esta aproximación toma el camino opuesto al de spring-cloud-stream-binder-kafka y opta por enviar un mensaje a un tópico de healtch check para verificar tanto la conectividad entre ambos sistemas, así como el correcto funcionamiento del canal.

Si bien se trata de una implementación mas certera que las anteriormente detalladas, se echa en falta una validación adicional para la parte consumidora, ya que se conforma con que el mensaje sea almacenado en Kafka.

A estas alturas, acertadamente habréis deducido que una buena implementación debe, al menos, verificar el correcto funcionamiento del canal, tanto para la parte productora como para la consumidora. De nada sirve ofrecer métricas adicionales si no son capaces de garantizar esta premisa básica.

Es por lo que, a continuación, se presentan dos alternativas custom con las que tratar de lograr una implementación fiable y eficiente que valide todo lo anteriormente descrito.

Messaging based health check

La primera propuesta es una evolución de la vista para AspNetCore.

		// Declare return variable
		Health health = null;

		try {

			// Create Kafka clients
			AdminClient adminClient = kafkaManager.getAdminClient();
			KafkaConsumer<byte[], byte[]> consumer = kafkaManager.getConsumer();
			KafkaProducer<byte[], byte[]> producer = kafkaManager.getProducer();

			// Get cluster information
			DescribeClusterResult cluster = adminClient.describeCluster(new DescribeClusterOptions().timeoutMs(2000));

			// Extract variables
			String clusterId = cluster.clusterId().get();
			String brokerId = cluster.controller().get().idString();
			int nodes = cluster.nodes().get().size();

			// Verifies that the channel works by sending a message to Kafka and reading it
			verifyChannel(producer, consumer);

			// Set health status as UP
			health = Health.up().withDetail("clusterId", clusterId).withDetail("brokerId", brokerId)
					.withDetail("nodes", nodes).build();

		} catch (Throwable ex) {
			// If any error occurs, set health status as DOWN
			return Health.down(ex).build();
		}

		// Return result
		return health;

En este modelo solución, se comienza por comprobar la conectividad entre ambos sistemas preguntado por el estado del cluster, información que posteriormente se utiliza para devolver en la respuesta. Finalmente, el productor envía un mensaje a un tópico HealthCheck, que él consumidor debe leer en un determinado espacio temporal para garantizar el correcto funcionamiento del canal.

Como el objetivo de realizar esta implementación lo mas eficiente posible, el admin client, el consumidor y el productor son singletons. Además, para minimizar el tiempo del primer poll se opta por asignar directamente todas las particiones del tópico al consumidor, en lugar de crear un grupo especifico para él.

	private KafkaConsumer<byte[], byte[]> createConsumer() {
		// Set Kafka consumer configuration properties
		Map<String, Object> props = createMap(entry(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers),
				entry(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, IKafkaConstants.OFFSET_RESET_LATEST),
				entry(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class),
				entry(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class));

		// Declare Kafka consumer
		KafkaConsumer<byte[], byte[]> kafkaConsumer = null;

		try {
			// Create Kafka consumer
			kafkaConsumer = new KafkaConsumer<>(props);

			// Determine which partitions to subscribe to, for now do all
			final List<PartitionInfo> partitionInfos = kafkaConsumer.partitionsFor(IKafkaConstants.TOPIC_NAME);

			// Check if topic exist
			if (partitionInfos != null) {

				// Pull out partitions, convert to topic partitions
				final Collection<TopicPartition> topicPartitions = new ArrayList<>();
				for (final PartitionInfo partitionInfo : partitionInfos) {
					topicPartitions.add(new TopicPartition(partitionInfo.topic(), partitionInfo.partition()));
				}

				// Asign topic partitions to consumer
				kafkaConsumer.assign(topicPartitions);

				// Poll to initialize offset
				kafkaConsumer.poll(Duration.ofMillis(1000));

			} else {
				LOGGER.info("{} topic does not exist and health check tests will fail", IKafkaConstants.TOPIC_NAME);
			}
		} catch (Exception e) {
			LOGGER.info("Connection to Kafka could not be established");
		}

		// Return Kafka consumer
		return kafkaConsumer;
	}

En lo que al método de envío y recepción de los mensajes se refiere, no tiene mayor complicación. Se establece un UUID como clave de mensaje que el consumidor posteriormente utiliza para discriminar los eventos dirigidos a él.

	private void verifyChannel(KafkaProducer<byte[], byte[]> producer, KafkaConsumer<byte[], byte[]> consumer) {

		// Generate message key
		String msgKey = UUID.randomUUID().toString();

		// Create message
		ProducerRecord<byte[], byte[]> record = new ProducerRecord<byte[], byte[]>(IKafkaConstants.TOPIC_NAME,
				msgKey.getBytes(), IKafkaConstants.MSG.getBytes());

		// Send message without waiting to fill the buffer
		producer.send(record);
		producer.flush();

		// Message readed flag
		boolean msgRecived = false;

		// Attemps flag
		int attempts = 0;

		while (true) {

			// Increase attempts
			attempts++;

			// Get messages or wait the passed timeout if there is none
			final ConsumerRecords<byte[], byte[]> consumerRecords = consumer.poll(Duration.ofMillis(200));

			// Iterate messages
			Iterator<ConsumerRecord<byte[], byte[]>> consumerRecordIterator = consumerRecords.iterator();
			while (consumerRecordIterator.hasNext()) {

				// Get message
				ConsumerRecord<byte[], byte[]> msg = consumerRecordIterator.next();

				// Search for the message sent by the key
				if (new String(msg.key(), StandardCharsets.UTF_8).equals(msgKey)) {

					// Update message readed flag
					msgRecived = true;

					// Break loop
					break;
				}

			}

			// If no message found count is reached to threshold exit loop
			if (msgRecived || attempts == IKafkaConstants.MAX_NO_MESSAGE_FOUND_COUNT)
				break;
			else
				continue;
		}

		// If message not recived, throw error to set health status as DOWN
		if (!msgRecived) {
			throw new RuntimeException("The channel can not be verified");
		}
	}

Dado que el consumidor no pertenece a un grupo de consumidores y por tanto, no se almacena el offset de los mensajes consumidos en Kafka, se recomienda encarecidamente establecer un TTL bajo para el tópico en cuestión, de tal forma que cada vez el aplicativo arranque reprocese el menor numero de mensajes posibles. En caso de que el número de instancias o aplicaciones fuera elevado, se recomienda crear multiples topics de health check, agrupados por el criterio que mejor ajuste a cada caso de uso, si bien una organización basada en dominios funcionalidades podría ser un buen punto de comienzo.

Dicho esto, esta solución presenta el handicap de que no se esta trabajando con los productores y consumidores que realmente se utilizan en la aplicación, por lo que podría darse el caso de que alguno de ellos fallara, pero el mecanismo de health check no fuera capaz de detectarlo.

Offset based health check

Esta segunda propuesta se desmarca completamente de todas las descritas anteriormente y opta por analizar el estado de los consumidores reales de la aplicación.

		// Check if consumers continues consuming messages
		try {
			Map<TopicPartition, Long> consumersOffsets consumersOffsets = getConsumersOffsets();
			for (Entry<TopicPartition, Long> entry : consumersOffsets.entrySet()) {
				TopicPartition key = entry.getKey();
				Long actualConsumerOffset = entry.getValue();
				Long lastConsumerOffset = lastConsumerOffsets.get(key);
				if (actualConsumerOffset.equals(lastConsumerOffset)) {
					Long lastTopicOffset = lastTopicOffsets.get(key);
					if (lastConsumerOffset < lastTopicOffset) {
						throw new HealthException("The consumer is not consuming the new messages", details);
					}
				}
			}
		} finally {
			// Update last offsets values for next execution
			lastConsumerOffsets = consumersOffsets;
			lastTopicOffsets = topicsOffsets;
		}

En este modelo de solución se comprueba si los consumidores utilizados en la aplicación realmente están funcionando, en base al offset. Es decir, cada vez que se realiza un health check, se compara el si el offset del consumidor ha avanzado respecto a la anterior prueba y si no fuera así, se valida si debería haberlo en base al offset del tópico. Si el offset del tópico sigue creciendo, pero el del consumidor se encuentra estancando, es un claro indicativo de que algo va no va bien.

Esta implementación es la más eficiente de todas, ya que tiene la ventaja de que no requiere de un intercambio de mensajes, al mismo tiempo que valida que el consumidor real de la aplicación funciona correctamente. Ahora bien, si el aplicativo hiciera uso de un productor no se validaría.

Conclusiones

En conclusión, en el mercado se pueden encontrar distintas soluciones para validar el estado de salud de aquellas aplicaciones desplegadas en Kubernetes que hagan uso de Apache Kafka.

Todas tienen sus puntos positivos y negativos, pero si se requiere de una implementación fiable es muy posible que tengas que realizar un desarrollo custom que cumpla con los requisitos descritos a lo largo del articulo.

Leave a Reply

Fill in your details below or click an icon to log in:

WordPress.com Logo

You are commenting using your WordPress.com account. Log Out /  Change )

Facebook photo

You are commenting using your Facebook account. Log Out /  Change )

Connecting to %s