SQLAzureFederationsSynchronizer.php 9.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296
  1. <?php
  2. /*
  3. * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
  4. * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
  5. * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
  6. * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
  7. * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
  8. * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
  9. * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
  10. * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
  11. * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
  12. * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
  13. * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
  14. *
  15. * This software consists of voluntary contributions made by many individuals
  16. * and is licensed under the MIT license. For more information, see
  17. * <http://www.doctrine-project.org>.
  18. */
  19. namespace Doctrine\DBAL\Sharding\SQLAzure;
  20. use Doctrine\DBAL\Schema\Schema;
  21. use Doctrine\DBAL\Connection;
  22. use Doctrine\DBAL\Types\Type;
  23. use Doctrine\DBAL\Schema\Synchronizer\AbstractSchemaSynchronizer;
  24. use Doctrine\DBAL\Sharding\SingleDatabaseSynchronizer;
  25. /**
  26. * SQL Azure Schema Synchronizer
  27. *
  28. * Will iterate over all shards when performing schema operations. This is done
  29. * by partitioning the passed schema into subschemas for the federation and the
  30. * global database and then applying the operations step by step using the
  31. * {@see \Doctrine\DBAL\Sharding\SingleDatabaseSynchronizer}.
  32. *
  33. * @author Benjamin Eberlei <kontakt@beberlei.de>
  34. */
  35. class SQLAzureFederationsSynchronizer extends AbstractSchemaSynchronizer
  36. {
  37. const FEDERATION_TABLE_FEDERATED = 'azure.federated';
  38. const FEDERATION_DISTRIBUTION_NAME = 'azure.federatedOnDistributionName';
  39. /**
  40. * @var SQLAzureShardManager
  41. */
  42. private $shardManager;
  43. /**
  44. * @var SchemaSynchronizer
  45. */
  46. private $synchronizer;
  47. public function __construct(Connection $conn, SQLAzureShardManager $shardManager, SchemaSynchronizer $sync = null)
  48. {
  49. parent::__construct($conn);
  50. $this->shardManager = $shardManager;
  51. $this->synchronizer = $sync ?: new SingleDatabaseSynchronizer($conn);
  52. }
  53. /**
  54. * Get the SQL statements that can be executed to create the schema.
  55. *
  56. * @param Schema $createSchema
  57. * @return array
  58. */
  59. public function getCreateSchema(Schema $createSchema)
  60. {
  61. $sql = array();
  62. list($global, $federation) = $this->partitionSchema($createSchema);
  63. $globalSql = $this->synchronizer->getCreateSchema($global);
  64. if ($globalSql) {
  65. $sql[] = "-- Create Root Federation\n" .
  66. "USE FEDERATION ROOT WITH RESET;";
  67. $sql = array_merge($sql, $globalSql);
  68. }
  69. $federationSql = $this->synchronizer->getCreateSchema($federation);
  70. if ($federationSql) {
  71. $defaultValue = $this->getFederationTypeDefaultValue();
  72. $sql[] = $this->getCreateFederationStatement();
  73. $sql[] = "USE FEDERATION " . $this->shardManager->getFederationName() . " (" . $this->shardManager->getDistributionKey() . " = " . $defaultValue . ") WITH RESET, FILTERING = OFF;";
  74. $sql = array_merge($sql, $federationSql);
  75. }
  76. return $sql;
  77. }
  78. /**
  79. * Get the SQL Statements to update given schema with the underlying db.
  80. *
  81. * @param Schema $toSchema
  82. * @param bool $noDrops
  83. * @return array
  84. */
  85. public function getUpdateSchema(Schema $toSchema, $noDrops = false)
  86. {
  87. return $this->work($toSchema, function($synchronizer, $schema) use ($noDrops) {
  88. return $synchronizer->getUpdateSchema($schema, $noDrops);
  89. });
  90. }
  91. /**
  92. * Get the SQL Statements to drop the given schema from underlying db.
  93. *
  94. * @param Schema $dropSchema
  95. * @return array
  96. */
  97. public function getDropSchema(Schema $dropSchema)
  98. {
  99. return $this->work($dropSchema, function($synchronizer, $schema) {
  100. return $synchronizer->getDropSchema($schema);
  101. });
  102. }
  103. /**
  104. * Create the Schema
  105. *
  106. * @param Schema $createSchema
  107. * @return void
  108. */
  109. public function createSchema(Schema $createSchema)
  110. {
  111. $this->processSql($this->getCreateSchema($createSchema));
  112. }
  113. /**
  114. * Update the Schema to new schema version.
  115. *
  116. * @param Schema $toSchema
  117. * @return void
  118. */
  119. public function updateSchema(Schema $toSchema, $noDrops = false)
  120. {
  121. $this->processSql($this->getUpdateSchema($toSchema, $noDrops));
  122. }
  123. /**
  124. * Drop the given database schema from the underlying db.
  125. *
  126. * @param Schema $dropSchema
  127. * @return void
  128. */
  129. public function dropSchema(Schema $dropSchema)
  130. {
  131. $this->processSqlSafely($this->getDropSchema($dropSchema));
  132. }
  133. /**
  134. * Get the SQL statements to drop all schema assets from underlying db.
  135. *
  136. * @return array
  137. */
  138. public function getDropAllSchema()
  139. {
  140. $this->shardManager->selectGlobal();
  141. $globalSql = $this->synchronizer->getDropAllSchema();
  142. if ($globalSql) {
  143. $sql[] = "-- Work on Root Federation\nUSE FEDERATION ROOT WITH RESET;";
  144. $sql = array_merge($sql, $globalSql);
  145. }
  146. $shards = $this->shardManager->getShards();
  147. foreach ($shards as $shard) {
  148. $this->shardManager->selectShard($shard['rangeLow']);
  149. $federationSql = $this->synchronizer->getDropAllSchema();
  150. if ($federationSql) {
  151. $sql[] = "-- Work on Federation ID " . $shard['id'] . "\n" .
  152. "USE FEDERATION " . $this->shardManager->getFederationName() . " (" . $this->shardManager->getDistributionKey() . " = " . $shard['rangeLow'].") WITH RESET, FILTERING = OFF;";
  153. $sql = array_merge($sql, $federationSql);
  154. }
  155. }
  156. $sql[] = "USE FEDERATION ROOT WITH RESET;";
  157. $sql[] = "DROP FEDERATION " . $this->shardManager->getFederationName();
  158. return $sql;
  159. }
  160. /**
  161. * Drop all assets from the underyling db.
  162. *
  163. * @return void
  164. */
  165. public function dropAllSchema()
  166. {
  167. $this->processSqlSafely($this->getDropAllSchema());
  168. }
  169. private function partitionSchema(Schema $schema)
  170. {
  171. return array(
  172. $this->extractSchemaFederation($schema, false),
  173. $this->extractSchemaFederation($schema, true),
  174. );
  175. }
  176. private function extractSchemaFederation(Schema $schema, $isFederation)
  177. {
  178. $partionedSchema = clone $schema;
  179. foreach ($partionedSchema->getTables() as $table) {
  180. if ($isFederation) {
  181. $table->addOption(self::FEDERATION_DISTRIBUTION_NAME, $this->shardManager->getDistributionKey());
  182. }
  183. if ( $table->hasOption(self::FEDERATION_TABLE_FEDERATED) !== $isFederation) {
  184. $partionedSchema->dropTable($table->getName());
  185. } else {
  186. foreach ($table->getForeignKeys() as $fk) {
  187. $foreignTable = $schema->getTable($fk->getForeignTableName());
  188. if ($foreignTable->hasOption(self::FEDERATION_TABLE_FEDERATED) !== $isFederation) {
  189. throw new \RuntimeException("Cannot have foreign key between global/federation.");
  190. }
  191. }
  192. }
  193. }
  194. return $partionedSchema;
  195. }
  196. /**
  197. * Work on the Global/Federation based on currently existing shards and
  198. * perform the given operation on the underyling schema synchronizer given
  199. * the different partioned schema instances.
  200. *
  201. * @param Schema $schema
  202. * @param Closure $operation
  203. * @return array
  204. */
  205. private function work(Schema $schema, \Closure $operation)
  206. {
  207. list($global, $federation) = $this->partitionSchema($schema);
  208. $sql = array();
  209. $this->shardManager->selectGlobal();
  210. $globalSql = $operation($this->synchronizer, $global);
  211. if ($globalSql) {
  212. $sql[] = "-- Work on Root Federation\nUSE FEDERATION ROOT WITH RESET;";
  213. $sql = array_merge($sql, $globalSql);
  214. }
  215. $shards = $this->shardManager->getShards();
  216. foreach ($shards as $shard) {
  217. $this->shardManager->selectShard($shard['rangeLow']);
  218. $federationSql = $operation($this->synchronizer, $federation);
  219. if ($federationSql) {
  220. $sql[] = "-- Work on Federation ID " . $shard['id'] . "\n" .
  221. "USE FEDERATION " . $this->shardManager->getFederationName() . " (" . $this->shardManager->getDistributionKey() . " = " . $shard['rangeLow'].") WITH RESET, FILTERING = OFF;";
  222. $sql = array_merge($sql, $federationSql);
  223. }
  224. }
  225. return $sql;
  226. }
  227. private function getFederationTypeDefaultValue()
  228. {
  229. $federationType = Type::getType($this->shardManager->getDistributionType());
  230. switch ($federationType->getName()) {
  231. case Type::GUID:
  232. $defaultValue = '00000000-0000-0000-0000-000000000000';
  233. break;
  234. case Type::INTEGER:
  235. case Type::SMALLINT:
  236. case Type::BIGINT:
  237. $defaultValue = '0';
  238. break;
  239. default:
  240. $defaultValue = '';
  241. break;
  242. }
  243. return $defaultValue;
  244. }
  245. private function getCreateFederationStatement()
  246. {
  247. $federationType = Type::getType($this->shardManager->getDistributionType());
  248. $federationTypeSql = $federationType->getSqlDeclaration(array(), $this->conn->getDatabasePlatform());
  249. return "--Create Federation\n" .
  250. "CREATE FEDERATION " . $this->shardManager->getFederationName() . " (" . $this->shardManager->getDistributionKey() . " " . $federationTypeSql ." RANGE)";
  251. }
  252. }