CloudGoat - Fixing vulnerable_lambda scenario
This is a follow-up to my recent blog post CloudGoat – vulnerable_lambda with Pacu.
In my recent exploration of cloud vulnerabilities with the help of Pacu and CloudGoat by Rhino Security Labs, we were able to demonstrate how a combination of overlooked permissions and an SQL injection vulnerability in a Lambda function could lead to unauthorized privilege escalation.
Today's discussion shifts from exploits to solutions focusing on how to fix these weak points effectively. Can we fix it?
Yes, we can! We will look into specific remedial actions that could seal the previously exploited gaps, ensuring our cloud fortifications are as steadfast as they can be.
So, put on your overalls, and off we go!
Fixing the SQL Injection Vulnerability
We are going to start with the root cause, SQL injection vulnerability. The fix is pretty easy actually, the solution is called prepared SQL statements.
Here's what the original code looked like:
statement = f"select policy_name from policies where policy_name='{policy}' and public='True'"
for row in db.query(statement):
As you can see it is dirty! Why? Because it is not sanitized. The variable policy
was directly interpolated into the SQL query string. Don't do that! Never! And if you see someone doing it, you've got my permission to slap them.
An attacker could manipulate the value of policy
to alter the structure of the SQL query and execute malicious commands. Check out the last post for detailed steps.
Luckily the fix is super easy:
prepared_statement = "select policy_name from policies where policy_name=? and public='True'"
for row in db.query(prepared_statement, [policy]):
This is now a prepared SQL statement. It ensures that the policy
variable is never directly incorporated into the SQL query, thereby eliminating the possibility of SQL injection. Easy right?
The full code after making the changes would look like this:
# main.py
import boto3
from sqlite_utils import Database
db = Database("my_database.db")
iam_client = boto3.client('iam')
def handler(event, context):
target_policys = event['policy_names']
user_name = event['user_name']
print(f"target policys are : {target_policys}")
for policy in target_policys:
statement_returns_valid_policy = False
prepared_statement = "select policy_name from policies where policy_name=? and public='True'"
for row in db.query(prepared_statement, [policy]):
statement_returns_valid_policy = True
print(f"applying {row['policy_name']} to {user_name}")
response = iam_client.attach_user_policy(
UserName=user_name,
PolicyArn=f"arn:aws:iam::aws:policy/{row['policy_name']}"
)
print("result: " + str(response['ResponseMetadata']['HTTPStatusCode']))
if not statement_returns_valid_policy:
invalid_policy_statement = f"{policy} is not an approved policy, please only choose from approved " \
f"policies and don't cheat. :) "
print(invalid_policy_statement)
return invalid_policy_statement
return "All managed policies were applied as expected."
Do you want to see more?
Ok, another solution is to use Object-Relational Mapping (ORM), e.g. SQLAlchemy. They provide a higher level of abstraction over SQL databases, which means you can interact with the database in object-oriented syntax rather than raw SQL queries.
Here's how the code would look like if we were to rewrite it using SQLAlchemy:
# main.py
import boto3
import sqlalchemy as db
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy.orm import Session
Base = declarative_base()
class Policy(Base):
__tablename__ = "policies"
policy_name = db.Column(db.String, primary_key=True)
public = db.Column(db.Boolean)
def __repr__(self):
return "<Policy(policy_name='%s', public='%s')>" % (
self.policy_name,
self.public,
)
engine = db.create_engine("sqlite:///my_database.db")
session = Session(engine)
iam_client = boto3.client("iam")
def handler(event, context):
target_policys = event["policy_names"]
user_name = event["user_name"]
print(f"target policys are : {target_policys}")
for policy in target_policys:
statement_returns_valid_policy = False
rows = db.select(Policy).where(
Policy.policy_name == policy and Policy.public == True
)
for row in session.scalars(rows):
statement_returns_valid_policy = True
print(f"applying {row.policy_name} to {user_name}")
response = iam_client.attach_user_policy(
UserName=user_name,
PolicyArn=f"arn:aws:iam::aws:policy/{row.policy_name}",
)
print("result: " + str(response["ResponseMetadata"]["HTTPStatusCode"]))
if not statement_returns_valid_policy:
invalid_policy_statement = (
f"{policy} is not an approved policy, please only choose from approved "
f"policies and don't cheat. :) "
)
print(invalid_policy_statement)
return invalid_policy_statement
return "All managed policies were applied as expected."
Applying a Least Privilege Policy to the Bilbo User
Now that we have fixed the SQL injection vulnerability, would you like to secure this account even more?
Alright, Alright. Somebody is really into securing his cloud environment.
Time for the least privilege. A least privilege policy grants a user only the permissions they need to perform their job, and nothing more.
I don't know much about the application here, but I'm certain the 'bilbo' user does not need AdministratorAccess
, ever. Hell, I'm going to go even one step further and only allow that user to assume a role and nothing more.
{
"Version": "2012-10-17",
"Statement": [
{
"Effect": "Deny",
"NotAction": [
"sts:AssumeRole"
],
"Resource": "*",
"Condition": {
"ArnNotLike": {
"aws:SourceArn": "arn:aws:iam::<account_id>:role/cg-lambda-invoker*"
}
}
}
]
}
With this policy attached the 'bilbo' user has an explicit deny on everything! The only exception is an assume-role action on the specific role. Least privilege. So, even when that user gets administrator access, he still would not be able to access anything. Why? Because an explicit deny in any of these policies overrides the allow.
It is like when you ask your Dad for something and he allows it, but then Mom says no, it is a no.
Let's verify the theory. First, we are going to list the attached policies user again. Still, AdministratorAccess policy is attached, perfect!
Pacu (vulnerable_lambda:bilbo) > aws iam list-attached-user-policies --user-name cg-bilbo-vulnerable_lambda_cgidsr4kzoh3iq
{
"AttachedPolicies": [
{
"PolicyName": "AdministratorAccess",
"PolicyArn": "arn:aws:iam::aws:policy/AdministratorAccess"
}
]
}
Now let's check if we can still access the secret:
Pacu (vulnerable_lambda:bilbo) > aws secretsmanager list-secrets
An error occurred (AccessDeniedException) when calling the ListSecrets operation: User: arn:aws:iam::<account_id>:user/cg-bilbo-vulnerable_lambda_cgidsr4kzoh3iq is not authorized to perform: secretsmanager:ListSecrets with an explicit deny in an identity-based policy
As you can see, he is no longer allowed to even list the secrets. Which means the policy is how it should be: Least privilege!
Conclusion
Addressing the "vulnerable_lambda" scenario in CloudGoat required us to identify the vulnerable code, eliminate the SQL injection vulnerability, and apply a least privilege policy to the 'bilbo' user.
By taking these steps, we have significantly improved the security not only for the application but also for the cloud environment.
Looking ahead, I plan to continue exploring more scenarios in CloudGoat. Each scenario presents a unique set of challenges and opportunities to learn about different types of vulnerabilities and how to mitigate them. I encourage you to join us on this journey of learning and improvement.
Don't hesitate to share your thoughts, experiences, and solutions to this scenario.