-
Notifications
You must be signed in to change notification settings - Fork 2k
Expand file tree
/
Copy pathUnsafeUnpackQuery.qll
More file actions
138 lines (133 loc) · 4.5 KB
/
UnsafeUnpackQuery.qll
File metadata and controls
138 lines (133 loc) · 4.5 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
/**
* Provides a taint-tracking configuration for detecting "UnsafeUnpacking" vulnerabilities.
*/
import python
import semmle.python.Concepts
import semmle.python.dataflow.new.internal.DataFlowPublic
import semmle.python.ApiGraphs
import semmle.python.dataflow.new.TaintTracking
import semmle.python.frameworks.Stdlib
import semmle.python.dataflow.new.RemoteFlowSources
class UnsafeUnpackingConfig extends TaintTracking::Configuration {
UnsafeUnpackingConfig() { this = "UnsafeUnpackingConfig" }
override predicate isSource(DataFlow::Node source) {
// A source coming from a remote location
source instanceof RemoteFlowSource
or
// A source coming from a CLI argparse module
// see argparse: https://docs.python.org/3/library/argparse.html
exists(MethodCallNode args |
args = source.(AttrRead).getObject().getALocalSource() and
args =
API::moduleImport("argparse")
.getMember("ArgumentParser")
.getACall()
.getReturn()
.getMember("parse_args")
.getACall()
)
or
// A source catching an S3 filename download
// see boto3: https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/s3.html#S3.Client.download_file
exists(MethodCallNode mcn, Node s3, Node bc |
bc = API::moduleImport("boto3").getMember("client").getACall() and
bc = s3.getALocalSource() and
mcn.calls(s3, "download_file") and
source = mcn.getArg(2)
)
or
// A source download a file using wget
// see wget: https://pypi.org/project/wget/
exists(API::CallNode mcn |
mcn = API::moduleImport("wget").getMember("download").getACall() and
(
source = mcn.getArg(1)
or
source = mcn.getReturn().asSource() and not exists(Node arg | arg = mcn.getArg(1))
)
)
or
// catch the Django uploaded files as a source
// see HttpRequest.FILES: https://docs.djangoproject.com/en/4.1/ref/request-response/#django.http.HttpRequest.FILES
source.(AttrRead).getAttributeName() = "FILES"
}
override predicate isSink(DataFlow::Node sink) {
// A sink capturing method calls to `unpack_archive`.
sink = API::moduleImport("shutil").getMember("unpack_archive").getACall().getArg(0)
}
override predicate isAdditionalTaintStep(DataFlow::Node nodeFrom, DataFlow::Node nodeTo) {
// Reading the response
exists(MethodCallNode mc |
nodeFrom = mc.getObject() and
mc.getMethodName() = "read" and
nodeTo = mc
)
or
// Open for access
exists(MethodCallNode cn |
nodeTo = cn.getObject() and
cn.getMethodName() = "open" and
cn.flowsTo(nodeFrom)
)
or
// Write for access
exists(MethodCallNode cn |
nodeFrom = cn.getObject() and
cn.getMethodName() = "write" and
nodeTo = cn.getArg(0)
)
or
// Retrieve Django uploaded files
// see HttpRequest.FILES.getlist(): https://docs.djangoproject.com/en/4.1/ref/request-response/#django.http.QueryDict.getlist
exists(MethodCallNode mc |
nodeFrom = mc.getObject() and
mc.getMethodName() = ["getlist", "get"] and
nodeTo = mc
)
or
// Accessing the name or raw content
exists(AttrRead ar | ar.accesses(nodeFrom, ["name", "raw"]) and ar.flowsTo(nodeTo))
or
// Considering the use of "fs"
exists(API::CallNode fs, MethodCallNode mcn |
fs =
API::moduleImport("django")
.getMember("core")
.getMember("files")
.getMember("storage")
.getMember("FileSystemStorage")
.getACall() and
fs.flowsTo(mcn.getObject()) and
mcn.getMethodName() = ["save", "path"] and
nodeFrom = mcn.getArg(0) and
nodeTo = mcn
)
or
//Use of join of filename
exists(API::CallNode mcn |
mcn = API::moduleImport("os").getMember("path").getMember("join").getACall() and
nodeFrom = mcn.getArg(1) and
mcn.flowsTo(nodeTo)
)
or
// Read by chunks
exists(MethodCallNode mc |
nodeFrom = mc.getObject() and mc.getMethodName() = "chunks" and mc.flowsTo(nodeTo)
)
or
// Write access
exists(MethodCallNode cn |
nodeTo = cn.getObject() and
cn.getMethodName() = "write" and
nodeFrom = cn.getArg(0)
)
or
// Writing the response data to the archive
exists(Stdlib::FileLikeObject::InstanceSource is, Node f, MethodCallNode mc |
is.flowsTo(f) and
mc.calls(f, "write") and
nodeFrom = mc.getArg(0) and
nodeTo = is.(CallCfgNode).getArg(0)
)
}
}