diff --git a/FAQ.md b/FAQ.md
index 30693d39..5c56d4fc 100644
--- a/FAQ.md
+++ b/FAQ.md
@@ -402,7 +402,7 @@ command via CLI.
> [!IMPORTANT]
> for environment variables that has `{TASK}` tag, you **MUST** replace it with one
-> of `IMPORT`, `EXPORT`, `PUSH`, `BACKUP`, `PRUNE`, `INDEXES`. To see tasks active settings run
+> of `IMPORT`, `EXPORT`, `BACKUP`, `PRUNE`, `INDEXES`. To see tasks active settings run
```bash
$ docker exec -ti watchstate console system:tasks
diff --git a/NEWS.md b/NEWS.md
index a5231118..3f7cc7f0 100644
--- a/NEWS.md
+++ b/NEWS.md
@@ -1,5 +1,45 @@
# Old Updates
+### 2024-08-10
+
+I have recently added new experimental feature, to play your content directly from the WebUI. This feature is still in
+alpha, and missing a lot of features. But it's a start. Right now it does auto transcode on the fly to play any content in the browser.
+
+The feature requires that you mount your media directories to the `WatchState` container similar to the `File integrity` feature. I have plans to expand
+the feature to support more controls, however, right now it's only support basic subtitles streams and default audio stream or first audio stream.
+
+The transcoder works by converting the media on the fly to `HLS` segments, and the subtitles are selectable via the player ui which are also converted to `vtt` format.
+
+Expects bugs and issues, as the feature is still in alpha. But I would love to hear your feedback. You can play the media by visiting
+the history page of the item you will see red play button on top right corner of the page. If the items has a play button, then you correctly mounted
+the media directories. otherwise, the button be disabled with tooltip of `Media is inaccessible`.
+
+The feature is not meant to replace your backend media player, the purpose of this feature is to quickly check the media without leaving the WebUI.
+
+### 2024-08-01
+
+We recently enabled listening on tls connections via `8443` which can be controlled by `HTTPS_PORT` environment variable.
+Before today, we simply only exposed the port via the `Dockerfile`, but we weren't listening for connections on it.
+
+However, please keep in mind that the certificate is self-signed, and you might get a warning from your browser. You can
+either accept the warning or add the certificate to your trusted certificates. We strongly recommend using a reverse proxy.
+instead of relying on self-signed certificates.
+
+### 2024-07-22
+
+We have recently added a new WebUI feature, `File integrity`, this feature will help you to check if your media backends
+are reporting files that are not available on the disk. This feature is still in alpha, and we are working on improving
+it.
+
+This feature `REQUIRES` that you mount your media directories to the `WatchState` container preferably as readonly. There is plans to add
+a path replacement feature to allow you change the pathing, but it's not implemented yet.
+
+This feature will work on both local and remote cloud storages provided they are mounted into the container. We also may recommend not to
+use this feature depending on how your cloud storage provider treats file stat calls. As it might lead to unnecessary money spending. and of course
+it will be slower.
+
+For more information about how we cache the stat calls, please refer to the [FAQ](FAQ.md#How-does-the-file-integrity-feature-works).
+
### 2024-07-06
Recently we have introduced a new feature that allows you to use Jellyfin and Emby OAuth access tokens for syncing
diff --git a/README.md b/README.md
index 66a8fa34..78496270 100644
--- a/README.md
+++ b/README.md
@@ -9,6 +9,18 @@ out of the box, this tool support `Jellyfin`, `Plex` and `Emby` media servers.
## Updates
+### 2024-08-19
+
+We have migrated the `state:push` task into the new events system, as such the old task `state:push` is now gone.
+To enable the new event handler for push events, use the new environment variable `WS_PUSH_ENABLED` and set it to `true`.
+Right now, it's disabled by default. However, for people who had the old task enabled, it will reuse that setting.
+
+Keep in mind, the new event handler is more efficient and will only push data when there is a change in the play state. And it's much faster
+than the old task. This event handler will push data within a minute of the change.
+
+PS: Please enable the task by setting its new environment variable `WS_PUSH_ENABLED` to `true`. The old `WS_CRON_PUSH` is now gone.
+and will be removed in the future releases.
+
### 2024-08-18
We have started migrating the old events system to a new one, so far we have migrated the `progress` and `requests` to it. As such,
@@ -17,46 +29,6 @@ environment variable `WS_SYNC_PROGRESS` which you can set to `true` to enable th
We will continue to migrate the rest of the events to the new system, and we will keep you updated.
-### 2024-08-10
-
-I have recently added new experimental feature, to play your content directly from the WebUI. This feature is still in
-alpha, and missing a lot of features. But it's a start. Right now it does auto transcode on the fly to play any content in the browser.
-
-The feature requires that you mount your media directories to the `WatchState` container similar to the `File integrity` feature. I have plans to expand
-the feature to support more controls, however, right now it's only support basic subtitles streams and default audio stream or first audio stream.
-
-The transcoder works by converting the media on the fly to `HLS` segments, and the subtitles are selectable via the player ui which are also converted to `vtt` format.
-
-Expects bugs and issues, as the feature is still in alpha. But I would love to hear your feedback. You can play the media by visiting
-the history page of the item you will see red play button on top right corner of the page. If the items has a play button, then you correctly mounted
-the media directories. otherwise, the button be disabled with tooltip of `Media is inaccessible`.
-
-The feature is not meant to replace your backend media player, the purpose of this feature is to quickly check the media without leaving the WebUI.
-
-### 2024-08-01
-
-We recently enabled listening on tls connections via `8443` which can be controlled by `HTTPS_PORT` environment variable.
-Before today, we simply only exposed the port via the `Dockerfile`, but we weren't listening for connections on it.
-
-However, please keep in mind that the certificate is self-signed, and you might get a warning from your browser. You can
-either accept the warning or add the certificate to your trusted certificates. We strongly recommend using a reverse proxy.
-instead of relying on self-signed certificates.
-
-### 2024-07-22
-
-We have recently added a new WebUI feature, `File integrity`, this feature will help you to check if your media backends
-are reporting files that are not available on the disk. This feature is still in alpha, and we are working on improving
-it.
-
-This feature `REQUIRES` that you mount your media directories to the `WatchState` container preferably as readonly. There is plans to add
-a path replacement feature to allow you change the pathing, but it's not implemented yet.
-
-This feature will work on both local and remote cloud storages provided they are mounted into the container. We also may recommend not to
-use this feature depending on how your cloud storage provider treats file stat calls. As it might lead to unnecessary money spending. and of course
-it will be slower.
-
-For more information about how we cache the stat calls, please refer to the [FAQ](FAQ.md#How-does-the-file-integrity-feature-works).
-
Refer to [NEWS](NEWS.md) for old updates.
# Features
diff --git a/config/config.php b/config/config.php
index 0ef1eb14..08d3a273 100644
--- a/config/config.php
+++ b/config/config.php
@@ -9,7 +9,6 @@ use App\Commands\Events\DispatchCommand;
use App\Commands\State\BackupCommand;
use App\Commands\State\ExportCommand;
use App\Commands\State\ImportCommand;
-use App\Commands\State\PushCommand;
use App\Commands\System\IndexCommand;
use App\Commands\System\PruneCommand;
use App\Libs\Mappers\Import\MemoryMapper;
@@ -75,7 +74,10 @@ return (function () {
'header' => (string)env('WS_TRUST_HEADER', 'X-Forwarded-For'),
],
'sync' => [
- 'progress' => (bool)env('WS_SYNC_PROGRESS', false),
+ 'progress' => (bool)env('WS_SYNC_PROGRESS', (bool)env('WS_CRON_PROGRESS', false)),
+ ],
+ 'push' => [
+ 'enabled' => (bool)env('WS_PUSH_ENABLED', (bool)env('WS_CRON_PUSH', false)),
],
];
@@ -266,14 +268,6 @@ return (function () {
'timer' => $checkTaskTimer((string)env('WS_CRON_EXPORT_AT', '30 */1 * * *'), '30 */1 * * *'),
'args' => env('WS_CRON_EXPORT_ARGS', '-v'),
],
- PushCommand::TASK_NAME => [
- 'command' => PushCommand::ROUTE,
- 'name' => PushCommand::TASK_NAME,
- 'info' => 'Send queued events to backends.',
- 'enabled' => (bool)env('WS_CRON_PUSH', true),
- 'timer' => $checkTaskTimer((string)env('WS_CRON_PUSH_AT', '*/10 * * * *'), '*/10 * * * *'),
- 'args' => env('WS_CRON_PUSH_ARGS', '-v'),
- ],
BackupCommand::TASK_NAME => [
'command' => BackupCommand::ROUTE,
'name' => BackupCommand::TASK_NAME,
diff --git a/config/env.spec.php b/config/env.spec.php
index e7b0fc8f..ea0d5381 100644
--- a/config/env.spec.php
+++ b/config/env.spec.php
@@ -166,6 +166,11 @@ return (function () {
'description' => 'Enable watch progress sync.',
'type' => 'bool',
],
+ [
+ 'key' => 'WS_PUSH_ENABLED',
+ 'description' => 'Enable Push play state to backends. This feature depends on webhooks being enabled.',
+ 'type' => 'bool',
+ ],
];
$validateCronExpression = function (string $value): string {
@@ -191,7 +196,7 @@ return (function () {
};
// -- Do not forget to update the tasks list if you add a new task.
- $tasks = ['import', 'export', 'push', 'backup', 'prune', 'indexes'];
+ $tasks = ['import', 'export', 'backup', 'prune', 'indexes'];
$task_env = [
[
'key' => 'WS_CRON_{TASK}',
diff --git a/frontend/layouts/default.vue b/frontend/layouts/default.vue
index a028be0d..24e3f550 100644
--- a/frontend/layouts/default.vue
+++ b/frontend/layouts/default.vue
@@ -83,11 +83,6 @@
Events
- changeRoute(e)">
-
- Old Events
-
-
changeRoute(e)">
Ignore List
diff --git a/frontend/pages/events/index.vue b/frontend/pages/events/index.vue
index 03b42aa6..d8798430 100644
--- a/frontend/pages/events/index.vue
+++ b/frontend/pages/events/index.vue
@@ -39,13 +39,14 @@
-
- Event {{ item.event }} was created at
+ Event {{ item.event }}
+
+ with reference {{ item.reference }}
+
+ was created
{{ moment(item.created_at).fromNow() }}
- , and last updated at
+ , and last updated
not started
{{ moment(item.updated_at).fromNow() }}
- .
+ ,
with status of {{ item.status }}:
{{ item.status_name }}.
-
- This page will show events that are queued to be handled or sent to the backends.
- This endpoint is being deprecated and will be removed in the future, We are migrating to the new
- events endpoint.
-
-
-
-
-
-
-
-
-
-
-
-
-
-
- State events
-
-
- Events that are changing the play state. Consumed by state:push task.
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
- {{ formatDuration(i.item.progress) }}
-
-
-
-
-
-
-
-
-
-
-
-
-
-
- Watch progress events
-
-
- Events that are changing the play progress. Consumed by state:progress task.
-
- Events marked with is Tainted: Yes, are interesting but are too chaotic to be useful be used
- to
- determine play state. However, we do use them to update local metadata & play progress.
-
-
- Events marked with is Tainted: No, are events that are used to determine play state.
-
-
- If you are fast enough, you might be able to see the event before it is consumed by the backend. which
- allow
- you to delete it from the queue if you desire.
-
-
-
-
-
-
-
-
-
diff --git a/frontend/utils/events/helpers.js b/frontend/utils/events/helpers.js
index 0cca5faf..570bb3e0 100644
--- a/frontend/utils/events/helpers.js
+++ b/frontend/utils/events/helpers.js
@@ -3,7 +3,7 @@ const makeName = id => id.split('-').slice(0)[0]
const getStatusClass = status => {
switch (status) {
case 0:
- return 'is-light'
+ return 'is-light has-text-dark'
case 1:
return 'is-warning'
case 2:
@@ -13,7 +13,7 @@ const getStatusClass = status => {
case 4:
return 'is-danger is-light'
default:
- return 'is-light'
+ return 'is-light has-text-dark'
}
}
diff --git a/src/API/System/OldEvents.php b/src/API/System/OldEvents.php
deleted file mode 100644
index f971e09c..00000000
--- a/src/API/System/OldEvents.php
+++ /dev/null
@@ -1,119 +0,0 @@
- [],
- 'progress' => [],
- 'requests' => [],
- ];
-
- foreach ($this->cache->get('queue', []) as $key => $item) {
- if (null === ($entity = $this->db->get(Container::get(iState::class)::fromArray($item)))) {
- continue;
- }
- $response['queue'][] = ['key' => $key, 'item' => $this->formatEntity($entity)];
- }
-
- foreach ($this->cache->get('progress', []) as $key => $item) {
- if (null !== ($entity = $this->db->get($item))) {
- $item->id = $entity->id;
- }
-
- $response['progress'][] = ['key' => $key, 'item' => $this->formatEntity($item)];
- }
-
- foreach ($this->cache->get('requests', []) as $key => $request) {
- if (null === ($item = ag($request, 'entity')) || false === ($item instanceof iState)) {
- continue;
- }
-
- if (null !== ($entity = $this->db->get($item))) {
- $item->id = $entity->id;
- }
-
- $response['requests'][] = ['key' => $key, 'item' => $this->formatEntity($item)];
- }
-
- return api_response(Status::OK, $response);
- }
-
- /**
- * @throws InvalidArgumentException
- */
- #[Delete(self::URL . '/{id}[/]', name: 'system.events.delete')]
- public function deleteEvent(iRequest $request, array $args = []): iResponse
- {
- $params = DataUtil::fromRequest($request, true);
-
- if (null === ($id = $params->get('id', ag($args, 'id')))) {
- return api_error('Invalid id.', Status::BAD_REQUEST);
- }
-
- $type = $params->get('type', 'queue');
-
- if (false === in_array($type, self::TYPES, true)) {
- return api_error(r("Invalid type '{type}'. Only '{types}' are supported.", [
- 'type' => $type,
- 'types' => implode(", ", self::TYPES),
- ]), Status::BAD_REQUEST);
- }
-
- $items = $this->cache->get($type, []);
-
- if (empty($items)) {
- return api_error(r('{type} is empty.', ['type' => $type]), Status::NOT_FOUND);
- }
-
- if (false === array_key_exists($id, $items)) {
- return api_error(r("Record id '{id}' doesn't exists. for '{type}' list.", [
- 'id' => $id,
- 'type' => $type,
- ]), Status::NOT_FOUND);
- }
-
- if ('queue' === $type) {
- $item = Container::get(iState::class)::fromArray(['id' => $id]);
- queuePush($item, remove: true);
- } else {
- unset($items[$id]);
- $this->cache->set($type, $items, new DateInterval('P3D'));
- }
-
- return api_response(Status::OK, ['id' => $id, 'type' => $type, 'status' => 'deleted']);
- }
-}
diff --git a/src/Commands/Events/DispatchCommand.php b/src/Commands/Events/DispatchCommand.php
index 9d136f1e..a43eb7bc 100644
--- a/src/Commands/Events/DispatchCommand.php
+++ b/src/Commands/Events/DispatchCommand.php
@@ -22,7 +22,7 @@ use Throwable;
#[Cli(command: self::ROUTE)]
final class DispatchCommand extends Command
{
- public const string TASK_NAME = 'Dispatch';
+ public const string TASK_NAME = 'dispatch';
public const string ROUTE = 'events:dispatch';
diff --git a/src/Commands/State/PushCommand.php b/src/Commands/State/PushCommand.php
deleted file mode 100644
index 44de4c1d..00000000
--- a/src/Commands/State/PushCommand.php
+++ /dev/null
@@ -1,272 +0,0 @@
-setName(self::ROUTE)
- ->setDescription('Push webhook queued events.')
- ->addOption('keep', 'k', InputOption::VALUE_NONE, 'Do not expunge queue after run is complete.')
- ->addOption('dry-run', null, InputOption::VALUE_NONE, 'Do not commit changes to backends.')
- ->addOption('ignore-date', null, InputOption::VALUE_NONE, 'Ignore date comparison.')
- ->setHelp(
- r(
- <<webhook updated play state to export enabled backends.
- You should not run this manually and instead rely on scheduled task to run this command.
-
- This command require the metadata to be already saved in database.
- If no metadata available for a backend, then the item will be ignored for that backend.
-
- If the item was ignored during {route} run, it will be picked up later by next {export_route} run.
-
- HELP,
- [
- 'cmd' => trim(commandContext()),
- 'route' => self::ROUTE,
- 'export_route' => ExportCommand::ROUTE,
- ]
- )
- );
- }
-
- /**
- * Make sure the command is not running in parallel.
- *
- * @param InputInterface $input The input interface.
- * @param OutputInterface $output The output interface.
- *
- * @return int Returns the process result status code.
- * @throws \Psr\SimpleCache\InvalidArgumentException if the cache key is not a legal value.
- */
- protected function runCommand(InputInterface $input, OutputInterface $output): int
- {
- return $this->single(fn(): int => $this->process($input), $output);
- }
-
- /**
- * Process the queue items and send change play state requests to the supported backends.
- *
- * @param InputInterface $input The input interface.
- *
- * @return int Returns the process result status code.
- * @throws \Psr\SimpleCache\InvalidArgumentException if the cache key is not a legal value.
- */
- protected function process(InputInterface $input): int
- {
- if (!$this->cache->has('queue')) {
- $this->logger->info('No items in the queue.');
- return self::SUCCESS;
- }
-
- $entities = $items = [];
-
- foreach ($this->cache->get('queue', []) as $item) {
- $items[] = Container::get(iState::class)::fromArray($item);
- }
-
- if (!empty($items)) {
- foreach ($this->db->find(...$items) as $item) {
- $entities[$item->id] = $item;
- }
- }
-
- $items = null;
-
- if (empty($entities)) {
- $this->cache->delete('queue');
- $this->logger->debug('No items in the queue.');
- return self::SUCCESS;
- }
-
- $list = [];
- $supported = Config::get('supported', []);
-
- foreach ((array)Config::get('servers', []) as $backendName => $backend) {
- $type = strtolower(ag($backend, 'type', 'unknown'));
-
- if (true !== (bool)ag($backend, 'export.enabled')) {
- $this->logger->info('SYSTEM: Export to [{backend}] is disabled by user.', [
- 'backend' => $backendName,
- ]);
-
- continue;
- }
-
- if (!isset($supported[$type])) {
- $this->logger->error('SYSTEM: [{backend}] Invalid type.', [
- 'backend' => $backendName,
- 'condition' => [
- 'expected' => implode(', ', array_keys($supported)),
- 'given' => $type,
- ],
- ]);
- continue;
- }
-
- if (null === ($url = ag($backend, 'url')) || false === isValidURL($url)) {
- $this->logger->error('SYSTEM: [{backend}] Invalid url.', [
- 'backend' => $backendName,
- 'url' => $url ?? 'None',
- ]);
- continue;
- }
-
- $backend['name'] = $backendName;
- $list[$backendName] = $backend;
- }
-
- if (empty($list)) {
- $this->logger->warning('SYSTEM: There are no backends with export enabled.');
- return self::FAILURE;
- }
-
- foreach ($list as $name => &$backend) {
- $opts = ag($backend, 'options', []);
-
- if ($input->getOption('ignore-date')) {
- $opts[Options::IGNORE_DATE] = true;
- }
-
- if ($input->getOption('dry-run')) {
- $opts[Options::DRY_RUN] = true;
- }
-
- if ($input->getOption('trace')) {
- $opts[Options::DEBUG_TRACE] = true;
- }
-
- $backend['options'] = $opts;
- $backend['class'] = $this->getBackend(name: $name, config: $backend);
-
- $backend['class']->push(entities: $entities, queue: $this->queue);
- }
-
- unset($backend);
-
- $total = count($this->queue);
-
- if ($total >= 1) {
- $start = makeDate();
- $this->logger->notice('SYSTEM: Sending [{total}] change play state requests.', [
- 'total' => $total,
- 'time' => [
- 'start' => $start,
- ],
- ]);
-
- foreach ($this->queue->getQueue() as $response) {
- $context = ag($response->getInfo('user_data'), 'context', []);
-
- try {
- if (200 !== $response->getStatusCode()) {
- $this->logger->error(
- 'SYSTEM: Request to change [{backend}] [{item.title}] play state returned with unexpected [{status_code}] status code.',
- $context
- );
- continue;
- }
-
- $this->logger->notice('SYSTEM: Marked [{backend}] [{item.title}] as [{play_state}].', $context);
- } catch (Throwable $e) {
- $this->logger->error(
- message: 'SYSTEM: Exception [{error.kind}] was thrown unhandled during [{backend}] request to change play state of {item.type} [{item.title}]. Error [{error.message} @ {error.file}:{error.line}].',
- context: [
- 'error' => [
- 'kind' => $e::class,
- 'line' => $e->getLine(),
- 'message' => $e->getMessage(),
- 'file' => after($e->getFile(), ROOT_PATH),
- ],
- ...$context,
- 'exception' => [
- 'file' => $e->getFile(),
- 'line' => $e->getLine(),
- 'kind' => get_class($e),
- 'message' => $e->getMessage(),
- 'trace' => $e->getTrace(),
- ],
- ]
- );
- }
- }
-
- $end = makeDate();
-
- $this->logger->notice('SYSTEM: Sent [{total}] change play state requests.', [
- 'total' => $total,
- 'time' => [
- 'start' => $start,
- 'end' => $end,
- 'duration' => $end->getTimestamp() - $start->getTimestamp(),
- ],
- ]);
-
- $this->logger->notice('Using WatchState Version - \'{version}\'.', ['version' => getAppVersion()]);
- } else {
- $this->logger->notice('SYSTEM: No play state changes detected.');
- }
-
- if (false === $input->getOption('keep') && false === $input->getOption('dry-run')) {
- $this->cache->delete('queue');
- }
-
- return self::SUCCESS;
- }
-}
diff --git a/src/Libs/Database/PDO/PDOAdapter.php b/src/Libs/Database/PDO/PDOAdapter.php
index ee000d55..8b8a982f 100644
--- a/src/Libs/Database/PDO/PDOAdapter.php
+++ b/src/Libs/Database/PDO/PDOAdapter.php
@@ -609,15 +609,24 @@ final class PDOAdapter implements iDB
/**
* @inheritdoc
+ * @noinspection SqlWithoutWhere
*/
public function reset(): bool
{
- $this->pdo->exec('DELETE FROM `state` WHERE `id` > 0');
+ $this->pdo->beginTransaction();
- if ('sqlite' === $this->pdo->getAttribute(PDO::ATTR_DRIVER_NAME)) {
- $this->pdo->exec('DELETE FROM sqlite_sequence WHERE name = "state"');
+ $tables = $this->pdo->query(
+ 'SELECT name FROM sqlite_master WHERE "type" = "table" AND "name" NOT LIKE "sqlite_%"'
+ );
+
+ foreach ($tables->fetchAll(PDO::FETCH_COLUMN) as $table) {
+ $this->pdo->exec('DELETE FROM "' . $table . '"');
+ $this->pdo->exec('DELETE FROM sqlite_sequence WHERE "name" = "' . $table . '"');
}
+ $this->pdo->commit();
+ $this->pdo->exec('VACUUM');
+
return true;
}
diff --git a/src/Libs/helpers.php b/src/Libs/helpers.php
index f9de11cc..b5805ce0 100644
--- a/src/Libs/helpers.php
+++ b/src/Libs/helpers.php
@@ -27,6 +27,7 @@ use App\Libs\Response;
use App\Libs\Router;
use App\Libs\Stream;
use App\Libs\Uri;
+use App\Listeners\ProcessPushEvent;
use App\Model\Events\Event as EventInfo;
use App\Model\Events\EventListener;
use App\Model\Events\EventsRepository;
@@ -536,47 +537,48 @@ if (!function_exists('httpClientChunks')) {
if (!function_exists('queuePush')) {
/**
- * Pushes the entity to the queue.
- *
- * This method adds the entity to the queue for further processing.
+ * Add push event to the events queue.
*
* @param iState $entity The entity to push to the queue.
- * @param bool $remove (optional) Whether to remove the entity from the queue if it already exists (default is false).
+ * @param bool $remove Whether to remove the event from the queue if it's in pending state. Default is false.
*/
function queuePush(iState $entity, bool $remove = false): void
{
- if (!$remove && !$entity->hasGuids() && !$entity->hasRelativeGuid()) {
+ $logger = Container::get(iLogger::class);
+
+ if (false === (bool)Config::get('push.enabled', false)) {
+ $logger->debug("Push is disabled. Unable to push '{via}: {entity}'.", [
+ 'via' => $entity->via,
+ 'entity' => $entity->getName()
+ ]);
return;
}
- try {
- $cache = Container::get(iCache::class);
-
- $list = $cache->get('queue', []);
-
- if (true === $remove && array_key_exists($entity->id, $list)) {
- unset($list[$entity->id]);
- } else {
- $list[$entity->id] = ['id' => $entity->id];
- }
-
- $cache->set('queue', $list, new DateInterval('P7D'));
- } catch (\Psr\SimpleCache\InvalidArgumentException $e) {
- Container::get(iLogger::class)->error(
- message: 'Exception [{error.kind}] was thrown unhandled during saving [{backend} - {title}} into queue. Error [{error.message} @ {error.file}:{error.line}].',
- context: [
- 'backend' => $entity->via,
- 'title' => $entity->getName(),
- 'error' => [
- 'kind' => $e::class,
- 'line' => $e->getLine(),
- 'message' => $e->getMessage(),
- 'file' => after($e->getFile(), ROOT_PATH),
- ],
- 'trace' => $e->getTrace(),
- ],
- );
+ if (!$entity->id) {
+ $logger->error("Unable to push event '{via}: {entity}'. It has no local id yet.", [
+ 'via' => $entity->via,
+ 'entity' => $entity->getName()
+ ]);
+ return;
}
+
+ if (true === $remove) {
+ Container::get(EventsRepository::class)->removeByReference(r('push://{id}', ['id' => $entity->id]));
+ return;
+ }
+
+ if (!$entity->hasGuids() && !$entity->hasRelativeGuid()) {
+ $logger->error("Unable to push '{id}' event '{via}: {entity}'. It has no GUIDs.", [
+ 'id' => $entity->id,
+ 'via' => $entity->via,
+ 'entity' => $entity->getName()
+ ]);
+ return;
+ }
+
+ queueEvent(ProcessPushEvent::NAME, [iState::COLUMN_ID => $entity->id], [
+ EventsTable::COLUMN_REFERENCE => r('push://{id}', ['id' => $entity->id]),
+ ]);
}
}
diff --git a/src/Listeners/ProcessProgressEvent.php b/src/Listeners/ProcessProgressEvent.php
index 88007efa..d3c2e7e2 100644
--- a/src/Listeners/ProcessProgressEvent.php
+++ b/src/Listeners/ProcessProgressEvent.php
@@ -72,14 +72,15 @@ final readonly class ProcessProgressEvent
$type = strtolower(ag($backend, 'type', 'unknown'));
if (true !== (bool)ag($backend, 'export.enabled')) {
- $writer(Level::Notice, "SYSTEM: Export to '{backend}' is disabled by user.", [
+ $writer(Level::Notice, "Export to '{backend}' is disabled by user.", [
'backend' => $backendName
]);
continue;
}
if (!isset($supported[$type])) {
- $writer(Level::Error, "SYSTEM: '{backend}' Invalid type.", [
+ $writer(Level::Error, "The backend '{backend}' is using invalid type '{type}'.", [
+ 'type' => $type,
'backend' => $backendName,
'condition' => [
'expected' => implode(', ', array_keys($supported)),
@@ -90,7 +91,7 @@ final readonly class ProcessProgressEvent
}
if (null === ($url = ag($backend, 'url')) || false === isValidURL($url)) {
- $writer(Level::Error, "SYSTEM: '{backend}' Invalid url.", [
+ $writer(Level::Error, "The backend '{backend}' URL is invalid.", [
'backend' => $backendName,
'url' => $url ?? 'None',
]);
@@ -102,7 +103,7 @@ final readonly class ProcessProgressEvent
}
if (empty($list)) {
- $writer(Level::Info, 'SYSTEM: There are no backends with export enabled.');
+ $writer(Level::Error, 'There are no backends with export enabled.');
return $e;
}
@@ -128,7 +129,7 @@ final readonly class ProcessProgressEvent
} catch (UnexpectedVersionException|NotImplementedException $e) {
$writer(
Level::Notice,
- "SYSTEM: This feature is not available for '{backend}'. '{error.message}' at '{error.file}:{error.line}'.",
+ "This feature is not available for '{backend}'. '{error.message}' at '{error.file}:{error.line}'.",
[
'backend' => $name,
'error' => [
@@ -149,7 +150,7 @@ final readonly class ProcessProgressEvent
} catch (Throwable $e) {
$writer(
Level::Error,
- "SYSTEM: Exception '{error.kind}' was thrown unhandled during '{backend}' request to sync progress. '{error.message}' at '{error.file}:{error.line}'.",
+ "Exception '{error.kind}' was thrown unhandled during '{backend}' request to sync progress. '{error.message}' at '{error.file}:{error.line}'.",
[
'backend' => $name,
'error' => [
@@ -172,83 +173,73 @@ final readonly class ProcessProgressEvent
unset($backend);
- $total = count($this->queue);
+ if (count($this->queue) < 1) {
+ $writer(Level::Notice, "Backend handlers didn't queue items to be updated.");
+ return $e;
+ }
- if ($total >= 1) {
- $start = makeDate();
+ $progress = formatDuration($item->getPlayProgress());
- $writer(Level::Notice, "SYSTEM: Sending '{total}' progress update requests.", [
- 'total' => $total,
- 'time' => [
- 'start' => $start,
- ],
- ]);
+ $writer(Level::Notice, "Processing '{id}' - '{via}: {title}' watch progress '{progress}' event.", [
+ 'id' => $item->id,
+ 'via' => $item->via,
+ 'title' => $item->getName(),
+ 'progress' => $progress,
+ ]);
- foreach ($this->queue->getQueue() as $response) {
- $context = ag($response->getInfo('user_data'), 'context', []);
+ foreach ($this->queue->getQueue() as $response) {
+ $context = ag($response->getInfo('user_data'), 'context', []);
- try {
- if (ag($options, 'trace')) {
- $writer(Level::Debug, "Processing '{backend}' - '{item.title}' response.", [
- 'url' => ag($context, 'remote.url', '??'),
- 'status_code' => $response->getStatusCode(),
- 'headers' => $response->getHeaders(false),
- 'response' => $response->getContent(false),
- ...$context
- ]);
- }
-
- if (!in_array($response->getStatusCode(), [Status::OK->value, Status::NO_CONTENT->value])) {
- $writer(
- Level::Error,
- "SYSTEM: Request to change '{backend}' '{item.title}' watch progress returned with unexpected '{status_code}' status code.",
- [
- 'status_code' => $response->getStatusCode(),
- ...$context
- ]
- );
- continue;
- }
-
- $writer(Level::Notice, "SYSTEM: Updated '{backend}' '{item.title}' watch progress.", [
- ...$context,
+ try {
+ if (ag($options, 'trace')) {
+ $writer(Level::Debug, "Processing '{backend}: {item.title}' response.", [
+ 'url' => ag($context, 'remote.url', '??'),
'status_code' => $response->getStatusCode(),
+ 'headers' => $response->getHeaders(false),
+ 'response' => $response->getContent(false),
+ ...$context
]);
- } catch (Throwable $e) {
+ }
+
+ if (!in_array($response->getStatusCode(), [Status::OK->value, Status::NO_CONTENT->value])) {
$writer(
Level::Error,
- "SYSTEM: Exception '{error.kind}' was thrown unhandled during '{backend}' request to change watch progress of {item.type} '{item.title}'. '{error.message}' at '{error.file}:{error.line}'.",
+ "Request to change '{backend}: {item.title}' watch progress returned with unexpected '{status_code}' status code.",
[
- 'error' => [
- 'kind' => $e::class,
- 'line' => $e->getLine(),
- 'message' => $e->getMessage(),
- 'file' => after($e->getFile(), ROOT_PATH),
- ],
- 'exception' => [
- 'file' => $e->getFile(),
- 'line' => $e->getLine(),
- 'kind' => get_class($e),
- 'message' => $e->getMessage(),
- 'trace' => $e->getTrace(),
- ],
- ...$context,
+ 'status_code' => $response->getStatusCode(),
+ ...$context
]
);
+ continue;
}
- }
- $end = makeDate();
- $writer(Level::Notice, "SYSTEM: Sent '{total}' watch progress requests.", [
- 'total' => $total,
- 'time' => [
- 'start' => $start,
- 'end' => $end,
- 'duration' => $end->getTimestamp() - $start->getTimestamp(),
- ],
- ]);
- } else {
- $writer(Level::Notice, 'SYSTEM: No watch progress changes detected.');
+ $writer(Level::Notice, "Updated '{backend}: {item.title}' watch progress to '{progress}'.", [
+ ...$context,
+ 'progress' => $progress,
+ 'status_code' => $response->getStatusCode(),
+ ]);
+ } catch (Throwable $e) {
+ $writer(
+ Level::Error,
+ "Exception '{error.kind}' was thrown unhandled during '{backend}' request to change watch progress of {item.type} '{item.title}'. '{error.message}' at '{error.file}:{error.line}'.",
+ [
+ 'error' => [
+ 'kind' => $e::class,
+ 'line' => $e->getLine(),
+ 'message' => $e->getMessage(),
+ 'file' => after($e->getFile(), ROOT_PATH),
+ ],
+ 'exception' => [
+ 'file' => $e->getFile(),
+ 'line' => $e->getLine(),
+ 'kind' => get_class($e),
+ 'message' => $e->getMessage(),
+ 'trace' => $e->getTrace(),
+ ],
+ ...$context,
+ ]
+ );
+ }
}
return $e;
diff --git a/src/Listeners/ProcessPushEvent.php b/src/Listeners/ProcessPushEvent.php
new file mode 100644
index 00000000..2bbf5fc6
--- /dev/null
+++ b/src/Listeners/ProcessPushEvent.php
@@ -0,0 +1,190 @@
+addLog($level->getName() . ': ' . r($message, $context));
+ $this->logger->log($level, $message, $context);
+ };
+
+ $e->stopPropagation();
+
+ if (null === ($item = $this->db->get(Container::get(iState::class)::fromArray($e->getData())))) {
+ $writer(Level::Error, "Item '{id}' is not found or has been deleted.", [
+ 'id' => ag($e->getData(), 'id', '?')
+ ]);
+ return $e;
+ }
+
+ $options = $e->getOptions();
+ $list = [];
+ $supported = Config::get('supported', []);
+
+ foreach ((array)Config::get('servers', []) as $backendName => $backend) {
+ $type = strtolower(ag($backend, 'type', 'unknown'));
+
+ if (true !== (bool)ag($backend, 'export.enabled')) {
+ $writer(Level::Notice, "Export to '{backend}' is disabled by user.", [
+ 'backend' => $backendName
+ ]);
+ continue;
+ }
+
+ if (!isset($supported[$type])) {
+ $writer(Level::Error, "The backend '{backend}' is using invalid type '{type}'.", [
+ 'type' => $type,
+ 'backend' => $backendName,
+ 'condition' => [
+ 'expected' => implode(', ', array_keys($supported)),
+ 'given' => $type,
+ ],
+ ]);
+ continue;
+ }
+
+ if (null === ($url = ag($backend, 'url')) || false === isValidURL($url)) {
+ $writer(Level::Error, "The backend '{backend}' URL is invalid.", [
+ 'backend' => $backendName,
+ 'url' => $url ?? 'None',
+ ]);
+ continue;
+ }
+
+ $backend['name'] = $backendName;
+ $list[$backendName] = $backend;
+ }
+
+ if (empty($list)) {
+ $writer(Level::Error, 'There are no backends with export enabled.');
+ return $e;
+ }
+
+ foreach ($list as $name => &$backend) {
+ try {
+ $opts = ag($backend, 'options', []);
+
+ if (ag($options, 'ignore-date')) {
+ $opts[Options::IGNORE_DATE] = true;
+ }
+
+ if (ag($options, 'dry-run')) {
+ $opts[Options::DRY_RUN] = true;
+ }
+
+ if (ag($options, 'trace')) {
+ $opts[Options::DEBUG_TRACE] = true;
+ }
+
+ $backend['options'] = $opts;
+ $backend['class'] = getBackend(name: $name, config: $backend);
+ $backend['class']->push(entities: [$item->id => $item], queue: $this->queue);
+ } catch (Throwable $e) {
+ $writer(
+ Level::Error,
+ "Exception '{error.kind}' was thrown unhandled during '{backend}' push events. '{error.message}' at '{error.file}:{error.line}'.",
+ [
+ 'backend' => $name,
+ 'error' => [
+ 'kind' => $e::class,
+ 'line' => $e->getLine(),
+ 'message' => $e->getMessage(),
+ 'file' => after($e->getFile(), ROOT_PATH),
+ ],
+ 'exception' => [
+ 'file' => $e->getFile(),
+ 'line' => $e->getLine(),
+ 'kind' => get_class($e),
+ 'message' => $e->getMessage(),
+ 'trace' => $e->getTrace(),
+ ],
+ ]
+ );
+ }
+ }
+ unset($backend);
+
+ if (count($this->queue) < 1) {
+ $writer(Level::Notice, 'SYSTEM: No play state changes detected.');
+ return $e;
+ }
+
+ $writer(Level::Notice, "Processing '{id}' - '{via}: {title}' '{state}' push event.", [
+ 'id' => $item->id,
+ 'via' => $item->via,
+ 'title' => $item->getName(),
+ 'state' => $item->isWatched() ? 'played' : 'unplayed',
+ ]);
+
+ foreach ($this->queue->getQueue() as $response) {
+ $context = ag($response->getInfo('user_data'), 'context', []);
+
+ try {
+ if (Status::OK !== Status::from($response->getStatusCode())) {
+ $writer(
+ Level::Error,
+ "Request to change '{backend}: {item.title}' play state returned with unexpected '{status_code}' status code.",
+ $context
+ );
+ continue;
+ }
+
+ $writer(Level::Notice, "Updated '{backend}: {item.title}' watch state to '{play_state}'.", $context);
+ } catch (Throwable $e) {
+ $writer(
+ Level::Error,
+ "Exception '{error.kind}' was thrown unhandled during '{backend}' request to change play state of {item.type} '{item.title}'. '{error.message}' at '{error.file}:{error.line}'.",
+ [
+ 'error' => [
+ 'kind' => $e::class,
+ 'line' => $e->getLine(),
+ 'message' => $e->getMessage(),
+ 'file' => after($e->getFile(), ROOT_PATH),
+ ],
+ ...$context,
+ 'exception' => [
+ 'file' => $e->getFile(),
+ 'line' => $e->getLine(),
+ 'kind' => get_class($e),
+ 'message' => $e->getMessage(),
+ 'trace' => $e->getTrace(),
+ ],
+ ]
+ );
+ }
+ }
+ return $e;
+ }
+}
diff --git a/src/Listeners/ProcessRequestEvent.php b/src/Listeners/ProcessRequestEvent.php
index 95f4b9bc..52ea0317 100644
--- a/src/Listeners/ProcessRequestEvent.php
+++ b/src/Listeners/ProcessRequestEvent.php
@@ -41,7 +41,7 @@ final readonly class ProcessRequestEvent
$lastSync = makeDate($lastSync);
}
- $message = r('SYSTEM: Processing [{backend}] [{title}] {tainted} request.', [
+ $message = r("Processing '{backend}: {title}' {tainted} request.", [
'backend' => $entity->via,
'title' => $entity->getName(),
'event' => ag($entity->getExtra($entity->via), iState::COLUMN_EXTRA_EVENT, '??'),
diff --git a/src/Model/Events/EventsRepository.php b/src/Model/Events/EventsRepository.php
index 7d767185..b61d9b5e 100644
--- a/src/Model/Events/EventsRepository.php
+++ b/src/Model/Events/EventsRepository.php
@@ -49,6 +49,23 @@ final class EventsRepository
return $items[0] ?? null;
}
+ /**
+ * Will return the last event by reference.
+ *
+ * @param string|int $reference reference id to remove.
+ * @param array $criteria Filter criteria. By default, it will only remove pending events.
+ */
+ public function removeByReference(string|int $reference, array $criteria = []): bool
+ {
+ if (empty($criteria)) {
+ $criteria[EntityTable::COLUMN_STATUS] = EventStatus::PENDING->value;
+ }
+
+ $criteria[EntityTable::COLUMN_REFERENCE] = $reference;
+
+ return $this->_remove($criteria);
+ }
+
/**
* @param array $criteria Criteria to search by.
* @param array $cols Columns to select.