Source code for py3dtiles.export

from __future__ import annotations

import argparse
import errno
import getpass
import json
import math
import os
import sys
import traceback
from pathlib import Path
from typing import TYPE_CHECKING, Any

import numpy as np
import numpy.typing as npt

from py3dtiles.constants import EXIT_CODES
from py3dtiles.tilers.b3dm.wkb_utils import TriangleSoup
from py3dtiles.tileset.content import B3dm
from py3dtiles.tileset.content.batch_table import BatchTable

if TYPE_CHECKING:
    # we dynamically import it below, but we need it for static analysis
    import psycopg2


[docs] class BoundingBox: def __init__(self, minimum, maximum): self.min = [float(i) for i in minimum] self.max = [float(i) for i in maximum]
[docs] def inside(self, point): return all(self.min[idx] <= point[idx] < self.max[idx] for idx in (0, 1))
[docs] def center(self): return [(i + j) / 2 for (i, j) in zip(self.min, self.max)]
[docs] def add(self, box): self.min = [min(i, j) for (i, j) in zip(self.min, box.min)] self.max = [max(i, j) for (i, j) in zip(self.max, box.max)]
[docs] class Feature: def __init__(self, index, box): self.index = index self.box = box
[docs] class Node: counter = 0 def __init__(self, features=None): self.id = Node.counter Node.counter += 1 self.features = features if features else [] self.box = None self.children = []
[docs] def add(self, node): self.children.append(node)
[docs] def compute_bbox(self): self.box = BoundingBox( [float("inf"), float("inf"), float("inf")], [-float("inf"), -float("inf"), -float("inf")], ) for c in self.children: c.compute_bbox() self.box.add(c.box) for g in self.features: self.box.add(g.box)
[docs] def to_tileset(self, transform): self.compute_bbox() tiles = { "asset": {"version": "1.0"}, "geometricError": 500, # TODO "root": self.to_tileset_r(500), } tiles["root"]["transform"] = [round(float(e), 3) for e in transform] return tiles
[docs] def to_tileset_r(self, error): if self.box is None: raise RuntimeError( "The attribute box cannot be None. Use the method to_tileset." ) (c1, c2) = (self.box.min, self.box.max) center = [(c1[i] + c2[i]) / 2 for i in range(3)] x_axis = [(c2[0] - c1[0]) / 2, 0, 0] y_axis = [0, (c2[1] - c1[1]) / 2, 0] z_axis = [0, 0, (c2[2] - c1[2]) / 2] box = [round(x, 3) for x in center + x_axis + y_axis + z_axis] tile = { "boundingVolume": {"box": box}, "geometricError": error, # TODO "children": [n.to_tileset_r(error / 2.0) for n in self.children], "refine": "add", } if len(self.features) != 0: tile["content"] = {"uri": f"tiles/{self.id}.b3dm"} return tile
[docs] def all_nodes(self): nodes = [self] for c in self.children: nodes.extend(c.all_nodes()) return nodes
[docs] def tile_extent(extent, size, i, j): min_extent = [extent.min[0] + i * size, extent.min[1] + j * size] max_extent = [extent.min[0] + (i + 1) * size, extent.min[1] + (j + 1) * size] return BoundingBox(min_extent, max_extent)
# TODO: transform
[docs] def arrays2tileset( output_dir: Path, triangle_soups: list[TriangleSoup], transform: npt.NDArray[np.float32], ids: list[str] = None, ) -> None: print("Creating tileset...") max_tile_size = 2000 features_per_tile = 20 indices = list(range(len(triangle_soups))) # glTF is Y-up, so to get the bounding boxes in the 3D tiles # coordinate system, we have to apply a Y-to-Z transform to the # glTF bounding boxes z_up_bboxes = [] for bbox in [ts.get_bbox() for ts in triangle_soups]: tmp = m = bbox[0] M = bbox[1] m = [m[0], -m[2], m[1]] M = [M[0], -tmp[2], M[1]] z_up_bboxes.append([m, M]) # Compute extent x_min = y_min = float("inf") x_max = y_max = -float("inf") for bbox in z_up_bboxes: x_min = min(x_min, bbox[0][0]) y_min = min(y_min, bbox[0][1]) x_max = max(x_max, bbox[1][0]) y_max = max(y_max, bbox[1][1]) extent = BoundingBox([x_min, y_min], [x_max, y_max]) extent_x = x_max - x_min extent_y = y_max - y_min # Create quadtree tree = Node() for i in range(int(math.ceil(max(1, extent_x) / max_tile_size))): for j in range(int(math.ceil(max(1, extent_y) / max_tile_size))): tile = tile_extent(extent, max_tile_size, i, j) geoms = [] for idx, box in zip(indices, z_up_bboxes): bbox = BoundingBox(box[0], box[1]) if tile.inside(bbox.center()): geoms.append(Feature(idx, bbox)) if len(geoms) == 0: continue if len(geoms) > features_per_tile: node = Node(geoms[0:features_per_tile]) tree.add(node) divide( tile, geoms[features_per_tile : len(geoms)], i * 2, j * 2, max_tile_size / 2.0, features_per_tile, node, ) else: node = Node(geoms) tree.add(node) # Export b3dm & tileset tileset = tree.to_tileset(transform) with open(Path(output_dir, "tileset.json"), "w") as f: f.write(json.dumps(tileset)) print("Creating tiles...") nodes = tree.all_nodes() try: os.makedirs(f"{output_dir}/tiles") except OSError as e: if e.errno != errno.EEXIST: raise for node in nodes: if len(node.features) != 0: arrays = [] gids = [] for feature in node.features: pos = feature.index arrays.append( { "vertices": triangle_soups[pos].vertices, "indices": triangle_soups[pos].triangle_indices, "normal": triangle_soups[pos].compute_normals(), } ) if ids is not None: gids.append(ids[pos]) bt = None if ids is not None: bt = BatchTable() bt.add_property_as_json("id", gids) b3dm = B3dm.from_numpy_arrays( points=np.concatenate([arr["vertices"] for arr in arrays]), triangles=np.concatenate([arr["indices"] for arr in arrays]), normal=np.concatenate([arr["normal"] for arr in arrays]), batch_table=bt, ) b3dm.save_as(Path(output_dir, f"tiles/{node.id}.b3dm"))
[docs] def divide( extent, geometries, x_offset, y_offset, tile_size, features_per_tile, parent ): for i in range(2): for j in range(2): tile = tile_extent(extent, tile_size, i, j) geoms = [] for g in geometries: if tile.inside(g.box.center()): geoms.append(g) if len(geoms) == 0: continue if len(geoms) > features_per_tile: node = Node(geoms[0:features_per_tile]) parent.add(node) divide( tile, geoms[features_per_tile : len(geoms)], (x_offset + i) * 2, (y_offset + j) * 2, tile_size / 2.0, features_per_tile, node, ) else: node = Node(geoms) parent.add(node)
[docs] def wkbs_to_tileset( wkbs: list[bytes], ids: list[str] | None, transform: npt.NDArray[np.float32], output_dir: Path, ) -> None: geoms = [TriangleSoup.from_wkb_multipolygon(wkb) for wkb in wkbs] arrays2tileset(output_dir, geoms, transform, ids)
[docs] def build_secure_conn(db_conn_info: str) -> psycopg2.extensions.connection: """Get a psycopg2 connexion securely, e.g. without writing the password explicitely in the terminal Parameters ---------- db_conn_info : str Returns ------- psycopg2.extensions.connection """ import psycopg2 try: connection = psycopg2.connect(db_conn_info) except psycopg2.OperationalError: pw = getpass.getpass("Postgres password: ") connection = psycopg2.connect(db_conn_info + f" password={pw}") return connection
[docs] def from_db(db_conn_info, table_name, column_name, id_column_name, output_dir) -> None: connection = build_secure_conn(db_conn_info) cur = connection.cursor() print("Loading data from database...") cur.execute(f"SELECT ST_3DExtent({column_name}) FROM {table_name}") extent = cur.fetchall()[0][0] extent = [m.split(" ") for m in extent[6:-1].split(",")] offset = [ (float(extent[1][0]) + float(extent[0][0])) / 2, (float(extent[1][1]) + float(extent[0][1])) / 2, (float(extent[1][2]) + float(extent[0][2])) / 2, ] id_statement = "" if id_column_name is not None: id_statement = "," + id_column_name cur.execute( "SELECT ST_AsBinary(ST_RotateX(ST_Translate({0}, {1}, {2}, {3}), -pi() / 2))," "ST_Area(ST_Force2D({0})) AS weight{5} FROM {4} ORDER BY weight DESC".format( column_name, -offset[0], -offset[1], -offset[2], table_name, id_statement ) ) res = cur.fetchall() wkbs = [t[0] for t in res] ids = None if id_column_name is not None: ids = [t[2] for t in res] transform = np.array( [ [1, 0, 0, offset[0]], [0, 1, 0, offset[1]], [0, 0, 1, offset[2]], [0, 0, 0, 1], ], dtype=np.float32, ) transform = transform.flatten("F") wkbs_to_tileset(wkbs, ids, transform, output_dir)
[docs] def from_directory(directory, offset, output_dir) -> None: # TODO: improvement -> order wkbs by geometry size, similarly to database mode offset = (0, 0, 0) if offset is None else offset # open all wkbs from directory files = os.listdir(directory) files = [os.path.join(directory, f) for f in os.listdir(directory)] files = [f for f in files if os.path.isfile(f) and os.path.splitext(f)[1] == ".wkb"] wkbs = [] for f in files: of = open(f, "rb") wkbs.append(of.read()) of.close() transform = np.array( [ [1, 0, 0, offset[0]], [0, 1, 0, offset[1]], [0, 0, 1, offset[2]], [0, 0, 0, 1], ], dtype=np.float32, ) transform = transform.flatten("F") wkbs_to_tileset(wkbs, None, transform, output_dir)
def _init_parser( subparser: argparse._SubParsersAction[Any], ) -> argparse.ArgumentParser: descr = "Generate a tileset from a set of geometries" parser: argparse.ArgumentParser = subparser.add_parser("export", help=descr) group = parser.add_mutually_exclusive_group() d_help = "Name of the directory containing the geometries" group.add_argument("-d", metavar="DIRECTORY", type=str, help=d_help) o_help = "Offset of the geometries (only with '-d')" parser.add_argument("-o", nargs=3, metavar=("X", "Y", "Z"), type=float, help=o_help) D_help = """ Database connexion info (e.g. 'service=py3dtiles' or \ 'dbname=py3dtiles host=localhost port=5432 user=yourname password=yourpwd') """ group.add_argument("-D", metavar="DB_CONNINFO", type=str, help=D_help) t_help = "Table name (required if '-D' option is activated)" parser.add_argument("-t", metavar="TABLE", type=str, help=t_help) c_help = "Geometry column name (required if '-D' option is activated)" parser.add_argument("-c", metavar="COLUMN", type=str, help=c_help) i_help = "Id column name (only with '-D')" parser.add_argument("-i", metavar="IDCOLUMN", type=str, help=i_help) s_help = "Output directory where command results are stored" parser.add_argument("-s", metavar="OUTPUT_DIR", default=".", type=str, help=s_help) return parser def _main(args: argparse.Namespace) -> None: output_dir = Path(args.s) output_dir.mkdir(exist_ok=True, parents=True) if args.D is not None: if args.t is None or args.c is None: print("Error: please define a table (-t) and column (-c)", file=sys.stderr) exit(EXIT_CODES.MISSING_ARGS.value) try: from_db(args.D, args.t, args.c, args.i, output_dir) except ImportError: traceback.print_exc() print( "Cannot import psycopg2. You might have to execute `pip install py3dtiles[postgres]`. Please read the installation documentation." ) exit(EXIT_CODES.MISSING_DEPS.value) elif args.d is not None: from_directory(args.d, args.o, output_dir) else: print("Error: database or directory must be set", file=sys.stderr) exit(EXIT_CODES.MISSING_ARGS.value)