Initial commit: Grav CMS setup with HTML reference material
Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
This commit is contained in:
@@ -0,0 +1,574 @@
|
||||
<?php
|
||||
|
||||
/*
|
||||
* This file is part of the Symfony package.
|
||||
*
|
||||
* (c) Fabien Potencier <fabien@symfony.com>
|
||||
*
|
||||
* For the full copyright and license information, please view the LICENSE
|
||||
* file that was distributed with this source code.
|
||||
*/
|
||||
|
||||
namespace Symfony\Component\Messenger\Bridge\Doctrine\Transport;
|
||||
|
||||
use Doctrine\DBAL\Connection as DBALConnection;
|
||||
use Doctrine\DBAL\Driver\Exception as DriverException;
|
||||
use Doctrine\DBAL\Driver\Result as DriverResult;
|
||||
use Doctrine\DBAL\Exception as DBALException;
|
||||
use Doctrine\DBAL\Exception\TableNotFoundException;
|
||||
use Doctrine\DBAL\LockMode;
|
||||
use Doctrine\DBAL\Platforms\MySQLPlatform;
|
||||
use Doctrine\DBAL\Platforms\OraclePlatform;
|
||||
use Doctrine\DBAL\Query\QueryBuilder;
|
||||
use Doctrine\DBAL\Result;
|
||||
use Doctrine\DBAL\Schema\AbstractAsset;
|
||||
use Doctrine\DBAL\Schema\AbstractSchemaManager;
|
||||
use Doctrine\DBAL\Schema\Comparator;
|
||||
use Doctrine\DBAL\Schema\Schema;
|
||||
use Doctrine\DBAL\Schema\SchemaDiff;
|
||||
use Doctrine\DBAL\Schema\Synchronizer\SchemaSynchronizer;
|
||||
use Doctrine\DBAL\Schema\Table;
|
||||
use Doctrine\DBAL\Types\Types;
|
||||
use Symfony\Component\Messenger\Exception\InvalidArgumentException;
|
||||
use Symfony\Component\Messenger\Exception\TransportException;
|
||||
use Symfony\Contracts\Service\ResetInterface;
|
||||
|
||||
/**
|
||||
* @internal since Symfony 5.1
|
||||
*
|
||||
* @author Vincent Touzet <vincent.touzet@gmail.com>
|
||||
* @author Kévin Dunglas <dunglas@gmail.com>
|
||||
*/
|
||||
class Connection implements ResetInterface
|
||||
{
|
||||
protected const TABLE_OPTION_NAME = '_symfony_messenger_table_name';
|
||||
|
||||
protected const DEFAULT_OPTIONS = [
|
||||
'table_name' => 'messenger_messages',
|
||||
'queue_name' => 'default',
|
||||
'redeliver_timeout' => 3600,
|
||||
'auto_setup' => true,
|
||||
];
|
||||
|
||||
/**
|
||||
* Configuration of the connection.
|
||||
*
|
||||
* Available options:
|
||||
*
|
||||
* * table_name: name of the table
|
||||
* * connection: name of the Doctrine's entity manager
|
||||
* * queue_name: name of the queue
|
||||
* * redeliver_timeout: Timeout before redeliver messages still in handling state (i.e: delivered_at is not null and message is still in table). Default: 3600
|
||||
* * auto_setup: Whether the table should be created automatically during send / get. Default: true
|
||||
*/
|
||||
protected $configuration = [];
|
||||
protected $driverConnection;
|
||||
protected $queueEmptiedAt;
|
||||
private $schemaSynchronizer;
|
||||
private $autoSetup;
|
||||
|
||||
public function __construct(array $configuration, DBALConnection $driverConnection, ?SchemaSynchronizer $schemaSynchronizer = null)
|
||||
{
|
||||
$this->configuration = array_replace_recursive(static::DEFAULT_OPTIONS, $configuration);
|
||||
$this->driverConnection = $driverConnection;
|
||||
$this->schemaSynchronizer = $schemaSynchronizer;
|
||||
$this->autoSetup = $this->configuration['auto_setup'];
|
||||
}
|
||||
|
||||
public function reset()
|
||||
{
|
||||
$this->queueEmptiedAt = null;
|
||||
}
|
||||
|
||||
public function getConfiguration(): array
|
||||
{
|
||||
return $this->configuration;
|
||||
}
|
||||
|
||||
public static function buildConfiguration(string $dsn, array $options = []): array
|
||||
{
|
||||
if (false === $params = parse_url($dsn)) {
|
||||
throw new InvalidArgumentException('The given Doctrine Messenger DSN is invalid.');
|
||||
}
|
||||
|
||||
$query = [];
|
||||
if (isset($params['query'])) {
|
||||
parse_str($params['query'], $query);
|
||||
}
|
||||
|
||||
$configuration = ['connection' => $params['host']];
|
||||
$configuration += $query + $options + static::DEFAULT_OPTIONS;
|
||||
|
||||
$configuration['auto_setup'] = filter_var($configuration['auto_setup'], \FILTER_VALIDATE_BOOLEAN);
|
||||
|
||||
// check for extra keys in options
|
||||
$optionsExtraKeys = array_diff(array_keys($options), array_keys(static::DEFAULT_OPTIONS));
|
||||
if (0 < \count($optionsExtraKeys)) {
|
||||
throw new InvalidArgumentException(sprintf('Unknown option found: [%s]. Allowed options are [%s].', implode(', ', $optionsExtraKeys), implode(', ', array_keys(static::DEFAULT_OPTIONS))));
|
||||
}
|
||||
|
||||
// check for extra keys in options
|
||||
$queryExtraKeys = array_diff(array_keys($query), array_keys(static::DEFAULT_OPTIONS));
|
||||
if (0 < \count($queryExtraKeys)) {
|
||||
throw new InvalidArgumentException(sprintf('Unknown option found in DSN: [%s]. Allowed options are [%s].', implode(', ', $queryExtraKeys), implode(', ', array_keys(static::DEFAULT_OPTIONS))));
|
||||
}
|
||||
|
||||
return $configuration;
|
||||
}
|
||||
|
||||
/**
|
||||
* @param int $delay The delay in milliseconds
|
||||
*
|
||||
* @return string The inserted id
|
||||
*
|
||||
* @throws DBALException
|
||||
*/
|
||||
public function send(string $body, array $headers, int $delay = 0): string
|
||||
{
|
||||
$now = new \DateTime();
|
||||
$availableAt = (clone $now)->modify(sprintf('%+d seconds', $delay / 1000));
|
||||
|
||||
$queryBuilder = $this->driverConnection->createQueryBuilder()
|
||||
->insert($this->configuration['table_name'])
|
||||
->values([
|
||||
'body' => '?',
|
||||
'headers' => '?',
|
||||
'queue_name' => '?',
|
||||
'created_at' => '?',
|
||||
'available_at' => '?',
|
||||
]);
|
||||
|
||||
$this->executeStatement($queryBuilder->getSQL(), [
|
||||
$body,
|
||||
json_encode($headers),
|
||||
$this->configuration['queue_name'],
|
||||
$now,
|
||||
$availableAt,
|
||||
], [
|
||||
Types::STRING,
|
||||
Types::STRING,
|
||||
Types::STRING,
|
||||
Types::DATETIME_MUTABLE,
|
||||
Types::DATETIME_MUTABLE,
|
||||
]);
|
||||
|
||||
return $this->driverConnection->lastInsertId();
|
||||
}
|
||||
|
||||
public function get(): ?array
|
||||
{
|
||||
if ($this->driverConnection->getDatabasePlatform() instanceof MySQLPlatform) {
|
||||
try {
|
||||
$this->driverConnection->delete($this->configuration['table_name'], ['delivered_at' => '9999-12-31 23:59:59']);
|
||||
} catch (DriverException $e) {
|
||||
// Ignore the exception
|
||||
} catch (TableNotFoundException $e) {
|
||||
if ($this->autoSetup) {
|
||||
$this->setup();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
get:
|
||||
$this->driverConnection->beginTransaction();
|
||||
try {
|
||||
$query = $this->createAvailableMessagesQueryBuilder()
|
||||
->orderBy('available_at', 'ASC')
|
||||
->setMaxResults(1);
|
||||
|
||||
if ($this->driverConnection->getDatabasePlatform() instanceof OraclePlatform) {
|
||||
$query->select('m.id');
|
||||
}
|
||||
|
||||
// Append pessimistic write lock to FROM clause if db platform supports it
|
||||
$sql = $query->getSQL();
|
||||
|
||||
// Wrap the rownum query in a sub-query to allow writelocks without ORA-02014 error
|
||||
if ($this->driverConnection->getDatabasePlatform() instanceof OraclePlatform) {
|
||||
$query = $this->createQueryBuilder('w')
|
||||
->where('w.id IN ('.str_replace('SELECT a.* FROM', 'SELECT a.id FROM', $sql).')')
|
||||
->setParameters($query->getParameters(), $query->getParameterTypes());
|
||||
|
||||
if (method_exists(QueryBuilder::class, 'forUpdate')) {
|
||||
$query->forUpdate();
|
||||
}
|
||||
|
||||
$sql = $query->getSQL();
|
||||
} elseif (method_exists(QueryBuilder::class, 'forUpdate')) {
|
||||
$query->forUpdate();
|
||||
try {
|
||||
$sql = $query->getSQL();
|
||||
} catch (DBALException $e) {
|
||||
}
|
||||
} elseif (preg_match('/FROM (.+) WHERE/', (string) $sql, $matches)) {
|
||||
$fromClause = $matches[1];
|
||||
$sql = str_replace(
|
||||
sprintf('FROM %s WHERE', $fromClause),
|
||||
sprintf('FROM %s WHERE', $this->driverConnection->getDatabasePlatform()->appendLockHint($fromClause, LockMode::PESSIMISTIC_WRITE)),
|
||||
$sql
|
||||
);
|
||||
}
|
||||
|
||||
// use SELECT ... FOR UPDATE to lock table
|
||||
if (!method_exists(QueryBuilder::class, 'forUpdate')) {
|
||||
$sql .= ' '.$this->driverConnection->getDatabasePlatform()->getWriteLockSQL();
|
||||
}
|
||||
|
||||
$stmt = $this->executeQuery(
|
||||
$sql,
|
||||
$query->getParameters(),
|
||||
$query->getParameterTypes()
|
||||
);
|
||||
$doctrineEnvelope = $stmt instanceof Result || $stmt instanceof DriverResult ? $stmt->fetchAssociative() : $stmt->fetch();
|
||||
|
||||
if (false === $doctrineEnvelope) {
|
||||
$this->driverConnection->commit();
|
||||
$this->queueEmptiedAt = microtime(true) * 1000;
|
||||
|
||||
return null;
|
||||
}
|
||||
// Postgres can "group" notifications having the same channel and payload
|
||||
// We need to be sure to empty the queue before blocking again
|
||||
$this->queueEmptiedAt = null;
|
||||
|
||||
$doctrineEnvelope = $this->decodeEnvelopeHeaders($doctrineEnvelope);
|
||||
|
||||
$queryBuilder = $this->driverConnection->createQueryBuilder()
|
||||
->update($this->configuration['table_name'])
|
||||
->set('delivered_at', '?')
|
||||
->where('id = ?');
|
||||
$now = new \DateTime();
|
||||
$this->executeStatement($queryBuilder->getSQL(), [
|
||||
$now,
|
||||
$doctrineEnvelope['id'],
|
||||
], [
|
||||
Types::DATETIME_MUTABLE,
|
||||
]);
|
||||
|
||||
$this->driverConnection->commit();
|
||||
|
||||
return $doctrineEnvelope;
|
||||
} catch (\Throwable $e) {
|
||||
$this->driverConnection->rollBack();
|
||||
|
||||
if ($this->autoSetup && $e instanceof TableNotFoundException) {
|
||||
$this->setup();
|
||||
goto get;
|
||||
}
|
||||
|
||||
throw $e;
|
||||
}
|
||||
}
|
||||
|
||||
public function ack(string $id): bool
|
||||
{
|
||||
try {
|
||||
if ($this->driverConnection->getDatabasePlatform() instanceof MySQLPlatform) {
|
||||
return $this->driverConnection->update($this->configuration['table_name'], ['delivered_at' => '9999-12-31 23:59:59'], ['id' => $id]) > 0;
|
||||
}
|
||||
|
||||
return $this->driverConnection->delete($this->configuration['table_name'], ['id' => $id]) > 0;
|
||||
} catch (DBALException $exception) {
|
||||
throw new TransportException($exception->getMessage(), 0, $exception);
|
||||
}
|
||||
}
|
||||
|
||||
public function reject(string $id): bool
|
||||
{
|
||||
try {
|
||||
if ($this->driverConnection->getDatabasePlatform() instanceof MySQLPlatform) {
|
||||
return $this->driverConnection->update($this->configuration['table_name'], ['delivered_at' => '9999-12-31 23:59:59'], ['id' => $id]) > 0;
|
||||
}
|
||||
|
||||
return $this->driverConnection->delete($this->configuration['table_name'], ['id' => $id]) > 0;
|
||||
} catch (DBALException $exception) {
|
||||
throw new TransportException($exception->getMessage(), 0, $exception);
|
||||
}
|
||||
}
|
||||
|
||||
public function setup(): void
|
||||
{
|
||||
$configuration = $this->driverConnection->getConfiguration();
|
||||
$assetFilter = $configuration->getSchemaAssetsFilter();
|
||||
$configuration->setSchemaAssetsFilter(function ($tableName) {
|
||||
if ($tableName instanceof AbstractAsset) {
|
||||
$tableName = $tableName->getName();
|
||||
}
|
||||
|
||||
if (!\is_string($tableName)) {
|
||||
throw new \TypeError(sprintf('The table name must be an instance of "%s" or a string ("%s" given).', AbstractAsset::class, get_debug_type($tableName)));
|
||||
}
|
||||
|
||||
return $tableName === $this->configuration['table_name'];
|
||||
});
|
||||
$this->updateSchema();
|
||||
$configuration->setSchemaAssetsFilter($assetFilter);
|
||||
$this->autoSetup = false;
|
||||
}
|
||||
|
||||
public function getMessageCount(): int
|
||||
{
|
||||
$queryBuilder = $this->createAvailableMessagesQueryBuilder()
|
||||
->select('COUNT(m.id) AS message_count')
|
||||
->setMaxResults(1);
|
||||
|
||||
$stmt = $this->executeQuery($queryBuilder->getSQL(), $queryBuilder->getParameters(), $queryBuilder->getParameterTypes());
|
||||
|
||||
return $stmt instanceof Result || $stmt instanceof DriverResult ? $stmt->fetchOne() : $stmt->fetchColumn();
|
||||
}
|
||||
|
||||
public function findAll(?int $limit = null): array
|
||||
{
|
||||
$queryBuilder = $this->createAvailableMessagesQueryBuilder();
|
||||
|
||||
if (null !== $limit) {
|
||||
$queryBuilder->setMaxResults($limit);
|
||||
}
|
||||
|
||||
$stmt = $this->executeQuery($queryBuilder->getSQL(), $queryBuilder->getParameters(), $queryBuilder->getParameterTypes());
|
||||
$data = $stmt instanceof Result || $stmt instanceof DriverResult ? $stmt->fetchAllAssociative() : $stmt->fetchAll();
|
||||
|
||||
return array_map(function ($doctrineEnvelope) {
|
||||
return $this->decodeEnvelopeHeaders($doctrineEnvelope);
|
||||
}, $data);
|
||||
}
|
||||
|
||||
public function find($id): ?array
|
||||
{
|
||||
$queryBuilder = $this->createQueryBuilder()
|
||||
->where('m.id = ? and m.queue_name = ?');
|
||||
|
||||
$stmt = $this->executeQuery($queryBuilder->getSQL(), [$id, $this->configuration['queue_name']]);
|
||||
$data = $stmt instanceof Result || $stmt instanceof DriverResult ? $stmt->fetchAssociative() : $stmt->fetch();
|
||||
|
||||
return false === $data ? null : $this->decodeEnvelopeHeaders($data);
|
||||
}
|
||||
|
||||
/**
|
||||
* @internal
|
||||
*/
|
||||
public function configureSchema(Schema $schema, DBALConnection $forConnection): void
|
||||
{
|
||||
// only update the schema for this connection
|
||||
if ($forConnection !== $this->driverConnection) {
|
||||
return;
|
||||
}
|
||||
|
||||
if ($schema->hasTable($this->configuration['table_name'])) {
|
||||
return;
|
||||
}
|
||||
|
||||
$this->addTableToSchema($schema);
|
||||
}
|
||||
|
||||
/**
|
||||
* @internal
|
||||
*/
|
||||
public function getExtraSetupSqlForTable(Table $createdTable): array
|
||||
{
|
||||
return [];
|
||||
}
|
||||
|
||||
private function createAvailableMessagesQueryBuilder(): QueryBuilder
|
||||
{
|
||||
$now = new \DateTime();
|
||||
$redeliverLimit = (clone $now)->modify(sprintf('-%d seconds', $this->configuration['redeliver_timeout']));
|
||||
|
||||
return $this->createQueryBuilder()
|
||||
->where('m.queue_name = ?')
|
||||
->andWhere('m.delivered_at is null OR m.delivered_at < ?')
|
||||
->andWhere('m.available_at <= ?')
|
||||
->setParameters([
|
||||
$this->configuration['queue_name'],
|
||||
$redeliverLimit,
|
||||
$now,
|
||||
], [
|
||||
Types::STRING,
|
||||
Types::DATETIME_MUTABLE,
|
||||
Types::DATETIME_MUTABLE,
|
||||
]);
|
||||
}
|
||||
|
||||
private function createQueryBuilder(string $alias = 'm'): QueryBuilder
|
||||
{
|
||||
$queryBuilder = $this->driverConnection->createQueryBuilder()
|
||||
->from($this->configuration['table_name'], $alias);
|
||||
|
||||
$alias .= '.';
|
||||
|
||||
if (!$this->driverConnection->getDatabasePlatform() instanceof OraclePlatform) {
|
||||
return $queryBuilder->select($alias.'*');
|
||||
}
|
||||
|
||||
// Oracle databases use UPPER CASE on tables and column identifiers.
|
||||
// Column alias is added to force the result to be lowercase even when the actual field is all caps.
|
||||
|
||||
return $queryBuilder->select(str_replace(', ', ', '.$alias,
|
||||
$alias.'id AS "id", body AS "body", headers AS "headers", queue_name AS "queue_name", '.
|
||||
'created_at AS "created_at", available_at AS "available_at", '.
|
||||
'delivered_at AS "delivered_at"'
|
||||
));
|
||||
}
|
||||
|
||||
private function executeQuery(string $sql, array $parameters = [], array $types = [])
|
||||
{
|
||||
try {
|
||||
$stmt = $this->driverConnection->executeQuery($sql, $parameters, $types);
|
||||
} catch (TableNotFoundException $e) {
|
||||
if ($this->driverConnection->isTransactionActive()) {
|
||||
throw $e;
|
||||
}
|
||||
|
||||
// create table
|
||||
if ($this->autoSetup) {
|
||||
$this->setup();
|
||||
}
|
||||
$stmt = $this->driverConnection->executeQuery($sql, $parameters, $types);
|
||||
}
|
||||
|
||||
return $stmt;
|
||||
}
|
||||
|
||||
protected function executeStatement(string $sql, array $parameters = [], array $types = [])
|
||||
{
|
||||
try {
|
||||
if (method_exists($this->driverConnection, 'executeStatement')) {
|
||||
$stmt = $this->driverConnection->executeStatement($sql, $parameters, $types);
|
||||
} else {
|
||||
$stmt = $this->driverConnection->executeUpdate($sql, $parameters, $types);
|
||||
}
|
||||
} catch (TableNotFoundException $e) {
|
||||
if ($this->driverConnection->isTransactionActive()) {
|
||||
throw $e;
|
||||
}
|
||||
|
||||
// create table
|
||||
if ($this->autoSetup) {
|
||||
$this->setup();
|
||||
}
|
||||
if (method_exists($this->driverConnection, 'executeStatement')) {
|
||||
$stmt = $this->driverConnection->executeStatement($sql, $parameters, $types);
|
||||
} else {
|
||||
$stmt = $this->driverConnection->executeUpdate($sql, $parameters, $types);
|
||||
}
|
||||
}
|
||||
|
||||
return $stmt;
|
||||
}
|
||||
|
||||
private function getSchema(): Schema
|
||||
{
|
||||
$schema = new Schema([], [], $this->createSchemaManager()->createSchemaConfig());
|
||||
$this->addTableToSchema($schema);
|
||||
|
||||
return $schema;
|
||||
}
|
||||
|
||||
private function addTableToSchema(Schema $schema): void
|
||||
{
|
||||
$table = $schema->createTable($this->configuration['table_name']);
|
||||
// add an internal option to mark that we created this & the non-namespaced table name
|
||||
$table->addOption(self::TABLE_OPTION_NAME, $this->configuration['table_name']);
|
||||
$table->addColumn('id', Types::BIGINT)
|
||||
->setAutoincrement(true)
|
||||
->setNotnull(true);
|
||||
$table->addColumn('body', Types::TEXT)
|
||||
->setNotnull(true);
|
||||
$table->addColumn('headers', Types::TEXT)
|
||||
->setNotnull(true);
|
||||
$table->addColumn('queue_name', Types::STRING)
|
||||
->setLength(190) // MySQL 5.6 only supports 191 characters on an indexed column in utf8mb4 mode
|
||||
->setNotnull(true);
|
||||
$table->addColumn('created_at', Types::DATETIME_MUTABLE)
|
||||
->setNotnull(true);
|
||||
$table->addColumn('available_at', Types::DATETIME_MUTABLE)
|
||||
->setNotnull(true);
|
||||
$table->addColumn('delivered_at', Types::DATETIME_MUTABLE)
|
||||
->setNotnull(false);
|
||||
$table->setPrimaryKey(['id']);
|
||||
$table->addIndex(['queue_name']);
|
||||
$table->addIndex(['available_at']);
|
||||
$table->addIndex(['delivered_at']);
|
||||
}
|
||||
|
||||
private function decodeEnvelopeHeaders(array $doctrineEnvelope): array
|
||||
{
|
||||
$doctrineEnvelope['headers'] = json_decode($doctrineEnvelope['headers'], true);
|
||||
|
||||
return $doctrineEnvelope;
|
||||
}
|
||||
|
||||
private function updateSchema(): void
|
||||
{
|
||||
if (null !== $this->schemaSynchronizer) {
|
||||
$this->schemaSynchronizer->updateSchema($this->getSchema(), true);
|
||||
|
||||
return;
|
||||
}
|
||||
|
||||
$schemaManager = $this->createSchemaManager();
|
||||
$comparator = $this->createComparator($schemaManager);
|
||||
$schemaDiff = $this->compareSchemas($comparator, method_exists($schemaManager, 'introspectSchema') ? $schemaManager->introspectSchema() : $schemaManager->createSchema(), $this->getSchema());
|
||||
$platform = $this->driverConnection->getDatabasePlatform();
|
||||
$exec = method_exists($this->driverConnection, 'executeStatement') ? 'executeStatement' : 'exec';
|
||||
|
||||
if (!method_exists(SchemaDiff::class, 'getCreatedSchemas')) {
|
||||
foreach ($schemaDiff->toSaveSql($platform) as $sql) {
|
||||
$this->driverConnection->$exec($sql);
|
||||
}
|
||||
|
||||
return;
|
||||
}
|
||||
|
||||
if ($platform->supportsSchemas()) {
|
||||
foreach ($schemaDiff->getCreatedSchemas() as $schema) {
|
||||
$this->driverConnection->$exec($platform->getCreateSchemaSQL($schema));
|
||||
}
|
||||
}
|
||||
|
||||
if ($platform->supportsSequences()) {
|
||||
foreach ($schemaDiff->getAlteredSequences() as $sequence) {
|
||||
$this->driverConnection->$exec($platform->getAlterSequenceSQL($sequence));
|
||||
}
|
||||
|
||||
foreach ($schemaDiff->getCreatedSequences() as $sequence) {
|
||||
$this->driverConnection->$exec($platform->getCreateSequenceSQL($sequence));
|
||||
}
|
||||
}
|
||||
|
||||
foreach ($platform->getCreateTablesSQL($schemaDiff->getCreatedTables()) as $sql) {
|
||||
$this->driverConnection->$exec($sql);
|
||||
}
|
||||
|
||||
foreach ($schemaDiff->getAlteredTables() as $tableDiff) {
|
||||
foreach ($platform->getAlterTableSQL($tableDiff) as $sql) {
|
||||
$this->driverConnection->$exec($sql);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
private function createSchemaManager(): AbstractSchemaManager
|
||||
{
|
||||
return method_exists($this->driverConnection, 'createSchemaManager')
|
||||
? $this->driverConnection->createSchemaManager()
|
||||
: $this->driverConnection->getSchemaManager();
|
||||
}
|
||||
|
||||
private function createComparator(AbstractSchemaManager $schemaManager): Comparator
|
||||
{
|
||||
return method_exists($schemaManager, 'createComparator')
|
||||
? $schemaManager->createComparator()
|
||||
: new Comparator();
|
||||
}
|
||||
|
||||
private function compareSchemas(Comparator $comparator, Schema $from, Schema $to): SchemaDiff
|
||||
{
|
||||
return method_exists($comparator, 'compareSchemas') || method_exists($comparator, 'doCompareSchemas')
|
||||
? $comparator->compareSchemas($from, $to)
|
||||
: $comparator->compare($from, $to);
|
||||
}
|
||||
}
|
||||
|
||||
if (!class_exists(\Symfony\Component\Messenger\Transport\Doctrine\Connection::class, false)) {
|
||||
class_alias(Connection::class, \Symfony\Component\Messenger\Transport\Doctrine\Connection::class);
|
||||
}
|
||||
+36
@@ -0,0 +1,36 @@
|
||||
<?php
|
||||
|
||||
/*
|
||||
* This file is part of the Symfony package.
|
||||
*
|
||||
* (c) Fabien Potencier <fabien@symfony.com>
|
||||
*
|
||||
* For the full copyright and license information, please view the LICENSE
|
||||
* file that was distributed with this source code.
|
||||
*/
|
||||
|
||||
namespace Symfony\Component\Messenger\Bridge\Doctrine\Transport;
|
||||
|
||||
use Symfony\Component\Messenger\Stamp\NonSendableStampInterface;
|
||||
|
||||
/**
|
||||
* @author Vincent Touzet <vincent.touzet@gmail.com>
|
||||
*/
|
||||
class DoctrineReceivedStamp implements NonSendableStampInterface
|
||||
{
|
||||
private $id;
|
||||
|
||||
public function __construct(string $id)
|
||||
{
|
||||
$this->id = $id;
|
||||
}
|
||||
|
||||
public function getId(): string
|
||||
{
|
||||
return $this->id;
|
||||
}
|
||||
}
|
||||
|
||||
if (!class_exists(\Symfony\Component\Messenger\Transport\Doctrine\DoctrineReceivedStamp::class, false)) {
|
||||
class_alias(DoctrineReceivedStamp::class, \Symfony\Component\Messenger\Transport\Doctrine\DoctrineReceivedStamp::class);
|
||||
}
|
||||
+175
@@ -0,0 +1,175 @@
|
||||
<?php
|
||||
|
||||
/*
|
||||
* This file is part of the Symfony package.
|
||||
*
|
||||
* (c) Fabien Potencier <fabien@symfony.com>
|
||||
*
|
||||
* For the full copyright and license information, please view the LICENSE
|
||||
* file that was distributed with this source code.
|
||||
*/
|
||||
|
||||
namespace Symfony\Component\Messenger\Bridge\Doctrine\Transport;
|
||||
|
||||
use Doctrine\DBAL\Exception as DBALException;
|
||||
use Doctrine\DBAL\Exception\RetryableException;
|
||||
use Symfony\Component\Messenger\Envelope;
|
||||
use Symfony\Component\Messenger\Exception\LogicException;
|
||||
use Symfony\Component\Messenger\Exception\MessageDecodingFailedException;
|
||||
use Symfony\Component\Messenger\Exception\TransportException;
|
||||
use Symfony\Component\Messenger\Stamp\TransportMessageIdStamp;
|
||||
use Symfony\Component\Messenger\Transport\Receiver\ListableReceiverInterface;
|
||||
use Symfony\Component\Messenger\Transport\Receiver\MessageCountAwareInterface;
|
||||
use Symfony\Component\Messenger\Transport\Serialization\PhpSerializer;
|
||||
use Symfony\Component\Messenger\Transport\Serialization\SerializerInterface;
|
||||
|
||||
/**
|
||||
* @author Vincent Touzet <vincent.touzet@gmail.com>
|
||||
*/
|
||||
class DoctrineReceiver implements ListableReceiverInterface, MessageCountAwareInterface
|
||||
{
|
||||
private const MAX_RETRIES = 3;
|
||||
private $retryingSafetyCounter = 0;
|
||||
private $connection;
|
||||
private $serializer;
|
||||
|
||||
public function __construct(Connection $connection, ?SerializerInterface $serializer = null)
|
||||
{
|
||||
$this->connection = $connection;
|
||||
$this->serializer = $serializer ?? new PhpSerializer();
|
||||
}
|
||||
|
||||
/**
|
||||
* {@inheritdoc}
|
||||
*/
|
||||
public function get(): iterable
|
||||
{
|
||||
try {
|
||||
$doctrineEnvelope = $this->connection->get();
|
||||
$this->retryingSafetyCounter = 0; // reset counter
|
||||
} catch (RetryableException $exception) {
|
||||
// Do nothing when RetryableException occurs less than "MAX_RETRIES"
|
||||
// as it will likely be resolved on the next call to get()
|
||||
// Problem with concurrent consumers and database deadlocks
|
||||
if (++$this->retryingSafetyCounter >= self::MAX_RETRIES) {
|
||||
$this->retryingSafetyCounter = 0; // reset counter
|
||||
throw new TransportException($exception->getMessage(), 0, $exception);
|
||||
}
|
||||
|
||||
return [];
|
||||
} catch (DBALException $exception) {
|
||||
throw new TransportException($exception->getMessage(), 0, $exception);
|
||||
}
|
||||
|
||||
if (null === $doctrineEnvelope) {
|
||||
return [];
|
||||
}
|
||||
|
||||
return [$this->createEnvelopeFromData($doctrineEnvelope)];
|
||||
}
|
||||
|
||||
/**
|
||||
* {@inheritdoc}
|
||||
*/
|
||||
public function ack(Envelope $envelope): void
|
||||
{
|
||||
try {
|
||||
$this->connection->ack($this->findDoctrineReceivedStamp($envelope)->getId());
|
||||
} catch (DBALException $exception) {
|
||||
throw new TransportException($exception->getMessage(), 0, $exception);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* {@inheritdoc}
|
||||
*/
|
||||
public function reject(Envelope $envelope): void
|
||||
{
|
||||
try {
|
||||
$this->connection->reject($this->findDoctrineReceivedStamp($envelope)->getId());
|
||||
} catch (DBALException $exception) {
|
||||
throw new TransportException($exception->getMessage(), 0, $exception);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* {@inheritdoc}
|
||||
*/
|
||||
public function getMessageCount(): int
|
||||
{
|
||||
try {
|
||||
return $this->connection->getMessageCount();
|
||||
} catch (DBALException $exception) {
|
||||
throw new TransportException($exception->getMessage(), 0, $exception);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* {@inheritdoc}
|
||||
*/
|
||||
public function all(?int $limit = null): iterable
|
||||
{
|
||||
try {
|
||||
$doctrineEnvelopes = $this->connection->findAll($limit);
|
||||
} catch (DBALException $exception) {
|
||||
throw new TransportException($exception->getMessage(), 0, $exception);
|
||||
}
|
||||
|
||||
foreach ($doctrineEnvelopes as $doctrineEnvelope) {
|
||||
yield $this->createEnvelopeFromData($doctrineEnvelope);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* {@inheritdoc}
|
||||
*/
|
||||
public function find($id): ?Envelope
|
||||
{
|
||||
try {
|
||||
$doctrineEnvelope = $this->connection->find($id);
|
||||
} catch (DBALException $exception) {
|
||||
throw new TransportException($exception->getMessage(), 0, $exception);
|
||||
}
|
||||
|
||||
if (null === $doctrineEnvelope) {
|
||||
return null;
|
||||
}
|
||||
|
||||
return $this->createEnvelopeFromData($doctrineEnvelope);
|
||||
}
|
||||
|
||||
private function findDoctrineReceivedStamp(Envelope $envelope): DoctrineReceivedStamp
|
||||
{
|
||||
/** @var DoctrineReceivedStamp|null $doctrineReceivedStamp */
|
||||
$doctrineReceivedStamp = $envelope->last(DoctrineReceivedStamp::class);
|
||||
|
||||
if (null === $doctrineReceivedStamp) {
|
||||
throw new LogicException('No DoctrineReceivedStamp found on the Envelope.');
|
||||
}
|
||||
|
||||
return $doctrineReceivedStamp;
|
||||
}
|
||||
|
||||
private function createEnvelopeFromData(array $data): Envelope
|
||||
{
|
||||
try {
|
||||
$envelope = $this->serializer->decode([
|
||||
'body' => $data['body'],
|
||||
'headers' => $data['headers'],
|
||||
]);
|
||||
} catch (MessageDecodingFailedException $exception) {
|
||||
$this->connection->reject($data['id']);
|
||||
|
||||
throw $exception;
|
||||
}
|
||||
|
||||
return $envelope->with(
|
||||
new DoctrineReceivedStamp($data['id']),
|
||||
new TransportMessageIdStamp($data['id'])
|
||||
);
|
||||
}
|
||||
}
|
||||
|
||||
if (!class_exists(\Symfony\Component\Messenger\Transport\Doctrine\DoctrineReceiver::class, false)) {
|
||||
class_alias(DoctrineReceiver::class, \Symfony\Component\Messenger\Transport\Doctrine\DoctrineReceiver::class);
|
||||
}
|
||||
@@ -0,0 +1,60 @@
|
||||
<?php
|
||||
|
||||
/*
|
||||
* This file is part of the Symfony package.
|
||||
*
|
||||
* (c) Fabien Potencier <fabien@symfony.com>
|
||||
*
|
||||
* For the full copyright and license information, please view the LICENSE
|
||||
* file that was distributed with this source code.
|
||||
*/
|
||||
|
||||
namespace Symfony\Component\Messenger\Bridge\Doctrine\Transport;
|
||||
|
||||
use Doctrine\DBAL\Exception as DBALException;
|
||||
use Symfony\Component\Messenger\Envelope;
|
||||
use Symfony\Component\Messenger\Exception\TransportException;
|
||||
use Symfony\Component\Messenger\Stamp\DelayStamp;
|
||||
use Symfony\Component\Messenger\Stamp\TransportMessageIdStamp;
|
||||
use Symfony\Component\Messenger\Transport\Sender\SenderInterface;
|
||||
use Symfony\Component\Messenger\Transport\Serialization\PhpSerializer;
|
||||
use Symfony\Component\Messenger\Transport\Serialization\SerializerInterface;
|
||||
|
||||
/**
|
||||
* @author Vincent Touzet <vincent.touzet@gmail.com>
|
||||
*/
|
||||
class DoctrineSender implements SenderInterface
|
||||
{
|
||||
private $connection;
|
||||
private $serializer;
|
||||
|
||||
public function __construct(Connection $connection, ?SerializerInterface $serializer = null)
|
||||
{
|
||||
$this->connection = $connection;
|
||||
$this->serializer = $serializer ?? new PhpSerializer();
|
||||
}
|
||||
|
||||
/**
|
||||
* {@inheritdoc}
|
||||
*/
|
||||
public function send(Envelope $envelope): Envelope
|
||||
{
|
||||
$encodedMessage = $this->serializer->encode($envelope);
|
||||
|
||||
/** @var DelayStamp|null $delayStamp */
|
||||
$delayStamp = $envelope->last(DelayStamp::class);
|
||||
$delay = null !== $delayStamp ? $delayStamp->getDelay() : 0;
|
||||
|
||||
try {
|
||||
$id = $this->connection->send($encodedMessage['body'], $encodedMessage['headers'] ?? [], $delay);
|
||||
} catch (DBALException $exception) {
|
||||
throw new TransportException($exception->getMessage(), 0, $exception);
|
||||
}
|
||||
|
||||
return $envelope->with(new TransportMessageIdStamp($id));
|
||||
}
|
||||
}
|
||||
|
||||
if (!class_exists(\Symfony\Component\Messenger\Transport\Doctrine\DoctrineSender::class, false)) {
|
||||
class_alias(DoctrineSender::class, \Symfony\Component\Messenger\Transport\Doctrine\DoctrineSender::class);
|
||||
}
|
||||
+135
@@ -0,0 +1,135 @@
|
||||
<?php
|
||||
|
||||
/*
|
||||
* This file is part of the Symfony package.
|
||||
*
|
||||
* (c) Fabien Potencier <fabien@symfony.com>
|
||||
*
|
||||
* For the full copyright and license information, please view the LICENSE
|
||||
* file that was distributed with this source code.
|
||||
*/
|
||||
|
||||
namespace Symfony\Component\Messenger\Bridge\Doctrine\Transport;
|
||||
|
||||
use Doctrine\DBAL\Connection as DbalConnection;
|
||||
use Doctrine\DBAL\Schema\Schema;
|
||||
use Doctrine\DBAL\Schema\Table;
|
||||
use Symfony\Component\Messenger\Envelope;
|
||||
use Symfony\Component\Messenger\Transport\Receiver\ListableReceiverInterface;
|
||||
use Symfony\Component\Messenger\Transport\Receiver\MessageCountAwareInterface;
|
||||
use Symfony\Component\Messenger\Transport\Serialization\SerializerInterface;
|
||||
use Symfony\Component\Messenger\Transport\SetupableTransportInterface;
|
||||
use Symfony\Component\Messenger\Transport\TransportInterface;
|
||||
|
||||
/**
|
||||
* @author Vincent Touzet <vincent.touzet@gmail.com>
|
||||
*/
|
||||
class DoctrineTransport implements TransportInterface, SetupableTransportInterface, MessageCountAwareInterface, ListableReceiverInterface
|
||||
{
|
||||
private $connection;
|
||||
private $serializer;
|
||||
private $receiver;
|
||||
private $sender;
|
||||
|
||||
public function __construct(Connection $connection, SerializerInterface $serializer)
|
||||
{
|
||||
$this->connection = $connection;
|
||||
$this->serializer = $serializer;
|
||||
}
|
||||
|
||||
/**
|
||||
* {@inheritdoc}
|
||||
*/
|
||||
public function get(): iterable
|
||||
{
|
||||
return ($this->receiver ?? $this->getReceiver())->get();
|
||||
}
|
||||
|
||||
/**
|
||||
* {@inheritdoc}
|
||||
*/
|
||||
public function ack(Envelope $envelope): void
|
||||
{
|
||||
($this->receiver ?? $this->getReceiver())->ack($envelope);
|
||||
}
|
||||
|
||||
/**
|
||||
* {@inheritdoc}
|
||||
*/
|
||||
public function reject(Envelope $envelope): void
|
||||
{
|
||||
($this->receiver ?? $this->getReceiver())->reject($envelope);
|
||||
}
|
||||
|
||||
/**
|
||||
* {@inheritdoc}
|
||||
*/
|
||||
public function getMessageCount(): int
|
||||
{
|
||||
return ($this->receiver ?? $this->getReceiver())->getMessageCount();
|
||||
}
|
||||
|
||||
/**
|
||||
* {@inheritdoc}
|
||||
*/
|
||||
public function all(?int $limit = null): iterable
|
||||
{
|
||||
return ($this->receiver ?? $this->getReceiver())->all($limit);
|
||||
}
|
||||
|
||||
/**
|
||||
* {@inheritdoc}
|
||||
*/
|
||||
public function find($id): ?Envelope
|
||||
{
|
||||
return ($this->receiver ?? $this->getReceiver())->find($id);
|
||||
}
|
||||
|
||||
/**
|
||||
* {@inheritdoc}
|
||||
*/
|
||||
public function send(Envelope $envelope): Envelope
|
||||
{
|
||||
return ($this->sender ?? $this->getSender())->send($envelope);
|
||||
}
|
||||
|
||||
/**
|
||||
* {@inheritdoc}
|
||||
*/
|
||||
public function setup(): void
|
||||
{
|
||||
$this->connection->setup();
|
||||
}
|
||||
|
||||
/**
|
||||
* Adds the Table to the Schema if this transport uses this connection.
|
||||
*/
|
||||
public function configureSchema(Schema $schema, DbalConnection $forConnection): void
|
||||
{
|
||||
$this->connection->configureSchema($schema, $forConnection);
|
||||
}
|
||||
|
||||
/**
|
||||
* Adds extra SQL if the given table was created by the Connection.
|
||||
*
|
||||
* @return string[]
|
||||
*/
|
||||
public function getExtraSetupSqlForTable(Table $createdTable): array
|
||||
{
|
||||
return $this->connection->getExtraSetupSqlForTable($createdTable);
|
||||
}
|
||||
|
||||
private function getReceiver(): DoctrineReceiver
|
||||
{
|
||||
return $this->receiver = new DoctrineReceiver($this->connection, $this->serializer);
|
||||
}
|
||||
|
||||
private function getSender(): DoctrineSender
|
||||
{
|
||||
return $this->sender = new DoctrineSender($this->connection, $this->serializer);
|
||||
}
|
||||
}
|
||||
|
||||
if (!class_exists(\Symfony\Component\Messenger\Transport\Doctrine\DoctrineTransport::class, false)) {
|
||||
class_alias(DoctrineTransport::class, \Symfony\Component\Messenger\Transport\Doctrine\DoctrineTransport::class);
|
||||
}
|
||||
+71
@@ -0,0 +1,71 @@
|
||||
<?php
|
||||
|
||||
/*
|
||||
* This file is part of the Symfony package.
|
||||
*
|
||||
* (c) Fabien Potencier <fabien@symfony.com>
|
||||
*
|
||||
* For the full copyright and license information, please view the LICENSE
|
||||
* file that was distributed with this source code.
|
||||
*/
|
||||
|
||||
namespace Symfony\Component\Messenger\Bridge\Doctrine\Transport;
|
||||
|
||||
use Doctrine\DBAL\Platforms\PostgreSQLPlatform;
|
||||
use Doctrine\Persistence\ConnectionRegistry;
|
||||
use Symfony\Bridge\Doctrine\RegistryInterface;
|
||||
use Symfony\Component\Messenger\Exception\TransportException;
|
||||
use Symfony\Component\Messenger\Transport\Serialization\SerializerInterface;
|
||||
use Symfony\Component\Messenger\Transport\TransportFactoryInterface;
|
||||
use Symfony\Component\Messenger\Transport\TransportInterface;
|
||||
|
||||
/**
|
||||
* @author Vincent Touzet <vincent.touzet@gmail.com>
|
||||
*/
|
||||
class DoctrineTransportFactory implements TransportFactoryInterface
|
||||
{
|
||||
private $registry;
|
||||
|
||||
public function __construct($registry)
|
||||
{
|
||||
if (!$registry instanceof RegistryInterface && !$registry instanceof ConnectionRegistry) {
|
||||
throw new \TypeError(sprintf('Expected an instance of "%s" or "%s", but got "%s".', RegistryInterface::class, ConnectionRegistry::class, get_debug_type($registry)));
|
||||
}
|
||||
|
||||
$this->registry = $registry;
|
||||
}
|
||||
|
||||
/**
|
||||
* @param array $options You can set 'use_notify' to false to not use LISTEN/NOTIFY with postgresql
|
||||
*/
|
||||
public function createTransport(string $dsn, array $options, SerializerInterface $serializer): TransportInterface
|
||||
{
|
||||
$useNotify = ($options['use_notify'] ?? true);
|
||||
unset($options['transport_name'], $options['use_notify']);
|
||||
// Always allow PostgreSQL-specific keys, to be able to transparently fallback to the native driver when LISTEN/NOTIFY isn't available
|
||||
$configuration = PostgreSqlConnection::buildConfiguration($dsn, $options);
|
||||
|
||||
try {
|
||||
$driverConnection = $this->registry->getConnection($configuration['connection']);
|
||||
} catch (\InvalidArgumentException $e) {
|
||||
throw new TransportException('Could not find Doctrine connection from Messenger DSN.', 0, $e);
|
||||
}
|
||||
|
||||
if ($useNotify && $driverConnection->getDatabasePlatform() instanceof PostgreSQLPlatform) {
|
||||
$connection = new PostgreSqlConnection($configuration, $driverConnection);
|
||||
} else {
|
||||
$connection = new Connection($configuration, $driverConnection);
|
||||
}
|
||||
|
||||
return new DoctrineTransport($connection, $serializer);
|
||||
}
|
||||
|
||||
public function supports(string $dsn, array $options): bool
|
||||
{
|
||||
return 0 === strpos($dsn, 'doctrine://');
|
||||
}
|
||||
}
|
||||
|
||||
if (!class_exists(\Symfony\Component\Messenger\Transport\Doctrine\DoctrineTransportFactory::class, false)) {
|
||||
class_alias(DoctrineTransportFactory::class, \Symfony\Component\Messenger\Transport\Doctrine\DoctrineTransportFactory::class);
|
||||
}
|
||||
+151
@@ -0,0 +1,151 @@
|
||||
<?php
|
||||
|
||||
/*
|
||||
* This file is part of the Symfony package.
|
||||
*
|
||||
* (c) Fabien Potencier <fabien@symfony.com>
|
||||
*
|
||||
* For the full copyright and license information, please view the LICENSE
|
||||
* file that was distributed with this source code.
|
||||
*/
|
||||
|
||||
namespace Symfony\Component\Messenger\Bridge\Doctrine\Transport;
|
||||
|
||||
use Doctrine\DBAL\Schema\Table;
|
||||
|
||||
/**
|
||||
* Uses PostgreSQL LISTEN/NOTIFY to push messages to workers.
|
||||
*
|
||||
* If you do not want to use the LISTEN mechanism, set the `use_notify` option to `false` when calling DoctrineTransportFactory::createTransport.
|
||||
*
|
||||
* @internal
|
||||
*
|
||||
* @author Kévin Dunglas <dunglas@gmail.com>
|
||||
*/
|
||||
final class PostgreSqlConnection extends Connection
|
||||
{
|
||||
/**
|
||||
* * check_delayed_interval: The interval to check for delayed messages, in milliseconds. Set to 0 to disable checks. Default: 60000 (1 minute)
|
||||
* * get_notify_timeout: The length of time to wait for a response when calling PDO::pgsqlGetNotify, in milliseconds. Default: 0.
|
||||
*/
|
||||
protected const DEFAULT_OPTIONS = parent::DEFAULT_OPTIONS + [
|
||||
'check_delayed_interval' => 60000,
|
||||
'get_notify_timeout' => 0,
|
||||
];
|
||||
|
||||
public function __sleep(): array
|
||||
{
|
||||
throw new \BadMethodCallException('Cannot serialize '.__CLASS__);
|
||||
}
|
||||
|
||||
public function __wakeup()
|
||||
{
|
||||
throw new \BadMethodCallException('Cannot unserialize '.__CLASS__);
|
||||
}
|
||||
|
||||
public function __destruct()
|
||||
{
|
||||
$this->unlisten();
|
||||
}
|
||||
|
||||
public function reset()
|
||||
{
|
||||
parent::reset();
|
||||
$this->unlisten();
|
||||
}
|
||||
|
||||
public function get(): ?array
|
||||
{
|
||||
if (null === $this->queueEmptiedAt) {
|
||||
return parent::get();
|
||||
}
|
||||
|
||||
// This is secure because the table name must be a valid identifier:
|
||||
// https://www.postgresql.org/docs/current/sql-syntax-lexical.html#SQL-SYNTAX-IDENTIFIERS
|
||||
$this->executeStatement(sprintf('LISTEN "%s"', $this->configuration['table_name']));
|
||||
|
||||
// The condition should be removed once support for DBAL <3.3 is dropped
|
||||
if (method_exists($this->driverConnection, 'getNativeConnection')) {
|
||||
$wrappedConnection = $this->driverConnection->getNativeConnection();
|
||||
} else {
|
||||
$wrappedConnection = $this->driverConnection;
|
||||
while (method_exists($wrappedConnection, 'getWrappedConnection')) {
|
||||
$wrappedConnection = $wrappedConnection->getWrappedConnection();
|
||||
}
|
||||
}
|
||||
|
||||
$notification = $wrappedConnection->pgsqlGetNotify(\PDO::FETCH_ASSOC, $this->configuration['get_notify_timeout']);
|
||||
if (
|
||||
// no notifications, or for another table or queue
|
||||
(false === $notification || $notification['message'] !== $this->configuration['table_name'] || $notification['payload'] !== $this->configuration['queue_name']) &&
|
||||
// delayed messages
|
||||
(microtime(true) * 1000 - $this->queueEmptiedAt < $this->configuration['check_delayed_interval'])
|
||||
) {
|
||||
usleep(1000);
|
||||
|
||||
return null;
|
||||
}
|
||||
|
||||
return parent::get();
|
||||
}
|
||||
|
||||
public function setup(): void
|
||||
{
|
||||
parent::setup();
|
||||
|
||||
$this->executeStatement(implode("\n", $this->getTriggerSql()));
|
||||
}
|
||||
|
||||
/**
|
||||
* @return string[]
|
||||
*/
|
||||
public function getExtraSetupSqlForTable(Table $createdTable): array
|
||||
{
|
||||
if (!$createdTable->hasOption(self::TABLE_OPTION_NAME)) {
|
||||
return [];
|
||||
}
|
||||
|
||||
if ($createdTable->getOption(self::TABLE_OPTION_NAME) !== $this->configuration['table_name']) {
|
||||
return [];
|
||||
}
|
||||
|
||||
return $this->getTriggerSql();
|
||||
}
|
||||
|
||||
private function getTriggerSql(): array
|
||||
{
|
||||
$functionName = $this->createTriggerFunctionName();
|
||||
|
||||
return [
|
||||
// create trigger function
|
||||
sprintf(<<<'SQL'
|
||||
CREATE OR REPLACE FUNCTION %1$s() RETURNS TRIGGER AS $$
|
||||
BEGIN
|
||||
PERFORM pg_notify('%2$s', NEW.queue_name::text);
|
||||
RETURN NEW;
|
||||
END;
|
||||
$$ LANGUAGE plpgsql;
|
||||
SQL
|
||||
, $functionName, $this->configuration['table_name']),
|
||||
// register trigger
|
||||
sprintf('DROP TRIGGER IF EXISTS notify_trigger ON %s;', $this->configuration['table_name']),
|
||||
sprintf('CREATE TRIGGER notify_trigger AFTER INSERT OR UPDATE ON %1$s FOR EACH ROW EXECUTE PROCEDURE %2$s();', $this->configuration['table_name'], $functionName),
|
||||
];
|
||||
}
|
||||
|
||||
private function createTriggerFunctionName(): string
|
||||
{
|
||||
$tableConfig = explode('.', $this->configuration['table_name']);
|
||||
|
||||
if (1 === \count($tableConfig)) {
|
||||
return sprintf('notify_%1$s', $tableConfig[0]);
|
||||
}
|
||||
|
||||
return sprintf('%1$s.notify_%2$s', $tableConfig[0], $tableConfig[1]);
|
||||
}
|
||||
|
||||
private function unlisten()
|
||||
{
|
||||
$this->executeStatement(sprintf('UNLISTEN "%s"', $this->configuration['table_name']));
|
||||
}
|
||||
}
|
||||
Reference in New Issue
Block a user