Source code for py3dtiles.export

import argparse
import errno
import getpass
import json
import math
import os
import sys
import traceback
from typing import Any

import numpy as np

from py3dtiles.constants import EXIT_CODES
from py3dtiles.tilers.b3dm.wkb_utils import TriangleSoup
from py3dtiles.tileset.content import B3dm, GlTF
from py3dtiles.tileset.content.batch_table import BatchTable


[docs] class BoundingBox: def __init__(self, minimum, maximum): self.min = [float(i) for i in minimum] self.max = [float(i) for i in maximum]
[docs] def inside(self, point): return all(self.min[idx] <= point[idx] < self.max[idx] for idx in (0, 1))
[docs] def center(self): return [(i + j) / 2 for (i, j) in zip(self.min, self.max)]
[docs] def add(self, box): self.min = [min(i, j) for (i, j) in zip(self.min, box.min)] self.max = [max(i, j) for (i, j) in zip(self.max, box.max)]
[docs] class Feature: def __init__(self, index, box): self.index = index self.box = box
[docs] class Node: counter = 0 def __init__(self, features=None): self.id = Node.counter Node.counter += 1 self.features = features if features else [] self.box = None self.children = []
[docs] def add(self, node): self.children.append(node)
[docs] def compute_bbox(self): self.box = BoundingBox( [float("inf"), float("inf"), float("inf")], [-float("inf"), -float("inf"), -float("inf")], ) for c in self.children: c.compute_bbox() self.box.add(c.box) for g in self.features: self.box.add(g.box)
[docs] def to_tileset(self, transform): self.compute_bbox() tiles = { "asset": {"version": "1.0"}, "geometricError": 500, # TODO "root": self.to_tileset_r(500), } tiles["root"]["transform"] = [round(float(e), 3) for e in transform] return tiles
[docs] def to_tileset_r(self, error): if self.box is None: raise RuntimeError( "The attribute box cannot be None. Use the method to_tileset." ) (c1, c2) = (self.box.min, self.box.max) center = [(c1[i] + c2[i]) / 2 for i in range(3)] x_axis = [(c2[0] - c1[0]) / 2, 0, 0] y_axis = [0, (c2[1] - c1[1]) / 2, 0] z_axis = [0, 0, (c2[2] - c1[2]) / 2] box = [round(x, 3) for x in center + x_axis + y_axis + z_axis] tile = { "boundingVolume": {"box": box}, "geometricError": error, # TODO "children": [n.to_tileset_r(error / 2.0) for n in self.children], "refine": "add", } if len(self.features) != 0: tile["content"] = {"uri": f"tiles/{self.id}.b3dm"} return tile
[docs] def all_nodes(self): nodes = [self] for c in self.children: nodes.extend(c.all_nodes()) return nodes
[docs] def tile_extent(extent, size, i, j): min_extent = [extent.min[0] + i * size, extent.min[1] + j * size] max_extent = [extent.min[0] + (i + 1) * size, extent.min[1] + (j + 1) * size] return BoundingBox(min_extent, max_extent)
# TODO: transform
[docs] def arrays2tileset(positions, normals, bboxes, transform, ids=None): print("Creating tileset...") max_tile_size = 2000 features_per_tile = 20 indices = list(range(len(positions))) # glTF is Y-up, so to get the bounding boxes in the 3D tiles # coordinate system, we have to apply a Y-to-Z transform to the # glTF bounding boxes z_up_bboxes = [] for bbox in bboxes: tmp = m = bbox[0] M = bbox[1] m = [m[0], -m[2], m[1]] M = [M[0], -tmp[2], M[1]] z_up_bboxes.append([m, M]) # Compute extent x_min = y_min = float("inf") x_max = y_max = -float("inf") for bbox in z_up_bboxes: x_min = min(x_min, bbox[0][0]) y_min = min(y_min, bbox[0][1]) x_max = max(x_max, bbox[1][0]) y_max = max(y_max, bbox[1][1]) extent = BoundingBox([x_min, y_min], [x_max, y_max]) extent_x = x_max - x_min extent_y = y_max - y_min # Create quadtree tree = Node() for i in range(int(math.ceil(extent_x / max_tile_size))): for j in range(int(math.ceil(extent_y / max_tile_size))): tile = tile_extent(extent, max_tile_size, i, j) geoms = [] for idx, box in zip(indices, z_up_bboxes): bbox = BoundingBox(box[0], box[1]) if tile.inside(bbox.center()): geoms.append(Feature(idx, bbox)) if len(geoms) == 0: continue if len(geoms) > features_per_tile: node = Node(geoms[0:features_per_tile]) tree.add(node) divide( tile, geoms[features_per_tile : len(geoms)], i * 2, j * 2, max_tile_size / 2.0, features_per_tile, node, ) else: node = Node(geoms) tree.add(node) # Export b3dm & tileset tileset = tree.to_tileset(transform) with open("tileset.json", "w") as f: f.write(json.dumps(tileset)) print("Creating tiles...") nodes = tree.all_nodes() identity = np.identity(4).flatten("F") try: os.makedirs("tiles") except OSError as e: if e.errno != errno.EEXIST: raise for node in nodes: if len(node.features) != 0: bin_arrays = [] gids = [] for feature in node.features: pos = feature.index bin_arrays.append( { "position": positions[pos], "normal": normals[pos], "bbox": [[float(i) for i in j] for j in bboxes[pos]], } ) if ids is not None: gids.append(ids[pos]) gltf = GlTF.from_binary_arrays(bin_arrays, identity) bt = None if ids is not None: bt = BatchTable() bt.add_property_as_json("id", gids) b3dm = B3dm.from_gltf(gltf, bt).to_array() with open(f"tiles/{node.id}.b3dm", "wb") as f: f.write(b3dm.tobytes())
[docs] def divide( extent, geometries, x_offset, y_offset, tile_size, features_per_tile, parent ): for i in range(2): for j in range(2): tile = tile_extent(extent, tile_size, i, j) geoms = [] for g in geometries: if tile.inside(g.box.center()): geoms.append(g) if len(geoms) == 0: continue if len(geoms) > features_per_tile: node = Node(geoms[0:features_per_tile]) parent.add(node) divide( tile, geoms[features_per_tile : len(geoms)], (x_offset + i) * 2, (y_offset + j) * 2, tile_size / 2.0, features_per_tile, node, ) else: node = Node(geoms) parent.add(node)
[docs] def wkbs_to_tileset(wkbs, ids, transform): geoms = [TriangleSoup.from_wkb_multipolygon(wkb) for wkb in wkbs] positions = [ts.get_position_array() for ts in geoms] normals = [ts.get_normal_array() for ts in geoms] bboxes = [ts.get_bbox() for ts in geoms] arrays2tileset(positions, normals, bboxes, transform, ids)
[docs] def build_secure_conn(db_conn_info): """Get a psycopg2 connexion securely, e.g. without writing the password explicitely in the terminal Parameters ---------- db_conn_info : str Returns ------- psycopg2.extensions.connection """ import psycopg2 try: connection = psycopg2.connect(db_conn_info) except psycopg2.OperationalError: pw = getpass.getpass("Postgres password: ") connection = psycopg2.connect(db_conn_info + f" password={pw}") return connection
[docs] def from_db(db_conn_info, table_name, column_name, id_column_name): connection = build_secure_conn(db_conn_info) cur = connection.cursor() print("Loading data from database...") cur.execute(f"SELECT ST_3DExtent({column_name}) FROM {table_name}") extent = cur.fetchall()[0][0] extent = [m.split(" ") for m in extent[6:-1].split(",")] offset = [ (float(extent[1][0]) + float(extent[0][0])) / 2, (float(extent[1][1]) + float(extent[0][1])) / 2, (float(extent[1][2]) + float(extent[0][2])) / 2, ] id_statement = "" if id_column_name is not None: id_statement = "," + id_column_name cur.execute( "SELECT ST_AsBinary(ST_RotateX(ST_Translate({0}, {1}, {2}, {3}), -pi() / 2))," "ST_Area(ST_Force2D({0})) AS weight{5} FROM {4} ORDER BY weight DESC".format( column_name, -offset[0], -offset[1], -offset[2], table_name, id_statement ) ) res = cur.fetchall() wkbs = [t[0] for t in res] ids = None if id_column_name is not None: ids = [t[2] for t in res] transform = np.array( [ [1, 0, 0, offset[0]], [0, 1, 0, offset[1]], [0, 0, 1, offset[2]], [0, 0, 0, 1], ], dtype=float, ) transform = transform.flatten("F") wkbs_to_tileset(wkbs, ids, transform)
[docs] def from_directory(directory, offset): # TODO: improvement -> order wkbs by geometry size, similarly to database mode offset = (0, 0, 0) if offset is None else offset # open all wkbs from directory files = os.listdir(directory) files = [os.path.join(directory, f) for f in os.listdir(directory)] files = [f for f in files if os.path.isfile(f) and os.path.splitext(f)[1] == ".wkb"] wkbs = [] for f in files: of = open(f, "rb") wkbs.append(of.read()) of.close() transform = np.array( [ [1, 0, 0, offset[0]], [0, 1, 0, offset[1]], [0, 0, 1, offset[2]], [0, 0, 0, 1], ], dtype=float, ) transform = transform.flatten("F") wkbs_to_tileset(wkbs, None, transform)
[docs] def init_parser( subparser: "argparse._SubParsersAction[Any]", ) -> argparse.ArgumentParser: descr = "Generate a tileset from a set of geometries" parser: argparse.ArgumentParser = subparser.add_parser("export", help=descr) group = parser.add_mutually_exclusive_group() d_help = "Name of the directory containing the geometries" group.add_argument("-d", metavar="DIRECTORY", type=str, help=d_help) o_help = "Offset of the geometries (only with '-d')" parser.add_argument("-o", nargs=3, metavar=("X", "Y", "Z"), type=float, help=o_help) D_help = """ Database connexion info (e.g. 'service=py3dtiles' or \ 'dbname=py3dtiles host=localhost port=5432 user=yourname password=yourpwd') """ group.add_argument("-D", metavar="DB_CONNINFO", type=str, help=D_help) t_help = "Table name (required if '-D' option is activated)" parser.add_argument("-t", metavar="TABLE", type=str, help=t_help) c_help = "Geometry column name (required if '-D' option is activated)" parser.add_argument("-c", metavar="COLUMN", type=str, help=c_help) i_help = "Id column name (only with '-D')" parser.add_argument("-i", metavar="IDCOLUMN", type=str, help=i_help) return parser
[docs] def main(args: argparse.Namespace) -> None: if args.D is not None: if args.t is None or args.c is None: print("Error: please define a table (-t) and column (-c)", file=sys.stderr) exit(EXIT_CODES.MISSING_ARGS.value) try: from_db(args.D, args.t, args.c, args.i) except ImportError: traceback.print_exc() print( "Cannot import psycopg2. You might have to execute `pip install py3dtiles[postgres]`. Please read the installation documentation." ) exit(EXIT_CODES.MISSING_DEPS.value) elif args.d is not None: from_directory(args.d, args.o) else: print("Error: database or directory must be set", file=sys.stderr) exit(EXIT_CODES.MISSING_ARGS.value)