csv_batcher.csv_pooler
```python
from enum import StrEnum, auto
from multiprocessing import Pool
from typing import Callable

import pandas as pd

from csv_batcher.csv_splitter import CSVSplitter
from csv_batcher.utils.logger import logging


class CallbackWith(StrEnum):
    """
    CallbackWith enum, used to control what is passed to the callback function.
    """
    # Pass the chunked CSV filename:
    CSV_FILENAME = auto()

    # Pass a DataFrame created from the chunked CSV file:
    DATAFRAME = auto()

    # First creates a DataFrame, then calls `apply()` with the callback
    # function. This causes the callback function to be called with
    # each row as a `pd.Series` object:
    DATAFRAME_ROW = auto()


class CSVPooler:
    def __init__(
        self,
        csv_filename: str,
        process_fn: Callable,
        callback_with: CallbackWith = CallbackWith.CSV_FILENAME,
        pool_size: int = 5,
        chunk_lines: int = 10000,
    ):
        """
        Construct a `CSVPooler` with the given `csv_filename`, `process_fn`,
        `callback_with`, `pool_size`, and `chunk_lines`.

        Args:
            csv_filename (str): Name of the CSV file to process.
            process_fn (Callable): A function that accepts a single argument.
                What is passed is controlled by `callback_with`; by default it
                is the path to a chunked CSV file.
            callback_with (CallbackWith, optional): Controls what is sent to the
                callback function; see the `CallbackWith` enum for details.
                Defaults to CallbackWith.CSV_FILENAME.
            pool_size (int, optional): Number of worker processes to use.
                Defaults to 5.
            chunk_lines (int, optional): Target row count for each chunked CSV.
                The last chunk may have fewer rows. Defaults to 10000.
        """
        self.csv_filename = csv_filename
        self.process_fn = process_fn
        self.callback_with = callback_with
        self.pool_size = pool_size
        self.chunk_lines = chunk_lines

    def process(self):
        """
        Processes `self.csv_filename` by using `CSVSplitter` to split it into
        multiple temporary files of `self.chunk_lines` rows each, then uses a
        `multiprocessing.Pool` of worker processes to process the chunked CSVs,
        yielding each chunk's result.
        """
        processed_count = 0
        csv_splitter = CSVSplitter(self.csv_filename, self.chunk_lines)
        try:
            csv_file_cnt = len(csv_splitter.csv_files())
            logging.info(f"Pooling against {csv_file_cnt} files")
            with Pool(self.pool_size) as p:
                for result, count in p.imap(self._process_csv, csv_splitter.csv_files()):
                    yield result
                    processed_count += count
        finally:
            csv_splitter.cleanup()

        logging.info(f"Processed {processed_count} rows from {csv_file_cnt} CSV files")

    def _process_csv(self, csv_chunk_filename):
        if self.callback_with == CallbackWith.CSV_FILENAME:
            result = self.process_fn(csv_chunk_filename)
            with open(csv_chunk_filename) as f:
                # Count all lines, then subtract one for the header:
                count = sum(1 for _ in f) - 1
        elif self.callback_with == CallbackWith.DATAFRAME:
            df = pd.read_csv(csv_chunk_filename, skipinitialspace=True, index_col=None)
            count = df.shape[0]
            result = self.process_fn(df)
        elif self.callback_with == CallbackWith.DATAFRAME_ROW:
            df = pd.read_csv(csv_chunk_filename, skipinitialspace=True, index_col=None)
            count = df.shape[0]
            result = df.apply(self.process_fn, axis=1)

        return result, count
```
class CallbackWith(StrEnum)

Enum controlling what is passed to the callback function:

- `CSV_FILENAME`: pass the chunked CSV filename.
- `DATAFRAME`: pass a `DataFrame` created from the chunked CSV file.
- `DATAFRAME_ROW`: first create a `DataFrame`, then call `apply()` with the callback function, so the callback is called once per row with a `pd.Series` object.
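A rough sketch of the callback shape each mode expects (the `amount` column and the function names are hypothetical):

```python
import pandas as pd

# CallbackWith.CSV_FILENAME: called once per chunk with the chunk's file path.
def process_file(csv_chunk_path: str) -> int:
    return len(pd.read_csv(csv_chunk_path))

# CallbackWith.DATAFRAME: called once per chunk with a DataFrame of that chunk.
def process_frame(df: pd.DataFrame) -> float:
    return df["amount"].sum()

# CallbackWith.DATAFRAME_ROW: called once per row with a pd.Series,
# via DataFrame.apply(..., axis=1).
def process_row(row: pd.Series) -> float:
    return row["amount"] * 1.1
```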
class CSVPooler

Splits a CSV file into chunked temporary files and processes them across a pool of worker processes, yielding each chunk's result.
```python
CSVPooler(
    csv_filename: str,
    process_fn: Callable,
    callback_with: CallbackWith = CallbackWith.CSV_FILENAME,
    pool_size: int = 5,
    chunk_lines: int = 10000,
)
```
Construct a `CSVPooler` with the given `csv_filename`, `process_fn`, `callback_with`, `pool_size`, and `chunk_lines`.

Args:
- `csv_filename` (str): Name of the CSV file to process.
- `process_fn` (Callable): A function that accepts a single argument. What is passed is controlled by `callback_with`; by default it is the path to a chunked CSV file.
- `callback_with` (CallbackWith, optional): Controls what is sent to the callback function; see the `CallbackWith` enum for details. Defaults to `CallbackWith.CSV_FILENAME`.
- `pool_size` (int, optional): Number of worker processes to use. Defaults to 5.
- `chunk_lines` (int, optional): Target row count for each chunked CSV; the last chunk may have fewer rows. Defaults to 10000.
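A minimal construction sketch, assuming a local `orders.csv` (the file name and the `count_data_rows` helper are hypothetical):

```python
from csv_batcher.csv_pooler import CSVPooler, CallbackWith

def count_data_rows(csv_chunk_path: str) -> int:
    # Receives the path of one temporary chunk file.
    with open(csv_chunk_path) as f:
        return sum(1 for _ in f) - 1  # subtract the header line

pooler = CSVPooler(
    "orders.csv",                     # hypothetical input CSV
    count_data_rows,
    callback_with=CallbackWith.CSV_FILENAME,
    pool_size=4,
    chunk_lines=5_000,
)
```

Construction only stores the settings; nothing is split or processed until the generator returned by `process()` is iterated.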
```python
def process(self)
```
Processes `self.csv_filename` by using `CSVSplitter` to split it into multiple temporary files of `self.chunk_lines` rows each, then uses a `multiprocessing.Pool` of worker processes to process the group of chunked CSVs, yielding each chunk's result. The temporary chunk files are cleaned up when the generator finishes.
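A sketch of driving the generator, again assuming a hypothetical `orders.csv` with an `amount` column. Because chunks are dispatched through `multiprocessing.Pool`, the callback should generally be a picklable, module-level function, and the driving code belongs under a `__main__` guard:

```python
import pandas as pd
from csv_batcher.csv_pooler import CSVPooler, CallbackWith

def total_amount(df: pd.DataFrame) -> float:
    # Receives a DataFrame built from one chunked CSV file.
    return df["amount"].sum()

if __name__ == "__main__":
    pooler = CSVPooler("orders.csv", total_amount, callback_with=CallbackWith.DATAFRAME)
    # process() yields one result per chunk as the workers finish, in chunk order.
    grand_total = sum(pooler.process())
    print(f"Total: {grand_total}")
```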