Source code for openva_pipeline.odk

"""
openva_pipeline.odk
-------------------

This module pulls VA records from an ODK Aggregate server (using ODK Briefcase)
or from an ODK Central server (using its REST API).
"""

import csv
import io
import os
import shutil
import subprocess

import requests

from .exceptions import ODKError


class ODK:
    """Manages Pipeline's interaction with ODK Aggregate and ODK Central.

    This class handles the segment of the pipeline related to ODK.  The
    :meth:`briefcase` method calls ODK Briefcase to connect with an ODK
    Aggregate server and export VA records, :meth:`central` downloads VA
    records from an ODK Central server, and :meth:`merge_to_prev_export`
    folds a freshly exported file into the previously exported one.

    :parameter settings: Configuration settings for pipeline steps (which is
      returned from :meth:`Pipeline.config() <config>`).
    :type settings: dictionary of named tuples
    :raises: ODKError (if the ODK working directory cannot be created)
    """

    def __init__(self, settings):
        odk_settings = settings["odk"]
        pipeline_settings = settings["pipeline"]

        self.odk_id = odk_settings.odk_id
        self.odk_url = odk_settings.odk_url
        self.odk_user = odk_settings.odk_user
        self.odk_password = odk_settings.odk_password
        self.odk_form_id = odk_settings.odk_form_id
        self.odk_last_run = odk_settings.odk_last_run
        self.odk_last_run_date = odk_settings.odk_last_run_date
        self.odk_last_run_date_prev = odk_settings.odk_last_run_date_prev
        # The Briefcase jar is looked up in the pipeline's working directory.
        self.bc_path = os.path.join(
            pipeline_settings.working_directory, "ODK-Briefcase-v1.18.0.jar"
        )
        self.odk_project_number = odk_settings.odk_project_number

        # Briefcase storage and CSV exports share a single directory.
        odk_path = os.path.join(pipeline_settings.working_directory, "ODKFiles")
        self.export_dir = odk_path
        self.storage_dir = odk_path
        self.file_name = "odk_export_new.csv"
        self.n_records = None

        try:
            # exist_ok avoids the check-then-create race of isdir() + makedirs().
            os.makedirs(odk_path, exist_ok=True)
        except OSError as exc:  # PermissionError is a subclass of OSError
            raise ODKError("Unable to create directory " + odk_path) from exc

    def merge_to_prev_export(self):
        """Merge the new export file into the previous export file.

        If both ``odk_export_prev.csv`` and the new export file exist, every
        line of the new file that is not already present in the previous file
        is appended to the previous file and the new file is deleted.  If only
        the new file exists, it is renamed to ``odk_export_prev.csv``.  If the
        new file does not exist, nothing happens.

        Lines are compared without their line terminator, so a final line
        lacking a trailing newline is still recognized as already present.
        """
        export_file_prev = os.path.join(self.export_dir, "odk_export_prev.csv")
        export_file_new = os.path.join(self.export_dir, self.file_name)

        if not os.path.isfile(export_file_new):
            return
        if not os.path.isfile(export_file_prev):
            shutil.move(export_file_new, export_file_prev)
            return

        with open(export_file_new, "r", newline="") as f_new:
            new_lines = f_new.readlines()
        with open(export_file_prev, "r", newline="") as f_prev:
            prev_lines = f_prev.readlines()

        # A set gives O(1) membership tests instead of scanning a list for
        # every new line.
        known = {line.rstrip("\r\n") for line in prev_lines}
        to_append = [
            line for line in new_lines if line.rstrip("\r\n") not in known
        ]
        # If the previous file does not end with a newline, appending directly
        # would glue the first new record onto the last old one.
        needs_separator = bool(prev_lines) and not prev_lines[-1].endswith(
            ("\r", "\n")
        )

        if to_append:
            with open(export_file_prev, "a", newline="") as f_combined:
                if needs_separator:
                    f_combined.write("\n")
                f_combined.writelines(to_append)
        os.remove(export_file_new)

    def briefcase(self):
        """Calls ODK Briefcase.

        This method spawns new processes that run the ODK Briefcase Java
        application (via a command-line interface): first to pull the form's
        submissions from an ODK Aggregate server into the storage directory,
        then to export them to a CSV file in the export directory.

        :returns: Return value from method subprocess.run() for the export step
        :rtype: subprocess.CompletedProcess
        :raises: ODKError
        """
        # NOTE(review): the literal double quotes become part of each argument
        # because no shell is involved; kept as-is to preserve the existing
        # command line -- confirm Briefcase expects them.
        form_id = '"' + self.odk_form_id + '"'

        pull_args = [
            "java", "-jar", self.bc_path,
            "-plla",
            "--odk_url", '"' + self.odk_url + '"',
            "--odk_username", '"' + self.odk_user + '"',
            "--odk_password", '"' + self.odk_password + '"',
            "--storage_directory", str(self.storage_dir),
            "--form_id", form_id,
        ]
        self._run_briefcase(pull_args)

        export_args = [
            "java", "-jar", self.bc_path,
            "-e",
            "--form_id", form_id,
            "--storage_directory", str(self.storage_dir),
            "--export_directory", str(self.export_dir),
            "--export_filename", str(self.file_name),
            "--export_start_date", '"' + self.odk_last_run_date_prev + '"',
            "--overwrite_csv_export",
            "--exclude_media_export",
        ]
        return self._run_briefcase(export_args)

    @staticmethod
    def _run_briefcase(args):
        """Run one Briefcase command, converting a non-zero exit to ODKError."""
        try:
            return subprocess.run(
                args=args,
                stdin=subprocess.PIPE,
                stdout=subprocess.PIPE,
                stderr=subprocess.PIPE,
                check=True,
            )
        except subprocess.CalledProcessError as exc:
            raise ODKError(str(exc.stderr)) from exc

    def central(self):
        """Connects to ODK Central through its API.

        This method calls requests.get to download a CSV file with verbal
        autopsy records from an ODK Central server, restricted to submissions
        dated on or after ``odk_last_run_date``.  The records are written to
        the new export file and the number of data rows (excluding the header)
        is stored in ``n_records``.

        :returns: Returns a string indicating the number of downloaded records.
        :rtype: string
        :raises: ODKError
        """
        export_file_new = os.path.join(self.export_dir, self.file_name)
        # URLs always use forward slashes, so join with "/" rather than
        # os.path.join (which uses the platform separator); str() also accepts
        # a numeric project number.
        url = "/".join(
            [
                self.odk_url.rstrip("/"),
                "v1/projects",
                str(self.odk_project_number),
                self.odk_form_id,
                "submissions.csv",
            ]
        )
        data_filter = (
            "?$filter=__system/submissionDate%20ge%20"
            + self.odk_last_run_date.replace("/", "-")
        )

        try:
            r = requests.get(
                url + data_filter, auth=(self.odk_user, self.odk_password)
            )
        except requests.exceptions.ConnectionError as e:
            raise ODKError(
                "Network problem, unable to connect to ODK Central"
                + " (using requests): {0}".format(e)
            ) from e
        except requests.exceptions.Timeout as e:
            raise ODKError(
                "ODK Central server failing to respond after establishing"
                + " a connection (using requests): {0}".format(e)
            ) from e
        except requests.exceptions.TooManyRedirects as e:
            raise ODKError(
                "Exceeded number of maximum redirections from ODK Central "
                + " server (using requests): {0}".format(e)
            ) from e

        if r.status_code != 200:
            raise ODKError(
                "Error getting data from ODK Central: {0}".format(r.text))

        # Let the csv module do the line splitting: str.splitlines() would
        # drop newlines embedded in quoted fields and miscount the records.
        odk_reader = csv.reader(
            io.StringIO(r.text, newline=""), delimiter=",", quotechar='"'
        )
        odk_data = list(odk_reader)
        # The first row is the header; never report a negative count.
        self.n_records = max(len(odk_data) - 1, 0)

        # newline="" lets csv.writer control line endings (per the csv docs).
        with open(export_file_new, "w", newline="") as f:
            csv.writer(f).writerows(odk_data)
        return f"Downloaded {self.n_records} records."