-
Notifications
You must be signed in to change notification settings - Fork 2k
Expand file tree
/
Copy pathDecompressionBombs.ql
More file actions
123 lines (113 loc) · 4.37 KB
/
DecompressionBombs.ql
File metadata and controls
123 lines (113 loc) · 4.37 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
/**
* @name Uncontrolled file decompression
* @description Uncontrolled data that flows into decompression library APIs without checking the compression rate is dangerous
* @kind path-problem
* @problem.severity error
* @security-severity 7.8
* @precision high
* @id py/uncontrolled-file-decompression
* @tags security
* experimental
* external/cwe/cwe-409
*/
import python
import semmle.python.dataflow.new.DataFlow
import semmle.python.dataflow.new.TaintTracking
import semmle.python.ApiGraphs
import semmle.python.dataflow.new.RemoteFlowSources
import semmle.python.dataflow.new.internal.DataFlowPublic
import DecompressionBomb
/**
* `io.TextIOWrapper(ip, encoding='utf-8')` like following:
* ```python
* with gzip.open(bomb_input, 'rb') as ip:
* with io.TextIOWrapper(ip, encoding='utf-8') as decoder:
* content = decoder.read()
* print(content)
* ```
* I saw this builtin method many places so I added it as a AdditionalTaintStep.
* it would be nice if it is added as a global AdditionalTaintStep
*/
predicate isAdditionalTaintStepTextIOWrapper(DataFlow::Node nodeFrom, DataFlow::Node nodeTo) {
exists(API::CallNode textIOWrapper |
textIOWrapper = API::moduleImport("io").getMember("TextIOWrapper").getACall()
|
nodeFrom = textIOWrapper.getParameter(0, "input").asSink() and
nodeTo = textIOWrapper
) and
exists(nodeTo.getLocation().getFile().getRelativePath())
}
module FileAndFormRemoteFlowSource {
class FastAPI extends RemoteFlowSource::Range {
FastAPI() {
exists(API::Node fastApiParam, Expr fastApiUploadFile |
fastApiParam =
API::moduleImport("fastapi")
.getMember("FastAPI")
.getReturn()
.getMember("post")
.getReturn()
.getParameter(0)
.getKeywordParameter(_) and
fastApiUploadFile =
API::moduleImport("fastapi")
.getMember("UploadFile")
.getASubclass*()
.getAValueReachableFromSource()
.asExpr()
|
fastApiUploadFile =
fastApiParam.asSource().asExpr().(Parameter).getAnnotation().getASubExpression*() and
// Multiple Uploaded files as list of fastapi.UploadFile
exists(For f, Attribute attr, DataFlow::Node a, DataFlow::Node b |
fastApiParam.getAValueReachableFromSource().asExpr() = f.getIter().getASubExpression*()
|
TaintTracking::localExprTaint(f.getIter(), attr.getObject()) and
attr.getName() = ["filename", "content_type", "headers", "file", "read"] and
this.asExpr() = attr
)
or
// one Uploaded file as fastapi.UploadFile
this =
[
fastApiParam.getMember(["filename", "content_type", "headers"]).asSource(),
fastApiParam
.getMember("file")
.getMember(["readlines", "readline", "read"])
.getReturn()
.asSource(), fastApiParam.getMember("read").getReturn().asSource()
]
) and
exists(this.getLocation().getFile().getRelativePath())
}
override string getSourceType() { result = "HTTP FORM" }
}
}
module BombsConfig implements DataFlow::ConfigSig {
predicate isSource(DataFlow::Node source) {
source instanceof RemoteFlowSource and
// or
// source instanceof FileAndFormRemoteFlowSource::FastAPI
exists(source.getLocation().getFile().getRelativePath()) and
not source.getLocation().getFile().getRelativePath().matches("%venv%")
}
predicate isSink(DataFlow::Node sink) {
sink instanceof DecompressionBomb::Sink and
exists(sink.getLocation().getFile().getRelativePath()) and
not sink.getLocation().getFile().getRelativePath().matches("%venv%")
}
predicate isAdditionalFlowStep(DataFlow::Node pred, DataFlow::Node succ) {
(
any(DecompressionBomb::AdditionalTaintStep a).isAdditionalTaintStep(pred, succ) or
isAdditionalTaintStepTextIOWrapper(pred, succ)
) and
not succ.getLocation().getFile().inStdlib() and
not succ.getLocation().getFile().getRelativePath().matches("%venv%")
}
}
module Bombs = TaintTracking::Global<BombsConfig>;
import Bombs::PathGraph
from Bombs::PathNode source, Bombs::PathNode sink
where Bombs::flowPath(source, sink)
select sink.getNode(), source, sink, "This uncontrolled file extraction is $@.", source.getNode(),
"depends on this user controlled data"