From 2e41f80698a82a1ac1426e7aafd34aed77c70f40 Mon Sep 17 00:00:00 2001 From: "Abdulmhsen B. A. A." Date: Mon, 19 Aug 2024 14:31:31 +0300 Subject: [PATCH] Migrate state:push into the new event system. --- FAQ.md | 2 +- NEWS.md | 40 ++++ README.md | 52 ++--- config/config.php | 14 +- config/env.spec.php | 7 +- frontend/pages/events/index.vue | 10 +- frontend/pages/history/[id]/index.vue | 9 +- frontend/utils/events/helpers.js | 4 +- src/Commands/Events/DispatchCommand.php | 2 +- src/Commands/State/PushCommand.php | 272 ------------------------ src/Libs/helpers.php | 66 +++--- src/Listeners/ProcessProgressEvent.php | 125 +++++------ src/Listeners/ProcessPushEvent.php | 183 ++++++++++++++++ src/Listeners/ProcessRequestEvent.php | 2 +- src/Model/Events/EventsRepository.php | 17 ++ 15 files changed, 367 insertions(+), 438 deletions(-) delete mode 100644 src/Commands/State/PushCommand.php create mode 100644 src/Listeners/ProcessPushEvent.php diff --git a/FAQ.md b/FAQ.md index 30693d39..5c56d4fc 100644 --- a/FAQ.md +++ b/FAQ.md @@ -402,7 +402,7 @@ command via CLI. > [!IMPORTANT] > for environment variables that has `{TASK}` tag, you **MUST** replace it with one -> of `IMPORT`, `EXPORT`, `PUSH`, `BACKUP`, `PRUNE`, `INDEXES`. To see tasks active settings run +> of `IMPORT`, `EXPORT`, `BACKUP`, `PRUNE`, `INDEXES`. To see tasks active settings run ```bash $ docker exec -ti watchstate console system:tasks diff --git a/NEWS.md b/NEWS.md index a5231118..3f7cc7f0 100644 --- a/NEWS.md +++ b/NEWS.md @@ -1,5 +1,45 @@ # Old Updates +### 2024-08-10 + +I have recently added new experimental feature, to play your content directly from the WebUI. This feature is still in +alpha, and missing a lot of features. But it's a start. Right now it does auto transcode on the fly to play any content in the browser. + +The feature requires that you mount your media directories to the `WatchState` container similar to the `File integrity` feature. I have plans to expand +the feature to support more controls, however, right now it's only support basic subtitles streams and default audio stream or first audio stream. + +The transcoder works by converting the media on the fly to `HLS` segments, and the subtitles are selectable via the player ui which are also converted to `vtt` format. + +Expects bugs and issues, as the feature is still in alpha. But I would love to hear your feedback. You can play the media by visiting +the history page of the item you will see red play button on top right corner of the page. If the items has a play button, then you correctly mounted +the media directories. otherwise, the button be disabled with tooltip of `Media is inaccessible`. + +The feature is not meant to replace your backend media player, the purpose of this feature is to quickly check the media without leaving the WebUI. + +### 2024-08-01 + +We recently enabled listening on tls connections via `8443` which can be controlled by `HTTPS_PORT` environment variable. +Before today, we simply only exposed the port via the `Dockerfile`, but we weren't listening for connections on it. + +However, please keep in mind that the certificate is self-signed, and you might get a warning from your browser. You can +either accept the warning or add the certificate to your trusted certificates. We strongly recommend using a reverse proxy. +instead of relying on self-signed certificates. + +### 2024-07-22 + +We have recently added a new WebUI feature, `File integrity`, this feature will help you to check if your media backends +are reporting files that are not available on the disk. This feature is still in alpha, and we are working on improving +it. + +This feature `REQUIRES` that you mount your media directories to the `WatchState` container preferably as readonly. There is plans to add +a path replacement feature to allow you change the pathing, but it's not implemented yet. + +This feature will work on both local and remote cloud storages provided they are mounted into the container. We also may recommend not to +use this feature depending on how your cloud storage provider treats file stat calls. As it might lead to unnecessary money spending. and of course +it will be slower. + +For more information about how we cache the stat calls, please refer to the [FAQ](FAQ.md#How-does-the-file-integrity-feature-works). + ### 2024-07-06 Recently we have introduced a new feature that allows you to use Jellyfin and Emby OAuth access tokens for syncing diff --git a/README.md b/README.md index 66a8fa34..78496270 100644 --- a/README.md +++ b/README.md @@ -9,6 +9,18 @@ out of the box, this tool support `Jellyfin`, `Plex` and `Emby` media servers. ## Updates +### 2024-08-19 + +We have migrated the `state:push` task into the new events system, as such the old task `state:push` is now gone. +To enable the new event handler for push events, use the new environment variable `WS_PUSH_ENABLED` and set it to `true`. +Right now, it's disabled by default. However, for people who had the old task enabled, it will reuse that setting. + +Keep in mind, the new event handler is more efficient and will only push data when there is a change in the play state. And it's much faster +than the old task. This event handler will push data within a minute of the change. + +PS: Please enable the task by setting its new environment variable `WS_PUSH_ENABLED` to `true`. The old `WS_CRON_PUSH` is now gone. +and will be removed in the future releases. + ### 2024-08-18 We have started migrating the old events system to a new one, so far we have migrated the `progress` and `requests` to it. As such, @@ -17,46 +29,6 @@ environment variable `WS_SYNC_PROGRESS` which you can set to `true` to enable th We will continue to migrate the rest of the events to the new system, and we will keep you updated. -### 2024-08-10 - -I have recently added new experimental feature, to play your content directly from the WebUI. This feature is still in -alpha, and missing a lot of features. But it's a start. Right now it does auto transcode on the fly to play any content in the browser. - -The feature requires that you mount your media directories to the `WatchState` container similar to the `File integrity` feature. I have plans to expand -the feature to support more controls, however, right now it's only support basic subtitles streams and default audio stream or first audio stream. - -The transcoder works by converting the media on the fly to `HLS` segments, and the subtitles are selectable via the player ui which are also converted to `vtt` format. - -Expects bugs and issues, as the feature is still in alpha. But I would love to hear your feedback. You can play the media by visiting -the history page of the item you will see red play button on top right corner of the page. If the items has a play button, then you correctly mounted -the media directories. otherwise, the button be disabled with tooltip of `Media is inaccessible`. - -The feature is not meant to replace your backend media player, the purpose of this feature is to quickly check the media without leaving the WebUI. - -### 2024-08-01 - -We recently enabled listening on tls connections via `8443` which can be controlled by `HTTPS_PORT` environment variable. -Before today, we simply only exposed the port via the `Dockerfile`, but we weren't listening for connections on it. - -However, please keep in mind that the certificate is self-signed, and you might get a warning from your browser. You can -either accept the warning or add the certificate to your trusted certificates. We strongly recommend using a reverse proxy. -instead of relying on self-signed certificates. - -### 2024-07-22 - -We have recently added a new WebUI feature, `File integrity`, this feature will help you to check if your media backends -are reporting files that are not available on the disk. This feature is still in alpha, and we are working on improving -it. - -This feature `REQUIRES` that you mount your media directories to the `WatchState` container preferably as readonly. There is plans to add -a path replacement feature to allow you change the pathing, but it's not implemented yet. - -This feature will work on both local and remote cloud storages provided they are mounted into the container. We also may recommend not to -use this feature depending on how your cloud storage provider treats file stat calls. As it might lead to unnecessary money spending. and of course -it will be slower. - -For more information about how we cache the stat calls, please refer to the [FAQ](FAQ.md#How-does-the-file-integrity-feature-works). - Refer to [NEWS](NEWS.md) for old updates. # Features diff --git a/config/config.php b/config/config.php index 0ef1eb14..08d3a273 100644 --- a/config/config.php +++ b/config/config.php @@ -9,7 +9,6 @@ use App\Commands\Events\DispatchCommand; use App\Commands\State\BackupCommand; use App\Commands\State\ExportCommand; use App\Commands\State\ImportCommand; -use App\Commands\State\PushCommand; use App\Commands\System\IndexCommand; use App\Commands\System\PruneCommand; use App\Libs\Mappers\Import\MemoryMapper; @@ -75,7 +74,10 @@ return (function () { 'header' => (string)env('WS_TRUST_HEADER', 'X-Forwarded-For'), ], 'sync' => [ - 'progress' => (bool)env('WS_SYNC_PROGRESS', false), + 'progress' => (bool)env('WS_SYNC_PROGRESS', (bool)env('WS_CRON_PROGRESS', false)), + ], + 'push' => [ + 'enabled' => (bool)env('WS_PUSH_ENABLED', (bool)env('WS_CRON_PUSH', false)), ], ]; @@ -266,14 +268,6 @@ return (function () { 'timer' => $checkTaskTimer((string)env('WS_CRON_EXPORT_AT', '30 */1 * * *'), '30 */1 * * *'), 'args' => env('WS_CRON_EXPORT_ARGS', '-v'), ], - PushCommand::TASK_NAME => [ - 'command' => PushCommand::ROUTE, - 'name' => PushCommand::TASK_NAME, - 'info' => 'Send queued events to backends.', - 'enabled' => (bool)env('WS_CRON_PUSH', true), - 'timer' => $checkTaskTimer((string)env('WS_CRON_PUSH_AT', '*/10 * * * *'), '*/10 * * * *'), - 'args' => env('WS_CRON_PUSH_ARGS', '-v'), - ], BackupCommand::TASK_NAME => [ 'command' => BackupCommand::ROUTE, 'name' => BackupCommand::TASK_NAME, diff --git a/config/env.spec.php b/config/env.spec.php index e7b0fc8f..ea0d5381 100644 --- a/config/env.spec.php +++ b/config/env.spec.php @@ -166,6 +166,11 @@ return (function () { 'description' => 'Enable watch progress sync.', 'type' => 'bool', ], + [ + 'key' => 'WS_PUSH_ENABLED', + 'description' => 'Enable Push play state to backends. This feature depends on webhooks being enabled.', + 'type' => 'bool', + ], ]; $validateCronExpression = function (string $value): string { @@ -191,7 +196,7 @@ return (function () { }; // -- Do not forget to update the tasks list if you add a new task. - $tasks = ['import', 'export', 'push', 'backup', 'prune', 'indexes']; + $tasks = ['import', 'export', 'backup', 'prune', 'indexes']; $task_env = [ [ 'key' => 'WS_CRON_{TASK}', diff --git a/frontend/pages/events/index.vue b/frontend/pages/events/index.vue index 03b42aa6..d8798430 100644 --- a/frontend/pages/events/index.vue +++ b/frontend/pages/events/index.vue @@ -39,13 +39,14 @@ -
+

No items found.

+

Search for {{ query }} returned no results.

@@ -79,7 +80,7 @@
- @@ -87,8 +88,7 @@ @@ -173,7 +173,7 @@ const filteredRows = computed(() => { return items.value.filter(i => { return Object.keys(i).some(k => { - if (typeof i[k] === 'object') { + if (typeof i[k] === 'object' && null !== i[k]) { return Object.values(i[k]).some(v => typeof v === 'string' ? v.toLowerCase().includes(toTower) : false) } return typeof i[k] === 'string' ? i[k].toLowerCase().includes(toTower) : false diff --git a/frontend/pages/history/[id]/index.vue b/frontend/pages/history/[id]/index.vue index 95b0c4a3..be8f0a69 100644 --- a/frontend/pages/history/[id]/index.vue +++ b/frontend/pages/history/[id]/index.vue @@ -429,7 +429,14 @@
- {{ JSON.stringify(data, null, 2) }} + {{ + JSON.stringify(Object.keys(data) + .filter(key => !['files', 'hardware', 'content_exists', '_toggle'].includes(key)) + .reduce((obj, key) => { + obj[key] = data[key]; + return obj; + }, {}), null, 2) + }}
diff --git a/frontend/utils/events/helpers.js b/frontend/utils/events/helpers.js index 0cca5faf..570bb3e0 100644 --- a/frontend/utils/events/helpers.js +++ b/frontend/utils/events/helpers.js @@ -3,7 +3,7 @@ const makeName = id => id.split('-').slice(0)[0] const getStatusClass = status => { switch (status) { case 0: - return 'is-light' + return 'is-light has-text-dark' case 1: return 'is-warning' case 2: @@ -13,7 +13,7 @@ const getStatusClass = status => { case 4: return 'is-danger is-light' default: - return 'is-light' + return 'is-light has-text-dark' } } diff --git a/src/Commands/Events/DispatchCommand.php b/src/Commands/Events/DispatchCommand.php index 9d136f1e..a43eb7bc 100644 --- a/src/Commands/Events/DispatchCommand.php +++ b/src/Commands/Events/DispatchCommand.php @@ -22,7 +22,7 @@ use Throwable; #[Cli(command: self::ROUTE)] final class DispatchCommand extends Command { - public const string TASK_NAME = 'Dispatch'; + public const string TASK_NAME = 'dispatch'; public const string ROUTE = 'events:dispatch'; diff --git a/src/Commands/State/PushCommand.php b/src/Commands/State/PushCommand.php deleted file mode 100644 index 44de4c1d..00000000 --- a/src/Commands/State/PushCommand.php +++ /dev/null @@ -1,272 +0,0 @@ -setName(self::ROUTE) - ->setDescription('Push webhook queued events.') - ->addOption('keep', 'k', InputOption::VALUE_NONE, 'Do not expunge queue after run is complete.') - ->addOption('dry-run', null, InputOption::VALUE_NONE, 'Do not commit changes to backends.') - ->addOption('ignore-date', null, InputOption::VALUE_NONE, 'Ignore date comparison.') - ->setHelp( - r( - <<webhook updated play state to export enabled backends. - You should not run this manually and instead rely on scheduled task to run this command. - - This command require the metadata to be already saved in database. - If no metadata available for a backend, then the item will be ignored for that backend. - - If the item was ignored during {route} run, it will be picked up later by next {export_route} run. - - HELP, - [ - 'cmd' => trim(commandContext()), - 'route' => self::ROUTE, - 'export_route' => ExportCommand::ROUTE, - ] - ) - ); - } - - /** - * Make sure the command is not running in parallel. - * - * @param InputInterface $input The input interface. - * @param OutputInterface $output The output interface. - * - * @return int Returns the process result status code. - * @throws \Psr\SimpleCache\InvalidArgumentException if the cache key is not a legal value. - */ - protected function runCommand(InputInterface $input, OutputInterface $output): int - { - return $this->single(fn(): int => $this->process($input), $output); - } - - /** - * Process the queue items and send change play state requests to the supported backends. - * - * @param InputInterface $input The input interface. - * - * @return int Returns the process result status code. - * @throws \Psr\SimpleCache\InvalidArgumentException if the cache key is not a legal value. - */ - protected function process(InputInterface $input): int - { - if (!$this->cache->has('queue')) { - $this->logger->info('No items in the queue.'); - return self::SUCCESS; - } - - $entities = $items = []; - - foreach ($this->cache->get('queue', []) as $item) { - $items[] = Container::get(iState::class)::fromArray($item); - } - - if (!empty($items)) { - foreach ($this->db->find(...$items) as $item) { - $entities[$item->id] = $item; - } - } - - $items = null; - - if (empty($entities)) { - $this->cache->delete('queue'); - $this->logger->debug('No items in the queue.'); - return self::SUCCESS; - } - - $list = []; - $supported = Config::get('supported', []); - - foreach ((array)Config::get('servers', []) as $backendName => $backend) { - $type = strtolower(ag($backend, 'type', 'unknown')); - - if (true !== (bool)ag($backend, 'export.enabled')) { - $this->logger->info('SYSTEM: Export to [{backend}] is disabled by user.', [ - 'backend' => $backendName, - ]); - - continue; - } - - if (!isset($supported[$type])) { - $this->logger->error('SYSTEM: [{backend}] Invalid type.', [ - 'backend' => $backendName, - 'condition' => [ - 'expected' => implode(', ', array_keys($supported)), - 'given' => $type, - ], - ]); - continue; - } - - if (null === ($url = ag($backend, 'url')) || false === isValidURL($url)) { - $this->logger->error('SYSTEM: [{backend}] Invalid url.', [ - 'backend' => $backendName, - 'url' => $url ?? 'None', - ]); - continue; - } - - $backend['name'] = $backendName; - $list[$backendName] = $backend; - } - - if (empty($list)) { - $this->logger->warning('SYSTEM: There are no backends with export enabled.'); - return self::FAILURE; - } - - foreach ($list as $name => &$backend) { - $opts = ag($backend, 'options', []); - - if ($input->getOption('ignore-date')) { - $opts[Options::IGNORE_DATE] = true; - } - - if ($input->getOption('dry-run')) { - $opts[Options::DRY_RUN] = true; - } - - if ($input->getOption('trace')) { - $opts[Options::DEBUG_TRACE] = true; - } - - $backend['options'] = $opts; - $backend['class'] = $this->getBackend(name: $name, config: $backend); - - $backend['class']->push(entities: $entities, queue: $this->queue); - } - - unset($backend); - - $total = count($this->queue); - - if ($total >= 1) { - $start = makeDate(); - $this->logger->notice('SYSTEM: Sending [{total}] change play state requests.', [ - 'total' => $total, - 'time' => [ - 'start' => $start, - ], - ]); - - foreach ($this->queue->getQueue() as $response) { - $context = ag($response->getInfo('user_data'), 'context', []); - - try { - if (200 !== $response->getStatusCode()) { - $this->logger->error( - 'SYSTEM: Request to change [{backend}] [{item.title}] play state returned with unexpected [{status_code}] status code.', - $context - ); - continue; - } - - $this->logger->notice('SYSTEM: Marked [{backend}] [{item.title}] as [{play_state}].', $context); - } catch (Throwable $e) { - $this->logger->error( - message: 'SYSTEM: Exception [{error.kind}] was thrown unhandled during [{backend}] request to change play state of {item.type} [{item.title}]. Error [{error.message} @ {error.file}:{error.line}].', - context: [ - 'error' => [ - 'kind' => $e::class, - 'line' => $e->getLine(), - 'message' => $e->getMessage(), - 'file' => after($e->getFile(), ROOT_PATH), - ], - ...$context, - 'exception' => [ - 'file' => $e->getFile(), - 'line' => $e->getLine(), - 'kind' => get_class($e), - 'message' => $e->getMessage(), - 'trace' => $e->getTrace(), - ], - ] - ); - } - } - - $end = makeDate(); - - $this->logger->notice('SYSTEM: Sent [{total}] change play state requests.', [ - 'total' => $total, - 'time' => [ - 'start' => $start, - 'end' => $end, - 'duration' => $end->getTimestamp() - $start->getTimestamp(), - ], - ]); - - $this->logger->notice('Using WatchState Version - \'{version}\'.', ['version' => getAppVersion()]); - } else { - $this->logger->notice('SYSTEM: No play state changes detected.'); - } - - if (false === $input->getOption('keep') && false === $input->getOption('dry-run')) { - $this->cache->delete('queue'); - } - - return self::SUCCESS; - } -} diff --git a/src/Libs/helpers.php b/src/Libs/helpers.php index f9de11cc..1a83d71c 100644 --- a/src/Libs/helpers.php +++ b/src/Libs/helpers.php @@ -27,6 +27,7 @@ use App\Libs\Response; use App\Libs\Router; use App\Libs\Stream; use App\Libs\Uri; +use App\Listeners\ProcessPushEvent; use App\Model\Events\Event as EventInfo; use App\Model\Events\EventListener; use App\Model\Events\EventsRepository; @@ -536,47 +537,48 @@ if (!function_exists('httpClientChunks')) { if (!function_exists('queuePush')) { /** - * Pushes the entity to the queue. - * - * This method adds the entity to the queue for further processing. + * Add push event to the events queue. * * @param iState $entity The entity to push to the queue. - * @param bool $remove (optional) Whether to remove the entity from the queue if it already exists (default is false). + * @param bool $remove Whether to remove the event from the queue if it's in pending state. Default is false. */ function queuePush(iState $entity, bool $remove = false): void { - if (!$remove && !$entity->hasGuids() && !$entity->hasRelativeGuid()) { + $logger = Container::get(iLogger::class); + + if (false === (bool)Config::get('push.enabled', false)) { + $logger->error("Push is disabled. Unable to push '{via}: {entity}'.", [ + 'via' => $entity->via, + 'entity' => $entity->getName() + ]); return; } - try { - $cache = Container::get(iCache::class); - - $list = $cache->get('queue', []); - - if (true === $remove && array_key_exists($entity->id, $list)) { - unset($list[$entity->id]); - } else { - $list[$entity->id] = ['id' => $entity->id]; - } - - $cache->set('queue', $list, new DateInterval('P7D')); - } catch (\Psr\SimpleCache\InvalidArgumentException $e) { - Container::get(iLogger::class)->error( - message: 'Exception [{error.kind}] was thrown unhandled during saving [{backend} - {title}} into queue. Error [{error.message} @ {error.file}:{error.line}].', - context: [ - 'backend' => $entity->via, - 'title' => $entity->getName(), - 'error' => [ - 'kind' => $e::class, - 'line' => $e->getLine(), - 'message' => $e->getMessage(), - 'file' => after($e->getFile(), ROOT_PATH), - ], - 'trace' => $e->getTrace(), - ], - ); + if (!$entity->id) { + $logger->error("Unable to push event '{via}: {entity}'. It has no local id yet.", [ + 'via' => $entity->via, + 'entity' => $entity->getName() + ]); + return; } + + if (true === $remove) { + Container::get(EventsRepository::class)->removeByReference(r('push://{id}', ['id' => $entity->id])); + return; + } + + if (!$entity->hasGuids() && !$entity->hasRelativeGuid()) { + $logger->error("Unable to push '{id}' event '{via}: {entity}'. It has no GUIDs.", [ + 'id' => $entity->id, + 'via' => $entity->via, + 'entity' => $entity->getName() + ]); + return; + } + + queueEvent(ProcessPushEvent::NAME, [iState::COLUMN_ID => $entity->id], [ + EventsTable::COLUMN_REFERENCE => r('push://{id}', ['id' => $entity->id]), + ]); } } diff --git a/src/Listeners/ProcessProgressEvent.php b/src/Listeners/ProcessProgressEvent.php index 88007efa..46c71719 100644 --- a/src/Listeners/ProcessProgressEvent.php +++ b/src/Listeners/ProcessProgressEvent.php @@ -72,14 +72,15 @@ final readonly class ProcessProgressEvent $type = strtolower(ag($backend, 'type', 'unknown')); if (true !== (bool)ag($backend, 'export.enabled')) { - $writer(Level::Notice, "SYSTEM: Export to '{backend}' is disabled by user.", [ + $writer(Level::Notice, "Export to '{backend}' is disabled by user.", [ 'backend' => $backendName ]); continue; } if (!isset($supported[$type])) { - $writer(Level::Error, "SYSTEM: '{backend}' Invalid type.", [ + $writer(Level::Error, "The backend '{backend}' is using invalid type '{type}'.", [ + 'type' => $type, 'backend' => $backendName, 'condition' => [ 'expected' => implode(', ', array_keys($supported)), @@ -90,7 +91,7 @@ final readonly class ProcessProgressEvent } if (null === ($url = ag($backend, 'url')) || false === isValidURL($url)) { - $writer(Level::Error, "SYSTEM: '{backend}' Invalid url.", [ + $writer(Level::Error, "The backend '{backend}' URL is invalid.", [ 'backend' => $backendName, 'url' => $url ?? 'None', ]); @@ -102,7 +103,7 @@ final readonly class ProcessProgressEvent } if (empty($list)) { - $writer(Level::Info, 'SYSTEM: There are no backends with export enabled.'); + $writer(Level::Error, 'There are no backends with export enabled.'); return $e; } @@ -128,7 +129,7 @@ final readonly class ProcessProgressEvent } catch (UnexpectedVersionException|NotImplementedException $e) { $writer( Level::Notice, - "SYSTEM: This feature is not available for '{backend}'. '{error.message}' at '{error.file}:{error.line}'.", + "This feature is not available for '{backend}'. '{error.message}' at '{error.file}:{error.line}'.", [ 'backend' => $name, 'error' => [ @@ -149,7 +150,7 @@ final readonly class ProcessProgressEvent } catch (Throwable $e) { $writer( Level::Error, - "SYSTEM: Exception '{error.kind}' was thrown unhandled during '{backend}' request to sync progress. '{error.message}' at '{error.file}:{error.line}'.", + "Exception '{error.kind}' was thrown unhandled during '{backend}' request to sync progress. '{error.message}' at '{error.file}:{error.line}'.", [ 'backend' => $name, 'error' => [ @@ -172,83 +173,63 @@ final readonly class ProcessProgressEvent unset($backend); - $total = count($this->queue); + if (count($this->queue) < 1) { + $writer(Level::Notice, "Backend handlers didn't queue items to be updated."); + return $e; + } - if ($total >= 1) { - $start = makeDate(); + foreach ($this->queue->getQueue() as $response) { + $context = ag($response->getInfo('user_data'), 'context', []); - $writer(Level::Notice, "SYSTEM: Sending '{total}' progress update requests.", [ - 'total' => $total, - 'time' => [ - 'start' => $start, - ], - ]); - - foreach ($this->queue->getQueue() as $response) { - $context = ag($response->getInfo('user_data'), 'context', []); - - try { - if (ag($options, 'trace')) { - $writer(Level::Debug, "Processing '{backend}' - '{item.title}' response.", [ - 'url' => ag($context, 'remote.url', '??'), - 'status_code' => $response->getStatusCode(), - 'headers' => $response->getHeaders(false), - 'response' => $response->getContent(false), - ...$context - ]); - } - - if (!in_array($response->getStatusCode(), [Status::OK->value, Status::NO_CONTENT->value])) { - $writer( - Level::Error, - "SYSTEM: Request to change '{backend}' '{item.title}' watch progress returned with unexpected '{status_code}' status code.", - [ - 'status_code' => $response->getStatusCode(), - ...$context - ] - ); - continue; - } - - $writer(Level::Notice, "SYSTEM: Updated '{backend}' '{item.title}' watch progress.", [ - ...$context, + try { + if (ag($options, 'trace')) { + $writer(Level::Debug, "Processing '{backend}: {item.title}' response.", [ + 'url' => ag($context, 'remote.url', '??'), 'status_code' => $response->getStatusCode(), + 'headers' => $response->getHeaders(false), + 'response' => $response->getContent(false), + ...$context ]); - } catch (Throwable $e) { + } + + if (!in_array($response->getStatusCode(), [Status::OK->value, Status::NO_CONTENT->value])) { $writer( Level::Error, - "SYSTEM: Exception '{error.kind}' was thrown unhandled during '{backend}' request to change watch progress of {item.type} '{item.title}'. '{error.message}' at '{error.file}:{error.line}'.", + "Request to change '{backend}: {item.title}' watch progress returned with unexpected '{status_code}' status code.", [ - 'error' => [ - 'kind' => $e::class, - 'line' => $e->getLine(), - 'message' => $e->getMessage(), - 'file' => after($e->getFile(), ROOT_PATH), - ], - 'exception' => [ - 'file' => $e->getFile(), - 'line' => $e->getLine(), - 'kind' => get_class($e), - 'message' => $e->getMessage(), - 'trace' => $e->getTrace(), - ], - ...$context, + 'status_code' => $response->getStatusCode(), + ...$context ] ); + continue; } - } - $end = makeDate(); - $writer(Level::Notice, "SYSTEM: Sent '{total}' watch progress requests.", [ - 'total' => $total, - 'time' => [ - 'start' => $start, - 'end' => $end, - 'duration' => $end->getTimestamp() - $start->getTimestamp(), - ], - ]); - } else { - $writer(Level::Notice, 'SYSTEM: No watch progress changes detected.'); + $writer(Level::Notice, "Updated '{backend}: {item.title}' watch progress.", [ + ...$context, + 'status_code' => $response->getStatusCode(), + ]); + } catch (Throwable $e) { + $writer( + Level::Error, + "Exception '{error.kind}' was thrown unhandled during '{backend}' request to change watch progress of {item.type} '{item.title}'. '{error.message}' at '{error.file}:{error.line}'.", + [ + 'error' => [ + 'kind' => $e::class, + 'line' => $e->getLine(), + 'message' => $e->getMessage(), + 'file' => after($e->getFile(), ROOT_PATH), + ], + 'exception' => [ + 'file' => $e->getFile(), + 'line' => $e->getLine(), + 'kind' => get_class($e), + 'message' => $e->getMessage(), + 'trace' => $e->getTrace(), + ], + ...$context, + ] + ); + } } return $e; diff --git a/src/Listeners/ProcessPushEvent.php b/src/Listeners/ProcessPushEvent.php new file mode 100644 index 00000000..b766c8e1 --- /dev/null +++ b/src/Listeners/ProcessPushEvent.php @@ -0,0 +1,183 @@ +addLog($level->getName() . ': ' . r($message, $context)); + $this->logger->log($level, $message, $context); + }; + + $e->stopPropagation(); + + if (null === ($item = $this->db->get(Container::get(iState::class)::fromArray($e->getData())))) { + $writer(Level::Error, "Item '{id}' is not found or has been deleted.", [ + 'id' => ag($e->getData(), 'id', '?') + ]); + return $e; + } + + $options = $e->getOptions(); + $list = []; + $supported = Config::get('supported', []); + + foreach ((array)Config::get('servers', []) as $backendName => $backend) { + $type = strtolower(ag($backend, 'type', 'unknown')); + + if (true !== (bool)ag($backend, 'export.enabled')) { + $writer(Level::Notice, "Export to '{backend}' is disabled by user.", [ + 'backend' => $backendName + ]); + continue; + } + + if (!isset($supported[$type])) { + $writer(Level::Error, "The backend '{backend}' is using invalid type '{type}'.", [ + 'type' => $type, + 'backend' => $backendName, + 'condition' => [ + 'expected' => implode(', ', array_keys($supported)), + 'given' => $type, + ], + ]); + continue; + } + + if (null === ($url = ag($backend, 'url')) || false === isValidURL($url)) { + $writer(Level::Error, "The backend '{backend}' URL is invalid.", [ + 'backend' => $backendName, + 'url' => $url ?? 'None', + ]); + continue; + } + + $backend['name'] = $backendName; + $list[$backendName] = $backend; + } + + if (empty($list)) { + $writer(Level::Error, 'There are no backends with export enabled.'); + return $e; + } + + foreach ($list as $name => &$backend) { + try { + $opts = ag($backend, 'options', []); + + if (ag($options, 'ignore-date')) { + $opts[Options::IGNORE_DATE] = true; + } + + if (ag($options, 'dry-run')) { + $opts[Options::DRY_RUN] = true; + } + + if (ag($options, 'trace')) { + $opts[Options::DEBUG_TRACE] = true; + } + + $backend['options'] = $opts; + $backend['class'] = getBackend(name: $name, config: $backend); + $backend['class']->push(entities: [$item->id => $item], queue: $this->queue); + } catch (Throwable $e) { + $writer( + Level::Error, + "Exception '{error.kind}' was thrown unhandled during '{backend}' push events. '{error.message}' at '{error.file}:{error.line}'.", + [ + 'backend' => $name, + 'error' => [ + 'kind' => $e::class, + 'line' => $e->getLine(), + 'message' => $e->getMessage(), + 'file' => after($e->getFile(), ROOT_PATH), + ], + 'exception' => [ + 'file' => $e->getFile(), + 'line' => $e->getLine(), + 'kind' => get_class($e), + 'message' => $e->getMessage(), + 'trace' => $e->getTrace(), + ], + ] + ); + } + } + unset($backend); + + if (count($this->queue) < 1) { + $writer(Level::Notice, 'SYSTEM: No play state changes detected.'); + return $e; + } + + foreach ($this->queue->getQueue() as $response) { + $context = ag($response->getInfo('user_data'), 'context', []); + + try { + if (Status::OK !== Status::from($response->getStatusCode())) { + $writer( + Level::Error, + "Request to change '{backend}: {item.title}' play state returned with unexpected '{status_code}' status code.", + $context + ); + continue; + } + + $writer(Level::Notice, "Marked '{backend}: {item.title}' as '{play_state}'.", $context); + } catch (Throwable $e) { + $writer( + Level::Error, + "Exception '{error.kind}' was thrown unhandled during '{backend}' request to change play state of {item.type} '{item.title}'. '{error.message}' at '{error.file}:{error.line}'.", + [ + 'error' => [ + 'kind' => $e::class, + 'line' => $e->getLine(), + 'message' => $e->getMessage(), + 'file' => after($e->getFile(), ROOT_PATH), + ], + ...$context, + 'exception' => [ + 'file' => $e->getFile(), + 'line' => $e->getLine(), + 'kind' => get_class($e), + 'message' => $e->getMessage(), + 'trace' => $e->getTrace(), + ], + ] + ); + } + } + return $e; + } +} diff --git a/src/Listeners/ProcessRequestEvent.php b/src/Listeners/ProcessRequestEvent.php index 95f4b9bc..52ea0317 100644 --- a/src/Listeners/ProcessRequestEvent.php +++ b/src/Listeners/ProcessRequestEvent.php @@ -41,7 +41,7 @@ final readonly class ProcessRequestEvent $lastSync = makeDate($lastSync); } - $message = r('SYSTEM: Processing [{backend}] [{title}] {tainted} request.', [ + $message = r("Processing '{backend}: {title}' {tainted} request.", [ 'backend' => $entity->via, 'title' => $entity->getName(), 'event' => ag($entity->getExtra($entity->via), iState::COLUMN_EXTRA_EVENT, '??'), diff --git a/src/Model/Events/EventsRepository.php b/src/Model/Events/EventsRepository.php index 7d767185..b61d9b5e 100644 --- a/src/Model/Events/EventsRepository.php +++ b/src/Model/Events/EventsRepository.php @@ -49,6 +49,23 @@ final class EventsRepository return $items[0] ?? null; } + /** + * Will return the last event by reference. + * + * @param string|int $reference reference id to remove. + * @param array $criteria Filter criteria. By default, it will only remove pending events. + */ + public function removeByReference(string|int $reference, array $criteria = []): bool + { + if (empty($criteria)) { + $criteria[EntityTable::COLUMN_STATUS] = EventStatus::PENDING->value; + } + + $criteria[EntityTable::COLUMN_REFERENCE] = $reference; + + return $this->_remove($criteria); + } + /** * @param array $criteria Criteria to search by. * @param array $cols Columns to select.