app.py

import time
import os
from waitress import serve
from flask import Flask, render_template
from jinja2 import Environment, FileSystemLoader
from markupsafe import Markup
from pyecharts.globals import CurrentConfig
import pandas as pd
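# pyecharts renders its charts through a global Jinja2 environment; per the
# pyecharts Flask-integration docs, point it at the templates directory before
# the chart classes below are imported so render() can find its templates.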
CurrentConfig.GLOBAL_ENV = Environment(loader=FileSystemLoader("./templates"))
from pyecharts import options as opts
from pyecharts.charts import Bar, Line, Timeline, WordCloud, EffectScatter
import DAO
app = Flask(__name__)
# Global DataFrame shared by every chart builder; populated by pre() before the server starts
data = None


def pre():
    global data
    sql = "select * from test"
    data = DAO.select_data(sql)



def webFlow_base() -> Line:
    # Daily total page views
    result = data.groupby('日期').agg({'浏览': 'sum'}).to_dict()['浏览']

    c = (
        Line()
        .add_xaxis(list(result.keys()))
        .add_yaxis('浏览量', list(result.values()))
        .set_global_opts(
            title_opts=opts.TitleOpts(title="网站每日流量变化图"),
            datazoom_opts=opts.DataZoomOpts(range_start=10, range_end=30),
        )
    )
    return c


def firstTenWriter_base() -> Timeline:
    t2 = Timeline()
    t2.add_schema(
        is_auto_play=True,
        is_loop_play=True,
        play_interval=4000
    )

    dates = data['日期'].unique()
    for date in dates:
        top10 = (
            data[data.日期 == date]
            .groupby('作者')
            .agg({'评分': 'mean'})
            .nlargest(10, '评分').round(2)  # top ten by mean score
            .reset_index()
        )

        # Build the bar series
        bar = (
            Bar()
            .add_xaxis(top10['作者'].tolist())
            .add_yaxis(
                "评分 (柱状图)",
                top10['评分'].tolist(),
                label_opts=opts.LabelOpts(
                    is_show=True,
                    position="inside",
                )
            )
        )

        # Build the line series
        line = (
            Line()
            .add_xaxis(top10['作者'].tolist())
            .add_yaxis(
                "评分趋势 (折线图)",
                top10['评分'].tolist(),
                is_smooth=True,  # smooth the curve
                label_opts=opts.LabelOpts(
                    is_show=True,  # show labels
                    formatter="{b}"  # show only the name (the author)
                )
                )
            )
        )

        # Overlay the bar series on the line chart
        line.overlap(bar)

        # Raise the line series' zindex so it renders above the bars
        line.set_series_opts(zindex=10)

        # Global options
        line.set_global_opts(
            title_opts=opts.TitleOpts(title=f"日期 {date} 前十作家评分"),
            xaxis_opts=opts.AxisOpts(name="作者", axislabel_opts=opts.LabelOpts(rotate=30)),  # rotate x-axis labels
            yaxis_opts=opts.AxisOpts(name="评分"),
            legend_opts=opts.LegendOpts(pos_top="5%")  # nudge the legend down
        )

        # Add this frame to the timeline, keyed by date
        t2.add(line, date)

    return t2

def writer_base() -> Timeline:
    # Animated view of each writer's mean daily score
    t2 = Timeline()
    t2.add_schema(
        is_auto_play=True,
        is_loop_play=True,
        play_interval=2000
    )

    writers = data.作者.unique()
    for w in writers:
        item = data[data.作者 == w].groupby('日期').agg({'评分': 'mean'}).to_dict()['评分']

        bar = (
            Bar()
            .add_xaxis(list(item.keys()))
            .add_yaxis('评分', [round(val, 2) for val in item.values()],
                       label_opts=opts.LabelOpts(
                           is_show=True,  # show labels
                           position="top",  # above each bar
                           formatter=w  # label each bar with the writer's name
                       )
                       )
            .set_global_opts(title_opts=opts.TitleOpts(title="作家 {} 评分变化图".format(w)))
        )

        t2.add(bar, w)
    return t2


def hotWord_base() -> WordCloud:
    # Hot-tag word cloud: for every work with at least 1000 bookmarks,
    # add its score to each of its tags.
    label_value = {}

    for i in range(len(data)):
        if data['收藏'][i] >= 1000:
            temp = data['类型'][i].split(',')
            temp = [var.replace('[', '').replace(']', '').replace("'", '').strip() for var in temp]
            # Drop Pixiv's bookmark-milestone tags (e.g. "10000users入り")
            temp = [var for var in temp if not var.endswith("users入り")]
            for v in temp:
                if v not in label_value:
                    label_value[v] = 0
                label_value[v] += data['评分'][i]
    label_value = list(label_value.items())

    # Build the word cloud
    c = (
        WordCloud()
        .add(series_name="热点标签", data_pair=label_value, word_size_range=[20, 80])  # word size range
        .set_global_opts(
            title_opts=opts.TitleOpts(
                title="热点标签", title_textstyle_opts=opts.TextStyleOpts(font_size=23)
            ),
            tooltip_opts=opts.TooltipOpts(is_show=True),
        )
        .set_series_opts(
            shape="star",  # star-shaped cloud
            word_gap=5,  # gap between words
            rotation_range=[-90, 90]  # allow rotated words
        )
    )

    # Optionally stretch the chart to fill the page
    c.width = "100%"
    c.height = "1050px"
    return c

def effectScatter() -> EffectScatter:
    x_data = data.日期.unique()
    y_data = data.groupby('日期').agg({'评分': 'mean'}).to_dict()['评分']

    c = (
        EffectScatter()
        .add_xaxis(list(x_data))
        .add_yaxis('评分', [int(i) for i in list(y_data.values())])
        .set_global_opts(
            title_opts=opts.TitleOpts(title="网站每日评分变化图"),
            datazoom_opts=opts.DataZoomOpts(range_start=10, range_end=30),
        )
    )
    return c
@app.route("/webFlow")
def index():
    c = webFlow_base()
    c.render("static/webFlow.html")
    time.sleep(0.5)
    return render_template("webFlow.html")

@app.route("/firstTenWriter")
def index2():
    c = firstTenWriter_base()
    c.render("static/firstTenWriter.html")
    time.sleep(0.5)
    return render_template("firstTenWriter.html")
@app.route("/writer")
def index3():
    c = writer_base()
    c.render("static/writer.html")
    time.sleep(0.5)
    return render_template("writer.html")

@app.route("/hotWord")
def index4():

    c = hotWord_base()
    c.render("static/hotWord.html")
    time.sleep(0.5)
    return render_template("hotword.html")

@app.route("/effectScatter")
def index5():
    print(1)
    c = effectScatter()
    c.render("static/effectScatter.html")
    time.sleep(0.5)
    print(2)
    return render_template("effectScatter.html")

@app.route("/")
def index6():
    image_dir = r'./static/assets/img'
    image_paths = []

    # 遍历目录获取图片路径
    for filename in os.listdir(image_dir):
        if filename.endswith(('.jpg', '.jpeg', '.png', '.gif')):
            # 将图片路径添加到列表中
            static_image_path=os.path.join('static','assets', 'img', filename)
            if not os.path.exists(static_image_path):
                os.rename(os.path.join(image_dir, filename), static_image_path)
                print(f"移动文件: {filename} 到 {static_image_path}")
            image_paths.append(static_image_path)
    return render_template("index.html", image_paths=image_paths)


if __name__ == "__main__":
    pre()
    print(type(data))
    serve(app, host='0.0.0.0', port=8000)
    #app.run(host='0.0.0.0',port=5000)
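
Both app.py and reptile.py import a DAO helper module that is not reproduced in this post. For reference, here is a minimal sketch of the interface they rely on: select_data returning a pandas DataFrame and insert appending one. The sketch assumes a MySQL database behind SQLAlchemy; the connection string and table name are placeholders, adjust them to your setup.

import pandas as pd
from sqlalchemy import create_engine

# Hypothetical connection string; adjust to your own database
engine = create_engine("mysql+pymysql://user:password@localhost:3306/pixiv?charset=utf8mb4")

def select_data(sql: str) -> pd.DataFrame:
    # Run a query and return the result set as a DataFrame
    return pd.read_sql(sql, engine)

def insert(df: pd.DataFrame, table: str = "test") -> None:
    # Append the scraped rows to the table
    df.to_sql(table, engine, if_exists="append", index=False)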

reptile.py

import requests
import pandas as pd
from lxml import etree
import time
import datetime
from selenium import webdriver
from selenium.webdriver.support.ui import WebDriverWait
from selenium.webdriver.edge.options import Options
from selenium.webdriver.edge.service import Service
from selenium.webdriver.common.by import By
from time import sleep
import threading
import DAO
from pyecharts import options as opts
from pyecharts.charts import Map, Timeline, Bar, Line, Pie, EffectScatter
from pyecharts.charts import WordCloud
import configparser
from concurrent.futures import ThreadPoolExecutor
from selenium.webdriver.support import expected_conditions as EC
from tqdm import tqdm
import re
import os
import download
import json
# Read the config file
config = configparser.RawConfigParser()
config.read(r"D:\py大作业\pixiv-\flask2\config\config.ini")

Cookie = config.get("reptile", "Cookie")
UA = config.get("reptile", "UA")
proxy = config.get("reptile", "proxy")
driverUrl = config.get("reptile", "driverUrl")
savePath = config.get("reptile", "savePath")
download_num = int(config.get("reptile", "download_num"))

up_url = "https://www.pixiv.net"
headers = {
    "Cookie": Cookie,
    "User-Agent": UA
}

mainurl = "https://www.pixiv.net/ranking.php"
# Scraped rows, one list per artwork
data_list = []
browser = None
download_list = []

# NB: the class names scraped below (e.g. fATptn, which wraps the author block)
# change whenever Pixiv redeploys its front end; verify them against the live page.
def ret_match(master_image_url):
    match = re.search(r"/img-master/img/(.+?)_p\d+", master_image_url)  # corrected regex
    if match:
        extracted_part = match.group(1)
        return extracted_part
    else:
        return None
def extract_and_build_pixiv_url(match):
    """Build candidate original-image URLs from the extracted master-URL part.

    Brute-force trick that works without logging in: the original may be either
    a .jpg or a .png, so return both candidates.
    """
    if match:
        extracted_part = match
        possible_url = []
        original_image_url1 = f"https://i.pximg.net/img-original/img/{extracted_part}_p0.jpg"  # append _p0
        original_image_url2 = f"https://i.pximg.net/img-original/img/{extracted_part}_p0.png"
        possible_url.extend([original_image_url1, original_image_url2])
        return possible_url
    else:
        return None
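
# Illustrative round trip of the two helpers above (the URL is made up):
#   part = ret_match("https://i.pximg.net/img-master/img/2024/01/01/00/00/00/123456_p0_master1200.jpg")
#   # part == "2024/01/01/00/00/00/123456"
#   extract_and_build_pixiv_url(part)
#   # -> ["https://i.pximg.net/img-original/img/2024/01/01/00/00/00/123456_p0.jpg",
#   #     "https://i.pximg.net/img-original/img/2024/01/01/00/00/00/123456_p0.png"]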

def getDataFromWeb(url, nowTime):
    """Open one artwork page in the headless browser and scrape its details."""
    global browser, download_num
    browser.get(url)
    tags = [a.text for a in browser.find_elements(By.CLASS_NAME, 'gtm-new-work-tag-event-click')]

    # likes / bookmarks / views sit in the <dd> elements of the dpDffd list
    temp = [b.text for b in [a.find_element(By.TAG_NAME, 'dd') for a in
                             browser.find_element(By.CLASS_NAME, 'dpDffd').find_elements(By.TAG_NAME, 'li')]]

    writer = browser.find_element(By.CLASS_NAME, 'fATptn').find_element(By.TAG_NAME, 'div').text
    imgurl = ''
    if download_num > 0:
        try:
            image_element = browser.find_element(By.CSS_SELECTOR, "img.sc-1qpw8k9-1")
            parent_a_element = image_element.find_element(By.XPATH, "..")
            image_url = parent_a_element.get_attribute("href")
            # Alternative without a logged-in session:
            # original_image_url = extract_and_build_pixiv_url(ret_match(image_url))
            imgurl = image_url
            print(f"Image URL: {imgurl}")
            if download_num > 1:
                download_list.append(image_url)
                download_num -= 1
        except Exception as e:
            print(f"Error while fetching the image URL: {e}")

    return [nowTime, writer, temp[0], temp[1], temp[2], tags, imgurl]

# Worker task: scrape each URL in the list
def thread_getData(url_list, nowTime):
    for url in tqdm(url_list, desc="获取数据进度", ncols=100, unit="项"):
        print('--'+url)
        data_list.append(getDataFromWeb(url,nowTime))

# Fetch one ranking page: returns the artwork links, the ranking date, and the parsed HTML
def get_NewPage(url):
    if url == "":
        url = "https://www.pixiv.net/ranking.php?mode=daily&content=illust"
    print(url)
    # Hardcoded local proxy (the config's proxy value is only used for Selenium)
    proxies = {'http': 'http://127.0.0.1:7890',
               'https': 'http://127.0.0.1:7890',
               'socks': 'socks5://127.0.0.1:7890'}
    context = requests.get(url=url, headers=headers, proxies=proxies).text
    html = etree.HTML(context)
    print('finished get')
    nowtime = html.xpath('//*[@id="wrapper"]/div[1]/div/div[2]/div/nav[2]/ul/li[2]/a/text()')[0]  # date of this ranking page
    print(nowtime.encode("utf-8").decode("utf-8"))
    o = html.xpath('/html/body/div[3]/div[1]/div/div[3]/div[1]/section/div[2]/a[1]/@href')
    o = [up_url + x for x in o if "user" not in x]  # links to the individual artwork pages
    return o, nowtime, html

# Capture cookies from a manual login and save them for Selenium to reuse
def getCookie(driver):
    driver.get("https://www.pixiv.net/")
    time.sleep(60)  # log in by hand during this window
    print(driver.get_cookies())
    with open('cookies.txt', 'w') as f:
        f.write(json.dumps(driver.get_cookies()))


# Restore a saved session from cookies.txt
def login(driver):
    driver.get("https://www.pixiv.net/")
    with open('cookies.txt', 'r') as f:
        cookies_list = json.load(f)
        for cookie in cookies_list:
            # Selenium expects an integer expiry; convert if necessary
            if isinstance(cookie.get('expiry'), float):
                cookie['expiry'] = int(cookie['expiry'])
            driver.add_cookie(cookie)
    driver.refresh()
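
# Bootstrap note: on the very first run, call getCookie(browser) (commented out
# in __main__ below) instead of login(browser), sign in by hand during the
# 60-second window, and the session lands in cookies.txt; every later run can
# use login() alone.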



if __name__ == "__main__":


    # cookie_list = []
    # for cookie_str in Cookie.split(';'):
    #     cookie = {}
    #     name, value = cookie_str.strip().split('=', 1)
    #     cookie['name'] = name
    #     cookie['value'] = value
    #     cookie_list.append(cookie)
    # print(cookie_list)
    # Visit each artwork page and collect its tags, stats, and author
    browser_options = Options()
    browser_options.add_argument('--headless')
    browser_options.add_argument('--disable-gpu')
    browser_options.add_argument("--proxy-server=" + proxy)

    # browser_options.add_argument("--proxy-server=http://10.112.78.231:7890")
    service = Service(driverUrl)
    browser = webdriver.Edge(service=service, options=browser_options)

    # browser.get('https://www.pixiv.net')  # 先访问Pixiv首页以触发Cookie加载
    # load_cookies_to_browser(browser, cookie_list)
    #getCookie(browser)
    login(browser)
    # browser.refresh()
    #pass
    start = time.time()
    threads = []
    next_url = ""
    ok = False
    # One ranking page per run; raise range(1) to walk back through more days
    for j in range(1):
        o, nowtime, html = get_NewPage(next_url)
        l = 0
        r = len(o) // 5  # slice bounds for the threaded variant commented out below
        thread_getData(o, nowtime)
        #for i in range(10):
        #    thread1 = threading.Thread(target=thread_getData, args=(o[l:r], nowtime))
        #    l += len(o) // 5
        #    r += len(o) // 5
        #    threads.append(thread1)
        #for i in threads:
        #    i.start()
        #for t in threads:
        #    t.join()

        print("ok!", end=" ")
        #print(nowtime)
        print(len(data_list))

        print("\n")
        # for t in threads:
        #     del (t)
        # threads.clear()
        # XPath basics: https://halo.suzakudry.top/archives/xpathde-ji-ben-yu-fa
        if not ok:
            next_url = [mainurl + i for i in
                        html.xpath('//*[@id="wrapper"]/div[1]/div/div[2]/div/nav[2]/ul/li[3]/a/@href')][0]
            ok = True
        else:
            next_url = [mainurl + i for i in
                        html.xpath('//*[@id="wrapper"]/div[1]/div/div[2]/div/nav[2]/ul/li[4]/a/@href')][0]
    end = time.time()
    print(end - start)
    browser.close()
    browser.quit()
    data = pd.DataFrame(data_list)
    #print(data)

    # Finally, download the queued images
    print(download_list)
    download.download_images(download_list)

    data.columns = ['日期', '作者', '点赞', '收藏', '浏览', '类型', '图片url']
    # Strip thousands separators and cast the numeric columns
    for col in ['点赞', '收藏', '浏览']:
        data[col] = data[col].str.replace(',', '').astype('int')
    for col in ['日期', '作者', '类型', '图片url']:
        data[col] = data[col].astype('string')
    # Composite score: 30% likes + 50% bookmarks + 20% views,
    # e.g. 100 likes, 200 bookmarks, 1000 views -> 0.3*100 + 0.5*200 + 0.2*1000 = 330.0
    data['评分'] = round(data.点赞 * 0.3 + data.收藏 * 0.5 + data.浏览 * 0.2, 2)
    DAO.insert(data)
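
Both reptile.py and download.py read config/config.ini. The file itself is not part of this post; here is a minimal sketch of the keys the scripts expect, with every value a placeholder to be replaced by your own:

[reptile]
Cookie = PHPSESSID=...; other pixiv cookies
UA = Mozilla/5.0 (Windows NT 10.0; Win64; x64) ...
proxy = http://127.0.0.1:7890
driverUrl = D:\path\to\msedgedriver.exe
savePath = ./static/assets/img
download_num = 10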

download.py

import os
import requests
import re
from time import sleep
from tqdm import tqdm
from configparser import RawConfigParser
from typing import Dict

import dataclasses

# Read the config file
config = RawConfigParser()
config.read(r"D:\py大作业\pixiv-\flask2\config\config.ini", encoding="utf-8")

# Pull the settings
savePath = config.get("reptile", "savePath")
Cookie = config.get("reptile", "Cookie")
UA = config.get("reptile", "UA")
# proxy = config.get("reptile", "proxy")

# Proxy configuration (make sure the proxy server is running)
@dataclasses.dataclass
class ProxyConfig:
    proxy: Dict = dataclasses.field(default_factory=lambda: {"https": "http://127.0.0.1:7890"})

# Initialise the proxy configuration
proxy_config = ProxyConfig()

# Request headers: taken from the config rather than hardcoded, so the session
# cookie does not live in the source
headers = {
    "Cookie": Cookie,
    "User-Agent": UA,
}

def get_format(url):
    match = re.search(r'\.(\w+)$', url)
    if match:
        image_format = match.group(1)
        print(image_format)  # e.g. "png"
        return image_format
    else:
        print("Could not determine the image format; defaulting to jpg")
        return "jpg"
def download_image(url):
    """Download one image and save it under savePath."""
    print(savePath)
    save_dir = savePath  # image save directory
    if not os.path.exists(save_dir):
        os.makedirs(save_dir)  # create the folder if it does not exist

    try:
        result = re.search(r"/(\d+)_", url)
        image_id = result.group(1)
        image_name = url[url.rfind("/") + 1:]  # file name
        # Pixiv's image CDN rejects requests whose Referer does not point back
        # to the artwork page, so set it explicitly
        header = {"Referer": f"https://www.pixiv.net/artworks/{image_id}"}
        headers.update(header)
        print(f"https://www.pixiv.net/artworks/{image_id}")
        # formatd = get_format(url)
        print(f"downloading {image_name}")
        save_path = os.path.join(save_dir, image_name)
        # Fetch the image through the proxy
        response = requests.get(url, headers=headers, proxies=proxy_config.proxy)
        sleep(0.5)
        if response.status_code == 200:
            # wb: create/overwrite a binary file for writing
            with open(save_path, 'wb') as f:
                for chunk in response.iter_content(1024):  # write in 1 KiB chunks
                    f.write(chunk)
            print(f"Image saved: {save_path}")
        else:
            print(f"Failed to download image: {response.status_code}")
    except requests.exceptions.RequestException as e:
        print(f"Error downloading image: {e}")

def download_images(image_urls):
    """Download every URL in the list, with a tqdm progress bar."""
    for image_url in tqdm(image_urls, desc="Downloading images", ncols=100, unit="images"):
        download_image(image_url)
        sleep(1)  # throttle between downloads


if __name__ == "__main__":
    # 示例:下载图片
    image_urls = [
        'https://i.pximg.net/img-original/img/2024/12/08/08/00/06/124996182_p0.jpg',  # 填入图片的URL
        'https://i.pximg.net/img-original/img/2024/12/10/00/00/21/125049816_p0.png'  # 填入图片的URL
    ]
    download_images(image_urls)  # 批量下载图片
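
Putting the pieces together, the intended run order (assuming the database table and config.ini are in place, and that savePath points at app.py's static/assets/img directory) is: run reptile.py to scrape the day's ranking, store the rows through DAO.insert, and download the queued images; then start app.py, which loads the table once via pre() and serves the charts on port 8000.

python reptile.py   # scrape + store + download
python app.py       # dashboard at http://localhost:8000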

