Redesigned redesigned how we create sub users config data

This commit is contained in:
ArabCoders
2025-02-01 19:02:20 +03:00
parent 54a8556149
commit dd8425128f
12 changed files with 1021 additions and 789 deletions

48
FAQ.md
View File

@@ -211,23 +211,32 @@ database state back to the selected backend.
### Is there support for Multi-user setup?
There are minimal support for multi-user setup via `state:sync` command. There are some requirements to get it working
correctly. The tools will try to match the users based on the name, and fallback on the `mapper.yaml` file if it's
provided. The tool will try to sync the users data between the backends.
We are on early stage of supporting multi-user setups, initially few operations are supported. To get started, first you
need to create your own main user backends using admin token for Plex and api key for Jellyfin/Emby.
#### Things that will get synced
Once your own main user is added, make sure to turn on the `import` and `export` for all backends, as the sub users are
initial configuration is based on your own main user configuration. Once your own user is working, turn on the `import`
and `export` tasks in the Tasks page.
* Play status, i.e. watched/unwatched.
* Watch progress.
Now, to create the sub users configurations, you need to run `backend:create` command, which can be done via
`WebUI > Backends > Purple button (users) icon` or via CLI by running the following command:
#### Requirements to get the command working
```bash
$ docker exec -ti watchstate console backend:create -v
```
* All backends need to have admin level access, this is needed to inquiry about the users and generate the required
access tokens.
* That means for plex, it needs the admin token, to find it
check [plex article about it](https://support.plex.tv/articles/204059436-finding-an-authentication-token-x-plex-token/).
* For jellyfin/emby you need to use the API key, not the user password. You can generate api keys via Dashboard >
Advanced > API Keys.
Once the configuration is created, You can start using the multi-user functionality. Start by enabling the `sync` task
which is responsible for syncing the users play state and watch progress between the backends.
To enable the task, you can do it via `WebUI > Tasks` page or via CLI by running the following command:
```bash
$ docker exec -ti watchstate console system:env -k WS_CRON_SYNC -e true
```
If your users usernames are different between the backends, you can use the `mapper.yaml` file to map the users between
the backends. For more information about the `mapper.yaml` file, please refer to
the [mapper.yaml](#whats-the-schema-for-the-mapperyaml-file) section.
#### Whats the schema for the `mapper.yaml` file?
@@ -243,7 +252,7 @@ The schema is simple, it's a list of users in the following format:
my_emby_server:
name: "mikeJones"
options: { }
# 2nd user...
- my_emby_server:
name: "jiji_jones"
options: { }
@@ -253,15 +262,12 @@ The schema is simple, it's a list of users in the following format:
my_jellyfin_server:
name: "jijiJones"
options: { }
#.... more users
```
This yaml file helps map your users accounts in the different backends, so the tool can sync the correct user data.
Then simply run `state:sync -v` it will generate the required tokens and match users data between the backends.
then sync the difference. By default, the task is scheduled to run every 3 hour, you can change the schedule by
altering the `WS_CRON_SYNC_AT` environment variable via `ENV` page or `system:env` command.
To have the task run automatically, you need to enable the task via the `WebUI > Tasks` page or `system:env` command.
This yaml file helps map your users username in the different backends, so the tool can sync the correct user data. If
you added or updated mapping, you should delete `users` directory and generate new data. by running the `backend:create`
command as described in the previous section.
----

13
NEWS.md
View File

@@ -1,5 +1,18 @@
# Old Updates
### 2025-01-18
Due to popular demand, we finally have added the ability to sync all users data, however, it's limited to only
play state, no progress syncing implemented at this stage. This feature still in alpha expect bugs and issues.
However our local tests shows that it's working as expected, but we need more testing to be sure. Please report any
issues you encounter. To enable this feature, you will see new task in the `Tasks` page called `Sync`.
This task will sync all your users play state, However you need to have the backends added with admin token for plex and
API key for jellyfin and emby. Enable the task and let it run, it will sync all users play state.
Please read the FAQ entry about it at [this link](FAQ.md#is-there-support-for-multi-user-setup).
### 2024-12-30
We have removed the old environment variables `WS_CRON_PROGRESS` and `WS_CRON_PUSH` in favor of the new ones

View File

@@ -9,6 +9,21 @@ out of the box, this tool support `Jellyfin`, `Plex` and `Emby` media servers.
## Updates
### 2025-02-01
Breaking changes as of version 20250201~, in earlier versions, if you want to sync multi-user play state, you only had
to run `state:sync` command, However, due to us extending support for more operation to support multi-user data, we
needed a way to generate per user config instead of relying on `state:sync`, thus we have introduced a new command
called `backends:create`, the purpose of this command is to generate the needed config files for each user.
This change allow us to support more operations in the future.
We also have minor breaking change in per user db name, before it was named `user_name.db`, now it's named `user.db`
this change shouldn't effect you as we have backward compatibility in place to rename the old db to the new name.
for more information about multi-user, Please read the FAQ entry about it
at [this link](FAQ.md#is-there-support-for-multi-user-setup).
### 2025-01-24
We are excited to share that multi-user sync is now fully supported! Our first goal was to make sure the feature worked,
@@ -16,19 +31,6 @@ and since releasing it, weve worked hard to improve it based on feedback and
as expected and are happy to invite you to start using it. To learn more and get started, please check out the FAQ entry
here: [this link](FAQ.md#is-there-support-for-multi-user-setup).
### 2025-01-18
Due to popular demand, we finally have added the ability to sync all users data, however, it's limited to only
play state, no progress syncing implemented at this stage. This feature still in alpha expect bugs and issues.
However our local tests shows that it's working as expected, but we need more testing to be sure. Please report any
issues you encounter. To enable this feature, you will see new task in the `Tasks` page called `Sync`.
This task will sync all your users play state, However you need to have the backends added with admin token for plex and
API key for jellyfin and emby. Enable the task and let it run, it will sync all users play state.
Please read the FAQ entry about it at [this link](FAQ.md#is-there-support-for-multi-user-setup).
---
Refer to [NEWS](NEWS.md) for old updates.

View File

@@ -276,7 +276,7 @@ return (function () {
SyncCommand::TASK_NAME => [
'command' => SyncCommand::ROUTE,
'name' => SyncCommand::TASK_NAME,
'info' => 'Sync ALL users play state. Read the FAQ.',
'info' => 'Sync sub users play states.',
'enabled' => (bool)env('WS_CRON_SYNC', false),
'timer' => $checkTaskTimer((string)env('WS_CRON_SYNC_AT', '9 */3 * * *'), '9 */3 * * *'),
'args' => env('WS_CRON_SYNC_ARGS', '-v'),

View File

@@ -140,6 +140,16 @@ body {
border: var(--bulma-control-border-width) solid rgba(56, 56, 56, 0.38);
}
.button.is-purple {
background-color: #5f00d1;
border-color: transparent;
color: #fff;
}
.has-text-purple {
color: #5f00d1;
}
@media screen and (min-width: 769px), print {
.field.is-grouped-tablet {
display: flex;

View File

@@ -8,6 +8,12 @@
</span>
<div class="is-pulled-right">
<div class="field is-grouped">
<p class="control" v-if="backends && backends.length>0">
<button class="button is-purple" v-tooltip.bottom="'Create sub users backends.'"
@click="navigateTo(makeConsoleCommand('backend:create -v', true))">
<span class="icon"><i class="fas fa-users"></i></span>
</button>
</p>
<p class="control">
<button class="button is-primary" v-tooltip.bottom="'Add New Backend'"
@click="toggleForm = !toggleForm" :disabled="isLoading">
@@ -164,6 +170,10 @@
<li>
<strong>Export</strong> means pushing data from the local database to the backends.
</li>
<li>
To create sub users backends, click on the <span class="icon has-text-purple"><i class="fas fa-users"/></span>
button.
</li>
</ul>
</Message>
</div>

View File

@@ -7,18 +7,23 @@ namespace App;
use App\Backends\Common\ClientInterface as iClient;
use App\Libs\Config;
use App\Libs\ConfigFile;
use App\Libs\Container;
use App\Libs\Exceptions\RuntimeException;
use App\Libs\Mappers\ExtendedImportInterface as iEImport;
use App\Libs\Options;
use App\Listeners\ProcessProfileEvent;
use Closure;
use DirectoryIterator;
use Psr\Log\LoggerInterface as iLogger;
use Psr\SimpleCache\CacheInterface as iCache;
use Symfony\Component\Console\Command\Command as BaseCommand;
use Symfony\Component\Console\Command\LockableTrait;
use Symfony\Component\Console\Completion\CompletionInput;
use Symfony\Component\Console\Completion\CompletionSuggestions;
use Symfony\Component\Console\Helper\Table;
use Symfony\Component\Console\Helper\TableSeparator;
use Symfony\Component\Console\Input\InputInterface;
use Symfony\Component\Console\Output\OutputInterface;
use Symfony\Component\Console\Input\InputInterface as iInput;
use Symfony\Component\Console\Output\OutputInterface as iOutput;
use Symfony\Component\Yaml\Yaml;
use Throwable;
@@ -42,13 +47,13 @@ class Command extends BaseCommand
/**
* Execute the command.
*
* @param InputInterface $input The input object.
* @param OutputInterface $output The output object.
* @param iInput $input The input object.
* @param iOutput $output The output object.
*
* @return int The command exit status.
* @throws RuntimeException If the profiler was enabled and the run was unsuccessful.
*/
protected function execute(InputInterface $input, OutputInterface $output): int
protected function execute(iInput $input, iOutput $output): int
{
if ($input->hasOption('debug') && $input->getOption('debug')) {
$input->setOption('context', true);
@@ -57,7 +62,7 @@ class Command extends BaseCommand
if (function_exists('putenv')) {
@putenv('SHELL_VERBOSITY=3');
}
$output->setVerbosity(OutputInterface::VERBOSITY_DEBUG);
$output->setVerbosity(iOutput::VERBOSITY_DEBUG);
}
if ($input->hasOption('context') && true === $input->getOption('context')) {
@@ -83,7 +88,7 @@ class Command extends BaseCommand
$profiler = new \Xhgui\Profiler\Profiler(Config::get('profiler.config', []));
$profiler->enable(Config::get('profiler.flags', null));
$status = $this->runCommand($input, $output);
try {
$data = $profiler->disable();
} catch (Throwable) {
@@ -133,11 +138,11 @@ class Command extends BaseCommand
* Executes the provided closure in a single instance, ensuring that only one instance of the command is running at a time.
*
* @param Closure $closure The closure to be executed.
* @param OutputInterface $output The OutputInterface instance for writing output messages.
* @param iOutput $output The OutputInterface instance for writing output messages.
*
* @return int The return value of the closure.
*/
protected function single(Closure $closure, OutputInterface $output): int
protected function single(Closure $closure, iOutput $output): int
{
try {
if (!$this->lock(getAppVersion() . ':' . $this->getName())) {
@@ -159,12 +164,12 @@ class Command extends BaseCommand
/**
* Runs the command and returns the return value.
*
* @param InputInterface $input The InputInterface instance for retrieving input data.
* @param OutputInterface $output The OutputInterface instance for writing output messages.
* @param iInput $input The InputInterface instance for retrieving input data.
* @param iOutput $output The OutputInterface instance for writing output messages.
*
* @return int The return value of the command execution.
*/
protected function runCommand(InputInterface $input, OutputInterface $output): int
protected function runCommand(iInput $input, iOutput $output): int
{
return self::SUCCESS;
}
@@ -196,10 +201,10 @@ class Command extends BaseCommand
* Displays the content in the specified mode.
*
* @param array $content The content to display.
* @param OutputInterface $output The OutputInterface instance for writing output messages.
* @param iOutput $output The OutputInterface instance for writing output messages.
* @param string $mode The display mode. Default is 'json'.
*/
protected function displayContent(array $content, OutputInterface $output, string $mode = 'json'): void
protected function displayContent(array $content, iOutput $output, string $mode = 'json'): void
{
switch ($mode) {
case 'json':
@@ -257,6 +262,85 @@ class Command extends BaseCommand
}
}
/**
* Retrieves per user data..
*
* @param iEImport $mapper The import mapper instance.
* @param iLogger $logger The logger instance.
* @param array $opts (Optional) Additional options.
*
* @return array<array-key, array{config:ConfigFile, mapper:iEImport, cache:iCache}> The user data.
* @throws RuntimeException If the users directory is not readable.
*/
protected function getUserData(iEImport $mapper, iLogger $logger, array $opts = []): array
{
$configs = [
'main' => [
'config' => ConfigFile::open(Config::get('backends_file'), 'yaml'),
'mapper' => $mapper,
'cache' => Container::get(iCache::class),
]
];
if (true === (bool)ag($opts, 'main_user_only', false)) {
return $configs;
}
if (true === (bool)ag($opts, 'no_main_user', false)) {
$configs = [];
}
$usersDir = Config::get('path') . '/users';
if (false === is_dir($usersDir)) {
return $configs;
}
if (false === is_readable($usersDir)) {
throw new RuntimeException(r("Unable to read '{dir}' directory.", ['dir' => $usersDir]));
}
$mainUserIds = array_map(
fn($backend) => ag($backend, 'user'),
ConfigFile::open(Config::get('backends_file'), 'yaml')->getAll()
);
foreach (new DirectoryIterator(Config::get('path') . '/users') as $dir) {
if ($dir->isDot() || false === $dir->isDir()) {
continue;
}
$config = perUserConfig($dir->getBasename());
$subUserIds = array_map(fn($backend) => ag($backend, 'user'), $config->getAll());
foreach ($mainUserIds as $mainId) {
if (false === in_array($mainId, $subUserIds)) {
continue;
}
continue 2;
}
$userName = $dir->getBasename();
$perUserCache = perUserCacheAdapter($userName);
$configs[$userName] = [
'config' => $config,
'mapper' => $mapper->withDB(perUserDb($userName))
->withCache($perUserCache)
->withLogger($logger)
->withOptions(
array_replace_recursive($mapper->getOptions(), [
Options::ALT_NAME => $userName
])
)
->loadData(),
'cache' => $perUserCache,
];
}
return $configs;
}
/**
* Completes the input by suggesting values for different options and arguments.
*

View File

@@ -0,0 +1,610 @@
<?php
declare(strict_types=1);
namespace App\Commands\Backend;
use App\Backends\Common\ClientInterface as iClient;
use App\Backends\Plex\PlexClient;
use App\Command;
use App\Libs\Attributes\Route\Cli;
use App\Libs\Config;
use App\Libs\ConfigFile;
use App\Libs\Options;
use Psr\Log\LoggerInterface as iLogger;
use Symfony\Component\Console\Input\InputInterface as iInput;
use Symfony\Component\Console\Input\InputOption;
use Symfony\Component\Console\Output\OutputInterface as iOutput;
use Throwable;
/**
* Class CreateUsersCommand
*
* This command generates per user backends files, based on the main user configuration.
*
* @Routable(command: self::ROUTE)
*/
#[Cli(command: self::ROUTE)]
class CreateUsersCommand extends Command
{
public const string ROUTE = 'backend:create';
public function __construct(private iLogger $logger)
{
parent::__construct();
}
/**
* Configures the command.
*
* @return void
*/
protected function configure(): void
{
$this->setName(self::ROUTE)
->addOption('regenerate-tokens', 'g', InputOption::VALUE_NONE, 'Generate new tokens for PLEX users.')
->addOption(
'update',
'u',
InputOption::VALUE_NONE,
'Override sub users configuration based on main user configuration.'
)
->setDescription('Generate per user configuration, based on the main user data.')
->setHelp(
r(
<<<HELP
This command create per user configuration files based on the main user backends configuration.
------------------
<notice>[ Important info ]</notice>
------------------
You must have already configured your main user backends with admin access this means:
* For plex: you have admin token
* For jellyfin/emby: you have an APIKEY.
-------
<notice>[ FAQ ]</notice>
-------
<question># How to map users?</question>
Mapping is done automatically based on the username, however, if your users have different usernames
on each backend, you can create <value>{path}/config/mapper.yaml</value> file with the following format:
- my_plex_server:
name: "mike_jones"
options: { }
my_jellyfin_server:
name: "jones_mike"
options: { }
my_emby_server:
name: "mikeJones"
options: { }
# second user
- my_emby_server:
name: "jiji_jones"
options: { }
my_plex_server:
name: "jones_jiji"
options: { }
my_jellyfin_server:
name: "jijiJones"
options: { }
<question># How to regenerate tokens?</question>
If you want to regenerate tokens for PLEX users, you can use the <flag>--regenerate-tokens</flag> option.
<question># How to update user configuration?</question>
If you want to update the user configuration based on the main user configuration, you can use the <flag>--update</flag> option.
<question># Do i need to map the main user?</question>
No, There is no need, as the main user is already configured.
HELP,
[
'cmd' => trim(commandContext()),
'route' => self::ROUTE,
'path' => Config::get('path'),
]
)
);
}
/**
* Executes the command.
*
* @param iInput $input The input interface.
* @param iOutput $output The output interface.
*
* @return int The exit code. 0 for success, 1 for failure.
*/
protected function runCommand(iInput $input, iOutput $output): int
{
$supported = Config::get('supported', []);
$configFile = ConfigFile::open(Config::get('backends_file'), 'yaml');
$configFile->setLogger($this->logger);
$mapFile = Config::get('mapper_file');
$mapping = [];
if (file_exists($mapFile) && filesize($mapFile) > 10) {
$map = ConfigFile::open(Config::get('mapper_file'), 'yaml');
$mapping = $map->getAll();
}
$backends = [];
foreach ($configFile->getAll() as $backendName => $backend) {
$type = strtolower(ag($backend, 'type', 'unknown'));
if (!isset($supported[$type])) {
$this->logger->error("SYSTEM: Ignoring '{backend}'. Unexpected backend type '{type}'.", [
'type' => $type,
'backend' => $backendName,
'types' => implode(', ', array_keys($supported)),
]);
continue;
}
if (null === ($url = ag($backend, 'url')) || false === isValidURL($url)) {
$this->logger->error("SYSTEM: Ignoring '{backend}'. Invalid url '{url}'.", [
'url' => $url ?? 'None',
'backend' => $backendName,
]);
continue;
}
$backend['name'] = $backendName;
$backend['class'] = $this->getBackend($backendName, $backend)->setLogger($this->logger);
$backends[$backendName] = $backend;
}
if (empty($backends)) {
$this->logger->error('SYSTEM: No valid backends were found.');
return self::FAILURE;
}
$this->logger->notice("SYSTEM: Getting users list from '{backends}'.", [
'backends' => join(', ', array_keys($backends))
]);
$users = [];
foreach ($backends as $backend) {
/** @var iClient $client */
$client = ag($backend, 'class');
assert($backend instanceof iClient);
$this->logger->info("SYSTEM: Getting users from '{backend}'.", [
'backend' => $client->getContext()->backendName
]);
try {
foreach ($client->getUsersList() as $user) {
/** @var array $info */
$info = $backend;
$info['user'] = ag($user, 'id', ag($info, 'user'));
$info['backendName'] = r("{backend}_{user}", [
'backend' => ag($backend, 'name'),
'user' => ag($user, 'name'),
]);
$info['displayName'] = ag($user, 'name');
$info = ag_delete($info, 'options.' . Options::PLEX_USER_PIN);
$info = ag_delete($info, 'options.' . Options::ADMIN_TOKEN);
$info = ag_set($info, 'options.' . Options::ALT_NAME, ag($backend, 'name'));
$info = ag_set($info, 'options.' . Options::ALT_ID, ag($backend, 'user'));
if (PlexClient::CLIENT_NAME === ucfirst(ag($backend, 'type'))) {
$info = ag_set($info, 'token', 'reuse_or_generate_token');
$info = ag_set($info, 'options.' . Options::PLEX_USER_NAME, ag($user, 'name'));
$info = ag_set($info, 'options.' . Options::PLEX_USER_UUID, ag($user, 'uuid'));
}
$user['backend'] = ag($backend, 'name');
$user['client_data'] = $info;
$users[] = $user;
}
} catch (Throwable $e) {
$this->logger->error(
"Exception '{error.kind}' was thrown unhandled during '{client}: {backend}' get users list. '{error.message}' at '{error.file}:{error.line}'.",
[
'backend' => $client->getContext()->backendName,
'client' => $client->getContext()->clientName,
'error' => [
'kind' => $e::class,
'line' => $e->getLine(),
'message' => $e->getMessage(),
'file' => after($e->getFile(), ROOT_PATH),
],
'exception' => [
'file' => $e->getFile(),
'line' => $e->getLine(),
'kind' => get_class($e),
'message' => $e->getMessage(),
],
]
);
}
}
$users = $this->generate_users_list($users, $mapping);
if (count($users) < 1) {
$this->logger->warning('No users were found.');
return self::FAILURE;
}
$this->logger->notice("SYSTEM: User matching results {results}.", [
'results' => arrayToString($this->usersList($users)),
]);
foreach ($users as $user) {
$userName = ag($user, 'name', 'Unknown');
$subUserPath = r(fixPath(Config::get('path') . '/users/{user}'), ['user' => $userName]);
if (false === is_dir($subUserPath)) {
$this->logger->info("SYSTEM: Creating '{user}' directory '{path}'.", [
'user' => $userName,
'path' => $subUserPath
]);
if (false === mkdir($subUserPath, 0755, true)) {
$this->logger->error("SYSTEM: Failed to '{user}' directory '{path}'.", [
'user' => $userName,
'path' => $subUserPath
]);
continue;
}
}
$config_file = "{$subUserPath}/servers.yaml";
$this->logger->notice("SYSTEM: Creating '{user}' configuration file '{file}'.", [
'user' => $userName,
'file' => $config_file
]);
$perUser = ConfigFile::open($config_file, 'yaml', autoCreate: true);
$perUser->setLogger($this->logger);
$regenerateTokens = $input->getOption('regenerate-tokens');
foreach (ag($user, 'backends', []) as $backend) {
$name = ag($backend, 'client_data.backendName');
$clientData = ag_delete(ag($backend, 'client_data'), 'class');
$clientData['name'] = $name;
if (false === $perUser->has($name)) {
$data = $clientData;
$data = ag_set($data, 'import.lastSync', null);
$data = ag_set($data, 'export.lastSync', null);
$data = ag_delete($data, ['webhook', 'name', 'backendName', 'displayName']);
$perUser->set($name, $data);
} else {
$clientData = ag_delete($clientData, ['token', 'import.lastSync', 'export.lastSync']);
$clientData = array_replace_recursive($perUser->get($name), $clientData);
if ($input->getOption('update')) {
$this->logger->info("SYSTEM: Updating user configuration for '{user}@{name}' backend.", [
'name' => $name,
'user' => $userName,
]);
$perUser->set($name, $clientData);
}
}
try {
if (true === $regenerateTokens || 'reuse_or_generate_token' === ag($clientData, 'token')) {
/** @var iClient $client */
$client = ag($backend, 'client_data.class');
assert($client instanceof iClient);
if (PlexClient::CLIENT_NAME === $client->getType()) {
$clientData['token'] = $client->getUserToken(
ag($clientData, 'options.' . Options::PLEX_USER_UUID),
ag($clientData, 'options.' . Options::PLEX_USER_NAME)
);
$perUser->set("{$name}.token", $clientData['token']);
}
}
} catch (Throwable $e) {
$this->logger->error(
"Failed to generate access token for '{user}@{name}' backend. '{error}' at '{file}:{line}'.",
[
'name' => $name,
'user' => $userName,
'error' => [
'kind' => $e::class,
'line' => $e->getLine(),
'message' => $e->getMessage(),
'file' => after($e->getFile(), ROOT_PATH),
],
'exception' => [
'file' => $e->getFile(),
'line' => $e->getLine(),
'kind' => get_class($e),
'message' => $e->getMessage(),
],
]
);
continue;
}
}
$dbFile = r($subUserPath . "/{user}.db", ['user' => $userName]);
if (false === file_exists($dbFile)) {
$this->logger->notice("SYSTEM: Creating '{user}' database '{db}'.", [
'user' => $userName,
'db' => $dbFile
]);
perUserDb($userName);
}
$perUser->persist();
}
return self::SUCCESS;
}
/**
* Generate a list of users that are matched across all backends.
*
* @param array $users The list of users from all backends.
* @param array{string: array{string: string, options: array}} $map The map of users to match.
*
* @return array{name: string, backends: array<string, array<string, mixed>>}[] The list of matched users.
*/
private function generate_users_list(array $users, array $map = []): array
{
$allBackends = [];
foreach ($users as $u) {
if (!in_array($u['backend'], $allBackends, true)) {
$allBackends[] = $u['backend'];
}
}
// Build a lookup: $usersBy[backend][lowercased_name] = userObject
$usersBy = [];
foreach ($users as $user) {
$backend = $user['backend'];
$nameLower = strtolower($user['name']);
if (ag($user, 'id') === ag($user, 'client_data.options.' . Options::ALT_ID)) {
$this->logger->debug('Skipping main user "{name}".', ['name' => $user['name']]);
continue;
}
if (!isset($usersBy[$backend])) {
$usersBy[$backend] = [];
}
$usersBy[$backend][$nameLower] = $user;
}
$results = [];
// Track used combos: array of [backend, nameLower].
$used = [];
// Helper: check if a (backend, nameLower) is already used.
$alreadyUsed = fn(string $b, string $n): bool => in_array([$b, $n], $used, true);
/**
* Build a "unified" row from matched users across backends.
* - $backendDict example: [ 'backend1' => userObj, 'backend2' => userObj, ... ]
* - Picks a 'name' by "most frequent name" logic (with tie fallback).
*
* Returns an array shaped like:
* <code language="php">
* return [
* 'name' => 'something',
* 'backends' => [
* 'backend1' => userObj,
* 'backend2' => userObj,
* ...,
* ]
* ]
* </code>
*/
$buildUnifiedRow = function (array $backendDict) use ($allBackends): array {
// Collect the names in the order of $allBackends for tie-breaking.
$names = [];
foreach ($allBackends as $b) {
if (isset($backendDict[$b])) {
$names[] = $backendDict[$b]['name'];
}
}
// Tally frequencies
$freq = [];
foreach ($names as $n) {
if (!isset($freq[$n])) {
$freq[$n] = 0;
}
$freq[$n]++;
}
// Decide a final 'name'
if (empty($freq)) {
$finalName = 'unknown';
} else {
$max = max($freq);
$candidates = array_keys(array_filter($freq, fn($count) => $count === $max));
if (1 === count($candidates)) {
$finalName = $candidates[0];
} else {
// Tie => pick the first from $names thats in $candidates
$finalName = null;
foreach ($names as $n) {
if (in_array($n, $candidates, true)) {
$finalName = $n;
break;
}
}
if (!$finalName) {
$finalName = 'unknown';
}
}
}
// Build final row: "name" + sub-array "backends"
$row = [
'name' => $finalName,
'backends' => [],
];
// Fill 'backends'
foreach ($allBackends as $b) {
if (isset($backendDict[$b])) {
$row['backends'][$b] = $backendDict[$b];
}
}
return $row;
};
// Main logic: For each backend and each user in that backend, unify them if we find a match in ≥2 backends.
// We do map-based matching first, then direct-name matching.
foreach ($allBackends as $backend) {
if (!isset($usersBy[$backend])) {
continue;
}
// For each user in this backend
foreach ($usersBy[$backend] as $nameLower => $userObj) {
// Skip if already used
if ($alreadyUsed($backend, $nameLower)) {
continue;
}
// Map-based matching first
$matchedMapEntry = null;
foreach ($map as $mapRow) {
if (isset($mapRow[$backend]['name']) && strtolower($mapRow[$backend]['name']) === $nameLower) {
$matchedMapEntry = $mapRow;
break;
}
}
if ($matchedMapEntry) {
// Build mapMatch from the map row.
$mapMatch = [$backend => $userObj];
// Gather all the other backends from the map
foreach ($allBackends as $otherBackend) {
if ($otherBackend === $backend) {
continue;
}
if (isset($matchedMapEntry[$otherBackend]['name'])) {
$mappedNameLower = strtolower($matchedMapEntry[$otherBackend]['name']);
if (isset($usersBy[$otherBackend][$mappedNameLower])) {
$mapMatch[$otherBackend] = $usersBy[$otherBackend][$mappedNameLower];
}
}
}
// If we matched ≥ 2 backends, unify them
if (count($mapMatch) >= 2) {
// --- MERGE map-based "options" into client_data => options, if any ---
foreach ($mapMatch as $b => &$matchedUser) {
// If the map entry has an 'options' array for this backend,
// merge it into $matchedUser['client_data']['options'].
if (isset($matchedMapEntry[$b]['options']) && is_array($matchedMapEntry[$b]['options'])) {
$mapOptions = $matchedMapEntry[$b]['options'];
// Ensure $matchedUser['client_data'] is an array
if (!isset($matchedUser['client_data']) || !is_array($matchedUser['client_data'])) {
$matchedUser['client_data'] = [];
}
// Ensure $matchedUser['client_data']['options'] is an array
if (!isset($matchedUser['client_data']['options']) || !is_array(
$matchedUser['client_data']['options']
)) {
$matchedUser['client_data']['options'] = [];
}
// Merge the map's options
$matchedUser['client_data']['options'] = array_replace_recursive(
$matchedUser['client_data']['options'],
$mapOptions
);
}
}
unset($matchedUser); // break reference from the loop
// Build final row
$results[] = $buildUnifiedRow($mapMatch);
// Mark & remove from $usersBy
foreach ($mapMatch as $b => $mu) {
$nm = strtolower($mu['name']);
$used[] = [$b, $nm];
unset($usersBy[$b][$nm]);
}
continue;
} else {
$this->logger->error("No partial fallback match via map for '{backend}: {user}'", [
'backend' => $userObj['backend'],
'user' => $userObj['name'],
]);
}
}
// Direct-name matching if map fails
$directMatch = [$backend => $userObj];
foreach ($allBackends as $otherBackend) {
if ($otherBackend === $backend) {
continue;
}
// Same name => direct match
if (isset($usersBy[$otherBackend][$nameLower])) {
$directMatch[$otherBackend] = $usersBy[$otherBackend][$nameLower];
}
}
// If direct matched ≥ 2 backends, unify
if (count($directMatch) >= 2) {
// No map "options" to merge here
$results[] = $buildUnifiedRow($directMatch);
// Mark & remove them from $usersBy
foreach ($directMatch as $b => $matchedUser) {
$nm = strtolower($matchedUser['name']);
$used[] = [$b, $nm];
unset($usersBy[$b][$nm]);
}
continue;
}
// If neither map nor direct matched for ≥2
$this->logger->error("Cannot match user '{backend}: {user}' in any map row or direct match.", [
'backend' => $userObj['backend'],
'user' => $userObj['name']
]);
}
}
return $results;
}
private function usersList(array $list): array
{
$chunks = [];
foreach ($list as $row) {
$name = $row['name'] ?? 'unknown';
$pairs = [];
if (!empty($row['backends']) && is_array($row['backends'])) {
foreach ($row['backends'] as $backendName => $backendData) {
if (isset($backendData['name'])) {
$pairs[] = r("{name}@{backend}", ['backend' => $backendName, 'name' => $backendData['name']]);
}
}
}
$chunks[] = r("{name}: {pairs}", ['name' => $name, 'pairs' => implode(', ', $pairs)]);
}
return $chunks;
}
}

View File

@@ -14,7 +14,6 @@ use App\Libs\Mappers\ExtendedImportInterface as iEImport;
use App\Libs\Mappers\Import\DirectMapper;
use App\Libs\Options;
use App\Libs\Stream;
use DirectoryIterator;
use Psr\Http\Message\StreamInterface as iStream;
use Psr\Log\LoggerInterface as iLogger;
use Symfony\Component\Console\Input\InputInterface as iInput;
@@ -154,73 +153,6 @@ class BackupCommand extends Command
return $this->single(fn(): int => $this->process($input), $output);
}
private function getBackends(iInput $input): array
{
$configs = [
'main' => [
'config' => ConfigFile::open(Config::get('backends_file'), 'yaml'),
'mapper' => $this->mapper,
'cache' => null,
]
];
if (true === $input->getOption('only-main-user')) {
return $configs;
}
$usersDir = Config::get('path') . '/users';
if (false === is_dir($usersDir)) {
return $configs;
}
if (!is_readable($usersDir)) {
$this->logger->error("SYSTEM: Unable to read '{dir}' directory.", ['dir' => $usersDir]);
return $configs;
}
$mainUserIds = array_map(fn($backend) => ag($backend, 'user'), ag($configs, 'main.config')->getAll());
foreach (new DirectoryIterator(Config::get('path') . '/users') as $dir) {
if ($dir->isDot() || false === $dir->isDir()) {
continue;
}
$config = perUserConfig($dir->getBasename());
$subUserIds = array_map(fn($backend) => ag($backend, 'user'), $config->getAll());
foreach ($mainUserIds as $mainId) {
if (false === in_array($mainId, $subUserIds)) {
continue;
}
$this->logger->debug("SYSTEM: Skipping '{user}' backends as it's same as main user.", [
'user' => $dir->getBasename(),
'main' => $mainUserIds,
'sub' => $subUserIds,
]);
continue 2;
}
$userName = $dir->getBasename();
$perUserCache = perUserCacheAdapter($userName);
$configs[$userName] = [
'config' => $config,
'mapper' => $this->mapper->withDB(perUserDb($userName))
->withCache($perUserCache)
->withLogger($this->logger)
->withOptions(
array_replace_recursive($this->mapper->getOptions(), [Options::ALT_NAME => $userName])
)
->loadData(),
'cache' => $perUserCache,
];
}
return $configs;
}
/**
* Execute the command.
*
@@ -245,8 +177,13 @@ class BackupCommand extends Command
$this->mapper->setOptions(options: $mapperOpts);
}
$opts = [];
if (true === (bool)$input->getOption('only-main-user')) {
$opts = ['main_user_only' => true];
}
$this->logger->notice("Using WatchState version - '{version}'.", ['version' => getAppVersion()]);
foreach ($this->getBackends($input) as $user => $opt) {
foreach ($this->getUserData($this->mapper, $this->logger, $opts) as $user => $opt) {
try {
$this->process_backup($input, $user, $opt);
} finally {
@@ -449,7 +386,7 @@ class BackupCommand extends Command
gc_collect_cycles();
}
foreach ($list as $backend) {
foreach ($list as $b => $backend) {
if (null === ($backend['fp'] ?? null)) {
continue;
}
@@ -462,7 +399,8 @@ class BackupCommand extends Command
if (false === $noCompression) {
$file = $backend['fp']->getMetadata('uri');
$this->logger->notice("SYSTEM: Compressing '{user}@{file}'.", [
$this->logger->notice("SYSTEM: Compressing '{user}@{name}' backup file '{file}'.", [
'name' => $b,
'user' => $user,
'file' => $file
]);

View File

@@ -6,7 +6,6 @@ namespace App\Commands\State;
use App\Backends\Common\Cache as BackendCache;
use App\Backends\Common\ClientInterface as iClient;
use App\Backends\Plex\PlexClient;
use App\Command;
use App\Libs\Attributes\DI\Inject;
use App\Libs\Attributes\Route\Cli;
@@ -45,8 +44,6 @@ class SyncCommand extends Command
public const string TASK_NAME = 'sync';
private array $mapping = [];
/**
* Class Constructor.
*
@@ -92,59 +89,7 @@ class SyncCommand extends Command
InputOption::VALUE_NONE,
'Mapper option. Always update the locally stored metadata from backend.'
)
->addOption('regenerate-tokens', 'g', InputOption::VALUE_NONE, 'Generate new tokens for all users.')
->addOption('include-main-user', null, InputOption::VALUE_NONE, 'Include main user in sync.')
->setHelp(
r(
<<<HELP
pre-alpha command, not ready for production use. it's not working yet as expected,
Use it at your own risk.
-------
<notice>[ FAQ ]</notice>
-------
<question>Will this work with limited tokens?</question>
No, This requires admin token for plex backend, and API keys for jellyfin/emby.
We need the admin token for plex to generate user tokens for each user, and we need the API keys
for jellyfin/emby to get the user list and update their play state.
<question>Known limitations</question>
Known limitations:
* Cannot be used with plex users that have PIN enabled.
* Cannot sync play progress.
Some or all of these limitations will be fixed in future releases.
<question># How does this sync operation mode work?</question>
It works by first, getting all users from all backends, and trying to match them by name,
once we build a list of users that are matched, then we basically run the import/export for each user
using in memory storage, it should not have any impact on the real database and cache.
You can help the matching by using the mapper file, which is a simple YAML file that maps users from one
backend to another, this is useful when the usernames are different or when you want to merge users from
different backends into one user.
Example of a mapper.yaml file:
- backend1: "mike_james"
backend2: "james_mike"
- backend1: "john_doe"
backend2: "doe_john"
HELP,
[
'cmd' => trim(commandContext()),
'route' => self::ROUTE,
]
)
);
->addOption('include-main-user', 'M', InputOption::VALUE_NONE, 'Include main user in sync.');
}
/**
@@ -175,16 +120,6 @@ class SyncCommand extends Command
]);
}
$mapFile = Config::get('mapper_file');
if (file_exists($mapFile) && filesize($mapFile) > 10) {
$map = ConfigFile::open(Config::get('mapper_file'), 'yaml');
$this->mapping = $map->getAll();
}
$configFile = ConfigFile::open(Config::get('backends_file'), 'yaml');
$configFile->setLogger($this->logger);
$backends = [];
$selected = $input->getOption('select-backend');
$isCustom = !empty($selected) && count($selected) > 0;
$supported = Config::get('supported', []);
@@ -212,132 +147,178 @@ class SyncCommand extends Command
$this->mapper = $this->mapper->withOptions($mapperOpts);
}
foreach ($configFile->getAll() as $backendName => $backend) {
$type = strtolower(ag($backend, 'type', 'unknown'));
$userOpt = [
'no_main_user' => !$input->getOption('include-main-user'),
];
if ($isCustom && $input->getOption('exclude') === in_array($backendName, $selected)) {
$this->logger->info("SYSTEM: Ignoring '{backend}' as requested by [-s, --select-backend].", [
'backend' => $backendName
]);
continue;
}
if (true !== (bool)ag($backend, 'import.enabled')) {
$this->logger->info("SYSTEM: Ignoring '{backend}' as the backend has import disabled.", [
'backend' => $backendName
]);
continue;
}
if (true !== (bool)ag($backend, 'export.enabled')) {
$this->logger->info("SYSTEM: Ignoring '{backend}' as the backend has export disabled.", [
'backend' => $backendName
]);
continue;
}
if (!isset($supported[$type])) {
$this->logger->error(
"SYSTEM: Ignoring '{backend}' due to unexpected type '{type}'. Expecting '{types}'.",
[
'type' => $type,
'backend' => $backendName,
'types' => implode(', ', array_keys($supported)),
]
);
continue;
}
if (null === ($url = ag($backend, 'url')) || false === isValidURL($url)) {
$this->logger->error("SYSTEM: Ignoring '{backend}' due to invalid URL. '{url}'.", [
'url' => $url ?? 'None',
'backend' => $backendName,
]);
continue;
}
$backend['name'] = $backendName;
$backends[$backendName] = $backend;
}
$backends = $this->getUserData($this->mapper, $this->logger, $userOpt);
if (empty($backends)) {
$this->logger->warning('No backends were found.');
return self::FAILURE;
$this->logger->warning('No users were found. Please create sub users via the backends:create command.');
return self::SUCCESS;
}
foreach ($backends as &$backend) {
if (null === ($name = ag($backend, 'name'))) {
continue;
}
$opts = ag($backend, 'options', []);
if ($input->getOption('ignore-date')) {
$opts[Options::IGNORE_DATE] = true;
}
if ($input->getOption('trace')) {
$opts[Options::DEBUG_TRACE] = true;
}
if ($input->getOption('dry-run')) {
$opts[Options::DRY_RUN] = true;
}
if ($input->getOption('timeout')) {
$opts['client']['timeout'] = $input->getOption('timeout');
}
$backend['options'] = $opts;
$backend['class'] = $this->getBackend($name, $backend)->setLogger($this->logger);
}
unset($backend);
$this->logger->notice("SYSTEM: Getting users list from '{backends}'.", [
'backends' => join(', ', array_map(fn($backend) => $backend['name'], $backends))
]);
$users = [];
foreach ($backends as $backend) {
/** @var iClient $client */
$client = ag($backend, 'class');
assert($backend instanceof iClient);
$this->logger->info("SYSTEM: Getting users from '{backend}'.", [
'backend' => $client->getContext()->backendName
]);
foreach ($backends as $user => $userConf) {
try {
foreach ($client->getUsersList() as $user) {
/** @var array $info */
$info = $backend;
$info['user'] = ag($user, 'id', ag($info, 'user'));
$info['backendName'] = r("{backend}_{user}", [
'backend' => ag($backend, 'name'),
'user' => ag($user, 'name'),
]);
$info['displayName'] = ag($user, 'name');
$info = ag_delete($info, 'options.' . Options::PLEX_USER_PIN);
$info = ag_delete($info, 'options.' . Options::ADMIN_TOKEN);
$info = ag_set($info, 'options.' . Options::ALT_NAME, ag($backend, 'name'));
$info = ag_set($info, 'options.' . Options::ALT_ID, ag($backend, 'user'));
if (PlexClient::CLIENT_NAME === ucfirst(ag($backend, 'type'))) {
$info = ag_set($info, 'token', 'reuse_or_generate_token');
$info = ag_set($info, 'options.' . Options::PLEX_USER_NAME, ag($user, 'name'));
$info = ag_set($info, 'options.' . Options::PLEX_USER_UUID, ag($user, 'uuid'));
$this->queue->reset();
$config = ag($userConf, 'config');
assert($config instanceof ConfigFile);
$list = [];
foreach ($config->getAll() as $backendName => $backend) {
$type = strtolower(ag($backend, 'type', 'unknown'));
if ($isCustom && $input->getOption('exclude') === $this->in_array($selected, $backendName)) {
$this->logger->info("SYSTEM: Ignoring '{user}@{backend}' as requested.", [
'user' => $user,
'backend' => $backendName
]);
continue;
}
$user['backend'] = ag($backend, 'name');
$user['client_data'] = $info;
$users[] = $user;
if (true !== (bool)ag($backend, 'import.enabled')) {
$this->logger->info("SYSTEM: Ignoring '{user}@{backend}'. Import disabled.", [
'user' => $user,
'backend' => $backendName
]);
continue;
}
if (!isset($supported[$type])) {
$this->logger->error("SYSTEM: Ignoring '{user}@{backend}'. Unexpected type '{type}'.", [
'user' => $user,
'type' => $type,
'backend' => $backendName,
]);
continue;
}
if (null === ($url = ag($backend, 'url')) || false === isValidURL($url)) {
$this->logger->error("SYSTEM: Ignoring '{user}@{backend}'. Invalid URL '{url}'.", [
'user' => $user,
'url' => $url ?? 'None',
'backend' => $backendName,
]);
continue;
}
$opts = ag($backend, 'options', []);
if ($input->getOption('ignore-date')) {
$opts = ag_set($opts, Options::IGNORE_DATE, true);
}
if ($input->getOption('trace')) {
$opts = ag_set($opts, Options::DEBUG_TRACE, true);
}
if ($input->getOption('dry-run')) {
$opts = ag_set($opts, Options::DRY_RUN, true);
}
if ($input->getOption('timeout')) {
$opts = ag_set($opts, 'client.timeout', $input->getOption('timeout'));
}
$backend['options'] = $opts;
$backend['name'] = $backendName;
$backend['class'] = makeBackend($backend, $backendName, [
BackendCache::class => Container::get(BackendCache::class)->with(
adapter: ag($userConf, 'cache')
)
])->setLogger($this->logger);
$list[$backendName] = $backend;
}
if (empty($list)) {
$this->logger->warning(
$isCustom ? '[-s, --select-backend] flag did not match any backend.' : 'No backends were found.'
);
continue;
}
$start = makeDate();
$this->logger->notice("SYSTEM: Syncing user '{user}: {list}'.", [
'user' => $user,
'list' => join(', ', array_keys($list)),
'started' => $start,
]);
/** @var iEImport $mapper */
$mapper = ag($userConf, 'mapper');
assert($mapper instanceof iEImport);
$this->handleImport($mapper, $user, $list, $input->getOption('force-full'), $config);
$changes = $mapper->computeChanges(array_keys($list));
foreach ($changes as $b => $changed) {
$count = count($changed);
if ($count < 1) {
continue;
}
$this->logger->notice("SYSTEM: '{changes}' changes detected for '{name}@{backend}'.", [
'name' => $user,
'backend' => $b,
'changes' => $count,
'items' => array_map(
fn(iState $i) => [
'title' => $i->getName(),
'state' => $i->isWatched() ? 'played' : 'unplayed',
'meta' => $i->isSynced(array_keys($list)),
],
$changed
)
]);
/** @var iClient $client */
$client = $list[$b]['class'];
$client->updateState($changed, $this->queue);
}
$this->handleExport($user, $list);
$end = makeDate();
$this->logger->notice("SYSTEM: Completed syncing user '{name}: {list}' in '{time.duration}'s", [
'name' => $user,
'list' => join(', ', array_keys($list)),
'time' => [
'start' => $start,
'end' => $end,
'duration' => $end->getTimestamp() - $start->getTimestamp(),
],
'memory' => [
'now' => getMemoryUsage(),
'peak' => getPeakMemoryUsage(),
],
]);
// -- Release memory.
if (false === $input->getOption('dry-run')) {
$mapper->commit();
foreach ($list as $b => $_) {
$config->set("{$b}.import.lastSync", time());
$config->set("{$b}.export.lastSync", time());
}
$config->persist();
} else {
$mapper->reset();
}
$this->logger->info("SYSTEM: Memory usage after reset '{memory}'.", [
'memory' => getMemoryUsage(),
]);
} catch (Throwable $e) {
$this->logger->error(
"Exception '{error.kind}' was thrown unhandled during '{client}: {backend}' get users list. '{error.message}' at '{error.file}:{error.line}'.",
"SYSTEM: Exception '{error.kind}' was thrown unhandled during '{name}' sync. '{error.message}' at '{error.file}:{error.line}'.",
[
'backend' => $client->getContext()->backendName,
'client' => $client->getContext()->clientName,
'name' => $user,
'error' => [
'kind' => $e::class,
'line' => $e->getLine(),
@@ -355,178 +336,6 @@ class SyncCommand extends Command
}
}
$users = $this->generate_users_list($users, $input->getOption('include-main-user'), $this->mapping);
if (count($users) < 1) {
$this->logger->warning('No users were found.');
return self::FAILURE;
}
$this->logger->notice("SYSTEM: User matching results {results}.", [
'results' => arrayToString($this->usersList($users)),
]);
foreach ($users as $user) {
$this->queue->reset();
$userName = ag($user, 'name', 'Unknown');
$this->logger->info("SYSTEM: Loading '{user}' mapper data. Current memory usage '{memory}'.", [
'user' => $userName,
'memory' => getMemoryUsage(),
]);
$perUserCache = perUserCacheAdapter($userName);
$perUserMapper = $this->mapper->withDB(perUserDb($userName))
->withCache($perUserCache)
->withLogger($this->logger)
->withOptions(array_replace_recursive($this->mapper->getOptions(), [Options::ALT_NAME => $userName]))
->loadData();
$this->logger->info("SYSTEM: loading of '{user}' mapper data completed using '{memory}' of memory.", [
'user' => $userName,
'memory' => getMemoryUsage(),
]);
$list = [];
$displayName = null;
$perUser = ConfigFile::open(r(fixPath(Config::get('path') . '/users/{user}/servers.yaml'), [
'user' => $userName
]), 'yaml', autoSave: true, autoCreate: true);
$perUser->setLogger($this->logger);
$regenerateTokens = $input->getOption('regenerate-tokens');
foreach (ag($user, 'backends', []) as $backend) {
$name = ag($backend, 'client_data.backendName');
$clientData = ag($backend, 'client_data');
$clientData['name'] = $name;
if (false === $perUser->has($name)) {
$data = $clientData;
$data = ag_set($data, 'import.lastSync', null);
$data = ag_set($data, 'export.lastSync', null);
$data = ag_delete($data, ['webhook', 'name', 'backendName', 'displayName']);
$perUser->set($name, $data);
} else {
$clientData = ag_delete($clientData, ['token', 'import.lastSync', 'export.lastSync']);
$clientData = array_replace_recursive($perUser->get($name), $clientData);
}
try {
if (true === $regenerateTokens || 'reuse_or_generate_token' === ag($clientData, 'token')) {
/** @var iClient $client */
$client = ag($backend, 'client_data.class');
assert($client instanceof iClient);
if (PlexClient::CLIENT_NAME === $client->getType()) {
$clientData['token'] = $client->getUserToken(
ag($clientData, 'options.' . Options::PLEX_USER_UUID),
ag($clientData, 'options.' . Options::PLEX_USER_NAME)
);
$perUser->set("{$name}.token", $clientData['token']);
}
}
} catch (Throwable $e) {
$this->logger->error(
"Failed to generate access token for '{user}: {name}' backend. '{error}' at '{file}:{line}'.",
[
'name' => $name,
'user' => $userName,
'error' => [
'kind' => $e::class,
'line' => $e->getLine(),
'message' => $e->getMessage(),
'file' => after($e->getFile(), ROOT_PATH),
],
'exception' => [
'file' => $e->getFile(),
'line' => $e->getLine(),
'kind' => get_class($e),
'message' => $e->getMessage(),
],
]
);
continue;
}
$clientData['class'] = makeBackend($clientData, $name, [
BackendCache::class => Container::get(BackendCache::class)->with(adapter: $perUserCache)
])->setLogger($this->logger);
$list[$name] = $clientData;
$displayName = ag($backend, 'client_data.displayName', '??');
if (false === $input->getOption('dry-run')) {
$perUser->set("{$name}.import.lastSync", time());
$perUser->set("{$name}.export.lastSync", time());
}
}
$start = makeDate();
$this->logger->notice("SYSTEM: Syncing user '{user}' -> '{list}'.", [
'user' => $displayName,
'list' => join(', ', array_keys($list)),
'started' => $start,
]);
assert($perUserMapper instanceof iEImport);
$this->handleImport($perUserMapper, $displayName, $list, $input->getOption('force-full'), $perUser);
assert($perUserMapper instanceof MemoryMapper);
/** @var MemoryMapper $changes */
$changes = $perUserMapper->computeChanges(array_keys($list));
foreach ($changes as $b => $changed) {
$count = count($changed);
if ($count >= 1) {
$this->logger->notice("SYSTEM: Changes detected for '{name}: {backend}' are '{changes}'.", [
'name' => $displayName,
'backend' => $b,
'changes' => $count,
'items' => array_map(
fn(iState $i) => [
'title' => $i->getName(),
'state' => $i->isWatched() ? 'played' : 'unplayed',
'meta' => $i->isSynced(array_keys($list)),
],
$changed
)
]);
/** @var iClient $client */
$client = $list[$b]['class'];
$client->updateState($changed, $this->queue);
}
}
$this->handleExport($displayName, ag($user, 'backends', []));
$end = makeDate();
$this->logger->notice("SYSTEM: Completed syncing user '{name}' -> '{list}' in '{time.duration}'s", [
'name' => $displayName,
'list' => join(', ', array_keys($list)),
'time' => [
'start' => $start,
'end' => $end,
'duration' => $end->getTimestamp() - $start->getTimestamp(),
],
'memory' => [
'now' => getMemoryUsage(),
'peak' => getPeakMemoryUsage(),
],
]);
// -- Release memory.
if (false === $input->getOption('dry-run')) {
$perUserMapper->commit();
} else {
$perUserMapper->reset();
}
$this->logger->info("SYSTEM: Memory usage after reset '{memory}'.", [
'memory' => getMemoryUsage(),
]);
$perUser->persist();
}
return self::SUCCESS;
}
@@ -540,22 +349,41 @@ class SyncCommand extends Command
/** @var array<array-key,ResponseInterface> $queue */
$queue = [];
$this->logger->info("SYSTEM: Loading '{user}' mapper data. Current memory usage '{memory}'.", [
'user' => $name,
'memory' => getMemoryUsage(),
]);
$mapper->loadData();
$this->logger->info("SYSTEM: loading of '{user}' mapper data '{count}' completed using '{memory}' of memory.", [
'user' => $name,
'count' => $mapper->count(),
'memory' => getMemoryUsage(),
]);
foreach ($backends as $backend) {
/** @var iClient $client */
$client = ag($backend, 'class');
assert($client instanceof iClient);
$context = $client->getContext();
$after = ag($context->options, Options::FORCE_FULL) || $isFull ? null : $config->get(
$context->backendName . '.import.lastSync'
);
if (true === $isFull || ag($context->options, Options::FORCE_FULL)) {
$after = null;
} else {
$after = $config->get($context->backendName . '.import.lastSync');
}
if (null !== $after) {
$after = makeDate($after);
}
array_push($queue, ...$client->pull(mapper: $mapper, after: $after));
}
$start = makeDate();
$this->logger->notice("SYSTEM: Waiting on '{total}' requests for import '{name}' data.", [
$this->logger->notice("SYSTEM: Waiting on '{total}' requests for '{name}: {backends}' data.", [
'name' => $name,
'backends' => join(', ', array_keys($backends)),
'total' => number_format(count($queue)),
'time' => [
'start' => $start,
@@ -582,9 +410,10 @@ class SyncCommand extends Command
$end = makeDate();
$this->logger->notice(
"SYSTEM: Completed waiting on '{total}' requests in '{time.duration}'s for importing '{name}' data. Parsed '{responses.size}' of data.",
"SYSTEM: Finished waiting on '{total}' requests in '{time.duration}'s for importing '{name}: {backends}' data. Parsed '{responses.size}' of data.",
[
'name' => $name,
'backends' => join(', ', array_keys($backends)),
'total' => number_format(count($queue)),
'time' => [
'start' => $start,
@@ -615,9 +444,10 @@ class SyncCommand extends Command
return;
}
$this->logger->notice("SYSTEM: Sending '{total}' change play state requests for '{name}'.", [
$this->logger->notice("SYSTEM: Sending '{total}' change play state requests for '{name}: {backends}'.", [
'name' => $name,
'total' => $total
'total' => $total,
'backends' => join(', ', array_keys($backends)),
]);
foreach ($this->queue->getQueue() as $response) {
@@ -626,7 +456,7 @@ class SyncCommand extends Command
try {
if (200 !== ($statusCode = $response->getStatusCode())) {
$this->logger->error(
"Request to change '{name}: {backend}' '{item.title}' play state returned with unexpected '{status_code}' status code.",
"Request to change '{name}@{backend}' '{item.title}' play state returned with unexpected '{status_code}' status code.",
[
'name' => $name,
'status_code' => $statusCode,
@@ -636,13 +466,13 @@ class SyncCommand extends Command
continue;
}
$this->logger->notice("Marked '{name}: {backend}' '{item.title}' as '{play_state}'.", [
$this->logger->notice("Marked '{name}@{backend}' '{item.title}' as '{play_state}'.", [
'name' => $name,
...$context
]);
} catch (Throwable $e) {
$this->logger->error(
message: "Exception '{error.kind}' was thrown unhandled during '{name}: {backend}' request to change play state of {item.type} '{item.title}'. '{error.message}' at '{error.file}:{error.line}'.",
message: "Exception '{error.kind}' was thrown unhandled during '{name}@{backend}' request to change play state of {item.type} '{item.title}'. '{error.message}' at '{error.file}:{error.line}'.",
context: [
'name' => $name,
'error' => [
@@ -663,271 +493,15 @@ class SyncCommand extends Command
}
}
$this->logger->notice("SYSTEM: Sent '{total}' change play state requests for '{name}'.", [
$this->logger->notice("SYSTEM: Sent '{total}' change play state requests for '{name}: {backends}'.", [
'name' => $name,
'total' => $total
'total' => $total,
'backends' => join(', ', array_keys($backends)),
]);
}
/**
* Generate a list of users that are matched across all backends.
*
* @param array $users The list of users from all backends.
* @param array{string: array{string: string, options: array}} $map The map of users to match.
*
* @return array{name: string, backends: array<string, array<string, mixed>>}[] The list of matched users.
*/
private function generate_users_list(array $users, bool $includeMainUser, array $map = []): array
private function in_array(array $haystack, string $needle): bool
{
$allBackends = [];
foreach ($users as $u) {
if (!in_array($u['backend'], $allBackends, true)) {
$allBackends[] = $u['backend'];
}
}
// Build a lookup: $usersBy[backend][lowercased_name] = userObject
$usersBy = [];
foreach ($users as $user) {
$backend = $user['backend'];
$nameLower = strtolower($user['name']);
if (false === $includeMainUser && ag($user, 'id') === ag($user, 'client_data.options.' . Options::ALT_ID)) {
$this->logger->debug('Skipping main user "{name}" from sync.', ['name' => $user['name']]);
continue;
}
if (!isset($usersBy[$backend])) {
$usersBy[$backend] = [];
}
$usersBy[$backend][$nameLower] = $user;
}
$results = [];
// Track used combos: array of [backend, nameLower].
$used = [];
// Helper: check if a (backend, nameLower) is already used.
$alreadyUsed = fn(string $b, string $n): bool => in_array([$b, $n], $used, true);
/**
* Build a "unified" row from matched users across backends.
* - $backendDict example: [ 'backend1' => userObj, 'backend2' => userObj, ... ]
* - Picks a 'name' by "most frequent name" logic (with tie fallback).
*
* Returns an array shaped like:
* <code language="php">
* return [
* 'name' => 'something',
* 'backends' => [
* 'backend1' => userObj,
* 'backend2' => userObj,
* ...,
* ]
* ]
* </code>
*/
$buildUnifiedRow = function (array $backendDict) use ($allBackends): array {
// Collect the names in the order of $allBackends for tie-breaking.
$names = [];
foreach ($allBackends as $b) {
if (isset($backendDict[$b])) {
$names[] = $backendDict[$b]['name'];
}
}
// Tally frequencies
$freq = [];
foreach ($names as $n) {
if (!isset($freq[$n])) {
$freq[$n] = 0;
}
$freq[$n]++;
}
// Decide a final 'name'
if (empty($freq)) {
$finalName = 'unknown';
} else {
$max = max($freq);
$candidates = array_keys(array_filter($freq, fn($count) => $count === $max));
if (1 === count($candidates)) {
$finalName = $candidates[0];
} else {
// Tie => pick the first from $names thats in $candidates
$finalName = null;
foreach ($names as $n) {
if (in_array($n, $candidates, true)) {
$finalName = $n;
break;
}
}
if (!$finalName) {
$finalName = 'unknown';
}
}
}
// Build final row: "name" + sub-array "backends"
$row = [
'name' => $finalName,
'backends' => [],
];
// Fill 'backends'
foreach ($allBackends as $b) {
if (isset($backendDict[$b])) {
$row['backends'][$b] = $backendDict[$b];
}
}
return $row;
};
// Main logic: For each backend and each user in that backend, unify them if we find a match in ≥2 backends.
// We do map-based matching first, then direct-name matching.
foreach ($allBackends as $backend) {
if (!isset($usersBy[$backend])) {
continue;
}
// For each user in this backend
foreach ($usersBy[$backend] as $nameLower => $userObj) {
// Skip if already used
if ($alreadyUsed($backend, $nameLower)) {
continue;
}
// Map-based matching first
$matchedMapEntry = null;
foreach ($map as $mapRow) {
if (isset($mapRow[$backend]['name']) && strtolower($mapRow[$backend]['name']) === $nameLower) {
$matchedMapEntry = $mapRow;
break;
}
}
if ($matchedMapEntry) {
// Build mapMatch from the map row.
$mapMatch = [$backend => $userObj];
// Gather all the other backends from the map
foreach ($allBackends as $otherBackend) {
if ($otherBackend === $backend) {
continue;
}
if (isset($matchedMapEntry[$otherBackend]['name'])) {
$mappedNameLower = strtolower($matchedMapEntry[$otherBackend]['name']);
if (isset($usersBy[$otherBackend][$mappedNameLower])) {
$mapMatch[$otherBackend] = $usersBy[$otherBackend][$mappedNameLower];
}
}
}
// If we matched ≥ 2 backends, unify them
if (count($mapMatch) >= 2) {
// --- MERGE map-based "options" into client_data => options, if any ---
foreach ($mapMatch as $b => &$matchedUser) {
// If the map entry has an 'options' array for this backend,
// merge it into $matchedUser['client_data']['options'].
if (isset($matchedMapEntry[$b]['options']) && is_array($matchedMapEntry[$b]['options'])) {
$mapOptions = $matchedMapEntry[$b]['options'];
// Ensure $matchedUser['client_data'] is an array
if (!isset($matchedUser['client_data']) || !is_array($matchedUser['client_data'])) {
$matchedUser['client_data'] = [];
}
// Ensure $matchedUser['client_data']['options'] is an array
if (!isset($matchedUser['client_data']['options']) || !is_array(
$matchedUser['client_data']['options']
)) {
$matchedUser['client_data']['options'] = [];
}
// Merge the map's options
$matchedUser['client_data']['options'] = array_replace_recursive(
$matchedUser['client_data']['options'],
$mapOptions
);
}
}
unset($matchedUser); // break reference from the loop
// Build final row
$results[] = $buildUnifiedRow($mapMatch);
// Mark & remove from $usersBy
foreach ($mapMatch as $b => $mu) {
$nm = strtolower($mu['name']);
$used[] = [$b, $nm];
unset($usersBy[$b][$nm]);
}
continue;
} else {
$this->logger->error("No partial fallback match via map for '{backend}: {user}'", [
'backend' => $userObj['backend'],
'user' => $userObj['name'],
]);
}
}
// Direct-name matching if map fails
$directMatch = [$backend => $userObj];
foreach ($allBackends as $otherBackend) {
if ($otherBackend === $backend) {
continue;
}
// Same name => direct match
if (isset($usersBy[$otherBackend][$nameLower])) {
$directMatch[$otherBackend] = $usersBy[$otherBackend][$nameLower];
}
}
// If direct matched ≥ 2 backends, unify
if (count($directMatch) >= 2) {
// No map "options" to merge here
$results[] = $buildUnifiedRow($directMatch);
// Mark & remove them from $usersBy
foreach ($directMatch as $b => $matchedUser) {
$nm = strtolower($matchedUser['name']);
$used[] = [$b, $nm];
unset($usersBy[$b][$nm]);
}
continue;
}
// If neither map nor direct matched for ≥2
$this->logger->error("Cannot match user '{backend}: {user}' in any map row or direct match.", [
'backend' => $userObj['backend'],
'user' => $userObj['name']
]);
}
}
return $results;
}
private function usersList(array $list): array
{
$chunks = [];
foreach ($list as $row) {
$name = $row['name'] ?? 'unknown';
$pairs = [];
if (!empty($row['backends']) && is_array($row['backends'])) {
foreach ($row['backends'] as $backendName => $backendData) {
if (isset($backendData['name'])) {
$pairs[] = r("{name}@{backend}", ['backend' => $backendName, 'name' => $backendData['name']]);
}
}
}
$chunks[] = r("{name}: {pairs}", ['name' => $name, 'pairs' => implode(', ', $pairs)]);
}
return $chunks;
return array_any($haystack, fn($item) => str_starts_with($item, $needle));
}
}

View File

@@ -392,20 +392,22 @@ class MemoryMapper implements ExtendedImportInterface
Message::increment("{$entity->via}.{$entity->type}.ignored_not_played_since_last_sync");
if ($entity->isWatched() !== $this->objects[$pointer]->isWatched()) {
$this->logger->notice(
"{mapper}: [O] '{backend}' item '{id}: {title}' is marked as '{state}' vs local state '{local_state}', However due to the remote item date '{remote_date}' being older than the last backend sync date '{local_date}'. it was not considered as valid state.",
[
'mapper' => afterLast(self::class, '\\'),
'id' => $this->objects[$pointer]->id,
'backend' => $entity->via,
'remote_date' => makeDate($entity->updated),
'local_date' => makeDate($opts['after']),
'state' => $entity->isWatched() ? 'played' : 'unplayed',
'local_state' => $this->objects[$pointer]->isWatched() ? 'played' : 'unplayed',
'title' => $entity->getName(),
]
);
return $this;
if ($this->inTraceMode()) {
$this->logger->debug(
"{mapper}: [O] '{backend}' item '{id}: {title}' is marked as '{state}' vs local state '{local_state}', However due to the remote item date '{remote_date}' being older than the last backend sync date '{local_date}'. it was not considered as valid state.",
[
'mapper' => afterLast(self::class, '\\'),
'id' => $this->objects[$pointer]->id,
'backend' => $entity->via,
'remote_date' => makeDate($entity->updated),
'local_date' => makeDate($opts['after']),
'state' => $entity->isWatched() ? 'played' : 'unplayed',
'local_state' => $this->objects[$pointer]->isWatched() ? 'played' : 'unplayed',
'title' => $entity->getName(),
]
);
}
return $this->handleTainted($pointer, $cloned, $entity, $opts);
}
if ($this->inTraceMode()) {

View File

@@ -2214,7 +2214,18 @@ if (!function_exists('perUserDb')) {
}
}
$dbFile = fixPath(r("{path}/{user}.db", ['path' => $path, 'user' => $user]));
$dbFile = fixPath(r("{path}/user.db", ['path' => $path]));
$oldDb = fixPath(r("{path}/{user}.db", ['path' => $path, 'user' => $user]));
if (true === file_exists($oldDb)) {
if (false === file_exists($dbFile)) {
rename($oldDb, $dbFile);
clearstatcache(true, $oldDb);
clearstatcache(true, $dbFile);
} else {
unlink($oldDb);
}
}
$inTestMode = true === (defined('IN_TEST_MODE') && true === IN_TEST_MODE);
$dsn = r('sqlite:{src}', ['src' => $inTestMode ? ':memory:' : $dbFile]);
@@ -2261,7 +2272,7 @@ if (!function_exists('perUserConfig')) {
}
}
return ConfigFile::open(fixPath(r("{path}/servers.yaml", ['path' => $path])), 'yaml');
return ConfigFile::open(fixPath(r("{path}/servers.yaml", ['path' => $path])), 'yaml', autoCreate: true);
}
}
@@ -2283,36 +2294,7 @@ if (!function_exists('perUserCacheAdapter')) {
}
try {
$cacheUrl = Config::get('cache.url');
if (empty($cacheUrl)) {
throw new RuntimeException('No cache server was set.');
}
if (!extension_loaded('redis')) {
throw new RuntimeException('Redis extension is not loaded.');
}
$uri = new Uri($cacheUrl);
$params = [];
if (!empty($uri->getQuery())) {
parse_str($uri->getQuery(), $params);
}
$redis = new Redis();
$redis->connect($uri->getHost(), $uri->getPort() ?? 6379);
if (null !== ag($params, 'password')) {
$redis->auth(ag($params, 'password'));
}
if (null !== ag($params, 'db')) {
$redis->select((int)ag($params, 'db'));
}
$backend = new RedisAdapter(redis: $redis, namespace: $ns);
$backend = new RedisAdapter(redis: Container::get(Redis::class), namespace: $ns);
} catch (Throwable) {
// -- in case of error, fallback to file system cache.
$path = fixPath(r("{path}/users/{user}/cache", ['path' => Config::get('path'), 'user' => $user]));
@@ -2419,3 +2401,4 @@ if (!function_exists('readFileFromArchive')) {
return [Stream::make($stream, 'r'), $zip];
}
}