]> git.saurik.com Git - cyql.git/blob - __init__.py
New kw-based @cyql.connected().
[cyql.git] / __init__.py
1 from __future__ import unicode_literals
2 from __future__ import print_function
3
4 import inspect
5 import os
6
7 from contextlib import contextmanager
8
9 import psycopg2
10 import psycopg2.extras
11 import psycopg2.pool
12
13 psycopg2.extensions.register_type(psycopg2.extensions.UNICODE)
14
15 def one(values):
16 if values == None or len(values) == 0:
17 return None
18 else:
19 assert len(values) == 1
20 return values[0]
21
22 class connection(object):
23 def __init__(self, driver):
24 self.driver = driver
25 self.driver.set_isolation_level(psycopg2.extensions.ISOLATION_LEVEL_AUTOCOMMIT)
26
27 @contextmanager
28 def cursor(self):
29 cursor = self.driver.cursor(cursor_factory=psycopg2.extras.DictCursor)
30 try:
31 yield cursor
32 finally:
33 cursor.close()
34
35 @contextmanager
36 def execute(self, statement, depth=0, context=None):
37 with self.cursor() as cursor:
38 # two frames, accounting for execute() and @contextmanager
39 locals = inspect.currentframe(depth + 2).f_locals
40 try:
41 if context == None:
42 context = locals
43 cursor.execute(statement.format(**locals), context)
44 finally:
45 del locals
46 yield cursor
47
48 @contextmanager
49 def transact(self, synchronous_commit=True):
50 self.driver.set_isolation_level(psycopg2.extensions.ISOLATION_LEVEL_READ_COMMITTED)
51 try:
52 with self.cursor() as cursor:
53 if not synchronous_commit:
54 cursor.execute('set local synchronous_commit = off')
55
56 yield
57 self.driver.commit()
58 except:
59 self.driver.rollback()
60 raise
61 finally:
62 self.driver.set_isolation_level(psycopg2.extensions.ISOLATION_LEVEL_AUTOCOMMIT)
63
64 def one_(self, statement):
65 with self.execute(statement, 2) as cursor:
66 one = cursor.fetchone()
67 if one == None:
68 return None
69
70 assert cursor.fetchone() == None
71 return one
72
73 def __call__(self, procedure, *parameters):
74 with self.execute(statement, 1) as cursor:
75 return cursor.callproc(procedure, *parameters)
76
77 def run(self, statement, locals=None):
78 with self.execute(statement, 1, locals) as cursor:
79 return cursor.rowcount
80
81 @contextmanager
82 def set(self, statement):
83 with self.execute(statement, 1) as cursor:
84 yield cursor
85
86 def all(self, statement):
87 with self.execute(statement, 1) as cursor:
88 return cursor.fetchall()
89
90 def one(self, statement):
91 return self.one_(statement)
92
93 def has(self, statement):
94 return one(self.one_('select exists(%s)' % (statement,)))
95
96 @contextmanager
97 def connect(dsn):
98 attempt = 0
99 while True:
100 try:
101 driver = psycopg2.connect(**dsn)
102 break
103 except psycopg2.OperationalError, e:
104 if attempt == 2:
105 raise e
106 attempt = attempt + 1
107
108 try:
109 driver.set_client_encoding('UNICODE')
110 yield connection(driver)
111 finally:
112 driver.close()
113
114 def connected(dsn):
115 def wrapped(method):
116 def replaced(*args, **kw):
117 with connect(dsn) as sql:
118 return method(*args, sql=sql, **kw)
119 return replaced
120 return wrapped
121
122 @contextmanager
123 def transact(dsn, *args, **kw):
124 with connect(dsn) as connection:
125 with connection.transact(*args, **kw):
126 yield connection
127
128 """
129 def slap_(sql, table, keys, values, path):
130 csr = sql.cursor()
131 try:
132 csr.execute('savepoint iou')
133 try:
134 both = dict(keys, **values)
135 fields = both.keys()
136
137 csr.execute('''
138 insert into %s (%s) values (%s)
139 ''' % (
140 table,
141 ', '.join(fields),
142 ', '.join(['%s' for key in fields])
143 ), both.values())
144 except psycopg2.IntegrityError, e:
145 csr.execute('rollback to savepoint iou')
146
147 csr.execute('''
148 update %s set %s where %s
149 ''' % (
150 table,
151 ', '.join([
152 key + ' = %s'
153 for key in values.keys()]),
154 ' and '.join([
155 key + ' = %s'
156 for key in keys.keys()])
157 ), values.values() + keys.values())
158
159 return path_(csr, path)
160 finally:
161 csr.close()
162 """