Added initial support for multi-user play state sync. this feature still in alpha stage.
This commit is contained in:
55
FAQ.md
55
FAQ.md
@@ -211,23 +211,54 @@ database state back to the selected backend.
|
||||
|
||||
### Is there support for Multi-user setup?
|
||||
|
||||
No, The tool is designed to work for single user. However, It's possible to run container for each user. You can also
|
||||
use single container for all users, however it's not really easy refer
|
||||
to [issue #136](https://github.com/arabcoders/watchstate/issues/136).
|
||||
There is a minimal support for multi-user setup via `state:sync` command. However, it still requires that you add your
|
||||
backends as usual for single user setup and to use `state:sync` command, it's required that all backends have admin
|
||||
access to be able to retrieve access-tokens for users. That means for Plex you need an admin token, and for
|
||||
jellyfin/emby you need API key, not `user:password` limited access.
|
||||
|
||||
For `Jellyfin` and `Emby`, you can just generate new API tokens and link it to a user.
|
||||
To get started using `state:sync` command, as mentioned before setup your backends as normal, then create a
|
||||
`/config/config/mapper.yaml` file if your backends doesn't have the same user. for example
|
||||
|
||||
For Plex, You should use your admin token and by running the `config:add` command and selecting a user the tool will
|
||||
attempt to generate a token for that user.
|
||||
```yaml
|
||||
- backend_name1:
|
||||
name: "mike_jones"
|
||||
options: { }
|
||||
backend_name2:
|
||||
name: "jones_mike"
|
||||
options: { }
|
||||
backend_name3:
|
||||
name: "mikeJones"
|
||||
options: { }
|
||||
|
||||
> [!Note]
|
||||
> If the tool fails to generate an access token for the user, you can run the following command to generate the access
|
||||
> token manually.
|
||||
|
||||
```bash
|
||||
$ docker exec -ti console backend:users:list -s backend_name --with-tokens
|
||||
- backend_name1:
|
||||
name: "jiji_jones"
|
||||
options: { }
|
||||
backend_name2:
|
||||
name: "jones_jiji"
|
||||
options: { }
|
||||
backend_name3:
|
||||
name: "jijiJones"
|
||||
options: { }
|
||||
```
|
||||
|
||||
This yaml file helps map your users accounts in the different backends, so the tool can sync the correct user data.
|
||||
|
||||
Then simply run `state:sync -v` it will generate the required tokens and match users data between the backends.
|
||||
then sync the difference, Keep in mind that it will be slow and that's expected as it needs to do the same thing without
|
||||
caching for all users servers and backends. it's recommended to not run this command frequently. as it's puts a lot of
|
||||
load on the backends. By default, it will sync once every 3 hours. you can ofc change it to suit your needs.
|
||||
|
||||
> [!NOTE]
|
||||
> Known issues:
|
||||
|
||||
* Currently, state:sync doesn't have a way of syncing plex users that has PIN enabled.
|
||||
* Majority of the command flags aren't working or not implemented yet.
|
||||
|
||||
> [!IMPORTANT]
|
||||
> Please keep in mind the new command is still in alpha stage, so things will probably break. Please report any bugs
|
||||
> you encounter. Also, please make sure to have a backup of your data before running the command. just in-case,
|
||||
> while we did test it on our live data, it's always better to be safe than sorry.
|
||||
|
||||
----
|
||||
|
||||
### How do i migrate invited friends i.e. (external user) data from from plex to emby/jellyfin?
|
||||
|
||||
13
README.md
13
README.md
@@ -9,6 +9,19 @@ out of the box, this tool support `Jellyfin`, `Plex` and `Emby` media servers.
|
||||
|
||||
## Updates
|
||||
|
||||
### 2025-01-18
|
||||
|
||||
Due to popular demand, we finally have added the ability to sync all users data, however, it's limited to only
|
||||
play state, no progress syncing implemented at this stage. This feature still in alpha expect bugs and issues.
|
||||
|
||||
However our local tests shows that it's working as expected, but we need more testing to be sure. Please report any
|
||||
issues you encounter. To enable this feature, you will see new task in the `Tasks` page called `Sync`.
|
||||
|
||||
This task will sync all your users play state, However you need to have the backends added with admin token for plex and
|
||||
API key for jellyfin and emby. Enable the task and let it run, it will sync all users play state.
|
||||
|
||||
Please read the FAQ entry about it at [this link](FAQ.md#is-there-support-for-multi-user-setup).
|
||||
|
||||
### 2024-12-30
|
||||
|
||||
We have removed the old environment variables `WS_CRON_PROGRESS` and `WS_CRON_PUSH` in favor of the new ones
|
||||
|
||||
@@ -9,6 +9,7 @@ use App\Commands\Events\DispatchCommand;
|
||||
use App\Commands\State\BackupCommand;
|
||||
use App\Commands\State\ExportCommand;
|
||||
use App\Commands\State\ImportCommand;
|
||||
use App\Commands\State\SyncCommand;
|
||||
use App\Commands\System\IndexCommand;
|
||||
use App\Commands\System\PruneCommand;
|
||||
use App\Libs\Mappers\Import\MemoryMapper;
|
||||
@@ -87,6 +88,7 @@ return (function () {
|
||||
];
|
||||
|
||||
$config['backends_file'] = fixPath(env('WS_BACKENDS_FILE', ag($config, 'path') . '/config/servers.yaml'));
|
||||
$config['mapper_file'] = fixPath(env('WS_MAPPER_FILE', ag($config, 'path') . '/config/mapper.yaml'));
|
||||
|
||||
date_default_timezone_set(ag($config, 'tz', 'UTC'));
|
||||
$logDateFormat = makeDate()->format('Ymd');
|
||||
@@ -273,6 +275,14 @@ return (function () {
|
||||
'timer' => $checkTaskTimer((string)env('WS_CRON_EXPORT_AT', '30 */1 * * *'), '30 */1 * * *'),
|
||||
'args' => env('WS_CRON_EXPORT_ARGS', '-v'),
|
||||
],
|
||||
SyncCommand::TASK_NAME => [
|
||||
'command' => SyncCommand::ROUTE,
|
||||
'name' => SyncCommand::TASK_NAME,
|
||||
'info' => '[Alpha stage] Sync All users play state. Read the FAQ.',
|
||||
'enabled' => (bool)env('WS_CRON_SYNC', false),
|
||||
'timer' => $checkTaskTimer((string)env('WS_CRON_SYNC_AT', '9 */3 * * *'), '9 */3 * * *'),
|
||||
'args' => env('WS_CRON_SYNC_ARGS', '-v'),
|
||||
],
|
||||
BackupCommand::TASK_NAME => [
|
||||
'command' => BackupCommand::ROUTE,
|
||||
'name' => BackupCommand::TASK_NAME,
|
||||
|
||||
@@ -196,7 +196,7 @@ return (function () {
|
||||
};
|
||||
|
||||
// -- Do not forget to update the tasks list if you add a new task.
|
||||
$tasks = ['import', 'export', 'backup', 'prune', 'indexes'];
|
||||
$tasks = ['import', 'export', 'backup', 'prune', 'indexes', 'sync'];
|
||||
$task_env = [
|
||||
[
|
||||
'key' => 'WS_CRON_{TASK}',
|
||||
|
||||
@@ -17,7 +17,7 @@ use Psr\Http\Message\StreamInterface as iStream;
|
||||
use Psr\Http\Message\UriInterface as iUri;
|
||||
use Psr\Log\LoggerInterface as iLogger;
|
||||
use Symfony\Contracts\HttpClient\Exception\ExceptionInterface;
|
||||
use Symfony\Contracts\HttpClient\ResponseInterface;
|
||||
use Symfony\Contracts\HttpClient\ResponseInterface as iResponse;
|
||||
|
||||
interface ClientInterface
|
||||
{
|
||||
@@ -85,7 +85,7 @@ interface ClientInterface
|
||||
* @param iImport $mapper mapper to use.
|
||||
* @param iDate|null $after only import items after this date.
|
||||
*
|
||||
* @return array<array-key,ResponseInterface> responses.
|
||||
* @return array<array-key,iResponse> responses.
|
||||
*/
|
||||
public function pull(iImport $mapper, iDate|null $after = null): array;
|
||||
|
||||
@@ -96,7 +96,7 @@ interface ClientInterface
|
||||
* @param iStream|null $writer writer to use.
|
||||
* @param array $opts options for backup.
|
||||
*
|
||||
* @return array<array-key,ResponseInterface> responses.
|
||||
* @return array<array-key,iResponse> responses.
|
||||
*/
|
||||
public function backup(iImport $mapper, iStream|null $writer = null, array $opts = []): array;
|
||||
|
||||
@@ -107,7 +107,7 @@ interface ClientInterface
|
||||
* @param QueueRequests $queue queue to use.
|
||||
* @param iDate|null $after only export items after this date.
|
||||
*
|
||||
* @return array<array-key,ResponseInterface> responses.
|
||||
* @return array<array-key,iResponse> responses.
|
||||
*/
|
||||
public function export(iImport $mapper, QueueRequests $queue, iDate|null $after = null): array;
|
||||
|
||||
@@ -325,4 +325,12 @@ interface ClientInterface
|
||||
* @return GuidInterface
|
||||
*/
|
||||
public function getGuid(): GuidInterface;
|
||||
|
||||
/**
|
||||
* Generate request to change the backend item play state.
|
||||
*
|
||||
* @param array<iState> $entities state entity.
|
||||
* @param array $opts options.
|
||||
*/
|
||||
public function updateState(array $entities, QueueRequests $queue, array $opts = []): void;
|
||||
}
|
||||
|
||||
10
src/Backends/Emby/Action/UpdateState.php
Normal file
10
src/Backends/Emby/Action/UpdateState.php
Normal file
@@ -0,0 +1,10 @@
|
||||
<?php
|
||||
|
||||
declare(strict_types=1);
|
||||
|
||||
namespace App\Backends\Emby\Action;
|
||||
|
||||
class UpdateState extends \App\Backends\Jellyfin\Action\UpdateState
|
||||
{
|
||||
protected string $action = 'emby.UpdateState';
|
||||
}
|
||||
@@ -19,6 +19,7 @@ use App\Backends\Emby\Action\GetLibrary;
|
||||
use App\Backends\Emby\Action\GetMetaData;
|
||||
use App\Backends\Emby\Action\GetSessions;
|
||||
use App\Backends\Emby\Action\GetUsersList;
|
||||
use App\Backends\Emby\Action\GetVersion;
|
||||
use App\Backends\Emby\Action\GetWebUrl;
|
||||
use App\Backends\Emby\Action\Import;
|
||||
use App\Backends\Emby\Action\InspectRequest;
|
||||
@@ -28,7 +29,7 @@ use App\Backends\Emby\Action\Push;
|
||||
use App\Backends\Emby\Action\SearchId;
|
||||
use App\Backends\Emby\Action\SearchQuery;
|
||||
use App\Backends\Emby\Action\ToEntity;
|
||||
use App\Backends\Jellyfin\Action\GetVersion;
|
||||
use App\Backends\Emby\Action\UpdateState;
|
||||
use App\Backends\Jellyfin\JellyfinClient;
|
||||
use App\Libs\Config;
|
||||
use App\Libs\Container;
|
||||
@@ -650,6 +651,23 @@ class EmbyClient implements iClient
|
||||
return Container::get(EmbyValidateContext::class)($context);
|
||||
}
|
||||
|
||||
/**
|
||||
* @inheritdoc
|
||||
*/
|
||||
public function updateState(array $entities, QueueRequests $queue, array $opts = []): void
|
||||
{
|
||||
$response = Container::get(UpdateState::class)(
|
||||
context: $this->context,
|
||||
entities: $entities,
|
||||
queue: $queue,
|
||||
opts: $opts
|
||||
);
|
||||
|
||||
if ($response->hasError()) {
|
||||
$this->logger->log($response->error->level(), $response->error->message, $response->error->context);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* @inheritdoc
|
||||
*/
|
||||
|
||||
99
src/Backends/Jellyfin/Action/UpdateState.php
Normal file
99
src/Backends/Jellyfin/Action/UpdateState.php
Normal file
@@ -0,0 +1,99 @@
|
||||
<?php
|
||||
|
||||
declare(strict_types=1);
|
||||
|
||||
namespace App\Backends\Jellyfin\Action;
|
||||
|
||||
use App\Backends\Common\CommonTrait;
|
||||
use App\Backends\Common\Context;
|
||||
use App\Backends\Common\Response;
|
||||
use App\Backends\Jellyfin\JellyfinClient;
|
||||
use App\Libs\Entity\StateInterface as iState;
|
||||
use App\Libs\Extends\Date;
|
||||
use App\Libs\QueueRequests;
|
||||
use Psr\Log\LoggerInterface as iLogger;
|
||||
use Symfony\Contracts\HttpClient\HttpClientInterface as iHttp;
|
||||
|
||||
class UpdateState
|
||||
{
|
||||
use CommonTrait;
|
||||
|
||||
protected string $action = 'jellyfin.updateState';
|
||||
|
||||
public function __construct(protected iHttp $http, protected iLogger $logger)
|
||||
{
|
||||
}
|
||||
|
||||
/**
|
||||
* Get Backend unique identifier.
|
||||
*
|
||||
* @param Context $context Context instance.
|
||||
* @param array<iState> $entities State instance.
|
||||
* @param QueueRequests $queue QueueRequests instance.
|
||||
* @param array $opts optional options.
|
||||
*
|
||||
* @return Response
|
||||
*/
|
||||
public function __invoke(Context $context, array $entities, QueueRequests $queue, array $opts = []): Response
|
||||
{
|
||||
return $this->tryResponse(
|
||||
context: $context,
|
||||
fn: function () use ($context, $entities, $opts, $queue) {
|
||||
foreach ($entities as $entity) {
|
||||
$meta = $entity->getMetadata($context->backendName);
|
||||
if (count($meta) < 1) {
|
||||
continue;
|
||||
}
|
||||
|
||||
if ($entity->isWatched() === (bool)ag($meta, iState::COLUMN_WATCHED)) {
|
||||
continue;
|
||||
}
|
||||
|
||||
if (null === ($itemId = ag($meta, iState::COLUMN_ID))) {
|
||||
continue;
|
||||
}
|
||||
|
||||
$url = $context->backendUrl->withPath(
|
||||
r('/Users/{user_id}/PlayedItems/{item_id}', [
|
||||
'user_id' => $context->backendUser,
|
||||
'item_id' => $itemId,
|
||||
])
|
||||
);
|
||||
|
||||
if ($context->clientName === JellyfinClient::CLIENT_NAME) {
|
||||
$url = $url->withQuery(
|
||||
http_build_query([
|
||||
'DatePlayed' => makeDate($entity->updated)->format(Date::ATOM)
|
||||
])
|
||||
);
|
||||
}
|
||||
|
||||
$queue->add(
|
||||
$this->http->request(
|
||||
method: $entity->isWatched() ? 'POST' : 'DELETE',
|
||||
url: (string)$url,
|
||||
options: $context->backendHeaders + [
|
||||
'user_data' => [
|
||||
'context' => [
|
||||
'backend' => $context->backendName,
|
||||
'play_state' => $entity->isWatched() ? 'played' : 'unplayed',
|
||||
'item' => [
|
||||
'id' => $itemId,
|
||||
'title' => $entity->getName(),
|
||||
'type' => $entity->type == iState::TYPE_EPISODE ? 'episode' : 'movie',
|
||||
'state' => $entity->isWatched() ? 'played' : 'unplayed',
|
||||
],
|
||||
'url' => (string)$url,
|
||||
]
|
||||
],
|
||||
]
|
||||
)
|
||||
);
|
||||
}
|
||||
|
||||
return new Response(status: true);
|
||||
},
|
||||
action: $this->action
|
||||
);
|
||||
}
|
||||
}
|
||||
@@ -31,6 +31,7 @@ use App\Backends\Jellyfin\Action\Push;
|
||||
use App\Backends\Jellyfin\Action\SearchId;
|
||||
use App\Backends\Jellyfin\Action\SearchQuery;
|
||||
use App\Backends\Jellyfin\Action\ToEntity;
|
||||
use App\Backends\Jellyfin\Action\UpdateState;
|
||||
use App\Libs\Config;
|
||||
use App\Libs\Container;
|
||||
use App\Libs\Entity\StateInterface as iState;
|
||||
@@ -43,8 +44,8 @@ use App\Libs\Options;
|
||||
use App\Libs\QueueRequests;
|
||||
use App\Libs\Uri;
|
||||
use DateTimeInterface as iDate;
|
||||
use Psr\Http\Message\ServerRequestInterface;
|
||||
use Psr\Http\Message\StreamInterface;
|
||||
use Psr\Http\Message\ServerRequestInterface as iRequest;
|
||||
use Psr\Http\Message\StreamInterface as iStream;
|
||||
use Psr\Http\Message\UriInterface;
|
||||
use Psr\Log\LoggerInterface as iLogger;
|
||||
use Throwable;
|
||||
@@ -205,7 +206,7 @@ class JellyfinClient implements iClient
|
||||
/**
|
||||
* @inheritdoc
|
||||
*/
|
||||
public function processRequest(ServerRequestInterface $request, array $opts = []): ServerRequestInterface
|
||||
public function processRequest(iRequest $request, array $opts = []): iRequest
|
||||
{
|
||||
$response = Container::get(InspectRequest::class)(context: $this->context, request: $request);
|
||||
|
||||
@@ -219,7 +220,7 @@ class JellyfinClient implements iClient
|
||||
/**
|
||||
* @inheritdoc
|
||||
*/
|
||||
public function parseWebhook(ServerRequestInterface $request): iState
|
||||
public function parseWebhook(iRequest $request): iState
|
||||
{
|
||||
$response = Container::get(ParseWebhook::class)(
|
||||
context: $this->context,
|
||||
@@ -270,7 +271,7 @@ class JellyfinClient implements iClient
|
||||
/**
|
||||
* @inheritdoc
|
||||
*/
|
||||
public function backup(iImport $mapper, StreamInterface|null $writer = null, array $opts = []): array
|
||||
public function backup(iImport $mapper, iStream|null $writer = null, array $opts = []): array
|
||||
{
|
||||
$response = Container::get(Backup::class)(
|
||||
context: $this->context,
|
||||
@@ -670,7 +671,7 @@ class JellyfinClient implements iClient
|
||||
/**
|
||||
* @inheritdoc
|
||||
*/
|
||||
public function fromRequest(array $config, ServerRequestInterface $request): array
|
||||
public function fromRequest(array $config, iRequest $request): array
|
||||
{
|
||||
return $config;
|
||||
}
|
||||
@@ -683,6 +684,23 @@ class JellyfinClient implements iClient
|
||||
return Container::get(JellyfinValidateContext::class)($context);
|
||||
}
|
||||
|
||||
/**
|
||||
* @inheritdoc
|
||||
*/
|
||||
public function updateState(array $entities, QueueRequests $queue, array $opts = []): void
|
||||
{
|
||||
$response = Container::get(UpdateState::class)(
|
||||
context: $this->context,
|
||||
entities: $entities,
|
||||
queue: $queue,
|
||||
opts: $opts
|
||||
);
|
||||
|
||||
if ($response->hasError()) {
|
||||
$this->logger->log($response->error->level(), $response->error->message, $response->error->context);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* @inheritdoc
|
||||
*/
|
||||
|
||||
91
src/Backends/Plex/Action/UpdateState.php
Normal file
91
src/Backends/Plex/Action/UpdateState.php
Normal file
@@ -0,0 +1,91 @@
|
||||
<?php
|
||||
|
||||
declare(strict_types=1);
|
||||
|
||||
namespace App\Backends\Plex\Action;
|
||||
|
||||
use App\Backends\Common\CommonTrait;
|
||||
use App\Backends\Common\Context;
|
||||
use App\Backends\Common\Response;
|
||||
use App\Libs\Entity\StateInterface as iState;
|
||||
use App\Libs\QueueRequests;
|
||||
use Psr\Log\LoggerInterface as iLogger;
|
||||
use Symfony\Contracts\HttpClient\HttpClientInterface as iHttp;
|
||||
|
||||
final class UpdateState
|
||||
{
|
||||
use CommonTrait;
|
||||
|
||||
private string $action = 'plex.updateState';
|
||||
|
||||
public function __construct(protected iHttp $http, protected iLogger $logger)
|
||||
{
|
||||
}
|
||||
|
||||
/**
|
||||
* Get Backend unique identifier.
|
||||
*
|
||||
* @param Context $context Context instance.
|
||||
* @param array<iState> $entities State instance.
|
||||
* @param QueueRequests $queue QueueRequests instance.
|
||||
* @param array $opts optional options.
|
||||
*
|
||||
* @return Response
|
||||
*/
|
||||
public function __invoke(Context $context, array $entities, QueueRequests $queue, array $opts = []): Response
|
||||
{
|
||||
return $this->tryResponse(
|
||||
context: $context,
|
||||
fn: function () use ($context, $entities, $opts, $queue) {
|
||||
foreach ($entities as $entity) {
|
||||
$meta = $entity->getMetadata($context->backendName);
|
||||
if (count($meta) < 1) {
|
||||
continue;
|
||||
}
|
||||
|
||||
if (null === ($itemId = ag($meta, iState::COLUMN_ID))) {
|
||||
continue;
|
||||
}
|
||||
|
||||
$itemBackendState = (bool)ag($meta, iState::COLUMN_WATCHED);
|
||||
|
||||
if ($entity->isWatched() === $itemBackendState) {
|
||||
continue;
|
||||
}
|
||||
|
||||
$url = $context->backendUrl->withPath($entity->isWatched() ? '/:/scrobble' : '/:/unscrobble')
|
||||
->withQuery(
|
||||
http_build_query([
|
||||
'identifier' => 'com.plexapp.plugins.library',
|
||||
'key' => $itemId,
|
||||
])
|
||||
);
|
||||
|
||||
$queue->add(
|
||||
$this->http->request(
|
||||
method: 'GET',
|
||||
url: (string)$url,
|
||||
options: $context->backendHeaders + [
|
||||
'user_data' => [
|
||||
'context' => [
|
||||
'backend' => $context->backendName,
|
||||
'play_state' => $entity->isWatched() ? 'played' : 'unplayed',
|
||||
'item' => [
|
||||
'id' => $itemId,
|
||||
'title' => $entity->getName(),
|
||||
'type' => $entity->type == iState::TYPE_EPISODE ? 'episode' : 'movie',
|
||||
'state' => $entity->isWatched() ? 'played' : 'unplayed',
|
||||
],
|
||||
]
|
||||
],
|
||||
]
|
||||
)
|
||||
);
|
||||
}
|
||||
|
||||
return new Response(status: true);
|
||||
},
|
||||
action: $this->action
|
||||
);
|
||||
}
|
||||
}
|
||||
@@ -29,6 +29,7 @@ use App\Backends\Plex\Action\Push;
|
||||
use App\Backends\Plex\Action\SearchId;
|
||||
use App\Backends\Plex\Action\SearchQuery;
|
||||
use App\Backends\Plex\Action\ToEntity;
|
||||
use App\Backends\Plex\Action\UpdateState;
|
||||
use App\Libs\Config;
|
||||
use App\Libs\Container;
|
||||
use App\Libs\DataUtil;
|
||||
@@ -687,6 +688,20 @@ class PlexClient implements iClient
|
||||
return Container::get(PlexValidateContext::class)($context);
|
||||
}
|
||||
|
||||
public function updateState(array $entities, QueueRequests $queue, array $opts = []): void
|
||||
{
|
||||
$response = Container::get(UpdateState::class)(
|
||||
context: $this->context,
|
||||
entities: $entities,
|
||||
queue: $queue,
|
||||
opts: $opts
|
||||
);
|
||||
|
||||
if ($response->hasError()) {
|
||||
$this->logger->log($response->error->level(), $response->error->message, $response->error->context);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* @inheritdoc
|
||||
*/
|
||||
|
||||
769
src/Commands/State/SyncCommand.php
Normal file
769
src/Commands/State/SyncCommand.php
Normal file
@@ -0,0 +1,769 @@
|
||||
<?php
|
||||
|
||||
declare(strict_types=1);
|
||||
|
||||
namespace App\Commands\State;
|
||||
|
||||
use App\Backends\Common\ClientInterface as iClient;
|
||||
use App\Command;
|
||||
use App\Libs\Attributes\Route\Cli;
|
||||
use App\Libs\Config;
|
||||
use App\Libs\ConfigFile;
|
||||
use App\Libs\Entity\StateInterface as iState;
|
||||
use App\Libs\Extends\StreamLogHandler;
|
||||
use App\Libs\LogSuppressor;
|
||||
use App\Libs\Mappers\Import\NullMapper;
|
||||
use App\Libs\Message;
|
||||
use App\Libs\Options;
|
||||
use App\Libs\QueueRequests;
|
||||
use App\Libs\Stream;
|
||||
use Monolog\Logger;
|
||||
use Psr\Log\LoggerInterface as iLogger;
|
||||
use Psr\Log\NullLogger;
|
||||
use Symfony\Component\Console\Input\InputInterface as iInput;
|
||||
use Symfony\Component\Console\Input\InputOption;
|
||||
use Symfony\Component\Console\Output\OutputInterface as iOutput;
|
||||
use Symfony\Contracts\HttpClient\ResponseInterface;
|
||||
use Throwable;
|
||||
|
||||
/**
|
||||
* Class ExportCommand
|
||||
*
|
||||
* Command for exporting play state to backends.
|
||||
*
|
||||
* @package App\Console\Commands\State
|
||||
*/
|
||||
#[Cli(command: self::ROUTE)]
|
||||
class SyncCommand extends Command
|
||||
{
|
||||
public const string ROUTE = 'state:sync';
|
||||
|
||||
public const string TASK_NAME = 'sync';
|
||||
|
||||
private array $mapping = [];
|
||||
|
||||
/**
|
||||
* Class Constructor.
|
||||
*
|
||||
* @param NullMapper $mapper The instance of the DirectMapper class.
|
||||
* @param QueueRequests $queue The instance of the QueueRequests class.
|
||||
* @param iLogger $logger The instance of the iLogger class.
|
||||
*/
|
||||
public function __construct(
|
||||
private readonly NullMapper $mapper,
|
||||
private readonly QueueRequests $queue,
|
||||
private readonly iLogger $logger,
|
||||
private readonly LogSuppressor $suppressor,
|
||||
) {
|
||||
set_time_limit(0);
|
||||
ini_set('memory_limit', '-1');
|
||||
$this->mapper->setLogger(new NullLogger());
|
||||
parent::__construct();
|
||||
}
|
||||
|
||||
/**
|
||||
* Configure the command.
|
||||
*/
|
||||
protected function configure(): void
|
||||
{
|
||||
$this->setName(self::ROUTE)
|
||||
->setDescription('Sync All users play state to backends.')
|
||||
->addOption('force-full', 'f', InputOption::VALUE_NONE, 'Force full export. Ignore last export date.')
|
||||
->addOption('dry-run', null, InputOption::VALUE_NONE, 'Do not commit changes to backends.')
|
||||
->addOption('timeout', null, InputOption::VALUE_REQUIRED, 'Set request timeout in seconds.')
|
||||
->addOption(
|
||||
'select-backend',
|
||||
's',
|
||||
InputOption::VALUE_IS_ARRAY | InputOption::VALUE_OPTIONAL,
|
||||
'Select backend.'
|
||||
)
|
||||
->addOption('exclude', null, InputOption::VALUE_NONE, 'Inverse --select-backend logic.')
|
||||
->addOption('ignore-date', 'i', InputOption::VALUE_NONE, 'Ignore date comparison.')
|
||||
->addOption('logfile', null, InputOption::VALUE_REQUIRED, 'Save console output to file.')
|
||||
->setHelp(
|
||||
r(
|
||||
<<<HELP
|
||||
|
||||
pre-alpha command, not ready for production use. it's not working yet as expected,
|
||||
Use it at your own risk.
|
||||
|
||||
-------
|
||||
<notice>[ FAQ ]</notice>
|
||||
-------
|
||||
|
||||
<question>Will this work with limited tokens?</question>
|
||||
|
||||
No, This requires admin token for plex backend, and API keys for jellyfin/emby.
|
||||
We need the admin token for plex to generate user tokens for each user, and we need the API keys
|
||||
for jellyfin/emby to get the user list and update their play state.
|
||||
|
||||
<question># How does this sync operation mode work?</question>
|
||||
|
||||
It works by first, getting all users from all backends, and trying to match them by name,
|
||||
once we build a list of users that are matched, then we basically run the import/export for each user
|
||||
using in memory storage, it should not have any impact on the real database and cache.
|
||||
|
||||
You can help the matching by using the mapper file, which is a simple YAML file that maps users from one
|
||||
backend to another, this is useful when the usernames are different or when you want to merge users from
|
||||
different backends into one user.
|
||||
|
||||
Example of a mapper.yaml file:
|
||||
|
||||
- backend1: "mike_james"
|
||||
backend2: "james_mike"
|
||||
|
||||
- backend1: "john_doe"
|
||||
backend2: "doe_john"
|
||||
|
||||
HELP,
|
||||
[
|
||||
'cmd' => trim(commandContext()),
|
||||
'route' => self::ROUTE,
|
||||
|
||||
]
|
||||
)
|
||||
);
|
||||
}
|
||||
|
||||
/**
|
||||
* Make sure the command is not running in parallel.
|
||||
*
|
||||
* @param iInput $input The input object containing the command data.
|
||||
* @param iOutput $output The output object for displaying command output.
|
||||
*
|
||||
* @return int The exit code of the command execution.
|
||||
*/
|
||||
protected function runCommand(iInput $input, iOutput $output): int
|
||||
{
|
||||
return $this->single(fn(): int => $this->process($input, $output), $output);
|
||||
}
|
||||
|
||||
/**
|
||||
* Process the command by pulling and comparing status and then pushing.
|
||||
*
|
||||
* @param iInput $input
|
||||
* @param iOutput $output
|
||||
* @return int
|
||||
*/
|
||||
protected function process(iInput $input, iOutput $output): int
|
||||
{
|
||||
if (null !== ($logfile = $input->getOption('logfile')) && true === ($this->logger instanceof Logger)) {
|
||||
$this->logger->setHandlers([
|
||||
$this->suppressor->withHandler(new StreamLogHandler(new Stream($logfile, 'w'), $output))
|
||||
]);
|
||||
}
|
||||
|
||||
$mapFile = Config::get('mapper_file');
|
||||
if (file_exists($mapFile) && filesize($mapFile) > 10) {
|
||||
$map = ConfigFile::open(Config::get('mapper_file'), 'yaml');
|
||||
$this->mapping = $map->getAll();
|
||||
}
|
||||
|
||||
$configFile = ConfigFile::open(Config::get('backends_file'), 'yaml');
|
||||
$configFile->setLogger($this->logger);
|
||||
|
||||
$backends = [];
|
||||
$selected = $input->getOption('select-backend');
|
||||
$isCustom = !empty($selected) && count($selected) > 0;
|
||||
$supported = Config::get('supported', []);
|
||||
|
||||
if (true === $input->getOption('dry-run')) {
|
||||
$this->logger->notice('Dry run mode. No changes will be committed to backends.');
|
||||
}
|
||||
|
||||
foreach ($configFile->getAll() as $backendName => $backend) {
|
||||
$type = strtolower(ag($backend, 'type', 'unknown'));
|
||||
|
||||
if ($isCustom && $input->getOption('exclude') === in_array($backendName, $selected)) {
|
||||
$this->logger->info("SYSTEM: Ignoring '{backend}' as requested by [-s, --select-backend].", [
|
||||
'backend' => $backendName
|
||||
]);
|
||||
continue;
|
||||
}
|
||||
|
||||
if (true !== (bool)ag($backend, 'export.enabled')) {
|
||||
$this->logger->info("SYSTEM: Ignoring '{backend}' as the backend has export disabled.", [
|
||||
'backend' => $backendName
|
||||
]);
|
||||
continue;
|
||||
}
|
||||
|
||||
if (!isset($supported[$type])) {
|
||||
$this->logger->error(
|
||||
"SYSTEM: Ignoring '{backend}' due to unexpected type '{type}'. Expecting '{types}'.",
|
||||
[
|
||||
'type' => $type,
|
||||
'backend' => $backendName,
|
||||
'types' => implode(', ', array_keys($supported)),
|
||||
]
|
||||
);
|
||||
continue;
|
||||
}
|
||||
|
||||
if (null === ($url = ag($backend, 'url')) || false === isValidURL($url)) {
|
||||
$this->logger->error("SYSTEM: Ignoring '{backend}' due to invalid URL. '{url}'.", [
|
||||
'url' => $url ?? 'None',
|
||||
'backend' => $backendName,
|
||||
]);
|
||||
continue;
|
||||
}
|
||||
|
||||
$backend['name'] = $backendName;
|
||||
$backends[$backendName] = $backend;
|
||||
}
|
||||
|
||||
if (empty($backends)) {
|
||||
$this->logger->warning('No backends were found.');
|
||||
return self::FAILURE;
|
||||
}
|
||||
|
||||
foreach ($backends as &$backend) {
|
||||
if (null === ($name = ag($backend, 'name'))) {
|
||||
continue;
|
||||
}
|
||||
|
||||
$opts = ag($backend, 'options', []);
|
||||
|
||||
if ($input->getOption('ignore-date')) {
|
||||
$opts[Options::IGNORE_DATE] = true;
|
||||
}
|
||||
|
||||
if ($input->getOption('trace')) {
|
||||
$opts[Options::DEBUG_TRACE] = true;
|
||||
}
|
||||
|
||||
if ($input->getOption('dry-run')) {
|
||||
$opts[Options::DRY_RUN] = true;
|
||||
}
|
||||
|
||||
if ($input->getOption('timeout')) {
|
||||
$opts['client']['timeout'] = $input->getOption('timeout');
|
||||
}
|
||||
|
||||
$backend['options'] = $opts;
|
||||
$backend['class'] = $this->getBackend($name, $backend)->setLogger($this->logger);
|
||||
}
|
||||
|
||||
unset($backend);
|
||||
|
||||
$this->logger->notice("SYSTEM: Getting users list from '{backends}'.", [
|
||||
'backends' => join(', ', array_map(fn($backend) => $backend['name'], $backends))
|
||||
]
|
||||
);
|
||||
|
||||
$users = [];
|
||||
|
||||
foreach ($backends as $backend) {
|
||||
/** @var iClient $client */
|
||||
$client = ag($backend, 'class');
|
||||
assert($backend instanceof iClient);
|
||||
$this->logger->info("SYSTEM: Getting users from '{backend}'.", [
|
||||
'backend' => $client->getContext()->backendName
|
||||
]);
|
||||
try {
|
||||
foreach ($client->getUsersList(['tokens' => true]) as $user) {
|
||||
$info = $backend;
|
||||
$info['token'] = ag($user, 'token', ag($backend, 'token'));
|
||||
$info['user'] = ag($user, 'id', ag($info, 'user'));
|
||||
$info['backendName'] = r("{backend}_{user}", [
|
||||
'backend' => ag($backend, 'name'),
|
||||
'user' => ag($user, 'name'),
|
||||
]);
|
||||
$info['displayName'] = ag($user, 'name');
|
||||
$info = ag_delete($info, 'options.' . Options::PLEX_USER_PIN);
|
||||
$info = ag_delete($info, 'options.' . Options::ADMIN_TOKEN);
|
||||
|
||||
unset($info['class']);
|
||||
$user['backend'] = ag($backend, 'name');
|
||||
$user['client_data'] = $info;
|
||||
$users[] = $user;
|
||||
}
|
||||
} catch (Throwable $e) {
|
||||
$this->logger->error(
|
||||
"Exception '{error.kind}' was thrown unhandled during '{client}: {backend}' get users list. '{error.message}' at '{error.file}:{error.line}'.",
|
||||
[
|
||||
'backend' => $client->getContext()->backendName,
|
||||
'client' => $client->getContext()->clientName,
|
||||
'error' => [
|
||||
'kind' => $e::class,
|
||||
'line' => $e->getLine(),
|
||||
'message' => $e->getMessage(),
|
||||
'file' => after($e->getFile(), ROOT_PATH),
|
||||
],
|
||||
'exception' => [
|
||||
'file' => $e->getFile(),
|
||||
'line' => $e->getLine(),
|
||||
'kind' => get_class($e),
|
||||
'message' => $e->getMessage(),
|
||||
],
|
||||
]
|
||||
);
|
||||
}
|
||||
}
|
||||
|
||||
$users = $this->generate_users_list($users, $this->mapping);
|
||||
|
||||
if (count($users) < 1) {
|
||||
$this->logger->warning('No users were found.');
|
||||
return self::FAILURE;
|
||||
}
|
||||
|
||||
$this->logger->notice("SYSTEM: User matching results {results}.", [
|
||||
'results' => arrayToString($this->usersList($users)),
|
||||
]);
|
||||
|
||||
foreach (array_reverse($users) as $user) {
|
||||
$this->queue->reset();
|
||||
$this->mapper->reset();
|
||||
|
||||
$list = [];
|
||||
$displayName = null;
|
||||
|
||||
foreach (ag($user, 'backends', []) as $backend) {
|
||||
$name = ag($backend, 'client_data.backendName');
|
||||
$clientData = ag($backend, 'client_data');
|
||||
$clientData['name'] = $name;
|
||||
$clientData['class'] = makeBackend($clientData, $name)->setLogger($this->logger);
|
||||
$list[$name] = $clientData;
|
||||
$displayName = ag($backend, 'client_data.displayName', '??');
|
||||
}
|
||||
|
||||
$start = makeDate();
|
||||
$this->logger->notice("SYSTEM: Syncing user '{user}' -> '{list}'.", [
|
||||
'user' => $displayName,
|
||||
'list' => join(', ', array_keys($list)),
|
||||
'started' => $start,
|
||||
]);
|
||||
|
||||
$this->handleImport($displayName, $list);
|
||||
|
||||
$changes = $this->mapper->computeChanges(array_keys($list));
|
||||
|
||||
foreach ($changes as $b => $changed) {
|
||||
$count = count($changed);
|
||||
$this->logger->notice("SYSTEM: Changes detected for '{name}: {backend}' are '{changes}'.", [
|
||||
'name' => $displayName,
|
||||
'backend' => $b,
|
||||
'changes' => $count,
|
||||
'items' => array_map(
|
||||
fn(iState $i) => [
|
||||
'title' => $i->getName(),
|
||||
'state' => $i->isWatched() ? 'played' : 'unplayed',
|
||||
'meta' => $i->isSynced(array_keys($list)),
|
||||
],
|
||||
$changed
|
||||
)
|
||||
]);
|
||||
|
||||
if ($count >= 1) {
|
||||
/** @var iClient $client */
|
||||
$client = $list[$b]['class'];
|
||||
$client->updateState($changed, $this->queue);
|
||||
}
|
||||
}
|
||||
|
||||
$this->handleExport($displayName);
|
||||
|
||||
$end = makeDate();
|
||||
$this->logger->notice("SYSTEM: Completed syncing user '{name}' -> '{list}' in '{time.duration}'s", [
|
||||
'name' => $displayName,
|
||||
'list' => join(', ', array_keys($list)),
|
||||
'time' => [
|
||||
'start' => $start,
|
||||
'end' => $end,
|
||||
'duration' => $end->getTimestamp() - $start->getTimestamp(),
|
||||
],
|
||||
'memory' => [
|
||||
'now' => getMemoryUsage(),
|
||||
'peak' => getPeakMemoryUsage(),
|
||||
],
|
||||
]);
|
||||
exit(1);
|
||||
}
|
||||
|
||||
return self::SUCCESS;
|
||||
}
|
||||
|
||||
protected function handleImport(string $name, array $backends): void
|
||||
{
|
||||
/** @var array<array-key,ResponseInterface> $queue */
|
||||
$queue = [];
|
||||
|
||||
foreach ($backends as $backend) {
|
||||
/** @var iClient $client */
|
||||
$client = ag($backend, 'class');
|
||||
array_push($queue, ...$client->pull($this->mapper));
|
||||
}
|
||||
|
||||
$start = makeDate();
|
||||
$this->logger->notice("SYSTEM: Waiting on '{total}' requests for import '{name}' data.", [
|
||||
'name' => $name,
|
||||
'total' => number_format(count($queue)),
|
||||
'time' => [
|
||||
'start' => $start,
|
||||
],
|
||||
'memory' => [
|
||||
'now' => getMemoryUsage(),
|
||||
'peak' => getPeakMemoryUsage(),
|
||||
],
|
||||
]);
|
||||
|
||||
foreach ($queue as $_key => $response) {
|
||||
$requestData = $response->getInfo('user_data');
|
||||
|
||||
try {
|
||||
$requestData['ok']($response);
|
||||
} catch (Throwable $e) {
|
||||
$requestData['error']($e);
|
||||
}
|
||||
|
||||
$queue[$_key] = null;
|
||||
|
||||
gc_collect_cycles();
|
||||
}
|
||||
|
||||
$end = makeDate();
|
||||
$this->logger->notice(
|
||||
"SYSTEM: Completed waiting on '{total}' requests in '{time.duration}'s for importing '{name}' data. Parsed '{responses.size}' of data.",
|
||||
[
|
||||
'name' => $name,
|
||||
'total' => number_format(count($queue)),
|
||||
'time' => [
|
||||
'start' => $start,
|
||||
'end' => $end,
|
||||
'duration' => $end->getTimestamp() - $start->getTimestamp(),
|
||||
],
|
||||
'memory' => [
|
||||
'now' => getMemoryUsage(),
|
||||
'peak' => getPeakMemoryUsage(),
|
||||
],
|
||||
'responses' => [
|
||||
'size' => fsize((int)Message::get('response.size', 0)),
|
||||
],
|
||||
]
|
||||
);
|
||||
|
||||
Message::add('response.size', 0);
|
||||
}
|
||||
|
||||
protected function handleExport(string $name): void
|
||||
{
|
||||
$total = count($this->queue->getQueue());
|
||||
if ($total < 1) {
|
||||
$this->logger->notice("SYSTEM: No play state changes detected for '{name}' backends.", ['name' => $name]);
|
||||
return;
|
||||
}
|
||||
|
||||
$this->logger->notice("SYSTEM: Sending '{total}' change play state requests for '{name}'.", [
|
||||
'name' => $name,
|
||||
'total' => $total
|
||||
]);
|
||||
|
||||
foreach ($this->queue->getQueue() as $response) {
|
||||
$context = ag($response->getInfo('user_data'), 'context', []);
|
||||
|
||||
try {
|
||||
if (200 !== ($statusCode = $response->getStatusCode())) {
|
||||
$this->logger->error(
|
||||
"Request to change '{name}: {backend}' '{item.title}' play state returned with unexpected '{status_code}' status code.",
|
||||
[
|
||||
'name' => $name,
|
||||
'status_code' => $statusCode,
|
||||
...$context,
|
||||
],
|
||||
);
|
||||
continue;
|
||||
}
|
||||
|
||||
$this->logger->notice("Marked '{name}: {backend}' '{item.title}' as '{play_state}'.", [
|
||||
'name' => $name,
|
||||
...$context
|
||||
]);
|
||||
} catch (Throwable $e) {
|
||||
$this->logger->error(
|
||||
message: "Exception '{error.kind}' was thrown unhandled during '{name}: {backend}' request to change play state of {item.type} '{item.title}'. '{error.message}' at '{error.file}:{error.line}'.",
|
||||
context: [
|
||||
'name' => $name,
|
||||
'error' => [
|
||||
'kind' => $e::class,
|
||||
'line' => $e->getLine(),
|
||||
'message' => $e->getMessage(),
|
||||
'file' => after($e->getFile(), ROOT_PATH),
|
||||
],
|
||||
...$context,
|
||||
'exception' => [
|
||||
'file' => $e->getFile(),
|
||||
'line' => $e->getLine(),
|
||||
'kind' => get_class($e),
|
||||
'message' => $e->getMessage(),
|
||||
],
|
||||
]
|
||||
);
|
||||
}
|
||||
}
|
||||
|
||||
$this->logger->notice("SYSTEM: Sent '{total}' change play state requests for '{name}'.", [
|
||||
'name' => $name,
|
||||
'total' => $total
|
||||
]);
|
||||
}
|
||||
|
||||
/**
|
||||
* Generate a list of users that are matched across all backends.
|
||||
*
|
||||
* @param array $users The list of users from all backends.
|
||||
* @param array{string: array{string: string, options: array}} $map The map of users to match.
|
||||
*
|
||||
* @return array{name: string, backends: array<string, array<string, mixed>>}[] The list of matched users.
|
||||
*/
|
||||
private function generate_users_list(array $users, array $map = []): array
|
||||
{
|
||||
$allBackends = [];
|
||||
foreach ($users as $u) {
|
||||
if (!in_array($u['backend'], $allBackends, true)) {
|
||||
$allBackends[] = $u['backend'];
|
||||
}
|
||||
}
|
||||
|
||||
// Build a lookup: $usersBy[backend][lowercased_name] = userObject
|
||||
$usersBy = [];
|
||||
foreach ($users as $user) {
|
||||
$backend = $user['backend'];
|
||||
$nameLower = strtolower($user['name']);
|
||||
|
||||
if (!isset($usersBy[$backend])) {
|
||||
$usersBy[$backend] = [];
|
||||
}
|
||||
$usersBy[$backend][$nameLower] = $user;
|
||||
}
|
||||
|
||||
$results = [];
|
||||
|
||||
// Track used combos: array of [backend, nameLower].
|
||||
$used = [];
|
||||
|
||||
// Helper: check if a (backend, nameLower) is already used.
|
||||
$alreadyUsed = fn(string $b, string $n): bool => in_array([$b, $n], $used, true);
|
||||
|
||||
/**
|
||||
* Build a "unified" row from matched users across backends.
|
||||
* - $backendDict example: [ 'backend1' => userObj, 'backend2' => userObj, ... ]
|
||||
* - Picks a 'name' by "most frequent name" logic (with tie fallback).
|
||||
*
|
||||
* Returns an array shaped like:
|
||||
* <code language="php">
|
||||
* return [
|
||||
* 'name' => 'something',
|
||||
* 'backends' => [
|
||||
* 'backend1' => userObj,
|
||||
* 'backend2' => userObj,
|
||||
* ...
|
||||
* ]
|
||||
* ]
|
||||
* </code>
|
||||
*/
|
||||
$buildUnifiedRow = function (array $backendDict) use ($allBackends): array {
|
||||
// Collect the names in the order of $allBackends for tie-breaking.
|
||||
$names = [];
|
||||
foreach ($allBackends as $b) {
|
||||
if (isset($backendDict[$b])) {
|
||||
$names[] = $backendDict[$b]['name'];
|
||||
}
|
||||
}
|
||||
|
||||
// Tally frequencies
|
||||
$freq = [];
|
||||
foreach ($names as $n) {
|
||||
if (!isset($freq[$n])) {
|
||||
$freq[$n] = 0;
|
||||
}
|
||||
$freq[$n]++;
|
||||
}
|
||||
|
||||
// Decide a final 'name'
|
||||
if (empty($freq)) {
|
||||
$finalName = 'unknown';
|
||||
} else {
|
||||
$max = max($freq);
|
||||
$candidates = array_keys(array_filter($freq, fn($count) => $count === $max));
|
||||
|
||||
if (1 === count($candidates)) {
|
||||
$finalName = $candidates[0];
|
||||
} else {
|
||||
// Tie => pick the first from $names that’s in $candidates
|
||||
$finalName = null;
|
||||
foreach ($names as $n) {
|
||||
if (in_array($n, $candidates, true)) {
|
||||
$finalName = $n;
|
||||
break;
|
||||
}
|
||||
}
|
||||
if (!$finalName) {
|
||||
$finalName = 'unknown';
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// Build final row: "name" + sub-array "backends"
|
||||
$row = [
|
||||
'name' => $finalName,
|
||||
'backends' => [],
|
||||
];
|
||||
|
||||
// Fill 'backends'
|
||||
foreach ($allBackends as $b) {
|
||||
if (isset($backendDict[$b])) {
|
||||
$row['backends'][$b] = $backendDict[$b];
|
||||
}
|
||||
}
|
||||
|
||||
return $row;
|
||||
};
|
||||
|
||||
// Main logic: For each backend and each user in that backend, unify them if we find a match in ≥2 backends.
|
||||
// We do map-based matching first, then direct-name matching.
|
||||
foreach ($allBackends as $backend) {
|
||||
if (!isset($usersBy[$backend])) {
|
||||
continue;
|
||||
}
|
||||
|
||||
// For each user in this backend
|
||||
foreach ($usersBy[$backend] as $nameLower => $userObj) {
|
||||
// Skip if already used
|
||||
if ($alreadyUsed($backend, $nameLower)) {
|
||||
continue;
|
||||
}
|
||||
|
||||
// Map-based matching first
|
||||
$matchedMapEntry = null;
|
||||
foreach ($map as $mapRow) {
|
||||
if (isset($mapRow[$backend]['name']) && strtolower($mapRow[$backend]['name']) === $nameLower) {
|
||||
$matchedMapEntry = $mapRow;
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
if ($matchedMapEntry) {
|
||||
// Build mapMatch from the map row.
|
||||
$mapMatch = [$backend => $userObj];
|
||||
|
||||
// Gather all the other backends from the map
|
||||
foreach ($allBackends as $otherBackend) {
|
||||
if ($otherBackend === $backend) {
|
||||
continue;
|
||||
}
|
||||
if (isset($matchedMapEntry[$otherBackend]['name'])) {
|
||||
$mappedNameLower = strtolower($matchedMapEntry[$otherBackend]['name']);
|
||||
if (isset($usersBy[$otherBackend][$mappedNameLower])) {
|
||||
$mapMatch[$otherBackend] = $usersBy[$otherBackend][$mappedNameLower];
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// If we matched ≥ 2 backends, unify them
|
||||
if (count($mapMatch) >= 2) {
|
||||
// --- MERGE map-based "options" into client_data => options, if any ---
|
||||
foreach ($mapMatch as $b => &$matchedUser) {
|
||||
// If the map entry has an 'options' array for this backend,
|
||||
// merge it into $matchedUser['client_data']['options'].
|
||||
if (isset($matchedMapEntry[$b]['options']) && is_array($matchedMapEntry[$b]['options'])) {
|
||||
$mapOptions = $matchedMapEntry[$b]['options'];
|
||||
|
||||
// Ensure $matchedUser['client_data'] is an array
|
||||
if (!isset($matchedUser['client_data']) || !is_array($matchedUser['client_data'])) {
|
||||
$matchedUser['client_data'] = [];
|
||||
}
|
||||
|
||||
// Ensure $matchedUser['client_data']['options'] is an array
|
||||
if (!isset($matchedUser['client_data']['options']) || !is_array(
|
||||
$matchedUser['client_data']['options']
|
||||
)) {
|
||||
$matchedUser['client_data']['options'] = [];
|
||||
}
|
||||
|
||||
// Merge the map's options
|
||||
$matchedUser['client_data']['options'] = array_replace_recursive(
|
||||
$matchedUser['client_data']['options'],
|
||||
$mapOptions
|
||||
);
|
||||
}
|
||||
}
|
||||
unset($matchedUser); // break reference from the loop
|
||||
|
||||
// Build final row
|
||||
$results[] = $buildUnifiedRow($mapMatch);
|
||||
|
||||
// Mark & remove from $usersBy
|
||||
foreach ($mapMatch as $b => $mu) {
|
||||
$nm = strtolower($mu['name']);
|
||||
$used[] = [$b, $nm];
|
||||
unset($usersBy[$b][$nm]);
|
||||
}
|
||||
continue;
|
||||
} else {
|
||||
$this->logger->error("No partial fallback match via map for '{backend}: {user}'", [
|
||||
'backend' => $userObj['backend'],
|
||||
'user' => $userObj['name'],
|
||||
]);
|
||||
}
|
||||
}
|
||||
|
||||
// Direct-name matching if map fails
|
||||
$directMatch = [$backend => $userObj];
|
||||
foreach ($allBackends as $otherBackend) {
|
||||
if ($otherBackend === $backend) {
|
||||
continue;
|
||||
}
|
||||
// Same name => direct match
|
||||
if (isset($usersBy[$otherBackend][$nameLower])) {
|
||||
$directMatch[$otherBackend] = $usersBy[$otherBackend][$nameLower];
|
||||
}
|
||||
}
|
||||
|
||||
// If direct matched ≥ 2 backends, unify
|
||||
if (count($directMatch) >= 2) {
|
||||
// No map "options" to merge here
|
||||
$results[] = $buildUnifiedRow($directMatch);
|
||||
|
||||
// Mark & remove them from $usersBy
|
||||
foreach ($directMatch as $b => $matchedUser) {
|
||||
$nm = strtolower($matchedUser['name']);
|
||||
$used[] = [$b, $nm];
|
||||
unset($usersBy[$b][$nm]);
|
||||
}
|
||||
continue;
|
||||
}
|
||||
|
||||
// If neither map nor direct matched for ≥2
|
||||
$this->logger->error("Cannot match user '{backend}: {user}' in any map row or direct match.", [
|
||||
'backend' => $userObj['backend'],
|
||||
'user' => $userObj['name']
|
||||
]);
|
||||
}
|
||||
}
|
||||
|
||||
return $results;
|
||||
}
|
||||
|
||||
private function usersList(array $list): array
|
||||
{
|
||||
$chunks = [];
|
||||
|
||||
foreach ($list as $row) {
|
||||
$name = $row['name'] ?? 'unknown';
|
||||
|
||||
$pairs = [];
|
||||
if (!empty($row['backends']) && is_array($row['backends'])) {
|
||||
foreach ($row['backends'] as $backendName => $backendData) {
|
||||
if (isset($backendData['name'])) {
|
||||
$pairs[] = r("{name}@{backend}", ['backend' => $backendName, 'name' => $backendData['name']]);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
$chunks[] = r("{name}: {pairs}", ['name' => $name, 'pairs' => implode(', ', $pairs)]);
|
||||
}
|
||||
|
||||
return $chunks;
|
||||
}
|
||||
}
|
||||
@@ -15,7 +15,7 @@ use RuntimeException;
|
||||
* Represents an metadata as entity.
|
||||
*
|
||||
* @implements iState
|
||||
* @uses LoggerAwareTrait
|
||||
* @implements LoggerAwareTrait
|
||||
*/
|
||||
final class StateEntity implements iState
|
||||
{
|
||||
@@ -698,6 +698,22 @@ final class StateEntity implements iState
|
||||
return ag_exists($this->context, $key);
|
||||
}
|
||||
|
||||
public function isSynced(array $backends): array
|
||||
{
|
||||
$match = [];
|
||||
|
||||
|
||||
foreach ($backends as $backend) {
|
||||
if (null === ag($this->metadata, $backend)) {
|
||||
$match[$backend] = null;
|
||||
continue;
|
||||
}
|
||||
$match[$backend] = $this->isWatched() === (bool)ag($this->metadata[$backend], iState::COLUMN_WATCHED, 0);
|
||||
}
|
||||
|
||||
return $match;
|
||||
}
|
||||
|
||||
/**
|
||||
* Checks if the value of a given key in the entity object is equal to the corresponding value in the current object.
|
||||
* Some keys are special and require special logic to compare. For example, the updated and watched keys are special
|
||||
|
||||
@@ -422,4 +422,14 @@ interface StateInterface extends LoggerAwareInterface
|
||||
* @return bool Return true if the entity has contextual data related to the key.
|
||||
*/
|
||||
public function hasContext(string $key): bool;
|
||||
|
||||
/**
|
||||
* Check whether the entity play state is synced with the backends.
|
||||
*
|
||||
* @param array $backends List of backends to check.
|
||||
*
|
||||
* @return array{string: bool|null} Return true if the entity is synced with the backend. or false if not, or null if the backend has no metadata.
|
||||
*/
|
||||
public function isSynced(array $backends): array;
|
||||
|
||||
}
|
||||
|
||||
@@ -116,7 +116,7 @@ class MemoryMapper implements iImport
|
||||
$this->addPointers($this->objects[$pointer], $pointer);
|
||||
}
|
||||
|
||||
$this->logger->info("MAPPER: Preloaded '{pointers}' pointers, and '{objects}' objects into memory.", [
|
||||
$this->logger->info("MemoryMapper: Preloaded '{pointers}' pointers, and '{objects}' objects into memory.", [
|
||||
'pointers' => number_format(count($this->pointers)),
|
||||
'objects' => number_format(count($this->objects)),
|
||||
]);
|
||||
@@ -133,12 +133,12 @@ class MemoryMapper implements iImport
|
||||
*
|
||||
* @return self
|
||||
*/
|
||||
private function addNewItem(iState $entity, array $opts = []): self
|
||||
protected function addNewItem(iState $entity, array $opts = []): self
|
||||
{
|
||||
if (true === (bool)ag($opts, Options::IMPORT_METADATA_ONLY)) {
|
||||
Message::increment("{$entity->via}.{$entity->type}.failed");
|
||||
$this->logger->notice(
|
||||
"MAPPER: Ignoring '{backend}' '{title}'. Does not exist in database. And backend set as metadata source only.",
|
||||
"MemoryMapper: Ignoring '{backend}' '{title}'. Does not exist in database. And backend set as metadata source only.",
|
||||
[
|
||||
'metaOnly' => true,
|
||||
'backend' => $entity->via,
|
||||
@@ -178,7 +178,7 @@ class MemoryMapper implements iImport
|
||||
];
|
||||
}
|
||||
|
||||
$this->logger->notice("MAPPER: '{backend}' added '{title}' as new item.", [
|
||||
$this->logger->notice("MemoryMapper: '{backend}' added '{title}' as new item.", [
|
||||
'backend' => $entity->via,
|
||||
'title' => $entity->getName(),
|
||||
true === $this->inTraceMode() ? 'trace' : 'metadata' => $data,
|
||||
@@ -197,7 +197,7 @@ class MemoryMapper implements iImport
|
||||
*
|
||||
* @return self
|
||||
*/
|
||||
private function handleTainted(string|int $pointer, iState $cloned, iState $entity, array $opts = []): self
|
||||
protected function handleTainted(string|int $pointer, iState $cloned, iState $entity, array $opts = []): self
|
||||
{
|
||||
$keys = [iState::COLUMN_META_DATA];
|
||||
|
||||
@@ -215,7 +215,7 @@ class MemoryMapper implements iImport
|
||||
$changes = $this->objects[$pointer]->diff(fields: $keys);
|
||||
|
||||
if (count($changes) >= 1) {
|
||||
$this->logger->notice("MAPPER: '{backend}' updated '{title}' metadata.", [
|
||||
$this->logger->notice("MemoryMapper: '{backend}' updated '{title}' metadata.", [
|
||||
'id' => $cloned->id,
|
||||
'backend' => $entity->via,
|
||||
'title' => $cloned->getName(),
|
||||
@@ -240,7 +240,7 @@ class MemoryMapper implements iImport
|
||||
}
|
||||
|
||||
$this->logger->notice(
|
||||
"MAPPER: '{backend}' item '{id}: {title}' is marked as '{state}' vs local state '{local_state}', However due to the following reason '{reasons}' it was not considered as valid state.",
|
||||
"MemoryMapper: '{backend}' item '{id}: {title}' is marked as '{state}' vs local state '{local_state}', However due to the following reason '{reasons}' it was not considered as valid state.",
|
||||
[
|
||||
'id' => $this->objects[$pointer]->id,
|
||||
'backend' => $entity->via,
|
||||
@@ -255,7 +255,7 @@ class MemoryMapper implements iImport
|
||||
}
|
||||
|
||||
if (true === $this->inTraceMode()) {
|
||||
$this->logger->info("MAPPER: '{backend}' '{title}' No metadata changes detected.", [
|
||||
$this->logger->info("MemoryMapper: '{backend}' '{title}' No metadata changes detected.", [
|
||||
'id' => $cloned->id,
|
||||
'backend' => $entity->via,
|
||||
'title' => $cloned->getName(),
|
||||
@@ -265,7 +265,7 @@ class MemoryMapper implements iImport
|
||||
return $this;
|
||||
}
|
||||
|
||||
private function handleOldEntity(string|int $pointer, iState $cloned, iState $entity, array $opts = []): self
|
||||
protected function handleOldEntity(string|int $pointer, iState $cloned, iState $entity, array $opts = []): self
|
||||
{
|
||||
$keys = [iState::COLUMN_META_DATA];
|
||||
|
||||
@@ -284,7 +284,7 @@ class MemoryMapper implements iImport
|
||||
);
|
||||
|
||||
if (count($changes) >= 1) {
|
||||
$this->logger->notice("MAPPER: '{backend}' marked '{title}' as 'unplayed'.", [
|
||||
$this->logger->notice("MemoryMapper: '{backend}' marked '{title}' as 'unplayed'.", [
|
||||
'id' => $cloned->id,
|
||||
'backend' => $entity->via,
|
||||
'title' => $cloned->getName(),
|
||||
@@ -324,7 +324,7 @@ class MemoryMapper implements iImport
|
||||
$this->objects[$pointer] = $this->objects[$pointer]->apply(entity: $entity, fields: $_keys);
|
||||
|
||||
$this->logger->notice(
|
||||
$progress ? "MAPPER: '{backend}' updated '{title}' due to play progress change." : "MAPPER: '{backend}' updated '{title}' metadata.",
|
||||
$progress ? "MemoryMapper: '{backend}' updated '{title}' due to play progress change." : "MemoryMapper: '{backend}' updated '{title}' metadata.",
|
||||
[
|
||||
'id' => $cloned->id,
|
||||
'backend' => $entity->via,
|
||||
@@ -354,7 +354,7 @@ class MemoryMapper implements iImport
|
||||
|
||||
if ($entity->isWatched() !== $this->objects[$pointer]->isWatched()) {
|
||||
$this->logger->notice(
|
||||
"MAPPER: '{backend}' item '{id}: {title}' is marked as '{state}' vs local state '{local_state}', However due to the remote item date '{remote_date}' being older than the last backend sync date '{local_date}'. it was not considered as valid state.",
|
||||
"MemoryMapper: '{backend}' item '{id}: {title}' is marked as '{state}' vs local state '{local_state}', However due to the remote item date '{remote_date}' being older than the last backend sync date '{local_date}'. it was not considered as valid state.",
|
||||
[
|
||||
'id' => $this->objects[$pointer]->id,
|
||||
'backend' => $entity->via,
|
||||
@@ -369,7 +369,7 @@ class MemoryMapper implements iImport
|
||||
}
|
||||
|
||||
if ($this->inTraceMode()) {
|
||||
$this->logger->debug("MAPPER: Ignoring '{backend}' '{title}'. No changes detected.", [
|
||||
$this->logger->debug("MemoryMapper: Ignoring '{backend}' '{title}'. No changes detected.", [
|
||||
'id' => $cloned->id,
|
||||
'backend' => $entity->via,
|
||||
'title' => $cloned->getName(),
|
||||
@@ -385,7 +385,7 @@ class MemoryMapper implements iImport
|
||||
public function add(iState $entity, array $opts = []): self
|
||||
{
|
||||
if (false === $entity->hasGuids() && false === $entity->hasRelativeGuid()) {
|
||||
$this->logger->warning("MAPPER: Ignoring '{backend}' '{title}'. No valid/supported external ids.", [
|
||||
$this->logger->warning("MemoryMapper: Ignoring '{backend}' '{title}'. No valid/supported external ids.", [
|
||||
'id' => $entity->id,
|
||||
'backend' => $entity->via,
|
||||
'title' => $entity->getName(),
|
||||
@@ -396,7 +396,7 @@ class MemoryMapper implements iImport
|
||||
|
||||
if (true === $entity->isEpisode() && $entity->episode < 1) {
|
||||
$this->logger->warning(
|
||||
"MAPPER: Ignoring '{backend}' '{id}: {title}'. Item was marked as episode but no episode number was provided.",
|
||||
"MemoryMapper: Ignoring '{backend}' '{id}: {title}'. Item was marked as episode but no episode number was provided.",
|
||||
[
|
||||
'id' => $entity->id ?? ag($entity->getMetadata($entity->via), iState::COLUMN_ID, ''),
|
||||
'backend' => $entity->via,
|
||||
@@ -444,7 +444,7 @@ class MemoryMapper implements iImport
|
||||
* 3 - mark entity as tainted and re-process it.
|
||||
*/
|
||||
if (true === $hasAfter && true === $cloned->isWatched() && false === $entity->isWatched()) {
|
||||
$message = "MAPPER: Watch state conflict detected in '{backend}: {title}' '{new_state}' vs local state '{id}: {current_state}'.";
|
||||
$message = "MemoryMapper: Watch state conflict detected in '{backend}: {title}' '{new_state}' vs local state '{id}: {current_state}'.";
|
||||
$hasMeta = count($cloned->getMetadata($entity->via)) >= 1;
|
||||
$hasDate = $entity->updated === ag($cloned->getMetadata($entity->via), iState::COLUMN_META_DATA_PLAYED_AT);
|
||||
|
||||
@@ -492,10 +492,10 @@ class MemoryMapper implements iImport
|
||||
|
||||
$changes = $this->objects[$pointer]->diff(fields: $keys);
|
||||
|
||||
$message = "MAPPER: '{backend}' Updated '{title}'.";
|
||||
$message = "MemoryMapper: '{backend}' Updated '{title}'.";
|
||||
|
||||
if ($cloned->isWatched() !== $this->objects[$pointer]->isWatched()) {
|
||||
$message = "MAPPER: '{backend}' Updated and marked '{id}: {title}' as '{state}'.";
|
||||
$message = "MemoryMapper: '{backend}' Updated and marked '{id}: {title}' as '{state}'.";
|
||||
}
|
||||
|
||||
if (count($changes) >= 1) {
|
||||
@@ -525,7 +525,10 @@ class MemoryMapper implements iImport
|
||||
];
|
||||
}
|
||||
|
||||
$this->logger->debug("MAPPER: Ignoring '{backend}' '{title}'. Metadata & play state are identical.", $context);
|
||||
$this->logger->debug(
|
||||
"MemoryMapper: Ignoring '{backend}' '{title}'. Metadata & play state are identical.",
|
||||
$context
|
||||
);
|
||||
|
||||
Message::increment("{$entity->via}.{$entity->type}.ignored_no_change");
|
||||
|
||||
@@ -597,13 +600,13 @@ class MemoryMapper implements iImport
|
||||
$count = count($this->changed);
|
||||
|
||||
if (0 === $count) {
|
||||
$this->logger->notice('MAPPER: No changes detected.');
|
||||
$this->logger->notice('MemoryMapper: No changes detected.');
|
||||
return $list;
|
||||
}
|
||||
$inDryRunMode = $this->inDryRunMode();
|
||||
|
||||
if (true === $inDryRunMode) {
|
||||
$this->logger->notice("MAPPER: Recorded '{total}' object changes.", ['total' => $count]);
|
||||
$this->logger->notice("MemoryMapper: Recorded '{total}' object changes.", ['total' => $count]);
|
||||
}
|
||||
|
||||
foreach ($this->changed as $pointer) {
|
||||
|
||||
105
src/Libs/Mappers/Import/NullMapper.php
Normal file
105
src/Libs/Mappers/Import/NullMapper.php
Normal file
@@ -0,0 +1,105 @@
|
||||
<?php
|
||||
|
||||
declare(strict_types=1);
|
||||
|
||||
namespace App\Libs\Mappers\Import;
|
||||
|
||||
use App\Libs\Database\DatabaseInterface as iDB;
|
||||
use App\Libs\Entity\StateInterface as iState;
|
||||
use App\Libs\Mappers\ImportInterface as iImport;
|
||||
use DateTimeInterface as iDate;
|
||||
use Psr\Log\LoggerInterface as iLogger;
|
||||
use Psr\SimpleCache\CacheInterface as iCache;
|
||||
|
||||
/**
|
||||
* Mapper class based on MemoryMapper with stripped down functionality.
|
||||
*
|
||||
* @implements iImport
|
||||
*/
|
||||
class NullMapper extends MemoryMapper implements iImport
|
||||
{
|
||||
public function __construct(iLogger $logger, iDB $db, iCache $cache)
|
||||
{
|
||||
$this->fullyLoaded = true;
|
||||
parent::__construct($logger, $db, $cache);
|
||||
}
|
||||
|
||||
public function loadData(?iDate $date = null): static
|
||||
{
|
||||
$this->fullyLoaded = true;
|
||||
return $this;
|
||||
}
|
||||
|
||||
public function add(iState $entity, array $opts = []): static
|
||||
{
|
||||
$this->fullyLoaded = true;
|
||||
return parent::add($entity, $opts);
|
||||
}
|
||||
|
||||
/**
|
||||
* @inheritdoc
|
||||
*/
|
||||
public function remove(iState $entity): bool
|
||||
{
|
||||
if (false === ($pointer = $this->getPointer($entity))) {
|
||||
return false;
|
||||
}
|
||||
|
||||
$this->removePointers($this->objects[$pointer]);
|
||||
|
||||
if (null !== ($this->objects[$pointer] ?? null)) {
|
||||
unset($this->objects[$pointer]);
|
||||
}
|
||||
|
||||
if (null !== ($this->changed[$pointer] ?? null)) {
|
||||
unset($this->changed[$pointer]);
|
||||
}
|
||||
|
||||
return true;
|
||||
}
|
||||
|
||||
/**
|
||||
* @inheritdoc
|
||||
*/
|
||||
public function commit(): array
|
||||
{
|
||||
$this->reset();
|
||||
|
||||
return [
|
||||
iState::TYPE_MOVIE => ['added' => 0, 'updated' => 0, 'failed' => 0],
|
||||
iState::TYPE_EPISODE => ['added' => 0, 'updated' => 0, 'failed' => 0],
|
||||
];
|
||||
}
|
||||
|
||||
/**
|
||||
* Compute the play state for each backend.
|
||||
*
|
||||
* @param array $backends List of backends to check.
|
||||
*
|
||||
* @return array List of changes for each backend.
|
||||
*/
|
||||
public function computeChanges(array $backends): array
|
||||
{
|
||||
$changes = [];
|
||||
|
||||
foreach ($backends as $backend) {
|
||||
$changes[$backend] = [];
|
||||
}
|
||||
|
||||
foreach ($this->objects as $entity) {
|
||||
$state = $entity->isSynced($backends);
|
||||
foreach ($state as $b => $value) {
|
||||
if (false === $value) {
|
||||
$changes[$b][] = $entity;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
return $changes;
|
||||
}
|
||||
|
||||
public function __destruct()
|
||||
{
|
||||
// -- disabled autocommit.
|
||||
}
|
||||
}
|
||||
Reference in New Issue
Block a user