diff --git a/FAQ.md b/FAQ.md index ece86b14..30693d39 100644 --- a/FAQ.md +++ b/FAQ.md @@ -402,7 +402,7 @@ command via CLI. > [!IMPORTANT] > for environment variables that has `{TASK}` tag, you **MUST** replace it with one -> of `IMPORT`, `EXPORT`, `PUSH`, `BACKUP`, `PRUNE`, `INDEXES`, `REQUESTS`. To see tasks active settings run +> of `IMPORT`, `EXPORT`, `PUSH`, `BACKUP`, `PRUNE`, `INDEXES`. To see tasks active settings run ```bash $ docker exec -ti watchstate console system:tasks diff --git a/README.md b/README.md index 3526a959..66a8fa34 100644 --- a/README.md +++ b/README.md @@ -9,6 +9,14 @@ out of the box, this tool support `Jellyfin`, `Plex` and `Emby` media servers. ## Updates +### 2024-08-18 + +We have started migrating the old events system to a new one, so far we have migrated the `progress` and `requests` to it. As such, +The old tasks `state:progress` and `state:requests` are now gone. To control if you want to enable the watch progress, there is new +environment variable `WS_SYNC_PROGRESS` which you can set to `true` to enable the watch progress. It's disabled by default. + +We will continue to migrate the rest of the events to the new system, and we will keep you updated. + ### 2024-08-10 I have recently added new experimental feature, to play your content directly from the WebUI. This feature is still in diff --git a/bin/console b/bin/console index c90901f4..da279ff4 100755 --- a/bin/console +++ b/bin/console @@ -63,6 +63,7 @@ try { ] ); fwrite(STDERR, $message . PHP_EOL); + fwrite(STDERR, $e->getTraceAsString() . PHP_EOL); exit(503); } diff --git a/config/config.php b/config/config.php index 5ed97dab..0ef1eb14 100644 --- a/config/config.php +++ b/config/config.php @@ -9,9 +9,7 @@ use App\Commands\Events\DispatchCommand; use App\Commands\State\BackupCommand; use App\Commands\State\ExportCommand; use App\Commands\State\ImportCommand; -use App\Commands\State\ProgressCommand; use App\Commands\State\PushCommand; -use App\Commands\State\RequestsCommand; use App\Commands\System\IndexCommand; use App\Commands\System\PruneCommand; use App\Libs\Mappers\Import\MemoryMapper; @@ -76,6 +74,9 @@ return (function () { 'proxy' => (bool)env('WS_TRUST_PROXY', false), 'header' => (string)env('WS_TRUST_HEADER', 'X-Forwarded-For'), ], + 'sync' => [ + 'progress' => (bool)env('WS_SYNC_PROGRESS', false), + ], ]; $config['backends_file'] = fixPath(env('WS_BACKENDS_FILE', ag($config, 'path') . '/config/servers.yaml')); @@ -273,14 +274,6 @@ return (function () { 'timer' => $checkTaskTimer((string)env('WS_CRON_PUSH_AT', '*/10 * * * *'), '*/10 * * * *'), 'args' => env('WS_CRON_PUSH_ARGS', '-v'), ], - ProgressCommand::TASK_NAME => [ - 'command' => ProgressCommand::ROUTE, - 'name' => ProgressCommand::TASK_NAME, - 'info' => 'Send play progress to backends.', - 'enabled' => (bool)env('WS_CRON_PROGRESS', false), - 'timer' => $checkTaskTimer((string)env('WS_CRON_PROGRESS_AT', '*/45 * * * *'), '*/45 * * * *'), - 'args' => env('WS_CRON_PROGRESS_ARGS', '-v'), - ], BackupCommand::TASK_NAME => [ 'command' => BackupCommand::ROUTE, 'name' => BackupCommand::TASK_NAME, @@ -305,14 +298,6 @@ return (function () { 'timer' => $checkTaskTimer((string)env('WS_CRON_INDEXES_AT', '0 3 * * 3'), '0 3 * * 3'), 'args' => env('WS_CRON_INDEXES_ARGS', '-v'), ], - RequestsCommand::TASK_NAME => [ - 'command' => RequestsCommand::ROUTE, - 'name' => RequestsCommand::TASK_NAME, - 'info' => 'Process queued http requests.', - 'enabled' => (bool)env('WS_CRON_REQUESTS', true), - 'timer' => $checkTaskTimer((string)env('WS_CRON_REQUESTS_AT', '*/2 * * * *'), '*/2 * * * *'), - 'args' => env('WS_CRON_REQUESTS_ARGS', '-v --no-stats'), - ], DispatchCommand::TASK_NAME => [ 'command' => DispatchCommand::ROUTE, 'name' => DispatchCommand::TASK_NAME, diff --git a/config/env.spec.php b/config/env.spec.php index 936051f4..e7b0fc8f 100644 --- a/config/env.spec.php +++ b/config/env.spec.php @@ -161,6 +161,11 @@ return (function () { 'description' => 'All executing all commands in the console. They must be prefixed with $', 'type' => 'bool', ], + [ + 'key' => 'WS_SYNC_PROGRESS', + 'description' => 'Enable watch progress sync.', + 'type' => 'bool', + ], ]; $validateCronExpression = function (string $value): string { @@ -186,7 +191,7 @@ return (function () { }; // -- Do not forget to update the tasks list if you add a new task. - $tasks = ['import', 'export', 'push', 'progress', 'backup', 'prune', 'indexes', 'requests']; + $tasks = ['import', 'export', 'push', 'backup', 'prune', 'indexes']; $task_env = [ [ 'key' => 'WS_CRON_{TASK}', diff --git a/frontend/components/Pager.vue b/frontend/components/Pager.vue new file mode 100644 index 00000000..e6b49bb9 --- /dev/null +++ b/frontend/components/Pager.vue @@ -0,0 +1,70 @@ + + + diff --git a/frontend/layouts/default.vue b/frontend/layouts/default.vue index 77eee395..a028be0d 100644 --- a/frontend/layouts/default.vue +++ b/frontend/layouts/default.vue @@ -79,10 +79,15 @@ - + Events + + + Old Events + + Ignore List diff --git a/frontend/pages/events/index.vue b/frontend/pages/events/index.vue new file mode 100644 index 00000000..03b42aa6 --- /dev/null +++ b/frontend/pages/events/index.vue @@ -0,0 +1,321 @@ + + + diff --git a/frontend/pages/events/view.vue b/frontend/pages/events/view.vue new file mode 100644 index 00000000..d9105d7f --- /dev/null +++ b/frontend/pages/events/view.vue @@ -0,0 +1,210 @@ + + + diff --git a/frontend/pages/events.vue b/frontend/pages/old_events.vue similarity index 73% rename from frontend/pages/events.vue rename to frontend/pages/old_events.vue index 6f0ca308..d3c518ef 100644 --- a/frontend/pages/events.vue +++ b/frontend/pages/old_events.vue @@ -4,7 +4,7 @@
- Events + Legacy Events
@@ -18,11 +18,13 @@
This page will show events that are queued to be handled or sent to the backends. + This endpoint is being deprecated and will be removed in the future, We are migrating to the new + events endpoint.
-
+
-
-
- - - Request events - -
- Events from backends. Consumed by state:requests task. -
-
- -
-
-
-

- -   - - - -

- - - -
-
-
-
-
-   - -
-
-
-
-   - -
-
-
-
- - is Tainted: {{ i.item?.isTainted ? 'Yes' : 'No' }} -
-
-
-
- - {{ formatDuration(i.item.progress) }} -
-
-
-
- -
-
-
-
{ isLoading.value = true queue.value = [] progress.value = [] - requests.value = [] - const response = await request(`/system/events`) + const response = await request(`/system/old_events`) let json try { @@ -340,7 +258,6 @@ const loadContent = async () => { queue.value = json?.queue progress.value = json?.progress - requests.value = json?.requests } catch (e) { return notification('error', 'Error', e.message) } finally { @@ -354,7 +271,7 @@ const deleteItem = async (item, type, key) => { } try { - const response = await request(`/system/events/0`, { + const response = await request(`/system/old_events/0`, { method: 'DELETE', body: JSON.stringify({type: type, id: key}) }) @@ -386,9 +303,6 @@ const deleteItem = async (item, type, key) => { case 'progress': progress.value = progress.value.filter(i => i.key !== key) break - case 'requests': - requests.value = requests.value.filter(i => i.key !== key) - break } } catch (e) { diff --git a/frontend/utils/events/helpers.js b/frontend/utils/events/helpers.js new file mode 100644 index 00000000..0cca5faf --- /dev/null +++ b/frontend/utils/events/helpers.js @@ -0,0 +1,20 @@ +const makeName = id => id.split('-').slice(0)[0] + +const getStatusClass = status => { + switch (status) { + case 0: + return 'is-light' + case 1: + return 'is-warning' + case 2: + return 'is-success' + case 3: + return 'is-danger' + case 4: + return 'is-danger is-light' + default: + return 'is-light' + } +} + +export {makeName, getStatusClass} diff --git a/migrations/sqlite_1723988129_add-reference-to-events.sql b/migrations/sqlite_1723988129_add-reference-to-events.sql new file mode 100644 index 00000000..5332514e --- /dev/null +++ b/migrations/sqlite_1723988129_add-reference-to-events.sql @@ -0,0 +1,73 @@ +-- # migrate_up + +CREATE TABLE "_tmp_events" +( + "id" text NOT NULL, + "status" integer NOT NULL DEFAULT '0', + "reference" text NULL, + "event" text NOT NULL, + "event_data" text NOT NULL DEFAULT '{}', + "options" text NOT NULL DEFAULT '{}', + "attempts" integer NOT NULL DEFAULT '0', + "logs" text NOT NULL DEFAULT '{}', + "created_at" numeric NOT NULL, + "updated_at" numeric NULL, + PRIMARY KEY ("id") +); + +INSERT INTO "_tmp_events" ("id", "status", "event", "event_data", "options", "attempts", "logs", "created_at", + "updated_at") +SELECT "id", + "status", + "event", + "event_data", + "options", + "attempts", + "logs", + "created_at", + "updated_at" +FROM "events"; + +DROP TABLE "events"; +ALTER TABLE "_tmp_events" + RENAME TO "events"; +CREATE INDEX "events_event" ON "events" ("event"); +CREATE INDEX "events_status" ON "events" ("status"); +CREATE INDEX "events_reference" ON "events" ("reference"); + +-- # migrate_down + +CREATE TABLE "_tmp_events" +( + "id" text NOT NULL, + "status" integer NOT NULL DEFAULT '0', + "event" text NOT NULL, + "event_data" text NOT NULL DEFAULT '{}', + "options" text NOT NULL DEFAULT '{}', + "attempts" integer NOT NULL DEFAULT '0', + "logs" text NOT NULL DEFAULT '{}', + "created_at" numeric NOT NULL, + "updated_at" numeric NULL, + PRIMARY KEY ("id") +); + +INSERT INTO "_tmp_events" ("id", "status", "event", "event_data", "options", "attempts", "logs", "created_at", + "updated_at") +SELECT "id", + "status", + "event", + "event_data", + "options", + "attempts", + "logs", + "created_at", + "updated_at" +FROM "events"; + +DROP TABLE "events"; +ALTER TABLE "_tmp_events" + RENAME TO "events"; +CREATE INDEX "events_event" ON "events" ("event"); +CREATE INDEX "events_status" ON "events" ("status"); + +-- put your downgrade database commands here. diff --git a/src/API/Backend/Webhooks.php b/src/API/Backend/Webhooks.php index ead7cce4..46b0753f 100644 --- a/src/API/Backend/Webhooks.php +++ b/src/API/Backend/Webhooks.php @@ -14,7 +14,8 @@ use App\Libs\LogSuppressor; use App\Libs\Options; use App\Libs\Traits\APITraits; use App\Libs\Uri; -use DateInterval; +use App\Listeners\ProcessRequestEvent; +use App\Model\Events\EventsTable; use Monolog\Handler\StreamHandler; use Monolog\Level; use Monolog\Logger; @@ -190,8 +191,6 @@ final class Webhooks return api_response(Status::NOT_MODIFIED); } - $items = $this->cache->get('requests', []); - $itemId = r('{type}://{id}:{tainted}@{backend}', [ 'type' => $entity->type, 'backend' => $entity->via, @@ -199,21 +198,13 @@ final class Webhooks 'id' => ag($entity->getMetadata($entity->via), iState::COLUMN_ID, '??'), ]); - $items[$itemId] = [ - 'options' => [ + queueEvent(ProcessRequestEvent::NAME, $entity->getAll(), [ + 'unique' => true, + EventsTable::COLUMN_REFERENCE => $itemId, + EventsTable::COLUMN_OPTIONS => [ Options::IMPORT_METADATA_ONLY => $metadataOnly, - ], - 'entity' => $entity, - ]; - - $this->cache->set('requests', $items, new DateInterval('P3D')); - - $pEnabled = (bool)env('WS_CRON_PROGRESS', false); - if ($pEnabled && false === $metadataOnly && true === $entity->hasPlayProgress() && !$entity->isWatched()) { - $progress = $this->cache->get('progress', []); - $progress[str_replace($itemId, ':tainted@', ':untainted@')] = $entity; - $this->cache->set('progress', $progress, new DateInterval('P3D')); - } + ] + ]); $this->write($request, Level::Info, 'Queued [{backend}: {event}] {item.type} [{item.title}].', [ 'backend' => $entity->via, @@ -224,7 +215,7 @@ final class Webhooks 'type' => $entity->type, 'played' => $entity->isWatched() ? 'Yes' : 'No', 'queue_id' => $itemId, - 'progress' => $pEnabled && $entity->hasPlayProgress() ? $entity->getPlayProgress() : null, + 'progress' => $entity->hasPlayProgress() ? $entity->getPlayProgress() : null, ] ] ); diff --git a/src/API/System/Events.php b/src/API/System/Events.php index 4d43e57f..ecb1e1e8 100644 --- a/src/API/System/Events.php +++ b/src/API/System/Events.php @@ -6,114 +6,174 @@ namespace App\API\System; use App\Libs\Attributes\Route\Delete; use App\Libs\Attributes\Route\Get; -use App\Libs\Container; -use App\Libs\Database\DatabaseInterface as iDB; +use App\Libs\Attributes\Route\Patch; +use App\Libs\Attributes\Route\Post; use App\Libs\DataUtil; -use App\Libs\Entity\StateInterface as iState; use App\Libs\Enums\Http\Status; -use App\Libs\Traits\APITraits; -use DateInterval; +use App\Model\Events\Event as EntityItem; +use App\Model\Events\EventsRepository; +use App\Model\Events\EventsTable as EntityTable; +use App\Model\Events\EventStatus; use Psr\Http\Message\ResponseInterface as iResponse; use Psr\Http\Message\ServerRequestInterface as iRequest; -use Psr\SimpleCache\CacheInterface as iCache; -use Psr\SimpleCache\InvalidArgumentException; -final class Events +final readonly class Events { - use APITraits; - public const string URL = '%{api.prefix}/system/events'; - private const array TYPES = ['queue', 'progress', 'requests']; + public const int PERPAGE = 10; - public function __construct(private iCache $cache, private iDB $db) + public function __construct(private EventsRepository $repo) { } - /** - * @throws InvalidArgumentException - */ - #[Get(self::URL . '[/]', name: 'system.events')] - public function __invoke(iRequest $request): iResponse + #[Get(pattern: self::URL . '[/]')] + public function list(iRequest $request): iResponse { - $response = [ - 'queue' => [], - 'progress' => [], - 'requests' => [], + [$page, $perpage, $start] = getPagination($request, 1, self::PERPAGE); + + $arrParams = []; + + $this->repo->setPerpage($perpage)->setStart($start)->setDescendingOrder(); + + $entities = $this->repo->findAll($arrParams, [ + EntityTable::COLUMN_ID, + EntityTable::COLUMN_EVENT, + EntityTable::COLUMN_STATUS, + EntityTable::COLUMN_EVENT_DATA, + EntityTable::COLUMN_OPTIONS, + EntityTable::COLUMN_ATTEMPTS, + EntityTable::COLUMN_CREATED_AT, + EntityTable::COLUMN_UPDATED_AT, + ]); + + $total = $this->repo->getTotal(); + + $arr = [ + 'paging' => [ + 'page' => $page, + 'total' => $total, + 'perpage' => $perpage, + 'next' => $page < @ceil($total / $perpage) ? $page + 1 : null, + 'previous' => !empty($entities) && $page > 1 ? $page - 1 : null + ], + 'items' => [], + 'statuses' => [], ]; - foreach ($this->cache->get('queue', []) as $key => $item) { - if (null === ($entity = $this->db->get(Container::get(iState::class)::fromArray($item)))) { - continue; - } - $response['queue'][] = ['key' => $key, 'item' => $this->formatEntity($entity)]; + foreach (EventStatus::cases() as $status) { + $arr['statuses'][] = [ + 'id' => $status->value, + 'name' => ucfirst(strtolower($status->name)), + ]; } - foreach ($this->cache->get('progress', []) as $key => $item) { - if (null !== ($entity = $this->db->get($item))) { - $item->id = $entity->id; - } - - $response['progress'][] = ['key' => $key, 'item' => $this->formatEntity($item)]; + foreach ($entities as $entity) { + $arr['items'][] = $this->formatEntity($entity); } - foreach ($this->cache->get('requests', []) as $key => $request) { - if (null === ($item = ag($request, 'entity')) || false === ($item instanceof iState)) { - continue; - } - - if (null !== ($entity = $this->db->get($item))) { - $item->id = $entity->id; - } - - $response['requests'][] = ['key' => $key, 'item' => $this->formatEntity($item)]; - } - - return api_response(Status::OK, $response); + return api_response(Status::OK, $arr); } - /** - * @throws InvalidArgumentException - */ - #[Delete(self::URL . '/{id}[/]', name: 'system.events.delete')] - public function deleteEvent(iRequest $request, array $args = []): iResponse + #[Post(pattern: self::URL . '[/]')] + public function create(iRequest $request): iResponse { - $params = DataUtil::fromRequest($request, true); + $params = DataUtil::fromRequest($request); - if (null === ($id = $params->get('id', ag($args, 'id')))) { - return api_error('Invalid id.', Status::BAD_REQUEST); + if (null === ($event = $params->get(EntityTable::COLUMN_EVENT))) { + return api_error('No event name was given.', Status::BAD_REQUEST, [ + ...$params->getAll() + ]); } - $type = $params->get('type', 'queue'); + $data = (array)$params->get(EntityTable::COLUMN_EVENT_DATA, []); + $item = queueEvent($event, $data, [EventsRepository::class => $this->repo]); - if (false === in_array($type, self::TYPES, true)) { - return api_error(r("Invalid type '{type}'. Only '{types}' are supported.", [ - 'type' => $type, - 'types' => implode(", ", self::TYPES), - ]), Status::BAD_REQUEST); + return api_message(r("Event '{event}' was queued.", [ + 'event' => $item->event, + ]), Status::ACCEPTED, $this->formatEntity($item)); + } + + #[Get(pattern: self::URL . '/{id:uuid}[/]')] + public function read(string $id): iResponse + { + if (null === ($entity = $this->repo->findById($id))) { + return api_error('Item does not exists', Status::NOT_FOUND); } - $items = $this->cache->get($type, []); + return api_response(Status::OK, $this->formatEntity($entity)); + } - if (empty($items)) { - return api_error(r('{type} is empty.', ['type' => $type]), Status::NOT_FOUND); + #[Delete(pattern: self::URL . '/{id:uuid}[/]')] + public function delete(string $id): iResponse + { + if (null === ($entity = $this->repo->findById($id))) { + return api_error('Item does not exists', Status::NOT_FOUND); } - if (false === array_key_exists($id, $items)) { - return api_error(r("Record id '{id}' doesn't exists. for '{type}' list.", [ - 'id' => $id, - 'type' => $type, - ]), Status::NOT_FOUND); + if (EventStatus::RUNNING === $entity->status) { + return api_error('Cannot delete event that is in running state.', Status::BAD_REQUEST); } - if ('queue' === $type) { - $item = Container::get(iState::class)::fromArray(['id' => $id]); - queuePush($item, remove: true); - } else { - unset($items[$id]); - $this->cache->set($type, $items, new DateInterval('P3D')); + $this->repo->remove($entity); + + return api_response(Status::OK, $this->formatEntity($entity)); + } + + #[Patch(pattern: self::URL . '/{id:uuid}[/]')] + public function update(iRequest $request, string $id): iResponse + { + if (null === ($entity = $this->repo->findById($id))) { + return api_error('Item does not exists', Status::NOT_FOUND); } - return api_response(Status::OK, ['id' => $id, 'type' => $type, 'status' => 'deleted']); + if (EventStatus::RUNNING === $entity->status) { + return api_error('Cannot update event in running state.', Status::BAD_REQUEST); + } + + $params = DataUtil::fromRequest($request); + + // -- Update State. + if (null !== ($status = $params->get(EntityTable::COLUMN_STATUS))) { + if (false === is_int($status) && false === ctype_digit($status)) { + return api_error('status parameter must be a number.', Status::BAD_REQUEST); + } + + if (null == ($status = EventStatus::tryFrom((int)$status))) { + return api_error('Invalid status parameter was given.', Status::BAD_REQUEST); + } + + $entity->status = $status; + } + + if (null !== ($event = $params->get(EntityTable::COLUMN_EVENT))) { + $entity->event = $event; + } + + if (null !== ($event_data = $params->get(EntityTable::COLUMN_EVENT_DATA))) { + $entity->event_data = $event_data; + } + + if (true === (bool)$params->get('reset_logs', false)) { + $entity->logs = []; + } + + $changed = !empty($entity->diff()); + + if ($changed) { + $entity->updated_at = (string)makeDate(); + $entity->logs[] = 'Event was manually updated'; + $this->repo->save($entity); + } + + return api_message($changed ? 'Updated' : 'No Changes detected', Status::OK, $this->formatEntity($entity)); + } + + private function formatEntity(EntityItem $entity): array + { + $data = $entity->getAll(); + $data['status_name'] = $entity->getStatusText(); + + return $data; } } diff --git a/src/API/System/OldEvents.php b/src/API/System/OldEvents.php new file mode 100644 index 00000000..f971e09c --- /dev/null +++ b/src/API/System/OldEvents.php @@ -0,0 +1,119 @@ + [], + 'progress' => [], + 'requests' => [], + ]; + + foreach ($this->cache->get('queue', []) as $key => $item) { + if (null === ($entity = $this->db->get(Container::get(iState::class)::fromArray($item)))) { + continue; + } + $response['queue'][] = ['key' => $key, 'item' => $this->formatEntity($entity)]; + } + + foreach ($this->cache->get('progress', []) as $key => $item) { + if (null !== ($entity = $this->db->get($item))) { + $item->id = $entity->id; + } + + $response['progress'][] = ['key' => $key, 'item' => $this->formatEntity($item)]; + } + + foreach ($this->cache->get('requests', []) as $key => $request) { + if (null === ($item = ag($request, 'entity')) || false === ($item instanceof iState)) { + continue; + } + + if (null !== ($entity = $this->db->get($item))) { + $item->id = $entity->id; + } + + $response['requests'][] = ['key' => $key, 'item' => $this->formatEntity($item)]; + } + + return api_response(Status::OK, $response); + } + + /** + * @throws InvalidArgumentException + */ + #[Delete(self::URL . '/{id}[/]', name: 'system.events.delete')] + public function deleteEvent(iRequest $request, array $args = []): iResponse + { + $params = DataUtil::fromRequest($request, true); + + if (null === ($id = $params->get('id', ag($args, 'id')))) { + return api_error('Invalid id.', Status::BAD_REQUEST); + } + + $type = $params->get('type', 'queue'); + + if (false === in_array($type, self::TYPES, true)) { + return api_error(r("Invalid type '{type}'. Only '{types}' are supported.", [ + 'type' => $type, + 'types' => implode(", ", self::TYPES), + ]), Status::BAD_REQUEST); + } + + $items = $this->cache->get($type, []); + + if (empty($items)) { + return api_error(r('{type} is empty.', ['type' => $type]), Status::NOT_FOUND); + } + + if (false === array_key_exists($id, $items)) { + return api_error(r("Record id '{id}' doesn't exists. for '{type}' list.", [ + 'id' => $id, + 'type' => $type, + ]), Status::NOT_FOUND); + } + + if ('queue' === $type) { + $item = Container::get(iState::class)::fromArray(['id' => $id]); + queuePush($item, remove: true); + } else { + unset($items[$id]); + $this->cache->set($type, $items, new DateInterval('P3D')); + } + + return api_response(Status::OK, ['id' => $id, 'type' => $type, 'status' => 'deleted']); + } +} diff --git a/src/Backends/Common/ClientInterface.php b/src/Backends/Common/ClientInterface.php index 4a3644f7..1a445007 100644 --- a/src/Backends/Common/ClientInterface.php +++ b/src/Backends/Common/ClientInterface.php @@ -6,6 +6,8 @@ namespace App\Backends\Common; use App\Libs\Entity\StateInterface as iState; use App\Libs\Exceptions\Backends\InvalidContextException; +use App\Libs\Exceptions\Backends\NotImplementedException; +use App\Libs\Exceptions\Backends\UnexpectedVersionException; use App\Libs\Mappers\ImportInterface as iImport; use App\Libs\QueueRequests; use DateTimeInterface as iDate; @@ -128,6 +130,8 @@ interface ClientInterface * @param iDate|null $after only push items after this date. * * @return array empty array. The data is pushed to the queue. + * @throws NotImplementedException is thrown if the backend does not support this feature. + * @throws UnexpectedVersionException is thrown if the backend version does not support this feature. */ public function progress(array $entities, QueueRequests $queue, iDate|null $after = null): array; diff --git a/src/Backends/Common/Context.php b/src/Backends/Common/Context.php index aad81c09..6b3be242 100644 --- a/src/Backends/Common/Context.php +++ b/src/Backends/Common/Context.php @@ -4,11 +4,15 @@ declare(strict_types=1); namespace App\Backends\Common; +use App\Libs\Container; use App\Libs\Options; use Psr\Http\Message\UriInterface; +use Psr\Log\LoggerInterface as iLogger; -readonly class Context +final class Context { + protected iLogger|null $logger = null; + /** * Make backend context for classes to work with. * @@ -24,16 +28,16 @@ readonly class Context * @param array $options optional options. */ public function __construct( - public string $clientName, - public string $backendName, - public UriInterface $backendUrl, - public Cache $cache, - public string|int|null $backendId = null, - public string|int|null $backendToken = null, - public string|int|null $backendUser = null, - public array $backendHeaders = [], - public bool $trace = false, - public array $options = [] + public readonly string $clientName, + public readonly string $backendName, + public readonly UriInterface $backendUrl, + public readonly Cache $cache, + public readonly string|int|null $backendId = null, + public readonly string|int|null $backendToken = null, + public readonly string|int|null $backendUser = null, + public readonly array $backendHeaders = [], + public readonly bool $trace = false, + public readonly array $options = [] ) { } @@ -49,4 +53,22 @@ readonly class Context $status = true === (bool)ag($this->options, Options::IS_LIMITED_TOKEN, false); return true === $withUser ? $status && null !== $this->backendUser : $status; } + + public function hasLogger(): bool + { + return null !== $this->logger; + } + + public function withLogger(iLogger $logger): self + { + $clone = clone $this; + $clone->logger = $logger; + + return $clone; + } + + public function getLogger(): iLogger + { + return $this->logger ?? Container::get(iLogger::class); + } } diff --git a/src/Backends/Emby/EmbyClient.php b/src/Backends/Emby/EmbyClient.php index e90b13d1..034de9b2 100644 --- a/src/Backends/Emby/EmbyClient.php +++ b/src/Backends/Emby/EmbyClient.php @@ -145,6 +145,10 @@ class EmbyClient implements iClient ), ]) ); + + if ($context->hasLogger()) { + $cloned->context = $cloned->context->withLogger($context->getLogger()); + } $cloned->guid = $cloned->guid->withContext($cloned->context); diff --git a/src/Backends/Jellyfin/JellyfinClient.php b/src/Backends/Jellyfin/JellyfinClient.php index deff658a..736b58ae 100644 --- a/src/Backends/Jellyfin/JellyfinClient.php +++ b/src/Backends/Jellyfin/JellyfinClient.php @@ -163,6 +163,10 @@ class JellyfinClient implements iClient ]) ); + if ($context->hasLogger()) { + $cloned->context = $cloned->context->withLogger($context->getLogger()); + } + $cloned->guid = $cloned->guid->withContext($cloned->context); return $cloned; diff --git a/src/Backends/Plex/PlexClient.php b/src/Backends/Plex/PlexClient.php index 35d8c528..931e7648 100644 --- a/src/Backends/Plex/PlexClient.php +++ b/src/Backends/Plex/PlexClient.php @@ -162,6 +162,10 @@ class PlexClient implements iClient ]) ); + if ($context->hasLogger()) { + $cloned->context = $cloned->context->withLogger($context->getLogger()); + } + $cloned->guid = $cloned->guid->withContext($cloned->context); return $cloned; diff --git a/src/Commands/Events/DispatchCommand.php b/src/Commands/Events/DispatchCommand.php index c403ff7b..9d136f1e 100644 --- a/src/Commands/Events/DispatchCommand.php +++ b/src/Commands/Events/DispatchCommand.php @@ -76,6 +76,24 @@ final class DispatchCommand extends Command assert($this->dispatcher instanceof EventDispatcher); foreach ($events as $event) { + if (null === ($newState = $this->repo->findById($event->id))) { + $this->logger->notice("The event '{id}' was deleted while the dispatcher was running", [ + 'id' => $event->id + ]); + continue; + } + + if ($newState->status !== Status::PENDING) { + $this->logger->notice( + "The event '{id}' was changed to '{status}' while the dispatcher was running. Ignoring event.", + [ + 'id' => $event->id, + 'status' => $newState->status->name, + ] + ); + continue; + } + $this->runEvent($event); } diff --git a/src/Commands/State/ProgressCommand.php b/src/Commands/State/ProgressCommand.php deleted file mode 100644 index c7ce2aa0..00000000 --- a/src/Commands/State/ProgressCommand.php +++ /dev/null @@ -1,387 +0,0 @@ -setName(self::ROUTE) - ->setDescription('Push queued watch progress.') - ->addOption('keep', 'k', InputOption::VALUE_NONE, 'Do not expunge queue after run is complete.') - ->addOption('dry-run', null, InputOption::VALUE_NONE, 'Do not commit changes to backends.') - ->addOption('list', 'l', InputOption::VALUE_NONE, 'List queued items.') - ->addOption('ignore-date', null, InputOption::VALUE_NONE, 'Ignore date comparison.') - ->setHelp( - r( - <<user watch progress to export enabled backends. - - You should not run this manually and instead rely on scheduled task to run this command. - - This command require the metadata to be already saved in database. - If no metadata available for a backend, then watch progress update won't be sent to that backend. - HELP, - [ - 'cmd' => trim(commandContext()), - 'route' => self::ROUTE, - ] - ) - ); - } - - /** - * Make sure the command is not running in parallel. - * - * @param InputInterface $input - * @param OutputInterface $output - * @return int - * @throws \Psr\Cache\InvalidArgumentException if the cache key is not a legal value - */ - protected function runCommand(InputInterface $input, OutputInterface $output): int - { - return $this->single(fn(): int => $this->process($input, $output), $output); - } - - /** - * Run the command. - * - * @param InputInterface $input The input interface. - * @param OutputInterface $output The output interface. - * @return int Returns the status code. - * @throws \Psr\Cache\InvalidArgumentException if the cache key is not a legal value - * @noinspection PhpRedundantCatchClauseInspection - */ - protected function process(InputInterface $input, OutputInterface $output): int - { - if (!$this->cache->has('progress')) { - $this->logger->info('No watch progress items in the queue.'); - return self::SUCCESS; - } - - /** @var array $entities */ - $entities = []; - - foreach ($this->cache->get('progress', []) as $queueItem) { - assert($queueItem instanceof iState); - - $dbItem = $this->db->get($queueItem); - if (null === $dbItem || $dbItem->isWatched() || $queueItem->isWatched()) { - continue; - } - - $dbItem = $dbItem->apply($queueItem); - - if (!$dbItem->hasPlayProgress()) { - continue; - } - - if (array_key_exists($dbItem->id, $entities) && $entities[$dbItem->id]->getPlayProgress( - ) > $dbItem->getPlayProgress()) { - continue; - } - - $entities[$dbItem->id] = $dbItem; - } - - if (empty($entities)) { - $this->cache->delete('progress'); - $this->logger->debug('No watch progress items in the queue.'); - return self::SUCCESS; - } - - if ($input->getOption('list')) { - return $this->listItems($input, $output, $entities); - } - - $list = []; - $supported = Config::get('supported', []); - - foreach ((array)Config::get('servers', []) as $backendName => $backend) { - $type = strtolower(ag($backend, 'type', 'unknown')); - - if (true !== (bool)ag($backend, 'export.enabled')) { - $this->logger->info("SYSTEM: Export to '{backend}' is disabled by user.", [ - 'backend' => $backendName, - ]); - - continue; - } - - if (!isset($supported[$type])) { - $this->logger->error("SYSTEM: '{backend}' Invalid type.", [ - 'backend' => $backendName, - 'condition' => [ - 'expected' => implode(', ', array_keys($supported)), - 'given' => $type, - ], - ]); - continue; - } - - if (null === ($url = ag($backend, 'url')) || false === isValidURL($url)) { - $this->logger->error("SYSTEM: '{backend}' Invalid url.", [ - 'backend' => $backendName, - 'url' => $url ?? 'None', - ]); - continue; - } - - $backend['name'] = $backendName; - $list[$backendName] = $backend; - } - - if (empty($list)) { - $this->logger->warning('SYSTEM: There are no backends with export enabled.'); - return self::FAILURE; - } - - foreach ($list as $name => &$backend) { - try { - $opts = ag($backend, 'options', []); - - if ($input->getOption('ignore-date')) { - $opts[Options::IGNORE_DATE] = true; - } - - if ($input->getOption('dry-run')) { - $opts[Options::DRY_RUN] = true; - } - - if ($input->getOption('trace')) { - $opts[Options::DEBUG_TRACE] = true; - } - - $backend['options'] = $opts; - $backend['class'] = $this->getBackend(name: $name, config: $backend); - - $backend['class']->progress(entities: $entities, queue: $this->queue); - } catch (UnexpectedVersionException $e) { - $this->logger->notice( - "SYSTEM: Sync play progress is not supported for '{backend}'. '{error.message}' at '{error.file}:{error.line}'.", - [ - 'backend' => $name, - 'error' => [ - 'kind' => $e::class, - 'line' => $e->getLine(), - 'message' => $e->getMessage(), - 'file' => after($e->getFile(), ROOT_PATH), - ], - 'exception' => [ - 'file' => $e->getFile(), - 'line' => $e->getLine(), - 'kind' => get_class($e), - 'message' => $e->getMessage(), - 'trace' => $e->getTrace(), - ], - ] - ); - } catch (Throwable $e) { - $this->logger->error( - message: "SYSTEM: Exception '{error.kind}' was thrown unhandled during '{backend}' request to sync progress. '{error.message}' at '{error.file}:{error.line}'.", - context: [ - 'backend' => $name, - 'error' => [ - 'kind' => $e::class, - 'line' => $e->getLine(), - 'message' => $e->getMessage(), - 'file' => after($e->getFile(), ROOT_PATH), - ], - 'exception' => [ - 'file' => $e->getFile(), - 'line' => $e->getLine(), - 'kind' => get_class($e), - 'message' => $e->getMessage(), - 'trace' => $e->getTrace(), - ], - ] - ); - } - } - - unset($backend); - - $total = count($this->queue); - - if ($total >= 1) { - $start = makeDate(); - $this->logger->notice("SYSTEM: Sending '{total}' progress update requests.", [ - 'total' => $total, - 'time' => [ - 'start' => $start, - ], - ]); - - foreach ($this->queue->getQueue() as $response) { - $context = ag($response->getInfo('user_data'), 'context', []); - - try { - if ($input->getOption('trace')) { - $this->logger->debug("Processing '{backend}' - '{item.title}' response.", [ - 'url' => ag($context, 'remote.url', '??'), - 'status_code' => $response->getStatusCode(), - 'headers' => $response->getHeaders(false), - 'response' => $response->getContent(false), - ...$context - ]); - } - - if (!in_array($response->getStatusCode(), [200, 204])) { - $this->logger->error( - "SYSTEM: Request to change '{backend}' '{item.title}' watch progress returned with unexpected '{status_code}' status code.", - [ - 'status_code' => $response->getStatusCode(), - ...$context - ] - ); - continue; - } - - $this->logger->notice("SYSTEM: Updated '{backend}' '{item.title}' watch progress.", [ - ...$context, - 'status_code' => $response->getStatusCode(), - ]); - } catch (Throwable $e) { - $this->logger->error( - message: "SYSTEM: Exception '{error.kind}' was thrown unhandled during '{backend}' request to change watch progress of {item.type} '{item.title}'. '{error.message}' at '{error.file}:{error.line}'.", - context: [ - 'error' => [ - 'kind' => $e::class, - 'line' => $e->getLine(), - 'message' => $e->getMessage(), - 'file' => after($e->getFile(), ROOT_PATH), - ], - 'exception' => [ - 'file' => $e->getFile(), - 'line' => $e->getLine(), - 'kind' => get_class($e), - 'message' => $e->getMessage(), - 'trace' => $e->getTrace(), - ], - ...$context, - ] - ); - } - } - - $end = makeDate(); - - $this->logger->notice("SYSTEM: Sent '{total}' watch progress requests.", [ - 'total' => $total, - 'time' => [ - 'start' => $start, - 'end' => $end, - 'duration' => $end->getTimestamp() - $start->getTimestamp(), - ], - ]); - - $this->logger->notice("Using WatchState Version - '{version}'.", ['version' => getAppVersion()]); - } else { - $this->logger->notice('SYSTEM: No play state changes detected.'); - } - - if (false === $input->getOption('keep') && false === $input->getOption('dry-run')) { - $this->cache->delete('progress'); - } - - return self::SUCCESS; - } - - /** - * Renders and displays a list of items based on the specified output mode. - * - * @param InputInterface $input The input interface object. - * @param OutputInterface $output The output interface object. - * @param array $items An array of items to be listed. - * - * @return int The status code indicating the success of the method execution. - */ - private function listItems(InputInterface $input, OutputInterface $output, array $items): int - { - $list = []; - - $mode = $input->getOption('output'); - - foreach ($items as $item) { - if ('table' === $mode) { - $builder = [ - 'queued' => makeDate(ag($item->getExtra($item->via), iState::COLUMN_EXTRA_DATE))->format( - 'Y-m-d H:i:s T' - ), - 'via' => $item->via, - 'title' => $item->getName(), - 'played' => $item->isWatched() ? 'Yes' : 'No', - 'play_time' => formatDuration($item->getPlayProgress()), - 'tainted' => $item->isTainted() ? 'Yes' : 'No', - 'event' => ag($item->getExtra($item->via), iState::COLUMN_EXTRA_EVENT, '??'), - ]; - } else { - $builder = [ - ...$item->getAll(), - 'tainted' => $item->isTainted(), - ]; - } - - $list[] = $builder; - } - - $this->displayContent($list, $output, $mode); - - return self::SUCCESS; - } -} diff --git a/src/Commands/State/RequestsCommand.php b/src/Commands/State/RequestsCommand.php deleted file mode 100644 index 6a6008fa..00000000 --- a/src/Commands/State/RequestsCommand.php +++ /dev/null @@ -1,229 +0,0 @@ -setName(self::ROUTE) - ->setDescription('Process queued webhook requests.') - ->addOption('keep', 'k', InputOption::VALUE_NONE, 'Do not expunge queue after run is complete.') - ->addOption('list', 'l', InputOption::VALUE_NONE, 'List queued requests.') - ->addOption('dry-run', null, InputOption::VALUE_NONE, 'Do not commit changes to backends.') - ->addOption('no-stats', null, InputOption::VALUE_NONE, 'Do not display end of run stats.') - ->setHelp('This command process queued webhook requests.'); - } - - /** - * Make sure the command is not running in parallel. - * - * @param InputInterface $input The input interface. - * @param OutputInterface $output The output interface. - * - * @return int The exit code of the command. - * @throws \Psr\Cache\InvalidArgumentException if the $key string is not a legal value - */ - protected function runCommand(InputInterface $input, OutputInterface $output): int - { - return $this->single(fn(): int => $this->process($input, $output), $output); - } - - /** - * Run the command. - * - * @param InputInterface $input The input interface. - * @param OutputInterface $output The output interface. - * - * @return int The exit code of the command. - * @throws \Psr\Cache\InvalidArgumentException if the $key string is not a legal value - */ - protected function process(InputInterface $input, OutputInterface $output): int - { - if (!$this->cache->has('requests')) { - $this->logger->info('No requests in the queue.'); - return self::SUCCESS; - } - - $queued = []; - $requests = $this->cache->get('requests', []); - - if (count($requests) < 1) { - $this->logger->info('No requests in the queue.'); - return self::SUCCESS; - } - - if ($input->getOption('list')) { - return $this->listItems($input, $output, $requests); - } - - if ($input->getOption('dry-run')) { - $this->logger->info('Dry run mode. No changes will be committed.'); - } - - $this->mapper->setOptions([ - Options::DRY_RUN => $input->getOption('dry-run'), - Options::DEBUG_TRACE => $input->getOption('trace') - ]); - - $fn = function (iState $state) use (&$queued) { - $queued[$state->id] = $state; - }; - - foreach ($requests as $request) { - $entity = ag($request, 'entity'); - assert($entity instanceof iState); - - $options = ag($request, 'options', []); - - $lastSync = ag(Config::get("servers.{$entity->via}", []), 'import.lastSync'); - if (null !== $lastSync) { - $lastSync = makeDate($lastSync); - } - - $this->logger->notice('SYSTEM: Processing [{backend}] [{title}] {tainted} request.', [ - 'backend' => $entity->via, - 'title' => $entity->getName(), - 'event' => ag($entity->getExtra($entity->via), iState::COLUMN_EXTRA_EVENT, '??'), - 'tainted' => $entity->isTainted() ? 'tainted' : 'untainted', - 'lastSync' => $lastSync, - ]); - - $this->mapper->add($entity, [ - Options::IMPORT_METADATA_ONLY => (bool)ag($options, Options::IMPORT_METADATA_ONLY), - Options::STATE_UPDATE_EVENT => $fn, - 'after' => $lastSync, - ]); - } - - foreach ($queued as $item) { - queuePush($item); - } - - $operations = $this->mapper->commit(); - - $a = [ - [ - 'Type' => ucfirst(iState::TYPE_MOVIE), - 'Added' => $operations[iState::TYPE_MOVIE]['added'] ?? '-', - 'Updated' => $operations[iState::TYPE_MOVIE]['updated'] ?? '-', - 'Failed' => $operations[iState::TYPE_MOVIE]['failed'] ?? '-', - ], - new TableSeparator(), - [ - 'Type' => ucfirst(iState::TYPE_EPISODE), - 'Added' => $operations[iState::TYPE_EPISODE]['added'] ?? '-', - 'Updated' => $operations[iState::TYPE_EPISODE]['updated'] ?? '-', - 'Failed' => $operations[iState::TYPE_EPISODE]['failed'] ?? '-', - ], - ]; - - if (false === $input->getOption('no-stats')) { - (new Table($output)) - ->setHeaders(array_keys($a[0])) - ->setStyle('box') - ->setRows(array_values($a)) - ->render(); - } - - if (false === $input->getOption('keep') && false === $input->getOption('dry-run')) { - $this->cache->delete('requests'); - } - - return self::SUCCESS; - } - - /** - * Lists items based on the provided input and output. - * - * @param InputInterface $input The input interface object. - * @param OutputInterface $output The output interface object. - * @param array $requests The array of requests. - * - * @return int Returns the success status code. - */ - private function listItems(InputInterface $input, OutputInterface $output, array $requests): int - { - $list = []; - - $mode = $input->getOption('output'); - - foreach ($requests as $request) { - $opts = ag($request, 'options', []); - $item = ag($request, 'entity'); - - assert($item instanceof iState); - - if ('table' === $mode) { - $builder = [ - 'queued' => makeDate(ag($item->getExtra($item->via), iState::COLUMN_EXTRA_DATE))->format( - 'Y-m-d H:i:s T' - ), - 'via' => $item->via, - 'title' => $item->getName(), - 'played' => $item->isWatched() ? 'Yes' : 'No', - 'tainted' => $item->isTainted() ? 'Yes' : 'No', - 'event' => ag($item->getExtra($item->via), iState::COLUMN_EXTRA_EVENT, '??'), - ]; - } else { - $builder = [ - ...$item->getAll(), - 'tainted' => $item->isTainted(), - 'options' => $opts - ]; - } - - $list[] = $builder; - } - - $this->displayContent($list, $output, $mode); - - return self::SUCCESS; - } -} diff --git a/src/Libs/Database/DBLayer.php b/src/Libs/Database/DBLayer.php index 33c426b5..6cf5b7b0 100644 --- a/src/Libs/Database/DBLayer.php +++ b/src/Libs/Database/DBLayer.php @@ -46,6 +46,7 @@ final class DBLayer public const string IS_LEFT_OUTER_JOIN = 'LEFT OUTER JOIN'; public const string IS_MATCH_AGAINST = 'MATCH() AGAINST()'; public const string IS_JSON_CONTAINS = 'JSON_CONTAINS'; + public const string IS_JSON_EXTRACT = 'JSON_EXTRACT'; public const string IS_JSON_SEARCH = 'JSON_SEARCH'; public function __construct(private PDO $pdo) @@ -596,6 +597,23 @@ final class DBLayer $bind[$eBindName] = $opt[2]; break; + case self::IS_JSON_EXTRACT: + if (!isset($opt[1], $opt[2], $opt[3])) { + throw new RuntimeException('IS_JSON_CONTAINS: expects 3 parameters.'); + } + + $eBindName = '__db_je_' . random_int(1, 1000); + + $keys[] = sprintf( + "JSON_EXTRACT(%s, %s) %s %s", + $this->escapeIdentifier($column, true), + $opt[1], + $opt[2], + ':' . $eBindName, + ); + + $bind[$eBindName] = $opt[3]; + break; case self::IS_INNER_JOIN: case self::IS_LEFT_JOIN: case self::IS_LEFT_OUTER_JOIN: diff --git a/src/Libs/Events/DataEvent.php b/src/Libs/Events/DataEvent.php index 906761ba..9e33c9c0 100644 --- a/src/Libs/Events/DataEvent.php +++ b/src/Libs/Events/DataEvent.php @@ -16,6 +16,11 @@ class DataEvent extends Event return $this->eventInfo; } + public function getReference(): string|null + { + return $this->eventInfo->reference; + } + public function addLog(string $log): void { $this->eventInfo->logs[] = $log; @@ -30,4 +35,9 @@ class DataEvent extends Event { return $this->eventInfo->event_data; } + + public function getOptions(): array + { + return $this->eventInfo->options; + } } diff --git a/src/Libs/Exceptions/Backends/NotImplementedException.php b/src/Libs/Exceptions/Backends/NotImplementedException.php new file mode 100644 index 00000000..a08a44b9 --- /dev/null +++ b/src/Libs/Exceptions/Backends/NotImplementedException.php @@ -0,0 +1,14 @@ +bubble = true; + parent::__construct($level, true); + } + + public static function create(Closure $callback, $level = Level::Debug): self + { + return new self($callback, $level); + } + + public function close(): void + { + $this->closed = true; + } + + protected function write(LogRecord $record): void + { + if (true === $this->closed) { + return; + } + + $date = $record['datetime'] ?? 'No date set'; + + if (true === ($date instanceof DateTimeInterface)) { + $date = $date->format(DateTimeInterface::ATOM); + } + + $message = r('[{date}] {level}: {message}', [ + 'date' => $date, + 'level' => $record['level_name'] ?? $record['level'] ?? '??', + 'message' => $record['message'], + ]); + + if (false === empty($record['context']) && true === (bool)Config::get('logs.context')) { + $message .= ' { ' . arrayToString($record['context']) . ' }'; + } + + ($this->callback)($message, $record); + } +} diff --git a/src/Libs/Initializer.php b/src/Libs/Initializer.php index 4d07a4d0..03d007f8 100644 --- a/src/Libs/Initializer.php +++ b/src/Libs/Initializer.php @@ -43,7 +43,6 @@ final class Initializer private Cli $cli; private ConsoleOutput $cliOutput; private iLogger|null $accessLog = null; - private bool $booted = false; /** * Initializes the object. diff --git a/src/Libs/Mappers/Import/DirectMapper.php b/src/Libs/Mappers/Import/DirectMapper.php index f9ce2e0d..1e48a18c 100644 --- a/src/Libs/Mappers/Import/DirectMapper.php +++ b/src/Libs/Mappers/Import/DirectMapper.php @@ -4,13 +4,15 @@ declare(strict_types=1); namespace App\Libs\Mappers\Import; +use App\Libs\Config; use App\Libs\Container; use App\Libs\Database\DatabaseInterface as iDB; use App\Libs\Entity\StateInterface as iState; use App\Libs\Mappers\ImportInterface as iImport; use App\Libs\Message; use App\Libs\Options; -use DateInterval; +use App\Listeners\ProcessProgressEvent; +use App\Model\Events\EventsTable; use DateTimeInterface as iDate; use PDOException; use Psr\Log\LoggerInterface as iLogger; @@ -403,7 +405,7 @@ final class DirectMapper implements iImport $newPlayProgress = (int)ag($entity->getMetadata($entity->via), iState::COLUMN_META_DATA_PROGRESS); $oldPlayProgress = (int)ag($cloned->getMetadata($entity->via), iState::COLUMN_META_DATA_PROGRESS); - $playChanged = $newPlayProgress != $oldPlayProgress; + $playChanged = $newPlayProgress > ($oldPlayProgress + 10); // -- this sometimes leads to never ending updates as data from backends conflicts. if ($playChanged || true === (bool)ag($this->options, Options::MAPPER_ALWAYS_UPDATE_META)) { @@ -420,13 +422,18 @@ final class DirectMapper implements iImport $progress = !$entity->isWatched() && $playChanged && $entity->hasPlayProgress(); if (count($changes) >= 1) { + $_keys = array_merge($keys, [iState::COLUMN_EXTRA]); + if ($playChanged && $progress) { + $_keys[] = iState::COLUMN_VIA; + } + $local = $local->apply($entity, fields: $_keys); $this->logger->notice( $progress ? "MAPPER: '{backend}' updated '{title}' due to play progress change." : "MAPPER: '{backend}' updated '{title}' metadata.", [ 'id' => $cloned->id, 'backend' => $entity->via, 'title' => $cloned->getName(), - 'changes' => $changes, + 'changes' => $progress ? $local->diff(fields: $_keys) : $changes, ] ); } @@ -442,7 +449,7 @@ final class DirectMapper implements iImport 'id' => ag($entity->getMetadata($entity->via), iState::COLUMN_ID, '??'), ]); - $this->progressItems[$itemId] = $entity; + $this->progressItems[$itemId] = $local; } } @@ -740,13 +747,18 @@ final class DirectMapper implements iImport */ public function commit(): array { - if (true === (bool)env('WS_CRON_PROGRESS', false) && count($this->progressItems) >= 1) { + if (true === (bool)Config::get('sync.progress', false) && count($this->progressItems) >= 1) { try { - $progress = $this->cache->get('progress', []); - foreach ($this->progressItems as $itemId => $entity) { - $progress[$itemId] = $entity; + foreach ($this->progressItems as $entity) { + queueEvent(ProcessProgressEvent::NAME, [iState::COLUMN_ID => $entity->id], [ + 'unique' => true, + EventsTable::COLUMN_REFERENCE => r('{type}://{id}@{backend}', [ + 'type' => $entity->type, + 'backend' => $entity->via, + 'id' => ag($entity->getMetadata($entity->via), iState::COLUMN_ID, '??'), + ]), + ]); } - $this->cache->set('progress', $progress, new DateInterval('P3D')); } catch (\Psr\SimpleCache\InvalidArgumentException) { } } @@ -828,6 +840,14 @@ final class DirectMapper implements iImport return $this; } + /** + * @inheritdoc + */ + public function getLogger(): iLogger + { + return $this->logger; + } + /** * @inheritdoc */ diff --git a/src/Libs/Mappers/Import/MemoryMapper.php b/src/Libs/Mappers/Import/MemoryMapper.php index 3cf9cda4..ba3e69d3 100644 --- a/src/Libs/Mappers/Import/MemoryMapper.php +++ b/src/Libs/Mappers/Import/MemoryMapper.php @@ -4,12 +4,14 @@ declare(strict_types=1); namespace App\Libs\Mappers\Import; +use App\Libs\Config; use App\Libs\Database\DatabaseInterface as iDB; use App\Libs\Entity\StateInterface as iState; use App\Libs\Mappers\ImportInterface as iImport; use App\Libs\Message; use App\Libs\Options; -use DateInterval; +use App\Listeners\ProcessProgressEvent; +use App\Model\Events\EventsTable; use DateTimeInterface as iDate; use PDOException; use Psr\Log\LoggerInterface as iLogger; @@ -277,7 +279,7 @@ final class MemoryMapper implements iImport $newPlayProgress = (int)ag($entity->getMetadata($entity->via), iState::COLUMN_META_DATA_PROGRESS); $oldPlayProgress = (int)ag($cloned->getMetadata($entity->via), iState::COLUMN_META_DATA_PROGRESS); - $playChanged = $newPlayProgress != $oldPlayProgress; + $playChanged = $newPlayProgress > ($oldPlayProgress + 10); // -- this sometimes leads to never ending updates as data from backends conflicts. if ($playChanged || true === (bool)ag($this->options, Options::MAPPER_ALWAYS_UPDATE_META)) { @@ -293,16 +295,23 @@ final class MemoryMapper implements iImport $this->removePointers($cloned)->addPointers($this->objects[$pointer], $pointer); $changes = $this->objects[$pointer]->diff(fields: $keys); - $progress = !$entity->isWatched() && $playChanged && $entity->hasPlayProgress(); + if (count($changes) >= 1) { + $_keys = array_merge($keys, [iState::COLUMN_EXTRA]); + if ($playChanged && $progress) { + $_keys[] = iState::COLUMN_VIA; + } + + $this->objects[$pointer] = $this->objects[$pointer]->apply(entity: $entity, fields: $_keys); + $this->logger->notice( $progress ? "MAPPER: '{backend}' updated '{title}' due to play progress change." : "MAPPER: '{backend}' updated '{title}' metadata.", [ 'id' => $cloned->id, 'backend' => $entity->via, 'title' => $cloned->getName(), - 'changes' => $changes, + 'changes' => $progress ? $this->objects[$pointer]->diff(fields: $_keys) : $changes, 'fields' => implode(',', $keys), ] ); @@ -315,7 +324,7 @@ final class MemoryMapper implements iImport 'id' => ag($entity->getMetadata($entity->via), iState::COLUMN_ID, '??'), ]); - $this->progressItems[$itemId] = $entity; + $this->progressItems[$itemId] = $this->objects[$pointer]; } } @@ -543,13 +552,18 @@ final class MemoryMapper implements iImport public function commit(): mixed { if (true !== $this->inDryRunMode()) { - if (true === (bool)env('WS_CRON_PROGRESS', false) && count($this->progressItems) >= 1) { + if (true === (bool)Config::get('sync.progress', false) && count($this->progressItems) >= 1) { try { - $progress = $this->cache->get('progress', []); - foreach ($this->progressItems as $itemId => $entity) { - $progress[$itemId] = $entity; + foreach ($this->progressItems as $entity) { + queueEvent(ProcessProgressEvent::NAME, [iState::COLUMN_ID => $entity->id], [ + 'unique' => true, + EventsTable::COLUMN_REFERENCE => r('{type}://{id}@{backend}', [ + 'type' => $entity->type, + 'backend' => $entity->via, + 'id' => ag($entity->getMetadata($entity->via), iState::COLUMN_ID, '??'), + ]), + ]); } - $this->cache->set('progress', $progress, new DateInterval('P3D')); } catch (\Psr\SimpleCache\InvalidArgumentException) { } } @@ -655,6 +669,14 @@ final class MemoryMapper implements iImport return $this; } + /** + * @inheritdoc + */ + public function getLogger(): iLogger + { + return $this->logger; + } + /** * @inheritdoc */ diff --git a/src/Libs/Mappers/Import/RestoreMapper.php b/src/Libs/Mappers/Import/RestoreMapper.php index 88ed6978..0273a5de 100644 --- a/src/Libs/Mappers/Import/RestoreMapper.php +++ b/src/Libs/Mappers/Import/RestoreMapper.php @@ -195,6 +195,14 @@ final class RestoreMapper implements iImport return $this; } + /** + * @inheritdoc + */ + public function getLogger(): iLogger + { + return $this->logger; + } + /** * @inheritdoc */ diff --git a/src/Libs/Mappers/ImportInterface.php b/src/Libs/Mappers/ImportInterface.php index 70908301..13fc86df 100644 --- a/src/Libs/Mappers/ImportInterface.php +++ b/src/Libs/Mappers/ImportInterface.php @@ -108,6 +108,13 @@ interface ImportInterface extends Countable */ public function setLogger(LoggerInterface $logger): self; + /** + * Get the logger instance. + * + * @return LoggerInterface The logger instance. + */ + public function getLogger(): LoggerInterface; + /** * Set the database object for this class. * diff --git a/src/Libs/Options.php b/src/Libs/Options.php index e1511306..6a83f05f 100644 --- a/src/Libs/Options.php +++ b/src/Libs/Options.php @@ -37,6 +37,7 @@ final class Options public const string NO_FALLBACK = 'NO_FALLBACK'; public const string LIMIT_RESULTS = 'LIMIT_RESULTS'; public const string NO_CHECK = 'NO_CHECK'; + public const string LOG_WRITER = 'LOG_WRITER'; private function __construct() { diff --git a/src/Libs/helpers.php b/src/Libs/helpers.php index 438ab962..f9de11cc 100644 --- a/src/Libs/helpers.php +++ b/src/Libs/helpers.php @@ -10,6 +10,7 @@ use App\Libs\APIResponse; use App\Libs\Attributes\Scanner\Attributes as AttributesScanner; use App\Libs\Attributes\Scanner\Item as ScannerItem; use App\Libs\Config; +use App\Libs\ConfigFile; use App\Libs\Container; use App\Libs\DataUtil; use App\Libs\Entity\StateInterface as iState; @@ -29,6 +30,7 @@ use App\Libs\Uri; use App\Model\Events\Event as EventInfo; use App\Model\Events\EventListener; use App\Model\Events\EventsRepository; +use App\Model\Events\EventsTable; use App\Model\Events\EventStatus; use Monolog\Utils; use Nyholm\Psr7\Factory\Psr17Factory; @@ -467,7 +469,7 @@ if (!function_exists('api_error')) { $response = api_response( status: $httpCode, body: array_replace_recursive($body, [ - 'error' => [ + ag($opts, 'top_key', 'error') => [ 'code' => $httpCode->value, 'message' => $message ] @@ -488,6 +490,32 @@ if (!function_exists('api_error')) { } } +if (!function_exists('api_message')) { + /** + * Create API message response. + * + * @param string $message The error message. + * @param Status|int $httpCode Optional. The HTTP status code. Default is {@see Status::OK}. + * @param array $body Optional. Additional fields to include in the response body. + * @param array $headers Optional. Additional headers to include in the response. + * @param string|null $reason Optional. The reason phrase to include in the response. Default is null. + * @param array $opts Optional. Additional options. + * + * @return iResponse A PSR-7 compatible response object. + */ + function api_message( + string $message, + Status|int $httpCode = Status::OK, + array $body = [], + array $headers = [], + string|null $reason = null, + array $opts = [] + ): iResponse { + $opts['top_key'] = 'info'; + return api_error($message, $httpCode, $body, $headers, $reason, $opts); + } +} + if (!function_exists('httpClientChunks')) { /** * Handle response stream as chunks. @@ -735,7 +763,7 @@ if (!function_exists('getAppVersion')) { $proc = Process::fromShellCommandline(sprintf($cmd, escapeshellarg($gitDir))); $proc->run(); if ($proc->isSuccessful()) { - return explode(PHP_EOL, $proc->getOutput())[0]; + return trim(explode(PHP_EOL, $proc->getOutput())[0]); } } } @@ -1959,7 +1987,24 @@ if (!function_exists('queueEvent')) { $repo = ag($opts, EventsRepository::class, fn() => Container::get(EventsRepository::class)); assert($repo instanceof EventsRepository); - $item = $repo->getObject([]); + $item = null; + if (null !== ($reference = ag($opts, EventsTable::COLUMN_REFERENCE))) { + $criteria = []; + $isUnique = (bool)ag($opts, 'unique', false); + + if (false === $isUnique) { + $criteria[EventsTable::COLUMN_STATUS] = EventStatus::PENDING->value; + } + + if (null !== ($refItem = $repo->findByReference($reference, $criteria)) && true === $isUnique) { + $repo->remove($refItem); + } else { + $item = $refItem; + } + unset($refItem); + } + + $item = $item ?? $repo->getObject([]); $item->event = $event; $item->status = EventStatus::PENDING; $item->event_data = $data; @@ -1968,9 +2013,62 @@ if (!function_exists('queueEvent')) { 'class' => ag($opts, 'class', DataEvent::class), ]; + if (ag_exists($opts, EventsTable::COLUMN_OPTIONS) && is_array($opts[EventsTable::COLUMN_OPTIONS])) { + $item->options = array_replace_recursive($opts[EventsTable::COLUMN_OPTIONS], $item->options); + } + + if ($reference) { + $item->reference = $reference; + } + $id = $repo->save($item); $item->id = $id; return $item; } } + +if (!function_exists('getPagination')) { + function getPagination(iRequest $request, int $page = 1, int $perpage = 0, array $options = []): array + { + $page = (int)($request->getQueryParams()['page'] ?? $page); + + if (0 === $perpage) { + $perpage = 25; + } + + if (false === array_key_exists('force_perpage', $options)) { + $perpage = (int)($request->getQueryParams()['perpage'] ?? $perpage); + } + + $start = (($page <= 2) ? ((1 === $page) ? 0 : $perpage) : $perpage * ($page - 1)); + $start = (!$page) ? 0 : $start; + + return [$page, $perpage, $start]; + } +} + +if (!function_exists('getBackend')) { + /** + * Retrieves the backend client for the specified name. + * + * @param string $name The name of the backend. + * @param array $config (Optional) Override the default configuration for the backend. + * + * @return iClient The backend client instance. + * @throws RuntimeException If no backend with the specified name is found. + */ + function getBackend(string $name, array $config = []): iClient + { + $configFile = ConfigFile::open(Config::get('backends_file'), 'yaml'); + + if (null === $configFile->get("{$name}.type", null)) { + throw new RuntimeException(r("No backend named '{backend}' was found.", ['backend' => $name])); + } + + $default = $configFile->get($name); + $default['name'] = $name; + + return makeBackend(array_replace_recursive($default, $config), $name); + } +} diff --git a/src/Listeners/OnTestEvent.php b/src/Listeners/OnTestEvent.php deleted file mode 100644 index 54999f9c..00000000 --- a/src/Listeners/OnTestEvent.php +++ /dev/null @@ -1,20 +0,0 @@ -stopPropagation(); - return $e; - } -} diff --git a/src/Listeners/ProcessProgressEvent.php b/src/Listeners/ProcessProgressEvent.php new file mode 100644 index 00000000..88007efa --- /dev/null +++ b/src/Listeners/ProcessProgressEvent.php @@ -0,0 +1,256 @@ +addLog($level->getName() . ': ' . r($message, $context)); + $this->logger->log($level, $message, $context); + }; + + $e->stopPropagation(); + + $options = $e->getOptions(); + + if (null === ($item = $this->db->get(Container::get(iState::class)::fromArray($e->getData())))) { + $writer(Level::Error, "Item '{id}' Is not referenced locally yet.", ['id' => ag($e->getData(), 'id', '?')]); + return $e; + } + + if ($item->isWatched()) { + $writer(Level::Info, "Item '{id}: {title}' is marked as watched. Not updating watch process.", [ + 'id' => $item->id, + 'title' => $item->getName() + ]); + return $e; + } + + if (false === $item->hasPlayProgress()) { + $writer(Level::Info, "Item '{title}' has no watch progress to export.", ['title' => $item->title]); + return $e; + } + + $list = []; + + $supported = Config::get('supported', []); + + foreach ((array)Config::get('servers', []) as $backendName => $backend) { + $type = strtolower(ag($backend, 'type', 'unknown')); + + if (true !== (bool)ag($backend, 'export.enabled')) { + $writer(Level::Notice, "SYSTEM: Export to '{backend}' is disabled by user.", [ + 'backend' => $backendName + ]); + continue; + } + + if (!isset($supported[$type])) { + $writer(Level::Error, "SYSTEM: '{backend}' Invalid type.", [ + 'backend' => $backendName, + 'condition' => [ + 'expected' => implode(', ', array_keys($supported)), + 'given' => $type, + ], + ]); + continue; + } + + if (null === ($url = ag($backend, 'url')) || false === isValidURL($url)) { + $writer(Level::Error, "SYSTEM: '{backend}' Invalid url.", [ + 'backend' => $backendName, + 'url' => $url ?? 'None', + ]); + continue; + } + + $backend['name'] = $backendName; + $list[$backendName] = $backend; + } + + if (empty($list)) { + $writer(Level::Info, 'SYSTEM: There are no backends with export enabled.'); + return $e; + } + + foreach ($list as $name => &$backend) { + try { + $opts = ag($backend, 'options', []); + + if (ag($options, 'ignore-date')) { + $opts[Options::IGNORE_DATE] = true; + } + + if (ag($options, 'dry-run')) { + $opts[Options::DRY_RUN] = true; + } + + if (ag($options, 'trace')) { + $opts[Options::DEBUG_TRACE] = true; + } + + $backend['options'] = $opts; + $backend['class'] = getBackend(name: $name, config: $backend); + $backend['class']->progress(entities: [$item->id => $item], queue: $this->queue); + } catch (UnexpectedVersionException|NotImplementedException $e) { + $writer( + Level::Notice, + "SYSTEM: This feature is not available for '{backend}'. '{error.message}' at '{error.file}:{error.line}'.", + [ + 'backend' => $name, + 'error' => [ + 'kind' => $e::class, + 'line' => $e->getLine(), + 'message' => $e->getMessage(), + 'file' => after($e->getFile(), ROOT_PATH), + ], + 'exception' => [ + 'file' => $e->getFile(), + 'line' => $e->getLine(), + 'kind' => get_class($e), + 'message' => $e->getMessage(), + 'trace' => $e->getTrace(), + ], + ] + ); + } catch (Throwable $e) { + $writer( + Level::Error, + "SYSTEM: Exception '{error.kind}' was thrown unhandled during '{backend}' request to sync progress. '{error.message}' at '{error.file}:{error.line}'.", + [ + 'backend' => $name, + 'error' => [ + 'kind' => $e::class, + 'line' => $e->getLine(), + 'message' => $e->getMessage(), + 'file' => after($e->getFile(), ROOT_PATH), + ], + 'exception' => [ + 'file' => $e->getFile(), + 'line' => $e->getLine(), + 'kind' => get_class($e), + 'message' => $e->getMessage(), + 'trace' => $e->getTrace(), + ], + ] + ); + } + } + + unset($backend); + + $total = count($this->queue); + + if ($total >= 1) { + $start = makeDate(); + + $writer(Level::Notice, "SYSTEM: Sending '{total}' progress update requests.", [ + 'total' => $total, + 'time' => [ + 'start' => $start, + ], + ]); + + foreach ($this->queue->getQueue() as $response) { + $context = ag($response->getInfo('user_data'), 'context', []); + + try { + if (ag($options, 'trace')) { + $writer(Level::Debug, "Processing '{backend}' - '{item.title}' response.", [ + 'url' => ag($context, 'remote.url', '??'), + 'status_code' => $response->getStatusCode(), + 'headers' => $response->getHeaders(false), + 'response' => $response->getContent(false), + ...$context + ]); + } + + if (!in_array($response->getStatusCode(), [Status::OK->value, Status::NO_CONTENT->value])) { + $writer( + Level::Error, + "SYSTEM: Request to change '{backend}' '{item.title}' watch progress returned with unexpected '{status_code}' status code.", + [ + 'status_code' => $response->getStatusCode(), + ...$context + ] + ); + continue; + } + + $writer(Level::Notice, "SYSTEM: Updated '{backend}' '{item.title}' watch progress.", [ + ...$context, + 'status_code' => $response->getStatusCode(), + ]); + } catch (Throwable $e) { + $writer( + Level::Error, + "SYSTEM: Exception '{error.kind}' was thrown unhandled during '{backend}' request to change watch progress of {item.type} '{item.title}'. '{error.message}' at '{error.file}:{error.line}'.", + [ + 'error' => [ + 'kind' => $e::class, + 'line' => $e->getLine(), + 'message' => $e->getMessage(), + 'file' => after($e->getFile(), ROOT_PATH), + ], + 'exception' => [ + 'file' => $e->getFile(), + 'line' => $e->getLine(), + 'kind' => get_class($e), + 'message' => $e->getMessage(), + 'trace' => $e->getTrace(), + ], + ...$context, + ] + ); + } + } + + $end = makeDate(); + $writer(Level::Notice, "SYSTEM: Sent '{total}' watch progress requests.", [ + 'total' => $total, + 'time' => [ + 'start' => $start, + 'end' => $end, + 'duration' => $end->getTimestamp() - $start->getTimestamp(), + ], + ]); + } else { + $writer(Level::Notice, 'SYSTEM: No watch progress changes detected.'); + } + + return $e; + } +} diff --git a/src/Listeners/ProcessRequestEvent.php b/src/Listeners/ProcessRequestEvent.php new file mode 100644 index 00000000..95f4b9bc --- /dev/null +++ b/src/Listeners/ProcessRequestEvent.php @@ -0,0 +1,78 @@ +stopPropagation(); + + $entity = Container::get(iState::class)::fromArray($e->getData()); + + if (null !== ($lastSync = ag(Config::get("servers.{$entity->via}", []), 'import.lastSync'))) { + $lastSync = makeDate($lastSync); + } + + $message = r('SYSTEM: Processing [{backend}] [{title}] {tainted} request.', [ + 'backend' => $entity->via, + 'title' => $entity->getName(), + 'event' => ag($entity->getExtra($entity->via), iState::COLUMN_EXTRA_EVENT, '??'), + 'tainted' => $entity->isTainted() ? 'tainted' : 'untainted', + 'lastSync' => $lastSync, + ]); + + $e->addLog($message); + $this->logger->notice($message); + + $logger = clone $this->logger; + assert($logger instanceof Logger); + + $handler = ProxyHandler::create($e->addLog(...)); + $logger->pushHandler($handler); + + $oldLogger = $this->mapper->getLogger(); + $this->mapper->setLogger($logger); + + $metadataOnly = (bool)ag($e->getOptions(), Options::IMPORT_METADATA_ONLY); + $this->mapper->add($entity, [ + Options::IMPORT_METADATA_ONLY => $metadataOnly, + Options::STATE_UPDATE_EVENT => fn(iState $state) => queuePush($state), + 'after' => $lastSync, + ]); + + $this->mapper->commit(); + $this->mapper->setLogger($oldLogger); + + $handler->close(); + + return $e; + } +} diff --git a/src/Model/Events/Event.php b/src/Model/Events/Event.php index 33c0af0f..53288bd9 100644 --- a/src/Model/Events/Event.php +++ b/src/Model/Events/Event.php @@ -25,6 +25,11 @@ final class Event extends EntityTable */ public EventStatus $status = EventStatus::PENDING; + /** + * @uses EntityTable::COLUMN_REFERENCE + */ + public string|null $reference = null; + /** * @uses EntityTable::COLUMN_EVENT */ diff --git a/src/Model/Events/EventsRepository.php b/src/Model/Events/EventsRepository.php index 34b38195..7d767185 100644 --- a/src/Model/Events/EventsRepository.php +++ b/src/Model/Events/EventsRepository.php @@ -30,6 +30,25 @@ final class EventsRepository return $this->_findOne([$this->primaryKey => $id]); } + /** + * Will return the last event by reference. + * + * @param string|int $reference Reference to search by. + * @param array $criteria Criteria to search by. + * + * @return EntityItem|null The event or null if not found. + */ + public function findByReference(string|int $reference, array $criteria = []): EntityItem|null + { + $criteria[EntityTable::COLUMN_REFERENCE] = $reference; + + $items = (clone $this)->setPerpage(1)->setStart(0)->setDescendingOrder() + ->setSort(EntityTable::COLUMN_CREATED_AT) + ->findAll($criteria); + + return $items[0] ?? null; + } + /** * @param array $criteria Criteria to search by. * @param array $cols Columns to select. diff --git a/src/Model/Events/EventsTable.php b/src/Model/Events/EventsTable.php index ecbbe668..027bef3c 100644 --- a/src/Model/Events/EventsTable.php +++ b/src/Model/Events/EventsTable.php @@ -13,6 +13,7 @@ abstract class EventsTable extends BasicModel public const string COLUMN_ID = 'id'; public const string COLUMN_STATUS = 'status'; + public const string COLUMN_REFERENCE = 'reference'; public const string COLUMN_EVENT = 'event'; public const string COLUMN_EVENT_DATA = 'event_data'; public const string COLUMN_OPTIONS = 'options';