From dd8425128f80a09f7ac95ee00393a4b9b0689618 Mon Sep 17 00:00:00 2001 From: ArabCoders Date: Sat, 1 Feb 2025 19:02:20 +0300 Subject: [PATCH] Redesigned redesigned how we create sub users config data --- FAQ.md | 48 +- NEWS.md | 13 + README.md | 28 +- config/config.php | 2 +- frontend/assets/css/style.css | 10 + frontend/pages/backends/index.vue | 10 + src/Command.php | 112 ++- src/Commands/Backend/CreateUsersCommand.php | 610 +++++++++++++++ src/Commands/State/BackupCommand.php | 80 +- src/Commands/State/SyncCommand.php | 820 +++++--------------- src/Libs/Mappers/Import/MemoryMapper.php | 30 +- src/Libs/helpers.php | 47 +- 12 files changed, 1021 insertions(+), 789 deletions(-) create mode 100644 src/Commands/Backend/CreateUsersCommand.php diff --git a/FAQ.md b/FAQ.md index 65e11fec..3ea7fd1a 100644 --- a/FAQ.md +++ b/FAQ.md @@ -211,23 +211,32 @@ database state back to the selected backend. ### Is there support for Multi-user setup? -There are minimal support for multi-user setup via `state:sync` command. There are some requirements to get it working -correctly. The tools will try to match the users based on the name, and fallback on the `mapper.yaml` file if it's -provided. The tool will try to sync the users data between the backends. +We are on early stage of supporting multi-user setups, initially few operations are supported. To get started, first you +need to create your own main user backends using admin token for Plex and api key for Jellyfin/Emby. -#### Things that will get synced +Once your own main user is added, make sure to turn on the `import` and `export` for all backends, as the sub users are +initial configuration is based on your own main user configuration. Once your own user is working, turn on the `import` +and `export` tasks in the Tasks page. -* Play status, i.e. watched/unwatched. -* Watch progress. +Now, to create the sub users configurations, you need to run `backend:create` command, which can be done via +`WebUI > Backends > Purple button (users) icon` or via CLI by running the following command: -#### Requirements to get the command working +```bash +$ docker exec -ti watchstate console backend:create -v +``` -* All backends need to have admin level access, this is needed to inquiry about the users and generate the required - access tokens. -* That means for plex, it needs the admin token, to find it - check [plex article about it](https://support.plex.tv/articles/204059436-finding-an-authentication-token-x-plex-token/). -* For jellyfin/emby you need to use the API key, not the user password. You can generate api keys via Dashboard > - Advanced > API Keys. +Once the configuration is created, You can start using the multi-user functionality. Start by enabling the `sync` task +which is responsible for syncing the users play state and watch progress between the backends. + +To enable the task, you can do it via `WebUI > Tasks` page or via CLI by running the following command: + +```bash +$ docker exec -ti watchstate console system:env -k WS_CRON_SYNC -e true +``` + +If your users usernames are different between the backends, you can use the `mapper.yaml` file to map the users between +the backends. For more information about the `mapper.yaml` file, please refer to +the [mapper.yaml](#whats-the-schema-for-the-mapperyaml-file) section. #### Whats the schema for the `mapper.yaml` file? @@ -243,7 +252,7 @@ The schema is simple, it's a list of users in the following format: my_emby_server: name: "mikeJones" options: { } - +# 2nd user... - my_emby_server: name: "jiji_jones" options: { } @@ -253,15 +262,12 @@ The schema is simple, it's a list of users in the following format: my_jellyfin_server: name: "jijiJones" options: { } +#.... more users ``` -This yaml file helps map your users accounts in the different backends, so the tool can sync the correct user data. - -Then simply run `state:sync -v` it will generate the required tokens and match users data between the backends. -then sync the difference. By default, the task is scheduled to run every 3 hour, you can change the schedule by -altering the `WS_CRON_SYNC_AT` environment variable via `ENV` page or `system:env` command. - -To have the task run automatically, you need to enable the task via the `WebUI > Tasks` page or `system:env` command. +This yaml file helps map your users username in the different backends, so the tool can sync the correct user data. If +you added or updated mapping, you should delete `users` directory and generate new data. by running the `backend:create` +command as described in the previous section. ---- diff --git a/NEWS.md b/NEWS.md index 9a5692ba..8b8a2134 100644 --- a/NEWS.md +++ b/NEWS.md @@ -1,5 +1,18 @@ # Old Updates +### 2025-01-18 + +Due to popular demand, we finally have added the ability to sync all users data, however, it's limited to only +play state, no progress syncing implemented at this stage. This feature still in alpha expect bugs and issues. + +However our local tests shows that it's working as expected, but we need more testing to be sure. Please report any +issues you encounter. To enable this feature, you will see new task in the `Tasks` page called `Sync`. + +This task will sync all your users play state, However you need to have the backends added with admin token for plex and +API key for jellyfin and emby. Enable the task and let it run, it will sync all users play state. + +Please read the FAQ entry about it at [this link](FAQ.md#is-there-support-for-multi-user-setup). + ### 2024-12-30 We have removed the old environment variables `WS_CRON_PROGRESS` and `WS_CRON_PUSH` in favor of the new ones diff --git a/README.md b/README.md index 9ec03414..52818c7c 100644 --- a/README.md +++ b/README.md @@ -9,6 +9,21 @@ out of the box, this tool support `Jellyfin`, `Plex` and `Emby` media servers. ## Updates +### 2025-02-01 + +Breaking changes as of version 20250201~, in earlier versions, if you want to sync multi-user play state, you only had +to run `state:sync` command, However, due to us extending support for more operation to support multi-user data, we +needed a way to generate per user config instead of relying on `state:sync`, thus we have introduced a new command +called `backends:create`, the purpose of this command is to generate the needed config files for each user. + +This change allow us to support more operations in the future. + +We also have minor breaking change in per user db name, before it was named `user_name.db`, now it's named `user.db` +this change shouldn't effect you as we have backward compatibility in place to rename the old db to the new name. + +for more information about multi-user, Please read the FAQ entry about it +at [this link](FAQ.md#is-there-support-for-multi-user-setup). + ### 2025-01-24 We are excited to share that multi-user sync is now fully supported! Our first goal was to make sure the feature worked, @@ -16,19 +31,6 @@ and since releasing it, we’ve worked hard to improve it based on feedback and as expected and are happy to invite you to start using it. To learn more and get started, please check out the FAQ entry here: [this link](FAQ.md#is-there-support-for-multi-user-setup). -### 2025-01-18 - -Due to popular demand, we finally have added the ability to sync all users data, however, it's limited to only -play state, no progress syncing implemented at this stage. This feature still in alpha expect bugs and issues. - -However our local tests shows that it's working as expected, but we need more testing to be sure. Please report any -issues you encounter. To enable this feature, you will see new task in the `Tasks` page called `Sync`. - -This task will sync all your users play state, However you need to have the backends added with admin token for plex and -API key for jellyfin and emby. Enable the task and let it run, it will sync all users play state. - -Please read the FAQ entry about it at [this link](FAQ.md#is-there-support-for-multi-user-setup). - --- Refer to [NEWS](NEWS.md) for old updates. diff --git a/config/config.php b/config/config.php index 71a34aef..e3d866d1 100644 --- a/config/config.php +++ b/config/config.php @@ -276,7 +276,7 @@ return (function () { SyncCommand::TASK_NAME => [ 'command' => SyncCommand::ROUTE, 'name' => SyncCommand::TASK_NAME, - 'info' => 'Sync ALL users play state. Read the FAQ.', + 'info' => 'Sync sub users play states.', 'enabled' => (bool)env('WS_CRON_SYNC', false), 'timer' => $checkTaskTimer((string)env('WS_CRON_SYNC_AT', '9 */3 * * *'), '9 */3 * * *'), 'args' => env('WS_CRON_SYNC_ARGS', '-v'), diff --git a/frontend/assets/css/style.css b/frontend/assets/css/style.css index 2aac5bf7..3223202f 100644 --- a/frontend/assets/css/style.css +++ b/frontend/assets/css/style.css @@ -140,6 +140,16 @@ body { border: var(--bulma-control-border-width) solid rgba(56, 56, 56, 0.38); } +.button.is-purple { + background-color: #5f00d1; + border-color: transparent; + color: #fff; +} + +.has-text-purple { + color: #5f00d1; +} + @media screen and (min-width: 769px), print { .field.is-grouped-tablet { display: flex; diff --git a/frontend/pages/backends/index.vue b/frontend/pages/backends/index.vue index 56774e7a..16772d22 100644 --- a/frontend/pages/backends/index.vue +++ b/frontend/pages/backends/index.vue @@ -8,6 +8,12 @@
+

+ +

diff --git a/src/Command.php b/src/Command.php index fa6267af..7cef9bf1 100644 --- a/src/Command.php +++ b/src/Command.php @@ -7,18 +7,23 @@ namespace App; use App\Backends\Common\ClientInterface as iClient; use App\Libs\Config; use App\Libs\ConfigFile; +use App\Libs\Container; use App\Libs\Exceptions\RuntimeException; +use App\Libs\Mappers\ExtendedImportInterface as iEImport; +use App\Libs\Options; use App\Listeners\ProcessProfileEvent; use Closure; use DirectoryIterator; +use Psr\Log\LoggerInterface as iLogger; +use Psr\SimpleCache\CacheInterface as iCache; use Symfony\Component\Console\Command\Command as BaseCommand; use Symfony\Component\Console\Command\LockableTrait; use Symfony\Component\Console\Completion\CompletionInput; use Symfony\Component\Console\Completion\CompletionSuggestions; use Symfony\Component\Console\Helper\Table; use Symfony\Component\Console\Helper\TableSeparator; -use Symfony\Component\Console\Input\InputInterface; -use Symfony\Component\Console\Output\OutputInterface; +use Symfony\Component\Console\Input\InputInterface as iInput; +use Symfony\Component\Console\Output\OutputInterface as iOutput; use Symfony\Component\Yaml\Yaml; use Throwable; @@ -42,13 +47,13 @@ class Command extends BaseCommand /** * Execute the command. * - * @param InputInterface $input The input object. - * @param OutputInterface $output The output object. + * @param iInput $input The input object. + * @param iOutput $output The output object. * * @return int The command exit status. * @throws RuntimeException If the profiler was enabled and the run was unsuccessful. */ - protected function execute(InputInterface $input, OutputInterface $output): int + protected function execute(iInput $input, iOutput $output): int { if ($input->hasOption('debug') && $input->getOption('debug')) { $input->setOption('context', true); @@ -57,7 +62,7 @@ class Command extends BaseCommand if (function_exists('putenv')) { @putenv('SHELL_VERBOSITY=3'); } - $output->setVerbosity(OutputInterface::VERBOSITY_DEBUG); + $output->setVerbosity(iOutput::VERBOSITY_DEBUG); } if ($input->hasOption('context') && true === $input->getOption('context')) { @@ -83,7 +88,7 @@ class Command extends BaseCommand $profiler = new \Xhgui\Profiler\Profiler(Config::get('profiler.config', [])); $profiler->enable(Config::get('profiler.flags', null)); $status = $this->runCommand($input, $output); - + try { $data = $profiler->disable(); } catch (Throwable) { @@ -133,11 +138,11 @@ class Command extends BaseCommand * Executes the provided closure in a single instance, ensuring that only one instance of the command is running at a time. * * @param Closure $closure The closure to be executed. - * @param OutputInterface $output The OutputInterface instance for writing output messages. + * @param iOutput $output The OutputInterface instance for writing output messages. * * @return int The return value of the closure. */ - protected function single(Closure $closure, OutputInterface $output): int + protected function single(Closure $closure, iOutput $output): int { try { if (!$this->lock(getAppVersion() . ':' . $this->getName())) { @@ -159,12 +164,12 @@ class Command extends BaseCommand /** * Runs the command and returns the return value. * - * @param InputInterface $input The InputInterface instance for retrieving input data. - * @param OutputInterface $output The OutputInterface instance for writing output messages. + * @param iInput $input The InputInterface instance for retrieving input data. + * @param iOutput $output The OutputInterface instance for writing output messages. * * @return int The return value of the command execution. */ - protected function runCommand(InputInterface $input, OutputInterface $output): int + protected function runCommand(iInput $input, iOutput $output): int { return self::SUCCESS; } @@ -196,10 +201,10 @@ class Command extends BaseCommand * Displays the content in the specified mode. * * @param array $content The content to display. - * @param OutputInterface $output The OutputInterface instance for writing output messages. + * @param iOutput $output The OutputInterface instance for writing output messages. * @param string $mode The display mode. Default is 'json'. */ - protected function displayContent(array $content, OutputInterface $output, string $mode = 'json'): void + protected function displayContent(array $content, iOutput $output, string $mode = 'json'): void { switch ($mode) { case 'json': @@ -257,6 +262,85 @@ class Command extends BaseCommand } } + /** + * Retrieves per user data.. + * + * @param iEImport $mapper The import mapper instance. + * @param iLogger $logger The logger instance. + * @param array $opts (Optional) Additional options. + * + * @return array The user data. + * @throws RuntimeException If the users directory is not readable. + */ + protected function getUserData(iEImport $mapper, iLogger $logger, array $opts = []): array + { + $configs = [ + 'main' => [ + 'config' => ConfigFile::open(Config::get('backends_file'), 'yaml'), + 'mapper' => $mapper, + 'cache' => Container::get(iCache::class), + ] + ]; + + if (true === (bool)ag($opts, 'main_user_only', false)) { + return $configs; + } + + if (true === (bool)ag($opts, 'no_main_user', false)) { + $configs = []; + } + + $usersDir = Config::get('path') . '/users'; + + if (false === is_dir($usersDir)) { + return $configs; + } + + if (false === is_readable($usersDir)) { + throw new RuntimeException(r("Unable to read '{dir}' directory.", ['dir' => $usersDir])); + } + + $mainUserIds = array_map( + fn($backend) => ag($backend, 'user'), + ConfigFile::open(Config::get('backends_file'), 'yaml')->getAll() + ); + + foreach (new DirectoryIterator(Config::get('path') . '/users') as $dir) { + if ($dir->isDot() || false === $dir->isDir()) { + continue; + } + + $config = perUserConfig($dir->getBasename()); + + $subUserIds = array_map(fn($backend) => ag($backend, 'user'), $config->getAll()); + foreach ($mainUserIds as $mainId) { + if (false === in_array($mainId, $subUserIds)) { + continue; + } + continue 2; + } + + $userName = $dir->getBasename(); + $perUserCache = perUserCacheAdapter($userName); + + $configs[$userName] = [ + 'config' => $config, + 'mapper' => $mapper->withDB(perUserDb($userName)) + ->withCache($perUserCache) + ->withLogger($logger) + ->withOptions( + array_replace_recursive($mapper->getOptions(), [ + Options::ALT_NAME => $userName + ]) + ) + ->loadData(), + 'cache' => $perUserCache, + ]; + } + + return $configs; + } + /** * Completes the input by suggesting values for different options and arguments. * diff --git a/src/Commands/Backend/CreateUsersCommand.php b/src/Commands/Backend/CreateUsersCommand.php new file mode 100644 index 00000000..079d52d6 --- /dev/null +++ b/src/Commands/Backend/CreateUsersCommand.php @@ -0,0 +1,610 @@ +setName(self::ROUTE) + ->addOption('regenerate-tokens', 'g', InputOption::VALUE_NONE, 'Generate new tokens for PLEX users.') + ->addOption( + 'update', + 'u', + InputOption::VALUE_NONE, + 'Override sub users configuration based on main user configuration.' + ) + ->setDescription('Generate per user configuration, based on the main user data.') + ->setHelp( + r( + <<[ Important info ] + ------------------ + + You must have already configured your main user backends with admin access this means: + * For plex: you have admin token + * For jellyfin/emby: you have an APIKEY. + + ------- + [ FAQ ] + ------- + + # How to map users? + + Mapping is done automatically based on the username, however, if your users have different usernames + on each backend, you can create {path}/config/mapper.yaml file with the following format: + + - my_plex_server: + name: "mike_jones" + options: { } + my_jellyfin_server: + name: "jones_mike" + options: { } + my_emby_server: + name: "mikeJones" + options: { } + # second user + - my_emby_server: + name: "jiji_jones" + options: { } + my_plex_server: + name: "jones_jiji" + options: { } + my_jellyfin_server: + name: "jijiJones" + options: { } + + # How to regenerate tokens? + + If you want to regenerate tokens for PLEX users, you can use the --regenerate-tokens option. + + # How to update user configuration? + + If you want to update the user configuration based on the main user configuration, you can use the --update option. + + # Do i need to map the main user? + + No, There is no need, as the main user is already configured. + + HELP, + [ + 'cmd' => trim(commandContext()), + 'route' => self::ROUTE, + 'path' => Config::get('path'), + ] + ) + ); + } + + /** + * Executes the command. + * + * @param iInput $input The input interface. + * @param iOutput $output The output interface. + * + * @return int The exit code. 0 for success, 1 for failure. + */ + protected function runCommand(iInput $input, iOutput $output): int + { + $supported = Config::get('supported', []); + $configFile = ConfigFile::open(Config::get('backends_file'), 'yaml'); + $configFile->setLogger($this->logger); + + $mapFile = Config::get('mapper_file'); + $mapping = []; + + if (file_exists($mapFile) && filesize($mapFile) > 10) { + $map = ConfigFile::open(Config::get('mapper_file'), 'yaml'); + $mapping = $map->getAll(); + } + + $backends = []; + + foreach ($configFile->getAll() as $backendName => $backend) { + $type = strtolower(ag($backend, 'type', 'unknown')); + + if (!isset($supported[$type])) { + $this->logger->error("SYSTEM: Ignoring '{backend}'. Unexpected backend type '{type}'.", [ + 'type' => $type, + 'backend' => $backendName, + 'types' => implode(', ', array_keys($supported)), + ]); + continue; + } + + if (null === ($url = ag($backend, 'url')) || false === isValidURL($url)) { + $this->logger->error("SYSTEM: Ignoring '{backend}'. Invalid url '{url}'.", [ + 'url' => $url ?? 'None', + 'backend' => $backendName, + ]); + continue; + } + + $backend['name'] = $backendName; + $backend['class'] = $this->getBackend($backendName, $backend)->setLogger($this->logger); + $backends[$backendName] = $backend; + } + + if (empty($backends)) { + $this->logger->error('SYSTEM: No valid backends were found.'); + return self::FAILURE; + } + + $this->logger->notice("SYSTEM: Getting users list from '{backends}'.", [ + 'backends' => join(', ', array_keys($backends)) + ]); + + $users = []; + + foreach ($backends as $backend) { + /** @var iClient $client */ + $client = ag($backend, 'class'); + assert($backend instanceof iClient); + $this->logger->info("SYSTEM: Getting users from '{backend}'.", [ + 'backend' => $client->getContext()->backendName + ]); + + try { + foreach ($client->getUsersList() as $user) { + /** @var array $info */ + $info = $backend; + $info['user'] = ag($user, 'id', ag($info, 'user')); + $info['backendName'] = r("{backend}_{user}", [ + 'backend' => ag($backend, 'name'), + 'user' => ag($user, 'name'), + ]); + $info['displayName'] = ag($user, 'name'); + $info = ag_delete($info, 'options.' . Options::PLEX_USER_PIN); + $info = ag_delete($info, 'options.' . Options::ADMIN_TOKEN); + $info = ag_set($info, 'options.' . Options::ALT_NAME, ag($backend, 'name')); + $info = ag_set($info, 'options.' . Options::ALT_ID, ag($backend, 'user')); + if (PlexClient::CLIENT_NAME === ucfirst(ag($backend, 'type'))) { + $info = ag_set($info, 'token', 'reuse_or_generate_token'); + $info = ag_set($info, 'options.' . Options::PLEX_USER_NAME, ag($user, 'name')); + $info = ag_set($info, 'options.' . Options::PLEX_USER_UUID, ag($user, 'uuid')); + } + + $user['backend'] = ag($backend, 'name'); + $user['client_data'] = $info; + $users[] = $user; + } + } catch (Throwable $e) { + $this->logger->error( + "Exception '{error.kind}' was thrown unhandled during '{client}: {backend}' get users list. '{error.message}' at '{error.file}:{error.line}'.", + [ + 'backend' => $client->getContext()->backendName, + 'client' => $client->getContext()->clientName, + 'error' => [ + 'kind' => $e::class, + 'line' => $e->getLine(), + 'message' => $e->getMessage(), + 'file' => after($e->getFile(), ROOT_PATH), + ], + 'exception' => [ + 'file' => $e->getFile(), + 'line' => $e->getLine(), + 'kind' => get_class($e), + 'message' => $e->getMessage(), + ], + ] + ); + } + } + + $users = $this->generate_users_list($users, $mapping); + + if (count($users) < 1) { + $this->logger->warning('No users were found.'); + return self::FAILURE; + } + + $this->logger->notice("SYSTEM: User matching results {results}.", [ + 'results' => arrayToString($this->usersList($users)), + ]); + + foreach ($users as $user) { + $userName = ag($user, 'name', 'Unknown'); + + $subUserPath = r(fixPath(Config::get('path') . '/users/{user}'), ['user' => $userName]); + + if (false === is_dir($subUserPath)) { + $this->logger->info("SYSTEM: Creating '{user}' directory '{path}'.", [ + 'user' => $userName, + 'path' => $subUserPath + ]); + if (false === mkdir($subUserPath, 0755, true)) { + $this->logger->error("SYSTEM: Failed to '{user}' directory '{path}'.", [ + 'user' => $userName, + 'path' => $subUserPath + ]); + continue; + } + } + + $config_file = "{$subUserPath}/servers.yaml"; + $this->logger->notice("SYSTEM: Creating '{user}' configuration file '{file}'.", [ + 'user' => $userName, + 'file' => $config_file + ]); + + $perUser = ConfigFile::open($config_file, 'yaml', autoCreate: true); + $perUser->setLogger($this->logger); + $regenerateTokens = $input->getOption('regenerate-tokens'); + + foreach (ag($user, 'backends', []) as $backend) { + $name = ag($backend, 'client_data.backendName'); + $clientData = ag_delete(ag($backend, 'client_data'), 'class'); + $clientData['name'] = $name; + + if (false === $perUser->has($name)) { + $data = $clientData; + $data = ag_set($data, 'import.lastSync', null); + $data = ag_set($data, 'export.lastSync', null); + $data = ag_delete($data, ['webhook', 'name', 'backendName', 'displayName']); + $perUser->set($name, $data); + } else { + $clientData = ag_delete($clientData, ['token', 'import.lastSync', 'export.lastSync']); + $clientData = array_replace_recursive($perUser->get($name), $clientData); + if ($input->getOption('update')) { + $this->logger->info("SYSTEM: Updating user configuration for '{user}@{name}' backend.", [ + 'name' => $name, + 'user' => $userName, + ]); + $perUser->set($name, $clientData); + } + } + + try { + if (true === $regenerateTokens || 'reuse_or_generate_token' === ag($clientData, 'token')) { + /** @var iClient $client */ + $client = ag($backend, 'client_data.class'); + assert($client instanceof iClient); + if (PlexClient::CLIENT_NAME === $client->getType()) { + $clientData['token'] = $client->getUserToken( + ag($clientData, 'options.' . Options::PLEX_USER_UUID), + ag($clientData, 'options.' . Options::PLEX_USER_NAME) + ); + $perUser->set("{$name}.token", $clientData['token']); + } + } + } catch (Throwable $e) { + $this->logger->error( + "Failed to generate access token for '{user}@{name}' backend. '{error}' at '{file}:{line}'.", + [ + 'name' => $name, + 'user' => $userName, + 'error' => [ + 'kind' => $e::class, + 'line' => $e->getLine(), + 'message' => $e->getMessage(), + 'file' => after($e->getFile(), ROOT_PATH), + ], + 'exception' => [ + 'file' => $e->getFile(), + 'line' => $e->getLine(), + 'kind' => get_class($e), + 'message' => $e->getMessage(), + ], + ] + ); + continue; + } + } + + $dbFile = r($subUserPath . "/{user}.db", ['user' => $userName]); + if (false === file_exists($dbFile)) { + $this->logger->notice("SYSTEM: Creating '{user}' database '{db}'.", [ + 'user' => $userName, + 'db' => $dbFile + ]); + perUserDb($userName); + } + + $perUser->persist(); + } + + return self::SUCCESS; + } + + /** + * Generate a list of users that are matched across all backends. + * + * @param array $users The list of users from all backends. + * @param array{string: array{string: string, options: array}} $map The map of users to match. + * + * @return array{name: string, backends: array>}[] The list of matched users. + */ + private function generate_users_list(array $users, array $map = []): array + { + $allBackends = []; + foreach ($users as $u) { + if (!in_array($u['backend'], $allBackends, true)) { + $allBackends[] = $u['backend']; + } + } + + // Build a lookup: $usersBy[backend][lowercased_name] = userObject + $usersBy = []; + foreach ($users as $user) { + $backend = $user['backend']; + $nameLower = strtolower($user['name']); + if (ag($user, 'id') === ag($user, 'client_data.options.' . Options::ALT_ID)) { + $this->logger->debug('Skipping main user "{name}".', ['name' => $user['name']]); + continue; + } + if (!isset($usersBy[$backend])) { + $usersBy[$backend] = []; + } + $usersBy[$backend][$nameLower] = $user; + } + + $results = []; + + // Track used combos: array of [backend, nameLower]. + $used = []; + + // Helper: check if a (backend, nameLower) is already used. + $alreadyUsed = fn(string $b, string $n): bool => in_array([$b, $n], $used, true); + + /** + * Build a "unified" row from matched users across backends. + * - $backendDict example: [ 'backend1' => userObj, 'backend2' => userObj, ... ] + * - Picks a 'name' by "most frequent name" logic (with tie fallback). + * + * Returns an array shaped like: + * + * return [ + * 'name' => 'something', + * 'backends' => [ + * 'backend1' => userObj, + * 'backend2' => userObj, + * ..., + * ] + * ] + * + */ + $buildUnifiedRow = function (array $backendDict) use ($allBackends): array { + // Collect the names in the order of $allBackends for tie-breaking. + $names = []; + foreach ($allBackends as $b) { + if (isset($backendDict[$b])) { + $names[] = $backendDict[$b]['name']; + } + } + + // Tally frequencies + $freq = []; + foreach ($names as $n) { + if (!isset($freq[$n])) { + $freq[$n] = 0; + } + $freq[$n]++; + } + + // Decide a final 'name' + if (empty($freq)) { + $finalName = 'unknown'; + } else { + $max = max($freq); + $candidates = array_keys(array_filter($freq, fn($count) => $count === $max)); + + if (1 === count($candidates)) { + $finalName = $candidates[0]; + } else { + // Tie => pick the first from $names that’s in $candidates + $finalName = null; + foreach ($names as $n) { + if (in_array($n, $candidates, true)) { + $finalName = $n; + break; + } + } + if (!$finalName) { + $finalName = 'unknown'; + } + } + } + + // Build final row: "name" + sub-array "backends" + $row = [ + 'name' => $finalName, + 'backends' => [], + ]; + + // Fill 'backends' + foreach ($allBackends as $b) { + if (isset($backendDict[$b])) { + $row['backends'][$b] = $backendDict[$b]; + } + } + + return $row; + }; + + // Main logic: For each backend and each user in that backend, unify them if we find a match in ≥2 backends. + // We do map-based matching first, then direct-name matching. + foreach ($allBackends as $backend) { + if (!isset($usersBy[$backend])) { + continue; + } + + // For each user in this backend + foreach ($usersBy[$backend] as $nameLower => $userObj) { + // Skip if already used + if ($alreadyUsed($backend, $nameLower)) { + continue; + } + + // Map-based matching first + $matchedMapEntry = null; + foreach ($map as $mapRow) { + if (isset($mapRow[$backend]['name']) && strtolower($mapRow[$backend]['name']) === $nameLower) { + $matchedMapEntry = $mapRow; + break; + } + } + + if ($matchedMapEntry) { + // Build mapMatch from the map row. + $mapMatch = [$backend => $userObj]; + + // Gather all the other backends from the map + foreach ($allBackends as $otherBackend) { + if ($otherBackend === $backend) { + continue; + } + if (isset($matchedMapEntry[$otherBackend]['name'])) { + $mappedNameLower = strtolower($matchedMapEntry[$otherBackend]['name']); + if (isset($usersBy[$otherBackend][$mappedNameLower])) { + $mapMatch[$otherBackend] = $usersBy[$otherBackend][$mappedNameLower]; + } + } + } + + // If we matched ≥ 2 backends, unify them + if (count($mapMatch) >= 2) { + // --- MERGE map-based "options" into client_data => options, if any --- + foreach ($mapMatch as $b => &$matchedUser) { + // If the map entry has an 'options' array for this backend, + // merge it into $matchedUser['client_data']['options']. + if (isset($matchedMapEntry[$b]['options']) && is_array($matchedMapEntry[$b]['options'])) { + $mapOptions = $matchedMapEntry[$b]['options']; + + // Ensure $matchedUser['client_data'] is an array + if (!isset($matchedUser['client_data']) || !is_array($matchedUser['client_data'])) { + $matchedUser['client_data'] = []; + } + + // Ensure $matchedUser['client_data']['options'] is an array + if (!isset($matchedUser['client_data']['options']) || !is_array( + $matchedUser['client_data']['options'] + )) { + $matchedUser['client_data']['options'] = []; + } + + // Merge the map's options + $matchedUser['client_data']['options'] = array_replace_recursive( + $matchedUser['client_data']['options'], + $mapOptions + ); + } + } + unset($matchedUser); // break reference from the loop + + // Build final row + $results[] = $buildUnifiedRow($mapMatch); + + // Mark & remove from $usersBy + foreach ($mapMatch as $b => $mu) { + $nm = strtolower($mu['name']); + $used[] = [$b, $nm]; + unset($usersBy[$b][$nm]); + } + continue; + } else { + $this->logger->error("No partial fallback match via map for '{backend}: {user}'", [ + 'backend' => $userObj['backend'], + 'user' => $userObj['name'], + ]); + } + } + + // Direct-name matching if map fails + $directMatch = [$backend => $userObj]; + foreach ($allBackends as $otherBackend) { + if ($otherBackend === $backend) { + continue; + } + // Same name => direct match + if (isset($usersBy[$otherBackend][$nameLower])) { + $directMatch[$otherBackend] = $usersBy[$otherBackend][$nameLower]; + } + } + + // If direct matched ≥ 2 backends, unify + if (count($directMatch) >= 2) { + // No map "options" to merge here + $results[] = $buildUnifiedRow($directMatch); + + // Mark & remove them from $usersBy + foreach ($directMatch as $b => $matchedUser) { + $nm = strtolower($matchedUser['name']); + $used[] = [$b, $nm]; + unset($usersBy[$b][$nm]); + } + continue; + } + + // If neither map nor direct matched for ≥2 + $this->logger->error("Cannot match user '{backend}: {user}' in any map row or direct match.", [ + 'backend' => $userObj['backend'], + 'user' => $userObj['name'] + ]); + } + } + + return $results; + } + + private function usersList(array $list): array + { + $chunks = []; + + foreach ($list as $row) { + $name = $row['name'] ?? 'unknown'; + + $pairs = []; + if (!empty($row['backends']) && is_array($row['backends'])) { + foreach ($row['backends'] as $backendName => $backendData) { + if (isset($backendData['name'])) { + $pairs[] = r("{name}@{backend}", ['backend' => $backendName, 'name' => $backendData['name']]); + } + } + } + + $chunks[] = r("{name}: {pairs}", ['name' => $name, 'pairs' => implode(', ', $pairs)]); + } + + return $chunks; + } + +} diff --git a/src/Commands/State/BackupCommand.php b/src/Commands/State/BackupCommand.php index 7607878f..0aa61e15 100644 --- a/src/Commands/State/BackupCommand.php +++ b/src/Commands/State/BackupCommand.php @@ -14,7 +14,6 @@ use App\Libs\Mappers\ExtendedImportInterface as iEImport; use App\Libs\Mappers\Import\DirectMapper; use App\Libs\Options; use App\Libs\Stream; -use DirectoryIterator; use Psr\Http\Message\StreamInterface as iStream; use Psr\Log\LoggerInterface as iLogger; use Symfony\Component\Console\Input\InputInterface as iInput; @@ -154,73 +153,6 @@ class BackupCommand extends Command return $this->single(fn(): int => $this->process($input), $output); } - private function getBackends(iInput $input): array - { - $configs = [ - 'main' => [ - 'config' => ConfigFile::open(Config::get('backends_file'), 'yaml'), - 'mapper' => $this->mapper, - 'cache' => null, - ] - ]; - - if (true === $input->getOption('only-main-user')) { - return $configs; - } - - $usersDir = Config::get('path') . '/users'; - - if (false === is_dir($usersDir)) { - return $configs; - } - - if (!is_readable($usersDir)) { - $this->logger->error("SYSTEM: Unable to read '{dir}' directory.", ['dir' => $usersDir]); - return $configs; - } - - $mainUserIds = array_map(fn($backend) => ag($backend, 'user'), ag($configs, 'main.config')->getAll()); - - foreach (new DirectoryIterator(Config::get('path') . '/users') as $dir) { - if ($dir->isDot() || false === $dir->isDir()) { - continue; - } - - $config = perUserConfig($dir->getBasename()); - $subUserIds = array_map(fn($backend) => ag($backend, 'user'), $config->getAll()); - - foreach ($mainUserIds as $mainId) { - if (false === in_array($mainId, $subUserIds)) { - continue; - } - - $this->logger->debug("SYSTEM: Skipping '{user}' backends as it's same as main user.", [ - 'user' => $dir->getBasename(), - 'main' => $mainUserIds, - 'sub' => $subUserIds, - ]); - continue 2; - } - - $userName = $dir->getBasename(); - $perUserCache = perUserCacheAdapter($userName); - - $configs[$userName] = [ - 'config' => $config, - 'mapper' => $this->mapper->withDB(perUserDb($userName)) - ->withCache($perUserCache) - ->withLogger($this->logger) - ->withOptions( - array_replace_recursive($this->mapper->getOptions(), [Options::ALT_NAME => $userName]) - ) - ->loadData(), - 'cache' => $perUserCache, - ]; - } - - return $configs; - } - /** * Execute the command. * @@ -245,8 +177,13 @@ class BackupCommand extends Command $this->mapper->setOptions(options: $mapperOpts); } + $opts = []; + if (true === (bool)$input->getOption('only-main-user')) { + $opts = ['main_user_only' => true]; + } + $this->logger->notice("Using WatchState version - '{version}'.", ['version' => getAppVersion()]); - foreach ($this->getBackends($input) as $user => $opt) { + foreach ($this->getUserData($this->mapper, $this->logger, $opts) as $user => $opt) { try { $this->process_backup($input, $user, $opt); } finally { @@ -449,7 +386,7 @@ class BackupCommand extends Command gc_collect_cycles(); } - foreach ($list as $backend) { + foreach ($list as $b => $backend) { if (null === ($backend['fp'] ?? null)) { continue; } @@ -462,7 +399,8 @@ class BackupCommand extends Command if (false === $noCompression) { $file = $backend['fp']->getMetadata('uri'); - $this->logger->notice("SYSTEM: Compressing '{user}@{file}'.", [ + $this->logger->notice("SYSTEM: Compressing '{user}@{name}' backup file '{file}'.", [ + 'name' => $b, 'user' => $user, 'file' => $file ]); diff --git a/src/Commands/State/SyncCommand.php b/src/Commands/State/SyncCommand.php index 0a3c58a8..0e1db593 100644 --- a/src/Commands/State/SyncCommand.php +++ b/src/Commands/State/SyncCommand.php @@ -6,7 +6,6 @@ namespace App\Commands\State; use App\Backends\Common\Cache as BackendCache; use App\Backends\Common\ClientInterface as iClient; -use App\Backends\Plex\PlexClient; use App\Command; use App\Libs\Attributes\DI\Inject; use App\Libs\Attributes\Route\Cli; @@ -45,8 +44,6 @@ class SyncCommand extends Command public const string TASK_NAME = 'sync'; - private array $mapping = []; - /** * Class Constructor. * @@ -92,59 +89,7 @@ class SyncCommand extends Command InputOption::VALUE_NONE, 'Mapper option. Always update the locally stored metadata from backend.' ) - ->addOption('regenerate-tokens', 'g', InputOption::VALUE_NONE, 'Generate new tokens for all users.') - ->addOption('include-main-user', null, InputOption::VALUE_NONE, 'Include main user in sync.') - ->setHelp( - r( - <<[ FAQ ] - ------- - - Will this work with limited tokens? - - No, This requires admin token for plex backend, and API keys for jellyfin/emby. - We need the admin token for plex to generate user tokens for each user, and we need the API keys - for jellyfin/emby to get the user list and update their play state. - - Known limitations - - Known limitations: - * Cannot be used with plex users that have PIN enabled. - * Cannot sync play progress. - - Some or all of these limitations will be fixed in future releases. - - # How does this sync operation mode work? - - It works by first, getting all users from all backends, and trying to match them by name, - once we build a list of users that are matched, then we basically run the import/export for each user - using in memory storage, it should not have any impact on the real database and cache. - - You can help the matching by using the mapper file, which is a simple YAML file that maps users from one - backend to another, this is useful when the usernames are different or when you want to merge users from - different backends into one user. - - Example of a mapper.yaml file: - - - backend1: "mike_james" - backend2: "james_mike" - - - backend1: "john_doe" - backend2: "doe_john" - - HELP, - [ - 'cmd' => trim(commandContext()), - 'route' => self::ROUTE, - - ] - ) - ); + ->addOption('include-main-user', 'M', InputOption::VALUE_NONE, 'Include main user in sync.'); } /** @@ -175,16 +120,6 @@ class SyncCommand extends Command ]); } - $mapFile = Config::get('mapper_file'); - if (file_exists($mapFile) && filesize($mapFile) > 10) { - $map = ConfigFile::open(Config::get('mapper_file'), 'yaml'); - $this->mapping = $map->getAll(); - } - - $configFile = ConfigFile::open(Config::get('backends_file'), 'yaml'); - $configFile->setLogger($this->logger); - - $backends = []; $selected = $input->getOption('select-backend'); $isCustom = !empty($selected) && count($selected) > 0; $supported = Config::get('supported', []); @@ -212,132 +147,178 @@ class SyncCommand extends Command $this->mapper = $this->mapper->withOptions($mapperOpts); } - foreach ($configFile->getAll() as $backendName => $backend) { - $type = strtolower(ag($backend, 'type', 'unknown')); + $userOpt = [ + 'no_main_user' => !$input->getOption('include-main-user'), + ]; - if ($isCustom && $input->getOption('exclude') === in_array($backendName, $selected)) { - $this->logger->info("SYSTEM: Ignoring '{backend}' as requested by [-s, --select-backend].", [ - 'backend' => $backendName - ]); - continue; - } - - if (true !== (bool)ag($backend, 'import.enabled')) { - $this->logger->info("SYSTEM: Ignoring '{backend}' as the backend has import disabled.", [ - 'backend' => $backendName - ]); - continue; - } - - if (true !== (bool)ag($backend, 'export.enabled')) { - $this->logger->info("SYSTEM: Ignoring '{backend}' as the backend has export disabled.", [ - 'backend' => $backendName - ]); - continue; - } - - if (!isset($supported[$type])) { - $this->logger->error( - "SYSTEM: Ignoring '{backend}' due to unexpected type '{type}'. Expecting '{types}'.", - [ - 'type' => $type, - 'backend' => $backendName, - 'types' => implode(', ', array_keys($supported)), - ] - ); - continue; - } - - if (null === ($url = ag($backend, 'url')) || false === isValidURL($url)) { - $this->logger->error("SYSTEM: Ignoring '{backend}' due to invalid URL. '{url}'.", [ - 'url' => $url ?? 'None', - 'backend' => $backendName, - ]); - continue; - } - - $backend['name'] = $backendName; - $backends[$backendName] = $backend; - } + $backends = $this->getUserData($this->mapper, $this->logger, $userOpt); if (empty($backends)) { - $this->logger->warning('No backends were found.'); - return self::FAILURE; + $this->logger->warning('No users were found. Please create sub users via the backends:create command.'); + return self::SUCCESS; } - foreach ($backends as &$backend) { - if (null === ($name = ag($backend, 'name'))) { - continue; - } - - $opts = ag($backend, 'options', []); - - if ($input->getOption('ignore-date')) { - $opts[Options::IGNORE_DATE] = true; - } - - if ($input->getOption('trace')) { - $opts[Options::DEBUG_TRACE] = true; - } - - if ($input->getOption('dry-run')) { - $opts[Options::DRY_RUN] = true; - } - - if ($input->getOption('timeout')) { - $opts['client']['timeout'] = $input->getOption('timeout'); - } - - $backend['options'] = $opts; - $backend['class'] = $this->getBackend($name, $backend)->setLogger($this->logger); - } - - unset($backend); - - $this->logger->notice("SYSTEM: Getting users list from '{backends}'.", [ - 'backends' => join(', ', array_map(fn($backend) => $backend['name'], $backends)) - ]); - - $users = []; - - foreach ($backends as $backend) { - /** @var iClient $client */ - $client = ag($backend, 'class'); - assert($backend instanceof iClient); - $this->logger->info("SYSTEM: Getting users from '{backend}'.", [ - 'backend' => $client->getContext()->backendName - ]); - + foreach ($backends as $user => $userConf) { try { - foreach ($client->getUsersList() as $user) { - /** @var array $info */ - $info = $backend; - $info['user'] = ag($user, 'id', ag($info, 'user')); - $info['backendName'] = r("{backend}_{user}", [ - 'backend' => ag($backend, 'name'), - 'user' => ag($user, 'name'), - ]); - $info['displayName'] = ag($user, 'name'); - $info = ag_delete($info, 'options.' . Options::PLEX_USER_PIN); - $info = ag_delete($info, 'options.' . Options::ADMIN_TOKEN); - $info = ag_set($info, 'options.' . Options::ALT_NAME, ag($backend, 'name')); - $info = ag_set($info, 'options.' . Options::ALT_ID, ag($backend, 'user')); - if (PlexClient::CLIENT_NAME === ucfirst(ag($backend, 'type'))) { - $info = ag_set($info, 'token', 'reuse_or_generate_token'); - $info = ag_set($info, 'options.' . Options::PLEX_USER_NAME, ag($user, 'name')); - $info = ag_set($info, 'options.' . Options::PLEX_USER_UUID, ag($user, 'uuid')); + $this->queue->reset(); + + $config = ag($userConf, 'config'); + assert($config instanceof ConfigFile); + + $list = []; + + foreach ($config->getAll() as $backendName => $backend) { + $type = strtolower(ag($backend, 'type', 'unknown')); + + if ($isCustom && $input->getOption('exclude') === $this->in_array($selected, $backendName)) { + $this->logger->info("SYSTEM: Ignoring '{user}@{backend}' as requested.", [ + 'user' => $user, + 'backend' => $backendName + ]); + continue; } - $user['backend'] = ag($backend, 'name'); - $user['client_data'] = $info; - $users[] = $user; + if (true !== (bool)ag($backend, 'import.enabled')) { + $this->logger->info("SYSTEM: Ignoring '{user}@{backend}'. Import disabled.", [ + 'user' => $user, + 'backend' => $backendName + ]); + continue; + } + + if (!isset($supported[$type])) { + $this->logger->error("SYSTEM: Ignoring '{user}@{backend}'. Unexpected type '{type}'.", [ + 'user' => $user, + 'type' => $type, + 'backend' => $backendName, + ]); + continue; + } + + if (null === ($url = ag($backend, 'url')) || false === isValidURL($url)) { + $this->logger->error("SYSTEM: Ignoring '{user}@{backend}'. Invalid URL '{url}'.", [ + 'user' => $user, + 'url' => $url ?? 'None', + 'backend' => $backendName, + ]); + continue; + } + + $opts = ag($backend, 'options', []); + + if ($input->getOption('ignore-date')) { + $opts = ag_set($opts, Options::IGNORE_DATE, true); + } + + if ($input->getOption('trace')) { + $opts = ag_set($opts, Options::DEBUG_TRACE, true); + } + + if ($input->getOption('dry-run')) { + $opts = ag_set($opts, Options::DRY_RUN, true); + } + + if ($input->getOption('timeout')) { + $opts = ag_set($opts, 'client.timeout', $input->getOption('timeout')); + } + + $backend['options'] = $opts; + $backend['name'] = $backendName; + $backend['class'] = makeBackend($backend, $backendName, [ + BackendCache::class => Container::get(BackendCache::class)->with( + adapter: ag($userConf, 'cache') + ) + ])->setLogger($this->logger); + + $list[$backendName] = $backend; } + + if (empty($list)) { + $this->logger->warning( + $isCustom ? '[-s, --select-backend] flag did not match any backend.' : 'No backends were found.' + ); + continue; + } + + $start = makeDate(); + $this->logger->notice("SYSTEM: Syncing user '{user}: {list}'.", [ + 'user' => $user, + 'list' => join(', ', array_keys($list)), + 'started' => $start, + ]); + + /** @var iEImport $mapper */ + $mapper = ag($userConf, 'mapper'); + assert($mapper instanceof iEImport); + + $this->handleImport($mapper, $user, $list, $input->getOption('force-full'), $config); + + $changes = $mapper->computeChanges(array_keys($list)); + + foreach ($changes as $b => $changed) { + $count = count($changed); + if ($count < 1) { + continue; + } + $this->logger->notice("SYSTEM: '{changes}' changes detected for '{name}@{backend}'.", [ + 'name' => $user, + 'backend' => $b, + 'changes' => $count, + 'items' => array_map( + fn(iState $i) => [ + 'title' => $i->getName(), + 'state' => $i->isWatched() ? 'played' : 'unplayed', + 'meta' => $i->isSynced(array_keys($list)), + ], + $changed + ) + ]); + + + /** @var iClient $client */ + $client = $list[$b]['class']; + $client->updateState($changed, $this->queue); + } + + $this->handleExport($user, $list); + + $end = makeDate(); + $this->logger->notice("SYSTEM: Completed syncing user '{name}: {list}' in '{time.duration}'s", [ + 'name' => $user, + 'list' => join(', ', array_keys($list)), + 'time' => [ + 'start' => $start, + 'end' => $end, + 'duration' => $end->getTimestamp() - $start->getTimestamp(), + ], + 'memory' => [ + 'now' => getMemoryUsage(), + 'peak' => getPeakMemoryUsage(), + ], + ]); + + // -- Release memory. + if (false === $input->getOption('dry-run')) { + $mapper->commit(); + + foreach ($list as $b => $_) { + $config->set("{$b}.import.lastSync", time()); + $config->set("{$b}.export.lastSync", time()); + } + + $config->persist(); + } else { + $mapper->reset(); + } + + $this->logger->info("SYSTEM: Memory usage after reset '{memory}'.", [ + 'memory' => getMemoryUsage(), + ]); } catch (Throwable $e) { $this->logger->error( - "Exception '{error.kind}' was thrown unhandled during '{client}: {backend}' get users list. '{error.message}' at '{error.file}:{error.line}'.", + "SYSTEM: Exception '{error.kind}' was thrown unhandled during '{name}' sync. '{error.message}' at '{error.file}:{error.line}'.", [ - 'backend' => $client->getContext()->backendName, - 'client' => $client->getContext()->clientName, + 'name' => $user, 'error' => [ 'kind' => $e::class, 'line' => $e->getLine(), @@ -355,178 +336,6 @@ class SyncCommand extends Command } } - $users = $this->generate_users_list($users, $input->getOption('include-main-user'), $this->mapping); - - if (count($users) < 1) { - $this->logger->warning('No users were found.'); - return self::FAILURE; - } - - $this->logger->notice("SYSTEM: User matching results {results}.", [ - 'results' => arrayToString($this->usersList($users)), - ]); - - foreach ($users as $user) { - $this->queue->reset(); - - $userName = ag($user, 'name', 'Unknown'); - $this->logger->info("SYSTEM: Loading '{user}' mapper data. Current memory usage '{memory}'.", [ - 'user' => $userName, - 'memory' => getMemoryUsage(), - ]); - $perUserCache = perUserCacheAdapter($userName); - $perUserMapper = $this->mapper->withDB(perUserDb($userName)) - ->withCache($perUserCache) - ->withLogger($this->logger) - ->withOptions(array_replace_recursive($this->mapper->getOptions(), [Options::ALT_NAME => $userName])) - ->loadData(); - - $this->logger->info("SYSTEM: loading of '{user}' mapper data completed using '{memory}' of memory.", [ - 'user' => $userName, - 'memory' => getMemoryUsage(), - ]); - - $list = []; - $displayName = null; - - $perUser = ConfigFile::open(r(fixPath(Config::get('path') . '/users/{user}/servers.yaml'), [ - 'user' => $userName - ]), 'yaml', autoSave: true, autoCreate: true); - $perUser->setLogger($this->logger); - - $regenerateTokens = $input->getOption('regenerate-tokens'); - - foreach (ag($user, 'backends', []) as $backend) { - $name = ag($backend, 'client_data.backendName'); - $clientData = ag($backend, 'client_data'); - $clientData['name'] = $name; - - if (false === $perUser->has($name)) { - $data = $clientData; - $data = ag_set($data, 'import.lastSync', null); - $data = ag_set($data, 'export.lastSync', null); - $data = ag_delete($data, ['webhook', 'name', 'backendName', 'displayName']); - $perUser->set($name, $data); - } else { - $clientData = ag_delete($clientData, ['token', 'import.lastSync', 'export.lastSync']); - $clientData = array_replace_recursive($perUser->get($name), $clientData); - } - - try { - if (true === $regenerateTokens || 'reuse_or_generate_token' === ag($clientData, 'token')) { - /** @var iClient $client */ - $client = ag($backend, 'client_data.class'); - assert($client instanceof iClient); - if (PlexClient::CLIENT_NAME === $client->getType()) { - $clientData['token'] = $client->getUserToken( - ag($clientData, 'options.' . Options::PLEX_USER_UUID), - ag($clientData, 'options.' . Options::PLEX_USER_NAME) - ); - $perUser->set("{$name}.token", $clientData['token']); - } - } - } catch (Throwable $e) { - $this->logger->error( - "Failed to generate access token for '{user}: {name}' backend. '{error}' at '{file}:{line}'.", - [ - 'name' => $name, - 'user' => $userName, - 'error' => [ - 'kind' => $e::class, - 'line' => $e->getLine(), - 'message' => $e->getMessage(), - 'file' => after($e->getFile(), ROOT_PATH), - ], - 'exception' => [ - 'file' => $e->getFile(), - 'line' => $e->getLine(), - 'kind' => get_class($e), - 'message' => $e->getMessage(), - ], - ] - ); - continue; - } - - $clientData['class'] = makeBackend($clientData, $name, [ - BackendCache::class => Container::get(BackendCache::class)->with(adapter: $perUserCache) - ])->setLogger($this->logger); - - $list[$name] = $clientData; - $displayName = ag($backend, 'client_data.displayName', '??'); - - if (false === $input->getOption('dry-run')) { - $perUser->set("{$name}.import.lastSync", time()); - $perUser->set("{$name}.export.lastSync", time()); - } - } - - $start = makeDate(); - $this->logger->notice("SYSTEM: Syncing user '{user}' -> '{list}'.", [ - 'user' => $displayName, - 'list' => join(', ', array_keys($list)), - 'started' => $start, - ]); - - assert($perUserMapper instanceof iEImport); - $this->handleImport($perUserMapper, $displayName, $list, $input->getOption('force-full'), $perUser); - - assert($perUserMapper instanceof MemoryMapper); - /** @var MemoryMapper $changes */ - $changes = $perUserMapper->computeChanges(array_keys($list)); - - foreach ($changes as $b => $changed) { - $count = count($changed); - if ($count >= 1) { - $this->logger->notice("SYSTEM: Changes detected for '{name}: {backend}' are '{changes}'.", [ - 'name' => $displayName, - 'backend' => $b, - 'changes' => $count, - 'items' => array_map( - fn(iState $i) => [ - 'title' => $i->getName(), - 'state' => $i->isWatched() ? 'played' : 'unplayed', - 'meta' => $i->isSynced(array_keys($list)), - ], - $changed - ) - ]); - /** @var iClient $client */ - $client = $list[$b]['class']; - $client->updateState($changed, $this->queue); - } - } - - $this->handleExport($displayName, ag($user, 'backends', [])); - - $end = makeDate(); - $this->logger->notice("SYSTEM: Completed syncing user '{name}' -> '{list}' in '{time.duration}'s", [ - 'name' => $displayName, - 'list' => join(', ', array_keys($list)), - 'time' => [ - 'start' => $start, - 'end' => $end, - 'duration' => $end->getTimestamp() - $start->getTimestamp(), - ], - 'memory' => [ - 'now' => getMemoryUsage(), - 'peak' => getPeakMemoryUsage(), - ], - ]); - - // -- Release memory. - if (false === $input->getOption('dry-run')) { - $perUserMapper->commit(); - } else { - $perUserMapper->reset(); - } - - $this->logger->info("SYSTEM: Memory usage after reset '{memory}'.", [ - 'memory' => getMemoryUsage(), - ]); - $perUser->persist(); - } - return self::SUCCESS; } @@ -540,22 +349,41 @@ class SyncCommand extends Command /** @var array $queue */ $queue = []; + $this->logger->info("SYSTEM: Loading '{user}' mapper data. Current memory usage '{memory}'.", [ + 'user' => $name, + 'memory' => getMemoryUsage(), + ]); + $mapper->loadData(); + $this->logger->info("SYSTEM: loading of '{user}' mapper data '{count}' completed using '{memory}' of memory.", [ + 'user' => $name, + 'count' => $mapper->count(), + 'memory' => getMemoryUsage(), + ]); + foreach ($backends as $backend) { /** @var iClient $client */ $client = ag($backend, 'class'); + assert($client instanceof iClient); + $context = $client->getContext(); - $after = ag($context->options, Options::FORCE_FULL) || $isFull ? null : $config->get( - $context->backendName . '.import.lastSync' - ); + + if (true === $isFull || ag($context->options, Options::FORCE_FULL)) { + $after = null; + } else { + $after = $config->get($context->backendName . '.import.lastSync'); + } + if (null !== $after) { $after = makeDate($after); } + array_push($queue, ...$client->pull(mapper: $mapper, after: $after)); } $start = makeDate(); - $this->logger->notice("SYSTEM: Waiting on '{total}' requests for import '{name}' data.", [ + $this->logger->notice("SYSTEM: Waiting on '{total}' requests for '{name}: {backends}' data.", [ 'name' => $name, + 'backends' => join(', ', array_keys($backends)), 'total' => number_format(count($queue)), 'time' => [ 'start' => $start, @@ -582,9 +410,10 @@ class SyncCommand extends Command $end = makeDate(); $this->logger->notice( - "SYSTEM: Completed waiting on '{total}' requests in '{time.duration}'s for importing '{name}' data. Parsed '{responses.size}' of data.", + "SYSTEM: Finished waiting on '{total}' requests in '{time.duration}'s for importing '{name}: {backends}' data. Parsed '{responses.size}' of data.", [ 'name' => $name, + 'backends' => join(', ', array_keys($backends)), 'total' => number_format(count($queue)), 'time' => [ 'start' => $start, @@ -615,9 +444,10 @@ class SyncCommand extends Command return; } - $this->logger->notice("SYSTEM: Sending '{total}' change play state requests for '{name}'.", [ + $this->logger->notice("SYSTEM: Sending '{total}' change play state requests for '{name}: {backends}'.", [ 'name' => $name, - 'total' => $total + 'total' => $total, + 'backends' => join(', ', array_keys($backends)), ]); foreach ($this->queue->getQueue() as $response) { @@ -626,7 +456,7 @@ class SyncCommand extends Command try { if (200 !== ($statusCode = $response->getStatusCode())) { $this->logger->error( - "Request to change '{name}: {backend}' '{item.title}' play state returned with unexpected '{status_code}' status code.", + "Request to change '{name}@{backend}' '{item.title}' play state returned with unexpected '{status_code}' status code.", [ 'name' => $name, 'status_code' => $statusCode, @@ -636,13 +466,13 @@ class SyncCommand extends Command continue; } - $this->logger->notice("Marked '{name}: {backend}' '{item.title}' as '{play_state}'.", [ + $this->logger->notice("Marked '{name}@{backend}' '{item.title}' as '{play_state}'.", [ 'name' => $name, ...$context ]); } catch (Throwable $e) { $this->logger->error( - message: "Exception '{error.kind}' was thrown unhandled during '{name}: {backend}' request to change play state of {item.type} '{item.title}'. '{error.message}' at '{error.file}:{error.line}'.", + message: "Exception '{error.kind}' was thrown unhandled during '{name}@{backend}' request to change play state of {item.type} '{item.title}'. '{error.message}' at '{error.file}:{error.line}'.", context: [ 'name' => $name, 'error' => [ @@ -663,271 +493,15 @@ class SyncCommand extends Command } } - $this->logger->notice("SYSTEM: Sent '{total}' change play state requests for '{name}'.", [ + $this->logger->notice("SYSTEM: Sent '{total}' change play state requests for '{name}: {backends}'.", [ 'name' => $name, - 'total' => $total + 'total' => $total, + 'backends' => join(', ', array_keys($backends)), ]); } - /** - * Generate a list of users that are matched across all backends. - * - * @param array $users The list of users from all backends. - * @param array{string: array{string: string, options: array}} $map The map of users to match. - * - * @return array{name: string, backends: array>}[] The list of matched users. - */ - private function generate_users_list(array $users, bool $includeMainUser, array $map = []): array + private function in_array(array $haystack, string $needle): bool { - $allBackends = []; - foreach ($users as $u) { - if (!in_array($u['backend'], $allBackends, true)) { - $allBackends[] = $u['backend']; - } - } - - // Build a lookup: $usersBy[backend][lowercased_name] = userObject - $usersBy = []; - foreach ($users as $user) { - $backend = $user['backend']; - $nameLower = strtolower($user['name']); - if (false === $includeMainUser && ag($user, 'id') === ag($user, 'client_data.options.' . Options::ALT_ID)) { - $this->logger->debug('Skipping main user "{name}" from sync.', ['name' => $user['name']]); - continue; - } - if (!isset($usersBy[$backend])) { - $usersBy[$backend] = []; - } - $usersBy[$backend][$nameLower] = $user; - } - - $results = []; - - // Track used combos: array of [backend, nameLower]. - $used = []; - - // Helper: check if a (backend, nameLower) is already used. - $alreadyUsed = fn(string $b, string $n): bool => in_array([$b, $n], $used, true); - - /** - * Build a "unified" row from matched users across backends. - * - $backendDict example: [ 'backend1' => userObj, 'backend2' => userObj, ... ] - * - Picks a 'name' by "most frequent name" logic (with tie fallback). - * - * Returns an array shaped like: - * - * return [ - * 'name' => 'something', - * 'backends' => [ - * 'backend1' => userObj, - * 'backend2' => userObj, - * ..., - * ] - * ] - * - */ - $buildUnifiedRow = function (array $backendDict) use ($allBackends): array { - // Collect the names in the order of $allBackends for tie-breaking. - $names = []; - foreach ($allBackends as $b) { - if (isset($backendDict[$b])) { - $names[] = $backendDict[$b]['name']; - } - } - - // Tally frequencies - $freq = []; - foreach ($names as $n) { - if (!isset($freq[$n])) { - $freq[$n] = 0; - } - $freq[$n]++; - } - - // Decide a final 'name' - if (empty($freq)) { - $finalName = 'unknown'; - } else { - $max = max($freq); - $candidates = array_keys(array_filter($freq, fn($count) => $count === $max)); - - if (1 === count($candidates)) { - $finalName = $candidates[0]; - } else { - // Tie => pick the first from $names that’s in $candidates - $finalName = null; - foreach ($names as $n) { - if (in_array($n, $candidates, true)) { - $finalName = $n; - break; - } - } - if (!$finalName) { - $finalName = 'unknown'; - } - } - } - - // Build final row: "name" + sub-array "backends" - $row = [ - 'name' => $finalName, - 'backends' => [], - ]; - - // Fill 'backends' - foreach ($allBackends as $b) { - if (isset($backendDict[$b])) { - $row['backends'][$b] = $backendDict[$b]; - } - } - - return $row; - }; - - // Main logic: For each backend and each user in that backend, unify them if we find a match in ≥2 backends. - // We do map-based matching first, then direct-name matching. - foreach ($allBackends as $backend) { - if (!isset($usersBy[$backend])) { - continue; - } - - // For each user in this backend - foreach ($usersBy[$backend] as $nameLower => $userObj) { - // Skip if already used - if ($alreadyUsed($backend, $nameLower)) { - continue; - } - - // Map-based matching first - $matchedMapEntry = null; - foreach ($map as $mapRow) { - if (isset($mapRow[$backend]['name']) && strtolower($mapRow[$backend]['name']) === $nameLower) { - $matchedMapEntry = $mapRow; - break; - } - } - - if ($matchedMapEntry) { - // Build mapMatch from the map row. - $mapMatch = [$backend => $userObj]; - - // Gather all the other backends from the map - foreach ($allBackends as $otherBackend) { - if ($otherBackend === $backend) { - continue; - } - if (isset($matchedMapEntry[$otherBackend]['name'])) { - $mappedNameLower = strtolower($matchedMapEntry[$otherBackend]['name']); - if (isset($usersBy[$otherBackend][$mappedNameLower])) { - $mapMatch[$otherBackend] = $usersBy[$otherBackend][$mappedNameLower]; - } - } - } - - // If we matched ≥ 2 backends, unify them - if (count($mapMatch) >= 2) { - // --- MERGE map-based "options" into client_data => options, if any --- - foreach ($mapMatch as $b => &$matchedUser) { - // If the map entry has an 'options' array for this backend, - // merge it into $matchedUser['client_data']['options']. - if (isset($matchedMapEntry[$b]['options']) && is_array($matchedMapEntry[$b]['options'])) { - $mapOptions = $matchedMapEntry[$b]['options']; - - // Ensure $matchedUser['client_data'] is an array - if (!isset($matchedUser['client_data']) || !is_array($matchedUser['client_data'])) { - $matchedUser['client_data'] = []; - } - - // Ensure $matchedUser['client_data']['options'] is an array - if (!isset($matchedUser['client_data']['options']) || !is_array( - $matchedUser['client_data']['options'] - )) { - $matchedUser['client_data']['options'] = []; - } - - // Merge the map's options - $matchedUser['client_data']['options'] = array_replace_recursive( - $matchedUser['client_data']['options'], - $mapOptions - ); - } - } - unset($matchedUser); // break reference from the loop - - // Build final row - $results[] = $buildUnifiedRow($mapMatch); - - // Mark & remove from $usersBy - foreach ($mapMatch as $b => $mu) { - $nm = strtolower($mu['name']); - $used[] = [$b, $nm]; - unset($usersBy[$b][$nm]); - } - continue; - } else { - $this->logger->error("No partial fallback match via map for '{backend}: {user}'", [ - 'backend' => $userObj['backend'], - 'user' => $userObj['name'], - ]); - } - } - - // Direct-name matching if map fails - $directMatch = [$backend => $userObj]; - foreach ($allBackends as $otherBackend) { - if ($otherBackend === $backend) { - continue; - } - // Same name => direct match - if (isset($usersBy[$otherBackend][$nameLower])) { - $directMatch[$otherBackend] = $usersBy[$otherBackend][$nameLower]; - } - } - - // If direct matched ≥ 2 backends, unify - if (count($directMatch) >= 2) { - // No map "options" to merge here - $results[] = $buildUnifiedRow($directMatch); - - // Mark & remove them from $usersBy - foreach ($directMatch as $b => $matchedUser) { - $nm = strtolower($matchedUser['name']); - $used[] = [$b, $nm]; - unset($usersBy[$b][$nm]); - } - continue; - } - - // If neither map nor direct matched for ≥2 - $this->logger->error("Cannot match user '{backend}: {user}' in any map row or direct match.", [ - 'backend' => $userObj['backend'], - 'user' => $userObj['name'] - ]); - } - } - - return $results; - } - - private function usersList(array $list): array - { - $chunks = []; - - foreach ($list as $row) { - $name = $row['name'] ?? 'unknown'; - - $pairs = []; - if (!empty($row['backends']) && is_array($row['backends'])) { - foreach ($row['backends'] as $backendName => $backendData) { - if (isset($backendData['name'])) { - $pairs[] = r("{name}@{backend}", ['backend' => $backendName, 'name' => $backendData['name']]); - } - } - } - - $chunks[] = r("{name}: {pairs}", ['name' => $name, 'pairs' => implode(', ', $pairs)]); - } - - return $chunks; + return array_any($haystack, fn($item) => str_starts_with($item, $needle)); } } diff --git a/src/Libs/Mappers/Import/MemoryMapper.php b/src/Libs/Mappers/Import/MemoryMapper.php index 54088e0c..16eff0da 100644 --- a/src/Libs/Mappers/Import/MemoryMapper.php +++ b/src/Libs/Mappers/Import/MemoryMapper.php @@ -392,20 +392,22 @@ class MemoryMapper implements ExtendedImportInterface Message::increment("{$entity->via}.{$entity->type}.ignored_not_played_since_last_sync"); if ($entity->isWatched() !== $this->objects[$pointer]->isWatched()) { - $this->logger->notice( - "{mapper}: [O] '{backend}' item '{id}: {title}' is marked as '{state}' vs local state '{local_state}', However due to the remote item date '{remote_date}' being older than the last backend sync date '{local_date}'. it was not considered as valid state.", - [ - 'mapper' => afterLast(self::class, '\\'), - 'id' => $this->objects[$pointer]->id, - 'backend' => $entity->via, - 'remote_date' => makeDate($entity->updated), - 'local_date' => makeDate($opts['after']), - 'state' => $entity->isWatched() ? 'played' : 'unplayed', - 'local_state' => $this->objects[$pointer]->isWatched() ? 'played' : 'unplayed', - 'title' => $entity->getName(), - ] - ); - return $this; + if ($this->inTraceMode()) { + $this->logger->debug( + "{mapper}: [O] '{backend}' item '{id}: {title}' is marked as '{state}' vs local state '{local_state}', However due to the remote item date '{remote_date}' being older than the last backend sync date '{local_date}'. it was not considered as valid state.", + [ + 'mapper' => afterLast(self::class, '\\'), + 'id' => $this->objects[$pointer]->id, + 'backend' => $entity->via, + 'remote_date' => makeDate($entity->updated), + 'local_date' => makeDate($opts['after']), + 'state' => $entity->isWatched() ? 'played' : 'unplayed', + 'local_state' => $this->objects[$pointer]->isWatched() ? 'played' : 'unplayed', + 'title' => $entity->getName(), + ] + ); + } + return $this->handleTainted($pointer, $cloned, $entity, $opts); } if ($this->inTraceMode()) { diff --git a/src/Libs/helpers.php b/src/Libs/helpers.php index f965bc7f..b7ff9123 100644 --- a/src/Libs/helpers.php +++ b/src/Libs/helpers.php @@ -2214,7 +2214,18 @@ if (!function_exists('perUserDb')) { } } - $dbFile = fixPath(r("{path}/{user}.db", ['path' => $path, 'user' => $user])); + $dbFile = fixPath(r("{path}/user.db", ['path' => $path])); + $oldDb = fixPath(r("{path}/{user}.db", ['path' => $path, 'user' => $user])); + if (true === file_exists($oldDb)) { + if (false === file_exists($dbFile)) { + rename($oldDb, $dbFile); + clearstatcache(true, $oldDb); + clearstatcache(true, $dbFile); + } else { + unlink($oldDb); + } + } + $inTestMode = true === (defined('IN_TEST_MODE') && true === IN_TEST_MODE); $dsn = r('sqlite:{src}', ['src' => $inTestMode ? ':memory:' : $dbFile]); @@ -2261,7 +2272,7 @@ if (!function_exists('perUserConfig')) { } } - return ConfigFile::open(fixPath(r("{path}/servers.yaml", ['path' => $path])), 'yaml'); + return ConfigFile::open(fixPath(r("{path}/servers.yaml", ['path' => $path])), 'yaml', autoCreate: true); } } @@ -2283,36 +2294,7 @@ if (!function_exists('perUserCacheAdapter')) { } try { - $cacheUrl = Config::get('cache.url'); - - if (empty($cacheUrl)) { - throw new RuntimeException('No cache server was set.'); - } - - if (!extension_loaded('redis')) { - throw new RuntimeException('Redis extension is not loaded.'); - } - - $uri = new Uri($cacheUrl); - $params = []; - - if (!empty($uri->getQuery())) { - parse_str($uri->getQuery(), $params); - } - - $redis = new Redis(); - - $redis->connect($uri->getHost(), $uri->getPort() ?? 6379); - - if (null !== ag($params, 'password')) { - $redis->auth(ag($params, 'password')); - } - - if (null !== ag($params, 'db')) { - $redis->select((int)ag($params, 'db')); - } - - $backend = new RedisAdapter(redis: $redis, namespace: $ns); + $backend = new RedisAdapter(redis: Container::get(Redis::class), namespace: $ns); } catch (Throwable) { // -- in case of error, fallback to file system cache. $path = fixPath(r("{path}/users/{user}/cache", ['path' => Config::get('path'), 'user' => $user])); @@ -2419,3 +2401,4 @@ if (!function_exists('readFileFromArchive')) { return [Stream::make($stream, 'r'), $zip]; } } +