Point Values - Series

PointValuesSeries(baze)

Class used for handling a point value time series.

Source code in echo_baze/baze_root.py
def __init__(self, baze: e_bz.Baze) -> None:
    """Base class that all subclasses should inherit from.

    Parameters
    ----------
    baze : Baze
        Top level object carrying all functionality and the connection handler.

    """
    # check inputs
    if not isinstance(baze, e_bz.Baze):
        raise ValueError(f"baze must be of type Baze, not {type(baze)}")

    self.baze: e_bz.Baze = baze
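
A minimal access sketch, assuming a configured Baze instance named baze already exists (its construction is not covered on this page); the attribute chain below matches the one used internally by the methods that follow:

# `baze` is assumed to be an already configured echo_baze Baze instance.
# The series handler is reached through the same attribute chain used
# internally on this page (e.g. baze.points.values.series.get).
series = baze.points.values.series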

delete(points, period, time_zone='local', request_interval=timedelta(weeks=2))

Method used to delete values from the given points and objects within the desired period.

The request will be made per object and divided into smaller batches according to request_interval.

Parameters:

  • points

    (dict[str, list[str]]) –

    Dict in the format {object_name: [point, ...], ...}

  • period

    (DateTimeRange) –

    Period of time to delete the data from. It is assumed to be in the same time zone as the one specified in time_zone.

  • time_zone

    (Literal['UTC', 'local'] | int, default: 'local' ) –

    The time zone the inputs are assumed to be in; it also defines the time zone of the output. There are three options:

    • If "UTC" is used, we assume the time is already in UTC.
    • If "local" is used, the default time zone defined in echo_baze will be used.
    • If an int is used, it must be between -12 and +12

    By default "local"

  • request_interval

    (timedelta, default: timedelta(weeks=2) ) –

    To avoid doing large requests, the requests will be made to Bazefield in batches considering this interval. By default timedelta(weeks=2)
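
As a usage sketch (hedged: the object and point names below are placeholders, and DateTimeRange is assumed to accept a start and end datetime, as in the insert() source further down this page):

from datetime import datetime, timedelta

# Placeholder object/point names; the DateTimeRange import is omitted because
# its module path is not shown on this page.
period = DateTimeRange(datetime(2024, 1, 1), datetime(2024, 2, 1))

baze.points.values.series.delete(
    points={"WTG01": ["ActivePower"]},
    period=period,
    time_zone="UTC",
    request_interval=timedelta(weeks=1),
)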

Source code in echo_baze/point_values_series.py
@validate_call
def delete(
    self,
    points: dict[str, list[str]],
    period: DateTimeRange,
    time_zone: TimeZone = "local",
    request_interval: timedelta = timedelta(weeks=2),
) -> None:
    """Method used to delete values from the given points and objects within the desired period.

    The request will be made per object and divided into smaller batches according to request_interval.

    Parameters
    ----------
    points : dict[str, list[str]]
        Dict in the format {object_name: [point, ...], ...}
    period : DateTimeRange
        Period of time to delete the data from. It is assumed to be in the same time zone as the one specified in time_zone.
    time_zone : Literal["UTC", "local"] | int, optional
        The time zone the inputs are assumed to be in; it also defines the time zone of the output. There are three options:

        - If "UTC" is used, we assume the time is already in UTC.
        - If "local" is used, the default time zone defined in echo_baze will be used.
        - If an int is used, it must be between -12 and +12

        By default "local"
    request_interval : timedelta, optional
        To avoid doing large requests, the requests will be made to Bazefield in batches considering this interval. By default timedelta(weeks=2)

    """
    t0 = time.perf_counter()

    # checking input
    if request_interval <= timedelta(0):
        raise ValueError(
            f"request_interval must be greater than 0, not {request_interval}",
        )

    # getting initial info
    object_name_to_id, _, object_models, object_model_points = self.baze.points._get_points_info(points)  # noqa

    # breaking the period into smaller periods
    # no aggregation is involved here, so normalize=False
    subperiods = period.split_multiple(separator=request_interval, normalize=False)

    # iterating each object
    for object_name, object_points in points.items():
        # checking if all points are available for the model
        not_found_points = set(object_points) - set(
            object_model_points[object_models[object_name]],
        )
        if len(not_found_points) > 0:
            raise ValueError(
                f"The following points were not found for object '{object_name}': {not_found_points}",
            )

        # iterating subperiods
        for subperiod in subperiods:
            # base payload, specific keys for aggregations will be added later
            payload = {
                "ObjectIds": [object_name_to_id[object_name]],
                "Points": object_points,
                "From": timestamp_from_datetime(
                    dt=subperiod.start,
                    time_zone=time_zone,
                    unit="milliseconds",
                ),
                "To": timestamp_from_datetime(
                    dt=subperiod.end,
                    time_zone=time_zone,
                    unit="milliseconds",
                ),
            }

            endpoint = "objects/points"

            # deleting the data
            result = self.baze.conn.delete(endpoint, json=payload)
            self._handle_http_errors(result)

    logger.debug(f"Deleted data in {time.perf_counter() - t0:.3f} s")

export_file(file_path, points, period, aggregation='Raw', aggregation_interval=None, time_zone='local', filter_quality=None, request_interval=timedelta(days=2), **kwargs)

Exports the time series of the given points and objects to a given file. The file format is inferred from the file extension.

The request will be made per object and divided into smaller batches according to request_interval.

Parameters:

  • file_path

    (Path) –

    File path including extension. If the folder does not exist it will be created. The file extension will be validated against the allowed ones. Currently only supports "csv" and "xlsx".

  • points

    (dict[str, list[str]]) –

    Dict in the format {object_name: [point, ...], ...}

  • period

    (DateTimeRange) –

    Period of time to get the data from. It is assumed to be in the same time zone as the one specified in time_zone.

  • aggregation

    (SUPPORTED_AGGREGATIONS | list[SUPPORTED_AGGREGATIONS], default: 'Raw' ) –

    Type of aggregation to be used. Can be one of:

    • Raw
    • Interpolative
    • Total
    • Average
    • TimeAverage
    • Count
    • Stdev
    • MinimumActualtime
    • Minimum
    • MaximumActualtime
    • Maximum
    • Start
    • End
    • Delta
    • Variance
    • Range
    • DurationGood
    • DurationBad
    • PercentGood
    • PercentBad
    • WorstQuality

    A list of aggregations can be passed to get multiple aggregations at once.

    By default "Raw"

  • aggregation_interval

    (timedelta | None, default: None ) –

    Length of the aggregation interval (the time covered by each timestamp). Not necessary if aggregation=="Raw". By default None

  • time_zone

    (TimeZone, default: 'local' ) –

    The time zone the inputs are assumed to be in; it also defines the time zone of the output. There are three options:

    • If "UTC" is used, we assume the time is already in UTC.
    • If "local" is used, the default time zone defined in echo_baze will be used.
    • If an int is used, it must be between -12 and +12

    By default "local"

  • filter_quality

    (list[SUPPORTED_QUALITIES] | None, default: None ) –

    If not None, will only return the values that match the desired qualities. Must be a subset of ["Good", "Uncertain", "Bad"] By default None

  • request_interval

    (timedelta, default: timedelta(days=2) ) –

    To avoid doing large requests, the data will be requested from Bazefield in batches considering this interval. By default timedelta(days=2)

  • **kwargs

    sep : str, optional
        Separator to be used in the csv file. By default ";"
    decimal : str, optional
        Decimal separator to be used in the csv file. By default "."
    float_format : str, optional
        Format to be used when saving the float values. By default "%.3f"
    encoding : str, optional
        Encoding to be used when saving the file. By default "utf-8"
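
Putting the parameters above together, a hedged usage sketch (object and point names are placeholders; DateTimeRange is assumed to accept a start and end datetime):

from datetime import datetime, timedelta
from pathlib import Path

# Placeholder object/point names; the DateTimeRange import is omitted because
# its module path is not shown on this page.
baze.points.values.series.export_file(
    file_path=Path("exports/active_power.csv"),
    points={"WTG01": ["ActivePower"], "WTG02": ["ActivePower"]},
    period=DateTimeRange(datetime(2024, 1, 1), datetime(2024, 1, 8)),
    aggregation="TimeAverage",
    aggregation_interval=timedelta(minutes=10),
    time_zone="UTC",
    sep=";",  # csv-only keyword arguments
    decimal=".",
    float_format="%.3f",
)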

Source code in echo_baze/point_values_series.py
@validate_call
def export_file(
    self,
    file_path: Path,
    points: dict[str, list[str]],
    period: DateTimeRange,
    aggregation: SUPPORTED_AGGREGATIONS | list[SUPPORTED_AGGREGATIONS] = "Raw",
    aggregation_interval: timedelta | None = None,
    time_zone: TimeZone = "local",
    filter_quality: list[SUPPORTED_QUALITIES] | None = None,
    request_interval: timedelta = timedelta(days=2),
    **kwargs,
) -> None:
    """Exports the time series of the given points and objects to a given file. The format of the file should be inferred from the file extension.

    The request will be made per object and divided into smaller batches according to request_interval.

    Parameters
    ----------
    file_path : Path
        File path including extension. If the folder does not exist it will be created.
        The file extension will be validated against the allowed ones. Currently only supports "csv" and "xlsx".
    points : dict[str, list[str]]
        Dict in the format {object_name: [point, ...], ...}
    period : DateTimeRange
        Period of time to get the data from. It is assumed to be in the same time zone as the one specified in time_zone.
    aggregation : SUPPORTED_AGGREGATIONS | list[SUPPORTED_AGGREGATIONS], optional
        Type of aggregation to be used. Can be one of:

        - Raw
        - Interpolative
        - Total
        - Average
        - TimeAverage
        - Count
        - Stdev
        - MinimumActualtime
        - Minimum
        - MaximumActualtime
        - Maximum
        - Start
        - End
        - Delta
        - Variance
        - Range
        - DurationGood
        - DurationBad
        - PercentGood
        - PercentBad
        - WorstQuality

        A list of aggregations can be passed to get multiple aggregations at once.

        By default "Raw"
    aggregation_interval : timedelta | None, optional
        Length of the aggregation interval (the time covered by each timestamp). Not necessary if aggregation=="Raw". By default None
    time_zone : TimeZone, optional
        The time zone the inputs are assumed to be in; it also defines the time zone of the output. There are three options:

        - If "UTC" is used, we assume the time is already in UTC.
        - If "local" is used, the default time zone defined in echo_baze will be used.
        - If an int is used, it must be between -12 and +12

        By default "local"
    filter_quality : list[SUPPORTED_QUALITIES] | None, optional
        If not None, will only return the values that match the desired qualities.
        Must be a subset of ["Good", "Uncertain", "Bad"]
        By default None
    request_interval : timedelta, optional
        To avoid doing large requests, the data will be requested from Bazefield in batches considering this interval. By default timedelta(days=2)
    **kwargs
        sep : str, optional
            Separator to be used in the csv file. By default ";"
        decimal : str, optional
            Decimal separator to be used in the csv file. By default "."
        float_format : str, optional
            Format to be used when saving the float values. By default "%.3f"
        encoding : str, optional
            Encoding to be used when saving the file. By default "utf-8"

    """
    t0 = time.perf_counter()

    allowed_extensions = ("csv", "xlsx")

    # checking extension
    if file_path.suffix[1:] not in allowed_extensions:
        raise ValueError(
            f"file_path extension must be one of {allowed_extensions}, not {file_path.suffix[1:]}",
        )

    # getting the data
    data = self.baze.points.values.series.get(
        points=points,
        period=period,
        aggregation=aggregation,
        aggregation_interval=aggregation_interval,
        time_zone=time_zone,
        output_type="DataFrame",
        return_quality=False,
        filter_quality=filter_quality,
        request_interval=request_interval,
    )

    # removing name of index
    data.index.name = None

    # creating folder if it does not exist
    file_path.parent.mkdir(parents=True, exist_ok=True)

    # saving the data
    match file_path.suffix[1:]:
        case "csv":
            data.to_csv(
                file_path,
                index=True,
                sep=kwargs.get("sep", ";"),
                decimal=kwargs.get("decimal", "."),
                float_format=kwargs.get("float_format", "%.3f"),
                encoding=kwargs.get("encoding", "utf-8"),
            )
        case "xlsx":
            from pandas import ExcelWriter  # noqa: PLC0415

            # creating excel writer
            writer = ExcelWriter(file_path, engine="xlsxwriter")

            # writing the data
            data.to_excel(writer, sheet_name="data", index=True)

            # formatting the sheet
            # index will be set to YYYY-mm-dd HH:MM:SS and width 20
            # other columns will be set to %.3f and width 15
            workbook = writer.book
            worksheet = writer.sheets["data"]

            # Formatting the columns
            date_format = workbook.add_format(
                {"num_format": "aaaa-mm-dd hh:mm:ss", "border": 1, "bold": True},
            )
            num_format = workbook.add_format(
                {"num_format": "0.000", "border": 1, "bold": False},
            )

            worksheet.set_column(0, 0, 20, date_format)
            worksheet.set_column(1, data.shape[1], 15, num_format)

            # Freeze panes
            worksheet.freeze_panes(2, 1)

            # hide grid lines
            worksheet.hide_gridlines(2)

            # Saving the workbook
            writer.close()
        case _:
            raise ValueError(
                f"file_path extension must be one of {allowed_extensions}, not {file_path.suffix[1:]}",
            )

    logger.debug(
        f"Exported data to {file_path} in {time.perf_counter() - t0:.3f} s",
    )

get(points, period, aggregation='Raw', aggregation_interval=None, time_zone='local', output_type='DataFrame', return_quality=False, filter_quality=None, request_interval=timedelta(days=2), objects_per_request=30, reindex=None, round_timestamps=None, value_dtype=(pl.Float32, 'float32[pyarrow]'))

Gets the time series of the given points and objects.

The request will be made per group of objects and divided into smaller batches according to request_interval. For the fastest response, avoid calling this function multiple times for different objects/points: validating the inputs adds some overhead, so a single call covering everything is always faster.

Parameters:

  • points

    (dict[str, list[str]]) –

    Dict in the format {object_name: [point, ...], ...}

  • period

    (DateTimeRange) –

    Period of time to get the data from. It is assumed to be in the same time zone as the one specified in time_zone.

  • aggregation

    (SUPPORTED_AGGREGATIONS | list[SUPPORTED_AGGREGATIONS], default: 'Raw' ) –

    Type of aggregation to be used. Can be one of:

    • Raw
    • Interpolative
    • Total
    • Average
    • TimeAverage
    • Count
    • Stdev
    • MinimumActualtime
    • Minimum
    • MaximumActualtime
    • Maximum
    • Start
    • End
    • Delta
    • Variance
    • Range
    • DurationGood
    • DurationBad
    • PercentGood
    • PercentBad
    • WorstQuality

    A list of aggregations can be passed to get multiple aggregations at once.

    By default "Raw"

  • aggregation_interval

    (timedelta | None, default: None ) –

    Length of the aggregation interval (the time covered by each timestamp). Not necessary if aggregation=="Raw". By default None

  • time_zone

    (TimeZone, default: 'local' ) –

    The time zone the inputs are assumed to be in; it also defines the time zone of the output. There are three options:

    • If "UTC" is used, we assume the time is already in UTC.
    • If "local" is used, the default time zone defined in echo_baze will be used.
    • If an int is used, it must be between -12 and +12

    By default "local"

  • output_type

    (Literal['dict', 'DataFrame'], default: 'DataFrame' ) –

    Output type of the data. Can be one of ["dict", "DataFrame"] By default "DataFrame"

  • return_quality

    (bool, default: False ) –

    If set to True, the quality of the value will also be returned. By default False

  • filter_quality

    (list[SUPPORTED_QUALITIES] | None, default: None ) –

    If not None, will only return the values that match the desired qualities. Must be a subset of ["Good", "Uncertain", "Bad"] By default None

  • request_interval

    (timedelta, default: timedelta(days=2) ) –

    To avoid doing large requests, the data will be requested from Bazefield in batches considering this interval. By default timedelta(days=2)

  • objects_per_request

    (int, default: 30 ) –

    Number of objects to be requested in each batch. A value higher than one will only be valid if all the points are equal for all requested objects. If they are different the value will be forced to 1. By default, 30

  • reindex

    (str | None, default: None ) –

    String containing the timedelta that should be used to reindex the DataFrame (ex: "10min", "5min", etc.).

    This does not resample the DataFrame! It is only used to get all the timestamps if some are missing from the data (the created timestamps will be set to NaN).

    If set to "infer" the timedelta will be inferred from the acquired data from the database. If set to None no reindex will be done.

    By default None

    Only applicable if output_type is "DataFrame".

  • round_timestamps

    (RoundTimeStampsDict | None, default: None ) –

    Dictionary used to round timestamps to the nearest expected timestamp. Contains the following keys:

    • freq: timedelta, the frequency to round the timestamps to.
    • tolerance: timedelta, the tolerance to be used when rounding timestamps

    If set to None, no rounding will be done. Only applicable if output_type is "DataFrame". By default None

  • value_dtype

    (tuple[Any, Any], default: (Float32, 'float32[pyarrow]') ) –

    Tuple with the dtype to be used for the values. The first element is the dtype to be used in the internal polars DataFrame (like pl.Float64) and the second is the dtype to be used in the final pandas DataFrame (like "float64[pyarrow]").

Returns:

  • dict[str, dict[str, list[dict[str, datetime | float | int]]]]

    In case output_type == "dict" and aggregation is a string (only one aggregation), it will return a dict with the following format: {object_name: {point: [{t: time, v: value, q: quality}, ...], ...}, ...}

  • dict[str, dict[str, dict[str, list[dict[str, datetime | float | int]]]]]

    In case output_type == "dict" and aggregation is a list of aggregations, it will return a dict with the following format: {object_name: {point: {aggregation: [{t: time, v: value, q: quality}, ...], ...}, ...}, ...}

  • DataFrame

    In case output_type == "DataFrame" it will return a DataFrame with time in the index. The columns will be a MultiIndex with levels = [object_name, point, aggregation, quantity], where quantity = [value, quality]. In case aggregation is a string (only one aggregation), the aggregation level will not exist in the columns. In case return_quality is False, the quality level will not exist in the columns.

Source code in echo_baze/point_values_series.py
@validate_call
def get(
    self,
    points: dict[str, list[str]],
    period: DateTimeRange,
    aggregation: SUPPORTED_AGGREGATIONS | list[SUPPORTED_AGGREGATIONS] = "Raw",
    aggregation_interval: timedelta | None = None,
    time_zone: TimeZone = "local",
    output_type: Literal["dict", "DataFrame"] = "DataFrame",
    return_quality: bool = False,
    filter_quality: list[SUPPORTED_QUALITIES] | None = None,
    request_interval: timedelta = timedelta(days=2),
    objects_per_request: int = 30,
    reindex: str | None = None,
    round_timestamps: RoundTimeStampsDict | None = None,
    value_dtype: tuple[Any, Any] = (pl.Float32, "float32[pyarrow]"),
) -> (
    dict[str, dict[str, list[dict[str, datetime | float | int]]]]
    | dict[str, dict[str, dict[str, list[dict[str, datetime | float | int]]]]]
    | pd.DataFrame
):
    """Gets the time series of the given points and objects.

    The request will be made per group of objects and divided into smaller batches according to request_interval. For the fastest response, avoid calling this function multiple times for different objects/points: validating the inputs adds some overhead, so a single call covering everything is always faster.

    Parameters
    ----------
    points : dict[str, list[str]]
        Dict in the format {object_name: [point, ...], ...}
    period : DateTimeRange
        Period of time to get the data from. It is assumed to be in the same time zone as the one specified in time_zone.
    aggregation : SUPPORTED_AGGREGATIONS | list[SUPPORTED_AGGREGATIONS], optional
        Type of aggregation to be used. Can be one of:

        - Raw
        - Interpolative
        - Total
        - Average
        - TimeAverage
        - Count
        - Stdev
        - MinimumActualtime
        - Minimum
        - MaximumActualtime
        - Maximum
        - Start
        - End
        - Delta
        - Variance
        - Range
        - DurationGood
        - DurationBad
        - PercentGood
        - PercentBad
        - WorstQuality

        A list of aggregations can be passed to get multiple aggregations at once.

        By default "Raw"
    aggregation_interval : timedelta | None, optional
        Length of the aggregation interval (the time covered by each timestamp). Not necessary if aggregation=="Raw". By default None
    time_zone : TimeZone, optional
        The time zone the inputs are assumed to be in; it also defines the time zone of the output. There are three options:

        - If "UTC" is used, we assume the time is already in UTC.
        - If "local" is used, the default time zone defined in echo_baze will be used.
        - If an int is used, it must be between -12 and +12

        By default "local"
    output_type : Literal["dict", "DataFrame"], optional
        Output type of the data. Can be one of ["dict", "DataFrame"]
        By default "DataFrame"
    return_quality : bool, optional
        If set to True, the quality of the value will also be returned. By default False
    filter_quality : list[SUPPORTED_QUALITIES] | None, optional
        If not None, will only return the values that match the desired qualities.
        Must be a subset of ["Good", "Uncertain", "Bad"]
        By default None
    request_interval : timedelta, optional
        To avoid doing large requests, the data will be requested from Bazefield in batches considering this interval. By default timedelta(days=2)
    objects_per_request : int, optional
        Number of objects to be requested in each batch.
        A value higher than one will only be valid if all the points are equal for all requested objects. If they are different the value will be forced to 1.
        By default, 30
    reindex : str | None, optional
        String containing the timedelta that should be used to reindex the DataFrame (ex: "10min", "5min", etc.).

        This does not resample the DataFrame! It is only used to get all the timestamps if some are missing from the data (the created timestamps will be set to NaN).

        If set to "infer" the timedelta will be inferred from the acquired data from the database.
        If set to None no reindex will be done.

        By default None

        Only applicable if output_type is "DataFrame".
    round_timestamps : RoundTimeStampsDict | None, optional
        Dictionary used to round timestamps to the nearest expected timestamp. Contains the following keys:

        - freq: timedelta, the frequency to round the timestamps to.
        - tolerance: timedelta, the tolerance to be used when rounding timestamps

        If set to None, no rounding will be done. Only applicable if output_type is "DataFrame".
        By default None
    value_dtype : tuple[Any, Any], optional
        Tuple with the dtype to be used for the values. The first element is the dtype to be used in the internal polars DataFrame (like pl.Float64) and the second is the dtype to be used in the final pandas DataFrame (like "float64[pyarrow]").

    Returns
    -------
    dict[str, dict[str, list[dict[str, datetime | float | int]]]]
        In case output_type == "dict" and aggregation is a string (only one aggregation), it will return a dict with the following format: {object_name: {point: [{t: time, v: value, q: quality}, ...], ...}, ...}
    dict[str, dict[str, dict[str, list[dict[str, datetime | float | int]]]]]
        In case output_type == "dict" and aggregation is a list of aggregations, it will return a dict with the following format: {object_name: {point: {aggregation: [{t: time, v: value, q: quality}, ...], ...}, ...}, ...}
    DataFrame
        In case output_type == "DataFrame" it will return a DataFrame with time in the index. The columns will be a MultiIndex with levels = [object_name, point, aggregation, quantity], where quantity = [value, quality].
        In case aggregation is a string (only one aggregation), the aggregation level will not exist in the columns.
        In case return_quality is False, the quality level will not exist in the columns.

    """
    t0 = time.perf_counter()

    # checking input
    if isinstance(aggregation, list) and len(aggregation) != len(set(aggregation)):
        raise ValueError("aggregation must have unique values")
    if (
        (isinstance(aggregation, str) and aggregation != "Raw")
        or (isinstance(aggregation, list) and any(agg != "Raw" for agg in aggregation))
    ) and aggregation_interval is None:
        raise ValueError("aggregation_interval must be set if aggregation != 'Raw'")

    if request_interval <= timedelta(0):
        raise ValueError(
            f"request_interval must be greater than 0, not {request_interval}",
        )
    if not isinstance(objects_per_request, int) or objects_per_request <= 0:
        raise TypeError(
            f"objects_per_request must be an int greater than 0, not {objects_per_request}",
        )
    if reindex and output_type != "DataFrame":
        raise ValueError(
            "reindex can only be used if output_type is 'DataFrame'",
        )
    if round_timestamps and output_type != "DataFrame":
        raise ValueError(
            "round_timestamps can only be used if output_type is 'DataFrame'",
        )
    if len(value_dtype) != 2:
        raise ValueError(
            f"value_dtype must be a tuple with 2 elements, not {len(value_dtype)}",
        )

    # checking if all points are equal for all objects. If that is not the case reduce objects per request to 1
    if objects_per_request > 1:
        ref_points = points[next(iter(points.keys()))]
        for obj_points in points.values():
            if obj_points != ref_points:
                logger.debug("Points are not equal for all objects. Reducing objects_per_request to 1.")
                objects_per_request = 1
                break

    # getting initial info
    object_name_to_id, object_id_to_name, object_models, object_model_points = self.baze.points._get_points_info(points)  # noqa
    all_points = set()
    for value in points.values():
        all_points = all_points.union(set(value))
    all_points = list(all_points)

    # dict that gets pretty names for the aggregations
    aggregation_pretty_names = {k.upper(): k for k in AGGREGATE_IDS} | {
        "NOAGGREGATE": "Raw",
    }

    # list with aggregations to be requested
    aggregation_groups = [aggregation] if isinstance(aggregation, str) else aggregation
    # separating raw from the rest
    aggregation_groups = [agg for agg in aggregation_groups if agg != "Raw"]
    if aggregation_groups:
        aggregation_groups = [aggregation_groups]
    if "Raw" in aggregation:
        # if Raw is in the aggregations, we need to request it separately
        aggregation_groups.append(["Raw"])

    # breaking the period into smaller periods
    # we are using normalize to make sure when getting aggregated data, the period starts at 00:00
    subperiods = period.split_multiple(separator=request_interval, normalize=True)

    logger.debug(f"Initial info gathered in {time.perf_counter() - t0:.3f} s")

    # dividing the objects into groups to avoid large requests
    request_obj_groups = [
        list(object_name_to_id.keys())[i : i + objects_per_request] for i in range(0, len(object_name_to_id), objects_per_request)
    ]

    # list of DataFrames to be concatenated and later processed
    result_df_list = []

    # columns that can be ignored
    drop_cols = ["t_local"]
    if not return_quality and not filter_quality:
        drop_cols.append("q")
    if isinstance(aggregation, str):
        drop_cols.append("aggregation")

    schema = {
        "t": pl.Int64,
        "v": value_dtype[0],
        "object_name": pl.Enum(list(object_name_to_id.keys())),
        "point": pl.Enum(all_points),
    }
    if "q" not in drop_cols:
        schema["q"] = pl.Int64
    if "aggregation" not in drop_cols:
        schema["aggregation"] = pl.Enum(aggregation if isinstance(aggregation, list) else [aggregation])

    # getting the values for each object
    for objs in request_obj_groups:
        # getting the points for the objects
        object_points = points[objs[0]]

        # checking if all points are available for the model
        for object_name in objs:
            not_found_points = set(object_points) - set(
                object_model_points[object_models[object_name]],
            )
            if len(not_found_points) > 0:
                raise ValueError(
                    f"The following points were not found for object '{object_name}': {not_found_points}",
                )

        # iterating over the aggregation groups
        for aggregation_group in aggregation_groups:
            # iterating subperiods
            for subperiod in subperiods:
                # base payload, specific keys for aggregations will be added later
                payload = {
                    "ObjectIds": [object_name_to_id[object_name] for object_name in objs],
                    "Points": object_points,
                    "From": timestamp_from_datetime(
                        dt=subperiod.start,
                        time_zone=time_zone,
                        unit="milliseconds",
                    ),
                    # adding 1 to the end to make sure we get all data (Bazefield seems to be exclusive on the end timestamp)
                    "To": timestamp_from_datetime(
                        dt=subperiod.end,
                        time_zone=time_zone,
                        unit="milliseconds",
                    )
                    + 1,
                    "Take": 0,  # this makes sure we get all data
                }

                # getting Raw data
                if aggregation_group == ["Raw"]:
                    endpoint = "objects/timeseries"
                else:
                    endpoint = "objects/timeseries/aggregated"
                    payload["Aggregates"] = [AGGREGATE_IDS[agg] for agg in aggregation_group]
                    payload["Interval"] = int(aggregation_interval.total_seconds())

                # getting the data
                t1 = time.perf_counter()
                result = self.baze.conn.get(endpoint, json=payload)
                self._handle_http_errors(result)
                # converting to dict
                result: dict[str, dict[str, dict[str, dict[str, Any]]]] = result.json()["objects"]

                t2 = time.perf_counter()

                # iterating results to convert to DataFrames for later processing
                # first objects
                for obj_id in list(result.keys()):
                    if "points" not in result[obj_id]:
                        logger.warning(f"No data found for object '{object_id_to_name[obj_id]}'")
                        # free memory
                        del result[obj_id]
                        continue
                    # then points
                    for point_name in list(result[obj_id]["points"].keys()):
                        # then Aggregates
                        while len(result[obj_id]["points"][point_name]) > 0:
                            # pop first item in list
                            agg_vals = result[obj_id]["points"][point_name].pop(0)
                            # converting time series to DataFrame
                            df = pl.from_dicts(agg_vals["timeSeries"], schema=schema)
                            # adding object_name, point_name and aggregation to the DataFrame
                            df = df.with_columns(
                                object_name=pl.lit(object_id_to_name[obj_id]).cast(schema["object_name"]),
                                point=pl.lit(point_name).cast(schema["point"]),
                            )
                            if "aggregation" not in drop_cols:
                                df = df.with_columns(
                                    aggregation=pl.lit(aggregation_pretty_names[agg_vals["aggregate"]]).cast(schema["aggregation"]),
                                )
                            # adding to the list
                            result_df_list.append(df)

                            # free memory
                            del agg_vals

                        # free memory
                        del result[obj_id]["points"][point_name]
                    # free memory
                    del result[obj_id]
                del result

                logger.debug(
                    f"Request response for {objs} {aggregation_group} {object_points} {subperiod.start:%Y-%m-%d %H:%M:%S} to {subperiod.end:%Y-%m-%d %H:%M:%S} in {t2 - t1:.3f} s. Processed request response in {time.perf_counter() - t2:.3f} s",
                )

    # concatenating the list of DataFrames
    # (an empty DataFrame with the correct schema is created below if nothing was returned)
    if len(result_df_list) > 0:  # noqa: SIM108
        # concatenating the DataFrames
        df = pl.concat(result_df_list, how="vertical")
    else:
        # creating empty DataFrame with the correct columns
        df = pl.DataFrame(
            schema=schema,
        )
    del result_df_list

    # dropping duplicates on t, object_name, point, aggregation
    subset = ["t", "object_name", "point", "aggregation"]
    subset = [col for col in subset if col in df.columns]
    df = df.unique(subset=subset)

    # converting t to datetime
    time_zone_modifier = 0
    if time_zone != "UTC":
        time_zone_modifier = DEFAULT_TIME_ZONE if time_zone == "local" else time_zone
    df = df.with_columns(
        t=pl.col("t").cast(pl.Datetime(time_unit="ms")) + timedelta(hours=time_zone_modifier),
    )

    # adjusting quality column if requested
    if return_quality or filter_quality:
        # adjusting quality column using cast_quality method
        q_dtype = pl.Enum(list(TAG_QUALITY_CODES.keys()))
        # TODO we might want to not use cast_quality in favor of a better implementation using polars
        df = df.with_columns(
            q=pl.col("q").map_elements(lambda x: cast_quality(x, "str"), skip_nulls=True, return_dtype=pl.String).cast(q_dtype),
        )

        # filter_quality
        if filter_quality:
            df = df.filter(pl.col("q").is_in(filter_quality))

    # dropping quality column if not requested
    if not return_quality:
        df = df.drop("q", strict=False)

    # converting to pandas DataFrame

    # converting df to correct types
    dtypes = {
        "t": "datetime64[ms]",
        "v": value_dtype[1],
        "q": "category",
        "object_name": "category",
        "point": "category",
        "aggregation": "category",
    }
    dtypes = {k: v for k, v in dtypes.items() if k in df.columns}

    # sorting the DataFrame by t, object_name, point and aggregation
    sort_cols = ["t", "object_name", "point", "aggregation"]
    sort_cols = [col for col in sort_cols if col in df.columns]
    df = df.sort(by=sort_cols)

    df = df.to_pandas(use_pyarrow_extension_array=True)
    df = df.astype(dtypes)

    # returning on the desired format
    match output_type:
        case "dict":
            t1 = time.perf_counter()
            index_cols = ["object_name", "point"]
            if not isinstance(aggregation, str):
                index_cols.append("aggregation")
            df = df.set_index(index_cols)
            if df.empty:
                logger.warning(f"No data found for {points} and period {period}")
                results = {}
            else:
                # grouping by object_name, point and aggregation to create a list of dicts
                result = df.groupby(level=index_cols, observed=True).apply(lambda x: x.to_dict(orient="records")).to_dict()
                # unpacking dict keys from tuples to nested dicts
                results = {}
                for key, value in result.items():
                    obj_name, point_name, *agg = key
                    if len(agg) == 0:
                        results.setdefault(obj_name, {})[point_name] = value
                    else:
                        results.setdefault(obj_name, {}).setdefault(point_name, {})[agg[0]] = value

            # making sure we have dict keys for all wanted objects, points, aggregations and quantities as if there is no data they will not be created automatically
            for obj, obj_points in points.items():
                if obj not in results:
                    results[obj] = {}
                for obj_point in obj_points:
                    if obj_point not in results[obj]:
                        results[obj][obj_point] = {} if not isinstance(aggregation, str) else []
                    if not isinstance(aggregation, str):
                        for agg in aggregation:
                            results[obj][obj_point].setdefault(agg, [])

            logger.debug(
                f"Converted to dict in {time.perf_counter() - t1:.3f} s",
            )
            del df

            return results

        case "DataFrame":
            t1 = time.perf_counter()
            # converting to DataFrame

            # creating quantity column if quality is requested
            # we need to melt the DataFrame to have the quantity as a column
            if return_quality:
                df = df.melt(
                    id_vars=df.columns.difference(["v", "q"]).tolist(),
                    value_vars=["v", "q"],
                    var_name="quantity",
                    value_name="value",
                )
                # replacing q with quality and v with value
                df["quantity"] = df["quantity"].replace({"q": "quality", "v": "value"})
                df = df.rename(columns={"value": "v"})

            index_cols = ["t", "object_name", "point"]
            if not isinstance(aggregation, str):
                index_cols.append("aggregation")
            if return_quality:
                index_cols.append("quantity")
            df = df.set_index(index_cols).unstack(
                level=index_cols[1:],
            )
            df = df.droplevel(0, axis=1)  # dropping the first level of columns (v) as it is not needed
            df.index.name = "time"

            # making sure we have columns for all wanted objects, points, aggregations and quantities as if there is no data they will not be created automatically
            concat_list = []
            for obj, obj_points in points.items():
                level_values = [[obj], obj_points]
                level_names = ["object_name", "point"]
                if isinstance(aggregation, list):
                    level_values.append(aggregation)
                    level_names.append("aggregation")
                if return_quality:
                    level_values.append(["quality", "value"])
                    level_names.append("quantity")

                complete_columns = pd.MultiIndex.from_product(level_values, names=level_names)
                add_columns = complete_columns.difference(df.columns)
                if len(add_columns) > 0:
                    concat_df = pd.DataFrame(columns=add_columns)
                    concat_list.append(concat_df)
            df = pd.concat([df, *concat_list], axis=1, copy=False)

            # changing dtype of columns to string[pyarrow]
            for i in range(len(df.columns.names)):
                df.columns = df.columns.set_levels(
                    df.columns.levels[i].astype("string[pyarrow]"),
                    level=i,
                )

            # sorting the columns and index
            df = df.sort_index(axis=1)
            df = df.sort_index(axis=0)

            results = df

            logger.debug(
                f"Converted to DataFrame in {time.perf_counter() - t1:.3f} s",
            )
        case _:
            raise ValueError(
                f"output_type must be one of ['dict', 'DataFrame'], got '{output_type}'",
            )

    # forcing the index to be a DatetimeIndex
    results.index = results.index.astype("datetime64[s]")

    # rounding timestamps if requested
    if round_timestamps is not None:
        results = round_ts(df=results, **round_timestamps)

    # reindex if requested
    if reindex is not None:
        final_reindex = copy(reindex)

        # reindexing DataFrame to have all timestamps if desired
        if final_reindex is not None:
            # inferring frequency if reindex is "infer"
            if final_reindex == "infer" and len(results) > 3:
                final_reindex = pd.infer_freq(results.index)
            elif final_reindex == "infer":
                logger.debug("Cannot infer frequency from data because it has less than 3 timestamps. Using '10min' as default.")
                final_reindex = "10min"

            # if failed (returned None), lets try other function
            if final_reindex is None:
                final_reindex = get_index_freq(results)

            # if still failed, lets raise an error
            if final_reindex is None:
                raise RuntimeError("Cannot infer frequency from data. Please set 'reindex' argument manually.")

            new_index = pd.date_range(
                start=period["start"].replace(minute=0, second=0),
                end=period["end"].replace(minute=0, second=0) + timedelta(hours=1),
                freq=final_reindex,
            )
            results = results.reindex(new_index, method=None)
            # restricting to desired period only
            results = results[(results.index >= period["start"]) & (results.index <= period["end"])].copy()

    logger.debug(
        f"Got time series {aggregation} data between {period} for {points} in {output_type} format in {time.perf_counter() - t0:.3f} s",
    )
    return results

insert(data, time_zone='local', delete_before=False, request_interval=timedelta(days=2))

Inserts time series data into Bazefield.

The request will be made per object and divided into smaller batches according to request_interval.

Parameters:

  • data

    (DataFrame | DataFrame) –

    DataFrame with the data to be uploaded.

    In case it is a Pandas DataFrame, it must have the following structure:

    • Index with name "time" or "timestamp" and dtype timestamp[pyarrow] or datetime64.
    • Columns as a MultiIndex with levels "object_name" and "point".
    • Values must be convertible to float.

    In case it is a Polars DataFrame, it must have the following structure:

    • It must be in the format: columns=["timestamp", "object_name@point_name", ...].
    • The "timestamp" column must be of type Datetime and the values must be convertible to float.

  • time_zone

    (Literal['UTC', 'local'] | int, default: 'local' ) –

    The time zone the inputs are assumed to be in; it also defines the time zone of the output. There are three options:

    • If "UTC" is used, we assume the time is already in UTC.
    • If "local" is used, the default time zone defined in echo_baze will be used.
    • If an int is used, it must be between -12 and +12

    By default "local"

  • delete_before

    (bool, default: False ) –

    If set to True, will delete all the data in the period covered by data before uploading. This is useful to make sure that no stale values are kept when they should no longer be present. By default False

  • request_interval

    (timedelta, default: timedelta(days=2) ) –

    To avoid doing large requests, the requests will be made to Bazefield in batches considering this interval. By default timedelta(days=2)
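
A hedged usage sketch with a pandas DataFrame in the expected shape (object and point names are placeholders):

from datetime import timedelta

import pandas as pd

# Index named "timestamp" with a datetime dtype; columns as a MultiIndex with
# levels ["object_name", "point"]. Names are placeholders.
frame = pd.DataFrame(
    [[1.0], [2.0], [3.0], [4.0], [5.0], [6.0]],
    index=pd.date_range("2024-01-01", periods=6, freq="10min", name="timestamp"),
    columns=pd.MultiIndex.from_product(
        [["WTG01"], ["ActivePower"]], names=["object_name", "point"]
    ),
)

baze.points.values.series.insert(
    data=frame,
    time_zone="UTC",
    delete_before=True,
    request_interval=timedelta(days=1),
)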

Source code in echo_baze/point_values_series.py
@validate_call
def insert(
    self,
    data: pd.DataFrame | pl.DataFrame,
    time_zone: TimeZone = "local",
    delete_before: bool = False,
    request_interval: timedelta = timedelta(days=2),
) -> None:
    """Inserts time series data into Bazefield.

    The request will be made per object and divided into smaller batches according to request_interval.

    Parameters
    ----------
    data : pd.DataFrame | pl.DataFrame
        DataFrame with the data to be uploaded.

        In case it is a **Pandas** DataFrame, it must have the following structure:
        - Index with name "time" or "timestamp" and dtype timestamp[pyarrow] or datetime64.
        - Columns as a MultiIndex with levels "object_name" and "point".
        - Values must be convertible to float.

        In case it is a **Polars** DataFrame, it must have the following structure:
        - It must be in the format: columns=["timestamp", "object_name@point_name", ...].
        - The "timestamp" column must be of type Datetime and the values must be convertible to float.

    time_zone : Literal["UTC", "local"] | int, optional
        The time zone the inputs are assumed to be in; it also defines the time zone of the output. There are three options:

        - If "UTC" is used, we assume the time is already in UTC.
        - If "local" is used, the default time zone defined in echo_baze will be used.
        - If an int is used, it must be between -12 and +12

        By default "local"
    delete_before : bool, optional
        If set to True, will delete all the data in the period covered by data before uploading. This is useful to make sure that no stale values are kept when they should no longer be present. By default False
    request_interval : timedelta, optional
        To avoid doing large requests, the requests will be made to Bazefield in batches considering this interval. By default timedelta(days=2)

    """
    t0 = time.perf_counter()

    # in case it is a Pandas DataFrame:

    if isinstance(data, pd.DataFrame):
        data = data.copy()
        # checking input
        if data.index.name not in ("time", "timestamp") or (
            "datetime" not in str(data.index.dtype) and "timestamp" not in str(data.index.dtype)
        ):
            raise TypeError(
                "data index must be a DatetimeIndex with name 'time' or 'timestamp' and dtype timestamp[pyarrow] or datetime64",
            )
        for col, dtype in data.dtypes.to_dict().items():
            # trying to convert to double[pyarrow]
            try:
                data[col] = data[col].astype("float64[pyarrow]")
            except Exception as e:
                raise TypeError(
                    f"data column '{col}' must be convertible to float, got {dtype}",
                ) from e
        if not isinstance(data.columns, pd.MultiIndex) or data.columns.names != [
            "object_name",
            "point",
        ]:
            raise TypeError(
                "data must have columns as a Multindex with names 'object_name' and 'point'",
            )

        # joining the two column levels into one
        data.columns = data.columns.map(lambda x: f"{x[0]}@{x[1]}")

        # forcing the index to be named timestamp and them resetting it
        data.index.name = "timestamp"
        data = data.reset_index(drop=False)

        # converting the DataFrame to Polars
        data = pl.from_pandas(data)

    # polars checks
    data = data.clone()

    # checking if column timestamp exists
    if "timestamp" not in data.columns:
        raise ValueError("data must have a column 'timestamp'")
    # checking if timestamp column is of type Datetime
    if not isinstance(data["timestamp"].dtype, pl.Datetime):
        raise ValueError("data 'timestamp' column must be of type Datetime")

    # getting the list of all other columns and checking if they are in the format "object_name@point_name"
    # a dict like {object: [point1, point2, ...], ...} will be created
    points = {}
    for col in data.columns:
        if col == "timestamp":
            continue
        # checking if the column is in the format "object_name@point_name"
        if "@" not in col:
            raise ValueError(f"Column {col} must be in the format 'object_name@point_name'")
        # splitting the column name into object and point
        obj, feat = col.split("@")
        if not obj or not feat:
            raise ValueError(f"Column {col} must be in the format 'object_name@point_name' with non-empty object and point names")
        # adding the point to the object in the dict
        if obj not in points:
            points[obj] = []
        points[obj].append(feat)

        # making sure the data type is Float64, if not convert it
        if not isinstance(data[col].dtype, pl.Float64):
            try:
                data = data.with_columns(pl.col(col).cast(pl.Float64))
            except Exception as e:
                raise ValueError(f"Column {col} must be convertible to Float64") from e

    # skipping if data is empty
    if len(data) == 0:
        return

    # getting initial info
    object_name_to_id, _, object_models, object_model_points = self.baze.points._get_points_info(points)  # noqa

    # getting total period
    period = DateTimeRange(data["timestamp"].min(), data["timestamp"].max())

    # breaking the period into smaller periods
    # no aggregation is involved here, so normalize=False
    subperiods = period.split_multiple(separator=request_interval, normalize=False)

    # deleting data if requested
    if delete_before:
        self.baze.points.values.series.delete(
            points=points,
            period=period,
            time_zone=time_zone,
            request_interval=request_interval,
        )

    # iterating each object
    for object_name, object_points in points.items():
        upload_points = object_points.copy()
        # checking if all points are available for the model
        not_found_points = set(upload_points) - set(
            object_model_points[object_models[object_name]],
        )
        if len(not_found_points) > 0:
            logger.error(f"The following points were not found and will be skipped '{object_name}': {not_found_points}")
            # removing the points that were not found
            upload_points = [point for point in upload_points if point not in not_found_points]
            if not upload_points:
                continue

        # getting a slice of the DataFrame for the object
        obj_df = data.select(
            pl.col("timestamp"),
            *[pl.col(f"{object_name}@{point}") for point in object_points],
        )

        # converting to a lazy frame for faster processing
        obj_df = obj_df.lazy()

        # unpivoting the DataFrame to have columns timestamp and point
        obj_df = (
            obj_df.unpivot(
                index=["timestamp"],
                variable_name="point",
                value_name="value",
            )
            # Split the 'point' column into a struct with named fields
            .with_columns(
                pl.col("point")
                .str.split_exact("@", 1)  # Use split_exact for fixed number of splits
                .struct.rename_fields(["object_name", "point_name"])
                .alias("point_struct"),
            )
            # Unnest the struct columns into the main DataFrame
            .unnest("point_struct")
            # Cast to Enum (less memory usage) and drop the original point column
            .with_columns(
                pl.col("point_name").cast(pl.Enum(categories=object_points)).alias("point_name"),
            )
            # drop the original 'point' column
            .drop(["point", "object_name"])
            # drop all NA rows
            .drop_nulls()
            # dropping duplicate rows
            .unique(subset=["timestamp", "point_name"])
            # sorting by timestamp, point_name
            .sort(["timestamp", "point_name"])
        )

        timezone_offset = 0 if time_zone == "UTC" else DEFAULT_TIME_ZONE if time_zone == "local" else time_zone

        # converting timestamp to milliseconds since epoch and then subtracting the time zone offset in milliseconds
        obj_df = obj_df.with_columns(
            # first converting timestamp to milliseconds since epoch
            pl.col("timestamp").dt.epoch(time_unit="ms").alias(name="epoch"),
        )

        # then subtracting the time zone offset in milliseconds
        if timezone_offset != 0:
            obj_df = obj_df.with_columns(
                pl.col("epoch") - (timezone_offset * 3600 * 1000),  # converting hours to milliseconds
            )

        # converting value column to a struct with t, v, q, where:
        # - t is the timestamp in milliseconds since epoch
        # - v is the value as float
        # - q is the quality as 192 (good quality)

        obj_df = obj_df.with_columns(
            pl.struct(
                [
                    pl.col("epoch").cast(pl.Int64).alias("t"),  # timestamp in milliseconds since epoch
                    pl.col("value").cast(pl.Float64).alias("v"),  # value as float
                    pl.lit(192).cast(pl.Int16).alias("q"),  # quality as 192 (good quality)
                ],
            ).alias("point_values"),
        )

        # iterating subperiods
        for subperiod in subperiods:
            # getting only wanted part of the DataFrame
            sub_obj_df = obj_df.filter(
                (pl.col("timestamp") >= subperiod.start) & (pl.col("timestamp") <= subperiod.end),
            )

            # collecting results for point_values and point_name as they are the only needed columns
            sub_obj_df = sub_obj_df.select(
                pl.col("point_name"),
                pl.col("point_values"),
            ).collect()

            # if the DataFrame is empty, skip it
            if sub_obj_df.is_empty():
                continue

            # creating points dict with the structure {point_name: [{"t": timestamp, "v": value, "q": quality}, ...], ...}
            points_dict = {
                point_name: sub_obj_df.filter(pl.col("point_name") == point_name)["point_values"].to_list()
                for point_name in sub_obj_df["point_name"].unique().to_list()
            }

            # getting dict to upload
            payload = {
                "data": [
                    {
                        "objectId": object_name_to_id[object_name],
                        "points": points_dict,
                    },
                ],
            }

            # popping NA values in the dict
            for point in payload["data"][0]["points"]:
                payload["data"][0]["points"][point] = [val for val in payload["data"][0]["points"][point] if not pd.isna(val["v"])]

            # removing empty points
            payload["data"][0]["points"] = {point: values for point, values in payload["data"][0]["points"].items() if len(values) > 0}

            # skipping if no data to upload
            if len(payload["data"][0]["points"]) == 0:
                continue

            endpoint = "objects/points/insert"

            # inserting the data (trying 3 times just to be sure)
            total_points_sent = sum(len(values) for values in payload["data"][0]["points"].values())
            success = False
            try_number = 1
            while not success and try_number <= 3:
                try:
                    result = self.baze.conn.post(endpoint, json=payload)
                    self._handle_http_errors(result)
                    result_json = result.json()
                    # making sure number of inserted and published points are the same as the ones we sent
                    if result_json["inserted"] != total_points_sent or result_json["published"] != total_points_sent:
                        raise ValueError("Bazefield replied saying we inserted/published less points than the ones sent")
                    success = True
                except Exception as e:  # noqa
                    logger.exception(
                        f"Tentative {try_number} - Failed to insert data for {object_name} in {subperiod.start:%Y-%m-%d %H:%M:%S} to {subperiod.end:%Y-%m-%d %H:%M:%S}",
                    )
                    try_number += 1
    logger.debug(f"Inserted data in {time.perf_counter() - t0:.3f} s")