TXT关键字搜索(支持G级大文本)

查看 96|回复 10
作者:鼠八爷   
基于论坛大佬的VB项目的关键词搜索,由于编码问题。自己重新用python重构了一下,支持GB超级大文本。
大佬的帖子:大佬帖子
测试图下


QQ20250423-055352.png (99.14 KB, 下载次数: 0)
下载附件
2025-4-27 12:15 上传

[Python] 纯文本查看 复制代码import tkinter as tk
from tkinter import ttk, filedialog, messagebox, simpledialog
from tkinter.scrolledtext import ScrolledText
import threading
import time
import os
import platform
from queue import Queue
class KeywordSearchApp:
    def __init__(self, root):
        self.root = root
        self.root.title("关键词搜索工具")
        self.root.geometry("800x800")
        self.setup_ui()
        # 初始化变量
        self.keywords = []
        self.files = []
        self.results = []
        self.stop_requested = False
        self.total_lines = 0
        self.start_time = 0
        self.update_queue = Queue()
        self.running = False
        # 启动界面更新循环
        self.root.after(100, self.process_updates)
    def setup_ui(self):
        # 顶部按钮区域
        control_frame = ttk.Frame(self.root)
        control_frame.pack(pady=5, fill=tk.X)
        buttons = [
            ("导入关键词", self.import_keywords),
            ("添加关键词", self.add_keyword),
            ("导入文档", self.import_files),
            ("开始搜索", self.start_search),
            ("停止搜索", self.stop_search),
            ("导出文本", self.export_results)
        ]
        for text, cmd in buttons:
            btn = ttk.Button(control_frame, text=text, command=cmd)
            btn.pack(side=tk.LEFT, padx=2)
        # 选项区域
        options_frame = ttk.Frame(self.root)
        options_frame.pack(pady=5, fill=tk.X)
        self.source_check = tk.IntVar()
        self.dup_check = tk.IntVar()
        ttk.Checkbutton(options_frame, text="标记出处", variable=self.source_check).pack(side=tk.LEFT)
        ttk.Checkbutton(options_frame, text="数据去重", variable=self.dup_check).pack(side=tk.LEFT)
        # 主内容区域
        main_frame = ttk.Frame(self.root)
        main_frame.pack(fill=tk.BOTH, expand=True)
        # 关键词列表(左侧)
        keyword_frame = ttk.LabelFrame(main_frame, text="关键词列表")
        keyword_frame.pack(side=tk.LEFT, fill=tk.Y, padx=5)
        self.keyword_list = tk.Listbox(
            keyword_frame,
            width=20,
            selectmode=tk.EXTENDED,
            activestyle='none'
        )
        self.keyword_list.pack(fill=tk.BOTH, expand=True)
        # 文件列表和结果(右侧)
        right_frame = ttk.Frame(main_frame)
        right_frame.pack(side=tk.RIGHT, fill=tk.BOTH, expand=True)
        # 文件列表(添加右键菜单)
        file_frame = ttk.LabelFrame(right_frame, text="文档列表")
        file_frame.pack(fill=tk.X, padx=5, pady=5)
        self.tree = ttk.Treeview(
            file_frame,
            columns=("name", "path", "status"),
            show="headings",
            selectmode="browse"
        )
        self.tree.heading("name", text="文件名")
        self.tree.heading("path", text="文件路径")
        self.tree.heading("status", text="状态")
        self.tree.column("name", width=150, anchor=tk.W)
        self.tree.column("path", width=300, anchor=tk.W)
        self.tree.column("status", width=80, anchor=tk.CENTER)
        vsb = ttk.Scrollbar(file_frame, orient="vertical", command=self.tree.yview)
        self.tree.configure(yscrollcommand=vsb.set)
        self.tree.pack(side=tk.LEFT, fill=tk.BOTH, expand=True)
        vsb.pack(side=tk.RIGHT, fill=tk.Y)
        # 绑定右键菜单
        self.tree.bind("[B]", self.on_right_click)
        self.context_menu = tk.Menu(self.root, tearoff=0)
        self.context_menu.add_command(label="打开文件位置", command=self.open_file_location)
        self.context_menu.add_command(label="删除选中项", command=self.delete_selected_item)
        self.context_menu.add_separator()
        self.context_menu.add_command(label="清空列表", command=self.clear_list)
        # 搜索结果
        result_frame = ttk.LabelFrame(right_frame, text="输出结果")
        result_frame.pack(fill=tk.BOTH, expand=True, padx=5, pady=5)
        self.result_text = ScrolledText(
            result_frame,
            wrap=tk.WORD,
            font=('Consolas', 10)
        )
        self.result_text.pack(fill=tk.BOTH, expand=True)
        # 状态栏
        self.status_vars = {
            "keywords": tk.StringVar(value="0 个"),
            "files": tk.StringVar(value="0 个"),
            "lines": tk.StringVar(value="0 行"),
            "results": tk.StringVar(value="0 条"),
            "time": tk.StringVar(value="0.0 秒")
        }
        status_bar = ttk.Frame(self.root)
        status_bar.pack(fill=tk.X, pady=2)
        status_labels = [
            ("关键词", "keywords"),
            ("处理文档", "files"),
            ("分析行数", "lines"),
            ("搜索结果", "results"),
            ("耗时", "time")
        ]
        for text, key in status_labels:
            frame = ttk.Frame(status_bar)
            frame.pack(side=tk.LEFT, padx=8)
            ttk.Label(frame, text=f"{text}:").pack(side=tk.LEFT)
            ttk.Label(frame, textvariable=self.status_vars[key]).pack(side=tk.LEFT)
    def on_right_click(self, event):
        """右键点击事件处理"""
        item = self.tree.identify_row(event.y)
        if item:
            self.tree.selection_set(item)
            self.context_menu.post(event.x_root, event.y_root)
    def open_file_location(self):
        """打开文件所在位置"""
        selected = self.tree.selection()
        if not selected:
            return
        item = selected[0]
        path = self.tree.item(item, "values")[1]
        try:
            if platform.system() == "Windows":
                os.startfile(os.path.dirname(path))
            elif platform.system() == "Darwin":
                os.system(f'open "{os.path.dirname(path)}"')
            else:
                os.system(f'xdg-open "{os.path.dirname(path)}"')
        except Exception as e:
            messagebox.showerror("错误", f"无法打开文件位置:{str(e)}")
    def delete_selected_item(self):
        """删除选中项"""
        selected = self.tree.selection()
        if not selected:
            return
        for item in selected:
            self.tree.delete(item)
        self.status_vars["files"].set(f"{len(self.tree.get_children())} 个")
    def clear_list(self):
        """清空文档列表"""
        if messagebox.askyesno("确认", "确定要清空所有文档吗?"):
            for item in self.tree.get_children():
                self.tree.delete(item)
            self.status_vars["files"].set("0 个")
    # 以下其他方法保持不变(保持原有功能)
    def process_updates(self):
        while not self.update_queue.empty():
            update_type, data = self.update_queue.get()
            if update_type == "status":
                self.tree.item(data[0], values=(data[1], data[2], data[3]))
            elif update_type == "result":
                self.result_text.insert(tk.END, data + "\n")
                self.result_text.see(tk.END)
            elif update_type == "progress":
                self.status_vars["lines"].set(f"{data} 行")
            elif update_type == "time":
                elapsed = time.time() - self.start_time
                self.status_vars["time"].set(f"{elapsed:.1f} 秒")
            elif update_type == "results":
                self.status_vars["results"].set(f"{data} 条")
        self.root.after(100, self.process_updates)
    def import_keywords(self):
        files = filedialog.askopenfilenames(filetypes=[("文本文件", "*.txt")])
        if files:
            for f in files:
                with open(f, "r", encoding="utf-8") as fd:
                    for line in fd:
                        keyword = line.strip()
                        if keyword and keyword not in self.keyword_list.get(0, tk.END):
                            self.keyword_list.insert(tk.END, keyword)
            self.status_vars["keywords"].set(f"{self.keyword_list.size()} 个")
    def add_keyword(self):
        keyword = simpledialog.askstring("添加关键词", "请输入关键词:")
        if keyword and keyword not in self.keyword_list.get(0, tk.END):
            self.keyword_list.insert(tk.END, keyword)
            self.status_vars["keywords"].set(f"{self.keyword_list.size()} 个")
    def import_files(self):
        files = filedialog.askopenfilenames(filetypes=[("文本文件", "*.txt")])
        for f in files:
            self.tree.insert("", tk.END, values=(os.path.basename(f), f, "等待中"))
        self.status_vars["files"].set(f"{len(self.tree.get_children())} 个")
    def start_search(self):
        if self.running:
            return
        selected = [self.keyword_list.get(i) for i in self.keyword_list.curselection()]
        if not selected:
            messagebox.showwarning("警告", "请先选择关键词!")
            return
        self.results.clear()
        self.result_text.delete(1.0, tk.END)
        self.stop_requested = False
        self.total_lines = 0
        self.start_time = time.time()
        self.running = True
        self.status_vars["results"].set("0 条")
        threading.Thread(
            target=self.search_files,
            args=(selected,),
            daemon=True
        ).start()
    def search_files(self, keywords):
        last_update_time = time.time()
        total_results = 0
        for idx, item in enumerate(self.tree.get_children()):
            if self.stop_requested:
                break
            values = self.tree.item(item, "values")
            path = values[1]
            self.update_queue.put(("status", (item, os.path.basename(path), path, "处理中")))
            try:
                with open(path, "r", encoding="utf-8", errors='ignore') as f:
                    for line_num, line in enumerate(f, 1):
                        if self.stop_requested:
                            break
                        self.total_lines += 1
                        if self.total_lines % 100 == 0:
                            self.update_queue.put(("progress", self.total_lines))
                        if time.time() - last_update_time >= 1:
                            self.update_queue.put(("time", None))
                            last_update_time = time.time()
                        for kw in keywords:
                            if kw in line:
                                result = f"[{os.path.basename(path)}] 第{line_num}行: {line.strip()}" \
                                    if self.source_check.get() else line.strip()
                                self.results.append(result)
                                total_results += 1
                                self.update_queue.put(("results", total_results))
                                self.update_queue.put(("result", result))
                status = "完成" if not self.stop_requested else "已停止"
                self.update_queue.put(("status", (item, os.path.basename(path), path, status)))
            except Exception as e:
                self.update_queue.put(("status", (item, os.path.basename(path), path, "错误")))
        self.update_queue.put(("progress", self.total_lines))
        self.update_queue.put(("time", None))
        self.running = False
    def stop_search(self):
        self.stop_requested = True
        messagebox.showinfo("提示", "正在停止搜索...")
    def export_results(self):
        if not self.results:
            messagebox.showwarning("警告", "没有可导出的结果!")
            return
        file_path = filedialog.asksaveasfilename(
            defaultextension=".txt",
            filetypes=[("文本文件", "*.txt")]
        )
        if file_path:
            try:
                data = list(set(self.results)) if self.dup_check.get() else self.results
                with open(file_path, "w", encoding="utf-8") as f:
                    f.write("\n".join(data))
                messagebox.showinfo("成功", f"成功导出 {len(data)} 条结果!")
            except Exception as e:
                messagebox.showerror("错误", f"导出失败:{str(e)}")
if __name__ == "__main__":
    root = tk.Tk()
    app = KeywordSearchApp(root)
    root.mainloop()
打包成程序了,
https://wwen.lanzout.com/iOrJG2uowdeb
兄弟们,我想上传头像,能不能点点赞啥的

关键词, 大佬

鼠八爷
OP
  


天真Aro 发表于 2025-4-27 20:45
对的,大概12e行的数据

一个文本吗?还是多个。我测试的是将近四亿行数据,不过是七个文本,一个文本最少5000w行的数据。
ismesge   

兄弟 ,你这号注册有十年了,才一个星星?
鼠八爷
OP
  


ismesge 发表于 2025-4-27 13:44
兄弟 ,你这号注册有十年了,才一个星星?

还有个更早的,12年的忘记账号密码了,。
27149   

好奇问下,“大佬帖子”在哪?
Redbell   

可以搞个开源项目了,然后弯道超车
EEsama   

感谢分享 看一下
whtaoo   

给你加分。。。。
xiaoyang4788   


鼠八爷 发表于 2025-4-27 13:46
还有个更早的,12年的忘记账号密码了,。

和你一样,更早的号忘记密码了。。。
tb612443   

感谢分享,学习一下
您需要登录后才可以回帖 登录 | 立即注册

返回顶部