toposhirt/get_ee_from_region.py

312 lines
9.4 KiB
Python
Executable File

#!/usr/bin/env python3
''' This is a program to get a url from Google ee api based on supplied region'''
import logging
import stat
import os
from os.path import exists
import requests
import shutil
from PIL import Image
from io import BytesIO
import ee
import sys
import argparse
import time
import pycountry
from country_bounding_boxes import country_subunits_by_iso_code
from country_iso2ify import get_resolver
resolver = get_resolver()
def create_elevation_file_from_region_name(target_region_name, target_filename):
ee.Initialize(project="arizona-topo")
elv = ee.Image('USGS/SRTMGL1_003')
region_of_interest = ee.Geometry.BBox(b_list[0],
b_list[1],
b_list[2],
b_list[3])
countries = ee.FeatureCollection('FAO/GAUL/2015/level1').select('ADM0_NAME')
# logging.debug("Countries are [{}]".format(countries))
states = ee.FeatureCollection('FAO/GAUL/2015/level1').select('ADM1_NAME')
target_boundary = countries.filter(ee.Filter.eq('ADM0_NAME', target_region_name))
# logging.debug("target_boundary is [{}]".format(target_boundary))
elevation_image = elv.updateMask(elv.gt(0))
elevation_clip = elevation_image.clip(target_boundary)
url = elevation_clip.getThumbUrl({
'min': -300, 'max': 8500, 'region': region_of_interest, 'dimensions': 1024,
'crs': 'EPSG:4326',
'fileFormat': 'GeoTIFF',
})
logging.debug("url is [{}]".format(url))
print("Downloading image from [{}]".format(url))
response = requests.get(url, stream=True)
with open(target_filename, 'wb') as out_file:
shutil.copyfileobj(response.raw, out_file)
del response
def list_subunits_and_exit(country_name):
iso_code = resolver.resolve(country_name)
countries = country_subunits_by_iso_code(iso_code)
logging.debug("Country is [{}] iso code is [{}]".format(country_name,iso_code))
item = None
if countries:
for item in countries:
print("Subunit of [{}]: [{}]".format(country_name,item.subunit))
sys.exit(0)
def get_mainland_bbox(country_name, region_name):
# Returns the main continental/primary bounding box
iso_code = resolver.resolve(country_name)
countries = country_subunits_by_iso_code(iso_code)
logging.debug("Country is [{}] iso code is [{}]".format(country_name,iso_code))
# The library is designed to return the main body of the country
# in the first result or by filtering for largest area.
item = None
if countries:
for item in countries:
logging.debug(item.subunit+str(item.bbox))
item = None
countries = country_subunits_by_iso_code(iso_code)
if countries and region_name == 'None':
for item in countries:
if item.subunit == country_name:
return item.bbox
else:
if countries:
for item in countries:
if item.subunit == region_name:
return item.bbox
return None
class Timerlog:
def __init__(self, name):
self.time_start = 0.0
self.time_end = 0.0
self.name = name
def start(self):
self.time_start = time.perf_counter()
return self
def end(self):
self.time_end = time.perf_counter()
return self
def report(self, duration_only=False):
if duration_only:
return "Timer [{}] duration [{:.5f}]".format(
self.name, self.time_end - self.time_start)
return "Timer [{}] begin [{:.5f}] end [{:.5f}] duration [{:.9f}]".format(
self.name, self.time_start, self.time_end, self.time_end - self.time_start)
program_name = 'get_ee_from_region.py'
parser = argparse.ArgumentParser(
prog=program_name,
description='get a url from google ee api based on supplied region, optionally list subregions of a country',
epilog='Good luck')
parser.add_argument(
"-l",
"--logging",
dest='logging',
metavar='log_file',
required=False,
action='store',
help="log to file")
parser.add_argument(
"-L",
"--list-subunits",
dest='list_subunits',
action='count',
default=0,
required=False,
help="list country subunits and exit")
parser.add_argument(
"-v",
"--verbose",
dest='verbose',
default=True,
required=False,
action='store_true',
help="verbose mode, log output goes to stderr")
parser.add_argument(
"-O",
"--override-existing",
dest='override_existing',
action='count',
default=0,
required=False,
help="will not refetch an existing elevation image file unless this option is specified")
parser.add_argument(
"-c",
"--country",
dest='country',
metavar='country',
default='United States',
action='store',
help="country to get topo image for")
parser.add_argument(
"-r",
"--region",
dest='region',
metavar='region',
default='None',
action='store',
help="subregion to get, default is to get entire country")
args = parser.parse_args()
if args.logging or args.verbose:
handlers = []
if args.logging:
file_handler = logging.FileHandler(filename=args.logging)
handlers.append(file_handler)
if args.verbose:
stderr_handler = logging.StreamHandler(stream=sys.stderr)
handlers.append(stderr_handler)
logging.basicConfig(
encoding='utf-8',
handlers=handlers,
format='%(asctime)s.%(msecs)03d %(levelname)s {%(module)s} [%(funcName)s] %(message)s',
datefmt='%Y-%m-%d,%H:%M:%S',
level=logging.DEBUG)
if args.logging:
logging.debug('logging to log_file requested')
if args.verbose:
logging.debug('logging to stderr requested')
logging.debug('list_subunits is [{}]'.format(args.list_subunits))
if args.list_subunits > 0:
list_subunits_and_exit(args.country)
t = Timerlog("atomic test").start()
logging.debug(t.end().report())
logging.debug("Country is [{}]".format(args.country))
logging.debug("Region is [{}]".format(args.region))
iso_code = resolver.resolve(args.country)
logging.debug("iso_code for [{}] is [{}]".format(args.country, iso_code))
target_region_name="{}{}{}".format(args.country,
"" if args.region == 'None' else "-",
"" if args.region == 'None' else args.region)
bbox=get_mainland_bbox(args.country, args.region)
logging.debug("BBOX for [{}] as tuple is {}".format(target_region_name, bbox))
b_list = list(bbox)
logging.debug("BBOX for [{}] as list is {}".format(target_region_name, b_list))
target_filename = target_region_name + ".png"
if os.path.exists(target_filename):
logging.debug("The file [{}] already exists.".format(target_filename))
if args.override_existing == 0:
print("The file [{}] already exists. -O flag not specified. Using existing file.".format(target_filename))
else:
logging.debug("The file [{}] does not exist.".format(target_filename))
create_elevation_file_from_region_name(target_region_name, target_filename)
#pil_image = Image.open(target_filename)
import numpy as np
import pandas as pd
import cairocffi as cairo
# 1. Load Image and get Max Values in a 5x7 Grid
def get_image_grid_max(image_path, grid_size=(5, 7)):
# Open image and convert to grayscale for pixel intensity
img = Image.open(image_path).convert('L')
width, height = img.size
grid_w, grid_h = grid_size
# Calculate cell dimensions
cell_w = width // grid_w
cell_h = height // grid_h
# Calculate max values for each cell
max_values = np.zeros((grid_h, grid_w))
for r in range(grid_h):
for c in range(grid_w):
# Define box (left, upper, right, lower)
box = (c * cell_w, r * cell_h, (c + 1) * cell_w, (r + 1) * cell_h)
cell = img.crop(box)
max_values[r, c] = np.max(np.array(cell))
return max_values
# 2. Create 3D Line Chart using Cairo (No Matplotlib)
def create_3d_line_chart_svg(data, output_path, svg_size=(800, 600)):
surface = cairo.SVGSurface(output_path, svg_size[0], svg_size[1])
cr = cairo.Context(surface)
# Background
cr.set_source_rgb(1, 1, 1)
cr.paint()
# Simple Orthographic Projection Matrix (pseudo-3D)
# Projects (x,y,z) -> (x+z/2, y+z/2)
def project(x, y, z, scale=20, offset=(100, 100)):
return (offset[0] + x * scale + z * 2, offset[1] + y * scale + z * 2)
rows, cols = data.shape
max_val = data.max()
# Normalize data for visualization height (0-100)
norm_data = (data / max_val) * 100
# Draw Lines along rows
cr.set_source_rgb(0, 0, 0) # Black lines
cr.set_line_width(1.0)
for r in range(rows):
for c in range(cols):
z = norm_data[r, c]
x, y = project(c, r, -z) # -z to make higher values go "up" in 2D
if c == 0:
cr.move_to(x, y)
else:
cr.line_to(x, y)
# Draw Lines along columns
for c in range(cols):
for r in range(rows):
z = norm_data[r, c]
x, y = project(c, r, -z)
if r == 0:
cr.move_to(x, y)
else:
cr.line_to(x, y)
cr.stroke()
surface.finish()
# Process
grid_max = get_image_grid_max(target_filename, grid_size=(25, 27))
print("Grid Max Values:\n", grid_max)
# Plot
create_3d_line_chart_svg(grid_max, target_region_name+".svg")
print("SVG file created: "+ target_region_name+".svg")