0405 | Extending Python

BeautifulSoup

  • Web Scraping | the process of copying and extracting data from websites
    • in hacking | to speed up the enumeration or brute forcing process
    • can be difficult due to the unstructured content of web pages
    • Steps of web scraping
      • Step-1 | download the html for the page
      • Step-2 | parse the html
      • Step-3 | navigate the parse tree to extract the data you are looking for
  • Install | pip install beautifulsoup4
  • Note | some websites do not like scraping
    • use a different request user agent header
    • limit the number of requests
      • include a pause or sleep
  • demo-example | "beautifulsoup_demo.py" | (play around with commenting/uncommenting as you see fit)
    import requests

    print("-"*80)
    ## ----- Web Scraping -- Grabbing the DOM ----- ##

    # download the scoreboard from 247ctf.com
    page = requests.get("https://247ctf.com/scoreboard")
    # dump the dom/text
    # print(page.text)

    print("-"*80)
    ## ----- BeautifulSoup -- Parsing ----- ##

    # importing beautifulsoup
    # -- basically, it converts the complex html structure into a
    # -- bunch of python objects, which we can query and interact with
    # -- based on the structure of the html tree
    from bs4 import BeautifulSoup

    # use bs to parse the html
    # -- 1st param -- the downloaded html
    # -- 2nd param -- the parser we want to use
    soup = BeautifulSoup(page.content, "html.parser")

    # the raw text content of the site without displaying the elements
    print(soup.text)

    print("-"*80)
    ## ----- BeautifulSoup -- Other Functions -- Page Title ----- ##

    # title of the html page
    print(soup.title)
    print(soup.title.name)
    # the raw text string of the title
    print(soup.title.string)

    print("-"*80)
    ## ----- BeautifulSoup -- Looking for Elements -- Links----- ##

    # to find html elements -- the first link
    print(soup.find("a"))

    # find all html links on the page
    for line in soup.find_all("a"):
        print(line)
        # print only the href
        print(line.get('href'))


    print("-"*80)
    ## ----- BeautifulSoup -- Looking for Elements -- Other restrictions----- ##

    # look for an element with a specific id
    print(soup.find(id="fetch-error"))

    # look for a specific class -- include _ (class is python keyword)
    print(soup.find(class_="nav-link"))

    # look for a link with a specific class
    print(soup.find("a", class_="nav-link"))


    print("-"*80)
    ## ----- BeautifulSoup -- Tracking the top players----- ##

    # identifying the scoreboard table
    table = soup.find("table")
    #print(table)

    # extract the tbody -- table body
    table_body = table.find("tbody")
    # print(table_body)

    # one row per each user -- tr:table-row
    rows = table_body.find_all("tr")

    for row in rows:
        print("-"*5)
        # print(row)

        # 6 columns per row
        # cols = [x for x in row.find_all("td")]
        # look only for the text -- strip out the noise
        cols = [x.text.strip() for x in row.find_all("td")]
        # print(cols)

        print("{} is in {} place with {} points".format(cols[2], cols[0], cols[4]))


    print("-"*80)
    ## ----- END ----- ##
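
  • Sketch | the two tips from the scraping note (a different user agent header, a pause between requests) could look roughly like this. It is a minimal illustration and not part of the original demo: it uses the standard library's urllib instead of requests so it runs without extra packages, and the names USER_AGENT, build_request and fetch_all as well as the user agent string are made up for the example. With requests, the same header can be passed as requests.get(url, headers={"User-Agent": ...}).

```python
import time
import urllib.request

# hypothetical user agent string -- pick one that identifies your script honestly
USER_AGENT = "Mozilla/5.0 (compatible; demo-scraper/0.1)"


def build_request(url):
    # attach a custom User-Agent header instead of the library default
    return urllib.request.Request(url, headers={"User-Agent": USER_AGENT})


def fetch_all(urls, delay=1.0, opener=urllib.request.urlopen, sleep=time.sleep):
    # download each url in turn and pause `delay` seconds between requests
    pages = []
    for index, url in enumerate(urls):
        if index > 0:
            sleep(delay)
        with opener(build_request(url)) as response:
            pages.append(response.read())
    return pages
```

    The opener and sleep parameters only exist so the helper can be tried out without touching the network; in normal use fetch_all(["https://247ctf.com/scoreboard"]) returns a list of raw html bytes, and each entry can be handed to BeautifulSoup(page, "html.parser").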

Py2exe

  • Notes
    • a limitation of our scripts so far is that they need a python interpreter in order to run
    • and depending on what we are doing, not just the basic interpreter either, but a bunch of other modules too
      • such as those we installed with pip
  • py2exe library
    • turns python programs into packages that can be run on other windows computers without needing to first install python on those computers
    • to create executable files which can be run on windows systems without a python installation
    • in the background | it just extends the module distutils with a new command
      • a module that installs something
      • usually a python module
  • Install | pip install py2exe
    • Requirement | python3.11
  • Personal Note
    • sadly will not install properly
    • a separate script is needed which tells py2exe how to bundle up our script as an executable
  • demo-example | "py2exe_demo.py"
    print("hacked")
  • demo-example | "demo_setup.py" | (play around with commenting/uncommenting as you see fit)
    # reference setup from distutils core
    from distutils.core import setup

    import py2exe

    # call setup and tell it the entry point that we want -- our hack script
    # -- will create a new directory `dist`
    # -- this distribution folder contains the executable and all the files needed
    # -- to run the executable -- run it -- `cd dist` and then `py2exe_demo.exe`
    # setup(console=['py2exe_demo.py'])

    # passing in parameters -- the new `dist` folder is much smaller
    # -- bundle_files: will bundle everything including the python interpreter
    # -- compressed:
    # -- zipfile:None --> the files will be bundled within the executable
    setup(
        options={'py2exe': {'bundle_files': 1, 'compressed': True}},
        console=[{'script': 'py2exe_demo.py'}],
        zipfile=None
    )

Sockets

  • General | a socket is a bidirectional communication channel
    • sockets can communicate within a process, between processes, or between different systems on a network
  • Here | focusing on tcp-connection oriented networking sockets
    • but the same library can be used for udp too
  • part of the standard python library
  • putty | to verify socket connections
    • terminal emulator; serial console; network file transfer application
    • supports raw socket connections
    • Download and Install
      • url | https://putty.org/ > "Alternative binary files" > "the SSH and Telnet client itself"/64-bit x86 > putty.exe | (standalone version)
  • wireshark | to see and verify the connections
    • a packet analyzer
    • used for network troubleshooting and analysis
    • Download and Install
  • demo-example | "sockets_demo.py" | (play around with commenting/uncommenting as you see fit)
    import socket

    print("-"*80)
    ## ----- Socket ----- ##

    # identify the ip address of a server
    ip = socket.gethostbyname('247ctf.com')
    print(ip)

    # create a socket -- specify parameters for the type of socket
    # -- AF_INET: transport mechanism -- IPv4
    # -- SOCK_STREAM: for connection oriented protocol -- TCP
    s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)

    # NOTE -- options with socket
    # -- binding the socket to a port and listening on that port
    # -- use the socket to make an outbound connection to another system

    print("-"*80)
    ## ----- Socket -- Outbound connection ----- ##

    # a host port pair is required --
    # -- host: ipv4 address or hostname -- string
    # -- port: int
    s.connect(("247ctf.com", 80))

    # sending and receiving data via the socket

    # sending data -- a head request -- a get request without a msg body
    s.sendall(b"HEAD / HTTP/1.1\r\nHost: 247ctf.com\r\n\r\n")
    # check the response -- 1024 bytes -- the max amount of data it will receive
    # at once
    print(s.recv(1024).decode())

    # close the socket
    s.close()

    print("-"*80)
    ## ----- Socket -- Binding to a Port and Listening ----- ##

    # NOTE
    # -- depending on the used protocol
    # -- you will typically need to have both the server and the client component
    # -- server --> waiting for connections
    # -- client --> initiating the connections

    # adding both the client and the server
    client = False
    server = False
    # specify the port for the socket
    port = 8080

    # create the socket
    s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)


    # create the server -- listening for connections
    # server = True
    if server:
        # bind the socket to the localhost to a local port
        s.bind(("127.0.0.1", port))
        # listen to connections
        s.listen()

        # verify listening with netstat -- `netstat -an`

        # accepting connections in the socket
        while True:
            # accept: accept a connection by blocking and waiting
            # for incoming connections
            # -- return values -- pair of a new socket object and the client address
            # -- the new socket is usable to send and receive data to the other end
            # -- of the connection -- the client connecting to the server
            connect, addr = s.accept()
            connect.send(b"You made it to the socket!")
            connect.close()

    # use putty to connect to the socket and verify if the server is
    # working correctly
    # configuration:
    # -- 127.0.0.1 -- 8080 -- Connection type:Other Raw
    # -- Close window on exit: Never


    # print("-"*80)
    ## ----- Socket -- Client mode ----- ##

    # NOTE -- running the client
    # -- only works if server is already running

    # NOTE -- verifying it with wireshark
    # -- use the adapter for loopback traffic capture
    # -- start capture -- filter for "tcp.port == 8080"
    # -- launch the client part again -- while server is already running
    # -- click on one packet > Follow > TCP Stream

    # create the client -- making outbound connections
    # client = True
    if client:
        s.connect(("127.0.0.1", port))
        print(s.recv(1024).decode())
        s.close()


    print("-"*80)
    ## ----- Socket -- Port Scanner ----- ##

    # NOTE -- port scanner
    # -- use the sockets library to create a port scanner
    # -- idea: run through a list of ports, try to connect to each of them and
    # -- in doing so, test, whether that port is open

    # iterate over common ports
    for port in [22, 80, 139, 443, 445, 8080]:
        s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        # specify the time out for this socket -- 1 sec
        # -- setdefaulttimeout() would only affect sockets created afterwards
        s.settimeout(1)

        # check the result of our connection attempt -- localhost as target
        # -- connect_ex: like connect, but returns an error rather than
        # -- raising an exception -- error indicator=0 upon successful connect
        result = s.connect_ex(("127.0.0.1", port))

        if result == 0:
            print("{} is open!".format(port))
        else:
            print("{} is closed!".format(port))

        # close the socket
        s.close()


    # # adding a input() so that it does not die
    # # input()

    print("-"*80)
    ## ----- END ----- ##
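
  • Sketch | the demo above needs the server and the client to be started separately. As a rough illustration (not part of the original script), both ends can also live in one process: the server accepts a single connection in a background thread while the main thread connects as the client. The function names serve_once and round_trip are made up for this example, and binding to port 0 (let the operating system pick a free port) is a choice made here so the sketch does not collide with anything already listening on 8080.

```python
import socket
import threading


def serve_once(server_sock, message):
    # accept a single client, send the message, then close the connection
    conn, _addr = server_sock.accept()
    with conn:
        conn.sendall(message)


def round_trip(message=b"You made it to the socket!"):
    with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as server:
        # port 0 asks the operating system for any free port
        server.bind(("127.0.0.1", 0))
        server.listen()
        port = server.getsockname()[1]

        # the server side blocks in accept(), so run it in its own thread
        worker = threading.Thread(target=serve_once, args=(server, message))
        worker.start()

        chunks = []
        with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as client:
            client.settimeout(2)
            client.connect(("127.0.0.1", port))
            # read until the server closes the connection (recv returns b"")
            while True:
                chunk = client.recv(1024)
                if not chunk:
                    break
                chunks.append(chunk)

        worker.join()
    return port, b"".join(chunks)
```

    Calling port, data = round_trip() returns the port that was used and the bytes the client received. Reading in a loop until recv() returns an empty result is used here because a single recv(1024) is only guaranteed to return at most 1024 bytes, not the whole message.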

Scapy

  • General | a packet manipulation library
  • to forge, decode, send and capture packets at a low level
  • all parts of the packets are modifiable and can be inspected
  • has its own cli | (you can interface with it directly)
  • possible to craft packets at different layers
  • Install | pip install scapy
  • demo-example | "scapy_demo.py" | (play around with commenting/uncommenting as you see fit)
    from scapy.all import *

    # print("-"*80)
    # ## ----- Creating/Sending an ICMP packet ----- ##

    # # ip layer
    # ip_layer = IP(dst="247ctf.com")

    # # icmp layer
    # icmp_layer = ICMP()

    # # creating a packet -- combine them with the slash
    # packet = ip_layer / icmp_layer

    # # sending the packet
    # r = send(packet)

    # # view the response -- detailed information about the packet
    # # print(packet.show())

    # # or check it via wireshark
    # # -- Manually: wireshark > Ethernet capture > filter on "icmp" -- re-run script
    # # -- Automatically:

    # # call wireshark automatically from within scapy
    # # wireshark(packet)

    # print("-"*80)
    # ## ----- ARP Scan on the local network ----- ##

    # # NOTE -- depending on the configuration of the remote systems, they may not
    # # -- directly respond to ICMP traffic
    # # -- therefore, we can go a layer deeper and instead
    # # -- perform an address resolution protocol -- arp scan on the local network


    # # use SRP to send and receive packets at layer 2
    # # -- specify a broadcast destination address -- dst:ff..ff --> broadcast address
    # # -- specify the arp target protocol address -- the local subnet the machine is on
    # # ---- pdst:subnet
    # # -- answering/unanswering hosts
    # ans, unans = srp(Ether(dst="ff:ff:ff:ff:ff:ff") / ARP(pdst="192.168.100.0/24"), timeout=3, verbose=False)

    # # # only check on the live hosts
    # # for i in ans:
    # #     # dumping the whole information
    # #     # print(i)

    # #     # print only the ip addresses
    # #     print(i[1].psrc)


    # print("-"*80)
    # ## ----- Port Scanner -- TCP Connect Scan -- LOCALHOST ----- ##

    # # NOTE: perform the same scan with scapy at a lower level

    # # NOTE -- TCP
    # # -- TCP handshake involves 3 steps -- if it completes --> we have a connection
    # # -- 1 -- To initialize the connection -- the client sends a packet with a "SYN"
    # # -- - -- Flag to the port it wants to connect to
    # # -- 2 -- if the port is open and the server is accepting connections, it will
    # # -- - -- respond with a tcp packet with the "SYN" and "ACK" flag set
    # # -- 3 -- to complete the connection -- the client sends a packet with the
    # # -- - -- "ACK" flag -- (the scanner below answers with "ACK" + "RST" (reset)
    # # -- - -- instead, to tear the connection down again)

    # # use this information to create a TCP Connect port scanner with scapy
    # # -- if we can complete the handshake --> open port
    # # -- if we can't and if server is responding with "RST" --> port is not accepting
    # # -- connections and is probably closed

    # # hardcoding the flags
    # SYN = 0x02
    # RST = 0x04
    # ACK = 0x10

    # # common ports to scan
    # for port in [22, 80, 139, 443, 445, 8080]:
    #     # sr1() -- send a packet and return ONLY the first packet that answered it
    #     # set the source port to some random value

    #     # try to connect -- localhost -- random source port -- sending a SYN
    #     tcp_connect = sr1(IP(dst="127.0.0.1")/TCP(sport=RandShort(), dport=port, flags="S"), timeout=1, verbose=False)

    #     # if succeeded and the packet has a tcp layer
    #     if tcp_connect and tcp_connect.haslayer(TCP):
    #         # extract the response flags
    #         response_flags = tcp_connect.getlayer(TCP).flags

    #         # # verify it
    #         # print(response_flags)

    #         # check the response flags
    #         # open port --> send reset
    #         if response_flags == (SYN + ACK):
    #             # send() -- we do NOT expect a response from this
    #             snd_rst = send(IP(dst="127.0.0.1")/TCP(sport=RandShort(), dport=port, flags="AR"), verbose=False)
    #             print("{} is open!".format(port))

    #         # closed port
    #         elif response_flags == (RST + ACK):
    #             print("{} is closed!".format(port))

    #     # could not make the connection
    #     else:
    #         print("{} is closed!".format(port))

    # print("-"*80)
    # ## ----- Port Scanner -- TCP Connect Scan -- REMOTEHOST ----- ##

    # # hardcoding the flags
    # SYN = 0x02
    # RST = 0x04
    # ACK = 0x10

    # # common ports to scan
    # for port in [22, 80, 139, 443, 445, 8080]:
    #     tcp_connect = sr1(IP(dst="247ctf.com")/TCP(sport=RandShort(), dport=port, flags="S"), timeout=1, verbose=False)

    #     if tcp_connect and tcp_connect.haslayer(TCP):
    #         response_flags = tcp_connect.getlayer(TCP).flags

    #         if response_flags == (SYN + ACK):
    #             snd_rst = send(IP(dst="247ctf.com")/TCP(sport=RandShort(), dport=port, flags="AR"), verbose=False)
    #             print("{} is open!".format(port))

    #         elif response_flags == (RST + ACK):
    #             print("{} is closed!".format(port))
    #     else:
    #         print("{} is closed!".format(port))


    # print("-"*80)
    # ## ----- Packet Sniffing ----- ##

    # # NOTE -- example scenario
    # # -- sniffing usernames;passwords sent to a website over http

    # # import http
    # from scapy.layers.http import HTTPRequest

    # # define the callback function
    # def process(packet):
    #     if packet.haslayer(HTTPRequest):
    #         # view the host and the path where the data is being sent
    #         print(packet[HTTPRequest].Host.decode() + packet[HTTPRequest].Path.decode())


    # # create a function as a callback and use the built-in sniff function
    # # -- prn:callback function -- store:don't want to store anything
    # sniff(filter="port 80", prn=process, store=False)

    # # NOTE -- sniffing for packets
    # # -- beware -- browsers will force the https connections -- will not display them
    # # -- use curl instead -- `curl "http://247ctf.com/?user=test&password=test"`


    # print("-"*80)
    # ## ----- Analyzing/Modifying existing packet captures ----- ##

    # # NOTE -- for demonstration -- take a look at a challenge from 247ctf
    # # -- Challenges > Networking > Error Reporting Protocol
    # # -- LOGIN NEEDED > Download pcap -- *error_reporting.pcap*

    # # NOTE -- analyzing the packet capture
    # # -- full of ICMP traffic
    # # -- the ICMP packet contains a data field --> can be used for error messages
    # # -- however --> this data field can be used for anything
    # # -- examples: exfiltrating data; bypassing a firewall

    # # NOTE -- extracting the data from the packet capture
    # # -- manually -- we can do it with wireshark, but it's slow
    # # -- automatically -- below

    # # read the packet capture -- pcap file in the same folder
    # scapy_cap = rdpcap("error_reporting.pcap")

    # # iterate over the packets
    # for packet in scapy_cap:
    #     # if the packet has an ICMP layer
    #     if packet.getlayer(ICMP):
    #         # check the data
    #         print(packet.load)

    # # testing -- halt execution after the first packet
    # # input()

    print("-"*80)
    ## ----- END ----- ##
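
  • Sketch | the port scanner above compares the reply flags against hardcoded values (SYN = 0x02, RST = 0x04, ACK = 0x10). The small example below, which does not need scapy, shows how those bit values combine and how a reply could be classified from them. The helper names flag_names and classify are made up for this illustration; the flag values are the standard TCP header bits.

```python
# TCP flag bits as they appear in the TCP header
FIN, SYN, RST, PSH, ACK, URG = 0x01, 0x02, 0x04, 0x08, 0x10, 0x20

FLAG_TABLE = [("FIN", FIN), ("SYN", SYN), ("RST", RST),
              ("PSH", PSH), ("ACK", ACK), ("URG", URG)]


def flag_names(flags):
    # list the names of all bits that are set in the integer flag field
    return [name for name, bit in FLAG_TABLE if flags & bit]


def classify(flags):
    # flags: integer flag field of the reply, or None if nothing came back
    if flags is None:
        return "no response"
    if flags & SYN and flags & ACK:
        return "open"
    if flags & RST:
        return "closed"
    return "unexpected"


print(hex(SYN + ACK), flag_names(SYN + ACK), classify(SYN + ACK))
print(hex(RST + ACK), flag_names(RST + ACK), classify(RST + ACK))
```

    Testing individual bits with & (instead of comparing the whole value with ==) is a design choice of this sketch: it still recognises a reply as open or closed when additional flag bits happen to be set.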

Subprocess

  • General | Process/Subprocess
    • you can think of a running program as just a process
    • we are executing the python interpreter process every time we run a script
    • a process can start another process | but these sub processes/child processes run independently of their parent process
    • because they are independent | some processes will execute concurrently with the parent
    • just because we launched a sub process, it does not mean that the current process will have to hang or wait for input/output or that child process to finish execution
  • Python Subprocess Module
    • the python subprocess module provides an interface to create and work with additional processes
    • to spawn new processes; connect to standard pipes; obtain process error codes
    • cross-platform module | can be used to work with processes on linux/windows/macos
    • part of the standard library
  • Note | Underlying Process | Under the Hood
    • the underlying process creation and management of these functions is handled by the Popen class
    • on windows | under the hood Popen makes use of the CreateProcess function
    • gives us the ability to perform fine-grained configuration as to how our process should be executed
  • Note | Underlying Process | Example
    • we have the ability to control the start-up info object and associated creation flags
    • useful for more complicated process execution requirements
  • BEWARE | when using subprocess make sure you are careful with how you make the call
    • because you do NOT want to enable an attacker to gain code execution by passing input directly to a dangerous function
  • demo-example | "subprocess_demo.py" | (play around with commenting/uncommenting as you see fit)
    import subprocess

    print("-"*80)
    ## ----- EXECUTING A PROCESS ----- ##

    # # execute a command without needing to interact with it
    # # launch the calculator app -- (not default on every windows|was deleted)
    # subprocess.call("calc")

    # # better alternative -- more secure way to spawn it
    # # pass the commands as a list -- 1st element --> the command to call
    # # -- next elements --> the arguments for that call
    # subprocess.call(["calc"])

    # # NOTE: if shell=True is passed as an argument
    # # -- the command string is interpreted as a raw shell command
    # # -- meaning: we are invoking a shell before executing the sub process
    # # -- BEWARE: if user controlled input is passed --> opening to code execution
    # subprocess.call(["calc"], shell=True)

    print("-"*80)
    ## ----- EXECUTING A PROCESS -- checking for errors ----- ##

    # check_call -- the same as call -- but the exit code is checked
    # -- if an error occurred (non-zero exit code) -- an exception is raised

    # # -- will run but with warning -- "asd" is not recognized as an internal
    # # -- command...
    # out = subprocess.call(["cmd", "/c", "asd"])

    # # will raise the CalledProcessError -- returned non-zero exit status 1
    # out = subprocess.check_call(["cmd", "/c", "asd"])

    # # will run after checks
    # out = subprocess.check_call(["cmd", "/c", "calc"])


    print("-"*80)
    ## ----- EXECUTING A PROCESS -- STDIN/STDOUT ----- ##

    # NOTE -- when using the call method, the standard input and output is bound
    # -- to the parent
    # -- meaning: the calling program can NOT capture the output of the command
    # -- which was executed

    # capturing the output of the called command
    out = subprocess.check_output(["cmd", "/c", "whoami"])

    # having access to the output as a python variable
    # print(out.decode())
    print("The output was: {}".format(out.decode()))

    # NOTE -- the underlying process creation and management of these functions
    # -- is handled by the Popen class
    # -- on windows -- under the hood Popen makes use of the CreateProcess function
    # -- gives us the ability to perform fine-grained configuration as to how
    # -- our process should be executed
    # Example -- we have the ability to control the start-up info object and
    # -- associated creation flags -- useful for more complicated process execution
    # -- requirements

    print("-"*80)
    ## ----- END ----- ##
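
  • Sketch | the demo above calls cmd, so it only runs on windows. A rough cross-platform variant of the same ideas (pass the command as a list, check the exit code, capture the output) is shown below. It launches the current python interpreter via sys.executable as the child process; the helper names run_python and exit_code_of are made up for this example, and subprocess.run is used here in place of call/check_call/check_output.

```python
import subprocess
import sys


def run_python(code):
    # command as a list: first element is the program, the rest are its arguments
    # check=True raises CalledProcessError on a non-zero exit code
    result = subprocess.run(
        [sys.executable, "-c", code],
        capture_output=True,
        text=True,
        check=True,
    )
    return result.stdout.strip()


def exit_code_of(code):
    # return the child's exit code instead of letting the exception escape
    try:
        subprocess.run([sys.executable, "-c", code], capture_output=True, check=True)
    except subprocess.CalledProcessError as error:
        return error.returncode
    return 0


print("The output was: {}".format(run_python("print('hello')")))
print("Exit code: {}".format(exit_code_of("import sys; sys.exit(3)")))
```

    Because the command is a list and shell=True is not used, the code string is handed to the child as a single argument and is never interpreted by a shell.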

Threading

  • General | Threads
    • just small units of work
    • usually contained in processes
    • a process can have more than one thread
    • the threads share the state and memory of the process
    • threading in python | basically telling your code to do more than one thing at once
    • useful
      • to improve speed by performing actions concurrently
      • to allow a program to remain responsive to input while also performing some other action which may be blocking
  • With Python | each process has at least ONE thread | the main thread
    • where we have been executing our scripts for the most part till now
  • BEWARE
    • once the main thread finishes, the interpreter waits for all non-daemon child threads to complete | threads specified as a daemon are killed abruptly when the program exits
  • demo-example | "threading_demo.py" | (play around with commenting/uncommenting as you see fit)
    import threading, time
    from datetime import datetime

    print("-"*80)
    ## ----- Compare sequential vs concurrent sleeping ----- ##

    def sleeper(i):
        print("hello from %d!" % i)

        # sleep for i seconds
        time.sleep(i)

        print("goodbye from %d!" % i)


    """
    ## ----- Sequential execution ----- ##

    # NOTE -- it takes around 6 sec to execute
    # -- reason: we are being blocked by time.sleep()
    # -- before executing sleeper(2), sleeper(1) needs to finish ...

    # print out the current time -- hours, minutes, seconds
    print(datetime.now().strftime("%H:%M:%S"))
    sleeper(0)
    sleeper(1)
    sleeper(2)
    sleeper(3)
    print(datetime.now().strftime("%H:%M:%S"))
    """


    """
    ## ----- Concurrent/parallel execution -- starting right away ----- ##

    # NOTE -- each run may provide a different output, since the parallel
    # -- threads could be scheduled differently with each execution

    # print out the current time -- hours, minutes, seconds
    print(datetime.now().strftime("%H:%M:%S"))

    # create a new thread and start it
    threading.Thread(target=sleeper, args=(0,)).start()
    threading.Thread(target=sleeper, args=(1,)).start()
    threading.Thread(target=sleeper, args=(2,)).start()
    threading.Thread(target=sleeper, args=(3,)).start()

    print(datetime.now().strftime("%H:%M:%S"))

    ## ----- Concurrent/parallel execution -- delayed execution with timer ----- ##

    # NOTE -- timer -- for delayed thread execution
    # -- takes an array of arguments and a dictionary of keyword arguments

    # print out the current time -- hours, minutes, seconds
    print(datetime.now().strftime("%H:%M:%S"))

    # wait for the timer before executing the thread
    threading.Timer(0, sleeper, [0]).start()
    threading.Timer(1, sleeper, [1]).start()
    threading.Timer(2, sleeper, [2]).start()
    threading.Timer(3, sleeper, [3]).start()

    print(datetime.now().strftime("%H:%M:%S"))
    """


    """
    ## ----- Printing/getting input at the same time ----- ##

    # ISSUE -- how can we print something to the screen and at the same time
    # -- get input from the user
    # -- with input() -- the program will be busy waiting for our input

    # print("hello")
    # # will wait for the input before printing "world"
    # input()
    # print("world")

    # NOTE -- idea: 2 threads which can communicate with each other using a simple
    # -- global variable

    # global variable
    stop = False

    # constantly asking the user for input
    # -- if exit is desired --> stop both functions
    def input_thread():
        global stop
        while True:
            # ask a user for input
            user_input = input("Should we stop?:")
            print(">> User says: {}".format(user_input))

            # check if stop
            if user_input == "yes":
                stop = True
                break

    # prints the counter -- increases the counter -- sleeps for 1 sec
    def output_thread():
        global stop
        count = 0
        while not stop:
            print(count)
            count += 1
            time.sleep(1)


    # starting both of the threads
    # -- start() returns None, so keep the Thread object before starting it
    t1 = threading.Thread(target=input_thread)
    t2 = threading.Thread(target=output_thread)
    t1.start()
    t2.start()
    """

    """
    ## ----- Synchronizing Threads -- WITHOUT LOCKING ----- ##

    # NOTE -- threading module has built-in functionality to help implement
    # -- locking that can be used to synchronize threads

    # NOTE -- locking can be used to control access to shared resources
    # -- or to prevent corruption or dead-lock issues with parallel processing
    # -- the lock controls which thread is allowed to access a resource during
    # -- execution

    # EXAMPLE -- we have a list, and want a number of threads to pop a value
    # -- off of that list
    # WITHOUT A LOCK: different order with each execution
    # ISSUE: we do NOT want multiple threads to pop the same value from the list
    # -- by mistake

    # IDEA -- prevent simultaneous modification of a variable
    # -- lock will force an operation to wait, until the variable is unlocked in
    # -- order to access or modify it

    # create a lock
    data_lock = threading.Lock()

    # create the list
    data = [x for x in range(1000)]

    # consume thread
    def consume_thread():
        global data_lock
        while len(data) > 0:
            print(data.pop())

    # create the thread and start straight away
    threading.Thread(target=consume_thread).start()
    threading.Thread(target=consume_thread).start()
    threading.Thread(target=consume_thread).start()
    """

    ## ----- Synchronizing Threads -- Locking ----- ##

    # NOTE -- due to the locking mechanism, the popping is done in
    # -- sequential order

    # create a lock
    data_lock = threading.Lock()

    # create the list
    data = [x for x in range(1000)]

    # consume thread
    def sync_consume_thread():
        global data_lock, data

        while True:
            # acquire the lock -- this is blocking
            data_lock.acquire()

            try:
                # stop once the list is empty -- otherwise the loop never ends
                if len(data) == 0:
                    break
                # display the current thread, and the popped item
                print(threading.current_thread().name, data.pop())
            finally:
                # release the lock -- the waiting threads are allowed to take it
                data_lock.release()

    # create the thread and start straight away
    threading.Thread(target=sync_consume_thread).start()
    threading.Thread(target=sync_consume_thread).start()
    threading.Thread(target=sync_consume_thread).start()


    print("-"*80)
    ## --------------- END --------------- ##
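
  • Sketch | a compact variant of the locking demo, added as an illustration only. It uses the lock as a context manager (with lock:) instead of explicit acquire()/release() calls, gives every thread its own result list, and joins the threads so the caller can check that each value was popped exactly once. The names drain and worker are made up for this example.

```python
import threading


def drain(items, workers=3):
    lock = threading.Lock()
    # one result list per thread, so no two threads write to the same list
    results = [[] for _ in range(workers)]

    def worker(out):
        while True:
            # the with-block acquires the lock and always releases it again
            with lock:
                if not items:
                    return
                value = items.pop()
            out.append(value)

    threads = [threading.Thread(target=worker, args=(results[i],))
               for i in range(workers)]
    for thread in threads:
        thread.start()
    # wait until every worker has finished
    for thread in threads:
        thread.join()
    return results


results = drain(list(range(1000)))
popped = sorted(value for chunk in results for value in chunk)
print(popped == list(range(1000)))
```

    How the values are split between the threads depends on scheduling and can change from run to run; what the lock guarantees is that the emptiness check and the pop happen together, so no value is handed out twice and no thread pops from an empty list.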

Pycryptodome

  • General
    • fork of the pycrypto library | last commit to the original in 2014
    • here | to perform common encryption tasks
    • can be used for symmetric/asymmetric ciphers, hashes, and message authentication codes
  • Setup | pip install pycryptodome
  • Note | Password Based Derivation Functions
    • Basic Idea | use the function to generate a byte sequence based on a string provided from the user
  • Note | Padding
    • for a symmetric cipher we need to be able to pad our data up to the length of a block size
  • demo-example | "crypto_demo.py" | (play around with commenting/uncommenting as you see fit)
    from Crypto.Random import get_random_bytes

    print("-"*80)
    ## ----- Crypto -- Get Random Bytes ----- ##

    # return random byte stream of the length specified
    key = get_random_bytes(32)
    print(key)
    # key length: 32 bytes = 256 bits
    print(len(key))


    print("-"*80)
    ## ----- Crypto -- Key Derivation Functions ----- ##

    from Crypto.Protocol.KDF import PBKDF2

    # salt is needed for PBKDF2
    # -- to help prevent against dictionary based attacks
    # -- the value does NOT necessarily need to be kept secret
    # -- but needs to be random for each derivation

    # random salt
    salt = get_random_bytes(32)
    # static salt -- to demonstrate static behaviour for the derived key
    # -- each time the same key will be derived with the same salt and password
    salt = b'\x8f\xa4\xc2\xc2s\x95\xf3F\xee\x97`\x10n\xcd\x88\xce\xb5v\xafi\x0cJ\x05\xce\x01\xbbSTc\xe02\xe0'

    # the secret password used to generate the key
    password = "hunter21"

    # generate the key -- dkLen:cumulative length of the keys to produce
    key = PBKDF2(password, salt, dkLen=32)
    print(key)
    print(len(key))


    print("-"*80)
    ## ----- Symmetric Encryption -- Block ciphers ----- ##

    # import the AES cipher
    from Crypto.Cipher import AES
    # import some padding function
    from Crypto.Util.Padding import pad, unpad

    # the stream to encrypt
    to_encrypt = b"encrypt_me!"

    # create the cipher -- mode:CBC (cipher block chaining)
    cipher = AES.new(key, AES.MODE_CBC)

    # automatically generated initialization vector -- IV
    print(cipher.iv)

    # encrypting data with AES
    ciphered_data = cipher.encrypt(pad(to_encrypt, AES.block_size))
    print(ciphered_data)

    """
    print("-"*80)
    ## ----- Symmetric Decryption -- Block ciphers -- New IV issue ----- ##

    # NOTE: a cipher object is stateful
    # -- once we have encrypted a message, we can NOT perform decryption
    # -- with that same cipher object -- decrypt() after encrypt() raises a TypeError
    # BEWARE: creating a new object for decryption without passing the IV will use a
    # -- new random IV --> will NOT decrypt properly

    # setting up a new cipher
    cipher = AES.new(key, AES.MODE_CBC)

    # decrypt the data
    plaintext_data = cipher.decrypt(ciphered_data)

    # verify -- jumbled data -- not "encrypt_me!" text
    # -- REASON: we are using an other random IV
    print(plaintext_data)
    """

    """
    print("-"*80)
    ## ----- Symmetric Decryption -- Block ciphers -- Same IV ----- ##

    # specify the SAME IV that was used for the encryption
    cipher = AES.new(key, AES.MODE_CBC, iv=cipher.iv)

    # unpad the decrypted data -- remove the padding bytes
    plaintext_data = unpad(cipher.decrypt(ciphered_data), AES.block_size)
    print(plaintext_data)
    """

    """
    print("-"*80)
    ## ----- Symmetric Encryption/Decryption -- Stream ciphers ----- ##

    from Crypto.Cipher import ARC4

    # create a new cipher -- use the same key
    cipher = ARC4.new(key)

    # encrypt the data -- not padded to a blocksize
    encrypted = cipher.encrypt(to_encrypt)
    print(encrypted)

    # decrypt the data
    cipher = ARC4.new(key)
    plaintext = cipher.decrypt(encrypted)
    print(plaintext)
    """

    print("-"*80)
    ## ----- Asymmetric Ciphers -- RSA -- Public/Private Key Generation ----- ##
    # NOTE -- access to a public and private RSA key pair is needed
    # -- or generate them here

    # importing RSA
    from Crypto.PublicKey import RSA

    # generate an RSA key pair -- 1024 bit
    # -- fine for a quick demo, but 2048 bit or more is recommended in practice
    key = RSA.generate(1024)

    # password protect the key
    encrypted_key = key.exportKey(passphrase=password)

    # print the encrypted RSA private key
    print(encrypted_key)

    # the public RSA key
    pub = key.publickey()

    # print the RSA public key
    print(pub.exportKey())


    """
    print("-"*80)
    ## ----- Asymmetric Ciphers -- RSA -- Checking Pub/Priv Key Capabilities ----- ##

    # check the capability for encrypting data for the key
    print(key.can_encrypt())

    # check the capability for signing data for the key
    print(key.can_sign())

    # check whether the private key is present in the object or not
    # -- the private key is indeed present in the private key object
    print(key.has_private())

    # check whether the private key is present in the public key object or not
    # -- no, the private key is not present in the public key object
    print(pub.has_private())
    """

    """
    print("-"*80)
    ## ----- Asymmetric Ciphers -- RSA -- Encryption/Decryption ----- ##
    # NOTE -- we can only encrypt data that is somewhat shorter than the
    # -- RSA modulus -- with OAEP: modulus bytes - 2 * hash bytes - 2
    # -- e.g. 1024 bit key + default SHA-1 --> 128 - 40 - 2 = 86 bytes
    # HERE -- our message is short --> no issue

    # PADDING -- Optimal Asymmetric Encryption Padding -- OAEP
    # -- padding scheme often used together with RSA encryption
    from Crypto.Cipher import PKCS1_OAEP

    # ISSUE -- trying to encrypt a larger message
    # -- will raise an error: ValueError -- "Plaintext is too long"
    # to_encrypt = b"A" * 1000

    # create the cipher
    cipher = PKCS1_OAEP.new(pub)

    # encrypt the data -- encrypt with the public key
    encrypted = cipher.encrypt(to_encrypt)
    print(encrypted)

    # decrypt the data -- decrypt with the private key
    cipher = PKCS1_OAEP.new(key)
    plaintext = cipher.decrypt(encrypted)
    print(plaintext)
    """


    print("-"*80)
    ## ----- Create and Verify Digital Signatures ----- ##
    # NOTE -- digital signatures are based on public key cryptography
    # -- whoever signs the message needs to have the private key
    # -- whoever verifies it needs to have the public key

    # HERE -- create and verify a simple digital signature by performing a
    # -- modular exponentiation on a message hash (textbook RSA, no padding)
    # -- for demonstration only, not for real-world use

    from Crypto.Hash import SHA512

    # calculate the SHA512 hash -- in raw bytes format
    plain_hash = SHA512.new(to_encrypt).digest()

    # convert the hash bytes to the (decimal) integer they represent -- big-endian
    hashed = int.from_bytes(plain_hash, byteorder='big')
    print(hashed)

    # calculate the signature
    # -- raising the hash to the power of d modulo n
    # -- d and n -- were created when we set up our key
    signature = pow(hashed, key.d, key.n)
    print("Signature: {}".format(signature))

    # verify the signature
    # -- raising the signature to the power of e modulo n
    # -- using only the key material from our public key
    signature_hash = pow(signature, pub.e, pub.n)
    print(signature_hash)

    # verify they are the same --> valid signature
    print(hashed == signature_hash)


    print("-"*80)
    ## --------------- END --------------- ##
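
  • sketch | the sign/verify arithmetic from the last demo block, using only the standard library
    • Assumption | the primes, the exponent and the function names below are illustrative stand-ins, not part of "pycryptodome_demo"-style code above; a real key comes from RSA.generate()
    • Assumption | the hash is reduced modulo n only because this toy modulus is shorter than a SHA512 digest
    • textbook RSA without padding | for understanding the math, not for real-world signatures

```python
import hashlib

# Toy key material (ASSUMPTION, illustration only): two known Mersenne primes
# stand in for the large random primes that RSA.generate() would pick.
p = 2**61 - 1
q = 2**89 - 1
n = p * q                            # public modulus
e = 65537                            # public exponent
d = pow(e, -1, (p - 1) * (q - 1))    # private exponent (needs Python 3.8+)


def message_hash(message):
    """Return the SHA512 hash of the message as an integer below n."""
    digest = hashlib.sha512(message).digest()
    # reduce modulo n because this toy modulus is shorter than the hash
    return int.from_bytes(digest, byteorder="big") % n


def sign(message):
    """Sign with the private key: hash ** d mod n."""
    return pow(message_hash(message), d, n)


def verify(message, signature):
    """Verify with the public key: signature ** e mod n must equal the hash."""
    return pow(signature, e, n) == message_hash(message)


signature = sign(b"encrypt_me!")
print(verify(b"encrypt_me!", signature))   # original message
print(verify(b"tampered!", signature))     # modified message
```

  • Note | the first check succeeds and the second one fails, because only the holder of d can produce a value that maps back to the message hash under e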

Argparse

  • General | a module for parsing and using command line arguments, as an alternative to reading sys.argv directly
    • handling raw sys.argv gets messy and difficult to maintain and use over time
  • Here | to parse and perform sanity checks on data passed as part of the command line
    • to write command line interfaces (cli) flexibly
  • standard library module | no need to install
  • Note | Positional Arguments
    • are read from the command line in the order that they appear after the script name
  • demo-example | "argparse_demo.py" | (play around with commenting/uncommenting as you see fit)
    import argparse

    print("-"*80)
    ## ----- Positional arguments ----- ##

    # create a parser
    parser = argparse.ArgumentParser(description="Example Python CLI")

    # adding arguments -- help will be displayed when "-h" is used
    # -- information will be included as part of the help output
    parser.add_argument("hacker_name", help="Enter the hacker name", type=str)
    parser.add_argument("hacker_power", help="Enter the hacker power", type=int)


    print("-"*80)
    ## ----- Optional arguments ----- ##
    # NOTE -- optional arguments are not required, unless specified
    # -- can be added to the command line in any order
    # -- by default given the value of None when it's not being used

    # first the shorthand (-bh), then the long form (--blackhat)
    # -- action: defines what happens when the option is provided on the cli
    # -- store_true: stores the value True into the argument if it is provided
    # -- on the command line
    parser.add_argument("-bh", "--blackhat", default=False, action="store_true")

    # make it required
    # parser.add_argument("-bh", "--blackhat", required=True, default=False, action="store_true")

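    # NOTE -- with default=True, store_true would leave the flag always True
    # -- so default to False and let the flag switch it on
    parser.add_argument("-wh", "--whitehat", default=False, action="store_true")
    # alternative -- default to True and let the flag switch it off
    # parser.add_argument("-wh", "--whitehat", default=True, action="store_false")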


    # specify what a valid choice looks like
    # -- ht:hacker's type
    parser.add_argument("-ht", "--hackertype", choices=["whitehat","blackhat","greyhat"])
    ## usage: `python argparse_demo.py neut 10 -ht whitehat`


    print("-"*80)
    ## ----- Accessing the Arguments ----- ##

    # parsing the arguments
    args = parser.parse_args()

    # check the arguments
    print(args)

    print(args.hacker_name)

    if args.blackhat:
        hacker_type = "blackhat"
    elif args.whitehat:
        hacker_type = "whitehat"
    else:
        hacker_type = "unknown"

    print("{} is a {} type of hacker".format(args.hacker_name, hacker_type))


    print("-"*80)
    ## --------------- END --------------- ##
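
  • sketch | the same parser wrapped in functions, so it can be tried without typing a command line
    • Assumption | build_parser() and describe() are hypothetical helper names introduced here; passing a list to parse_args() replaces sys.argv for the experiment
    • Assumption | falling back to the -ht value when neither flag is set is one possible design, not part of the demo above

```python
import argparse


def build_parser():
    """Build the demo parser: two positionals, two flags, one choice option."""
    parser = argparse.ArgumentParser(description="Example Python CLI")
    # positional arguments -- read in the order they appear
    parser.add_argument("hacker_name", type=str, help="Enter the hacker name")
    parser.add_argument("hacker_power", type=int, help="Enter the hacker power")
    # optional flags -- False unless given on the command line
    parser.add_argument("-bh", "--blackhat", action="store_true")
    parser.add_argument("-wh", "--whitehat", action="store_true")
    # optional value -- None unless given, restricted to the listed choices
    parser.add_argument("-ht", "--hackertype",
                        choices=["whitehat", "blackhat", "greyhat"])
    return parser


def describe(argv):
    """Parse an explicit argument list and return the summary sentence."""
    args = build_parser().parse_args(argv)
    if args.blackhat:
        hacker_type = "blackhat"
    elif args.whitehat:
        hacker_type = "whitehat"
    else:
        # fall back to -ht, or "unknown" when nothing was specified
        hacker_type = args.hackertype or "unknown"
    return "{} is a {} type of hacker".format(args.hacker_name, hacker_type)


print(describe(["neut", "10", "-bh"]))
print(describe(["neut", "10", "-ht", "greyhat"]))
print(describe(["neut", "10"]))
```

  • Note | an invalid value such as `neut ten` or `-ht redhat` makes argparse print a usage error and exit, which is the sanity check mentioned at the top of this section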