Trivial Thoughts
Thoughts and discussion on programming projects using the Python language.


Python Sites of Note
Software Development



Recent Posts
 9/23/03
 9/22/03
 9/12/03
 9/11/03
 8/21/03
 7/21/03
 7/17/03
 7/10/03
 7/7/03
 7/1/03
 6/26/03
 6/25/03
 6/18/03
 6/15/03
 6/2/03
 5/28/03


Subscribe to "Trivial Thoughts" in Radio UserLand.

Click to see the XML version of this web page.

Click here to send an email to the editor of this weblog.
 

 

Monday, September 22, 2003
 

Working With Fixed Record Length Files

I'm often called upon to work with data files whose records contain CSV data and all share one fixed record length. Here's a small class I wrote to make handling fixed-record-length files easy. It does not parse the CSV data itself; for that, use my CSV Adaptor from my previous article.

Put this code in a file named recfile.py:

"""This file contains the RecordFile class, for working with fixed length
record files."""
__author__ = "Mike Kent"
# The string below looks like a version-control "Id" keyword placeholder
# (presumably expanded on checkout -- confirm which VCS is in use).  The
# expression splits it on whitespace, takes the last two tokens and keeps
# the first of them; while the placeholder is unexpanded it is a single
# token, so __version__ is simply the placeholder itself.
__version__ = "$Id$".split()[-2:][0]
class RecordFileError(Exception):
    """Common base class, so callers can catch any record-file error at once."""


class RecordFileOpenError(RecordFileError):
    """The underlying file could not be opened."""


class RecordFileReadError(RecordFileError):
    """A record could not be read, or what was read is not a valid record."""


class RecordFileTruncError(RecordFileError):
    """The data is too long to fit in one record."""


class RecordFileWriteError(RecordFileError):
    """A record could not be written."""
class RecordFile:
    """Random access to a file laid out as fixed-length records.

    Every record occupies exactly ``reclen`` characters: the data, padded to
    length with the padding character, optionally followed by a record
    terminator string.  Record numbers start at 1.

    The padding character, the terminator and the data passed to write()
    must all be the same string type as the file's contents (byte strings
    for a file opened in a binary mode).
    """

    def __init__(self, filename, mode, reclen, recpad=' ', recterm=None):
        """Open ``filename`` in ``mode`` as a file of ``reclen``-long records.

        recpad  -- padding character; defaults to a single space.
        recterm -- record terminator string; defaults to None (none used).
                   Its length counts towards ``reclen``.

        Raises RecordFileOpenError if the file cannot be opened.
        """
        try:
            # open() rather than file(): file() exists only in Python 2.
            self._file = open(filename, mode)
        except IOError as err:
            raise RecordFileOpenError(str(err))
        self.recLen = reclen
        self.recPad = recpad
        self.recTerm = recterm
        self.lenRecTerm = len(recterm) if recterm else 0

    def close(self):
        """Close the underlying file, if it was ever opened."""
        # _file is missing when __init__ failed before the open succeeded;
        # __del__ still runs for such a half-built instance.
        if hasattr(self, "_file"):
            self._file.close()

    __del__ = close

    def flush(self):
        """Flush buffered writes to the underlying file."""
        self._file.flush()

    def read(self, recNum):
        """Read a record by number and return it as a string.

        Record numbers start at 1.  The record terminator and any trailing
        padding given at construction are stripped from the result.  An
        empty string is returned on EOF.

        Raises RecordFileReadError if ``recNum`` is less than 1, if the
        underlying seek or read fails, if fewer characters than one full
        record could be read, or if the expected terminator is missing.
        """
        if recNum < 1:
            raise RecordFileReadError(
                "record numbers start at 1, got %r" % (recNum,))

        try:
            self._file.seek((recNum - 1) * self.recLen)
            rec = self._file.read(self.recLen)
        except IOError as err:
            raise RecordFileReadError(str(err))

        # Nothing at all was read: we are at or beyond EOF.
        if not rec:
            return rec

        if len(rec) < self.recLen:
            raise RecordFileReadError(
                "record %r is shorter than the record length" % (recNum,))

        if self.lenRecTerm:
            if not rec.endswith(self.recTerm):
                raise RecordFileReadError(
                    "record %r lacks its terminator" % (recNum,))
            rec = rec[:-self.lenRecTerm]

        # Strip the padding, if a padding character is in use.
        if self.recPad:
            rec = rec.rstrip(self.recPad)
        return rec

    def write(self, recNum, data):
        """Write a string to a record by record number.

        Record numbers start at 1.  The data is padded to the record length
        with the padding character and, if one was given, followed by the
        record terminator.  Records beyond EOF may be written.  To append a
        new record at the current end of file, pass a record number of 0.

        Returns the record number actually written to.

        Raises RecordFileTruncError if the data (plus terminator) does not
        fit in one record, and RecordFileWriteError if ``recNum`` is
        negative or the underlying seek or write fails.
        """
        paddingNeeded = self.recLen - (len(data) + self.lenRecTerm)
        if paddingNeeded < 0:
            raise RecordFileTruncError(
                "data is %d too long for a record" % -paddingNeeded)

        record = data
        if paddingNeeded > 0:
            record += self.recPad * paddingNeeded
        if self.lenRecTerm:
            record += self.recTerm

        # A negative record number would mean a negative file offset;
        # reject it up front instead of relying on seek() to fail.
        if recNum < 0:
            raise RecordFileWriteError(
                "invalid record number %r" % (recNum,))

        try:
            if recNum == 0:
                # whence=2: seek relative to the end of the file.
                self._file.seek(0, 2)
                # Floor division keeps the record number an integer.
                newRecNum = self._file.tell() // self.recLen + 1
            else:
                self._file.seek((recNum - 1) * self.recLen)
                newRecNum = recNum
            self._file.write(record)
        except IOError as err:
            raise RecordFileWriteError(str(err))
        return newRecNum

Although you might not think so from my previous posts to this weblog, I'm a firm believer in unit testing, so here are the tests for the above code.

Put this code in a file named test_recfile.py:

#! /usr/bin/env python
import sys
import unittest
import recfile
class TestCases_01_RecordFile(unittest.TestCase):
    """Smoke test: a RecordFile can be constructed."""

    def test_01_instantiate(self):
        # 20-character records, space padded, terminated by CR LF.  Byte
        # strings are used because the file is opened in binary mode.
        recFileObj = recfile.RecordFile("test.txt", "w+b", 20, b' ', b'\r\n')
        try:
            self.assertTrue(recFileObj is not None)
        finally:
            # Close explicitly rather than leaving it to garbage collection.
            recFileObj.close()
class TestCases_02_RecordFileWriteAdd(unittest.TestCase):
    """Appending records (record number 0) to a freshly created file."""

    def setUp(self):
        # "w+b" truncates, so every test starts from an empty file.
        # 20-character records, space padded, terminated by CR LF; byte
        # strings are used because the file is opened in binary mode.
        self.recFileObj = recfile.RecordFile(
            "test.txt", "w+b", 20, b' ', b'\r\n')

    def tearDown(self):
        self.recFileObj.close()

    def test_01_writeAddOne(self):
        self.assertTrue(self.recFileObj is not None)
        self.recFileObj.write(0, b"this is a test")
        self.recFileObj.flush()

        # Read the raw bytes back to check padding and terminator.
        with open("test.txt", "rb") as testFileObj:
            newRec = testFileObj.read(20)
        self.assertEqual(newRec, b"this is a test    \r\n")

    def test_02_writeAddSeveral(self):
        self.assertTrue(self.recFileObj is not None)
        for count in range(5):
            self.recFileObj.write(0, b"Record %d" % (count + 1))
        self.recFileObj.flush()

        with open("test.txt", "rb") as testFileObj:
            for count in range(5):
                expected = b"Record %d          \r\n" % (count + 1)
                self.assertEqual(testFileObj.read(20), expected)
class TestCases_03_RecordFileWriteRandom(unittest.TestCase):
    """Overwriting existing records in arbitrary order."""

    def setUp(self):
        # Seed the file with five records, "Record 1" .. "Record 5".
        # 20-character records, space padded, terminated by CR LF; byte
        # strings are used because the file is opened in binary mode.
        seedFileObj = recfile.RecordFile("test.txt", "w+b", 20, b' ', b'\r\n')
        try:
            self.assertTrue(seedFileObj is not None)
            for count in range(5):
                seedFileObj.write(0, b"Record %d" % (count + 1))
        finally:
            seedFileObj.close()
        # Reopen for update without truncating; this is the object under test.
        self.recFileObj = recfile.RecordFile(
            "test.txt", "r+b", 20, b' ', b'\r\n')

    def tearDown(self):
        self.recFileObj.close()

    def test_01_writeRandomOne(self):
        self.assertTrue(self.recFileObj is not None)
        self.recFileObj.write(2, b"this is a test")
        self.recFileObj.flush()

        with open("test.txt", "rb") as testFileObj:
            # Record 2 starts one record length into the file.
            testFileObj.seek(20)
            newRec = testFileObj.read(20)
        self.assertEqual(newRec, b"this is a test    \r\n")

    def test_02_writeRandomSeveral(self):
        self.assertTrue(self.recFileObj is not None)
        recNumList = [1, 5, 3, 2, 4]
        for recNum in recNumList:
            self.recFileObj.write(recNum, b"New record %d" % recNum)
        self.recFileObj.flush()

        with open("test.txt", "rb") as testFileObj:
            for recNum in recNumList:
                expected = b"New record %d      \r\n" % recNum
                testFileObj.seek((recNum - 1) * 20)
                self.assertEqual(testFileObj.read(20), expected)
class TestCases_04_RecordFileReadRandom(unittest.TestCase):
    """Reading existing records in arbitrary order."""

    def setUp(self):
        # Seed the file with five records, "Record 1" .. "Record 5".
        # 20-character records, space padded, terminated by CR LF; byte
        # strings are used because the file is opened in binary mode.
        seedFileObj = recfile.RecordFile("test.txt", "w+b", 20, b' ', b'\r\n')
        try:
            self.assertTrue(seedFileObj is not None)
            for count in range(5):
                seedFileObj.write(0, b"Record %d" % (count + 1))
        finally:
            seedFileObj.close()
        # Reopen without truncating; this is the object under test.
        self.recFileObj = recfile.RecordFile(
            "test.txt", "r+b", 20, b' ', b'\r\n')

    def tearDown(self):
        self.recFileObj.close()

    def test_01_readRandomOne(self):
        # read() strips both the terminator and the padding.
        self.assertEqual(self.recFileObj.read(3), b"Record 3")

    def test_02_readRandomSeveral(self):
        # Includes repeats and back-to-back reads of the same record.
        for recNum in [1, 5, 3, 1, 2, 2, 4]:
            rec = self.recFileObj.read(recNum)
            self.assertEqual(rec, b"Record %d" % recNum)

if __name__ == "__main__":
    # unittest.main() runs the tests and then exits the interpreter itself,
    # with a status that reflects the outcome, so no explicit sys.exit()
    # is needed (a statement placed after it would never run).
    unittest.main()
   

4:47:19 PM  comment []    


Click here to visit the Radio UserLand website. © Copyright 2003 Michael Kent.
Last update: 9/23/2003; 1:07:04 PM.
This theme is based on the SoundWaves (blue) Manila theme.
September 2003
Sun Mon Tue Wed Thu Fri Sat
  1 2 3 4 5 6
7 8 9 10 11 12 13
14 15 16 17 18 19 20
21 22 23 24 25 26 27
28 29 30        
Aug   Oct

Previous/Next