KPIs - Availability

KpiAvailability(baze)

Class used for handling availability.

Source code in echo_baze/baze_root.py
def __init__(self, baze: e_bz.Baze) -> None:
    """Base class that all subclasses should inherit from.

    Parameters
    ----------
    baze : Baze
        Top level object carrying all functionality and the connection handler.

    """
    # check inputs
    if not isinstance(baze, e_bz.Baze):
        raise ValueError(f"baze must be of type Baze, not {type(baze)}")

    self.baze: e_bz.Baze = baze

get(object_names, period, subperiod_size=timedelta(days=1), availability_type='time', groups=None, allocations=None)

Gets the availability and amounts of lost time and/or energy for a list of objects and a period.

Parameters:

  • object_names

    (list[str]) –

    Desired objects. Keep in mind that as energy availability now considers the ONS set point of the SPE, all objects within the same SPE must be calculated at once. This means that if you select only one object of an SPE, the code will dynamically get all the objects of the SPE and calculate the availability for all of them.

  • period

    (DateTimeRange) –

    Period to get the availability.

  • subperiod_size

    (timedelta, default: timedelta(days=1) ) –

    Size of the subperiod to get availability. For example, if the period lasts 1 week and subperiod is 1 day, the result will contain availability for each day inside the period and also for the whole period. By default timedelta(days=1)

  • availability_type

    (Literal['time', 'energy', 'all'], default: 'time' ) –

    Defines which amounts will be calculated. If "all", both "time" and "energy" will be calculated. By default "time".

  • groups

    (dict[str, list[str]] | None, default: None ) –

    Groups of objects to calculate availability for (SPE, site, etc.).

    If provided, must be a dict in the format {group_name: [object1, object2, ...], ...}. If None, no groups are used. By default None

  • allocations

    (DataFrame | None, default: None ) –

    DataFrame with the allocations for the entire period and all requested objects. Can be provided to avoid requesting this data from Bazefield. If None, it will be requested from Bazefield. By default None
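Because energy availability is computed per SPE, the SPE name is derived from each object's name. A minimal sketch with hypothetical object names, using the same regex as the source code:

```python
import re

# Hypothetical object names following the XXX-XXX-YYY-... convention,
# where the first two dash-separated blocks (XXX-XXX) are the SPE name.
object_names = ["ABC-DEF-T01", "ABC-DEF-T02", "GHI-JKL-T01"]

# Same pattern the source uses to derive SPE names from object names.
spe_names = sorted({re.match(r"(\w+-\w+)-", obj).group(1) for obj in object_names})
print(spe_names)  # ['ABC-DEF', 'GHI-JKL']
```

Selecting only "ABC-DEF-T01" would still pull in "ABC-DEF-T02", since whole SPEs are always calculated together.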

Returns:

  • DataFrame

    DataFrame containing the availability for each object and subperiod.

    Index:
    - The start of each subperiod, plus a final row called "Period" for the whole period.

    Columns:
    - If availability_type is "all", a MultiIndex with levels "Object" and "Type" (Energy or Time).
    - If availability_type is "time" or "energy", a simple Index with the object names.

    Values:
    - All values are between 0 and 1, representing the fraction of time/energy available.

  • DataFrame

    DataFrame containing the amounts of lost time and/or energy for each object and subperiod.

    Index:
    - The start of each subperiod, plus a final row called "Period" for the whole period.

    Columns:
    - If availability_type is "all", a MultiIndex with levels "Object", "Category" and "Type". The "Type" level contains "Energy", "Curtailment" and/or "Time":
      - "Time" appears when availability_type is "time" or "all".
      - "Energy" and "Curtailment" appear when availability_type is "energy" or "all".
    - If availability_type is "time" or "energy", a MultiIndex with levels "Object" and "Category".

    Values:
    - Time values are in seconds.
    - Energy and Curtailment values are in MWh (lost production and lost curtailment production respectively).

  • dict[str, list[DateTimeRange] | str]

    Dict containing the SPEs that failed to calculate the amounts and the periods that failed.

    The key is the SPE name and the value is a list of periods that failed to calculate the amounts or a string with the error message.
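Since each value is either a list of failed periods or an error string, callers should branch on the type. A minimal sketch (made-up data; the real list entries are DateTimeRange objects):

```python
# Hypothetical failure dict; real list entries are DateTimeRange objects.
failed_spes = {
    "ABC-DEF": [("2024-01-03", "2024-01-04")],  # specific subperiods failed
    "GHI-JKL": "Failed for unknown reason",     # the whole SPE failed
}

for spe, failure in failed_spes.items():
    if isinstance(failure, str):
        print(f"{spe}: failed entirely: {failure}")
    else:
        print(f"{spe}: {len(failure)} subperiod(s) failed")
```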

Notes
- The "Available" category contains the amount of available time or the produced energy. It is the numerator of the availability ratio; the denominator is this amount plus the unavailable amounts.
- "Curtailment" represents energy that was curtailed (lostCurtailmentProduction) and is reported separately from the general "Energy" loss. It is the energy that would have been lost to curtailment had the asset been operating at full capacity; it is not counted as a downtime loss, since it results from the ONS limitation.
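To make the return layout concrete, here is a small pandas sketch with made-up numbers for availability_type="time" ("Downtime" is a hypothetical category name): the amounts frame carries (Object, Category) columns, and availability is the available amount over the total (available plus unavailable), as in the source:

```python
import pandas as pd

# Made-up amounts for one object and one lost-time category, availability_type="time".
# Columns mirror the documented (Object, Category) MultiIndex; values are seconds.
columns = pd.MultiIndex.from_product(
    [["ABC-DEF-T01"], ["Downtime", "Available"]], names=["Object", "Category"]
)
amounts = pd.DataFrame(
    [[3600.0, 82800.0]], index=pd.Index(["Period"], name="Time"), columns=columns
)

# Availability is the available amount over the total (available + unavailable).
available = amounts.xs("Available", level="Category", axis=1)
unavailable = amounts.xs("Downtime", level="Category", axis=1)
availability = available / (available + unavailable)
print(availability.loc["Period", "ABC-DEF-T01"])  # ~0.9583 (23 of 24 hours)
```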
Source code in echo_baze/kpi_availability.py
@validate_call
def get(
    self,
    object_names: list[str],
    period: DateTimeRange,
    subperiod_size: timedelta = timedelta(days=1),
    availability_type: Literal["time", "energy", "all"] = "time",
    groups: dict[str, list[str]] | None = None,
    allocations: pd.DataFrame | None = None,
) -> tuple[pd.DataFrame, pd.DataFrame, dict[str, list[DateTimeRange] | str]]:
    """Gets the availability and amounts of lost time and/or energy for a list of objects and a period.

    Parameters
    ----------
    object_names : list[str]
        Desired objects. Keep in mind that as energy availability now considers the ONS set point of the SPE, all objects within the same SPE must be calculated at once. This means that if you select only one object of an SPE, the code will dynamically get all the objects of the SPE and calculate the availability for all of them.
    period : DateTimeRange
        Period to get the availability.
    subperiod_size : timedelta, optional
        Size of the subperiod to get availability. For example, if the period lasts 1 week and subperiod is 1 day, the result will contain availability for each day inside the period and also for the whole period.
        By default timedelta(days=1)
    availability_type : Literal["time", "energy", "all"], optional
        Defines which amounts will be calculated. If "all", both "time" and "energy" will be calculated. By default "time".
    groups : dict[str, list[str]] | None, optional
        Groups of objects to calculate availability for (SPE, site, etc.).

        If provided, must be a dict in the format {group_name: [object1, object2, ...], ...}.
        If None, no groups are used. By default None
    allocations : DataFrame | None, optional
        DataFrame with the allocations for the entire period and all requested objects. Can be provided to avoid requesting this data from Bazefield.
        If None, it will be requested from Bazefield. By default None

    Returns
    -------
    DataFrame
        DataFrame containing the availability for each object and subperiod.
        Index:
        - Its index will be the start of each subperiod and a last one called "Period" for the whole period.

        Columns:
        - If availability_type is "all", its columns will be a MultiIndex with levels "Object" and "Type" (Energy or Time).
        - If availability_type is "time" or "energy", its columns will be a simple Index with the object names.

        Values:
        - All values will be between 0 and 1, representing the fraction of time/energy available.

    DataFrame
        DataFrame containing the amounts of lost time and/or energy for each object and subperiod.

        Index:
        - Its index will be the start of each subperiod and a last one called "Period" for the whole period.

        Columns:
        - If availability_type is "all", its columns will be a MultiIndex with levels "Object", "Category" and "Type". The "Type" level will contain "Energy", "Curtailment" and/or "Time" depending on availability_type:
        - "Time" appears when availability_type is "time" or "all".
        - "Energy" and "Curtailment" appear when availability_type is "energy" or "all".
        - If availability_type is "time" or "energy", its columns will be a MultiIndex with levels "Object" and "Category".

        Values:
        - Time values are in seconds.
        - Energy and Curtailment values are in MWh (lost production and lost curtailment production respectively).

    dict[str, list[DateTimeRange] | str]
        Dict containing the SPEs that failed to calculate the amounts and the periods that failed.

        The key is the SPE name and the value is a list of periods that failed to calculate the amounts or a string with the error message.

    Notes
    -----
        - The "Available" category contains the amount of available time or the produced energy. It is the numerator of the availability ratio; the denominator is this amount plus the unavailable amounts.
        - "Curtailment" represents energy that was curtailed (lostCurtailmentProduction) and is reported separately from the general "Energy" loss. It is the energy that would have been lost to curtailment had the asset been operating at full capacity; it is not counted as a downtime loss, since it results from the ONS limitation.
    """

    def _get_spe_objects_amounts(
        spe: str,
        spe_objects: list[str],
        allocations: pd.DataFrame,
        subperiods: list[DateTimeRange],
        availability_type: Literal["time", "energy", "all"],
        categories: pd.DataFrame,
        points: pd.DataFrame | None = None,
        ons_points: pd.DataFrame | None = None,
        point_names: dict[str, dict[str, str]] | None = None,
        obj_alloc_types: dict[str, list[str]] | None = None,
    ) -> tuple[pd.DataFrame, list[DateTimeRange]]:
        """Internal function to get the amounts of lost time and/or energy for a single object and a list of subperiods.

        Parameters
        ----------
        spe : str
            Name of the SPE.
        spe_objects : list[str]
            Name of the objects of the SPE.
        allocations : DataFrame
            DataFrame with the allocations for the entire period and all requested objects.
        subperiods : list[DateTimeRange]
            List of subperiods to get the amounts.
        availability_type : Literal["time", "energy", "all"]
            Type of availability to calculate. Can be "time", "energy" or "all".
        categories : DataFrame
            DataFrame with the categories definition.
        points : DataFrame | None, optional
            DataFrame with the points. It can only be None if the availability type is "time". By default None
        ons_points : DataFrame | None, optional
            DataFrame with the ONS points representing the power limit imposed by ONS. It can only be None if the availability type is "time". By default None
        point_names : dict[str, dict[str, str]] | None, optional
            Dict containing mapping from ActivePower and ActivePowerTheoretical to the actual point names in the points DataFrame. It should only be used if points DataFrame is provided.
            It should be in the format {object_name: {{"ActivePower": ActivePowerKpiPoint, "ActivePowerTheoretical": ActivePowerTheoreticalKpiPoint}}}, where ActivePowerTheoreticalKpiPoint and ActivePowerKpiPoint are attributes of the object type defined in Bazefield.
        obj_alloc_types : dict[str, list[str]]  | None, optional
            Dictionary with the default allocation types for the objects. If provided this will be used, skipping the request to Bazefield.
            By default None

        Returns
        -------
        DataFrame
            DataFrame containing the amounts of lost time and/or energy for each object and subperiod.
        list[DateTimeRange]
            List of subperiods that failed to calculate the amounts.

        """
        t0 = time.perf_counter()

        logger.debug(
            f"Calculating availability amounts for '{spe}' between {subperiods[0].start:%y-%m-%d %H:%M:%S} and {subperiods[-1].end:%y-%m-%d %H:%M:%S}",
        )

        # checking if all objects are of the wanted SPE
        if not all(obj.startswith(spe) for obj in spe_objects):
            raise ValueError(f"object_names must be all objects of the SPE '{spe}', got {spe_objects}")

        # getting object allocations
        spe_allocations = allocations[allocations["objectKey"].isin(spe_objects)].copy()
        # creating DataFrame to store amounts
        index = pd.Index([], name="Time")
        columns = pd.MultiIndex.from_product(
            [spe_objects, [*categories.index.get_level_values("category").to_list(), "Available"], ["Energy", "Curtailment", "Time"]],
            names=["Object", "Category", "Type"],
        )
        amounts = pd.DataFrame(index=index, columns=columns, dtype="double[pyarrow]").fillna(0.0)

        # iterating over subperiods
        failed_periods = []
        for subperiod in subperiods:
            logger.debug(f"Calculating availability amounts for '{spe}' and {subperiod}")
            t_subperiod = time.perf_counter()
            try:
                # truncating allocations to subperiod
                subperiod_obj_allocations = self.baze.allocations.history.truncate(spe_allocations, subperiod)
                # getting amounts for subperiod
                subperiod_amounts = self._get_spe_objects_single_period_amounts(
                    spe_name=spe,
                    object_names=spe_objects,
                    period=subperiod,
                    availability_type=availability_type,
                    allocations=subperiod_obj_allocations,
                    points=points,
                    ons_points=ons_points,
                    point_names=point_names,
                    obj_alloc_types=obj_alloc_types,
                    categories=categories,
                )

                t1 = time.perf_counter()

                # reshaping subperiod_amounts (category in the index, object.name and type in the columns)
                # into a single row with ("Object", "Category", "Type") columns
                subperiod_amounts = (
                    subperiod_amounts.melt(ignore_index=False)
                    .reset_index()
                    .rename(columns={"value": "Value", "object.name": "Object", "type": "Type"})
                )
                subperiod_amounts = subperiod_amounts.set_index(amounts.columns.names)
                subperiod_amounts = subperiod_amounts.T
                subperiod_amounts.index.name = amounts.index.name
                subperiod_amounts.index = [subperiod.start]

                # sorting columns of subperiod_amounts to match amounts
                if len(subperiod_amounts.columns) != len(amounts.columns):
                    raise ValueError(
                        f"subperiod_amounts columns ({subperiod_amounts.columns}) do not match amounts columns ({amounts.columns})",
                    )
                subperiod_amounts = subperiod_amounts[amounts.columns]

                # adding to amounts
                amounts = subperiod_amounts.copy() if amounts.empty else pd.concat([amounts, subperiod_amounts], axis=0)

                logger.debug(
                    f"Postprocessed availability amounts for '{spe}' and {subperiod} in {time.perf_counter() - t1:.2f} seconds",
                )

            except Exception:
                failed_periods.append(subperiod)
                logger.exception(f"Error calculating availability amounts for SPE '{spe}' and {subperiod}")
            logger.debug(
                f"Finished calculating availability amounts for '{spe}' and {subperiod} in {time.perf_counter() - t_subperiod:.2f} seconds",
            )

        logger.info(
            f"Calculated availability amounts for '{spe}' between {subperiods[0].start:%y-%m-%d %H:%M:%S} and {subperiods[-1].end:%y-%m-%d %H:%M:%S} in {time.perf_counter() - t0:.2f} seconds",
        )

        return amounts, failed_periods

    t0 = time.perf_counter()

    # checking inputs
    if groups is not None:
        # checking if all objects are in groups
        for obj in object_names:
            if not any(obj in group for group in groups.values()):
                raise ValueError(f"object '{obj}' is not in any group")
        # checking if all groups objects are in object_names
        for group_name, group in groups.items():
            for obj in group:
                if obj not in object_names:
                    raise ValueError(f"object '{obj}' of group '{group_name}' is not in object_names")

    # rounding period to always start/end at 00:00:00
    midnight_start = period.start.replace(hour=0, minute=0, second=0, microsecond=0)
    if period.start != midnight_start:
        logger.warning(f"Rounding period start to 00:00:00 from {period.start:%y-%m-%d %H:%M:%S.%f}")
        period.start = midnight_start
    midnight_end = period.end.replace(hour=0, minute=0, second=0, microsecond=0)
    if period.end != midnight_end:
        logger.warning(f"Rounding period end to 00:00:00 from {period.end:%y-%m-%d %H:%M:%S.%f}")
        period.end = midnight_end + timedelta(days=1)

    # validating if we have all the objects for the SPE
    baze_objs = self.baze.objects.instances.get(object_types=["Inverter", "Turbine"])
    # getting the SPEs of the wanted objects
    # the spe name is expected to be the first block in the object name like XXX-XXX-YYY-YYY-YYY where (XXX-XXX) is the spe name
    if not object_names:
        object_names = list(baze_objs.keys())
    spe_names = set()
    for obj in object_names:
        spe_match = re.match(r"(\w+-\w+)-", obj)
        if spe_match is None:
            raise ValueError(f"object name '{obj}' does not match the expected 'XXX-XXX-...' naming convention")
        spe_names.add(spe_match.group(1))
    spe_names = sorted(spe_names)
    # validating if we need to add more objects to the list
    new_objs = []
    for spe in spe_names:
        for obj in baze_objs:
            if obj.startswith(spe) and obj not in object_names:
                new_objs.append(obj)  # noqa: PERF401
    if new_objs:
        logger.info(f"Adding objects to the analysis to calculate entire SPEs: {new_objs}")
    object_names += new_objs
    object_names.sort()

    # getting allocations
    if allocations is None:
        allocations = self.baze.allocations.history.get(
            period=period,
            object_names=object_names,
            include_excluded=True,
            truncate=True,
        )

    # getting points
    # adding 10 min to period to make sure we get all points
    points_period = period.copy()
    points_period.start = points_period.start - timedelta(minutes=10)
    points_period.end = points_period.end + timedelta(minutes=10)
    if availability_type in ["energy", "all"]:
        # getting the needed points for each object
        obj_points = self.baze.allocations.history._get_lost_energy_point_names(object_names=object_names)  # noqa: SLF001
        obj_points = {obj: list(points_map.values()) for obj, points_map in obj_points.items()}

        points = self.baze.points.values.series.get(
            points=obj_points,
            period=points_period,
            aggregation="Raw",
            round_timestamps={"freq": timedelta(minutes=5), "tolerance": timedelta(minutes=2)},
        )
        # Logic to manage absurd values for wind assets
        if "ActivePower_10min.AVG" in points.columns.get_level_values("point"):
            value_condition = points > 5000  # maximum power for wind assets
            column_condition = points.columns.get_level_values("point") == "ActivePower_10min.AVG"
            absurd_mask = value_condition & column_condition
            points = points.mask(absurd_mask)
    else:
        points = None

    # getting ONS points
    if availability_type in ["energy", "all"]:
        # connecting to postgres
        perfdb = PerfDB(application_name="baze_kpi_availability")

        # getting nominal power of each SPE
        spes_nominal_power = perfdb.objects.instances.attributes.get(
            object_names=spe_names,
            attribute_names=["nominal_power"],
        )

        # getting ONS power limit
        ons_points = perfdb.ons.limitations.series.get(
            period=points_period,
            group_type="spe",
            object_names=spe_names,
            output_type="DataFrame",
            errors="ignore",
        )

        # filling NA with nominal power
        for spe in spe_names:
            if spe not in ons_points.columns.to_list():
                logger.warning(f"Spe '{spe}' not found in ONS data columns")
                ons_points[spe] = pd.NA
            nominal_power = spes_nominal_power.get(spe, {}).get("nominal_power", {}).get("attribute_value", 100000)
            if nominal_power == 100000:
                logger.warning(f"Nominal power for SPE '{spe}' not found, using 100000 kW as default")
            ons_points[spe] = ons_points[spe].fillna(nominal_power)

        # resampling to 5 minutes
        ons_points = ons_points.resample(rule="5min").mean()

        # validating if any column of ons_points has all values lower than or equal to 0
        all_zero_cols = ons_points.columns[(ons_points <= 0).all()]
        if len(all_zero_cols) > 0:
            logger.warning(f"Columns found with all values lower than or equal to 0 in ons_points: {all_zero_cols}")

        del perfdb
    else:
        ons_points = None

    # dividing periods in subperiods
    subperiods = period.split_multiple(separator=subperiod_size, normalize=True, start_end_equal=True)

    # getting default allocation types for the wanted objects
    obj_alloc_types = self.baze.allocations.types.get_default(object_names=object_names)
    orig_obj_alloc_types = deepcopy(obj_alloc_types)
    # converting so only one obj_alloc_type per object
    obj_alloc_types = {obj: obj_alloc_type[0] for obj, obj_alloc_type in obj_alloc_types.items()}
    alloc_types = list(obj_alloc_types.values())
    alloc_types = list(set(alloc_types))

    # getting categories definition
    categories = self.baze.allocations.categories.get(output_type="DataFrame", allocation_types=alloc_types)
    categories = categories[(~categories["available"]) | (categories["excluded"])].copy()

    # * Amount of time/energy lost for each object and subperiod ------------

    # creating DataFrame to store amounts
    index = pd.Index([subperiod.start for subperiod in subperiods] + ["Period"], name="Time")
    names = object_names.copy()
    if groups is not None:
        names += list(groups)
    names += ["Total"]
    columns = pd.MultiIndex.from_product(
        [names, [*sorted(set(categories.index.get_level_values("category").to_list())), "Available"], ["Energy", "Curtailment", "Time"]],
        names=["Object", "Category", "Type"],
    )
    amounts = pd.DataFrame(index=index, columns=columns, dtype="double[pyarrow]").fillna(0.0)

    # getting names mapping for standard features of each object. This will be passed down until calc_lost_energy to avoid requesting the points again from Bazefield
    point_names = self.baze.allocations.history._get_lost_energy_point_names(object_names=object_names)  # noqa: SLF001

    spe_objs = {spe: [obj for obj in object_names if obj.startswith(spe)] for spe in spe_names}

    # getting amounts for each SPE sequentially
    failed_spes = {}
    for spe, this_spe_objs in spe_objs.items():
        failed_periods = []
        try:
            spe_obj_amounts, failed_periods = _get_spe_objects_amounts(
                spe=spe,
                spe_objects=this_spe_objs,
                allocations=allocations,
                subperiods=subperiods,
                availability_type=availability_type,
                categories=categories.loc[pd.IndexSlice[obj_alloc_types[this_spe_objs[0]], :], :],
                points=points,
                ons_points=ons_points,
                point_names=point_names,
                obj_alloc_types=orig_obj_alloc_types,
            )

            amounts.loc[spe_obj_amounts.index, spe_obj_amounts.columns] = spe_obj_amounts.values
        except Exception:
            failed_spes[spe] = "Failed for unknown reason"
            logger.exception(f"Error calculating availability amounts for SPE '{spe}'")

        if failed_periods:
            failed_spes[spe] = failed_periods
            logger.warning(f"Failed to calculate availability amounts for SPE '{spe}' and periods {failed_periods}")

    t1 = time.perf_counter()

    # calculating total for all objects
    total = amounts.T.groupby(level=["Category", "Type"]).sum()
    # adding another level to columns (object level)
    total = total.reset_index(drop=False)
    total["Object"] = "Total"
    total = total.set_index(["Object", "Category", "Type"]).T
    amounts.loc[total.index, total.columns] = total.values

    # iterating over groups
    if groups is not None:
        for group_name, group in groups.items():
            # getting group allocations
            group_totals = amounts.loc[:, pd.IndexSlice[group, :, :]].T.groupby(level=["Category", "Type"]).sum()
            # adding another level to columns (group level)
            group_totals = group_totals.reset_index(drop=False)
            group_totals["Object"] = group_name
            group_totals = group_totals.set_index(["Object", "Category", "Type"]).T
            amounts.loc[group_totals.index, group_totals.columns] = group_totals.values

    # getting name of index as it will be removed by the next operation
    index_name = amounts.index.name
    # calculating total for period
    period_total = amounts.sum(axis=0).values
    # creating a new row for period
    period_row = pd.DataFrame(data=[period_total], index=["Period"], columns=amounts.columns)
    # removing period row if it already exists
    amounts = amounts.drop(index="Period", errors="ignore")
    # concatenating to amounts
    amounts = pd.concat([amounts, period_row], axis=0)
    # adding name of index again
    amounts.index.name = index_name

    # * Availability --------------------------------------------------------

    # DataFrame to store availability
    index = amounts.index
    columns = pd.MultiIndex.from_product([names, ["Energy", "Time"]], names=["Object", "Type"])
    availability = pd.DataFrame(index=index, columns=columns, dtype="double[pyarrow]").fillna(0.0)

    # segregating categories
    cat_unavailable = list(set(categories[~categories["available"]].index.get_level_values("category").to_list()))
    cat_available = ["Available"]

    # calculating availability
    for avail_type in ["Energy", "Time"]:
        unavail_amount = amounts.loc[:, pd.IndexSlice[:, cat_unavailable, avail_type]].T.groupby(level=["Object"]).sum()
        avail_amount = amounts.loc[:, pd.IndexSlice[:, cat_available, avail_type]].T.groupby(level=["Object"]).sum()
        avail = avail_amount / (avail_amount + unavail_amount)
        # replacing inf and nan with 0
        avail = avail.replace([np.inf, -np.inf, np.nan], 0)
        # transposing and reordering like availability
        avail = avail.T.reindex(columns=availability.xs(avail_type, level="Type", axis=1).columns)
        avail = avail.reindex(index=availability.index)
        # adjusting avail DataFrame to be in the same format as availability for faster update
        avail.index.name = availability.index.name
        avail.columns = pd.MultiIndex.from_product([avail.columns, [avail_type]], names=availability.columns.names)
        # adding to availability
        availability.loc[avail.index, avail.columns] = avail.values

    # dropping unwanted types
    if availability_type == "time":
        availability = availability.xs("Time", level="Type", axis=1)
        amounts = amounts.xs("Time", level="Type", axis=1)
    elif availability_type == "energy":
        availability = availability.xs("Energy", level="Type", axis=1)
        amounts = amounts.xs("Energy", level="Type", axis=1)

    logger.info(f"Postprocessed availability for {len(object_names)} objects and {period} in {time.perf_counter() - t1:.2f} seconds")

    logger.info(f"Calculated availability for {len(object_names)} objects and {period} in {time.perf_counter() - t0:.2f} seconds")

    return availability, amounts, failed_spes