import pandas as pd
from dagster import AssetExecutionContext, MaterializeResult, asset
from music_dagster.contracts import enforce_dataframe_contract
from music_dagster.data_contracts import COMBINED_MUSIC_MINI_CONTRACT
from music_dagster.observability import build_dataframe_observability_metadata
from music_dagster.resources import GTracResource, build_gtrac_materialize_metadata
from music_dagster.utils import reorder_queryable_dataset_columns
@asset(
    group_name="combined_music_mini_music",
    description="Merges music and mini-music dataframes",
)
def combined_music_mini_music_dataframe(
    context: AssetExecutionContext,
    music_cleaned_dataframe: pd.DataFrame,
    mini_music_cleaned_dataframe: pd.DataFrame,
) -> pd.DataFrame:
    """Stack the cleaned music and mini-music dataframes into one dataframe.

    The two inputs are concatenated row-wise with ``pd.concat`` (music rows
    first). ``ignore_index`` is not set, so each input's index labels are
    carried over unchanged. The combined frame is passed to
    ``enforce_dataframe_contract`` with ``COMBINED_MUSIC_MINI_CONTRACT``, and
    observability metadata (input/output dimensions, column-overlap counts, a
    preview of shared columns, and the contract violation count) is attached
    to the asset output.

    Args:
        context: Dagster execution context used to attach output metadata.
        music_cleaned_dataframe: Cleaned music study dataframe.
        mini_music_cleaned_dataframe: Cleaned mini-music study dataframe.

    Returns:
        The concatenated dataframe.
    """

    def _describe_shape(frame: pd.DataFrame) -> str:
        # Human-readable "<rows> rows x <cols> columns" summary of a frame.
        row_count, column_count = frame.shape
        return f"{row_count} rows x {column_count} columns"

    # Column-name bookkeeping; reported as metadata only, it does not affect
    # the concatenation itself.
    music_columns = set(music_cleaned_dataframe.columns)
    mini_music_columns = set(mini_music_cleaned_dataframe.columns)
    shared_columns = music_columns & mini_music_columns
    music_exclusive_columns = music_columns - mini_music_columns
    mini_music_exclusive_columns = mini_music_columns - music_columns

    combined = pd.concat([music_cleaned_dataframe, mini_music_cleaned_dataframe])

    # NOTE(review): whether the contract check raises or only reports is
    # decided inside enforce_dataframe_contract; here we only rely on the
    # returned mapping exposing "violation_count".
    contract_report = enforce_dataframe_contract(
        combined, "combined_music_mini_music_dataframe", COMBINED_MUSIC_MINI_CONTRACT
    )

    run_details = {
        "input_music_df_dimension": _describe_shape(music_cleaned_dataframe),
        "input_mini_music_df_dimension": _describe_shape(mini_music_cleaned_dataframe),
        "output_df_dimension": _describe_shape(combined),
        "overlapping_column_count": len(shared_columns),
        "music_only_column_count": len(music_exclusive_columns),
        "mini_music_only_column_count": len(mini_music_exclusive_columns),
        # Cap the preview at 20 names so the metadata entry stays small.
        "overlapping_column_preview": sorted(shared_columns)[:20],
        "contract_violation_count": contract_report["violation_count"],
    }
    context.add_output_metadata(
        build_dataframe_observability_metadata(
            combined,
            contract_metadata=contract_report,
            extra_metadata=run_details,
        )
    )
    return combined
@asset(
    group_name="combined_music_mini_music",
    description="Stores combined dataframe in G-Trac",
)
def store_combined_music_in_gtrac(
    combined_music_mini_music_dataframe: pd.DataFrame, gtrac: GTracResource
) -> MaterializeResult:
    """Submit the combined music / mini-music dataframe to G-Trac.

    The dataframe's columns are reordered with
    ``reorder_queryable_dataset_columns``, the result is passed to
    ``enforce_dataframe_contract`` with ``COMBINED_MUSIC_MINI_CONTRACT``, and
    the rows are serialized as a JSON array of records. That payload is sent
    through ``gtrac.submit_data``.

    Args:
        combined_music_mini_music_dataframe: Output of the combining asset.
        gtrac: Resource used to submit the dataset.

    Returns:
        A ``MaterializeResult`` whose metadata merges the G-Trac submission
        metadata with the dataframe observability metadata.
    """
    dataset_description = (
        "Combined music and mini-music dataframes. "
        "Consider dropping timepoint_4 and timepoint_5 to harmonize "
        "the longitudinal timepoints between both studies. "
        "Demographic data can be generated from timepoint_1 rows."
    )

    ordered = reorder_queryable_dataset_columns(combined_music_mini_music_dataframe)
    # The return value is intentionally unused here; the call is made for the
    # check itself (the original code discards it as well).
    enforce_dataframe_contract(
        ordered, "store_combined_music_in_gtrac", COMBINED_MUSIC_MINI_CONTRACT
    )

    # One JSON object per row, in row order.
    records_json = ordered.to_json(orient="records")

    payload = {
        "scope_type": "study",
        "modality": "clinical",
        "studies": ["music", "mini_music"],
        "name": "combined_music",
        "description": dataset_description,
        "columns": list(ordered.columns),
        "json": records_json,
    }
    submission = gtrac.submit_data(payload)

    result_metadata = build_gtrac_materialize_metadata(
        submission, rows_submitted=len(ordered)
    )
    # Observability keys are merged in last, so they win on any key collision.
    result_metadata.update(build_dataframe_observability_metadata(ordered))
    return MaterializeResult(metadata=result_metadata)