Allocations History
AllocationHistory(baze)
Class used for handling allocation history.
Source code in echo_baze/allocation_history.py
def __init__(self, baze: e_bz.Baze) -> None:
super().__init__(baze)
# below are the required object type attributes for the calculations
# it is in the format {code_alias: attribute_key, ...} where attribute_key is the key of the attribute in Bazefield
self._required_calc_obj_type_attrs = {
"ActivePower": "ActivePowerKpiPoint",
"ActivePowerTheoretical": "ActivePowerTheoreticalKpiPoint",
}
calc_lost_energy(allocations, object_names, period, points=None, ons_points=None, point_names=None)
Calculates the lost energy for the given allocations.
It will get the point values for the points defined in point_names (actual and theoretical) and calculate the lost energy for each allocation. Note that if the points are in kW, the lost energy will be in kWh (the same applies for MW and MWh).
This method will also add a lostCurtailmentProduction column to the allocations DataFrame. This represents the energy that would be lost due to curtailment if the ONS power limits were not applied. In other words, measured + lostProduction + lostCurtailmentProduction = theoreticalProduction.
Please note that overlapping allocations will be removed when this is executed, as overlapping periods would otherwise cause the lost energy to be double-counted.
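For intuition, here is a minimal per-timestamp sketch of the arithmetic (illustrative values only; the real, vectorized implementation is _calc_lost_energy in the source below):

# one hypothetical 10-minute timestamp, powers in kW
theoretical = 2000.0  # ActivePowerTheoretical, already clipped to the ONS limit
unclipped = 2300.0    # ActivePowerTheoreticalUnclipped, before the ONS limit
measured = 1200.0     # ActivePower
lost_power = max(theoretical - max(measured, 0.0), 0.0)        # 800.0 kW
lost_curtailment_power = max(unclipped - theoretical, 0.0)     # 300.0 kW
step_hours = 10 / 60                                           # 10-minute resolution
lost_energy = lost_power * step_hours                          # ~133.3 kWh
lost_curtailment_energy = lost_curtailment_power * step_hours  # 50.0 kWh
# identity per timestamp: measured + lost_power + lost_curtailment_power == unclipped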
Parameters:
- allocations (DataFrame) – DataFrame containing allocations for one or more objects. Must have the following columns: "objectKey", "start", "end", "lostProduction", "category.name", "category.available", "category.excluded". The index must be the allocation ids (having the name "id").
- object_names (list[str]) – List of object names to get the point values from. This is needed as the allocations DataFrame might not contain all desired objects, since some of them might not have allocations in the given period.
- period (DateTimeRange) – Period of the allocations. It is assumed that all allocations are within this period. It will be used to get the point values and then calculate the total produced energy in the period. We assume this will be in a resolution of 10 minutes or higher, so periods starting or ending at minutes that are not multiples of 10 will not be accepted. Also, it is assumed that the periods are open on the left and closed on the right.
- points (DataFrame | None, default: None) – DataFrame containing the point values for the objects in the allocations. If None, they will be requested from Bazefield. By default None.
- ons_points (DataFrame | None, default: None) – DataFrame containing the power limit imposed by ONS. If None, no adjustment considering ONS power will be done. By default None.
- point_names (dict[str, dict[str, str]] | None, default: None) – Dict mapping ActivePower and ActivePowerTheoretical to the actual point names in the points DataFrame. It should only be used if the points DataFrame is provided. It should be in the format {object_name: {"ActivePower": ActivePowerKpiPoint, "ActivePowerTheoretical": ActivePowerTheoreticalKpiPoint}}, where ActivePowerKpiPoint and ActivePowerTheoreticalKpiPoint are attributes of the object type defined in Bazefield.

Returns:
- DataFrame – allocations DataFrame with the lostProduction column updated and the lostCurtailmentProduction column added.
- dict[str, float] – Dict containing the total produced energy in the period.
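A hedged usage sketch (the object name and the `history` instance are assumptions for illustration; points and point_names are omitted so the method fetches and resolves them from Bazefield):

from datetime import datetime

# `history` is an AllocationHistory instance; `allocations` is a DataFrame shaped
# as described above, with its index named "id"
period = DateTimeRange(start=datetime(2024, 1, 1), end=datetime(2024, 2, 1))
updated_allocs, produced = history.calc_lost_energy(
    allocations=allocations,
    object_names=["WF-A-01"],  # made-up object name
    period=period,  # boundaries on 10-minute marks, open on the left, closed on the right
)
print(produced["WF-A-01"])  # total produced energy (kWh if the points are in kW)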
Source code in echo_baze/allocation_history.py
@validate_call
def calc_lost_energy(
self,
allocations: pd.DataFrame,
object_names: list[str],
period: DateTimeRange,
points: pd.DataFrame | None = None,
ons_points: pd.DataFrame | None = None,
point_names: dict[str, dict[str, str]] | None = None,
) -> tuple[pd.DataFrame, dict[str, float]]:
"""Calculates the lost energy for the given allocations.
It will get the point values for the points defined in point_names (actual and theoretical) and calculate the lost energy for each allocation. Note that if the points are in kW, the lost energy will be in kWh (the same applies for MW and MWh).
This method will also add a lostCurtailmentProduction column to the allocations DataFrame. This represents the energy that would be lost due to curtailment if the ONS power limits were not applied. In other words, measured + lostProduction + lostCurtailmentProduction = theoreticalProduction.
Please note that overlapping allocations will be removed when this is executed, as overlapping periods would otherwise cause the lost energy to be double-counted.
Parameters
----------
allocations : DataFrame
DataFrame containing allocations for one or more objects. Must have the following columns: "objectKey", "start", "end", "lostProduction", "category.name", "category.available", "category.excluded".
The index must be the allocation ids (having the name "id").
object_names : list[str]
List of object names to get the point values from.
This is needed as the allocations DataFrame might not contain all desired objects, since some of them might not have allocations in the given period.
period : DateTimeRange
Period of the allocations. It is assumed that all allocations are within this period.
It will be used to get the point values and then calculate the total produced energy in the period.
We assume this will be in a resolution of 10 minutes or higher, so periods starting or ending at minutes that are not multiples of 10 will not be accepted.
Also, it is assumed that the periods are open on the left and closed on the right.
points : DataFrame | None, optional
DataFrame containing the point values for the objects in the allocations. If None, it will be requested from Bazefield. By default None
ons_points : DataFrame | None, optional
DataFrame containing the power limit imposed by ONS. If None, no adjustment considering ONS power will be done. By default None.
point_names : dict[str, dict[str, str]] | None, optional
Dict mapping ActivePower and ActivePowerTheoretical to the actual point names in the points DataFrame. It should only be used if the points DataFrame is provided.
It should be in the format {object_name: {"ActivePower": ActivePowerKpiPoint, "ActivePowerTheoretical": ActivePowerTheoreticalKpiPoint}}, where ActivePowerKpiPoint and ActivePowerTheoreticalKpiPoint are attributes of the object type defined in Bazefield.
Returns
-------
DataFrame
allocations DataFrame with the lostProduction column updated and added lostCurtailmentProduction column.
dict[str, float]
dict containing the total produced energy in the period
"""
t0 = time.perf_counter()
# checking inputs
needed_columns = {"objectKey", "start", "end", "lostProduction", "category.name", "category.available", "category.excluded"}
if missing_columns := needed_columns - set(allocations.columns):
raise ValueError(f"allocations DataFrame is missing the following columns: {missing_columns}")
if allocations.index.name != "id":
raise ValueError(f"allocations DataFrame must have the index name set to 'id', got: {allocations.index.name}")
if len(allocations) > 0 and (allocations["start"].min() < period.start or allocations["end"].max() > period.end):
raise ValueError(
f"allocations must be fully contained in the period. Got allocations from {allocations['start'].min():%Y-%m-%d %H:%M:%S} to {allocations['end'].max():%Y-%m-%d %H:%M:%S} and period from {period.start:%Y-%m-%d %H:%M:%S} to {period.end:%Y-%m-%d %H:%M:%S}",
)
if period.start.minute % 10 != 0 or period.end.minute % 10 != 0:
raise ValueError(f"period must start and end in minutes multiple of 10. Got {period}")
if points is not None and (not isinstance(points.columns, pd.MultiIndex) or points.columns.names != ["object_name", "point"]):
raise ValueError(
f"points must have a pd.MultiIndex with names ['object_name', 'point'], got {type(points.columns)} with names {points.columns.names if isinstance(points.columns, pd.MultiIndex) else None}",
)
if point_names is None:
point_names = self._get_lost_energy_point_names(object_names=object_names)
if missing_objects := set(object_names) - set(point_names.keys()):
raise ValueError(f"The following objects are missing in point_names: {missing_objects}")
if any(set(self._required_calc_obj_type_attrs) - set(point_names[obj].keys()) for obj in object_names):
raise ValueError(f"At least one object does not have the required attributes {list(self._required_calc_obj_type_attrs.keys())}")
# making a copy of the allocations
allocations = allocations.copy()
# changing lostProduction to double just to make sure
allocations["lostProduction"] = allocations["lostProduction"].astype("double[pyarrow]")
if "lostCurtailmentProduction" not in allocations:
allocations["lostCurtailmentProduction"] = 0.0
allocations["lostCurtailmentProduction"] = allocations["lostCurtailmentProduction"].astype("double[pyarrow]")
# removing id from index as it might be duplicated
allocations = allocations.reset_index(drop=False)
# checking if only not available or excluded allocations are present
if not (~allocations["category.available"] | allocations["category.excluded"]).all():
raise ValueError("allocations must have only not available or excluded categories")
# checking and removing if there are overlapping allocations
if len(allocations) > 0:
allocations = self.remove_overlapping(allocations)
# getting all objects in the allocations
objects = allocations["objectKey"].unique().tolist()
if wrong_objects := set(objects) - set(object_names):
raise ValueError(f"The following objects are in the allocations DataFrame but not within object_names: {wrong_objects}")
produced_energy = {}
# making a copy of the points
provided_points = points.copy() if points is not None else None
# checking if the points are provided
requested_points = []
# getting the period that covers all the allocations
alloc_period = DateTimeRange(start=period.start - timedelta(minutes=10), end=period.end + timedelta(minutes=10))
for obj in object_names:
# checking if the points are provided
if provided_points is not None:
# checking if object and point_names are in the points
if obj not in provided_points.columns.get_level_values("object_name"):
raise ValueError(f"The object {obj} is not in the provided points")
if any(point not in provided_points.columns.get_level_values("point") for point in point_names[obj].values()):
raise ValueError("Not all point_names are in the provided points")
# checking if aggregation level is not present
if "aggregation" in provided_points.columns.names:
raise ValueError("The provided points must not have an aggregation level")
# checking if quantity is not present
if "quantity" in provided_points.columns.names:
raise ValueError("The provided points must not have a quantity level")
# checking if the period is in the points
if provided_points.index.min() > alloc_period.start + timedelta(
minutes=10,
) or provided_points.index.max() < alloc_period.end - timedelta(minutes=10):
raise ValueError(f"The provided points do not cover the period {alloc_period}")
else:
# getting the needed points
this_requested_points: pd.DataFrame = self.baze.points.values.series.get(
points={obj: list(point_names[obj].values())},
period=alloc_period,
aggregation="Raw",
round_timestamps={"freq": timedelta(minutes=5), "tolerance": timedelta(seconds=1)},
)
requested_points.append(this_requested_points)
if provided_points is None:
# concatenating the requested points
provided_points = pd.concat(requested_points, axis=1)
# reducing the points to the period
provided_points = provided_points[(provided_points.index > alloc_period.start) & (provided_points.index <= alloc_period.end)].copy()
if ons_points is not None:
ons_points = ons_points[(ons_points.index > alloc_period.start) & (ons_points.index <= alloc_period.end)].copy()
# reducing provided_points to only the points of the wanted objects
provided_points = provided_points.loc[:, pd.IndexSlice[object_names, :]].copy()
# copy of the points before clipping based on ONS Set Point
theoretical_points = [
(obj, point_real_name)
for obj, point_val in point_names.items()
for point_name, point_real_name in point_val.items()
if "theoretical" in point_name.lower() and obj in object_names
]
unclipped_points = provided_points.loc[:, theoretical_points].copy()
# renaming second level to ActivePowerTheoreticalUnclipped
unclipped_points.columns = pd.MultiIndex.from_tuples(
[(obj, "ActivePowerTheoreticalUnclipped") for obj, _ in unclipped_points.columns],
names=["object_name", "point"],
)
# merging
provided_points = provided_points.merge(unclipped_points, how="left", left_index=True, right_index=True)
# adjusting theoretical power based on ONS Set Point
if ons_points is not None:
# reducing ONS points to only the points of the wanted objects
spe_names = list({re.match(r"(\w+-\w+)-", obj).group(1) for obj in object_names})
ons_points = ons_points.loc[:, spe_names].copy()
# TODO here we need to also get the lost energy based on ONS clipping
provided_points = self._clip_theoretical_based_on_ons(
obj_points=provided_points,
ons_points=ons_points,
allocations=allocations,
point_names=point_names,
)
else:
logger.warning("ONS points not provided. Theoretical power will not be clipped based on ONS Set Point")
# iterating objects
for obj in object_names:
# getting the allocations for the object
obj_allocations = allocations[allocations["objectKey"] == obj].copy()
obj_allocations = obj_allocations.sort_values("start")
# getting the timedelta that represents the resolution of the points (extracting X minutes from ActivePower_10min.AVG, or ActivePowerTheoretical_10min.AVG)
n_mins = int(re.search(r"\d+", point_names[obj]["ActivePower"]).group())
# reindexing to n_mins
new_index = pd.date_range(
start=period.start - timedelta(minutes=n_mins),
end=period.end + timedelta(minutes=n_mins),
freq=timedelta(minutes=n_mins),
normalize=True,
)
new_index = new_index[
(new_index >= period.start - timedelta(minutes=n_mins)) & (new_index <= period.end + timedelta(minutes=n_mins))
]
# getting points for the object
points = provided_points.loc[
alloc_period.start : alloc_period.end,
(obj, [*list(point_names[obj].values()), "ActivePowerTheoreticalUnclipped"]),
]
points = points.reindex(new_index, method="nearest", tolerance=timedelta(seconds=1))
points = points.droplevel("object_name", axis=1)
points = points.rename(columns={v: k for k, v in point_names[obj].items()})
points = points.astype(
{
"ActivePowerTheoretical": "double[pyarrow]",
"ActivePower": "double[pyarrow]",
"ActivePowerTheoreticalUnclipped": "double[pyarrow]",
},
)
# calculating lost energy
# clip is used as we don't want to penalize the unavailable periods with energy consumption, which could lead to negative availability
points["LostActivePower"] = (
points["ActivePowerTheoretical"].fillna(0.0) - points["ActivePower"].fillna(0.0).clip(0.0, None)
).clip(0.0, None)
# lost due to curtailment
points["LostActivePowerCurtailment"] = (
points["ActivePowerTheoreticalUnclipped"].fillna(0.0) - points["ActivePowerTheoretical"].fillna(0.0).clip(0.0, None)
).clip(0.0, None)
# checking if there are missing expected_timestamps in the ActivePowerTheoretical column
expected_timestamps = int(alloc_period.duration / timedelta(minutes=n_mins))
if not np.isclose(points["ActivePowerTheoretical"].notna().sum(), expected_timestamps, atol=1):
missing_timestamps = expected_timestamps - points["ActivePowerTheoretical"].notna().sum()
logger.warning(
f"Missing timestamps in {point_names[obj]['ActivePowerTheoretical']} of {obj} in {alloc_period}: {missing_timestamps} timestamps considering timestamp resolution of {n_mins} minutes",
)
lost_energy_result = _calc_lost_energy(
alloc_starts=obj_allocations["start"].astype("datetime64[s]").to_numpy(np.datetime64),
alloc_ends=obj_allocations["end"].astype("datetime64[s]").to_numpy(np.datetime64),
lost_series_time=points.index.astype("datetime64[s]").to_numpy(np.datetime64),
lost_series_values=points["LostActivePower"].to_numpy(np.float64),
lost_curtailment_series_values=points["LostActivePowerCurtailment"].to_numpy(np.float64),
timestamp_length=np.timedelta64(int(timedelta(minutes=n_mins).total_seconds()), "s"),
)
for name, col in [("lostProduction", 0), ("lostCurtailmentProduction", 1)]:
obj_allocations[name] = lost_energy_result[:, col]
obj_allocations[name] = obj_allocations[name].astype("double[pyarrow]")
# adding values back to the allocations
allocations.loc[obj_allocations.index, name] = obj_allocations[name]
# calculating total produced energy
# removing not wanted timestamps
points = points[(points.index > period.start) & (points.index <= period.end)]
# sum of the produced energy in the period
produced_energy[obj] = points["ActivePower"].clip(lower=0).sum() * (timedelta(minutes=n_mins) / timedelta(minutes=60))
# checking if sum of produced and lost is equal or lower than 0
if produced_energy[obj] + obj_allocations["lostProduction"].sum() <= 0.0:
logger.warning(
f"Total energy (produced + lost) for {obj} in {period} is equal or lower than 0. Produced={produced_energy[obj]:.2f} kWh, Lost={obj_allocations['lostProduction'].sum():.2f} kWh",
)
# returning id as index
allocations = allocations.set_index("id")
logger.debug(f"Calculated lost energy for {len(object_names)} objects in {time.perf_counter() - t0:.3f} s")
return allocations, produced_energy
delete(allocation_ids)
Deletes the given allocations. This uses Bazefield Operations > Undo API to delete multiple allocations at once.
Please be sure this is what you want to do, as in most cases it is better to insert a Full Performance allocation in the period instead of deleting the allocations.
Parameters:
- allocation_ids (list[int]) – List of allocation ids to delete.
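A minimal usage sketch (the ids are illustrative and `history` is an assumed AllocationHistory instance):

# deletes two manual allocations by id; raises if any id does not exist or is a system allocation
history.delete(allocation_ids=[12345, 12346])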
Source code in echo_baze/allocation_history.py
@validate_call
def delete(self, allocation_ids: list[int]) -> None:
"""Deletes the given allocations. This uses Bazefield Operations > Undo API to delete multiple allocations at once.
Please be sure this is what you want to do, as in most cases it is better to insert a Full Performance allocation in the period instead of deleting the allocations.
Parameters
----------
allocation_ids : list[int]
List of allocation ids to delete.
"""
t0 = time.perf_counter()
# getting wanted allocations to check if they exist and if they are deletable
wanted_allocations = self.get_by_ids(allocation_ids=allocation_ids, output_type="DataFrame")
if missing_allocations := set(allocation_ids) - set(
wanted_allocations.index,
):
raise ValueError(f"The following allocation ids do not exist: {missing_allocations}")
# checking if all allocations are deletable (manual allocations)
if not wanted_allocations["manual"].all():
not_deletable_allocations = set(allocation_ids) - set(wanted_allocations[wanted_allocations["manual"]].index)
raise ValueError(f"The following allocation ids are not deletable (they are system allocations): {not_deletable_allocations}")
# URL encoded message
message = quote(
f"Deleted by {self.baze.users.get_identity()['userName']} using echo-baze @ {datetime.now(UTC):%Y-%m-%d %H:%M:%S} UTC",
)
endpoint = f"allocations?ids={','.join(str(alloc_id) for alloc_id in allocation_ids)}&changeMessage={message}"
result = self.baze.conn.delete(endpoint)
self._handle_http_errors(result)
logger.debug(f"Deleted {len(allocation_ids)} allocations in {time.perf_counter() - t0:.3f} s")
export_file(file_path, period, object_names, allocation_type=None, allocation_categories=None, include_available=False, include_excluded=False, include_overridden=False, include_full_performance=False, min_duration=None, truncate=False, time_zone='local', request_interval=timedelta(days=62), max_objects_per_request=100)
Exports the allocations to a file.
Parameters:
- file_path (Path) – Full file path to save the allocations. It must include the file extension. Currently supported file formats are .json and .yaml.
- period (DateTimeRange) – Period of the allocations.
- object_names (list[str]) – Names of the desired objects.
- allocation_type (str | None, default: None) – Name of the allocation type; by default the default allocation type for the wanted object types is used.
- allocation_categories (list[str] | None, default: None) – List of allocation categories to get, by default None.
- include_available (bool, default: False) – If set to True, available allocations will be included, by default False.
- include_excluded (bool, default: False) – If set to True, excluded allocations will be included, by default False.
- include_overridden (bool, default: False) – If set to True, overridden allocations will be included, by default False.
- include_full_performance (bool, default: False) – If set to True, full performance allocations will be included, by default False.
- min_duration (timedelta | None, default: None) – Minimum duration of the allocations to be included, by default None.
- truncate (bool, default: False) – If set to True, the allocations will be truncated to the period, by default False.
- time_zone (TimeZone, default: 'local') – Time zone to consider for the period, by default "local".
- request_interval (timedelta, default: timedelta(days=62)) – Interval to request the allocations, by default timedelta(days=62).
- max_objects_per_request (int, default: 100) – Maximum number of objects to request in each call, by default 100.
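A hedged usage sketch (file path and object names are made up; `history` is an assumed AllocationHistory instance):

from datetime import datetime
from pathlib import Path

history.export_file(
    file_path=Path("exports/allocations_2024_01.yaml"),  # .json and .yaml are supported
    period=DateTimeRange(start=datetime(2024, 1, 1), end=datetime(2024, 2, 1)),
    object_names=["WF-A-01", "WF-A-02"],
)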
Source code in echo_baze/allocation_history.py
@validate_call
def export_file(
self,
file_path: Path,
period: DateTimeRange,
object_names: list[str],
allocation_type: str | None = None,
allocation_categories: list[str] | None = None,
include_available: bool = False,
include_excluded: bool = False,
include_overridden: bool = False,
include_full_performance: bool = False,
min_duration: timedelta | None = None,
truncate: bool = False,
time_zone: TimeZone = "local",
request_interval: timedelta = timedelta(days=62),
max_objects_per_request: int = 100,
) -> None:
"""Exports the allocations to a file.
Parameters
----------
file_path : Path
Full file path to save the allocations. It must include the file extension.
Currently supported file formats are .json and .yaml
period : DateTimeRange
Period of the allocations.
object_names : list[str]
Names of the desired objects.
allocation_type : str | None, optional
Name of the allocation type, by default will get the default allocation type for the wanted object types.
allocation_categories : list[str] | None, optional
List of allocation categories to get, by default None
include_available : bool, optional
If set to True, available allocations will be included, by default False
include_excluded : bool, optional
If set to True, excluded allocations will be included, by default False
include_overridden : bool, optional
If set to True, overridden allocations will be included, by default False
include_full_performance : bool, optional
If set to True, full performance allocations will be included, by default False
min_duration : timedelta | None, optional
Minimum duration of the allocations to be included, by default None
truncate : bool, optional
If set to True, the allocations will be truncated to the period, by default False
time_zone : TimeZone, optional
Time zone to consider for the period, by default "local"
request_interval : timedelta, optional
Interval to request the allocations, by default timedelta(days=62)
max_objects_per_request : int, optional
Maximum number of objects to request in each call, by default 100
"""
allowed_extensions = {".json", ".yaml"}
if file_path.suffix not in allowed_extensions:
raise ValueError(f"File extension '{file_path.suffix}' not allowed. Allowed extensions are: {allowed_extensions}")
# getting the allocations
allocations: dict[int, dict[str, Any]] = self.get(
period=period,
object_names=object_names,
allocation_type=allocation_type,
allocation_categories=allocation_categories,
include_available=include_available,
include_excluded=include_excluded,
include_overridden=include_overridden,
include_full_performance=include_full_performance,
min_duration=min_duration,
truncate=truncate,
time_zone=time_zone,
request_interval=request_interval,
max_objects_per_request=max_objects_per_request,
output_type="dict",
)
# creating folder if it does not exist
file_path.parent.mkdir(parents=True, exist_ok=True)
# saving dictionary to file
match file_path.suffix[1:]:
case "json":
with file_path.open(mode="w", encoding="utf-8") as file:
json.dump(allocations, file, indent=4, sort_keys=True, default=str)
case "yaml":
with file_path.open(mode="w", encoding="utf-8") as file:
yaml.dump(allocations, file, indent=4)
logger.info(f"Allocations exported to {file_path}")
get(period, object_names, allocation_type=None, allocation_categories=None, include_available=False, include_excluded=False, include_overridden=False, include_full_performance=False, min_duration=None, truncate=False, time_zone='local', output_type='DataFrame', request_interval=timedelta(days=62), max_objects_per_request=100, request_object_groups='types')
Gets the allocation history for the given parameters.
The most important keys/columns of the output are:
- id
- objectId
- turbineName
- start
- end
- duration
- manual
- overridden
- label
- category.name
- categories : list[dict[str, Any]] - List of categories, useful when there are multiple categories in the same allocation (children and parent for example)
- lastComment.createdBy
- lastComment.commentText
Parameters:
- period (DateTimeRange) – Period of time to get the data from. It is assumed to be in the same time zone as the one specified in time_zone.
- object_names (list[str]) – Names of the objects to get the allocation history from.
- allocation_type (str | None, default: None) – Type of allocation to get; by default the default allocation type for the wanted object types is used.
- allocation_categories (list[str] | None, default: None) – Which categories to get. If set to None, all will be fetched. By default None.
- include_available (bool, default: False) – If set to True, will also get allocations with categories that are "available", by default False.
- include_excluded (bool, default: False) – If set to True, will also get allocations with categories that are "excluded", by default False.
- include_overridden (bool, default: False) – If set to True, will also get allocations that are overridden, by default False.
- include_full_performance (bool, default: False) – If set to True, will also get allocations with the "Full Performance" category, by default False.
- min_duration (timedelta | None, default: None) – Minimum duration to get allocations, by default None.
- truncate (bool, default: False) – If set to True, will truncate all allocations that are not fully contained in the period, by default False.
- time_zone (TimeZone, default: 'local') – In which time zone we assume the inputs are. Also used to define in which time zone the output is. There are three options:
  - If "UTC" is used, we assume the time already is in UTC.
  - If "local" is used, the default time zone defined in echo_baze will be used.
  - If an int, it must be between -12 and +12.
  By default "local".
- output_type (Literal['dict', 'DataFrame'], default: 'DataFrame') – Output type of the data. Can be one of ["dict", "DataFrame"]. By default "DataFrame".
- request_interval (timedelta, default: timedelta(days=62)) – To avoid doing large requests, the data will be requested from Bazefield in batches considering this interval. This is set by default to more than two months, as tests showed that request time does not increase significantly with large request intervals. By default timedelta(days=62).
- max_objects_per_request (int, default: 100) – Maximum number of objects to request in each batch. The default was set to a higher value as large requests are not significantly slower than smaller ones. By default 100.
- request_object_groups (Literal['types'] | None, default: 'types') – How the objects should be grouped for the requests. Options are:
  - "types": will group objects of the same type together, so each request will only contain objects of the same type. This is useful as allocation types are defined per object type, so we avoid doing multiple requests for the same object type.
  - None: if you forced a specific allocation_type, then all objects can be grouped together as they will all use the same allocation type. In this case, setting this to None will make the requests faster, as more objects will be requested in each request.
  This value is forced to None if allocation_type is set.

Returns:
- dict[int, dict[str, Any]] – In case output_type == "dict", a dict with the following format: {allocation_id: {allocation_attribute: value, ...}, ...}.
- DataFrame – In case output_type == "DataFrame", a DataFrame with index = "allocation_id" and columns = [attribute, ...].
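A hedged usage sketch (the object name is made up; `history` is an assumed AllocationHistory instance):

from datetime import datetime, timedelta

allocs = history.get(
    period=DateTimeRange(start=datetime(2024, 1, 1), end=datetime(2024, 2, 1)),
    object_names=["WF-A-01"],
    min_duration=timedelta(minutes=10),
    output_type="DataFrame",
)
print(allocs[["objectKey", "start", "end", "duration", "category.name"]].head())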
Source code in echo_baze/allocation_history.py
@validate_call
def get(
self,
period: DateTimeRange,
object_names: list[str],
allocation_type: str | None = None,
allocation_categories: list[str] | None = None,
include_available: bool = False,
include_excluded: bool = False,
include_overridden: bool = False,
include_full_performance: bool = False,
min_duration: timedelta | None = None,
truncate: bool = False,
time_zone: TimeZone = "local",
output_type: Literal["dict", "DataFrame"] = "DataFrame",
request_interval: timedelta = timedelta(days=62),
max_objects_per_request: int = 100,
request_object_groups: Literal["types"] | None = "types",
) -> dict[int, dict[str, Any]] | pd.DataFrame:
"""Gets the allocation history for the given parameters.
The most important keys/columns of the output are:
- id
- objectId
- turbineName
- start
- end
- duration
- manual
- overridden
- label
- category.name
- categories : list[dict[str, Any]] - List of categories, useful when there are multiple categories in the same allocation (children and parent for example)
- lastComment.createdBy
- lastComment.commentText
Parameters
----------
period : DateTimeRange
Period of time to get the data from. It is assumed to be in the same time zone as the one specified in time_zone.
object_names : list[str]
Names of the objects to get the allocation history from.
allocation_type : str | None, optional
Type of allocation to get, by default will get the default allocation type for the wanted object types.
allocation_categories : list[str] | None, optional
Which categories to get. If set to None will get all. By default None
include_available : bool, optional
If set to True will also get allocations with categories that are "available", by default False
include_excluded : bool, optional
If set to True will also get allocations with categories that are "excluded", by default False
include_overridden : bool, optional
If set to True will also get allocations that are overridden, by default False
include_full_performance : bool, optional
If set to True will also get allocations with the "Full Performance" category, by default False
min_duration : timedelta | None, optional
Minimum duration to get allocations, by default None
truncate : bool, optional
If set to True, will truncate all allocations that are not fully contained in the period, by default False
time_zone : TimeZone, optional
In which time zone we assume the inputs are. Also used to define in which time zone the output is. There are three options:
- If "UTC" is used, we assume time already is in UTC.
- If local is used, the default time zone defined in echo_baze will be used.
- If an int, must be between -12 and +12
By default "local"
output_type : Literal["dict", "DataFrame"], optional
Output type of the data. Can be one of ["dict", "DataFrame"]
By default "DataFrame"
request_interval : timedelta, optional
To avoid doing large requests, the data will be requested from Bazefield in batches considering this interval.
This is set by default to more than two months as tests showed that request time does not increase significantly with large request intervals.
By default timedelta(days=62)
max_objects_per_request : int, optional
Maximum number of objects to request in each batch.
Default was set to a higher value as large requests are not significantly slower than smaller ones.
By default 100
request_object_groups : Literal["types"] | None, optional
How should we group the objects to request them. Options are:
- "types": will group objects of the same type together, so each request will only contain objects of the same type. This is useful as allocation types are defined per object type, so we avoid doing multiple requests for the same object type.
- None: If you forced a specific allocation_type, then all objects can be grouped together as they will all use the same allocation type. In this case, setting this to None will make the requests faster as more objects will be requested in each request.
This value is already forced to None if allocation_type is set.
Returns
-------
dict[int, dict[str, Any]]
In case output_type == "dict" it will return a dict with the following format: {allocation_id: {allocation_attribute: value, ...}, ...}
DataFrame
In case output_type == "DataFrame" it will return a DataFrame with the following format: index = "allocation_id", columns = [attribute, ...]
"""
t0 = time.perf_counter()
# checking input
if truncate is not None and not isinstance(truncate, bool):
raise TypeError(f"truncate must be a bool, got: {type(truncate)}")
if not (isinstance(output_type, str) and output_type in ["dict", "DataFrame"]):
raise ValueError(f"Output type must be one of ['dict', 'DataFrame'], got {output_type}")
# getting inputs
_, object_ids, object_types, allocation_types, allocation_category_ids, subperiods = self._check_get_inputs(
period=period,
object_names=object_names,
allocation_type=allocation_type,
allocation_categories=allocation_categories,
request_interval=request_interval,
)
object_id_to_name = {object_id: object_name for object_name, object_id in object_ids.items()}
results = {}
# forcing request_object_groups to be None if allocation_type is set
if allocation_type is not None:
request_object_groups = None
logger.debug("Forcing request_object_groups to be None as allocation_type is set")
if request_object_groups == "types":
# defining groups of objects to request (will be a list of dicts)
# first separating objects of each type (objs_per_type will be a dict like {object_type_name: [object_name,...], ...})
objs_per_type = {}
for obj_name, obj_type in object_types.items():
if obj_type not in objs_per_type:
objs_per_type[obj_type] = []
objs_per_type[obj_type].append(obj_name)
# object_groups will be a list of dicts in the format [{object_name: object_id, ...}, ...] where each dict will have at most max_objects_per_request objects and only one object type
object_groups = []
for obj_type in objs_per_type: # noqa: PLC0206
type_object_groups = [
objs_per_type[obj_type][i : i + max_objects_per_request]
for i in range(0, len(objs_per_type[obj_type]), max_objects_per_request)
]
object_groups += type_object_groups
# converting to dict
object_groups = [{name: object_ids[name] for name in group} for group in object_groups]
elif request_object_groups is None:
object_groups = [
{name: object_ids[name] for name in object_names[i : i + max_objects_per_request]}
for i in range(0, len(object_names), max_objects_per_request)
]
else:
raise ValueError(f"request_object_groups must be one of ['types', None], got {request_object_groups}")
logger.debug(f"Got required data before getting allocations in {time.perf_counter() - t0:.3f} s")
# iterating subperiods
for subperiod in subperiods:
# iterating object groups
for object_group in object_groups:
t1 = time.perf_counter()
wanted_alloc_type = allocation_types[next(iter(object_group.keys()))]
# linked endpoint could be used, but currently for some specific arguments it performs a lot slower than the one used (same as the old get method)
endpoint = f"allocations/{wanted_alloc_type['id']}/from/{timestamp_from_datetime(dt=subperiod.start, time_zone=time_zone, unit='milliseconds')}/to/{timestamp_from_datetime(dt=subperiod.end, time_zone=time_zone, unit='milliseconds')}"
# defining the payload
payload = {
"objectIds": ",".join(object_group.values()),
"skip": 0,
"orderBy": "Start desc",
"includeOverridden": include_overridden,
"categoryIds": list(allocation_category_ids[wanted_alloc_type["name"]].values()),
"includeAvailable": include_available,
"includeSystemAllocs": include_full_performance,
"includeExcluded": include_excluded,
"inObjectTime": False,
"minDuration": int(min_duration.total_seconds()) if min_duration is not None else 0,
"includeAssetAvailability": False,
"includeDataLoss": False,
"includeFilteredLoss": False,
}
logger.debug(
f"Getting allocations from {subperiod.start:%Y-%m-%d %H:%M:%S} to {subperiod.end:%Y-%m-%d %H:%M:%S} for {list(object_group)} objects",
)
# getting the data
request_result = self.baze.conn.get(endpoint, json=payload)
self._handle_http_errors(request_result)
result: list[dict[str, Any]] = request_result.json()["turbineAllocationsList"]
final_result = []
for object_allocs in result:
final_result += object_allocs["allocations"]
result = final_result
# converting to desired format
final_result = self._adjust_request_result(
allocation_types,
result,
object_id_to_name,
period=period,
truncate=truncate,
time_zone=time_zone,
include_available=include_available,
include_excluded=include_excluded,
include_overridden=include_overridden,
include_full_performance=include_full_performance,
)
logger.info(
f"Got {len(result)} allocations in {time.perf_counter() - t1:.3f} s. Objects={len(object_group)} and period {subperiod}",
)
# adding to the results
results = results | final_result
# converting to desired output
match output_type:
case "dict":
pass
case "DataFrame":
results = self._convert_to_dataframe(results)
case _:
raise ValueError(f"output_type must be one of ['dict', 'DataFrame'], got '{output_type}'")
logger.info(
f"Got {len(results)} allocations in {output_type} format in {time.perf_counter() - t0:.3f} s. Objects={len(object_names)} and period {period}",
)
return results
get_by_ids(allocation_ids, time_zone='local', output_type='DataFrame')
Gets the allocations with the given ids.
Parameters:
- allocation_ids (list[int]) – List of allocation ids to get.
- time_zone (TimeZone, default: 'local') – Used to define in which time zone the output is. If "local" is used, the default time zone defined in echo_baze will be used. If an int, it must be between -12 and +12. By default "local".
- output_type (Literal['dict', 'DataFrame'], default: 'DataFrame') – Output type of the data. Can be one of ["dict", "DataFrame"]. By default "DataFrame".

Returns:
- dict[int, dict[str, Any]] – In case output_type == "dict", a dict with the following format: {allocation_id: {allocation_attribute: value, ...}, ...}.
- DataFrame – In case output_type == "DataFrame", a DataFrame with index = "allocation_id" and columns = [attribute, ...].
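A hedged usage sketch (ids are illustrative; `history` is an assumed AllocationHistory instance). With output_type="dict" the category is nested, as in the export format:

allocs = history.get_by_ids(allocation_ids=[12345, 12346], output_type="dict")
for alloc_id, alloc in allocs.items():
    print(alloc_id, alloc["objectKey"], alloc["category"]["name"])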
Source code in echo_baze/allocation_history.py
@validate_call
def get_by_ids(
self,
allocation_ids: list[int],
time_zone: TimeZone = "local",
output_type: Literal["dict", "DataFrame"] = "DataFrame",
) -> dict[int, dict[str, Any]] | pd.DataFrame:
"""Gets the allocations with the given ids.
Parameters
----------
allocation_ids : list[int]
List of allocation ids to get.
time_zone : TimeZone, optional
Used to define in which time zone the output is.
If local is used, the default time zone defined in echo_baze will be used.
If an int, must be between -12 and +12
By default "local"
output_type : Literal["dict", "DataFrame"], optional
Output type of the data. Can be one of ["dict", "DataFrame"]
By default "DataFrame"
Returns
-------
dict[int, dict[str, Any]]
In case output_type == "dict" it will return a dict with the following format: {allocation_id: {allocation_attribute: value, ...}, ...}
DataFrame
In case output_type == "DataFrame" it will return a DataFrame with the following format: index = "allocation_id", columns = [attribute, ...]
"""
t0 = time.perf_counter()
endpoint = f"allocations/ids/{','.join(str(object=alloc_id) for alloc_id in allocation_ids)}"
# getting the data
request_result = self.baze.conn.get(endpoint)
self._handle_http_errors(request_result)
result: list[dict[str, Any]] = request_result.json()
# getting object ids
object_ids = self.baze.objects.instances.get_ids()
object_ids = {object_id: object_name for object_name, object_id in object_ids.items()}
# converting to desired format
results = self._adjust_request_result(None, result, object_ids, time_zone=time_zone)
# converting to desired output
match output_type:
case "dict":
results = OrderedDict((alloc_id, results[alloc_id]) for alloc_id in allocation_ids if alloc_id in results)
case "DataFrame":
results = self._convert_to_dataframe(results)
results = results.loc[allocation_ids]
case _:
raise ValueError(f"output_type must be one of ['dict', 'DataFrame'], got '{output_type}'")
logger.debug(f"Got {len(results)} allocations in {output_type} format in {time.perf_counter() - t0:.3f} s")
return results
get_ids(period, object_names, allocation_type=None, allocation_categories=None, include_available=False, include_excluded=False, include_overridden=False, include_full_performance=False, min_duration=None, time_zone='local', request_interval=timedelta(weeks=4))
Gets the allocation ids for the given parameters.
Parameters:
- period (DateTimeRange) – Period of time to get the data from. It is assumed to be in the same time zone as the one specified in time_zone.
- object_names (list[str]) – Names of the objects to get the allocation history from.
- allocation_type (str | None, default: None) – Type of allocation to get; by default the default allocation type for the wanted object types is used.
- allocation_categories (list[str] | None, default: None) – Which categories to get. If set to None, all will be fetched. By default None.
- include_available (bool, default: False) – If set to True, will also get allocations with categories that are "available", by default False.
- include_excluded (bool, default: False) – If set to True, will also get allocations with categories that are "excluded", by default False.
- include_overridden (bool, default: False) – If set to True, will also get allocations that are overridden, by default False.
- include_full_performance (bool, default: False) – If set to True, will also get allocations with the "Full Performance" category, by default False.
- min_duration (timedelta | None, default: None) – Minimum duration to get allocations, by default None.
- time_zone (TimeZone, default: 'local') – In which time zone we assume the inputs are. Also used to define in which time zone the output is. There are three options:
  - If "UTC" is used, we assume the time already is in UTC.
  - If "local" is used, the default time zone defined in echo_baze will be used.
  - If an int, it must be between -12 and +12.
  By default "local".
- request_interval (timedelta, default: timedelta(weeks=4)) – To avoid doing large requests, the data will be requested from Bazefield in batches considering this interval. By default timedelta(weeks=4).

Returns:
- dict[str, list[int]] – Dict with the allocation ids in the format {object_name: [allocation_id, ...], ...}. object_name keys will always be present, even if no allocations were found (in which case the list will be empty).
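A hedged usage sketch (object names are made up; `history` is an assumed AllocationHistory instance):

from datetime import datetime

ids = history.get_ids(
    period=DateTimeRange(start=datetime(2024, 1, 1), end=datetime(2024, 2, 1)),
    object_names=["WF-A-01", "WF-A-02"],
)
print(ids)  # e.g. {"WF-A-01": [12345, 12346], "WF-A-02": []} -- keys always present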
Source code in echo_baze/allocation_history.py
@validate_call
def get_ids(
self,
period: DateTimeRange,
object_names: list[str],
allocation_type: str | None = None,
allocation_categories: list[str] | None = None,
include_available: bool = False,
include_excluded: bool = False,
include_overridden: bool = False,
include_full_performance: bool = False,
min_duration: timedelta | None = None,
time_zone: TimeZone = "local",
request_interval: timedelta = timedelta(weeks=4),
) -> dict[str, list[int]]:
"""Gets the allocation ids for the given parameters.
Parameters
----------
period : DateTimeRange
Period of time to get the data from. It is assumed to be in the same time zone as the one specified in time_zone.
object_names : list[str]
Names of the objects to get the allocation history from.
allocation_type : str | None, optional
Type of allocation to get, by default will get the default allocation type for the wanted object types.
allocation_categories : list[str] | None, optional
Which categories to get. If set to None will get all. By default None
include_available : bool, optional
If set to True will also get allocations with categories that are "available", by default False
include_excluded : bool, optional
If set to True will also get allocations with categories that are "excluded", by default False
include_overridden : bool, optional
If set to True will also get allocations that are overridden, by default False
include_full_performance : bool, optional
If set to True will also get allocations with the "Full Performance" category, by default False
min_duration : timedelta | None, optional
Minimum duration to get allocations, by default None
time_zone : TimeZone, optional
In which time zone we assume the inputs are. Also used to define in which time zone the output is. There are three options:
- If "UTC" is used, we assume time already is in UTC.
- If local is used, the default time zone defined in echo_baze will be used.
- If an int, must be between -12 and +12
By default "local"
request_interval : timedelta, optional
To avoid doing large requests, the data will be requested from Bazefield in batches considering this interval. By default timedelta(weeks=4)
Returns
-------
dict[str, list[int]]
Dict with the allocation ids in the format {object_name: [allocation_id, ...], ...}
object_name keys will always be present, even if no allocations were found (in which case the list will be empty)
"""
t0 = time.perf_counter()
# getting inputs
_, object_ids, _, allocation_types, allocation_category_ids, subperiods = self._check_get_inputs(
period=period,
object_names=object_names,
allocation_type=allocation_type,
allocation_categories=allocation_categories,
request_interval=request_interval,
)
results = {}
endpoint = "availability/linked"
# iterating objects
for object_name in object_ids:
results[object_name] = []
# iterating subperiods
for subperiod in subperiods:
# defining the payload
payload = {
"onlyReturnIds": True, # this is the only difference with the get method
"objectIds": object_ids[object_name],
# "turbineIds": str(asset_ids[object_name]), # noqa
"search": "",
"skip": 0,
"orderBy": "Start desc",
"includeOverridden": "true" if include_overridden else "false",
"suspect": None,
"exportStatus": "",
"sourceIds": [],
"categoryIds": list(allocation_category_ids.values()),
"includeAvailable": "true" if include_available else "false",
"includeSystemAllocs": "true" if include_full_performance else "false",
"includeExcluded": "true" if include_excluded else "false",
"inObjectTime": False,
"type": allocation_types[object_name]["id"],
"from": timestamp_from_datetime(dt=subperiod.start, time_zone=time_zone, unit="milliseconds"),
"to": timestamp_from_datetime(dt=subperiod.end, time_zone=time_zone, unit="milliseconds"),
"minDuration": str(int(min_duration.total_seconds())) if min_duration is not None else "",
}
# getting the data
request_result = self.baze.conn.post(endpoint, json=payload)
self._handle_http_errors(request_result)
result: list[str] = request_result.json()["allocationIds"]
# adding to the results
results[object_name] += result
# removing duplicates
results[object_name] = list(set(results[object_name]))
logger.debug(f"Got {len(results)} allocations in {time.perf_counter() - t0:.3f} s")
return results
import_file(file_path)
Imports allocations from a file.
If the file is an Excel file, allocations must be in a sheet called "Allocations" and it must contain the following columns:
- ObjectKey
- Start
- End
- AllocationType
- AllocationCategory
- Labels (as comma separated string, without spaces)
- Comment
If a .json or .yaml file is provided, it must be the output of the export_file method.
Allocations will be inserted one by one using the insert method.
Parameters:
- file_path (Path) – Path to the file to import. Currently supported file formats are .xlsx, .json and .yaml.
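A minimal usage sketch (the path is illustrative; `history` is an assumed AllocationHistory instance):

from pathlib import Path

# .xlsx files need an "Allocations" sheet; .json/.yaml files must come from export_file
history.import_file(file_path=Path("imports/allocations.xlsx"))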
Source code in echo_baze/allocation_history.py
@validate_call
def import_file(self, file_path: Path) -> None:
"""Imports allocations from a file.
If the file is an Excel file, allocations must be in a sheet called "Allocations" and it must contain the following columns:
- ObjectKey
- Start
- End
- AllocationType
- AllocationCategory
- Labels (as comma separated string, without spaces)
- Comment
If a .json or .yaml file is provided, it must be the output of the export_file method.
Allocations will be inserted one by one using the insert method.
Parameters
----------
file_path : Path
Path to the file to import.
Currently supported file formats are .xlsx, .json and .yaml
"""
# checking if file exists
if not file_path.is_file():
raise FileNotFoundError(f"File not found: {file_path}")
allowed_extensions = {".xlsx", ".json", ".yaml"}
if file_path.suffix not in allowed_extensions:
raise ValueError(f"File extension '{file_path.suffix}' not allowed. Allowed extensions are: {allowed_extensions}")
col_dtypes = {
"ObjectKey": "string[pyarrow]",
"Start": "datetime64[s]",
"End": "datetime64[s]",
"AllocationType": "string[pyarrow]",
"AllocationCategory": "string[pyarrow]",
"Labels": "string[pyarrow]",
"Comment": "string[pyarrow]",
}
alloc_dict = None
# reading the file
match file_path.suffix[1:]:
case "xlsx":
# checking if "Allocation" sheet exists
if "Allocations" not in pd.ExcelFile(file_path).sheet_names:
raise ValueError(f"Sheet 'Allocations' not found in {file_path}")
# reading the sheet
df = pd.read_excel(file_path, sheet_name="Allocations")
case "json":
with file_path.open(mode="r", encoding="utf-8") as file:
alloc_dict = json.load(file)
case "yaml":
with file_path.open(mode="r", encoding="utf-8") as file:
alloc_dict = yaml.load(file, Loader=yaml.Loader) # noqa
case _:
raise ValueError(f"File extension '{file_path.suffix}' not allowed")
if alloc_dict is not None:
# converting to DataFrame
alloc_dict = [
{
"ObjectKey": alloc["objectKey"],
"Start": alloc["start"]
if isinstance(alloc["start"], datetime)
else datetime.strptime(alloc["start"], "%Y-%m-%d %H:%M:%S"),
"End": alloc["end"] if isinstance(alloc["end"], datetime) else datetime.strptime(alloc["end"], "%Y-%m-%d %H:%M:%S"),
"AllocationType": alloc["category"]["allocationType"]["name"],
"AllocationCategory": alloc["category"]["name"],
"Labels": ",".join(label["name"] for label in alloc["labels"]) if "labels" in alloc else None,
"Comment": alloc["lastComment"]["commentText"] if "lastComment" in alloc else None,
}
for alloc in alloc_dict.values()
]
df = pd.DataFrame(alloc_dict)
if missing_cols := set(col_dtypes) - set(df.columns):
raise ValueError(f"The following columns are missing in the file: {missing_cols}")
# converting to the expected dtypes
for col in ["Start", "End"]:
df[col] = df[col].dt.round("s")
df = df.astype(col_dtypes, errors="raise")
# splitting labels
df["Labels"] = df["Labels"].str.split(",")
# converting to dict
allocations = df.to_dict(orient="records")
# uploading the allocations
for alloc in allocations:
try:
self.insert(
object_name=alloc["ObjectKey"],
period=DateTimeRange(start=alloc["Start"], end=alloc["End"]),
allocation_category=alloc["AllocationCategory"],
allocation_type=alloc["AllocationType"],
labels=alloc["Labels"],
comment=alloc["Comment"],
)
except Exception:
warnings.warn(f"Failed to insert allocation {alloc}. Check log for more details.", stacklevel=2)
logger.exception(f"Failed to insert allocation {alloc}")
insert(object_name, period, allocation_category, allocation_type=None, other_allocation_categories=None, labels=None, comment=None, time_zone='local')
Inserts a new allocation with the given parameters.
This uses the new Bazefield interface that previews the changes before applying them.
Parameters:
- object_name (str) – Name of the object to insert the allocation to.
- period (DateTimeRange) – Period of the allocation. It is assumed to be in the same time zone as the one specified in time_zone.
- allocation_category (str) – Category of the allocation.
- allocation_type (str | None, default: None) – Type of allocation to use; by default the default allocation type for the wanted object types is used.
- other_allocation_categories (dict[str, str] | None, default: None) – Other categories to add to the allocation. This is applicable in case there are multiple categories in the same allocation (children and parent, for example). If not specified, the default child category will be used. The format is {allocation_type: allocation_category, ...}, where allocation_type is the type of the category and allocation_category is the name of the category. By default None.
- labels (list[str] | None, default: None) – Labels to add to the allocation. If they are specified, the resulting allocation will be updated after it was inserted to add the labels. By default None.
- comment (str | None, default: None) – Comment to add to the allocation. If it is specified, the resulting allocation will be updated after it was inserted to add the comment.
- time_zone (TimeZone, default: 'local') – In which time zone we assume the inputs are. Also used to define in which time zone the output is. There are three options:
  - If "UTC" is used, we assume the time already is in UTC.
  - If "local" is used, the default time zone defined in echo_baze will be used.
  - If an int, it must be between -12 and +12.
  By default "local".

Returns:
- int – Id of the inserted allocation.
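A hedged usage sketch (object, category, labels and comment are made-up values; `history` is an assumed AllocationHistory instance):

from datetime import datetime

new_id = history.insert(
    object_name="WF-A-01",
    period=DateTimeRange(start=datetime(2024, 1, 5, 8, 0), end=datetime(2024, 1, 5, 12, 0)),
    allocation_category="Scheduled Maintenance",  # illustrative category name
    labels=["inspection"],
    comment="Quarterly inspection",
)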
Source code in echo_baze/allocation_history.py
@validate_call
def insert(
self,
object_name: str,
period: DateTimeRange,
allocation_category: str,
allocation_type: str | None = None,
other_allocation_categories: dict[str, str] | None = None,
labels: list[str] | None = None,
comment: str | None = None,
time_zone: TimeZone = "local",
) -> int:
"""Inserts a new allocation with the given parameters.
This uses the new Bazefield interface that previews the changes before applying them.
Parameters
----------
object_name : str
Name of the object to insert the allocation to.
period : DateTimeRange
Period of the allocation. It is assumed to be in the same time zone as the one specified in time_zone.
allocation_category : str
Category of the allocation.
allocation_type : str | None, optional
Type of allocation to use; by default the default allocation type for the wanted object types is used.
other_allocation_categories : dict[str, str] | None, optional
Other categories to add to the allocation. This is applicable in case there are multiple categories in the same allocation (children and parent, for example).
If not specified, the default child category will be used. By default None.
The format is {allocation_type: allocation_category, ...} where allocation_type is the type of the category and allocation_category is the name of the category.
labels : list[str] | None, optional
Labels to add to the allocation. If they are specified, the resulting allocation will be updated after it was inserted to add the labels.
By default None
comment : str | None, optional
Comment to add to the allocation. If it is specified, the resulting allocation will be updated after it was inserted to add the comment.
time_zone : TimeZone, optional
In which time zone we assume the inputs are. Also used to define in which time zone the output is. There are three options:
- If "UTC" is used, we assume time already is in UTC.
- If local is used, the default time zone defined in echo_baze will be used.
- If an int, must be between -12 and +12
By default "local"
Returns
-------
int
Id of the inserted allocation
"""
t0 = time.perf_counter()
# getting object id
object_ids = self.baze.objects.instances.get_ids()
if object_name not in object_ids:
raise ValueError(f"Wrong object name: {object_name}")
object_id = object_ids[object_name]
# getting allocation types for each object
if allocation_type is None:
allocation_type = self.baze.allocations.types.get_default(object_names=[object_name])[object_name][0]
# getting allocation type id
allocation_type_ids = self.baze.allocations.types.get_ids()
if allocation_type not in allocation_type_ids:
raise ValueError(f"Wrong allocation type: {allocation_type}")
# getting allocation category id
allocation_category_def = self.baze.allocations.categories.get(allocation_types=list(allocation_type_ids.keys()))
allocation_category_ids = {
alloc_type: {alloc_cat: alloc_cat_vals["id"] for alloc_cat, alloc_cat_vals in alloc_type_vals.items()}
for alloc_type, alloc_type_vals in allocation_category_def.items()
}
if allocation_category not in allocation_category_ids[allocation_type]:
raise ValueError(f"Wrong allocation category: {allocation_category}")
allocation_category_id = allocation_category_ids[allocation_type][allocation_category]
# processing other categories
# first we need to get the default child category for the allocation type
default_children_categories = self.baze.allocations.categories.get_default_children(allocation_types=[allocation_type])
other_category_ids = []
if (
allocation_type in default_children_categories
and allocation_category in default_children_categories[allocation_type]
and default_children_categories[allocation_type][allocation_category]
):
children_types = default_children_categories[allocation_type][allocation_category]
else:
children_types = None
if children_types:
for children_type, children_default_category in children_types.items():
if other_allocation_categories is not None and children_type in other_allocation_categories:
allocation_child_category = other_allocation_categories[children_type]
else:
allocation_child_category = children_default_category["name"]
# getting the allocation category id for the child
if (
allocation_child_category
not in allocation_category_def[allocation_type][allocation_category]["childrenCategoryNames"][children_type]
):
raise ValueError(
f"Wrong allocation child category: '{allocation_child_category}' for '{children_type}' of parent category '{allocation_category}'",
)
allocation_child_category_id = allocation_category_ids[children_type][allocation_child_category]
other_category_ids.append(allocation_child_category_id)
# putting allocation_category_id as the first element
alloc_categories = [allocation_category_id] + [x for x in list(set(other_category_ids)) if x != allocation_category_id]
# metadata
user_name = self.baze.users.get_identity()["userName"]
message = f"Created by {user_name} using echo-baze @ {datetime.now(UTC):%Y-%m-%d %H:%M:%S} UTC"
# defining the payload
payload = {
"categoryId": allocation_category_id,
"linkedCategoryIds": alloc_categories,
"objectId": object_id,
"start": convert_time_zone(period.start, time_zone, "UTC").strftime("%Y-%m-%dT%H:%M:%S.000Z"),
"end": convert_time_zone(period.end, time_zone, "UTC").strftime("%Y-%m-%dT%H:%M:%S.000Z"),
"inObjectTime": False,
"preview": "All",
"rejectIfOverlappingExistingAllocations": True,
"createdMessage": message,
}
endpoint = "allocations/add"
# requesting a preview of the changes
request_result = self.baze.conn.post(endpoint, json=payload)
self._handle_http_errors(request_result)
preview_result: list[dict[str, Any]] = request_result.json()["previewAllocations"]
# applying the updates
payload = {
"createdMessage": message,
"inObjectTime": False,
"updates": preview_result,
}
endpoint = "availability/applyUpdates"
request_result = self.baze.conn.put(endpoint, json=payload)
self._handle_http_errors(request_result)
result: list[int] = request_result.json()["allocationIds"]
# getting the inserted allocation
alloc_ids = self.get_ids(
period=period,
object_names=[object_name],
allocation_type=allocation_type,
allocation_categories=[allocation_category],
include_available=True,
include_excluded=True,
include_overridden=False,
include_full_performance=True,
time_zone=time_zone,
)
# checking if the allocation was inserted
if object_name not in alloc_ids or len(alloc_ids[object_name]) == 0:
raise RuntimeError(f"Failed to insert allocation for {object_name}")
if len(alloc_ids[object_name]) > 1:
# there are cases where a millisecond difference in the start or end time can cause multiple allocations to be returned
# in this case, let's get not only the IDs but also the full allocations to determine which one is the inserted allocation
period_allocs = self.get(
period=period,
object_names=[object_name],
allocation_type=allocation_type,
allocation_categories=[allocation_category],
include_available=True,
include_excluded=True,
include_overridden=False,
include_full_performance=True,
time_zone=time_zone,
output_type="DataFrame",
)
# removing allocations with a category different from the one we inserted
period_allocs = period_allocs[period_allocs["category.name"] == allocation_category]
# removing allocations with duration different from the one we inserted
period_allocs = period_allocs[period_allocs["duration"] == (period.end - period.start)]
# if no allocation or more than one allocation was found, raise an error
if len(period_allocs) != 1:
raise RuntimeError(
f"After inserting, zero or more than one matching allocation was found for {object_name} in the given period! Found allocations {period_allocs.index.to_list()}",
)
# adding the selected allocation to the alloc_ids dict
alloc_ids[object_name] = [period_allocs.index[0]]
alloc_id = alloc_ids[object_name][0]
logger.debug(
f"Inserted allocation {alloc_id} for {object_name} in {time.perf_counter() - t0:.3f} s. It affected {len(result)} allocations",
)
# adding labels and comment if needed
if labels is not None or comment is not None:
self.update(allocation_id=alloc_id, labels=labels, comment=comment, time_zone=time_zone)
return alloc_id
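A minimal usage sketch, assuming this method is named insert and that baze is an initialized Baze client; the object name, category, and label below are hypothetical placeholders:
from datetimerange import DateTimeRange

from echo_baze.allocation_history import AllocationHistory

history = AllocationHistory(baze)
# hypothetical names; adjust to the objects and categories defined in your Bazefield instance
alloc_id = history.insert(
    object_name="WTG-001",
    period=DateTimeRange("2024-01-01T00:00:00", "2024-01-01T06:00:00"),
    allocation_category="Scheduled Maintenance",
    labels=["Maintenance.Preventive"],
    comment="Inserted via echo-baze",
)
print(f"Inserted allocation {alloc_id}")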
labels_impact(allocations, truncate_label=False)
¶
Gets the time impact of the labels in the allocations.
It will consider only the first label of each allocation and group by it.
The output will have two columns:
- duration: Total duration of the label
- percentage: Percentage of the label duration in the total duration of all labels (including "NotLabeled"). This is not the availability impact!
An entry called "NotLabeled" will be added to account for the allocations without labels.
Parameters:
-
(allocations¶DataFrame) –DataFrame as the one returned by the get method.
-
(truncate_label¶bool, default:False) –If set to True, labels will be truncated at ".", considering only the first part of it, by default False
Returns:
-
DataFrame–DataFrame containing the total duration and percentage of each label. The index is the name of the label.
-
DataFrame–DataFrame containing the total duration and percentage of each label per object. The index is a MultiIndex with the label and the object name.
Source code in echo_baze/allocation_history.py
@validate_call
def labels_impact(
self,
allocations: pd.DataFrame,
truncate_label: bool = False,
) -> tuple[pd.DataFrame, pd.DataFrame]:
"""Gets the time impact of the labels in the allocations.
It will consider only the first label of each allocation and group by it.
The output will have two columns:
- duration: Total duration of the label
- percentage: Percentage of the label duration in the total duration of all labels (including "NotLabeled"). This is not the availability impact!
An entry called "NotLabeled" will be added to account for the allocations without labels.
Parameters
----------
allocations : DataFrame
DataFrame as the one returned by the get method.
truncate_label : bool, optional
If set to True, labels will be truncated at ".", considering only the first part of it, by default False
Returns
-------
DataFrame
DataFrame containing the total duration and percentage of each label. The index is the name of the label.
DataFrame
DataFrame containing the total duration and percentage of each label per object. The index is a MultiIndex with the label and the object name.
"""
# validating inputs
if not isinstance(allocations, pd.DataFrame):
raise TypeError(f"allocations must be a DataFrame, got: {type(allocations)}")
if not isinstance(truncate_label, bool):
raise TypeError(f"truncate_label must be a bool, got: {type(truncate_label)}")
# making a copy of the allocations
allocations = allocations.copy()
# adding value "NotLabeled" when column "label" is empty
allocations["label"] = allocations["label"].fillna("NotLabeled")
# truncating label at "."
if truncate_label and not allocations["label"].notna().all():
allocations["label"] = allocations["label"].str.split(".", expand=True).loc[:, 0]
# grouping by label
top_labels = allocations[["label", "duration"]].groupby("label").sum()
top_labels = top_labels.sort_values(by=["duration"], ascending=False)
top_labels["percentage"] = top_labels["duration"] / top_labels["duration"].sum()
# grouping by label and object
top_labels_obj = allocations[["label", "objectKey", "duration"]].groupby(["label", "objectKey"]).sum()
top_labels_obj = top_labels_obj.sort_values(by=["label", "duration"], ascending=False)
top_labels_obj["percentage"] = top_labels_obj["duration"] / top_labels_obj["duration"].sum()
return top_labels, top_labels_obj
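A short usage sketch, assuming allocations is a DataFrame returned by the get method (with "label", "objectKey", and "duration" columns) and history is an AllocationHistory instance:
# truncate_label=True groups e.g. "Maintenance.Preventive" and "Maintenance.Corrective" under "Maintenance"
top_labels, top_labels_obj = history.labels_impact(allocations, truncate_label=True)
# labels sorted by total duration, each with its share of the grand total (including "NotLabeled")
print(top_labels)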
remove_overlapping(allocations)
classmethod
¶
Method used to remove overlapping allocations.
This only manipulates the provided allocations DataFrame, without interacting with Bazefield. This method will sort the DataFrame considering the "category.available" and "category.excluded" columns, so unavailable and non-excluded allocations take priority. Be aware that after this method executes, the DataFrame might have duplicated index entries if allocations were split.
Parameters:
-
(allocations¶DataFrame) –DataFrame containing the allocations.
Returns:
-
DataFrame–DataFrame containing the allocations without overlapping.
Source code in echo_baze/allocation_history.py
@classmethod
@validate_call
def remove_overlapping(cls, allocations: pd.DataFrame) -> pd.DataFrame:
"""Method used to remove overlapping allocations.
This only manipulates the provided allocations DataFrame, without interacting with Bazefield.
This method will sort the DataFrame considering the "category.available" and "category.excluded" columns, so unavailable and non-excluded allocations take priority.
Be aware that after this method executes, the DataFrame might have duplicated index entries if allocations were split.
Parameters
----------
allocations : DataFrame
DataFrame containing the allocations.
Returns
-------
DataFrame
DataFrame containing the allocations without overlapping.
"""
if len(allocations) == 0:
return allocations
# getting all objectKeys
object_keys = allocations["objectKey"].unique().tolist()
df_list = []
# iterating over objectKeys
for obj in object_keys:
# getting the allocations for the object
obj_allocations = allocations[allocations["objectKey"] == obj].copy()
# removing overlapping allocations
obj_allocations = cls._remove_overlapping_single_obj(obj_allocations)
df_list.append(obj_allocations)
# concatenating the DataFrames
new_allocations = pd.concat(df_list)
return new_allocations
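A minimal sketch of the expected input shape, with column names taken from the docstrings (the values are illustrative and the underlying helper may require additional columns):
import pandas as pd

from echo_baze.allocation_history import AllocationHistory

# illustrative frame; the real DataFrame comes from the get method and may carry more columns
allocations = pd.DataFrame(
    {
        "objectKey": ["WTG-001", "WTG-001"],
        "start": pd.to_datetime(["2024-01-01 00:00", "2024-01-01 01:00"]),
        "end": pd.to_datetime(["2024-01-01 02:00", "2024-01-01 03:00"]),
        "category.available": [False, True],
        "category.excluded": [False, False],
    },
    index=pd.Index([101, 102], name="id"),
)
allocations["duration"] = allocations["end"] - allocations["start"]
# the unavailable allocation (id 101) takes priority, so the overlapping available
# one (id 102) is expected to be trimmed or split around it
cleaned = AllocationHistory.remove_overlapping(allocations)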
truncate(allocations, period)
staticmethod
¶
Truncates the allocations to the period.
This only manipulates the provided allocations DataFrame, without interacting with Bazefield.
Parameters:
-
(allocations¶DataFrame) –DataFrame containing the allocations.
-
(period¶DateTimeRange) –Desired period.
Returns:
-
DataFrame–DataFrame containing the truncated allocations.
Source code in echo_baze/allocation_history.py
@staticmethod
@validate_call
def truncate(allocations: pd.DataFrame, period: DateTimeRange) -> pd.DataFrame:
"""Truncates the allocations to the period.
This only manipulates the provided allocations DataFrame, without interacting with Bazefield.
Parameters
----------
allocations : DataFrame
DataFrame containing the allocations.
period : DateTimeRange
Desired period.
Returns
-------
DataFrame
DataFrame containing the truncated allocations.
"""
# making a copy of the allocations
allocations = allocations.copy()
# removing allocations that are outside the period
allocations = allocations[~((allocations["start"] > period.end) | (allocations["end"] < period.start))].copy()
# adjusting start and end
allocations.loc[allocations["start"] < period.start, "start"] = period.start
allocations.loc[allocations["end"] > period.end, "end"] = period.end
# calculating duration again
allocations["duration"] = allocations["end"] - allocations["start"]
return allocations
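Since truncate only clips "start"/"end" and recomputes "duration", its effect is easy to see on a toy frame (illustrative values; DateTimeRange is assumed to come from the datetimerange package):
import pandas as pd
from datetimerange import DateTimeRange

from echo_baze.allocation_history import AllocationHistory

allocations = pd.DataFrame(
    {
        "objectKey": ["WTG-001"],
        "start": pd.to_datetime(["2023-12-31 22:00"]),
        "end": pd.to_datetime(["2024-01-01 04:00"]),
    },
    index=pd.Index([7], name="id"),
)
period = DateTimeRange("2024-01-01 00:00:00", "2024-01-02 00:00:00")
truncated = AllocationHistory.truncate(allocations, period)
# "start" is clipped to 2024-01-01 00:00 and "duration" is recomputed to 4 hours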
update(allocation_id, period=None, allocation_type=None, allocation_category=None, labels=None, comment=None, time_zone='local')
¶
Updates the given allocation with the given parameters.
Parameters set to None will not be updated.
Parameters:
-
(allocation_id¶int) –Id of the allocation to update.
-
(period¶DateTimeRange | None, default:None) –New period of the allocation. It is assumed to be in the same time zone as the one specified in time_zone. By default None
-
(allocation_type¶str | None, default:None) –What type of allocation to consider when changing allocation_category. Only used if allocation_category is not None. By default None
-
(allocation_category¶str | None, default:None) –New category for the allocation. To work must be set with allocation_type. By default None
-
(labels¶list[str] | None, default:None) –New labels for the allocation, by default None
-
(comment¶str | None, default:None) –Comment to add to the allocation, by default None
-
(time_zone¶TimeZone, default:'local') –In which time zone we assume the inputs are. If local is used, the default time zone defined in echo_baze will be used. If an int, must be between -12 and +12 By default "local"
Returns:
-
dict[str, Any]–Dict with the updated allocation.
Source code in echo_baze/allocation_history.py
@validate_call
def update(
self,
allocation_id: int,
period: DateTimeRange | None = None,
allocation_type: str | None = None,
allocation_category: str | None = None,
labels: list[str] | None = None,
comment: str | None = None,
time_zone: TimeZone = "local",
) -> dict[str, Any]:
"""Updates the given allocation with the given parameters.
Parameters set to None will not be updated.
Parameters
----------
allocation_id : int
Id of the allocation to update.
period : DateTimeRange | None, optional
New period of the allocation. It is assumed to be in the same time zone as the one specified in time_zone. By default None
allocation_type : str | None, optional
What type of allocation to consider when changing allocation_category. Only used if allocation_category is not None. By default None
allocation_category : str | None, optional
New category for the allocation. To work must be set with allocation_type. By default None
labels : list[str] | None, optional
New labels for the allocation, by default None
comment : str | None, optional
Comment to add to the allocation, by default None
time_zone : TimeZone, optional
In which time zone we assume the inputs are.
If local is used, the default time zone defined in echo_baze will be used.
If an int, must be between -12 and +12
By default "local"
Returns
-------
dict[str, Any]
Dict with the updated allocation.
"""
t0 = time.perf_counter()
# checking if at least one parameter is not None
if period is None and allocation_category is None and labels is None and comment is None:
raise ValueError("At least one parameter must be not None")
# getting wanted allocation to check if it exists
wanted_allocation = self.get_by_ids(allocation_ids=[allocation_id], output_type="dict")
if allocation_id not in wanted_allocation:
raise ValueError(f"The allocation id {allocation_id} does not exist")
# * labels ------------------------------------------------------------------------
# getting labels in case they are not None
if labels is not None:
available_labels = self.baze.allocations.labels.get_ids()
if wrong_labels := set(labels) - set(available_labels):
raise ValueError(f"The following labels do not exist: {wrong_labels}")
# adjusting format to the one expected by Bazefield
labels = [
{
"id": available_labels[label],
"name": label,
"type": "Allocation",
"selected": True,
"order": i + 1,
"optionLabel": label.lower(),
}
for i, label in enumerate(labels)
]
# * category ---------------------------------------------------------------------
# getting allocation type
if allocation_category is not None:
if allocation_type is None:
raise ValueError("allocation_type must be set if allocation_category is set")
# checking if allocation_type is valid
wrong_alloc_type = not any(
category["allocationType"]["name"] == allocation_type for category in wanted_allocation[allocation_id]["categories"]
)
if wrong_alloc_type:
raise ValueError(f"The allocation type {allocation_type} does not exist for the allocation id {allocation_id}")
all_alloc_types = self.baze.allocations.types.get_ids()
allocation_categories = self.baze.allocations.categories.get(allocation_types=list(all_alloc_types.keys()))
if allocation_category not in allocation_categories[allocation_type]:
raise ValueError(
f"The allocation category {allocation_category} does not exist for the allocation type {allocation_type} of the wanted allocation",
)
# finding main allocation category
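# if the chosen category has a parent, the parent becomes the allocation's main category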
parent_category = allocation_categories[allocation_type][allocation_category].get("parentCategoryName")
if parent_category is not None:
main_category = allocation_categories[
allocation_categories[allocation_type][allocation_category].get("parentAllocationTypeName")
][parent_category]
categories = [main_category, allocation_categories[allocation_type][allocation_category]]
else:
main_category = allocation_categories[allocation_type][allocation_category]
categories = [main_category]
# checking if there are children categories
children_categories = allocation_categories[allocation_type][allocation_category].get("childrenCategoryNames")
if children_categories is not None:
# getting default child category
default_child_category = self.baze.allocations.categories.get_default_children(allocation_types=[allocation_type])[
allocation_type
]
if default_child_category:
children_categories = default_child_category[allocation_category]
children_categories = list(children_categories.values())
categories.extend(children_categories)
# removing custom keys from categories and main_category
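# (these keys are added by the echo-baze category helpers and are not part of the Bazefield format)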
custom_keys = ["parentCategoryName", "parentAllocationTypeName", "childrenCategoryIds", "childrenCategoryNames"]
for category in categories:
for key in custom_keys:
if key in category:
del category[key]
for key in custom_keys:
if key in main_category:
del main_category[key]
# * period -----------------------------------------------------------------------
if period is not None:
start = convert_time_zone(period.start, time_zone, "UTC").strftime("%Y-%m-%dT%H:%M:%S.000Z")
end = convert_time_zone(period.end, time_zone, "UTC").strftime("%Y-%m-%dT%H:%M:%S.000Z")
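# the new duration is computed in milliseconds for the payload below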
duration = int((period.end - period.start).total_seconds() * 1000)
# * updating ---------------------------------------------------------------------
# adding comment if needed
if comment is not None:
self.baze.allocations.comments.insert(allocation_id=allocation_id, text=comment)
# getting allocation in the format returned by Bazefield to send it back
endpoint = f"allocations/{allocation_id}"
allocation = self.baze.conn.get(endpoint)
self._handle_http_errors(allocation)
allocation: dict[str, Any] = allocation.json()
# adjusting change message
user_name = self.baze.users.get_identity()["userName"]
changed_parameters = []
if labels is not None:
changed_parameters.append("labels")
if allocation_category is not None:
changed_parameters.append("category")
if period is not None:
changed_parameters.append("period")
change_message = (
f"Changed {', '.join(changed_parameters)} by {user_name} using echo-baze @ {datetime.now(UTC):%Y-%m-%d %H:%M:%S} UTC"
)
payload = deepcopy(allocation) | {
"changeLog": change_message + ";" + allocation["changeLog"],
"original": deepcopy(allocation),
"$edit": True,
"$editRepeat": [1],
"isSaving": True,
"rejectIfOverlappingExistingAllocations": True,
"changeMessage": change_message,
"Preview": "All",
}
if labels is not None:
payload["labels"] = labels
if allocation_category is not None:
payload["category"] = main_category
payload["categories"] = categories
if period is not None:
payload["start"] = start
payload["end"] = end
payload["duration"] = duration
# uploading the changes
endpoint = "availability/linked"
result = self.baze.conn.put(endpoint, json=payload)
self._handle_http_errors(result)
# applying updates in case the period was changed
if period is not None:
endpoint = "availability/applyUpdates"
preview_allocations: list[dict[str, Any]] = result.json()["previewAllocations"]
# applying the updates
payload = {
"createdMessage": f"Created by {user_name} using echo-baze @ {datetime.now(UTC):%Y-%m-%d %H:%M:%S} UTC",
"inObjectTime": False,
"updates": preview_allocations,
}
period_result = self.baze.conn.put(endpoint, json=payload)
self._handle_http_errors(period_result)
result: dict[str, Any] = result.json()["allocation"]
if "__type" in result:
del result["__type"]
# checking if the allocation was updated
if labels is not None and (missing_labels := {label["name"] for label in labels} - {label["name"] for label in result["labels"]}):
raise ValueError(f"The following labels were not updated: {missing_labels}")
if allocation_category is not None and result["category"]["name"] != main_category["name"]:
raise ValueError(f"Failed to update the allocation category to {main_category['name']}")
# getting object ids
object_ids = self.baze.objects.instances.get_ids()
object_ids = {object_id: object_name for object_name, object_id in object_ids.items()}
# adjusting the result before returning it
result = self._adjust_request_result(None, [result], object_ids, period=period, time_zone=time_zone)[result["id"]]
logger.debug(f"Updated allocation {allocation_id} in {time.perf_counter() - t0:.3f} s")
return result
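A minimal usage sketch (the allocation id, label, and comment are hypothetical placeholders; history is an AllocationHistory instance):
updated = history.update(
    allocation_id=12345,
    labels=["Maintenance.Corrective"],
    comment="Reclassified after inspection",
)
# the returned dict reflects the allocation after the update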