diff --git a/modules/system/system.info b/modules/system/system.info index 8bf71536f884a2f99be3127e7edc6a37221cdf18..0afcdaf9a20e4b3fabe0273b6a7720c7f93aead5 100644 --- a/modules/system/system.info +++ b/modules/system/system.info @@ -6,6 +6,7 @@ version = VERSION core = 7.x files[] = system.module files[] = system.admin.inc +files[] = system.queue.inc files[] = image.gd.inc files[] = system.install required = TRUE diff --git a/modules/system/system.install b/modules/system/system.install index 48dfd9b0ed9e4e3da5959e3f82d4d20ebf5c2c5f..8dc92d45c0de1b3e801992e1639d76f151161dc2 100644 --- a/modules/system/system.install +++ b/modules/system/system.install @@ -1052,6 +1052,67 @@ function system_schema() { 'primary key' => array('mlid'), ); + $schema['queue'] = array( + 'description' => 'Stores items in queues.', + 'fields' => array( + 'item_id' => array( + 'type' => 'serial', + 'unsigned' => TRUE, + 'not null' => TRUE, + 'description' => 'Primary Key: Unique item ID.', + ), + 'name' => array( + 'type' => 'varchar', + 'length' => 255, + 'not null' => TRUE, + 'default' => '', + 'description' => 'The queue name.', + ), + 'consumer_id' => array( + 'type' => 'int', + 'not null' => TRUE, + 'default' => 0, + 'description' => 'The ID of the dequeuing consumer.', + ), + 'data' => array( + 'type' => 'text', + 'not null' => FALSE, + 'size' => 'big', + 'serialize' => TRUE, + 'description' => 'The arbitrary data for the item.', + ), + 'expire' => array( + 'type' => 'int', + 'not null' => TRUE, + 'default' => 0, + 'description' => 'Timestamp when the claim lease expires on the item.', + ), + 'created' => array( + 'type' => 'int', + 'not null' => TRUE, + 'default' => 0, + 'description' => 'Timestamp when the item was created.', + ), + ), + 'primary key' => array('item_id'), + 'indexes' => array( + 'consumer_queue' => array('consumer_id', 'name', 'created'), + 'consumer_expire' => array('consumer_id', 'expire'), + ), + ); + + $schema['queue_consumer_id'] = array( + 'description' => 'Stores queue consumer IDs, used to auto-increment the consumer ID so that a unique consumer ID is used.', + 'fields' => array( + 'consumer_id' => array( + 'type' => 'serial', + 'not null' => TRUE, + 'description' => 'Primary Key: Unique consumer ID used to make sure only one consumer gets one item.', + ), + ), + 'primary key' => array('consumer_id'), + ); + $schema['registry'] = array( 'description' => "Each record is a function, class, or interface name and the file it is in.", 'fields' => array( @@ -3298,6 +3359,76 @@ function system_update_7021() { return $ret; } +/** + * Add the queue tables. + */ +function system_update_7022() { + $schema['queue'] = array( + 'description' => 'Stores items in queues.', + 'fields' => array( + 'item_id' => array( + 'type' => 'serial', + 'unsigned' => TRUE, + 'not null' => TRUE, + 'description' => 'Primary Key: Unique item ID.', + ), + 'name' => array( + 'type' => 'varchar', + 'length' => 255, + 'not null' => TRUE, + 'default' => '', + 'description' => 'The queue name.', + ), + 'consumer_id' => array( + 'type' => 'int', + 'not null' => TRUE, + 'default' => 0, + 'description' => 'The ID of the dequeuing consumer.', + ), + 'data' => array( + 'type' => 'text', + 'not null' => FALSE, + 'size' => 'big', + 'serialize' => TRUE, + 'description' => 'The arbitrary data for the item.', + ), + 'expire' => array( + 'type' => 'int', + 'not null' => TRUE, + 'default' => 0, + 'description' => 'Timestamp when the claim lease expires on the item.', + ), + 'created' => array( + 'type' => 'int', + 'not null' => TRUE, + 'default' => 0, + 'description' => 'Timestamp when the item was created.', + ), + ), + 'primary key' => array('item_id'), + 'indexes' => array( + 'consumer_queue' => array('consumer_id', 'name', 'created'), + 'consumer_expire' => array('consumer_id', 'expire'), + ), + ); + + $schema['queue_consumer_id'] = array( + 'description' => 'Stores queue consumer IDs, used to auto-incrament the consumer ID so that a unique consumer ID is used.', + 'fields' => array( + 'consumer_id' => array( + 'type' => 'serial', + 'not null' => TRUE, + 'description' => 'Primary Key: Unique consumer ID used to make sure only one consumer gets one item.', + ), + ), + 'primary key' => array('consumer_id'), + ); + db_create_table($ret, 'queue', $schema['queue']); + db_create_table($ret, 'queue_consumer_id', $schema['queue_consumer_id']); + + return $ret; +} + /** * @} End of "defgroup updates-6.x-to-7.x" * The next series of updates should start at 8000. diff --git a/modules/system/system.module b/modules/system/system.module index 98d1b2f0061179c0d2c1f6d8a752254017a77ff2..861c3c51c84869eb73a168a467f9b2239d5434b7 100644 --- a/modules/system/system.module +++ b/modules/system/system.module @@ -1587,6 +1587,16 @@ function system_cron() { foreach ($cache_tables as $table) { cache_clear_all(NULL, $table); } + + // Reset expired items in the default queue implementation table. If that's + // not used, this will simply be a no-op. + db_update('queue') + ->fields(array( + 'consumer_id' => 0, + 'expire' => 0, + )) + ->condition('expire', REQUEST_TIME, '<') + ->execute(); } /** diff --git a/modules/system/system.queue.inc b/modules/system/system.queue.inc new file mode 100644 index 0000000000000000000000000000000000000000..9fbed26eb0d976cff23452f5c4a775973e1fed2a --- /dev/null +++ b/modules/system/system.queue.inc @@ -0,0 +1,255 @@ +<?php +// $Id$ + +/** + * @file + * Queue functionality. + */ + +/** + * @defgroup queue Queue operations + * @{ + * The queue system allows placing items in a queue and processing them later. + * The system tries to ensure that only one consumer can process an item. + * + * Before a queue can be used it needs to be created by + * DrupalQueueInterface::createQueue(). + * + * Items can be added to the queue by passing an arbitrary data object to + * DrupalQueueInterface::createItem(). + * + * To process an item, call DrupalQueueInterface::claimItem() and specify how + * long you want to have a lease for working on that item. When finished + * processing, the item needs to be deleted by calling + * DrupalQueueInterface::deleteItem(). If the consumer dies, the item will be + * made available again by the DrapalQueueInterface implementation once the + * lease expires. Another consumer will then be able to receive it when calling + * DrupalQueueInterface::claimItem(). + * + * The $item object used by the DrupalQueueInterface can contain arbitrary + * metadata depending on the implementation. Systems using the interface should + * only rely on the data property which will contain the information passed to + * DrupalQueueInterface::createItem(). The full queue item returned by + * DrupalQueueInterface::createItem() needs to be passed to + * DrupalQueueInterface::deleteItem() once processing is completed. + * + * While the queue system makes a best effort to preserve order in messages, + * due to the pluggable nature of the queue, there is no guarantee that items + * will be delivered on claim in the order they were sent. For example, some + * implementations like beanstalkd or others with distributed back-ends like + * Amazon SQS will be managing jobs for a large set of producers and consumers + * where a strict FIFO ordering will likely not be preserved. + * + * The system also makes no guarantees about a task only being executed once: + * callers that have non-idempotent tasks either need to live with the + * possiblity of the task being invoked multiple times in cases where a claim + * lease expires, or need to implement their own transactions to make their + * tasks idempotent. + */ + +/** + * Factory class for interacting with queues. + */ +class DrupalQueue { + /** + * Get a queue object for a given name. + * + * @param $name + * Arbitrary string. The name of the queue to work with. + * @return + * The queue object for a given name. + */ + public static function get($name) { + static $queues; + if (!isset($queues[$name])) { + $class = variable_get('queue_module_'. $name, 'System') . 'Queue'; + $queues[$name] = new $class($name); + } + return $queues[$name]; + } +} + +interface DrupalQueueInterface { + /** + * Start working with a queue. + * + * @param $name + * Arbitrary string. The name of the queue to work with. + */ + public function __construct($name); + + /** + * Add a queue item and store it directly to the queue. + * + * @param $data + * Arbitrary data to be associated with the new task in the queue. + * @return + * TRUE if the item was successfully created and was (best effort) added + * to the queue, otherwise FALSE. We don't guarantee the item was + * committed to disk, that your disk wasn't hit by a meteor, etc, but as + * far as we know, the item is now in the queue. + */ + public function createItem($data); + + /** + * Retrieve the number of items in the queue. + * + * This is intended to provide a "best guess" count of the number of items in + * the queue. Depending on the implementation and the setup, the accuracy of + * the results of this function may vary. + * + * e.g. On a busy system with a large number of consumers and items, the + * result might only be valid for a fraction of a second and not provide an + * accurate representation. + * + * @return + * An integer estimate of the number of items in the queue. + */ + public function numberOfItems(); + + /** + * Claim an item in the queue for processing. + * + * @param $lease_time + * How long the processing is expected to take in seconds, defaults to an + * hour. After this lease expires, the item will be reset and another + * consumer can claim the item. For idempotent tasks (which can be run + * multiple times without side effects), shorter lease times would result + * in lower latency in case a consumer fails. For tasks that should not be + * run more than once (non-idempotent), a larger lease time will make it + * more rare for a given task to run multiple times in cases of failure, + * at the cost of higher latency. + * @return + * On success we return an item object. If the queue is unable to claim an + * item it returns false. This implies a best effort to retrieve an item + * and either the queue is empty or there is some other non-recoverable + * problem. + */ + public function claimItem($lease_time = 3600); + + /** + * Delete a finished item from the queue. + * + * @param $item + * The item returned by claimItem(). + */ + public function deleteItem($item); + + /** + * Create a queue. + * + * Called during installation and should be used to perform any necessary + * initialization operations. This should not be confused with the + * constructor for these objects, which is called every time an object is + * instantiated to operate on a queue. This operation is only needed the + * first time a given queue is going to be initialized (for example, to make + * a new database table or directory to hold tasks for the queue -- it + * depends on the queue implementation if this is necessary at all). + */ + public function createQueue(); + + /** + * Delete a queue and every item in the queue. + */ + public function deleteQueue(); +} + +/** + * Default queue implementation. + */ +class SystemQueue implements DrupalQueueInterface { + /** + * Our internal consumer ID for this queue instance. + * + * This is created lazily when we start consuming items with claimItem(). + * + * @var integer + */ + protected $consumerId; + + /** + * The name of the queue this instance is working with. + * + * @var string + */ + protected $name; + + public function __construct($name) { + $this->name = $name; + } + + public function createItem($data) { + $record = new stdClass(); + $record->name = $this->name; + $record->data = $data; + $record->consumer_id = 0; + // We cannot rely on REQUEST_TIME because many items might be created by a + // single request which takes longer than 1 second. + $record->created = time(); + return drupal_write_record('queue', $record) !== FALSE; + } + + public function numberOfItems() { + return db_query('SELECT COUNT(item_id) FROM {queue} WHERE name = :name', array(':name' => $this->name))->fetchField(); + } + + public function claimItem($lease_time = 30) { + if (!isset($this->consumerId)) { + $this->consumerId = db_insert('queue_consumer_id') + ->useDefaults(array('consumer_id')) + ->execute(); + } + // Claim an item by updating its consumer_id and expire fields. If claim + // is not successful another thread may have claimed the item in the + // meantime. Therefore loop until an item is successfully claimed or we are + // reasonably sure there are no unclaimed items left. + while (TRUE) { + $item = db_query_range('SELECT data, item_id FROM {queue} q WHERE consumer_id = 0 AND name = :name ORDER BY created ASC', array(':name' => $this->name), 0, 1)->fetchObject(); + if ($item) { + // Try to mark the item as ours. We cannot rely on REQUEST_TIME + // because items might be claimed by a single consumer which runs + // longer than 1 second. If we continue to use REQUEST_TIME instead of + // the current time(), we steal time from the lease, and will tend to + // reset items before the lease should really expire. + $update = db_update('queue') + ->fields(array( + 'consumer_id' => $this->consumerId, + 'expire' => time() + $lease_time, + )) + ->condition('item_id', $item->item_id) + ->condition('consumer_id', 0); + // If there are affected rows, this update succeeded. + if ($update->execute()) { + $item->data = unserialize($item->data); + return $item; + } + } + else { + // No items currently available to claim. + return FALSE; + } + } + } + + public function deleteItem($item) { + db_delete('queue') + ->condition('item_id', $item->item_id) + ->execute(); + } + + public function createQueue() { + // All tasks are stored in a single database table (which is created when + // Drupal is first installed) so there is nothing we need to do to create + // a new queue. + } + + public function deleteQueue() { + db_delete('queue') + ->condition('name', $this->name) + ->execute(); + } +} + +/** + * @} End of "defgroup queue". + */ diff --git a/modules/system/system.test b/modules/system/system.test index 1e26330a90261d0bc65049a4495605ca27a24b19..60c872df083858b1aeb894444e5ecc3ecde77f4d 100644 --- a/modules/system/system.test +++ b/modules/system/system.test @@ -895,3 +895,95 @@ class SystemThemeFunctionalTest extends DrupalWebTestCase { $this->assertRaw('themes/garland', t('Site default theme used on the add content page.')); } } + + +/** + * Test the basic queue functionality. + */ +class QueueTestCase extends DrupalWebTestCase { + public static function getInfo() { + return array( + 'name' => t('Queue functionality'), + 'description' => t('Queues and dequeues a set of items to check the basic queue functionality.'), + 'group' => t('System'), + ); + } + + /** + * Queues and dequeues a set of items to check the basic queue functionality. + */ + function testQueue() { + // Create two queues. + $queue1 = DrupalQueue::get($this->randomName()); + $queue1->createQueue(); + $queue2 = DrupalQueue::get($this->randomName()); + $queue2->createQueue(); + + // Create four items. + $data = array(); + for ($i = 0; $i < 4; $i++) { + $data[] = array($this->randomName() => $this->randomName()); + } + + // Queue items 1 and 2 in the queue1. + $queue1->createItem($data[0]); + $queue1->createItem($data[1]); + + // Retrieve two items from queue1. + $items = array(); + $new_items = array(); + + $items[] = $item = $queue1->claimItem(); + $new_items[] = $item->data; + + $items[] = $item = $queue1->claimItem(); + $new_items[] = $item->data; + + // First two dequeued items should match the first two items we queued. + $this->assertEqual($this->queueScore($data, $new_items), 2, t('Two items matched')); + + // Add two more items. + $queue1->createItem($data[2]); + $queue1->createItem($data[3]); + + $this->assertTrue($queue1->numberOfItems(), t('Queue 1 is not empty after adding items.')); + $this->assertFalse($queue2->numberOfItems(), t('Queue 2 is empty while Queue 1 has items')); + + $items[] = $item = $queue1->claimItem(); + $new_items[] = $item->data; + + $items[] = $item = $queue1->claimItem(); + $new_items[] = $item->data; + + // All dequeued items should match the items we queued exactly once, + // therefore the score must be exactly 4. + $this->assertEqual($this->queueScore($data, $new_items), 4, t('Four items matched')); + + // There should be no duplicate items. + $this->assertEqual($this->queueScore($new_items, $new_items), 4, t('Four items matched')); + + // Delete all items from queue1. + foreach ($items as $item) { + $queue1->deleteItem($item); + } + + // Check that both queues are empty. + $this->assertFalse($queue1->numberOfItems(), t('Queue 1 is empty')); + $this->assertFalse($queue2->numberOfItems(), t('Queue 2 is empty')); + } + + /** + * This function returns the number of equal items in two arrays. + */ + function queueScore($items, $new_items) { + $score = 0; + foreach ($items as $item) { + foreach ($new_items as $new_item) { + if ($item === $new_item) { + $score++; + } + } + } + return $score; + } +}