A huge difference between Monarch and other TSDBs that isn’t outlined in this overview is that a storage primitive for schema values is a histogram. Most TSDBs (maybe all besides Circonus) try to create histograms at query time using counter primitives.
All of those query time histogram aggregations are making pretty subtle trade offs that make analysis fraught.
In my experience, Monarch storing histograms and being unable to rebucket on the fly is a big problem. A percentile line on a histogram will be incredibly misleading, because it's trying to figure out what the p50 of a bunch of buckets is. You'll see monitoring artifacts like large jumps and artificial plateaus as a result of how requests fall into buckets. The bucketer on the default RPC latency metric might not be well tuned for your service. I've seen countless experienced oncallers tripped up by this, because "my graphs are lying to me" is not their first thought.
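To make the plateau-and-jump artifact concrete, here is a minimal sketch of how a percentile is typically estimated from fixed buckets. The bucket edges (powers of four, in ms) and the linear interpolation inside a bucket are assumptions chosen for illustration; this is not Monarch's actual code.

```python
from bisect import bisect_left

def to_counts(samples, bounds):
    """Count samples per bucket. bounds[i] is the inclusive upper edge of
    bucket i; anything above the last edge is clamped into the last bucket."""
    counts = [0] * len(bounds)
    for s in samples:
        idx = min(bisect_left(bounds, s), len(bounds) - 1)
        counts[idx] += 1
    return counts

def bucket_quantile(q, bounds, counts):
    """Estimate the q-quantile by interpolating linearly inside the bucket
    that contains the target rank. The histogram does not know where the
    samples really sit inside a bucket, so this is a guess."""
    rank = q * sum(counts)
    seen = 0
    for i, c in enumerate(counts):
        if c > 0 and seen + c >= rank:
            lower = bounds[i - 1] if i > 0 else 0.0
            return lower + (bounds[i] - lower) * (rank - seen) / c
        seen += c
    return bounds[-1]

# Illustrative edges only: 1, 4, 16, 64, 256 ms.
bounds = [1, 4, 16, 64, 256]

p50_at_20ms = bucket_quantile(0.5, bounds, to_counts([20] * 100, bounds))
p50_at_60ms = bucket_quantile(0.5, bounds, to_counts([60] * 100, bounds))
p50_at_70ms = bucket_quantile(0.5, bounds, to_counts([70] * 100, bounds))
```

With these edges, a service whose real latency drifts from 20ms to 60ms shows a flat p50 line (both estimates land mid-bucket), and a further move to 70ms crosses the 64ms edge and the estimate jumps to the middle of the next, much wider bucket.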
Wow, this is a fantastic solution to some questions I've had rattling around in my head for years about the optimal bucket choices to minimize error given a particular set of buckets.
Do I read right that circllhist has a pretty big number of bin sizes and is not configurable (except that they're sparse so may be small on disk)?
I've found myself using high-cardinality Prometheus metrics where I can only afford 10-15 distinct histogram buckets. So I end up
(1) plugging in my live system data from normal operations and from outage periods into various numeric algorithms that propose optimal bucket boundaries. These algorithms tell me that I could get great accuracy if I chose thousands of buckets, which, thanks for rubbing it in about my space problems :(. Then I write some more code to collapse those into 15 buckets while minimizing error at various places (like p50, p95, p99, p999 under normal operations and under irregular operations).
(2) making sure I have an explicit bucket boundary at any target that represents a business objective (if my service promises no more than 1% of requests will take >2500ms, setting a bucket boundary at 2500ms gives me perfectly precise info about whether p99 falls above/below 2500ms)
(3) forgetting to tune this and leaving a bunch of bad defaults in place which often lead to people saying "well, our graph shows a big spike up to 10000ms but that's just because we forgot to tune our histogram bucket boundaries before the outage, actually we have to refer to logs to see the timeouts at 50 sec"
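Point (2) can be sketched like this. The bucket edges, the sample data, and the function names are all made up for illustration; the only idea taken from the comment is that a bucket edge placed exactly on the objective turns the SLO check into an exact count rather than an interpolation.

```python
from bisect import bisect_left

def to_counts(samples, bounds):
    """bounds are inclusive upper edges; the last edge should be inf so
    every sample has a bucket."""
    counts = [0] * len(bounds)
    for s in samples:
        counts[bisect_left(bounds, s)] += 1
    return counts

def fraction_over(threshold, bounds, counts):
    """Exact fraction of samples above threshold. Only answerable exactly
    when threshold is itself a bucket edge, so refuse otherwise."""
    if threshold not in bounds:
        raise ValueError("threshold is not a bucket edge; result would be a guess")
    idx = bounds.index(threshold)
    return sum(counts[idx + 1:]) / sum(counts)

# Hypothetical edges in ms, with an explicit edge at the 2500ms objective.
bounds = [100, 500, 1000, 2500, 5000, float("inf")]
# Hypothetical traffic: 1000 requests, 8 of them slower than 2500ms.
samples = [50] * 980 + [2400] * 12 + [2600] * 5 + [9000] * 3

counts = to_counts(samples, bounds)
slow_fraction = fraction_over(2500, bounds, counts)
slo_met = slow_fraction <= 0.01
```

Asking the same question at 2000ms raises, because no edge sits there and any answer would depend on how you assume requests are spread inside the 1000-2500ms bucket.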
Prometheus is in the process of developing a similar automatic log-linear histogram bucket type. The goal is to make it as cheap as a 10-15 bucket histogram, but not require pre-defined buckets.
Correct me if I’m wrong but I thought the sparse histogram effort in Prometheus will still use single metric line counters for its storage abstraction?
I think it’s a great addition to the product and will excitedly use it but it’s a pretty big difference from a histogram centric db like circonus’ or a schema’d one like monarch.
I’ve used these log-linear histograms in a few pieces of code. There is some configurability in the abstract - you could choose a different logarithm base.
In practice none of the implementations seem to provide that. Within each set of buckets for a given log base you have reasonable precision at that magnitude. If your metric is oscillating around 1e6 you shouldn’t care much about the variance at 1e2, and with this scheme you don’t have to tune anything to provide for that.
There are a large number of subtle tradeoffs around the bucketing scheme (log vs. log-linear, base), the memory layout (sparse, dense, chunked), and the amount of configurability in the histogram space (circllhist, DDSketch, HDRHistogram, ...). A good overview is this discussion here:
As for the circllhist: There are no knobs to turn. It uses base 10 and two decimal digits of precision. In the last 8 years I have not seen a single use-case in the operational domain where this was not appropriate.
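For readers who have not seen a log-linear layout, here is a rough sketch of what "base 10 and two decimal digits of precision" implies for bin boundaries. This is my own illustration using Python's decimal module, not the circllhist implementation: the function names are hypothetical, and zero, negative values, and the real storage encoding are not modeled.

```python
from decimal import Decimal

def log_linear_bin(value):
    """Map a positive number to (mantissa, exponent) such that
    mantissa * 10**exponent <= value < (mantissa + 1) * 10**exponent,
    with mantissa in 10..99 (two significant decimal digits)."""
    d = Decimal(str(value))
    if d <= 0:
        raise ValueError("this sketch only handles positive values")
    exponent = d.adjusted() - 1           # position of the second significant digit
    mantissa = int(d.scaleb(-exponent))   # truncate to two significant digits
    return mantissa, exponent

def bin_edges(mantissa, exponent):
    """Lower and upper edge of a bin, as exact decimals."""
    return Decimal(mantissa).scaleb(exponent), Decimal(mantissa + 1).scaleb(exponent)

bin_a = log_linear_bin(1234)      # falls in [1200, 1300)
bin_b = log_linear_bin(0.0057)    # falls in [0.0057, 0.0058)
bin_c = log_linear_bin(1000000)   # falls in [1000000, 1100000)
```

Each bin is at most 10% wider than its lower edge regardless of magnitude, which is why a metric hovering around 1e6 and another around 1e-3 get comparable relative precision without anyone choosing boundaries up front.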
Right - it’s why I said “in the abstract.” You could do it and still have a log-linear format. Base 10 works great for real-world distributions.
Thanks for making and ossing circllhist. I’ve been close by to the whole “what’s the OTel histogram going to be” discussion for the last many months and learned a lot from that. That discussion is what introduced me to circllhist and got me using them.
I definitely remember a lot of time spent tweaking histogram buckets for performance vs. accuracy. The default bucketing algorithm at the time was powers of 4 or something very unusual like that.
It's because powers of four was great for the original application of statistics on high traffic services where the primary thing the user was interested in was deviations from the norm, and with a high traffic system the signal for what the norm is would be very strong.
I tried applying it to a service with much lower traffic and found the bucketing to be extremely fussy.
My personal opinion is that they should have done a log linear histogram which solves the problems you mention (with other trade offs) but to me the big news was making the db flexible enough to have that data type.
Leaving the world of single numeric type for each datum will influence the next generation of open source metrics db.
Yeah it was a tough tradeoff for the default case, because the team didn't want to use too much memory in everyone's binary since the RPC metrics were on by default. This is easily changeable by the user if necessary, though.
The histograms are useful on their own (visualized as a heatmap). If percentile lines are necessary (and they often aren't), I prefer to overlay them on top of the heatmap so it is clear where the bucket edges are.
I've been pretty happy with datadog's distribution type [1] that uses their own approximate histogram data structure [2]. I haven't evaluated their error bounds deeply in production yet, but I haven't had to tune any bucketing. The linked paper [3] claims a fixed percentage of relative error per percentile.
That is a very different tradeoff, though. A DDSketch is absolutely gigantic compared to a power-of-four binned distribution that could be implemented as a vector of integers. A practical DDSketch will be 5KiB+. And when they say DDSketch merges are "fast" they are comparing to other sketches that take microseconds or more to merge, not to CDF vectors that can be merged literally in nanoseconds.
Prometheus is adding sparse histograms. There are a couple of online talks about it already, but one of the maintainers, Ganesh, is giving a talk at KubeCon on it next week if anyone is attending and curious about it.
Wavefront also has histogram ingestion (I wrote the original implementation, I'm sure it is much better now). Hugely important if you ask me but honestly I don't think that many customers use it.
Granted, it looks like Monarch supports a more cleanly-defined schema for distributions, whereas Prometheus just relies on you to define the buckets yourself and follow the convention of using a "le" label to expose them. But the underlying representation (an empirical CDF) seems to be the same, and so the accuracy tradeoffs should also be the same.
Much different. When you are reporting histograms you can combine them and see the true p50 or whatever across all the individual systems reporting the metric.
Can you elaborate a bit? You can do the same in Prometheus by summing the bucket counts. Not sure what you mean by “true p50” either. With buckets it’s always an approximation based on the bucket widths.
Ah, I misunderstood what you meant. If you are reporting static buckets I get how that is better than what folks typically do but how do you know the buckets a priori? Others back their histograms with things like https://github.com/tdunning/t-digest. It is pretty powerful as the buckets are dynamic based on the data and histograms can be added together.
Yes. This. Also, displaying histograms in heatmap format can allow you to intuit the behavior of layered distributed systems, caches, etc. Relatedly, exemplars allowed tying related data to histogram buckets. For example, RPC traces could be tied to the latency bucket & time at which they complete, giving a natural means to tie metrics monitoring and tracing, so you can "go to the trace with the problem". This is described in the paper as well.
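As a rough illustration of the exemplar idea, here is a toy histogram that remembers one trace ID per bucket. The class and field names are hypothetical and the "keep the most recent trace" policy is an assumption; the paper describes the concept, not this data structure.

```python
from bisect import bisect_left
from dataclasses import dataclass
from typing import Optional

@dataclass
class Bucket:
    count: int = 0
    exemplar_trace_id: Optional[str] = None  # hypothetical field

class ExemplarHistogram:
    def __init__(self, bounds):
        self.bounds = list(bounds)
        # One extra bucket catches values above the last edge.
        self.buckets = [Bucket() for _ in range(len(self.bounds) + 1)]

    def _bucket_for(self, latency_ms):
        return self.buckets[bisect_left(self.bounds, latency_ms)]

    def observe(self, latency_ms, trace_id=None):
        bucket = self._bucket_for(latency_ms)
        bucket.count += 1
        if trace_id is not None:
            bucket.exemplar_trace_id = trace_id  # assumption: keep the latest

    def exemplar_near(self, latency_ms):
        """Return a trace that landed in the same bucket as latency_ms."""
        return self._bucket_for(latency_ms).exemplar_trace_id

hist = ExemplarHistogram([10, 100, 1000])
hist.observe(5, trace_id="trace-fast")
hist.observe(850, trace_id="trace-slow")
suspect = hist.exemplar_near(900)
```

Clicking a hot cell in a heatmap then amounts to looking up the exemplar stored for that bucket and time window and opening the corresponding trace.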
A lot of Google projects seem to rely on other Google projects. In this case Monarch relies on spanner.
I guess it's nice to publish at least the conceptual design so that others can implement it in “rest of the world” case. Working with OSS can be painful, slow and time consuming so this seems like a reasonable middle ground (although selfishly I do wish all of this was source available).
Atomic clocks aren't that exotic, and a GPS disciplined ovenized quartz oscillator will do just fine outside of a disruption. The hard part is getting the right sampling semantics, requiring end to end error analysis.
MQL is an improvement over the internal language, IMO. There are some missing features around literal tables, but otherwise the language is more consistent and flexible.
I don't quite get the benefit of a pull model by default either. A pull model by default means that it's not easy for a library to publish its metrics. For instance, every god damn application is expected to implement a `/metrics` endpoint for a freaking agent to publish the application's metrics to Prometheus. With Monarch, any library or application can simply publish metrics to Monarch's API. Similarly in Netflix, publishing to its Atlas system is totally transparent to library authors, with the help of their metric library.
Sometimes I feel many open source systems do not give a shit about productivity.
The general theory is that if a push-based system is getting overloaded you drop metric submissions but if a pull-based system is overloaded it will query less frequently and you’ll just get less resolution.
Thanks. It's a valid thought on such trade-off. I do think we can still favor productivity without losing resolution, though, for the following two reasons:
1. A pull-based system can pull less often when the system is overloaded, which means a pulled service needs to keep historical stats. For instance, the endpoint `/metrics` needs to keep previous gauge values or the accumulated counters. That said, a push-based metric library can keep history too. Indeed, it is exactly what the Micrometer library does.
2. Don't let the metric system overload. This sounds like hyperbole, but it is what companies do in practice: the telemetry system is so foundational and critical to an internet company that it should always run smoothly.
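The "accumulated counters" point can be sketched with a few lines. The numbers are invented; the idea is only that when the exposed value is cumulative, skipped scrapes cost resolution but not events.

```python
def rates(scrapes):
    """scrapes: list of (timestamp_s, cumulative_count), oldest first.
    Returns (window_end_s, events_per_second) per consecutive pair."""
    return [(t1, (c1 - c0) / (t1 - t0))
            for (t0, c0), (t1, c1) in zip(scrapes, scrapes[1:])]

# Hypothetical counter: 600, then 1800, then 600 events per minute.
every_scrape = [(0, 0), (60, 600), (120, 2400), (180, 3000)]
# Same counter, but an overloaded scraper skipped t=60 and t=120.
sparse_scrape = [(0, 0), (180, 3000)]

full = rates(every_scrape)     # three 60s windows, the burst is visible
coarse = rates(sparse_scrape)  # one 180s window, the burst is averaged out

events_seen_full = sum(rate * 60 for _, rate in full)
events_seen_coarse = coarse[0][1] * 180
```

Both views account for all 3000 events; the sparse one just cannot show the one-minute burst. A client that keeps the same cumulative state and pushes it gets the same property, which is the point made in (1).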
Interesting that Google replaced a pull based metric system similar to Prometheus with a push based system... I thought one of the selling points of Prometheus and the pull based dance was how scalable it was?
It's sort of a pull/push hybrid. The client connects to the collection system and is told how often to send each metric (or group of them) back over that same connection. You configure per target/metric collection policy centrally.
So, much like a Zabbix agent, with both active (push) & passive (pull) capabilities.
We're diving into OTEL, and the registration / discovery challenges don't seem to have any kind of best-practice consensus out there. We're looking at NodeRED (telegraf agent can query from same at startup) but it brings its own challenges.
I haven't read the full paper, but do you know if the push model was revisited mostly for auto-registration / discovery, or performance bottlenecks at the server, or some other concern?
Typically for us, once we've got the hard part - an entity registered - we're happy with pull only. A no-response from a prod end-point is an automatic critical. I guess at their scale there's more nuance around one or more agents being non-responsive.
EDIT: Oh, there's not much in the paper on the subject, as it happens. And yes, it's vanilla discovery woes.
"Push-based data collection improves system robustness while simplifying system architecture. Early versions of Monarch discovered monitored entities and “pulled” monitoring data by querying the monitored entity.
"This required setting up discovery services and proxies, complicating system architecture and negatively impacting overall scalability. Push-based collection, where entities simply send their data to Monarch, eliminates these dependencies."
If you can put these into the metrics system as metrics themselves, then join against them, that works. (IIRC handling all the various things that could be happening in a data center like drains, planned & unplanned downtime, etc., it's more complex than a simple join.)
It was originally push but I think they went back to sort of a scheduled pull mode after a few years. There was a very in-depth review doc written about this internally, which maybe will get published some day.
Pull collection eventually became a real scaling bottleneck for Monarch.
The way the "pull" collection worked was that there was an external process-discovery mechanism, which the leaf used to find the entities it was monitoring. The leaf backend processes would connect to an endpoint on each monitored entity that the collection library listened on, and those entities' collection libraries would stream the metric measurements according to the schedules that the leaves sent.
Several problems.
First, the leaf-side data structures and TCP connections become very expensive. If that leaf process is connecting to many many many thousands of monitored entities, TCP buffers aren't free, keep-alives aren't free, and a host of other data structures. Eventually this became an...interesting...fraction of the CPU and RAM on these leaf processes.
Second, this implies a service discovery mechanism so that the leaves can find the entities to monitor. This was a combination of code in Monarch and an external discovery service. This was a constant source of headaches and outages, as the appearance and disappearance of entities is really spiky and unpredictable. Any burp in operation of the discovery service could cause a monitoring outage as well. Relatedly, the technical "powers that be" decided that the particular discovery service, of which Monarch was the largest user, wasn't really something that was suitable for the infrastructure at scale. This decision was made largely independently of Monarch, but required Monarch to move off.
Third, Monarch does replication, up to three ways. In the pull-based system, it wasn't possible to guarantee that the measurement that each replica sees is the same measurement with the same microsecond timestamp. This was a huge data quality issue that made the distributed queries much harder to make correct and performant. Also, the clients had to pay both in persistent TCP connections on their side and in RAM, state machines, etc., for this replication, as a connection would be made from each backend leaf process holding a replica for a given client.
Fourth, persistent TCP connections and load balancers don't really play well together.
Fifth, not everyone wants to accept incoming connections in their binary.
Sixth, if the leaf process doesn't need to know the collection policies for all the clients, those policies don't have to be distributed and updated to all of them. At scale this matters for both machine resources and reliability. This can be made a separate service, pushed to the "edge", etc.
Switching from a persistent connection to the clients pushing measurements in distinct RPCs as they were recorded eventually solved all of these problems. It was a very intricate transition that took a long time. A lot of people worked very hard on this, and should be very proud of their work. I hope some of them jump in to the discussion! (At very least they'll add things I missed/didn't remember... ;^)
Thanks George, and apologies for missing this comment on my first scan through this page. Your Youtube talk is lined up for viewing later today.
We're using prom + cortex/mimir. With ~30-60k hosts + at least that figure again for other endpoints (k8s, snmp, etc), so we can get away with semi-manual sharding (os, geo, env, etc). We're happy with 1m polling, which is still maybe 50 packets per query, but no persistent conns held open to agents.
I'm guessing your TCP issues were exacerbated by a much higher polling frequency requirement? You come back to persistent connections a lot, so this sounds like a bespoke agent, and/or the problem was not (mostly) a connection establish/tear-down performance issue?
The external discovery service - I assume an in-house, and now long disappeared and not well publicly described system? ;) We're looking at NodeRED to fill that gap, so it also becomes a critical component, but the absence only bites at agent restart. We're pondering wrapping some code around the agents to be smarter about dealing with a non-responsive config service. (During a major incident we have to assume a lot of things will be absent and/or restarting.)
The concerns around incoming conns to their apps, it sounds like those same teams you were dealing with ended up having to instrument their code with something from you anyway -- was it the DoS risk they were concerned about?
It was more that they would rather send Monarch an RPC than be connected to. Not everyone wants e.g. an HTTP server in their process. For example maybe they are security sensitive, or have a limited memory envelope, or other reasons.
Again, a highly envious feature of a large organisation with almost exclusively bespoke applications that can port & integrate custom libraries directly into applications. Us little people have to contend with mostly black box applications, or occasionally native instrumented with at best a prom-alike endpoint.
Amusingly in the pre-web 1990's, at Telstra (Australia telco) we also developed & implemented a custom performance monitoring library that was integrated into in-house applications.
The rest of us can only hope Opentelemetry becomes more widely adopted. They have put in a lot of effort in decoupling the application instrumentation from the monitoring solution, to allow more rich instrumentation than just a prom-alike endpoint.
Re-reading my comment, it may have been worded badly.
The phrase 'you aren't Google' is true for 99.9% of us. We all get to fix the problems in front of us, was my point. And at that scale you've got unique problems, but also an architecture, imperative, and most importantly an ethos that lets you solve them in this fashion.
I was more reflecting on the (actually pretty fine) tools available to SREs caring for off-the-shelf OS's and products, and a little on the whole 'we keep coming full circle' thing.
The system design choice was to make data visible to queries as soon as possible after being pushed to Monarch, to satisfy alerting guarantees.
Thus there was no queue like a pubsub or Kafka in front of Monarch.
At scale this required a "smoothness of flow". What I mean by this is that at the scale the system was operating the extent and shape of the latency long tail began to matter. If there are many many many many thousands of RPCs flowing through servers in the intermediate routing layers, any pauses at that layer or at the leaf layer below that extended even a few seconds could cause queueing problems at the routing layer that could impact flows to leaf instances that were not delayed. This would impact quality of collection.
Even something as simple as updating a range map table at the routing layer had to be done carefully to avoid contention during the update so as to not disturb the flow, which in practice could mean updating two copies of the data structure in a manner analogous to a blue green deployment.
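One way to picture the "two copies" approach is the sketch below: build the new table off to the side, then publish it with a single reference swap so the routing path never waits on a lock. The class names and the string-keyed ranges are hypothetical, and relying on a plain attribute assignment being atomic is a CPython-specific assumption; this is not Monarch's routing code.

```python
import bisect
import threading

class RangeMap:
    """Immutable snapshot mapping sorted range-start keys to leaf names."""
    def __init__(self, starts, leaves):
        self._starts = list(starts)
        self._leaves = list(leaves)

    def lookup(self, key):
        i = bisect.bisect_right(self._starts, key) - 1
        if i < 0:
            raise KeyError(key)
        return self._leaves[i]

class Router:
    def __init__(self, range_map):
        self._active = range_map
        self._writer_lock = threading.Lock()  # serializes updaters only

    def route(self, key):
        # Readers grab the current snapshot and never take the lock.
        return self._active.lookup(key)

    def update(self, starts, leaves):
        with self._writer_lock:
            standby = RangeMap(starts, leaves)  # built while traffic still flows
            self._active = standby              # single reference swap

router = Router(RangeMap(["a", "m"], ["leaf-1", "leaf-2"]))
before = router.route("kiwi")
router.update(["a", "h"], ["leaf-1", "leaf-3"])
after = router.route("kiwi")
```

In-flight lookups finish against the old snapshot and new lookups see the new one, so an update costs memory for the second copy rather than a pause in the flow.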
At the leaf backends this required decoupling--to make eventual--many ancillary data structure updates for data structures that were consulted in the ingest path, and to eventually get to the point where queries and ingest shared no locks.
https://prometheus.io/docs/introduction/faq/#why-do-you-pull... lists a few reasons and also ends with a note that it probably doesn't matter in the end. Personally, for smaller deployments, I like it because it gives you an easy overview of what should be running; otherwise you need to maintain this list elsewhere anyway. Though today, with all the auto-scaling around, the concept of "up" is getting fuzzier.
On top of that there is also less risk that a herd of misbehaving clients will DoS the monitoring system, usually at the moments when you need such a system the most. This of course wouldn't be a problem with a more scalable solution that separates ingestion from querying, like Monarch.
This. Any new large query or aggregation in the Borgmon/Prometheus model requires re-solving federation, and continuing maintenance of runtime configuration. That might technically be scalable in that you could do it but you have to maintain it, and pay the labor cost. It's not practical over a certain size or system complexity. It's also friction. You can only do the queries you can afford to set up.
That's why Google spent all that money to build Monarch. At the end of the day Monarch is vastly cheaper in person time and resources than manually-configured Borgmon/Prometheus. And there is much less friction in trying new queries, etc.
100% Prometheus compatible, proven to scale to 1B active series.
It's not about comparisons, every tool has its own place and feature set that may be right for you depending on what you're doing. But if you've reached the end of the road with Prometheus due to scale and you need massive scale and perfect compatibility... Then Mimir stands out.
You can set up dist eval similar to how it was done in borgmon but you gotta do it manually (or maybe write an operator to automate). One of Monarch's core ideas is to do that behind the scenes for you.
This still requires the query to fit on a single node at some point unless you're doing multi-level aggregation. Monarch does that. Being able to push expensive, highly-parallelizable aspects of the query down into the leaves is a massive benefit.
Yep, in theory it’s great if you can afford a team of highly qualified engs to run it like google and dealing with huge metrics. In practice I found it was not hard to crash a monarch mixer (at least some years ago) + most orgs won’t be able to afford nor take advantage of it
I don't understand how the properties you're describing imply that Prometheus isn't scalable.
High Availability always requires duplication of effort. Scaling queries always requires sharding and aggregation at some level.
I've deployed stock Prometheus at global scale, O(100k) targets, with great success. You have to understand and buy into Prometheus' architectural model, of course.
And I've seen a system that did that. We had to predict what global aggregates we would need. Looking at a metric required finding the right instance to connect to if it wasn't in the bubbleup. Picking the right expressions to avoid double counts was hard. Want to do something fancy? No luck because of the lack of distributed querying.
The ways in which you can scale Prometheus: you can scale anything.
It does not, itself, have highly scalable properties built in.
It does not do sharding, it does not do proxying, it does not do batching, it does not do anything that would allow it to run multiple servers and query over multiple servers.
Look. I’m not saying that it doesn’t work; but when I read about borgmon and Prometheus: I understood the design goal was intentionally not to solve these hard problems, and instead use them as primitive time series systems that can be deployed with a small footprint basically everywhere (and individually queried).
I submit to you, I could also have an influxdb in every server and get the same “scalability”.
Difference being that I can actually run a huge influxdb cluster with a dataset that exceeds the capabilities of a single machine.
It seems like you're asserting a very specific definition of scalability that excludes Prometheus' scalability model. Scalability is an abstract property of a system that can be achieved in many different ways. It doesn't require any specific model of sharding, batching, query replication, etc. Do you not agree?
Ignorance? I'm a core Prometheus contributor and a 20+ year distsys veteran. Prometheus is not a database, and scalability is not well-defined. These are not controversial statements.
It is extremely unbecoming to lie about who you are on this forum.
Scalability is defined differently depending on context; in this context (a monitoring/time series solution) it is defined as being able to hold a dataset larger than a single machine that scales horizontally.
Downsampling the data or transforming it does not meet that criteria, since that’s no longer the original data.
The way Prometheus “scales” today is a bolt-on passthrough with federation. It’s not designed for it at all, and means that your query will use other nodes as data sources until it runs out of ram evaluating the query. Or not.
The most common method of “scaling” Prometheus is making a tree; you can do that with anything (so it is not inherent to the technology, thus not a defining characteristic, if everything can be defined the same way then nothing can be- the term ceases to have meaning: https://valyala.medium.com/measuring-vertical-scalability-fo...)
I’ll tell you how influx scales: your data is horizontally sharded across nodes, queries are conducted cross shards.
That’s what scalability of the database layer is.
Not fetching data from other nodes and putting it together yourself.
Rehydrating from many datasets is not the storage system scaling: the collector layer doing the hydration is the thing that is scaling.
If you sold me a solution that used Prometheus underneath but was distributed across all nodes, perhaps we could talk.
I should add (and extremely frustratedly): if you’re not lying and you’re a core Prometheus maintainer, you should know this. I’m deeply embarrassed to be telling you this.
> in this context (a monitoring/time series solution) it is defined as being able to hold a dataset larger than a single machine that scales horizontally.
This just isn't true :shrug: Horizontal scaling is one of many strategies.
I think the disconnect is that Prometheus helps a user to shard things, but it's not automatic. Other time series databases and monitoring solutions automatically distribute and query across servers. It's like Postgres vs NewSQL (aka FoundationDB, Spanner, etc.).
While Prometheus supports sharding queries when a user sets it up, my understanding is that this has to be done manually, which is definitely less convenient. This is better than a hypothetical system that doesn't allow this at all, but still not the same as something that handles scaling magically.
Prometheus supports sharding queries the way a screwdriver supports turning multiple screws at once. You can design a system yourself that includes the screwdriver, which will turn all the screws, but there's nothing inherent to the screwdriver that helps you with this. If "scalability" just means "you can use it to design something new from scratch that scales" then the term is pretty meaningless.
You pretty quickly exceed what one instance can handle for memory, cpu or both. At that point you don't have any real good options to scale while maintaining a flat namespace (you need to partition).
This means each new query over a certain size becomes a federation problem, so the friction for trying new things becomes very high above the scale of a single instance.
Well you obviously don't issue metrics queries over arbitrarily large datasets, right? The Prometheus architecture reflects this invariant. You constrain queries against both time and domain boundaries.
Monarch can support both ad-hoc and periodic, standing queries of arbitrarily large size, and has the means to spread the computation out over many intermediate mixer and leaf nodes. It does query push-down so that the "expensive" parts of aggregations, joins, etc., can be done in massively parallel fashion at the leaf level.
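A toy version of the push-down idea, for a global mean: each leaf reduces its own points to a small partial, mixers merge partials, and only the root finishes the computation. The zone layout, data, and function names are invented for illustration, and real query push-down covers far more than means.

```python
def leaf_partial(points):
    """Runs on each leaf: reduce raw points to a (sum, count) pair."""
    return (sum(points), len(points))

def merge_partials(partials):
    """Runs on a mixer or at the root: combine (sum, count) pairs."""
    partials = list(partials)
    return (sum(s for s, _ in partials), sum(n for _, n in partials))

def finish_mean(partial):
    total, count = partial
    return total / count

# Hypothetical layout: zone -> list of leaves -> raw points held by that leaf.
zones = {
    "zone-a": [[10, 20, 30], [40]],
    "zone-b": [[100, 200]],
}

# Each zone's mixer only ever sees one small pair per leaf.
mixer_out = [merge_partials(leaf_partial(points) for points in leaves)
             for leaves in zones.values()]
global_mean = finish_mean(merge_partials(mixer_out))

# For contrast: averaging the per-leaf means gives a different, wrong answer.
leaf_means = [sum(p) / len(p) for leaves in zones.values() for p in leaves]
mean_of_means = sum(leaf_means) / len(leaf_means)
```

The root receives two pairs instead of six points here; at real scale that is the difference between shipping a handful of partials per leaf and shipping every raw series to a single node. Carrying the count alongside the sum is what keeps the result exact across levels.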
It scales so well that many aggregations are set up and computed for every service across the whole company (CPU, memory usage, error rates, etc.). For basic monitoring you can run a new service in production and go and look at a basic dashboard for it without doing anything else to set up monitoring.
That's a choice Prometheus has made, not an invariant. Many scalable systems support arbitrarily large queries as they themselves scale. Prometheus pretty much prevents creating arbitrarily large datasets to begin with, so the point is kind of moot.
Requiring query authors to understand the arrangement of their aggregation layer seems like a reasonable idea but is in fact quite ridiculous.
To be fair, monarch has scaling limits that require you to be aware of how aggregation is done as well. It's amazing what it can do, but depending on how much data you have you might need to design your schema/collection with monarch's architecture in mind.
The pain of it is we’re all jumping on Prometheus (borgmon) without considering why Monarch exists. Monarch doesn’t have a good analogue outside of Google.
Maybe some weird mix of timescale DB backed by cockroachdb with a Prometheus push gateway.
They should open source it like they did Kubernetes. Otherwise the world will (continue to) converge on the Prometheus model and Google will be left with this weird system that may be technically better but is unfamiliar to incoming engineers
The key trade-off is low dependency, with everything up to and including alert notification delivery pushed down to the region. For alerting queries to happen there is little infrastructure Monarch depends upon to keep running beyond networking and being scheduled on the machines.
If you think about Bigtable, a key observation that the Monarch team made very early on is that, if you can support good materialized views (implemented as periodic standing queries) written back to the memtable, and the memtable can hold the whole data set needed to drive alerting, this can work even if much of Google's infrastructure is having problems. It also allows Monarch to monitor systems like Bigtable, Colossus, etc., as it doesn't use them as serving dependencies for alerting or recent dashboard data.
It's a question of optimizing for graceful degradation in the presence of failure of the infrastructure around the monitoring system. The times the system will experience its heaviest and most unpredictable load will be when everyone's trying to figure out why their service isn't working.
What? Planet scale? Well. You can literally issue a query that fans out to every continent on Earth, and returns the result right to your dashboard. Not exaggerating. ;^) (OK maybe not Antarctica but I'm not sure...)