Working With Fixed Record Length Files
I'm often called upon to work with data files that contain records composed of CSV data, where all of the records in the file have one fixed record length. Here's a small class I wrote to make the handling of fixed record length files easy. It does not handle the parsing of CSV data itself; for that, use my CSV Adaptor from my previous article.
# Put this code in a file named recfile.py:
"""This file contains the RecordFile class, for working with fixed length
record files."""

__author__ = "Mike Kent"
__version__ = "$Id$".split()[-2:][0]


class RecordFileError(Exception):
    """Common base class for every error raised by RecordFile."""


class RecordFileOpenError(RecordFileError):
    """The underlying file could not be opened."""


class RecordFileReadError(RecordFileError):
    """A record could not be read, or what was read is malformed."""


class RecordFileTruncError(RecordFileError):
    """The data handed to write() does not fit into one record."""


class RecordFileWriteError(RecordFileError):
    """A record could not be written."""


class RecordFile(object):
    """Random access to a file laid out as fixed-length records.

    Each record is padded to its proper length with a padding character,
    and may optionally be terminated with a record terminator string.
    The record length given at construction time is the full on-disk
    length of one record, terminator included.

    Instances can be used as context managers; the file is closed when
    the ``with`` block is left.
    """

    def __init__(self, filename, mode, reclen, recpad=' ', recterm=None):
        """Open *filename* in *mode* as a file of *reclen*-sized records.

        recpad  -- padding used to fill short records (default: a single
                   space).  write() repeats it once per missing
                   character, so only a one-character pad yields records
                   of exactly *reclen* characters.
        recterm -- optional terminator stored at the end of every record
                   (default: None, meaning no terminator).

        When the file is opened in binary mode on Python 3, pass *recpad*,
        *recterm* and the record data as bytes.

        Raises RecordFileOpenError if the file cannot be opened.
        """
        try:
            # open() exists on both Python 2 and 3; the file() builtin
            # does not exist on Python 3.
            self._file = open(filename, mode)
        except IOError as exc:
            raise RecordFileOpenError(
                "cannot open %r in mode %r: %s" % (filename, mode, exc))
        self.recLen = reclen
        self.recPad = recpad
        self.recTerm = recterm
        self.lenRecTerm = len(recterm) if recterm else 0

    def close(self):
        """Close the underlying file.  Safe to call more than once."""
        # _file is missing when __init__ failed before the open succeeded,
        # and close() still runs in that case through __del__.
        handle = getattr(self, "_file", None)
        if handle is not None:
            handle.close()

    __del__ = close

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc_value, traceback):
        self.close()
        # Never swallow an exception raised inside the with block.
        return False

    def flush(self):
        """Flush buffered writes of the underlying file."""
        self._file.flush()

    def read(self, recNum):
        """Read a record by number, and return it.

        Record numbers start at 1.  The returned value has the record
        terminator and any trailing padding given at construction time
        stripped.  An empty value is returned on EOF.

        Raises RecordFileReadError if *recNum* is less than 1, if the
        underlying file raises IOError, if fewer than a full record could
        be read, or if the expected record terminator is missing.
        """
        if recNum < 1:
            raise RecordFileReadError(
                "record numbers start at 1, got %r" % (recNum,))
        try:
            self._file.seek((recNum - 1) * self.recLen)
            rec = self._file.read(self.recLen)
        except IOError as exc:
            raise RecordFileReadError(
                "cannot read record %r: %s" % (recNum, exc))

        # Nothing at that position: report EOF with the empty value.
        if not rec:
            return rec

        if len(rec) < self.recLen:
            raise RecordFileReadError(
                "record %r is truncated (%d of %d characters)"
                % (recNum, len(rec), self.recLen))

        if self.lenRecTerm:
            if not rec.endswith(self.recTerm):
                raise RecordFileReadError(
                    "record %r lacks its record terminator" % (recNum,))
            rec = rec[:-self.lenRecTerm]

        # An empty pad means "no padding", so there is nothing to strip.
        if self.recPad:
            rec = rec.rstrip(self.recPad)
        return rec

    def write(self, recNum, data):
        """Write *data* to a record by record number.

        Record numbers start with 1.  The record is padded to the correct
        length using the padding character, and optionally terminated by
        the record terminator string.  You can seek to, and write,
        records beyond EOF.  To append a new record at the current actual
        EOF, give a record number of 0.

        Returns the actual record number written to.

        Raises RecordFileWriteError if *recNum* is negative or the
        underlying file raises IOError, and RecordFileTruncError if
        *data* plus the terminator is longer than one record.
        """
        if recNum < 0:
            raise RecordFileWriteError(
                "record number must be 0 (append) or positive, got %r"
                % (recNum,))

        # Room left in the record once data and terminator are in place.
        paddingNeeded = self.recLen - (len(data) + self.lenRecTerm)
        if paddingNeeded < 0:
            raise RecordFileTruncError(
                "data is %d characters too long for a record"
                % (-paddingNeeded,))
        if paddingNeeded > 0:
            data += self.recPad * paddingNeeded
        if self.lenRecTerm:
            data += self.recTerm

        try:
            if recNum == 0:
                # Whence 2: seek relative to the end of the file.
                self._file.seek(0, 2)
                # Floor division keeps the record number an integer on
                # Python 3, where "/" would produce a float.
                recNum = self._file.tell() // self.recLen + 1
            else:
                # Whence 0: seek relative to the beginning of the file.
                self._file.seek((recNum - 1) * self.recLen, 0)
            self._file.write(data)
        except IOError as exc:
            raise RecordFileWriteError(
                "cannot write record %r: %s" % (recNum, exc))
        return recNum
Although you might not think so from my previous posts to this weblog, I'm a firm believer in unit testing, so here are the tests for the above code.
#! /usr/bin/env python
# Put this code in a file named test_recfile.py:
"""Unit tests for the RecordFile class in recfile.py."""

import os
import sys
import unittest

import recfile

TEST_FILE = "test.txt"
REC_LEN = 20
# The files are opened in binary mode, so pad, terminator and record data
# are all bytes; on Python 2 these are plain str values.
REC_PAD = b" "
REC_TERM = b"\r\n"


def _open(mode):
    """Open TEST_FILE as a RecordFile with the layout used by every test."""
    return recfile.RecordFile(TEST_FILE, mode, REC_LEN, REC_PAD, REC_TERM)


def _on_disk(data):
    """Return *data* as one full record: padded, then terminated."""
    padding = REC_LEN - len(data) - len(REC_TERM)
    return data + REC_PAD * padding + REC_TERM


def _read_raw(recNum):
    """Read the raw bytes of record *recNum* (1-based) from TEST_FILE."""
    raw = open(TEST_FILE, "rb")
    try:
        raw.seek((recNum - 1) * REC_LEN)
        return raw.read(REC_LEN)
    finally:
        raw.close()


def _create_five_records():
    """(Re)create TEST_FILE holding "Record 1" through "Record 5"."""
    recFileObj = _open("w+b")
    try:
        for count in range(5):
            recFileObj.write(0, b"Record %d" % (count + 1))
    finally:
        recFileObj.close()


class _RecordFileTestCase(unittest.TestCase):
    """Shared fixture: remove the scratch file after every test."""

    def tearDown(self):
        if os.path.exists(TEST_FILE):
            os.remove(TEST_FILE)


class TestCases_01_RecordFile(_RecordFileTestCase):

    def test_01_instantiate(self):
        recFileObj = _open("w+b")
        try:
            self.assertTrue(recFileObj is not None)
        finally:
            recFileObj.close()


class TestCases_02_RecordFileWriteAdd(_RecordFileTestCase):

    def test_01_writeAddOne(self):
        recFileObj = _open("w+b")
        try:
            recFileObj.write(0, b"this is a test")
            recFileObj.flush()
            # 14 data characters + 4 pad characters + 2 terminator
            # characters = 20.
            expected = b"this is a test" + b" " * 4 + b"\r\n"
            self.assertEqual(_read_raw(1), expected)
        finally:
            recFileObj.close()

    def test_02_writeAddSeveral(self):
        recFileObj = _open("w+b")
        try:
            for count in range(5):
                recFileObj.write(0, b"Record %d" % (count + 1))
            recFileObj.flush()
            for count in range(5):
                expected = _on_disk(b"Record %d" % (count + 1))
                self.assertEqual(_read_raw(count + 1), expected)
        finally:
            recFileObj.close()


class TestCases_03_RecordFileWriteRandom(_RecordFileTestCase):

    def setUp(self):
        _create_five_records()

    def test_01_writeRandomOne(self):
        recFileObj = _open("r+b")
        try:
            recFileObj.write(2, b"this is a test")
            recFileObj.flush()
            expected = b"this is a test" + b" " * 4 + b"\r\n"
            self.assertEqual(_read_raw(2), expected)
        finally:
            recFileObj.close()

    def test_02_writeRandomSeveral(self):
        recNumList = [1, 5, 3, 2, 4]
        recFileObj = _open("r+b")
        try:
            for recNum in recNumList:
                recFileObj.write(recNum, b"New record %d" % recNum)
            recFileObj.flush()
            for recNum in recNumList:
                expected = _on_disk(b"New record %d" % recNum)
                self.assertEqual(_read_raw(recNum), expected)
        finally:
            recFileObj.close()


class TestCases_04_RecordFileReadRandom(_RecordFileTestCase):

    def setUp(self):
        _create_five_records()

    def test_01_readRandomOne(self):
        recFileObj = _open("r+b")
        try:
            self.assertEqual(recFileObj.read(3), b"Record 3")
        finally:
            recFileObj.close()

    def test_02_readRandomSeveral(self):
        recNumList = [1, 5, 3, 1, 2, 2, 4]
        recFileObj = _open("r+b")
        try:
            for recNum in recNumList:
                expected = b"Record %d" % recNum
                self.assertEqual(recFileObj.read(recNum), expected)
        finally:
            recFileObj.close()


if __name__ == "__main__":
    # unittest.main() exits the interpreter itself, with a status that
    # reflects the test outcome.
    unittest.main()
4:47:19 PM
|