Skip to content

api.common

decorators

async_router_decorator(role, *, func)

Async Router decorator

Parameters:

Name Type Description Default
role ROLE_TYPE

role of this process

required
func

function to be queued, or called, based on role

required

Raises:

Type Description
AssertionError

if wrapped function is not async

TimeoutError

if the async process took more than 600sec to complete

FaultyRoleException

if role is set to other than all or server

Source code in api/common/decorators.py
def async_router_decorator(role: ROLE_TYPE, *, func):
    """Async Router decorator

    Args:
        role: role of this process
        func: function to be queued, or called, based on role
    Raises:
        AssertionError: if wrapped function is not async
        TimeoutError: if the async process took more than 600sec to complete
        FaultyRoleException: if role is set to other than `all` or `server`
    """
    def outer(fn):
        global name_to_fns_map

        assert fn.__name__ not in name_to_fns_map, f"{fn.__name__} already in name_to_fns_map"
        name_to_fns_map[fn.__name__] = (fn, func)

        assert inspect.iscoroutinefunction(fn), f"async_router_decorator can only be used to decorate async functions"

        return_func = None
        if role == "all":
            async def async_get_direct_result(*args, **kwargs):
                return func(*args, **kwargs)
            return_func = async_get_direct_result
        if role == "server":
            async def async_get_result(*args, **kwargs):
                if isinstance(func, partial):
                    func.func.s(*func.args, **func.keywords)
                    _func = func.func
                    args=[*args, *func.args]
                    kwargs={**kwargs, **func.keywords}
                else:
                    _func=func

                async_result = _func.apply_async(args, kwargs)
                wait=0.1
                timestamp=time.time()
                while True:
                    if async_result.status == "SUCCESS":
                        return async_result.get()
                    if async_result.status == "FAILURE":
                        # on failure, getting the result will raise the exception
                        async_result.get()
                        raise Exception("unknown exception")
                    if (time.time() - timestamp) > 600:
                        async_result.revoke()
                        raise TimeoutError
                    await asyncio.sleep(wait)
                    wait = min(wait + wait, 1) # increase sleep duration, but at most 1 sec
            return_func = async_get_result

        if return_func is None:
            raise FaultyRoleException(f"router_decorator can only be used in roles: all, server, but you selected {role}")

        @wraps(fn)
        async def inner(*args, **kwargs):
            return await fn(*args, **kwargs, func=return_func)
        return inner
    return outer

data_decorator(role)

data decorator

Extensively used in api.common.data_handlers. Most of the business logic, including data fetching, serialization, etc.

Parameters:

Name Type Description Default
role ROLE_TYPE

Role of this process

required

Raises:

Type Description
ImportError

Celery not installed, but role is set to either worker or server

Source code in api/common/decorators.py
def data_decorator(role: ROLE_TYPE):
    """data decorator

    Extensively used in api.common.data_handlers. Most of the business logic, including data fetching, serialization, etc.

    Args:
        role: Role of this process

    Raises:
        ImportError: Celery not installed, but role is set to either `worker` or `server`
    """
    def outer_wrapper(fn):
        if role == "all":
            return fn
        if role == "worker" or role == "server":
            try:
                from api.worker import app

                def celery_task_wrapper(self, *args, **kwargs):
                    return fn(*args, **kwargs)

                return app.task(bind=True)(
                    wraps(fn)(celery_task_wrapper)
                )
            except ImportError as e:
                errmsg = f"For worker role, celery must be installed as a dep"
                logger.critical(errmsg)
                raise ImportError(errmsg) from e
        raise FaultyRoleException(f"role must be 'all', 'server' or 'worker', but it is {role}")
    return outer_wrapper

router_decorator(role, *, func, queue_as_async=False, **kwargs)

Sync Router Decorator

Parameters:

Name Type Description Default
role ROLE_TYPE

role of this process

required
func

function to be queued, or called, based on role

required
queue_as_async bool

if set, will return id of the task, instead of task itself. Query the id for result.

False

Raises:

Type Description
FaultyRoleException

if role is set to all and queue_as_async is set

FaultyRoleException

if role is set to other than all or server

Source code in api/common/decorators.py
def router_decorator(role: ROLE_TYPE, *, func, queue_as_async: bool=False, **kwargs):
    """Sync Router Decorator

    Args:
        role: role of this process
        func: function to be queued, or called, based on role
        queue_as_async: if set, will return id of the task, instead of task itself. Query the id for result.

    Raises:
        FaultyRoleException: if role is set to `all` and `queue_as_async` is set
        FaultyRoleException: if role is set to other than `all` or `server`
    """
    def outer(fn):

        global name_to_fns_map

        assert fn.__name__ not in name_to_fns_map, f"{fn.__name__} already in name_to_fns_map"
        name_to_fns_map[fn.__name__] = (fn, func)

        return_func = None
        if role == "all":
            if queue_as_async:
                raise FaultyRoleException(f"If role is set to all, cannot queue_as_async")
            return_func = func
        if role == "server":
            def sync_get_result(*args, **kwargs):
                if isinstance(func, partial):
                    func.func.s(*func.args, **func.keywords)
                    _func = func.func
                    args=[*args, *func.args]
                    kwargs={**kwargs, **func.keywords}
                else:
                    _func=func

                async_result = _func.apply_async(args, kwargs)
                if queue_as_async:
                    return async_result.id
                else:
                    return async_result.get()
            return_func = sync_get_result
        if return_func is None:
            raise FaultyRoleException(f"router_decorator can only be used in roles: all, server, but you selected {role}")

        @wraps(fn)
        def inner(*args, **kwargs):
            return fn(*args, **kwargs, func=return_func)
        return inner
    return outer

exceptions

AmbiguousParameters

AmbiguousParameters

Source code in api/common/exceptions.py
class AmbiguousParameters(SapiBaseException):
    """AmbiguousParameters"""

ClsNotRegisteredException

ClsNotRegisteredException

Source code in api/common/exceptions.py
class ClsNotRegisteredException(SerializationException):
    """ClsNotRegisteredException"""

ConfigException

ConfigException

Source code in api/common/exceptions.py
class ConfigException(SapiBaseException):
    """ConfigException"""

FaultyRoleException

FaultyRoleException

Source code in api/common/exceptions.py
class FaultyRoleException(ConfigException):
    """FaultyRoleException"""

InsufficientParameters

InsufficientParameters

Source code in api/common/exceptions.py
class InsufficientParameters(SapiBaseException):
    """InsufficientParameters"""

InvalidParameters

InvalidParameters

Source code in api/common/exceptions.py
class InvalidParameters(SapiBaseException):
    """InvalidParameters"""

NonStrKeyException

NonStrKeyException

Source code in api/common/exceptions.py
class NonStrKeyException(SerializationException):
    """NonStrKeyException"""

NotFound

NotFound

Source code in api/common/exceptions.py
class NotFound(SapiBaseException):
    """NotFound"""

SerializationException

SerializationException

Source code in api/common/exceptions.py
class SerializationException(SapiBaseException):
    """SerializationException"""

logger

Logging module.

if LOGGER_DIR is defined in config, will also use TimedRotatingFileHandler to write to LOGGER_DIR.

storage

get_filename(*args, ext=None)

Get a hashed filename based on positional arguments.

Will also honor SIIBRA_API_SHARED_DIR in config, if defined.

Parameters:

Name Type Description Default
args List[str]

positional arguments

()
ext str

extension

None

Returns:

Type Description
str

hashed path, in the form of {SIIBRA_API_SHARED_DIR}/{hash(*args)} + ('.{ext}' if ext else '')

Source code in api/common/storage.py
def get_filename(*args: List[str], ext:str=None) -> str:
    """Get a hashed filename based on positional arguments.

    Will also honor `SIIBRA_API_SHARED_DIR` in config, if defined.

    Args:
        args: positional arguments
        ext: extension

    Returns:
        hashed path, in the form of `{SIIBRA_API_SHARED_DIR}/{hash(*args)} + ('.{ext}' if ext else '')`
    """
    assert all(isinstance(arg, str) for arg in args), f"all args to get_filename must be str"
    return os.path.join(SIIBRA_API_SHARED_DIR, hashlib.md5("".join(args).encode("utf-8")).hexdigest() + (f".{ext.lstrip('.')}") if ext else "")

timer

RepeatTimer

RepeatTimer

runs self.function every self.interval

Source code in api/common/timer.py
class RepeatTimer(Timer):
    """RepeatTimer

    runs `self.function` every `self.interval`
    """
    def run(self):
        while not self.finished.wait(self.interval):
            self.function(*self.args, **self.kwargs)