#!/usr/bin/env python3 ''' This is a program to get a url from Google ee api based on supplied region''' import logging import stat import os from os.path import exists import requests import shutil from PIL import Image, ImageOps from io import BytesIO import ee import sys import argparse import time import pycountry from country_bounding_boxes import country_subunits_by_iso_code from country_iso2ify import get_resolver resolver = get_resolver() def create_elevation_file_from_region_name(target_region_name, target_filename): ee.Initialize(project="arizona-topo") elv = ee.Image('USGS/SRTMGL1_003') region_of_interest = ee.Geometry.BBox(b_list[0], b_list[1], b_list[2], b_list[3]) countries = ee.FeatureCollection('FAO/GAUL/2015/level1').select('ADM0_NAME') # logging.debug("Countries are [{}]".format(countries)) states = ee.FeatureCollection('FAO/GAUL/2015/level1').select('ADM1_NAME') target_boundary = countries.filter(ee.Filter.eq('ADM0_NAME', target_region_name)) # logging.debug("target_boundary is [{}]".format(target_boundary)) elevation_image = elv.updateMask(elv.gt(0)) elevation_clip = elevation_image.clip(target_boundary) url = elevation_clip.getThumbUrl({ 'min': -300, 'max': 8500, 'region': region_of_interest, 'dimensions': 1024, 'crs': 'EPSG:4326', 'fileFormat': 'GeoTIFF', }) logging.debug("url is [{}]".format(url)) print("Downloading image from [{}]".format(url)) response = requests.get(url, stream=True) with open(target_filename, 'wb') as out_file: shutil.copyfileobj(response.raw, out_file) del response def list_subunits_and_exit(country_name): iso_code = resolver.resolve(country_name) countries = country_subunits_by_iso_code(iso_code) logging.debug("Country is [{}] iso code is [{}]".format(country_name,iso_code)) item = None if countries: for item in countries: print("Subunit of [{}]: [{}]".format(country_name,item.subunit)) sys.exit(0) def get_mainland_bbox(country_name, region_name): # Returns the main continental/primary bounding box iso_code = resolver.resolve(country_name) countries = country_subunits_by_iso_code(iso_code) logging.debug("Country is [{}] iso code is [{}]".format(country_name,iso_code)) # The library is designed to return the main body of the country # in the first result or by filtering for largest area. item = None if countries: for item in countries: logging.debug(item.subunit+str(item.bbox)) item = None countries = country_subunits_by_iso_code(iso_code) if countries and region_name == 'None': for item in countries: if item.subunit == country_name: return item.bbox else: if countries: for item in countries: if item.subunit == region_name: return item.bbox return None class Timerlog: def __init__(self, name): self.time_start = 0.0 self.time_end = 0.0 self.name = name def start(self): self.time_start = time.perf_counter() return self def end(self): self.time_end = time.perf_counter() return self def report(self, duration_only=False): if duration_only: return "Timer [{}] duration [{:.5f}]".format( self.name, self.time_end - self.time_start) return "Timer [{}] begin [{:.5f}] end [{:.5f}] duration [{:.9f}]".format( self.name, self.time_start, self.time_end, self.time_end - self.time_start) program_name = 'get_ee_from_region.py' parser = argparse.ArgumentParser( prog=program_name, description='get a url from google ee api based on supplied region, optionally list subregions of a country', epilog='Good luck') parser.add_argument( "-l", "--logging", dest='logging', metavar='log_file', required=False, action='store', help="log to file") parser.add_argument( "-L", "--list-subunits", dest='list_subunits', action='count', default=0, required=False, help="list country subunits and exit") parser.add_argument( "-v", "--verbose", dest='verbose', default=True, required=False, action='store_true', help="verbose mode, log output goes to stderr") parser.add_argument( "-O", "--override-existing", dest='override_existing', action='count', default=0, required=False, help="will not refetch an existing elevation image file unless this option is specified") parser.add_argument( "-c", "--country", dest='country', metavar='country', default='United States', action='store', help="country to get topo image for") parser.add_argument( "-r", "--region", dest='region', metavar='region', default='None', action='store', help="subregion to get, default is to get entire country") args = parser.parse_args() if args.logging or args.verbose: handlers = [] if args.logging: file_handler = logging.FileHandler(filename=args.logging) handlers.append(file_handler) if args.verbose: stderr_handler = logging.StreamHandler(stream=sys.stderr) handlers.append(stderr_handler) logging.basicConfig( encoding='utf-8', handlers=handlers, format='%(asctime)s.%(msecs)03d %(levelname)s {%(module)s} [%(funcName)s] %(message)s', datefmt='%Y-%m-%d,%H:%M:%S', level=logging.DEBUG) if args.logging: logging.debug('logging to log_file requested') if args.verbose: logging.debug('logging to stderr requested') logging.debug('list_subunits is [{}]'.format(args.list_subunits)) if args.list_subunits > 0: list_subunits_and_exit(args.country) t = Timerlog("atomic test").start() logging.debug(t.end().report()) logging.debug("Country is [{}]".format(args.country)) logging.debug("Region is [{}]".format(args.region)) iso_code = resolver.resolve(args.country) logging.debug("iso_code for [{}] is [{}]".format(args.country, iso_code)) target_region_name="{}{}{}".format(args.country, "" if args.region == 'None' else "-", "" if args.region == 'None' else args.region) bbox=get_mainland_bbox(args.country, args.region) logging.debug("BBOX for [{}] as tuple is {}".format(target_region_name, bbox)) b_list = list(bbox) logging.debug("BBOX for [{}] as list is {}".format(target_region_name, b_list)) target_filename = target_region_name + ".png" if os.path.exists(target_filename): logging.debug("The file [{}] already exists.".format(target_filename)) if args.override_existing == 0: print("The file [{}] already exists. -O flag not specified. Using existing file.".format(target_filename)) else: logging.debug("The file [{}] does not exist.".format(target_filename)) create_elevation_file_from_region_name(target_region_name, target_filename) #pil_image = Image.open(target_filename) import numpy as np import pandas as pd import cairocffi as cairo # 1. Load Image and get Max Values in a 5x7 Grid def get_image_grid_max(image_path, grid_size=(15,15)): # Open image and convert to grayscale for pixel intensity #img = ImageOps.expand(Image.open(image_path).convert('L'), border=200,fill=0) orig=Image.open(image_path).convert('L') logging.debug('orig image size is [{}]'.format(orig.size)) img= Image.new(mode='L', size=(1224,1224), color=0) logging.debug('background image size is [{}]'.format(img.size)) img.paste(orig, ( (1224-orig.size[0])//2, (1224-orig.size[1])//2) ) img.save("frame.png") width, height = img.size grid_w, grid_h = grid_size # Calculate cell dimensions cell_w = width // grid_w cell_h = height // grid_h # Calculate max values for each cell max_values = np.zeros((grid_h, grid_w)) for r in range(grid_h): for c in range(grid_w): # Define box (left, upper, right, lower) box = (c * cell_w, r * cell_h, (c + 1) * cell_w, (r + 1) * cell_h) cell = img.crop(box) max_values[r, c] = np.mean(np.array(cell)) return max_values # 2. Create 3D Line Chart using Cairo (No Matplotlib) def create_3d_line_chart_svg(data, output_path, svg_size=(800, 800)): surface = cairo.SVGSurface(output_path, svg_size[0], svg_size[1]) cr = cairo.Context(surface) # Background cr.set_source_rgb(1, 1, 1) cr.paint() # Simple Orthographic Projection Matrix (pseudo-3D) # Projects (x,y,z) -> (x+z/2, y+z/2) def project(x, y, z, xscale=20, yscale=10, offset=(0, 0)): shelf = 0 if z == 0 else z+2 shelf = z return (offset[0] + x * xscale, offset[1] + y * yscale + shelf) #return (offset[0] + x * scale + z * 2, offset[1] + y * scale + z * 2) rows, cols = data.shape max_val = data.max() # Normalize data for visualization height (0-100) norm_data = (data / max_val) * 100 # Draw Lines along rows cr.set_source_rgb(0, 0, 0) # Black lines cr.set_line_width(1.0) x=0 y=0 for r in range(rows): for c in range(cols): z = norm_data[r, c] x, y = project(c, r, -z, xscale=800/cols, yscale=800/rows) # -z to make higher values go "up" in 2D if c == 0: cr.move_to(x, y+50) cr.line_to(x, y) else: cr.line_to(x, y) cr.line_to(x, y + 50) cr.close_path() cr.set_source_rgb(1,1,1) cr.fill_preserve() cr.set_source_rgb(0,0,0) cr.set_line_width(1.0) cr.stroke() if False: # Draw Lines along columns for c in range(cols): for r in range(rows): z = norm_data[r, c] x, y = project(c, r, -z) if r == 0: cr.move_to(x, y) else: cr.line_to(x, y) #cr.stroke() surface.finish() # Process t = Timerlog("Gridify png file").start() grid_max = get_image_grid_max(target_filename, grid_size=(80,80)) print("Grid Max Values:\n", grid_max) logging.debug(t.end().report()) # Plot t = Timerlog("Convert to SVG").start() create_3d_line_chart_svg(grid_max, target_region_name+".svg", svg_size=(800,800)) print("SVG file created: "+ target_region_name+".svg") logging.debug(t.end().report())