Merge pull request #540 from arabcoders/dev
Added fallback to queueEvent to cache in-case saving to db was failed.
This commit is contained in:
@@ -449,7 +449,6 @@ onMounted(async () => {
|
||||
|
||||
const updateHwAccel = codec => {
|
||||
const codecInfo = item.value.hardware.codecs.filter(c => c.codec === codec);
|
||||
console.log(codecInfo)
|
||||
if (codecInfo.length < 1) {
|
||||
config.value.hwaccel = false
|
||||
return;
|
||||
|
||||
@@ -94,6 +94,7 @@ readonly class Segments
|
||||
$isVAAPI = $hwaccel && 'h264_vaapi' === $vCodec;
|
||||
$isQSV = $hwaccel && 'h264_qsv' === $vCodec;
|
||||
$segmentSize = number_format((int)$params->get('segment_size', Playlist::SEGMENT_DUR), 6);
|
||||
$directPlay = null === $subtitle && null === $external && $params->has('direct_play');
|
||||
|
||||
if ($isVAAPI && false === file_exists($vaapi_device)) {
|
||||
return api_error(r("VAAPI device '{device}' not found.", ['device' => $vaapi_device]), Status::BAD_REQUEST);
|
||||
@@ -119,17 +120,19 @@ readonly class Segments
|
||||
usleep(20000);
|
||||
}
|
||||
|
||||
$cmd = [
|
||||
'ffmpeg',
|
||||
'-ss',
|
||||
(string)($segment === 0 ? 0 : ($segmentSize * $segment)),
|
||||
'-t',
|
||||
(string)(ag($request->getQueryParams(), 'sd', $segmentSize)),
|
||||
'-xerror',
|
||||
'-hide_banner',
|
||||
'-loglevel',
|
||||
'error',
|
||||
];
|
||||
$directPlay = $directPlay && str_ends_with($this->getStream(ag($json, 'streams', []), 0)['codec_name'], '264');
|
||||
|
||||
$cmd = ['ffmpeg'];
|
||||
if (false === $directPlay) {
|
||||
$cmd[] = '-ss';
|
||||
$cmd[] = (string)($segment === 0 ? 0 : ($segmentSize * $segment));
|
||||
$cmd[] = '-t';
|
||||
$cmd[] = (string)(ag($request->getQueryParams(), 'sd', $segmentSize));
|
||||
}
|
||||
$cmd[] = '-xerror';
|
||||
$cmd[] = '-hide_banner';
|
||||
$cmd[] = '-loglevel';
|
||||
$cmd[] = 'error';
|
||||
|
||||
$tmpSubFile = null;
|
||||
$tmpVidFile = r("{path}/t-{name}-vlink.{type}", [
|
||||
@@ -146,9 +149,11 @@ readonly class Segments
|
||||
symlink($path, $tmpVidFile);
|
||||
}
|
||||
|
||||
$cmd[] = '-copyts';
|
||||
if (false === $directPlay) {
|
||||
$cmd[] = '-copyts';
|
||||
}
|
||||
|
||||
if ($isQSV) {
|
||||
if ($isQSV && false === $directPlay) {
|
||||
$cmd[] = '-hwaccel';
|
||||
$cmd[] = 'qsv';
|
||||
if ($overlay) {
|
||||
@@ -157,7 +162,7 @@ readonly class Segments
|
||||
}
|
||||
}
|
||||
|
||||
if ($isVAAPI) {
|
||||
if ($isVAAPI && false === $directPlay) {
|
||||
$cmd[] = '-hwaccel';
|
||||
$cmd[] = 'vaapi';
|
||||
$cmd[] = '-vaapi_device';
|
||||
@@ -171,6 +176,13 @@ readonly class Segments
|
||||
$cmd[] = '-i';
|
||||
$cmd[] = 'file:' . $tmpVidFile;
|
||||
|
||||
if (true === $directPlay) {
|
||||
$cmd[] = '-ss';
|
||||
$cmd[] = (string)($segment === 0 ? 0 : ($segmentSize * $segment));
|
||||
$cmd[] = '-t';
|
||||
$cmd[] = (string)(ag($request->getQueryParams(), 'sd', $segmentSize));
|
||||
}
|
||||
|
||||
# remove garbage metadata.
|
||||
$cmd[] = '-map_metadata';
|
||||
$cmd[] = '-1';
|
||||
@@ -180,8 +192,13 @@ readonly class Segments
|
||||
$cmd[] = '-pix_fmt';
|
||||
$cmd[] = $params->get('pix_fmt', 'yuv420p');
|
||||
|
||||
$cmd[] = '-g';
|
||||
$cmd[] = '52';
|
||||
if (true === $directPlay) {
|
||||
$cmd[] = '-force_key_frames';
|
||||
$cmd[] = 'expr:gte(t,n_forced*' . (int)$sConfig['segment_size'] . ')';
|
||||
} else {
|
||||
$cmd[] = '-g';
|
||||
$cmd[] = '52';
|
||||
}
|
||||
|
||||
if ($overlay && empty($external) && null !== $subtitle) {
|
||||
$cmd[] = '-filter_complex';
|
||||
@@ -201,28 +218,30 @@ readonly class Segments
|
||||
|
||||
$cmd[] = '-strict';
|
||||
$cmd[] = '-2';
|
||||
if (empty($external) && $isVAAPI) {
|
||||
if (empty($external) && $isVAAPI && false === $directPlay) {
|
||||
$cmd[] = '-vf';
|
||||
$cmd[] = 'format=nv12,hwupload';
|
||||
}
|
||||
$cmd[] = '-codec:v';
|
||||
$cmd[] = $vCodec;
|
||||
$cmd[] = $directPlay ? 'copy' : $vCodec;
|
||||
|
||||
$cmd[] = '-crf';
|
||||
$cmd[] = $params->get('video_crf', '23');
|
||||
$cmd[] = '-preset:v';
|
||||
$cmd[] = $params->get('video_preset', 'fast');
|
||||
if (false === $directPlay) {
|
||||
$cmd[] = '-crf';
|
||||
$cmd[] = $params->get('video_crf', '23');
|
||||
$cmd[] = '-preset:v';
|
||||
$cmd[] = $params->get('video_preset', 'fast');
|
||||
|
||||
if (0 !== (int)$params->get('video_bitrate', 0)) {
|
||||
$cmd[] = '-b:v';
|
||||
$cmd[] = $params->get('video_bitrate', '192k');
|
||||
if (0 !== (int)$params->get('video_bitrate', 0)) {
|
||||
$cmd[] = '-b:v';
|
||||
$cmd[] = $params->get('video_bitrate', '192k');
|
||||
}
|
||||
|
||||
$cmd[] = '-level';
|
||||
$cmd[] = $params->get('video_level', '4.1');
|
||||
$cmd[] = '-profile:v';
|
||||
$cmd[] = $params->get('video_profile', 'main');
|
||||
}
|
||||
|
||||
$cmd[] = '-level';
|
||||
$cmd[] = $params->get('video_level', '4.1');
|
||||
$cmd[] = '-profile:v';
|
||||
$cmd[] = $params->get('video_profile', 'main');
|
||||
|
||||
// -- audio section.
|
||||
$cmd[] = '-map';
|
||||
$cmd[] = null === $audio ? '0:a:0' : "0:{$audio}";
|
||||
@@ -270,12 +289,16 @@ readonly class Segments
|
||||
$cmd[] = '-sn';
|
||||
}
|
||||
|
||||
if (true === $directPlay) {
|
||||
$cmd[] = '-output_ts_offset';
|
||||
$cmd[] = (string)($segment * $segmentSize);
|
||||
}
|
||||
|
||||
$cmd[] = '-muxdelay';
|
||||
$cmd[] = '0';
|
||||
$cmd[] = '-f';
|
||||
$cmd[] = 'mpegts';
|
||||
$cmd[] = 'pipe:1';
|
||||
|
||||
$debug = (bool)ag($sConfig, 'debug', false);
|
||||
|
||||
try {
|
||||
@@ -298,7 +321,7 @@ readonly class Segments
|
||||
'stderr' => $process->getErrorOutput(),
|
||||
'Ffmpeg' => $process->getCommandLine(),
|
||||
'config' => $sConfig,
|
||||
'command' => implode(' ', $cmd),
|
||||
'command' => $this->cmdLog($cmd),
|
||||
]
|
||||
);
|
||||
|
||||
@@ -314,7 +337,7 @@ readonly class Segments
|
||||
|
||||
if (true === $debug) {
|
||||
$response = $response
|
||||
->withHeader('X-Ffmpeg', $process->getCommandLine())
|
||||
->withHeader('X-Ffmpeg', $this->cmdLog($cmd))
|
||||
->withHeader(
|
||||
'X-Transcode-Config',
|
||||
json_encode($sConfig, JSON_UNESCAPED_SLASHES | JSON_UNESCAPED_UNICODE)
|
||||
@@ -325,23 +348,24 @@ readonly class Segments
|
||||
}
|
||||
|
||||
$response = api_response(Status::OK, body: Stream::create($process->getOutput()), headers: [
|
||||
// 'Access-Control-Allow-Origin' => '*',
|
||||
'Content-Type' => 'video/mpegts',
|
||||
'X-Transcode-Time' => round($end - $start, 6),
|
||||
'X-Emitter-Flush' => 1,
|
||||
'Pragma' => 'public',
|
||||
'Access-Control-Allow-Origin' => '*',
|
||||
'Cache-Control' => sprintf('public, max-age=%s', time() + 31536000),
|
||||
'Last-Modified' => sprintf('%s GMT', gmdate('D, d M Y H:i:s', time())),
|
||||
'Expires' => sprintf('%s GMT', gmdate('D, d M Y H:i:s', time() + 31536000)),
|
||||
]);
|
||||
|
||||
if (true === $debug) {
|
||||
$response = $response
|
||||
->withHeader('X-Ffmpeg', $process->getCommandLine())
|
||||
->withHeader('X-Ffmpeg', $this->cmdLog($cmd))
|
||||
->withHeader(
|
||||
'X-Transcode-Config',
|
||||
json_encode($sConfig, JSON_UNESCAPED_SLASHES | JSON_UNESCAPED_UNICODE)
|
||||
);
|
||||
} else {
|
||||
$response = $response->withHeader('Pragma', 'public')
|
||||
->withHeader('Cache-Control', sprintf('public, max-age=%s', time() + 31536000))
|
||||
->withHeader('Last-Modified', sprintf('%s GMT', gmdate('D, d M Y H:i:s', time())))
|
||||
->withHeader('Expires', sprintf('%s GMT', gmdate('D, d M Y H:i:s', time() + 31536000)));
|
||||
}
|
||||
|
||||
return $response;
|
||||
@@ -349,7 +373,7 @@ readonly class Segments
|
||||
$this->logger->error("Failed to generate segment. '{error}' at {file}:{line}", [
|
||||
'stdout' => isset($process) ? $process->getOutput() : null,
|
||||
'stderr' => isset($process) ? $process->getErrorOutput() : null,
|
||||
'Ffmpeg' => isset($process) ? $process->getCommandLine() : null,
|
||||
'Ffmpeg' => $this->cmdLog($cmd),
|
||||
'config' => $sConfig,
|
||||
'command' => implode(' ', $cmd),
|
||||
'error' => $e->getMessage(),
|
||||
@@ -360,14 +384,12 @@ readonly class Segments
|
||||
|
||||
$response = api_error('Failed to generate segment. check logs.', Status::INTERNAL_SERVER_ERROR);
|
||||
if (true === $debug) {
|
||||
if (isset($process)) {
|
||||
$response = $response->withHeader('X-Ffmpeg', $process->getCommandLine());
|
||||
}
|
||||
|
||||
$response = $response->withHeader(
|
||||
'X-Transcode-Config',
|
||||
json_encode($sConfig, JSON_UNESCAPED_SLASHES | JSON_UNESCAPED_UNICODE)
|
||||
);
|
||||
$response = $response
|
||||
->withHeader('X-Ffmpeg', $this->cmdLog($cmd))
|
||||
->withHeader(
|
||||
'X-Transcode-Config',
|
||||
json_encode($sConfig, JSON_UNESCAPED_SLASHES | JSON_UNESCAPED_UNICODE)
|
||||
);
|
||||
}
|
||||
return $response;
|
||||
} finally {
|
||||
@@ -423,8 +445,7 @@ readonly class Segments
|
||||
$this->logger->error('Failed to extract subtitle.', [
|
||||
'stdout' => $process->getOutput(),
|
||||
'stderr' => $process->getErrorOutput(),
|
||||
'Ffmpeg' => $process->getCommandLine(),
|
||||
'command' => implode(' ', $cmd),
|
||||
'Ffmpeg' => $this->cmdLog($cmd),
|
||||
]);
|
||||
return "{$path}:stream_index={$stream}";
|
||||
}
|
||||
@@ -440,8 +461,7 @@ readonly class Segments
|
||||
$this->logger->error("Failed to extract subtitles. '{error}' at {file}:{line}", [
|
||||
'stdout' => isset($process) ? $process->getOutput() : null,
|
||||
'stderr' => isset($process) ? $process->getErrorOutput() : null,
|
||||
'Ffmpeg' => isset($process) ? $process->getCommandLine() : null,
|
||||
'command' => implode(' ', $cmd),
|
||||
'Ffmpeg' => $this->cmdLog($cmd),
|
||||
'error' => $e->getMessage(),
|
||||
'line' => $e->getLine(),
|
||||
'file' => $e->getFile(),
|
||||
@@ -460,4 +480,9 @@ readonly class Segments
|
||||
}
|
||||
return [];
|
||||
}
|
||||
|
||||
private function cmdLog(array $cmd): string
|
||||
{
|
||||
return implode(' ', array_map(fn($v) => str_contains($v, ' ') ? escapeshellarg($v) : $v, $cmd));
|
||||
}
|
||||
}
|
||||
|
||||
@@ -36,9 +36,9 @@ trait CommonTrait
|
||||
return new Response(
|
||||
status: false,
|
||||
error: new Error(
|
||||
message: "Exception '{error.kind}' was thrown unhandled in '{client}: {backend}' {action}. '{error.message}' at '{error.file}:{error.line}'.",
|
||||
message: "{client}: '{backend}' {action} thrown unhandled exception '{error.kind}'. '{error.message}' at '{error.file}:{error.line}'.",
|
||||
context: [
|
||||
'action' => $action ?? 'not_set',
|
||||
'action' => $action ?? '',
|
||||
'backend' => $context->backendName,
|
||||
'client' => $context->clientName,
|
||||
'message' => $e->getMessage(),
|
||||
@@ -81,7 +81,40 @@ trait CommonTrait
|
||||
DateInterval $ttl,
|
||||
iLogger|null $logger = null
|
||||
): mixed {
|
||||
return tryCache($context->cache->getInterface(), $context->backendName . '_' . $key, $fn, $ttl, $logger);
|
||||
try {
|
||||
$cache = $context->cache->getInterface();
|
||||
$cacheKey = $context->backendName . '_' . $key;
|
||||
|
||||
if (true === $cache->has($cacheKey)) {
|
||||
$logger?->debug("{client} Cache hit for key '{backend}: {key}'.", [
|
||||
'key' => $key,
|
||||
'client' => $context->clientName,
|
||||
'backend' => $context->backendName,
|
||||
]);
|
||||
return $cache->get($cacheKey);
|
||||
}
|
||||
} catch (\Psr\SimpleCache\InvalidArgumentException) {
|
||||
/** @noinspection PhpConditionAlreadyCheckedInspection */
|
||||
$logger?->error("{client} Failed to retrieve cached data for '{backend}: {key}'.", [
|
||||
'client' => $context->clientName,
|
||||
'backend' => $context->backendName,
|
||||
'key' => $key,
|
||||
]);
|
||||
}
|
||||
|
||||
$data = $fn();
|
||||
|
||||
try {
|
||||
$cache->set($cacheKey, $data, $ttl);
|
||||
} catch (\Psr\SimpleCache\InvalidArgumentException) {
|
||||
$logger?->error("{client} Failed to cache data for key '{backend}: {key}'.", [
|
||||
'client' => $context->clientName,
|
||||
'backend' => $context->backendName,
|
||||
'key' => $key,
|
||||
]);
|
||||
}
|
||||
|
||||
return $data;
|
||||
}
|
||||
|
||||
/**
|
||||
|
||||
@@ -4,15 +4,12 @@ declare(strict_types=1);
|
||||
|
||||
namespace App\Backends\Common;
|
||||
|
||||
use App\Libs\Container;
|
||||
use App\Libs\Options;
|
||||
use Psr\Http\Message\UriInterface;
|
||||
use Psr\Log\LoggerInterface as iLogger;
|
||||
|
||||
final class Context
|
||||
final readonly class Context
|
||||
{
|
||||
protected iLogger|null $logger = null;
|
||||
|
||||
/**
|
||||
* Make backend context for classes to work with.
|
||||
*
|
||||
@@ -28,16 +25,17 @@ final class Context
|
||||
* @param array $options optional options.
|
||||
*/
|
||||
public function __construct(
|
||||
public readonly string $clientName,
|
||||
public readonly string $backendName,
|
||||
public readonly UriInterface $backendUrl,
|
||||
public readonly Cache $cache,
|
||||
public readonly string|int|null $backendId = null,
|
||||
public readonly string|int|null $backendToken = null,
|
||||
public readonly string|int|null $backendUser = null,
|
||||
public readonly array $backendHeaders = [],
|
||||
public readonly bool $trace = false,
|
||||
public readonly array $options = []
|
||||
public string $clientName,
|
||||
public string $backendName,
|
||||
public UriInterface $backendUrl,
|
||||
public Cache $cache,
|
||||
public iLogger|null $logger = null,
|
||||
public string|int|null $backendId = null,
|
||||
public string|int|null $backendToken = null,
|
||||
public string|int|null $backendUser = null,
|
||||
public array $backendHeaders = [],
|
||||
public bool $trace = false,
|
||||
public array $options = []
|
||||
) {
|
||||
}
|
||||
|
||||
@@ -54,21 +52,27 @@ final class Context
|
||||
return true === $withUser ? $status && null !== $this->backendUser : $status;
|
||||
}
|
||||
|
||||
public function hasLogger(): bool
|
||||
{
|
||||
return null !== $this->logger;
|
||||
}
|
||||
|
||||
/**
|
||||
* Add a logger to the context, and return a new instance.
|
||||
*
|
||||
* @param iLogger $logger
|
||||
*
|
||||
* @return static A new instance with the logger.
|
||||
*/
|
||||
public function withLogger(iLogger $logger): self
|
||||
{
|
||||
$clone = clone $this;
|
||||
$clone->logger = $logger;
|
||||
|
||||
return $clone;
|
||||
}
|
||||
|
||||
public function getLogger(): iLogger
|
||||
{
|
||||
return $this->logger ?? Container::get(iLogger::class);
|
||||
return new Context(
|
||||
clientName: $this->clientName,
|
||||
backendName: $this->backendName,
|
||||
backendUrl: $this->backendUrl,
|
||||
cache: $this->cache,
|
||||
logger: $logger,
|
||||
backendId: $this->backendId,
|
||||
backendToken: $this->backendToken,
|
||||
backendUser: $this->backendUser,
|
||||
backendHeaders: $this->backendHeaders,
|
||||
trace: $this->trace,
|
||||
options: $this->options
|
||||
);
|
||||
}
|
||||
}
|
||||
|
||||
@@ -117,6 +117,7 @@ class EmbyClient implements iClient
|
||||
backendName: $context->backendName,
|
||||
backendUrl: $context->backendUrl,
|
||||
cache: $this->cache->withData(static::CLIENT_NAME . '_' . $context->backendName, $context->options),
|
||||
logger: $context->logger,
|
||||
backendId: $context->backendId,
|
||||
backendToken: $context->backendToken,
|
||||
backendUser: $context->backendUser,
|
||||
@@ -145,10 +146,6 @@ class EmbyClient implements iClient
|
||||
),
|
||||
])
|
||||
);
|
||||
|
||||
if ($context->hasLogger()) {
|
||||
$cloned->context = $cloned->context->withLogger($context->getLogger());
|
||||
}
|
||||
|
||||
$cloned->guid = $cloned->guid->withContext($cloned->context);
|
||||
|
||||
|
||||
@@ -9,6 +9,7 @@ use App\Backends\Common\Context;
|
||||
use App\Backends\Common\Error;
|
||||
use App\Backends\Common\Response;
|
||||
use App\Backends\Jellyfin\JellyfinClient;
|
||||
use App\Libs\Enums\Http\Status;
|
||||
use App\Libs\Options;
|
||||
use DateInterval;
|
||||
use Psr\Log\LoggerInterface;
|
||||
@@ -82,7 +83,7 @@ class GetMetaData
|
||||
)
|
||||
);
|
||||
|
||||
$this->logger->debug('Requesting [{client}: {backend}] item [{id}] metadata.', [
|
||||
$this->logger->debug("{client}: Requesting '{backend}: {id}' item metadata.", [
|
||||
'id' => $id,
|
||||
'url' => $url,
|
||||
'client' => $context->clientName,
|
||||
@@ -99,11 +100,11 @@ class GetMetaData
|
||||
array_replace_recursive($context->backendHeaders, $opts['headers'] ?? [])
|
||||
);
|
||||
|
||||
if (200 !== $response->getStatusCode()) {
|
||||
return new Response(
|
||||
if (Status::OK !== Status::from($response->getStatusCode())) {
|
||||
$response = new Response(
|
||||
status: false,
|
||||
error: new Error(
|
||||
message: 'Request for [{backend}] item [{id}] returned with unexpected [{status_code}] status code.',
|
||||
message: "{client} Request for '{backend}: {id}' item returned with unexpected '{status_code}' status code.",
|
||||
context: [
|
||||
'id' => $id,
|
||||
'client' => $context->clientName,
|
||||
@@ -112,6 +113,8 @@ class GetMetaData
|
||||
]
|
||||
)
|
||||
);
|
||||
$context->logger?->error($response->getError()->message, $response->getError()->context);
|
||||
return $response;
|
||||
}
|
||||
|
||||
$item = json_decode(
|
||||
@@ -132,7 +135,7 @@ class GetMetaData
|
||||
}
|
||||
|
||||
if (true === $context->trace) {
|
||||
$this->logger->debug('Processing [{client}: {backend}] item [{id}] payload.', [
|
||||
$this->logger->debug("{client} Processing '{backend}: {id}' item payload.", [
|
||||
'id' => $id,
|
||||
'client' => $context->clientName,
|
||||
'backend' => $context->backendName,
|
||||
|
||||
@@ -134,6 +134,7 @@ class JellyfinClient implements iClient
|
||||
backendName: $context->backendName,
|
||||
backendUrl: $context->backendUrl,
|
||||
cache: $this->cache->withData(static::CLIENT_NAME . '_' . $context->backendName, $context->options),
|
||||
logger: $context->logger,
|
||||
backendId: $context->backendId,
|
||||
backendToken: $context->backendToken,
|
||||
backendUser: $context->backendUser,
|
||||
@@ -163,10 +164,6 @@ class JellyfinClient implements iClient
|
||||
])
|
||||
);
|
||||
|
||||
if ($context->hasLogger()) {
|
||||
$cloned->context = $cloned->context->withLogger($context->getLogger());
|
||||
}
|
||||
|
||||
$cloned->guid = $cloned->guid->withContext($cloned->context);
|
||||
|
||||
return $cloned;
|
||||
|
||||
@@ -8,6 +8,7 @@ use App\Backends\Common\CommonTrait;
|
||||
use App\Backends\Common\Context;
|
||||
use App\Backends\Common\Error;
|
||||
use App\Backends\Common\Response;
|
||||
use App\Libs\Enums\Http\Status;
|
||||
use App\Libs\Options;
|
||||
use DateInterval;
|
||||
use Psr\Log\LoggerInterface;
|
||||
@@ -50,7 +51,7 @@ final class GetMetaData
|
||||
$url = $context->backendUrl->withPath('/library/metadata/' . $id)
|
||||
->withQuery(http_build_query(array_merge_recursive(['includeGuids' => 1], $opts['query'] ?? [])));
|
||||
|
||||
$this->logger->debug('Requesting [{client}: {backend}] item [{id}] metadata.', [
|
||||
$this->logger->debug("{client}: Requesting '{backend}: {id}' item metadata.", [
|
||||
'client' => $context->clientName,
|
||||
'backend' => $context->backendName,
|
||||
'id' => $id,
|
||||
@@ -67,11 +68,11 @@ final class GetMetaData
|
||||
array_replace_recursive($context->backendHeaders, $opts['headers'] ?? [])
|
||||
);
|
||||
|
||||
if (200 !== $response->getStatusCode()) {
|
||||
return new Response(
|
||||
if (Status::OK !== Status::from($response->getStatusCode())) {
|
||||
$response = new Response(
|
||||
status: false,
|
||||
error: new Error(
|
||||
message: 'Request for [{backend}] item [{id}] returned with unexpected [{status_code}] status code.',
|
||||
message: "{client} Request for '{backend}: {id}' item returned with unexpected '{status_code}' status code.",
|
||||
context: [
|
||||
'id' => $id,
|
||||
'client' => $context->clientName,
|
||||
@@ -80,6 +81,8 @@ final class GetMetaData
|
||||
]
|
||||
)
|
||||
);
|
||||
$context->logger?->error($response->getError()->message, $response->getError()->context);
|
||||
return $response;
|
||||
}
|
||||
|
||||
$content = $response->getContent();
|
||||
@@ -102,7 +105,7 @@ final class GetMetaData
|
||||
}
|
||||
|
||||
if (true === $context->trace) {
|
||||
$this->logger->debug('Processing [{client}: {backend}] item [{id}] payload.', [
|
||||
$this->logger->debug("{client}: Processing '{backend}: {id}' item payload.", [
|
||||
'id' => $id,
|
||||
'client' => $context->clientName,
|
||||
'backend' => $context->backendName,
|
||||
|
||||
@@ -142,6 +142,7 @@ class PlexClient implements iClient
|
||||
backendName: $context->backendName,
|
||||
backendUrl: $context->backendUrl,
|
||||
cache: $this->cache->withData(static::CLIENT_NAME . '_' . $context->backendName, $context->options),
|
||||
logger: $context->logger,
|
||||
backendId: $context->backendId,
|
||||
backendToken: $context->backendToken,
|
||||
backendUser: $context->backendUser,
|
||||
@@ -162,10 +163,6 @@ class PlexClient implements iClient
|
||||
])
|
||||
);
|
||||
|
||||
if ($context->hasLogger()) {
|
||||
$cloned->context = $cloned->context->withLogger($context->getLogger());
|
||||
}
|
||||
|
||||
$cloned->guid = $cloned->guid->withContext($cloned->context);
|
||||
|
||||
return $cloned;
|
||||
|
||||
@@ -13,6 +13,7 @@ use App\Model\Events\EventsTable;
|
||||
use App\Model\Events\EventStatus as Status;
|
||||
use Psr\EventDispatcher\EventDispatcherInterface as iDispatcher;
|
||||
use Psr\Log\LoggerInterface as iLogger;
|
||||
use Psr\SimpleCache\CacheInterface as iCache;
|
||||
use Symfony\Component\Console\Input\InputInterface;
|
||||
use Symfony\Component\Console\Input\InputOption;
|
||||
use Symfony\Component\Console\Output\OutputInterface;
|
||||
@@ -29,6 +30,7 @@ final class DispatchCommand extends Command
|
||||
public function __construct(
|
||||
private readonly iDispatcher $dispatcher,
|
||||
private readonly EventsRepository $repo,
|
||||
private readonly iCache $cache,
|
||||
private iLogger $logger,
|
||||
) {
|
||||
parent::__construct(null);
|
||||
@@ -44,6 +46,8 @@ final class DispatchCommand extends Command
|
||||
|
||||
protected function execute(InputInterface $input, OutputInterface $output): int
|
||||
{
|
||||
$this->unloadEvents();
|
||||
|
||||
registerEvents();
|
||||
|
||||
$id = $input->getOption('id');
|
||||
@@ -145,4 +149,35 @@ final class DispatchCommand extends Command
|
||||
$this->logger->error($errorLog, ['trace' => $e->getTrace()]);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* This method will re-queue events that we failed to save, due to database lock issues
|
||||
* which is quite common issue as we mainly use sqlite as our database.
|
||||
*
|
||||
* This method will only attempt to re-queue once, if that fails, it will be lost. and we will log it.
|
||||
* @return void
|
||||
*/
|
||||
private function unloadEvents(): void
|
||||
{
|
||||
try {
|
||||
$events = $this->cache->get('events', []);
|
||||
if (count($events) < 1) {
|
||||
return;
|
||||
}
|
||||
foreach ($events as $eventData) {
|
||||
try {
|
||||
queueEvent(...$eventData);
|
||||
$this->logger->info(
|
||||
"Queued '{event}' event. it was saved to cache due to failure to persist it.",
|
||||
$eventData
|
||||
);
|
||||
} catch (Throwable) {
|
||||
$this->logger->error("Failed to re-queue '{event}' event.", $eventData);
|
||||
}
|
||||
}
|
||||
|
||||
$this->cache->delete('events');
|
||||
} catch (Throwable) {
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -679,6 +679,7 @@ final class DirectMapper implements iImport
|
||||
'database' => $cloned->getAll(),
|
||||
'backend' => $entity->getAll()
|
||||
],
|
||||
'trace' => $e->getTrace(),
|
||||
]
|
||||
);
|
||||
}
|
||||
|
||||
@@ -42,7 +42,6 @@ use Psr\Http\Message\ServerRequestInterface as iRequest;
|
||||
use Psr\Http\Message\StreamInterface as iStream;
|
||||
use Psr\Http\Message\UriInterface as iUri;
|
||||
use Psr\Log\LoggerInterface as iLogger;
|
||||
use Psr\SimpleCache\CacheInterface;
|
||||
use Psr\SimpleCache\CacheInterface as iCache;
|
||||
use Symfony\Component\EventDispatcher\EventDispatcher;
|
||||
use Symfony\Component\Process\Process;
|
||||
@@ -1200,44 +1199,6 @@ if (!function_exists('parseConfigValue')) {
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
if (!function_exists('tryCache')) {
|
||||
/**
|
||||
* Try to get a value from the cache, if it does not exist, call the callback and cache the result.
|
||||
*
|
||||
* @param iCache $cache The cache instance.
|
||||
* @param string $key The cache key.
|
||||
* @param Closure $callback The callback to call if the key does not exist.
|
||||
* @param DateInterval $ttl The time to live for the cache.
|
||||
* @param iLogger|null $logger The logger instance (optional).
|
||||
*
|
||||
* @return mixed The value from the cache or the callback.
|
||||
*/
|
||||
function tryCache(
|
||||
iCache $cache,
|
||||
string $key,
|
||||
Closure $callback,
|
||||
DateInterval $ttl,
|
||||
iLogger|null $logger = null
|
||||
): mixed {
|
||||
if (true === $cache->has($key)) {
|
||||
$logger?->debug("Cache hit for key '{key}'.", ['key' => $key]);
|
||||
return $cache->get($key);
|
||||
}
|
||||
|
||||
$data = $callback();
|
||||
|
||||
try {
|
||||
$cache->set($key, $data, $ttl);
|
||||
} catch (\Psr\SimpleCache\InvalidArgumentException) {
|
||||
$logger?->error("Failed to cache data for key '{key}'.", ['key' => $key]);
|
||||
}
|
||||
|
||||
return $data;
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
if (!function_exists('checkIgnoreRule')) {
|
||||
/**
|
||||
* Check if the given ignore rule is valid.
|
||||
@@ -1913,7 +1874,7 @@ if (!function_exists('cacheableItem')) {
|
||||
DateInterval|int|null $ttl = null,
|
||||
bool $ignoreCache = false
|
||||
): mixed {
|
||||
$cache = Container::get(CacheInterface::class);
|
||||
$cache = Container::get(iCache::class);
|
||||
|
||||
if (!$ignoreCache && $cache->has($key)) {
|
||||
return $cache->get($key);
|
||||
@@ -2010,7 +1971,11 @@ if (!function_exists('queueEvent')) {
|
||||
$item->event = $event;
|
||||
$item->status = EventStatus::PENDING;
|
||||
$item->event_data = $data;
|
||||
$item->created_at = makeDate();
|
||||
if (ag_exists($opts, EventsTable::COLUMN_CREATED_AT)) {
|
||||
$item->created_at = $opts[EventsTable::COLUMN_CREATED_AT];
|
||||
} else {
|
||||
$item->created_at = makeDate();
|
||||
}
|
||||
$item->options = [
|
||||
'class' => ag($opts, 'class', DataEvent::class),
|
||||
];
|
||||
@@ -2023,8 +1988,23 @@ if (!function_exists('queueEvent')) {
|
||||
$item->reference = $reference;
|
||||
}
|
||||
|
||||
$id = $repo->save($item);
|
||||
$item->id = $id;
|
||||
try {
|
||||
$id = $repo->save($item);
|
||||
$item->id = $id;
|
||||
} catch (PDOException $e) {
|
||||
// sometimes our sqlite db get locked due to multiple writes.
|
||||
// and the db retry logic will time out, to save the event we fall back to cache store.
|
||||
if (false === ag_exists($opts, 'cached') && false !== stripos($e->getMessage(), 'database is locked')) {
|
||||
$cache = Container::get(iCache::class);
|
||||
$events = $cache->get('events', []);
|
||||
$opts[EventsTable::COLUMN_CREATED_AT] = makeDate();
|
||||
$opts['cached'] = true;
|
||||
$events[] = ['event' => $event, 'data' => $data, 'opts' => $opts];
|
||||
$cache->set('events', $events, new DateInterval('PT1H'));
|
||||
} else {
|
||||
throw $e;
|
||||
}
|
||||
}
|
||||
|
||||
return $item;
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user