from __future__ import annotations
import json
from collections.abc import Iterator
from typing import TYPE_CHECKING, Any, Literal, Union
import numpy as np
import numpy.typing as npt
from py3dtiles.exceptions import Invalid3dtilesError
from py3dtiles.typing import BatchTableHeaderDataType
from .constants import COMPONENT_TYPE_NUMPY_MAPPING, DTYPE_TO_COMPONENT_TYPE_MAPPING
if TYPE_CHECKING:
from py3dtiles.tileset.content import TileContentHeader
TYPE_LENGTH_MAPPING = {
"SCALAR": 1,
"VEC2": 2,
"VEC3": 3,
"VEC4": 4,
}
ComponentLiteralType = Literal[
"BYTE",
"UNSIGNED_BYTE",
"SHORT",
"UNSIGNED_SHORT",
"INT",
"UNSIGNED_INT",
"FLOAT",
"DOUBLE",
]
ComponentNumpyType = Union[
np.int8, np.uint8, np.int16, np.uint16, np.int32, np.uint32, np.float32, np.float64
]
PropertyLiteralType = Literal["SCALAR", "VEC2", "VEC3", "VEC4"]
[docs]
class BatchTableBody:
def __init__(self, data: list[npt.NDArray[ComponentNumpyType]] | None = None):
if data is not None:
self.data = data
else:
self.data = []
[docs]
def to_array(self) -> npt.NDArray[np.uint8]:
if not self.data:
return np.empty((0,), dtype=np.uint8)
if self.nbytes % 8 != 0:
padding_str = " " * (8 - self.nbytes % 8)
padding = np.frombuffer(padding_str.encode("utf-8"), dtype=np.uint8)
self.data.append(padding)
return np.concatenate(
[data.view(np.uint8) for data in self.data], dtype=np.uint8
)
@property
def nbytes(self) -> int:
return sum([data.nbytes for data in self.data])
[docs]
class BatchTable:
"""
Only the JSON header has been implemented for now. According to the batch
table documentation, the binary body is useful for storing long arrays of
data (better performances)
"""
def __init__(self) -> None:
self.header = BatchTableHeader()
self.body = BatchTableBody()
[docs]
def add_property_as_json(self, property_name: str, array: list[Any]) -> None:
self.header.data[property_name] = array
[docs]
def add_property_as_binary(
self,
property_name: str,
array: npt.NDArray[ComponentNumpyType],
property_type: PropertyLiteralType,
) -> None:
component_type = DTYPE_TO_COMPONENT_TYPE_MAPPING.get(array.dtype)
if component_type is None:
raise ValueError(
f"Cannot find a component_type corresponding to the dtype ${array.dtype}"
)
self.header.data[property_name] = {
"byteOffset": self.body.nbytes,
"componentType": component_type,
"type": property_type,
}
transformed_array = array.reshape(-1)
self.body.data.append(transformed_array)
[docs]
def get_property_names(self) -> Iterator[str]:
return iter(self.header.data.keys())
[docs]
def get_binary_property(
self, property_name_to_fetch: str
) -> npt.NDArray[ComponentNumpyType]:
binary_property_index = 0
# The order in self.header.data is the same as in self.body.data
# We should filter properties added as json.
for property_name, property_definition in self.header.data.items():
if isinstance(
property_definition, list
): # If it is a list, it means that it is a json property
continue
elif property_name_to_fetch == property_name:
return self.body.data[binary_property_index]
else:
binary_property_index += 1
else:
raise ValueError(f"The property {property_name_to_fetch} is not found")
[docs]
def to_array(self) -> npt.NDArray[np.uint8]:
batch_table_header_array = self.header.to_array()
batch_table_body_array = self.body.to_array()
return np.concatenate((batch_table_header_array, batch_table_body_array))
[docs]
@staticmethod
def from_array(
tile_header: TileContentHeader,
array: npt.NDArray[np.uint8],
batch_len: int = 0,
) -> BatchTable:
batch_table = BatchTable()
# separate batch table header
batch_table_header_length = tile_header.bt_json_byte_length
batch_table_body_array = array[batch_table_header_length:]
batch_table_header_array = array[0:batch_table_header_length]
jsond = json.loads(batch_table_header_array.tobytes().decode("utf-8") or "{}")
batch_table.header.data = jsond
# parse binary attributes
previous_byte_offset = 0
for bt_property in batch_table.header.data:
# ignore extensions
if bt_property == "extensions" or bt_property == "extras":
continue
property_definition = batch_table.header.data[bt_property]
# ignore JSON attributes
if isinstance(property_definition, list):
continue
if previous_byte_offset != property_definition["byteOffset"]:
raise Invalid3dtilesError(
f"The byte offset is {property_definition['byteOffset']} but the byte offset computed is {previous_byte_offset}"
)
numpy_type = COMPONENT_TYPE_NUMPY_MAPPING[
property_definition["componentType"]
]
end_byte_offset = property_definition["byteOffset"] + (
np.dtype(numpy_type).itemsize
* TYPE_LENGTH_MAPPING[property_definition["type"]]
* batch_len
)
batch_table.body.data.append(
batch_table_body_array[
property_definition["byteOffset"] : end_byte_offset
].view(numpy_type)
)
previous_byte_offset = end_byte_offset
return batch_table