Alarm History

AlarmHistory(baze)

Class used for handling the history of alarms.

Source code in echo_baze/baze_root.py
def __init__(self, baze: e_bz.Baze) -> None:
    """Base class that all subclasses should inherit from.

    Parameters
    ----------
    baze : Baze
        Top level object carrying all functionality and the connection handler.

    """
    # check inputs
    if not isinstance(baze, e_bz.Baze):
        raise ValueError(f"baze must be of type Baze, not {type(baze)}")

    self.baze: e_bz.Baze = baze
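
Example:

A minimal construction sketch. The Baze constructor arguments and the import path for AlarmHistory are assumptions (the source files shown on this page suggest echo_baze.alarm_history), not something this page documents:

import echo_baze as e_bz
from echo_baze.alarm_history import AlarmHistory  # assumed import path

baze = e_bz.Baze()  # hypothetical construction; real arguments depend on your setup
history = AlarmHistory(baze)

# anything that is not a Baze instance is rejected
try:
    AlarmHistory("not a baze")
except ValueError as err:
    print(err)  # baze must be of type Baze, not <class 'str'>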

get(object_names, period, alarm_type='all', active_only=False, time_zone='local', output_type='dict', batch_size=10)

Gets the alarm history for the given objects.

The most useful keys/columns returned are:

  • alarm
  • alarmDescription
  • alarmId
  • alarmRemarks
  • start
  • end
  • duration
  • code
  • type

Parameters:

  • object_names (list[str]) – Desired object names to get the alarm history for.

  • period (DateTimeRange) – Desired period to get the alarm history for. It is assumed to be in the same time zone as the one specified in time_zone.

  • alarm_type (Literal['all', 'alarm', 'warning', 'status'], default: 'all') – Desired alarm types to get the alarm history for.

  • active_only (bool, default: False) – Whether to get only the active alarms.

  • time_zone (TimeZone, default: 'local') – Time zone the inputs are assumed to be in; also defines the time zone of the output. There are three options:

    • If "UTC" is used, the times are assumed to already be in UTC.
    • If "local" is used, the default time zone defined in echo_baze is used.
    • If an int is used, it is a fixed UTC offset and must be between -12 and +12.

  • output_type (Literal['dict', 'DataFrame'], default: 'dict') – Output type of the data. Can be one of ["dict", "DataFrame"].

  • batch_size (int, default: 10) – Number of objects to request from the Bazefield API at once. If the number of alarms per object is expected to be low, increasing this can dramatically improve the speed of the method; see the example at the end of this page.

Returns:

  • dict[str, list[dict[str, Any]]] – If output_type == "dict", a dict with the following format: {object_name: [{alarm_attribute: value, ...}, ...], ...}

  • DataFrame – If output_type == "DataFrame", a DataFrame with the following format: index = MultiIndex with levels ["object_name", "alarm", "start"], columns = [attribute, ...]
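
Example:

A minimal usage sketch of the dict output, continuing the construction sketch above. The baze.alarm_history attribute name and the DateTimeRange constructor are assumptions based on this page; the get() source only reads period.start and period.end:

from datetime import datetime

# hypothetical DateTimeRange construction; only .start and .end are used by get()
period = DateTimeRange(start=datetime(2024, 1, 1), end=datetime(2024, 2, 1))

alarms = baze.alarm_history.get(  # "alarm_history" attribute name is an assumption
    object_names=["WTG-01", "WTG-02"],
    period=period,
    alarm_type="alarm",
    time_zone="UTC",
    output_type="dict",
)

# {object_name: [{alarm_attribute: value, ...}, ...], ...}
for object_name, entries in alarms.items():
    for entry in entries:
        print(object_name, entry["alarm"], entry["start"], entry["duration"])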

Source code in echo_baze/alarm_history.py
@validate_call
def get(
    self,
    object_names: list[str],
    period: DateTimeRange,
    alarm_type: Literal["all", "alarm", "warning", "status"] = "all",
    active_only: bool = False,
    time_zone: TimeZone = "local",
    output_type: Literal["dict", "DataFrame"] = "dict",
    batch_size: int = 10,
) -> dict[str, list[dict[str, Any]]] | DataFrame:
    """Gets the alarm history for the given objects

    The most useful keys/columns returned are:

    - alarm
    - alarmDescription
    - alarmId
    - alarmRemarks
    - start
    - end
    - duration
    - code
    - type

    Parameters
    ----------
    object_names : list[str]
        Desired object names to get the alarm history from.
    period : DateTimeRange
        Desired period to get the alarm history from. It is assumed to be in the same time zone as the one specified in time_zone.
    alarm_type : Literal["all", "alarm", "warning", "status"], optional
        Desired alarm types to get the alarm history for, by default "all"
    active_only : bool, optional
        Whether to get only the active alarms, by default False
    time_zone : TimeZone, optional
        Time zone the inputs are assumed to be in. Also defines the time zone of the output. There are three options:

        - If "UTC" is used, the times are assumed to already be in UTC.
        - If "local" is used, the default time zone defined in echo_baze is used.
        - If an int is used, it is a fixed UTC offset and must be between -12 and +12.

        By default "local"
    output_type : Literal["dict", "DataFrame"], optional
        Output type of the data. Can be one of ["dict", "DataFrame"].
        By default "dict"
    batch_size : int, optional
        Number of objects to request from the Bazefield API at once. If the number of alarms per object is expected to be low, increasing this can dramatically improve the speed of the method. By default 10

    Returns
    -------
    dict[str, list[dict[str, Any]]]
        If output_type == "dict", returns a dict with the following format: {object_name: [{alarm_attribute: value, ...}, ...], ...}
    DataFrame
        If output_type == "DataFrame", returns a DataFrame with the following format: index = MultiIndex with levels ["object_name", "alarm", "start"], columns = [attribute, ...]

    """
    t0 = time.perf_counter()

    # getting object ids
    object_ids = self.baze.objects.instances.get_ids()
    object_id_to_name = {v: k for k, v in object_ids.items()}

    if missing_object_names := set(object_names) - set(object_ids):
        logger.warning(f"Object names {missing_object_names} do not exist")
        # dropping the unknown names so the requests below do not fail on them
        object_names = [name for name in object_names if name in object_ids]

    # getting the alarm history
    endpoint = "objects/alarmhistory"

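    # request the alarms in batches; if the API runs out of internal resources,
    # halve the batch size and retry the whole request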
    success = False
    try_num = 0
    while not success:
        if try_num > 0:
            # halving the batch size before retrying
            if batch_size == 1:
                raise ValueError("Batch size is already 1, cannot reduce it further")
            batch_size = max(1, batch_size // 2)
            logger.warning(f"Trying again with smaller batch size: {batch_size}")

        results = {}
        obj_batches = [object_names[i : i + batch_size] for i in range(0, len(object_names), batch_size)]

        one_failed = False
        for objects in obj_batches:
            t_batch = time.perf_counter()
            payload = {
                "from": timestamp_from_datetime(dt=period.start, time_zone=time_zone, unit="milliseconds"),
                "to": timestamp_from_datetime(dt=period.end, time_zone=time_zone, unit="milliseconds"),
                "objectIds": [object_ids[obj] for obj in objects],
                "type": alarm_type,
                "filter": "",
                "filterActive": active_only,
                "filterRootCauses": False,
                "inObjectTime": False,
                "includeCount": False,
                "includeOkAlarms": True,
                "orderBy": "Start desc",
                "skip": 0,
                "take": 99999999999999999999999999,
            }

            try:
                request_result = self.baze.conn.post(endpoint, json=payload)
            except Exception as e:
                # if the traceback shows a 500 error whose message contains "The query
                # processor ran out of internal resources", retry with a smaller batch size
                tb = traceback.format_exc()
                if "500" in tb and "The query processor ran out of internal resources" in tb:
                    logger.error(
                        f"Got 500 error with message 'The query processor ran out of internal resources'. Trying again with smaller batch size. The error message was\n {tb}",
                    )
                    one_failed = True
                    try_num += 1
                    break

                raise e

            self._handle_http_errors(request_result)

            # converting to dict
            result: list[dict[str, Any]] = request_result.json()["alarms"]

            # converting dates to datetime
            result = [
                entry
                | {"objectKey": object_id_to_name[entry["objectId"].upper()]}
                | {"start": convert_time_zone(datetime.strptime(entry["start"][:-9], "%Y-%m-%dT%H:%M:%S"), "UTC", time_zone)}
                | (
                    {
                        "end": convert_time_zone(datetime.strptime(entry["end"][:-9], "%Y-%m-%dT%H:%M:%S"), "UTC", time_zone),
                        "duration": datetime.strptime(entry["end"][:-9], "%Y-%m-%dT%H:%M:%S")
                        - datetime.strptime(entry["start"][:-9], "%Y-%m-%dT%H:%M:%S"),
                    }
                    if "end" in entry
                    else {"duration": None}
                )
                | (
                    {"uncertainStopTime": datetime.strptime(entry["uncertainStopTime"][:-9], "%Y-%m-%dT%H:%M:%S")}
                    if "uncertainStopTime" in entry
                    else {}
                )
                for entry in result
            ]

            # converting to split by object_name
            result_by_object_name = {}
            for entry in result:
                object_name = entry.pop("objectKey")
                if object_name not in result_by_object_name:
                    result_by_object_name[object_name] = []
                result_by_object_name[object_name].append(entry)

            results |= result_by_object_name

            logger.debug(f"Got alarms data in {time.perf_counter() - t0:.3f} s for {objects}")

        if not one_failed:
            success = True

    # converting to desired output
    match output_type:
        case "dict":
            pass
        case "DataFrame":
            # converting to a list of dicts
            results = [{**entry, "object_name": object_name} for object_name, entries in results.items() for entry in entries]
            # converting to DataFrame
            results = json_normalize(results, max_level=1)

            cols = {
                "id": "int64[pyarrow]",
                "alarmId": "int64[pyarrow]",
                "siteId": "int64[pyarrow]",
                "turbineId": "int64[pyarrow]",
                "objectId": "string[pyarrow]",
                "turbineName": "string[pyarrow]",
                "turbineTitle": "string[pyarrow]",
                "alarmTemplateId": "int64[pyarrow]",
                "start": "datetime64[s]",
                "stopTime": "string[pyarrow]",
                "alarm": "string[pyarrow]",
                "alarmDescription": "string[pyarrow]",
                "code": "int64[pyarrow]",
                "brakePrg": "int64[pyarrow]",
                "isOkAlarm": "bool[pyarrow]",
                "alarmType": "int64[pyarrow]",
                "sourceId": "int64[pyarrow]",
                "sourceName": "string[pyarrow]",
                "isRootCause": "bool[pyarrow]",
                "alarmRemarks": "string[pyarrow]",
                "allocations": "object",
                "sfId": "string[pyarrow]",
                "end": "datetime64[s]",
                "uncertainStopTime": "datetime64[s]",
                "duration": "timedelta64[s]",
                "ackDuration": "string[pyarrow]",
                "type": "string[pyarrow]",
                "object_name": "string[pyarrow]",
            }
            if results.empty:
                results = DataFrame(columns=list(cols.keys())).astype(cols)

            results = results.convert_dtypes(dtype_backend="pyarrow")

            # adding any missing columns if they are not present
            for col, dtype in cols.items():
                if col not in results.columns:
                    results[col] = NA
                    results[col] = results[col].astype(dtype)

            # forcing datetime columns to be in standard datetime time for better compatibility
            results["start"] = results["start"].astype("datetime64[s]")
            results["end"] = results["end"].astype("datetime64[s]")
            results["duration"] = results["duration"].astype("timedelta64[s]")
            if "uncertainStopTime" in results.columns:
                results["uncertainStopTime"] = results["uncertainStopTime"].astype("datetime64[s]")

            results = results.set_index(["object_name", "alarm", "start"])
        case _:
            raise ValueError(f"output_type must be one of ['dict', 'DataFrame'], got '{output_type}'")

    logger.debug(f"Got alarms data in {output_type} format in {time.perf_counter() - t0:.3f} s")
    return results
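
Example:

A sketch of the DataFrame output, continuing the dict example above; standard pandas MultiIndex selection applies on the ["object_name", "alarm", "start"] index:

import pandas as pd

df = baze.alarm_history.get(
    object_names=["WTG-01", "WTG-02"],
    period=period,
    output_type="DataFrame",
    batch_size=50,  # larger batches can help when few alarms per object are expected
)

# alarms for one object, and alarms longer than 30 minutes
wtg01 = df.xs("WTG-01", level="object_name")
long_alarms = df[df["duration"] > pd.Timedelta(minutes=30)]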