import datetime
import numpy as np
import pandas as pd
from dagster import (
AssetExecutionContext,
EnvVar,
MaterializeResult,
asset,
)
from music_dagster.contracts import DataFrameContract, enforce_dataframe_contract
from music_dagster.data_contracts import GIDAMPS_CLEANED_CONTRACT
from music_dagster.observability import build_dataframe_observability_metadata
from music_dagster.resources import GTracResource, build_gtrac_materialize_metadata
from music_dagster.redcap import fetch_redcap_json
from music_dagster.settings import get_settings
from music_dagster.transforms.clinical_scores import apply_sccai_hbi_maps
from music_dagster.transforms.mapping_utils import map_columns
from music_dagster.transforms.montreal import apply_montreal_uc_maps
from music_dagster.transforms.shared_maps import SEX_MAP, SMOKING_STATUS_MAP
from music_dagster.utils import drop_columns_with_log, rename_columns_with_log, reorder_queryable_dataset_columns
def _backfill_repeat_instrument_demographics(df: pd.DataFrame) -> pd.DataFrame:
"""Backfill baseline demographics into repeat-instrument rows for GIDAMPS."""
repeat_instrument_column = "redcap_repeat_instrument"
columns_to_backfill = ("study_group_name", "sex")
baseline_mask = df[repeat_instrument_column].fillna("").astype(str).str.strip().eq("")
df[list(columns_to_backfill)] = df[list(columns_to_backfill)].replace(r"^\s*$", np.nan, regex=True)
baseline_df = df.loc[baseline_mask, ["study_id", *columns_to_backfill]]
for column in columns_to_backfill:
baseline_values = (
baseline_df.loc[baseline_df[column].notna(), ["study_id", column]]
.drop_duplicates(subset=["study_id"], keep="first")
.set_index("study_id")[column]
)
repeat_missing_mask = (~baseline_mask) & df[column].isna()
df.loc[repeat_missing_mask, column] = df.loc[repeat_missing_mask, "study_id"].map(baseline_values)
return df
def _merge_with_column_precedence(
left_df: pd.DataFrame,
right_df: pd.DataFrame,
*,
keys: tuple[str, ...],
prefer_left_columns: set[str],
) -> pd.DataFrame:
"""Merge two dataframes and collapse overlapping columns without leaking suffixes."""
overlap = sorted((set(left_df.columns) & set(right_df.columns)) - set(keys))
if not overlap:
return pd.merge(left_df, right_df, how="left", on=list(keys))
merged_df = pd.merge(
left_df,
right_df,
how="left",
on=list(keys),
suffixes=("__left", "__right"),
)
resolved_columns: dict[str, pd.Series] = {}
for column in overlap:
left_column = f"{column}__left"
right_column = f"{column}__right"
if column in prefer_left_columns:
resolved_columns[column] = merged_df[left_column].combine_first(merged_df[right_column])
else:
resolved_columns[column] = merged_df[right_column].combine_first(merged_df[left_column])
suffixed_columns = [f"{column}__left" for column in overlap] + [f"{column}__right" for column in overlap]
merged_df = merged_df.drop(columns=suffixed_columns)
resolved_df = pd.DataFrame(resolved_columns, index=merged_df.index)
return pd.concat([merged_df, resolved_df], axis=1)
@asset(
    io_manager_key="io_manager",
    description="Fetches GI-DAMPs data from IGMM RedCap Server",
    group_name="gidamps",
)
def gidamps_raw_dataframe(context: AssetExecutionContext) -> pd.DataFrame:
    """
    Fetches raw data from the GIDAMPS REDCap API and returns it as a pandas DataFrame.

    The function performs the following steps:
    1. Retrieves the GIDAMPS API token from environment variables, failing fast if unset.
    2. Defines the REDCap API URL and the data payload for the API request.
    3. Sends a POST request to the REDCap API to fetch the data.
    4. Converts the JSON response into a pandas DataFrame.
    5. Drops specific columns from the DataFrame that are not needed.

    Raises:
        ValueError: If the GIDAMPS_API_TOKEN environment variable is not set.

    Returns:
        pd.DataFrame: A pandas DataFrame containing the raw data from the GIDAMPS REDCap API.
    """
    api_token = EnvVar("GIDAMPS_API_TOKEN").get_value()
    if not api_token:
        # Previously a missing token was interpolated into an f-string and the
        # literal text "None" was sent to REDCap; fail fast with a clear error.
        raise ValueError("GIDAMPS_API_TOKEN environment variable is not set")
    redcap_url = get_settings().redcap_api_url
    # Standard REDCap "export records" payload: raw (coded) values, flat layout.
    redcap_api_data = {
        "token": api_token,
        "content": "record",
        "action": "export",
        "format": "json",
        "type": "flat",
        "csvDelimiter": "",
        "rawOrLabel": "raw",
        "rawOrLabelHeaders": "raw",
        "exportCheckboxLabel": "false",
        "exportSurveyFields": "false",
        "exportDataAccessGroups": "false",
        "returnFormat": "json",
    }
    data = fetch_redcap_json(redcap_url, redcap_api_data, fixture_name="gidamps.json")
    df = pd.DataFrame.from_records(data)
    # Drop identifiable fields (email, CHI number, DOB, ...) and bookkeeping
    # columns that are never used downstream.
    df = drop_columns_with_log(
        df,
        [
            "email",
            "chi_no",
            "consent",
            "initial",
            "dob",
            "legacy_study_id",
            "patient_details_complete",
            "pd_gi_participant_category",
            "registration_location",
            "date_enrolledconsent",
            "data_entry_date",
            "study_group",
            "study_group_hc",
            "blood_experiment",
            "faecal_experiment",
            "biopsy_experiment",
            "bloodtestdate_as_lbt",
            "blood_sample_collected___5",
            "blood_sample_collected___6",
            "blood_sample_collected___3",
            "blood_sample_collected___4",
            "blood_sample_collected___100",
            "blood_sample_collected___200",
            "blood_sample_collected___99",
            "blood_sample_collected___2",
            "blood_sample_collected___1",
            "blood_sample_collected____1000",
            "blood_sample_optinal_set___1",
            "blood_sample_optinal_set____1000",
            "blood_sample_additional___1",
            "blood_sample_additional___2",
            "blood_sample_additional____1000",
            "faecal_test_date",
            "faecal_sample_collected___1",
            "faecal_sample_collected___2",
            "faecal_sample_collected___3",
            "faecal_sample_collected___4",
            "faecal_sample_collected___99",
            "faecal_sample_collected____1000",
            "sampls_date_experiment4",
            "biopsy_sample_collected___1",
            "biopsy_sample_collected___2",
            "biopsy_sample_collected___99",
            "biopsy_sample_collected____1000",
            "sampling_complete",
            "baseline_eims____1000",
            "sccai_complications____1000",
            "hbicomplications____1000",
            "adalimumab_test",
            "infliximab_test",
            "vedolizumab_test",
            "ustekinumab_test",
            "drug_level_uste",
            "drug_level_antibody_uste",
            "drug_level_vedo",
            "drug_level_antibody_vedo",
            "endoscopy_yn",
            "endoscopy_type____1000",
            "gidamps_participant_questionnaire_complete",
            "baseline_mont_cd_loc____1000",
            "baseline_mont_cd_beh____1000",
            "radiology",
            "ibd_background_clinician_complete",
        ],
        context,
    )
    context.add_output_metadata(build_dataframe_observability_metadata(df))
    return df
@asset(description="Data cleaning - Renames columns and maps values.", group_name="gidamps")
def gidamps_cleaned_dataframe(context: AssetExecutionContext, gidamps_raw_dataframe: pd.DataFrame) -> pd.DataFrame:
    """
    Cleans and transforms the raw GIDAMPS dataframe.

    This function performs the following operations:
    1. Renames columns to more readable names.
    2. Converts specified columns to numeric types, coercing errors.
    3. Converts date columns to datetime objects and calculates diagnosis duration.
    4. Maps categorical columns to more readable values.
    5. Creates new columns based on existing data.
    6. Drops unnecessary columns.
    7. Replaces empty strings with NaN values.

    Note:
        Step order matters: the rename must precede the source-enum contract
        check, and the numeric coercion must precede the integer-keyed value
        maps further down.

    Args:
        gidamps_raw_dataframe (pd.DataFrame): The raw dataframe containing GIDAMPS data.

    Returns:
        pd.DataFrame: The cleaned and transformed dataframe.
    """
    df = gidamps_raw_dataframe.copy()
    # Rename raw REDCap export field names to readable analysis names.
    df = df.rename(
        columns={
            "bl_new_diagnosis": "new_diagnosis_of_ibd",
            "smokeryn1": "smoking_status",
            "bmi_height": "height",
            "bmi_weight": "weight",
            "diagnosis_age": "age_at_diagnosis",
            "smokeryn1_y": "is_smoker",
            "patient_active_symptomyn1": "has_active_symptoms",
            "antibiotics": "sampling_abx",
            "steroids": "sampling_steroids",
            "haematocrit_lab": "haematocrit",
            "neutrophils_lab": "neutrophils",
            "lymphocytes_lab": "lymphocytes",
            "monocytes_lab": "monocytes",
            "eosinophils_lab": "eosinophils",
            "basophils_lab": "basophils",
            "plt_lab": "platelets",
            "urea_lab": "urea",
            "creatinine_lab": "creatinine",
            "sodium_lab": "sodium",
            "potassium_lab": "potassium",
            "egfr_lab": "egfr",
            "faecal_test_date_2": "calprotectin_date",
            "drug_level_inflxi": "ifx_level",
            "drug_level_adalimumab": "ada_level",
            "drug_level_antibody_adali": "ada_antibody",
            "drug_level_antibody_inflx": "ifx_antibody",
            "baseline_mont_uc_extent": "montreal_uc_extent",
            "baseline_mont_seve_uc": "montreal_uc_severity",
            "baseline_eims___2": "baseline_eims_arthralgia",
            "baseline_eims___3": "baseline_eims_ank_spon",
            "baseline_eims___5": "baseline_eims_erythema_nodosum",
            "baseline_eims___6": "baseline_eims_pyoderma",
            "baseline_eims___10": "baseline_eims_uveitis",
            "baseline_eims___12": "baseline_eims_episcleritis",
            "baseline_eims___8": "baseline_eims_sacroileitis",
            "baseline_eims___15": "baseline_eims_none",
            "sccai_complications___1": "sccai_arthralgia",
            "sccai_complications___2": "sccai_uveitis",
            "sccai_complications___3": "sccai_erythema_nodosum",
            "sccai_complications___4": "sccai_pyoderma",
            "hbicomplications___1": "hbi_arthralgia",
            "hbicomplications___2": "hbi_uveitis",
            "hbicomplications___3": "hbi_erythema_nodosum",
            "hbicomplications___4": "hbi_apthous_ulcers",
            "hbicomplications___5": "hbi_pyoderma",
            "hbicomplications___6": "hbi_anal_fissures",
            "hbicomplications___7": "hbi_new_fistula",
            "hbicomplications___8": "hbi_abscess",
            "baseline_gi_symptoms_desc": "symptoms_description",
            "sccai_bowel_freqday": "sccai_bowel_frequency_day",
            "sccai_bowel_frequency_nigh": "sccai_bowel_frequency_night",
            "sccai_urgency_of_defecatio": "sccai_urgency",
            "hbinumber_of_liquid_stools": "hbi_liquid_stools",
            "hbiabdominal_mass1": "hbi_abdominal_mass",
            "hbigeneral_well_being_as": "hbi_general_well_being",
            "hbiabdominal_pain": "hbi_abdominal_pain",
            "blood_test_date_red": "nhs_bloods_date",
            "date_test_adali_inflixi": "drug_level_date",
            "endoscopy_result_endcospy": "endoscopy_report",
            "histopathology_report_text": "pathology_report",
            "ct_abdomen_and_or_pelvis": "ct_abdomen",
            "radilogy_r_ctabdo_pelvic": "ct_abdomen_report",
            "radiology_r_mri_sml_bowel": "mri_small_bowel_report",
            "radiology_r_mri_pelvis": "mri_pelvis_report",
            "comment_if_any_of_these_qu": "participant_questionnaire_comments",
            "mh_appendx1": "previous_appendicectomy",
            "mh_tonsilout1": "previous_tonsillectomy",
            "mh_tonsil_date1": "age_or_year_of_tonsillectomy",
            "mh_appendix_date1": "age_or_year_of_appendicectomy",
            "fhdiagnosis_pfh_1": "family_history_diagnosis",
            "relationship_pfh": "family_history_relationship",
            "family_history_pfh": "family_history_of_ibd",
            "gi_q1": "giq_ethnicity",
            "gi_q1_other_3": "giq_ethicity_text",
            "q_hc_goodhealth": "giq_are_you_in_good_health",
            "gi_q1_other_2": "giq_good_health_text",
            "q_hc_longterm_medication": "giq_long_term_medication",
            "gi_q1_other": "giq_long_term_medication_text",
            "q3_gi": "giq_smoking_status_at_diagnosis",
            "q5_gi": "giq_strenous_exercise_last_48h",
            "q5_gi_yes": "giq_strenous_exercise_last_48h_text",
            "q4_gi": "giq_do_you_drink_alcohol",
            "q4_gi_yes": "giq_alcohol_consumption",
            "sccai_total_calculation": "sccai_total",
            "hbi_total_calculation": "hbi_total",
        },
    )
    # Backfill baseline demographics onto repeat-instrument rows so the
    # null-threshold checks in the contract below can pass.
    df = _backfill_repeat_instrument_demographics(df)
    # Validate the still string-coded source enums before any recoding.
    enforce_dataframe_contract(
        df,
        "gidamps_cleaned_dataframe_source_enums",
        DataFrameContract(
            required_columns=(
                "study_id",
                "redcap_repeat_instrument",
                "study_group_name",
                "sex",
            ),
            enum_coverage={
                "redcap_repeat_instrument": ("", "sampling", "cucq32"),
                "study_group_name": ("1", "2", "3", "4", "5", "6"),
                "sex": ("1", "2"),
            },
            null_thresholds={
                "study_id": 0.0,
                "study_group_name": 0.0,
                "sex": 0.05,
            },
        ),
    )
    # REDCap exports everything as strings; coerce coded/categorical fields to
    # numeric (unparseable values become NaN) so the integer-keyed maps below
    # match.
    columns_to_convert = [
        "study_group_name",
        "baseline_recruitment_type",
        "sex",
        "ibd_status",
        "new_diagnosis_of_ibd",
        "smoking_status",
        "sccai_general_well_being",
        "sccai_bowel_frequency_day",
        "sccai_bowel_frequency_night",
        "sccai_urgency",
        "sccai_blood_in_stool",
        "montreal_uc_extent",
        "montreal_uc_severity",
        "hbi_abdominal_mass",
        "hbi_general_well_being",
        "hbi_abdominal_pain",
        "endoscopy_type___1",
        "endoscopy_type___2",
        "endoscopy_type___3",
        "baseline_mont_cd_loc___0",
        "baseline_mont_cd_loc___1",
        "baseline_mont_cd_loc___2",
        "baseline_mont_cd_loc___3",
        "baseline_mont_cd_beh___0",
        "baseline_mont_cd_beh___1",
        "baseline_mont_cd_beh___2",
        "baseline_mont_cd_beh___3",
        "ct_abdomen",
        "mri_small_bowel",
        "mri_pelvis",
        "ada_antibody",
        "ifx_antibody",
        "past_ibd_surgery",
        "ifx",
        "ciclo",
        "aza",
        "mp",
        "mtx",
        "ada",
        "uste",
        "vedo",
        "filgo",
        "risa",
        "upa",
        "golim",
        "tofa",
        "previous_appendicectomy",
        "family_history_of_ibd",
        "giq_smoking_status_at_diagnosis",
        "giq_alcohol_consumption",
        "family_history_diagnosis",
        "giq_ethnicity",
    ]
    df[columns_to_convert] = df[columns_to_convert].apply(pd.to_numeric, errors="coerce")
    # Days since diagnosis, relative to today's (naive, local) date.
    df["diagnosis_duration_in_days"] = (
        pd.to_datetime(datetime.date.today()) - pd.to_datetime(df["date_of_diagnosis"])
    ).dt.days
    # Recode 1/2 answers into 1/0 flags (presumably 1=yes, 2=no in the REDCap
    # data dictionary -- confirm there); unmapped values become NaN.
    df["ct_abdomen"] = df["ct_abdomen"].map({1: 1, 2: 0})
    df["mri_small_bowel"] = df["mri_small_bowel"].map({1: 1, 2: 0})
    df["mri_pelvis"] = df["mri_pelvis"].map({1: 1, 2: 0})
    df["past_ibd_surgery"] = df["past_ibd_surgery"].map({1: 1, 2: 0})
    df["ifx"] = df["ifx"].map({1: 1, 2: 0})
    df["ciclo"] = df["ciclo"].map({1: 1, 2: 0})
    df["previous_appendicectomy"] = df["previous_appendicectomy"].map({1: 1, 2: 0})
    # Recode remaining categorical fields from integer codes to readable labels.
    df["family_history_of_ibd"] = df["family_history_of_ibd"].map({1: "yes", 2: "no", 3: "not_available"})
    df["giq_smoking_status_at_diagnosis"] = df["giq_smoking_status_at_diagnosis"].map({1: "yes", 2: "no", 3: "unsure"})
    df["giq_alcohol_consumption"] = df["giq_alcohol_consumption"].map(
        {
            1: "most_days",
            2: "weekends_only",
            3: "once_or_twice_a_week",
            4: "once_or_twice_a_month",
            5: "once_or_twice_a_year",
        }
    )
    df["family_history_diagnosis"] = df["family_history_diagnosis"].map(
        {
            6: "cd",
            7: "uc",
            9: "ibdu",
            10: "possible_cd",
            11: "possible_uc",
            12: "possible_ibdu",
            99: "other_diagnosis",
        }
    )
    df["giq_ethnicity"] = df["giq_ethnicity"].map({1: "white_european", 99: "other_see_text"})
    # Drug antibody levels are reported as banded ranges, not exact values.
    df["ada_antibody"] = df["ada_antibody"].map({1: "<10", 2: "10-40", 3: "40-200", 4: ">200", 5: "not_tested"})
    df["ifx_antibody"] = df["ifx_antibody"].map({1: "<10", 2: "10-40", 3: "40-200", 4: ">200", 5: "not_tested"})
    df["study_group_name"] = df["study_group_name"].map(
        {
            1: "cd",
            2: "uc",
            3: "ibdu",
            4: "non_ibd",
            5: "await_dx",
            6: "hc",
        }
    )
    df["baseline_recruitment_type"] = df["baseline_recruitment_type"].map(
        {
            1: "endoscopy",
            2: "outpatient",
            3: "inpatient",
        }
    )
    map_columns(df, ("sex",), SEX_MAP)
    df["ibd_status"] = df["ibd_status"].map(
        {
            0: "biochem_remission",
            1: "remission",
            2: "active",
            3: "highly_active",
            4: "not_applicable",
        }
    )
    df["new_diagnosis_of_ibd"] = df["new_diagnosis_of_ibd"].map(
        {
            1: "yes",
            0: "no",
        }
    )
    map_columns(df, ("smoking_status",), SMOKING_STATUS_MAP)
    # Shared clinical-score and Montreal-UC recodes live in transforms modules.
    df = apply_sccai_hbi_maps(df)
    df = apply_montreal_uc_maps(df)
    # Collapse the Montreal CD checkbox columns into single categorical fields.
    # Assignments run in order, so when several boxes are ticked the later
    # assignment wins (e.g. B3 overwrites B2/B1, L3 overwrites L2/L1).
    df.loc[df["baseline_mont_cd_beh___0"] == 1, "montreal_cd_behaviour"] = "B1"
    df.loc[df["baseline_mont_cd_beh___1"] == 1, "montreal_cd_behaviour"] = "B2"
    df.loc[df["baseline_mont_cd_beh___2"] == 1, "montreal_cd_behaviour"] = "B3"
    df.loc[df["baseline_mont_cd_beh___3"] == 1, "montreal_perianal"] = 1
    df.loc[df["baseline_mont_cd_loc___0"] == 1, "montreal_cd_location"] = "L1"
    df.loc[df["baseline_mont_cd_loc___1"] == 1, "montreal_cd_location"] = "L2"
    df.loc[df["baseline_mont_cd_loc___2"] == 1, "montreal_cd_location"] = "L3"
    df.loc[df["baseline_mont_cd_loc___3"] == 1, "montreal_upper_gi"] = 1
    df.loc[df["endoscopy_type___1"] == 1, "endoscopy_type"] = "other"
    df.loc[df["endoscopy_type___2"] == 1, "endoscopy_type"] = "colonoscopy"
    df.loc[df["endoscopy_type___3"] == 1, "endoscopy_type"] = "flexi_sig"
    # Derive baseline drug-class exposure flags (1 = exposed) from the
    # individual drug columns; unexposed rows are filled with 0 just below.
    df.loc[df["aza"] == 1, "baseline_thiopurine_exposure"] = 1
    df.loc[df["mp"] == 1, "baseline_thiopurine_exposure"] = 1
    df.loc[df["ada"] == 1, "baseline_anti_tnf_exposure"] = 1
    df.loc[df["ifx"] == 1, "baseline_anti_tnf_exposure"] = 1
    df.loc[df["golim"] == 1, "baseline_anti_tnf_exposure"] = 1
    df.loc[df["ada"] == 1, "baseline_biologic_exposure"] = 1
    df.loc[df["ifx"] == 1, "baseline_biologic_exposure"] = 1
    df.loc[df["golim"] == 1, "baseline_biologic_exposure"] = 1
    df.loc[df["uste"] == 1, "baseline_biologic_exposure"] = 1
    df.loc[df["vedo"] == 1, "baseline_biologic_exposure"] = 1
    df.loc[df["filgo"] == 1, "baseline_biologic_exposure"] = 1
    df.loc[df["risa"] == 1, "baseline_biologic_exposure"] = 1
    df.loc[df["upa"] == 1, "baseline_biologic_exposure"] = 1
    df.loc[df["tofa"] == 1, "baseline_biologic_exposure"] = 1
    df.loc[df["upa"] == 1, "baseline_jak_exposure"] = 1
    df.loc[df["tofa"] == 1, "baseline_jak_exposure"] = 1
    df.loc[df["filgo"] == 1, "baseline_jak_exposure"] = 1
    cols_to_fill = [
        "baseline_thiopurine_exposure",
        "baseline_anti_tnf_exposure",
        "baseline_biologic_exposure",
        "baseline_jak_exposure",
    ]
    df[cols_to_fill] = df[cols_to_fill].fillna(0)
    # Drop the raw checkbox columns now that they are collapsed above.
    df = drop_columns_with_log(
        df,
        [
            "baseline_mont_cd_loc___0",
            "baseline_mont_cd_loc___1",
            "baseline_mont_cd_loc___2",
            "baseline_mont_cd_loc___3",
            "baseline_mont_cd_beh___0",
            "baseline_mont_cd_beh___1",
            "baseline_mont_cd_beh___2",
            "baseline_mont_cd_beh___3",
            "endoscopy_type___1",
            "endoscopy_type___2",
            "endoscopy_type___3",
        ],
        context,
    )
    # Recruiting centre inferred from the study_id prefix; anything without a
    # 136-/138- prefix defaults to edinburgh.
    df["study_center"] = df["study_id"].apply(
        lambda x: "glasgow" if "136-" in x else ("dundee" if "138-" in x else "edinburgh")
    )
    # Data Harmonization
    df = rename_columns_with_log(df, {"study_group_name": "study_group"}, context)
    # Prefix IDs with the study code.
    df["study_id"] = df["study_id"].apply(lambda x: f"GID-{x}")
    # Normalise empty/whitespace-only strings to NaN across the whole frame.
    df = df.replace(r"^\s*$", np.nan, regex=True)
    # Enforce the full cleaned-data contract and attach observability metadata.
    contract_metadata = enforce_dataframe_contract(df, "gidamps_cleaned_dataframe", GIDAMPS_CLEANED_CONTRACT)
    context.add_output_metadata(
        build_dataframe_observability_metadata(
            df,
            contract_metadata=contract_metadata,
            include_preview=True,
            preview_rows=10,
            extra_metadata={
                "contract_violation_count": contract_metadata["violation_count"],
            },
        )
    )
    return df
@asset(description="Creates demographics dataframe", group_name="gidamps")
def gidamps_demographics_dataframe(
    context: AssetExecutionContext,
    gidamps_cleaned_dataframe: pd.DataFrame,
    ibd_snp_carriers_dataframe: pd.DataFrame,
) -> pd.DataFrame:
    """Produce the one-row-per-participant demographics dataframe.

    Keeps only non-repeat rows (drops 'sampling' and 'cucq32' instrument rows)
    and removes columns that are entirely NaN, then left-joins GI-DAMPs SNP
    carrier data on ``study_id`` and flags which participants have genotype
    data available.
    """
    cleaned = gidamps_cleaned_dataframe.copy()
    keep_row = ~cleaned["redcap_repeat_instrument"].isin(["sampling", "cucq32"])
    demographics_df = cleaned.loc[keep_row].copy().dropna(axis=1, how="all")
    # Restrict SNP carriers to GI-DAMPs participants before merging.
    gid_snp_df = ibd_snp_carriers_dataframe[ibd_snp_carriers_dataframe["study_id"].str.startswith("GID-")]
    # Set of study_ids with SNP data, for a fast membership check below.
    snp_study_ids = set(gid_snp_df["study_id"])
    demographics_df = pd.merge(demographics_df, gid_snp_df, how="left", on="study_id")
    demographics_df["genotype_data_available"] = demographics_df["study_id"].isin(snp_study_ids)
    contract_metadata = enforce_dataframe_contract(
        demographics_df,
        "gidamps_demographics_dataframe",
        DataFrameContract(
            required_columns=("study_id",),
            unique_keys=(("study_id",),),
            null_thresholds={"study_id": 0.0},
        ),
    )
    context.add_output_metadata(
        build_dataframe_observability_metadata(
            demographics_df,
            contract_metadata=contract_metadata,
            include_preview=True,
            preview_rows=10,
            extra_metadata={
                "dataframe/number_of_participants_with_snp_data": int(gid_snp_df.shape[0]),
            },
        )
    )
    return demographics_df
@asset(
    description="Creates sampling dataframe",
    group_name="gidamps",
)
def gidamps_sampling_dataframe(
    context: AssetExecutionContext, gidamps_cleaned_dataframe: pd.DataFrame
) -> pd.DataFrame:
    """Build the per-sampling-event dataframe for GI-DAMPs.

    Splits the cleaned dataframe into demographics, 'sampling' and 'cucq32'
    subsets, trims all-NaN columns, drops bookkeeping/derived columns from the
    repeat subsets, aligns the CUCQ date column with ``sampling_date``, and
    merges the subsets back together. Where columns overlap, sampling values
    take precedence over demographic values.

    Args:
        gidamps_cleaned_dataframe (pd.DataFrame): Output of the cleaning asset,
            containing 'redcap_repeat_instrument', 'study_id', 'cucq_date' and
            related columns.

    Returns:
        pd.DataFrame: One row per sampling event, keyed by
        ``study_id`` + ``sampling_date``.
    """
    source_df = gidamps_cleaned_dataframe.copy()
    instrument = source_df["redcap_repeat_instrument"]
    demographics_df = source_df.loc[~instrument.isin(["sampling", "cucq32"])].copy()
    demographics_df = demographics_df.dropna(axis=1, how="all")
    sampling_df = source_df[source_df["redcap_repeat_instrument"] == "sampling"].copy()
    cucq_df = source_df[source_df["redcap_repeat_instrument"] == "cucq32"].copy()
    # Guard: only trim all-NaN columns when there are rows to judge them by
    # (on a zero-row frame dropna(how="all") would treat every column as empty).
    if not sampling_df.empty:
        sampling_df = sampling_df.dropna(axis=1, how="all")
    if not cucq_df.empty:
        cucq_df = cucq_df.dropna(axis=1, how="all")
    # Baseline-derived and bookkeeping columns belong to demographics, not to
    # the repeat-instrument rows.
    cols_to_drop = [
        "baseline_thiopurine_exposure",
        "baseline_anti_tnf_exposure",
        "baseline_biologic_exposure",
        "baseline_jak_exposure",
        "redcap_repeat_instrument",
        "study_center",
    ]
    sampling_df = drop_columns_with_log(sampling_df, cols_to_drop, context)
    cucq_df = drop_columns_with_log(cucq_df, cols_to_drop, context)
    cucq_df = drop_columns_with_log(cucq_df, ["redcap_repeat_instance"], context)
    if "cucq_date" in cucq_df.columns:
        # Make 'cucq_date' the canonical 'sampling_date' on the CUCQ subset,
        # discarding any pre-existing 'sampling_date' column first.
        if "sampling_date" in cucq_df.columns:
            cucq_df = drop_columns_with_log(cucq_df, ["sampling_date"], context)
        cucq_df = rename_columns_with_log(cucq_df, {"cucq_date": "sampling_date"}, context)
    # Sampling-event values win over demographic values on column overlap.
    preferred = set(sampling_df.columns) - {"study_id", "sampling_date"}
    merged_df = _merge_with_column_precedence(
        sampling_df,
        demographics_df,
        keys=("study_id",),
        prefer_left_columns=preferred,
    )
    merged_df = _merge_with_column_precedence(
        merged_df,
        cucq_df,
        keys=("study_id", "sampling_date"),
        prefer_left_columns=preferred,
    )
    contract_metadata = enforce_dataframe_contract(
        merged_df,
        "gidamps_sampling_dataframe",
        DataFrameContract(
            required_columns=("study_id", "sampling_date"),
            unique_keys=(("study_id", "sampling_date"),),
            null_thresholds={"study_id": 0.0},
        ),
    )
    context.add_output_metadata(
        build_dataframe_observability_metadata(
            merged_df,
            contract_metadata=contract_metadata,
            include_preview=True,
            preview_rows=10,
        )
    )
    return merged_df
@asset(
    group_name="gidamps",
    description="Stores GIDAMPS demographic data in GTrac Dataset model",
)
def store_demographic_data_in_gtrac(
    gidamps_demographics_dataframe: pd.DataFrame, gtrac: GTracResource
) -> MaterializeResult:
    """Submit the GI-DAMPs demographics dataframe to the GTrac Dataset API.

    Reorders columns for queryable datasets, re-validates the study_id
    contract immediately before submission, serialises rows to JSON records
    and posts them to GTrac.

    Returns:
        MaterializeResult: GTrac submission metadata combined with dataframe
        observability metadata.
    """
    df = reorder_queryable_dataset_columns(gidamps_demographics_dataframe)
    enforce_dataframe_contract(
        df,
        "store_demographic_data_in_gtrac",
        DataFrameContract(
            required_columns=("study_id",),
            unique_keys=(("study_id",),),
            null_thresholds={"study_id": 0.0},
        ),
    )
    # Named `records_json` (not `json`) to avoid shadowing the stdlib module.
    records_json = df.to_json(orient="records")
    data = {
        "scope_type": "study",
        "modality": "clinical",
        "studies": ["gidamps"],
        "name": "gidamps_demographics",
        "description": "Demographic data from the GIDAMPS study. Each row represents a single participant.",
        "columns": list(df.columns),
        "json": records_json,
    }
    response = gtrac.submit_data(data)
    metadata = build_gtrac_materialize_metadata(response, rows_submitted=len(df))
    metadata.update(build_dataframe_observability_metadata(df))
    return MaterializeResult(metadata=metadata)
@asset(
    group_name="gidamps",
    description="Stores GIDAMPS sampling data in GTrac Dataset model",
)
def store_sampling_data_in_gtrac(gidamps_sampling_dataframe: pd.DataFrame, gtrac: GTracResource) -> MaterializeResult:
    """Submit the GI-DAMPs sampling dataframe to the GTrac Dataset API.

    Reorders columns for queryable datasets, re-validates the
    study_id/sampling_date contract immediately before submission, serialises
    rows to JSON records and posts them to GTrac.

    Returns:
        MaterializeResult: GTrac submission metadata combined with dataframe
        observability metadata.
    """
    df = reorder_queryable_dataset_columns(gidamps_sampling_dataframe)
    enforce_dataframe_contract(
        df,
        "store_sampling_data_in_gtrac",
        DataFrameContract(
            required_columns=("study_id", "sampling_date"),
            unique_keys=(("study_id", "sampling_date"),),
            null_thresholds={"study_id": 0.0},
        ),
    )
    # Named `records_json` (not `json`) to avoid shadowing the stdlib module.
    records_json = df.to_json(orient="records")
    data = {
        "scope_type": "study",
        "modality": "clinical",
        "studies": ["gidamps"],
        "name": "gidamps_sampling",
        "description": "Sampling data from the GIDAMPS study. Each row represents a single sampling event. There may be multiple sampling events per participant.",
        "columns": list(df.columns),
        "json": records_json,
    }
    response = gtrac.submit_data(data)
    metadata = build_gtrac_materialize_metadata(response, rows_submitted=len(df))
    metadata.update(build_dataframe_observability_metadata(df))
    return MaterializeResult(metadata=metadata)