from __future__ import annotations
import argparse
import errno
import getpass
import json
import math
import os
import sys
import traceback
from pathlib import Path
from typing import TYPE_CHECKING, Any
import numpy as np
import numpy.typing as npt
from py3dtiles.constants import EXIT_CODES
from py3dtiles.tilers.b3dm.wkb_utils import TriangleSoup
from py3dtiles.tileset.content import B3dm
from py3dtiles.tileset.content.batch_table import BatchTable
if TYPE_CHECKING:
# we dynamically import it below, but we need it for static analysis
import psycopg2
[docs]
class BoundingBox:
def __init__(self, minimum, maximum):
self.min = [float(i) for i in minimum]
self.max = [float(i) for i in maximum]
[docs]
def inside(self, point):
return all(self.min[idx] <= point[idx] < self.max[idx] for idx in (0, 1))
[docs]
def center(self):
return [(i + j) / 2 for (i, j) in zip(self.min, self.max)]
[docs]
def add(self, box):
self.min = [min(i, j) for (i, j) in zip(self.min, box.min)]
self.max = [max(i, j) for (i, j) in zip(self.max, box.max)]
[docs]
class Feature:
def __init__(self, index, box):
self.index = index
self.box = box
[docs]
class Node:
counter = 0
def __init__(self, features=None):
self.id = Node.counter
Node.counter += 1
self.features = features if features else []
self.box = None
self.children = []
[docs]
def add(self, node):
self.children.append(node)
[docs]
def compute_bbox(self):
self.box = BoundingBox(
[float("inf"), float("inf"), float("inf")],
[-float("inf"), -float("inf"), -float("inf")],
)
for c in self.children:
c.compute_bbox()
self.box.add(c.box)
for g in self.features:
self.box.add(g.box)
[docs]
def to_tileset(self, transform):
self.compute_bbox()
tiles = {
"asset": {"version": "1.0"},
"geometricError": 500, # TODO
"root": self.to_tileset_r(500),
}
tiles["root"]["transform"] = [round(float(e), 3) for e in transform]
return tiles
[docs]
def to_tileset_r(self, error):
if self.box is None:
raise RuntimeError(
"The attribute box cannot be None. Use the method to_tileset."
)
(c1, c2) = (self.box.min, self.box.max)
center = [(c1[i] + c2[i]) / 2 for i in range(3)]
x_axis = [(c2[0] - c1[0]) / 2, 0, 0]
y_axis = [0, (c2[1] - c1[1]) / 2, 0]
z_axis = [0, 0, (c2[2] - c1[2]) / 2]
box = [round(x, 3) for x in center + x_axis + y_axis + z_axis]
tile = {
"boundingVolume": {"box": box},
"geometricError": error, # TODO
"children": [n.to_tileset_r(error / 2.0) for n in self.children],
"refine": "add",
}
if len(self.features) != 0:
tile["content"] = {"uri": f"tiles/{self.id}.b3dm"}
return tile
[docs]
def all_nodes(self):
nodes = [self]
for c in self.children:
nodes.extend(c.all_nodes())
return nodes
[docs]
def tile_extent(extent, size, i, j):
min_extent = [extent.min[0] + i * size, extent.min[1] + j * size]
max_extent = [extent.min[0] + (i + 1) * size, extent.min[1] + (j + 1) * size]
return BoundingBox(min_extent, max_extent)
# TODO: transform
[docs]
def arrays2tileset(
output_dir: Path,
triangle_soups: list[TriangleSoup],
transform: npt.NDArray[np.float32],
ids: list[str] = None,
) -> None:
print("Creating tileset...")
max_tile_size = 2000
features_per_tile = 20
indices = list(range(len(triangle_soups)))
# glTF is Y-up, so to get the bounding boxes in the 3D tiles
# coordinate system, we have to apply a Y-to-Z transform to the
# glTF bounding boxes
z_up_bboxes = []
for bbox in [ts.get_bbox() for ts in triangle_soups]:
tmp = m = bbox[0]
M = bbox[1]
m = [m[0], -m[2], m[1]]
M = [M[0], -tmp[2], M[1]]
z_up_bboxes.append([m, M])
# Compute extent
x_min = y_min = float("inf")
x_max = y_max = -float("inf")
for bbox in z_up_bboxes:
x_min = min(x_min, bbox[0][0])
y_min = min(y_min, bbox[0][1])
x_max = max(x_max, bbox[1][0])
y_max = max(y_max, bbox[1][1])
extent = BoundingBox([x_min, y_min], [x_max, y_max])
extent_x = x_max - x_min
extent_y = y_max - y_min
# Create quadtree
tree = Node()
for i in range(int(math.ceil(max(1, extent_x) / max_tile_size))):
for j in range(int(math.ceil(max(1, extent_y) / max_tile_size))):
tile = tile_extent(extent, max_tile_size, i, j)
geoms = []
for idx, box in zip(indices, z_up_bboxes):
bbox = BoundingBox(box[0], box[1])
if tile.inside(bbox.center()):
geoms.append(Feature(idx, bbox))
if len(geoms) == 0:
continue
if len(geoms) > features_per_tile:
node = Node(geoms[0:features_per_tile])
tree.add(node)
divide(
tile,
geoms[features_per_tile : len(geoms)],
i * 2,
j * 2,
max_tile_size / 2.0,
features_per_tile,
node,
)
else:
node = Node(geoms)
tree.add(node)
# Export b3dm & tileset
tileset = tree.to_tileset(transform)
with open(Path(output_dir, "tileset.json"), "w") as f:
f.write(json.dumps(tileset))
print("Creating tiles...")
nodes = tree.all_nodes()
try:
os.makedirs(f"{output_dir}/tiles")
except OSError as e:
if e.errno != errno.EEXIST:
raise
for node in nodes:
if len(node.features) != 0:
arrays = []
gids = []
for feature in node.features:
pos = feature.index
arrays.append(
{
"vertices": triangle_soups[pos].vertices,
"indices": triangle_soups[pos].triangle_indices,
"normal": triangle_soups[pos].compute_normals(),
}
)
if ids is not None:
gids.append(ids[pos])
bt = None
if ids is not None:
bt = BatchTable()
bt.add_property_as_json("id", gids)
b3dm = B3dm.from_numpy_arrays(
points=np.concatenate([arr["vertices"] for arr in arrays]),
triangles=np.concatenate([arr["indices"] for arr in arrays]),
normal=np.concatenate([arr["normal"] for arr in arrays]),
batch_table=bt,
)
b3dm.save_as(Path(output_dir, f"tiles/{node.id}.b3dm"))
[docs]
def divide(
extent, geometries, x_offset, y_offset, tile_size, features_per_tile, parent
):
for i in range(2):
for j in range(2):
tile = tile_extent(extent, tile_size, i, j)
geoms = []
for g in geometries:
if tile.inside(g.box.center()):
geoms.append(g)
if len(geoms) == 0:
continue
if len(geoms) > features_per_tile:
node = Node(geoms[0:features_per_tile])
parent.add(node)
divide(
tile,
geoms[features_per_tile : len(geoms)],
(x_offset + i) * 2,
(y_offset + j) * 2,
tile_size / 2.0,
features_per_tile,
node,
)
else:
node = Node(geoms)
parent.add(node)
[docs]
def wkbs_to_tileset(
wkbs: list[bytes],
ids: list[str] | None,
transform: npt.NDArray[np.float32],
output_dir: Path,
) -> None:
geoms = [TriangleSoup.from_wkb_multipolygon(wkb) for wkb in wkbs]
arrays2tileset(output_dir, geoms, transform, ids)
[docs]
def build_secure_conn(db_conn_info: str) -> psycopg2.extensions.connection:
"""Get a psycopg2 connexion securely, e.g. without writing the password explicitely
in the terminal
Parameters
----------
db_conn_info : str
Returns
-------
psycopg2.extensions.connection
"""
import psycopg2
try:
connection = psycopg2.connect(db_conn_info)
except psycopg2.OperationalError:
pw = getpass.getpass("Postgres password: ")
connection = psycopg2.connect(db_conn_info + f" password={pw}")
return connection
[docs]
def from_db(db_conn_info, table_name, column_name, id_column_name, output_dir) -> None:
connection = build_secure_conn(db_conn_info)
cur = connection.cursor()
print("Loading data from database...")
cur.execute(f"SELECT ST_3DExtent({column_name}) FROM {table_name}")
extent = cur.fetchall()[0][0]
extent = [m.split(" ") for m in extent[6:-1].split(",")]
offset = [
(float(extent[1][0]) + float(extent[0][0])) / 2,
(float(extent[1][1]) + float(extent[0][1])) / 2,
(float(extent[1][2]) + float(extent[0][2])) / 2,
]
id_statement = ""
if id_column_name is not None:
id_statement = "," + id_column_name
cur.execute(
"SELECT ST_AsBinary(ST_RotateX(ST_Translate({0}, {1}, {2}, {3}), -pi() / 2)),"
"ST_Area(ST_Force2D({0})) AS weight{5} FROM {4} ORDER BY weight DESC".format(
column_name, -offset[0], -offset[1], -offset[2], table_name, id_statement
)
)
res = cur.fetchall()
wkbs = [t[0] for t in res]
ids = None
if id_column_name is not None:
ids = [t[2] for t in res]
transform = np.array(
[
[1, 0, 0, offset[0]],
[0, 1, 0, offset[1]],
[0, 0, 1, offset[2]],
[0, 0, 0, 1],
],
dtype=np.float32,
)
transform = transform.flatten("F")
wkbs_to_tileset(wkbs, ids, transform, output_dir)
[docs]
def from_directory(directory, offset, output_dir) -> None:
# TODO: improvement -> order wkbs by geometry size, similarly to database mode
offset = (0, 0, 0) if offset is None else offset
# open all wkbs from directory
files = os.listdir(directory)
files = [os.path.join(directory, f) for f in os.listdir(directory)]
files = [f for f in files if os.path.isfile(f) and os.path.splitext(f)[1] == ".wkb"]
wkbs = []
for f in files:
of = open(f, "rb")
wkbs.append(of.read())
of.close()
transform = np.array(
[
[1, 0, 0, offset[0]],
[0, 1, 0, offset[1]],
[0, 0, 1, offset[2]],
[0, 0, 0, 1],
],
dtype=np.float32,
)
transform = transform.flatten("F")
wkbs_to_tileset(wkbs, None, transform, output_dir)
def _init_parser(
subparser: argparse._SubParsersAction[Any],
) -> argparse.ArgumentParser:
descr = "Generate a tileset from a set of geometries"
parser: argparse.ArgumentParser = subparser.add_parser("export", help=descr)
group = parser.add_mutually_exclusive_group()
d_help = "Name of the directory containing the geometries"
group.add_argument("-d", metavar="DIRECTORY", type=str, help=d_help)
o_help = "Offset of the geometries (only with '-d')"
parser.add_argument("-o", nargs=3, metavar=("X", "Y", "Z"), type=float, help=o_help)
D_help = """
Database connexion info (e.g. 'service=py3dtiles' or \
'dbname=py3dtiles host=localhost port=5432 user=yourname password=yourpwd')
"""
group.add_argument("-D", metavar="DB_CONNINFO", type=str, help=D_help)
t_help = "Table name (required if '-D' option is activated)"
parser.add_argument("-t", metavar="TABLE", type=str, help=t_help)
c_help = "Geometry column name (required if '-D' option is activated)"
parser.add_argument("-c", metavar="COLUMN", type=str, help=c_help)
i_help = "Id column name (only with '-D')"
parser.add_argument("-i", metavar="IDCOLUMN", type=str, help=i_help)
s_help = "Output directory where command results are stored"
parser.add_argument("-s", metavar="OUTPUT_DIR", default=".", type=str, help=s_help)
return parser
def _main(args: argparse.Namespace) -> None:
output_dir = Path(args.s)
output_dir.mkdir(exist_ok=True, parents=True)
if args.D is not None:
if args.t is None or args.c is None:
print("Error: please define a table (-t) and column (-c)", file=sys.stderr)
exit(EXIT_CODES.MISSING_ARGS.value)
try:
from_db(args.D, args.t, args.c, args.i, output_dir)
except ImportError:
traceback.print_exc()
print(
"Cannot import psycopg2. You might have to execute `pip install py3dtiles[postgres]`. Please read the installation documentation."
)
exit(EXIT_CODES.MISSING_DEPS.value)
elif args.d is not None:
from_directory(args.d, args.o, output_dir)
else:
print("Error: database or directory must be set", file=sys.stderr)
exit(EXIT_CODES.MISSING_ARGS.value)