diff --git a/core/lib/Drupal/Core/Cron.php b/core/lib/Drupal/Core/Cron.php index a18bf5b5a20014137bfef3149864e7f43e625674..db09c8cfb80b0b7bf0e295de0f94eed4f9786781 100644 --- a/core/lib/Drupal/Core/Cron.php +++ b/core/lib/Drupal/Core/Cron.php @@ -8,7 +8,9 @@ use Drupal\Core\Extension\ModuleHandlerInterface; use Drupal\Core\Lock\LockBackendInterface; use Drupal\Core\Queue\QueueFactory; +use Drupal\Core\Queue\DelayableQueueInterface; use Drupal\Core\Queue\QueueWorkerManagerInterface; +use Drupal\Core\Queue\DelayedRequeueException; use Drupal\Core\Queue\RequeueException; use Drupal\Core\Queue\SuspendQueueException; use Drupal\Core\Session\AccountSwitcherInterface; @@ -180,6 +182,18 @@ protected function processQueues() { $queue_worker->processItem($item->data); $queue->deleteItem($item); } + catch (DelayedRequeueException $e) { + // The worker requested the task not be immediately re-queued. + // - If the queue doesn't support ::delayItem(), we should leave the + // item's current expiry time alone. + // - If the queue does support ::delayItem(), we should allow the + // queue to update the item's expiry using the requested delay. + if ($queue instanceof DelayableQueueInterface) { + // This queue can handle a custom delay; use the duration provided + // by the exception. + $queue->delayItem($item, $e->getDelay()); + } + } catch (RequeueException $e) { // The worker requested the task be immediately requeued. $queue->releaseItem($item); diff --git a/core/lib/Drupal/Core/Queue/DatabaseQueue.php b/core/lib/Drupal/Core/Queue/DatabaseQueue.php index 47b232b1e5c688f4216d0545c73ea405661b587e..bd4ca82c9bd649f84547c7d7a5dccac904ef8dbc 100644 --- a/core/lib/Drupal/Core/Queue/DatabaseQueue.php +++ b/core/lib/Drupal/Core/Queue/DatabaseQueue.php @@ -11,7 +11,7 @@ * * @ingroup queue */ -class DatabaseQueue implements ReliableQueueInterface, QueueGarbageCollectionInterface { +class DatabaseQueue implements ReliableQueueInterface, QueueGarbageCollectionInterface, DelayableQueueInterface { use DependencySerializationTrait; @@ -89,7 +89,7 @@ protected function doCreateItem($data) { 'data' => serialize($data), // We cannot rely on REQUEST_TIME because many items might be created // by a single request which takes longer than 1 second. - 'created' => time(), + 'created' => \Drupal::time()->getCurrentTime(), ]); // Return the new serial ID, or FALSE on failure. return $query->execute(); @@ -140,7 +140,7 @@ public function claimItem($lease_time = 30) { // should really expire. $update = $this->connection->update(static::TABLE_NAME) ->fields([ - 'expire' => time() + $lease_time, + 'expire' => \Drupal::time()->getCurrentTime() + $lease_time, ]) ->condition('item_id', $item->item_id) ->condition('expire', 0); @@ -171,6 +171,33 @@ public function releaseItem($item) { } } + /** + * {@inheritdoc} + */ + public function delayItem($item, int $delay) { + // Only allow a positive delay interval. + if ($delay < 0) { + throw new \InvalidArgumentException('$delay must be non-negative'); + } + + try { + // Add the delay relative to the current time. + $expire = \Drupal::time()->getCurrentTime() + $delay; + // Update the expiry time of this item. + $update = $this->connection->update(static::TABLE_NAME) + ->fields([ + 'expire' => $expire, + ]) + ->condition('item_id', $item->item_id); + return $update->execute(); + } + catch (\Exception $e) { + $this->catchException($e); + // If the table doesn't exist we should consider the item nonexistent. + return TRUE; + } + } + /** * {@inheritdoc} */ diff --git a/core/lib/Drupal/Core/Queue/DelayableQueueInterface.php b/core/lib/Drupal/Core/Queue/DelayableQueueInterface.php new file mode 100644 index 0000000000000000000000000000000000000000..0240a47fb26ba72a0fd28f871c272a00a6abf62e --- /dev/null +++ b/core/lib/Drupal/Core/Queue/DelayableQueueInterface.php @@ -0,0 +1,34 @@ +<?php + +namespace Drupal\Core\Queue; + +/** + * Delayable queue interface. + * + * Classes implementing this interface allow an item to be released on a delay. + * + * @ingroup queue + */ +interface DelayableQueueInterface extends QueueInterface { + + /** + * Delay an item so it runs in the future. + * + * @param object $item + * The item returned by \Drupal\Core\Queue\QueueInterface::claimItem(). + * @param int $delay + * A delay before the item's lock should expire (in seconds). Relative to + * the current time, not the item's current expiry. + * + * @throws \InvalidArgumentException + * When a negative $delay is provided; $delay must be non-negative. + * + * @see \Drupal\Core\Queue\QueueInterface::releaseItem() + * To immediately release an item without delay. + * + * @return bool + * TRUE if the item has been updated, FALSE otherwise. + */ + public function delayItem($item, int $delay); + +} diff --git a/core/lib/Drupal/Core/Queue/DelayedRequeueException.php b/core/lib/Drupal/Core/Queue/DelayedRequeueException.php new file mode 100644 index 0000000000000000000000000000000000000000..2f402fedb5195c3dc8ba47d8bfc7a69adb083ddf --- /dev/null +++ b/core/lib/Drupal/Core/Queue/DelayedRequeueException.php @@ -0,0 +1,53 @@ +<?php + +namespace Drupal\Core\Queue; + +/** + * Throw this exception to leave an item in the queue until its lock expires. + * + * @see \Drupal\Core\Cron::processQueues() + * For more information about how this exception interacts with Drupal's queue + * processing via the built-in cron service. + * @see \Drupal\Core\Queue\DelayableQueueInterface + * Queues must implement this interface to support custom delay intervals; if + * this interface is missing, any custom delay interval specified for this + * exception will be ignored and the remaining time in the original lease will + * be used as the duration of the delay interval. + * @see \Drupal\Core\Queue\RequeueException + * For use when an item needs to be requeued immediately. + */ +class DelayedRequeueException extends \RuntimeException { + + /** + * The interval of time that the item should remain locked (in seconds). + * + * @var int + */ + protected $delay = 0; + + /** + * Constructs a DelayedRequeueException. + * + * @param int $delay + * The desired delay interval for this item. + */ + public function __construct(int $delay = 0) { + if ($delay > 0) { + $this->delay = $delay; + } + } + + /** + * Get the desired delay interval for this item. + * + * @see self::$delay + * For recommended value usage in a queue processor. + * + * @return int + * The desired delay interval for this item. + */ + public function getDelay(): int { + return $this->delay; + } + +} diff --git a/core/lib/Drupal/Core/Queue/Memory.php b/core/lib/Drupal/Core/Queue/Memory.php index 2e4ac0f1c97f3fcf125d8ba5a3a5bfc027d8bd50..ba9d2494dbe7d9d6526a9b7f9d040d195a211e8e 100644 --- a/core/lib/Drupal/Core/Queue/Memory.php +++ b/core/lib/Drupal/Core/Queue/Memory.php @@ -12,6 +12,7 @@ * @ingroup queue */ class Memory implements QueueInterface { + /** * The queue data. * @@ -44,7 +45,7 @@ public function createItem($data) { $item = new \stdClass(); $item->item_id = $this->idSequence++; $item->data = $data; - $item->created = time(); + $item->created = \Drupal::time()->getCurrentTime(); $item->expire = 0; $this->queue[$item->item_id] = $item; return $item->item_id; @@ -63,7 +64,7 @@ public function numberOfItems() { public function claimItem($lease_time = 30) { foreach ($this->queue as $key => $item) { if ($item->expire == 0) { - $item->expire = time() + $lease_time; + $item->expire = \Drupal::time()->getCurrentTime() + $lease_time; $this->queue[$key] = $item; return $item; } diff --git a/core/misc/cspell/dictionary.txt b/core/misc/cspell/dictionary.txt index cc9377e6e471bed9a7d31d62a5085f61cf23a963..e736c742192e2c73aa12e60b12ff6529d14fce3d 100644 --- a/core/misc/cspell/dictionary.txt +++ b/core/misc/cspell/dictionary.txt @@ -410,6 +410,7 @@ deduplicates defalt defaultable defgroup +delayable deletable deletedline deletee diff --git a/core/modules/system/tests/modules/cron_queue_test/src/Plugin/QueueWorker/CronQueueTestDatabaseDelayException.php b/core/modules/system/tests/modules/cron_queue_test/src/Plugin/QueueWorker/CronQueueTestDatabaseDelayException.php new file mode 100644 index 0000000000000000000000000000000000000000..93e4a1e2cb4b0381f387dc2b8ac20ac7397c8819 --- /dev/null +++ b/core/modules/system/tests/modules/cron_queue_test/src/Plugin/QueueWorker/CronQueueTestDatabaseDelayException.php @@ -0,0 +1,28 @@ +<?php + +namespace Drupal\cron_queue_test\Plugin\QueueWorker; + +use Drupal\Core\Queue\DelayedRequeueException; +use Drupal\Core\Queue\QueueWorkerBase; + +/** + * A queue worker for testing cron exception handling. + * + * @QueueWorker( + * id = "cron_queue_test_database_delay_exception", + * title = @Translation("Database delay exception test"), + * cron = {"time" = 1} + * ) + */ +class CronQueueTestDatabaseDelayException extends QueueWorkerBase { + + const DELAY_INTERVAL = 100; + + /** + * {@inheritdoc} + */ + public function processItem($data) { + throw new DelayedRequeueException(self::DELAY_INTERVAL); + } + +} diff --git a/core/modules/system/tests/modules/cron_queue_test/src/Plugin/QueueWorker/CronQueueTestMemoryDelayException.php b/core/modules/system/tests/modules/cron_queue_test/src/Plugin/QueueWorker/CronQueueTestMemoryDelayException.php new file mode 100644 index 0000000000000000000000000000000000000000..6548868608d86a7eb5f8af94e453317710104878 --- /dev/null +++ b/core/modules/system/tests/modules/cron_queue_test/src/Plugin/QueueWorker/CronQueueTestMemoryDelayException.php @@ -0,0 +1,28 @@ +<?php + +namespace Drupal\cron_queue_test\Plugin\QueueWorker; + +use Drupal\Core\Queue\DelayedRequeueException; +use Drupal\Core\Queue\QueueWorkerBase; + +/** + * A queue worker for testing cron exception handling. + * + * @QueueWorker( + * id = "cron_queue_test_memory_delay_exception", + * title = @Translation("Memory delay exception test"), + * cron = {"time" = 1} + * ) + */ +class CronQueueTestMemoryDelayException extends QueueWorkerBase { + + /** + * {@inheritdoc} + */ + public function processItem($data) { + // Set the delay to something larger than the original lease. + $cron_time = $this->pluginDefinition['cron']['time']; + throw new DelayedRequeueException($cron_time + 100); + } + +} diff --git a/core/modules/system/tests/src/Kernel/System/CronQueueTest.php b/core/modules/system/tests/src/Kernel/System/CronQueueTest.php index 0a0475f1bbc5f9cefe0c68d24273980b81bed250..667423ec0d8ca8f76c47ca1b4a3f4c4852665bd9 100644 --- a/core/modules/system/tests/src/Kernel/System/CronQueueTest.php +++ b/core/modules/system/tests/src/Kernel/System/CronQueueTest.php @@ -3,7 +3,11 @@ namespace Drupal\Tests\system\Kernel\System; use Drupal\Core\Database\Database; +use Drupal\Core\Queue\DatabaseQueue; +use Drupal\Core\Queue\Memory; use Drupal\KernelTests\KernelTestBase; +use Drupal\cron_queue_test\Plugin\QueueWorker\CronQueueTestDatabaseDelayException; +use Prophecy\Argument; /** * Tests the Cron Queue runner. @@ -33,6 +37,15 @@ class CronQueueTest extends KernelTestBase { */ protected $cron; + /** + * The fake current time used for queue worker / cron testing purposes. + * + * This value should be greater than or equal to zero. + * + * @var int + */ + protected $currentTime = 1000; + /** * {@inheritdoc} */ @@ -41,6 +54,70 @@ protected function setUp(): void { $this->connection = Database::getConnection(); $this->cron = \Drupal::service('cron'); + + $time = $this->prophesize('Drupal\Component\Datetime\TimeInterface'); + $time->getCurrentTime()->willReturn($this->currentTime); + $time->getRequestTime()->willReturn($this->currentTime); + \Drupal::getContainer()->set('datetime.time', $time->reveal()); + $this->assertEquals($this->currentTime, \Drupal::time()->getCurrentTime()); + $this->assertEquals($this->currentTime, \Drupal::time()->getRequestTime()); + + $realQueueFactory = $this->container->get('queue'); + $queue_factory = $this->prophesize(get_class($realQueueFactory)); + $database = new DatabaseQueue('cron_queue_test_database_delay_exception', $this->connection); + $memory = new Memory('cron_queue_test_memory_delay_exception'); + $queue_factory->get('cron_queue_test_database_delay_exception', Argument::cetera())->willReturn($database); + $queue_factory->get('cron_queue_test_memory_delay_exception', Argument::cetera())->willReturn($memory); + $queue_factory->get(Argument::any(), Argument::cetera())->will(function ($args) use ($realQueueFactory) { + return $realQueueFactory->get($args[0], $args[1] ?? FALSE); + }); + + $this->container->set('queue', $queue_factory->reveal()); + } + + /** + * Tests that DelayedRequeueException behaves as expected when running cron. + */ + public function testDelayException() { + $database = $this->container->get('queue')->get('cron_queue_test_database_delay_exception'); + $memory = $this->container->get('queue')->get('cron_queue_test_memory_delay_exception'); + + // Ensure that the queues are of the correct type for this test. + $this->assertInstanceOf('Drupal\Core\Queue\DelayableQueueInterface', $database); + $this->assertNotInstanceOf('Drupal\Core\Queue\DelayableQueueInterface', $memory); + + // Get the queue worker plugin manager. + $manager = $this->container->get('plugin.manager.queue_worker'); + $definitions = $manager->getDefinitions(); + $this->assertNotEmpty($database_lease_time = $definitions['cron_queue_test_database_delay_exception']['cron']['time']); + $this->assertNotEmpty($memory_lease_time = $definitions['cron_queue_test_memory_delay_exception']['cron']['time']); + + // Create the necessary test data and run cron. + $database->createItem('test'); + $memory->createItem('test'); + $this->cron->run(); + + // Fetch the expiry time for the database queue. + $query = $this->connection->select('queue'); + $query->condition('name', 'cron_queue_test_database_delay_exception'); + $query->addField('queue', 'expire'); + $query->range(0, 1); + $expire = $query->execute()->fetchField(); + + // Assert that the delay interval is greater than the lease interval. This + // allows us to assume that (if updated) the new expiry time will be greater + // than the initial expiry time. We can then also assume that the new expiry + // time offset will be identical to the delay interval. + $this->assertGreaterThan($database_lease_time, CronQueueTestDatabaseDelayException::DELAY_INTERVAL); + $this->assertGreaterThan($this->currentTime + $database_lease_time, $expire); + $this->assertEquals(CronQueueTestDatabaseDelayException::DELAY_INTERVAL, $expire - $this->currentTime); + + // Ensure that the memory queue expiry time is unchanged after the + // DelayedRequeueException has been thrown. + $property = (new \ReflectionClass($memory))->getProperty('queue'); + $property->setAccessible(TRUE); + $memory_queue_internal = $property->getValue($memory); + $this->assertEquals($this->currentTime + $memory_lease_time, reset($memory_queue_internal)->expire); } /** diff --git a/core/tests/Drupal/Tests/Core/CronTest.php b/core/tests/Drupal/Tests/Core/CronTest.php new file mode 100644 index 0000000000000000000000000000000000000000..c414c86bed7010cb4c386534627035a830991a16 --- /dev/null +++ b/core/tests/Drupal/Tests/Core/CronTest.php @@ -0,0 +1,213 @@ +<?php + +namespace Drupal\Tests\Core; + +use Drupal\Core\Cron; +use Drupal\Core\KeyValueStore\KeyValueMemoryFactory; +use Drupal\Core\Queue\DelayedRequeueException; +use Drupal\Core\Queue\Memory; +use Drupal\Core\Queue\RequeueException; +use Drupal\Core\Queue\SuspendQueueException; +use Drupal\Core\State\State; +use Drupal\Tests\UnitTestCase; +use Prophecy\Argument; +use Prophecy\Argument\ArgumentsWildcard; +use Psr\Log\LoggerInterface; +use Symfony\Component\DependencyInjection\ContainerBuilder; + +/** + * Tests the Cron class. + * + * @group Cron + * @coversDefaultClass \Drupal\Core\Cron + */ +class CronTest extends UnitTestCase { + + const REQUEUE_COUNT = 3; + + /** + * Define the duration of each item claim for this test. + * + * @var int + */ + protected $claimTime = 300; + + /** + * An instance of the Cron class for testing. + * + * @var \Drupal\Core\Cron + */ + protected $cron; + + /** + * The queue used to store test work items. + * + * @var \Drupal\Core\Queue\QueueInterface + */ + protected $queue; + + /** + * The current state of the test in memory. + * + * @var \Drupal\Core\State\State + */ + protected $state; + + /** + * {@inheritdoc} + */ + protected function setUp(): void { + parent::setUp(); + + // @todo Remove in https://www.drupal.org/project/drupal/issues/2932518 + // + // This line is currently needed so that watchdog_exception() is available + // when unit testing Drupal\Core\Cron and can safely be removed once that + // class no longer refers to it. + require_once $this->root . '/core/includes/bootstrap.inc'; + + // Construct a state object used for testing logger assertions. + $this->state = new State(new KeyValueMemoryFactory()); + + // Create a mock logger to set a flag in the resulting state. + $logger = $this->prophesize('Drupal\Core\Logger\LoggerChannelInterface'); + // Safely ignore the cron re-run message when failing to acquire a lock. + // + // We don't need to run regular cron tasks, and we're still implicitly + // testing that queues are being processed. + // + // This argument will need to be updated to match the message text in + // Drupal\Core\Cron::run() should the original text ever be updated. + $logger->warning(Argument::exact('Attempting to re-run cron while it is already running.'))->shouldBeCalled(); + // Set a flag to track when a message is logged by adding a callback + // function for each logging method. + foreach (get_class_methods(LoggerInterface::class) as $logger_method) { + $logger->{$logger_method}(Argument::cetera())->will(function () { + \Drupal::state()->set('cron_test.message_logged', TRUE); + }); + } + + // Create a logger factory to produce the resulting logger. + $logger_factory = $this->prophesize('Drupal\Core\Logger\LoggerChannelFactoryInterface'); + $logger_factory->get(Argument::exact('cron'))->willReturn($logger->reveal()); + + // Create a mock time service. + $time = $this->prophesize('Drupal\Component\Datetime\TimeInterface'); + + // Build the container using the resulting mock objects. + \Drupal::setContainer(new ContainerBuilder()); + \Drupal::getContainer()->set('logger.factory', $logger_factory->reveal()); + \Drupal::getContainer()->set('datetime.time', $time->reveal()); + \Drupal::getContainer()->set('state', $this->state); + + // Create mock objects for constructing the Cron class. + $module_handler = $this->prophesize('Drupal\Core\Extension\ModuleHandlerInterface'); + $queue_factory = $this->prophesize('Drupal\Core\Queue\QueueFactory'); + $queue_worker_manager = $this->prophesize('Drupal\Core\Queue\QueueWorkerManagerInterface'); + $state = $this->prophesize('Drupal\Core\State\StateInterface'); + $account_switcher = $this->prophesize('Drupal\Core\Session\AccountSwitcherInterface'); + + // Create a lock that will always fail when attempting to acquire; we're + // only interested in testing ::processQueues(), not the other stuff. + $lock_backend = $this->prophesize('Drupal\Core\Lock\LockBackendInterface'); + $lock_backend->acquire(Argument::exact('cron'), Argument::cetera())->willReturn(FALSE); + + // Create a queue worker definition for testing purposes. + $queue_worker = $this->randomMachineName(); + $queue_worker_definition = [ + 'id' => $queue_worker, + 'cron' => [ + 'time' => &$this->claimTime, + ], + ]; + + // Create a queue instance for this queue worker. + $this->queue = new Memory($queue_worker); + $queue_factory->get($queue_worker)->willReturn($this->queue); + + // Create a mock queue worker plugin instance based on above definition. + $queue_worker_plugin = $this->prophesize('Drupal\Core\Queue\QueueWorkerInterface'); + $queue_worker_plugin->processItem('Complete')->willReturn(); + $queue_worker_plugin->processItem('Exception')->willThrow(\Exception::class); + $queue_worker_plugin->processItem('DelayedRequeueException')->willThrow(DelayedRequeueException::class); + $queue_worker_plugin->processItem('SuspendQueueException')->willThrow(SuspendQueueException::class); + // 'RequeueException' would normally result in an infinite loop. + // + // This is avoided by throwing RequeueException for the first few calls to + // ::processItem() and then returning void. ::testRequeueException() + // establishes sanity assertions for this case. + $queue_worker_plugin->processItem('RequeueException')->will(function ($args, $mock, $method) { + // Fetch the number of calls to this prophesied method. This value will + // start at zero during the first call. + $method_calls = count($mock->findProphecyMethodCalls($method->getMethodName(), new ArgumentsWildcard($args))); + + // Throw the expected exception on the first few calls. + if ($method_calls < self::REQUEUE_COUNT) { + \Drupal::state()->set('cron_test.requeue_count', $method_calls + 1); + throw new RequeueException(); + } + }); + + // Set the mock queue worker manager to return the definition/plugin. + $queue_worker_manager->getDefinitions()->willReturn([$queue_worker => $queue_worker_definition]); + $queue_worker_manager->createInstance($queue_worker)->willReturn($queue_worker_plugin->reveal()); + + // Construct the Cron class to test. + $this->cron = new Cron($module_handler->reveal(), $lock_backend->reveal(), $queue_factory->reveal(), $state->reveal(), $account_switcher->reveal(), $logger->reveal(), $queue_worker_manager->reveal(), $time->reveal()); + } + + /** + * Resets the testing state. + */ + protected function resetTestingState() { + $this->queue->deleteQueue(); + $this->state->set('cron_test.message_logged', FALSE); + $this->state->set('cron_test.requeue_count', NULL); + } + + /** + * Data provider for ::testProcessQueues() method. + */ + public function processQueuesTestData() { + return [ + ['Complete', 'assertFalse', 0], + ['Exception', 'assertTrue', 1], + ['DelayedRequeueException', 'assertFalse', 1], + ['SuspendQueueException', 'assertTrue', 1], + ['RequeueException', 'assertFalse', 0], + ]; + } + + /** + * Tests the ::processQueues() method. + * + * @covers ::processQueues + * @dataProvider processQueuesTestData + */ + public function testProcessQueues($item, $message_logged_assertion, $count_post_run) { + $this->resetTestingState(); + $this->queue->createItem($item); + $this->assertFalse($this->state->get('cron_test.message_logged')); + $this->assertEquals(1, $this->queue->numberOfItems()); + $this->cron->run(); + $this->{$message_logged_assertion}($this->state->get('cron_test.message_logged')); + $this->assertEquals($count_post_run, $this->queue->numberOfItems()); + } + + /** + * Verify that RequeueException causes an item to be processed multiple times. + */ + public function testRequeueException() { + $this->resetTestingState(); + $this->queue->createItem('RequeueException'); + $this->cron->run(); + + // Fetch the number of times this item was requeued. + $actual_requeue_count = $this->state->get('cron_test.requeue_count'); + // Make sure the item was requeued at least once. + $this->assertIsInt($actual_requeue_count); + // Ensure that the actual requeue count matches the expected value. + $this->assertEquals(self::REQUEUE_COUNT, $actual_requeue_count); + } + +}