From 302f94576366baa4fdf52c08492481809005dc8d Mon Sep 17 00:00:00 2001 From: "Abdulmhsen B. A. A" Date: Fri, 10 Nov 2023 18:57:19 +0300 Subject: [PATCH] Added Progress command, it's still experimental and only works correctly for plex at the moment. do not use it yet. --- src/Backends/Common/ClientInterface.php | 11 + src/Backends/Emby/Action/Progress.php | 9 + src/Backends/Emby/EmbyClient.php | 21 ++ src/Backends/Jellyfin/Action/Progress.php | 176 +++++++++++++ src/Backends/Jellyfin/JellyfinClient.php | 21 ++ src/Backends/Plex/Action/Progress.php | 168 ++++++++++++ src/Backends/Plex/PlexClient.php | 21 ++ src/Commands/State/ProgressCommand.php | 307 ++++++++++++++++++++++ src/Libs/Entity/StateEntity.php | 45 ++++ src/Libs/Entity/StateInterface.php | 6 + src/Libs/Initializer.php | 4 +- 11 files changed, 787 insertions(+), 2 deletions(-) create mode 100644 src/Backends/Emby/Action/Progress.php create mode 100644 src/Backends/Jellyfin/Action/Progress.php create mode 100644 src/Backends/Plex/Action/Progress.php create mode 100644 src/Commands/State/ProgressCommand.php diff --git a/src/Backends/Common/ClientInterface.php b/src/Backends/Common/ClientInterface.php index c125f801..6ce2531e 100644 --- a/src/Backends/Common/ClientInterface.php +++ b/src/Backends/Common/ClientInterface.php @@ -110,6 +110,17 @@ interface ClientInterface */ public function push(array $entities, QueueRequests $queue, iDate|null $after = null): array; + /** + * Compare watch progress and push to backend. + * + * @param array $entities + * @param QueueRequests $queue + * @param iDate|null $after + * + * @return array + */ + public function progress(array $entities, QueueRequests $queue, iDate|null $after = null): array; + /** * Search backend libraries. * diff --git a/src/Backends/Emby/Action/Progress.php b/src/Backends/Emby/Action/Progress.php new file mode 100644 index 00000000..7e4f7f14 --- /dev/null +++ b/src/Backends/Emby/Action/Progress.php @@ -0,0 +1,9 @@ +context, + entities: $entities, + queue: $queue, + after: $after + ); + + if ($response->hasError()) { + $this->logger->log($response->error->level(), $response->error->message, $response->error->context); + } + + if (false === $response->isSuccessful()) { + throw new RuntimeException(ag($response->extra, 'message', fn() => $response->error->format())); + } + + return []; + } + public function search(string $query, int $limit = 25, array $opts = []): array { $response = Container::get(SearchQuery::class)( diff --git a/src/Backends/Jellyfin/Action/Progress.php b/src/Backends/Jellyfin/Action/Progress.php new file mode 100644 index 00000000..01aff953 --- /dev/null +++ b/src/Backends/Jellyfin/Action/Progress.php @@ -0,0 +1,176 @@ + $entities + * @param QueueRequests $queue + * @param DateTimeInterface|null $after + * @return Response + */ + public function __invoke( + Context $context, + array $entities, + QueueRequests $queue, + DateTimeInterface|null $after = null + ): Response { + return $this->tryResponse(context: $context, fn: fn() => $this->action($context, $entities, $queue, $after)); + } + + private function action( + Context $context, + array $entities, + QueueRequests $queue, + DateTimeInterface|null $after = null + ): Response { + $ignoreDate = (bool)ag($context->options, Options::IGNORE_DATE, false); + + foreach ($entities as $key => $entity) { + if (true !== ($entity instanceof iState)) { + continue; + } + + if (null !== $after && false === (bool)ag($context->options, Options::IGNORE_DATE, false)) { + if ($after->getTimestamp() > $entity->updated) { + continue; + } + } + + $metadata = $entity->getMetadata($context->backendName); + + $logContext = [ + 'item' => [ + 'id' => $entity->id, + 'type' => $entity->type, + 'title' => $entity->getName(), + ], + ]; + + if (null === ag($metadata, iState::COLUMN_ID, null)) { + $this->logger->warning( + 'Ignoring [{item.title}] for [{backend}]. No metadata was found.', + [ + 'backend' => $context->backendName, + ...$logContext, + ] + ); + continue; + } + + if (null === ($senderDate = ag($entity->getExtra($entity->via), iState::COLUMN_EXTRA_DATE))) { + $this->logger->warning('Ignoring [{item.title}] for [{backend}]. No Sender has set no date.', [ + 'backend' => $context->backendName, + ...$logContext, + ]); + continue; + } + + if ($context->backendName === $entity->via) { + $this->logger->debug('Ignoring event as it was originated from this backend.', [ + 'backend' => $context->backendName, + ...$logContext, + ]); + continue; + } + + if (null !== ($datetime = ag($entity->getExtra($context->backendName), iState::COLUMN_EXTRA_DATE, null))) { + if (false === $ignoreDate && makeDate($datetime) > makeDate($senderDate)) { + $this->logger->warning( + 'Ignoring [{item.title}] for [{backend}]. Sender date is older than backend date.', + [ + 'backend' => $context->backendName, + ...$logContext, + ] + ); + continue; + } + } + + $logContext['remote']['id'] = ag($metadata, iState::COLUMN_ID); + + try { + $url = $context->backendUrl->withPath( + r('/Users/{user_id}/PlayingItems/{item_id}/Progress', [ + 'user_id' => $context->backendUser, + 'item_id' => $logContext['remote']['id'], + ]) + )->withQuery( + http_build_query([ + 'mediaSourceId' => $logContext['remote']['id'], + 'positionTicks' => floor($entity->getPlayProgress() * 1_00_00), + ]) + ); + + $logContext['remote']['url'] = (string)$url; + + $this->logger->debug('Updating [{backend}] {item.type} [{item.title}] watch progress.', [ + 'backend' => $context->backendName, + ...$logContext, + ]); + + if (false === (bool)ag($context->options, Options::DRY_RUN, false)) { + $queue->add( + $this->http->request( + 'POST', + (string)$url, + array_replace_recursive($context->backendHeaders, [ + 'json' => [ + 'UserData' => [ + + ], + ], + 'user_data' => [ + 'id' => $key, + 'context' => $logContext + [ + 'backend' => $context->backendName, + ], + ], + ]) + ) + ); + } + } catch (Throwable $e) { + $this->logger->error( + 'Unhandled exception was thrown during request to change [{backend}] {item.type} [{item.title}] watch progress.', + [ + 'backend' => $context->backendName, + ...$logContext, + 'exception' => [ + 'file' => $e->getFile(), + 'line' => $e->getLine(), + 'kind' => get_class($e), + 'message' => $e->getMessage(), + 'trace' => $context->trace ? $e->getTrace() : [], + ], + ] + ); + } + } + + return new Response(status: true, response: $queue); + } +} diff --git a/src/Backends/Jellyfin/JellyfinClient.php b/src/Backends/Jellyfin/JellyfinClient.php index 0abd7c89..f63b3fde 100644 --- a/src/Backends/Jellyfin/JellyfinClient.php +++ b/src/Backends/Jellyfin/JellyfinClient.php @@ -18,6 +18,7 @@ use App\Backends\Jellyfin\Action\GetUsersList; use App\Backends\Jellyfin\Action\Import; use App\Backends\Jellyfin\Action\InspectRequest; use App\Backends\Jellyfin\Action\ParseWebhook; +use App\Backends\Jellyfin\Action\Progress; use App\Backends\Jellyfin\Action\Push; use App\Backends\Jellyfin\Action\SearchId; use App\Backends\Jellyfin\Action\SearchQuery; @@ -253,6 +254,26 @@ class JellyfinClient implements iClient return []; } + public function progress(array $entities, QueueRequests $queue, iDate|null $after = null): array + { + $response = Container::get(Progress::class)( + context: $this->context, + entities: $entities, + queue: $queue, + after: $after + ); + + if ($response->hasError()) { + $this->logger->log($response->error->level(), $response->error->message, $response->error->context); + } + + if (false === $response->isSuccessful()) { + throw new RuntimeException(ag($response->extra, 'message', fn() => $response->error->format())); + } + + return []; + } + public function search(string $query, int $limit = 25, array $opts = []): array { $response = Container::get(SearchQuery::class)( diff --git a/src/Backends/Plex/Action/Progress.php b/src/Backends/Plex/Action/Progress.php new file mode 100644 index 00000000..fbb31f52 --- /dev/null +++ b/src/Backends/Plex/Action/Progress.php @@ -0,0 +1,168 @@ + $entities + * @param QueueRequests $queue + * @param DateTimeInterface|null $after + * @return Response + */ + public function __invoke( + Context $context, + array $entities, + QueueRequests $queue, + DateTimeInterface|null $after = null + ): Response { + return $this->tryResponse(context: $context, fn: fn() => $this->action($context, $entities, $queue, $after)); + } + + private function action( + Context $context, + array $entities, + QueueRequests $queue, + DateTimeInterface|null $after = null + ): Response { + $ignoreDate = (bool)ag($context->options, Options::IGNORE_DATE, false); + + foreach ($entities as $key => $entity) { + if (true !== ($entity instanceof iState)) { + continue; + } + + if (null !== $after && false === (bool)ag($context->options, Options::IGNORE_DATE, false)) { + if ($after->getTimestamp() > $entity->updated) { + continue; + } + } + + $metadata = $entity->getMetadata($context->backendName); + + $logContext = [ + 'item' => [ + 'id' => $entity->id, + 'type' => $entity->type, + 'title' => $entity->getName(), + ], + ]; + + if (null === ag($metadata, iState::COLUMN_ID, null)) { + $this->logger->warning( + 'Ignoring [{item.title}] for [{backend}]. No metadata was found.', + [ + 'backend' => $context->backendName, + ...$logContext, + ] + ); + continue; + } + + if (null === ($senderDate = ag($entity->getExtra($entity->via), iState::COLUMN_EXTRA_DATE))) { + $this->logger->warning('Ignoring [{item.title}] for [{backend}]. No Sender has set no date.', [ + 'backend' => $context->backendName, + ...$logContext, + ]); + continue; + } + + if ($context->backendName === $entity->via) { + $this->logger->debug('Ignoring event as it was originated from this backend.', [ + 'backend' => $context->backendName, + ...$logContext, + ]); + continue; + } + + if (null !== ($datetime = ag($entity->getExtra($context->backendName), iState::COLUMN_EXTRA_DATE, null))) { + if (false === $ignoreDate && makeDate($datetime) > makeDate($senderDate)) { + $this->logger->warning( + 'Ignoring [{item.title}] for [{backend}]. Sender date is older than backend date.', + [ + 'backend' => $context->backendName, + ...$logContext, + ] + ); + continue; + } + } + + $logContext['remote']['id'] = ag($metadata, iState::COLUMN_ID); + + try { + $url = $context->backendUrl->withPath('/:/progress/')->withQuery( + http_build_query([ + 'key' => $logContext['remote']['id'], + 'identifier' => 'com.plexapp.plugins.library', + 'state' => 'stopped', + 'time' => $entity->getPlayProgress(), + ]) + ); + + $logContext['remote']['url'] = (string)$url; + + $this->logger->debug('Updating [{backend}] {item.type} [{item.title}] watch progress.', [ + 'backend' => $context->backendName, + ...$logContext, + ]); + + if (false === (bool)ag($context->options, Options::DRY_RUN, false)) { + $queue->add( + $this->http->request( + 'POST', + (string)$url, + array_replace_recursive($context->backendHeaders, [ + 'user_data' => [ + 'id' => $key, + 'context' => $logContext + [ + 'backend' => $context->backendName, + ], + ], + ]) + ) + ); + } + } catch (Throwable $e) { + $this->logger->error( + 'Unhandled exception was thrown during request to change [{backend}] {item.type} [{item.title}] watch progress.', + [ + 'backend' => $context->backendName, + ...$logContext, + 'exception' => [ + 'file' => $e->getFile(), + 'line' => $e->getLine(), + 'kind' => get_class($e), + 'message' => $e->getMessage(), + 'trace' => $context->trace ? $e->getTrace() : [], + ], + ] + ); + } + } + + return new Response(status: true, response: $queue); + } +} diff --git a/src/Backends/Plex/PlexClient.php b/src/Backends/Plex/PlexClient.php index e7b72112..f2198bbe 100644 --- a/src/Backends/Plex/PlexClient.php +++ b/src/Backends/Plex/PlexClient.php @@ -19,6 +19,7 @@ use App\Backends\Plex\Action\GetUserToken; use App\Backends\Plex\Action\Import; use App\Backends\Plex\Action\InspectRequest; use App\Backends\Plex\Action\ParseWebhook; +use App\Backends\Plex\Action\Progress; use App\Backends\Plex\Action\Push; use App\Backends\Plex\Action\SearchId; use App\Backends\Plex\Action\SearchQuery; @@ -259,6 +260,26 @@ class PlexClient implements iClient return []; } + public function progress(array $entities, QueueRequests $queue, iDate|null $after = null): array + { + $response = Container::get(Progress::class)( + context: $this->context, + entities: $entities, + queue: $queue, + after: $after + ); + + if ($response->hasError()) { + $this->logger->log($response->error->level(), $response->error->message, $response->error->context); + } + + if (false === $response->isSuccessful()) { + throw new RuntimeException(ag($response->extra, 'message', fn() => $response->error->format())); + } + + return []; + } + public function search(string $query, int $limit = 25, array $opts = []): array { $response = Container::get(SearchQuery::class)( diff --git a/src/Commands/State/ProgressCommand.php b/src/Commands/State/ProgressCommand.php new file mode 100644 index 00000000..c4556b73 --- /dev/null +++ b/src/Commands/State/ProgressCommand.php @@ -0,0 +1,307 @@ +setName(self::ROUTE) + ->setDescription('Push queued watch progress.') + ->addOption('keep', 'k', InputOption::VALUE_NONE, 'Do not expunge queue after run is complete.') + ->addOption('dry-run', null, InputOption::VALUE_NONE, 'Do not commit changes to backends.') + ->addOption('list', 'l', InputOption::VALUE_NONE, 'List queued items.') + ->addOption('ignore-date', null, InputOption::VALUE_NONE, 'Ignore date comparison.') + ->setHelp( + r( + <<***WARNING THIS COMMAND IS EXPERIMENTAL AND MAY NOT WORK AS EXPECTED*** + THIS COMMAND ONLY WORKS CORRECTLY FOR PLEX AT THE MOMENT + + This command push user watch progress to export enabled backends. + You should not run this manually and instead rely on scheduled task to run this command. + + This command require the metadata to be already saved in database. + If no metadata available for a backend, then watch progress update won't be sent to that backend. + HELP, + [ + 'cmd' => trim(commandContext()), + 'route' => self::ROUTE, + ] + ) + ); + } + + /** + * @param InputInterface $input + * @param OutputInterface $output + * @return int + * @throws InvalidArgumentException + */ + protected function runCommand(InputInterface $input, OutputInterface $output): int + { + return $this->single(fn(): int => $this->process($input, $output), $output); + } + + /** + * @throws InvalidArgumentException + */ + protected function process(InputInterface $input, OutputInterface $output): int + { + if (!$this->cache->has('progress')) { + $this->logger->info('No watch progress items in the queue.'); + return self::SUCCESS; + } + + $entities = $items = []; + + foreach ($this->cache->get('progress', []) as $item) { + /** @var iState $item */ + $items[] = Container::get(iState::class)::fromArray($item->getAll()); + } + + if (!empty($items)) { + foreach ($items as $queueItem) { + $dbItem = $this->db->get($queueItem); + $dbItem = $dbItem->apply($queueItem); + + if (!$dbItem->hasPlayProgress()) { + continue; + } + $entities[$dbItem->id] = $dbItem; + } + } + + $items = null; + + if (empty($entities)) { + $this->cache->delete('progress'); + $this->logger->debug('No watch progress items in the queue.'); + return self::SUCCESS; + } + + if ($input->getOption('list')) { + return $this->listItems($input, $output, $entities); + } + + $list = []; + $supported = Config::get('supported', []); + + foreach ((array)Config::get('servers', []) as $backendName => $backend) { + $type = strtolower(ag($backend, 'type', 'unknown')); + + if (true !== (bool)ag($backend, 'export.enabled')) { + $this->logger->info('SYSTEM: Export to [{backend}] is disabled by user.', [ + 'backend' => $backendName, + ]); + + continue; + } + + if (!isset($supported[$type])) { + $this->logger->error('SYSTEM: [{backend}] Invalid type.', [ + 'backend' => $backendName, + 'condition' => [ + 'expected' => implode(', ', array_keys($supported)), + 'given' => $type, + ], + ]); + continue; + } + + if (null === ($url = ag($backend, 'url')) || true !== is_string(parse_url($url, PHP_URL_HOST))) { + $this->logger->error('SYSTEM: [{backend}] Invalid url.', [ + 'backend' => $backendName, + 'url' => $url ?? 'None', + ]); + continue; + } + + $backend['name'] = $backendName; + $list[$backendName] = $backend; + } + + if (empty($list)) { + $this->logger->warning('SYSTEM: There are no backends with export enabled.'); + return self::FAILURE; + } + + foreach ($list as $name => &$backend) { + $opts = ag($backend, 'options', []); + + if ($input->getOption('ignore-date')) { + $opts[Options::IGNORE_DATE] = true; + } + + if ($input->getOption('dry-run')) { + $opts[Options::DRY_RUN] = true; + } + + if ($input->getOption('trace')) { + $opts[Options::DEBUG_TRACE] = true; + } + + $backend['options'] = $opts; + $backend['class'] = $this->getBackend(name: $name, config: $backend); + + $backend['class']->progress(entities: $entities, queue: $this->queue); + } + + unset($backend); + + $total = count($this->queue); + + if ($total >= 1) { + $start = makeDate(); + $this->logger->notice('SYSTEM: Sending [{total}] progress update requests.', [ + 'total' => $total, + 'time' => [ + 'start' => $start, + ], + ]); + + foreach ($this->queue->getQueue() as $response) { + $context = ag($response->getInfo('user_data'), 'context', []); + + try { + if (!in_array($response->getStatusCode(), [200, 204])) { + $this->logger->error( + 'SYSTEM: Request to change [{backend}] [{item.title}] watch progress returned with unexpected [{status_code}] status code.', + [ + 'status_code' => $response->getStatusCode(), + ...$context + ] + ); + continue; + } + + $this->logger->notice('SYSTEM: Updated [{backend}] [{item.title}] watch progress.', $context); + } catch (\Throwable $e) { + $this->logger->error( + 'SYSTEM: Unhandled exception thrown during request to change watch progress of [{backend}] {item.type} [{item.title}].', + [ + ...$context, + 'exception' => [ + 'file' => $e->getFile(), + 'line' => $e->getLine(), + 'kind' => get_class($e), + 'message' => $e->getMessage(), + 'trace' => $e->getTrace(), + ], + ] + ); + } + } + + $end = makeDate(); + + $this->logger->notice('SYSTEM: Sent [{total}] watch progress requests.', [ + 'total' => $total, + 'time' => [ + 'start' => $start, + 'end' => $end, + 'duration' => $end->getTimestamp() - $start->getTimestamp(), + ], + ]); + + $this->logger->notice(sprintf('Using WatchState Version - \'%s\'.', getAppVersion())); + } else { + $this->logger->notice('SYSTEM: No play state changes detected.'); + } + + if (false === $input->getOption('keep') && false === $input->getOption('dry-run')) { + $this->cache->delete('progress'); + } + + return self::SUCCESS; + } + + /** + * List Items. + * + * @param InputInterface $input + * @param OutputInterface $output + * @param array $items + * @return int + */ + private function listItems(InputInterface $input, OutputInterface $output, array $items): int + { + $list = []; + + $mode = $input->getOption('output'); + + foreach ($items as $item) { + if ('table' === $mode) { + $builder = [ + 'queued' => makeDate(ag($item->getExtra($item->via), iState::COLUMN_EXTRA_DATE))->format( + 'Y-m-d H:i:s T' + ), + 'via' => $item->via, + 'title' => $item->getName(), + 'played' => $item->isWatched() ? 'Yes' : 'No', + 'play_time' => $this->formatPlayProgress($item->getPlayProgress()), + 'tainted' => $item->isTainted() ? 'Yes' : 'No', + 'event' => ag($item->getExtra($item->via), iState::COLUMN_EXTRA_EVENT, '??'), + ]; + } else { + $builder = [ + ...$item->getAll(), + 'tainted' => $item->isTainted(), + ]; + } + + $list[] = $builder; + } + + $this->displayContent($list, $output, $mode); + + return self::SUCCESS; + } + + public function formatPlayProgress(int $milliseconds): string + { + $seconds = floor($milliseconds / 1000); + $minutes = floor($seconds / 60); + $hours = floor($minutes / 60); + $seconds = $seconds % 60; + $minutes = $minutes % 60; + + $format = '%02u:%02u:%02u'; + return sprintf($format, $hours, $minutes, $seconds); + } +} diff --git a/src/Libs/Entity/StateEntity.php b/src/Libs/Entity/StateEntity.php index 5d6a5456..bf54f1b0 100644 --- a/src/Libs/Entity/StateEntity.php +++ b/src/Libs/Entity/StateEntity.php @@ -410,6 +410,51 @@ final class StateEntity implements iState return false; } + public function getPlayProgress(): int + { + if ($this->isWatched()) { + return 0; + } + + $compare = []; + + foreach ($this->getMetadata() as $backend => $metadata) { + if (0 !== (int)ag($metadata, iState::COLUMN_WATCHED, 0)) { + continue; + } + + if ((int)ag($metadata, iState::COLUMN_META_DATA_PROGRESS, 0) > 1000) { + $compare[$backend] = [ + 'progress' => (int)ag($metadata, iState::COLUMN_META_DATA_PROGRESS, 0), + 'datetime' => ag($this->getExtra($backend), iState::COLUMN_EXTRA_DATE, 0), + ]; + } + } + + $lastProgress = 0; + $lastDate = makeDate($this->updated - 1); + + foreach ($compare as $data) { + if (null === ($progress = ag($data, 'progress', null))) { + continue; + } + if (null === ($datetime = ag($data, 'datetime', null))) { + continue; + } + + if ($progress < 1000) { + continue; + } + + if (makeDate($datetime) > $lastDate) { + $lastDate = makeDate($datetime); + $lastProgress = $progress; + } + } + + return $lastProgress; + } + private function isEqualValue(string $key, iState $entity): bool { if (iState::COLUMN_UPDATED === $key || iState::COLUMN_WATCHED === $key) { diff --git a/src/Libs/Entity/StateInterface.php b/src/Libs/Entity/StateInterface.php index 7de61002..4175872f 100644 --- a/src/Libs/Entity/StateInterface.php +++ b/src/Libs/Entity/StateInterface.php @@ -352,4 +352,10 @@ interface StateInterface extends LoggerAwareInterface */ public function hasPlayProgress(): bool; + /** + * Get play progress. If the item is watched and/or has no progress, then 0 will be returned. otherwise + * time in milliseconds will be returned. + * @return int + */ + public function getPlayProgress(): int; } diff --git a/src/Libs/Initializer.php b/src/Libs/Initializer.php index 8d59e333..f4071172 100644 --- a/src/Libs/Initializer.php +++ b/src/Libs/Initializer.php @@ -385,9 +385,9 @@ final class Initializer $cache->set('requests', $items, new DateInterval('P3D')); - if (true === $entity->hasPlayProgress()) { + if (false === $metadataOnly && true === $entity->hasPlayProgress()) { $progress = $cache->get('progress', []); - $progress [$itemId] = $entity; + $progress[$itemId] = $entity; $cache->set('progress', $progress, new DateInterval('P1D')); }