-from __future__ import unicode_literals
+from __future__ import absolute_import
+from __future__ import division
from __future__ import print_function
+from __future__ import unicode_literals
+
+from future_builtins import ascii, filter, hex, map, oct, zip
import inspect
import os
psycopg2.extensions.register_type(psycopg2.extensions.UNICODE)
-def one(values):
- if values == None or len(values) == 0:
- return None
- else:
- assert len(values) == 1
- return values[0]
+class connect(object):
+ def __init__(self, dsn):
+     """Open a psycopg2 connection and switch it to autocommit.
+
+     dsn is a mapping that is unpacked as keyword arguments to
+     psycopg2.connect().  The connection is attempted up to three
+     times; the third psycopg2.OperationalError is propagated to the
+     caller.  If configuring the new connection fails, the driver is
+     closed and the error is re-raised.
+     """
+     attempt = 0
+     while True:
+         try:
+             self.driver = psycopg2.connect(**dsn)
+             break
+         except psycopg2.OperationalError:
+             if attempt == 2:
+                 # Bare raise keeps the original traceback intact.
+                 raise
+             attempt = attempt + 1
+
+     try:
+         self.driver.set_client_encoding('UNICODE')
+         self.driver.set_isolation_level(psycopg2.extensions.ISOLATION_LEVEL_AUTOCOMMIT)
+     except:
+         # Release the connection, then let the caller see the failure
+         # instead of silently handing back a closed driver.
+         self.driver.close()
+         raise
+
+ def close(self):
+ """Close the underlying driver connection held in self.driver."""
+ self.driver.close()
+
+ def __enter__(self):
+ """Return this object as the target of a ``with`` statement."""
+ return self
-class connection(object):
- def __init__(self, driver):
- self.driver = driver
- self.driver.set_isolation_level(psycopg2.extensions.ISOLATION_LEVEL_AUTOCOMMIT)
+ def __exit__(self, type, value, traceback):
+ """Close the connection when the ``with`` block ends.
+
+ Nothing is returned, so an exception raised inside the block is not
+ suppressed.
+ """
+ self.close()
@contextmanager
def cursor(self):
return self.one_(statement)
def has(self, statement):
+ """Return the value produced by ``select exists(<statement>)``.
+
+ statement is interpolated into the query text verbatim, so it must be
+ trusted SQL.  NOTE(review): the unpacking below assumes one_() returns
+ a one-element row -- confirm against its definition.
+ """
- return one(self.one_('select exists(%s)' % (statement,)))
-
-@contextmanager
-def connect(dsn):
- attempt = 0
- while True:
- try:
- driver = psycopg2.connect(**dsn)
- break
- except psycopg2.OperationalError, e:
- if attempt == 2:
- raise e
- attempt = attempt + 1
-
- try:
- driver.set_client_encoding('UNICODE')
- yield connection(driver)
- finally:
- driver.close()
+ exists, = self.one_('select exists(%s)' % (statement,))
+ return exists
def connected(dsn):
+ """Build a decorator that runs a callable with a fresh connection.
+
+ Every call of the decorated callable opens connect(dsn) in a ``with``
+ block and passes the resulting object as the ``sql`` keyword argument;
+ positional and other keyword arguments are forwarded unchanged.
+
+ NOTE(review): replaced() is not wrapped with functools.wraps, so the
+ decorated callable's __name__ and __doc__ are not carried over.
+ """
def wrapped(method):
def replaced(*args, **kw):
- with connect(dsn) as connection:
- return method(connection, *args, **kw)
+ with connect(dsn) as sql:
+ return method(*args, sql=sql, **kw)
return replaced
return wrapped