This repository was archived by the owner on May 17, 2024. It is now read-only.
-
Notifications
You must be signed in to change notification settings - Fork 305
Expand file tree
/
Copy pathtrino.py
More file actions
50 lines (34 loc) · 1.44 KB
/
trino.py
File metadata and controls
50 lines (34 loc) · 1.44 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
from typing import Any, ClassVar, Type
import attrs
from data_diff.abcs.database_types import TemporalType, ColType_UUID
from data_diff.databases import presto
from data_diff.databases.base import import_helper
from data_diff.databases.base import TIMESTAMP_PRECISION_POS, BaseDialect
@import_helper("trino")
def import_trino():
    """Import the ``trino`` package on demand and return the module object.

    The import is done inside the function so that the package is only
    required when this function is actually called.
    """
    import trino as trino_module

    return trino_module
class Dialect(presto.Dialect):
    """Variant of the Presto dialect that reports itself as "Trino"."""

    name = "Trino"

    def normalize_timestamp(self, value: str, coltype: TemporalType) -> str:
        """Return a SQL expression that renders ``value`` as a padded timestamp string.

        ``value`` is cast to ``timestamp(p)`` and formatted with ``date_format``,
        where ``p`` is ``coltype.precision`` when ``coltype.rounds`` is truthy
        and ``6`` otherwise. The formatted text is then wrapped in two ``RPAD``
        calls: the inner one pads with ``'.'`` to
        ``TIMESTAMP_PRECISION_POS + coltype.precision`` characters, the outer
        one pads with ``'0'`` to ``TIMESTAMP_PRECISION_POS + 6`` characters.

        Args:
            value: SQL expression (as text) producing the timestamp.
            coltype: Column type supplying ``rounds`` and ``precision``.

        Returns:
            The SQL expression as a string.
        """
        # Cast precision: the column's own precision when rounding, else 6.
        cast_precision = coltype.precision if coltype.rounds else 6
        formatted = f"date_format(cast({value} as timestamp({cast_precision})), '%Y-%m-%d %H:%i:%S.%f')"

        # Target widths for the two padding passes.
        dot_width = TIMESTAMP_PRECISION_POS + coltype.precision
        zero_width = TIMESTAMP_PRECISION_POS + 6
        return f"RPAD(RPAD({formatted}, {dot_width}, '.'), {zero_width}, '0')"

    def normalize_uuid(self, value: str, coltype: ColType_UUID) -> str:
        """Return a SQL expression that wraps ``value`` in ``TRIM(...)``.

        ``coltype`` is accepted for interface compatibility and is not used.
        """
        trimmed_expr = f"TRIM({value})"
        return trimmed_expr
@attrs.define(frozen=False, init=False, kw_only=True)
class Trino(presto.Presto):
    """Presto-derived database class that connects through ``trino.dbapi``."""

    # Dialect implementation paired with this database class.
    DIALECT_CLASS: ClassVar[Type[BaseDialect]] = Dialect
    # Connection URI template and the names of its path parameters.
    CONNECT_URI_HELP = "trino://<user>@<host>/<catalog>/<schema>"
    CONNECT_URI_PARAMS = ["catalog", "schema"]

    # Connection object returned by ``trino.dbapi.connect``.
    _conn: Any

    def __init__(self, **kw) -> None:
        """Open a Trino connection using the supplied keyword arguments.

        Args:
            **kw: Passed through unchanged to ``trino.dbapi.connect``. A truthy
                ``schema`` entry is also stored as ``self.default_schema``.
        """
        super().__init__()
        trino_module = import_trino()

        # Only override the default schema when a non-empty one was supplied.
        schema = kw.get("schema")
        if schema:
            self.default_schema = schema

        self._conn = trino_module.dbapi.connect(**kw)