toposhirt/topologytool.py
2026-02-01 21:04:32 -08:00

247 lines
9.7 KiB
Python

import os
import shutil
import sys
from PIL import Image, ImageOps
from country_iso2ify import get_resolver
import pycountry
from country_bounding_boxes import country_subunits_by_iso_code
import requests
import ee
import numpy as np
import pandas as pd
import cairocffi as cairo
class Topology:
def __init__(self,
country=None,
iso_country=None,
subregion_mode=False,
refetch_elevation_data=False,
subregion=None,
linemap_size=(800,800),
linemap_rows=30,
linemap_cols=30,
linemap_width=1.0,
linemap_roundcap=True,
linemap_transparent=False,
storage_directory=None,
):
self.country = country
self.refetch_elevation_data = refetch_elevation_data
self.iso_country = iso_country
self.subregion = subregion
self.linemap_size = linemap_size
self.linemap_rows = linemap_rows
self.linemap_cols = linemap_cols
self.linemap_width = linemap_width
self.linemap_roundcap = linemap_roundcap
self.linemap_transparent = linemap_transparent
self.subregion_mode = subregion_mode
self.target_region_name = None
self.elevation_filename = None
self.storage_directory = storage_directory
if self.country is None and self.iso_country is None:
self.country = "Switzerland"
if self.country is None:
self.country = pycountry.countries.get(alpha_2=self.iso_country).name
self.subregion = self.country
if self.iso_country is None:
self.iso_country = get_resolver().resolve(self.country)
if self.subregion_mode is False:
self.target_region_name=self.country
self.subregion = self.country
else:
self.target_region_name=self.subregion
self.elevation_filename = self.storage_directory + "/" + self.target_region_name+".png"
print("Elevation file name is [{}]".format(self.elevation_filename))
def list_subunits(self):
countries = country_subunits_by_iso_code(self.iso_country)
print("Country is [{}] iso code is [{}]".format(self.country, self.iso_country))
item = None
if countries:
for item in countries:
print("Subunit of [{}]: [{}]".format(self.country, item.subunit))
def get_boundingbox(self):
countries = country_subunits_by_iso_code(self.iso_country)
print("bbox--Country is [{}] iso code is [{}] subregion is [{}]".format(self.country, self.iso_country, self.subregion))
# The library is designed to return the main body of the country
# in the first result or by filtering for largest area.
item = None
countries = country_subunits_by_iso_code(self.iso_country)
if countries and self.subregion is None:
for item in countries:
print(self.subregion)
if item.subunit == self.subregion:
return item.bbox
else:
if countries:
for item in countries:
if item.subunit == self.subregion:
return item.bbox
return None
def create_elevation_file(self):
ee.Initialize(project="arizona-topo")
elv = ee.Image('USGS/SRTMGL1_003')
boundingbox = self.get_boundingbox()
region_of_interest = ee.Geometry.BBox(boundingbox[0],
boundingbox[1],
boundingbox[2],
boundingbox[3])
countries = ee.FeatureCollection('FAO/GAUL/2015/level1').select('ADM0_NAME')
# logging.debug("Countries are [{}]".format(countries))
states = ee.FeatureCollection('FAO/GAUL/2015/level1').select('ADM1_NAME')
target_boundary = countries.filter(ee.Filter.eq('ADM0_NAME', self.target_region_name))
elevation_image = elv.updateMask(elv.gt(0))
elevation_clip = elevation_image.clip(target_boundary)
url = elevation_clip.getThumbUrl({
'min': -300, 'max': 8500, 'region': region_of_interest, 'dimensions': 1024,
'crs': 'EPSG:4326',
'fileFormat': 'GeoTIFF',
})
print("url is [{}]".format(url))
print("Downloading image from [{}]".format(url))
response = requests.get(url, stream=True)
with open(self.elevation_filename, 'wb') as out_file:
shutil.copyfileobj(response.raw, out_file)
del response
def convert_png_to_data(self,rotation=None):
elevation_data = Image.open(self.elevation_filename).convert('L')
frame_image = Image.new(mode='L', size=(1224,1224), color=0)
frame_image.paste(elevation_data,
((1224 - elevation_data.size[0]) // 2,
(1224 - elevation_data.size[1]) // 2))
if rotation is None:
elevation_data = frame_image
else:
elevation_data = frame_image.rotate(rotation, expand=False)
width, height = elevation_data.size
grid_w, grid_h = self.linemap_rows, self.linemap_cols
# Calculate cell dimensions
cell_w = width // grid_w
cell_h = height // grid_h
# Calculate mean values for each cell
# future improve -- only average non-zero values to get coastlines and edges right
values = np.zeros((grid_h, grid_w))
for r in range(grid_h):
for c in range(grid_w):
# Define box (left, upper, right, lower)
box = (c * cell_w, r * cell_h, (c + 1) * cell_w, (r + 1) * cell_h)
cell = elevation_data.crop(box)
values[r, c] = np.mean(np.array(cell))
return values
def create_svg_file(self, rotation=None,file_index=None):
# if no refetch, check if file exists
if os.path.exists(self.elevation_filename):
print("Elevation file exists")
if self.refetch_elevation_data:
print("Refetching elevation data requested")
self.create_elevation_file()
else:
print("Elevation file does not exist, fetching")
self.create_elevation_file()
# get png data to cols x rows grid
values = self.convert_png_to_data(rotation)
svg_filename = self.storage_directory + "/" + self.target_region_name+".svg"
if rotation is not None:
rotation_string = f"{rotation:05.2f}"
svg_filename = self.storage_directory + "/" + self.target_region_name + ".r" + rotation_string + ".svg"
if file_index is not None:
# assume this is an integer, convert to a string with %05d
file_index_string = f"{file_index:05d}"
svg_filename = (self.storage_directory
+ "/" + self.target_region_name
+ ".rot_"
+ rotation_string
+ "_degrees."
+ file_index_string
+ ".svg")
# now make a line drawing of it
surface = cairo.SVGSurface(svg_filename,
self.linemap_size[0],
self.linemap_size[1])
cr = cairo.Context(surface)
# Background
cr.set_source_rgb(1, 1, 1)
cr.paint()
# Simple Orthographic Projection Matrix (pseudo-3D)
# Projects (x,y,z) -> (x+z/2, y+z/2)
# max height should be no more than 3 times the distance between rows, this could be a setting
def project(x, y, z, xscale=20, yscale=10, offset=(0, 0)):
shelf = 0 if z == 0 else z + 2
shelf = z
return (offset[0] + x * xscale, offset[1] + y * yscale + shelf)
# return (offset[0] + x * scale + z * 2, offset[1] + y * scale + z * 2)
rows, cols = values.shape
max_val = values.max()
# Normalize data for visualization height (0-100)
norm_data = (values / max_val) * 100
# Draw Lines along rows
cr.set_source_rgb(0, 0, 0) # Black lines
cr.set_line_width(1.0)
x = 0
y = 0
for r in range(rows):
for c in range(cols):
z = norm_data[r, c]
x, y = project(c, r, -z, xscale=800 / cols, yscale=800 / rows) # -z to make higher values go "up" in 2D
if c == 0:
cr.move_to(x, y + 50)
cr.line_to(x, y)
else:
cr.line_to(x, y)
cr.line_to(x, y + 50)
cr.close_path()
cr.set_source_rgb(1, 1, 1)
cr.fill_preserve()
cr.set_source_rgb(0, 0, 0)
cr.set_line_width(1.0)
cr.stroke()
surface.write_to_png(svg_filename.replace(".svg", ".png"))
surface.finish()
def main():
topology = Topology(country="Spain",
storage_directory="/Users/he/PycharmProjects/toposhirt/outputfiles",
subregion_mode=False,
linemap_cols=170,
linemap_rows=270)
topology.list_subunits()
print(topology.get_boundingbox())
for r in range(4*360):
print("Rotating by [{}] degrees".format(r/4.0))
topology.create_svg_file(rotation=(r / 4.0), file_index=r)
sys.exit(0)
t2=Topology(country="United Kingdom",
subregion_mode=True,
subregion="Wales")
t2.list_subunits()
print(t2.get_boundingbox())
t3=Topology(iso_country="US")
t3.list_subunits()
print(t3.get_boundingbox())
if __name__ == "__main__":
main()