自己看过网上的很多五子棋联机对战代码,但是都多多少少部署下来有问题。所以自己基于python写了一个程序,
实现功能
1.对战功能
支持双人对战,后续进入的人自动到旁观席位
2.聊天功能
支持发送文字聊天,且对局中旁观者的消息只能旁观者观看,棋手发的消息只能棋手看
3.作弊检测
PS:简单写写,只是检测非法移动和过快操作
检测到作弊者自动判定未作弊一方获胜并将作弊者踢出服务器并封禁10分钟(如需手动封禁请修改目录下的banned.json)
有bug一定要告诉我,之前写这个功能总有bug
4.支持查看在线用户
所有人能看到在线用户
5.可以看对局回放和聊天记录回放
代码开源:
import json
import tkinter as tk
from tkinter import filedialog, messagebox, Toplevel, Scrollbar, Text
import time
import os
from PIL import Image, ImageTk
import threading
class GomokuReplayViewer:
def __init__(self):
self.root = tk.Tk()
self.root.title("五子棋对局回放查看器")
self.root.geometry("1000x800")
self.menu_bar = tk.Menu(self.root)
self.file_menu = tk.Menu(self.menu_bar, tearoff=0)
self.file_menu.add_command(label="打开回放文件", command=self.open_replay_file)
self.file_menu.add_command(label="打开聊天记录", command=self.open_chat_log)
self.file_menu.add_separator()
self.file_menu.add_command(label="退出", command=self.root.quit)
self.menu_bar.add_cascade(label="文件", menu=self.file_menu)
self.help_menu = tk.Menu(self.menu_bar, tearoff=0)
self.help_menu.add_command(label="使用说明", command=self.show_help)
self.menu_bar.add_cascade(label="帮助", menu=self.help_menu)
self.root.config(menu=self.menu_bar)
self.main_frame = tk.PanedWindow(self.root, orient=tk.HORIZONTAL)
self.main_frame.pack(fill=tk.BOTH, expand=True, padx=5, pady=5)
self.left_frame = tk.Frame(self.main_frame)
self.main_frame.add(self.left_frame, width=600)
self.info_frame = tk.Frame(self.left_frame)
self.info_frame.pack(pady=10, fill=tk.X)
self.game_id_label = tk.Label(self.info_frame, text="对局ID: 未加载", font=("Arial", 10, "bold"))
self.game_id_label.pack(side=tk.LEFT, padx=10)
self.duration_label = tk.Label(self.info_frame, text="对局时长: 未加载")
self.duration_label.pack(side=tk.LEFT, padx=10)
self.winner_label = tk.Label(self.info_frame, text="获胜方: 未加载")
self.winner_label.pack(side=tk.LEFT, padx=10)
self.canvas_frame = tk.Frame(self.left_frame)
self.canvas_frame.pack(pady=10)
self.canvas = tk.Canvas(self.canvas_frame, width=450, height=450, bg="#E8C87E")
self.canvas.pack()
self.control_frame = tk.Frame(self.left_frame)
self.control_frame.pack(pady=10)
self.btn_first = tk.Button(self.control_frame, text="第一步", command=self.go_to_first)
self.btn_first.pack(side=tk.LEFT, padx=5)
self.btn_prev = tk.Button(self.control_frame, text="上一步", command=self.go_to_previous)
self.btn_prev.pack(side=tk.LEFT, padx=5)
self.btn_next = tk.Button(self.control_frame, text="下一步", command=self.go_to_next)
self.btn_next.pack(side=tk.LEFT, padx=5)
self.btn_last = tk.Button(self.control_frame, text="最后一步", command=self.go_to_last)
self.btn_last.pack(side=tk.LEFT, padx=5)
self.btn_play = tk.Button(self.control_frame, text="播放", command=self.toggle_play)
self.btn_play.pack(side=tk.LEFT, padx=5)
self.btn_help = tk.Button(self.control_frame, text="疑问", command=self.show_help)
self.btn_help.pack(side=tk.LEFT, padx=5)
self.progress_frame = tk.Frame(self.left_frame)
self.progress_frame.pack(fill=tk.X, padx=20, pady=5)
self.progress_label = tk.Label(self.progress_frame, text="步数: 0/0")
self.progress_label.pack()
self.progress_scale = tk.Scale(self.progress_frame, from_=0, to=0, orient=tk.HORIZONTAL,
command=self.on_progress_change, showvalue=False)
self.progress_scale.pack(fill=tk.X)
self.right_frame = tk.Frame(self.main_frame)
self.main_frame.add(self.right_frame, width=400)
self.detail_frame = tk.LabelFrame(self.right_frame, text="对局详情")
self.detail_frame.pack(fill=tk.BOTH, expand=True, padx=5, pady=5)
self.detail_text = Text(self.detail_frame, height=10, state=tk.DISABLED)
self.detail_text.pack(fill=tk.BOTH, expand=True)
self.chat_frame = tk.LabelFrame(self.right_frame, text="聊天记录")
self.chat_frame.pack(fill=tk.BOTH, expand=True, padx=5, pady=5)
self.chat_text = Text(self.chat_frame, height=15, state=tk.DISABLED)
self.chat_text.pack(fill=tk.BOTH, expand=True)
self.chat_control_frame = tk.Frame(self.right_frame)
self.chat_control_frame.pack(fill=tk.X, padx=5, pady=2)
self.chat_sync_var = tk.BooleanVar(value=True)
self.chat_sync_check = tk.Checkbutton(self.chat_control_frame, text="同步聊天记录",
variable=self.chat_sync_var)
self.chat_sync_check.pack(side=tk.LEFT)
self.status_bar = tk.Label(self.root, text="就绪", relief=tk.SUNKEN, anchor=tk.W)
self.status_bar.pack(side=tk.BOTTOM, fill=tk.X)
self.replay_data = None
self.chat_data = None
self.current_step = 0
self.total_steps = 0
self.playing = False
self.play_delay = 1.0 # 每秒一步
self.cell_size = 30
self.margin = 20
self.draw_board()
self.root.protocol("WM_DELETE_WINDOW", self.on_closing)
self.root.mainloop()
def draw_board(self):
self.canvas.delete("all")
for i in range(15):
self.canvas.create_line(
self.margin,
self.margin + i * self.cell_size,
self.margin + 14 * self.cell_size,
self.margin + i * self.cell_size
)
self.canvas.create_line(
self.margin + i * self.cell_size,
self.margin,
self.margin + i * self.cell_size,
self.margin + 14 * self.cell_size
)
star_points = [(3, 3), (3, 11), (7, 7), (11, 3), (11, 11)]
for x, y in star_points:
self.canvas.create_oval(
self.margin + y * self.cell_size - 3,
self.margin + x * self.cell_size - 3,
self.margin + y * self.cell_size + 3,
self.margin + x * self.cell_size + 3,
fill="black"
)
def open_replay_file(self):
file_path = filedialog.askopenfilename(
title="选择回放文件",
filetypes=[("JSON文件", "*.json"), ("所有文件", "*.*")],
initialdir="replays" if os.path.exists("replays") else "."
)
if not file_path:
return
try:
with open(file_path, 'r', encoding='utf-8') as f:
self.replay_data = json.load(f)
self.game_id_label.config(text=f"对局ID: {self.replay_data.get('game_id', '未知')}")
start_time = self.replay_data.get('start_time', 0)
end_time = self.replay_data.get('end_time', 0)
duration = end_time - start_time
minutes = int(duration // 60)
seconds = int(duration % 60)
self.duration_label.config(text=f"对局时长: {minutes}分{seconds}秒")
winner = self.replay_data.get('winner', '未知')
self.winner_label.config(text=f"获胜方: {winner}")
self.total_steps = len(self.replay_data.get('moves', []))
self.current_step = 0
self.progress_scale.config(to=self.total_steps)
self.update_progress()
self.update_detail_text()
game_id = self.replay_data.get('game_id')
if game_id:
chat_path = os.path.join("chat_logs", f"{game_id}.json")
if os.path.exists(chat_path):
self.load_chat_log(chat_path)
self.draw_current_step()
self.status_bar.config(text=f"已加载回放文件: {os.path.basename(file_path)}")
except Exception as e:
messagebox.showerror("错误", f"无法加载回放文件: {e}")
self.status_bar.config(text="加载回放文件失败")
def open_chat_log(self):
"""打开聊天记录文件"""
file_path = filedialog.askopenfilename(
title="选择聊天记录文件",
filetypes=[("JSON文件", "*.json"), ("所有文件", "*.*")],
initialdir="chat_logs" if os.path.exists("chat_logs") else "."
)
if not file_path:
return
self.load_chat_log(file_path)
def load_chat_log(self, file_path):
"""加载聊天记录文件"""
try:
with open(file_path, 'r', encoding='utf-8') as f:
self.chat_data = json.load(f)
self.update_chat_display()
self.status_bar.config(text=f"已加载聊天记录: {os.path.basename(file_path)}")
except Exception as e:
messagebox.showerror("错误", f"无法加载聊天记录文件: {e}")
self.status_bar.config(text="加载聊天记录失败")
def draw_current_step(self):
"""绘制当前步数的棋盘状态"""
self.draw_board()
for i in range(self.current_step):
move = self.replay_data['moves']
color = "black" if move["piece"] == 'B' else "white"
self.canvas.create_oval(
self.margin + move["y"] * self.cell_size - 13,
self.margin + move["x"] * self.cell_size - 13,
self.margin + move["y"] * self.cell_size + 13,
self.margin + move["x"] * self.cell_size + 13,
fill=color, outline="black"
)
if self.current_step > 0:
move = self.replay_data['moves'][self.current_step - 1]
self.canvas.create_oval(
self.margin + move["y"] * self.cell_size - 16,
self.margin + move["x"] * self.cell_size - 16,
self.margin + move["y"] * self.cell_size + 16,
self.margin + move["x"] * self.cell_size + 16,
outline="red", width=2
)
def update_progress(self):
"""更新进度显示"""
self.progress_label.config(text=f"步数: {self.current_step}/{self.total_steps}")
self.progress_scale.set(self.current_step)
def update_detail_text(self):
self.detail_text.config(state=tk.NORMAL)
self.detail_text.delete(1.0, tk.END)
if self.replay_data:
self.detail_text.insert(tk.END, f"对局ID: {self.replay_data.get('game_id', '未知')}\n")
self.detail_text.insert(tk.END, f"棋盘大小: {self.replay_data.get('board_size', 15)}x{self.replay_data.get('board_size', 15)}\n")
start_time = self.replay_data.get('start_time', 0)
end_time = self.replay_data.get('end_time', 0)
if start_time and end_time:
start_str = time.strftime("%Y-%m-%d %H:%M:%S", time.localtime(start_time))
end_str = time.strftime("%Y-%m-%d %H:%M:%S", time.localtime(end_time))
duration = end_time - start_time
minutes = int(duration // 60)
seconds = int(duration % 60)
self.detail_text.insert(tk.END, f"开始时间: {start_str}\n")
self.detail_text.insert(tk.END, f"结束时间: {end_str}\n")
self.detail_text.insert(tk.END, f"对局时长: {minutes}分{seconds}秒\n")
self.detail_text.insert(tk.END, f"获胜方: {self.replay_data.get('winner', '未知')}\n\n")
if self.current_step > 0:
move = self.replay_data['moves'][self.current_step - 1]
self.detail_text.insert(tk.END, f"当前步数: {self.current_step}\n")
self.detail_text.insert(tk.END, f"玩家: {move['username']}\n")
self.detail_text.insert(tk.END, f"棋子: {'黑棋' if move['piece'] == 'B' else '白棋'}\n")
self.detail_text.insert(tk.END, f"位置: ({move['x']}, {move['y']})\n")
move_time = move.get('timestamp', 0)
if move_time:
time_str = time.strftime("%Y-%m-%d %H:%M:%S", time.localtime(move_time))
self.detail_text.insert(tk.END, f"时间: {time_str}\n")
self.detail_text.config(state=tk.DISABLED)
def update_chat_display(self):
self.chat_text.config(state=tk.NORMAL)
self.chat_text.delete(1.0, tk.END)
if self.chat_data:
chats = self.chat_data.get('chats', [])
for chat in chats:
username = chat.get('username', '未知用户')
role = chat.get('role', '未知身份')
message = chat.get('message', '')
timestamp = chat.get('timestamp', 0)
if timestamp:
time_str = time.strftime("%H:%M:%S", time.localtime(timestamp))
self.chat_text.insert(tk.END, f"[{time_str}] {username}({role}): {message}\n")
else:
self.chat_text.insert(tk.END, f"{username}({role}): {message}\n")
self.chat_text.config(state=tk.DISABLED)
self.chat_text.see(tk.END)
def update_chat_by_time(self, current_time):
if not self.chat_data or not self.chat_sync_var.get():
return
self.chat_text.config(state=tk.NORMAL)
self.chat_text.delete(1.0, tk.END)
chats = self.chat_data.get('chats', [])
for chat in chats:
chat_time = chat.get('timestamp', 0)
if chat_time 0:
move = self.replay_data['moves'][self.current_step - 1]
move_time = move.get('timestamp', 0)
self.update_chat_by_time(move_time)
def go_to_previous(self):
"""上一步"""
if self.replay_data and self.current_step > 0:
self.current_step -= 1
self.draw_current_step()
self.update_progress()
self.update_detail_text()
if self.current_step > 0:
move = self.replay_data['moves'][self.current_step - 1]
move_time = move.get('timestamp', 0)
self.update_chat_by_time(move_time)
def go_to_next(self):
"""下一步"""
if self.replay_data and self.current_step 0:
move = self.replay_data['moves'][self.current_step - 1]
move_time = move.get('timestamp', 0)
self.update_chat_by_time(move_time)
def go_to_last(self):
"""跳到最后一步"""
if self.replay_data:
self.current_step = self.total_steps
self.draw_current_step()
self.update_progress()
self.update_detail_text()
if self.current_step > 0:
move = self.replay_data['moves'][self.current_step - 1]
move_time = move.get('timestamp', 0)
self.update_chat_by_time(move_time)
def on_progress_change(self, value):
"""进度条变化事件"""
if self.replay_data:
step = int(float(value))
if step != self.current_step:
self.current_step = step
self.draw_current_step()
self.update_progress()
self.update_detail_text()
if self.current_step > 0:
move = self.replay_data['moves'][self.current_step - 1]
move_time = move.get('timestamp', 0)
self.update_chat_by_time(move_time)
def toggle_play(self):
"""切换播放状态"""
if not self.replay_data:
return
self.playing = not self.playing
if self.playing:
self.btn_play.config(text="暂停")
self.play_animation()
else:
self.btn_play.config(text="播放")
def play_animation(self):
"""播放动画"""
if not self.playing or not self.replay_data:
return
if self.current_step 0:
move = self.replay_data['moves'][self.current_step - 1]
move_time = move.get('timestamp', 0)
self.update_chat_by_time(move_time)
self.root.after(int(self.play_delay * 1000), self.play_animation)
else:
self.playing = False
self.btn_play.config(text="播放")
def show_help(self):
"""显示使用说明"""
help_window = Toplevel(self.root)
help_window.title("使用说明")
help_window.geometry("600x400")
help_text = Text(help_window, wrap=tk.WORD, padx=10, pady=10)
help_text.pack(fill=tk.BOTH, expand=True)
help_content = """
五子棋对局回放查看器使用说明
1. 打开回放文件
- 点击菜单栏的"文件" -> "打开回放文件"
- 选择要查看的回放JSON文件
- 程序会自动尝试加载对应的聊天记录
2. 打开聊天记录
- 如果需要单独加载聊天记录,点击"文件" -> "打开聊天记录"
- 选择对应的聊天记录JSON文件
3. 控制播放
- 使用"第一步"、"上一步"、"下一步"、"最后一步"按钮控制播放进度
- 使用进度条快速跳转到指定步数
- 点击"播放"按钮自动播放整个对局
4. 聊天记录同步
- 勾选"同步聊天记录"可以在播放对局时同步显示聊天内容
- 取消勾选则显示所有聊天记录
5. 查看详细信息
- 右侧面板显示对局详细信息和聊天记录
- 对局信息包括步数、玩家、棋子类型、位置和时间
- 聊天记录显示对局过程中的所有聊天内容
6. 获取回放文件
- 回放文件需要向管理员申请获取
- 回放文件保存在服务器的replays目录中
- 聊天记录保存在服务器的chat_logs目录中
注意事项:
- 确保回放文件和聊天记录文件来自同一局游戏
- 回放文件格式为JSON,包含对局的每一步信息
- 聊天记录文件格式为JSON,包含对局过程中的所有聊天内容
- 如果遇到问题,请联系管理员获取帮助
"""
help_text.insert(1.0, help_content)
help_text.config(state=tk.DISABLED)
close_button = tk.Button(help_window, text="关闭", command=help_window.destroy)
close_button.pack(pady=10)
def on_closing(self):
"""关闭窗口时的处理"""
self.playing = False
self.root.destroy()
if __name__ == "__main__":
app = GomokuReplayViewer()
回放文件格式:
{
"game_id": "20250916_193931",
"start_time": 1758022774.6713629,
"end_time": 1758022791.628326,
"winner": "1",
"moves": [
{
"x": 11,
"y": 13,
"piece": "B",
"username": "1",
"timestamp": 1758022774.6713629
},
{
"x": 8,
"y": 3,
"piece": "W",
"username": "2",
"timestamp": 1758022775.1263444
}
}
聊天文件json参考格式:
{
"game_id": "20230915_143022",
"chats": [
{
"username": "玩家A",
"role": "黑棋",
"message": "大家好!",
"timestamp": 1694773820.123456,
"audience": "all"
},
{
"username": "玩家B",
"role": "白棋",
"message": "请多指教",
"timestamp": 1694773821.123456,
"audience": "all"
}
]
}
代码开源
服务器端(server.py):
import socket
import threading
import json
import time
import os
from enum import Enum
from datetime import datetime
class PlayerRole(Enum):
BLACK = 1
WHITE = 2
SPECTATOR = 3
class GomokuServer:
def __init__(self, host='localhost', port=8888):
self.host = host
self.port = port
self.server_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
self.server_socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
self.clients = {}
self.players = {}
self.spectators = []
self.board = [[' ' for _ in range(15)] for _ in range(15)]
self.current_turn = PlayerRole.BLACK
self.game_started = False
self.move_history = []
self.chat_history = []
self.game_id = None
self.lock = threading.Lock()
self.user_counter = 0
self.banned_ips = self.load_banned_ips()
self.usernames = set()
self.last_move_time = {}
if not os.path.exists("replays"):
os.makedirs("replays")
if not os.path.exists("chat_logs"):
os.makedirs("chat_logs")
def load_banned_ips(self):
try:
if os.path.exists("banned.json"):
with open("banned.json", "r") as f:
banned_data = json.load(f)
if isinstance(banned_data, list):
return banned_data
else:
return list(banned_data.keys())
else:
with open("banned.json", "w") as f:
json.dump([], f)
return []
except Exception as e:
print(f"加载封禁列表失败: {e}")
return []
def save_banned_ips(self):
try:
with open("banned.json", "w") as f:
json.dump(self.banned_ips, f)
except Exception as e:
print(f"保存封禁列表失败: {e}")
def is_ip_banned(self, ip):
return ip in self.banned_ips
def ban_ip(self, ip, duration_minutes=10):
if ip not in self.banned_ips:
self.banned_ips.append(ip)
self.save_banned_ips()
print(f"已封禁IP: {ip}, 时长: {duration_minutes}分钟")
timer = threading.Timer(duration_minutes * 60, self.unban_ip, [ip])
timer.daemon = True
timer.start()
def unban_ip(self, ip):
if ip in self.banned_ips:
self.banned_ips.remove(ip)
self.save_banned_ips()
print(f"已解封IP: {ip}")
def start(self):
self.server_socket.bind((self.host, self.port))
self.server_socket.listen(5)
print(f"服务器已启动,监听地址: {self.host}:{self.port}")
while True:
client_socket, addr = self.server_socket.accept()
client_ip = addr[0]
if self.is_ip_banned(client_ip):
print(f"拒绝被封禁IP的连接: {client_ip}")
try:
ban_msg = {"type": "banned", "message": "您的IP已被封禁,无法连接服务器"}
client_socket.send(json.dumps(ban_msg).encode())
time.sleep(1)
except:
pass
client_socket.close()
continue
print(f"新连接: {addr}")
client_handler = threading.Thread(target=self.handle_client, args=(client_socket, addr))
client_handler.daemon = True
client_handler.start()
def handle_client(self, client_socket, addr):
client_ip = addr[0]
try:
data = client_socket.recv(1024).decode()
login_info = json.loads(data)
if login_info["type"] != "login" or "username" not in login_info:
client_socket.send(json.dumps({"type": "error", "message": "请先发送用户名"}).encode())
client_socket.close()
return
username = login_info["username"]
is_admin = login_info.get("is_admin", False)
with self.lock:
if username in self.usernames:
client_socket.send(json.dumps({"type": "error", "message": "用户名已存在,请选择其他用户名"}).encode())
client_socket.close()
return
self.usernames.add(username)
user_id = f"user_{self.user_counter}"
self.user_counter += 1
with self.lock:
self.clients[client_socket] = {
"username": username,
"user_id": user_id,
"role": None,
"address": client_ip,
"is_admin": is_admin
}
with self.lock:
if len(self.players) = 5:
return True
return False
def reset_game(self):
self.board = [[' ' for _ in range(15)] for _ in range(15)]
self.current_turn = PlayerRole.BLACK
self.game_started = False
self.move_history = []
self.chat_history = []
self.game_id = None
self.broadcast({"type": "board", "board": self.board}, include_spectators=True)
if __name__ == "__main__":
server = GomokuServer()
server.start()
客户端代码(user-client.py):
import socket
import threading
import json
import tkinter as tk
from tkinter import simpledialog, messagebox, scrolledtext
import time
class GomokuUserClient:
def __init__(self):
self.root = tk.Tk()
self.root.title("五子棋用户端")
self.root.geometry("700x800")
self.frame_connect = tk.Frame(self.root)
self.frame_connect.pack(pady=10)
tk.Label(self.frame_connect, text="服务器地址:").grid(row=0, column=0)
self.entry_host = tk.Entry(self.frame_connect, width=15)
self.entry_host.insert(0, "localhost")
self.entry_host.grid(row=0, column=1)
tk.Label(self.frame_connect, text="端口:").grid(row=0, column=2)
self.entry_port = tk.Entry(self.frame_connect, width=8)
self.entry_port.insert(0, "8888")
self.entry_port.grid(row=0, column=3)
self.btn_connect = tk.Button(self.frame_connect, text="连接", command=self.connect_server)
self.btn_connect.grid(row=0, column=4, padx=5)
self.user_listbox = tk.Listbox(self.root, width=20, height=10)
self.user_listbox.pack(side=tk.RIGHT, fill=tk.Y, padx=5, pady=5)
tk.Label(self.root, text="在线用户").pack(side=tk.RIGHT)
self.chat_area = scrolledtext.ScrolledText(self.root, height=10, state='disabled')
self.chat_area.pack(pady=5, padx=10, fill=tk.BOTH)
self.frame_chat = tk.Frame(self.root)
self.frame_chat.pack(pady=5, fill=tk.X)
self.entry_chat = tk.Entry(self.frame_chat)
self.entry_chat.pack(side=tk.LEFT, padx=5, fill=tk.X, expand=True)
self.entry_chat.bind("", self.send_chat)
self.btn_send = tk.Button(self.frame_chat, text="发送", command=self.send_chat)
self.btn_send.pack(side=tk.RIGHT, padx=5)
self.canvas = tk.Canvas(self.root, width=450, height=450, bg="#E8C87E")
self.canvas.pack(pady=10)
self.control_frame = tk.Frame(self.root)
self.control_frame.pack(pady=5)
self.btn_replay = tk.Button(self.control_frame, text="查看回放", command=self.show_replay)
self.btn_replay.pack(side=tk.LEFT, padx=5)
self.btn_refresh = tk.Button(self.control_frame, text="刷新用户", command=self.refresh_user_list)
self.btn_refresh.pack(side=tk.LEFT, padx=5)
self.status = tk.Label(self.root, text="未连接", relief=tk.SUNKEN, anchor=tk.W)
self.status.pack(side=tk.BOTTOM, fill=tk.X)
self.socket = None
self.username = None
self.role = None
self.board = [[' ' for _ in range(15)] for _ in range(15)]
self.cell_size = 30
self.margin = 20
self.users = {}
self.move_history = []
self.replay_mode = False
self.replay_index = 0
self.draw_board()
self.root.protocol("WM_DELETE_WINDOW", self.on_closing)
self.root.mainloop()
def draw_board(self):
self.canvas.delete("all")
for i in range(15):
self.canvas.create_line(
self.margin,
self.margin + i * self.cell_size,
self.margin + 14 * self.cell_size,
self.margin + i * self.cell_size
)
self.canvas.create_line(
self.margin + i * self.cell_size,
self.margin,
self.margin + i * self.cell_size,
self.margin + 14 * self.cell_size
)
for i in range(15):
for j in range(15):
if self.board[j] != ' ':
color = "black" if self.board[j] == 'B' else "white"
self.canvas.create_oval(
self.margin + j * self.cell_size - 13,
self.margin + i * self.cell_size - 13,
self.margin + j * self.cell_size + 13,
self.margin + i * self.cell_size + 13,
fill=color, outline="black"
)
self.canvas.bind("[B]", self.on_click)
def on_click(self, event):
if not self.socket or self.role == "SPECTATOR" or self.replay_mode:
return
x = event.x - self.margin
y = event.y - self.margin
if x = len(self.move_history):
step = len(self.move_history) - 1
self.replay_index = step
self.update_replay_display(canvas)
def update_replay_display(self, canvas, info_label=None):
canvas.delete("pieces")
for i in range(self.replay_index + 1):
move = self.move_history
color = "black" if move["piece"] == 'B' else "white"
canvas.create_oval(
self.margin + move["y"] * self.cell_size - 13,
self.margin + move["x"] * self.cell_size - 13,
self.margin + move["y"] * self.cell_size + 13,
self.margin + move["x"] * self.cell_size + 13,
fill=color, outline="black", tags="pieces"
)
if info_label and self.replay_index
提示
1.项目已在GitHub开源
链接:https://github.com/PTlongling/pygomoku/
2.由于项目采用socket发包处理,所以断开连接后无提示但出现WinError,属正常现象
3.目前服务器限制禁止同名,此时会出现等待服务器分配角色提示,请重启程序链接换一个名字
4.服务器端端口默认开放在8888,可以通过改代码换开放端口
5.有bug记得找我反馈