This repository was archived by the owner on May 17, 2024. It is now read-only.
-
Notifications
You must be signed in to change notification settings - Fork 305
Expand file tree
/
Copy pathpostgresql.py
More file actions
83 lines (65 loc) · 2.63 KB
/
postgresql.py
File metadata and controls
83 lines (65 loc) · 2.63 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
from .database_types import *
from .base import ThreadedDatabase, import_helper, ConnectError
from .base import MD5_HEXDIGITS, CHECKSUM_HEXDIGITS, _CHECKSUM_BITSIZE, TIMESTAMP_PRECISION_POS
# Optional session time zone. When set to a truthy value, every new
# connection opened by PostgreSQL.create_connection() runs
# "SET TIME ZONE '<value>'" right after connecting.
SESSION_TIME_ZONE = None # Changed by the tests
@import_helper("postgresql")
def import_postgresql():
    """Import psycopg2 on demand and return the module.

    As a side effect, installs ``psycopg2.extras.wait_select`` as the
    process-wide psycopg2 wait callback.
    """
    import psycopg2
    import psycopg2.extras

    # NOTE(review): psycopg2 documents wait_select as a callback that lets a
    # long-running query be interrupted (e.g. by Ctrl-C) -- confirm that is
    # the intent here.
    wait_callback = psycopg2.extras.wait_select
    psycopg2.extensions.set_wait_callback(wait_callback)
    return psycopg2
class PostgreSQL(ThreadedDatabase):
    """PostgreSQL driver (a ``ThreadedDatabase`` subclass) backed by psycopg2.

    The keyword arguments given to the constructor are forwarded verbatim to
    ``psycopg2.connect`` whenever ``create_connection`` is called. The other
    methods return SQL expression fragments written in PostgreSQL syntax.
    """

    # Maps PostgreSQL type names to the column-type classes imported from
    # ``.database_types``.
    TYPE_CLASSES = {
        # Timestamps
        "timestamp with time zone": TimestampTZ,
        "timestamp without time zone": Timestamp,
        "timestamp": Timestamp,
        # Numbers
        "double precision": Float,
        "real": Float,
        "decimal": Decimal,
        "integer": Integer,
        "numeric": Decimal,
        "bigint": Integer,
        # Text
        "character": Text,
        "character varying": Text,
        "varchar": Text,
        "text": Text,
        # UUID
        "uuid": Native_UUID,
    }

    # NOTE(review): presumably consumed by the base class to decide whether
    # timestamps are rounded (rather than truncated) on precision loss --
    # confirm against ThreadedDatabase / Database.
    ROUNDS_ON_PREC_LOSS = True

    default_schema = "public"

    def __init__(self, *, thread_count, **kw):
        """Store the connection keyword arguments and initialise the base class.

        Args:
            thread_count: Forwarded to ``ThreadedDatabase.__init__``.
            **kw: Keyword arguments later passed to ``psycopg2.connect``.
        """
        # Stored before calling the base initialiser, as in the original code.
        self._args = kw

        super().__init__(thread_count=thread_count)

    def _convert_db_precision_to_digits(self, p: int) -> int:
        """Return the base class's digit count for precision ``p``, minus 2."""
        # Subtracting 2 due to weird precision issues in PostgreSQL
        return super()._convert_db_precision_to_digits(p) - 2

    def create_connection(self):
        """Open and return a new psycopg2 connection.

        If the module-level ``SESSION_TIME_ZONE`` is truthy, the session time
        zone is set on the new connection before it is returned.

        Raises:
            ConnectError: If psycopg2 raises ``OperationalError`` while
                connecting or while setting the time zone.
        """
        if not self._args:
            self._args["host"] = None  # psycopg2 requires 1+ arguments

        pg = import_postgresql()
        try:
            conn = pg.connect(**self._args)
            if SESSION_TIME_ZONE:
                # Use the cursor as a context manager so it is closed
                # deterministically instead of being left to the GC.
                with conn.cursor() as cur:
                    cur.execute(f"SET TIME ZONE '{SESSION_TIME_ZONE}'")
            return conn
        except pg.OperationalError as e:
            raise ConnectError(*e.args) from e

    def quote(self, s: str) -> str:
        """Return ``s`` as a double-quoted PostgreSQL identifier.

        Embedded double quotes are doubled, which is how PostgreSQL escapes
        them inside a quoted identifier; without this, a name containing
        ``"`` would terminate the identifier early.
        """
        escaped = s.replace('"', '""')
        return f'"{escaped}"'

    def md5_to_int(self, s: str) -> str:
        """Return SQL that turns the MD5 of expression ``s`` into a bigint.

        The generated SQL takes the hex digest from (1-based) position
        ``1 + MD5_HEXDIGITS - CHECKSUM_HEXDIGITS`` onward, prefixes it with
        ``'x'`` so it can be cast to ``bit(_CHECKSUM_BITSIZE)``, and casts the
        result to ``bigint``.
        """
        return f"('x' || substring(md5({s}), {1+MD5_HEXDIGITS-CHECKSUM_HEXDIGITS}))::bit({_CHECKSUM_BITSIZE})::bigint"

    def to_string(self, s: str) -> str:
        """Return SQL that casts expression ``s`` to ``varchar``."""
        return f"{s}::varchar"

    def normalize_timestamp(self, value: str, coltype: TemporalType) -> str:
        """Return SQL that renders ``value`` as 'YYYY-mm-dd HH24:MI:SS.US' text.

        Args:
            value: SQL expression producing a timestamp.
            coltype: Column type; its ``rounds`` and ``precision`` attributes
                select how the fractional seconds are reduced.
        """
        if coltype.rounds:
            # Casting to timestamp(precision) lets PostgreSQL do the rounding.
            return f"to_char({value}::timestamp({coltype.precision}), 'YYYY-mm-dd HH24:MI:SS.US')"

        # Otherwise format with 6 fractional digits, cut the text after
        # `precision` fractional digits, and pad back to 6 digits with zeros.
        # NOTE(review): assumes TIMESTAMP_PRECISION_POS is the length of the
        # text up to and including the decimal point -- confirm in .base.
        timestamp6 = f"to_char({value}::timestamp(6), 'YYYY-mm-dd HH24:MI:SS.US')"
        return (
            f"RPAD(LEFT({timestamp6}, {TIMESTAMP_PRECISION_POS+coltype.precision}), {TIMESTAMP_PRECISION_POS+6}, '0')"
        )

    def normalize_number(self, value: str, coltype: FractionalType) -> str:
        """Return SQL that casts ``value`` to ``decimal(38, precision)`` and then to text."""
        return self.to_string(f"{value}::decimal(38, {coltype.precision})")