Thanks for the write-up! In my current application I have a few scenarios that are a bit different from yours but still require processing aggregated data in order:
1. Reading from various files, where each file has lines with a unique identifier I can use to process in order. I open all the files, read the first line of each, and build a min-heap from them. Then I process by repeatedly grabbing the lowest entry from the min-heap; after consuming a line, I read the next one from the same file and push it back in (each min-heap cell holds the open file descriptor for its file).
2. Aggregating across goroutines that service data generators with different latencies and throughputs. Each generator gets its own goroutine that interfaces with it, and I treat these as "producers". Using a global atomic integer I can cheaply assign a unique, increasing index to incoming messages, which can then be serviced with a min-heap as above. There are some considerations around dropping messages that are too old, so for some cases an alternative is to key the min-heap on received time and only process entries up to time.Now() minus some buffering window. That gives stragglers more time to settle before anything is dropped (trading total latency for it).
3. Similar to the above, but here ingestion throughput matters more, and repeated processing happens in order with no requirement that every message has been processed each time - just that whatever is processed comes out in order (this is the backing for a log viewer). In this case I slab-allocate and dump what I receive with no ordering concerns, but I also keep a btree of the indexes, which I iterate over when it's time to process. I originally had this buffering like (2) to guarantee mostly ordered insertions into the slabs themselves (which I then simply iterated over), but if a goroutine stalled, shifting the slab items over when the old items finally arrived became very expensive and could spiral badly.
Wow, that’s some seriously sophisticated stuff - it’s not that often you see a heap used in typical production code (outside of libraries)!
Your first example definitely gives me merge-sort vibes - a really clean way to keep things ordered across multiple sources. The second and third scenarios are a bit beyond what I’ve tackled so far, but super interesting to read about.
This also reminded me of a WIP PR I drafted for rill (probably too niche, so I’m not sure I’ll ever merge it). It implements a channel buffer that behaves like a heap - basically a fixed-size priority queue where re-prioritization only happens for items that pile up due to backpressure. Maybe some of that code could be useful for your future use cases: https://github.com/destel/rill/pull/50
Hah, not sure about “production” - I am currently between jobs and taking advantage of that to work on a docker/k8s/file TUI log viewer.
I am using those techniques respectively for loading backups (I store each container’s log as a separate file inside one big zip, which allows concurrent reading without unpacking) and for servicing the various log-producing goroutines (which use the docker/k8s APIs, plus fsnotify for files), since I allow creating “views” of containers that consequently need to aggregate in order. The TUI itself, built with tview, runs in a separate goroutine at a configurable fps, reading from these buffers.
I have things mostly working - the latest significant refactoring was introducing the btree-based reading after noticing the “fix the order” stalls were too bad - and I am planning to do a Show HN when I’m finished. It has been a lot of fun going back to solo-dev greenfield stuff after many years of architecture-focused work.
I definitely love golang, but despite being careful and having access to great tools like rr and dlv in GoLand, it can sometimes get difficult to debug deadlocks, especially when mixing channels and locks. I have found this library quite useful for chasing them down in some scenarios: https://github.com/sasha-s/go-deadlock