feat(demo): add story 1 — Sorano: Rock and Time

This commit is contained in:
2026-06-20 21:19:57 +02:00
parent 42ed59a6b3
commit 8f87155c1d
5508 changed files with 1595740 additions and 124 deletions
@@ -0,0 +1,298 @@
<?php
/*
* This file is part of the Symfony package.
*
* (c) Fabien Potencier <fabien@symfony.com>
*
* For the full copyright and license information, please view the LICENSE
* file that was distributed with this source code.
*/
namespace Symfony\Component\Messenger\Command;
use Symfony\Component\Console\Command\Command;
use Symfony\Component\Console\Completion\CompletionInput;
use Symfony\Component\Console\Completion\CompletionSuggestions;
use Symfony\Component\Console\Helper\Dumper;
use Symfony\Component\Console\Question\ChoiceQuestion;
use Symfony\Component\Console\Style\SymfonyStyle;
use Symfony\Component\DependencyInjection\ServiceLocator;
use Symfony\Component\ErrorHandler\Exception\FlattenException;
use Symfony\Component\Messenger\Envelope;
use Symfony\Component\Messenger\Exception\InvalidArgumentException;
use Symfony\Component\Messenger\Stamp\ErrorDetailsStamp;
use Symfony\Component\Messenger\Stamp\RedeliveryStamp;
use Symfony\Component\Messenger\Stamp\SentToFailureTransportStamp;
use Symfony\Component\Messenger\Stamp\TransportMessageIdStamp;
use Symfony\Component\Messenger\Transport\Receiver\ListableReceiverInterface;
use Symfony\Component\Messenger\Transport\Receiver\MessageCountAwareInterface;
use Symfony\Component\Messenger\Transport\Receiver\ReceiverInterface;
use Symfony\Component\VarDumper\Caster\Caster;
use Symfony\Component\VarDumper\Caster\TraceStub;
use Symfony\Component\VarDumper\Cloner\ClonerInterface;
use Symfony\Component\VarDumper\Cloner\Stub;
use Symfony\Component\VarDumper\Cloner\VarCloner;
use Symfony\Contracts\Service\ServiceProviderInterface;
/**
* @author Ryan Weaver <ryan@symfonycasts.com>
*
* @internal
*/
abstract class AbstractFailedMessagesCommand extends Command
{
protected const DEFAULT_TRANSPORT_OPTION = 'choose';
protected $failureTransports;
private $globalFailureReceiverName;
/**
* @param ServiceProviderInterface $failureTransports
*/
public function __construct(?string $globalFailureReceiverName, $failureTransports)
{
$this->failureTransports = $failureTransports;
if (!$failureTransports instanceof ServiceProviderInterface) {
trigger_deprecation('symfony/messenger', '5.3', 'Passing a receiver as 2nd argument to "%s()" is deprecated, pass a service locator instead.', __METHOD__);
if (null === $globalFailureReceiverName) {
throw new InvalidArgumentException(sprintf('The argument "globalFailureReceiver" from method "%s()" must be not null if 2nd argument is not a ServiceLocator.', __METHOD__));
}
$this->failureTransports = new ServiceLocator([$globalFailureReceiverName => static function () use ($failureTransports) { return $failureTransports; }]);
}
$this->globalFailureReceiverName = $globalFailureReceiverName;
parent::__construct();
}
protected function getReceiverName(): string
{
trigger_deprecation('symfony/messenger', '5.3', 'The method "%s()" is deprecated, use getGlobalFailureReceiverName() instead.', __METHOD__);
return $this->globalFailureReceiverName;
}
protected function getGlobalFailureReceiverName(): ?string
{
return $this->globalFailureReceiverName;
}
/**
* @return mixed
*/
protected function getMessageId(Envelope $envelope)
{
/** @var TransportMessageIdStamp $stamp */
$stamp = $envelope->last(TransportMessageIdStamp::class);
return null !== $stamp ? $stamp->getId() : null;
}
protected function displaySingleMessage(Envelope $envelope, SymfonyStyle $io)
{
$io->title('Failed Message Details');
/** @var SentToFailureTransportStamp|null $sentToFailureTransportStamp */
$sentToFailureTransportStamp = $envelope->last(SentToFailureTransportStamp::class);
/** @var RedeliveryStamp|null $lastRedeliveryStamp */
$lastRedeliveryStamp = $envelope->last(RedeliveryStamp::class);
/** @var ErrorDetailsStamp|null $lastErrorDetailsStamp */
$lastErrorDetailsStamp = $envelope->last(ErrorDetailsStamp::class);
$lastRedeliveryStampWithException = $this->getLastRedeliveryStampWithException($envelope, true);
$rows = [
['Class', \get_class($envelope->getMessage())],
];
if (null !== $id = $this->getMessageId($envelope)) {
$rows[] = ['Message Id', $id];
}
if (null === $sentToFailureTransportStamp) {
$io->warning('Message does not appear to have been sent to this transport after failing');
} else {
$failedAt = '';
$errorMessage = '';
$errorCode = '';
$errorClass = '(unknown)';
if (null !== $lastRedeliveryStamp) {
$failedAt = $lastRedeliveryStamp->getRedeliveredAt()->format('Y-m-d H:i:s');
}
if (null !== $lastErrorDetailsStamp) {
$errorMessage = $lastErrorDetailsStamp->getExceptionMessage();
$errorCode = $lastErrorDetailsStamp->getExceptionCode();
$errorClass = $lastErrorDetailsStamp->getExceptionClass();
} elseif (null !== $lastRedeliveryStampWithException) {
// Try reading the errorMessage for messages that are still in the queue without the new ErrorDetailStamps.
$errorMessage = $lastRedeliveryStampWithException->getExceptionMessage();
if (null !== $lastRedeliveryStampWithException->getFlattenException()) {
$errorClass = $lastRedeliveryStampWithException->getFlattenException()->getClass();
}
}
$rows = array_merge($rows, [
['Failed at', $failedAt],
['Error', $errorMessage],
['Error Code', $errorCode],
['Error Class', $errorClass],
['Transport', $sentToFailureTransportStamp->getOriginalReceiverName()],
]);
}
$io->table([], $rows);
/** @var RedeliveryStamp[] $redeliveryStamps */
$redeliveryStamps = $envelope->all(RedeliveryStamp::class);
$io->writeln(' Message history:');
foreach ($redeliveryStamps as $redeliveryStamp) {
$io->writeln(sprintf(' * Message failed at <info>%s</info> and was redelivered', $redeliveryStamp->getRedeliveredAt()->format('Y-m-d H:i:s')));
}
$io->newLine();
if ($io->isVeryVerbose()) {
$io->title('Message:');
$dump = new Dumper($io, null, $this->createCloner());
$io->writeln($dump($envelope->getMessage()));
$io->title('Exception:');
$flattenException = null;
if (null !== $lastErrorDetailsStamp) {
$flattenException = $lastErrorDetailsStamp->getFlattenException();
} elseif (null !== $lastRedeliveryStampWithException) {
$flattenException = $lastRedeliveryStampWithException->getFlattenException();
}
$io->writeln(null === $flattenException ? '(no data)' : $dump($flattenException));
} else {
$io->writeln(' Re-run command with <info>-vv</info> to see more message & error details.');
}
}
protected function printPendingMessagesMessage(ReceiverInterface $receiver, SymfonyStyle $io)
{
if ($receiver instanceof MessageCountAwareInterface) {
if (1 === $receiver->getMessageCount()) {
$io->writeln('There is <comment>1</comment> message pending in the failure transport.');
} else {
$io->writeln(sprintf('There are <comment>%d</comment> messages pending in the failure transport.', $receiver->getMessageCount()));
}
}
}
/**
* @param string|null $name
*/
protected function getReceiver(/* ?string $name = null */): ReceiverInterface
{
if (1 > \func_num_args() && __CLASS__ !== static::class && __CLASS__ !== (new \ReflectionMethod($this, __FUNCTION__))->getDeclaringClass()->getName() && !$this instanceof \PHPUnit\Framework\MockObject\MockObject && !$this instanceof \Prophecy\Prophecy\ProphecySubjectInterface && !$this instanceof \Mockery\MockInterface) {
trigger_deprecation('symfony/messenger', '5.3', 'The "%s()" method will have a new "string $name" argument in version 6.0, not defining it is deprecated.', __METHOD__);
}
$name = \func_num_args() > 0 ? func_get_arg(0) : null;
if (null === $name = $name ?? $this->globalFailureReceiverName) {
throw new InvalidArgumentException(sprintf('No default failure transport is defined. Available transports are: "%s".', implode('", "', array_keys($this->failureTransports->getProvidedServices()))));
}
if (!$this->failureTransports->has($name)) {
throw new InvalidArgumentException(sprintf('The "%s" failure transport was not found. Available transports are: "%s".', $name, implode('", "', array_keys($this->failureTransports->getProvidedServices()))));
}
return $this->failureTransports->get($name);
}
protected function getLastRedeliveryStampWithException(Envelope $envelope): ?RedeliveryStamp
{
if (null === \func_get_args()[1]) {
trigger_deprecation('symfony/messenger', '5.2', sprintf('Using the "getLastRedeliveryStampWithException" method in the "%s" class is deprecated, use the "Envelope::last(%s)" instead.', self::class, ErrorDetailsStamp::class));
}
// Use ErrorDetailsStamp instead if it is available
if (null !== $envelope->last(ErrorDetailsStamp::class)) {
return null;
}
/** @var RedeliveryStamp $stamp */
foreach (array_reverse($envelope->all(RedeliveryStamp::class)) as $stamp) {
if (null !== $stamp->getExceptionMessage()) {
return $stamp;
}
}
return null;
}
private function createCloner(): ?ClonerInterface
{
if (!class_exists(VarCloner::class)) {
return null;
}
$cloner = new VarCloner();
$cloner->addCasters([FlattenException::class => function (FlattenException $flattenException, array $a, Stub $stub): array {
$stub->class = $flattenException->getClass();
return [
Caster::PREFIX_VIRTUAL.'message' => $flattenException->getMessage(),
Caster::PREFIX_VIRTUAL.'code' => $flattenException->getCode(),
Caster::PREFIX_VIRTUAL.'file' => $flattenException->getFile(),
Caster::PREFIX_VIRTUAL.'line' => $flattenException->getLine(),
Caster::PREFIX_VIRTUAL.'trace' => new TraceStub($flattenException->getTrace()),
];
}]);
return $cloner;
}
protected function printWarningAvailableFailureTransports(SymfonyStyle $io, ?string $failureTransportName): void
{
$failureTransports = array_keys($this->failureTransports->getProvidedServices());
$failureTransportsCount = \count($failureTransports);
if ($failureTransportsCount > 1) {
$io->writeln([
sprintf('> Loading messages from the <comment>global</comment> failure transport <comment>%s</comment>.', $failureTransportName),
'> To use a different failure transport, pass <comment>--transport=</comment>.',
sprintf('> Available failure transports are: <comment>%s</comment>', implode(', ', $failureTransports)),
"\n",
]);
}
}
protected function interactiveChooseFailureTransport(SymfonyStyle $io)
{
$failedTransports = array_keys($this->failureTransports->getProvidedServices());
$question = new ChoiceQuestion('Select failed transport:', $failedTransports, 0);
$question->setMultiselect(false);
return $io->askQuestion($question);
}
public function complete(CompletionInput $input, CompletionSuggestions $suggestions): void
{
if ($input->mustSuggestOptionValuesFor('transport')) {
$suggestions->suggestValues(array_keys($this->failureTransports->getProvidedServices()));
return;
}
if ($input->mustSuggestArgumentValuesFor('id')) {
$transport = $input->getOption('transport');
$transport = self::DEFAULT_TRANSPORT_OPTION === $transport ? $this->getGlobalFailureReceiverName() : $transport;
$receiver = $this->getReceiver($transport);
if (!$receiver instanceof ListableReceiverInterface) {
return;
}
$ids = [];
foreach ($receiver->all(50) as $envelope) {
$ids[] = $this->getMessageId($envelope);
}
$suggestions->suggestValues($ids);
return;
}
}
}
@@ -0,0 +1,276 @@
<?php
/*
* This file is part of the Symfony package.
*
* (c) Fabien Potencier <fabien@symfony.com>
*
* For the full copyright and license information, please view the LICENSE
* file that was distributed with this source code.
*/
namespace Symfony\Component\Messenger\Command;
use Psr\Container\ContainerInterface;
use Psr\Log\LoggerInterface;
use Symfony\Component\Console\Command\Command;
use Symfony\Component\Console\Completion\CompletionInput;
use Symfony\Component\Console\Completion\CompletionSuggestions;
use Symfony\Component\Console\Exception\InvalidOptionException;
use Symfony\Component\Console\Exception\RuntimeException;
use Symfony\Component\Console\Input\InputArgument;
use Symfony\Component\Console\Input\InputInterface;
use Symfony\Component\Console\Input\InputOption;
use Symfony\Component\Console\Output\ConsoleOutputInterface;
use Symfony\Component\Console\Output\OutputInterface;
use Symfony\Component\Console\Question\ChoiceQuestion;
use Symfony\Component\Console\Style\SymfonyStyle;
use Symfony\Component\EventDispatcher\EventDispatcherInterface;
use Symfony\Component\Messenger\EventListener\ResetServicesListener;
use Symfony\Component\Messenger\EventListener\StopWorkerOnFailureLimitListener;
use Symfony\Component\Messenger\EventListener\StopWorkerOnMemoryLimitListener;
use Symfony\Component\Messenger\EventListener\StopWorkerOnMessageLimitListener;
use Symfony\Component\Messenger\EventListener\StopWorkerOnTimeLimitListener;
use Symfony\Component\Messenger\RoutableMessageBus;
use Symfony\Component\Messenger\Worker;
/**
* @author Samuel Roze <samuel.roze@gmail.com>
*/
class ConsumeMessagesCommand extends Command
{
protected static $defaultName = 'messenger:consume';
protected static $defaultDescription = 'Consume messages';
private $routableBus;
private $receiverLocator;
private $eventDispatcher;
private $logger;
private $receiverNames;
private $resetServicesListener;
private $busIds;
public function __construct(RoutableMessageBus $routableBus, ContainerInterface $receiverLocator, EventDispatcherInterface $eventDispatcher, ?LoggerInterface $logger = null, array $receiverNames = [], ?ResetServicesListener $resetServicesListener = null, array $busIds = [])
{
$this->routableBus = $routableBus;
$this->receiverLocator = $receiverLocator;
$this->eventDispatcher = $eventDispatcher;
$this->logger = $logger;
$this->receiverNames = $receiverNames;
$this->resetServicesListener = $resetServicesListener;
$this->busIds = $busIds;
parent::__construct();
}
/**
* {@inheritdoc}
*/
protected function configure(): void
{
$defaultReceiverName = 1 === \count($this->receiverNames) ? current($this->receiverNames) : null;
$this
->setDefinition([
new InputArgument('receivers', InputArgument::IS_ARRAY, 'Names of the receivers/transports to consume in order of priority', $defaultReceiverName ? [$defaultReceiverName] : []),
new InputOption('limit', 'l', InputOption::VALUE_REQUIRED, 'Limit the number of received messages'),
new InputOption('failure-limit', 'f', InputOption::VALUE_REQUIRED, 'The number of failed messages the worker can consume'),
new InputOption('memory-limit', 'm', InputOption::VALUE_REQUIRED, 'The memory limit the worker can consume'),
new InputOption('time-limit', 't', InputOption::VALUE_REQUIRED, 'The time limit in seconds the worker can handle new messages'),
new InputOption('sleep', null, InputOption::VALUE_REQUIRED, 'Seconds to sleep before asking for new messages after no messages were found', 1),
new InputOption('bus', 'b', InputOption::VALUE_REQUIRED, 'Name of the bus to which received messages should be dispatched (if not passed, bus is determined automatically)'),
new InputOption('queues', null, InputOption::VALUE_REQUIRED | InputOption::VALUE_IS_ARRAY, 'Limit receivers to only consume from the specified queues'),
new InputOption('no-reset', null, InputOption::VALUE_NONE, 'Do not reset container services after each message'),
])
->setDescription(self::$defaultDescription)
->setHelp(<<<'EOF'
The <info>%command.name%</info> command consumes messages and dispatches them to the message bus.
<info>php %command.full_name% <receiver-name></info>
To receive from multiple transports, pass each name:
<info>php %command.full_name% receiver1 receiver2</info>
Use the --limit option to limit the number of messages received:
<info>php %command.full_name% <receiver-name> --limit=10</info>
Use the --failure-limit option to stop the worker when the given number of failed messages is reached:
<info>php %command.full_name% <receiver-name> --failure-limit=2</info>
Use the --memory-limit option to stop the worker if it exceeds a given memory usage limit. You can use shorthand byte values [K, M or G]:
<info>php %command.full_name% <receiver-name> --memory-limit=128M</info>
Use the --time-limit option to stop the worker when the given time limit (in seconds) is reached.
If a message is being handled, the worker will stop after the processing is finished:
<info>php %command.full_name% <receiver-name> --time-limit=3600</info>
Use the --bus option to specify the message bus to dispatch received messages
to instead of trying to determine it automatically. This is required if the
messages didn't originate from Messenger:
<info>php %command.full_name% <receiver-name> --bus=event_bus</info>
Use the --queues option to limit a receiver to only certain queues (only supported by some receivers):
<info>php %command.full_name% <receiver-name> --queues=fasttrack</info>
Use the --no-reset option to prevent services resetting after each message (may lead to leaking services' state between messages):
<info>php %command.full_name% <receiver-name> --no-reset</info>
EOF
)
;
}
/**
* {@inheritdoc}
*/
protected function interact(InputInterface $input, OutputInterface $output)
{
$io = new SymfonyStyle($input, $output instanceof ConsoleOutputInterface ? $output->getErrorOutput() : $output);
if ($this->receiverNames && !$input->getArgument('receivers')) {
$io->block('Which transports/receivers do you want to consume?', null, 'fg=white;bg=blue', ' ', true);
$io->writeln('Choose which receivers you want to consume messages from in order of priority.');
if (\count($this->receiverNames) > 1) {
$io->writeln(sprintf('Hint: to consume from multiple, use a list of their names, e.g. <comment>%s</comment>', implode(', ', $this->receiverNames)));
}
$question = new ChoiceQuestion('Select receivers to consume:', $this->receiverNames, 0);
$question->setMultiselect(true);
$input->setArgument('receivers', $io->askQuestion($question));
}
if (!$input->getArgument('receivers')) {
throw new RuntimeException('Please pass at least one receiver.');
}
}
/**
* {@inheritdoc}
*/
protected function execute(InputInterface $input, OutputInterface $output)
{
$receivers = [];
foreach ($receiverNames = $input->getArgument('receivers') as $receiverName) {
if (!$this->receiverLocator->has($receiverName)) {
$message = sprintf('The receiver "%s" does not exist.', $receiverName);
if ($this->receiverNames) {
$message .= sprintf(' Valid receivers are: %s.', implode(', ', $this->receiverNames));
}
throw new RuntimeException($message);
}
$receivers[$receiverName] = $this->receiverLocator->get($receiverName);
}
if (null !== $this->resetServicesListener && !$input->getOption('no-reset')) {
$this->eventDispatcher->addSubscriber($this->resetServicesListener);
}
$stopsWhen = [];
if (null !== $limit = $input->getOption('limit')) {
if (!is_numeric($limit) || 0 >= $limit) {
throw new InvalidOptionException(sprintf('Option "limit" must be a positive integer, "%s" passed.', $limit));
}
$stopsWhen[] = "processed {$limit} messages";
$this->eventDispatcher->addSubscriber(new StopWorkerOnMessageLimitListener($limit, $this->logger));
}
if ($failureLimit = $input->getOption('failure-limit')) {
$stopsWhen[] = "reached {$failureLimit} failed messages";
$this->eventDispatcher->addSubscriber(new StopWorkerOnFailureLimitListener($failureLimit, $this->logger));
}
if ($memoryLimit = $input->getOption('memory-limit')) {
$stopsWhen[] = "exceeded {$memoryLimit} of memory";
$this->eventDispatcher->addSubscriber(new StopWorkerOnMemoryLimitListener($this->convertToBytes($memoryLimit), $this->logger));
}
if (null !== $timeLimit = $input->getOption('time-limit')) {
if (!is_numeric($timeLimit) || 0 >= $timeLimit) {
throw new InvalidOptionException(sprintf('Option "time-limit" must be a positive integer, "%s" passed.', $timeLimit));
}
$stopsWhen[] = "been running for {$timeLimit}s";
$this->eventDispatcher->addSubscriber(new StopWorkerOnTimeLimitListener($timeLimit, $this->logger));
}
$stopsWhen[] = 'received a stop signal via the messenger:stop-workers command';
$io = new SymfonyStyle($input, $output instanceof ConsoleOutputInterface ? $output->getErrorOutput() : $output);
$io->success(sprintf('Consuming messages from transport%s "%s".', \count($receivers) > 1 ? 's' : '', implode(', ', $receiverNames)));
if ($stopsWhen) {
$last = array_pop($stopsWhen);
$stopsWhen = ($stopsWhen ? implode(', ', $stopsWhen).' or ' : '').$last;
$io->comment("The worker will automatically exit once it has {$stopsWhen}.");
}
$io->comment('Quit the worker with CONTROL-C.');
if (OutputInterface::VERBOSITY_VERBOSE > $output->getVerbosity()) {
$io->comment('Re-run the command with a -vv option to see logs about consumed messages.');
}
$bus = $input->getOption('bus') ? $this->routableBus->getMessageBus($input->getOption('bus')) : $this->routableBus;
$worker = new Worker($receivers, $bus, $this->eventDispatcher, $this->logger);
$options = [
'sleep' => $input->getOption('sleep') * 1000000,
];
if ($queues = $input->getOption('queues')) {
$options['queues'] = $queues;
}
$worker->run($options);
return 0;
}
public function complete(CompletionInput $input, CompletionSuggestions $suggestions): void
{
if ($input->mustSuggestArgumentValuesFor('receivers')) {
$suggestions->suggestValues(array_diff($this->receiverNames, array_diff($input->getArgument('receivers'), [$input->getCompletionValue()])));
return;
}
if ($input->mustSuggestOptionValuesFor('bus')) {
$suggestions->suggestValues($this->busIds);
}
}
private function convertToBytes(string $memoryLimit): int
{
$memoryLimit = strtolower($memoryLimit);
$max = ltrim($memoryLimit, '+');
if (str_starts_with($max, '0x')) {
$max = \intval($max, 16);
} elseif (str_starts_with($max, '0')) {
$max = \intval($max, 8);
} else {
$max = (int) $max;
}
switch (substr(rtrim($memoryLimit, 'b'), -1)) {
case 't': $max *= 1024;
// no break
case 'g': $max *= 1024;
// no break
case 'm': $max *= 1024;
// no break
case 'k': $max *= 1024;
}
return $max;
}
}
@@ -0,0 +1,150 @@
<?php
/*
* This file is part of the Symfony package.
*
* (c) Fabien Potencier <fabien@symfony.com>
*
* For the full copyright and license information, please view the LICENSE
* file that was distributed with this source code.
*/
namespace Symfony\Component\Messenger\Command;
use Symfony\Component\Console\Command\Command;
use Symfony\Component\Console\Completion\CompletionInput;
use Symfony\Component\Console\Completion\CompletionSuggestions;
use Symfony\Component\Console\Exception\RuntimeException;
use Symfony\Component\Console\Input\InputArgument;
use Symfony\Component\Console\Input\InputInterface;
use Symfony\Component\Console\Output\OutputInterface;
use Symfony\Component\Console\Style\SymfonyStyle;
/**
* A console command to debug Messenger information.
*
* @author Roland Franssen <franssen.roland@gmail.com>
*/
class DebugCommand extends Command
{
protected static $defaultName = 'debug:messenger';
protected static $defaultDescription = 'List messages you can dispatch using the message buses';
private $mapping;
public function __construct(array $mapping)
{
$this->mapping = $mapping;
parent::__construct();
}
/**
* {@inheritdoc}
*/
protected function configure()
{
$this
->addArgument('bus', InputArgument::OPTIONAL, sprintf('The bus id (one of "%s")', implode('", "', array_keys($this->mapping))))
->setDescription(self::$defaultDescription)
->setHelp(<<<'EOF'
The <info>%command.name%</info> command displays all messages that can be
dispatched using the message buses:
<info>php %command.full_name%</info>
Or for a specific bus only:
<info>php %command.full_name% command_bus</info>
EOF
)
;
}
/**
* {@inheritdoc}
*/
protected function execute(InputInterface $input, OutputInterface $output)
{
$io = new SymfonyStyle($input, $output);
$io->title('Messenger');
$mapping = $this->mapping;
if ($bus = $input->getArgument('bus')) {
if (!isset($mapping[$bus])) {
throw new RuntimeException(sprintf('Bus "%s" does not exist. Known buses are "%s".', $bus, implode('", "', array_keys($this->mapping))));
}
$mapping = [$bus => $mapping[$bus]];
}
foreach ($mapping as $bus => $handlersByMessage) {
$io->section($bus);
$tableRows = [];
foreach ($handlersByMessage as $message => $handlers) {
if ($description = self::getClassDescription($message)) {
$tableRows[] = [sprintf('<comment>%s</>', $description)];
}
$tableRows[] = [sprintf('<fg=cyan>%s</fg=cyan>', $message)];
foreach ($handlers as $handler) {
$tableRows[] = [
sprintf(' handled by <info>%s</>', $handler[0]).$this->formatConditions($handler[1]),
];
if ($handlerDescription = self::getClassDescription($handler[0])) {
$tableRows[] = [sprintf(' <comment>%s</>', $handlerDescription)];
}
}
$tableRows[] = [''];
}
if ($tableRows) {
$io->text('The following messages can be dispatched:');
$io->newLine();
$io->table([], $tableRows);
} else {
$io->warning(sprintf('No handled message found in bus "%s".', $bus));
}
}
return 0;
}
private function formatConditions(array $options): string
{
if (!$options) {
return '';
}
$optionsMapping = [];
foreach ($options as $key => $value) {
$optionsMapping[] = $key.'='.$value;
}
return ' (when '.implode(', ', $optionsMapping).')';
}
private static function getClassDescription(string $class): string
{
try {
$r = new \ReflectionClass($class);
if ($docComment = $r->getDocComment()) {
$docComment = preg_split('#\n\s*\*\s*[\n@]#', substr($docComment, 3, -2), 2)[0];
return trim(preg_replace('#\s*\n\s*\*\s*#', ' ', $docComment));
}
} catch (\ReflectionException $e) {
}
return '';
}
public function complete(CompletionInput $input, CompletionSuggestions $suggestions): void
{
if ($input->mustSuggestArgumentValuesFor('bus')) {
$suggestions->suggestValues(array_keys($this->mapping));
}
}
}
@@ -0,0 +1,104 @@
<?php
/*
* This file is part of the Symfony package.
*
* (c) Fabien Potencier <fabien@symfony.com>
*
* For the full copyright and license information, please view the LICENSE
* file that was distributed with this source code.
*/
namespace Symfony\Component\Messenger\Command;
use Symfony\Component\Console\Exception\RuntimeException;
use Symfony\Component\Console\Input\InputArgument;
use Symfony\Component\Console\Input\InputInterface;
use Symfony\Component\Console\Input\InputOption;
use Symfony\Component\Console\Output\ConsoleOutputInterface;
use Symfony\Component\Console\Output\OutputInterface;
use Symfony\Component\Console\Style\SymfonyStyle;
use Symfony\Component\Messenger\Transport\Receiver\ListableReceiverInterface;
use Symfony\Component\Messenger\Transport\Receiver\ReceiverInterface;
/**
* @author Ryan Weaver <ryan@symfonycasts.com>
*/
class FailedMessagesRemoveCommand extends AbstractFailedMessagesCommand
{
protected static $defaultName = 'messenger:failed:remove';
protected static $defaultDescription = 'Remove given messages from the failure transport';
/**
* {@inheritdoc}
*/
protected function configure(): void
{
$this
->setDefinition([
new InputArgument('id', InputArgument::REQUIRED | InputArgument::IS_ARRAY, 'Specific message id(s) to remove'),
new InputOption('force', null, InputOption::VALUE_NONE, 'Force the operation without confirmation'),
new InputOption('transport', null, InputOption::VALUE_OPTIONAL, 'Use a specific failure transport', self::DEFAULT_TRANSPORT_OPTION),
new InputOption('show-messages', null, InputOption::VALUE_NONE, 'Display messages before removing it (if multiple ids are given)'),
])
->setDescription(self::$defaultDescription)
->setHelp(<<<'EOF'
The <info>%command.name%</info> removes given messages that are pending in the failure transport.
<info>php %command.full_name% {id1} [{id2} ...]</info>
The specific ids can be found via the messenger:failed:show command.
EOF
)
;
}
/**
* {@inheritdoc}
*/
protected function execute(InputInterface $input, OutputInterface $output)
{
$io = new SymfonyStyle($input, $output instanceof ConsoleOutputInterface ? $output->getErrorOutput() : $output);
$failureTransportName = $input->getOption('transport');
if (self::DEFAULT_TRANSPORT_OPTION === $failureTransportName) {
$failureTransportName = $this->getGlobalFailureReceiverName();
}
$receiver = $this->getReceiver($failureTransportName);
$shouldForce = $input->getOption('force');
$ids = (array) $input->getArgument('id');
$shouldDisplayMessages = $input->getOption('show-messages') || 1 === \count($ids);
$this->removeMessages($failureTransportName, $ids, $receiver, $io, $shouldForce, $shouldDisplayMessages);
return 0;
}
private function removeMessages(string $failureTransportName, array $ids, ReceiverInterface $receiver, SymfonyStyle $io, bool $shouldForce, bool $shouldDisplayMessages): void
{
if (!$receiver instanceof ListableReceiverInterface) {
throw new RuntimeException(sprintf('The "%s" receiver does not support removing specific messages.', $failureTransportName));
}
foreach ($ids as $id) {
$envelope = $receiver->find($id);
if (null === $envelope) {
$io->error(sprintf('The message with id "%s" was not found.', $id));
continue;
}
if ($shouldDisplayMessages) {
$this->displaySingleMessage($envelope, $io);
}
if ($shouldForce || $io->confirm('Do you want to permanently remove this message?', false)) {
$receiver->reject($envelope);
$io->success(sprintf('Message with id %s removed.', $id));
} else {
$io->note(sprintf('Message with id %s not removed.', $id));
}
}
}
}
@@ -0,0 +1,226 @@
<?php
/*
* This file is part of the Symfony package.
*
* (c) Fabien Potencier <fabien@symfony.com>
*
* For the full copyright and license information, please view the LICENSE
* file that was distributed with this source code.
*/
namespace Symfony\Component\Messenger\Command;
use Psr\Log\LoggerInterface;
use Symfony\Component\Console\Exception\RuntimeException;
use Symfony\Component\Console\Input\InputArgument;
use Symfony\Component\Console\Input\InputInterface;
use Symfony\Component\Console\Input\InputOption;
use Symfony\Component\Console\Output\ConsoleOutputInterface;
use Symfony\Component\Console\Output\OutputInterface;
use Symfony\Component\Console\Style\SymfonyStyle;
use Symfony\Component\EventDispatcher\EventDispatcherInterface;
use Symfony\Component\Messenger\Event\WorkerMessageReceivedEvent;
use Symfony\Component\Messenger\EventListener\StopWorkerOnMessageLimitListener;
use Symfony\Component\Messenger\Exception\LogicException;
use Symfony\Component\Messenger\MessageBusInterface;
use Symfony\Component\Messenger\Transport\Receiver\ListableReceiverInterface;
use Symfony\Component\Messenger\Transport\Receiver\ReceiverInterface;
use Symfony\Component\Messenger\Transport\Receiver\SingleMessageReceiver;
use Symfony\Component\Messenger\Worker;
/**
* @author Ryan Weaver <ryan@symfonycasts.com>
*/
class FailedMessagesRetryCommand extends AbstractFailedMessagesCommand
{
protected static $defaultName = 'messenger:failed:retry';
protected static $defaultDescription = 'Retry one or more messages from the failure transport';
private $eventDispatcher;
private $messageBus;
private $logger;
public function __construct(?string $globalReceiverName, $failureTransports, MessageBusInterface $messageBus, EventDispatcherInterface $eventDispatcher, ?LoggerInterface $logger = null)
{
$this->eventDispatcher = $eventDispatcher;
$this->messageBus = $messageBus;
$this->logger = $logger;
parent::__construct($globalReceiverName, $failureTransports);
}
/**
* {@inheritdoc}
*/
protected function configure(): void
{
$this
->setDefinition([
new InputArgument('id', InputArgument::IS_ARRAY, 'Specific message id(s) to retry'),
new InputOption('force', null, InputOption::VALUE_NONE, 'Force action without confirmation'),
new InputOption('transport', null, InputOption::VALUE_OPTIONAL, 'Use a specific failure transport', self::DEFAULT_TRANSPORT_OPTION),
])
->setDescription(self::$defaultDescription)
->setHelp(<<<'EOF'
The <info>%command.name%</info> retries message in the failure transport.
<info>php %command.full_name%</info>
The command will interactively ask if each message should be retried
or discarded.
Some transports support retrying a specific message id, which comes
from the <info>messenger:failed:show</info> command.
<info>php %command.full_name% {id}</info>
Or pass multiple ids at once to process multiple messages:
<info>php %command.full_name% {id1} {id2} {id3}</info>
EOF
)
;
}
/**
* {@inheritdoc}
*/
protected function execute(InputInterface $input, OutputInterface $output)
{
$this->eventDispatcher->addSubscriber(new StopWorkerOnMessageLimitListener(1));
$io = new SymfonyStyle($input, $output instanceof ConsoleOutputInterface ? $output->getErrorOutput() : $output);
$io->comment('Quit this command with CONTROL-C.');
if (!$output->isVeryVerbose()) {
$io->comment('Re-run the command with a -vv option to see logs about consumed messages.');
}
$failureTransportName = $input->getOption('transport');
if (self::DEFAULT_TRANSPORT_OPTION === $failureTransportName) {
$this->printWarningAvailableFailureTransports($io, $this->getGlobalFailureReceiverName());
}
if ('' === $failureTransportName || null === $failureTransportName) {
$failureTransportName = $this->interactiveChooseFailureTransport($io);
}
$failureTransportName = self::DEFAULT_TRANSPORT_OPTION === $failureTransportName ? $this->getGlobalFailureReceiverName() : $failureTransportName;
$receiver = $this->getReceiver($failureTransportName);
$this->printPendingMessagesMessage($receiver, $io);
$io->writeln(sprintf('To retry all the messages, run <comment>messenger:consume %s</comment>', $failureTransportName));
$shouldForce = $input->getOption('force');
$ids = $input->getArgument('id');
if (0 === \count($ids)) {
if (!$input->isInteractive()) {
throw new RuntimeException('Message id must be passed when in non-interactive mode.');
}
$this->runInteractive($failureTransportName, $io, $shouldForce);
return 0;
}
$this->retrySpecificIds($failureTransportName, $ids, $io, $shouldForce);
$io->success('All done!');
return 0;
}
private function runInteractive(string $failureTransportName, SymfonyStyle $io, bool $shouldForce)
{
$receiver = $this->failureTransports->get($failureTransportName);
$count = 0;
if ($receiver instanceof ListableReceiverInterface) {
// for listable receivers, find the messages one-by-one
// this avoids using get(), which for some less-robust
// transports (like Doctrine), will cause the message
// to be temporarily "acked", even if the user aborts
// handling the message
while (true) {
$ids = [];
foreach ($receiver->all(1) as $envelope) {
++$count;
$id = $this->getMessageId($envelope);
if (null === $id) {
throw new LogicException(sprintf('The "%s" receiver is able to list messages by id but the envelope is missing the TransportMessageIdStamp stamp.', $failureTransportName));
}
$ids[] = $id;
}
// break the loop if all messages are consumed
if (0 === \count($ids)) {
break;
}
$this->retrySpecificIds($failureTransportName, $ids, $io, $shouldForce);
}
} else {
// get() and ask messages one-by-one
$count = $this->runWorker($failureTransportName, $receiver, $io, $shouldForce);
}
// avoid success message if nothing was processed
if (1 <= $count) {
$io->success('All failed messages have been handled or removed!');
}
}
private function runWorker(string $failureTransportName, ReceiverInterface $receiver, SymfonyStyle $io, bool $shouldForce): int
{
$count = 0;
$listener = function (WorkerMessageReceivedEvent $messageReceivedEvent) use ($io, $receiver, $shouldForce, &$count) {
++$count;
$envelope = $messageReceivedEvent->getEnvelope();
$this->displaySingleMessage($envelope, $io);
$shouldHandle = $shouldForce || $io->confirm('Do you want to retry (yes) or delete this message (no)?');
if ($shouldHandle) {
return;
}
$messageReceivedEvent->shouldHandle(false);
$receiver->reject($envelope);
};
$this->eventDispatcher->addListener(WorkerMessageReceivedEvent::class, $listener);
$worker = new Worker(
[$failureTransportName => $receiver],
$this->messageBus,
$this->eventDispatcher,
$this->logger
);
try {
$worker->run();
} finally {
$this->eventDispatcher->removeListener(WorkerMessageReceivedEvent::class, $listener);
}
return $count;
}
private function retrySpecificIds(string $failureTransportName, array $ids, SymfonyStyle $io, bool $shouldForce)
{
$receiver = $this->getReceiver($failureTransportName);
if (!$receiver instanceof ListableReceiverInterface) {
throw new RuntimeException(sprintf('The "%s" receiver does not support retrying messages by id.', $failureTransportName));
}
foreach ($ids as $id) {
$envelope = $receiver->find($id);
if (null === $envelope) {
throw new RuntimeException(sprintf('The message "%s" was not found.', $id));
}
$singleReceiver = new SingleMessageReceiver($receiver, $envelope);
$this->runWorker($failureTransportName, $singleReceiver, $io, $shouldForce);
}
}
}
@@ -0,0 +1,153 @@
<?php
/*
* This file is part of the Symfony package.
*
* (c) Fabien Potencier <fabien@symfony.com>
*
* For the full copyright and license information, please view the LICENSE
* file that was distributed with this source code.
*/
namespace Symfony\Component\Messenger\Command;
use Symfony\Component\Console\Exception\RuntimeException;
use Symfony\Component\Console\Input\InputArgument;
use Symfony\Component\Console\Input\InputInterface;
use Symfony\Component\Console\Input\InputOption;
use Symfony\Component\Console\Output\ConsoleOutputInterface;
use Symfony\Component\Console\Output\OutputInterface;
use Symfony\Component\Console\Style\SymfonyStyle;
use Symfony\Component\Messenger\Stamp\ErrorDetailsStamp;
use Symfony\Component\Messenger\Stamp\RedeliveryStamp;
use Symfony\Component\Messenger\Transport\Receiver\ListableReceiverInterface;
/**
* @author Ryan Weaver <ryan@symfonycasts.com>
*/
class FailedMessagesShowCommand extends AbstractFailedMessagesCommand
{
protected static $defaultName = 'messenger:failed:show';
protected static $defaultDescription = 'Show one or more messages from the failure transport';
/**
* {@inheritdoc}
*/
protected function configure(): void
{
$this
->setDefinition([
new InputArgument('id', InputArgument::OPTIONAL, 'Specific message id to show'),
new InputOption('max', null, InputOption::VALUE_REQUIRED, 'Maximum number of messages to list', 50),
new InputOption('transport', null, InputOption::VALUE_OPTIONAL, 'Use a specific failure transport', self::DEFAULT_TRANSPORT_OPTION),
])
->setDescription(self::$defaultDescription)
->setHelp(<<<'EOF'
The <info>%command.name%</info> shows message that are pending in the failure transport.
<info>php %command.full_name%</info>
Or look at a specific message by its id:
<info>php %command.full_name% {id}</info>
EOF
)
;
}
/**
* {@inheritdoc}
*/
protected function execute(InputInterface $input, OutputInterface $output)
{
$io = new SymfonyStyle($input, $output instanceof ConsoleOutputInterface ? $output->getErrorOutput() : $output);
$failureTransportName = $input->getOption('transport');
if (self::DEFAULT_TRANSPORT_OPTION === $failureTransportName) {
$this->printWarningAvailableFailureTransports($io, $this->getGlobalFailureReceiverName());
}
if ('' === $failureTransportName || null === $failureTransportName) {
$failureTransportName = $this->interactiveChooseFailureTransport($io);
}
$failureTransportName = self::DEFAULT_TRANSPORT_OPTION === $failureTransportName ? $this->getGlobalFailureReceiverName() : $failureTransportName;
$receiver = $this->getReceiver($failureTransportName);
$this->printPendingMessagesMessage($receiver, $io);
if (!$receiver instanceof ListableReceiverInterface) {
throw new RuntimeException(sprintf('The "%s" receiver does not support listing or showing specific messages.', $failureTransportName));
}
if (null === $id = $input->getArgument('id')) {
$this->listMessages($failureTransportName, $io, $input->getOption('max'));
} else {
$this->showMessage($failureTransportName, $id, $io);
}
return 0;
}
private function listMessages(?string $failedTransportName, SymfonyStyle $io, int $max)
{
/** @var ListableReceiverInterface $receiver */
$receiver = $this->getReceiver($failedTransportName);
$envelopes = $receiver->all($max);
$rows = [];
foreach ($envelopes as $envelope) {
/** @var RedeliveryStamp|null $lastRedeliveryStamp */
$lastRedeliveryStamp = $envelope->last(RedeliveryStamp::class);
/** @var ErrorDetailsStamp|null $lastErrorDetailsStamp */
$lastErrorDetailsStamp = $envelope->last(ErrorDetailsStamp::class);
$lastRedeliveryStampWithException = $this->getLastRedeliveryStampWithException($envelope, true);
$errorMessage = '';
if (null !== $lastErrorDetailsStamp) {
$errorMessage = $lastErrorDetailsStamp->getExceptionMessage();
} elseif (null !== $lastRedeliveryStampWithException) {
// Try reading the errorMessage for messages that are still in the queue without the new ErrorDetailStamps.
$errorMessage = $lastRedeliveryStampWithException->getExceptionMessage();
}
$rows[] = [
$this->getMessageId($envelope),
\get_class($envelope->getMessage()),
null === $lastRedeliveryStamp ? '' : $lastRedeliveryStamp->getRedeliveredAt()->format('Y-m-d H:i:s'),
$errorMessage,
];
}
if (0 === \count($rows)) {
$io->success('No failed messages were found.');
return;
}
$io->table(['Id', 'Class', 'Failed at', 'Error'], $rows);
if (\count($rows) === $max) {
$io->comment(sprintf('Showing first %d messages.', $max));
}
$io->comment(sprintf('Run <comment>messenger:failed:show {id} --transport=%s -vv</comment> to see message details.', $failedTransportName));
}
private function showMessage(?string $failedTransportName, string $id, SymfonyStyle $io)
{
/** @var ListableReceiverInterface $receiver */
$receiver = $this->getReceiver($failedTransportName);
$envelope = $receiver->find($id);
if (null === $envelope) {
throw new RuntimeException(sprintf('The message "%s" was not found.', $id));
}
$this->displaySingleMessage($envelope, $io);
$io->writeln([
'',
sprintf(' Run <comment>messenger:failed:retry %s --transport=%s</comment> to retry this message.', $id, $failedTransportName),
sprintf(' Run <comment>messenger:failed:remove %s --transport=%s</comment> to delete it.', $id, $failedTransportName),
]);
}
}
@@ -0,0 +1,95 @@
<?php
/*
* This file is part of the Symfony package.
*
* (c) Fabien Potencier <fabien@symfony.com>
*
* For the full copyright and license information, please view the LICENSE
* file that was distributed with this source code.
*/
namespace Symfony\Component\Messenger\Command;
use Psr\Container\ContainerInterface;
use Symfony\Component\Console\Command\Command;
use Symfony\Component\Console\Completion\CompletionInput;
use Symfony\Component\Console\Completion\CompletionSuggestions;
use Symfony\Component\Console\Input\InputArgument;
use Symfony\Component\Console\Input\InputInterface;
use Symfony\Component\Console\Output\OutputInterface;
use Symfony\Component\Console\Style\SymfonyStyle;
use Symfony\Component\Messenger\Transport\SetupableTransportInterface;
/**
* @author Vincent Touzet <vincent.touzet@gmail.com>
*/
class SetupTransportsCommand extends Command
{
protected static $defaultName = 'messenger:setup-transports';
protected static $defaultDescription = 'Prepare the required infrastructure for the transport';
private $transportLocator;
private $transportNames;
public function __construct(ContainerInterface $transportLocator, array $transportNames = [])
{
$this->transportLocator = $transportLocator;
$this->transportNames = $transportNames;
parent::__construct();
}
protected function configure()
{
$this
->addArgument('transport', InputArgument::OPTIONAL, 'Name of the transport to setup', null)
->setDescription(self::$defaultDescription)
->setHelp(<<<EOF
The <info>%command.name%</info> command setups the transports:
<info>php %command.full_name%</info>
Or a specific transport only:
<info>php %command.full_name% <transport></info>
EOF
)
;
}
protected function execute(InputInterface $input, OutputInterface $output)
{
$io = new SymfonyStyle($input, $output);
$transportNames = $this->transportNames;
// do we want to set up only one transport?
if ($transport = $input->getArgument('transport')) {
if (!$this->transportLocator->has($transport)) {
throw new \RuntimeException(sprintf('The "%s" transport does not exist.', $transport));
}
$transportNames = [$transport];
}
foreach ($transportNames as $id => $transportName) {
$transport = $this->transportLocator->get($transportName);
if ($transport instanceof SetupableTransportInterface) {
$transport->setup();
$io->success(sprintf('The "%s" transport was set up successfully.', $transportName));
} else {
$io->note(sprintf('The "%s" transport does not support setup.', $transportName));
}
}
return 0;
}
public function complete(CompletionInput $input, CompletionSuggestions $suggestions): void
{
if ($input->mustSuggestArgumentValuesFor('transport')) {
$suggestions->suggestValues($this->transportNames);
return;
}
}
}
@@ -0,0 +1,75 @@
<?php
/*
* This file is part of the Symfony package.
*
* (c) Fabien Potencier <fabien@symfony.com>
*
* For the full copyright and license information, please view the LICENSE
* file that was distributed with this source code.
*/
namespace Symfony\Component\Messenger\Command;
use Psr\Cache\CacheItemPoolInterface;
use Symfony\Component\Console\Command\Command;
use Symfony\Component\Console\Input\InputInterface;
use Symfony\Component\Console\Output\ConsoleOutputInterface;
use Symfony\Component\Console\Output\OutputInterface;
use Symfony\Component\Console\Style\SymfonyStyle;
use Symfony\Component\Messenger\EventListener\StopWorkerOnRestartSignalListener;
/**
* @author Ryan Weaver <ryan@symfonycasts.com>
*/
class StopWorkersCommand extends Command
{
protected static $defaultName = 'messenger:stop-workers';
protected static $defaultDescription = 'Stop workers after their current message';
private $restartSignalCachePool;
public function __construct(CacheItemPoolInterface $restartSignalCachePool)
{
$this->restartSignalCachePool = $restartSignalCachePool;
parent::__construct();
}
/**
* {@inheritdoc}
*/
protected function configure(): void
{
$this
->setDefinition([])
->setDescription(self::$defaultDescription)
->setHelp(<<<'EOF'
The <info>%command.name%</info> command sends a signal to stop any <info>messenger:consume</info> processes that are running.
<info>php %command.full_name%</info>
Each worker command will finish the message they are currently processing
and then exit. Worker commands are *not* automatically restarted: that
should be handled by a process control system.
EOF
)
;
}
/**
* {@inheritdoc}
*/
protected function execute(InputInterface $input, OutputInterface $output)
{
$io = new SymfonyStyle($input, $output instanceof ConsoleOutputInterface ? $output->getErrorOutput() : $output);
$cacheItem = $this->restartSignalCachePool->getItem(StopWorkerOnRestartSignalListener::RESTART_REQUESTED_TIMESTAMP_KEY);
$cacheItem->set(microtime(true));
$this->restartSignalCachePool->save($cacheItem);
$io->success('Signal successfully sent to stop any running workers.');
return 0;
}
}