Skip to content
This repository was archived by the owner on May 17, 2024. It is now read-only.

Commit 4552aef

Browse files
committed
add json format for dbt diff values
1 parent 496fd4f commit 4552aef

File tree

6 files changed

+353
-6
lines changed

6 files changed

+353
-6
lines changed

data_diff/__main__.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -279,6 +279,7 @@ def main(conf, run, **kw):
279279
project_dir_override=project_dir_override,
280280
is_cloud=kw["cloud"],
281281
dbt_selection=kw["select"],
282+
json_output=kw["json_output"]
282283
)
283284
else:
284285
return _data_diff(dbt_project_dir=project_dir_override,

data_diff/dbt.py

Lines changed: 17 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,4 @@
1+
import json
12
import os
23
import time
34
import webbrowser
@@ -11,6 +12,8 @@
1112
from . import connect_to_table, diff_tables, Algorithm
1213
from .cloud import DatafoldAPI, TCloudApiDataDiff, TCloudApiOrgMeta, get_or_create_data_source
1314
from .dbt_parser import DbtParser, PROJECT_FILE
15+
from .diff_tables import DiffResultWrapper
16+
from .format import jsonify
1417
from .tracking import (
1518
set_entrypoint_name,
1619
set_dbt_user_id,
@@ -52,6 +55,7 @@ def dbt_diff(
5255
project_dir_override: Optional[str] = None,
5356
is_cloud: bool = False,
5457
dbt_selection: Optional[str] = None,
58+
json_output: bool = False,
5559
) -> None:
5660
print_version_info()
5761
diff_threads = []
@@ -113,7 +117,7 @@ def dbt_diff(
113117
diff_thread = run_as_daemon(_cloud_diff, diff_vars, datasource_id, api, org_meta)
114118
diff_threads.append(diff_thread)
115119
else:
116-
_local_diff(diff_vars)
120+
_local_diff(diff_vars, json_output)
117121
else:
118122
rich.print(
119123
_diff_output_base(".".join(diff_vars.dev_path), ".".join(diff_vars.prod_path))
@@ -186,7 +190,7 @@ def _get_diff_vars(
186190
)
187191

188192

189-
def _local_diff(diff_vars: TDiffVars) -> None:
193+
def _local_diff(diff_vars: TDiffVars, json_output: bool = False) -> None:
190194
dev_qualified_str = ".".join(diff_vars.dev_path)
191195
prod_qualified_str = ".".join(diff_vars.prod_path)
192196
diff_output_str = _diff_output_base(dev_qualified_str, prod_qualified_str)
@@ -236,7 +240,7 @@ def _local_diff(diff_vars: TDiffVars) -> None:
236240

237241
extra_columns = tuple(column_set)
238242

239-
diff = diff_tables(
243+
diff: DiffResultWrapper = diff_tables(
240244
table1,
241245
table2,
242246
threaded=True,
@@ -245,6 +249,16 @@ def _local_diff(diff_vars: TDiffVars) -> None:
245249
where=diff_vars.where_filter,
246250
skip_null_keys=True,
247251
)
252+
if json_output:
253+
# drain the iterator to get accumulated stats in diff.info_tree
254+
list(diff)
255+
256+
print(json.dumps(jsonify(diff, with_summary=True, with_columns={
257+
"added": columns_added,
258+
"removed": columns_removed,
259+
"changed": columns_type_changed,
260+
})))
261+
return
248262

249263
if list(diff):
250264
diff_output_str += f"{diff.get_stats_string(is_dbt=True)} \n"

data_diff/format.py

Lines changed: 236 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,236 @@
1+
import collections
2+
3+
from data_diff.diff_tables import DiffResultWrapper
4+
from typing import TypedDict, Any, Optional
5+
6+
7+
8+
class ColumnsDiff(TypedDict):
9+
removed: list[str]
10+
added: list[str]
11+
changed: list[str]
12+
13+
14+
def jsonify(diff: DiffResultWrapper,
15+
with_summary: bool = False,
16+
with_columns: ColumnsDiff | None = None) -> 'JsonDiff':
17+
"""
18+
Converts the diff result into a JSON-serializable format.
19+
Optionally add stats summary and schema diff.
20+
"""
21+
diff_info = diff.info_tree.info
22+
table1 = diff_info.tables[0]
23+
table2 = diff_info.tables[1]
24+
key_columns = table1.key_columns
25+
26+
t1_exclusive_rows = []
27+
t2_exclusive_rows = []
28+
diff_rows = []
29+
schema = [field for field, _ in diff_info.diff_schema]
30+
31+
t1_exclusive_rows, t2_exclusive_rows, diff_rows = _group_rows(diff_info, schema)
32+
33+
34+
diff_rows_jsonified = []
35+
for row in diff_rows:
36+
diff_rows_jsonified.append(_jsonify_diff(row, key_columns))
37+
38+
t1_exclusive_rows_jsonified = []
39+
for row in t1_exclusive_rows:
40+
t1_exclusive_rows_jsonified.append(_jsonify_exclusive(row, key_columns))
41+
42+
t2_exclusive_rows_jsonified = []
43+
for row in t2_exclusive_rows:
44+
t2_exclusive_rows_jsonified.append(_jsonify_exclusive(row, key_columns))
45+
46+
summary = None
47+
if with_summary:
48+
summary = _jsonify_diff_summary(diff.get_stats_dict())
49+
50+
columns = None
51+
if with_columns:
52+
added, removed, changed = with_columns['added'], with_columns['removed'], with_columns['changed']
53+
columns = _jsonify_columns_diff(added, removed, changed)
54+
55+
is_different = bool(
56+
t1_exclusive_rows
57+
or t2_exclusive_rows
58+
or diff_rows
59+
or with_columns and (
60+
with_columns['added']
61+
or with_columns['removed']
62+
or with_columns['changed']
63+
)
64+
)
65+
return {
66+
'isDifferent': is_different,
67+
'table1': list(table1.table_path),
68+
'table2': list(table2.table_path),
69+
'rows': {
70+
'exclusive': {
71+
'table1': t1_exclusive_rows_jsonified,
72+
'table2': t2_exclusive_rows_jsonified,
73+
},
74+
'diff': diff_rows_jsonified,
75+
},
76+
'summary': summary,
77+
'columns': columns,
78+
}
79+
80+
class JsonDiff(TypedDict):
81+
table1: list[str]
82+
table2: list[str]
83+
rows: TypedDict('Rows', {
84+
'exclusive': TypedDict('Exclusive', {
85+
'table1': list['JsonExclusiveRow'],
86+
'table2': list['JsonExclusiveRow'],
87+
}),
88+
'diff': list['JsonDiffRow'],
89+
})
90+
summary: Optional['JsonDiffSummary' ]
91+
columns: Optional['JsonColumnsSummary']
92+
93+
94+
95+
class JsonExclusiveRowValue(TypedDict):
96+
"""
97+
Value of a single column in a row
98+
"""
99+
isPK: bool
100+
value: Any
101+
102+
class JsonDiffRowValue(TypedDict):
103+
"""
104+
Pair of diffed values for 2 rows with equal PKs
105+
"""
106+
table1: Any
107+
table2: Any
108+
isDiff: bool
109+
isPK: bool
110+
111+
112+
JsonDiffRow = dict[str, JsonDiffRowValue]
113+
JsonExclusiveRow = dict[str, JsonExclusiveRowValue]
114+
115+
116+
class JsonDiffSummary(TypedDict):
117+
rows: TypedDict('Rows', {
118+
'total': TypedDict('Total', {
119+
'table1': int,
120+
'table2': int,
121+
}),
122+
'exclusive': TypedDict('Exclusive', {
123+
'table1': int,
124+
'table2': int,
125+
}),
126+
'updated': int,
127+
'unchanged': int,
128+
})
129+
stats: TypedDict('Stats', {
130+
'diffCounts': dict[str, int],
131+
})
132+
133+
class JsonColumnsSummary(TypedDict):
134+
exclusive: TypedDict('Exclusive', {
135+
'table1': list[str],
136+
'table2': list[str],
137+
})
138+
typeChanged: list[str]
139+
140+
141+
142+
def _group_rows(diff_info: DiffResultWrapper,
143+
schema: list[str]) -> tuple[list[dict[str, Any]], list[dict[str, Any]], list[dict[str, Any]]]:
144+
t1_exclusive_rows = []
145+
t2_exclusive_rows = []
146+
diff_rows = []
147+
148+
for row in diff_info.diff:
149+
row_w_schema = dict(zip(schema, row))
150+
is_t1_exclusive = row_w_schema['is_exclusive_a']
151+
is_t2_exclusive = row_w_schema['is_exclusive_b']
152+
153+
if is_t1_exclusive:
154+
t1_exclusive_rows.append(row_w_schema)
155+
156+
elif is_t2_exclusive:
157+
t2_exclusive_rows.append(row_w_schema)
158+
159+
else:
160+
diff_rows.append(row_w_schema)
161+
162+
return t1_exclusive_rows, t2_exclusive_rows, diff_rows
163+
164+
165+
def _jsonify_diff(row: dict[str, Any], key_columns: list[str]) -> JsonDiffRowValue:
166+
columns = collections.defaultdict(dict)
167+
for field, value in row.items():
168+
if field in ('is_exclusive_a', 'is_exclusive_b'):
169+
continue
170+
171+
if field.startswith('is_diff_'):
172+
column_name = field.replace('is_diff_', '')
173+
columns[column_name]['isDiff'] = bool(value)
174+
175+
elif field.endswith('_a'):
176+
column_name = field.replace('_a', '')
177+
columns[column_name]['table1'] = value
178+
columns[column_name]['isPK'] = column_name in key_columns
179+
180+
elif field.endswith('_b'):
181+
column_name = field.replace('_b', '')
182+
columns[column_name]['table2'] = value
183+
columns[column_name]['isPK'] = column_name in key_columns
184+
185+
return columns
186+
187+
188+
def _jsonify_exclusive(row: dict[str, Any], key_columns: list[str]) -> JsonExclusiveRow:
189+
columns = collections.defaultdict(dict)
190+
for field, value in row.items():
191+
if field in ('is_exclusive_a', 'is_exclusive_b'):
192+
continue
193+
if field.startswith('is_diff_'):
194+
continue
195+
if field.endswith('_b') and row['is_exclusive_b']:
196+
column_name = field.replace('_b', '')
197+
columns[column_name]['isPK'] = column_name in key_columns
198+
columns[column_name]['value'] = value
199+
elif field.endswith('_a') and row['is_exclusive_a']:
200+
column_name = field.replace('_a', '')
201+
columns[column_name]['isPK'] = column_name in key_columns
202+
columns[column_name]['value'] = value
203+
return columns
204+
205+
206+
def _jsonify_diff_summary(stats_dict: dict) -> JsonDiffSummary:
207+
return {
208+
'rows': {
209+
'total': {
210+
'table1': stats_dict["rows_A"],
211+
'table2': stats_dict["rows_B"]
212+
},
213+
'exclusive': {
214+
'table1': stats_dict["exclusive_A"],
215+
'table2': stats_dict["exclusive_B"],
216+
},
217+
'updated': stats_dict["updated"],
218+
'unchanged': stats_dict["unchanged"]
219+
},
220+
'stats': {
221+
'diffCounts': stats_dict["stats"]['diff_counts']
222+
}
223+
}
224+
225+
226+
def _jsonify_columns_diff(added_columns: list[str],
227+
removed_columns: list[str],
228+
changed_columns: list[str]) -> JsonColumnsSummary:
229+
columns = {
230+
'exclusive': {
231+
'table2': list(added_columns),
232+
'table1': list(removed_columns),
233+
},
234+
'typeChanged': list(changed_columns),
235+
}
236+
return columns

data_diff/info_tree.py

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,4 @@
1+
from typing import Any
12
from typing import List, Dict
23

34
from runtype import dataclass
@@ -9,14 +10,16 @@
910
class SegmentInfo:
1011
tables: List[TableSegment]
1112

12-
diff: list = None
13+
diff: list[tuple[Any, ...]] = None
14+
diff_schema: tuple[tuple[str, type], ...] = None
1315
is_diff: bool = None
1416
diff_count: int = None
1517

1618
rowcounts: Dict[int, int] = {}
1719
max_rows: int = None
1820

19-
def set_diff(self, diff):
21+
def set_diff(self, schema: tuple[tuple[str, type]], diff: list[tuple[Any, ...]]):
22+
self.diff_schema = schema
2023
self.diff = diff
2124
self.diff_count = len(diff)
2225
self.is_diff = self.diff_count > 0

data_diff/joindiff_tables.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -206,7 +206,7 @@ def _diff_segments(
206206
assert len(a_cols) == len(b_cols)
207207
logger.debug("Querying for different rows")
208208
diff = db.query(diff_rows, list)
209-
info_tree.info.set_diff(diff)
209+
info_tree.info.set_diff(tuple(diff_rows.schema.items()), diff)
210210
for is_xa, is_xb, *x in diff:
211211
if is_xa and is_xb:
212212
# Can't both be exclusive, meaning a pk is NULL

0 commit comments

Comments
 (0)