Commit 11cfa3d0 authored by Jens Langhammer's avatar Jens Langhammer

p2: Rewrite replication to run async, run full sync on first configuration

parent ad4eef16
Pipeline #3865 failed with stage
in 1 minute and 34 seconds
......@@ -2,6 +2,7 @@
import copy
from logging import getLogger
from shutil import copyfileobj
from time import time
from p2.components.replication.constants import (TAG_BLOB_SOURCE_UUID,
TAG_REPLICATION_TARGET)
......@@ -14,13 +15,16 @@ LOGGER = getLogger(__name__)
class ReplicationController(ComponentController):
"""Replicate Blobs 1:1 between volumes"""
# TODO: Initial replication on config
template_name = 'components/replication/card.html'
form_class = 'p2.components.replication.forms.ReplicationForm'
@property
def target_volume(self):
"""Get Target volume"""
return Volume.objects.get(pk=self.instance.tags.get(TAG_REPLICATION_TARGET))
def _get_target_blob(self, source_blob):
target_volume = Volume.objects.get(pk=self.instance.tags.get(TAG_REPLICATION_TARGET))
target_volume = self.target_volume
# Check if there's a blob thats our source UUID as attribute
possible_targets = Blob.objects.filter(**{
'volume': target_volume,
......@@ -38,13 +42,22 @@ class ReplicationController(ComponentController):
})
return target_blob
def full_replication(self):
def full_replication(self, source_volume):
"""Full replication after component has been configured"""
start_time = time()
for blob in Blob.objects.filter(volume=source_volume):
self.metadata_update(blob)
target_blob = self.payload_update(blob)
target_blob.save()
end_time = time()
space = self.target_volume.space_used
duration = (end_time - start_time) + 1 # +1 to make sure we don't divide by 0
rate = space / duration
LOGGER.debug("Initial full replication finished, %r bytes per second", rate)
def save(self, blob):
def metadata_update(self, blob):
"""Replicate metadata save"""
LOGGER.debug('Replicating::Save %s', blob.uuid.hex)
LOGGER.debug('Replicating::UpdateMetadata %s', blob.uuid.hex)
target_blob = self._get_target_blob(blob)
target_blob.path = blob.path
for attr in ['path', 'prefix', 'attributes', 'tags']:
......@@ -52,14 +65,14 @@ class ReplicationController(ComponentController):
setattr(target_blob, attr, copy.deepcopy(getattr(blob, attr)))
# Make sure we don't erase the source uuid
target_blob.attributes[TAG_BLOB_SOURCE_UUID] = blob.uuid.hex
target_blob.save()
return target_blob
def payload_updated(self, blob):
def payload_update(self, blob):
"""Replicate payload update"""
LOGGER.debug('Replicating::UpdatePayload %s', blob.uuid.hex)
target_blob = self._get_target_blob(blob)
copyfileobj(blob, target_blob)
target_blob.save()
return target_blob
def delete(self, blob):
"""Delete remote blob"""
......
......@@ -3,7 +3,11 @@
from django.db.models.signals import post_delete, post_save
from django.dispatch import receiver
from p2.components.replication.constants import TAG_REPLICATION_OFFSET
from p2.components.replication.controller import ReplicationController
from p2.components.replication.tasks import (replicate_delete_task,
replicate_metadata_update_task,
replicate_payload_update_task)
from p2.core.models import Blob, Component
from p2.core.signals import BLOB_PAYLOAD_UPDATED, BLOB_POST_SAVE
from p2.lib.reflection import class_to_path
......@@ -14,25 +18,26 @@ from p2.lib.reflection import class_to_path
def post_save_replication(sender, blob, **kwargs):
"""Replicate saved metadata"""
replication_component = blob.volume.component(ReplicationController)
if replication_component:
replication_component.controller.save(blob)
replicate_metadata_update_task.apply_async(
(blob.pk,), int(countdown=replication_component.tags.get(TAG_REPLICATION_OFFSET, 0)))
@receiver(BLOB_PAYLOAD_UPDATED)
# pylint: disable=unused-argument
def payload_updated_replication(sender, blob, **kwargs):
"""Replicate payload to target volume"""
replication_component = blob.volume.component(ReplicationController)
if replication_component:
replication_component.controller.payload_updated(blob)
replicate_payload_update_task.apply_async(
(blob.pk,), int(countdown=replication_component.tags.get(TAG_REPLICATION_OFFSET, 0)))
@receiver(post_delete, sender=Blob)
# pylint: disable=unused-argument
def blob_post_delete(sender, instance, **kwargs):
"""Delete target blob"""
# TODO: Run as task
replication_component = instance.volume.component(ReplicationController)
if replication_component:
replication_component.controller.delete(instance)
replicate_delete_task.apply_async(
(instance.pk,), int(countdown=replication_component.tags.get(TAG_REPLICATION_OFFSET, 0)))
@receiver(post_save, sender=Component)
# pylint: disable=unused-argument
......
"""p2 replication component tasks"""
from logging import getLogger
from p2.components.replication.controller import ReplicationController
from p2.core.celery import CELERY_APP
from p2.core.models import Blob, Volume
LOGGER = getLogger(__name__)
@CELERY_APP.task
def replicate_metadata_update_task(source_blob_pk):
"""Run Replication-MetadataUpdate operation in worker thread"""
blob = Blob.objects.get(pk=source_blob_pk)
replication_component = blob.volume.component(ReplicationController)
if replication_component:
replication_component.controller.metadata_update(blob).save()
@CELERY_APP.task
def replicate_payload_update_task(source_blob_pk):
"""Run Replication-PayloadUpdate operation in worker thread"""
blob = Blob.objects.get(pk=source_blob_pk)
replication_component = blob.volume.component(ReplicationController)
if replication_component:
replication_component.controller.payload_update(blob).save()
@CELERY_APP.task
def replicate_delete_task(source_blob_pk):
"""Run Replication-Save operation in worker thread"""
blob = Blob.objects.get(pk=source_blob_pk)
replication_component = blob.volume.component(ReplicationController)
if replication_component:
replication_component.controller.delete(blob)
@CELERY_APP.task
def initial_full_replication(volume_pk):
"""Initial full replication after Component is configured"""
source_volume = Volume.objects.get(pk=volume_pk)
replication_component = source_volume.component(ReplicationController)
assert replication_component, "ReplicationController not configured"
replication_component.controller.full_replication()
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment