Optimisation de la mise à jour des historiques
éléments à implémenter
-
Mise en place de la file d'attente et du poll de worker -
Mise en place du système de locking global -
Migration des processus de raffraichissement des historiques actuels vers le nouveau système
Contexte
Le problème
Le traitement du rafraîchissement des historiques de transfert représente un goulot d'étranglement :
- Les historique de transferts de tous les serveurs sont rafraîchis dans la même transaction
- Le rafraîchissement de l'historique des flux ne peut se faire qu'une fois la transaction de rafraîchissement des transferts n'ait été fermée
- la mise à jour des historiques de transferts se fait dans une seule procédure (goroutine/thread/processeur) pour éviter les race conditions. Le process actuel n'est pas parallélisable.
Sur des volumes moyens (quelques transferts par minute), cela peut représenter des retards de traitements de plusieurs jours.
L'architecture laisse aussi à désirer :
- le système de rafraîchissement n'est pas déportable sur une autre machine
- la montée en charge (scalabilité est limités)
- l'évolutivité est trop limitée à cause de traitements différents qui sont trop liés (récupération des historiques des serveurs et traitement des transferts)
Tout cela a aussi des effets de bord qui ont un impact important :
- La granularité du rafraîchissement des transferts n'est pas assez fine : il suffit qu'un transfert ne puisse pas être mis à jour pour que l'historique de l'ensemble des serveurs soit écarté (même transaction)
- la liste des serveurs à mettre à jour est générée au démarrage de Waarp Manager. Si un serveur est ajouté, un redémarrage est nécessaire pour le prendre en compte
Le besoin
- Avoir un système plus performant, avec la granularité d'un transfert.
- Le traitement de chaque transfert doit être isolé dans sa propre transaction.
- Le système doit pouvoir être hautement parallélisable
- La solution doit être suffisamment générique pour être évolutive
Points d'attention :
- Race condition pour la mise-à-jour/création d'un transfert dont les informations proviennent de deux serveurs en même temps
- les performances doivent être au moins équivalentes aux performances actuelles. Idéalement, elles sont meilleures.
Solution
Mise en place d'un pool de workers pour pouvoir paralléliser les traitements (jobs).
Les workers sont des goroutines qui exécutent les traitements de manière abstraites. Ils prennent un traitement dans une queue et l'exécutent.
La queue doit être suffisamment grande pour absorber l'activité sans bloquer dans l'attente d'une place disponible. La queue n'est pas priorisée.
Le pool de worker doit avoir une taille dynamique pour suivre la charge sans toutefois dépasser un plafond au-delà duquel le gain de performance n'est plus suffisant.
Sur ce principe, le traitement d'un transfert reçu, et la mise à jour de l'historique des flux suite à sa mise à jour représentent des traitements mis en file. Le processus est le suivant :
- Un premier job récupère la liste des serveurs à rafraîchir et met en file un job de rafraîchissement par serveur à traiter ;
- Les jobs de rafraîchissements récupèrent par API REST la liste des transferts à créer/mettre à jour dans l'historique et créent un job par transfert à traiter ;
- Le job de traitement de transfert crée ou met à jour l'enregistrement dans la liste des transferts, puis crée un job de rafraîchissement de l'historique des flux ;
- les jobs de rafraîchissement de l'historique des flux crée ou met à jour le transfert concerné par le transfert initial.
Détails d'implémentation
Traitements asynchrones
Les traitements doivent être abstraits. un job est définit par l'interface :
type JobRunner interface {
RunJob(context.Context)
}
A minima, il s'agit d'une fonction à exécuter
type JobFunc func()
Un job par défaut est définit, notamment pour encapsuler JobFunc
.
La queue
La queue est peut être modélisée par un channel avec tampon :
var queue = make(chan Job, MAX_QUEUE)
Note : Une implémentation alternative peut être d'utiliser une table de la BDD pour enregistrer les jobs à traiter, et d'utiliser un curseur de streaming pour récupérer les jobs à traiter (ça ajoute une couche de persistance, mais est-ce utile ?)
Les workers
Goroutines quiprennent les Jobs dans la queue et doivent pouvoir être interrompus (arrêt du serveur :
type Worker struct {
queue chan Job
ctx context.Context // get an interruption signal
}
le service
il peut être démarré/ arrêté comme tous les autres services de Waarp Manager (serveur HTTP, le pool de BDD etc.). Au démarrage, il crée et démarre les workers et initialise la queue.
Lors de l'arrêt, un mécanisme peut être prévu pour sauvegarder la file d'attente et la recharger au démarrage suivant de Waarp Manager.
Autres considérations
Le rafraîchissement de l'historique des serveurs est actuellement programmé périodiquement. Un mécanisme doit être trouvé pour qu'un nouveau rafraîchissement ne soit pas lancé tant que le précédent n'est pas terminé.
Configuration
Doivent être ajoutés en options de configuration :
- le nombre maximal de workers
- la taille du tampon
Points encore à résoudre
Gestion des transactions
Chaque Job doit être traité dans sa propre transaction.
Qui a la responsabilité de la création de la transaction ? les Workers ? Les Jobs ?
Gestion des doublons
Avec la parallélisation apparaît le risque de créer des transferts ou des flux en doubles. Quand un transfert se produit, les informations sur le transfert parviennent en double à Waarp Manager (émetteur et récepteur). Si les deux transactions de création sont concurrentes, le transfert sera créé deux fois.
Comment et sur quels critères s'assurer que cela ne se produise pas ?
La même question se pose pour la création de nouveaux flux...
Pistes :
- gestion de mutexes en masse
-
upserts(impossible : ne règle pas le problème transactions concurrentes) - jouer sur la visibilité des transaction et le lock des lignes en bdd