Task Queues

Task queues manage background work that must be executed outside the usual HTTP request-response cycle.

Why are task queues necessary?

Tasks are handled asynchronously either because they are not initiated by an HTTP request or because they are long-running jobs that would dramatically reduce the performance of an HTTP response.

For example, a web application could poll the GitHub API every 10 minutes to collect the names of the top 100 starred repositories. A task queue would handle invoking code to call the GitHub API, process the results and store them in a persistent database for later use.

Another example is when a database query would take too long during the HTTP request-response cycle. The query could be performed in the background on a fixed interval with the results stored in the database. When an HTTP request comes in that needs those results a query would simply fetch the precalculated result instead of re-executing the longer query. This precalculation scenario is a form of caching enabled by task queues.
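To make the precalculation pattern concrete, here is a minimal sketch using Celery with its beat scheduler. The broker URL, the implied tasks.py module name, and the two helper functions are illustrative assumptions, not code from any project mentioned on this page.

    # tasks.py -- hypothetical sketch of precomputing a slow result on a schedule.
    from celery import Celery

    app = Celery("tasks", broker="redis://localhost:6379/0")  # assumes a local Redis broker

    # Ask celery beat to run the refresh task every 10 minutes.
    app.conf.beat_schedule = {
        "refresh-top-repos": {"task": "tasks.refresh_top_repositories", "schedule": 600.0},
    }

    def fetch_top_starred_repositories():
        # Placeholder for the slow work, e.g. calling the GitHub API.
        return ["repo-a", "repo-b"]

    def store_precomputed_result(rows):
        # Placeholder for persisting the result to your database or cache.
        print("storing", rows)

    @app.task
    def refresh_top_repositories():
        store_precomputed_result(fetch_top_starred_repositories())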

Other types of jobs for task queues include

spreading out large numbers of independent database inserts over time instead of inserting everything at once

aggregating collected data values on a fixed interval, such as every 15 minutes

scheduling periodic jobs such as batch processes

Task queue projects

The de facto standard Python task queue is Celery. The other task queue projects that arise tend to come from the perspective that Celery is overly complicated for simple use cases. My recommendation is to put the effort into Celery's reasonable learning curve, as it is worth the time it takes to understand how to use the project.

The Celery distributed task queue is the most commonly used Python library for handling asynchronous tasks and scheduling.

The RQ (Redis Queue) is a simple Python library for queueing jobs and processing them in the background with workers. RQ is backed by Redis and is designed to have a low barrier to entry.
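A minimal sketch of what enqueueing work with RQ can look like, assuming Redis is running locally and a worker has been started with the rq worker command; the word-counting function is just an illustration:

    import requests
    from redis import Redis
    from rq import Queue

    def count_words_at_url(url):
        response = requests.get(url, timeout=10)
        return len(response.text.split())

    if __name__ == "__main__":
        queue = Queue(connection=Redis())      # default queue on a local Redis server
        job = queue.enqueue(count_words_at_url, "https://www.fullstackpython.com/")
        print("enqueued job:", job.id)         # an `rq worker` process executes it later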

Taskmaster is a lightweight simple distributed queue for handling large volumes of one-off tasks.

Huey is a Redis-based task queue that aims to provide a simple, yet flexible framework for executing tasks. Huey supports task scheduling, crontab-like repeating tasks, result storage and automatic retry in the event of failure.

Kuyruk is a simple and easy-to-use task queue system built on top of RabbitMQ. Although its feature set is small, new features can be added through extensions.

Dramatiq is a fast and reliable alternative to Celery. It supports RabbitMQ and Redis as message brokers.
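A rough sketch of Dramatiq's actor style, assuming the default RabbitMQ broker on localhost; the actor below is an invented example:

    import dramatiq

    # With no broker configured, Dramatiq defaults to RabbitMQ on localhost;
    # a Redis broker can be swapped in via dramatiq.brokers.redis instead.
    @dramatiq.actor(max_retries=3)
    def generate_thumbnail(image_id):
        print(f"generating thumbnail for image {image_id}")

    if __name__ == "__main__":
        generate_thumbnail.send(42)  # enqueue; a `dramatiq` worker process runs it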

django-carrot is a simple task queue specifically for Django that can serve when Celery is overkill.

tasq is a brokerless task queue for simple use cases. It is not recommended for production unless further testing and development is done.

Hosted message and task queue services

Task queue third party services aim to solve the complexity issues that arise when scaling out a large deployment of distributed task queues.

Iron.io is a distributed messaging service platform that works with many types of task queues such as Celery. It also is built to work with other IaaS and PaaS environments such as Amazon Web Services and Heroku.

Amazon Simple Queue Service (SQS) is a set of five APIs for creating, sending, receiving, modifying and deleting messages.
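A minimal sketch of those operations using the boto3 SQS client; the queue name is a placeholder and AWS credentials are assumed to already be configured in the environment:

    import boto3

    sqs = boto3.client("sqs")  # region and credentials come from the environment

    queue_url = sqs.create_queue(QueueName="example-tasks")["QueueUrl"]
    sqs.send_message(QueueUrl=queue_url, MessageBody="resize-image:42")

    received = sqs.receive_message(QueueUrl=queue_url, MaxNumberOfMessages=1, WaitTimeSeconds=5)
    for message in received.get("Messages", []):
        print("got:", message["Body"])
        sqs.delete_message(QueueUrl=queue_url, ReceiptHandle=message["ReceiptHandle"])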

CloudAMQP is at its core managed servers with RabbitMQ installed and configured. This service is an option if you are using RabbitMQ and do not want to maintain RabbitMQ installations on your own servers.

Open source examples that use task queues

flask-celery-example is a simple Flask application with Celery as a task queue and Redis as the broker.

django_dramatiq_example and flask_dramatiq_example are simple apps that demo how you can use Dramatiq with Django and Flask, respectively.

Task queue resources

International Space Station notifications with Python and Redis Queue (RQ) shows how to combine the RQ task queue library with Flask to send text message notifications every time a condition is met - in this blog post's case that the ISS is currently flying over your location on Earth.

Evaluating persistent, replicated message queues is a detailed comparison of Amazon SQS, MongoDB, RabbitMQ, HornetQ and Kafka's designs and performance.

Why Task Queues is a presentation for what task queues are and why they are needed.

Asynchronous Processing in Web Applications Part One and Part Two are great reads for understanding what a task queue is and why you shouldn't use your database as one.

Flask by Example Implementing a Redis Task Queue provides a detailed walkthrough of setting up workers to use RQ with Redis.

Heroku has a clear walkthrough for using RQ for background tasks.

How to use Celery with RabbitMQ is a detailed walkthrough for using these tools on an Ubuntu VPS.

Celery - Best Practices explains things you should not do with Celery and shows some underused features for making task queues easier to work with.

Celery in Production on the Caktus Group blog contains good practices from their experience using Celery with RabbitMQ, monitoring tools and other aspects not often discussed in existing documentation.

A 4 Minute Intro to Celery is a short introductory task queue screencast.

This Celery tasks checklist has some nice tips and resources for using Celery in your applications.

Heroku wrote about how to secure Celery when tasks are otherwise sent over unencrypted networks.

Miguel Grinberg wrote a nice post on using the task queue Celery with Flask. He gives an overview of Celery followed by specific code to set up the task queue and integrate it with Flask.

Ditching the Task Queue for Gevent explains how in some cases you can replace the complexity of a task queue with concurrency. For example, you can remove Celery in favor of gevent.

3 Gotchas for Working with Celery are things to keep in mind when you're new to the Celery task queue implementation.

Setting up an asynchronous task queue for Django using Celery and Redis is a straightforward tutorial for setting up the Celery task queue for Django web applications using the Redis broker on the back end.

Asynchronous Tasks with Flask and Redis Queue looks at how to configure Redis Queue to handle long-running tasks in a Flask app.

Developing an Asynchronous Task Queue in Python looks at how to implement several asynchronous task queues using Python's multiprocessing library and Redis.

Task queue learning checklist

Pick a slow function in your project that is called during an HTTP request.

Determine if you can precompute the results on a fixed interval instead of during the HTTP request. If so, create a separate function you can call from elsewhere then store the precomputed value in the database.

Read the Celery documentation and the links in the resources section above to understand how the project works.

Install a message broker such as RabbitMQ or Redis and then add Celery to your project. Configure Celery to work with the installed message broker.

Use Celery to invoke the function from step one on a regular basis.

Have the HTTP request function use the precomputed value instead of the slow running code it originally relied upon.
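As a rough sketch of those last two steps, a hypothetical Flask view could serve the stored result instead of calling the slow code; the route and helper names below are placeholders for whatever your application uses:

    from flask import Flask, jsonify

    app = Flask(__name__)

    def read_precomputed_top_repositories():
        # Placeholder: read the row that the scheduled task wrote earlier,
        # e.g. SELECT ... FROM precomputed_results WHERE key = 'top_repositories'.
        return ["repo-a", "repo-b"]

    @app.route("/top-repositories")
    def top_repositories():
        return jsonify(read_precomputed_top_repositories())  # fast read, no slow query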

What's next to learn after task queues?

How do I log errors that occur in my application?

I want to learn more about app users via web analytics.

What tools exist for monitoring a deployed web app?

Task Queues

A list of message brokers and task queue libraries spanning many programming languages and implementations.

Brokers

Message brokers distribute messages from producers to consumers.

Amazon Simple Queue Service (SQS)

https://aws.amazon.com/sqs/

Amazon Simple Queue Service (SQS) is a fully managed message queuing service that makes it easy to decouple and scale microservices, distributed systems, and serverless applications. Building applications from individual components that each perform a discrete function improves scalability and reliability, and is best practice design for modern applications.

Apache ActiveMQ

http://activemq.apache.org/

Apache ActiveMQ™ is the most popular open source, multi-protocol, Java-based messaging server.

Apache Kafka

http://kafka.apache.org/

Kafka® is used for building real-time data pipelines and streaming apps. It is horizontally scalable, fault-tolerant, wicked fast, and runs in production in thousands of companies.

Apache Pulsar

http://pulsar.incubator.apache.org/

Apache Pulsar is an open-source distributed pub-sub messaging system originally created at Yahoo and now part of the Apache Software Foundation.

Apache Qpid

http://qpid.apache.org/

Apache Qpid™ makes messaging tools that speak AMQP and support many languages and platforms.

Apache RocketMQ

https://rocketmq.apache.org/

Apache RocketMQ™ is an open source distributed messaging and streaming data platform.

Azure Service Bus

https://azure.microsoft.com/en-us/services/service-bus/

Depend on Azure Service Bus when you need highly-reliable cloud messaging service between applications and services, even when one or more is offline.

http://kr.github.io/beanstalkd/

Beanstalk is a simple, fast work queue. Its interface is generic, but was originally designed for reducing the latency of page views in high-volume web applications by running time-consuming tasks asynchronously.

Bedrock::Jobs

http://bedrockdb.com/jobs.html

Bedrock::Jobs is a plugin to the Bedrock data foundation that manages a scheduled job queue.

http://contribsys.com/faktory/

At a high level, Faktory is a work server. It is the repository for background jobs within your application. Jobs have a type and a set of arguments and are placed into queues for workers to fetch and execute.

http://gearman.org/

Gearman provides a generic application framework to farm out work to other machines or processes that are better suited to do the work. It allows you to do work in parallel, to load balance processing, and to call functions between languages. It can be used in a variety of applications, from high-availability web sites to the transport of database replication events. In other words, it is the nervous system for how distributed processing communicates.

Google Cloud PubSub

https://cloud.google.com/pubsub/docs/

Google Cloud Pub/Sub is a fully-managed real-time messaging service that allows you to send and receive messages between independent applications.

https://www.inngest.com

Inngest is an open source event-driven queue in which serverless functions are triggered by HTTP events. Inngest does not require one to configure queues up front and instead, systems can just send events to start using in minutes. Inngest aims to be SDK-less and use standard libraries and interfaces to provide a better developer experience. Inngest can be self-hosted or one can use Inngest's cloud platform offering.

http://www.iron.io/mq

MQ provides a reliable way to communicate between services and components. Highly available, persistent by design, with best-effort one-time delivery, MQ is the most industrial strength, cloud-native solution for modern application architecture.

https://mosquitto.org/

Eclipse Mosquitto™ is an open source (EPL/EDL licensed) message broker that implements the MQTT protocol versions 3.1 and 3.1.1. MQTT provides a lightweight method of carrying out messaging using a publish/subscribe model. This makes it suitable for Internet of Things messaging such as with low power sensors or mobile devices such as phones, embedded computers or microcontrollers like the Arduino.

https://nats.io/

NATS is an open source, lightweight, high-performance cloud native infrastructure messaging system. It implements a highly scalable and elegant publish-subscribe (pub/sub) distribution model. The performant nature of NATS make it an ideal base for building modern, reliable, scalable cloud native distributed systems.

http://nsq.io/

NSQ is a realtime distributed messaging platform designed to operate at scale, handling billions of messages per day. It promotes distributed and decentralized topologies without single points of failure, enabling fault tolerance and high availability coupled with a reliable message delivery guarantee.

https://www.postgresql.org/

While PostgreSQL is not a message queue per se, it does support facilities for implementing queues via the "SELECT ... FOR UPDATE SKIP LOCKED" clause and the notification subsystem. Postgres-based message queues can be very convenient if you're already using Postgres as your database and you don't need too much throughput. Another advantage to Postgres is you can enqueue jobs transactionally with other database operations, causing related operations to be rolled back if a job can't be enqueued.
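A small sketch of that pattern using psycopg2; the jobs table, its columns, and the connection string are invented for illustration, since the description above only names the SKIP LOCKED clause itself:

    import psycopg2

    DEQUEUE_SQL = """
        UPDATE jobs SET status = 'processing'
        WHERE id = (
            SELECT id FROM jobs
            WHERE status = 'pending'
            ORDER BY created_at
            LIMIT 1
            FOR UPDATE SKIP LOCKED
        )
        RETURNING id, payload;
    """

    def dequeue_one(dsn="dbname=app"):
        with psycopg2.connect(dsn) as conn, conn.cursor() as cur:
            cur.execute(DEQUEUE_SQL)
            return cur.fetchone()  # None when the queue is empty; commits on exit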

http://rabbitmq.com/

RabbitMQ is lightweight and easy to deploy on premises and in the cloud. It supports multiple messaging protocols. RabbitMQ can be deployed in distributed and federated configurations to meet high-scale, high-availability requirements.

https://redpanda.com

Redpanda is a Kafka®-compatible streaming data platform that is up to 10x faster and 6x more hardware-efficient. It is also JVM-free, ZooKeeper®-free, Jepsen-tested and source available.

https://redis.io

Redis is an open source (BSD licensed), in-memory data structure store, used as a database, cache and message broker.

ServerlessQ

https://serverlessq.com

ServerlessQ is a hosted message queue service for the serverless era of computing. Without any setup you can re-use your existing APIs and let ServerlessQ handle queueing and retrying your messages. You also get an overview of all your messages and error codes.

https://zeplo.io

Zeplo is a next generation message queue powered by HTTPS, no setup required. Supports delayed, scheduled and retry of messages. In-built console to debug and reprocess failed records. Infinitely scaleable.

Libraries and Frameworks

Task queue libraries generally provide higher-level and language-specific abstractions over message brokers.

Bee-Queue

https://github.com/bee-queue/bee-queue

A simple, fast, robust job/task queue for Node.js, backed by Redis.

http://celeryproject.org/

Celery is an asynchronous task queue/job queue based on distributed message passing. It is focused on real-time operation, but supports scheduling as well.

https://github.com/yoomoney-tech/db-queue

Db-queue provides a rich interface to database-backed message queue (PostgreSQL, Oracle, MSSQL). Db-queue offers a rich set of features and is designed to handle high throughput. Db-queue assumes that you are already using relational databases in your production environment and that adding another dependency is undesirable.

Delayed::Job

https://github.com/collectiveidea/delayed_job

Delayed::Job (or DJ) encapsulates the common pattern of asynchronously executing longer tasks in the background.

https://dramatiq.io

Dramatiq is an alternative to Celery with a focus on simplicity, reliability and performance.

https://github.com/akira/exq

Exq is a job processing library compatible with Resque / Sidekiq for the Elixir language.

https://github.com/faust-streaming/faust

A library for building streaming applications in Python.

https://huey.readthedocs.io/en/latest/

A lightweight alternative.

https://koyoweb.org/job/index.html

Koyo is a web-development toolkit for Racket that comes with a built-in task queue based on PostgreSQL.

http://automattic.github.io/kue/

Kue is a feature rich priority job queue for node.js backed by redis. A key feature of Kue is its clean user-interface for viewing and managing queued, active, failed, and completed jobs.

https://laravel.com/docs/5.5/queues

Laravel queues provide a unified API across a variety of different queue backends, such as Beanstalk, Amazon SQS, Redis, or even a relational database. Queues allow you to defer the processing of a time consuming task, such as sending an email, until a later time. Deferring these time consuming tasks drastically speeds up web requests to your application.

https://github.com/RichardKnop/machinery

Machinery is an asynchronous task queue/job queue based on distributed message passing.

MassTransit

http://masstransit-project.com/

MassTransit is a free, open source, lightweight message bus for creating distributed applications using the .NET framework. MassTransit provides an extensive set of features on top existing message transports, resulting in a developer friendly way to asynchronously connect services using message-based conversation patterns. Message-based communication is a reliable and scalable way to implement a service oriented architecture.

node-rethinkdb-job-queue

https://github.com/grantcarthew/node-rethinkdb-job-queue

rethinkdb-job-queue is a persistent job or task queue backed by RethinkDB. It has been built as an alternative to the many queues available on NPM.

queue_classic

https://github.com/queueclassic/queue_classic

queue_classic provides a simple interface to a PostgreSQL-backed message queue. queue_classic specializes in concurrent locking and minimizing database load while providing a simple, intuitive developer experience. queue_classic assumes that you are already using PostgreSQL in your production environment and that adding another dependency (e.g. redis, beanstalkd, 0mq) is undesirable.

https://github.com/resque/resque

Resque is a Redis-backed Ruby library for creating background jobs, placing them on multiple queues, and processing them later.

http://python-rq.org/

RQ (Redis Queue) is a simple Python library for queueing jobs and processing them in the background with workers. It is backed by Redis and it is designed to have a low barrier to entry. It can be integrated in your web stack easily.

http://sidekiq.org/

Sidekiq is a full-featured background processing framework for Ruby. It aims to be simple to integrate with any modern Rails application and much higher performance than other existing solutions.

https://github.com/mperham/sidekiq.cr

A version of Sidekiq for the Crystal language.

https://github.com/temporalio/temporal

Temporal is a microservice orchestration platform which enables developers to build scalable applications without sacrificing productivity or reliability. Temporal server executes units of application logic, Workflows, in a resilient manner that automatically handles intermittent failures, and retries failed operations.

https://github.com/joakimk/toniq

Simple and reliable background job processing library for Elixir.

https://github.com/graphile/worker

High performance Node.js/PostgreSQL job queue (also suitable for getting jobs generated by PostgreSQL triggers/functions out into a different work queue)

https://zeromq.org/

ZeroMQ (also known as ØMQ, 0MQ, or zmq) looks like an embeddable networking library but acts like a concurrency framework. It gives you sockets that carry atomic messages across various transports like in-process, inter-process, TCP, and multicast. You can connect sockets N-to-N with patterns like fan-out, pub-sub, task distribution, and request-reply. It's fast enough to be the fabric for clustered products. Its asynchronous I/O model gives you scalable multicore applications, built as asynchronous message-processing tasks. It has a score of language APIs and runs on most operating systems.

Made by @Bogdanp. Inspired by queues.io. Last updated 2023-11-11.


Building High Performance Queue in Database for storing Orders, Notifications, Tasks


Introduction

We have queues everywhere. Most websites have queues for asynchronously sending notifications like email and SMS. E-commerce sites have queues for storing, processing and dispatching orders. Factory assembly line automation systems have queues for running tasks in parallel, in a certain order. The queue is a widely used data structure that sometimes has to be built in a database instead of using specialized queue technologies like MSMQ. Running a high performance and highly scalable queue using database technologies is a big challenge, and it’s hard to maintain once the queue starts handling millions of rows queued and dequeued per day. Let me show you some common design mistakes made in designing Queue-like tables and how to get maximum performance and scalability from a queue implemented using simple database features.

Let’s first identify the challenges you have in such queue tables:

  • The table is both read and write. Thus queuing and dequeuing impact each other and cause lock contention, transaction deadlocks, IO timeouts, etc. under heavy load.
  • When multiple receivers try to read from the same queue, they randomly get duplicate items picked out of the queue, thus resulting in duplicate processing. You need to implement some kind of high performance row lock on the queue so that the same item never gets picked up by concurrent receivers.
  • The Queue table needs to store rows in a certain order and read in a certain order, which is an index design challenge. It’s not always first in and first out. Sometimes Orders have higher priority and need to be processed regardless of when they are queued.
  • The Queue table needs to store serialized objects in XML or binary form, which becomes a storage and index rebuild challenge. You can’t rebuild an index on the Queue table online because it contains text and/or binary fields, so the table keeps getting slower every day and eventually queries start timing out, until you take a downtime and rebuild the indexes.
  • During dequeue, a batch of rows is selected, updated and then returned for processing. You have a “State” column that defines the state of the items, and during dequeue you select items in a certain state. State only has a small set of values, e.g. PENDING, PROCESSING, PROCESSED, ARCHIVED, so an index on the “State” column does not give you enough selectivity: there can be thousands of rows with the same state. As a result, any dequeue operation results in a clustered index scan that’s both CPU and IO intensive and produces lock contention.
  • During dequeue, you cannot just remove the rows from the table because that causes fragmentation. Moreover, you need to retry orders/jobs/notifications N times in case they fail on the first attempt. This means rows are stored for a longer period, indexes keep growing and dequeue gets slower day by day.
  • You have to archive processed items from the Queue table to a different table or database in order to keep the main Queue table small. That means moving a large number of rows of some particular status to another database. Such large data removal leaves the table highly fragmented, causing poor queue/dequeue performance.
  • You have a 24x7 business. You have no maintenance window where you can take a downtime and archive large number of rows. This means you have to continuously archive rows without affecting production queue-dequeue traffic.

If you have implemented such queue tables, you might have suffered from one or more of the above challenges. Let me give you some tips on how to overcome these challenges and how to design and maintain a high performance queue table.

Building a Typical Queue in SQL Server

Let’s take a typical Queue table as an example and see how well it works under concurrent load.

In this queue table, items are dequeued using QueueDateTime as the sort order in order to simulate a first-in-first-out or priority queue. The QueueDateTime is not necessarily the time when the item is queued; it’s the time when it should be processed, so the item with the earliest time gets picked up first. TextData is the large character field that stores the payload.

The table has a non-clustered index on QueueDateTime in order to make the sorting using QueueDateTime faster during dequeue.

First, let’s populate this table with around 40K rows, worth about 500MB of data, where each row has a payload of varying size.

Then we will dequeue 10 items at a time. During dequeue, it picks the first 10 items based on QueueDateTime and Status = 0 and then updates their Status to 1 to mark the items as being processed. We aren’t deleting the rows from the queue table during dequeue because we want to make sure the items aren’t lost forever in case the application that receives them fails to process them.

The dequeue query first selects 10 rows from the QueueSlow table and stores them in a temporary table variable. It then changes the status of the selected rows to make sure no one else takes those rows again. If it is able to successfully update the same 10 rows and no one else has taken them by this time, it commits the transaction, which marks those items as taken so they are not picked up on subsequent calls. But if it cannot update the same 10 rows that it picked with the right status, someone else has taken them in the meantime and it rolls back in order to preserve consistency.

Let’s measure the IO performance of this dequeue. Summing up the I/O statistics that SQL Server reports:

  • Total Logical Read = 1695
  • Total LOB Logical Read = 675
  • LOB Logical Read Count = 3

We will compare these with a faster solution and see how much improvement we get. One thing to notice is the high LOB Logical Read and the number of times LOB Logical Read is performed. There should be no reason to read Large Objects thrice as we need to load them only once. This clearly shows that SQL Server has to unnecessarily read large objects in order to satisfy the queries.

As queue and dequeue operations happen, and rows are moved out of the table for archival, the table gets more fragmented day by day. You can’t rebuild the clustered index online to eliminate the fragmentation because the table has a varchar(max) field. So you have no choice but to take a downtime and rebuild the indexes. Downtimes are always expensive, even if you don’t have a 24x7 system.

Building a Faster Queue

First you need to reduce the high logical read count. In order to do that, you need to split the QueueSlow table into two tables – QueueMeta and QueueData. QueueMeta contains only the fields that participate in the WHERE clause; it’s a lightweight table that holds only the fields used in search queries. SQL Server will be able to fit several rows in an 8K page, and scanning this table will be faster than scanning the QueueSlow table, where each page contains only a handful of rows and a single row can even span multiple pages due to the large payload.

Secondly, you will be able to rebuild indexes on the QueueMeta table online, while the production traffic is still hitting the table. This way, the performance of the QueueMeta table will not degrade and you won’t need a downtime anymore.

This table holds all the fields that appear in the search query. All other fields related to the payload are moved to the QueueData table.

There’s no clustered index on this table.

The Dequeue procedure is slightly modified to perform the query on the QueueMeta table first, and then select the payload from QueueData table.

When I populate QueueMeta and QueueData with exactly the same data as QueueSlow, rebuild the indexes on both tables and do the comparison, I can see some significant improvement:

  • Total Logical Read = 1546 (vs 1695)
  • Total LOB Read = 380 (vs 675)
  • LOB Read Count = 1 (vs 3)

Here you see the number of logical reads is 149 less and LOB read is 295 less. The LOB read count is just 1, exactly what we should expect.

Comparing the Performance Under Load

When I simulate concurrent queue and dequeue operations and measure the performance counters, the two solutions show clear differences.

Let’s investigate the important counters and see the improvements:

  • Page Splits/sec – The faster solution has a lower page split rate, almost none compared to the slower solution. Page splits happen when, during an insert, a row does not fit on a partially populated page and the page has to be split into new pages. You can read more about page splits here.
  • Transactions/sec – The more transactions per second, the more queue operations happen while dequeues are going on. It shows that queue operations in the faster solution perform better than in the slower option.
  • Lock Timeouts/sec – This shows how many queries waited for a lock on some object and eventually gave up because they did not get the lock in time. The higher the value, the worse the performance you get from your database; you want to keep this near zero. The result shows the number of lock timeouts in the fast queue is lower than in the slower solution.
  • Batch Requests/sec – Shows how many SELECT queries are performed per second. Both solutions perform the same number of SELECTs, even though the faster solution reads from multiple tables, so splitting the data across tables does not add query overhead to the dequeue procedure.

It’s not just better performance: the greatest benefit is that you can run INDEX DEFRAG online on the QueueMeta table and thus prevent the queue from slowing down gradually.

Fastest Queue in SQL Server 2005, 2008

SQL Server 2005 introduced the OUTPUT clause in UPDATE, INSERT and DELETE statements. It allows you to get the rows affected by the insert, update or delete back from a single query. You don’t have to first select some rows, then lock them, then update them and then return them. You can do it in one shot: update and return.

Here’s the dequeue procedure after the change:
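The original stored procedure is not reproduced in this excerpt, but a single-statement dequeue of the kind described here generally looks like the sketch below, shown issued through pyodbc with invented table and column names:

    import pyodbc

    # Illustrative only: table name, columns, and batch size are placeholders.
    DEQUEUE_SQL = """
    WITH next_batch AS (
        SELECT TOP (10) *
        FROM dbo.WorkQueue WITH (ROWLOCK, UPDLOCK, READPAST)
        WHERE Status = 0
        ORDER BY QueueDateTime
    )
    UPDATE next_batch
    SET Status = 1
    OUTPUT inserted.WorkId, inserted.QueueDateTime, inserted.TextData;
    """

    def dequeue_batch(connection_string):
        conn = pyodbc.connect(connection_string)
        cursor = conn.cursor()
        rows = cursor.execute(DEQUEUE_SQL).fetchall()  # rows come back via OUTPUT
        conn.commit()
        return rows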

This single line UPDATE statement does everything that the Dequeue procedures we have seen so far do. The IO stats are impressive as well:

  • Total Logical Read = 553
  • LOB Logical Read = 56

It is at least 3 times less IO intensive than the faster solution.

I have used a special locking strategy here – READPAST . This indicates that if it finds some rows locked, it won’t wait for the rows to be unlocked. It will just ignore them. Since we aren’t doing a SELECT and then an UPDATE , there’s no need to use the HOLDLOCK here. This is one of the reasons for better performance.

Archiving Strategy for Queue Tables

As you queue records, the Queue table keeps growing. You have to ensure it stays within a reasonable size so that backups and index rebuilds don’t take forever. There are two ways to archive rows: continuously around the clock in small batches, or in large batches during off-peak hours. If you have a 24x7 system with no off-peak period, you need to archive continuously in small batches with some delay between them.

However, you should not delete rows from the queue table during dequeue, because delete is an expensive operation and makes dequeue slower. Instead, delete processed items in a separate background job so that it does not impact dequeue performance. Moreover, in a reliable queue you can’t delete rows during dequeue anyway: if the process handling the items fails and does not queue them again for retry, the items are lost forever. Sometimes you also need to keep an eye on the queue to ensure items picked up for processing get processed within a certain period; if not, they need to be sent back to the front of the queue to be picked up again. For all these reasons, it’s best to keep items in the queue during dequeue and just update their status.

Performance and reliability of your order processing systems, task execution systems or notification systems depend on how well you design your queues. Since these directly impact customer satisfaction and eventually your bottom line, it’s important you spend enough time making the right design decision while building your queues. Otherwise over time it becomes a liability and you end up having to re-engineer the queues at the cost of lost business and high resource engagement.

  • 18th September, 2010: Initial post


This article, along with any associated source code and files, is licensed under The Code Project Open License (CPOL)


Successfully Deploying a Task Queue

Firing off a background task is easy. But what if it fails? What if it never got fired at all? How do you distribute a large number of async tasks across your computing infrastructure - and keep tabs on all of them?

Every system needs to run background tasks. But managing them isn’t as simple as telling a cron job or Lambda function to “just do it.”

In this article, we’ll dive into what successfully deploying a task queue for background tasks requires, including orchestrating and monitoring tasks at scale.

The utility of background tasks and task queues

💡 A background task is any computational task that runs asynchronously without delivering an immediate response to a user. It may be a discrete task or part of a larger workflow of related tasks.

Examples of background tasks include parsing PDF documents, sending multi-cast e-mails, generating image thumbnails, crawling a Web site, fulfilling an order, or kicking off an Extract-Load-Transform (ELT) job. In each of these cases, we expect the action will be:

  • Long-running. Image transformation, number-crunching, and data loads are compute-intensive tasks that may take minutes - or even hours - to complete.
  • Asynchronous. For example, if a task needs to make multiple HTTP requests to external sites, we can’t guarantee how long each server will take to respond.

A user’s connection will likely time out if we force them to wait for a response from such tasks. Even if it doesn’t, the wait may still be too long: these types of tasks should not be run inside the request-response cycle, since one recent survey found that 54% of e-commerce site users expect Web pages to load in three seconds or less. That makes it critical to move any long-running work into background tasks, which requires separate architecture to scale successfully (more on that later).

Background tasks can be either schedule-driven or event-driven . An example of a schedule-driven background task is a long-running ELT job or other data migration task that runs once a day. Use cases like image thumbnail processing would fall under event-driven tasks, as they’re activated by a user action (in this case, uploading an image).

A task queue may run a single task or it may run multiple tasks, either sequentially or concurrently, as part of a larger workflow. Traditional task queue architecture gets complex when considering these dependencies.

Challenges with deploying a robust task queue

Many teams that start with background tasks will start with the simplest approach they can manage. This usually means running a task as a cron job , a serverless cloud function (such as AWS Lambda ), or a Docker container in Kubernetes.

As a team’s needs grow, these simple solutions become quite complex. If you’re running dependent workloads with different architectural requirements, it becomes increasingly challenging to orchestrate when, where, and how you’ll run them. Monitoring becomes more complicated as you may have tasks running in different locations and using different technologies (virtual machines, containers, serverless functions, third-party tools, etc.). This all comes to a head when failure occurs: the more intertwined the system, the longer it takes to debug the failure.

There are challenges with managing even simple tasks in a highly scalable architecture. For example, how does your task queue handle notifications and retry semantics? What happens if a container faults or an HTTP request returns a 500 server error?

Teams also find they need greater visibility into the tasks themselves. Suppose you support dozens of webhooks wired to serverless functions . Do you know when they last ran - and for how long? How do you detect anomalies when managing dozens or hundreds of background tasks?

Teams in this position sometimes turn to a distributed task queue like Celery to create a basic task queue and orchestration system. But tools like Celery still leave teams with a lot to build in terms of infrastructure versatility and monitoring logic.

Tips for deploying a successful task queue

How do you go beyond cron and deploy a more scalable and versatile task queue? Here are the core elements you should include in any background task manager architecture:

  • Ensure scalability
  • Account for failure
  • Turn tasks into workflows
  • Create a debugging infrastructure for tasks

You can have some level of monitoring and observability if you’re running a single task. But what about dozens? Or hundreds? As the complexity of your architecture grows, you need more than a single cron job or an unmonitored callback to a serverless function.

You might think, why can’t I run multiple just like I run one? A better question would be, what happens when you need to run a function in different infrastructure, or debug a dependency between multiple functions? This gets quite time consuming at scale.

To ensure scalability, your task queue should support:

  • Managing multiple task queues that run tasks on specific target infrastructure and distributing them at scale
  • Monitoring all of your tasks through a central pane of glass
  • Centralized logging , metrics, multiple trigger types, and retry semantics at scale

Supporting this means building out an infrastructure to support it. For example, a typical scalable Celery deployment will involve standing up and maintaining scalable clusters of multiple worker nodes . (Remember: you’ll need to monitor the scripts and that infrastructure itself to ensure reliability and uptime.)

It’s inevitable. Your background tasks are going to fail. The question isn’t whether or not they fail - it’s how well they handle the failure.

A robust task queue needs multiple mechanisms to account for failure, including:

  • A variety of retry semantics. For example, you will want to use a retry with exponential backoff for an apparently transient error, such as a Web server being unreachable or returning a 500 server error (a minimal sketch follows this list).
  • Self-healing logic . If a virtual machine fails to respond, a task queue’s task can restart it via its cloud provider before re-attempting the task.
  • Graceful failure . For permanent errors - a database item can’t be found, a page returns a 404, credentials are rejected - you’ll need support for reasonable fallback behaviors.
  • Notification of failure . When failure happens, your team needs to know so they can remediate it ASAP.
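A minimal, generic sketch of the exponential backoff idea, not tied to any particular task queue; the function name and delay values are illustrative:

    import random
    import time

    def retry_with_backoff(func, max_attempts=5, base_delay=1.0):
        """Call func(); on failure wait 1s, 2s, 4s, ... (plus jitter) and retry."""
        for attempt in range(max_attempts):
            try:
                return func()
            except Exception:
                if attempt == max_attempts - 1:
                    raise  # permanent failure: let it surface for notification
                time.sleep(base_delay * (2 ** attempt) + random.uniform(0, 0.5))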

Additionally, think about accounting for the hardest-to-detect failure case: what if your task fails to run? If you expect a task to run every hour and it hasn’t run for three days, you have a problem. Without the proper monitoring, that problem may go unnoticed for weeks or months.

Build in mechanisms, such as pings or logs sent to a centralized monitoring service , to keep tabs on your task’s runs. Once implemented, you can define alerts to generate notifications if the number of expected runs in a time period fails to meet the expected threshold.

Smaller units of software that do one thing - and do it well - are easier to maintain, monitor, and debug. That means that tasks should be small and discrete. If your “task” is really a complete process - fulfilling an order, deploying a CI/CD pipeline, processing data - you need a task queue that supports workflows. Additionally, if your discrete tasks are interdependent, that interdependency itself is a workflow.

💡 Workflows are logical units of execution comprised of two or more tasks. A workflow system can create, deploy, and monitor complex tasks, particularly important when tasks depend on each other for proper execution. It can also coordinate tasks across infrastructure, systems, triggers, and teams.

Workflows and workflow orchestration support the following basic functions:

  • Scheduling. Enables tasks to run reliably at a set time, not when an individual engineer remembers to run a script.
  • Ordering operations. Runs tasks in the proper order, and scales out resources as needed to prevent failure.
  • Observability. Provides visibility into the status of a workflow and its individual tasks, along with detailed logs for debugging when failures inevitably occur.
  • Versatile triggers. Enables running a workflow via scheduled or event-based triggers across a variety of systems, including virtual machines, Kubernetes clusters, and cloud providers.

Debugging infrastructure for tasks

It’s impossible to recreate certain failures you’ve seen in production. Rich, centralized logging and metrics are critical to debugging issues you can’t reproduce easily. An important aspect of this is a rich UI which will help you find the failure and its downstream dependencies quickly.

All logging and metrics should be part of a centralized logging interface easily accessible by any engineer or SRE. Team members should be able to find and drill down into the logs for a specific task run to discover error messages and pinpoint root causes.

Get started with Prefect workflows

When tasks require diverse infrastructure, grow increasingly dependent on each other, and have users that need to understand their state, they function more like workflows. Scaling a complex workflow architecture is not dissimilar from scaling an application - you might need an API, background tasks, stakeholders, and more. That requires a workflow orchestrator that’s up to par.

Deploying a scalable task queue isn’t a stroll through the daisies. There are a lot of moving parts. And at the end of the day, you’re signing up to create yet another piece of infrastructure that must be deployed, monitored, and maintained.

That’s why Prefect provides full support for deploying complex workflows. Using Prefect, you can define even the most involved workflow as a series of tasks with a simple decorator run via Python.
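A minimal sketch of that decorator style, using Prefect 2.x's flow and task decorators with invented task bodies:

    from prefect import flow, task

    @task(retries=3, retry_delay_seconds=10)
    def extract():
        return [1, 2, 3]  # placeholder for real extraction work

    @task
    def load(rows):
        print(f"loaded {len(rows)} rows")

    @flow
    def etl():
        load(extract())

    if __name__ == "__main__":
        etl()  # runs the flow; Prefect records the task runs for observability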

Prefect’s workflow orchestration solution will work with any set of diverse infrastructure along with full observability, retry semantics, and a wide range of triggers. You can run workflow tasks in all major cloud providers, trigger workflows from any third-party system, and integrate with dozens of external systems via our pre-provided integrations packages .

Prefect makes complex workflows simpler, not harder. Try Prefect Cloud for free for yourself, download our open source package , join our Slack community , or talk to one of our engineers to learn more.


Create a Simple Task Queue with Flask and Redis


Introduction

When it comes to website performance, even a few seconds can be too long to keep your users waiting, especially if you want them to keep coming back to your website. Some tasks take longer to execute than others, and it can make sense to remove the lengthier ones from the typical HTTP request-and-response cycle. That’s where a task queue can be a game changer. With a task queue, you can shift tasks into the queue to be processed later, allowing you to return a response to the user immediately. In this article, we’ll show you how to create your own task queue using Flask and Redis.

Prerequisites

Before we proceed with this tutorial, let’s review a few key prerequisites that need to be in place:

You’ll need to have Python installed on your machine. The latest version of Python 3 is recommended because Python 2 has been deprecated.

You’ll also need to install some packages, and you’ll use the pip3 package manager to install the Flask micro web framework. Use the command pip3 --version in the terminal to check which version is installed. If you’d like to see which packages are currently installed, use the command pip3 list.

Install Redis

If you don’t have Redis installed on your machine, you can download and install the server for macOS or Windows. You can also pull Redis as a Docker image from Docker Hub.

Install Redis in Linux

If you’re running a Debian distribution of Linux like Ubuntu or Linux Mint, the installation process is different. First, update the package index files with sudo apt-get update; this makes sure that you install the latest available version of Redis.

Next, install the redis-server package available in the apt repository: sudo apt-get install redis-server.

Once you’ve installed Redis, you can enable the Redis server to start on boot with sudo systemctl enable redis-server and then reboot.

Install and set up Flask

Our next task will be to install the Flask micro web framework using the pip3 command. We’ll install it in a Python 3 virtual environment. We can start by creating a project directory where we’ll create the virtual environment, and then change into that directory.

To create a virtual environment, run python3 -m venv venv.

This command creates a directory called venv, which contains the Python interpreter, libraries and other supporting files.

To start the virtual environment, run the activation script: source venv/bin/activate.

Once it’s activated, we can use the pip3 package manager to install Flask: pip3 install Flask.

NOTE: You can verify the Flask version that was installed by using the command python -m flask --version . The output will look like this:

Python 3.6.9 Flask 1.1.1 Werkzeug 0.16.0

We can also use pip3 to install the Redis client as a Python package (pip3 install redis). This lets our application talk to the Redis server that will store our data.

Next, we’ll install RQ (Redis Queue), the library for processing task queues: pip3 install rq.

Create a task queued file

Now that we’ve installed all the necessary packages, let’s start building the task queue. Using a code editor such as VS Code, Sublime Text or Atom, we’ll create a file that sets up our Flask and Redis application.

We’ll import all of the necessary libraries at the top of the file.

Next, we’ll create a variable named app for the Flask application, open a Redis connection and use it to set up our task queue.

We then create the function that the queue will run in the background.

Finally, we’ll create a route that reads the n query-string parameter and enqueues the task.
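The tutorial's original code listings are not included in this excerpt; the single-file sketch below, which assumes the file is named app.py and uses an invented background function, shows roughly how those pieces fit together:

    # app.py -- illustrative reconstruction, not the tutorial's exact code.
    import time

    from flask import Flask, request
    from redis import Redis
    from rq import Queue

    app = Flask(__name__)
    redis_connection = Redis(host="localhost", port=6379)
    queue = Queue(connection=redis_connection)

    def background_task(n):
        """Pretend to do slow work, then return the length of the input."""
        time.sleep(5)
        return len(n)

    @app.route("/task")
    def add_task():
        n = request.args.get("n")
        if n:
            job = queue.enqueue(background_task, n)
            return f"Task ({job.id}) added to the queue at {job.enqueued_at}"
        return "No value for n was provided"

    if __name__ == "__main__":
        # Start a worker in another terminal with: rq worker
        app.run(debug=True)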

Before we run the file that we just created, we need the worker process that runs separately from the app and picks jobs off the queue. With RQ, that means running the rq worker command in another terminal (with the virtual environment activated); its startup output shows which queues it is listening on.

To run the task queue application itself, run the Python file (for example, python app.py if you used that filename) from the directory that holds it.

The app will run on localhost (127.0.0.1), and appending the path /task to the URL lets you check that the application is running.

Try going to the URL 127.0.0.1:5000/task?n=hello and refresh your browser. This enqueues a job that the separate worker process, connected through the Redis queue, picks up and runs.

At this point, the task is running in the background. Our task queue was a success.

Utilizing a task queue can help you manage tasks in the background while providing optimal website performance to your users. In this article, we showed you how to build a simple task queue application using Flask and Redis. With our step-by-step instructions, you’ll be able to follow along and create your own application to queue and manage tasks.


Building a task queue, Part 1

This post is part 1 of a series where I build a task queue system. Other parts:

Some time ago, I came across a post suggesting that engineering teams should build their own task queues. I don't agree, but thinking about it made me realize that task queues are actually a great learning project, especially if you're learning a new language or stuck in the "advanced beginner" spot. Nearly every app that goes to production needs one. And there are many different challenges to tackle and different implementations to try out. Plus, knowing what goes on within a task queue makes you utilise your existing queues better.

I realized I could do with some challenges in these areas, so I'm going to take my own advice and build my own queueing system. This article aims to be sort of a journal of what I'm doing and learning.

Thoughts on queues

I'm very far from being a queue expert, but I know that there are different types of queues. I know there are job queues and task queues (which might be the same thing?) and message queues. The idea is that job/task queues (such as Ruby's Sidekiq and Resque) are for pushing tasks you need done, and one or more workers pick them from the queue and execute them. A message queue (eg RabbitMQ, Kafka), by comparison, is for publishing messages, which can then be consumed (one or more times) by interested subscribers.

It isn't strictly separated; people often use message queues as job queues too, and vice versa. But I'll be building a task queue here. I think a message queue would be another interesting challenge, though.

I like to think of queue systems as having two main parts: the infrastructure and the interface. The infrastructure is what powers the queue, and is responsible for making sure your jobs run as you wish. The interface is how your application communicates with the infrastructure (for instance, to queue jobs). There could also be a web UI for viewing information about your queues, but I'm ignoring that for now.

A queuing system can come with one or both parts, and they can have different degrees of complexity. For example:

  • Beanstalkd, RabbitMQ, and Kafka are mostly used as external infrastructure. You don't have to install Beanstalkd into your app, just start the process and install (or write) a library to talk to it.
  • Rails' ActiveJob is an interface to different queuing backends.
  • Sidekiq and Delayed Job are Ruby tools that are installed as part of your app. They come with their own interface, but you can also use them with other interfaces, such as Rails' ActiveJob.
  • The Node.js libraries BullMQ and Bee Queue are also infrastructure and interface. However, you typically have to use their interface.
  • Laravel's queuing system is an interface that comes with its own infrastructure, and its infra gives you different queuing backends as options, from database- and Redis-backed ones to services like Beanstalkd or Amazon Simple Queue Service (SQS).

Architecture

There are often two major architecture decisions:

  • Where jobs are stored (and how long, and when)
  • How jobs are executed

Two common options for job storage are a database and Redis. Redis is nice because it allows fast access, and you can easily retrieve new jobs without having to poll at intervals (for example, by using blocking commands ). Databases are nice because they're simpler and allow a wider variety of queries, which is useful if you intend to store jobs or support querying them through the UI.

I think Redis is generally superior for this, especially for high-activity systems. With a database, the more jobs you insert, the longer your queries take. And even when you delete old records, they aren't necessarily immediately deleted on disk.
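As a small illustration of the blocking-commands point mentioned above, a worker using the redis-py client can wait for new jobs without polling; the queue key and payload format here are invented:

    import json
    import redis

    r = redis.Redis()  # assumes Redis running on localhost

    def enqueue(job):
        r.lpush("jobs:default", json.dumps(job))

    def worker_loop():
        while True:
            # BRPOP blocks until an item arrives or the 5-second timeout passes,
            # so the worker doesn't have to poll on an interval.
            item = r.brpop("jobs:default", timeout=5)
            if item is None:
                continue
            _key, payload = item
            print("processing", json.loads(payload))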

As for execution, most in-app queue systems seem to follow this basic process:

  • you push a job to the queue by saving it to the datastore
  • a process that's always listening (the worker ) fetches it from the datastore and executes it

But there's no rule that we have to do this. For instance, a basic queue system could be something like this:

  • Put the code for your job in a separate file, say send_welcome_email.rb
  • Run the script in a separate process and don't wait for the result

But this isn't a robust system. For starters, it takes time to start a process, we have no error handling or management capabilities, and we might end up overloading our machine with too many processes.

Using a worker makes things better:

  • The worker is already active and listening, so there is no startup overhead and it can react to new jobs quickly.
  • You can start multiple workers to improve concurrency (e.g. 1 process per CPU core).
  • Jobs are pushed to a store, which is better for reliability

In practice, though, there are many different variations of this:

  • The worker process could itself spawn multiple threads for more concurrency (Sidekiq, GoodJob). This means that your jobs must be thread-safe.
  • The worker process could fork itself into a new process for each job (Resque).
  • There could be a "master" worker, which serves as a coordinator and manager of the others. It can start or stop processes depending on your configuration. It can monitor the health of processes, kill those that go past a memory limit or execution time, and start new ones. ( Sidekiq Enterprise )
  • You might not talk to the datastore directly; instead, the master process has its internal storage, and merely exposes an API (such as localhost:2946). To enqueue a job, you send it as a request to that endpoint. This is typically used by external infra like Beanstalkd.

What I'm building

I'll be building something inspired by Sidekiq. I think its architecture is powerful and its interface is simple. In my case, this means:

  • Data store: Redis (but I'll start out with a database implementation). Sidekiq doesn't store completed jobs, but I'd like to store them for a limited time
  • Executors: worker processes which spawn threads. I do like the "master" model, though, so I'll see if I can give that a try too.

I did some research to find out the most common and useful queue features, and I've highlighted some I think are a good start:

  • dispatching jobs (with a delay, on specific queue, in bulk, on a schedule)
  • giving queues names and priorities
  • telling a worker which queues to process
  • chaining jobs
  • splitting a job into batches
  • error handling
  • retry policies
  • job callbacks/middleware
  • graceful shutdown of workers
  • testing helpers

I've created a dummy Sinatra app with a route that queues jobs:

Next up, configuring the database using the Sequel query builder.

Nothing serious in the migration script yet; it's just a stub for when I'm ready to run migrations.

Okay, we're ready. Let's do some queueing!

Job interface

Most job libraries have you create a job class to contain the logic you want to execute. Then, when queueing, you either:

  • pass an instance of the job to the library, which is then serialized, stored, and unserialized when it's time to execute, or
  • pass the job arguments to the library, and it stores those, then creates an instance of the job when it's ready to execute it.

I'll go with the second, because I don't have to worry about any gotchas in serialization.

Here's what I want queuing a job to look like:

args will be passed to the job when executing it, while job_options are meant for the executor. Here the caller can configure things like a delay, queue name, or priority. Some job libraries use additional methods for this (ActiveJob would be DoSomeStuff.set(options).perform_later(arguments), while Sidekiq would be DoSomeStuff.perform_in(time, args)), but I'm sticking with this for a start.

Now, to implement my job interface. We're starting out with the database as our store, so let's set up our jobs table. I haven't fully thought out the implementation yet, but at the least our jobs will need to store the job arguments and the execution details.

Since we've defined a state column, let's talk a bit about the job lifecycle. I've decided to go with these:

  • waiting : The job is waiting to be picked up.
  • executing : The job has been picked up by a worker.
  • failed : The job failed, but it's going to be retried.
  • dead : The job has failed and exhausted the maximum retry count. It won't be retried anymore.

The reserved_by field is used for locks . It allows us to improve concurrency by having multiple queue workers without them clashing with each other. When a worker picks up a job, it sets that field so that other workers don't try to pick up the same job. A worker will only pick up a job that isn't reserved.

Run this with ruby lib/db_migrate.rb , and we're good.

Next, we'll provide a Queueable class that has the dispatch method, which DoSomeStuff extends.

Zooming in on some details:

  • Gator::Queueable is the class end-users will extend. Right now, it only provides two things: the static dispatch method, and a logger for when your job is being executed.
  • Gator::Models::Job is a class that we use to interact with jobs in the database. Its contents:
  • We store the class name as the name of the job, so the job executor can create an instance of the job class. Unfortunately, this means that if we rename the class after enqueuing a job, but before it is executed, our executor might crash, since the job class no longer exists. (This would also happen if we used the serialize-store-deserialize approach—there'd be no class to deserialize to.)

This is a known limitation of such job systems. It can be fixed by giving every job a name that's independent of the class name, and looking that up instead. Example:

However, this pushes the problem to another layer. What happens if you want to change the custom name? Similar problem. There isn't a full solution for this, but it isn't a very frequent problem, and you can usually find several workarounds.

And now updating our route, and visiting it:

Okay, great! We can enqueue jobs now. That's the easy part.

The queue worker

Our queue worker will poll the database for new jobs. If there are any, it will try to reserve one.
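The real implementation below is Ruby (with Sequel), but the reservation step is easier to see in a quick sketch. Here's a rough Python/SQLite version of "claim one waiting, unreserved job", using the jobs table, state column, and reserved_by column described above (any other column names are assumptions):

    import sqlite3
    import uuid

    def reserve_next_job(conn: sqlite3.Connection, worker_id: str):
        """Atomically claim one waiting, unreserved job for this worker."""
        with conn:  # wraps both statements in a transaction
            row = conn.execute(
                "SELECT id FROM jobs "
                "WHERE state = 'waiting' AND reserved_by IS NULL "
                "ORDER BY id LIMIT 1"
            ).fetchone()
            if row is None:
                return None
            # The UPDATE re-checks reserved_by, so a job another worker grabbed
            # in the meantime simply isn't updated and we return None.
            updated = conn.execute(
                "UPDATE jobs SET reserved_by = ?, state = 'executing' "
                "WHERE id = ? AND reserved_by IS NULL",
                (worker_id, row[0]),
            ).rowcount
        return row[0] if updated == 1 else None

    conn = sqlite3.connect("queue.db")
    conn.execute(
        "CREATE TABLE IF NOT EXISTS jobs "
        "(id INTEGER PRIMARY KEY, name TEXT, args TEXT, "
        " state TEXT DEFAULT 'waiting', reserved_by TEXT)"
    )
    print(reserve_next_job(conn, worker_id=str(uuid.uuid4())))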

Let's flesh this out into a Worker class:

Finally, a CLI script that instantiates and starts a Worker :

Note the --require option. I've added this so we can load the user's app before starting the worker, otherwise we'll get errors for undefined classes.

Now we start the worker ( boot.rb is the file that loads my app):

And it immediately executes our waiting job:

We can enqueue more jobs, and process them almost instantly:

One limitation of the reserved_by system is that, if the worker is somehow killed while processing a job, that job will never be picked up again. To fix this, we could add a reserved_at column that we use to decide if a job is still locked. But I won't bother with that, since we're switching to Redis later, which makes time-based locks easier.

Concurrency

Let's test our concurrency setup. What happens when we have multiple queue workers? I'll try enqueueing three jobs with three workers listening:

Well, it's a bit disappointing. All three jobs are processed by the same worker.


There are a few reasons for this:

  • The job doesn't do any real work; it just prints a log line. That takes almost no time. If you look at the timestamps, you'll see that all three jobs are executed within a second.
  • Our worker loop algorithm only sleeps if there weren't any jobs. As long as there's a job to process, the worker will keep processing.

These two factors combined mean that the worker that picks up the first job will finish it, and pick up the next, and so on, before the other workers even get to check for jobs again (after 5s). The impact of more workers will only be seen when we have a lot of jobs, or our jobs actually do some work that takes time.
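For reference, the loop shape described above looks roughly like this (a Python sketch; reserve_next_job and execute_job are placeholders for the real Ruby logic):

    import time

    POLL_INTERVAL = 5  # seconds; matches the "check again after 5s" behaviour above

    def run_worker(reserve_next_job, execute_job):
        """Execute jobs back-to-back; only sleep when the queue is empty."""
        while True:
            job = reserve_next_job()
            if job is None:
                time.sleep(POLL_INTERVAL)  # nothing to do, so back off
                continue
            execute_job(job)  # a busy worker loops straight back for the next job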

Let's see what happens if I add a little "work" to the job:

Ah, that's better. One worker still has nothing to do, but the other two share the three jobs. And none is processed twice.

Handling failures

But what if a job fails? Right now, it would crash our worker. So one more thing for today: let's add some basic error handling.

For now, all we'll do is record it when an error happens. We just need to adjust a few methods in our worker:
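Those adjusted methods aren't reproduced in this copy, but the idea is just a try/except around job execution. A rough Python sketch (the succeeded state and the error_details and failed_at columns are assumptions layered on top of the schema described earlier):

    import traceback
    from datetime import datetime, timezone

    def execute_job_safely(job, conn):
        """Run the job; on failure, record the error instead of crashing the worker."""
        try:
            job.run()  # `job` stands in for an object wrapping the stored row
            conn.execute(
                "UPDATE jobs SET state = 'succeeded', reserved_by = NULL WHERE id = ?",
                (job.id,),
            )
        except Exception:
            conn.execute(
                "UPDATE jobs SET state = 'failed', reserved_by = NULL, "
                "error_details = ?, failed_at = ? WHERE id = ?",
                (traceback.format_exc(), datetime.now(timezone.utc).isoformat(), job.id),
            )
        conn.commit()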

And I'll edit the job so it randomly fails:

And now, when I queue a couple of jobs, some fail:


And we can see them marked as failed in the database, complete with the error details:


Okay, that's a good start. You can see the full code at this point on this commit .

Check out the next post in the series.



Queue Data Structures: How to Build a Node Task Queue

Craig Buckler

This tutorial explains queue data structures and demonstrates queuing systems. Queues are often used to process long-running tasks such as email newsletter delivery. Below, you’ll build a simple Node task queue.

It’s not always practical to execute a task the moment it’s requested.

Consider an email newsletter administration system. After writing, an administrator must hit a big red “SEND NOW” button. The application could send every email immediately and show a “completed” response. That would work for a dozen messages, but how long would it take for 1,000 subscribers or more? The browser request would time out before the process completed.

Another example: a user can upload any number of photographs to a gallery application. The system resizes and sharpens each image for alternative dimensions. This process could run on upload, but it would incur a delay for every image.

It’s more effective to decouple tasks in these situations. The user receives an instant response but task processing occurs in the background. Other applications or servers handle tasks and schedule re-attempts on failure. The user can receive alerts or examine logs to determine progress.

What Are Queue Data Structures?

A queue is a data structure which holds a collection of items:

  • Any process can send (or enqueue ) an item at any time — such as send newsletter X to recipient Y.
  • Any process can receive (or dequeue ) the item at the front of the queue — for example, the item that’s been in the queue for longest.

A queue is a first-in-first-out (FIFO) structure: the first item added to the queue will be the first one out.

A Basic JavaScript Task Queue Data Structure

You can create a task queue using a JavaScript array. The push() method adds an item to the end of an Array while the shift() method removes and returns an item from the start:

Your queue data structures can hold any data in individual array elements. You can push strings, numbers, Booleans, other arrays, or objects.

You can use an ES6 class to define any number of separate queues:

These simple queue data structures may be useful for less critical client-side code such as queuing UI updates so processing occurs in a single DOM update. localStorage or IndexedDB can offer a level of data persistence if necessary.

Queuing Platforms

In-memory queues are less practical for complex server applications:

  • Two or more separate applications can’t (easily) access the same queue.
  • Queue data disappears when the application terminates.

Purpose-built message-broker software provides more robust queuing. Platforms vary, but offer features such as:

  • data persistence in a choice of databases with replication, sharding, and clustering options
  • a range of access protocols, often including HTTP and Web Sockets
  • any number of separate queues
  • delayed messaging, where message processing can occur at a later time
  • transaction-like support, where a message is re-queued when processing isn’t confirmed
  • publish-subscribe patterns, where applications receive an event when a new item appears on a queue

Message-broker software includes Redis , RabbitMQ , Apache ActiveMQ , and Gearman . Cloud messaging services include Amazon SQS , Azure Service Bus , and Google Pub/Sub .

These may be viable options for enterprise-level applications. Yet they could be overkill if you have simpler requirements and already use a database.

Use MongoDB as Our Node Task Queue’s Message Broker

It’s possible to develop a sophisticated Node task queue system to manage queue data structures in a couple of hundred lines of code.

The queue-mongodb module described here uses MongoDB for data storage, but the same concepts could be adopted by any SQL or NoSQL database. The code is available on GitHub and npm .

Node Task Queue Project: Getting Started

Make sure you have Node.js 14 or above installed, then create a new project folder such as queue-test . Add a new package.json file:

Note: "type": "module" configures the project to use ES6 modules. The "scripts" will send and receive queued items.

Install the queue-mongodb module:

Then create a .env file with your MongoDB database connection credentials. For example:

Note: this creates a queue collection ( QUEUE_DB_COLL ) in the qdb database ( QUEUE_DB_NAME ). You can use an existing database, but make sure the collection doesn’t clash with another.

Database read/write access must be granted to the user root ( QUEUE_DB_USER ) with password mysecret ( QUEUE_DB_PASS ). Set both values blank if no authentication is required.

Start a MongoDB database if it’s not already running. Those with Docker and Docker Compose can create a new docker-compose.yml file:

Then run docker-compose up to download and start MongoDB with a persistent data volume.

Docker is available on Linux, macOS, and Windows 10. See the Docker installation instructions .

Create a new send.js file to add randomly generated email messages to a queue named news :

Execute it with npm run send and you’ll see output such as this:

The .send() method returns a qItem object containing:

  • the MongoDB document _id
  • the date/time the item was originally queued, and
  • a copy of the message data

Run the script any number of times to add further items to the queue. The number of queued items will increase with every run.

Now create a new receive.js file to retrieve messages from the same Node task queue:

Run npm run receive to fetch and process queued items:

No email is sent in this example, but that could be implemented using Nodemailer or another suitable module.

If processing fails — perhaps because the mail server is down — an item can be re-queued with this:

The second argument (600) is an optional number of seconds or a future date. This command re-queues the item after 600 seconds (ten minutes) have elapsed.

This is a simple example, but any application can send data to any number of queues. Another process, perhaps started as a cron job, can receive and process items when necessary.

How the queue-mongodb Module Works

The type string passed to the class constructor defines a queue name. The .send() method creates a new MongoDB document when passed data to add to the queue. The MongoDB document contains:

  • A MongoDB _id (the creation date/time is encoded within the value).
  • The queue type .
  • A processing date/time value named proc . It’s possible to set a future time but the current time is the default.
  • The item data . This can be anything: a Boolean, number, string, array, object, and so on.

The .receive() method locates the oldest document that has a matching type and a proc date/time in the past. The document is formatted, returned to the calling code, and deleted from the database.

The following sections describe the module in further detail.

queue-mongodb Module: Initialization

The dotenv module reads the .env environment variables if necessary. A database connection object is created using the official mongodb driver module :

The qCollection variable holds a reference to the database’s queue collection (defined by QUEUE_DB_COLL ). It’s created and returned by the dbConnect() function, which also defines the collection schema and indexes when necessary. All Queue methods run const q = await dbConnect(); to get the collection reference:

The dbClose() function closes the database connection:

queue-mongodb Module: Queue Constructor

The Queue constructor sets the queue type or name:

queue-mongodb Module: Queue.send() Method

The .send() method adds data to the queue with the appropriate type . It has an optional delayUntil parameter, which adds an item to the queue at a future time by specifying a number of seconds or a Date() .

The method inserts a new document into the database and returns a qItem object ( { _id , sent , data } ) or null if unsuccessful:

queue-mongodb Module: Queue.receive() Method

The .receive() method retrieves and deletes the oldest queued item in the database with a specific type and a proc date/time in the past. It returns a qItem object ( { _id , sent , data } ) or null if nothing is available or an error occurs:

queue-mongodb Module: Queue.remove() Method

The .remove() method deletes the queued item identified by a qItem object ( { _id , sent , data } ) returned by the .send() method. It can be used to remove a queued item regardless of its position in the queue.

The method returns the number of deleted documents (normally 1) or null when an error occurs:

queue-mongodb Module: Queue.purge() Method

The .purge() method deletes all queued items of the same type and returns the number of deletions:

queue-mongodb Module: Queue.count() Method

The .count() method returns the number of queued items of the same type :

queue-mongodb Module: Queue.close() Method

The .close() method runs the dbClose() function to terminate the database connection so the Node.js event loop can end:

A New Queue

Queues are a consideration for any web application with computationally expensive functions that could cause a bottleneck. They can improve performance and maintenance by decoupling applications into smaller, faster, more robust processes. Dedicated message broker software is an option, but simple queuing systems like the Node task queue we built today are possible in a few dozen lines of code.

FAQs About Using the Queue Data Structure in Node

A queue is a linear data structure that follows the First-In-First-Out (FIFO) principle, where the first element added is the first to be removed. In a queue, elements are added to the rear and removed from the front.

Queues are useful for managing tasks or data in a sequential manner. They are commonly used for background processing, task scheduling, event handling, and managing resources like connections and requests.

You can implement a queue using arrays or linked lists. In Node.js, you can use the built-in array data structure or implement a custom queue using JavaScript objects and methods.

The key difference is the order of removal. In a queue, the first element added is the first to be removed (FIFO), while in a stack, the last element added is the first to be removed (LIFO).

Queues are used for task scheduling, managing concurrent requests, processing messages in a message queue, handling event listeners, and more. They are vital for scenarios where order and sequence matter.


Task Queue Overview

This page describes what task queues are, and when and how to use them. Task queues let applications perform work, called tasks , asynchronously outside of a user request. If an app needs to execute work in the background, it adds tasks to task queues . The tasks are executed later, by worker services.

The Task Queue service is designed for asynchronous work. It does not provide strong guarantees around the timing of task delivery and is therefore unsuitable for interactive applications where a user is waiting for the result.

Push queues and pull queues

Task queues come in two flavors, push and pull . The manner in which the Task Queue service dispatches task requests to worker services is different for the different queues.

Push queues run tasks by delivering HTTP requests to App Engine worker services. They dispatch these requests at a reliable, steady rate and guarantee reliable task execution. Because you can control the rate at which tasks are sent from the queue, you can control the workers' scaling behavior and hence your costs.

Because tasks are executed as requests targeted at App Engine services, they are subject to stringent deadlines. Tasks handled by automatic scaling services must finish in ten minutes. Tasks handled by basic and manual scaling services can run for up to 24 hours.

All task queue tasks are performed asynchronously . The application that creates the task hands it off to the queue. The originating application is not notified whether or not the task completes, or if it was successful.

If a worker fails to process a task, the Task Queue service provides the queue with a retry mechanism, so the task can be retried a finite number of times.

Push queues

One typical push queue use case is a "slow" operation. Consider a social network messaging system. Every time a user sends a message, the network needs to update the followers of the sender. This can be a very time-consuming operation. Using a push queue, the application can enqueue a task for each message as it arrives to be dispatched to a worker service for processing. When the worker receives the task request, it can retrieve the sender's list of followers and update the DB for each one. The worker can be made even more efficient by enqueuing another push task for each database update.

Another use for push queues is scheduled tasks. Imagine an application that implements an ad campaign. A group of tasks written to send out emails can be added to a push queue with instructions to withhold the tasks until a specified time in the future. When the due date arrives, the Task Queue service will begin to issue requests to execute the tasks.

Pull queues

Pull queues work well when you need to batch tasks together for efficient execution. One solution takes advantage of the ability to attach a tag to a pull task. Workers can lease a group of tasks that have the same tag. A typical example might be an app that maintains leaderboards for numerous different games, with many players and groups constantly in play. Every time there is a new high score, the app can enqueue a pull task with the score and the player, and use the game ID as a task tag. A worker periodically "wakes up", leases a group of tasks with the same game ID, and updates the leaderboard. You can lease tasks explicitly, using a specified tag value, or let the service decide which group of similarly tagged tasks to send.

What's next

  • Read about push queues
  • Read about pull queues

Except as otherwise noted, the content of this page is licensed under the Creative Commons Attribution 4.0 License , and code samples are licensed under the Apache 2.0 License . For details, see the Google Developers Site Policies . Java is a registered trademark of Oracle and/or its affiliates.

Last updated 2024-02-23 UTC.

Developing an Asynchronous Task Queue in Python


This tutorial looks at how to implement several asynchronous task queues using Python's multiprocessing library and Redis .

Queue Data Structures


A queue is a First-In-First-Out ( FIFO ) data structure.

  • an item is added at the tail ( enqueue )
  • an item is removed at the head ( dequeue )


You'll see this in practice as you code out the examples in this tutorial.

Let's start by creating a basic task:

So, get_word_counts finds the twenty most frequent words from a given text file and saves them to an output file. It also prints the current process identifier (or pid) using Python's os library.
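The tasks.py listing was dropped from this copy; a rough equivalent, based on the description above (top twenty words, NLTK stopwords filtered out, pid printed), might look like this (file paths mirror the project layout shown later, and everything else is an approximation):

    # tasks.py (approximate reconstruction)
    import os
    from collections import Counter

    from nltk.corpus import stopwords

    COMMON_WORDS = set(stopwords.words("english"))

    def get_word_counts(filename):
        print(f"Processing {filename} in process {os.getpid()}")
        with open(f"data/{filename}") as f:
            words = [
                w for w in f.read().lower().split()
                if w.isalpha() and w not in COMMON_WORDS
            ]
        top_twenty = Counter(words).most_common(20)  # twenty most frequent words
        with open(f"output/{filename}", "w") as out:
            out.write(str(top_twenty))
        return top_twenty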

Create a project directory along with a virtual environment. Then, use pip to install NLTK :

Once installed, invoke the Python shell and download the stopwords corpus :

If you experience an SSL error, refer to this article. Example fix:

>>> import nltk
>>> nltk.download('stopwords')
[nltk_data] Error loading stopwords: <urlopen error [SSL:
[nltk_data]     CERTIFICATE_VERIFY_FAILED] certificate verify failed:
[nltk_data]     unable to get local issuer certificate (_ssl.c:1056)>
False
>>> import ssl
>>> try:
...     _create_unverified_https_context = ssl._create_unverified_context
... except AttributeError:
...     pass
... else:
...     ssl._create_default_https_context = _create_unverified_https_context
...
>>> nltk.download('stopwords')
[nltk_data] Downloading package stopwords to
[nltk_data]     /Users/michael.herman/nltk_data...
[nltk_data]   Unzipping corpora/stopwords.zip.
True

Add the above tasks.py file to your project directory but don't run it quite yet.

We can run this task in parallel using the multiprocessing library:

Here, using the Pool class, we processed four tasks with two processes.

Did you notice the map_async method? There are essentially four different methods available for mapping tasks to processes. When choosing one, you have to take multi-args, concurrency, blocking, and ordering into account:

Without both close and join , garbage collection may not occur, which could lead to a memory leak.

  • close tells the pool not to accept any new tasks
  • join tells the pool to exit after all tasks have completed
Following along? Grab the Project Gutenberg sample text files from the "data" directory in the simple-task-queue repo, and then add an "output" directory. Your project directory should look like this:

├── data
│   ├── dracula.txt
│   ├── frankenstein.txt
│   ├── heart-of-darkness.txt
│   └── pride-and-prejudice.txt
├── output
├── simple_pool.py
└── tasks.py
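A sketch of what simple_pool.py might contain, based on the description above (a Pool of two processes, map_async over the four files, then close and join; the exact structure is an assumption):

    # simple_pool.py (sketch; assumes the get_word_counts task from tasks.py above)
    import multiprocessing
    import time

    from tasks import get_word_counts

    PROCESSES = 2
    BOOKS = [
        "dracula.txt",
        "frankenstein.txt",
        "heart-of-darkness.txt",
        "pride-and-prejudice.txt",
    ]

    def run():
        print(f"Running with {PROCESSES} processes!")
        start = time.time()
        with multiprocessing.Pool(PROCESSES) as pool:
            pool.map_async(get_word_counts, BOOKS)  # schedule all four tasks
            pool.close()  # no more tasks will be submitted
            pool.join()   # wait for the workers to finish
        print(f"Time taken = {time.time() - start:.3f}")

    if __name__ == "__main__":
        run()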

It should take less than a second to run:

This script ran on an i9 MacBook Pro with 16 cores.

So, the multiprocessing Pool class handles the queuing logic for us. It's perfect for running CPU-bound tasks or really any job that can be broken up and distributed independently. If you need more control over the queue or need to share data between multiple processes, you may want to look at the Queue class.

For more on this along with the difference between parallelism (multiprocessing) and concurrency (multithreading), review the Speeding Up Python with Concurrency, Parallelism, and asyncio article.

Let's look at a simple example:

The Queue class, also from the multiprocessing library, is a basic FIFO (first in, first out) data structure. It's similar to the queue.Queue class, but designed for interprocess communication. We used put to enqueue an item to the queue and get to dequeue an item.
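A minimal sketch of that put/get example (the file name and the items enqueued are assumptions):

    # simple_queue.py (sketch)
    import multiprocessing

    def run():
        q = multiprocessing.Queue()

        print("Enqueuing...")
        for book in ["dracula.txt", "frankenstein.txt"]:
            q.put(book)      # enqueue an item at the tail

        print("Dequeuing...")
        while not q.empty():
            print(q.get())   # dequeue an item from the head (FIFO order)

    if __name__ == "__main__":
        run()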

Check out the Queue source code for a better understanding of the mechanics of this class.

Now, let's look at a more advanced example:

Here, we enqueued 40 tasks (ten for each text file) to the queue, created separate processes via the Process class, used start to start running the processes, and, finally, used join to complete the processes.
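In sketch form, that script might look roughly like this (it follows the description above -- ten tasks per book, a process_tasks helper, Process/start/join -- but the details are approximations):

    # simple_task_queue.py (sketch)
    import multiprocessing

    from tasks import get_word_counts

    PROCESSES = max(multiprocessing.cpu_count() - 1, 1)

    def process_tasks(task_queue):
        # Keep pulling file names off the shared queue until it is drained.
        while not task_queue.empty():
            book = task_queue.get()
            get_word_counts(book)
        return True

    def run():
        books = [
            "dracula.txt",
            "frankenstein.txt",
            "heart-of-darkness.txt",
            "pride-and-prejudice.txt",
        ]
        task_queue = multiprocessing.Queue()

        # Enqueue 40 tasks: ten for each text file.
        for _ in range(10):
            for book in books:
                task_queue.put(book)

        processes = []
        for _ in range(PROCESSES):
            p = multiprocessing.Process(target=process_tasks, args=(task_queue,))
            processes.append(p)
            p.start()

        for p in processes:
            p.join()

    if __name__ == "__main__":
        run()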

It should still take less than a second to run.

Challenge : Check your understanding by adding another queue to hold completed tasks. You can enqueue them within the process_tasks function.

The multiprocessing library provides support for logging as well:

To test, change task_queue.put("dracula.txt") to task_queue.put("drakula.txt") . You should see the following error outputted ten times in the terminal:

Want to log to disc?

Again, cause an error by altering one of the file names, and then run it. Take a look at process.log . It's not quite as organized as it should be since the Python logging library does not use shared locks between processes. To get around this, let's have each process write to its own file. To keep things organized, add a logs directory to your project folder:

Moving right along, instead of using an in-memory queue, let's add Redis into the mix.

Following along? Download and install Redis if you do not already have it installed. Then, install the Python interface:

(env)$ pip install redis==4.5.5

We'll break the logic up into four files:

  • redis_queue.py creates new queues and tasks via the SimpleQueue and SimpleTask classes, respectively.
  • redis_queue_client enqueues new tasks.
  • redis_queue_worker dequeues and processes tasks.
  • redis_queue_server spawns worker processes.

Here, we defined two classes, SimpleQueue and SimpleTask :

  • SimpleQueue creates a new queue and enqueues, dequeues, and gets the length of the queue.
  • SimpleTask creates new tasks, which are used by the instance of the SimpleQueue class to enqueue new tasks, and processes new tasks.
Curious about lpush(), brpop(), and llen()? Refer to the Command reference page. (The brpop() function is particularly cool because it blocks the connection until a value exists to be popped!)
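A bare-bones sketch of what redis_queue.py's two classes might look like (the use of pickle for serialization and the exact method names are assumptions based on the description above):

    # redis_queue.py (sketch)
    import pickle
    import uuid

    class SimpleQueue:
        def __init__(self, conn, name):
            self.conn = conn  # a redis.Redis connection
            self.name = name

        def enqueue(self, func, *args):
            task = SimpleTask(func, *args)
            self.conn.lpush(self.name, pickle.dumps(task))  # push onto the list
            return task.id

        def dequeue(self):
            _queue, serialized = self.conn.brpop(self.name)  # blocks until a task exists
            task = pickle.loads(serialized)
            task.process_task()
            return task

        def get_length(self):
            return self.conn.llen(self.name)

    class SimpleTask:
        def __init__(self, func, *args):
            self.id = str(uuid.uuid4())
            self.func = func
            self.args = args

        def process_task(self):
            self.func(*self.args)

    # Usage (hypothetical): SimpleQueue(redis.Redis(), "sample_tasks").enqueue(get_word_counts, "dracula.txt")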

This module will create a new instance of Redis and the SimpleQueue class. It will then enqueue 40 tasks.

If a task is available, the dequeue method is called, which then de-serializes the task and calls the process_task method (in redis_queue.py ).

The run method spawns four new worker processes.

You probably don’t want four processes running at once all the time, but there may be times that you will need four or more processes. Think about how you could programmatically spin up and down additional workers based on demand.

To test, run redis_queue_server.py and redis_queue_client.py in separate terminal windows:


Check your understanding again by adding logging to the above application.

In this tutorial, we looked at a number of asynchronous task queue implementations in Python. If the requirements are simple enough, it may be easier to develop a queue in this manner. That said, if you're looking for more advanced features -- like task scheduling, batch processing, job prioritization, and retrying of failed tasks -- you should look into a full-blown solution. Check out Celery , RQ , or Huey .

Grab the final code from the simple-task-queue repo.


Optimizing task queues with Celery and Flask


If you’ve stumbled upon this article, chances are you’re familiar with Flask and you’re working on adding a feature to your web app that takes quite a few seconds (if not more) to execute. Maybe you want to know if there is a better or faster way to do so.


Some common examples include:

  • Calling a third-party API to fetch some data based on user input
  • Sending an email to the user on Sign Up
  • Generating a PDF report

These types of tasks block the request/response cycle until they complete, meaning the user would need to wait a while.

To offload long-running tasks like these, you can use Celery, which provides a mechanism to hand them off to separate Celery workers.

Celery communicates via messages, usually using a broker to mediate between clients and workers. To initiate a task, the Celery client adds a message to the queue, and the broker then delivers that message to a worker.

The most commonly used brokers are Redis and RabbitMQ . We’ll set up a Redis server locally to make use of this mechanism.

Prerequisites

  • Python 3.6+
  • Virtualenv v20+

Additionally, intermediate knowledge of Python and Flask is expected. Everything else will be explained as the article progresses.

Setting up the project

Download the starter project and set it up using the following commands:

Open http://127.0.0.1:5000/ in your browser, and, if everything works well, you should be able to see “Hello, world!”.


Next, let’s add a route that will contain a Button that, when clicked, will trigger a mock long-running task, such as sending an email, generating a PDF report, calling a third-party API, etc.

We’ll mock this API by using time.sleep() , which will block the running of the application for 15 seconds.

Open app.py and add the following block of code.

Make sure to import the time module by adding the following, along with the import statements at the top of the file:
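Neither listing is included in this copy. Based on the description later in the article (a /long_running_task route backed by def long_running_task() that blocks for 15 seconds with time.sleep()), the additions to app.py probably look roughly like this; the print statements and response text are assumptions:

    import time

    from flask import Flask

    app = Flask(__name__)

    @app.route("/long_running_task")
    def long_running_task():
        # Mock a slow job: block the request for 15 seconds.
        print("Starting the long-running task...")
        time.sleep(15)
        print("Task complete.")
        return "Long-running task completed!"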

Next, create a directory named templates in the root of the project. Inside that, create a new file named tasks.html and add the following:

Your project structure should look something like this:

Back in the terminal, stop and restart the Flask server again, then open http://127.0.0.1:5000/tasks in your browser. You should see the tasks.html page rendered with a single button.


Now, when you click on the Trigger Long-Running Task button, it will send a request to the route /long_running_task , which will execute the function def long_running_task() as defined in the app.py file.

Notice that the page will be in the “loading” state for 15 seconds, so your application is stuck in that state and cannot perform any other operation until the current one is complete.

After 15 seconds, you should see the task completed, and the expected response in the browser.


Also, note that you’ll be able to see the print statements in the terminal window while the long-running task is being executed.


Now, let’s see how we can use Celery to run this task in the background.

In case you had any problems, you should be able to see the current state of your project here.

Setting up Celery and Redis

You have already installed the Celery python package in the initial setup. To confirm the installation of the package, you can run pip freeze in your terminal window with the virtualenv activated to see all the packages installed.


Next, you need to install Redis Server on your local machine. You can find the official installation instructions here .

Now, let’s set up Celery.

Getting started with Celery

Create a new file in the project root called celery_utils.py . This will be used to initialize the Celery app instance, similar to how we have a Flask app initialized in app.py . Add the following code to the file:

Here’s a brief explanation:

  • The Celery python package is imported
  • name : the name of the Celery application
  • backend : this is the URL of the backend to be used, which, in this case, is Redis, and the host URL is defined with variable CELERY_BROKER_URL
  • broker : similar to the backend, it’s required to define the URL of the broker, which is also the Redis server
  • ContextTask : a task base class that runs each task inside the Flask application context, so task code can use the app's configuration and extensions
  • Finally, the function returns the configured Celery app instance
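The celery_utils.py listing isn't reproduced here. A common shape for this file, matching the description above, is the standard Flask/Celery factory pattern; the make_celery name and the hard-coded Redis URL below are assumptions:

    # celery_utils.py (sketch)
    from celery import Celery

    CELERY_BROKER_URL = "redis://localhost:6379/0"

    def make_celery(app):
        """Create a Celery instance wired to the Flask app and the local Redis server."""
        celery = Celery(
            app.import_name,            # name of the Celery application
            backend=CELERY_BROKER_URL,  # where task results are stored (Redis here)
            broker=CELERY_BROKER_URL,   # where task messages are queued (also Redis)
        )

        class ContextTask(celery.Task):
            """Run every task inside the Flask application context."""
            def __call__(self, *args, **kwargs):
                with app.app_context():
                    return self.run(*args, **kwargs)

        celery.Task = ContextTask
        return celery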

Next, let’s use Celery to define a long-running task. Make the following changes in app.py :

Add the following near the import statements.

Add the following after the statements initializing the Flask app:

Next, add the following block of code toward the bottom of the file:

Here, we simply defined a function named sending_email_with_celery() , which will mock the functionality of sending an email that could take 15 seconds to complete.

However, to make this function run as a background task, the decorator @celery.task is added on the line just above the function definition.

If you’re not familiar with decorators in Python, here’s a good article to get started .

Finally, define a route to trigger this function:

In this code block, we define the route /long_running_task_celery , which triggers a function as a Celery task. Notice that the function is called by using the delay() method.

This indicates that we want to run this function as a Celery task, not as a regular Python function.
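The route itself can be as small as this sketch (the /long_running_task_celery path comes from the article; the response text is an assumption):

    # app.py (continued)

    @app.route("/long_running_task_celery")
    def long_running_task_celery():
        # .delay() enqueues the task and returns immediately;
        # a Celery worker picks it up and runs it in the background.
        sending_email_with_celery.delay()
        return "Task handed off to the Celery worker!"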

Finally, to see this in action, let’s add another button in tasks.html to trigger this function.

Note: Here’s the GitHub repo for this section.

Time to see it in action!

Make sure you have the Flask server running in a terminal window.


In another terminal window, cd to the root of the project and run the following command to start the Celery worker.


Open http://127.0.0.1:5000/tasks  in your browser, where you should see two buttons:

  • Triggers a long-running function with Python
  • Triggers a long-running function with Celery

We’ve already seen that if we trigger a long-running function with Python, the server is stuck until the execution of that function is complete.

Now, if you click on the button Trigger Long-Running Task with Celery , you’ll see that the page instantly redirects to the route /long_running_task_celery , and you’ll see an expected output in the browser window.


In the background, the execution of the function is being done by Celery. To see the logs of the function running, switch to the terminal window where you started the Celery worker. It should look something like this:


That’s it! You now know how to set up and run long-running tasks with Celery in your Flask web application. Here’s a quick recap. To run a function as a Celery task, you need to:


  • Import the instance of Celery app in your file
  • Add decorator @celery.task on top of the function definition
  • Run the function using the function_name.delay() method



Enqueue functions with Cloud Tasks

Task queue functions take advantage of Google Cloud Tasks to help your app run time-consuming, resource-intensive, or bandwidth-limited tasks asynchronously, outside your main application flow.

For example, imagine that you want to create backups of a large set of image files that are currently hosted on an API with a rate limit. In order to be a responsible consumer of that API, you need to respect their rate limits. Plus, this kind of long-running job could be vulnerable to failure due to timeouts and memory limits.

To mitigate this complexity, you can write a task queue function that sets basic task options like scheduleTime and dispatchDeadline , and then hands the function off to a queue in Cloud Tasks. The Cloud Tasks environment is designed specifically to ensure effective congestion control and retry policies for these kinds of operations.

The Firebase SDK for Cloud Functions for Firebase v3.20.1 and higher interoperates with Firebase Admin SDK v10.2.0 and higher to support task queue functions.

Using task queue functions with Firebase can result in charges for Cloud Tasks processing. See Cloud Tasks pricing for more information.

Create task queue functions

To use task queue functions, follow this workflow:

  • Write a task queue function using the Firebase SDK for Cloud Functions.
  • Test your function by triggering it with an HTTP request.
  • Deploy your function with the Firebase CLI. When deploying your task queue function for the first time, the CLI will create a task queue in Cloud Tasks with options (rate limiting and retry) specified in your source code.
  • Add tasks to the newly created task queue, passing along parameters to set up an execution schedule if needed. You can achieve this by writing the code using the Admin SDK and deploying it to Cloud Functions for Firebase.

Write task queue functions

Use onDispatch to get started writing task queue functions. An important part of writing a task queue function is to set per-queue retry and rate-limiting configuration. Code samples in this page are based on an app that sets up a service that backs up all images from NASA's Astronomy Picture of the Day :

Configure task queue functions

Task queue functions come with a powerful set of configuration settings to precisely control rate limits and retry behavior of a task queue:

  • retryConfig.maxAttempts=5 : Each task in the task queue is automatically retried up to 5 times. This helps mitigate transient errors like network errors or temporary service disruption of a dependent, external service.
  • retryConfig.minBackoffSeconds=60 : Each task is retried at least 60 seconds apart from each attempt. This provides a large buffer between each attempt so we don't rush to exhaust the 5 retry attempts too quickly.
  • rateLimits.maxConcurrentDispatch=6 : At most 6 tasks are dispatched at a given time. This helps ensure a steady stream of requests to the underlying function and helps reduce the number of active instances and cold starts.

Test task queue functions

Task queue functions in the Firebase Local Emulator Suite are exposed as simple HTTP functions. You can test an emulated task function by sending an HTTP POST request with a json data payload:

Deploy task queue functions

Deploy the task queue function using the Firebase CLI:

When deploying a task queue function for the first time, the CLI creates a task queue in Cloud Tasks with options (rate limiting and retry) specified in your source code.

If you encounter permissions errors when deploying functions, make sure that the appropriate IAM roles are assigned to the user running the deployment commands.

Enqueue task queue functions

Task queue functions can be enqueued in Cloud Tasks from a trusted server environment like Cloud Functions for Firebase using the Firebase Admin SDK for Node.js. If you're new to the Admin SDKs, see Add Firebase to a server to get started.

In a typical flow, the Admin SDK creates a new task, enqueues it in Cloud Tasks, and sets the configuration for the task:

  • scheduleDelaySeconds : The sample code tries to spread out execution of tasks by associating a delay of N minutes with the Nth task. This translates to triggering ~1 task/minute. Note that you can also use scheduleTime if you want Cloud Tasks to trigger a task at a specific time.
  • dispatchDeadlineSeconds : Maximum amount of time Cloud Tasks will wait for a task to complete. Cloud Tasks will retry the task following the retry configuration of the queue or until this deadline is reached. In the sample, the queue is configured to retry the task up to 5 times, but the task is automatically cancelled if the whole process (including retry attempts) takes more than 5 minutes.

Troubleshooting

Turn on Cloud Tasks logging.

Logs from Cloud Tasks contain useful diagnostic information like the status of the request associated with a task. By default, logs from Cloud Tasks are turned off due to the large volume of logs it can potentially generate on your project. We recommend you turn on the debug logs while you are actively developing and debugging your task queue functions. See Turning on logging .

IAM Permissions

You may see PERMISSION DENIED errors when enqueueing tasks or when Cloud Tasks tries to invoke your task queue functions. Ensure that your project has the following IAM bindings:

The identity used to enqueue tasks to Cloud Tasks needs cloudtasks.tasks.create IAM permission.

In the sample, this is the App Engine default service account

The identity used to enqueue tasks to Cloud Tasks needs permission to use the service account associated with a task in Cloud Tasks.

In the sample, this is the App Engine default service account .

See Google Cloud IAM documentation for instructions on how to add the App Engine default service account as a user of the App Engine default service account.

The identity used to trigger the task queue function needs cloudfunctions.functions.invoke permission.

Except as otherwise noted, the content of this page is licensed under the Creative Commons Attribution 4.0 License , and code samples are licensed under the Apache 2.0 License . For details, see the Google Developers Site Policies . Java is a registered trademark of Oracle and/or its affiliates.

Last updated 2024-02-21 UTC.

Why a database is not always the right tool for a queue based system

Imagine that you have a web application for customers to upload a text document, which then converts it into a PDF, and then emails it back to the customer. This site receives a lot of PDF creation requests every second and every request takes a second to process. An orderly, organized queue is needed to be able to handle all requests in a timely manner.

User scenario

  • Customer uploads a text document
  • Your application converts the text document into a PDF
  • Your application emails the PDF back to the customer

Database for a queue based system


In this kind of situation, you might consider using a database for your PDF job queue. Typically you would create a database table that has a queue with records representing PDF requests. You would then place a flag in the table representing which state the task is in and whether the task is completed or not.

You need to write code to insert new requests into the database; code that picks a request up from the database and changes a status column, with values such as "NEW" and "PROCESSING"; code that handles the request; more code that updates the status field to "FINISHED"; and more code to remove the request from the queue.

Database vs message queue

A message queue is built to handle this kind of scenario, making it easy to queue or dequeue messages.

Another thing that you should keep in mind when you build a solution where you let your database act as a queue, is that your application has to ask the server over and over again for the latest queued requests. Messages that have already been consumed need to be filtered out. The number of results received might need to be limited.

To operate effectively, you might need to poll the database quickly and frequently. Of course, this adds a significant load to the database.

Frequent DB queries

The database table is both read from and written to. Usually, it is a quick operation to add, update or query data from a table, but now you will have to deal with all of these happening in the same table. Queuing/dequeuing will impact each other under heavy loads, causing lock conditions, transaction deadlocks, IO timeouts, etc.

Note: As long as you don't have very high-performance throughput requirements and if you do not have to communicate with any other systems then a database will probably work fine.

Now imagine that your web application is getting used by more and more customers. You have to queue many PDF creation requests in your database and you want to be able to process more requests per second on a concurrent basis. You can connect more workers (processes that handle the requests) or you can scale your solution by adding more machines into your pool of resources. You can even add more power (CPU, RAM) to your existing machine to be able to handle the high load. You decide to connect more workers to your database.

Lock on messages and manual handling of the complexity

When a second worker is added, it’s possible that the two workers will try to handle the same request at the same time. You need to implement some kind of row lock on the queue so that the same request never gets picked up by concurrent receivers.


Each worker needs to acquire some sort of a lock on the request. Once the exclusive lock has been acquired for the row the system needs to handle the update (e.g. update a status to "PROCESSING"), and after that, the lock will need to be released (e.g. by committing the transaction) as soon as possible so that other workers can access the queue too.
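With PostgreSQL, for example, that claim-update-release cycle often looks like the following sketch, which uses psycopg2 and SKIP LOCKED so concurrent workers simply skip rows that are already being processed (the pdf_requests table, its columns, and the connection string are hypothetical):

    import psycopg2

    def claim_next_request(conn):
        """Claim one NEW request, mark it PROCESSING, and release the row lock quickly."""
        with conn:  # commits (or rolls back) when the block exits, releasing the lock
            with conn.cursor() as cur:
                cur.execute(
                    """
                    SELECT id
                    FROM pdf_requests
                    WHERE status = 'NEW'
                    ORDER BY id
                    LIMIT 1
                    FOR UPDATE SKIP LOCKED
                    """
                )
                row = cur.fetchone()
                if row is None:
                    return None
                cur.execute(
                    "UPDATE pdf_requests SET status = 'PROCESSING' WHERE id = %s",
                    (row[0],),
                )
                return row[0]

    # Usage (hypothetical DSN):
    # conn = psycopg2.connect("dbname=pdf_app")
    # request_id = claim_next_request(conn)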

You can also imagine what happens when one of your workers gets a network issue and can not process the request. The request has to be added back to the queue, the status has to be changed back somehow or you have to use transactions and lock it until you have processed the document - meaning that you will lock the queue for other workers.

All this complexity will have to be managed by the programmer so even more code has to be written. A decent message queue will take care of a lot of this automatically and you don't have to worry about any deadlocks or race conditions (when your system attempts to perform two or more operations at the same time).

Manual cleanup

If you don’t clean up your database table regularly or delete requests as soon as they have been handled, your database will keep growing until the performance of your database takes a hit. Eventually, it will become a serious operation problem for you.

The locking challenges and the scan process to find more work will make you run into scalability hang-ups and it will typically also slow down your database.

Message queues

We have now talked a lot about the limitations you might notice if you use a database for a queue-based system (when the system is heavily used, or when it needs to scale up as it grows). We have talked about the constant polling of messages from the queue, the manual cleanup of messages, deadlocks, race conditions, and failed processing that has to be handled manually. All of this makes it fairly clear that a database table is not the right tool for a queue-based system. It's time to understand when and why the message queue shines!

A message queue only needs to ensure that a message is stored and successfully processed, it doesn't need to bother with locking and writing transactions. You can receive your PDF processing request and let the worker do all the processing, the worker can then move on to the next message of the queue. Message queue scaling is much easier than scaling databases; all that needs to be done is to simply add another worker when the queue starts to fill up. If you get too many connections or some other operation that requires a lot of CPU, simply add more CPU power to your message queue server. In a message queue, you don’t need to worry about any race conditions or locks keeping you out of the system.

Pushed in real-time

Messages from a message queue are pushed in real-time instead of periodically polled from a database. A significantly higher volume of concurrent messages can be supported efficiently using a message queue. Messages in a message queue are automatically cleaned up after being received.

Acknowledgment

You may wonder what happens if one of the workers starts to handle a request and dies when it is partly finished. If a worker dies, the task should be delivered to another worker or held in the queue until the worker is ready to receive the request again. In order to make sure a message is never lost, a good message queue supports message acknowledgments. An acknowledgment is sent back from the worker to tell the message queue that a particular message has been received and processed and that the message queue is free to delete it. If a worker dies without sending an acknowledgment, the message queue will understand that a message wasn't processed fully and will redeliver it to the queue and to another worker. That way you can be sure that no message is lost.

Databases are great for storing information, like product information on a webshop. It is great when you need to ask questions about the data or sort it in different categories. Using a message queue is highly recommended when you need to process a high-volume of asynchronous messages.

A message queue is perfect to use when you want a high performance, highly concurrent and scalable system that can process thousands of messages per second concurrently across many servers/processes.

If you feel that it's about time to get started with your first service using a message queue, check out our article: RabbitMQ for beginners, what is RabbitMQ?


Laravel Queues

While building your web application, you may have some tasks, such as parsing and storing an uploaded CSV file, that take too long to perform during a typical web request. Thankfully, Laravel allows you to easily create queued jobs that may be processed in the background. By moving time intensive tasks to a queue, your application can respond to web requests with blazing speed and provide a better user experience to your customers.

Laravel queues provide a unified queueing API across a variety of different queue backends, such as Amazon SQS , Redis , or even a relational database.

Laravel's queue configuration options are stored in your application's config/queue.php configuration file. In this file, you will find connection configurations for each of the queue drivers that are included with the framework, including the database, Amazon SQS , Redis , and Beanstalkd drivers, as well as a synchronous driver that will execute jobs immediately (for use during local development). A null queue driver is also included which discards queued jobs.

[!NOTE] Laravel now offers Horizon, a beautiful dashboard and configuration system for your Redis powered queues. Check out the full Horizon documentation for more information.

Before getting started with Laravel queues, it is important to understand the distinction between "connections" and "queues". In your config/queue.php configuration file, there is a connections configuration array. This option defines the connections to backend queue services such as Amazon SQS, Beanstalk, or Redis. However, any given queue connection may have multiple "queues" which may be thought of as different stacks or piles of queued jobs.

Note that each connection configuration example in the queue configuration file contains a queue attribute. This is the default queue that jobs will be dispatched to when they are sent to a given connection. In other words, if you dispatch a job without explicitly defining which queue it should be dispatched to, the job will be placed on the queue that is defined in the queue attribute of the connection configuration:

Some applications may not need to ever push jobs onto multiple queues, instead preferring to have one simple queue. However, pushing jobs to multiple queues can be especially useful for applications that wish to prioritize or segment how jobs are processed, since the Laravel queue worker allows you to specify which queues it should process by priority. For example, if you push jobs to a high queue, you may run a worker that gives them higher processing priority:

In order to use the database queue driver, you will need a database table to hold the jobs. To generate a migration that creates this table, run the queue:table Artisan command. Once the migration has been created, you may migrate your database using the migrate command:

Finally, don't forget to instruct your application to use the database driver by updating the QUEUE_CONNECTION variable in your application's .env file:

In order to use the redis queue driver, you should configure a Redis database connection in your config/database.php configuration file.

[!WARNING] The serializer and compression Redis options are not supported by the redis queue driver.

Redis Cluster

If your Redis queue connection uses a Redis Cluster, your queue names must contain a key hash tag . This is required in order to ensure all of the Redis keys for a given queue are placed into the same hash slot:

When using the Redis queue, you may use the block_for configuration option to specify how long the driver should wait for a job to become available before iterating through the worker loop and re-polling the Redis database.

Adjusting this value based on your queue load can be more efficient than continually polling the Redis database for new jobs. For instance, you may set the value to 5 to indicate that the driver should block for five seconds while waiting for a job to become available:

[!WARNING] Setting block_for to 0 will cause queue workers to block indefinitely until a job is available. This will also prevent signals such as SIGTERM from being handled until the next job has been processed.

Other Driver Prerequisites

The following dependencies are needed for the listed queue drivers. These dependencies may be installed via the Composer package manager:

  • Amazon SQS: aws/aws-sdk-php ~3.0
  • Beanstalkd: pda/pheanstalk ~4.0
  • Redis: predis/predis ~1.0 or phpredis PHP extension

Creating Jobs

By default, all of the queueable jobs for your application are stored in the app/Jobs directory. If the app/Jobs directory doesn't exist, it will be created when you run the make:job Artisan command:

The generated class will implement the Illuminate\Contracts\Queue\ShouldQueue interface, indicating to Laravel that the job should be pushed onto the queue to run asynchronously.

[!NOTE] Job stubs may be customized using stub publishing .

Job classes are very simple, normally containing only a handle method that is invoked when the job is processed by the queue. To get started, let's take a look at an example job class. In this example, we'll pretend we manage a podcast publishing service and need to process the uploaded podcast files before they are published:
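
The sketch below shows roughly what such a job class could look like; the App\Models\Podcast model and App\Services\AudioProcessor service are assumed example classes, not part of Laravel itself:

```php
<?php

namespace App\Jobs;

use App\Models\Podcast;
use App\Services\AudioProcessor;
use Illuminate\Bus\Queueable;
use Illuminate\Contracts\Queue\ShouldQueue;
use Illuminate\Foundation\Bus\Dispatchable;
use Illuminate\Queue\InteractsWithQueue;
use Illuminate\Queue\SerializesModels;

class ProcessPodcast implements ShouldQueue
{
    use Dispatchable, InteractsWithQueue, Queueable, SerializesModels;

    /**
     * Create a new job instance.
     */
    public function __construct(
        public Podcast $podcast,
    ) {}

    /**
     * Execute the job.
     */
    public function handle(AudioProcessor $processor): void
    {
        // Process the uploaded podcast...
    }
}
```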

In this example, note that we were able to pass an Eloquent model directly into the queued job's constructor. Because of the SerializesModels trait that the job is using, Eloquent models and their loaded relationships will be gracefully serialized and unserialized when the job is processing.

If your queued job accepts an Eloquent model in its constructor, only the identifier for the model will be serialized onto the queue. When the job is actually handled, the queue system will automatically re-retrieve the full model instance and its loaded relationships from the database. This approach to model serialization allows for much smaller job payloads to be sent to your queue driver.

handle Method Dependency Injection

The handle method is invoked when the job is processed by the queue. Note that we are able to type-hint dependencies on the handle method of the job. The Laravel service container automatically injects these dependencies.

If you would like to take total control over how the container injects dependencies into the handle method, you may use the container's bindMethod method. The bindMethod method accepts a callback which receives the job and the container. Within the callback, you are free to invoke the handle method however you wish. Typically, you should call this method from the boot method of your App\Providers\AppServiceProvider service provider :
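
As a rough sketch, the binding could look like the following inside the provider's boot method (ProcessPodcast and AudioProcessor are the assumed example classes from above):

```php
use App\Jobs\ProcessPodcast;
use App\Services\AudioProcessor;
use Illuminate\Contracts\Foundation\Application;

/**
 * Bootstrap any application services.
 */
public function boot(): void
{
    $this->app->bindMethod([ProcessPodcast::class, 'handle'], function (ProcessPodcast $job, Application $app) {
        // Resolve the dependency ourselves and invoke the handle method manually.
        return $job->handle($app->make(AudioProcessor::class));
    });
}
```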

[!WARNING] Binary data, such as raw image contents, should be passed through the base64_encode function before being passed to a queued job. Otherwise, the job may not properly serialize to JSON when being placed on the queue.

Queued Relationships

Because all loaded Eloquent model relationships also get serialized when a job is queued, the serialized job string can sometimes become quite large. Furthermore, when a job is deserialized and model relationships are re-retrieved from the database, they will be retrieved in their entirety. Any previous relationship constraints that were applied before the model was serialized during the job queueing process will not be applied when the job is deserialized. Therefore, if you wish to work with a subset of a given relationship, you should re-constrain that relationship within your queued job.

Or, to prevent relations from being serialized, you can call the withoutRelations method on the model when setting a property value. This method will return an instance of the model without its loaded relationships:
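
A minimal sketch, continuing with the assumed Podcast model (other traits omitted for brevity):

```php
use App\Models\Podcast;
use Illuminate\Contracts\Queue\ShouldQueue;
use Illuminate\Queue\SerializesModels;

class ProcessPodcast implements ShouldQueue
{
    use SerializesModels;

    public Podcast $podcast;

    public function __construct(Podcast $podcast)
    {
        // Store the model without its loaded relationships...
        $this->podcast = $podcast->withoutRelations();
    }
}
```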

If you are using PHP constructor property promotion and would like to indicate that an Eloquent model should not have its relations serialized, you may use the WithoutRelations attribute:

If a job receives a collection or array of Eloquent models instead of a single model, the models within that collection will not have their relationships restored when the job is deserialized and executed. This is to prevent excessive resource usage on jobs that deal with large numbers of models.

[!WARNING] Unique jobs require a cache driver that supports locks . Currently, the memcached , redis , dynamodb , database , file , and array cache drivers support atomic locks. In addition, unique job constraints do not apply to jobs within batches.

Sometimes, you may want to ensure that only one instance of a specific job is on the queue at any point in time. You may do so by implementing the ShouldBeUnique interface on your job class. This interface does not require you to define any additional methods on your class:
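
A minimal sketch of such a class, using the UpdateSearchIndex name referenced below:

```php
<?php

namespace App\Jobs;

use Illuminate\Contracts\Queue\ShouldBeUnique;
use Illuminate\Contracts\Queue\ShouldQueue;

class UpdateSearchIndex implements ShouldQueue, ShouldBeUnique
{
    // ...
}
```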

In the example above, the UpdateSearchIndex job is unique. So, the job will not be dispatched if another instance of the job is already on the queue and has not finished processing.

In certain cases, you may want to define a specific "key" that makes the job unique or you may want to specify a timeout beyond which the job no longer stays unique. To accomplish this, you may define uniqueId and uniqueFor properties or methods on your job class:
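
Continuing the UpdateSearchIndex sketch, with a product-based key and a one-hour unique window matching the description that follows (the Product model is an assumed example):

```php
use App\Models\Product;
use Illuminate\Contracts\Queue\ShouldBeUnique;
use Illuminate\Contracts\Queue\ShouldQueue;

class UpdateSearchIndex implements ShouldQueue, ShouldBeUnique
{
    /**
     * The product instance.
     */
    public Product $product;

    /**
     * The number of seconds after which the job's unique lock will be released.
     */
    public int $uniqueFor = 3600;

    /**
     * Get the unique ID for the job.
     */
    public function uniqueId(): string
    {
        return (string) $this->product->id;
    }
}
```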

In the example above, the UpdateSearchIndex job is unique by a product ID. So, any new dispatches of the job with the same product ID will be ignored until the existing job has completed processing. In addition, if the existing job is not processed within one hour, the unique lock will be released and another job with the same unique key can be dispatched to the queue.

[!WARNING] If your application dispatches jobs from multiple web servers or containers, you should ensure that all of your servers are communicating with the same central cache server so that Laravel can accurately determine if a job is unique.

Keeping Jobs Unique Until Processing Begins

By default, unique jobs are "unlocked" after a job completes processing or fails all of its retry attempts. However, there may be situations where you would like your job to unlock immediately before it is processed. To accomplish this, your job should implement the ShouldBeUniqueUntilProcessing contract instead of the ShouldBeUnique contract:

Unique Job Locks

Behind the scenes, when a ShouldBeUnique job is dispatched, Laravel attempts to acquire a lock with the uniqueId key. If the lock is not acquired, the job is not dispatched. This lock is released when the job completes processing or fails all of its retry attempts. By default, Laravel will use the default cache driver to obtain this lock. However, if you wish to use another driver for acquiring the lock, you may define a uniqueVia method that returns the cache driver that should be used:

[!NOTE] If you only need to limit the concurrent processing of a job, use the WithoutOverlapping job middleware instead.

Laravel allows you to ensure the privacy and integrity of a job's data via encryption . To get started, simply add the ShouldBeEncrypted interface to the job class. Once this interface has been added to the class, Laravel will automatically encrypt your job before pushing it onto a queue:

Job Middleware

Job middleware allow you to wrap custom logic around the execution of queued jobs, reducing boilerplate in the jobs themselves. For example, consider the following handle method which leverages Laravel's Redis rate limiting features to allow only one job to process every five seconds:

While this code is valid, the implementation of the handle method becomes noisy since it is cluttered with Redis rate limiting logic. In addition, this rate limiting logic must be duplicated for any other jobs that we want to rate limit.

Instead of rate limiting in the handle method, we could define a job middleware that handles rate limiting. Laravel does not have a default location for job middleware, so you are welcome to place job middleware anywhere in your application. In this example, we will place the middleware in an app/Jobs/Middleware directory:
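
A sketch of what such a middleware class might look like; the throttle key and timing values are illustrative:

```php
<?php

namespace App\Jobs\Middleware;

use Closure;
use Illuminate\Support\Facades\Redis;

class RateLimited
{
    /**
     * Process the queued job.
     */
    public function handle(object $job, Closure $next): void
    {
        Redis::throttle('key')
            ->block(0)->allow(1)->every(5)
            ->then(function () use ($job, $next) {
                // Lock obtained, process the job...
                $next($job);
            }, function () use ($job) {
                // Could not obtain lock, try again shortly...
                $job->release(5);
            });
    }
}
```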

As you can see, like route middleware , job middleware receive the job being processed and a callback that should be invoked to continue processing the job.

After creating job middleware, they may be attached to a job by returning them from the job's middleware method. This method does not exist on jobs scaffolded by the make:job Artisan command, so you will need to manually add it to your job class:

[!NOTE] Job middleware can also be assigned to queueable event listeners, mailables, and notifications.

Although we just demonstrated how to write your own rate limiting job middleware, Laravel actually includes a rate limiting middleware that you may utilize to rate limit jobs. Like route rate limiters , job rate limiters are defined using the RateLimiter facade's for method.

For example, you may wish to allow users to back up their data once per hour while imposing no such limit on premium customers. To accomplish this, you may define a RateLimiter in the boot method of your AppServiceProvider:
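
A sketch of such a limiter; the user relationship and vipCustomer method on the job are assumptions for illustration:

```php
use Illuminate\Cache\RateLimiting\Limit;
use Illuminate\Support\Facades\RateLimiter;

/**
 * Bootstrap any application services.
 */
public function boot(): void
{
    RateLimiter::for('backups', function (object $job) {
        // vipCustomer() is an assumed example method on the user model.
        return $job->user->vipCustomer()
            ? Limit::none()
            : Limit::perHour(1)->by($job->user->id);
    });
}
```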

In the example above, we defined an hourly rate limit; however, you may easily define a rate limit based on minutes using the perMinute method. In addition, you may pass any value you wish to the by method of the rate limit; however, this value is most often used to segment rate limits by customer:

Once you have defined your rate limit, you may attach the rate limiter to your job using the Illuminate\Queue\Middleware\RateLimited middleware. Each time the job exceeds the rate limit, this middleware will release the job back to the queue with an appropriate delay based on the rate limit duration.

Releasing a rate limited job back onto the queue will still increment the job's total number of attempts . You may wish to tune your tries and maxExceptions properties on your job class accordingly. Or, you may wish to use the retryUntil method to define the amount of time until the job should no longer be attempted.

If you do not want a job to be retried when it is rate limited, you may use the dontRelease method:

[!NOTE] If you are using Redis, you may use the Illuminate\Queue\Middleware\RateLimitedWithRedis middleware, which is fine-tuned for Redis and more efficient than the basic rate limiting middleware.

Laravel includes an Illuminate\Queue\Middleware\WithoutOverlapping middleware that allows you to prevent job overlaps based on an arbitrary key. This can be helpful when a queued job is modifying a resource that should only be modified by one job at a time.

For example, let's imagine you have a queued job that updates a user's credit score and you want to prevent credit score update job overlaps for the same user ID. To accomplish this, you can return the WithoutOverlapping middleware from your job's middleware method:
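
A minimal sketch, assuming the job holds a user property:

```php
use Illuminate\Queue\Middleware\WithoutOverlapping;

/**
 * Get the middleware the job should pass through.
 *
 * @return array<int, object>
 */
public function middleware(): array
{
    return [new WithoutOverlapping($this->user->id)];
}
```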

Any overlapping jobs of the same type will be released back to the queue. You may also specify the number of seconds that must elapse before the released job will be attempted again:

If you wish to immediately delete any overlapping jobs so that they will not be retried, you may use the dontRelease method:

The WithoutOverlapping middleware is powered by Laravel's atomic lock feature. Sometimes, your job may unexpectedly fail or timeout in such a way that the lock is not released. Therefore, you may explicitly define a lock expiration time using the expireAfter method. For example, the example below will instruct Laravel to release the WithoutOverlapping lock three minutes after the job has started processing:

[!WARNING] The WithoutOverlapping middleware requires a cache driver that supports locks . Currently, the memcached , redis , dynamodb , database , file , and array cache drivers support atomic locks.

Sharing Lock Keys Across Job Classes

By default, the WithoutOverlapping middleware will only prevent overlapping jobs of the same class. So, although two different job classes may use the same lock key, they will not be prevented from overlapping. However, you can instruct Laravel to apply the key across job classes using the shared method:

Laravel includes an Illuminate\Queue\Middleware\ThrottlesExceptions middleware that allows you to throttle exceptions. Once the job throws a given number of exceptions, all further attempts to execute the job are delayed until a specified time interval lapses. This middleware is particularly useful for jobs that interact with unstable third-party services.

For example, let's imagine a queued job that interacts with a third-party API that begins throwing exceptions. To throttle exceptions, you can return the ThrottlesExceptions middleware from your job's middleware method. Typically, this middleware should be paired with a job that implements time based attempts :
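
A sketch consistent with the constructor semantics described below (note that the units of the second argument have varied between Laravel versions, so check your version's documentation):

```php
use DateTime;
use Illuminate\Queue\Middleware\ThrottlesExceptions;

/**
 * Get the middleware the job should pass through.
 */
public function middleware(): array
{
    // Throttle after 10 exceptions; wait 5 minutes before retrying once throttled.
    return [new ThrottlesExceptions(10, 5)];
}

/**
 * Determine the time at which the job should no longer be attempted.
 */
public function retryUntil(): DateTime
{
    return now()->addMinutes(5);
}
```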

The first constructor argument accepted by the middleware is the number of exceptions the job can throw before being throttled, while the second constructor argument is the number of minutes that should elapse before the job is attempted again once it has been throttled. In the code example above, if the job throws 10 exceptions within 5 minutes, we will wait 5 minutes before attempting the job again.

When a job throws an exception but the exception threshold has not yet been reached, the job will typically be retried immediately. However, you may specify the number of minutes such a job should be delayed by calling the backoff method when attaching the middleware to the job:

Internally, this middleware uses Laravel's cache system to implement rate limiting, and the job's class name is utilized as the cache "key". You may override this key by calling the by method when attaching the middleware to your job. This may be useful if you have multiple jobs interacting with the same third-party service and you would like them to share a common throttling "bucket":

[!NOTE] If you are using Redis, you may use the Illuminate\Queue\Middleware\ThrottlesExceptionsWithRedis middleware, which is fine-tuned for Redis and more efficient than the basic exception throttling middleware.

Dispatching Jobs

Once you have written your job class, you may dispatch it using the dispatch method on the job itself. The arguments passed to the dispatch method will be given to the job's constructor:
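
For example, using the assumed ProcessPodcast job and Podcast model:

```php
use App\Jobs\ProcessPodcast;
use App\Models\Podcast;

$podcast = Podcast::create([/* ... */]);

// Constructor arguments are passed straight through to the job.
ProcessPodcast::dispatch($podcast);
```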

If you would like to conditionally dispatch a job, you may use the dispatchIf and dispatchUnless methods:

In new Laravel applications, the sync driver is the default queue driver. This driver executes jobs synchronously in the foreground of the current request, which is often convenient during local development. If you would like to actually begin queueing jobs for background processing, you may specify a different queue driver within your application's config/queue.php configuration file.

If you would like to specify that a job should not be immediately available for processing by a queue worker, you may use the delay method when dispatching the job. For example, let's specify that a job should not be available for processing until 10 minutes after it has been dispatched:
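
A minimal example with the assumed ProcessPodcast job:

```php
use App\Jobs\ProcessPodcast;

ProcessPodcast::dispatch($podcast)
    ->delay(now()->addMinutes(10));
```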

[!WARNING] The Amazon SQS queue service has a maximum delay time of 15 minutes.

Dispatching After the Response is Sent to the Browser

Alternatively, the dispatchAfterResponse method delays dispatching a job until after the HTTP response is sent to the user's browser if your web server is using FastCGI. This will still allow the user to begin using the application even though a queued job is still executing. This should typically only be used for jobs that take about a second, such as sending an email. Since they are processed within the current HTTP request, jobs dispatched in this fashion do not require a queue worker to be running in order for them to be processed:

You may also dispatch a closure and chain the afterResponse method onto the dispatch helper to execute a closure after the HTTP response has been sent to the browser:

If you would like to dispatch a job immediately (synchronously), you may use the dispatchSync method. When using this method, the job will not be queued and will be executed immediately within the current process:

While it is perfectly fine to dispatch jobs within database transactions, you should take special care to ensure that your job will actually be able to execute successfully. When dispatching a job within a transaction, it is possible that the job will be processed by a worker before the parent transaction has committed. When this happens, any updates you have made to models or database records during the database transaction(s) may not yet be reflected in the database. In addition, any models or database records created within the transaction(s) may not exist in the database.

Thankfully, Laravel provides several methods of working around this problem. First, you may set the after_commit connection option in your queue connection's configuration array:
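
A sketch of the relevant connection entry (shown for a redis connection; the same option applies to other connections):

```php
'redis' => [
    'driver' => 'redis',
    // ...
    'after_commit' => true,
],
```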

When the after_commit option is true , you may dispatch jobs within database transactions; however, Laravel will wait until the open parent database transactions have been committed before actually dispatching the job. Of course, if no database transactions are currently open, the job will be dispatched immediately.

If a transaction is rolled back due to an exception that occurs during the transaction, the jobs that were dispatched during that transaction will be discarded.

[!NOTE] Setting the after_commit configuration option to true will also cause any queued event listeners, mailables, notifications, and broadcast events to be dispatched after all open database transactions have been committed.

Specifying Commit Dispatch Behavior Inline

If you do not set the after_commit queue connection configuration option to true , you may still indicate that a specific job should be dispatched after all open database transactions have been committed. To accomplish this, you may chain the afterCommit method onto your dispatch operation:

Likewise, if the after_commit configuration option is set to true , you may indicate that a specific job should be dispatched immediately without waiting for any open database transactions to commit:

Job chaining allows you to specify a list of queued jobs that should be run in sequence after the primary job has executed successfully. If one job in the sequence fails, the rest of the jobs will not be run. To execute a queued job chain, you may use the chain method provided by the Bus facade. Laravel's command bus is a lower level component that queued job dispatching is built on top of:
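
A sketch of a chain; the three job classes are assumed examples:

```php
use App\Jobs\OptimizePodcast;
use App\Jobs\ProcessPodcast;
use App\Jobs\ReleasePodcast;
use Illuminate\Support\Facades\Bus;

Bus::chain([
    new ProcessPodcast,
    new OptimizePodcast,
    new ReleasePodcast,
])->dispatch();
```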

In addition to chaining job class instances, you may also chain closures:

[!WARNING] Deleting jobs using the $this->delete() method within the job will not prevent chained jobs from being processed. The chain will only stop executing if a job in the chain fails.

Chain Connection and Queue

If you would like to specify the connection and queue that should be used for the chained jobs, you may use the onConnection and onQueue methods. These methods specify the queue connection and queue name that should be used unless the queued job is explicitly assigned a different connection / queue:

Chain Failures

When chaining jobs, you may use the catch method to specify a closure that should be invoked if a job within the chain fails. The given callback will receive the Throwable instance that caused the job failure:

[!WARNING] Since chain callbacks are serialized and executed at a later time by the Laravel queue, you should not use the $this variable within chain callbacks.

Customizing the Queue and Connection

Dispatching to a Particular Queue

By pushing jobs to different queues, you may "categorize" your queued jobs and even prioritize how many workers you assign to various queues. Keep in mind, this does not push jobs to different queue "connections" as defined by your queue configuration file, but only to specific queues within a single connection. To specify the queue, use the onQueue method when dispatching the job:
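
For example, using the assumed ProcessPodcast job and an illustrative queue name:

```php
use App\Jobs\ProcessPodcast;

// Send this job to the "processing" queue on the default connection.
ProcessPodcast::dispatch($podcast)->onQueue('processing');
```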

Alternatively, you may specify the job's queue by calling the onQueue method within the job's constructor:

Dispatching to a Particular Connection

If your application interacts with multiple queue connections, you may specify which connection to push a job to using the onConnection method:

You may chain the onConnection and onQueue methods together to specify the connection and the queue for a job:

Alternatively, you may specify the job's connection by calling the onConnection method within the job's constructor:

Max Attempts

If one of your queued jobs is encountering an error, you likely do not want it to keep retrying indefinitely. Therefore, Laravel provides various ways to specify how many times or for how long a job may be attempted.

One approach to specifying the maximum number of times a job may be attempted is via the --tries switch on the Artisan command line. This will apply to all jobs processed by the worker unless the job being processed specifies the number of times it may be attempted:

If a job exceeds its maximum number of attempts, it will be considered a "failed" job. For more information on handling failed jobs, consult the failed job documentation . If --tries=0 is provided to the queue:work command, the job will be retried indefinitely.

You may take a more granular approach by defining the maximum number of times a job may be attempted on the job class itself. If the maximum number of attempts is specified on the job, it will take precedence over the --tries value provided on the command line:

If you need dynamic control over a particular job's maximum attempts, you may define a tries method on the job:

Time Based Attempts

As an alternative to defining how many times a job may be attempted before it fails, you may define a time at which the job should no longer be attempted. This allows a job to be attempted any number of times within a given time frame. To define the time at which a job should no longer be attempted, add a retryUntil method to your job class. This method should return a DateTime instance:
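
A minimal sketch of such a method on a job class:

```php
use DateTime;

/**
 * Determine the time at which the job should no longer be attempted.
 */
public function retryUntil(): DateTime
{
    return now()->addMinutes(10);
}
```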

[!NOTE] You may also define a tries property or retryUntil method on your queued event listeners .

Max Exceptions

Sometimes you may wish to specify that a job may be attempted many times, but should fail if the retries are triggered by a given number of unhandled exceptions (as opposed to being released by the release method directly). To accomplish this, you may define a maxExceptions property on your job class:
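
A sketch matching the numbers described in the next paragraph (the throttle key and timing values are illustrative):

```php
<?php

namespace App\Jobs;

use Illuminate\Contracts\Queue\ShouldQueue;
use Illuminate\Queue\InteractsWithQueue;
use Illuminate\Support\Facades\Redis;

class ProcessPodcast implements ShouldQueue
{
    use InteractsWithQueue;

    /**
     * The number of times the job may be attempted.
     */
    public int $tries = 25;

    /**
     * The maximum number of unhandled exceptions to allow before failing.
     */
    public int $maxExceptions = 3;

    /**
     * Execute the job.
     */
    public function handle(): void
    {
        Redis::throttle('key')->allow(10)->every(60)->then(function () {
            // Lock obtained, process the podcast...
        }, function () {
            // Unable to obtain lock, release the job for ten seconds...
            return $this->release(10);
        });
    }
}
```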

In this example, the job is released for ten seconds if the application is unable to obtain a Redis lock and will continue to be retried up to 25 times. However, the job will fail if three unhandled exceptions are thrown by the job.

Often, you know roughly how long you expect your queued jobs to take. For this reason, Laravel allows you to specify a "timeout" value. By default, the timeout value is 60 seconds. If a job is processing for longer than the number of seconds specified by the timeout value, the worker processing the job will exit with an error. Typically, the worker will be restarted automatically by a process manager configured on your server .

The maximum number of seconds that jobs can run may be specified using the --timeout switch on the Artisan command line:

If the job exceeds its maximum attempts by continually timing out, it will be marked as failed.

You may also define the maximum number of seconds a job should be allowed to run on the job class itself. If the timeout is specified on the job, it will take precedence over any timeout specified on the command line:

Sometimes, IO blocking processes such as sockets or outgoing HTTP connections may not respect your specified timeout. Therefore, when using these features, you should always attempt to specify a timeout using their APIs as well. For example, when using Guzzle, you should always specify a connection and request timeout value.

[!WARNING] The pcntl PHP extension must be installed in order to specify job timeouts. In addition, a job's "timeout" value should always be less than its "retry after" value. Otherwise, the job may be re-attempted before it has actually finished executing or timed out.

Failing on Timeout

If you would like to indicate that a job should be marked as failed on timeout, you may define the $failOnTimeout property on the job class:

If an exception is thrown while the job is being processed, the job will automatically be released back onto the queue so it may be attempted again. The job will continue to be released until it has been attempted the maximum number of times allowed by your application. The maximum number of attempts is defined by the --tries switch used on the queue:work Artisan command. Alternatively, the maximum number of attempts may be defined on the job class itself. More information on running the queue worker can be found below .

Manually Releasing a Job

Sometimes you may wish to manually release a job back onto the queue so that it can be attempted again at a later time. You may accomplish this by calling the release method:

By default, the release method will release the job back onto the queue for immediate processing. However, you may instruct the queue to not make the job available for processing until a given number of seconds has elapsed by passing an integer or date instance to the release method:

Manually Failing a Job

Occasionally you may need to manually mark a job as "failed". To do so, you may call the fail method:

If you would like to mark your job as failed because of an exception that you have caught, you may pass the exception to the fail method. Or, for convenience, you may pass a string error message which will be converted to an exception for you:

[!NOTE] For more information on failed jobs, check out the documentation on dealing with job failures .

Job Batching

Laravel's job batching feature allows you to easily execute a batch of jobs and then perform some action when the batch of jobs has completed executing. Before getting started, you should create a database migration to build a table which will contain meta information about your job batches, such as their completion percentage. This migration may be generated using the queue:batches-table Artisan command:

To define a batchable job, you should create a queueable job as normal; however, you should add the Illuminate\Bus\Batchable trait to the job class. This trait provides access to a batch method which may be used to retrieve the current batch that the job is executing within:

To dispatch a batch of jobs, you should use the batch method of the Bus facade. Of course, batching is primarily useful when combined with completion callbacks. So, you may use the then , catch , and finally methods to define completion callbacks for the batch. Each of these callbacks will receive an Illuminate\Bus\Batch instance when they are invoked. In this example, we will imagine we are queueing a batch of jobs that each process a given number of rows from a CSV file:
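
A sketch of such a batch; the ImportCsv job and its constructor arguments are assumed examples:

```php
use App\Jobs\ImportCsv;
use Illuminate\Bus\Batch;
use Illuminate\Support\Facades\Bus;
use Throwable;

$batch = Bus::batch([
    new ImportCsv(1, 100),
    new ImportCsv(101, 200),
    new ImportCsv(201, 300),
])->then(function (Batch $batch) {
    // All jobs completed successfully...
})->catch(function (Batch $batch, Throwable $e) {
    // First batch job failure detected...
})->finally(function (Batch $batch) {
    // The batch has finished executing...
})->dispatch();

return $batch->id;
```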

The batch's ID, which may be accessed via the $batch->id property, may be used to query the Laravel command bus for information about the batch after it has been dispatched.

[!WARNING] Since batch callbacks are serialized and executed at a later time by the Laravel queue, you should not use the $this variable within the callbacks.

Naming Batches

Some tools such as Laravel Horizon and Laravel Telescope may provide more user-friendly debug information for batches if batches are named. To assign an arbitrary name to a batch, you may call the name method while defining the batch:

Batch Connection and Queue

If you would like to specify the connection and queue that should be used for the batched jobs, you may use the onConnection and onQueue methods. All batched jobs must execute within the same connection and queue:

You may define a set of chained jobs within a batch by placing the chained jobs within an array. For example, we may execute two job chains in parallel and execute a callback when both job chains have finished processing:

Conversely, you may run batches of jobs within a chain by defining batches within the chain. For example, you could first run a batch of jobs to release multiple podcasts then a batch of jobs to send the release notifications:

Sometimes it may be useful to add additional jobs to a batch from within a batched job. This pattern can be useful when you need to batch thousands of jobs which may take too long to dispatch during a web request. So, instead, you may wish to dispatch an initial batch of "loader" jobs that hydrate the batch with even more jobs:

In this example, we will use the LoadImportBatch job to hydrate the batch with additional jobs. To accomplish this, we may use the add method on the batch instance that may be accessed via the job's batch method:
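
A sketch of what the LoadImportBatch job's handle method might look like, assuming a hypothetical ImportContacts job:

```php
use App\Jobs\ImportContacts;
use Illuminate\Support\Collection;

/**
 * Execute the job.
 */
public function handle(): void
{
    if ($this->batch()->cancelled()) {
        return;
    }

    // Hydrate the batch with 1,000 additional (hypothetical) ImportContacts jobs...
    $this->batch()->add(Collection::times(1000, function () {
        return new ImportContacts;
    }));
}
```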

[!WARNING] You may only add jobs to a batch from within a job that belongs to the same batch.

The Illuminate\Bus\Batch instance that is provided to batch completion callbacks has a variety of properties and methods to assist you in interacting with and inspecting a given batch of jobs:

Returning Batches From Routes

All Illuminate\Bus\Batch instances are JSON serializable, meaning you can return them directly from one of your application's routes to retrieve a JSON payload containing information about the batch, including its completion progress. This makes it convenient to display information about the batch's completion progress in your application's UI.

To retrieve a batch by its ID, you may use the Bus facade's findBatch method:

Sometimes you may need to cancel a given batch's execution. This can be accomplished by calling the cancel method on the Illuminate\Bus\Batch instance:

As you may have noticed in the previous examples, batched jobs should typically determine if their corresponding batch has been cancelled before continuing execution. However, for convenience, you may assign the SkipIfBatchCancelled middleware to the job instead. As its name indicates, this middleware will instruct Laravel to not process the job if its corresponding batch has been cancelled:

When a batched job fails, the catch callback (if assigned) will be invoked. This callback is only invoked for the first job that fails within the batch.

Allowing Failures

When a job within a batch fails, Laravel will automatically mark the batch as "cancelled". If you wish, you may disable this behavior so that a job failure does not automatically mark the batch as cancelled. This may be accomplished by calling the allowFailures method while dispatching the batch:

Retrying Failed Batch Jobs

For convenience, Laravel provides a queue:retry-batch Artisan command that allows you to easily retry all of the failed jobs for a given batch. The queue:retry-batch command accepts the UUID of the batch whose failed jobs should be retried:

Without pruning, the job_batches table can accumulate records very quickly. To mitigate this, you should schedule the queue:prune-batches Artisan command to run daily:
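
For example, a scheduler entry along these lines (shown in the schedule-method style; place it wherever your application defines its schedule):

```php
use Illuminate\Console\Scheduling\Schedule;

protected function schedule(Schedule $schedule): void
{
    $schedule->command('queue:prune-batches')->daily();
}
```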

By default, all finished batches that are more than 24 hours old will be pruned. You may use the hours option when calling the command to determine how long to retain batch data. For example, the following command will delete all batches that finished over 48 hours ago:

Sometimes, your job_batches table may accumulate batch records for batches that never completed successfully, such as batches where a job failed and that job was never retried successfully. You may instruct the queue:prune-batches command to prune these unfinished batch records using the unfinished option:

Likewise, your job_batches table may also accumulate batch records for cancelled batches. You may instruct the queue:prune-batches command to prune these cancelled batch records using the cancelled option:

Laravel also provides support for storing batch meta information in DynamoDB instead of a relational database. However, you will need to manually create a DynamoDB table to store all of the batch records.

Typically, this table should be named job_batches , but you should name the table based on the value of the queue.batching.table configuration value within your application's queue configuration file.

DynamoDB Batch Table Configuration

The job_batches table should have a string primary partition key named application and a string primary sort key named id . The application portion of the key will contain your application's name as defined by the name configuration value within your application's app configuration file. Since the application name is part of the DynamoDB table's key, you can use the same table to store job batches for multiple Laravel applications.

In addition, you may define a ttl attribute for your table if you would like to take advantage of automatic batch pruning.

DynamoDB Configuration

Next, install the AWS SDK so that your Laravel application can communicate with Amazon DynamoDB:

Then, set the queue.batching.driver configuration option's value to dynamodb . In addition, you should define key , secret , and region configuration options within the batching configuration array. These options will be used to authenticate with AWS. When using the dynamodb driver, the queue.batching.database configuration option is unnecessary:

Pruning Batches in DynamoDB

When utilizing DynamoDB to store job batch information, the typical pruning commands used to prune batches stored in a relational database will not work. Instead, you may utilize DynamoDB's native TTL functionality to automatically remove records for old batches.

If you defined your DynamoDB table with a ttl attribute, you may define configuration parameters to instruct Laravel how to prune batch records. The queue.batching.ttl_attribute configuration value defines the name of the attribute holding the TTL, while the queue.batching.ttl configuration value defines the number of seconds after which a batch record can be removed from the DynamoDB table, relative to the last time the record was updated:

Instead of dispatching a job class to the queue, you may also dispatch a closure. This is great for quick, simple tasks that need to be executed outside of the current request cycle. When dispatching closures to the queue, the closure's code content is cryptographically signed so that it can not be modified in transit:
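
A minimal example; publish() is an assumed method on the example Podcast model:

```php
use App\Models\Podcast;

$podcast = Podcast::find(1);

dispatch(function () use ($podcast) {
    $podcast->publish();
});
```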

Using the catch method, you may provide a closure that should be executed if the queued closure fails to complete successfully after exhausting all of your queue's configured retry attempts :

[!WARNING] Since catch callbacks are serialized and executed at a later time by the Laravel queue, you should not use the $this variable within catch callbacks.

Running the Queue Worker

Laravel includes an Artisan command that will start a queue worker and process new jobs as they are pushed onto the queue. You may run the worker using the queue:work Artisan command. Note that once the queue:work command has started, it will continue to run until it is manually stopped or you close your terminal:
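
For reference:

```shell
php artisan queue:work
```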

[!NOTE] To keep the queue:work process running permanently in the background, you should use a process monitor such as Supervisor to ensure that the queue worker does not stop running.

You may include the -v flag when invoking the queue:work command if you would like the processed job IDs to be included in the command's output:

Remember, queue workers are long-lived processes and store the booted application state in memory. As a result, they will not notice changes in your code base after they have been started. So, during your deployment process, be sure to restart your queue workers . In addition, remember that any static state created or modified by your application will not be automatically reset between jobs.

Alternatively, you may run the queue:listen command. When using the queue:listen command, you don't have to manually restart the worker when you want to reload your updated code or reset the application state; however, this command is significantly less efficient than the queue:work command:

Running Multiple Queue Workers

To assign multiple workers to a queue and process jobs concurrently, you should simply start multiple queue:work processes. This can either be done locally via multiple tabs in your terminal or in production using your process manager's configuration settings. When using Supervisor , you may use the numprocs configuration value.

Specifying the Connection and Queue

You may also specify which queue connection the worker should utilize. The connection name passed to the work command should correspond to one of the connections defined in your config/queue.php configuration file:

By default, the queue:work command only processes jobs for the default queue on a given connection. However, you may customize your queue worker even further by only processing particular queues for a given connection. For example, if all of your emails are processed in an emails queue on your redis queue connection, you may issue the following command to start a worker that only processes that queue:
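
For example:

```shell
php artisan queue:work redis --queue=emails
```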

Processing a Specified Number of Jobs

The --once option may be used to instruct the worker to only process a single job from the queue:

The --max-jobs option may be used to instruct the worker to process the given number of jobs and then exit. This option may be useful when combined with Supervisor so that your workers are automatically restarted after processing a given number of jobs, releasing any memory they may have accumulated:

Processing All Queued Jobs and Then Exiting

The --stop-when-empty option may be used to instruct the worker to process all jobs and then exit gracefully. This option can be useful when processing Laravel queues within a Docker container if you wish to shut down the container after the queue is empty:

Processing Jobs for a Given Number of Seconds

The --max-time option may be used to instruct the worker to process jobs for the given number of seconds and then exit. This option may be useful when combined with Supervisor so that your workers are automatically restarted after processing jobs for a given amount of time, releasing any memory they may have accumulated:

Worker Sleep Duration

When jobs are available on the queue, the worker will keep processing jobs with no delay in between jobs. However, the sleep option determines how many seconds the worker will "sleep" if there are no jobs available. Of course, while sleeping, the worker will not process any new jobs:

Maintenance Mode and Queues

While your application is in maintenance mode , no queued jobs will be handled. The jobs will continue to be handled as normal once the application is out of maintenance mode.

To force your queue workers to process jobs even if maintenance mode is enabled, you may use the --force option:

Resource Considerations

Daemon queue workers do not "reboot" the framework before processing each job. Therefore, you should release any heavy resources after each job completes. For example, if you are doing image manipulation with the GD library, you should free the memory with imagedestroy when you are done processing the image.

Sometimes you may wish to prioritize how your queues are processed. For example, in your config/queue.php configuration file, you may set the default queue for your redis connection to low . However, occasionally you may wish to push a job to a high priority queue like so:

To start a worker that verifies that all of the high queue jobs are processed before continuing to any jobs on the low queue, pass a comma-delimited list of queue names to the work command:

Since queue workers are long-lived processes, they will not notice changes to your code without being restarted. So, the simplest way to deploy an application using queue workers is to restart the workers during your deployment process. You may gracefully restart all of the workers by issuing the queue:restart command:

This command will instruct all queue workers to gracefully exit after they finish processing their current job so that no existing jobs are lost. Since the queue workers will exit when the queue:restart command is executed, you should be running a process manager such as Supervisor to automatically restart the queue workers.

[!NOTE] The queue uses the cache to store restart signals, so you should verify that a cache driver is properly configured for your application before using this feature.

Job Expiration

In your config/queue.php configuration file, each queue connection defines a retry_after option. This option specifies how many seconds the queue connection should wait before retrying a job that is being processed. For example, if the value of retry_after is set to 90 , the job will be released back onto the queue if it has been processing for 90 seconds without being released or deleted. Typically, you should set the retry_after value to the maximum number of seconds your jobs should reasonably take to complete processing.

[!WARNING] The only queue connection which does not contain a retry_after value is Amazon SQS. SQS will retry the job based on the Default Visibility Timeout which is managed within the AWS console.

Worker Timeouts

The queue:work Artisan command exposes a --timeout option. By default, the --timeout value is 60 seconds. If a job is processing for longer than the number of seconds specified by the timeout value, the worker processing the job will exit with an error. Typically, the worker will be restarted automatically by a process manager configured on your server :

The retry_after configuration option and the --timeout CLI option are different, but work together to ensure that jobs are not lost and that jobs are only successfully processed once.

[!WARNING] The --timeout value should always be at least several seconds shorter than your retry_after configuration value. This will ensure that a worker processing a frozen job is always terminated before the job is retried. If your --timeout option is longer than your retry_after configuration value, your jobs may be processed twice.

In production, you need a way to keep your queue:work processes running. A queue:work process may stop running for a variety of reasons, such as an exceeded worker timeout or the execution of the queue:restart command.

For this reason, you need to configure a process monitor that can detect when your queue:work processes exit and automatically restart them. In addition, process monitors can allow you to specify how many queue:work processes you would like to run concurrently. Supervisor is a process monitor commonly used in Linux environments and we will discuss how to configure it in the following documentation.

Installing Supervisor

Supervisor is a process monitor for the Linux operating system, and will automatically restart your queue:work processes if they fail. To install Supervisor on Ubuntu, you may use the following command:
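
For example, on Ubuntu:

```shell
sudo apt-get install supervisor
```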

[!NOTE] If configuring and managing Supervisor yourself sounds overwhelming, consider using Laravel Forge , which will automatically install and configure Supervisor for your production Laravel projects.

Configuring Supervisor

Supervisor configuration files are typically stored in the /etc/supervisor/conf.d directory. Within this directory, you may create any number of configuration files that instruct supervisor how your processes should be monitored. For example, let's create a laravel-worker.conf file that starts and monitors queue:work processes:
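
A representative configuration sketch; the application path, queue connection, and user are placeholders you should adapt to your own server:

```ini
[program:laravel-worker]
process_name=%(program_name)s_%(process_num)02d
command=php /home/forge/app.com/artisan queue:work sqs --sleep=3 --tries=3 --max-time=3600
autostart=true
autorestart=true
stopasgroup=true
killasgroup=true
user=forge
numprocs=8
redirect_stderr=true
stdout_logfile=/home/forge/app.com/worker.log
stopwaitsecs=3600
```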

In this example, the numprocs directive will instruct Supervisor to run eight queue:work processes and monitor all of them, automatically restarting them if they fail. You should change the command directive of the configuration to reflect your desired queue connection and worker options.

[!WARNING] You should ensure that the value of stopwaitsecs is greater than the number of seconds consumed by your longest running job. Otherwise, Supervisor may kill the job before it is finished processing.

Starting Supervisor

Once the configuration file has been created, you may update the Supervisor configuration and start the processes using the following commands:
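
For reference, assuming the laravel-worker program name from the sketch above:

```shell
sudo supervisorctl reread

sudo supervisorctl update

sudo supervisorctl start "laravel-worker:*"
```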

For more information on Supervisor, consult the Supervisor documentation.

Dealing With Failed Jobs

Sometimes your queued jobs will fail. Don't worry, things don't always go as planned! Laravel includes a convenient way to specify the maximum number of times a job should be attempted . After an asynchronous job has exceeded this number of attempts, it will be inserted into the failed_jobs database table. Synchronously dispatched jobs that fail are not stored in this table and their exceptions are immediately handled by the application.

A migration to create the failed_jobs table is typically already present in new Laravel applications. However, if your application does not contain a migration for this table, you may use the queue:failed-table command to create the migration:

When running a queue worker process, you may specify the maximum number of times a job should be attempted using the --tries switch on the queue:work command. If you do not specify a value for the --tries option, jobs will only be attempted once or as many times as specified by the job class' $tries property:

Using the --backoff option, you may specify how many seconds Laravel should wait before retrying a job that has encountered an exception. By default, a job is immediately released back onto the queue so that it may be attempted again:

If you would like to configure how many seconds Laravel should wait before retrying a job that has encountered an exception on a per-job basis, you may do so by defining a backoff property on your job class:

If you require more complex logic for determining the job's backoff time, you may define a backoff method on your job class:

You may easily configure "exponential" backoffs by returning an array of backoff values from the backoff method. In this example, the retry delay will be 1 second for the first retry, 5 seconds for the second retry, 10 seconds for the third retry, and 10 seconds for every subsequent retry if there are more attempts remaining:
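
A minimal sketch of such a method:

```php
/**
 * Calculate the number of seconds to wait before retrying the job.
 *
 * @return array<int, int>
 */
public function backoff(): array
{
    return [1, 5, 10];
}
```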

When a particular job fails, you may want to send an alert to your users or revert any actions that were partially completed by the job. To accomplish this, you may define a failed method on your job class. The Throwable instance that caused the job to fail will be passed to the failed method:

[!WARNING] A new instance of the job is instantiated before invoking the failed method; therefore, any class property modifications that may have occurred within the handle method will be lost.

To view all of the failed jobs that have been inserted into your failed_jobs database table, you may use the queue:failed Artisan command:

The queue:failed command will list the job ID, connection, queue, failure time, and other information about the job. The job ID may be used to retry the failed job. For instance, to retry a failed job that has an ID of ce7bb17c-cdd8-41f0-a8ec-7b4fef4e5ece , issue the following command:
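
For example:

```shell
php artisan queue:retry ce7bb17c-cdd8-41f0-a8ec-7b4fef4e5ece
```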

If necessary, you may pass multiple IDs to the command:

You may also retry all of the failed jobs for a particular queue:

To retry all of your failed jobs, execute the queue:retry command and pass all as the ID:

If you would like to delete a failed job, you may use the queue:forget command:

[!NOTE] When using Horizon , you should use the horizon:forget command to delete a failed job instead of the queue:forget command.

To delete all of your failed jobs from the failed_jobs table, you may use the queue:flush command:

When injecting an Eloquent model into a job, the model is automatically serialized before being placed on the queue and re-retrieved from the database when the job is processed. However, if the model has been deleted while the job was waiting to be processed by a worker, your job may fail with a ModelNotFoundException .

For convenience, you may choose to automatically delete jobs with missing models by setting your job's deleteWhenMissingModels property to true . When this property is set to true , Laravel will quietly discard the job without raising an exception:

You may prune the records in your application's failed_jobs table by invoking the queue:prune-failed Artisan command:

By default, all the failed job records that are more than 24 hours old will be pruned. If you provide the --hours option to the command, only the failed job records that were inserted within the last N number of hours will be retained. For example, the following command will delete all the failed job records that were inserted more than 48 hours ago:

Laravel also provides support for storing your failed job records in DynamoDB instead of a relational database table. However, you must manually create a DynamoDB table to store all of the failed job records. Typically, this table should be named failed_jobs , but you should name the table based on the value of the queue.failed.table configuration value within your application's queue configuration file.

The failed_jobs table should have a string primary partition key named application and a string primary sort key named uuid . The application portion of the key will contain your application's name as defined by the name configuration value within your application's app configuration file. Since the application name is part of the DynamoDB table's key, you can use the same table to store failed jobs for multiple Laravel applications.

In addition, ensure that you install the AWS SDK so that your Laravel application can communicate with Amazon DynamoDB:

Next, set the queue.failed.driver configuration option's value to dynamodb . In addition, you should define key , secret , and region configuration options within the failed job configuration array. These options will be used to authenticate with AWS. When using the dynamodb driver, the queue.failed.database configuration option is unnecessary:

You may instruct Laravel to discard failed jobs without storing them by setting the queue.failed.driver configuration option's value to null . Typically, this may be accomplished via the QUEUE_FAILED_DRIVER environment variable:

If you would like to register an event listener that will be invoked when a job fails, you may use the Queue facade's failing method. For example, we may attach a closure to this event from the boot method of the AppServiceProvider that is included with Laravel:
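
A sketch of such a listener in the provider's boot method:

```php
use Illuminate\Queue\Events\JobFailed;
use Illuminate\Support\Facades\Queue;

/**
 * Bootstrap any application services.
 */
public function boot(): void
{
    Queue::failing(function (JobFailed $event) {
        // $event->connectionName, $event->job, $event->exception
    });
}
```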

[!NOTE] When using Horizon , you should use the horizon:clear command to clear jobs from the queue instead of the queue:clear command.

If you would like to delete all jobs from the default queue of the default connection, you may do so using the queue:clear Artisan command:

You may also provide the connection argument and queue option to delete jobs from a specific connection and queue:

[!WARNING] Clearing jobs from queues is only available for the SQS, Redis, and database queue drivers. In addition, the SQS message deletion process takes up to 60 seconds, so jobs sent to the SQS queue up to 60 seconds after you clear the queue might also be deleted.

If your queue receives a sudden influx of jobs, it could become overwhelmed, leading to a long wait time for jobs to complete. If you wish, Laravel can alert you when your queue job count exceeds a specified threshold.

To get started, you should schedule the queue:monitor command to run every minute . The command accepts the names of the queues you wish to monitor as well as your desired job count threshold:
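
For example, a scheduler entry along these lines (the queue names and threshold are illustrative):

```php
$schedule->command('queue:monitor redis:default,redis:deployments --max=100')->everyMinute();
```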

Scheduling this command alone is not enough to trigger a notification alerting you of the queue's overwhelmed status. When the command encounters a queue that has a job count exceeding your threshold, an Illuminate\Queue\Events\QueueBusy event will be dispatched. You may listen for this event within your application's EventServiceProvider in order to send a notification to you or your development team:
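
A sketch of such a listener; the QueueHasLongWaitTime notification and the recipient address are assumed examples you would create yourself:

```php
use App\Notifications\QueueHasLongWaitTime;
use Illuminate\Queue\Events\QueueBusy;
use Illuminate\Support\Facades\Event;
use Illuminate\Support\Facades\Notification;

/**
 * Register any other events for your application.
 */
public function boot(): void
{
    Event::listen(function (QueueBusy $event) {
        Notification::route('mail', 'dev@example.com')
            ->notify(new QueueHasLongWaitTime(
                $event->connection,
                $event->queue,
                $event->size
            ));
    });
}
```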

When testing code that dispatches jobs, you may wish to instruct Laravel to not actually execute the job itself, since the job's code can be tested directly and separately from the code that dispatches it. Of course, to test the job itself, you may instantiate a job instance and invoke the handle method directly in your test.

You may use the Queue facade's fake method to prevent queued jobs from actually being pushed to the queue. After calling the Queue facade's fake method, you may then assert that the application attempted to push jobs to the queue:
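
A sketch of such a test method (the ShipOrder job and the queue name are assumed examples; the method would live in one of your feature test classes):

```php
use App\Jobs\ShipOrder;
use Illuminate\Support\Facades\Queue;

public function test_orders_can_be_shipped(): void
{
    Queue::fake();

    // Perform order shipping...

    // Assert a job was pushed to a given queue...
    Queue::assertPushedOn('shipping', ShipOrder::class);

    // Assert a job was pushed twice...
    Queue::assertPushed(ShipOrder::class, 2);

    // Assert the total number of jobs that were pushed...
    Queue::assertCount(2);
}
```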

You may pass a closure to the assertPushed or assertNotPushed methods in order to assert that a job was pushed that passes a given "truth test". If at least one job was pushed that passes the given truth test then the assertion will be successful:

If you only need to fake specific jobs while allowing your other jobs to execute normally, you may pass the class names of the jobs that should be faked to the fake method:

You may fake all jobs except for a set of specified jobs using the except method:

To test job chains, you will need to utilize the Bus facade's faking capabilities. The Bus facade's assertChained method may be used to assert that a chain of jobs was dispatched. The assertChained method accepts an array of chained jobs as its first argument:
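
For example, with assumed ShipOrder, RecordShipment, and UpdateInventory jobs:

```php
use App\Jobs\RecordShipment;
use App\Jobs\ShipOrder;
use App\Jobs\UpdateInventory;
use Illuminate\Support\Facades\Bus;

Bus::fake();

// Perform order shipping...

Bus::assertChained([
    ShipOrder::class,
    RecordShipment::class,
    UpdateInventory::class,
]);
```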

As you can see in the example above, the array of chained jobs may be an array of the job's class names. However, you may also provide an array of actual job instances. When doing so, Laravel will ensure that the job instances are of the same class and have the same property values of the chained jobs dispatched by your application:

You may use the assertDispatchedWithoutChain method to assert that a job was pushed without a chain of jobs:

Testing Chained Batches

If your job chain contains a batch of jobs , you may assert that the chained batch matches your expectations by inserting a Bus::chainedBatch definition within your chain assertion:

The Bus facade's assertBatched method may be used to assert that a batch of jobs was dispatched. The closure given to the assertBatched method receives an instance of Illuminate\Bus\PendingBatch , which may be used to inspect the jobs within the batch:

You may use the assertBatchCount method to assert that a given number of batches were dispatched:

You may use assertNothingBatched to assert that no batches were dispatched:

Testing Job / Batch Interaction

In addition, you may occasionally need to test an individual job's interaction with its underlying batch. For example, you may need to test if a job cancelled further processing for its batch. To accomplish this, you need to assign a fake batch to the job via the withFakeBatch method. The withFakeBatch method returns a tuple containing the job instance and the fake batch:

Using the before and after methods on the Queue facade , you may specify callbacks to be executed before or after a queued job is processed. These callbacks are a great opportunity to perform additional logging or increment statistics for a dashboard. Typically, you should call these methods from the boot method of a service provider . For example, we may use the AppServiceProvider that is included with Laravel:
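
A sketch of such callbacks registered in a provider's boot method:

```php
use Illuminate\Queue\Events\JobProcessed;
use Illuminate\Queue\Events\JobProcessing;
use Illuminate\Support\Facades\Queue;

/**
 * Bootstrap any application services.
 */
public function boot(): void
{
    Queue::before(function (JobProcessing $event) {
        // $event->connectionName, $event->job, $event->job->payload()
    });

    Queue::after(function (JobProcessed $event) {
        // $event->connectionName, $event->job, $event->job->payload()
    });
}
```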

Using the looping method on the Queue facade , you may specify callbacks that execute before the worker attempts to fetch a job from a queue. For example, you might register a closure to rollback any transactions that were left open by a previously failed job:

workflow_Task_Queue_data table size very large. Using SQL Express database.


I have a workflow database that is growing beyond the size supported by SQL Express. I have reduced the number of days that logs are kept, but it appears the database is going to continue to grow. I have some very complicated workflows that may be taking up the space; however, they had been working for months before this.

I need to know what I can do to reduce the size so I can continue to use SQL Express, or determine whether this is simply how it is going to be and recommend that the customer move to full SQL Server.

The customer is looking at reports run from the forms database but is not looking at anything from the workflow database.

[Attachment: Table Sizes.PNG]


Hello, 

I have been working with Tech Support on this. I don't have many active workflows; my persisted instances are in the low hundreds. I do have a very complicated workflow that updates a website, but there are no running instances of it. I suspect the data is stored as completed data in the database, but there doesn't seem to be a good way to clean it out.

My next step, if the customer OKs it, is to export my workflows and create a new workflow database. This will of course kill any running workflows, and we will have to attempt to restart them.

I adjusted the Advanced Server Options as shown a few days ago, but so far the size of the database has not dropped.

[Attachment: Advanced Server Options Current.PNG]

You should be able to tie it back to the workflow and instances it belongs to by joining the table on instance_id to the search_instance table:

Maintenance options are unlikely to apply here since the instances with open tasks would be still in progress and not subject to cleanup.

After you find out what workflow they tie back to, what would the next steps be?

You take a look at what the workflow definition is doing. Usually a low number of instances with a large number of tasks/activities indicates an inefficient way of processing large data sets (entries, SQL tables, etc).

Since information stays around for the duration of the workflow instance plus the retention period, the size of the database can be affected if you run large workflows.

For the record, setting the retention for instance data to 1 day may make it hard to investigate an issue like this if the instance details are gone before you get a chance to take a look.

Another thing to note is that SQL does not shrink databases on its own, so even if the data is purged from the database, the size on disk will not change.

What are the dbo.workflow_task_queue and dbo.workflow_task_queue_data tables used for? How/why is information inserted and removed from the tables?

That looks like an infinite loop. Or possibly task deletion has been turned off. Probably best you have it investigated through Tech Support.


  • Asked October 6, 2017
  • Updated May 10, 2022


Build an LLM-Powered Data Agent for Data Analysis


An AI agent is a system consisting of planning capabilities, memory, and tools to perform tasks requested by a user. For complex tasks such as data analytics or interacting with complex systems, your application may depend on collaboration among different types of agents. For more context, see Introduction to LLM Agents and Building Your First LLM Agent Application.

This post explains the agent types required to build an accurate LLM application that can handle nuanced data analysis tasks when queried. It walks through an example use case for building a data analyst agent application, including code snippets. Finally, it offers some points for AI developers to consider when optimizing and building LLM agent apps.

LLM agent types for data analysis tasks

To begin, this section explains two main types of LLM agents and how they work—data agents and API or execution agents. I’ll also present an agent swarm use case, which involves multiple agents collaborating to solve a problem. Note that these agent types somewhat represent the ends of a spectrum. Blended, purpose-built agents can be created for specific use cases.

Data agents

Data agents are typically designed for an extractive goal. In other words, data agents assist users in extracting information from a wide range of data sources. They help with assistive reasoning tasks. 

For example, a financial analyst might ask, “In how many quarters of this year did the company have a positive cash flow?” This type of question requires reasoning, search (structured, unstructured, or both), and planning capabilities.

API or execution agents

API or execution agents are designed for an execution goal. These agents carry out a task or set of tasks requested by a user.

Consider the same financial analyst working with an Excel spreadsheet that contains the past year’s closing prices for 10 stocks. The analyst wants to organize these closing prices according to one or more statistical formulas. Excel APIs need to be chained together to perform this task. For another API agent example, see the Google Places API Copilot Demo .

Agent swarms

Agent swarms involve multiple data agents and multiple API agents collaborating in a decentralized manner to solve a complex problem. Agent swarms are designed for workflows that include both extractive and execution tasks that require different forms of planning and agent core harnesses.

For example, imagine that the financial analyst wants to study the top five fast food stocks for investment planning. The sequence of actions needed to reach this goal is outlined below and in Figure 1.

  • Mine stock prices. The data agent hits a structured database with SQL or pandas or Quandl API.
  • Extract more relevant information from 10-K and 10-Q reports. Execute search engine calls to get forms using data agent. Extract information using data agent retrieval-augmented generation (RAG) calls.
  • Store the information in Excel. API agent calls the Excel API.
  • Extract user sentiment from social media content. Execute social media API calls using data mining with data agent. Perform sentiment analysis using RAG data agent.
  • Use preselected metrics to generate indicators using API agent (Sheets API).
  • Generate the report using API agent.
  • Upload key graphs, plots, and charts to a PowerPoint slide using API agent (PowerPoint API).

Figure 1. Flowchart showing multiple agents collaborating to solve a problem.

As more types of LLM agents are modeled, they can interact with each other in the agent swarm to solve problems effectively. Constraining the problem into different agent verticals makes it possible to build agents with smaller models. This requires less customization effort and retains modularity, which in turn makes it easier to add new features, select only the features you want, and scale deployments. In this ecosystem, every agent treats the other agents as tools and calls on them when required.

Building a data analyst agent

With this general taxonomy as a foundation, this section dives into building a data agent for a use case of talking to an SQL database for inventory management. The following discussion assumes you have read Building Your First LLM Agent Application, or are otherwise familiar with the basics of LLM agents.

Choose an LLM

Begin by identifying which LLM to use. This example uses the Mixtral 8x7B LLM available in the NVIDIA NGC catalog, which hosts accelerated versions of various models and makes them available as APIs. The first API calls per model are free for experimentation.

Note that if you're working with a model that isn't tuned to handle agent workflows, you can reformulate the prompts below as a series of multiple-choice questions (MCQs). This should work, as most models are instruction-tuned to handle MCQs. Learn more about fine-tuning.

Select a use case

Next, select a use case. The use case for this post is talking to an SQL database for inventory management. Then populate that database with, for example, three tables. 

Note that the information presented here is for illustrative purposes only and is not intended to convey actual details.

For experimentation, store the sample entries in an SQLite database. The entries are tailored to the schema shown in Figure 2. The intention is to create a simplified version of the database that typically sits at the heart of an inventory management system; such databases contain information about current inventory levels, suppliers, and more.

Figure 2. Database schema with three tables: Inventory, Product, and Supplier.
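
As a concrete starting point, the schema can be created and seeded with Python's built-in sqlite3 module. The column names and sample rows below are illustrative assumptions, with the quantities matching the excess-inventory example later in the post:

    import sqlite3

    conn = sqlite3.connect("inventory.db")
    cur = conn.cursor()

    # Simplified inventory-management schema: Supplier, Product, Inventory.
    cur.executescript("""
    CREATE TABLE IF NOT EXISTS Supplier (
        supplier_id  INTEGER PRIMARY KEY,
        name         TEXT
    );
    CREATE TABLE IF NOT EXISTS Product (
        product_id   INTEGER PRIMARY KEY,
        name         TEXT,
        supplier_id  INTEGER REFERENCES Supplier(supplier_id)
    );
    CREATE TABLE IF NOT EXISTS Inventory (
        product_id             INTEGER REFERENCES Product(product_id),
        quantity_in_stock      INTEGER,
        min_required_quantity  INTEGER
    );
    """)

    # Sample rows matching the excess-inventory example used later in the post.
    cur.execute("INSERT INTO Supplier VALUES (1, 'Example Supplier')")
    cur.execute("INSERT INTO Product VALUES (1, 'Google Pixel 6', 1)")
    cur.execute("INSERT INTO Inventory VALUES (1, 100, 20)")
    conn.commit()
    conn.close()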

LLM agent components

An LLM agent contains four key components: tools, memory module, planning module, and agent core. Details about the components for this example are provided below.

This example uses the following two tools (a minimal sketch of both follows the list):

  • Calculator: For any basic calculations needed after querying the data. To keep it simple, an LLM is used here; any calculator service or API could be plugged in instead.
  • SQL Query Executor: For querying the database for raw data.
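
A minimal sketch of these two tools in Python, where llm_complete is a placeholder for whatever completion API the agent calls:

    import sqlite3

    def llm_complete(prompt: str) -> str:
        # Placeholder for a call to your LLM endpoint of choice.
        raise NotImplementedError

    def sql_query_executor(query: str, db_path: str = "inventory.db"):
        # SQL Query Executor tool: run the generated query and return raw rows.
        with sqlite3.connect(db_path) as conn:
            return conn.execute(query).fetchall()

    def calculator(problem: str) -> str:
        # Calculator tool: to keep it simple, delegate the arithmetic to the LLM.
        return llm_complete(f"Solve this and return only the numeric result: {problem}")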

Memory module: a simple buffer or list to keep track of all the agent's actions.

Planning module: a linear greedy approach. To achieve this, create a "faux tool" named "generate the final answer." This idea is addressed further in the section below.

Agent core: time to put everything together. The prompt for the agent core LLM looks something like this:
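
One possible sketch, with placeholders for the question and the running memory:

    You are an agent that answers questions about an inventory database
    (tables: Inventory, Product, Supplier). You can use the following tools:
      1. SQL Query Executor: runs a SQL query against the database and returns raw rows.
      2. Calculator: solves basic math problems.
      3. Generate the final answer: choose this when the information gathered so far is enough to answer the user.
    Question: {question}
    Previous tool calls and their results: {memory}
    Reply with the single tool to use next and the input to give it.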

The preceding prompt includes all the tools and related information. You can design the core's code harness such that, whenever any tool other than the "generate the final answer" faux tool is chosen, the agent appends the result of that tool to memory and reassesses the situation. This is an iterative greedy approach in which the "best" decision is made at each individual step.
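
A compressed sketch of that loop, reusing the tool functions and the llm_complete placeholder from the earlier sketch (parse_decision and the prompt templates are also placeholders, not part of the original post):

    AGENT_PROMPT = "..."         # core prompt template, as sketched above
    FINAL_ANSWER_PROMPT = "..."  # prompt used by the "generate the final answer" faux tool

    def parse_decision(text: str):
        # Placeholder: extract (tool_name, tool_input) from the LLM's reply.
        raise NotImplementedError

    def run_agent(question: str, tools: dict, max_steps: int = 8) -> str:
        memory = []  # simple list buffer of (tool, input, result) records
        for _ in range(max_steps):
            reply = llm_complete(AGENT_PROMPT.format(question=question, memory=memory))
            tool_name, tool_input = parse_decision(reply)
            if tool_name == "generate the final answer":
                return llm_complete(FINAL_ANSWER_PROMPT.format(question=question, memory=memory))
            # Greedy step: run the chosen tool, store the result, and reassess.
            result = tools[tool_name](tool_input)
            memory.append((tool_name, tool_input, result))
        return "No answer found within the step budget."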

In summary, a data agent has access to planning capability, memory, multiple data access tools, and means of performing related analytical tasks. Figure 3 shows the general architecture of a data agent.

Figure 3. General architecture of a data agent: an agent core, memory, a planning module, and tools.

Data agent example

This section provides an example that showcases how different tools can be used together to solve business questions. The key advantage of using an agent in a scenario like this is that the user doesn't need to know the details of the database or have the technical skill to write queries.

Question: "How much excess inventory do I have for Google Pixel 6?"

Answer: Based on the retrieved information from the inventory system, you currently have 80 units of excess inventory for Google Pixel 6. This calculation is derived by subtracting the minimum required quantity (20) from the current quantity in stock (100). (See above for the source data.)


To solve this question, the agent performed the following steps:

Step 1: QueryDB tool

  • Generate SQL
  • Query database
  • Store results in memory

Step 2: Calculator tool

  • Solve math problem using LLM
  • Store results in memory. Note that this can be replaced with code generation plus code execution. To learn more about dynamic tools, see Build an LLM-Powered API Agent for Task Execution .

Step 3: Final answer generation

The following prompt is for the agent core LLM after SQL is generated.
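
One possible sketch, using the numbers from the example above:

    Question: How much excess inventory do I have for Google Pixel 6?
    Tool results so far:
      - SQL Query Executor: quantity_in_stock = 100, min_required_quantity = 20
      - Calculator: 100 - 20 = 80
    Using only the information above, generate the final answer for the user.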

Key considerations when building data agent applications

Keep in mind the following key considerations when building your LLM agent application.

Scaling the tools

Imagine a case with 100K tables and 100 tools, rather than three tables and three tools. One way to accommodate this type of scaling is to add an intermediate RAG step. This step might pull in the top five most relevant tools for the agent to select from. This can apply to memory, database schema, or any other options that the agent needs to consider.
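
As a sketch of that intermediate step, assuming an embed function that returns a vector for a piece of text, the retrieval can be a simple cosine-similarity ranking:

    import numpy as np

    def top_k_tools(query: str, tool_descriptions: dict, embed, k: int = 5):
        # Rank tool descriptions by cosine similarity to the user query and
        # hand only the top-k tools to the agent core.
        q = embed(query)
        scores = {}
        for name, description in tool_descriptions.items():
            d = embed(description)
            scores[name] = float(np.dot(q, d) / (np.linalg.norm(q) * np.linalg.norm(d)))
        return sorted(scores, key=scores.get, reverse=True)[:k]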

Working with multiple vector databases

You can also build a topical router to direct the queries to the correct database in situations with multiple SQL or vector databases.

Better planning for implementation

This example uses a simple linear solver that implements a greedy iterative solution. It could be replaced by a task decomposition module or a plan compiler to generate a more efficient execution plan.

This post has explained the basics of how to build an LLM agent application for data analytics to help familiarize you with the concepts behind building agents. I highly recommend exploring the open-source ecosystem to select the best agent framework for your application. 

Ready to build your own LLM data agent for production? Check out the AI Chatbot with Retrieval-Augmented Generation free hands-on lab to help you build reliable and scalable solutions.

To read more about LLM agents, see Build an LLM-Powered API Agent for Task Execution .


AWS Database Blog

Create and run AWS DMS tasks using AWS Step Functions

AWS Database Migration Service (AWS DMS) is a managed service that helps you migrate databases to AWS easily and securely. It supports various database sources and targets, including Amazon Relational Database Service (Amazon RDS), Amazon Aurora, Amazon Redshift, Amazon Simple Storage Service (Amazon S3), and more. With AWS DMS, you can migrate your data to and from most widely used commercial and open source databases.

AWS Step Functions is a serverless workflow orchestration service that allows you to coordinate and visualize multiple AWS services into serverless workflows. It provides a graphical interface to define the flow and conditions of your workflow using a JSON-based language called the Amazon States Language.

In this post, we explore how you can use Step Functions to create and orchestrate AWS DMS tasks.

Solution overview

For this post, we use an S3 bucket as the source and Amazon Aurora PostgreSQL-Compatible Edition as the target database instance. You can automate the AWS DMS task creation by integrating with AWS Lambda and Step Functions. This solution provides an end-to-end pipeline to migrate the data in an automated way. By using Step Functions for AWS DMS task creation, you can simplify the process and gain the following benefits:

  • Automation – You can create a workflow that automates the entire AWS DMS task-creation process, reducing manual effort and potential errors.
  • Orchestration – You can define the flow and dependencies between steps, making it straightforward to manage complex migration scenarios.
  • Monitoring – Step Functions provides built-in logging and monitoring, giving you insights into the progress and status of each step in the replication process. This helps you identify bottlenecks, troubleshoot issues, and monitor the health of your migration.

The following diagram illustrates this architecture.


The workflow includes the following steps:

  • A user uploads CSV files to the S3 bucket.
  • When an object is uploaded into the S3 bucket, the ObjectPut event is initiated to invoke a Lambda function.
  • The Lambda function invokes a Step Functions state machine using the StartExecution API call.
  • The state machine orchestrates the creation of the AWS DMS task and initiates a full load from Amazon S3 to Amazon Aurora PostgreSQL.

Prerequisites

To get started, you must complete the following prerequisites:

  • Create an S3 bucket using the AWS Management Console or the AWS Command Line Interface (AWS CLI).
  • Set up an AWS DMS replication instance in the same Region as your source and target.
  • Create an Aurora PostgreSQL cluster.
  • Confirm network connectivity between the AWS DMS replication instance and the Aurora PostgreSQL instance.
  • Ensure that the data stored in your S3 bucket is in a format compatible with AWS DMS. A common source format is CSV.
  • Create AWS Identity and Access Management (IAM) roles and policies that grant AWS DMS permissions to access Amazon S3.
  • Create IAM roles and policies that grant the Step Functions state machine permissions to access AWS DMS.
  • Ensure that you have configured an Amazon Simple Notification Service (Amazon SNS) topic for task status notifications.

In the following sections, we provide a step-by-step guide for automating the AWS DMS task.

Create source and target AWS DMS endpoints

We use AWS DMS endpoints to define the source and target databases in a migration task. Endpoints contain the necessary information to connect to the databases, such as server names, credentials, ports, and database names. To create an endpoint for the source and target, you can use the AWS DMS console or the AWS CLI.

When using Amazon S3 as a source, the source data files must be in CSV format and follow the expected folder layout, for example /schemaName/<folder-name>/tableName/LOAD001.csv. Suppose that your data files are in the S3 path s3://<bucketName>/dmsfolder/dbo/ratings/ratings.csv. The following screenshot shows an example of the data in the ratings.csv file.


The following screenshot shows example settings for creating a target endpoint on the AWS DMS console. When using a PostgreSQL database as a target, provide the necessary details such as the PostgreSQL server endpoint, database credentials, and other connection settings.
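
If you prefer to script the endpoints rather than use the console, a boto3 sketch along these lines should work; every identifier, ARN, and credential below is a placeholder:

    import boto3

    dms = boto3.client("dms")

    # Source endpoint: CSV files in Amazon S3.
    dms.create_endpoint(
        EndpointIdentifier="s3-source-endpoint",
        EndpointType="source",
        EngineName="s3",
        S3Settings={
            "BucketName": "<bucketName>",
            "ServiceAccessRoleArn": "arn:aws:iam::<account-id>:role/<dms-s3-access-role>",
            "CsvDelimiter": ",",
            "CsvRowDelimiter": "\n",
            "ExternalTableDefinition": "<json-table-definition>",
        },
    )

    # Target endpoint: Aurora PostgreSQL-Compatible Edition.
    dms.create_endpoint(
        EndpointIdentifier="aurora-postgres-target-endpoint",
        EndpointType="target",
        EngineName="aurora-postgresql",
        ServerName="<aurora-cluster-endpoint>",
        Port=5432,
        DatabaseName="<database>",
        Username="<user>",
        Password="<password>",
    )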


Create an IAM role to invoke Step Functions

Create an IAM role with permission to invoke a Step Functions state machine from a Lambda function. For this post, the state machine is called DMS-S3-postgres-stepFunctions. The Lambda execution role should have permissions to interact with Step Functions, including the necessary start_execution action. See the following code:

Create the Step Functions state machine

To create a Step Functions workflow for creating an AWS DMS replication task, use the Step Functions console or the AWS CLI to create the Step Functions state machine DMS-S3-postgres-stepFunctions.

Use the necessary input parameters for creating an AWS DMS task, such as source and target database endpoints, replication instance, and selection rules. These parameters are passed as input to the state machine workflow.

The following screenshot shows an example using the console, in which you use an empty template and enter the state machine code.


We use the following state machine code for this post:

Create an IAM role for the Lambda function to access the S3 bucket

Create an IAM role that provides the necessary permissions for a Lambda function to access and interact with S3 buckets. This role is commonly used when creating Lambda functions that are triggered by S3 events or need to perform operations on S3 objects. For more information, refer to How do I allow my Lambda function access to my Amazon S3 bucket?

Create a Lambda function

To invoke Step Functions from a Lambda function, you can use the AWS SDK or the AWS SDK for Python (Boto3) to interact with the Step Functions API. Use the start_execution method of the Step Functions client to invoke the Step Functions state machine by providing the Amazon Resource Name (ARN) of the state machine and the input data.

On the Lambda console, create your Lambda function using the following code (provide the ARN of the state machine you created earlier):
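
A minimal sketch of such a handler, with the state machine ARN as a placeholder:

    import json
    import boto3

    sfn = boto3.client("stepfunctions")
    STATE_MACHINE_ARN = "arn:aws:states:<region>:<account-id>:stateMachine:DMS-S3-postgres-stepFunctions"

    def lambda_handler(event, context):
        # Pass the S3 event straight through as input to the state machine.
        response = sfn.start_execution(
            stateMachineArn=STATE_MACHINE_ARN,
            input=json.dumps(event),
        )
        return {"statusCode": 200, "executionArn": response["executionArn"]}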

Create an S3 event notification

With the S3 event notification, whenever an object is created or uploaded to the specified S3 bucket, the Lambda function is triggered with the S3 event information as the input, and it will process the event.

To enable Lambda notifications using the Amazon S3 console, complete the following steps:

  • On the Amazon S3 console, navigate to your bucket.
  • Choose the Properties tab.
  • In the Event notifications section, choose Create event notification.
  • For Prefix, enter dmsfolder/dbo/ratings/.
  • For Suffix, enter .csv.
  • In the Event types section, for Object creation, select Put.

If you upload a large file, use a multipart upload and select the s3:ObjectCreated:CompleteMultipartUpload event.


  • For Destination, select Lambda function.
  • For Specify Lambda function, you can choose from an existing function or create a new one. For this post, we select Enter Lambda function ARN and enter the ARN.
  • Choose Save changes.


For more information, refer to Enabling and configuring event notifications using the Amazon S3 console .

Upload the CSV file into Amazon S3

To upload a data file into Amazon S3, you can use the Amazon S3 console or the AWS CLI. For more information, refer to Uploading objects .


After you upload the file, you can observe that the AWS DMS full load task creation is in progress. It loads the data from the CSV file you uploaded to Amazon S3 into the Aurora PostgreSQL database instance. To verify the results, connect to the target database and run a query on the ratings table.
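
For example, a quick row count with psycopg2 (connection details are placeholders, and the table name assumes the dbo/ratings folder layout used above):

    import psycopg2

    conn = psycopg2.connect(
        host="<aurora-cluster-endpoint>",
        dbname="<database>",
        user="<user>",
        password="<password>",
    )
    with conn, conn.cursor() as cur:
        cur.execute("SELECT COUNT(*) FROM dbo.ratings;")
        print("rows loaded:", cur.fetchone()[0])
    conn.close()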


Clean up

To clean up and prevent further costs, complete the following steps:

  • Drop the database tables in the target data stores.
  • Delete the AWS DMS replication instance.
  • Delete the S3 bucket and the CSV files that you uploaded to it.
  • Delete the Lambda function.
  • Delete the Step Functions state machine.

Conclusion

By using Step Functions to create AWS DMS tasks, you can achieve better control, visibility, flexibility, and scalability in managing your database migration process. It enables you to build robust and resilient migration workflows that can be customized to meet your specific requirements. Given the complexity involved in database migrations, we highly recommend testing the migration steps in non-production environments before making changes in production.

If you have any questions or suggestions about this post, leave a comment.



An Interactive Agent Foundation Model (arXiv:2402.05929)

Abstract: The development of artificial intelligence systems is transitioning from creating static, task-specific models to dynamic, agent-based systems capable of performing well in a wide range of applications. We propose an Interactive Agent Foundation Model that uses a novel multi-task agent training paradigm for training AI agents across a wide range of domains, datasets, and tasks. Our training paradigm unifies diverse pre-training strategies, including visual masked auto-encoders, language modeling, and next-action prediction, enabling a versatile and adaptable AI framework. We demonstrate the performance of our framework across three separate domains -- Robotics, Gaming AI, and Healthcare. Our model demonstrates its ability to generate meaningful and contextually relevant outputs in each area. The strength of our approach lies in its generality, leveraging a variety of data sources such as robotics sequences, gameplay data, large-scale video datasets, and textual information for effective multimodal and multi-task learning. Our approach provides a promising avenue for developing generalist, action-taking, multimodal systems.



V-JEPA: The next step toward advanced machine intelligence
  • Today, we’re publicly releasing the Video Joint Embedding Predictive Architecture (V-JEPA) model, a crucial step in advancing machine intelligence with a more grounded understanding of the world.
  • This early example of a physical world model excels at detecting and understanding highly detailed interactions between objects.
  • In the spirit of responsible open science, we’re releasing this model under a Creative Commons NonCommercial license for researchers to further explore.

As humans, much of what we learn about the world around us—particularly in our early stages of life—is gleaned through observation. Take Newton’s third law of motion: Even an infant (or a cat) can intuit, after knocking several items off a table and observing the results, that what goes up must come down. You don’t need hours of instruction or to read thousands of books to arrive at that result. Your internal world model—a contextual understanding based on a mental model of the world—predicts these consequences for you, and it’s highly efficient.

“V-JEPA is a step toward a more grounded understanding of the world so machines can achieve more generalized reasoning and planning,” says Meta’s VP & Chief AI Scientist Yann LeCun, who proposed the original Joint Embedding Predictive Architectures (JEPA) in 2022. “Our goal is to build advanced machine intelligence that can learn more like humans do, forming internal models of the world around them to learn, adapt, and forge plans efficiently in the service of completing complex tasks.”

Video JEPA in focus

V-JEPA is a non-generative model that learns by predicting missing or masked parts of a video in an abstract representation space. This is similar to how our Image Joint Embedding Predictive Architecture (I-JEPA) compares abstract representations of images (rather than comparing the pixels themselves). Unlike generative approaches that try to fill in every missing pixel, V-JEPA has the flexibility to discard unpredictable information, which leads to improved training and sample efficiency by a factor between 1.5x and 6x.

Because it takes a self-supervised learning approach, V-JEPA is pre-trained entirely with unlabeled data. Labels are only used to adapt the model to a particular task after pre-training. This type of architecture proves more efficient than previous models, both in terms of the number of labeled examples needed and the total amount of effort put into learning even the unlabeled data. With V-JEPA, we’ve seen efficiency boosts on both of these fronts.

With V-JEPA, we mask out a large portion of a video so the model is only shown a little bit of the context. We then ask the predictor to fill in the blanks of what’s missing—not in terms of the actual pixels, but rather as a more abstract description in this representation space.


Masking methodology

V-JEPA wasn’t trained to understand one specific type of action. Instead it used self-supervised training on a range of videos and learned a number of things about how the world works. The team also carefully considered the masking strategy—if you don’t block out large regions of the video and instead randomly sample patches here and there, it makes the task too easy and your model doesn’t learn anything particularly complicated about the world.

It’s also important to note that, in most videos, things evolve somewhat slowly over time. If you mask a portion of the video but only for a specific instant in time and the model can see what came immediately before and/or immediately after, it also makes things too easy and the model almost certainly won’t learn anything interesting. As such, the team used an approach where it masked portions of the video in both space and time, which forces the model to learn and develop an understanding of the scene.

Efficient predictions

Making these predictions in the abstract representation space is important because it allows the model to focus on the higher-level conceptual information of what the video contains without worrying about the kind of details that are most often unimportant for downstream tasks. After all, if a video shows a tree, you’re likely not concerned about the minute movements of each individual leaf.

One of the reasons why we’re excited about this direction is that V-JEPA is the first model for video that’s good at “frozen evaluations,” which means we do all of our self-supervised pre-training on the encoder and the predictor, and then we don’t touch those parts of the model anymore. When we want to adapt them to learn a new skill, we just train a small lightweight specialized layer or a small network on top of that, which is very efficient and quick.


Previous work had to do full fine-tuning, which means that after pre-training your model, when you want the model to get really good at fine-grained action recognition while you’re adapting your model to take on that task, you have to update the parameters or the weights in all of your model. And then that model overall becomes specialized at doing that one task and it’s not going to be good for anything else anymore. If you want to teach the model a different task, you have to use different data, and you have to specialize the entire model for this other task. With V-JEPA, as we’ve demonstrated in this work, we can pre-train the model once without any labeled data, fix that, and then reuse those same parts of the model for several different tasks, like action classification, recognition of fine-grained object interactions, and activity localization.


Avenues for future research...

While the “V” in V-JEPA stands for “video,” it only accounts for the visual content of videos thus far. A more multimodal approach is an obvious next step, so we’re thinking carefully about incorporating audio along with the visuals.

As a proof of concept, the current V-JEPA model excels at fine-grained object interactions and distinguishing detailed object-to-object interactions that happen over time. For example, if the model needs to be able to distinguish between someone putting down a pen, picking up a pen, and pretending to put down a pen but not actually doing it, V-JEPA is quite good compared to previous methods for that high-grade action recognition task. However, those things work on relatively short time scales. If you show V-JEPA a video clip of a few seconds, maybe up to 10 seconds, it’s great for that. So another important step for us is thinking about planning and the model’s ability to make predictions over a longer time horizon.

...and the path toward AMI

To date, our work with V-JEPA has been primarily about perception—understanding the contents of various video streams in order to obtain some context about the world immediately surrounding us. The predictor in this Joint Embedding Predictive Architecture serves as an early physical world model: You don’t have to see everything that’s happening in the frame, and it can tell you conceptually what’s happening there. As a next step, we want to show how we can use this kind of a predictor or world model for planning or sequential decision-making.

We know that it’s possible to train JEPA models on video data without requiring strong supervision and that they can watch videos in the way an infant might—just observing the world passively, learning a lot of interesting things about how to understand the context of those videos in such a way that, with a small amount of labeled data, you can quickly acquire a new task and ability to recognize different actions.

V-JEPA is a research model, and we’re exploring a number of future applications. For example, we expect that the context V-JEPA provides could be useful for our embodied AI work as well as our work to build a contextual AI assistant for future AR glasses. We firmly believe in the value of responsible open science, and that’s why we’re releasing the V-JEPA model under the CC BY-NC license so other researchers can extend this work.


US House forms AI task force as legislative push stalls

Leaders of the U.S. House of Representatives said they are forming a bipartisan task force to explore potential legislation to address concerns around artificial intelligence.

Reporting by David Shepardson; editing by Miral Fahmy


Further reading

  1. sql

    The best way to implement a job queue in a relational database system is to use SKIP LOCKED. SKIP LOCKED is a lock acquisition option that applies to both read/share (FOR SHARE) and write/exclusive (FOR UPDATE) locks and is widely supported nowadays: Oracle 10g and later, PostgreSQL 9.5 and later, SQL Server 2005 and later.

  2. Implementing a Task Queue in SQL

    A task is assigned to exactly 1 worker (or 0 workers) at a time. Once completed, a task is never assigned again. When a task reaches a configured maximum execution time without completing, it will be assigned again to a worker.

  3. Welcome to Django Q

    Django Q is a native Django task queue, scheduler and worker application using Python multiprocessing. Features: multiprocessing worker pools; asynchronous tasks; scheduled, cron and repeated tasks; signed and compressed packages; failure and success database or cache; result hooks, groups and chains; Django Admin integration.

  4. Task Queues

    A list of message brokers and task queue libraries spanning many programming languages and implementations. Message brokers distribute messages from producers to consumers; one example is Amazon SQS (https://aws.amazon.com/sqs/).

  5. Building High Performance Queue in Database for storing Orders

    Queue is a widely used data structure that sometimes has to be created in a database instead of using specialized queue technologies like MSMQ. Running a high performance and highly scalable queue using database technologies is a big challenge, and it is hard to maintain when the queue starts to see millions of rows queued and dequeued per day.

  6. Successfully Deploying a Task Queue

    A task queue may run a single task or it may run multiple tasks, either sequentially or concurrently, as part of a larger workflow. Traditional task queue architecture gets complex when considering these dependencies.

  7. Create a Simple Task Queue with Flask and Redis

    That's where a task queue can be a game changer. With a task queue, you can shift tasks into the queue to be processed later, allowing you to return a response to the user immediately. Redis serves as the database that stores the data (pip3 install redis), and Redis Queue (RQ) is installed as the library for processing the queued jobs.

  8. Building a task queue, Part 1

    The distinction isn't strict; people often use message queues as job queues too, and vice versa. This series builds a task queue. Queue systems can be thought of as having two main parts: the infrastructure and the interface.

  9. Queue Data Structures: How to Build a Node Task Queue

    A queue is a data structure which holds a collection of items: any process can send (or enqueue) an item at any time, such as send newsletter X to recipient Y, and any process can receive (or dequeue) an item.

  10. Task Queue Overview

    Task queues let applications perform work, called tasks, asynchronously outside of a user request. If an app needs to execute work in the background, it adds tasks to task queues. The tasks are executed later, by worker services. The Task Queue service is designed for asynchronous work; it does not provide strong guarantees around the timing of task delivery.

  11. Daskqueue: Dask-based distributed task queue

    A task queue is usually the best approach when trying to distribute millions of tasks. Implementing a distributed system using queues is a common trade-off.

  12. Database-Backed Queues: Ensuring Data Integrity in a ...

    One such tool we introduced recently is the database-backed queue — that is, using a table in a relational database as a queue for asynchronous jobs.

  13. Developing an Asynchronous Task Queue in Python

    Let's start by creating a basic task. This module will create a new instance of Redis and the SimpleQueue class. It will then enqueue 40 tasks.

  14. Optimizing task queues with Celery and Flask

    Celery communicates via messages, usually using a broker to mediate between clients and workers. To initiate a task, the Celery client adds a message to the queue, and the broker then delivers that message to a worker. The most commonly used brokers are Redis and RabbitMQ. We'll set up a Redis server locally to make use of this mechanism.

  15. Enqueue functions with Cloud Tasks

    To use task queue functions, follow this workflow: write a task queue function using the Firebase SDK for Cloud Functions, test your function by triggering it with an HTTP request, and then deploy it.

  16. Why a database is not always the right tool for a queue ...

    In this kind of situation, you might consider using a database for your PDF job queue. Typically you would create a database table that holds a queue with records representing PDF requests. You would then place a flag in the table representing which state each task is in and whether the task is completed or not.

  17. Queues

    In order to use the database queue driver, you will need a database table to hold the jobs. To generate a migration that creates this table, run the queue:table Artisan command. Once the migration has been created, you may migrate your database using the migrate command: php artisan queue:table, then php artisan migrate.

  18. Web Server Task Queue (How it works?)

    A task queue along with async/await does not make each request take less time to run (fetching from the database still takes equally long compared to without a task queue); it simply unblocks the main ...

  19. java

    Use a database as a queue of tasks. In one of our Java applications (based on a PostgreSQL database), we have a database table that maintains a list of tasks to be executed. Each row has a JSON blob for the details of a task as well as a scheduled time value. We have a few Java workers/threads whose job is to search for tasks that are ready for execution ...

  20. Using SQL Server as a DB queue with multiple clients

    CREATE TABLE Queue (ID INT NOT NULL PRIMARY KEY, Command NVARCHAR(100), Processed INT NOT NULL CHECK (Processed IN (0, 1, 2)), Version TIMESTAMP). You grab the top 1 unprocessed row, set its status to under processing, and set the status back to processed when things are done.