Source code for py3dtiles.convert

import argparse
import os
import pickle
import shutil
import sys
import tempfile
import time
import traceback
from multiprocessing import Process
from pathlib import Path
from time import sleep
from typing import Any, Optional, Union

import zmq
from pyproj import CRS

from py3dtiles.constants import CPU_COUNT, DEFAULT_CACHE_SIZE, EXIT_CODES
from py3dtiles.exceptions import (
    Py3dtilesException,
    SrsInMissingException,
    TilerException,
    TilerNotFoundException,
    WorkerException,
)
from py3dtiles.merger import create_tileset_from_root_tiles
from py3dtiles.tilers.base_tiler import Tiler
from py3dtiles.tilers.base_tiler.message_type import ManagerMessage, WorkerMessageType
from py3dtiles.tilers.base_tiler.tiler_worker import TilerWorker
from py3dtiles.tilers.point.point_tiler import PointTiler
from py3dtiles.utils import mkdir_or_raise, str_to_CRS

# IPC protocol is not supported on Windows
if os.name == "nt":
    URI = "tcp://127.0.0.1:0"
else:
    # Generate a unique name for this socket
    tmpdir = tempfile.TemporaryDirectory()
    URI = f"ipc://{tmpdir.name}/py3dtiles.sock"


META_TILER_NAME = b"meta"


def _worker_target(
    worker_tilers: dict[str, TilerWorker[Any]],
    verbosity: int,
    uri: bytes,
) -> None:
    return _WorkerDispatcher(
        worker_tilers,
        verbosity,
        uri,
    ).run()


class _WorkerDispatcher:
    """
    This class waits from jobs commands from the Zmq socket.
    """

    skt: zmq.Socket[bytes]

    def __init__(
        self,
        worker_tilers: dict[str, TilerWorker[Any]],
        verbosity: int,
        uri: bytes,
    ) -> None:
        self.worker_tilers = worker_tilers
        self.verbosity = verbosity
        self.uri = uri

        # Socket to receive messages on
        self.context = zmq.Context()

    def run(self) -> None:
        self.skt = self.context.socket(zmq.DEALER)

        self.skt.connect(self.uri)  # type: ignore [arg-type]

        startup_time = time.time()
        idle_time = 0.0

        # notify we're ready
        self.skt.send_multipart([WorkerMessageType.REGISTER.value])

        while True:
            try:
                before = time.time() - startup_time
                self.skt.poll()
                after = time.time() - startup_time

                idle_time += after - before

                message = self.skt.recv_multipart()
                tiler_name = message[1].decode()
                command = message[2]
                content = message[3:]

                delta = time.time() - pickle.loads(message[0])
                if delta > 0.01 and self.verbosity >= 1:
                    print(
                        f"{os.getpid()} / {round(after, 2)} : Delta time: {round(delta, 3)}"
                    )

                if command == ManagerMessage.SHUTDOWN.value:
                    break  # ack
                else:
                    for answer in self.worker_tilers[tiler_name].execute(
                        command, content
                    ):
                        self.skt.send_multipart(answer, copy=False)

                # notify we're idle
                self.skt.send_multipart([WorkerMessageType.IDLE.value])
            except Exception as e:
                # print the traceback locally so the stack trace is not lost
                traceback.print_exc()
                error_message = f"{e.__class__.__module__}.{e.__class__.__name__}: {e}"
                self.skt.send_multipart(
                    [WorkerMessageType.ERROR.value, error_message.encode()]
                )

        if self.verbosity >= 1:
            print(
                "total: {} sec, idle: {}".format(
                    round(time.time() - startup_time, 1), round(idle_time, 1)
                )
            )

        self.skt.send_multipart([WorkerMessageType.HALTED.value])


# Manager
class _ZmqManager:
    """
    This class sends messages to the workers.
    We can also request general status.
    """

    def __init__(
        self,
        number_of_jobs: int,
        worker_tilers: dict[str, TilerWorker[Any]],
        verbosity: int,
    ) -> None:
        """
        For the process_args argument, see the init method of Worker
        to get the list of needed parameters.
        """
        self.context = zmq.Context()

        self.number_of_jobs = number_of_jobs

        self.socket = self.context.socket(zmq.ROUTER)
        self.socket.bind(URI)
        # Reading LAST_ENDPOINT is only needed with TCP, to recover the URI
        # with the port that was actually opened
        self.uri = self.socket.getsockopt(zmq.LAST_ENDPOINT)
        if not isinstance(self.uri, bytes):
            raise RuntimeError(
                "The uri returned by self.socket.getsockopt should be bytes."
            )

        self.processes = [
            Process(
                target=_worker_target,
                args=(worker_tilers, verbosity, self.uri),
            )
            for _ in range(number_of_jobs)
        ]
        for p in self.processes:
            p.start()

        self.activities = [p.pid for p in self.processes]
        self.clients: set[bytes] = set()
        self.idle_clients: set[bytes] = set()

        self.killing_processes = False
        self.number_processes_killed = 0
        self.time_waiting_an_idle_process = 0.0

    def all_clients_registered(self) -> bool:
        return len(self.clients) == self.number_of_jobs

    def send_to_process(self, message: list[bytes]) -> None:
        if not self.idle_clients:
            raise ValueError("idle_clients is empty")
        self.socket.send_multipart(
            [self.idle_clients.pop(), pickle.dumps(time.time())] + message
        )

    def send_to_all_processes(self, message: list[bytes]) -> None:
        if len(self.clients) == 0:
            raise ValueError("No registered clients")
        for client in self.clients:
            self.socket.send_multipart([client, pickle.dumps(time.time())] + message)

    def send_to_all_idle_processes(self, message: list[bytes]) -> None:
        if not self.idle_clients:
            raise ValueError("idle_clients is empty")
        for client in self.idle_clients:
            self.socket.send_multipart([client, pickle.dumps(time.time())] + message)
        self.idle_clients.clear()

    def can_queue_more_jobs(self) -> bool:
        return len(self.idle_clients) != 0

    def register_client(self, client_id: bytes) -> None:
        if client_id in self.clients:
            print(f"Warning: {client_id!r} already registered")
        else:
            self.clients.add(client_id)
        self.add_idle_client(client_id)

    def add_idle_client(self, client_id: bytes) -> None:
        if client_id in self.idle_clients:
            raise ValueError(f"The client id {client_id!r} is already in idle_clients")
        self.idle_clients.add(client_id)

    def are_all_processes_idle(self) -> bool:
        return len(self.idle_clients) == self.number_of_jobs

    def are_all_processes_killed(self) -> bool:
        return self.number_processes_killed == self.number_of_jobs

    def shutdown_all_processes(self) -> None:
        self.send_to_all_processes([META_TILER_NAME, ManagerMessage.SHUTDOWN.value])
        self.killing_processes = True

    def join_all_processes(self) -> None:
        for p in self.processes:
            p.join()


def convert(
    files: Union[list[Union[str, Path]], str, Path],
    outfolder: Union[str, Path] = "./3dtiles",
    overwrite: bool = False,
    jobs: int = CPU_COUNT,
    cache_size: int = DEFAULT_CACHE_SIZE,
    crs_out: Optional[CRS] = None,
    crs_in: Optional[CRS] = None,
    force_crs_in: bool = False,
    pyproj_always_xy: bool = False,
    benchmark: Optional[str] = None,
    rgb: bool = True,
    extra_fields: Optional[list[str]] = None,
    color_scale: Optional[float] = None,
    use_process_pool: bool = True,
    verbose: int = False,
) -> None:
    """
    Convert the input files into 3dtiles.

    :param files: Filenames to process. The files must use the .las, .laz, .xyz or .ply format.
    :param outfolder: The folder where the resulting tileset will be written.
    :param overwrite: Overwrite the output folder if it already exists.
    :param jobs: The number of parallel jobs to start. Defaults to the number of CPUs.
    :param cache_size: Cache size in MB. Defaults to available memory / 10.
    :param crs_out: CRS to convert the output to.
    :param crs_in: Set a default input CRS.
    :param force_crs_in: Force every input CRS to be `crs_in`, even if not null.
    :param pyproj_always_xy: When converting from one CRS to another, pass the `always_xy` flag to pyproj. This is useful if your data is in a CRS whose definition specifies an axis order other than easting/northing, but your data still has the easting component in the first field (often named X or longitude). See https://pyproj4.github.io/pyproj/stable/gotchas.html#axis-order-changes-in-proj-6 for more information.
    :param benchmark: Print a summary at the end of the process.
    :param rgb: Export rgb attributes.
    :param extra_fields: Extra field names to include in this conversion. These field names must be present in each input file. VLRs and EVLRs are currently not supported for las files.
    :param color_scale: Scale the colors by the specified amount. Useful to lighten or darken black point clouds that only carry intensity.

    :raises SrsInMissingException: if py3dtiles couldn't find SRS information in the input files and crs_in is not specified
    :raises SrsInMixinException: if the input files have different CRS
    """
    files = [files] if isinstance(files, (str, Path)) else files
    paths = [Path(file) for file in files]

    tilers: list[Tiler[Any, Any]] = [
        PointTiler(
            crs_in,
            crs_out,
            force_crs_in,
            pyproj_always_xy,
            rgb,
            color_scale,
            cache_size,
            verbose,
            jobs,
            extra_fields=extra_fields,
        )
    ]

    converter = Converter(
        tilers,
        overwrite=overwrite,
        jobs=jobs,
        cache_size=cache_size,
        crs_out=crs_out,
        crs_in=crs_in,
        force_crs_in=force_crs_in,
        pyproj_always_xy=pyproj_always_xy,
        benchmark=benchmark,
        use_process_pool=use_process_pool,
        verbose=verbose,
    )

    try:
        return converter.convert(paths, Path(outfolder), overwrite=overwrite)
    except TilerNotFoundException:
        print("ERROR: support not found for files", files)
        print(
            "Please check https://py3dtiles.org/v9.0.0/install.html#file-formats-support"
        )
        sys.exit(1)
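
An illustrative call of this public entry point ("input.las" is a hypothetical file path):

from pathlib import Path
from pyproj import CRS

from py3dtiles.convert import convert

convert(
    "input.las",
    outfolder=Path("./3dtiles"),
    overwrite=True,
    crs_out=CRS.from_epsg(4978),  # reproject the output to EPSG:4978
)
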
class Converter:
    """
    The Converter class allows a fine-grained conversion process with custom Tilers.

    It is built with a list of tilers instead of files. Each tiler is responsible
    for generating a hierarchy of tiles. The process then builds a tileset that
    regroups all the tilesets generated by the individual tilers.

    :param jobs: The number of parallel jobs to start. Defaults to the number of CPUs.
    :param cache_size: Cache size in MB. Defaults to available memory / 10.
    :param crs_out: CRS to convert the output to.
    :param crs_in: Set a default input CRS.
    :param force_crs_in: Force every input CRS to be `crs_in`, even if not null.
    :param pyproj_always_xy: When converting from one CRS to another, pass the `always_xy` flag to pyproj. This is useful if your data is in a CRS whose definition specifies an axis order other than easting/northing, but your data still has the easting component in the first field (often named X or longitude). See https://pyproj4.github.io/pyproj/stable/gotchas.html#axis-order-changes-in-proj-6 for more information.
    :param benchmark: Print a summary at the end of the process.
    """

    def __init__(
        self,
        tilers: list[Tiler[Any, Any]],
        overwrite: bool = False,
        jobs: int = CPU_COUNT,
        cache_size: int = DEFAULT_CACHE_SIZE,
        crs_out: Optional[CRS] = None,
        crs_in: Optional[CRS] = None,
        force_crs_in: bool = False,
        pyproj_always_xy: bool = False,
        benchmark: Optional[str] = None,
        use_process_pool: bool = True,
        verbose: int = False,
    ) -> None:
        self.tilers = tilers
        self.jobs = jobs
        self.verbose = verbose
        self.benchmark = benchmark
        self.use_process_pool = use_process_pool

    def _assign_file_to_tilers(self, files: list[Path]) -> dict[str, list[Path]]:
        files_by_tiler_names: dict[str, list[Path]] = {}
        tiler_not_found_files: list[Path] = []
        for file in files:
            for tiler in self.tilers:
                if tiler.supports(file):
                    if tiler.name not in files_by_tiler_names:
                        files_by_tiler_names[tiler.name] = []
                    files_by_tiler_names[tiler.name].append(file)
                    break
            else:
                tiler_not_found_files.append(file)

        if len(tiler_not_found_files) > 0:
            raise TilerNotFoundException(tiler_not_found_files)

        return files_by_tiler_names

    def convert(
        self,
        files: Union[Path, list[Path]],
        out_folder: Path,
        overwrite: bool = False,
    ) -> None:
        """
        Convert some files.

        :param files: Filenames to process. The files must use the .las, .laz, .xyz or .ply format.
        :param out_folder: The folder where the resulting tileset will be written.
        :param overwrite: Overwrite the output folder if it already exists.

        :raises SrsInMissingException: if py3dtiles couldn't find SRS information in the input files and crs_in is not specified
        :raises SrsInMixinException: if the input files have different CRS
        """
        mkdir_or_raise(out_folder, overwrite=overwrite)
        working_dir = out_folder / "tmp"
        working_dir.mkdir(parents=True)

        paths = [files] if isinstance(files, Path) else files
        paths_by_tiler_name = self._assign_file_to_tilers(paths)

        worker_tilers: dict[str, TilerWorker[Any]] = {}
        for tiler in self.tilers:
            # check if at least one file would use that tiler
            if tiler.name not in paths_by_tiler_name:
                continue

            if tiler.name in worker_tilers:
                raise TilerException("There are tilers with the same attribute name.")

            try:
                tiler_out_folder = Path(out_folder) / tiler.name
                tiler_out_folder.mkdir(exist_ok=True)
                tiler.initialize(
                    paths_by_tiler_name[tiler.name],
                    working_dir / str(tiler.name),
                    tiler_out_folder,
                )
            except Py3dtilesException as e:
                shutil.rmtree(out_folder)
                raise e

            worker_tilers[tiler.name] = tiler.get_worker()

        if self.verbose >= 1:
            for tiler in self.tilers:
                if tiler.name not in paths_by_tiler_name:
                    continue
                tiler.print_summary()

        self.zmq_manager = _ZmqManager(
            self.jobs,
            worker_tilers,
            self.verbose,
        )

        startup: float = time.time()

        try:
            root_tiles = []
            for tiler in self.tilers:
                if tiler.name not in paths_by_tiler_name:
                    continue

                while True:
                    if (
                        not self.zmq_manager.can_queue_more_jobs()
                        or self.zmq_manager.socket.poll(timeout=0, flags=zmq.POLLIN)
                    ):
                        self._process_message(tiler)

                    # Wait until all processes have registered. If we don't, there
                    # are tricky cases where an exception fires in a worker before
                    # all the workers have registered, which means that not all
                    # workers would receive the shutdown signal.
                    if not self.zmq_manager.all_clients_registered():
                        sleep(0.1)
                        continue

                    if self.zmq_manager.can_queue_more_jobs():
                        for command, data in tiler.get_tasks():
                            self.zmq_manager.send_to_process(
                                [tiler.name.encode("UTF-8"), command] + data
                            )
                            if not self.zmq_manager.can_queue_more_jobs():
                                break

                    # if at this point all processes are idle, there is no work
                    # in progress => we're done
                    if self.zmq_manager.are_all_processes_idle():
                        break

                    tiler.memory_control()

                tiler.validate()

                if self.verbose >= 1:
                    print("Writing 3dtiles")

                root_tiles.append(
                    tiler.get_root_tile(use_process_pool=self.use_process_pool)
                )

                if self.verbose >= 1:
                    print(f"Tiler {tiler.name!r} done")

                if self.benchmark:
                    tiler.benchmark(self.benchmark, startup)

            if self.verbose >= 1:
                print("Merging tilesets")

            for tile in root_tiles:
                # we need to make sure the contents are loaded for the merger
                if tile.has_content():
                    tile.get_or_fetch_content(out_folder)

            tileset = create_tileset_from_root_tiles(root_tiles)
            if tileset.root_tile.has_content():
                tileset.root_tile.write_content(out_folder)
            tileset.write_as_json(out_folder / "tileset.json")
        finally:
            self.zmq_manager.shutdown_all_processes()
            self.zmq_manager.join_all_processes()

            shutil.rmtree(working_dir, ignore_errors=True)

            if self.verbose >= 1:
                print(
                    "destroy", round(self.zmq_manager.time_waiting_an_idle_process, 2)
                )

            self.zmq_manager.context.destroy()

    def _process_message(self, tiler: Tiler[Any, Any]) -> None:
        # Blocking read, but it's fine because either all our child processes are
        # busy, or we know that there's something to read (zmq.POLLIN)
        start = time.time()
        message = self.zmq_manager.socket.recv_multipart()

        client_id = message[0]
        message_type = message[1]
        content = message[2:]

        if message_type == WorkerMessageType.REGISTER.value:
            self.zmq_manager.register_client(client_id)
        elif message_type == WorkerMessageType.IDLE.value:
            self.zmq_manager.add_idle_client(client_id)
            if not self.zmq_manager.can_queue_more_jobs():
                self.zmq_manager.time_waiting_an_idle_process += time.time() - start
        elif message_type == WorkerMessageType.HALTED.value:
            self.zmq_manager.number_processes_killed += 1
        elif message_type == WorkerMessageType.ERROR.value:
            raise WorkerException(
                f"An exception occurred in a worker: {content[0].decode()}"
            )
        else:
            tiler.process_message(message_type, content)
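
The convert() function above is essentially a thin wrapper around this class. A sketch of using Converter directly with a PointTiler, mirroring what convert() does (argument values are illustrative and "input.las" is a hypothetical path):

from pathlib import Path

from py3dtiles.constants import CPU_COUNT, DEFAULT_CACHE_SIZE
from py3dtiles.convert import Converter
from py3dtiles.tilers.point.point_tiler import PointTiler

tiler = PointTiler(
    None,                # crs_in: read the CRS from the input files
    None,                # crs_out: keep the input CRS
    False,               # force_crs_in
    False,               # pyproj_always_xy
    True,                # rgb
    None,                # color_scale
    DEFAULT_CACHE_SIZE,  # cache_size
    0,                   # verbose
    CPU_COUNT,           # jobs
    extra_fields=[],
)

converter = Converter([tiler], jobs=CPU_COUNT)
converter.convert([Path("input.las")], Path("./3dtiles"), overwrite=True)

On success, the output folder contains the merged tileset.json plus one subfolder per tiler (named after tiler.name); the temporary tmp/ working directory is removed in the finally block above.
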
def _init_parser(
    subparser: "argparse._SubParsersAction[Any]",
) -> argparse.ArgumentParser:
    parser: argparse.ArgumentParser = subparser.add_parser(
        "convert",
        help="Convert input 3D data to a 3dtiles tileset.",
        formatter_class=argparse.ArgumentDefaultsHelpFormatter,
    )
    parser.add_argument(
        "files",
        nargs="+",
        help="Filenames to process. The files must use the .las, .laz (lastools must be installed), .xyz or .ply format.",
    )
    parser.add_argument(
        "--out",
        type=str,
        help="The folder where the resulting tileset will be written.",
        default="./3dtiles",
    )
    parser.add_argument(
        "--overwrite",
        help="Delete and recreate the output folder if it already exists. WARNING: be careful, there will be no confirmation!",
        action="store_true",
    )
    parser.add_argument(
        "--jobs",
        help="The number of parallel jobs to start. Defaults to the number of CPUs.",
        default=CPU_COUNT,
        type=int,
    )
    parser.add_argument(
        "--cache_size",
        help="Cache size in MB. Defaults to available memory / 10.",
        default=DEFAULT_CACHE_SIZE,
        type=int,
    )
    parser.add_argument(
        "--srs_out",
        help="SRS to convert the output to (numeric part of the EPSG code)",
        type=str,
    )
    parser.add_argument(
        "--srs_in", help="Override input SRS (numeric part of the EPSG code)", type=str
    )
    parser.add_argument(
        "--benchmark", help="Print a summary at the end of the process", type=str
    )
    parser.add_argument(
        "--no-rgb", help="Don't export rgb attributes", action="store_true"
    )
    parser.add_argument(
        "--extra-fields",
        help="Extra field names present in the source data to include in the resulting tileset. All input files *must* have these fields, with the same data type.",
        action="append",
    )
    parser.add_argument("--color_scale", help="Force color scale", type=float)
    parser.add_argument(
        "--force-srs-in",
        help="Force the input SRS even if the SRS in the input files are different. CAUTION, use only if you know what you are doing.",
        action="store_true",
    )
    parser.add_argument(
        "--disable-processpool",
        help="Disable the use of a process pool when writing 3D tiles. Useful for running in environments lacking shared memory.",
        action="store_true",
    )
    parser.add_argument(
        "--pyproj-always-xy",
        help="When converting from one CRS to another, pass the `always_xy` flag to pyproj. This is useful if your data is in a CRS whose definition specifies an axis order other than easting/northing, but your data still has the easting component in the first field (often named X or longitude). See https://pyproj4.github.io/pyproj/stable/gotchas.html#axis-order-changes-in-proj-6 for more information.",
        action="store_true",
    )

    return parser


def _main(args: argparse.Namespace) -> None:
    try:
        return convert(
            args.files,
            args.out,
            overwrite=args.overwrite,
            jobs=args.jobs,
            cache_size=args.cache_size,
            crs_out=str_to_CRS(args.srs_out),
            crs_in=str_to_CRS(args.srs_in),
            force_crs_in=args.force_srs_in,
            pyproj_always_xy=args.pyproj_always_xy,
            benchmark=args.benchmark,
            rgb=not args.no_rgb,
            extra_fields=[] if args.extra_fields is None else args.extra_fields,
            color_scale=args.color_scale,
            use_process_pool=not args.disable_processpool,
            verbose=args.verbose,
        )
    except SrsInMissingException:
        print(
            "No SRS information in input files, you should specify it with --srs_in",
            file=sys.stderr,
        )
        sys.exit(EXIT_CODES.MISSING_SRS_IN_FILE.value)
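
To tie the CLI flags back to the Python API: assuming the standard py3dtiles entry point, the following sketch is roughly what _main() ends up calling for `py3dtiles convert input.las --out ./3dtiles --srs_out 4978 --no-rgb` ("input.las" is hypothetical):

from py3dtiles.convert import convert
from py3dtiles.utils import str_to_CRS

convert(
    ["input.las"],
    "./3dtiles",
    crs_out=str_to_CRS("4978"),  # --srs_out takes the numeric part of an EPSG code
    rgb=False,                   # --no-rgb
)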