396 lines
16 KiB
Python
396 lines
16 KiB
Python
import os
|
|
import shutil
|
|
import sys
|
|
from PIL import Image, ImageOps
|
|
from country_iso2ify import get_resolver
|
|
import pycountry
|
|
from country_bounding_boxes import country_subunits_by_iso_code
|
|
import requests
|
|
import ee
|
|
import numpy as np
|
|
import pandas as pd
|
|
import cairocffi as cairo
|
|
import math
|
|
|
|
class Topology:
|
|
def __init__(self,
|
|
country=None,
|
|
iso_country=None,
|
|
subregion_mode=False,
|
|
refetch_elevation_data=False,
|
|
subregion=None,
|
|
linemap_size=(800,800),
|
|
linemap_rows=30,
|
|
linemap_cols=30,
|
|
linemap_width=None,
|
|
linemap_roundcap=True,
|
|
linemap_transparent=False,
|
|
storage_directory=None,
|
|
):
|
|
self.country = country
|
|
self.refetch_elevation_data = refetch_elevation_data
|
|
self.iso_country = iso_country
|
|
self.subregion = subregion
|
|
self.linemap_size = linemap_size
|
|
self.linemap_xscale = 1.0 * self.linemap_size[0] / linemap_cols
|
|
self.linemap_yscale = 1.0 * self.linemap_size[1] / linemap_rows
|
|
self.linemap_rows = linemap_rows
|
|
self.linemap_cols = linemap_cols
|
|
if linemap_width is None:
|
|
self.linemap_width = linemap_size[1] / 400
|
|
else:
|
|
self.linemap_width = linemap_width
|
|
self.linemap_roundcap = linemap_roundcap
|
|
self.linemap_transparent = linemap_transparent
|
|
self.subregion_mode = subregion_mode
|
|
self.target_region_name = None
|
|
self.elevation_filename = None
|
|
self.storage_directory = storage_directory
|
|
|
|
if self.country is None and self.iso_country is None:
|
|
self.country = "Switzerland"
|
|
if self.country is None:
|
|
self.country = pycountry.countries.get(alpha_2=self.iso_country).name
|
|
self.subregion = self.country
|
|
if self.iso_country is None:
|
|
self.iso_country = get_resolver().resolve(self.country)
|
|
if self.subregion_mode is False:
|
|
self.target_region_name=self.country
|
|
self.subregion = self.country
|
|
else:
|
|
self.target_region_name=self.subregion
|
|
self.elevation_filename = self.storage_directory + "/" + self.target_region_name+".png"
|
|
print("Elevation file name is [{}]".format(self.elevation_filename))
|
|
print("requested linemap size is {}, xscale is [{}], yscale is [{}]".format(self.linemap_size, self.linemap_xscale, self.linemap_yscale))
|
|
|
|
|
|
def list_subunits(self):
|
|
countries = country_subunits_by_iso_code(self.iso_country)
|
|
print("Country is [{}] iso code is [{}]".format(self.country, self.iso_country))
|
|
item = None
|
|
if countries:
|
|
for item in countries:
|
|
print("Subunit of [{}]: [{}]".format(self.country, item.subunit))
|
|
|
|
def get_boundingbox(self):
|
|
countries = country_subunits_by_iso_code(self.iso_country)
|
|
print("bbox--Country is [{}] iso code is [{}] subregion is [{}]".format(self.country, self.iso_country, self.subregion))
|
|
# The library is designed to return the main body of the country
|
|
# in the first result or by filtering for largest area.
|
|
item = None
|
|
countries = country_subunits_by_iso_code(self.iso_country)
|
|
if countries and self.subregion is None:
|
|
for item in countries:
|
|
print(self.subregion)
|
|
if item.subunit == self.subregion:
|
|
return item.bbox
|
|
else:
|
|
if countries:
|
|
for item in countries:
|
|
if item.subunit == self.subregion:
|
|
return item.bbox
|
|
return None
|
|
|
|
def create_elevation_file(self):
|
|
ee.Initialize(project="arizona-topo")
|
|
elv = ee.Image('USGS/SRTMGL1_003')
|
|
boundingbox = self.get_boundingbox()
|
|
region_of_interest = ee.Geometry.BBox(boundingbox[0],
|
|
boundingbox[1],
|
|
boundingbox[2],
|
|
boundingbox[3])
|
|
countries = ee.FeatureCollection('FAO/GAUL/2015/level1').select('ADM0_NAME')
|
|
# logging.debug("Countries are [{}]".format(countries))
|
|
states = ee.FeatureCollection('FAO/GAUL/2015/level1').select('ADM1_NAME')
|
|
|
|
target_boundary = countries.filter(ee.Filter.eq('ADM0_NAME', self.target_region_name))
|
|
elevation_image = elv.updateMask(elv.gt(0))
|
|
elevation_clip = elevation_image.clip(target_boundary)
|
|
url = elevation_clip.getThumbUrl({
|
|
'min': -300, 'max': 8500, 'region': region_of_interest, 'dimensions': 1024,
|
|
'crs': 'EPSG:4326',
|
|
'fileFormat': 'GeoTIFF',
|
|
})
|
|
print("url is [{}]".format(url))
|
|
print("Downloading image from [{}]".format(url))
|
|
response = requests.get(url, stream=True)
|
|
with open(self.elevation_filename, 'wb') as out_file:
|
|
shutil.copyfileobj(response.raw, out_file)
|
|
del response
|
|
|
|
def convert_png_to_data(self,rotation=None):
|
|
elevation_data = Image.open(self.elevation_filename).convert('L')
|
|
|
|
frame_image = Image.new(mode='L', size=(1224,1224), color=0)
|
|
frame_image.paste(elevation_data,
|
|
((1224 - elevation_data.size[0]) // 2,
|
|
(1224 - elevation_data.size[1]) // 2))
|
|
frame_image.save(self.elevation_filename+".frame.png")
|
|
|
|
if rotation is None:
|
|
elevation_data = frame_image
|
|
else:
|
|
elevation_data = frame_image.rotate(rotation, expand=False)
|
|
|
|
width, height = elevation_data.size
|
|
print(width)
|
|
print(height)
|
|
grid_w, grid_h = self.linemap_rows, self.linemap_cols
|
|
|
|
# Calculate cell dimensions
|
|
cell_w = 1.0 * width / grid_w
|
|
cell_h = 1.0 * height / grid_h
|
|
print("Cell width is w,h [{},{}]".format(cell_w, cell_h))
|
|
|
|
# Calculate mean values for each cell
|
|
# future improve -- only average non-zero values to get coastlines and edges right
|
|
# need to use xscale yscale to get cell window, adjust for rounding/floor error
|
|
values = np.zeros((grid_h, grid_w))
|
|
for r in range(grid_h):
|
|
for c in range(grid_w):
|
|
# Define box (left, upper, right, lower)
|
|
box = (int(c * cell_w),
|
|
int(r * cell_h),
|
|
int((c + 1) * cell_w),
|
|
int((r + 1) * cell_h)
|
|
)
|
|
cell = elevation_data.crop(box)
|
|
this_array = np.array(cell)
|
|
if np.max(this_array) == 0:
|
|
values[r,c] = 0
|
|
else:
|
|
values[r, c] = np.mean(this_array[this_array != 0])
|
|
return values
|
|
|
|
def create_cylinder_svg_file(self, rotation=None,file_index=None):
|
|
|
|
# subfunction to draw an array, options are line y/n, fill y/n, line and fill color
|
|
def svg_draw_cylinder(svg_context: cairo.Context, x1,y1,x2,y2,gradient=None):
|
|
circlew = self.linemap_xscale / 3
|
|
halfcirclew = circlew / 2
|
|
svg_context.set_source_rgb(1, 1, 1)
|
|
svg_context.arc(x1,y1,circlew,0,2*math.pi)
|
|
svg_context.stroke()
|
|
|
|
svg_context.arc(x2,y2,circlew, 0,2*math.pi)
|
|
svg_context.stroke()
|
|
|
|
svg_context.move_to(x1-1,y1-1)
|
|
svg_context.rectangle(x1-1,y1-1,x1+1,y2+1)
|
|
svg_context.stroke()
|
|
# if no refetch, check if file exists
|
|
if os.path.exists(self.elevation_filename):
|
|
print("Elevation file exists")
|
|
if self.refetch_elevation_data:
|
|
print("Refetching elevation data requested")
|
|
self.create_elevation_file()
|
|
else:
|
|
print("Elevation file does not exist, fetching")
|
|
self.create_elevation_file()
|
|
# get png data to cols x rows grid
|
|
values = self.convert_png_to_data(rotation)
|
|
|
|
svg_filename = self.storage_directory + "/" + self.target_region_name+".svg"
|
|
if rotation is not None:
|
|
rotation_string = f"{rotation:05.2f}"
|
|
svg_filename = self.storage_directory + "/" + self.target_region_name + ".r" + rotation_string + ".svg"
|
|
if file_index is not None:
|
|
# assume this is an integer, convert to a string with %05d
|
|
file_index_string = f"{file_index:05d}"
|
|
svg_filename = (self.storage_directory
|
|
+ "/" + self.target_region_name
|
|
+ ".rot_"
|
|
+ rotation_string
|
|
+ "_degrees."
|
|
+ file_index_string
|
|
+ ".svg")
|
|
|
|
# now make a line drawing of it
|
|
surface = cairo.SVGSurface(svg_filename,
|
|
self.linemap_size[0],
|
|
self.linemap_size[1])
|
|
cr = cairo.Context(surface)
|
|
|
|
# Background
|
|
#cr.set_source_rgb(1, 1, 1)
|
|
cr.set_source_rgb(0, 0, 0)
|
|
cr.paint()
|
|
|
|
# Simple Orthographic Projection Matrix (pseudo-3D)
|
|
# Projects (x,y,z) -> (x+z/2, y+z/2)
|
|
# max height should be no more than 3 times the distance between rows, this could be a setting
|
|
def project(x, y, z):
|
|
return (x * self.linemap_xscale, y * self.linemap_yscale - (z if z > 0 else (-1.5 * self.linemap_yscale)))
|
|
# return (offset[0] + x * scale + z * 2, offset[1] + y * scale + z * 2)
|
|
|
|
rows, cols = values.shape
|
|
max_val = values.max()
|
|
|
|
# Normalize data for visualization height (0-100) -- max is no more than three row heights
|
|
norm_data = (values / max_val) * self.linemap_xscale * 3
|
|
|
|
# Draw Lines along rows
|
|
cr.set_source_rgb(0, 0, 0) # Black lines
|
|
cr.set_line_width(self.linemap_width)
|
|
cr.set_line_cap(cairo.LINE_CAP_ROUND)
|
|
cr.set_line_join(cairo.LINE_JOIN_ROUND)
|
|
x = 0
|
|
y = 0
|
|
|
|
if True:
|
|
for r in range(rows):
|
|
draw_list = []
|
|
for c in range(cols):
|
|
z = norm_data[r,c]
|
|
x,y = project(c, r, z) # now need to make array of x,y vals and pass to drawing func
|
|
draw_list.append((x,y))
|
|
svg_draw_cylinder(cr,x,y,x,y-z)
|
|
print("***" + str(draw_list))
|
|
#svg_draw_list(cr, draw_list, line_color="Black", fill_shape=True)
|
|
|
|
|
|
surface.write_to_png(svg_filename.replace(".svg", ".topo.png"))
|
|
surface.finish()
|
|
|
|
def create_svg_file(self, rotation=None, file_index=None):
|
|
|
|
# subfunction to draw an array, options are line y/n, fill y/n, line and fill color
|
|
def svg_draw_list(svg_context, draw_list, line_color="White",fill_shape=False):
|
|
if line_color == "White":
|
|
svg_context.set_source_rgb(1, 1, 1)
|
|
if line_color == "Black":
|
|
svg_context.set_source_rgb(0, 0, 0)
|
|
|
|
for item in range(len(draw_list)):
|
|
if item == 0:
|
|
svg_context.move_to(draw_list[item][0],draw_list[item][1])
|
|
else:
|
|
svg_context.line_to(draw_list[item][0],draw_list[item][1])
|
|
if fill_shape:
|
|
svg_context.close_path()
|
|
svg_context.fill_preserve()
|
|
svg_context.stroke()
|
|
|
|
|
|
# if no refetch, check if file exists
|
|
if os.path.exists(self.elevation_filename):
|
|
print("Elevation file exists")
|
|
if self.refetch_elevation_data:
|
|
print("Refetching elevation data requested")
|
|
self.create_elevation_file()
|
|
else:
|
|
print("Elevation file does not exist, fetching")
|
|
self.create_elevation_file()
|
|
# get png data to cols x rows grid
|
|
values = self.convert_png_to_data(rotation)
|
|
|
|
svg_filename = self.storage_directory + "/" + self.target_region_name+".svg"
|
|
if rotation is not None:
|
|
rotation_string = f"{rotation:05.2f}"
|
|
svg_filename = self.storage_directory + "/" + self.target_region_name + ".r" + rotation_string + ".svg"
|
|
if file_index is not None:
|
|
# assume this is an integer, convert to a string with %05d
|
|
file_index_string = f"{file_index:05d}"
|
|
svg_filename = (self.storage_directory
|
|
+ "/" + self.target_region_name
|
|
+ ".rot_"
|
|
+ rotation_string
|
|
+ "_degrees."
|
|
+ file_index_string
|
|
+ ".svg")
|
|
|
|
# now make a line drawing of it
|
|
surface = cairo.SVGSurface(svg_filename,
|
|
self.linemap_size[0],
|
|
self.linemap_size[1])
|
|
cr = cairo.Context(surface)
|
|
|
|
# Background
|
|
#cr.set_source_rgb(1, 1, 1)
|
|
cr.set_source_rgb(0, 0, 0)
|
|
cr.paint()
|
|
|
|
# Simple Orthographic Projection Matrix (pseudo-3D)
|
|
# Projects (x,y,z) -> (x+z/2, y+z/2)
|
|
# max height should be no more than 3 times the distance between rows, this could be a setting
|
|
def project(x, y, z):
|
|
return (x * self.linemap_xscale, y * self.linemap_yscale - (z if z > 0 else (-1.5 * self.linemap_yscale)))
|
|
# return (offset[0] + x * scale + z * 2, offset[1] + y * scale + z * 2)
|
|
|
|
rows, cols = values.shape
|
|
max_val = values.max()
|
|
|
|
# Normalize data for visualization height (0-100) -- max is no more than three row heights
|
|
norm_data = (values / max_val) * self.linemap_xscale * 3
|
|
|
|
# Draw Lines along rows
|
|
cr.set_source_rgb(0, 0, 0) # Black lines
|
|
cr.set_line_width(self.linemap_width)
|
|
cr.set_line_cap(cairo.LINE_CAP_ROUND)
|
|
cr.set_line_join(cairo.LINE_JOIN_ROUND)
|
|
x = 0
|
|
y = 0
|
|
|
|
if True:
|
|
for r in range(rows):
|
|
draw_list = []
|
|
for c in range(cols):
|
|
z = norm_data[r,c]
|
|
x,y = project(c, r, z) # now need to make array of x,y vals and pass to drawing func
|
|
draw_list.append((x,y))
|
|
print("***" + str(draw_list))
|
|
svg_draw_list(cr, draw_list, line_color="Black", fill_shape=True)
|
|
svg_draw_list(cr, draw_list, line_color="White", fill_shape=False)
|
|
|
|
if False:
|
|
for r in range(rows):
|
|
for c in range(cols):
|
|
z = norm_data[r, c]
|
|
x, y = project(c, r, z) # -z to make higher values go "up" in 2D
|
|
if c == 0:
|
|
cr.move_to(x, y + 50)
|
|
cr.line_to(x, y)
|
|
else:
|
|
cr.line_to(x, y)
|
|
cr.line_to(x, y + 50)
|
|
cr.close_path()
|
|
cr.set_source_rgb(0, 0, 0)
|
|
cr.fill_preserve()
|
|
cr.set_source_rgb(1, 1, 1)
|
|
cr.set_line_width(self.linemap_width)
|
|
cr.stroke()
|
|
|
|
|
|
surface.write_to_png(svg_filename.replace(".svg", ".topo.png"))
|
|
surface.finish()
|
|
|
|
def main():
|
|
topology = Topology(country="Switzerland",
|
|
storage_directory="/Users/he/PycharmProjects/toposhirt/outputfiles",
|
|
subregion_mode=False,
|
|
linemap_cols=50,
|
|
linemap_rows=50,
|
|
linemap_size=(1080,1080),
|
|
linemap_width=0.3)
|
|
topology.list_subunits()
|
|
print(topology.get_boundingbox())
|
|
#topology.create_svg_file()
|
|
topology.create_cylinder_svg_file()
|
|
#for r in range(4*360):
|
|
#print("Rotating by [{}] degrees".format(r/4.0))
|
|
#topology.create_svg_file(rotation=(r / 4.0), file_index=r)
|
|
|
|
sys.exit(0)
|
|
|
|
t2=Topology(country="United Kingdom",
|
|
subregion_mode=True,
|
|
subregion="Wales")
|
|
t2.list_subunits()
|
|
print(t2.get_boundingbox())
|
|
|
|
t3=Topology(iso_country="US")
|
|
t3.list_subunits()
|
|
print(t3.get_boundingbox())
|
|
|
|
if __name__ == "__main__":
|
|
main() |