csv_batcher.test_csv_pooler
# Timing tests that run CSVPooler over a large CSV file, once per callback
# mode, plus a baseline that processes the same file without the pooler.
from csv_batcher.utils.time import time_and_log
from csv_batcher.csv_pooler import CSVPooler, CallbackWith
import pandas as pd


def __process_dataframe_row(row):
    """Return the first value (positional index 0) of *row*."""
    return row.iloc[0]


def __process_csv_filename(csv_chunk_filename):
    """Read the CSV at *csv_chunk_filename* and apply the row callback to each row.

    Returns whatever ``DataFrame.apply(..., axis=1)`` produces for the rows
    read from the file.
    """
    df = pd.read_csv(csv_chunk_filename, skipinitialspace=True, index_col=None)
    return df.apply(__process_dataframe_row, axis=1)


def __process_as_dataframe(df):
    """Apply the row callback to each row of an already-loaded DataFrame."""
    return df.apply(__process_dataframe_row, axis=1)


def test_big_file_as_csv():
    """Pool with the default callback mode; the callback gets a CSV filename."""
    with time_and_log("test_big_file_as_csv"):
        pooler = CSVPooler("5mSalesRecords.csv", __process_csv_filename)
        for processed_batch in pooler.process():
            assert isinstance(processed_batch, pd.Series)


def test_big_file_as_dataframe():
    """Pool with ``CallbackWith.DATAFRAME``; the callback gets a DataFrame."""
    with time_and_log("test_big_file_as_dataframe"):
        pooler = CSVPooler(
            "5mSalesRecords.csv",
            __process_as_dataframe,
            callback_with=CallbackWith.DATAFRAME,
        )
        for processed_batch in pooler.process():
            assert isinstance(processed_batch, pd.Series)


def test_big_file_as_dataframe_rows():
    """Pool with ``CallbackWith.DATAFRAME_ROW``; the callback gets single rows."""
    with time_and_log("test_big_file_as_dataframe_rows"):
        pooler = CSVPooler(
            "5mSalesRecords.csv",
            __process_dataframe_row,
            callback_with=CallbackWith.DATAFRAME_ROW,
        )
        for processed_batch in pooler.process():
            assert isinstance(processed_batch, pd.Series)


def test_no_pooler():
    """Baseline: process the whole file in a single call, without CSVPooler."""
    with time_and_log("test_no_pooler"):
        result = __process_csv_filename("5mSalesRecords.csv")
    # Checked outside the timed block so the measurement covers only the
    # processing itself; mirrors the type check made by the pooled tests.
    assert isinstance(result, pd.Series)


if __name__ == "__main__":
    test_big_file_as_csv()
    test_big_file_as_dataframe()
    test_big_file_as_dataframe_rows()
def test_big_file_as_csv():
def test_big_file_as_dataframe():
def test_big_file_as_dataframe_rows():
def test_big_file_as_dataframe_rows():
    """Run CSVPooler in ``CallbackWith.DATAFRAME_ROW`` mode over the sales CSV.

    Every result produced by ``process()`` must be a pandas Series.
    """
    with time_and_log("test_big_file_as_dataframe_rows"):
        row_pooler = CSVPooler(
            "5mSalesRecords.csv",
            __process_dataframe_row,
            callback_with=CallbackWith.DATAFRAME_ROW,
        )
        for batch_result in row_pooler.process():
            assert isinstance(batch_result, pd.Series)
def test_no_pooler():