Add the ability to send backends requests in sequential instead of the default parallel mode

This commit is contained in:
arabcoders
2025-05-05 22:04:47 +03:00
parent 29dbf2f06f
commit 45e9e27b54
15 changed files with 443 additions and 673 deletions

22
NEWS.md
View File

@@ -1,4 +1,24 @@
# Old Updates
# NEWS
This page contains old news about the project.
### 2025-03-13
We have recently added support for plex webhooks via tautulli which you can use if you don't have PlexPass. This should
help close the gap with other media servers.
### 2025-02-19
We have introduced new experimental feature to allow syncing watch progress for played items. This feature is still in
early stages, and might not work as expected. and there are probably still many bugs that we need to fix. Please report
any issues you might face.
The feature is disabled by default, to enable it you need to run add this environment variable `WS_PROGRESS_THRESHOLD`
with seconds as value, the minimum value is `180` seconds. `0` seconds means it's disabled. We think reasonable value is
`86400` or more this number is about 1day.
We are still not keen on this feature, and it might be removed in future releases if we aren't able to deal with the
issues we are facing.
### 2025-02-11

View File

@@ -9,23 +9,34 @@ out of the box, this tool support `Jellyfin`, `Plex` and `Emby` media servers.
# Updates
### 2025-05-05
Weve added a new feature that lets you send requests **sequentially** to the backends instead of using the default
**parallel** mode. This can be especially helpful if you have very large libraries, slow disks, or simply want to avoid
overloading the backends with too many concurrent requests. You can enable by enabling `WS_HTTP_SYNC_REQUESTS`
environment variable. This mode only applies to `import`, `export`, and `backup` tasks at the moment.
Additionally, two command-line flags let you override the mode on the fly `--sync-requests` and `--async-requests`.
Well be evaluating this feature for a trial period. If it proves effective (and the slowdown is acceptable), we may
make **sequential** mode the default in a future release.
> [!NOTE]
> Because we cache many HTTP requests, comparing timings between sequential and parallel runs of `import` can be
> misleading. To get an accurate benchmark of `--sync-requests`, either start with a fresh setup (new installation) or
> purge your Redis instance before testing.
### 2025-04-06
We have recently re-worked how the `backend:create` command works, and we no longer generate random name for invalid backends names or usernames. We do a normalization step to make sure the name is valid. This should help with the confusion of having random names. This means if you re-run the `backend:create` you most likely will get a different name than before. So, we suggest to re-run the command with `--re-create` flag. This flag will delete the current sub-users, and regenerate updated config files.
We have recently re-worked how the `backend:create` command works, and we no longer generate random name for invalid
backends names or usernames. We do a normalization step to make sure the name is valid. This should help with the
confusion of having random names. This means if you re-run the `backend:create` you most likely will get a different
name than before. So, we suggest to re-run the command with `--re-create` flag. This flag will delete the current
sub-users, and regenerate updated config files.
We have also added new guard for the command, so if you already generated your sub-users, re-running the command will show you a warning message and exit without doing anything. to run the command again either you need to use `--re-create`or `--run` flag. The `--run` flag will run the command without deleting the current sub-users.
### 2025-03-13
We have recently added support for plex webhooks via tautulli which you can use if you don't have PlexPass. This should help close the gap with other media servers.
### 2025-02-19
We have introduced new experimental feature to allow syncing watch progress for played items. This feature is still in early stages, and might not work as expected. and there are probably still many bugs that we need to fix. Please report any issues you might face.
The feature is disabled by default, to enable it you need to run add this environment variable `WS_PROGRESS_THRESHOLD` with seconds as value, the minimum value is `180` seconds. `0` seconds means it's disabled. We think reasonable value is `86400` or more this number is about 1day.
We are still not keen on this feature, and it might be removed in future releases if we aren't able to deal with the issues we are facing.
We have also added new guard for the command, so if you already generated your sub-users, re-running the command will
show you a warning message and exit without doing anything. to run the command again either you need to use
`--re-create`or `--run` flag. The `--run` flag will run the command without deleting the current sub-users.
---
Refer to [NEWS](NEWS.md) for old updates.
@@ -42,11 +53,13 @@ Refer to [NEWS](NEWS.md) for old updates.
* Sync your watch progress/play state via webhooks or scheduled tasks.
* Check if your media backends have stale references to old files.
If you like my work, you might also like my other project [YTPTube](https://github.com/arabcoders/ytptube), which is simple and to the point yt-dlp frontend to help download content from all supported sites by yt-dlp.
If you like my work, you might also like my other project [YTPTube](https://github.com/arabcoders/ytptube), which is
simple and to the point yt-dlp frontend to help download content from all supported sites by yt-dlp.
# Install
First, start by creating a directory to store the data, to follow along with this setup, create directory called `data` at your working directory. Then proceed to use your preferred method to install the tool.
First, start by creating a directory to store the data, to follow along with this setup, create directory called `data`
at your working directory. Then proceed to use your preferred method to install the tool.
### Via compose file.
@@ -79,26 +92,34 @@ $ docker run -d --rm --user "${UID:-1000}:${GID:-1000}" --name watchstate --rest
```
> [!IMPORTANT]
> It's really important to match the `user:`, `--user` to the owner of the `data` directory, the container is rootless, as such it will crash if it's unable to write to the data directory.
> It's really important to match the `user:`, `--user` to the owner of the `data` directory, the container is rootless,
> as such it will crash if it's unable to write to the data directory.
>
> It's really not recommended to run containers as root, but if you fail to run the container you can try setting the `user: "0:0"` or `--user '0:0'` if that works it means you have permissions issues. refer to [FAQ](FAQ.md) to troubleshoot the problem.
> It's really not recommended to run containers as root, but if you fail to run the container you can try setting the
`user: "0:0"` or `--user '0:0'` if that works it means you have permissions issues. refer to [FAQ](FAQ.md) to
> troubleshoot the problem.
### Unraid users
For `Unraid` users You can install the `Community Applications` plugin, and search for **watchstate** it comes preconfigured. Otherwise, to manually install it, you need to add value to the `Extra Parameters` section in advanced tab/view. add the following value `--user 99:100`.
For `Unraid` users You can install the `Community Applications` plugin, and search for **watchstate** it comes
preconfigured. Otherwise, to manually install it, you need to add value to the `Extra Parameters` section in advanced
tab/view. add the following value `--user 99:100`.
This has to happen before you start the container, otherwise it will have the old user id, and
you then have to run the following command from terminal `chown -R 99:100 /mnt/user/appdata/watchstate`.
you then have to run the following command from terminal `chown -R 99:100 /mnt/user/appdata/watchstate`.
### Podman instead of docker
To use this container with `podman` set `compose.yaml` `user` to `0:0`. it will appear to be working as root inside the container, but it will be mapped to the user in which the command was run under.
To use this container with `podman` set `compose.yaml` `user` to `0:0`. it will appear to be working as root inside the
container, but it will be mapped to the user in which the command was run under.
# Management
After starting the container, you can access the WebUI by visiting `http://localhost:8080` in your browser.
At the start you won't see anything as the `WebUI` is decoupled from the WatchState and need to be configured to be able to access the API. In the top right corner, you will see a cogwheel icon, click on it and then Configure the connection settings.
At the start you won't see anything as the `WebUI` is decoupled from the WatchState and need to be configured to be able
to access the API. In the top right corner, you will see a cogwheel icon, click on it and then Configure the connection
settings.
![Connection settings](screenshots/api_settings.png)
@@ -117,11 +138,13 @@ From the host machine, you can run the following command
$ docker exec watchstate console system:apikey
```
Insert the `API key` into the `API Token` field and make sure to set the `API URL` or click the `current page URL` link. If everything is ok, the reset of the navbar will show up.
Insert the `API key` into the `API Token` field and make sure to set the `API URL` or click the `current page URL` link.
If everything is ok, the reset of the navbar will show up.
To add your backends, please click on the help button in the top right corner, and choose which method you want [one-way](guides/one-way-sync.md) or [two-way](guides/two-way-sync.md) sync. and follow the instructions.
To add your backends, please click on the help button in the top right corner, and choose which method you
want [one-way](guides/one-way-sync.md) or [two-way](guides/two-way-sync.md) sync. and follow the instructions.
### Supported import method
### Supported import methods
Currently, the tool supports three methods to import data from backends.
@@ -133,15 +156,22 @@ Currently, the tool supports three methods to import data from backends.
- `Receive events from backends and update the database accordingly.`
> [!NOTE]
> Even if all your backends support webhooks, you should keep import task enabled. This help keep healthy relationship and pick up any missed events. For more information please check the FAQ related to webhooks limitations.
> Even if all your backends support webhooks, you should keep import task enabled. This help keep healthy relationship
> and pick up any missed events. For more information please check the [webhook guide](/guides/webhooks.md) to
> understand
> webhooks limitations.
# FAQ
Take look at this [frequently asked questions](FAQ.md) page, or the [guides](guides/) for more in-depth guides on how to setup things.
Take look at this [frequently asked questions](FAQ.md) page, or the [guides](/guides/) for more in-depth guides on how
to
configure things.
# Social channels
If you have short or quick questions, or just want to chat with other users, feel free to join this [discord server](https://discord.gg/haUXHJyj6Y), keep in mind it's solo project, as such it might take me a bit of time to reply to questions, I operate in `UTC+3` timezone.
If you have short or quick questions, or just want to chat with other users, feel free to join
this [discord server](https://discord.gg/haUXHJyj6Y), keep in mind it's solo project, as such it might take me a bit of
time to reply to questions, I operate in `UTC+3` timezone.
# Donate

View File

@@ -138,6 +138,7 @@ return (function () {
$config['http'] = [
'default' => [
'maxRetries' => (int)env('WS_HTTP_MAX_RETRIES', 3),
'sync_requests' => (bool)env('WS_HTTP_SYNC_REQUESTS', false),
'options' => [
'headers' => [
'User-Agent' => ag($config, 'name') . '/' . getAppVersion(),

View File

@@ -229,6 +229,11 @@ return (function () {
},
'mask' => true,
],
[
'key' => 'WS_HTTP_SYNC_REQUESTS',
'description' => 'Whether to send backend requests in parallel or sequentially.',
'type' => 'bool',
],
];
$validateCronExpression = function (string $value): string {

View File

@@ -44,7 +44,7 @@ use Symfony\Contracts\HttpClient\HttpClientInterface;
return (function (): array {
return [
iLogger::class => [
'class' => fn () => new Logger(name: 'logger', processors: [new LogMessageProcessor()])
'class' => fn() => new Logger(name: 'logger', processors: [new LogMessageProcessor()])
],
HttpClientInterface::class => [
@@ -63,6 +63,7 @@ return (function (): array {
iLogger::class,
],
],
RetryableHttpClient::class => [
'class' => function (HttpClientInterface $client, iLogger $logger): RetryableHttpClient {
return new RetryableHttpClient(
@@ -76,6 +77,7 @@ return (function (): array {
iLogger::class,
],
],
LogSuppressor::class => [
'class' => function (): LogSuppressor {
$suppress = [];
@@ -90,11 +92,11 @@ return (function (): array {
],
StateInterface::class => [
'class' => fn () => new StateEntity([])
'class' => fn() => new StateEntity([])
],
QueueRequests::class => [
'class' => fn () => new QueueRequests()
'class' => fn() => new QueueRequests()
],
Redis::class => [
@@ -189,16 +191,16 @@ return (function (): array {
],
UriInterface::class => [
'class' => fn () => new Uri(''),
'class' => fn() => new Uri(''),
'shared' => false,
],
InputInterface::class => [
'class' => fn (): InputInterface => new ArgvInput()
'class' => fn(): InputInterface => new ArgvInput()
],
OutputInterface::class => [
'class' => fn (): OutputInterface => new ConsoleOutput()
'class' => fn(): OutputInterface => new ConsoleOutput()
],
PDO::class => [
@@ -226,7 +228,7 @@ return (function (): array {
],
DBLayer::class => [
'class' => fn (PDO $pdo): DBLayer => new DBLayer($pdo),
'class' => fn(PDO $pdo): DBLayer => new DBLayer($pdo),
'args' => [
PDO::class,
],
@@ -290,16 +292,16 @@ return (function (): array {
],
iImport::class => [
'class' => fn (iImport $mapper): iImport => $mapper,
'class' => fn(iImport $mapper): iImport => $mapper,
'args' => [MemoryMapper::class],
],
EventDispatcherInterface::class => [
'class' => fn (): EventDispatcher => new EventDispatcher(),
'class' => fn(): EventDispatcher => new EventDispatcher(),
],
UserContext::class => [
'class' => fn (iCache $cache, iImport $mapper, iDB $db): UserContext => new UserContext(
'class' => fn(iCache $cache, iImport $mapper, iDB $db): UserContext => new UserContext(
name: 'main',
config: new ConfigFile(
file: Config::get('backends_file'),

View File

@@ -1,4 +1,4 @@
<style scoped>
<style>
.markdown-alert {
padding: 0 1em;
margin-bottom: 16px;

View File

@@ -71,7 +71,7 @@ final class Index
$data = DataUtil::fromArray($request->getQueryParams());
$es = fn (string $val) => $db->escapeIdentifier($val, true);
$es = fn(string $val) => $db->escapeIdentifier($val, true);
$filters = [];
$page = (int)$data->get('page', 1);
@@ -426,7 +426,7 @@ final class Index
[
'key' => 'watched',
'description' => 'Search using watched status.',
'type' => [ '0', '1'],
'type' => ['0', '1'],
],
[
'key' => 'via',
@@ -579,7 +579,7 @@ final class Index
'ffprobe' => $data,
'subtitles' => array_filter(
findSideCarFiles(new SplFileInfo($file)),
fn ($sideCar) => isset(Subtitle::FORMATS[getExtension($sideCar)])
fn($sideCar) => isset(Subtitle::FORMATS[getExtension($sideCar)])
)
];
}

View File

@@ -0,0 +1,56 @@
<?php
declare(strict_types=1);
namespace App\Backends\Common;
use App\Libs\Enums\Http\Method;
use App\Libs\Uri;
use Closure;
final readonly class Request
{
public readonly string $id;
/**
* Wrap client requests into object.
*
* @param Method $method The HTTP method to use.
* @param Uri|string $url The URL to send the request to.
* @param array $options The options to pass to the request.
* @param callable|null $success The callback to call on successful response.
* @param callable|null $error The callback to call on error response.
* @param array $extras An array that can contain anything. Should be rarely used.
*/
public function __construct(
public Method $method,
public Uri|string $url,
public array $options = [],
public Closure|null $success = null,
public Closure|null $error = null,
public array $extras = [],
) {
$this->id = generateUUID();
}
public function toRequest(): array
{
return [
'method' => $this->method->value,
'url' => (string)$this->url,
'options' => $this->options,
];
}
public function __debugInfo()
{
return [
'id' => $this->id,
'method' => $this->method,
'url' => $this->url,
'options' => $this->options,
'success' => $this->success,
'error' => $this->error,
];
}
}

View File

@@ -7,6 +7,7 @@ namespace App\Backends\Jellyfin\Action;
use App\Backends\Common\CommonTrait;
use App\Backends\Common\Context;
use App\Backends\Common\GuidInterface as iGuid;
use App\Backends\Common\Request;
use App\Backends\Common\Response;
use App\Backends\Jellyfin\JellyfinActionTrait;
use App\Backends\Jellyfin\JellyfinClient as JFC;
@@ -89,12 +90,12 @@ class Import
): Response {
return $this->tryResponse(
context: $context,
fn: fn () => $this->getLibraries(
fn: fn() => $this->getLibraries(
context: $context,
handle: fn (array $logContext = []) => fn (iResponse $response) => $this->handle(
handle: fn(array $logContext = []) => fn(iResponse $response) => $this->handle(
context: $context,
response: $response,
callback: fn (array $item, array $logContext = []) => $this->process(
callback: fn(array $item, array $logContext = []) => $this->process(
context: $context,
guid: $guid,
mapper: $mapper,
@@ -104,26 +105,15 @@ class Import
),
logContext: $logContext
),
error: fn (array $logContext = []) => fn (Throwable $e) => $this->logger->error(
error: fn(array $logContext = []) => fn(Throwable $e) => $this->logger->error(
message: "{action}: Exception '{error.kind}' was thrown unhandled during '{client}: {user}@{backend}' library '{library.title}' request. '{error.message}' at '{error.file}:{error.line}'.",
context: [
'action' => property_exists($this, 'action') ? $this->action : 'import',
'backend' => $context->backendName,
'client' => $context->clientName,
'user' => $context->userContext->name,
'error' => [
'kind' => $e::class,
'line' => $e->getLine(),
'message' => $e->getMessage(),
'file' => after($e->getFile(), ROOT_PATH),
],
...$logContext,
'exception' => [
'file' => $e->getFile(),
'line' => $e->getLine(),
'kind' => get_class($e),
'message' => $e->getMessage(),
],
...exception_log($e),
]
),
opts: $opts
@@ -215,22 +205,7 @@ class Import
$this->logger->error(
...lw(
message: "{action}: Request for '{client}: {user}@{backend}' libraries has failed. '{error.kind}' with message '{error.message}' at '{error.file}:{error.line}'.",
context: [
...$rContext,
'error' => [
'line' => $e->getLine(),
'kind' => $e::class,
'message' => $e->getMessage(),
'file' => after($e->getFile(), ROOT_PATH),
],
'exception' => [
'file' => $e->getFile(),
'line' => $e->getLine(),
'kind' => $e::class,
'message' => $e->getMessage(),
'trace' => $e->getTrace(),
],
],
context: [...$rContext, ...exception_log($e)],
e: $e
)
);
@@ -240,21 +215,7 @@ class Import
$this->logger->error(
...lw(
message: "{action}: Request for '{client}: {user}@{backend}' libraries returned with invalid body. '{error.message}' at '{error.file}:{error.line}'.",
context: [
...$rContext,
'error' => [
'line' => $e->getLine(),
'kind' => $e::class,
'message' => $e->getMessage(),
'file' => after($e->getFile(), ROOT_PATH),
],
'exception' => [
'file' => $e->getFile(),
'line' => $e->getLine(),
'message' => $e->getMessage(),
'trace' => $e->getTrace(),
],
],
context: [...$rContext, ...exception_log($e)],
e: $e
)
);
@@ -264,22 +225,7 @@ class Import
$this->logger->error(
...lw(
message: "{action}: Exception '{error.kind}' was thrown unhandled during '{client}: {user}@{backend}' request for libraries. '{error.message}' at '{error.file}:{error.line}'.",
context: [
...$rContext,
'error' => [
'kind' => $e::class,
'line' => $e->getLine(),
'message' => $e->getMessage(),
'file' => after($e->getFile(), ROOT_PATH),
],
'exception' => [
'file' => $e->getFile(),
'line' => $e->getLine(),
'kind' => get_class($e),
'message' => $e->getMessage(),
'trace' => $e->getTrace(),
],
],
context: [...$rContext, ...exception_log($e)],
e: $e
)
);
@@ -288,7 +234,7 @@ class Import
}
if (null !== ($ignoreIds = ag($context->options, 'ignore', null))) {
$ignoreIds = array_map(fn ($v) => trim($v), explode(',', (string)$ignoreIds));
$ignoreIds = array_map(fn($v) => trim($v), explode(',', (string)$ignoreIds));
}
$limitLibraryId = ag($opts, Options::ONLY_LIBRARY_ID, null);
@@ -354,22 +300,7 @@ class Import
$this->logger->error(
...lw(
message: "{action}: Request for '{client}: {user}@{backend}' - '{library.title}' items count failed. '{error.kind}' with message '{error.message}' at '{error.file}:{error.line}'.",
context: [
'error' => [
'line' => $e->getLine(),
'kind' => $e::class,
'message' => $e->getMessage(),
'file' => after($e->getFile(), ROOT_PATH),
],
'exception' => [
'file' => $e->getFile(),
'line' => $e->getLine(),
'kind' => get_class($e),
'message' => $e->getMessage(),
'trace' => $e->getTrace(),
],
...$logContext,
],
context: [...$logContext, ...exception_log($e)],
e: $e
)
);
@@ -378,22 +309,7 @@ class Import
$this->logger->error(
...lw(
message: "{action}: Exception '{error.kind}' was thrown unhandled during '{client}: {user}@{backend}' - '{library.title}' items count request. '{error.message}' at '{error.file}:{error.line}'.",
context: [
'error' => [
'kind' => $e::class,
'line' => $e->getLine(),
'message' => $e->getMessage(),
'file' => after($e->getFile(), ROOT_PATH),
],
'exception' => [
'file' => $e->getFile(),
'line' => $e->getLine(),
'kind' => get_class($e),
'message' => $e->getMessage(),
'trace' => $e->getTrace(),
],
...$logContext,
],
context: [...$logContext, ...exception_log($e)],
e: $e
)
);
@@ -440,22 +356,7 @@ class Import
$this->logger->error(
...lw(
message: "{action}: Request for '{client}: {user}@{backend}' - '{library.title}' total items has failed. '{error.kind}' '{error.message}' at '{error.file}:{error.line}'.",
context: [
'error' => [
'kind' => $e::class,
'line' => $e->getLine(),
'message' => $e->getMessage(),
'file' => after($e->getFile(), ROOT_PATH),
],
'exception' => [
'file' => $e->getFile(),
'line' => $e->getLine(),
'kind' => get_class($e),
'message' => $e->getMessage(),
'trace' => $e->getTrace(),
],
...$logContext,
],
context: [...$logContext, ...exception_log($e)],
e: $e
),
);
@@ -464,22 +365,7 @@ class Import
$this->logger->error(
...lw(
message: "Exception '{error.kind}' was thrown unhandled during '{client}: {user}@{backend}' requests for items count. '{error.message}' at '{error.file}:{error.line}'.",
context: [
'error' => [
'kind' => $e::class,
'line' => $e->getLine(),
'message' => $e->getMessage(),
'file' => after($e->getFile(), ROOT_PATH),
],
'exception' => [
'file' => $e->getFile(),
'line' => $e->getLine(),
'kind' => get_class($e),
'message' => $e->getMessage(),
'trace' => $e->getTrace(),
],
...$logContext,
],
context: [...$logContext, ...exception_log($e)],
e: $e
)
);
@@ -537,60 +423,19 @@ class Import
);
try {
$requests[] = $this->http->request(
$requests[] = new Request(
method: Method::GET,
url: (string)$url,
options: array_replace_recursive($context->backendHeaders, [
'user_data' => [
'ok' => $handle($logContext),
'error' => $error($logContext),
]
])
url: $url,
options: $context->backendHeaders,
success: $handle($logContext),
error: $error($logContext),
extras: ['logContext' => $logContext, iHttp::class => $this->http],
);
} catch (iException $e) {
$this->logger->error(
...lw(
message: "{action}: Request for '{client}: {user}@{backend}' - '{library.title}' series external ids has failed. '{error.kind}' with message '{error.message}' at '{error.file}:{error.line}'.",
context: [
'error' => [
'line' => $e->getLine(),
'kind' => $e::class,
'message' => $e->getMessage(),
'file' => after($e->getFile(), ROOT_PATH),
],
'exception' => [
'file' => $e->getFile(),
'line' => $e->getLine(),
'kind' => get_class($e),
'message' => $e->getMessage(),
'trace' => $e->getTrace(),
],
...$logContext,
],
e: $e
)
);
continue;
} catch (Throwable $e) {
$this->logger->error(
...lw(
message: "{action}: Exception '{error.kind}' was thrown unhandled during '{client}: {user}@{backend}' '{library.title}' series external ids request. '{error.message}' at '{error.file}:{error.line}'.",
context: [
'error' => [
'kind' => $e::class,
'line' => $e->getLine(),
'message' => $e->getMessage(),
'file' => after($e->getFile(), ROOT_PATH),
],
'exception' => [
'file' => $e->getFile(),
'line' => $e->getLine(),
'kind' => get_class($e),
'message' => $e->getMessage(),
'trace' => $e->getTrace(),
],
...$logContext,
],
context: [...$logContext, ...exception_log($e)],
e: $e
),
);
@@ -676,60 +521,19 @@ class Import
context: $logContext,
);
$requests[] = $this->http->request(
$requests[] = new Request(
method: Method::GET,
url: (string)$url,
options: array_replace_recursive($context->backendHeaders, [
'user_data' => [
'ok' => $handle($logContext),
'error' => $error($logContext),
]
])
url: $url,
options: $context->backendHeaders,
success: $handle($logContext),
error: $error($logContext),
extras: ['logContext' => $logContext, iHttp::class => $this->http],
);
} catch (iException $e) {
$this->logger->error(
...lw(
message: "{action}: Request for '{client}: {user}@{backend}' '{library.title} {segment.number}/{segment.of}' content list has failed. {error.kind}' with message '{error.message}' at '{error.file}:{error.line}'.",
context: [
'error' => [
'line' => $e->getLine(),
'kind' => $e::class,
'message' => $e->getMessage(),
'file' => after($e->getFile(), ROOT_PATH),
],
...$logContext,
'exception' => [
'file' => $e->getFile(),
'line' => $e->getLine(),
'kind' => get_class($e),
'message' => $e->getMessage(),
'trace' => $e->getTrace(),
],
],
e: $e
)
);
continue;
} catch (Throwable $e) {
$this->logger->error(
...lw(
message: "{action}: Exception '{error.kind}' was thrown unhandled during '{client}: {user}@{backend}' '{library.title} {segment.number}/{segment.of}' content list request. '{error.message}' at '{error.file}:{error.line}'.",
context: [
'error' => [
'kind' => $e::class,
'line' => $e->getLine(),
'message' => $e->getMessage(),
'file' => after($e->getFile(), ROOT_PATH),
],
...$logContext,
'exception' => [
'file' => $e->getFile(),
'line' => $e->getLine(),
'kind' => get_class($e),
'message' => $e->getMessage(),
'trace' => $e->getTrace(),
],
],
context: [...$logContext, ...exception_log($e)],
e: $e
)
);
@@ -866,23 +670,7 @@ class Import
$this->logger->error(
...lw(
message: "{action}: Exception '{error.kind}' was thrown unhandled during '{client}: {user}@{backend}' parsing '{library.title} {segment.number}/{segment.of}' item response. '{error.message}' at '{error.file}:{error.line}'.",
context: [
'error' => [
'kind' => $e::class,
'line' => $e->getLine(),
'message' => $e->getMessage(),
'file' => after($e->getFile(), ROOT_PATH),
],
'entity' => $entity,
'exception' => [
'kind' => $e::class,
'line' => $e->getLine(),
'trace' => $e->getTrace(),
'message' => $e->getMessage(),
'file' => after($e->getFile(), ROOT_PATH),
],
...$logContext,
],
context: ['entity' => $entity, ...$logContext, ...exception_log($e)],
e: $e
)
);
@@ -892,22 +680,7 @@ class Import
$this->logger->error(
...lw(
message: "{action}: Exception '{error.kind}' was thrown unhandled during '{client}: {user}@{backend}' parsing of '{library.title} {segment.number}/{segment.of}' response. '{error.message}' at '{error.file}:{error.line}'.",
context: [
'error' => [
'kind' => $e::class,
'line' => $e->getLine(),
'message' => $e->getMessage(),
'file' => after($e->getFile(), ROOT_PATH),
],
'exception' => [
'line' => $e->getLine(),
'kind' => get_class($e),
'message' => $e->getMessage(),
'trace' => $e->getTrace(),
'file' => after($e->getFile(), ROOT_PATH),
],
...$logContext,
],
context: [...$logContext, ...exception_log($e)],
e: $e
)
);
@@ -1118,22 +891,7 @@ class Import
$this->logger->error(
...lw(
message: "{action}: Exception '{error.kind}' occurred during '{client}: {user}@{backend}' - '{library.title}' - '{item.id}: {item.title}' entity creation. '{error.message}' at '{error.file}:{error.line}'.",
context: [
'error' => [
'kind' => $e::class,
'line' => $e->getLine(),
'message' => $e->getMessage(),
'file' => after($e->getFile(), ROOT_PATH),
],
...$logContext,
'exception' => [
'line' => $e->getLine(),
'kind' => get_class($e),
'message' => $e->getMessage(),
'trace' => $e->getTrace(),
'file' => after($e->getFile(), ROOT_PATH),
],
],
context: [...$logContext, ...exception_log($e)],
e: $e
)
);
@@ -1169,22 +927,7 @@ class Import
$this->logger->error(
...lw(
message: "{action}: Exception '{error.kind}' was thrown unhandled during '{client}: {user}@{backend}' - '{library.title}' - '{item.title}' {action}. '{error.message}' at '{error.file}:{error.line}'.",
context: [
'error' => [
'kind' => $e::class,
'line' => $e->getLine(),
'message' => $e->getMessage(),
'file' => after($e->getFile(), ROOT_PATH),
],
...$logContext,
'exception' => [
'file' => $e->getFile(),
'line' => $e->getLine(),
'kind' => get_class($e),
'message' => $e->getMessage(),
'trace' => $e->getTrace(),
],
],
context: [...$logContext, ...exception_log($e)],
e: $e
)
);

View File

@@ -7,6 +7,7 @@ namespace App\Backends\Plex\Action;
use App\Backends\Common\CommonTrait;
use App\Backends\Common\Context;
use App\Backends\Common\GuidInterface as iGuid;
use App\Backends\Common\Request;
use App\Backends\Common\Response;
use App\Backends\Plex\PlexActionTrait;
use App\Backends\Plex\PlexClient;
@@ -70,12 +71,12 @@ class Import
): Response {
return $this->tryResponse(
context: $context,
fn: fn () => $this->getLibraries(
fn: fn() => $this->getLibraries(
context: $context,
handle: fn (array $logContext = []) => fn (iResponse $response) => $this->handle(
handle: fn(array $logContext = []) => fn(iResponse $response) => $this->handle(
context: $context,
response: $response,
callback: fn (array $item, array $logContext = []) => $this->process(
callback: fn(array $item, array $logContext = []) => $this->process(
context: $context,
guid: $guid,
mapper: $mapper,
@@ -85,26 +86,15 @@ class Import
),
logContext: $logContext
),
error: fn (array $logContext = []) => fn (Throwable $e) => $this->logger->error(
error: fn(array $logContext = []) => fn(Throwable $e) => $this->logger->error(
message: "{action}: Exception '{error.kind}' was thrown unhandled during '{client}: {user}@{backend}' library '{library.title}' request. '{error.message}' at '{error.file}:{error.line}'.",
context: [
'action' => $this->action,
'backend' => $context->backendName,
'client' => $context->clientName,
'user' => $context->userContext->name,
'error' => [
'kind' => $e::class,
'line' => $e->getLine(),
'message' => $e->getMessage(),
'file' => after($e->getFile(), ROOT_PATH),
],
...$logContext,
'exception' => [
'file' => $e->getFile(),
'line' => $e->getLine(),
'kind' => get_class($e),
'message' => $e->getMessage(),
],
...exception_log($e)
]
),
opts: $opts,
@@ -195,22 +185,7 @@ class Import
$this->logger->error(
...lw(
message: "{action}: Request for '{client}: {user}@{backend}' libraries has failed. '{error.kind}' with message '{error.message}' at '{error.file}:{error.line}'.",
context: [
...$rContext,
'error' => [
'line' => $e->getLine(),
'kind' => $e::class,
'message' => $e->getMessage(),
'file' => after($e->getFile(), ROOT_PATH),
],
'exception' => [
'file' => $e->getFile(),
'line' => $e->getLine(),
'kind' => $e::class,
'message' => $e->getMessage(),
'trace' => $e->getTrace(),
],
],
context: [...$rContext, ...exception_log($e)],
e: $e
)
);
@@ -220,21 +195,7 @@ class Import
$this->logger->error(
...lw(
message: "{action}: Request for '{client}: {user}@{backend}' libraries returned with invalid body. '{error.kind}' with message '{error.message}' at '{error.file}:{error.line}'.",
context: [
...$rContext,
'error' => [
'line' => $e->getLine(),
'kind' => $e::class,
'message' => $e->getMessage(),
'file' => after($e->getFile(), ROOT_PATH),
],
'exception' => [
'file' => $e->getFile(),
'line' => $e->getLine(),
'message' => $e->getMessage(),
'trace' => $e->getTrace(),
],
],
context: [...$rContext, ...exception_log($e)],
e: $e
)
);
@@ -244,22 +205,7 @@ class Import
$this->logger->error(
...lw(
message: "{action}: Exception '{error.kind}' was thrown unhandled during '{client}: {user}@{backend}' request for libraries. {error.message} at '{error.file}:{error.line}'.",
context: [
...$rContext,
'error' => [
'kind' => $e::class,
'line' => $e->getLine(),
'message' => $e->getMessage(),
'file' => after($e->getFile(), ROOT_PATH),
],
'exception' => [
'file' => $e->getFile(),
'line' => $e->getLine(),
'kind' => get_class($e),
'message' => $e->getMessage(),
'trace' => $e->getTrace(),
],
],
context: [...$rContext, ...exception_log($e)],
e: $e
)
);
@@ -268,7 +214,7 @@ class Import
}
if (null !== ($ignoreIds = ag($context->options, 'ignore', null))) {
$ignoreIds = array_map(fn ($v) => (int)trim($v), explode(',', (string)$ignoreIds));
$ignoreIds = array_map(fn($v) => (int)trim($v), explode(',', (string)$ignoreIds));
}
$limitLibraryId = ag($opts, Options::ONLY_LIBRARY_ID, null);
@@ -382,23 +328,7 @@ class Import
$this->logger->error(
...lw(
message: "{action}: Request for '{client}: {user}@{backend}' - '{library.title}' items count has failed. '{error.kind}' with message '{error.message}' at '{error.file}:{error.line}'.",
context: [
...$rContext,
'error' => [
'line' => $e->getLine(),
'kind' => $e::class,
'message' => $e->getMessage(),
'file' => after($e->getFile(), ROOT_PATH),
],
'exception' => [
'file' => $e->getFile(),
'line' => $e->getLine(),
'kind' => get_class($e),
'message' => $e->getMessage(),
'trace' => $e->getTrace(),
],
...$logContext,
],
context: [...$rContext, ...exception_log($e), ...$logContext],
e: $e
)
);
@@ -407,23 +337,7 @@ class Import
$this->logger->error(
...lw(
message: "{action}: Exception '{error.kind}' was thrown unhandled during '{client}: {user}@{backend}' request for libraries. {error.message} at '{error.file}:{error.line}'.",
context: [
...$rContext,
'error' => [
'kind' => $e::class,
'line' => $e->getLine(),
'message' => $e->getMessage(),
'file' => after($e->getFile(), ROOT_PATH),
],
'exception' => [
'file' => $e->getFile(),
'line' => $e->getLine(),
'kind' => get_class($e),
'message' => $e->getMessage(),
'trace' => $e->getTrace(),
],
...$logContext,
],
context: [...$rContext, ...exception_log($e), ...$logContext,],
e: $e
)
);
@@ -471,22 +385,7 @@ class Import
$this->logger->error(
...lw(
message: "{action}: Request for '{client}: {user}@{backend}' - '{library.title}' total items has failed. '{error.kind}' '{error.message}' at '{error.file}:{error.line}'.",
context: [
...$logContext,
'error' => [
'kind' => $e::class,
'line' => $e->getLine(),
'message' => $e->getMessage(),
'file' => after($e->getFile(), ROOT_PATH),
],
'exception' => [
'file' => $e->getFile(),
'line' => $e->getLine(),
'kind' => get_class($e),
'message' => $e->getMessage(),
'trace' => $e->getTrace(),
],
],
context: [...$logContext, ...exception_log($e)],
e: $e
)
);
@@ -495,22 +394,7 @@ class Import
$this->logger->error(
...lw(
message: "{action}: Exception '{error.kind}' was thrown unhandled during '{client}: {user}@{backend}' request for items count. {error.message} at '{error.file}:{error.line}'.",
context: [
...$logContext,
'error' => [
'kind' => $e::class,
'line' => $e->getLine(),
'message' => $e->getMessage(),
'file' => after($e->getFile(), ROOT_PATH),
],
'exception' => [
'file' => $e->getFile(),
'line' => $e->getLine(),
'kind' => get_class($e),
'message' => $e->getMessage(),
'trace' => $e->getTrace(),
],
],
context: [...$logContext, ...exception_log($e)],
e: $e
)
);
@@ -586,64 +470,24 @@ class Import
context: $logContext,
);
$requests[] = $this->http->request(
$requests[] = new Request(
method: Method::GET,
url: (string)$url,
url: $url,
options: array_replace_recursive($context->backendHeaders, [
'headers' => [
'X-Plex-Container-Size' => $segmentSize,
'X-Plex-Container-Start' => $i < 1 ? 0 : ($segmentSize * $i),
],
'user_data' => [
'ok' => $handle($logContext),
'error' => $error($logContext),
]
])
]),
success: $handle($logContext),
error: $error($logContext),
extras: ['logContext' => $logContext, iHttp::class => $this->http]
);
} catch (ExceptionInterface $e) {
$this->logger->error(
...lw(
message: "{action}: Request for '{client}: {user}@{backend}' - '{library.title} {segment.number}/{segment.of}' series external ids has failed. '{error.kind}' with message '{error.message}' at '{error.file}:{error.line}'.",
context: [
...$logContext,
'error' => [
'line' => $e->getLine(),
'kind' => $e::class,
'message' => $e->getMessage(),
'file' => after($e->getFile(), ROOT_PATH),
],
'exception' => [
'file' => $e->getFile(),
'line' => $e->getLine(),
'kind' => get_class($e),
'message' => $e->getMessage(),
'trace' => $e->getTrace(),
],
],
e: $e
)
);
continue;
} catch (Throwable $e) {
$this->logger->error(
...lw(
message: "{action}: Exception '{error.kind}' was thrown unhandled during '{client}: {user}@{backend}' '{library.title} {segment.number}/{segment.of}' series external ids request. {error.message} at '{error.file}:{error.line}'.",
context: [
...$logContext,
'error' => [
'kind' => $e::class,
'line' => $e->getLine(),
'message' => $e->getMessage(),
'file' => after($e->getFile(), ROOT_PATH),
],
'exception' => [
'file' => $e->getFile(),
'line' => $e->getLine(),
'kind' => get_class($e),
'message' => $e->getMessage(),
'trace' => $e->getTrace(),
],
],
context: [...$logContext, ...exception_log($e)],
e: $e
)
);
@@ -730,64 +574,24 @@ class Import
context: $logContext,
);
$requests[] = $this->http->request(
$requests[] = new Request(
method: Method::GET,
url: (string)$url,
url: $url,
options: array_replace_recursive($context->backendHeaders, [
'headers' => [
'X-Plex-Container-Size' => $segmentSize,
'X-Plex-Container-Start' => $i < 1 ? 0 : ($segmentSize * $i),
],
'user_data' => [
'ok' => $handle($logContext),
'error' => $error($logContext),
]
]),
success: $handle($logContext),
error: $error($logContext),
extras: ['logContext' => $logContext, iHttp::class => $this->http]
);
} catch (ExceptionInterface $e) {
$this->logger->error(
...lw(
message: "{action}: Request for '{client}: {user}@{backend}' - '{library.title} {segment.number}/{segment.of}' content list has failed. {error.kind}' with message '{error.message}' at '{error.file}:{error.line}'.",
context: [
...$logContext,
'error' => [
'line' => $e->getLine(),
'kind' => $e::class,
'message' => $e->getMessage(),
'file' => after($e->getFile(), ROOT_PATH),
],
'exception' => [
'file' => $e->getFile(),
'line' => $e->getLine(),
'kind' => get_class($e),
'message' => $e->getMessage(),
'trace' => $e->getTrace(),
],
],
e: $e
)
);
continue;
} catch (Throwable $e) {
$this->logger->error(
...lw(
message: "{action}: Exception '{error.kind}' was thrown unhandled during '{client}: {user}@{backend}' - '{library.title} {segment.number}/{segment.of}' content list request. {error.message} at '{error.file}:{error.line}'.",
context: [
...$logContext,
'error' => [
'kind' => $e::class,
'line' => $e->getLine(),
'message' => $e->getMessage(),
'file' => after($e->getFile(), ROOT_PATH),
],
'exception' => [
'file' => $e->getFile(),
'line' => $e->getLine(),
'kind' => get_class($e),
'message' => $e->getMessage(),
'trace' => $e->getTrace(),
],
],
context: [...$logContext, ...exception_log($e)],
e: $e
)
);
@@ -875,23 +679,7 @@ class Import
$this->logger->error(
...lw(
message: "{action}: Exception '{error.kind}' was thrown unhandled during '{client}: {user}@{backend}' parsing '{library.title} {segment.number}/{segment.of}' item response. {error.message} at '{error.file}:{error.line}'.",
context: [
...$logContext,
'error' => [
'kind' => $e::class,
'line' => $e->getLine(),
'message' => $e->getMessage(),
'file' => after($e->getFile(), ROOT_PATH),
],
'entity' => $entity,
'exception' => [
'kind' => $e::class,
'line' => $e->getLine(),
'trace' => $e->getTrace(),
'message' => $e->getMessage(),
'file' => after($e->getFile(), ROOT_PATH),
],
],
context: [...$logContext, ...exception_log($e), 'entity' => $entity],
e: $e
)
);
@@ -901,22 +689,7 @@ class Import
$this->logger->error(
...lw(
message: "{action}: Exception '{error.kind}' was thrown unhandled during '{client}: {user}@{backend}' parsing of '{library.title} {segment.number}/{segment.of}' response. {error.message} at '{error.file}:{error.line}'.",
context: [
...$logContext,
'error' => [
'kind' => $e::class,
'line' => $e->getLine(),
'message' => $e->getMessage(),
'file' => after($e->getFile(), ROOT_PATH),
],
'exception' => [
'line' => $e->getLine(),
'kind' => get_class($e),
'message' => $e->getMessage(),
'trace' => $e->getTrace(),
'file' => after($e->getFile(), ROOT_PATH),
],
],
context: [...$logContext, ...exception_log($e)],
e: $e
)
);
@@ -1055,10 +828,7 @@ class Import
default => throw new InvalidArgumentException(
r(
text: "{action}: Unexpected content type '{type}' was received from '{client}: {user}@{backend}'.",
context: [
...$logContext,
'type' => $type
]
context: [...$logContext, 'type' => $type]
)
),
},
@@ -1170,22 +940,7 @@ class Import
$this->logger->error(
...lw(
message: "{action}: Exception '{error.kind}' was thrown unhandled during '{client}: {user}@{backend}' - '{library.title}' - '{item.title}' item process. {error.message} at '{error.file}:{error.line}'.",
context: [
...$logContext,
'error' => [
'kind' => $e::class,
'line' => $e->getLine(),
'message' => $e->getMessage(),
'file' => after($e->getFile(), ROOT_PATH),
],
'exception' => [
'file' => $e->getFile(),
'line' => $e->getLine(),
'kind' => get_class($e),
'message' => $e->getMessage(),
'trace' => $e->getTrace(),
],
],
context: [...$logContext, ...exception_log($e)],
e: $e
)
);

View File

@@ -4,9 +4,12 @@ declare(strict_types=1);
namespace App\Commands\State;
use App\Backends\Common\Request;
use App\Command;
use App\Libs\Attributes\DI\Inject;
use App\Libs\Attributes\Route\Cli;
use App\Libs\Config;
use App\Libs\Extends\RetryableHttpClient;
use App\Libs\Extends\StreamLogHandler;
use App\Libs\LogSuppressor;
use App\Libs\Mappers\Import\DirectMapper;
@@ -20,8 +23,7 @@ use Psr\Log\LoggerInterface as iLogger;
use Symfony\Component\Console\Input\InputInterface as iInput;
use Symfony\Component\Console\Input\InputOption;
use Symfony\Component\Console\Output\OutputInterface as iOutput;
use Symfony\Contracts\HttpClient\ResponseInterface;
use Throwable;
use Symfony\Contracts\HttpClient\HttpClientInterface as iHttp;
/**
* Class BackupCommand
@@ -44,7 +46,9 @@ class BackupCommand extends Command
public function __construct(
private DirectMapper $mapper,
private iLogger $logger,
private LogSuppressor $suppressor
private LogSuppressor $suppressor,
#[Inject(RetryableHttpClient::class)]
private iHttp $http,
) {
set_time_limit(0);
ini_set('memory_limit', '-1');
@@ -67,6 +71,18 @@ class BackupCommand extends Command
'If this flag is used, backups will not be removed by system:purge task.'
)
->addOption('dry-run', null, InputOption::VALUE_NONE, 'No actions will be committed.')
->addOption(
'sync-requests',
null,
InputOption::VALUE_NONE,
'Send one request at a time instead of all at once. note: Slower but more reliable.'
)
->addOption(
'async-requests',
null,
InputOption::VALUE_NONE,
'Send all requests at once. note: Faster but less reliable. Default.'
)
->addOption('timeout', null, InputOption::VALUE_REQUIRED, 'Set request timeout in seconds.')
->addOption(
'select-backend',
@@ -326,7 +342,7 @@ class BackupCommand extends Command
]);
}
/** @var array<array-key,ResponseInterface> $queue */
/** @var array<array-key,Request> $queue */
$queue = [];
foreach ($list as $name => &$backend) {
@@ -399,30 +415,27 @@ class BackupCommand extends Command
unset($backend);
if (false === ($syncRequests = $input->getOption('sync-requests'))) {
$syncRequests = (bool)Config::get('http.default.sync_requests', false);
}
if (true === $input->getOption('async-requests')) {
$syncRequests = false;
}
$start = microtime(true);
$this->logger->notice("SYSTEM: Waiting on '{total}' requests for '{user}: {backends}' backends.", [
$this->logger->notice("SYSTEM: Waiting on '{total}' {sync}requests for '{user}: {backends}' backends.", [
'user' => $userContext->name,
'total' => number_format(count($queue)),
'backends' => implode(', ', array_keys($list)),
'sync' => $syncRequests ? 'sync ' : '',
'memory' => [
'now' => getMemoryUsage(),
'peak' => getPeakMemoryUsage(),
],
]);
foreach ($queue as $_key => $response) {
$requestData = $response->getInfo('user_data');
try {
$requestData['ok']($response);
} catch (Throwable $e) {
$requestData['error']($e);
}
$queue[$_key] = null;
gc_collect_cycles();
}
send_requests(requests: $queue, client: $this->http, sync: $syncRequests, logger: $this->logger);
foreach ($list as $b => $backend) {
if (null === ($backend['fp'] ?? null)) {

View File

@@ -11,6 +11,7 @@ use App\Libs\Attributes\Route\Cli;
use App\Libs\Config;
use App\Libs\Database\DatabaseInterface;
use App\Libs\Entity\StateInterface as iState;
use App\Libs\Extends\RetryableHttpClient;
use App\Libs\Extends\StreamLogHandler;
use App\Libs\LogSuppressor;
use App\Libs\Mappers\Import\DirectMapper;
@@ -26,6 +27,7 @@ use Psr\Log\LoggerInterface as iLogger;
use Symfony\Component\Console\Input\InputInterface as iInput;
use Symfony\Component\Console\Input\InputOption;
use Symfony\Component\Console\Output\OutputInterface as iOutput;
use Symfony\Contracts\HttpClient\HttpClientInterface as iHttp;
use Throwable;
/**
@@ -55,6 +57,8 @@ class ExportCommand extends Command
private QueueRequests $queue,
private iLogger $logger,
private LogSuppressor $suppressor,
#[Inject(RetryableHttpClient::class)]
private iHttp $http,
) {
set_time_limit(0);
ini_set('memory_limit', '-1');
@@ -71,6 +75,18 @@ class ExportCommand extends Command
->setDescription('Export play state to backends.')
->addOption('force-full', 'f', InputOption::VALUE_NONE, 'Force full export. Ignore last export date.')
->addOption('dry-run', null, InputOption::VALUE_NONE, 'Do not commit changes to backends.')
->addOption(
'sync-requests',
null,
InputOption::VALUE_NONE,
'Send one request at a time instead of all at once. note: Slower but more reliable.'
)
->addOption(
'async-requests',
null,
InputOption::VALUE_NONE,
'Send all requests at once. note: Faster but less reliable. Default.'
)
->addOption('timeout', null, InputOption::VALUE_REQUIRED, 'Set request timeout in seconds.')
->addOption('user', 'u', InputOption::VALUE_REQUIRED, 'Export to this specific user. Default all users.')
->addOption(
@@ -147,6 +163,14 @@ class ExportCommand extends Command
}
}
if (false === ($syncRequests = $input->getOption('sync-requests'))) {
$syncRequests = (bool)Config::get('http.default.sync_requests', false);
}
if (true === $input->getOption('async-requests')) {
$syncRequests = false;
}
$selected = $input->getOption('select-backend');
$isCustom = !empty($selected) && count($selected) > 0;
$supported = Config::get('supported', []);
@@ -433,7 +457,13 @@ class ExportCommand extends Command
}
if (count($export) >= 1) {
$this->export($userContext, $export, $input->getOption('dry-run'), $input->getOption('force-full'));
$this->export(
$userContext,
$export,
$input->getOption('dry-run'),
$input->getOption('force-full'),
$syncRequests
);
}
$total = count($this->queue->getQueue());
@@ -587,6 +617,7 @@ class ExportCommand extends Command
array $backends,
bool $inDryMode,
bool $isFull,
bool $syncRequests = false
): void {
$this->logger->notice("Export mode started for '{user}@{backends}'.", [
'user' => $userContext->name,
@@ -663,19 +694,13 @@ class ExportCommand extends Command
}
$start = microtime(true);
$this->logger->notice("SYSTEM: Sending '{total}' play state comparison requests for '{user}'.", [
$this->logger->notice("SYSTEM: Sending '{total}' play state comparison {sync}requests for '{user}'.", [
'total' => count($requests),
'user' => $userContext->name,
'sync' => true === $syncRequests ? 'sync ' : '',
]);
foreach ($requests as $response) {
$requestData = $response->getInfo('user_data');
try {
$requestData['ok']($response);
} catch (Throwable $e) {
$requestData['error']($e);
}
}
send_requests(requests: $requests, client: $this->http, sync: $syncRequests, logger: $this->logger);
$this->logger->notice("Export mode ended for '{user}: {backends}' in '{duration}'s.", [
'user' => $userContext->name,

View File

@@ -4,6 +4,7 @@ declare(strict_types=1);
namespace App\Commands\State;
use App\Backends\Common\Request;
use App\Command;
use App\Libs\Attributes\DI\Inject;
use App\Libs\Attributes\Route\Cli;
@@ -11,6 +12,7 @@ use App\Libs\Config;
use App\Libs\Container;
use App\Libs\Database\DatabaseInterface;
use App\Libs\Entity\StateInterface as iState;
use App\Libs\Extends\RetryableHttpClient;
use App\Libs\Extends\StreamLogHandler;
use App\Libs\LogSuppressor;
use App\Libs\Mappers\Import\DirectMapper;
@@ -28,8 +30,7 @@ use Symfony\Component\Console\Helper\TableSeparator;
use Symfony\Component\Console\Input\InputInterface;
use Symfony\Component\Console\Input\InputOption;
use Symfony\Component\Console\Output\OutputInterface;
use Symfony\Contracts\HttpClient\ResponseInterface;
use Throwable;
use Symfony\Contracts\HttpClient\HttpClientInterface as iHttp;
/**
* Class ImportCommand
@@ -55,7 +56,9 @@ class ImportCommand extends Command
#[Inject(DirectMapper::class)]
private iImport $mapper,
private iLogger $logger,
private LogSuppressor $suppressor
private LogSuppressor $suppressor,
#[Inject(RetryableHttpClient::class)]
private iHttp $http,
) {
set_time_limit(0);
ini_set('memory_limit', '-1');
@@ -72,6 +75,18 @@ class ImportCommand extends Command
->setDescription('Import play state and metadata from backends.')
->addOption('force-full', 'f', InputOption::VALUE_NONE, 'Force full import. Ignore last sync date.')
->addOption('dry-run', null, InputOption::VALUE_NONE, 'Do not commit any changes.')
->addOption(
'sync-requests',
null,
InputOption::VALUE_NONE,
'Send one request at a time instead of all at once. note: Slower but more reliable.'
)
->addOption(
'async-requests',
null,
InputOption::VALUE_NONE,
'Send all requests at once. note: Faster but less reliable. Default.'
)
->addOption('timeout', null, InputOption::VALUE_REQUIRED, 'Set request timeout in seconds.')
->addOption('user', 'u', InputOption::VALUE_REQUIRED, 'Select sub user. Default all users.')
->addOption(
@@ -168,6 +183,14 @@ class ImportCommand extends Command
$mapperOpts[Options::MAPPER_ALWAYS_UPDATE_META] = true;
}
if (false === ($syncRequests = $input->getOption('sync-requests'))) {
$syncRequests = (bool)Config::get('http.default.sync_requests', false);
}
if (true === $input->getOption('async-requests')) {
$syncRequests = false;
}
$this->mapper->setOptions($mapperOpts);
$users = getUsersContext(mapper: $this->mapper, logger: $this->logger, opts: [
@@ -272,7 +295,7 @@ class ImportCommand extends Command
continue;
}
/** @var array<array-key,ResponseInterface> $queue */
/** @var array<array-key,Request> $queue */
$queue = [];
$this->logger->notice(
@@ -369,28 +392,17 @@ class ImportCommand extends Command
unset($backend);
$start = microtime(true);
$this->logger->notice("SYSTEM: Waiting on '{total}' requests for '{user}' backends.", [
$this->logger->notice("SYSTEM: Waiting on '{total}' {sync}requests for '{user}' backends.", [
'user' => $userContext->name,
'total' => number_format(count($queue)),
'sync' => $syncRequests ? 'sync ' : '',
'memory' => [
'now' => getMemoryUsage(),
'peak' => getPeakMemoryUsage(),
],
]);
foreach ($queue as $_key => $response) {
$requestData = $response->getInfo('user_data');
try {
$requestData['ok']($response);
} catch (Throwable $e) {
$requestData['error']($e);
}
$queue[$_key] = null;
gc_collect_cycles();
}
send_requests(requests: $queue, client: $this->http, sync: $syncRequests, logger: $this->logger);
$this->logger->notice(
"SYSTEM: Completed '{total}' requests in '{duration}'s for '{user}' backends. Parsed '{responses.size}' of data.",
@@ -408,7 +420,7 @@ class ImportCommand extends Command
]
);
$queue = $requestData = null;
$queue = null;
$total = count($userContext->mapper);

View File

@@ -1361,3 +1361,31 @@ if (!function_exists('getUserContext')) {
}
}
if (!function_exists('exception_log')) {
/**
* Add standard way to access exception in log context.
*
* @param Throwable $e The exception.
*
* @return array{error: array,excpetion: array} The exception formatted in standard way.
*/
function exception_log(Throwable $e): array
{
return [
'error' => [
'kind' => $e::class,
'line' => $e->getLine(),
'message' => $e->getMessage(),
'file' => after($e->getFile(), ROOT_PATH),
],
'exception' => [
'file' => $e->getFile(),
'line' => $e->getLine(),
'kind' => $e::class,
'message' => $e->getMessage(),
'trace' => $e->getTrace(),
],
];
}
}

View File

@@ -7,6 +7,7 @@ declare(strict_types=1);
use App\Backends\Common\Cache as BackendCache;
use App\Backends\Common\ClientInterface as iClient;
use App\Backends\Common\Context;
use App\Backends\Common\Request;
use App\Libs\Attributes\Route\Cli;
use App\Libs\Attributes\Route\Route;
use App\Libs\Attributes\Scanner\Attributes as AttributesScanner;
@@ -35,6 +36,7 @@ use Psr\SimpleCache\CacheInterface as iCache;
use Symfony\Component\Process\Process;
use Symfony\Component\Yaml\Yaml;
use Symfony\Contracts\HttpClient\Exception\TransportExceptionInterface;
use Symfony\Contracts\HttpClient\HttpClientInterface as iHttp;
use Symfony\Contracts\HttpClient\ResponseStreamInterface;
if (!function_exists('env')) {
@@ -1382,3 +1384,81 @@ if (!function_exists('deepArrayMerge')) {
return $result;
}
}
if (!function_exists('send_requests')) {
/**
* Send requests.
*
* @param array<Request> $requests The requests to send.
* @param iHttp $client The HTTP client to use.
* @param bool $sync Whether to send requests synchronously (optional).
* @param iLogger|null $logger The logger to use (optional).
* @param array $opts Additional options for the client.
*/
function send_requests(
array $requests,
iHttp $client,
bool $sync = false,
iLogger|null $logger = null,
array $opts = []
): void {
try {
if (true === $sync) {
$i = 0;
$total = count($requests);
foreach ($requests as $request) {
try {
$i++;
$start = microtime(true);
$response = ($request->extras[iHttp::class] ?? $client)->request(...$request->toRequest());
($request->success)($response);
} catch (Throwable $e) {
($request->error)($e);
} finally {
$logger?->info("Request '{position}/{total}' completed in '{s}'s.", [
'position' => $i,
'total' => $total,
's' => round(microtime(true) - $start, 3)
]);
}
}
return;
}
$queue = [];
foreach ($requests as $request) {
$r = $request->toRequest();
$r['options'] = array_replace_recursive($r['options'], [
'user_data' => ['ok' => $request->success, 'error' => $request->error]
]);
$queue[] = ($request->extras[iHttp::class] ?? $client)->request(...$r);
}
$i = 0;
foreach ($queue as $_key => $response) {
$i++;
$requestData = $response->getInfo('user_data');
try {
$requestData['ok']($response);
} catch (Throwable $e) {
$requestData['error']($e);
}
$queue[$_key] = null;
if (0 === $i % 50) {
$i = 0;
gc_collect_cycles();
}
}
} finally {
gc_collect_cycles();
}
}
}