Skip to content

api.common

decorators

async_router_decorator(role, *, func)

Async Router decorator

Parameters:

Name Type Description Default
role ROLE_TYPE

role of this process

required
func

function to be queued, or called, based on role

required

Raises: AssertionError if the wrapped function is not async; TimeoutError if the async process took more than 600 seconds to complete; FaultyRoleException if role is set to anything other than all or server.

Source code in api/common/decorators.py
def async_router_decorator(role: ROLE_TYPE, *, func):
    """Async Router decorator

    Decorates an async function so that, when called, it receives an extra
    keyword argument `func`: an async callable that either calls the supplied
    `func` directly (role `all`) or queues it via `apply_async` and polls for
    the result (role `server`).

    Args:
        role: role of this process
        func: function to be queued, or called, based on role. May be a
            `functools.partial`; in role `server` its bound arguments are
            merged with the call arguments using the same precedence as
            calling the partial directly.
    Raises:
        AssertionError: if wrapped function is not async, or if a function of
            the same name has already been registered in `name_to_fns_map`
        TimeoutError: if the async process took more than 600sec to complete
        FaultyRoleException: if role is set to other than `all` or `server`
    """
    def outer(fn):
        assert fn.__name__ not in name_to_fns_map, f"{fn.__name__} already in name_to_fns_map"
        assert inspect.iscoroutinefunction(fn), "async_router_decorator can only be used to decorate async functions"

        if role == "all":
            async def async_get_direct_result(*args, **kwargs):
                # single-process mode: call the function in place
                return func(*args, **kwargs)
            return_func = async_get_direct_result
        elif role == "server":
            async def async_get_result(*args, **kwargs):
                if isinstance(func, partial):
                    # Mirror functools.partial call semantics so that roles
                    # `all` and `server` agree: bound positionals come first,
                    # and call-time keywords override bound keywords.
                    task = func.func
                    task_args = [*func.args, *args]
                    task_kwargs = {**func.keywords, **kwargs}
                else:
                    task = func
                    task_args, task_kwargs = args, kwargs

                async_result = task.apply_async(task_args, task_kwargs)
                wait = 0.1
                # monotonic clock: elapsed time is unaffected by wall-clock adjustments
                started = time.monotonic()
                while True:
                    status = async_result.status
                    if status == "SUCCESS":
                        return async_result.get()
                    if status == "FAILURE":
                        # on failure, getting the result will raise the exception
                        async_result.get()
                        raise Exception("unknown exception")
                    if (time.monotonic() - started) > 600:
                        async_result.revoke()
                        raise TimeoutError
                    # yield to the event loop between polls
                    await asyncio.sleep(wait)
                    wait = min(wait + wait, 1) # increase sleep duration, but at most 1 sec
            return_func = async_get_result
        else:
            raise FaultyRoleException(f"async_router_decorator can only be used in roles: all, server, but you selected {role}")

        # Register only once every check has passed, so a failed decoration
        # does not leave a stale entry behind.
        name_to_fns_map[fn.__name__] = (fn, func)

        @wraps(fn)
        async def inner(*args, **kwargs):
            return await fn(*args, **kwargs, func=return_func)
        return inner
    return outer

data_decorator(role)

data decorator

Extensively used in api.common.data_handlers, which contains most of the business logic, including data fetching, serialization, etc.

Parameters:

Name Type Description Default
role ROLE_TYPE

Role of this process

required

Raises:

Type Description
ImportError

Celery not installed, but role is set to either worker or server

Source code in api/common/decorators.py
def data_decorator(role: ROLE_TYPE):
    """data decorator

    Extensively used in api.common.data_handlers.

    For role `all` the decorated function is returned untouched. For roles
    `worker` and `server` it is registered as a bound task on the `app`
    object imported from `api.worker`.

    Args:
        role: Role of this process

    Raises:
        ImportError: Celery not installed, but role is set to either `worker` or `server`
        FaultyRoleException: role is none of `all`, `worker`, `server`
    """
    def outer_wrapper(fn):
        # Single-process mode: nothing to register.
        if role == "all":
            return fn

        if role not in ("worker", "server"):
            raise FaultyRoleException(f"role must be 'all', 'server' or 'worker', but it is {role}")

        try:
            from api.worker import app

            # `self` is the bound task instance (bind=True); it is not forwarded to fn.
            # NOTE: fn/args/kwargs must keep these names, the `{x=}` f-string prints them.
            def celery_task_wrapper(self, *args, **kwargs):
                if role == "worker":
                    general_logger.info(f"Task Received: {fn.__name__=}, {args=}, {kwargs=}")
                return fn(*args, **kwargs)

            register_task = app.task(bind=True)
            named_wrapper = wraps(fn)(celery_task_wrapper)
            return register_task(named_wrapper)
        except ImportError as import_error:
            message = f"For worker role, celery must be installed as a dep"
            general_logger.critical(message)
            raise ImportError(message) from import_error
    return outer_wrapper

router_decorator(role, *, func, queue_as_async=False, **kwargs)

Sync Router Decorator

Parameters:

Name Type Description Default
role ROLE_TYPE

role of this process

required
func

function to be queued, or called, based on role

required
queue_as_async bool

If set, returns the id of the queued task instead of the task's result. Query the id for the result.

False

Raises:

Type Description
FaultyRoleException

if role is set to all and queue_as_async is set

FaultyRoleException

if role is set to other than all or server

Source code in api/common/decorators.py
def router_decorator(role: ROLE_TYPE, *, func, queue_as_async: bool=False, **kwargs):
    """Sync Router Decorator

    Decorates a function so that, when called, it receives an extra keyword
    argument `func`: either the supplied `func` itself (role `all`) or a
    callable that queues it via `apply_async` (role `server`).

    Args:
        role: role of this process
        func: function to be queued, or called, based on role. May be a
            `functools.partial`; in role `server` its bound arguments are
            merged with the call arguments using the same precedence as
            calling the partial directly.
        queue_as_async: if set, will return id of the task, instead of the task result. Query the id for result.
        **kwargs: accepted for compatibility; not used by this decorator

    Raises:
        AssertionError: if a function of the same name has already been registered in `name_to_fns_map`
        FaultyRoleException: if role is set to `all` and `queue_as_async` is set
        FaultyRoleException: if role is set to other than `all` or `server`
    """
    def outer(fn):
        assert fn.__name__ not in name_to_fns_map, f"{fn.__name__} already in name_to_fns_map"

        if role == "all":
            if queue_as_async:
                raise FaultyRoleException("If role is set to all, cannot queue_as_async")
            # single-process mode: hand the function over as-is
            return_func = func
        elif role == "server":
            def sync_get_result(*args, **kwargs):
                if isinstance(func, partial):
                    # Mirror functools.partial call semantics so that roles
                    # `all` and `server` agree: bound positionals come first,
                    # and call-time keywords override bound keywords.
                    task = func.func
                    task_args = [*func.args, *args]
                    task_kwargs = {**func.keywords, **kwargs}
                else:
                    task = func
                    task_args, task_kwargs = args, kwargs

                async_result = task.apply_async(task_args, task_kwargs)
                if queue_as_async:
                    # caller polls for the result using this id
                    return async_result.id
                # blocks until the queued task has finished
                return async_result.get()
            return_func = sync_get_result
        else:
            raise FaultyRoleException(f"router_decorator can only be used in roles: all, server, but you selected {role}")

        # Register only once every check has passed, so a failed decoration
        # does not leave a stale entry behind.
        name_to_fns_map[fn.__name__] = (fn, func)

        @wraps(fn)
        def inner(*args, **kwargs):
            return fn(*args, **kwargs, func=return_func)
        return inner
    return outer

exceptions

AmbiguousParameters

AmbiguousParameters

Source code in api/common/exceptions.py
class AmbiguousParameters(SapiBaseException):
    """AmbiguousParameters

    Direct subclass of `SapiBaseException`.

    NOTE(review): presumably raised when the supplied parameters can be
    interpreted in more than one way -- confirm against the raise sites.
    """

ClsNotRegisteredException

ClsNotRegisteredException

Source code in api/common/exceptions.py
class ClsNotRegisteredException(SerializationException):
    """ClsNotRegisteredException

    A `SerializationException`.

    NOTE(review): presumably raised when an object's class has no registered
    serializer -- confirm against the raise sites.
    """

ConfigException

ConfigException

Source code in api/common/exceptions.py
class ConfigException(SapiBaseException):
    """ConfigException

    Direct subclass of `SapiBaseException`; parent of `FaultyRoleException`.
    """

FaultyRoleException

FaultyRoleException

Source code in api/common/exceptions.py
class FaultyRoleException(ConfigException):
    """FaultyRoleException

    A `ConfigException`. Raised by the decorators in
    `api/common/decorators.py` when the process role is not one the decorator
    supports, or when `queue_as_async` is combined with role `all`.
    """

InsufficientParameters

InsufficientParameters

Source code in api/common/exceptions.py
class InsufficientParameters(SapiBaseException):
    """InsufficientParameters

    Direct subclass of `SapiBaseException`.

    NOTE(review): presumably raised when required parameters are missing --
    confirm against the raise sites.
    """

InvalidParameters

InvalidParameters

Source code in api/common/exceptions.py
class InvalidParameters(SapiBaseException):
    """InvalidParameters

    Direct subclass of `SapiBaseException`.

    NOTE(review): presumably raised when a supplied parameter has an
    unacceptable value -- confirm against the raise sites.
    """

NonStrKeyException

NonStrKeyException

Source code in api/common/exceptions.py
class NonStrKeyException(SerializationException):
    """NonStrKeyException

    A `SerializationException`.

    NOTE(review): presumably raised when a mapping being serialized has a key
    that is not a `str` -- confirm against the raise sites.
    """

NotFound

NotFound

Source code in api/common/exceptions.py
class NotFound(SapiBaseException):
    """NotFound

    Direct subclass of `SapiBaseException`.

    NOTE(review): presumably raised when a requested resource does not
    exist -- confirm against the raise sites.
    """

SerializationException

SerializationException

Source code in api/common/exceptions.py
class SerializationException(SapiBaseException):
    """SerializationException

    Direct subclass of `SapiBaseException`; parent of
    `ClsNotRegisteredException` and `NonStrKeyException`.
    """

logger

Logging module.

If LOGGER_DIR is defined in config, a TimedRotatingFileHandler is also used to write logs to LOGGER_DIR.

storage

get_filename(*args, ext=None)

Get a hashed filename based on positional arguments.

Will also honor SIIBRA_API_SHARED_DIR in config, if defined.

Parameters:

Name Type Description Default
args List[str]

positional arguments

()
ext str

extension

None

Returns:

Type Description
str

hashed path, in the form of {SIIBRA_API_SHARED_DIR}/{hash(*args)} + ('.{ext}' if ext else '')

Source code in api/common/storage.py
def get_filename(*args: str, ext:str=None) -> str:
    """Get a hashed filename based on positional arguments.

    The positional arguments are concatenated, MD5-hashed, and the hex digest
    is used as the file name inside `SIIBRA_API_SHARED_DIR`.

    Args:
        args: positional arguments, each must be a str
        ext: extension, with or without leading dot(s); omitted if falsy

    Returns:
        hashed path, in the form of `{SIIBRA_API_SHARED_DIR}/{hash(*args)} + ('.{ext}' if ext else '')`

    Raises:
        AssertionError: if any positional argument is not a str
    """
    assert all(isinstance(arg, str) for arg in args), "all args to get_filename must be str"
    digest = hashlib.md5("".join(args).encode("utf-8")).hexdigest()
    # The conditional must apply to the suffix only. Applied to the whole
    # `digest + suffix` expression it drops the digest when no ext is given,
    # and every call then returns the bare shared directory.
    suffix = f".{ext.lstrip('.')}" if ext else ""
    return os.path.join(SIIBRA_API_SHARED_DIR, digest + suffix)

timer

Cron

Source code in api/common/timer.py
class Cron:
    """Registry of callbacks run every 60 seconds or every 600 seconds.

    Callbacks are added with `minutely` / `ten_minutely` (both usable as
    decorators, since they return the callback unchanged). Nothing runs on a
    schedule until `start` is called.
    """

    def __init__(self) -> None:
        self._minutely_fns: List[Callable] = []
        self._ten_minutely_fns: List[Callable] = []

        every_minute = RepeatTimer(60, self._run_minutely)
        every_ten_minutes = RepeatTimer(600, self._run_ten_minutely)
        self._timers: List[RepeatTimer] = [every_minute, every_ten_minutes]

    @staticmethod
    def _invoke_each(callbacks):
        """Call every registered callback, in registration order."""
        for callback in callbacks:
            callback()

    def _run_minutely(self):
        """Run all callbacks registered via `minutely`."""
        self._invoke_each(self._minutely_fns)

    def _run_ten_minutely(self):
        """Run all callbacks registered via `ten_minutely`."""
        self._invoke_each(self._ten_minutely_fns)

    def minutely(self, fn: Callable):
        """Register `fn` for the 60-second schedule and return it unchanged."""
        self._minutely_fns.append(fn)
        return fn

    def ten_minutely(self, fn: Callable):
        """Register `fn` for the 600-second schedule and return it unchanged."""
        self._ten_minutely_fns.append(fn)
        return fn

    def run_all(self):
        """Run every callback once, right now: ten-minutely ones first, then minutely."""
        self._run_ten_minutely()
        self._run_minutely()

    def start(self):
        """Start all timers."""
        for repeat_timer in self._timers:
            repeat_timer.start()

    def stop(self):
        """On terminate"""
        for repeat_timer in self._timers:
            repeat_timer.cancel()

stop()

On terminate

Source code in api/common/timer.py
def stop(self):
    """On terminate: cancel every timer held in `self._timers`."""
    for repeat_timer in self._timers:
        repeat_timer.cancel()

RepeatTimer

RepeatTimer

runs self.function every self.interval

Source code in api/common/timer.py
class RepeatTimer(Timer):
    """RepeatTimer

    Calls `self.function(*self.args, **self.kwargs)` after every
    `self.interval`, repeatedly, until `self.finished.wait` returns a truthy
    value.

    NOTE(review): assumes `Timer` is `threading.Timer`, in which case
    `cancel()` sets `finished` and ends the loop -- confirm the import.
    """
    def run(self):
        while True:
            # A truthy result means `finished` was set before the interval
            # elapsed; a falsy one means the interval timed out normally.
            stopped = self.finished.wait(self.interval)
            if stopped:
                break
            self.function(*self.args, **self.kwargs)