Find gaps in FLUXNET data
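
For reference, a sketch of the imports the code below assumes (`here()` presumably comes from pyprojroot, `tqdm` from tqdm.auto):

import re
import zipfile
from pathlib import Path

import polars as pl
import requests
from pyprojroot import here
from tqdm.auto import tqdm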

test_file_zip = here() / Path("../fluxnet/FLX_DE-Hai_FLUXNET2015_FULLSET_2000-2012_1-4.zip")
test_file = here() / Path("../fluxnet/FLX_DE-Hai_FLUXNET2015_FULLSET_2000-2012_1-4/FLX_DE-Hai_FLUXNET2015_FULLSET_HH_2000-2012_1-4.csv")
tmp_dir = Path("/tmp")
out_dir = here() / Path("../fluxnet/gap_stat")
download_dir = Path("/run/media/simone/Simone DATI/fluxnet_all")
test_file_zip
PosixPath('/home/simone/Documents/uni/Thesis/GPFA_imputation/../fluxnet/FLX_DE-Hai_FLUXNET2015_FULLSET_2000-2012_1-4.zip')

Unzip the file and load it lazily with polars.

zipf = zipfile.ZipFile(test_file_zip).extract('FLX_DE-Hai_FLUXNET2015_FULLSET_HH_2000-2012_1-4.csv', path=tmp_dir)
zipf
'/tmp/FLX_DE-Hai_FLUXNET2015_FULLSET_HH_2000-2012_1-4.csv'
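
ZipFile.extract returns the path of the extracted file, so zipf here is the path of the csv on disk, not a ZipFile object.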
# load the column names
col_names = pl.read_csv(zipf, n_rows=1).columns
types = {
    **{
        # pl.UInt8 would be enough for a QC flag, but some QC columns contain floats in the csv ...
        col_name: pl.Float32 if col_name.endswith("_QC") else pl.Float64 for col_name in col_names
    },
    "TIMESTAMP_START": pl.Int64,
    "TIMESTAMP_END": pl.Int64
}
# load df with correct types
df = pl.scan_csv(zipf, null_values=["-9999", "-9999.99"], dtypes=types)
df.head().collect()
shape: (5, 238)
TIMESTAMP_START TIMESTAMP_END TA_F_MDS TA_F_MDS_QC TA_ERA TA_F TA_F_QC SW_IN_POT SW_IN_F_MDS SW_IN_F_MDS_QC SW_IN_ERA SW_IN_F SW_IN_F_QC LW_IN_F_MDS LW_IN_F_MDS_QC LW_IN_ERA LW_IN_F LW_IN_F_QC LW_IN_JSB LW_IN_JSB_QC LW_IN_JSB_ERA LW_IN_JSB_F LW_IN_JSB_F_QC VPD_F_MDS VPD_F_MDS_QC VPD_ERA VPD_F VPD_F_QC PA PA_ERA PA_F PA_F_QC P P_ERA P_F P_F_QC WS ... RECO_DT_VUT_75 RECO_DT_VUT_84 RECO_DT_VUT_95 RECO_DT_CUT_REF RECO_DT_CUT_USTAR50 RECO_DT_CUT_MEAN RECO_DT_CUT_SE RECO_DT_CUT_05 RECO_DT_CUT_16 RECO_DT_CUT_25 RECO_DT_CUT_50 RECO_DT_CUT_75 RECO_DT_CUT_84 RECO_DT_CUT_95 GPP_DT_VUT_REF GPP_DT_VUT_USTAR50 GPP_DT_VUT_MEAN GPP_DT_VUT_SE GPP_DT_VUT_05 GPP_DT_VUT_16 GPP_DT_VUT_25 GPP_DT_VUT_50 GPP_DT_VUT_75 GPP_DT_VUT_84 GPP_DT_VUT_95 GPP_DT_CUT_REF GPP_DT_CUT_USTAR50 GPP_DT_CUT_MEAN GPP_DT_CUT_SE GPP_DT_CUT_05 GPP_DT_CUT_16 GPP_DT_CUT_25 GPP_DT_CUT_50 GPP_DT_CUT_75 GPP_DT_CUT_84 GPP_DT_CUT_95 RECO_SR
i64 i64 f64 f32 f64 f64 f32 f64 f64 f32 f64 f64 f32 f64 f32 f64 f64 f32 f64 f32 f64 f64 f32 f64 f32 f64 f64 f32 f64 f64 f64 f32 f64 f64 f64 f32 f64 ... f64 f64 f64 f64 f64 f64 f64 f64 f64 f64 f64 f64 f64 f64 f64 f64 f64 f64 f64 f64 f64 f64 f64 f64 f64 f64 f64 f64 f64 f64 f64 f64 f64 f64 f64 f64 f64
200001010000 200001010030 -0.6 0.0 -0.349 -0.6 0.0 0.0 0.0 0.0 0.0 0.0 0.0 null null 302.475 302.475 2.0 270.92 0.0 307.452 270.92 0.0 0.222 0.0 0.403 0.222 0.0 96.63 96.671 96.63 0.0 0.0 0.011 0.0 0.0 2.05 ... 1.04549 1.04774 1.04848 1.02086 1.02913 1.02528 0.006221 0.938402 0.965572 1.0197 1.03409 1.04549 1.04669 1.04848 0.0 0.0 0.0 0.0 0.0 0.0 0.0 0.0 0.0 0.0 0.0 0.0 0.0 0.0 0.0 0.0 0.0 0.0 0.0 0.0 0.0 0.0 1.23379
200001010030 200001010100 -0.65 0.0 -0.39 -0.65 0.0 0.0 0.0 0.0 0.0 0.0 0.0 null null 302.475 302.475 2.0 271.265 0.0 307.452 271.265 0.0 0.122 0.0 0.399 0.122 0.0 96.58 96.676 96.58 0.0 0.0 0.011 0.0 0.0 2.53 ... 1.04422 1.04647 1.0472 1.01962 1.02788 1.02403 0.006213 0.937264 0.964401 1.01847 1.03283 1.04422 1.04542 1.0472 0.0 0.0 0.0 0.0 0.0 0.0 0.0 0.0 0.0 0.0 0.0 0.0 0.0 0.0 0.0 0.0 0.0 0.0 0.0 0.0 0.0 0.0 1.23238
200001010100 200001010130 -0.58 0.0 -0.43 -0.58 0.0 0.0 0.0 0.0 0.0 0.0 0.0 null null 301.677 301.677 2.0 271.958 0.0 306.541 271.958 0.0 0.09 0.0 0.396 0.09 0.0 96.56 96.682 96.56 0.0 0.0 0.0 0.0 0.0 3.15 ... 1.046 1.04825 1.04898 1.02135 1.02963 1.02577 0.006224 0.938856 0.96604 1.0202 1.03459 1.046 1.0472 1.04898 0.0 0.0 0.0 0.0 0.0 0.0 0.0 0.0 0.0 0.0 0.0 0.0 0.0 0.0 0.0 0.0 0.0 0.0 0.0 0.0 0.0 0.0 1.23238
200001010130 200001010200 -0.51 0.0 -0.439 -0.51 0.0 0.0 0.0 0.0 0.0 0.0 0.0 null null 301.677 301.677 2.0 272.292 0.0 306.541 272.292 0.0 0.11 0.0 0.411 0.11 0.0 96.56 96.673 96.56 0.0 0.0 0.0 0.0 0.0 3.12 ... 1.04777 1.05003 1.05076 1.02308 1.03137 1.02751 0.006234 0.940447 0.967676 1.02193 1.03634 1.04777 1.04897 1.05076 0.0 0.0 0.0 0.0 0.0 0.0 0.0 0.0 0.0 0.0 0.0 0.0 0.0 0.0 0.0 0.0 0.0 0.0 0.0 0.0 0.0 0.0 1.23521
200001010200 200001010230 -0.49 0.0 -0.449 -0.49 0.0 0.0 0.0 0.0 0.0 0.0 0.0 null null 301.677 301.677 2.0 272.481 0.0 306.541 272.481 0.0 0.102 0.0 0.426 0.102 0.0 96.57 96.665 96.57 0.0 0.0 0.0 0.0 0.0 3.04 ... 1.04827 1.05054 1.05127 1.02357 1.03187 1.02801 0.006237 0.940901 0.968143 1.02242 1.03684 1.04827 1.04948 1.05127 0.0 0.0 0.0 0.0 0.0 0.0 0.0 0.0 0.0 0.0 0.0 0.0 0.0 0.0 0.0 0.0 0.0 0.0 0.0 0.0 0.0 0.0 1.23379

Column selection: to find gaps we only need the QC columns.

df.head().select(pl.col("^.*_QC$")).collect().columns
['TA_F_MDS_QC',
 'TA_F_QC',
 'SW_IN_F_MDS_QC',
 'SW_IN_F_QC',
 'LW_IN_F_MDS_QC',
 'LW_IN_F_QC',
 'LW_IN_JSB_QC',
 'LW_IN_JSB_F_QC',
 'VPD_F_MDS_QC',
 'VPD_F_QC',
 'PA_F_QC',
 'P_F_QC',
 'WS_F_QC',
 'CO2_F_MDS_QC',
 'TS_F_MDS_1_QC',
 'TS_F_MDS_2_QC',
 'TS_F_MDS_3_QC',
 'TS_F_MDS_4_QC',
 'TS_F_MDS_5_QC',
 'SWC_F_MDS_1_QC',
 'SWC_F_MDS_2_QC',
 'SWC_F_MDS_3_QC',
 'G_F_MDS_QC',
 'LE_F_MDS_QC',
 'H_F_MDS_QC',
 'NEE_CUT_REF_QC',
 'NEE_VUT_REF_QC',
 'NEE_CUT_USTAR50_QC',
 'NEE_VUT_USTAR50_QC',
 'NEE_CUT_MEAN_QC',
 'NEE_VUT_MEAN_QC',
 'NEE_CUT_05_QC',
 'NEE_CUT_16_QC',
 'NEE_CUT_25_QC',
 'NEE_CUT_50_QC',
 'NEE_CUT_75_QC',
 'NEE_CUT_84_QC',
 'NEE_CUT_95_QC',
 'NEE_VUT_05_QC',
 'NEE_VUT_16_QC',
 'NEE_VUT_25_QC',
 'NEE_VUT_50_QC',
 'NEE_VUT_75_QC',
 'NEE_VUT_84_QC',
 'NEE_VUT_95_QC']

The goal is to find where data is missing in the dataset, which means it has been gap-filled. The approach:

first filter out the rows where there is no gap (QC = 0)

then find the start and end of each gap by comparing each row's original row number with those of its neighbours (a toy illustration follows below)
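
As a toy illustration of the row-number trick (hypothetical data, assuming rows 5, 6, 7 and 12 survived the filter):

toy = pl.DataFrame({"row_num": [5, 6, 7, 12]})
toy.with_columns([
    (pl.col("row_num") - pl.col("row_num").shift()).alias("before"),   # distance to the previous flagged row
    (pl.col("row_num").shift(-1) - pl.col("row_num")).alias("after"),  # distance to the next flagged row
])
# before != 1 marks a gap start, after != 1 marks a gap end:
# rows 5-7 form one 3-row gap, row 12 is an isolated gap of length 1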

df.select(
        [pl.col("TA_F_QC"), "TIMESTAMP_START"]
    ).with_column(
        pl.first().cumcount().alias("row_num")
    ).filter(
        pl.col("TA_F_QC") != 0
    ).slice(10, 10).collect()
shape: (10, 3)
TA_F_QC TIMESTAMP_START row_num
f32 i64 u32
1.0 200112201230 34537
1.0 200206051100 42550
1.0 200211081330 50043
1.0 200404200930 75427
2.0 200404201000 75428
2.0 200404201030 75429
2.0 200404201100 75430
2.0 200404201130 75431
2.0 200404201200 75432
2.0 200404201230 75433
df.select(
        [pl.col("TA_F_QC"), "TIMESTAMP_START"]
    ).with_column(
        pl.first().cumcount().alias("row_num")
    ).filter(
        pl.col("TA_F_QC") != 0
    ).slice(10, 15).with_columns([
        (pl.col("row_num") - pl.col("row_num").shift() ).alias("before"),
        (pl.col("row_num").shift(-1) - pl.col("row_num")).alias("after"),
    ]).collect()
shape: (15, 5)
TA_F_QC TIMESTAMP_START row_num before after
f32 i64 u32 u32 u32
1.0 200112201230 34537 null 8013
1.0 200206051100 42550 8013 7493
1.0 200211081330 50043 7493 25384
1.0 200404200930 75427 25384 1
2.0 200404201000 75428 1 1
2.0 200404201030 75429 1 1
2.0 200404201100 75430 1 1
2.0 200404201130 75431 1 1
2.0 200404201200 75432 1 1
2.0 200404201230 75433 1 1
1.0 200404201300 75434 1 46940
1.0 200612241100 122374 46940 1
1.0 200612241130 122375 1 1
1.0 200612241200 122376 1 1
1.0 200612241230 122377 1 null
df.select(
        [pl.col("TA_F_QC"), "TIMESTAMP_START"]
    ).with_column(
        pl.first().cumcount().alias("row_num")
    ).filter(
        pl.col("TA_F_QC") != 0
    ).slice(10, 15).with_columns([
        (pl.col("row_num") - pl.col("row_num").shift() ).alias("before"),
        (pl.col("row_num").shift(-1) - pl.col("row_num")).alias("after"),
    ]).filter(
        (pl.col("before") != 1) | (pl.col("after") != 1)
    ).with_column(
        (pl.when((pl.col("before") != 1) & (pl.col("after") != 1))
        .then(1)
        .otherwise(pl.col("row_num").shift(-1) - pl.col("row_num") + 1)
        .alias("gap_len"))
    ).collect()
shape: (7, 6)
TA_F_QC TIMESTAMP_START row_num before after gap_len
f32 i64 u32 u32 u32 u32
1.0 200112201230 34537 null 8013 1
1.0 200206051100 42550 8013 7493 1
1.0 200211081330 50043 7493 25384 1
1.0 200404200930 75427 25384 1 8
1.0 200404201300 75434 1 46940 46941
1.0 200612241100 122374 46940 1 4
1.0 200612241230 122377 1 null null
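
Reading gap_len: when both before != 1 and after != 1 the row is an isolated gap of length 1; when a row opens a run, its length is the next retained row number minus its own, plus 1 (e.g. 75434 - 75427 + 1 = 8 half-hours). The inflated values on rows that merely close a run (such as 46941) are meaningless and are dropped by the final filter on before != 1 below.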
gaps = df.select(
        [pl.col("TA_F_QC"), "TIMESTAMP_START"]
    ).with_column(
        pl.first().cumcount().alias("row_num")
    ).filter(
        pl.col("TA_F_QC") != 0
    ).with_columns([
        (pl.col("row_num") - pl.col("row_num").shift() ).alias("before"),
        (pl.col("row_num").shift(-1) - pl.col("row_num")).alias("after"),
    ]).filter(
        (pl.col("before") != 1) | (pl.col("after") != 1)
    ).with_column(
        (pl.when((pl.col("before") != 1) & (pl.col("after") != 1))
        .then(1)
        .otherwise(pl.col("row_num").shift(-1) - pl.col("row_num") + 1)
        .alias("gap_len"))
    ).filter(
        pl.col("before") != 1
    ).select(
        ["TIMESTAMP_START", "gap_len"]
    ).collect()
gaps
shape: (25, 2)
TIMESTAMP_START gap_len
i64 u32
200007261130 1
200007271230 1
200008021000 1
200008161200 1
200008231030 1
200009201200 1
200012040100 1
200107121630 1
200108190930 1
200110231230 1
200112201230 1
200206051100 1
200211081330 1
200404200930 8
200612241100 15
200904021230 1
200912281130 341
201003161130 1490
201007010000 22
201104211030 1
201105051030 1
201108111030 1008
201206042230 4
201209250930 47
201210090930 3
gaps.select(pl.col("gap_len").sum())
shape: (1, 1)
gap_len
u32
2954
df.select(
        [pl.col("TA_F_QC"), "TIMESTAMP_START"]
    ).with_column(
        pl.first().cumcount().alias("row_num")
    ).filter(
        pl.col("TA_F_QC") != 0
    ).collect().shape
(2954, 3)

It works: the summed gap lengths (2954) exactly match the number of flagged rows.

def find_gap(df, col_name):
    "Find the start and end timestamp of each gap in one QC column"
    return df.select(
        [col_name, pl.col("TIMESTAMP_END").alias("gap_start")]
    ).with_column(
        pl.first().cumcount().alias("row_num")
    ).filter(
        pl.col(col_name) != 0
    ).with_columns([
        (pl.col("row_num") - pl.col("row_num").shift() ).alias("before"),
        (pl.col("row_num").shift(-1) - pl.col("row_num")).alias("after"),
    ]).filter(
        (pl.col("before") != 1) | (pl.col("after") != 1)
    ).with_column(
        (pl.when((pl.col("before") != 1) & (pl.col("after") != 1))
        .then(pl.col("gap_start"))
        .otherwise(pl.col("gap_start").shift(-1))
        .alias("gap_end"))
    ).filter(
        pl.col("before") != 1
    ).select(
        ["gap_start", "gap_end", pl.lit(col_name).alias("variable")]
    )

source

find_gap

 find_gap (df, col_name)
df_test = pl.DataFrame({'var': [0,0,0,0,0,1,1,1,1,0,0,1,1,0], 'TIMESTAMP_END': df.head(14).collect()['TIMESTAMP_END']})
find_gap(df_test, 'var').with_column( (pl.col("gap_end") - pl.col("gap_start")).alias("gap_len"))
shape: (2, 4)
gap_start gap_end variable gap_len
i64 i64 str i64
200001010300 200001010430 "var" 130
200001010600 200001010630 "var" 30
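
Note that gap_len here is a raw difference of the integer HHMM-encoded timestamps, not a row count: 200001010430 - 200001010300 = 130 stands for a 1 h 30 min gap, and 30 for a single half-hour row.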
def scan_fluxnet_csv(f, convert_dates=False):
    
    # col names may be different between the stations, so read them from the csv before parsing the whole file
    col_names = pl.read_csv(f, n_rows=1).columns
        
    types = {
        **{
            # pl.UInt8 would be enough for a QC flag, but some QC columns contain floats in the csv ...
            col_name: pl.Float32 if col_name.endswith("_QC") else pl.Float64 for col_name in col_names
        },
        "TIMESTAMP_START": pl.Int64, # keep as int for now; convert to dates at the end
        "TIMESTAMP_END": pl.Int64
    }
    
    df = pl.scan_csv(
        f, null_values=["-9999", "-9999.99"], dtypes=types
        ).rename({
            "TIMESTAMP_START": "start",
            "TIMESTAMP_END": "end", 
        })
    
    if convert_dates:
        df = df.with_columns([
            pl.col("start").cast(pl.Utf8).str.strptime(pl.Datetime, "%Y%m%d%H%M"),
            pl.col("end").cast(pl.Utf8).str.strptime(pl.Datetime, "%Y%m%d%H%M"),
        ])
        
    return df

source

scan_fluxnet_csv

 scan_fluxnet_csv (f, convert_dates=False)
scan_fluxnet_csv(test_file).head().collect()
shape: (5, 238)
start end TA_F_MDS TA_F_MDS_QC TA_ERA TA_F TA_F_QC SW_IN_POT SW_IN_F_MDS SW_IN_F_MDS_QC SW_IN_ERA SW_IN_F SW_IN_F_QC LW_IN_F_MDS LW_IN_F_MDS_QC LW_IN_ERA LW_IN_F LW_IN_F_QC LW_IN_JSB LW_IN_JSB_QC LW_IN_JSB_ERA LW_IN_JSB_F LW_IN_JSB_F_QC VPD_F_MDS VPD_F_MDS_QC VPD_ERA VPD_F VPD_F_QC PA PA_ERA PA_F PA_F_QC P P_ERA P_F P_F_QC WS ... RECO_DT_VUT_75 RECO_DT_VUT_84 RECO_DT_VUT_95 RECO_DT_CUT_REF RECO_DT_CUT_USTAR50 RECO_DT_CUT_MEAN RECO_DT_CUT_SE RECO_DT_CUT_05 RECO_DT_CUT_16 RECO_DT_CUT_25 RECO_DT_CUT_50 RECO_DT_CUT_75 RECO_DT_CUT_84 RECO_DT_CUT_95 GPP_DT_VUT_REF GPP_DT_VUT_USTAR50 GPP_DT_VUT_MEAN GPP_DT_VUT_SE GPP_DT_VUT_05 GPP_DT_VUT_16 GPP_DT_VUT_25 GPP_DT_VUT_50 GPP_DT_VUT_75 GPP_DT_VUT_84 GPP_DT_VUT_95 GPP_DT_CUT_REF GPP_DT_CUT_USTAR50 GPP_DT_CUT_MEAN GPP_DT_CUT_SE GPP_DT_CUT_05 GPP_DT_CUT_16 GPP_DT_CUT_25 GPP_DT_CUT_50 GPP_DT_CUT_75 GPP_DT_CUT_84 GPP_DT_CUT_95 RECO_SR
i64 i64 f64 f32 f64 f64 f32 f64 f64 f32 f64 f64 f32 f64 f32 f64 f64 f32 f64 f32 f64 f64 f32 f64 f32 f64 f64 f32 f64 f64 f64 f32 f64 f64 f64 f32 f64 ... f64 f64 f64 f64 f64 f64 f64 f64 f64 f64 f64 f64 f64 f64 f64 f64 f64 f64 f64 f64 f64 f64 f64 f64 f64 f64 f64 f64 f64 f64 f64 f64 f64 f64 f64 f64 f64
200001010000 200001010030 -0.6 0.0 -0.349 -0.6 0.0 0.0 0.0 0.0 0.0 0.0 0.0 null null 302.475 302.475 2.0 270.92 0.0 307.452 270.92 0.0 0.222 0.0 0.403 0.222 0.0 96.63 96.671 96.63 0.0 0.0 0.011 0.0 0.0 2.05 ... 1.04549 1.04774 1.04848 1.02086 1.02913 1.02528 0.006221 0.938402 0.965572 1.0197 1.03409 1.04549 1.04669 1.04848 0.0 0.0 0.0 0.0 0.0 0.0 0.0 0.0 0.0 0.0 0.0 0.0 0.0 0.0 0.0 0.0 0.0 0.0 0.0 0.0 0.0 0.0 1.23379
200001010030 200001010100 -0.65 0.0 -0.39 -0.65 0.0 0.0 0.0 0.0 0.0 0.0 0.0 null null 302.475 302.475 2.0 271.265 0.0 307.452 271.265 0.0 0.122 0.0 0.399 0.122 0.0 96.58 96.676 96.58 0.0 0.0 0.011 0.0 0.0 2.53 ... 1.04422 1.04647 1.0472 1.01962 1.02788 1.02403 0.006213 0.937264 0.964401 1.01847 1.03283 1.04422 1.04542 1.0472 0.0 0.0 0.0 0.0 0.0 0.0 0.0 0.0 0.0 0.0 0.0 0.0 0.0 0.0 0.0 0.0 0.0 0.0 0.0 0.0 0.0 0.0 1.23238
200001010100 200001010130 -0.58 0.0 -0.43 -0.58 0.0 0.0 0.0 0.0 0.0 0.0 0.0 null null 301.677 301.677 2.0 271.958 0.0 306.541 271.958 0.0 0.09 0.0 0.396 0.09 0.0 96.56 96.682 96.56 0.0 0.0 0.0 0.0 0.0 3.15 ... 1.046 1.04825 1.04898 1.02135 1.02963 1.02577 0.006224 0.938856 0.96604 1.0202 1.03459 1.046 1.0472 1.04898 0.0 0.0 0.0 0.0 0.0 0.0 0.0 0.0 0.0 0.0 0.0 0.0 0.0 0.0 0.0 0.0 0.0 0.0 0.0 0.0 0.0 0.0 1.23238
200001010130 200001010200 -0.51 0.0 -0.439 -0.51 0.0 0.0 0.0 0.0 0.0 0.0 0.0 null null 301.677 301.677 2.0 272.292 0.0 306.541 272.292 0.0 0.11 0.0 0.411 0.11 0.0 96.56 96.673 96.56 0.0 0.0 0.0 0.0 0.0 3.12 ... 1.04777 1.05003 1.05076 1.02308 1.03137 1.02751 0.006234 0.940447 0.967676 1.02193 1.03634 1.04777 1.04897 1.05076 0.0 0.0 0.0 0.0 0.0 0.0 0.0 0.0 0.0 0.0 0.0 0.0 0.0 0.0 0.0 0.0 0.0 0.0 0.0 0.0 0.0 0.0 1.23521
200001010200 200001010230 -0.49 0.0 -0.449 -0.49 0.0 0.0 0.0 0.0 0.0 0.0 0.0 null null 301.677 301.677 2.0 272.481 0.0 306.541 272.481 0.0 0.102 0.0 0.426 0.102 0.0 96.57 96.665 96.57 0.0 0.0 0.0 0.0 0.0 3.04 ... 1.04827 1.05054 1.05127 1.02357 1.03187 1.02801 0.006237 0.940901 0.968143 1.02242 1.03684 1.04827 1.04948 1.05127 0.0 0.0 0.0 0.0 0.0 0.0 0.0 0.0 0.0 0.0 0.0 0.0 0.0 0.0 0.0 0.0 0.0 0.0 0.0 0.0 0.0 0.0 1.23379
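The convert_dates branch can be sanity-checked the same way (a sketch, not run in the original):

scan_fluxnet_csv(test_file, convert_dates=True).select(["start", "end"]).head().collect()  # start/end parsed as pl.Datetime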
def get_site_info(df):
    "Return the first and last timestamp of the site record"
    return df.select([
        pl.col("start").first().alias("site_start"),
        pl.col("end").last().alias("site_end")
    ]).collect()

source

get_site_info

 get_site_info (df)
def _get_site_url(url):
    "Extract the site ID (e.g. AU-Cpr) from a url or file name"
    return re.search(r"[A-Z]{2}-[A-Za-z0-9]{3}", url).group()

def find_gaps_fluxnet_archive(path_zip, # zip file path of a FLUXNET archive
                  out_dir,
                  tmp_dir,
                  delete_file = True
                         ):
    try:
        fname = path_zip.stem.replace("FULLSET", "FULLSET_HH")
        out_name = out_dir / f"GAPS_stat_{fname}.parquet"
        f = zipfile.ZipFile(path_zip).extract(fname + ".csv", path=tmp_dir)
    except KeyError:
        fname = path_zip.stem.replace("FULLSET", "FULLSET_HR") # some sites are named differently (hourly instead of half-hourly)
        out_name = out_dir / f"GAPS_stat_{fname}.parquet"
        f = zipfile.ZipFile(path_zip).extract(fname + ".csv", path=tmp_dir)

    df = scan_fluxnet_csv(f)

    gaps = find_all_gaps(df).collect()

    # site info
    site = _get_site_url(fname)
    site_info = get_site_info(df)

    gaps = gaps.with_columns([
        pl.lit(site_info[0, "site_start"]).alias("site_start"),
        pl.lit(site_info[0, "site_end"]).alias("site_end"),
        pl.lit(site).alias("site"),
    ])

    # convert dates to the correct type
    gaps = gaps.with_columns([
        pl.col("gap_start").cast(pl.Utf8).str.strptime(pl.Datetime, "%Y%m%d%H%M"),
        pl.col("gap_end").cast(pl.Utf8).str.strptime(pl.Datetime, "%Y%m%d%H%M"),
    ])

    gaps.write_parquet(out_name)

    if delete_file: Path(f).unlink()

    return fname, site_info

source

find_gaps_fluxnet_archive

 find_gaps_fluxnet_archive (path_zip, out_dir, tmp_dir, delete_file=True)
             Type   Default   Details
path_zip                      zip file path of a FLUXNET archive
out_dir                       directory where the gap parquet is written
tmp_dir                       directory where the csv is extracted
delete_file  bool   True      whether to delete the extracted csv afterwards
def find_all_gaps(df):
    "Concatenate the gaps found in every QC column into one long frame"
    return pl.concat(
        [find_gap(df, col_name) for col_name in df.select(pl.col("^.*_QC$")).columns]
    )

source

find_all_gaps

 find_all_gaps (df)
gaps_all = find_all_gaps(df).collect()
gaps_all
shape: (388218, 3)
gap_start gap_end variable
i64 i64 str
200007261200 200007261200 "TA_F_MDS_QC"
200007271300 200007271300 "TA_F_MDS_QC"
200008021030 200008021030 "TA_F_MDS_QC"
200008161230 200008161230 "TA_F_MDS_QC"
200008231100 200008231100 "TA_F_MDS_QC"
200009201230 200009201230 "TA_F_MDS_QC"
200012040130 200012040130 "TA_F_MDS_QC"
200107121700 200107121700 "TA_F_MDS_QC"
200108191000 200108191000 "TA_F_MDS_QC"
200110231300 200110231300 "TA_F_MDS_QC"
200112201300 200112201300 "TA_F_MDS_QC"
200206051130 200206051130 "TA_F_MDS_QC"
... ... ...
201212292200 201212292300 "NEE_VUT_95_QC"
201212300030 201212300330 "NEE_VUT_95_QC"
201212300800 201212300900 "NEE_VUT_95_QC"
201212301000 201212301200 "NEE_VUT_95_QC"
201212301430 201212301700 "NEE_VUT_95_QC"
201212301800 201212301900 "NEE_VUT_95_QC"
201212302230 201212302300 "NEE_VUT_95_QC"
201212310130 201212310200 "NEE_VUT_95_QC"
201212310300 201212310330 "NEE_VUT_95_QC"
201212310930 201212311030 "NEE_VUT_95_QC"
201212311500 201212311530 "NEE_VUT_95_QC"
201212311630 201212311700 "NEE_VUT_95_QC"
find_gaps_fluxnet_archive(test_file_zip, out_dir, tmp_dir)
NotFoundError: TIMESTAMP_END
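
The error is expected at this point: scan_fluxnet_csv renames TIMESTAMP_START/TIMESTAMP_END to start/end, but find_gap still selects TIMESTAMP_END, so the column cannot be found on the renamed frame. The _find_gap_df rewrite further down works on the renamed start column instead.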

Check that the gap statistics load correctly from disk (this parquet was written by an earlier variant of find_gap that stored the gap as TIMESTAMP_END plus gap_len).

pl.read_parquet(out_dir / "GAPS_stat_FLX_DE-Hai_FLUXNET2015_FULLSET_HH_2000-2012_1-4.parquet")
shape: (388218, 3)
TIMESTAMP_END gap_len variable
i64 u32 str
200007261200 1 "TA_F_MDS_QC"
200007271300 1 "TA_F_MDS_QC"
200008021030 1 "TA_F_MDS_QC"
200008161230 1 "TA_F_MDS_QC"
200008231100 1 "TA_F_MDS_QC"
200009201230 1 "TA_F_MDS_QC"
200012040130 1 "TA_F_MDS_QC"
200107121700 1 "TA_F_MDS_QC"
200108191000 1 "TA_F_MDS_QC"
200110231300 1 "TA_F_MDS_QC"
200112201300 1 "TA_F_MDS_QC"
200206051130 1 "TA_F_MDS_QC"
... ... ...
201212292200 3 "NEE_VUT_95_QC"
201212300030 7 "NEE_VUT_95_QC"
201212300800 3 "NEE_VUT_95_QC"
201212301000 5 "NEE_VUT_95_QC"
201212301430 6 "NEE_VUT_95_QC"
201212301800 3 "NEE_VUT_95_QC"
201212302230 2 "NEE_VUT_95_QC"
201212310130 2 "NEE_VUT_95_QC"
201212310300 2 "NEE_VUT_95_QC"
201212310930 3 "NEE_VUT_95_QC"
201212311500 2 "NEE_VUT_95_QC"
201212311630 2 "NEE_VUT_95_QC"
url = "https://ftp.fluxdata.org/.fluxnet_downloads_86523/FLUXNET2015/FLX_AU-Cpr_FLUXNET2015_FULLSET_2010-2014_2-4.zip?=mone27"
re.search(r"[A-Z]{2}-[A-z]{3}", url).group()
'AU-Cpr'
re.search(r"([^/]*)\?", url).group()[:-1]
'FLX_AU-Cpr_FLUXNET2015_FULLSET_2010-2014_2-4.zip'
re.search(r"([^/]*)\?", url).group()[:-1]
'FLX_AU-Cpr_FLUXNET2015_FULLSET_2010-2014_2-4.zip'
requests.head(url).headers['Content-Length']
'48304433'
def download_fluxnet(url, download_dir):

    # the file name is the part of the url before the query string
    file_name = download_dir / re.search(r"([^/]*)\?", url).group()[:-1]

    if file_name.exists(): return file_name

    r = requests.get(url, allow_redirects=True, stream=True)
    total = int(r.headers['Content-Length'])
    with open(file_name, 'wb') as file:
        with tqdm(total=total, unit_divisor=1024, unit_scale=True, unit='B') as pbar:
            for chunk in r.iter_content(chunk_size=1024):
                if chunk:
                    file.write(chunk)
                    pbar.set_postfix(site=file_name.name[:10], refresh=False)
                    pbar.update(len(chunk)) # the last chunk can be shorter than 1024 B

    return file_name

source

download_fluxnet

 download_fluxnet (url, download_dir)
download_fluxnet("https://ftp.fluxdata.org/.fluxnet_downloads_86523/FLUXNET2015/FLX_AU-Cpr_FLUXNET2015_FULLSET_2010-2014_2-4.zip?=mone27", download_dir)
FileNotFoundError: [Errno 2] No such file or directory: '/run/media/simone/Simone DATI/fluxnet_all/FLX_AU-Cpr_FLUXNET2015_FULLSET_2010-2014_2-4.zip'
Path("/home/simone/Downloads/fluxnet_all").exists()

Find gaps

def _find_gap_df(df, col_name):
    "Find gaps with a df with a single QC column"
    return df.filter(
        pl.col(col_name) != 0
    ).with_columns([
        (pl.col("row_num") - pl.col("row_num").shift() ).alias("before"),
        (pl.col("row_num").shift(-1) - pl.col("row_num")).alias("after"),
    ]).filter(
        (pl.col("before") != 1) | (pl.col("after") != 1)
    ).with_column(
        (pl.when((pl.col("before") != 1) & (pl.col("after") != 1))
        .then(pl.col("start"))
        .otherwise(pl.col("start").shift(-1))
        .alias("gap_end"))
    ).filter(
        pl.col("before") != 1
    ).select(
        [pl.col("start").alias("gap_start"), "gap_end"]
    )
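
Note that _find_gap_df expects a row_num column to already be present: as the comment in find_gap_variable below says, the numbering must be computed on the unfiltered frame, so it is added before splitting off the null and non-null rows.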
def find_gap_variable(df, col_name):
    
    # row numbering has to happen before filtering
    df = df.with_column(
        pl.first().cumcount().alias("row_num")
    )
    
    # start with null values
    dff = df.filter(
            pl.col(col_name).is_null()
        )
    gaps = [
        _find_gap_df(dff, col_name).with_columns(
            pl.lit(None).alias("gap_value")
        )]
    
    
    # all other values
    # here the QC flags are merged together, as we are not interested in which QC algorithm was used, only whether there is a gap
    dff = df.filter(
            ~pl.col(col_name).is_null()
        )
    gaps.append(
        _find_gap_df(dff, col_name).with_columns(
            pl.lit(1).alias("gap_value")
        ))
    
    return pl.concat(gaps)
find_gap_variable(df, "SW_IN_F_QC").collect().head()
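Note: these two cells have no recorded output. _find_gap_df references the renamed start column, so df is assumed here to come from scan_fluxnet_csv; likewise the gap_len aggregation in the next cell assumes the earlier find_gap variant that still produced a gap_len column.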
gaps_all.groupby("variable").agg(pl.col("gap_len").sum() / df.collect().shape[0])
def download_and_find_gaps(urls, download_dir, out_dir, tmp_dir):
    site_infos = []
    for url in tqdm(urls):
        file_zip = download_fluxnet(url, download_dir)
        file, site_info = find_gaps_fluxnet_archive(file_zip, out_dir, tmp_dir)
        site_infos.append(site_info)
        print(file)
        
    return pl.concat(site_infos)

source

download_and_find_gaps

 download_and_find_gaps (urls, download_dir, out_dir, tmp_dir)
urls = ["https://ftp.fluxdata.org/.fluxnet_downloads_86523/FLUXNET2015/FLX_AR-SLu_FLUXNET2015_FULLSET_2009-2011_1-4.zip?=mone27",
"https://ftp.fluxdata.org/.fluxnet_downloads_86523/FLUXNET2015/FLX_AR-Vir_FLUXNET2015_FULLSET_2009-2012_1-4.zip?=mone27"]
download_and_find_gaps(urls, download_dir, out_dir, tmp_dir)