from __future__ import annotations
from enum import Enum
from io import StringIO
from pathlib import Path, PurePath
from typing import Callable, NamedTuple, TYPE_CHECKING, TypeVar
import numpy as np
import numpy.typing as npt
from pyproj import CRS
if TYPE_CHECKING:
from py3dtiles.tilers.node import Node
from typing_extensions import ParamSpec
_T = TypeVar("_T", bound=npt.NBitBase)
MIN_AABB_SIZE = 0.00001
[docs]
def str_to_CRS(srs: str | CRS | None) -> CRS | None:
"""
Convert a string in a pyproj CRS object. The string could be an epsg code or a Proj4 string. If srs is already
a CRS object, the function returns the same CRS definition.
"""
if srs is None:
return None
try:
return CRS.from_epsg(int(srs)) # type: ignore [arg-type]
except ValueError:
return CRS(srs)
[docs]
class CommandType(Enum):
READ_FILE = b"read_file"
WRITE_PNTS = b"write_pnts"
PROCESS_JOBS = b"process_jobs"
SHUTDOWN = b"shutdown"
[docs]
class ResponseType(Enum):
IDLE = b"idle"
HALTED = b"halted"
READ = b"read"
PROCESSED = b"processed"
PNTS_WRITTEN = b"pnts_written"
NEW_TASK = b"new_task"
ERROR = b"error"
if TYPE_CHECKING:
Param = ParamSpec("Param")
RetType = TypeVar("RetType")
[docs]
def profile(func: Callable[Param, RetType]) -> Callable[Param, RetType]:
from line_profiler import LineProfiler
def wrapper(*args, **kwargs): # type: ignore [no-untyped-def]
lp = LineProfiler()
deco = lp(func)
res = deco(*args, **kwargs)
s = StringIO()
lp.print_stats(stream=s)
print(s.getvalue())
return res
return wrapper
[docs]
class SubdivisionType(Enum):
OCTREE = 1
QUADTREE = 2
[docs]
def node_name_to_path(
working_dir: Path, name: bytes, suffix: str = "", split_len: int = 8
) -> Path:
"""
Get the path of a tile from its name and the working directory.
If the name is '222262175' with the suffix '.pnts', the result is 'working_dir/22226217/r5.pnts'
"""
str_name = name.decode("ascii")
if len(str_name) <= split_len:
filename = PurePath("r" + str_name + suffix)
else:
# the name is split on every 'split_len' char to avoid to have too many tiles on the same folder.
sub_folders = [
str_name[i : i + split_len] for i in range(0, len(str_name), split_len)
]
working_dir = working_dir.joinpath(*sub_folders[:-1])
filename = PurePath("r" + sub_folders[-1] + suffix)
full_path = working_dir / filename
working_dir.mkdir(parents=True, exist_ok=True)
return full_path
[docs]
def compute_spacing(aabb: npt.NDArray[np.floating[_T]]) -> float:
return float(np.linalg.norm(aabb[1] - aabb[0]) / 125)
[docs]
def aabb_size_to_subdivision_type(
size: npt.NDArray[np.floating[_T]],
) -> SubdivisionType:
if size[2] / min(size[0], size[1]) < 0.5:
return SubdivisionType.QUADTREE
else:
return SubdivisionType.OCTREE
[docs]
def split_aabb(
aabb: npt.NDArray[np.floating[_T]], index: int, force_quadtree: bool = False
) -> npt.NDArray[np.floating[_T]]:
half = (aabb[1] - aabb[0]) * 0.5
t = aabb_size_to_subdivision_type(half)
new_aabb = np.array([aabb[0], aabb[0] + half])
if index & 4:
new_aabb[0][0] += half[0]
new_aabb[1][0] += half[0]
if index & 2:
new_aabb[0][1] += half[1]
new_aabb[1][1] += half[1]
if force_quadtree or t == SubdivisionType.QUADTREE:
new_aabb[1][2] += half[2]
elif index & 1:
new_aabb[0][2] += half[2]
new_aabb[1][2] += half[2]
return new_aabb
[docs]
def make_aabb_cubic(aabb: npt.NDArray[np.floating[_T]]) -> npt.NDArray[np.floating[_T]]:
s = max(aabb[1] - aabb[0])
aabb[1][0] = aabb[0][0] + s
aabb[1][1] = aabb[0][1] + s
aabb[1][2] = aabb[0][2] + s
return aabb
[docs]
def make_aabb_valid(aabb: list[list[float]]) -> None:
"""
Modify inplace the aabb so that no dimension is 0-sized
"""
for i in range(3):
if aabb[0][i] == aabb[1][i]:
aabb[1][i] = aabb[1][i] + MIN_AABB_SIZE
[docs]
def node_from_name(
name: bytes,
parent_aabb: npt.NDArray[np.floating[_T]],
parent_spacing: float,
) -> Node:
from py3dtiles.tilers.node import Node
spacing = parent_spacing * 0.5
aabb = split_aabb(parent_aabb, int(name[-1])) if len(name) > 0 else parent_aabb
# let's build a new Node
return Node(name, aabb.astype(np.float64), spacing)