Bringing Kafka based architecture to the next level using simple PostgreSQL tables
Written by Tõnis Pool
We’ve mentioned in previous posts that at the heart of Sixfold lives Kafka, which our services use to communicate with each other. Each of our services also has its own Postgres database. At the very abstract level every service does the following:
- Read a message from Kafka (or a larger batch of messages).
- Do some processing (business logic). Possibly including REST requests to external parties.
- Save/update some state in the local DB.
- Potentially produce some new Kafka messages.
You can imagine the architecture as a web of Kafka topics, with services as the nodes that carry out the required business logic. This architecture has served us well, and in general we were quite happy with it, but there were some drawbacks:
#1 Isolation problem — processing messages serially
The beauty within Kafka lies in the Log abstraction. It’s what enables Kafka to both behave a little like a database and a high performance messaging system. The abstraction is simple:
- Producers write messages to the end of the log.
- Consumers mark their progress within the log by continuously committing an offset — how far they’ve read into the log.
This also means that within the scope of a single Log, progress is made linearly: consumers acknowledge processing by moving their offset marker forward. However, when a specific bad message in the log takes a lot of time to process, or halts processing entirely, it affects every other (newer) message in the Log. Imagine a REST request to an external party failing: the consumer would be stuck, unable to make progress.
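To make the serial-progress point concrete, here is a toy in-memory model of a single log with one committed offset. This is only an illustration of the abstraction, not Kafka's actual implementation:

```python
# Toy model of a Kafka-style log: one append-only list, one committed offset.
class Log:
    def __init__(self):
        self.messages = []
        self.committed = 0  # everything before this offset is acknowledged

    def append(self, msg):
        self.messages.append(msg)

    def poll(self):
        # A consumer only ever reads from its committed offset onward.
        return self.messages[self.committed:]

    def commit(self, offset):
        self.committed = offset

log = Log()
for m in ["a", "bad", "c"]:
    log.append(m)

# Processing is serial: if "bad" keeps failing (e.g. a failing REST call),
# the offset stays put and every newer message is re-delivered on each poll.
for i, msg in enumerate(log.poll()):
    if msg == "bad":
        break  # processing halted
    log.commit(i + 1)

print(log.poll())  # ['bad', 'c'] - "c" is stuck behind the bad message
```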
This issue is tackled in other messaging systems through some means of selective acknowledgement. The system could allow acknowledging each message individually, which would allow you to make progress with newer messages. But this makes bookkeeping a lot more complicated (performance would suffer) and you generally still want to have some logical sequence of processing messages.
Kafka does have partitions, which should be thought of as the unit of parallelism. A single Kafka topic consists of N partitions, and each partition is a standalone Log of messages. Having many partitions helps with the situation described above in the sense that only a single partition would get stuck due to the bad message, since offsets are tracked per partition. The gotcha is that, at least so far, partitions come with a cost: a Kafka cluster of a given size can only handle up to a certain number of partitions.
Also, while in theory you can add partitions to a topic, in practice doing so will likely split data unexpectedly. The target partition for any given message is chosen by an algorithm; by default it is usually a hash of a provided key string modulo the total number of partitions. Under this hashing strategy, message X, which used to be in partition 3, could move to partition 7 after adding 10 partitions. Data for the same message key now lives in 2 distinct partitions, which breaks the logical order of messages when re-consuming. As a side note, at Sixfold we have chosen 100 as the default number of partitions, with the hope that it will carry us well into the future, so we can defer the need to add partitions and shuffle historic data around.
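A small sketch of the key-to-partition mapping shows why resizing reshuffles keys. We use CRC32 here as a stand-in hash (Kafka's default partitioner actually uses murmur2, but the modulo effect is the same):

```python
# The target partition is hash(key) % partition_count, so changing the
# partition count can move an existing key to a different partition.
import zlib

def partition_for(key: str, num_partitions: int) -> int:
    # CRC32 as a stand-in for Kafka's default murmur2 hash.
    return zlib.crc32(key.encode()) % num_partitions

key = "transport-42"
before = partition_for(key, 10)   # partition while the topic had 10 partitions
after = partition_for(key, 20)    # partition after resizing to 20
print(before, after)  # the same key may now land on a different partition
```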
All the above meant that in parts of the system where we make external requests while processing messages we started to feel the pain of committing offsets serially. A specific bad transport could halt or slow down the processing of 1% of other unrelated transports (as we have 100 partitions).
#2 State problem — distinct transactional boundaries
In the introduction we laid out the steps that most of our services follow: consume, process (which includes DB updates) and produce. The issue is that there is no shared transactional boundary between updating the database and producing a Kafka message that carries the same state update. If the DB update fails, the Kafka message might still be produced, or vice versa. In a distributed system this can lead to hard-to-diagnose issues with state drift across different parts of the system.
When message producing is conditional (depends on state), updating the database first means a failed produce can leave the message unsent forever: the state in the DB is already updated, so the code path on retry is different. If we produce first and the database update then fails (because of incorrect state), in the worst case we enter a loop of continuously sending out duplicate messages until the issue is resolved. What we really need is for both actions to happen transactionally (as much as possible).
Jobs to the rescue!?
Those 2 problems with our Kafka based architecture became more pronounced over time as the overall load and complexity grew. We decided it was time to act and figure out a solution to the above problems.
Around the same time we happened to read about how segment.io solved a similar issue to #1 with a solution they dubbed Centrifuge. With Centrifuge they built a queue like solution on top of a database to have per customer isolated Jobs. Taking Centrifuge as inspiration, we devised our own solution, which we internally also called the Jobs library.
Our high level idea was:
- Insert “work” into a table that acts like a queue
- “Executor” takes “work” from DB and runs it
What is a Job?
A Job is an abstraction for a scheduled DB backed async activity with configurable queuing policies. There are 3 key properties that characterize a job:
- `type` - defines what function to execute. Similar to how each Kafka message in our system includes a message type in the envelope (for decoding and handling purposes).
- `origin` - represents the "owner" of the job. The origin is used to associate a job with a specific instance of a service (= Kubernetes pod), which is responsible for running it.
- `group` - a string that can be used to group multiple jobs into a single thread. This could be something like a `transport_id`, in which case all jobs for a particular transport would form a single group and would be executed serially.
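As a rough illustration, a jobs table carrying these three properties might look like the following. This is a hypothetical schema with made-up column and job names, using SQLite here in place of PostgreSQL:

```python
# Hypothetical jobs table: each row is one scheduled unit of work.
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute("""
    CREATE TABLE jobs (
        id         INTEGER PRIMARY KEY,
        type       TEXT NOT NULL,   -- which function to execute
        origin     TEXT NOT NULL,   -- owning service instance (e.g. a pod id)
        grp        TEXT NOT NULL,   -- jobs sharing a group run serially
        payload    TEXT,
        created_at TEXT DEFAULT CURRENT_TIMESTAMP
    )
""")
conn.execute(
    "INSERT INTO jobs (type, origin, grp, payload) VALUES (?, ?, ?, ?)",
    ("send_eta_update", "pod-1", "transport-42", '{"eta": "12:00"}'),
)
row = conn.execute("SELECT type, grp FROM jobs").fetchone()
print(row)  # ('send_eta_update', 'transport-42')
```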
Choosing what work to execute
By default, we run 3 instances of each service for redundancy reasons, and we have configured Kubernetes horizontal pod autoscalers. Whenever a service starts to struggle under load, Kubernetes will automatically add a new instance. We wanted the Jobs library to fit nicely into our existing architecture and be horizontally scalable. The question remained — how to divide jobs among executor instances in a safe and performant way.
The solution was staring at us all along: the way Kafka consumer groups work. The Kafka consumer group rebalance protocol ensures that all topic partitions are divided among the live and healthy consumer group members. If we have 3 instances of the application and 100 partitions, then each instance consumes ~33 partitions. The insight was that we could use the same partition mapping to also decide which jobs a given executor should run. Each service instance has 1 Job executor that is aware of the currently assigned partitions.
The added benefit is that there are no separate processes or "executor cluster" to manage. As work comes in via Kafka messages on specific partitions, the same process handles it; it doesn't matter whether processing is done in a message handler or via the Job abstraction. When there's too much load, adding instances will automatically spread it, as each instance ends up with fewer partitions to handle.
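One way to picture the assignment logic: hash each job's group onto the same 100-partition space, and let an executor pick up only jobs falling into the partitions Kafka assigned to its instance. A sketch with invented helper names, again using CRC32 in place of Kafka's hash:

```python
# Reuse the Kafka partition assignment to decide which jobs an executor runs.
import zlib

NUM_PARTITIONS = 100  # the post's default partition count

def partition_of(group: str) -> int:
    # Map a job's group onto the same partition space as the Kafka topic.
    return zlib.crc32(group.encode()) % NUM_PARTITIONS

def jobs_for_executor(jobs, assigned_partitions):
    # An executor only picks up jobs whose group hashes into one of the
    # partitions the rebalance protocol assigned to this instance.
    return [j for j in jobs if partition_of(j["group"]) in assigned_partitions]

jobs = [{"group": f"transport-{i}"} for i in range(10)]
assigned = set(range(0, 34))  # ~1/3 of partitions, as with 3 instances
mine = jobs_for_executor(jobs, assigned)
```

When Kubernetes adds an instance, the rebalance shrinks each instance's `assigned` set, and the job load spreads with it automatically.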
How did we solve the #1 isolation problem?
Whenever we need a finer-grained failure domain than the single partition scope (1 out of 100), we can move the processing to a Job. All Jobs that logically belong together are scheduled with the same `group` value. All Jobs within the same `group` are executed serially (or, optionally, only the last scheduled one is executed). When a REST request for a particular transport fails, it does not affect other Jobs, as they are all executed independently (based on their `group` value). All in all, using Jobs allows us to reduce the blast radius of a single bad message, as it in essence creates additional, virtual partitioning inside a single Kafka partition.
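The per-group isolation can be sketched as splitting one partition's jobs into independent serial queues (illustrative code, not the actual Jobs library):

```python
# Within one Kafka partition, jobs form independent per-group queues,
# so one group's failing job no longer blocks the others.
from collections import defaultdict

def run_jobs(jobs, execute):
    queues = defaultdict(list)
    for job in jobs:                 # preserve arrival order within each group
        queues[job["group"]].append(job)
    done, stuck = [], []
    for group, queue in queues.items():
        for job in queue:            # serial within a group...
            if not execute(job):
                stuck.append(group)  # ...but a failure only halts this group
                break
            done.append(job["id"])
    return done, stuck

jobs = [
    {"id": 1, "group": "transport-1"},
    {"id": 2, "group": "transport-2"},  # this one will fail
    {"id": 3, "group": "transport-1"},
]
done, stuck = run_jobs(jobs, execute=lambda j: j["id"] != 2)
print(done, stuck)  # [1, 3] ['transport-2']
```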
How did we solve the #2 state problem?
By recording Jobs in the service database, we can do the state update within the same transaction as inserting a new Job. Combining this with a Job that produces the actual Kafka message allows us to make the whole operation transactional: if either part fails, updating the data or scheduling the job, both get rolled back and neither happens.
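The rollback behaviour can be sketched with a plain database transaction (SQLite standing in for PostgreSQL; the table and job names are made up):

```python
# The state update and the "produce a Kafka message" job are written in the
# same DB transaction, so either both happen or neither does.
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE transports (id TEXT PRIMARY KEY, eta TEXT)")
conn.execute("CREATE TABLE jobs (id INTEGER PRIMARY KEY, type TEXT, grp TEXT)")
conn.execute("INSERT INTO transports VALUES ('t-1', NULL)")
conn.commit()

try:
    with conn:  # one transaction for both statements
        conn.execute("UPDATE transports SET eta = '12:00' WHERE id = 't-1'")
        conn.execute("INSERT INTO jobs (type, grp) VALUES ('produce_eta_msg', 't-1')")
        raise RuntimeError("simulated failure before commit")
except RuntimeError:
    pass

# Both the state change and the job were rolled back together.
print(conn.execute("SELECT eta FROM transports WHERE id = 't-1'").fetchone())  # (None,)
print(conn.execute("SELECT COUNT(*) FROM jobs").fetchone())  # (0,)
```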
The Job might still fail during execution, in which case it's retried with exponential backoff, but at least no updates are lost. While the issue persists, further state change messages are queued up as Jobs too (with the same `group` value). Once the (transient) issue resolves and we can again produce messages to Kafka, the updates go out in logical order for the rest of the system, and eventually everyone is back in sync.
This post outlined how we overcame 2 specific hurdles we faced while building products on top of a Kafka based microservice architecture. For brevity, we had to skip over many of the finer details and tricks we employ within the Jobs library itself, such as using CTEs to do as much as possible within a single database query, or how we retry Job executions. These might be good candidates for a follow-up post down the line.
However, there are also plenty of problems we don't yet have a well-defined answer for. One example is concrete guidelines on how to grow the "web of topics" further: when to add new topics vs. when to add new message types to existing ones. We'll keep writing about what we learn while evolving our Kafka based microservice architecture in this blog. If you've faced similar issues in the past, we'd be happy to hear how you tackled them.