Merge pull request #538 from arabcoders/dev

Migrated tasks to use the new events system for queue.
This commit is contained in:
Abdulmohsen
2024-08-20 21:22:03 +03:00
committed by GitHub
11 changed files with 255 additions and 155 deletions

32
FAQ.md
View File

@@ -913,25 +913,37 @@ is an example of how to do it for debian based systems.
```yaml ```yaml
services: services:
watchstate: watchstate:
image: ghcr.io/arabcoders/watchstate:latest container_name: watchstate
# To change the user/group id associated with the tool change the following line. image: ghcr.io/arabcoders/watchstate:latest # The image to use. you can use the latest or dev tag.
user: "${UID:-1000}:${GID:-1000}" user: "${UID:-1000}:${GID:-1000}" # user and group id to run the container under.
group_add: group_add:
- "44" # Add video group to the container. - "44" # Add video group to the container.
- "110" # Add render group to the container. - "105" # Add render group to the container.
container_name: watchstate
restart: unless-stopped restart: unless-stopped
ports: ports:
- "8080:8080" # The port which will serve WebUI + API + Webhooks - "8080:8080" # The port which will serve WebUI + API + Webhooks
devices:
- /dev/dri:/dev/dri # mount the dri devices to the container.
volumes: volumes:
- ./data:/config:rw # mount current directory to container /config directory. - ./data:/config:rw # mount current directory to container /config directory.
- /dev/dri:/dev/dri # mount the dri devices to the container.
- /storage/media:/media:ro # mount your media directory to the container. - /storage/media:/media:ro # mount your media directory to the container.
``` ```
This setup should work for VAAPI encoding in `x86_64` containers, for other architectures you need to adjust the This setup should work for VAAPI encoding in `x86_64` containers, There are currently an issue with nvidia h264_nvenc
`/dev/dri` to match your hardware. There are currently an issue with nvidia h264_nvenc encoding, the alpine build for encoding, the alpine build for`ffmpeg` doesn't include the codec. i am looking for a way include the codec without
`ffmpeg`doesn't include the codec. ballooning the image size by 600MB+. If you have a solution please let me know.
Please know that your `video`, `render` group id might be different then mine, you can run the follow command in docker
host server to get the group ids for both groups.
```bash
$ cat /etc/group | grep -E 'render|video'
video:x:44:your_docker_username
render:x:105:your_docker_username
```
In my docker host the group id for `video` is `44` and for `render` is `105`. change what needed in the `compose.yaml`
file to match your setup.
Note: the tip about adding the group_add came from the user `binarypancakes` in discord. Note: the tip about adding the group_add came from the user `binarypancakes` in discord.

View File

@@ -299,6 +299,7 @@ return (function () {
'enabled' => true, 'enabled' => true,
'timer' => '* * * * *', 'timer' => '* * * * *',
'args' => '-v', 'args' => '-v',
'hide' => true,
], ],
], ],
]; ];

View File

@@ -88,6 +88,9 @@ fi
echo "[$(date +"%Y-%m-%dT%H:%M:%S%z")] Caching tool routes." echo "[$(date +"%Y-%m-%dT%H:%M:%S%z")] Caching tool routes."
/opt/bin/console system:routes /opt/bin/console system:routes
echo "[$(date +"%Y-%m-%dT%H:%M:%S%z")] Caching events listeners."
/opt/bin/console events:cache
echo "[$(date +"%Y-%m-%dT%H:%M:%S%z")] Running database migrations." echo "[$(date +"%Y-%m-%dT%H:%M:%S%z")] Running database migrations."
/opt/bin/console system:db:migrations /opt/bin/console system:db:migrations

View File

@@ -85,7 +85,12 @@
</time> </time>
</span> </span>
<span class="card-footer-item"> <span class="card-footer-item">
<span v-if="!item.updated_at" class="icon"><i class="fas fa-spinner fa-spin"></i></span> <template v-if="!item.updated_at">
<span v-if="0 === item.status" class="icon">
<i class="fas fa-spinner fa-spin"></i>
</span>
<span v-else>None</span>
</template>
<template v-else> <template v-else>
<span class="icon"><i class="fas fa-calendar-alt"></i></span> <span class="icon"><i class="fas fa-calendar-alt"></i></span>
<time class="has-tooltip" v-tooltip="`Updated at: ${moment(item.updated_at)}`"> <time class="has-tooltip" v-tooltip="`Updated at: ${moment(item.updated_at)}`">

View File

@@ -122,7 +122,7 @@
<span class="icon"><i class="fas fa-clock" :class="{ 'fa-spin': task.queued }"></i></span> <span class="icon"><i class="fas fa-clock" :class="{ 'fa-spin': task.queued }"></i></span>
<span> <span>
<template v-if="!task.queued">Queue Task</template> <template v-if="!task.queued">Queue Task</template>
<template v-else>Remove from queue</template> <template v-else>Cancel Task</template>
</span> </span>
</span> </span>
</button> </button>
@@ -239,7 +239,7 @@ const queueTask = async task => {
try { try {
const response = await request(`/tasks/${task.name}/queue`, {method: is_queued ? 'DELETE' : 'POST'}) const response = await request(`/tasks/${task.name}/queue`, {method: is_queued ? 'DELETE' : 'POST'})
if (response.ok) { if (response.ok) {
notification('success', 'Success', `Task '${task.name}' has been ${is_queued ? 'removed from the queue' : 'queued'}.`) notification('success', 'Success', `Task '${task.name}' has been ${is_queued ? 'cancelled' : 'queued'}.`)
task.queued = !is_queued task.queued = !is_queued
if (task.queued) { if (task.queued) {
queued.value.push(task.name) queued.value.push(task.name)

View File

@@ -2,24 +2,26 @@
declare(strict_types=1); declare(strict_types=1);
namespace App\API\Tasks; namespace App\API;
use App\Commands\System\TasksCommand; use App\Commands\System\TasksCommand;
use App\Libs\Attributes\Route\Get; use App\Libs\Attributes\Route\Get;
use App\Libs\Attributes\Route\Route; use App\Libs\Attributes\Route\Route;
use App\Libs\Enums\Http\Status; use App\Libs\Enums\Http\Status;
use App\Model\Events\Event;
use App\Model\Events\EventsRepository;
use App\Model\Events\EventsTable;
use App\Model\Events\EventStatus;
use Cron\CronExpression; use Cron\CronExpression;
use DateInterval;
use Psr\Http\Message\ResponseInterface as iResponse; use Psr\Http\Message\ResponseInterface as iResponse;
use Psr\Http\Message\ServerRequestInterface as iRequest; use Psr\Http\Message\ServerRequestInterface as iRequest;
use Psr\SimpleCache\CacheInterface as iCache;
use Psr\SimpleCache\InvalidArgumentException; use Psr\SimpleCache\InvalidArgumentException;
final class Index final class Tasks
{ {
public const string URL = '%{api.prefix}/tasks'; public const string URL = '%{api.prefix}/tasks';
public function __construct(private readonly iCache $cache) public function __construct(private EventsRepository $eventsRepo)
{ {
} }
@@ -29,22 +31,28 @@ final class Index
#[Get(self::URL . '[/]', name: 'tasks.index')] #[Get(self::URL . '[/]', name: 'tasks.index')]
public function tasksIndex(): iResponse public function tasksIndex(): iResponse
{ {
$queuedTasks = $this->cache->get('queued_tasks', []); $tasks = [];
$response = [
'tasks' => [],
'queued' => $queuedTasks,
'status' => isTaskWorkerRunning(),
];
foreach (TasksCommand::getTasks() as $task) { foreach (TasksCommand::getTasks() as $task) {
$task = self::formatTask($task); $task = self::formatTask($task);
$task['queued'] = in_array(ag($task, 'name'), $queuedTasks); if (true === (bool)ag($task, 'hide', false)) {
continue;
$response['tasks'][] = $task;
} }
return api_response(Status::OK, $response); $task['queued'] = null !== $this->isQueued(ag($task, 'name'));
$tasks[] = $task;
}
$queued = [];
foreach (array_filter($tasks, fn($item) => $item['queued'] === true) as $item) {
$queued[] = $item['name'];
}
return api_response(Status::OK, [
'tasks' => $tasks,
'queued' => $queued,
'status' => isTaskWorkerRunning(),
]);
} }
/** /**
@@ -59,29 +67,41 @@ final class Index
return api_error('Task not found.', Status::NOT_FOUND); return api_error('Task not found.', Status::NOT_FOUND);
} }
$queuedTasks = $this->cache->get('queued_tasks', []); $queuedTask = $this->isQueued(ag($task, 'name'));
if ('POST' === $request->getMethod()) { if ('POST' === $request->getMethod()) {
$queuedTasks[] = $id; if (null !== $queuedTask) {
$this->cache->set('queued_tasks', $queuedTasks, new DateInterval('P3D')); return api_error('Task already queued.', Status::CONFLICT);
return api_response(Status::ACCEPTED, ['queue' => $queuedTasks]); }
$event = queueEvent(TasksCommand::NAME, ['name' => $id], [
EventsTable::COLUMN_REFERENCE => r('task://{name}', ['name' => $id]),
]);
return api_response(Status::ACCEPTED, $event->getAll());
} }
if ('DELETE' === $request->getMethod()) { if ('DELETE' === $request->getMethod()) {
$queuedTasks = array_filter($queuedTasks, fn($v) => $v !== $id); if (null === $queuedTask) {
$this->cache->set('queued_tasks', $queuedTasks, new DateInterval('P3D')); return api_error('Task not queued.', Status::NOT_FOUND);
return api_response(Status::OK, ['queue' => $queuedTasks]); }
if ($queuedTask->status === EventStatus::RUNNING) {
return api_error('Cannot remove task in running state.', Status::BAD_REQUEST);
}
$queuedTask->status = EventStatus::CANCELLED;
$this->eventsRepo->save($queuedTask);
return api_response(Status::OK);
} }
return api_response(Status::OK, [ return api_response(Status::OK, [
'task' => $id, 'task' => $id,
'is_queued' => in_array($id, $queuedTasks), 'is_queued' => null !== $queuedTask,
]); ]);
} }
/**
* @throws InvalidArgumentException
*/
#[Get(self::URL . '/{id:[a-zA-Z0-9_-]+}[/]', name: 'tasks.task.view')] #[Get(self::URL . '/{id:[a-zA-Z0-9_-]+}[/]', name: 'tasks.task.view')]
public function taskView(string $id): iResponse public function taskView(string $id): iResponse
{ {
@@ -91,10 +111,8 @@ final class Index
return api_error('Task not found.', Status::NOT_FOUND); return api_error('Task not found.', Status::NOT_FOUND);
} }
$queuedTasks = $this->cache->get('queued_tasks', []); $data = Tasks::formatTask($task);
$data['queued'] = null !== $this->isQueued(ag($task, 'name'));
$data = Index::formatTask($task);
$data['queued'] = in_array(ag($task, 'name'), $queuedTasks);
return api_response(Status::OK, $data); return api_response(Status::OK, $data);
} }
@@ -115,6 +133,7 @@ final class Index
'prev_run' => null, 'prev_run' => null,
'command' => ag($task, 'command'), 'command' => ag($task, 'command'),
'args' => ag($task, 'args'), 'args' => ag($task, 'args'),
'hide' => (bool)ag($task, 'hide', false),
]; ];
if (!is_string($item['command'])) { if (!is_string($item['command'])) {
@@ -136,4 +155,11 @@ final class Index
return $item; return $item;
} }
private function isQueued(string $id): Event|null
{
return $this->eventsRepo->findByReference(r('task://{name}', ['name' => $id]), [
EventsTable::COLUMN_STATUS => EventStatus::PENDING->value
]);
}
} }

View File

@@ -142,7 +142,7 @@ final class DispatchCommand extends Command
$event->updated_at = (string)makeDate(); $event->updated_at = (string)makeDate();
$this->repo->save($event); $this->repo->save($event);
$this->logger->error($errorLog); $this->logger->error($errorLog, ['trace' => $e->getTrace()]);
} }
} }
} }

View File

@@ -20,9 +20,9 @@ use Symfony\Component\Console\Output\OutputInterface;
#[Cli(command: self::ROUTE)] #[Cli(command: self::ROUTE)]
final class IndexCommand extends Command final class IndexCommand extends Command
{ {
public const ROUTE = 'system:index'; public const string ROUTE = 'system:index';
public const TASK_NAME = 'indexes'; public const string TASK_NAME = 'indexes';
/** /**
* Class constructor. * Class constructor.

View File

@@ -7,14 +7,19 @@ namespace App\Commands\System;
use App\Command; use App\Command;
use App\Libs\Attributes\Route\Cli; use App\Libs\Attributes\Route\Cli;
use App\Libs\Config; use App\Libs\Config;
use App\Libs\Container;
use App\Libs\Events\DataEvent;
use App\Libs\Extends\ConsoleOutput; use App\Libs\Extends\ConsoleOutput;
use App\Libs\Stream; use App\Libs\Stream;
use App\Model\Events\EventListener;
use App\Model\Events\EventsRepository;
use Closure;
use Cron\CronExpression; use Cron\CronExpression;
use Exception; use Exception;
use Psr\SimpleCache\CacheInterface as iCache;
use Psr\SimpleCache\InvalidArgumentException; use Psr\SimpleCache\InvalidArgumentException;
use Symfony\Component\Console\Completion\CompletionInput; use Symfony\Component\Console\Completion\CompletionInput;
use Symfony\Component\Console\Completion\CompletionSuggestions; use Symfony\Component\Console\Completion\CompletionSuggestions;
use Symfony\Component\Console\Input\ArrayInput;
use Symfony\Component\Console\Input\InputInterface as iInput; use Symfony\Component\Console\Input\InputInterface as iInput;
use Symfony\Component\Console\Input\InputOption; use Symfony\Component\Console\Input\InputOption;
use Symfony\Component\Console\Output\ConsoleOutputInterface; use Symfony\Component\Console\Output\ConsoleOutputInterface;
@@ -30,15 +35,18 @@ use Throwable;
#[Cli(command: self::ROUTE)] #[Cli(command: self::ROUTE)]
final class TasksCommand extends Command final class TasksCommand extends Command
{ {
public const string NAME = 'run_task';
public const string ROUTE = 'system:tasks'; public const string ROUTE = 'system:tasks';
private array $logs = []; private array $logs = [];
private array $taskOutput = []; private array $taskOutput = [];
private Closure|null $writer = null;
/** /**
* Class Constructor. * Class Constructor.
*/ */
public function __construct(private readonly iCache $cache) public function __construct(private EventsRepository $eventsRepo)
{ {
set_time_limit(0); set_time_limit(0);
ini_set('memory_limit', '-1'); ini_set('memory_limit', '-1');
@@ -134,7 +142,7 @@ final class TasksCommand extends Command
*/ */
protected function runCommand(iInput $input, iOutput $output): int protected function runCommand(iInput $input, iOutput $output): int
{ {
if ($input->getOption('run')) { if ($input->hasOption('run') && $input->getOption('run')) {
return $this->runTasks($input, $output); return $this->runTasks($input, $output);
} }
@@ -158,6 +166,59 @@ final class TasksCommand extends Command
return self::SUCCESS; return self::SUCCESS;
} }
#[EventListener(self::NAME)]
public function runEventTask(DataEvent $event): DataEvent
{
$event->stopPropagation();
if (null === ($name = ag($event->getData(), 'name'))) {
$event->addLog(r('No task name was specified.'));
return $event;
}
$task = self::getTasks($name);
if (empty($task)) {
$event->addLog(r("Invalid task '{name}'. There are no task with that name registered.", ['name' => $name]));
return $event;
}
try {
$input = new ArrayInput([], $this->getDefinition());
$input->setOption('run', null);
$input->setOption('task', null);
$input->setOption('save-log', true);
$input->setOption('live', false);
$this->writer = function ($msg) use (&$event) {
static $lastSave = null;
$timeNow = hrtime(as_number: true);
if (null === $lastSave) {
$lastSave = $timeNow;
}
$event->addLog($msg);
if ($timeNow > $lastSave) {
$this->eventsRepo->save($event->getEvent());
$lastSave = $timeNow + (10 * 1_000_000_000);
}
};
$event->addLog(r("Task: Run '{command}'.", ['command' => ag($task, 'command')]));
$exitCode = $this->runTask($task, $input, Container::get(iOutput::class));
$event->addLog(r("Task: End '{command}' (Exit Code: {code})", [
'command' => ag($task, 'command'),
'code' => $exitCode,
]));
} finally {
$this->writer = null;
}
return $event;
}
/** /**
* Runs the tasks. * Runs the tasks.
* *
@@ -176,31 +237,13 @@ final class TasksCommand extends Command
$task = strtolower($task); $task = strtolower($task);
if (false === ag_exists($tasks, $task)) { if (false === ag_exists($tasks, $task)) {
$output->writeln( $output->writeln(r('<error>There are no task named [{task}].</error>', [
r('<error>There are no task named [{task}].</error>', [
'task' => $task 'task' => $task
]) ]));
);
return self::FAILURE; return self::FAILURE;
} }
$run[] = ag($tasks, $task); $run[] = ag($tasks, $task);
} elseif (null !== ($queued = $this->cache->get('queued_tasks', null))) {
foreach ($queued as $taskName) {
$task = strtolower($taskName);
if (false === ag_exists($tasks, $task)) {
$output->writeln(
r('<error>There are no task named [{task}].</error>', [
'task' => $task
])
);
continue;
}
$run[] = ag($tasks, $task);
}
$this->cache->delete('queued_tasks');
} else { } else {
foreach ($tasks as $task) { foreach ($tasks as $task) {
if (false === (bool)ag($task, 'enabled')) { if (false === (bool)ag($task, 'enabled')) {
@@ -208,7 +251,6 @@ final class TasksCommand extends Command
} }
assert($task['timer'] instanceof CronExpression); assert($task['timer'] instanceof CronExpression);
if ($task['timer']->isDue('now')) { if ($task['timer']->isDue('now')) {
$run[] = $task; $run[] = $task;
} }
@@ -216,15 +258,35 @@ final class TasksCommand extends Command
} }
if (count($run) < 1) { if (count($run) < 1) {
$output->writeln( $output->writeln(r('<info>[{datetime}] No task scheduled to run at this time.</info>', [
r('<info>[{datetime}] No task scheduled to run at this time.</info>', [
'datetime' => makeDate(), 'datetime' => makeDate(),
]), ]), iOutput::VERBOSITY_VERBOSE);
iOutput::VERBOSITY_VERBOSE
);
} }
foreach ($run as $task) { foreach ($run as $task) {
$this->runTask($task, $input, $output);
}
if ($input->getOption('save-log') && count($this->logs) >= 1) {
try {
$stream = new Stream(Config::get('tasks.logfile'), 'a');
$stream->write(preg_replace('#\R+#', PHP_EOL, implode(PHP_EOL, $this->logs)) . PHP_EOL . PHP_EOL);
$stream->close();
} catch (Throwable $e) {
$this->write(r("<error>Failed to open/write to logfile '{file}'. Error '{message}'.</error>", [
'file' => Config::get('tasks.logfile'),
'message' => $e->getMessage(),
]), $input, $output);
return self::INVALID;
}
}
return self::SUCCESS;
}
private function runTask(array $task, iInput $input, iOutput $output): int
{
$cmd = []; $cmd = [];
$cmd[] = ROOT_PATH . '/bin/console'; $cmd[] = ROOT_PATH . '/bin/console';
@@ -247,7 +309,7 @@ final class TasksCommand extends Command
$this->taskOutput[] = trim($out); $this->taskOutput[] = trim($out);
if (!$input->getOption('live')) { if (!$input->hasOption('live') && $input->getOption('live')) {
return; return;
} }
@@ -259,7 +321,7 @@ final class TasksCommand extends Command
} }
if (count($this->taskOutput) < 1) { if (count($this->taskOutput) < 1) {
continue; return $process->getExitCode();
} }
$ended = makeDate()->format('D, H:i:s T'); $ended = makeDate()->format('D, H:i:s T');
@@ -290,24 +352,8 @@ final class TasksCommand extends Command
} }
$this->taskOutput = []; $this->taskOutput = [];
}
if ($input->getOption('save-log') && count($this->logs) >= 1) { return $process->getExitCode();
try {
$stream = new Stream(Config::get('tasks.logfile'), 'a');
$stream->write(preg_replace('#\R+#', PHP_EOL, implode(PHP_EOL, $this->logs)) . PHP_EOL . PHP_EOL);
$stream->close();
} catch (Throwable $e) {
$this->write(r('<error>Failed to open log file [{file}]. Error [{message}].</error>', [
'file' => Config::get('tasks.logfile'),
'message' => $e->getMessage(),
]), $input, $output);
return self::INVALID;
}
}
return self::SUCCESS;
} }
/** /**
@@ -327,7 +373,11 @@ final class TasksCommand extends Command
assert($output instanceof ConsoleOutput); assert($output instanceof ConsoleOutput);
$output->writeln($text, $level); $output->writeln($text, $level);
if ($input->getOption('save-log')) { if (null !== $this->writer) {
($this->writer)($output->getLastMessage());
}
if ($input->hasOption('save-log') && $input->getOption('save-log')) {
$this->logs[] = $output->getLastMessage(); $this->logs[] = $output->getLastMessage();
} }
} }
@@ -353,6 +403,7 @@ final class TasksCommand extends Command
'description' => $task['info'] ?? '', 'description' => $task['info'] ?? '',
'enabled' => (bool)$task['enabled'], 'enabled' => (bool)$task['enabled'],
'timer' => $timer, 'timer' => $timer,
'hide' => (bool)($task['hide'] ?? false),
]; ];
try { try {

View File

@@ -23,6 +23,9 @@ class DataEvent extends Event
public function addLog(string $log): void public function addLog(string $log): void
{ {
if (count($this->eventInfo->logs) > 200) {
array_shift($this->eventInfo->logs);
}
$this->eventInfo->logs[] = $log; $this->eventInfo->logs[] = $log;
} }

View File

@@ -63,7 +63,12 @@ final class EventsRepository
$criteria[EntityTable::COLUMN_REFERENCE] = $reference; $criteria[EntityTable::COLUMN_REFERENCE] = $reference;
return $this->_remove($criteria); $stmt = $this->db->delete($this->table, $criteria, [
'limit' => 1,
'orderby' => [EntityTable::COLUMN_CREATED_AT => 'DESC'],
]);
return $stmt->rowCount() > 0;
} }
/** /**
@@ -95,10 +100,4 @@ final class EventsRepository
{ {
return $this->_remove($criteria); return $this->_remove($criteria);
} }
public function removeById(string $id): bool
{
return $this->_removeById($id, $this->primaryKey);
}
} }