]> git.saurik.com Git - cyql.git/blob - __init__.py
765a0825cb3780688e5cc45d9950dd33162b8dd4
[cyql.git] / __init__.py
1 from __future__ import unicode_literals
2 from __future__ import print_function
3
4 import inspect
5 import os
6
7 from contextlib import contextmanager
8
9 import psycopg2
10 import psycopg2.extras
11 import psycopg2.pool
12
13 psycopg2.extensions.register_type(psycopg2.extensions.UNICODE)
14
15 class connect(object):
16 def __init__(self, dsn):
17 attempt = 0
18 while True:
19 try:
20 self.driver = psycopg2.connect(**dsn)
21 break
22 except psycopg2.OperationalError, e:
23 if attempt == 2:
24 raise e
25 attempt = attempt + 1
26
27 try:
28 self.driver.set_client_encoding('UNICODE')
29 self.driver.set_isolation_level(psycopg2.extensions.ISOLATION_LEVEL_AUTOCOMMIT)
30 except:
31 self.driver.close()
32
33 def close(self):
34 self.driver.close()
35
36 def __enter__(self):
37 return self
38
39 def __exit__(self, type, value, traceback):
40 self.close()
41
42 @contextmanager
43 def cursor(self):
44 cursor = self.driver.cursor(cursor_factory=psycopg2.extras.DictCursor)
45 try:
46 yield cursor
47 finally:
48 cursor.close()
49
50 @contextmanager
51 def execute(self, statement, depth=0, context=None):
52 with self.cursor() as cursor:
53 # two frames, accounting for execute() and @contextmanager
54 locals = inspect.currentframe(depth + 2).f_locals
55 try:
56 if context == None:
57 context = locals
58 cursor.execute(statement.format(**locals), context)
59 finally:
60 del locals
61 yield cursor
62
63 @contextmanager
64 def transact(self, synchronous_commit=True):
65 self.driver.set_isolation_level(psycopg2.extensions.ISOLATION_LEVEL_READ_COMMITTED)
66 try:
67 with self.cursor() as cursor:
68 if not synchronous_commit:
69 cursor.execute('set local synchronous_commit = off')
70
71 yield
72 self.driver.commit()
73 except:
74 self.driver.rollback()
75 raise
76 finally:
77 self.driver.set_isolation_level(psycopg2.extensions.ISOLATION_LEVEL_AUTOCOMMIT)
78
79 def one_(self, statement):
80 with self.execute(statement, 2) as cursor:
81 one = cursor.fetchone()
82 if one == None:
83 return None
84
85 assert cursor.fetchone() == None
86 return one
87
88 def __call__(self, procedure, *parameters):
89 with self.execute(statement, 1) as cursor:
90 return cursor.callproc(procedure, *parameters)
91
92 def run(self, statement, locals=None):
93 with self.execute(statement, 1, locals) as cursor:
94 return cursor.rowcount
95
96 @contextmanager
97 def set(self, statement):
98 with self.execute(statement, 1) as cursor:
99 yield cursor
100
101 def all(self, statement):
102 with self.execute(statement, 1) as cursor:
103 return cursor.fetchall()
104
105 def one(self, statement):
106 return self.one_(statement)
107
108 def has(self, statement):
109 exists, = self.one_('select exists(%s)' % (statement,))
110 return exists
111
112 def connected(dsn):
113 def wrapped(method):
114 def replaced(*args, **kw):
115 with connect(dsn) as sql:
116 return method(*args, sql=sql, **kw)
117 return replaced
118 return wrapped
119
120 @contextmanager
121 def transact(dsn, *args, **kw):
122 with connect(dsn) as connection:
123 with connection.transact(*args, **kw):
124 yield connection
125
126 """
127 def slap_(sql, table, keys, values, path):
128 csr = sql.cursor()
129 try:
130 csr.execute('savepoint iou')
131 try:
132 both = dict(keys, **values)
133 fields = both.keys()
134
135 csr.execute('''
136 insert into %s (%s) values (%s)
137 ''' % (
138 table,
139 ', '.join(fields),
140 ', '.join(['%s' for key in fields])
141 ), both.values())
142 except psycopg2.IntegrityError, e:
143 csr.execute('rollback to savepoint iou')
144
145 csr.execute('''
146 update %s set %s where %s
147 ''' % (
148 table,
149 ', '.join([
150 key + ' = %s'
151 for key in values.keys()]),
152 ' and '.join([
153 key + ' = %s'
154 for key in keys.keys()])
155 ), values.values() + keys.values())
156
157 return path_(csr, path)
158 finally:
159 csr.close()
160 """