首页
关于
友链
Search
1
ESXI 一些功能调整
442 阅读
2
SoftEther 客户端篇
423 阅读
3
天翼云网关3.0/4.0超管密码
414 阅读
4
SoftEther 服务端篇
308 阅读
5
远程桌面rustdesk使用说明
254 阅读
默认分类
代码相关
vue
html
python
系统
数据库
docker
安卓
软件分享
杂七杂八的工具
游戏分享
游戏相关
我的家庭影院
Ai
登录
Search
标签搜索
python
esxi
git
SoftEther
linux
apscheduler
在线
rclone
Ubuntu
list
列表
idm
激活码
Charles
pdf
免安装
鲁大师
图片查看器
蜂蜜浏览器
honeyview
哥特式
累计撰写
98
篇文章
累计收到
16
条评论
首页
栏目
默认分类
代码相关
vue
html
python
系统
数据库
docker
安卓
软件分享
杂七杂八的工具
游戏分享
游戏相关
我的家庭影院
Ai
页面
关于
友链
搜索到
29
篇与
的结果
2025-03-13
python库 faker
faker,一个非常厉害的 Python 库Github地址:https://github.com/joke2k/faker在软件开发和测试过程中,生成真实可信的测试数据是一项重要但耗时的工作。Python的faker库提供了一个简单而强大的解决方案,它能够生成各种类型的虚拟数据,包括个人信息、地址、公司信息等。这些数据看起来十分真实,非常适合用于开发测试、数据库填充和应用程序演示。安装基础安装使用pip包管理器进行安装:pip install faker基本功能生成个人信息faker库提供了丰富的个人信息生成功能,包括姓名、地址、电话号码等。这些数据可以按照不同的语言和地区格式生成,确保了数据的本地化和真实性。from faker import Faker # 创建Faker实例 fake = Faker('zh_CN') # 使用中文本地化 # 生成个人基本信息 print(f"姓名: {fake.name()}") print(f"地址: {fake.address()}") print(f"手机号: {fake.phone_number()}") print(f"电子邮箱: {fake.email()}") print(f"身份证号: {fake.ssn()}")生成公司信息在企业应用开发中,经常需要模拟公司相关的信息。faker提供了完整的公司信息生成功能,包括公司名称、职位名称、营业执照号等数据。from faker import Faker fake = Faker('zh_CN') # 生成公司相关信息 print(f"公司名称: {fake.company()}") print(f"职位名称: {fake.job()}") print(f"公司口号: {fake.catch_phrase()}") print(f"营业执照号: {fake.company_prefix()}") print(f"公司地址: {fake.company_suffix()}")生成日期和时间faker支持生成各种格式的日期和时间数据,可以指定日期范围,生成过去或未来的随机时间。这对于创建时间序列数据或测试日期相关功能特别有用。from faker import Faker from datetime import datetime, timedelta fake = Faker() # 生成日期时间数据 print(f"当前时间: {fake.date_time_this_month()}") print(f"过去日期: {fake.date_time_between(start_date='-30d', end_date='now')}") print(f"未来日期: {fake.future_date(end_date='+30d')}") print(f"时间戳: {fake.unix_time()}")高级功能自定义数据生成器faker允许创建自定义的数据生成器,可以根据特定需求定义数据的生成规则。这种灵活性使其能够适应各种特殊的数据生成需求。from faker import Faker from faker.providers import BaseProvider class CustomProvider(BaseProvider): def custom_product_code(self): return f"PRD-{self.random_int(min=1000, max=9999)}" def custom_status(self): statuses = ['待处理', '处理中', '已完成', '已取消'] return self.random_element(statuses) fake = Faker('zh_CN') fake.add_provider(CustomProvider) # 使用自定义生成器 print(f"产品编码: {fake.custom_product_code()}") print(f"订单状态: {fake.custom_status()}")多语言支持faker提供了出色的多语言支持,可以根据不同地区的特点生成本地化数据。这对于开发国际化应用或测试多语言功能非常有帮助。from faker import Faker # 创建多语言Faker实例 fake_cn = Faker('zh_CN') # 中文 fake_en = Faker('en_US') # 英文 fake_jp = Faker('ja_JP') # 日文 # 生成不同语言的数据 print("中文姓名:", fake_cn.name()) print("英文姓名:", fake_en.name()) print("日文姓名:", fake_jp.name()) print("中文地址:", fake_cn.address()) print("英文地址:", fake_en.address()) print("日文地址:", fake_jp.address())实际应用场景数据库测试数据生成在开发过程中,经常需要生成大量测试数据来填充数据库。使用faker可以快速生成符合要求的测试数据,帮助开发人员进行功能测试和性能测试。from faker import Faker import sqlite3 class TestDataGenerator: def __init__(self): self.fake = Faker('zh_CN') self.conn = sqlite3.connect('test.db') self.cursor = self.conn.cursor() def create_tables(self): self.cursor.execute(''' CREATE TABLE IF NOT EXISTS users ( id INTEGER PRIMARY KEY, name TEXT, email TEXT, address TEXT, phone TEXT ) ''') def generate_user_data(self, count): for _ in range(count): self.cursor.execute(''' INSERT INTO users (name, email, address, phone) VALUES (?, ?, ?, ?) ''', ( self.fake.name(), self.fake.email(), self.fake.address(), self.fake.phone_number() )) self.conn.commit() # 使用示例 generator = TestDataGenerator() generator.create_tables() generator.generate_user_data(100)API测试数据模拟在进行API测试时,需要模拟各种请求数据。faker可以帮助生成符合API要求的测试数据,提高测试效率和覆盖率。from faker import Faker import json class APITestDataGenerator: def __init__(self): self.fake = Faker('zh_CN') def generate_user_payload(self): return { "user": { "name": self.fake.name(), "email": self.fake.email(), "phone": self.fake.phone_number(), "address": { "street": self.fake.street_address(), "city": self.fake.city(), "postcode": self.fake.postcode() } } } def generate_order_payload(self): return { "order": { "order_id": self.fake.uuid4(), "customer_name": self.fake.name(), "product_name": self.fake.word(), "quantity": self.fake.random_int(min=1, max=10), "order_date": self.fake.date_time_this_month().isoformat() } } # 使用示例 generator = APITestDataGenerator() user_data = generator.generate_user_payload() order_data = generator.generate_order_payload() print(json.dumps(user_data, indent=2, ensure_ascii=False)) print(json.dumps(order_data, indent=2, ensure_ascii=False))总结Python faker库为开发者提供了一个强大而灵活的测试数据生成解决方案。通过其丰富的内置数据生成器和自定义功能,开发者可以轻松创建各种类型的模拟数据。faker的多语言支持和本地化特性使其特别适合国际化应用的开发和测试。在实际应用中,从简单的个人信息生成到复杂的数据库测试数据填充,faker都展现出了优秀的性能和可靠性。
2025年03月13日
26 阅读
0 评论
0 点赞
2025-03-13
Python异步任务调度:任务队列Celery
Python异步任务调度:最牛逼的任务队列神器Celery在现代应用中,异步任务调度无疑是提升性能和响应速度的关键之一。尤其在处理高并发、长时间运行任务时,如何让后台任务异步执行成为了开发者面临的一个重要问题。Python中,Celery 是一个强大且广泛使用的异步任务队列工具,它让你可以轻松处理并发任务、定时任务以及任务调度。那么,今天我们就来深入了解Celery以及如何在Python项目中使用它。什么是Celery?Celery 是一个分布式的任务队列系统,支持异步任务执行和定时任务调度。通过Celery,你可以把一些耗时的操作,比如发送邮件、视频处理、图像处理等从主流程中提取出来,交给Celery去处理,这样就能有效提升系统的响应速度和用户体验。Celery通常与消息代理(如RabbitMQ或Redis)结合使用,负责将任务分配给多个工作进程进行异步执行。它的工作原理就像是一个生产者-消费者模式,生产者将任务放入队列,消费者从队列中取出任务并执行。安装Celery首先,我们需要安装Celery。你可以通过pip命令轻松安装:pip install celery使用Celery调度任务让我们通过一个简单的例子来了解如何使用Celery调度异步任务。假设你正在开发一个网站,并且想要在用户注册后发送一个欢迎邮件,而发送邮件是一个耗时操作。为了避免阻塞用户注册的流程,我们希望将邮件发送任务放到Celery中异步执行。步骤1:创建一个Celery实例首先,我们需要创建一个Celery应用。我们可以将其配置成在一个叫做celery.py的文件中:from celery import Celery # 创建一个Celery实例,配置消息队列为Redis app = Celery('tasks', broker='redis://localhost:6379/0') # 定义任务 @app.task def send_welcome_email(user_email): print(f"Sending welcome email to {user_email}") # 这里可以调用发送邮件的逻辑 return f"Email sent to {user_email}"在上面的代码中:Celery('tasks', broker='redis://localhost:6379/0')这一行代码创建了一个Celery应用,并指定了消息代理使用Redis。send_welcome_email是我们定义的一个简单任务,它会模拟发送邮件。步骤2:启动Celery Worker在开发过程中,你可以启动Celery Worker来处理任务。进入项目根目录,执行以下命令:celery -A celery worker --loglevel=info这个命令会启动Celery的工作进程(worker)。它会从消息队列中取出任务并执行。你会看到类似以下的输出,表示Celery Worker已经准备好接收任务了。[2025-02-14 15:00:00,000: INFO/MainProcess] Connected to redis://localhost:6379/0 [2025-02-14 15:00:01,000: INFO/MainProcess] mingle: searching for neighbors [2025-02-14 15:00:02,000: INFO/MainProcess] mingle: all alone [2025-02-14 15:00:02,000: INFO/MainProcess] celery@hostname ready.步骤3:调用任务接下来,我们可以通过Python脚本来调用这个异步任务:from celery import Celery from tasks import send_welcome_email # 调用异步任务 send_welcome_email.delay('user@example.com')send_welcome_email.delay('user@example.com') 是Celery提供的一个异步调用方法,使用delay方法将任务提交到队列中,而不是直接执行。这时,Celery Worker会从队列中取出任务并执行。当邮件发送完成后,你会看到终端输出了类似:Sending welcome email to user@example.com定时任务调度除了异步任务,Celery 还支持定时任务调度,类似于Linux的cron任务。我们可以利用celery-beat扩展来实现定时任务。让我们来写个简单的定时任务,比如每小时检查一次用户的邮箱是否有效。步骤1:安装celery-beatpip install celery[redis] celery[redis]==5.0.5 pip install celery[beat]步骤2:配置定时任务在celery.py中添加如下配置:from celery import Celery from celery.schedules import crontab app = Celery('tasks', broker='redis://localhost:6379/0') @app.task def check_email_validity(): print("Checking email validity for users...") # 定义定时任务,每小时执行一次 app.conf.beat_schedule = { 'check-email-validity-every-hour': { 'task': 'tasks.check_email_validity', 'schedule': crontab(minute=0, hour='*'), # 每小时的第0分钟执行 }, }步骤3:启动Celery Beat除了启动Worker,我们还需要启动Celery Beat来调度定时任务:celery -A celery beat --loglevel=info这时,check_email_validity任务会每小时自动执行一次。总结Celery是一个非常强大的Python任务队列工具,适合用于处理异步任务和定时任务。在开发现代应用时,Celery帮助我们将耗时任务从主流程中分离出来,从而提升系统性能和响应速度。通过简单的配置和几行代码,你就能在项目中轻松集成Celery,实现异步执行任务和定时任务调度。无论你是开发Web应用、后台任务还是周期性调度任务,Celery都能成为你最牛逼的得力助手!
2025年03月13日
14 阅读
0 评论
0 点赞
2025-01-14
python网络编程:ZeroMQ实战指南
想写分布式应用却被Socket编程折腾得够呛?今天给大家介绍一个超强的网络通信库:ZeroMQ(也叫ØMQ)。它不仅简单易用,性能还贼棒!让我们一起来玩转这个神器!1. 为什么选ZeroMQ?传统Socket编程就像用电报机发消息:又慢又容易出错。而ZeroMQ就像用微信聊天:简单、快速、可靠。它能帮你:轻松实现进程间通信搞定分布式系统处理高并发场景2. 快速入门先安装ZeroMQ的Python绑定:pip install pyzmq2.1 最简单的例子:请求-响应模式服务端代码(server.py):import zmq import time context = zmq.Context() socket = context.socket(zmq.REP) socket.bind("tcp://*:5555") print("服务器已启动,等待客户端连接...") while True: # 等待客户端请求 message = socket.recv_string() print(f"收到请求: {message}") # 模拟处理时间 time.sleep(1) # 发送响应 response = f"收到你的消息:{message}" socket.send_string(response)客户端代码(client.py):import zmq context = zmq.Context() socket = context.socket(zmq.REQ) socket.connect("tcp://localhost:5555") for i in range(3): message = f"Hello {i}" print(f"发送请求: {message}") # 发送请求 socket.send_string(message) # 等待响应 response = socket.recv_string() print(f"收到响应: {response}\n")是不是超简单?运行效果就像微信对话一样:发消息→等回复→收到回复。3. 进阶应用:发布-订阅模式这个模式特别适合做消息推送,比如气象站向多个用户推送天气数据。发布者代码(publisher.py):import zmq import time import random def weather_update(): return { "温度": round(random.uniform(20, 30), 1), "湿度": round(random.uniform(40, 70), 1) } context = zmq.Context() socket = context.socket(zmq.PUB) socket.bind("tcp://*:5556") print("气象站启动...") while True: data = weather_update() message = f"北京,{data['温度']},{data['湿度']}" socket.send_string(message) print(f"发布天气数据: {message}") time.sleep(2)订阅者代码(subscriber.py):import zmq context = zmq.Context() socket = context.socket(zmq.SUB) socket.connect("tcp://localhost:5556") socket.setsockopt_string(zmq.SUBSCRIBE, "北京") print("天气监测站已连接,等待数据...") while True: message = socket.recv_string() city, temp, humidity = message.split(',') print(f"\n{city}实时天气:") print(f"温度: {temp}°C") print(f"湿度: {humidity}%")4. 实战项目:简单的分布式计算系统来个实用的例子:主节点分发任务,工作节点处理任务。任务分发器(dispatcher.py):import zmq import random import time context = zmq.Context() sender = context.socket(zmq.PUSH) sender.bind("tcp://*:5557") collector = context.socket(zmq.PULL) collector.bind("tcp://*:5558") print("任务分发系统启动...") # 发送任务 tasks_sent = 0 while tasks_sent < 10: # 创建一个计算任务 data = random.randint(1, 100) sender.send_json({ "task_id": tasks_sent, "data": data }) print(f"发送任务 {tasks_sent}: 计算{data}的平方") tasks_sent += 1 # 接收结果 results = [] while len(results) < 10: result = collector.recv_json() results.append(result) print(f"收到任务{result['task_id']}的结果: {result['result']}") print("\n所有任务完成!")工作节点(worker.py):import zmq import time import random context = zmq.Context() receiver = context.socket(zmq.PULL) receiver.connect("tcp://localhost:5557") sender = context.socket(zmq.PUSH) sender.connect("tcp://localhost:5558") print("工作节点已启动,等待任务...") while True: task = receiver.recv_json() # 模拟计算过程 time.sleep(random.random()) # 计算结果 result = task['data'] ** 2 # 发送结果 sender.send_json({ "task_id": task['task_id'], "result": result }) print(f"完成任务{task['task_id']}: {task['data']}的平方是{result}")5. 实用技巧5.1 消息模式选择指南REQ/REP:适合客户端-服务器模型PUB/SUB:适合数据广播场景PUSH/PULL:适合任务分发场景5.2 错误处理import zmq import zmq.error try: socket.send_string("消息", zmq.NOBLOCK) except zmq.error.Again: print("发送缓冲区已满,消息未发送")5.3 超时处理import zmq from zmq.error import Again poller = zmq.Poller() poller.register(socket, zmq.POLLIN) # 等待1秒 if poller.poll(1000): message = socket.recv_string() else: print("接收超时")6. 性能优化小贴士使用合适的消息大小:太小:网络开销占比大太大:延迟增加建议:1KB到1MB之间适当的缓冲区设置:socket.setsockopt(zmq.SNDHWM, 1000) # 发送缓冲区大小 socket.setsockopt(zmq.RCVHWM, 1000) # 接收缓冲区大小使用多线程提升性能:from threading import Thread def worker(): context = zmq.Context() socket = context.socket(zmq.REP) socket.connect("inproc://workers") # 处理逻辑... threads = [Thread(target=worker) for _ in range(3)] for thread in threads: thread.start()ZeroMQ就像是给程序装上了"微信":简单:几行代码搞定通信灵活:多种通信模式随意选可靠:自动重连、消息队列高效:性能甩开传统Socket几条街掌握了ZeroMQ,分布式系统不再是噩梦!
2025年01月14日
56 阅读
0 评论
0 点赞
2024-11-28
Idea安装codeium排坑(faild download language server)
1. 插件市场搜索安装codeium2. 重启后右下角有这个图标3. 重启IDEA之后,提示faild download language server4. 前往 github.com 下载对应版本的exe程序,版本与插件版本对应,比如上图中我插件的版本是 1.28.1 我就下载1.28.1的exe。5. 把下载好的 language_server_windows_x64.exe 放到插件目录下的一个很长字符串的目录下。这里有一个小技巧,如果你的插件目录不在这里,你可以用Everything软件全盘搜codeium 。最后一串英文,每个人的可能不一样C:\Users\用户名\AppData\Roaming\JetBrains\PyCharm2024.3\plugins\codeium\f740e5605570f4cf2ef1fdda47389168e7319c796. 重启IDEA后提示登录,登录之后便可使用
2024年11月28日
175 阅读
0 评论
0 点赞
2024-11-12
pycharm激活说明
2025年3月27日,评论区提到了另外的方法,在语言和区域设置中,把区域改为未指定就好了{dotted startColor="#ff6c6c" endColor="#1989fa"/}现在的激活会验证网址,暂时只想到代理的方式解决设置-->外观与行为-->系统设置-->HTTPdaili选择手动代理-->HTTP主机号:随便一个不存在的都行,127.0.0.1端口号:81(不存在就行)-不为以下项使用代理:downloads.marketplace.jetbrains.com, resources.jetbrains.com, hub.jetbrains.com, plugins.jetbrains.com, *.github.com, *.google.com, *.bing.com, registry.geoway.com, 172.16.*, 10.*, localhost, api.cognitive.microsofttranslator.com, *.microsoft.com
2024年11月12日
88 阅读
3 评论
0 点赞
2024-07-02
【Django】执行查询——查询JSONField
JSONField本篇的例子以下面这个模型为基础:from django.db import models class Dog(models.Model): name = models.CharField(max_length=200) data = models.JSONField(null=True) def __str__(self): return self.name 保存和查询None值在使用JSONField时,要注意空值的处理,注意区分SQL NULL和JSON null保存None值我们首先创建两个Dog实例from django.db.models import Value, JSONField Dog.objects.create(name="Max", data=None) # SQL NULL. # <Dog: Max> Dog.objects.create(name="Archie", data=Value(None, JSONField())) # JSON null. # <Dog: Archie>观察两个实例在数据库中的表现,可以发现SQL NULL和JSON null是不一样的。查询None值查询时,None值将被解释为JSON null,要查询SQL NULL,需要使用isnull:Dog.objects.filter(data=None) # <QuerySet [<Dog: Archie>]> Dog.objects.filter(data=Value(None, JSONField())) # <QuerySet [<Dog: Archie>]> Dog.objects.filter(data__isnull=True) # <QuerySet [<Dog: Max>]> Dog.objects.filter(data__isnull=False) # <QuerySet [<Dog: Archie>]> 对比可以发现,data=None在保存时存储为SQL NULL,查询时解释为JSON null根据键值查找根据json数据的键进行查找,用__连接,可以多个键连接一起,如果某个值是一个列表,要进行索引,就用整数代表该列表的索引。例子:Dog.objects.create( ... name="Rufus", ... data={ ... "breed": "labrador", ... "owner": { ... "name": "Bob", ... "other_pets": [ ... { ... "name": "Fishy", ... } ... ], ... }, ... }, ... ) # <Dog: Rufus> Dog.objects.create(name="Meg", data={"breed": "collie", "owner": None}) # <Dog: Meg> # __连接键进行查找 Dog.objects.filter(data__breed="collie") # <QuerySet [<Dog: Meg>]> # 多个键连接进行查找 Dog.objects.filter(data__owner__name="Bob") # <QuerySet [<Dog: Rufus>]> # 某个值是一个列表,要进行索引,就用整数代表该列表的索引 Dog.objects.filter(data__owner__other_pets__0__name="Fishy") # <QuerySet [<Dog: Rufus>]> # 查询缺少的键,使用isnull查找 Dog.objects.create(name="Shep", data={"breed": "collie"}) # <Dog: Shep> Dog.objects.filter(data__owner__isnull=True) # <QuerySet [<Dog: Shep>]> 包含与键查找containscontains 查询返回的对象是那些包含给定键值对的顶层字段的对象。Dog.objects.create(name="Rufus", data={"breed": "labrador", "owner": "Bob"}) # <Dog: Rufus> Dog.objects.create(name="Meg", data={"breed": "collie", "owner": "Bob"}) # <Dog: Meg> Dog.objects.create(name="Fred", data={}) # <Dog: Fred> Dog.objects.filter(data__contains={"owner": "Bob"}) # <QuerySet [<Dog: Rufus>, <Dog: Meg>]> Dog.objects.filter(data__contains={"breed": "collie"}) # <QuerySet [<Dog: Meg>]> contained_by这个可以理解为 contains 查询的反向。 要查询的对象满足这样的条件:其该字段对应的数据是传递的值的子集。Dog.objects.create(name="Rufus", data={"breed": "labrador", "owner": "Bob"}) # <Dog: Rufus> Dog.objects.create(name="Meg", data={"breed": "collie", "owner": "Bob"}) # <Dog: Meg> Dog.objects.create(name="Fred", data={}) # <Dog: Fred> Dog.objects.filter(data__contained_by={"breed": "collie", "owner": "Bob"}) # <QuerySet [<Dog: Meg>, <Dog: Fred>]> Dog.objects.filter(data__contained_by={"breed": "collie"}) # <QuerySet [<Dog: Fred>]> has_key返回数据顶层的键中有给定值的对象。Dog.objects.create(name="Rufus", data={"breed": "labrador"}) # <Dog: Rufus> Dog.objects.create(name="Meg", data={"breed": "collie", "owner": "Bob"}) # <Dog: Meg> Dog.objects.filter(data__has_key="owner") # <QuerySet [<Dog: Meg>]> has_keys返回数据顶层的键都包含在给定列表中的对象。Dog.objects.create(name="Rufus", data={"breed": "labrador"}) # <Dog: Rufus> Dog.objects.create(name="Meg", data={"breed": "collie", "owner": "Bob"}) # <Dog: Meg> Dog.objects.filter(data__has_keys=["breed", "owner"]) # <QuerySet [<Dog: Meg>]> has_any_keys返回数据顶层的键中有至少1个在给定列表中的对象,比has_keys条件更宽松。Dog.objects.create(name="Rufus", data={"breed": "labrador"}) # <Dog: Rufus> Dog.objects.create(name="Meg", data={"owner": "Bob"}) # <Dog: Meg> Dog.objects.filter(data__has_any_keys=["owner", "breed"]) # <QuerySet [<Dog: Rufus>, <Dog: Meg>]>
2024年07月02日
25 阅读
0 评论
0 点赞
2024-06-28
python关键字
在Python中,assert是一个关键字,用于编写断言(assertion)。断言是一种用于检查程序中的条件是否为真的方法,通常用于调试和测试目的。当使用assert时,程序会在断言条件为假的情况下引发AssertionError异常。assert的语法如下:assert condition, message其中,condition是要检查的条件,如果条件为假,将会引发AssertionError异常。message是可选的,用于在断言失败时指定错误消息。以下是一个简单的示例:def divide(x, y): assert y != 0, "除数不能为零" return x / y result = divide(6, 3) # 不会触发断言异常 print(result) result = divide(6, 0) # 会触发断言异常,抛出AssertionError: 除数不能为零在上面的示例中,assert y != 0, "除数不能为零"用于检查除数是否为零,如果为零则触发AssertionError异常,并且指定了错误消息"除数不能为零"。这有助于在程序中快速识别和调试问题,并提供有用的错误信息。需要注意的是,一般情况下,assert语句在生产环境中通常会被禁用,因为它们可能会暴露敏感信息或导致安全问题。然而,在开发和测试阶段,assert语句是一个非常有用的工具,可以帮助程序员快速发现和修复问题。
2024年06月28日
17 阅读
0 评论
0 点赞
2024-05-31
Django查询技巧
查询每个供货商的商品库存如果supplier_supplier.id是每个供应商的ID,并且在extra()方法中使用原始SQL查询来计算每个供应商的库存总和,可以按如下方式操作:from django.db.models import Sum # 假设您有名为Supplier的模型,它有一个名为supplier的外键字段指向supplier_supplier模型 # 获取每个供应商的库存总和 supplier_stock_sums = Supplier.objects.extra( select={'stock_sum': 'SELECT IFNULL(SUM(main_product.stock), 0) FROM main_product WHERE main_product.supplier_id = supplier_supplier.id AND main_product.status = 10'} ) # 打印每个供应商的库存总和 for supplier in supplier_stock_sums: print(f"供应商 {supplier.id} 的库存总和为 {supplier.stock_sum}")在上面的示例中,在extra()方法的select参数中使用了原始SQL查询,并在查询中引用了supplier_supplier.id来表示每个供应商的ID。这样就可以计算每个供应商的库存总和。
2024年05月31日
23 阅读
0 评论
0 点赞
2024-05-11
Flask项目的一些封装
封装传入的参数main.py文件from base.views import asapi from flask import Flask, request, jsonify from flask_cors import CORS from base.views import ApiException as ex app = Flask(__name__) # CORS(app, resources={r"/*": {"origins": ["http://localhost:63343"]}}) CORS(app, resources={r"/*": {"origins": "*"}}) def checker1(param): # print("checker", param) return True @app.route("/test", methods=['get', 'post']) @asapi(checker=checker1) def test(param): return param if __name__ == '__main__': # en = Encipher() # res = en.getDatas({"1": "123"}) # print(res) # print(en.encode_data(res)) app.run( host="0.0.0.0", port=6003, debug=False )main.py文件下,新建一个base目录,创建文件views.py,写入下面内容from flask import request, jsonify from lxml.html.clean import clean_html import inspect class SiteException(Exception): def __init__(self, message=None, code=None): self.message = message self.code = code def to_json(self): pass def __str__(self): # return jsonify({"code": self.code, "msg": self.message}) return self.message class ApiException(SiteException): def __init__(self, message, code=1001): self.message = message self.code = code def to_json(self): return {"code": self.code, "msg": self.message} # return self.message def raiseApiEx(e, c=1001): raise ApiException(e, c) def asapi(a=None, checker=None): def param_handler(func): def wrapper(*args, **kwargs): def __func(**kvargs): try: req = kvargs.get("param") if checker and not checker(req): raiseApiEx('检查不通过') params = req datas = {} for k, v in list(params.items()): if v == 'undefined': # 排除JS值空 params[k] = '' for k in list(params.keys()): v = params[k] if isinstance(v, str) and '<' in v and '>' in v: lv = v.lower() if 'script' in lv or 'cookie' in lv: params[k] = clean_html(v) # arginfos = getattr(func, 'arginfos') arginfos = inspect.signature(func).parameters for param_name, param_info in arginfos.items(): if param_name != "param" and param_name not in params: params[param_name] = param_info.default if param_name == "param": datas['param'] = params elif param_info.default == inspect.Parameter.empty and param_name not in params: raise raiseApiEx(f"{func.__name__}缺少参数:{param_name}") else: datas[param_name] = params.get(param_name, param_info.default) res = func(**datas) return apiReaspone(en.getDatas(res)) except ApiException as e: return jsonify(e.to_json()) param = dict(request.values.items()) setattr(__func, 'checker', checker) kwargs.setdefault("param", param) # 在这里可以对参数进行处理,比如验证、转换等操作 # 将参数传递给路由处理函数 return __func(**kwargs) return wrapper return param_handler def apiReaspone(data): return jsonify({"code": 200, "data": data})
2024年05月11日
27 阅读
0 评论
0 点赞
2024-03-13
pip使用教程
1. pip升级库# 查看当前电脑中所有可以升级的Python包 pip list --outdated # 安装pip-review包 pip install pip-review # 使用pip-review命令一次性更新所有过时的包 pip-review --local --interactive # 升级单个包 pip install --upgrade ** pip install --upgrade **
2024年03月13日
60 阅读
0 评论
0 点赞
2024-01-30
兰空图床本地上传文件,配合Typora使用
# -*- coding: utf-8 -*- # @Time : 2022-05-18 13:53 # @Author : GeTeShi # @File : test6.py import datetime import time import requests import redis import json import re import os import sys rdb = redis.StrictRedis(host='localhost', port=6379, decode_responses=True) NAME = "test" forever = 60 * 60 * 24 * 30 * 12 * 100 def get_cache(key, name=None): global NAME if name: NAME = name if rdb.hlen(name) and rdb.hexists(name, key): res = rdb.hget(NAME, key) try: res = json.loads(res) except json.decoder.JSONDecodeError: pass return res else: return False def get_cache_all_key(name): global NAME if name: NAME = name if rdb.hlen(name): return rdb.hkeys(name=name) def set_cache(key, content, timeout=60, name=None): global NAME if name: NAME = name if isinstance(content, dict): content = json.dumps(content) rdb.hset(NAME, key, content) rdb.expire(key, timeout) def del_cache(key, name=None): global NAME if name: NAME = name if rdb.hexists(name=name, key=key): rdb.hdel(name, key) def md5(s): import hashlib m2 = hashlib.md5() m2.update(s.encode("utf-8")) return m2.hexdigest() class image_upload(object): def __init__(self, email, password): self.URL = "https://你的图床的api" self.header = { "Accept": "application/json", "user-agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) " "Chrome/108.0.0.0 Safari/537.36 Edg/108.0.1462.54 ", } self.email = email self.password = password self.TOKEN_KEY = f"{self.email}_{md5(self.password)}_{datetime.datetime.strftime(datetime.datetime.now(), '%Y%m')}" self.sess = requests.Session() self.sess.get(url="https://你的图床的api") def get_requests(self, url, method, data=None, file=None): if not self.header.get('Authorization') and "tokens" not in url: header = self.header self.login() self.header.update(header) if method == "post": response = self.sess.post(url=url, data=data if data else "", headers=self.header) elif method == "get": response = self.sess.get(url=url, params=data if data else "", headers=self.header) elif method == "upload": response = self.sess.post(url=url, files=file, headers=self.header, data=data) else: response = self.sess.delete(url=url, headers=self.header) if response.status_code == 200: response.encoding = "utf-8" response = response.json() return response def get_token(self): self.header = {"Accept": "application/json"} token = self.get_requests(url=self.URL + "/tokens", method="post", data={ "email": self.email, "password": self.password }) if token: set_cache(self.TOKEN_KEY, token, timeout=60 * 60 * 24, name="upload-image") return token def upload(self, file): self.header.update({ "Accept": "multipart/form-data" }) filename = file.rsplit("\\", 1)[1].split(".")[0] filetype = file.rsplit("\\", 1)[1].split(".")[1] files = { "file": (f"{filename}.{filetype}", open(file, 'rb'), f"image/{filetype}"), } # 存储策略id,默认1 data = { "strategy_id": 1, } url = self.URL + "/upload" res = self.get_requests(url=url, method="upload", data=data, file=files) if res and res.get("status"): data = res.get('data') print(data.get('links').get('url')) return data.get('links').get('thumbnail_url') else: raise Exception(f"图片上传失败,失败原因:{res.get('message')}") def login(self): token = get_cache(self.TOKEN_KEY, name="upload-image") if (not token) or (not token.get('data') or not token.get('data').get('token')): token = self.get_token() # token = get_cache(self.TOKEN_KEY, name="upload-image") url = self.URL + "/profile" # print(token.get('data')) self.header.update({ "Authorization": "Bearer " + token.get('data').get('token'), "Accept": "application/json" }) self.get_requests(url=url, data="", method="get") def main(self): pass def clear(self): self.get_requests(url=self.URL + "/tokens", data="", method="delete") self.header = {} return "已清空tokens" def get_strategies(self): """获取存储策略""" res = self.get_requests(url=self.URL + "/strategies", method="get") print(res) if __name__ == '__main__': images = sys.argv[1:] for image in images: image_upload(email="你的用户名", password="你的密码").upload(file=image)
2024年01月30日
34 阅读
0 评论
0 点赞
2024-01-29
requestium库的使用
这个Python库把requests按在地上摩擦!在 Python 编程中,处理网络请求是一个常见的任务,特别是做爬虫采集数据。最受欢迎的是 Requests 和 Selenium。而Requestium 结合了这两个库优点的工具,它可以让你在一个统一的接口中使用 Requests 的简便性和 Selenium 的强大功能。1. 安装 Requestium在开始使用 Requestium 之前,你需要先将其安装到你的环境中。安装非常简单,可以通过 pip 命令完成:pip install requestium2. Requestium 的核心功能结合 Requests 和 Selenium: Requestium 将 Requests 库的简易性和 Selenium 库的交互功能结合在了一起。无缝切换: 它允许用户在需要时从 Requests 无缝切换到 Selenium,反之亦然。增强的 XPath 支持: Requestium 提供了对 XPath 的额外支持,使得在使用 Selenium 时可以更方便地定位元素。3. 使用 Requestium 发送请求Requestium 的使用方法与 Requests 类似。以下是一个基本示例,展示如何发送 GET 请求:# 创建一个 Session对象 from requestium import Session # 使用 Requestium 发送 GET 请求 s = Session(webdriver_path='chromedriver', browser='chrome', default_timeout=15) response = s.get('https://www.example.com') print(response.text)4. 结合 Selenium 和 RequestsRequestium 的真正强大之处在于它能够让你在需要时切换到 Selenium。以下是一个示例,展示如何在发送请求后使用 Selenium 处理 JavaScript:from requestium import Session s = Session(webdriver_path='chromedriver', browser='chrome', default_timeout=15) # 发送请求 s.get('https://www.example.com') # 使用 Selenium 处理页面 s.driver.get('https://www.example.com') # 使用 Selenium 定位元素 element = s.driver.find_element_by_xpath('//div[@class="example"]') print(element.text)5. 实战案例假设我们要抓取一个动态加载内容的网页。首先,我们使用 Requestium 发送请求,然后通过 Selenium 处理 JavaScript,最后提取所需数据。from requestium import Session s = Session(webdriver_path='chromedriver', browser='chrome', default_timeout=15) # 访问网页 s.get('https://www.dynamic-content-website.com') # 使用 Selenium 处理动态内容 s.driver.get('https://www.dynamic-content-website.com') # 提取数据 data = s.driver.find_element_by_xpath('//div[@id="dynamic-content"]').text print(data)
2024年01月29日
38 阅读
0 评论
0 点赞
2024-01-11
JupyterNoteBook安装
1. 安装# 安装 pip install jupyter # 安装中文包 pip install jupyterlab-language-pack-zh-CN # 代码自动补全 pip install jupyterlab-lsp pip install -U jedi-language-server启动jupyter notebook配置虚拟环境# 首先需要进入到虚拟环境里面 .\python-venv\main\Scripts\activate # 这里演示的虚拟环境名称叫main python -m ipykernel install --user --name=main
2024年01月11日
19 阅读
0 评论
0 点赞
2023-11-13
matrix搭建以及机器人配置
matrix的安装1. 创建配置文件sudo docker run -it --rm -v /tmp/data:/data -e SYNAPSE_SERVER_NAME=你的域名 -e SYNAPSE_REPORT_STATS=yes matrixdotorg/synapse:latest generate其中/tmp/data可以换,记得就行,这里用这个展示2. 到/tmp/data里面找到homeserver.yaml编辑 # Configuration file for Synapse. # # This is a YAML file: see [1] for a quick introduction. Note in particular # that *indentation is important*: all the elements of a list or dictionary # should have the same indentation. # # [1] https://docs.ansible.com/ansible/latest/reference_appendices/YAMLSyntax.html # # For more information on how to configure Synapse, including a complete accounting of # each option, go to docs/usage/configuration/config_documentation.md or # https://matrix-org.github.io/synapse/latest/usage/configuration/config_documentation.html server_name: "matrix域名" pid_file: /data/homeserver.pid listeners: - port: 8008 # 容器的端口号,不改 tls: false type: http x_forwarded: true resources: - names: [client, federation] compress: false # database: # name: sqlite3 # args: # database: /data/homeserver.db database: # 数据库设置,如果人少的话推荐上面的sqlite,pgdb太伤人了 name: psycopg2 args: user: matrix password: 24W53MDwk8TyBh7z database: matrix host: # 你懂的 cp_min: 5 cp_max: 10 # allow_unsafe_locale: true log_config: "/data/matrix域名.log.config" # 自动生成的,一般不用改 media_store_path: /data/media_store registration_shared_secret: "c:*RWamLStS9J,sQhm7=dwGpC2#DVKRtwCkCBvPwMJ&p" report_stats: true macaroon_secret_key: "g3@hjR7pMO2&enhNM*jKZq*l3=JS@d~Jl8J3ZvJ~smCkLD" form_secret: "I,kV-blS-oVG3h_c2kiQ&kH0P+=@V&pMedyu4~E-i;BzP" signing_key_path: "/data/matrix域名.signing.key" # 自动生成的,一般不用改 trusted_key_servers: - server_name: "matrix域名" # 开放注册 enable_registration: true # 注册不需要验证,没有配置邮箱的时候推荐这个不验证 enable_registration_without_verification: false # 注册需要验证邮箱 registrations_require_3pid: - email # 邮箱配置,这里用88邮箱演示 email: smtp_host: "smtp.88.com" smtp_port: 465 smtp_user: "发送的邮箱地址" smtp_pass: "密码,你懂的" force_tls: true require_transport_security: true enable_tls: true notif_from: "通知的邮箱地址" app_name: "聊天室" enable_notifs: true notif_for_new_users: true client_base_url: "https://element的域名" validation_token_lifetime: 15m invite_client_location: https://element的域名3. docker-compose 文件version: "3.3" services: synapse: image: "matrixdotorg/synapse:latest" container_name: "matrix_synapse" restart: unless-stopped ports: - 8008:8008 volumes: - "./data:/data" # 这个data需要上面的/tmp/data,我的这个文件放在/tmp里面,所以用了./data environment: VIRTUAL_HOST: "matrix域名" VIRTUAL_PORT: 8008 LETSENCRYPT_HOST: "matrix域名" SYNAPSE_SERVER_NAME: "matrix域名" SYNAPSE_REPORT_STATS: "yes" element-web: ports: - '80:80' volumes: - './data/config.json:/app/config.json' image: vectorim/element-web restart: unless-stopped搭建到这里就完了{lamp/}使用nio-bot安装这两个库pip nio-bot pip install nio-bot[e2ee,cli]实例文件import logging # from niobot import NioBot, Context import niobot from apscheduler.schedulers.asyncio import AsyncIOScheduler logging.basicConfig(level=logging.INFO) bot = niobot.NioBot( homeserver="https://test.test.test", # your homeserver user_id="@test:test.test.test", # the user ID to log in as (Fully qualified) device_id="nio-message", store_path=r"./store/", command_prefix="!", # the prefix to respond to (case sensitive, must be lowercase if below is True) case_insensitive=True, # messages will be lower()cased before being handled. This is recommended. owner_id="@user:test.test.test", # The user ID who owns this bot. Optional, but required for bot.is_owner(...). ignore_self=True ) # bot.mount_module("test.py") def schedule_auto_messages(): scheduler = AsyncIOScheduler() # 测试的定时任务 # scheduler.add_job(test, "cron", second="*/3", id="test", misfire_grace_time=180, args=[bot]) scheduler.start() @bot.on_event("ready") async def on_ready(sync_result: niobot.SyncResponse): print("Logged in!") schedule_auto_messages() @bot.command("ping") async def ping_command(ctx: niobot.Context): latency = ctx.latency # await ctx.reply("Pong!") await ctx.respond(f"Pong! `{latency:.2f}ms` latency.") # A command with arguments @bot.command(name="echo") async def echo_command(ctx: niobot.Context, message: str): print("这里的输出") await ctx.respond(message) # bot.run(password="password") # starts the bot with a password. If you already have a login token, see: bot.run(access_token="token") # starts the bot with a login token.获取token使用这个命令niocli get-access-token -U '@test:test.test.test' -D '设备id->device_id'验证设备https://github.com/poljar/matrix-nio/blob/main/examples/verify_with_emoji.py通过表情符号验证,申请token使用上面那个,获取token的方式,之后用这个验证,需要创建一个文件// credentials.json {"homeserver": "https://matrix.example.org", "user_id": "@test:matrix.example.org", "device_id": "niobot2", "access_token": "上面那个"}之后直接运行,多尝试几次,不成功就把store里面的文件删掉。重新获取token,先运行机器人的文件,再运行这个表情包验证文件!!
2023年11月13日
201 阅读
0 评论
0 点赞
2023-10-31
pyenv使用教程
[root@localhost ~]# cat /etc/redhat-release CentOS release 6.9 (Final) 系统默认是Python 2.6 版本 [root@localhost ~]# python -V Python 2.6.6 1) 安装依赖环境 [root@localhost ~]# yum -y install gcc zlib-devel bzip2-devel openssl-devel ncurses-devel sqlite-devel readline-devel tk-devel gdbm-devel db4-devel libpcap-devel xz-devel git 2) 安装pyenv包 pyenv可以通过多种方式安装,可以参考项目在github上的Installtion, 地址为: https://github.com/pyenv/pyenv-installer 推荐采用The automatic installer的方式安装,可以一键安装pyenv的所有插件。 [root@localhost ~]# curl -L https://github.com/pyenv/pyenv-installer/raw/master/bin/pyenv-installer | bash pyenv套件下插件: - pyenv-doctor - pyenv-installer - pyenv-update - pyenv-virtualenv - pyenv-which-ext ================================================================================== 温馨提示: 以上https://github.com/pyenv/pyenv-installer/raw/master/bin/pyenv-installer的访问内容 , 可以将内容粘出来放在服务器的一个shell脚本文件中, 然后执行该脚本用以安装pyenv 该脚本下载地址: https://pan.baidu.com/s/1wW9ylrmc4Q9wxu_i3-1wYA 提取密码: rhtj 执行脚本进行安装(执行前授予755权限) # chmod 755 pyenv-installer # /bin/bash pyenv-installer ================================================================================= 分析一下上面的pyenv-installer脚本,可以发现在centos上,其实它做了以下事情: git clone --depth 1"git://github.com/pyenv/pyenv.git" "${HOME}/.pyenv" git clone --depth 1"git://github.com/pyenv/pyenv-doctor.git" "${HOME}/.pyenv/plugins/pyenv-doctor" git clone --depth 1"git://github.com/pyenv/pyenv-installer.git" "${HOME}/.pyenv/plugins/pyenv-installer" git clone --depth 1"git://github.com/pyenv/pyenv-update.git" "${HOME}/.pyenv/plugins/pyenv-update" git clone --depth 1"git://github.com/pyenv/pyenv-virtualenv.git" "${HOME}/.pyenv/plugins/pyenv-virtualenv" git clone --depth 1"git://github.com/pyenv/pyenv-which-ext.git" "${HOME}/.pyenv/plugins/pyenv-which-ext" 上面安装完成后,还需要执行下面的命令,将pyenv安装到系统环境变量中。 [root@localhost ~]# ll -d /root/.pyenv drwxr-xr-x 11 root root 4096 Dec 17 10:48 /root/.pyenv 在~/.bash_profile文件底部添加下面三行内容, 让系统可以找到 pyenv 安装的 Python [root@localhost ~]# vim ~/.bash_profile export PATH="/root/.pyenv/bin:$PATH" eval "$(pyenv init -)" eval "$(pyenv virtualenv-init -)" # Ubuntu使用这个 echo 'export PYENV_ROOT="$HOME/.pyenv"' >> ~/.bashrc echo 'command -v pyenv >/dev/null || export PATH="$PYENV_ROOT/bin:$PATH"' >> ~/.bashrc echo 'eval "$(pyenv init -)"' >> ~/.bashrc 使上面配置生效 [root@localhost ~]# source ~/.bash_profile 查看pyenv安装情况 [root@localhost ~]# pyenv --version //或者"pyenv -v" pyenv 1.2.8 更新pyenv [root@localhost ~]# pyenv update 3) 卸载pyenv 先删除pyenv的安装目录, 这里即是/root/.pyenv [root@localhost ~]# rm -fr /root/.pyenv 接着删除~/.bash_profile里面配置的系统环境变量 [root@localhost ~]# vim ~/.bash_profile //删除下面三行 export PATH="/root/.pyenv/bin:$PATH" eval "$(pyenv init -)" eval "$(pyenv virtualenv-init -)" [root@localhost ~]# source ~/.bash_profile 这样pyenv就被卸载了, 卸载pyenv后, 当前终端shell里会出现"-bash: pyenv: command not found" 的提示信息, 不过不影响使用. 再打开其他的终端窗口, 就不会出现该提示信息. pyenv常用命令pyenv install --list # 列出可安装版本 pyenv install <version> # 安装对应版本 pyenv install -v <version> # 安装对应版本,若发生错误,可以显示详细的错误信息 pyenv versions # 显示当前使用的python版本 pyenv which python # 显示当前python安装路径 pyenv global <version> # 设置默认Python版本 pyenv local <version> # 当前路径创建一个.python-version, 以后进入这个目录自动切换为该版本 pyenv shell <version> # 当前shell的session中启用某版本,优先级高于global 及 local
2023年10月31日
129 阅读
0 评论
0 点赞
1
2