Files
aws-localstack/deploy/quanto/api/app.py
T

329 lines
10 KiB
Python

import json
import os
from datetime import datetime, timezone
import boto3
from botocore.config import Config
from flask import Flask, jsonify
app = Flask(__name__)
LOCALSTACK_ENDPOINT = os.environ.get("LOCALSTACK_ENDPOINT", "https://localstack.paulononato.com.br")
AWS_REGION = os.environ.get("AWS_DEFAULT_REGION", "us-east-1")
ENVIRONMENT = os.environ.get("QUANTUM_ENV", "dev")
PROJECT_NAME = os.environ.get("PROJECT_NAME", "quantum")
NAME_PREFIX = f"{PROJECT_NAME}-{ENVIRONMENT}"
BUCKET_NAME = os.environ.get("QUANTUM_BUCKET_NAME", f"{NAME_PREFIX}-artifacts")
QUEUE_NAME = os.environ.get("QUANTUM_QUEUE_NAME", f"{NAME_PREFIX}-events")
DLQ_NAME = os.environ.get("QUANTUM_DLQ_NAME", f"{NAME_PREFIX}-events-dlq")
LAMBDA_NAME = os.environ.get("QUANTUM_LAMBDA_NAME", f"{NAME_PREFIX}-processor")
ROLE_NAME = os.environ.get("QUANTUM_ROLE_NAME", f"{NAME_PREFIX}-lambda-role")
SECRET_NAME = os.environ.get("QUANTUM_SECRET_NAME", f"{NAME_PREFIX}/app")
LOG_GROUP_NAME = os.environ.get("QUANTUM_LOG_GROUP_NAME", f"/aws/lambda/{LAMBDA_NAME}")
AWS_CONFIG = Config(
region_name=AWS_REGION,
retries={"max_attempts": 2, "mode": "standard"},
)
def client(service_name):
return boto3.client(
service_name,
endpoint_url=LOCALSTACK_ENDPOINT,
region_name=AWS_REGION,
aws_access_key_id=os.environ.get("AWS_ACCESS_KEY_ID", "test"),
aws_secret_access_key=os.environ.get("AWS_SECRET_ACCESS_KEY", "test"),
config=AWS_CONFIG,
)
def ok(name, evidence):
return {"name": name, "status": "ok", "evidence": evidence}
def failed(name, error):
return {"name": name, "status": "error", "error": str(error)}
def queue_url(name):
return client("sqs").get_queue_url(QueueName=name)["QueueUrl"]
@app.get("/api/health")
def health():
checks = []
try:
identity = client("sts").get_caller_identity()
checks.append(ok("sts", {"account": identity.get("Account"), "arn": identity.get("Arn")}))
except Exception as exc:
checks.append(failed("sts", exc))
try:
buckets = [bucket["Name"] for bucket in client("s3").list_buckets().get("Buckets", [])]
checks.append(ok("s3", {"bucket": BUCKET_NAME, "exists": BUCKET_NAME in buckets}))
except Exception as exc:
checks.append(failed("s3", exc))
try:
queues = client("sqs").list_queues().get("QueueUrls", [])
checks.append(
ok(
"sqs",
{
"queue": QUEUE_NAME,
"dlq": DLQ_NAME,
"queueFound": any(url.endswith(f"/{QUEUE_NAME}") for url in queues),
"dlqFound": any(url.endswith(f"/{DLQ_NAME}") for url in queues),
},
)
)
except Exception as exc:
checks.append(failed("sqs", exc))
try:
function_names = [
function["FunctionName"]
for function in client("lambda").list_functions().get("Functions", [])
]
checks.append(ok("lambda", {"function": LAMBDA_NAME, "exists": LAMBDA_NAME in function_names}))
except Exception as exc:
checks.append(failed("lambda", exc))
try:
role = client("iam").get_role(RoleName=ROLE_NAME)["Role"]
checks.append(ok("iam", {"role": role.get("RoleName"), "arn": role.get("Arn")}))
except Exception as exc:
checks.append(failed("iam", exc))
try:
secret = client("secretsmanager").describe_secret(SecretId=SECRET_NAME)
checks.append(ok("secretsmanager", {"secret": secret.get("Name"), "arn": secret.get("ARN")}))
except Exception as exc:
checks.append(failed("secretsmanager", exc))
try:
groups = client("logs").describe_log_groups(logGroupNamePrefix=LOG_GROUP_NAME).get("logGroups", [])
checks.append(ok("cloudwatch-logs", {"logGroup": LOG_GROUP_NAME, "exists": len(groups) > 0}))
except Exception as exc:
checks.append(failed("cloudwatch-logs", exc))
return jsonify(
{
"application": "Quantum",
"environment": ENVIRONMENT,
"localstackEndpoint": LOCALSTACK_ENDPOINT,
"generatedAt": datetime.now(timezone.utc).isoformat(),
"checks": checks,
}
)
@app.post("/api/s3/marker")
def create_s3_marker():
key = f"frontend-evidence/{datetime.now(timezone.utc).strftime('%Y%m%dT%H%M%SZ')}.json"
body = {
"application": "Quantum",
"environment": ENVIRONMENT,
"source": "quanto-api",
"createdAt": datetime.now(timezone.utc).isoformat(),
}
client("s3").put_object(
Bucket=BUCKET_NAME,
Key=key,
Body=json.dumps(body, indent=2).encode("utf-8"),
ContentType="application/json",
)
return jsonify(ok("s3-marker", {"bucket": BUCKET_NAME, "key": key}))
@app.post("/api/sqs/message")
def send_sqs_message():
response = client("sqs").send_message(
QueueUrl=queue_url(QUEUE_NAME),
MessageBody=json.dumps(
{
"event": "quantum.frontend.evidence",
"environment": ENVIRONMENT,
"createdAt": datetime.now(timezone.utc).isoformat(),
}
),
)
return jsonify(ok("sqs-message", {"queue": QUEUE_NAME, "messageId": response.get("MessageId")}))
@app.get("/api/s3/objects")
def list_s3_objects():
response = client("s3").list_objects_v2(Bucket=BUCKET_NAME)
objects = [
{
"key": item.get("Key"),
"size": item.get("Size"),
"lastModified": item.get("LastModified").isoformat() if item.get("LastModified") else None,
"etag": item.get("ETag"),
}
for item in response.get("Contents", [])
]
return jsonify(ok("s3-objects", {"bucket": BUCKET_NAME, "objectCount": len(objects), "objects": objects}))
@app.get("/api/sqs/details")
def sqs_details():
sqs = client("sqs")
main_url = queue_url(QUEUE_NAME)
dlq_url = queue_url(DLQ_NAME)
attributes = [
"ApproximateNumberOfMessages",
"ApproximateNumberOfMessagesNotVisible",
"ApproximateNumberOfMessagesDelayed",
"CreatedTimestamp",
"LastModifiedTimestamp",
"VisibilityTimeout",
]
main_attrs = sqs.get_queue_attributes(QueueUrl=main_url, AttributeNames=attributes)["Attributes"]
dlq_attrs = sqs.get_queue_attributes(QueueUrl=dlq_url, AttributeNames=attributes)["Attributes"]
messages = sqs.receive_message(
QueueUrl=main_url,
MaxNumberOfMessages=5,
VisibilityTimeout=0,
WaitTimeSeconds=0,
AttributeNames=["All"],
).get("Messages", [])
visible_messages = [
{
"messageId": message.get("MessageId"),
"body": message.get("Body"),
"attributes": message.get("Attributes", {}),
}
for message in messages
]
return jsonify(
ok(
"sqs-details",
{
"queue": {"name": QUEUE_NAME, "url": main_url, "attributes": main_attrs},
"dlq": {"name": DLQ_NAME, "url": dlq_url, "attributes": dlq_attrs},
"visibleMessagesSample": visible_messages,
},
)
)
@app.get("/api/lambda/details")
def lambda_details():
lambda_client = client("lambda")
function = lambda_client.get_function(FunctionName=LAMBDA_NAME)
configuration = function.get("Configuration", {})
mappings = lambda_client.list_event_source_mappings(FunctionName=LAMBDA_NAME).get("EventSourceMappings", [])
return jsonify(
ok(
"lambda-details",
{
"functionName": configuration.get("FunctionName"),
"runtime": configuration.get("Runtime"),
"handler": configuration.get("Handler"),
"state": configuration.get("State"),
"lastModified": configuration.get("LastModified"),
"role": configuration.get("Role"),
"environment": configuration.get("Environment", {}).get("Variables", {}),
"eventSourceMappings": [
{
"uuid": mapping.get("UUID"),
"state": mapping.get("State"),
"batchSize": mapping.get("BatchSize"),
"eventSourceArn": mapping.get("EventSourceArn"),
}
for mapping in mappings
],
},
)
)
@app.get("/api/iam/details")
def iam_details():
iam = client("iam")
role = iam.get_role(RoleName=ROLE_NAME)["Role"]
policies = iam.list_attached_role_policies(RoleName=ROLE_NAME).get("AttachedPolicies", [])
return jsonify(ok("iam-details", {"role": role, "attachedPolicies": policies}))
@app.get("/api/secrets/details")
def secrets_details():
secrets = client("secretsmanager")
description = secrets.describe_secret(SecretId=SECRET_NAME)
value = secrets.get_secret_value(SecretId=SECRET_NAME)
return jsonify(
ok(
"secrets-details",
{
"name": description.get("Name"),
"arn": description.get("ARN"),
"versionIds": list(description.get("VersionIdsToStages", {}).keys()),
"secretString": value.get("SecretString"),
},
)
)
@app.get("/api/logs/details")
def log_details():
logs_client = client("logs")
groups = logs_client.describe_log_groups(logGroupNamePrefix=LOG_GROUP_NAME).get("logGroups", [])
streams = logs_client.describe_log_streams(
logGroupName=LOG_GROUP_NAME,
orderBy="LastEventTime",
descending=True,
limit=5,
).get("logStreams", []) if groups else []
return jsonify(
ok(
"cloudwatch-logs-details",
{
"logGroup": LOG_GROUP_NAME,
"groups": groups,
"streams": streams,
},
)
)
@app.post("/api/lambda/invoke")
def invoke_lambda():
response = client("lambda").invoke(
FunctionName=LAMBDA_NAME,
InvocationType="DryRun",
Payload=b"{}",
)
return jsonify(
ok(
"lambda-dry-run",
{
"function": LAMBDA_NAME,
"statusCode": response.get("StatusCode"),
"meaning": "The Lambda function exists and accepts an invocation request.",
},
)
)
@app.get("/api/logs")
def logs():
groups = client("logs").describe_log_groups(logGroupNamePrefix=LOG_GROUP_NAME).get("logGroups", [])
return jsonify(ok("cloudwatch-logs", {"logGroup": LOG_GROUP_NAME, "groups": groups}))