more changes to support multi-user sync

This commit is contained in:
ArabCoders
2025-01-21 14:15:29 +03:00
parent 3caa5dce39
commit 624ebb138f
11 changed files with 349 additions and 52 deletions

View File

@@ -6,6 +6,7 @@ return [
'{path}/db/archive',
'{path}/config',
'{path}/backup',
'{path}/users',
'{tmp_dir}/logs',
'{tmp_dir}/cache',
'{tmp_dir}/profiler',

View File

@@ -43,6 +43,23 @@ final class Cache implements Countable
{
}
/**
* Clone the object with the given logger and cache adapter.
*
* @param iLogger|null $logger The logger to use. If not provided, the current logger is used.
* @param iCache|null $adapter The cache adapter to use. If not provided, the current cache adapter is used.
*
* @return Cache return new instance of Cache class.
*/
public function with(iLogger|null $logger = null, iCache|null $adapter = null): self
{
$cloned = clone $this;
$cloned->logger = $logger ?? $this->logger;
$cloned->cache = $adapter ?? $this->cache;
return $cloned;
}
/**
* Clone the object with the data retrieved from the cache based on the key.
*

View File

@@ -4,15 +4,18 @@ declare(strict_types=1);
namespace App\Commands\State;
use App\Backends\Common\Cache as BackendCache;
use App\Backends\Common\ClientInterface as iClient;
use App\Command;
use App\Libs\Attributes\Route\Cli;
use App\Libs\Config;
use App\Libs\ConfigFile;
use App\Libs\Container;
use App\Libs\Entity\StateInterface as iState;
use App\Libs\Extends\StreamLogHandler;
use App\Libs\LogSuppressor;
use App\Libs\Mappers\Import\NullMapper;
use App\Libs\Mappers\ExtendedImportInterface as iEImport;
use App\Libs\Mappers\Import\MemoryMapper;
use App\Libs\Message;
use App\Libs\Options;
use App\Libs\QueueRequests;
@@ -45,12 +48,12 @@ class SyncCommand extends Command
/**
* Class Constructor.
*
* @param NullMapper $mapper The instance of the DirectMapper class.
* @param MemoryMapper $mapper The instance of the DirectMapper class.
* @param QueueRequests $queue The instance of the QueueRequests class.
* @param iLogger $logger The instance of the iLogger class.
*/
public function __construct(
private readonly NullMapper $mapper,
private readonly MemoryMapper $mapper,
private readonly QueueRequests $queue,
private readonly iLogger $logger,
private readonly LogSuppressor $suppressor,
@@ -323,8 +326,13 @@ class SyncCommand extends Command
]);
foreach (array_reverse($users) as $user) {
$userName = ag($user, 'name', 'Unknown');
$perUserCache = perUserCacheAdapter($userName);
$perUserMapper = perUserMapper($this->mapper, $userName)
->withCache($perUserCache)
->withLogger($this->logger)->loadData();
$this->queue->reset();
$this->mapper->reset();
$list = [];
$displayName = null;
@@ -333,7 +341,9 @@ class SyncCommand extends Command
$name = ag($backend, 'client_data.backendName');
$clientData = ag($backend, 'client_data');
$clientData['name'] = $name;
$clientData['class'] = makeBackend($clientData, $name)->setLogger($this->logger);
$clientData['class'] = makeBackend($clientData, $name, [
BackendCache::class => Container::get(BackendCache::class)->with(adapter: $perUserCache)
])->setLogger($this->logger);
$list[$name] = $clientData;
$displayName = ag($backend, 'client_data.displayName', '??');
}
@@ -345,9 +355,12 @@ class SyncCommand extends Command
'started' => $start,
]);
$this->handleImport($displayName, $list);
assert($perUserMapper instanceof iEImport);
$this->handleImport($perUserMapper, $displayName, $list);
$changes = $this->mapper->computeChanges(array_keys($list));
assert($perUserMapper instanceof MemoryMapper);
/** @var MemoryMapper $changes */
$changes = $perUserMapper->computeChanges(array_keys($list));
foreach ($changes as $b => $changed) {
$count = count($changed);
@@ -394,7 +407,7 @@ class SyncCommand extends Command
return self::SUCCESS;
}
protected function handleImport(string $name, array $backends): void
protected function handleImport(iEImport $mapper, string $name, array $backends): void
{
/** @var array<array-key,ResponseInterface> $queue */
$queue = [];
@@ -402,7 +415,7 @@ class SyncCommand extends Command
foreach ($backends as $backend) {
/** @var iClient $client */
$client = ag($backend, 'class');
array_push($queue, ...$client->pull($this->mapper));
array_push($queue, ...$client->pull($mapper));
}
$start = makeDate();

View File

@@ -24,7 +24,7 @@ final class DBLayer implements LoggerAwareInterface
private int $count = 0;
private string $driver;
private string $driver = '';
private array $last = [
'sql' => '',
@@ -66,6 +66,19 @@ final class DBLayer implements LoggerAwareInterface
$this->retry = ag($this->options, 'retry', self::LOCK_RETRY);
}
/**
* Create a new instance with the given PDO object and options.
*
* @param PDO $pdo The PDO object.
* @param array|null $options The options to be passed to the new instance, or null to use the current options.
*
* @return self The new instance.
*/
public function withPDO(PDO $pdo, array|null $options = null): self
{
return new self($pdo, $options ?? $this->options);
}
/**
* Execute a SQL statement and return the number of affected rows.
* The execution will be wrapped into {@link DBLayer::wrap()} method. to handle database locks.

View File

@@ -8,7 +8,7 @@ use App\Libs\Entity\StateInterface;
use Closure;
use DateTimeInterface;
use PDOException;
use Psr\Log\LoggerInterface;
use Psr\Log\LoggerInterface as iLogger;
interface DatabaseInterface
{
@@ -16,6 +16,25 @@ interface DatabaseInterface
public const string MIGRATE_DOWN = 'down';
/**
* Create new instance.
* @param iLogger|null $logger Logger to use, if null use default.
* @param DBLayer|null $db Database layer to use, if null use default.
* @param array|null $options PDO options.
*
* @return self Return new instance.
*/
public function with(iLogger|null $logger = null, DBLayer|null $db = null, array|null $options = null): self;
/**
* Set options
*
* @param array $options PDO options
*
* @return self return new instance with options.
*/
public function withOptions(array $options): self;
/**
* Set options
*
@@ -124,11 +143,11 @@ interface DatabaseInterface
* Migrate data from old database schema to new one.
*
* @param string $version Version to migrate to.
* @param LoggerInterface|null $logger Logger to use.
* @param iLogger|null $logger Logger to use.
*
* @return mixed Return value depends on the driver.
*/
public function migrateData(string $version, LoggerInterface|null $logger = null): mixed;
public function migrateData(string $version, iLogger|null $logger = null): mixed;
/**
* Is the database up to date with migrations?
@@ -166,11 +185,11 @@ interface DatabaseInterface
/**
* Inject Logger.
*
* @param LoggerInterface $logger
* @param iLogger $logger
*
* @return $this
*/
public function setLogger(LoggerInterface $logger): self;
public function setLogger(iLogger $logger): self;
/**
* Get DBLayer instance.

View File

@@ -30,11 +30,6 @@ final class PDOAdapter implements iDB
*/
private bool $viaTransaction = false;
/**
* @var array Adapter options.
*/
private array $options = [];
/**
* @var array<array-key, PDOStatement> Prepared statements.
*/
@@ -49,10 +44,23 @@ final class PDOAdapter implements iDB
* @param iLogger $logger The logger object used for logging.
* @param DBLayer $db The PDO object used for database connections.
*/
public function __construct(private iLogger $logger, private readonly DBLayer $db)
public function __construct(private iLogger $logger, private readonly DBLayer $db, private array $options = [])
{
}
public function with(iLogger|null $logger = null, DBLayer|null $db = null, array|null $options = null): self
{
if (null === $logger && null === $db && null === $options) {
return $this;
}
return new self($logger ?? $this->logger, $db ?? $this->db, $options ?? $this->options);
}
public function withOptions(array $options): self
{
return $this->with(options: $options);
}
/**
* @inheritdoc
*/

View File

@@ -0,0 +1,45 @@
<?php
declare(strict_types=1);
namespace App\Libs\Mappers;
use App\Libs\Database\DatabaseInterface as iDB;
use Psr\Log\LoggerInterface as iLogger;
use Psr\SimpleCache\CacheInterface as iCache;
interface ExtendedImportInterface extends ImportInterface
{
/**
* Set the database connection. and return the instance
*
* @param iDB $db Database connection
* @return self Instance of the class
*/
public function withDB(iDB $db): self;
/**
* Set the cache connection. and return the instance
*
* @param iCache $cache Cache connection
* @return self Instance of the class
*/
public function withCache(iCache $cache): self;
/**
* Set the logger connection. and return the instance
*
* @param iLogger $logger Logger connection
* @return self Instance of the class
*/
public function withLogger(iLogger $logger): self;
/**
* Compute the play state for each backend.
*
* @param array $backends List of backends to check.
*
* @return array List of changes for each backend.
*/
public function computeChanges(array $backends): array;
}

View File

@@ -8,7 +8,7 @@ use App\Libs\Config;
use App\Libs\Container;
use App\Libs\Database\DatabaseInterface as iDB;
use App\Libs\Entity\StateInterface as iState;
use App\Libs\Mappers\ImportInterface as iImport;
use App\Libs\Mappers\ExtendedImportInterface as iImport;
use App\Libs\Message;
use App\Libs\Options;
use App\Listeners\ProcessProgressEvent;
@@ -80,6 +80,36 @@ final class DirectMapper implements iImport
{
}
/**
* @inheritdoc
*/
public function withDB(iDB $db): self
{
$instance = clone $this;
$instance->db = $db;
return $instance;
}
/**
* @inheritdoc
*/
public function withCache(iCache $cache): self
{
$instance = clone $this;
$instance->cache = $cache;
return $instance;
}
/**
* @inheritdoc
*/
public function withLogger(iLogger $logger): self
{
$instance = clone $this;
$instance->logger = $logger;
return $instance;
}
/**
* @inheritdoc
*/
@@ -958,6 +988,14 @@ final class DirectMapper implements iImport
return $this->changed;
}
/**
* @inheritdoc
*/
public function computeChanges(array $backends): array
{
return [];
}
/**
* Adds pointers to the entity.
*

View File

@@ -7,7 +7,7 @@ namespace App\Libs\Mappers\Import;
use App\Libs\Config;
use App\Libs\Database\DatabaseInterface as iDB;
use App\Libs\Entity\StateInterface as iState;
use App\Libs\Mappers\ImportInterface as iImport;
use App\Libs\Mappers\ExtendedImportInterface as iImport;
use App\Libs\Message;
use App\Libs\Options;
use App\Listeners\ProcessProgressEvent;
@@ -70,6 +70,36 @@ class MemoryMapper implements iImport
{
}
/**
* @inheritdoc
*/
public function withDB(iDB $db): self
{
$instance = clone $this;
$instance->db = $db;
return $instance;
}
/**
* @inheritdoc
*/
public function withCache(iCache $cache): self
{
$instance = clone $this;
$instance->cache = $cache;
return $instance;
}
/**
* @inheritdoc
*/
public function withLogger(iLogger $logger): self
{
$instance = clone $this;
$instance->logger = $logger;
return $instance;
}
/**
* @inheritdoc
*/
@@ -772,6 +802,29 @@ class MemoryMapper implements iImport
return $this->changed;
}
/**
* @inheritdoc
*/
public function computeChanges(array $backends): array
{
$changes = [];
foreach ($backends as $backend) {
$changes[$backend] = [];
}
foreach ($this->objects as $entity) {
$state = $entity->isSynced($backends);
foreach ($state as $b => $value) {
if (false === $value) {
$changes[$b][] = $entity;
}
}
}
return $changes;
}
/**
* Add pointers to the pointer storage.
*

View File

@@ -71,33 +71,6 @@ class NullMapper extends MemoryMapper implements iImport
];
}
/**
* Compute the play state for each backend.
*
* @param array $backends List of backends to check.
*
* @return array List of changes for each backend.
*/
public function computeChanges(array $backends): array
{
$changes = [];
foreach ($backends as $backend) {
$changes[$backend] = [];
}
foreach ($this->objects as $entity) {
$state = $entity->isSynced($backends);
foreach ($state as $b => $value) {
if (false === $value) {
$changes[$b][] = $entity;
}
}
}
return $changes;
}
public function __destruct()
{
// -- disabled autocommit.

View File

@@ -14,6 +14,7 @@ use App\Libs\Attributes\Scanner\Item as ScannerItem;
use App\Libs\Config;
use App\Libs\ConfigFile;
use App\Libs\Container;
use App\Libs\Database\DatabaseInterface as iDB;
use App\Libs\Database\DBLayer;
use App\Libs\DataUtil;
use App\Libs\Entity\StateInterface as iState;
@@ -27,6 +28,7 @@ use App\Libs\Extends\Date;
use App\Libs\Extends\ReflectionContainer;
use App\Libs\Guid;
use App\Libs\Initializer;
use App\Libs\Mappers\ExtendedImportInterface as iEImport;
use App\Libs\Options;
use App\Libs\Response;
use App\Libs\Stream;
@@ -46,7 +48,13 @@ use Psr\Http\Message\ServerRequestInterface as iRequest;
use Psr\Http\Message\StreamInterface as iStream;
use Psr\Http\Message\UriInterface as iUri;
use Psr\Log\LoggerInterface as iLogger;
use Psr\SimpleCache\CacheInterface;
use Psr\SimpleCache\CacheInterface as iCache;
use Symfony\Component\Cache\Adapter\ArrayAdapter;
use Symfony\Component\Cache\Adapter\FilesystemAdapter;
use Symfony\Component\Cache\Adapter\NullAdapter;
use Symfony\Component\Cache\Adapter\RedisAdapter;
use Symfony\Component\Cache\Psr16Cache;
use Symfony\Component\EventDispatcher\EventDispatcher;
use Symfony\Component\Process\Process;
use Symfony\Contracts\HttpClient\Exception\TransportExceptionInterface;
@@ -648,11 +656,12 @@ if (!function_exists('makeBackend')) {
*
* @param array{name:string|null, type:string, url:string, token:string|int|null, user:string|int|null, options:array} $backend
* @param string|null $name server name.
* @param array $options
*
* @return iClient backend client instance.
* @throws InvalidArgumentException if configuration is wrong.
*/
function makeBackend(array $backend, string|null $name = null): iClient
function makeBackend(array $backend, string|null $name = null, array $options = []): iClient
{
if (null === ($backendType = ag($backend, 'type'))) {
throw new InvalidArgumentException('No backend type was set.');
@@ -676,7 +685,7 @@ if (!function_exists('makeBackend')) {
clientName: $backendType,
backendName: $name ?? ag($backend, 'name', '??'),
backendUrl: new Uri(ag($backend, 'url')),
cache: Container::get(BackendCache::class),
cache: $options[BackendCache::class] ?? Container::get(BackendCache::class),
backendId: ag($backend, 'uuid', null),
backendToken: ag($backend, 'token', null),
backendUser: ag($backend, 'user', null),
@@ -2171,3 +2180,111 @@ if (!function_exists('timeIt')) {
]);
}
}
if (!function_exists('perUserMapper')) {
/**
* User Import Mapper.
*
* @param iEImport $mapper The mapper instance.
* @param string $user The username.
*
* @return iEImport new mapper instance.
*/
function perUserMapper(iEImport $mapper, string $user): iEImport
{
$path = fixPath(r("{path}/users/{user}", ['path' => Config::get('path'), 'user' => $user]));
if (false === file_exists($path)) {
if (false === @mkdir($path, 0755, true) && false === is_dir($path)) {
throw new RuntimeException(r("Unable to create '{path}' directory.", ['path' => $path]));
}
}
$dbFile = fixPath(r("{path}/{user}.db", ['path' => $path, 'user' => $user]));
$inTestMode = true === (defined('IN_TEST_MODE') && true === IN_TEST_MODE);
$dsn = r('sqlite:{src}', ['src' => $inTestMode ? ':memory:' : $dbFile]);
if (false === $inTestMode) {
$changePerm = !file_exists($dbFile);
}
$pdo = new PDO(dsn: $dsn, options: Config::get('database.options', []));
if (!$inTestMode && $changePerm && inContainer() && 777 !== (int)(decoct(fileperms($dbFile) & 0777))) {
@chmod($dbFile, 0777);
}
foreach (Config::get('database.exec', []) as $cmd) {
$pdo->exec($cmd);
}
$db = Container::get(iDB::class)->with(db: Container::get(DBLayer::class)->withPDO($pdo));
if (!$db->isMigrated()) {
$db->migrations(iDB::MIGRATE_UP);
$db->ensureIndex();
$db->migrateData(Config::get('database.version'), Container::get(iLogger::class));
}
return $mapper->withDB($db);
}
}
if (!function_exists('perUserCacheAdapter')) {
function perUserCacheAdapter(string $user): CacheInterface
{
if (true === (bool)env('WS_CACHE_NULL', false)) {
return new Psr16Cache(new NullAdapter());
}
if (true === (defined('IN_TEST_MODE') && true === IN_TEST_MODE)) {
return new Psr16Cache(new ArrayAdapter());
}
$ns = getAppVersion();
if (true === isValidName($user)) {
$ns .= isValidName($user) ? '.' . $user : '.' . md5($user);
}
try {
$cacheUrl = Config::get('cache.url');
if (empty($cacheUrl)) {
throw new RuntimeException('No cache server was set.');
}
if (!extension_loaded('redis')) {
throw new RuntimeException('Redis extension is not loaded.');
}
$uri = new Uri($cacheUrl);
$params = [];
if (!empty($uri->getQuery())) {
parse_str($uri->getQuery(), $params);
}
$redis = new Redis();
$redis->connect($uri->getHost(), $uri->getPort() ?? 6379);
if (null !== ag($params, 'password')) {
$redis->auth(ag($params, 'password'));
}
if (null !== ag($params, 'db')) {
$redis->select((int)ag($params, 'db'));
}
$backend = new RedisAdapter(redis: $redis, namespace: $ns);
} catch (Throwable) {
// -- in case of error, fallback to file system cache.
$path = fixPath(r("{path}/users/{user}/cache", ['path' => Config::get('path'), 'user' => $user]));
if (false === file_exists($path)) {
if (false === @mkdir($path, 0755, true) && false === is_dir($path)) {
throw new RuntimeException(
r("Unable to create per user cache '{path}' directory.", ['path' => $path])
);
}
}
$backend = new FilesystemAdapter(namespace: $ns, directory: $path);
}
return new Psr16Cache($backend);
}
}