Today I Learned …

[Python] 간이 Proxy 만들기 (Python Socket Programming) 본문

Computer/취미

[Python] 간이 Proxy 만들기 (Python Socket Programming)

염베리 2021. 12. 29. 02:18

* 개인적인 공부 내용을 기록한 글입니다.


나만의 Proxy 만들기 (Python Socket Programming)

 

 

정말 뜬금없이 조금은 특이한 도전을 해보게 되었다.

바로 소켓 프로그래밍을 이용하여 간이 프록시를 만드는 것이다!


구현 목표는 다음과 같다.

1. telnet을 이용해 내 프록시에 접근한다.

2. telnet쪽에서 send 명령어를 통해 보낸 HTTP 요청을 적절히 가공하여 본래의 서버로 대신 요청한다.

3. 본래의 서버로부터 받은 응답 패킷을 프록시에 캐싱한 후 telnet측에 대신 응답한다.

4. 이후 telnet측으로부터 동일한 요청을 받았을 시 본래의 서버로 요청을 넘기지 않고 즉시 캐시 데이터를 전달한다.

 

이를 위해 고안한 메커니즘은 다음과 같다.

1. 사용자로부터 요청 패킷을 받을 서버 소켓을 만든다.

2. 전달받은 요청 패킷을 디코딩한 후, 본래의 서버로 전달하기 적절한 형태로 가공해준다.

3. 본래의 서버로 요청 패킷을 전송할 클라이언트 소켓을 만든다.

4. 가공한 패킷을 인코딩하여 본래의 서버로 전달한다.

5. 전달받은 응답 패킷을 디코딩한 후, URL과 페이로드를 매칭하여 캐싱한다.

6. 응답 패킷을 인코딩한 후 사용자에게 전달한다.

 

추가적으로 구현할 사항은 다음과 같다.

1. 프록시 프로그램 실행 시 옵션으로 포트 번호를 지정할 수 있도록 한다.

2. 유효하지 않은 요청을 받았을 시 Bad Request로 응답한다.

3. GET 이외의 요청을 받았을 시 Not Implemented로 응답한다.

4. Selectors를 사용하여 소켓 멀티플렉싱을 구현한다.


최종 스크립트

 

 

소켓 멀티플렉싱 구현 전/후의 최종 스크립트를 모두 첨부한다.

* 소켓 멀티플렉싱을 구현하지 않은 스크립트가 가독성이 훨씬 높기 때문에 흐름 파악에 적절하다.


My Proxy (w/o. Socket Multiplexing)

import os
import sys
import socket
import argparse

#커맨드로부터 포트번호 받기
parser = argparse.ArgumentParser()
parser.add_argument('-p', type=int, help="port number")
args = parser.parse_args()
my_port = args.p
if(my_port):
    pass
else:
    print("")
    print("[!] python <file.py> -p <port> 형식으로 접속해주세요!")
    sys.exit()

#터미널창 비우기
os.system('cls')
print("Proxy Started\n")

#Cache 사전
cache = {}
#HTTP 메소드 종류
http = ["GET","POST","DELETE","PUT","PATCH","HEAD","OPTION","TRACE"]

#사용자와 연결하기 위한 소켓 생성
s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
s.bind(('',my_port))
s.listen(10) #리슨
print(f">> [{my_port}] Listening...\n")
c, addr = s.accept() #연결
print(">> Connected!\n")

while True:
    req = c.recv(65535).decode() #요청 패킷 가져와서 디코딩

    #telnet에서 엔터쳤을 때 IndexError방지
    try:
        msg = req.split()[0] #메소드
        url = req.split()[1] #url
    except:
        info = "[!] Send some Request!\n"
        info = info.encode()
        c.send(info)
        continue

    try:
        index = '/'.join(url.split("/")[3:]) #파일경로
        root = url.split("/")[2] #IP/도메인
    except:
        index = ''
        root = url

    if(msg!="GET"): #GET이 아닌 경우
        if(msg in http):
            info = "[!] Not Implemented(501)!\n"
        else:
            info = "[!] Bad Request(400)!\n"
        print(info)
        info = info.encode()
        c.send(info)
        print(">> Keep going...\n")
        continue
    else: #GET인 경우
        print("* * * Request * * *\n")
        print(req+"\n")

        #root + index
        target = root+"/"+index

        #Cache가 있는 경우
        if(target in cache):
            get = cache[target]
            get = get.replace("$$$",":")
            get = get.replace("###","{")
            get = get.replace("@@@",",")
            get = get.encode()
            c.sendall(get)
            print(">> Cache Used!\n")
            continue

        #절대경로 -> 상대경로
        new_req = f"GET /{index} HTTP/1.1\r\nHost: {root}\r\nConnection: close\r\n\r\n"
        new_req = new_req.encode()

        #오리진 서버와 연결하기 위한 소켓 생성
        s2 = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        try:
            host = socket.gethostbyname(f"{root}")
        except:
            info = "[!] There's no such Host!\n"
            print(info)
            info = info.encode()
            c.send(info)
            print(">> Keep going...\n")
            continue
        host_port = 80
        s2.connect((host, host_port))

        #오리진 서버에 요청 패킷 전송
        s2.sendall(new_req)
        value = "" #응답 패킷 내용을 저장할 빈 문자열
        bad = False

        while True:
            res = s2.recv(65535) #응답 가져오기
            if not res:
                break
            #응답 패킷을 Cache 사전에 저장하기 위해 디코딩
            deco = res.decode()
            deco = deco.replace(":","$$$")
            deco = deco.replace("{","###")
            deco = deco.replace(",","@@@")
            value += deco #문자열에 패킷 내용 저장
            if(deco.split()[1]=="400"):
                info = "[!] Bad Request(400)!\n"
                print(info)
                info = info.encode()
                c.send(info)
                bad = True
                break
            #응답 패킷을 사용자에게 전송
            c.sendall(res)
        print(">> Send Success!\n")

        if(bad):
            print(">> Keep going...\n")
            continue

        #Cache 저장
        cache[target] = value
        print(">> New Cache Written!\n")
        print("* * * Cache List * * *")
        for index, key in enumerate(cache.keys()):
            print(f"[{index+1}] {key}")
        print("\n>> Keep going...\n")

My Proxy (w. Socket Multiplexing)

import os
import sys
import socket
import argparse
import selectors

sel = selectors.DefaultSelector()
conn_list = []

def accept(sock, mask):
    conn, addr = sock.accept()  # Should be ready
    conn_list.append(conn)  
    print(f">> [Socket {conn_list.index(conn)}] Connected! : {conn}\n")
    print(f"* * * Connection List * * *")
    for index, c in enumerate(conn_list):
        print(f"[Socket {index}] {c}")
    print("* * * * * * * * * * * * * *\n")
    conn.setblocking(False)
    sel.register(conn, selectors.EVENT_READ, read)

def read(conn, mask):
    req = conn.recv(65535)
    if req:
        req = req.decode() #요청 패킷 가져와서 디코딩

        #telnet에서 엔터쳤을 때 IndexError방지
        try:
            msg = req.split()[0] #메소드
            url = req.split()[1] #url
        except:
            info = "[!] Send some Request!\n"
            info = info.encode()
            conn.send(info)
            return

        try:
            index = '/'.join(url.split("/")[3:]) #파일경로
            root = url.split("/")[2] #IP/도메인
        except:
            index = ''
            root = url.split("/")[2]

        if(msg!="GET"): #GET이 아닌 경우
            if(msg in http):
                info = "[!] Not Implemented(501)!\n"
            else:
                info = "[!] Bad Request(400)!\n"
            print(f"[Socket {conn_list.index(conn)}] {info}")
            info = info.encode()
            conn.send(info)
            print(">> Keep going...\n")
            return
        else: #GET인 경우
            print(f"* * * Request From [Socket {conn_list.index(conn)}] * * *")
            print(req)
            print("* * * * * * * * * * * * * * * * * *\n")

            #root + index
            target = root+"/"+index

            #Cache가 있는 경우
            if(target in cache):
                get = cache[target]
                get = get.replace("$$$",":")
                get = get.replace("###","{")
                get = get.replace("@@@",",")
                get = get.encode()
                conn.sendall(get)
                print(f">> [Socket {conn_list.index(conn)}] Cache Sent!\n")
                return

            #절대경로 -> 상대경로
            new_req = f"GET /{index} HTTP/1.1\r\nHost: {root}\r\nConnection: close\r\n\r\n"
            new_req = new_req.encode()

            #오리진 서버와 연결하기 위한 소켓 생성
            s2 = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
            try:
                host = socket.gethostbyname(f"{root}")
            except:
                info = "[!] There's no such Host!\n"
                print(info)
                info = info.encode()
                conn.send(info)
                print(">> Keep going...\n")
                return
            host_port = 80
            s2.connect((host, host_port))

            #오리진 서버에 요청 패킷 전송
            s2.sendall(new_req)
            value = "" #응답 패킷 내용을 저장할 빈 문자열
            bad = False

            while True:
                res = s2.recv(65535) #응답 가져오기
                if not res:
                    break
                #응답 패킷을 Cache 사전에 저장하기 위해 디코딩
                deco = res.decode()
                deco = deco.replace(":","$$$")
                deco = deco.replace("{","###")
                deco = deco.replace(",","@@@")
                value += deco #문자열에 패킷 내용 저장
                if(deco.split()[1]=="400"):
                    info = "[!] Bad Request(400)!\n"
                    print(f"[Socket {conn_list.index(conn)}] {info}")
                    info = info.encode()
                    conn.send(info)
                    bad = True
                    break
                #응답 패킷을 사용자에게 전송
                conn.sendall(res)
            print(f">> [Socket {conn_list.index(conn)}] Send Success!\n")

            if(bad):
                print(">> Keep going...\n")
                return

            #Cache 저장
            cache[target] = value
            print(">> New Cache Written!\n")
            print("* * * Cache List * * *")
            for index, key in enumerate(cache.keys()):
                print(f"{index+1}. {key}")
            print("* * * * * * * * * * *")
            print("\n>> Keep going...\n")
        
    else:
        print(f">> [Socket {conn_list.index(conn)}] Socket Closed! : {conn}\n")
        conn_list[conn_list.index(conn)] = "Closed"
        print(f"* * * Connection List * * *")
        for index, c in enumerate(conn_list):
            print(f"[Socket {index}] {c}")
        print("* * * * * * * * * * * * * *\n")
        sel.unregister(conn)
        conn.close()

#커맨드로부터 포트번호 받기
parser = argparse.ArgumentParser()
parser.add_argument('-p', type=int, help="port number")
args = parser.parse_args()
my_port = args.p
if(my_port):
    pass
else:
    print("")
    print("[!] python <file.py> -p <port> 형식으로 접속해주세요!")
    sys.exit()

#터미널창 비우기
os.system('cls')
print("Proxy Started\n")

#Cache 사전
cache = {}
#HTTP 메소드 종류
http = ["GET","POST","DELETE","PUT","PATCH","HEAD","OPTION","TRACE"]

#사용자와 연결하기 위한 소켓 생성
sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
sock.bind(('',my_port))
sock.listen(10) #리슨
print(f">> [Port {my_port}] Listening...\n")
sock.setblocking(False)
sel.register(sock, selectors.EVENT_READ, accept)

while True:
    events = sel.select()
    for key, mask in events:
        callback = key.data
        callback(key.fileobj, mask)

실행 결과

 

 

다음은 위 스크립트를 각각 실행하여 프록시 기능을 테스트한 결과이다.


1. My Proxy (w/o. Socket Multiplexing)


2. My Proxy (w. Socket Multiplexing)

프로필사진
berry
FE Developer, loves React & better DX
Comments