Source code for py3dtiles.convert

import argparse
import os
import pickle
import shutil
import sys
import tempfile
import time
import traceback
from multiprocessing import Process, cpu_count
from pathlib import Path
from time import sleep
from typing import Any, Dict, List, Optional, Set, Union

import psutil
import zmq
from pyproj import CRS

from py3dtiles.constants import EXIT_CODES
from py3dtiles.exceptions import (
    Py3dtilesException,
    SrsInMissingException,
    TilerException,
    WorkerException,
)
from py3dtiles.tilers.base_tiler import Tiler
from py3dtiles.tilers.base_tiler.message_type import ManagerMessage, WorkerMessageType
from py3dtiles.tilers.base_tiler.tiler_worker import TilerWorker
from py3dtiles.tilers.point.point_tiler import PointTiler
from py3dtiles.utils import mkdir_or_raise, str_to_CRS

TOTAL_MEMORY_MB = int(psutil.virtual_memory().total / (1024 * 1024))
DEFAULT_CACHE_SIZE = int(TOTAL_MEMORY_MB / 10)
CPU_COUNT = cpu_count()

# IPC protocol is not supported on Windows
if os.name == "nt":
    URI = "tcp://127.0.0.1:0"
else:
    # Generate a unique name for this socket
    tmpdir = tempfile.TemporaryDirectory()
    URI = f"ipc://{tmpdir.name}/py3dtiles.sock"


META_TILER_NAME = b"meta"


class _WorkerDispatcher(Process):
    """
    This class waits from jobs commands from the Zmq socket.
    """

    skt: zmq.Socket[bytes]

    def __init__(
        self,
        worker_tilers: Dict[bytes, TilerWorker[Any]],
        verbosity: int,
        uri: bytes,
    ) -> None:
        super().__init__()
        self.worker_tilers = worker_tilers
        self.verbosity = verbosity
        self.uri = uri

        # Socket to receive messages on
        self.context = zmq.Context()

    def run(self) -> None:
        self.skt = self.context.socket(zmq.DEALER)

        self.skt.connect(self.uri)  # type: ignore [arg-type]

        startup_time = time.time()
        idle_time = 0.0

        # notify we're ready
        self.skt.send_multipart([WorkerMessageType.REGISTER.value])

        while True:
            try:
                before = time.time() - startup_time
                self.skt.poll()
                after = time.time() - startup_time

                idle_time += after - before

                message = self.skt.recv_multipart()
                tiler_name = message[1]
                command = message[2]
                content = message[3:]

                delta = time.time() - pickle.loads(message[0])
                if delta > 0.01 and self.verbosity >= 1:
                    print(
                        f"{os.getpid()} / {round(after, 2)} : Delta time: {round(delta, 3)}"
                    )

                if command == ManagerMessage.SHUTDOWN.value:
                    break  # ack
                else:
                    self.worker_tilers[tiler_name].execute(self.skt, command, content)

                # notify we're idle
                self.skt.send_multipart([WorkerMessageType.IDLE.value])
            except Exception as e:
                traceback.print_exc()
                error_message = f"{e.__class__.__module__}.{e.__class__.__name__}: {e}"
                self.skt.send_multipart(
                    [WorkerMessageType.ERROR.value, error_message.encode()]
                )
                # we still print it for stacktraces

        if self.verbosity >= 1:
            print(
                "total: {} sec, idle: {}".format(
                    round(time.time() - startup_time, 1), round(idle_time, 1)
                )
            )

        self.skt.send_multipart([WorkerMessageType.HALTED.value])


# Manager
class _ZmqManager:
    """
    This class sends messages to the workers.
    We can also request general status.
    """

    def __init__(
        self,
        number_of_jobs: int,
        worker_tilers: Dict[bytes, TilerWorker[Any]],
        verbosity: int,
    ) -> None:
        """
        For the process_args argument, see the init method of Worker
        to get the list of needed parameters.
        """
        self.context = zmq.Context()

        self.number_of_jobs = number_of_jobs

        self.socket = self.context.socket(zmq.ROUTER)
        self.socket.bind(URI)
        # Useful only when TCP is used to get the URI with the opened port
        self.uri = self.socket.getsockopt(zmq.LAST_ENDPOINT)
        if not isinstance(self.uri, bytes):
            raise RuntimeError(
                "The uri returned by self.socket.getsockopt should be bytes."
            )

        self.processes = [
            _WorkerDispatcher(worker_tilers, verbosity, self.uri)
            for _ in range(number_of_jobs)
        ]
        for p in self.processes:
            p.start()

        self.activities = [p.pid for p in self.processes]
        self.clients: Set[bytes] = set()
        self.idle_clients: Set[bytes] = set()

        self.killing_processes = False
        self.number_processes_killed = 0
        self.time_waiting_an_idle_process = 0.0

    def all_clients_registered(self) -> bool:
        return len(self.clients) == self.number_of_jobs

    def send_to_process(self, message: List[bytes]) -> None:
        if not self.idle_clients:
            raise ValueError("idle_clients is empty")
        self.socket.send_multipart(
            [self.idle_clients.pop(), pickle.dumps(time.time())] + message
        )

    def send_to_all_processes(self, message: List[bytes]) -> None:
        if len(self.clients) == 0:
            raise ValueError("No registered clients")
        for client in self.clients:
            self.socket.send_multipart([client, pickle.dumps(time.time())] + message)

    def send_to_all_idle_processes(self, message: List[bytes]) -> None:
        if not self.idle_clients:
            raise ValueError("idle_clients is empty")
        for client in self.idle_clients:
            self.socket.send_multipart([client, pickle.dumps(time.time())] + message)
        self.idle_clients.clear()

    def can_queue_more_jobs(self) -> bool:
        return len(self.idle_clients) != 0

    def register_client(self, client_id: bytes) -> None:
        if client_id in self.clients:
            print(f"Warning: {client_id!r} already registered")
        else:
            self.clients.add(client_id)
        self.add_idle_client(client_id)

    def add_idle_client(self, client_id: bytes) -> None:
        if client_id in self.idle_clients:
            raise ValueError(f"The client id {client_id!r} is already in idle_clients")
        self.idle_clients.add(client_id)

    def are_all_processes_idle(self) -> bool:
        return len(self.idle_clients) == self.number_of_jobs

    def are_all_processes_killed(self) -> bool:
        return self.number_processes_killed == self.number_of_jobs

    def shutdown_all_processes(self) -> None:
        self.send_to_all_processes([META_TILER_NAME, ManagerMessage.SHUTDOWN.value])
        self.killing_processes = True

    def join_all_processes(self) -> None:
        for p in self.processes:
            p.join()


[docs] def convert( files: Union[List[Union[str, Path]], str, Path], outfolder: Union[str, Path] = "./3dtiles", overwrite: bool = False, jobs: int = CPU_COUNT, cache_size: int = DEFAULT_CACHE_SIZE, crs_out: Optional[CRS] = None, crs_in: Optional[CRS] = None, force_crs_in: bool = False, benchmark: Optional[str] = None, rgb: bool = True, classification: bool = True, intensity: bool = True, color_scale: Optional[float] = None, verbose: int = False, ) -> None: """ Convert the input dataset into 3dtiles. For the argument list and their effects, please see :py:class:`.Converter`. :param files: Filenames to process. The file must use the .las, .laz, .xyz or .ply format. :param outfolder: The folder where the resulting tileset will be written. :param overwrite: Overwrite the ouput folder if it already exists. :param jobs: The number of parallel jobs to start. Default to the number of cpu. :param cache_size: Cache size in MB. Default to available memory / 10. :param crs_out: CRS to convert the output with :param crs_in: Set a default input CRS :param force_crs_in: Force every input CRS to be `crs_in`, even if not null :param benchmark: Print summary at the end of the process :param rgb: Export rgb attributes. :param classification: Export classification attribute. :param intensity: Export intensity attributes. This support is currently limited to unsigned 8 bits integer for ply files, and to integers for xyz files. :param color_scale: Scale the color with the specified amount. Useful to lighten or darken black pointclouds with only intensity. :raises SrsInMissingException: if py3dtiles couldn't find srs informations in input files and srs_in is not specified :raises SrsInMixinException: if the input files have different CRS """ converter = _Convert( files, outfolder=outfolder, overwrite=overwrite, jobs=jobs, cache_size=cache_size, crs_out=crs_out, crs_in=crs_in, force_crs_in=force_crs_in, benchmark=benchmark, rgb=rgb, classification=classification, intensity=intensity, color_scale=color_scale, verbose=verbose, ) return converter.convert()
class _Convert: def __init__( self, files: Union[List[Union[str, Path]], str, Path], outfolder: Union[str, Path] = "./3dtiles", overwrite: bool = False, jobs: int = CPU_COUNT, cache_size: int = DEFAULT_CACHE_SIZE, crs_out: Optional[CRS] = None, crs_in: Optional[CRS] = None, force_crs_in: bool = False, benchmark: Optional[str] = None, rgb: bool = True, classification: bool = True, intensity: bool = True, color_scale: Optional[float] = None, verbose: int = False, ) -> None: """ :param files: Filenames to process. The file must use the .las, .laz, .xyz or .ply format. :param outfolder: The folder where the resulting tileset will be written. :param overwrite: Overwrite the ouput folder if it already exists. :param jobs: The number of parallel jobs to start. Default to the number of cpu. :param cache_size: Cache size in MB. Default to available memory / 10. :param crs_out: CRS to convert the output with :param crs_in: Set a default input CRS :param force_crs_in: Force every input CRS to be `crs_in`, even if not null :param benchmark: Print summary at the end of the process :param rgb: Export rgb attributes. :param classification: Export classification attribute. :param intensity: Export intensity attribute. :param color_scale: Scale the color with the specified amount. Useful to lighten or darken black pointclouds with only intensity. :raises SrsInMissingException: if py3dtiles couldn't find srs informations in input files and srs_in is not specified :raises SrsInMixinException: if the input files have different CRS """ # create folder self.out_folder = Path(outfolder) mkdir_or_raise(self.out_folder, overwrite=overwrite) self.tilers = [ PointTiler( self.out_folder, files, crs_in, force_crs_in, rgb, classification, intensity, color_scale, cache_size, verbose, ) ] self.jobs = jobs self.verbose = verbose self.benchmark = benchmark self.working_dir = self.out_folder / "tmp" self.working_dir.mkdir(parents=True) worker_tilers: Dict[bytes, TilerWorker[Any]] = {} for tiler in self.tilers: if tiler.name in worker_tilers: raise TilerException("There are tilers with the same attribute name.") try: tiler.initialization( crs_out, self.working_dir / str(tiler.name), self.jobs ) except Py3dtilesException as e: shutil.rmtree(self.out_folder) raise e worker_tilers[tiler.name] = tiler.get_worker() if self.verbose >= 1: for tiler in self.tilers: tiler.print_summary() self.zmq_manager = _ZmqManager( self.jobs, worker_tilers, self.verbose, ) def convert(self) -> None: """convert Convert pointclouds (xyz, las or laz) to 3dtiles tileset containing pnts node """ startup: float = time.time() try: for tiler in self.tilers: while True: now = time.time() - startup at_least_one_job_ended = False if ( not self.zmq_manager.can_queue_more_jobs() or self.zmq_manager.socket.poll(timeout=0, flags=zmq.POLLIN) ): at_least_one_job_ended = self.process_message(tiler) # we wait for all processes/threads to register # if we don't there are tricky cases where an exception fires in a worker before all the workers registered, which means that not all workers will receive the shutdown signal if not self.zmq_manager.all_clients_registered(): sleep(0.1) continue if self.zmq_manager.can_queue_more_jobs(): for command, data in tiler.get_tasks(startup): self.zmq_manager.send_to_process( [PointTiler.name, command] + data ) if not self.zmq_manager.can_queue_more_jobs(): break # if at this point we have no work in progress => we're done if self.zmq_manager.are_all_processes_idle(): break if at_least_one_job_ended: tiler.print_debug( now, self.jobs, len(self.zmq_manager.idle_clients) ) tiler.memory_control() tiler.validate_binary_data() if self.verbose >= 1: print("Writing 3dtiles") tiler.write_tileset() shutil.rmtree(self.working_dir / str(tiler.name), ignore_errors=True) if self.verbose >= 1: print(f"Tiler {tiler.name!r} done") if self.benchmark: tiler.benchmark(self.benchmark, startup) finally: self.zmq_manager.shutdown_all_processes() self.zmq_manager.join_all_processes() if self.verbose >= 1: print( "destroy", round(self.zmq_manager.time_waiting_an_idle_process, 2) ) self.zmq_manager.context.destroy() def process_message(self, tiler: Tiler[Any, Any]) -> bool: at_least_one_job_ended = False # Blocking read but it's fine because either all our child processes are busy # or we know that there's something to read (zmq.POLLIN) start = time.time() message = self.zmq_manager.socket.recv_multipart() client_id = message[0] return_type = message[1] content = message[2:] if return_type == WorkerMessageType.REGISTER.value: self.zmq_manager.register_client(client_id) elif return_type == WorkerMessageType.IDLE.value: self.zmq_manager.add_idle_client(client_id) if not self.zmq_manager.can_queue_more_jobs(): self.zmq_manager.time_waiting_an_idle_process += time.time() - start elif return_type == WorkerMessageType.HALTED.value: self.zmq_manager.number_processes_killed += 1 elif return_type == WorkerMessageType.ERROR.value: raise WorkerException( f"An exception occurred in a worker: {content[0].decode()}" ) else: at_least_one_job_ended = tiler.process_message(return_type, content) return at_least_one_job_ended def _init_parser( subparser: "argparse._SubParsersAction[Any]", ) -> argparse.ArgumentParser: parser: argparse.ArgumentParser = subparser.add_parser( "convert", help="Convert input 3D data to a 3dtiles tileset.", formatter_class=argparse.ArgumentDefaultsHelpFormatter, ) parser.add_argument( "files", nargs="+", help="Filenames to process. The file must use the .las, .laz (lastools must be installed), .xyz or .ply format.", ) parser.add_argument( "--out", type=str, help="The folder where the resulting tileset will be written.", default="./3dtiles", ) parser.add_argument( "--overwrite", help="Delete and recreate the ouput folder if it already exists. WARNING: be careful, there will be no confirmation!", action="store_true", ) parser.add_argument( "--jobs", help="The number of parallel jobs to start. Default to the number of cpu.", default=cpu_count(), type=int, ) parser.add_argument( "--cache_size", help="Cache size in MB. Default to available memory / 10.", default=int(TOTAL_MEMORY_MB / 10), type=int, ) parser.add_argument( "--srs_out", help="SRS to convert the output with (numeric part of the EPSG code)", type=str, ) parser.add_argument( "--srs_in", help="Override input SRS (numeric part of the EPSG code)", type=str ) parser.add_argument( "--benchmark", help="Print summary at the end of the process", type=str ) parser.add_argument( "--no-rgb", help="Don't export rgb attributes", action="store_true" ) parser.add_argument( "--classification", help="Export classification attributes", action="store_true" ) parser.add_argument( "--intensity", help="Export intensity attributes. This support is currently limited to unsigned 8 bits integer for ply files, and to integers for xyz files.", action="store_true", ) parser.add_argument("--color_scale", help="Force color scale", type=float) parser.add_argument( "--force-srs-in", help="Force the input srs even if the srs in the input files are different. CAUTION, use only if you know what you are doing.", action="store_true", ) return parser def _main(args: argparse.Namespace) -> None: try: return convert( args.files, outfolder=args.out, overwrite=args.overwrite, jobs=args.jobs, cache_size=args.cache_size, crs_out=str_to_CRS(args.srs_out), crs_in=str_to_CRS(args.srs_in), force_crs_in=args.force_srs_in, benchmark=args.benchmark, rgb=not args.no_rgb, classification=args.classification, intensity=args.intensity, color_scale=args.color_scale, verbose=args.verbose, ) except SrsInMissingException: print( "No SRS information in input files, you should specify it with --srs_in", file=sys.stderr, ) sys.exit(EXIT_CODES.MISSING_SRS_IN_FILE.value)