Root's Migration from In-Process Pub/Sub to Kafka

For a long time at Root, we've used pub/sub as a way to decouple parts of our application. This was done with a small Ruby class. Essentially, you'd call publish with an event and the associated attributes, and any subscribers would run inline at that moment.

That has worked very well for us, but we wanted to have the option to eventually extract some "macroservices" from our monolith. To make that extraction work, our current pub/sub solution wouldn't quite cut it (since it needs to be able to call the subscribers directly).

After some investigation into our options (AWS SQS/SNS, AWS Kinesis, RabbitMQ, Redis Pub/Sub, and Kafka), we landed on Kafka as the successor to our in-process solution. In this post, I'm going to dive into what our existing solution looked like, and how we migrated to Kafka without any downtime.

Note: I have simplified this example a little. In our application we actually have multiple instances of the PubSub class I show below (and therefore multiple topics). For the sake of this overview, we'll use only one.

Our in-process pub/sub implementation

Let's start this off with some code. This is a small example of how you might use our in-process pub/sub:

# engines/profiles/config/initializers/pub_sub.rb

PubSub.default.register(:profile_updated, attributes: %i[account_id profile_id])

# engines/profiles/app/services/profile_service.rb

module ProfileService
  def self.update(account_id:, profile_attributes:)
    # ...
    PubSub.default.publish(:profile_updated, account_id: account_id, profile_id: profile_id)
  end
end

# engines/rating/config/initializers/pub_sub.rb

PubSub.default.subscribe(:profile_updated) do |account_id:, profile_id:|
  RateJob.perform_later(account_id: account_id, profile_id: profile_id)
end

In this example, we have an instance of PubSub with one event (:profile_updated) that takes account_id and profile_id as attributes. Once the profile is updated in the ProfileService, :profile_updated is published to PubSub.default. Finally, the rating engine subscribes to the :profile_updated event with a block that enqueues a RateJob.

One minor thing to note here is the use of "engines." For a little more information on how we have structured our Rails application, check out our CTO's blog post about the modular monolith.

Anyway, a slimmed down implementation of the PubSub class looks roughly like this:

require "set"

class PubSub
  Event = Struct.new(:name, :attributes, :subscribers)

  def self.default
    @default ||= new
  end

  def initialize
    @events = {}
  end

  def register(event_name, attributes:)
    event = Event.new(event_name, attributes, [])
    events[event_name] = event
  end

  def publish(event_name, attributes = {})
    event = events.fetch(event_name)
    raise "invalid event" unless attributes.keys.to_set == event.attributes.to_set

    # Subscribers run inline, in registration order, before publish returns.
    event.subscribers.each do |subscriber|
      subscriber.call(**attributes)
    end
  end

  def subscribe(event_name, &block)
    event = events.fetch(event_name)
    # Proc#parameters reports types as symbols, e.g. [[:keyreq, :account_id]].
    raise "invalid subscriber" unless block.parameters.all? { |type, _name| type == :keyreq }
    raise "invalid subscriber" unless block.parameters.map { |_type, name| name }.to_set == event.attributes.to_set

    event.subscribers << block
  end

  private

  attr_reader :events
end

As mentioned earlier, when we "publish" an event all of the subscribers are executed inline.
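
To make "inline" concrete, here is a minimal, self-contained sketch. TinyPubSub is a hypothetical, stripped-down stand-in for the class above (no registration or validation), not our actual code. It records a timeline to show that publish does not return until every subscriber has run:

```ruby
# Hypothetical stand-in for the PubSub class above, with validation removed.
class TinyPubSub
  def initialize
    @subscribers = Hash.new { |hash, key| hash[key] = [] }
  end

  def subscribe(event_name, &block)
    @subscribers[event_name] << block
  end

  def publish(event_name, attributes = {})
    # Subscribers run right here, on the caller's thread, before publish returns.
    @subscribers[event_name].each { |subscriber| subscriber.call(**attributes) }
  end
end

pub_sub = TinyPubSub.new
timeline = []

pub_sub.subscribe(:profile_updated) do |account_id:, profile_id:|
  timeline << "subscriber ran for account #{account_id}, profile #{profile_id}"
end

timeline << "before publish"
pub_sub.publish(:profile_updated, account_id: 1, profile_id: 2)
timeline << "after publish"

puts timeline
```

The flip side of this simplicity is that a slow or raising subscriber slows down or fails whoever called publish.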

Our migration to Kafka

Within our system, we have a good amount of business critical functionality running through our pub/sub setup. Up to this point, that wasn't a point of concern as it was just another piece of code executing. Moving to Kafka introduces another point of failure with a distributed system, so we had to de-risk our migration. To do so, we gathered data and took an incremental approach in cutting over.

Gathering analytics

To fully understand the load we would be putting on this new system, we started by gathering data on our current usage of pub/sub. Primarily, we were interested (at this point) in how many messages per second we were consuming per anticipated topic.

To get these metrics, we instrumented the existing pub/sub code with counts through StatsD.

class PubSub
  # ...

  def publish(event_name, attributes = {})
    statsd.increment("pub_sub.#{name}.#{event_name}.published")

    # ...
  end

  # ...
end

We used these metrics to do some capacity planning for our Kafka cluster before implementing and cutting over.
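
As a rough illustration of that kind of planning, the sketch below turns per-flush counter values into peak and average message rates. Everything in it is an assumption made for the example: the sample counts are made up, and the 10-second flush interval and 512-byte average message size are placeholders rather than numbers from our system.

```ruby
# Hypothetical inputs: counts for one event as reported per metrics flush.
FLUSH_INTERVAL_SECONDS = 10  # assumed flush interval
AVERAGE_MESSAGE_BYTES  = 512 # assumed size of one JSON-encoded event

counts_per_flush = [120, 340, 95, 410, 180] # made-up sample data

# Peak rate sizes the cluster for bursts; average rate drives storage estimates.
peak_per_second       = counts_per_flush.max.fdiv(FLUSH_INTERVAL_SECONDS)
average_per_second    = counts_per_flush.sum.fdiv(counts_per_flush.size * FLUSH_INTERVAL_SECONDS)
peak_bytes_per_second = peak_per_second * AVERAGE_MESSAGE_BYTES

puts format(
  "peak: %.1f msg/s, average: %.1f msg/s, peak throughput: %.1f KiB/s",
  peak_per_second, average_per_second, peak_bytes_per_second / 1024
)
# prints "peak: 41.0 msg/s, average: 22.9 msg/s, peak throughput: 20.5 KiB/s"
```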

Publishing and consuming messages (no-op)

We didn't want to jump right into publishing and consuming pub/sub through Kafka (even with our analytics). Since we hadn't run a Kafka cluster in production yet, we wanted to be sure we understood how it would behave. To get ourselves more comfortable with it, we started with consumers that would no-op and simply mark messages as consumed:

class Consumer < Karafka::BaseConsumer
  def consume(out: $stdout)
    params_batch.each do |params|
      out.puts "TOPIC: #{params.fetch("topic")}"
      out.puts "EVENT: #{params.fetch("event")}"
    end
  end
end

Since the consumers weren't doing any work, we also had to support publishing to both Kafka and our inline subscribers at the same time. To do this, we introduced publish adapters into our PubSub class. Doing this provided us the ability to compose adapters (a MultiAdapter that ran both the Inline and Kafka adapters) as well as override the adapter to Inline in our tests.

class PubSub
  def initialize(publish_adapter: nil)
    # ...
    # An explicitly injected adapter (for example, Inline in tests) takes precedence.
    @publish_adapter = publish_adapter
    @publish_adapter ||=
      if ENV["PUBLISH_TO_KAFKA_ENABLED"] == "true"
        PubSub::Adapters::Multi.new([PubSub::Adapters::Inline.new, PubSub::Adapters::Kafka.new])
      else
        PubSub::Adapters::Inline.new
      end
  end

  def publish(event_name, attributes = {})
    event = events.fetch(event_name)
    raise "invalid event" unless attributes.keys.to_set == event.attributes.to_set

    statsd.increment("pub_sub.#{name}.#{event_name}.published")
    publish_adapter.publish(self, event, attributes)

    # ...
  end

  # Runs the subscribers for an event; called by the Inline adapter and by Kafka consumers.
  def consume(event_name, attributes)
    event = events.fetch(event_name)
    event.subscribers.each do |subscriber|
      subscriber.call(**attributes)
    end
  end

  # ...
end

class PubSub
  module Adapters
    class Inline
      def publish(pub_sub, event, attributes)
        pub_sub.consume(event.name, attributes)
      end
    end
  end
end

class PubSub
  module Adapters
    class Kafka
      def initialize(producer: WaterDrop::SyncProducer)
        @producer = producer
      end

      def publish(pub_sub, event, attributes)
        producer.call(
          {
            :event => {
              :name => event.name,
              :attributes => attributes
            }
          }.to_json,
          :partition_key => pub_sub.partition_key(attributes),
          :topic => pub_sub.topic
        )
      end

      private

      attr_reader :producer
    end
  end
end
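
The Multi adapter referenced in the initializer isn't shown above. A minimal sketch of what such a composing adapter could look like follows. The body is an assumption based only on the description (run each wrapped adapter in turn), and RecordingAdapter is a hypothetical stand-in for the Inline and Kafka adapters so the example runs on its own:

```ruby
# Sketch of a composing adapter; the implementation is assumed, not taken from our codebase.
class MultiAdapter
  def initialize(adapters)
    @adapters = adapters
  end

  # Forward the publish to every wrapped adapter, in order.
  def publish(pub_sub, event, attributes)
    @adapters.each { |adapter| adapter.publish(pub_sub, event, attributes) }
  end
end

# Hypothetical adapter that records calls, standing in for Inline and Kafka.
class RecordingAdapter
  attr_reader :published

  def initialize
    @published = []
  end

  def publish(_pub_sub, event, attributes)
    @published << [event, attributes]
  end
end

inline_stand_in = RecordingAdapter.new
kafka_stand_in  = RecordingAdapter.new
multi = MultiAdapter.new([inline_stand_in, kafka_stand_in])

multi.publish(nil, :profile_updated, { account_id: 1, profile_id: 2 })

p inline_stand_in.published
p kafka_stand_in.published
```

In this sketch an exception raised by an earlier adapter prevents the later ones from running, so the order of the list matters. Whether to rescue around one of the adapters is a design decision this example leaves open.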

As one final bit of precaution, we also set the Kafka publisher up to have a chance-based publish (not shown here) to limit throughput (adjustable through an environment variable) and started publishing only in our background workers. After slowly scaling up to 100% publish chance in background workers and getting a good idea of the differences in timing to publish, we turned publishing to Kafka on in the web workers (again, slowly scaling up the publish chance).
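
A chance-based publish can be sketched as a thin wrapper around another adapter. This is an illustration rather than our actual implementation: ChanceAdapter, CountingAdapter, and the KAFKA_PUBLISH_CHANCE variable name are all invented for the example, and the chance is assumed to be a fraction between 0.0 and 1.0.

```ruby
# Hypothetical wrapper that forwards only a fraction of publishes.
# KAFKA_PUBLISH_CHANCE is an invented variable name holding a value from 0.0 to 1.0.
class ChanceAdapter
  def initialize(adapter, chance: Float(ENV.fetch("KAFKA_PUBLISH_CHANCE", "0.0")), random: Random.new)
    @adapter = adapter
    @chance = chance
    @random = random
  end

  def publish(pub_sub, event, attributes)
    # Random#rand returns a float in [0.0, 1.0), so a chance of 0.0 never
    # publishes and a chance of 1.0 always does.
    return unless @random.rand < @chance

    @adapter.publish(pub_sub, event, attributes)
  end
end

# Stand-in for the Kafka adapter that only counts how often it is called.
class CountingAdapter
  attr_reader :count

  def initialize
    @count = 0
  end

  def publish(_pub_sub, _event, _attributes)
    @count += 1
  end
end

never  = CountingAdapter.new
always = CountingAdapter.new
half   = CountingAdapter.new

adapters = [
  ChanceAdapter.new(never, chance: 0.0),
  ChanceAdapter.new(always, chance: 1.0),
  ChanceAdapter.new(half, chance: 0.5, random: Random.new(42))
]

1_000.times do
  adapters.each { |adapter| adapter.publish(nil, :profile_updated, {}) }
end

puts "0%: #{never.count}, 100%: #{always.count}, 50%: #{half.count}"
```

Injecting the random source keeps the wrapper testable, and reading the chance from the environment means throughput can be dialed up without a deploy.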

Cutting over fully to publishing and consuming in Kafka

After letting the no-op implementation settle, we included additional metadata in our messages to Kafka to indicate whether the consumer should process the message, and updated the consumer to process or skip each message based on that flag. The flag's value was controlled through an environment variable, so we could switch between in-process pub/sub and Kafka if necessary:

class PubSub
  module Adapters
    class Kafka
      # ...

      def publish(pub_sub, event, attributes)
        producer.call(
          {
            :event => {
              :should_consume => ENV["CONSUME_MESSAGES_THROUGH_KAFKA"] == "true",
              :name => event.name,
              :attributes => attributes
            }
          }.to_json,
          :partition_key => pub_sub.partition_key(attributes),
          :topic => pub_sub.topic
        )
      end

      # ...
    end
  end
end

Now that we were confident in publishing and consuming, we were able to flip the environment variable and cut over to consuming messages in our Kafka consumers. Once this environment variable was changed, we stopped publishing to the inline subscribers and published only to Kafka.

class Consumer < Karafka::BaseConsumer
  def consume(pub_sub: nil)
    pub_sub ||= PubSub.default

    params_batch.each do |params|
      event = params.fetch("event")
      # The publisher nests should_consume inside the "event" payload.
      next unless event["should_consume"]

      pub_sub.consume(
        event.fetch("name").to_sym,
        event.fetch("attributes").deep_symbolize_keys
      )
    end
  end
end
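
The other half of the cutover, no longer running subscribers inline once the flag is on, isn't shown above. One plausible way to do it is to have the inline adapter check the same environment variable. The sketch below is an assumption rather than our actual code: GatedInlineAdapter, FakePubSub, and DemoEvent are hypothetical names, and the flag is read through an injected lambda so the example can run without touching the real environment.

```ruby
# Sketch: an inline adapter that stands down once consumption moves to Kafka.
class GatedInlineAdapter
  def initialize(kafka_consuming: -> { ENV["CONSUME_MESSAGES_THROUGH_KAFKA"] == "true" })
    @kafka_consuming = kafka_consuming
  end

  def publish(pub_sub, event, attributes)
    # Once Kafka consumers do the work, running subscribers inline as well
    # would process every event twice.
    return if @kafka_consuming.call

    pub_sub.consume(event.name, attributes)
  end
end

# Minimal stand-ins so the example is self-contained.
DemoEvent = Struct.new(:name)

class FakePubSub
  attr_reader :consumed

  def initialize
    @consumed = []
  end

  def consume(event_name, attributes)
    @consumed << [event_name, attributes]
  end
end

flag = false
adapter = GatedInlineAdapter.new(kafka_consuming: -> { flag })
pub_sub = FakePubSub.new
event = DemoEvent.new(:profile_updated)

adapter.publish(pub_sub, event, { account_id: 1 }) # flag off: subscribers run inline
flag = true
adapter.publish(pub_sub, event, { account_id: 2 }) # flag on: left to the Kafka consumer

p pub_sub.consumed
```

Checking the flag at publish time, rather than once at boot, is what would allow switching back and forth without restarting processes.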

Unexpected issues found once we cut over

As with any big launch of a new system, we expected to hit some unexpected problems. So far, we've run into one worth mentioning.

On the first weekend after we cut over to Kafka, we ran into a series of cascading issues:

  • We had a large influx of failed jobs
  • Clearing those caused a Redis slowdown
  • The Redis slowdown sufficiently slowed our consumers such that they fell behind
  • Our paging infrastructure failed to alert us of this early on

The fallout of this was that we were so far behind in processing that the consumers could not get through a full batch of messages before the session timeout. Because of that, the consumers would miss their heartbeats and be kicked out of the consumer group. Each time that happened, a rebalance occurred, further slowing down processing.

There is a good overview of this issue and a hacky way to resolve it in this GitHub comment. To solve the issue going forward, we split each batch into microbatches in our consumer and send a manual heartbeat after each microbatch, as suggested by the linked comment.
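
The shape of that fix can be sketched without any Kafka dependency. In the example below, process_in_microbatches and microbatch_size are hypothetical names, and heartbeat is any callable. In a real consumer it would wrap whatever heartbeat call the Kafka client exposes, which is deliberately not shown here.

```ruby
# Sketch of microbatching with a heartbeat after each slice.
def process_in_microbatches(messages, microbatch_size:, heartbeat:)
  messages.each_slice(microbatch_size) do |microbatch|
    microbatch.each { |message| yield message }
    # Heartbeat after every microbatch so the consumer is still seen as alive
    # even when the full batch takes longer than the session timeout.
    heartbeat.call
  end
end

processed = []
heartbeats = 0

process_in_microbatches((1..10).to_a, microbatch_size: 4, heartbeat: -> { heartbeats += 1 }) do |message|
  processed << message
end

puts "processed #{processed.size} messages with #{heartbeats} heartbeats"
# prints "processed 10 messages with 3 heartbeats"
```

The microbatch size is a trade-off: it needs to be small enough that one microbatch reliably finishes within the session timeout, even when downstream dependencies are slow.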

Annnd we're done!

All in all, we had a fairly seamless migration. The one issue mentioned above affected a topic with very low importance, so we're chalking it up to a learning experience that will prevent issues in the future on our more important topics. Kafka has now been running our pub/sub in production for about four weeks without any significant issues (knock on wood!).