微博热搜公众号推送

查看 26|回复 0
作者:没有结果lin   
[Python] 纯文本查看 复制代码
import csv
import requests
import logging
import json
from datetime import datetime
import time
# Configure the root logger once at import time so that INFO-level (and
# higher) messages emitted via the logging module are shown.
logging.basicConfig(level=logging.INFO)
def fetch_hot_searches():
    """Fetch the Weibo hot-search list, save it as CSV and push it.

    The function requests the ``hot_band`` endpoint, builds one row per
    entry of ``data.band_list`` (rank, note, category, raw_hot, search URL,
    fetch timestamp), overwrites ``weibo_hot.csv`` with those rows and then
    forwards a flattened text version of the rows to ``send_msg``.

    Returns:
        None. Request, JSON and payload-shape errors are logged and
        swallowed so that a caller running this periodically keeps going.
    """
    # Local import keeps this block self-contained (no change to file imports).
    from urllib.parse import quote

    url = "https://weibo.com/ajax/statuses/hot_band"
    headers = {
        'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/111.0.0.0 Safari/537.36 Edg/111.0.1661.62'
    }
    try:
        # A timeout prevents a stalled connection from blocking forever.
        response = requests.get(url, headers=headers, timeout=10)
        response.raise_for_status()  # Raises HTTPError for bad responses
        data = response.json()["data"]["band_list"]

        # One timestamp per fetch: every row belongs to the same snapshot.
        onboard_time = datetime.now().strftime('%Y-%m-%d %H:%M:%S')
        hot = []
        for count, item in enumerate(data, start=1):
            note = item.get('note', 'Unknown')
            category = item.get('category', 'Unknown')
            raw_hot = item.get('raw_hot', 'Unknown')
            # Percent-encode the keyword so characters such as '#', '&' or
            # spaces do not corrupt the query string.
            hot_url = f'https://s.weibo.com/weibo?q={quote(str(note), safe="")}&Refer=index'
            hot.append([count, note, category, raw_hot, hot_url, onboard_time])

        with open('weibo_hot.csv', 'w', newline='', encoding="utf-8") as file:
            writer = csv.writer(file)
            # The header must list all six columns that each row carries.
            writer.writerow(["id", "note", "category", "raw_hot", "url", "onboard_time"])
            writer.writerows(hot)

        send_msg("热搜实时榜单", str(hot).replace("[","").replace("]","").replace("'",""))

    except requests.RequestException as e:
        logging.error(f"Request failed: {e}")
    except KeyError as e:
        logging.error(f"Key error: {e}")
    except TypeError as e:
        # Raised when the payload is not shaped as nested dicts/list
        # (e.g. "data" is null).
        logging.error(f"Unexpected response structure: {e}")
    except json.JSONDecodeError as e:
        logging.error(f"JSON decode error: {e}")
def send_msg(head, body):
    """POST a notification to the push service and return its status.

    Args:
        head: Title of the message, sent as the ``head`` form field.
        body: Content of the message, sent as the ``body`` form field.

    Returns:
        The value of the ``status`` field from the service's JSON reply, or
        ``False`` when the request fails, the reply is not valid JSON, or
        the reply has no ``status`` field.
    """
    logging.info('----------start send_msg')
    url = 'http://push.ijingniu.cn/send'
    data = {
        'key': 'XXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXX',  # Replace with your actual key
        'head': head,
        'body': body
    }
    try:
        # A timeout prevents a stalled connection from blocking the caller.
        response = requests.post(url=url, data=data, timeout=10)
        # Non-2xx responses raise here, so no separate ``response.ok``
        # check is needed afterwards.
        response.raise_for_status()
        logging.info(response.text)
        return json.loads(response.text)['status']
    except requests.RequestException as e:
        logging.error(f"Request failed: {e}")
        return False
    except json.JSONDecodeError as e:
        logging.error(f"JSON decode error: {e}")
        return False
    except (KeyError, TypeError) as e:
        # Valid JSON but not an object carrying a "status" field.
        logging.error(f"Unexpected response format: {e}")
        return False
if __name__ == "__main__":
    # Guarded so that importing this module does not start the endless loop.
    while True:
        try:
            fetch_hot_searches()
        except Exception:
            # Top-level boundary: log the failure and keep the scheduler
            # alive so one bad cycle does not stop future pushes.
            logging.exception("Unexpected error while fetching hot searches")
        time.sleep(3600)  # push once per hour
使用前请将上面代码中 send_msg 函数里的 'key' 替换为自己的密钥。


image.png (24.75 KB, 下载次数: 0)
下载附件
2024-9-4 12:14 上传



image.png (3.77 KB, 下载次数: 0)
下载附件
2024-9-4 12:11 上传



ebbb833c111cc09ad6de04f91093618.jpg (473.21 KB, 下载次数: 0)
下载附件
2024-9-4 12:13 上传

下载次数, 下载附件

您需要登录后才可以回帖 登录 | 立即注册

返回顶部