Added the ability to add/remove item from push queue.

This commit is contained in:
Abdulmhsen B. A. A
2022-06-06 02:20:35 +03:00
parent d11ffbf424
commit 514dd3565a
2 changed files with 35 additions and 3 deletions

View File

@@ -13,6 +13,7 @@ use Psr\SimpleCache\InvalidArgumentException;
use Symfony\Component\Console\Helper\Table;
use Symfony\Component\Console\Helper\TableSeparator;
use Symfony\Component\Console\Input\InputInterface;
use Symfony\Component\Console\Input\InputOption;
use Symfony\Component\Console\Output\OutputInterface;
class QueueCommand extends Command
@@ -30,6 +31,8 @@ class QueueCommand extends Command
protected function configure(): void
{
$this->setName('db:queue')
->addOption('add', 'a', InputOption::VALUE_REQUIRED, 'Add record id to push queue.')
->addOption('remove', 'r', InputOption::VALUE_REQUIRED, 'Remove record id from push queue.')
->setDescription('Show webhook queued events.');
}
@@ -41,6 +44,17 @@ class QueueCommand extends Command
*/
protected function runCommand(InputInterface $input, OutputInterface $output): int
{
if (null !== ($id = $input->getOption('add'))) {
$item = Container::get(StateInterface::class)::fromArray(['id' => $id]);
if (null === ($item = $this->storage->get($item))) {
$output->writeln(sprintf('<error>Record id \'%d\' does not exists.</error>', $id));
return self::FAILURE;
}
queuePush($item);
}
if (!$this->cache->has('queue')) {
$output->writeln('<info>No items in the queue.</info>');
return self::SUCCESS;
@@ -48,7 +62,21 @@ class QueueCommand extends Command
$entities = $items = [];
foreach ($this->cache->get('queue', []) as $item) {
$queue = $this->cache->get('queue', []);
if (null !== ($id = $input->getOption('remove'))) {
if (!array_key_exists($id, $queue)) {
$output->writeln(sprintf('<error>Record id \'%d\' does not exists in the queue.</error>', $id));
return self::FAILURE;
}
unset($queue[$id]);
$item = Container::get(StateInterface::class)::fromArray(['id' => $id]);
queuePush($item, remove: true);
}
foreach ($queue as $item) {
$items[] = Container::get(StateInterface::class)::fromArray($item);
}

View File

@@ -321,7 +321,7 @@ if (!function_exists('httpClientChunks')) {
}
if (!function_exists('queuePush')) {
function queuePush(StateInterface $entity): void
function queuePush(StateInterface $entity, bool $remove = false): void
{
if (!$entity->hasGuids() && !$entity->hasRelativeGuid()) {
return;
@@ -332,7 +332,11 @@ if (!function_exists('queuePush')) {
$list = $cache->get('queue', []);
$list[$entity->id] = ['id' => $entity->id];
if (true === $remove && array_key_exists($entity->id, $list)) {
unset($list[$entity->id]);
} else {
$list[$entity->id] = ['id' => $entity->id];
}
$cache->set('queue', $list, new DateInterval('P7D'));
} catch (\Psr\SimpleCache\InvalidArgumentException $e) {