Alarms History
AlarmHistory(baze)
Class used for handling the history of alarms.
Source code in echo_baze/baze_root.py
def __init__(self, baze: e_bz.Baze) -> None:
    """Base class that all subclasses should inherit from.

    Parameters
    ----------
    baze : Baze
        Top level object carrying all functionality and the connection handler.
    """
    # check inputs
    if not isinstance(baze, e_bz.Baze):
        raise ValueError(f"baze must be of type Baze, not {type(baze)}")
    self.baze: e_bz.Baze = baze
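In normal use this object is reached through the top-level Baze instance rather than constructed directly; the type guard above simply makes misuse fail fast. A minimal sketch of that failure mode (the import path is an assumption):

from echo_baze.alarm_history import AlarmHistory  # hypothetical import path

try:
    AlarmHistory(baze="not-a-baze")  # anything that is not a Baze instance
except ValueError as err:
    print(err)  # baze must be of type Baze, not <class 'str'>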
get(object_names, period, alarm_type='all', active_only=False, time_zone='local', output_type='dict', batch_size=10)
Gets the alarm history for the given objects.
The most useful keys/columns returned are:
- alarm
- alarmDescription
- alarmId
- alarmRemarks
- start
- end
- duration
- code
- type
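To make that concrete, a single returned entry could look like the following sketch (hypothetical values; the full key set depends on the Bazefield response):

from datetime import datetime, timedelta

example_entry = {  # hypothetical values, for illustration only
    "alarm": "GeneratorOvertemp",
    "alarmDescription": "Generator bearing temperature above limit",
    "alarmId": 1042,
    "alarmRemarks": "",
    "start": datetime(2024, 1, 5, 3, 12, 7),
    "end": datetime(2024, 1, 5, 4, 0, 0),
    "duration": timedelta(minutes=47, seconds=53),
    "code": 228,
    "type": "alarm",
}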
Parameters:
- object_names (list[str]) – Desired object names to get the alarm history from.
- period (DateTimeRange) – Desired period to get the alarm history from. It is assumed to be in the same time zone as the one specified in time_zone.
- alarm_type (Literal['all', 'alarm', 'warning', 'status'], default: 'all') – Desired alarm types to get the alarm history from.
- active_only (bool, default: False) – Whether to get only the active alarms or not. By default False.
- time_zone (TimeZone, default: 'local') – The time zone the inputs are assumed to be in. Also defines the time zone of the output. There are three options:
  - If "UTC" is used, the time is assumed to already be in UTC.
  - If "local" is used, the default time zone defined in echo_baze is used.
  - If an int, it must be between -12 and +12.
  By default "local".
- output_type (Literal['dict', 'DataFrame'], default: 'dict') – Output type of the data. Can be one of ["dict", "DataFrame"]. By default "dict".
- batch_size (int, default: 10) – Number of objects to request from the Bazefield API at once. If the number of alarms per object is expected to be low, increasing this can dramatically improve the speed of the method. By default 10.
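A minimal call sketch, assuming baze is an already-initialized Baze instance; the alarm_history accessor name and the DateTimeRange(start, end) constructor are assumptions here:

from datetime import datetime

period = DateTimeRange(datetime(2024, 1, 1), datetime(2024, 2, 1))  # constructor assumed
alarms = baze.alarm_history.get(  # accessor name assumed
    object_names=["WTG01", "WTG02"],
    period=period,
    alarm_type="alarm",
    time_zone="UTC",
    output_type="dict",
    batch_size=20,  # safe to raise when few alarms per object are expected
)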
Returns:
- dict[str, list[dict[str, Any]]] – In case output_type == "dict", a dict with the following format: {object_name: [{alarm_attribute: value, ...}, ...], ...}
- DataFrame – In case output_type == "DataFrame", a DataFrame with the following format: index = MultiIndex with levels ["object_name", "alarm", "start"], columns = [attribute, ...]
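Because the DataFrame output is indexed by ["object_name", "alarm", "start"], standard pandas MultiIndex tooling applies; for example, assuming df is the returned frame:

import pandas as pd

wtg01 = df.xs("WTG01", level="object_name")  # all alarms for one object
long_ones = df[df["duration"] > pd.Timedelta(hours=1)]  # alarms lasting over an hour
still_active = df[df["end"].isna()]  # entries with no end time yet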
Source code in echo_baze/alarm_history.py
@validate_call
def get(
    self,
    object_names: list[str],
    period: DateTimeRange,
    alarm_type: Literal["all", "alarm", "warning", "status"] = "all",
    active_only: bool = False,
    time_zone: TimeZone = "local",
    output_type: Literal["dict", "DataFrame"] = "dict",
    batch_size: int = 10,
) -> dict[str, list[dict[str, Any]]] | DataFrame:
    """Gets the alarm history for the given objects.

    The most useful keys/columns returned are:
    - alarm
    - alarmDescription
    - alarmId
    - alarmRemarks
    - start
    - end
    - duration
    - code
    - type

    Parameters
    ----------
    object_names : list[str]
        Desired object names to get the alarm history from.
    period : DateTimeRange
        Desired period to get the alarm history from. It is assumed to be in the same time zone as the one specified in time_zone.
    alarm_type : Literal["all", "alarm", "warning", "status"], optional
        Desired alarm types to get the alarm history from. By default "all".
    active_only : bool, optional
        Whether to get only the active alarms or not. By default False.
    time_zone : TimeZone, optional
        The time zone the inputs are assumed to be in. Also defines the time zone of the output. There are three options:
        - If "UTC" is used, the time is assumed to already be in UTC.
        - If "local" is used, the default time zone defined in echo_baze is used.
        - If an int, it must be between -12 and +12.
        By default "local".
    output_type : Literal["dict", "DataFrame"], optional
        Output type of the data. Can be one of ["dict", "DataFrame"]. By default "dict".
    batch_size : int, optional
        Number of objects to request from the Bazefield API at once. If the number of alarms per object is expected to be low, increasing this can dramatically improve the speed of the method. By default 10.

    Returns
    -------
    dict[str, list[dict[str, Any]]]
        In case output_type == "dict", a dict with the following format: {object_name: [{alarm_attribute: value, ...}, ...], ...}
    DataFrame
        In case output_type == "DataFrame", a DataFrame with the following format: index = MultiIndex with levels ["object_name", "alarm", "start"], columns = [attribute, ...]
    """
    t0 = time.perf_counter()
    # getting object ids
    object_ids = self.baze.objects.instances.get_ids()
    object_id_to_name = {v: k for k, v in object_ids.items()}
    if missing_object_names := set(object_names) - set(object_ids):
        logger.warning(f"Object names {missing_object_names} do not exist")
    # getting the alarm history
    endpoint = "objects/alarmhistory"
    success = False
    try_num = 0
    while not success:
        if try_num > 0:
            # reducing the batch size before retrying
            if batch_size == 1:
                raise ValueError("Batch size is already 1, cannot reduce it further")
            batch_size = max(1, batch_size // 2)
            logger.warning(f"Trying again with smaller batch size: {batch_size}")
        results = {}
        obj_batches = [object_names[i : i + batch_size] for i in range(0, len(object_names), batch_size)]
        one_failed = False
        for objects in obj_batches:
            t0_batch = time.perf_counter()
            payload = {
                "from": timestamp_from_datetime(dt=period.start, time_zone=time_zone, unit="milliseconds"),
                "to": timestamp_from_datetime(dt=period.end, time_zone=time_zone, unit="milliseconds"),
                "objectIds": [object_ids[obj] for obj in objects],
                "type": alarm_type,
                "filter": "",
                "filterActive": active_only,
                "filterRootCauses": False,
                "inObjectTime": False,
                "includeCount": False,
                "includeOkAlarms": True,
                "orderBy": "Start desc",
                "skip": 0,
                "take": 99999999999999999999999999,
            }
            try:
                request_result = self.baze.conn.post(endpoint, json=payload)
            except Exception:
                # if the response was a 500 whose text contains "The query processor
                # ran out of internal resources", restart with a smaller batch size
                tb = traceback.format_exc()
                if "500" in tb and "The query processor ran out of internal resources" in tb:
                    logger.error(
                        f"Got 500 error with message 'The query processor ran out of internal resources'. Trying again with smaller batch size. The error message was\n {tb}",
                    )
                    one_failed = True
                    try_num += 1
                    break
                raise
            self._handle_http_errors(request_result)
            # converting to dict
            result: list[dict[str, Any]] = request_result.json()["alarms"]
            # converting dates to datetime and mapping object ids back to names
            result = [
                entry
                | {"objectKey": object_id_to_name[entry["objectId"].upper()]}
                | {"start": convert_time_zone(datetime.strptime(entry["start"][:-9], "%Y-%m-%dT%H:%M:%S"), "UTC", time_zone)}
                | (
                    {
                        "end": convert_time_zone(datetime.strptime(entry["end"][:-9], "%Y-%m-%dT%H:%M:%S"), "UTC", time_zone),
                        "duration": datetime.strptime(entry["end"][:-9], "%Y-%m-%dT%H:%M:%S")
                        - datetime.strptime(entry["start"][:-9], "%Y-%m-%dT%H:%M:%S"),
                    }
                    if "end" in entry
                    else {"duration": None}
                )
                | (
                    {"uncertainStopTime": datetime.strptime(entry["uncertainStopTime"][:-9], "%Y-%m-%dT%H:%M:%S")}
                    if "uncertainStopTime" in entry
                    else {}
                )
                for entry in result
            ]
            # splitting the result by object name
            result_by_object_name = {}
            for entry in result:
                object_name = entry.pop("objectKey")
                if object_name not in result_by_object_name:
                    result_by_object_name[object_name] = []
                result_by_object_name[object_name].append(entry)
            results |= result_by_object_name
            logger.debug(f"Got alarms data in {time.perf_counter() - t0_batch:.3f} s for {objects}")
        if not one_failed:
            success = True
    # converting to desired output
    match output_type:
        case "dict":
            pass
        case "DataFrame":
            # converting to a list of dicts
            results = [{**entry, "object_name": object_name} for object_name, entries in results.items() for entry in entries]
            # converting to DataFrame
            results = json_normalize(results, max_level=1)
            cols = {
                "id": "int64[pyarrow]",
                "alarmId": "int64[pyarrow]",
                "siteId": "int64[pyarrow]",
                "turbineId": "int64[pyarrow]",
                "objectId": "string[pyarrow]",
                "turbineName": "string[pyarrow]",
                "turbineTitle": "string[pyarrow]",
                "alarmTemplateId": "int64[pyarrow]",
                "start": "datetime64[s]",
                "stopTime": "string[pyarrow]",
                "alarm": "string[pyarrow]",
                "alarmDescription": "string[pyarrow]",
                "code": "int64[pyarrow]",
                "brakePrg": "int64[pyarrow]",
                "isOkAlarm": "bool[pyarrow]",
                "alarmType": "int64[pyarrow]",
                "sourceId": "int64[pyarrow]",
                "sourceName": "string[pyarrow]",
                "isRootCause": "bool[pyarrow]",
                "alarmRemarks": "string[pyarrow]",
                "allocations": "object",
                "sfId": "string[pyarrow]",
                "end": "datetime64[s]",
                "uncertainStopTime": "datetime64[s]",
                "duration": "timedelta64[s]",
                "ackDuration": "string[pyarrow]",
                "type": "string[pyarrow]",
                "object_name": "string[pyarrow]",
            }
            if results.empty:
                results = DataFrame(columns=list(cols.keys())).astype(cols)
            results = results.convert_dtypes(dtype_backend="pyarrow")
            # adding any missing columns with the expected dtypes
            for col, dtype in cols.items():
                if col not in results.columns:
                    results[col] = NA
                    results[col] = results[col].astype(dtype)
            # forcing datetime columns to standard datetime types for better compatibility
            results["start"] = results["start"].astype("datetime64[s]")
            results["end"] = results["end"].astype("datetime64[s]")
            results["duration"] = results["duration"].astype("timedelta64[s]")
            if "uncertainStopTime" in results.columns:
                results["uncertainStopTime"] = results["uncertainStopTime"].astype("datetime64[s]")
            results = results.set_index(["object_name", "alarm", "start"])
        case _:
            raise ValueError(f"output_type must be one of ['dict', 'DataFrame'], got '{output_type}'")
    logger.debug(f"Got alarms data in {output_type} format in {time.perf_counter() - t0:.3f} s")
    return results
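Note the retry strategy in the source above: when the backend reports that its query processor ran out of internal resources, batch_size is halved and the whole request sequence restarts, giving up only once the batch size is already 1. The same pattern in isolation, with a hypothetical fetch_batch callable:

def fetch_in_shrinking_batches(items: list[str], fetch_batch, batch_size: int = 10) -> dict:
    """Retry a batched fetch, halving the batch size whenever any batch fails.

    fetch_batch is a hypothetical callable taking a list of names and returning
    a dict; it is assumed to raise RuntimeError on server-side resource limits.
    """
    while True:
        results = {}
        try:
            for i in range(0, len(items), batch_size):
                results |= fetch_batch(items[i : i + batch_size])
            return results
        except RuntimeError:
            if batch_size == 1:
                raise  # nothing left to shrink; give up, as the source does
            batch_size = max(1, batch_size // 2)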