0%

1
2
3
4
5
6
7
pip install pyinstaller -i https://mirrors.aliyun.com/pypi/simple/
pyinstaller -F gooeydemo.py -w  # 打包
# 基于 pyinstaller
pip install fbs -i https://mirrors.aliyun.com/pypi/simple/

fbs project

本文地址: https://github.com/maxzhao-it/blog/post/7e49a571/

1
pip install mysqlclient -i https://mirrors.aliyun.com/pypi/simple/

多线程

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70

import os
import re
import urllib.request

import MySQLdb
import m3u8
import requests
import threading

from dotenv import load_dotenv

from py import m3u8Merge

# Load variables from a .env file so the os.getenv() lookups below can see them.
load_dotenv()
# Hard-coded output directory; not referenced by the code visible in this snippet.
outPath = 'F:/资料文档/zztt/'

# Connections are opened with autocommit enabled.
def getConn():
    """Open a new MySQLdb connection configured from ``db_pic_*`` environment variables.

    Returns:
        The object returned by ``MySQLdb.connect``, created with
        ``autocommit=True``.

    Raises:
        TypeError: if ``db_pic_port`` is not set (``int(None)``).
    """
    env = os.getenv
    settings = {
        "host": env("db_pic_host"),
        "port": int(env("db_pic_port")),
        "user": env("db_pic_user"),
        "passwd": env("db_pic_password"),
        "db": env("db_pic_server"),
        "autocommit": True,
    }
    return MySQLdb.connect(**settings)

def saveInfo(id, name, url, href):
    """Insert one row into table ``z``.

    The values are passed to the driver as query parameters instead of being
    formatted into the SQL text, so quotes inside a value can neither break
    the statement nor inject SQL.

    Args:
        id: value for the ``id`` column.
        name: value for the ``name`` column.
        url: value for the ``url`` column.
        href: value for the ``href`` column.

    Returns:
        bool: True when the insert succeeded, False when any step failed
        (the error is printed).
    """
    sql = "insert into z(id,name,url,href) values(%s,%s,%s,%s)"
    params = (id, name, url, href)
    connection = None
    try:
        # getConn() enables autocommit, so no explicit commit is needed.
        connection = getConn()
        cursor = connection.cursor()
        try:
            cursor.execute(sql, params)
        finally:
            cursor.close()
        return True
    except Exception as e:
        print("添加数据库数据失败:", e, sql, params)
        return False
    finally:
        # Each call opens its own connection; release it on every path.
        if connection is not None:
            connection.close()
def listNeedDownload():
    """Return up to ten rows of table ``z`` whose ``download`` column equals 2.

    Returns:
        list[dict]: one dict per row with the keys ``'id'``, ``'name'`` and
        ``'url'``; an empty list when the query fails (the error is printed).
    """
    sql = '''
select id,name,url from z where download = 2 limit 10
'''
    connection = None
    try:
        connection = getConn()
        cursor = connection.cursor()
        try:
            cursor.execute(sql)
            rows = cursor.fetchall()
        finally:
            # Close the cursor even when execute/fetch raises.
            cursor.close()
        return [{'id': row[0], 'name': row[1], 'url': row[2]} for row in rows]
    except Exception as e:
        print("listNeedDownload失败:", e, sql)
        return []
    finally:
        # Each call opens its own connection; release it on every path.
        if connection is not None:
            connection.close()

数据库连接池

1
2
pip install DBUtils -i https://mirrors.aliyun.com/pypi/simple/
pip install pymysql -i https://mirrors.aliyun.com/pypi/simple/
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
import pymysql
from dbutils.pooled_db import PooledDB
from dotenv import load_dotenv
# Load .env first so the db_pic_* variables are visible to os.getenv().
load_dotenv()
# Module-level connection pool that uses pymysql to create connections.
pool = PooledDB(
    pymysql,
    # Connection arguments forwarded to the creator.
    host=os.getenv("db_pic_host"),
    port=int(os.getenv("db_pic_port")),
    user=os.getenv("db_pic_user"),
    passwd=os.getenv("db_pic_password"),
    db=os.getenv("db_pic_server"),
    charset='utf8mb4',
    # Pool sizing / behaviour options.
    mincached=2,
    maxcached=10,
    maxconnections=20,
    blocking=True,
)
def a(id=None, name=None, tab_url=None, url=None):
    """Insert one row into table ``img`` through the shared connection pool.

    The original snippet took no arguments but formatted ``id``, ``name``,
    ``tab_url`` and ``url`` into the SQL text; they are now explicit
    parameters and are bound by the driver rather than by ``str.format``.

    Args:
        id: value for the ``id`` column.
        name: value for the ``name`` column.
        tab_url: value for the ``tab_url`` column.
        url: value for the ``url`` column.

    Returns:
        bool: True when the insert was committed, False when any step failed
        (the error is printed).
    """
    sql = "insert into img(id,name,tab_url,url) values(%s,%s,%s,%s)"
    params = (id, name, tab_url, url)
    connection = pool.connection()
    try:
        cursor = connection.cursor()
        try:
            cursor.execute(sql, params)
        finally:
            cursor.close()
        connection.commit()
        return True
    except Exception as e:
        print("添加数据库数据失败:", e, sql, params)
        return False
    finally:
        # Closing a pooled connection hands it back to the pool.
        connection.close()

本文地址: https://github.com/maxzhao-it/blog/post/5fc222/

操作JSON

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
import urllib.request
import urllib.parse

# Membership test: `flag` is False because '1.mp4' does contain '.mp4'.
flag = '.mp4' not in '1.mp4'
# Search: index of the first occurrence (1 here), or -1 when absent.
'1.mp4'.find('.mp4')
# Count: number of non-overlapping occurrences of the substring (not a length).
'1.mp4'.count('1')
# Replace: returns a new string with every occurrence substituted.
'1.mp4'.replace('.mp4', '')
# Suffix test.
'1.mp4'.endswith('.mp4')
rawUrl = 'http:/test.com/1 2/123'
# Option 1: escape only the spaces by hand.
downloadUrl = rawUrl.replace(' ', '%20')
# Option 2: percent-encode with urllib. Quote the raw URL, not the already
# escaped one (that would turn '%20' into '%2520'), and keep ':' and '/' as-is.
quotedUrl = urllib.parse.quote(rawUrl, safe=':/')

本文地址: https://github.com/maxzhao-it/blog/post/4ad1a794/

安装

1
2
pip install ffmpy -i https://mirrors.aliyun.com/pypi/simple/
pip3 install opencv-python -i https://mirrors.aliyun.com/pypi/simple/
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
import os
import uuid
from ffmpy import FFmpeg

# Crop a rectangular region out of a video.
def cut_out_video(video_path: str, output_dir: str, start_pix: tuple, size: tuple):
    """Crop ``video_path`` with ffmpeg and write the result into ``output_dir``.

    Args:
        video_path: source video; its extension must be mp4, avi or flv
            (compared case-insensitively, so ``.MP4`` is accepted).
        output_dir: directory that receives the cropped file.
        start_pix: ``(x, y)`` offset passed to the ffmpeg ``crop`` filter.
        size: ``(width, height)`` passed to the ffmpeg ``crop`` filter.

    Returns:
        str: path of the generated file, named ``<uuid1 hex>.<ext>``.

    Raises:
        ValueError: when the extension is not one of the supported formats.
    """
    ext = os.path.basename(video_path).strip().split('.')[-1]
    if ext.lower() not in ('mp4', 'avi', 'flv'):
        raise ValueError('format error')
    result = os.path.join(output_dir, '{}.{}'.format(uuid.uuid1().hex, ext))
    width, height = size[0], size[1]
    left, top = start_pix[0], start_pix[1]
    crop_options = '-vf crop={}:{}:{}:{} -y -threads 5 -preset ultrafast -strict -2'.format(
        width, height, left, top)
    ff = FFmpeg(inputs={video_path: None}, outputs={result: crop_options})
    # Show the generated command line before running it.
    print(ff.cmd)
    ff.run()
    return result


if __name__ == '__main__':
    # Demo: crop a 512x512 region starting at the top-left corner.
    source_video = r'C:\Users\huyi\Desktop\test2.mp4'
    target_dir = r'C:\Users\huyi\Desktop'
    cropped = cut_out_video(source_video, target_dir, (0, 0), (512, 512))
    print(cropped)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
import cv2

vc = cv2.VideoCapture("f:\\Videos\\4K Tokkit\\dbxbqm22\\7257911883674668289.mp4")
# Stop early when the video could not be opened.
if not vc.isOpened():
    print("无法打开视频文件")
    exit()
fps = vc.get(cv2.CAP_PROP_FPS)

# Directory that receives the extracted frames.
pic_path = 'f:\\Videos\\4K Tokkit\\test/'
c = 0
# Save one frame out of every `num` frames read (about one per second of video).
# Clamp to 1 so a reported fps of 0 cannot cause a modulo-by-zero below.
num = max(round(fps), 1)
while True:
    ret, frame = vc.read()
    if not ret:
        # No more frames (or a read error): leave the loop.
        break
    if c % num == 0:
        cv2.imwrite(pic_path + '7257911883674668289_' + str(c) + '.jpg', frame)
    c = c + 1
vc.release()
cv2.destroyAllWindows()

写入中文路径处理

cv2.imread 和 cv2.imwrite 无法读写含中文的路径。

1
2
3
import cv2
import numpy as np

# Read the file's raw bytes with NumPy and decode them in memory, so OpenCV
# never has to open the non-ASCII path itself. dtype must be uint8: the
# default for np.fromfile is float, which would misinterpret the bytes.
image = cv2.imdecode(np.fromfile('中文图片.png', dtype=np.uint8), cv2.IMREAD_UNCHANGED)
# Encode in memory, then let NumPy write the bytes to the non-ASCII path.
cv2.imencode('.png', image)[1].tofile('保存_中文图片.png')

本文地址: https://github.com/maxzhao-it/blog/post/8ddf92f3/

操作文件

1
pip install flask -i https://mirrors.aliyun.com/pypi/simple/

执行

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
from flask import Flask,request
app = Flask(__name__)


@app.route('/api/testPost', methods=['Get'])
def testPost():
    """Handle ``/api/testPost``: compare the ``username``/``password`` query args.

    Returns:
        str: "登录成功" when both query parameters equal ``'1'``, otherwise
        "登录失败" (this includes missing parameters).
    """
    # All query-string parameters of the current request.
    query = request.args
    # str() is required because the value is concatenated with '+'.
    print("打印所有的参数:" + str(query))
    userName = query.get('username')
    userPassword = query.get('password')
    if userName == '1' and userPassword == '1':
        return "登录成功"
    return "登录失败"


if __name__ == '__main__':
    # Start the server on port 8888.
    app.run(port=8888)

测试

本文地址: https://github.com/maxzhao-it/blog/post/9bf163e/

多线程

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
import threading
import time
# Maximum number of threads allowed to run the guarded section at once
max_threads = 5
# Semaphore created with max_threads permits
semaphore = threading.Semaphore(max_threads)
def asyncConcat(startPath, endPath, middlePath, middleAudioPath, outputPath, semaphore):
    """Run ``concat`` while holding one permit of ``semaphore``.

    Args:
        startPath, endPath, middlePath, middleAudioPath, outputPath:
            forwarded unchanged to ``concat``.
        semaphore: acquired before the work starts and released afterwards,
            including when ``concat`` raises.
    """
    semaphore.acquire()
    try:
        thread_id = threading.current_thread().ident
        print("Thread ID: ", thread_id)
        concat(startPath, endPath, middlePath, middleAudioPath, outputPath)
    finally:
        semaphore.release()


if __name__ == '__main__':
    # Start one thread per video that still needs to be processed.
    threads = []
    for file in files:
        count = count + 1
        basename = os.path.basename(file)
        # Skip entries that are not mp4 files or that are already marked as edited.
        if '.mp4' not in basename or '已剪辑' in basename:
            continue
        sourceVideo = middleDir + basename
        editedName = basename.replace('.', '-已剪辑.')
        if os.path.exists(outputPath + editedName):
            print('视频已剪辑=', sourceVideo)
            continue
        print('待剪辑视频=', sourceVideo)
        fileCount = fileCount + 1
        worker = threading.Thread(
            target=asyncConcat,
            args=(startPath, endPath, sourceVideo, middleAudioPath, outputPath, semaphore),
        )
        threads.append(worker)
        worker.start()
    # Wait for the workers.
    # NOTE(review): join(timeout=1) waits at most one second per thread, so a
    # worker may still be running afterwards - confirm this is intended.
    for thread in threads:
        thread.join(timeout=1)

获取线程id

1
2
3
import threading
# Identifier of the thread that is executing this code.
thread_id = threading.get_ident()
print("Thread ID: ", thread_id)

设置等待时间

1
2
import threading
# Wait at most one second for `thread` to finish. join() returns None either
# way, so call thread.is_alive() afterwards to tell whether it timed out.
# (`thread` is assumed to be a started threading.Thread created elsewhere.)
thread.join(timeout=1)

线程池 py3

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
from concurrent.futures import ThreadPoolExecutor
def task(x, y, delay=1):
    """Return ``x * y`` after sleeping for ``delay`` seconds.

    Args:
        x: first factor.
        y: second factor.
        delay: seconds to sleep before returning, simulating slow work.
            Defaults to 1, the value that used to be hard-coded.

    Returns:
        The product ``x * y``.
    """
    time.sleep(delay)
    return x * y


if __name__ == '__main__':
    # Part 1: a single submit(), then map() over two argument lists in lockstep.
    with ThreadPoolExecutor(max_workers=10) as executor:
        single = executor.submit(task, 1, 2)
        print(single.result())
        products = executor.map(task, [1, 2, 3], [1, 2, 4])
        # Wait until every submitted task has finished.
        executor.shutdown()
        for product in products:
            print(product)
    # Part 2: keep the Future objects and read their results afterwards.
    with ThreadPoolExecutor(max_workers=10) as executor:
        futures = []
        for i in range(5):
            futures.append(executor.submit(task, i, i))
        print(all(f.result() for f in futures))
        # Wait until every submitted task has finished.
        executor.shutdown()
        print(all(f.result() for f in futures))
        for done in futures:
            print(done.result())

本文地址: https://github.com/maxzhao-it/blog/post/f4423a86/

1
pip install requests m3u8 pycryptodome -i https://mirrors.aliyun.com/pypi/simple/

多线程

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
import os

import subprocess

# Hard-coded output directory; not referenced by the code visible in this snippet.
outPath = 'F:/资料文档/zztt/'
# Length of the sample 32-character name; merge_m3u8 slices this many leading
# characters off the first listed file name to get the shared segment prefix.
nameLength = len('f1bded9f280d7945fa5a1503a2669a66')


def merge_m3u8(m3u8Path):
    """Merge the numbered ``.ts`` segments in ``m3u8Path`` into one MP4 via ffmpeg.

    Segments are expected to be named ``<prefix><i>.ts`` for ``i`` from 0 to
    the number of segments minus one, where ``<prefix>`` is the first
    ``nameLength`` characters of a segment file name. A concat list is written
    to ``input.txt`` in the current working directory, and the result is
    stored as ``<m3u8Path>/<basename of m3u8Path>.mp4``.

    Args:
        m3u8Path: directory that contains the downloaded ``.ts`` segments.

    Returns:
        None. Nothing is written when the directory holds no ``.ts`` files.
    """
    # Count only real segments; input/output files living in the same
    # directory must not inflate the segment count.
    segments = [entry for entry in os.listdir(m3u8Path) if entry.endswith('.ts')]
    if not segments:
        return None
    mainName = segments[0][0:nameLength]
    # The concat demuxer expects one "file '<path>'" directive per line.
    listing = ''.join(
        f"file '{m3u8Path}/{mainName}{i}.ts'\n" for i in range(len(segments))
    )
    with open("input.txt", "w", encoding='utf-8') as f:
        f.write(listing)

    output_filename = m3u8Path + "/" + os.path.basename(m3u8Path) + '.mp4'

    # Pass an argument list (no shell) so spaces or non-ASCII characters in the
    # path need no quoting. ffmpeg's own output goes to the console.
    subprocess.call([
        "ffmpeg", "-f", "concat", "-safe", "0",
        "-i", "input.txt", "-c", "copy", output_filename,
    ])
    return None


if __name__ == '__main__':
    # Demo: merge the segments stored in one download directory.
    segmentDir = 'F:/资料文档/zztt/《独家猛料》'
    merge_m3u8(segmentDir)

下载 ffmpeg

本文地址: https://github.com/maxzhao-it/blog/post/7b6a6783/