Skip to content

Recalc

JobRecalc(baze)

Class used for handling recalc jobs.

Source code in echo_baze/baze_root.py
def __init__(self, baze: e_bz.Baze) -> None:
    """Validate and store the top level Baze object shared by all subclasses.

    Parameters
    ----------
    baze : Baze
        Top level object carrying all functionality and the connection handler.

    Raises
    ------
    ValueError
        If ``baze`` is not an instance of ``Baze``.

    """
    # accept only real Baze instances; anything else is rejected below
    if isinstance(baze, e_bz.Baze):
        self.baze: e_bz.Baze = baze
        return

    raise ValueError(f"baze must be of type Baze, not {type(baze)}")

insert(period, points, time_zone='local', priority=99, method='tag')

Schedule the recalculation of the desired points over the given period.

Parameters:

  • period

    (DateTimeRange) –

    The desired period

  • points

    (dict[str, list[str]]) –

    Dict in the format {object_name: [point_name, ...], ...}

    Objects will be grouped by model, so the points that will be recalculated for each model are all the points specified for the objects of that model. For example: {"SDM1-VRN1-01": ["IEC-OperationState"], "SDM1-VRN1-02": ["CommunicationState"]} will result in both "IEC-OperationState" and "CommunicationState" being recalculated for the two turbines as they are in the same model.

  • time_zone

    ( TimeZone, default: 'local' ) –

    In which time zone we assume the inputs are. Also used to define in which time zone the output is. There are three options:

    • If "UTC" is used, we assume time already is in UTC.
    • If "local" is used, the default time zone defined in echo_baze will be used.
    • If an int, must be between -12 and +12

    By default "local"

  • priority

    (int, default: 99 ) –

    The priority of the recalc job. Default is 99.

  • method

    (Literal['schema', 'tag'], default: 'tag' ) –

    The method to use for recalculation.

    • "schema" acts as if the recalculation were requested from the Data Management page in the portal. It requests the recalc by objects and points (schemas).
    • "tag" acts as if the recalculation were requested from the Tag Manager page in the portal. It requests the recalc by tag IDs.

    By default "tag".

Returns:

  • dict[str, Any]

    The response of the request. Most useful keys are orderId and pkId

Source code in echo_baze/job_recalc.py
@validate_call
def insert(
    self,
    period: DateTimeRange,
    points: dict[str, list[str]],
    time_zone: TimeZone = "local",
    priority: int = 99,
    method: Literal["schema", "tag"] = "tag",
) -> dict[str, Any]:
    """
    Schedule the recalculation of the desired points over the given period.

    Parameters
    ----------
    period : DateTimeRange
        The desired period
    points : dict[str, list[str]]
        Dict in the format {object_name: [point_name, ...], ...}. Must not be empty.

        When method is "schema", objects will be grouped by model, so the points that will be recalculated for each model are all the points specified for the objects of that model. For example: {"SDM1-VRN1-01": ["IEC-OperationState"], "SDM1-VRN1-02": ["CommunicationState"]} will result in both "IEC-OperationState" and "CommunicationState" being recalculated for the two turbines as they are in the same model.
    time_zone :  TimeZone, optional
        In which time zone we assume the inputs are. Also used to define in which time zone the output is. There are three options:

        - If "UTC" is used, we assume time already is in UTC.
        - If "local" is used, the default time zone defined in echo_baze will be used.
        - If an int, must be between -12 and +12

        By default "local"
    priority : int, optional
        The priority of the recalc job. Default is 99.
    method : Literal["schema", "tag"], optional
        The method to use for recalculation.

        - "schema" acts as if recalculating from the Data Management page in the portal. It requests the recalc by objects and points (schemas), sending one request per object model.
        - "tag" acts as if recalculating from the Tag Manager page in the portal. It requests the recalc by tag ids, in a single request.

        By default "tag".

    Returns
    -------
    dict[str, Any]
        The response of the request. Most useful keys are orderId and pkId.
        When method is "schema" and the objects span more than one model, only the response of the last request is returned.

    Raises
    ------
    ValueError
        If ``points`` is empty, if no requested object could be resolved (schema method), if a requested point does not exist,
        if any created job does not report the state "Created" or if an object is missing from the created job.
    """
    # without this guard no request would produce a usable result and the function would fail later with an obscure error
    if not points:
        raise ValueError("points must contain at least one object")

    t0 = time.perf_counter()

    endpoint = "statesyncorders"

    # the request timestamps are identical for every request, so they are built only once
    # format sent to the API: UTC, millisecond precision, trailing "Z"
    timestamp_format = "%Y-%m-%dT%H:%M:%S.%f"
    start_utc = convert_time_zone(dt=period.start, old_tz=time_zone, new_tz="UTC").strftime(timestamp_format)[:-3] + "Z"
    end_utc = convert_time_zone(dt=period.end, old_tz=time_zone, new_tz="UTC").strftime(timestamp_format)[:-3] + "Z"
    period_str = f"{period.start:%Y-%m-%d %H:%M:%S} to {period.end:%Y-%m-%d %H:%M:%S}"

    # getting models of all objects
    objs_def = self.baze.objects.instances.get(object_names=list(points.keys()))

    # grouping objects by model
    obj_models: dict[str, list[str]] = {}
    for obj_name, obj_def in objs_def.items():
        obj_models.setdefault(obj_def["attributes"]["domainName"], []).append(obj_name)

    if method == "schema":
        logger.info("Starting recalc job using schema method (Data Management page in portal)")

        # nothing to iterate over means no request would be sent and there would be no response to return
        if not obj_models:
            raise ValueError(f"None of the requested objects were found: {list(points)}")

        # getting point definitions
        point_details = self.baze.points.definitions.get(object_models=list(obj_models.keys()))

        # iterating over all models, one request per model
        for obj_model, objs in obj_models.items():
            model_point_details = point_details[obj_model]

            # union of the points requested for the objects of this model (first-seen order, no duplicates)
            model_points = list(dict.fromkeys(point for obj_name in objs for point in points[obj_name]))

            # checking if all points are available for the model
            not_found_points = set(model_points) - set(model_point_details.keys())
            if not_found_points:
                raise ValueError(
                    f"The following points were not found for model '{obj_model}': {not_found_points}",
                )

            # getting the schemaId for all the points
            schema_ids = [model_point_details[point]["schemaId"] for point in model_points]

            # getting assets
            assets = [objs_def[obj_name]["attributes"]["objectHandle"] for obj_name in objs]

            payload = {
                "from": start_utc,
                "to": end_utc,
                "orderMask": 2,  # dont know why this is needed, just replicating what the portal does
                "realarm": False,
                "reallocate": False,
                "recalc": True,
                "priority": priority,
                "settings": {  # dont know why this is needed, just replicating what the portal does
                    "calculationMask": 15,
                    # concat all the schemaIds as a string separated by commas
                    "schemaIds": ",".join(str(schema_id) for schema_id in schema_ids),
                },
                "assetIds": assets,
            }

            t1 = time.perf_counter()
            result = self.baze.conn.post(endpoint, json=payload)
            self._handle_http_errors(result)
            logger.debug(
                f"Request response for {objs} {model_points} {period_str} in {time.perf_counter() - t1:.3f} s",
            )

            result = result.json()

            # every request creates its own job, so each response is validated (not only the last one)
            if result["state"] != "Created":
                raise ValueError(f"Error in recalc job creation: {result}")

            # check if all assets are in result["assets"]
            for obj_name in objs:
                obj_id = objs_def[obj_name]["objectId"]
                if not any(asset["id"] == obj_id for asset in result["assets"]):
                    raise ValueError(f"Object {obj_name} not found in recalc job creation: {result}")

    else:  # method == "tag", the only other value allowed by the Literal annotation
        logger.info("Starting recalc job using tag method (Tag Manager page in portal)")

        # getting tag ids
        point_details = self.baze.points.details.get(points=points)

        tag_ids = []
        for object_name, object_points in points.items():
            # checking if all points are available for the object
            not_found_points = set(object_points) - set(point_details[object_name].keys())
            if not_found_points:
                raise ValueError(
                    f"The following points were not found for object '{object_name}': {not_found_points}",
                )

            tag_ids.extend(v["measurementId"] for v in point_details[object_name].values())

        payload = {
            "from": start_utc,
            "to": end_utc,
            "orderMask": 2,  # dont know why this is needed, just replicating what the portal does
            "priority": priority,
            "settings": {  # dont know why this is needed, just replicating what the portal does
                "calculationMask": 15,
            },
            "measurementIds": tag_ids,
        }

        t1 = time.perf_counter()
        result = self.baze.conn.post(endpoint, json=payload)
        self._handle_http_errors(result)
        # a single request covers all objects, so the whole points dict is logged (not only the last loop item)
        logger.debug(
            f"Request response for {points} {period_str} in {time.perf_counter() - t1:.3f} s",
        )

        result = result.json()

        if result["state"] != "Created":
            raise ValueError(f"Error in recalc job creation: {result}")

    logger.debug(f"Created recalc job in {time.perf_counter() - t0:.3f} s")

    return result