This repository was archived by the owner on May 17, 2024. It is now read-only.
-
Notifications
You must be signed in to change notification settings - Fork 305
Expand file tree
/
Copy pathredshift.py
More file actions
57 lines (46 loc) · 2.39 KB
/
redshift.py
File metadata and controls
57 lines (46 loc) · 2.39 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
from typing import List
from .database_types import Float, TemporalType, FractionalType, DbPath
from .postgresql import PostgreSQL, MD5_HEXDIGITS, CHECKSUM_HEXDIGITS, TIMESTAMP_PRECISION_POS, PostgresqlDialect
class Dialect(PostgresqlDialect):
    """SQL dialect for Redshift, layered on top of the PostgreSQL dialect.

    Every method returns a fragment of SQL text (not a value); the fragments
    are meant to be embedded into larger queries by the caller.
    """

    name = "Redshift"

    # Start from the PostgreSQL type mapping and additionally map the
    # "double" and "real" type names to Float.
    TYPE_CLASSES = {
        **PostgresqlDialect.TYPE_CLASSES,
        "double": Float,
        "real": Float,
    }

    def md5_as_int(self, s: str) -> str:
        """Return SQL that hashes the expression ``s`` with md5 and converts it to a number.

        The hex digest is cut with ``substring`` starting at position
        ``1 + MD5_HEXDIGITS - CHECKSUM_HEXDIGITS``, parsed as base 16 with
        ``strtol`` and cast to ``decimal(38)``.
        """
        # Presumably this keeps the trailing CHECKSUM_HEXDIGITS hex digits of
        # the digest; the constants are defined in .postgresql -- confirm there.
        return f"strtol(substring(md5({s}), {1+MD5_HEXDIGITS-CHECKSUM_HEXDIGITS}), 16)::decimal(38)"

    def normalize_timestamp(self, value: str, coltype: TemporalType) -> str:
        """Return SQL that renders the timestamp expression ``value`` as fixed-width text.

        The text uses the format ``YYYY-mm-dd HH24:MI:SS.US``. It is cut with
        ``LEFT`` to ``TIMESTAMP_PRECISION_POS + coltype.precision`` characters
        and then right-padded with ``'0'`` to ``TIMESTAMP_PRECISION_POS + 6``.

        When ``coltype.rounds`` is true, the value is first rounded to
        ``coltype.precision`` fractional digits; otherwise the extra digits
        are simply dropped by the ``LEFT`` cut.
        """
        if coltype.rounds:
            timestamp = f"{value}::timestamp(6)"
            # Get seconds since epoch. Redshift doesn't support milli- or micro-seconds.
            # NOTE: the `round(` opened in this fragment is intentionally left
            # unclosed; it is closed by the `, -6+precision)` part of the
            # to_char(...) template below.
            secs = f"timestamp 'epoch' + round(extract(epoch from {timestamp})::decimal(38)"
            # Get the milliseconds from timestamp.
            ms = f"extract(ms from {timestamp})"
            # Get the microseconds from timestamp, without the milliseconds!
            us = f"extract(us from {timestamp})"
            # epoch = Total time since epoch in microseconds.
            epoch = f"{secs}*1000000 + {ms}*1000 + {us}"
            # Round the microsecond count to the column precision, then turn it
            # back into a timestamp by adding it to 'epoch' as an interval.
            timestamp6 = (
                f"to_char({epoch}, -6+{coltype.precision}) * interval '0.000001 seconds', 'YYYY-mm-dd HH24:MI:SS.US')"
            )
        else:
            timestamp6 = f"to_char({value}::timestamp(6), 'YYYY-mm-dd HH24:MI:SS.US')"
        return (
            f"RPAD(LEFT({timestamp6}, {TIMESTAMP_PRECISION_POS+coltype.precision}), {TIMESTAMP_PRECISION_POS+6}, '0')"
        )

    def normalize_number(self, value: str, coltype: FractionalType) -> str:
        """Return SQL that casts ``value`` to ``decimal(38, coltype.precision)`` and stringifies it.

        The stringification is delegated to ``self.to_string``, which is not
        defined in this class (presumably inherited from the PostgreSQL
        dialect -- confirm).
        """
        return self.to_string(f"{value}::decimal(38,{coltype.precision})")

    def concat(self, items: List[str]) -> str:
        """Return SQL that concatenates ``items`` with ``||``, wrapped in parentheses."""
        joined_exprs = " || ".join(items)
        return f"({joined_exprs})"

    def is_distinct_from(self, a: str, b: str) -> str:
        """Return SQL that is TRUE when ``a`` and ``b`` differ, treating NULLs as equal.

        The expression is spelled out with IS NULL / IS NOT NULL tests rather
        than an ``IS DISTINCT FROM`` operator. The result is:

        * FALSE when both operands are NULL,
        * TRUE when exactly one operand is NULL,
        * ``a != b`` when neither is NULL.

        Each operand and the whole expression are parenthesized so the result
        keeps its meaning when the operands are compound expressions or when
        the fragment is combined with AND/OR by the caller. The last branch
        guards ``!=`` with IS NOT NULL checks so the expression never
        evaluates to NULL itself.
        """
        return (
            f"((({a}) IS NULL AND ({b}) IS NOT NULL)"
            f" OR (({a}) IS NOT NULL AND ({b}) IS NULL)"
            f" OR (({a}) IS NOT NULL AND ({b}) IS NOT NULL AND ({a}) != ({b})))"
        )
class Redshift(PostgreSQL):
    """Redshift database driver: the PostgreSQL driver paired with the Redshift dialect."""

    dialect = Dialect()

    def select_table_schema(self, path: DbPath) -> str:
        """Return the SQL query that lists the columns of the table at ``path``.

        The query reads the column name, data type, datetime precision,
        numeric precision and numeric scale from
        ``information_schema.columns``, matching on the lower-cased table
        and schema names.
        """
        # _normalize_table_path is not defined in this class (presumably
        # inherited from PostgreSQL -- confirm); it yields (schema, table).
        schema_name, table_name = self._normalize_table_path(path)

        # NOTE(review): the names are interpolated directly into the SQL text;
        # presumably they come from trusted configuration -- confirm.
        columns_query = (
            "SELECT column_name, data_type, datetime_precision, numeric_precision, numeric_scale FROM information_schema.columns "
        )
        name_filter = f"WHERE table_name = '{table_name.lower()}' AND table_schema = '{schema_name.lower()}'"
        return columns_query + name_filter