Since our last post, Sixfold has become part of the Transporeon family and has been rebranded to Transporeon Visibility Hub. This is our first post under the new brand, but the technology and the team behind the product and the previous posts are the same. Our old posts remain accurate, albeit with a different naming scheme.
At Transporeon Visibility Hub, we have embraced the event streaming paradigm through the use of Kafka. This lets us write scalable, performant code and start consuming new messages easily.
However, sometimes there is still a need to consume a new type of message with a long history, or an existing message that was not processed correctly due to a bug or some other unforeseen issue. In these cases, we use two different processes that we internally call backfilling and reconsuming.
What those terms exactly mean, as well as how they are put into practice, can be read below.
We’ve covered some specific use cases of Kafka more deeply in previous blog posts:
- Bringing Kafka based architecture to the next level using simple PostgreSQL tables
- Reducing database queries to a minimum with DataLoaders
- Preparing for the “known” unknown — scaling a service 10x in 30 days
Two things about our setup are worth mentioning up front. First, we have quite a few microservices in operation. Each microservice can have many running instances. Each service has its own database, shared between its instances but not across services. Second, some of our Kafka topics have a relatively long retention period. This enables the reconsuming logic described below. For topics with stricter retention policies, reconsuming has some caveats, since messages older than the retention period are no longer available.
Backfilling is used internally when there is a need to fix a limited number of entities in one of our services’ databases. The process of backfilling can be used when the service acting as the source of truth has all the entities that need to be modified in the downstream service(s). For this, the upstream service simply produces new Kafka messages containing all the required information to correct the state in the downstream service. You can think of this as re-producing messages instead of re-consuming messages.
The message produced is constructed from the latest database state. This ensures that all data updates are contained in the generated message. Creating a backfill message like this can be very fast when the message creation logic has been set up beforehand. If not, a small code update to produce the message is required. We have chosen to write simple CLI commands to achieve this.
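To make the idea concrete, here is a minimal sketch of what such a CLI command could look like. Everything in it is hypothetical: the `User` entity, the in-memory `USERS` dictionary standing in for the upstream database, the `user.upserted` message type, and the `produce` callback that stands in for a real Kafka producer.

```python
import argparse
import json
from dataclasses import asdict, dataclass


@dataclass
class User:
    id: str
    name: str
    company_id: str


# Hypothetical stand-in for the upstream service's database.
USERS = {
    "u1": User("u1", "Ada", "c1"),
    "u2": User("u2", "Grace", "c1"),
}


def build_backfill_message(user: User) -> dict:
    """Build the message from the latest stored state, not from history."""
    return {"type": "user.upserted", "key": user.id, "payload": asdict(user)}


def backfill(user_ids, produce) -> int:
    """Produce one message per known entity and return how many were sent."""
    sent = 0
    for user_id in user_ids:
        user = USERS.get(user_id)
        if user is None:
            # Nothing stored upstream means there is nothing to backfill from.
            continue
        produce(build_backfill_message(user))
        sent += 1
    return sent


def main(argv=None) -> int:
    parser = argparse.ArgumentParser(description="Backfill user messages")
    parser.add_argument("user_ids", nargs="+")
    args = parser.parse_args(argv)
    # Printing stands in for a real Kafka producer call (assumption).
    return backfill(args.user_ids, lambda msg: print(json.dumps(msg)))
```

Calling `main(["u1", "u2"])` prints one JSON message per user. Because the message is built from the current row, any update made since the original message was produced is included automatically.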
Backfilling is a good fit when only a limited number of entities need updating. Most of the time will likely be spent finding the entities that need to be updated. After this, it is just a matter of triggering the backfill for the affected entities. This could be likened to targeted surgery, only affecting the database entities that need to be changed.
A few cases where backfilling is useful for downstream services:
- Logic has changed, requiring some fields to be recalculated
- A bug has caused the entity to be malformed or not persisted at all
The only requirement for backfilling is that the upstream service has the entity correctly saved. If the upstream service has not stored the data, then there is nothing to create a new message from. In such a case, the downstream service can only fix its own bugs or change its own logic, and then reconsume old messages from Kafka.
An example of backfilling
Below is an example of using backfilling to fix an issue where some data was not saved correctly.
In this scenario, a user registers and is added to a company. When the user confirms their registration, two separate Kafka messages are produced.
One message creates the user entity and another creates a company relationship. The upstream service saves all the data correctly and sends the aforementioned messages.
The relationship between the company and the user is saved, but there is a bug in a downstream service that prevents saving the user to the database.
Foreign keys are not used in this example.
This kind of state can block any subsequent actions since there is an assumption that if the relationship exists, then both the user and company entities exist as well. We have configured an alert to be raised whenever a partition is stuck for a while. The bug will get fixed by an engineer, and at this point, the easiest way to fix the missing entity is to backfill it.
Using a CLI command, backfilling logic is triggered on the upstream service. A new user message is produced to be consumed and processed in the downstream services. After the consuming finishes, the user entity is present and the logic can continue from where it was blocked before.
In these kinds of cases, there is a big caveat to keep in mind. Namely, the message keying in Kafka is important.
If the relationship message is keyed so that it lands in the same partition as the user creation message, the consumer will be stuck on the relationship message, and the backfilled user message will be queued behind it. The relationship message then needs to be skipped so that the consumer can reach the backfilled user message first. After that, the relationship message needs to be backfilled as well.
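The ordering problem can be illustrated with a small simulation. This is only a sketch under stated assumptions: a partition is modelled as a Python list, the offset is the list index, and the `consume` function and message shapes are hypothetical.

```python
def consume(partition, db, start=0, skip=frozenset()):
    """Process messages in offset order, starting at `start`.

    Returns the offset the consumer stopped at: either the offset of a
    message it cannot process, or the end of the partition.
    """
    offset = start
    while offset < len(partition):
        message = partition[offset]
        if offset not in skip:
            if message["type"] == "user":
                db["users"].add(message["user_id"])
            elif message["user_id"] not in db["users"]:
                return offset  # stuck: the relationship refers to a missing user
            else:
                db["relationships"].add((message["company_id"], message["user_id"]))
        offset += 1
    return offset


db = {"users": set(), "relationships": set()}
partition = [
    # Offset 0 was consumed earlier, but the bug prevented saving the user.
    {"type": "user", "user_id": "u1"},
    {"type": "relationship", "user_id": "u1", "company_id": "c1"},
]

stuck_at = consume(partition, db, start=1)

# The backfilled user message is appended behind the stuck message.
partition.append({"type": "user", "user_id": "u1"})
still_stuck_at = consume(partition, db, start=stuck_at)

# Skipping the stuck message lets the consumer reach the backfilled user.
resumed_at = consume(partition, db, start=stuck_at, skip={stuck_at})

# The skipped relationship has to be backfilled too.
partition.append({"type": "relationship", "user_id": "u1", "company_id": "c1"})
finished_at = consume(partition, db, start=resumed_at)
```

Backfilling the user alone does not unblock the partition here. Only the combination of skipping the stuck offset and backfilling both messages brings the downstream state back in line.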
Why backfill and not reconsume? In the example, only one entity needs to be fixed. Reconsuming would require a lot of coding effort and would risk the downstream service going out of sync with the upstream service. It would also need to occur in every downstream service that has this bug, which further increases the risk.
To continue with the same analogy and compare it with the surgical approach of backfilling a single or limited set of messages, reconsuming is more of a “hammer” approach for a single service. Reconsuming is consumer-based and is controlled by the downstream service, as opposed to the producer-based backfilling which is controlled by the upstream service.
As the terminology itself suggests, it is meant for (re)consuming historical messages in the topic, starting from a past offset. It is usually a rather resource-heavy process.
In case there is an existing consumer which already handles the message, it requires some thought to avoid situations in which data first goes out of sync due to reconsumption of old messages, before coming back in sync as the latest messages are consumed.
Furthermore, consideration has to be given to any side effects that message consumption might have, as reconsumption would trigger them again.
There are a few use cases for reconsuming:
- The service needs to start using data which hasn’t been persisted in its database before, meaning there is a completely new type of message or new field(s) in an existing one.
- The processing logic has been changed, and the changes need to be applied to all the existing and future messages.
Different approaches to reconsuming
Depending on the situation, there are 3 main ways that we have used to reconsume messages.
1. Letting the existing consumer group reconsume the historical messages by setting its offset into the past
2. Parallel processing by existing and temporary consumer groups until the temporary consumer has caught up
3. Dynamic processing with a handover from the temporary consumer group to the existing consumer group
For approaches 2 and 3 a new temporary consumer group has to be created. This consumer group will have separate handlers for the messages that need to be picked up for reconsuming, and it will process only the important messages, ignoring all the others. Depending on the amount of historical data, the time needed to complete the reconsume can be quite long.
The temporary consumer group is important. If the reconsuming process were carried out by just changing the offsets of the original consumer group (as described in approach 1), then it would reprocess all the messages from that offset to the present day.
This can surface the issues mentioned before: side effects caused by changes in consumer code over time, and data first going out of sync and only later coming back in sync.
The temporary group approach allows for reconsuming logic to be as precise as possible, only affecting a small subset of the data (a single field, a new entity etc.), instead of rewinding the clock on the entire service to some historical timestamp.
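As an illustration of a consumer that picks up only the relevant messages, the sketch below dispatches on message type and ignores everything without a registered handler. The `make_consumer` helper, the message types, and the `region` field are hypothetical names chosen for the example.

```python
from typing import Callable, Dict

Handler = Callable[[dict], None]


def make_consumer(handlers: Dict[str, Handler]) -> Callable[[dict], bool]:
    """Return a dispatch function that ignores types without a handler."""

    def on_message(message: dict) -> bool:
        handler = handlers.get(message["type"])
        if handler is None:
            return False  # not relevant for this consumer group
        handler(message)
        return True

    return on_message


new_field_store = {}


def save_new_field(message: dict) -> None:
    # Persist only the field being reconsumed (hypothetical "region" field).
    new_field_store[message["id"]] = message["payload"].get("region")


# The temporary group registers a handler for a single message type.
temporary_group = make_consumer({"user.updated": save_new_field})

history = [
    {"type": "user.updated", "id": "u1", "payload": {"region": "EU"}},
    {"type": "invoice.sent", "id": "i1", "payload": {}},
    {"type": "user.updated", "id": "u2", "payload": {}},
]
handled = [temporary_group(message) for message in history]
```

The temporary group walks through the whole history, but only the `user.updated` messages result in any work or any writes.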
Reconsuming / Just changing offsets of an existing consumer group
Setting the existing consumer group's offset to some point in the past is the simplest way to reconsume historical messages. This could be called the "naive way" to reconsume messages. As the number of different message types in a topic grows, it becomes too heavy-handed in almost all cases, because every message type in the topic is reprocessed, not only the one that needs it. This approach can still be used for disaster recovery, when setting up new services, or when adding a topic to a consumer group.
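Kafka clients and tooling generally offer a way to look up offsets by timestamp. The sketch below only illustrates the underlying idea in plain Python, assuming each partition is a list of message timestamps in non-decreasing order and that offsets start at 0. The `offset_for_timestamp` and `rewind` helpers are hypothetical.

```python
from bisect import bisect_left


def offset_for_timestamp(timestamps_ms, target_ms):
    """Return the first offset whose timestamp is >= target_ms, or None."""
    index = bisect_left(timestamps_ms, target_ms)
    return index if index < len(timestamps_ms) else None


def rewind(committed, partition_timestamps, target_ms):
    """Return new committed offsets, moving each partition back to target_ms."""
    rewound = {}
    for partition, current in committed.items():
        found = offset_for_timestamp(partition_timestamps[partition], target_ms)
        # Never move forward: keep the current offset if nothing older matches.
        rewound[partition] = current if found is None else min(current, found)
    return rewound


partition_timestamps = {0: [100, 200, 200, 350], 1: [120, 400]}
committed = {0: 4, 1: 2}
rewound = rewind(committed, partition_timestamps, target_ms=200)
```

Every partition is rewound independently, so the number of messages to reprocess can differ considerably between partitions.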
Reconsuming / Parallel processing from existing and temporary consumer groups
Parallel processing is a good choice when adding a new field to an entity that is in constant use. As mentioned before, a temporary consumer group is created to allow consuming of historical messages while not halting the existing consumer group.
To achieve parallelism, the existing consumer group must continue handling forthcoming messages while the temporary group is still processing the historical ones. This ensures that existing data is kept up to date during the entire reconsuming process.
Both the existing and temporary consumer groups need handlers for the messages that are about to be processed.
The temporary group handler needs to save only the new field, while the existing group handler needs to take into account all the fields including the new one.
Until the temporary consumer group catches up to the existing consumer group, the new field’s state will be outdated. This is due to the temporary consumer group writing outdated state to the new field until the reconsume finishes.
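The difference between the two handlers can be sketched as follows. The entity shape, the `region` field standing in for the new field, and the dictionary used as a database are all assumptions made for the example.

```python
def existing_handler(db, message):
    """Existing group: persist every field, including the new one."""
    db[message["id"]] = {
        "name": message["name"],
        "status": message["status"],
        "region": message.get("region"),  # the newly added field
    }


def temporary_handler(db, message):
    """Temporary group: write only the new field on rows that already exist."""
    row = db.get(message["id"])
    if row is not None:
        row["region"] = message.get("region")


db = {}
old = {"id": "e1", "name": "Old name", "status": "pending", "region": "US"}
latest = {"id": "e1", "name": "New name", "status": "active", "region": "EU"}

# The existing group has already processed the latest message.
existing_handler(db, latest)

# The temporary group replays history: the new field is briefly outdated,
# while the existing fields stay untouched.
temporary_handler(db, old)
during_reconsume = dict(db["e1"])

# Once the temporary group reaches the latest message, the field is correct.
temporary_handler(db, latest)
after_reconsume = dict(db["e1"])
```

In this run `name` and `status` never regress, and only `region` passes through an outdated value before settling.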
Parallel reconsuming handler example
Both message handlers are wrapped with extra logic. For the existing consumer group, it keeps track of the offsets that have been processed.
For the temporary consumer group, the extra logic determines when the temporary handler needs to stop processing messages. This incurs some overhead, because the wrapper has to check the database for the handover offset.
More details about the handover process can be found further down.
Reconsuming / Dynamic processing with a handover
Dynamic processing is a good fit when adding a new entity to a service. In this case, the data can be out of date during the reconsuming process since it is not used yet. As opposed to the parallel approach, every message is processed only once, which makes it preferable when there are side effects that must happen only once per message.
The message handlers can be identical for both the existing and temporary consumer groups. With this approach, the existing consumer group handler will skip messages until the temporary consumer group catches up. At this point, the handover logic is used to pass processing back to the existing consumer group, stopping the temporary one.
Dynamic processing handler example
Again, both message handlers are wrapped with extra logic.
For the existing consumer group, the extra logic keeps track of the offsets that have been processed and skips all messages until the handover happens.
For the temporary consumer group, the extra logic determines when the temporary handler needs to stop processing messages and hand over to the existing handler.
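A minimal, single-threaded sketch of such wrappers for one partition is shown below. The `PartitionState` class and both wrapper functions are hypothetical, and the state is kept in memory instead of a database. A real implementation would need the catch-up check and the handover write to be atomic. The sketch also assumes that the message on which the temporary group catches up is still processed by the temporary group.

```python
from dataclasses import dataclass
from typing import Optional


@dataclass
class PartitionState:
    existing_last_offset: int = -1  # latest offset seen by the existing group
    handover_offset: Optional[int] = None  # set once the temporary group caught up


def wrap_existing(handler, state):
    def wrapped(message):
        state.existing_last_offset = max(state.existing_last_offset, message["offset"])
        if state.handover_offset is None or message["offset"] <= state.handover_offset:
            return False  # skip: the temporary group owns this message
        handler(message)
        return True

    return wrapped


def wrap_temporary(handler, state):
    def wrapped(message):
        if state.handover_offset is not None:
            return False  # already handed over to the existing group
        handler(message)
        if message["offset"] >= state.existing_last_offset:
            state.handover_offset = message["offset"]
        return True

    return wrapped


state = PartitionState()
processed = []
existing = wrap_existing(lambda m: processed.append(("existing", m["offset"])), state)
temporary = wrap_temporary(lambda m: processed.append(("temporary", m["offset"])), state)

# The existing group is live and skips offsets 0..2 while waiting.
for offset in range(3):
    existing({"offset": offset})

# The temporary group replays the same offsets and hands over at offset 2.
for offset in range(3):
    temporary({"offset": offset})

# A new message arrives: only the existing group processes it.
existing({"offset": 3})
temporary({"offset": 3})
```

Each offset ends up in `processed` exactly once, which is the property that matters when handlers have side effects.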
Next, we will cover how the handover is accomplished for both parallel and dynamic reconsuming. A database table is used to keep track of all the partitions and how far along their reconsuming process is. This table is critical to determine when to stop the temporary consumer group.
Each time the wrapper code for the temporary consumer group is executed, it checks whether the temporary consumer group has caught up to the existing_consumer_batch_last_offset. Once this happens, the reconsume_consumer_handover_offset is filled with the current offset and the temporary consumer group will start skipping any new messages.
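The sketch below shows one way such a tracking table and check could look, using an in-memory SQLite database for illustration. Only the two offset column names come from the description above. The table name `reconsume_progress`, the `topic` and `partition_id` columns, and all function names are assumptions. The sketch also assumes the message on which the catch-up is detected is still processed by the temporary group.

```python
import sqlite3

WHERE_PARTITION = "WHERE topic = ? AND partition_id = ?"


def create_table(conn):
    conn.execute(
        """
        CREATE TABLE reconsume_progress (
            topic TEXT NOT NULL,
            partition_id INTEGER NOT NULL,
            existing_consumer_batch_last_offset INTEGER,
            reconsume_consumer_handover_offset INTEGER,
            PRIMARY KEY (topic, partition_id)
        )
        """
    )


def record_existing_offset(conn, topic, partition_id, offset):
    """Called by the existing group's wrapper after each batch."""
    conn.execute(
        "INSERT OR IGNORE INTO reconsume_progress (topic, partition_id) VALUES (?, ?)",
        (topic, partition_id),
    )
    conn.execute(
        "UPDATE reconsume_progress SET existing_consumer_batch_last_offset = ? "
        + WHERE_PARTITION,
        (offset, topic, partition_id),
    )


def temporary_should_process(conn, topic, partition_id, offset):
    """Called by the temporary group's wrapper; marks the handover on catch-up."""
    row = conn.execute(
        "SELECT existing_consumer_batch_last_offset, reconsume_consumer_handover_offset "
        "FROM reconsume_progress " + WHERE_PARTITION,
        (topic, partition_id),
    ).fetchone()
    if row is None:
        return True  # the existing group has not reported progress yet
    last_offset, handover_offset = row
    if handover_offset is not None:
        return False  # handed over: skip everything from now on
    if last_offset is not None and offset >= last_offset:
        conn.execute(
            "UPDATE reconsume_progress SET reconsume_consumer_handover_offset = ? "
            + WHERE_PARTITION,
            (offset, topic, partition_id),
        )
    return True


def all_partitions_handed_over(conn, topic):
    """True once every tracked partition of the topic has a handover offset."""
    total, handed_over = conn.execute(
        "SELECT COUNT(*), COUNT(reconsume_consumer_handover_offset) "
        "FROM reconsume_progress WHERE topic = ?",
        (topic,),
    ).fetchone()
    return total > 0 and total == handed_over


conn = sqlite3.connect(":memory:")
create_table(conn)
record_existing_offset(conn, "users", 0, 5)
```

With the existing group at offset 5, `temporary_should_process(conn, "users", 0, 3)` returns `True`, the call for offset 5 returns `True` and records the handover, and any later call for that partition returns `False`.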
It is safe to remove the reconsuming logic after all partitions have been handed over. However, low-volume message types might not receive enough new messages to update the offsets for all the partitions. In this case, reconsuming can be considered done once the temporary consumer group has no lag left.
Finally, to clean things up, the temporary consumer group logic can be removed together with the wrapper from the existing message handler.
We are back to normal again, with the messages being successfully reconsumed.
Things to look out for and keep in mind
- During the reconsuming process, the new entities or fields are not up-to-date and should not be used before the process is finished.
- Different database constraints might have evolved which might cause problems during the reconsumption. For example, foreign keys need to be removed and re-added afterward to avoid the database blocking updates/inserts.
- Reconsuming messages causes lag in the temporary consumer group.
- In case of adding a new field, the temporary handler should not update the existing fields. Otherwise, the entire state will be inconsistent during re-consuming.
In this post, we introduced two different ways we reprocess Kafka messages: backfilling and reconsuming. They are quite different by nature and should be used in places where they are appropriate.
Backfilling is suitable for fixing smaller batches of entities in the downstream services, without changing anything in the codebase.
Reconsuming, on the other hand, takes place in one specific service and requires careful code modifications to process a lot of past messages without any unintended side effects.
These approaches are likely quite specific to the way we use Kafka. We are very interested in hearing how you deal with similar scenarios, so please let us know in the responses to this post!