This class is critical to the creation of least privilege policies.
It uses the SIDs as namespaces. The namespaces follow this format:
{Servicename}{Accesslevel}{Resourcetypename}
So, a resulting statement's SID might look like 'S3ListBucket'
If a condition key is supplied (like s3:RequestJob
), the SID string will be significantly longer.
It will resemble this format:
{Servicename}{Accesslevel}{Resourcetypename}{Conditionkeystring}{Conditiontypestring}{Conditionkeyvalue}
For example: EC2 write actions on the security-group resource, using the following condition map:
"Condition": {
"StringEquals": {"ec2:ResourceTag/Owner": "${aws:username}"}
}
The resulting SID would be:
Ec2WriteSecuritygroupResourcetagownerStringequalsAwsusername
Or, for actions that support wildcard ARNs only, an example could be:
Ec2WriteMultResourcetagownerStringequalsAwsusername
add_action_without_resource_constraint(self, action, sid_namespace='MultMultNone')
This handles the cases where certain actions do not handle resource constraints - either by AWS, or for
flexibility when adding dependent actions.
Parameters:
Name |
Type |
Description |
Default |
action |
|
The single action to add to the SID namespace. For instance, s3:ListAllMyBuckets
|
required |
sid_namespace |
|
MultMultNone by default. Other valid option is "SkipResourceConstraints"
|
'MultMultNone' |
Source code in policy_sentry/writing/sid_group.py
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
|
def add_action_without_resource_constraint(
self, action, sid_namespace="MultMultNone"
):
"""
This handles the cases where certain actions do not handle resource constraints - either by AWS, or for
flexibility when adding dependent actions.
Arguments:
action: The single action to add to the SID namespace. For instance, s3:ListAllMyBuckets
sid_namespace: MultMultNone by default. Other valid option is "SkipResourceConstraints"
"""
if sid_namespace == "SkipResourceConstraints":
temp_sid_dict = {
"arn": ["*"],
"service": "Skip",
"access_level": "ResourceConstraints",
"arn_format": "*",
"actions": [action],
}
elif sid_namespace == "MultMultNone":
temp_sid_dict = {
"arn": ["*"],
"service": "Mult",
"access_level": "Mult",
"arn_format": "*",
"actions": [action],
}
else:
raise Exception(
"Please specify the sid_namespace as either 'SkipResourceConstraints' or "
"'MultMultNone'."
)
if isinstance(action, str):
if sid_namespace in self.sids.keys():
if action not in self.sids[sid_namespace]["actions"]:
self.sids[sid_namespace]["actions"].append(action)
else:
self.sids[sid_namespace] = temp_sid_dict
else:
raise Exception("Please provide the action as a string, not a list.")
return self.sids
|
add_by_arn_and_access_level(self, arn_list, access_level, conditions_block=None)
This adds the user-supplied ARN(s), service prefixes, access levels, and condition keys (if applicable) given
by the user. It derives the list of IAM actions based on the user's requested ARNs and access levels.
Parameters:
Name |
Type |
Description |
Default |
arn_list |
|
Just a list of resource ARNs.
|
required |
access_level |
|
"Read", "List", "Tagging", "Write", or "Permissions management"
|
required |
conditions_block |
|
Optionally, a condition block with one or more conditions
|
None |
Source code in policy_sentry/writing/sid_group.py
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
|
def add_by_arn_and_access_level(
self, arn_list, access_level, conditions_block=None
):
"""
This adds the user-supplied ARN(s), service prefixes, access levels, and condition keys (if applicable) given
by the user. It derives the list of IAM actions based on the user's requested ARNs and access levels.
Arguments:
arn_list: Just a list of resource ARNs.
access_level: "Read", "List", "Tagging", "Write", or "Permissions management"
conditions_block: Optionally, a condition block with one or more conditions
"""
for arn in arn_list:
service_prefix = get_service_from_arn(arn)
service_action_data = get_action_data(service_prefix, "*")
for service_prefix in service_action_data:
for row in service_action_data[service_prefix]:
if (
does_arn_match(arn, row["resource_arn_format"])
and row["access_level"] == access_level
):
raw_arn_format = row["resource_arn_format"]
resource_type_name = get_resource_type_name_with_raw_arn(
raw_arn_format
)
sid_namespace = create_policy_sid_namespace(
service_prefix, access_level, resource_type_name
)
actions = get_actions_with_arn_type_and_access_level(
service_prefix, resource_type_name, access_level
)
# Make supplied actions lowercase
# supplied_actions = [x.lower() for x in actions]
supplied_actions = actions.copy()
dependent_actions = get_dependent_actions(supplied_actions)
# List comprehension to get all dependent actions that are not in the supplied actions.
dependent_actions = [
x for x in dependent_actions if x not in supplied_actions
]
if len(dependent_actions) > 0:
for dep_action in dependent_actions:
self.add_action_without_resource_constraint(dep_action)
# self.add_action_without_resource_constraint(
# str.lower(dep_action)
# )
temp_sid_dict = {
"arn": [arn],
"service": service_prefix,
"access_level": access_level,
"arn_format": raw_arn_format,
"actions": actions,
"conditions": [], # TODO: Add conditions
}
if sid_namespace in self.sids.keys():
# If the ARN already exists there, skip it.
if arn not in self.sids[sid_namespace]["arn"]:
self.sids[sid_namespace]["arn"].append(arn)
# If it did not exist before at all, create it.
else:
self.sids[sid_namespace] = temp_sid_dict
|
add_by_list_of_actions(self, supplied_actions)
Takes a list of actions, queries the database for corresponding arns, adds them to the object.
Parameters:
Name |
Type |
Description |
Default |
supplied_actions |
|
A list of supplied actions
|
required |
Source code in policy_sentry/writing/sid_group.py
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
|
def add_by_list_of_actions(self, supplied_actions):
"""
Takes a list of actions, queries the database for corresponding arns, adds them to the object.
Arguments:
supplied_actions: A list of supplied actions
"""
# actions_list = get_dependent_actions(supplied_actions)
dependent_actions = get_dependent_actions(supplied_actions)
dependent_actions = [x for x in dependent_actions if x not in supplied_actions]
logger.debug("Adding by list of actions")
logger.debug(f"Supplied actions: {str(supplied_actions)}")
logger.debug(f"Dependent actions: {str(dependent_actions)}")
arns_matching_supplied_actions = []
# arns_matching_supplied_actions is a list of dicts.
# It must do this rather than dictionaries because there will be duplicate
# values by nature of how the entries in the IAM database are structured.
# I'll provide the example values here to improve readability.
for action in supplied_actions:
service_name, action_name = action.split(":")
action_data = get_action_data(service_name, action_name)
for row in action_data[service_name]:
if row["resource_arn_format"] not in arns_matching_supplied_actions:
arns_matching_supplied_actions.append(
{
"resource_arn_format": row["resource_arn_format"],
"access_level": row["access_level"],
"action": row["action"],
}
)
# Identify the actions that do not support resource constraints
# If that's the case, add it to the wildcard namespace. Otherwise, don't add it.
actions_without_resource_constraints = []
for item in arns_matching_supplied_actions:
if item["resource_arn_format"] != "*":
self.add_by_arn_and_access_level(
[item["resource_arn_format"]], item["access_level"]
)
else:
actions_without_resource_constraints.append(item["action"])
# If there are any dependent actions, we need to add them without resource constraints.
# Otherwise, we get into issues where the amount of extra SIDs will balloon.
# Also, the user has no way of knowing what those dependent actions are beforehand.
# TODO: This is, in fact, a great opportunity to introduce conditions. But we aren't there yet.
if len(dependent_actions) > 0:
for dep_action in dependent_actions:
self.add_action_without_resource_constraint(dep_action)
# self.add_action_without_resource_constraint(str.lower(dep_action))
# Now, because add_by_arn_and_access_level() adds all actions under an access level, we have to
# remove all actions that do not match the supplied_actions. This is done in-place.
logger.debug(
"Purging actions that do not match the requested actions and dependent actions"
)
logger.debug(f"Supplied actions: {str(supplied_actions)}")
logger.debug(f"Dependent actions: {str(dependent_actions)}")
self.remove_actions_not_matching_these(supplied_actions + dependent_actions)
for action in actions_without_resource_constraints:
logger.debug(
f"Deliberately adding the action {action} without resource constraints"
)
self.add_action_without_resource_constraint(action)
logger.debug(
"Removing actions that are in the wildcard arn (Resources = '*') as well as other statements that have "
"resource constraints "
)
self.remove_actions_duplicated_in_wildcard_arn()
logger.debug("Getting the rendered policy")
rendered_policy = self.get_rendered_policy()
return rendered_policy
|
add_exclude_actions(self, exclude_actions)
To exclude actions from the output
Source code in policy_sentry/writing/sid_group.py
|
def add_exclude_actions(self, exclude_actions):
"""To exclude actions from the output"""
if exclude_actions:
expanded_actions = determine_actions_to_expand(exclude_actions)
self.exclude_actions = [x.lower() for x in expanded_actions]
else:
self.exclude_actions = []
|
add_requested_service_wide(self, service_prefixes, access_level)
When a user requests all wildcard-only actions available under a service at a specific access level
Parameters:
Name |
Type |
Description |
Default |
service_prefixes |
|
A list of service prefixes
|
required |
access_level |
|
The requested access level
|
required |
Source code in policy_sentry/writing/sid_group.py
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
|
def add_requested_service_wide(self, service_prefixes, access_level):
"""
When a user requests all wildcard-only actions available under a service at a specific access level
Arguments:
service_prefixes: A list of service prefixes
access_level: The requested access level
"""
if access_level == "Read":
self.wildcard_only_service_read = service_prefixes
elif access_level == "Write":
self.wildcard_only_service_write = service_prefixes
elif access_level == "List":
self.wildcard_only_service_list = service_prefixes
elif access_level == "Tagging":
self.wildcard_only_service_tagging = service_prefixes
elif access_level == "Permissions management":
self.wildcard_only_service_permissions_management = service_prefixes
|
add_skip_resource_constraints(self, skip_resource_constraints_actions)
To override resource constraint requirements - i.e., instead of restricting s3:PutObject
to a path and
allowing s3:PutObject
to *
resources, put s3:GetObject
here.
Source code in policy_sentry/writing/sid_group.py
99
100
101
102
103
104
105
106
107
108
109
|
def add_skip_resource_constraints(self, skip_resource_constraints_actions):
"""
To override resource constraint requirements - i.e., instead of restricting `s3:PutObject` to a path and
allowing `s3:PutObject` to `*` resources, put `s3:GetObject` here.
"""
if isinstance(skip_resource_constraints_actions, list):
self.skip_resource_constraints.extend(skip_resource_constraints_actions)
elif isinstance(skip_resource_constraints_actions, str):
self.skip_resource_constraints.append([skip_resource_constraints_actions])
else:
raise Exception("Please provide 'skip_resource_constraints' as a list of IAM actions.")
|
add_wildcard_only_actions(self, provided_wildcard_actions)
Given a list of IAM actions, add individual IAM Actions that do not support resource constraints to the MultMultNone SID
Parameters:
Name |
Type |
Description |
Default |
provided_wildcard_actions |
|
list actions provided by the user.
|
required |
Source code in policy_sentry/writing/sid_group.py
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
|
def add_wildcard_only_actions(self, provided_wildcard_actions):
"""
Given a list of IAM actions, add individual IAM Actions that do not support resource constraints to the MultMultNone SID
Arguments:
provided_wildcard_actions: list actions provided by the user.
"""
if isinstance(provided_wildcard_actions, list):
verified_wildcard_actions = remove_actions_that_are_not_wildcard_arn_only(
provided_wildcard_actions
)
if len(verified_wildcard_actions) > 0:
logger.debug(
"Attempting to add the following actions to the policy: %s",
verified_wildcard_actions,
)
self.add_by_list_of_actions(verified_wildcard_actions)
logger.debug(
"Added the following wildcard-only actions to the policy: %s",
verified_wildcard_actions,
)
|
add_wildcard_only_actions_matching_services_and_access_level(self, services, access_level)
Parameters:
Name |
Type |
Description |
Default |
services |
|
A list of AWS services
|
required |
access_level |
|
An access level as it is written in the database, such as 'Read', 'Write', 'List', 'Permissions management', or 'Tagging'
|
required |
Source code in policy_sentry/writing/sid_group.py
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
|
def add_wildcard_only_actions_matching_services_and_access_level(
self, services, access_level
):
"""
Arguments:
services: A list of AWS services
access_level: An access level as it is written in the database, such as 'Read', 'Write', 'List', 'Permissions management', or 'Tagging'
"""
wildcard_only_actions_to_add = []
for service in services:
actions = get_actions_at_access_level_that_support_wildcard_arns_only(
service, access_level
)
wildcard_only_actions_to_add.extend(actions)
self.add_wildcard_only_actions(wildcard_only_actions_to_add)
|
get_rendered_policy(self, minimize=None)
Get the JSON rendered policy
Parameters:
Name |
Type |
Description |
Default |
minimize |
|
Reduce the character count of policies without creating overlap with other action names
|
None |
Returns:
Type |
Description |
Dictionary |
The IAM Policy JSON
|
Source code in policy_sentry/writing/sid_group.py
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
|
def get_rendered_policy(self, minimize=None):
"""
Get the JSON rendered policy
Arguments:
minimize: Reduce the character count of policies without creating overlap with other action names
Returns:
Dictionary: The IAM Policy JSON
"""
statements = []
# Only set the actions to lowercase if minimize is provided
all_actions = get_all_actions(lowercase=True)
# render the policy
for sid in self.sids:
temp_actions = self.sids[sid]["actions"]
if len(temp_actions) == 0:
logger.debug(f"No actions for sid {sid}")
continue
actions = []
if self.exclude_actions:
for temp_action in temp_actions:
if temp_action.lower() in self.exclude_actions:
logger.debug(f"\tExcluded action: {temp_action}")
else:
if temp_action not in actions:
actions.append(temp_action)
else:
actions = temp_actions
# temp_actions.clear()
if minimize is not None and isinstance(minimize, int):
logger.debug("Minimizing statements...")
actions = minimize_statement_actions(
actions, all_actions, minchars=minimize
)
logger.debug(f"Adding statement with SID {sid}")
logger.debug(f"{sid} SID has the actions: {actions}")
logger.debug(f"{sid} SID has the resources: {self.sids[sid]['arn']}")
statements.append(
{
"Sid": sid,
"Effect": "Allow",
"Action": actions,
"Resource": self.sids[sid]["arn"],
}
)
policy = {"Version": POLICY_LANGUAGE_VERSION, "Statement": statements}
return policy
|
get_sid(self, sid)
Get a single group by the SID identifier
Source code in policy_sentry/writing/sid_group.py
|
def get_sid(self, sid):
"""Get a single group by the SID identifier"""
if self.sids[sid]:
return self.sids[sid]
else:
raise Exception(f"No SID with the value of {sid}")
|
get_sid_group(self)
Get the whole SID group as JSON
Source code in policy_sentry/writing/sid_group.py
|
def get_sid_group(self):
"""
Get the whole SID group as JSON
"""
return self.sids
|
list_sids(self)
Get a list of all of the SIDs by their identifiers
Returns:
Type |
Description |
List |
A list of SIDs in the SID group
|
Source code in policy_sentry/writing/sid_group.py
|
def list_sids(self):
"""
Get a list of all of the SIDs by their identifiers
Returns:
List: A list of SIDs in the SID group
"""
return self.sids.keys()
|
process_template(self, cfg, minimize=None)
Process the Policy Sentry template as a dict. This auto-detects whether or not the file is in CRUD mode or
Actions mode.
Parameters:
Name |
Type |
Description |
Default |
cfg |
|
The loaded YAML as a dict. Must follow Policy Sentry dictated format.
|
required |
minimize |
|
Minimize the resulting statement with safe usage of wildcards to reduce policy length. Set this to the character length you want - for example, 0, or 4. Defaults to none.
|
None |
Returns:
Type |
Description |
Dictionary |
The rendered IAM JSON Policy
|
Source code in policy_sentry/writing/sid_group.py
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
|
def process_template(self, cfg, minimize=None):
"""
Process the Policy Sentry template as a dict. This auto-detects whether or not the file is in CRUD mode or
Actions mode.
Arguments:
cfg: The loaded YAML as a dict. Must follow Policy Sentry dictated format.
minimize: Minimize the resulting statement with *safe* usage of wildcards to reduce policy length. Set this to the character length you want - for example, 0, or 4. Defaults to none.
Returns:
Dictionary: The rendered IAM JSON Policy
"""
if "mode" in cfg.keys():
if cfg["mode"] == "crud":
logger.debug("CRUD mode selected")
check_crud_schema(cfg)
if "exclude-actions" in cfg:
if cfg["exclude-actions"]:
if cfg["exclude-actions"][0] != "":
self.add_exclude_actions(cfg["exclude-actions"])
if "wildcard-only" in cfg.keys():
if "single-actions" in cfg["wildcard-only"]:
if cfg["wildcard-only"]["single-actions"]:
if cfg["wildcard-only"]["single-actions"][0] != "":
provided_wildcard_actions = cfg["wildcard-only"][
"single-actions"
]
logger.debug(
f"Requested wildcard-only actions: {str(provided_wildcard_actions)}"
)
self.wildcard_only_single_actions = provided_wildcard_actions
if "service-read" in cfg["wildcard-only"]:
if cfg["wildcard-only"]["service-read"]:
if cfg["wildcard-only"]["service-read"][0] != "":
service_read = cfg["wildcard-only"]["service-read"]
logger.debug(
f"Requested wildcard-only actions: {str(service_read)}"
)
self.wildcard_only_service_read = service_read
if "service-write" in cfg["wildcard-only"]:
if cfg["wildcard-only"]["service-write"]:
if cfg["wildcard-only"]["service-write"][0] != "":
service_write = cfg["wildcard-only"]["service-write"]
logger.debug(
f"Requested wildcard-only actions: {str(service_write)}"
)
self.wildcard_only_service_write = service_write
if "service-list" in cfg["wildcard-only"]:
if cfg["wildcard-only"]["service-list"]:
if cfg["wildcard-only"]["service-list"][0] != "":
service_list = cfg["wildcard-only"]["service-list"]
logger.debug(
f"Requested wildcard-only actions: {str(service_list)}"
)
self.wildcard_only_service_list = service_list
if "service-tagging" in cfg["wildcard-only"]:
if cfg["wildcard-only"]["service-tagging"]:
if cfg["wildcard-only"]["service-tagging"][0] != "":
service_tagging = cfg["wildcard-only"][
"service-tagging"
]
logger.debug(
f"Requested wildcard-only actions: {str(service_tagging)}"
)
self.wildcard_only_service_tagging = service_tagging
if "service-permissions-management" in cfg["wildcard-only"]:
if cfg["wildcard-only"]["service-permissions-management"]:
if (
cfg["wildcard-only"]["service-permissions-management"][
0
]
!= ""
):
service_permissions_management = cfg["wildcard-only"][
"service-permissions-management"
]
logger.debug(
f"Requested wildcard-only actions: {str(service_permissions_management)}"
)
self.wildcard_only_service_permissions_management = service_permissions_management
self.process_wildcard_only_actions()
if "read" in cfg.keys():
if cfg["read"] is not None and cfg["read"][0] != "":
logger.debug(f"Requested access to arns: {str(cfg['read'])}")
self.add_by_arn_and_access_level(cfg["read"], "Read")
if "write" in cfg.keys():
if cfg["write"] is not None and cfg["write"][0] != "":
logger.debug(f"Requested access to arns: {str(cfg['write'])}")
self.add_by_arn_and_access_level(cfg["write"], "Write")
if "list" in cfg.keys():
if cfg["list"] is not None and cfg["list"][0] != "":
logger.debug(f"Requested access to arns: {str(cfg['list'])}")
self.add_by_arn_and_access_level(cfg["list"], "List")
if "permissions-management" in cfg.keys():
if (
cfg["permissions-management"] is not None
and cfg["permissions-management"][0] != ""
):
logger.debug(
f"Requested access to arns: {str(cfg['permissions-management'])}"
)
self.add_by_arn_and_access_level(
cfg["permissions-management"], "Permissions management",
)
if "tagging" in cfg.keys():
if cfg["tagging"] is not None and cfg["tagging"][0] != "":
logger.debug(f"Requested access to arns: {str(cfg['tagging'])}")
self.add_by_arn_and_access_level(cfg["tagging"], "Tagging")
if "skip-resource-constraints" in cfg.keys():
if cfg["skip-resource-constraints"]:
if cfg["skip-resource-constraints"][0] != "":
logger.debug(
f"Requested override: the actions {str(cfg['skip-resource-constraints'])} will "
f"skip resource constraints."
)
self.add_skip_resource_constraints(cfg["skip-resource-constraints"])
for skip_resource_constraints_action in self.skip_resource_constraints:
self.add_action_without_resource_constraint(
skip_resource_constraints_action, "SkipResourceConstraints"
)
if cfg["mode"] == "actions":
check_actions_schema(cfg)
if "actions" in cfg.keys():
if cfg["actions"] is not None and cfg["actions"][0] != "":
self.add_by_list_of_actions(cfg["actions"])
rendered_policy = self.get_rendered_policy(minimize)
return rendered_policy
|
process_wildcard_only_actions(self)
After (1) the list of wildcard-only single actions have been added and (2) the list of wildcard-only service-wide actions have been added, process them and store them under the proper SID.
Source code in policy_sentry/writing/sid_group.py
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
|
def process_wildcard_only_actions(self):
"""
After (1) the list of wildcard-only single actions have been added and (2) the list of wildcard-only service-wide actions have been added, process them and store them under the proper SID.
"""
provided_wildcard_actions = (
self.wildcard_only_single_actions
+ get_wildcard_only_actions_matching_services_and_access_level(self.wildcard_only_service_read, "Read")
+ get_wildcard_only_actions_matching_services_and_access_level(self.wildcard_only_service_list, "List")
+ get_wildcard_only_actions_matching_services_and_access_level(self.wildcard_only_service_write, "Write")
+ get_wildcard_only_actions_matching_services_and_access_level(self.wildcard_only_service_tagging, "Tagging")
+ get_wildcard_only_actions_matching_services_and_access_level(self.wildcard_only_service_permissions_management, "Permissions management")
)
self.add_wildcard_only_actions(
provided_wildcard_actions
)
|
remove_actions_duplicated_in_wildcard_arn(self)
Removes actions from the object that are in a resource-specific ARN, as well as the *
resource.
For example, if ssm:GetParameter
is restricted to a specific parameter path, as well as *
, then we want to
remove the *
option to force least privilege.
Source code in policy_sentry/writing/sid_group.py
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
|
def remove_actions_duplicated_in_wildcard_arn(self):
"""
Removes actions from the object that are in a resource-specific ARN, as well as the `*` resource.
For example, if `ssm:GetParameter` is restricted to a specific parameter path, as well as `*`, then we want to
remove the `*` option to force least privilege.
"""
actions_under_wildcard_resources = []
actions_under_wildcard_resources_to_nuke = []
# Build a temporary list. Contains actions in MultMultNone SID (where resources = "*")
for sid in self.sids:
if self.sids[sid]["arn_format"] == "*":
actions_under_wildcard_resources.extend(self.sids[sid]["actions"])
# If the actions under the MultMultNone SID exist under other SIDs
if len(actions_under_wildcard_resources) > 0:
for sid in self.sids:
if "*" not in self.sids[sid]["arn_format"]:
for action in actions_under_wildcard_resources:
if action in self.sids[sid]["actions"]:
if action not in self.skip_resource_constraints:
# add it to a list of actions to nuke when they are under other SIDs
actions_under_wildcard_resources_to_nuke.append(action)
# If there are actions that we need to remove from SIDs outside of MultMultNone SID
if len(actions_under_wildcard_resources_to_nuke) > 0:
for sid in self.sids:
if "*" in self.sids[sid]["arn_format"]:
for action in actions_under_wildcard_resources_to_nuke:
try:
self.sids[sid]["actions"].remove(str(action))
except BaseException: # pylint: disable=broad-except
logger.debug("Removal not successful")
|
remove_actions_not_matching_these(self, actions_to_keep)
Parameters:
Name |
Type |
Description |
Default |
actions_to_keep |
|
A list of actions to leave in the policy. All actions not in this list are removed.
|
required |
Source code in policy_sentry/writing/sid_group.py
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
|
def remove_actions_not_matching_these(self, actions_to_keep):
"""
Arguments:
actions_to_keep: A list of actions to leave in the policy. All actions not in this list are removed.
"""
actions_to_keep = get_lowercase_action_list(actions_to_keep)
actions_deleted = []
for sid in self.sids:
placeholder_actions_list = []
for action in self.sids[sid]["actions"]:
# if the action is not in the list of selected actions, don't copy it to the placeholder list
if action.lower() in actions_to_keep:
placeholder_actions_list.append(action)
elif action.lower() not in actions_to_keep:
logger.debug("%s not found in list of actions to keep: %s", action.lower(), actions_to_keep)
actions_deleted.append(action)
# Clear the list and then extend it to include the updated actions only
self.sids[sid]["actions"].clear()
self.sids[sid]["actions"].extend(placeholder_actions_list.copy())
# Highlight the actions that you remove
logger.debug("Actions deleted: %s", str(actions_deleted))
# Now that we've removed a bunch of actions, if there are SID groups without any actions,
# remove them so we don't get SIDs with empty action lists
self.remove_sids_with_empty_action_lists()
|
remove_sids_with_empty_action_lists(self)
Now that we've removed a bunch of actions, if there are SID groups without any actions, remove them so we don't get SIDs with empty action lists
Source code in policy_sentry/writing/sid_group.py
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
|
def remove_sids_with_empty_action_lists(self):
"""
Now that we've removed a bunch of actions, if there are SID groups without any actions, remove them so we don't get SIDs with empty action lists
"""
sid_namespaces_to_delete = []
for sid in self.sids:
if len(self.sids[sid]["actions"]) > 0:
pass
# If the size is zero, add it to the indexes_to_delete list.
else:
sid_namespaces_to_delete.append(sid)
# Loop through sid_namespaces_to_delete in reverse order (so we delete index
# 10 before index 8, for example)
if len(sid_namespaces_to_delete) > 0:
for i in reversed(range(len(sid_namespaces_to_delete))):
del self.sids[sid_namespaces_to_delete[i]]
|