1 from __future__
import unicode_literals
2 from __future__
import print_function
7 from contextlib
import contextmanager
10 import psycopg2
.extras
13 psycopg2
.extensions
.register_type(psycopg2
.extensions
.UNICODE
)
16 if values
== None or len(values
) == 0:
19 assert len(values
) == 1
22 class connection(object):
23 def __init__(self
, driver
):
25 self
.driver
.set_isolation_level(psycopg2
.extensions
.ISOLATION_LEVEL_AUTOCOMMIT
)
29 cursor
= self
.driver
.cursor(cursor_factory
=psycopg2
.extras
.DictCursor
)
36 def execute(self
, statement
, depth
=0, context
=None):
37 with self
.cursor() as cursor
:
38 # two frames, accounting for execute() and @contextmanager
39 locals = inspect
.currentframe(depth
+ 2).f_locals
43 cursor
.execute(statement
.format(**locals), context
)
49 def transact(self
, synchronous_commit
=True):
50 self
.driver
.set_isolation_level(psycopg2
.extensions
.ISOLATION_LEVEL_READ_COMMITTED
)
52 with self
.cursor() as cursor
:
53 if not synchronous_commit
:
54 cursor
.execute('set local synchronous_commit = off')
59 self
.driver
.rollback()
62 self
.driver
.set_isolation_level(psycopg2
.extensions
.ISOLATION_LEVEL_AUTOCOMMIT
)
64 def one_(self
, statement
):
65 with self
.execute(statement
, 2) as cursor
:
66 one
= cursor
.fetchone()
70 assert cursor
.fetchone() == None
73 def __call__(self
, procedure
, *parameters
):
74 with self
.execute(statement
, 1) as cursor
:
75 return cursor
.callproc(procedure
, *parameters
)
77 def run(self
, statement
, locals=None):
78 with self
.execute(statement
, 1, locals) as cursor
:
79 return cursor
.rowcount
82 def set(self
, statement
):
83 with self
.execute(statement
, 1) as cursor
:
86 def all(self
, statement
):
87 with self
.execute(statement
, 1) as cursor
:
88 return cursor
.fetchall()
90 def one(self
, statement
):
91 return self
.one_(statement
)
93 def has(self
, statement
):
94 return one(self
.one_('select exists(%s)' % (statement
,)))
101 driver
= psycopg2
.connect(**dsn
)
103 except psycopg2
.OperationalError
, e
:
106 attempt
= attempt
+ 1
109 driver
.set_client_encoding('UNICODE')
110 yield connection(driver
)
116 def replaced(*args
, **kw
):
117 with connect(dsn
) as sql
:
118 return method(*args
, sql
=sql
, **kw
)
123 def transact(dsn
, *args
, **kw
):
124 with connect(dsn
) as connection
:
125 with connection
.transact(*args
, **kw
):
129 def slap_(sql, table, keys, values, path):
132 csr.execute('savepoint iou')
134 both = dict(keys, **values)
138 insert into %s (%s) values (%s)
142 ', '.join(['%s' for key in fields])
144 except psycopg2.IntegrityError, e:
145 csr.execute('rollback to savepoint iou')
148 update %s set %s where %s
153 for key in values.keys()]),
156 for key in keys.keys()])
157 ), values.values() + keys.values())
159 return path_(csr, path)