Source code for ideuy.query

import json
import logging
from fnmatch import fnmatch
from itertools import zip_longest, islice

import requests
from shapely.ops import transform

from ideuy.vector import get_vector_bounds_and_crs, reproject_shape, flip

HOSTNAME = "https://visualizador.ide.uy"
SERVICE_PATH = "/geonetwork/srv/eng/q"
SERVICE_URL = f"{HOSTNAME}{SERVICE_PATH}"
MAX_PAGE_SIZE = 100
DEFAULT_CRS = 'epsg:4326'
DEFAULT_PARAMS = {
    "_content_type": "json",
    "bucket": "s101",
    "fast": "index",
    "resultType": "details",
    "sortBy": "relevance"
}

_logger = logging.getLogger(__name__)


[docs]def query(query=None, aoi=None, limit=None, categories=[], file_filters=[]): if not categories: categories = [] params = {**DEFAULT_PARAMS, 'facet.q': '&'.join(categories)} if query: params['title'] = f'{query}*' if aoi: # TODO: Query for each feature geometry bounds in AOI file... bounds, crs = get_vector_bounds_and_crs(aoi) if crs != DEFAULT_CRS: # If crs is not the default one, reproject bounds = reproject_shape(bounds, crs, DEFAULT_CRS) # Flip (latitude,longitude) because the web service expects it the other way... bounds = transform(flip, bounds) params['geometry'] = bounds.wkt gen = query_all_pages(params) if limit: gen = islice(gen, limit) products = build_products(gen) return products
[docs]def build_products(raw_products): res = [] for result in raw_products: files = [] # Build list of downloadable files in product links = result['link'] # Make sure links is a list (e.g. when there is only 1 link) if not isinstance(links, list): links = [links] for link in links: parts = link.split("|") link_id, name, url = parts[0], parts[1], parts[2] # Replace file:// URL for current https static assets URL if url.startswith('file://'): url = url.replace("file:///opt/", f"{HOSTNAME}/") files.append(dict(id=link_id, name=name, url=url)) res.append(dict(**result, __files=files)) return res
[docs]def filter_products_by_files(products, file_filters=[]): res = [] for product in products: files = [] # For each file filter, add filtered files to new files list for filt in file_filters: key, pattern = filt.split('/') files.extend( [f for f in product['__files'] if fnmatch(f[key], pattern)]) # Only return product if it has any file, after filtering if files: product['__files'] = files res.append(product) return res
[docs]def grouper(iterable, n, fillvalue=None): "Collect data into fixed-length chunks or blocks" # grouper('ABCDEFG', 3, 'x') --> ABC DEF Gxx" args = [iter(iterable)] * n return zip_longest(*args, fillvalue=fillvalue)
[docs]def query_all_pages(params): """Generates results for all pages""" i = 1 while True: page_params = {**params, 'from': i, 'to': (i + MAX_PAGE_SIZE - 1)} _logger.info(f"Query: {page_params}") res = requests.get(SERVICE_URL, params=page_params) if not res.ok: raise RuntimeError( "Status code: {res.status_code}. Response: {res.content}") body = json.loads(res.content) metadata = body.get('metadata', []) # Make sure metadata is a list (e.g. when there is only 1 result) if not isinstance(metadata, list): metadata = [metadata] for row in metadata: yield row # If page results count is less than max page size, # this is the last page, so return: if len(metadata) < MAX_PAGE_SIZE: return # Otherwise, increment item_from and item_to to query next page i += MAX_PAGE_SIZE