"""
openva_pipeline.openva
----------------------
This module runs openVA and SmartVA to assign causes of death to VA records.
"""
import subprocess
import shutil
import os
import re
from pandas import read_csv
from pandas import DataFrame
from pandas import concat
from pandas import merge
import numpy as np
from pycrossva.transform import transform
from .exceptions import OpenVAError
from .exceptions import SmartVAError
class OpenVA:
    """Assign causes of death (COD) to verbal autopsies (VA) using the R
    package openVA or the SmartVA command-line tool.

    This class prepares the VA data (merging ODK Briefcase exports and
    converting them with pyCrossVA), creates and executes an R script that
    runs openVA (or calls SmartVA) to assign CODs, and creates outputs for
    depositing in the Transfers DB and sending to a DHIS2 server.

    :parameter settings: Configuration settings for pipeline steps (which is
      returned from :meth:`Pipeline.config() <config>`).
    :type settings: dictionary of named tuples
    :parameter pipeline_run_date: Date and time when instance of
      :class:`Pipeline <Pipeline>` was created (instance attribute).
    :type pipeline_run_date: datetime.datetime.now() with formatting
      strftime("%Y-%m-%d_%H:%M:%S")
    :raises: OpenVAError
    """

    def __init__(self, settings, pipeline_run_date):
        self.va_args = settings["openva"]
        self.pipeline_args = settings["pipeline"]
        # default ID column when no ODK ID is configured
        if settings["odk"].odk_id is None:
            self.odk_id = "meta-instanceID"
        else:
            self.odk_id = settings["odk"].odk_id
        self.run_date = pipeline_run_date
        working_dir = self.pipeline_args.working_directory
        self.dir_openva = os.path.join(working_dir, "OpenVAFiles")
        self.dir_odk = os.path.join(working_dir, "ODKFiles")
        self.cli_smartva = os.path.join(working_dir, "smartva")
        # raw org unit setting (None without DHIS settings) and its
        # individual entries, split on whitespace and commas
        self.dhis_org_unit = None
        self.dhis_org_units = []
        if "dhis" in settings.keys():
            self.dhis_org_unit = str(settings["dhis"][0].dhis_org_unit)
            self.dhis_org_units = [
                ou for ou in re.split(r"\s|,", self.dhis_org_unit) if ou != ""
            ]
        self.successful_run = None
        try:
            # an existing directory is fine; a file in its place still raises
            os.makedirs(self.dir_openva, exist_ok=True)
        except OSError as exc:
            raise OpenVAError("Unable to create directory " +
                              self.dir_openva) from exc

    def prep_va_data(self):
        """Create the input data for openVA/SmartVA from the ODK exports.

        If only the new ODK export (``ODKFiles/odk_export_new.csv``) exists,
        it is used as-is. Otherwise the new and previous
        (``odk_export_prev.csv``) exports are stacked and de-duplicated on the
        first column whose name contains "instanceid" (case-insensitive), or
        on whole rows if there is no such column; a record present in both
        keeps its copy from the new export. The result is written to
        ``OpenVAFiles/pycrossva_input.csv`` and then copied (SmartVA) or
        converted with pyCrossVA into ``OpenVAFiles/openva_input.csv``.

        :returns: Summary of the number of VA records at each step -- previous
          ODK export (0 if there isn't one), new ODK export, and number of VA
          records sent to openVA. When there are no records at all, no input
          files are written.
        :rtype: dict
        :raises FileNotFoundError: if the new ODK export does not exist.
        """
        summary = {"n_export_prev": None,
                   "n_export_new": None,
                   "n_to_openva": None}
        export_file_prev = os.path.join(self.dir_odk, "odk_export_prev.csv")
        export_file_new = os.path.join(self.dir_odk, "odk_export_new.csv")
        pycva_input = os.path.join(self.dir_openva, "pycrossva_input.csv")
        openva_input_file = os.path.join(self.dir_openva, "openva_input.csv")
        # 6th "|"-separated field of the metadata code is the WHO version
        who_instrument_version = \
            self.pipeline_args.algorithm_metadata_code.split("|")[5]
        if who_instrument_version == "v1_4_1":
            pycva_instrument_version = "2016WHOv141"
        else:
            pycva_instrument_version = "2016WHOv151"

        if os.path.isfile(export_file_new) and \
                not os.path.isfile(export_file_prev):
            n_new = read_csv(export_file_new).shape[0]
            summary["n_export_prev"] = 0
            summary["n_export_new"] = n_new
            summary["n_to_openva"] = n_new
            if n_new == 0:
                return summary
            shutil.copy(export_file_new, pycva_input)
        else:
            # reached when the previous export exists, or when the new one is
            # missing (read_csv then raises FileNotFoundError)
            export_df_new = read_csv(export_file_new)
            export_df_prev = read_csv(export_file_prev)
            n_new = export_df_new.shape[0]
            n_prev = export_df_prev.shape[0]
            summary["n_export_prev"] = n_prev
            summary["n_export_new"] = n_new
            if n_prev == 0 and n_new == 0:
                summary["n_to_openva"] = 0
                return summary
            # new export first so drop_duplicates (keep="first") retains the
            # newer copy of a repeated record
            exports_combined = concat([export_df_new, export_df_prev])
            match_col = next((col for col in exports_combined.columns
                              if "instanceid" in col.lower()), None)
            # subset=None de-duplicates on entire rows
            exports_combined.drop_duplicates(subset=match_col, inplace=True)
            exports_combined.to_csv(pycva_input, index=False)
            summary["n_to_openva"] = exports_combined.shape[0]
        self._write_openva_input(pycva_input, openva_input_file,
                                 pycva_instrument_version)
        return summary

    def _write_openva_input(self, pycva_input, openva_input_file,
                            pycva_instrument_version):
        """Write the algorithm's input file from the merged ODK export: a
        plain copy for SmartVA, otherwise pyCrossVA's InterVA5 conversion."""
        if self.pipeline_args.algorithm == "SmartVA":
            shutil.copy(pycva_input, openva_input_file)
            return
        # NOTE(review): the target is always InterVA5, even when InterVA
        # version 4 or InSilicoVA data.type WHO2012 is configured -- confirm
        # those settings are meant to be supported.
        final_data = transform(mapping=(pycva_instrument_version, "InterVA5"),
                               raw_data=pycva_input,
                               raw_data_id=self.odk_id,
                               verbose=0)
        final_data.to_csv(openva_input_file, index=False)

    def r_script(self):
        """Create an R script for running openVA and assigning CODs.

        Does nothing for SmartVA. Otherwise creates the run directory
        ``OpenVAFiles/<run_date>`` and writes the algorithm's R script there.

        :raises OpenVAError: if the run directory cannot be created (including
          when it already exists) or the script cannot be written.
        """
        if self.pipeline_args.algorithm == "SmartVA":
            return
        try:
            os.makedirs(os.path.join(self.dir_openva, self.run_date))
        except OSError as exc:
            raise OpenVAError("Unable to create openVA dir: " +
                              str(exc)) from exc
        if self.pipeline_args.algorithm == "InSilicoVA":
            self._r_script_insilicova()
        elif self.pipeline_args.algorithm == "InterVA":
            self._r_script_interva()

    def _r_script_file(self):
        """Return the path of this run's R script inside the run directory."""
        return os.path.join(self.dir_openva, self.run_date,
                            "r_script_" + self.run_date + ".R")

    def _check_who_instrument_version(self):
        """Raise OpenVAError unless pyCrossVA supports the WHO instrument
        version (6th "|"-separated field of the algorithm metadata code)."""
        who_instrument_version = \
            self.pipeline_args.algorithm_metadata_code.split("|")[5]
        if who_instrument_version not in ("v1_4_1", "v1_5_1", "v1_5_3"):
            raise OpenVAError("pyCrossVA not able to process WHO " +
                              "instrument version: " + who_instrument_version)

    @staticmethod
    def _write_r_call(f, call_head, args):
        """Write ``call_head`` then one ``name = value`` line per (name, value)
        pair in ``args``; the last pair closes the call. Values are R source
        text and must already be quoted where needed."""
        f.write(call_head + " \n")
        last = len(args) - 1
        for i, (name, value) in enumerate(args):
            ending = ") \n" if i == last else ", \n"
            f.write("\t " + name + " = " + value + ending)

    def _write_r_inputs(self, f):
        """Write the R code that loads the raw ODK export and the pyCrossVA
        output, each sorted by record ID."""
        raw_data_file = os.path.join(self.dir_openva, "pycrossva_input.csv")
        # read.csv() (via make.names) turns '-' and ':' in column names into
        # '.', so refer to the ID column by its R-side name
        odk_id_for_r = self.odk_id.replace("-", ".").replace(":", ".")
        f.write("date() \n")
        f.write("library(openVA) \n")
        # TODO: add lubridate to handle different date formats across
        # the rows
        f.write("getwd() \n")
        f.write("raw_data <- read.csv('" + raw_data_file + "') \n")
        f.write("raw_data_sorted <- raw_data[order(raw_data$" +
                odk_id_for_r + "),] \n")
        # keep only the text after the last '.' of each column name,
        # lower-cased
        f.write("col_names <- names(raw_data_sorted) \n")
        f.write("new_names <- lapply(strsplit(col_names, '\\\\.'), "
                "tail, n = 1) \n")
        f.write("names(raw_data_sorted) <- tolower(unlist(new_names)) \n")
        f.write("raw_data_sorted$ID <- sort(raw_data$" + odk_id_for_r + ") \n")
        f.write("data_from_pycrossva <- read.csv('" + self.dir_openva +
                "/openva_input.csv') \n")
        f.write("records_sorted <- "
                "data_from_pycrossva[order(data_from_pycrossva$ID),] \n")

    @staticmethod
    def _write_r_date(f, var, question):
        """Write R code parsing WHO question ``question`` into ``var``.

        Tries ``raw_data_sorted$<question>`` first; if that yields a
        zero-length result, retries with the column whose name ends in
        ``question``. If parsing raises an error, ``var`` becomes empty
        strings instead of dates.
        """
        try_formats = ("tryFormats=c('%Y-%m-%d', '%Y/%m/%d', '%b %d, %Y', "
                       "'%m/%d/%Y', '%m-%d-%Y'))}, \n")
        column = "raw_data_sorted$" + question
        index = "index_" + question
        indexed = "raw_data_sorted[, " + index + "]"
        f.write(var + " <- tryCatch(\n")
        f.write(" expr = {as.Date(as.character(" + column + "), ")
        f.write(try_formats)
        f.write(" error = {function(e) return(rep('', length(" + column +
                ")))}) \n")
        f.write("if (length(" + var + ") == 0) { \n")
        f.write(" " + index + " <- which(grepl('" + question +
                "$', names(raw_data_sorted)))\n")
        f.write(" " + var + " <- tryCatch(\n")
        f.write(" expr = {as.Date(as.character(" + indexed + "), ")
        f.write(try_formats)
        f.write(" error = {function(e) return(rep('', length(" + indexed +
                ")))})} \n")

    def _write_r_outputs(self, f):
        """Write the R code shared by InSilicoVA and InterVA that runs after
        ``results`` and ``sex`` are defined.

        It takes the top COD from ``results``, parses date of birth
        (id10021), date of death (id10023) and age, then writes
        ``entity_attribute_value.csv`` (DHIS2 blob) and ``record_storage.csv``
        (Transfers DB) into the OpenVA directory.
        """
        f.write("cod <- getTopCOD(results) \n")
        f.write("names(cod) <- toupper(names(cod)) \n")
        self._write_r_date(f, "dob", "id10021")
        self._write_r_date(f, "dod", "id10023")
        # age in years: ageinyears, replaced by ageinyears2 where present,
        # and 0 for neonates with no age
        f.write("if (any(which(grepl('ageinyears$', "
                "names(raw_data_sorted))))) {\n")
        f.write(" index_ageinyears <- which(grepl('ageinyears$', "
                "names(raw_data_sorted)))\n")
        f.write(" index_ageinyears2 <- which(grepl('ageinyears2$', "
                "names(raw_data_sorted)))\n")
        f.write(" valid_age2 <- !is.na(raw_data_sorted[, index_ageinyears2])\n")
        f.write(" index_isneonatal <- which(grepl('isneonatal$', "
                "names(raw_data_sorted)))\n")
        f.write(" age <- raw_data_sorted[, index_ageinyears]\n")
        f.write(" age[valid_age2] <- "
                "raw_data_sorted[valid_age2, index_ageinyears2]\n")
        f.write(" age[is.na(age) & raw_data_sorted[, index_isneonatal] == 1]"
                " <- 0 \n")
        f.write("} else {age <- rep('', nrow(raw_data_sorted))}\n")
        f.write("## create matrices for DHIS2 blob (records2) and transfer "
                "database (records3) \n")
        f.write("## first column must be ID \n")
        f.write("metadataCode <- '" +
                self.pipeline_args.algorithm_metadata_code + "'\n")
        f.write("records2 <- merge(cod, raw_data_sorted, by = 'ID', "
                "all = TRUE, sort = TRUE) \n")
        # records without an assigned cause are labelled MISSING
        f.write("records2[is.na(records2[, 'CAUSE1']), 'CAUSE1'] "
                "<- 'MISSING' \n")
        f.write("cod2 <- records2[, 'CAUSE1'] \n")
        f.write("names(records2)[which(names(records2) == 'CAUSE1')] "
                "<- 'Cause of Death' \n")
        f.write("records2$Metadata <- metadataCode \n")
        f.write("evaBlob <- cbind(rep(as.character(records2[,'ID']), "
                "each=ncol(records2)), rep(names(records2)), "
                "c(apply(records2, 1, c))) \n")
        f.write("colnames(evaBlob) <- c('ID', 'Attribute', 'Value') \n")
        f.write("write.csv(evaBlob, file='" + self.dir_openva +
                "/entity_attribute_value.csv', row.names=FALSE, na='') \n\n")

        ou_list = []
        if self.dhis_org_unit is not None:
            f.write("### check for DHIS org unit \n")
            for ou in self.dhis_org_units:
                ou_colname = "org_unit_col" + str(len(ou_list) + 1)
                ou_list.append(ou_colname)
                # use the single column whose name matches the org unit,
                # otherwise the configured org unit value itself
                f.write(ou_colname + " <- grepl('" + ou.lower() +
                        "', names(raw_data_sorted)) \n")
                f.write("if (sum(" + ou_colname + ") == 1) { \n")
                f.write(" " + ou_colname + " <- raw_data_sorted[, " +
                        ou_colname + "] \n")
                f.write("} else { \n")
                f.write(" " + ou_colname + " <- '" + ou + "' \n")
                f.write("} \n")
        # build the column lists so an empty org unit list still yields
        # valid R (no empty cbind()/c() arguments)
        r_columns = (["as.character(records_sorted[,'ID'])", "sex", "dob",
                      "dod", "age", "cod2"] + ou_list +
                     ["metadataCode", "raw_data_sorted$instanceid",
                      "raw_data_sorted"])
        r_names = (["'id'", "'sex'", "'dob'", "'dod'", "'age'", "'cod'"] +
                   ["'" + name + "'" for name in ou_list] +
                   ["'metadataCode'", "'odkMetaInstanceID'",
                    "names(raw_data_sorted)"])
        f.write("### write out results to csv \n")
        f.write("records3 <- cbind(" + ", ".join(r_columns) + ") \n")
        f.write("names(records3) <- c(" + ", ".join(r_names) + ") \n")
        f.write("write.csv(records3, file='" + self.dir_openva +
                "/record_storage.csv', row.names=FALSE, na='') \n")
        f.write("date() \n")

    def _r_script_insilicova(self):
        """Write the R script that assigns CODs with openVA's insilico().

        :raises OpenVAError: for an unsupported WHO instrument version or if
          the script cannot be written.
        """
        self._check_who_instrument_version()
        va = self.va_args
        run_dir = os.path.join(self.dir_openva, self.run_date)
        # insilico()'s own default for groupcode is FALSE; use it when the
        # configuration has no InSilicoVA groupcode setting
        groupcode = getattr(va, "insilicova_groupcode", "FALSE")
        insilico_args = [
            ("data.type", "'" + va.insilicova_data_type + "'"),
            ("isNumeric", va.insilicova_is_numeric),
            ("updateCondProb", va.insilicova_update_cond_prob),
            ("keepProbbase.level", va.insilicova_keep_probbase_level),
            ("CondProb", va.insilicova_cond_prob),
            ("CondProbNum", va.insilicova_cond_prob_num),
            ("datacheck", va.insilicova_datacheck),
            ("datacheck.missing", va.insilicova_datacheck_missing),
            ("warning.write", "TRUE"),
            ("directory", "'" + run_dir + "'"),
            ("external.sep", va.insilicova_external_sep),
            ("Nsim", va.insilicova_nsim),
            ("thin", va.insilicova_thin),
            ("burnin", va.insilicova_burnin),
            ("auto.length", va.insilicova_auto_length),
            ("conv.csmf", va.insilicova_conv_csmf),
            ("jump.scale", va.insilicova_jump_scale),
            ("levels.prior", va.insilicova_levels_prior),
            ("levels.strength", va.insilicova_levels_strength),
            ("trunc.min", va.insilicova_trunc_min),
            ("trunc.max", va.insilicova_trunc_max),
            ("subpop", va.insilicova_subpop),
            ("java_option", "'" + va.insilicova_java_option + "'"),
            ("seed", va.insilicova_seed),
            ("phy.code", va.insilicova_phy_code),
            ("phy.cat", va.insilicova_phy_cat),
            ("phy.unknown", va.insilicova_phy_unknown),
            ("phy.external", va.insilicova_phy_external),
            ("phy.debias", va.insilicova_phy_debias),
            ("exclude.impossible.cause",
             "'" + va.insilicova_exclude_impossible_cause + "'"),
            ("no.is.missing", va.insilicova_no_is_missing),
            ("indiv.CI", va.insilicova_indiv_ci),
            ("groupcode", groupcode),
        ]
        try:
            with open(self._r_script_file(), "w", newline="") as f:
                self._write_r_inputs(f)
                self._write_r_call(f,
                                   "results <- insilico(data = records_sorted,",
                                   insilico_args)
                # sex column name depends on the input data format
                if va.insilicova_data_type == "WHO2012":
                    f.write("sex <- ifelse(tolower(records_sorted$MALE)=='y', "
                            "'Male', 'Female') \n")
                if va.insilicova_data_type == "WHO2016":
                    f.write("sex <- ifelse(tolower(records_sorted$i019a)=='y',"
                            " 'Male', 'Female') \n")
                self._write_r_outputs(f)
        except OSError as exc:
            raise OpenVAError("Problem writing R script " +
                              "for InSilicoVA.") from exc

    def _r_script_interva(self):
        """Write the R script that assigns CODs with openVA's InterVA5()
        (or InterVA() when InterVA version "4" is configured).

        :raises OpenVAError: for an unsupported WHO instrument version or if
          the script cannot be written.
        """
        self._check_who_instrument_version()
        va = self.va_args
        run_dir = os.path.join(self.dir_openva, self.run_date)
        is_interva4 = va.interva_version == "4"
        if is_interva4:
            call_head = "results <- InterVA(Input = records_sorted,"
        else:
            call_head = "results <- InterVA5(Input = records_sorted,"
        interva_args = [
            ("HIV", "'" + va.interva_hiv + "'"),
            ("Malaria", "'" + va.interva_malaria + "'"),
            ("output", "'" + va.interva_output + "'"),
        ]
        if is_interva4:
            # replicate options exist only for InterVA4
            interva_args += [
                ("replicate", va.interva_replicate),
                ("replicate.bug1", va.interva_replicate_bug1),
                ("replicate.bug2", va.interva_replicate_bug2),
            ]
        interva_args += [
            ("groupcode", va.interva_groupcode),
            ("write", "TRUE"),
            ("directory", "'" + run_dir + "'"),
            ("filename", "'interva5_results_" + self.run_date + "'"),
        ]
        try:
            with open(self._r_script_file(), "w", newline="") as f:
                self._write_r_inputs(f)
                self._write_r_call(f, call_head, interva_args)
                # sex column name depends on the input data format
                if is_interva4:
                    f.write("sex <- ifelse(tolower(records_sorted$MALE)=='y', "
                            "'Male', 'Female') \n")
                if va.interva_version == "5":
                    f.write("sex <- ifelse(tolower(records_sorted$i019a)=='y',"
                            " 'Male', 'Female') \n")
                self._write_r_outputs(f)
        except OSError as exc:
            raise OpenVAError("Problem writing R script for InterVA.") from exc

    def smartva_to_csv(self):
        """
        Write two CSV files from the SmartVA results:
        (1) Entity Value Attribute blob pushed to DHIS2
        (entity_attribute_value.csv)
        (2) table for transfer database (record_storage.csv)
        Both CSV files are stored in the OpenVA folder. Results are joined to
        the input records on ``sid`` / ``Generalmodule-sid``; input records
        without a result keep an empty cause34 in record_storage.csv and get
        "MISSING" in the blob.
        """
        in_file = os.path.join(self.dir_openva, "openva_input.csv")
        out_dir = os.path.join(self.dir_openva, self.run_date)
        df_data = read_csv(in_file)
        if self.dhis_org_unit is not None:
            # copy the single column whose name contains the org unit
            # setting; otherwise use the setting itself as the value
            col_has_org_unit = [self.dhis_org_unit in col for col in
                                df_data.columns]
            if sum(col_has_org_unit) == 1:
                col_org_unit_index = col_has_org_unit.index(True)
                df_data["dhis_org_unit"] = df_data.iloc[:, col_org_unit_index]
            else:
                df_data["dhis_org_unit"] = self.dhis_org_unit
        df_results = read_csv(os.path.join(out_dir,
                                           "1-individual-cause-of-death",
                                           "individual-cause-of-death.csv"))
        df_results["metadataCode"] = self.pipeline_args.algorithm_metadata_code

        cols_keep = ["sex", "birth_date", "death_date",
                     "age", "cause34", "metadataCode", "sid"]
        # TODO: need to add dhis org unit
        df_record_storage = merge(left=df_results[cols_keep],
                                  left_on="sid",
                                  right=df_data,
                                  right_on="Generalmodule-sid",
                                  how="right")
        df_record_storage.rename(columns={"meta-instanceID":
                                          "odkMetaInstanceID"},
                                 inplace=True)
        df_record_storage.drop(columns="sid", inplace=True)
        df_record_storage.insert(loc=0, column="ID",
                                 value=df_record_storage["odkMetaInstanceID"])
        df_record_storage.to_csv(
            os.path.join(self.dir_openva, "record_storage.csv"), index=False)

        cols_keep = ["sid", "cause34", "metadataCode"]
        df_temp = merge(left=df_results[cols_keep],
                        left_on="sid",
                        right=df_data,
                        right_on="Generalmodule-sid",
                        how="right")
        df_temp["cause34"] = df_temp["cause34"].fillna("MISSING")
        df_temp.drop(columns="sid", inplace=True)
        df_temp.rename(columns={"meta-instanceID": "odkMetaInstanceID"},
                       inplace=True)
        df_temp["ID"] = df_temp["odkMetaInstanceID"]
        df_eva = df_temp.melt(id_vars=["ID"],
                              var_name="Attribute",
                              value_name="Value")
        df_eva.sort_values(by=["ID"], inplace=True)
        df_eva.to_csv(
            os.path.join(self.dir_openva, "entity_attribute_value.csv"),
            index=False)

    @staticmethod
    def _decode(stream):
        """Return captured subprocess output as text (it is captured as
        bytes because no text mode is requested)."""
        if stream is None:
            return ""
        if isinstance(stream, bytes):
            return stream.decode(errors="replace")
        return str(stream)

    def get_cod(self):
        """Create and execute R script to assign a COD with openVA; or call
        the SmartVA CLI to assign COD.

        For InSilicoVA and InterVA, the R script written by :meth:`r_script`
        is run with ``R CMD BATCH`` (its output goes to the ``.Rout`` file
        next to the script). Any other algorithm creates the run directory,
        calls the SmartVA CLI, and converts its results with
        :meth:`smartva_to_csv`. ``self.successful_run`` records the outcome.

        :returns: the completed process
        :rtype: subprocess.CompletedProcess
        :raises OpenVAError: if R exits with a non-zero status, or the SmartVA
          run directory cannot be created.
        :raises SmartVAError: if the SmartVA CLI exits with a non-zero status.
        """
        if self.pipeline_args.algorithm in ["InSilicoVA", "InterVA"]:
            return self._run_openva()
        return self._run_smartva()

    def _run_openva(self):
        """Run this run's R script with R CMD BATCH."""
        run_dir = os.path.join(self.dir_openva, self.run_date)
        r_script_in = os.path.join(run_dir, "r_script_" + self.run_date + ".R")
        r_script_out = os.path.join(run_dir,
                                    "r_script_" + self.run_date + ".Rout")
        r_args = ["R", "CMD", "BATCH", "--no-save", "--no-restore",
                  r_script_in, r_script_out]
        try:
            # capture_output=True not available in Python 3.6
            completed = subprocess.run(args=r_args,
                                       stdin=subprocess.PIPE,
                                       stdout=subprocess.PIPE,
                                       stderr=subprocess.PIPE,
                                       check=True)
        except subprocess.CalledProcessError as exc:
            # any non-zero exit is a failure, not only exit code 1
            self.successful_run = False
            raise OpenVAError("Error running R script:" +
                              self._decode(exc.stderr)) from exc
        self.successful_run = True
        return completed

    def _run_smartva(self):
        """Run the SmartVA CLI on openva_input.csv and convert its output."""
        out_dir = os.path.join(self.dir_openva, self.run_date)
        try:
            os.makedirs(out_dir)
        except OSError as exc:
            raise OpenVAError("Unable to create openVA dir: " +
                              str(exc)) from exc
        in_file = os.path.join(self.dir_openva, "openva_input.csv")
        sva_args = [self.cli_smartva,
                    "--country", str(self.va_args.smartva_country),
                    "--hiv", str(self.va_args.smartva_hiv),
                    "--malaria", str(self.va_args.smartva_malaria),
                    "--hce", str(self.va_args.smartva_hce),
                    "--freetext", str(self.va_args.smartva_freetext),
                    "--figures", str(self.va_args.smartva_figures),
                    "--language", str(self.va_args.smartva_language),
                    in_file,
                    out_dir]
        try:
            # capture_output=True not available in Python 3.6
            completed = subprocess.run(args=sva_args,
                                       stdin=subprocess.PIPE,
                                       stdout=subprocess.PIPE,
                                       stderr=subprocess.PIPE,
                                       check=True)
        except subprocess.CalledProcessError as exc:
            self.successful_run = False
            if exc.returncode != 2 and \
                    "Country list" in self._decode(exc.stdout):
                raise SmartVAError("Problem with SmartVA " +
                                   "country code") from exc
            raise SmartVAError("Error running SmartVA:" +
                               self._decode(exc.stderr)) from exc
        self.smartva_to_csv()
        self.successful_run = True
        return completed

    def get_summary(self) -> dict:
        """
        Get summary of openVA step.

        A record counts as missing a cause of death when its cause column
        ("cod" for openVA, "cause34" for SmartVA) is empty or "MISSING".

        :returns: Get the number of records passed to openVA and the number
          of records without an assigned cause of death (CoD).
        :rtype: dict
        """
        data_path = os.path.join(self.dir_openva, "record_storage.csv")
        record_storage = read_csv(data_path)
        if self.pipeline_args.algorithm in ["InSilicoVA", "InterVA"]:
            cod = record_storage["cod"]
        else:
            # SmartVA leaves unprocessed records empty (read as NaN) in
            # record_storage.csv rather than "MISSING"
            cod = record_storage["cause34"]
        n_missing = int((cod.isna() | (cod == "MISSING")).sum())
        return {"n_processed": record_storage.shape[0],
                "n_cod_missing": n_missing}