csv_batcher.test_csv_pooler

 1from csv_batcher.utils.time import time_and_log
 2from csv_batcher.csv_pooler import CSVPooler, CallbackWith
 3import pandas as pd
 4
 5def __process_dataframe_row(row):
 6    return row.iloc[0]
 7
 8def __process_csv_filename(csv_chunk_filename):
 9    # print("processing ", csv_chunk_filename)
10    df = pd.read_csv(csv_chunk_filename, skipinitialspace=True, index_col=None)
11    return df.apply(__process_dataframe_row, axis=1)
12
def __process_as_dataframe(df):
    """Reduce every row of an already-loaded frame to its first value.

    Args:
        df: Object whose ``apply`` is called row-wise (``axis=1``); the test
            in this module registers it with ``CallbackWith.DATAFRAME``.

    Returns:
        The result of applying ``__process_dataframe_row`` to each row.
    """
    row_results = df.apply(__process_dataframe_row, axis=1)
    return row_results
15
def test_big_file_as_csv():
    """Check that the pooler yields ``pd.Series`` batches for a filename callback.

    ``CSVPooler`` is built with its default ``callback_with`` and
    ``__process_csv_filename`` as the callback (presumably the default mode
    hands the callback a chunk filename -- confirm against ``CSVPooler``).
    The run is wrapped in ``time_and_log`` under the test's own name.

    Needs ``5mSalesRecords.csv`` to be resolvable from the working directory.

    Raises:
        AssertionError: If a batch is not a ``pd.Series`` or if no batch is
            produced at all.
    """
    batch_count = 0
    with time_and_log("test_big_file_as_csv"):
        pooler = CSVPooler("5mSalesRecords.csv", __process_csv_filename)
        for processed_batch in pooler.process():
            assert isinstance(processed_batch, pd.Series)
            batch_count += 1
    # The per-batch assertion never runs when nothing is yielded, so an empty
    # result would otherwise count as a pass.
    assert batch_count > 0, "pooler.process() yielded no batches"
21
def test_big_file_as_dataframe():
    """Check that the pooler yields ``pd.Series`` batches in DATAFRAME mode.

    ``CSVPooler`` is built with ``callback_with=CallbackWith.DATAFRAME`` and
    ``__process_as_dataframe`` as the callback. The run is wrapped in
    ``time_and_log`` under the test's own name.

    Needs ``5mSalesRecords.csv`` to be resolvable from the working directory.

    Raises:
        AssertionError: If a batch is not a ``pd.Series`` or if no batch is
            produced at all.
    """
    batch_count = 0
    with time_and_log("test_big_file_as_dataframe"):
        pooler = CSVPooler(
            "5mSalesRecords.csv",
            __process_as_dataframe,
            callback_with=CallbackWith.DATAFRAME,
        )
        for processed_batch in pooler.process():
            assert isinstance(processed_batch, pd.Series)
            batch_count += 1
    # The per-batch assertion never runs when nothing is yielded, so an empty
    # result would otherwise count as a pass.
    assert batch_count > 0, "pooler.process() yielded no batches"
27
def test_big_file_as_dataframe_rows():
    """Check that the pooler yields ``pd.Series`` batches in DATAFRAME_ROW mode.

    ``CSVPooler`` is built with ``callback_with=CallbackWith.DATAFRAME_ROW``
    and the per-row callback ``__process_dataframe_row``. The run is wrapped
    in ``time_and_log`` under the test's own name.

    Needs ``5mSalesRecords.csv`` to be resolvable from the working directory.

    Raises:
        AssertionError: If a batch is not a ``pd.Series`` or if no batch is
            produced at all.
    """
    batch_count = 0
    with time_and_log("test_big_file_as_dataframe_rows"):
        pooler = CSVPooler(
            "5mSalesRecords.csv",
            __process_dataframe_row,
            callback_with=CallbackWith.DATAFRAME_ROW,
        )
        for processed_batch in pooler.process():
            assert isinstance(processed_batch, pd.Series)
            batch_count += 1
    # The per-batch assertion never runs when nothing is yielded, so an empty
    # result would otherwise count as a pass.
    assert batch_count > 0, "pooler.process() yielded no batches"
33
def test_no_pooler():
    """Process the whole CSV in one direct call, without ``CSVPooler``.

    ``__process_csv_filename`` is called on the full file inside
    ``time_and_log`` under the test's own name, and the result is checked
    with the same ``pd.Series`` type assertion the pooled tests apply to each
    batch.

    Needs ``5mSalesRecords.csv`` to be resolvable from the working directory.

    Raises:
        AssertionError: If the direct call does not return a ``pd.Series``.
    """
    with time_and_log("test_no_pooler"):
        result = __process_csv_filename("5mSalesRecords.csv")
    # Previously the result was discarded, so this test could only fail by
    # raising; hold it to the same check as the pooled variants.
    assert isinstance(result, pd.Series)
37
38
if __name__ == '__main__':
    # Direct execution runs the three pooled tests back to back, in this
    # order; test_no_pooler is not part of this run.
    for run_test in (
        test_big_file_as_csv,
        test_big_file_as_dataframe,
        test_big_file_as_dataframe_rows,
    ):
        run_test()
def test_big_file_as_csv():
    """Check that the pooler yields ``pd.Series`` batches for a filename callback.

    ``CSVPooler`` is built with its default ``callback_with`` and
    ``__process_csv_filename`` as the callback (presumably the default mode
    hands the callback a chunk filename -- confirm against ``CSVPooler``).
    The run is wrapped in ``time_and_log`` under the test's own name.

    Needs ``5mSalesRecords.csv`` to be resolvable from the working directory.

    Raises:
        AssertionError: If a batch is not a ``pd.Series`` or if no batch is
            produced at all.
    """
    batch_count = 0
    with time_and_log("test_big_file_as_csv"):
        pooler = CSVPooler("5mSalesRecords.csv", __process_csv_filename)
        for processed_batch in pooler.process():
            assert isinstance(processed_batch, pd.Series)
            batch_count += 1
    # The per-batch assertion never runs when nothing is yielded, so an empty
    # result would otherwise count as a pass.
    assert batch_count > 0, "pooler.process() yielded no batches"
def test_big_file_as_dataframe():
    """Check that the pooler yields ``pd.Series`` batches in DATAFRAME mode.

    ``CSVPooler`` is built with ``callback_with=CallbackWith.DATAFRAME`` and
    ``__process_as_dataframe`` as the callback. The run is wrapped in
    ``time_and_log`` under the test's own name.

    Needs ``5mSalesRecords.csv`` to be resolvable from the working directory.

    Raises:
        AssertionError: If a batch is not a ``pd.Series`` or if no batch is
            produced at all.
    """
    batch_count = 0
    with time_and_log("test_big_file_as_dataframe"):
        pooler = CSVPooler(
            "5mSalesRecords.csv",
            __process_as_dataframe,
            callback_with=CallbackWith.DATAFRAME,
        )
        for processed_batch in pooler.process():
            assert isinstance(processed_batch, pd.Series)
            batch_count += 1
    # The per-batch assertion never runs when nothing is yielded, so an empty
    # result would otherwise count as a pass.
    assert batch_count > 0, "pooler.process() yielded no batches"
def test_big_file_as_dataframe_rows():
    """Check that the pooler yields ``pd.Series`` batches in DATAFRAME_ROW mode.

    ``CSVPooler`` is built with ``callback_with=CallbackWith.DATAFRAME_ROW``
    and the per-row callback ``__process_dataframe_row``. The run is wrapped
    in ``time_and_log`` under the test's own name.

    Needs ``5mSalesRecords.csv`` to be resolvable from the working directory.

    Raises:
        AssertionError: If a batch is not a ``pd.Series`` or if no batch is
            produced at all.
    """
    batch_count = 0
    with time_and_log("test_big_file_as_dataframe_rows"):
        pooler = CSVPooler(
            "5mSalesRecords.csv",
            __process_dataframe_row,
            callback_with=CallbackWith.DATAFRAME_ROW,
        )
        for processed_batch in pooler.process():
            assert isinstance(processed_batch, pd.Series)
            batch_count += 1
    # The per-batch assertion never runs when nothing is yielded, so an empty
    # result would otherwise count as a pass.
    assert batch_count > 0, "pooler.process() yielded no batches"
def test_no_pooler():
    """Process the whole CSV in one direct call, without ``CSVPooler``.

    ``__process_csv_filename`` is called on the full file inside
    ``time_and_log`` under the test's own name, and the result is checked
    with the same ``pd.Series`` type assertion the pooled tests apply to each
    batch.

    Needs ``5mSalesRecords.csv`` to be resolvable from the working directory.

    Raises:
        AssertionError: If the direct call does not return a ``pd.Series``.
    """
    with time_and_log("test_no_pooler"):
        result = __process_csv_filename("5mSalesRecords.csv")
    # Previously the result was discarded, so this test could only fail by
    # raising; hold it to the same check as the pooled variants.
    assert isinstance(result, pd.Series)