Python小红书数据采集终极指南:xhs工具完整使用教程
# Python小红书数据采集终极指南：xhs工具完整使用教程

【免费下载链接】xhs 基于小红书 Web 端进行的请求封装。https://reajason.github.io/xhs/ 项目地址: https://gitcode.com/gh_mirrors/xh/xhs

小红书作为中国领先的生活方式分享平台，蕴含着丰富的用户行为数据和内容趋势。xhs是一个专为开发者设计的Python爬虫工具，通过封装小红书Web端API接口，帮助用户快速获取公开内容数据。本文将深入解析xhs工具的核心功能、实战应用和最佳实践，为开发者提供完整的小红书数据采集解决方案。

## 核心特性解析：为什么选择xhs工具

xhs工具的核心优势在于其完整的API封装和智能签名机制。通过分析xhs/core.py源码，我们可以看到工具提供了全面的接口覆盖：

### 1. 完整的API接口支持

xhs工具支持小红书主要的数据接口，包括：

- 笔记搜索与详情获取
- 用户信息与发布内容查询
- 多种内容类型的分类获取
- 智能签名验证机制

### 2. 智能签名服务架构

为了解决小红书的反爬机制，xhs工具设计了灵活的签名方案。通过example/basic_sign_server.py示例，开发者可以部署独立的签名服务，确保请求的稳定性和安全性。

### 3. 错误处理与重试机制

xhs/exception.py模块定义了完整的异常处理体系，包括DataFetchError、IPBlockError等，帮助开发者构建健壮的数据采集应用。

## 环境搭建与快速开始

### 安装xhs工具

```bash
pip install xhs
```

从源码安装最新版本：

```bash
git clone https://gitcode.com/gh_mirrors/xh/xhs
cd xhs
python setup.py install
```

### 获取必要凭证

使用xhs工具需要小红书的cookie信息，关键字段包括：

- a1：用户身份标识
- web_session：会话信息
- webId：设备标识

### 基础使用示例

参考example/basic_usage.py，快速开始你的第一个数据采集脚本：

```python
from xhs import XhsClient

# 初始化客户端
client = XhsClient(cookie="your_cookie_here")

# 搜索热门笔记
results = client.search_note(
    keyword="美食探店",
    page=1,
    page_size=20
)

print(f"找到 {len(results['items'])} 条相关笔记")
```

## 实战应用场景与代码示例

### 场景一：市场趋势分析

```python
import json
from xhs import XhsClient, FeedType

def analyze_market_trends():
    client = XhsClient(cookie="your_cookie")

    # 获取不同类别的推荐内容
    categories = [
        FeedType.FOOD,       # 美食
        FeedType.FASION,     # 穿搭
        FeedType.COSMETICS,  # 彩妆
        FeedType.TRAVEL      # 旅行
    ]

    trends_data = {}
    for category in categories:
        try:
            feed = client.get_home_feed(category=category.value)
            trends_data[category.name] = {
                "count": len(feed.get("items", [])),
                "top_keywords": extract_keywords(feed)
            }
        except Exception as e:
            print(f"获取{category.name}数据失败: {e}")

    return trends_data

def extract_keywords(feed_data):
    """从feed数据中提取关键词"""
    # 实现关键词提取逻辑
    pass
```

### 场景二：竞品内容监控

```python
import schedule
import time
from datetime import datetime
from xhs import XhsClient, help

class CompetitorMonitor:
    def __init__(self, cookie):
        self.client = XhsClient(cookie)
        self.competitors = {
            "brand_a": "user_id_1",
            "brand_b": "user_id_2",
            "brand_c": "user_id_3"
        }

    def monitor_daily_posts(self):
        """监控竞争对手的每日发布内容"""
        daily_report = {}

        for brand, user_id in self.competitors.items():
            try:
                user_notes = self.client.get_user_notes(
                    user_id=user_id,
                    page=1,
                    page_size=10
                )

                daily_report[brand] = {
                    "post_count": len(user_notes.get("notes", [])),
                    "total_likes": sum(note.get("likes", 0) for note in user_notes.get("notes", [])),
                    "total_collects": sum(note.get("collects", 0) for note in user_notes.get("notes", [])),
                    "latest_post": user_notes.get("notes", [])[0] if user_notes.get("notes") else None
                }
            except Exception as e:
                print(f"监控{brand}失败: {e}")

        return daily_report

    def start_monitoring(self, interval_hours=6):
        """启动定时监控"""
        schedule.every(interval_hours).hours.do(self.monitor_daily_posts)

        while True:
            schedule.run_pending()
            time.sleep(60)
```

### 场景三：内容质量评估

```python
from xhs import XhsClient
from typing import Dict, List

class ContentQualityAnalyzer:
    def __init__(self, cookie):
        self.client = XhsClient(cookie)

    def analyze_note_quality(self, note_id: str) -> Dict:
        """分析笔记质量指标"""
        try:
            note_detail = self.client.get_note_by_id(
                note_id=note_id,
                xsec_token="your_token"
            )

            # 计算互动率
            likes = note_detail.get("likes", 0)
            collects = note_detail.get("collects", 0)
            comments = note_detail.get("comments", 0)
            shares = note_detail.get("shares", 0)

            # 提取内容特征
            content = note_detail.get("desc", "")
            images = help.get_imgs_url_from_note(note_detail)

            quality_score = self.calculate_quality_score(
                likes, collects, comments, shares,
                len(content), len(images)
            )

            return {
                "note_id": note_id,
                "quality_score": quality_score,
                "engagement_rate": (likes + collects + comments) / 1000,
                "content_length": len(content),
                "image_count": len(images),
                "has_video": "video" in note_detail,
                "publish_time": note_detail.get("time", "")
            }
        except Exception as e:
            print(f"分析笔记{note_id}失败: {e}")
            return None

    def calculate_quality_score(self, *args) -> float:
        """计算内容质量得分"""
        # 实现质量评分算法
        pass
```

## 高级功能配置与优化

### 签名服务部署方案

对于生产环境，建议部署独立的签名服务。参考xhs-api/app.py实现：

```
# Docker部署签名服务
docker run -it -d -p 5005:5005 reajason/xhs-api:latest

# 客户端使用签名服务
from xhs import XhsClient
import requests

def remote_sign(uri, data=None, a1="", web_session=""):
    """远程签名函数"""
    response = requests.post(
        "http://localhost:5005/sign",
        json={
            "uri": uri,
            "data": data,
            "a1": a1,
            "web_session": web_session
        }
    )
    return response.json()

# 初始化带远程签名的客户端
client = XhsClient(
    cookie="your_cookie",
    sign=remote_sign
)
```

### 性能优化策略

#### 1. 连接池管理

```python
import requests
from requests.adapters import HTTPAdapter
from xhs import XhsClient

class OptimizedXhsClient:
    def __init__(self, cookie, max_retries=3, pool_connections=10, pool_maxsize=10):
        self.session = requests.Session()

        # 配置连接池
        adapter = HTTPAdapter(
            pool_connections=pool_connections,
            pool_maxsize=pool_maxsize,
            max_retries=max_retries
        )
        self.session.mount("https://", adapter)
        self.session.mount("http://", adapter)

        self.client = XhsClient(cookie, session=self.session)

    def get_with_retry(self, func, *args, **kwargs):
        """带重试机制的请求"""
        for attempt in range(3):
            try:
                return func(*args, **kwargs)
            except Exception as e:
                if attempt == 2:
                    raise
                time.sleep(2 ** attempt)  # 指数退避
```

#### 2. 缓存机制实现

```python
import json
import os
from datetime import datetime, timedelta
from functools import wraps

def cached_result(cache_dir="cache", ttl_hours=24):
    """结果缓存装饰器"""
    def decorator(func):
        @wraps(func)
        def wrapper(*args, **kwargs):
            # 生成缓存键
            cache_key = f"{func.__name__}_{hash(str(args) + str(kwargs))}"
            cache_file = os.path.join(cache_dir, f"{cache_key}.json")

            # 检查缓存是否有效
            if os.path.exists(cache_file):
                with open(cache_file, "r", encoding="utf-8") as f:
                    cache_data = json.load(f)
                    cache_time = datetime.fromisoformat(cache_data["timestamp"])
                    if datetime.now() - cache_time < timedelta(hours=ttl_hours):
                        return cache_data["data"]

            # 执行函数并缓存结果
            result = func(*args, **kwargs)

            os.makedirs(cache_dir, exist_ok=True)
            cache_data = {
                "timestamp": datetime.now().isoformat(),
                "data": result
            }

            with open(cache_file, "w", encoding="utf-8") as f:
                json.dump(cache_data, f, ensure_ascii=False, indent=2)

            return result
        return wrapper
    return decorator

# 使用缓存
@cached_result(cache_dir="note_cache", ttl_hours=12)
def get_note_with_cache(client, note_id, xsec_token):
    """带缓存的笔记获取"""
    return client.get_note_by_id(note_id, xsec_token)
```

#### 3. 并发处理优化

```python
import concurrent.futures
from typing import List, Dict

class BatchProcessor:
    def __init__(self, client, max_workers=5):
        self.client = client
        self.max_workers = max_workers

    def batch_get_notes(self, note_ids: List[str], xsec_tokens: List[str]) -> List[Dict]:
        """批量获取笔记信息"""
        results = []

        with concurrent.futures.ThreadPoolExecutor(max_workers=self.max_workers) as executor:
            # 创建任务映射
            future_to_note = {
                executor.submit(
                    self.client.get_note_by_id,
                    note_id,
                    token
                ): (note_id, token)
                for note_id, token in zip(note_ids, xsec_tokens)
            }

            # 收集结果
            for future in concurrent.futures.as_completed(future_to_note):
                note_id, token = future_to_note[future]
                try:
                    result = future.result()
                    results.append(result)
                except Exception as e:
                    print(f"获取笔记{note_id}失败: {e}")
                    results.append({"note_id": note_id, "error": str(e)})

        return results
```

## 错误处理与监控

### 异常处理最佳实践

```python
from xhs.exception import DataFetchError, IPBlockError, NeedVerifyError, SignError
import time
import random

class RobustXhsClient:
    def __init__(self, cookie, sign_func=None):
        self.client = XhsClient(cookie, sign=sign_func)
        self.request_count = 0
        self.last_request_time = time.time()

    def safe_request(self, api_func, max_retries=3, delay_range=(2, 5)):
        """安全的API请求封装"""
        for attempt in range(max_retries):
            try:
                # 控制请求频率
                current_time = time.time()
                time_since_last = current_time - self.last_request_time
                if time_since_last < 1.0:  # 至少1秒间隔
                    time.sleep(1.0 - time_since_last)

                result = api_func()
                self.request_count += 1
                self.last_request_time = time.time()

                return result

            except DataFetchError as e:
                print(f"数据获取失败 (尝试 {attempt + 1}/{max_retries}): {e}")
                if attempt < max_retries - 1:
                    wait_time = random.uniform(*delay_range)
                    time.sleep(wait_time)

            except IPBlockError:
                print("检测到IP限制，建议更换IP或等待一段时间")
                break

            except SignError:
                print("签名失败，检查签名服务配置")
                break

            except NeedVerifyError:
                print("需要验证码验证")
                break

            except Exception as e:
                print(f"未知错误: {e}")
                if attempt < max_retries - 1:
                    time.sleep(random.uniform(*delay_range))

        return None

    def get_note_safe(self, note_id, xsec_token, max_retries=3):
        """安全的获取笔记信息"""
        return self.safe_request(
            lambda: self.client.get_note_by_id(note_id, xsec_token),
            max_retries=max_retries
        )
```

### 监控与日志记录

```python
import logging
from logging.handlers import RotatingFileHandler
from datetime import datetime

def setup_logging():
    """配置日志系统"""
    logger = logging.getLogger("xhs_client")
    logger.setLevel(logging.INFO)

    # 文件处理器
    file_handler = RotatingFileHandler(
        "xhs_client.log",
        maxBytes=10*1024*1024,  # 10MB
        backupCount=5
    )
    file_handler.setLevel(logging.INFO)

    # 控制台处理器
    console_handler = logging.StreamHandler()
    console_handler.setLevel(logging.WARNING)

    # 格式器
    formatter = logging.Formatter(
        "%(asctime)s - %(name)s - %(levelname)s - %(message)s"
    )
    file_handler.setFormatter(formatter)
    console_handler.setFormatter(formatter)

    logger.addHandler(file_handler)
    logger.addHandler(console_handler)

    return logger

class MonitoredXhsClient:
    def __init__(self, cookie, logger=None):
        self.client = XhsClient(cookie)
        self.logger = logger or setup_logging()
        self.metrics = {
            "total_requests": 0,
            "successful_requests": 0,
            "failed_requests": 0,
            "start_time": datetime.now()
        }

    def get_with_monitoring(self, api_func, *args, **kwargs):
        """带监控的API调用"""
        self.metrics["total_requests"] += 1

        try:
            result = api_func(*args, **kwargs)
            self.metrics["successful_requests"] += 1
            self.logger.info(f"API调用成功: {api_func.__name__}")
            return result
        except Exception as e:
            self.metrics["failed_requests"] += 1
            self.logger.error(f"API调用失败: {api_func.__name__}, 错误: {str(e)}")
            raise

    def get_metrics(self):
        """获取监控指标"""
        current_time = datetime.now()
        runtime = (current_time - self.metrics["start_time"]).total_seconds()

        return {
            **self.metrics,
            "runtime_seconds": runtime,
            "requests_per_second": self.metrics["total_requests"] / runtime if runtime > 0 else 0,
            "success_rate": self.metrics["successful_requests"] / self.metrics["total_requests"] if self.metrics["total_requests"] > 0 else 0
        }
```

## 数据存储与处理建议

### 数据库设计示例

```python
import sqlite3
from datetime import datetime
import json

class XhsDataStorage:
    def __init__(self, db_path="xhs_data.db"):
        self.conn = sqlite3.connect(db_path)
        self.create_tables()

    def create_tables(self):
        """创建数据表"""
        cursor = self.conn.cursor()

        # 笔记表
        cursor.execute("""
            CREATE TABLE IF NOT EXISTS notes (
                note_id TEXT PRIMARY KEY,
                user_id TEXT,
                title TEXT,
                content TEXT,
                likes INTEGER,
                collects INTEGER,
                comments INTEGER,
                shares INTEGER,
                image_count INTEGER,
                has_video BOOLEAN,
                publish_time TIMESTAMP,
                category TEXT,
                tags TEXT,  -- JSON数组
                raw_data TEXT,  -- 原始JSON数据
                created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
                updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
            )
        """)

        # 用户表
        cursor.execute("""
            CREATE TABLE IF NOT EXISTS users (
                user_id TEXT PRIMARY KEY,
                nickname TEXT,
                avatar_url TEXT,
                followers INTEGER,
                following INTEGER,
                notes_count INTEGER,
                likes_count INTEGER,
                raw_data TEXT,
                created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
                updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
            )
        """)

        # 搜索记录表
        cursor.execute("""
            CREATE TABLE IF NOT EXISTS search_records (
                id INTEGER PRIMARY KEY AUTOINCREMENT,
                keyword TEXT,
                sort_type TEXT,
                page INTEGER,
                page_size INTEGER,
                total_results INTEGER,
                search_time TIMESTAMP,
                created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
            )
        """)

        self.conn.commit()

    def save_note(self, note_data):
        """保存笔记数据"""
        cursor = self.conn.cursor()

        cursor.execute("""
            INSERT OR REPLACE INTO notes
            (note_id, user_id, title, content, likes, collects, comments,
             shares, image_count, has_video, publish_time, category, tags,
             raw_data, updated_at)
            VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
        """, (
            note_data.get("note_id"),
            note_data.get("user_id"),
            note_data.get("title", ""),
            note_data.get("desc", ""),
            note_data.get("likes", 0),
            note_data.get("collects", 0),
            note_data.get("comments", 0),
            note_data.get("shares", 0),
            len(note_data.get("images", [])),
            "video" in note_data,
            note_data.get("time"),
            note_data.get("category", ""),
            json.dumps(note_data.get("tags", []), ensure_ascii=False),
            json.dumps(note_data, ensure_ascii=False),
            datetime.now().isoformat()
        ))

        self.conn.commit()

    def get_notes_by_category(self, category, limit=100):
        """按分类获取笔记"""
        cursor = self.conn.cursor()
        cursor.execute("""
            SELECT * FROM notes
            WHERE category = ?
            ORDER BY publish_time DESC
            LIMIT ?
        """, (category, limit))

        columns = [desc[0] for desc in cursor.description]
        return [dict(zip(columns, row)) for row in cursor.fetchall()]

    def close(self):
        """关闭数据库连接"""
        self.conn.close()
```

## 最佳实践与注意事项

### 1. 合规使用原则

- 仅采集公开数据：不要尝试获取非公开的用户信息
- 控制请求频率：建议每次请求间隔2-5秒，避免对服务器造成压力
- 尊重版权：合理使用采集到的内容，遵守平台使用条款
- 数据存储安全：加密存储敏感信息，限制数据访问权限

### 2. 性能优化建议

- 使用连接池：减少TCP连接建立开销
- 实现缓存机制：减少重复请求
- 批量处理：使用并发提高效率
- 错误重试：实现指数退避重试策略

### 3. 监控与维护

- 日志记录：详细记录API调用和错误信息
- 性能监控：跟踪请求成功率、响应时间等指标
- 定期更新：关注xhs工具的更新，及时升级版本
- 数据备份：定期备份采集的数据

### 4. 常见问题解决

- 签名失败：检查cookie中的a1字段是否与签名服务一致
- IP限制：降低请求频率或更换IP地址
- 数据获取失败：检查网络连接和cookie有效性
- 内存泄漏：定期清理缓存和连接池

## 总结与展望

xhs工具为开发者提供了一个强大而灵活的小红书数据采集解决方案。通过本文的介绍，你应该已经掌握了：

- 核心功能：完整的API封装和智能签名机制
- 实战应用：市场分析、竞品监控、内容评估等场景
- 高级配置：签名服务部署和性能优化策略
- 最佳实践：错误处理、数据存储和合规使用

随着小红书平台的不断更新，xhs工具也在持续演进。建议开发者：

- 关注项目的GitHub仓库获取最新更新
- 参与社区讨论，分享使用经验
- 根据实际需求定制化开发
- 遵守平台规则和法律法规

通过合理使用xhs工具，你可以高效地获取小红书公开数据，为业务决策和产品开发提供有力支持。记住，技术是工具，合理使用才能发挥最大价值。

【免费下载链接】xhs 基于小红书 Web 端进行的请求封装。https://reajason.github.io/xhs/ 项目地址: https://gitcode.com/gh_mirrors/xh/xhs

创作声明:本文部分内容由AI辅助生成(AIGC),仅供参考