Scaling Kafka at Honeycomb (honeycomb.io)
166 points by i0exception on Nov 30, 2021 | 44 comments



I've never used Kafka but this post is yet another hard earned lesson in log replication systems where storage tiering should be much higher on the hierarchy of needs than horizontal scaling of individual logs/topics/streams. In my experience, by the time you need storage tiering, something awful is already happening.

During network partitions or other scenarios where your disks are filling up quickly it's much easier to reason about how to get your log healthy by aggressively offloading to tiered storage and trimming than it is to re-partition (read: reconfigure), which often requires writes to some consensus-backed metadata store, which is also likely experiencing its own issues at that time.

Another great benefit of storage tiering is that you can externally communicate a shorter data retention period than you actually have in practice, while you really put your recovery and replay systems through their paces to get the confidence you need. Tiered storage can also be a great place to bootstrap new nodes from.


Can you link a good article on storage tiering for Kafka?



> July 2019 we did a rolling restart to convert from self-packaged Kafka 0.10.0

Ouch, that's a lot of fixed bugs you weren't reaping the benefits of >_< What was the reason to stick on 0.10.0 for so long?

After we hit a few bad ones that finally convinced our sysop team to move past 0.11.x, life was far better - especially recovery speed after an unclean shutdown. Used to take two hours, dropped to like 10 minutes.

There was a particular bug I can't find for the life of me that we hit about four times in one year where the replicas would get confused about where the high watermark was, and refuse to fetch from the leader. Although to be fair to Kafka 0.10.x, I think that was a bug introduced in 0.11.0. Which is where I developed my personal philosophy of "never upgrade to an x.x.0 Kafka release if it can be avoided."

> The toil of handling reassigning partitions during broker replacement by hand every time one of the instances was terminated by AWS began to grate upon us

I see you like Cruise Control in the Confluent Platform, did you try it earlier?

> In October 2020, Confluent announced Confluent Platform 6.0 with Tiered Storage support

Tiered storage is slowly coming to FOSS Kafka, hopefully in 3.2.0, thanks to some very nice developers from AirBnB. Credit to the StreamNative team, that FOSS Pulsar has tiered storage (and schema registry) built-in.


> What was the reason to stick on 0.10.0 for so long?

Aforementioned self-packaging, we were mangling the .tar.gz files into .debs, and we had to remember to update the debs and then push them out onto our systems, instead of just using Apt. Thus why Confluent's prebuilt distro helped a lot! But also the team was just _afraid_ of Kafka and didn't want to touch it unnecessarily.

> I see you like Cruise Control in the Confluent Platform, did you try it earlier?

We definitely should have. We tried Datadog's Kafka-kit but found adapting it to use Wavefront or Honeycomb Metrics products was more problematic than it needed to be.

> Tiered storage is slowly coming to FOSS Kafka, hopefully in 3.2.0, thanks to some very nice developers from AirBnB. Credit to the StreamNative team, that FOSS Pulsar has tiered storage built-in.

Yeah, we're glad the rest of the world gets to have it, and also glad we paid upfront for Confluent's enterprise feature version to get us out of the immediate bind we had in 2020. Those EBS/instance storage bills were adding up fast.


The irony of Honeycomb using an open source tool from Datadog is not lost on me =)


This is one of the major benefits of open source. You can share engineering resources, even with your competitors, for things that aren't your bread & butter.


Yup! In fact, we collaborate with Datadog, New Relic, Splunk, Lightstep, et al on OpenTelemetry (https://news.ycombinator.com/item?id=28997275 / opentelemetry.io)


stand on the shoulders of giants!


> we paid upfront for Confluent's enterprise feature version to get us out of the immediate bind we had in 2020.

Definitely agree it's an essential feature for large datasets - in the past I've used Kafka Connect to stream data to S3 for longer term retention, but it's something else to manage, and getting data back into a topic if needed can be a bit painful.


getting to just use the same consistent API without rewriting clients was AMAZING.


This always reminds me of this thing I wrote some time ago: https://gruchalski.com/posts/2021-04-02-kafka-infinite-reten...


Based on our experience with Apache Kafka and alternative streaming systems, Apache Pulsar natively addresses Honeycomb's needs.

- Decoupling of Broker & Storage Layer

- Tiered Storage (SSD, HDD, S3,...)

We use both Kafka and Pulsar in our systems.

- Kafka is used for microservices communication and operational data sharing

- Pulsar is used for streaming large customer data in thousands of topics


In 2016 when we were founded, Pulsar wasn't a thing yet. Today, we'd be very likely to use Pulsar if we were starting from scratch.


Agree. We did a recent migration from Kafka (Confluent Platform) to OSS Pulsar for a near identical use case and couldn't have been happier - we've saved costs, improved uptime, and reduced errors. We found it so much easier to manage Pulsar ourselves with automation that it mostly removed the need to use a managed Kafka service (we've tried solutions from AWS, Confluent, and Aiven).

The only downside with Pulsar (which is constantly improving) was that community support for teething issues was a bit slower than what we saw with Kafka.


Now that Kafka has ZK taken out and tiered storage coming in, these factors are becoming moot points. With tiered storage Kafka has 2 layers and hence the benefits of broker/storage decoupling. By comparison, Pulsar has broker, bookkeeper, S3, and ZK layers to contend with. This is why a second layer was never added to Kafka directly.


What's the story like for migrating from Kafka to Pulsar?


- Kafka couldn't cope with hundreds or a few thousand topics: high CPU load, longer startup times...

- Even an empty Kafka topic consumes 20MB of on-disk storage (that's 20GB for 1000 topics)

- Inevitable coupling of a non-partitioned topic to a particular Kafka broker, limiting storage scale

- Tiered storage was not available previously in Kafka (is it now available in the open source version?)

- Native multi-tenant support with authentication / authorization is not available in Kafka (essential for thousands of customer-based namespaces and topics)


> Kafka couldn't cope with hundreds or a few thousand topics: high CPU load, longer startup times...

It also uses num_partitions*2 open file descriptors per topic, which can quickly surpass the default ulimit on a host. Always remember to raise the ulimit before you near 1000 topics, otherwise Kafka crashes.
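
A rough sketch of that rule of thumb, in case it helps with planning. It only applies the num_partitions*2 estimate quoted above; real broker usage also depends on log segments and client connections, so treat the result as a lower bound. The function names, the 80% headroom, and the example limits (1024 and 65536) are assumptions for illustration, not Kafka settings.

```python
def estimated_open_files(partitions_per_topic, fds_per_partition=2):
    """Estimate open file descriptors using the num_partitions * 2 rule of thumb."""
    return sum(p * fds_per_partition for p in partitions_per_topic)


def fits_under_limit(partitions_per_topic, nofile_limit, headroom=0.8):
    """True if the estimate stays below a chosen fraction of the nofile limit."""
    return estimated_open_files(partitions_per_topic) <= nofile_limit * headroom


if __name__ == "__main__":
    topics = [12] * 1000  # hypothetical: 1000 topics with 12 partitions each
    print(estimated_open_files(topics))     # 24000
    print(fits_under_limit(topics, 1024))   # False
    print(fits_under_limit(topics, 65536))  # True
```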


right -- the question I and others have is, can you convert an existing topic in place or do you have to do the dual-writer/shadow launch, then drain the old one model etc.


We chose both Kafka and Pulsar from the start, so we haven't done any migrations from one to the other.

If we were to move from Kafka to Pulsar (operational data), we would:

- Publish new messages only to Pulsar

- Wait for the Kafka subscriber to process all pending messages

- Start the Pulsar subscriber (steps 2 and 3 can be done in parallel or sequentially, depending on message ordering needs)

We don't need to worry about the Pulsar subscriber offset: existing messages in the Kafka topic won't be copied to the Pulsar topic, which prevents duplicate messages. The whole migration would be completed in 1-2 min (depending on the step 2 wait time).

The approach is chosen so that the subscriber may lag a little, but the publisher is not affected in any way.
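
To make the ordering concrete, here is a toy simulation of the three steps in their sequential form. It uses in-memory queues instead of real Kafka or Pulsar clients, and the `cutover` function is hypothetical; it only illustrates why nothing is duplicated or reordered when step 3 waits for step 2.

```python
from collections import deque


def cutover(kafka_backlog, new_messages):
    """Simulate the cutover; return (source, message) pairs in processing order."""
    kafka_topic = deque(kafka_backlog)
    pulsar_topic = deque()
    processed = []

    # Step 1: publishers write new messages only to Pulsar.
    for msg in new_messages:
        pulsar_topic.append(msg)

    # Step 2: the Kafka subscriber drains what is already in the Kafka topic.
    while kafka_topic:
        processed.append(("kafka", kafka_topic.popleft()))

    # Step 3: the Pulsar subscriber starts from the beginning of the new topic.
    while pulsar_topic:
        processed.append(("pulsar", pulsar_topic.popleft()))

    return processed


if __name__ == "__main__":
    for source, msg in cutover(["k1", "k2"], ["p1", "p2"]):
        print(source, msg)
```

Running steps 2 and 3 in parallel would shorten the subscriber lag but would interleave old and new messages, which is the ordering trade-off mentioned above.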


What's the difference between Kafka, Pulsar, RabbitMQ and Crossbar.io? They all seem to be hubs for PUB/SUB and RPC.


Is Pulsar a good fit for microservices communication?


Yes, definitely. It's just that we used Kafka at the start for microservices communication and later chose Pulsar for large data streaming.

We have it on our roadmap to migrate the former to Pulsar, but it's not the utmost priority right now. It would also simplify infrastructure management by keeping one event streaming stack instead of two.


I'm not a Kafka user but it seems very similar to my experience running Elasticsearch clusters tuned for low latency response time.

There's a complicated mix of requirements for CPU, memory, disk, _and_ network speed, and meeting all of them cost effectively is a real challenge.

Similarly, it's easy to build a cluster that performs well until a single node fails. The increased load per node plus the CPU and network cost of replicating data to a replacement instance can really cause trouble.

Elasticsearch also runs on the JVM so I'm hoping the new EC2 instance types will work for us too. They look to be really great.


Maybe I missed it but are you able to talk about how many messages a second, partition count and average message size?

I run a few hundred Kafka clusters with message counts per second in the tens of millions for some clusters, a few thousand partitions, message sizes around 7kb with gzip compression, and have never needed the amount of CPU and network/disk throughput mentioned. Node counts range between ~10 and 25. Most of my clusters reaching those speeds average at most around 7Gbps of disk throughput per broker.

I have recently started running Kafka in GCP with their balanced SSD disks, which cap out at 1.2Gbps, and I'm not seeing much of a performance impact. It requires a few more brokers to reach the same throughput, but I'm not having any of the performance and scaling issues mentioned in this post.

My brokers are sized a bit differently than mentioned in the post as well, low amount of CPU (maximum 20ish cores) but much more memory around 248GB for my larger clusters. So maybe that has to do with it? Maybe the broker sizes that were chosen are not ideal for the workload?

Maybe I've been lucky in my setups but I would like to know a bit more. Having been running Kafka since the 0.10 days and now on 2.6 for all my clusters this type of performance problem seems a bit puzzling.


1.5M messages/sec, average message size 1kb pre compression, 300 bytes post compression/batching.

the problem was that we were really really disk limited before for keeping the 48 hour window of data, having to keep everything on NVMe or EBS was astoundingly expensive.

but yeah, we run it all off 6 brokers now.
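
For a sense of scale, a back-of-the-envelope sketch of what those numbers imply for the 48 hour window. The replication factor of 3 is an assumption (it matches the "3x on NVMe" remark elsewhere in the thread), and the estimate ignores indexes and other on-disk overhead.

```python
def retained_bytes(msgs_per_sec, bytes_per_msg, retention_hours, replication_factor=1):
    """Bytes kept on disk for a retention window, ignoring index overhead."""
    return msgs_per_sec * bytes_per_msg * retention_hours * 3600 * replication_factor


TB = 10**12

if __name__ == "__main__":
    one_copy = retained_bytes(1_500_000, 300, 48)
    three_copies = retained_bytes(1_500_000, 300, 48, replication_factor=3)
    print(one_copy / TB)      # 77.76
    print(three_copies / TB)  # 233.28
```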


If I understand correctly, there were:

- issues with tail latency and cost when using gp2

- issues with generally bad performance when using st1

- issues with reliability when using gp3 (as an early adopter of aws "GA" product)

- issues with insufficient disk space when using local-attached nvme

- issues with confluent licensing cost

And tiered storage solves all of that.

The thing is, I have not seen kafka struggling with disk performance when running on gcp pd-ssd. Perhaps even pd-balanced would do the trick, as indicated by rmb938's comment. I am glad that you guys finally landed on a boring solution now, but things have been rather boring for years with another cloud provider. Perhaps there is no material impact from the high tail latency when using gp2, and you just needed a better contract negotiator? Surely the tail latency would be worse now whenever data need to be pulled from S3?


Oh, believe me, we have hired Corey Quinn (Duckbill Group). AWS budged on some things, but not on the EBS cost.


Maybe it's worth trying out GCP for a POC cluster? The downside is they don't have any ARM instances, but some back-of-the-napkin math shows that the equivalent of 6x im4gn.2xlarge in AWS, 6x n2-standard-8 with 6x 3750GB pd-balanced SSDs, is roughly the same cost and disk perf, and could be a bit cheaper with their AMD instances instead of Intel. Compared to a gp2 disk it's roughly 4x faster, with the same perf as the local disk on im4gn.2xlarge.

I also have had decent success with getting committed use and reservations on GCP ssd compared to other cloud providers.


It would be for GCP based customers, but we are a telemetry platform and making our (majority) AWS based customers pay 0.08 per GB to egress to us is a non-starter for them :/


Did you look at any of the other solutions such as fq? 300 bytes is a solidly small size. I’m guessing Kafka has gotten faster since this doc was published, but might be worth investigating. https://github.com/neophenix/StateOfTheMQ/blob/master/state_...


Ah, https://github.com/circonus-labs/fq

It was less mature in 2016 when we made the original technology choice (and is still, I'd say, probably not a Boring Technology today). With batching, Kafka is plenty fast for us!


>> Historically, our business requirements have meant keeping a buffer of 24 to 48 hours of data to guard against the risk of a bug in retriever corrupting customer data.

I have used much larger buffers before. Some bugs can lurk around for a while before being noticed. For example, the absence of something is much harder to notice.


Yes -- now that we're just paying the cost to store once on S3 rather than 3x on NVMe, we can plausibly extend the window to 72 hours or longer! before, it was a pragmatic, constraint-driven compromise.


This is an awesome write up. I love reading these warts and all accounts - they're always way more useful than the typical case study "we switched to X and it saved us Y%!" marketing posts.

One point that makes Intel not look quite so bad performance wise - based on my own benchmarking, I'm pretty sure when this article talks about cores they actually mean vCPUs. In AWS on x86, 1 vCPU is 1 hyperthread, so it's kind of half a core. On Graviton 2, 1 vCPU is one full core, the CPUs don't have hyperthreading. This means that you need 10 Intel cores to do the same work as 16 Graviton cores, not 20. This of course doesn't change the cost savings from switching to arm64.
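
The conversion, spelled out as a tiny sketch. The only inputs are the ones stated above: 2 hardware threads per core on the x86 instances, 1 on Graviton 2. The function name is made up for illustration.

```python
def physical_cores(vcpus, threads_per_core):
    """Convert a vCPU count into physical cores."""
    return vcpus / threads_per_core


if __name__ == "__main__":
    print(physical_cores(20, 2))  # Intel: 20 vCPUs -> 10.0 cores
    print(physical_cores(16, 1))  # Graviton 2: 16 vCPUs -> 16.0 cores
```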


Intel looks less bad when you compare one physical core to one physical core, but AWS definitely will not sell you one physical Intel core for the price of one ARM core, they will sell you _half_ of one Intel core for 20% more than an ARM core.


> …RedPanda, a scratch backend rewrite in Rust that is client API compatible

I thought RedPanda was mostly C++?


The RedPanda website says it is written in C++, and their open source github repo agrees.


thanks for the correction! knew an error would slip in there somewhere! apparently they have considered rust though! https://news.ycombinator.com/item?id=25112601

it's fixed now.


I'm still just reading the first couple sections, but I already want to give props for an excellent write-up. You're explaining in (mostly) plain language the purpose of Kafka, your specific application of it, and lots of great background detail and history of your implementation's evolution. It's also clear that whoever wrote this knows their Ops. Thank you!


+1. Very well written.


It's funny how my bugbears from interacting with distributed async messaging (Kafka) are like 90 degrees orthogonal from the things described here:

(1) Occasionally I have wanted to know what the actual traffic is. This takes extra software work (writing some kind of inspector tool to consume a sample message and produce a human-readable version of what's inside it).

(2) Sometimes see problems which happen at the broker-partition or partition-consumer assignment level, and tools for visualizing this are really messy.

For example you have 200 partitions and 198 consumer threads -- this means that because of the pigeonhole principle there are 2 threads which own 2 partitions. Randomly, 1% of your data processing will take twice as long, which can be very hard to visualize.

Or for example 10 of your 200 partitions that are managed by broker B which, for some reason, is mishandling messages -- so 5% of messages are being handled poorly, which may not emerge in your metrics the way you expect. Viewing slowness by partition, by owning consumer, and by managing broker can be tricky to remember to do when operating the system.
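
A small sketch of the 200-partition, 198-thread case. It assumes a plain round-robin assignment, which is a simplification; real Kafka assignors differ in detail, but the pigeonhole effect is the same. Here 2 of the 198 threads (about 1%) carry a double load, and between them they own 4 of the 200 partitions.

```python
from collections import Counter


def round_robin_assignment(num_partitions, num_consumers):
    """Count how many partitions each consumer owns under round-robin."""
    owners = Counter()
    for partition in range(num_partitions):
        owners[partition % num_consumers] += 1
    return owners


def doubled_consumers(owners):
    """Consumers that ended up owning two partitions."""
    return sorted(c for c, n in owners.items() if n == 2)


if __name__ == "__main__":
    owners = round_robin_assignment(200, 198)
    doubled = doubled_consumers(owners)
    print(len(doubled))                     # 2
    print(sum(owners[c] for c in doubled))  # 4
```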

(3) Provisioning capacity to have n-k availability (so that availability-zone-wide outages as well as deployments/upgrades don't hurt processing) can be tricky.

How many messages per second are arriving? What is the mean processing time per message? How many processors (partitions) do you need to keep up? How much slack do you have -- how much excess capacity is there above the typical message arrival rate, so that you can model how long it will take the cluster to process a backlog after an outage?
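
Those questions reduce to a couple of formulas, sketched here. The model assumes each consumer handles one message at a time and that arrival and processing rates are steady, which real traffic rarely is; the function names and example numbers are hypothetical.

```python
import math


def consumers_needed(arrival_rate_per_sec, mean_processing_ms):
    """Minimum number of one-at-a-time consumers needed to keep up."""
    return math.ceil(arrival_rate_per_sec * mean_processing_ms / 1000)


def backlog_drain_seconds(outage_secs, arrival_rate, capacity_rate):
    """Time to clear the backlog built up during an outage.

    Only the slack (capacity minus arrivals) works on the backlog,
    because new messages keep arriving while it drains.
    """
    if capacity_rate <= arrival_rate:
        return math.inf  # no slack: the backlog never shrinks
    backlog = outage_secs * arrival_rate
    return backlog / (capacity_rate - arrival_rate)


if __name__ == "__main__":
    print(consumers_needed(1000, 50))              # 50
    print(backlog_drain_seconds(600, 1000, 1250))  # 2400.0
```

With 25% slack, a 10 minute outage takes 40 minutes to recover from, which is the kind of number worth knowing before the outage.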

(4) Remembering how to scale up when the message arrival rate increases feels like a bit of a chore. You have to increase the number of partitions to be able to handle the new messages ... but then you also have to remember to scale up every consumer. You did remember that, right? And you know you can't ever reduce the partition count, right?

(5) I often end up wondering what the processing latency is. You can approximate this by dividing the total backlog of unprocessed messages for an entire consumer group (unit "messages") by the message arrival rate (unit "arriving messages per second") which gets you something that has dimensionality of "seconds" and represents a quasi processing lag. But the lag is often different per-partition.
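
A sketch of that approximation, with made-up numbers showing how a group-wide figure can hide one slow partition. The function names and the per-partition breakdown are illustrative assumptions.

```python
def quasi_lag_seconds(backlog_messages, arrival_rate_per_sec):
    """Backlog (messages) divided by arrival rate (messages/sec) gives seconds."""
    return backlog_messages / arrival_rate_per_sec


def per_partition_lag(backlogs, rates):
    """Apply the same approximation to each partition separately."""
    return {p: quasi_lag_seconds(backlogs[p], rates[p]) for p in backlogs}


if __name__ == "__main__":
    backlogs = {0: 1000, 1: 1000, 2: 28000}  # hypothetical unprocessed messages
    rates = {0: 100, 1: 100, 2: 100}         # hypothetical messages/sec
    print(quasi_lag_seconds(sum(backlogs.values()), sum(rates.values())))  # 100.0
    print(per_partition_lag(backlogs, rates))  # {0: 10.0, 1: 10.0, 2: 280.0}
```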

Better is to teach the application-level consumer library to emit a metric about how long processing took and how old the message it evaluated was - then, as long as processing is still happening, you can measure delays. Both are messy metrics that need you to get and remain hands-on with the data to understand them.
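
One possible shape for that, as a minimal sketch. It is not tied to any Kafka client library: `emit` stands in for whatever metrics client you use, `message_timestamp` is assumed to be the producer-side timestamp in seconds, and the metric names are invented.

```python
import time


def process_with_metrics(message_timestamp, handler, emit, now=time.time):
    """Run handler() and emit message age and processing duration in seconds."""
    started = now()
    handler()
    finished = now()
    # How old the message was when we picked it up.
    emit("message_age_seconds", started - message_timestamp)
    # How long our own processing took.
    emit("processing_seconds", finished - started)


if __name__ == "__main__":
    def emit(name, value):
        print(f"{name}={value:.3f}")

    process_with_metrics(time.time() - 5, lambda: time.sleep(0.1), emit)
```

Passing the clock in as `now` keeps the wrapper easy to test with a fake clock.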

(6) There's a complicated relationship between "processing time per message" and effective capacity -- any application changes which make a Kafka consumer slower may not have immediate effects on end-to-end lag SLIs, but they may increase the amount of parallelism needed to handle peak traffic, and this can be tough to reason about.

(7) Planning only ex post facto for processing outages is always a pain. More than once I've heard teams say "this outage would be a lot shorter if we had built in a way to process newly arrived messages first", and I've even seen folks jury-rig LIFO by e.g. changing the topic name for newly arrived messages and using the previous queue as a backlog only.
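
The jury-rigged version can be sketched with two queues: one for newly arrived messages and one for the old backlog, with the consumer always preferring the fresh one. This is an in-memory illustration only; with Kafka the two queues would be two topics, and `next_message` is a hypothetical helper.

```python
from collections import deque


def next_message(fresh, backlog):
    """Prefer newly arrived messages; fall back to the backlog when idle."""
    if fresh:
        return fresh.popleft()
    if backlog:
        return backlog.popleft()
    return None


if __name__ == "__main__":
    fresh = deque(["new1", "new2"])
    backlog = deque(["old1", "old2"])
    while (msg := next_message(fresh, backlog)) is not None:
        print(msg)
```

Strictly speaking this is "new traffic first, backlog when idle" rather than true LIFO, and it gives up ordering between the two queues.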

I wonder if my clusters have just been too small? The stuff here ("how can we afford to operate this at scale?") is super interesting, just not the reliability stuff I've worried about day-to-day.


Yup, we went with fewer, higher throughput partitions for this exact reason of wanting precise control over partitioning. It's been a blessing and a curse.



