Elasticsearch: Time based conditional update

Dado que en esta ocasión, el título no deja mucho espacio a la imaginación, la introducción será breve y directa.

En el presente artículo se pretende describir algo tan sencillo como él garantizar que únicamente se actualiza un documento en Elasticsearch, si su timestamp es mayor al del almacenado.

Aunque a priori pueda parecer algo sencillo, pronto descubriréis que puede complicarse más de lo imaginado.

Caso de uso

Se dispone de un cluster vanilla de Kubernetes, con un microservicio Spring Boot encargado de leer mensajes de un tópico de Kafka y proyectarlos en orden sobre un Elasticsearch.

Idempotency

Antes de nada, comentar que es absolutamente imprescindible que todas las operaciones de proyección sean idempotentes. Es decir, el resultado de procesar una solicitud varias veces debe ser exactamente el mismo que procesarla una vez, de lo contrario, no es posible garantizar la coherencia de los datos mediante este mecanismo.

Scripting

Una de las tantas características interesantes de Elasticsearch, es la posibilidad de ejecutar scripts o expresiones personalizadas antes cada operación, que permiten modificar el resultado para añadir, modificar o eliminar campos de esta.

Así, todos los scripts deben respetar el siguiente formato.

 "script": {
    "lang":   "...",  
    "source" | "id": "...", 
    "params": { ... } 
  }
  • Lang: Idioma en el que está escrito el script, por defecto painless.
  • Source: Campo que puede albergar la lógica del script o su identificador .
  • Params: Listado de parámetros de los que hace uso el script.

Dado que esta explicación puede resultar un tanto fría, nada mejor que el siguiente sencillo ejemplo, basado en la documentación oficial del producto, que ilustra su funcionamiento a las mil maravillas.

PUT products/_doc/1?refresh
{
  "cost_price": 100
}

GET products/_search
{
  "script_fields": {
    "sales_price": {
      "script": {
        "lang":   "painless",
        "source": "ctx._source['cost_price'] * params.markup",
        "params": {
          "markup": 1.2
        }
      }
    }
  }
}

Básicamente, cuando se almacena el precio de un producto en Elasticsearch, se refleja el coste real del mismo, mientras que cuando se quiere leer su valor, se le aplica un margen del 20% via script para su venta. Sencillo, ¿no?

Lo interesante de esto es que él calculo se realiza a nivel de base de datos y no a nivel de aplicación, obteniendo así un mejor rendimiento. Además, esto scripts pueden ser almacenados y cacheados en Elasticsearch, con lo que no es necesario especificar la lógica a ejecutar en cada operación, tan solo su identificador.

POST _scripts/products/calculate-sales-price
{
  "script": {
    "lang": "painless",
    "source": "ctx._source['cost_price'] * params.markup",
  }
}

GET products/_search
{
  "script_fields": {
    "sales_price": {
      "script": {
        "id": "calculate-sales-price",
        "params": {
          "markup": 1.2
        }
      }
    }
  }
}

Una vez detallado el sistema de scripting, es hora de meterse en harina.

Single source order guarantee

En este primer caso de uso se leen datos desde un único tópico de Kafka, tanto para insertar como para actualizar su valor, sin que estos hayan sido previamente leídos de Elasticsearch.

Es decir, siempre se ejecuta una operación de tipo upsert, mientras que el script se encarga de verificar que el timestamp del documento a almacenar es superior al almacenado. De lo contrario, simplemente lo ignora.

Para ello, lo primero es almacenar el aclamado script en Elasticsearch.

POST _scripts/projection_order_guarantee
{
  "script": {
    "lang": "painless",
    "source": "if (ctx._source['timestamp'] != null && ctx._source['timestamp'] >= params['timestamp'] ) { ctx.op = 'noop'} else {for (k in params.keySet()){if (!k.equals('ctx')){ctx._source.put(k, params.get(k))}}}"
  }
}

Ahora bien, el script requiere que la operativa cumpla con los siguientes dos requisitos:

  • La operación debe realizarse vía parámetros, por lo que es necesario convertir el documento a insertar en un mapa.
  • El mapa de parámetros debe contener una entrada timestamp.

En el siguiente fragmento de código se muestra un ejemplo de ello.

// Define params from document
Map<String, Object> params = objectMapper.convertValue(document, Map.class);

// Define script
Script script = new Script(ScriptType.STORED, null, "projection_order_guarantee", params);

// Define upsert request
UpdateRequest updateRequest = new UpdateRequest(index, documentId).scriptedUpsert(true).script(script).upsert();

// Execute
restHighLevelClient.update(updateRequest, RequestOptions.DEFAULT);

Multiple source order guarantee

En este segundo caso de uso se leen datos desde múltiples tópicos de Kafka, tanto para insertar como para actualizar su valor, sin que estos hayan sido previamente leídos de Elasticsearch.

Al igual que antes, también se ejecuta una operación de tipo upsert, mientras que el script se encarga de verificar que el timestamp del documento a almacenar es superior al almacenado. De lo contrario, simplemente lo ignora.

La principal diferencia reside en que, al haber múltiples fuentes de datos, no se puede hacer uso de un único timestamp para ejercer las validaciones, sino uno por origen o tópico de Kafka. Esto es debido a que se debe garantizar el orden únicamente para cada origen de datos, de lo contrario, se estarían descartando datos validos.

Una vez aclarado, este es el script a almacenar en Elasticsearch.

POST _scripts/projection_order_guarantee
{
  "script": {
    "lang": "painless",
    "source": "if (ctx._source[params.last_updated_source_name] != null && ctx._source[params.last_updated_source_name] >= params[params.last_updated_source_name] ) { ctx.op = 'noop'} else {for (k in params.keySet()){if (!k.equals('ctx')){ctx._source.put(k, params.get(k))}}}"
  }
}

De nuevo, el script requiere que la operativa cumpla con los siguientes requisitos:

  • La operación debe realizarse vía parámetros, por lo que es necesario convertir el documento a insertar en un mapa.
  • El mapa de parámetros debe contener una entrada timestamp especifica para el origen del dato.
  • El mapa de parámetros debe contener un campo con el nombre del campo que contiene el timestamp, para que el script posteriormente sepa contra qué valor realizar la validación.

En el siguiente fragmento de código se muestra un ejemplo de ello.

// Define params
Map<String, Object> params = objectMapper.convertValue(document, Map.class);

// Add last update field for the source
params.put("last_updated_source_name", source);
params.put(source, timestamp);

// Define script
Script script = new Script(ScriptType.STORED, null, "projection_order_guarantee", params);

// Define upsert request
UpdateRequest updateRequest = new UpdateRequest(index, documentId).scriptedUpsert(true).script(script).upsert();

// Execute
restHighLevelClient.update(updateRequest, RequestOptions.DEFAULT);

Multiple source order guarantee with previous reading

En este último caso de uso se leen datos desde múltiples tópicos de Kafka, tanto para insertar como para actualizar su valor, habiendo sido previamente leídos de Elasticsearch. Es decir, la operación debe seguir siendo idempotente, pero determinadas casuísticas requieren de una consulta previa para su operativa interna. Por ejemplo, un evento que solo se proyecta si el saldo del usuario es mayor a 10.000$.

En esta ocasión, la principal diferencia reside en qué es necesario facilitar el SeqNoPrimaryTerm, un número de secuencia que identifica cada operación realizada en un documento, para garantizar que el dato leído no ha sido modificado durante el transcurso de la operación.

Asi, el script a almacenar en Elasticsearch es el mismo que en el anterior caso de uso:

POST _scripts/projection_order_guarantee
{
  "script": {
    "lang": "painless",
    "source": "if (ctx._source[params.last_updated_source_name] != null && ctx._source[params.last_updated_source_name] >= params[params.last_updated_source_name] ) { ctx.op = 'noop'} else {for (k in params.keySet()){if (!k.equals('ctx')){ctx._source.put(k, params.get(k))}}}"
  }
}

Como no, el script requiere que la operativa cumpla con los siguientes requisitos:

  • La operación debe realizarse vía parámetros, por lo que es necesario convertir el documento a insertar en un mapa.
  • El mapa de parámetros debe contener una entrada timestamp especifica para el origen del dato.
  • El mapa de parámetros debe contener un campo con el nombre del campo que contiene el timestamp, para que el script posteriormente sepa contra qué valor realizar la validación.
  • La operación debe hacer uso de campo SeqNoPrimaryTerm para garantizar que el dato leído no ha sido modificado durante el transcurso de la operación.

En el siguiente fragmento de código se muestra un ejemplo completo que cubre las tres casuísticas.

@Autowired
private RestHighLevelClient restHighLevelClient;

private ObjectMapper objectMapper = new ObjectMapper();

public <T> void projection(String index, T document, String documentId, String source, String timestamp) {
	internalProjection(index, document, documentId, source, timestamp, null);
}

public <T> void projection(String index, T document, String documentId, String source, String timestamp, SeqNoPrimaryTerm seqNoPrimaryTerm) {
	internalProjection(index, document, documentId, source, timestamp, Optional.ofNullable(seqNoPrimaryTerm));
}

private <T> void internalProjection(String index, T document, String documentId, String source, String timestamp,
		Optional<SeqNoPrimaryTerm> seqNoPrimaryTerm) {
	try {

		// Set method
		Supplier<DocWriteResponse> request;
		if (seqNoPrimaryTerm == null) {
			// Upsert with script
			request = () -> {
				try {
					return restHighLevelClient.update(
							new UpdateRequest(index), documentId).scriptedUpsert(true)
									.script(getProjectionOrderGuaranteeScript(document, source, timestamp)).upsert(),
							RequestOptions.DEFAULT);
				} catch (IOException e) {
					throw new RuntimeException(e);
				}
			};
		} else if (seqNoPrimaryTerm.isEmpty()) {
			// Index
			request = () -> {
				try {
					return restHighLevelClient.index(new IndexRequest(index).create(true),
							RequestOptions.DEFAULT);
				} catch (IOException e) {
					throw new RuntimeException(e);
				}
			};
		} else {
			request = () -> {
				// Update with script
				try {
					return restHighLevelClient.update(new UpdateRequest(index, documentId)
							.script(getProjectionOrderGuaranteeScript(document, source, timestamp))
							.setIfPrimaryTerm(seqNoPrimaryTerm.get().getPrimaryTerm())
							.setIfSeqNo(seqNoPrimaryTerm.get().getSequenceNumber()), RequestOptions.DEFAULT);
				} catch (IOException e) {
					throw new RuntimeException(e);
				}
			};
		}

	} catch (Throwable e) {
		logger.error(e.getMessage(), e);
	}
}

private <T> Script getProjectionOrderGuaranteeScript(T document, String source, String timestamp)
		throws JsonProcessingException {
	// Create params for script
	Map<String, Object> params = objectMapper.convertValue(document, Map.class);

	// Add last update field for the source
	params.put("last_updated_source_name", source);
	params.put(source, timestamp);

	// Return the script
	return new Script(ScriptType.STORED, null, "projection_order_guarantee", params);
}

Básicamente, en caso de no hacer una lectura previa de los datos, no se envía el parámetro SeqNoPrimaryTerm y se ejecuta una operación upsert junto al script.

Ahora bien, en caso de enviarlo, pueden darse dos casuísticas distintas; si es nulo quiere decir que el documento no existe y por tanto debe realizarse una inserción; si no es nulo, se ejecuta una operación de actualización, sin opción a inserción, junto al script.

Finalmente, comentar que el parametro SeqNoPrimaryTerm se obtiene declarándolo como un atributo del documento.

@NoArgsConstructor
@Getter
@Setter
@Document(indexName = "customers")
public class Customer {

	@Id
	private String customerId;

	private String firstName;

	private String lastName;

	private SeqNoPrimaryTerm seqNoPrimaryTerm;

}

Conclusiones

En conclusión, para poder garantizar que únicamente se actualiza un documento en Elasticsearch si su timestamp es mayor al del almacenado, es imprescindible tener en mente todas las casuísticas comentadas a lo largo del articulo y no fijarse únicamente en el timestamp.

Leave a Reply

Fill in your details below or click an icon to log in:

WordPress.com Logo

You are commenting using your WordPress.com account. Log Out /  Change )

Google photo

You are commenting using your Google account. Log Out /  Change )

Twitter picture

You are commenting using your Twitter account. Log Out /  Change )

Facebook photo

You are commenting using your Facebook account. Log Out /  Change )

Connecting to %s