Point Tags

PointTags(baze)

Class used for handling point tags.

Source code in echo_baze/baze_root.py
def __init__(self, baze: e_bz.Baze) -> None:
    """Base class that all subclasses should inherit from.

    Parameters
    ----------
    baze : Baze
        Top level object carrying all functionality and the connection handler.

    """
    # check inputs
    if not isinstance(baze, e_bz.Baze):
        raise ValueError(f"baze must be of type Baze, not {type(baze)}")

    self.baze: e_bz.Baze = baze
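
A minimal construction sketch, assuming PointTags is importable from echo_baze/point_tags.py (the module shown for get below) and that Baze is importable from the package root; the Baze constructor arguments are hypothetical and depend on your connection setup:

import echo_baze as e_bz
from echo_baze.point_tags import PointTags  # assumed import path

baze = e_bz.Baze()  # hypothetical: real connection arguments are not shown here
point_tags = PointTags(baze)  # anything other than a Baze instance raises ValueError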

get(points=None, tag_ids=None, tag_names=None, get_device_names=False, request_batch_size=1000, output_type='dict')

Gets the tags and their definitions.

The main attributes returned are:

  • id
  • accessType
  • coerceType
  • description
  • deviceId
  • driverSource
  • engineDeadBand
  • engineId
  • euFullScale
  • euZeroScale
  • inverted
  • ioDataType
  • itemId
  • libraryId
  • logData
  • logDeadBand
  • logType
  • memoryDataType
  • rawFullScale
  • rawZeroScale
  • scaleEquation
  • scaleType
  • setInitialValue
  • tagId
  • tagName
  • tagStatus
  • tagType
  • unitId
  • unitName
  • update
  • createdOn
  • updatedOn
  • uniqueID
  • databaseId
  • attributes.LogicalId
  • systemAttributes.ObjectId

Parameters:

  • points

    (dict[str, list[str]], default: None ) –

    Dict in the format {object_name: [point_name, ...], ...}. By default None

  • tag_ids

    (list[int], default: None ) –

    List of tag ids to get the details from. By default None

  • tag_names

    (list[str], default: None ) –

    List of tag names to get the details from. By default None

  • get_device_names

    (bool, default: False ) –

    If True, it will get the device names for the tags. By default False

  • request_batch_size

    (int, default: 1000 ) –

    Size of the batch to request the data. Only this many tags will be requested at a time. By default 1000

  • output_type

    (Literal['dict', 'DataFrame'], default: 'dict' ) –

    Output type of the data. Can be one of ["dict", "DataFrame"]. By default "dict"

Returns:

  • dict[str, dict[str, dict[str, Any]]]

    If output_type == "dict", returns a dict in the format {object_name: {point_name: {attribute_name: value, ...}, ...}, ...}

  • DataFrame

    If output_type == "DataFrame", returns a DataFrame with object_name and point as the index (MultiIndex).
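
A short usage sketch, assuming a PointTags instance named point_tags (constructed as shown above); the object, point and tag names are hypothetical:

# exactly one of points, tag_ids or tag_names may be provided per call
tags = point_tags.get(points={"Object1": ["Point1", "Point2"]})
print(tags["Object1"]["Point1"]["tagName"])

# the same lookup by tag name, returned as a DataFrame indexed by (object_name, point)
tags_df = point_tags.get(
    tag_names=["Object1-Point1", "Object1-Point2"],
    get_device_names=True,
    output_type="DataFrame",
)
print(tags_df.loc[("Object1", "Point1"), "deviceName"])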

Source code in echo_baze/point_tags.py
@validate_call
def get(
    self,
    points: dict[str, list[str]] | None = None,
    tag_ids: list[int] | None = None,
    tag_names: list[str] | None = None,
    get_device_names: bool = False,
    request_batch_size: int = 1000,
    output_type: Literal["dict", "DataFrame"] = "dict",
) -> dict[str, dict[str, dict[str, Any]]] | DataFrame:
    """Gets the Tags and it's definitions.

    The main attributes returned are:

    - id
    - accessType
    - coerceType
    - description
    - deviceId
    - driverSource
    - engineDeadBand
    - engineId
    - euFullScale
    - euZeroScale
    - inverted
    - ioDataType
    - itemId
    - libraryId
    - logData
    - logDeadBand
    - logType
    - memoryDataType
    - rawFullScale
    - rawZeroScale
    - scaleEquation
    - scaleType
    - setInitialValue
    - tagId
    - tagName
    - tagStatus
    - tagType
    - unitId
    - unitName
    - update
    - createdOn
    - updatedOn
    - uniqueID
    - databaseId
    - attributes.LogicalId
    - systemAttributes.ObjectId

    Parameters
    ----------
    points : dict[str, list[str]], optional
        Dict in the format {object_name: [point_name, ...], ...}. By default None
    tag_ids : list[int], optional
        List of tag ids to get the details from. By default None
    tag_names : list[str], optional
        List of tag names to get the details from. By default None
    get_device_names : bool, optional
        If True, it will get the device names for the tags. By default False
    request_batch_size : int, optional
        Size of the batch to request the data. Only this many tags will be requested at a time. By default 1000
    output_type : Literal["dict", "DataFrame"], optional
        Output type of the data. Can be one of ["dict", "DataFrame"].
        By default "dict"

    Returns
    -------
    dict[str, dict[str, dict[str, Any]]]
        If output_type == "dict", returns a dict in the format {object_name: {point_name: {attribute_name: value, ...}, ...}, ...}
    DataFrame
        If output_type == "DataFrame", returns a DataFrame with object_name and point as the index (MultiIndex).
    """
    # validating if only one of the inputs is set
    if sum([points is not None, tag_ids is not None, tag_names is not None]) != 1:
        raise ValueError("Exactly one of points, tag_ids or tag_names must be set.")

    tag_batches = []
    if tag_ids is not None:
        tag_batches = [tag_ids[i : i + request_batch_size] for i in range(0, len(tag_ids), request_batch_size)]
    elif tag_names is not None:
        tag_batches = [tag_names[i : i + request_batch_size] for i in range(0, len(tag_names), request_batch_size)]
    elif points is not None:
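        # tag names are assembled as "<object_name>-<point_name>" from the points dict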
        tag_names = [f"{object_name}-{point_name}" for object_name, point_names in points.items() for point_name in point_names]
        tag_batches = [tag_names[i : i + request_batch_size] for i in range(0, len(tag_names), request_batch_size)]

    df_list = []
    got_count = 0
    total_tags = sum([len(tag_batch) for tag_batch in tag_batches])
    for tag_batch in tag_batches:
        logger.debug(
            f"Getting tags definitions for {len(tag_batch)} tags. Done {got_count} of {total_tags}, {got_count / total_tags:.2%}",
        )
        # search the metadata endpoint for the tag definitions
        endpoint = "/measurements/metadata/search"
        payload = {}
        if tag_ids is not None:
            payload["TagIds"] = tag_batch
        if tag_names is not None or points is not None:
            payload["TagNames"] = tag_batch

        result = self.baze.conn.get(endpoint, json=payload)
        self._handle_http_errors(result)
        result = result.json()

        # converting to DataFrame
        df = DataFrame(result)
        df_list.append(df)
        got_count += len(tag_batch)

    df = concat(df_list)
    df = df.reset_index(drop=True)  # needed as the index is not unique

    # create the necessary columns
    cols = ["attributes", "systemAttributes"]
    for col in cols:
        if col not in df.columns:
            df[col] = NA

    # converting attributes and systemAttributes to dicts (strings like attribute1=value1|attribute2=value2 for attributes and attribute1=value1;attribute2=value2 for systemAttributes)
    for col in ["attributes", "systemAttributes"]:
        if col not in df.columns:
            continue
        if col == "attributes":
            df["expanded"] = df[col].apply(
                lambda x: dict(
                    [tuple(attr.split("=", maxsplit=1)) for attr in x.split("|") if len(attr.split("=", maxsplit=1)) == 2],
                )
                if not isna(x)
                else {},
            )
        elif col == "systemAttributes":
            df["expanded"] = df[col].apply(
                lambda x: dict(
                    [tuple(attr.split("=", maxsplit=1)) for attr in x.split(";") if len(attr.split("=", maxsplit=1)) == 2],
                )
                if not isna(x)
                else {},
            )
        # expanding the dict to columns with the attribute name as prefix
        expanded_df = json_normalize(df["expanded"])
        expanded_df.columns = [f"{col}.{col_name}" for col_name in expanded_df.columns]
        df = concat([df, expanded_df], axis=1)
        # dropping the original column
        df = df.drop(columns=[col, "expanded"])

    # create the necessary columns
    cols = ["systemAttributes.ObjectKey", "systemAttributes.Schema"]
    for col in cols:
        if col not in df.columns:
            df[col] = NA

    # renaming systemAttributes.objectKey as object_name and systemAttributes.Schema as point
    df = df.rename(columns={"systemAttributes.ObjectKey": "object_name", "systemAttributes.Schema": "point"})

    if "deviceId" not in df.columns:
        df["deviceId"] = NA

    # getting device names
    if get_device_names:
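        # devices.get_ids() maps device names to ids; invert it so deviceId can be mapped back to a name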
        device_ids = self.baze.devices.get_ids()
        device_ids = {value: key for key, value in device_ids.items()}
        df["deviceName"] = df["deviceId"].map(device_ids)
    else:
        df["deviceName"] = NA

    # converting to correct data types
    df = df.astype(
        {
            "object_name": "string[pyarrow]",
            "point": "string[pyarrow]",
            "deviceName": "string[pyarrow]",
            "deviceId": "int32[pyarrow]",
        },
    )

    # setting object_name and point as index
    df = df.set_index(["object_name", "point"])

    # handling output
    match output_type:
        case "dict":
            # converting to dict
            result = df.to_dict(orient="index")
            results = {}
            # unpacking the dict to have the format {object_name: {point_name: {attribute_name: value, ...}, ...}, ...}
            for key, value in result.items():
                obj_name, point_name = key
                results.setdefault(obj_name, {})[point_name] = value

            return results
        case "DataFrame":
            return df