Source code for py3dtiles.tilers.point.point_state

from typing import Dict, List, Tuple

from py3dtiles.typing import PortionsType


class PointState:
    """Bookkeeping for the reading, node-processing and pnts-writing stages.

    The instance only stores counters and queues; it does not start any job
    itself. Code outside this class is expected to update the counters
    (``number_of_reading_jobs``, ``points_in_progress``...) as jobs start
    and finish.
    """

    def __init__(
        self, pointcloud_file_portions: PortionsType, max_reading_jobs: int
    ) -> None:
        """Initialise every counter to zero and every queue to empty.

        :param pointcloud_file_portions: portions of point cloud files that
            still have to be read.
        :param max_reading_jobs: upper bound on the number of simultaneous
            reading jobs, enforced by :meth:`can_add_reading_jobs`.
        """
        self.processed_points = 0
        # Cap on points_in_progress above which can_add_reading_jobs()
        # refuses new reading jobs.
        self.max_point_in_progress = 60_000_000
        self.points_in_progress = 0
        self.points_in_pnts = 0

        # pointcloud_file_portions is a list of tuples
        # (filename, (start offset, end offset)).
        self.point_cloud_file_parts = pointcloud_file_portions
        self.initial_portion_count = len(pointcloud_file_portions)
        self.max_reading_jobs = max_reading_jobs
        self.number_of_reading_jobs = 0
        self.number_of_writing_jobs = 0

        # node_to_process maps a node name to its pending work: a tuple
        # (list of data payloads, total point count of these payloads).
        # NOTE(review): each payload presumably encodes the points
        # (coordinates and colors) of one task -- confirm against the caller.
        self.node_to_process: Dict[bytes, Tuple[List[bytes], int]] = {}

        # When a node is sent to a process, its entry moves to
        # processing_nodes. The key remains the node name, but the value
        # becomes (len(tasks), point_count, now). These values are only used
        # for logging.
        self.processing_nodes: Dict[bytes, Tuple[int, int, float]] = {}

        # When processing is finished, the node name moves to
        # waiting_writing_nodes. Since the content is, at this stage, stored
        # in the node store, only the name of the node is kept.
        # This list is filled until the writing can be started.
        self.waiting_writing_nodes: List[bytes] = []

        # When a node is being written, its name moves from
        # waiting_writing_nodes to pnts_to_writing.
        # The data to write are stored in a node object.
        self.pnts_to_writing: List[bytes] = []

    def is_reading_finish(self) -> bool:
        """Return True when no portion is left to read and no reading job runs."""
        return not self.point_cloud_file_parts and self.number_of_reading_jobs == 0

    def add_tasks_to_process(
        self, node_name: bytes, data: bytes, point_count: int
    ) -> None:
        """Queue ``data`` for ``node_name`` and add ``point_count`` to its total.

        :param node_name: name of the node the data belongs to.
        :param data: payload appended to the pending tasks of the node.
        :param point_count: number of points attributed to ``data``.
        :raises ValueError: if ``point_count`` is not strictly positive; in
            that case nothing is queued.
        """
        if point_count <= 0:
            # A single formatted message: passing the value as a second
            # argument would make str(exc) render as a tuple.
            raise ValueError(
                f"point_count should be strictly positive, currently {point_count}"
            )

        if node_name not in self.node_to_process:
            self.node_to_process[node_name] = ([data], point_count)
        else:
            tasks, count = self.node_to_process[node_name]
            tasks.append(data)
            self.node_to_process[node_name] = (tasks, count + point_count)

    def can_add_reading_jobs(self) -> bool:
        """Return True when a new reading job may be started.

        This requires remaining portions to read, fewer points in progress
        than ``max_point_in_progress`` and fewer running reading jobs than
        ``max_reading_jobs``.
        """
        return bool(
            self.point_cloud_file_parts
            and self.points_in_progress < self.max_point_in_progress
            and self.number_of_reading_jobs < self.max_reading_jobs
        )

    def print_debug(self) -> None:
        """Print a four-column table summarising the size of every queue."""
        # One template shared by the header and the three data rows.
        row = "{:^16}|{:^8}|{:^8}|{:^8}"
        print(row.format("Step", "Input", "Active", "Inactive"))
        print(
            row.format(
                "Reader",
                len(self.point_cloud_file_parts),
                self.number_of_reading_jobs,
                "",
            )
        )
        print(
            row.format(
                "Node process",
                len(self.node_to_process),
                len(self.processing_nodes),
                len(self.waiting_writing_nodes),
            )
        )
        print(
            row.format(
                "Pnts writer",
                len(self.pnts_to_writing),
                self.number_of_writing_jobs,
                "",
            )
        )