Commit efcd6148 authored by 翟艳秋(20软)

1. [add] 将日志文件打到本地;

2. [add] 可合成确定时间点的旁白; 3. [add] 针对检测和合成过程中发生的错误进行报错处理; 4. [modified] 将旁白检测部分的结果实时输出到文件中; 5. [modified] 改为直接使用cv2获取视频时长; 6. [modified] 修改调用paddlespeech的路径问题。
parent 5f39d7a7
......@@ -7,6 +7,7 @@ from paddlespeech.cli import ASRExecutor
from PaddlePaddle_DeepSpeech2.data_utils.audio_process import AudioInferProcess
from PaddlePaddle_DeepSpeech2.utils.predict import Predictor
from PaddlePaddle_DeepSpeech2.utils.audio_vad import crop_audio_vad
from detect_with_asr import write_to_sheet
import os
normal_speed = 4
......@@ -41,7 +42,7 @@ normal_speed = 4
# 使用paddle deepspeech进行语音识别
def predict_long_audio_with_paddle(wav_path, pre_time, state):
def predict_long_audio_with_paddle(wav_path, pre_time, book_name, sheet_name, state):
# 获取数据生成器,处理数据和获取字典需要
vocab_path = './PaddlePaddle_DeepSpeech2/dataset/zh_vocab.txt'
mean_std_path = './PaddlePaddle_DeepSpeech2/dataset/mean_std.npz'
......@@ -87,13 +88,16 @@ def predict_long_audio_with_paddle(wav_path, pre_time, state):
)
if text:
if i == 0 or (i > 0 and time_stamps[i][0] - last_time >= 1):
recommend_lens = int(time_stamps[i][0] * normal_speed) if i == 0 else int(
(time_stamps[i][0] - last_time) * normal_speed)
narratages.append(["", "", "", "插入旁白,推荐字数为%d" % recommend_lens])
narratages.append(
[round(time_stamps[i][0] + pre_time, 2), round(time_stamps[i][1] + pre_time, 2), text, ''])
recommend_lens = int((time_stamps[i][0] - last_time) * normal_speed)
# narratages.append(["", "", "", "插入旁白,推荐字数为%d" % recommend_lens])
write_to_sheet(book_name, sheet_name, ["", "", "", "插入旁白,推荐字数为%d" % recommend_lens])
# narratages.append([round(time_stamps[i][0] + pre_time, 2), round(time_stamps[i][1] + pre_time, 2),
# text, ''])
write_to_sheet(book_name, sheet_name,
[round(time_stamps[i][0] + pre_time, 2), round(time_stamps[i][1] + pre_time, 2), text, ''])
last_time = time_stamps[i][1]
print("第%d个分割音频 对应时间为%.2f-%.2f 识别结果: %s" % (i, time_stamps[i][0] + pre_time, time_stamps[i][1] + pre_time, text))
print(
"第%d个分割音频 对应时间为%.2f-%.2f 识别结果: %s" % (i, time_stamps[i][0] + pre_time, time_stamps[i][1] + pre_time, text))
state[0] = float((i + 1) / len(audios_path)) if state[0] is None or state[0] < 0.99 else 0.99
print("最终结果,消耗时间:%d, 识别结果: %s" % (round((time.time() - start) * 1000), texts))
......
......@@ -8,6 +8,48 @@ from openpyxl.styles import PatternFill, Alignment
from split_wav import *
def create_sheet(path, sheet_name, value):
    """Create a new workbook at *path* with a single sheet holding *value*.

    :param path: str, where the workbook (.xlsx) is saved
    :param sheet_name: str, title of the sheet
    :param value: list of rows; the header is
        ['起始时间','终止时间','字幕','建议','旁边解说脚本']
    :return: None
    """
    workbook = openpyxl.Workbook()
    sheet = workbook.active
    sheet.title = sheet_name
    # Widen the subtitle (C) and suggestion (D) columns for readability.
    sheet.column_dimensions['C'].width = 50
    sheet.column_dimensions['D'].width = 30
    for row_idx, row in enumerate(value, start=1):
        for col_idx, cell_value in enumerate(row, start=1):
            sheet.cell(row=row_idx, column=col_idx, value=str(cell_value))
    workbook.save(path)
def write_to_sheet(path, sheet_name, value):
    """Append one row *value* to sheet *sheet_name* of an existing workbook.

    Cells that are empty or contain a narration hint ("插入旁白") are filled
    yellow; the subtitle column (0-based index 2) gets word wrap so long
    captions stay readable.

    :param path: str, path of the existing .xlsx workbook
    :param sheet_name: str, name of the sheet to append to
    :param value: list, one row of cell values
    :return: None
    """
    workbook = openpyxl.load_workbook(path)
    # Bug fix: Workbook.get_sheet_by_name() is deprecated and removed in
    # openpyxl 3.x; index the workbook by sheet name instead.
    sheet = workbook[sheet_name]
    next_row = sheet.max_row + 1
    for j, cell_value in enumerate(value):
        cell = sheet.cell(row=next_row, column=j + 1, value=str(cell_value))
        if cell_value == '' or '插入旁白' in str(cell_value):
            # Highlight empty cells and narration suggestions in yellow.
            cell.fill = PatternFill(fill_type='solid', fgColor='ffff00')
        if j == 2:
            # Subtitle column: wrap long text.
            cell.alignment = Alignment(wrapText=True)
    workbook.save(path)
def trans_to_mono(wav_path):
"""
将音频的通道数channel转换为1
......@@ -53,15 +95,16 @@ def detect_with_asr(video_path, book_path, start_time=0, end_time=-1, state=None
# audio_path = trans_to_mono(total_wav_path)
# xlsx中的表格名为“旁白插入位置建议”
if os.path.exists(book_path):
os.remove(book_path)
book_name_xlsx = book_path
sheet_name_xlsx = "旁白插入位置建议"
table_head = [["起始时间", "终止时间", "字幕", '建议', '解说脚本']]
create_sheet(book_name_xlsx, sheet_name_xlsx, table_head)
sys.path.append("./PaddlePaddle_DeepSpeech2")
sys.path.append("PaddlePaddle_DeepSpeech2")
from infer_path import predict_long_audio_with_paddle
table_head = [["起始时间", "终止时间", "字幕", '建议', '解说脚本']]
table_content = table_head + predict_long_audio_with_paddle(audio_path, start_time, state)
from detect_with_ocr import write_excel_xlsx
write_excel_xlsx(book_name_xlsx, sheet_name_xlsx, table_content)
predict_long_audio_with_paddle(audio_path, start_time, book_name_xlsx, sheet_name_xlsx, state)
# 删除中间文件
shutil.rmtree(tmp_root)
state[0] = 1
......
import time
import os
# import time
import cv2
import numpy as np
from paddleocr import PaddleOCR
import difflib
import openpyxl
from openpyxl.styles import PatternFill, Alignment
# from openpyxl.styles import PatternFill, Alignment
# from openpyxl import Workbook
import re
from detect_with_asr import create_sheet, write_to_sheet
# 字幕的上下边界
up_b, down_b = 0, 0
......@@ -138,15 +140,15 @@ def detect_subtitle(img):
bottom_position = None
if len(res) == 0:
return None
log = []
# log = []
possible_txt = []
for x in res:
rect, (txt, confidence) = x
font_size = rect[2][1] - rect[0][1]
mid = (rect[0][0] + rect[1][0]) / 2
gradient = np.arctan(abs((rect[1][1] - rect[0][1]) / (rect[1][0] - rect[0][0])))
log.append("文本:{},置信度:{},中心点:{},斜率:{},字体大小:{}".format(txt, confidence, mid / img.shape[1], gradient, font_size))
# 置信度>0.7 & 斜率<0.1 & 字幕偏移量<=25 & 字幕中心在画面宽的0.4-0.6之间
# log.append("文本:{},置信度:{},中心点:{},斜率:{},字体大小:{}".format(txt, confidence, mid / img.shape[1], gradient,
# font_size)) 置信度>0.7 & 斜率<0.1 & 字幕偏移量<=25 & 字幕中心在画面宽的0.4-0.6之间
if confidence > 0.7 and gradient < 0.1 and 0.4 < mid / img.shape[1] < 0.6 and \
abs(rect[0][1] - 30) + abs(img.shape[0] - rect[2][1] - 30) <= 25:
subTitle += txt
......@@ -161,12 +163,15 @@ def detect_subtitle(img):
return None
def process_video(video_path, begin, end, state):
def process_video(video_path, begin, end, book_path, sheet_name, state):
"""
处理视频,主要完成对字幕的捕捉以及根据字幕分析得出旁白可能位置的任务
:param video_path: 待处理视频的路径
:param begin: 电影的实际开始位置(秒)
:param end: 电影除演职表外的实际结束位置(秒)
:param book_path: 输出表格地址
:param sheet_name: 输出表格中的表名
:param state: 用于通信的状态关键字
:return:
"""
video = cv2.VideoCapture(video_path)
......@@ -203,9 +208,11 @@ def process_video(video_path, begin, end, state):
print('--------------------------------------------------')
recommend_lens = int(res[-1][0] * normal_speed) if len(res) == 1 else int(
(res[-1][0] - res[-2][1]) * normal_speed)
narratage_recommend.append(['', '', '', '插入旁白,推荐字数为%d' % recommend_lens])
# narratage_recommend.append(['', '', '', '插入旁白,推荐字数为%d' % recommend_lens])
write_to_sheet(book_path, sheet_name, ['', '', '', '插入旁白,推荐字数为%d' % recommend_lens])
print(start_time, end_time, lastSubTitle)
narratage_recommend.append([round(start_time, 2), round(end_time, 2), lastSubTitle, ''])
# narratage_recommend.append([round(start_time, 2), round(end_time, 2), lastSubTitle, ''])
write_to_sheet(book_path, sheet_name, [round(start_time, 2), round(end_time, 2), lastSubTitle, ''])
# 两句话连在一起,但是两句话不一样
elif lastSubTitle is not None and subTitle is not None:
if string_similar(lastSubTitle, subTitle) < 0.7:
......@@ -215,9 +222,11 @@ def process_video(video_path, begin, end, state):
print('--------------------------------------------------')
recommend_lens = int(res[-1][0] * normal_speed) if len(res) == 1 else int(
(res[-1][0] - res[-2][1]) * normal_speed)
narratage_recommend.append(['', '', '', '插入旁白,推荐字数为%d' % recommend_lens])
# narratage_recommend.append(['', '', '', '插入旁白,推荐字数为%d' % recommend_lens])
write_to_sheet(book_path, sheet_name, ['', '', '', '插入旁白,推荐字数为%d' % recommend_lens])
print(start_time, end_time, lastSubTitle)
narratage_recommend.append([round(start_time, 2), round(end_time, 2), lastSubTitle, ''])
# narratage_recommend.append([round(start_time, 2), round(end_time, 2), lastSubTitle, ''])
write_to_sheet(book_path, sheet_name, [round(start_time, 2), round(end_time, 2), lastSubTitle, ''])
start_time = end_time
else:
lastSubTitle = subTitle if len(subTitle) > len(lastSubTitle) else lastSubTitle
......@@ -229,35 +238,38 @@ def process_video(video_path, begin, end, state):
print('--------------------------------------------------')
recommend_lens = int(res[-1][0] * normal_speed) if len(res) == 1 else int(
(res[-1][0] - res[-2][1]) * normal_speed)
narratage_recommend.append(['', '', '', '插入旁白,推荐字数为%d' % recommend_lens])
# narratage_recommend.append(['', '', '', '插入旁白,推荐字数为%d' % recommend_lens])
write_to_sheet(book_path, sheet_name, ['', '', '', '插入旁白,推荐字数为%d' % recommend_lens])
break
return narratage_recommend
def write_excel_xlsx(path, sheet_name, value):
    """Write the narration-recommendation table *value* to a new workbook.

    :param path: str, output .xlsx path
    :param sheet_name: str, sheet title
    :param value: list of rows written starting at A1
    :return: None
    """
    workbook = openpyxl.Workbook()
    sheet = workbook.active
    sheet.title = sheet_name
    # Widen the subtitle (C) and suggestion (D) columns for readability.
    sheet.column_dimensions['C'].width = 50
    sheet.column_dimensions['D'].width = 30
    for row_idx, row in enumerate(value, start=1):
        for col_idx, cell_value in enumerate(row, start=1):
            cell = sheet.cell(row=row_idx, column=col_idx, value=str(cell_value))
            cell.alignment = Alignment(wrapText=True)
            # Highlight empty cells, narration hints and "翻译" in yellow.
            if cell_value == '' or '插入旁白' in str(cell_value) or cell_value == '翻译':
                cell.fill = PatternFill(fill_type='solid', fgColor='ffff00')
    workbook.save(path)
# def write_excel_xlsx(path, sheet_name, value):
# """
# 将旁白推荐信息输出表格
# :param path: 输出表格的存储路径
# :param sheet_name:表格中的表名
# :param value:输出到表格中的信息
# :return:
# """
# index = len(value)
# workbook = Workbook()
# sheet = workbook.active
# sheet.title = sheet_name
# # 将字幕对应的那一列扩宽一些
# sheet.column_dimensions['C'].width = 50
# sheet.column_dimensions['D'].width = 30
# for i in range(0, index):
# for j in range(0, len(value[i])):
# sheet.cell(row=i + 1, column=j + 1, value=str(value[i][j])).alignment = Alignment(wrapText=True)
# if value[i][j] == '' or '插入旁白' in str(value[i][j]) or value[i][j] == '翻译':
# sheet.cell(row=i + 1, column=j + 1).fill = PatternFill(fill_type='solid', fgColor='ffff00')
# workbook.save(path)
def detect_with_ocr(video_path, book_path, start_time, end_time, state):
if os.path.exists(book_path):
os.remove(book_path)
book_name_xlsx = book_path
sheet_name_xlsx = "旁白插入位置建议"
......@@ -267,14 +279,12 @@ def detect_with_ocr(video_path, book_path, start_time, end_time, state):
# 获取并构建输出信息
table_head = [["起始时间", "终止时间", "字幕", '建议', '解说脚本']]
table_content = table_head + process_video(video_path, start_time, end_time, state)
# 输出旁白位置推荐信息到表格
write_excel_xlsx(book_name_xlsx, sheet_name_xlsx, table_content)
state[0] = 1.00
create_sheet(book_name_xlsx, sheet_name_xlsx, table_head)
process_video(video_path, start_time, end_time, book_name_xlsx, sheet_name_xlsx, state)
if __name__ == '__main__':
video_path = "D:/heelo/hysxm_1.mp4"
book_path = '何以笙箫默.xlsx'
detect_with_ocr(video_path, book_path, 0, 300, [None])
pass
# video_path = "D:/heelo/hysxm_1.mp4"
# book_path = '何以笙箫默.xlsx'
# detect_with_ocr(video_path, book_path, 0, 300, [None])
# -*- coding:utf-8 -*-
import threading
import time
import traceback
from mttkinter import mtTkinter as tk
from tkinter import filedialog, ttk, messagebox
import sys
import io
import os
import ffmpeg
import datetime
from speech_synthesis import ss_and_export
import ctypes
import inspect
......@@ -20,6 +25,55 @@ window.iconbitmap("eagle_2.ico")
video_duration = ""
def create_detail_day():
    """Return today's date as a log-file tag, e.g. ``day2024_01_31``."""
    return datetime.datetime.now().strftime("day%Y_%m_%d")
def make_print_to_file(path='./'):
    """Tee stdout: everything printed also goes to a per-day log file.

    Rebinds ``sys.stdout`` to a Logger that writes to both the console and
    ``<path>/dayYYYY_MM_DD.log``.

    :param path: directory in which the log file is created
    :return: None (side effect: replaces ``sys.stdout``)
    """

    class Logger(object):
        def __init__(self, filename="detect_with_ocr.log", path='./'):
            # Bug fix: os.mkdir fails on nested paths and raises if the
            # directory already exists; makedirs(exist_ok=True) does neither.
            os.makedirs(path, exist_ok=True)
            # Re-wrap the raw stdout buffer so console output is UTF-8 even
            # when the console's default encoding differs.
            sys.stdout = io.TextIOWrapper(sys.stdout.buffer, encoding='utf-8')
            self.terminal = sys.stdout
            self.log = open(os.path.join(path, filename), "a", encoding='utf8')
            print(path)

        def write(self, message):
            # Tee: echo to the console and append to the log file.
            self.terminal.write(message)
            self.log.write(message)

        def flush(self):
            # Bug fix: this used to be a no-op, so buffered log lines could
            # be lost on crash/exit; flush both underlying streams.
            self.terminal.flush()
            self.log.flush()

    sys.stdout = Logger(create_detail_day() + '.log', path=path)
# 复写线程,用于解决主线程无法步骤子线程中异常的问题
# Thread subclass used so the main thread can detect exceptions raised in a
# worker (a plain threading.Thread silently swallows them).
class RunThread(threading.Thread):
    """Thread that records any exception raised by its target function.

    Attributes:
        exitcode: 0 on clean exit, 1 if the target raised.
        exception: the caught exception instance, or None.
        exc_traceback: formatted traceback string for logging/diagnosis.
    """

    def __init__(self, funcName, name, args=()):
        # Fix: pass *name* to the base constructor instead of assigning the
        # private Thread._name attribute — the old code relied on a CPython
        # implementation detail for getName()/name to report the right name.
        threading.Thread.__init__(self, name=name)
        self._args = args
        self._funcName = funcName
        self.exitcode = 0
        self.exception = None
        self.exc_traceback = ''

    def run(self):  # overwrite run() to capture failures of the target
        try:
            self._run()
        except Exception as e:
            print(e)
            self.exitcode = 1  # non-zero marks an abnormal exit
            self.exception = e
            # Record the full traceback so callers can show/log it.
            self.exc_traceback = ''.join(traceback.format_exception(*sys.exc_info()))

    def _run(self):
        # Invoke the wrapped target with its positional arguments.
        self._funcName(*self._args)
def open_video_file():
"""
打开文件
......@@ -33,34 +87,15 @@ def open_video_file():
inputFilePath.set(video_path)
# 获取视频的时长等信息,初始化开始结束时间
startTime.set("00:00:00")
info = ffmpeg.probe(video_path)
vs = next(c for c in info['streams'] if c['codec_type'] == 'video')
global video_duration
try:
duration = int(float(vs['duration']))
hours = int(duration / 3600)
minutes = int(duration / 60 - 60 * hours)
seconds = int(duration - 60 * minutes - 3600 * hours)
endTime.set("%02d:%02d:%02d" % (hours, minutes, seconds))
video_duration = endTime.get()
except KeyError:
flag = False
for k in vs['tags'].keys():
k_l = str.lower(k)
if 'duration' in k_l:
duration = vs['tags'][k].split(':')
endTime.set("%02d:%02d:%02d" % (int(duration[0]), int(duration[1]), float(duration[2])))
video_duration = endTime.get()
flag = True
if not flag:
video = cv2.VideoCapture(video_path)
fps = video.get(cv2.CAP_PROP_FPS)
duration = video.get(cv2.CAP_PROP_FRAME_COUNT) / fps
hours = int(duration / 3600)
minutes = int(duration / 60 - 60 * hours)
seconds = int(duration - 60 * minutes - 3600 * hours)
endTime.set("%02d:%02d:%02d" % (hours, minutes, seconds))
video_duration = endTime.get()
video = cv2.VideoCapture(video_path)
fps = video.get(cv2.CAP_PROP_FPS)
duration = video.get(cv2.CAP_PROP_FRAME_COUNT) / fps
hours = int(duration / 3600)
minutes = int(duration / 60 - 60 * hours)
seconds = int(duration - 60 * minutes - 3600 * hours)
endTime.set("%02d:%02d:%02d" % (hours, minutes, seconds))
video_duration = endTime.get()
def find_save_file():
......@@ -197,20 +232,45 @@ def start_detect():
processState.set("开始检测")
# 多线程同步进行检测和进度条更新
state = [None]
threads = [
threading.Thread(target=start_process, args=(progressbar_1, progress_1, state, 100000), name="startProgress1"),
threading.Thread(target=detect,
args=(video_path, start_time, end_time, book_path, state, hasSubtitle.get()),
name="detect")]
threads = []
t = RunThread(funcName=start_process, args=(progressbar_1, progress_1, state, 100000), name="startProgress1")
t.setDaemon(True)
threads.append(t)
t = RunThread(funcName=detect,
args=(video_path, start_time, end_time, book_path, state, hasSubtitle.get()),
name="detect")
t.setDaemon(True)
threads.append(t)
for t in threads:
t.start()
# 线程完成任务后结束线程
# 线程完成任务后结束线程,一旦有一个线程结束就判断是否是意外中断
while 1:
alive = True
for t in threads:
alive = alive and t.is_alive()
if not alive:
break
time.sleep(5)
for t in threads:
t.join()
# 将进度条的进度拉满到100%,并给出“任务已完成”的提示
progressbar_1['value'] = 100
progress_1['text'] = '100.0%'
if t.exitcode != 0:
print("Exception in", t.getName())
messagebox.showerror("错误", "运行出错,请联系开发者处理")
processState.set("任务中断")
progress_state = progressbar_1['value']
progressbar_1.stop()
progressbar_1['value'] = progress_state
stopDetection.config(state=tk.DISABLED)
startDetection.config(state=tk.ACTIVE)
return
# 若不是意外中断,则将进度条的进度拉满到100%,并给出“任务已完成”的提示
processState.set("任务已完成")
progressbar_1.stop()
progressbar_1['value'] = 100
progress_1['text'] = "100.00%"
# 检测完成后,将“停止检测”按钮设置为不可点击状态,”开始检测“按钮设置为可点击状态
stopDetection.config(state=tk.DISABLED)
startDetection.config(state=tk.ACTIVE)
......@@ -348,21 +408,43 @@ def start_synthesis():
# 多线程同时实现语音合成+字幕导出、进度条
state = [None]
threads = [
threading.Thread(target=start_process, args=(progressbar_2, progress_2, state, 100000), name="startProgress2"),
threading.Thread(target=ss_and_export,
args=(video_path, sheet_path, audio_dir, speed, caption_path, state), name="ssAndExport")]
threads = []
t = RunThread(funcName=ss_and_export,
args=(video_path, sheet_path, audio_dir, speed, caption_path, state), name="ssAndExport")
t.setDaemon(True)
threads.append(t)
t = RunThread(funcName=start_process, args=(progressbar_2, progress_2, state, 100000), name="startProgress2")
t.setDaemon(True)
threads.append(t)
for t in threads:
t.start()
# 查询线程是否有结束的,一旦一个结束,另一个也结束
while 1:
alive = True
for t in threads:
alive = alive and t.is_alive()
if not alive:
break
time.sleep(5)
for t in threads:
t.join()
processState_2.set("语音和字幕已导出完毕")
if t.exitcode != 0:
print("Exception in", t.getName())
messagebox.showerror("错误", "运行出错,请联系开发者处理")
processState.set("任务中断")
progress_state = progressbar_2['value']
progressbar_2.stop()
progressbar_2['value'] = progress_state
startSynthesis.config(state=tk.ACTIVE)
stopSynthesis.config(state=tk.DISABLED)
return
startSynthesis.config(state=tk.ACTIVE)
stopSynthesis.config(state=tk.DISABLED)
def stop_synthesis():
print(threading.enumerate())
for x in threading.enumerate():
if x.getName() in ["startSynthesis", "startProgress2", "ssAndExport"]:
_async_raise(x.ident, SystemExit)
......@@ -593,6 +675,8 @@ def on_closing():
window.destroy()
thread_it(make_print_to_file(os.path.join(os.getcwd(), 'log')), name="logging")
window.protocol("WM_DELETE_WINDOW", on_closing)
# 刷新显示
......
......@@ -3,7 +3,7 @@ import os
import argparse
import time
from azure.cognitiveservices.speech import AudioDataStream, SpeechConfig, SpeechSynthesizer
from azure.cognitiveservices.speech import AudioDataStream, SpeechConfig, SpeechSynthesizer, ResultReason
from azure.cognitiveservices.speech.audio import AudioOutputConfig
import openpyxl
......@@ -36,8 +36,18 @@ def speech_synthesis(text, output_file, speed):
audio_config = AudioOutputConfig(filename=audio_path)
synthesizer = SpeechSynthesizer(speech_config=speech_config, audio_config=audio_config)
result = synthesizer.speak_text(text)
print(result.reason)
synthesizer.speak_text(text)
while result.reason == ResultReason.Canceled:
cancellation_details = result.cancellation_details
print("取消的原因", cancellation_details.reason)
time.sleep(1)
synthesizer.stop_speaking()
del synthesizer
synthesizer = SpeechSynthesizer(speech_config=speech_config, audio_config=audio_config)
result = synthesizer.speak_text(text)
print(result.reason)
if float(speed) != 1.0:
change_speed(output_file, speed)
......@@ -100,13 +110,21 @@ def get_narratage_text(sheet_content, speed):
else:
# 如果旁白中有换行符,即分为n段,则按照换行符进行分割,并间隔0.5s
text_split = text.split('\n')
if subtitle[i] is None:
# 如果旁白有对应的时间戳(是这段大旁白里的特定位置)
if start_time[i] is not None and end_time[i] is not None:
cur_start = float(start_time[i])
cur_end = float(end_time[i])
elif subtitle[i] is None:
cur_start = float(end_time[i - 1]) + 0.1 if i > 0 else 0
cur_end = float(start_time[i + 1])
# 如果是最后一句旁白,后面没有字幕及时间戳了,就先把cur_end置为-1
cur_end = float(start_time[i + 1]) if i + 1 < len(start_time) else -1
else:
# 有字幕,可覆盖字幕
cur_start = float(start_time[i])
cur_end = float(end_time[i])
for x in text_split:
if len(x) == 0:
continue
cur_end = max(cur_end, cur_start + (len(x) / normal_speed + normal_interval) / speed)
narratage_text.append(x)
narratage_start_time.append(cur_start)
......@@ -208,7 +226,6 @@ def ss_and_export(video_path, sheet_path, output_dir, speed, caption_file, state
wav_path = os.path.join(root_path, '%.2f.wav' % start_timestamp[i])
narratage_paths.append(wav_path)
speech_synthesis(text, wav_path, speed)
time.sleep(2)
print("目前正在处理{}".format(wav_path))
if state is not None:
state[0] = float((i + 1) / len(narratages)) * 0.97
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment