csv_batcher.csv_splitter

 1import tempfile
 2from typing import Sequence
 3from csv_batcher.utils.logger import logging
 4
 5class CSVSplitter:
 6    """Splits a CSV file into multiple files"""
 7    def __init__(self, csv_filename:str, chunk_line_cnt:int = 10000):
 8        """
 9        Construct CSVSplitter
10
11        Args:
12            csv_filename (str): path to CSV file
13            chunk_line_cnt (int, optional): Target lines for each chunk. Last chunk might
14            be smaller than this.  Defaults to 10000.
15        """
16        self.csv_filename = csv_filename
17        self.chunk_line_cnt = chunk_line_cnt  # lines
18        self.chunk_dir =  tempfile.TemporaryDirectory()
19        self.chunk_files = []
20        self._split()
21
22    def _split(self):
23        """ Does the actual splitting into multiple files defined by `self.chunk_line_cnt`. """
24        self.chunk_files = []
25        with open(self.csv_filename, 'r') as f:
26            count = 0
27            header = f.readline()
28            lines = []
29            for line in f:
30                count += 1
31                lines.append(line)
32                if count % self.chunk_line_cnt == 0:
33                    self._write_chunk(header, count // self.chunk_line_cnt, lines)
34                    lines = []
35
36            # write remainder
37            if len(lines) > 0:
38                self._write_chunk(header, (count // self.chunk_line_cnt) + 1, lines)
39        logging.info(f"Split ({self.csv_filename}) into {len(self.chunk_files)}")
40
41    def _write_chunk(self, header:str, part:int, lines:Sequence):
42        chunk_filename = f"{self.chunk_dir.name}/data_part_{str(part)}.csv"
43        with open(chunk_filename, 'w') as f_out:
44            f_out.write(header)
45            f_out.writelines(lines)
46            self.chunk_files.append(chunk_filename)
47
48    def csv_files(self):
49        """ Returns `self.chunk_files` """
50        return self.chunk_files
51
52    def cleanup(self):
53        """
54        Remove temporary directory for chunk files; this must be called
55        and should be called in a `finally` block
56        """
57        self.chunk_dir.cleanup()
class CSVSplitter:
 6class CSVSplitter:
 7    """Splits a CSV file into multiple files"""
 8    def __init__(self, csv_filename:str, chunk_line_cnt:int = 10000):
 9        """
10        Construct CSVSplitter
11
12        Args:
13            csv_filename (str): path to CSV file
14            chunk_line_cnt (int, optional): Target lines for each chunk. Last chunk might
15            be smaller than this.  Defaults to 10000.
16        """
17        self.csv_filename = csv_filename
18        self.chunk_line_cnt = chunk_line_cnt  # lines
19        self.chunk_dir =  tempfile.TemporaryDirectory()
20        self.chunk_files = []
21        self._split()
22
23    def _split(self):
24        """ Does the actual splitting into multiple files defined by `self.chunk_line_cnt`. """
25        self.chunk_files = []
26        with open(self.csv_filename, 'r') as f:
27            count = 0
28            header = f.readline()
29            lines = []
30            for line in f:
31                count += 1
32                lines.append(line)
33                if count % self.chunk_line_cnt == 0:
34                    self._write_chunk(header, count // self.chunk_line_cnt, lines)
35                    lines = []
36
37            # write remainder
38            if len(lines) > 0:
39                self._write_chunk(header, (count // self.chunk_line_cnt) + 1, lines)
40        logging.info(f"Split ({self.csv_filename}) into {len(self.chunk_files)}")
41
42    def _write_chunk(self, header:str, part:int, lines:Sequence):
43        chunk_filename = f"{self.chunk_dir.name}/data_part_{str(part)}.csv"
44        with open(chunk_filename, 'w') as f_out:
45            f_out.write(header)
46            f_out.writelines(lines)
47            self.chunk_files.append(chunk_filename)
48
49    def csv_files(self):
50        """ Returns `self.chunk_files` """
51        return self.chunk_files
52
53    def cleanup(self):
54        """
55        Remove temporary directory for chunk files; this must be called
56        and should be called in a `finally` block
57        """
58        self.chunk_dir.cleanup()

Splits a CSV file into multiple files

CSVSplitter(csv_filename: str, chunk_line_cnt: int = 10000)
 8    def __init__(self, csv_filename:str, chunk_line_cnt:int = 10000):
 9        """
10        Construct CSVSplitter
11
12        Args:
13            csv_filename (str): path to CSV file
14            chunk_line_cnt (int, optional): Target lines for each chunk. Last chunk might
15            be smaller than this.  Defaults to 10000.
16        """
17        self.csv_filename = csv_filename
18        self.chunk_line_cnt = chunk_line_cnt  # lines
19        self.chunk_dir =  tempfile.TemporaryDirectory()
20        self.chunk_files = []
21        self._split()

Construct CSVSplitter

Args: `csv_filename` (str): path to the CSV file; `chunk_line_cnt` (int, optional): target number of lines for each chunk. The last chunk might be smaller than this. Defaults to 10000.

csv_filename
chunk_line_cnt
chunk_dir
chunk_files
def csv_files(self):
49    def csv_files(self):
50        """ Returns `self.chunk_files` """
51        return self.chunk_files

Returns self.chunk_files

def cleanup(self):
53    def cleanup(self):
54        """
55        Remove temporary directory for chunk files; this must be called
56        and should be called in a `finally` block
57        """
58        self.chunk_dir.cleanup()

Remove the temporary directory that holds the chunk files. This must be called once the chunks are no longer needed, preferably from a `finally` block.