+# Registered without a scope argument, so the UNICODE typecaster applies globally.
+# NOTE(review): this matters on Python 2 (text comes back as unicode) — confirm target version.
psycopg2.extensions.register_type(psycopg2.extensions.UNICODE)
+def one(values):
+ """Return the sole element of *values*, or None when there is nothing to return.
+
+ None and empty sequences yield None. A sequence holding more than one
+ element trips the assertion.
+ """
+ # Identity test: `== None` would dispatch to the sequence's own __eq__.
+ if values is None or len(values) == 0:
+ return None
+ assert len(values) == 1
+ return values[0]
+
class connection(object):
def __init__(self, driver):
+ # `driver` is kept as the underlying connection object; cursor(), commit(),
+ # rollback() and set_isolation_level() are all invoked on it by this class.
self.driver = driver
+ # Start in autocommit; transact() changes the level for the span of a transaction.
+ self.driver.set_isolation_level(psycopg2.extensions.ISOLATION_LEVEL_AUTOCOMMIT)
@contextmanager
def cursor(self):
+ # Context manager yielding a DictCursor that is closed when the with-block exits.
+ # The cursor is created before the `try` so that a failure to create it
+ # never reaches the `finally` with `cursor` still unbound.
+ cursor = self.driver.cursor(cursor_factory=psycopg2.extras.DictCursor)
try:
- cursor = self.driver.cursor(cursor_factory=psycopg2.extras.DictCursor)
yield cursor
finally:
cursor.close()
- @contextmanager
- def execute_(self, statement, locals):
- with self.cursor() as cursor:
- cursor.execute(statement.format(**locals), locals)
- yield cursor
-
@contextmanager
def execute(self, statement, depth=0, context=None):
with self.cursor() as cursor:
@contextmanager
def transact(self, synchronous_commit=True):
+ # Context manager wrapping its with-body in one transaction: switch the driver
+ # from autocommit to READ COMMITTED, commit when the body finishes, roll back
+ # and re-raise when it raises, and restore autocommit in every case.
+ # synchronous_commit=False first issues `set local synchronous_commit = off`.
- with self.cursor() as cursor:
- if not synchronous_commit:
- cursor.execute('set local synchronous_commit = off')
-
+ self.driver.set_isolation_level(psycopg2.extensions.ISOLATION_LEVEL_READ_COMMITTED)
try:
- yield transaction(self)
+ with self.cursor() as cursor:
+ if not synchronous_commit:
+ cursor.execute('set local synchronous_commit = off')
+
+ # Yields no value; the with-statement target, if any, is None.
+ yield
self.driver.commit()
except:
+ # Bare except: every exception type triggers the rollback, and the
+ # `raise` below propagates it unchanged, so nothing is swallowed.
self.driver.rollback()
raise
+ finally:
+ self.driver.set_isolation_level(psycopg2.extensions.ISOLATION_LEVEL_AUTOCOMMIT)
-class transaction(object):
- def __init__(self, connection):
- self.connection = connection
+ def one_(self, statement):
+ """Run *statement* and return its single row, or None when it yields no row.
+
+ Asserts that the statement produced at most one row. The statement is
+ passed to execute() with depth 2 — NOTE(review): presumably so it is
+ resolved on behalf of the caller of the public wrapper; confirm in execute().
+ """
+ with self.execute(statement, 2) as cursor:
+ # Named `row` so the module-level one() helper is not shadowed here.
+ row = cursor.fetchone()
+ if row is None:
+ return None
- def __call__(self, statement, locals=None):
- with self.connection.execute(statement, 1, locals) as cursor:
- pass
+ assert cursor.fetchone() is None
+ return row
+
+ def __call__(self, procedure, *parameters):
+ """Call the stored procedure *procedure* with *parameters* via cursor.callproc.
+
+ Returns whatever callproc returns (per DB-API, the parameter sequence).
+ """
+ # No SQL statement is involved, so this opens a plain cursor instead of
+ # going through execute(); there is no `statement` name in this scope.
+ with self.cursor() as cursor:
+ # callproc takes the arguments as ONE sequence, not unpacked.
+ return cursor.callproc(procedure, parameters)
+
+ def run(self, statement, locals=None):
+ # Execute *statement* and return cursor.rowcount.
+ # `locals` is forwarded as execute()'s `context` argument and 1 as its `depth`.
+ # NOTE(review): depth presumably selects a caller frame, so keep this call
+ # directly inside the method — confirm in execute().
+ with self.execute(statement, 1, locals) as cursor:
+ return cursor.rowcount
@contextmanager
def set(self, statement):
+ # Context manager: execute *statement* and yield the cursor for the
+ # duration of the caller's with-block.
- with self.connection.execute(statement, 1) as cursor:
+ with self.execute(statement, 1) as cursor:
yield cursor
def all(self, statement):
+ # Execute *statement* and return every row via cursor.fetchall().
- with self.connection.execute(statement, 1) as cursor:
+ with self.execute(statement, 1) as cursor:
return cursor.fetchall()
def one(self, statement):
+ # Return the single row produced by *statement*, or None when there is no row.
+ # Delegates to one_(), which asserts that at most one row came back.
- with self.connection.execute(statement, 1) as cursor:
- one = cursor.fetchone()
- if one == None:
- return None
- assert cursor.fetchone() == None
- return one
+ return self.one_(statement)
- def pull(self, statement):
- with self.connection.execute(statement, 1) as cursor:
- return cursor.fetchall()
-
- def yank(self, statement, offset=0):
- with self.connection.execute(statement, 1 + offset) as cursor:
- rows = cursor.fetchall()
- return rows[0] if len(rows) != 0 else None
-
- def push(self, statement):
- with self.connection.execute(statement, 1) as cursor:
- pass
-
- def push_(self, statement, locals):
- with self.connection.execute_(statement, locals) as cursor:
- pass
-
- def exists(self, statement):
- return self.yank('''
- select exists (
- {statement}
- )
- '''.format(**locals()), 1)[0]
+ def has(self, statement):
+ # Return the value of `select exists(<statement>)`: the single row comes from
+ # one_(), and its only column is unwrapped by the module-level one().
+ # NOTE(review): *statement* is spliced into the SQL text with %, so it must be trusted SQL.
+ return one(self.one_('select exists(%s)' % (statement,)))
@contextmanager
def connect(dsn):
@contextmanager
def transact(dsn, **args):
+ # Context manager: enter connect(dsn), then connection.transact(**args), and
+ # yield the connection object itself to the caller's with-block.
with connect(dsn) as connection:
- with connection.transact(**args) as cursor:
- yield cursor
+ with connection.transact(**args):
+ yield connection
"""
def slap_(sql, table, keys, values, path):