Point Values - Series
PointValuesSeries(baze)
Class used for handling a point value time series.
Source code in echo_baze/baze_root.py
def __init__(self, baze: e_bz.Baze) -> None:
"""Base class that all subclasses should inherit from.
Parameters
----------
baze : Baze
Top level object carrying all functionality and the connection handler.
"""
# check inputs
if not isinstance(baze, e_bz.Baze):
raise ValueError(f"baze must be of type Baze, not {type(baze)}")
self.baze: e_bz.Baze = baze
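The handler only stores the parent Baze object and is normally reached through an already configured Baze instance rather than constructed directly. A minimal sketch, assuming echo_baze exposes Baze at the package root and that its constructor takes no arguments (both are assumptions; the attribute path itself mirrors the internal calls in this module, e.g. self.baze.points.values.series.get):
from echo_baze import Baze  # assumed import path

baze = Baze()  # hypothetical construction; see the Baze documentation for the real arguments
series = baze.points.values.series  # this PointValuesSeries handler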
delete(points, period, time_zone='local', request_interval=timedelta(weeks=2))
Method used to delete values from the given points and objects within the desired period.
The request will be made per object and divided into smaller batches according to request_interval.
Parameters:
- points (dict[str, list[str]]) – Dict in the format {object_name: [point, ...], ...}
- period (DateTimeRange) – Period of time to delete the data from. It is assumed to be in the same time zone as the one specified in time_zone.
- time_zone (Literal['UTC', 'local'] | int, default: 'local') – In which time zone we assume the inputs are. Also used to define in which time zone the output is. There are three options:
  - If "UTC" is used, we assume the time already is in UTC.
  - If "local" is used, the default time zone defined in echo_baze will be used.
  - If an int, it must be between -12 and +12.
  By default "local".
- request_interval (timedelta, default: timedelta(weeks=2)) – To avoid doing large requests, the requests will be made to Bazefield in batches considering this interval. By default timedelta(weeks=2).
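A minimal usage sketch (the object and point names are hypothetical, the DateTimeRange constructor arguments are an assumption, and the import path for DateTimeRange is not shown on this page; baze is a configured Baze instance as in the sketch above):
from datetime import datetime, timedelta

# Hypothetical object/point names; DateTimeRange is the period type used throughout this page.
period = DateTimeRange(datetime(2024, 1, 1), datetime(2024, 1, 31))
baze.points.values.series.delete(
    points={"WTG01": ["ActivePower", "WindSpeed"]},  # {object_name: [point, ...], ...}
    period=period,
    time_zone="UTC",
    request_interval=timedelta(weeks=1),  # smaller batches than the default of two weeks
)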
Source code in echo_baze/point_values_series.py
@validate_call
def delete(
self,
points: dict[str, list[str]],
period: DateTimeRange,
time_zone: TimeZone = "local",
request_interval: timedelta = timedelta(weeks=2),
) -> None:
"""Method used to delete values from the given points and objects within the desired period.
The request will be made per object and divided into smaller batches according to request_interval.
Parameters
----------
points : dict[str, list[str]]
Dict in the format {object_name: [point, ...], ...}
period : DateTimeRange
Period of time to delete the data from. It is assumed to be in the same time zone as the one specified in time_zone.
time_zone : Literal["UTC", "local"]| int, optional
In which time zone we assume the inputs are. Also used to define in which time zone the output is. There are three options:
- If "UTC" is used, we assume time already is in UTC.
- If local is used, the default time zone defined in echo_baze will be used.
- If an int, must be between -12 and +12
By default "local"
request_interval : timedelta, optional
To avoid doing large requests, the requests will be made to Bazefield in batches considering this interval. By default timedelta(weeks=2)
"""
t0 = time.perf_counter()
# checking input
if request_interval <= timedelta(0):
raise ValueError(
f"request_interval must be greater than 0, not {request_interval}",
)
# getting initial info
object_name_to_id, _, object_models, object_model_points = self.baze.points._get_points_info(points) # noqa
# breaking the period into smaller periods
# normalize is not needed here: no aggregated data is requested, so the subperiods do not need to start at 00:00
subperiods = period.split_multiple(separator=request_interval, normalize=False)
# iterating each object
for object_name, object_points in points.items():
# checking if all points are available for the model
not_found_points = set(object_points) - set(
object_model_points[object_models[object_name]],
)
if len(not_found_points) > 0:
raise ValueError(
f"The following points were not found for object '{object_name}': {not_found_points}",
)
# iterating subperiods
for subperiod in subperiods:
# base payload, specific keys for aggregations will be added later
payload = {
"ObjectIds": [object_name_to_id[object_name]],
"Points": object_points,
"From": timestamp_from_datetime(
dt=subperiod.start,
time_zone=time_zone,
unit="milliseconds",
),
"To": timestamp_from_datetime(
dt=subperiod.end,
time_zone=time_zone,
unit="milliseconds",
),
}
endpoint = "objects/points"
# deleting the data
result = self.baze.conn.delete(endpoint, json=payload)
self._handle_http_errors(result)
logger.debug(f"Deleted data in {time.perf_counter() - t0:.3f} s")
export_file(file_path, points, period, aggregation='Raw', aggregation_interval=None, time_zone='local', filter_quality=None, request_interval=timedelta(days=2), **kwargs)
Exports the time series of the given points and objects to a given file. The format of the file should be inferred from the file extension.
The request will be made per object and divided into smaller batches according to request_interval.
Parameters:
- file_path (Path) – File path including extension. If the folder does not exist it will be created. The file extension will be validated against the allowed ones. Currently only supports "csv" and "xlsx".
- points (dict[str, list[str]]) – Dict in the format {object_name: [point, ...], ...}
- period (DateTimeRange) – Period of time to get the data from. It is assumed to be in the same time zone as the one specified in time_zone.
- aggregation (SUPPORTED_AGGREGATIONS | list[SUPPORTED_AGGREGATIONS], default: 'Raw') – Type of aggregation to be used. Can be one of:
  - Raw
  - Interpolative
  - Total
  - Average
  - TimeAverage
  - Count
  - Stdev
  - MinimumActualtime
  - Minimum
  - MaximumActualtime
  - Maximum
  - Start
  - End
  - Delta
  - Variance
  - Range
  - DurationGood
  - DurationBad
  - PercentGood
  - PercentBad
  - WorstQuality
  A list of aggregations can be passed to get multiple aggregations at once. By default "Raw".
- aggregation_interval (timedelta | None, default: None) – Length of each timestamp. Not necessary if aggregation == "Raw". By default None.
- time_zone (TimeZone, default: 'local') – In which time zone we assume the inputs are. Also used to define in which time zone the output is. There are three options:
  - If "UTC" is used, we assume the time already is in UTC.
  - If "local" is used, the default time zone defined in echo_baze will be used.
  - If an int, it must be between -12 and +12.
  By default "local".
- filter_quality (list[SUPPORTED_QUALITIES] | None, default: None) – If not None, will only return the values that match the desired qualities. Must be a subset of ["Good", "Uncertain", "Bad"]. By default None.
- request_interval (timedelta, default: timedelta(days=2)) – To avoid doing large requests, the data will be requested from Bazefield in batches considering this interval. By default timedelta(days=2).
- **kwargs – Options forwarded when writing the file:
  - sep (str, optional) – Separator to be used in the csv file. By default ";".
  - decimal (str, optional) – Decimal separator to be used in the csv file. By default ".".
  - float_format (str, optional) – Format to be used when saving the float values. By default "%.3f".
  - encoding (str, optional) – Encoding to be used when saving the file. By default "utf-8".
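A minimal usage sketch (file path, object and point names are hypothetical; baze and period are the objects from the earlier sketches on this page; only keyword arguments documented above are used):
from datetime import timedelta
from pathlib import Path

baze.points.values.series.export_file(
    file_path=Path("exports/wtg01_power.csv"),  # the extension decides the format (csv or xlsx)
    points={"WTG01": ["ActivePower"]},
    period=period,  # a DateTimeRange
    aggregation="TimeAverage",
    aggregation_interval=timedelta(minutes=10),
    time_zone="UTC",
    sep=",",      # csv-only option
    decimal=".",  # csv-only option
)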
Source code in echo_baze/point_values_series.py
@validate_call
def export_file(
self,
file_path: Path,
points: dict[str, list[str]],
period: DateTimeRange,
aggregation: SUPPORTED_AGGREGATIONS | list[SUPPORTED_AGGREGATIONS] = "Raw",
aggregation_interval: timedelta | None = None,
time_zone: TimeZone = "local",
filter_quality: list[SUPPORTED_QUALITIES] | None = None,
request_interval: timedelta = timedelta(days=2),
**kwargs,
) -> None:
"""Exports the time series of the given points and objects to a given file. The format of the file should be inferred from the file extension.
The request will be made per object and divided into smaller batches according to request_interval.
Parameters
----------
file_path : Path
File path including extension. If the folder does not exist it will be created.
The file extension will be validated against the allowed ones. Currently only supports "csv" and "xlsx".
points : dict[str, list[str]]
Dict in the format {object_name: [point, ...], ...}
period : DateTimeRange
Period of time to get the data from. It is assumed to be in the same time zone as the one specified in time_zone.
aggregation : SUPPORTED_AGGREGATIONS | list[SUPPORTED_AGGREGATIONS], optional
Type of aggregation to be used. Can be one of:
- Raw
- Interpolative
- Total
- Average
- TimeAverage
- Count
- Stdev
- MinimumActualtime
- Minimum
- MaximumActualtime
- Maximum
- Start
- End
- Delta
- Variance
- Range
- DurationGood
- DurationBad
- PercentGood
- PercentBad
- WorstQuality
A list of aggregations can be passed to get multiple aggregations at once.
By default "Raw"
aggregation_interval : timedelta | None, optional
Length of each timestamp. Not necessary if aggregation=="Raw". By default None
time_zone : TimeZone, optional
In which time zone we assume the inputs are. Also used to define in which time zone the output is. There are three options:
- If "UTC" is used, we assume time already is in UTC.
- If local is used, the default time zone defined in echo_baze will be used.
- If an int, must be between -12 and +12
By default "local"
filter_quality : list[SUPPORTED_QUALITIES] | None, optional
If not None, will only return the values that match the desired qualities.
Must be a subset of ["Good", "Uncertain", "Bad"]
By default None
request_interval : timedelta, optional
To avoid doing large requests, the data will be requested from Bazefield in batches considering this interval. By default timedelta(days=2)
**kwargs
sep : str, optional
Separator to be used in the csv file. By default ";"
decimal : str, optional
Decimal separator to be used in the csv file. By default "."
float_format : str, optional
Format to be used when saving the float values. By default "%.3f"
encoding : str, optional
Encoding to be used when saving the file. By default "utf-8"
"""
t0 = time.perf_counter()
allowed_extensions = ("csv", "xlsx")
# checking extension
if file_path.suffix[1:] not in allowed_extensions:
raise ValueError(
f"file_path extension must be one of {allowed_extensions}, not {file_path.suffix[1:]}",
)
# getting the data
data = self.baze.points.values.series.get(
points=points,
period=period,
aggregation=aggregation,
aggregation_interval=aggregation_interval,
time_zone=time_zone,
output_type="DataFrame",
return_quality=False,
filter_quality=filter_quality,
request_interval=request_interval,
)
# removing name of index
data.index.name = None
# creating folder if it does not exist
file_path.parent.mkdir(parents=True, exist_ok=True)
# saving the data
match file_path.suffix[1:]:
case "csv":
data.to_csv(
file_path,
index=True,
sep=kwargs.get("sep", ";"),
decimal=kwargs.get("decimal", "."),
float_format=kwargs.get("float_format", "%.3f"),
encoding=kwargs.get("encoding", "utf-8"),
)
case "xlsx":
from pandas import ExcelWriter # noqa: PLC0415
# creating excel writer
writer = ExcelWriter(file_path, engine="xlsxwriter")
# writing the data
data.to_excel(writer, sheet_name="data", index=True)
# formatting the sheet
# index will be set to YYYY-mm-dd HH:MM:SS and width 20
# other columns will be set to %.3f and width 15
workbook = writer.book
worksheet = writer.sheets["data"]
# Formatting the columns
date_format = workbook.add_format(
{"num_format": "aaaa-mm-dd hh:mm:ss", "border": 1, "bold": True},
)
num_format = workbook.add_format(
{"num_format": "0.000", "border": 1, "bold": False},
)
worksheet.set_column(0, 0, 20, date_format)
worksheet.set_column(1, data.shape[1], 15, num_format)
# Freeze panes
worksheet.freeze_panes(2, 1)
# hide grid lines
worksheet.hide_gridlines(2)
# Saving the workbook
writer.close()
case _:
raise ValueError(
f"file_path extension must be one of {allowed_extensions}, not {file_path.suffix[1:]}",
)
logger.debug(
f"Exported data to {file_path} in {time.perf_counter() - t0:.3f} s",
)
get(points, period, aggregation='Raw', aggregation_interval=None, time_zone='local', output_type='DataFrame', return_quality=False, filter_quality=None, request_interval=timedelta(days=2), objects_per_request=30, reindex=None, round_timestamps=None, value_dtype=(pl.Float32, 'float32[pyarrow]'))
Gets the time series of the given points and objects.
The request will be made per group of objects and divided into smaller batches according to request_interval. For faster response, avoid calling this function multiple times for different objects/points: it has some overhead to check all the inputs, so a single call will always be faster.
Parameters:
- points (dict[str, list[str]]) – Dict in the format {object_name: [point, ...], ...}
- period (DateTimeRange) – Period of time to get the data from. It is assumed to be in the same time zone as the one specified in time_zone.
- aggregation (SUPPORTED_AGGREGATIONS | list[SUPPORTED_AGGREGATIONS], default: 'Raw') – Type of aggregation to be used. Can be one of:
  - Raw
  - Interpolative
  - Total
  - Average
  - TimeAverage
  - Count
  - Stdev
  - MinimumActualtime
  - Minimum
  - MaximumActualtime
  - Maximum
  - Start
  - End
  - Delta
  - Variance
  - Range
  - DurationGood
  - DurationBad
  - PercentGood
  - PercentBad
  - WorstQuality
  A list of aggregations can be passed to get multiple aggregations at once. By default "Raw".
- aggregation_interval (timedelta | None, default: None) – Length of each timestamp. Not necessary if aggregation == "Raw". By default None.
- time_zone (TimeZone, default: 'local') – In which time zone we assume the inputs are. Also used to define in which time zone the output is. There are three options:
  - If "UTC" is used, we assume the time already is in UTC.
  - If "local" is used, the default time zone defined in echo_baze will be used.
  - If an int, it must be between -12 and +12.
  By default "local".
- output_type (Literal['dict', 'DataFrame'], default: 'DataFrame') – Output type of the data. Can be one of ["dict", "DataFrame"]. By default "DataFrame".
- return_quality (bool, default: False) – If set to True, the quality of the value will also be returned. By default False.
- filter_quality (list[SUPPORTED_QUALITIES] | None, default: None) – If not None, will only return the values that match the desired qualities. Must be a subset of ["Good", "Uncertain", "Bad"]. By default None.
- request_interval (timedelta, default: timedelta(days=2)) – To avoid doing large requests, the data will be requested from Bazefield in batches considering this interval. By default timedelta(days=2).
- objects_per_request (int, default: 30) – Number of objects to be requested in each batch. A value higher than one is only valid if all the points are equal for all requested objects; if they differ, the value will be forced to 1. By default 30.
- reindex (str | None, default: None) – String containing the timedelta that should be considered to reindex the DataFrame (ex: "10min", "5min", etc.). This does not resample the DataFrame! It is only used to get all the timestamps if some are missing from the data (the created timestamps will be set to NaN). If set to "infer", the timedelta will be inferred from the data acquired from the database. If set to None, no reindex will be done. Only applicable if output_type is "DataFrame". By default None.
- round_timestamps (RoundTimeStampsDict | None, default: None) – Dictionary used to round timestamps to the nearest expected timestamp. Contains the following keys:
  - freq: timedelta, the frequency to round the timestamps to.
  - tolerance: timedelta, the tolerance to be used when rounding timestamps.
  If set to None, no rounding will be done. Only applicable if output_type is "DataFrame". By default None.
- value_dtype (tuple[Any, Any], default: (Float32, 'float32[pyarrow]')) – Tuple with the dtype to be used for the values. The first element is the dtype to be used in the internal polars DataFrame (like pl.Float64) and the second is the dtype to be used in the final pandas DataFrame (like "float64[pyarrow]").
Returns:
- dict[str, dict[str, list[dict[str, datetime | float | int]]]] – In case output_type == "dict" and aggregation is a string (only one aggregation), returns a dict with the following format: {object_name: {point: [{t: time, v: value, q: quality}, ...], ...}, ...}
- dict[str, dict[str, dict[str, list[dict[str, datetime | float | int]]]]] – In case output_type == "dict" and aggregation is a list of aggregations, returns a dict with the following format: {object_name: {point: {aggregation: [{t: time, v: value, q: quality}, ...], ...}, ...}, ...}
- DataFrame – In case output_type == "DataFrame", returns a DataFrame with time in the index. The columns will be a MultiIndex with levels = [object_name, point, aggregation, quantity], where quantity = [value, quality]. In case aggregation is a string (only one aggregation), the aggregation level will not exist in the columns. In case return_quality is False, the quality level will not exist in the columns.
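A minimal usage sketch (object and point names are hypothetical; baze and period are the objects from the earlier sketches on this page). With a single aggregation string and return_quality=False, the returned DataFrame has the timestamps in the index and an [object_name, point] MultiIndex in the columns:
from datetime import timedelta

df = baze.points.values.series.get(
    points={
        "WTG01": ["ActivePower", "WindSpeed"],
        "WTG02": ["ActivePower", "WindSpeed"],  # same points for both objects, so they can share a batch
    },
    period=period,  # a DateTimeRange
    aggregation="TimeAverage",
    aggregation_interval=timedelta(minutes=10),
    time_zone="UTC",
    output_type="DataFrame",
    reindex="10min",  # only fills in missing timestamps with NaN, it does not resample
)
wtg01_power = df[("WTG01", "ActivePower")]  # one column of the MultiIndex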
Source code in echo_baze/point_values_series.py
@validate_call
def get(
self,
points: dict[str, list[str]],
period: DateTimeRange,
aggregation: SUPPORTED_AGGREGATIONS | list[SUPPORTED_AGGREGATIONS] = "Raw",
aggregation_interval: timedelta | None = None,
time_zone: TimeZone = "local",
output_type: Literal["dict", "DataFrame"] = "DataFrame",
return_quality: bool = False,
filter_quality: list[SUPPORTED_QUALITIES] | None = None,
request_interval: timedelta = timedelta(days=2),
objects_per_request: int = 30,
reindex: str | None = None,
round_timestamps: RoundTimeStampsDict | None = None,
value_dtype: tuple[Any, Any] = (pl.Float32, "float32[pyarrow]"),
) -> (
dict[str, dict[str, list[dict[str, datetime | float | int]]]]
| dict[str, dict[str, dict[str, list[dict[str, datetime | float | int]]]]]
| pd.DataFrame
):
"""Gets the time series of the given points and objects.
The request will be made per group of objects and divided into smaller batches according to request_interval. For faster response, avoid calling this function multiple times for different objects/points: it has some overhead to check all the inputs, so a single call will always be faster.
Parameters
----------
points : dict[str, list[str]]
Dict in the format {object_name: [point, ...], ...}
period : DateTimeRange
Period of time to get the data from. It is assumed to be in the same time zone as the one specified in time_zone.
aggregation : SUPPORTED_AGGREGATIONS | list[SUPPORTED_AGGREGATIONS], optional
Type of aggregation to be used. Can be one of:
- Raw
- Interpolative
- Total
- Average
- TimeAverage
- Count
- Stdev
- MinimumActualtime
- Minimum
- MaximumActualtime
- Maximum
- Start
- End
- Delta
- Variance
- Range
- DurationGood
- DurationBad
- PercentGood
- PercentBad
- WorstQuality
A list of aggregations can be passed to get multiple aggregations at once.
By default "Raw"
aggregation_interval : timedelta | None, optional
Length of each timestamp. Not necessary if aggregation=="Raw". By default None
time_zone : TimeZone, optional
In which time zone we assume the inputs are. Also used to define in which time zone the output is. There are three options:
- If "UTC" is used, we assume time already is in UTC.
- If local is used, the default time zone defined in echo_baze will be used.
- If an int, must be between -12 and +12
By default "local"
output_type : Literal["dict", "DataFrame"], optional
Output type of the data. Can be one of ["dict", "DataFrame"]
By default "DataFrame"
return_quality : bool, optional
If set to True, the quality of the value will also be returned. By default False
filter_quality : list[SUPPORTED_QUALITIES] | None, optional
If not None, will only return the values that match the desired qualities.
Must be a subset of ["Good", "Uncertain", "Bad"]
By default None
request_interval : timedelta, optional
To avoid doing large requests, the data will be requested from Bazefield in batches considering this interval. By default timedelta(days=2)
objects_per_request : int, optional
Number of objects to be requested in each batch.
A value higher than one will only be valid if all the points are equal for all requested objects. If they are different the value will be forced to 1.
By default, 30
reindex : str | None, optional
String containing the timedelta that should be considered to reindex the DataFrame (ex: "10min", "5min", etc.).
This does not resample the DataFrame! It is only used to get all the timestamps if some are missing from the data (the created timestamps will be set to NaN).
If set to "infer", the timedelta will be inferred from the data acquired from the database.
If set to None, no reindex will be done.
Only applicable if output_type is "DataFrame".
By default None
round_timestamps : RoundTimeStampsDict | None, optional
Dictionary used to round timestamps to the nearest expected timestamp. Contains the following keys:
- freq: timedelta, the frequency to round the timestamps to.
- tolerance: timedelta, the tolerance to be used when rounding timestamps
If set to None, no rounding will be done. Only applicable if output_type is "DataFrame".
By default None
value_dtype : tuple[Any, Any], optional
Tuple with the dtype to be used for the values. The first element is the dtype to be used in the internal polars DataFrame (like pl.Float64) and the second is the dtype to be used in the final pandas DataFrame (like "float64[pyarrow]").
Returns
-------
dict[str, dict[str, list[dict[str, datetime | float | int]]]]
In case output_type == "dict" and aggregation is a string (only one aggregation), it will return a dict with the following format: {object_name: {point: [{t: time, v: value, q: quality}, ...], ...}, ...}
dict[str, dict[str, dict[str, list[dict[str, datetime | float | int]]]]]
In case output_type == "dict" and aggregation is a list of aggregations, it will return a dict with the following format: {object_name: {point: {aggregation: [{t: time, v: value, q: quality}, ...], ...}, ...}, ...}
DataFrame
In case output_type == "DataFrame" it will return a DataFrame with time in the index. The columns will be a MultiIndex with levels = [object_name, point, aggregation, quantity], where quantity = [value, quality].
In case aggregation is a string (only one aggregation), the aggregation level will not exist in the columns.
In case return_quality is False, the quality level will not exist in the columns.
"""
t0 = time.perf_counter()
# checking input
if isinstance(aggregation, list) and len(aggregation) != len(set(aggregation)):
raise ValueError("aggregation must have unique values")
if (
(isinstance(aggregation, str) and aggregation != "Raw")
or (isinstance(aggregation, list) and any(agg != "Raw" for agg in aggregation))
) and aggregation_interval is None:
raise ValueError("aggregation_interval must be set if aggregation != 'Raw'")
if request_interval <= timedelta(0):
raise ValueError(
f"request_interval must be greater than 0, not {request_interval}",
)
if not isinstance(objects_per_request, int) or objects_per_request <= 0:
raise TypeError(
f"objects_per_request must be an int greater than 0, not {objects_per_request}",
)
if reindex and output_type != "DataFrame":
raise ValueError(
"reindex can only be used if output_type is 'DataFrame'",
)
if round_timestamps and output_type != "DataFrame":
raise ValueError(
"round_timestamps can only be used if output_type is 'DataFrame'",
)
if len(value_dtype) != 2:
raise ValueError(
f"value_dtype must be a tuple with 2 elements, not {len(value_dtype)}",
)
# checking if all points are equal for all objects. If that is not the case reduce objects per request to 1
if objects_per_request > 1:
ref_points = points[next(iter(points.keys()))]
for obj_points in points.values():
if obj_points != ref_points:
logger.debug("Points are not equal for all objects. Reducing objects_per_request to 1.")
objects_per_request = 1
break
# getting initial info
object_name_to_id, object_id_to_name, object_models, object_model_points = self.baze.points._get_points_info(points) # noqa
all_points = set()
for value in points.values():
all_points = all_points.union(set(value))
all_points = list(all_points)
# dict that gets pretty names for the aggregations
aggregation_pretty_names = {k.upper(): k for k in AGGREGATE_IDS} | {
"NOAGGREGATE": "Raw",
}
# list with aggregations to be requested
aggregation_groups = [aggregation] if isinstance(aggregation, str) else aggregation
# separating raw from the rest
aggregation_groups = [agg for agg in aggregation_groups if agg != "Raw"]
if aggregation_groups:
aggregation_groups = [aggregation_groups]
if "Raw" in aggregation:
# if Raw is in the aggregations, we need to request it separately
aggregation_groups.append(["Raw"])
# breaking the period into smaller periods
# we are using normalize to make sure when getting aggregated data, the period starts at 00:00
subperiods = period.split_multiple(separator=request_interval, normalize=True)
logger.debug(f"Initial info gathered in {time.perf_counter() - t0:.3f} s")
# dividing the objects into groups to avoid large requests
request_obj_groups = [
list(object_name_to_id.keys())[i : i + objects_per_request] for i in range(0, len(object_name_to_id), objects_per_request)
]
# list of DataFrames to be concatenated and later processed
result_df_list = []
# columns that can be ignored
drop_cols = ["t_local"]
if not return_quality and not filter_quality:
drop_cols.append("q")
if isinstance(aggregation, str):
drop_cols.append("aggregation")
schema = {
"t": pl.Int64,
"v": value_dtype[0],
"object_name": pl.Enum(list(object_name_to_id.keys())),
"point": pl.Enum(all_points),
}
if "q" not in drop_cols:
schema["q"] = pl.Int64
if "aggregation" not in drop_cols:
schema["aggregation"] = pl.Enum(aggregation if isinstance(aggregation, list) else [aggregation])
# getting the values for each object
for objs in request_obj_groups:
# getting the points for the objects
object_points = points[objs[0]]
# checking if all points are available for the model
for object_name in objs:
not_found_points = set(object_points) - set(
object_model_points[object_models[object_name]],
)
if len(not_found_points) > 0:
raise ValueError(
f"The following points were not found for object '{object_name}': {not_found_points}",
)
# iterating over the aggregation groups
for aggregation_group in aggregation_groups:
# iterating subperiods
for subperiod in subperiods:
# base payload, specific keys for aggregations will be added later
payload = {
"ObjectIds": [object_name_to_id[object_name] for object_name in objs],
"Points": object_points,
"From": timestamp_from_datetime(
dt=subperiod.start,
time_zone=time_zone,
unit="milliseconds",
),
# adding 1 to the end to make sure we get all data (Bazefield seems to be exclusive on the end timestamp)
"To": timestamp_from_datetime(
dt=subperiod.end,
time_zone=time_zone,
unit="milliseconds",
)
+ 1,
"Take": 0, # this makes sure we get all data
}
# getting Raw data
if aggregation_group == ["Raw"]:
endpoint = "objects/timeseries"
else:
endpoint = "objects/timeseries/aggregated"
payload["Aggregates"] = [AGGREGATE_IDS[agg] for agg in aggregation_group]
payload["Interval"] = int(aggregation_interval.total_seconds())
# getting the data
t1 = time.perf_counter()
result = self.baze.conn.get(endpoint, json=payload)
self._handle_http_errors(result)
# converting to dict
result: dict[str, dict[str, dict[str, dict[str, Any]]]] = result.json()["objects"]
t2 = time.perf_counter()
# iterating results to convert to DataFrames for later processing
# first objects
for obj_id in list(result.keys()):
if "points" not in result[obj_id]:
logger.warning(f"No data found for object '{object_id_to_name[obj_id]}'")
# free memory
del result[obj_id]
continue
# then points
for point_name in list(result[obj_id]["points"].keys()):
# then Aggregates
while len(result[obj_id]["points"][point_name]) > 0:
# pop first item in list
agg_vals = result[obj_id]["points"][point_name].pop(0)
# converting time series to DataFrame
df = pl.from_dicts(agg_vals["timeSeries"], schema=schema)
# adding object_name, point_name and aggregation to the DataFrame
df = df.with_columns(
object_name=pl.lit(object_id_to_name[obj_id]).cast(schema["object_name"]),
point=pl.lit(point_name).cast(schema["point"]),
)
if "aggregation" not in drop_cols:
df = df.with_columns(
aggregation=pl.lit(aggregation_pretty_names[agg_vals["aggregate"]]).cast(schema["aggregation"]),
)
# adding to the list
result_df_list.append(df)
# free memory
del agg_vals
# free memory
del result[obj_id]["points"][point_name]
# free memory
del result[obj_id]
del result
logger.debug(
f"Request response for {objs} {aggregation_group} {object_points} {subperiod.start:%Y-%m-%d %H:%M:%S} to {subperiod.end:%Y-%m-%d %H:%M:%S} in {t2 - t1:.3f} s. Processed request response in {time.perf_counter() - t2:.3f} s",
)
# concatenating the list of DataFrames
if len(result_df_list) > 0: # noqa: SIM108
# concatenating the DataFrames
df = pl.concat(result_df_list, how="vertical")
else:
# creating empty DataFrame with the correct columns
df = pl.DataFrame(
schema=schema,
)
del result_df_list
# dropping duplicates on t, object_name, point, aggregation
subset = ["t", "object_name", "point", "aggregation"]
subset = [col for col in subset if col in df.columns]
df = df.unique(subset=subset)
# converting t to datetime
time_zone_modifier = 0
if time_zone != "UTC":
time_zone_modifier = DEFAULT_TIME_ZONE if time_zone == "local" else time_zone
df = df.with_columns(
t=pl.col("t").cast(pl.Datetime(time_unit="ms")) + timedelta(hours=time_zone_modifier),
)
# adjusting quality column if requested
if return_quality or filter_quality:
# adjusting quality column using cast_quality method
q_dtype = pl.Enum(list(TAG_QUALITY_CODES.keys()))
# TODO we might want to not use cast_quality in favor of a better implementation using polars
df = df.with_columns(
q=pl.col("q").map_elements(lambda x: cast_quality(x, "str"), skip_nulls=True, return_dtype=pl.String).cast(q_dtype),
)
# filter_quality
if filter_quality:
df = df.filter(pl.col("q").is_in(filter_quality))
# dropping quality column if not requested
if not return_quality:
df = df.drop("q", strict=False)
# converting to pandas DataFrame
# converting df to correct types
dtypes = {
"t": "datetime64[ms]",
"v": value_dtype[1],
"q": "category",
"object_name": "category",
"point": "category",
"aggregation": "category",
}
dtypes = {k: v for k, v in dtypes.items() if k in df.columns}
# sorting the DataFrame by t, object_name, point and aggregation
sort_cols = ["t", "object_name", "point", "aggregation"]
sort_cols = [col for col in sort_cols if col in df.columns]
df = df.sort(by=sort_cols)
df = df.to_pandas(use_pyarrow_extension_array=True)
df = df.astype(dtypes)
# returning on the desired format
match output_type:
case "dict":
t1 = time.perf_counter()
index_cols = ["object_name", "point"]
if not isinstance(aggregation, str):
index_cols.append("aggregation")
df = df.set_index(index_cols)
if df.empty:
logger.warning(f"No data found for {points} and period {period}")
results = {}
else:
# grouping by object_name, point and aggregation to create a list of dicts
result = df.groupby(level=index_cols, observed=True).apply(lambda x: x.to_dict(orient="records")).to_dict()
# unpacking dict keys from tuples to nested dicts
results = {}
for key, value in result.items():
obj_name, point_name, *agg = key
if len(agg) == 0:
results.setdefault(obj_name, {})[point_name] = value
else:
results.setdefault(obj_name, {}).setdefault(point_name, {})[agg[0]] = value
# making sure we have dict keys for all wanted objects, points, aggregations and quantities as if there is no data they will not be created automatically
for obj, obj_points in points.items():
if obj not in results:
results[obj] = {}
for obj_point in obj_points:
if obj_point not in results[obj]:
results[obj][obj_point] = {} if not isinstance(aggregation, str) else []
if not isinstance(aggregation, str):
for agg in aggregation:
results[obj][obj_point][agg] = []
logger.debug(
f"Converted to dict in {time.perf_counter() - t1:.3f} s",
)
del df
return results
case "DataFrame":
t1 = time.perf_counter()
# converting to DataFrame
# creating quantity column if quality is requested
# we need to melt the DataFrame to have the quantity as a column
if return_quality:
df = df.melt(
id_vars=df.columns.difference(["v", "q"]).tolist(),
value_vars=["v", "q"],
var_name="quantity",
value_name="value",
)
# replacing q with quality and v with value
df["quantity"] = df["quantity"].replace({"q": "quality", "v": "value"})
df = df.rename(columns={"value": "v"})
index_cols = ["t", "object_name", "point"]
if not isinstance(aggregation, str):
index_cols.append("aggregation")
if return_quality:
index_cols.append("quantity")
df = df.set_index(index_cols).unstack(
level=index_cols[1:],
)
df = df.droplevel(0, axis=1) # dropping the first level of columns (v) as it is not needed
df.index.name = "time"
# making sure we have columns for all wanted objects, points, aggregations and quantities as if there is no data they will not be created automatically
concat_list = []
for obj, obj_points in points.items():
level_values = [[obj], obj_points]
level_names = ["object_name", "point"]
if isinstance(aggregation, list):
level_values.append(aggregation)
level_names.append("aggregation")
if return_quality:
level_values.append(["quality", "value"])
level_names.append("quantity")
complete_columns = pd.MultiIndex.from_product(level_values, names=level_names)
add_columns = complete_columns.difference(df.columns)
if len(add_columns) > 0:
concat_df = pd.DataFrame(columns=add_columns)
concat_list.append(concat_df)
df = pd.concat([df, *concat_list], axis=1, copy=False)
# changing dtype of columns to string[pyarrow]
for i in range(len(df.columns.names)):
df.columns = df.columns.set_levels(
df.columns.levels[i].astype("string[pyarrow]"),
level=i,
)
# sorting the columns and index
df = df.sort_index(axis=1)
df = df.sort_index(axis=0)
results = df
logger.debug(
f"Converted to DataFrame in {time.perf_counter() - t1:.3f} s",
)
case _:
raise ValueError(
f"output_type must be one of ['dict', 'DataFrame'], got '{output_type}'",
)
# forcing the index to be a DatetimeIndex
results.index = results.index.astype("datetime64[s]")
# rounding timestamps if requested
if round_timestamps is not None:
results = round_ts(df=results, **round_timestamps)
# reindex if requested
if reindex is not None:
final_reindex = copy(reindex)
# reindexing DataFrame to have all timestamps if desired
if final_reindex is not None:
# inferring frequency if reindex is "infer"
if final_reindex == "infer" and len(results) > 3:
final_reindex = pd.infer_freq(results.index)
elif final_reindex == "infer":
logger.debug("Cannot infer frequency from data because it has less than 3 timestamps. Using '10min' as default.")
final_reindex = "10min"
# if failed (returned None), lets try other function
if final_reindex is None:
final_reindex = get_index_freq(results)
# if still failed, lets raise an error
if final_reindex is None:
raise RuntimeError("Cannot infer frequency from data. Please set 'reindex' argument manually.")
new_index = pd.date_range(
start=period["start"].replace(minute=0, second=0),
end=period["end"].replace(minute=0, second=0) + timedelta(hours=1),
freq=final_reindex,
)
results = results.reindex(new_index, method=None)
# restricting to desired period only
results = results[(results.index >= period["start"]) & (results.index <= period["end"])].copy()
logger.debug(
f"Got time series {aggregation} data between {period} for {points} in {output_type} format in {time.perf_counter() - t0:.3f} s",
)
return results
insert(data, time_zone='local', delete_before=False, request_interval=timedelta(days=2))
Inserts time series data into Bazefield.
The request will be made per object and divided into smaller batches according to request_interval.
Parameters:
- data (pd.DataFrame | pl.DataFrame) – DataFrame with the data to be uploaded.
  In case it is a Pandas DataFrame, it must have the following structure:
  - Index with name "time" or "timestamp" and dtype timestamp[pyarrow] or datetime64.
  - Columns as a MultiIndex with levels "object_name" and "point".
  - Values must be convertible to float.
  In case it is a Polars DataFrame, it must have the following structure:
  - It must be in the format: columns=["timestamp", "object_name@point_name", ...].
  - The "timestamp" column must be of type Datetime and the values must be convertible to float.
- time_zone (Literal['UTC', 'local'] | int, default: 'local') – In which time zone we assume the inputs are. Also used to define in which time zone the output is. There are three options:
  - If "UTC" is used, we assume the time already is in UTC.
  - If "local" is used, the default time zone defined in echo_baze will be used.
  - If an int, it must be between -12 and +12.
  By default "local".
- delete_before (bool, default: False) – If set to True, will delete all the data in the period present in data before uploading. This is useful to make sure that no past data is kept when it should now be None. By default False.
- request_interval (timedelta, default: timedelta(days=2)) – To avoid doing large requests, the requests will be made to Bazefield in batches considering this interval. By default timedelta(days=2).
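A minimal usage sketch with a Pandas DataFrame (object and point names are hypothetical; baze is the configured Baze instance from the earlier sketches; the index name and MultiIndex column levels follow the structure described above):
import pandas as pd

# Index named "timestamp" and columns as a MultiIndex with levels ["object_name", "point"].
index = pd.date_range("2024-01-01 00:00", periods=6, freq="10min", name="timestamp")
columns = pd.MultiIndex.from_product(
    [["WTG01"], ["ActivePower", "WindSpeed"]],
    names=["object_name", "point"],
)
data = pd.DataFrame([[1500.0, 9.8]] * 6, index=index, columns=columns)  # values convertible to float

baze.points.values.series.insert(
    data=data,
    time_zone="UTC",
    delete_before=True,  # wipe the same period first so no stale values are kept
)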
Source code in echo_baze/point_values_series.py
@validate_call
def insert(
self,
data: pd.DataFrame | pl.DataFrame,
time_zone: TimeZone = "local",
delete_before: bool = False,
request_interval: timedelta = timedelta(days=2),
) -> None:
"""Inserts time series data into Bazefield.
The request will be made per object and divided into smaller batches according to request_interval.
Parameters
----------
data : pd.DataFrame | pl.DataFrame
DataFrame with the data to be uploaded.
In case it is a **Pandas** DataFrame, it must have the following structure:
- Index with name "time" or "timestamp" and dtype timestamp[pyarrow] or datetime64.
- Columns as a MultiIndex with levels "object_name" and "point".
- Values must be convertible to float.
In case it is a **Polars** DataFrame, it must have the following structure:
- It must be in the format: columns=["timestamp", "object_name@point_name", ...].
- The "timestamp" column must be of type Datetime and the values must be convertible to float.
time_zone : Literal["UTC", "local"] | int, optional
In which time zone we assume the inputs are. Also used to define in which time zone the output is. There are three options:
- If "UTC" is used, we assume time already is in UTC.
- If local is used, the default time zone defined in echo_baze will be used.
- If an int, must be between -12 and +12
By default "local"
delete_before : bool, optional
If set to True, will delete all the data in the period present in data before uploading. This is useful to make sure that no past data is kept when it should now be None. By default False
request_interval : timedelta, optional
To avoid doing large requests, the requests will be made to Bazefield in batches considering this interval. By default timedelta(days=2)
"""
t0 = time.perf_counter()
# in case it is a Pandas DataFrame:
if isinstance(data, pd.DataFrame):
data = data.copy()
# checking input
if data.index.name not in ("time", "timestamp") or (
"datetime" not in str(data.index.dtype) and "timestamp" not in str(data.index.dtype)
):
raise TypeError(
"data index must be a DatetimeIndex with name 'time' or 'timestamp' and dtype timestamp[pyarrow] or datetime64",
)
for col, dtype in data.dtypes.to_dict().items():
# trying to convert to double[pyarrow]
try:
data[col] = data[col].astype("float64[pyarrow]")
except Exception as e:
raise TypeError(
f"data column '{col}' must be convertible to float, got {dtype}",
) from e
if not isinstance(data.columns, pd.MultiIndex) or data.columns.names != [
"object_name",
"point",
]:
raise TypeError(
"data must have columns as a Multindex with names 'object_name' and 'point'",
)
# joining the two column levels into one
data.columns = data.columns.map(lambda x: f"{x[0]}@{x[1]}")
# forcing the index to be named timestamp and them resetting it
data.index.name = "timestamp"
data = data.reset_index(drop=False)
# converting the DataFrame to Polars
data = pl.from_pandas(data)
# polars checks
data = data.clone()
# checking if column timestamp exists
if "timestamp" not in data.columns:
raise ValueError("data must have a column 'timestamp'")
# checking if timestamp column is of type Datetime
if not isinstance(data["timestamp"].dtype, pl.Datetime):
raise ValueError("data 'timestamp' column must be of type Datetime")
# getting the list of all other columns and checking if they are in the format "object_name@point_name"
# a dict like {object: [point1, point2, ...], ...} will be created
points = {}
for col in data.columns:
if col == "timestamp":
continue
# checking if the column is in the format "object_name@point_name"
if "@" not in col:
raise ValueError(f"Column {col} must be in the format 'object_name@point_name'")
# splitting the column name into object and point
obj, feat = col.split("@")
if not obj or not feat:
raise ValueError(f"Column {col} must be in the format 'object_name@point_name' with non-empty object and point names")
# adding the point to the object in the dict
if obj not in points:
points[obj] = []
points[obj].append(feat)
# making sure the data type is Float64, if not convert it
if not isinstance(data[col].dtype, pl.Float64):
try:
data = data.with_columns(pl.col(col).cast(pl.Float64))
except Exception as e:
raise ValueError(f"Column {col} must be convertible to Float64") from e
# skipping if data is empty
if len(data) == 0:
return
# getting initial info
object_name_to_id, _, object_models, object_model_points = self.baze.points._get_points_info(points) # noqa
# getting total period
period = DateTimeRange(data["timestamp"].min(), data["timestamp"].max())
# breaking the period into smaller periods
# normalize is not needed here: no aggregated data is requested, so the subperiods do not need to start at 00:00
subperiods = period.split_multiple(separator=request_interval, normalize=False)
# deleting data if requested
if delete_before:
self.baze.points.values.series.delete(
points=points,
period=period,
time_zone=time_zone,
request_interval=request_interval,
)
# iterating each object
for object_name, object_points in points.items():
upload_points = object_points.copy()
# checking if all points are available for the model
not_found_points = set(upload_points) - set(
object_model_points[object_models[object_name]],
)
if len(not_found_points) > 0:
logger.error(f"The following points were not found and will be skipped '{object_name}': {not_found_points}")
# removing the points that were not found
upload_points = [point for point in upload_points if point not in not_found_points]
if not upload_points:
continue
# getting a slice of the DataFrame for the object
obj_df = data.select(
pl.col("timestamp"),
*[pl.col(f"{object_name}@{point}") for point in object_points],
)
# converting to a lazy frame for faster processing
obj_df = obj_df.lazy()
# unpivoting the DataFrame to have columns timestamp and point
obj_df = (
obj_df.unpivot(
index=["timestamp"],
variable_name="point",
value_name="value",
)
# Split the 'point' column into a struct with named fields
.with_columns(
pl.col("point")
.str.split_exact("@", 1) # Use split_exact for fixed number of splits
.struct.rename_fields(["object_name", "point_name"])
.alias("point_struct"),
)
# Unnest the struct columns into the main DataFrame
.unnest("point_struct")
# Cast to Enum (less memory usage) and drop the original point column
.with_columns(
pl.col("point_name").cast(pl.Enum(categories=object_points)).alias("point_name"),
)
# drop the original 'point' column
.drop(["point", "object_name"])
# drop all NA rows
.drop_nulls()
# dropping duplicate rows
.unique(subset=["timestamp", "point_name"])
# sorting by timestamp, point_name
.sort(["timestamp", "point_name"])
)
timezone_offset = 0 if time_zone == "UTC" else DEFAULT_TIME_ZONE if time_zone == "local" else time_zone
# converting timestamp to milliseconds since epoch and then subtracting the time zone offset in milliseconds
obj_df = obj_df.with_columns(
# first converting timestamp to milliseconds since epoch
pl.col("timestamp").dt.epoch(time_unit="ms").alias(name="epoch"),
)
# then subtracting the time zone offset in milliseconds
if timezone_offset != 0:
obj_df = obj_df.with_columns(
pl.col("epoch") - (timezone_offset * 3600 * 1000), # converting hours to milliseconds
)
# converting value column to a struct with t, v, q, where:
# - t is the timestamp in milliseconds since epoch
# - v is the value as float
# - q is the quality as 192 (good quality)
obj_df = obj_df.with_columns(
pl.struct(
[
pl.col("epoch").cast(pl.Int64).alias("t"), # timestamp in milliseconds since epoch
pl.col("value").cast(pl.Float64).alias("v"), # value as float
pl.lit(192).cast(pl.Int16).alias("q"), # quality as 192 (good quality)
],
).alias("point_values"),
)
# iterating subperiods
for subperiod in subperiods:
# getting only wanted part of the DataFrame
sub_obj_df = obj_df.filter(
(pl.col("timestamp") >= subperiod.start) & (pl.col("timestamp") <= subperiod.end),
)
# collecting results for point_values and point_name as they are the only needed columns
sub_obj_df = sub_obj_df.select(
pl.col("point_name"),
pl.col("point_values"),
).collect()
# if the DataFrame is empty, skip it
if sub_obj_df.is_empty():
continue
# creating points dict with the structure {point_name: [{"t": timestamp, "v": value, "q": quality}, ...], ...}
points_dict = {
point_name: sub_obj_df.filter(pl.col("point_name") == point_name)["point_values"].to_list()
for point_name in sub_obj_df["point_name"].unique().to_list()
}
# getting dict to upload
payload = {
"data": [
{
"objectId": object_name_to_id[object_name],
"points": points_dict,
},
],
}
# popping NA values in the dict
for point in payload["data"][0]["points"]:
payload["data"][0]["points"][point] = [val for val in payload["data"][0]["points"][point] if not pd.isna(val["v"])]
# removing empty points
payload["data"][0]["points"] = {point: values for point, values in payload["data"][0]["points"].items() if len(values) > 0}
# skipping if no data to upload
if len(payload["data"][0]["points"]) == 0:
continue
endpoint = "objects/points/insert"
# inserting the data (trying 3 times just to be sure)
total_points_sent = sum(len(values) for values in payload["data"][0]["points"].values())
success = False
try_number = 1
while not success and try_number <= 3:
try:
result = self.baze.conn.post(endpoint, json=payload)
self._handle_http_errors(result)
result_json = result.json()
# making sure number of inserted and published points are the same as the ones we sent
if result_json["inserted"] != total_points_sent or result_json["published"] != total_points_sent:
raise ValueError("Bazefield replied saying we inserted/published less points than the ones sent")
success = True
except Exception as e: # noqa
logger.exception(
f"Tentative {try_number} - Failed to insert data for {object_name} in {subperiod.start:%Y-%m-%d %H:%M:%S} to {subperiod.end:%Y-%m-%d %H:%M:%S}",
)
try_number += 1
logger.debug(f"Inserted data in {time.perf_counter() - t0:.3f} s")