Migrate state:push into the new event system.

This commit is contained in:
Abdulmhsen B. A. A.
2024-08-19 14:31:31 +03:00
parent eea52a10bd
commit 2e41f80698
15 changed files with 367 additions and 438 deletions

2
FAQ.md
View File

@@ -402,7 +402,7 @@ command via CLI.
> [!IMPORTANT]
> for environment variables that has `{TASK}` tag, you **MUST** replace it with one
> of `IMPORT`, `EXPORT`, `PUSH`, `BACKUP`, `PRUNE`, `INDEXES`. To see tasks active settings run
> of `IMPORT`, `EXPORT`, `BACKUP`, `PRUNE`, `INDEXES`. To see tasks active settings run
```bash
$ docker exec -ti watchstate console system:tasks

40
NEWS.md
View File

@@ -1,5 +1,45 @@
# Old Updates
### 2024-08-10
I have recently added new experimental feature, to play your content directly from the WebUI. This feature is still in
alpha, and missing a lot of features. But it's a start. Right now it does auto transcode on the fly to play any content in the browser.
The feature requires that you mount your media directories to the `WatchState` container similar to the `File integrity` feature. I have plans to expand
the feature to support more controls, however, right now it's only support basic subtitles streams and default audio stream or first audio stream.
The transcoder works by converting the media on the fly to `HLS` segments, and the subtitles are selectable via the player ui which are also converted to `vtt` format.
Expects bugs and issues, as the feature is still in alpha. But I would love to hear your feedback. You can play the media by visiting
the history page of the item you will see red play button on top right corner of the page. If the items has a play button, then you correctly mounted
the media directories. otherwise, the button be disabled with tooltip of `Media is inaccessible`.
The feature is not meant to replace your backend media player, the purpose of this feature is to quickly check the media without leaving the WebUI.
### 2024-08-01
We recently enabled listening on tls connections via `8443` which can be controlled by `HTTPS_PORT` environment variable.
Before today, we simply only exposed the port via the `Dockerfile`, but we weren't listening for connections on it.
However, please keep in mind that the certificate is self-signed, and you might get a warning from your browser. You can
either accept the warning or add the certificate to your trusted certificates. We strongly recommend using a reverse proxy.
instead of relying on self-signed certificates.
### 2024-07-22
We have recently added a new WebUI feature, `File integrity`, this feature will help you to check if your media backends
are reporting files that are not available on the disk. This feature is still in alpha, and we are working on improving
it.
This feature `REQUIRES` that you mount your media directories to the `WatchState` container preferably as readonly. There is plans to add
a path replacement feature to allow you change the pathing, but it's not implemented yet.
This feature will work on both local and remote cloud storages provided they are mounted into the container. We also may recommend not to
use this feature depending on how your cloud storage provider treats file stat calls. As it might lead to unnecessary money spending. and of course
it will be slower.
For more information about how we cache the stat calls, please refer to the [FAQ](FAQ.md#How-does-the-file-integrity-feature-works).
### 2024-07-06
Recently we have introduced a new feature that allows you to use Jellyfin and Emby OAuth access tokens for syncing

View File

@@ -9,6 +9,18 @@ out of the box, this tool support `Jellyfin`, `Plex` and `Emby` media servers.
## Updates
### 2024-08-19
We have migrated the `state:push` task into the new events system, as such the old task `state:push` is now gone.
To enable the new event handler for push events, use the new environment variable `WS_PUSH_ENABLED` and set it to `true`.
Right now, it's disabled by default. However, for people who had the old task enabled, it will reuse that setting.
Keep in mind, the new event handler is more efficient and will only push data when there is a change in the play state. And it's much faster
than the old task. This event handler will push data within a minute of the change.
PS: Please enable the task by setting its new environment variable `WS_PUSH_ENABLED` to `true`. The old `WS_CRON_PUSH` is now gone.
and will be removed in the future releases.
### 2024-08-18
We have started migrating the old events system to a new one, so far we have migrated the `progress` and `requests` to it. As such,
@@ -17,46 +29,6 @@ environment variable `WS_SYNC_PROGRESS` which you can set to `true` to enable th
We will continue to migrate the rest of the events to the new system, and we will keep you updated.
### 2024-08-10
I have recently added new experimental feature, to play your content directly from the WebUI. This feature is still in
alpha, and missing a lot of features. But it's a start. Right now it does auto transcode on the fly to play any content in the browser.
The feature requires that you mount your media directories to the `WatchState` container similar to the `File integrity` feature. I have plans to expand
the feature to support more controls, however, right now it's only support basic subtitles streams and default audio stream or first audio stream.
The transcoder works by converting the media on the fly to `HLS` segments, and the subtitles are selectable via the player ui which are also converted to `vtt` format.
Expects bugs and issues, as the feature is still in alpha. But I would love to hear your feedback. You can play the media by visiting
the history page of the item you will see red play button on top right corner of the page. If the items has a play button, then you correctly mounted
the media directories. otherwise, the button be disabled with tooltip of `Media is inaccessible`.
The feature is not meant to replace your backend media player, the purpose of this feature is to quickly check the media without leaving the WebUI.
### 2024-08-01
We recently enabled listening on tls connections via `8443` which can be controlled by `HTTPS_PORT` environment variable.
Before today, we simply only exposed the port via the `Dockerfile`, but we weren't listening for connections on it.
However, please keep in mind that the certificate is self-signed, and you might get a warning from your browser. You can
either accept the warning or add the certificate to your trusted certificates. We strongly recommend using a reverse proxy.
instead of relying on self-signed certificates.
### 2024-07-22
We have recently added a new WebUI feature, `File integrity`, this feature will help you to check if your media backends
are reporting files that are not available on the disk. This feature is still in alpha, and we are working on improving
it.
This feature `REQUIRES` that you mount your media directories to the `WatchState` container preferably as readonly. There is plans to add
a path replacement feature to allow you change the pathing, but it's not implemented yet.
This feature will work on both local and remote cloud storages provided they are mounted into the container. We also may recommend not to
use this feature depending on how your cloud storage provider treats file stat calls. As it might lead to unnecessary money spending. and of course
it will be slower.
For more information about how we cache the stat calls, please refer to the [FAQ](FAQ.md#How-does-the-file-integrity-feature-works).
Refer to [NEWS](NEWS.md) for old updates.
# Features

View File

@@ -9,7 +9,6 @@ use App\Commands\Events\DispatchCommand;
use App\Commands\State\BackupCommand;
use App\Commands\State\ExportCommand;
use App\Commands\State\ImportCommand;
use App\Commands\State\PushCommand;
use App\Commands\System\IndexCommand;
use App\Commands\System\PruneCommand;
use App\Libs\Mappers\Import\MemoryMapper;
@@ -75,7 +74,10 @@ return (function () {
'header' => (string)env('WS_TRUST_HEADER', 'X-Forwarded-For'),
],
'sync' => [
'progress' => (bool)env('WS_SYNC_PROGRESS', false),
'progress' => (bool)env('WS_SYNC_PROGRESS', (bool)env('WS_CRON_PROGRESS', false)),
],
'push' => [
'enabled' => (bool)env('WS_PUSH_ENABLED', (bool)env('WS_CRON_PUSH', false)),
],
];
@@ -266,14 +268,6 @@ return (function () {
'timer' => $checkTaskTimer((string)env('WS_CRON_EXPORT_AT', '30 */1 * * *'), '30 */1 * * *'),
'args' => env('WS_CRON_EXPORT_ARGS', '-v'),
],
PushCommand::TASK_NAME => [
'command' => PushCommand::ROUTE,
'name' => PushCommand::TASK_NAME,
'info' => 'Send queued events to backends.',
'enabled' => (bool)env('WS_CRON_PUSH', true),
'timer' => $checkTaskTimer((string)env('WS_CRON_PUSH_AT', '*/10 * * * *'), '*/10 * * * *'),
'args' => env('WS_CRON_PUSH_ARGS', '-v'),
],
BackupCommand::TASK_NAME => [
'command' => BackupCommand::ROUTE,
'name' => BackupCommand::TASK_NAME,

View File

@@ -166,6 +166,11 @@ return (function () {
'description' => 'Enable watch progress sync.',
'type' => 'bool',
],
[
'key' => 'WS_PUSH_ENABLED',
'description' => 'Enable Push play state to backends. This feature depends on webhooks being enabled.',
'type' => 'bool',
],
];
$validateCronExpression = function (string $value): string {
@@ -191,7 +196,7 @@ return (function () {
};
// -- Do not forget to update the tasks list if you add a new task.
$tasks = ['import', 'export', 'push', 'backup', 'prune', 'indexes'];
$tasks = ['import', 'export', 'backup', 'prune', 'indexes'];
$task_env = [
[
'key' => 'WS_CRON_{TASK}',

View File

@@ -39,13 +39,14 @@
</div>
</div>
<div class="columns is-multiline" v-if="items.length < 1">
<div class="columns is-multiline" v-if="filteredRows.length < 1">
<div class="column is-12">
<Message v-if="isLoading" message_class="has-background-info-90 has-text-dark" title="Loading"
icon="fas fa-spinner fa-spin" message="Loading data. Please wait..."/>
<Message v-else class="has-background-warning-80 has-text-dark" title="Warning"
icon="fas fa-exclamation-triangle">
<p>No items found.</p>
<p v-if="query">Search for <strong>{{ query }}</strong> returned no results.</p>
</Message>
</div>
</div>
@@ -79,7 +80,7 @@
</div>
<span class="card-footer-item">
<span class="icon"><i class="fas fa-calendar"></i></span>
<time class="has-tooltip" v-tooltip="`Created at: ${moment(item.created_at).format(tooltip_dateformat)}`">
<time class="has-tooltip" v-tooltip="`Created at: ${moment(item.created_at)}`">
{{ moment(item.created_at).fromNow() }}
</time>
</span>
@@ -87,8 +88,7 @@
<span v-if="!item.updated_at" class="icon"><i class="fas fa-spinner fa-spin"></i></span>
<template v-else>
<span class="icon"><i class="fas fa-calendar-alt"></i></span>
<time class="has-tooltip"
v-tooltip="`Updated at: ${moment(item.updated_at).format(tooltip_dateformat)}`">
<time class="has-tooltip" v-tooltip="`Updated at: ${moment(item.updated_at)}`">
{{ moment(item.updated_at).fromNow() }}
</time>
</template>
@@ -173,7 +173,7 @@ const filteredRows = computed(() => {
return items.value.filter(i => {
return Object.keys(i).some(k => {
if (typeof i[k] === 'object') {
if (typeof i[k] === 'object' && null !== i[k]) {
return Object.values(i[k]).some(v => typeof v === 'string' ? v.toLowerCase().includes(toTower) : false)
}
return typeof i[k] === 'string' ? i[k].toLowerCase().includes(toTower) : false

View File

@@ -429,7 +429,14 @@
</span>
</span>
<div v-if="showRawData" class="mt-2">
<code class="is-block is-pre-wrap">{{ JSON.stringify(data, null, 2) }}</code>
<code class="is-block is-pre-wrap">{{
JSON.stringify(Object.keys(data)
.filter(key => !['files', 'hardware', 'content_exists', '_toggle'].includes(key))
.reduce((obj, key) => {
obj[key] = data[key];
return obj;
}, {}), null, 2)
}}</code>
</div>
</div>

View File

@@ -3,7 +3,7 @@ const makeName = id => id.split('-').slice(0)[0]
const getStatusClass = status => {
switch (status) {
case 0:
return 'is-light'
return 'is-light has-text-dark'
case 1:
return 'is-warning'
case 2:
@@ -13,7 +13,7 @@ const getStatusClass = status => {
case 4:
return 'is-danger is-light'
default:
return 'is-light'
return 'is-light has-text-dark'
}
}

View File

@@ -22,7 +22,7 @@ use Throwable;
#[Cli(command: self::ROUTE)]
final class DispatchCommand extends Command
{
public const string TASK_NAME = 'Dispatch';
public const string TASK_NAME = 'dispatch';
public const string ROUTE = 'events:dispatch';

View File

@@ -1,272 +0,0 @@
<?php
declare(strict_types=1);
namespace App\Commands\State;
use App\Command;
use App\Libs\Attributes\Route\Cli;
use App\Libs\Config;
use App\Libs\Container;
use App\Libs\Database\DatabaseInterface as iDB;
use App\Libs\Entity\StateInterface as iState;
use App\Libs\Options;
use App\Libs\QueueRequests;
use Psr\Log\LoggerInterface as iLogger;
use Psr\SimpleCache\CacheInterface as iCache;
use Symfony\Component\Console\Input\InputInterface;
use Symfony\Component\Console\Input\InputOption;
use Symfony\Component\Console\Output\OutputInterface;
use Throwable;
/**
* Class PushCommand
*
* This class represents a command that pushes webhook queued events.
* It sends change play state requests to the supported backends.
*/
#[Cli(command: self::ROUTE)]
class PushCommand extends Command
{
public const ROUTE = 'state:push';
public const TASK_NAME = 'push';
/**
* Constructor for the given class.
*
* @param iLogger $logger The logger instance.
* @param iCache $cache The cache instance.
* @param iDB $db The database instance.
* @param QueueRequests $queue The queue instance.
*
* @return void
*/
public function __construct(
private iLogger $logger,
private iCache $cache,
private iDB $db,
private QueueRequests $queue
) {
set_time_limit(0);
ini_set('memory_limit', '-1');
parent::__construct();
}
/**
* Configure command.
*/
protected function configure(): void
{
$this->setName(self::ROUTE)
->setDescription('Push webhook queued events.')
->addOption('keep', 'k', InputOption::VALUE_NONE, 'Do not expunge queue after run is complete.')
->addOption('dry-run', null, InputOption::VALUE_NONE, 'Do not commit changes to backends.')
->addOption('ignore-date', null, InputOption::VALUE_NONE, 'Ignore date comparison.')
->setHelp(
r(
<<<HELP
This command push <notice>webhook</notice> updated play state to export enabled backends.
You should not run this manually and instead rely on scheduled task to run this command.
This command require the <notice>metadata</notice> to be already saved in database.
If no metadata available for a backend, then the item will be ignored for that backend.
If the item was ignored during <cmd>{route}</cmd> run, it will be picked up later by next <cmd>{export_route}</cmd> run.
HELP,
[
'cmd' => trim(commandContext()),
'route' => self::ROUTE,
'export_route' => ExportCommand::ROUTE,
]
)
);
}
/**
* Make sure the command is not running in parallel.
*
* @param InputInterface $input The input interface.
* @param OutputInterface $output The output interface.
*
* @return int Returns the process result status code.
* @throws \Psr\SimpleCache\InvalidArgumentException if the cache key is not a legal value.
*/
protected function runCommand(InputInterface $input, OutputInterface $output): int
{
return $this->single(fn(): int => $this->process($input), $output);
}
/**
* Process the queue items and send change play state requests to the supported backends.
*
* @param InputInterface $input The input interface.
*
* @return int Returns the process result status code.
* @throws \Psr\SimpleCache\InvalidArgumentException if the cache key is not a legal value.
*/
protected function process(InputInterface $input): int
{
if (!$this->cache->has('queue')) {
$this->logger->info('No items in the queue.');
return self::SUCCESS;
}
$entities = $items = [];
foreach ($this->cache->get('queue', []) as $item) {
$items[] = Container::get(iState::class)::fromArray($item);
}
if (!empty($items)) {
foreach ($this->db->find(...$items) as $item) {
$entities[$item->id] = $item;
}
}
$items = null;
if (empty($entities)) {
$this->cache->delete('queue');
$this->logger->debug('No items in the queue.');
return self::SUCCESS;
}
$list = [];
$supported = Config::get('supported', []);
foreach ((array)Config::get('servers', []) as $backendName => $backend) {
$type = strtolower(ag($backend, 'type', 'unknown'));
if (true !== (bool)ag($backend, 'export.enabled')) {
$this->logger->info('SYSTEM: Export to [{backend}] is disabled by user.', [
'backend' => $backendName,
]);
continue;
}
if (!isset($supported[$type])) {
$this->logger->error('SYSTEM: [{backend}] Invalid type.', [
'backend' => $backendName,
'condition' => [
'expected' => implode(', ', array_keys($supported)),
'given' => $type,
],
]);
continue;
}
if (null === ($url = ag($backend, 'url')) || false === isValidURL($url)) {
$this->logger->error('SYSTEM: [{backend}] Invalid url.', [
'backend' => $backendName,
'url' => $url ?? 'None',
]);
continue;
}
$backend['name'] = $backendName;
$list[$backendName] = $backend;
}
if (empty($list)) {
$this->logger->warning('SYSTEM: There are no backends with export enabled.');
return self::FAILURE;
}
foreach ($list as $name => &$backend) {
$opts = ag($backend, 'options', []);
if ($input->getOption('ignore-date')) {
$opts[Options::IGNORE_DATE] = true;
}
if ($input->getOption('dry-run')) {
$opts[Options::DRY_RUN] = true;
}
if ($input->getOption('trace')) {
$opts[Options::DEBUG_TRACE] = true;
}
$backend['options'] = $opts;
$backend['class'] = $this->getBackend(name: $name, config: $backend);
$backend['class']->push(entities: $entities, queue: $this->queue);
}
unset($backend);
$total = count($this->queue);
if ($total >= 1) {
$start = makeDate();
$this->logger->notice('SYSTEM: Sending [{total}] change play state requests.', [
'total' => $total,
'time' => [
'start' => $start,
],
]);
foreach ($this->queue->getQueue() as $response) {
$context = ag($response->getInfo('user_data'), 'context', []);
try {
if (200 !== $response->getStatusCode()) {
$this->logger->error(
'SYSTEM: Request to change [{backend}] [{item.title}] play state returned with unexpected [{status_code}] status code.',
$context
);
continue;
}
$this->logger->notice('SYSTEM: Marked [{backend}] [{item.title}] as [{play_state}].', $context);
} catch (Throwable $e) {
$this->logger->error(
message: 'SYSTEM: Exception [{error.kind}] was thrown unhandled during [{backend}] request to change play state of {item.type} [{item.title}]. Error [{error.message} @ {error.file}:{error.line}].',
context: [
'error' => [
'kind' => $e::class,
'line' => $e->getLine(),
'message' => $e->getMessage(),
'file' => after($e->getFile(), ROOT_PATH),
],
...$context,
'exception' => [
'file' => $e->getFile(),
'line' => $e->getLine(),
'kind' => get_class($e),
'message' => $e->getMessage(),
'trace' => $e->getTrace(),
],
]
);
}
}
$end = makeDate();
$this->logger->notice('SYSTEM: Sent [{total}] change play state requests.', [
'total' => $total,
'time' => [
'start' => $start,
'end' => $end,
'duration' => $end->getTimestamp() - $start->getTimestamp(),
],
]);
$this->logger->notice('Using WatchState Version - \'{version}\'.', ['version' => getAppVersion()]);
} else {
$this->logger->notice('SYSTEM: No play state changes detected.');
}
if (false === $input->getOption('keep') && false === $input->getOption('dry-run')) {
$this->cache->delete('queue');
}
return self::SUCCESS;
}
}

View File

@@ -27,6 +27,7 @@ use App\Libs\Response;
use App\Libs\Router;
use App\Libs\Stream;
use App\Libs\Uri;
use App\Listeners\ProcessPushEvent;
use App\Model\Events\Event as EventInfo;
use App\Model\Events\EventListener;
use App\Model\Events\EventsRepository;
@@ -536,47 +537,48 @@ if (!function_exists('httpClientChunks')) {
if (!function_exists('queuePush')) {
/**
* Pushes the entity to the queue.
*
* This method adds the entity to the queue for further processing.
* Add push event to the events queue.
*
* @param iState $entity The entity to push to the queue.
* @param bool $remove (optional) Whether to remove the entity from the queue if it already exists (default is false).
* @param bool $remove Whether to remove the event from the queue if it's in pending state. Default is false.
*/
function queuePush(iState $entity, bool $remove = false): void
{
if (!$remove && !$entity->hasGuids() && !$entity->hasRelativeGuid()) {
$logger = Container::get(iLogger::class);
if (false === (bool)Config::get('push.enabled', false)) {
$logger->error("Push is disabled. Unable to push '{via}: {entity}'.", [
'via' => $entity->via,
'entity' => $entity->getName()
]);
return;
}
try {
$cache = Container::get(iCache::class);
$list = $cache->get('queue', []);
if (true === $remove && array_key_exists($entity->id, $list)) {
unset($list[$entity->id]);
} else {
$list[$entity->id] = ['id' => $entity->id];
}
$cache->set('queue', $list, new DateInterval('P7D'));
} catch (\Psr\SimpleCache\InvalidArgumentException $e) {
Container::get(iLogger::class)->error(
message: 'Exception [{error.kind}] was thrown unhandled during saving [{backend} - {title}} into queue. Error [{error.message} @ {error.file}:{error.line}].',
context: [
'backend' => $entity->via,
'title' => $entity->getName(),
'error' => [
'kind' => $e::class,
'line' => $e->getLine(),
'message' => $e->getMessage(),
'file' => after($e->getFile(), ROOT_PATH),
],
'trace' => $e->getTrace(),
],
);
if (!$entity->id) {
$logger->error("Unable to push event '{via}: {entity}'. It has no local id yet.", [
'via' => $entity->via,
'entity' => $entity->getName()
]);
return;
}
if (true === $remove) {
Container::get(EventsRepository::class)->removeByReference(r('push://{id}', ['id' => $entity->id]));
return;
}
if (!$entity->hasGuids() && !$entity->hasRelativeGuid()) {
$logger->error("Unable to push '{id}' event '{via}: {entity}'. It has no GUIDs.", [
'id' => $entity->id,
'via' => $entity->via,
'entity' => $entity->getName()
]);
return;
}
queueEvent(ProcessPushEvent::NAME, [iState::COLUMN_ID => $entity->id], [
EventsTable::COLUMN_REFERENCE => r('push://{id}', ['id' => $entity->id]),
]);
}
}

View File

@@ -72,14 +72,15 @@ final readonly class ProcessProgressEvent
$type = strtolower(ag($backend, 'type', 'unknown'));
if (true !== (bool)ag($backend, 'export.enabled')) {
$writer(Level::Notice, "SYSTEM: Export to '{backend}' is disabled by user.", [
$writer(Level::Notice, "Export to '{backend}' is disabled by user.", [
'backend' => $backendName
]);
continue;
}
if (!isset($supported[$type])) {
$writer(Level::Error, "SYSTEM: '{backend}' Invalid type.", [
$writer(Level::Error, "The backend '{backend}' is using invalid type '{type}'.", [
'type' => $type,
'backend' => $backendName,
'condition' => [
'expected' => implode(', ', array_keys($supported)),
@@ -90,7 +91,7 @@ final readonly class ProcessProgressEvent
}
if (null === ($url = ag($backend, 'url')) || false === isValidURL($url)) {
$writer(Level::Error, "SYSTEM: '{backend}' Invalid url.", [
$writer(Level::Error, "The backend '{backend}' URL is invalid.", [
'backend' => $backendName,
'url' => $url ?? 'None',
]);
@@ -102,7 +103,7 @@ final readonly class ProcessProgressEvent
}
if (empty($list)) {
$writer(Level::Info, 'SYSTEM: There are no backends with export enabled.');
$writer(Level::Error, 'There are no backends with export enabled.');
return $e;
}
@@ -128,7 +129,7 @@ final readonly class ProcessProgressEvent
} catch (UnexpectedVersionException|NotImplementedException $e) {
$writer(
Level::Notice,
"SYSTEM: This feature is not available for '{backend}'. '{error.message}' at '{error.file}:{error.line}'.",
"This feature is not available for '{backend}'. '{error.message}' at '{error.file}:{error.line}'.",
[
'backend' => $name,
'error' => [
@@ -149,7 +150,7 @@ final readonly class ProcessProgressEvent
} catch (Throwable $e) {
$writer(
Level::Error,
"SYSTEM: Exception '{error.kind}' was thrown unhandled during '{backend}' request to sync progress. '{error.message}' at '{error.file}:{error.line}'.",
"Exception '{error.kind}' was thrown unhandled during '{backend}' request to sync progress. '{error.message}' at '{error.file}:{error.line}'.",
[
'backend' => $name,
'error' => [
@@ -172,83 +173,63 @@ final readonly class ProcessProgressEvent
unset($backend);
$total = count($this->queue);
if (count($this->queue) < 1) {
$writer(Level::Notice, "Backend handlers didn't queue items to be updated.");
return $e;
}
if ($total >= 1) {
$start = makeDate();
foreach ($this->queue->getQueue() as $response) {
$context = ag($response->getInfo('user_data'), 'context', []);
$writer(Level::Notice, "SYSTEM: Sending '{total}' progress update requests.", [
'total' => $total,
'time' => [
'start' => $start,
],
]);
foreach ($this->queue->getQueue() as $response) {
$context = ag($response->getInfo('user_data'), 'context', []);
try {
if (ag($options, 'trace')) {
$writer(Level::Debug, "Processing '{backend}' - '{item.title}' response.", [
'url' => ag($context, 'remote.url', '??'),
'status_code' => $response->getStatusCode(),
'headers' => $response->getHeaders(false),
'response' => $response->getContent(false),
...$context
]);
}
if (!in_array($response->getStatusCode(), [Status::OK->value, Status::NO_CONTENT->value])) {
$writer(
Level::Error,
"SYSTEM: Request to change '{backend}' '{item.title}' watch progress returned with unexpected '{status_code}' status code.",
[
'status_code' => $response->getStatusCode(),
...$context
]
);
continue;
}
$writer(Level::Notice, "SYSTEM: Updated '{backend}' '{item.title}' watch progress.", [
...$context,
try {
if (ag($options, 'trace')) {
$writer(Level::Debug, "Processing '{backend}: {item.title}' response.", [
'url' => ag($context, 'remote.url', '??'),
'status_code' => $response->getStatusCode(),
'headers' => $response->getHeaders(false),
'response' => $response->getContent(false),
...$context
]);
} catch (Throwable $e) {
}
if (!in_array($response->getStatusCode(), [Status::OK->value, Status::NO_CONTENT->value])) {
$writer(
Level::Error,
"SYSTEM: Exception '{error.kind}' was thrown unhandled during '{backend}' request to change watch progress of {item.type} '{item.title}'. '{error.message}' at '{error.file}:{error.line}'.",
"Request to change '{backend}: {item.title}' watch progress returned with unexpected '{status_code}' status code.",
[
'error' => [
'kind' => $e::class,
'line' => $e->getLine(),
'message' => $e->getMessage(),
'file' => after($e->getFile(), ROOT_PATH),
],
'exception' => [
'file' => $e->getFile(),
'line' => $e->getLine(),
'kind' => get_class($e),
'message' => $e->getMessage(),
'trace' => $e->getTrace(),
],
...$context,
'status_code' => $response->getStatusCode(),
...$context
]
);
continue;
}
}
$end = makeDate();
$writer(Level::Notice, "SYSTEM: Sent '{total}' watch progress requests.", [
'total' => $total,
'time' => [
'start' => $start,
'end' => $end,
'duration' => $end->getTimestamp() - $start->getTimestamp(),
],
]);
} else {
$writer(Level::Notice, 'SYSTEM: No watch progress changes detected.');
$writer(Level::Notice, "Updated '{backend}: {item.title}' watch progress.", [
...$context,
'status_code' => $response->getStatusCode(),
]);
} catch (Throwable $e) {
$writer(
Level::Error,
"Exception '{error.kind}' was thrown unhandled during '{backend}' request to change watch progress of {item.type} '{item.title}'. '{error.message}' at '{error.file}:{error.line}'.",
[
'error' => [
'kind' => $e::class,
'line' => $e->getLine(),
'message' => $e->getMessage(),
'file' => after($e->getFile(), ROOT_PATH),
],
'exception' => [
'file' => $e->getFile(),
'line' => $e->getLine(),
'kind' => get_class($e),
'message' => $e->getMessage(),
'trace' => $e->getTrace(),
],
...$context,
]
);
}
}
return $e;

View File

@@ -0,0 +1,183 @@
<?php
declare(strict_types=1);
namespace App\Listeners;
use App\Libs\Config;
use App\Libs\Container;
use App\Libs\Database\DatabaseInterface as iDB;
use App\Libs\Entity\StateInterface as iState;
use App\Libs\Enums\Http\Status;
use App\libs\Events\DataEvent;
use App\Libs\Options;
use App\Libs\QueueRequests;
use App\Model\Events\EventListener;
use Monolog\Level;
use Psr\Log\LoggerInterface as iLogger;
use Throwable;
#[EventListener(self::NAME)]
final readonly class ProcessPushEvent
{
public const string NAME = 'on_push';
/**
* Class constructor.
*
* @param iLogger $logger The logger object.
*/
public function __construct(private iLogger $logger, private iDB $db, private QueueRequests $queue)
{
set_time_limit(0);
ini_set('memory_limit', '-1');
}
public function __invoke(DataEvent $e): DataEvent
{
$writer = function (Level $level, string $message, array $context = []) use ($e) {
$e->addLog($level->getName() . ': ' . r($message, $context));
$this->logger->log($level, $message, $context);
};
$e->stopPropagation();
if (null === ($item = $this->db->get(Container::get(iState::class)::fromArray($e->getData())))) {
$writer(Level::Error, "Item '{id}' is not found or has been deleted.", [
'id' => ag($e->getData(), 'id', '?')
]);
return $e;
}
$options = $e->getOptions();
$list = [];
$supported = Config::get('supported', []);
foreach ((array)Config::get('servers', []) as $backendName => $backend) {
$type = strtolower(ag($backend, 'type', 'unknown'));
if (true !== (bool)ag($backend, 'export.enabled')) {
$writer(Level::Notice, "Export to '{backend}' is disabled by user.", [
'backend' => $backendName
]);
continue;
}
if (!isset($supported[$type])) {
$writer(Level::Error, "The backend '{backend}' is using invalid type '{type}'.", [
'type' => $type,
'backend' => $backendName,
'condition' => [
'expected' => implode(', ', array_keys($supported)),
'given' => $type,
],
]);
continue;
}
if (null === ($url = ag($backend, 'url')) || false === isValidURL($url)) {
$writer(Level::Error, "The backend '{backend}' URL is invalid.", [
'backend' => $backendName,
'url' => $url ?? 'None',
]);
continue;
}
$backend['name'] = $backendName;
$list[$backendName] = $backend;
}
if (empty($list)) {
$writer(Level::Error, 'There are no backends with export enabled.');
return $e;
}
foreach ($list as $name => &$backend) {
try {
$opts = ag($backend, 'options', []);
if (ag($options, 'ignore-date')) {
$opts[Options::IGNORE_DATE] = true;
}
if (ag($options, 'dry-run')) {
$opts[Options::DRY_RUN] = true;
}
if (ag($options, 'trace')) {
$opts[Options::DEBUG_TRACE] = true;
}
$backend['options'] = $opts;
$backend['class'] = getBackend(name: $name, config: $backend);
$backend['class']->push(entities: [$item->id => $item], queue: $this->queue);
} catch (Throwable $e) {
$writer(
Level::Error,
"Exception '{error.kind}' was thrown unhandled during '{backend}' push events. '{error.message}' at '{error.file}:{error.line}'.",
[
'backend' => $name,
'error' => [
'kind' => $e::class,
'line' => $e->getLine(),
'message' => $e->getMessage(),
'file' => after($e->getFile(), ROOT_PATH),
],
'exception' => [
'file' => $e->getFile(),
'line' => $e->getLine(),
'kind' => get_class($e),
'message' => $e->getMessage(),
'trace' => $e->getTrace(),
],
]
);
}
}
unset($backend);
if (count($this->queue) < 1) {
$writer(Level::Notice, 'SYSTEM: No play state changes detected.');
return $e;
}
foreach ($this->queue->getQueue() as $response) {
$context = ag($response->getInfo('user_data'), 'context', []);
try {
if (Status::OK !== Status::from($response->getStatusCode())) {
$writer(
Level::Error,
"Request to change '{backend}: {item.title}' play state returned with unexpected '{status_code}' status code.",
$context
);
continue;
}
$writer(Level::Notice, "Marked '{backend}: {item.title}' as '{play_state}'.", $context);
} catch (Throwable $e) {
$writer(
Level::Error,
"Exception '{error.kind}' was thrown unhandled during '{backend}' request to change play state of {item.type} '{item.title}'. '{error.message}' at '{error.file}:{error.line}'.",
[
'error' => [
'kind' => $e::class,
'line' => $e->getLine(),
'message' => $e->getMessage(),
'file' => after($e->getFile(), ROOT_PATH),
],
...$context,
'exception' => [
'file' => $e->getFile(),
'line' => $e->getLine(),
'kind' => get_class($e),
'message' => $e->getMessage(),
'trace' => $e->getTrace(),
],
]
);
}
}
return $e;
}
}

View File

@@ -41,7 +41,7 @@ final readonly class ProcessRequestEvent
$lastSync = makeDate($lastSync);
}
$message = r('SYSTEM: Processing [{backend}] [{title}] {tainted} request.', [
$message = r("Processing '{backend}: {title}' {tainted} request.", [
'backend' => $entity->via,
'title' => $entity->getName(),
'event' => ag($entity->getExtra($entity->via), iState::COLUMN_EXTRA_EVENT, '??'),

View File

@@ -49,6 +49,23 @@ final class EventsRepository
return $items[0] ?? null;
}
/**
* Will return the last event by reference.
*
* @param string|int $reference reference id to remove.
* @param array $criteria Filter criteria. By default, it will only remove pending events.
*/
public function removeByReference(string|int $reference, array $criteria = []): bool
{
if (empty($criteria)) {
$criteria[EntityTable::COLUMN_STATUS] = EventStatus::PENDING->value;
}
$criteria[EntityTable::COLUMN_REFERENCE] = $reference;
return $this->_remove($criteria);
}
/**
* @param array $criteria Criteria to search by.
* @param array $cols Columns to select.