Merge pull request #98 from ArabCoders/new-db

New database design.
This commit is contained in:
Abdulmohsen
2022-05-10 23:13:51 +03:00
committed by GitHub
28 changed files with 3077 additions and 3462 deletions

View File

@@ -4,6 +4,18 @@ WatchState is a CLI based tool to sync your watch state between different media
services, like trakt.tv, This tool support `Plex Media Server`, `Emby` and `Jellyfin` out of the box currently, with
plans for future expansion for other media servers.
# Breaking Change
If you are using old version of the tool i.e. before (2022-05-08) you need to run manual import to populate the new
database. We had massive code/db changes. The new database name should be `watchstate_v0.db`, if you don't have a
database named `watchstate.db` then there is nothing to do.
to manually import the run this command with --force-full flag to get all previous records.
```bash
$ docker exec -ti watchstate console state:import --force-full -vvrm
```
# Install
create your `docker-compose.yaml` file:

View File

@@ -28,10 +28,10 @@ return (function () {
],
];
$config['tmpDir'] = fixPath(env('WS_TMP_DIR', fn() => ag($config, 'path')));
$config['tmpDir'] = fixPath(env('WS_TMP_DIR', $config['path']));
$config['storage'] = [
'dsn' => 'sqlite:' . ag($config, 'path') . '/db/watchstate.db',
'dsn' => 'sqlite:' . ag($config, 'path') . '/db/watchstate_v0.db',
'username' => null,
'password' => null,
'options' => [

View File

@@ -7,7 +7,6 @@ use App\Libs\Entity\StateEntity;
use App\Libs\Entity\StateInterface;
use App\Libs\Mappers\Export\ExportMapper;
use App\Libs\Mappers\ExportInterface;
use App\Libs\Mappers\Import\DirectMapper;
use App\Libs\Mappers\Import\MemoryMapper;
use App\Libs\Mappers\ImportInterface;
use App\Libs\Storage\PDO\PDOAdapter;
@@ -89,16 +88,6 @@ return (function (): array {
],
],
DirectMapper::class => [
'class' => function (LoggerInterface $logger, StorageInterface $storage): ImportInterface {
return (new DirectMapper($logger, $storage))->setUp(Config::get('mapper.import.opts', []));
},
'args' => [
LoggerInterface::class,
StorageInterface::class,
],
],
ImportInterface::class => [
'class' => function (ImportInterface $mapper): ImportInterface {
return $mapper;

View File

@@ -1,31 +1,32 @@
-- # migrate_up
CREATE TABLE IF NOT EXISTS "state"
CREATE TABLE "state"
(
"id" integer NOT NULL PRIMARY KEY AUTOINCREMENT,
"type" text NOT NULL,
"updated" integer NOT NULL,
"watched" integer NOT NULL DEFAULT 0,
"meta" text NULL,
"guid_plex" text NULL,
"guid_imdb" text NULL,
"guid_tvdb" text NULL,
"guid_tmdb" text NULL,
"guid_tvmaze" text NULL,
"guid_tvrage" text NULL,
"guid_anidb" text NULL
"id" integer NOT NULL PRIMARY KEY AUTOINCREMENT,
"type" text NOT NULL,
"updated" integer NOT NULL,
"watched" integer NOT NULL DEFAULT '0',
"via" text NOT NULL,
"title" text NOT NULL,
"year" integer NULL,
"season" integer NULL,
"episode" integer NULL,
"parent" text NULL,
"guids" text NULL,
"extra" text NULL
);
CREATE INDEX IF NOT EXISTS "state_type" ON "state" ("type");
CREATE INDEX IF NOT EXISTS "state_watched" ON "state" ("watched");
CREATE INDEX IF NOT EXISTS "state_updated" ON "state" ("updated");
CREATE INDEX IF NOT EXISTS "state_meta" ON "state" ("meta");
CREATE INDEX IF NOT EXISTS "state_guid_plex" ON "state" ("guid_plex");
CREATE INDEX IF NOT EXISTS "state_guid_imdb" ON "state" ("guid_imdb");
CREATE INDEX IF NOT EXISTS "state_guid_tvdb" ON "state" ("guid_tvdb");
CREATE INDEX IF NOT EXISTS "state_guid_tvmaze" ON "state" ("guid_tvmaze");
CREATE INDEX IF NOT EXISTS "state_guid_tvrage" ON "state" ("guid_tvrage");
CREATE INDEX IF NOT EXISTS "state_guid_anidb" ON "state" ("guid_anidb");
CREATE INDEX "state_type" ON "state" ("type");
CREATE INDEX "state_updated" ON "state" ("updated");
CREATE INDEX "state_watched" ON "state" ("watched");
CREATE INDEX "state_via" ON "state" ("via");
CREATE INDEX "state_title" ON "state" ("title");
CREATE INDEX "state_year" ON "state" ("year");
CREATE INDEX "state_season" ON "state" ("season");
CREATE INDEX "state_episode" ON "state" ("episode");
CREATE INDEX "state_parent" ON "state" ("parent");
CREATE INDEX "state_guids" ON "state" ("guids");
CREATE INDEX "state_extra" ON "state" ("extra");
-- # migrate_down

View File

@@ -39,13 +39,19 @@ final class ListCommand extends Command
'Limit results to this specified server. This filter is not reliable. and changes based on last server query.'
)
->addOption('output', null, InputOption::VALUE_REQUIRED, 'Display output as [json, yaml, table]', 'table')
->addOption('series', null, InputOption::VALUE_REQUIRED, 'Limit results to this specified series.')
->addOption('movie', null, InputOption::VALUE_REQUIRED, 'Limit results to this specified movie.')
->addOption('parent', null, InputOption::VALUE_NONE, 'If set it will search parent GUIDs instead.')
->addOption(
'type',
null,
InputOption::VALUE_REQUIRED,
'Limit results to this specified type can be [movie or episode].'
)
->addOption('title', null, InputOption::VALUE_REQUIRED, 'Limit results to this specified tv show.')
->addOption('season', null, InputOption::VALUE_REQUIRED, 'Select season number')
->addOption('episode', null, InputOption::VALUE_REQUIRED, 'Select episode number')
->addOption('id', null, InputOption::VALUE_REQUIRED, 'Select db record number')
->addOption('sort', null, InputOption::VALUE_REQUIRED, 'sort order by [id, updated]', 'updated')
->addOption('asc', null, InputOption::VALUE_NONE, 'Sort records in ascending order.')
->addOption('desc', null, InputOption::VALUE_NONE, 'Sort records in descending order. (Default)')
->setDescription('List Database entries.');
foreach (array_keys(Guid::SUPPORTED) as $guid) {
@@ -57,6 +63,8 @@ final class ListCommand extends Command
'Search Using ' . ucfirst($guid) . ' id.'
);
}
$this->addOption('parent', null, InputOption::VALUE_NONE, 'If set it will search parent GUIDs instead.');
}
/**
@@ -89,32 +97,37 @@ final class ListCommand extends Command
$sql = "SELECT * FROM state ";
if ($input->getOption('via')) {
$where[] = "json_extract(meta,'$.via') = :via";
$params['via'] = $input->getOption('via');
}
if ($input->getOption('id')) {
$where[] = "id = :id";
$params['id'] = $input->getOption('id');
}
if ($input->getOption('series')) {
$where[] = "json_extract(meta,'$.series') = :series";
$params['series'] = $input->getOption('series');
if ($input->getOption('via')) {
$where[] = "via = :via";
$params['via'] = $input->getOption('via');
}
if ($input->getOption('movie')) {
$where[] = "json_extract(meta,'$.title') = :movie";
$params['movie'] = $input->getOption('movie');
if ($input->getOption('type')) {
$where[] = "type = :type";
$params['type'] = match ($input->getOption('type')) {
StateInterface::TYPE_MOVIE => StateInterface::TYPE_MOVIE,
default => StateInterface::TYPE_EPISODE,
};
}
if ($input->getOption('title')) {
$where[] = "title LIKE '%' || :title || '%'";
$params['title'] = $input->getOption('title');
}
if (null !== $input->getOption('season')) {
$where[] = "json_extract(meta,'$.season') = " . (int)$input->getOption('season');
$where[] = "season = :season";
$params['season'] = $input->getOption('season');
}
if (null !== $input->getOption('episode')) {
$where[] = "json_extract(meta,'$.episode') = " . (int)$input->getOption('episode');
$where[] = "episode = :episode";
$params['episode'] = $input->getOption('episode');
}
if ($input->getOption('parent')) {
@@ -122,7 +135,7 @@ final class ListCommand extends Command
if (null === ($val = $input->getOption(afterLast($guid, 'guid_')))) {
continue;
}
$where[] = "json_extract(meta,'$.parent.{$guid}') = :{$guid}";
$where[] = "json_extract(parent,'$.{$guid}') = :{$guid}";
$params[$guid] = $val;
}
} else {
@@ -130,7 +143,7 @@ final class ListCommand extends Command
if (null === ($val = $input->getOption(afterLast($guid, 'guid_')))) {
continue;
}
$where[] = "{$guid} LIKE '%' || :{$guid} || '%'";
$where[] = "json_extract(guids,'$.{$guid}') = :{$guid}";
$params[$guid] = $val;
}
}
@@ -139,8 +152,17 @@ final class ListCommand extends Command
$sql .= 'WHERE ' . implode(' AND ', $where);
}
$sort = $input->getOption('sort') === 'id' ? 'id' : 'updated';
$sql .= " ORDER BY {$sort} DESC LIMIT :limit";
$sort = match ($input->getOption('sort')) {
'id' => 'id',
'season' => 'season',
'episode' => 'episode',
'type' => 'type',
default => 'updated',
};
$sortOrder = ($input->getOption('asc')) ? 'ASC' : 'DESC';
$sql .= " ORDER BY {$sort} {$sortOrder} LIMIT :limit";
$stmt = $this->pdo->prepare($sql);
$stmt->execute($params);
@@ -200,27 +222,27 @@ final class ListCommand extends Command
$type = strtolower($row['type'] ?? '??');
$meta = json_decode(ag($row, 'meta', '{}'), true);
$extra = json_decode(ag($row, 'extra', '{}'), true);
$episode = null;
if (StateInterface::TYPE_EPISODE === $type) {
$episode = sprintf(
'%sx%s',
str_pad((string)($meta['season'] ?? 0), 2, '0', STR_PAD_LEFT),
str_pad((string)($meta['episode'] ?? 0), 2, '0', STR_PAD_LEFT),
str_pad((string)($row['season'] ?? 0), 2, '0', STR_PAD_LEFT),
str_pad((string)($row['episode'] ?? 0), 3, '0', STR_PAD_LEFT),
);
}
$list[] = [
$row['id'],
ucfirst($row['type'] ?? '??'),
$meta['via'] ?? '??',
$meta['series'] ?? $meta['title'] ?? '??',
$meta['year'] ?? '0000',
$row['via'] ?? '??',
$row['title'] ?? '??',
$row['year'] ?? '0000',
$episode ?? '-',
makeDate($row['updated']),
true === (bool)$row['watched'] ? 'Yes' : 'No',
$meta['webhook']['event'] ?? '-',
$extra['webhook']['event'] ?? '-',
];
if ($x < $rowCount) {

View File

@@ -91,12 +91,19 @@ final class EditCommand extends Command
}
if (null !== $value) {
if ($value === ag($server, $key)) {
if (true === ctype_digit($value)) {
$value = (int)$value;
} elseif ('true' === strtolower((string)$value) || 'false' === strtolower((string)$value)) {
$value = 'true' === $value;
} else {
$value = (string)$value;
}
if ($value === ag($server, $key, null)) {
$output->writeln('<comment>Not updating. Value already matches.</comment>');
return self::SUCCESS;
}
$value = ctype_digit($value) ? (int)$value : (string)$value;
$server = ag_set($server, $key, $value);
$output->writeln(
@@ -104,7 +111,7 @@ final class EditCommand extends Command
'<info>Updated server:\'%s\' key \'%s\' with value of \'%s\'.</info>',
$name,
$key,
$value
is_bool($value) ? (true === $value ? 'true' : 'false') : $value,
)
);
}

View File

@@ -6,11 +6,9 @@ namespace App\Commands\State;
use App\Command;
use App\Libs\Config;
use App\Libs\Container;
use App\Libs\Data;
use App\Libs\Entity\StateInterface;
use App\Libs\Extends\CliLogger;
use App\Libs\Mappers\Import\DirectMapper;
use App\Libs\Mappers\ImportInterface;
use App\Libs\Storage\PDO\PDOAdapter;
use App\Libs\Storage\StorageInterface;
@@ -72,12 +70,6 @@ class ImportCommand extends Command
'Sync selected servers, comma seperated. \'s1,s2\'.',
''
)
->addOption(
'import-unwatched',
null,
InputOption::VALUE_NONE,
'--DEPRECATED-- will be removed in v1.x. We import the item regardless of watched/unwatched state.'
)
->addOption('stats-show', null, InputOption::VALUE_NONE, 'Show final status.')
->addOption(
'stats-filter',
@@ -86,11 +78,12 @@ class ImportCommand extends Command
'Filter final status output e.g. (servername.key)',
null
)
->addOption('dry-run', null, InputOption::VALUE_NONE, 'Do not commit any changes.')
->addOption(
'mapper-direct',
'deep-debug',
null,
InputOption::VALUE_NONE,
'Uses less memory. However, it\'s significantly slower then default mapper.'
'You should not use this flag unless told by the team.'
)
->addOption('config', 'c', InputOption::VALUE_REQUIRED, 'Use Alternative config file.');
}
@@ -114,10 +107,6 @@ class ImportCommand extends Command
$config = Config::get('path') . '/config/servers.yaml';
}
if ($input->getOption('mapper-direct')) {
$this->mapper = Container::get(DirectMapper::class);
}
$list = [];
$serversFilter = (string)$input->getOption('servers-filter');
$selected = explode(',', $serversFilter);
@@ -135,6 +124,22 @@ class ImportCommand extends Command
$this->mapper->setLogger($logger);
}
$mapperOpts = [];
if ($input->getOption('dry-run')) {
$output->writeln('<info>Dry run mode. No changes will be committed to backend.</info>');
$mapperOpts[ImportInterface::DRY_RUN] = true;
}
if ($input->getOption('deep-debug')) {
$mapperOpts[ImportInterface::DEEP_DEBUG] = true;
}
if (!empty($mapperOpts)) {
$this->mapper->setUp($mapperOpts);
}
foreach (Config::get('servers', []) as $serverName => $server) {
$type = strtolower(ag($server, 'type', 'unknown'));
@@ -183,7 +188,7 @@ class ImportCommand extends Command
/** @var array<array-key,ResponseInterface> $queue */
$queue = [];
if (count($list) >= 1 && !$input->getOption('mapper-direct')) {
if (count($list) >= 1) {
$this->logger->info('Preloading all mapper data.');
$this->mapper->loadData();
$this->logger->info('Finished preloading mapper data.');

View File

@@ -16,6 +16,7 @@ use Psr\SimpleCache\CacheInterface;
use Psr\SimpleCache\InvalidArgumentException;
use RuntimeException;
use Symfony\Component\Console\Helper\Table;
use Symfony\Component\Console\Helper\TableSeparator;
use Symfony\Component\Console\Input\InputInterface;
use Symfony\Component\Console\Input\InputOption;
use Symfony\Component\Console\Output\OutputInterface;
@@ -99,45 +100,28 @@ class PushCommand extends Command
}
if ($input->getOption('queue-show')) {
$table = new Table($output);
$rows = [];
$table->setHeaders(
[
'ID',
'Type',
'Date',
'Via',
'Main Title',
'Year | Episode',
'Watched'
]
);
$x = 0;
$count = count($entities);
foreach ($entities as $entity) {
$number = '( ' . ag($entity->meta, 'year', 0) . ' )';
if (StateInterface::TYPE_EPISODE === $entity->type) {
$number .= sprintf(
' - S%sE%s',
str_pad((string)($entity->meta['season'] ?? 0), 2, '0', STR_PAD_LEFT),
str_pad((string)($entity->meta['episode'] ?? 0), 2, '0', STR_PAD_LEFT),
);
}
$x++;
$rows[] = [
$entity->id,
$entity->type,
$entity->getName(),
$entity->isWatched() ? 'Yes' : 'No',
$entity->via ?? '??',
makeDate($entity->updated),
ag($entity->meta, 'via', '??'),
ag($entity->meta, 'series', ag($entity->meta, 'title', '??')),
$number,
$entity->watched ? 'Yes' : 'No',
];
if ($x < $count) {
$rows[] = new TableSeparator();
}
}
$table->setRows($rows);
$table->render();
(new Table($output))->setHeaders(['Media Title', 'Played', 'Via', 'Record Date']
)->setStyle('box')->setRows($rows)->render();
return self::SUCCESS;
}
@@ -148,7 +132,7 @@ class PushCommand extends Command
foreach (Config::get('servers', []) as $serverName => $server) {
$type = strtolower(ag($server, 'type', 'unknown'));
if (true !== ag($server, 'webhook.push')) {
if (true !== (bool)ag($server, 'webhook.push')) {
$output->writeln(
sprintf('<error>Ignoring \'%s\' as requested by user config option.</error>', $serverName),
OutputInterface::VERBOSITY_VERBOSE
@@ -231,11 +215,12 @@ class PushCommand extends Command
if (200 !== $response->getStatusCode()) {
throw new ServerException($response);
}
$this->logger->info(
$this->logger->notice(
sprintf(
'Processed: State (%s) - %s',
ag($requestData, 'state', '??'),
'%s Processed \'%s\'. Set remote state to \'%s\'.',
ag($requestData, 'server', '??'),
ag($requestData, 'itemName', '??'),
ag($requestData, 'state', '??'),
)
);
} catch (ExceptionInterface $e) {

View File

@@ -12,21 +12,21 @@ final class StateEntity implements StateInterface
private array $data = [];
private bool $tainted = false;
/**
* User Addressable Variables.
*/
public null|string|int $id = null;
public string $type = '';
public int $updated = 0;
public int $watched = 0;
public array $meta = [];
public string|null $guid_plex = null;
public string|null $guid_imdb = null;
public string|null $guid_tvdb = null;
public string|null $guid_tmdb = null;
public string|null $guid_tvmaze = null;
public string|null $guid_tvrage = null;
public string|null $guid_anidb = null;
public string $via = '';
public string $title = '';
public int|null $year = null;
public int|null $season = null;
public int|null $episode = null;
public array $parent = [];
public array $guids = [];
public array $extra = [];
public function __construct(array $data)
{
@@ -46,8 +46,16 @@ final class StateEntity implements StateInterface
);
}
if ('meta' === $key && is_string($val)) {
if (null === ($val = json_decode($val, true))) {
foreach (StateInterface::ENTITY_ARRAY_KEYS as $subKey) {
if ($subKey !== $key) {
continue;
}
if (true === is_array($val)) {
continue;
}
if (null === ($val = json_decode($val ?? '{}', true))) {
$val = [];
}
}
@@ -63,20 +71,12 @@ final class StateEntity implements StateInterface
return new self($data);
}
public function diff(): array
public function diff(bool $all = false): array
{
$changed = [];
foreach ($this->getAll() as $key => $value) {
/**
* We ignore meta on purpose as it changes frequently.
* from one server to another.
*/
if ('meta' === $key && !$this->isEpisode()) {
continue;
}
if ('meta' === $key && ($value['parent'] ?? []) === ($this->data['parent'] ?? [])) {
if (false === $all && true === in_array($key, StateInterface::ENTITY_IGNORE_DIFF_CHANGES)) {
continue;
}
@@ -84,25 +84,21 @@ final class StateEntity implements StateInterface
continue;
}
if ('meta' === $key) {
$getChanged = array_diff_assoc_recursive($this->data['meta'] ?? [], $this->meta);
foreach ($getChanged as $metaKey => $_) {
$changed['new'][$key][$metaKey] = $this->meta[$metaKey] ?? 'None';
$changed['old'][$key][$metaKey] = $this->data[$key][$metaKey] ?? 'None';
if (true === in_array($key, StateInterface::ENTITY_ARRAY_KEYS)) {
$changes = array_diff_assoc_recursive($this->data[$key] ?? [], $value ?? []);
if (!empty($changes)) {
foreach (array_keys($changes) as $subKey) {
$changed[$key][$subKey] = [
'old' => $this->data[$key][$subKey] ?? 'None',
'new' => $value[$subKey] ?? 'None'
];
}
}
} else {
$changed['new'][$key] = $value ?? 'None';
$changed['old'][$key] = $this->data[$key] ?? 'None';
}
}
if (!empty($changed) && !array_key_exists('meta', $changed['new'] ?? $changed['old'] ?? [])) {
$getChanged = array_diff_assoc_recursive($this->data['meta'] ?? [], $this->meta);
foreach ($getChanged as $key => $_) {
$changed['new']['meta'][$key] = $this->meta[$key] ?? 'None';
$changed['old']['meta'][$key] = $this->data['meta'][$key] ?? 'None';
$changed[$key] = [
'old' => $this->data[$key] ?? 'None',
'new' => $value ?? 'None'
];
}
}
@@ -112,21 +108,15 @@ final class StateEntity implements StateInterface
public function getName(): string
{
if ($this->isMovie()) {
return sprintf(
'%s (%d) - @%s',
$this->meta['title'] ?? $this->data['meta']['title'] ?? '??',
$this->meta['year'] ?? $this->data['meta']['year'] ?? '??',
$this->meta['via'] ?? $this->data['meta']['via'] ?? '??',
);
return sprintf('%s (%d)', $this->title ?? '??', $this->year ?? 0000);
}
return sprintf(
'%s (%d) - %dx%d - @%s',
$this->meta['series'] ?? $this->data['meta']['series'] ?? '??',
$this->meta['year'] ?? $this->data['meta']['year'] ?? '??',
$this->meta['season'] ?? $this->data['meta']['season'] ?? 00,
$this->meta['episode'] ?? $this->data['meta']['episode'] ?? 00,
$this->meta['via'] ?? $this->data['meta']['via'] ?? '??',
'%s (%s) - %sx%s',
$this->title ?? '??',
$this->year ?? 0000,
str_pad((string)($this->season ?? 0), 2, '0', STR_PAD_LEFT),
str_pad((string)($this->episode ?? 0), 3, '0', STR_PAD_LEFT)
);
}
@@ -137,41 +127,40 @@ final class StateEntity implements StateInterface
'type' => $this->type,
'updated' => $this->updated,
'watched' => $this->watched,
'meta' => $this->meta,
'guid_plex' => $this->guid_plex,
'guid_imdb' => $this->guid_imdb,
'guid_tvdb' => $this->guid_tvdb,
'guid_tmdb' => $this->guid_tmdb,
'guid_tvmaze' => $this->guid_tvmaze,
'guid_tvrage' => $this->guid_tvrage,
'guid_anidb' => $this->guid_anidb,
'via' => $this->via,
'title' => $this->title,
'year' => $this->year,
'season' => $this->season,
'episode' => $this->episode,
'parent' => $this->parent,
'guids' => $this->guids,
'extra' => $this->extra,
];
}
public function isChanged(): bool
{
return count($this->diff()) >= 1;
return count($this->diff(all: false)) >= 1;
}
public function hasGuids(): bool
{
foreach (array_keys(Guid::SUPPORTED) as $key) {
if (null !== $this->{$key}) {
return true;
}
}
return count($this->guids) >= 1;
}
return false;
public function getGuids(): array
{
return $this->guids;
}
public function hasParentGuid(): bool
{
return count($this->getParentGuids()) >= 1;
return count($this->parent) >= 1;
}
public function getParentGuids(): array
{
return (array)ag($this->meta, 'parent', []);
return $this->parent;
}
public function isMovie(): bool
@@ -184,29 +173,26 @@ final class StateEntity implements StateInterface
return StateInterface::TYPE_EPISODE === $this->type;
}
public function isWatched(): bool
{
return 1 === $this->watched;
}
public function hasRelativeGuid(): bool
{
$parents = ag($this->meta, 'parent', []);
$season = ag($this->meta, 'season', null);
$episode = ag($this->meta, 'episode', null);
return !(null === $season || null === $episode || 0 === $episode || empty($parents));
return $this->isEpisode() && !empty($this->parent) && null !== $this->season && null !== $this->episode;
}
public function getRelativeGuids(): array
{
$parents = ag($this->meta, 'parent', []);
$season = ag($this->meta, 'season', null);
$episode = ag($this->meta, 'episode', null);
if (null === $season || null === $episode || 0 === $episode || empty($parents)) {
if (!$this->isEpisode()) {
return [];
}
$list = [];
foreach ($parents as $key => $val) {
$list[$key] = $val . '/' . $season . '/' . $episode;
foreach ($this->parent as $key => $val) {
$list[$key] = $val . '/' . $this->season . '/' . $this->episode;
}
return array_intersect_key($list, Guid::SUPPORTED);
@@ -214,20 +200,40 @@ final class StateEntity implements StateInterface
public function getRelativePointers(): array
{
return Guid::fromArray($this->getRelativeGuids())->getPointers();
if (!$this->isEpisode()) {
return [];
}
$list = Guid::fromArray($this->getRelativeGuids())->getPointers();
$rPointers = [];
foreach ($list as $val) {
$rPointers[] = 'r' . $val;
}
return $rPointers;
}
public function apply(StateInterface $entity, bool $guidOnly = false): self
{
if (true === $guidOnly) {
if ($this->guids !== $entity->guids) {
$this->updateValue('guids', $entity);
}
if ($this->parent !== $entity->parent) {
$this->updateValue('parent', $entity);
}
return $this;
}
if ($this->isEqual($entity)) {
return $this;
}
foreach ($entity->getAll() as $key => $val) {
if (true === $guidOnly && !str_starts_with($key, 'guid_')) {
continue;
}
$this->updateValue($key, $entity);
}
@@ -245,9 +251,17 @@ final class StateEntity implements StateInterface
return $this->data;
}
public function getPointers(): array
public function getPointers(array|null $guids = null): array
{
return Guid::fromArray(array_intersect_key((array)$this, Guid::SUPPORTED))->getPointers();
$list = array_intersect_key($this->guids, Guid::SUPPORTED);
if ($this->isEpisode()) {
foreach ($list as $key => $val) {
$list[$key] = $val . '/' . $this->season . '/' . $this->episode;
}
}
return Guid::fromArray($list)->getPointers();
}
public function setIsTainted(bool $isTainted): StateInterface
@@ -275,7 +289,7 @@ final class StateEntity implements StateInterface
private function isEqualValue(string $key, StateInterface $entity): bool
{
if ($key === 'updated' || $key === 'watched') {
if ('updated' === $key || 'watched' === $key) {
return !($entity->updated > $this->updated && $entity->watched !== $this->watched);
}
@@ -288,7 +302,7 @@ final class StateEntity implements StateInterface
private function updateValue(string $key, StateInterface $entity): void
{
if ($key === 'updated' || $key === 'watched') {
if ('updated' === $key || 'watched' === $key) {
if ($entity->updated > $this->updated && $entity->watched !== $this->watched) {
$this->updated = $entity->updated;
$this->watched = $entity->watched;
@@ -296,12 +310,14 @@ final class StateEntity implements StateInterface
return;
}
if (null !== ($entity->{$key} ?? null) && $this->{$key} !== $entity->{$key}) {
if ('meta' === $key) {
$this->{$key} = array_replace_recursive($this->{$key} ?? [], $entity->{$key} ?? []);
} else {
$this->{$key} = $entity->{$key};
}
if ('id' === $key) {
return;
}
if (true === in_array($key, StateInterface::ENTITY_ARRAY_KEYS)) {
$this->{$key} = array_replace_recursive($this->{$key} ?? [], $entity->{$key} ?? []);
} else {
$this->{$key} = $entity->{$key};
}
}
}

View File

@@ -9,19 +9,32 @@ interface StateInterface
public const TYPE_MOVIE = 'movie';
public const TYPE_EPISODE = 'episode';
public const ENTITY_IGNORE_DIFF_CHANGES = [
'via',
'extra',
'title',
'year',
];
public const ENTITY_ARRAY_KEYS = [
'parent',
'guids',
'extra'
];
public const ENTITY_KEYS = [
'id',
'type',
'updated',
'watched',
'meta',
'guid_plex',
'guid_imdb',
'guid_tvdb',
'guid_tmdb',
'guid_tvmaze',
'guid_tvrage',
'guid_anidb',
'via',
'title',
'year',
'season',
'episode',
'parent',
'guids',
'extra',
];
/**
@@ -36,9 +49,11 @@ interface StateInterface
/**
* Return An array of changed items.
*
* @param bool $all check all keys. including ignored keys.
*
* @return array
*/
public function diff(): array;
public function diff(bool $all = false): array;
/**
* Get All Entity keys.
@@ -55,14 +70,21 @@ interface StateInterface
public function isChanged(): bool;
/**
* Does the entity have GUIDs?
* Does the entity have external ids?
*
* @return bool
*/
public function hasGuids(): bool;
/**
* Does the entity have Relative GUIDs?
* Get List of external ids.
*
* @return array
*/
public function getGuids(): array;
/**
* Does the entity have relative external ids?
*
* @return bool
*/
@@ -83,14 +105,14 @@ interface StateInterface
public function getRelativePointers(): array;
/**
* Does the Entity have Parent IDs?
* Does the Entity have Parent external ids?
*
* @return bool
*/
public function hasParentGuid(): bool;
/**
* Get Parent GUIDs.
* Get Parent external ids.
*
* @return array
*/
@@ -110,6 +132,13 @@ interface StateInterface
*/
public function isEpisode(): bool;
/**
* Is entity marked as watched?
*
* @return bool
*/
public function isWatched(): bool;
/**
* Get constructed name.
*
@@ -118,7 +147,7 @@ interface StateInterface
public function getName(): string;
/**
* Get GUID Pointers.
* Get external ids Pointers.
*
* @return array
*/
@@ -152,8 +181,8 @@ interface StateInterface
* The Tainted flag control whether we will change state or not.
* If the entity is not already stored in the database, then this flag is not used.
* However, if the entity already exists and the flag is set to **true**, then
* we will be checking **GUIDs** only, and if those differ then meta will be updated as well.
* otherwise, nothing will be changed, This flag serve to update GUIDs via webhook unhelpful events like
* we will be checking **external ids** only, and if those differ {@see ENTITY_IGNORE_DIFF_CHANGES} will be updated
* as well, otherwise, nothing will be changed, This flag serve to update GUIDs via webhook unhelpful events like
* play/stop/resume.
*
* @param bool $isTainted

View File

@@ -4,12 +4,11 @@ declare(strict_types=1);
namespace App\Libs;
use JsonException;
use RuntimeException;
final class Guid
{
public const LOOKUP_KEY = '%s://%s';
public const GUID_PLEX = 'guid_plex';
public const GUID_IMDB = 'guid_imdb';
public const GUID_TVDB = 'guid_tvdb';
@@ -19,37 +18,101 @@ final class Guid
public const GUID_ANIDB = 'guid_anidb';
public const SUPPORTED = [
self::GUID_PLEX => 'string',
self::GUID_IMDB => 'string',
self::GUID_TVDB => 'string',
self::GUID_TMDB => 'string',
self::GUID_TVMAZE => 'string',
self::GUID_TVRAGE => 'string',
self::GUID_ANIDB => 'string',
Guid::GUID_PLEX => 'string',
Guid::GUID_IMDB => 'string',
Guid::GUID_TVDB => 'string',
Guid::GUID_TMDB => 'string',
Guid::GUID_TVMAZE => 'string',
Guid::GUID_TVRAGE => 'string',
Guid::GUID_ANIDB => 'string',
];
private const LOOKUP_KEY = '%s://%s';
private array $data = [];
/**
* Create List of db => external id list.
*
* @param array $guids Key/value pair of db => external id. For example, [ "guid_imdb" => "tt123456789" ]
*
* @throws RuntimeException if key/value is of unexpected type or unsupported.
*/
public function __construct(array $guids)
{
foreach ($guids as $key => $value) {
if (null === $value || null === (self::SUPPORTED[$key] ?? null)) {
if (null === $value || null === (Guid::SUPPORTED[$key] ?? null)) {
continue;
}
$this->updateGuid($key, $value);
if ($value === ($this->data[$key] ?? null)) {
continue;
}
if (!is_string($key)) {
throw new RuntimeException(
sprintf(
'Unexpected offset type was given. Was expecting \'string\' but got \'%s\' instead.',
get_debug_type($key)
),
);
}
if (null === (Guid::SUPPORTED[$key] ?? null)) {
throw new RuntimeException(
sprintf(
'Unexpected key. Was expecting one of \'%s\', but got \'%s\' instead.',
implode(', ', array_keys(Guid::SUPPORTED)),
$key
),
);
}
if (Guid::SUPPORTED[$key] !== ($valueType = get_debug_type($value))) {
throw new RuntimeException(
sprintf(
'Unexpected value type for \'%s\'. Was Expecting \'%s\' but got \'%s\' instead.',
$key,
Guid::SUPPORTED[$key],
$valueType
)
);
}
$this->data[$key] = $value;
}
}
public static function fromArray(array $guids): self
/**
* Create new instance from array payload.
*
* @param array $payload Key/value pair of db => external id. For example, [ "guid_imdb" => "tt123456789" ]
*
* @return static
*/
public static function fromArray(array $payload): self
{
return new self($guids);
return new self($payload);
}
public static function fromJson(string $guids): self
/**
* Create new instance from json payload.
*
* @param string $payload Key/value pair of db => external id. For example, { "guid_imdb" : "tt123456789" }
*
* @return static
* @throws JsonException if decoding JSON payload fails.
*/
public static function fromJson(string $payload): self
{
return new self(json_decode($guids, true));
return new self(json_decode(json: $payload, associative: true, flags: JSON_THROW_ON_ERROR));
}
/**
* Return suitable pointers to link entity to external id.
*
* @return array
*/
public function getPointers(): array
{
$arr = [];
@@ -61,47 +124,13 @@ final class Guid
return $arr;
}
public function getGuids(): array
/**
* Return list of External ids.
*
* @return array
*/
public function getAll(): array
{
return $this->data;
}
private function updateGuid(mixed $key, mixed $value): void
{
if ($value === ($this->data[$key] ?? null)) {
return;
}
if (!is_string($key)) {
throw new RuntimeException(
sprintf(
'Unexpected offset type was given. Was expecting \'string\' but got \'%s\' instead.',
get_debug_type($key)
),
);
}
if (null === (self::SUPPORTED[$key] ?? null)) {
throw new RuntimeException(
sprintf(
'Unexpected offset key. Was expecting one of \'%s\', but got \'%s\' instead.',
implode(', ', array_keys(self::SUPPORTED)),
$key
),
);
}
if (self::SUPPORTED[$key] !== ($valueType = get_debug_type($value))) {
throw new RuntimeException(
sprintf(
'Unexpected value type for \'%s\'. Was Expecting \'%s\' but got \'%s\' instead.',
$key,
self::SUPPORTED[$key],
$valueType
)
);
}
$this->data[$key] = $value;
}
}

View File

@@ -4,9 +4,7 @@ declare(strict_types=1);
namespace App\Libs\Mappers\Export;
use App\Libs\Container;
use App\Libs\Entity\StateInterface;
use App\Libs\Guid;
use App\Libs\Mappers\ExportInterface;
use App\Libs\Storage\StorageInterface;
use DateTimeInterface;
@@ -73,40 +71,13 @@ final class ExportMapper implements ExportInterface
return $this->objects[$entity->id];
}
if ($entity->hasGuids()) {
foreach ($entity->getPointers() as $key) {
if (null !== ($this->guids[$key] ?? null)) {
return $this->objects[$this->guids[$key]];
}
foreach ($entity->getRelativePointers() as $key) {
if (null !== ($this->guids[$key] ?? null)) {
return $this->objects[$this->guids[$key]];
}
}
if ($entity->isEpisode() && $entity->hasRelativeGuid()) {
foreach ($entity->getRelativePointers() as $key) {
if (null !== ($this->guids[$key] ?? null)) {
return $this->objects[$this->guids[$key]];
}
}
}
if (true === $this->fullyLoaded) {
return null;
}
if (null !== ($lazyEntity = $this->storage->get($entity))) {
$this->objects[$lazyEntity->id] = $lazyEntity;
$this->addGuids($this->objects[$lazyEntity->id], $lazyEntity->id);
return $this->objects[$lazyEntity->id];
}
return null;
}
public function findByIds(array $ids): null|StateInterface
{
$pointers = Guid::fromArray($ids)->getPointers();
foreach ($pointers as $key) {
foreach ($entity->getPointers() as $key) {
if (null !== ($this->guids[$key] ?? null)) {
return $this->objects[$this->guids[$key]];
}
@@ -116,8 +87,6 @@ final class ExportMapper implements ExportInterface
return null;
}
$entity = Container::get(StateInterface::class)::fromArray($ids);
if (null !== ($lazyEntity = $this->storage->get($entity))) {
$this->objects[$lazyEntity->id] = $lazyEntity;
$this->addGuids($this->objects[$lazyEntity->id], $lazyEntity->id);
@@ -169,16 +138,12 @@ final class ExportMapper implements ExportInterface
private function addGuids(StateInterface $entity, int|string $pointer): void
{
if ($entity->hasGuids()) {
foreach ($entity->getPointers() as $key) {
$this->guids[$key] = $pointer;
}
foreach ($entity->getPointers() as $key) {
$this->guids[$key] = $pointer;
}
if ($entity->isEpisode() && $entity->hasRelativeGuid()) {
foreach ($entity->getRelativePointers() as $key) {
$this->guids[$key] = $pointer;
}
foreach ($entity->getRelativePointers() as $key) {
$this->guids[$key] = $pointer;
}
}
}

View File

@@ -48,15 +48,6 @@ interface ExportInterface
*/
public function get(StateInterface $entity): null|StateInterface;
/**
* Find Entity By Ids.
*
* @param array $ids
*
* @return StateInterface|null
*/
public function findByIds(array $ids): null|StateInterface;
/**
* Has Entity.
*

View File

@@ -1,177 +0,0 @@
<?php
declare(strict_types=1);
namespace App\Libs\Mappers\Import;
use App\Libs\Data;
use App\Libs\Entity\StateInterface;
use App\Libs\Mappers\ImportInterface;
use App\Libs\Storage\StorageInterface;
use DateTimeInterface;
use Psr\Log\LoggerInterface;
use Throwable;
final class DirectMapper implements ImportInterface
{
private array $operations = [
StateInterface::TYPE_MOVIE => ['added' => 0, 'updated' => 0, 'failed' => 0],
StateInterface::TYPE_EPISODE => ['added' => 0, 'updated' => 0, 'failed' => 0],
];
private int $changed = 0;
public function __construct(private LoggerInterface $logger, private StorageInterface $storage)
{
}
public function setUp(array $opts): ImportInterface
{
return $this;
}
public function loadData(DateTimeInterface|null $date = null): ImportInterface
{
return $this;
}
public function add(string $bucket, string $name, StateInterface $entity, array $opts = []): self
{
if (!$entity->hasGuids() && $entity->hasRelativeGuid()) {
$this->logger->info(sprintf('Ignoring %s. No valid GUIDs.', $name));
Data::increment($bucket, $entity->type . '_failed_no_guid');
return $this;
}
$item = $this->get($entity);
if (null === $entity->id && null === $item) {
try {
$this->storage->insert($entity);
} catch (Throwable $e) {
$this->operations[$entity->type]['failed']++;
Data::append($bucket, 'storage_error', $e->getMessage());
return $this;
}
$this->changed++;
Data::increment($bucket, $entity->type . '_added');
$this->operations[$entity->type]['added']++;
$this->logger->debug(sprintf('Adding %s. As new Item.', $name));
return $this;
}
// -- Ignore old item.
if (null !== ($opts['after'] ?? null) && ($opts['after'] instanceof DateTimeInterface)) {
if ($opts['after']->getTimestamp() >= $entity->updated) {
// -- check for updated GUIDs.
if ($item->apply($entity, guidOnly: true)->isChanged()) {
try {
$this->changed++;
if (!empty($entity->meta)) {
$item->meta = $entity->meta;
}
$this->storage->update($item);
$this->operations[$entity->type]['updated']++;
$this->logger->debug(sprintf('Updating %s. GUIDs.', $name), $item->diff());
return $this;
} catch (Throwable $e) {
$this->operations[$entity->type]['failed']++;
Data::append($bucket, 'storage_error', $e->getMessage());
return $this;
}
}
$this->logger->debug(sprintf('Ignoring %s. No change since last sync.', $name));
Data::increment($bucket, $entity->type . '_ignored_not_played_since_last_sync');
return $this;
}
}
$item = $item->apply($entity);
if ($item->isChanged()) {
try {
$this->storage->update($item);
} catch (Throwable $e) {
$this->operations[$entity->type]['failed']++;
Data::append($bucket, 'storage_error', $e->getMessage());
return $this;
}
$this->changed++;
Data::increment($bucket, $entity->type . '_updated');
$this->operations[$entity->type]['updated']++;
} else {
Data::increment($bucket, $entity->type . '_ignored_no_change');
}
return $this;
}
public function get(StateInterface $entity): null|StateInterface
{
return $this->storage->get($entity);
}
public function remove(StateInterface $entity): bool
{
return $this->storage->remove($entity);
}
public function commit(): mixed
{
$op = $this->operations;
$this->reset();
return $op;
}
public function has(StateInterface $entity): bool
{
return null !== $this->storage->get($entity);
}
public function reset(): self
{
$this->changed = 0;
$this->operations[StateInterface::TYPE_EPISODE]['added'] = 0;
$this->operations[StateInterface::TYPE_EPISODE]['updated'] = 0;
$this->operations[StateInterface::TYPE_EPISODE]['failed'] = 0;
$this->operations[StateInterface::TYPE_MOVIE]['added'] = 0;
$this->operations[StateInterface::TYPE_MOVIE]['updated'] = 0;
$this->operations[StateInterface::TYPE_MOVIE]['failed'] = 0;
return $this;
}
public function getObjects(array $opts = []): array
{
return [];
}
public function getObjectsCount(): int
{
return 0;
}
public function setLogger(LoggerInterface $logger): self
{
$this->logger = $logger;
$this->storage->setLogger($logger);
return $this;
}
public function setStorage(StorageInterface $storage): self
{
$this->storage = $storage;
return $this;
}
public function count(): int
{
return $this->changed;
}
}

View File

@@ -9,6 +9,7 @@ use App\Libs\Entity\StateInterface;
use App\Libs\Mappers\ImportInterface;
use App\Libs\Storage\StorageInterface;
use DateTimeInterface;
use PDOException;
use Psr\Log\LoggerInterface;
final class MemoryMapper implements ImportInterface
@@ -52,7 +53,7 @@ final class MemoryMapper implements ImportInterface
continue;
}
$this->objects[$entity->id] = $entity;
$this->addGuids($this->objects[$entity->id], $entity->id);
$this->addPointers($this->objects[$entity->id], $entity->id);
}
return $this;
@@ -73,8 +74,21 @@ final class MemoryMapper implements ImportInterface
$this->changed[$pointer] = $pointer;
Data::increment($bucket, $entity->type . '_added');
$this->addGuids($this->objects[$pointer], $pointer);
$this->logger->debug(sprintf('Adding %s. As new Item.', $name));
$this->addPointers($this->objects[$pointer], $pointer);
if (true === ($this->options[ImportInterface::DEEP_DEBUG] ?? false)) {
$data = $entity->getAll();
unset($data['id']);
$data['updated'] = makeDate($data['updated']);
$data['watched'] = 0 === $data['watched'] ? 'No' : 'Yes';
if ($entity->isMovie()) {
unset($data['season'], $data['episode'], $data['parent']);
}
} else {
$data = [];
}
$this->logger->info(sprintf('Adding %s. As new Item.', $name), $data);
return $this;
}
@@ -82,19 +96,6 @@ final class MemoryMapper implements ImportInterface
// -- Ignore old item.
if (null !== ($opts['after'] ?? null) && ($opts['after'] instanceof DateTimeInterface)) {
if ($opts['after']->getTimestamp() >= $entity->updated) {
// -- check for updated GUIDs.
if ($this->objects[$pointer]->apply($entity, guidOnly: true)->isChanged()) {
$this->changed[$pointer] = $pointer;
if (!empty($entity->meta)) {
$this->objects[$pointer]->meta = $entity->meta;
}
Data::increment($bucket, $entity->type . '_updated');
$this->addGuids($this->objects[$pointer], $pointer);
$this->logger->debug(sprintf('Updating %s. GUIDs.', $name), $this->objects[$pointer]->diff());
return $this;
}
$this->logger->debug(sprintf('Ignoring %s. No change since last sync.', $name));
Data::increment($bucket, $entity->type . '_ignored_not_played_since_last_sync');
return $this;
}
@@ -102,15 +103,19 @@ final class MemoryMapper implements ImportInterface
$this->objects[$pointer] = $this->objects[$pointer]->apply($entity);
if ($this->objects[$pointer]->isChanged()) {
$cloned = clone $this->objects[$pointer];
if (true === $this->objects[$pointer]->isChanged()) {
Data::increment($bucket, $entity->type . '_updated');
$this->changed[$pointer] = $pointer;
$this->addGuids($this->objects[$pointer], $pointer);
$this->logger->debug(sprintf('Updating %s. State changed.', $name), $this->objects[$pointer]->diff());
$this->removePointers($cloned);
$this->addPointers($this->objects[$pointer], $pointer);
$this->logger->info(
sprintf('Updating %s. State changed.', $name),
$this->objects[$pointer]->diff(all: true),
);
return $this;
}
$this->logger->debug(sprintf('Ignoring %s. State unchanged.', $name));
Data::increment($bucket, $entity->type . '_ignored_no_change');
return $this;
@@ -122,34 +127,7 @@ final class MemoryMapper implements ImportInterface
return $this->objects[$entity->id];
}
if ($entity->hasGuids()) {
foreach ($entity->getPointers() as $key) {
if (null !== ($this->guids[$key] ?? null)) {
return $this->objects[$this->guids[$key]];
}
}
}
if ($entity->isEpisode() && $entity->hasRelativeGuid()) {
foreach ($entity->getRelativePointers() as $key) {
if (null !== ($this->guids[$key] ?? null)) {
return $this->objects[$this->guids[$key]];
}
}
}
if (true === $this->fullyLoaded) {
return null;
}
if (null !== ($lazyEntity = $this->storage->get($entity))) {
$this->objects[] = $lazyEntity;
$id = array_key_last($this->objects);
$this->addGuids($this->objects[$id], $id);
return $this->objects[$id];
}
return null;
return false === ($pointer = $this->getPointer($entity)) ? null : $this->objects[$pointer];
}
public function remove(StateInterface $entity): bool
@@ -160,21 +138,7 @@ final class MemoryMapper implements ImportInterface
$this->storage->remove($this->objects[$pointer]);
if ($entity->hasGuids()) {
foreach ($entity->getPointers() as $key) {
if (null !== ($this->guids[$key] ?? null)) {
unset($this->guids[$key]);
}
}
}
if ($entity->isEpisode() && $entity->hasRelativeGuid()) {
foreach ($entity->getRelativePointers() as $key) {
if (null !== ($this->guids[$key] ?? null)) {
unset($this->guids[$key]);
}
}
}
$this->removePointers($this->objects[$pointer]);
unset($this->objects[$pointer]);
@@ -187,9 +151,41 @@ final class MemoryMapper implements ImportInterface
public function commit(): mixed
{
$state = $this->storage->commit(
array_intersect_key($this->objects, $this->changed)
);
$state = $this->storage->transactional(function (StorageInterface $storage) {
$list = [
StateInterface::TYPE_MOVIE => ['added' => 0, 'updated' => 0, 'failed' => 0],
StateInterface::TYPE_EPISODE => ['added' => 0, 'updated' => 0, 'failed' => 0],
];
$count = count($this->changed);
$this->logger->notice(
0 === $count ? 'No changes detected.' : sprintf('Updating backend with \'%d\' changes.', $count)
);
foreach ($this->changed as $pointer) {
try {
$entity = &$this->objects[$pointer];
if (null === $entity->id) {
if (false === (bool)($this->options[ImportInterface::DRY_RUN] ?? false)) {
$storage->insert($entity);
}
$list[$entity->type]['added']++;
} else {
if (false === (bool)($this->options[ImportInterface::DRY_RUN] ?? false)) {
$storage->update($entity);
}
$list[$entity->type]['updated']++;
}
} catch (PDOException $e) {
$list[$entity->type]['failed']++;
$this->logger->error($e->getMessage(), $entity->getAll());
}
}
return $list;
});
$this->reset();
@@ -237,6 +233,23 @@ final class MemoryMapper implements ImportInterface
return $this;
}
public function __destruct()
{
if (false === ($this->options['disable_autocommit'] ?? false) && $this->count() >= 1) {
$this->commit();
}
}
public function inDryRunMode(): bool
{
return true === ($this->options[ImportInterface::DRY_RUN] ?? false);
}
public function inDeepDebugMode(): bool
{
return true === ($this->options[ImportInterface::DEEP_DEBUG] ?? false);
}
/**
* Is the object already mapped?
*
@@ -246,49 +259,36 @@ final class MemoryMapper implements ImportInterface
*/
private function getPointer(StateInterface $entity): int|bool
{
foreach ($entity->getPointers() as $key) {
foreach ([...$entity->getRelativePointers(), ...$entity->getPointers()] as $key) {
if (null !== ($this->guids[$key] ?? null)) {
return $this->guids[$key];
}
}
if ($entity->isEpisode()) {
foreach ($entity->getRelativePointers() as $key) {
if (null !== ($this->guids[$key] ?? null)) {
return $this->guids[$key];
}
}
}
if (false === $this->fullyLoaded && null !== ($lazyEntity = $this->storage->get($entity))) {
$this->objects[] = $lazyEntity;
$id = array_key_last($this->objects);
$this->addGuids($this->objects[$id], $id);
$this->addPointers($this->objects[$id], $id);
return $id;
}
return false;
}
private function addGuids(StateInterface $entity, int $pointer): void
private function addPointers(StateInterface $entity, int $pointer): void
{
if ($entity->hasGuids()) {
foreach ($entity->getPointers() as $key) {
$this->guids[$key] = $pointer;
}
foreach ([...$entity->getPointers(), ...$entity->getRelativePointers()] as $key) {
$this->guids[$key] = $pointer;
}
}
if ($entity->isEpisode()) {
foreach ($entity->getRelativePointers() as $key) {
$this->guids[$key] = $pointer;
private function removePointers(StateInterface $entity): void
{
foreach ([...$entity->getPointers(), ...$entity->getRelativePointers()] as $key) {
if (isset($this->guids[$key])) {
unset($this->guids[$key]);
}
}
}
public function __destruct()
{
if (false === ($this->options['disable_autocommit'] ?? false) && $this->count() >= 1) {
$this->commit();
}
}
}

View File

@@ -12,6 +12,9 @@ use Psr\Log\LoggerInterface;
interface ImportInterface extends Countable
{
public const DEEP_DEBUG = 'deep-debug';
public const DRY_RUN = 'dry-run';
/**
* Initiate Mapper.
*
@@ -117,4 +120,17 @@ interface ImportInterface extends Countable
* @return self
*/
public function SetStorage(StorageInterface $storage): self;
/**
* Are we in dry run mode?
*
* @return bool
*/
public function inDryRunMode(): bool;
/**
* Are we in deep debug mode?
* @return bool
*/
public function inDeepDebugMode(): bool;
}

View File

@@ -7,14 +7,19 @@ namespace App\Libs\Servers;
use App\Libs\Config;
use App\Libs\Container;
use App\Libs\Entity\StateInterface;
use App\Libs\Guid;
use App\Libs\HttpException;
use DateTimeInterface;
use JsonException;
use Psr\Http\Message\ServerRequestInterface;
use Psr\Http\Message\UriInterface;
use Psr\Log\LoggerInterface;
use Symfony\Contracts\HttpClient\Exception\ExceptionInterface;
use Throwable;
class EmbyServer extends JellyfinServer
{
public const NAME = 'EmbyBackend';
protected const WEBHOOK_ALLOWED_TYPES = [
'Movie',
'Episode',
@@ -49,32 +54,46 @@ class EmbyServer extends JellyfinServer
return parent::setUp($name, $url, $token, $userId, $uuid, $persist, $options);
}
public static function processRequest(ServerRequestInterface $request): ServerRequestInterface
public static function processRequest(ServerRequestInterface $request, array $opts = []): ServerRequestInterface
{
$userAgent = ag($request->getServerParams(), 'HTTP_USER_AGENT', '');
$logger = null;
if (false === Config::get('webhook.debug', false) && !str_starts_with($userAgent, 'Emby Server/')) {
return $request;
}
try {
$logger = $opts[LoggerInterface::class] ?? Container::get(LoggerInterface::class);
$payload = ag($request->getParsedBody() ?? [], 'data', null);
$userAgent = ag($request->getServerParams(), 'HTTP_USER_AGENT', '');
if (null === $payload || null === ($json = json_decode((string)$payload, true))) {
return $request;
}
if (false === str_starts_with($userAgent, 'Emby Server/')) {
return $request;
}
$attributes = [
'SERVER_ID' => ag($json, 'Server.Id', ''),
'SERVER_NAME' => ag($json, 'Server.Name', ''),
'SERVER_VERSION' => afterLast($userAgent, '/'),
'USER_ID' => ag($json, 'User.Id', ''),
'USER_NAME' => ag($json, 'User.Name', ''),
'WH_EVENT' => ag($json, 'Event', 'not_set'),
'WH_TYPE' => ag($json, 'Item.Type', 'not_set'),
];
$payload = (string)ag($request->getParsedBody() ?? [], 'data', null);
foreach ($attributes as $key => $val) {
$request = $request->withAttribute($key, $val);
if (null === ($json = json_decode(json: $payload, associative: true, flags: JSON_INVALID_UTF8_IGNORE))) {
return $request;
}
$request = $request->withParsedBody($json);
$attributes = [
'SERVER_ID' => ag($json, 'Server.Id', ''),
'SERVER_NAME' => ag($json, 'Server.Name', ''),
'SERVER_VERSION' => afterLast($userAgent, '/'),
'USER_ID' => ag($json, 'User.Id', ''),
'USER_NAME' => ag($json, 'User.Name', ''),
'WH_EVENT' => ag($json, 'Event', 'not_set'),
'WH_TYPE' => ag($json, 'Item.Type', 'not_set'),
];
foreach ($attributes as $key => $val) {
$request = $request->withAttribute($key, $val);
}
} catch (Throwable $e) {
$logger?->error($e->getMessage(), [
'file' => $e->getFile(),
'line' => $e->getLine(),
'kind' => get_class($e),
]);
}
return $request;
@@ -82,9 +101,7 @@ class EmbyServer extends JellyfinServer
public function parseWebhook(ServerRequestInterface $request): StateInterface
{
$payload = ag($request->getParsedBody() ?? [], 'data', null);
if (null === $payload || null === ($json = json_decode((string)$payload, true))) {
if (null === ($json = $request->getParsedBody())) {
throw new HttpException(sprintf('%s: No payload.', afterLast(__CLASS__, '\\')), 400);
}
@@ -92,56 +109,23 @@ class EmbyServer extends JellyfinServer
$type = ag($json, 'Item.Type', 'not_found');
if (null === $type || !in_array($type, self::WEBHOOK_ALLOWED_TYPES)) {
throw new HttpException(sprintf('%s: Not allowed type [%s]', afterLast(__CLASS__, '\\'), $type), 200);
throw new HttpException(sprintf('%s: Not allowed type [%s]', self::NAME, $type), 200);
}
$type = strtolower($type);
if (null === $event || !in_array($event, self::WEBHOOK_ALLOWED_EVENTS)) {
throw new HttpException(sprintf('%s: Not allowed event [%s]', afterLast(__CLASS__, '\\'), $event), 200);
throw new HttpException(sprintf('%s: Not allowed event [%s]', self::NAME, $event), 200);
}
$isTainted = in_array($event, self::WEBHOOK_TAINTED_EVENTS);
$meta = match ($type) {
StateInterface::TYPE_MOVIE => [
'via' => $this->name,
'title' => ag($json, 'Item.Name', ag($json, 'Item.OriginalTitle', '??')),
'year' => ag($json, 'Item.ProductionYear', 0000),
'date' => makeDate(
ag(
$json,
'Item.PremiereDate',
ag($json, 'Item.ProductionYear', ag($json, 'Item.DateCreated', 'now'))
)
)->format('Y-m-d'),
'webhook' => [
'event' => $event,
],
],
StateInterface::TYPE_EPISODE => [
'via' => $this->name,
'series' => ag($json, 'Item.SeriesName', '??'),
'year' => ag($json, 'Item.ProductionYear', 0000),
'season' => ag($json, 'Item.ParentIndexNumber', 0),
'episode' => ag($json, 'Item.IndexNumber', 0),
'title' => ag($json, 'Item.Name', ag($json, 'Item.OriginalTitle', '??')),
'date' => makeDate(ag($json, 'Item.PremiereDate', ag($json, 'Item.ProductionYear', 'now')))->format(
'Y-m-d'
),
'webhook' => [
'event' => $event,
],
],
default => throw new HttpException(sprintf('%s: Invalid content type.', afterLast(__CLASS__, '\\')), 400),
};
if ('item.markplayed' === $event || 'playback.scrobble' === $event) {
$isWatched = 1;
} elseif ('item.markunplayed' === $event) {
$isWatched = 0;
} else {
$isWatched = (int)(bool)ag($json, 'Item.Played', ag($json, 'Item.PlayedToCompletion', 0));
$isWatched = (int)(bool)ag($json, ['Item.Played', 'Item.PlayedToCompletion'], false);
}
$providersId = ag($json, 'Item.ProviderIds', []);
@@ -150,195 +134,129 @@ class EmbyServer extends JellyfinServer
'type' => $type,
'updated' => time(),
'watched' => $isWatched,
'meta' => $meta,
...$this->getGuids($providersId, $type)
'via' => $this->name,
'title' => ag($json, ['Item.Name', 'Item.OriginalTitle'], '??'),
'year' => ag($json, 'Item.ProductionYear', 0000),
'season' => null,
'episode' => null,
'parent' => [],
'guids' => $this->getGuids($providersId),
'extra' => [
'date' => makeDate(
ag($json, ['Item.PremiereDate', 'Item.ProductionYear', 'Item.DateCreated'], 'now')
)->format('Y-m-d'),
'webhook' => [
'event' => $event,
],
],
];
if (StateInterface::TYPE_EPISODE === $type) {
$row['title'] = ag($json, 'Item.SeriesName', '??');
$row['season'] = ag($json, 'Item.ParentIndexNumber', 0);
$row['episode'] = ag($json, 'Item.IndexNumber', 0);
if (null !== ($epTitle = ag($json, ['Name', 'OriginalTitle'], null))) {
$row['extra']['title'] = $epTitle;
}
if (null !== ag($json, 'Item.SeriesId')) {
$row['parent'] = $this->getEpisodeParent(ag($json, 'Item.SeriesId'), '');
}
}
$entity = Container::get(StateInterface::class)::fromArray($row)->setIsTainted($isTainted);
if (!$entity->hasGuids()) {
throw new HttpException(
sprintf(
'%s: No supported GUID was given. [%s]',
afterLast(__CLASS__, '\\'),
arrayToString(
[
'guids' => !empty($providersId) ? $providersId : 'None',
'rGuids' => $entity->hasRelativeGuid() ? $entity->getRelativeGuids() : 'None',
]
)
), 400
);
if (!$entity->hasGuids() && !$entity->hasRelativeGuid()) {
$message = sprintf('%s: No valid/supported External ids.', self::NAME);
if (empty($providersId)) {
$message .= ' Most likely unmatched movie/episode or show.';
}
$message .= sprintf(' [%s].', arrayToString(['guids' => !empty($providersId) ? $providersId : 'None']));
throw new HttpException($message, 400);
}
foreach ($entity->getPointers() as $guid) {
$this->cacheData[$guid] = ag($json, 'item.Id');
foreach ([...$entity->getRelativePointers(), ...$entity->getPointers()] as $guid) {
$this->cacheData[$guid] = ag($json, 'Item.Id');
}
if (false === $isTainted && (true === Config::get('webhook.debug') || null !== ag(
$request->getQueryParams(),
'debug'
))) {
saveWebhookPayload($this->name . '.' . $event, $request, [
'entity' => $entity->getAll(),
'payload' => $json,
]);
$savePayload = true === Config::get('webhook.debug') || null !== ag($request->getQueryParams(), 'debug');
if (false === $isTainted && $savePayload) {
saveWebhookPayload($this->name . '.' . $event, $request, $entity);
}
return $entity;
}
public function push(array $entities, DateTimeInterface|null $after = null): array
protected function getEpisodeParent(mixed $id, string $cacheName): array
{
$requests = [];
foreach ($entities as &$entity) {
if (false === ($this->options[ServerInterface::OPT_EXPORT_IGNORE_DATE] ?? false)) {
if (null !== $after && $after->getTimestamp() > $entity->updated) {
$entity = null;
continue;
}
}
$entity->plex_guid = null;
if (array_key_exists($id, $this->cacheShow)) {
return $this->cacheShow[$id];
}
unset($entity);
try {
$response = $this->http->request(
'GET',
(string)$this->url->withPath(
sprintf('/Users/%s/items/' . $id, $this->user)
),
$this->getHeaders()
);
/** @var StateInterface $entity */
foreach ($entities as $entity) {
if (null === $entity || false === $entity->hasGuids()) {
continue;
if (200 !== $response->getStatusCode()) {
return [];
}
try {
$guids = [];
$json = json_decode(
json: $response->getContent(),
associative: true,
flags: JSON_THROW_ON_ERROR | JSON_INVALID_UTF8_IGNORE
);
foreach ($entity->getPointers() as $pointer) {
if (str_starts_with($pointer, 'guid_plex://')) {
continue;
}
if (false === preg_match('#guid_(.+?)://\w+?/(.+)#s', $pointer, $matches)) {
continue;
}
$guids[] = sprintf('%s.%s', $matches[1], $matches[2]);
}
if (empty($guids)) {
continue;
}
$requests[] = $this->http->request(
'GET',
(string)$this->url->withPath(sprintf('/Users/%s/items', $this->user))->withQuery(
http_build_query(
[
'Recursive' => 'true',
'Fields' => 'ProviderIds,DateCreated',
'enableUserData' => 'true',
'enableImages' => 'false',
'AnyProviderIdEquals' => implode(',', $guids),
]
)
),
array_replace_recursive($this->getHeaders(), [
'user_data' => [
'state' => &$entity,
]
])
);
} catch (Throwable $e) {
$this->logger->error($e->getMessage());
if (null === ($itemType = ag($json, 'Type')) || 'Series' !== $itemType) {
return [];
}
$providersId = (array)ag($json, 'ProviderIds', []);
if (!$this->hasSupportedIds($providersId)) {
$this->cacheShow[$id] = [];
return $this->cacheShow[$id];
}
$this->cacheShow[$id] = Guid::fromArray($this->getGuids($providersId))->getAll();
return $this->cacheShow[$id];
} catch (ExceptionInterface $e) {
$this->logger->error($e->getMessage(), [
'file' => $e->getFile(),
'line' => $e->getLine(),
'kind' => get_class($e),
]);
return [];
} catch (JsonException $e) {
$this->logger->error(
sprintf('%s: Unable to decode \'%s\' JSON response. %s', $this->name, $cacheName, $e->getMessage()),
[
'file' => $e->getFile(),
'line' => $e->getLine(),
]
);
return [];
} catch (Throwable $e) {
$this->logger->error(
sprintf('%s: Failed to handle \'%s\' response. %s', $this->name, $cacheName, $e->getMessage()),
[
'file' => $e->getFile(),
'line' => $e->getLine(),
'kind' => get_class($e),
]
);
return [];
}
$stateRequests = [];
foreach ($requests as $response) {
try {
$json = ag(
json_decode($response->getContent(), true, flags: JSON_THROW_ON_ERROR),
'Items',
[]
)[0] ?? [];
$state = $response->getInfo('user_data')['state'];
assert($state instanceof StateInterface);
if (StateInterface::TYPE_MOVIE === $state->type) {
$iName = sprintf(
'%s - [%s (%d)]',
$this->name,
$state->meta['title'] ?? '??',
$state->meta['year'] ?? 0000,
);
} else {
$iName = trim(
sprintf(
'%s - [%s - (%dx%d) - %s]',
$this->name,
$state->meta['series'] ?? '??',
$state->meta['season'] ?? 0,
$state->meta['episode'] ?? 0,
$state->meta['title'] ?? '??',
)
);
}
if (empty($json)) {
$this->logger->notice(sprintf('Ignoring %s. does not exists.', $iName));
continue;
}
$isWatched = (int)(bool)ag($json, 'UserData.Played', false);
if ($state->watched === $isWatched) {
$this->logger->debug(sprintf('Ignoring %s. State is unchanged.', $iName));
continue;
}
if (false === ($this->options[ServerInterface::OPT_EXPORT_IGNORE_DATE] ?? false)) {
$date = ag(
$json,
'UserData.LastPlayedDate',
ag($json, 'DateCreated', ag($json, 'PremiereDate', null))
);
if (null === $date) {
$this->logger->notice(sprintf('Ignoring %s. No date is set.', $iName));
continue;
}
$date = strtotime($date);
if ($date >= $state->updated) {
$this->logger->debug(sprintf('Ignoring %s. Date is newer then what in db.', $iName));
continue;
}
}
$stateRequests[] = $this->http->request(
1 === $state->watched ? 'POST' : 'DELETE',
(string)$this->url->withPath(sprintf('/Users/%s/PlayedItems/%s', $this->user, ag($json, 'Id'))),
array_replace_recursive(
$this->getHeaders(),
[
'user_data' => [
'state' => 1 === $state->watched ? 'Watched' : 'Unwatched',
'itemName' => $iName,
],
]
)
);
} catch (Throwable $e) {
$this->logger->error($e->getMessage(), ['file' => $e->getFile(), 'line' => $e->getLine()]);
}
}
unset($requests);
return $stateRequests;
}
}

File diff suppressed because it is too large Load Diff

File diff suppressed because it is too large Load Diff

View File

@@ -55,9 +55,11 @@ interface ServerInterface
* Process The request For attributes extraction.
*
* @param ServerRequestInterface $request
* @param array $opts
*
* @return ServerRequestInterface
*/
public static function processRequest(ServerRequestInterface $request): ServerRequestInterface;
public static function processRequest(ServerRequestInterface $request, array $opts = []): ServerRequestInterface;
/**
* Parse server specific webhook event. for play/un-played event.

View File

@@ -18,8 +18,7 @@ use Psr\Log\LoggerInterface;
final class PDOAdapter implements StorageInterface
{
private bool $viaCommit = false;
private bool $viaTransaction = false;
private bool $singleTransaction = false;
/**
@@ -41,14 +40,15 @@ final class PDOAdapter implements StorageInterface
try {
$data = $entity->getAll();
if (is_array($data['meta'])) {
$data['meta'] = json_encode($data['meta']);
foreach (StateInterface::ENTITY_ARRAY_KEYS as $key) {
if (null !== ($data[$key] ?? null) && is_array($data[$key])) {
ksort($data[$key]);
$data[$key] = json_encode($data[$key], flags: JSON_UNESCAPED_SLASHES | JSON_UNESCAPED_UNICODE);
}
}
if (null !== $data['id']) {
throw new StorageException(
sprintf('Trying to insert already saved entity #%s', $data['id']), 21
);
throw new StorageException(sprintf('Trying to insert already saved entity #%s', $data['id']), 21);
}
unset($data['id']);
@@ -64,8 +64,8 @@ final class PDOAdapter implements StorageInterface
$entity->id = (int)$this->pdo->lastInsertId();
} catch (PDOException $e) {
$this->stmt['insert'] = null;
if (false === $this->viaCommit) {
$this->logger->error($e->getMessage(), $entity->meta ?? []);
if (false === $this->viaTransaction) {
$this->logger->error($e->getMessage(), $entity->getAll());
return $entity;
}
throw $e;
@@ -76,11 +76,11 @@ final class PDOAdapter implements StorageInterface
public function get(StateInterface $entity): StateInterface|null
{
if ($entity->hasGuids() && null !== ($item = $this->findByGuid($entity))) {
if ($entity->isEpisode() && $entity->hasRelativeGuid() && null !== ($item = $this->findByRGuid($entity))) {
return $item;
}
if ($entity->isEpisode() && $entity->hasRelativeGuid() && null !== ($item = $this->findByRGuid($entity))) {
if ($entity->hasGuids() && null !== ($item = $this->findByGuid($entity))) {
return $item;
}
@@ -113,8 +113,10 @@ final class PDOAdapter implements StorageInterface
try {
$data = $entity->getAll();
if (is_array($data['meta'])) {
$data['meta'] = json_encode($data['meta']);
foreach (StateInterface::ENTITY_ARRAY_KEYS as $key) {
if (is_array($data[$key] ?? [])) {
$data[$key] = json_encode($data[$key], flags: JSON_UNESCAPED_SLASHES | JSON_UNESCAPED_UNICODE);
}
}
if (null === $data['id']) {
@@ -130,8 +132,8 @@ final class PDOAdapter implements StorageInterface
$this->stmt['update']->execute($data);
} catch (PDOException $e) {
$this->stmt['update'] = null;
if (false === $this->viaCommit) {
$this->logger->error($e->getMessage(), $entity->meta ?? []);
if (false === $this->viaTransaction) {
$this->logger->error($e->getMessage(), $entity->getAll());
return $entity;
}
throw $e;
@@ -165,50 +167,31 @@ final class PDOAdapter implements StorageInterface
return true;
}
public function commit(array $entities): array
public function commit(array $entities, array $opts = []): array
{
return $this->transactional(function () use ($entities) {
$list = [
StateInterface::TYPE_MOVIE => ['added' => 0, 'updated' => 0, 'failed' => 0],
StateInterface::TYPE_EPISODE => ['added' => 0, 'updated' => 0, 'failed' => 0],
];
$count = count($entities);
$this->logger->notice(
0 === $count ? 'No changes detected.' : sprintf('Updating database with \'%d\' changes.', $count)
);
$this->viaCommit = true;
$actions = [
'added' => 0,
'updated' => 0,
'failed' => 0,
];
return $this->transactional(function () use ($entities, $actions) {
foreach ($entities as $entity) {
try {
if (null === $entity->id) {
$this->logger->info(
'Adding ' . $entity->type . ' - [' . $entity->getName() . '].',
$entity->getAll()
);
$this->insert($entity);
$list[$entity->type]['added']++;
$actions['added']++;
} else {
$this->logger->info(
'Updating ' . $entity->type . ':' . $entity->id . ' - [' . $entity->getName() . '].',
$entity->diff()
);
$this->update($entity);
$list[$entity->type]['updated']++;
$actions['updated']++;
}
} catch (PDOException $e) {
$list[$entity->type]['failed']++;
$actions['failed']++;
$this->logger->error($e->getMessage(), $entity->getAll());
}
}
$this->viaCommit = false;
return $list;
return $actions;
});
}
@@ -250,11 +233,6 @@ final class PDOAdapter implements StorageInterface
return $this->pdo;
}
/**
* Enable Single Transaction mode.
*
* @return bool
*/
public function singleTransaction(): bool
{
$this->singleTransaction = true;
@@ -267,6 +245,32 @@ final class PDOAdapter implements StorageInterface
return $this->pdo->inTransaction();
}
public function transactional(Closure $callback): mixed
{
if (true === $this->pdo->inTransaction()) {
$this->viaTransaction = true;
$result = $callback($this);
$this->viaTransaction = false;
return $result;
}
try {
$this->pdo->beginTransaction();
$this->viaTransaction = true;
$result = $callback($this);
$this->viaTransaction = false;
$this->pdo->commit();
return $result;
} catch (PDOException $e) {
$this->pdo->rollBack();
$this->viaTransaction = false;
throw $e;
}
}
/**
* If we are using single transaction,
* commit all changes on class destruction.
@@ -280,34 +284,6 @@ final class PDOAdapter implements StorageInterface
$this->stmt = [];
}
/**
* Wrap Transaction.
*
* @param Closure(PDO): mixed $callback
*
* @return mixed
* @throws PDOException
*/
private function transactional(Closure $callback): mixed
{
if (true === $this->pdo->inTransaction()) {
return $callback($this->pdo);
}
try {
$this->pdo->beginTransaction();
$result = $callback($this->pdo);
$this->pdo->commit();
return $result;
} catch (PDOException $e) {
$this->pdo->rollBack();
throw $e;
}
}
/**
* Generate SQL Insert Statement.
*
@@ -374,58 +350,49 @@ final class PDOAdapter implements StorageInterface
{
$cond = $where = [];
foreach ($entity->getParentGuids() as $key => $val) {
foreach ($entity->parent as $key => $val) {
if (null === ($val ?? null)) {
continue;
}
$where[] = "json_extract(meta,'$.parent.{$key}') = :{$key}";
$where[] = "JSON_EXTRACT(parent,'$.{$key}') = :{$key}";
$cond[$key] = $val;
}
$sqlType = '';
if (null !== ($entity?->type ?? null)) {
$sqlType = 'type = :s_type AND ';
$cond['s_type'] = $entity->type;
}
$sql = "SELECT
*
FROM
state
WHERE
{$sqlType}
json_extract(meta, '$.season') = " . (int)ag($entity->meta, 'season', 0) . "
(
type = :type
AND
json_extract(meta, '$.episode') = " . (int)ag($entity->meta, 'episode', 0) . "
season = :season
AND
episode = :episode
)
AND
(
" . implode(' OR ', $where) . "
)
LIMIT 1
";
$cachedKey = md5($sql);
$cond['season'] = $entity->season;
$cond['episode'] = $entity->episode;
$cond['type'] = StateInterface::TYPE_EPISODE;
try {
if (null === ($this->stmt[$cachedKey] ?? null)) {
$this->stmt[$cachedKey] = $this->pdo->prepare($sql);
}
$stmt = $this->pdo->prepare($sql);
if (false === $this->stmt[$cachedKey]->execute($cond)) {
$this->stmt[$cachedKey] = null;
throw new StorageException('Failed to execute sql query.', 61);
}
if (false === ($row = $this->stmt[$cachedKey]->fetch(PDO::FETCH_ASSOC))) {
return null;
}
return $entity::fromArray($row);
} catch (PDOException|StorageException $e) {
$this->stmt[$cachedKey] = null;
throw $e;
if (false === $stmt->execute($cond)) {
throw new StorageException('Failed to execute sql query.', 61);
}
if (false === ($row = $stmt->fetch(PDO::FETCH_ASSOC))) {
return null;
}
return $entity::fromArray($row);
}
/**
@@ -447,50 +414,46 @@ final class PDOAdapter implements StorageInterface
return $entity::fromArray($row);
}
$cond = $where = [];
$guids = [];
$cond = [
'type' => $entity->type,
];
foreach (array_keys(Guid::SUPPORTED) as $key) {
if (null === ($entity->{$key} ?? null)) {
if (null === ($entity->guids[$key] ?? null)) {
continue;
}
$where[] = "{$key} = :{$key}";
$cond[$key] = $entity->{$key};
$guids[] = "JSON_EXTRACT(guids,'$.{$key}') = :{$key}";
$cond[$key] = $entity->guids[$key];
}
if (empty($cond)) {
return null;
}
$sqlWhere = implode(' OR ', $where);
$sqlEpisode = '';
$cachedKey = md5($sqlWhere . ($entity?->type ?? ''));
try {
if (null === ($this->stmt[$cachedKey] ?? null)) {
$sqlType = '';
if (null !== ($entity?->type ?? null)) {
$sqlType = 'type = :s_type AND ';
$cond['s_type'] = $entity->type;
}
$this->stmt[$cachedKey] = $this->pdo->prepare("SELECT * FROM state WHERE {$sqlType} {$sqlWhere}");
}
if (false === $this->stmt[$cachedKey]->execute($cond)) {
$this->stmt[$cachedKey] = null;
throw new StorageException('Failed to execute sql query.', 61);
}
if (false === ($row = $this->stmt[$cachedKey]->fetch(PDO::FETCH_ASSOC))) {
return null;
}
return $entity::fromArray($row);
} catch (PDOException|StorageException $e) {
$this->stmt[$cachedKey] = null;
throw $e;
if ($entity->isEpisode()) {
$sqlEpisode = ' AND season = :season AND episode = :episode ';
$cond['season'] = $entity->season;
$cond['episode'] = $entity->episode;
}
$sqlGuids = ' AND (' . implode(' OR ', $guids) . ' ) ';
$sql = "SELECT * FROM state WHERE ( type = :type {$sqlEpisode} ) {$sqlGuids} LIMIT 1";
$stmt = $this->pdo->prepare($sql);
if (false === $stmt->execute($cond)) {
throw new StorageException('Failed to execute sql query.', 61);
}
if (false === ($row = $stmt->fetch(PDO::FETCH_ASSOC))) {
return null;
}
return $entity::fromArray($row);
}
}

View File

@@ -5,8 +5,10 @@ declare(strict_types=1);
namespace App\Libs\Storage;
use App\Libs\Entity\StateInterface;
use Closure;
use DateTimeInterface;
use PDO;
use PDOException;
use Psr\Log\LoggerInterface;
use RuntimeException;
@@ -66,10 +68,11 @@ interface StorageInterface
* Insert/Update Entities.
*
* @param array<StateInterface> $entities
* @param array $opts
*
* @return array
*/
public function commit(array $entities): array;
public function commit(array $entities, array $opts = []): array;
/**
* Migrate Backend Storage Schema.
@@ -122,4 +125,21 @@ interface StorageInterface
* @throws RuntimeException if PDO is not initialized yet.
*/
public function getPdo(): PDO;
/**
* Enable Single Transaction mode.
*
* @return bool
*/
public function singleTransaction(): bool;
/**
* Wrap Queries into single transaction.
*
* @param Closure(StorageInterface): mixed $callback
*
* @return mixed
* @throws PDOException
*/
public function transactional(Closure $callback): mixed;
}

View File

@@ -68,12 +68,27 @@ if (!function_exists('makeDate')) {
}
if (!function_exists('ag')) {
function ag(array $array, string|null $path, mixed $default = null, string $separator = '.'): mixed
function ag(array|object $array, string|array|null $path, mixed $default = null, string $separator = '.'): mixed
{
if (null === $path) {
if (empty($path)) {
return $array;
}
if (!is_array($array)) {
$array = get_object_vars($array);
}
if (is_array($path)) {
foreach ($path as $key) {
$val = ag($array, $key, '_not_set');
if ('_not_set' === $val) {
continue;
}
return $val;
}
return getValue($default);
}
if (array_key_exists($path, $array)) {
return $array[$path];
}
@@ -226,24 +241,27 @@ if (!function_exists('fsize')) {
}
if (!function_exists('saveWebhookPayload')) {
function saveWebhookPayload(string $name, ServerRequestInterface $request, array $parsed = []): void
function saveWebhookPayload(string $name, ServerRequestInterface $request, StateInterface $state): void
{
$content = [
'query' => $request->getQueryParams(),
'request' => [
'server' => $request->getServerParams(),
'body' => (string)$request->getBody(),
'query' => $request->getQueryParams(),
],
'parsed' => $request->getParsedBody(),
'server' => $request->getServerParams(),
'body' => (string)$request->getBody(),
'attributes' => $request->getAttributes(),
'cParsed' => $parsed,
'entity' => $state->getAll(),
];
@file_put_contents(
Config::get('tmpDir') . '/webhooks/' . sprintf(
'webhook.%s.%s.json',
'webhook.%s.%s.%s.json',
$name,
(string)ag($request->getServerParams(), 'X_REQUEST_ID', time())
ag($state->extra, 'webhook.event', 'unknown'),
ag($request->getServerParams(), 'X_REQUEST_ID', time())
),
json_encode($content, JSON_PRETTY_PRINT | JSON_UNESCAPED_SLASHES)
json_encode(value: $content, flags: JSON_PRETTY_PRINT | JSON_UNESCAPED_SLASHES)
);
}
}
@@ -359,6 +377,13 @@ if (!function_exists('before')) {
}
}
if (!function_exists('after')) {
function after(string $subject, string $search): string
{
return empty($search) ? $subject : array_reverse(explode($search, $subject, 2))[0];
}
}
if (!function_exists('makeServer')) {
/**
* @param array{name:string|null, type:string, url:string, token:string|int|null, user:string|int|null, persist:array, options:array} $server
@@ -419,7 +444,7 @@ if (!function_exists('arrayToString')) {
}
if (is_array($val)) {
$val = json_encode($val, flags: JSON_UNESCAPED_SLASHES | JSON_UNESCAPED_UNICODE);
$val = '[ ' . arrayToString($val) . ' ]';
} else {
$val = $val ?? 'None';
}

View File

@@ -7,29 +7,31 @@ use App\Libs\Entity\StateInterface;
return [
'id' => null,
'type' => StateInterface::TYPE_EPISODE,
'updated' => 0,
'updated' => 1,
'watched' => 1,
'meta' => [
'via' => 'Plex@Home',
'series' => 'Series Title',
'year' => 2020,
'season' => 1,
'episode' => 2,
'via' => 'Plex@Home',
'title' => 'Series Title',
'year' => 2020,
'season' => 1,
'episode' => 2,
'parent' => [
'guid_imdb' => '510',
'guid_tvdb' => '520',
],
'guids' => [
'guid_plex' => '6000',
'guid_imdb' => '6100',
'guid_tvdb' => '6200',
'guid_tmdb' => '6300',
'guid_tvmaze' => '6400',
'guid_tvrage' => '6500',
'guid_anidb' => '6600',
],
'extra' => [
'title' => 'Episode Title',
'date' => '2020-01-03',
'webhook' => [
'event' => 'media.scrobble'
],
'parent' => [
'guid_imdb' => '510',
'guid_tvdb' => '520',
],
],
'guid_plex' => StateInterface::TYPE_EPISODE . '/6000',
'guid_imdb' => StateInterface::TYPE_EPISODE . '/6100',
'guid_tvdb' => StateInterface::TYPE_EPISODE . '/6200',
'guid_tmdb' => StateInterface::TYPE_EPISODE . '/6300',
'guid_tvmaze' => StateInterface::TYPE_EPISODE . '/6400',
'guid_tvrage' => StateInterface::TYPE_EPISODE . '/6500',
'guid_anidb' => StateInterface::TYPE_EPISODE . '/6600',
];

View File

@@ -9,19 +9,25 @@ return [
'type' => StateInterface::TYPE_MOVIE,
'updated' => 1,
'watched' => 1,
'meta' => [
'via' => 'JF@Home',
'title' => 'Movie Title',
'year' => 2020,
'via' => 'JF@Home',
'title' => 'Movie Title',
'year' => 2020,
'season' => null,
'episode' => null,
'parent' => [],
'guids' => [
'guid_plex' => '1000',
'guid_imdb' => '1100',
'guid_tvdb' => '1200',
'guid_tmdb' => '1300',
'guid_tvmaze' => '1400',
'guid_tvrage' => '1500',
'guid_anidb' => '1600',
],
'extra' => [
'webhook' => [
'event' => 'ItemAdded'
]
],
'guid_plex' => StateInterface::TYPE_MOVIE . '/1000',
'guid_imdb' => StateInterface::TYPE_MOVIE . '/1100',
'guid_tvdb' => StateInterface::TYPE_MOVIE . '/1200',
'guid_tmdb' => StateInterface::TYPE_MOVIE . '/1300',
'guid_tvmaze' => StateInterface::TYPE_MOVIE . '/1400',
'guid_tvrage' => StateInterface::TYPE_MOVIE . '/1500',
'guid_anidb' => StateInterface::TYPE_MOVIE . '/1600',
];

View File

@@ -1,163 +0,0 @@
<?php
declare(strict_types=1);
namespace Tests\Mappers\Import;
use App\Libs\Entity\StateEntity;
use App\Libs\Entity\StateInterface;
use App\Libs\Extends\CliLogger;
use App\Libs\Mappers\Import\DirectMapper;
use App\Libs\Storage\PDO\PDOAdapter;
use App\Libs\Storage\StorageInterface;
use PDO;
use PHPUnit\Framework\TestCase;
use Symfony\Component\Console\Input\ArrayInput;
use Symfony\Component\Console\Output\NullOutput;
class DirectMapperTest extends TestCase
{
private array $testMovie = [];
private array $testEpisode = [];
private DirectMapper|null $mapper = null;
private StorageInterface|null $storage = null;
public function setUp(): void
{
$this->output = new NullOutput();
$this->input = new ArrayInput([]);
$this->testMovie = require __DIR__ . '/../../Fixtures/MovieEntity.php';
$this->testEpisode = require __DIR__ . '/../../Fixtures/EpisodeEntity.php';
$logger = new CliLogger($this->output);
$this->storage = new PDOAdapter($logger, new PDO('sqlite::memory:'));
$this->storage->migrations('up');
$this->mapper = new DirectMapper($logger, $this->storage);
$this->mapper->setUp(['class' => new StateEntity([])]);
}
public function test_add_conditions(): void
{
$testMovie = new StateEntity($this->testMovie);
$testEpisode = new StateEntity($this->testEpisode);
// -- expect 0 as we have not modified or added new item yet.
$this->assertCount(0, $this->mapper);
$this->mapper->add('test', 'test1', $testEpisode)->add('test', 'test2', $testMovie);
$this->assertCount(2, $this->mapper);
$this->assertSame(
[
StateInterface::TYPE_MOVIE => ['added' => 1, 'updated' => 0, 'failed' => 0],
StateInterface::TYPE_EPISODE => ['added' => 1, 'updated' => 0, 'failed' => 0],
],
$this->mapper->commit()
);
// -- assert 0 as we have committed the changes to the db, and the state should have been reset.
$this->assertCount(0, $this->mapper);
$testEpisode->guid_tvrage = StateInterface::TYPE_EPISODE . '/2';
$this->mapper->add('test', 'test1', $testEpisode);
$this->assertCount(1, $this->mapper);
}
public function test_get_conditions(): void
{
$testMovie = new StateEntity($this->testMovie);
$testEpisode = new StateEntity($this->testEpisode);
// -- expect null as we haven't added anything to db yet.
$this->assertNull($this->mapper->get($testEpisode));
$this->storage->commit([$testEpisode, $testMovie]);
clone $testMovie2 = $testMovie;
clone $testEpisode2 = $testEpisode;
$testMovie2->id = 2;
$testEpisode2->id = 1;
$this->assertSame($testEpisode2->getAll(), $this->mapper->get($testEpisode)->getAll());
$this->assertSame($testMovie2->getAll(), $this->mapper->get($testMovie)->getAll());
}
public function test_remove_conditions(): void
{
$testMovie = new StateEntity($this->testMovie);
$testEpisode = new StateEntity($this->testEpisode);
$this->assertFalse($this->mapper->remove($testEpisode));
$this->mapper->add('test', 'episode', $testEpisode)->add('test', 'movie', $testMovie)->commit();
$this->assertTrue($this->mapper->remove($testEpisode));
}
public function test_commit_conditions(): void
{
$testMovie = new StateEntity($this->testMovie);
$testEpisode = new StateEntity($this->testEpisode);
// -- expect 0 as we have not modified or added new item yet.
$this->assertCount(0, $this->mapper);
$this->mapper->add('test', 'test1', $testEpisode)->add('test', 'test2', $testMovie);
$this->assertCount(2, $this->mapper);
$this->assertSame(
[
StateInterface::TYPE_MOVIE => ['added' => 1, 'updated' => 0, 'failed' => 0],
StateInterface::TYPE_EPISODE => ['added' => 1, 'updated' => 0, 'failed' => 0],
],
$this->mapper->commit()
);
$this->assertSame(
[
StateInterface::TYPE_MOVIE => ['added' => 0, 'updated' => 0, 'failed' => 0],
StateInterface::TYPE_EPISODE => ['added' => 0, 'updated' => 0, 'failed' => 0],
],
$this->mapper->commit()
);
$testEpisode->guid_tvrage = StateInterface::TYPE_EPISODE . '/1';
$testMovie->guid_tvrage = StateInterface::TYPE_MOVIE . '/1';
$this->mapper->add('test', 'test1', $testEpisode)->add('test', 'test2', $testMovie);
$this->assertSame(
[
StateInterface::TYPE_MOVIE => ['added' => 0, 'updated' => 1, 'failed' => 0],
StateInterface::TYPE_EPISODE => ['added' => 0, 'updated' => 1, 'failed' => 0],
],
$this->mapper->commit()
);
}
public function test_has_conditions(): void
{
$testEpisode = new StateEntity($this->testEpisode);
$this->assertFalse($this->mapper->has($testEpisode));
$this->storage->commit([$testEpisode]);
$this->assertTrue($this->mapper->has($testEpisode));
}
public function test_reset_conditions(): void
{
$testEpisode = new StateEntity($this->testEpisode);
$this->assertCount(0, $this->mapper);
$this->mapper->add('test', 'episode', $testEpisode);
$this->assertCount(1, $this->mapper);
$this->mapper->reset();
$this->assertCount(0, $this->mapper);
}
}

View File

@@ -97,7 +97,7 @@ class MemoryMapperTest extends TestCase
// -- assert 0 as we have committed the changes to the db, and the state should have been reset.
$this->assertCount(0, $this->mapper);
$testEpisode->guid_tvrage = StateInterface::TYPE_EPISODE . '/2';
$testEpisode->guids['guid_tvrage'] = '2';
$this->mapper->add('test', 'test1', $testEpisode);
@@ -116,8 +116,19 @@ class MemoryMapperTest extends TestCase
public function test_get_conditions(): void
{
$testMovie = new StateEntity($this->testMovie);
$testEpisode = new StateEntity($this->testEpisode);
$movie = $this->testMovie;
$episode = $this->testEpisode;
ksort($movie['parent']);
ksort($movie['guids']);
ksort($movie['extra']);
ksort($episode['parent']);
ksort($episode['guids']);
ksort($episode['extra']);
$testMovie = new StateEntity($movie);
$testEpisode = new StateEntity($episode);
// -- expect null as we haven't added anything to db yet.
$this->assertNull($this->mapper->get($testEpisode));
@@ -168,8 +179,8 @@ class MemoryMapperTest extends TestCase
$this->mapper->commit()
);
$testMovie->guid_anidb = StateInterface::TYPE_MOVIE . '/1';
$testEpisode->guid_anidb = StateInterface::TYPE_EPISODE . '/1';
$testMovie->guids['guid_anidb'] = '1';
$testEpisode->guids['guid_anidb'] = '1';
$this->assertSame(
[

View File

@@ -53,7 +53,13 @@ class PDOAdapterTest extends TestCase
public function test_get_conditions(): void
{
$item = new StateEntity($this->testEpisode);
$test = $this->testEpisode;
ksort($test['parent']);
ksort($test['guids']);
ksort($test['extra']);
$item = new StateEntity($test);
// -- db should be empty at this stage. as such we expect null.
$this->assertNull($this->storage->get($item));
@@ -100,7 +106,7 @@ class PDOAdapterTest extends TestCase
public function test_update_conditions(): void
{
$item = $this->storage->insert(new StateEntity($this->testEpisode));
$item->guid_plex = StateInterface::TYPE_EPISODE . '/1000';
$item->guids['guid_plex'] = StateInterface::TYPE_EPISODE . '/1000';
$updatedItem = $this->storage->update($item);
@@ -133,21 +139,15 @@ class PDOAdapterTest extends TestCase
$item2 = new StateEntity($this->testMovie);
$this->assertSame(
[
StateInterface::TYPE_MOVIE => ['added' => 1, 'updated' => 0, 'failed' => 0],
StateInterface::TYPE_EPISODE => ['added' => 1, 'updated' => 0, 'failed' => 0],
],
['added' => 2, 'updated' => 0, 'failed' => 0],
$this->storage->commit([$item1, $item2])
);
$item1->guid_anidb = StateInterface::TYPE_EPISODE . '/1';
$item2->guid_anidb = StateInterface::TYPE_MOVIE . '/1';
$item1->guids['guid_anidb'] = StateInterface::TYPE_EPISODE . '/1';
$item2->guids['guid_anidb'] = StateInterface::TYPE_MOVIE . '/1';
$this->assertSame(
[
StateInterface::TYPE_MOVIE => ['added' => 0, 'updated' => 1, 'failed' => 0],
StateInterface::TYPE_EPISODE => ['added' => 0, 'updated' => 1, 'failed' => 0],
],
['added' => 0, 'updated' => 2, 'failed' => 0],
$this->storage->commit([$item1, $item2])
);
}