Source code for py3dtiles.convert

import argparse
import concurrent.futures
import json
import os
import pickle
import shutil
import struct
import sys
import tempfile
import time
import traceback
from multiprocessing import Process, cpu_count
from pathlib import Path, PurePath
from typing import Any, Dict, List, Optional, Set, Tuple, Union

import numpy as np
import numpy.typing as npt
import psutil
import zmq
from pyproj import CRS, Transformer

from py3dtiles.constants import EXIT_CODES
from py3dtiles.exceptions import (
    FormatSupportMissingException,
    SrsInMissingException,
    SrsInMixinException,
    TilerException,
    WorkerException,
)
from py3dtiles.reader import xyz_reader
from py3dtiles.tilers.matrix_manipulation import (
    make_rotation_matrix,
    make_scale_matrix,
    make_translation_matrix,
)
from py3dtiles.tilers.node import Node, NodeCatalog, NodeProcess, SharedNodeStore
from py3dtiles.tilers.pnts import pnts_writer
from py3dtiles.tilers.pnts.constants import MIN_POINT_SIZE
from py3dtiles.tileset.content import read_binary_tile_content
from py3dtiles.typing import PortionsType
from py3dtiles.utils import (
    CommandType,
    OctreeMetadata,
    ResponseType,
    compute_spacing,
    make_aabb_valid,
    node_from_name,
    node_name_to_path,
    str_to_CRS,
)

TOTAL_MEMORY_MB = int(psutil.virtual_memory().total / (1024 * 1024))
DEFAULT_CACHE_SIZE = int(TOTAL_MEMORY_MB / 10)
CPU_COUNT = cpu_count()

# IPC protocol is not supported on Windows
if os.name == "nt":
    URI = "tcp://127.0.0.1:0"
else:
    # Generate a unique name for this socket
    tmpdir = tempfile.TemporaryDirectory()
    URI = f"ipc://{tmpdir.name}/py3dtiles.sock"


READER_MAP = {
    ".xyz": xyz_reader,
    ".csv": xyz_reader,
}

try:
    from py3dtiles.reader import las_reader

    READER_MAP[".las"] = las_reader
    # then check for the presence of laz support
    from importlib.util import find_spec

    if find_spec("laszip") or find_spec("lazrs"):
        READER_MAP[".laz"] = las_reader
except ImportError:
    pass

try:
    from py3dtiles.reader import ply_reader

    READER_MAP[".ply"] = ply_reader
except ImportError:
    pass
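
# Illustrative note (not part of the original module): READER_MAP, built above, is the
# dispatch table for supported input formats, so optional las/laz/ply support can be
# probed at runtime with a simple key lookup, e.g.:
#
#     if ".laz" not in READER_MAP:
#         print("laz support is missing; install the laszip or lazrs Python package")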


def worker_target(
    activity_graph: bool,
    transformer: Optional[Transformer],
    octree_metadata: OctreeMetadata,
    folder: Path,
    write_rgb: bool,
    color_scale: Optional[float],
    write_classification: bool,
    verbosity: int,
    uri: bytes,
) -> None:
    return Worker(
        activity_graph,
        transformer,
        octree_metadata,
        folder,
        write_rgb,
        color_scale,
        write_classification,
        verbosity,
        uri,
    ).run()


class Worker:
    """
    This class waits for job commands from the ZMQ socket.
    """

    def __init__(
        self,
        activity_graph: bool,
        transformer: Optional[Transformer],
        octree_metadata: OctreeMetadata,
        folder: Path,
        write_rgb: bool,
        color_scale: Optional[float],
        write_classification: bool,
        verbosity: int,
        uri: bytes,
    ) -> None:
        self.activity_graph = activity_graph
        self.transformer = transformer
        self.octree_metadata = octree_metadata
        self.folder = folder
        self.write_rgb = write_rgb
        self.color_scale = color_scale
        self.write_classification = write_classification
        self.verbosity = verbosity
        self.uri = uri

        # Socket to receive messages on
        self.context = zmq.Context()

    def run(self) -> None:
        self.skt: zmq.Socket[bytes] = self.context.socket(zmq.DEALER)

        self.skt.connect(self.uri)  # type: ignore [arg-type]

        startup_time = time.time()
        idle_time = 0.0

        if self.activity_graph:
            activity = open(f"activity.{os.getpid()}.csv", "w")

        # notify we're ready
        self.skt.send_multipart([ResponseType.REGISTER.value])

        while True:
            try:
                before = time.time() - startup_time
                self.skt.poll()
                after = time.time() - startup_time

                idle_time += after - before

                message = self.skt.recv_multipart()
                content = message[1:]
                command = content[0]

                delta = time.time() - pickle.loads(message[0])
                if delta > 0.01 and self.verbosity >= 1:
                    print(
                        f"{os.getpid()} / {round(after, 2)} : Delta time: {round(delta, 3)}"
                    )

                if command == CommandType.READ_FILE.value:
                    self.execute_read_file(content)
                    command_type = 1
                elif command == CommandType.PROCESS_JOBS.value:
                    self.execute_process_jobs(content)
                    command_type = 2
                elif command == CommandType.WRITE_PNTS.value:
                    self.execute_write_pnts(content[2], content[1])
                    command_type = 3
                elif command == CommandType.SHUTDOWN.value:
                    break  # ack
                else:
                    raise NotImplementedError(f"Unknown command {command!r}")

                # notify we're idle
                self.skt.send_multipart([ResponseType.IDLE.value])

                if self.activity_graph:
                    print(f"{before}, {command_type}", file=activity)
                    print(f"{before}, 0", file=activity)
                    print(f"{after}, 0", file=activity)
                    print(f"{after}, {command_type}", file=activity)
            except Exception as e:
                traceback.print_exc()
                # usually the first arg is the explaining string;
                # let's assume it is always the case in our context
                self.skt.send_multipart([ResponseType.ERROR.value, e.args[0].encode()])
                # we still print it for stacktraces

        if self.activity_graph:
            activity.close()

        if self.verbosity >= 1:
            print(
                "total: {} sec, idle: {}".format(
                    round(time.time() - startup_time, 1), round(idle_time, 1)
                )
            )

        self.skt.send_multipart([ResponseType.HALTED.value])

    def execute_read_file(self, content: List[bytes]) -> None:
        parameters = pickle.loads(content[1])

        extension = PurePath(parameters["filename"]).suffix
        if extension in READER_MAP:
            reader = READER_MAP[extension]
        else:
            raise ValueError(
                f"The file with {extension} extension can't be read, "
                f"the available extensions are: {', '.join(READER_MAP.keys())}"
            )

        reader_gen = reader.run(
            parameters["filename"],
            parameters["offset_scale"],
            parameters["portion"],
            self.transformer,
            self.color_scale,
        )
        for coords, colors, classification in reader_gen:
            self.skt.send_multipart(
                [
                    ResponseType.NEW_TASK.value,
                    b"",
                    pickle.dumps(
                        {"xyz": coords, "rgb": colors, "classification": classification}
                    ),
                    struct.pack(">I", len(coords)),
                ],
                copy=False,
            )

        self.skt.send_multipart([ResponseType.READ.value])

    def execute_write_pnts(self, data: bytes, node_name: bytes) -> None:
        pnts_writer_gen = pnts_writer.run(
            data, self.folder, self.write_rgb, self.write_classification
        )
        for total in pnts_writer_gen:
            self.skt.send_multipart(
                [ResponseType.PNTS_WRITTEN.value, struct.pack(">I", total), node_name]
            )

    def execute_process_jobs(self, content: List[bytes]) -> None:
        begin = time.time()
        log_enabled = self.verbosity >= 2
        if log_enabled:
            log_filename = f"py3dtiles-{os.getpid()}.log"
            log_file = open(log_filename, "a")
        else:
            log_file = None

        work = content[1:]

        i = 0
        while i < len(work):
            name = work[i]
            node = work[i + 1]
            count = struct.unpack(">I", work[i + 2])[0]
            tasks = work[i + 3 : i + 3 + count]
            i += 3 + count

            node_catalog = NodeCatalog(node, name, self.octree_metadata)

            node_process = NodeProcess(
                node_catalog,
                self.octree_metadata,
                name,
                tasks,
                begin,
                log_file,
            )
            for proc_name, proc_data, proc_point_count in node_process.run():
                self.skt.send_multipart(
                    [
                        ResponseType.NEW_TASK.value,
                        proc_name,
                        proc_data,
                        struct.pack(">I", proc_point_count),
                    ],
                    copy=False,
                    block=False,
                )

            if log_enabled:
                print(f"save on disk {name!r} [{time.time() - begin}]", file=log_file)

            # save node state on disk
            if len(name) > 0:
                data = node_catalog.dump(name, node_process.infer_depth_from_name() - 1)
            else:
                data = b""

            if log_enabled:
                print(f"saved on disk [{time.time() - begin}]", file=log_file)

            self.skt.send_multipart(
                [
                    ResponseType.PROCESSED.value,
                    pickle.dumps(
                        {
                            "name": name,
                            "total": node_process.total_point_count,
                            "data": data,
                        }
                    ),
                ],
                copy=False,
            )

        if log_enabled:
            print(
                "[<] return result [{} sec] [{}]".format(
                    round(time.time() - begin, 2), time.time() - begin
                ),
                file=log_file,
                flush=True,
            )

        if log_file is not None:
            log_file.close()


# Manager
class ZmqManager:
    """
    This class sends messages to the workers.
    We can also request general status.
    """

    def __init__(
        self,
        number_of_jobs: int,
        process_args: Tuple[
            bool,
            Optional[Transformer],
            OctreeMetadata,
            Path,
            bool,
            Optional[float],
            bool,
            int,
        ],
    ) -> None:
        """
        For the process_args argument, see the init method of Worker to get the list of needed parameters.
        """
        self.context = zmq.Context()

        self.number_of_jobs = number_of_jobs

        self.socket = self.context.socket(zmq.ROUTER)
        self.socket.bind(URI)
        # Useful only when TCP is used to get the URI with the opened port
        self.uri = self.socket.getsockopt(zmq.LAST_ENDPOINT)
        if not isinstance(self.uri, bytes):
            raise RuntimeError(
                "The uri returned by self.socket.getsockopt should be bytes."
            )

        self.processes = [
            Process(target=worker_target, args=(*process_args, self.uri))
            for _ in range(number_of_jobs)
        ]
        for p in self.processes:
            p.start()

        self.activities = [p.pid for p in self.processes]

        self.clients: Set[bytes] = set()
        self.idle_clients: Set[bytes] = set()

        self.killing_processes = False
        self.number_processes_killed = 0
        self.time_waiting_an_idle_process = 0.0

    def all_clients_registered(self) -> bool:
        return len(self.clients) == self.number_of_jobs

    def send_to_process(self, message: List[bytes]) -> None:
        if not self.idle_clients:
            raise ValueError("idle_clients is empty")
        self.socket.send_multipart(
            [self.idle_clients.pop(), pickle.dumps(time.time())] + message
        )

    def send_to_all_processes(self, message: List[bytes]) -> None:
        if len(self.clients) == 0:
            raise ValueError("No registered clients")
        for client in self.clients:
            self.socket.send_multipart([client, pickle.dumps(time.time())] + message)

    def send_to_all_idle_processes(self, message: List[bytes]) -> None:
        if not self.idle_clients:
            raise ValueError("idle_clients is empty")
        for client in self.idle_clients:
            self.socket.send_multipart([client, pickle.dumps(time.time())] + message)
        self.idle_clients.clear()

    def can_queue_more_jobs(self) -> bool:
        return len(self.idle_clients) != 0

    def register_client(self, client_id: bytes) -> None:
        if client_id in self.clients:
            print(f"Warning: {client_id!r} already registered")
        else:
            self.clients.add(client_id)
        self.add_idle_client(client_id)

    def add_idle_client(self, client_id: bytes) -> None:
        if client_id in self.idle_clients:
            raise ValueError(f"The client id {client_id!r} is already in idle_clients")
        self.idle_clients.add(client_id)

    def are_all_processes_idle(self) -> bool:
        return len(self.idle_clients) == self.number_of_jobs

    def are_all_processes_killed(self) -> bool:
        return self.number_processes_killed == self.number_of_jobs

    def shutdown_all_processes(self) -> None:
        self.send_to_all_processes([CommandType.SHUTDOWN.value])
        self.killing_processes = True

    def join_all_processes(self) -> None:
        for p in self.processes:
            p.join()


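# Illustrative sketch (not part of the original module) of the framing used by the
# ZmqManager / Worker pair above. The ROUTER socket prepends the worker identity and a
# pickled timestamp to every command, so a PROCESS_JOBS frame sent to a worker looks like:
#
#     [client_id, pickle.dumps(time.time()),
#      CommandType.PROCESS_JOBS.value, name, node_data, struct.pack(">I", len(tasks)), *tasks]
#
# while a worker reply, e.g. after a pnts file has been written, is:
#
#     [ResponseType.PNTS_WRITTEN.value, struct.pack(">I", total), node_name]
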
def is_ancestor(node_name: bytes, ancestor: bytes) -> bool:
    """
    For example, the tile 22 is an ancestor of 22458.
    Particular case: the tile 22 is an ancestor of 22.
    """
    return len(ancestor) <= len(node_name) and node_name[0 : len(ancestor)] == ancestor


def is_ancestor_in_list(
    node_name: bytes, ancestors: Union[List[bytes], Dict[bytes, Any]]
) -> bool:
    return any(
        not ancestor or is_ancestor(node_name, ancestor) for ancestor in ancestors
    )


def can_pnts_be_written(
    node_name: bytes,
    finished_node: bytes,
    input_nodes: Union[List[bytes], Dict[bytes, Any]],
    active_nodes: Union[List[bytes], Dict[bytes, Any]],
) -> bool:
    return (
        is_ancestor(node_name, finished_node)
        and not is_ancestor_in_list(node_name, active_nodes)
        and not is_ancestor_in_list(node_name, input_nodes)
    )


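# Illustrative examples (not part of the original module): node names are byte strings
# describing octree paths, so ancestry is a simple prefix test.
#
#     assert is_ancestor(b"22458", b"22")            # 22 is an ancestor of 22458
#     assert is_ancestor(b"22", b"22")               # a node is its own ancestor
#     assert not is_ancestor(b"3", b"22")
#     assert is_ancestor_in_list(b"22458", [b"1", b"22"])

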
class State:
    def __init__(
        self, pointcloud_file_portions: PortionsType, max_reading_jobs: int
    ) -> None:
        self.processed_points = 0
        self.max_point_in_progress = 60_000_000
        self.points_in_progress = 0
        self.points_in_pnts = 0

        # pointcloud_file_portions is a list of tuples (filename, (start offset, end offset))
        self.point_cloud_file_parts = pointcloud_file_portions
        self.initial_portion_count = len(pointcloud_file_portions)
        self.max_reading_jobs = max_reading_jobs
        self.number_of_reading_jobs = 0
        self.number_of_writing_jobs = 0

        # node_to_process is a dictionary of tasks:
        # each entry is a tile identified by its name (a string of numbers),
        # and for each entry the value is a list of tasks.
        # A task is a tuple (list of points, point_count);
        # points is a dictionary {xyz: list of coordinates, color: the associated color}
        self.node_to_process: Dict[bytes, Tuple[List[bytes], int]] = {}
        # when a node is sent to a process, the item moves to processing_nodes.
        # The structure is different: the key remains the node name, but the value is (len(tasks), point_count, now).
        # These values are only used for logging.
        self.processing_nodes: Dict[bytes, Tuple[int, int, float]] = {}
        # when processing is finished, the tile name moves to waiting_writing_nodes.
        # Since the content is, at this stage, stored in the node_store,
        # we just keep the name of the node.
        # This list is filled until writing can be started.
        self.waiting_writing_nodes: List[bytes] = []
        # when a node is being written, its name moves from waiting_writing_nodes to pnts_to_writing.
        # The data to write are stored in a node object.
        self.pnts_to_writing: List[bytes] = []

    def is_reading_finish(self) -> bool:
        return not self.point_cloud_file_parts and self.number_of_reading_jobs == 0

    def add_tasks_to_process(
        self, node_name: bytes, data: bytes, point_count: int
    ) -> None:
        if point_count <= 0:
            raise ValueError(
                "point_count should be strictly positive, currently", point_count
            )

        if node_name not in self.node_to_process:
            self.node_to_process[node_name] = ([data], point_count)
        else:
            tasks, count = self.node_to_process[node_name]
            tasks.append(data)
            self.node_to_process[node_name] = (tasks, count + point_count)

    def can_add_reading_jobs(self) -> bool:
        return bool(
            self.point_cloud_file_parts
            and self.points_in_progress < self.max_point_in_progress
            and self.number_of_reading_jobs < self.max_reading_jobs
        )

    def print_debug(self) -> None:
        print("{:^16}|{:^8}|{:^8}|{:^8}".format("Step", "Input", "Active", "Inactive"))
        print(
            "{:^16}|{:^8}|{:^8}|{:^8}".format(
                "Reader",
                len(self.point_cloud_file_parts),
                self.number_of_reading_jobs,
                "",
            )
        )
        print(
            "{:^16}|{:^8}|{:^8}|{:^8}".format(
                "Node process",
                len(self.node_to_process),
                len(self.processing_nodes),
                len(self.waiting_writing_nodes),
            )
        )
        print(
            "{:^16}|{:^8}|{:^8}|{:^8}".format(
                "Pnts writer",
                len(self.pnts_to_writing),
                self.number_of_writing_jobs,
                "",
            )
        )


def convert(*args, **kwargs) -> None:  # type: ignore [no-untyped-def]
    # TODO: use the _Convert class directly
    converter = _Convert(*args, **kwargs)
    return converter.convert()


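# Illustrative usage sketch (not part of the original module). The keyword arguments mirror
# the _Convert constructor below; file names and EPSG codes are placeholders.
#
#     from pathlib import Path
#     from pyproj import CRS
#     from py3dtiles.convert import convert
#
#     convert(
#         ["cloud_1.las", Path("cloud_2.las")],  # a single path or a list of paths
#         outfolder="./3dtiles",
#         overwrite=True,
#         jobs=4,
#         crs_in=CRS.from_epsg(3946),
#         crs_out=CRS.from_epsg(4978),
#     )
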
class _Convert:
    def __init__(
        self,
        files: Union[List[Union[str, Path]], str, Path],
        outfolder: Union[str, Path] = "./3dtiles",
        overwrite: bool = False,
        jobs: int = CPU_COUNT,
        cache_size: int = DEFAULT_CACHE_SIZE,
        crs_out: Optional[CRS] = None,
        crs_in: Optional[CRS] = None,
        force_crs_in: bool = False,
        fraction: int = 100,
        benchmark: Optional[str] = None,
        rgb: bool = True,
        classification: bool = True,
        graph: bool = False,
        color_scale: Optional[float] = None,
        verbose: int = False,
    ) -> None:
        """
        :param files: Filenames to process. The file must use the .las, .laz, .xyz or .ply format.
        :param outfolder: The folder where the resulting tileset will be written.
        :param overwrite: Overwrite the output folder if it already exists.
        :param jobs: The number of parallel jobs to start. Defaults to the number of CPUs.
        :param cache_size: Cache size in MB. Defaults to available memory / 10.
        :param crs_out: CRS to convert the output with
        :param crs_in: Set a default input CRS
        :param force_crs_in: Force every input CRS to be `crs_in`, even if not null
        :param fraction: Percentage of the pointcloud to process, between 0 and 100.
        :param benchmark: Print summary at the end of the process
        :param rgb: Export rgb attributes.
        :param classification: Export classification attribute.
        :param graph: Produce debug graphs (requires pygal).
        :param color_scale: Scale the color with the specified amount. Useful to lighten or darken black pointclouds that only have intensity.

        :raises SrsInMissingException: if py3dtiles couldn't find SRS information in the input files and srs_in is not specified
        :raises SrsInMixinException: if the input files have different CRS
        """
        self.jobs = jobs
        self.cache_size = cache_size
        self.rgb = rgb
        self.classification = classification

        # allow a str directly if there is only one input
        files = [files] if isinstance(files, (str, Path)) else files
        self.files = [Path(file) for file in files]

        self.verbose = verbose
        self.graph = graph
        self.benchmark = benchmark
        self.color_scale = color_scale

        self.file_info = self.get_file_info(crs_in, force_crs_in)

        transformer = self.get_transformer(crs_out)
        (
            self.rotation_matrix,
            self.original_aabb,
            self.avg_min,
        ) = self.get_rotation_matrix(crs_out, transformer)

        self.root_aabb, self.root_scale, self.root_spacing = self.get_root_aabb(
            self.original_aabb
        )
        octree_metadata = OctreeMetadata(
            aabb=self.root_aabb, spacing=self.root_spacing, scale=self.root_scale[0]
        )

        if self.verbose >= 1:
            self.print_summary()
        if self.graph:
            self.progression_log = open("progression.csv", "w")

        # create folder
        self.out_folder = Path(outfolder)
        if self.out_folder.is_dir():
            if overwrite:
                shutil.rmtree(self.out_folder, ignore_errors=True)
            else:
                raise FileExistsError(f"Folder '{self.out_folder}' already exists")

        self.out_folder.mkdir()
        self.working_dir = self.out_folder / "tmp"
        self.working_dir.mkdir(parents=True)

        self.zmq_manager = ZmqManager(
            self.jobs,
            (
                self.graph,
                transformer,
                octree_metadata,
                self.out_folder,
                self.rgb,
                self.color_scale,
                self.classification,
                self.verbose,
            ),
        )
        self.node_store = SharedNodeStore(self.working_dir)
        self.state = State(self.file_info["portions"], max(1, self.jobs // 2))

    def get_file_info(
        self,
        crs_in: Optional[CRS],
        force_crs_in: bool = False,
    ) -> Dict[str, Any]:
        pointcloud_file_portions = []
        aabb = None
        total_point_count = 0
        avg_min = np.array([0.0, 0.0, 0.0])

        # read all input file headers and determine the aabb/spacing
        for file in self.files:
            extension = file.suffix
            if extension in READER_MAP:
                reader = READER_MAP[extension]
            else:
                raise FormatSupportMissingException(
                    f"The file with {extension} extension can't be read, "
                    f"the available extensions are: {', '.join(READER_MAP.keys())}. "
                    f"NOTE: for ply, las and laz support, some additional dependencies are needed. Please check the documentation."
                )

            file_info = reader.get_metadata(file)

            pointcloud_file_portions += file_info["portions"]
            if aabb is None:
                aabb = file_info["aabb"]
            else:
                aabb[0] = np.minimum(aabb[0], file_info["aabb"][0])
                aabb[1] = np.maximum(aabb[1], file_info["aabb"][1])

            file_crs_in = file_info["crs_in"]
            if file_crs_in is not None:
                if crs_in is None:
                    crs_in = file_crs_in
                elif crs_in != file_crs_in and not force_crs_in:
                    raise SrsInMixinException(
                        "All input files should have the same input srs, currently there is a mix of"
                        f" {crs_in} and {file_crs_in}"
                    )
            total_point_count += file_info["point_count"]
            avg_min += file_info["avg_min"] / len(self.files)

        # The fact that self.files is not empty has been checked before, so this shouldn't happen,
        # but it keeps mypy happy and also serves as "defensive programming"
        if aabb is None:
            raise RuntimeError("No aabb could be computed!")

        # correct the aabb, so that we don't have a null-sized box;
        # we add 10^-5, supposing it's reasonable for most use cases
        make_aabb_valid(aabb)

        return {
            "portions": pointcloud_file_portions,
            "aabb": aabb,
            "crs_in": crs_in,
            "point_count": total_point_count,
            "avg_min": avg_min,
        }

    def get_transformer(self, crs_out: Optional[CRS]) -> Optional[Transformer]:
        if crs_out:
            if self.file_info["crs_in"] is None:
                raise SrsInMissingException(
                    "No input file has an input srs specified. It should be provided."
                )

            transformer = Transformer.from_crs(self.file_info["crs_in"], crs_out)
        else:
            transformer = None

        return transformer

    def get_rotation_matrix(
        self, crs_out: Optional[CRS], transformer: Optional[Transformer]
    ) -> Tuple[
        npt.NDArray[np.float64], npt.NDArray[np.float64], npt.NDArray[np.float64]
    ]:
        avg_min: npt.NDArray[np.float64] = self.file_info["avg_min"]
        aabb: npt.NDArray[np.float64] = self.file_info["aabb"]

        rotation_matrix: npt.NDArray[np.float64] = np.identity(4)
        if crs_out is not None and transformer is not None:
            bl: npt.NDArray[np.float64] = np.array(
                list(transformer.transform(aabb[0][0], aabb[0][1], aabb[0][2]))
            )
            tr: npt.NDArray[np.float64] = np.array(
                list(transformer.transform(aabb[1][0], aabb[1][1], aabb[1][2]))
            )
            br: npt.NDArray[np.float64] = np.array(
                list(transformer.transform(aabb[1][0], aabb[0][1], aabb[0][2]))
            )

            avg_min = np.array(
                list(transformer.transform(avg_min[0], avg_min[1], avg_min[2]))
            )

            x_axis = br - bl

            bl = bl - avg_min
            tr = tr - avg_min

            if crs_out.to_epsg() == 4978:
                # Transform geocentric normal => (0, 0, 1)
                # and 4978-bbox x axis => (1, 0, 0),
                # to have a bbox in local coordinates that's nicely aligned with the data
                rotation_matrix = make_rotation_matrix(avg_min, np.array([0, 0, 1]))
                rotation_matrix = np.dot(
                    make_rotation_matrix(x_axis, np.array([1, 0, 0])), rotation_matrix
                )

                rotation_matrix_part = rotation_matrix[:3, :3].T

                bl = np.dot(bl, rotation_matrix_part)
                tr = np.dot(tr, rotation_matrix_part)

            root_aabb = np.array([np.minimum(bl, tr), np.maximum(bl, tr)])
        else:
            # offset
            root_aabb = aabb - avg_min

        return rotation_matrix, root_aabb, avg_min

    def get_root_aabb(
        self, original_aabb: npt.NDArray[np.float64]
    ) -> Tuple[npt.NDArray[np.float64], npt.NDArray[np.float32], float]:
        base_spacing = compute_spacing(original_aabb)
        if base_spacing > 10:
            root_scale = np.array([0.01, 0.01, 0.01])
        elif base_spacing > 1:
            root_scale = np.array([0.1, 0.1, 0.1])
        else:
            root_scale = np.array([1, 1, 1])

        root_aabb = original_aabb * root_scale
        root_spacing = compute_spacing(root_aabb)
        return root_aabb, root_scale, root_spacing

    def convert(self) -> None:
        """convert

        Convert pointclouds (xyz, las or laz) to a 3dtiles tileset containing pnts nodes
        """
        startup: float = time.time()

        try:
            while not self.zmq_manager.killing_processes:
                now = time.time() - startup

                at_least_one_job_ended = False

                if (
                    not self.zmq_manager.can_queue_more_jobs()
                    or self.zmq_manager.socket.poll(timeout=0, flags=zmq.POLLIN)
                ):
                    at_least_one_job_ended = self.process_message()

                # we wait for all processes/threads to register;
                # if we don't, there are tricky cases where an exception fires in a worker
                # before all the workers registered, which means that not all workers
                # will receive the shutdown signal
                if not self.zmq_manager.all_clients_registered():
                    continue

                while (
                    self.state.pnts_to_writing
                    and self.zmq_manager.can_queue_more_jobs()
                ):
                    self.send_pnts_to_write()

                if self.zmq_manager.can_queue_more_jobs():
                    self.send_points_to_process(now)

                while (
                    self.state.can_add_reading_jobs()
                    and self.zmq_manager.can_queue_more_jobs()
                ):
                    self.send_file_to_read()

                # if at this point we have no work in progress => we're done
                if self.zmq_manager.are_all_processes_idle():
                    break

                if at_least_one_job_ended:
                    self.print_debug(now)
                    if self.graph:
                        percent = round(
                            100
                            * self.state.processed_points
                            / self.file_info["point_count"],
                            3,
                        )
                        print(
                            f"{time.time() - startup}, {percent}",
                            file=self.progression_log,
                        )

                self.node_store.control_memory_usage(self.cache_size, self.verbose)

            if self.state.points_in_pnts != self.file_info["point_count"]:
                raise ValueError(
                    "!!! Invalid point count in the written .pnts"
                    + f"(expected: {self.file_info['point_count']}, was: {self.state.points_in_pnts})"
                )

            if self.verbose >= 1:
                print("Writing 3dtiles {}".format(self.file_info["avg_min"]))

            self.write_tileset()
            shutil.rmtree(self.working_dir)

            if self.verbose >= 1:
                print("Done")

            if self.benchmark:
                print(
                    "{},{},{},{}".format(
                        self.benchmark,
                        ",".join([f.name for f in self.files]),
                        self.state.points_in_pnts,
                        round(time.time() - startup, 1),
                    )
                )
        finally:
            self.zmq_manager.shutdown_all_processes()
            self.zmq_manager.join_all_processes()

            if self.verbose >= 1:
                print(
                    "destroy", round(self.zmq_manager.time_waiting_an_idle_process, 2)
                )

            # pygal chart
            if self.graph:
                self.progression_log.close()
                self.draw_graph()

            self.zmq_manager.context.destroy()

    def process_message(self) -> bool:
        one_job_ended = False

        # Blocking read, but it's fine because either all our child processes are busy,
        # or we know that there's something to read (zmq.POLLIN)
        start = time.time()
        message = self.zmq_manager.socket.recv_multipart()

        client_id = message[0]
        result = message[1:]
        return_type = result[0]

        if return_type == ResponseType.REGISTER.value:
            self.zmq_manager.register_client(client_id)

        elif return_type == ResponseType.IDLE.value:
            self.zmq_manager.add_idle_client(client_id)

            if not self.zmq_manager.can_queue_more_jobs():
                self.zmq_manager.time_waiting_an_idle_process += time.time() - start

        elif return_type == ResponseType.HALTED.value:
            self.zmq_manager.number_processes_killed += 1

        elif return_type == ResponseType.READ.value:
            self.state.number_of_reading_jobs -= 1
            one_job_ended = True

        elif return_type == ResponseType.PROCESSED.value:
            content = pickle.loads(result[-1])
            self.state.processed_points += content["total"]
            self.state.points_in_progress -= content["total"]

            del self.state.processing_nodes[content["name"]]

            self.dispatch_processed_nodes(content)

            one_job_ended = True

        elif return_type == ResponseType.PNTS_WRITTEN.value:
            self.state.points_in_pnts += struct.unpack(">I", result[1])[0]
            self.state.number_of_writing_jobs -= 1

        elif return_type == ResponseType.NEW_TASK.value:
            self.state.add_tasks_to_process(
                node_name=result[1],
                data=result[2],
                point_count=struct.unpack(">I", result[3])[0],
            )

        elif return_type == ResponseType.ERROR.value:
            raise WorkerException(
                f"An exception occurred in a worker: {result[1].decode()}"
            )

        else:
            raise NotImplementedError(f"The command {return_type!r} is not implemented")

        return one_job_ended

    def dispatch_processed_nodes(self, content: Dict[str, bytes]) -> None:
        if not content["name"]:
            return

        self.node_store.put(content["name"], content["data"])
        self.state.waiting_writing_nodes.append(content["name"])

        if not self.state.is_reading_finish():
            return

        # if all nodes aren't processed yet,
        # we should check if linked ancestors are processed
        if self.state.processing_nodes or self.state.node_to_process:
            finished_node = content["name"]
            if can_pnts_be_written(
                finished_node,
                finished_node,
                self.state.node_to_process,
                self.state.processing_nodes,
            ):
                self.state.waiting_writing_nodes.pop(-1)
                self.state.pnts_to_writing.append(finished_node)

                for i in range(len(self.state.waiting_writing_nodes) - 1, -1, -1):
                    candidate = self.state.waiting_writing_nodes[i]

                    if can_pnts_be_written(
                        candidate,
                        finished_node,
                        self.state.node_to_process,
                        self.state.processing_nodes,
                    ):
                        self.state.waiting_writing_nodes.pop(i)
                        self.state.pnts_to_writing.append(candidate)

        else:
            for c in self.state.waiting_writing_nodes:
                self.state.pnts_to_writing.append(c)
            self.state.waiting_writing_nodes.clear()

    def send_pnts_to_write(self) -> None:
        node_name = self.state.pnts_to_writing.pop()
        data = self.node_store.get(node_name)
        if not data:
            raise ValueError(f"{node_name!r} has no data")

        self.zmq_manager.send_to_process(
            [CommandType.WRITE_PNTS.value, node_name, data]
        )
        self.node_store.remove(node_name)
        self.state.number_of_writing_jobs += 1

    def send_points_to_process(self, now: float) -> None:
        potentials = sorted(
            # a key (=task) can be in node_to_process and processing_nodes if the node isn't completely processed
            [
                (node, task)
                for node, task in self.state.node_to_process.items()  # task: [data...], point_count
                if node not in self.state.processing_nodes
            ],
            key=lambda task: -len(task[0]),
        )  # sort by node name size, the root nodes first

        while self.zmq_manager.can_queue_more_jobs() and potentials:
            target_count = 100_000
            job_list = []
            count = 0
            idx = len(potentials) - 1
            while count < target_count and idx >= 0:
                name, (tasks, point_count) = potentials[idx]
                count += point_count
                job_list += [
                    name,
                    self.node_store.get(name),
                    struct.pack(">I", len(tasks)),
                ] + tasks
                del potentials[idx]

                del self.state.node_to_process[name]
                self.state.processing_nodes[name] = (len(tasks), point_count, now)

                if name in self.state.waiting_writing_nodes:
                    self.state.waiting_writing_nodes.pop(
                        self.state.waiting_writing_nodes.index(name)
                    )
                idx -= 1

            if job_list:
                self.zmq_manager.send_to_process(
                    [CommandType.PROCESS_JOBS.value] + job_list
                )

    def send_file_to_read(self) -> None:
        if self.verbose >= 1:
            print(f"Submit next portion {self.state.point_cloud_file_parts[-1]}")
        file, portion = self.state.point_cloud_file_parts.pop()
        self.state.points_in_progress += portion[1] - portion[0]

        self.zmq_manager.send_to_process(
            [
                CommandType.READ_FILE.value,
                pickle.dumps(
                    {
                        "filename": file,
                        "offset_scale": (
                            -self.avg_min,
                            self.root_scale,
                            self.rotation_matrix[:3, :3].T,
                        ),
                        "portion": portion,
                    }
                ),
            ]
        )

        self.state.number_of_reading_jobs += 1

    def write_tileset(self) -> None:
        # compute the tile transform matrix
        transform = np.linalg.inv(self.rotation_matrix)
        transform = np.dot(transform, make_scale_matrix(1.0 / self.root_scale[0]))
        transform = np.dot(make_translation_matrix(self.avg_min), transform)

        # Create the root tile by sampling (or taking all points?) of child nodes
        root_node = Node(b"", self.root_aabb, self.root_spacing * 2)
        root_node.children = []
        inv_aabb_size = (
            1.0 / np.maximum(MIN_POINT_SIZE, self.root_aabb[1] - self.root_aabb[0])
        ).astype(np.float32)
        for child_num in range(8):
            tile_path = node_name_to_path(
                self.out_folder, str(child_num).encode("ascii"), ".pnts"
            )
            if tile_path.exists():
                tile_content = read_binary_tile_content(tile_path)

                fth = tile_content.body.feature_table.header
                xyz = tile_content.body.feature_table.body.position.view(
                    np.float32
                ).reshape((fth.points_length, 3))
                if self.rgb:
                    tile_color = tile_content.body.feature_table.body.color
                    if tile_color is None:
                        raise TilerException(
                            "tile_content.body.feature_table.body.color shouldn't be None here. Seems to be a py3dtiles issue."
                        )
                    if tile_color.dtype != np.uint8:
                        raise TilerException(
                            "The data type of tile_content.body.feature_table.body.color must be np.uint8. Seems to be a py3dtiles issue."
                        )
                    rgb = tile_color.reshape((fth.points_length, 3)).astype(
                        np.uint8, copy=False
                    )  # the astype is used for typing
                else:
                    rgb = np.zeros(xyz.shape, dtype=np.uint8)

                if self.classification:
                    classification = (
                        tile_content.body.batch_table.get_binary_property(
                            "Classification"
                        )
                        .astype(np.uint8)
                        .reshape(-1, 1)
                    )
                else:
                    classification = np.zeros((fth.points_length, 1), dtype=np.uint8)

                root_node.grid.insert(
                    self.root_aabb[0].astype(np.float32),
                    inv_aabb_size,
                    xyz.copy(),
                    rgb,
                    classification,
                )

        pnts_writer.node_to_pnts(
            b"", root_node, self.out_folder, self.rgb, self.classification
        )

        pool_executor = concurrent.futures.ProcessPoolExecutor()
        root_tileset = node_from_name(
            b"", self.root_aabb, self.root_spacing
        ).to_tileset(self.out_folder, self.root_scale, None, 0, pool_executor)
        pool_executor.shutdown()

        if root_tileset is None:
            raise RuntimeError(
                "root_tileset cannot be None here. This is likely a tiler bug."
            )

        root_tileset["transform"] = transform.T.reshape(16).tolist()
        root_tileset[
            "refine"
        ] = "REPLACE"  # The root tile is in the "REPLACE" refine mode
        if "children" in root_tileset:
            # And children with the "ADD" refine mode
            # No need to set this property in their children, they will take the parent value if it is not present
            for child in root_tileset["children"]:
                child["refine"] = "ADD"

        geometric_error = (
            np.linalg.norm(self.root_aabb[1] - self.root_aabb[0]) / self.root_scale[0]
        )

        tileset = {
            "asset": {
                "version": "1.0",
            },
            "geometricError": geometric_error,
            "root": root_tileset,
        }

        tileset_path = self.out_folder / "tileset.json"
        with tileset_path.open("w") as f:
            f.write(json.dumps(tileset))

    def print_summary(self) -> None:
        print("Summary:")
        print(" - points to process: {}".format(self.file_info["point_count"]))
        print(f" - offset to use: {self.avg_min}")
        print(f" - root spacing: {self.root_spacing / self.root_scale[0]}")
        print(f" - root aabb: {self.root_aabb}")
        print(f" - original aabb: {self.original_aabb}")
        print(f" - scale: {self.root_scale}")

    def draw_graph(self) -> None:
        import pygal

        dateline = pygal.XY(x_label_rotation=25, secondary_range=(0, 100))
        for pid in self.zmq_manager.activities:
            activity = []
            activity_csv_path = Path(f"activity.{pid}.csv")
            i = (
                len(self.zmq_manager.activities)
                - self.zmq_manager.activities.index(pid)
                - 1
            )
            # activities.index(pid) =
            with activity_csv_path.open() as f:
                content = f.read().split("\n")
                for line in content[1:]:
                    line_item = line.split(",")
                    if line_item[0]:
                        ts = float(line_item[0])
                        value = int(line_item[1]) / 3.0
                        activity.append((ts, i + value * 0.9))

            activity_csv_path.unlink()
            if activity:
                activity.append((activity[-1][0], activity[0][1]))
                activity.append(activity[0])
                dateline.add(str(pid), activity, show_dots=False, fill=True)

        progression_csv_path = Path("progression.csv")
        with progression_csv_path.open() as f:
            values = []
            for line in f.read().split("\n"):
                if line:
                    line_item = line.split(",")
                    values += [(float(line_item[0]), float(line_item[1]))]

        progression_csv_path.unlink()
        dateline.add(
            "progression",
            values,
            show_dots=False,
            secondary=True,
            stroke_style={"width": 2, "color": "black"},
        )

        dateline.render_to_file("activity.svg")

    def print_debug(self, now: float) -> None:
        if self.verbose >= 3:
            print("{:^16}|{:^8}|{:^8}".format("Name", "Points", "Seconds"))
            for name, v in self.state.processing_nodes.items():
                print(
                    "{:^16}|{:^8}|{:^8}".format(
                        "{} ({})".format(name.decode("ascii"), v[0]),
                        v[1],
                        round(now - v[2], 1),
                    )
                )
            print("")
            print("Pending:")
            print(
                " - root: {} / {}".format(
                    len(self.state.point_cloud_file_parts),
                    self.state.initial_portion_count,
                )
            )
            print(
                " - other: {} files for {} nodes".format(
                    sum([len(f[0]) for f in self.state.node_to_process.values()]),
                    len(self.state.node_to_process),
                )
            )
            print("")

        elif self.verbose >= 2:
            self.state.print_debug()

        if self.verbose >= 1:
            print(
                "{} % points in {} sec [{} tasks, {} nodes, {} wip]".format(
                    round(
                        100
                        * self.state.processed_points
                        / self.file_info["point_count"],
                        2,
                    ),
                    round(now, 1),
                    self.jobs - len(self.zmq_manager.idle_clients),
                    len(self.state.processing_nodes),
                    self.state.points_in_progress,
                )
            )

        elif self.verbose >= 0:
            percent = round(
                100 * self.state.processed_points / self.file_info["point_count"], 2
            )
            time_left = (100 - percent) * now / (percent + 0.001)
            print(
                f"\r{percent:>6} % in {round(now)} sec [est. time left: {round(time_left)} sec] ",
                end="",
                flush=True,
            )


def init_parser(
    subparser: "argparse._SubParsersAction[Any]",
) -> argparse.ArgumentParser:
    parser: argparse.ArgumentParser = subparser.add_parser(
        "convert",
        help="Convert input 3D data to a 3dtiles tileset.",
        formatter_class=argparse.ArgumentDefaultsHelpFormatter,
    )
    parser.add_argument(
        "files",
        nargs="+",
        help="Filenames to process. The file must use the .las, .laz (lastools must be installed), .xyz or .ply format.",
    )
    parser.add_argument(
        "--out",
        type=str,
        help="The folder where the resulting tileset will be written.",
        default="./3dtiles",
    )
    parser.add_argument(
        "--overwrite",
        help="Delete and recreate the output folder if it already exists. WARNING: be careful, there will be no confirmation!",
        action="store_true",
    )
    parser.add_argument(
        "--jobs",
        help="The number of parallel jobs to start. Defaults to the number of CPUs.",
        default=cpu_count(),
        type=int,
    )
    parser.add_argument(
        "--cache_size",
        help="Cache size in MB. Defaults to available memory / 10.",
        default=int(TOTAL_MEMORY_MB / 10),
        type=int,
    )
    parser.add_argument(
        "--srs_out",
        help="SRS to convert the output with (numeric part of the EPSG code)",
        type=str,
    )
    parser.add_argument(
        "--srs_in", help="Override input SRS (numeric part of the EPSG code)", type=str
    )
    parser.add_argument(
        "--fraction",
        help="Percentage of the pointcloud to process.",
        default=100,
        type=int,
    )
    parser.add_argument(
        "--benchmark", help="Print summary at the end of the process", type=str
    )
    parser.add_argument(
        "--no-rgb", help="Don't export rgb attributes", action="store_true"
    )
    parser.add_argument(
        "--classification",
        help="Export classification attributes",
        action="store_true",
    )
    parser.add_argument(
        "--graph", help="Produce debug graphs (requires pygal)", action="store_true"
    )
    parser.add_argument("--color_scale", help="Force color scale", type=float)
    parser.add_argument(
        "--force-srs-in",
        help="Force the input srs even if the srs in the input files are different. CAUTION, use only if you know what you are doing.",
        action="store_true",
    )

    return parser


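# Illustrative CLI invocation (not part of the original module), matching the arguments
# declared above; file names and EPSG codes are placeholders:
#
#     py3dtiles convert cloud_1.las cloud_2.las --out ./3dtiles \
#         --srs_in 3946 --srs_out 4978 --jobs 4 --overwrite
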
def main(args: argparse.Namespace) -> None:
    try:
        return convert(
            args.files,
            outfolder=args.out,
            overwrite=args.overwrite,
            jobs=args.jobs,
            cache_size=args.cache_size,
            crs_out=str_to_CRS(args.srs_out),
            crs_in=str_to_CRS(args.srs_in),
            force_crs_in=args.force_srs_in,
            fraction=args.fraction,
            benchmark=args.benchmark,
            rgb=not args.no_rgb,
            classification=args.classification,
            graph=args.graph,
            color_scale=args.color_scale,
            verbose=args.verbose,
        )
    except SrsInMissingException:
        print(
            "No SRS information in input files, you should specify it with --srs_in",
            file=sys.stderr,
        )
        sys.exit(EXIT_CODES.MISSING_SRS_IN_FILE.value)
    except FormatSupportMissingException as e:
        print(e, file=sys.stderr)
        sys.exit(EXIT_CODES.UNSPECIFIED_ERROR.value)