Migrated state:progress command into the new events system, thus it's no longer a task but an event. as such, the related ENVS and command are gone.

This commit is contained in:
Abdulmhsen B. A. A.
2024-08-18 21:15:30 +03:00
parent b4c74c1cf9
commit a2ae3e6a33
14 changed files with 385 additions and 431 deletions

2
FAQ.md
View File

@@ -402,7 +402,7 @@ command via CLI.
> [!IMPORTANT]
> for environment variables that has `{TASK}` tag, you **MUST** replace it with one
> of `IMPORT`, `EXPORT`, `PUSH`, `BACKUP`, `PRUNE`, `INDEXES`, `REQUESTS`. To see tasks active settings run
> of `IMPORT`, `EXPORT`, `PUSH`, `BACKUP`, `PRUNE`, `INDEXES`. To see tasks active settings run
```bash
$ docker exec -ti watchstate console system:tasks

View File

@@ -9,6 +9,14 @@ out of the box, this tool support `Jellyfin`, `Plex` and `Emby` media servers.
## Updates
### 2024-08-18
We have started migrating the old events system to a new one, so far we have migrated the `progress` and `requests` to it. As such,
The old tasks `state:progress` and `state:requests` are now gone. To control if you want to enable the watch progress, there is new
environment variable `WS_SYNC_PROGRESS` which you can set to `true` to enable the watch progress. It's disabled by default.
We will continue to migrate the rest of the events to the new system, and we will keep you updated.
### 2024-08-10
I have recently added new experimental feature, to play your content directly from the WebUI. This feature is still in

View File

@@ -9,7 +9,6 @@ use App\Commands\Events\DispatchCommand;
use App\Commands\State\BackupCommand;
use App\Commands\State\ExportCommand;
use App\Commands\State\ImportCommand;
use App\Commands\State\ProgressCommand;
use App\Commands\State\PushCommand;
use App\Commands\System\IndexCommand;
use App\Commands\System\PruneCommand;
@@ -75,6 +74,9 @@ return (function () {
'proxy' => (bool)env('WS_TRUST_PROXY', false),
'header' => (string)env('WS_TRUST_HEADER', 'X-Forwarded-For'),
],
'sync' => [
'progress' => (bool)env('WS_SYNC_PROGRESS', false),
],
];
$config['backends_file'] = fixPath(env('WS_BACKENDS_FILE', ag($config, 'path') . '/config/servers.yaml'));
@@ -272,14 +274,6 @@ return (function () {
'timer' => $checkTaskTimer((string)env('WS_CRON_PUSH_AT', '*/10 * * * *'), '*/10 * * * *'),
'args' => env('WS_CRON_PUSH_ARGS', '-v'),
],
ProgressCommand::TASK_NAME => [
'command' => ProgressCommand::ROUTE,
'name' => ProgressCommand::TASK_NAME,
'info' => 'Send play progress to backends.',
'enabled' => (bool)env('WS_CRON_PROGRESS', false),
'timer' => $checkTaskTimer((string)env('WS_CRON_PROGRESS_AT', '*/45 * * * *'), '*/45 * * * *'),
'args' => env('WS_CRON_PROGRESS_ARGS', '-v'),
],
BackupCommand::TASK_NAME => [
'command' => BackupCommand::ROUTE,
'name' => BackupCommand::TASK_NAME,

View File

@@ -161,6 +161,11 @@ return (function () {
'description' => 'All executing all commands in the console. They must be prefixed with $',
'type' => 'bool',
],
[
'key' => 'WS_SYNC_PROGRESS',
'description' => 'Enable watch progress sync.',
'type' => 'bool',
],
];
$validateCronExpression = function (string $value): string {
@@ -186,7 +191,7 @@ return (function () {
};
// -- Do not forget to update the tasks list if you add a new task.
$tasks = ['import', 'export', 'push', 'progress', 'backup', 'prune', 'indexes'];
$tasks = ['import', 'export', 'push', 'backup', 'prune', 'indexes'];
$task_env = [
[
'key' => 'WS_CRON_{TASK}',

View File

@@ -85,6 +85,18 @@
JSON.stringify(item.logs, null, 2)
}}</code></pre>
</div>
<div class="column is-12" v-if="item.options">
<h2 class="title is-4 is-clickable is-unselectable" @click="toggleOptions = !toggleOptions">
<span class="icon">
<i class="fas" :class="{ 'fa-arrow-down': !toggleOptions, 'fa-arrow-up': toggleOptions }"></i>
</span>&nbsp;
<span>Show attached options</span>
</h2>
<pre class="p-0 is-pre-wrap" v-if="toggleOptions"><code
style="word-break: break-word" class="language-json">{{
JSON.stringify(item.options, null, 2)
}}</code></pre>
</div>
</div>
</div>
</template>
@@ -105,6 +117,7 @@ const item = ref({})
const toggleLogs = useStorage('events_toggle_logs', true)
const toggleData = useStorage('events_toggle_data', true)
const toggleOptions = useStorage('events_toggle_options', true)
onMounted(async () => {
if (!id.value) {

View File

@@ -16,7 +16,6 @@ use App\Libs\Traits\APITraits;
use App\Libs\Uri;
use App\Listeners\ProcessRequestEvent;
use App\Model\Events\EventsTable;
use DateInterval;
use Monolog\Handler\StreamHandler;
use Monolog\Level;
use Monolog\Logger;
@@ -199,23 +198,14 @@ final class Webhooks
'id' => ag($entity->getMetadata($entity->via), iState::COLUMN_ID, '??'),
]);
queueEvent(ProcessRequestEvent::NAME, [
'options' => [
Options::IMPORT_METADATA_ONLY => $metadataOnly,
],
'entity' => $entity->getAll(),
], [
EventsTable::COLUMN_REFERENCE => $itemId,
queueEvent(ProcessRequestEvent::NAME, $entity->getAll(), [
'unique' => true,
EventsTable::COLUMN_REFERENCE => $itemId,
EventsTable::COLUMN_OPTIONS => [
Options::IMPORT_METADATA_ONLY => $metadataOnly,
]
]);
$pEnabled = (bool)env('WS_CRON_PROGRESS', false);
if ($pEnabled && false === $metadataOnly && true === $entity->hasPlayProgress() && !$entity->isWatched()) {
$progress = $this->cache->get('progress', []);
$progress[str_replace($itemId, ':tainted@', ':untainted@')] = $entity;
$this->cache->set('progress', $progress, new DateInterval('P3D'));
}
$this->write($request, Level::Info, 'Queued [{backend}: {event}] {item.type} [{item.title}].', [
'backend' => $entity->via,
'event' => ag($entity->getExtra($entity->via), iState::COLUMN_EXTRA_EVENT),
@@ -225,7 +215,7 @@ final class Webhooks
'type' => $entity->type,
'played' => $entity->isWatched() ? 'Yes' : 'No',
'queue_id' => $itemId,
'progress' => $pEnabled && $entity->hasPlayProgress() ? $entity->getPlayProgress() : null,
'progress' => $entity->hasPlayProgress() ? $entity->getPlayProgress() : null,
]
]
);

View File

@@ -1,387 +0,0 @@
<?php
declare(strict_types=1);
namespace App\Commands\State;
use App\Command;
use App\Libs\Attributes\Route\Cli;
use App\Libs\Config;
use App\Libs\Database\DatabaseInterface as iDB;
use App\Libs\Entity\StateInterface as iState;
use App\Libs\Exceptions\Backends\UnexpectedVersionException;
use App\Libs\Options;
use App\Libs\QueueRequests;
use Psr\Log\LoggerInterface as iLogger;
use Psr\SimpleCache\CacheInterface as iCache;
use Symfony\Component\Console\Input\InputInterface;
use Symfony\Component\Console\Input\InputOption;
use Symfony\Component\Console\Output\OutputInterface;
use Throwable;
/**
* Class ProgressCommand
*
* This command is used to push user watch progress to export enabled backends.
* It should not be run manually and should be scheduled to run as a task.
*
* This command requires the watch progress metadata to be already saved in the database.
* If no metadata is available for a backend,
* the watch progress update won't be sent to that backend
*/
#[Cli(command: self::ROUTE)]
class ProgressCommand extends Command
{
public const string ROUTE = 'state:progress';
public const string TASK_NAME = 'progress';
/**
* Class Constructor.
*
* @param iLogger $logger The logger instance.
* @param iCache $cache The cache instance.
* @param iDB $db The database instance.
* @param QueueRequests $queue The queue requests instance.
*/
public function __construct(
private iLogger $logger,
private iCache $cache,
private iDB $db,
private QueueRequests $queue
) {
set_time_limit(0);
ini_set('memory_limit', '-1');
parent::__construct();
}
/**
* Configure the command.
*/
protected function configure(): void
{
$this->setName(self::ROUTE)
->setDescription('Push queued watch progress.')
->addOption('keep', 'k', InputOption::VALUE_NONE, 'Do not expunge queue after run is complete.')
->addOption('dry-run', null, InputOption::VALUE_NONE, 'Do not commit changes to backends.')
->addOption('list', 'l', InputOption::VALUE_NONE, 'List queued items.')
->addOption('ignore-date', null, InputOption::VALUE_NONE, 'Ignore date comparison.')
->setHelp(
r(
<<<HELP
This command push <notice>user</notice> watch progress to export enabled backends.
You should not run this manually and instead rely on scheduled task to run this command.
This command require the <notice>metadata</notice> to be already saved in database.
If no metadata available for a backend, then watch progress update won't be sent to that backend.
HELP,
[
'cmd' => trim(commandContext()),
'route' => self::ROUTE,
]
)
);
}
/**
* Make sure the command is not running in parallel.
*
* @param InputInterface $input
* @param OutputInterface $output
* @return int
* @throws \Psr\Cache\InvalidArgumentException if the cache key is not a legal value
*/
protected function runCommand(InputInterface $input, OutputInterface $output): int
{
return $this->single(fn(): int => $this->process($input, $output), $output);
}
/**
* Run the command.
*
* @param InputInterface $input The input interface.
* @param OutputInterface $output The output interface.
* @return int Returns the status code.
* @throws \Psr\Cache\InvalidArgumentException if the cache key is not a legal value
* @noinspection PhpRedundantCatchClauseInspection
*/
protected function process(InputInterface $input, OutputInterface $output): int
{
if (!$this->cache->has('progress')) {
$this->logger->info('No watch progress items in the queue.');
return self::SUCCESS;
}
/** @var array<iState> $entities */
$entities = [];
foreach ($this->cache->get('progress', []) as $queueItem) {
assert($queueItem instanceof iState);
$dbItem = $this->db->get($queueItem);
if (null === $dbItem || $dbItem->isWatched() || $queueItem->isWatched()) {
continue;
}
$dbItem = $dbItem->apply($queueItem);
if (!$dbItem->hasPlayProgress()) {
continue;
}
if (array_key_exists($dbItem->id, $entities) && $entities[$dbItem->id]->getPlayProgress(
) > $dbItem->getPlayProgress()) {
continue;
}
$entities[$dbItem->id] = $dbItem;
}
if (empty($entities)) {
$this->cache->delete('progress');
$this->logger->debug('No watch progress items in the queue.');
return self::SUCCESS;
}
if ($input->getOption('list')) {
return $this->listItems($input, $output, $entities);
}
$list = [];
$supported = Config::get('supported', []);
foreach ((array)Config::get('servers', []) as $backendName => $backend) {
$type = strtolower(ag($backend, 'type', 'unknown'));
if (true !== (bool)ag($backend, 'export.enabled')) {
$this->logger->info("SYSTEM: Export to '{backend}' is disabled by user.", [
'backend' => $backendName,
]);
continue;
}
if (!isset($supported[$type])) {
$this->logger->error("SYSTEM: '{backend}' Invalid type.", [
'backend' => $backendName,
'condition' => [
'expected' => implode(', ', array_keys($supported)),
'given' => $type,
],
]);
continue;
}
if (null === ($url = ag($backend, 'url')) || false === isValidURL($url)) {
$this->logger->error("SYSTEM: '{backend}' Invalid url.", [
'backend' => $backendName,
'url' => $url ?? 'None',
]);
continue;
}
$backend['name'] = $backendName;
$list[$backendName] = $backend;
}
if (empty($list)) {
$this->logger->warning('SYSTEM: There are no backends with export enabled.');
return self::FAILURE;
}
foreach ($list as $name => &$backend) {
try {
$opts = ag($backend, 'options', []);
if ($input->getOption('ignore-date')) {
$opts[Options::IGNORE_DATE] = true;
}
if ($input->getOption('dry-run')) {
$opts[Options::DRY_RUN] = true;
}
if ($input->getOption('trace')) {
$opts[Options::DEBUG_TRACE] = true;
}
$backend['options'] = $opts;
$backend['class'] = $this->getBackend(name: $name, config: $backend);
$backend['class']->progress(entities: $entities, queue: $this->queue);
} catch (UnexpectedVersionException $e) {
$this->logger->notice(
"SYSTEM: Sync play progress is not supported for '{backend}'. '{error.message}' at '{error.file}:{error.line}'.",
[
'backend' => $name,
'error' => [
'kind' => $e::class,
'line' => $e->getLine(),
'message' => $e->getMessage(),
'file' => after($e->getFile(), ROOT_PATH),
],
'exception' => [
'file' => $e->getFile(),
'line' => $e->getLine(),
'kind' => get_class($e),
'message' => $e->getMessage(),
'trace' => $e->getTrace(),
],
]
);
} catch (Throwable $e) {
$this->logger->error(
message: "SYSTEM: Exception '{error.kind}' was thrown unhandled during '{backend}' request to sync progress. '{error.message}' at '{error.file}:{error.line}'.",
context: [
'backend' => $name,
'error' => [
'kind' => $e::class,
'line' => $e->getLine(),
'message' => $e->getMessage(),
'file' => after($e->getFile(), ROOT_PATH),
],
'exception' => [
'file' => $e->getFile(),
'line' => $e->getLine(),
'kind' => get_class($e),
'message' => $e->getMessage(),
'trace' => $e->getTrace(),
],
]
);
}
}
unset($backend);
$total = count($this->queue);
if ($total >= 1) {
$start = makeDate();
$this->logger->notice("SYSTEM: Sending '{total}' progress update requests.", [
'total' => $total,
'time' => [
'start' => $start,
],
]);
foreach ($this->queue->getQueue() as $response) {
$context = ag($response->getInfo('user_data'), 'context', []);
try {
if ($input->getOption('trace')) {
$this->logger->debug("Processing '{backend}' - '{item.title}' response.", [
'url' => ag($context, 'remote.url', '??'),
'status_code' => $response->getStatusCode(),
'headers' => $response->getHeaders(false),
'response' => $response->getContent(false),
...$context
]);
}
if (!in_array($response->getStatusCode(), [200, 204])) {
$this->logger->error(
"SYSTEM: Request to change '{backend}' '{item.title}' watch progress returned with unexpected '{status_code}' status code.",
[
'status_code' => $response->getStatusCode(),
...$context
]
);
continue;
}
$this->logger->notice("SYSTEM: Updated '{backend}' '{item.title}' watch progress.", [
...$context,
'status_code' => $response->getStatusCode(),
]);
} catch (Throwable $e) {
$this->logger->error(
message: "SYSTEM: Exception '{error.kind}' was thrown unhandled during '{backend}' request to change watch progress of {item.type} '{item.title}'. '{error.message}' at '{error.file}:{error.line}'.",
context: [
'error' => [
'kind' => $e::class,
'line' => $e->getLine(),
'message' => $e->getMessage(),
'file' => after($e->getFile(), ROOT_PATH),
],
'exception' => [
'file' => $e->getFile(),
'line' => $e->getLine(),
'kind' => get_class($e),
'message' => $e->getMessage(),
'trace' => $e->getTrace(),
],
...$context,
]
);
}
}
$end = makeDate();
$this->logger->notice("SYSTEM: Sent '{total}' watch progress requests.", [
'total' => $total,
'time' => [
'start' => $start,
'end' => $end,
'duration' => $end->getTimestamp() - $start->getTimestamp(),
],
]);
$this->logger->notice("Using WatchState Version - '{version}'.", ['version' => getAppVersion()]);
} else {
$this->logger->notice('SYSTEM: No play state changes detected.');
}
if (false === $input->getOption('keep') && false === $input->getOption('dry-run')) {
$this->cache->delete('progress');
}
return self::SUCCESS;
}
/**
* Renders and displays a list of items based on the specified output mode.
*
* @param InputInterface $input The input interface object.
* @param OutputInterface $output The output interface object.
* @param array $items An array of items to be listed.
*
* @return int The status code indicating the success of the method execution.
*/
private function listItems(InputInterface $input, OutputInterface $output, array $items): int
{
$list = [];
$mode = $input->getOption('output');
foreach ($items as $item) {
if ('table' === $mode) {
$builder = [
'queued' => makeDate(ag($item->getExtra($item->via), iState::COLUMN_EXTRA_DATE))->format(
'Y-m-d H:i:s T'
),
'via' => $item->via,
'title' => $item->getName(),
'played' => $item->isWatched() ? 'Yes' : 'No',
'play_time' => formatDuration($item->getPlayProgress()),
'tainted' => $item->isTainted() ? 'Yes' : 'No',
'event' => ag($item->getExtra($item->via), iState::COLUMN_EXTRA_EVENT, '??'),
];
} else {
$builder = [
...$item->getAll(),
'tainted' => $item->isTainted(),
];
}
$list[] = $builder;
}
$this->displayContent($list, $output, $mode);
return self::SUCCESS;
}
}

View File

@@ -16,6 +16,11 @@ class DataEvent extends Event
return $this->eventInfo;
}
public function getReference(): string|null
{
return $this->eventInfo->reference;
}
public function addLog(string $log): void
{
$this->eventInfo->logs[] = $log;
@@ -30,4 +35,9 @@ class DataEvent extends Event
{
return $this->eventInfo->event_data;
}
public function getOptions(): array
{
return $this->eventInfo->options;
}
}

View File

@@ -0,0 +1,14 @@
<?php
declare(strict_types=1);
namespace App\Libs\Exceptions\Backends;
/**
* Class UnexpectedVersionException
*
* This exception is thrown when the requested method is not implemented by the backend.
*/
class NotImplementedException extends BackendException
{
}

View File

@@ -4,13 +4,15 @@ declare(strict_types=1);
namespace App\Libs\Mappers\Import;
use App\Libs\Config;
use App\Libs\Container;
use App\Libs\Database\DatabaseInterface as iDB;
use App\Libs\Entity\StateInterface as iState;
use App\Libs\Mappers\ImportInterface as iImport;
use App\Libs\Message;
use App\Libs\Options;
use DateInterval;
use App\Listeners\ProcessProgressEvent;
use App\Model\Events\EventsTable;
use DateTimeInterface as iDate;
use PDOException;
use Psr\Log\LoggerInterface as iLogger;
@@ -740,13 +742,18 @@ final class DirectMapper implements iImport
*/
public function commit(): array
{
if (true === (bool)env('WS_CRON_PROGRESS', false) && count($this->progressItems) >= 1) {
if (true === (bool)Config::get('sync.progress', false) && count($this->progressItems) >= 1) {
try {
$progress = $this->cache->get('progress', []);
foreach ($this->progressItems as $itemId => $entity) {
$progress[$itemId] = $entity;
foreach ($this->progressItems as $entity) {
queueEvent(ProcessProgressEvent::NAME, [iState::COLUMN_ID => $entity->id], [
'unique' => true,
EventsTable::COLUMN_REFERENCE => r('{type}://{id}@{backend}', [
'type' => $entity->type,
'backend' => $entity->via,
'id' => ag($entity->getMetadata($entity->via), iState::COLUMN_ID, '??'),
]),
]);
}
$this->cache->set('progress', $progress, new DateInterval('P3D'));
} catch (\Psr\SimpleCache\InvalidArgumentException) {
}
}

View File

@@ -4,12 +4,14 @@ declare(strict_types=1);
namespace App\Libs\Mappers\Import;
use App\Libs\Config;
use App\Libs\Database\DatabaseInterface as iDB;
use App\Libs\Entity\StateInterface as iState;
use App\Libs\Mappers\ImportInterface as iImport;
use App\Libs\Message;
use App\Libs\Options;
use DateInterval;
use App\Listeners\ProcessProgressEvent;
use App\Model\Events\EventsTable;
use DateTimeInterface as iDate;
use PDOException;
use Psr\Log\LoggerInterface as iLogger;
@@ -543,13 +545,18 @@ final class MemoryMapper implements iImport
public function commit(): mixed
{
if (true !== $this->inDryRunMode()) {
if (true === (bool)env('WS_CRON_PROGRESS', false) && count($this->progressItems) >= 1) {
if (true === (bool)Config::get('sync.progress', false) && count($this->progressItems) >= 1) {
try {
$progress = $this->cache->get('progress', []);
foreach ($this->progressItems as $itemId => $entity) {
$progress[$itemId] = $entity;
foreach ($this->progressItems as $entity) {
queueEvent(ProcessProgressEvent::NAME, [iState::COLUMN_ID => $entity->id], [
'unique' => true,
EventsTable::COLUMN_REFERENCE => r('{type}://{id}@{backend}', [
'type' => $entity->type,
'backend' => $entity->via,
'id' => ag($entity->getMetadata($entity->via), iState::COLUMN_ID, '??'),
]),
]);
}
$this->cache->set('progress', $progress, new DateInterval('P3D'));
} catch (\Psr\SimpleCache\InvalidArgumentException) {
}
}

View File

@@ -10,6 +10,7 @@ use App\Libs\APIResponse;
use App\Libs\Attributes\Scanner\Attributes as AttributesScanner;
use App\Libs\Attributes\Scanner\Item as ScannerItem;
use App\Libs\Config;
use App\Libs\ConfigFile;
use App\Libs\Container;
use App\Libs\DataUtil;
use App\Libs\Entity\StateInterface as iState;
@@ -2012,6 +2013,10 @@ if (!function_exists('queueEvent')) {
'class' => ag($opts, 'class', DataEvent::class),
];
if (ag_exists($opts, EventsTable::COLUMN_OPTIONS) && is_array($opts[EventsTable::COLUMN_OPTIONS])) {
$item->options = array_replace_recursive($opts[EventsTable::COLUMN_OPTIONS], $item->options);
}
if ($reference) {
$item->reference = $reference;
}
@@ -2042,3 +2047,28 @@ if (!function_exists('getPagination')) {
return [$page, $perpage, $start];
}
}
if (!function_exists('getBackend')) {
/**
* Retrieves the backend client for the specified name.
*
* @param string $name The name of the backend.
* @param array $config (Optional) Override the default configuration for the backend.
*
* @return iClient The backend client instance.
* @throws RuntimeException If no backend with the specified name is found.
*/
function getBackend(string $name, array $config = []): iClient
{
$configFile = ConfigFile::open(Config::get('backends_file'), 'yaml');
if (null === $configFile->get("{$name}.type", null)) {
throw new RuntimeException(r("No backend named '{backend}' was found.", ['backend' => $name]));
}
$default = $configFile->get($name);
$default['name'] = $name;
return makeBackend(array_replace_recursive($default, $config), $name);
}
}

View File

@@ -0,0 +1,245 @@
<?php
declare(strict_types=1);
namespace App\Listeners;
use App\Libs\Config;
use App\Libs\Container;
use App\Libs\Database\DatabaseInterface as iDB;
use App\Libs\Entity\StateInterface as iState;
use App\Libs\Enums\Http\Status;
use App\libs\Events\DataEvent;
use App\Libs\Exceptions\Backends\NotImplementedException;
use App\Libs\Exceptions\Backends\UnexpectedVersionException;
use App\Libs\Options;
use App\Libs\QueueRequests;
use App\Model\Events\EventListener;
use Monolog\Level;
use Psr\Log\LoggerInterface as iLogger;
use Throwable;
#[EventListener(self::NAME)]
final readonly class ProcessProgressEvent
{
public const string NAME = 'on_progress';
/**
* Class constructor.
*
* @param iLogger $logger The logger object.
*/
public function __construct(private iLogger $logger, private iDB $db, private QueueRequests $queue)
{
set_time_limit(0);
ini_set('memory_limit', '-1');
}
public function __invoke(DataEvent $e): DataEvent
{
$writer = function (Level $level, string $message, array $context = []) use ($e) {
$e->addLog($level->getName() . ': ' . r($message, $context));
$this->logger->log($level, $message, $context);
};
$e->stopPropagation();
$options = $e->getOptions();
if (null === ($item = $this->db->get(Container::get(iState::class)::fromArray($e->getData())))) {
$writer(Level::Error, "Item with id '{id}' not found.", [
'id' => ag($e->getData(), 'id', $e->getReference() ?? 'Unknown ID.')
]);
return $e;
}
$list = [];
$supported = Config::get('supported', []);
foreach ((array)Config::get('servers', []) as $backendName => $backend) {
$type = strtolower(ag($backend, 'type', 'unknown'));
if (true !== (bool)ag($backend, 'export.enabled')) {
$writer(Level::Notice, "SYSTEM: Export to '{backend}' is disabled by user.", [
'backend' => $backendName
]);
continue;
}
if (!isset($supported[$type])) {
$writer(Level::Error, "SYSTEM: '{backend}' Invalid type.", [
'backend' => $backendName,
'condition' => [
'expected' => implode(', ', array_keys($supported)),
'given' => $type,
],
]);
continue;
}
if (null === ($url = ag($backend, 'url')) || false === isValidURL($url)) {
$writer(Level::Error, "SYSTEM: '{backend}' Invalid url.", [
'backend' => $backendName,
'url' => $url ?? 'None',
]);
continue;
}
$backend['name'] = $backendName;
$list[$backendName] = $backend;
}
if (empty($list)) {
$writer(Level::Info, 'SYSTEM: There are no backends with export enabled.');
return $e;
}
foreach ($list as $name => &$backend) {
try {
$opts = ag($backend, 'options', []);
if (ag($options, 'ignore-date')) {
$opts[Options::IGNORE_DATE] = true;
}
if (ag($options, 'dry-run')) {
$opts[Options::DRY_RUN] = true;
}
if (ag($options, 'trace')) {
$opts[Options::DEBUG_TRACE] = true;
}
$backend['options'] = $opts;
$backend['class'] = getBackend(name: $name, config: $backend);
$backend['class']->progress(entities: [$item->id => $item], queue: $this->queue);
} catch (UnexpectedVersionException|NotImplementedException $e) {
$writer(
Level::Notice,
"SYSTEM: This feature is not available for '{backend}'. '{error.message}' at '{error.file}:{error.line}'.",
[
'backend' => $name,
'error' => [
'kind' => $e::class,
'line' => $e->getLine(),
'message' => $e->getMessage(),
'file' => after($e->getFile(), ROOT_PATH),
],
'exception' => [
'file' => $e->getFile(),
'line' => $e->getLine(),
'kind' => get_class($e),
'message' => $e->getMessage(),
'trace' => $e->getTrace(),
],
]
);
} catch (Throwable $e) {
$writer(
Level::Error,
"SYSTEM: Exception '{error.kind}' was thrown unhandled during '{backend}' request to sync progress. '{error.message}' at '{error.file}:{error.line}'.",
[
'backend' => $name,
'error' => [
'kind' => $e::class,
'line' => $e->getLine(),
'message' => $e->getMessage(),
'file' => after($e->getFile(), ROOT_PATH),
],
'exception' => [
'file' => $e->getFile(),
'line' => $e->getLine(),
'kind' => get_class($e),
'message' => $e->getMessage(),
'trace' => $e->getTrace(),
],
]
);
}
}
unset($backend);
$total = count($this->queue);
if ($total >= 1) {
$start = makeDate();
$writer(Level::Notice, "SYSTEM: Sending '{total}' progress update requests.", [
'total' => $total,
'time' => [
'start' => $start,
],
]);
foreach ($this->queue->getQueue() as $response) {
$context = ag($response->getInfo('user_data'), 'context', []);
try {
if (ag($options, 'trace')) {
$writer(Level::Debug, "Processing '{backend}' - '{item.title}' response.", [
'url' => ag($context, 'remote.url', '??'),
'status_code' => $response->getStatusCode(),
'headers' => $response->getHeaders(false),
'response' => $response->getContent(false),
...$context
]);
}
if (!in_array($response->getStatusCode(), [Status::OK->value, Status::NO_CONTENT->value])) {
$writer(
Level::Error,
"SYSTEM: Request to change '{backend}' '{item.title}' watch progress returned with unexpected '{status_code}' status code.",
[
'status_code' => $response->getStatusCode(),
...$context
]
);
continue;
}
$writer(Level::Notice, "SYSTEM: Updated '{backend}' '{item.title}' watch progress.", [
...$context,
'status_code' => $response->getStatusCode(),
]);
} catch (Throwable $e) {
$writer(
Level::Error,
"SYSTEM: Exception '{error.kind}' was thrown unhandled during '{backend}' request to change watch progress of {item.type} '{item.title}'. '{error.message}' at '{error.file}:{error.line}'.",
[
'error' => [
'kind' => $e::class,
'line' => $e->getLine(),
'message' => $e->getMessage(),
'file' => after($e->getFile(), ROOT_PATH),
],
'exception' => [
'file' => $e->getFile(),
'line' => $e->getLine(),
'kind' => get_class($e),
'message' => $e->getMessage(),
'trace' => $e->getTrace(),
],
...$context,
]
);
}
}
$end = makeDate();
$writer(Level::Notice, "SYSTEM: Sent '{total}' watch progress requests.", [
'total' => $total,
'time' => [
'start' => $start,
'end' => $end,
'duration' => $end->getTimestamp() - $start->getTimestamp(),
],
]);
} else {
$writer(Level::Notice, 'SYSTEM: No watch progress changes detected.');
}
return $e;
}
}

View File

@@ -12,6 +12,7 @@ use App\Libs\Extends\ProxyHandler;
use App\Libs\Mappers\Import\DirectMapper;
use App\Libs\Options;
use App\Model\Events\EventListener;
use App\Model\Events\EventsTable;
use Monolog\Logger;
use Psr\Log\LoggerInterface as iLogger;
@@ -35,10 +36,7 @@ final readonly class ProcessRequestEvent
{
$e->stopPropagation();
$data = ag($e->getData(), 'entity');
$entity = Container::get(iState::class)::fromArray($data);
$options = ag($data, 'options', []);
$entity = Container::get(iState::class)::fromArray($e->getData());
if (null !== ($lastSync = ag(Config::get("servers.{$entity->via}", []), 'import.lastSync'))) {
$lastSync = makeDate($lastSync);
@@ -64,8 +62,9 @@ final readonly class ProcessRequestEvent
$oldLogger = $this->mapper->getLogger();
$this->mapper->setLogger($logger);
$metadataOnly = (bool)ag($e->getOptions(), Options::IMPORT_METADATA_ONLY);
$this->mapper->add($entity, [
Options::IMPORT_METADATA_ONLY => (bool)ag($options, Options::IMPORT_METADATA_ONLY),
Options::IMPORT_METADATA_ONLY => $metadataOnly,
Options::STATE_UPDATE_EVENT => fn(iState $state) => queuePush($state),
'after' => $lastSync,
]);
@@ -73,6 +72,25 @@ final readonly class ProcessRequestEvent
$this->mapper->commit();
$this->mapper->setLogger($oldLogger);
$pEnabled = (bool)Config::get('sync.progress', false);
if (true === $pEnabled && true === $entity->hasPlayProgress() && !$entity->isWatched()) {
if (null !== ($newEntity = $this->mapper->get($entity))) {
$logger->notice(r("Scheduling '{title}' for watch progress update via '{backend}' event.", [
'backend' => $entity->via,
'title' => $entity->getName(),
]));
queueEvent(ProcessProgressEvent::NAME, [iState::COLUMN_ID => $newEntity->id], [
'unique' => true,
EventsTable::COLUMN_REFERENCE => r('{type}://{id}@{backend}', [
'type' => $newEntity->type,
'backend' => $newEntity->via,
'id' => ag($newEntity->getMetadata($newEntity->via), iState::COLUMN_ID, '??'),
]),
]);
}
}
$handler->close();
return $e;