"""Source code for py3dtiles.tilers.node.shared_node_store."""

import gc
import time
from pathlib import Path
from sys import getsizeof
from typing import Dict, List, Optional, Tuple

import lz4.frame as gzip

from py3dtiles.utils import node_name_to_path


class SharedNodeStore:
    """In-memory cache of lz4-compressed node payloads, backed by files on disk.

    ``metadata`` maps a node name to ``(last_put_time, index)`` where ``index``
    points into the flat ``data`` list that holds the compressed bytes
    (a slot is set to ``None`` once its entry has been removed).

    NOTE: despite the ``gzip`` alias used at the import site, payloads are
    compressed with lz4, not gzip.
    """

    def __init__(self, folder: Path) -> None:
        """Create an empty store whose on-disk spill area is ``folder``."""
        # name -> (timestamp of last put, index into self.data). Values are
        # never None; the Optional only ever comes from .get/.pop defaults.
        self.metadata: Dict[bytes, Tuple[float, int]] = {}
        # Compressed payloads; a slot becomes None when its entry is removed.
        self.data: List[Optional[bytes]] = []
        self.folder = folder
        # Cache-efficiency counters, reported by print_statistics().
        self.stats = {
            'hit': 0,
            'miss': 0,
            'new': 0,
        }
        # Rough byte accounting: 'content' counts payload bytes plus a
        # per-entry tuple overhead; 'container' is the *shallow* size of the
        # two container objects (getsizeof does not follow references).
        self.memory_size = {
            'content': 0,
            'container': getsizeof(self.data) + getsizeof(self.metadata),
        }

    def control_memory_usage(self, max_size_mb: int, verbose: int) -> None:
        """Flush the cache to disk if its estimated size exceeds ``max_size_mb``.

        The threshold is floored at 200 MB. ``verbose >= 3`` also prints the
        hit/miss statistics; ``verbose >= 2`` announces the cleaning pass.
        """
        bytes_to_mb = 1.0 / (1024 * 1024)
        max_size_mb = max(max_size_mb, 200)

        if verbose >= 3:
            self.print_statistics()

        # guess cache size
        cache_size = (self.memory_size['container'] + self.memory_size['content']) * bytes_to_mb

        before = cache_size
        if before < max_size_mb:
            return

        if verbose >= 2:
            print(f'>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> CACHE CLEANING [{before}]')
        # The fraction argument is currently ignored by remove_oldest_nodes:
        # the whole cache is flushed (see that method's docstring).
        self.remove_oldest_nodes(1 - max_size_mb / before)
        gc.collect()
        if verbose >= 2:
            print('<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<< CACHE CLEANING')

    def get(self, name: bytes, stat_inc: int = 1) -> bytes:
        """Return the compressed payload for ``name``.

        Lookup order: in-memory cache (counted as a hit), then the on-disk
        file (counted as a miss). Returns ``b''`` for an unknown node
        (counted as new). ``stat_inc`` is the amount added to the counter.
        """
        metadata = self.metadata.get(name, None)
        data = b''
        if metadata is not None:
            tmp_data = self.data[metadata[1]]
            if tmp_data is None:
                raise RuntimeError("tmp_data shouldn't be None if metadata is not None.")
            else:
                data = tmp_data
            self.stats['hit'] += stat_inc
        else:
            node_path = node_name_to_path(self.folder, name)
            if node_path.exists():
                self.stats['miss'] += stat_inc
                with node_path.open('rb') as f:
                    data = f.read()
            else:
                self.stats['new'] += stat_inc
                # should we cache this node?

        return data

    def remove(self, name: bytes) -> None:
        """Drop ``name`` from the cache and delete its on-disk file if any.

        Raises FileNotFoundError when the node is neither cached in memory
        nor present on disk.
        """
        meta = self.metadata.pop(name, None)

        node_path = node_name_to_path(self.folder, name)
        if meta is None:
            if not node_path.exists():
                raise FileNotFoundError(f"{node_path} should exist")
        else:
            # Undo the accounting done by put(): per-entry overhead + payload.
            self.memory_size['content'] -= getsizeof(meta)
            if self.data[meta[1]] is None:
                raise ValueError(f"{name!r} is present in self.metadata but not in self.data.")
            self.memory_size['content'] -= len(self.data[meta[1]])  # type: ignore
            self.memory_size['container'] = getsizeof(self.data) + getsizeof(self.metadata)
            # Free the slot; indices of other entries must stay stable, so the
            # list is never compacted here.
            self.data[meta[1]] = None

        if node_path.exists():
            node_path.unlink()

    def put(self, name: bytes, data: bytes) -> None:
        """Insert or overwrite the payload for ``name`` (compressed with lz4)."""
        compressed_data = gzip.compress(data)

        metadata = self.metadata.get(name, None)
        if metadata is None:
            # New entry: append the payload and count the per-entry overhead.
            metadata = (time.time(), len(self.data))
            self.data.append(compressed_data)
            self.memory_size['content'] += getsizeof((name, metadata))
        else:
            # Overwrite: reuse the slot. Subtract the old payload's size so
            # repeated puts of the same name don't inflate the accounting
            # (the per-entry overhead was already counted at insertion).
            old_data = self.data[metadata[1]]
            if old_data is not None:
                self.memory_size['content'] -= len(old_data)
            metadata = (time.time(), metadata[1])
            self.data[metadata[1]] = compressed_data

        self.metadata.update([(name, metadata)])
        self.memory_size['content'] += len(compressed_data)
        self.memory_size['container'] = getsizeof(self.data) + getsizeof(self.metadata)

    def remove_oldest_nodes(self, percent: float = 100) -> Tuple[int, int]:
        """Flush the WHOLE cache to disk and reset the memory accounting.

        ``percent`` is currently ignored: every entry is written out and the
        in-memory store is emptied. Returns ``(entry_count, bytes_written)``.
        """
        count = _remove_all(self)

        self.memory_size['content'] = 0
        self.memory_size['container'] = getsizeof(self.data) + getsizeof(self.metadata)

        return count

    def print_statistics(self) -> None:
        """Print the hit/miss/new counters accumulated by get()."""
        print('Stats: Hits = {}, Miss = {}, New = {}'.format(
            self.stats['hit'],
            self.stats['miss'],
            self.stats['new']))
def _remove_all(store: SharedNodeStore) -> Tuple[int, int]:
    """Write every cached payload to its on-disk file, then empty the store.

    Returns ``(entry_count, bytes_written)`` where ``entry_count`` is the
    number of metadata entries before flushing.
    """
    # delete the entries
    entry_count = len(store.metadata)
    total_written = 0

    for name, meta in store.metadata.items():
        if meta is None:
            continue
        payload = store.data[meta[1]]
        if payload is None:
            raise ValueError(f"{name!r} is present in self.metadata but not in self.data.")
        target_path = node_name_to_path(store.folder, name)
        with target_path.open('wb') as out_file:
            total_written += out_file.write(payload)

    store.metadata = {}
    store.data = []

    return entry_count, total_written