Skip to content

Allocation Comments

AllocationComments(baze)

Class used for handling allocation comments.

Source code in echo_baze/baze_root.py
def __init__(self, baze: e_bz.Baze) -> None:
    """Base class that all subclasses should inherit from.

    Parameters
    ----------
    baze : Baze
        Top level object carrying all functionality and the connection handler.

    """
    # check inputs
    if not isinstance(baze, e_bz.Baze):
        raise ValueError(f"baze must be of type Baze, not {type(baze)}")

    self.baze: e_bz.Baze = baze

get(allocation_ids, output_type='dict', time_zone='local')

Gets all allocation comments for the given allocation ids.

The most useful keys/columns returned are: - allocationId - commentId - createdBy - commentText

Parameters:

  • allocation_ids

    (list[int]) –

    List of allocation ids to get the comments for.

  • output_type

    (Literal['dict', 'DataFrame'], default: 'dict' ) –

    Output type of the data. Can be one of ["dict", "DataFrame"] By default "dict"

  • time_zone

    ( TimeZone, default: 'local' ) –

    In which time zone the dates should be returned. If local is used, the default time zone defined in echo_baze will be used. If an int, must be between -12 and +12 By default "local"

Returns:

  • dict[int, dict[int, dict[str, Any]]]

    In case output_type == "dict" it will return a dict in the format {allocationId: {commentId: {comment_attribute: comment_value, ...}, ...}, ...}

  • DataFrame

    In case output_type == "DataFrame" it will return a DataFrame where the index is a MultiIndex with levels allocation_id and comment_id. The columns are the attributes

Source code in echo_baze/allocation_comments.py
@validate_call
def get(
    self,
    allocation_ids: list[int],
    output_type: Literal["dict", "DataFrame"] = "dict",
    time_zone: TimeZone = "local",
) -> dict[int, dict[int, dict[str, Any]]] | DataFrame:
    """Gets all allocation comments for the given allocation ids.

    The most useful keys/columns returned are:
    - allocationId
    - commentId
    - createdBy
    - commentText

    Parameters
    ----------
    allocation_ids : list[int]
        List of allocation ids to get the comments for.
    output_type : Literal["dict", "DataFrame"], optional
        Output type of the data. Can be one of ["dict", "DataFrame"]
        By default "dict"
    time_zone :  TimeZone, optional
        In which time zone the dates should be returned.
        If local is used, the default time zone defined in echo_baze will be used.
        If an int, must be between -12 and +12
        By default "local"

    Returns
    -------
    dict[int, dict[int, dict[str, Any]]]
        In case output_type == "dict" it will return a dict in the format {allocationId: {commentId: {comment_attribute: comment_value, ...}, ...}, ...}
    DataFrame
        In case output_type == "DataFrame" it will return a DataFrame where the index is a MultiIndex with levels allocation_id and comment_id. The columns are the attributes

    """
    t0 = time.perf_counter()

    results: dict[int, Any] = {}
    for allocation_id in allocation_ids:
        results[allocation_id] = {}

        # getting the allocation
        allocations = self.baze.allocations.history.get_by_ids([allocation_id], output_type="dict", time_zone="UTC")
        endpoint = "comments/type/3"
        payload = {"commentType": 3, "foreignKeys": [allocations[allocation_id]["sfId"]]}

        result = self.baze.conn.post(endpoint, json=payload)
        try:
            self._handle_http_errors(result)
        except Exception:
            logger.exception(f"Error getting allocation comments for allocation id {allocation_id}")
            continue

        result = result.json()

        # converting to a better format

        result = {comment["id"]: comment | {"commentId": comment["id"], "allocationId": allocation_id} for comment in result}
        # converting dates to datetime
        for comment_id in result:
            del result[comment_id]["id"]

            for attr in ["createdTimestamp"]:
                if attr in result[comment_id]:
                    result[comment_id][attr] = convert_time_zone(
                        datetime.strptime(result[comment_id][attr][:-9], "%Y-%m-%dT%H:%M:%S"),
                        "UTC",
                        time_zone,
                    )
            # processing commentRelationships
            if "commentRelationships" in result[comment_id]:
                for relationship in result[comment_id]["commentRelationships"]:
                    relationship_type = relationship.get("type")
                    foreign_key = relationship.get("foreignKey")
                    if relationship_type and foreign_key:
                        result[comment_id][relationship_type] = foreign_key
                # removing the original commentRelationships
                del result[comment_id]["commentRelationships"]

        # adding the data to the results
        results[allocation_id] = result

    # converting to desired output
    match output_type:
        case "dict":
            final_results: dict[int, Any] | DataFrame = results
        case "DataFrame":
            # main columns that need to be present in the DataFrame even if there are no values
            # this is needed to make sure to return a DataFrame when there are no allocations
            # if needed, more columns can be added here
            main_cols = {
                "allocationId": "int64[pyarrow]",
                "commentId": "int64[pyarrow]",
                "sfId": "string[pyarrow]",
                "createdByUserId": "int64[pyarrow]",
                "createdTimestamp": "datetime64[s]",
                "commentText": "string[pyarrow]",
                "private": "bool[pyarrow]",
                "external": "bool[pyarrow]",
                "cleared": "bool[pyarrow]",
                "OBJECT": "string[pyarrow]",
                "ALLOCATION": "string[pyarrow]",
            }

            # converting to a list of dicts
            results_list = []
            for comments in results.values():
                results_list += list(comments.values())

            # creating DataFrame
            if len(results_list) == 0:
                final_results = DataFrame(columns=main_cols)
            else:
                final_results = json_normalize(results_list, max_level=1)

                # making sure timestamps cols are rounded to second level
                for col in ["createdTimestamp"]:
                    if col in final_results.columns:
                        final_results[col] = final_results[col].dt.round("s")

                # adding missing columns before setting dtypes
                for col in main_cols:
                    if col not in final_results.columns:
                        final_results[col] = None

                # set dtypes
                final_results = final_results.astype(main_cols)
                final_results = final_results.convert_dtypes(dtype_backend="pyarrow")

                # set index AFTER all columns are properly configured
                final_results = final_results.set_index(["allocationId", "commentId"])

        case _:
            raise ValueError(f"output_type must be one of ['dict', 'DataFrame'], got '{output_type}'")

    logger.debug(f"Getting allocation comments took {time.perf_counter() - t0:.2f}s")

    return final_results

insert(allocation_id, text)

Inserts a new comment for the given allocation id.

Parameters:

  • allocation_id

    (int) –

    Id of the allocation to insert the comment for.

  • text

    (str) –

    Comment text to insert.

Source code in echo_baze/allocation_comments.py
@validate_call
def insert(
    self,
    allocation_id: int,
    text: str,
) -> None:
    """Inserts a new comment for the given allocation id.

    Parameters
    ----------
    allocation_id : int
        Id of the allocation to insert the comment for.
    text : str
        Comment text to insert.

    """
    # getting the allocation
    allocations = self.baze.allocations.history.get_by_ids([allocation_id], output_type="dict", time_zone="UTC")

    if allocation_id not in allocations:
        raise ValueError(f"Allocation id {allocation_id} does not exist")

    endpoint = "comments"
    payload = {
        "comment": {
            "id": None,
            "commentText": text,
            "private": False,
            "external": False,
            "cleared": False,
            "commentRelationships": [
                {"type": 3, "foreignKey": allocations[allocation_id]["sfId"]},  # Type 3 is for allocations
                {"type": 1, "foreignKey": allocations[allocation_id]["objectId"]},  # Type 1 is for objects
            ],
        },
    }

    result = self.baze.conn.post(endpoint, json=payload)
    self._handle_http_errors(result)