Skip to content

Combined Mini-MUSIC and MUSIC pipeline

This combined asset concatenates the cleaned adult MUSIC dataframe with the paediatric Mini-MUSIC extract so analysts can run cross-cohort queries. Before returning the combined frame, it records the row and column dimensions of both inputs and of the output, the number of overlapping and study-specific columns, and a preview of up to 20 overlapping column names, which helps consumers decide which variables can be compared directly.

music_dagster/assets/combined_music_mini_music.py
import pandas as pd
from dagster import AssetExecutionContext, MaterializeResult, asset

from music_dagster.contracts import enforce_dataframe_contract
from music_dagster.data_contracts import COMBINED_MUSIC_MINI_CONTRACT
from music_dagster.observability import build_dataframe_observability_metadata
from music_dagster.resources import GTracResource, build_gtrac_materialize_metadata
from music_dagster.utils import reorder_queryable_dataset_columns


@asset(
    group_name="combined_music_mini_music",
    description="Merges music and mini-music dataframes",
)
def combined_music_mini_music_dataframe(
    context: AssetExecutionContext,
    music_cleaned_dataframe: pd.DataFrame,
    mini_music_cleaned_dataframe: pd.DataFrame,
) -> pd.DataFrame:
    """Stack the MUSIC and Mini-MUSIC frames row-wise and report on the result.

    The two inputs are concatenated with ``pd.concat`` (MUSIC rows first),
    the result is passed through ``enforce_dataframe_contract`` against
    ``COMBINED_MUSIC_MINI_CONTRACT``, and observability metadata is attached
    to the asset output before the combined frame is returned.

    Args:
        context: Dagster execution context; receives the output metadata.
        music_cleaned_dataframe: Cleaned MUSIC dataframe.
        mini_music_cleaned_dataframe: Cleaned Mini-MUSIC dataframe.

    Returns:
        The concatenated dataframe. Columns present in only one input are
        kept; ``pd.concat`` fills the missing cells for the other study.
    """

    def _dimension(frame: pd.DataFrame) -> str:
        # Human-readable "<rows> rows x <cols> columns" summary of a frame.
        row_count, column_count = frame.shape
        return f"{row_count} rows x {column_count} columns"

    # Column-name sets drive the overlap statistics reported in the metadata.
    music_columns = set(music_cleaned_dataframe.columns)
    mini_music_columns = set(mini_music_cleaned_dataframe.columns)
    shared_columns = music_columns & mini_music_columns
    music_exclusive = music_columns - mini_music_columns
    mini_music_exclusive = mini_music_columns - music_columns

    # NOTE(review): the original index labels of both inputs are preserved,
    # so labels may repeat in the combined frame -- confirm that downstream
    # consumers do not rely on a unique index.
    combined = pd.concat([music_cleaned_dataframe, mini_music_cleaned_dataframe])

    # The returned mapping is forwarded to the metadata builder and must
    # expose a "violation_count" entry.
    contract_metadata = enforce_dataframe_contract(
        combined, "combined_music_mini_music_dataframe", COMBINED_MUSIC_MINI_CONTRACT
    )

    extra_metadata = {
        "input_music_df_dimension": _dimension(music_cleaned_dataframe),
        "input_mini_music_df_dimension": _dimension(mini_music_cleaned_dataframe),
        "output_df_dimension": _dimension(combined),
        "overlapping_column_count": len(shared_columns),
        "music_only_column_count": len(music_exclusive),
        "mini_music_only_column_count": len(mini_music_exclusive),
        # Only the first 20 names (alphabetical) to keep the metadata small.
        "overlapping_column_preview": sorted(shared_columns)[:20],
        "contract_violation_count": contract_metadata["violation_count"],
    }
    observability_metadata = build_dataframe_observability_metadata(
        combined,
        contract_metadata=contract_metadata,
        extra_metadata=extra_metadata,
    )
    context.add_output_metadata(observability_metadata)
    return combined


@asset(
    group_name="combined_music_mini_music",
    description="Stores combined dataframe in G-Trac",
)
def store_combined_music_in_gtrac(
    combined_music_mini_music_dataframe: pd.DataFrame, gtrac: GTracResource
) -> MaterializeResult:
    """Submit the combined MUSIC / Mini-MUSIC dataframe to G-Trac.

    The frame is passed through ``reorder_queryable_dataset_columns``,
    checked with ``enforce_dataframe_contract``, serialised to a JSON string
    of row records and sent via ``gtrac.submit_data`` together with
    descriptive fields about the dataset.

    Args:
        combined_music_mini_music_dataframe: Output of the combined asset.
        gtrac: Resource used to submit the payload.

    Returns:
        A ``MaterializeResult`` whose metadata merges the G-Trac submission
        metadata with the dataframe observability metadata.
    """
    ordered = reorder_queryable_dataset_columns(combined_music_mini_music_dataframe)

    # The return value is not used here.
    # NOTE(review): presumably called for its validation side effects --
    # confirm against enforce_dataframe_contract.
    enforce_dataframe_contract(
        ordered, "store_combined_music_in_gtrac", COMBINED_MUSIC_MINI_CONTRACT
    )

    # One JSON object per row; index labels are not part of "records" output.
    records_json = ordered.to_json(orient="records")

    dataset_description = (
        "Combined music and mini-music dataframes. "
        "Consider dropping timepoint_4 and timepoint_5 to harmonize "
        "the longitudinal timepoints between both studies. "
        "Demographic data can be generated from timepoint_1 rows."
    )
    payload = {
        "scope_type": "study",
        "modality": "clinical",
        "studies": ["music", "mini_music"],
        "name": "combined_music",
        "description": dataset_description,
        "columns": list(ordered.columns),
        "json": records_json,
    }

    submission_response = gtrac.submit_data(payload)

    # Start from the submission metadata, then layer the dataframe
    # observability entries on top (later keys win on collision).
    result_metadata = build_gtrac_materialize_metadata(
        submission_response, rows_submitted=len(ordered)
    )
    result_metadata.update(build_dataframe_observability_metadata(ordered))
    return MaterializeResult(metadata=result_metadata)