Added Progress command, it's still experimental and only works correctly for plex at the moment. do not use it yet.

This commit is contained in:
Abdulmhsen B. A. A
2023-11-10 18:57:19 +03:00
parent e25349f8d1
commit 302f945763
11 changed files with 787 additions and 2 deletions

View File

@@ -110,6 +110,17 @@ interface ClientInterface
*/
public function push(array $entities, QueueRequests $queue, iDate|null $after = null): array;
/**
* Compare watch progress and push to backend.
*
* @param array<StateInterface> $entities
* @param QueueRequests $queue
* @param iDate|null $after
*
* @return array
*/
public function progress(array $entities, QueueRequests $queue, iDate|null $after = null): array;
/**
* Search backend libraries.
*

View File

@@ -0,0 +1,9 @@
<?php
declare(strict_types=1);
namespace App\Backends\Emby\Action;
class Progress extends \App\Backends\Jellyfin\Action\Progress
{
}

View File

@@ -18,6 +18,7 @@ use App\Backends\Emby\Action\GetUsersList;
use App\Backends\Emby\Action\Import;
use App\Backends\Emby\Action\InspectRequest;
use App\Backends\Emby\Action\ParseWebhook;
use App\Backends\Emby\Action\Progress;
use App\Backends\Emby\Action\Push;
use App\Backends\Emby\Action\SearchId;
use App\Backends\Emby\Action\SearchQuery;
@@ -238,6 +239,26 @@ class EmbyClient implements iClient
return [];
}
public function progress(array $entities, QueueRequests $queue, iDate|null $after = null): array
{
$response = Container::get(Progress::class)(
context: $this->context,
entities: $entities,
queue: $queue,
after: $after
);
if ($response->hasError()) {
$this->logger->log($response->error->level(), $response->error->message, $response->error->context);
}
if (false === $response->isSuccessful()) {
throw new RuntimeException(ag($response->extra, 'message', fn() => $response->error->format()));
}
return [];
}
public function search(string $query, int $limit = 25, array $opts = []): array
{
$response = Container::get(SearchQuery::class)(

View File

@@ -0,0 +1,176 @@
<?php
declare(strict_types=1);
namespace App\Backends\Jellyfin\Action;
use App\Backends\Common\CommonTrait;
use App\Backends\Common\Context;
use App\Backends\Common\Response;
use App\Libs\Entity\StateInterface as iState;
use App\Libs\Options;
use App\Libs\QueueRequests;
use DateTimeInterface;
use Psr\Log\LoggerInterface;
use Symfony\Contracts\HttpClient\HttpClientInterface;
use Throwable;
class Progress
{
use CommonTrait;
public function __construct(protected HttpClientInterface $http, protected LoggerInterface $logger)
{
}
/**
* Push Play state.
*
* @param Context $context
* @param array<iState> $entities
* @param QueueRequests $queue
* @param DateTimeInterface|null $after
* @return Response
*/
public function __invoke(
Context $context,
array $entities,
QueueRequests $queue,
DateTimeInterface|null $after = null
): Response {
return $this->tryResponse(context: $context, fn: fn() => $this->action($context, $entities, $queue, $after));
}
private function action(
Context $context,
array $entities,
QueueRequests $queue,
DateTimeInterface|null $after = null
): Response {
$ignoreDate = (bool)ag($context->options, Options::IGNORE_DATE, false);
foreach ($entities as $key => $entity) {
if (true !== ($entity instanceof iState)) {
continue;
}
if (null !== $after && false === (bool)ag($context->options, Options::IGNORE_DATE, false)) {
if ($after->getTimestamp() > $entity->updated) {
continue;
}
}
$metadata = $entity->getMetadata($context->backendName);
$logContext = [
'item' => [
'id' => $entity->id,
'type' => $entity->type,
'title' => $entity->getName(),
],
];
if (null === ag($metadata, iState::COLUMN_ID, null)) {
$this->logger->warning(
'Ignoring [{item.title}] for [{backend}]. No metadata was found.',
[
'backend' => $context->backendName,
...$logContext,
]
);
continue;
}
if (null === ($senderDate = ag($entity->getExtra($entity->via), iState::COLUMN_EXTRA_DATE))) {
$this->logger->warning('Ignoring [{item.title}] for [{backend}]. No Sender has set no date.', [
'backend' => $context->backendName,
...$logContext,
]);
continue;
}
if ($context->backendName === $entity->via) {
$this->logger->debug('Ignoring event as it was originated from this backend.', [
'backend' => $context->backendName,
...$logContext,
]);
continue;
}
if (null !== ($datetime = ag($entity->getExtra($context->backendName), iState::COLUMN_EXTRA_DATE, null))) {
if (false === $ignoreDate && makeDate($datetime) > makeDate($senderDate)) {
$this->logger->warning(
'Ignoring [{item.title}] for [{backend}]. Sender date is older than backend date.',
[
'backend' => $context->backendName,
...$logContext,
]
);
continue;
}
}
$logContext['remote']['id'] = ag($metadata, iState::COLUMN_ID);
try {
$url = $context->backendUrl->withPath(
r('/Users/{user_id}/PlayingItems/{item_id}/Progress', [
'user_id' => $context->backendUser,
'item_id' => $logContext['remote']['id'],
])
)->withQuery(
http_build_query([
'mediaSourceId' => $logContext['remote']['id'],
'positionTicks' => floor($entity->getPlayProgress() * 1_00_00),
])
);
$logContext['remote']['url'] = (string)$url;
$this->logger->debug('Updating [{backend}] {item.type} [{item.title}] watch progress.', [
'backend' => $context->backendName,
...$logContext,
]);
if (false === (bool)ag($context->options, Options::DRY_RUN, false)) {
$queue->add(
$this->http->request(
'POST',
(string)$url,
array_replace_recursive($context->backendHeaders, [
'json' => [
'UserData' => [
],
],
'user_data' => [
'id' => $key,
'context' => $logContext + [
'backend' => $context->backendName,
],
],
])
)
);
}
} catch (Throwable $e) {
$this->logger->error(
'Unhandled exception was thrown during request to change [{backend}] {item.type} [{item.title}] watch progress.',
[
'backend' => $context->backendName,
...$logContext,
'exception' => [
'file' => $e->getFile(),
'line' => $e->getLine(),
'kind' => get_class($e),
'message' => $e->getMessage(),
'trace' => $context->trace ? $e->getTrace() : [],
],
]
);
}
}
return new Response(status: true, response: $queue);
}
}

View File

@@ -18,6 +18,7 @@ use App\Backends\Jellyfin\Action\GetUsersList;
use App\Backends\Jellyfin\Action\Import;
use App\Backends\Jellyfin\Action\InspectRequest;
use App\Backends\Jellyfin\Action\ParseWebhook;
use App\Backends\Jellyfin\Action\Progress;
use App\Backends\Jellyfin\Action\Push;
use App\Backends\Jellyfin\Action\SearchId;
use App\Backends\Jellyfin\Action\SearchQuery;
@@ -253,6 +254,26 @@ class JellyfinClient implements iClient
return [];
}
public function progress(array $entities, QueueRequests $queue, iDate|null $after = null): array
{
$response = Container::get(Progress::class)(
context: $this->context,
entities: $entities,
queue: $queue,
after: $after
);
if ($response->hasError()) {
$this->logger->log($response->error->level(), $response->error->message, $response->error->context);
}
if (false === $response->isSuccessful()) {
throw new RuntimeException(ag($response->extra, 'message', fn() => $response->error->format()));
}
return [];
}
public function search(string $query, int $limit = 25, array $opts = []): array
{
$response = Container::get(SearchQuery::class)(

View File

@@ -0,0 +1,168 @@
<?php
declare(strict_types=1);
namespace App\Backends\Plex\Action;
use App\Backends\Common\CommonTrait;
use App\Backends\Common\Context;
use App\Backends\Common\Response;
use App\Libs\Entity\StateInterface as iState;
use App\Libs\Options;
use App\Libs\QueueRequests;
use DateTimeInterface;
use Psr\Log\LoggerInterface;
use Symfony\Contracts\HttpClient\HttpClientInterface;
use Throwable;
class Progress
{
use CommonTrait;
public function __construct(protected HttpClientInterface $http, protected LoggerInterface $logger)
{
}
/**
* Push Play state.
*
* @param Context $context
* @param array<iState> $entities
* @param QueueRequests $queue
* @param DateTimeInterface|null $after
* @return Response
*/
public function __invoke(
Context $context,
array $entities,
QueueRequests $queue,
DateTimeInterface|null $after = null
): Response {
return $this->tryResponse(context: $context, fn: fn() => $this->action($context, $entities, $queue, $after));
}
private function action(
Context $context,
array $entities,
QueueRequests $queue,
DateTimeInterface|null $after = null
): Response {
$ignoreDate = (bool)ag($context->options, Options::IGNORE_DATE, false);
foreach ($entities as $key => $entity) {
if (true !== ($entity instanceof iState)) {
continue;
}
if (null !== $after && false === (bool)ag($context->options, Options::IGNORE_DATE, false)) {
if ($after->getTimestamp() > $entity->updated) {
continue;
}
}
$metadata = $entity->getMetadata($context->backendName);
$logContext = [
'item' => [
'id' => $entity->id,
'type' => $entity->type,
'title' => $entity->getName(),
],
];
if (null === ag($metadata, iState::COLUMN_ID, null)) {
$this->logger->warning(
'Ignoring [{item.title}] for [{backend}]. No metadata was found.',
[
'backend' => $context->backendName,
...$logContext,
]
);
continue;
}
if (null === ($senderDate = ag($entity->getExtra($entity->via), iState::COLUMN_EXTRA_DATE))) {
$this->logger->warning('Ignoring [{item.title}] for [{backend}]. No Sender has set no date.', [
'backend' => $context->backendName,
...$logContext,
]);
continue;
}
if ($context->backendName === $entity->via) {
$this->logger->debug('Ignoring event as it was originated from this backend.', [
'backend' => $context->backendName,
...$logContext,
]);
continue;
}
if (null !== ($datetime = ag($entity->getExtra($context->backendName), iState::COLUMN_EXTRA_DATE, null))) {
if (false === $ignoreDate && makeDate($datetime) > makeDate($senderDate)) {
$this->logger->warning(
'Ignoring [{item.title}] for [{backend}]. Sender date is older than backend date.',
[
'backend' => $context->backendName,
...$logContext,
]
);
continue;
}
}
$logContext['remote']['id'] = ag($metadata, iState::COLUMN_ID);
try {
$url = $context->backendUrl->withPath('/:/progress/')->withQuery(
http_build_query([
'key' => $logContext['remote']['id'],
'identifier' => 'com.plexapp.plugins.library',
'state' => 'stopped',
'time' => $entity->getPlayProgress(),
])
);
$logContext['remote']['url'] = (string)$url;
$this->logger->debug('Updating [{backend}] {item.type} [{item.title}] watch progress.', [
'backend' => $context->backendName,
...$logContext,
]);
if (false === (bool)ag($context->options, Options::DRY_RUN, false)) {
$queue->add(
$this->http->request(
'POST',
(string)$url,
array_replace_recursive($context->backendHeaders, [
'user_data' => [
'id' => $key,
'context' => $logContext + [
'backend' => $context->backendName,
],
],
])
)
);
}
} catch (Throwable $e) {
$this->logger->error(
'Unhandled exception was thrown during request to change [{backend}] {item.type} [{item.title}] watch progress.',
[
'backend' => $context->backendName,
...$logContext,
'exception' => [
'file' => $e->getFile(),
'line' => $e->getLine(),
'kind' => get_class($e),
'message' => $e->getMessage(),
'trace' => $context->trace ? $e->getTrace() : [],
],
]
);
}
}
return new Response(status: true, response: $queue);
}
}

View File

@@ -19,6 +19,7 @@ use App\Backends\Plex\Action\GetUserToken;
use App\Backends\Plex\Action\Import;
use App\Backends\Plex\Action\InspectRequest;
use App\Backends\Plex\Action\ParseWebhook;
use App\Backends\Plex\Action\Progress;
use App\Backends\Plex\Action\Push;
use App\Backends\Plex\Action\SearchId;
use App\Backends\Plex\Action\SearchQuery;
@@ -259,6 +260,26 @@ class PlexClient implements iClient
return [];
}
public function progress(array $entities, QueueRequests $queue, iDate|null $after = null): array
{
$response = Container::get(Progress::class)(
context: $this->context,
entities: $entities,
queue: $queue,
after: $after
);
if ($response->hasError()) {
$this->logger->log($response->error->level(), $response->error->message, $response->error->context);
}
if (false === $response->isSuccessful()) {
throw new RuntimeException(ag($response->extra, 'message', fn() => $response->error->format()));
}
return [];
}
public function search(string $query, int $limit = 25, array $opts = []): array
{
$response = Container::get(SearchQuery::class)(

View File

@@ -0,0 +1,307 @@
<?php
declare(strict_types=1);
namespace App\Commands\State;
use App\Command;
use App\Libs\Config;
use App\Libs\Container;
use App\Libs\Database\DatabaseInterface as iDB;
use App\Libs\Entity\StateInterface as iState;
use App\Libs\Options;
use App\Libs\QueueRequests;
use App\Libs\Routable;
use Psr\Log\LoggerInterface as iLogger;
use Psr\SimpleCache\CacheInterface as iCache;
use Psr\SimpleCache\InvalidArgumentException;
use Symfony\Component\Console\Input\InputInterface;
use Symfony\Component\Console\Input\InputOption;
use Symfony\Component\Console\Output\OutputInterface;
#[Routable(command: self::ROUTE)]
class ProgressCommand extends Command
{
public const ROUTE = 'state:progress';
public const TASK_NAME = 'progress';
public function __construct(
private iLogger $logger,
private iCache $cache,
private iDB $db,
private QueueRequests $queue
) {
set_time_limit(0);
ini_set('memory_limit', '-1');
parent::__construct();
}
protected function configure(): void
{
$this->setName(self::ROUTE)
->setDescription('Push queued watch progress.')
->addOption('keep', 'k', InputOption::VALUE_NONE, 'Do not expunge queue after run is complete.')
->addOption('dry-run', null, InputOption::VALUE_NONE, 'Do not commit changes to backends.')
->addOption('list', 'l', InputOption::VALUE_NONE, 'List queued items.')
->addOption('ignore-date', null, InputOption::VALUE_NONE, 'Ignore date comparison.')
->setHelp(
r(
<<<HELP
<error>***WARNING THIS COMMAND IS EXPERIMENTAL AND MAY NOT WORK AS EXPECTED***</error>
<notice>THIS COMMAND ONLY WORKS CORRECTLY FOR PLEX AT THE MOMENT</notice>
This command push <notice>user</notice> watch progress to export enabled backends.
You should not run this manually and instead rely on scheduled task to run this command.
This command require the <notice>metadata</notice> to be already saved in database.
If no metadata available for a backend, then watch progress update won't be sent to that backend.
HELP,
[
'cmd' => trim(commandContext()),
'route' => self::ROUTE,
]
)
);
}
/**
* @param InputInterface $input
* @param OutputInterface $output
* @return int
* @throws InvalidArgumentException
*/
protected function runCommand(InputInterface $input, OutputInterface $output): int
{
return $this->single(fn(): int => $this->process($input, $output), $output);
}
/**
* @throws InvalidArgumentException
*/
protected function process(InputInterface $input, OutputInterface $output): int
{
if (!$this->cache->has('progress')) {
$this->logger->info('No watch progress items in the queue.');
return self::SUCCESS;
}
$entities = $items = [];
foreach ($this->cache->get('progress', []) as $item) {
/** @var iState $item */
$items[] = Container::get(iState::class)::fromArray($item->getAll());
}
if (!empty($items)) {
foreach ($items as $queueItem) {
$dbItem = $this->db->get($queueItem);
$dbItem = $dbItem->apply($queueItem);
if (!$dbItem->hasPlayProgress()) {
continue;
}
$entities[$dbItem->id] = $dbItem;
}
}
$items = null;
if (empty($entities)) {
$this->cache->delete('progress');
$this->logger->debug('No watch progress items in the queue.');
return self::SUCCESS;
}
if ($input->getOption('list')) {
return $this->listItems($input, $output, $entities);
}
$list = [];
$supported = Config::get('supported', []);
foreach ((array)Config::get('servers', []) as $backendName => $backend) {
$type = strtolower(ag($backend, 'type', 'unknown'));
if (true !== (bool)ag($backend, 'export.enabled')) {
$this->logger->info('SYSTEM: Export to [{backend}] is disabled by user.', [
'backend' => $backendName,
]);
continue;
}
if (!isset($supported[$type])) {
$this->logger->error('SYSTEM: [{backend}] Invalid type.', [
'backend' => $backendName,
'condition' => [
'expected' => implode(', ', array_keys($supported)),
'given' => $type,
],
]);
continue;
}
if (null === ($url = ag($backend, 'url')) || true !== is_string(parse_url($url, PHP_URL_HOST))) {
$this->logger->error('SYSTEM: [{backend}] Invalid url.', [
'backend' => $backendName,
'url' => $url ?? 'None',
]);
continue;
}
$backend['name'] = $backendName;
$list[$backendName] = $backend;
}
if (empty($list)) {
$this->logger->warning('SYSTEM: There are no backends with export enabled.');
return self::FAILURE;
}
foreach ($list as $name => &$backend) {
$opts = ag($backend, 'options', []);
if ($input->getOption('ignore-date')) {
$opts[Options::IGNORE_DATE] = true;
}
if ($input->getOption('dry-run')) {
$opts[Options::DRY_RUN] = true;
}
if ($input->getOption('trace')) {
$opts[Options::DEBUG_TRACE] = true;
}
$backend['options'] = $opts;
$backend['class'] = $this->getBackend(name: $name, config: $backend);
$backend['class']->progress(entities: $entities, queue: $this->queue);
}
unset($backend);
$total = count($this->queue);
if ($total >= 1) {
$start = makeDate();
$this->logger->notice('SYSTEM: Sending [{total}] progress update requests.', [
'total' => $total,
'time' => [
'start' => $start,
],
]);
foreach ($this->queue->getQueue() as $response) {
$context = ag($response->getInfo('user_data'), 'context', []);
try {
if (!in_array($response->getStatusCode(), [200, 204])) {
$this->logger->error(
'SYSTEM: Request to change [{backend}] [{item.title}] watch progress returned with unexpected [{status_code}] status code.',
[
'status_code' => $response->getStatusCode(),
...$context
]
);
continue;
}
$this->logger->notice('SYSTEM: Updated [{backend}] [{item.title}] watch progress.', $context);
} catch (\Throwable $e) {
$this->logger->error(
'SYSTEM: Unhandled exception thrown during request to change watch progress of [{backend}] {item.type} [{item.title}].',
[
...$context,
'exception' => [
'file' => $e->getFile(),
'line' => $e->getLine(),
'kind' => get_class($e),
'message' => $e->getMessage(),
'trace' => $e->getTrace(),
],
]
);
}
}
$end = makeDate();
$this->logger->notice('SYSTEM: Sent [{total}] watch progress requests.', [
'total' => $total,
'time' => [
'start' => $start,
'end' => $end,
'duration' => $end->getTimestamp() - $start->getTimestamp(),
],
]);
$this->logger->notice(sprintf('Using WatchState Version - \'%s\'.', getAppVersion()));
} else {
$this->logger->notice('SYSTEM: No play state changes detected.');
}
if (false === $input->getOption('keep') && false === $input->getOption('dry-run')) {
$this->cache->delete('progress');
}
return self::SUCCESS;
}
/**
* List Items.
*
* @param InputInterface $input
* @param OutputInterface $output
* @param array<iState> $items
* @return int
*/
private function listItems(InputInterface $input, OutputInterface $output, array $items): int
{
$list = [];
$mode = $input->getOption('output');
foreach ($items as $item) {
if ('table' === $mode) {
$builder = [
'queued' => makeDate(ag($item->getExtra($item->via), iState::COLUMN_EXTRA_DATE))->format(
'Y-m-d H:i:s T'
),
'via' => $item->via,
'title' => $item->getName(),
'played' => $item->isWatched() ? 'Yes' : 'No',
'play_time' => $this->formatPlayProgress($item->getPlayProgress()),
'tainted' => $item->isTainted() ? 'Yes' : 'No',
'event' => ag($item->getExtra($item->via), iState::COLUMN_EXTRA_EVENT, '??'),
];
} else {
$builder = [
...$item->getAll(),
'tainted' => $item->isTainted(),
];
}
$list[] = $builder;
}
$this->displayContent($list, $output, $mode);
return self::SUCCESS;
}
public function formatPlayProgress(int $milliseconds): string
{
$seconds = floor($milliseconds / 1000);
$minutes = floor($seconds / 60);
$hours = floor($minutes / 60);
$seconds = $seconds % 60;
$minutes = $minutes % 60;
$format = '%02u:%02u:%02u';
return sprintf($format, $hours, $minutes, $seconds);
}
}

View File

@@ -410,6 +410,51 @@ final class StateEntity implements iState
return false;
}
public function getPlayProgress(): int
{
if ($this->isWatched()) {
return 0;
}
$compare = [];
foreach ($this->getMetadata() as $backend => $metadata) {
if (0 !== (int)ag($metadata, iState::COLUMN_WATCHED, 0)) {
continue;
}
if ((int)ag($metadata, iState::COLUMN_META_DATA_PROGRESS, 0) > 1000) {
$compare[$backend] = [
'progress' => (int)ag($metadata, iState::COLUMN_META_DATA_PROGRESS, 0),
'datetime' => ag($this->getExtra($backend), iState::COLUMN_EXTRA_DATE, 0),
];
}
}
$lastProgress = 0;
$lastDate = makeDate($this->updated - 1);
foreach ($compare as $data) {
if (null === ($progress = ag($data, 'progress', null))) {
continue;
}
if (null === ($datetime = ag($data, 'datetime', null))) {
continue;
}
if ($progress < 1000) {
continue;
}
if (makeDate($datetime) > $lastDate) {
$lastDate = makeDate($datetime);
$lastProgress = $progress;
}
}
return $lastProgress;
}
private function isEqualValue(string $key, iState $entity): bool
{
if (iState::COLUMN_UPDATED === $key || iState::COLUMN_WATCHED === $key) {

View File

@@ -352,4 +352,10 @@ interface StateInterface extends LoggerAwareInterface
*/
public function hasPlayProgress(): bool;
/**
* Get play progress. If the item is watched and/or has no progress, then 0 will be returned. otherwise
* time in milliseconds will be returned.
* @return int
*/
public function getPlayProgress(): int;
}

View File

@@ -385,9 +385,9 @@ final class Initializer
$cache->set('requests', $items, new DateInterval('P3D'));
if (true === $entity->hasPlayProgress()) {
if (false === $metadataOnly && true === $entity->hasPlayProgress()) {
$progress = $cache->get('progress', []);
$progress [$itemId] = $entity;
$progress[$itemId] = $entity;
$cache->set('progress', $progress, new DateInterval('P1D'));
}