Refactor DirectMapper to improve progress update handling and simplify metadata checks; update README with video installation guide.

This commit is contained in:
arabcoders
2025-05-11 22:28:29 +03:00
parent 5ee7302186
commit c70442eb5a
2 changed files with 116 additions and 90 deletions

View File

@@ -59,6 +59,14 @@ simple and to the point yt-dlp frontend to help download content from all suppor
# Install
If you prefer video format [AlienTech42 YouTube Channel](https://www.youtube.com/@AlienTech42) had a video about
installing WatchState using unraid [at this link](https://www.youtube.com/watch?v=XoztOwGHGxk). Much appreciated.
PS: I don't know the channel owner, but I appreciate the effort. There is small mistake in the video regarding the
webhook URL, please copy the URL directly from the backends page.
----
First, start by creating a directory to store the data, to follow along with this setup, create directory called `data`
at your working directory. Then proceed to use your preferred method to install the tool.

View File

@@ -204,11 +204,9 @@ class DirectMapper implements ImportInterface
*/
private function addNewItem(iState $entity, array $opts = []): self
{
$metadataOnly = true === (bool)ag($opts, Options::IMPORT_METADATA_ONLY);
$inDryRunMode = $this->inDryRunMode();
$onStateUpdate = ag($opts, Options::STATE_UPDATE_EVENT, null);
if (true === $metadataOnly) {
if (true === (bool)ag($opts, Options::IMPORT_METADATA_ONLY)) {
$this->actions[$entity->type]['failed']++;
Message::increment("{$entity->via}.{$entity->type}.failed");
@@ -253,7 +251,7 @@ class DirectMapper implements ImportInterface
$entity->id = random_int((int)(PHP_INT_MAX / 2), PHP_INT_MAX);
} else {
$entity = $this->db->insert($entity);
$onStateUpdate = ag($opts, Options::STATE_UPDATE_EVENT, null);
if (null !== $onStateUpdate && true === $entity->isWatched()) {
$onStateUpdate($entity);
}
@@ -265,7 +263,7 @@ class DirectMapper implements ImportInterface
'mapper' => afterLast(self::class, '\\'),
'backend' => $entity->via,
'title' => $entity->getName(),
true === $this->inTraceMode() ? 'trace' : 'metadata' => $data,
'metadata' => $data,
]);
$this->addPointers($entity, $entity->id);
@@ -277,7 +275,7 @@ class DirectMapper implements ImportInterface
$this->changed[$entity->id] = $this->objects[$entity->id] = $entity->id;
if (true === $inDryRunMode && $entity->hasPlayProgress()) {
if (false === $inDryRunMode && $entity->hasPlayProgress()) {
$itemId = r('{type}://{id}:{tainted}@{backend}', [
'type' => $entity->type,
'backend' => $entity->via,
@@ -323,13 +321,9 @@ class DirectMapper implements ImportInterface
$inDryRunMode = $this->inDryRunMode();
$keys = [iState::COLUMN_META_DATA];
$cloned = clone $local;
$progressChange = $this->shouldProgressUpdate($local, $entity, $opts);
$newPlayProgress = (int)ag($entity->getMetadata($entity->via), iState::COLUMN_META_DATA_PROGRESS);
$oldPlayProgress = (int)ag($cloned->getMetadata($entity->via), iState::COLUMN_META_DATA_PROGRESS);
$playChanged = !$metadataOnly && $newPlayProgress > ($oldPlayProgress + 10);
if ($playChanged || true === (clone $local)->apply(entity: $entity, fields: $keys)->isChanged(fields: $keys)) {
if (true === $progressChange || true === (clone $local)->apply($entity, fields: $keys)->isChanged($keys)) {
try {
$local = $local->apply(
entity: $entity,
@@ -337,46 +331,41 @@ class DirectMapper implements ImportInterface
);
$this->removePointers($local)->addPointers($local, $local->id);
$changes = $local->diff(fields: $keys);
$allowUpdate = (int)Config::get('progress.threshold', 0);
$minThreshold = (int)Config::get('progress.minThreshold', 86_400);
$progress = $playChanged && $entity->hasPlayProgress();
if ($entity->isWatched() && $allowUpdate < $minThreshold) {
$progress = false;
}
if (count($changes) >= 1) {
if (true === $progressChange || count($changes) >= 1) {
$_keys = array_merge($keys, [iState::COLUMN_EXTRA]);
if ($playChanged && $progress) {
if (true === $progressChange) {
$_keys[] = iState::COLUMN_VIA;
}
$local = $local->apply($entity, fields: $_keys);
$message = "{mapper}: [T] '{user}@{backend}' updated '#{id}: {title}' ";
$this->logger->log(
$progress ? LogLevel::NOTICE : LogLevel::INFO,
$progress ? "{mapper}: [T] '{user}@{backend}' updated '#{id}: {title}' due to play progress change." : "{mapper}: [T] '{user}@{backend}' updated '#{id}: {title}' metadata.",
true === $progressChange ? LogLevel::NOTICE : LogLevel::INFO,
$message . (true === $progressChange ? "due to play progress change." : "metadata."),
[
'user' => $this->userContext?->name ?? 'main',
'mapper' => afterLast(self::class, '\\'),
'id' => $cloned->id ?? 'New',
'id' => $local->id ?? 'New',
'backend' => $entity->via,
'title' => $cloned->getName(),
'changes' => $progress ? $local->diff(fields: $_keys) : $changes,
'title' => $local->getName(),
'changes' => true === $progressChange ? $local->diff(fields: $_keys) : $changes,
]
);
}
if (false === $inDryRunMode) {
$this->db->update($local);
if (true === $progress) {
if (true === $progressChange) {
$itemId = r('{type}://{id}:{tainted}@{backend}', [
'type' => $entity->type,
'backend' => $entity->via,
'tainted' => 'untainted',
'id' => ag($entity->getMetadata($entity->via), iState::COLUMN_ID, '??'),
]);
$this->progressItems[$itemId] = $local;
}
}
@@ -472,9 +461,7 @@ class DirectMapper implements ImportInterface
private function handleOldEntity(iState $local, iState $entity, array $opts = []): self
{
$keys = [iState::COLUMN_META_DATA];
$metadataOnly = true === (bool)ag($opts, Options::IMPORT_METADATA_ONLY);
$inDryRunMode = $this->inDryRunMode();
$onStateUpdate = ag($opts, Options::STATE_UPDATE_EVENT, null);
$cloned = clone $local;
@@ -488,8 +475,7 @@ class DirectMapper implements ImportInterface
if (false === $inDryRunMode) {
$this->db->update($local);
if (null !== $onStateUpdate) {
if (null !== ($onStateUpdate = ag($opts, Options::STATE_UPDATE_EVENT, null))) {
$onStateUpdate($local);
}
}
@@ -535,14 +521,14 @@ class DirectMapper implements ImportInterface
return $this;
}
$newPlayProgress = (int)ag($entity->getMetadata($entity->via), iState::COLUMN_META_DATA_PROGRESS);
$oldPlayProgress = (int)ag($cloned->getMetadata($entity->via), iState::COLUMN_META_DATA_PROGRESS);
$playChanged = !$metadataOnly && $newPlayProgress > ($oldPlayProgress + 10);
$metaExists = count($cloned->getMetadata($entity->via)) >= 1;
$progressChange = $this->shouldProgressUpdate($local, $entity, $opts);
$updateMeta = true === (bool)ag($this->options, Options::MAPPER_ALWAYS_UPDATE_META);
$hasMeta = count($cloned->getMetadata($entity->via)) >= 1;
// -- this sometimes leads to never ending updates as data from backends conflicts.
if (!$metaExists || $playChanged || true === (bool)ag($this->options, Options::MAPPER_ALWAYS_UPDATE_META)) {
if (true === (clone $cloned)->apply(entity: $entity, fields: $keys)->isChanged(fields: $keys)) {
if (false === $hasMeta || true === $progressChange || true === $updateMeta) {
if (false === $hasMeta || $progressChange || (clone $cloned)->apply($entity, $keys)->isChanged($keys)) {
try {
$local = $local->apply(
entity: $entity,
@@ -552,44 +538,38 @@ class DirectMapper implements ImportInterface
$this->removePointers($cloned)->addPointers($local, $local->id);
$changes = $local->diff(fields: $keys);
$allowUpdate = (int)Config::get('progress.threshold', 0);
$minThreshold = (int)Config::get('progress.minThreshold', 86_400);
$progress = $playChanged && $entity->hasPlayProgress();
if ($entity->isWatched() && $allowUpdate < $minThreshold) {
$progress = false;
}
if (count($changes) >= 1) {
if (true === $progressChange || false === $hasMeta || count($changes) >= 1) {
$_keys = array_merge($keys, [iState::COLUMN_EXTRA]);
if ($playChanged && $progress) {
if (true === $progressChange) {
$_keys[] = iState::COLUMN_VIA;
}
$local = $local->apply($entity, fields: $_keys);
$message = "{mapper}: [O] '{user}@{backend}' updated '#{id}: {title}' ";
$this->logger->log(
$progress ? LogLevel::NOTICE : LogLevel::INFO,
$progress ? "{mapper}: [O] '{user}@{backend}' updated '#{id}: {title}' due to play progress change." : "{mapper}: [O] '{user}@{backend}' updated '#{id}: {title}' metadata.",
true === $progressChange ? LogLevel::NOTICE : LogLevel::INFO,
$message . (true === $progressChange ? "due to play progress change." : "metadata."),
[
'user' => $this->userContext?->name ?? 'main',
'mapper' => afterLast(self::class, '\\'),
'id' => $cloned->id ?? 'New',
'backend' => $entity->via,
'title' => $cloned->getName(),
'changes' => $progress ? $local->diff(fields: $_keys) : $changes,
'changes' => true === $progressChange ? $local->diff(fields: $_keys) : $changes,
]
);
}
if (false === $inDryRunMode) {
$this->db->update($local);
if ($progress) {
if (true === $progressChange) {
$itemId = r('{type}://{id}:{tainted}@{backend}', [
'type' => $entity->type,
'backend' => $entity->via,
'tainted' => 'untainted',
'id' => ag($entity->getMetadata($entity->via), iState::COLUMN_ID, '??'),
]);
$this->progressItems[$itemId] = $local;
}
}
@@ -629,9 +609,10 @@ class DirectMapper implements ImportInterface
Message::increment("{$entity->via}.{$entity->type}.ignored_not_played_since_last_sync");
if ($entity->isWatched() !== $local->isWatched()) {
$hasAfter = null !== ($opts['after'] ?? null) && true === ($opts['after'] instanceof iDate);
if ($entity->isWatched() !== $local->isWatched() && $hasAfter) {
$this->logger->notice(
"{mapper}: [O] '{user}@{backend}' item '#{id}: {title}' is marked as '{state}' vs local '{local_state}', however due to the remote item date '{remote_date}' being older than the last backend sync date '{local_date}'. it was not considered as valid state.",
"{mapper}: [O] '{user}@{backend}' item '#{id}: {title}' date '{remote_date}' is older than last sync date '{local_date}'. Marking the item as tainted and re-processing.",
[
'user' => $this->userContext?->name ?? 'main',
'mapper' => afterLast(self::class, '\\'),
@@ -644,7 +625,15 @@ class DirectMapper implements ImportInterface
'title' => $entity->getName(),
]
);
return $this;
$entity->metadata = ag_set(
$entity->getMetadata(),
"{$entity->via}." . iState::COLUMN_META_DATA_PLAYED_AT,
$entity->updated
);
$entity->setIsTainted(true);
return $this->add($entity, $opts);
}
if ($this->inTraceMode()) {
@@ -696,10 +685,6 @@ class DirectMapper implements ImportInterface
return $this;
}
$metadataOnly = true === (bool)ag($opts, Options::IMPORT_METADATA_ONLY);
$inDryRunMode = $this->inDryRunMode();
$onStateUpdate = ag($opts, Options::STATE_UPDATE_EVENT, null);
/**
* Handle adding new item logic.
*/
@@ -717,7 +702,7 @@ class DirectMapper implements ImportInterface
* ONLY update backend metadata
* if metadataOnly is set or the event is tainted.
*/
if (true === $metadataOnly || true === $entity->isTainted()) {
if (true === (bool)ag($opts, Options::IMPORT_METADATA_ONLY) || true === $entity->isTainted()) {
return $this->handleTainted($cloned, $entity, $opts);
}
@@ -728,7 +713,7 @@ class DirectMapper implements ImportInterface
}
/**
* Fix for #329 {@see https://github.com/arabcoders/watchstate/issues/329}
* Fix for issue #329 {@see https://github.com/arabcoders/watchstate/issues/329}
*
* This conditional block should proceed only if specific conditions are met.
* 1- the backend state is [unwatched] while the db state is [watched]
@@ -742,7 +727,9 @@ class DirectMapper implements ImportInterface
if (false === $hasMeta) {
$message .= ' No metadata. Marking the item as tainted and re-processing.';
} elseif (true === $hasDate) {
}
if (true === $hasMeta && true === $hasDate) {
$message .= ' db.metadata.played_at is equal to entity.updated. Marking the item as tainted and re-processing.';
}
@@ -765,10 +752,19 @@ class DirectMapper implements ImportInterface
}
}
$newPlayProgress = (int)ag($entity->getMetadata($entity->via), iState::COLUMN_META_DATA_PROGRESS);
$oldPlayProgress = (int)ag($cloned->getMetadata($entity->via), iState::COLUMN_META_DATA_PROGRESS);
$playChanged = false === $metadataOnly && $newPlayProgress > ($oldPlayProgress + 10);
return $this->handleUntaintedEntity($cloned, $entity, $opts);
}
/**
* Handle Untainted entities.
*/
public function handleUntaintedEntity(iState $local, iState $entity, array $opts = []): self
{
$inDryRunMode = $this->inDryRunMode();
$cloned = clone $local;
$progressChange = $this->shouldProgressUpdate($local, $entity, $opts);
$keys = $opts['diff_keys'] ?? array_flip(
array_keys_diff(
base: array_flip(iState::ENTITY_KEYS),
@@ -777,18 +773,10 @@ class DirectMapper implements ImportInterface
)
);
if ($playChanged || true === (clone $cloned)->apply(entity: $entity, fields: $keys)->isChanged(fields: $keys)) {
if (true === $progressChange || true === (clone $cloned)->apply($entity, fields: $keys)->isChanged($keys)) {
try {
$allowUpdate = (int)Config::get('progress.threshold', 0);
$minThreshold = (int)Config::get('progress.minThreshold', 86_400);
$progress = $playChanged && $entity->hasPlayProgress();
if ($entity->isWatched() && $allowUpdate < $minThreshold) {
$progress = false;
}
$_keys = array_merge($keys, [iState::COLUMN_EXTRA]);
if ($playChanged && $progress) {
if (true === $progressChange) {
$_keys[] = iState::COLUMN_VIA;
}
@@ -797,24 +785,23 @@ class DirectMapper implements ImportInterface
$changes = $local->diff(fields: $_keys);
$message = "{mapper}: [A] '{user}@{backend}' Updated '#{id}: {title}'.";
$message = "{mapper}: [U] '{user}@{backend}' Updated '#{id}: {title}'.";
$isPlayChanged = $cloned->isWatched() !== $local->isWatched();
$stateChange = $cloned->isWatched() !== $local->isWatched();
if ($playChanged && $progress) {
if (true === $progressChange) {
$message .= " Due to play progress change.";
}
if (true === $isPlayChanged) {
$message = "{mapper}: [A] '{user}@{backend}' Updated and marked '#{id}: {title}' as '{state}'.";
if (null !== $onStateUpdate) {
if (true === $stateChange) {
$message = "{mapper}: [U] '{user}@{backend}' Updated and marked '#{id}: {title}' as '{state}'.";
if (null !== ($onStateUpdate = ag($opts, Options::STATE_UPDATE_EVENT, null))) {
$onStateUpdate($local);
}
}
if (count($changes) >= 1) {
$level = ($playChanged && $progress) || $isPlayChanged ? LogLevel::NOTICE : LogLevel::INFO;
$this->logger->log($level, $message, [
if (true === $progressChange || count($changes) >= 1) {
$this->logger->log($progressChange || $stateChange ? LogLevel::NOTICE : LogLevel::INFO, $message, [
'user' => $this->userContext?->name ?? 'main',
'mapper' => afterLast(self::class, '\\'),
'id' => $cloned->id ?? 'New',
@@ -827,7 +814,7 @@ class DirectMapper implements ImportInterface
if (false === $inDryRunMode) {
$this->db->update($local);
if (true === $progress) {
if (true === $progressChange) {
$itemId = r('{type}://{id}:{tainted}@{backend}', [
'type' => $entity->type,
'backend' => $entity->via,
@@ -849,7 +836,7 @@ class DirectMapper implements ImportInterface
Message::increment("{$entity->via}.{$local->type}.failed");
$this->logger->error(
...lw(
message: "{mapper}: [A] Exception '{error.kind}' was thrown unhandled during '{user}@{backend}' - '{title}' add. {error.message} at '{error.file}:{error.line}'.",
message: "{mapper}: [U] Exception '{error.kind}' was thrown unhandled during '{user}@{backend}' - '{title}' add. {error.message} at '{error.file}:{error.line}'.",
context: [
'user' => $this->userContext?->name ?? 'main',
'mapper' => afterLast(self::class, '\\'),
@@ -887,7 +874,7 @@ class DirectMapper implements ImportInterface
if ($this->inTraceMode()) {
$this->logger->info(
"{mapper}: [A] Ignoring '{user}@{backend}' - '#{id}: {title}'. Metadata & play state are identical.",
"{mapper}: [U] Ignoring '{user}@{backend}' - '#{id}: {title}'. Metadata & play state are identical.",
$context
);
}
@@ -939,7 +926,9 @@ class DirectMapper implements ImportInterface
*/
public function commit(): array
{
if (true === (bool)Config::get('sync.progress', false) && count($this->progressItems) >= 1 && false === $this->inDryRunMode()) {
if (true === (bool)Config::get('sync.progress', false) && count(
$this->progressItems
) >= 1 && false === $this->inDryRunMode()) {
try {
$name = '{type}://{id}@{backend}';
@@ -951,7 +940,6 @@ class DirectMapper implements ImportInterface
}
foreach ($this->progressItems as $entity) {
$opts[EventsTable::COLUMN_REFERENCE] = r($name, [
'type' => $entity->type,
'backend' => $entity->via,
@@ -1179,4 +1167,34 @@ class DirectMapper implements ImportInterface
return $this;
}
/**
* Check if the progress update should be applied.
* @param iState $old The old entity.
* @param iState $new The new entity.
* @param array $opts Additional options.
*
* @return bool True if the progress update should be applied, false otherwise.
*/
private function shouldProgressUpdate(iState $old, iState $new, array $opts = []): bool
{
if (false === (bool)ag($opts, Options::IMPORT_METADATA_ONLY)) {
return false;
}
$newPlayProgress = (int)ag($new->getMetadata($new->via), iState::COLUMN_META_DATA_PROGRESS);
$oldPlayProgress = (int)ag($old->getMetadata($new->via), iState::COLUMN_META_DATA_PROGRESS);
$playChanged = $newPlayProgress > ($oldPlayProgress + 10);
$allowUpdate = (int)Config::get('progress.threshold', 0);
$minThreshold = (int)Config::get('progress.minThreshold', 86_400);
$progress = $playChanged && $new->hasPlayProgress();
if ($new->isWatched() && $allowUpdate < $minThreshold) {
$progress = false;
}
return $progress;
}
}