import os
import ssl
import urllib
import urllib.request

import pipeline.h.tasks.importdata.fluxes as fluxes
import pipeline.h.tasks.importdata.importdata as importdata
import pipeline.infrastructure as infrastructure
import pipeline.infrastructure.vdp as vdp
from pipeline.infrastructure import task_registry

from . import dbfluxes
# Public names exported by ``from <this module> import *``.
__all__ = [
    'ALMAImportData',
    'ALMAImportDataInputs',
    'ALMAImportDataResults',
]

# Module-level logger, obtained from the pipeline infrastructure package.
LOG = infrastructure.get_logger(__name__)
# Base URLs of the primary and backup flux services, read once at import time
# from the environment. An unset variable yields the empty string, which the
# rest of the module treats as "not defined".
# Example value noted in the original source:
#   'https://2019jul.asa-test.alma.cl/sc/flux'
FLUX_SERVICE_URL = os.environ.get('FLUX_SERVICE_URL', '')
FLUX_SERVICE_URL_BACKUP = os.environ.get('FLUX_SERVICE_URL_BACKUP', '')
class ALMAImportDataResults(importdata.ImportDataResults):
    """Results object for the ALMA import-data task.

    Extends ``importdata.ImportDataResults`` with a ``parang_ranges``
    attribute that starts out as an empty dict.
    """

    def __init__(self, mses=None, setjy_results=None):
        """Initialise the results object.

        Args:
            mses: Passed through unchanged to the base-class constructor.
            setjy_results: Passed through unchanged to the base-class
                constructor.
        """
        super().__init__(mses=mses, setjy_results=setjy_results)
        # Empty on construction; nothing in this class fills it in.
        self.parang_ranges = {}

    def __repr__(self):
        """Return the class name followed by one ``ms.name`` per line.

        Reads ``self.mses`` (presumably set by the base-class constructor --
        confirm against ``ImportDataResults``).
        """
        return 'ALMAImportDataResults:\n\t{0}'.format(
            '\n\t'.join(ms.name for ms in self.mses))
@task_registry.set_equivalent_casa_task('hifa_importdata')
@task_registry.set_casa_commands_comment('If required, ASDMs are converted to MeasurementSets.')
class ALMAImportData(importdata.ImportData):
    """ALMA-specific import-data task.

    Provides ``_get_fluxes``, which optionally obtains flux values through
    ``dbfluxes`` after a test query against the configured flux service
    (primary first, then backup) has succeeded.
    """

    Inputs = ALMAImportDataInputs
    Results = ALMAImportDataResults

    @staticmethod
    def _probe_flux_service(baseurl):
        """Send a fixed test query to ``baseurl`` to check the service responds.

        Args:
            baseurl: Base URL of the flux service; the empty string means
                "not defined".

        Raises:
            ValueError: If ``baseurl`` is empty.
            Exception: Whatever ``urllib.request.urlopen`` raises when the
                query cannot be completed within the 60 s timeout.
        """
        if not baseurl:
            raise ValueError('flux service URL is not defined')

        testquery = '?DATE=27-March-2013&FREQUENCY=86837309056.169219970703125&WEIGHTED=true&RESULT=1&NAME=J1427-4206'
        # SECURITY NOTE: HTTPS certificate verification is deliberately
        # disabled here (behaviour kept from the original code).
        ssl_context = ssl._create_unverified_context()
        # Only reachability matters; the payload is ignored. The ``with``
        # block guarantees the connection is closed again.
        with urllib.request.urlopen(baseurl + testquery, context=ssl_context, timeout=60.0):
            pass

    def _get_setjy_results_via_service(self, observing_run):
        """Get setjy results, preferring the online flux service.

        Tries the primary service, then the backup service. For each, a test
        query is sent and, if it succeeds, ``dbfluxes.get_setjy_results`` is
        called. Any exception from either step moves on to the next option.
        If both fail, ``fluxes.get_setjy_results`` is used instead.

        Args:
            observing_run: Object whose ``measurement_sets`` attribute is
                handed to the ``get_setjy_results`` functions.

        Returns:
            Tuple ``(fluxservice, xml_results)`` where ``fluxservice`` is
            ``'FIRSTURL'``, ``'BACKUPURL'`` or ``'FAIL'``.
        """
        # NOTE(review): if LOG is a standard ``logging.Logger``, ``warn`` is a
        # deprecated alias of ``warning`` -- confirm before switching.
        try:
            self._probe_flux_service(FLUX_SERVICE_URL)
            return 'FIRSTURL', dbfluxes.get_setjy_results(observing_run.measurement_sets)
        except Exception:
            # Best effort by design: any failure falls through to the backup.
            LOG.warn('Unable to execute initial test query with primary flux service.')

        try:
            self._probe_flux_service(FLUX_SERVICE_URL_BACKUP)
            return 'BACKUPURL', dbfluxes.get_setjy_results(observing_run.measurement_sets)
        except Exception:
            if not FLUX_SERVICE_URL_BACKUP:
                msg = 'Backup URL not defined for test query...'
            else:
                msg = 'Unable to execute backup test query with flux service.'
            LOG.warn(msg+'\nProceeding without using the online flux catalog service.')

        return 'FAIL', fluxes.get_setjy_results(observing_run.measurement_sets)

    def _get_fluxes(self, context, observing_run):
        """Collect flux results, export them, and re-import the combined set.

        Args:
            context: Pipeline context. Its ``observing_run`` attribute is
                temporarily replaced during export and its ``output_dir`` is
                passed to ``fluxes.import_flux``.
            observing_run: Observing run whose measurement sets are queried.

        Returns:
            Tuple ``(fluxservice, combined_results)``. ``fluxservice`` is
            ``None`` when ``inputs.dbservice`` is false, otherwise
            ``'FIRSTURL'``, ``'BACKUPURL'`` or ``'FAIL'``.
        """
        # get the flux measurements from Source.xml for each MS
        if self.inputs.dbservice:
            fluxservice, xml_results = self._get_setjy_results_via_service(observing_run)
        else:
            xml_results = fluxes.get_setjy_results(observing_run.measurement_sets)
            fluxservice = None

        # write/append them to flux.csv

        # Cycle 1 hack for exporting the field intents to the CSV file:
        # export_flux_from_result queries the context, so we pseudo-register
        # the mses with the context by replacing the original observing run
        orig_observing_run = context.observing_run
        context.observing_run = observing_run
        try:
            fluxes.export_flux_from_result(xml_results, context)
        finally:
            # Always restore the caller's observing run, even on failure.
            context.observing_run = orig_observing_run

        # re-read from flux.csv, which will include any user-coded values
        combined_results = fluxes.import_flux(context.output_dir, observing_run)

        return fluxservice, combined_results