Distributed Databases: Kafka

Si el anterior artículo estaba centrado en desgranar las peculiaridades de Amazon DynamoDB, hoy le llega el turno a Apache Kafka, el que probablemente sea el sistema de mensajería por excelencia de la actualidad y depende a quien a preguntes, una de las bases de datos distribuidas mas interesantes del mercado.

Como ya es costumbre en la casa, en el presente articulo se pretende describir la arquitectura y características de la citada base de datos, no si antes invitaros a leer la primera y segunda parte acerca de los conceptos básicos de las bases de datos distribuidas.

Introduction

Apache Kafka se describe oficialmente cómo una plataforma de streaming distribuida diseñada para publicar, almacenar, procesar y consumir flujos de datos en real time. En otras palabras, se trata de un sistema de mensajería distribuid0, altamente escalable, construido con el objetivo de permitir publicar y consumir grandes volúmenes de datos con baja latencia.

Originalmente fue desarrollado por Linkedin para dar solución, precisamente, al reto de recolección y procesamiento de grandes volúmenes de logs en tiempo real al que se enfrentaba la compañía en sus inicios. Comentar que, cuando se menciona el concepto de logs, no se hace referencia a las trazas de aplicación convencionales que todos conocemos, sino a eventos producidos por las acciones de usuarios en una aplicación (logins, pageviews, clicks, likes, sharing, comments, search queries) o métricas operacionales (call stack, call latency, errors, system metrics).

Por aquel entonces, los sistemas de mensajería no eran distribuidos, es decir, la escalabilidad se limitada únicamente al eje vertical, y la garantía de entrega, ya fuera sobre una o varias colas de forma transaccional, primaba sobre el rendimiento. Para ser mas exactos, en el protocolo JMS el envío de mensajes en batch por parte de los productores no está soportado, por lo que cada mensaje requiere un viaje de ida y vuelta TCP / IP completo, y lo consumidores deben enviar un ACK después de procesar cada evento, penalizando fuertemente el rendimiento.

Esto no encajaba con los requisitos de Linkedin en el que, perder ocasionalmente algunos eventos de páginas vistas, no suponía el fin del mundo, por lo que comenzaron a trabajar en una alternativa propia.

Para su desarrollo, analizaron y tomaron prestados algunos conceptos de los agregadores de logs y sistemas de mensajería mas populares del momento, como Scribe, Data Highway o Flume, unido a una serie elecciones de diseño poco convencionales en aquel entonces. De aquella mezcla surgió Kafka, un sistema altamente escalable y eficiente, que en 2011 sería publicado como software libre y en octubre del siguiente año superaría la etapa de incubación de la fundación Apache.

Como curiosidad, en la actualidad Linkedin se sustenta sobre 100 clusters de Kafka compuestos por mas de 4000 brokers, que dan cabida a 100.000 tópicos repartidos en 7 millones de particiones, llegando a procesar la friolera de 7 trillones de mensajes al día. Definitivamente su capacidad de escalado no admite debate alguno.

Architecture

Al igual que otros tantos productos, Kafka se basa una arquitectura descentralizada peer to peer (P2P) de nodos simétricos, en el que cada uno de ellos tiene exactamente el mismo conjunto de responsabilidades que el resto. Es decir, no hay nodos maestros, lideres, ni coordinadores con funciones adicionales o dedicadas.

Este diseño de arquitectura simplifica el proceso de aprovisionamiento y mantenimiento del sistema, al mismo tiempo que garantiza un escalado lineal del rendimiento. Es decir, si con un nodo la base de datos es capaz de atender 200 peticiones, al añadir un segundo nodo llegara hasta las 400, todo ello sin procesos de actualización que afecte todos los nodos ni tiempos de caída.

Ahora bien, a diferencia de la arquitectura de DynamoDB descrita en la anterior entrega y con el objetivo de facilitar las tareas de coordinación, Kafka se despliega en conjunto con Zookeeper, un servicio de consenso de alta disponibilidad.

Para los que no conozcan Zookeeper, comentar que trabaja de forma muy similar la de un sistema de gestión de ficheros mediante una API que permite: crear un path, establecer el valor de un path, leer el valor de path, eliminar un path y enumerar los elementos secundarios de un path.

En base a estas sencillas operaciones, Kafka hace uso de Zookeeper para las siguientes labores de coordinación:

  1. Detectar cuándo se añaden o se eliminan nuevos brokers y consumidores.
  2. Lanzar un proceso de rebalanceo en los consumidores, cuando algún broker o consumidor es eliminado.
  3. Realizar un seguimiento del offset consumido de cada partición.

En este punto se hace referencia a varios conceptos que aun no han sido descritos, por lo que es el momento idóneo para saltar a la siguiente sección y presentarlos como es debido.

Topics

Kafka trabaja internamente con el concepto de tópicos, que no son mas que agrupaciones de mensajes del mismo tipo. Dada su naturaleza distribuida, los tópicos a su vez se dividen en particiones que se reparten entre los distintos nodos o brokers para permitir el escalado horizontal, una de sus principales señas de identidad.

Así, una partición es una secuencia ordenada e inmutable de registros que se agrega continuamente sobre un commit-log. Con el objetivo de maximizar el rendimiento, este commit-log solo se vuelca a disco una vez se haya alcanzado un número configurable de mensajes o haya transcurrido un cierto período de tiempo. Hasta entonces, se almacena en memoria.

A diferencia de otros sistemas de mensajería, los mensajes en Kafka no tienen un id explícito. En su lugar, a los registros de las particiones se les asigna un identificador secuencial no consecutivo llamado offset, que los identifica de forma única dentro de una partición. Esto evita sobrecargar el cluster con índices auxiliares que mapeen los identificadores de los mensajes a las ubicaciones físicas reales, eso sí, a costa de no poder garantizar el orden de los mensajes a lo largo de múltiples particiones.

Otra de las grandes diferencias respecto a la competencia es que los mensajes no son eliminados de Kafka una vez son consumidos, si no que dispone de un período de retención configurable después de lo cual se descartan para liberar espacio. Esto es francamente potente ya que permite a los consumidores retroceder deliberadamente a un antiguo offset y volver a reprocesar dichos mensajes, como por ejemplo, cuando se emplea como event store.

Si, esta es la característica clave por la que mucha gente clasifica a Kafka como una base de datos distribuida, y no, el hecho de acumular muchos mensajes no penaliza el rendimiento.

Finalmente, en lo que al formato de los mensajes se refiere, los tópicos en Kafka no tienen un schema estricto, por lo que no es obligatorio que todos tengan el mismo numero de campos o que sean del mismo tipo. Ahora bien, el uso de schema registries como el de Confluent con el que definir un formato para los mensajes y almacenar un historial versionado de todos los esquemas, ayuda a mantener un modelo de gobierno férreo.

Consumers and Consumer Groups

Esta inusual organización interna de los datos tiene también implicaciones en cómo los consumidores interactúan con los brokers a la hora de consumir los datos.

Kafka trabaja internamente con el concepto de grupos de consumidores, en el que cada grupo está compuesto por uno o más consumidores, que, valga la redundancia, consumen mensajes de uno mas tópicos. Así, a cada consumidor se le asigna, en exclusiva, un determinado numero de particiones, garantizando que todos los mensajes de una misma partición son consumidos por un único consumidor dentro de cada grupo, sin necesidad de gestionar bloqueos.

Esta distribución es lo que permite escalar de forma horizontal a aquellas aplicaciones que consuman mensajes de Kafka, limitando el numero máximo de instancias al numero de particiones existentes del tópico a consumir.

Finalmente y aunque pueda parecer evidente, aclarar que distintos grupos de consumidores pueden consumir de forma independiente y simultánea un mismo mensaje, sin necesidad de coordinación alguna para ello.

Llegados a este punto es cuando cobran sentido aquellas tareas de coordinación de las que Zookeeper es responsable:

  1. Detectar cuándo se añaden o se eliminan nuevos brokers y consumidores.
  2. Lanzar un proceso de rebalanceo en los consumidores, cuando algún broker o consumidor es eliminado.
  3. Realizar un seguimiento del offset consumido de cada partición.

Entrando en mas detalle, cada vez que un broker o consumidor arranca, almacena dicha información en Zookeeper, lo que le permite mantener un registro con todos los brokers, grupos de consumidores (con sus correspondientes offsets por partición) y consumidores que forman parte del cluster de Kafka.

Gracias a ello, si el estado de un broker o consumidor cambia, Zookeeper puede notificar a todos los consumidores de ello, para lanzar los procesos de rebalanceo correspondientes. Aunque este proceso será detallado en la siguiente sección, simplemente adelantar que consiste en reasignar las particiones sobre las que trabaja cada consumidor, en función del cambio producido.

Finalmente, el hecho de que Zookeeper realice un seguimiento del offset consumido de cada partición, permite que cada vez que un consumidor ingrese en un grupo pueda comenzar a trabajar desde el ultimo mensaje procesado.

Partitioning

Tal y como se mencionaba en la anterior sección, los tópicos de Kafka se dividen en particiones que se reparten a lo largo de los distintos nodos que forman el clusters. La lógica empleada por defecto para determinar el destino de los datos es la siguiente:

  1. Si el mensaje enviado contiene una clave, se aplica el algoritmo de hashing mumur2 de 32 bits sobre la misma y se divide entre el numero de particiones del tópico. De esta forma se garantiza que todos los mensajes con la misma clave (no vacía), se envían a la misma partición por orden de llegada.
  2. Por el contrario, si el mensaje enviado no contiene una clave, se aplica un algoritmo round robin para determinar su ubicación.

Ahora bien y a diferencia de otras bases de datos, las estrategias de particionamiento también tienen su lugar en la parte consumidora, ya que Kafka debe ser capaz de asignar dinámicamente las particiones a cada consumidor, así como de reasignarlas en caso de que alguna broker o consumidor dejara de funcionar.

Así, es necesario configurar a nivel de consumidor la estrategia a emplear entre alguna de las que Kafka provee out of the box para ello: Range, Round Robin y Sticky Assignor. Si, al igual que para los productores es posible diseñar un algoritmo personalizado y por supuesto, todos los consumidores de un mismo grupo deben tener configurada la misma estrategia si no quieren encontrarse con una InconsistentGroupProtocolException.

RangeAssignor

RangeAssignor es la estrategia por defecto y consiste en co-localizar particiones de uno o varios tópicos. Una imagen vale más que mil palabras.

Internamente primero se agrupan los consumidores en orden lexicográfico utilizando el member_id asignado por broker. Luego, se agrupan las particiones de los tópicos disponibles en orden numérico. Finalmente, se asignan las particiones de cada tópico a partir del primer consumidor.

Esto es muy útil, por ejemplo, cuando se necesita que un mismo consumidor procese mensajes de dos tópicos distintos, que tienen el mismo número de particiones, clave, así como la misma estrategia de particionamiento.

Ahora bien, en caso de que no se requiera trabajar con particiones co-localizadas, no se recomienda emplear esta estrategia, ya que puede resultar ineficiente en caso de que el número de particiones de los distintos tópicos fuera muy dispar.

Para ser mas exactos, en casos de borde en los que un único consumidor escuchara de muchas particiones, mientras que el resto lo hiciera de tan solo una o unas pocas particiones, podría acabar desembocando en que estos últimos no recibieran mensajes durante un prologando periodo, quedando en estado idle. Debido a ello, Kafka acabaría eliminando dichos consumidores del grupo, lanzando un proceso de rebalanceo.

RoundRobinAssignor

RoundRobinAssignor es una estrategia en la que las particiones se reparten entre todos los consumidores siguiendo la clásica planificación Round-Robin. De nuevo, una imagen vale más que mil palabras.

Internamente y al igual que en la estrategia RangeAssignor, primero se agrupan los consumidores y particiones en orden lexicográfico y finalmente se asignan de forma ordenada.

Así, su principal virtud es que permite repartir la carga de forma equitativa entre todos los consumidores que componen el grupo, maximizando su eficiencia.

Por contra, si alguno de los consumidores deja de funcionar, no aplica ningún tipo de algoritmo inteligente a la hora de reasignar las particiones en el proceso de rebalanceo, sino que vuelve a calcular que partición debe ser asignada a cada consumidor, haciendo uso del mismo algoritmo. Esto evidentemente afecta al rendimiento de los consumidores.

StickyAssignor

StickyAssignor es una estrategia muy similar a la RoundRobinAssignor, con la diferencia de que si alguno de los consumidores deja de funcionar, trata de minimizar el número de reasignaciones a realizar. Y si, de nuevo, una imagen vale mas que mil palabras.

Dejando de lado las estrategias de particionamiento, otro de los aspectos clave a tener en cuenta en Kafka es el número de particiones por tópico. Por desgracia, no hay un valor por defecto que se adapte a todos los casos de uso, tan solo una serie de directrices a tener en cuenta.

Lo primero de todo es recordar que el número de particiones de un topico define el máximo numero de consumidores de un grupo. Es decir, una partición solo puede ser consumida por un único consumidor dentro de un grupo de consumidores.

Por tanto, el hecho de definir un bajo número de particiones para un tópico puede suponer un cuello de botella en el throughput de los consumidores. Lo mismo ocurre con los brokers, ya que un bajo número de particiones puede hacer que la carga no se reparta de forma equitativa entre todos los brokers, sobrecargando algunos de ellos y en consecuencia, limitando su throughput.

Entonces… ¿la solución pasa por definir un elevado número de particiones por tópico? Tampoco. Por un lado, cuántas mas particiones tiene un tópico, menor es la probabilidad de que los mensajes con la misma clave se almacenen en la misma partición.

Por otro lado, cada productor gestiona un buffer por separado para cada partición del tópico, por tanto, a mayor número de particiones, más buffers debe gestionar, con el consecuente aumento del consumo de memoria y posible penalización en el throughput.

Los brokers tampoco se libran de las consecuencias de gestionar un elevado número de particiones, pero no, no se debe al hecho de tener que mantener un fichero de commit-log por partición. El problema se produce cuando uno de los brokers deja de funcionar y es necesario designar cual es la nueva partición leader entre todas las replicas existentes. A mayor numero particiones, mayor es el tiempo que lleva dicho proceso y a esto hay que añadirle el proceso de rebalanceo en los consumidores.

Sí, no se ha mencionado nada acerca de replicas lideres hasta el momento, es algo que se detalla en la sección de replicación, pero no tiene mayor importancia para la compresión de la explicación.

Entonces… ¿es posible definir un número de particiones inicial y modificarlo en función de las necesidades? A pesar de ser una solución posible, esto puede implicar que los mensajes con la misma clave que antes se ubicaban en una misma partición, puedan comenzar a almacenarse en otra. Esto se debe a que, tal y como se comentaba al inicio de la sección, si el mensaje enviado contiene una clave, se aplica el algoritmo de hashing mumur2 de 32 bits sobre la misma y se divide entre el numero de particiones del tópico. Al modificarse el numero de particiones, el resultado es distinto.

En resumen, la mejor solución pasa por conocer bien las necesidades del caso de uso y realizar pruebas de rendimiento que te ayuden a determinar el número ideal. Al respecto de esto, en el blog de confluence tienen un fantástico articulo que puede servir de ayuda.

Replication

Llega el momento de hablar de la replicación, factor que impacta de lleno en el nivel de consistencia que es capaz de ofrecer el producto.

La replicación en Kafka no es mas que el proceso de mantener un numero configurable de copias de las particiones que conforman un tópico, distribuidas a lo largo de los distintos brokers del cluster, con el propósito de garantizar la disponibilidad en el caso de que alguno de ellos dejara de funcionar.

Ahora bien, con el objetivo de garantizar la consistencia total y una “buena” disponibilidad, una de las réplicas de cada partición se designa como líder mientras que el resto son seguidores. Así, todas las operaciones, tanto de lectura como de escritura, las atiende la partición líder, mientras que el resto mantienen una copia de los datos a modo de backup.

El éxito de este sistema de replicación reside en garantizar que, si alguno de los brokers deja de funcionar, otra de las replicas seguidoras que este actualizada o al día pase a convertirse en la nueva partición líder.

Es por ello que Kafka exige que para que una partición pueda optar a ser líder debe pertenecer al grupo de in-sync replicas (ISR), un subconjunto de replicas que cumpla con las siguientes dos condiciones:

  1. El número de mensajes no replicados no supera el valor establecido en la propiedad replica.lag.max.messages.
  2. El periodo de tiempo sin solicitar la replicación de datos al lider no supera el valor establecido en la propiedad replica.lag.time.max.ms. Esto permite que si en un determinado momento llegan muchos mensajes a la partición líder, superando el valor establecido en la propiedad replica.lag.max.messages, el resto de replicas tengan un margen de tiempo para ponerse al día antes de dejar de pertenecer al grupo ISR.

Dicho esto, el líder realiza un seguimiento continuo de las replicas en ISR y si alguna de ellas muere o se queda atrás, la elimina del grupo. Y sí, para cada partición se almacena en Zookeeper tanto el líder actual y como el conjunto de las replicas en ISR.

En lo que al proceso de replicación se refiere, todo comienza con el productor buscando en Zookeeper cual es el broker en donde se ubica la partición líder, ya que es la única que pueda procesar las peticiones. Una vez que el líder recibe el mensaje, lo almacena en su commit-log personal, mientras que el resto de replicas le preguntan periódicamente sobre nuevas actualizaciones para mantenerse sincronizadas.

Cada vez que las replicas reciben un mensaje, lo escriben en su propio commit-log y envían una confirmación al lider. Finalmente, una vez que el líder recibe el ack por parte de las réplicas en ISR, el mensaje pasa al estado commited. Un ultimo apunte al respecto y es que, como era de esperar, el orden de los mensajes también se garantiza entre las distintas replicas, es decir, que reciben todos los registros en el mismo orden en que están escritos en la partición líder.

Dicho esto, Kafka permite configurar mediante la propiedad acks cuando la operación del productor se considera completada para devolverle la respuesta al cliente.

  1. Si se configura como acks=0 significa que el productor no espera ninguna respuesta por parte del broker, un fire and forget en toda regla. El mensaje podría perderse si el broker esta caído en ese momento.
  2. Si se configura como acks=1 significa que el productor recibe una respuesta por parte del broker, una vez que la replica líder ha recibido los datos y los ha almacenado en su commit-log personal. Como no se espera a que las particiones seguidoras confirmen que también han recibido el mensaje, este podría perderse si el broker que contiene la partición líder se cae en ese preciso instante.
  3. Si se configura como acks=1 significa que el productor recibe una respuesta por parte del broker, cuando todas las replicas en ISR han recibido y almacenado el mensaje, garantizando así una consistencia total. Es la opción configurada por defecto.

En estos momentos os estaréis preguntado que sucede si el broker que contiene la replica primaria deja de funcionar y no hay ninguna replica en ISR disponible. Bajo este escenario se abren dos posibilidades, muy dependientes de los requisitos del caso uso al que te enfrentas:

  1. Esperar hasta que una replica ISR vuelva a la vida y elegir esta réplica como líder, con la consecuente perdida de servicio.
  2. Escoger una replica no ISR cómo líder, con la consecuente perdida de los datos.

Consistency

Tal y como se detallaba en la anterior sección, tanto las peticiones de lectura como las de escritura son atendidas siempre por la partición líder, lo que garantiza una consistencia total en escenarios de normalidad. Es decir, cada lectura devuelve siempre la escritura más reciente o un error.

Por contra, la disponibilidad en términos del teorema de CAP se ve afectada ya que no todos los nodos disponibles devuelven una respuesta para todas las solicitudes de lectura y escritura en un período de tiempo razonable. En definitiva, se trata de un sistema CP (Consistent and Partition Tolerant).

Ahora bien, ¿que ocurre cuando un broker deja de funcionar? Si se quiere garantizar una consistencia total de los datos, será necesario configurar la propiedad acks=all para garantizar que cada mensaje ha sido almacenado en todas las replicas ISR.

De igual manera y suponiendo un cluster compuesto por 3 nodos, el mínimo indispensable en todos los sistemas distribuidos para cumplir con la regla (N/2)+1, se recomienda configurar un mínimo de 3 replicas ISR (incluyendo la partición líder) para garantizar la consistencia total incluso cuando dos de los tres nodos fallan.

Ahora bien, tal y como describen en el blog de localz-engineering, el tiempo de respuesta se ve directamente impactado en función del nivel de consistencia escogido. De nuevo, serán los requisitos de vuestro caso de uso quienes dictaminen la opción a seleccionar.

Fuente Original

Kafka as distributed database

Hasta ahora se ha descrito Kafka en términos objetivos, pero bajo un prisma de un sistema de mensajería, no como una base de datos distribuida. ¿Por qué entonces el título indica lo contrario?

Básicamente, porque al contrario de otros sistemas de mensajería, se trata de un producto distribuido y escalable, en el que los mensajes no se eliminan una vez son consumidos y pueden residir de forma indefinida sin impactar en el rendimiento.

Por tanto, un consumidor puede reprocesar todos los mensajes de un tópico y a partir de ellos regenerar el estado del negocio, ya sea en memoria o en otra base de datos. Es decir, actuar a modo de event-store, muy típico en arquitecturas CQRS.

Esto, unido a la capacidad de procesar streams en tiempo real y no solo un mensaje cada vez, lo convierte en una opción a considerar como base de datos en determinados casos de uso, cómo el event-store mencionado previamente.

Conclusiones

En conclusión, Kafka es una excelente sistema de mensajería distribuida que en determinadas situaciones también puede actuar a modo de base de datos.

Sus fortalezas son claras: escalabilidad horizontal, consistencia, bajo tiempo de respuesta, durabilidad de los mensajes y el sistema de grupos de consumidores. Por contra, requiere conocer bien el producto y configurarlo adecuadamente en función del caso de uso, si no se quiere que se vuelva en tu contra.

Referencias

Se recomienda encarecidamente leer los siguientes artículos que han servido de base para el escrito:

  1. https://blog.softwaremill.com/7-mistakes-when-using-apache-kafka-44358cd9cd6
  2. http://cloudurable.com/blog/kafka-tutorial-kafka-producer-advanced-java-examples/index.html
  3. https://www.confluent.io/blog/how-choose-number-topics-partitions-kafka-cluster/
  4. https://www.confluent.io/blog/okay-store-data-apache-kafka/
  5. https://docs.confluent.io/current/clients/producer.html
  6. https://www.confluent.io/blog/hands-free-kafka-replication-a-lesson-in-operational-simplicity/
  7. https://engineering.linkedin.com/blog/2019/apache-kafka-trillion-messages
  8. https://es.wikipedia.org/wiki/Apache_Kafka
  9. https://kafka.apache.org/documentation/
  10. https://medium.com/@_amanarora/replication-in-kafka-58b39e91b64e
  11. https://medium.com/localz-engineering/localz-meets-kafka-message-durability-and-latency-1bbee35fdf86
  12. https://medium.com/streamthoughts/understanding-kafka-partition-assignment-strategies-and-how-to-write-your-own-custom-assignor-ebeda1fc06f3
  13. https://medium.com/@anyili0928/what-i-have-learned-from-kafka-partition-assignment-strategy-799fdf15d3ab
  14. https://www.microsoft.com/en-us/research/wp-content/uploads/2017/09/Kafka.pdf
  15. https://wuciawe.github.io/kafka/2017/03/09/notes-on-kafka-replication.html

Leave a Reply

Fill in your details below or click an icon to log in:

WordPress.com Logo

You are commenting using your WordPress.com account. Log Out /  Change )

Google photo

You are commenting using your Google account. Log Out /  Change )

Twitter picture

You are commenting using your Twitter account. Log Out /  Change )

Facebook photo

You are commenting using your Facebook account. Log Out /  Change )

Connecting to %s