test_file_zip = here() / Path("../fluxnet/FLX_DE-Hai_FLUXNET2015_FULLSET_2000-2012_1-4.zip")
test_file = here() / Path("../fluxnet/FLX_DE-Hai_FLUXNET2015_FULLSET_2000-2012_1-4/FLX_DE-Hai_FLUXNET2015_FULLSET_HH_2000-2012_1-4.csv")
tmp_dir = Path("/tmp")
out_dir = here() / Path("../fluxnet/gap_stat")
download_dir = Path("/run/media/simone/Simone DATI/fluxnet_all")
PosixPath('/home/simone/Documents/uni/Thesis/GPFA_imputation/../fluxnet/FLX_DE-Hai_FLUXNET2015_FULLSET_2000-2012_1-4.zip')
unzip the file and load it lazily with polars
zipf = zipfile.ZipFile(test_file_zip).extract('FLX_DE-Hai_FLUXNET2015_FULLSET_HH_2000-2012_1-4.csv', path=tmp_dir)
'/tmp/FLX_DE-Hai_FLUXNET2015_FULLSET_HH_2000-2012_1-4.csv'
# load column names
col_names = pl.read_csv(zipf, n_rows=1).columns
types = {
    **{
        # pl.UInt8 should be enough for a QC flag, but some columns are floats in the csv ...
        col_name: pl.Float32 if col_name.endswith("_QC") else pl.Float64 for col_name in col_names
    },
    "TIMESTAMP_START": pl.Int64,
    "TIMESTAMP_END": pl.Int64,
}
# load df with correct types
df = pl.scan_csv(zipf, null_values=["-9999", "-9999.99"], dtypes=types)
shape: (5, 238) — output truncated. The preview shows TIMESTAMP_START and TIMESTAMP_END as i64, every *_QC column as f32, and all other variables (TA_F_MDS, SW_IN_*, LW_IN_*, VPD_*, PA_*, P_*, WS, ..., RECO_DT_*, GPP_DT_*, RECO_SR) as f64, confirming the dtype overrides.
column selection: we are interested only in the QC columns to find the gaps
df.head().select(pl.col("^.*_QC$")).collect().columns
['TA_F_MDS_QC',
'TA_F_QC',
'SW_IN_F_MDS_QC',
'SW_IN_F_QC',
'LW_IN_F_MDS_QC',
'LW_IN_F_QC',
'LW_IN_JSB_QC',
'LW_IN_JSB_F_QC',
'VPD_F_MDS_QC',
'VPD_F_QC',
'PA_F_QC',
'P_F_QC',
'WS_F_QC',
'CO2_F_MDS_QC',
'TS_F_MDS_1_QC',
'TS_F_MDS_2_QC',
'TS_F_MDS_3_QC',
'TS_F_MDS_4_QC',
'TS_F_MDS_5_QC',
'SWC_F_MDS_1_QC',
'SWC_F_MDS_2_QC',
'SWC_F_MDS_3_QC',
'G_F_MDS_QC',
'LE_F_MDS_QC',
'H_F_MDS_QC',
'NEE_CUT_REF_QC',
'NEE_VUT_REF_QC',
'NEE_CUT_USTAR50_QC',
'NEE_VUT_USTAR50_QC',
'NEE_CUT_MEAN_QC',
'NEE_VUT_MEAN_QC',
'NEE_CUT_05_QC',
'NEE_CUT_16_QC',
'NEE_CUT_25_QC',
'NEE_CUT_50_QC',
'NEE_CUT_75_QC',
'NEE_CUT_84_QC',
'NEE_CUT_95_QC',
'NEE_VUT_05_QC',
'NEE_VUT_16_QC',
'NEE_VUT_25_QC',
'NEE_VUT_50_QC',
'NEE_VUT_75_QC',
'NEE_VUT_84_QC',
'NEE_VUT_95_QC']
The goal is to find where data is missing in the dataset (which means it has been gap-filled) and, for each gap, get:
- the start time of the gap
- the length of the gap

we filter out the rows where there is no gap (QC = 0), then find the start and end of each gap by comparing the original row number with the one of the previous entry
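Before applying this to the real data, here is a minimal sketch on invented QC values (not in the original, using the same polars API as the rest of the notebook): rows are numbered before filtering, so after dropping the QC = 0 rows a difference larger than 1 between consecutive row numbers marks a gap boundary.
toy = pl.DataFrame({"qc": [0, 0, 1, 1, 1, 0, 1, 0]}).lazy()
toy.with_column(
    pl.first().cumcount().alias("row_num")
).filter(
    pl.col("qc") != 0
).with_column(
    (pl.col("row_num") - pl.col("row_num").shift()).alias("before")
).collect()
# rows 2-4 form one contiguous gap (before == 1 inside it); row 6 starts a new gap (before == 2)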
df.select(
    [pl.col("TA_F_QC"), "TIMESTAMP_START"]
).with_column(
    pl.first().cumcount().alias("row_num")
).filter(
    pl.col("TA_F_QC") != 0
).slice(10, 10).collect()
shape: (10, 3)
┌─────────┬─────────────────┬─────────┐
│ TA_F_QC ┆ TIMESTAMP_START ┆ row_num │
│ ---     ┆ ---             ┆ ---     │
│ f32     ┆ i64             ┆ u32     │
╞═════════╪═════════════════╪═════════╡
│ 1.0     ┆ 200112201230    ┆ 34537   │
│ 1.0     ┆ 200206051100    ┆ 42550   │
│ 1.0     ┆ 200211081330    ┆ 50043   │
│ 1.0     ┆ 200404200930    ┆ 75427   │
│ 2.0     ┆ 200404201000    ┆ 75428   │
│ 2.0     ┆ 200404201030    ┆ 75429   │
│ 2.0     ┆ 200404201100    ┆ 75430   │
│ 2.0     ┆ 200404201130    ┆ 75431   │
│ 2.0     ┆ 200404201200    ┆ 75432   │
│ 2.0     ┆ 200404201230    ┆ 75433   │
└─────────┴─────────────────┴─────────┘
df.select(
    [pl.col("TA_F_QC"), "TIMESTAMP_START"]
).with_column(
    pl.first().cumcount().alias("row_num")
).filter(
    pl.col("TA_F_QC") != 0
).slice(10, 15).with_columns([
    (pl.col("row_num") - pl.col("row_num").shift()).alias("before"),
    (pl.col("row_num").shift(-1) - pl.col("row_num")).alias("after"),
]).collect()
shape: (15, 5)
┌─────────┬─────────────────┬─────────┬────────┬───────┐
│ TA_F_QC ┆ TIMESTAMP_START ┆ row_num ┆ before ┆ after │
│ ---     ┆ ---             ┆ ---     ┆ ---    ┆ ---   │
│ f32     ┆ i64             ┆ u32     ┆ u32    ┆ u32   │
╞═════════╪═════════════════╪═════════╪════════╪═══════╡
│ 1.0     ┆ 200112201230    ┆ 34537   ┆ null   ┆ 8013  │
│ 1.0     ┆ 200206051100    ┆ 42550   ┆ 8013   ┆ 7493  │
│ 1.0     ┆ 200211081330    ┆ 50043   ┆ 7493   ┆ 25384 │
│ 1.0     ┆ 200404200930    ┆ 75427   ┆ 25384  ┆ 1     │
│ 2.0     ┆ 200404201000    ┆ 75428   ┆ 1      ┆ 1     │
│ 2.0     ┆ 200404201030    ┆ 75429   ┆ 1      ┆ 1     │
│ 2.0     ┆ 200404201100    ┆ 75430   ┆ 1      ┆ 1     │
│ 2.0     ┆ 200404201130    ┆ 75431   ┆ 1      ┆ 1     │
│ 2.0     ┆ 200404201200    ┆ 75432   ┆ 1      ┆ 1     │
│ 2.0     ┆ 200404201230    ┆ 75433   ┆ 1      ┆ 1     │
│ 1.0     ┆ 200404201300    ┆ 75434   ┆ 1      ┆ 46940 │
│ 1.0     ┆ 200612241100    ┆ 122374  ┆ 46940  ┆ 1     │
│ 1.0     ┆ 200612241130    ┆ 122375  ┆ 1      ┆ 1     │
│ 1.0     ┆ 200612241200    ┆ 122376  ┆ 1      ┆ 1     │
│ 1.0     ┆ 200612241230    ┆ 122377  ┆ 1      ┆ null  │
└─────────┴─────────────────┴─────────┴────────┴───────┘
df.select(
    [pl.col("TA_F_QC"), "TIMESTAMP_START"]
).with_column(
    pl.first().cumcount().alias("row_num")
).filter(
    pl.col("TA_F_QC") != 0
).slice(10, 15).with_columns([
    (pl.col("row_num") - pl.col("row_num").shift()).alias("before"),
    (pl.col("row_num").shift(-1) - pl.col("row_num")).alias("after"),
]).filter(
    (pl.col("before") != 1) | (pl.col("after") != 1)
).with_column(
    pl.when((pl.col("before") != 1) & (pl.col("after") != 1))
    .then(1)
    .otherwise(pl.col("row_num").shift(-1) - pl.col("row_num") + 1)
    .alias("gap_len")
).collect()
shape: (7, 6)
┌─────────┬─────────────────┬─────────┬────────┬───────┬─────────┐
│ TA_F_QC ┆ TIMESTAMP_START ┆ row_num ┆ before ┆ after ┆ gap_len │
│ ---     ┆ ---             ┆ ---     ┆ ---    ┆ ---   ┆ ---     │
│ f32     ┆ i64             ┆ u32     ┆ u32    ┆ u32   ┆ u32     │
╞═════════╪═════════════════╪═════════╪════════╪═══════╪═════════╡
│ 1.0     ┆ 200112201230    ┆ 34537   ┆ null   ┆ 8013  ┆ 1       │
│ 1.0     ┆ 200206051100    ┆ 42550   ┆ 8013   ┆ 7493  ┆ 1       │
│ 1.0     ┆ 200211081330    ┆ 50043   ┆ 7493   ┆ 25384 ┆ 1       │
│ 1.0     ┆ 200404200930    ┆ 75427   ┆ 25384  ┆ 1     ┆ 8       │
│ 1.0     ┆ 200404201300    ┆ 75434   ┆ 1      ┆ 46940 ┆ 46941   │
│ 1.0     ┆ 200612241100    ┆ 122374  ┆ 46940  ┆ 1     ┆ 4       │
│ 1.0     ┆ 200612241230    ┆ 122377  ┆ 1      ┆ null  ┆ null    │
└─────────┴─────────────────┴─────────┴────────┴───────┴─────────┘
gaps = df.select(
    [pl.col("TA_F_QC"), "TIMESTAMP_START"]
).with_column(
    pl.first().cumcount().alias("row_num")
).filter(
    pl.col("TA_F_QC") != 0
).with_columns([
    (pl.col("row_num") - pl.col("row_num").shift()).alias("before"),
    (pl.col("row_num").shift(-1) - pl.col("row_num")).alias("after"),
]).filter(
    (pl.col("before") != 1) | (pl.col("after") != 1)
).with_column(
    pl.when((pl.col("before") != 1) & (pl.col("after") != 1))
    .then(1)
    .otherwise(pl.col("row_num").shift(-1) - pl.col("row_num") + 1)
    .alias("gap_len")
).filter(
    pl.col("before") != 1
).select(
    ["TIMESTAMP_START", "gap_len"]
).collect()
shape: (25, 2)
┌─────────────────┬─────────┐
│ TIMESTAMP_START ┆ gap_len │
│ ---             ┆ ---     │
│ i64             ┆ u32     │
╞═════════════════╪═════════╡
│ 200007261130    ┆ 1       │
│ 200007271230    ┆ 1       │
│ 200008021000    ┆ 1       │
│ 200008161200    ┆ 1       │
│ 200008231030    ┆ 1       │
│ 200009201200    ┆ 1       │
│ 200012040100    ┆ 1       │
│ 200107121630    ┆ 1       │
│ 200108190930    ┆ 1       │
│ 200110231230    ┆ 1       │
│ 200112201230    ┆ 1       │
│ 200206051100    ┆ 1       │
│ 200211081330    ┆ 1       │
│ 200404200930    ┆ 8       │
│ 200612241100    ┆ 15      │
│ 200904021230    ┆ 1       │
│ 200912281130    ┆ 341     │
│ 201003161130    ┆ 1490    │
│ 201007010000    ┆ 22      │
│ 201104211030    ┆ 1       │
│ 201105051030    ┆ 1       │
│ 201108111030    ┆ 1008    │
│ 201206042230    ┆ 4       │
│ 201209250930    ┆ 47      │
│ 201210090930    ┆ 3       │
└─────────────────┴─────────┘
gaps.select(pl.col("gap_len").sum())
df.select(
    [pl.col("TA_F_QC"), "TIMESTAMP_START"]
).with_column(
    pl.first().cumcount().alias("row_num")
).filter(
    pl.col("TA_F_QC") != 0
).collect().shape
it works!
def find_gap(df, col_name):
    return df.select(
        [col_name, pl.col("TIMESTAMP_END").alias("gap_start")]
    ).with_column(
        pl.first().cumcount().alias("row_num")
    ).filter(
        pl.col(col_name) != 0
    ).with_columns([
        (pl.col("row_num") - pl.col("row_num").shift()).alias("before"),
        (pl.col("row_num").shift(-1) - pl.col("row_num")).alias("after"),
    ]).filter(
        (pl.col("before") != 1) | (pl.col("after") != 1)
    ).with_column(
        pl.when((pl.col("before") != 1) & (pl.col("after") != 1))
        .then(pl.col("gap_start"))                 # single-row gap: it ends where it starts
        .otherwise(pl.col("gap_start").shift(-1))  # multi-row gap: the end comes from the next boundary row
        .alias("gap_end")
    ).filter(
        pl.col("before") != 1
    ).select(
        ["gap_start", "gap_end", pl.lit(col_name).alias("variable")]
    )
df_test = pl.DataFrame({'var': [0, 0, 0, 0, 0, 1, 1, 1, 1, 0, 0, 1, 1, 0], 'TIMESTAMP_END': df.head(14).collect()['TIMESTAMP_END']})
find_gap(df_test, 'var').with_column((pl.col("gap_end") - pl.col("gap_start")).alias("gap_len"))
shape: (2, 4)
┌──────────────┬──────────────┬──────────┬─────────┐
│ gap_start    ┆ gap_end      ┆ variable ┆ gap_len │
│ ---          ┆ ---          ┆ ---      ┆ ---     │
│ i64          ┆ i64          ┆ str      ┆ i64     │
╞══════════════╪══════════════╪══════════╪═════════╡
│ 200001010300 ┆ 200001010430 ┆ "var"    ┆ 130     │
│ 200001010600 ┆ 200001010630 ┆ "var"    ┆ 30      │
└──────────────┴──────────────┴──────────┴─────────┘
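note that gap_len here is a raw difference of YYYYMMDDHHMM integers (130 means 03:00 → 04:30), not a real duration. For a true duration one could parse the timestamps first; a minimal sketch (not in the original), reusing the %Y%m%d%H%M format that scan_fluxnet_csv below also uses:
find_gap(df_test, 'var').with_columns([
    pl.col("gap_start").cast(pl.Utf8).str.strptime(pl.Datetime, "%Y%m%d%H%M"),
    pl.col("gap_end").cast(pl.Utf8).str.strptime(pl.Datetime, "%Y%m%d%H%M"),
]).with_column(
    (pl.col("gap_end") - pl.col("gap_start")).alias("gap_len")  # a Duration column: 1h 30m and 30m
)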
def scan_fluxnet_csv(f, convert_dates=False):
    # col names may differ between stations, so read them from the csv before parsing the whole file
    col_names = pl.read_csv(f, n_rows=1).columns
    types = {
        **{
            # pl.UInt8 should be enough for a QC flag, but some columns are floats in the csv ...
            col_name: pl.Float32 if col_name.endswith("_QC") else pl.Float64 for col_name in col_names
        },
        "TIMESTAMP_START": pl.Int64,  # for now keep as int, convert to dates at the end
        "TIMESTAMP_END": pl.Int64,
    }
    df = pl.scan_csv(
        f, null_values=["-9999", "-9999.99"], dtypes=types
    ).rename({
        "TIMESTAMP_START": "start",
        "TIMESTAMP_END": "end",
    })
    if convert_dates:
        df = df.with_columns([
            pl.col("start").cast(pl.Utf8).str.strptime(pl.Datetime, "%Y%m%d%H%M"),
            pl.col("end").cast(pl.Utf8).str.strptime(pl.Datetime, "%Y%m%d%H%M"),
        ])
    return df
scan_fluxnet_csv(test_file).head().collect()
shape: (5, 238) — output truncated; same preview as above, with TIMESTAMP_START and TIMESTAMP_END now renamed to start and end.
def get_site_info(df):
    return df.select([
        pl.col("start").first().alias("site_start"),
        pl.col("end").last().alias("site_end"),
    ]).collect()
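A quick usage sketch (not in the original; output omitted), reading the first and last timestamps of the DE-Hai test file:
get_site_info(scan_fluxnet_csv(test_file))  # one row with site_start and site_end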
def _get_site_url(url): return re.search(r"[A-Z]{2}-[A-Za-z0-9]{3}", url).group()
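A quick check of the site-code regex (example call, not in the original): the code is two uppercase letters, a dash, and three alphanumerics.
_get_site_url("FLX_DE-Hai_FLUXNET2015_FULLSET_HH_2000-2012_1-4")  # -> 'DE-Hai'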
def find_gaps_fluxnet_archive(path_zip,  # path to a FLUXNET zip archive
                              out_dir,
                              tmp_dir,
                              delete_file=True
                              ):
    try:
        fname = path_zip.stem.replace("FULLSET", "FULLSET_HH")
        out_name = out_dir / f"GAPS_stat_{fname}.parquet"
        f = zipfile.ZipFile(path_zip).extract(fname + ".csv", path=tmp_dir)
    except KeyError:
        fname = path_zip.stem.replace("FULLSET", "FULLSET_HR")  # some sites are named differently
        out_name = out_dir / f"GAPS_stat_{fname}.parquet"
        f = zipfile.ZipFile(path_zip).extract(fname + ".csv", path=tmp_dir)
    df = scan_fluxnet_csv(f)
    gaps = find_all_gaps(df).collect()
    # site info
    site = _get_site_url(fname)
    site_info = get_site_info(df)
    gaps = gaps.with_columns([
        pl.lit(site_info[0, "site_start"]).alias("site_start"),
        pl.lit(site_info[0, "site_end"]).alias("site_end"),
        pl.lit(site).alias("site"),
    ])
    # convert dates to the correct type
    gaps = gaps.with_columns([
        pl.col("gap_start").cast(pl.Utf8).str.strptime(pl.Datetime, "%Y%m%d%H%M"),
        pl.col("gap_end").cast(pl.Utf8).str.strptime(pl.Datetime, "%Y%m%d%H%M"),
    ])
    gaps.write_parquet(out_name)
    if delete_file: Path(f).unlink()
    return fname, site_info
def find_all_gaps(df):
    return pl.concat(
        [find_gap(df, col_name) for col_name in df.select(pl.col("^.*_QC$")).columns]
    )
gaps_all = find_all_gaps(df).collect()
shape: (388218, 3)
┌──────────────┬──────────────┬─────────────────┐
│ gap_start    ┆ gap_end      ┆ variable        │
│ ---          ┆ ---          ┆ ---             │
│ i64          ┆ i64          ┆ str             │
╞══════════════╪══════════════╪═════════════════╡
│ 200007261200 ┆ 200007261200 ┆ "TA_F_MDS_QC"   │
│ 200007271300 ┆ 200007271300 ┆ "TA_F_MDS_QC"   │
│ 200008021030 ┆ 200008021030 ┆ "TA_F_MDS_QC"   │
│ 200008161230 ┆ 200008161230 ┆ "TA_F_MDS_QC"   │
│ 200008231100 ┆ 200008231100 ┆ "TA_F_MDS_QC"   │
│ 200009201230 ┆ 200009201230 ┆ "TA_F_MDS_QC"   │
│ 200012040130 ┆ 200012040130 ┆ "TA_F_MDS_QC"   │
│ 200107121700 ┆ 200107121700 ┆ "TA_F_MDS_QC"   │
│ 200108191000 ┆ 200108191000 ┆ "TA_F_MDS_QC"   │
│ 200110231300 ┆ 200110231300 ┆ "TA_F_MDS_QC"   │
│ 200112201300 ┆ 200112201300 ┆ "TA_F_MDS_QC"   │
│ 200206051130 ┆ 200206051130 ┆ "TA_F_MDS_QC"   │
│ ...          ┆ ...          ┆ ...             │
│ 201212292200 ┆ 201212292300 ┆ "NEE_VUT_95_QC" │
│ 201212300030 ┆ 201212300330 ┆ "NEE_VUT_95_QC" │
│ 201212300800 ┆ 201212300900 ┆ "NEE_VUT_95_QC" │
│ 201212301000 ┆ 201212301200 ┆ "NEE_VUT_95_QC" │
│ 201212301430 ┆ 201212301700 ┆ "NEE_VUT_95_QC" │
│ 201212301800 ┆ 201212301900 ┆ "NEE_VUT_95_QC" │
│ 201212302230 ┆ 201212302300 ┆ "NEE_VUT_95_QC" │
│ 201212310130 ┆ 201212310200 ┆ "NEE_VUT_95_QC" │
│ 201212310300 ┆ 201212310330 ┆ "NEE_VUT_95_QC" │
│ 201212310930 ┆ 201212311030 ┆ "NEE_VUT_95_QC" │
│ 201212311500 ┆ 201212311530 ┆ "NEE_VUT_95_QC" │
│ 201212311630 ┆ 201212311700 ┆ "NEE_VUT_95_QC" │
└──────────────┴──────────────┴─────────────────┘
find_gaps_fluxnet_archive(test_file_zip, out_dir, tmp_dir)
NotFoundError: TIMESTAMP_END
the error comes from find_gap, which still refers to the TIMESTAMP_END column that scan_fluxnet_csv renames to end
checking that the saved gap statistics load correctly from disk (this parquet comes from an earlier run, when find_gap still returned TIMESTAMP_END and gap_len)
pl.read_parquet(out_dir / "GAPS_stat_FLX_DE-Hai_FLUXNET2015_FULLSET_HH_2000-2012_1-4.parquet")
shape: (388218, 3)
┌───────────────┬─────────┬─────────────────┐
│ TIMESTAMP_END ┆ gap_len ┆ variable        │
│ ---           ┆ ---     ┆ ---             │
│ i64           ┆ u32     ┆ str             │
╞═══════════════╪═════════╪═════════════════╡
│ 200007261200  ┆ 1       ┆ "TA_F_MDS_QC"   │
│ 200007271300  ┆ 1       ┆ "TA_F_MDS_QC"   │
│ 200008021030  ┆ 1       ┆ "TA_F_MDS_QC"   │
│ 200008161230  ┆ 1       ┆ "TA_F_MDS_QC"   │
│ 200008231100  ┆ 1       ┆ "TA_F_MDS_QC"   │
│ 200009201230  ┆ 1       ┆ "TA_F_MDS_QC"   │
│ 200012040130  ┆ 1       ┆ "TA_F_MDS_QC"   │
│ 200107121700  ┆ 1       ┆ "TA_F_MDS_QC"   │
│ 200108191000  ┆ 1       ┆ "TA_F_MDS_QC"   │
│ 200110231300  ┆ 1       ┆ "TA_F_MDS_QC"   │
│ 200112201300  ┆ 1       ┆ "TA_F_MDS_QC"   │
│ 200206051130  ┆ 1       ┆ "TA_F_MDS_QC"   │
│ ...           ┆ ...     ┆ ...             │
│ 201212292200  ┆ 3       ┆ "NEE_VUT_95_QC" │
│ 201212300030  ┆ 7       ┆ "NEE_VUT_95_QC" │
│ 201212300800  ┆ 3       ┆ "NEE_VUT_95_QC" │
│ 201212301000  ┆ 5       ┆ "NEE_VUT_95_QC" │
│ 201212301430  ┆ 6       ┆ "NEE_VUT_95_QC" │
│ 201212301800  ┆ 3       ┆ "NEE_VUT_95_QC" │
│ 201212302230  ┆ 2       ┆ "NEE_VUT_95_QC" │
│ 201212310130  ┆ 2       ┆ "NEE_VUT_95_QC" │
│ 201212310300  ┆ 2       ┆ "NEE_VUT_95_QC" │
│ 201212310930  ┆ 3       ┆ "NEE_VUT_95_QC" │
│ 201212311500  ┆ 2       ┆ "NEE_VUT_95_QC" │
│ 201212311630  ┆ 2       ┆ "NEE_VUT_95_QC" │
└───────────────┴─────────┴─────────────────┘
url = "https://ftp.fluxdata.org/.fluxnet_downloads_86523/FLUXNET2015/FLX_AU-Cpr_FLUXNET2015_FULLSET_2010-2014_2-4.zip?=mone27"
re.search(r"[A-Z] {2} -[A-z] {3} " , url).group()
re.search(r"([^/]*)\?" , url).group()[:- 1 ]
'FLX_AU-Cpr_FLUXNET2015_FULLSET_2010-2014_2-4.zip'
re.search(r"([^/]*)\?" , url).group()[:- 1 ]
'FLX_AU-Cpr_FLUXNET2015_FULLSET_2010-2014_2-4.zip'
requests.head(url).headers['Content-Length' ]
def download_fluxnet(url, download_dir):
    file_name = download_dir / re.search(r"([^/]*)\?", url).group()[:-1]
    if file_name.exists(): return file_name
    r = requests.get(url, allow_redirects=True, stream=True)
    n_iter = int(r.headers['Content-Length'])  # total size in bytes for the progress bar
    with open(file_name, 'wb') as file:
        with tqdm(total=n_iter, unit_divisor=1024, unit_scale=True, unit='B') as pbar:
            for chunk in r.iter_content(chunk_size=1024):
                if chunk:
                    file.write(chunk)
                    pbar.set_postfix(site=file_name.name[:10], refresh=False)
                    pbar.update(1024)  # one chunk
    return file_name
download_fluxnet("https://ftp.fluxdata.org/.fluxnet_downloads_86523/FLUXNET2015/FLX_AU-Cpr_FLUXNET2015_FULLSET_2010-2014_2-4.zip?=mone27" , download_dir)
FileNotFoundError: [Errno 2] No such file or directory: '/run/media/simone/Simone DATI/fluxnet_all/FLX_AU-Cpr_FLUXNET2015_FULLSET_2010-2014_2-4.zip'
Path("/home/simone/Downloads/fluxnet_all" ).exists()
Find gaps
def _find_gap_df(df, col_name):
    "Find the gaps in a df given a single QC column"
    return df.filter(
        pl.col(col_name) != 0
    ).with_columns([
        (pl.col("row_num") - pl.col("row_num").shift()).alias("before"),
        (pl.col("row_num").shift(-1) - pl.col("row_num")).alias("after"),
    ]).filter(
        (pl.col("before") != 1) | (pl.col("after") != 1)
    ).with_column(
        pl.when((pl.col("before") != 1) & (pl.col("after") != 1))
        .then(pl.col("start"))
        .otherwise(pl.col("start").shift(-1))
        .alias("gap_end")
    ).filter(
        pl.col("before") != 1
    ).select(
        [pl.col("start").alias("gap_start"), "gap_end"]
    )

def find_gap_variable(df, col_name):
    # row numbering has to happen before filtering
    df = df.with_column(
        pl.first().cumcount().alias("row_num")
    )
    # start with the null values
    dff = df.filter(
        pl.col(col_name).is_null()
    )
    gaps = [
        _find_gap_df(dff, col_name).with_columns(
            pl.lit(None).alias("gap_value")
        )]
    # all other values
    # here the QC flags are merged together: we are not interested in which QC algorithm was used, only in whether there is a gap
    dff = df.filter(
        ~pl.col(col_name).is_null()
    )
    gaps.append(
        _find_gap_df(dff, col_name).with_columns(
            pl.lit(1).alias("gap_value")
        ))
    return pl.concat(gaps)
find_gap_variable(df, "SW_IN_F_QC").collect().head()
# fraction of gap-filled rows per variable
gaps_all.groupby("variable").agg(pl.col("gap_len").sum() / df.collect().shape[0])
def download_and_find_gaps(urls, download_dir, out_dir, tmp_dir):
    site_infos = []
    for url in tqdm(urls):
        file_zip = download_fluxnet(url, download_dir)
        file, site_info = find_gaps_fluxnet_archive(file_zip, out_dir, tmp_dir)
        site_infos.append(site_info)
        print(file)
    return pl.concat(site_infos)
urls = ["https://ftp.fluxdata.org/.fluxnet_downloads_86523/FLUXNET2015/FLX_AR-SLu_FLUXNET2015_FULLSET_2009-2011_1-4.zip?=mone27",
        "https://ftp.fluxdata.org/.fluxnet_downloads_86523/FLUXNET2015/FLX_AR-Vir_FLUXNET2015_FULLSET_2009-2012_1-4.zip?=mone27"]
download_and_find_gaps(urls, download_dir, out_dir, tmp_dir)