PgQueuer is a minimalist, high-performance job queue library for Python, leveraging the robustness of PostgreSQL. Designed for simplicity and efficiency, PgQueuer uses PostgreSQL's LISTEN/NOTIFY to manage job queues effortlessly.
Does the celery SQLAlchemy broker support PostgreSQL's LISTEN/NOTIFY features?
Similar support in SQLite would simplify testing applications built with celery.
How to add table event messages to SQLite so that the SQLite broker has the same features as AMQP? Could a vtable facade send messages on table events?
We use LISTEN/NOTIFY extensively and it is great. The thing it lacks most for us is guaranteed single recipient: all subscribers get all notifications, which leads to problems in determining who should act on the message in our case.
That's exactly what we do, but taking a lock takes 1 RTT to the database, which means about 100ms. It limits the number of events receivers can handle. If you have too many events, receivers will spend most of their time just trying to take a lock.
Off the top of my head, you could attach a UUID or sequence number to the emitted event. Then, based on the UUID or sequence, you can let one or the other event consumer pick it up.
Ex. you have two consumers: if the sequence number is odd, A picks it up; if it's even, B does.
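Something like this, purely as a sketch (assumes the NOTIFY payload carries a sequence number):

    # Hypothetical: consumer i of n acts only on events whose sequence
    # number falls in its slot; all other consumers ignore them.
    def should_handle(seq: int, consumer_index: int, num_consumers: int) -> bool:
        return seq % num_consumers == consumer_index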
Great observation; I phrased that a little wrongly. What we ideally want is guaranteed delivery to one random free worker. The UUID strategy is better than locking, but it could mean that if one worker gets a longer job, all the other jobs assigned to that worker are delayed even though another worker is free.
How does LISTEN/NOTIFY compare to using select for update skip locked? I thought listen/notify can lose queue items when the process crashes? Is that true? Do you need to code for those cases in some manner?
LISTEN/NOTIFY and SELECT FOR UPDATE SKIP LOCKED serve different purposes in PgQueuer. LISTEN/NOTIFY notifies consumers about changes in the queue table, prompting them to check for new jobs. This method doesn’t inherently lose messages if a process crashes, because it simply triggers a check rather than transmitting data. The actual job handling and locking are managed by SELECT FOR UPDATE SKIP LOCKED, which safely processes each job even when multiple workers are involved.
In some systems, those are, effectively, the same. A consumer listens, and the signal is the message. If the consumer process crashes, the message returns to the queue and gets processed when the consumer comes back online.
If the signal and messaging are separated, as in Postgres, where LISTEN/NOTIFY is the signal, and the skip locked query is the message pull, the consumer process would need to do some combination of polling and listening.
In the consumer, that could essentially be a loop that’s just doing the skip locked query on startup, then dropping into a LISTEN query only once there are no messages present in the queue. Then the LISTEN/NOTIFY is just signaling to tell the consumer to check for new messages.
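A rough sketch of that loop with asyncpg (table, column and channel names are made up, and this is not how PgQueuer is implemented internally):

    import asyncio
    import contextlib

    import asyncpg

    CLAIM_ONE = """
        UPDATE jobs
           SET status = 'in_progress'
         WHERE id = (SELECT id FROM jobs
                      WHERE status = 'queued'
                      ORDER BY id
                      LIMIT 1
                      FOR UPDATE SKIP LOCKED)
        RETURNING id, payload
    """

    async def handle(job: asyncpg.Record) -> None:
        ...  # your actual job logic

    async def worker(dsn: str) -> None:
        conn = await asyncpg.connect(dsn)
        wakeup = asyncio.Event()
        # The notification carries no work; it only says "go check the table".
        await conn.add_listener("jobs_channel", lambda *_: wakeup.set())
        while True:
            job = await conn.fetchrow(CLAIM_ONE)
            if job is not None:
                await handle(job)
                continue  # drain the queue before sleeping
            wakeup.clear()
            # Timeout as a safety net: a lost notification only delays a job, never loses it.
            with contextlib.suppress(asyncio.TimeoutError):
                await asyncio.wait_for(wakeup.wait(), timeout=30)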
I think the usage of LISTEN/NOTIFY is just a mechanism to save you from querying the database every X seconds looking for new tasks (polling). Polling has some drawbacks: if the interval is too small, you are making too many queries that usually don't return any new tasks, and if it's too big, you may start processing a task long after it was submitted.
This way, it just notifies you that new tasks are ready so you can query the database.
I am going to go the other direction on this... to anyone reading this, please consider using a backend-generic queueing system for your Python project.
Why? Mainly because those systems offer good affordances for testing and running locally in an operationally simple way. They also tend to have decent default answers for various futzy questions around disconnects at various parts of the workflow.
We all know Celery is a buggy pain in the butt, but rolling your own job queue likely ends up with you writing a similarly buggy pain in the butt. We've already done "Celery but simpler"; it's stuff like Dramatiq!
If you have backend-specific needs, you won't listen to this advice. But think deeply how important your needs are. Computers are fast, and you can deal with a lot of events with most systems.
Meanwhile if you use a backend-generic system... well you could write a backend using PgQueuer!
> those systems offer good affordances for testing and running locally in an operationally simple way
Define "operationally simple", most if not all of them need persistent anyway, on top of the queue itself. This eliminates the queue and uses a persistent you likely already have.
Well for example, lots of queueing libraries have an "eager task" runtime option. What does that do? Instead of putting work into a backend queue, it just immediately runs the task in-process. You don't need any processing queue!
How many times have you shipped some background task change, only to realize half your test suite doesn't do anything with background tasks, and you're not testing your business logic to its logical conclusion? Eager task execution catches bugs earlier on, and is close enough to reality for the things that matter, while removing the need for, say, multi-process coordination in most tests.
And you can still test things the "real way" if you need to!
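For instance, Celery's eager mode is a one-line config switch (Dramatiq's StubBroker gives a similar effect); a minimal sketch:

    from celery import Celery

    app = Celery("tasks")
    # In the test settings: run tasks synchronously, in-process, no broker needed.
    app.conf.task_always_eager = True
    app.conf.task_eager_propagates = True  # re-raise task exceptions in the caller

    @app.task
    def send_welcome_email(user_id: int) -> None:
        ...  # the real business logic under test

    # .delay() now executes immediately instead of enqueueing, so the test
    # suite exercises the task body to its logical conclusion.
    send_welcome_email.delay(42)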
And to your other point: you can use Dramatiq with Postgres, for example[0]. I've written custom backends that just use pg for these libs, it's usually straightforward because the broker classes tend to abstract the gnarly things.
Some message queue brokers that traditionally implement their own backends can also use PostgreSQL (and other RDBMSs) for persistence. This is a reasonable option if you a) want to consolidate persistence backends or b) want a mature, battle-proven broker and client stack.
Ah that's unfortunate, I had a pretty OK time with Dramatiq (coming off of Celery especially). But I imagine this is dependent on your scale.
I think most people reading this site are working on relatively small systems (that still need background tasks!), and the fixed costs of background tasks can be reasonable. But I could be totally off base.
PgQueuer seems pretty similar to Hatchet - both use Postgres, require their own worker processes, both support async. Hatchet seems to be a lot more powerful though: cron, DAG workflows, retries, timeouts, and a UI dashboard.
https://github.com/tktech/chancy is a very early-stage pet project to work around specific issues I keep running into with Celery that has all of this, including a simple plugin-extendable UI with DAG workflow visualization.
Already done. See: PostgREST. Want to use PostgreSQL (or most other RDBMSs) as the backend for an actively developed, multiprotocol, multiplatform, open source, battle proven message broker that also provides a messaging REST API of its own? Use ActiveMQ (either flavor) and configure a JDBC backend. Done.
Hangfire with a few plugins can be an absolute godsend in 99% of situations I've encountered. The one downside is that the documentation is very, very lacking, and you have to Google a lot until you get to a good place. Despite that, I've used, use, and will continue to use Hangfire, as it's a great tool!
But can you make a decent job queue with anything that stores data? Not easily. E.g. you need atomicity if multiple consumers can take jobs, and I think you need CAS for that, not just any storage will do, right?
You probably need ACI and also D if you want your jobs to persist.
I haven’t used Graphile Worker since I’m not familiar with JavaScript. PgQueuer is tailored for Python with PostgreSQL environments. I’d be interested to hear about Graphile Worker’s features and how they might inspire improvements to PgQueuer.
I’ve been thinking about the potential for PostgreSQL-backed job queue libraries to share a common schema. For instance, I’m a big fan of Oban in Elixir: https://github.com/sorentwo/oban
Given that there are many Sidekiq-compatible libraries across various languages, it might be beneficial to have a similar approach for PostgreSQL-based job queues. This could allow for job processing in different languages while maintaining compatibility.
Alternatively, we could consider developing a core job queue library in Rust, with language-specific bindings. This would provide a robust, cross-language solution while leveraging the performance and safety benefits of Rust.
If you want a generic queue that can be consumed in any runtime, you can just build it directly into postgres via extensions like https://github.com/tembo-io/pgmq.
Also, pgmq can run as a TLE (trusted language extension), so you can install it into cloud hosted Postgres solutions like Supabase. We're using pgmq, and it's solid so far.
What I like about https://github.com/tembo-io/pgmq is that it can be used with any programming language and does not require any background workers or running a binary in addition to postgres.
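Rough shape of using it from Python with psycopg (queue name and payload are just examples, and the pgmq function signatures are from memory, so double-check against the docs):

    import psycopg

    with psycopg.connect("postgresql://localhost/app") as conn:
        conn.execute("SELECT pgmq.create('email_jobs')")
        # Producer: the payload is plain JSON; no extra worker daemon involved.
        conn.execute("SELECT pgmq.send('email_jobs', %s::jsonb)", ['{"user_id": 42}'])
        # Consumer: read one message and hide it for 30 seconds while working on it.
        row = conn.execute(
            "SELECT msg_id, message FROM pgmq.read('email_jobs', 30, 1)"
        ).fetchone()
        if row is not None:
            msg_id, message = row
            conn.execute("SELECT pgmq.delete('email_jobs', %s::bigint)", [msg_id])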
It is based on SQLite, but it’s written in a modular way. It would be easy to add Postgres as a backend (in fact, it might “just work” if I switch the ORM connection string.)
Not today. It's a work in progress! There are several iterations that I'm working on:
1. Primary with secondaries as replicas (replication for availability)
2. Sharding across multiple nodes (sharding for horizontal scaling)
3. Sharding with replication
However, those aren't ready yet. The easiest way to implement this would probably be to use Postgres as the backing storage for the queue, which means relying on Postgres' multiple node support. Then the queue server itself could also scale up and down independently.
Working on the docs! I'd love your feedback - what makes them seem unfinished? (What would you want to see that would make them feel more complete?)
It does not implement immediate notification of new submissions because the SQS protocol doesn't have a "push" mechanism, only pull.
The software, however, could support this for a different queue protocol, because SQLite is just used as a disk store for queue items. The Go code itself still processes each message before writing to disk. Since that code is "aware" of incoming messages, it could implement an immediate notification mechanism if there were a protocol that supported it.
River ( https://riverqueue.com ) is a Postgres background job engine written in Go, which also has insert-only clients in other languages; currently we have these for Ruby and Python.
This is the exact use case I'm running into right now. I've been looking at BullMQ some; it has good TypeScript support and is working towards a 1.0 for Python. But I haven't tried it out in a production stack yet.
We have been using bullmq in production for just over a year. It is a piece of technology that our team doesn't have to think about, which is pretty much all I could ask for.
We did end up adding some additional generics, which allow us to get strong typing between producers and consumers. That, I think, has been a key piece of making it easy to use and avoiding dumb mistakes.
Qless "solves" this problem (in redis) by having all core logic written as lua and executed in redis.
You could take a similar approach for pg: define a series of procedures that provide all the required functionality, and then language bindings are all just thin wrappers (to handle language native stuff) around calls to execute a given procedure with the correct arguments.
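A sketch of what that could look like for Postgres (the function, and the jobs table with id/payload/status/claimed_by columns, are made up for illustration and not taken from any of the libraries mentioned):

    import asyncpg

    DEQUEUE_FN = """
    CREATE OR REPLACE FUNCTION dequeue_job(p_worker text)
    RETURNS TABLE (job_id bigint, job_payload jsonb)
    LANGUAGE sql AS $$
        UPDATE jobs
           SET status = 'in_progress', claimed_by = p_worker
         WHERE jobs.id = (SELECT id FROM jobs
                           WHERE status = 'queued'
                           ORDER BY id
                           LIMIT 1
                           FOR UPDATE SKIP LOCKED)
        RETURNING jobs.id, jobs.payload
    $$;
    """

    # Each language binding is then just a thin call-through:
    async def dequeue(conn: asyncpg.Connection, worker: str) -> asyncpg.Record | None:
        return await conn.fetchrow("SELECT * FROM dequeue_job($1)", worker)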
A common schema is one nice thing, but IMHO the win of these DB-backed queues is being able to do things, including enqueueing background jobs, in a single transaction. E.g. create user, enqueue welcome email: both get done, or neither. With Redis-based queues this is usually not a thing; if you fail to do one, it's left half done, leading to more code, etc.
p.s. I maintain a ruby equivalent called QueueClassic
This looks like a great task queue. I'm a massive proponent of "Postgres is all you need" [0] and am doubling down on it with my project, which takes it to the extreme.
What I would love is a Postgres task queue that does multi-step pipelines, with fan out and accumulation. In my view a structured relational database is a particularly good backend for that as it inherently can model the structure. Is that something you have considered exploring?
The one thing with LISTEN/NOTIFY that I find lacking is the max payload size of 8k; it somewhat limits what you can do before you have to start saving stuff to tables. What I would really like is a streaming table, with a schema and all the rich type support... maybe one day.
Update-related throughput and index problems are only a problem if you update tables. You can use an append-only structure to mitigate some of that: insert new entries with the updated statuses instead. You gain the benefit of history as well. You can even coax the index into holding non-key values for speed with INCLUDE in CREATE INDEX.
You can then delete the older rows as needed.
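Concretely, something along these lines (schema just for illustration):

    import psycopg

    with psycopg.connect("postgresql://localhost/app") as conn:
        conn.execute("""
            CREATE TABLE job_status_events (
                id         bigint GENERATED ALWAYS AS IDENTITY PRIMARY KEY,
                job_id     bigint      NOT NULL,
                status     text        NOT NULL,
                created_at timestamptz NOT NULL DEFAULT now()
            )
        """)
        # INCLUDE stores the status in the index leaf pages, so "latest status
        # for this job" can be answered with an index-only scan.
        conn.execute("""
            CREATE INDEX job_status_events_latest_idx
                ON job_status_events (job_id, id DESC)
                INCLUDE (status)
        """)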
Query planner issues are a general problem in Postgres and are not unique to this use case. Not sure what O(1) means in this context. I am not sure PG has ever promised constant-time access to anything; indeed, with an index, it would never be asymptotically upper-bounded by constant time at all.
By the time you need append-only job statuses it's better to move to a dedicated queue. Append-only statuses help but they also make the polling query a lot more expensive.
Deleting older rows is a nightmare at scale. It leaves holes in the earlier parts of the table and nerfs half the advantage of using append-only in the first place. You end up paying 8kb page IO costs for a single job.
Dedicated queues have constant time operations for enqueue and dequeue which don't blow up at random times.
It's far from trivial. Autoanalyze doesn't work on partitioned tables, only on the partitions themselves. Partitioning a busy job queue table is a nightmare in itself.
Indeed, that's my experience too. We used partitions like others mentioned below, but Postgres had issues with moving rows across tables atomically, and we had to implement our own complex queries to work around it. Job expiration was dynamic, so we had to use background cleaning. The bigger problem was the planner not being able to pick up sudden changes in volume, so we had to use a cron job to run ANALYZE. Managing retries with backoffs, etc. At some point we stopped fighting it and just moved to SQS; we have had zero problems since and need no maintenance, and it's still free, so we saved storage cost, time, and developer effort on ongoing maintenance.
We still use Postgres for simple queues, but those don't really require a library as they're usually quite simple; with some advisory locks we can handle unlocking crashed jobs fairly well too.
asyncpg clears out any listeners you have set up once a connection is returned to the pool. This will lead to "missed" events. I guess it's something of the same story with psycopg?
If events are missed, any jobs will be picked up by the next event or by a timer that checks every 30 seconds or so (the interval can be set by the dev).
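One way around the pool behaviour (a sketch, not necessarily what PgQueuer does) is to keep the LISTEN on one dedicated connection that never goes back to the pool, and use the pool only for claiming and processing jobs:

    import asyncpg

    async def setup(dsn: str) -> tuple[asyncpg.Pool, asyncpg.Connection]:
        pool = await asyncpg.create_pool(dsn)  # used for queries only
        listener = await asyncpg.connect(dsn)  # long-lived, never pooled,
        await listener.add_listener(           # so its listener isn't cleared
            "jobs_channel", lambda *_: print("queue changed"))
        return pool, listener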
Regarding multi-step pipelines and fan-out capabilities: It's a great suggestion, and while PgQueuer doesn't currently support this, it's something I'm considering for future updates.
As for the LISTEN/NOTIFY payload limit, PgQueuer uses these signals just to indicate changes in the queue table, avoiding the size constraint by not transmitting substantial data through this channel.
I won't call it "hate", but I've run into quite a few situations where the Postgres version caused a lot of pain.
- When it wasn't as easy as a dedicated solution: where installing and managing a focused service is overall easier than shoehorning it into PG.
- When it didn't perform anywhere close to a dedicated solution: overhead from the guarantees that PG makes (ACID and all that) when you don't need them, or where the relational architecture isn't suited for the type of data, e.g. hierarchical, time-series, etc.
- When it's not as feature-complete as a dedicated service: for example, I am quite sure one can build (parts of) an Active Directory or a Kafka bus entirely in PG. But it will lack features that you'll likely need in the future; they are built into these dedicated solutions because they are often needed, after all.
Just to share an anecdote, we've been able to get to market faster than ever before by just using Postgres (Supabase) for basically everything. We're leveraging RLS, and it's saved us from building an API (just using PostgREST). We've knocked months off our project timeline by doing this.
Is multi-step (fan out, etc) typically something a queue or message bus would handle?
I've always handled this with an orchestrator solution (think Airflow and similar).
Or is this a matter of use case? Like for a real-time scenario where you need a series of things to happen (user registration, etc) maybe a queue handling this makes sense? Whereas with longer running tasks (ETL pipelines, etc) the orchestrator is beneficial?
Glancing at it briefly, I like the Workflows feature. I'm a long time Sidekiq user (Ruby), and while you can construct workflows pretty easily (especially using nested batches and callbacks in the Pro version), there really isn't a dedicated UI for visualizing them.
I've been using River for some low-volume stuff. I love that I can add a job to the queue in the same DB transaction that handles the synchronous changes.
Procrastinate also uses PostgreSQL's LISTEN/NOTIFY (but can optionally be turned off and use polling). It also supports many features (and more are planned), like sync and async jobs (it uses asyncio under the hood), periodic tasks, retries, task locks, priorities, job cancellation/aborting, Django integration (optional).
DISCLAIMER: I am a co-maintainer of Procrastinate.
I’m using Procrastinate in several projects. Would definitely like to see a comparison.
What I personally love about Procrastinate is async, locks, delayed and scheduled jobs, queue specific workers (allowing to factor the backend in various ways). All this with a simple codebase and schema.
I really like the emergence of simple queuing tools for robust database management systems. Keep things simple and remove infrastructure complexity. Definitely a +1 from me!
For handling straightforward asynchronous tasks like sending opt-in emails, we've developed a similar library at All Quiet for C# and MongoDB: https://allquiet.app/open-source/mongo-queueing
In this context:
LISTEN/NOTIFY in PostgreSQL is comparable to MongoDB's change streams.
SELECT FOR UPDATE SKIP LOCKED in PostgreSQL can be likened to MongoDB's atomic read/update operations.
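For example, the atomic claim on the MongoDB side is roughly (pymongo, with made-up collection and field names):

    from pymongo import MongoClient, ReturnDocument

    jobs = MongoClient()["app"]["jobs"]

    # Atomically claim one queued job; loosely the MongoDB counterpart of
    # claiming a single row with SELECT ... FOR UPDATE SKIP LOCKED.
    job = jobs.find_one_and_update(
        {"status": "queued"},
        {"$set": {"status": "in_progress"}},
        sort=[("enqueuedAt", 1)],
        return_document=ReturnDocument.AFTER,
    )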
I think for me the problem with every single new PG queue is that it seems like everyone and their mother thinks they need to reinvent this specific wheel, and the flavor of the day doesn't often bring much new to the space. Probably because it's:
1. Pretty easy to understand and grok the problem space
2. Scratching the programmer itch of wanting something super generic that you can reuse all over the place
3. Doable with a modest effort over a reasonable scope of time
4. Built on rock solid internals (Postgres) with specific guarantees that you can lean on
Probably could easily find more by searching, I only spent about 5 minutes looking and grabbing the first ones I found.
I'm all for doing this kind of thing as an academic exercise, because it's a great way to learn about this problem space. But at this point if you're reinventing the Postgres job queue wheel and sharing it to this technical audience you need to probably also include why your wheel is particularly interesting if you want to grab my attention.
At low-medium scale, this will be fine. Even at higher scale, so long as you monitor autovacuum performance on the queue table.
At some point it may become practical to bring a dedicated queue system into the stack, sure, but this can massively simplify things when you don’t need or want the additional complexity.
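With the queue in the same Postgres database, the enqueue can ride along in the data-changing transaction; roughly (function names are just placeholders, mirroring the pseudocode below):

    begin;
    insert_row();
    insert_job_row_for_elasticsearch(); // goes into the queue table, same transaction
    commit; // Either both the row and the job exist, or neither does.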
And it's guaranteed that both the row and job for Elasticsearch update are inserted.
If you use a dedicated queue system, then this becomes a lot trickier:
    begin;
    insert_row();
    schedule_job_for_elasticsearch();
    commit; // Can fail, and then we have an ES job but no SQL row.

    begin;
    insert_row();
    commit;
    schedule_job_for_elasticsearch(); // Can fail, and then we have a SQL row and no job.
There are of course also situations where this doesn't apply, but this "insert row(s) in SQL and then queue job to do more with that" is a fairly common use case for queues, and in those cases this is a great choice.
Transactional Outbox solves this. You use a table like in the first example, but instead of actually doing the Elasticsearch update, the outbox table is piped into the dedicated queue.
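A sketch of that pattern (psycopg; the table names and the publish() callable are assumptions):

    import json
    import psycopg

    def create_user(conn: psycopg.Connection, email: str) -> None:
        # Domain row and outbox row are written in one transaction:
        # either both exist, or neither does.
        with conn.transaction():
            conn.execute("INSERT INTO users (email) VALUES (%s)", [email])
            conn.execute(
                "INSERT INTO outbox (topic, payload) VALUES (%s, %s::jsonb)",
                ["user_created", json.dumps({"email": email})],
            )

    def relay_outbox(conn: psycopg.Connection, publish) -> None:
        # A separate relay process pipes the outbox into the dedicated queue
        # (publish() is whatever client you use: SQS, Rabbit, ...), then deletes
        # the forwarded rows. Worst case is a duplicate publish, never a lost one.
        with conn.transaction():
            rows = conn.execute(
                "SELECT id, topic, payload FROM outbox "
                "ORDER BY id LIMIT 100 FOR UPDATE SKIP LOCKED"
            ).fetchall()
            for row_id, topic, payload in rows:
                publish(topic, payload)
                conn.execute("DELETE FROM outbox WHERE id = %s", [row_id])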
Most of these two-phase problems can be solved by having separate queue consumers.
And as far as I can tell, this is only a perk when your two actions are "mutate the collocated database" and "do X". For all other situations this seems like a downgrade.
I agree, there is no need for FAANG-level infrastructure. IMO, in most cases the simplicity/performance tradeoff is worth it for small/medium workloads. There is also statistics tooling that helps you monitor throughput and failure rates (aggregated on a per-second basis).
Instead of SQS, I recently created a basic abstraction on PG that mimics the SQS APIs. The intention was to use it during development and simply switch to SQS later.
We never did. The production code still uses the PG-based queue (which has been improved since), and PG just works perfectly fine. We might still need to go with a dedicated queue service at some point, but it has been perfectly fine so far.
I mean I love postgres like the next guy. And I like simple solutions as long as they work.
I just wonder if this is truly simpler than using a Redis or RabbitMQ queue if you need queues. If you're already using a cloud provider, SQS is quite trivial as well.
I guess if you already have Postgres and don't want to use the cloud provider's solution, you can use this to avoid hosting another piece of infra.
DB-based gives you the ability to query against your queues, if your use case needs it. Other options tend to dispose of the state once the job is finished.
I've done this with MySQL. Never do one at a time if you have more than 30 jobs per minute; it won't scale. Instead, have the job dispatcher reserve 100 at a time and then fire that batch off to a subprocess, which subsequently fires off a process for each job. A three-layer approach makes it much easier to build out multi-server. Or, if you don't want the headache, just use SQS, which is pretty much free under 1 million jobs.
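The reserve-a-batch step could look roughly like this in Postgres flavour (names assumed; MySQL 8 has SKIP LOCKED too):

    from concurrent.futures import ProcessPoolExecutor

    import psycopg

    RESERVE_BATCH = """
        UPDATE jobs
           SET status = 'reserved'
         WHERE id IN (SELECT id FROM jobs
                       WHERE status = 'queued'
                       ORDER BY id
                       LIMIT 100
                       FOR UPDATE SKIP LOCKED)
        RETURNING id, payload
    """

    def run_job(job_id: int, payload: str) -> None:
        ...  # per-job work, executed in a child process

    def dispatch_once(conn: psycopg.Connection, pool: ProcessPoolExecutor) -> int:
        # Grab up to 100 jobs in a single round trip, then fan them out to
        # worker processes instead of claiming them one by one.
        jobs = conn.execute(RESERVE_BATCH).fetchall()
        conn.commit()
        for job_id, payload in jobs:
            pool.submit(run_job, job_id, payload)
        return len(jobs)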
> Similar support in SQLite would simplify testing applications built with celery.
> How to add table event messages to SQLite so that the SQLite broker has the same features as AMQP? Could a vtable facade send messages on table events?
Are there sqlite Triggers?
Celery > Backends and Brokers: https://docs.celeryq.dev/en/stable/getting-started/backends-...
/? sqlalchemy listen notify: https://www.google.com/search?q=sqlalchemy+listen+notify :
asyncpg.Connection.add_listener
sqlalchemy.event.listen, @listen_for
psycopg2 conn.poll(), while connection.notifies
psycopg2 > docs > advanced > Asynchronous notifications: https://www.psycopg.org/docs/advanced.html#asynchronous-noti...
PgQueuer.db, PgQueuer.listeners.add_listener; asyncpg add_listener: https://github.com/janbjorge/PgQueuer/blob/main/src/PgQueuer...
asyncpg/tests/test_listeners.py: https://github.com/MagicStack/asyncpg/blob/master/tests/test...
/? sqlite LISTEN NOTIFY: https://www.google.com/search?q=sqlite+listen+notify
sqlite3 update_hook: https://www.sqlite.org/c3ref/update_hook.html