This repository was archived by the owner on May 17, 2024. It is now read-only.
-
Notifications
You must be signed in to change notification settings - Fork 305
Expand file tree
/
Copy pathbigquery.py
More file actions
95 lines (72 loc) · 3.23 KB
/
bigquery.py
File metadata and controls
95 lines (72 loc) · 3.23 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
from .database_types import *
from .base import Database, import_helper, parse_table_name, ConnectError
from .base import TIMESTAMP_PRECISION_POS
@import_helper(text="Please install BigQuery and configure your google-cloud access.")
def import_bigquery():
    """Import and return the ``google.cloud.bigquery`` module.

    The import statement runs when this function is called rather than
    when the enclosing module is loaded.
    """
    from google.cloud import bigquery as bigquery_module

    return bigquery_module
class BigQuery(Database):
    """BigQuery flavour of ``Database``.

    Produces BigQuery-dialect SQL fragments (quoting, casting, hashing and
    value normalization) and executes queries through a
    ``google.cloud.bigquery`` client.
    """

    # Maps BigQuery data type names to column type classes.
    TYPE_CLASSES = {
        # Dates
        "TIMESTAMP": Timestamp,
        "DATETIME": Datetime,
        # Numbers
        "INT64": Integer,
        "INT32": Integer,
        "NUMERIC": Decimal,
        "BIGNUMERIC": Decimal,
        "FLOAT64": Float,
        "FLOAT32": Float,
    }
    ROUNDS_ON_PREC_LOSS = False  # Technically BigQuery doesn't allow implicit rounding or truncation

    def __init__(self, project, *, dataset, **kw):
        """Create the BigQuery client and remember the project and dataset.

        Args:
            project: Passed as the first positional argument to
                ``bigquery.Client``; also stored on ``self.project``.
            dataset: Stored on ``self.dataset`` and used as
                ``self.default_schema``.
            **kw: Extra keyword arguments forwarded to ``bigquery.Client``.
        """
        bigquery = import_bigquery()

        self._client = bigquery.Client(project, **kw)
        self.project = project
        self.dataset = dataset

        self.default_schema = dataset

    def quote(self, s: str):
        """Return ``s`` wrapped in backticks."""
        return f"`{s}`"

    def md5_to_int(self, s: str) -> str:
        """Return a SQL expression turning the MD5 of ``s`` into a numeric.

        The expression hex-encodes the MD5 digest, keeps the substring that
        starts at position 18, prefixes it with ``0x`` so it can be cast to
        int64, and finally casts that to numeric.
        """
        return f"cast(cast( ('0x' || substr(TO_HEX(md5({s})), 18)) as int64) as numeric)"

    def _normalize_returned_value(self, value):
        """Decode ``bytes`` values to ``str``; return anything else unchanged."""
        if isinstance(value, bytes):
            return value.decode()
        return value

    def _query(self, sql_code: str):
        """Execute ``sql_code`` and return the fetched rows as a list.

        When the first row is a ``bigquery.table.Row``, every row is converted
        to a plain tuple whose values went through
        ``_normalize_returned_value``.

        Raises:
            ConnectError: If running the query raises any exception. The
                original exception is attached as ``__cause__``.
        """
        from google.cloud import bigquery

        try:
            res = list(self._client.query(sql_code))
        except Exception as e:
            msg = "Exception when trying to execute SQL code:\n %s\n\nGot error: %s"
            # Chain the cause so the underlying traceback is not lost.
            raise ConnectError(msg % (sql_code, e)) from e

        if res and isinstance(res[0], bigquery.table.Row):
            res = [tuple(self._normalize_returned_value(v) for v in row.values()) for row in res]
        return res

    def to_string(self, s: str):
        """Return a SQL expression casting ``s`` to ``string``."""
        return f"cast({s} as string)"

    def close(self):
        """Close the underlying BigQuery client."""
        self._client.close()

    def select_table_schema(self, path: DbPath) -> str:
        """Return the SQL that lists the columns of the table at ``path``.

        The query reads ``column_name`` and ``data_type`` from the schema's
        ``INFORMATION_SCHEMA.COLUMNS`` and emits constant values for
        ``datetime_precision`` (6), ``numeric_precision`` (38) and
        ``numeric_scale`` (9).
        """
        schema, table = self._normalize_table_path(path)

        return (
            f"SELECT column_name, data_type, 6 as datetime_precision, 38 as numeric_precision, 9 as numeric_scale FROM {schema}.INFORMATION_SCHEMA.COLUMNS "
            f"WHERE table_name = '{table}' AND table_schema = '{schema}'"
        )

    def normalize_timestamp(self, value: str, coltype: TemporalType) -> str:
        """Return a SQL expression formatting ``value`` as a timestamp string.

        Every branch formats with six fractional-second digits:

        * ``coltype.rounds``: round the value to ``coltype.precision``
          fractional digits via its microsecond count, then format.
        * precision 0: format whole seconds and append a literal ``.000000``.
        * precision 6: format directly with ``%E6S``.
        * otherwise: format with ``%E6S``, cut the text at
          ``TIMESTAMP_PRECISION_POS + precision`` characters and right-pad
          with ``'0'`` to ``TIMESTAMP_PRECISION_POS + 6`` characters.
        """
        if coltype.rounds:
            timestamp = f"timestamp_micros(cast(round(unix_micros(cast({value} as timestamp))/1000000, {coltype.precision})*1000000 as int))"
            return f"FORMAT_TIMESTAMP('%F %H:%M:%E6S', {timestamp})"

        if coltype.precision == 0:
            # The format string must be closed before the comma; otherwise the
            # generated SQL contains an unterminated string literal.
            return f"FORMAT_TIMESTAMP('%F %H:%M:%S.000000', {value})"
        elif coltype.precision == 6:
            return f"FORMAT_TIMESTAMP('%F %H:%M:%E6S', {value})"

        timestamp6 = f"FORMAT_TIMESTAMP('%F %H:%M:%E6S', {value})"
        return (
            f"RPAD(LEFT({timestamp6}, {TIMESTAMP_PRECISION_POS+coltype.precision}), {TIMESTAMP_PRECISION_POS+6}, '0')"
        )

    def normalize_number(self, value: str, coltype: FractionalType) -> str:
        """Return a SQL expression formatting ``value`` with ``coltype.precision`` decimals."""
        return f"format('%.{coltype.precision}f', {value})"

    def parse_table_name(self, name: str) -> DbPath:
        """Parse ``name`` with the module-level ``parse_table_name`` and normalize the path."""
        path = parse_table_name(name)
        return self._normalize_table_path(path)