Source code for py3dtiles.tilers.point.node.shared_node_store

import gc
import time
from pathlib import Path
from sys import getsizeof
from typing import Dict, List, Optional, Tuple

import lz4.frame as gzip

from py3dtiles.exceptions import TilerException
from py3dtiles.utils import node_name_to_path


class SharedNodeStore:
    """
    A compressed storage for arbitrary data that can limit the amount of RAM
    it uses by temporarily storing part of its content on disk for later
    retrieval.

    The memory limit is not enforced automatically at the moment; callers
    must invoke `self.control_memory_usage` themselves.

    Memory usage is an estimate: ``memory_size["content"]`` sums the length of
    every compressed payload plus the shallow size of its metadata entry, and
    ``memory_size["container"]`` is the shallow size of the list and dict that
    hold them.
    """

    def __init__(self, folder: Path) -> None:
        """
        Construct a SharedNodeStore.

        :param folder: where to store the piece of data that go over the limit
        """
        # node name -> (timestamp of the last put, index into self.data)
        self.metadata: Dict[bytes, Optional[Tuple[float, int]]] = {}
        # compressed payloads; a slot is reset to None when its node is removed
        self.data: List[Optional[bytes]] = []
        self.folder = folder
        self.stats = {
            "hit": 0,
            "miss": 0,
            "new": 0,
        }
        self.memory_size = {
            "content": 0,
            "container": getsizeof(self.data) + getsizeof(self.metadata),
        }

    @staticmethod
    def _entry_overhead(name: bytes, metadata: Tuple[float, int]) -> int:
        """Return the shallow size accounted for one (name, metadata) entry."""
        return getsizeof((name, metadata))

    def control_memory_usage(self, max_size_mb: int, verbose: int) -> None:
        """
        Limit the memory usage of this instance.

        When the tracked size reaches the limit, the in-memory nodes are
        flushed to disk through `remove_oldest_nodes` and a garbage collection
        is triggered.

        :param max_size_mb: the memory budget in megabytes; values below 200
            are raised to 200
        :param verbose: verbosity level; statistics are printed from 3,
            cleaning markers from 2
        """
        bytes_to_mb = 1.0 / (1024 * 1024)
        max_size_mb = max(max_size_mb, 200)

        if verbose >= 3:
            self.print_statistics()

        # estimate of the cache size, in megabytes
        tracked_bytes = self.memory_size["container"] + self.memory_size["content"]
        before = tracked_bytes * bytes_to_mb
        if before < max_size_mb:
            return

        if verbose >= 2:
            print(f">>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> CACHE CLEANING [{before}]")

        # `before` is at least 200 here, so the division is always safe
        self.remove_oldest_nodes(1 - max_size_mb / before)
        gc.collect()

        if verbose >= 2:
            print("<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<< CACHE CLEANING")

    def get(self, name: bytes, stat_inc: int = 1) -> bytes:
        """
        Get a node storage.

        The bytes are returned exactly as stored, i.e. still compressed when
        they come from memory. If the node is not in memory, the content of
        its file under `self.folder` is returned instead. An unknown node
        yields an empty bytes object.

        :param name: the name of the node
        :param stat_inc: the amount added to the matching hit/miss/new counter
        :return: the stored bytes, or ``b""`` if the node is unknown
        :raises TilerException: if the metadata points to an emptied data slot
        """
        metadata = self.metadata.get(name, None)
        if metadata is not None:
            stored = self.data[metadata[1]]
            if stored is None:
                raise TilerException(
                    "tmp_data shouldn't be None if metadata is not None."
                )
            self.stats["hit"] += stat_inc
            return stored

        node_path = node_name_to_path(self.folder, name)
        if node_path.exists():
            self.stats["miss"] += stat_inc
            with node_path.open("rb") as f:
                return f.read()

        self.stats["new"] += stat_inc
        # should we cache this node?
        return b""

    def remove(self, name: bytes) -> None:
        """
        Remove a node from this storage, both from memory and from disk.

        :param name: the name of the node
        :raises FileNotFoundError: if the node is neither in memory nor on disk
        :raises TilerException: if the metadata points to an emptied data slot
        """
        meta = self.metadata.pop(name, None)
        node_path = node_name_to_path(self.folder, name)

        if meta is None:
            if not node_path.exists():
                raise FileNotFoundError(f"{node_path} should exist")
        else:
            self.memory_size["content"] -= self._entry_overhead(name, meta)
            content = self.data[meta[1]]
            if content is None:
                raise TilerException(
                    f"{name!r} is present in self.metadata but not in self.data."
                )
            self.memory_size["content"] -= len(content)
            self.memory_size["container"] = getsizeof(self.data) + getsizeof(
                self.metadata
            )
            # keep the slot so the indices held by other entries stay valid
            self.data[meta[1]] = None

        if node_path.exists():
            node_path.unlink()

    def put(self, name: bytes, data: bytes) -> None:
        """
        Insert a node or change the data of an existing one.

        The data is compressed before being stored. When `name` is already
        present, the accounting of the replaced entry is released first so
        that repeated updates of a node do not inflate the tracked size.

        :param name: the name of the node
        :param data: the uncompressed bytes to store
        """
        compressed_data = gzip.compress(data)

        previous = self.metadata.get(name, None)
        if previous is None:
            index = len(self.data)
            self.data.append(compressed_data)
        else:
            index = previous[1]
            replaced = self.data[index]
            # undo what the earlier put accounted for this entry
            self.memory_size["content"] -= self._entry_overhead(name, previous)
            if replaced is not None:
                self.memory_size["content"] -= len(replaced)
            self.data[index] = compressed_data

        metadata = (time.time(), index)
        self.metadata[name] = metadata

        self.memory_size["content"] += len(compressed_data) + self._entry_overhead(
            name, metadata
        )
        self.memory_size["container"] = getsizeof(self.data) + getsizeof(self.metadata)

    def remove_oldest_nodes(self, percent: float = 100) -> Tuple[int, int]:
        """
        Flush the in-memory nodes to disk and reset the memory accounting.

        NOTE: `percent` is currently ignored, every node held in memory is
        written to disk and dropped from memory.

        :param percent: the share of nodes to evict (unused for now)
        :return: the number of entries flushed and the number of bytes written
        """
        result = _remove_all(self)

        self.memory_size["content"] = 0
        self.memory_size["container"] = getsizeof(self.data) + getsizeof(self.metadata)

        return result

    def print_statistics(self) -> None:
        """
        Print the hit / miss / new counters gathered by `get`.
        """
        print(
            f"Stats: Hits = {self.stats['hit']}, "
            f"Miss = {self.stats['miss']}, "
            f"New = {self.stats['new']}"
        )
def _remove_all(store: SharedNodeStore) -> Tuple[int, int]:
    """
    Write every in-memory node of `store` to its file, then empty the store.

    :param store: the store whose in-memory content is flushed to disk
    :return: the number of metadata entries held before the flush and the
        total number of bytes written
    :raises TilerException: if a metadata entry points to an emptied data slot
    """
    entry_count = len(store.metadata)
    total_written = 0

    for node_name, entry in store.metadata.items():
        if entry is not None:
            payload = store.data[entry[1]]
            if payload is None:
                raise TilerException(
                    f"{node_name!r} is present in self.metadata but not in self.data."
                )

            target = node_name_to_path(store.folder, node_name)
            with target.open("wb") as out:
                total_written = total_written + out.write(payload)

    # drop the in-memory copies now that everything is on disk
    store.data = []
    store.metadata = {}

    return entry_count, total_written