#!/usr/bin/env python
# encoding: utf-8
"""
$Id: adl_post_process.py 2496 2015-12-23 17:53:05Z scottm $

Purpose: Run post processes on products produced by ADL

public methods:
    repack_products
    aggregate_products

Copyright (c) 2011 University of Wisconsin Regents.
Licensed under GNU GPLv3.
"""

import glob
import logging
import os
import sys
import traceback
import xml.etree.ElementTree as ET
from multiprocessing import Pool, Lock, Value, cpu_count
from subprocess import CalledProcessError

import adl_geo_ref
from adl_common import sh, env, status_line
from adl_common import COMMON_LOG_CHECK_TABLE, DDS_PRODUCT_FILE, CSPP_RT_HOME, check_existing_env_var

LOG = logging.getLogger('adl_post_process')

# Paths to the external aggregation / compression tools shipped with CSPP.
NAGG = os.path.join(CSPP_RT_HOME, 'common', 'local', 'bin', 'nagg')
REPACK = os.path.join(CSPP_RT_HOME, 'common', 'local', 'bin', 'h5repack')

# These variables can be initialized from the environment but default to
# the expected CSPP values.
DPE_DOMAIN = check_existing_env_var('DPE_DOMAIN', default_value='dev')
DPE_SITE_ID = check_existing_env_var('DPE_SITE_ID', default_value='cspp')

# Mappings between NPP product ids and ADL short names, populated lazily by
# _build_product_name_maps() from DDS_PRODUCT_FILE.
PRODUCTID_2_SHORTNAME = dict()
SHORTNAME_2_PRODUCTID = dict()


def _build_product_name_maps():
    """Read the ADL short-name to product-id map (DDS_PRODUCT_FILE XML) and
    populate PRODUCTID_2_SHORTNAME / SHORTNAME_2_PRODUCTID.

    No-op when the maps are already populated.
    """
    if len(PRODUCTID_2_SHORTNAME) == 0:
        tree = ET.parse(DDS_PRODUCT_FILE)
        p = tree.find("group/group")
        links = list(p.iter("config"))  # all product <config> entries
        for i in links:
            productid = i.find('configValue').text
            shortname = i.find('name').text
            # ADL short names carry an "_NPP" suffix that the rest of this
            # module does not use.
            shortname = shortname.replace("_NPP", "")
            PRODUCTID_2_SHORTNAME[productid] = shortname
            SHORTNAME_2_PRODUCTID[shortname] = productid


# Example invocation:
# nagg -n 10 -O "cspp" -D "dev" -t SVM16 SVM16*.h5

# One-shot flags so each status line is reported only once per run.
reported_nagg_status = False
reported_repack_status = False
reported_geo_status = False

# Product ids of geolocation products (aggregated with nagg -g <id>).
geo_product_ids = \
    ["GATMO", "GCRSO", "GAERO", "GCLDO", "GDNBO", "GNCCO", "GIGTO", "GIMGO",
     "GITCO", "GMGTO", "GMODO", "GMTCO", "GNHFO", "GOTCO", "GOSCO", "GONPO",
     "GONCO", "GCRIO", "GATRO", "ICDBG"]

# Product ids nagg knows how to aggregate; anything else only earns a
# warning (not an error) when nagg rejects it.
edr_product_ids = \
    ["ICALI", "ICALM", "ICCCR", "ICISE", "ICMSE", "ICSTT", "ICTLI", "ICTLM",
     "IICMO", "IICMS", "SATMR", "SATMS", "SCRIS", "SOMPS", "SOMTC", "SOMSC",
     "SOMNC", "SVDNB", "SVI01", "SVI02", "SVI03", "SVI04", "SVI05", "SVM01",
     "SVM02", "SVM03", "SVM04", "SVM05", "SVM06", "SVM07", "SVM08", "SVM09",
     "SVM10", "SVM11", "SVM12", "SVM13", "SVM14", "SVM15", "SVM16", "TATMS",
     "REDRO", "OOTCO", "VAOOO", "VCBHO", "VCCLO", "VCEPO", "VCOTO", "VCTHO",
     "VCTPO", "VCTTO", "VI1BO", "VI2BO", "VI3BO", "VI4BO", "VI5BO", "VISTO",
     "VLSTO", "VM01O", "VM02O", "VM03O", "VM04O", "VM05O", "VM06O", "VNCCO",
     "VNHFO", "VOCCO", "VISAO", "VSCDO", "VSCMO", "VSICO", "VSSTO", "VSTYO",
     "VSUMO", "VIVIO", "REDRS", "OOTCS", "VAOOS", "VCBHS", "VCCLS", "VCEPS",
     "VCOTS", "VCTHS", "VCTPS", "VCTTS", "VISTS", "VLSTS", "VNCCS", "VNHFS",
     "VOCCS", "VISAS", "VSCDS", "VSCMS", "VSICS", "VSSTS", "VSTPS", "VSUMS",
     "VIVIS", "INCTO", "INPAK", "IIROO", "IIROS", "IMOPO", "IMOPS", "IVAMI",
     "IVAOT", "IVBPX", "IVCBH", "IVCDB", "IVCLT", "IVCOP", "IVCTP", "IVICC",
     "IVIIC", "IVIIW", "IVIQF", "IVIRT", "IVISR", "IVIWT", "IVPCM", "IVPCP",
     "IVPTP", "IVSIC", "IVSTP"]


def _nagg(work_dir, productid, num_gran=15, files_to_agg=None):
    """Aggregate all granule files for one product id into a single file
    with the external ``nagg`` tool, then delete the input granule files.

    work_dir -- directory searched for granules; also passed as WORK_DIR
    productid -- NPP product id (e.g. "SVM16")
    num_gran -- currently unused; aggregation uses nagg's --onefile mode
    files_to_agg -- explicit list of input files; when None, glob
        "<productid>_npp*.h5" inside work_dir

    Raises CalledProcessError when nagg exits non-zero.
    """
    global reported_nagg_status

    if files_to_agg is None:
        pattern = productid + "_npp*.h5"
        before = glob.glob(os.path.join(work_dir, pattern))
    else:
        before = files_to_agg

    LOG.debug("products to AGG")
    LOG.debug(before)

    if productid in geo_product_ids:
        # Geolocation product: nagg selects it with -g <productid>.
        args = [NAGG, "-g", productid, "--onefile",
                "-O", DPE_SITE_ID, "-D", DPE_DOMAIN, "-S"]
    else:
        # Data product: no geo output (-g no); select by type with -t.
        args = [NAGG, "-g", "no", "--onefile",
                "-O", DPE_SITE_ID, "-D", DPE_DOMAIN, "-S", "-t", productid]

    args.extend(before)

    if len(before) > 0:
        if not reported_nagg_status:
            reported_nagg_status = True
            status_line('Aggregate products')
        sh(args, env=env(WORK_DIR=work_dir))
        # The aggregate file replaces the individual granule files.
        for fn in before:
            LOG.debug("remove" + fn)
            os.remove(fn)


def aggregate_products(work_dir, short_names, num_gran=300, files_to_agg=None):
    """Aggregate available products matching a short name list, returning
    a count of products that had problems.

    work_dir -- directory containing the product files
    short_names -- iterable of ADL short names (e.g. "VIIRS-M16-SDR")
    num_gran -- passed through to _nagg (currently unused there)
    files_to_agg -- accepted for interface compatibility; NOTE(review):
        not forwarded to _nagg, which always globs per product id
    """
    _build_product_name_maps()
    problems = 0
    for short_name in short_names:
        productid = SHORTNAME_2_PRODUCTID[short_name]
        LOG.debug("aggregate_products " + short_name + " " + productid)
        try:
            _nagg(work_dir, productid, num_gran=num_gran)
        except CalledProcessError as oops:
            LOG.debug(traceback.format_exc())
            if productid not in edr_product_ids:
                # Known limitation rather than a failure for these ids.
                LOG.warning('nagg does not support aggregation of {} files...'.format(productid))
            LOG.error('nagg failed on %r: %r . Continuing' % (productid, oops))
            problems = problems + 1
    return problems


def run_nagg(additional_env):
    """Worker entry point: run _nagg from a dict of keyword values
    (multiprocessing pool target)."""
    _nagg(additional_env['WORK_DIR'],
          additional_env['PRODUCT_ID'],
          additional_env['NUM_GRAN'],
          files_to_agg=None)


def aggregate_products_par(work_dir, short_names, nprocs=1, num_gran=300, files_to_agg=None):
    """Aggregate products in parallel using a pool of nprocs workers.

    Returns a problem count. NOTE(review): worker failures are not counted,
    so the return value is currently always 0; callers relying on it should
    verify outputs independently.
    """
    parallel = Pool(int(nprocs))
    argument_dictionaries = []
    problems = 0
    for short_name in short_names:
        productid = SHORTNAME_2_PRODUCTID[short_name]
        LOG.debug("aggregate_products " + short_name + " " + productid)
        additional_envs = dict(
            WORK_DIR=work_dir,
            PRODUCT_ID=productid,
            NUM_GRAN=num_gran
        )
        argument_dictionaries.append(additional_envs)
    try:
        # The very long .get() timeout keeps the call interruptible by
        # Ctrl-C (a bare .get() would block signals on Python 2).
        parallel.map_async(run_nagg, argument_dictionaries).get(9999999)
        # Shut the pool down cleanly on the success path.
        parallel.close()
        parallel.join()
    except KeyboardInterrupt:
        LOG.error("Got exception, stopping workers and exiting.")
        parallel.terminate()
        parallel.join()
        sys.exit(1)
    return problems


def _repack(work_dir, h5):
    """Compress an HDF5 file in place by running h5repack with GZIP=1.

    The original is renamed to "<name>.big", repacked back to the original
    name, and the ".big" copy removed on success.

    Raises CalledProcessError when h5repack exits non-zero.
    """
    if not h5.endswith('.h5'):
        LOG.warning('%r should end with .h5?' % h5)
    LOG.debug('running h5repack on %r' % h5)
    h5_big = h5 + ".big"
    os.rename(h5, h5_big)
    sh([REPACK, "-f", "GZIP=1", h5_big, h5],
       log_execution=False, env=env(WORK_DIR=work_dir))
    os.remove(h5_big)


def repack_products(work_dir, short_names):
    """Compress all product files matching a short name list with h5repack,
    returning a count of files that had problems."""
    global reported_repack_status
    _build_product_name_maps()
    problems = 0
    for short_name in short_names:
        productid = SHORTNAME_2_PRODUCTID[short_name]
        pattern = productid + "_npp*.h5"
        for fn in glob.glob(os.path.join(work_dir, pattern)):
            if not reported_repack_status:
                # status_line('Compress products')
                reported_repack_status = True
            try:
                LOG.debug("Compress:" + fn)
                _repack(work_dir, fn)
            except CalledProcessError as oops:
                LOG.debug(traceback.format_exc())
                LOG.error('h5repack failed on %r: %r . Continuing' % (fn, oops))
                problems = problems + 1
    return problems


def add_geo_attribute_to_aggregates(work_dir, short_names):
    """For all the files produced, add the N_GEO_Ref attribute so SDR files
    reference their geolocation file. Returns a problem count."""
    global reported_geo_status
    _build_product_name_maps()
    problems = 0
    for short_name in short_names:
        sdr_name = SHORTNAME_2_PRODUCTID[short_name] + "_npp*.h5"
        # Name Example:
        # SVI04_npp_d20111121_t1805421_e1807062_b00346_c20120127203200212753_cspp_dev.h5
        files = glob.glob(os.path.join(work_dir, sdr_name))
        for fname in files:
            try:
                if not reported_geo_status:
                    reported_geo_status = True
                    status_line('add geo to Aggregate products')
                name = os.path.basename(fname)
                adl_geo_ref.write_geo_ref(name)
            except CalledProcessError as oops:
                LOG.debug(traceback.format_exc())
                LOG.error('add_geo_ref failed on %r: %r . Continuing' % (fname, oops))
                problems += 1
    return problems


# Populate the name maps at import time so module globals are ready for use.
_build_product_name_maps()