Skip to content

api.server

This module houses the HTTP wrapper. Since most call signatures and return values follow FastAPI conventions, they are not documented individually.

The names of the functions reflect their purpose:

  • middleware_*
  • get_*
  • exception_*

api

EndpointLoggingFilter

Custom logger filter. Do not log metrics, ready endpoint.

Source code in api/server/api.py
class EndpointLoggingFilter(logging.Filter):
    """Logging filter that drops records mentioning an excluded endpoint.

    A record is rejected as soon as its formatted message contains any of the
    substrings listed in ``do_not_logs``; every other record passes through.
    """

    def filter(self, record: logging.LogRecord) -> bool:
        text = record.getMessage()
        for excluded in do_not_logs:
            if excluded in text:
                return False
        return True

get_metrics()

Get prometheus metrics

Source code in api/server/api.py
# Served at "/metrics" and kept out of the generated OpenAPI schema.
@siibra_api.get("/metrics", include_in_schema=False)
def get_metrics():
    """Get prometheus metrics"""
    # The whole response is produced by prom_metrics_resp(); nothing is added here.
    return prom_metrics_resp()

get_ready()

Ready probe

TODO: implement me

Source code in api/server/api.py
# Served at "/ready" and kept out of the generated OpenAPI schema.
@siibra_api.get("/ready", include_in_schema=False)
def get_ready():
    """Ready probe

    TODO: implement me"""
    # Placeholder: always answers "ready" without checking any dependency.
    return "ready"

get_home(request)

Return the template for the siibra landing page.

Source code in api/server/api.py
@siibra_api.get("/", include_in_schema=False)
def get_home(request: Request):
    """Return the template for the siibra landing page."""
    # Values exposed to index.html while it is rendered.
    landing_context = dict(
        request=request,
        api_version=__version__,
        git_hash=GIT_HASH,
        versions=["v3_0", "v2_0", "v1_0"],
    )
    return templates.TemplateResponse("index.html", context=landing_context)

middleware_cache_response(request, call_next) async

Cache requests to redis, to improve response time.

Source code in api/server/api.py
def _cache_error_body(status_code: int, message: str, status_code_key: str) -> bytes:
    """Serialize the JSON error envelope emitted by middleware_cache_response."""
    return json.dumps({
        "error": True,
        status_code_key: status_code,
        "message": message
    }).encode("utf-8")


@siibra_api.middleware("http")
async def middleware_cache_response(request: Request, call_next):
    """Cache requests to redis, to improve response time.

    The cache key is the api version plus an md5 of path + query string.
    When reading is allowed and a value is stored, it is returned directly
    with the cache-hit header. Otherwise the request is processed, error
    responses (status >= 400, or an exception) are rewritten into a JSON
    envelope ``{"error": True, "status_code": ..., "message": ...}``, and the
    body is stored when caching is allowed and the content type is exactly
    ``application/json``.

    Args:
        request: incoming request
        call_next: callable invoking the rest of the middleware chain

    Returns:
        a Response built from the cached body or from the downstream response
    """
    cache_instance = get_cache_instance()

    hashed_path = md5(f"{request.url.path}{str(request.url.query)}".encode("utf-8")).hexdigest()
    cache_key = f"[{__version__}] {hashed_path}"

    auth_set = request.headers.get("Authorization") is not None

    accept_header = request.headers.get("Accept")
    query_code_flag = accept_header == "text/x-sapi-python"

    # bypass cache set if:
    # - method is not GET
    # - if auth token is set
    # - if the python code snippet was requested via the Accept header
    # - if any part of request.url.path matches with do_not_cache_list
    # - if any keyword appears in do_no_cache_query_list
    bypass_cache_set = (
        request.method.upper() != "GET"
        or auth_set
        or query_code_flag
        or any(keyword in request.url.path for keyword in do_not_cache_list)
        or any(keyword in request.url.query for keyword in do_no_cache_query_list)
    )

    # bypass cache read if:
    # - bypass cache set
    # - x-bypass-fastapi-cache is present
    bypass_cache_read = (
        request.headers.get("x-bypass-fastapi-cache")
    ) or bypass_cache_set

    # starlette seems to normalize header to lower case
    # so .get("origin") also works if the request has "Origin: http://..."

    # N.B. do not append cors header based on origin
    # some plugins may strip origin header for privacy reasons
    # has_origin = request.headers.get("origin")
    extra_headers = {
        "access-control-allow-origin": "*",
        "access-control-expose-headers": f"{siibra_version_header}",
        siibra_version_header: __version__,
    }

    cached_value = cache_instance.get_value(cache_key) if not bypass_cache_read else None

    status_code_key = "status_code"

    if cached_value:
        loaded_value = json.loads(cached_value)
        # Only the error envelope (a JSON object) carries a status code. A
        # cached list or scalar has no .get(), and is always a success body.
        if isinstance(loaded_value, dict) and loaded_value.get("error"):
            status_code = loaded_value.get(status_code_key, 500)
        else:
            status_code = 200
        return Response(
            cached_value,
            status_code=status_code,
            headers={
                "content-type": "application/json",
                cache_header: "hit",
                **extra_headers,
            }
        )

    try:
        response = await call_next(request)
        # Keep the downstream status: forcing 200 would turn redirects,
        # 204 and 304 responses into bogus "OK" responses.
        status_code = response.status_code
        response_content_type = response.headers.get("content-type")
        response_headers = response.headers
        content = await read_bytes(response.body_iterator)

        if status_code >= 400:
            # A 404 keeps a JSON content type so it stays cacheable below;
            # any other error gets no content type and is never cached.
            response_content_type = "application/json" if status_code == 404 else None
            content = _cache_error_body(status_code, content.decode(), status_code_key)
            response_headers = extra_headers

    except NotFound as e:
        status_code = 404
        response_content_type = "application/json"
        content = _cache_error_body(status_code, str(e), status_code_key)
        response_headers = extra_headers
    except Exception as e:
        status_code = 500
        response_content_type = None
        content = _cache_error_body(status_code, str(e), status_code_key)
        response_headers = extra_headers

    # Store only what the cache-hit path can replay faithfully: a plain 200
    # body, or the 404 envelope (which records its own status code).
    if (
        (not bypass_cache_set)
        and response_content_type == "application/json"
        and status_code in (200, 404)
    ):
        cache_instance.set_value(cache_key, content)
    return Response(
        content,
        status_code=status_code,
        headers=response_headers
    )

middleware_get_python_code(request, call_next) async

If Accept header is set to text/x-sapi-python, return the python code as plain text response.

Source code in api/server/api.py
@siibra_api.middleware("http")
async def middleware_get_python_code(request: Request, call_next):
    """Answer with the endpoint's python snippet when it is requested.

    A request whose Accept header is exactly text/x-sapi-python is not passed
    down the chain; the snippet (or a 404 plain-text message) is returned
    instead. Every other request is forwarded untouched.
    """
    wants_python_snippet = request.headers.get("Accept") == "text/x-sapi-python"
    if not wants_python_snippet:
        return await call_next(request)

    try:
        snippet = get_sourcecode(request, "python")
    except NotFound as not_found:
        # An exception without a message still needs a non-empty body.
        return PlainTextResponse(str(not_found) or "Not found", 404)
    return PlainTextResponse(snippet)

middleware_add_version_header(request, call_next) async

Add siibra-api version as a custom header

Source code in api/server/api.py
@siibra_api.middleware("http")
async def middleware_add_version_header(request: Request, call_next):
    """Stamp every outgoing response with the siibra-api version header."""
    outgoing = await call_next(request)
    outgoing.headers[siibra_version_header] = __version__
    return outgoing

middleware_access_log(request, call_next) async

Access log middleware

Source code in api/server/api.py
@siibra_api.middleware("http")
async def middleware_access_log(request: Request, call_next):
    """Access log middleware

    Logs method, url, response status, processing time (ms) and cache
    hit/miss for every request whose path is not listed in do_not_logs.

    Errors raised further down the chain are logged and then re-raised.
    Falling through would return None from the middleware, which the
    framework then tries to call as a response, replacing the real error
    with an unrelated TypeError.
    """

    if request.url.path in do_not_logs:
        return await call_next(request)

    start_time = time.time()

    def _log_access(resp_status: str, hit_cache: str) -> None:
        # Elapsed time is measured when the log entry is written.
        process_time = (time.time() - start_time) * 1000
        access_logger.info(f"{request.method.upper()} {str(request.url)}", extra={
            "resp_status": resp_status,
            "process_time_ms": str(round(process_time)),
            "hit_cache": hit_cache
        })

    try:
        resp = await call_next(request)

    # Reverse proxy sometimes has a dedicated timeout
    # In events where server takes too long to respond, fastapi will raise a RuntimeError with body "No response returned."
    # Log the incident, and the time of response (This should reflect the duration of request, rather than when the client closes the connection)
    except RuntimeError:
        _log_access("504", "cache_miss")
        raise
    except Exception as e:
        general_logger.critical(e)
        raise

    _log_access(
        str(resp.status_code),
        "cache_hit" if resp.headers.get(cache_header) == "hit" else "cache_miss",
    )
    return resp

exception_runtime(request, exc) async

Handling RuntimeErrors. Most of the RuntimeErrors are thrown by the siibra-python library when other Services are not responding. To be more resilient and not throw a simple and unplanned HTTP 500 response, this handler will return an HTTP 503 status.

Source code in api/server/api.py
@siibra_api.exception_handler(RuntimeError)
async def exception_runtime(request: Request, exc: RuntimeError) -> JSONResponse:
    """Turn a RuntimeError into an HTTP 503 response.

    Most RuntimeErrors come from the siibra-python library when other services
    do not respond. Answering 503 instead of an unplanned 500 signals that the
    outage is temporary.
    """
    reason = str(exc)
    general_logger.warning(f"Error handler: exception_runtime: {reason}")
    payload = {
        "detail": "This part of the siibra service is temporarily unavailable",
        "error": reason,
    }
    return JSONResponse(status_code=503, content=payload)

exception_sapi(request, exc)

Handle sapi errors

Source code in api/server/api.py
@siibra_api.exception_handler(SapiBaseException)
def exception_sapi(request: Request, exc: SapiBaseException):
    """Handle sapi errors

    Answers with HTTP 400 and a ``{"detail": <message>}`` body.

    The response is returned rather than raised as an HTTPException: an
    exception raised from inside an exception handler is not routed back to
    the HTTPException handler, so the client would receive a 500 instead of
    the intended 400.
    """
    general_logger.warning(f"Error handler: exception_sapi: {str(exc)}")
    return JSONResponse(status_code=400, content={"detail": str(exc)})

exception_other(request, exc) async

Catch all exception handler

Source code in api/server/api.py
@siibra_api.exception_handler(Exception)
async def exception_other(request: Request, exc: Exception):
    """Fallback handler: log the exception and answer with HTTP 500."""
    reason = str(exc)
    general_logger.warning(f"Error handler: exception_other: {reason}")
    payload = {
        "detail": "Some error occurred",
        "error": reason,
    }
    return JSONResponse(status_code=500, content=payload)

shutdown()

On shutdown

Source code in api/server/api.py
@siibra_api.on_event("shutdown")
def shutdown():
    """On shutdown"""
    # Run the cache teardown hook first, then the metrics teardown hook.
    terminate()
    metrics_on_terminate()

startup()

On startup

Source code in api/server/api.py
@siibra_api.on_event("startup")
def startup():
    """On startup"""
    # Run the cache startup hook first, then the metrics startup hook.
    on_startup()
    metrics_on_startup()

cache

DummyCache

Dummystore to be used if no store is available.

Source code in api/server/cache/__init__.py
3
4
5
6
7
8
class DummyCache:
    """No-op stand-in used when no real store is available.

    Exposes the same get_value / set_value methods as the real store, but
    keeps nothing: every read is a miss and every write is discarded.
    """

    def get_value(self, *args):
        """Ignore the arguments and report a miss (``None``)."""

    def set_value(self, *args):
        """Ignore the arguments; nothing is stored and ``None`` is returned."""

get_instance()

Get the store singleton

Source code in api/server/cache/__init__.py
def get_instance():
    """Return the store to use for caching.

    A CacheGzipRedis is returned when it reports being connected. In every
    other case, including any error while creating or probing it, a
    DummyCache is returned instead.
    """
    try:
        candidate = CacheGzipRedis()
        usable = candidate is not None and candidate.is_connected
    except Exception:
        usable = False

    if usable:
        return candidate
    return DummyCache()

on_startup()

On startup call

Source code in api/server/cache/__init__.py
def on_startup():
    """On startup call"""
    # Delegates to the redis store's own startup hook.
    redis_on_startup()

terminate()

On terminate call

Source code in api/server/cache/__init__.py
def terminate():
    """On terminate call"""
    # Delegates to the redis store's own terminate hook.
    redis_terminate()

redis

_is_ci = IS_CI module-attribute

Do not use cache if IS_CI set via config

CacheGzipRedis

GzipRedis. This store gzips each value, then base64-encodes the gzipped result.

Source code in api/server/cache/redis.py
class CacheGzipRedis:
    """GzipRedis. This store gzips each value, then base64-encodes the gzipped result.

    Connection state (``_r``, ``_is_connected``, ``_timer``) is held on the
    class itself, so every instance observes the same values.
    """

    # All three start out unset; the annotations are quoted because the
    # attributes hold None until something assigns them.
    _r: "Optional[Redis]" = None
    _is_connected = False
    _timer: "Optional[RepeatTimer]" = None

    # read only property
    @property
    def is_connected(self):
        return self._is_connected

    def get_value(self, key: str) -> Union[str, None]:
        """Get stored value according to key

        Args:
            key: str

        Returns:
            stored value, or None when caching is disabled, the store is not
            connected, or nothing is stored under key"""
        if _is_ci or not self.is_connected:
            return None
        bz64str = self._r.get(key)

        if bz64str is None:
            return None

        bz64str = CacheGzipRedis.getstr(bz64str)
        # A value that looks like a plain JSON object is not in the
        # gzip+b64 format (presumably written before compression was
        # introduced; TODO confirm). Re-store it encoded and return it as is.
        # startswith/endswith instead of indexing, so that an empty stored
        # value does not raise IndexError.
        if bz64str.startswith("{") and bz64str.endswith("}"):
            self.set_value(key, bz64str)
            return bz64str
        try:
            return CacheGzipRedis.decode(bz64str)
        except Exception:
            # Best effort: report the undecodable entry and return it raw.
            print(f"decoding key value error {key}, {bz64str}")
            return bz64str

    @staticmethod
    def getstr(val: Union[str, bytes]) -> str:
        """Convert str|bytes into str

        Args:
            val: value to be stringified

        Returns:
            string in utf-8 encoding

        Raises:
            TypeError: neither str or bytes are provided"""
        if isinstance(val, str):
            return val
        if isinstance(val, bytes):
            return val.decode("utf-8")

        raise TypeError(f"type {val.__class__.__name__} cannot be serialized")

    @staticmethod
    def getbytes(val: Union[str, bytes]) -> bytes:
        """Convert str|bytes into bytes

        Args:
            val: value to be converted to bytes

        Returns:
            bytes

        Raises:
            TypeError: neither str or bytes are provided"""
        if isinstance(val, bytes):
            return val
        if isinstance(val, str):
            return bytes(val, "utf-8")

        raise TypeError(f"type {val.__class__.__name__} cannot be serialized")

    @staticmethod
    def decode(val: Union[str, bytes]) -> str:
        """decode gzipped b64 encoded string

        Args:
            val: value to be decoded

        Returns:
            decoded value"""
        bz64 = CacheGzipRedis.getbytes(val)
        bz = base64.b64decode(bz64)
        b = gzip.decompress(bz)
        return b.decode("utf-8")

    @staticmethod
    def encode(val: Union[str, bytes]) -> str:
        """encode value into gzipped b64

        Args:
            val: value to be encoded

        Returns:
            string representing gzipped, b64 of the original string
        """
        b = CacheGzipRedis.getbytes(val)
        bz = gzip.compress(b, compresslevel=9)
        bz64 = base64.b64encode(bz)
        bz64str = bz64.decode("utf-8")
        return bz64str

    def set_value(self, key, value):
        """Store value under key, gzipped then b64 encoded

        Args:
            key: key to store the value under
            value: str or bytes to be stored

        Returns:
            None when caching is disabled or the store is not connected,
            otherwise whatever the underlying set call returns"""
        if _is_ci or not self.is_connected:
            return None

        compressed_value = CacheGzipRedis.encode(value)
        return self._r.set(key, compressed_value)
get_value(key)

Get stored value according to key

Parameters:

Name Type Description Default
key str

str

required

Returns:

Type Description
str

stored value

Source code in api/server/cache/redis.py
def get_value(self, key: str) -> Union[str, None]:
    """Get stored value according to key

    Args:
        key: str

    Returns:
        stored value, or None when caching is disabled, the store is not
        connected, or nothing is stored under key"""
    if _is_ci or not self.is_connected:
        return None
    bz64str = self._r.get(key)

    if bz64str is None:
        return None

    bz64str = CacheGzipRedis.getstr(bz64str)
    # A value that looks like a plain JSON object is not in the gzip+b64
    # format (presumably written before compression was introduced; TODO
    # confirm). Re-store it encoded and return it as is.
    # startswith/endswith instead of indexing, so that an empty stored value
    # does not raise IndexError.
    if bz64str.startswith("{") and bz64str.endswith("}"):
        self.set_value(key, bz64str)
        return bz64str
    try:
        return CacheGzipRedis.decode(bz64str)
    except Exception:
        # Best effort: report the undecodable entry and return it raw.
        print(f"decoding key value error {key}, {bz64str}")
        return bz64str
getstr(val) staticmethod

Convert str|bytes into str

Parameters:

Name Type Description Default
val Union[str, bytes]

value to be stringified

required

Returns:

Type Description
str

string in utf-8 encoding

Raises:

Type Description
Exception

neither str or bytes are provided

Source code in api/server/cache/redis.py
@staticmethod
def getstr(val: Union[str, bytes]) -> str:
    """Convert str|bytes into str

    Args:
        val: value to be stringified

    Returns:
        string in utf-8 encoding

    Raises:
        TypeError: neither str or bytes are provided"""
    # isinstance (not an exact type comparison) so subclasses are accepted.
    if isinstance(val, str):
        return val
    if isinstance(val, bytes):
        return val.decode("utf-8")

    raise TypeError(f"type {val.__class__.__name__} cannot be serialized")
getbytes(val) staticmethod

Convert str|bytes into bytes

Parameters:

Name Type Description Default
val Union[str, bytes]

value to be converted to bytes

required

Returns:

Type Description
bytes

bytes

Raises:

Type Description
Exception

neither str or bytes are provided

Source code in api/server/cache/redis.py
@staticmethod
def getbytes(val: Union[str, bytes]) -> bytes:
    """Convert str|bytes into bytes

    Args:
        val: value to be converted to bytes

    Returns:
        bytes

    Raises:
        TypeError: neither str or bytes are provided"""
    # isinstance (not an exact type comparison) so subclasses are accepted.
    if isinstance(val, bytes):
        return val
    if isinstance(val, str):
        return bytes(val, "utf-8")

    raise TypeError(f"type {val.__class__.__name__} cannot be serialized")
decode(val) staticmethod

decode gzipped b64 encoded string

Parameters:

Name Type Description Default
val Union[str, bytes]

value to be decoded

required

Returns:

Type Description
str

decoded value

Source code in api/server/cache/redis.py
@staticmethod
def decode(val: Union[str, bytes]) -> str:
    """Reverse encode(): base64-decode, gunzip, then read the result as utf-8.

    Args:
        val: base64 text (str or bytes) of a gzipped payload

    Returns:
        the original string"""
    compressed = base64.b64decode(CacheGzipRedis.getbytes(val))
    return gzip.decompress(compressed).decode("utf-8")
encode(val) staticmethod

encode value into gzipped b64

Parameters:

Name Type Description Default
val Union[str, bytes]

value to be encoded

required

Returns:

Type Description
str

string representing gzipped, b64 of the original string

Source code in api/server/cache/redis.py
@staticmethod
def encode(val: Union[str, bytes]) -> str:
    """Gzip the value at the highest compression level, then base64-encode it.

    Args:
        val: str or bytes to be encoded

    Returns:
        base64 text of the gzipped value
    """
    raw = CacheGzipRedis.getbytes(val)
    compressed = gzip.compress(raw, compresslevel=9)
    return base64.b64encode(compressed).decode("utf-8")

on_startup()

On startup call

Source code in api/server/cache/redis.py
def on_startup():
    """Create the redis client and start the repeating connectivity check.

    The client, the connection flag and the timer are all stored on the
    CacheGzipRedis class."""
    CacheGzipRedis._r = Redis(host=_host, port=_port, password=_password)

    def _probe_connection():
        # A successful ping marks the store as connected; any failure marks
        # it as disconnected.
        try:
            CacheGzipRedis._r.ping()
        except Exception:
            CacheGzipRedis._is_connected = False
        else:
            CacheGzipRedis._is_connected = True

    heartbeat_timer = RepeatTimer(5, _probe_connection)
    CacheGzipRedis._timer = heartbeat_timer
    heartbeat_timer.start()

terminate()

On terminate call

Source code in api/server/cache/redis.py
def terminate():
    """Cancel the connectivity timer, if one was ever created."""
    heartbeat_timer = CacheGzipRedis._timer
    if heartbeat_timer is None:
        return
    heartbeat_timer.cancel()

code_snippet

lookup_handler_fn(arm, scope)

Lookup (recursively if necessary) to find the Route responsible for a given scope.

Source code in api/server/code_snippet.py
def lookup_handler_fn(arm: Union[FastAPI, Mount, Route], scope: Scope):
    """Find the Route that fully matches a scope, descending into mounts.

    Args:
        arm: application, mount or route to search from
        scope: scope to match against

    Returns:
        ``(route, child_scope)`` for the first full match, otherwise None
    """
    if isinstance(arm, (FastAPI, Mount)):
        for candidate in arm.routes:
            outcome, matched_scope = candidate.matches(scope)
            if outcome != Match.FULL:
                continue
            # Search below the candidate with the scope it contributed merged in.
            found = lookup_handler_fn(candidate, {**scope, **matched_scope})
            if found:
                return found

    if not isinstance(arm, Route):
        return None

    outcome, matched_scope = arm.matches(scope)
    return (arm, matched_scope) if outcome == Match.FULL else None

get_sourcecode(request, lang='python')

Process a request, and transform it into the corresponding Python snippet

Parameters:

Name Type Description Default
request Request

Request

required

Returns:

Type Description
str

python snippet

Source code in api/server/code_snippet.py
def get_sourcecode(request: Request, lang: str="python") -> str:
    """Turn a request into the code snippet documented on its endpoint.

    The snippet is taken from the code blocks of the matched endpoint's
    docstring, and is prefixed with one ``name = <json value>`` line per
    path/query parameter.

    Args:
        request: Request
        lang: language of the code block to return

    Returns:
        parameter assignments, a blank line, then the code block

    Raises:
        NotFound: no route matches, or the endpoint has no block in lang
    """
    lookup = lookup_handler_fn(request.app, request.scope)
    if not lookup:
        raise NotFound("handler_fn lookup failed!")

    _, matched_scope = lookup
    for block_lang, block_code in yield_codeblock(matched_scope['endpoint'].__doc__):
        if block_lang != lang:
            continue

        # Path parameters first, query parameters second: on a name clash the
        # query value wins.
        variables = {
            **matched_scope.get("path_params", {}),
            **dict(request.query_params),
        }
        assignments = [
            f"{name} = {json.dumps(val)}"
            for name, val in variables.items()
            if name not in IGNORE_QUERY_KEYS
        ]
        prefix = "\n".join(assignments)
        return f"{prefix}\n\n{block_code}"
    raise NotFound

compounds

download

router = APIRouter(route_class=SapiCustomRoute, tags=['download']) module-attribute

HTTP download bundle router

cleanup(filepath)

On downloaded callback

Parameters:

Name Type Description Default
filepath Path

Path to cleanup

required
Source code in api/server/compounds/download.py
def cleanup(filepath: Path):
    """On downloaded callback

    Removes the file. A file that is already gone is not an error: the same
    path may be scheduled for cleanup more than once.

    Args:
        filepath: Path to cleanup"""
    try:
        filepath.unlink()
    except FileNotFoundError:
        # Nothing left to remove.
        pass

get_download_bundle(space_id, parcellation_id, region_id=None, feature_id=None, *, background, func)

Prepare the bundle. Given a specification, prepare/bundle according to the specification.

Source code in api/server/compounds/download.py
@router.get("")
@version(*FASTAPI_VERSION)
@router_decorator(ROLE, func=download_all, queue_as_async=(ROLE=="server"))
def get_download_bundle(space_id: str, parcellation_id: str, region_id: str=None, feature_id: str=None, *, background: BackgroundTasks, func):
    """Prepare the bundle. Given a specification, prepare/bundle according to the specification."""
    outcome = func(space_id=space_id, parcellation_id=parcellation_id, region_id=region_id, feature_id=feature_id)
    try:
        bundle_path = Path(outcome)
    except Exception as e:
        raise HTTPException(500, detail=str(e))

    # TODO returning different shape is kind of ugly
    # fix in next version
    # When the outcome is not an existing file, it is handed back as a task id.
    bundle_on_disk = bundle_path.exists() and bundle_path.is_file()
    if not bundle_on_disk:
        return TaskIdResp(task_id=outcome)

    # Remove the bundle once the response has been sent.
    background.add_task(cleanup, bundle_path)
    download_headers = {
        "content-type": "application/octet-stream",
        "content-disposition": f'attachment; filename="{bundle_path.name}"'
    }
    return FileResponse(outcome, headers=download_headers)

get_download_progress(task_id)

Get download task progress with task_id

Source code in api/server/compounds/download.py
@router.get("/{task_id:str}")
@version(*FASTAPI_VERSION)
def get_download_progress(task_id:str):
    """Get download task progress with task_id"""
    res = download_all.AsyncResult(task_id)
    if res.state == "FAILURE":
        # NOTE(review): assumes download_all is a Celery task. By default
        # AsyncResult.get() re-raises the task's exception, which would skip
        # forget() and the HTTPException below; propagate=False returns the
        # stored exception instead.
        result = res.get(propagate=False)
        res.forget()
        raise HTTPException(500, detail=str(result))
    if res.state == "SUCCESS":
        return TaskIdResp(task_id=task_id, status="SUCCESS")
    # no result yet
    return TaskIdResp(task_id=task_id, status="PENDING")

get_download_result(task_id, background)

Download the bundle

Source code in api/server/compounds/download.py
@router.get("/{task_id:str}/download")
@version(*FASTAPI_VERSION)
def get_download_result(task_id:str, background: BackgroundTasks):
    """Download the bundle"""
    res = download_all.AsyncResult(task_id)

    if res.state == "FAILURE":
        # NOTE(review): assumes download_all is a Celery task. By default
        # AsyncResult.get() re-raises the task's exception, which would skip
        # forget() and the HTTPException below; propagate=False returns the
        # stored exception instead.
        result = res.get(propagate=False)
        res.forget()
        raise HTTPException(500, detail=str(result))
    if res.state == "SUCCESS":
        result = res.get()
        # The result is forgotten once read, so a bundle can be fetched once.
        res.forget()

        path_to_file = Path(result)
        if path_to_file.exists() and path_to_file.is_file():

            # Remove the bundle once the response has been sent.
            background.add_task(cleanup, path_to_file)

            headers={
                "content-type": "application/octet-stream",
                "content-disposition": f'attachment; filename="{path_to_file.name}"'
            }
            return FileResponse(result, headers=headers)
        else:
            raise HTTPException(500, detail=f"file {path_to_file} not found!")

    # no result yet
    raise HTTPException(404,detail=f"Not found {task_id}")

const

FASTAPI_VERSION = (3, 0) module-attribute

siibra-api version

core

atlas

TAGS = ['atlas'] module-attribute

HTTP atlas routes tags

router = APIRouter(route_class=SapiCustomRoute) module-attribute

HTTP atlas routes router

get_all_atlases(*, func)

HTTP get all atlases

import siibra
atlases = [atlas for atlas in siibra.atlases]
Source code in api/server/core/atlas.py
@router.get("", tags=TAGS, response_model=Page[SiibraAtlasModel])
@version(*FASTAPI_VERSION)
@router_decorator(ROLE, func=all_atlases)
def get_all_atlases(*, func):
    """HTTP get all atlases

    ```python
    import siibra
    atlases = [atlas for atlas in siibra.atlases]
    ```"""
    # A missing callable is reported as a server error, not a crash.
    if func is None:
        raise HTTPException(500, "func: None passed")
    atlases = func()
    return paginate(atlases)

get_single_atlas(atlas_id, *, func)

HTTP get a single atlas

import siibra
atlas = siibra.atlases[atlas_id]
Source code in api/server/core/atlas.py
@router.get("/{atlas_id:lazy_path}", tags=TAGS, response_model=SiibraAtlasModel)
@version(*FASTAPI_VERSION)
@router_decorator(ROLE, func=single_atlas)
def get_single_atlas(atlas_id: str, *, func):
    """HTTP get a single atlas

    ```python
    import siibra
    atlas = siibra.atlases[atlas_id]
    ```"""
    # A missing callable is reported as a server error, not a crash.
    if func is None:
        raise HTTPException(500, "func: None passed")
    atlas = func(atlas_id)
    return atlas

parcellation

TAGS = ['parcellation'] module-attribute

HTTP parcellation routes tags

router = APIRouter(route_class=SapiCustomRoute, tags=TAGS) module-attribute

HTTP parcellation routes router

get_all_parcellations(func)

HTTP get all parcellations

import siibra
parcellations = [parcellation for parcellation in siibra.parcellations]
Source code in api/server/core/parcellation.py
@router.get("", response_model=Page[SiibraParcellationModel])
@version(*FASTAPI_VERSION)
@router_decorator(ROLE, func=all_parcellations)
def get_all_parcellations(func):
    """HTTP get all parcellations

    ```python
    import siibra
    parcellations = [parcellation for parcellation in siibra.parcellations]
    ```"""
    # A missing callable is reported as a server error, not a crash.
    if func is None:
        raise HTTPException(500, "func: None passed")
    parcellations = func()
    return paginate(parcellations)

get_single_parcellation(parcellation_id, *, func)

HTTP get a single parcellation

import siibra
parcellation = siibra.parcellations[parcellation_id]
Source code in api/server/core/parcellation.py
@router.get("/{parcellation_id:lazy_path}", response_model=SiibraParcellationModel)
@version(*FASTAPI_VERSION)
@router_decorator(ROLE, func=single_parcellation)
def get_single_parcellation(parcellation_id: str, *, func):
    """HTTP get a single parcellation

    ```python
    import siibra
    parcellation = siibra.parcellations[parcellation_id]
    ```"""
    # A missing callable is reported as a server error, not a crash.
    # The message now matches the other endpoints ("passed", was "passsed").
    if func is None:
        raise HTTPException(500, "func: None passed")
    return func(parcellation_id)

region

TAGS = ['region'] module-attribute

HTTP region routes tags

router = APIRouter(route_class=SapiCustomRoute, tags=TAGS) module-attribute

HTTP region routes router

get_all_regions(parcellation_id, find=None, func=lambda: [])

HTTP get all regions

import siibra

regions = [region for region in siibra.parcellations[parcellation_id]]
Source code in api/server/core/region.py
@router.get("", response_model=Page[ParcellationEntityVersionModel])
@version(*FASTAPI_VERSION)
@router_decorator(ROLE, func=all_regions)
def get_all_regions(parcellation_id: str, find:str=None, func=lambda:[]):
    """HTTP get all regions

    ```python
    import siibra

    regions = [region for region in siibra.parcellations[parcellation_id]]
    ```"""
    # find is forwarded as a keyword argument.
    regions = func(parcellation_id, find=find)
    return paginate(regions)

get_all_features_region(parcellation_id, region_id, func=lambda: [])

HTTP get all features of a single region

import siibra

region = siibra.get_region(parcellation_id, region_id)
features = siibra.features.get(region, siibra.features.Feature)
Source code in api/server/core/region.py
@router.get("/{region_id:lazy_path}/features", response_model=Page[FeatureIdResponseModel])
@version(*FASTAPI_VERSION)
@router_decorator(ROLE, func=partial(get_all_all_features, space=None))
def get_all_features_region(parcellation_id: str, region_id: str, func=lambda:[]):
    """HTTP get all features of a single region

    ```python
    import siibra

    region = siibra.get_region(parcellation_id, region_id)
    features = siibra.features.get(region, siibra.features.Feature)
    ```"""
    # Both identifiers are forwarded as keyword arguments.
    features = func(parcellation_id=parcellation_id, region_id=region_id)
    return paginate(features)

HTTP get_related_regions of the specified region

import siibra
region = siibra.get_region(parcellation_id, region_id)
related_regions = [related for related in region.get_related_regions()]
Source code in api/server/core/region.py
@router.get("/{region_id:lazy_path}/related", response_model=Page[RegionRelationAsmtModel])
@version(*FASTAPI_VERSION)
@router_decorator(ROLE, func=get_related_regions)
def get_related_region(parcellation_id: str, region_id: str, func=lambda:[]):
    """HTTP get_related_regions of the specified region

    ```python
    import siibra
    region = siibra.get_region(parcellation_id, region_id)
    related_regions = [related for related in region.get_related_regions()]
    ```"""
    # Both identifiers are forwarded as keyword arguments.
    related = func(parcellation_id=parcellation_id, region_id=region_id)
    return paginate(related)

get_single_regions(parcellation_id, region_id, space_id=None, func=lambda: None)

HTTP get a single region

import siibra
region = siibra.get_region(parcellation_id, region_id)
Source code in api/server/core/region.py
@router.get("/{region_id:lazy_path}", response_model=ParcellationEntityVersionModel)
@version(*FASTAPI_VERSION)
@router_decorator(ROLE, func=single_region)
def get_single_regions(parcellation_id: str, region_id: str, space_id: Optional[str]=None, func=lambda:None):
    """HTTP get a single region

    ```python
    import siibra
    region = siibra.get_region(parcellation_id, region_id)
    ```"""
    # All three values are forwarded positionally, in this order.
    region = func(parcellation_id, region_id, space_id)
    return region

space

TAGS = ['space'] module-attribute

HTTP space routes tags

router = APIRouter(route_class=SapiCustomRoute, tags=TAGS) module-attribute

HTTP space routes router

get_all_spaces(*, func)

HTTP get all spaces

import siibra
spaces = [space for space in siibra.spaces]
Source code in api/server/core/space.py
@router.get("", response_model=Page[CommonCoordinateSpaceModel])
@version(*FASTAPI_VERSION)
@router_decorator(ROLE, func=all_spaces)
def get_all_spaces(*, func):
    """HTTP get all spaces

    ```python
    import siibra
    spaces = [space for space in siibra.spaces]
    ```"""
    # A missing callable is reported as a server error, not a crash.
    if func is None:
        raise HTTPException(500, "func: None passed")
    spaces = func()
    return paginate(spaces)

get_single_space(space_id, *, func)

HTTP get a single space

import siibra
space = siibra.spaces[space_id]
Source code in api/server/core/space.py
@router.get("/{space_id:lazy_path}", response_model=CommonCoordinateSpaceModel)
@version(*FASTAPI_VERSION)
@router_decorator(ROLE, func=single_space)
def get_single_space(space_id: str, *, func):
    """HTTP get a single space

    ```python
    import siibra
    space = siibra.spaces[space_id]
    ```"""
    # A missing callable is reported as a server error, not a crash.
    # The message now matches the other endpoints ("passed", was "passsed").
    if func is None:
        raise HTTPException(500, "func: None passed")
    return func(space_id)

features

TAGS = ['feature'] module-attribute

HTTP feature tags

router = APIRouter(route_class=SapiCustomRoute, tags=TAGS) module-attribute

HTTP feature router

FeatureMetaModel

Meta feature type

Source code in api/server/features/__init__.py
class FeatureMetaModel(BaseModel):
    """Meta feature type"""
    # Entries of this shape are produced by `get_all_feature_types`, which
    # merges the output of `retrieve_routes` with each feature type item.

    # Feature type name; may be dot separated (`retrieve_routes` splits it on ".").
    name: str
    # Last dot separated segment of `name`, passed through `depascal`.
    display_name: str
    # Path of the matching route, when `retrieve_routes` found one.
    path: Optional[str]
    # Names of all query parameters of the matching route.
    query_params: Optional[List[str]]
    # Subset of `query_params` whose parameters are flagged as required.
    required_query_params: Optional[List[str]]
    # Subset of `query_params` whose parameters are not flagged as required.
    optional_query_params: Optional[List[str]]
    # Names of the path parameters of the matching route.
    path_params: Optional[List[str]]
    # NOTE(review): not filled in by any code visible here - presumably the
    # feature category supplied with the feature type item; confirm.
    category: Optional[str]

CategoryModel

Category model

Source code in api/server/features/__init__.py
class CategoryModel(BaseModel):
    """Category model"""
    # NOTE(review): no usage of this model is visible here - presumably it
    # groups feature types under a category name; confirm against callers.

    # Name of the category.
    name: str
    # Feature type descriptions attached to this category.
    feature: List[FeatureMetaModel]

depascal(input)

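Pascal to space separated words (e.g. RegionalConnectivity becomes "Regional Connectivity"; no underscores are produced)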

Parameters:

Name Type Description Default
input str

string in PascalCase

required

Returns:

Type Description
str

the words of the input separated by spaces (None if the input is empty)

Source code in api/server/features/__init__.py
def depascal(input: str) -> Optional[str]:
    """Split a PascalCase identifier into space separated words

    Despite the historical "Pascal to snake" wording, no underscores are
    produced and the casing is left untouched: `RegionalConnectivity` becomes
    `Regional Connectivity` and `MRIImage` becomes `MRI Image`.

    Args:
        input: string in PascalCase

    Returns:
        the words of `input` separated by spaces, or None if `input` is empty or None"""
    if not input:
        return None
    # First pass: put a space in front of every capitalised word (`Xyz`).
    spaced_words = re.sub(r'([A-Z][a-z]+)', r' \1', input)
    # Second pass: put a space in front of every run of two or more capitals
    # (acronyms such as `MRI`), which the first pass leaves untouched.
    spaced_acronyms = re.sub(r'([A-Z][A-Z]+)', r' \1', spaced_words)
    # Both passes may leave a leading space; drop it.
    return spaced_acronyms.strip()

retrieve_routes(request, feature_name)

Retrieve the meta info on feature types, given feature_name

Parameters:

Name Type Description Default
feature_name str

feature identifier (e.g. name)

required

Returns:

Type Description
Dict

Meta info

Source code in api/server/features/__init__.py
def retrieve_routes(request: Request, feature_name: str) -> Dict:
    """Look up routing meta info for a feature type

    The dot separated segments of `feature_name` are tried from right to left;
    the first registered route whose path ends with `/<segment>` wins.

    Args:
        request: incoming request, used to reach the routes of the app
        feature_name: feature identifier (e.g. name)

    Returns:
        Meta info (path, query/path parameter names), or an empty dict if no route matches"""
    for segment in reversed(feature_name.split(".")):
        suffix = f"/{segment}"
        for route in request.app.routes:
            if not route.path.endswith(suffix):
                continue
            query_params = route.dependant.query_params
            return {
                'path': route.path,
                'query_params': [p.name for p in query_params],
                'path_params': [p.name for p in route.dependant.path_params],
                'required_query_params': [p.name for p in query_params if p.required],
                'optional_query_params': [p.name for p in query_params if not p.required],
            }
    return {}

get_all_feature_types(request, func)

Get meta info of all feature types

Source code in api/server/features/__init__.py
@router.get("/_types", response_model=Page[FeatureMetaModel])
@version(*FASTAPI_VERSION)
@router_decorator(ROLE, func=all_feature_types)
def get_all_feature_types(request: Request, func):
    """Get meta info of all feature types, sorted by name"""
    feature_types = sorted(func(), key=lambda obj: obj['name'])

    enriched = []
    for item in feature_types:
        type_name = item['name']
        enriched.append({
            # Routing info goes first so that keys present on the item win.
            **retrieve_routes(request, type_name),
            **item,
            # Human readable label derived from the last dotted segment.
            'display_name': depascal(type_name.split(".")[-1]),
        })
    return paginate(enriched)

get_all_connectivity_features(parcellation_id, region_id=None, type=None, func=lambda: []) async

Get all connectivity features

Source code in api/server/features/__init__.py
@router.get("/RegionalConnectivity", response_model=Page[RegionalConnectivityModels])
@version(*FASTAPI_VERSION)
@wrap_feature_category("RegionalConnectivity")
@async_router_decorator(ROLE, func=partial(all_features, space_id=None))
async def get_all_connectivity_features(
    parcellation_id: str,
    region_id: str = None,
    type: Optional[str] = None,
    func=lambda: [],
):
    """Get all connectivity features"""
    # Forward `type` as a plain string, or None when it is empty/absent.
    feature_type = None
    if type:
        feature_type = str(type)
    features = await func(parcellation_id=parcellation_id, region_id=region_id, type=feature_type)
    return paginate(features)

get_single_connectivity_feature(parcellation_id, feature_id, region_id=None, type=None, func=lambda: None) async

Get single connectivity feature

import siibra

feature = siibra.features.Feature._get_instance_by_id(feature_id)
Source code in api/server/features/__init__.py
@router.get("/RegionalConnectivity/{feature_id:lazy_path}", response_model=RegionalConnectivityModels)
@version(*FASTAPI_VERSION)
@wrap_feature_category("RegionalConnectivity")
@async_router_decorator(ROLE, func=partial(single_feature, space_id=None))
async def get_single_connectivity_feature(
    parcellation_id: str,
    feature_id: str,
    region_id: str = None,
    type: Optional[str] = None,
    func=lambda: None,
):
    """Get single connectivity feature

    ```python
    import siibra

    feature = siibra.features.Feature._get_instance_by_id(feature_id)
    ```"""
    # Forward `type` as a plain string, or None when it is empty/absent.
    feature_type = None
    if type:
        feature_type = str(type)
    feature = await func(
        parcellation_id=parcellation_id,
        feature_id=feature_id,
        region_id=region_id,
        type=feature_type,
    )
    return feature

get_all_corticalprofile_features(parcellation_id, region_id, type=None, func=lambda: []) async

Get all CorticalProfile features

Source code in api/server/features/__init__.py
@router.get("/CorticalProfile", response_model=Page[CortialProfileModels])
@version(*FASTAPI_VERSION)
@wrap_feature_category("CorticalProfile")
@async_router_decorator(ROLE, func=partial(all_features, space_id=None))
async def get_all_corticalprofile_features(
    parcellation_id: str,
    region_id: str,
    type: Optional[str] = None,
    func=lambda: [],
):
    """Get all CorticalProfile features"""
    # Forward `type` as a plain string, or None when it is empty/absent.
    feature_type = None
    if type:
        feature_type = str(type)
    features = await func(parcellation_id=parcellation_id, region_id=region_id, type=feature_type)
    return paginate(features)

get_single_corticalprofile_feature(parcellation_id, region_id, feature_id, type=None, func=lambda: None) async

Get a single CorticalProfile feature

import siibra

feature = siibra.features.Feature._get_instance_by_id(feature_id)
Source code in api/server/features/__init__.py
@router.get("/CorticalProfile/{feature_id:lazy_path}", response_model=CortialProfileModels)
@version(*FASTAPI_VERSION)
@wrap_feature_category("CorticalProfile")
@async_router_decorator(ROLE, func=partial(single_feature, space_id=None))
async def get_single_corticalprofile_feature(
    parcellation_id: str,
    region_id: str,
    feature_id: str,
    type: Optional[str] = None,
    func=lambda: None,
):
    """Get a single CorticalProfile feature

    ```python
    import siibra

    feature = siibra.features.Feature._get_instance_by_id(feature_id)
    ```"""
    # Forward `type` as a plain string, or None when it is empty/absent.
    feature_type = None
    if type:
        feature_type = str(type)
    feature = await func(
        parcellation_id=parcellation_id,
        region_id=region_id,
        feature_id=feature_id,
        type=feature_type,
    )
    return feature

get_all_tabular(parcellation_id, region_id, type=None, func=lambda: []) async

Get all tabular features

Source code in api/server/features/__init__.py
@router.get("/Tabular", response_model=Page[TabularModels])
@version(*FASTAPI_VERSION)
@wrap_feature_category("Tabular")
@async_router_decorator(ROLE, func=partial(all_features, space_id=None))
async def get_all_tabular(
    parcellation_id: str,
    region_id: str,
    type: Optional[str] = None,
    func=lambda: [],
):
    """Get all tabular features"""
    # Forward `type` as a plain string, or None when it is empty/absent.
    feature_type = None
    if type:
        feature_type = str(type)
    features = await func(parcellation_id=parcellation_id, region_id=region_id, type=feature_type)
    return paginate(features)

get_single_tabular(parcellation_id, region_id, feature_id, type=None, func=lambda: None) async

Get a single tabular feature

import siibra

feature = siibra.features.Feature._get_instance_by_id(feature_id)
Source code in api/server/features/__init__.py
@router.get("/Tabular/{feature_id:lazy_path}", response_model=TabularModels)
@version(*FASTAPI_VERSION)
@wrap_feature_category("Tabular")
@async_router_decorator(ROLE, func=partial(single_feature, space_id=None))
async def get_single_tabular(
    parcellation_id: str,
    region_id: str,
    feature_id: str,
    type: Optional[str] = None,
    func=lambda: None,
):
    """Get a single tabular feature

    ```python
    import siibra

    feature = siibra.features.Feature._get_instance_by_id(feature_id)
    ```"""
    # Forward `type` as a plain string, or None when it is empty/absent.
    feature_type = None
    if type:
        feature_type = str(type)
    feature = await func(
        parcellation_id=parcellation_id,
        region_id=region_id,
        feature_id=feature_id,
        type=feature_type,
    )
    return feature

get_all_voi(space_id, bbox, type=None, func=lambda: [])

Get all Image features

import siibra
import json
from siibra.locations.boundingbox import BoundingBox

boundingbox = BoundingBox(json.loads(bbox), space_id)

features = siibra.features.get(boundingbox, type)
Source code in api/server/features/__init__.py
@router.get("/Image", response_model=Page[SiibraVoiModel])
@version(*FASTAPI_VERSION)
@wrap_feature_category("Image")
@router_decorator(ROLE, func=partial(all_features, parcellation_id=None, region_id=None, restrict_space=True))
def get_all_voi(
    space_id: str,
    bbox: str,
    type: Optional[str] = None,
    func=lambda: [],
):
    """Get all Image features

    ```python
    import siibra
    import json
    from siibra.locations.boundingbox import BoundingBox

    boundingbox = BoundingBox(json.loads(bbox), space_id)

    features = siibra.features.get(boundingbox, type)
    ```"""
    # Forward `type` as a plain string, or None when it is empty/absent.
    feature_type = None
    if type:
        feature_type = str(type)
    # `bbox` is passed through untouched as the string received from the client.
    features = func(space_id=space_id, type=feature_type, bbox=bbox)
    return paginate(features)

get_single_voi(space_id, feature_id, type=None, func=lambda: []) async

Get a single Image feature

import siibra

feature = siibra.features.Feature._get_instance_by_id(feature_id)
Source code in api/server/features/__init__.py
@router.get("/Image/{feature_id:lazy_path}", response_model=SiibraVoiModel)
@version(*FASTAPI_VERSION)
@wrap_feature_category("Image")
@async_router_decorator(ROLE, func=partial(single_feature, parcellation_id=None, region_id=None))
async def get_single_voi(
    space_id: str,
    feature_id: str,
    type: Optional[str] = None,
    func=lambda: [],
):
    """Get a single Image feature

    ```python
    import siibra

    feature = siibra.features.Feature._get_instance_by_id(feature_id)
    ```"""
    # Forward `type` as a plain string, or None when it is empty/absent.
    feature_type = None
    if type:
        feature_type = str(type)
    feature = await func(space_id=space_id, feature_id=feature_id, type=feature_type)
    return feature

get_all_gene(parcellation_id, region_id, gene, func=lambda: []) async

Get all GeneExpressions features

Source code in api/server/features/__init__.py
@router.get("/GeneExpressions", response_model=Page[SiibraTabularModel])
@version(*FASTAPI_VERSION)
@wrap_feature_category("GeneExpressions")
@async_router_decorator(ROLE, func=partial(all_features, space_id=None, type="GeneExpressions"))
async def get_all_gene(
    parcellation_id: str,
    region_id: str,
    gene: str,
    func=lambda: [],
):
    """Get all GeneExpressions features"""
    # The feature type is already fixed by the `partial` in the decorator, so
    # only the identifiers and the gene are forwarded.
    features = await func(parcellation_id=parcellation_id, region_id=region_id, gene=gene)
    return paginate(features)

get_single_gene(parcellation_id, region_id, feature_id, gene, func=lambda: []) async

Get a single GeneExpressions feature

import siibra

feature = siibra.features.Feature._get_instance_by_id(feature_id)
Source code in api/server/features/__init__.py
# The response model is the bare feature model: this route returns one
# feature (it is bound to `single_feature`, like the other single-feature
# routes), not a page of features.
@router.get("/GeneExpressions/{feature_id:lazy_path}", response_model=SiibraTabularModel)
@version(*FASTAPI_VERSION)
@wrap_feature_category("GeneExpressions")
@async_router_decorator(ROLE, func=partial(single_feature, space_id=None, type="GeneExpressions"))
async def get_single_gene(parcellation_id: str, region_id: str, feature_id: str, gene: str, func=lambda: []):
    """Get a single GeneExpressions feature

    ```python
    import siibra

    feature = siibra.features.Feature._get_instance_by_id(feature_id)
    ```"""
    # The feature type is already fixed by the `partial` in the decorator, so
    # only the identifiers and the gene are forwarded.
    return await func(parcellation_id=parcellation_id, region_id=region_id, feature_id=feature_id, gene=gene)

get_all_ebrains_df(parcellation_id, region_id, func=lambda: []) async

Get all EbrainsDataFeatures

Source code in api/server/features/__init__.py
@router.get("/EbrainsDataFeature", response_model=Page[SiibraEbrainsDataFeatureModel])
@version(*FASTAPI_VERSION)
@wrap_feature_category("EbrainsDataFeature")
@async_router_decorator(ROLE, func=partial(all_features, space_id=None, type="EbrainsDataFeature"))
async def get_all_ebrains_df(
    parcellation_id: str,
    region_id: str,
    func=lambda: [],
):
    """Get all EbrainsDataFeatures"""
    # The feature type is already fixed by the `partial` in the decorator.
    features = await func(parcellation_id=parcellation_id, region_id=region_id)
    return paginate(features)

get_single_ebrains_df(parcellation_id, region_id, feature_id, func=lambda: None) async

Get a single EbrainsDataFeature

import siibra

feature = siibra.features.Feature._get_instance_by_id(feature_id)
Source code in api/server/features/__init__.py
# The response model is the bare feature model: this route returns one
# feature (it is bound to `single_feature`, like the other single-feature
# routes), not a page of features.
@router.get("/EbrainsDataFeature/{feature_id:lazy_path}", response_model=SiibraEbrainsDataFeatureModel)
@version(*FASTAPI_VERSION)
@wrap_feature_category("EbrainsDataFeature")
@async_router_decorator(ROLE, func=partial(single_feature, space_id=None, type="EbrainsDataFeature"))
async def get_single_ebrains_df(parcellation_id: str, region_id: str, feature_id: str, func=lambda: None):
    """Get a single EbrainsDataFeature

    ```python
    import siibra

    feature = siibra.features.Feature._get_instance_by_id(feature_id)
    ```"""
    # The feature type is already fixed by the `partial` in the decorator.
    return await func(parcellation_id=parcellation_id, region_id=region_id, feature_id=feature_id)

get_single_feature(feature_id, request, func) async

Get a single feature, from feature_id

import siibra

feature = siibra.features.Feature._get_instance_by_id(feature_id)
Source code in api/server/features/__init__.py
@router.get(
    "/{feature_id:lazy_path}",
    response_model=FeatureIdResponseModel,
    tags=TAGS,
    description="""
This endpoint allows detail of a single feature to be fetched, without the necessary context. However, the tradeoff for this endpoint is:

- the endpoint typing is the union of all possible return types
- the client needs to supply any necessary query param (e.g. subject for regional connectivity, gene for gene expression etc)
""",
)
@version(*FASTAPI_VERSION)
@async_router_decorator(ROLE, func=get_single_feature_from_id)
async def get_single_feature(feature_id: str, request: Request, func):
    """Get a single feature, from feature_id

    ```python
    import siibra

    feature = siibra.features.Feature._get_instance_by_id(feature_id)
    ```
    """
    if not func:
        raise HTTPException(500, detail="get_single_feature, func not passed along")
    try:
        # Every query parameter of the request is forwarded as a keyword argument.
        extra_kwargs = dict(request.query_params)
        feature = await func(feature_id=feature_id, **extra_kwargs)
        return feature
    except Exception as e:
        # Any failure of the lookup is reported to the client as 400.
        raise HTTPException(400, detail=str(e))

get_single_feature_plot(feature_id, request, func, template=PlotlyTemplate.plotly) async

Get plotly spec from feature_id

Source code in api/server/features/__init__.py
@router.get(
    "/{feature_id:lazy_path}/plotly",
    description="""
Get the plotly specification of the plot.

For the appearance of the template, see [https://plotly.com/python/templates/](https://plotly.com/python/templates/)
""",
)
@async_router_decorator(ROLE, func=get_single_feature_plot_from_id)
async def get_single_feature_plot(
    feature_id: str,
    request: Request,
    func,
    template: PlotlyTemplate = PlotlyTemplate.plotly,
):
    """Get plotly spec from feature_id"""
    try:
        # Query parameters are forwarded as keyword arguments; `template`
        # always takes the value resolved here, unwrapped from the enum if needed.
        kwargs = dict(request.query_params)
        if isinstance(template, PlotlyTemplate):
            kwargs['template'] = template.value
        else:
            kwargs['template'] = template
        return await func(feature_id=feature_id, **kwargs)
    except NotFound as e:
        raise HTTPException(404, detail=str(e))
    except Exception as e:
        raise HTTPException(500, detail=str(e))

get_single_feature_intents(feature_id, request, func) async

Get feature intents from feature_id

Source code in api/server/features/__init__.py
@router.get("/{feature_id:lazy_path}/intents", response_model=Page[ColorizationIntent])
@async_router_decorator(ROLE, func=get_single_feature_intents_from_id)
async def get_single_feature_intents(feature_id: str, request: Request, func):
    """Get feature intents from feature_id"""
    try:
        # Every query parameter of the request is forwarded as a keyword argument.
        extra_kwargs = dict(request.query_params)
        intents = await func(feature_id=feature_id, **extra_kwargs)
        return paginate(intents)
    except NotFound as e:
        raise HTTPException(404, detail=str(e))
    except Exception as e:
        raise HTTPException(500, detail=str(e))

get_single_feature_download(feature_id, request, func) async

Get download zip

Source code in api/server/features/__init__.py
@router.get(
    "/{feature_id:lazy_path}/download",
    description="""
Get a zip archive of the downloadables from a feature.
""",
)
@async_router_decorator(ROLE, func=get_single_feature_download_zip_path)
async def get_single_feature_download(feature_id: str, request: Request, func):
    """Get download zip"""
    try:
        # `func` resolves to the path of the zip archive, which is then
        # served under a file name derived from the feature id.
        query_kwargs = dict(request.query_params)
        zip_path = await func(feature_id=feature_id, **query_kwargs)
        download_name = f"download-{feature_id}.zip"
        return FileResponse(zip_path, filename=download_name)
    except NotFound as e:
        raise HTTPException(404, detail=str(e))
    except Exception as e:
        raise HTTPException(500, detail=str(e))

util

wrap_feature_category(feature_category)

Wrap feature category

Parameters:

Name Type Description Default
feature_category str

string representing the type to be passed as keyword argument

required
Source code in api/server/features/util.py
def wrap_feature_category(feature_category: str):
    """Decorator factory defaulting the `type` keyword to a feature category

    When the decorated callable declares a `type` parameter and is called
    without `type` (or with `type=None`), `feature_category` is passed as
    `type`. Callables without a `type` parameter are called unchanged. Both
    plain and coroutine functions are supported.

    Args:
        feature_category: string representing the type to be passed as keyword argument
    """
    def outer(fn):
        # If `type` is not present in the original fn, never add it as kwarg.
        accepts_type = "type" in signature(fn).parameters

        def with_default_type(kwargs):
            # A missing or None `type` is read as "all features of this
            # category", hence the category itself is used as type.
            if accepts_type and kwargs.get("type") is None:
                return {**kwargs, "type": feature_category}
            return kwargs

        if iscoroutinefunction(fn):
            @wraps(fn)
            async def inner(*args, **kwargs):
                return await fn(*args, **with_default_type(kwargs))
            return inner

        @wraps(fn)
        def inner(*args, **kwargs):
            return fn(*args, **with_default_type(kwargs))
        return inner
    return outer

metrics

Singleton

Timer singleton

Source code in api/server/metrics.py
class Singleton:
    """Timer singleton

    Class-level cache for the values gathered by the cron-decorated static
    methods below; `refresh_metric` renders them into `cached_metrics`.
    """
    # Rendered output of `generate_latest`; None until `refresh_metric` has completed once.
    cached_metrics=None
    # First level folder name -> size as reported by `du -s`.
    cached_du: Dict[str, int] = {}
    # `st_mtime` of metrics.txt at the last successful read.
    res_mtime: float = None
    # Pod name -> (cpu reading, memory in bytes).
    cached_res_usage: Dict[str, Tuple[float, float]] = {}

    @staticmethod
    @cron.minutely
    @has_metric_dir
    @is_server
    def parse_metrics_txt():
        """Parse metrics.txt into `cached_res_usage` and record its mtime in `res_mtime`."""
        def parse_cpu(text: str) -> float:
            """Parse a cpu reading carrying an `m` suffix (e.g. `12m`)."""
            # NOTE(review): the `m` suffix is presumably millicores - confirm.
            if text.endswith("m"):
                return float(text.replace("m", ""))
            raise ValueError(f"Cannot parse cpu text {text}")

        def parse_memory(text: str) -> float:
            """Parse a memory reading carrying an `Mi` suffix into bytes."""
            if text.endswith("Mi"):
                return float(text.replace("Mi", "")) * 1024 * 1024
            raise ValueError(f"Cannot parse memory text {text}")

        def parse_text(text: str):
            """Replace the content of `cached_res_usage` with the rows of `text`."""
            titles = ["NAME", "CPU", "MEMORY"]

            Singleton.cached_res_usage.clear()

            for line in text.splitlines():
                # Skip the header row and blank lines.
                if not line.strip() or all(t in line for t in titles):
                    continue
                try:
                    # Unpacking happens inside the try so that one malformed
                    # row is logged and skipped instead of aborting all rows.
                    podname, cpuusage, memoryusage = line.split()
                    # Stored as floats, as declared by the annotation of
                    # `cached_res_usage`; the gauges are set from these values.
                    Singleton.cached_res_usage[podname] = (
                        parse_cpu(cpuusage),
                        parse_memory(memoryusage),
                    )
                except Exception as e:
                    general_logger.error(f"Cannot parse line: {str(e)}")

        try:
            metrics_path = Path(MONITOR_FIRSTLVL_DIR) / "metrics.txt"
            metric_text = metrics_path.read_text()
            Singleton.res_mtime = metrics_path.lstat().st_mtime
            parse_text(metric_text)

        except FileNotFoundError:
            # No metrics.txt: nothing to parse, previous values are kept.
            pass
        except Exception as e:
            general_logger.error(f"Reading metrics.txt error: {str(e)}")

    @staticmethod
    @cron.ten_minutely
    @has_metric_dir
    @is_server
    def first_lvl_du():
        """Record the `du -s` size of every first level folder in `cached_du`."""

        try:
            dirs = os.listdir(MONITOR_FIRSTLVL_DIR)
        except Exception as e:
            # `Logger.warn` is deprecated (removed in Python 3.13); use `warning`.
            general_logger.warning(f"Failed to listdir of {MONITOR_FIRSTLVL_DIR}: {str(e)}")
            return

        for dirname in dirs:
            if dirname == "lost+found":
                continue
            path_to_dir = Path(MONITOR_FIRSTLVL_DIR) / dirname
            try:
                # NOTE(review): `du -s` without `-b` usually reports blocks
                # rather than bytes, while the gauge help text says "Bytes" -
                # confirm the unit expected by the dashboards.
                result = run(["du", "-s", str(path_to_dir)], capture_output=True, text=True)
                # Output is "<size>\t<path>"; only the size is kept.
                size_b, *_ = result.stdout.split("\t")
                Singleton.cached_du[dirname] = int(size_b)
            except Exception as e:
                general_logger.warning(f"Failed to check du of {str(path_to_dir)}: {str(e)}")

    @staticmethod
    @cron.minutely
    @is_server
    def refresh_metric():
        """Refresh metrics.

        Builds a fresh registry from the cached resource usage, the cached
        disk usage, the broker queue lengths and the celery inspection
        results, then stores the rendered output in `cached_metrics`."""
        from api.worker.app import app
        from prometheus_client import Gauge, CollectorRegistry, generate_latest

        registry = CollectorRegistry()
        common_kwargs = {
            'registry':registry,
            'namespace':NAME_SPACE,
        }

        cpu_usage = Gauge("resource_usage_cpu",
                        "CPU usage by pods",
                        labelnames=("podname",),
                        **common_kwargs)

        memory_usage = Gauge("resource_usage_memory",
                        "RAM usage by pods",
                        labelnames=("podname",),
                        **common_kwargs)

        for podname, (cpu, ram) in Singleton.cached_res_usage.items():
            cpu_usage.labels(podname=podname).set(cpu)
            memory_usage.labels(podname=podname).set(ram)

        res_timestamp = Gauge("resource_usage_timestamp",
                            "Timestamp", **common_kwargs)
        if Singleton.res_mtime:
            res_timestamp.set(Singleton.res_mtime)

        du = Gauge("firstlvl_folder_disk_usage",
                "Bytes used by first level folders",
                labelnames=("folder_name",),
                **common_kwargs)
        for folder_name, size_b in Singleton.cached_du.items():
            du.labels(folder_name=folder_name).set(size_b)

        num_task_in_q_gauge = Gauge("num_task_in_q",
                                    "Number of tasks in queue (not yet picked up by workers)",
                                    labelnames=("q_name",),
                                    **common_kwargs)
        num_worker_gauge = Gauge("num_workers",
                                "Number of workers",
                                labelnames=("version", "namespace", "queue"), **common_kwargs)
        scheduled_gauge = Gauge("scheduled_tasks","Number of scheduled tasks",  labelnames=("hostname",), **common_kwargs)
        active_gauge = Gauge("active_tasks", "Number of active tasks", labelnames=("hostname",), **common_kwargs)
        reserved_gauge = Gauge("reserved_tasks", "Number of reserved tasks", labelnames=("hostname",), **common_kwargs)
        last_pinged = Gauge("last_pinged", "Last pinged time", labelnames=[], **common_kwargs)

        # assuming we are using redis as broker
        import redis

        _r = redis.from_url(CELERY_CONFIG.broker_url)

        last_pinged.set_to_current_time()

        # number of tasks in queue
        for q in CELERY_CONFIG.task_queues.keys():
            num_task_in_q_gauge.labels(q_name=q).set(_r.llen(q))

        # One inspector serves all the worker queries below.
        inspector = app.control.inspect()

        # number of active workers
        active_queues = inspector.active_queues()

        tally = defaultdict(int)
        for hostname in (active_queues or {}):
            for queue_info in active_queues[hostname]:
                routing_key = queue_info.get("routing_key")
                # The last two dotted segments are namespace and queue; all
                # leading segments together form the version.
                *version_parts, namespace, queue_name = routing_key.split(".")
                version = ".".join(version_parts)
                tally[(version, namespace, queue_name)] += 1

        for ((version, namespace, queue_name), total) in tally.items():
            num_worker_gauge.labels(version=version,
                                    namespace=namespace,
                                    queue=queue_name).set(total)

        for workername, tasks in (inspector.scheduled() or {}).items():
            scheduled_gauge.labels(hostname=workername).set(len(tasks))

        for workername, tasks in (inspector.active() or {}).items():
            active_gauge.labels(hostname=workername).set(len(tasks))

        for workername, tasks in (inspector.reserved() or {}).items():
            reserved_gauge.labels(hostname=workername).set(len(tasks))

        Singleton.cached_metrics = generate_latest(registry)

refresh_metric() staticmethod

Refresh metrics.

Source code in api/server/metrics.py
@staticmethod
@cron.minutely
@is_server
def refresh_metric():
    """Refresh metrics.

    Builds a fresh registry from the cached resource usage, the cached disk
    usage, the broker queue lengths and the celery inspection results, then
    stores the rendered output in `Singleton.cached_metrics`."""
    from api.worker.app import app
    from prometheus_client import Gauge, CollectorRegistry, generate_latest

    registry = CollectorRegistry()
    # Keyword arguments shared by every gauge below.
    shared = dict(registry=registry, namespace=NAME_SPACE)

    cpu_usage = Gauge(
        "resource_usage_cpu", "CPU usage by pods",
        labelnames=("podname",), **shared)
    memory_usage = Gauge(
        "resource_usage_memory", "RAM usage by pods",
        labelnames=("podname",), **shared)

    for podname, usage in Singleton.cached_res_usage.items():
        cpu, ram = usage
        cpu_usage.labels(podname=podname).set(cpu)
        memory_usage.labels(podname=podname).set(ram)

    res_timestamp = Gauge("resource_usage_timestamp", "Timestamp", **shared)
    if Singleton.res_mtime:
        res_timestamp.set(Singleton.res_mtime)

    du = Gauge(
        "firstlvl_folder_disk_usage", "Bytes used by first level folders",
        labelnames=("folder_name",), **shared)
    for folder_name, size_b in Singleton.cached_du.items():
        du.labels(folder_name=folder_name).set(size_b)

    num_task_in_q_gauge = Gauge(
        "num_task_in_q", "Number of tasks in queue (not yet picked up by workers)",
        labelnames=("q_name",), **shared)
    num_worker_gauge = Gauge(
        "num_workers", "Number of workers",
        labelnames=("version", "namespace", "queue"), **shared)
    scheduled_gauge = Gauge(
        "scheduled_tasks", "Number of scheduled tasks",
        labelnames=("hostname",), **shared)
    active_gauge = Gauge(
        "active_tasks", "Number of active tasks",
        labelnames=("hostname",), **shared)
    reserved_gauge = Gauge(
        "reserved_tasks", "Number of reserved tasks",
        labelnames=("hostname",), **shared)
    last_pinged = Gauge("last_pinged", "Last pinged time", labelnames=[], **shared)

    # assuming we are using redis as broker
    import redis

    broker = redis.from_url(CELERY_CONFIG.broker_url)

    last_pinged.set_to_current_time()

    # number of tasks in queue
    for q_name in CELERY_CONFIG.task_queues.keys():
        num_task_in_q_gauge.labels(q_name=q_name).set(broker.llen(q_name))

    inspector = app.control.inspect()

    # number of active workers
    active_queues = app.control.inspect().active_queues()

    tally = defaultdict(int)
    for hostname in (active_queues or {}):
        for queue_info in active_queues[hostname]:
            # The last two dotted segments are namespace and queue; all
            # leading segments together form the version.
            segments = queue_info.get("routing_key").split(".")
            *version_parts, namespace, queue_name = segments
            tally[(".".join(version_parts), namespace, queue_name)] += 1

    for worker_key, total in tally.items():
        version, namespace, queue_name = worker_key
        num_worker_gauge.labels(
            version=version, namespace=namespace, queue=queue_name
        ).set(total)

    for workername, tasks in (inspector.scheduled() or {}).items():
        scheduled_gauge.labels(hostname=workername).set(len(tasks))

    for workername, tasks in (inspector.active() or {}).items():
        active_gauge.labels(hostname=workername).set(len(tasks))

    for workername, tasks in (inspector.reserved() or {}).items():
        reserved_gauge.labels(hostname=workername).set(len(tasks))

    Singleton.cached_metrics = generate_latest(registry)

on_startup()

On startup

Source code in api/server/metrics.py
def on_startup() -> None:
    """On startup

    Calls `cron.start()`."""
    # NOTE(review): presumably this starts the scheduler behind the
    # `@cron.minutely` / `@cron.ten_minutely` jobs - confirm in `cron`.
    cron.start()

on_terminate()

On terminate

Source code in api/server/metrics.py
def on_terminate() -> None:
    """On terminate

    Calls `cron.stop()`."""
    # NOTE(review): presumably this stops the scheduler behind the
    # `@cron.minutely` / `@cron.ten_minutely` jobs - confirm in `cron`.
    cron.stop()

prom_metrics_resp()

Return PlainTextResponse of metrics

Source code in api/server/metrics.py
def prom_metrics_resp():
    """Return PlainTextResponse of metrics

    Raises:
        HTTPException: 404 if this instance is not running as server or the
            metrics have not been populated yet, 500 if `NAME_SPACE` is not a
            valid metric namespace."""
    if ROLE != "server":
        raise HTTPException(404, "siibra-api is not configured to be server, so metrics page is not enabled")

    from prometheus_client.metrics_core import METRIC_NAME_RE

    namespace_is_valid = METRIC_NAME_RE.match(NAME_SPACE)
    if not namespace_is_valid:
        raise HTTPException(500, detail=f"NAME_SPACE: {NAME_SPACE!r} is not a valid namespace. Please use [a-zA-Z0-9_]+ only!")

    # `cached_metrics` stays None until `Singleton.refresh_metric` has run once.
    payload = Singleton.cached_metrics
    if payload is None:
        raise HTTPException(404, 'Not yet populated. Please wait ...')

    return PlainTextResponse(payload, status_code=200)

util

SapiCustomRoute

SapiCustomRoute, custom route class. This is so that func param is not interpreted to be a part of swagger-api.

Source code in api/server/util.py
class SapiCustomRoute(APIRoute):
    """SapiCustomRoute, custom route class. This is so that `func` param is not interpreted to be a part of swagger-api."""
    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        # Drop the `func` parameter from the query parameters collected for
        # this route; every other parameter is kept, in the original order.
        visible_params = []
        for query_param in self.dependant.query_params:
            if query_param.name == "func":
                continue
            visible_params.append(query_param)
        self.dependant.query_params = visible_params

add_lazy_path()

adds lazy_path path converter for starlette route

For example:

GET /atlases/juelich%2Fiav%2Fatlas%2Fv1.0.0%2F1/parcellations/minds%2Fcore%2Fparcellationatlas%2Fv1.0.0%2F94c1125b-b87e-45e4-901c-00daee7f2579-290/regions/Area%20hOc1%20%28V1%2C%2017%2C%20CalcS%29%20right/features/siibra%2Ffeatures%2Fcells%2Fhttps%3A%2F%2Fopenminds.ebrains.eu%2Fcore%2FDatasetVersion%2Fc1438d1996d1d2c86baa05496ba28fc5

or

"/atlases/{atlas_id}/parcellations/{parc_id}/regions/{region_id}/features/{feat_id}".format(
    atlas_id="juelich%2Fiav%2Fatlas%2Fv1.0.0%2F1",
    parc_id="minds%2Fcore%2Fparcellationatlas%2Fv1.0.0%2F94c1125b-b87e-45e4-901c-00daee7f2579-290",
    region_id="Area%20hOc1%20%28V1%2C%2017%2C%20CalcS%29%20right",
    feat_id="siibra%2Ffeatures%2Fcells%2Fhttps%3A%2F%2Fopenminds.ebrains.eu%2Fcore%2FDatasetVersion%2Fc1438d1996d1d2c86baa05496ba28fc5",
)

default path converter (eager) will:

1/ deserialize URI encoded characters, resulting in:

"/atlases/{atlas_id}/parcellations/{parc_id}/regions/{region_id}/features/{feat_id}".format(
    atlas_id="juelich/iav/atlas/v1.0.0/1",
    parc_id="minds/core/parcellationatlas/v1.0.0/94c1125b-b87e-45e4-901c-00daee7f2579-290",
    region_id="Area hOc1 (V1, 17, CalcS) right",
    feat_id="siibra/features/cells/https://openminds.ebrains.eu/core/DatasetVersion/c1438d1996d1d2c86baa05496ba28fc5",
)

2/ try to eager match, resulting in erroneous parsing of the path:

"/atlases/{atlas_id}/parcellations/{parc_id}/regions/{region_id}/features/{feat_id}".format(
    atlas_id="juelich/iav/atlas/v1.0.0/1",
    parc_id="minds/core/parcellationatlas/v1.0.0/94c1125b-b87e-45e4-901c-00daee7f2579-290",
    region_id="Area hOc1 (V1, 17, CalcS) right/features/siibra",
    feat_id="cells/https://openminds.ebrains.eu/core/DatasetVersion/c1438d1996d1d2c86baa05496ba28fc5",
)

The lazy path converter is not without its (potential) issue:

For example:

"GET /atlases/foo-bar/parcellations/parc%2Ffeatures%2Ffoo/features"

or

"/atlases/{atlas_id}/parcellations/{parc_id}/features".format(
    atlas_id="foo-bar",
    parc_id="parc%2Ffeatures%2Ffoo"
)

1/ deserialization of URI encoded characters, resulting in:

"/atlases/{atlas_id}/parcellations/{parc_id}/features".format(
    atlas_id="foo-bar",
    parc_id="parc/features/foo"
)

2/ trying to lazy match, resulting in erroneous parsing of the path:

"/atlases/{atlas_id}/parcellations/{parc_id}/features/{features_id}".format(
    atlas_id="foo-bar",
    parc_id="parc",
    features_id="foo"
)

Most ideally, the starlette routing should split path first, then decode the encoded characters

Source code in api/server/util.py
def add_lazy_path():
    """Register a non-greedy `lazy_path` path converter for starlette routes.

    Why it is needed. Consider the request:

    ```
    GET /atlases/juelich%2Fiav%2Fatlas%2Fv1.0.0%2F1/parcellations/minds%2Fcore%2Fparcellationatlas%2Fv1.0.0%2F94c1125b-b87e-45e4-901c-00daee7f2579-290/regions/Area%20hOc1%20%28V1%2C%2017%2C%20CalcS%29%20right/features/siibra%2Ffeatures%2Fcells%2Fhttps%3A%2F%2Fopenminds.ebrains.eu%2Fcore%2FDatasetVersion%2Fc1438d1996d1d2c86baa05496ba28fc5
    ```

    which is equivalent to:

    ```python
    "/atlases/{atlas_id}/parcellations/{parc_id}/regions/{region_id}/features/{feat_id}".format(
        atlas_id="juelich%2Fiav%2Fatlas%2Fv1.0.0%2F1",
        parc_id="minds%2Fcore%2Fparcellationatlas%2Fv1.0.0%2F94c1125b-b87e-45e4-901c-00daee7f2579-290",
        region_id="Area%20hOc1%20%28V1%2C%2017%2C%20CalcS%29%20right",
        feat_id="siibra%2Ffeatures%2Fcells%2Fhttps%3A%2F%2Fopenminds.ebrains.eu%2Fcore%2FDatasetVersion%2Fc1438d1996d1d2c86baa05496ba28fc5",
    )
    ```

    The default (eager) path converter will:

    1/ decode the URI encoded characters, giving:

    ```python
    "/atlases/{atlas_id}/parcellations/{parc_id}/regions/{region_id}/features/{feat_id}".format(
        atlas_id="juelich/iav/atlas/v1.0.0/1",
        parc_id="minds/core/parcellationatlas/v1.0.0/94c1125b-b87e-45e4-901c-00daee7f2579-290",
        region_id="Area hOc1 (V1, 17, CalcS) right",
        feat_id="siibra/features/cells/https://openminds.ebrains.eu/core/DatasetVersion/c1438d1996d1d2c86baa05496ba28fc5",
    )
    ```

    2/ match eagerly, which parses the path erroneously:

    ```python
    "/atlases/{atlas_id}/parcellations/{parc_id}/regions/{region_id}/features/{feat_id}".format(
        atlas_id="juelich/iav/atlas/v1.0.0/1",
        parc_id="minds/core/parcellationatlas/v1.0.0/94c1125b-b87e-45e4-901c-00daee7f2579-290",
        region_id="Area hOc1 (V1, 17, CalcS) right/features/siibra",
        feat_id="cells/https://openminds.ebrains.eu/core/DatasetVersion/c1438d1996d1d2c86baa05496ba28fc5",
    )
    ```

    The lazy path converter has a (potential) issue of its own. Consider:

    ```
    GET /atlases/foo-bar/parcellations/parc%2Ffeatures%2Ffoo/features
    ```

    which is equivalent to:

    ```python
    "/atlases/{atlas_id}/parcellations/{parc_id}/features".format(
        atlas_id="foo-bar",
        parc_id="parc%2Ffeatures%2Ffoo"
    )
    ```

    1/ the URI encoded characters are decoded, giving:

    ```python
    "/atlases/{atlas_id}/parcellations/{parc_id}/features".format(
        atlas_id="foo-bar",
        parc_id="parc/features/foo"
    )
    ```

    2/ matching lazily then parses the path erroneously:

    ```python
    "/atlases/{atlas_id}/parcellations/{parc_id}/features/{features_id}".format(
        atlas_id="foo-bar",
        parc_id="parc",
        features_id="foo"
    )
    ```

    Ideally, the starlette routing would split the path first and only then
    decode the encoded characters.
    """
    # Imported locally, as in the original, so starlette is only touched when
    # this function is actually called.
    from starlette.convertors import CONVERTOR_TYPES, PathConvertor

    class LazyPathConverter(PathConvertor):
        # Non-greedy pattern: consume as few characters as possible.
        regex = ".*?"

    # Make the converter available to route templates under the "lazy_path" key.
    lazy_converter = LazyPathConverter()
    CONVERTOR_TYPES["lazy_path"] = lazy_converter

volcabularies

TAGS = ['vocabularies'] module-attribute

HTTP vocabularies tags

router = APIRouter(route_class=SapiCustomRoute, tags=TAGS) module-attribute

HTTP vocabularies router

get_genes(find=None, func=None)

HTTP get (filtered) genes

Source code in api/server/volcabularies/__init__.py
@router.get("/genes", response_model=Page[GeneModel])
@version(*FASTAPI_VERSION)
@router_decorator(ROLE, func=get_genes)
def get_genes(find:str=None, func=None):
    """HTTP get (filtered) genes"""
    # The docstring is left untouched on purpose: FastAPI uses it as the
    # route description when no `description=` is passed.
    #
    # `func=get_genes` in the decorator is evaluated before this `def` rebinds
    # the name, so it refers to whatever `get_genes` was bound to earlier in
    # the module (not visible here -- presumably an imported data handler;
    # confirm).
    if func is not None:
        matched_genes = func(find=find)
        return paginate(matched_genes)

    # No callable was provided: report it as a server-side error.
    raise HTTPException(500, "func: None passed")

volumes

parcellationmap

TAGS = ['maps'] module-attribute

HTTP map tags

router = APIRouter(route_class=SapiCustomRoute, tags=TAGS) module-attribute

HTTP map router

get_siibra_map(parcellation_id, space_id, map_type, *, func)

Get map according to specification

Source code in api/server/volumes/parcellationmap.py
@router.get("", response_model=MapModel)
@version(*FASTAPI_VERSION)
@router_decorator(ROLE, func=get_map)
def get_siibra_map(parcellation_id: str, space_id: str, map_type: MapType, *, func):
    """Get map according to specification"""
    # The docstring is left untouched on purpose: FastAPI uses it as the
    # route description when no `description=` is passed.
    if func is not None:
        # Arguments are forwarded positionally, in signature order.
        return func(parcellation_id, space_id, map_type)

    # No callable was provided: report it as a server-side error.
    raise HTTPException(500, "func: None passsed")

get_resampled_map(parcellation_id, space_id, *, func)

Get resampled map according to specification

Source code in api/server/volumes/parcellationmap.py
@router.get("/resampled_template", response_class=FileResponse, tags=TAGS, description="""
Return a resampled template volume, based on labelled parcellation map.
""")
@version(*FASTAPI_VERSION)
@router_decorator(ROLE, func=get_resampled_map)
def get_resampled_map(parcellation_id: str, space_id: str, *, func):
    """Get resampled map according to specification"""
    # `func=get_resampled_map` in the decorator is evaluated before this `def`
    # rebinds the name, so it refers to the earlier module-level binding (not
    # visible here -- presumably an imported data handler; confirm).
    if func is None:
        raise HTTPException(500, "func: None passsed")

    # `func` yields a (path, flag) pair; a truthy flag is reported to the
    # client through the cache header below.
    nifti_path, from_cache = func(parcellation_id=parcellation_id, space_id=space_id)

    # NOTE(review): the download is named "labelled_map.nii.gz" although this
    # route serves /resampled_template -- confirm this is intended.
    response_headers = {
        "content-type": "application/octet-stream",
        "content-disposition": 'attachment; filename="labelled_map.nii.gz"',
    }
    if from_cache:
        response_headers[cache_header] = "hit"

    # NOTE: `assert` statements are removed when Python runs with -O, so this
    # sanity check is not guaranteed to execute.
    assert os.path.isfile(nifti_path), "file saved incorrectly"
    return FileResponse(nifti_path, headers=response_headers)

get_parcellation_labelled_map(parcellation_id, space_id, region_id=None, *, func)

Get labelled map according to specification

Source code in api/server/volumes/parcellationmap.py
@router.get("/labelled_map.nii.gz", response_class=FileResponse, tags=TAGS, description="""
Returns a labelled map if region_id is not provided.

Returns a mask if a region_id is provided.

region_id MAY refer to ANY region on the region hierarchy, and a combined mask will be returned.
""")
@version(*FASTAPI_VERSION)
@router_decorator(ROLE, func=get_parcellation_labelled_map)
def get_parcellation_labelled_map(parcellation_id: str, space_id: str, region_id: str=None, *, func):
    """Get labelled map according to specification"""
    # `func=get_parcellation_labelled_map` in the decorator is evaluated before
    # this `def` rebinds the name, so it refers to the earlier module-level
    # binding (not visible here -- presumably an imported data handler; confirm).
    if func is None:
        raise HTTPException(500, "func: None passsed")

    # `func` yields a (path, flag) pair; a truthy flag is reported to the
    # client through the cache header below. `region_id` may be None.
    nifti_path, from_cache = func(parcellation_id, space_id, region_id)

    response_headers = {
        "content-type": "application/octet-stream",
        "content-disposition": 'attachment; filename="labelled_map.nii.gz"',
    }
    if from_cache:
        response_headers[cache_header] = "hit"

    # NOTE: `assert` statements are removed when Python runs with -O, so this
    # sanity check is not guaranteed to execute.
    assert os.path.isfile(nifti_path), "file saved incorrectly"
    return FileResponse(nifti_path, headers=response_headers)

get_region_statistical_map(parcellation_id, space_id, region_id, *, func)

Get statistical map according to specification

Source code in api/server/volumes/parcellationmap.py
@router.get("/statistical_map.nii.gz", response_class=FileResponse, tags=TAGS, description="""
Returns a statistic map.

region_id MUST refer to leaf region on the region hierarchy.
""")
@version(*FASTAPI_VERSION)
@router_decorator(ROLE, func=get_region_statistic_map)
def get_region_statistical_map(parcellation_id: str, space_id: str, region_id: str, *, func):
    """Get statistical map according to specification"""
    if func is None:
        raise HTTPException(500, "func: None passsed")

    # Mind the argument order: `region_id` is passed BEFORE `space_id`, which
    # differs from this handler's own parameter order.
    # `func` yields a (path, flag) pair; a truthy flag is reported to the
    # client through the cache header below.
    nifti_path, from_cache = func(parcellation_id, region_id, space_id)

    response_headers = {
        "content-type": "application/octet-stream",
        "content-disposition": 'attachment; filename="statistical_map.nii.gz"',
    }
    if from_cache:
        response_headers[cache_header] = "hit"

    # NOTE: `assert` statements are removed when Python runs with -O, so this
    # sanity check is not guaranteed to execute.
    assert os.path.isfile(nifti_path), "file saved incorrectly"
    return FileResponse(nifti_path, headers=response_headers)

get_region_statistical_map_metadata(parcellation_id, space_id, region_id, *, func)

Get metadata of statistical map according to specification

Source code in api/server/volumes/parcellationmap.py
@router.get("/statistical_map.info.json", response_model=StatisticModelInfo, tags=TAGS)
@version(*FASTAPI_VERSION)
@router_decorator(ROLE, func=get_region_statistic_map_info)
def get_region_statistical_map_metadata(parcellation_id: str, space_id: str, region_id: str, *, func):
    """Get metadata of statistical map according to specification"""
    # The docstring is left untouched on purpose: FastAPI uses it as the
    # route description when no `description=` is passed.
    if func is None:
        raise HTTPException(500, "func: None passsed")

    # Mind the argument order: `region_id` is passed BEFORE `space_id`, which
    # differs from this handler's own parameter order.
    info_fields = func(parcellation_id, region_id, space_id)

    # The result is unpacked as keyword arguments, so it must be a mapping.
    return StatisticModelInfo(**info_fields)

get_assign_point(parcellation_id, space_id, point, assignment_type='statistical', sigma_mm=0.0, *, func)

Perform assignment according to specification

Source code in api/server/volumes/parcellationmap.py
@router.get("/assign", response_model=DataFrameModel, tags=TAGS)
@version(*FASTAPI_VERSION)
@router_decorator(ROLE, func=assign_point)
def get_assign_point(parcellation_id: str, space_id: str, point: str, assignment_type: str="statistical", sigma_mm: float=0., *, func):
    """Perform assignment according to specification"""
    # The docstring is left untouched on purpose: FastAPI uses it as the
    # route description when no `description=` is passed.
    #
    # The route is tagged with `tags=TAGS`, like every other route on this
    # router. `TAGS` is already a list of strings, so wrapping it in another
    # list (`tags=[TAGS]`) would register a list as a tag instead of a string.
    if func is None:
        # No callable was provided: report it as a server-side error.
        raise HTTPException(500, "func: None passsed")

    # Arguments are forwarded positionally, in signature order.
    return func(parcellation_id, space_id, point, assignment_type, sigma_mm)