Merge pull request #661 from arabcoders/dev
Refactor DirectMapper to improve progress update handling and simplify metadata checks
This commit is contained in:
6
FAQ.md
6
FAQ.md
@@ -227,9 +227,9 @@ This will sync your local database state to the backend, ignoring date compariso
|
||||
|
||||
# Is there support for Multi-user setup?
|
||||
|
||||
There is **basic** support for multi-user setups, but it's not fully developed yet. The tool is primarily designed for
|
||||
single-user use, and multi-user functionality is built on top of that. Because of this, you might encounter some issues
|
||||
when using it with multiple users.
|
||||
The tool is primarily designed for single-user use, The Multi-user/sub-users functionality is built on top of that.
|
||||
Because of that you *may* encounter some issues when using it with multi-users. However, from our testing, sub-users
|
||||
functionality works well right now and behave as expected in the majority of cases. Follow the guidelines below.
|
||||
|
||||
## Getting started with a multi-user setup
|
||||
|
||||
|
||||
30
NEWS.md
30
NEWS.md
@@ -1,6 +1,34 @@
|
||||
# NEWS
|
||||
|
||||
This page contains old news about the project.
|
||||
### 2025-05-05
|
||||
|
||||
We’ve added a new feature that lets you send requests **sequentially** to the backends instead of using the default
|
||||
**parallel** mode. This can be especially helpful if you have very large libraries, slow disks, or simply want to avoid
|
||||
overloading the backends with too many concurrent requests. You can enable by enabling `WS_HTTP_SYNC_REQUESTS`
|
||||
environment variable. This mode only applies to `import`, `export`, and `backup` tasks at the moment.
|
||||
|
||||
Additionally, two command-line flags let you override the mode on the fly `--sync-requests` and `--async-requests`.
|
||||
|
||||
We’ll be evaluating this feature, and if it proves effective (and the slowdown is acceptable), we may
|
||||
make **sequential** mode the default in a future release. So far from our testing, we’ve seen between 1.5x to 2.0x
|
||||
increase in import time when using the sequential mode.
|
||||
|
||||
> [!NOTE]
|
||||
> Because we cache many HTTP requests, comparing timings between sequential and parallel runs of `import` can be
|
||||
> misleading. To get an accurate benchmark of `--sync-requests`, either start with a fresh setup (new installation) or
|
||||
> purge your Redis instance before testing.
|
||||
|
||||
### 2025-04-06
|
||||
|
||||
We have recently re-worked how the `backend:create` command works, and we no longer generate random name for invalid
|
||||
backends names or usernames. We do a normalization step to make sure the name is valid. This should help with the
|
||||
confusion of having random names. This means if you re-run the `backend:create` you most likely will get a different
|
||||
name than before. So, we suggest to re-run the command with `--re-create` flag. This flag will delete the current
|
||||
sub-users, and regenerate updated config files.
|
||||
|
||||
We have also added new guard for the command, so if you already generated your sub-users, re-running the command will
|
||||
show you a warning message and exit without doing anything. to run the command again either you need to use
|
||||
`--re-create` or `--run` flag. The `--run` flag will run the command without deleting the current sub-users.
|
||||
|
||||
### 2025-03-13
|
||||
|
||||
|
||||
46
README.md
46
README.md
@@ -4,47 +4,17 @@
|
||||

|
||||

|
||||
|
||||
This tool primary goal is to sync your backends play state without relying on third party services,
|
||||
out of the box, this tool support `Jellyfin`, `Plex` and `Emby` media servers.
|
||||
This tool primary goal is to sync your backends **users** play state without relying on third party services, out of the
|
||||
box, this tool support `Jellyfin`, `Plex` and `Emby` media servers.
|
||||
|
||||
# Updates
|
||||
|
||||
### 2025-05-05
|
||||
|
||||
We’ve added a new feature that lets you send requests **sequentially** to the backends instead of using the default
|
||||
**parallel** mode. This can be especially helpful if you have very large libraries, slow disks, or simply want to avoid
|
||||
overloading the backends with too many concurrent requests. You can enable by enabling `WS_HTTP_SYNC_REQUESTS`
|
||||
environment variable. This mode only applies to `import`, `export`, and `backup` tasks at the moment.
|
||||
|
||||
Additionally, two command-line flags let you override the mode on the fly `--sync-requests` and `--async-requests`.
|
||||
|
||||
We’ll be evaluating this feature, and if it proves effective (and the slowdown is acceptable), we may
|
||||
make **sequential** mode the default in a future release. So far from our testing, we’ve seen between 1.5x to 2.0x
|
||||
increase in import time when using the sequential mode.
|
||||
|
||||
> [!NOTE]
|
||||
> Because we cache many HTTP requests, comparing timings between sequential and parallel runs of `import` can be
|
||||
> misleading. To get an accurate benchmark of `--sync-requests`, either start with a fresh setup (new installation) or
|
||||
> purge your Redis instance before testing.
|
||||
|
||||
### 2025-04-06
|
||||
|
||||
We have recently re-worked how the `backend:create` command works, and we no longer generate random name for invalid
|
||||
backends names or usernames. We do a normalization step to make sure the name is valid. This should help with the
|
||||
confusion of having random names. This means if you re-run the `backend:create` you most likely will get a different
|
||||
name than before. So, we suggest to re-run the command with `--re-create` flag. This flag will delete the current
|
||||
sub-users, and regenerate updated config files.
|
||||
|
||||
We have also added new guard for the command, so if you already generated your sub-users, re-running the command will
|
||||
show you a warning message and exit without doing anything. to run the command again either you need to use
|
||||
`--re-create` or `--run` flag. The `--run` flag will run the command without deleting the current sub-users.
|
||||
|
||||
---
|
||||
Refer to [NEWS](NEWS.md) for old updates.
|
||||
Please refer to [NEWS](/NEWS.md) for the latest updates and changes.
|
||||
|
||||
# Features
|
||||
|
||||
* Management via WebUI.
|
||||
* **Sub-users** support.
|
||||
* Sync backends play state (`Many-to-Many` or `One-Way`).
|
||||
* Backup your backends play state into `portable` format.
|
||||
* Receive webhook events from media backends.
|
||||
@@ -59,6 +29,14 @@ simple and to the point yt-dlp frontend to help download content from all suppor
|
||||
|
||||
# Install
|
||||
|
||||
If you prefer video format [AlienTech42 YouTube Channel](https://www.youtube.com/@AlienTech42) had a video about
|
||||
installing WatchState using unraid [at this link](https://www.youtube.com/watch?v=XoztOwGHGxk). Much appreciated.
|
||||
|
||||
PS: I don't know the channel owner, but I appreciate the effort. There is small mistake in the video regarding the
|
||||
webhook URL, please copy the URL directly from the backends page. And this tool does support multi-users.
|
||||
|
||||
----
|
||||
|
||||
First, start by creating a directory to store the data, to follow along with this setup, create directory called `data`
|
||||
at your working directory. Then proceed to use your preferred method to install the tool.
|
||||
|
||||
|
||||
@@ -137,6 +137,7 @@ final class Webhooks
|
||||
}
|
||||
}
|
||||
|
||||
$debugTrace = true === (bool)ag($backend, 'options.' . Options::DEBUG_TRACE);
|
||||
$metadataOnly = true === (bool)ag($backend, 'options.' . Options::IMPORT_METADATA_ONLY);
|
||||
|
||||
if (true !== $metadataOnly && true !== (bool)ag($backend, 'import.enabled')) {
|
||||
@@ -208,6 +209,7 @@ final class Webhooks
|
||||
'tainted' => $entity->isTainted(),
|
||||
Options::IMPORT_METADATA_ONLY => $metadataOnly,
|
||||
Options::REQUEST_ID => ag($request->getServerParams(), 'X_REQUEST_ID'),
|
||||
Options::DEBUG_TRACE => $debugTrace,
|
||||
],
|
||||
Options::CONTEXT_USER => $userContext->name,
|
||||
]);
|
||||
|
||||
@@ -204,11 +204,9 @@ class DirectMapper implements ImportInterface
|
||||
*/
|
||||
private function addNewItem(iState $entity, array $opts = []): self
|
||||
{
|
||||
$metadataOnly = true === (bool)ag($opts, Options::IMPORT_METADATA_ONLY);
|
||||
$inDryRunMode = $this->inDryRunMode();
|
||||
$onStateUpdate = ag($opts, Options::STATE_UPDATE_EVENT, null);
|
||||
|
||||
if (true === $metadataOnly) {
|
||||
if (true === (bool)ag($opts, Options::IMPORT_METADATA_ONLY)) {
|
||||
$this->actions[$entity->type]['failed']++;
|
||||
Message::increment("{$entity->via}.{$entity->type}.failed");
|
||||
|
||||
@@ -253,7 +251,7 @@ class DirectMapper implements ImportInterface
|
||||
$entity->id = random_int((int)(PHP_INT_MAX / 2), PHP_INT_MAX);
|
||||
} else {
|
||||
$entity = $this->db->insert($entity);
|
||||
|
||||
$onStateUpdate = ag($opts, Options::STATE_UPDATE_EVENT, null);
|
||||
if (null !== $onStateUpdate && true === $entity->isWatched()) {
|
||||
$onStateUpdate($entity);
|
||||
}
|
||||
@@ -265,7 +263,7 @@ class DirectMapper implements ImportInterface
|
||||
'mapper' => afterLast(self::class, '\\'),
|
||||
'backend' => $entity->via,
|
||||
'title' => $entity->getName(),
|
||||
true === $this->inTraceMode() ? 'trace' : 'metadata' => $data,
|
||||
'metadata' => $data,
|
||||
]);
|
||||
|
||||
$this->addPointers($entity, $entity->id);
|
||||
@@ -277,7 +275,7 @@ class DirectMapper implements ImportInterface
|
||||
|
||||
$this->changed[$entity->id] = $this->objects[$entity->id] = $entity->id;
|
||||
|
||||
if (true === $inDryRunMode && $entity->hasPlayProgress()) {
|
||||
if (false === $inDryRunMode && $entity->hasPlayProgress()) {
|
||||
$itemId = r('{type}://{id}:{tainted}@{backend}', [
|
||||
'type' => $entity->type,
|
||||
'backend' => $entity->via,
|
||||
@@ -295,15 +293,10 @@ class DirectMapper implements ImportInterface
|
||||
context: [
|
||||
'user' => $this->userContext?->name ?? 'main',
|
||||
'mapper' => afterLast(self::class, '\\'),
|
||||
'error' => [
|
||||
'kind' => $e::class,
|
||||
'line' => $e->getLine(),
|
||||
'message' => $e->getMessage(),
|
||||
'file' => after($e->getFile(), ROOT_PATH),
|
||||
],
|
||||
'backend' => $entity->via,
|
||||
'title' => $entity->getName(),
|
||||
'state' => $entity->getAll()
|
||||
'state' => $entity->getAll(),
|
||||
...exception_log($e),
|
||||
],
|
||||
e: $e
|
||||
)
|
||||
@@ -328,13 +321,9 @@ class DirectMapper implements ImportInterface
|
||||
$inDryRunMode = $this->inDryRunMode();
|
||||
$keys = [iState::COLUMN_META_DATA];
|
||||
|
||||
$cloned = clone $local;
|
||||
$progressChange = $this->shouldProgressUpdate($local, $entity, $opts);
|
||||
|
||||
$newPlayProgress = (int)ag($entity->getMetadata($entity->via), iState::COLUMN_META_DATA_PROGRESS);
|
||||
$oldPlayProgress = (int)ag($cloned->getMetadata($entity->via), iState::COLUMN_META_DATA_PROGRESS);
|
||||
$playChanged = !$metadataOnly && $newPlayProgress > ($oldPlayProgress + 10);
|
||||
|
||||
if ($playChanged || true === (clone $local)->apply(entity: $entity, fields: $keys)->isChanged(fields: $keys)) {
|
||||
if (true === $progressChange || true === (clone $local)->apply($entity, fields: $keys)->isChanged($keys)) {
|
||||
try {
|
||||
$local = $local->apply(
|
||||
entity: $entity,
|
||||
@@ -342,46 +331,41 @@ class DirectMapper implements ImportInterface
|
||||
);
|
||||
|
||||
$this->removePointers($local)->addPointers($local, $local->id);
|
||||
|
||||
$changes = $local->diff(fields: $keys);
|
||||
$allowUpdate = (int)Config::get('progress.threshold', 0);
|
||||
$minThreshold = (int)Config::get('progress.minThreshold', 86_400);
|
||||
|
||||
$progress = $playChanged && $entity->hasPlayProgress();
|
||||
if ($entity->isWatched() && $allowUpdate < $minThreshold) {
|
||||
$progress = false;
|
||||
}
|
||||
|
||||
if (count($changes) >= 1) {
|
||||
if (true === $progressChange || count($changes) >= 1) {
|
||||
$_keys = array_merge($keys, [iState::COLUMN_EXTRA]);
|
||||
if ($playChanged && $progress) {
|
||||
if (true === $progressChange) {
|
||||
$_keys[] = iState::COLUMN_VIA;
|
||||
}
|
||||
|
||||
$local = $local->apply($entity, fields: $_keys);
|
||||
|
||||
$message = "{mapper}: [T] '{user}@{backend}' updated '#{id}: {title}' ";
|
||||
|
||||
$this->logger->log(
|
||||
$progress ? LogLevel::NOTICE : LogLevel::INFO,
|
||||
$progress ? "{mapper}: [T] '{user}@{backend}' updated '#{id}: {title}' due to play progress change." : "{mapper}: [T] '{user}@{backend}' updated '#{id}: {title}' metadata.",
|
||||
true === $progressChange ? LogLevel::NOTICE : LogLevel::INFO,
|
||||
$message . (true === $progressChange ? "due to play progress change." : "metadata."),
|
||||
[
|
||||
'user' => $this->userContext?->name ?? 'main',
|
||||
'mapper' => afterLast(self::class, '\\'),
|
||||
'id' => $cloned->id ?? 'New',
|
||||
'id' => $local->id ?? 'New',
|
||||
'backend' => $entity->via,
|
||||
'title' => $cloned->getName(),
|
||||
'changes' => $progress ? $local->diff(fields: $_keys) : $changes,
|
||||
'title' => $local->getName(),
|
||||
'changes' => true === $progressChange ? $local->diff(fields: $_keys) : $changes,
|
||||
]
|
||||
);
|
||||
}
|
||||
|
||||
if (false === $inDryRunMode) {
|
||||
$this->db->update($local);
|
||||
if (true === $progress) {
|
||||
if (true === $progressChange) {
|
||||
$itemId = r('{type}://{id}:{tainted}@{backend}', [
|
||||
'type' => $entity->type,
|
||||
'backend' => $entity->via,
|
||||
'tainted' => 'untainted',
|
||||
'id' => ag($entity->getMetadata($entity->via), iState::COLUMN_ID, '??'),
|
||||
]);
|
||||
|
||||
$this->progressItems[$itemId] = $local;
|
||||
}
|
||||
}
|
||||
@@ -401,12 +385,6 @@ class DirectMapper implements ImportInterface
|
||||
context: [
|
||||
'user' => $this->userContext?->name ?? 'main',
|
||||
'mapper' => afterLast(self::class, '\\'),
|
||||
'error' => [
|
||||
'kind' => $e::class,
|
||||
'line' => $e->getLine(),
|
||||
'message' => $e->getMessage(),
|
||||
'file' => after($e->getFile(), ROOT_PATH),
|
||||
],
|
||||
'id' => $local->id ?? 'New',
|
||||
'backend' => $entity->via,
|
||||
'title' => $local->getName(),
|
||||
@@ -414,6 +392,7 @@ class DirectMapper implements ImportInterface
|
||||
'database' => $local->getAll(),
|
||||
'backend' => $entity->getAll()
|
||||
],
|
||||
...exception_log($e),
|
||||
],
|
||||
e: $e
|
||||
)
|
||||
@@ -482,9 +461,7 @@ class DirectMapper implements ImportInterface
|
||||
private function handleOldEntity(iState $local, iState $entity, array $opts = []): self
|
||||
{
|
||||
$keys = [iState::COLUMN_META_DATA];
|
||||
$metadataOnly = true === (bool)ag($opts, Options::IMPORT_METADATA_ONLY);
|
||||
$inDryRunMode = $this->inDryRunMode();
|
||||
$onStateUpdate = ag($opts, Options::STATE_UPDATE_EVENT, null);
|
||||
|
||||
$cloned = clone $local;
|
||||
|
||||
@@ -498,8 +475,7 @@ class DirectMapper implements ImportInterface
|
||||
|
||||
if (false === $inDryRunMode) {
|
||||
$this->db->update($local);
|
||||
|
||||
if (null !== $onStateUpdate) {
|
||||
if (null !== ($onStateUpdate = ag($opts, Options::STATE_UPDATE_EVENT, null))) {
|
||||
$onStateUpdate($local);
|
||||
}
|
||||
}
|
||||
@@ -528,12 +504,6 @@ class DirectMapper implements ImportInterface
|
||||
context: [
|
||||
'user' => $this->userContext?->name ?? 'main',
|
||||
'mapper' => afterLast(self::class, '\\'),
|
||||
'error' => [
|
||||
'kind' => $e::class,
|
||||
'line' => $e->getLine(),
|
||||
'message' => $e->getMessage(),
|
||||
'file' => after($e->getFile(), ROOT_PATH),
|
||||
],
|
||||
'id' => $cloned->id ?? 'New',
|
||||
'backend' => $entity->via,
|
||||
'title' => $cloned->getName(),
|
||||
@@ -541,6 +511,7 @@ class DirectMapper implements ImportInterface
|
||||
'database' => $cloned->getAll(),
|
||||
'backend' => $entity->getAll()
|
||||
],
|
||||
...exception_log($e),
|
||||
],
|
||||
e: $e
|
||||
)
|
||||
@@ -550,14 +521,14 @@ class DirectMapper implements ImportInterface
|
||||
return $this;
|
||||
}
|
||||
|
||||
$newPlayProgress = (int)ag($entity->getMetadata($entity->via), iState::COLUMN_META_DATA_PROGRESS);
|
||||
$oldPlayProgress = (int)ag($cloned->getMetadata($entity->via), iState::COLUMN_META_DATA_PROGRESS);
|
||||
$playChanged = !$metadataOnly && $newPlayProgress > ($oldPlayProgress + 10);
|
||||
$metaExists = count($cloned->getMetadata($entity->via)) >= 1;
|
||||
$progressChange = $this->shouldProgressUpdate($local, $entity, $opts);
|
||||
|
||||
$updateMeta = true === (bool)ag($this->options, Options::MAPPER_ALWAYS_UPDATE_META);
|
||||
$hasMeta = count($cloned->getMetadata($entity->via)) >= 1;
|
||||
|
||||
// -- this sometimes leads to never ending updates as data from backends conflicts.
|
||||
if (!$metaExists || $playChanged || true === (bool)ag($this->options, Options::MAPPER_ALWAYS_UPDATE_META)) {
|
||||
if (true === (clone $cloned)->apply(entity: $entity, fields: $keys)->isChanged(fields: $keys)) {
|
||||
if (false === $hasMeta || true === $progressChange || true === $updateMeta) {
|
||||
if (false === $hasMeta || $progressChange || (clone $cloned)->apply($entity, $keys)->isChanged($keys)) {
|
||||
try {
|
||||
$local = $local->apply(
|
||||
entity: $entity,
|
||||
@@ -567,44 +538,38 @@ class DirectMapper implements ImportInterface
|
||||
$this->removePointers($cloned)->addPointers($local, $local->id);
|
||||
|
||||
$changes = $local->diff(fields: $keys);
|
||||
$allowUpdate = (int)Config::get('progress.threshold', 0);
|
||||
$minThreshold = (int)Config::get('progress.minThreshold', 86_400);
|
||||
|
||||
$progress = $playChanged && $entity->hasPlayProgress();
|
||||
if ($entity->isWatched() && $allowUpdate < $minThreshold) {
|
||||
$progress = false;
|
||||
}
|
||||
|
||||
if (count($changes) >= 1) {
|
||||
if (true === $progressChange || false === $hasMeta || count($changes) >= 1) {
|
||||
$_keys = array_merge($keys, [iState::COLUMN_EXTRA]);
|
||||
if ($playChanged && $progress) {
|
||||
if (true === $progressChange) {
|
||||
$_keys[] = iState::COLUMN_VIA;
|
||||
}
|
||||
$local = $local->apply($entity, fields: $_keys);
|
||||
|
||||
$message = "{mapper}: [O] '{user}@{backend}' updated '#{id}: {title}' ";
|
||||
$this->logger->log(
|
||||
$progress ? LogLevel::NOTICE : LogLevel::INFO,
|
||||
$progress ? "{mapper}: [O] '{user}@{backend}' updated '#{id}: {title}' due to play progress change." : "{mapper}: [O] '{user}@{backend}' updated '#{id}: {title}' metadata.",
|
||||
true === $progressChange ? LogLevel::NOTICE : LogLevel::INFO,
|
||||
$message . (true === $progressChange ? "due to play progress change." : "metadata."),
|
||||
[
|
||||
'user' => $this->userContext?->name ?? 'main',
|
||||
'mapper' => afterLast(self::class, '\\'),
|
||||
'id' => $cloned->id ?? 'New',
|
||||
'backend' => $entity->via,
|
||||
'title' => $cloned->getName(),
|
||||
'changes' => $progress ? $local->diff(fields: $_keys) : $changes,
|
||||
'changes' => true === $progressChange ? $local->diff(fields: $_keys) : $changes,
|
||||
]
|
||||
);
|
||||
}
|
||||
|
||||
if (false === $inDryRunMode) {
|
||||
$this->db->update($local);
|
||||
if ($progress) {
|
||||
if (true === $progressChange) {
|
||||
$itemId = r('{type}://{id}:{tainted}@{backend}', [
|
||||
'type' => $entity->type,
|
||||
'backend' => $entity->via,
|
||||
'tainted' => 'untainted',
|
||||
'id' => ag($entity->getMetadata($entity->via), iState::COLUMN_ID, '??'),
|
||||
]);
|
||||
|
||||
$this->progressItems[$itemId] = $local;
|
||||
}
|
||||
}
|
||||
@@ -624,12 +589,6 @@ class DirectMapper implements ImportInterface
|
||||
context: [
|
||||
'user' => $this->userContext?->name ?? 'main',
|
||||
'mapper' => afterLast(self::class, '\\'),
|
||||
'error' => [
|
||||
'kind' => $e::class,
|
||||
'line' => $e->getLine(),
|
||||
'message' => $e->getMessage(),
|
||||
'file' => after($e->getFile(), ROOT_PATH),
|
||||
],
|
||||
'id' => $cloned->id ?? 'New',
|
||||
'backend' => $entity->via,
|
||||
'title' => $cloned->getName(),
|
||||
@@ -637,6 +596,7 @@ class DirectMapper implements ImportInterface
|
||||
'database' => $cloned->getAll(),
|
||||
'backend' => $entity->getAll()
|
||||
],
|
||||
...exception_log($e),
|
||||
],
|
||||
e: $e
|
||||
)
|
||||
@@ -649,9 +609,10 @@ class DirectMapper implements ImportInterface
|
||||
|
||||
Message::increment("{$entity->via}.{$entity->type}.ignored_not_played_since_last_sync");
|
||||
|
||||
if ($entity->isWatched() !== $local->isWatched()) {
|
||||
$hasAfter = null !== ($opts['after'] ?? null) && true === ($opts['after'] instanceof iDate);
|
||||
if ($entity->isWatched() !== $local->isWatched() && $hasAfter) {
|
||||
$this->logger->notice(
|
||||
"{mapper}: [O] '{user}@{backend}' item '#{id}: {title}' is marked as '{state}' vs local '{local_state}', however due to the remote item date '{remote_date}' being older than the last backend sync date '{local_date}'. it was not considered as valid state.",
|
||||
"{mapper}: [O] '{user}@{backend}' item '#{id}: {title}' date '{remote_date}' is older than last sync date '{local_date}'. Marking the item as tainted and re-processing.",
|
||||
[
|
||||
'user' => $this->userContext?->name ?? 'main',
|
||||
'mapper' => afterLast(self::class, '\\'),
|
||||
@@ -664,7 +625,15 @@ class DirectMapper implements ImportInterface
|
||||
'title' => $entity->getName(),
|
||||
]
|
||||
);
|
||||
return $this;
|
||||
|
||||
$entity->metadata = ag_set(
|
||||
$entity->getMetadata(),
|
||||
"{$entity->via}." . iState::COLUMN_META_DATA_PLAYED_AT,
|
||||
$entity->updated
|
||||
);
|
||||
$entity->setIsTainted(true);
|
||||
|
||||
return $this->add($entity, $opts);
|
||||
}
|
||||
|
||||
if ($this->inTraceMode()) {
|
||||
@@ -716,10 +685,6 @@ class DirectMapper implements ImportInterface
|
||||
return $this;
|
||||
}
|
||||
|
||||
$metadataOnly = true === (bool)ag($opts, Options::IMPORT_METADATA_ONLY);
|
||||
$inDryRunMode = $this->inDryRunMode();
|
||||
$onStateUpdate = ag($opts, Options::STATE_UPDATE_EVENT, null);
|
||||
|
||||
/**
|
||||
* Handle adding new item logic.
|
||||
*/
|
||||
@@ -737,7 +702,7 @@ class DirectMapper implements ImportInterface
|
||||
* ONLY update backend metadata
|
||||
* if metadataOnly is set or the event is tainted.
|
||||
*/
|
||||
if (true === $metadataOnly || true === $entity->isTainted()) {
|
||||
if (true === (bool)ag($opts, Options::IMPORT_METADATA_ONLY) || true === $entity->isTainted()) {
|
||||
return $this->handleTainted($cloned, $entity, $opts);
|
||||
}
|
||||
|
||||
@@ -748,7 +713,7 @@ class DirectMapper implements ImportInterface
|
||||
}
|
||||
|
||||
/**
|
||||
* Fix for #329 {@see https://github.com/arabcoders/watchstate/issues/329}
|
||||
* Fix for issue #329 {@see https://github.com/arabcoders/watchstate/issues/329}
|
||||
*
|
||||
* This conditional block should proceed only if specific conditions are met.
|
||||
* 1- the backend state is [unwatched] while the db state is [watched]
|
||||
@@ -762,7 +727,9 @@ class DirectMapper implements ImportInterface
|
||||
|
||||
if (false === $hasMeta) {
|
||||
$message .= ' No metadata. Marking the item as tainted and re-processing.';
|
||||
} elseif (true === $hasDate) {
|
||||
}
|
||||
|
||||
if (true === $hasMeta && true === $hasDate) {
|
||||
$message .= ' db.metadata.played_at is equal to entity.updated. Marking the item as tainted and re-processing.';
|
||||
}
|
||||
|
||||
@@ -785,6 +752,19 @@ class DirectMapper implements ImportInterface
|
||||
}
|
||||
}
|
||||
|
||||
return $this->handleUntaintedEntity($cloned, $entity, $opts);
|
||||
}
|
||||
|
||||
/**
|
||||
* Handle Untainted entities.
|
||||
*/
|
||||
public function handleUntaintedEntity(iState $local, iState $entity, array $opts = []): self
|
||||
{
|
||||
$inDryRunMode = $this->inDryRunMode();
|
||||
|
||||
$cloned = clone $local;
|
||||
|
||||
$progressChange = $this->shouldProgressUpdate($local, $entity, $opts);
|
||||
$keys = $opts['diff_keys'] ?? array_flip(
|
||||
array_keys_diff(
|
||||
base: array_flip(iState::ENTITY_KEYS),
|
||||
@@ -793,42 +773,56 @@ class DirectMapper implements ImportInterface
|
||||
)
|
||||
);
|
||||
|
||||
if (true === (clone $cloned)->apply(entity: $entity, fields: $keys)->isChanged(fields: $keys)) {
|
||||
if (true === $progressChange || true === (clone $cloned)->apply($entity, fields: $keys)->isChanged($keys)) {
|
||||
try {
|
||||
$local = $local->apply(
|
||||
entity: $entity,
|
||||
fields: array_merge($keys, [iState::COLUMN_EXTRA])
|
||||
);
|
||||
$_keys = array_merge($keys, [iState::COLUMN_EXTRA]);
|
||||
if (true === $progressChange) {
|
||||
$_keys[] = iState::COLUMN_VIA;
|
||||
}
|
||||
|
||||
$local = $local->apply(entity: $entity, fields: $_keys);
|
||||
$this->removePointers($cloned)->addPointers($local, $local->id);
|
||||
|
||||
$changes = $local->diff(fields: $keys);
|
||||
$changes = $local->diff(fields: $_keys);
|
||||
|
||||
$message = "{mapper}: [A] '{user}@{backend}' Updated '#{id}: {title}'.";
|
||||
$message = "{mapper}: [U] '{user}@{backend}' Updated '#{id}: {title}'.";
|
||||
|
||||
$isPlayChanged = $cloned->isWatched() !== $local->isWatched();
|
||||
$stateChange = $cloned->isWatched() !== $local->isWatched();
|
||||
|
||||
if (true === $isPlayChanged) {
|
||||
$message = "{mapper}: [A] '{user}@{backend}' Updated and marked '#{id}: {title}' as '{state}'.";
|
||||
if (null !== $onStateUpdate) {
|
||||
if (true === $progressChange) {
|
||||
$message .= " Due to play progress change.";
|
||||
}
|
||||
|
||||
if (true === $stateChange) {
|
||||
$message = "{mapper}: [U] '{user}@{backend}' Updated and marked '#{id}: {title}' as '{state}'.";
|
||||
if (null !== ($onStateUpdate = ag($opts, Options::STATE_UPDATE_EVENT, null))) {
|
||||
$onStateUpdate($local);
|
||||
}
|
||||
}
|
||||
|
||||
if (count($changes) >= 1) {
|
||||
$this->logger->log(true === $isPlayChanged ? LogLevel::NOTICE : LogLevel::INFO, $message, [
|
||||
if (true === $progressChange || count($changes) >= 1) {
|
||||
$this->logger->log($progressChange || $stateChange ? LogLevel::NOTICE : LogLevel::INFO, $message, [
|
||||
'user' => $this->userContext?->name ?? 'main',
|
||||
'mapper' => afterLast(self::class, '\\'),
|
||||
'id' => $cloned->id ?? 'New',
|
||||
'backend' => $entity->via,
|
||||
'title' => $cloned->getName(),
|
||||
'state' => $local->isWatched() ? 'played' : 'unplayed',
|
||||
'changes' => $local->diff(fields: $keys)
|
||||
'changes' => $local->diff(fields: $_keys)
|
||||
]);
|
||||
}
|
||||
|
||||
if (false === $inDryRunMode) {
|
||||
$this->db->update($local);
|
||||
if (true === $progressChange) {
|
||||
$itemId = r('{type}://{id}:{tainted}@{backend}', [
|
||||
'type' => $entity->type,
|
||||
'backend' => $entity->via,
|
||||
'tainted' => 'untainted',
|
||||
'id' => ag($entity->getMetadata($entity->via), iState::COLUMN_ID, '??'),
|
||||
]);
|
||||
$this->progressItems[$itemId] = $local;
|
||||
}
|
||||
}
|
||||
|
||||
if (null === ($this->changed[$local->id] ?? null)) {
|
||||
@@ -842,16 +836,10 @@ class DirectMapper implements ImportInterface
|
||||
Message::increment("{$entity->via}.{$local->type}.failed");
|
||||
$this->logger->error(
|
||||
...lw(
|
||||
message: "{mapper}: [A] Exception '{error.kind}' was thrown unhandled during '{user}@{backend}' - '{title}' add. {error.message} at '{error.file}:{error.line}'.",
|
||||
message: "{mapper}: [U] Exception '{error.kind}' was thrown unhandled during '{user}@{backend}' - '{title}' add. {error.message} at '{error.file}:{error.line}'.",
|
||||
context: [
|
||||
'user' => $this->userContext?->name ?? 'main',
|
||||
'mapper' => afterLast(self::class, '\\'),
|
||||
'error' => [
|
||||
'kind' => $e::class,
|
||||
'line' => $e->getLine(),
|
||||
'message' => $e->getMessage(),
|
||||
'file' => after($e->getFile(), ROOT_PATH),
|
||||
],
|
||||
'id' => $cloned->id ?? 'New',
|
||||
'backend' => $entity->via,
|
||||
'title' => $cloned->getName(),
|
||||
@@ -859,7 +847,7 @@ class DirectMapper implements ImportInterface
|
||||
'database' => $cloned->getAll(),
|
||||
'backend' => $entity->getAll()
|
||||
],
|
||||
'trace' => $e->getTrace(),
|
||||
...exception_log($e),
|
||||
],
|
||||
e: $e
|
||||
)
|
||||
@@ -886,7 +874,7 @@ class DirectMapper implements ImportInterface
|
||||
|
||||
if ($this->inTraceMode()) {
|
||||
$this->logger->info(
|
||||
"{mapper}: [A] Ignoring '{user}@{backend}' - '#{id}: {title}'. Metadata & play state are identical.",
|
||||
"{mapper}: [U] Ignoring '{user}@{backend}' - '#{id}: {title}'. Metadata & play state are identical.",
|
||||
$context
|
||||
);
|
||||
}
|
||||
@@ -938,16 +926,21 @@ class DirectMapper implements ImportInterface
|
||||
*/
|
||||
public function commit(): array
|
||||
{
|
||||
if (true === (bool)Config::get('sync.progress', false) && count($this->progressItems) >= 1) {
|
||||
if (true === (bool)Config::get('sync.progress', false) && count(
|
||||
$this->progressItems
|
||||
) >= 1 && false === $this->inDryRunMode()) {
|
||||
try {
|
||||
$name = '{type}://{id}@{backend}';
|
||||
|
||||
$opts = ['unique' => true];
|
||||
|
||||
if (null !== $this->userContext) {
|
||||
$opts = ag_set($opts, Options::CONTEXT_USER, $this->userContext->name);
|
||||
$name = $name . '/' . $this->userContext->name;
|
||||
}
|
||||
|
||||
foreach ($this->progressItems as $entity) {
|
||||
$opts[EventsTable::COLUMN_REFERENCE] = r('{type}://{id}@{backend}', [
|
||||
$opts[EventsTable::COLUMN_REFERENCE] = r($name, [
|
||||
'type' => $entity->type,
|
||||
'backend' => $entity->via,
|
||||
'id' => ag($entity->getMetadata($entity->via), iState::COLUMN_ID, '??'),
|
||||
@@ -1174,4 +1167,50 @@ class DirectMapper implements ImportInterface
|
||||
|
||||
return $this;
|
||||
}
|
||||
|
||||
/**
|
||||
* Check if the progress update should be applied.
|
||||
* @param iState $old The old entity.
|
||||
* @param iState $new The new entity.
|
||||
* @param array $opts Additional options.
|
||||
*
|
||||
* @return bool True if the progress update should be applied, false otherwise.
|
||||
*/
|
||||
private function shouldProgressUpdate(iState $old, iState $new, array $opts = []): bool
|
||||
{
|
||||
if (true === (bool)ag($opts, Options::IMPORT_METADATA_ONLY, false)) {
|
||||
if (true === $this->inTraceMode()) {
|
||||
$this->logger?->info('only metadata updates allowed.');
|
||||
}
|
||||
return false;
|
||||
}
|
||||
|
||||
$newPlayProgress = (int)ag($new->getMetadata($new->via), iState::COLUMN_META_DATA_PROGRESS);
|
||||
$oldPlayProgress = (int)ag($old->getMetadata($new->via), iState::COLUMN_META_DATA_PROGRESS);
|
||||
$playChanged = $newPlayProgress > ($oldPlayProgress + 10);
|
||||
|
||||
$allowUpdate = (int)Config::get('progress.threshold', 0);
|
||||
$minThreshold = (int)Config::get('progress.minThreshold', 86_400);
|
||||
|
||||
if (true === $new->isWatched() && $allowUpdate < $minThreshold) {
|
||||
if (true === $this->inTraceMode()) {
|
||||
$this->logger?->info('play progress update not allowed. threshold is too low.');
|
||||
}
|
||||
return false;
|
||||
}
|
||||
|
||||
if (true === $this->inTraceMode()) {
|
||||
$message = "{mapper}: '{user}@{backend}' - '#{id}: {title}' Is {not}marked for progress update.";
|
||||
$this->logger->info($message, [
|
||||
'mapper' => afterLast(self::class, '\\'),
|
||||
'user' => $this->userContext?->name ?? 'main',
|
||||
'backend' => $new->via,
|
||||
'id' => $old->id ?? 'New',
|
||||
'title' => $old->getName(),
|
||||
'not' => $playChanged ? '' : 'not ',
|
||||
]);
|
||||
}
|
||||
|
||||
return $playChanged;
|
||||
}
|
||||
}
|
||||
|
||||
@@ -79,10 +79,8 @@ final readonly class ProcessRequestEvent
|
||||
$writer(Level::Notice, $message);
|
||||
|
||||
$mapper = $userContext->mapper;
|
||||
if (null !== ag($e->getOptions(), Options::DEBUG_TRACE)) {
|
||||
$mapper = $userContext->mapper->setOptions(
|
||||
ag_set($userContext->mapper->getOptions(), Options::DEBUG_TRACE, true)
|
||||
);
|
||||
if (true === (bool)ag($e->getOptions(), Options::DEBUG_TRACE, false)) {
|
||||
$mapper = $mapper->setOptions(ag_set($mapper->getOptions(), Options::DEBUG_TRACE, true));
|
||||
}
|
||||
|
||||
$logger = clone $this->logger;
|
||||
@@ -94,7 +92,7 @@ final readonly class ProcessRequestEvent
|
||||
|
||||
$mapper->add($entity, [
|
||||
Options::IMPORT_METADATA_ONLY => (bool)ag($e->getOptions(), Options::IMPORT_METADATA_ONLY),
|
||||
Options::STATE_UPDATE_EVENT => fn (iState $state) => queuePush(entity: $state, userContext: $userContext),
|
||||
Options::STATE_UPDATE_EVENT => fn(iState $state) => queuePush(entity: $state, userContext: $userContext),
|
||||
'after' => $lastSync,
|
||||
]);
|
||||
|
||||
|
||||
Reference in New Issue
Block a user