Recalc

JobRecalc(baze)

Class used for handling recalc jobs.

Source code in echo_baze/baze_root.py
def __init__(self, baze: e_bz.Baze) -> None:
    """Base class that all subclasses should inherit from.

    Parameters
    ----------
    baze : Baze
        Top level object carrying all functionality and the connection handler.
    """
    # Reject anything that is not an actual Baze instance before storing it.
    if isinstance(baze, e_bz.Baze):
        self.baze: e_bz.Baze = baze
    else:
        raise ValueError(f"baze must be of type Baze, not {type(baze)}")
insert(period, points, time_zone='local', priority=99, method='tag')
Schedule recalculations of points for the desired points.
Parameters:

- period (DateTimeRange) – The desired period.
- points (dict[str, list[str]]) – Dict in the format {object_name: [point_name, ...], ...}.
  Objects will be grouped by model, so the points that will be recalculated for each model are all the points specified for the objects of that model. For example: {"SDM1-VRN1-01": ["IEC-OperationState"], "SDM1-VRN1-02": ["CommunicationState"]} will result in both "IEC-OperationState" and "CommunicationState" being recalculated for the two turbines, as they are in the same model.
- time_zone (TimeZone, default: 'local') – In which time zone we assume the inputs are. Also used to define in which time zone the output is. There are three options:
  - If "UTC" is used, we assume the time already is in UTC.
  - If "local" is used, the default time zone defined in echo_baze will be used.
  - If an int, it must be between -12 and +12.
  By default "local".
- priority (int, default: 99) – The priority of the recalc job. Default is 99.
- method (Literal['schema', 'tag'], default: 'tag') – The method to use for recalculation.
  - "schema" acts as if recalculating via the Data Management page in the portal. It requests the recalc by objects and points (schemas).
  - "tag" acts as if recalculating via the Tag Manager page in the portal. It requests the recalc by tag ids.
  By default "tag".
Returns:

- dict[str, Any] – The response of the request. The most useful keys are orderId and pkId.
Source code in echo_baze/job_recalc.py
@validate_call
def insert(
    self,
    period: DateTimeRange,
    points: dict[str, list[str]],
    time_zone: TimeZone = "local",
    priority: int = 99,
    method: Literal["schema", "tag"] = "tag",
) -> dict[str, Any]:
    """
    Schedule recalculations of points for the desired points.

    Parameters
    ----------
    period : DateTimeRange
        The desired period
    points : dict[str, list[str]]
        Dict in the format {object_name: [point_name, ...], ...}
        Objects will be grouped by model, so the points that will be recalculated for each
        model are all the points specified for the objects of that model. For example:
        {"SDM1-VRN1-01": ["IEC-OperationState"], "SDM1-VRN1-02": ["CommunicationState"]}
        will result in both "IEC-OperationState" and "CommunicationState" being recalculated
        for the two turbines as they are in the same model.
    time_zone : TimeZone, optional
        In which time zone we assume the inputs are. Also used to define in which time zone
        the output is. There are three options:
        - If "UTC" is used, we assume time already is in UTC.
        - If local is used, the default time zone defined in echo_baze will be used.
        - If an int, must be between -12 and +12
        By default "local"
    priority : int, optional
        The priority of the recalc job. Default is 99.
    method : Literal["schema", "tag"], optional
        The method to use for recalculation.
        - "schema" will act like if recalculating by Data Management page in portal. It will request the recalc by objects and points (schemas).
        - "tag" will act like if recalculating by the Tag Manager page in portal. It will request the recalc by tags ids.
        By default "tag".

    Returns
    -------
    dict[str, Any]
        The response of the request. Most useful keys are orderId and pkId

    Raises
    ------
    ValueError
        If `points` is empty, if any requested point cannot be resolved, or if the server
        does not report the job as created.
    """
    t0 = time.perf_counter()
    # An empty request would previously fall through to the final state check with
    # `result` unbound (schema path posts nothing) — fail fast with a clear error.
    if not points:
        raise ValueError("points must not be empty")
    endpoint = "statesyncorders"
    # The API expects UTC timestamps with millisecond precision. They are identical for
    # every payload built below, so convert once instead of per-request.
    from_utc = convert_time_zone(dt=period.start, old_tz=time_zone, new_tz="UTC").strftime("%Y-%m-%dT%H:%M:%S.%f")[:-3] + "Z"
    to_utc = convert_time_zone(dt=period.end, old_tz=time_zone, new_tz="UTC").strftime("%Y-%m-%dT%H:%M:%S.%f")[:-3] + "Z"
    # getting models of all objects
    objs_def = self.baze.objects.instances.get(object_names=list(points.keys()))
    # grouping objects by model (domainName)
    obj_models: dict[str, list[str]] = {}
    for obj_name, obj_def in objs_def.items():
        obj_models.setdefault(obj_def["attributes"]["domainName"], []).append(obj_name)
    if method == "schema":
        logger.info("Starting recalc job using schema method (Data Management page in portal)")
        # getting point definitions
        point_details = self.baze.points.definitions.get(object_models=list(obj_models.keys()))
        # one request per model: it covers every point requested for any object of that model
        for obj_model, objs in obj_models.items():
            model_point_details = point_details[obj_model]
            # deduplicated union of the points requested for the model's objects
            model_points = list({point for obj_name in objs for point in points[obj_name]})
            # checking if all points are available for the model
            not_found_points = set(model_points) - set(model_point_details.keys())
            if not_found_points:
                raise ValueError(
                    f"The following points were not found for model '{obj_model}': {not_found_points}",
                )
            # getting the schemaId for all the points
            schema_ids = [model_point_details[point]["schemaId"] for point in model_points]
            # getting assets (object handles) of the model's objects
            assets = [objs_def[obj_name]["attributes"]["objectHandle"] for obj_name in objs]
            payload = {
                "from": from_utc,
                "to": to_utc,
                "orderMask": 2,  # dont know why this is needed, just replicating what the portal does
                "realarm": False,
                "reallocate": False,
                "recalc": True,
                "priority": priority,
                "settings": {  # dont know why this is needed, just replicating what the portal does
                    "calculationMask": 15,
                    # concat all the schemaIds as a string separated by commas
                    "schemaIds": ",".join(str(schema_id) for schema_id in schema_ids),
                },
                "assetIds": assets,
            }
            t1 = time.perf_counter()
            result = self.baze.conn.post(endpoint, json=payload)
            self._handle_http_errors(result)
            logger.debug(
                f"Request response for {objs} {model_points} {period.start:%Y-%m-%d %H:%M:%S} to {period.end:%Y-%m-%d %H:%M:%S} in {time.perf_counter() - t1:.3f} s",
            )
            result = result.json()
            # check that every requested object made it into the created order
            for obj_name in objs:
                obj_id = objs_def[obj_name]["objectId"]
                if not any(asset["id"] == obj_id for asset in result["assets"]):
                    raise ValueError(f"Object {obj_name} not found in recalc job creation: {result}")
    if method == "tag":
        logger.info("Starting recalc job using tag method (Tag Manager page in portal)")
        # getting tag ids
        point_details = self.baze.points.details.get(points=points)
        payload = {
            "from": from_utc,
            "to": to_utc,
            "orderMask": 2,  # dont know why this is needed, just replicating what the portal does
            "priority": priority,
            "settings": {  # dont know why this is needed, just replicating what the portal does
                "calculationMask": 15,
            },
        }
        tag_ids = []
        for object_name, object_points in points.items():
            # checking if all points are available for the object
            not_found_points = set(object_points) - set(point_details[object_name].keys())
            if not_found_points:
                raise ValueError(
                    f"The following points were not found for object '{object_name}': {not_found_points}",
                )
            tag_ids += [v["measurementId"] for v in point_details[object_name].values()]
        # single request covering all collected tag ids
        payload["measurementIds"] = tag_ids
        t1 = time.perf_counter()
        result = self.baze.conn.post(endpoint, json=payload)
        self._handle_http_errors(result)
        # log every requested object: the request covers all of them, not just the last
        # one iterated above
        logger.debug(
            f"Request response for {list(points.keys())} {period.start:%Y-%m-%d %H:%M:%S} to {period.end:%Y-%m-%d %H:%M:%S} in {time.perf_counter() - t1:.3f} s",
        )
        result = result.json()
    if result["state"] != "Created":
        raise ValueError(f"Error in recalc job creation: {result}")
    logger.debug(f"Created recalc job in {time.perf_counter() - t0:.3f} s")
    return result