Distributed Databases: Apache Pulsar

Hará algo más de un año desde que en este mismo blog se publicará un artículo teórico en el que se desgranaban los conceptos troncales de Apache Kafka, el popular sistema de mensajería creado por LinkedIn. Por tanto, y para celebrar el aniversario, nada mejor que hacer lo propio con uno de sus principales competidores, él no tan desconocido Apache Pulsar. ¿Son tan parecidos como se cree? ¿Cuál de los es el caballo ganador? Para conocer la respuesta a la primera cuestión, toma una buena taza de café, acomódate y disfruta, pero si tu idea es encontrar el cara a cara definitivo entre ambos productos, estás en el post equivocado.

Como ya es costumbre en la casa, en el presente artículo se pretende describir la arquitectura y características de la citada base de datos, no sin antes invitaros a leer la primera y segunda parte acerca de los conceptos básicos de las bases de datos distribuidas.

Introduction

Pulsar se define oficialmente como una plataforma cloud native de mensajería distribuida. Una descripción breve, clara y precisa, que, en esencia, es muy similar a la que se utilizó para detallar Kafka en su día. Parafraseando aquel artículo, se trata de un sistema de mensajería distribuido, altamente escalable, construido con el objetivo de permitir publicar y consumir grandes volúmenes de datos con baja latencia. Ahora bien, pronto descubriréis cómo cada uno traza su propio camino para alcanzar dichos objetivos, de una forma no tan similar como cabria esperar.

Comencemos con un poco de historia. Pulsar nace allá por 2013 de la mano de Yahoo! para satisfacer una serie de requisitos tecnológicos que su negocio precisaba por aquel entonces y a los que Kafka no era capaz de dar respuesta. En concreto, el gigante de Sunnyvale necesitaba un sistema de mensajería multi-tenant, geo-replicado, altamente escalable, capaz de albergar más de 1 millón de tópicos de forma duradera y sin renunciar por ello a la baja latencia. A estas alturas os estaréis preguntando en qué apartados flaquea el archiconocido producto de LinkedIn…

Bien, el primer escollo se encuentra en su estrategia de escalado, concretamente, en cómo organiza internamente los datos. Kafka almacena los mensajes a nivel de broker, lo que implica que cada vez se añade una nueva instancia, se deben replicar y reasignar algunas de las particiones, lo que puede llevar mas o menos tiempo en función del sistema. Evidentemente, este rebalanceo puede tener un impacto mayor o menor en el rendimiento de los consumidores, en función de la estrategia de asignación empleada.

Tampoco ayuda el hecho de que modificar el número de particiones de las que se compone un tópico, pueda alterar el orden de los mensajes. Esto obliga a planificar meticulosamente el número de tópicos, particiones y replicas antes de salir a producción, al menos si se quieren evitar problemas con el escalado, pero puede ser complicado si no se disponen las volumétricas adecuadas o el tráfico recibido es sustancialmente mayor al inicialmente esperado.

A esto se le añade el hecho de que desde el propio blog de Apache recomiendan un máximo de 4,000 particiones por broker and y 200.000 por cluster desde la versión 1.1.0 liberada en noviembre del 2019, lejos del millón tópicos requeridos por Yahoo! en 2013.

Finalmente, Kafka no dispone de soporte nativo para la geo-replicación, lo que obliga a hacer uso de herramientas externas como MirrorMaker, o multi-tenancy con aislamiento total, para lo que se utilizan políticas de autorización a nivel de tópico.

Todo esto llevo a Yahoo! a construir su propio producto, que en 2016 donaría a la fundación Apache y con el que en la actualidad procesan mas de 100 mil millones de mensajes al día, a lo largo de 200 millones de tópicos. Otros gigantes como Tencent, Verizon MediaIterableNutanix, o Overstock.com, también han incorporado Pulsar a su stack tecnológico, como se puede observar en los enlaces proporcionados.

Architecture

Tal y como se mencionaba anteriormente, Pulsar es un producto construido para funcionar de forma nativa en modo multi-región, por lo tanto, se entiende por instancia, un grupo compuesto por uno más clusters de Pulsar.

Así, la principal diferencia respecto a Kafka y piedra de angular del proyecto es que emplea una arquitectura multi-capa con la que desacopla la capa encargada de servir los mensajes, de la capa de almacenamiento de los mismos, permitiendo una escalado independiente, a la par de elástico, de cada uno de ellos. Como todo pro tiene su contra, esta decisión, en teoría, también dificulta el proceso de aprovisionamiento y mantenimiento del sistema, del mismo modo que aumenta el trafico de red, si bien en entornos cloud esto último ya no supone tanto problema.

A alto nivel, Pulsar se compone de 3 componentes principales:

  • Uno o más brokers stateless encargados de gestionar los mensajes entrantes de los productores, almacenarlo en instancias de BookKeeper y enviarlos a los consumidores, en base a tópicos.
  • Un clúster de BookKeeper compuesto por uno más nodos (bookies) encargados de gestionar el almacenamiento persistente de los mensajes.
  • Un clúster de ZooKeeper encargado de gestionar las tareas de coordinación, almacenamiento de metadatos y configuración de los clusters de Pulsar.

En este punto se hace referencia a varios conceptos que aun no han sido descritos, por lo que es el momento idóneo para saltar a la siguiente sección y presentarlos como es debido.

Topics

Pulsar trabaja internamente con el concepto de tópicos, que no son mas que canales para el envío de mensajes, normalmente del mismo tipo, desde los productores a los consumidores.

Los tópicos se agrupan en namespaces, un mecanismo de agrupación lógico para administrar aquellos tópicos que puedan estar relacionados. Esto engloba políticas para la gestión del control de acceso, replicación, geo-replicación entre clusters o expiración de los mensajes, entre otros. Así, una namespace puede contener infinitos tópicos, pero un tópico solo puede pertenecer a un único namespace.

A su vez, un namespace pertenece a un único tenant, que a su vez puede tener uno o varios namespaces. Se entiende por tenant una agrupación lógica que puede representar, por ejemplo, a una unidad de negocio, producto o línea, con el que gestionar de forma unificada políticas de autenticación, autorización, schema, cuota de almacenamiento, o TTL de los mensajes, entre otros.

Ahora bien, a diferencia de Kafka, los tópicos, por defecto, no se componen de un número fijo de particiones, sino que se dividen en un conjunto de segmentos, que se crean en función de un límite de tamaño o tiempo preconfigurado, distribuidos y replicados entre los distintos Bookies.

Por tanto, los mensajes de un tópico no están vinculados a un nodo de almacenamiento en concreto, lo que permite reemplazar o escalar el número de Bookies de forma instantánea, sin perdida de servicio o rebalanceo de por medio. Además, él nodo más pequeño o lento del clúster no hace de cuello de botella al resto. Recordar que, en Kafka, una partición se almacena como un único dato continuo en el broker líder, que posteriormente es replicado en un número preconfigurado de nodos, lo que implica que el tamaño máximo de la partición esta limitado por el espacio en disco de los nodos de réplica.

Así, un tópico solo puede ser gestionado por un único broker, por lo que Pulsar también maneja el concepto de tópicos particionados, el cual internamente se implementa como N tópicos internos, donde N es el número de particiones. De esta forma, se evitan posibles cuellos de botella y se maximiza el thoughtput, si bien es necesario definir el número de particiones en su creación.

Internamente, un tópico no es mas que una secuencia ordenada e inmutable de registros que se agrega continuamente sobre un write-ahead log (WAL) distribuido gestionado por Apache BookKeeper. Ahora bien, con el objetivo de maximizar el rendimiento, cada broker dispone de una cache en la que almacenar los mensajes, de tal manera que solo leerá los datos de los Bookies cuando esta se vea desbordada. El productor no recibirá confirmación alguna hasta que el mensaje haya sido confirmado en ambos soportes, evitando así posibles perdidas de datos.

A diferencia de otros sistemas de mensajería, los mensajes en Pulsar no tienen un id explícito. En su lugar, a los registros se les asigna un identificador secuencial no consecutivo, que los identifica de forma única dentro de un tópico. Esto evita sobrecargar el cluster con índices auxiliares que mapeen los identificadores de los mensajes a las ubicaciones físicas reales, eso sí, a costa de no poder garantizar el orden de los mensajes a lo largo de múltiples particiones en los tópicos particionados.

Pulsar utiliza el término Cursor para describir el seguimiento de offsets, pero de una forma mas avanzada a la vista en Kafka, permitiendo también un ack selectivo, con en el que confirmar mensajes individuales y dejar mensajes anteriores del tópico sin confirmar.

Por defecto, un mensaje solo se retiene o persiste hasta que todos los consumidores lo hayan procesado, si bien este comportamiento puede ser modificado mediante políticas de retención basados en tamaño o tiempo de vida, entre otros, de mismo modo que en Kafka. Esto es francamente potente ya que permite a los consumidores retroceder deliberadamente a un antiguo offset y volver a reprocesar dichos mensajes, como, por ejemplo, cuando se emplea como event store.

Sí, esta es la característica clave por la se puede clasificar a Pulsar como una base de datos distribuida, y no, el hecho de acumular muchos mensajes no penaliza el rendimiento.

A modo de curiosidad, señalar que también es posible configurar tópicos sin persistencia en disco, en los que únicamente se almacenan lo mensajes temporalmente en la cache descrita previamente y se envían directamente a los consumidores. Este modelo es ideal si se busca minimizar los tiempos de respuesta, siempre y cuando el caso de uso contemple la perdida de mensajes, ya que, si un consumidor no se encuentra disponible, el broker no podrá entregar el mensaje y, por tanto, nunca podrá volver a recibirlo.

Finalmente, en lo que al formato de los mensajes se refiere, los tópicos en Pulsar no tienen un schema estricto por defecto, por lo que no es obligatorio que todos tengan el mismo numero de campos o que estos sean del mismo tipo. Ahora bien, si que proporciona de caja un schema-registry con el que definir un formato para los mensajes y almacenar un historial versionado de todos los esquemas, lo que ayuda a mantener un modelo de gobierno férreo.

Consumers and Subscriptions

Un consumidor es un proceso que se suscribe a un tópico mediante una suscripción, valga la redundancia, para poder recibir mensajes del cluster. Si, la sentencia esta correctamente redactada, ya que, a diferencia de Kafka, en el que un consumidor hace un poll sobre el broker para recuperar mensajes, en Pulsar el consumidor envía una solicitud de permiso de flujo al broker, para que este pueda enviarle mensajes sobre una cola interna que actua a modo de buffer, que por defecto puede llegar a almacenar hasta 1000 mensajes.

Así, cada vez que se invoca un “consumer.receive()“, se retira un mensaje del buffer. Una vez que la aplicación ha consumidor la mitad de los mensajes de la cola interna, el consumidor notifica al broker que puede enviarle más mensajes, con la misma solicitud descrita anteriormente. Es decir, dado que por defecto el tamaño de la cola interna es de 1000 mensajes, cuando el consumidor consuma 500 mensajes, le enviará una notificación al broker para solicitar otros 500 mensajes más.

A pesar de que el modelo de trabajo entre ambos productos sea distinto, Pulsar provee un adaptador para facilitar la migración de aquellos aplicativos Java que estuvieran haciendo uso de Kafka.

A diferencia de Kafka, en Pulsar no existen grupos de consumidores, sino que se maneja el concepto de suscripciones, que no es más que la modalidad en la que los mensajes son enviados a los consumidores, siendo posible configurar su modelo de distribución entre 4 modalidades.

Exclusive

En el modo exclusivo, el modelo de suscripción por defecto en Pulsar, solamente un consumidor puede formar parte de la suscripción. Por lo tanto, si múltiples consumidores tratan de suscribirse a un tópico con la misma suscripción, se produce un error. Evidentemente, un tópico puede tener varias suscripciones adjuntas.

Failover

En el modo failover, múltiples consumidores pueden formar parte de una suscripción, si bien uno de ellos es seleccionado como el consumidor maestro del tópico y, por tanto, el único que recibe mensajes. Si el consumidor maestro dejara de funcionar o perdiera la conexión, todos los mensajes (no confirmados y posteriores) se entregarían al siguiente consumidor activo.

Fuente Original

Shared

En el modo shared, múltiples consumidores pueden formar parte de una suscripción y los mensajes se entregan en una distribución por turnos Round-robin a los consumidores, asegurando que cada mensaje es procesado por un único consumidor. Si un consumidor dejara de funcionar o perdiera la conexión, todos los mensajes que le hubieran sido enviados, pero no hubieran sido confirmados, serian redistribuidos entre el resto de los consumidores. Sobra decir este modelo no garantiza el orden de entrega de los mensajes.

Fuente Original

Key_Shared

En el modo shared, múltiples consumidores pueden formar parte de una suscripción y los mensajes se entregan a los consumidores en base a su clave o clave de ordenación. Es decir, todos los mensajes con la misma clave o clave de ordenación son consumidos siempre por el mismo consumidor, obteniendo un comportamiento muy similar al que provee la estrategia de RangeAssignor de Kafka. Si un consumidor dejara de funcionar o perdiera la conexión, todos los mensajes que le hubieran sido enviados, pero no hubieran sido confirmados, serian redistribuidos entre el resto de los consumidores. Ese modelo sí que garantiza el orden de entrega de los mensajes para una misma clave o clave de ordenación.

Fuente Original

Partitioning

Tal y como se detallaba en la sección previa, los tópicos de Pulsar se dividen internamente en segmentos que se reparten a lo largo de los distintos bookies, garantizando el orden de los mensajes en un tópico, así como la escalabilidad de la solución. Estos segmentos se crean o rotan en función de un límite de tamaño o tiempo preconfigurado, lo que permite distribuir uniformemente el tráfico y evitar puntos calientes en el clúster.

En lo que a los tópicos particionados se refiere, los cuales internamente se implementa como N tópicos internos, donde N es el número de particiones, se debe especificar el modo de enrutamiento a nivel de productor para determinar en qué partición publicar cada mensaje.

  • RoundRobinPartition: El productor publica los mensajes en todas las particiones de forma rotatoria, siempre y cuando no se especifique una clave.
  • SinglePartition: El productor publica los mensajes en una misma partición seleccionada al azar, siempre y cuando no se especifique una clave.
  • Hash partitioning: Independientemente de la estrategia, si el mensaje enviado contiene una clave se aplica el algoritmo de hashing JavaStringHash sobre la misma, si bien también es posible utilizar un mumur3 de 32 bits, y se divide entre el numero de particiones del tópico. De esta forma se garantiza que todos los mensajes con la misma clave (no vacía), se envían a la misma partición por orden de llegada.
  • CustomPartition: El producto publica los mensajes en las particiones en base a un algoritmo personalizado.

Replication

Llega el momento de hablar de la replicación, factor que impacta de lleno en el nivel de disponibilidad y consistencia que es capaz de ofrecer el producto.

Pero antes, un pequeño repaso de 30 segundos acerca de los conceptos básicos previamente descritos. Lo primero de todo, Pulsar opta por una arquitectura multi-capa con la que desacoplar la capa encargada de servir los mensajes (brokers), de la capa de almacenamiento de estos (bookies), por lo que la gestión de la replicación recae sobre esta última. Así, los mensajes se almacenan en tópicos que se dividen internamente en segmentos, que a su vez se almacenan como ledgers entre los distintos bookies, instancias de Apache BookKeeper, que conforman el cluster encargado de gobernar la persistencia.

Se podría decir que un ledger es una aglutinación inmutable de un grupo de mensajes de un tópico concreto. Los ledger se rotan en función de un tiempo de vida o tamaño máximo, si bien no es posible eliminarlos hasta que todos los mensajes de su interior están listos para ello, lo cual depende de sí han sido consumidos o de la política de retención aplicada.

Finalmente, los ledgers se dividen en fragmentos, la unidad de trabajo mas pequeña con la que trabaja Apache BookKeeper y la que en ultima instancia se replica y distribuye entre los distintos Bookies. Zookeeper se encarga de almacenar los metadatos que relacionan los tópicos, ledgers, fragmentos y Bookies.

Así, la configuración de escritura y replicación de los ledgers y fragmentos se gestiona mediante los siguientes tres parámetros:

  • Ensemble Size (E): Determina el número de bookies disponibles para un ledger. Por defecto 2.
  • Write Quorum Size (Qw): Determina el número de bookies sobre los que Pulsar debe escribir un dato, o lo que es lo mismo, el numero de replicas. Por defecto 2.
  • Ack Quorum Size (Qa): Determina el numero de Bookies que deben confirmar la escritura antes de devolver la respuesta al productor. Por defecto 2.

Es decir, por defecto se almacenan dos copias de todos los mensajes distribuidos entre dos Bookies y no se devuelve la respuesta al productor hasta que se hayan confirmado ambas escrituras. Ahora bien, es posible sobrescribir estos parámetros por defecto mediante políticas a nivel de namespaces.

$ bin/pulsar-admin namespaces set-persistence --bookkeeper-ack-quorum 3 --bookkeeper-ensemble 5 --bookkeeper-write-quorum 3 my-tenant/my-namespace

En este caso, se almacenan tres copias de todos los mensajes, pero ahora se distribuyen entre 5 Bookies (Round-robin), de tal forma que cada uno de ellos tiene un subconjunto de 3/5 mensajes. Esto permite distribuir la carga entre mas instancias y no sobrecargar dos en concreto. Así, no se devuelve la respuesta al productor hasta que se hayan confirmado las escrituras en todas las replicas.

Geo-Replication

Antes de abordar de lleno la gestión de la consistencia, permitidme romper, por una vez, el habitual esquema preestablecido en los artículos dedicados a las bases de datos distribuidas, y comentar un aspecto tan relevante como la geo-replicación, más aun cuando se trata de características que Pulsar incorpora de caja.

Lo primero de todo, y aunque pueda quedar ligeramente fuera del ámbito del escrito, señalar que la geo-replicación cumple dos funciones principales. La primera y la mas evidente, la posibilidad de desplegar el sistema de forma activa en múltiples regiones, lo que a la postre permitir reducir el tiempo de respuesta de las peticiones al poder redirigirlas al cluster más cercano, en lugar de a un único punto centralizado. La segunda, la posibilidad de replicar los datos en un cluster secundario ubicado en una región pasiva, para que en caso de producirse un desastre que dejara KO la región activa, disponer de los datos ya replicados con los que minimizar tanto el RTO (Recovery Time Objective) como el RPO (Recovery Point Objective).

Dicho esto, la geo-replicación puede funcionar tanto de forma síncrona como de forma asíncrona, cada una con sus pros y sus contras, siendo ambos modelos compatibles por Pulsar.

En el modelo síncrono, se emplea una estrategia de escritura basada en quorum, por la que no se devuelve la respuesta al productor hasta que la mayoría de los clusters hayan confirmado la escritura. Esta modalidad es idónea cuando se quiere emplear un modelo de despliegue activo-activo con el que dar servicio de forma simultánea desde múltiples regiones, o en su defecto, cuando se requiere minimizar a casi a cero tanto el RTO como el RPO previamente descritos, como medida de disaster recovery. Por contra, el tiempo de respuesta se incrementa ostensiblemente, al tener que esperar a qué más de la mitad de clusters hayan replicado el cambio. Para que esto sea posible, Zookeeper debe estar instalado como un único cluster multi-región.

En el modelo asíncrono, se devuelve la respuesta al productor tan pronto como el registro haya sido persistido en el cluster local. Esta modalidad es idónea si se garantizar un bajo tiempo de respuesta a la par que se replican los datos en near-real-time en otras regiones, pero no garantiza un RTO y RPO cercano a cero en caso de desastre. Este modelo no requiere que Zookeeper sea instalado como un único cluster multi-región.

Fuente Original

Finalmente, señalar que la replicación se configura a nivel de namespaces, siempre y cuando haya configurado un tenant con acceso a ambos clusters, lo que a la postre permite seleccionar que tópicos deben ser geo-replicados.

Consistency

Al igual que en Kafka, por defecto tanto las peticiones de lectura como las de escritura son atendidas siempre por un Bookie designado como leader, lo que garantiza una consistencia fuerte, repeatable read en concreto, en escenarios de normalidad. Es decir, cada lectura devuelve siempre la escritura más reciente o un error.

Por contra, la disponibilidad en términos del teorema de CAP se ve afectada ya que no todos los nodos disponibles devuelven una respuesta para todas las solicitudes de lectura y escritura en un período de tiempo razonable. En definitiva, se trata de un sistema CP (Consistent and Partition Tolerant).

Ahora bien, para mejorar el rendimiento del sistema en determinados escenarios que no requieran de consistencia, Pulsar permite leer de aquellos bookies no lideres, que no tienen porque haber replicado aun el ultimo cambio, por lo que se podría decir que también puede funcionar en modo AP (Available and Partition Tolerant).

Entonces, si se establecen propiedades Ensemble Size, Write Quorum Size y Ack Quorum Size con el mismo valor y se permiten lecturas desde cualquier Bookie… ¿Lo convierte esto en un sistema que cumple con las tres propiedades del teorema de CAP? No, ya que la propiedad Availability se vería afectada al no poder dar respuesta para todas las solicitudes de lectura y escritura en un período de tiempo razonable.

Conclusiones

En conclusión, Pulsar es un excelente sistema de mensajería distribuida que aboga por una arquitectura multi-capa con la que desacopla la capa encargada de servir los mensajes, de la capa de almacenamiento de estos, permitiendo una escalado independiente, a la par de elástico, de cada uno de ellos, sin perdida de servicio o rebalanceo de por medio. Esta decisión también tiene su contra, ya que dificulta el proceso de aprovisionamiento y mantenimiento del sistema, del mismo modo que aumenta el trafico de red.

Con todo ello, solo queda esperar que en los próximos años la comunidad a su alrededor crezca y su presencia sea más habitual entre las grandes organizaciones.

Referencias

Se recomienda encarecidamente leer los siguientes artículos que han servido de base para el escrito:

  1. https://anuradhaneo.medium.com/kafka-is-not-the-best-anymore-meet-pulsar-9eb435c9fc0b
  2. https://jack-vanlightly.com/blog/2018/10/2/understanding-how-apache-pulsar-works
  3. https://pulsar.apache.org/
  4. https://stackoverflow.com/questions/55147356/topic-replication-in-apache-pulsar
  5. https://streamnative.io/en/blog/tech/2020-07-08-pulsar-vs-kafka-part-1
  6. https://streamnative.io/en/blog/tech/2021-01-14-pulsar-architecture-performance-tuning
  7. https://www.slideshare.net/ydn/october-2016-hug-pulsar-a-highly-scalable-low-latency-pubsub-messaging-system
  8. https://www.splunk.com/en_us/blog/it/cursors-in-apache-pulsar.html
  9. https://www.splunk.com/en_us/blog/it/geo-replication-in-apache-pulsar-part-1-concepts-and-features.html

Leave a Reply

Fill in your details below or click an icon to log in:

WordPress.com Logo

You are commenting using your WordPress.com account. Log Out /  Change )

Google photo

You are commenting using your Google account. Log Out /  Change )

Twitter picture

You are commenting using your Twitter account. Log Out /  Change )

Facebook photo

You are commenting using your Facebook account. Log Out /  Change )

Connecting to %s