
Una de las características mas interesantes de Kubernetes es la posibilidad de determinar si una aplicación está o no operativa para servir operaciones, a través de los health checks. La mayoría de los frameworks proporcionan ya de caja, una serie funcionalidades, para comprobar no solo el estado de salud de la propia aplicación, sino también el de los sistemas con lo que interactúa, como, por ejemplo, bases de datos, sistemas de mensajería o caches.
En el caso de las aplicaciones que hacen uso de Apache Kafka, lograr una implementación fiable y eficiente que valide su correcto funcionamiento no es tarea baladí. Es por ello que en el presente articulo se pretende analizar algunas de las opciones más populares del mercado, así como dar algunas recomendaciones para desarrollar vuestra propia solución a medida.
Una ultima recomendación antes de emprender esta nueva aventura: en caso de no disponer de unos conocimientos básicos acerca de Kafka, echad un ojo al post escrito al respecto en este mismo blog.
Health Check
Antes de entrar lleno en los detalles de implementación y con el objetivo de tener todo el contexto, se realizará una breve introducción al concepto de los health checks en Kubernetes.
Los health checks, como su propio nombre indican, son un conjunto de mecanismos que Kubernetes proporciona para verificar que la aplicación, o para ser mas exactos, el contenedor que alberga la aplicación está sano y listo para atender peticiones.
Estos mecanismos se dividen en dos tipos de pruebas que el Kubelet realiza de forma periódica, sobre todos los contenedores desplegados en la plataforma.
Readiness probe
Prueba para determinar si un contenedor está preparado para recibir tráfico. En caso de que no lo estuviera, se elimina del balanceador de Kubernetes para dejar de recibir peticiones hasta que se encuentre operativo de nuevo.
Normalmente se establece una configuración mas cortoplacista que la del Liveness probe, con el objetivo de permitir que una aplicación se recupere por sí misma en caso de que estuviera saturada.
readinessProbe:
failureThreshold: 10
httpGet:
path: /health
port: 8080
scheme: HTTP
initialDelaySeconds: 30
periodSeconds: 5
successThreshold: 1
timeoutSeconds: 5
En la siguiente imagen se observa una representación de como funciona el Readiness probe.
Liveness probe
Prueba para determinar si un contenedor esta vivo. En caso de que no lo estuviera, se le aplica un reinicio para tratar de corregir posibles errores relacionados con un deadlock en la aplicación, que no le permitieran operar con normalidad.
Tal y como se describía anteriormente, se tiende a establecer una configuración mas largoplacista que la del Readyness probe, con la idea de que el contenedor deje de recibir peticiones y pueda recuperarse por si solo, antes de forzar un reinicio.
livenessProbe:
failureThreshold: 3
httpGet:
path: /health
port: 8080
scheme: HTTP
initialDelaySeconds: 120
periodSeconds: 60
successThreshold: 1
timeoutSeconds: 10
En la siguiente imagen se observa una representación de cómo funciona el Liveness probe.
Finalmente, comentar que, para poder llevar a cabo estas validaciones, las aplicaciones deben exponer de alguna de las siguientes vías su estado de salud:
- HTTP: Kubernetes realiza una petición get al path y puerto especificado del contenedor, esperando un 200 como respuesta. Es la opción utilizada en la mayoría de los casos.
- TCP Socket: Kubernetes establece una conexión TCP contra el puerto especificado del contenedor.
- Exec: Kubernetes ejecuta el comando especificado en el contenedor, esperando un exit code 0 como respuesta.
Kafka
Entrando ya en materia y tal y como se detallaba en la introducción, los frameworks mas populares ofrecen ya de caja mecanismos para exponer un endpoint HTTP con el estado de la salud de la aplicación, en el que no solo se valida su situación, sino también el de los sistemas con lo que interactúa, como, por ejemplo, bases de datos, sistemas de mensajería o caches.
La relación es sencilla, si la aplicación requiere de forma indispensable de alguno de estos sistemas para funcionar, no tiene sentido que siga recibiendo peticiones que no podrá atender con éxito.
Dicho esto, es hora de analizar algunas de las soluciones mas populares de la actualidad.
Spring Actuator
Aunque a día de hoy el modulo para Kafka de Spring Actuator no se encuentre disponible, esta fue la implementación utilizada en su momento:
try (AdminClient adminClient = AdminClient.create(this.kafkaAdmin.getConfig())) {
DescribeClusterResult result = adminClient.describeCluster(
this.describeOptions);
String brokerId = result.controller().get().idString();
int replicationFactor = getReplicationFactor(brokerId, adminClient);
int nodes = result.nodes().get().size();
Status status = nodes >= replicationFactor ? Status.UP : Status.DOWN;
builder.status(status)
.withDetail("clusterId", result.clusterId().get())
.withDetail("brokerId", brokerId)
.withDetail("nodes", nodes);
}
Es, sin lugar a duda, una de las implementaciones más básicas que se pueden encontrar, en la que se pregunta a Kafka por el estado de cluster, con el que validar la conectividad entre ambos sistemas. Finalmente, se verifica que el numero de nodos es mayor al numero de replicas por partición, ya que no tendría sentido ubicar dos replicas de una misma partición en el mismo broker.
Esta solución es francamente pobre ya que no se está validando que el canal realmente funcione. Es decir, no hay un intercambio de mensajes entre ambos sistemas y tampoco ningún tipo de métrica que lo corrobore. Por otro lado, el hecho de validar si hay más brokers que replicas tampoco es una métrica acertada, pues cada tópico puede tener un factor de replicación distinto.
Micronaut
La implementación utilizada por micronaut es la siguiente:
return controller.switchMap(node -> {
String brokerId = node.idString();
ConfigResource configResource = new ConfigResource(ConfigResource.Type.BROKER, brokerId);
DescribeConfigsResult configResult = adminClient.describeConfigs(Collections.singletonList(configResource));
Flowable<Map<ConfigResource, Config>> configs = Flowable.fromFuture(configResult.all());
return configs.switchMap(resources -> {
Config config = resources.get(configResource);
ConfigEntry ce = config.get(REPLICATION_PROPERTY);
int replicationFactor = Integer.parseInt(ce.value());
return nodes.switchMap(nodeList -> clusterId.map(clusterIdString -> {
int nodeCount = nodeList.size();
HealthResult.Builder builder;
if (nodeCount >= replicationFactor) {
builder = HealthResult.builder(ID, HealthStatus.UP);
} else {
builder = HealthResult.builder(ID, HealthStatus.DOWN);
}
return builder
.details(CollectionUtils.mapOf(
"brokerId", brokerId,
"clusterId", clusterIdString,
"nodes", nodeCount
)).build();
}));
});
}).onErrorReturn(throwable ->
HealthResult.builder(ID, HealthStatus.DOWN)
.exception(throwable).build()
);
Micronaut sigue la senda de Spring y se decanta por el mismo modelo de solución, que, si bien puede servir para cumplir el expediente, no deja de ser una implementación francamente pobre. Es decir, únicamente se valida la conectividad entre ambos sistemas, por lo que sí el envío o recepción de mensajes dejara de funcionar por un problema no relacionado con la conectividad, no seria capaz de detectarlo.
Spring Cloud Stream Binder Kafka
La implementación utilizada por spring-cloud-stream-binder-kafka es la siguiente:
Set<String> downMessages = new HashSet<>();
final Map<String, KafkaMessageChannelBinder.TopicInformation> topicsInUse = KafkaBinderHealthIndicator.this.binder
.getTopicsInUse();
if (topicsInUse.isEmpty()) {
try {
this.metadataConsumer.listTopics(Duration.ofSeconds(this.timeout));
}
catch (Exception e) {
return Health.down().withDetail("No topic information available",
"Kafka broker is not reachable").build();
}
return Health.unknown().withDetail("No bindings found",
"Kafka binder may not be bound to destinations on the broker").build();
}
else {
for (String topic : topicsInUse.keySet()) {
KafkaMessageChannelBinder.TopicInformation topicInformation = topicsInUse
.get(topic);
if (!topicInformation.isTopicPattern()) {
List<PartitionInfo> partitionInfos = this.metadataConsumer
.partitionsFor(topic);
for (PartitionInfo partitionInfo : partitionInfos) {
if (topicInformation.getPartitionInfos()
.contains(partitionInfo)
&& partitionInfo.leader().id() == -1) {
downMessages.add(partitionInfo.toString());
}
}
}
}
}
if (downMessages.isEmpty()) {
return Health.up().build();
}
else {
return Health.down()
.withDetail("Following partitions in use have no leaders: ",
downMessages.toString())
.build();
}
En este modelo de solución se crea un nuevo consumidor para trata de confirmar que los tópicos utilizados por la aplicación existen en Kafka. Gracias a ello se logra verificar tanto la conectividad entre ambos sistemas, así como que la aplicación dispone de todos los tópicos necesarios para funcionar. Finalmente, se verifica que las particiones a consumir tienen al menos una replica líder, de lo contrario no podría atender las peticiones ni del productor ni del consumidor.
El mayor hándicap de esta implementación es que, de nuevo, y a pesar de que se comprueba que los tópicos requeridos por la aplicación existen, no sé esta validando que el canal realmente funciona. Por otro lado, el hecho de comprobar si cada partición tiene una replica líder tampoco es garantía de nada, siendo esto responsabilidad de un sistema de monitorización específico de infraestructura.
AspNetCore.Diagnostics.HealthChecks
La implementación de los health check de Kafka mas utilizada para ASP.NET Core es la siguiente:
try
{
if (_producer == null)
{
_producer = new ProducerBuilder<string, string>(_configuration).Build();
}
var message = new Message<string, string>()
{
Key = "healthcheck-key",
Value = $"Check Kafka healthy on {DateTime.UtcNow}"
};
var result = await _producer.ProduceAsync(_topic, message);
if (result.Status == PersistenceStatus.NotPersisted)
{
return new HealthCheckResult(context.Registration.FailureStatus, description: $"Message is not persisted or a failure is raised on health check for kafka.");
}
return HealthCheckResult.Healthy();
}
catch (Exception ex)
{
return new HealthCheckResult(context.Registration.FailureStatus, exception: ex);
}
}
Esta aproximación toma el camino opuesto al de spring-cloud-stream-binder-kafka y opta por enviar un mensaje a un tópico de healtch check para verificar tanto la conectividad entre ambos sistemas, así como el correcto funcionamiento del canal.
Si bien se trata de una implementación mas certera que las anteriormente detalladas, se echa en falta una validación adicional para la parte consumidora, ya que se conforma con que el mensaje sea almacenado en Kafka.
A estas alturas, acertadamente habréis deducido que una buena implementación debe, al menos, verificar el correcto funcionamiento del canal, tanto para la parte productora como para la consumidora. De nada sirve ofrecer métricas adicionales si no son capaces de garantizar esta premisa básica.
Es por lo que, a continuación, se presentan dos alternativas custom con las que tratar de lograr una implementación fiable y eficiente que valide todo lo anteriormente descrito.
Messaging based health check
La primera propuesta es una evolución de la vista para AspNetCore.
// Declare return variable
Health health = null;
try {
// Create Kafka clients
AdminClient adminClient = kafkaManager.getAdminClient();
KafkaConsumer<byte[], byte[]> consumer = kafkaManager.getConsumer();
KafkaProducer<byte[], byte[]> producer = kafkaManager.getProducer();
// Get cluster information
DescribeClusterResult cluster = adminClient.describeCluster(new DescribeClusterOptions().timeoutMs(2000));
// Extract variables
String clusterId = cluster.clusterId().get();
String brokerId = cluster.controller().get().idString();
int nodes = cluster.nodes().get().size();
// Verifies that the channel works by sending a message to Kafka and reading it
verifyChannel(producer, consumer);
// Set health status as UP
health = Health.up().withDetail("clusterId", clusterId).withDetail("brokerId", brokerId)
.withDetail("nodes", nodes).build();
} catch (Throwable ex) {
// If any error occurs, set health status as DOWN
return Health.down(ex).build();
}
// Return result
return health;
En este modelo solución, se comienza por comprobar la conectividad entre ambos sistemas preguntado por el estado del cluster, información que posteriormente se utiliza para devolver en la respuesta. Finalmente, el productor envía un mensaje a un tópico HealthCheck, que él consumidor debe leer en un determinado espacio temporal para garantizar el correcto funcionamiento del canal.
Como el objetivo de realizar esta implementación lo mas eficiente posible, el admin client, el consumidor y el productor son singletons. Además, para minimizar el tiempo del primer poll se opta por asignar directamente todas las particiones del tópico al consumidor, en lugar de crear un grupo especifico para él.
private KafkaConsumer<byte[], byte[]> createConsumer() {
// Set Kafka consumer configuration properties
Map<String, Object> props = createMap(entry(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers),
entry(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, IKafkaConstants.OFFSET_RESET_LATEST),
entry(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class),
entry(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class));
// Declare Kafka consumer
KafkaConsumer<byte[], byte[]> kafkaConsumer = null;
try {
// Create Kafka consumer
kafkaConsumer = new KafkaConsumer<>(props);
// Determine which partitions to subscribe to, for now do all
final List<PartitionInfo> partitionInfos = kafkaConsumer.partitionsFor(IKafkaConstants.TOPIC_NAME);
// Check if topic exist
if (partitionInfos != null) {
// Pull out partitions, convert to topic partitions
final Collection<TopicPartition> topicPartitions = new ArrayList<>();
for (final PartitionInfo partitionInfo : partitionInfos) {
topicPartitions.add(new TopicPartition(partitionInfo.topic(), partitionInfo.partition()));
}
// Asign topic partitions to consumer
kafkaConsumer.assign(topicPartitions);
// Poll to initialize offset
kafkaConsumer.poll(Duration.ofMillis(1000));
} else {
LOGGER.info("{} topic does not exist and health check tests will fail", IKafkaConstants.TOPIC_NAME);
}
} catch (Exception e) {
LOGGER.info("Connection to Kafka could not be established");
}
// Return Kafka consumer
return kafkaConsumer;
}
En lo que al método de envío y recepción de los mensajes se refiere, no tiene mayor complicación. Se establece un UUID como clave de mensaje que el consumidor posteriormente utiliza para discriminar los eventos dirigidos a él.
private void verifyChannel(KafkaProducer<byte[], byte[]> producer, KafkaConsumer<byte[], byte[]> consumer) {
// Generate message key
String msgKey = UUID.randomUUID().toString();
// Create message
ProducerRecord<byte[], byte[]> record = new ProducerRecord<byte[], byte[]>(IKafkaConstants.TOPIC_NAME,
msgKey.getBytes(), IKafkaConstants.MSG.getBytes());
// Send message without waiting to fill the buffer
producer.send(record);
producer.flush();
// Message readed flag
boolean msgRecived = false;
// Attemps flag
int attempts = 0;
while (true) {
// Increase attempts
attempts++;
// Get messages or wait the passed timeout if there is none
final ConsumerRecords<byte[], byte[]> consumerRecords = consumer.poll(Duration.ofMillis(200));
// Iterate messages
Iterator<ConsumerRecord<byte[], byte[]>> consumerRecordIterator = consumerRecords.iterator();
while (consumerRecordIterator.hasNext()) {
// Get message
ConsumerRecord<byte[], byte[]> msg = consumerRecordIterator.next();
// Search for the message sent by the key
if (new String(msg.key(), StandardCharsets.UTF_8).equals(msgKey)) {
// Update message readed flag
msgRecived = true;
// Break loop
break;
}
}
// If no message found count is reached to threshold exit loop
if (msgRecived || attempts == IKafkaConstants.MAX_NO_MESSAGE_FOUND_COUNT)
break;
else
continue;
}
// If message not recived, throw error to set health status as DOWN
if (!msgRecived) {
throw new RuntimeException("The channel can not be verified");
}
}
Dado que el consumidor no pertenece a un grupo de consumidores y por tanto, no se almacena el offset de los mensajes consumidos en Kafka, se recomienda encarecidamente establecer un TTL bajo para el tópico en cuestión, de tal forma que cada vez el aplicativo arranque reprocese el menor numero de mensajes posibles. En caso de que el número de instancias o aplicaciones fuera elevado, se recomienda crear multiples topics de health check, agrupados por el criterio que mejor ajuste a cada caso de uso, si bien una organización basada en dominios funcionalidades podría ser un buen punto de comienzo.
Dicho esto, esta solución presenta el handicap de que no se esta trabajando con los productores y consumidores que realmente se utilizan en la aplicación, por lo que podría darse el caso de que alguno de ellos fallara, pero el mecanismo de health check no fuera capaz de detectarlo.
Offset based health check
Esta segunda propuesta se desmarca completamente de todas las descritas anteriormente y opta por analizar el estado de los consumidores reales de la aplicación.
// Check if consumers continues consuming messages
try {
Map<TopicPartition, Long> consumersOffsets consumersOffsets = getConsumersOffsets();
for (Entry<TopicPartition, Long> entry : consumersOffsets.entrySet()) {
TopicPartition key = entry.getKey();
Long actualConsumerOffset = entry.getValue();
Long lastConsumerOffset = lastConsumerOffsets.get(key);
if (actualConsumerOffset.equals(lastConsumerOffset)) {
Long lastTopicOffset = lastTopicOffsets.get(key);
if (lastConsumerOffset < lastTopicOffset) {
throw new HealthException("The consumer is not consuming the new messages", details);
}
}
}
} finally {
// Update last offsets values for next execution
lastConsumerOffsets = consumersOffsets;
lastTopicOffsets = topicsOffsets;
}
En este modelo de solución se comprueba si los consumidores utilizados en la aplicación realmente están funcionando, en base al offset. Es decir, cada vez que se realiza un health check, se compara el si el offset del consumidor ha avanzado respecto a la anterior prueba y si no fuera así, se valida si debería haberlo en base al offset del tópico. Si el offset del tópico sigue creciendo, pero el del consumidor se encuentra estancando, es un claro indicativo de que algo va no va bien.
Esta implementación es la más eficiente de todas, ya que tiene la ventaja de que no requiere de un intercambio de mensajes, al mismo tiempo que valida que el consumidor real de la aplicación funciona correctamente. Ahora bien, si el aplicativo hiciera uso de un productor no se validaría.
Conclusiones
En conclusión, en el mercado se pueden encontrar distintas soluciones para validar el estado de salud de aquellas aplicaciones desplegadas en Kubernetes que hagan uso de Apache Kafka.
Todas tienen sus puntos positivos y negativos, pero si se requiere de una implementación fiable es muy posible que tengas que realizar un desarrollo custom que cumpla con los requisitos descritos a lo largo del articulo.