import concurrent.futures
import pickle
import struct
import time
from collections.abc import Generator
from pathlib import Path
from typing import Any, Optional, Union
import numpy as np
import numpy.typing as npt
from pyproj import CRS, Transformer
from py3dtiles.constants import CPU_COUNT, DEFAULT_CACHE_SIZE
from py3dtiles.exceptions import (
SrsInMissingException,
SrsInMixinException,
TilerException,
)
from py3dtiles.tilers.base_tiler import Tiler
from py3dtiles.tileset.content import read_binary_tile_content
from py3dtiles.tileset.tile import Tile
from py3dtiles.typing import ExtraFieldsDescription
from py3dtiles.utils import (
READER_MAP,
compute_spacing,
make_aabb_valid,
node_name_to_path,
)
from .matrix_manipulation import (
make_rotation_matrix,
make_scale_matrix,
make_translation_matrix,
)
from .node import Node, SharedNodeStore
from .pnts import MIN_POINT_SIZE, pnts_writer
from .point_message_type import PointManagerMessage, PointWorkerMessageType
from .point_shared_metadata import PointSharedMetadata
from .point_state import PointState
from .point_tiler_worker import PointTilerWorker
def is_ancestor(node_name: bytes, ancestor: bytes) -> bool:
"""
    For example, the tile 22 is an ancestor of 22458.
    As a particular case, a tile is an ancestor of itself: 22 is an ancestor of 22.
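
    A doctest sketch of both cases:

    >>> is_ancestor(b"22458", b"22")
    True
    >>> is_ancestor(b"22", b"22")
    True
    >>> is_ancestor(b"22458", b"23")
    False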
"""
return len(ancestor) <= len(node_name) and node_name[0 : len(ancestor)] == ancestor
def is_ancestor_in_list(
node_name: bytes, ancestors: Union[set[bytes], dict[bytes, Any]]
) -> bool:
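    # an empty ancestor name denotes the root node, which is an ancestor of
    # every node, hence the `not ancestor` short-circuit below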
return any(
not ancestor or is_ancestor(node_name, ancestor) for ancestor in ancestors
)
def can_pnts_be_written(
node_name: bytes,
finished_node: bytes,
input_nodes: dict[bytes, Any],
active_nodes: set[bytes],
) -> bool:
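    """
    A node's .pnts can be written once the just-finished node is one of its
    ancestors (or the node itself) and none of its ancestors (itself included)
    is still queued or being processed.

    A doctest sketch with hypothetical node names:

    >>> can_pnts_be_written(b"224", b"22", {}, set())
    True
    >>> can_pnts_be_written(b"224", b"22", {b"22": None}, set())
    False
    """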
return (
is_ancestor(node_name, finished_node)
and not is_ancestor_in_list(node_name, active_nodes)
and not is_ancestor_in_list(node_name, input_nodes)
)
class PointTiler(Tiler[PointSharedMetadata, PointTilerWorker]):
"""
Tiler that split pointclouds.
This tiler is able to reproject pointclouds, and can embed arbitrary fields in the resulting 3dtiles
:param crs_in: crs to use for files that don't have crs informations in their metadata, or for all files if `force_crs_in` is used
:param crs_out: output crs
:param force_crs_in: whether or not to apply crs_in for all files.
:param pyproj_always_xy: some crs defines an axis order, but some dataset still use xy order nonetheless. This boolean allows to support this case.
:param rgb: whether to include rgb info or not
:param color_scale: scale the color in the case of colors wrongly encoded in 8 bit in a 16-bit field (like in las/laz files).
:param cache_size: the size in MB to use for ram cache.
:param verbosity: verbosity level
:param number_of_jobs: how many process this tiler is allowed to use
:param extra_fields: the list of extra fields to include in the resulting 3dtiles
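
    A minimal usage sketch (the file paths and CRS codes are hypothetical)::

        from pathlib import Path
        from pyproj import CRS

        tiler = PointTiler(crs_in=CRS("EPSG:2154"), crs_out=CRS("EPSG:4978"))
        tiler.initialize([Path("cloud.las")], Path("work/"), Path("out/"))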
"""
name = "points"
files_info: dict[str, Any]
out_folder: Path
crs_in: Optional[CRS]
crs_out: Optional[CRS]
force_crs_in: bool
rgb: bool
color_scale: Optional[float]
cache_size: int
verbosity: int
root_aabb: npt.NDArray[np.float64]
root_scale: npt.NDArray[np.float32]
root_spacing: float
node_store: SharedNodeStore
state: PointState
extra_fields_to_include: list[str]
transformer: Optional[Transformer]
number_of_jobs: int
shared_metadata: PointSharedMetadata
def __init__(
self,
crs_in: Optional[CRS] = None,
crs_out: Optional[CRS] = None,
force_crs_in: bool = False,
pyproj_always_xy: bool = False,
rgb: bool = True,
color_scale: Optional[float] = None,
cache_size: int = DEFAULT_CACHE_SIZE,
verbosity: int = 0,
number_of_jobs: int = CPU_COUNT,
extra_fields: Optional[list[str]] = None,
):
"""
Constructs a PointTiler
"""
super().__init__()
self.rgb = rgb
self.extra_fields_to_include = [] if extra_fields is None else extra_fields
self.color_scale = color_scale
self.crs_in = crs_in
self.crs_out = crs_out
self.force_crs_in = force_crs_in
self.pyproj_always_xy = pyproj_always_xy
self.cache_size = cache_size
self.verbosity = verbosity
self.number_of_jobs = number_of_jobs
def get_worker(self) -> PointTilerWorker:
return PointTilerWorker(self.shared_metadata)
def get_tasks(self) -> Generator[tuple[bytes, list[bytes]], None, None]:
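        # Task priority: flush finished nodes to .pnts first (this frees
        # memory), then dispatch point-processing jobs, then read more input.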
while len(self.state.pnts_to_writing) > 0:
yield self.send_pnts_to_write()
yield from self.send_points_to_process()
while self.state.can_add_reading_jobs():
yield self.send_file_to_read()
def initialize(
self, files: list[Path], working_dir: Path, out_folder: Path
) -> None:
self.files = files
self.out_folder = out_folder
self.files_info = self.get_files_info(self.crs_in, self.force_crs_in)
self.transformer = self.get_transformer()
(
self.rotation_matrix,
self.original_aabb,
self.avg_min,
) = self.get_rotation_matrix(self.crs_out, self.transformer)
self.root_aabb, self.root_scale, self.root_spacing = self.get_root_aabb(
self.original_aabb
)
self.node_store = SharedNodeStore(working_dir)
self.state = PointState(
self.files_info["portions"], max(1, self.number_of_jobs // 2)
)
self.shared_metadata = PointSharedMetadata(
self.transformer,
self.root_aabb,
self.root_spacing,
self.root_scale,
self.out_folder,
self.rgb,
self.color_scale,
self.files_info["extra_fields"],
self.verbosity,
)
def supports(self, file: Path) -> bool:
extension = file.suffix.lower()
return extension in READER_MAP
def get_files_info(
self,
crs_in: Optional[CRS],
force_crs_in: bool = False,
) -> dict[str, Any]:
pointcloud_file_portions = []
aabb = None
total_point_count = 0
avg_min = np.array([0.0, 0.0, 0.0])
        # read all input file headers and determine the aabb/spacing
extra_fields_dict: dict[str, ExtraFieldsDescription] = {}
for file in self.files:
extension = file.suffix.lower()
reader = READER_MAP[extension]
file_info = reader.get_metadata(file, self.color_scale)
extra_fields_by_name = {obj.name: obj for obj in file_info["extra_fields"]}
if self.rgb and not file_info["has_color"]:
print(
f"Warning: file ${file} does not have rgb, will default to (0, 0, 0)"
)
            # check that we have all the requested dimensions
for f in self.extra_fields_to_include:
if f in extra_fields_by_name:
# calculate best size
if f in extra_fields_dict:
                        # find a common dtype able to store all the formats this field may have across files
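                        # e.g. np.promote_types(np.uint8, np.uint16) gives np.uint16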
extra_fields_dict[f].dtype = np.promote_types(
extra_fields_dict[f].dtype, extra_fields_by_name[f].dtype
)
else:
extra_fields_dict[f] = extra_fields_by_name[f]
else:
print(
f"Warning: the file {file} does not have the field {f}, will default to 0 or omitted if no other files have this field."
)
pointcloud_file_portions += file_info["portions"]
if aabb is None:
aabb = file_info["aabb"]
else:
aabb[0] = np.minimum(aabb[0], file_info["aabb"][0])
aabb[1] = np.maximum(aabb[1], file_info["aabb"][1])
file_crs_in = file_info["crs_in"]
if file_crs_in is not None:
if crs_in is None:
crs_in = file_crs_in
elif crs_in != file_crs_in and not force_crs_in:
raise SrsInMixinException(
"All input files should have the same srs in, currently there are a mix of"
f" {crs_in} and {file_crs_in}"
)
total_point_count += file_info["point_count"]
avg_min += file_info["avg_min"] / len(self.files)
        # The fact that self.files is not empty has been checked before, so this shouldn't happen,
        # but the check keeps mypy happy and also serves as defensive programming
if aabb is None:
raise RuntimeError("No aabb could be computed!")
        # correct the aabb so that we don't end up with a null-sized box;
        # we add 10^-5, supposing it's reasonable for most use cases
make_aabb_valid(aabb)
return {
"portions": pointcloud_file_portions,
"aabb": aabb,
"crs_in": crs_in,
"point_count": total_point_count,
"avg_min": avg_min,
            # note: list() "unwraps" the dict_values object, which is not picklable
"extra_fields": list(extra_fields_dict.values()),
}
def get_rotation_matrix(
self, crs_out: Optional[CRS], transformer: Optional[Transformer]
) -> tuple[
npt.NDArray[np.float64], npt.NDArray[np.float64], npt.NDArray[np.float64]
]:
avg_min: npt.NDArray[np.float64] = self.files_info["avg_min"]
aabb: npt.NDArray[np.float64] = self.files_info["aabb"]
rotation_matrix: npt.NDArray[np.float64] = np.identity(4)
if crs_out is not None and transformer is not None:
bl: npt.NDArray[np.float64] = np.array(
list(transformer.transform(aabb[0][0], aabb[0][1], aabb[0][2]))
)
tr: npt.NDArray[np.float64] = np.array(
list(transformer.transform(aabb[1][0], aabb[1][1], aabb[1][2]))
)
br: npt.NDArray[np.float64] = np.array(
list(transformer.transform(aabb[1][0], aabb[0][1], aabb[0][2]))
)
avg_min = np.array(
list(transformer.transform(avg_min[0], avg_min[1], avg_min[2]))
)
x_axis = br - bl
bl = bl - avg_min
tr = tr - avg_min
if crs_out.to_epsg() == 4978:
# Transform geocentric normal => (0, 0, 1)
# and 4978-bbox x axis => (1, 0, 0),
# to have a bbox in local coordinates that's nicely aligned with the data
rotation_matrix = make_rotation_matrix(avg_min, np.array([0, 0, 1]))
rotation_matrix = np.dot(
make_rotation_matrix(x_axis, np.array([1, 0, 0])), rotation_matrix
)
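                # np.dot(v, R[:3, :3].T) right-multiplies the row vector v by
                # the transpose, which is the same as applying the rotation R to v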
rotation_matrix_part = rotation_matrix[:3, :3].T
bl = np.dot(bl, rotation_matrix_part)
tr = np.dot(tr, rotation_matrix_part)
root_aabb = np.array([np.minimum(bl, tr), np.maximum(bl, tr)])
else:
# offset
root_aabb = aabb - avg_min
return rotation_matrix, root_aabb, avg_min
def get_root_aabb(
self, original_aabb: npt.NDArray[np.float64]
) -> tuple[npt.NDArray[np.float64], npt.NDArray[np.float32], float]:
base_spacing = compute_spacing(original_aabb)
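        # Scale coordinates down when the computed spacing is large, e.g. a
        # base spacing of 25 with a 0.01 scale yields a root spacing of 0.25.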
if base_spacing > 10:
root_scale = np.array([0.01, 0.01, 0.01])
elif base_spacing > 1:
root_scale = np.array([0.1, 0.1, 0.1])
else:
root_scale = np.array([1, 1, 1])
root_aabb = original_aabb * root_scale
root_spacing = compute_spacing(root_aabb)
return root_aabb, root_scale, root_spacing
def print_summary(self) -> None:
print("Summary:")
print(f" - files to process: {self.files}")
print(" - points to process: {}".format(self.files_info["point_count"]))
print(f" - offset to use: {self.avg_min}")
print(f" - root spacing: {self.root_spacing / self.root_scale[0]}")
print(f" - root aabb: {self.root_aabb}")
print(f" - original aabb: {self.original_aabb}")
print(f" - scale: {self.root_scale}")
def send_file_to_read(self) -> tuple[bytes, list[bytes]]:
if self.verbosity >= 1:
print(f"Submit next portion {self.state.point_cloud_file_parts[-1]}")
file, portion = self.state.point_cloud_file_parts.pop()
self.state.points_in_progress += portion[1] - portion[0]
self.state.number_of_reading_jobs += 1
return PointManagerMessage.READ_FILE.value, [
pickle.dumps(
{
"filename": file,
"offset_scale": (
-self.avg_min,
self.root_scale,
self.rotation_matrix[:3, :3].T,
),
"portion": portion,
}
),
]
def send_points_to_process(
self,
) -> Generator[tuple[bytes, list[bytes]], None, None]:
potentials = sorted(
# a key (=task) can be in node_to_process and processing_nodes if the node isn't completely processed
[
(node, task)
for node, task in self.state.node_to_process.items() # task: [data...], point_count
if node not in self.state.processing_nodes
],
key=lambda task: -len(task[0]),
        )  # sorted by descending name length, so the shallowest nodes
        # (nearest the root) sit at the end and are consumed first below
while potentials:
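            # greedily batch nodes until the job holds roughly target_count points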
target_count = 100_000
job_list = []
count = 0
idx = len(potentials) - 1
while count < target_count and idx >= 0:
name, (tasks, point_count) = potentials[idx]
count += point_count
job_list += [
name,
self.node_store.get(name),
struct.pack(">I", len(tasks)),
] + tasks
del potentials[idx]
del self.state.node_to_process[name]
self.state.processing_nodes.add(name)
if name in self.state.waiting_writing_nodes:
self.state.waiting_writing_nodes.pop(
self.state.waiting_writing_nodes.index(name)
)
idx -= 1
if job_list:
yield PointManagerMessage.PROCESS_JOBS.value, job_list
def send_pnts_to_write(self) -> tuple[bytes, list[bytes]]:
node_name = self.state.pnts_to_writing.pop()
data = self.node_store.get(node_name)
if not data:
raise ValueError(f"{node_name!r} has no data")
self.node_store.remove(node_name)
self.state.number_of_writing_jobs += 1
return PointManagerMessage.WRITE_PNTS.value, [node_name, data]
def process_message(self, message_type: bytes, message: list[bytes]) -> None:
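        # dispatch on the worker -> tiler message type; the payload layout depends on it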
if message_type == PointWorkerMessageType.READ.value:
self.state.number_of_reading_jobs -= 1
elif message_type == PointWorkerMessageType.PROCESSED.value:
content = pickle.loads(message[-1])
self.state.processed_points += content["total"]
self.state.points_in_progress -= content["total"]
self.state.processing_nodes.remove(content["name"])
self.dispatch_processed_nodes(content)
elif message_type == PointWorkerMessageType.PNTS_WRITTEN.value:
self.state.points_in_pnts += struct.unpack(">I", message[0])[0]
self.state.number_of_writing_jobs -= 1
elif message_type == PointWorkerMessageType.NEW_TASK.value:
self.state.add_tasks_to_process(
node_name=message[0],
data=message[1],
point_count=struct.unpack(">I", message[2])[0],
)
else:
raise NotImplementedError(
f"The command {message_type!r} is not implemented"
)
def dispatch_processed_nodes(self, content: dict[str, bytes]) -> None:
if not content["name"]:
return
self.node_store.put(content["name"], content["data"])
self.state.waiting_writing_nodes.append(content["name"])
if not self.state.is_reading_finish():
return
        # while some nodes are still queued or being processed, a node may only
        # be flushed to .pnts once none of its ancestors has pending work left
if self.state.processing_nodes or self.state.node_to_process:
finished_node = content["name"]
if can_pnts_be_written(
finished_node,
finished_node,
self.state.node_to_process,
self.state.processing_nodes,
):
self.state.waiting_writing_nodes.pop(-1)
self.state.pnts_to_writing.append(finished_node)
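            # iterate in reverse so that pop(i) does not shift the indices of
            # the elements still to be visited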
for i in range(len(self.state.waiting_writing_nodes) - 1, -1, -1):
candidate = self.state.waiting_writing_nodes[i]
if can_pnts_be_written(
candidate,
finished_node,
self.state.node_to_process,
self.state.processing_nodes,
):
self.state.waiting_writing_nodes.pop(i)
self.state.pnts_to_writing.append(candidate)
else:
for c in self.state.waiting_writing_nodes:
self.state.pnts_to_writing.append(c)
self.state.waiting_writing_nodes.clear()
def validate(self) -> None:
if self.state.points_in_pnts != self.files_info["point_count"]:
raise ValueError(
"Invalid point count in the written .pnts"
+ f"(expected: {self.files_info['point_count']}, was: {self.state.points_in_pnts})"
)
def get_root_tile(self, use_process_pool: bool = True) -> Tile:
# compute tile transform matrix
transform = np.linalg.inv(self.rotation_matrix)
transform = np.dot(transform, make_scale_matrix(1.0 / self.root_scale[0]))
transform = np.dot(make_translation_matrix(self.avg_min), transform)
        # Create the root tile by sampling (or taking all points?) from the child nodes
root_node = Node(
b"",
self.root_aabb,
self.root_spacing * 2,
self.shared_metadata.write_rgb,
self.shared_metadata.extra_fields_to_include,
)
root_node.children = []
inv_aabb_size = (
1.0
/ np.maximum(
MIN_POINT_SIZE,
self.root_aabb[1] - self.root_aabb[0],
)
).astype(np.float32)
for child_num in range(8):
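            # the root's direct children are the eight octants named "0" through "7"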
tile_path = node_name_to_path(
self.out_folder, str(child_num).encode("ascii"), ".pnts"
)
if tile_path.exists():
tile_content = read_binary_tile_content(tile_path)
fth = tile_content.body.feature_table.header
xyz = tile_content.body.feature_table.body.position.view(
np.float32
).reshape((fth.points_length, 3))
if self.rgb:
tile_color = tile_content.body.feature_table.body.color
if tile_color is None:
raise TilerException(
"tile_content.body.feature_table.body.color shouldn't be None here. Seems to be a py3dtiles issue."
)
if tile_color.dtype != np.uint8:
raise TilerException(
"The data type of tile_content.body.feature_table.body.color must be np.uint8. Seems to be a py3dtiles issue."
)
rgb = tile_color.reshape((fth.points_length, 3)).astype(
np.uint8, copy=False
) # the astype is used for typing
else:
rgb = np.zeros(xyz.shape, dtype=np.uint8)
extra_fields: dict[str, npt.NDArray[Any]] = {}
for item in tile_content.body.batch_table.header.data.keys():
arr = tile_content.body.batch_table.get_binary_property(item)
extra_fields[item] = arr
root_node.grid.insert(
self.root_aabb[0].astype(np.float32),
inv_aabb_size,
xyz.copy(),
rgb,
extra_fields,
)
pnts_writer.node_to_pnts(
b"",
root_node,
self.out_folder,
)
if use_process_pool:
pool_executor = concurrent.futures.ProcessPoolExecutor()
else:
pool_executor = None
root_tile = Node.create_child_node_from_parent(
b"",
self.root_aabb,
self.root_spacing,
self.shared_metadata.write_rgb,
self.shared_metadata.extra_fields_to_include,
).to_tile(self.out_folder, self.root_scale, None, 0, pool_executor)
if pool_executor is not None:
pool_executor.shutdown()
if root_tile is None:
raise RuntimeError(
"root_tile cannot be None here. This is likely a tiler bug."
)
root_tile.transform = transform
root_tile.set_refine_mode(
"REPLACE"
        )  # the root tile uses the "REPLACE" refine mode
        # and its direct children use "ADD"; there is no need to set the
        # property on deeper descendants, as they take the parent value when
        # it is not present
for child in root_tile.children:
child.set_refine_mode("ADD")
return root_tile
def benchmark(self, benchmark_id: str, startup: float) -> None:
print(
"{},{},{},{}".format(
                benchmark_id,
",".join([f.name for f in self.files]),
self.state.points_in_pnts,
round(time.time() - startup, 1),
)
)
def memory_control(self) -> None:
self.node_store.control_memory_usage(self.cache_size, self.verbosity)