-
Notifications
You must be signed in to change notification settings - Fork 2k
Expand file tree
/
Copy pathUnsafeUnpack.ql
More file actions
136 lines (130 loc) · 4.79 KB
/
UnsafeUnpack.ql
File metadata and controls
136 lines (130 loc) · 4.79 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
/**
* @name Arbitrary file write during a tarball extraction from a user controlled source
* @description Extracting files from a potentially malicious tarball using `shutil.unpack_archive()` without validating
* that the destination file path is within the destination directory can cause files outside
* the destination directory to be overwritten. More precisely, if the tarball comes from a user controlled
* location either a remote one or cli argument.
* @kind path-problem
* @id py/unsafe-unpacking
* @problem.severity error
* @security-severity 7.5
* @precision high
* @tags security
* external/cwe/cwe-022
*/
import python
import semmle.python.Concepts
import semmle.python.dataflow.new.internal.DataFlowPublic
import semmle.python.ApiGraphs
import DataFlow::PathGraph
import semmle.python.dataflow.new.TaintTracking
import semmle.python.frameworks.Stdlib
class UnsafeUnpackingConfig extends TaintTracking::Configuration {
UnsafeUnpackingConfig() { this = "UnsafeUnpackingConfig" }
override predicate isSource(DataFlow::Node source) {
// A source coming from a remote location
exists(Http::Client::Request request | source = request)
or
//A source coming from a CLI argparse module
exists(Node o, API::Node ap, MethodCallNode args |
ap = API::moduleImport("argparse").getMember("ArgumentParser").getACall().getReturn() and
args = ap.getMember("parse_args").getACall() and
args.flowsTo(o) and
source.(AttrRead).accesses(o, any(string s))
)
or
// A source catching a S3 filename download
exists(API::Node s3 | source = s3.getMember("download_file").getACall().getArg(2))
or
// A source download a file using wget
exists(MethodCallNode mcn |
mcn = API::moduleImport("wget").getMember("download").getACall() and source = mcn.getArg(1)
)
or
// catch the uploaded files as a source
exists(Subscript s, Attribute at |
at = s.getObject() and at.getAttr() = "FILES" and source.asExpr() = s
)
or
exists(Node obj, AttrRead ar |
ar.getAMethodCall("get").flowsTo(source) and
ar.accesses(obj, "FILES")
)
or
exists(Node obj, AttrRead ar |
ar.getAMethodCall("getlist").flowsTo(source) and
ar.accesses(obj, "FILES")
)
}
override predicate isSink(DataFlow::Node sink) {
// A sink capturing method calls to `unpack_archive`.
sink = API::moduleImport("shutil").getMember("unpack_archive").getACall().getArg(0)
}
override predicate isAdditionalTaintStep(DataFlow::Node nodeFrom, DataFlow::Node nodeTo) {
// Writing the response data to the archive
exists(Stdlib::FileLikeObject::InstanceSource is, Node f, MethodCallNode mc |
is.flowsTo(f) and
mc.getMethodName() = "write" and
f = mc.getObject() and
nodeFrom = mc.getArg(0) and
nodeTo = is.(CallCfgNode).getArg(0)
)
or
// Copying the response data to the archive
exists(Stdlib::FileLikeObject::InstanceSource is, Node f, MethodCallNode mc |
is.flowsTo(f) and
mc = API::moduleImport("shutil").getMember("copyfileobj").getACall() and
f = mc.getArg(1) and
nodeFrom = mc.getArg(0) and
nodeTo = is.(CallCfgNode).getArg(0)
)
or
// Reading the response
exists(MethodCallNode mc |
nodeFrom = mc.getObject() and
mc.getMethodName() = "read" and
mc.flowsTo(nodeTo)
)
or
// Accessing the name or raw content
exists(AttrRead ar | ar.accesses(nodeFrom, ["name", "raw"]) and ar.flowsTo(nodeTo))
or
//Use of join of filename
exists(API::CallNode mcn |
mcn = API::moduleImport("os").getMember("path").getMember("join").getACall() and
nodeFrom = mcn.getArg(1) and
mcn.flowsTo(nodeTo)
)
or
// Read by chunks
exists(MethodCallNode mc |
nodeFrom = mc.getObject() and mc.getMethodName() = "chunks" and mc.flowsTo(nodeTo)
)
or
// Considering the use of closing()
exists(API::CallNode closing |
closing = API::moduleImport("contextlib").getMember("closing").getACall() and
closing.flowsTo(nodeTo) and
nodeFrom = closing.getArg(0)
)
or
// Considering the use of "fs"
exists(API::CallNode fs, MethodCallNode mcn |
fs =
API::moduleImport("django")
.getMember("core")
.getMember("files")
.getMember("storage")
.getMember("FileSystemStorage")
.getACall() and
fs.flowsTo(mcn.getObject()) and
mcn.getMethodName() = ["save", "path"] and
nodeFrom = mcn.getArg(0) and
nodeTo = mcn
)
}
}
from UnsafeUnpackingConfig config, DataFlow::PathNode source, DataFlow::PathNode sink
where config.hasFlowPath(source, sink)
select sink.getNode(), source, sink,
"Unsafe extraction from a malicious tarball retrieved from a remote location."