Source code for py3dtiles.tilers.point.point_tiler_worker

import os
import pickle
import struct
import time
from pathlib import PurePath
from typing import TYPE_CHECKING

import lz4.frame as gzip
import zmq

from py3dtiles.tilers.base_tiler import TilerWorker
from py3dtiles.tilers.point.node import DummyNode
from py3dtiles.utils import READER_MAP

from .node import NodeCatalog, NodeProcess
from .pnts import pnts_writer
from .point_message_type import PointManagerMessage, PointWorkerMessageType
from .point_shared_metadata import PointSharedMetadata

if TYPE_CHECKING:
    from py3dtiles.tilers.point.node.node import DummyNodeDictType


class PointTilerWorker(TilerWorker[PointSharedMetadata]):
    """Worker side of the point tiler.

    Receives commands over a ZeroMQ socket, runs the matching step (reading an
    input file, processing node jobs, or writing ``.pnts`` files) and reports
    the outcome back on the same socket.
    """

    def execute(
        self, skt: zmq.Socket[bytes], command: bytes, content: list[bytes]
    ) -> None:
        """Dispatch ``command`` to the matching ``execute_*`` handler.

        :param skt: socket used by the handlers to send their answers.
        :param command: raw value of a ``PointManagerMessage`` member.
        :param content: message frames that accompany the command. For
            ``WRITE_PNTS``, ``content[0]`` is the node name and ``content[1]``
            the payload.
        :raises NotImplementedError: if ``command`` is not one of the
            commands handled here.
        """
        if command == PointManagerMessage.READ_FILE.value:
            self.execute_read_file(skt, content)
        elif command == PointManagerMessage.PROCESS_JOBS.value:
            self.execute_process_jobs(skt, content)
        elif command == PointManagerMessage.WRITE_PNTS.value:
            self.execute_write_pnts(skt, content[1], content[0])
        else:
            raise NotImplementedError(f"Unknown command {command!r}")

    def execute_read_file(self, skt: zmq.Socket[bytes], content: list[bytes]) -> None:
        """Read a portion of an input file and emit one task per chunk.

        ``content[0]`` is a pickled mapping providing the ``"filename"``,
        ``"offset_scale"`` and ``"portion"`` entries. The reader is selected
        from ``READER_MAP`` using the lower-cased file extension. Every chunk
        yielded by the reader is sent as a ``NEW_TASK`` message with an empty
        node name; a final ``READ`` message is sent once the reader is
        exhausted.

        :param skt: socket on which the messages are sent.
        :param content: message frames; only the first one is used.
        :raises ValueError: if no reader is registered for the extension.
        """
        # NOTE(review): pickle is only safe here as long as the frames come
        # from the trusted manager process.
        parameters = pickle.loads(content[0])

        extension = PurePath(parameters["filename"]).suffix.lower()
        if extension not in READER_MAP:
            raise ValueError(
                f"The file with {extension} extension can't be read, "
                f"the available extensions are: {READER_MAP.keys()}"
            )
        reader = READER_MAP[extension]

        reader_gen = reader.run(
            parameters["filename"],
            parameters["offset_scale"],
            parameters["portion"],
            self.shared_metadata.transformer,
            self.shared_metadata.color_scale,
            self.shared_metadata.write_rgb,
            self.shared_metadata.extra_fields_to_include,
        )
        for coords, colors, extra_fields in reader_gen:
            skt.send_multipart(
                [
                    PointWorkerMessageType.NEW_TASK.value,
                    b"",  # empty node name
                    pickle.dumps(
                        {"xyz": coords, "rgb": colors, "extra_fields": extra_fields}
                    ),
                    # number of points in the chunk, big-endian unsigned 32-bit
                    struct.pack(">I", len(coords)),
                ],
                copy=False,
            )

        skt.send_multipart([PointWorkerMessageType.READ.value])

    def execute_write_pnts(
        self, skt: zmq.Socket[bytes], content: bytes, node_name: bytes
    ) -> None:
        """Write the ``.pnts`` files for every node contained in ``content``.

        ``content`` is a compressed pickle of a mapping from node name to a
        pickled node dictionary. Each node is written with
        ``pnts_writer.node_to_pnts`` and the values it returns are summed and
        reported in a ``PNTS_WRITTEN`` message. Nothing is written nor sent
        when ``content`` is empty.

        :param skt: socket on which the ``PNTS_WRITTEN`` message is sent.
        :param content: compressed, pickled node mapping (may be empty).
        :param node_name: name echoed back in the ``PNTS_WRITTEN`` message.
        """
        if len(content) == 0:
            return

        # we can safely write the .pnts file
        # (``gzip`` is this module's alias for ``lz4.frame``)
        root = pickle.loads(gzip.decompress(content))
        total = 0
        for name in root:
            node_data: DummyNodeDictType = pickle.loads(root[name])
            node = DummyNode(node_data)
            total += pnts_writer.node_to_pnts(
                name, node, self.shared_metadata.out_folder
            )

        skt.send_multipart(
            [
                PointWorkerMessageType.PNTS_WRITTEN.value,
                struct.pack(">I", total),
                node_name,
            ]
        )

    def execute_process_jobs(
        self, skt: zmq.Socket[bytes], content: list[bytes]
    ) -> None:
        """Process a batch of node jobs and report the result of each one.

        ``content`` is a flat sequence of jobs, each laid out as
        ``name, node, count, task_0, ..., task_{count-1}`` where ``count`` is
        a big-endian unsigned 32-bit integer. For every job, the tasks
        produced by ``NodeProcess.run`` are forwarded as ``NEW_TASK``
        messages, then a ``PROCESSED`` message carrying the node name, its
        total point count and its dumped state is sent.

        When the shared verbosity is at least 2, timings are appended to
        ``py3dtiles-<pid>.log`` in the current working directory. The log
        file is closed even if processing raises.

        :param skt: socket on which the messages are sent.
        :param content: flat list of job frames as described above.
        """
        begin = time.time()
        log_enabled = self.shared_metadata.verbosity >= 2
        log_file = open(f"py3dtiles-{os.getpid()}.log", "a") if log_enabled else None

        try:
            i = 0
            while i < len(content):
                name = content[i]
                node = content[i + 1]
                (count,) = struct.unpack(">I", content[i + 2])
                tasks = content[i + 3 : i + 3 + count]
                i += 3 + count

                node_catalog = NodeCatalog(
                    node,
                    name,
                    self.shared_metadata.root_aabb,
                    self.shared_metadata.root_spacing,
                    self.shared_metadata.write_rgb,
                    self.shared_metadata.extra_fields_to_include,
                )

                node_process = NodeProcess(
                    node_catalog,
                    self.shared_metadata.scale[0],
                    name,
                    tasks,
                    begin,
                    log_file,
                )
                for proc_name, proc_data, proc_point_count in node_process.run():
                    skt.send_multipart(
                        [
                            PointWorkerMessageType.NEW_TASK.value,
                            proc_name,
                            proc_data,
                            struct.pack(">I", proc_point_count),
                        ],
                        copy=False,
                        block=False,
                    )

                if log_enabled:
                    print(
                        f"save on disk {name!r} [{time.time() - begin}]",
                        file=log_file,
                    )

                # save node state on disk; a job with an empty name has no
                # state to dump
                if len(name) > 0:
                    data = node_catalog.dump(
                        name, node_process.infer_depth_from_name() - 1
                    )
                else:
                    data = b""

                if log_enabled:
                    print(f"saved on disk [{time.time() - begin}]", file=log_file)

                skt.send_multipart(
                    [
                        PointWorkerMessageType.PROCESSED.value,
                        pickle.dumps(
                            {
                                "name": name,
                                "total": node_process.total_point_count,
                                "data": data,
                            }
                        ),
                    ],
                    copy=False,
                )

            if log_enabled:
                elapsed = time.time() - begin
                print(
                    "[<] return result [{} sec] [{}]".format(
                        round(elapsed, 2), elapsed
                    ),
                    file=log_file,
                    flush=True,
                )
        finally:
            # Release the log file on every exit path, including exceptions
            # raised while processing a node or sending on the socket.
            if log_file is not None:
                log_file.close()