设计思路 安全协议的目的就是传输数据 并且实现两个基础目标 安全性和可靠性 前者指数据不会被泄露 后者指数据不会被篡改 所以数据的传输过程 可以归类于四个阶段 “发” “传“ ”收“ ”验“ 发送阶段 需要对数据进行加密 这里有两种思路 一种是使用对称加密 还有一种是选择非对称加密 从安全性上 个人更倾向使用后者 传输阶段 这里打算使用socket来负责两个设备之间的通讯 同时还需要考虑进行身份验证 防止非法用户传输 接受阶段 为了防止攻击者使用过量的数据来瘫痪通讯 也许需要加入过滤机制来接受数据 这一点考虑到实现的复杂 是否加入还有待考虑 验证阶段 需要对接受到的数据进行检测 以防止数据被篡改或者是传输不完整
传输阶段的一些尝试 对于socket的使用并不是非常熟练 打算先通过一些简单的交互来加深一下 本机的ubuntu虚拟机同时充当客户端和服务端 使用python语言编写
v0.1 服务端代码
import socket s = socket.socket() #创建套接字 默认使用TCP协议 s.bind(("127.0.0.1",6666)) s.listen(5) #最多连接5个客户端 print("等待连接中") while 1: sock,addr = s.accept() print(sock,addr) while 1: text = sock.recv(1024) #接受的最大字节数为1024 if len(text.strip()) == 0: print("服务端接收到客户端的数据为空") else: print("收到客户端发送的数据为:{}".format(text.decode())) content = input("请输入发送给客户端的信息:") sock.send(content.encode()) sock.close()
客户端代码
import socket s = socket.socket() s.connect(("127.0.0.1",6666)) print("成功连接") while 1: data = input("需要向服务端传输的数据 :") s.send(data.encode()) text = s.recv(1024).decode() print("服务端发送的数据为:{}",format(text))
实现效果
身份验证的实现 目前思考的是采用哪种身份验证方案 一种是采用第三方验证 比如auth0 这种的虽然实现起来更加高级且安全 第二种就是只需要做到安全的密钥交换 然后采用静态的用户密码库存储(也可以更换成sql) 然后加密传输用户名密码即可 考虑到第一种方法工作量太大 而且实现起来复杂(其实就是不会得边学边写) 还是乖乖采用第二种方法吧 这里采用dh算法 来实现在不安全的通讯中传输共享密钥 获得到共享密钥后 使用aes加密算法来加密用户名和密码 暂时是通过静态的用户名和密码库来实现身份验证 同时这里还存在一个攻击漏洞 对于密码的检测是遍历密码库中的所有密码 有满足的即可 并没有检验是否对应该用户
v0.2 服务端代码
import socket import random import math from cryptography.hazmat.primitives import serialization from cryptography.hazmat.primitives.asymmetric import dh from cryptography.hazmat.backends import default_backend from Crypto.Util.Padding import pad, unpad from Crypto.Cipher import AES users = ['user1','user2','user3'] passwds = ['aaaa','bbbb','cccc'] def key_switch(s,public_key_server): print("开始进行公钥交换") sock,addr = s.accept() public_key_client = sock.recv(1024) public_key_bytes = public_key_server.public_bytes( encoding=serialization.Encoding.PEM, format=serialization.PublicFormat.SubjectPublicKeyInfo ) sock.send(public_key_bytes) return serialization.load_pem_public_key(public_key_client),sock,addr def Authentication(sock,shared_key): key = shared_key[:16] # 确保密钥长度符合要求(256 位) aes = AES.new(key,AES.MODE_ECB) username = (sock.recv(1024)) decrypted_username = aes.decrypt(username) username = (unpad(decrypted_username, 16)).decode() if username in users: sock.send(f"找到用户{username} 接下来进行密码验证".encode()) print("用户名正确") password = (sock.recv(1024)) decrypted_password = aes.decrypt(password) password = (unpad(decrypted_password, 16)).decode() if password in passwds: sock.send("身份验证通过".encode()) print("密码正确") return else : sock.send("密码错误,终止连接".encode()) sock.close() exit() else: sock.send("你输入的用户名不存在 终止本次连接".encode()) sock.close() exit() def main(): # 固定的 DH 参数 p = 0xB7E4DE6A253A6E3A1F7F6E2A8F9FBA6E8C6F1A8F9C7D7F7AA6A3E1E9B7EAE4E8C6F7F6B7E4D6E8C6A3E2E9E8F9F7E5E8C2D1E8F8C6D2E5A1F5E2A5E8D8C5D8A5E5B7A5F5A8D7A5F1E5B4D5A1 g = 2 # 创建 DH 参数对象 parameters = dh.DHParameterNumbers(p, g).parameters(backend=default_backend()) private_key_server = parameters.generate_private_key() public_key_server = private_key_server.public_key() s = socket.socket(socket.AF_INET, socket.SOCK_STREAM) #创建套接字 默认使用TCP协议 s.bind(("10.0.137.2",6666)) s.listen(5) #最多连接5个客户端 public_key_client,sock,addr = key_switch(s,public_key_server) print("公钥交换成功") print("开始计算共享密钥") shared_key = private_key_server.exchange(public_key_client) print("等待连接中") while 1: print(sock,addr) print("开始进行身份验证") Authentication(sock,shared_key) while 1: text = sock.recv(1024) #接受的最大字节数为1024 if len(text.strip()) == 0: print("服务端接收到客户端的数据为空") else: print(f"收到客户端发送的数据为:{text.decode()}") content = input("请输入发送给客户端的信息:") sock.send(content.encode()) sock.close() if __name__ == "__main__": main()
客户端代码
import socket import math import random from cryptography.hazmat.primitives import serialization from cryptography.hazmat.primitives.asymmetric import dh from cryptography.hazmat.backends import default_backend from Crypto.Util.Padding import pad, unpad from Crypto.Cipher import AES def key_switch(s,public_key_client): print("开始进行公钥交换") s.connect(("69.165.67.133",6666)) public_key_bytes = public_key_client.public_bytes( encoding=serialization.Encoding.PEM, format=serialization.PublicFormat.SubjectPublicKeyInfo ) s.send(public_key_bytes) public_key_server = s.recv(1024) return serialization.load_pem_public_key(public_key_server) def Authentication(s,shared_key): print("进行身份验证") key = shared_key[:16] # 确保密钥长度符合要求(256 位) aes = AES.new(key,AES.MODE_ECB) username = input("输入用户名:") username_bytes = pad(username.encode('utf-8'), AES.block_size) encrypt_username = aes.encrypt(username_bytes) s.send(encrypt_username) text = s.recv(1024).decode() if text == "你输入的用户名不存在 终止本次连接": print(text) exit() else: password = input("输入密码:") password_bytes = pad(password.encode('utf-8'),AES.block_size) encrypt_password = aes.encrypt(password_bytes) s.send(encrypt_password) text = s.recv(1024).decode() if text == "密码错误,终止连接": print(text) exit() else : print(text) def main(): # 固定的 DH 参数 p = 0xB7E4DE6A253A6E3A1F7F6E2A8F9FBA6E8C6F1A8F9C7D7F7AA6A3E1E9B7EAE4E8C6F7F6B7E4D6E8C6A3E2E9E8F9F7E5E8C2D1E8F8C6D2E5A1F5E2A5E8D8C5D8A5E5B7A5F5A8D7A5F1E5B4D5A1 g = 2 # 创建 DH 参数对象 parameters = dh.DHParameterNumbers(p, g).parameters(backend=default_backend()) private_key_client = parameters.generate_private_key() public_key_client = private_key_client.public_key() s = socket.socket(socket.AF_INET, socket.SOCK_STREAM) public_key_server = key_switch(s,public_key_client) print("公钥交换成功") print("开始计算共享密钥") shared_key = private_key_client.exchange(public_key_server) Authentication(s,shared_key) print("成功连接") while 1: data = input("需要向服务端传输的数据 :") s.send(data.encode()) text = s.recv(1024).decode() print(f"服务端发送的数据为:{text}") if __name__ == "__main__": main()
针对账号密码的问题 进行了一波优化 使用sqlite3模块和sql数据库配合使用 实现账号密码的校验 暂时只打算把注册账号的权限给admin用户 在权限校验这块还存在问题 使用的是username的检测 严谨一点的应该加入数据库的参数 这样方便后面更改用户的操作权限 这一版大致优化了一些交互 不过还是很简陋 预计后面要增加窗口化界面 同时目前最关键的问题是 dh算法的g参数 我还没去跑出来一个较大的数 还是使用的2
v0.3 服务端
import socket import random import math from cryptography.hazmat.primitives import serialization from cryptography.hazmat.primitives.asymmetric import dh from cryptography.hazmat.backends import default_backend from Crypto.Util.Padding import pad, unpad from Crypto.Cipher import AES import sqlite3 users = ['user1','user2','user3'] passwds = ['aaaa','bbbb','cccc'] conn = sqlite3.connect('users.db') cursor = conn.cursor() def key_switch(s,public_key_server): print("开始进行公钥交换") sock,addr = s.accept() public_key_client = sock.recv(1024) public_key_bytes = public_key_server.public_bytes( encoding=serialization.Encoding.PEM, format=serialization.PublicFormat.SubjectPublicKeyInfo ) sock.send(public_key_bytes) return serialization.load_pem_public_key(public_key_client),sock,addr def decrypt_data(shared_key,data): key = shared_key[:16] aes = AES.new(key,AES.MODE_ECB) decrypted_data = aes.decrypt(data) data = (unpad(decrypted_data, 16)).decode() return data def Authentication(sock,shared_key,cursor): key = shared_key[:16] aes = AES.new(key,AES.MODE_ECB) username = (sock.recv(1024)) username = decrypt_data(shared_key,username) cursor.execute("SELECT username FROM users WHERE username = ?", (username,)) result = cursor.fetchone() if result != None: sock.send(f"找到用户{username} 接下来进行密码验证".encode()) print("用户名正确") password = (sock.recv(1024)) decrypted_password = aes.decrypt(password) password = (unpad(decrypted_password, 16)).decode() cursor.execute("SELECT password FROM users WHERE password = ?", (password,)) result = cursor.fetchone() if result != None: sock.send("身份验证通过".encode()) print("密码正确") if username == "admin": permission_value = 1 else : permission_value = 0 return permission_value else : sock.send("密码错误,终止连接".encode()) sock.close() exit() else: sock.send("你输入的用户名不存在 终止本次连接".encode()) sock.close() exit() def menu(sock): text = """ 菜单选项: 1. 传输文本数据 2. 传输附件 3. 注册新用户 """ sock.send(text.encode()) def transfer_txt(): #还没写 return def transfer_file(): #还没写 return def register(permission_value,sock,shared_key,cursor): if permission_value == 0: sock.send("你所登录的用户并没有拥有管理员权限 无法注册用户".encode()) return else : sock.send("输入注册用户名".encode()) username = (sock.recv(1024)) username = decrypt_data(shared_key,username) sock.send("输入密码".encode()) password = sock.recv(1024) password = decrypt_data(shared_key,password) try: # 插入新的用户数据 cursor.execute("INSERT INTO users (username, password) VALUES (?, ?)", (username, password)) conn.commit() sock.send("用户注册成功".encode()) except sqlite3.IntegrityError: sock.send("用户已存在".encode()) except Exception as e: sock.send(e.encode()) def main(): # 固定的 DH 参数 p = 167369435262828772525424092470263277532300828423668354542113388987940885403967569624276882159444702063514952468172684099658558089467275283830276841486674376457626767287709752368374119769545320657500942952067517443133715688577345716588234520369470508343544601072272539124731608794518466242096737284389743744833 g = 2 # 创建 DH 参数对象 parameters = dh.DHParameterNumbers(p, g).parameters(backend=default_backend()) private_key_server = parameters.generate_private_key() public_key_server = private_key_server.public_key() s = socket.socket(socket.AF_INET, socket.SOCK_STREAM) #创建套接字 默认使用TCP协议 s.bind(("10.0.137.2",6666)) s.listen(5) #最多连接5个客户端 public_key_client,sock,addr = key_switch(s,public_key_server) print("公钥交换成功") print("开始计算共享密钥") shared_key = private_key_server.exchange(public_key_client) print("等待连接中") while 1: print(sock,addr) print("开始进行身份验证") permission_value = Authentication(sock,shared_key,cursor) while 1: menu(sock) choice = (sock.recv(1024)).decode() if choice == "1": transfer_txt() elif choice == "2": transfer_file() elif choice == "3": register(permission_value,sock,shared_key,cursor) else: sock.send("输入的选项错误 请重新输入".encode()) sock.close() if __name__ == "__main__": main()
客户端
import socket import math import random from cryptography.hazmat.primitives import serialization from cryptography.hazmat.primitives.asymmetric import dh from cryptography.hazmat.backends import default_backend from Crypto.Util.Padding import pad, unpad from Crypto.Cipher import AES def key_switch(s,public_key_client): print("开始进行公钥交换") s.connect(("69.165.67.133",6666)) public_key_bytes = public_key_client.public_bytes( encoding=serialization.Encoding.PEM, format=serialization.PublicFormat.SubjectPublicKeyInfo ) s.send(public_key_bytes) public_key_server = s.recv(1024) return serialization.load_pem_public_key(public_key_server) def encrypt_data(shared_key,data): key = shared_key[:16] # 确保密钥长度符合要求(256 位) aes = AES.new(key,AES.MODE_ECB) data_bytes = pad(data.encode('utf-8'), AES.block_size) encrypt_data = aes.encrypt(data_bytes) return encrypt_data def Authentication(s,shared_key): print("进行身份验证") key = shared_key[:16] # 确保密钥长度符合要求(256 位) aes = AES.new(key,AES.MODE_ECB) username = input("输入用户名:") encrypt_username = encrypt_data(shared_key,username) s.send(encrypt_username) text = s.recv(1024).decode() if text == "你输入的用户名不存在 终止本次连接": print(text) exit() else: print(text) password = input("输入密码:") encrypt_password = encrypt_data(shared_key,password) s.send(encrypt_password) text = s.recv(1024).decode() if text == "密码错误,终止连接": print(text) exit() else : print(text) def register(s,shared_key): print((s.recv(1024)).decode()) username = input("输入用户名") encrypt_username = encrypt_data(shared_key,username) s.send(encrypt_username) password = input("输入密码") encrypt_password = encrypt_data(shared_key,password) s.send(encrypt_password) print((s.recv(1024)).decode()) def main(): # 固定的 DH 参数 p = 167369435262828772525424092470263277532300828423668354542113388987940885403967569624276882159444702063514952468172684099658558089467275283830276841486674376457626767287709752368374119769545320657500942952067517443133715688577345716588234520369470508343544601072272539124731608794518466242096737284389743744833 g = 2 # 创建 DH 参数对象 parameters = dh.DHParameterNumbers(p, g).parameters(backend=default_backend()) private_key_client = parameters.generate_private_key() public_key_client = private_key_client.public_key() s = socket.socket(socket.AF_INET, socket.SOCK_STREAM) public_key_server = key_switch(s,public_key_client) print("公钥交换成功") print("开始计算共享密钥") shared_key = private_key_client.exchange(public_key_server) Authentication(s,shared_key) print("成功连接") while 1: print((s.recv(1024)).decode()) choice = input("输入选项:") s.send(choice.encode()) if choice == "3": register(s,shared_key) if __name__ == "__main__": main()
传输阶段的数据加密 这一章节的目的是设计出来一个数据包格式 常规的应该是由这几个部分组成 包头+密文长度+密文+校验和 这就要设计到两个算法 消息加密算法以及消息摘要算法 加密还是老样子 使用dh算法得到的共享密钥来进行aes加密 摘要算法的目的是为了保证消息的完整性 这里就使用md5算法 目前打算将数据包格式定义成下图所示 消息类型负责优化原本的服务端和客户端 原本的服务端针对传输数据类型不同以及新增用户的功能 是采用了菜单形式 过于简单且多了许多额外的交互 这里选择将choice变量融合到数据中一起传输 然后还得完善上一章节预留的 传输文件 这里打算暂时只包括传输图像 音频 文本文件 更多的格式和文件类型由于没有经过测试 不知道能不能完整的传输过去 对于服务端来说 接受到文件后 还需要判断属于哪种类型的文件 方便保存 想到的是两种方案 一种是客户端发送文件的时候额外发送文件的类型 服务端接受后按照类型保存文件 第二种是 在服务端接受数据后利用magic库来判断文件类型 但是这两种方法 似乎都很容易被绕过 要是客户端将马伪造成正常格式的文件传过来就不好玩了 虽然但是 本次只是打算设计一个能用的安全协议 所以安全性这里就只是预警一下 不去研究如何改进了 这里采用第二种方法 在编写传输图像文件的时候 遇到了一个新的问题 按照上面的数据包 单个数据包最多传输0xffffbytes 但是一个图像不止这些字节 所以就需要分段传输 那么我们就需要在数据包格式中的消息类型 加入是否属于切片传输的位置 来供服务端判断 迭代了数据包格式 前面的判断位之所以给了4byte 考虑到服务端处理切片数据的时候 需要确定当前是否属于最后一个切片(常规的协议一般在数据包中增加当前切片对于整段数据的偏移 这里我打算做简单一点) 所以前2个byte用来存储一共有多少个切片 后两个byte用来存储当前是第几个切片 而检测是否属于切片数据包也很容易 只要这4个byte不为\x00即可 同时 这个版本还优化了之前代码中的一些逻辑漏洞 容易造成服务端的server运行中断
v0.4 服务端
import socket import hashlib import magic from cryptography.hazmat.primitives import serialization from cryptography.hazmat.primitives.asymmetric import dh from cryptography.hazmat.backends import default_backend from Crypto.Util.Padding import pad, unpad from Crypto.Cipher import AES import sqlite3 import base64 users = ['user1','user2','user3'] passwds = ['aaaa','bbbb','cccc'] mime = magic.Magic(mime=True) conn = sqlite3.connect('users.db') cursor = conn.cursor() def md5_encrypt(data): # 创建 MD5 哈希对象 md5_hash = hashlib.md5() # 更新哈希对象 md5_hash.update(data) # 获取十六进制的哈希值 hash_result = md5_hash.hexdigest() return hash_result def key_switch(s,public_key_server): print("开始进行公钥交换") sock,addr = s.accept() public_key_client = sock.recv(1024) public_key_bytes = public_key_server.public_bytes( encoding=serialization.Encoding.PEM, format=serialization.PublicFormat.SubjectPublicKeyInfo ) sock.send(public_key_bytes) return serialization.load_pem_public_key(public_key_client),sock,addr def decrypt_data(shared_key,data): #data要为byte型 key = shared_key[:16] aes = AES.new(key,AES.MODE_ECB) decrypted_data = aes.decrypt(data) data = (unpad(decrypted_data, 16)).decode() return data def unpack_data(sock,shared_key,orign_data): sliced_sum = int.from_bytes(orign_data[:2],byteorder = 'big') sliced_num = int.from_bytes(orign_data[2:4],byteorder='big') data_length = int.from_bytes(orign_data[4:8], byteorder='big') data_type = int.from_bytes(orign_data[8:12], byteorder='big') data_md5 = (orign_data[-32:]).decode() base_data = decrypt_data(shared_key,orign_data[12:(data_length-32)]) data = base64.b64decode(base_data.encode()) value = verify_md5(data,data_md5,sock) if value == 0: sock.send(f"第{sliced_num+1}组数据校验失败".encode()) else : sock.send(f"第{sliced_num+1}组数据校验成功".encode()) return sliced_sum,sliced_num,data_type,data,value def Authentication(sock,shared_key,cursor): key = shared_key[:16] aes = AES.new(key,AES.MODE_ECB) username = (sock.recv(1024)) username = decrypt_data(shared_key,username) cursor.execute("SELECT username FROM users WHERE username = ?", (username,)) result = cursor.fetchone() if result != None: sock.send(f"找到用户{username} 接下来进行密码验证".encode()) print("用户名正确") password = (sock.recv(1024)) decrypted_password = aes.decrypt(password) password = (unpad(decrypted_password, 16)).decode() cursor.execute("SELECT password FROM users WHERE password = ?", (password,)) result = cursor.fetchone() if result != None: sock.send("身份验证通过".encode()) print("密码正确") if username == "admin": permission_value = 1 else : permission_value = 0 return permission_value else : sock.send("密码错误,终止连接".encode()) sock.close() return 3 else: sock.send("你输入的用户名不存在 终止本次连接".encode()) sock.close() return 3 def menu(sock): text = "开始进行数据传输 请按照规定格式发送数据" sock.send(text.encode()) def transfer_txt(addr,data): print(f"客户机{addr}传输的数据为: {data}") def transfer_file(sock,data): data_type = mime.from_buffer(data) if data_type == "text/plain": with open("output.txt", 'wb') as f: f.write(data.encode()) sock.send("成功接受到文件\n".encode()) def transfer_file2(sock,data,sliced_sum,sliced_num,shared_key): data_bytes = data while sliced_num != sliced_sum-1: orign_data = (sock.recv(2048)) sliced_sum,sliced_num,data_type,data,value = unpack_data(sock,shared_key,orign_data) data_bytes += data data_type = mime.from_buffer(data_bytes) if data_type == "image/png": with open("output.png", 'wb') as f: f.write(data_bytes) sock.send("成功接受到文件\n".encode()) # if data_type == "text/plain": # with open("output.txt", 'wb') as f: # f.write(data_bytes) # sock.send("成功接受到文件".encode()) def register(permission_value,sock,shared_key,cursor,data): if permission_value == 0: sock.send("你所登录的用户并没有拥有管理员权限 无法注册用户".encode()) return else : parts = data.split(b';') username = parts[0] password = parts[1] try: # 插入新的用户数据 cursor.execute("INSERT INTO users (username, password) VALUES (?, ?)", (username, password)) conn.commit() sock.send("用户注册成功".encode()) except sqlite3.IntegrityError: sock.send("用户已存在".encode()) except Exception as e: sock.send(e.encode()) def verify_md5(data, original_md5,sock): current_md5 = md5_encrypt(data) return current_md5 == original_md5 def main(): # 固定的 DH 参数 p = 167369435262828772525424092470263277532300828423668354542113388987940885403967569624276882159444702063514952468172684099658558089467275283830276841486674376457626767287709752368374119769545320657500942952067517443133715688577345716588234520369470508343544601072272539124731608794518466242096737284389743744833 g = 2 # 创建 DH 参数对象 parameters = dh.DHParameterNumbers(p, g).parameters(backend=default_backend()) private_key_server = parameters.generate_private_key() public_key_server = private_key_server.public_key() s = socket.socket(socket.AF_INET, socket.SOCK_STREAM) #创建套接字 默认使用TCP协议 s.bind(("10.0.137.2",6666)) s.listen(5) #最多连接5个客户端 while 1: public_key_client,sock,addr = key_switch(s,public_key_server) print("公钥交换成功") print("开始计算共享密钥") shared_key = private_key_server.exchange(public_key_client) print("等待连接中") print(sock,addr) print("开始进行身份验证") permission_value = Authentication(sock,shared_key,cursor) if permission_value == 3: continue while 1: menu(sock) orign_data = (sock.recv(2048)) if len(orign_data) == 0: break sliced_sum,sliced_num,data_type,data,value = unpack_data(sock,shared_key,orign_data) if data_type == 1: transfer_txt(addr,data) elif data_type == 2: if sliced_sum != 0 or sliced_num != 0: transfer_file2(sock,data,sliced_sum,sliced_num,shared_key) else: transfer_file(sock,data) elif data_type ==3: register(permission_value,sock,shared_key,cursor,data) sock.close() if __name__ == "__main__": main()
客户端
import socket import struct import hashlib import base64 from cryptography.hazmat.primitives import serialization from cryptography.hazmat.primitives.asymmetric import dh from cryptography.hazmat.backends import default_backend from Crypto.Util.Padding import pad, unpad from Crypto.Cipher import AES choice_text = """ 1.传输文本数据 2.传输文件数据 3.新增用户 """ def md5_encrypt(data): # 创建 MD5 哈希对象 md5_hash = hashlib.md5() # 更新哈希对象 md5_hash.update(data) # 获取十六进制的哈希值 hash_result = md5_hash.hexdigest() return hash_result def pack_data(choice,base_data,shared_key,sliced,sum,data): sliced_sum = struct.pack('>H',sum) sliced_num = struct.pack('>H',sliced) choice_type = struct.pack('>I',choice) en_data = encrypt_data(shared_key,base_data.decode()) hash_data = md5_encrypt(data).encode() temp = choice_type+en_data+hash_data transfer_data = sliced_sum+sliced_num+struct.pack('>I',len(temp)+8)+temp return transfer_data def key_switch(s,public_key_client): print("开始进行公钥交换") s.connect(("69.165.67.133",6666)) public_key_bytes = public_key_client.public_bytes( encoding=serialization.Encoding.PEM, format=serialization.PublicFormat.SubjectPublicKeyInfo ) s.send(public_key_bytes) public_key_server = s.recv(1024) return serialization.load_pem_public_key(public_key_server) def encrypt_data(shared_key,data): key = shared_key[:16] aes = AES.new(key,AES.MODE_ECB) data_bytes = pad(data.encode('utf-8'), AES.block_size) encrypt_data = aes.encrypt(data_bytes) return encrypt_data def Authentication(s,shared_key): print("进行身份验证") key = shared_key[:16] aes = AES.new(key,AES.MODE_ECB) username = input("输入用户名:") encrypt_username = encrypt_data(shared_key,username) s.send(encrypt_username) text = s.recv(1024).decode() if text == "你输入的用户名不存在 终止本次连接": print(text) exit() else: print(text) password = input("输入密码:") encrypt_password = encrypt_data(shared_key,password) s.send(encrypt_password) text = s.recv(1024).decode() if text == "密码错误,终止连接": print(text) exit() else : print(text) def register(s,shared_key): username = input("输入用户名 :") password = input("输入密码 :") data = username+";"+password base_data = base64.b64encode(data.encode()) transfer_data = pack_data(3,base_data,shared_key,0,0,data.encode()) s.send(transfer_data) def transfer_txt(s,shared_key): text = input("输入数据 :") if len(text) > 1000: sliced = 0 base64_text = base64.b64encode(text.encode()) chunk_size = 1000 sliced_data = slice_string(base64_text, chunk_size) sliced_sum = len(sliced_data) for i in range(len(sliced_data)): transfer_data = pack_data(1,sliced_data[i],shared_key,sliced,sliced_sum,sliced_data[i]) sliced += 1 s.send(transfer_data) else : sliced = 0 sliced_sum = 0 base_text = base64.b64encode(text.encode()) transfer_data = pack_data(1,base_text,shared_key,sliced,sliced_sum,text.encode()) s.send(transfer_data) print((s.recv(1024)).decode()+"\n") def slice_string(data, chunk_size): #根据指定大小切片字符串 return [data[i:i + chunk_size] for i in range(0, len(data), chunk_size)] def transfer_file(s,shared_key): file_addr = input("请输入文件对于client.py的相对路径 :") with open(file_addr,'rb') as file: text = file.read() if len(text) > 1000: sliced = 0 chunk_size = 1000 sliced_data = slice_string(text, chunk_size) sliced_sum = len(sliced_data) print(f"一共有{sliced_sum}组数据需要传输\n") for i in range(len(sliced_data)): sliced_basedata = base64.b64encode(sliced_data[i]) transfer_data = pack_data(2,sliced_basedata,shared_key,sliced,sliced_sum,sliced_data[i]) sliced += 1 s.send(transfer_data) print((s.recv(1024)).decode()+"\n") else : sliced = 0 sliced_sum = 0 base_text = base64.b64encode(text) transfer_data = pack_data(2,base_text,shared_key,sliced,sliced_sum,text) s.send(transfer_data) print((s.recv(1024)).decode()+"\n") def main(): # 固定的 DH 参数 p = 167369435262828772525424092470263277532300828423668354542113388987940885403967569624276882159444702063514952468172684099658558089467275283830276841486674376457626767287709752368374119769545320657500942952067517443133715688577345716588234520369470508343544601072272539124731608794518466242096737284389743744833 g = 2 # 创建 DH 参数对象 parameters = dh.DHParameterNumbers(p, g).parameters(backend=default_backend()) private_key_client = parameters.generate_private_key() public_key_client = private_key_client.public_key() s = socket.socket(socket.AF_INET, socket.SOCK_STREAM) public_key_server = key_switch(s,public_key_client) print("公钥交换成功") print("开始计算共享密钥") shared_key = private_key_client.exchange(public_key_server) Authentication(s,shared_key) print("成功连接") while 1: print((s.recv(1024)).decode()) choice = input(choice_text) if choice == "1": transfer_txt(s,shared_key) elif choice == "2": transfer_file(s,shared_key) elif choice == "3": register(s,shared_key) else : print("未找到输入的选项,请重新输入") continue if __name__ == "__main__": main()
代码迭代 针对一个简单协议所需要的功能已经大体实现了 接下来的任务就是优化服务端和客户端的代码 主要的目标大致如下 1.添加图形化界面 优化交互过程 2.优化代码逻辑 增加注释 提高代码美观度 3.完善程序功能
v0.5 该版本优化了代码逻辑 完善了注释 客户端
import socket import struct import hashlib import base64 import time from cryptography.hazmat.primitives import serialization from cryptography.hazmat.primitives.asymmetric import dh from cryptography.hazmat.backends import default_backend from Crypto.Util.Padding import pad, unpad from Crypto.Cipher import AES choice_text = """ 1.传输文本数据 2.传输文件数据 3.新增用户 输入选项: """ def md5_encrypt(data): #md5加密 用于完整性检测 md5_hash = hashlib.md5() md5_hash.update(data) hash_result = md5_hash.hexdigest() return hash_result def pack_data(choice,base_data,shared_key,sliced,sum,data): #将数据打包成报文 格式: sliced_judge(4bytes)+data_length(4bytes)+data_type(4bytes)+encrypted_data(Dynamic)+md5_data(32bytes) sliced_sum = struct.pack('>H',sum) sliced_num = struct.pack('>H',sliced) choice_type = struct.pack('>I',choice) en_data = encrypt_data(shared_key,base_data.decode()) hash_data = md5_encrypt(data).encode() temp = choice_type+en_data+hash_data transfer_data = sliced_sum+sliced_num+struct.pack('>I',len(temp)+8)+temp return transfer_data def key_switch(s,public_key_client): #交换公钥 print("等待连接服务端") s.connect(("69.165.67.133",6666)) print("连接上服务端 开始进行公钥交换") public_key_bytes = public_key_client.public_bytes( encoding=serialization.Encoding.PEM, format=serialization.PublicFormat.SubjectPublicKeyInfo ) s.send(public_key_bytes) public_key_server = s.recv(1024) return serialization.load_pem_public_key(public_key_server) def encrypt_data(shared_key,data): #aes加密 key = shared_key[:16] aes = AES.new(key,AES.MODE_ECB) data_bytes = pad(data.encode('utf-8'), AES.block_size) encrypt_data = aes.encrypt(data_bytes) return encrypt_data def Authentication(s,shared_key): print("开始进行身份验证") username = input("输入用户名:") encrypt_username = encrypt_data(shared_key,username) #用户名经过aes加密后传输 s.send(encrypt_username) text = s.recv(1024).decode() if text == "你输入的用户名不存在 终止本次连接": print(text) exit() else: print(text) password = input("输入密码:") encrypt_password = encrypt_data(shared_key,password) #密码经过aes加密后传输 s.send(encrypt_password) text = s.recv(1024).decode() if text == "密码错误,终止连接": print(text) exit() else : print(text) def register(s,shared_key): username = input("输入用户名 :") #账号密码用;来间隔后 采用base64编码后丢到pack_data函数中处理 password = input("输入密码 :") data = username+";"+password base_data = base64.b64encode(data.encode()) transfer_data = pack_data(3,base_data,shared_key,0,0,data.encode()) s.send(transfer_data) def transfer_txt(s,shared_key): text = input("输入数据 :") #这个函数负责处理直接传输的文本数据 但是仍然对于是否需要切片进行了判断 if len(text) > 1000: sliced = 0 base64_text = base64.b64encode(text.encode()) chunk_size = 1000 sliced_data = slice_string(base64_text, chunk_size) sliced_sum = len(sliced_data) for i in range(len(sliced_data)): transfer_data = pack_data(1,sliced_data[i],shared_key,sliced,sliced_sum,sliced_data[i]) sliced += 1 s.send(transfer_data) else : sliced = 0 sliced_sum = 0 base_text = base64.b64encode(text.encode()) transfer_data = pack_data(1,base_text,shared_key,sliced,sliced_sum,text.encode()) s.send(transfer_data) print((s.recv(1024)).decode()+"\n") def slice_string(data, chunk_size): #根据指定大小切片字符串 return [data[i:i + chunk_size] for i in range(0, len(data), chunk_size)] def transfer_file(s,shared_key): file_addr = input("请输入文件对于client.py的相对路径 :") #读取客户端本地文件的二进制流 with open(file_addr,'rb') as file: text = file.read() if len(text) > 1000: sliced = 0 chunk_size = 1000 sliced_data = slice_string(text, chunk_size) sliced_sum = len(sliced_data) print(f"一共有{sliced_sum}组数据需要传输\n") begin_time = time.time() for i in range(len(sliced_data)): sliced_basedata = base64.b64encode(sliced_data[i]) transfer_data = pack_data(2,sliced_basedata,shared_key,sliced,sliced_sum,sliced_data[i]) sliced += 1 s.send(transfer_data) text = (s.recv(1024)).decode() currect_time = time.time() temp = currect_time-begin_time print("\r"+text+f" 当前运行了{temp:.0f}s",end="") else : sliced = 0 sliced_sum = 0 base_text = base64.b64encode(text) transfer_data = pack_data(2,base_text,shared_key,sliced,sliced_sum,text) s.send(transfer_data) print((s.recv(1024)).decode()+"\n") def main(): # 固定的 DH 参数 p = 167369435262828772525424092470263277532300828423668354542113388987940885403967569624276882159444702063514952468172684099658558089467275283830276841486674376457626767287709752368374119769545320657500942952067517443133715688577345716588234520369470508343544601072272539124731608794518466242096737284389743744833 g = 2 parameters = dh.DHParameterNumbers(p, g).parameters(backend=default_backend()) private_key_client = parameters.generate_private_key() public_key_client = private_key_client.public_key() s = socket.socket(socket.AF_INET, socket.SOCK_STREAM) public_key_server = key_switch(s,public_key_client) print("公钥交换成功") print("开始计算共享密钥") shared_key = private_key_client.exchange(public_key_server) Authentication(s,shared_key) print("成功连接") while 1: print((s.recv(1024)).decode()) choice = input(choice_text) if choice == "1": transfer_txt(s,shared_key) elif choice == "2": transfer_file(s,shared_key) elif choice == "3": register(s,shared_key) else : print("未找到输入的选项,请重新输入") continue if __name__ == "__main__": main()
服务端
import socket import hashlib import magic from cryptography.hazmat.primitives import serialization from cryptography.hazmat.primitives.asymmetric import dh from cryptography.hazmat.backends import default_backend from Crypto.Util.Padding import pad, unpad from Crypto.Cipher import AES import sqlite3 import base64 mime = magic.Magic(mime=True) conn = sqlite3.connect('users.db') cursor = conn.cursor() def md5_encrypt(data): md5_hash = hashlib.md5() #md5加密 md5_hash.update(data) hash_result = md5_hash.hexdigest() return hash_result def key_switch(s,public_key_server): print("等待客户机连接") sock,addr = s.accept() print(f"客户机{sock}{addr}连接成功") print("开始进行公钥交换") public_key_client = sock.recv(1024) public_key_bytes = public_key_server.public_bytes( #将客户端的公钥序列化为 PEM 格式的字节串 encoding=serialization.Encoding.PEM, format=serialization.PublicFormat.SubjectPublicKeyInfo ) sock.send(public_key_bytes) return serialization.load_pem_public_key(public_key_client),sock,addr #将接受到的服务端公钥反序列化为公钥对象 def decrypt_data(shared_key,data): #data要为byte型 key = shared_key[:16] aes = AES.new(key,AES.MODE_ECB) decrypted_data = aes.decrypt(data) data = (unpad(decrypted_data, 16)).decode() return data def unpack_data(sock,shared_key,orign_data): #拆解数据包 sliced_sum = int.from_bytes(orign_data[:2],byteorder = 'big') sliced_num = int.from_bytes(orign_data[2:4],byteorder='big') data_length = int.from_bytes(orign_data[4:8], byteorder='big') data_type = int.from_bytes(orign_data[8:12], byteorder='big') data_md5 = (orign_data[-32:]).decode() base_data = decrypt_data(shared_key,orign_data[12:(data_length-32)]) data = base64.b64decode(base_data.encode()) value = verify_md5(data,data_md5,sock) if value == 0: sock.send((f"第{sliced_num+1}组数据校验成功").encode()) else : sock.send((f"第{sliced_num+1}组数据校验成功").encode()) return sliced_sum,sliced_num,data_type,data,value def Authentication(sock,shared_key,cursor): #身份验证部分 账号密码存储在users.db数据库中 key = shared_key[:16] aes = AES.new(key,AES.MODE_ECB) username = (sock.recv(1024)) username = decrypt_data(shared_key,username) cursor.execute("SELECT username FROM users WHERE username = ?", (username,)) result = cursor.fetchone() if result != None: sock.send(f"找到用户{username} 接下来进行密码验证".encode()) print("用户名正确") password = (sock.recv(1024)) decrypted_password = aes.decrypt(password) password = (unpad(decrypted_password, 16)).decode() cursor.execute("SELECT password FROM users WHERE password = ?", (password,)) result = cursor.fetchone() if result != None: sock.send("身份验证通过".encode()) print("密码正确") if username == "admin": permission_value = 1 else : permission_value = 0 return permission_value else : sock.send("密码错误,终止连接".encode()) sock.close() return 3 else: sock.send("你输入的用户名不存在 终止本次连接".encode()) sock.close() return 3 def menu(sock): text = "开始进行数据传输" sock.send(text.encode()) def transfer_txt(addr,data): print(f"客户机{addr}传输的数据为: {data}") def transfer_file(sock,data): #处理非切片数据 data_type = mime.from_buffer(data) if data_type == "text/plain": with open("output.txt", 'wb') as f: f.write(data.encode()) sock.send("成功接受到文件\n".encode()) def transfer_file2(sock,data,sliced_sum,sliced_num,shared_key): #处理切片数据 data_bytes = data while sliced_num != sliced_sum-1: orign_data = (sock.recv(2048)) if len(orign_data) == 0: print("客户机意外中断连接") return sliced_sum,sliced_num,data_type,data,value = unpack_data(sock,shared_key,orign_data) data_bytes += data data_type = mime.from_buffer(data_bytes) if data_type == "image/png": with open("output.png", 'wb') as f: f.write(data_bytes) elif data_type == "audio/mpeg": with open("output.mp3", 'wb') as f: f.write(data_bytes) sock.send("成功接受到文件\n".encode()) def register(permission_value,sock,shared_key,cursor,data): if permission_value == 0: #管理员权限的检验较为简单 sock.send("你所登录的用户并没有拥有管理员权限 无法注册用户".encode()) return else : parts = data.split(b';') username = parts[0] password = parts[1] try: # 插入新的用户数据 cursor.execute("INSERT INTO users (username, password) VALUES (?, ?)", (username, password)) conn.commit() sock.send("用户注册成功".encode()) except sqlite3.IntegrityError: sock.send("用户已存在".encode()) except Exception as e: sock.send(e.encode()) def verify_md5(data, original_md5,sock): current_md5 = md5_encrypt(data) return current_md5 == original_md5 def main(): # 固定的 DH 参数 p = 167369435262828772525424092470263277532300828423668354542113388987940885403967569624276882159444702063514952468172684099658558089467275283830276841486674376457626767287709752368374119769545320657500942952067517443133715688577345716588234520369470508343544601072272539124731608794518466242096737284389743744833 g = 2 parameters = dh.DHParameterNumbers(p, g).parameters(backend=default_backend()) private_key_server = parameters.generate_private_key() public_key_server = private_key_server.public_key() s = socket.socket(socket.AF_INET, socket.SOCK_STREAM) #创建套接字 默认使用TCP协议 s.bind(("10.0.137.2",6666)) s.listen(5) #最多连接5个客户端 while 1: public_key_client,sock,addr = key_switch(s,public_key_server) print("公钥交换成功") print("开始计算共享密钥") shared_key = private_key_server.exchange(public_key_client) print("开始进行身份验证") permission_value = Authentication(sock,shared_key,cursor) if permission_value == 3: print("客户机意外中断连接") continue while 1: menu(sock) orign_data = (sock.recv(2048)) if len(orign_data) == 0: print("客户机意外中断连接") break sliced_sum,sliced_num,data_type,data,value = unpack_data(sock,shared_key,orign_data) if data_type == 1: transfer_txt(addr,data) elif data_type == 2: if sliced_sum != 0 or sliced_num != 0: transfer_file2(sock,data,sliced_sum,sliced_num,shared_key) else: transfer_file(sock,data) elif data_type ==3: register(permission_value,sock,shared_key,cursor,data) sock.close() if __name__ == "__main__": main()
v0.6 这一版针对传输过慢的原因进行了排除 感觉是因为传之前过了一遍base64的原因 这里进行了删除 以前的代码实际上是没有考虑过多端连接的 所以这一版的代码主要针对多端连接进行优化 原本的代码 client1连接后 client2能够连接上服务器 但是交换公钥的时候会阻塞 这是因为公钥交换相关的代码没有考虑到多端的情况 来假设一种情况 客户机A连接上服务端后 进行密钥交换完成后 开始进入数据传输阶段 按照原本的程序设计 此时的服务端程序已经进入了while循环处理数据传输的请求 但是如果此时有客户机B想要连接服务端 服务端就不会去处理客户机B的密钥交换 为了解决这个问题 应该要使用线程池来处理多线程的情况 多线程的实现难点应该在于确保多个线程之间的共享数据同步 在设计程序之间并没有这方面的经验 所以打算先通过编写几个简单的程序来熟悉一下
多线程实现的学习 import threading def print_name(name): print(f"当前轮到{name}执行") print(threading.current_thread().name) def main(): print("多线程测试") thread1 = threading.Thread(target=print_name, args=("线程1",)) thread2 = threading.Thread(target=print_name,args=("线程2",)) thread1.start() thread2.start() if __name__ == "__main__": main()
通过定义一个thread对象 并且赋值target 即需要该线程执行的函数 同时给予参数 定义完成后thread对象完成初始化 通过start来运行线程 主进程和线程是同时运行的 如果想要让主进程阻塞 等待线程运行结束后再运行主线程 这时候可以使用join来让主进程等待线程执行完毕
import threading def print_name(name): print(f"当前轮到{name}执行") def main(): print("多线程测试") thread1 = threading.Thread(target=print_name, args=("线程1",)) # thread2 = threading.Thread(target=print_name,args=("线程2",)) thread1.start() print(f"thread1线程当前存活情况{thread1.is_alive()}") if __name__ == "__main__": main()
接下来加入join
import threading def print_name(name): print(f"当前轮到{name}执行") def main(): print("多线程测试") thread1 = threading.Thread(target=print_name, args=("线程1",)) # thread2 = threading.Thread(target=print_name,args=("线程2",)) thread1.start() thread1.join(1) print(f"thread1线程当前存活情况{thread1.is_alive()}") if __name__ == "__main__": main()
join的原理是等待对应thread对象执行完毕 或者给予参数 等待参数时间 所以也可以用来规划两个进程的执行先后顺序
import threading import time def print_name(name): time.sleep(0.5) print(f"当前轮到{name}执行") def main(): print("多线程测试") thread1 = threading.Thread(target=print_name, args=("线程1",)) thread2 = threading.Thread(target=print_name,args=("线程2",)) thread1.start() thread2.start() thread2.join() if __name__ == "__main__": main()
上述代码应该是先执行thread1 但是由于thread2调用了join 此时应该会先等待thread2执行完毕再执行其他线程 这里print_name函数的sleep也是为了防止thread1执行过快导致还没主进程还没执行thread2.join()就结束了 接着来看下面的程序
import threading import time def print_name(name): for i in range(10): time.sleep(0.5) print(f"现在是线程{threading.current_thread().name}第{i}次执行循环") def main(): print("多线程测试") thread1 = threading.Thread(target=print_name, args=("线程1",)) thread1.start() time.sleep(2) print("主进程即将结束") if __name__ == "__main__": main()
按照预期的效果 在输出”主进程即将结束”后 主进程应该就结束了 同时thread1执行全部循环至少需要5s 而主进程执行时间应该在2-5以内 但是事实是主进程要一直等到thread1执行完毕才能结束 这一点是由定义thread对象的时候 一个叫daemon的参数决定的 其默认值为False 即主进程等待thread1线程执行完毕后才能结束 如果改为True 主进程结束时 线程就会结束
import threading import time def print_name(name): for i in range(10): time.sleep(0.5) print(f"现在是线程{threading.current_thread().name}第{i}次执行循环") def main(): print("多线程测试") thread1 = threading.Thread(target=print_name, args=("线程1",),daemon=True) thread1.start() time.sleep(2) print("主进程即将结束") if __name__ == "__main__": main()
如果想要让线程1执行两个函数呢 即执行完函数A后再执行函数B 这一点实现起来也很简单 只需要在函数A的最后调用函数B即可
import threading import time def next_task(): print("这是第二个任务") def print_name(name): print("这是第一个任务") next_task() def main(): print("多线程测试") thread1 = threading.Thread(target=print_name, args=("线程1",)) thread1.start() if __name__ == "__main__": main()
但是这一操作的弊端也很明显 那就是假如此时我们需要创建两个线程 线程1需要先执行函数A再执行函数B 线程2需要先执行函数B再执行函数A 这时候就会产生冲突 这又得去编写一个中转函数来识别需求 然后做if分支了 显然十分麻烦 这时候就可以自定义thread类
import threading import time class myself_thread(threading.Thread): def __init__(self,name = None): threading.Thread.__init__(self,name=name) def run(self): print(f"这是线程{self.name}在运行") def test(self): print("这是一个测试函数") def main(): print("多线程测试") thread1 = myself_thread() thread1.run() thread1.start() thread1.test() if __name__ == "__main__": main()
如上述代码 我们自定义了一个myself_thread类 并且继承了threading.Thread 这个类的run函数以及其他函数都可以由我们自定义 针对不同的需求来调用函数也变得十分简单 如果服务端采用这种方式来新增线程 考虑到如果遭受ddos攻击 那么处理的线程数就会过多 这时候应该考虑采用线程池的方法来处理 线程池在程序启动时就创建大量空闲线程 在程序需要线程来执行函数的时候 就会分配一个线程出来 当线程执行完毕后 又会回到线程池 因此使用线程池可以有效的控制程序中并发线程的数量 运行下面这个程序 创建一个含有10个空闲线程的线程池
from concurrent.futures import ThreadPoolExecutor import threading import time def print_name(): print(f"现在是线程{threading.current_thread().name}正在执行") pool = ThreadPoolExecutor(max_workers=10) thread1 = pool.submit(print_name) thread2 = pool.submit(print_name)
输出的结果为 说明执行时间过短 在线程0执行完毕后 就作为空闲线程返回线程池 随后第二次需要执行print_name函数的时候 就又被分配出来了 为了印证猜想 在print_name函数中加入time.sleep(0.5) 这时就是由两个线程来处理了 还可以使用map方法来并发的调用线程 不过需要注意的是 (1,2,3)是赋予给print_name函数的参数 但是实际上print_name不需要参数 这一点暂时还没找到方法来优化 总感觉有点多余 但是不给予参数 map又无法指定调用多少个线程
from concurrent.futures import ThreadPoolExecutor import threading import time def print_name(number): print(f"现在是线程{threading.current_thread().name}正在执行") pool = ThreadPoolExecutor(max_workers=10) thread = pool.map(print_name,(1,2,3))
最后的输出结果仍然是全部由线程0来负责 说明map方法就是简化了多个submit方法
多线程应用 目前的整改思路如下: 服务端: 主进程负责处理accpet 有客户机连接就调用一个线程来处理密钥交换和数据传输 然后这个线程就专门负责这台客户机 如果客户机中断连接 那就释放线程 回到线程池 客户端: 这块的整改还在考虑中 因为就目前来说 传输效率确实有点感人了 传一个10mb的音频都需要十来分钟 如果客户端改成 可以并发的传输数据 即同时传输音频和图像 或者是把一个音频拆成多个线程来传输 这样效率应该可以大大提高 但是感觉编写难度有点大啊 看着来吧 服务端
import socket import hashlib import magic from cryptography.hazmat.primitives import serialization from cryptography.hazmat.primitives.asymmetric import dh from cryptography.hazmat.backends import default_backend from Crypto.Util.Padding import pad, unpad from Crypto.Cipher import AES import sqlite3 import base64 from concurrent.futures import ThreadPoolExecutor import threading mime = magic.Magic(mime=True) class query_thread(threading.Thread): def __init__(self,name = None): threading.Thread.__init__(self,name=name) def query_username(self,username): conn = sqlite3.connect('users.db') cursor = conn.cursor() cursor.execute("SELECT username FROM users WHERE username = ?", (username,)) result = cursor.fetchone() cursor.close() return result def query_password(self,username,password): conn = sqlite3.connect('users.db') cursor = conn.cursor() cursor.execute("SELECT password FROM users WHERE username = ? AND password = ?", (username, password)) result = cursor.fetchone() cursor.close() return result def add_user(sock,username,password): conn = sqlite3.connect('users.db') cursor = conn.cursor() try: # 插入新的用户数据 cursor.execute("INSERT INTO users (username, password) VALUES (?, ?)", (username, password)) conn.commit() sock.send("用户注册成功".encode()) except sqlite3.IntegrityError: sock.send("用户已存在".encode()) except Exception as e: sock.send(e.encode()) def md5_encrypt(data): if isinstance(data, str): data = data.encode() # md5加密需要bytes参数 md5_hash = hashlib.md5() #md5加密 md5_hash.update(data) hash_result = md5_hash.hexdigest() return hash_result def key_switch(sock,public_key_server): print(f"{threading.current_thread().name}开始进行公钥交换") try: public_key_client = sock.recv(1024) except Exception as e: print("接受错误",e) public_key_bytes = public_key_server.public_bytes( #将客户端的公钥序列化为 PEM 格式的字节串 encoding=serialization.Encoding.PEM, format=serialization.PublicFormat.SubjectPublicKeyInfo ) sock.send(public_key_bytes) return serialization.load_pem_public_key(public_key_client) #将接受到的服务端公钥反序列化为公钥对象 def decrypt_data(shared_key,data): #data要为byte型 key = shared_key[:16] aes = AES.new(key,AES.MODE_ECB) if isinstance(data,str): data = data.encode() decrypted_data = aes.decrypt(data) data = (unpad(decrypted_data, 16)).decode('latin1') return data def unpack_data(sock,shared_key,orign_data): #拆解数据包 sliced_sum = int.from_bytes(orign_data[:2],byteorder = 'big') sliced_num = int.from_bytes(orign_data[2:4],byteorder='big') data_length = int.from_bytes(orign_data[4:8], byteorder='big') data_type = int.from_bytes(orign_data[8:12], byteorder='big') data_md5 = (orign_data[-32:]).decode() data = decrypt_data(shared_key,orign_data[12:(data_length-32)]) value = verify_md5(data,data_md5) if value == 0: sock.send((f"第{sliced_num+1}组数据校验成功").encode()) else : sock.send((f"第{sliced_num+1}组数据校验成功").encode()) return sliced_sum,sliced_num,data_type,data def Authentication(sock,shared_key,query_thread1): #身份验证部分 账号密码存储在users.db数据库中 key = shared_key[:16] aes = AES.new(key,AES.MODE_ECB) username = (sock.recv(1024)) username = decrypt_data(shared_key,username) result = query_thread1.query_username(username) if result != None: sock.send(f"找到用户{username} 接下来进行密码验证".encode()) password = (sock.recv(1024)) decrypted_password = aes.decrypt(password) password = (unpad(decrypted_password, 16)).decode() result = query_thread1.query_password(username,password) if result != None: sock.send("身份验证通过".encode()) if username == "admin": permission_value = 1 else : permission_value = 0 return permission_value else : sock.send("密码错误,终止连接".encode()) sock.close() return 3 else: sock.send("你输入的用户名不存在 终止本次连接".encode()) sock.close() return 3 def menu(sock): text = "开始进行数据传输" sock.send(text.encode()) def transfer_txt(addr,data): print(f"客户机{addr}传输的数据为: {data}") def transfer_file(sock,data): #处理非切片数据 if isinstance(data,str): data = data.encode( ) data_type = mime.from_buffer(data) if data_type == "text/plain": with open("output.txt", 'wb') as f: f.write(data) sock.send("\n成功接受到文件".encode()) print(f"成功接受到文件") def transfer_file2(sock,data,sliced_sum,sliced_num,shared_key): #处理切片数据 data_bytes = data while sliced_num != sliced_sum-1: orign_data = (sock.recv(3000)) if len(orign_data) == 0: print("客户机中断连接") return sliced_sum,sliced_num,data_type,data = unpack_data(sock,shared_key,orign_data) data_bytes += data data_type = mime.from_buffer(data_bytes) if data_type == "image/png": with open("output.png", 'wb') as f: f.write(data_bytes) elif data_type == "audio/mpeg": with open("output.mp3", 'wb') as f: f.write(data_bytes) sock.send("\n成功接受到文件".encode()) print(f"成功接受到文件") def register(permission_value,sock,data,query_thread1): if permission_value == 0: #管理员权限的检验较为简单 sock.send("你所登录的用户并没有拥有管理员权限 无法注册用户".encode()) return else : parts = data.split(b';') username = parts[0] password = parts[1] query_thread1.adduser(sock,username,password) print(f"{threading.current_thread().name}:用户{username}注册成功") def verify_md5(data, original_md5): current_md5 = md5_encrypt(data) return current_md5 == original_md5 def transfer_function(sock,addr,public_key_server,private_key_server,query_thread1): print(f"{threading.current_thread().name}:客户机{addr}连接成功") public_key_client = key_switch(sock,public_key_server) print(f"{threading.current_thread().name}客户机{addr}公钥交换成功") print(f"{threading.current_thread().name}开始计算共享密钥") shared_key = private_key_server.exchange(public_key_client) print(f"{threading.current_thread().name}开始进行身份验证") permission_value = Authentication(sock,shared_key,query_thread1) print(f"{threading.current_thread().name}身份验证通过 开始处理客户机{addr}的数据传输") if permission_value == 3: print(f"{threading.current_thread().name}客户机中断连接") return while 1: menu(sock) orign_data = (sock.recv(3000)) if len(orign_data) == 0: print(f"{threading.current_thread().name}客户机中断连接") break sliced_sum,sliced_num,data_type,data = unpack_data(sock,shared_key,orign_data) if data_type == 1: transfer_txt(addr,data) elif data_type == 2: if sliced_sum != 0 or sliced_num != 0: transfer_file2(sock,data,sliced_sum,sliced_num,shared_key) else: transfer_file(sock,data) elif data_type ==3: register(permission_value,sock,data,query_thread1) def main(): # 固定的 DH 参数 p = 167369435262828772525424092470263277532300828423668354542113388987940885403967569624276882159444702063514952468172684099658558089467275283830276841486674376457626767287709752368374119769545320657500942952067517443133715688577345716588234520369470508343544601072272539124731608794518466242096737284389743744833 g = 2 pool = ThreadPoolExecutor(max_workers=5) #定义一个线程池 拥有5个空闲线程 query_thread1 = query_thread() #定义一个线程对象 后面用于数据库查询 因为数据库查询只能放在一个线程里使用 parameters = dh.DHParameterNumbers(p, g).parameters(backend=default_backend()) private_key_server = parameters.generate_private_key() public_key_server = private_key_server.public_key() s = socket.socket(socket.AF_INET, socket.SOCK_STREAM) #创建套接字 默认使用TCP协议 s.bind(("10.0.137.2",6666)) s.listen(5) #最多连接5个客户端 while 1: print(f"{threading.current_thread().name}:等待客户机连接") sock,addr = s.accept() client_thread = pool.submit(transfer_function,sock,addr,public_key_server,private_key_server,query_thread1) if __name__ == "__main__": main()
客户端
import socket import struct import hashlib import base64 import time from cryptography.hazmat.primitives import serialization from cryptography.hazmat.primitives.asymmetric import dh from cryptography.hazmat.backends import default_backend from Crypto.Util.Padding import pad, unpad from Crypto.Cipher import AES choice_text = """ 1.传输文本数据 2.传输文件数据 3.新增用户 4.断开连接 输入选项: """ def md5_encrypt(data): #md5加密 用于完整性检测 md5_hash = hashlib.md5() md5_hash.update(data) hash_result = md5_hash.hexdigest() return hash_result def pack_data(choice,data,shared_key,sliced,sum): #将数据打包成报文 格式: sliced_judge(4bytes)+data_length(4bytes)+data_type(4bytes)+encrypted_data(Dynamic)+md5_data(32bytes) sliced_sum = struct.pack('>H',sum) sliced_num = struct.pack('>H',sliced) choice_type = struct.pack('>I',choice) en_data = encrypt_data(shared_key,data) if isinstance(data,str): data = data.encode() hash_data = md5_encrypt(data).encode() temp = choice_type+en_data+hash_data transfer_data = sliced_sum+sliced_num+struct.pack('>I',len(temp)+8)+temp return transfer_data def key_switch(s,public_key_client): #交换公钥 print("等待连接服务端") s.connect(("69.165.67.133",6666)) print("连接上服务端 开始进行公钥交换") public_key_bytes = public_key_client.public_bytes( encoding=serialization.Encoding.PEM, format=serialization.PublicFormat.SubjectPublicKeyInfo ) s.send(public_key_bytes) public_key_server = s.recv(1024) return serialization.load_pem_public_key(public_key_server) def encrypt_data(shared_key,data): #aes加密 key = shared_key[:16] aes = AES.new(key,AES.MODE_ECB) if isinstance(data,str): data = data.encode() data_bytes = pad(data, AES.block_size) encrypt_data = aes.encrypt(data_bytes) return encrypt_data def Authentication(s,shared_key): print("开始进行身份验证") username = input("输入用户名:") encrypt_username = encrypt_data(shared_key,username) #用户名经过aes加密后传输 s.send(encrypt_username) text = s.recv(1024).decode() if text == "你输入的用户名不存在 终止本次连接": print(text) exit() else: print(text) password = input("输入密码:") encrypt_password = encrypt_data(shared_key,password) #密码经过aes加密后传输 s.send(encrypt_password) text = s.recv(1024).decode() if text == "密码错误,终止连接": print(text) exit() else : print(text) def register(s,shared_key): username = input("输入用户名 :") #账号密码用;来间隔后 采用base64编码后丢到pack_data函数中处理 password = input("输入密码 :") data = username+";"+password transfer_data = pack_data(3,data,shared_key,0,0) s.send(transfer_data) def transfer_txt(s,shared_key): text = input("输入数据 :") #这个函数负责处理直接传输的文本数据 但是仍然对于是否需要切片进行了判断 if len(text) > 1000: sliced = 0 chunk_size = 1000 sliced_data = slice_string(text, chunk_size) sliced_sum = len(sliced_data) for i in range(len(sliced_data)): transfer_data = pack_data(1,sliced_data[i],shared_key,sliced,sliced_sum) sliced += 1 s.send(transfer_data) else : sliced = 0 sliced_sum = 0 transfer_data = pack_data(1,text,shared_key,sliced,sliced_sum) s.send(transfer_data) print((s.recv(1024)).decode()+"\n") def slice_string(data, chunk_size): #根据指定大小切片字符串 return [data[i:i + chunk_size] for i in range(0, len(data), chunk_size)] def transfer_file(s,shared_key): file_addr = input("请输入文件对于client.py的相对路径 :") #读取客户端本地文件的二进制流 with open(file_addr,'rb') as file: text = file.read() if len(text) > 1000: sliced = 0 chunk_size = 1000 sliced_data = slice_string(text, chunk_size) sliced_sum = len(sliced_data) print(f"一共有{sliced_sum}组数据需要传输\n") begin_time = time.time() for i in range(len(sliced_data)): transfer_data = pack_data(2,sliced_data[i],shared_key,sliced,sliced_sum) sliced += 1 s.send(transfer_data) text = (s.recv(1024)).decode() currect_time = time.time() temp = currect_time-begin_time print("\r"+text+f" 当前运行了{temp:.0f}s",end="") else : sliced = 0 sliced_sum = 0 transfer_data = pack_data(2,text,shared_key,sliced,sliced_sum) s.send(transfer_data) print((s.recv(1024)).decode()+"\n") def main(): # 固定的 DH 参数 p = 167369435262828772525424092470263277532300828423668354542113388987940885403967569624276882159444702063514952468172684099658558089467275283830276841486674376457626767287709752368374119769545320657500942952067517443133715688577345716588234520369470508343544601072272539124731608794518466242096737284389743744833 g = 2 parameters = dh.DHParameterNumbers(p, g).parameters(backend=default_backend()) private_key_client = parameters.generate_private_key() public_key_client = private_key_client.public_key() s = socket.socket(socket.AF_INET, socket.SOCK_STREAM) public_key_server = key_switch(s,public_key_client) print("公钥交换成功") print("开始计算共享密钥") shared_key = private_key_client.exchange(public_key_server) Authentication(s,shared_key) print("成功连接") while 1: print((s.recv(1024)).decode()) choice = input(choice_text) if choice == "1": transfer_txt(s,shared_key) elif choice == "2": transfer_file(s,shared_key) elif choice == "3": register(s,shared_key) elif choice == "4": print("程序终止") s.close() exit() else : print("未找到输入的选项,请重新输入") continue if __name__ == "__main__": main()
v0.7 程序到这一版已经设计的比较完善了 可能还存在一些逻辑漏洞可以造成服务端宕机等问题 这个就留到后续的安全性分析吧 这一版本主要还是想优化一下文件传输的问题 目前的问题在于 校验数据完整性没有进行处理 比如如果失败 应该记录下失败的数据组序号 然后返回给客户机 然后就是识别文件类别的那块代码 现在用的是python-magic库来识别是属于什么文件 然后创建对应的文件用来存储接收到的数据 不过不是很确定这个库他是不是只是对文件头进行识别 如果是的话感觉还是相当好伪造的 这里应该有一个比较大的安全隐患 同时原本的类型处理也比较少 只适配了txt png mp3这三个最常见的格式 需要增加如果识别文件类型失败 就进行临时存储 同时报错 客户端
import socket import struct import hashlib import time import re from cryptography.hazmat.primitives import serialization from cryptography.hazmat.primitives.asymmetric import dh from cryptography.hazmat.backends import default_backend from Crypto.Util.Padding import pad, unpad from Crypto.Cipher import AES choice_text = """ 1.传输文本数据 2.传输文件数据 3.新增用户 4.断开连接 输入选项: """ def md5_encrypt(data): #md5加密 用于完整性检测 md5_hash = hashlib.md5() md5_hash.update(data) hash_result = md5_hash.hexdigest() return hash_result def pack_data(choice,data,shared_key,sliced,sum): #将数据打包成报文 格式: sliced_judge(4bytes)+data_length(4bytes)+data_type(4bytes)+encrypted_data(Dynamic)+md5_data(32bytes) sliced_sum = struct.pack('>H',sum) sliced_num = struct.pack('>H',sliced) choice_type = struct.pack('>I',choice) en_data = encrypt_data(shared_key,data) if isinstance(data,str): data = data.encode() hash_data = md5_encrypt(data).encode() temp = choice_type+en_data+hash_data transfer_data = sliced_sum+sliced_num+struct.pack('>I',len(temp)+8)+temp return transfer_data def key_switch(s,public_key_client): #交换公钥 print("等待连接服务端") s.connect(("69.165.67.133",6666)) print("连接上服务端 开始进行公钥交换") public_key_bytes = public_key_client.public_bytes( encoding=serialization.Encoding.PEM, format=serialization.PublicFormat.SubjectPublicKeyInfo ) s.send(public_key_bytes) public_key_server = s.recv(1024) return serialization.load_pem_public_key(public_key_server) def encrypt_data(shared_key,data): #aes加密 key = shared_key[:16] aes = AES.new(key,AES.MODE_ECB) if isinstance(data,str): data = data.encode() data_bytes = pad(data, AES.block_size) encrypt_data = aes.encrypt(data_bytes) return encrypt_data def Authentication(s,shared_key): print("开始进行身份验证") username = input("输入用户名:") encrypt_username = encrypt_data(shared_key,username) #用户名经过aes加密后传输 s.send(encrypt_username) text = s.recv(1024).decode() if text == "你输入的用户名不存在 终止本次连接": print(text) exit() else: print(text) password = input("输入密码:") encrypt_password = encrypt_data(shared_key,password) #密码经过aes加密后传输 s.send(encrypt_password) text = s.recv(1024).decode() if text == "密码错误,终止连接": print(text) exit() else : print(text) def register(s,shared_key): username = input("输入用户名 :") #账号密码用;来间隔后 采用base64编码后丢到pack_data函数中处理 password = input("输入密码 :") data = username+";"+password transfer_data = pack_data(3,data,shared_key,0,0) s.send(transfer_data) def transfer_txt(s,shared_key): text = input("输入数据 :") #这个函数负责处理直接传输的文本数据 但是仍然对于是否需要切片进行了判断 if len(text) > 1024: sliced = 0 chunk_size = 1024 sliced_data = slice_string(text, chunk_size) sliced_sum = len(sliced_data) for i in range(len(sliced_data)): transfer_data = pack_data(1,sliced_data[i],shared_key,sliced,sliced_sum) sliced += 1 s.send(transfer_data) else : sliced = 0 sliced_sum = 0 transfer_data = pack_data(1,text,shared_key,sliced,sliced_sum) s.send(transfer_data) print((s.recv(1024)).decode()+"\n") def slice_string(data, chunk_size): #根据指定大小切片字符串 return [data[i:i + chunk_size] for i in range(0, len(data), chunk_size)] def transfer_file(s,shared_key): file_addr = input("请输入文件对于client.py的相对路径 :") #读取客户端本地文件的二进制流 with open(file_addr,'rb') as file: text = file.read() if len(text) > 1024: sliced = 0 chunk_size = 1024 sliced_data = slice_string(text, chunk_size) sliced_sum = len(sliced_data) print(f"一共有{sliced_sum}组数据需要传输\n") begin_time = time.time() failure_msg = "第" failure_number = None for i in range(len(sliced_data)): transfer_data = pack_data(2,sliced_data[i],shared_key,sliced,sliced_sum) sliced += 1 s.send(transfer_data) text = (s.recv(1024)).decode() if "失败" in text: start_index = text.find('{') + 1 end_index = text.find('}') failure_number = text[start_index:end_index] failure_msg += failure_number+"," currect_time = time.time() temp = currect_time-begin_time print("\r"+text+f" 当前运行了{temp:.0f}s",end="") failure_msg += "组传输失败" if failure_number != None: print(failure_msg) print("\n",end="") else : sliced = 0 sliced_sum = 0 transfer_data = pack_data(2,text,shared_key,sliced,sliced_sum) s.send(transfer_data) print((s.recv(1024)).decode()+"\n") def main(): # 固定的 DH 参数 p = 167369435262828772525424092470263277532300828423668354542113388987940885403967569624276882159444702063514952468172684099658558089467275283830276841486674376457626767287709752368374119769545320657500942952067517443133715688577345716588234520369470508343544601072272539124731608794518466242096737284389743744833 g = 2 parameters = dh.DHParameterNumbers(p, g).parameters(backend=default_backend()) private_key_client = parameters.generate_private_key() public_key_client = private_key_client.public_key() s = socket.socket(socket.AF_INET, socket.SOCK_STREAM) public_key_server = key_switch(s,public_key_client) print("公钥交换成功") print("开始计算共享密钥") shared_key = private_key_client.exchange(public_key_server) Authentication(s,shared_key) print("成功连接") while 1: print((s.recv(1024)).decode()) choice = input(choice_text) if choice == "1": transfer_txt(s,shared_key) elif choice == "2": transfer_file(s,shared_key) elif choice == "3": register(s,shared_key) elif choice == "4": print("程序终止") s.close() exit() else : print("未找到输入的选项,请重新输入") continue if __name__ == "__main__": main()
服务端
import socket import hashlib import magic from cryptography.hazmat.primitives import serialization from cryptography.hazmat.primitives.asymmetric import dh from cryptography.hazmat.backends import default_backend from Crypto.Util.Padding import pad, unpad from Crypto.Cipher import AES import sqlite3 from concurrent.futures import ThreadPoolExecutor import threading mime = magic.Magic(mime=True) class query_thread(threading.Thread): def __init__(self,name = None): threading.Thread.__init__(self,name=name) def query_username(self,username): conn = sqlite3.connect('users.db') cursor = conn.cursor() cursor.execute("SELECT username FROM users WHERE username = ?", (username,)) result = cursor.fetchone() cursor.close() return result def query_password(self,username,password): conn = sqlite3.connect('users.db') cursor = conn.cursor() cursor.execute("SELECT password FROM users WHERE username = ? AND password = ?", (username, password)) result = cursor.fetchone() cursor.close() return result def add_user(sock,username,password): conn = sqlite3.connect('users.db') cursor = conn.cursor() try: # 插入新的用户数据 cursor.execute("INSERT INTO users (username, password) VALUES (?, ?)", (username, password)) conn.commit() sock.send("用户注册成功".encode()) except sqlite3.IntegrityError: sock.send("用户已存在".encode()) except Exception as e: sock.send(e.encode()) def md5_encrypt(data): if isinstance(data, str): data = data.encode() # md5加密需要bytes参数 md5_hash = hashlib.md5() #md5加密 md5_hash.update(data) hash_result = md5_hash.hexdigest() return hash_result def key_switch(sock,public_key_server): print(f"{threading.current_thread().name}开始进行公钥交换") try: public_key_client = sock.recv(1024) except Exception as e: print("接受错误",e) public_key_bytes = public_key_server.public_bytes( #将客户端的公钥序列化为 PEM 格式的字节串 encoding=serialization.Encoding.PEM, format=serialization.PublicFormat.SubjectPublicKeyInfo ) sock.send(public_key_bytes) return serialization.load_pem_public_key(public_key_client) #将接受到的服务端公钥反序列化为公钥对象 def decrypt_data(shared_key,data): #data要为byte型 key = shared_key[:16] aes = AES.new(key,AES.MODE_ECB) if isinstance(data,str): data = data.encode() decrypted_data = aes.decrypt(data) data = (unpad(decrypted_data, 16)).decode('latin1') return data def unpack_data(sock,shared_key,orign_data): #拆解数据包 sliced_sum = int.from_bytes(orign_data[:2],byteorder = 'big') sliced_num = int.from_bytes(orign_data[2:4],byteorder='big') data_length = int.from_bytes(orign_data[4:8], byteorder='big') data_type = int.from_bytes(orign_data[8:12], byteorder='big') data_md5 = (orign_data[-32:]).decode() data = decrypt_data(shared_key,orign_data[12:(data_length-32)]) value = verify_md5(data,data_md5) if value == 0: sock.send((f"第{sliced_num+1}组数据校验成功").encode()) else : sock.send((f"第{sliced_num+1}组数据校验失败").encode()) return sliced_sum,sliced_num,data_type,data def Authentication(sock,shared_key,query_thread1): #身份验证部分 账号密码存储在users.db数据库中 key = shared_key[:16] aes = AES.new(key,AES.MODE_ECB) username = (sock.recv(1024)) username = decrypt_data(shared_key,username) result = query_thread1.query_username(username) if result != None: sock.send(f"找到用户{username} 接下来进行密码验证".encode()) password = (sock.recv(1024)) decrypted_password = aes.decrypt(password) password = (unpad(decrypted_password, 16)).decode() result = query_thread1.query_password(username,password) if result != None: sock.send("身份验证通过".encode()) if username == "admin": permission_value = 1 else : permission_value = 0 return permission_value else : sock.send("密码错误,终止连接".encode()) sock.close() return 3 else: sock.send("你输入的用户名不存在 终止本次连接".encode()) sock.close() return 3 def menu(sock): text = "开始进行数据传输" sock.send(text.encode()) def transfer_txt(addr,data): print(f"客户机{addr}传输的数据为: {data}") def transfer_file(sock,data): #处理非切片数据 data_type = mime.from_buffer(data) filename = "output/"+threading.current_thread().name try: if data_type == "text/plain": filename += ".txt" with open(filename, 'wb') as f: f.write(data) elif data_type == "text/x-c": filename += ".c" with open(filename,'wb') as f: f.write(data) else : print(f"没有识别到文件类型{data_type},暂时存放在temp中") filename += "-temp" with open(filename,'wb') as f: f.write(data) sock.send("\n成功接受到文件".encode()) print(f"{threading.current_thread().name}:成功接受到文件{filename}") except Exception as e: text = f"保存文件时发生错误 错误信息:{e}" print(f"{threading.current_thread().name}:"+text) sock.send(text.encode()) def transfer_file2(sock,data,sliced_sum,sliced_num,shared_key): #处理切片数据 data_bytes = data while sliced_num != sliced_sum-1: orign_data = (sock.recv(3000)) if len(orign_data) == 0: print(f"{threading.current_thread().name}:客户机中断连接") return sliced_sum,sliced_num,data_type,data = unpack_data(sock,shared_key,orign_data) data_bytes += data data_bytes = data_bytes.encode('latin1') data_type = mime.from_buffer(data_bytes) filename = "output/"+threading.current_thread().name try: if data_type == "image/png": filename += ".png" with open(filename, 'wb') as f: f.write(data_bytes) elif data_type == "audio/mpeg": filename += ".mp3" with open(filename, 'wb') as f: f.write(data_bytes) elif data_type == "text/x-c": filename += ".c" with open(filename,'wb') as f: f.write(data_bytes) else: print(f"{threading.current_thread().name}:没有识别到文件类型{data_type},暂时存放在temp中") filename += "-temp" with open(filename,'wb') as f: f.write(data_bytes) sock.send("\n成功接受到文件".encode()) print(f"{threading.current_thread().name}:成功接受到文件{filename}") except Exception as e: text = f"保存文件时发生错误 错误信息:{e}" print(f"{threading.current_thread().name}:"+text) sock.send(text.encode()) def register(permission_value,sock,data,query_thread1): if permission_value == 0: #管理员权限的检验较为简单 sock.send("你所登录的用户并没有拥有管理员权限 无法注册用户".encode()) return else : parts = data.split(b';') username = parts[0] password = parts[1] query_thread1.adduser(sock,username,password) print(f"{threading.current_thread().name}:用户{username}注册成功") def verify_md5(data, original_md5): current_md5 = md5_encrypt(data) return current_md5 == original_md5 def transfer_function(sock,addr,public_key_server,private_key_server,query_thread1): print(f"{threading.current_thread().name}:客户机{addr}连接成功") public_key_client = key_switch(sock,public_key_server) print(f"{threading.current_thread().name}:客户机{addr}公钥交换成功") print(f"{threading.current_thread().name}:开始计算共享密钥") shared_key = private_key_server.exchange(public_key_client) print(f"{threading.current_thread().name}:开始进行身份验证") permission_value = Authentication(sock,shared_key,query_thread1) print(f"{threading.current_thread().name}:身份验证通过 开始处理客户机{addr}的数据传输") if permission_value == 3: print(f"{threading.current_thread().name}:客户机中断连接") return while 1: menu(sock) orign_data = (sock.recv(3000)) if len(orign_data) == 0: print(f"{threading.current_thread().name}:客户机中断连接") break sliced_sum,sliced_num,data_type,data = unpack_data(sock,shared_key,orign_data) if data_type == 1: transfer_txt(addr,data) elif data_type == 2: if sliced_sum != 0 or sliced_num != 0: transfer_file2(sock,data,sliced_sum,sliced_num,shared_key) else: transfer_file(sock,data) elif data_type ==3: register(permission_value,sock,data,query_thread1) def main(): # 固定的 DH 参数 p = 167369435262828772525424092470263277532300828423668354542113388987940885403967569624276882159444702063514952468172684099658558089467275283830276841486674376457626767287709752368374119769545320657500942952067517443133715688577345716588234520369470508343544601072272539124731608794518466242096737284389743744833 g = 2 pool = ThreadPoolExecutor(max_workers=5) #定义一个线程池 拥有5个空闲线程 query_thread1 = query_thread() #定义一个线程对象 后面用于数据库查询 因为数据库查询只能放在一个线程里使用 parameters = dh.DHParameterNumbers(p, g).parameters(backend=default_backend()) private_key_server = parameters.generate_private_key() public_key_server = private_key_server.public_key() s = socket.socket(socket.AF_INET, socket.SOCK_STREAM) #创建套接字 默认使用TCP协议 s.bind(("10.0.137.2",6666)) s.listen(5) #最多连接5个客户端 while 1: print(f"{threading.current_thread().name}:等待客户机连接") sock,addr = s.accept() client_thread = pool.submit(transfer_function,sock,addr,public_key_server,private_key_server,query_thread1) if __name__ == "__main__": main()
v0.8 这一版本的目的在于增加日志功能 因为就目前来说 一旦连接的客户机多了 服务端那边的输出就会比较混乱 具体的实现应该也比较简单 在accpet连接后就创建一个文件以及字符串(作为记录该客户机操作记录的缓冲区) 然后在检测客户机中断连接后就把缓冲区的内容写入日志 并且加入时间 来方便朔源 或者直接在执行对应操作后就写入日志 不过从理论上来说应该是前者的方案更加节省资源 预计难点在于在哪里塞入判断来check什么时候写入日志 所以还是选择后者吧 同时还把原本写的和屎一样的传输数据类型转化优化了一下 变成了好闻的屎 服务端
import socket import hashlib import magic import time from cryptography.hazmat.primitives import serialization from cryptography.hazmat.primitives.asymmetric import dh from cryptography.hazmat.backends import default_backend from Crypto.Util.Padding import pad, unpad from Crypto.Cipher import AES import sqlite3 from concurrent.futures import ThreadPoolExecutor import threading mime = magic.Magic(mime=True) def write_log(log_file,text): current_time = time.asctime(time.localtime(time.time())) text += " "+current_time log_file.write(text.encode()+b"\n") class query_thread(threading.Thread): def __init__(self,name = None): threading.Thread.__init__(self,name=name) def query_username(self,username): conn = sqlite3.connect('users.db') cursor = conn.cursor() cursor.execute("SELECT username FROM users WHERE username = ?", (username,)) result = cursor.fetchone() cursor.close() return result def query_password(self,username,password): conn = sqlite3.connect('users.db') cursor = conn.cursor() cursor.execute("SELECT password FROM users WHERE username = ? AND password = ?", (username, password)) result = cursor.fetchone() cursor.close() return result def add_user(sock,username,password): conn = sqlite3.connect('users.db') cursor = conn.cursor() try: # 插入新的用户数据 cursor.execute("INSERT INTO users (username, password) VALUES (?, ?)", (username, password)) conn.commit() sock.send("用户注册成功".encode()) except sqlite3.IntegrityError: sock.send("用户已存在".encode()) except Exception as e: sock.send(e.encode()) def md5_encrypt(data): if isinstance(data, str): data = data.encode() # md5加密需要bytes参数 md5_hash = hashlib.md5() #md5加密 md5_hash.update(data) hash_result = md5_hash.hexdigest() return hash_result def key_switch(sock,public_key_server): print(f"{threading.current_thread().name}开始进行公钥交换") try: public_key_client = sock.recv(1024) except Exception as e: print("接受错误",e) public_key_bytes = public_key_server.public_bytes( #将客户端的公钥序列化为 PEM 格式的字节串 encoding=serialization.Encoding.PEM, format=serialization.PublicFormat.SubjectPublicKeyInfo ) sock.send(public_key_bytes) return serialization.load_pem_public_key(public_key_client) #将接受到的服务端公钥反序列化为公钥对象 def decrypt_data(shared_key,data): #data要为byte型 key = shared_key[:16] aes = AES.new(key,AES.MODE_ECB) if isinstance(data,str): data = data.encode() decrypted_data = aes.decrypt(data) data = (unpad(decrypted_data, 16)) return data def unpack_data(sock,shared_key,orign_data,log_file,addr): #拆解数据包 sliced_sum = int.from_bytes(orign_data[:2],byteorder = 'big') sliced_num = int.from_bytes(orign_data[2:4],byteorder='big') data_length = int.from_bytes(orign_data[4:8], byteorder='big') data_type = int.from_bytes(orign_data[8:12], byteorder='big') data_md5 = (orign_data[-32:]).decode() data = decrypt_data(shared_key,orign_data[12:(data_length-32)]) value = verify_md5(data,data_md5) if value == 0: sock.send((f"第{sliced_num+1}组数据校验失败").encode()) text = f"客户机{addr}传输数据过程中第{sliced_num+1}组数据校验失败" write_log(log_file,text) else : sock.send((f"第{sliced_num+1}组数据校验成功").encode()) return sliced_sum,sliced_num,data_type,data def Authentication(sock,shared_key,query_thread1,addr,log_file): #身份验证部分 账号密码存储在users.db数据库中 key = shared_key[:16] aes = AES.new(key,AES.MODE_ECB) username = (sock.recv(1024)) if len(username) == 0: text = f"客户机{addr}中断连接" write_log(log_file,text) log_file.close() sock.close() print(f"{threading.current_thread().name}:客户机中断连接") return username = decrypt_data(shared_key,username) username = username.decode() result = query_thread1.query_username(username) if result != None: sock.send(f"找到用户{username} 接下来进行密码验证".encode()) password = (sock.recv(1024)) if len(password) == 0: text = f"客户机{addr}中断连接" write_log(log_file,text) log_file.close() sock.close() print(f"{threading.current_thread().name}:客户机中断连接") return password = decrypt_data(shared_key,password) password = password.decode() result = query_thread1.query_password(username,password) if result != None: sock.send("身份验证通过".encode()) if username == "admin": permission_value = 1 else : permission_value = 0 text = f"客户机{addr}使用用户名({username})密码({password})进行身份验证通过" write_log(log_file,text) return permission_value,username else : sock.send("密码错误,终止连接".encode()) text = f"客户机{addr}尝试登录用户({username})输入密码({password}),密码错误登录失败" write_log(log_file,text) sock.close() return 3,username else: sock.send("你输入的用户名不存在 终止本次连接".encode()) text = f"客户机{addr}尝试登录用户({username}),但是用户不存在" write_log(log_file,text) sock.close() return 3,username def menu(sock): text = "开始进行数据传输" sock.send(text.encode()) def transfer_txt(addr,data,log_file): print(f"{threading.current_thread().name}客户机{addr}传输的数据为: {data}") current_time = time.asctime(time.localtime(time.time())) text = f"客户机{addr}传输的数据为: {data} "+current_time write_log(log_file,text) def transfer_file(sock,data,addr,log_file): #处理非切片数据 data_type = mime.from_buffer(data) filename = "output/"+addr try: if data_type == "text/plain": filename += ".txt" with open(filename, 'wb') as f: f.write(data) elif data_type == "text/x-c": filename += ".c" with open(filename,'wb') as f: f.write(data) else : print(f"没有识别到文件类型{data_type},暂时存放在temp中") filename += "-temp" with open(filename,'wb') as f: f.write(data) text = f"客户机{addr}传输了{filename}" write_log(log_file,text) sock.send("\n成功接受到文件".encode()) print(f"{threading.current_thread().name}:成功接受到文件{filename}") except Exception as e: text = f"保存文件时发生错误 错误信息:{e}" print(f"{threading.current_thread().name}:"+text) text = f"客户机保存文件时发生错误 错误信息{e}" write_log(log_file,text) sock.send(text.encode()) def transfer_file2(sock,data,sliced_sum,sliced_num,shared_key,addr,log_file): #处理切片数据 data_bytes = data while sliced_num != (sliced_sum-1): orign_data = (sock.recv(3000)) if len(orign_data) == 0: text = f"客户机{addr}中断连接" write_log(log_file,text) log_file.close() print(f"{threading.current_thread().name}:客户机中断连接") sock.close() return sliced_sum,sliced_num,data_type,data = unpack_data(sock,shared_key,orign_data,log_file,addr) data_bytes += data data_type = mime.from_buffer(data_bytes) filename = "output/"+addr try: if data_type == "image/png": filename += ".png" with open(filename, 'wb') as f: f.write(data_bytes) elif data_type == "audio/mpeg": filename += ".mp3" with open(filename, 'wb') as f: f.write(data_bytes) elif data_type == "text/x-c": filename += ".c" with open(filename,'wb') as f: f.write(data_bytes) else: print(f"{threading.current_thread().name}:没有识别到文件类型{data_type},暂时存放在temp中") filename += "-temp" with open(filename,'wb') as f: f.write(data_bytes) text = f"客户机{addr}传输了{filename}" write_log(log_file,text) sock.send("\n成功接受到文件".encode()) print(f"{threading.current_thread().name}:成功接受到文件{filename}") except Exception as e: text = f"保存文件时发生错误 错误信息:{e}" print(f"{threading.current_thread().name}:"+text) text = f"客户机保存文件时发生错误 错误信息{e}" write_log(log_file,text) sock.send(text.encode()) def register(permission_value,sock,data,query_thread1,log_file,addr,login_username): if permission_value == 0: #管理员权限的检验较为简单 sock.send("你所登录的用户并没有拥有管理员权限 无法注册用户".encode()) text = f"客户机{addr}尝试使用非管理员权限用户({login_username})注册新用户" write_log(log_file,text) return else : parts = data.split(b';') username = parts[0] password = parts[1] query_thread1.adduser(sock,username,password) print(f"{threading.current_thread().name}:用户{username}注册成功") text = f"客户机{addr}使用管理员权限用户({login_username})注册新用户({username})成功 对应用户密码为({password})" def verify_md5(data, original_md5): current_md5 = md5_encrypt(data) return current_md5 == original_md5 def transfer_function(sock,addr,public_key_server,private_key_server,query_thread1): addr = format(addr[0]) log_name = "logfile/"+addr+"-log" try: log_file = open(log_name,'ab+') except Exception as e: print(f"{threading.current_thread().name}创建日志文件失败,错误信息:{e}") text = f"客户机{addr}连接成功" write_log(log_file,text) print(f"{threading.current_thread().name}:客户机{addr}连接成功") public_key_client = key_switch(sock,public_key_server) print(f"{threading.current_thread().name}:客户机{addr}公钥交换成功") print(f"{threading.current_thread().name}:开始计算共享密钥") shared_key = private_key_server.exchange(public_key_client) print(f"{threading.current_thread().name}:开始进行身份验证") permission_value,username = Authentication(sock,shared_key,query_thread1,addr,log_file) if permission_value == 3: text = f"客户机{addr}中断连接" write_log(log_file,text) log_file.close() sock.close() print(f"{threading.current_thread().name}:客户机中断连接") return print(f"{threading.current_thread().name}:身份验证通过 开始处理客户机{addr}的数据传输") while 1: menu(sock) orign_data = (sock.recv(3000)) if len(orign_data) == 0: text = f"客户机{addr}中断连接" write_log(log_file,text) log_file.close() print(f"{threading.current_thread().name}:客户机中断连接") break sliced_sum,sliced_num,data_type,data = unpack_data(sock,shared_key,orign_data,log_file,addr) if data_type == 1: text = f"客户机{addr}执行了传输文本数据操作" write_log(log_file,text) transfer_txt(addr,data,log_file) elif data_type == 2: if sliced_sum != 0 or sliced_num != 0: text = f"客户机{addr}执行了传输文件操作,并且传输的文件过大需要切片传输,切片总数为{sliced_sum}" write_log(log_file,text) transfer_file2(sock,data,sliced_sum,sliced_num,shared_key,addr,log_file) else: text = f"客户机{addr}执行了传输文件操作" write_log(log_file,text) transfer_file(sock,data,addr,log_file) elif data_type ==3: text = f"客户机{addr}执行了注册账号操作" write_log(log_file,text) register(permission_value,sock,data,query_thread1,log_file,addr,username) def main(): # 固定的 DH 参数 p = 167369435262828772525424092470263277532300828423668354542113388987940885403967569624276882159444702063514952468172684099658558089467275283830276841486674376457626767287709752368374119769545320657500942952067517443133715688577345716588234520369470508343544601072272539124731608794518466242096737284389743744833 g = 2 pool = ThreadPoolExecutor(max_workers=5) #定义一个线程池 拥有5个空闲线程 query_thread1 = query_thread() #定义一个线程对象 后面用于数据库查询 因为数据库查询只能放在一个线程里使用 parameters = dh.DHParameterNumbers(p, g).parameters(backend=default_backend()) private_key_server = parameters.generate_private_key() public_key_server = private_key_server.public_key() s = socket.socket(socket.AF_INET, socket.SOCK_STREAM) #创建套接字 默认使用TCP协议 s.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) #允许在端口上处于 TIME_WAIT 状态时再次绑定 s.bind(("10.0.137.2",6666)) s.listen(5) #最多连接5个客户端 while 1: print(f"{threading.current_thread().name}:等待客户机连接") sock,addr = s.accept() client_thread = pool.submit(transfer_function,sock,addr,public_key_server,private_key_server,query_thread1) if __name__ == "__main__": main()
客户端
import socket import struct import hashlib import time import re from cryptography.hazmat.primitives import serialization from cryptography.hazmat.primitives.asymmetric import dh from cryptography.hazmat.backends import default_backend from Crypto.Util.Padding import pad, unpad from Crypto.Cipher import AES choice_text = """ 1.传输文本数据 2.传输文件数据 3.新增用户 4.断开连接 输入选项: """ def md5_encrypt(data): #md5加密 用于完整性检测 md5_hash = hashlib.md5() if isinstance(data,str): data = data.encode() md5_hash.update(data) hash_result = md5_hash.hexdigest() return hash_result def pack_data(choice,data,shared_key,sliced,sum): #将数据打包成报文 格式: sliced_judge(4bytes)+data_length(4bytes)+data_type(4bytes)+encrypted_data(Dynamic)+md5_data(32bytes) sliced_sum = struct.pack('>H',sum) sliced_num = struct.pack('>H',sliced) choice_type = struct.pack('>I',choice) en_data = encrypt_data(shared_key,data) hash_data = md5_encrypt(data).encode() temp = choice_type+en_data+hash_data transfer_data = sliced_sum+sliced_num+struct.pack('>I',len(temp)+8)+temp return transfer_data def key_switch(s,public_key_client): #交换公钥 print("等待连接服务端") s.connect(("69.165.67.133",6666)) print("连接上服务端 开始进行公钥交换") public_key_bytes = public_key_client.public_bytes( encoding=serialization.Encoding.PEM, format=serialization.PublicFormat.SubjectPublicKeyInfo ) s.send(public_key_bytes) public_key_server = s.recv(1024) return serialization.load_pem_public_key(public_key_server) def encrypt_data(shared_key,data): #aes加密 key = shared_key[:16] aes = AES.new(key,AES.MODE_ECB) if isinstance(data,str): data = data.encode() data_bytes = pad(data, AES.block_size) encrypt_data = aes.encrypt(data_bytes) return encrypt_data def Authentication(s,shared_key): print("开始进行身份验证") username = input("输入用户名:") encrypt_username = encrypt_data(shared_key,username) #用户名经过aes加密后传输 s.send(encrypt_username) text = s.recv(1024).decode() if text == "你输入的用户名不存在 终止本次连接": print(text) exit() else: print(text) password = input("输入密码:") encrypt_password = encrypt_data(shared_key,password) #密码经过aes加密后传输 s.send(encrypt_password) text = s.recv(1024).decode() if text == "密码错误,终止连接": print(text) exit() else : print(text) def register(s,shared_key): username = input("输入用户名 :") #账号密码用;来间隔后 采用base64编码后丢到pack_data函数中处理 password = input("输入密码 :") data = username+";"+password transfer_data = pack_data(3,data,shared_key,0,0) s.send(transfer_data) def transfer_txt(s,shared_key): text = input("输入数据 :") #这个函数负责处理直接传输的文本数据 但是仍然对于是否需要切片进行了判断 if len(text) > 1024: sliced = 0 chunk_size = 1024 sliced_data = slice_string(text, chunk_size) sliced_sum = len(sliced_data) for i in range(len(sliced_data)): transfer_data = pack_data(1,sliced_data[i],shared_key,sliced,sliced_sum) sliced += 1 s.send(transfer_data) else : sliced = 0 sliced_sum = 0 transfer_data = pack_data(1,text,shared_key,sliced,sliced_sum) s.send(transfer_data) print((s.recv(1024)).decode()+"\n") def slice_string(data, chunk_size): #根据指定大小切片字符串 return [data[i:i + chunk_size] for i in range(0, len(data), chunk_size)] def transfer_file(s,shared_key): file_addr = input("请输入文件对于client.py的相对路径 :") #读取客户端本地文件的二进制流 with open(file_addr,'rb') as file: text = file.read() if len(text) > 1024: sliced = 0 chunk_size = 1024 sliced_data = slice_string(text, chunk_size) sliced_sum = len(sliced_data) print(f"一共有{sliced_sum}组数据需要传输\n") begin_time = time.time() failure_msg = "第" failure_number = None for i in range(len(sliced_data)): transfer_data = pack_data(2,sliced_data[i],shared_key,sliced,sliced_sum) sliced += 1 s.send(transfer_data) text = (s.recv(1024)).decode() if "失败" in text: start_index = text.find('{') + 1 end_index = text.find('}') failure_number = text[start_index:end_index] failure_msg += failure_number+"," currect_time = time.time() temp = currect_time-begin_time print("\r"+text+f" 当前运行了{temp:.0f}s",end="") failure_msg += "组传输失败" if failure_number != None: print(failure_msg) print("\n",end="") else : sliced = 0 sliced_sum = 0 transfer_data = pack_data(2,text,shared_key,sliced,sliced_sum) s.send(transfer_data) print((s.recv(1024)).decode()+"\n") def main(): # 固定的 DH 参数 p = 167369435262828772525424092470263277532300828423668354542113388987940885403967569624276882159444702063514952468172684099658558089467275283830276841486674376457626767287709752368374119769545320657500942952067517443133715688577345716588234520369470508343544601072272539124731608794518466242096737284389743744833 g = 2 parameters = dh.DHParameterNumbers(p, g).parameters(backend=default_backend()) private_key_client = parameters.generate_private_key() public_key_client = private_key_client.public_key() s = socket.socket(socket.AF_INET, socket.SOCK_STREAM) public_key_server = key_switch(s,public_key_client) print("公钥交换成功") print("开始计算共享密钥") shared_key = private_key_client.exchange(public_key_server) Authentication(s,shared_key) print("成功连接") while 1: print((s.recv(1024)).decode()) choice = input(choice_text) if choice == "1": transfer_txt(s,shared_key) elif choice == "2": transfer_file(s,shared_key) elif choice == "3": register(s,shared_key) elif choice == "4": print("程序终止") s.close() exit() else : print("未找到输入的选项,请重新输入") continue if __name__ == "__main__": main()
v0.9 这个版本的目的是修修bug 尽量优化一下代码 准备在1.0版本实现图形化管理界面 然后就正式结束了这个项目 针对传输速率过慢的问题 大概有了头绪了 原因出在客户端输出当前组数据校验是否正确 这一功能是利用客户端和服务端高频send和recv的 所以会造成卡顿 取消掉了以后发送速度明显提高 但是接受速度跟不上了 这里想到的解决办法是专门给大文件收发开一个新线程 看看能不能实现 然后关于切片那一块可能也要优化一下了 11/9记 mlgb 我觉得用socket作为这个协议的基础框架根本就是个错误的选择 服务端运行在服务器上的时候 会因为各种奇奇怪怪的原因导致丢包 最后只能通过sleep来控制客户端发包的速度 服务端
import socket import hashlib import magic import time from cryptography.hazmat.primitives import serialization from cryptography.hazmat.primitives.asymmetric import dh from cryptography.hazmat.backends import default_backend from Crypto.Util.Padding import pad, unpad from Crypto.Cipher import AES import sqlite3 from concurrent.futures import ThreadPoolExecutor import threading mime = magic.Magic(mime=True) def write_log(log_file,text): current_time = time.asctime(time.localtime(time.time())) text += " "+current_time log_file.write(text.encode()+b"\n") class query_thread(threading.Thread): def __init__(self,name = None): threading.Thread.__init__(self,name=name) def query_username(self,username): conn = sqlite3.connect('users.db') cursor = conn.cursor() cursor.execute("SELECT username FROM users WHERE username = ?", (username,)) result = cursor.fetchone() cursor.close() return result def query_password(self,username,password): conn = sqlite3.connect('users.db') cursor = conn.cursor() cursor.execute("SELECT password FROM users WHERE username = ? AND password = ?", (username, password)) result = cursor.fetchone() cursor.close() return result def add_user(sock,username,password): conn = sqlite3.connect('users.db') cursor = conn.cursor() try: # 插入新的用户数据 cursor.execute("INSERT INTO users (username, password) VALUES (?, ?)", (username, password)) conn.commit() sock.send("用户注册成功".encode()) except sqlite3.IntegrityError: sock.send("用户已存在".encode()) except Exception as e: sock.send(e.encode()) def md5_encrypt(data): if isinstance(data, str): data = data.encode() # md5加密需要bytes参数 md5_hash = hashlib.md5() #md5加密 md5_hash.update(data) hash_result = md5_hash.hexdigest() return hash_result def key_switch(sock,public_key_server): print(f"{threading.current_thread().name}开始进行公钥交换") try: public_key_client = sock.recv(1024) except Exception as e: print("接受错误",e) public_key_bytes = public_key_server.public_bytes( #将客户端的公钥序列化为 PEM 格式的字节串 encoding=serialization.Encoding.PEM, format=serialization.PublicFormat.SubjectPublicKeyInfo ) sock.send(public_key_bytes) return serialization.load_pem_public_key(public_key_client) #将接受到的服务端公钥反序列化为公钥对象 def decrypt_data(shared_key,data): #data要为byte型 key = shared_key[:16] aes = AES.new(key,AES.MODE_ECB) if isinstance(data,str): data = data.encode() decrypted_data = aes.decrypt(data) data = (unpad(decrypted_data, 16)) return data def unpack_data(sock,shared_key,orign_data,log_file,addr): #拆解数据包 sliced_sum = int.from_bytes(orign_data[:2],byteorder = 'big') sliced_num = int.from_bytes(orign_data[2:4],byteorder='big') data_length = int.from_bytes(orign_data[4:8], byteorder='big') data_type = int.from_bytes(orign_data[8:12], byteorder='big') data_md5 = (orign_data[-32:]).decode() data = decrypt_data(shared_key,orign_data[12:(data_length-32)]) value = verify_md5(data,data_md5) if value == False: fail_number = sliced_num else: fail_number = None return sliced_sum,sliced_num,data_type,data,fail_number def Authentication(sock,shared_key,query_thread1,addr,log_file): #身份验证部分 账号密码存储在users.db数据库中 key = shared_key[:16] aes = AES.new(key,AES.MODE_ECB) username = (sock.recv(1024)) if len(username) == 0: text = f"客户机{addr}中断连接" write_log(log_file,text) log_file.close() sock.close() print(f"{threading.current_thread().name}:客户机中断连接") return username = decrypt_data(shared_key,username) username = username.decode() result = query_thread1.query_username(username) if result != None: sock.send(f"找到用户{username} 接下来进行密码验证".encode()) password = (sock.recv(1024)) if len(password) == 0: text = f"客户机{addr}中断连接" write_log(log_file,text) log_file.close() sock.close() print(f"{threading.current_thread().name}:客户机中断连接") return password = decrypt_data(shared_key,password) password = password.decode() result = query_thread1.query_password(username,password) if result != None: sock.send("身份验证通过".encode()) if username == "admin": permission_value = 1 else : permission_value = 0 text = f"客户机{addr}使用用户名({username})密码({password})进行身份验证通过" write_log(log_file,text) return permission_value,username else : sock.send("密码错误,终止连接".encode()) text = f"客户机{addr}尝试登录用户({username})输入密码({password}),密码错误登录失败" write_log(log_file,text) sock.close() return 3,username else: sock.send("你输入的用户名不存在 终止本次连接".encode()) text = f"客户机{addr}尝试登录用户({username}),但是用户不存在" write_log(log_file,text) sock.close() return 3,username def menu(sock): text = "开始进行数据传输" sock.send(text.encode()) def transfer_txt(addr,data,log_file): print(f"{threading.current_thread().name}客户机{addr}传输的数据为: {data}") current_time = time.asctime(time.localtime(time.time())) text = f"客户机{addr}传输的数据为: {data} "+current_time write_log(log_file,text) def transfer_file(sock,data,addr,log_file): #处理非切片数据 data_type = mime.from_buffer(data) filename = "output/"+addr try: if data_type == "text/plain": filename += ".txt" with open(filename, 'wb') as f: f.write(data) elif data_type == "text/x-c": filename += ".c" with open(filename,'wb') as f: f.write(data) else : print(f"没有识别到文件类型{data_type},暂时存放在temp中") filename += "-temp" with open(filename,'wb') as f: f.write(data) text = f"客户机{addr}传输了{filename}" write_log(log_file,text) sock.send("\n成功接受到文件".encode()) print(f"{threading.current_thread().name}:成功接受到文件{filename}") except Exception as e: text = f"保存文件时发生错误 错误信息:{e}" print(f"{threading.current_thread().name}:"+text) text = f"客户机{addr}保存文件时发生错误 错误信息{e}" write_log(log_file,text) sock.send(text.encode()) def transfer_file2(sock,first_data,sliced_sum,sliced_num,shared_key,addr,log_file): #处理切片数据 data_bytes = first_data orign_data = [] fail_number = [] for i in range(sliced_sum-1): text = sock.recv(1084) orign_data.append(text) if len(text) == 0: text = f"客户机{addr}中断连接" write_log(log_file,text) log_file.close() print(f"{threading.current_thread().name}:客户机中断连接") sock.close() return print(f"{threading.current_thread().name}:开始进行数据处理") for i in range(sliced_sum-1): sliced_sum,sliced_num,data_type,data,fail = unpack_data(sock,shared_key,orign_data[i],log_file,addr) if fail != None: fail_number.append(fail) data_bytes += data if len(fail_number)>0: fail_meg = ",".join(fail_number) print(f"{threading.current_thread().name}:传输中出现校验错误的切片编号为:{fail_meg}") text = f"客户机{addr}传输中出现了校验错误 切片编号为{fail_meg}" write_log(log_file,text) else: print(f"{threading.current_thread().name}:本次传输没有出现校验错误") data_type = mime.from_buffer(data_bytes) filename = "output/"+addr try: if data_type == "image/png": filename += ".png" with open(filename, 'wb') as f: f.write(data_bytes) elif data_type == "audio/mpeg": filename += ".mp3" with open(filename, 'wb') as f: f.write(data_bytes) elif data_type == "text/x-c": filename += ".c" with open(filename,'wb') as f: f.write(data_bytes) else: print(f"{threading.current_thread().name}:没有识别到文件类型{data_type},暂时存放在temp中") filename += "-temp" with open(filename,'wb') as f: f.write(data_bytes) text = f"客户机{addr}传输了{filename}" write_log(log_file,text) sock.send("\n成功接受到文件".encode()) print(f"{threading.current_thread().name}:成功接受到文件{filename}") except Exception as e: text = f"保存文件时发生错误 错误信息:{e}" print(f"{threading.current_thread().name}:"+text) text = f"客户机保存文件时发生错误 错误信息{e}" write_log(log_file,text) sock.send(text.encode()) def register(permission_value,sock,data,query_thread1,log_file,addr,login_username): if permission_value == 0: #管理员权限的检验较为简单 sock.send("你所登录的用户并没有拥有管理员权限 无法注册用户".encode()) text = f"客户机{addr}尝试使用非管理员权限用户({login_username})注册新用户" write_log(log_file,text) return else : parts = data.split(b';') username = parts[0] password = parts[1] query_thread1.adduser(sock,username,password) print(f"{threading.current_thread().name}:用户{username}注册成功") text = f"客户机{addr}使用管理员权限用户({login_username})注册新用户({username})成功 对应用户密码为({password})" def verify_md5(data, original_md5): current_md5 = md5_encrypt(data) return current_md5 == original_md5 def transfer_function(sock,addr,public_key_server,private_key_server,query_thread1): addr = format(addr[0]) log_name = "logfile/"+addr+"-log" try: log_file = open(log_name,'ab+') except Exception as e: print(f"{threading.current_thread().name}创建日志文件失败,错误信息:{e}") text = f"客户机{addr}连接成功" write_log(log_file,text) print(f"{threading.current_thread().name}:客户机{addr}连接成功") public_key_client = key_switch(sock,public_key_server) print(f"{threading.current_thread().name}:客户机{addr}公钥交换成功") print(f"{threading.current_thread().name}:开始计算共享密钥") shared_key = private_key_server.exchange(public_key_client) print(f"{threading.current_thread().name}:开始进行身份验证") permission_value,username = Authentication(sock,shared_key,query_thread1,addr,log_file) if permission_value == 3: text = f"客户机{addr}中断连接" write_log(log_file,text) log_file.close() sock.close() print(f"{threading.current_thread().name}:客户机中断连接") return print(f"{threading.current_thread().name}:身份验证通过 开始处理客户机{addr}的数据传输") while 1: menu(sock) fail_number = [] orign_data = (sock.recv(1084)) if len(orign_data) == 0: text = f"客户机{addr}中断连接" write_log(log_file,text) log_file.close() print(f"{threading.current_thread().name}:客户机中断连接") break sliced_sum,sliced_num,data_type,data,fail = unpack_data(sock,shared_key,orign_data,log_file,addr) if fail != None: fail_number.append(fail) if len(fail_number)>0: fail_meg = ",".join(fail_number) print(f"{threading.current_thread().name}:传输中出现校验错误的切片编号为:{fail_meg}") text = f"客户机{addr}传输中出现了校验错误 切片编号为{fail_meg}" write_log(log_file,text) else: print(f"{threading.current_thread().name}:本次传输没有出现校验错误") if data_type == 1: text = f"客户机{addr}执行了传输文本数据操作" write_log(log_file,text) transfer_txt(addr,data,log_file) elif data_type == 2: if sliced_sum != 0 or sliced_num != 0: text = f"客户机{addr}执行了传输文件操作,并且传输的文件过大需要切片传输,切片总数为{sliced_sum}" write_log(log_file,text) transfer_file2(sock,data,sliced_sum,sliced_num,shared_key,addr,log_file) else: text = f"客户机{addr}执行了传输文件操作" write_log(log_file,text) transfer_file(sock,data,addr,log_file) elif data_type ==3: text = f"客户机{addr}执行了注册账号操作" write_log(log_file,text) register(permission_value,sock,data,query_thread1,log_file,addr,username) def main(): # 固定的 DH 参数 p = 167369435262828772525424092470263277532300828423668354542113388987940885403967569624276882159444702063514952468172684099658558089467275283830276841486674376457626767287709752368374119769545320657500942952067517443133715688577345716588234520369470508343544601072272539124731608794518466242096737284389743744833 g = 2 pool = ThreadPoolExecutor(max_workers=5) #定义一个线程池 拥有5个空闲线程 query_thread1 = query_thread() #定义一个线程对象 后面用于数据库查询 因为数据库查询只能放在一个线程里使用 parameters = dh.DHParameterNumbers(p, g).parameters(backend=default_backend()) private_key_server = parameters.generate_private_key() public_key_server = private_key_server.public_key() s = socket.socket(socket.AF_INET, socket.SOCK_STREAM) #创建套接字 默认使用TCP协议 s.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) #允许在端口上处于 TIME_WAIT 状态时再次绑定 s.bind(("10.0.137.2",6666)) s.listen(5) #最多连接5个客户端 while 1: print(f"{threading.current_thread().name}:等待客户机连接") sock,addr = s.accept() client_thread = pool.submit(transfer_function,sock,addr,public_key_server,private_key_server,query_thread1) if __name__ == "__main__": main()
客户端
import socket import struct import hashlib import time import re from cryptography.hazmat.primitives import serialization from cryptography.hazmat.primitives.asymmetric import dh from cryptography.hazmat.backends import default_backend from Crypto.Util.Padding import pad, unpad from Crypto.Cipher import AES choice_text = """ 1.传输文本数据 2.传输文件数据 3.新增用户 4.断开连接 输入选项: """ def md5_encrypt(data): #md5加密 用于完整性检测 md5_hash = hashlib.md5() if isinstance(data,str): data = data.encode() md5_hash.update(data) hash_result = md5_hash.hexdigest() return hash_result def pack_data(choice,data,shared_key,sliced,sum): #将数据打包成报文 格式: sliced_judge(4bytes)+data_length(4bytes)+data_type(4bytes)+encrypted_data(Dynamic)+md5_data(32bytes) sliced_sum = struct.pack('>H',sum) sliced_num = struct.pack('>H',sliced) choice_type = struct.pack('>I',choice) en_data = encrypt_data(shared_key,data) hash_data = md5_encrypt(data).encode() temp = choice_type+en_data+hash_data transfer_data = sliced_sum+sliced_num+struct.pack('>I',len(temp)+8)+temp return transfer_data def key_switch(s,public_key_client): #交换公钥 print("等待连接服务端") s.connect(("69.165.67.133",6666)) print("连接上服务端 开始进行公钥交换") public_key_bytes = public_key_client.public_bytes( encoding=serialization.Encoding.PEM, format=serialization.PublicFormat.SubjectPublicKeyInfo ) s.send(public_key_bytes) public_key_server = s.recv(1024) return serialization.load_pem_public_key(public_key_server) def encrypt_data(shared_key,data): #aes加密 key = shared_key[:16] aes = AES.new(key,AES.MODE_ECB) if isinstance(data,str): data = data.encode() data_bytes = pad(data, AES.block_size) encrypt_data = aes.encrypt(data_bytes) return encrypt_data def Authentication(s,shared_key): print("开始进行身份验证") username = input("输入用户名:") encrypt_username = encrypt_data(shared_key,username) #用户名经过aes加密后传输 s.send(encrypt_username) text = s.recv(1024).decode() if text == "你输入的用户名不存在 终止本次连接": print(text) exit() else: print(text) password = input("输入密码:") encrypt_password = encrypt_data(shared_key,password) #密码经过aes加密后传输 s.send(encrypt_password) text = s.recv(1024).decode() if text == "密码错误,终止连接": print(text) exit() else : print(text) def register(s,shared_key): username = input("输入用户名 :") #账号密码用;来间隔后 采用base64编码后丢到pack_data函数中处理 password = input("输入密码 :") data = username+";"+password transfer_data = pack_data(3,data,shared_key,0,0) s.send(transfer_data) def transfer_txt(s,shared_key): text = input("输入数据 :") #这个函数负责处理直接传输的文本数据 但是仍然对于是否需要切片进行了判断 if len(text) > 1024: sliced = 0 chunk_size = 1024 sliced_data = slice_string(text, chunk_size) sliced_sum = len(sliced_data) for i in range(len(sliced_data)): transfer_data = pack_data(1,sliced_data[i],shared_key,sliced,sliced_sum) sliced += 1 s.send(transfer_data) else : sliced = 0 sliced_sum = 0 transfer_data = pack_data(1,text,shared_key,sliced,sliced_sum) s.send(transfer_data) print((s.recv(1024)).decode()) def slice_string(data, chunk_size): #根据指定大小切片字符串 return [data[i:i + chunk_size] for i in range(0, len(data), chunk_size)] def transfer_file(s,shared_key): file_addr = input("请输入文件对于client.py的相对路径 :") #读取客户端本地文件的二进制流 with open(file_addr,'rb') as file: text = file.read() transfer_data = [] if len(text) > 1024: sliced = 0 chunk_size = 1024 sliced_data = slice_string(text, chunk_size) sliced_sum = len(sliced_data) print(f"一共有{sliced_sum}组数据需要传输\n") begin_time = time.time() for i in range(len(sliced_data)): sliced += 1 transfer_data.append(pack_data(2,sliced_data[i],shared_key,sliced,sliced_sum)) for i in range(sliced_sum): s.sendall(transfer_data[i]) time.sleep(0.06) currect_time = time.time() temp = currect_time-begin_time print("\r"+f"正在传输第{i}组 当前运行了{temp:.2f}s",end="") print("\n",end="") else : sliced = 0 sliced_sum = 0 transfer_data = pack_data(2,text,shared_key,sliced,sliced_sum) s.send(transfer_data) print((s.recv(1024)).decode()+"\n") def main(): # 固定的 DH 参数 p = 167369435262828772525424092470263277532300828423668354542113388987940885403967569624276882159444702063514952468172684099658558089467275283830276841486674376457626767287709752368374119769545320657500942952067517443133715688577345716588234520369470508343544601072272539124731608794518466242096737284389743744833 g = 2 parameters = dh.DHParameterNumbers(p, g).parameters(backend=default_backend()) private_key_client = parameters.generate_private_key() public_key_client = private_key_client.public_key() s = socket.socket(socket.AF_INET, socket.SOCK_STREAM) public_key_server = key_switch(s,public_key_client) print("公钥交换成功") print("开始计算共享密钥") shared_key = private_key_client.exchange(public_key_server) Authentication(s,shared_key) print("成功连接") while 1: # print((s.recv(1024)).decode()) choice = input(choice_text) if choice == "1": transfer_txt(s,shared_key) elif choice == "2": transfer_file(s,shared_key) elif choice == "3": register(s,shared_key) elif choice == "4": print("程序终止") s.close() exit() else : print("未找到输入的选项,请重新输入") continue if __name__ == "__main__": main()
v1.0 最终版本! 最大的更新就是给客户端增加了图形化界面 看起来更高级一点 而且为了服务图形化界面的逻辑 所以一些客户机和服务机的交互也更改了
tkinter部分控件学习 第一次尝试编写图形化界面 这里就选用tkinter库
from tkinter import * root = Tk() root.mainloop()
这样短短三行 即可生成一个根窗口 子窗口关闭后根窗口不会关闭 根窗口关闭后子窗口会关闭 可以对窗口进行一些简单的操作
from tkinter import * root = Tk() root.title("安全协议期末大作业设计") root.geometry("1000x800") #设置窗口大小 root.iconbitmap("test.ico") #设置窗口左上角图标 root.config(bg="green") #设置窗口背景颜色 root.mainloop()
接下来可以使用相关控件来帮助我们构建
lable from tkinter import * root = Tk() root.title("安全协议期末大作业设计") root.geometry("600x600") root.iconbitmap("test.ico") meg = Label(root,text="this is a test") meg.pack(padx=100,side="left") root.mainloop()
lable控件可以在窗口上显示文字或者图像 size可以控制控件在哪里显示 默认是top padx和pady则是代表着和x轴或者y轴的偏移是多少 显示图片的话 最好再加上pillow库来配合 因为tk本身支持的图片格式有点少 只支持png
from tkinter import * from PIL import Image, ImageTk root = Tk() root.title("安全协议期末大作业设计") root.geometry("600x600") root.iconbitmap("test.ico") temp = Image.open("1.png") temp.thumbnail((100,100)) #如果图片过大的话 可以利用thumbnail来限制图片大小 image = ImageTk.PhotoImage(temp) meg = Label(root,image=image) meg.pack(side="left",padx=250) root.mainloop()
entry 这个控件可以在窗口上显示一个输入框 配合lable控件就可以实现引导程序使用者输入变量
from tkinter import * from PIL import Image, ImageTk root = Tk() root.title("安全协议期末大作业设计") root.geometry("600x600") root.iconbitmap("test.ico") control1 = Label(root,text="测试输出") control1.pack(side="left") control2 = Entry(root) control2.pack(side="left") root.mainloop()
不过这样需要两个控件的位置相匹配 用pack中的padx,pady,side处理一对还好 要是界面需要设计的点多 就会比较杂乱 这里可以使用grid控件来配合
grid grid插件相当于把整个窗口界面转化成了一个二维坐标系 row参数用来规定行数 column用来规定列数 于是上面用pack需要计算padx和pady来实现的效果 可以简单的用这两个参数来实现
from tkinter import * from PIL import Image, ImageTk root = Tk() root.title("安全协议期末大作业设计") root.geometry("600x600") root.iconbitmap("test.ico") control1 = Label(root,text="测试输出") control1.grid(row=0) control2 = Entry(root) control2.grid(row=0,column=1) root.mainloop()
上述控件和button控件组合起来 就可以实现一个简单的账号密码登录界面
from tkinter import * from PIL import Image, ImageTk root = Tk() var=IntVar() root.title("安全协议期末大作业设计") # root.geometry("200x200") # root.iconbitmap("test.ico") def check(): username = entry1.get() password = entry2.get() if username == "admin" and password == "admin": print("账号密码正确") Label(root,text="用户名",padx=10,pady=10).grid(row=0,sticky=W) Label(root,text="密码").grid(row=1,sticky=E,padx=10,pady=10) entry1 = Entry(root) entry1.grid(row=0,column=1) entry2 = Entry(root) entry2.grid(row=1,column=1) button1 = Button(root,text="登录",command=check) button1.grid(row=0,column=2,rowspan=2,columnspan=2,padx=10) root.mainloop()
frame 大部分时候 主窗口的大小都需要提前设置 而非由控件大小来自动规划窗口大小 这个时候控件相对于主窗口就会较小 造成界面的不美观 下面这个示例 frame1容器收纳了三个控件 这三个控件的grid控制的是在frame1容器中的位置 而frame1的grid控制的则是在主窗口中的位置 这样可以更加方便的设置不同控件的相对位置 不过最好使用grid_propagate方法来固定容器的位置 不然随着后续容器中的控件位置发生变化 容器的位置也会发生变化
from tkinter import * import time from PIL import Image, ImageTk root = Tk() root.title("安全协议期末大作业设计") root.geometry("300x100") # root.iconbitmap("test.ico") def main(): frame1 = Frame(root) Label(frame1,text="ip地址").grid(row=0,column=0) Entry(frame1).grid(row=0,column=1) Button(frame1,text="连接").grid(row=0,column=2,padx=10) frame1.grid(padx=50,pady=30) root.mainloop() if __name__ == "__main__": main()
text 这一个控件的作用和entry相似 只不过entry只能支持一行的输入
from tkinter import * import time from PIL import Image, ImageTk root = Tk() root.title("安全协议期末大作业设计") root.geometry("300x200") # root.iconbitmap("test.ico") def main(): text = Text(root,width=40,height=10) text.grid(padx=7) text.insert('1.0',"test") root.mainloop() if __name__ == "__main__": main()
上述代码中insert负责给文本框写入数据 这里的1.0代表的是第一行第0个字符开始写入 你也可以使用1.end来表示第一行的最后一个字符
这个控件主要是配合text使用 在最右边加一个滚动条 看起来美观一点 也方便用户更直观的知道当前文本框中的所处位置
from tkinter import * import time from PIL import Image, ImageTk root = Tk() root.title("安全协议期末大作业设计") root.geometry("300x200") # root.iconbitmap("test.ico") def main(): s1 = Scrollbar(root,width=20) s1.grid(row=0,column=1,sticky=NS) text = Text(root,width=35,height=10,yscrollcommand=s1.set,wrap=CHAR) text.grid(row=0,column=0,padx=7) text.insert('1.0',"test") root.mainloop() if __name__ == "__main__": main()
yscrollcommand用来绑定滚动条和text wrap参数用来规定是否自动换行 参数有WORD CHAR NONE三种 第一种是单词换行 第二种是字符换行 第三种是不自动换行
canvas 这个控件主要用于显示文本 图片等 主要是可以绑定scrollbar
from tkinter import * import time from PIL import Image, ImageTk root = Tk() root.title("安全协议期末大作业设计") root.geometry("300x200") # root.iconbitmap("test.ico") def main(): c1 = Canvas(root,width=280) c1.grid(row=0,column=0,sticky=W) s1 = Scrollbar(root,width=20,command=c1.yview) s1.grid(row=0,column=1,sticky=NS) c1.config(yscrollcommand=s1.set) frame1 = Frame(c1) c1.create_window((0, 0), window=frame1, anchor='nw') for i in range(20): Label(frame1, text=f"Label {i+1}").pack() frame1.update_idletasks() c1.config(scrollregion=c1.bbox("all")) root.mainloop() if __name__ == "__main__": main()
创建出一个canvas控件后 还需要创建一个内部窗口 用来显示文本或者图片 scrollbar绑定的是canvas 但是输出的文本还是依靠label
tkinter实际利用 基于tk来实现图形化界面 现在主要是卡在滚动条的实现 本来预期是打算设计成 上面部分用来显示操作界面 下面用来输出信息 然后下面的部分本来想着是加一个滚动条来查看输出信息过多的情况下以往的输出信息 但是tk的滚动条实现起来实在有点太麻烦了 而且tk的美观程度也很差 就是因为他的控件布局太难用了 所以写到这里就烂尾了 看看后面还有没有兴趣搞下去吧 暂时是打算转用pyside6了
import socket import struct import hashlib import time from tkinter import * from PIL import Image, ImageTk from cryptography.hazmat.primitives import serialization from cryptography.hazmat.primitives.asymmetric import dh from cryptography.hazmat.backends import default_backend from Crypto.Util.Padding import pad, unpad from tkinter.ttk import Separator from Crypto.Cipher import AES def add_message_to_frame2(frame2, text, row=None, column=0, sticky=W): """统一处理向frame2添加消息的函数""" if not hasattr(frame2, 'current_row'): frame2.current_row = 0 if not hasattr(frame2, 'max_rows'): frame2.max_rows = 5 if row is None: row = frame2.current_row # 如果超过最大行数,清除所有控件并重置行计数 if frame2.current_row >= frame2.max_rows: for widget in frame2.winfo_children(): widget.destroy() frame2.current_row = 0 Label(frame2, text=text, bg="white").grid(row=frame2.current_row, column=column, sticky=sticky) frame2.current_row += 1 def md5_encrypt(data): #md5加密 用于完整性检测 md5_hash = hashlib.md5() if isinstance(data,str): data = data.encode() md5_hash.update(data) hash_result = md5_hash.hexdigest() return hash_result def pack_data(choice,data,shared_key,sliced,sum): #将数据打包成报文 格式: sliced_judge(4bytes)+data_length(4bytes)+data_type(4bytes)+encrypted_data(Dynamic)+md5_data(32bytes) sliced_sum = struct.pack('>H',sum) sliced_num = struct.pack('>H',sliced) choice_type = struct.pack('>I',choice) en_data = encrypt_data(shared_key,data) hash_data = md5_encrypt(data).encode() temp = choice_type+en_data+hash_data transfer_data = sliced_sum+sliced_num+struct.pack('>I',len(temp)+8)+temp return transfer_data def key_switch(private_key_client,frame1,frame2,ipaddr,s,public_key_client): #交换公钥 ip = ipaddr.get() s.connect((ip,6666)) add_message_to_frame2(frame2, "连接上服务端 开始进行公钥交换") for widget in frame1.winfo_children(): widget.destroy() public_key_bytes = public_key_client.public_bytes( encoding=serialization.Encoding.PEM, format=serialization.PublicFormat.SubjectPublicKeyInfo ) s.send(public_key_bytes) public_key_server = s.recv(1024) public_key_server = serialization.load_pem_public_key(public_key_server) add_message_to_frame2(frame2, "公钥交换成功,开始计算共享密钥") shared_key = private_key_client.exchange(public_key_server) Authentication(s,shared_key,frame1,frame2) def encrypt_data(shared_key,data): #aes加密 key = shared_key[:16] aes = AES.new(key,AES.MODE_ECB) if isinstance(data,str): data = data.encode() data_bytes = pad(data, AES.block_size) encrypt_data = aes.encrypt(data_bytes) return encrypt_data def finish_connect(s): s.close() exit() def menu(s,shared_key,frame1,frame2): for widget in frame1.winfo_children(): widget.destroy() button1 = Button(frame1,text="传输文本",command=lambda:transfer_txt(s,shared_key,frame1,frame2)).grid(row=0,column=0,padx=60,pady=20) button2 = Button(frame1,text="传输文件",command=lambda:transfer_file(s,shared_key)).grid(row=0,column=1) button3 = Button(frame1,text="注册用户",command=lambda:register(s,shared_key)).grid(row=1,column=0) button4 = Button(frame1,text="退出程序",command=lambda:finish_connect(s)).grid(row=1,column=1) def login(entry1,entry2,s,shared_key,frame2,frame1): username = entry1.get() password = entry2.get() data = username+";"+password transfer_data = encrypt_data(shared_key,data) #使用aes加密账号密码 s.send(transfer_data) text = (s.recv(1024)).decode() if text == "你输入的用户名不存在 终止本次连接": add_message_to_frame2(frame2, text) s.close() elif text == "密码错误,终止连接": add_message_to_frame2(frame2, text) s.close() else: add_message_to_frame2(frame2, text) menu(s,shared_key,frame1,frame2) def focus_next(entry): entry.focus() def Authentication(s,shared_key,frame1,frame2): add_message_to_frame2(frame2, "开始进行身份验证") Label(frame1,text="用户名").grid(row=0,pady=30) Label(frame1,text="密码").grid(row=1) entry1 = Entry(frame1) entry1.grid(row=0,column=1,padx=10) entry2 = Entry(frame1) entry1.bind("<Return>",lambda event:focus_next(entry2)) entry2.grid(row=1,column=1,padx=10) entry2.bind("<Return>",lambda event:login(entry1,entry2,s,shared_key,frame2,frame1)) Button(frame1,text="登录",height=3,width=7,command=lambda : login(entry1,entry2,s,shared_key,frame2,frame1)).grid(row=0,column=2,rowspan=2,pady=(30,0)) def register(s,shared_key): username = input("输入用户名 :") #账号密码用;来间隔后 采用base64编码后丢到pack_data函数中处理 password = input("输入密码 :") data = username+";"+password transfer_data = pack_data(3,data,shared_key,0,0) s.send(transfer_data) def transfer_txt1(entry1,s,shared_key,frame2): text = entry1.get('1.0',END) if len(text) > 1024: sliced = 0 chunk_size = 1024 sliced_data = slice_string(text, chunk_size) sliced_sum = len(sliced_data) for i in range(len(sliced_data)): transfer_data = pack_data(1,sliced_data[i],shared_key,sliced,sliced_sum) sliced += 1 try: s.send(transfer_data) add_message_to_frame2(frame2, "发送成功") except Exception as e: add_message_to_frame2(frame2, f"发送失败,错误信息:{e}") else : sliced = 0 sliced_sum = 0 transfer_data = pack_data(1,text,shared_key,sliced,sliced_sum) try: s.send(transfer_data) add_message_to_frame2(frame2, "发送成功") except Exception as e: add_message_to_frame2(frame2, f"发送失败,错误信息:{e}") print((s.recv(1024)).decode()) def transfer_txt(s,shared_key,frame1,frame2): #这个函数负责处理直接传输的文本数据 但是仍然对于是否需要切片进行了判断 for widget in frame1.winfo_children(): widget.destroy() s1 = Scrollbar(frame1,width=20) Label(frame1,text="传输数据:").grid(row=0,column=0,sticky=W,padx=5) s1.grid(row=1,column=1,sticky=NS) text = Text(frame1,width=38,height=6,yscrollcommand=s1.set,wrap=CHAR) text.grid(row=1,column=0,padx=5) Button(frame1,command=lambda:transfer_txt1(text,s,shared_key,frame2),text="发送").grid(row=2,column=0,pady=5,sticky=E) def slice_string(data, chunk_size): #根据指定大小切片字符串 return [data[i:i + chunk_size] for i in range(0, len(data), chunk_size)] def transfer_file(s,shared_key): file_addr = input("请输入文件对于client.py的相对路径 :") #读取客户端本地文件的二进制流 with open(file_addr,'rb') as file: text = file.read() transfer_data = [] if len(text) > 1024: sliced = 0 chunk_size = 1024 sliced_data = slice_string(text, chunk_size) sliced_sum = len(sliced_data) print(f"一共有{sliced_sum}组数据需要传输\n") begin_time = time.time() for i in range(len(sliced_data)): sliced += 1 transfer_data.append(pack_data(2,sliced_data[i],shared_key,sliced,sliced_sum)) for i in range(sliced_sum): s.sendall(transfer_data[i]) time.sleep(0.06) currect_time = time.time() temp = currect_time-begin_time print("\r"+f"正在传输第{i+1}组 当前运行了{temp:.2f}s",end="") print("\n",end="") else : sliced = 0 sliced_sum = 0 transfer_data = pack_data(2,text,shared_key,sliced,sliced_sum) s.send(transfer_data) print((s.recv(1024)).decode()+"\n") def main(): # 固定的 DH 参数 p = 167369435262828772525424092470263277532300828423668354542113388987940885403967569624276882159444702063514952468172684099658558089467275283830276841486674376457626767287709752368374119769545320657500942952067517443133715688577345716588234520369470508343544601072272539124731608794518466242096737284389743744833 g = 2 parameters = dh.DHParameterNumbers(p, g).parameters(backend=default_backend()) private_key_client = parameters.generate_private_key() public_key_client = private_key_client.public_key() s = socket.socket(socket.AF_INET, socket.SOCK_STREAM) root = Tk() root.title("协议大作业-第六组") root.geometry("300x300") frame1 = Frame(root,width=300,height=150) frame2 = Frame(root,width=300,height=150,bg="white") frame2.current_row = 0 frame2.max_rows = 6 # 设置最大显示行数 Label(frame1,text="ip地址:").grid(row=0,padx=10,pady=50) #输入ip地址 ipaddr = Entry(frame1) ipaddr.bind("<Return>",lambda event:key_switch(private_key_client,frame1,frame2,ipaddr,s,public_key_client)) ipaddr.grid(row=0,column=1) Button(frame1,text="连接",command=lambda: key_switch(private_key_client,frame1,frame2,ipaddr,s,public_key_client),width=10).grid(row=0,column=2,padx=10) frame1.grid(row=0) frame1.grid_propagate(0) frame2.grid(row=2) frame2.grid_propagate(0) separator = Frame(root, height=2, bd=1, relief=SUNKEN) #分割线 separator.grid(row=1, column=0, sticky=EW) root.mainloop() if __name__ == "__main__": main()
pyside6学习 简单窗口显示 from PySide6.QtWidgets import * def main(): app = QApplication() window = QWidget() window.setWindowTitle("安全协议大作业") window.setFixedSize(400,300) label = QLabel("测试输出",window) label.move(175,125) window.show() app.exec() if __name__ == "__main__": main()
上述程序可以创建出一个窗口后简单的输出文字 总体的逻辑是和tk差不多的 需要创建一个窗口对象 然后给窗口增加控件
qt designer 我觉得pyside6相较于tk最显著的优势就是 可以使用编辑工具来简化ui的设计 如图所示 设计一个简单的ui 保存ui文件 在程序中载入ui 然后加载到一个窗口上
import sys from PySide6.QtWidgets import * from PySide6.QtCore import QFile, QIODevice from PySide6.QtUiTools import QUiLoader from PySide6.QtWidgets import QApplication def main(): ui_file = QFile("test.ui") loader = QUiLoader() app = QApplication() window = loader.load(ui_file) window.setWindowTitle("安全协议大作业") # window.setFixedSize(400,300) ui_file.close() window.show() app.exec() if __name__ == "__main__": main()
可以看到非常简单的就实现了这几个控件 比tk挪半天位置好用多了 tk是人用的吗???
函数绑定 解决了ui设计的难题后 接下来的重点就是放在函数的绑定上 其实也很简单 就是要注意自己在qt designer中定义的控件名称 然后需要把ui文件转化成py文件
pyside6-uic filename.ui -o filename.py
随后在程序中直接导入即可 然后函数绑定的大体上是和tk一致的 具体到各个控件的方法有些不太一样 就有需求的自己查询 这里放一个我自己弄的账号密码登录程序
import sys from PySide6.QtWidgets import * from ui import Ui_Form class mywindow(QMainWindow,Ui_Form): def __init__(self): QMainWindow.__init__(self) self.setupUi(self) hide_pwd = self.checkBox.stateChanged.connect(lambda:self.hide_password()) login_button = self.b1.clicked.connect(lambda:self.login()) exit_button = self.pushButton_2.clicked.connect(lambda:self.exit_process()) def exit_process(self): exit() def hide_password(self): state = self.lineEdit_2.echoMode() if str(state) == "EchoMode.Password": self.lineEdit_2.setEchoMode(QLineEdit.EchoMode.Normal) else : self.lineEdit_2.setEchoMode(QLineEdit.EchoMode.Password) def login(self): username = self.lineEdit.text() password = self.lineEdit_2.text() if username == "admin" and password == "admin": msg = "登录成功" QMessageBox.information(self,'输出信息',msg) elif len(username)==0: msg = "必须输入账号" QMessageBox.information(self,'输出信息',msg) elif len(password) ==0: msg = "必须输入密码" QMessageBox.information(self,'输出信息',msg) elif username != "admin": msg = "账号不存在" QMessageBox.information(self,'输出信息',msg) elif password != "admin": msg = "密码错误" QMessageBox.information(self,'输出信息',msg) def main(): app = QApplication() w1 = mywindow() w1.show() app.exec() if __name__ == "__main__": main()
写这么久博客第一次放gif图 看起来画质一般般 而且也捕获不到弹出的第二个窗口 哎下次改进吧
最终代码 v1.0版本最后采用了pyside6作为图形化界面框架 把客户端用图形化界面包了起来 同时为了配合客户端的优化 服务端也作了一些修改 服务端
import socket import hashlib import magic import time from cryptography.hazmat.primitives import serialization from cryptography.hazmat.primitives.asymmetric import dh from cryptography.hazmat.backends import default_backend from Crypto.Util.Padding import pad, unpad from Crypto.Cipher import AES import sqlite3 from concurrent.futures import ThreadPoolExecutor import threading mime = magic.Magic(mime=True) def menu(sock): text = "开始传输数据" sock.send(text.encode()) def write_log(log_file,text): current_time = time.asctime(time.localtime(time.time())) text += " "+current_time log_file.write(text.encode()+b"\n") class query_thread(threading.Thread): def __init__(self,name = None): threading.Thread.__init__(self,name=name) def query_username(self,username): conn = sqlite3.connect('users.db') cursor = conn.cursor() cursor.execute("SELECT username FROM users WHERE username = ?", (username,)) result = cursor.fetchone() cursor.close() return result def query_password(self,username,password): conn = sqlite3.connect('users.db') cursor = conn.cursor() cursor.execute("SELECT password FROM users WHERE username = ? AND password = ?", (username, password)) result = cursor.fetchone() cursor.close() return result def add_user(self,sock,username,password,addr,login_username,log_file): conn = sqlite3.connect('users.db') cursor = conn.cursor() try: # 插入新的用户数据 cursor.execute("INSERT INTO users (username, password) VALUES (?, ?)", (str(username), str(password))) conn.commit() except sqlite3.IntegrityError: sock.send("用户已存在".encode()) print(f"{threading.current_thread().name}:用户{username}已存在,注册失败") text = f"客户机{addr}使用管理员权限用户({login_username})注册新用户({username})失败 因为用户已存在" write_log(log_file,text) except Exception as e: sock.send(e.encode()) sock.send("用户注册成功".encode()) print(f"{threading.current_thread().name}:用户{username}注册成功") text = f"客户机{addr}使用管理员权限用户({login_username})注册新用户({username})成功 对应用户密码为({password})" write_log(log_file,text) def md5_encrypt(data): if isinstance(data, str): data = data.encode() # md5加密需要bytes参数 md5_hash = hashlib.md5() #md5加密 md5_hash.update(data) hash_result = md5_hash.hexdigest() return hash_result def key_switch(sock,public_key_server): print(f"{threading.current_thread().name}开始进行公钥交换") try: public_key_client = sock.recv(1024) except Exception as e: print("接受错误",e) public_key_bytes = public_key_server.public_bytes( #将客户端的公钥序列化为 PEM 格式的字节串 encoding=serialization.Encoding.PEM, format=serialization.PublicFormat.SubjectPublicKeyInfo ) sock.send(public_key_bytes) return serialization.load_pem_public_key(public_key_client) #将接受到的服务端公钥反序列化为公钥对象 def decrypt_data(shared_key,data): #data要为byte型 key = shared_key[:16] aes = AES.new(key,AES.MODE_ECB) if isinstance(data,str): data = data.encode() decrypted_data = aes.decrypt(data) data = (unpad(decrypted_data, 16)) return data def unpack_data(sock,shared_key,orign_data,log_file,addr): #拆解数据包 sliced_sum = int.from_bytes(orign_data[:2],byteorder = 'big') sliced_num = int.from_bytes(orign_data[2:4],byteorder='big') data_length = int.from_bytes(orign_data[4:8], byteorder='big') data_type = int.from_bytes(orign_data[8:12], byteorder='big') data_md5 = (orign_data[-32:]).decode() data = decrypt_data(shared_key,orign_data[12:(data_length-32)]) value = verify_md5(data,data_md5) if value == False: fail_number = sliced_num else: fail_number = None return sliced_sum,sliced_num,data_type,data,fail_number def Authentication(sock,shared_key,query_thread1,addr,log_file): #身份验证部分 账号密码存储在users.db数据库中 key = shared_key[:16] aes = AES.new(key,AES.MODE_ECB) en_data = (sock.recv(1024)) if len(en_data) == 0: text = f"客户机{addr}中断连接" write_log(log_file,text) log_file.close() sock.close() print(f"{threading.current_thread().name}:客户机中断连接") return data = decrypt_data(shared_key,en_data) parts = data.split(b';') username = parts[0].decode() password = parts[1].decode() result = query_thread1.query_username(username) if result != None: result = query_thread1.query_password(username,password) if result != None: sock.send("身份验证通过".encode()) if username == "admin": permission_value = 1 else : permission_value = 0 text = f"客户机{addr}使用用户名({username})密码({password})进行身份验证通过" write_log(log_file,text) return permission_value,username else : sock.send("密码错误,终止连接".encode()) text = f"客户机{addr}尝试登录用户({username})输入密码({password}),密码错误登录失败" write_log(log_file,text) sock.close() return 3,username else: sock.send("你输入的用户名不存在 终止本次连接".encode()) text = f"客户机{addr}尝试登录用户({username}),但是用户不存在" write_log(log_file,text) sock.close() return 3,username def transfer_txt(addr,data,log_file): print(f"{threading.current_thread().name}客户机{addr}传输的数据为: {data}") current_time = time.asctime(time.localtime(time.time())) text = f"客户机{addr}传输的数据为: {data} "+current_time write_log(log_file,text) def transfer_file(sock,data,addr,log_file): #处理非切片数据 data_type = mime.from_buffer(data) filename = "output/"+addr try: if data_type == "text/plain": filename += ".txt" with open(filename, 'wb') as f: f.write(data) elif data_type == "text/x-c": filename += ".c" with open(filename,'wb') as f: f.write(data) else : print(f"没有识别到文件类型{data_type},暂时存放在temp中") filename += "-temp" with open(filename,'wb') as f: f.write(data) text = f"客户机{addr}传输了{filename}" write_log(log_file,text) sock.send("\n成功接受到文件".encode()) print(f"{threading.current_thread().name}:成功接受到文件{filename}") except Exception as e: text = f"保存文件时发生错误 错误信息:{e}" print(f"{threading.current_thread().name}:"+text) text = f"客户机{addr}保存文件时发生错误 错误信息{e}" write_log(log_file,text) sock.send(text.encode()) def transfer_file2(sock,first_data,sliced_sum,sliced_num,shared_key,addr,log_file): #处理切片数据 data_bytes = first_data orign_data = [] fail_number = [] for i in range(sliced_sum-1): text = sock.recv(1084) orign_data.append(text) if len(text) == 0: text = f"客户机{addr}中断连接" write_log(log_file,text) log_file.close() print(f"{threading.current_thread().name}:客户机中断连接") sock.close() return print(f"{threading.current_thread().name}:开始进行数据处理") for i in range(sliced_sum-1): print(f"\r{threading.current_thread().name}:正在处理第{i+1}组数据",end="") sliced_sum,sliced_num,data_type,data,fail = unpack_data(sock,shared_key,orign_data[i],log_file,addr) if fail != None: fail_number.append(fail) data_bytes += data if len(fail_number)>0: fail_meg = ",".join(fail_number) print(f"{threading.current_thread().name}:传输中出现校验错误的切片编号为:{fail_meg}") text = f"客户机{addr}传输中出现了校验错误 切片编号为{fail_meg}" write_log(log_file,text) else: print(f"{threading.current_thread().name}:本次传输没有出现校验错误") data_type = mime.from_buffer(data_bytes) filename = "output/"+addr try: if data_type == "image/png": filename += ".png" with open(filename, 'wb') as f: f.write(data_bytes) elif data_type == "audio/mpeg": filename += ".mp3" with open(filename, 'wb') as f: f.write(data_bytes) elif data_type == "text/x-c": filename += ".c" with open(filename,'wb') as f: f.write(data_bytes) else: print(f"{threading.current_thread().name}:没有识别到文件类型{data_type},暂时存放在temp中") filename += "-temp" with open(filename,'wb') as f: f.write(data_bytes) text = f"客户机{addr}传输了{filename}" write_log(log_file,text) sock.send("\n成功接受到文件".encode()) print(f"{threading.current_thread().name}:成功接受到文件{filename}") except Exception as e: text = f"保存文件时发生错误 错误信息:{e}" print(f"{threading.current_thread().name}:"+text) text = f"客户机保存文件时发生错误 错误信息{e}" write_log(log_file,text) sock.send(text.encode()) def register(permission_value,sock,data,query_thread1,log_file,addr,login_username): if permission_value == 0: #管理员权限的检验较为简单 sock.send("你所登录的用户并没有拥有管理员权限 无法注册用户".encode()) text = f"客户机{addr}尝试使用非管理员权限用户({login_username})注册新用户" write_log(log_file,text) return else : parts = data.split(b';') username = parts[0].decode() password = parts[1].decode() try: query_thread1.add_user(sock,username,password,addr,login_username,log_file) except Exception as e: print(e) return def verify_md5(data, original_md5): current_md5 = md5_encrypt(data) return current_md5 == original_md5 def transfer_function(sock,addr,public_key_server,private_key_server,query_thread1): addr = format(addr[0]) log_name = "logfile/"+addr+"-log" try: log_file = open(log_name,'ab+') except Exception as e: print(f"{threading.current_thread().name}创建日志文件失败,错误信息:{e}") text = f"客户机{addr}连接成功" write_log(log_file,text) print(f"{threading.current_thread().name}:客户机{addr}连接成功") public_key_client = key_switch(sock,public_key_server) print(f"{threading.current_thread().name}:客户机{addr}公钥交换成功") print(f"{threading.current_thread().name}:开始计算共享密钥") shared_key = private_key_server.exchange(public_key_client) print(f"{threading.current_thread().name}:开始进行身份验证") permission_value,username = Authentication(sock,shared_key,query_thread1,addr,log_file) if permission_value == 3: text = f"客户机{addr}中断连接" write_log(log_file,text) log_file.close() sock.close() print(f"{threading.current_thread().name}:客户机中断连接") return print(f"{threading.current_thread().name}:身份验证通过 开始处理客户机{addr}的数据传输") while 1: fail_number = [] orign_data = (sock.recv(1084)) if len(orign_data) == 0: text = f"客户机{addr}中断连接" write_log(log_file,text) log_file.close() print(f"{threading.current_thread().name}:客户机中断连接") break sliced_sum,sliced_num,data_type,data,fail = unpack_data(sock,shared_key,orign_data,log_file,addr) if fail != None: fail_number.append(fail) if len(fail_number)>0: fail_meg = ",".join(fail_number) print(f"{threading.current_thread().name}:传输中出现校验错误的切片编号为:{fail_meg}") text = f"客户机{addr}传输中出现了校验错误 切片编号为{fail_meg}" write_log(log_file,text) else: print(f"{threading.current_thread().name}:本次传输没有出现校验错误") if data_type == 1: text = f"客户机{addr}执行了传输文本数据操作" write_log(log_file,text) transfer_txt(addr,data,log_file) elif data_type == 2: if sliced_sum != 0 or sliced_num != 0: text = f"客户机{addr}执行了传输文件操作,并且传输的文件过大需要切片传输,切片总数为{sliced_sum}" write_log(log_file,text) transfer_file2(sock,data,sliced_sum,sliced_num,shared_key,addr,log_file) else: text = f"客户机{addr}执行了传输文件操作" write_log(log_file,text) transfer_file(sock,data,addr,log_file) elif data_type ==3: text = f"客户机{addr}执行了注册账号操作" write_log(log_file,text) register(permission_value,sock,data,query_thread1,log_file,addr,username) def main(): # 固定的 DH 参数 p = 167369435262828772525424092470263277532300828423668354542113388987940885403967569624276882159444702063514952468172684099658558089467275283830276841486674376457626767287709752368374119769545320657500942952067517443133715688577345716588234520369470508343544601072272539124731608794518466242096737284389743744833 g = 2 pool = ThreadPoolExecutor(max_workers=5) #定义一个线程池 拥有5个空闲线程 query_thread1 = query_thread() #定义一个线程对象 后面用于数据库查询 因为数据库查询只能放在一个线程里使用 parameters = dh.DHParameterNumbers(p, g).parameters(backend=default_backend()) private_key_server = parameters.generate_private_key() public_key_server = private_key_server.public_key() s = socket.socket(socket.AF_INET, socket.SOCK_STREAM) #创建套接字 默认使用TCP协议 s.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) #允许在端口上处于 TIME_WAIT 状态时再次绑定 s.bind(("192.168.220.130",6666)) s.listen(5) #最多连接5个客户端 while 1: print(f"{threading.current_thread().name}:等待客户机连接") sock,addr = s.accept() client_thread = pool.submit(transfer_function,sock,addr,public_key_server,private_key_server,query_thread1) if __name__ == "__main__": main()
客户端
```import socket
import struct
import hashlib
import time
from cryptography.hazmat.primitives import serialization
from cryptography.hazmat.primitives.asymmetric import dh
from cryptography.hazmat.backends import default_backend
from Crypto.Util.Padding import pad, unpad
from PySide6.QtWidgets import *
from Crypto.Cipher import AES
from ip_connect import Ipwindow
from login import loginwindow
from main import mainwindow
from progressbar import barwindow
from register import registerwindow
class ipwindow(QMainWindow,Ipwindow):
def __init__(self):
QMainWindow.__init__(self)
self.setupUi(self)
self.setWindowTitle("安全协议大作业-第六组")
l1 = self.lineEdit.returnPressed.connect(lambda:self.ip_connect())
b1 = self.pushButton.clicked.connect(lambda:self.ip_connect())
def ip_connect(self):
ipaddr = self.lineEdit.text()
key_switch(ipaddr,self)
class loginwindow(QMainWindow,loginwindow):
def __init__(self,s,shared_key):
QMainWindow.__init__(self)
self.setupUi(self)
self.setWindowTitle("安全协议大作业-第六组")
b1 = self.pushButton.clicked.connect(lambda:self.login(s,shared_key))
b2 = self.pushButton_2.clicked.connect(lambda:self.return_ipconnect())
l1 = self.lineEdit.returnPressed.connect(lambda:self.move_focus(self.lineEdit_2))
l2 = self.lineEdit_2.returnPressed.connect(lambda:self.login(s,shared_key))
def move_focus(self,lineEdit):
lineEdit.setFocus()
def return_ipconnect(self):
self.close()
ip_window = ipwindow()
ip_window.show()
def login(self,s,shared_key):
username = self.lineEdit.text()
if len(username) == 0:
QMessageBox.information(None,"报错信息","用户名不能为空",QMessageBox.StandardButton.Ok,QMessageBox.StandardButton.NoButton)
else:
password = self.lineEdit_2.text()
if len(password) == 0:
QMessageBox.information(None,"报错信息","密码不能为空",QMessageBox.StandardButton.Ok,QMessageBox.StandardButton.NoButton)
else:
Authentication(s,shared_key,username,password,self)
class mainwindow(QMainWindow,mainwindow):
def __init__(self,s,shared_key):
QMainWindow.__init__(self)
self.setupUi(self)
self.setWindowTitle("安全协议大作业-第六组")
self.current_number = 1
b1 = self.pushButton.clicked.connect(lambda:self.choose_mode(1,s,shared_key))
b2 = self.pushButton_2.clicked.connect(lambda:self.choose_mode(2,s,shared_key))
b3 = self.pushButton_3.clicked.connect(lambda:self.choose_mode(3,s,shared_key))
b4 = self.pushButton_4.clicked.connect(lambda:self.return_ipconnect())
b5 = self.pushButton_5.clicked.connect(lambda:self.handle_send(s,shared_key))
b6 = self.pushButton_6.clicked.connect(lambda:self.clear_text())
def return_ipconnect(self):
self.close()
ip_window = ipwindow()
ip_window.show()
def clear_text(self):
self.textEdit.clear()
def choose_mode(self,mode,s,shared_key):
self.current_number = mode
if mode == 1:
self.pushButton.setStyleSheet("background-color: grey;")
self.pushButton_2.setStyleSheet(u"background-image: url(:/F:/\u4f5c\u4e1a/python\u7a0b\u5e8f/1.jpg);")
self.pushButton_3.setStyleSheet(u"background-image: url(:/F:/\u4f5c\u4e1a/python\u7a0b\u5e8f/1.jpg);")
elif mode ==2:
self.pushButton_2.setStyleSheet("background-color: grey;")
self.pushButton_3.setStyleSheet(u"background-image: url(:/F:/\u4f5c\u4e1a/python\u7a0b\u5e8f/1.jpg);")
self.pushButton.setStyleSheet(u"background-image: url(:/F:/\u4f5c\u4e1a/python\u7a0b\u5e8f/1.jpg);")
elif mode ==3:
self.pushButton_3.setStyleSheet("background-color: grey;")
self.pushButton_2.setStyleSheet(u"background-image: url(:/F:/\u4f5c\u4e1a/python\u7a0b\u5e8f/1.jpg);")
self.pushButton.setStyleSheet(u"background-image: url(:/F:/\u4f5c\u4e1a/python\u7a0b\u5e8f/1.jpg);")
register(s,shared_key,self)
def handle_send(self,s,shared_key):
if self.current_number == 1:
transfer_txt(s,shared_key,self)
elif self.current_number == 2:
transfer_file(s,shared_key,self)
class barwindow(QMainWindow,barwindow):
def __init__(self,text,sliced_sum,transfer_data,s,main_window):
QMainWindow.__init__(self)
self.setupUi(self)
self.progressBar.setValue(0)
self.setWindowTitle("安全协议大作业-第六组")
l1 = self.label.setText(text)
b1 = self.pushButton_2.clicked.connect(lambda:self.start_transfer(sliced_sum,transfer_data,s,main_window))
b2 = self.pushButton.clicked.connect(lambda:self.end_transfer(s))
def start_transfer(self,sliced_sum,transfer_data,s,main_window):
for i in range(sliced_sum):
s.sendall(transfer_data[i])
time.sleep(0.06)
self.progressBar.setValue((i/(sliced_sum-1))*100)
QApplication.processEvents()
self.pushButton_2.setText("再次发送")
QMessageBox.information(None,"输出信息","信息发送成功",QMessageBox.StandardButton.Ok,QMessageBox.StandardButton.NoButton)
def end_transfer(self,s):
s.send(b"")
self.close()
class registerwindow(QMainWindow,registerwindow):
def __init__(self,s,shared_key,main_window):
QMainWindow.__init__(self)
self.setupUi(self)
self.setWindowTitle("安全协议大作业-第六组")
b1 = self.pushButton.clicked.connect(lambda:self.register(s,shared_key,main_window))
b2 = self.pushButton_2.clicked.connect(lambda:self.return_mainwindow())
def register(self,s,shared_key,main_window):
username = self.lineEdit.text()
password = self.lineEdit_2.text()
if len(username) == 0:
QMessageBox.information(None,"报错信息","用户名不能为空",QMessageBox.StandardButton.Ok,QMessageBox.StandardButton.NoButton)
return
else:
password = self.lineEdit_2.text()
if len(password) == 0:
QMessageBox.information(None,"报错信息","密码不能为空",QMessageBox.StandardButton.Ok,QMessageBox.StandardButton.NoButton)
return
data = username+";"+password
transfer_data = pack_data(3,data,shared_key,0,0)
s.send(transfer_data)
text = s.recv(1024).decode()
if "成功" in text:
QMessageBox.information(None,"输出信息","用户注册成功",QMessageBox.StandardButton.Ok,QMessageBox.StandardButton.NoButton)
elif "存在" in text:
QMessageBox.information(None,"报错信息","用户已存在",QMessageBox.StandardButton.Ok,QMessageBox.StandardButton.NoButton)
else:
QMessageBox.information(None,"报错信息",text,QMessageBox.StandardButton.Ok,QMessageBox.StandardButton.NoButton)
def return_mainwindow(self):
self.close()
def md5_encrypt(data): #md5加密 用于完整性检测
md5_hash = hashlib.md5()
if isinstance(data,str):
data = data.encode()
md5_hash.update(data)
hash_result = md5_hash.hexdigest()
return hash_result
def pack_data(choice,data,shared_key,sliced,sum): #将数据打包成报文 格式: sliced_judge(4bytes)+data_length(4bytes)+data_type(4bytes)+encrypted_data(Dynamic)+md5_data(32bytes)
sliced_sum = struct.pack('>H',sum)
sliced_num = struct.pack('>H',sliced)
choice_type = struct.pack('>I',choice)
en_data = encrypt_data(shared_key,data)
hash_data = md5_encrypt(data).encode()
temp = choice_type+en_data+hash_data
transfer_data = sliced_sum+sliced_num+struct.pack('>I',len(temp)+8)+temp
return transfer_data
def key_switch(ipaddr,ip_window): #交换公钥
# 固定的 DH 参数
p = 167369435262828772525424092470263277532300828423668354542113388987940885403967569624276882159444702063514952468172684099658558089467275283830276841486674376457626767287709752368374119769545320657500942952067517443133715688577345716588234520369470508343544601072272539124731608794518466242096737284389743744833
g = 2
parameters = dh.DHParameterNumbers(p, g).parameters(backend=default_backend())
private_key_client = parameters.generate_private_key()
public_key_client = private_key_client.public_key()
s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
try:
s.connect((ipaddr,6666))
except Exception as e:
QMessageBox.information(None,"报错信息",str(e),QMessageBox.StandardButton.Ok,QMessageBox.StandardButton.NoButton)
public_key_bytes = public_key_client.public_bytes(
encoding=serialization.Encoding.PEM,
format=serialization.PublicFormat.SubjectPublicKeyInfo
)
s.send(public_key_bytes)
public_key_server = s.recv(1024)
public_key_server = serialization.load_pem_public_key(public_key_server)
shared_key = private_key_client.exchange(public_key_server)
login_window = loginwindow(s,shared_key)
ip_window.close()
login_window.show()
def encrypt_data(shared_key,data): #aes加密
key = shared_key[:16]
aes = AES.new(key,AES.MODE_ECB)
if isinstance(data,str):
data = data.encode()
data_bytes = pad(data, AES.block_size)
encrypt_data = aes.encrypt(data_bytes)
return encrypt_data
def Authentication(s,shared_key,username,password,login_window):
data = username+";"+password
transfer_data = encrypt_data(shared_key,data) #使用aes加密账号密码
s.send(transfer_data)
text = (s.recv(1024)).decode()
if text == "你输入的用户名不存在 终止本次连接":
QMessageBox.information(None,"报错信息",text,QMessageBox.StandardButton.Ok,QMessageBox.StandardButton.NoButton)
s.close()
exit()
elif text == "密码错误,终止连接":
QMessageBox.information(None,"报错信息",text,QMessageBox.StandardButton.Ok,QMessageBox.StandardButton.NoButton)
s.close()
exit()
else:
main_window = mainwindow(s,shared_key)
login_window.close()
main_window.show()
def register(s,shared_key,main_window):
register_window = registerwindow(s,shared_key,main_window)
register_window.show()
def transfer_txt(s,shared_key,main_window):
text = main_window.textEdit.toPlainText()
if len(text) > 1024:
sliced = 0
chunk_size = 1024
sliced_data = slice_string(text, chunk_size)
sliced_sum = len(sliced_data)
for i in range(len(sliced_data)):
transfer_data = pack_data(1,sliced_data[i],shared_key,sliced,sliced_sum)
sliced += 1
s.send(transfer_data)
else :
sliced = 0
sliced_sum = 0
transfer_data = pack_data(1,text,shared_key,sliced,sliced_sum)
s.send(transfer_data)
main_window.textBrowser.append("文本数据传输成功")
def slice_string(data, chunk_size):
#根据指定大小切片字符串
return [data[i:i + chunk_size] for i in range(0, len(data), chunk_size)]
def transfer_file(s,shared_key,main_window):
file_addr = main_window.textEdit.toPlainText()
file_addr = file_addr.replace("file:///","")
with open(file_addr,'rb') as file:
text = file.read()
transfer_data = []
if len(text) > 1024:
sliced = 0
chunk_size = 1024
sliced_data = slice_string(text, chunk_size)
sliced_sum = len(sliced_data)
for i in range(len(sliced_data)):
sliced += 1
transfer_data.append(pack_data(2,sliced_data[i],shared_key,sliced,sliced_sum))
text = f"需要传输{sliced_sum}组数据"
bar_window = barwindow(text,sliced_sum,transfer_data,s,main_window)
bar_window.show()
else :
sliced = 0
sliced_sum = 0
transfer_data = pack_data(2,text,shared_key,sliced,sliced_sum)
text = f"需要传输{sliced_sum}组数据"
bar_window = barwindow(text,sliced_sum,transfer_data,s,main_window)
bar_window.show()
def main():
app = QApplication()
ip_window = ipwindow()
ip_window.show()
app.exec()
if __name__ == "__main__":
main()