Commit 731b745d authored by Jens Langhammer's avatar Jens Langhammer

core: switch role evaluating to celery worker

parent 4b047802
Pipeline #3212 failed with stage
in 2 minutes and 13 seconds
"""passbook core celery"""
import logging
import os
import celery
import pymysql
from django.conf import settings
# from raven import Client
# from raven.contrib.celery import register_logger_signal, register_signal
pymysql.install_as_MySQLdb()
# set the default Django settings module for the 'celery' program.
os.environ.setdefault("DJANGO_SETTINGS_MODULE", "passbook.core.settings")
LOGGER = logging.getLogger(__name__)
class Celery(celery.Celery):
"""Custom Celery class with Raven configured"""
# pylint: disable=method-hidden
# def on_configure(self):
# """Update raven client"""
# try:
# client = Client(settings.RAVEN_CONFIG.get('dsn'))
# # register a custom filter to filter out duplicate logs
# register_logger_signal(client)
# # hook into the Celery error handler
# register_signal(client)
# except RecursionError: # This error happens when pdoc is running
# pass
# pylint: disable=unused-argument
@celery.signals.setup_logging.connect
def config_loggers(*args, **kwags):
"""Apply logging settings from settings.py to celery"""
logging.config.dictConfig(settings.LOGGING)
# pylint: disable=unused-argument
@celery.signals.after_task_publish.connect
def after_task_publish(sender=None, headers=None, body=None, **kwargs):
"""Log task_id after it was published"""
info = headers if 'task' in headers else body
LOGGER.debug('%-40s published (name=%s)', info.get('id'), info.get('task'))
# pylint: disable=unused-argument
@celery.signals.task_prerun.connect
def task_prerun(task_id, task, *args, **kwargs):
"""Log task_id on worker"""
LOGGER.debug('%-40s started (name=%s)', task_id, task.__name__)
# pylint: disable=unused-argument
@celery.signals.task_postrun.connect
def task_postrun(task_id, task, *args, retval=None, state=None, **kwargs):
"""Log task_id on worker"""
LOGGER.debug('%-40s finished (name=%s, state=%s)',
task_id, task.__name__, state)
CELERY_APP = Celery('passbook')
# Using a string here means the worker doesn't have to serialize
# the configuration object to child processes.
# - namespace='CELERY' means all celery-related configuration keys
# should have a `CELERY_` prefix.
CELERY_APP.config_from_object(settings, namespace='CELERY')
# Load task modules from all registered Django app configs.
CELERY_APP.autodiscover_tasks()
......@@ -8,6 +8,7 @@ from django.db import models
from django.utils.translation import gettext as _
from model_utils.managers import InheritanceManager
from passbook.core.celery import CELERY_APP
from passbook.lib.models import CreatedUpdatedModel, UUIDModel
LOGGER = getLogger(__name__)
......@@ -31,6 +32,13 @@ class Provider(models.Model):
return getattr(self, 'name')
return super().__str__()
class TaskModel(CELERY_APP.Task, models.Model):
"""Django model which is also a celery task"""
class Meta:
abstract = True
class RuleModel(UUIDModel, CreatedUpdatedModel):
"""Base model which can have rules applied to it"""
......@@ -91,7 +99,7 @@ class UserSourceConnection(CreatedUpdatedModel):
unique_together = (('user', 'source'),)
@reversion.register()
class Rule(UUIDModel, CreatedUpdatedModel):
class Rule(TaskModel, UUIDModel, CreatedUpdatedModel):
"""Rules which specify if a user is authorized to use an Application. Can be overridden by
other types to add other fields, more logic, etc."""
......@@ -114,6 +122,10 @@ class Rule(UUIDModel, CreatedUpdatedModel):
return self.name
return "%s action %s" % (self.name, self.action)
def run(self, user_id: int) -> bool:
"""Celery wrapper for passes"""
return self.passes(User.objects.get(pk=user_id))
def passes(self, user: User) -> bool:
"""Check if user instance passes this rule"""
raise NotImplementedError()
......@@ -140,6 +152,7 @@ class FieldMatcherRule(Rule):
match_action = models.CharField(max_length=50, choices=MATCHES)
value = models.TextField()
name = 'passbook_core.FieldMatcherRule'
form = 'passbook.core.forms.rules.FieldMatcherRuleForm'
def __str__(self):
......@@ -175,6 +188,7 @@ class FieldMatcherRule(Rule):
verbose_name = _('Field matcher Rule')
verbose_name_plural = _('Field matcher Rules')
@reversion.register()
class WebhookRule(Rule):
"""Rule that asks webhook"""
......@@ -212,3 +226,6 @@ class WebhookRule(Rule):
verbose_name = _('Webhook Rule')
verbose_name_plural = _('Webhook Rules')
# Register tasks
for task_model in Rule.__subclasses__():
CELERY_APP.tasks.register(task_model)
......@@ -160,7 +160,16 @@ USE_L10N = True
USE_TZ = True
OAUTH2_PROVIDER_APPLICATION_MODEL = 'oauth2_provider.Application'
# Celery settings
# Add a 10 minute timeout to all Celery tasks.
CELERY_TASK_SOFT_TIME_LIMIT = 600
CELERY_TIMEZONE = TIME_ZONE
CELERY_BEAT_SCHEDULE = {}
CELERY_CREATE_MISSING_QUEUES = True
CELERY_TASK_DEFAULT_QUEUE = 'passbook'
CELERY_BROKER_URL = 'redis://%s' % CONFIG.get('redis')
CELERY_RESULT_BACKEND = 'redis://%s' % CONFIG.get('redis')
# Static files (CSS, JavaScript, Images)
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment