root/dbf/models.py

Revision 3:493d652a7e84, 7.5 KB (checked in by tyrion-mx, 4 years ago)

Added dbf.sorteddict, django is no more needed to use pyDBF.

Line 
1"""
2High Level interface to DBF files.
3
4
5>>> import os
6>>> import datetime
7>>> from dbf import models, fields
8
9>>> class Person(models.Model):
10...
11...    class Meta:
12...        #dbname = 'persons.dbf'
13...        lazy = True
14...
15...    first_name = fields.CharField(max_length=100, null=False)
16...    last_name = fields.CharField(max_length=100, null=True)
17...    email = fields.CharField(max_length=100, null=True)
18...    birth_date = fields.DateField()
19...    loves_python = fields.BooleanField()
20...    height = fields.DecimalField(max_length=5, deci=2)
21...
22...    def __unicode__(self):
23...        return self.first_name
24
25>>> Person.objects.get(pk=0)
26Traceback (most recent call last):
27...
28IndexError: list index out of range
29
30>>> person = Person(first_name='Tyrion', last_name='MX', email='ty@nsa.gov')
31>>> person.birth_date = datetime.date(1980, 1, 3) # random date
32>>> person.save()
33
34>>> tyrion = Person.objects.get(pk=0)
35>>> tyrion
36<Person: Tyrion>
37
38>>> assert tyrion.first_name == person.first_name
39>>> assert tyrion.birth_date == person.birth_date
40
41>>> tyrion.loves_python = True
42>>> tyrion.save()
43
44>>> if os.path.exists('Person.dbf'):
45...     os.remove('Person.dbf')
46"""
47import types
48import copy
49
50from dbf.base import DBF
51from dbf.sorteddict import SortedDict
52from dbf import fields
53
54class QuerySet(object):
55
56    defaultParams = dict(
57            #options=None,
58            slice=None,
59            get_only=None,
60            one_result=False
61    )
62
63
64    def __init__(self, Model, params={}):
65        self.Model = Model
66        self.dbf = Model._meta.dbf
67
68        include = params.pop('include', {})
69        exclude = params.pop('exclude', {})
70
71        self.params = self.defaultParams.copy()
72        self.params['options'] = dict()
73        self.params.update(params)
74
75    def _match(self, fname, fdata, **params):
76        if fname == 'pk':
77            field = fields.IntegerField
78        else:
79            field = self.dbf.fields[fname]
80        for type in ('include', 'exclude'):
81            for (method, value) in params[type]:
82                res = field.lookupMethods[method](fdata, value)
83                if (type == 'include' and not res) or \
84                   (type == 'exclude' and res):
85                    return False
86        return True       
87
88    def __iter__(self):
89        options = self.params['options']
90        slice = self.params['slice']
91        getonly = self.params['get_only']
92        i = -1
93        for record in self.dbf.select(fields=options.keys()):
94            match = True
95            for fname, params in options.iteritems():
96                match = self._match(fname, record[fname], include=params[0],
97                                    exclude=params[1])
98                if not match:
99                    break
100            if match:
101                i += 1
102                if slice and (slice[0] > i or \
103                   (slice[1] is not None and slice[1] < i)):
104                    continue
105                if getonly in (i, None):
106                    m = self.Model(record['pk'])
107                    vars(m).update(record)
108                    yield m
109                    if getonly == i:
110                        return
111
112    def prepareOptions(self, options, **params):
113        for i, option in enumerate(('include', 'exclude')):
114            for fname, value in params[option].iteritems():
115                try:
116                    fname, method = fname.split('__')
117                except ValueError:
118                    method = 'exact'
119                try:
120                    if fname == 'pk':
121                        field = fields.IntegerField
122                    else:
123                        field = self.dbf.fields[fname]
124                    if not method in field.lookupMethods:
125                        raise TypeError('inexistent %s method: %s' %
126                                (field.__class__.__name__, method))
127                except KeyError:
128                    raise TypeError('inexistent field: %s' % fname)
129                field = options.setdefault(fname, (list(),list()))
130                field[i].append((method, value))
131
132    def new(self, include={}, exclude={}, **extend):
133        params = copy.deepcopy(self.params)
134        params.update(extend)
135        self.prepareOptions(params['options'], include=include,
136                            exclude=exclude)
137        return QuerySet(self.Model, params)
138
139    def filter(self, **params):
140        return self.new(include=params)
141
142    def exclude(self, **params):
143        return self.new(exclude=params)
144
145    def all(self):
146        return self.new()
147
148    def get(self, **params):
149        res = list(self.new(include=params))
150        if len(res) > 1:
151            raise Exception('get() returned more than one result: %d' % \
152                            len(res))
153        return res[0]
154
155   
156    def __getitem__(self, key):
157        if isinstance(key, int):
158            self.params['get_only'] = key
159            return list(self)[0]
160        elif isinstance(key, slice):
161            return self.new(slice=(key.start or 0, key.stop))
162        else:
163            raise KeyError(key)
164
165    def __repr__(self):
166        return repr(list(self))
167
168class Manager(QuerySet):
169
170    __repr__ = object.__repr__
171    __iter__ = __getitem__ = None
172
173
174def makeModel(name, **meta):
175    meta = type('Meta', (object,), meta)
176    return ModelMeta(name, (Model,), {'Meta': meta})
177
178
179class ModelMeta(type):
180
181    def __new__(cls, name, bases, dict):
182        if bases != (object,):
183
184            dict.pop('pk', None)
185            meta = dict.pop('Meta', None)
186
187            if not meta:
188                meta = type('Meta', (object,), {})
189
190            def setdefault(attr, default):
191                if not hasattr(meta, attr):
192                    setattr(meta, attr, default)
193
194            setdefault('lazy', True)
195            setdefault('dbname', name + '.dbf')
196           
197            if not hasattr(meta, 'fields'):
198                meta.fields = SortedDict()
199                for key, value in dict.iteritems():
200                    if isinstance(value, fields.Field):
201                        meta.fields[key] = dict[key]
202                [dict.pop(field) for field in meta.fields]
203
204            if hasattr(meta, 'stream'):
205                meta.dbf = DBF(meta.stream, meta.fields)
206            else:
207                meta.dbf = DBF(meta.dbname, meta.fields)
208           
209            if dict.get('__unicode__'):
210                repr = (lambda self: u'<%s: %s>' % (name, self.__unicode__()))
211                dict['__repr__'] = repr
212
213            Model = type.__new__(cls, name, bases, dict)
214            Model._meta = meta
215            Model.objects = Manager(Model)
216            return Model
217
218        else:
219            return type.__new__(cls, name, bases, dict)
220
221
222class Model(object):
223
224    __metaclass__ = ModelMeta
225
226    def __init__(self, pk=None, **kwargs):
227        self.pk = pk
228        if pk is None:
229            for fname in self._meta.fields:
230                setattr(self, fname, kwargs.pop(fname, None))
231
232    def __hasattr__(self, attr):
233        return attr in self._meta.fields
234
235    def __getattr__(self, attr):
236        if attr in self._meta.fields:
237            if self.pk is not None:
238                value = self._meta.dbf.select(self.pk, [attr])[attr]
239                setattr(self, attr, value)
240                return value
241        else:
242            raise AttributeError(attr)
243
244    def getall(self):
245        vars(self).update(self._meta.dbf.select(self.pk))
246
247    def save(self):
248        if self.pk != None:
249            self._meta.dbf.update(vars(self))
250        else:
251            self._meta.dbf.insert(vars(self))
252
253def _test():
254    import doctest
255    doctest.testmod()
256
257if __name__ == '__main__':
258    _test()
Note: See TracBrowser for help on using the browser.