Skip to content

Curves

Curves(baze)

Class used for handling Reference Curves.

Source code in echo_baze/baze_root.py
def __init__(self, baze: e_bz.Baze) -> None:
    """Keep a reference to the top level Baze object.

    This initializer belongs to the base class that all subclasses should inherit from.

    Parameters
    ----------
    baze : Baze
        Top level object carrying all functionality and the connection handler.

    Raises
    ------
    ValueError
        If baze is not an instance of Baze.

    """
    # only a real Baze instance is accepted; anything else is rejected up front
    if isinstance(baze, e_bz.Baze):
        self.baze: e_bz.Baze = baze
    else:
        raise ValueError(f"baze must be of type Baze, not {type(baze)}")

get(get_connected_objects=False, object_models=None, curve_type=None, name_search=None, output_type='dict')

Gets all reference curves with detailed information.

The most useful keys/columns returned are:

  • description
  • group
  • type
  • refCurveArray
  • settings
  • connectedObjects (only if get_connected_objects == True)

Parameters:

  • get_connected_objects

    (bool, default: False ) –

    Whether to get the objects connected to the curves or not, by default False

  • object_models

    (list[str] | None, default: None ) –

    List of object models to filter. If set, will only get curves associated to these models. By default None

  • curve_type

    (str | None, default: None ) –

    Type of curve (Measured, Contractual, Custom, etc.). If set, will only get curves of this type. By default None

  • name_search

    (str | None, default: None ) –

    String used to search for the curves. If set, will only get curves whose name includes this string. By default None

  • output_type

    (Literal['dict', 'DataFrame'], default: 'dict' ) –

    Output type of the data. Can be one of ["dict", "DataFrame"]. By default "dict"

Returns:

  • dict[str, dict[str, Any]]

    In case output_type == "dict" it will return a dict with the following format: {name: {attribute: value, ...}, ...}

  • DataFrame

    In case output_type == "DataFrame" it will return a DataFrame with the following format: index = name, columns = [attribute, ...]

Source code in echo_baze/curves.py
@validate_call
def get(
    self,
    get_connected_objects: bool = False,
    object_models: list[str] | None = None,
    curve_type: str | None = None,
    name_search: str | None = None,
    output_type: Literal["dict", "DataFrame"] = "dict",
) -> dict[str, dict[str, Any]] | DataFrame:
    """Gets all reference curves with detailed information.

    The most useful keys/columns returned are:

    - description
    - group
    - type
    - refCurveArray
    - settings
    - connectedObjects (only if get_connected_objects == True)

    Parameters
    ----------
    get_connected_objects : bool, optional
        Whether to get the objects connected to the curves or not, by default False
    object_models : list[str] | None, optional
        List of object models to filter. If set, will only get curves associated to these models. By default None
    curve_type : str | None, optional
        Type of curve (Measured, Contractual, Custom, etc.). If set, will only get curves of these types. By default None
    name_search : str | None, optional
        String used to search for the curves. If set will only get curves that have a name that includes this. By default None
    output_type : Literal["dict", "DataFrame"], optional
        Output type of the data. Can be one of ["dict", "DataFrame"]
        By default "dict"

    Returns
    -------
    dict[str, dict[str, Any]]
        In case output_type == "dict" it will return a dict with the following format: {name: {attribute: value, ...}, ...}
    DataFrame
        In case output_type == "DataFrame" it will return a DataFrame with the following format: index = name, columns = [attribute, ...]

    """
    # getting the data
    endpoint = "analytics/referenceCurves/extended"

    payload = {
        "OrderBy": "DomainModelName",
        "OrderDir": "asc",
        "Skip": 0,
        "Take": 0,
    }

    if object_models is not None:
        # getting the ids of the object models
        object_models_ids = self.baze.objects.models.get_ids()
        if wrong_models := set(object_models) - set(object_models_ids):
            raise ValueError(f"Invalid object models: {wrong_models}")
        payload["DomainIds"] = [object_models_ids[model] for model in object_models]

    if curve_type is not None:
        payload["SelectedCurveType"] = curve_type

    if name_search is not None:
        payload["Search"] = name_search

    result = self.baze.conn.get(endpoint, json=payload)
    self._handle_http_errors(result)

    # converting to dict
    result: list[dict[str, Any]] = result.json()["refCurves"]

    # getting the connected objects
    if get_connected_objects:
        for entry in result:
            entry["connectedObjects"] = self.get_connected_objects(entry["name"])

    # converting reference curve to list of lists [[x1,y1],[x2,y2],...]
    for entry in result:
        if len(entry["refCurveArray"]) > 0:
            entry["refCurveArray"] = eval(entry["refCurveArray"])  # pylint: disable=eval-used # noqa

    # converting to desired format
    match output_type:
        case "dict":
            # defining the name as the key for the dict
            result_dict = {entry["name"]: entry for entry in result}
            # removing the name from the dict values
            for entry in result_dict.values():
                del entry["name"]
            return result_dict
        case "DataFrame":
            df = json_normalize(result, max_level=1)
            df = df.convert_dtypes(dtype_backend="pyarrow")
            df = df.set_index("name")
            return df
        case _:
            raise ValueError(f"Invalid output_type: {output_type}")

get_connected_objects(curve_name)

Gets all objects connected to a reference curve.

Parameters:

  • curve_name

    (str) –

    Name of the curve to get the connected objects from.

Returns:

  • list[dict[str, str]]

    List of objects connected to the curve in the format [{name: name, id: id, isPrimary: True/False}, ...]

Source code in echo_baze/curves.py
@validate_call
def get_connected_objects(self, curve_name: str) -> list[dict[str, Any]]:
    """Gets all objects connected to a reference curve.

    Parameters
    ----------
    curve_name : str
        Name of the curve to get the connected objects from.

    Returns
    -------
    list[dict[str, Any]]
        List of objects connected to the curve in the format [{name: name, id: id, isPrimary: True/False}, ...]
        Empty list if the curve has no connected objects.

    Raises
    ------
    ValueError
        If there is no curve called curve_name.

    """
    # getting the id of the curve
    ids = self.get_ids()
    if curve_name not in ids:
        raise ValueError(f"Curve with name '{curve_name}' does not exist!")
    curve_id = ids[curve_name]

    # getting the data
    endpoint = f"analytics/referenceCurve/{curve_id}/connections"

    response = self.baze.conn.get(endpoint)
    self._handle_http_errors(response)

    connections: list[dict[str, Any]] = response.json()

    # entries without "objectName" do not describe a connected object, so they are skipped one by one;
    # this returns an empty list when none is present and avoids a KeyError when only some are
    return [
        {"name": entry["objectName"], "id": entry["objectId"], "isPrimary": entry["isPrimary"]}
        for entry in connections
        if "objectName" in entry
    ]

get_ids()

Gets a dictionary with all reference curve names and ids.

Returns:

  • dict[str, int]

    Dictionary with all curves in the format {name: id, ...}

Source code in echo_baze/curves.py
def get_ids(self) -> dict[str, int]:
    """Gets a dictionary with all reference curve names and ids.

    Returns
    -------
    dict[str, int]
        Dictionary with all curves in the format {name: id, ...}

    """
    # requesting the extended listing of reference curves
    response = self.baze.conn.get("analytics/referenceCurves/extended")
    self._handle_http_errors(response)

    # mapping every curve name to its id (a repeated name keeps the last id seen)
    ids_by_name: dict[str, int] = {}
    for curve in response.json()["refCurves"]:
        name = curve["name"]
        ids_by_name[name] = curve["id"]

    return ids_by_name

update(curve_name, curve_df=None, description=None, curve_type=None)

Method used to update a reference curve.

Parameters:

  • curve_name

    (str) –

    Name of the curve to update.

  • curve_df

    (DataFrame | None, default: None ) –

    DataFrame with the curve. Must have two columns X and Y of type float64 or double[pyarrow] and without NaNs. If set to None will not be updated. By default None

  • description

    (str | None, default: None ) –

    Description of the curve. If set to None will not be updated. By default None

  • curve_type

    (str | None, default: None ) –

    Type of the curve (Measured, Contractual, Custom, etc.). If set to None will not be updated. By default None

Source code in echo_baze/curves.py
@validate_call
def update(
    self,
    curve_name: str,
    curve_df: DataFrame | None = None,
    description: str | None = None,
    curve_type: str | None = None,
) -> None:
    """Method used to update a reference curve.

    Parameters
    ----------
    curve_name : str
        Name of the curve to update.
    curve_df : DataFrame | None, optional
        DataFrame with the curve. Must have two columns X and Y of type float64 or double[pyarrow] and without NaNs.
        X must be evenly spaced (checked after rounding the differences to 3 decimals) and have at least two rows.
        If set to None will not be updated. By default None
    description : str | None, optional
        Description of the curve.
        If set to None will not be updated. By default None
    curve_type : str | None, optional
        Type of the curve (Measured, Contractual, Custom, etc.).
        If set to None will not be updated. By default None

    Raises
    ------
    ValueError
        If curve_df is invalid, if all optional parameters are None, or if the curve does not exist.
    RuntimeError
        If the curve returned by the update request does not match what was sent.

    """
    # ! NOTE: There are many more options of the curve that can be updated, but what was implemented here is enough for now considering the current use. If needed, more can be added.

    # checking input (everything is validated here so bad input fails before any request is made)
    bin_size: float | None = None
    if curve_df is not None:
        if "X" not in curve_df.columns or "Y" not in curve_df.columns:
            raise ValueError("curve_df must have columns 'X' and 'Y'")
        if curve_df["X"].isna().any() or curve_df["Y"].isna().any():
            raise ValueError("curve_df cannot have NaNs")
        if curve_df["X"].dtype not in ["float64", "double[pyarrow]"] or curve_df["Y"].dtype not in ["float64", "double[pyarrow]"]:
            raise ValueError("curve_df columns must be of type float64 or double[pyarrow]")
        # checking if bin size is the same in all rows
        bin_sizes = curve_df["X"].diff().round(decimals=3).dropna().unique()
        if len(bin_sizes) == 0:
            # with fewer than two rows there is no difference to take, so indexing bin_sizes would fail
            raise ValueError("curve_df must have at least two rows so the bin size can be determined")
        if len(bin_sizes) > 1:
            raise ValueError(f"Curve must have evenly spaced data, got bin sizes: {bin_sizes.tolist()}")
        # plain float so the value serializes to JSON regardless of the column backend
        bin_size = float(bin_sizes[0])

    # checking if at least one of the parameters is not None
    if curve_df is None and description is None and curve_type is None:
        raise ValueError("At least one of the parameters must be not None")

    # getting the id of the curve
    ids = self.get_ids()
    if curve_name not in ids:
        raise ValueError(f"Curve with name '{curve_name}' does not exist! Please create it first in the portal.")
    curve_id = ids[curve_name]

    # getting the current curve, which is used as the base of the update payload
    endpoint = f"analytics/referenceCurve/{curve_id}"

    curve = self.baze.conn.get(endpoint)
    self._handle_http_errors(curve)
    curve = curve.json()

    # applying the requested changes
    if curve_df is not None:
        # converting to the format "[[x1,y1],[x2,y2],...]" without spaces (values are written as-is, no rounding)
        curve["refCurveArray"] = str(curve_df[["X", "Y"]].to_numpy().tolist()).replace(" ", "")
        # adjusting bin size
        curve["settings"]["binSize"] = bin_size
    if description is not None:
        curve["description"] = description
    if curve_type is not None:
        curve["type"] = curve_type

    # updating the curve
    endpoint = "analytics/referenceCurve/update"

    result = self.baze.conn.put(endpoint, json=curve)
    self._handle_http_errors(result)

    # checking if the curve was updated by comparing the response with what was sent
    new_curve = result.json()
    if (
        new_curve["refCurveArray"] != curve["refCurveArray"]
        or new_curve["description"] != curve["description"]
        or new_curve["type"] != curve["type"]
    ):
        raise RuntimeError(f"Curve '{curve_name}' was not updated successfully!")

    logger.info(f"Curve '{curve_name}' updated successfully!")