diff --git a/core/lib/Drupal/Core/Database/Connection.php b/core/lib/Drupal/Core/Database/Connection.php
index 94a23fb05fcf25cd58ca82fac9dda4f2986e8bc1..196087487295983197c437a0b6a9d4d113d402de 100644
--- a/core/lib/Drupal/Core/Database/Connection.php
+++ b/core/lib/Drupal/Core/Database/Connection.php
@@ -145,12 +145,28 @@ function __construct($dsn, $username, $password, $driver_options = array()) {
     // Call PDO::__construct and PDO::setAttribute.
     parent::__construct($dsn, $username, $password, $driver_options);
 
-    // Set a specific PDOStatement class if the driver requires that.
+    // Set a Statement class, unless the driver opted out.
     if (!empty($this->statementClass)) {
       $this->setAttribute(PDO::ATTR_STATEMENT_CLASS, array($this->statementClass, array($this)));
     }
   }
 
+  /**
+   * Destroys this Connection object.
+   *
+   * PHP does not destruct an object if it is still referenced in other
+   * variables. In case of PDO database connection objects, PHP only closes the
+   * connection when the PDO object is destructed, so any references to this
+   * object may cause the number of maximum allowed connections to be exceeded.
+   */
+  public function destroy() {
+    // Destroy all references to this connection by setting them to NULL.
+    // The Statement class attribute only accepts a new value that presents a
+    // proper callable, so we reset it to PDOStatement.
+    $this->setAttribute(PDO::ATTR_STATEMENT_CLASS, array('PDOStatement', array()));
+    $this->schema = NULL;
+  }
+
   /**
    * Returns the default query options for any given query.
    *
diff --git a/core/lib/Drupal/Core/Database/Database.php b/core/lib/Drupal/Core/Database/Database.php
index 6a425de0501a9be69571fc0810c4a0d3c6a0e3da..25fe8fa6abbefe40ddbd0bac44e00f5625c898ce 100644
--- a/core/lib/Drupal/Core/Database/Database.php
+++ b/core/lib/Drupal/Core/Database/Database.php
@@ -338,8 +338,8 @@ final public static function renameConnection($old_key, $new_key) {
    */
   final public static function removeConnection($key) {
     if (isset(self::$databaseInfo[$key])) {
+      self::closeConnection(NULL, $key);
       unset(self::$databaseInfo[$key]);
-      unset(self::$connections[$key]);
       return TRUE;
     }
     else {
@@ -402,11 +402,24 @@ public static function closeConnection($target = NULL, $key = NULL) {
     if (!isset($key)) {
       $key = self::$activeKey;
     }
-    // To close the connection, we need to unset the static variable.
+    // To close a connection, it needs to be set to NULL and removed from the
+    // static variable. In all cases, closeConnection() might be called for a
+    // connection that was not opened yet, in which case the key is not defined
+    // yet and we just ensure that the connection key is undefined.
     if (isset($target)) {
+      if (isset(self::$connections[$key][$target])) {
+        self::$connections[$key][$target]->destroy();
+        self::$connections[$key][$target] = NULL;
+      }
       unset(self::$connections[$key][$target]);
     }
     else {
+      if (isset(self::$connections[$key])) {
+        foreach (self::$connections[$key] as $target => $connection) {
+          self::$connections[$key][$target]->destroy();
+          self::$connections[$key][$target] = NULL;
+        }
+      }
       unset(self::$connections[$key]);
     }
   }
diff --git a/core/lib/Drupal/Core/Database/Driver/mysql/Connection.php b/core/lib/Drupal/Core/Database/Driver/mysql/Connection.php
index fbbff42b71ad92097172ad8cc425feeb18dbdad9..ae2417619ef59628171242590ddcac624771762e 100644
--- a/core/lib/Drupal/Core/Database/Driver/mysql/Connection.php
+++ b/core/lib/Drupal/Core/Database/Driver/mysql/Connection.php
@@ -24,11 +24,11 @@
 class Connection extends DatabaseConnection {
 
   /**
-   * Flag to indicate if we have registered the nextID cleanup function.
+   * Flag to indicate if the cleanup function in __destruct() should run.
    *
    * @var boolean
    */
-  protected $shutdownRegistered = FALSE;
+  protected $needsCleanup = FALSE;
 
   public function __construct(array $connection_options = array()) {
     // This driver defaults to transaction support, except if explicitly passed FALSE.
@@ -89,6 +89,12 @@ public function __construct(array $connection_options = array()) {
     $this->exec(implode('; ', $connection_options['init_commands']));
   }
 
+  public function __destruct() {
+    if ($this->needsCleanup) {
+      $this->nextIdDelete();
+    }
+  }
+
   public function queryRange($query, $from, $count, array $args = array(), array $options = array()) {
     return $this->query($query . ' LIMIT ' . (int) $from . ', ' . (int) $count, $args, $options);
   }
@@ -126,12 +132,7 @@ public function nextId($existing_id = 0) {
       $this->query('INSERT INTO {sequences} (value) VALUES (:value) ON DUPLICATE KEY UPDATE value = value', array(':value' => $existing_id));
       $new_id = $this->query('INSERT INTO {sequences} () VALUES ()', array(), array('return' => Database::RETURN_INSERT_ID));
     }
-    if (!$this->shutdownRegistered) {
-      // Use register_shutdown_function() here to keep the database system
-      // independent of Drupal.
-      register_shutdown_function(array($this, 'nextIdDelete'));
-      $shutdownRegistered = TRUE;
-    }
+    $this->needsCleanup = TRUE;
     return $new_id;
   }
 
diff --git a/core/lib/Drupal/Core/Database/Transaction.php b/core/lib/Drupal/Core/Database/Transaction.php
index 10adadb7111f08cb2a191f2d9f48a4f3f3f5fdac..2dceb24d6814a06fea78958a26f08f2b4b1585d3 100644
--- a/core/lib/Drupal/Core/Database/Transaction.php
+++ b/core/lib/Drupal/Core/Database/Transaction.php
@@ -50,8 +50,8 @@ class Transaction {
    */
   protected $name;
 
-  public function __construct(Connection &$connection, $name = NULL) {
-    $this->connection = &$connection;
+  public function __construct(Connection $connection, $name = NULL) {
+    $this->connection = $connection;
     // If there is no transaction depth, then no transaction has started. Name
     // the transaction 'drupal_transaction'.
     if (!$depth = $connection->transactionDepth()) {
diff --git a/core/modules/system/lib/Drupal/system/Tests/Database/ConnectionUnitTest.php b/core/modules/system/lib/Drupal/system/Tests/Database/ConnectionUnitTest.php
new file mode 100644
index 0000000000000000000000000000000000000000..522a60f474759c1fbd1f37d070cb1f4183d5a0f0
--- /dev/null
+++ b/core/modules/system/lib/Drupal/system/Tests/Database/ConnectionUnitTest.php
@@ -0,0 +1,206 @@
+<?php
+
+/**
+ * @file
+ * Contains Drupal\system\Tests\Database\ConnectionUnitTest.
+ */
+
+namespace Drupal\system\Tests\Database;
+
+use Drupal\Core\Database\Database;
+use Drupal\simpletest\UnitTestBase;
+
+/**
+ * Tests management of database connections.
+ */
+class ConnectionUnitTest extends UnitTestBase {
+
+  protected $key;
+  protected $target;
+
+  protected $monitor;
+  protected $originalCount;
+
+  public static function getInfo() {
+    return array(
+      'name' => 'Connection unit tests',
+      'description' => 'Tests management of database connections.',
+      'group' => 'Database',
+    );
+  }
+
+  function setUp() {
+    parent::setUp();
+
+    $this->key = 'default';
+    $this->originalTarget = 'default';
+    $this->target = 'DatabaseConnectionUnitTest';
+
+    // Create an additional connection to monitor the connections being opened
+    // and closed in this test.
+    // @see TestBase::changeDatabasePrefix()
+    $connection_info = Database::getConnectionInfo('default');
+    Database::addConnectionInfo('default', 'monitor', $connection_info['default']);
+    global $databases;
+    $databases['default']['monitor'] = $connection_info['default'];
+    $this->monitor = Database::getConnection('monitor');
+  }
+
+  /**
+   * Adds a new database connection info to Database.
+   */
+  protected function addConnection() {
+    // Add a new target to the connection, by cloning the current connection.
+    $connection_info = Database::getConnectionInfo($this->key);
+    Database::addConnectionInfo($this->key, $this->target, $connection_info[$this->originalTarget]);
+
+    // Verify that the new target exists.
+    $info = Database::getConnectionInfo($this->key);
+    // Note: Custom assertion message to not expose database credentials.
+    $this->assertIdentical($info[$this->target], $connection_info[$this->key], 'New connection info found.');
+  }
+
+  /**
+   * Returns the connection ID of the current test connection.
+   *
+   * @return integer
+   */
+  protected function getConnectionID() {
+    return (int) Database::getConnection($this->target, $this->key)->query('SELECT CONNECTION_ID()')->fetchField();
+  }
+
+  /**
+   * Asserts that a connection ID exists.
+   *
+   * @param integer $id
+   *   The connection ID to verify.
+   */
+  protected function assertConnection($id) {
+    $list = $this->monitor->query('SHOW PROCESSLIST')->fetchAllKeyed(0, 0);
+    return $this->assertTrue(isset($list[$id]), format_string('Connection ID @id found.', array('@id' => $id)));
+  }
+
+  /**
+   * Asserts that a connection ID does not exist.
+   *
+   * @param integer $id
+   *   The connection ID to verify.
+   */
+  protected function assertNoConnection($id) {
+    $list = $this->monitor->query('SHOW PROCESSLIST')->fetchAllKeyed(0, 0);
+    return $this->assertFalse(isset($list[$id]), format_string('Connection ID @id not found.', array('@id' => $id)));
+  }
+
+  /**
+   * Tests Database::closeConnection() without query.
+   *
+   * @todo getConnectionID() executes a query.
+   */
+  function testOpenClose() {
+    // Add and open a new connection.
+    $this->addConnection();
+    $id = $this->getConnectionID();
+    Database::getConnection($this->target, $this->key);
+
+    // Verify that there is a new connection.
+    $this->assertConnection($id);
+
+    // Close the connection.
+    Database::closeConnection($this->target, $this->key);
+    // Wait 20ms to give the database engine sufficient time to react.
+    usleep(20000);
+
+    // Verify that we are back to the original connection count.
+    $this->assertNoConnection($id);
+  }
+
+  /**
+   * Tests Database::closeConnection() with a query.
+   */
+  function testOpenQueryClose() {
+    // Add and open a new connection.
+    $this->addConnection();
+    $id = $this->getConnectionID();
+    Database::getConnection($this->target, $this->key);
+
+    // Verify that there is a new connection.
+    $this->assertConnection($id);
+
+    // Execute a query.
+    Database::getConnection($this->target, $this->key)->query('SHOW TABLES');
+
+    // Close the connection.
+    Database::closeConnection($this->target, $this->key);
+    // Wait 20ms to give the database engine sufficient time to react.
+    usleep(20000);
+
+    // Verify that we are back to the original connection count.
+    $this->assertNoConnection($id);
+  }
+
+  /**
+   * Tests Database::closeConnection() with a query and custom prefetch method.
+   */
+  function testOpenQueryPrefetchClose() {
+    // Add and open a new connection.
+    $this->addConnection();
+    $id = $this->getConnectionID();
+    Database::getConnection($this->target, $this->key);
+
+    // Verify that there is a new connection.
+    $this->assertConnection($id);
+
+    // Execute a query.
+    Database::getConnection($this->target, $this->key)->query('SHOW TABLES')->fetchCol();
+
+    // Close the connection.
+    Database::closeConnection($this->target, $this->key);
+    // Wait 20ms to give the database engine sufficient time to react.
+    usleep(20000);
+
+    // Verify that we are back to the original connection count.
+    $this->assertNoConnection($id);
+  }
+
+  /**
+   * Tests Database::closeConnection() with a select query.
+   */
+  function testOpenSelectQueryClose() {
+    // Add and open a new connection.
+    $this->addConnection();
+    $id = $this->getConnectionID();
+    Database::getConnection($this->target, $this->key);
+
+    // Verify that there is a new connection.
+    $this->assertConnection($id);
+
+    // Create a table.
+    $name = 'foo';
+    Database::getConnection($this->target, $this->key)->schema()->createTable($name, array(
+      'fields' => array(
+        'name' => array(
+          'type' => 'varchar',
+          'length' => 255,
+        ),
+      ),
+    ));
+
+    // Execute a query.
+    Database::getConnection($this->target, $this->key)->select('foo', 'f')
+      ->fields('f', array('name'))
+      ->execute()
+      ->fetchAll();
+
+    // Drop the table.
+    Database::getConnection($this->target, $this->key)->schema()->dropTable($name);
+
+    // Close the connection.
+    Database::closeConnection($this->target, $this->key);
+    // Wait 20ms to give the database engine sufficient time to react.
+    usleep(20000);
+
+    // Verify that we are back to the original connection count.
+    $this->assertNoConnection($id);
+  }
+
+}