Reintroduce support for clustered applications

Description

This is delayed until 6.1 on purpose.

See for early experimentation.

There are multiple ways to introduce clustering, but we discussed two main approaches so far:

  1. As in Search 5, clustering at the backend level: process "entity changes" locally, then send "document change" events to a message queue, to be processed by other nodes, each node having an assigned index or shard that only this node will handle.

  2. Clustering at the mapper level: directly send "entity change" events to the message queue, to be processed by other nodes, each node having an assigned entity or shard that only this node will handle.

The second approach is probably what we will end up doing, but I'm mentioning both for the sake of completeness.
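
To make the "assigned entity or shard" idea more concrete, here is a minimal sketch of how an event could be routed so that a given entity is always handled by the same node. EntityChangeEvent and ShardRouter are hypothetical names used for illustration only, not Hibernate Search APIs, and hash-based routing is an assumption: nothing above settles how shards would actually be assigned.

```java
import java.util.Objects;

/** Hypothetical event: identifies the changed entity, carries no document. */
record EntityChangeEvent(String entityType, String entityId) {}

/** Hypothetical router, for illustration only (not a Hibernate Search API). */
final class ShardRouter {
    private final int shardCount;

    ShardRouter(int shardCount) {
        if (shardCount <= 0) {
            throw new IllegalArgumentException("shardCount must be positive");
        }
        this.shardCount = shardCount;
    }

    /** The same entity type and ID always map to the same shard. */
    int shardFor(EntityChangeEvent event) {
        int hash = Objects.hash(event.entityType(), event.entityId());
        // floorMod keeps the result in [0, shardCount) even for negative hashes.
        return Math.floorMod(hash, shardCount);
    }
}
```

With a message queue that supports keyed partitions, the same effect could probably be obtained by using the entity type and ID as the message key.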

Messaging technology

A few technologies have been mentioned, but using Kafka would give us a great starting point to add Debezium integration (see ), since Debezium uses Kafka.

Approach 1: document change events

See Search 5

Previous discussion (mostly obsolete, and not very clear: it's mostly notes...):

Which users?

  • Infinispan, mainly

  • WF users, using WF clustering APIs?

  • Other users, with a JGroups or JMS implementation?

We might want to keep this to a minimum, given we’ve been having a hard time keeping up in Search 5… Personally (yrodiere) I’d rather improve support for Elasticsearch, or even add a Solr backend supporting clustered mode, than add another clustering implementation for Lucene.
Let’s keep things simple, addressing as many users as possible while keeping the effort manageable:

  • Users who don’t want clustering can use the Lucene or Elasticsearch backend, whichever suits them best.

  • Users who want clustering can use the Elasticsearch backend, or can even use Infinispan directly.

  • Integrators who want Hibernate Search with clustering (such as Infinispan) can implement their own clustering module (they will anyway).

Requirements:

  • We need to implement clustering for write operations only:

    • Initialization logic

    • Stream/non-stream worker

  • Search queries, from what I know, should still execute locally. One doubt though: I know HSQuery is currently serializable, but I don’t know why.

  • We need to be able to switch a backend from read-only mode to read-write (to handle master changes)

Implementation thoughts:

  • We’ll need a way to transform the state of an IndexManagerBuilder to and from a Serializable value, so as to be able to initialize the state of an IndexManager on the master from the state of an IndexManager on a slave

    • Note that parts of the model may not be serializable. For instance the DateTimeFormatter which is part of LocalDate field models on Elasticsearch is not serializable. We have two solutions for that:

      • Either we constrain the model so that it’s fully serializable (for instance only allow users to specify DateTimeFormatters using string patterns, and then we can simply serialize this string and rebuild the formatter remotely), and also constrain to-be-encoded field values so that they must be fully serializable (make GeoPoint extend Serializable, in particular). But then we start to lose flexibility.

      • OR we create a master model that is more low-level than the original one, and assume that part of the encoding will be done on the client side, not on the master. Then the values we will transfer to the master when indexing will be low-level representations of documents.

  • We’ll need to handle the case where multiple slaves try to initialize the same index on master. Either trigger an error, or do validation instead of initialization.

  • We’ll need a way to transform the state of a DocumentState to and from a Serializable value, so as to be able to delegate write operations to the master.

    • If we choose to only have a low-level version of the model on the master (see above), this needs to be implemented inside the backend, because part of document building can only be done on slaves with extensive knowledge of the mapping. For instance, we cannot transfer a DateTimeFormatter over the network (it’s not serializable), so date formatting must be done on the client side.

      • This could very simply be implemented by making the delegate backend provide instances of a “DocumentSerializer” class, which would be usable with its IndexFieldReferences and would just build a serializable version of the document. Then, on the master, we could pass this serialized document to the “low-level” index manager.

  • Need a way for messages sent from slaves to master to be detected as “obsolete” while we are hot-updating the schema. Also, need a way to actually create a new index and populate it in such a case.
    Some system where the user assigns a global “version number” to their application (and optionally we compute hashes for mappings/indexes) could help with both (detecting obsolete indexing requests and obsolete schemas). See

    .
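
As an illustration of the first option above (constraining the model so that it is fully serializable), here is a minimal sketch of a field model that carries only the pattern string and rebuilds the DateTimeFormatter on the receiving side. LocalDateFieldModel is a hypothetical class, not an existing Hibernate Search type, and plain Java serialization is only an assumption about how state would be transferred.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.time.LocalDate;
import java.time.format.DateTimeFormatter;

/** Hypothetical serializable field model: carries the pattern, not the formatter. */
final class LocalDateFieldModel implements Serializable {
    private static final long serialVersionUID = 1L;

    private final String pattern;
    // DateTimeFormatter is not Serializable, so it is transient and rebuilt on demand.
    private transient DateTimeFormatter formatter;

    LocalDateFieldModel(String pattern) {
        this.pattern = pattern;
    }

    String format(LocalDate value) {
        if (formatter == null) {
            formatter = DateTimeFormatter.ofPattern(pattern);
        }
        return formatter.format(value);
    }

    /** Round-trips the model through Java serialization, as a network transfer would. */
    static LocalDateFieldModel roundTrip(LocalDateFieldModel model) {
        try {
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
                out.writeObject(model);
            }
            try (ObjectInputStream in = new ObjectInputStream(
                    new ByteArrayInputStream(bytes.toByteArray()))) {
                return (LocalDateFieldModel) in.readObject();
            }
        } catch (IOException | ClassNotFoundException e) {
            throw new IllegalStateException("Serialization round trip failed", e);
        }
    }
}
```

The cost is the one already mentioned: users can only configure formatters that are expressible as a pattern string.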

The main problem with this solution is concurrency control: if nodes can send "document change" events, how do we handle two nodes sending conflicting events, where one node wants to set document X to a certain value and another node wants to set it to a different value?

We discussed a lot of solutions, none of them being really satisfying:

  • Elasticsearch + ES optimistic concurrency control (versioning)

  • What to use for version number?

  • Optimistic locking in DB, meaning “version” column in each table? Would not work with @IndexedEmbedded.
    DB sequences? Won’t work with overlapping transactions: the relative order of the two sequences will be meaningless, and the transactions may not fail because the changes happen in different embedded objects.

  • Flake IDs? Not comparable across nodes, not even numbers. Also, same problem as above with DB sequences.

  • Optimistic locking in DB, plus we force version increments on the root entity when an embedded entity is changed. Then overlapping transactions for a given document would just… fail. And would not be a problem for us anymore. But it’s a bit hard to sell to users.

  • Use sharding to make sure that changes related to a single document will only ever be processed by a single node, in the order they are received. Problem: we may not receive the events in the order the changes happened in the database.
    Maybe we could check that incoming events in a given quantum of time (the maximum expected duration of a transaction) are not related to the same document; if they are, the master should reload and reindex the entity?

  • Debezium, so that "document change" events are in the same order as the transaction log.

One problem common to all these solutions is that two concurrent transactions might impact the same document, because of @IndexedEmbedded... So when we get two "document change" events, the acceptable behavior might not be to "pick a winner", but just to "merge the events", which may be impossible to do at that point (lacking the information about entity changes).
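
The "quantum of time" idea from the sharding option above could look roughly like the following sketch. ConflictWindow and its Action enum are hypothetical names, the timestamps are assumed to be assigned by the receiving node, and the map of last-seen timestamps is never pruned here, which a real implementation would need to address.

```java
import java.time.Duration;
import java.time.Instant;
import java.util.HashMap;
import java.util.Map;

/** Hypothetical helper for illustration only (not a Hibernate Search API). */
final class ConflictWindow {
    enum Action { APPLY_EVENT, RELOAD_AND_REINDEX }

    // Assumed to be the maximum expected duration of a transaction.
    private final Duration window;
    // Last time an event was received for each document; never pruned in this sketch.
    private final Map<String, Instant> lastSeen = new HashMap<>();

    ConflictWindow(Duration maxTransactionDuration) {
        this.window = maxTransactionDuration;
    }

    /**
     * Decides what to do with an incoming "document change" event.
     * Two events for the same document closer together than the window
     * may come from overlapping transactions, so their order is not trusted.
     */
    Action onEvent(String documentId, Instant receivedAt) {
        Instant previous = lastSeen.put(documentId, receivedAt);
        if (previous != null
                && Duration.between(previous, receivedAt).abs().compareTo(window) < 0) {
            return Action.RELOAD_AND_REINDEX;
        }
        return Action.APPLY_EVENT;
    }
}
```

Note that this only detects the suspicious case; the reload still requires database access on the node processing the events.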

Approach 2: entity change events

Instead of sharing indexing events, e.g. “add this document that I already generated”, share entity events, e.g. “indexed entity MyType with ID 1 should be added”.
On the other end of the queue, consume the events by loading the entities, processing them, and indexing the resulting document.
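
A minimal sketch of the consuming side, assuming the event only carries the entity type and ID. IndexedEntityChange and EntityChangeConsumer are hypothetical names; the loader function stands in for "load the entity from the database and build its document", and the in-memory map stands in for the index.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;
import java.util.function.Function;

/** Hypothetical event: "indexed entity MyType with ID 1 changed". */
record IndexedEntityChange(String entityType, String entityId) {}

/** Hypothetical consumer for illustration only (not a Hibernate Search API). */
final class EntityChangeConsumer {
    // Stands in for loading the entity and building its document.
    // An empty result means the entity no longer exists.
    private final Function<IndexedEntityChange, Optional<String>> loader;
    // Stands in for the index: key is "type#id", value is the document.
    private final Map<String, String> index = new HashMap<>();

    EntityChangeConsumer(Function<IndexedEntityChange, Optional<String>> loader) {
        this.loader = loader;
    }

    /** Reindexes from the current database state, whatever the event order was. */
    void consume(IndexedEntityChange event) {
        String key = event.entityType() + "#" + event.entityId();
        Optional<String> document = loader.apply(event);
        if (document.isPresent()) {
            index.put(key, document.get());
        } else {
            index.remove(key);
        }
    }

    Optional<String> indexed(String entityType, String entityId) {
        return Optional.ofNullable(index.get(entityType + "#" + entityId));
    }
}
```

Because the document is always rebuilt from the current state, consuming the same event twice, or two events in the wrong order, ends with the same index content.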

Compared to the “document change event” solution, this makes more of the indexing process asynchronous, potentially reducing the overhead of Hibernate Search for the user. In a web application for example, this could shave off some time from the processing of each HTTP request.

In particular, a single entity change might trigger a lot of reindexing work in these cases:

  • When the entity is embedded in lots of other indexed entities

  • When the entity is embedded in a few very heavy indexed entities, which will trigger the loading of many other entities during their reindexing.

  • When we process entities within the same transaction that made the changes, having to perform all these additional loads might end up delaying the transaction (or the post-transaction synchronization) for quite a long time. Reducing the loads to just what’s necessary to determine which entities must be reindexed might provide a big improvement in these cases.

Pros:

  • Solves the concurrency issues mentioned in approach 1 in a very simple way: if we get two "entity change" events in a row, even in the wrong order, the second one will be processed and trigger reindexing based on the current database state, and as a result it will always produce an up-to-date document taking into account both changes.

  • Reduces the latency in user applications (not indexing latency, but actual HTTP request processing latency): we don't bloat user transactions with additional database loading for indexing.

  • Reduces the risk of OOM errors in big reindexing events: we can control when to clear and flush a session, which we cannot do when we’re using the user’s Session. We could flush and clear between each indexing, or every 10 of them, … We’re free!

  • Might be improved even further by moving the processing of “contained” types to the queue, too.

  • In a “hot-update” scenario (see ), where we want to process each event twice (once with the old mapping, once with the new one), we would only have to duplicate the queue (send events to both the old and new nodes); everything else would be business as usual.
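
The point above about controlling when to flush and clear could be sketched as follows. IndexingSession is a hypothetical stand-in for a session owned by the processing node, not the actual Hibernate Session interface, and the batch size is an arbitrary parameter.

```java
import java.util.List;

/** Hypothetical stand-in for a session owned by the processing node. */
interface IndexingSession {
    void index(String entityId);

    void flushAndClear();
}

/** Hypothetical helper for illustration only (not a Hibernate Search API). */
final class BatchingIndexer {
    private final int batchSize;

    BatchingIndexer(int batchSize) {
        if (batchSize <= 0) {
            throw new IllegalArgumentException("batchSize must be positive");
        }
        this.batchSize = batchSize;
    }

    /** Indexes all entities, flushing and clearing the session every batchSize entities. */
    void indexAll(List<String> entityIds, IndexingSession session) {
        int pending = 0;
        for (String entityId : entityIds) {
            session.index(entityId);
            pending++;
            if (pending == batchSize) {
                session.flushAndClear();
                pending = 0;
            }
        }
        // Flush whatever remains from the last, incomplete batch.
        if (pending > 0) {
            session.flushAndClear();
        }
    }
}
```

This keeps the number of loaded entities held in memory bounded by the batch size, which is not possible when indexing happens inside the user’s Session.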

Cons:

  • Requires asynchronous processing to make sense: waiting for the queued event to be consumed would make no sense.

  • Requires DB access from the “processing” nodes. This might not be a big deal in most architectures, though, because users are likely to use their application nodes as processing nodes, and to not have dedicated “processing only” nodes.

  • Probably increases database usage, as we cannot benefit from the first-level session cache anymore.

Environment

None

Assignee

Unassigned

Reporter

Yoann Rodière

Labels

None

Suitable for new contributors

None

Pull Request

None

Feedback Requested

None

Fix versions

Priority

Major