root/dbf/base.py

Revision 3:493d652a7e84, 8.6 KB (checked in by tyrion-mx, 4 years ago)

Added dbf.sorteddict, django is no more needed to use pyDBF.

Line 
1"""
2This is a doc test for the lowlevel API.
3You should use the dbf.models module instead of this, it's simpler, more
4object-oriented and tries to be similar to the django's model api.
5
6Here's the documentation/testsuite.
7
8First, import the needed modules:
9
10  >>> from StringIO import StringIO
11  >>> from dbf import base, fields
12
13Then, let's define our fields.
14
15  >>> myfields = dict(
16  ...     username = fields.CharField(max_length=100),
17  ...     is_admin = fields.BooleanField(),
18  ...     last_login = fields.DateField(),
19  ... )
20
21Now, we can instantiate the DBF class of this module. The first argument can be
22either a file name or a stream (an object that must behave like a file object).
23For this example we use the StringIO module that we imported above, so we do
24not have to create a real file.
25
26  >>> usersfile = StringIO() # a fake dbf file
27  >>> db = base.DBF(usersfile, myfields)
28
29When trying to select the first record in our db, we get ... a KeyError since
30the db is empty.
31
32  >>> db.select(0)
33  Traceback (most recent call last):
34  ...
35  KeyError: 0
36
37To solve this "big" problem, we can add a record. A record object is a simple
38dict whose keys must be the names of the fields we already defined.
39
40  >>> user = dict(username='tyrion', is_admin=True, last_login=None)
41  >>> db.insert(user)
42
43The pk key has been added to our record, this is a virtual field, and it is not
44written to the database.
45
46  >>> user['pk']
47  0
48
49Now let's try to select our first record again:
50
51  >>> tyrion = db.select(0)
52  >>> tyrion['username'] == user['username']
53  True
54  >>> tyrion['is_admin']
55  True
56
57Hooray, it worked and returned a dict that is equal to our "user" one.
58
59  >>> tyrion == user
60  True
61
62We left our "last_login" field empty; we got no errors because we did not
63specify "null=False" in our fieldspecs dict. Let's fill this field with some data.
64
65  >>> import datetime
66  >>> tyrion['last_login'] = datetime.date(2008, 1, 30) # a random date
67  >>> db.update(tyrion)
68
69Now we select only the "last_login" field of the first record.
70
71  >>> db.select(0, ['last_login'])
72  {'pk': 0, 'last_login': datetime.date(2008, 1, 30)}
73
74Then, just to not leave our first user alone, we create a new one.
75
76  >>> newuser = dict(username='dummy', is_admin=False, last_login=None)
77  >>> db.insert(newuser)
78
79Our DBF instance supports the "in" keyword and the len builtin function.
80  >>> newuser['pk'] in db
81  True
82  >>> len(db)
83  2
84
85As well as iteration over the records:
86
87  >>> for record in db:
88  ...     print record['username']
89  tyrion
90  dummy
91
92We can also iterate only over the required fields:
93
94  >>> for record in db.select(fields=['username']):
95  ...     print record['username']
96  tyrion
97  dummy
98"""
99
100import os
101import datetime
102import struct
103
104from dbf import fields
105from dbf.sorteddict import SortedDict
106
class DBF(object):
    """Low-level access to the records of a dBase-style (``.dbf``) table.

    The table lives in a seekable binary stream. Its layout, as written and
    read by this class, is:

    * a 32-byte header (``header_fmt``),
    * one 32-byte descriptor per field (``fields_fmt``),
    * a ``'\\r'`` terminator byte,
    * fixed-size records, each made of a 1-byte flag followed by the
      encoded fields, with a ``'\\x1A'`` marker after the last record.

    Records are exchanged as plain dicts keyed by field name, plus the
    virtual key ``'pk'`` (the zero-based record index), which is never
    stored in the file.

    Fields are laid out in the iteration order of the ``fieldspecs``
    mapping, so callers that need a stable on-disk layout should pass an
    ordered mapping.

    Only byte literals are used for data written by this class, so that the
    behaviour on Python 2 is unchanged while the code stays importable on
    Python 3 (where field codecs must then produce/consume ``bytes``).
    """

    version = 3
    # version, year-1900, month, day, record count (uint32),
    # header length (uint16), record length (uint16), 20 padding bytes.
    header_fmt = '<BBBBLHH20x'
    # 11-byte name, 1-byte type code, 4 padding bytes, size, decimals,
    # 14 padding bytes.
    fields_fmt = '<11sc4xBB14x'

    def __init__(self, db, fieldspecs=None):
        """Open (or create) a table.

        :param db: a file name, or an already opened seekable binary
            stream. Anything without a ``read`` attribute is treated as a
            file name; the file is created when it does not exist.
        :param fieldspecs: mapping of field name -> field object (exposing
            ``type``, ``size``, ``deci``, ``encode`` and ``decode``). May
            be omitted when the stream already contains a header, in which
            case the field definitions are read from it.
        :raises AssertionError: if both a header and ``fieldspecs`` are
            present and their header/record lengths disagree.
        :raises TypeError: if neither a header nor ``fieldspecs`` is
            available.
        """
        if not hasattr(db, 'read'):
            # os.mknod is Unix-only and may require privileges; opening in
            # 'w+b' creates the file portably.
            mode = 'r+b' if os.path.exists(db) else 'w+b'
            db = open(db, mode)
        self.db = db
        self.fields = fieldspecs

        # An empty stream yields an empty read: there is no header yet.
        raw_header = db.read(32)
        header = None
        if raw_header:
            header = struct.unpack(self.header_fmt, raw_header)

        if self.fields:
            # Derive the table geometry from the provided field specs.
            sizes = [field.size for field in self.fields.values()]
            self.numfields = len(sizes)
            self.lenheader = self.numfields * 32 + 33
            self.lenrecord = 1 + sum(sizes)
            self.record_fmt = '1s' + ''.join('%ds' % size for size in sizes)

            if header:
                self.numrec, lenheader, lenrecord = header[-3:]
                # Raised explicitly (not via ``assert``) so the check is
                # not stripped when Python runs with -O.
                if (lenheader != self.lenheader or
                        lenrecord != self.lenrecord):
                    raise AssertionError(
                        "database's fields doesn't match provided fields")
            else:
                self._write_header()
        elif header:
            self._read_fieldspecs(header)
        else:
            # if we have no header and no fieldspecs, we can't help it ...
            raise TypeError("nor fields or header present")

        # Byte offset of every field inside a record (flag byte excluded).
        self._fieldpos = []
        offset = 0
        for field in self.fields.values():
            self._fieldpos.append(offset)
            offset += field.size

    def _write_header(self):
        """Write a fresh header and the field descriptors to the stream."""
        self.numrec = 0
        today = datetime.datetime.now()
        header = struct.pack(self.header_fmt, self.version,
                             today.year - 1900, today.month, today.day,
                             self.numrec, self.lenheader, self.lenrecord)
        self.db.write(header)

        for fname, field in self.fields.items():
            ftype = field.type
            # struct needs byte strings; this is a no-op for Python 2 str.
            if not isinstance(fname, bytes):
                fname = fname.encode('ascii')
            if not isinstance(ftype, bytes):
                ftype = ftype.encode('ascii')
            # The '11s' format pads the name with NUL bytes by itself.
            self.db.write(struct.pack(self.fields_fmt, fname, ftype,
                                      field.size, field.deci))
        # Header terminator followed by the end-of-file marker.
        self.db.write(b'\r\x1A')

    def _read_fieldspecs(self, header):
        """Rebuild ``self.fields`` from the descriptors stored in the file."""
        self.numrec, self.lenheader, self.lenrecord = header[-3:]
        self.numfields = (self.lenheader - 33) // 32
        self.fields = SortedDict()
        self.record_fmt = '1s'
        for _ in range(self.numfields):
            descriptor = struct.unpack(self.fields_fmt, self.db.read(32))
            name, ftype, size, deci = descriptor
            # Names are NUL padded on disk.
            name = name.partition(b'\0')[0]
            if not isinstance(name, str):
                # Python 3 only: expose text keys rather than bytes.
                name = name.decode('ascii')
            self.fields[name] = fields.guessField(ftype, size, deci)
            self.record_fmt += '%ds' % size

    def gotoField(self, fname):
        """Seek to field ``fname`` of the record last passed to gotoRecord.

        :raises ValueError: if ``fname`` is not a known field.
        """
        # list(...keys()) works for any mapping, not only for SortedDict.
        i = list(self.fields.keys()).index(fname)
        self.db.seek(self._currec + 1 + self._fieldpos[i])

    def newID(self):
        """
        return a new record ID.

        The record count is re-read from the file header, and cached in
        ``self.numrec``; it is also the index of the next free record.
        """
        self.db.seek(4)
        self.numrec = struct.unpack('<L', self.db.read(4))[0]
        return self.numrec

    def increase_numrec(self):
        """Increment the record count and store it in the file header."""
        self.numrec += 1
        self.db.seek(4)
        self.db.write(struct.pack('<L', self.numrec))

    def gotoRecord(self, recIndex):
        """
        move before the record specified by the index recIndex

        ``recIndex == self.numrec`` is accepted on purpose: it is the slot
        where ``insert`` appends the next record.

        :raises KeyError: if ``recIndex`` is negative or past that slot.
        """
        if recIndex < 0 or recIndex > self.numrec:
            raise KeyError(recIndex)
        self._currec = self.lenheader + self.lenrecord * recIndex
        self.db.seek(self._currec)

    def update(self, record):
        """Overwrite the stored fields of an existing record.

        :param record: dict holding ``'pk'`` and any subset of the fields;
            fields missing from the dict are left untouched on disk.
        :raises KeyError: if ``record['pk']`` is not an existing record.
        """
        recId = record['pk']
        if recId not in self:
            # Without this check pk == numrec would silently write a
            # record past the end without updating the record count.
            raise KeyError(recId)
        self.gotoRecord(recId)

        # Skip the flag byte with a seek rather than a read: switching
        # from reading to writing without repositioning is undefined for
        # C stdio files opened in update mode.
        self.db.seek(1, 1)

        for fname, field in self.fields.items():
            if fname in record:
                self.db.write(field.encode(record[fname]))
            else:
                self.db.seek(field.size, 1)
        self.db.flush()

    def insert(self, record):
        """Append ``record`` to the table and set ``record['pk']``.

        :param record: dict with a value for every field.
        :raises KeyError: if a field is missing from ``record``; nothing
            is written in that case.
        """
        # Encode everything first so a bad record cannot leave a
        # half-written row behind.
        data = b''.join([field.encode(record[fname])
                         for fname, field in self.fields.items()])

        recId = self.newID()
        record['pk'] = recId
        self.gotoRecord(recId)

        # flag byte + fields + end-of-file marker
        self.db.write(b' ' + data + b'\x1A')
        self.increase_numrec()

        self.db.flush()

    def _iterselect(self, fields=None):
        """Yield every record, restricted to ``fields`` when given."""
        total = self.numrec
        recId = 0
        while recId < total:
            yield self.select(recId, fields)
            recId += 1

    def select(self, recId=None, fields=None):
        """Read one record, or iterate over all of them.

        :param recId: index of the record; ``None`` returns an iterator
            over every record.
        :param fields: names of the fields to decode; all fields when
            empty or ``None``.
        :returns: a dict with ``'pk'`` and the requested fields.
        :raises KeyError: if ``recId`` is not an existing record.
        """
        if recId is None:
            return self._iterselect(fields)

        if recId not in self:
            raise KeyError(recId)

        wanted = fields if fields else list(self.fields.keys())

        self.gotoRecord(recId)
        res = {'pk': recId}

        self.db.seek(1, 1)  # skip the flag byte
        for fname, field in self.fields.items():
            if fname in wanted:
                res[fname] = field.decode(self.db.read(field.size))
            else:
                self.db.seek(field.size, 1)
        return res

    def close(self):
        """Close the underlying stream."""
        self.db.close()

    def __contains__(self, recId):
        """Tell whether ``recId`` is the index of an existing record."""
        # ``__index__`` identifies every integer type (int, and Python 2
        # long) while rejecting floats and strings.
        if not hasattr(recId, '__index__'):
            return False
        return 0 <= recId < self.numrec

    def __iter__(self):
        """Iterate over all records with all of their fields."""
        return self._iterselect()

    def __len__(self):
        """Return the number of records."""
        return self.numrec

    def __getitem__(self, recordID):
        """``db[recordID]`` is ``db.select(recordID)``."""
        return self.select(recordID)

    def __setitem__(self, recordID, dict):
        """``db[recordID] = mapping`` updates the record at ``recordID``.

        The mapping is copied, so the caller's object is not modified.
        """
        record = {}
        record.update(dict)
        record['pk'] = recordID
        self.update(record)

def _test():
    """Run doctest's ``testmod`` with its default target module."""
    from doctest import testmod
    testmod()
294
# Run the module's doctests when this file is executed as a script.
if __name__ == '__main__':
    _test()
Note: See TracBrowser for help on using the browser.