csv_batcher.csv_pooler

from enum import StrEnum, auto
from multiprocessing import Pool
from typing import Callable

import pandas as pd

from csv_batcher.csv_splitter import CSVSplitter
from csv_batcher.utils.logger import logging


class CallbackWith(StrEnum):
    """
    CallbackWith enum, used to control what is passed to the callback function.
    """
    # Pass the chunked CSV filename:
    CSV_FILENAME = auto()

    # Pass a DataFrame created from the chunked CSV file:
    DATAFRAME = auto()

    # First creates a DataFrame, then calls `apply()` with the callback
    #   function. This causes the callback function to be called with
    #   each row as a `pd.Series` object:
    DATAFRAME_ROW = auto()


class CSVPooler:
    def __init__(
        self,
        csv_filename: str,
        process_fn: Callable,
        callback_with: CallbackWith = CallbackWith.CSV_FILENAME,
        pool_size: int = 5,
        chunk_lines: int = 10000,
    ):
        """
        Construct `CSVPooler` with the given `csv_filename`, `process_fn`,
        `callback_with`, `pool_size`, and `chunk_lines`.

        Args:
            csv_filename (str): Name of the CSV file to process.
            process_fn (Callable): A function that accepts a single argument.
                What is passed depends on `callback_with`; by default it is
                the path to a chunked CSV file.
            callback_with (CallbackWith): Controls what is sent to the callback
                function; see the `CallbackWith` enumeration for details.
                Defaults to `CallbackWith.CSV_FILENAME`.
            pool_size (int, optional): Number of worker processes to use.
                Defaults to 5.
            chunk_lines (int, optional): Target row count for each chunked CSV.
                The last chunk may have fewer rows. Defaults to 10000.
        """
        self.csv_filename = csv_filename
        self.process_fn = process_fn
        self.callback_with = callback_with
        self.pool_size = pool_size
        self.chunk_lines = chunk_lines

    def process(self):
        """
        Processes `self.csv_filename` by using `CSVSplitter` to split it into
        multiple temporary files of at most `self.chunk_lines` rows each, then
        uses a `multiprocessing.Pool` of worker processes to process the group
        of chunked CSVs, yielding each chunk's result as it becomes available.
        """
        processed_count = 0
        csv_splitter = CSVSplitter(self.csv_filename, self.chunk_lines)
        try:
            csv_file_cnt = len(csv_splitter.csv_files())
            logging.info(f"Pooling against {csv_file_cnt} files")
            with Pool(self.pool_size) as p:
                for result, count in p.imap(self._process_csv, csv_splitter.csv_files()):
                    yield result
                    processed_count += count
        finally:
            csv_splitter.cleanup()

        logging.info(f"Processed {processed_count} rows from {csv_file_cnt} CSV files")

    def _process_csv(self, csv_chunk_filename):
        if self.callback_with == CallbackWith.CSV_FILENAME:
            result = self.process_fn(csv_chunk_filename)
            with open(csv_chunk_filename) as f:
                # Count data rows: total lines minus the header line.
                count = sum(1 for line in f) - 1
        elif self.callback_with == CallbackWith.DATAFRAME:
            df = pd.read_csv(csv_chunk_filename, skipinitialspace=True, index_col=None)
            count = df.shape[0]
            result = self.process_fn(df)
        elif self.callback_with == CallbackWith.DATAFRAME_ROW:
            df = pd.read_csv(csv_chunk_filename, skipinitialspace=True, index_col=None)
            count = df.shape[0]
            result = df.apply(self.process_fn, axis=1)
        else:
            raise ValueError(f"Unsupported callback_with value: {self.callback_with}")

        return result, count

class CallbackWith(enum.StrEnum):

CallbackWith enum, used to control what is passed to the callback function.

CSV_FILENAME = <CallbackWith.CSV_FILENAME: 'csv_filename'>
DATAFRAME = <CallbackWith.DATAFRAME: 'dataframe'>
DATAFRAME_ROW = <CallbackWith.DATAFRAME_ROW: 'dataframe_row'>
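As an illustration of what each value implies for the callback's signature, here is a minimal sketch; the callback functions and the "value" column below are assumptions made for the example, not part of the library:

import pandas as pd

# CallbackWith.CSV_FILENAME: the callback receives the path to a chunked CSV file.
def sum_from_file(chunk_path: str):
    df = pd.read_csv(chunk_path)
    return df["value"].sum()  # "value" is an assumed column name

# CallbackWith.DATAFRAME: the callback receives the chunk already loaded as a DataFrame.
def sum_from_dataframe(df: pd.DataFrame):
    return df["value"].sum()

# CallbackWith.DATAFRAME_ROW: the callback is applied row by row, receiving each
# row as a pd.Series (via DataFrame.apply with axis=1).
def double_row(row: pd.Series):
    return row["value"] * 2
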
class CSVPooler:
CSVPooler( csv_filename: str, process_fn: Callable, callback_with: CallbackWith = <CallbackWith.CSV_FILENAME: 'csv_filename'>, pool_size: int = 5, chunk_lines: int = 10000)

Construct CSVPooler with the given csv_filename, process_fn, callback_with, pool_size, and chunk_lines.

Args:
    csv_filename (str): Name of the CSV file to process.
    process_fn (Callable): A function that accepts a single argument. What is passed
        depends on callback_with; by default it is the path to a chunked CSV file.
    callback_with (CallbackWith): Controls what is sent to the callback function;
        see the CallbackWith enumeration for details. Defaults to CallbackWith.CSV_FILENAME.
    pool_size (int, optional): Number of worker processes to use. Defaults to 5.
    chunk_lines (int, optional): Target row count for each chunked CSV. The last chunk
        may have fewer rows. Defaults to 10000.

Instance attributes: csv_filename, process_fn, callback_with, pool_size, chunk_lines
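A hedged construction sketch using the row-wise mode; the file name and the callback below are hypothetical:

from csv_batcher.csv_pooler import CSVPooler, CallbackWith

def double_row(row):
    # Receives one row per call as a pd.Series when using DATAFRAME_ROW.
    return row["value"] * 2  # "value" is an assumed column name

pooler = CSVPooler(
    "big_input.csv",                       # hypothetical CSV path
    double_row,
    callback_with=CallbackWith.DATAFRAME_ROW,
    pool_size=5,
    chunk_lines=10000,
)
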
def process(self):

Processes self.csv_filename by using CSVSplitter to split it into multiple temporary files of at most self.chunk_lines rows each, then uses a multiprocessing.Pool of worker processes to process the group of chunked CSVs, yielding each chunk's result as it becomes available.
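A minimal end-to-end sketch of driving process(); the input file and its "score" column are assumptions for illustration, and the __main__ guard is included because multiprocessing.Pool starts worker processes:

import pandas as pd

from csv_batcher.csv_pooler import CSVPooler, CallbackWith

def chunk_mean(df: pd.DataFrame) -> float:
    return df["score"].mean()  # "score" is an assumed column name

if __name__ == "__main__":
    pooler = CSVPooler(
        "people.csv",                      # hypothetical input file
        chunk_mean,
        callback_with=CallbackWith.DATAFRAME,
        pool_size=4,
        chunk_lines=5000,
    )
    # process() is a generator: each yielded item is one chunk's return value,
    # in chunk order (Pool.imap preserves input order).
    per_chunk_means = list(pooler.process())
    print(per_chunk_means)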