Drupal 7 Queues API

Wednesday, June 15, 2011 - 09:52

The Drupal 7 Queues API is a new feature of Drupal that provides a first in, first out data structure. It is used internally by Drupal itself and can be completely customised.

The Queues API isn't just a part of the codebase of Drupal, it is used internally as part of several different processes. The Batch API, which allows lots of things to be done at once, is built upon the Queues API and provides some customised queue classes. I won't be going into the batch API in this post, but I might cover it in later posts.

The new cron system in Drupal 7 uses the Queues API heavily. It works by allowing other modules to create queue items during their normal hook_cron() calls, which are then run afterwards. Each module that wants to include an item in the system cron queue can do so by implementing a hook called hook_cron_queue_info(), which tells cron which function to call when the queue items are retrieved.

For example, the aggregator module updates feeds using cron by creating queue items containing the feed information, under the queue name aggregator_feeds, during its cron function call. These items are then retrieved from the queue and executed by calling the aggregator_refresh() function and passing in the information contained in the queue data. The aggregator_refresh() function is registered as the worker callback in aggregator_cron_queue_info() and is used to get the new feed information.
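The same pattern can be sketched for a module of your own. This is a minimal sketch rather than code taken from core: the module name mymodule, the queue name mymodule_tasks and the helper mymodule_pending_ids() are all hypothetical. Only hook_cron(), hook_cron_queue_info() and its 'worker callback' and 'time' keys come from Drupal 7 itself.

```php
<?php
/**
 * Hypothetical helper: returns the IDs that still need processing.
 */
function mymodule_pending_ids() {
  // Placeholder data for the sketch; a real module would query for these.
  return array(1, 2, 3);
}

/**
 * Implements hook_cron().
 *
 * Adds one queue item for each ID that needs work.
 */
function mymodule_cron() {
  $queue = DrupalQueue::get('mymodule_tasks');
  foreach (mymodule_pending_ids() as $id) {
    $queue->createItem(array('id' => $id));
  }
}

/**
 * Implements hook_cron_queue_info().
 *
 * Tells cron which function to call for items in the mymodule_tasks queue.
 */
function mymodule_cron_queue_info() {
  $queues['mymodule_tasks'] = array(
    'worker callback' => 'mymodule_process_task',
    // Maximum number of seconds cron should spend on this queue per run.
    'time' => 30,
  );
  return $queues;
}

/**
 * Worker callback: receives the data that was passed to createItem().
 */
function mymodule_process_task($data) {
  watchdog('mymodule', 'Processing task %id', array('%id' => $data['id']));
}
```

Cron claims each item, passes its data to the worker callback and then deletes the item, so the worker never needs to touch the queue object directly.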

The Update module also makes use of the Queues API, which it uses to store tasks for requests to fetch available update data. These tasks are then retrieved from the queue at a later date and run.

The API uses object oriented principles to generate and define queues so the system is fully customisable.

Why Use Queues?

Before getting into how to use and customise queues I thought it made sense to go into a few real world examples of how to use a queue to accomplish certain tasks.

Batch Processing
If the module you are writing requires processing several items at once, which might not be from the same function call, then it might make sense to add them to a queue. These items can then be executed in one place.

For example, let's say that when you insert a node you want several other nodes to be created that act as inner pages. You would then set up a queue to add these pages in and also to add the menu items needed.

Sequential Processing Of Items
When inserting taxonomy terms you will often want to insert certain terms before others if you want to maintain a parent-child relationship between terms. By using a queue you can make sure that all items are processed one after the other so that any terms that have children will be inserted first, before the child elements are created.

Delay Processing
If you have large amounts of data to run through then doing so all at once might lead to the system overloading. So to spread out the load of these calculations you would add them to a queue and process them either slowly or when the system has the availability.

An example of this might be when creating a payroll system or similar. You need to be sure that every calculation is complete so adding the items to be calculated into a queue means that the system load will be kept to a reasonable level. It also means that if the system does go down the calculations will still be in the queue ready to be processed.

Importing
Importing content is a common task for many Drupal programmers, and sometimes putting together all the data needed for a node can be an expensive process. By figuring out what nodes need to be inserted and adding them to a queue you can then make sure that every node is inserted without overloading the system.

I have been in the situation a couple of times where I have needed to insert large numbers of nodes and used a counting system to keep track of where the system was in the import process. With a queue I wouldn't have needed to write this code, as I can be sure that all the items I add to it will be retrieved when I ask for them. In this way I can process a subsection of the nodes at a time without causing the system to fall over.

Preventing API Service Black Listing
Many third party services (like Twitter) will keep track of the number of API requests that you make. If you exceed a certain limit they will simply block you from making any more. If you have a large number of items to process then it might be an idea to use a queue to spread out these items to prevent the API quota being exceeded.

The system.queue.inc File

The queues API is found in the file /modules/system/system.queue.inc. This file contains a couple of interfaces and a few classes that are used to wrap queue functionality.

Reliable vs Non-reliable

Drupal 7 defines two types of queue class that you can use to do different things. These are reliable and non-reliable. Note that the term is non-reliable, not unreliable.

Reliable

With reliable queues you can be sure that every item will be executed or processed at least once. A database table is used to store the queue, which means that the queue can exist over several requests. This also means that if the request fails then the queue remains intact.

The only way that Drupal knows that a queue class is reliable is that it implements the DrupalReliableQueueInterface interface. The interface is only a marker: implementing it does not make a queue reliable by itself, so the class has to provide that guarantee.

This is the most commonly used type of queue. It is the system default for new queue objects and has a wide variety of uses.

Non-reliable

Non-reliable queues are generally kept in memory only, so the items usually exist only for the duration of a single request. There is no guarantee that all items in the queue will be executed or will be executed in order. If the request fails then the queue might be lost.

An example of this sort of queue might be when sending data for processing to a cloud environment. When you send data over to a cloud environment there is usually little or no complex communication between servers, the data is just sent.

Another example might be when creating a Twitter service bot. There is generally little impact on the user experience if a single Tweet fails, so creating a non-reliable queue to send a few Tweets out is an ideal solution.

Drupal Queue Classes

There are three classes within the Queues API that provide the queue functionality. These are DrupalQueue, SystemQueue and MemoryQueue.

DrupalQueue

The DrupalQueue class consists of a single static method called get(). To use it you pass the name of the queue you want to retrieve and the method will return an object that you can use to interact with your queue. The following code is the bare minimum needed to get a queue object.

$my_queue = DrupalQueue::get('my_queue');

The queue object returned will be either a custom queue object or one of the default system objects, depending on what variables have been set. Any queue you create will have a unique set of items that will hang from the queue name that you issued.

You can send a second parameter to the get() method, which is a boolean value that forces the queue object to be reliable. The default value here is false, meaning that the get() method will not force the returned queue to be reliable.

For those of you familiar with object oriented programming you can view DrupalQueue as a factory class, since its only function is to create queue objects. The default returned object is SystemQueue.

SystemQueue

The SystemQueue class is the default queue class in Drupal. It is an example of a reliable queue class and so therefore implements the DrupalReliableQueueInterface interface. This class uses the database table queue to store and retrieve the queue. Any items that are retrieved from the queue are 'leased' to ensure that no two processes get the same queue item.

The queue table is created during the Drupal install process and has the following structure.

Column    Type                              Comment
item_id   int(10) unsigned Auto Increment   Primary Key: Unique item ID.
name      varchar(255)                      The queue name.
data      longblob NULL                     The arbitrary data for the item.
expire    int(11)                           Timestamp when the claim lease expires on the item.
created   int(11)                           Timestamp when the item was created.

MemoryQueue

The MemoryQueue class is an example of a non-reliable queue class and therefore does not implement the DrupalReliableQueueInterface interface. All of the queue items are stored in memory in a $queue property of the class. This class also implements item leasing to stop the same items being retrieved from the queue over and over again.

System Variables

Three system variables are used by DrupalQueue to decide what sort of object to return, and by setting these variables you can change the output of the get() method. These three variables are described here in the order in which they are used in the get() method.

'queue_class_' . $name
When calling the get() method you can pass in the name of the queue that you want to create. This first variable is used to find any custom classes that you have created. The $name parameter is used to construct a variable name, the value of which is the name of your custom class. If no variable of that name has been created then NULL is returned, in which case no object is created.

queue_default_class
If no object has yet been created then this variable is used to find the default system queue class to be used. The default value here is SystemQueue.

queue_default_reliable_class
If the second parameter of the get() method is true then the get() method will check to make sure that the currently created object (if any) implements the DrupalReliableQueueInterface interface (i.e. it is reliable). If it doesn't then the currently created object will be overridden in favour of the default reliable class, which is SystemQueue.

The default behaviour of the get() method is to return a SystemQueue object, but this can be changed by altering these variables.
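The lookup order described above can be paraphrased as a small function. This is a sketch of the selection logic as this post describes it, not a copy of the core code, and the function name example_queue_class_name() is hypothetical. It only works out which class name would be chosen and does not create the queue object.

```php
<?php
/**
 * Works out which queue class would be used for a given queue name.
 *
 * Hypothetical helper that mirrors the variable lookup order described above.
 */
function example_queue_class_name($name, $reliable = FALSE) {
  // 1. A class registered specifically for this queue name.
  $class = variable_get('queue_class_' . $name, NULL);

  // 2. Otherwise fall back to the site-wide default class.
  if (!$class) {
    $class = variable_get('queue_default_class', 'SystemQueue');
  }

  // 3. If a reliable queue was requested and the chosen class does not
  //    implement the reliable interface, use the default reliable class.
  $interfaces = (array) class_implements($class);
  if ($reliable && !in_array('DrupalReliableQueueInterface', $interfaces, TRUE)) {
    $class = variable_get('queue_default_reliable_class', 'SystemQueue');
  }

  return $class;
}
```

On a site where none of the three variables has been set, this should return SystemQueue for any queue name. If queue_class_memory has been set to MemoryQueue, then asking for the memory queue with $reliable set to TRUE should still fall back to SystemQueue, because MemoryQueue does not implement DrupalReliableQueueInterface.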

Using Drupal Queues

Using the Queues API is quite simple. All you need to do is get hold of your queue object and you can manipulate your queue data using that object. The createItem() method lets you add data to the queue and the numberOfItems() method lets you see how many items are present in the queue. The following code gets hold of a queue, adds an item to it, and checks the item count. The item being added here is an array, but this is just to demonstrate that anything can be added to the queue. The MemoryQueue class will store items as they are, whereas the SystemQueue class will serialize the items before storing them in the database.

// Create queue object
$queue = DrupalQueue::get('my_queue');

// Create item
$item = array(
  'dataitem1' => 'something',
  'int' => 123
);

// Add item to queue
$queue->createItem($item);

// Report on number of items present
echo $queue->numberOfItems(); // Prints "1"

To create a MemoryQueue object we have to do a little bit of setup beforehand. We can either set the queue_default_class variable to MemoryQueue, so that the get() method gives us a MemoryQueue object by default, or set a variable called queue_class_memory to MemoryQueue. The second approach lets us get hold of a MemoryQueue object without altering a system-wide setting that might break other modules. Once we have the MemoryQueue object it works in very much the same way.

// Set up the memory queue variable
variable_set('queue_class_memory', 'MemoryQueue');

// Get the MemoryQueue object
$queue = DrupalQueue::get('memory');

// Set up the item
$item = array(
  'dataitem1' => 'something',
  'int' => 123
);

// Add the item to the queue
$queue->createItem($item);

// Report on number of items
echo $queue->numberOfItems(); // Prints "1"

To get an item from the queue we use the claimItem() method of the queue object. The return value of this method is an object containing a number of properties, the important one being the data property, which contains whatever we put into the queue.

// Get the queue
$queue = DrupalQueue::get('my_queue');

// Claim item from the queue
$got_item = $queue->claimItem();

// Print data from item
echo $got_item->data['dataitem1'];

If no item is found in the queue then claimItem() will return FALSE. This is an important check to make when you are looking for items in your queues, and so you should include this check before you try to use the item object.

// Claim item from queue
$got_item = $queue->claimItem();

// Make sure we have an item first
if ($got_item !== FALSE) {
  // Use item
  echo $got_item->data['dataitem1'];
}

When using the SystemQueue class the returned item object contains a data property and an item_id property, the latter allowing the class to uniquely identify that item later on. So the item we retrieved above looks like the following. The MemoryQueue class returns items that contain slightly more information, like created time, which would otherwise be stored in the queue table and generally isn't needed outside the queue class. It is kept in the MemoryQueue item objects because there is simply nowhere else to put it.

stdClass Object
(
    [data] => Array
        (
            [dataitem1] => something
            [int] => 123
        )
    [item_id] => 89
)

The claimItem() method takes an optional lease time parameter. This is the amount of time (in seconds) that must pass after the item has been claimed before it can be claimed again. In Drupal 7 core the default lease time is 30 seconds for both SystemQueue and MemoryQueue, although DrupalQueueInterface itself declares a default of one hour (3600 seconds). You can pass your own lease time to this method to change the amount of time that would elapse before the item can be claimed again. For example, to set the lease time to 100 seconds you would do the following.

$got_item = $queue->claimItem(100);

So, once you have the item from the queue you can do pretty much whatever you want with it. It just depends on what you are using the queue for. If you are using the queue to store IDs that you are going to use for inserting content then you can just pass the ID along to your code and do what you need to do with it.

I think the most complex real world example I have used the queue class for is when creating an event syndicator for the NWDUG (North West Drupal User Group). This was a collaborative effort between a few programmers (including myself) and we wanted to use the queue to syndicate our meetups to different event websites. The only data we stored in the queue was the node ID and the service. This meant we could load the latest version of the event node and pass it to a publishevent hook that we had created for that service. Here is that part of the code.

// Claim event item
$event = $queue->claimItem();
 
if ($event !== FALSE) {
  // If we have an event then invoke publishevent hook in submodule
  $result = module_invoke($event->data['service'], 'publishevent', node_load($event->data['nid']));
}

Releasing And Deleting Items

Once we have our queue item, and have used it, we can then either release it (reset the lease time) or delete it. To release the item back into the queue use the releaseItem() method, passing the item object as the single parameter.

$queue->releaseItem($got_item);

To delete the item from the queue entirely use the deleteItem() method, passing the item object as the single parameter.

$queue->deleteItem($got_item);
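Putting claiming, deleting and releasing together, a processing loop might look something like the following. This is only a sketch under a few assumptions: the function name example_drain_queue() is hypothetical, the worker is any callable you supply, and an item counts as failed when the worker throws an exception.

```php
<?php
/**
 * Processes items from a queue for at most $max_seconds.
 *
 * Hypothetical helper. Returns the number of items that were processed
 * successfully and deleted.
 */
function example_drain_queue($queue_name, $worker, $max_seconds = 15) {
  $queue = DrupalQueue::get($queue_name);
  $end = time() + $max_seconds;
  $processed = 0;

  // Claim each item for 60 seconds; claimItem() returns FALSE when empty.
  while (time() < $end && ($item = $queue->claimItem(60))) {
    try {
      call_user_func($worker, $item->data);
      // Success: remove the item so it is never handed out again.
      $queue->deleteItem($item);
      $processed++;
    }
    catch (Exception $e) {
      // Failure: log it and hand the item back for a later attempt.
      watchdog_exception('example', $e);
      $queue->releaseItem($item);
      // Stop here, otherwise the same item would be claimed straight away.
      break;
    }
  }

  return $processed;
}
```

Deleting only after the worker has finished is what gives the "at least once" behaviour of a reliable queue: if the request dies halfway through, the item is still in the table and becomes claimable again once its lease has expired.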

Customizing

If the default system classes don't quite do what you need them to you can create your own queue classes that can completely change the way in which the Queues API works. When creating your own queue classes it is probably a good idea to either extend one of the default classes or implement the DrupalQueueInterface. This will give you a common interface that other module devs can work with.

If your queue class is to be reliable, then it MUST implement DrupalReliableQueueInterface, although you can also extend SystemQueue, which is pretty much the same thing. The get() method of DrupalQueue will check to make sure that your object implements DrupalReliableQueueInterface when creating a reliable queue, so if your class doesn't do this you will get the SystemQueue class instead.

After you have built your own custom queue class, all you need to do to get it working is create a variable called 'queue_class_' . $name, the value of which is the class name of your queue class. You can then call DrupalQueue::get() with $name as a string. Setting up this variable would usually be done in an install hook or similar. For example, assuming my class is called MyCustomQueueClass then I would use the following code to get hold of it as a queue object.

// Set up variable (would usually be done in an install hook)
variable_set('queue_class_mycustom', 'MyCustomQueueClass');

// Get the queue object
$queue = DrupalQueue::get('mycustom');

After we have retrieved our queue object we can then interact with it in the same way as before, because implementing DrupalQueueInterface gives us the same methods to work with.

Examples Of Custom Queue Classes

I thought it might be an idea to create some custom queue classes that did different things to demonstrate how easy it is to create new queue functionality, so here they are.

Stack Class
The simplest thing we can do when creating our own queue classes is to override the default queue classes with our own functionality. The Stack class is an implementation of a stack data structure, a last in first out implementation, using the SystemQueue class as a template. The only thing we change from the SystemQueue class is within the first SQL query, which we change to be ordered by created descending, rather than ascending. This means that the last item we put into the object is the first item we get back.

class Stack extends SystemQueue {
 
  public function claimItem($lease_time = 30) {
    while (TRUE) {
      $item = db_query_range('SELECT data, item_id FROM {queue} q WHERE 
          expire = 0 AND name = :name ORDER BY created DESC', 0, 1,
              array(':name' => $this->name))->fetchObject();
      if ($item) {
        $update = db_update('queue')
          ->fields(array(
            'expire' => time() + $lease_time,
          ))
          ->condition('item_id', $item->item_id)
          ->condition('expire', 0);
        // If there are affected rows, this update succeeded.
        if ($update->execute()) {
          $item->data = unserialize($item->data);
          return $item;
        }
      }
      else {
        // No items currently available to claim.
        return FALSE;
      }
    }
  }
}

WatchdogSystemQueue
This class extends the SystemQueue class and creates a log every time anything is done. It also maintains the original SystemQueue functionality and so might be a good way of learning what is going on inside your Drupal system. You can use this class in the normal way, or you can override the queue_default_class and queue_default_reliable_class variables so that your Drupal install uses this class as a default.

class WatchdogSystemQueue extends SystemQueue {

    public function __construct($name) {
        watchdog('queue', '%name queue created', array('%name' => $name));
        parent::__construct($name);
    }

    public function createItem($data) {
        watchdog('queue', 'Item created : %data', array('%data' => print_r($data, TRUE)));
        // Pass the parent's success flag back to the caller.
        return parent::createItem($data);
    }

    public function numberOfItems() {
        $count = parent::numberOfItems();
        watchdog('queue', 'Current item count = %num', array('%num' => $count));
        // Return the count, otherwise callers would always receive NULL.
        return $count;
    }

    public function claimItem($lease_time = 3600) {
        $return_value = parent::claimItem($lease_time);
        watchdog('queue', 'Item claimed %item (lease time = %lease)', array('%item' => print_r($return_value, TRUE), '%lease' => $lease_time));
        return $return_value;
    }

    public function deleteItem($item) {
        watchdog('queue', 'Item deleted : %item', array('%item' => print_r($item, TRUE)));
        parent::deleteItem($item);
    }

    public function releaseItem($item) {
        watchdog('queue', 'Item released : %item', array('%item' => print_r($item, TRUE)));
        // Pass the parent's result back to the caller.
        return parent::releaseItem($item);
    }

    public function createQueue() {
        watchdog('queue', 'Queue created');
        parent::createQueue();
    }

    public function deleteQueue() {
        watchdog('queue', 'Queue deleted');
        parent::deleteQueue();
    }
}

EventQueue Class
This class was required as part of the NWDUG website. What we needed was a period of grace after publishing an event before that event was syndicated out to different services (like Twitter, EventBrite, Yahoo Upcoming). What was implemented was a delay queue where any added items would only be available after 45 minutes. The class we created extends the SystemQueue class and just overrides some of the core functionality in much the same way as the Stack class does above.

class EventQueue extends SystemQueue {

  public function claimItem($lease_time = 30) {
    // Only items created at least 45 minutes (2700 seconds) ago can be claimed.
    $cutoff = time() - 2700;
    while (TRUE) {
      $item = db_query_range('SELECT data, item_id FROM {queue} q WHERE expire = 0 AND name = :name AND created <= :cutoff ORDER BY created ASC', 0, 1, array(':name' => $this->name, ':cutoff' => $cutoff))->fetchObject();
      if ($item) {
        $update = db_update('queue')
          ->fields(array(
            'expire' => time() + $lease_time,
          ))
          ->condition('item_id', $item->item_id)
          ->condition('expire', 0);
        // If there are affected rows, this update succeeded.
        if ($update->execute()) {
          $item->data = unserialize($item->data);
          return $item;
        }
      }
      else {
        // No items currently available to claim.
        return FALSE;
      }
    }
  }
}

RandomMemoryQueue
The RandomMemoryQueue is a bit of a silly class, but was created to show that it is possible to extend the MemoryQueue class in just the same way as the SystemQueue class. Items are added in order but are retrieved in random order, maintaining the lease time system of the original class.

class RandomMemoryQueue extends MemoryQueue {

  public function claimItem($lease_time = 30) {
    $available_items = array();

    // Extract the remaining available (unclaimed) items
    foreach ($this->queue as $key => $item) {
      if ($item->expire == 0) {
        $available_items[] = $item;
      }
    }

    // Randomly select one (if available)
    if (count($available_items) > 0) {
      // Pick an index within the available items, not the whole queue.
      $rand_item = rand(0, count($available_items) - 1);
      $item = $available_items[$rand_item];
      $item->expire = time() + $lease_time;
      return $item;
    }

    return FALSE;
  }
}

Creating And Destroying

The DrupalQueueInterface interface has two methods available for creating and destroying the queue. These are called when installing and uninstalling your module and only need to be run once. You can call them by getting hold of the queue object in the same way as usual, by using the DrupalQueue::get() method.

createQueue()
Used to create any custom queue tables or setup variables. Should be called within an install hook.

deleteQueue()
Used to drop any custom queue tables and remove any custom variables. Should be called within an uninstall hook.

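The core Drupal queue classes need no setup of their own, as everything is created for them when Drupal is installed, so their createQueue() methods do nothing. Their deleteQueue() methods simply remove every item from the queue.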
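For a custom queue class, the two calls might be wired into a module's .install file along these lines. This is a sketch only: the module name mymodule and the file mymodule.queue.inc are hypothetical, and MyCustomQueueClass is the example class name used earlier in this post.

```php
<?php
/**
 * Implements hook_install().
 */
function mymodule_install() {
  // Register the custom class for the 'mycustom' queue name.
  variable_set('queue_class_mycustom', 'MyCustomQueueClass');

  // Let the class create any tables or variables it needs.
  DrupalQueue::get('mycustom')->createQueue();
}

/**
 * Implements hook_uninstall().
 */
function mymodule_uninstall() {
  // Assumption: the class lives in mymodule.queue.inc. It is loaded by hand
  // because the module's classes may not be autoloadable during uninstall.
  module_load_include('inc', 'mymodule', 'mymodule.queue');

  // Let the class remove whatever it created, then drop the variable.
  DrupalQueue::get('mycustom')->deleteQueue();
  variable_del('queue_class_mycustom');
}
```

The order matters in both hooks: the variable has to exist before DrupalQueue::get() is called, otherwise get() would hand back the default queue class instead of the custom one.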

Tips

Here are a few tips that might come in handy when using the Drupal Queues API.

  • Unless you really need to rewrite the entire class it is best to extend SystemQueue or MemoryQueue.
  • For custom queue classes in open source projects try to keep the same retrieved item structure as the default system queues.
  • Instead of changing a system-wide default variable to get hold of a MemoryQueue object, create a 'queue_class_' . $name variable for your queue and set the value to be MemoryQueue.
  • At no point do the default queue classes check for the uniqueness of queue items. It is perfectly possible to add the same item over and over again if you are not careful.

Resources

Drupal Queues API

The API is quite well documented at http://api.drupal.org/api/drupal/modules--system--system.queue.inc/group/queue/7

Queue UI Module

A module has been created called Queue UI that allows you to inspect the contents of your queues.

Source Code

The actual source code of the Queues API is very well documented and can be found at /modules/system/system.queue.inc

Philip Norton

Phil is the founder and administrator of #! code and is an IT professional working in the North West of the UK.

Comments

Thanks!

thanks anyway

Hi,

The information above really helped me out in understanding how Queues work but I was wondering if you could help me with a solution to what I am trying to do? Or any suggestions would be appreciated.

I am integrating my Drupal 7 site with an external API but I am limited to 1 API call per second. In order to avoid any errors since my site will deal with a lot of user submitted content I thought that using the Queues would allow me to control the rate in which those API calls are processed.

What I want the Queue to do is process 1 item every second and not allow more than one item a second to be processed. Is that possible? Or if I can only process the queue on Cron due to performance issues can the Queue process only 1 item from my queue every Cron run?

Basically this is what I am trying to do:

API call > Submitted to queues > API returns info > returned info activates next API call > Submitted to queues, etc.

Any help or guidance would be appreciated. Thanks!

thanks.

Thank you for a clearly written and thorough discussion of the Queues API.
