Point Tags

PointTags(baze)

Class used for handling point Tags.
Source code in echo_baze/baze_root.py
def __init__(self, baze: e_bz.Baze) -> None:
    """Base class that all subclasses should inherit from.

    Stores a reference to the top-level ``Baze`` object after validating
    its type.

    Parameters
    ----------
    baze : Baze
        Top level object carrying all functionality and the connection handler.
    """
    # reject anything that is not a Baze instance up front
    if isinstance(baze, e_bz.Baze):
        self.baze: e_bz.Baze = baze
    else:
        raise ValueError(f"baze must be of type Baze, not {type(baze)}")
get(points=None, tag_ids=None, tag_names=None, get_device_names=False, request_batch_size=1000, output_type='dict')
¶
Gets the Tags and its definitions.
The main attributes returned are:
- id
- accessType
- coerceType
- description
- deviceId
- driverSource
- engineDeadBand
- engineId
- euFullScale
- euZeroScale
- inverted
- ioDataType
- itemId
- libraryId
- logData
- logDeadBand
- logType
- memoryDataType
- rawFullScale
- rawZeroScale
- scaleEquation
- scaleType
- setInitialValue
- tagId
- tagName
- tagStatus
- tagType
- unitId
- unitName
- update
- createdOn
- updatedOn
- uniqueID
- databaseId
- attributes.LogicalId
- systemAttributes.ObjectId
Parameters:
- points (dict[str, list[str]], default: None) – Dict in the format {object_name: [point_name, ...], ...}. By default None
- tag_ids (list[int], default: None) – List of tag ids to get the details from. By default None
- tag_names (list[str], default: None) – List of tag names to get the details from. By default None
- get_device_names (bool, default: False) – If True, it will get the device names for the tags. By default False
- request_batch_size (int, default: 1000) – Size of the batch to request the data. Only this many tags will be requested at a time. By default 1000
- output_type (Literal['dict', 'DataFrame'], default: 'dict') – Output type of the data. Can be one of ["dict", "DataFrame"]. By default "dict"
Returns:
- dict[str, dict[str, dict[str, Any]]] – In case output_type == "dict", it will return a dict with the format {object_name: {point_name: {attribute_name: value, ...}, ...}, ...}
- DataFrame – In case output_type == "DataFrame", it will return a DataFrame with object_name and point as the index (MultiIndex).
Source code in echo_baze/point_tags.py
@validate_call
def get(
self,
points: dict[str, list[str]] | None = None,
tag_ids: list[int] | None = None,
tag_names: list[str] | None = None,
get_device_names: bool = False,
request_batch_size: int = 1000,
output_type: Literal["dict", "DataFrame"] = "dict",
) -> dict[str, dict[str, dict[str, Any]]] | DataFrame:
"""Gets the Tags and it's definitions.
The main attributes returned are:
- id
- accessType
- coerceType
- description
- deviceId
- driverSource
- engineDeadBand
- engineId
- euFullScale
- euZeroScale
- inverted
- ioDataType
- itemId
- libraryId
- logData
- logDeadBand
- logType
- memoryDataType
- rawFullScale
- rawZeroScale
- scaleEquation
- scaleType
- setInitialValue
- tagId
- tagName
- tagStatus
- tagType
- unitId
- unitName
- update
- createdOn
- updatedOn
- uniqueID
- databaseId
- attributes.LogicalId
- systemAttributes.ObjectId
Parameters
----------
points : dict[str, list[str]], optional
Dict in the format {object_name: [point_name, ...], ...}. By default None
tag_ids : list[int], optional
List of tag ids to get the details from. By default None
tag_names : list[str], optional
List of tag names to get the details from. By default None
get_device_names : bool, optional
If True, it will get the device names for the tags. By default False
request_batch_size : int, optional
Size of the batch to request the data. Only this many tags will be requested at a time. By default 1000
output_type : Literal["dict", "DataFrame"], optional
Output type of the data. Can be one of ["dict", "DataFrame"]
By default "dict"
Returns
-------
dict[str, dict[str, dict[str, Any]]]
In case output_type == "dict", it will return a dict with the format {object_name: {point_name: {attribute_name: value, ...}, ...}, ...}
DataFrame
In case output_type == "DataFrame", it will return a DataFrame with object_name and point as the index (MultiIndex).
"""
# validating if only one of the inputs is set
if sum([points is not None, tag_ids is not None, tag_names is not None]) != 1:
raise ValueError("Only one of points, tag_ids or tag_names can be set.")
tag_batches = []
if tag_ids is not None:
tag_batches = [tag_ids[i : i + request_batch_size] for i in range(0, len(tag_ids), request_batch_size)]
elif tag_names is not None:
tag_batches = [tag_names[i : i + request_batch_size] for i in range(0, len(tag_names), request_batch_size)]
elif points is not None:
tag_names = [f"{object_name}-{point_name}" for object_name, point_names in points.items() for point_name in point_names]
tag_batches = [tag_names[i : i + request_batch_size] for i in range(0, len(tag_names), request_batch_size)]
df_list = []
got_count = 0
total_tags = sum([len(tag_batch) for tag_batch in tag_batches])
for tag_batch in tag_batches:
logger.debug(
f"Getting tags definitions for {len(tag_batch)} tags. Done {got_count} of {total_tags}, {got_count / total_tags:.2%}",
)
# this will only be used to get the id of the turbines
endpoint = "/measurements/metadata/search"
payload = {}
if tag_ids is not None:
payload["TagIds"] = tag_batch
if tag_names is not None or points is not None:
payload["TagNames"] = tag_batch
result = self.baze.conn.get(endpoint, json=payload)
self._handle_http_errors(result)
result = result.json()
# converting to DataFrame
df = DataFrame(result)
df_list.append(df)
got_count += len(tag_batch)
df = concat(df_list)
df = df.reset_index(drop=True) # needed as the index is not unique
# create the necessary columns
cols = ["attributes", "systemAttributes"]
for col in cols:
if col not in df.columns:
df[col] = NA
# converting attributes and systemAttributes to dicts (they are a string in the format attribute1=value1;attribute2=value2)
for col in ["attributes", "systemAttributes"]:
if col not in df.columns:
continue
if col == "attributes":
df["expanded"] = df[col].apply(
lambda x: dict(
[tuple(attr.split("=", maxsplit=1)) for attr in x.split("|") if len(attr.split("=", maxsplit=1)) == 2],
)
if not isna(x)
else {},
)
elif col == "systemAttributes":
df["expanded"] = df[col].apply(
lambda x: dict(
[tuple(attr.split("=", maxsplit=1)) for attr in x.split(";") if len(attr.split("=", maxsplit=1)) == 2],
)
if not isna(x)
else {},
)
# expanding the dict to columns with the attribute name as prefix
expanded_df = json_normalize(df["expanded"])
expanded_df.columns = [f"{col}.{col_name}" for col_name in expanded_df.columns]
df = concat([df, expanded_df], axis=1)
# dropping the original column
df = df.drop(columns=[col, "expanded"])
# create the necessary columns
cols = ["systemAttributes.ObjectKey", "systemAttributes.Schema"]
for col in cols:
if col not in df.columns:
df[col] = NA
# renaming systemAttributes.objectKey as object_name and systemAttributes.Schema as point
df = df.rename(columns={"systemAttributes.ObjectKey": "object_name", "systemAttributes.Schema": "point"})
if "deviceId" not in df.columns:
df["deviceId"] = NA
# getting device names
if get_device_names:
device_ids = self.baze.devices.get_ids()
device_ids = {value: key for key, value in device_ids.items()}
df["deviceName"] = df["deviceId"].map(device_ids)
else:
df["deviceName"] = NA
# converting to correct data types
df = df.astype(
{
"object_name": "string[pyarrow]",
"point": "string[pyarrow]",
"deviceName": "string[pyarrow]",
"deviceId": "int32[pyarrow]",
},
)
# setting object_name and point as index
df = df.set_index(["object_name", "point"])
# handling output
match output_type:
case "dict":
# converting to dict
result = df.to_dict(orient="index")
results = {}
# unpacking the dict to have the format {object_name: {point_name: {attribute_name: value, ...}, ...}, ...}
for key, value in result.items():
obj_name, point_name = key
results.setdefault(obj_name, {})[point_name] = value
return results
case "DataFrame":
return df