Move plugins to manifest, pin Docker version, add Makefile

- Add plugins.txt listing all plugins for reproducible installs
- Add Makefile with setup/start/stop/install-plugins targets
- Remove user/plugins/ from git tracking
- Pin Docker image to 1.7.49.5-ls244

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
This commit is contained in:
2026-04-19 00:55:59 +02:00
parent 8f9ac9ca6e
commit 4f52d4d085
2738 changed files with 0 additions and 472444 deletions
@@ -1,21 +0,0 @@
CHANGELOG
=========
5.3
---
* Deprecated the `prefetch_count` parameter, it has no effect and will be removed in Symfony 6.0.
* `AmqpReceiver` implements `QueueReceiverInterface` to fetch messages from a specific set of queues.
* Add ability to distinguish retry and delay actions
5.2.0
-----
* Add option to confirm message delivery
* DSN now support AMQPS out-of-the-box.
5.1.0
-----
* Introduced the AMQP bridge.
* Deprecated use of invalid options
-19
View File
@@ -1,19 +0,0 @@
Copyright (c) 2018-present Fabien Potencier
Permission is hereby granted, free of charge, to any person obtaining a copy
of this software and associated documentation files (the "Software"), to deal
in the Software without restriction, including without limitation the rights
to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
copies of the Software, and to permit persons to whom the Software is furnished
to do so, subject to the following conditions:
The above copyright notice and this permission notice shall be included in all
copies or substantial portions of the Software.
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
THE SOFTWARE.
-12
View File
@@ -1,12 +0,0 @@
AMQP Messenger
==============
Provides AMQP integration for Symfony Messenger.
Resources
---------
* [Contributing](https://symfony.com/doc/current/contributing/index.html)
* [Report issues](https://github.com/symfony/symfony/issues) and
[send Pull Requests](https://github.com/symfony/symfony/pulls)
in the [main Symfony repository](https://github.com/symfony/symfony)
@@ -1,39 +0,0 @@
<?php
/*
* This file is part of the Symfony package.
*
* (c) Fabien Potencier <fabien@symfony.com>
*
* For the full copyright and license information, please view the LICENSE
* file that was distributed with this source code.
*/
namespace Symfony\Component\Messenger\Bridge\Amqp\Transport;
class AmqpFactory
{
public function createConnection(array $credentials): \AMQPConnection
{
return new \AMQPConnection($credentials);
}
public function createChannel(\AMQPConnection $connection): \AMQPChannel
{
return new \AMQPChannel($connection);
}
public function createQueue(\AMQPChannel $channel): \AMQPQueue
{
return new \AMQPQueue($channel);
}
public function createExchange(\AMQPChannel $channel): \AMQPExchange
{
return new \AMQPExchange($channel);
}
}
if (!class_exists(\Symfony\Component\Messenger\Transport\AmqpExt\AmqpFactory::class, false)) {
class_alias(AmqpFactory::class, \Symfony\Component\Messenger\Transport\AmqpExt\AmqpFactory::class);
}
@@ -1,43 +0,0 @@
<?php
/*
* This file is part of the Symfony package.
*
* (c) Fabien Potencier <fabien@symfony.com>
*
* For the full copyright and license information, please view the LICENSE
* file that was distributed with this source code.
*/
namespace Symfony\Component\Messenger\Bridge\Amqp\Transport;
use Symfony\Component\Messenger\Stamp\NonSendableStampInterface;
/**
* Stamp applied when a message is received from Amqp.
*/
class AmqpReceivedStamp implements NonSendableStampInterface
{
private $amqpEnvelope;
private $queueName;
public function __construct(\AMQPEnvelope $amqpEnvelope, string $queueName)
{
$this->amqpEnvelope = $amqpEnvelope;
$this->queueName = $queueName;
}
public function getAmqpEnvelope(): \AMQPEnvelope
{
return $this->amqpEnvelope;
}
public function getQueueName(): string
{
return $this->queueName;
}
}
if (!class_exists(\Symfony\Component\Messenger\Transport\AmqpExt\AmqpReceivedStamp::class, false)) {
class_alias(AmqpReceivedStamp::class, \Symfony\Component\Messenger\Transport\AmqpExt\AmqpReceivedStamp::class);
}
@@ -1,150 +0,0 @@
<?php
/*
* This file is part of the Symfony package.
*
* (c) Fabien Potencier <fabien@symfony.com>
*
* For the full copyright and license information, please view the LICENSE
* file that was distributed with this source code.
*/
namespace Symfony\Component\Messenger\Bridge\Amqp\Transport;
use Symfony\Component\Messenger\Envelope;
use Symfony\Component\Messenger\Exception\LogicException;
use Symfony\Component\Messenger\Exception\MessageDecodingFailedException;
use Symfony\Component\Messenger\Exception\TransportException;
use Symfony\Component\Messenger\Transport\Receiver\MessageCountAwareInterface;
use Symfony\Component\Messenger\Transport\Receiver\QueueReceiverInterface;
use Symfony\Component\Messenger\Transport\Serialization\PhpSerializer;
use Symfony\Component\Messenger\Transport\Serialization\SerializerInterface;
/**
* Symfony Messenger receiver to get messages from AMQP brokers using PHP's AMQP extension.
*
* @author Samuel Roze <samuel.roze@gmail.com>
*/
class AmqpReceiver implements QueueReceiverInterface, MessageCountAwareInterface
{
private $serializer;
private $connection;
public function __construct(Connection $connection, ?SerializerInterface $serializer = null)
{
$this->connection = $connection;
$this->serializer = $serializer ?? new PhpSerializer();
}
/**
* {@inheritdoc}
*/
public function get(): iterable
{
yield from $this->getFromQueues($this->connection->getQueueNames());
}
/**
* {@inheritdoc}
*/
public function getFromQueues(array $queueNames): iterable
{
foreach ($queueNames as $queueName) {
yield from $this->getEnvelope($queueName);
}
}
private function getEnvelope(string $queueName): iterable
{
try {
$amqpEnvelope = $this->connection->get($queueName);
} catch (\AMQPException $exception) {
throw new TransportException($exception->getMessage(), 0, $exception);
}
if (null === $amqpEnvelope) {
return;
}
$body = $amqpEnvelope->getBody();
try {
$envelope = $this->serializer->decode([
'body' => false === $body ? '' : $body, // workaround https://github.com/pdezwart/php-amqp/issues/351
'headers' => $amqpEnvelope->getHeaders(),
]);
} catch (MessageDecodingFailedException $exception) {
// invalid message of some type
$this->rejectAmqpEnvelope($amqpEnvelope, $queueName);
throw $exception;
}
yield $envelope->with(new AmqpReceivedStamp($amqpEnvelope, $queueName));
}
/**
* {@inheritdoc}
*/
public function ack(Envelope $envelope): void
{
try {
$stamp = $this->findAmqpStamp($envelope);
$this->connection->ack(
$stamp->getAmqpEnvelope(),
$stamp->getQueueName()
);
} catch (\AMQPException $exception) {
throw new TransportException($exception->getMessage(), 0, $exception);
}
}
/**
* {@inheritdoc}
*/
public function reject(Envelope $envelope): void
{
$stamp = $this->findAmqpStamp($envelope);
$this->rejectAmqpEnvelope(
$stamp->getAmqpEnvelope(),
$stamp->getQueueName()
);
}
/**
* {@inheritdoc}
*/
public function getMessageCount(): int
{
try {
return $this->connection->countMessagesInQueues();
} catch (\AMQPException $exception) {
throw new TransportException($exception->getMessage(), 0, $exception);
}
}
private function rejectAmqpEnvelope(\AMQPEnvelope $amqpEnvelope, string $queueName): void
{
try {
$this->connection->nack($amqpEnvelope, $queueName, \AMQP_NOPARAM);
} catch (\AMQPException $exception) {
throw new TransportException($exception->getMessage(), 0, $exception);
}
}
private function findAmqpStamp(Envelope $envelope): AmqpReceivedStamp
{
$amqpReceivedStamp = $envelope->last(AmqpReceivedStamp::class);
if (null === $amqpReceivedStamp) {
throw new LogicException('No "AmqpReceivedStamp" stamp found on the Envelope.');
}
return $amqpReceivedStamp;
}
}
if (!class_exists(\Symfony\Component\Messenger\Transport\AmqpExt\AmqpReceiver::class, false)) {
class_alias(AmqpReceiver::class, \Symfony\Component\Messenger\Transport\AmqpExt\AmqpReceiver::class);
}
@@ -1,86 +0,0 @@
<?php
/*
* This file is part of the Symfony package.
*
* (c) Fabien Potencier <fabien@symfony.com>
*
* For the full copyright and license information, please view the LICENSE
* file that was distributed with this source code.
*/
namespace Symfony\Component\Messenger\Bridge\Amqp\Transport;
use Symfony\Component\Messenger\Envelope;
use Symfony\Component\Messenger\Exception\TransportException;
use Symfony\Component\Messenger\Stamp\DelayStamp;
use Symfony\Component\Messenger\Stamp\RedeliveryStamp;
use Symfony\Component\Messenger\Transport\Sender\SenderInterface;
use Symfony\Component\Messenger\Transport\Serialization\PhpSerializer;
use Symfony\Component\Messenger\Transport\Serialization\SerializerInterface;
/**
* Symfony Messenger sender to send messages to AMQP brokers using PHP's AMQP extension.
*
* @author Samuel Roze <samuel.roze@gmail.com>
*/
class AmqpSender implements SenderInterface
{
private $serializer;
private $connection;
public function __construct(Connection $connection, ?SerializerInterface $serializer = null)
{
$this->connection = $connection;
$this->serializer = $serializer ?? new PhpSerializer();
}
/**
* {@inheritdoc}
*/
public function send(Envelope $envelope): Envelope
{
$encodedMessage = $this->serializer->encode($envelope);
/** @var DelayStamp|null $delayStamp */
$delayStamp = $envelope->last(DelayStamp::class);
$delay = $delayStamp ? $delayStamp->getDelay() : 0;
/** @var AmqpStamp|null $amqpStamp */
$amqpStamp = $envelope->last(AmqpStamp::class);
if (isset($encodedMessage['headers']['Content-Type'])) {
$contentType = $encodedMessage['headers']['Content-Type'];
unset($encodedMessage['headers']['Content-Type']);
if (!$amqpStamp || !isset($amqpStamp->getAttributes()['content_type'])) {
$amqpStamp = AmqpStamp::createWithAttributes(['content_type' => $contentType], $amqpStamp);
}
}
$amqpReceivedStamp = $envelope->last(AmqpReceivedStamp::class);
if ($amqpReceivedStamp instanceof AmqpReceivedStamp) {
$amqpStamp = AmqpStamp::createFromAmqpEnvelope(
$amqpReceivedStamp->getAmqpEnvelope(),
$amqpStamp,
$envelope->last(RedeliveryStamp::class) ? $amqpReceivedStamp->getQueueName() : null
);
}
try {
$this->connection->publish(
$encodedMessage['body'],
$encodedMessage['headers'] ?? [],
$delay,
$amqpStamp
);
} catch (\AMQPException $e) {
throw new TransportException($e->getMessage(), 0, $e);
}
return $envelope;
}
}
if (!class_exists(\Symfony\Component\Messenger\Transport\AmqpExt\AmqpSender::class, false)) {
class_alias(AmqpSender::class, \Symfony\Component\Messenger\Transport\AmqpExt\AmqpSender::class);
}
@@ -1,94 +0,0 @@
<?php
/*
* This file is part of the Symfony package.
*
* (c) Fabien Potencier <fabien@symfony.com>
*
* For the full copyright and license information, please view the LICENSE
* file that was distributed with this source code.
*/
namespace Symfony\Component\Messenger\Bridge\Amqp\Transport;
use Symfony\Component\Messenger\Stamp\NonSendableStampInterface;
/**
* @author Guillaume Gammelin <ggammelin@gmail.com>
* @author Samuel Roze <samuel.roze@gmail.com>
*/
final class AmqpStamp implements NonSendableStampInterface
{
private $routingKey;
private $flags;
private $attributes;
private $isRetryAttempt = false;
public function __construct(?string $routingKey = null, int $flags = \AMQP_NOPARAM, array $attributes = [])
{
$this->routingKey = $routingKey;
$this->flags = $flags;
$this->attributes = $attributes;
}
public function getRoutingKey(): ?string
{
return $this->routingKey;
}
public function getFlags(): int
{
return $this->flags;
}
public function getAttributes(): array
{
return $this->attributes;
}
public static function createFromAmqpEnvelope(\AMQPEnvelope $amqpEnvelope, ?self $previousStamp = null, ?string $retryRoutingKey = null): self
{
$attr = $previousStamp->attributes ?? [];
$attr['headers'] = $attr['headers'] ?? $amqpEnvelope->getHeaders();
$attr['content_type'] = $attr['content_type'] ?? $amqpEnvelope->getContentType();
$attr['content_encoding'] = $attr['content_encoding'] ?? $amqpEnvelope->getContentEncoding();
$attr['delivery_mode'] = $attr['delivery_mode'] ?? $amqpEnvelope->getDeliveryMode();
$attr['priority'] = $attr['priority'] ?? $amqpEnvelope->getPriority();
$attr['timestamp'] = $attr['timestamp'] ?? $amqpEnvelope->getTimestamp();
$attr['app_id'] = $attr['app_id'] ?? $amqpEnvelope->getAppId();
$attr['message_id'] = $attr['message_id'] ?? $amqpEnvelope->getMessageId();
$attr['user_id'] = $attr['user_id'] ?? $amqpEnvelope->getUserId();
$attr['expiration'] = $attr['expiration'] ?? $amqpEnvelope->getExpiration();
$attr['type'] = $attr['type'] ?? $amqpEnvelope->getType();
$attr['reply_to'] = $attr['reply_to'] ?? $amqpEnvelope->getReplyTo();
$attr['correlation_id'] = $attr['correlation_id'] ?? $amqpEnvelope->getCorrelationId();
if (null === $retryRoutingKey) {
$stamp = new self($previousStamp->routingKey ?? $amqpEnvelope->getRoutingKey(), $previousStamp->flags ?? \AMQP_NOPARAM, $attr);
} else {
$stamp = new self($retryRoutingKey, $previousStamp->flags ?? \AMQP_NOPARAM, $attr);
$stamp->isRetryAttempt = true;
}
return $stamp;
}
public function isRetryAttempt(): bool
{
return $this->isRetryAttempt;
}
public static function createWithAttributes(array $attributes, ?self $previousStamp = null): self
{
return new self(
$previousStamp->routingKey ?? null,
$previousStamp->flags ?? \AMQP_NOPARAM,
array_merge($previousStamp->attributes ?? [], $attributes)
);
}
}
if (!class_exists(\Symfony\Component\Messenger\Transport\AmqpExt\AmqpStamp::class, false)) {
class_alias(AmqpStamp::class, \Symfony\Component\Messenger\Transport\AmqpExt\AmqpStamp::class);
}
@@ -1,107 +0,0 @@
<?php
/*
* This file is part of the Symfony package.
*
* (c) Fabien Potencier <fabien@symfony.com>
*
* For the full copyright and license information, please view the LICENSE
* file that was distributed with this source code.
*/
namespace Symfony\Component\Messenger\Bridge\Amqp\Transport;
use Symfony\Component\Messenger\Envelope;
use Symfony\Component\Messenger\Transport\Receiver\MessageCountAwareInterface;
use Symfony\Component\Messenger\Transport\Receiver\QueueReceiverInterface;
use Symfony\Component\Messenger\Transport\Serialization\PhpSerializer;
use Symfony\Component\Messenger\Transport\Serialization\SerializerInterface;
use Symfony\Component\Messenger\Transport\SetupableTransportInterface;
use Symfony\Component\Messenger\Transport\TransportInterface;
/**
* @author Nicolas Grekas <p@tchwork.com>
*/
class AmqpTransport implements QueueReceiverInterface, TransportInterface, SetupableTransportInterface, MessageCountAwareInterface
{
private $serializer;
private $connection;
private $receiver;
private $sender;
public function __construct(Connection $connection, ?SerializerInterface $serializer = null)
{
$this->connection = $connection;
$this->serializer = $serializer ?? new PhpSerializer();
}
/**
* {@inheritdoc}
*/
public function get(): iterable
{
return ($this->receiver ?? $this->getReceiver())->get();
}
/**
* {@inheritdoc}
*/
public function getFromQueues(array $queueNames): iterable
{
return ($this->receiver ?? $this->getReceiver())->getFromQueues($queueNames);
}
/**
* {@inheritdoc}
*/
public function ack(Envelope $envelope): void
{
($this->receiver ?? $this->getReceiver())->ack($envelope);
}
/**
* {@inheritdoc}
*/
public function reject(Envelope $envelope): void
{
($this->receiver ?? $this->getReceiver())->reject($envelope);
}
/**
* {@inheritdoc}
*/
public function send(Envelope $envelope): Envelope
{
return ($this->sender ?? $this->getSender())->send($envelope);
}
/**
* {@inheritdoc}
*/
public function setup(): void
{
$this->connection->setup();
}
/**
* {@inheritdoc}
*/
public function getMessageCount(): int
{
return ($this->receiver ?? $this->getReceiver())->getMessageCount();
}
private function getReceiver(): AmqpReceiver
{
return $this->receiver = new AmqpReceiver($this->connection, $this->serializer);
}
private function getSender(): AmqpSender
{
return $this->sender = new AmqpSender($this->connection, $this->serializer);
}
}
if (!class_exists(\Symfony\Component\Messenger\Transport\AmqpExt\AmqpTransport::class, false)) {
class_alias(AmqpTransport::class, \Symfony\Component\Messenger\Transport\AmqpExt\AmqpTransport::class);
}
@@ -1,38 +0,0 @@
<?php
/*
* This file is part of the Symfony package.
*
* (c) Fabien Potencier <fabien@symfony.com>
*
* For the full copyright and license information, please view the LICENSE
* file that was distributed with this source code.
*/
namespace Symfony\Component\Messenger\Bridge\Amqp\Transport;
use Symfony\Component\Messenger\Transport\Serialization\SerializerInterface;
use Symfony\Component\Messenger\Transport\TransportFactoryInterface;
use Symfony\Component\Messenger\Transport\TransportInterface;
/**
* @author Samuel Roze <samuel.roze@gmail.com>
*/
class AmqpTransportFactory implements TransportFactoryInterface
{
public function createTransport(string $dsn, array $options, SerializerInterface $serializer): TransportInterface
{
unset($options['transport_name']);
return new AmqpTransport(Connection::fromDsn($dsn, $options), $serializer);
}
public function supports(string $dsn, array $options): bool
{
return 0 === strpos($dsn, 'amqp://') || 0 === strpos($dsn, 'amqps://');
}
}
if (!class_exists(\Symfony\Component\Messenger\Transport\AmqpExt\AmqpTransportFactory::class, false)) {
class_alias(AmqpTransportFactory::class, \Symfony\Component\Messenger\Transport\AmqpExt\AmqpTransportFactory::class);
}
@@ -1,626 +0,0 @@
<?php
/*
* This file is part of the Symfony package.
*
* (c) Fabien Potencier <fabien@symfony.com>
*
* For the full copyright and license information, please view the LICENSE
* file that was distributed with this source code.
*/
namespace Symfony\Component\Messenger\Bridge\Amqp\Transport;
use Symfony\Component\Messenger\Exception\InvalidArgumentException;
use Symfony\Component\Messenger\Exception\LogicException;
use Symfony\Component\Messenger\Exception\TransportException;
/**
* An AMQP connection.
*
* @author Samuel Roze <samuel.roze@gmail.com>
*
* @final
*/
class Connection
{
private const ARGUMENTS_AS_INTEGER = [
'x-delay',
'x-expires',
'x-max-length',
'x-max-length-bytes',
'x-max-priority',
'x-message-ttl',
];
/**
* @see https://github.com/php-amqp/php-amqp/blob/master/amqp_connection_resource.h
*/
private const AVAILABLE_OPTIONS = [
'host',
'port',
'vhost',
'user',
'login',
'password',
'queues',
'exchange',
'delay',
'auto_setup',
'prefetch_count',
'retry',
'persistent',
'frame_max',
'channel_max',
'heartbeat',
'read_timeout',
'write_timeout',
'confirm_timeout',
'connect_timeout',
'rpc_timeout',
'cacert',
'cert',
'key',
'verify',
'sasl_method',
];
private const AVAILABLE_QUEUE_OPTIONS = [
'binding_keys',
'binding_arguments',
'flags',
'arguments',
];
private const AVAILABLE_EXCHANGE_OPTIONS = [
'name',
'type',
'default_publish_routing_key',
'flags',
'arguments',
];
private $connectionOptions;
private $exchangeOptions;
private $queuesOptions;
private $amqpFactory;
private $autoSetupExchange;
private $autoSetupDelayExchange;
/**
* @var \AMQPChannel|null
*/
private $amqpChannel;
/**
* @var \AMQPExchange|null
*/
private $amqpExchange;
/**
* @var \AMQPQueue[]|null
*/
private $amqpQueues = [];
/**
* @var \AMQPExchange|null
*/
private $amqpDelayExchange;
/**
* @var int
*/
private $lastActivityTime = 0;
public function __construct(array $connectionOptions, array $exchangeOptions, array $queuesOptions, ?AmqpFactory $amqpFactory = null)
{
if (!\extension_loaded('amqp')) {
throw new LogicException(sprintf('You cannot use the "%s" as the "amqp" extension is not installed.', __CLASS__));
}
$this->connectionOptions = array_replace_recursive([
'delay' => [
'exchange_name' => 'delays',
'queue_name_pattern' => 'delay_%exchange_name%_%routing_key%_%delay%',
],
], $connectionOptions);
$this->autoSetupExchange = $this->autoSetupDelayExchange = $connectionOptions['auto_setup'] ?? true;
$this->exchangeOptions = $exchangeOptions;
$this->queuesOptions = $queuesOptions;
$this->amqpFactory = $amqpFactory ?? new AmqpFactory();
}
/**
* Creates a connection based on the DSN and options.
*
* Available options:
*
* * host: Hostname of the AMQP service
* * port: Port of the AMQP service
* * vhost: Virtual Host to use with the AMQP service
* * user|login: Username to use to connect the AMQP service
* * password: Password to use to connect to the AMQP service
* * read_timeout: Timeout in for income activity. Note: 0 or greater seconds. May be fractional.
* * write_timeout: Timeout in for outcome activity. Note: 0 or greater seconds. May be fractional.
* * connect_timeout: Connection timeout. Note: 0 or greater seconds. May be fractional.
* * confirm_timeout: Timeout in seconds for confirmation, if none specified transport will not wait for message confirmation. Note: 0 or greater seconds. May be fractional.
* * queues[name]: An array of queues, keyed by the name
* * binding_keys: The binding keys (if any) to bind to this queue
* * binding_arguments: Arguments to be used while binding the queue.
* * flags: Queue flags (Default: AMQP_DURABLE)
* * arguments: Extra arguments
* * exchange:
* * name: Name of the exchange
* * type: Type of exchange (Default: fanout)
* * default_publish_routing_key: Routing key to use when publishing, if none is specified on the message
* * flags: Exchange flags (Default: AMQP_DURABLE)
* * arguments: Extra arguments
* * delay:
* * queue_name_pattern: Pattern to use to create the queues (Default: "delay_%exchange_name%_%routing_key%_%delay%")
* * exchange_name: Name of the exchange to be used for the delayed/retried messages (Default: "delays")
* * auto_setup: Enable or not the auto-setup of queues and exchanges (Default: true)
*
* * Connection tuning options (see http://www.rabbitmq.com/amqp-0-9-1-reference.html#connection.tune for details):
* * channel_max: Specifies highest channel number that the server permits. 0 means standard extension limit
* (see PHP_AMQP_MAX_CHANNELS constant)
* * frame_max: The largest frame size that the server proposes for the connection, including frame header
* and end-byte. 0 means standard extension limit (depends on librabbimq default frame size limit)
* * heartbeat: The delay, in seconds, of the connection heartbeat that the server wants.
* 0 means the server does not want a heartbeat. Note, librabbitmq has limited heartbeat support,
* which means heartbeats checked only during blocking calls.
*
* TLS support (see https://www.rabbitmq.com/ssl.html for details):
* * cacert: Path to the CA cert file in PEM format.
* * cert: Path to the client certificate in PEM format.
* * key: Path to the client key in PEM format.
* * verify: Enable or disable peer verification. If peer verification is enabled then the common name in the
* server certificate must match the server name. Peer verification is enabled by default.
*/
public static function fromDsn(string $dsn, array $options = [], ?AmqpFactory $amqpFactory = null): self
{
if (false === $params = parse_url($dsn)) {
// this is a valid URI that parse_url cannot handle when you want to pass all parameters as options
if (!\in_array($dsn, ['amqp://', 'amqps://'])) {
throw new InvalidArgumentException('The given AMQP DSN is invalid.');
}
$params = [];
}
$useAmqps = 0 === strpos($dsn, 'amqps://');
$pathParts = isset($params['path']) ? explode('/', trim($params['path'], '/')) : [];
$exchangeName = $pathParts[1] ?? 'messages';
parse_str($params['query'] ?? '', $parsedQuery);
$port = $useAmqps ? 5671 : 5672;
$amqpOptions = array_replace_recursive([
'host' => $params['host'] ?? 'localhost',
'port' => $params['port'] ?? $port,
'vhost' => isset($pathParts[0]) ? urldecode($pathParts[0]) : '/',
'exchange' => [
'name' => $exchangeName,
],
], $options, $parsedQuery);
self::validateOptions($amqpOptions);
if (isset($params['user'])) {
$amqpOptions['login'] = rawurldecode($params['user']);
}
if (isset($params['pass'])) {
$amqpOptions['password'] = rawurldecode($params['pass']);
}
if (!isset($amqpOptions['queues'])) {
$amqpOptions['queues'][$exchangeName] = [];
}
$exchangeOptions = $amqpOptions['exchange'];
$queuesOptions = $amqpOptions['queues'];
unset($amqpOptions['queues'], $amqpOptions['exchange']);
if (isset($amqpOptions['auto_setup'])) {
$amqpOptions['auto_setup'] = filter_var($amqpOptions['auto_setup'], \FILTER_VALIDATE_BOOLEAN);
}
$queuesOptions = array_map(function ($queueOptions) {
if (!\is_array($queueOptions)) {
$queueOptions = [];
}
if (\is_array($queueOptions['arguments'] ?? false)) {
$queueOptions['arguments'] = self::normalizeQueueArguments($queueOptions['arguments']);
}
return $queueOptions;
}, $queuesOptions);
if (!$useAmqps) {
unset($amqpOptions['cacert'], $amqpOptions['cert'], $amqpOptions['key'], $amqpOptions['verify']);
}
if ($useAmqps && !self::hasCaCertConfigured($amqpOptions)) {
throw new InvalidArgumentException('No CA certificate has been provided. Set "amqp.cacert" in your php.ini or pass the "cacert" parameter in the DSN to use SSL. Alternatively, you can use amqp:// to use without SSL.');
}
return new self($amqpOptions, $exchangeOptions, $queuesOptions, $amqpFactory);
}
private static function validateOptions(array $options): void
{
if (0 < \count($invalidOptions = array_diff(array_keys($options), self::AVAILABLE_OPTIONS))) {
trigger_deprecation('symfony/messenger', '5.1', 'Invalid option(s) "%s" passed to the AMQP Messenger transport. Passing invalid options is deprecated.', implode('", "', $invalidOptions));
}
if (isset($options['prefetch_count'])) {
trigger_deprecation('symfony/messenger', '5.3', 'The "prefetch_count" option passed to the AMQP Messenger transport has no effect and should not be used.');
}
if (\is_array($options['queues'] ?? false)) {
foreach ($options['queues'] as $queue) {
if (!\is_array($queue)) {
continue;
}
if (0 < \count($invalidQueueOptions = array_diff(array_keys($queue), self::AVAILABLE_QUEUE_OPTIONS))) {
trigger_deprecation('symfony/messenger', '5.1', 'Invalid queue option(s) "%s" passed to the AMQP Messenger transport. Passing invalid queue options is deprecated.', implode('", "', $invalidQueueOptions));
}
}
}
if (\is_array($options['exchange'] ?? false)
&& 0 < \count($invalidExchangeOptions = array_diff(array_keys($options['exchange']), self::AVAILABLE_EXCHANGE_OPTIONS))) {
trigger_deprecation('symfony/messenger', '5.1', 'Invalid exchange option(s) "%s" passed to the AMQP Messenger transport. Passing invalid exchange options is deprecated.', implode('", "', $invalidExchangeOptions));
}
}
private static function normalizeQueueArguments(array $arguments): array
{
foreach (self::ARGUMENTS_AS_INTEGER as $key) {
if (!\array_key_exists($key, $arguments)) {
continue;
}
if (!is_numeric($arguments[$key])) {
throw new InvalidArgumentException(sprintf('Integer expected for queue argument "%s", "%s" given.', $key, get_debug_type($arguments[$key])));
}
$arguments[$key] = (int) $arguments[$key];
}
return $arguments;
}
private static function hasCaCertConfigured(array $amqpOptions): bool
{
return (isset($amqpOptions['cacert']) && '' !== $amqpOptions['cacert']) || '' !== \ini_get('amqp.cacert');
}
/**
* @throws \AMQPException
*/
public function publish(string $body, array $headers = [], int $delayInMs = 0, ?AmqpStamp $amqpStamp = null): void
{
$this->clearWhenDisconnected();
if ($this->autoSetupExchange) {
$this->setupExchangeAndQueues(); // also setup normal exchange for delayed messages so delay queue can DLX messages to it
}
$this->withConnectionExceptionRetry(function () use ($body, $headers, $delayInMs, $amqpStamp) {
if (0 !== $delayInMs) {
$this->publishWithDelay($body, $headers, $delayInMs, $amqpStamp);
return;
}
$this->publishOnExchange(
$this->exchange(),
$body,
$this->getRoutingKeyForMessage($amqpStamp),
$headers,
$amqpStamp
);
});
}
/**
* Returns an approximate count of the messages in defined queues.
*/
public function countMessagesInQueues(): int
{
return array_sum(array_map(function ($queueName) {
return $this->queue($queueName)->declareQueue();
}, $this->getQueueNames()));
}
/**
* @throws \AMQPException
*/
private function publishWithDelay(string $body, array $headers, int $delay, ?AmqpStamp $amqpStamp = null)
{
$routingKey = $this->getRoutingKeyForMessage($amqpStamp);
$isRetryAttempt = $amqpStamp ? $amqpStamp->isRetryAttempt() : false;
$this->setupDelay($delay, $routingKey, $isRetryAttempt);
$this->publishOnExchange(
$this->getDelayExchange(),
$body,
$this->getRoutingKeyForDelay($delay, $routingKey, $isRetryAttempt),
$headers,
$amqpStamp
);
}
private function publishOnExchange(\AMQPExchange $exchange, string $body, ?string $routingKey = null, array $headers = [], ?AmqpStamp $amqpStamp = null)
{
$attributes = $amqpStamp ? $amqpStamp->getAttributes() : [];
$attributes['headers'] = array_merge($attributes['headers'] ?? [], $headers);
$attributes['delivery_mode'] = $attributes['delivery_mode'] ?? 2;
$attributes['timestamp'] = $attributes['timestamp'] ?? time();
$this->lastActivityTime = time();
$exchange->publish(
$body,
$routingKey,
$amqpStamp ? $amqpStamp->getFlags() : \AMQP_NOPARAM,
$attributes
);
if ('' !== ($this->connectionOptions['confirm_timeout'] ?? '')) {
$this->channel()->waitForConfirm((float) $this->connectionOptions['confirm_timeout']);
}
}
private function setupDelay(int $delay, ?string $routingKey, bool $isRetryAttempt)
{
if ($this->autoSetupDelayExchange) {
$this->setupDelayExchange();
}
$queue = $this->createDelayQueue($delay, $routingKey, $isRetryAttempt);
$queue->declareQueue(); // the delay queue always need to be declared because the name is dynamic and cannot be declared in advance
$queue->bind($this->connectionOptions['delay']['exchange_name'], $this->getRoutingKeyForDelay($delay, $routingKey, $isRetryAttempt));
}
private function getDelayExchange(): \AMQPExchange
{
if (null === $this->amqpDelayExchange) {
$this->amqpDelayExchange = $this->amqpFactory->createExchange($this->channel());
$this->amqpDelayExchange->setName($this->connectionOptions['delay']['exchange_name']);
$this->amqpDelayExchange->setType(\AMQP_EX_TYPE_DIRECT);
$this->amqpDelayExchange->setFlags(\AMQP_DURABLE);
}
return $this->amqpDelayExchange;
}
/**
* Creates a delay queue that will delay for a certain amount of time.
*
* This works by setting message TTL for the delay and pointing
* the dead letter exchange to the original exchange. The result
* is that after the TTL, the message is sent to the dead-letter-exchange,
* which is the original exchange, resulting on it being put back into
* the original queue.
*/
private function createDelayQueue(int $delay, ?string $routingKey, bool $isRetryAttempt): \AMQPQueue
{
$queue = $this->amqpFactory->createQueue($this->channel());
$queue->setName($this->getRoutingKeyForDelay($delay, $routingKey, $isRetryAttempt));
$queue->setFlags(\AMQP_DURABLE);
$queue->setArguments([
'x-message-ttl' => $delay,
// delete the delay queue 10 seconds after the message expires
// publishing another message redeclares the queue which renews the lease
'x-expires' => $delay + 10000,
// message should be broadcast to all consumers during delay, but to only one queue during retry
// empty name is default direct exchange
'x-dead-letter-exchange' => $isRetryAttempt ? '' : $this->exchangeOptions['name'],
// after being released from to DLX, make sure the original routing key will be used
// we must use an empty string instead of null for the argument to be picked up
'x-dead-letter-routing-key' => $routingKey ?? '',
]);
return $queue;
}
private function getRoutingKeyForDelay(int $delay, ?string $finalRoutingKey, bool $isRetryAttempt): string
{
$action = $isRetryAttempt ? '_retry' : '_delay';
return str_replace(
['%delay%', '%exchange_name%', '%routing_key%'],
[$delay, $this->exchangeOptions['name'], $finalRoutingKey ?? ''],
$this->connectionOptions['delay']['queue_name_pattern']
).$action;
}
/**
* Gets a message from the specified queue.
*
* @throws \AMQPException
*/
public function get(string $queueName): ?\AMQPEnvelope
{
$this->clearWhenDisconnected();
if ($this->autoSetupExchange) {
$this->setupExchangeAndQueues();
}
if (false !== $message = $this->queue($queueName)->get()) {
return $message;
}
return null;
}
public function ack(\AMQPEnvelope $message, string $queueName): bool
{
return $this->queue($queueName)->ack($message->getDeliveryTag()) ?? true;
}
public function nack(\AMQPEnvelope $message, string $queueName, int $flags = \AMQP_NOPARAM): bool
{
return $this->queue($queueName)->nack($message->getDeliveryTag(), $flags) ?? true;
}
public function setup(): void
{
$this->setupExchangeAndQueues();
$this->setupDelayExchange();
}
private function setupExchangeAndQueues(): void
{
$this->exchange()->declareExchange();
foreach ($this->queuesOptions as $queueName => $queueConfig) {
$this->queue($queueName)->declareQueue();
foreach ($queueConfig['binding_keys'] ?? [null] as $bindingKey) {
$this->queue($queueName)->bind($this->exchangeOptions['name'], $bindingKey, $queueConfig['binding_arguments'] ?? []);
}
}
$this->autoSetupExchange = false;
}
private function setupDelayExchange(): void
{
$this->getDelayExchange()->declareExchange();
$this->autoSetupDelayExchange = false;
}
/**
* @return string[]
*/
public function getQueueNames(): array
{
return array_keys($this->queuesOptions);
}
public function channel(): \AMQPChannel
{
if (null === $this->amqpChannel) {
$connection = $this->amqpFactory->createConnection($this->connectionOptions);
$connectMethod = 'true' === ($this->connectionOptions['persistent'] ?? 'false') ? 'pconnect' : 'connect';
try {
$connection->{$connectMethod}();
} catch (\AMQPConnectionException $e) {
throw new \AMQPException('Could not connect to the AMQP server. Please verify the provided DSN.', 0, $e);
}
$this->amqpChannel = $this->amqpFactory->createChannel($connection);
if ('' !== ($this->connectionOptions['confirm_timeout'] ?? '')) {
$this->amqpChannel->confirmSelect();
$this->amqpChannel->setConfirmCallback(
static function (): bool {
return false;
},
static function () {
throw new TransportException('Message publication failed due to a negative acknowledgment (nack) from the broker.');
}
);
}
$this->lastActivityTime = time();
} elseif (0 < ($this->connectionOptions['heartbeat'] ?? 0) && time() > $this->lastActivityTime + 2 * $this->connectionOptions['heartbeat']) {
$disconnectMethod = 'true' === ($this->connectionOptions['persistent'] ?? 'false') ? 'pdisconnect' : 'disconnect';
$this->amqpChannel->getConnection()->{$disconnectMethod}();
}
return $this->amqpChannel;
}
public function queue(string $queueName): \AMQPQueue
{
if (!isset($this->amqpQueues[$queueName])) {
$queueConfig = $this->queuesOptions[$queueName] ?? [];
$amqpQueue = $this->amqpFactory->createQueue($this->channel());
$amqpQueue->setName($queueName);
$amqpQueue->setFlags($queueConfig['flags'] ?? \AMQP_DURABLE);
if (isset($queueConfig['arguments'])) {
$amqpQueue->setArguments($queueConfig['arguments']);
}
$this->amqpQueues[$queueName] = $amqpQueue;
}
return $this->amqpQueues[$queueName];
}
public function exchange(): \AMQPExchange
{
if (null === $this->amqpExchange) {
$this->amqpExchange = $this->amqpFactory->createExchange($this->channel());
$this->amqpExchange->setName($this->exchangeOptions['name']);
$this->amqpExchange->setType($this->exchangeOptions['type'] ?? \AMQP_EX_TYPE_FANOUT);
$this->amqpExchange->setFlags($this->exchangeOptions['flags'] ?? \AMQP_DURABLE);
if (isset($this->exchangeOptions['arguments'])) {
$this->amqpExchange->setArguments($this->exchangeOptions['arguments']);
}
}
return $this->amqpExchange;
}
private function clearWhenDisconnected(): void
{
if (!$this->channel()->isConnected()) {
$this->clear();
}
}
private function clear(): void
{
$this->amqpChannel = null;
$this->amqpQueues = [];
$this->amqpExchange = null;
$this->amqpDelayExchange = null;
}
private function getDefaultPublishRoutingKey(): ?string
{
return $this->exchangeOptions['default_publish_routing_key'] ?? null;
}
public function purgeQueues()
{
foreach ($this->getQueueNames() as $queueName) {
$this->queue($queueName)->purge();
}
}
private function getRoutingKeyForMessage(?AmqpStamp $amqpStamp): ?string
{
return (null !== $amqpStamp ? $amqpStamp->getRoutingKey() : null) ?? $this->getDefaultPublishRoutingKey();
}
private function withConnectionExceptionRetry(callable $callable): void
{
$maxRetries = 3;
$retries = 0;
retry:
try {
$callable();
} catch (\AMQPConnectionException $e) {
if (++$retries <= $maxRetries) {
$this->clear();
goto retry;
}
throw $e;
}
}
}
if (!class_exists(\Symfony\Component\Messenger\Transport\AmqpExt\Connection::class, false)) {
class_alias(Connection::class, \Symfony\Component\Messenger\Transport\AmqpExt\Connection::class);
}
@@ -1,36 +0,0 @@
{
"name": "symfony/amqp-messenger",
"type": "symfony-messenger-bridge",
"description": "Symfony AMQP extension Messenger Bridge",
"keywords": [],
"homepage": "https://symfony.com",
"license": "MIT",
"authors": [
{
"name": "Fabien Potencier",
"email": "fabien@symfony.com"
},
{
"name": "Symfony Community",
"homepage": "https://symfony.com/contributors"
}
],
"require": {
"php": ">=7.2.5",
"symfony/deprecation-contracts": "^2.1|^3",
"symfony/messenger": "^5.3|^6.0"
},
"require-dev": {
"symfony/event-dispatcher": "^4.4|^5.0|^6.0",
"symfony/process": "^4.4|^5.0|^6.0",
"symfony/property-access": "^4.4|^5.0|^6.0",
"symfony/serializer": "^4.4|^5.0|^6.0"
},
"autoload": {
"psr-4": { "Symfony\\Component\\Messenger\\Bridge\\Amqp\\": "" },
"exclude-from-classmap": [
"/Tests/"
]
},
"minimum-stability": "dev"
}