From e7acfc6d862fdfd42d5297a3370b6469d8fd95f1 Mon Sep 17 00:00:00 2001 From: abdulmohsen Date: Tue, 24 May 2022 22:21:24 +0300 Subject: [PATCH] Switch between push and export mode based on number of changed items. --- config/config.php | 4 + src/Commands/State/ExportCommand.php | 291 +++++++++++++++++++++------ src/Libs/Servers/JellyfinServer.php | 42 ++-- src/Libs/Servers/PlexServer.php | 43 ++-- 4 files changed, 288 insertions(+), 92 deletions(-) diff --git a/config/config.php b/config/config.php index a908867d..60319d86 100644 --- a/config/config.php +++ b/config/config.php @@ -28,6 +28,10 @@ return (function () { 'storage' => [ 'version' => 'v01', ], + 'export' => [ + // -- Trigger full export mode if changes exceed X number. + 'threshold' => env('WS_EXPORT_THRESHOLD', 1000), + ], ]; $config['tmpDir'] = fixPath(env('WS_TMP_DIR', $config['path'])); diff --git a/src/Commands/State/ExportCommand.php b/src/Commands/State/ExportCommand.php index ff4ff3f3..7de25dfd 100644 --- a/src/Commands/State/ExportCommand.php +++ b/src/Commands/State/ExportCommand.php @@ -9,6 +9,7 @@ use App\Libs\Config; use App\Libs\Data; use App\Libs\Mappers\ExportInterface; use App\Libs\Options; +use App\Libs\Servers\ServerInterface; use App\Libs\Storage\StorageInterface; use Psr\Log\LoggerInterface; use RuntimeException; @@ -38,8 +39,9 @@ class ExportCommand extends Command protected function configure(): void { $this->setName('state:export') - ->setDescription('Export current local play state to backends.') - ->addOption('force-full', 'f', InputOption::VALUE_NONE, 'Force full export. (will ignore lastSync date)') + ->setDescription('Export local play state to backends.') + ->addOption('force-full', 'f', InputOption::VALUE_NONE, 'Force full export. Ignore last sync date.') + ->addOption('dry-run', null, InputOption::VALUE_NONE, 'Do not commit changes to backends.') ->addOption( 'proxy', null, @@ -67,9 +69,9 @@ class ExportCommand extends Command ) ->addOption( 'ignore-date', - null, + 'i', InputOption::VALUE_NONE, - 'Ignore date comparison, and update backend play state to match local play state.' + 'Ignore date comparison, And synchronous backend play state to match local play state.' ) ->addOption('config', 'c', InputOption::VALUE_REQUIRED, 'Use Alternative config file.'); } @@ -93,31 +95,25 @@ class ExportCommand extends Command $config = Config::get('path') . '/config/servers.yaml'; } - $list = []; - $logger = null; - $serversFilter = (string)$input->getOption('servers-filter'); - $selected = explode(',', $serversFilter); - $isCustom = !empty($serversFilter) && count($selected) >= 1; + $backends = []; + $backendsFilter = (string)$input->getOption('servers-filter'); + $selected = explode(',', $backendsFilter); + $isCustom = !empty($backendsFilter) && count($selected) >= 1; $supported = Config::get('supported', []); - if (null !== $logger) { - $this->logger = $logger; - $this->mapper->setLogger($logger); - } - $this->logger->info(sprintf('Using WatchState Version - \'%s\'.', getAppVersion())); - foreach (Config::get('servers', []) as $name => $server) { - $type = strtolower(ag($server, 'type', 'unknown')); + foreach (Config::get('servers', []) as $name => $backend) { + $type = strtolower(ag($backend, 'type', 'unknown')); - if ($isCustom && !in_array($name, $selected, true)) { + if ($isCustom && false === in_array($name, $selected)) { $this->logger->info( sprintf('%s: Ignoring backend as requested by [-s, --servers-filter].', $name) ); continue; } - if (true !== ag($server, 'export.enabled')) { + if (true !== ag($backend, 'export.enabled')) { $this->logger->info(sprintf('%s: Ignoring backend as requested by user config.', $name)); continue; } @@ -134,7 +130,7 @@ class ExportCommand extends Command continue; } - if (null === ($url = ag($server, 'url')) || false === filter_var($url, FILTER_VALIDATE_URL)) { + if (null === ($url = ag($backend, 'url')) || false === filter_var($url, FILTER_VALIDATE_URL)) { $this->logger->error( sprintf('%s: Backend does not have valid url.', $name), [ @@ -144,11 +140,11 @@ class ExportCommand extends Command continue; } - $server['name'] = $name; - $list[$name] = $server; + $backend['name'] = $name; + $backends[$name] = $backend; } - if (empty($list)) { + if (empty($backends)) { $output->writeln( sprintf( '%s', @@ -158,23 +154,23 @@ class ExportCommand extends Command return self::FAILURE; } - $this->logger->notice('MAPPER: Preloading database into memory.'); - $this->mapper->loadData(); - $this->logger->notice('MAPPER: Finished Preloading database.'); + foreach ($backends as &$backend) { + if (null === ($name = ag($backend, 'name'))) { + continue; + } - $this->storage->singleTransaction(); - - $requests = []; - - foreach ($list as $name => &$server) { Data::addBucket($name); - $opts = ag($server, 'options', []); + $opts = ag($backend, 'options', []); if ($input->getOption('ignore-date')) { $opts[Options::IGNORE_DATE] = true; } + if ($input->getOption('dry-run')) { + $opts[Options::DRY_RUN] = true; + } + if ($input->getOption('proxy')) { $opts['client']['proxy'] = $input->getOption('proxy'); } @@ -187,14 +183,159 @@ class ExportCommand extends Command $opts['client']['timeout'] = $input->getOption('timeout'); } - $server['options'] = $opts; - $server['class'] = makeServer($server, $name); + $backend['options'] = $opts; + $backend['class'] = makeServer($backend, $name)->setLogger($this->logger); + } - if (null !== $logger) { - $server['class'] = $server['class']->setLogger($logger); + unset($backend); + + if (false === $this->isPushable($backends, $input)) { + $this->logger->info('Using export mode.'); + $this->export($backends, $input); + } else { + $this->logger->info('Using push mode.'); + $this->push($backends, $input); + } + + if (false === $input->getOption('dry-run')) { + foreach ($backends as $backend) { + if (null === ($name = ag($backend, 'name'))) { + continue; + } + + Config::save(sprintf('servers.%s.persist', $name), $backend['class']->getPersist()); } - $after = true === $input->getOption('force-full') ? null : ag($server, 'export.lastSync', null); + if (false === $custom && is_writable(dirname($config))) { + copy($config, $config . '.bak'); + } + + file_put_contents($config, Yaml::dump(Config::get('servers', []), 8, 2)); + } + + return self::SUCCESS; + } + + protected function push(array $backends, InputInterface $input): int + { + $minDate = time(); + + foreach ($backends as $backend) { + if (null === ($lastSync = ag($backend, 'export.lastSync', null))) { + throw new RuntimeException( + sprintf('%s: does not have recorded export lastSync.', ag($backend, 'name')) + ); + } + + if ($lastSync < $minDate) { + $minDate = $lastSync; + } + } + + $lastSync = makeDate($minDate); + + $this->logger->debug(sprintf('Preloading changed items since \'%s\'.', $lastSync->format('Y-m-d H:i:s T'))); + + $entities = $this->storage->getAll($lastSync); + + if (empty($entities)) { + $this->logger->notice('No items changed since export last sync.', [ + 'lastSync' => $lastSync, + ]); + return self::SUCCESS; + } + + $this->logger->info(sprintf('Using WatchState Version - \'%s\'.', getAppVersion())); + + $requests = []; + + foreach ($backends as $backend) { + if (null === ($name = ag($backend, 'name'))) { + continue; + } + + assert($backend['class'] instanceof ServerInterface); + + array_push($requests, ...$backend['class']->push($entities, makeDate(ag($backend, 'export.lastSync')))); + + if (false === $input->getOption('dry-run')) { + if (true === (bool)Data::get(sprintf('%s.no_export_update', $name))) { + $this->logger->notice( + sprintf('%s: Not updating last export date. Backend reported an error.', $name) + ); + } else { + Config::save(sprintf('servers.%s.export.lastSync', $name), time()); + } + } + } + + $total = count($requests); + + if ($total < 1) { + $this->logger->notice('No play state changes detected.'); + return self::SUCCESS; + } + + $this->logger->notice(sprintf('HTTP: Sending \'%d\' change play state requests.', $total)); + + foreach ($requests as $response) { + $requestData = $response->getInfo('user_data'); + + try { + if (200 !== $response->getStatusCode()) { + $this->logger->error( + sprintf( + '%s: Request to change \'%s\' play state responded with unexpected status code \'%d\'.', + ag($requestData, 'server', '??'), + ag($requestData, 'itemName', '??'), + $response->getStatusCode() + ) + ); + continue; + } + + $this->logger->notice( + sprintf( + '%s: Marked \'%s\' as \'%s\'.', + ag($requestData, 'server', '??'), + ag($requestData, 'itemName', '??'), + ag($requestData, 'state', '??'), + ) + ); + } catch (ExceptionInterface $e) { + $this->logger->error($e->getMessage()); + } + } + + $this->logger->notice(sprintf('HTTP: Processed \'%d\' change play state requests.', $total)); + + return self::SUCCESS; + } + + /** + * Pull and compare status and then push. + * + * @param array $backends + * @param InputInterface $input + * + * @return mixed + */ + protected function export(array $backends, InputInterface $input): mixed + { + $this->logger->notice('MAPPER: Preloading database into memory.'); + $this->mapper->loadData(); + $this->logger->notice('MAPPER: Finished Preloading database.'); + + $this->storage->singleTransaction(); + + $requests = []; + + foreach ($backends as $backend) { + if (null === ($name = ag($backend, 'name'))) { + continue; + } + + $after = true === $input->getOption('force-full') ? null : ag($backend, 'export.lastSync', null); if (null === $after) { $this->logger->notice(sprintf('%s: Exporting all local play state to this backend.', $name)); @@ -209,14 +350,18 @@ class ExportCommand extends Command ); } - array_push($requests, ...$server['class']->export($this->mapper, $after)); + assert($backend['class'] instanceof ServerInterface); - if (true === (bool)Data::get(sprintf('%s.no_export_update', $name))) { - $this->logger->notice( - sprintf('%s: Not updating last export date. Backend reported an error.', $name) - ); - } else { - Config::save(sprintf('servers.%s.export.lastSync', $name), time()); + array_push($requests, ...$backend['class']->export($this->mapper, $after)); + + if (false === $input->getOption('dry-run')) { + if (true === (bool)Data::get(sprintf('%s.no_export_update', $name))) { + $this->logger->notice( + sprintf('%s: Not updating last export date. Backend reported an error.', $name) + ); + } else { + Config::save(sprintf('servers.%s.export.lastSync', $name), time()); + } } } @@ -263,23 +408,57 @@ class ExportCommand extends Command $this->logger->notice('No state changes detected.'); } - foreach ($list as $server) { - if (null === ($name = ag($server, 'name'))) { + return []; + } + + /** + * Is the number of changes exceed export threshold. + * + * @param array $backends + * @param InputInterface $input + * + * @return bool + */ + protected function isPushable(array $backends, InputInterface $input): bool + { + if (true === $input->getOption('force-full')) { + $this->logger->debug('Not possible to use push when --force-full flag is used.'); + return false; + } + + $threshold = Config::get('export.threshold', 1000); + + foreach ($backends as $backend) { + if (null === ($name = ag($backend, 'name'))) { continue; } - Config::save( - sprintf('servers.%s.persist', $name), - $server['class']->getPersist() - ); + if (false === (bool)ag($backend, 'import.enabled')) { + $this->logger->debug( + sprintf('%s: Import are disabled from this backend, not possible to use push.', $name) + ); + return false; + } + + if (null === ($after = ag($backend, 'export.lastSync', null))) { + $this->logger->debug(sprintf('%s: No recorded export last sync date.', $name)); + return false; + } + + $count = $this->storage->getCount(makeDate($after)); + + if ($count > $threshold) { + $this->logger->debug( + sprintf('%s: Changes since last export sync date are greater than the threshold.', $name), + [ + 'threshold' => $threshold, + 'changes' => $count, + ] + ); + return false; + } } - if (false === $custom && is_writable(dirname($config))) { - copy($config, $config . '.bak'); - } - - file_put_contents($config, Yaml::dump(Config::get('servers', []), 8, 2)); - - return self::SUCCESS; + return true; } } diff --git a/src/Libs/Servers/JellyfinServer.php b/src/Libs/Servers/JellyfinServer.php index ff7b30a3..2712c878 100644 --- a/src/Libs/Servers/JellyfinServer.php +++ b/src/Libs/Servers/JellyfinServer.php @@ -695,6 +695,7 @@ class JellyfinServer implements ServerInterface $this->checkConfig(true); $requests = $stateRequests = []; + $count = count($entities); foreach ($entities as $key => $entity) { if (true !== ($entity instanceof StateEntity)) { @@ -733,9 +734,11 @@ class JellyfinServer implements ServerInterface ) ); - $this->logger->debug(sprintf('%s: Requesting \'%s\' state.', $this->name, $iName), [ - 'url' => $url - ]); + if ($count < 20) { + $this->logger->debug(sprintf('%s: Requesting \'%s\' state.', $this->name, $iName), [ + 'url' => $url + ]); + } $requests[] = $this->http->request( 'GET', @@ -743,7 +746,6 @@ class JellyfinServer implements ServerInterface array_replace_recursive($this->getHeaders(), [ 'user_data' => [ 'id' => $key, - 'state' => &$entity, 'suid' => ag($metadata, iFace::COLUMN_ID), ] ]) @@ -759,11 +761,13 @@ class JellyfinServer implements ServerInterface foreach ($requests as $response) { try { - if (null === ($state = ag($response->getInfo('user_data'), 'state'))) { + if (null === ($id = ag($response->getInfo('user_data'), 'id'))) { $this->logger->error(sprintf('%s: Unable to get item entity state.', $this->name)); continue; } + $state = $entities[$id]; + assert($state instanceof iFace); switch ($response->getStatusCode()) { @@ -1480,19 +1484,21 @@ class JellyfinServer implements ServerInterface ] ); - $mapper->queue( - $this->http->request( - $entity->isWatched() ? 'POST' : 'DELETE', - (string)$url, - array_replace_recursive($this->getHeaders(), [ - 'user_data' => [ - 'itemName' => $iName, - 'server' => $this->name, - 'state' => $entity->isWatched() ? 'Played' : 'Unplayed', - ], - ]) - ) - ); + if (false === (bool)ag($this->options, Options::DRY_RUN, false)) { + $mapper->queue( + $this->http->request( + $entity->isWatched() ? 'POST' : 'DELETE', + (string)$url, + array_replace_recursive($this->getHeaders(), [ + 'user_data' => [ + 'itemName' => $iName, + 'server' => $this->name, + 'state' => $entity->isWatched() ? 'Played' : 'Unplayed', + ], + ]) + ) + ); + } } catch (Throwable $e) { $this->logger->error(sprintf('%s: %s', $this->name, $e->getMessage()), [ 'file' => $e->getFile(), diff --git a/src/Libs/Servers/PlexServer.php b/src/Libs/Servers/PlexServer.php index 779b0839..f4bdddcf 100644 --- a/src/Libs/Servers/PlexServer.php +++ b/src/Libs/Servers/PlexServer.php @@ -722,6 +722,8 @@ class PlexServer implements ServerInterface $requests = $stateRequests = []; + $count = count($entities); + foreach ($entities as $key => $entity) { if (true !== ($entity instanceof StateEntity)) { continue; @@ -757,9 +759,11 @@ class PlexServer implements ServerInterface ) ); - $this->logger->debug(sprintf('%s: Requesting \'%s\' state.', $this->name, $iName), [ - 'url' => $url - ]); + if ($count < 20) { + $this->logger->debug(sprintf('%s: Requesting \'%s\' state.', $this->name, $iName), [ + 'url' => $url + ]); + } $requests[] = $this->http->request( 'GET', @@ -767,7 +771,6 @@ class PlexServer implements ServerInterface array_replace_recursive($this->getHeaders(), [ 'user_data' => [ 'id' => $key, - 'state' => &$entity, 'suid' => ag($metadata, iFace::COLUMN_ID), ] ]) @@ -783,11 +786,13 @@ class PlexServer implements ServerInterface foreach ($requests as $response) { try { - if (null === ($state = ag($response->getInfo('user_data'), 'state'))) { + if (null === ($id = ag($response->getInfo('user_data'), 'id'))) { $this->logger->error(sprintf('%s: Unable to get item entity state.', $this->name)); continue; } + $state = $entities[$id]; + assert($state instanceof iFace); switch ($response->getStatusCode()) { @@ -1495,19 +1500,21 @@ class PlexServer implements ServerInterface ] ); - $mapper->queue( - $this->http->request( - 'GET', - (string)$url, - array_replace_recursive($this->getHeaders(), [ - 'user_data' => [ - 'itemName' => $iName, - 'server' => $this->name, - 'state' => $entity->isWatched() ? 'Played' : 'Unplayed', - ] - ]) - ) - ); + if (false === (bool)ag($this->options, Options::DRY_RUN, false)) { + $mapper->queue( + $this->http->request( + 'GET', + (string)$url, + array_replace_recursive($this->getHeaders(), [ + 'user_data' => [ + 'itemName' => $iName, + 'server' => $this->name, + 'state' => $entity->isWatched() ? 'Played' : 'Unplayed', + ] + ]) + ) + ); + } } catch (Throwable $e) { $this->logger->error(sprintf('%s: %s', $this->name, $e->getMessage()), [ 'file' => $e->getFile(),