initial code to support play progress tracking via import.

This commit is contained in:
Abdulmhsen B. A. A
2023-11-11 14:08:05 +03:00
parent 3961fbd2e7
commit e14de10a58
7 changed files with 122 additions and 26 deletions

View File

@@ -11,6 +11,7 @@ use App\Libs\Entity\StateInterface;
use App\Libs\Extends\ConsoleOutput;
use App\Libs\Extends\HttpClient;
use App\Libs\Extends\LogMessageProcessor;
use App\Libs\Mappers\Import\DirectMapper;
use App\Libs\Mappers\Import\MemoryMapper;
use App\Libs\Mappers\ImportInterface as iImport;
use App\Libs\QueueRequests;
@@ -167,13 +168,26 @@ return (function (): array {
],
MemoryMapper::class => [
'class' => function (iLogger $logger, iDB $db): iImport {
return (new MemoryMapper(logger: $logger, db: $db))
'class' => function (iLogger $logger, iDB $db, CacheInterface $cache): iImport {
return (new MemoryMapper(logger: $logger, db: $db, cache: $cache))
->setOptions(options: Config::get('mapper.import.opts', []));
},
'args' => [
iLogger::class,
iDB::class,
CacheInterface::class
],
],
DirectMapper::class => [
'class' => function (iLogger $logger, iDB $db, CacheInterface $cache): iImport {
return (new DirectMapper(logger: $logger, db: $db, cache: $cache))
->setOptions(options: Config::get('mapper.import.opts', []));
},
'args' => [
iLogger::class,
iDB::class,
CacheInterface::class
],
],

View File

@@ -8,6 +8,7 @@ use App\Command;
use App\Commands\Backend\Library\UnmatchedCommand;
use App\Commands\Config\EditCommand;
use App\Libs\Config;
use App\Libs\Container;
use App\Libs\Database\DatabaseInterface as iDB;
use App\Libs\Entity\StateInterface as iState;
use App\Libs\Extends\StreamLogHandler;
@@ -250,7 +251,7 @@ class ImportCommand extends Command
}
if ($input->getOption('direct-mapper')) {
$this->mapper = new DirectMapper(logger: $this->logger, db: $this->db);
$this->mapper = Container::get(DirectMapper::class);
}
if (!empty($mapperOpts)) {

View File

@@ -53,7 +53,7 @@ class ProgressCommand extends Command
<error>***WARNING THIS COMMAND IS EXPERIMENTAL AND MAY NOT WORK AS EXPECTED***</error>
<notice>THIS COMMAND ONLY WORKS CORRECTLY FOR PLEX & EMBY AT THE MOMENT.</notice>
=================================================================================
Jellyfin API has a bug which i cannot do anything about.
Jellyfin API has a bug which I cannot do anything about.
This command push <notice>user</notice> watch progress to export enabled backends.
You should not run this manually and instead rely on scheduled task to run this command.
@@ -100,6 +100,9 @@ class ProgressCommand extends Command
if (!empty($items)) {
foreach ($items as $queueItem) {
$dbItem = $this->db->get($queueItem);
if ($dbItem->isWatched()) {
continue;
}
$dbItem = $dbItem->apply($queueItem);
if (!$dbItem->hasPlayProgress()) {

View File

@@ -10,10 +10,13 @@ use App\Libs\Entity\StateInterface as iState;
use App\Libs\Mappers\ImportInterface as iImport;
use App\Libs\Message;
use App\Libs\Options;
use DateInterval;
use DateTimeInterface as iDate;
use Exception;
use PDOException;
use Psr\Log\LoggerInterface as iLogger;
use Psr\SimpleCache\CacheInterface;
use Psr\SimpleCache\InvalidArgumentException;
final class DirectMapper implements iImport
{
@@ -43,8 +46,12 @@ final class DirectMapper implements iImport
protected array $options = [];
protected bool $fullyLoaded = false;
/**
* @var array<string,iState> List of items with play progress.
*/
protected array $progressItems = [];
public function __construct(protected iLogger $logger, protected iDB $db)
public function __construct(protected iLogger $logger, protected iDB $db, protected CacheInterface $cache)
{
}
@@ -297,8 +304,12 @@ final class DirectMapper implements iImport
return $this;
}
$newPlayProgress = (int)ag($entity->getMetadata($entity->via), iState::COLUMN_META_DATA_PROGRESS);
$oldPlayProgress = (int)ag($cloned->getMetadata($entity->via), iState::COLUMN_META_DATA_PROGRESS);
$playChanged = $newPlayProgress != $oldPlayProgress;
// -- this sometimes leads to never ending updates as data from backends conflicts.
if (true === (bool)ag($this->options, Options::MAPPER_ALWAYS_UPDATE_META)) {
if ($playChanged || true === (bool)ag($this->options, Options::MAPPER_ALWAYS_UPDATE_META)) {
if (true === (clone $cloned)->apply(entity: $entity, fields: $keys)->isChanged(fields: $keys)) {
try {
$local = $local->apply(
@@ -311,16 +322,30 @@ final class DirectMapper implements iImport
$changes = $local->diff(fields: $keys);
if (count($changes) >= 1) {
$this->logger->notice('MAPPER: [{backend}] updated [{title}] metadata.', [
'id' => $cloned->id,
'backend' => $entity->via,
'title' => $cloned->getName(),
'changes' => $changes,
]);
$this->logger->notice(
$playChanged ? 'MAPPER: [{backend}] updated [{title}] due to play progress change.' : 'MAPPER: [{backend}] updated [{title}] metadata.',
[
'id' => $cloned->id,
'backend' => $entity->via,
'title' => $cloned->getName(),
'changes' => $changes,
]
);
}
if (false === $inDryRunMode) {
$this->db->update($local);
if (true === $entity->hasPlayProgress()) {
$itemId = r('{type}://{id}:{tainted}@{backend}', [
'type' => $entity->type,
'backend' => $entity->via,
'tainted' => $entity->isTainted() ? 'tainted' : 'untainted',
'id' => ag($entity->getMetadata($entity->via), iState::COLUMN_ID, '??'),
]);
$this->progressItems[$itemId] = $entity;
}
}
if (null === ($this->changed[$local->id] ?? null)) {
@@ -510,8 +535,19 @@ final class DirectMapper implements iImport
return $this->db->remove($entity);
}
public function commit(): mixed
public function commit(): array
{
if (count($this->progressItems) >= 1) {
try {
$progress = $this->cache->get('progress', []);
foreach ($this->progressItems as $itemId => $entity) {
$progress[$itemId] = $entity;
}
$this->cache->set('progress', $progress, new DateInterval('P1D'));
} catch (InvalidArgumentException) {
}
}
$list = $this->actions;
$this->reset();
@@ -532,7 +568,7 @@ final class DirectMapper implements iImport
];
$this->fullyLoaded = false;
$this->changed = $this->objects = $this->pointers = [];
$this->changed = $this->objects = $this->pointers = $this->progressItems = [];
return $this;
}

View File

@@ -9,9 +9,12 @@ use App\Libs\Entity\StateInterface as iState;
use App\Libs\Mappers\ImportInterface as iImport;
use App\Libs\Message;
use App\Libs\Options;
use DateInterval;
use DateTimeInterface as iDate;
use PDOException;
use Psr\Log\LoggerInterface as iLogger;
use Psr\SimpleCache\CacheInterface;
use Psr\SimpleCache\InvalidArgumentException;
final class MemoryMapper implements iImport
{
@@ -32,11 +35,16 @@ final class MemoryMapper implements iImport
*/
protected array $changed = [];
/**
* @var array<int,iState> List of items with play progress.
*/
protected array $progressItems = [];
protected array $options = [];
protected bool $fullyLoaded = false;
public function __construct(protected iLogger $logger, protected iDB $db)
public function __construct(protected iLogger $logger, protected iDB $db, protected CacheInterface $cache)
{
}
@@ -214,8 +222,12 @@ final class MemoryMapper implements iImport
return $this;
}
$newPlayProgress = (int)ag($entity->getMetadata($entity->via), iState::COLUMN_META_DATA_PROGRESS);
$oldPlayProgress = (int)ag($cloned->getMetadata($entity->via), iState::COLUMN_META_DATA_PROGRESS);
$playChanged = $newPlayProgress != $oldPlayProgress;
// -- this sometimes leads to never ending updates as data from backends conflicts.
if (true === (bool)ag($this->options, Options::MAPPER_ALWAYS_UPDATE_META)) {
if ($playChanged || true === (bool)ag($this->options, Options::MAPPER_ALWAYS_UPDATE_META)) {
if (true === (clone $cloned)->apply(entity: $entity, fields: $keys)->isChanged(fields: $keys)) {
$this->changed[$pointer] = $pointer;
Message::increment("{$entity->via}.{$entity->type}.updated");
@@ -230,13 +242,26 @@ final class MemoryMapper implements iImport
$changes = $this->objects[$pointer]->diff(fields: $keys);
if (count($changes) >= 1) {
$this->logger->notice('MAPPER: [{backend}] updated [{title}] metadata.', [
'id' => $cloned->id,
'backend' => $entity->via,
'title' => $cloned->getName(),
'changes' => $changes,
'fields' => implode(',', $keys),
]);
$this->logger->notice(
$playChanged ? 'MAPPER: [{backend}] updated [{title}] due to play progress change.' : 'MAPPER: [{backend}] updated [{title}] metadata.',
[
'id' => $cloned->id,
'backend' => $entity->via,
'title' => $cloned->getName(),
'changes' => $changes,
'fields' => implode(',', $keys),
]
);
if (true === $entity->hasPlayProgress()) {
$itemId = r('{type}://{id}:{tainted}@{backend}', [
'type' => $entity->type,
'backend' => $entity->via,
'tainted' => $entity->isTainted() ? 'tainted' : 'untainted',
'id' => ag($entity->getMetadata($entity->via), iState::COLUMN_ID, '??'),
]);
$this->progressItems[$itemId] = $entity;
}
}
return $this;
@@ -380,6 +405,19 @@ final class MemoryMapper implements iImport
public function commit(): mixed
{
if (true !== $this->inDryRunMode()) {
if (count($this->progressItems) >= 1) {
try {
$progress = $this->cache->get('progress', []);
foreach ($this->progressItems as $itemId => $entity) {
$progress[$itemId] = $entity;
}
$this->cache->set('progress', $progress, new DateInterval('P1D'));
} catch (InvalidArgumentException) {
}
}
}
$state = $this->db->transactional(function (iDB $db) {
$list = [
iState::TYPE_MOVIE => ['added' => 0, 'updated' => 0, 'failed' => 0],
@@ -437,7 +475,7 @@ final class MemoryMapper implements iImport
public function reset(): self
{
$this->fullyLoaded = false;
$this->objects = $this->changed = $this->pointers = [];
$this->objects = $this->changed = $this->pointers = $this->progressItems = [];
return $this;
}

View File

@@ -7,12 +7,14 @@ namespace Tests\Mappers\Import;
use App\Libs\Entity\StateEntity;
use App\Libs\Mappers\Import\DirectMapper;
use App\Libs\Mappers\ImportInterface;
use Symfony\Component\Cache\Adapter\NullAdapter;
use Symfony\Component\Cache\Psr16Cache;
class DirectMapperTest extends AbstractTestsMapper
{
protected function setupMapper(): ImportInterface
{
$mapper = new DirectMapper($this->logger, $this->db);
$mapper = new DirectMapper($this->logger, $this->db, cache: new Psr16Cache(new NullAdapter()));
$mapper->setOptions(options: ['class' => new StateEntity([])]);
return $mapper;
}

View File

@@ -7,12 +7,14 @@ namespace Tests\Mappers\Import;
use App\Libs\Entity\StateEntity;
use App\Libs\Mappers\Import\MemoryMapper;
use App\Libs\Mappers\ImportInterface;
use Symfony\Component\Cache\Adapter\NullAdapter;
use Symfony\Component\Cache\Psr16Cache;
class MemoryMapperTest extends AbstractTestsMapper
{
protected function setupMapper(): ImportInterface
{
$mapper = new MemoryMapper($this->logger, $this->db);
$mapper = new MemoryMapper($this->logger, $this->db, new Psr16Cache(new NullAdapter()));
$mapper->setOptions(options: ['class' => new StateEntity([])]);
return $mapper;