0405 | Extending Python
BeautifulSoup
- Web Scraping | the process of copying and extracting data from websites
- in hacking | to speed up the enumeration or brute forcing process
- can be difficult due to the unstructured content of web pages
- Steps of web scraping
- Step-1 | download the html for the page
- Step-2 | parse the html
- Step-3 | navigate the parse tree to extract the data you are looking for
- Install |
pip install beautifulsoup4
- Note | some websites do not like scraping
- use a different request user agent header
- limit the number of requests
- include a pause or sleep
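The three precautions above can be sketched as follows. This is only a minimal sketch, assuming the standard-library `urllib` instead of `requests` so that it runs without extra installs; the same header dictionary can also be passed to `requests.get` via `headers=`. The names `USER_AGENT`, `DELAY_SECONDS`, `MAX_REQUESTS`, `build_request` and `polite_fetch` are made up for illustration.

```python
import time
import urllib.request

# Hypothetical values -- adjust them to the rules of the target site
USER_AGENT = "Mozilla/5.0 (compatible; demo-scraper/0.1)"
DELAY_SECONDS = 1.0
MAX_REQUESTS = 5


def build_request(url):
    """Attach a custom User-Agent header to the request."""
    return urllib.request.Request(url, headers={"User-Agent": USER_AGENT})


def polite_fetch(urls, delay=DELAY_SECONDS, max_requests=MAX_REQUESTS):
    """Download at most max_requests pages, sleeping between requests."""
    pages = {}
    for url in urls[:max_requests]:
        with urllib.request.urlopen(build_request(url), timeout=10) as response:
            pages[url] = response.read()
        # pause so that the server is not flooded
        time.sleep(delay)
    return pages
```

Calling `polite_fetch(["https://247ctf.com/scoreboard"])` would return a dictionary that maps each url to the downloaded bytes.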
- demo-example | "beautifulsoup_demo.py" | (play around with commenting/uncommenting as you see fit)
import requests
print("-"*80)
## ----- Web Scraping -- Grabbing the DOM ----- ##
# download the scoreboard from 247ctf.com
page = requests.get("https://247ctf.com/scoreboard")
# dump the dom/text
# print(page.text)
print("-"*80)
## ----- BeautifulSoup -- Parsing ----- ##
# importing beautifulsoup
# -- basically, it converts the complex html structure into a
# -- bunch of python objects, which we can query and interact with
# -- based on the structure of the html tree
from bs4 import BeautifulSoup
# use bs to parse the html
# -- 1st param -- the downloaded html
# -- 2nd param -- the parser we want to use
soup = BeautifulSoup(page.content, "html.parser")
# the raw text content of the site without displaying the elements
print(soup.text)
print("-"*80)
## ----- BeautifulSoup -- Other Functions -- Page Title ----- ##
# title of the html page
print(soup.title)
print(soup.title.name)
# the raw text string of the title
print(soup.title.string)
print("-"*80)
## ----- BeautifulSoup -- Looking for Elements -- Links----- ##
# to find html elements -- the first link
print(soup.find("a"))
# find all html links on the page
for line in soup.find_all("a"):
    print(line)
    # print only the href
    print(line.get('href'))
print("-"*80)
## ----- BeautifulSoup -- Looking for Elements -- Other restrictions----- ##
# look for an element with a specific id
print(soup.find(id="fetch-error"))
# look for a specific class -- include _ (class is python keyword)
print(soup.find(class_="nav-link"))
# look for a link with a specific class
print(soup.find("a", class_="nav-link"))
print("-"*80)
## ----- BeautifulSoup -- Tracking the top players----- ##
# identifying the scoreboard table
table = soup.find("table")
#print(table)
# extract the tbody -- table body
table_body = table.find("tbody")
# print(table_body)
# one row per each user -- tr:table-row
rows = table_body.find_all("tr")
for row in rows:
    print("-"*5)
    # print(row)
    # 6 columns per row
    # cols = [x for x in row.find_all("td")]
    # look only for the text -- strip out the noise
    cols = [x.text.strip() for x in row.find_all("td")]
    # print(cols)
    print("{} is in {} place with {} points".format(cols[2], cols[0], cols[4]))
print("-"*80)
## ----- END ----- ##
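Steps 2 and 3 (parsing and navigating the tree) can also be tried offline, without hitting the website. A minimal sketch, assuming a made-up three-column table in `HTML`; the real scoreboard has a different layout, and `parse_scoreboard` is a hypothetical helper name.

```python
from bs4 import BeautifulSoup

# Made-up HTML for illustration -- not the real scoreboard markup
HTML = """
<table>
  <tbody>
    <tr><td>1</td><td>alice</td><td>900</td></tr>
    <tr><td>2</td><td>bob</td><td>750</td></tr>
  </tbody>
</table>
"""


def parse_scoreboard(html):
    """Return one dict per table row; an empty list if there is no table."""
    soup = BeautifulSoup(html, "html.parser")
    table = soup.find("table")
    if table is None:
        return []
    results = []
    for row in table.find_all("tr"):
        # keep only the text of each cell -- strip out the noise
        cols = [td.text.strip() for td in row.find_all("td")]
        if len(cols) >= 3:
            results.append({"place": cols[0], "name": cols[1], "points": cols[2]})
    return results


for entry in parse_scoreboard(HTML):
    print("{name} is in place {place} with {points} points".format(**entry))
```

Checking for a missing table and for short rows avoids an `AttributeError` or `IndexError` when the page layout changes.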
Py2exe
- Notes
- a limitation of our scripts so far is that they need a python interpreter in order to run
- and depending on what we are doing, not just the basic interpreter either, but a bunch of other modules too
- such as those we installed with pip
- py2exe library | turns python programs into packages that can be run on other windows computers without needing to first install python on those computers
- to create executable files which can be run on windows systems without a python installation
- in the background | it just extends the module distutils with a new command
- distutils | a module that installs something
- usually a python module
- Install |
pip install py2exe
- Requirement |
python3.11
- Personal Note
- sadly will not install properly
- a separate script is needed which tells py2exe how to bundle up our script as an executable
- demo-example | "py2exe_demo.py"
print("hacked")
- demo-example | "demo_setup.py" | (play around with commenting/uncommenting as you see fit)
# reference setup from distutils core
from distutils.core import setup
import py2exe
# call setup and tell it the entry point that we want -- our hack script
# -- will create a new directory `dist`
# -- this distribution folder contains the executable and all the files needed
# -- to run the executable -- run it -- `cd dist` and then `py2exe_demo.exe`
# setup(console=['py2exe_demo.py'])
# passing in parameters -- the new `dist` folder is much smaller
# -- bundle_files: will bundle everything including the python interpreter
# -- compressed:
# -- zipfile:None --> the files will be bundled within the executable
setup(
    options={'py2exe': {'bundle_files': 1, 'compressed': True}},
    console=[{'script': 'py2exe_demo.py'}],
    zipfile=None
)
Sockets
- General | a socket is a bidirectional communication channel
- sockets can communicate within a process, between processes, or between different systems on a network
- Here | focusing on tcp-connection oriented networking sockets
- but the same library can be used for udp too
- part of the standard python library
- putty | to verify socket connections
- terminal emulator; serial console; network file transfer application
- supports raw socket connections
- Download and Install
- url | https://putty.org/ > "Alternative binary files" > "the SSH and Telnet client itself"/64-bit x86 > putty.exe | (standalone version)
- wireshark | to see and verify the connections
- a packet analyzer
- used for network troubleshooting and analysis
- Download and Install
- url | https://www.wireshark.org/ > "Windows x64 Installer"
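The bidirectional channel from the General notes can also be tried on a single machine without putty or wireshark. A minimal sketch, assuming a one-shot server in a background thread and a client in the main thread; port 0 asks the operating system for any free port, and `serve_once` is a hypothetical helper name.

```python
import socket
import threading

HOST = "127.0.0.1"
MESSAGE = b"You made it to the socket!"


def serve_once(server_sock):
    """Accept a single connection, send the greeting, then close it."""
    conn, _addr = server_sock.accept()
    with conn:
        conn.sendall(MESSAGE)


# server side -- port 0 lets the OS pick a free port
server = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
server.bind((HOST, 0))
server.listen()
port = server.getsockname()[1]
worker = threading.Thread(target=serve_once, args=(server,))
worker.start()

# client side -- connect and read until the server closes the connection
chunks = []
with socket.create_connection((HOST, port), timeout=5) as client:
    while True:
        chunk = client.recv(1024)
        if not chunk:
            break
        chunks.append(chunk)
received = b"".join(chunks)

worker.join()
server.close()
print(received.decode())
```

Reading in a loop matters because `recv(1024)` returns at most 1024 bytes and may return fewer than the sender wrote.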
- demo-example | "sockets_demo.py" | (play around with commenting/uncommenting as you see fit)
import socket
print("-"*80)
## ----- Socket ----- ##
# identify the ip address of a server
ip = socket.gethostbyname('247ctf.com')
print(ip)
# create a socket -- specify parameters for the type of socket
# -- AF_INET: transport mechanism -- IPv4
# -- SOCK_STREAM: for connection oriented protocol -- TCP
s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
# NOTE -- options with socket
# -- binding the socket to a port and listening on that port
# -- use the socket to make an outbound connection to an other system
print("-"*80)
## ----- Socket -- Outbound connection ----- ##
# a host port pair is required --
# -- host: ipv4 address or hostname -- string
# -- port: int
s.connect(("247ctf.com", 80))
# sending and receiving data via the socket
# sending data -- a head request -- a get request without a msg body
s.sendall(b"HEAD / HTTP/1.1\r\nHost: 247ctf.com\r\n\r\n")
# check the response -- 1024 bytes -- the max amount of data it will receive
# at once
print(s.recv(1024).decode())
# close the socket
s.close()
print("-"*80)
## ----- Socket -- Binding to a Port and Listening ----- ##
# NOTE
# -- depending on the used protocol
# -- you will typically need to have both the server and the client component
# -- server --> waiting for connections
# -- client --> initiating the connections
# adding both the client and the server
client = False
server = False
# specify the port for the socket
port = 8080
# create the socket
s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
# create the server -- listening for connections
# server = True
if server:
    # bind the socket to the localhost to a local port
    s.bind(("127.0.0.1", port))
    # listen to connections
    s.listen()
    # verify listening with netstat -- `netstat -an`
    # accepting connections in the socket
    while True:
        # accept: accept a connection by blocking and waiting
        # for incoming connections
        # -- return values -- pair of a new socket object
        # -- usable to send and receive data to the other end of the connection
        # -- the client connecting to the server
        connect, addr = s.accept()
        connect.send(b"You made it to the socket!")
        connect.close()
# use putty to connect to the socket and verify if the server is
# working correctly
# configuration:
# -- 127.0.0.1 -- 8080 -- Connection type:Other Raw
# -- Close window on exit: Never
# print("-"*80)
## ----- Socket -- Client mode ----- ##
# NOTE -- running the client
# -- only works if server is already running
# NOTE -- verifying it with wireshark
# -- use the adapter for loopback traffic capture
# -- start capture -- filter for "tcp.port == 8080"
# -- launch the client part again -- while server is already running
# -- click on one packet > Follow > TCP Stream
# create the client -- making outbound connections
# client = True
if client:
    s.connect(("127.0.0.1", port))
    print(s.recv(1024).decode())
    s.close()
print("-"*80)
## ----- Socket -- Port Scanner ----- ##
# NOTE -- port scanner
# -- use the sockets library to create a port scanner
# -- idea: run through a list of ports, try to connect to each of them and
# -- in doing so, test, whether that port is open
# iterate over common ports
for port in [22, 80, 139, 443, 445, 8080]:
    s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    # specify the time out for this socket
    # -- socket.setdefaulttimeout() only affects sockets created afterwards
    s.settimeout(1)
    # check the result of our connection attempt -- localhost as target
    # -- connect_ex: like connect, but returns an error rather than
    # -- raising an exception -- error indicator=0 upon successful connect
    result = s.connect_ex(("127.0.0.1", port))
    if result == 0:
        print("{} is open!".format(port))
    else:
        print("{} is closed!".format(port))
    # close the socket
    s.close()
# # adding a input() so that it does not die
# # input()
print("-"*80)
## ----- END ----- ##
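The port scanner idea can be wrapped in a reusable function. A minimal sketch, assuming hypothetical helper names `is_port_open` and `scan`; it checks itself against a listener that the script opens on a free local port, so the result does not depend on what happens to be running on the machine.

```python
import socket


def is_port_open(host, port, timeout=1.0):
    """Return True if a TCP connection to (host, port) succeeds."""
    with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s:
        # the timeout applies to this socket only
        s.settimeout(timeout)
        # connect_ex returns 0 on success, an error code otherwise
        return s.connect_ex((host, port)) == 0


def scan(host, ports, timeout=1.0):
    """Map each port to True (open) or False (closed or no answer)."""
    return {port: is_port_open(host, port, timeout) for port in ports}


# self-check against a listener that we control
listener = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
listener.bind(("127.0.0.1", 0))
listener.listen()
open_port = listener.getsockname()[1]
result = scan("127.0.0.1", [open_port])
listener.close()
print(result)
```

The `with` block closes every socket even when the connection attempt fails.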
Scapy
- General | a packet manipulation library
- to forge, decode, send and capture packets at a low level
- all parts of the packets are modifiable and can be inspected
- has its own cli | (you can interface with it directly)
- possible to craft packets at different layers
- Install |
pip install scapy
- demo-example | "scapy_demo.py" | (play around with commenting/uncommenting as you see fit)
from scapy.all import *
# print("-"*80)
# ## ----- Creating/Sending an ICMP packet ----- ##
# # ip layer
# ip_layer = IP(dst="247ctf.com")
# # icmp layer
# icmp_layer = ICMP()
# # creating a packet -- combine them with the slash
# packet = ip_layer / icmp_layer
# # sending the packet
# r = send(packet)
# # view the response -- detailed information about the packet
# # print(packet.show())
# # or check it via wireshark
# # -- Manually: wireshark > Ethernet capture > filter on "icmp" -- re-run script
# # -- Automatically:
# # call wireshark automatically from within scapy
# # wireshark(packet)
# print("-"*80)
# ## ----- ARP Scan on the local network ----- ##
# # NOTE -- depending on the configuration of the remote systems, they may not
# # -- directly respond to ICMP traffic
# # -- therefore, we can go a layer deeper and instead
# # -- perform an address resolution protocol -- arp scan on the local network
# # use SRP to send and receive packets at layer 2
# # -- specify a broadcast destination address -- dst:ff..ff --> broadcast address
# # -- specify the arp target protocol address -- the local subnet the machine is on
# # ---- pdst:subnet
# # -- answering/unanswering hosts
# ans, unans = srp(Ether(dst="ff:ff:ff:ff:ff:ff") / ARP(pdst="192.168.100.0/24"), timeout=3, verbose=False)
# # # only check on the live hosts
# # for i in ans:
# # # dumping the whole information
# # # print(i)
# # # print only the ip addresses
# # print(i[1].psrc)
# print("-"*80)
# ## ----- Port Scanner -- TCP Connect Scan -- LOCALHOST ----- ##
# # NOTE: perform the same scan with scapy at a lower level
# # NOTE -- TCP
# # -- TCP handshake involves 3 steps -- if it completes --> we have a connection
# # -- 1 -- To initialize the connection -- the client send a packet with a "SYN"
# # -- - -- Flag to the port it wants to connect to
# # -- 2 -- if the port is open and the server is accepting connections, it will
# # -- - -- respond with a tcp packet with the "SYN" and "ACK" flag set
# # -- 3 -- to complete the connection -- the client sends a packet with the
# # -- - -- "ACK" flag
# # use this information to create a TCP port scanner with scapy
# # -- if the server answers our "SYN" with "SYN" + "ACK" --> open port
# # -- - -- the scanner then sends "ACK" + "RST" (reset) to tear the connection
# # -- - -- down instead of completing the handshake
# # -- if we can't and if server is responding with "RST" --> port is not accepting
# # -- connections and is probably closed
# # hardcoding the flags
# SYN = 0x02
# RST = 0x04
# ACK = 0x10
# # common ports to scan
# for port in [22, 80, 139, 443, 445, 8080]:
# # sr1() -- send a packet and return ONLY the first packet that answered it
# # set the source port to some random value
# # try to connect -- localhost -- random source port -- sending a SYN
# tcp_connect = sr1(IP(dst="127.0.0.1")/TCP(sport=RandShort(), dport=port, flags="S"), timeout=1, verbose=False)
# # if succeeded and the packet has a tcp layer
# if tcp_connect and tcp_connect.haslayer(TCP):
# # extract the response flags
# response_flags = tcp_connect.getlayer(TCP).flags
# # # verify it
# # print(response_flags)
# # check the response flags
# # open port --> send reset
# if response_flags == (SYN + ACK):
# # send() -- we do NOT expect a response from this
# snd_rst = send(IP(dst="127.0.0.1")/TCP(sport=RandShort(), dport=port, flags="AR"), verbose=False)
# print("{} is open!".format(port))
# # closed port
# elif response_flags == (RST + ACK):
# print("{} is closed!".format(port))
# # could not make the connection
# else:
# print("{} is closed!".format(port))
# print("-"*80)
# ## ----- Port Scanner -- TCP Connect Scan -- REMOTEHOST ----- ##
# # hardcoding the flags
# SYN = 0x02
# RST = 0x04
# ACK = 0x10
# # common ports to scan
# for port in [22, 80, 139, 443, 445, 8080]:
# tcp_connect = sr1(IP(dst="247ctf.com")/TCP(sport=RandShort(), dport=port, flags="S"), timeout=1, verbose=False)
# if tcp_connect and tcp_connect.haslayer(TCP):
# response_flags = tcp_connect.getlayer(TCP).flags
# if response_flags == (SYN + ACK):
# snd_rst = send(IP(dst="247ctf.com")/TCP(sport=RandShort(), dport=port, flags="AR"), verbose=False)
# print("{} is open!".format(port))
# elif response_flags == (RST + ACK):
# print("{} is closed!".format(port))
# else:
# print("{} is closed!".format(port))
# print("-"*80)
# ## ----- Packet Sniffing ----- ##
# # NOTE -- example scenario
# # -- sniffing usernames;passwords sent to a website over http
# # import http
# from scapy.layers.http import HTTPRequest
# # define the callback function
# def process(packet):
# if packet.haslayer(HTTPRequest):
# # view the host and the path where the data is being sent
# print(packet[HTTPRequest].Host.decode() + packet[HTTPRequest].Path.decode())
# # create a function as a callback and use the built-in sniff function
# # -- prn:callback function -- store:don't want to store anything
# sniff(filter="port 80", prn=process, store=False)
# # NOTE -- sniffing for packets
# # -- beware -- browsers will force the https connections -- will not display them
# # -- use curl instead -- `curl "http://247ctf.com/?user=test&password=test"`
# print("-"*80)
# ## ----- Analyzing/Modifying existing packet captures ----- ##
# # NOTE -- for demonstration -- take a look at a challenge from 247ctf
# # -- Challenges > Networking > Error Reporting Protocol
# # -- LOGIN NEEDED > Download pcap -- *error_reporting.pcap*
# # NOTE -- analyzing the packet capture
# # -- full of ICMP traffic
# # -- the ICMP packet contains a data field --> can be used for error messages
# # -- however --> this data field can be used for anything
# # -- examples: exfiltrating data; bypassing a firewall
# # NOTE -- extracting the data from the packet capture
# # -- manually -- we can do it with wireshark, but it's slow
# # -- automatically -- below
# # read the packet capture -- pcap file in the same folder
# scapy_cap = rdpcap("error_reporting.pcap")
# # iterate over the packets
# for packet in scapy_cap:
# # if the packet has an ICMP layer
# if packet.getlayer(ICMP):
# # check the data
# print(packet.load)
# # testing -- halt execution after the first packet
# # input()
print("-"*80)
## ----- END ----- ##
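The flag comparison used by the port scanner can be looked at in isolation, without sending packets. A minimal sketch, assuming the reply flags are available as a plain integer; `classify` is a hypothetical helper. Reporting a missing reply separately is a choice made here, where the demo prints "closed".

```python
# TCP flag bit values as they appear in the TCP header
FIN, SYN, RST, PSH, ACK = 0x01, 0x02, 0x04, 0x08, 0x10


def classify(response_flags):
    """Map the flags of the reply to a SYN probe onto a port state.

    response_flags is None when no reply arrived before the timeout.
    """
    if response_flags is None:
        return "no response"
    if response_flags & SYN and response_flags & ACK:
        return "open"
    if response_flags & RST:
        return "closed"
    return "unknown"


for value in (SYN | ACK, RST | ACK, None):
    print(value, "->", classify(value))
```

Testing individual bits with `&` also matches replies that carry additional flags, which an exact comparison such as `== (SYN + ACK)` would miss.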
Subprocess
- General | Process/Subprocess
- you can think of a running program as just a process
- we are executing the python interpreter process every time we run a script
- a process can start another process | but these sub processes/child processes run independently of their parent process
- because they are independent | some processes will execute concurrently with the parent
- just because we launched a sub process, it does not mean that the current process will have to hang or wait for input/output or that child process to finish execution
- Python Subprocess Module
- the python subprocess module provides an interface to create and work with additional processes
- to spawn new processes; connect to standard pipes; obtain process error codes
- cross-platform module | can be used to work with processes on linux/windows/macos
- part of the standard library
- Note | Underlying Process | Under the Hood
- the underlying process creation and management of these functions is handled by the Popen class
- on windows | under the hood it makes use of the CreateProcess function
- Popen | gives us the ability to perform fine-grained configuration as to how our process should be executed
- Note | Underlying Process | Example
- we have the ability to control the start-up info object and associated creation flags
- useful for more complicated process execution requirements
- BEWARE | when using subprocess make sure you are careful with how you make the call
- because you do NOT want to enable an attacker to gain code execution by passing input directly to a dangerous function
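The warning above can be illustrated with a harmless input. A minimal sketch, assuming the child process is the python interpreter itself (`sys.executable`) so that it runs on any platform; `user_input` and `child_code` are made-up names.

```python
import subprocess
import sys

# attacker-style input -- harmless here, but it contains shell metacharacters
user_input = "hello; echo INJECTED"

# list form -- the input is passed as ONE argument, no shell parses it
child_code = "import sys; print(sys.argv[1])"
result = subprocess.run(
    [sys.executable, "-c", child_code, user_input],
    capture_output=True,
    text=True,
    check=True,
)
print(result.stdout.strip())
```

The child prints the input back unchanged. If the same string were concatenated into a command line and run with `shell=True`, the shell would get to interpret the `;` itself.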
- demo-example | "subprocess_demo.py" | (play around with commenting/uncommenting as you see fit)
import subprocess
print("-"*80)
## ----- EXECUTING A PROCESS ----- ##
# # execute a command without needing to interact with it
# # launch the calculator app -- (not default on every windows|was deleted)
# subprocess.call("calc")
# # better alternative -- more secure way to spawn it
# # pass the commands as a list -- 1st element --> the command to call
# # -- next elements --> the arguments for that call
# subprocess.call(["calc"])
# # NOTE: if shell=True is passed as an argument
# # -- the command string is interpreted as a raw shell command
# # -- meaning: we are invoking a shell before executing the sub process
# # -- BEWARE: if user controlled input is passed --> opening to code execution
# subprocess.call(["calc"], shell=True)
print("-"*80)
## ----- EXECUTING A PROCESS -- checking for errors ----- ##
# the same as call -- but the exit code is checked
# -- if error occurred -- exception is raised
# # -- will run but with warning -- "asd" is not recognized as an internal
# # -- command...
# out = subprocess.call(["cmd", "/c", "asd"])
# # will raise the CalledProcessError -- returned non-zero exit status 1
# out = subprocess.check_call(["cmd", "/c", "asd"])
# # will run after checks
# out = subprocess.check_call(["cmd", "/c", "calc"])
print("-"*80)
## ----- EXECUTING A PROCESS -- STDIN/STDOUT ----- ##
# NOTE -- when using the call method, the standard input and output is bound
# -- to the parent
# -- meaning: the calling program can NOT capture the output of the command
# -- which was executed
# capturing the output of the called command
out = subprocess.check_output(["cmd", "/c", "whoami"])
# having access to the output as a python variable
# print(out.decode())
print("The output was: {}".format(out.decode()))
# NOTE -- the underlying process creation and management of these functions
# -- is handled by the Popen class
# -- on windows -- under the hood it makes use of the CreateProcess function
# -- Popen gives us the ability to perform fine-grained configuration as to how
# -- our process should be executed
# Example -- we have the ability to control the start-up info object and
# -- associated creation flags -- useful for more complicated process execution
# -- requirements
print("-"*80)
## ----- END ----- ##
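The demo calls `cmd`, which only exists on windows. A minimal cross-platform sketch of the same three ideas (capturing output, checking the exit code, talking to the child through pipes), assuming the child process is the python interpreter itself:

```python
import subprocess
import sys

# capture the stdout of a child process
out = subprocess.check_output([sys.executable, "-c", "print('hello from child')"])
print("The output was: {}".format(out.decode().strip()))

# a non-zero exit status raises CalledProcessError
try:
    subprocess.check_call([sys.executable, "-c", "import sys; sys.exit(3)"])
except subprocess.CalledProcessError as err:
    exit_code = err.returncode
    print("child failed with exit status", exit_code)

# Popen -- wire up the stdin/stdout pipes ourselves
proc = subprocess.Popen(
    [sys.executable, "-c", "print(input().upper())"],
    stdin=subprocess.PIPE,
    stdout=subprocess.PIPE,
    text=True,
)
# communicate() sends the input, waits for the child and returns its output
reply, _ = proc.communicate("ping\n")
print(reply.strip())
```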
Threading
- General | Threads
- just small units of work
- usually contained in processes
- a process can have more than one thread
- the threads share the state and memory of the process
- threading in python | basically telling your code to do more than one thing at once
- useful
- to improve speed by performing actions concurrently
- to allow a program to remain responsive to input while also performing some other action which may be blocking
- With Python | each process has at least ONE thread | the main thread
- where we have been executing our scripts for the most part till now
- BEWARE
- once the main thread finishes, the interpreter waits for all non-daemon threads to complete before it exits | threads specified as a daemon are killed abruptly at shutdown
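A minimal sketch of the daemon flag, assuming two short-lived workers; `worker` and `finished` are made-up names. At exit the interpreter waits for a non-daemon thread, while a daemon thread is abandoned unless it is joined explicitly, as done here.

```python
import threading
import time

finished = []


def worker(name, seconds):
    time.sleep(seconds)
    finished.append(name)


# non-daemon (the default) -- the interpreter waits for it at exit
normal = threading.Thread(target=worker, args=("normal", 0.2))
# daemon -- abandoned when the program exits, unless we join it
background = threading.Thread(target=worker, args=("daemon", 0.2), daemon=True)

normal.start()
background.start()

# join() blocks until the thread is done
normal.join()
background.join()
print(normal.daemon, background.daemon, sorted(finished))
```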
- demo-example | "threading_demo.py" | (play around with commenting/uncommenting as you see fit)
import threading, time
from datetime import datetime
print("-"*80)
## ----- Compare sequential vs concurrent sleeping ----- ##
def sleeper(i):
    print("hello from %d!" % i)
    # sleep for i seconds
    time.sleep(i)
    print("goodbye from %d!" % i)
"""
## ----- Sequential execution ----- ##
# NOTE -- it takes around 6 sec to execute
# -- reason: we are being blocked by time.sleep()
# -- before executing sleeper(2), sleeper(1) needs to finish ...
# print out the current time -- hours, minutes, seconds
print(datetime.now().strftime("%H:%M:%S"))
sleeper(0)
sleeper(1)
sleeper(2)
sleeper(3)
print(datetime.now().strftime("%H:%M:%S"))
"""
"""
## ----- Concurrent/parallel execution -- starting right away ----- ##
# NOTE -- each run provides a different output, since the parallel
# -- threads could be scheduled differently with each execution
# print out the current time -- hours, minutes, seconds
print(datetime.now().strftime("%H:%M:%S"))
# create a new thread and start it
threading.Thread(target=sleeper, args=(0,)).start()
threading.Thread(target=sleeper, args=(1,)).start()
threading.Thread(target=sleeper, args=(2,)).start()
threading.Thread(target=sleeper, args=(3,)).start()
print(datetime.now().strftime("%H:%M:%S"))
## ----- Concurrent/parallel execution -- delayed execution with timer ----- ##
# NOTE -- timer -- for delayed thread execution
# -- takes a list of arguments and a dictionary of keyword arguments
# print out the current time -- hours, minutes, seconds
print(datetime.now().strftime("%H:%M:%S"))
# wait for the timer before executing the thread
threading.Timer(0, sleeper, [0]).start()
threading.Timer(1, sleeper, [1]).start()
threading.Timer(2, sleeper, [2]).start()
threading.Timer(3, sleeper, [3]).start()
print(datetime.now().strftime("%H:%M:%S"))
"""
"""
## ----- Printing/getting input at the same time ----- ##
# ISSUE -- how can we print something to the screen and at the same time
# -- get input from the user
# -- with input() -- the program will be busy waiting for our input
# print("hello")
# # will wait for the input before printing "world"
# input()
# print("world")
# NOTE -- idea: 2 threads which can communicate with each other using a simple
# -- global variable
# global variable
stop = False
# constantly asking the user for input
# -- if exit is desired --> stop both functions
def input_thread():
    global stop
    while True:
        # ask a user for input
        user_input = input("Should we stop?:")
        print(">> User says: {}".format(user_input))
        # check if stop
        if user_input == "yes":
            stop = True
            break
# prints the counter -- increases the counter -- sleeps for 1 sec
def output_thread():
    global stop
    count = 0
    while not stop:
        print(count)
        count += 1
        time.sleep(1)
# starting both of the threads
# -- start() returns None --> keep the Thread object first, then start it
t1 = threading.Thread(target=input_thread)
t2 = threading.Thread(target=output_thread)
t1.start()
t2.start()
"""
"""
## ----- Synchronizing Threads -- WITHOUT LOCKING ----- ##
# NOTE -- threading module has built-in functionality to help implement
# -- locking that can be used to synchronize threads
# NOTE -- locking can be used to control access to shared resources
# -- or to prevent corruption or dead-lock issues with parallel processing
# -- the lock controls which thread is allowed to access a resource during
# -- execution
# EXAMPLE -- we have a list, and want a number of threads to pop a value
# -- off of that list
# WITHOUT A LOCK: different order with each execution
# ISSUE: we do NOT want multiple threads to pop the same value from the list
# -- by mistake
# IDEA -- prevent simultaneous modification of a variable
# -- lock will force an operation to wait, until the variable is unlocked in
# -- order to access or modify it
# create a lock
data_lock = threading.Lock()
# create the list
data = [x for x in range(1000)]
# consume thread
def consume_thread():
    global data_lock
    while len(data) > 0:
        print(data.pop())
# create the thread and start straight away
threading.Thread(target=consume_thread).start()
threading.Thread(target=consume_thread).start()
threading.Thread(target=consume_thread).start()
"""
## ----- Synchronizing Threads -- Locking ----- ##
# NOTE -- due to the locking mechanism, the popping is done in
# -- sequential order
# create a lock
data_lock = threading.Lock()
# create the list
data = [x for x in range(1000)]
# consume thread
def sync_consume_thread():
    global data_lock, data
    while True:
        # acquire the lock -- this is blocking
        data_lock.acquire()
        # stop once the list is empty -- release the lock before leaving,
        # otherwise the thread would loop forever
        if len(data) == 0:
            data_lock.release()
            break
        # display the current thread, and the popped item
        print(threading.current_thread().name, data.pop())
        # release the lock -- the waiting threads are allowed to take it
        data_lock.release()
# create the thread and start straight away
threading.Thread(target=sync_consume_thread).start()
threading.Thread(target=sync_consume_thread).start()
threading.Thread(target=sync_consume_thread).start()
print("-"*80)
## --------------- END --------------- ##
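The acquire/release pair can also be written with a `with` block, which releases the lock even if an exception is raised in between. A minimal sketch, assuming the popped values are collected into a list (`consumed`, a made-up name) instead of being printed, so that the result can be checked after all threads have finished:

```python
import threading

data = list(range(1000))
data_lock = threading.Lock()
consumed = []


def consume():
    while True:
        # with -- acquires the lock and always releases it again
        with data_lock:
            if not data:
                return
            consumed.append((threading.current_thread().name, data.pop()))


threads = [threading.Thread(target=consume) for _ in range(3)]
for t in threads:
    t.start()
# wait until every consumer is done
for t in threads:
    t.join()

print(len(consumed), "items consumed")
```

Because both the emptiness check and the pop happen while the lock is held, no value can be popped twice and no thread can pop from an empty list.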
Pycryptodome
- General
- fork of the pycrypto library | last commit to the original in 2014
- here | to perform common encryption tasks
- can be used for symmetric/asymmetric ciphers, hashes, message authentication codes,
- Setup |
pip install pycryptodome
- Note | Password Based Derivation Functions
- Basic Idea | use the function to generate a byte sequence based on a string provided from the user
- Note | Padding
- for a symmetric cipher we need to be able to pad our data up to the length of a block size
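Both notes can be tried with the standard library alone. A minimal sketch, assuming `hashlib.pbkdf2_hmac` in place of the PyCryptodome function and a hand-written PKCS#7 padding; the helper names and the iteration count are made up, and the derived key is not expected to match the PyCryptodome call in the demo, which uses its own defaults.

```python
import hashlib
import os

BLOCK_SIZE = 16  # AES block size in bytes


def derive_key(password, salt, length=32, iterations=100_000):
    """Derive `length` key bytes from a password with PBKDF2-HMAC-SHA256."""
    return hashlib.pbkdf2_hmac(
        "sha256", password.encode(), salt, iterations, dklen=length
    )


def pkcs7_pad(data, block_size=BLOCK_SIZE):
    """Append n bytes of value n so the length is a multiple of block_size."""
    n = block_size - len(data) % block_size
    return data + bytes([n]) * n


def pkcs7_unpad(padded):
    """Remove the padding added by pkcs7_pad (no validation, sketch only)."""
    return padded[:-padded[-1]]


salt = os.urandom(16)
key = derive_key("hunter21", salt)
padded = pkcs7_pad(b"encrypt_me!")
print(len(key), len(padded), pkcs7_unpad(padded))
```

The same password and salt always give the same key, which is why the salt has to be stored next to the ciphertext.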
- demo-example | "crypto_demo.py" | (play around with commenting/uncommenting as you see fit)
from Crypto.Random import get_random_bytes
print("-"*80)
## ----- Crypto -- Get Random Bytes ----- ##
# return random byte stream of the length specified
key = get_random_bytes(32)
print(key)
# key length: 32 bytes = 256 bits
print(len(key))
print("-"*80)
## ----- Crypto -- Key Derivation Functions ----- ##
from Crypto.Protocol.KDF import PBKDF2
# salt is needed for PBKDF2
# -- to help protect against dictionary based attacks
# -- the value does NOT necessarily need to be kept secret
# -- but needs to be random for each derivation
# random salt
salt = get_random_bytes(32)
# static salt -- to demonstrate static behaviour for the derived key
# -- each time the same key will be derived with the same salt and password
salt = b'\x8f\xa4\xc2\xc2s\x95\xf3F\xee\x97`\x10n\xcd\x88\xce\xb5v\xafi\x0cJ\x05\xce\x01\xbbSTc\xe02\xe0'
# the secret password used to generate the key
password = "hunter21"
# generate the key -- dkLen:cumulative length of the keys to produce
key = PBKDF2(password, salt, dkLen=32)
print(key)
print(len(key))
print("-"*80)
## ----- Symmetric Encryption -- Block ciphers ----- ##
# import the AES cipher
from Crypto.Cipher import AES
# import some padding function
from Crypto.Util.Padding import pad, unpad
# the stream to encrypt
to_encrypt = b"encrypt_me!"
# create the cipher -- mode:CBC (cipher block chaining)
cipher = AES.new(key, AES.MODE_CBC)
# automatically generated initialization vector -- IV
print(cipher.iv)
# encrypting data with AES
ciphered_data = cipher.encrypt(pad(to_encrypt, AES.block_size))
print(ciphered_data)
"""
print("-"*80)
## ----- Symmetric Decryption -- Block ciphers -- New IV issue ----- ##
# NOTE: a cipher object is stateful
# -- once we have encrypted a message with it, we can NOT perform decryption
# -- with that same cipher object -- a new cipher object is needed
# BEWARE: creating the new object without passing the IV will use a different
# -- new random IV --> will NOT decrypt properly
# setting up a new cipher
cipher = AES.new(key, AES.MODE_CBC)
# decrypt the data
plaintext_data = cipher.decrypt(ciphered_data)
# verify -- jumbled data -- not "encrypt_me!" text
# -- REASON: we are using an other random IV
print(plaintext_data)
"""
"""
print("-"*80)
## ----- Symmetric Decryption -- Block ciphers -- Same IV ----- ##
# specify the SAME IV that was used for the encryption
cipher = AES.new(key, AES.MODE_CBC, iv=cipher.iv)
# unpad the decrypted data -- remove the padding bytes
plaintext_data = unpad(cipher.decrypt(ciphered_data), AES.block_size)
print(plaintext_data)
"""
"""
print("-"*80)
## ----- Symmetric Encryption/Decryption -- Stream ciphers ----- ##
from Crypto.Cipher import ARC4
# create a new cipher -- use the same key
cipher = ARC4.new(key)
# encrypt the data -- not padded to a blocksize
encrypted = cipher.encrypt(to_encrypt)
print(encrypted)
# decrypt the data
cipher = ARC4.new(key)
plaintext = cipher.decrypt(encrypted)
print(plaintext)
"""
print("-"*80)
## ----- Asymmetric Ciphers -- RSA -- Public/Private Key Generation ----- ##
# NOTE -- access to a public and private RSA key pair is needed
# -- or generate them here
# importing RSA
from Crypto.PublicKey import RSA
# generate an RSA key pair -- 1024 bit
key = RSA.generate(1024)
# password protect the key
encrypted_key = key.exportKey(passphrase=password)
# print the encrypted RSA private key
print(encrypted_key)
# the public RSA key
pub = key.publickey()
# print the RSA public key
print(pub.exportKey())
"""
print("-"*80)
## ----- Asymmetric Ciphers -- RSA -- Checking Pub/Priv Key Capabilities ----- ##
# check the capability for encrypting data for the key
print(key.can_encrypt())
# check the capability for signing data for the key
print(key.can_sign())
# check whether the private key is present in the object or not
# -- the private key is indeed present in the private key object
print(key.has_private())
# check whether the private key is present in the public key object or not
# -- no, the private key is not present in the public key object
print(pub.has_private())
"""
"""
print("-"*80)
## ----- Asymmetric Ciphers -- RSA -- Encryption/Decryption ----- ##
# NOTE -- we can only encrypt data that is slightly shorter than the
# -- RSA modulus
# HERE -- our encryption message is also short --> no issue
# PADDING -- Optimal Asymmetric Encryption Padding -- OAEP
# -- padding scheme often used together with RSA encryption
from Crypto.Cipher import PKCS1_OAEP
# ISSUE -- trying to encrypt a larger message
# -- will raise an error: ValueError -- "Plaintext is too long"
# to_encrypt = b"A" * 1000
# create the cipher
cipher = PKCS1_OAEP.new(pub)
# encrypt the data -- encrypt with the public key
encrypted = cipher.encrypt(to_encrypt)
print(encrypted)
# decrypt the data -- decrypt with the private key
cipher = PKCS1_OAEP.new(key)
plaintext = cipher.decrypt(encrypted)
print(plaintext)
"""
print("-"*80)
## ----- Create and Verify Digital Signatures ----- ##
# NOTE -- digital signatures are based on a public key cryptography concept
# -- whoever signs the message, needs to have the private key
# -- whoever verifies it, needs to have the public key
# HERE -- create and verify simple digital signatures by performing a modular
# -- exponentiation, by encrypting and decrypting a message hash
from Crypto.Hash import SHA512
# calculate the SHA512 hash -- in raw bytes format
plain_hash = SHA512.new(to_encrypt).digest()
# calculate the integer represented by this given array of bytes -- in dec format
hashed = int.from_bytes(plain_hash, byteorder='big')
print(hashed)
# calculate the signature
# -- raising the hash to the power of d modulo n
# -- d and n -- were created when we set up our key
signature = pow(hashed, key.d, key.n)
print("Signature: {}".format(signature))
# verify the signature
# -- raising the signature to the power of e modulo n
# -- using the key material from our public key
signature_hash = pow(signature, key.e, key.n)
print(signature_hash)
# verify they are the same --> valid signature
print(hashed == signature_hash)
print("-"*80)
## --------------- END --------------- ##
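The modular exponentiation behind the signature can be followed with numbers small enough to read. A toy sketch, assuming the textbook key p=61, q=53 (far too small for real use) and a hash reduced modulo n, which is only needed because the toy modulus is tiny; `digest_int`, `sign` and `verify` are made-up helper names.

```python
import hashlib

# Toy RSA key -- chosen so that the numbers stay readable
p, q = 61, 53
n = p * q      # modulus
e = 17         # public exponent
d = 2753       # private exponent: (e * d) % ((p - 1) * (q - 1)) == 1


def digest_int(message):
    """SHA-512 digest as an integer, reduced mod n for the toy key."""
    digest = hashlib.sha512(message).digest()
    return int.from_bytes(digest, byteorder="big") % n


def sign(message):
    """Raise the hash to the power of d modulo n -- private key needed."""
    return pow(digest_int(message), d, n)


def verify(message, signature):
    """Raise the signature to the power of e modulo n -- public key only."""
    return pow(signature, e, n) == digest_int(message)


msg = b"encrypt_me!"
sig = sign(msg)
print("Signature: {}".format(sig))
print(verify(msg, sig))
```

Changing the signature by even one makes the verification fail, because exponentiation with e modulo n maps different inputs to different outputs.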
Argparse
- General | similar to sys.argv for parsing and using command line arguments
- working with sys.argv directly | messy and difficult to maintain and use over time
- Here | to parse and perform sanity checks on data passed as part of the command line
- to write command line interfaces (cli) flexibly
- standard library module | no need to install
- Note | Positional Arguments
- are read from the command line in the order that they appear after the script name
- demo-example | "argparse_demo.py" | (play around with commenting/uncommenting as you see fit)
import argparse
print("-"*80)
## ----- Positional arguments ----- ##
# create a parser
parser = argparse.ArgumentParser(description="Example Python CLI")
# adding arguments -- help will be displayed when "-h" is used
# -- information will be included as part of the help output
parser.add_argument("hacker_name", help="Enter the hacker name", type=str)
parser.add_argument("hacker_power", help="Enter the hacker power", type=int)
print("-"*80)
## ----- Optional arguments ----- ##
# NOTE -- optional arguments are not required, unless specified
# -- can be added to the command line in any order
# -- by default given the value of None when it's not being used
# shorthand;long;
# -- action: defines what happens when the command is provided on the cli
# -- store_true: stores the value true into the argument if it is provided
# -- on the command line
parser.add_argument("-bh", "--blackhat", default=False, action="store_true")
# make it required
# parser.add_argument("-bh", "--blackhat", required=True, default=False, action="store_true")
parser.add_argument("-wh", "--whitehat", default=True, action="store_true")
# parser.add_argument("-wh", "--whitehat", default=True, action="store_false")
# specify what a valid choice looks like
# -- ht:hacker's type
parser.add_argument("-ht", "--hackertype", choices=["whitehat","blackhat","greyhat"])
## using it `python argparse_demo.py neut 10 -ht whitehat`
print("-"*80)
## ----- Accessing the Arguments ----- ##
# parsing the arguments
args = parser.parse_args()
# check the arguments
print(args)
print(args.hacker_name)
if args.blackhat:
    hacker_type = "blackhat"
elif args.whitehat:
    hacker_type = "whitehat"
else:
    hacker_type = "unknown"
print("{} is a {} type of hacker".format(args.hacker_name, hacker_type))
print("-"*80)
## --------------- END --------------- ##
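The parser can be exercised without a terminal by handing `parse_args` a list instead of letting it read `sys.argv`. A minimal sketch, assuming a reduced version of the demo parser wrapped in a hypothetical `build_parser` function:

```python
import argparse


def build_parser():
    parser = argparse.ArgumentParser(description="Example Python CLI")
    # positional arguments -- read in the order they appear
    parser.add_argument("hacker_name", type=str, help="Enter the hacker name")
    parser.add_argument("hacker_power", type=int, help="Enter the hacker power")
    # optional flag -- False unless -bh/--blackhat is given
    parser.add_argument("-bh", "--blackhat", action="store_true")
    # optional argument restricted to a fixed set of values
    parser.add_argument("-ht", "--hackertype",
                        choices=["whitehat", "blackhat", "greyhat"])
    return parser


# same as: python argparse_demo.py neut 10 -ht whitehat
args = build_parser().parse_args(["neut", "10", "-ht", "whitehat"])
print(args.hacker_name, args.hacker_power, args.blackhat, args.hackertype)
```

A value outside `choices`, or a non-integer `hacker_power`, makes argparse print a usage message and exit with `SystemExit`.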