YOLO813

如何快速的将视频/音频的声音转成文字?

    需求:需要将一些大小为1G左右(时长在2个小时以上)的视频的声音提取出来转成文字。如果不采用程序,人工的做法就是“开两部手机,一部手机打开讯飞APP(带有音频转文字功能),一部手机放着视频,对着录音”,着实麻烦。

 

    先说下程序处理的测试结果,1G大小的视频,时长为2:15:03,生成的文本为120KB大小,大约有41313个字节(这可真是太能讲了!),对比看原视频,字符的准确率几乎在90%以上(普通话基本全部识别正确,影响准确率的主要是方言和口头禅),一个两小时的视频处理成文档大约在5分钟左右。

---代码基础解析---

    先讲一下大致的操作逻辑,语音转文字主功能调用的是讯飞的语音转写API(Application Programming Interface),类似于之前写的文章利用百度API实现图片转动漫是一样的道理,其实一开始还是选择的是百度语音转写的API,但是它满足不了需求,因为其要求录音文件必须要上传到云端(即以网页url的形式发送给它),无法处理本地音频;第二其最大只接受100M的音频文件(具体格式可前往文档观看),而我要处理的视频即使压缩成MP3音频也有120M;而讯飞API,不仅接受本地音频文件的处理(新用户免费体验5个小时),而且还支持最大500M音频文件的上传,所以最后选择的是讯飞API(具体可在下方参阅中前往技术文档)。

    由于讯飞API只支持这些音频格式:wav/flac/opus/m4a/mp3,所以我们先将MP4视频转成MP3好了,利用python的moviepy模块来处理(记得使用pip进行下载),当然该模块还有很多其它功能,例如:对视频时长进行剪切,对多个视频在时长上进行拼接,改变视频分辨率,把图片列表变成视频等等不在这一次的描述当中。

from moviepy.editor import *
video = VideoFileClip(input_mp4) # 视频输入文件名
audio = video.audio
audio.write_audiofile(output_mp3)# 视频输出文件名

    接下来就是将我们输出的MP3音频文件output_mp3利用讯飞API转成文档,具体的方法由于只是单纯的应用,所以我们不用太了解底层的代码,讯飞API有相关的python demo,我们拿过来用就可以了,主要是注册讯飞API开放平台账号之后,我们选择创建新应用,应用名称随便写不重名就可以,创建应用之后,我们选择左侧语音识别->语音转写,滑动到最下面,领取免费5小时试用体验包,然后复制APPID和SecretKey后面的一串字符至python demo文件中,如下图:

 

讯飞开放平台:https://www.xfyun.cn/
百度语音转写文档:https://ai.baidu.com/ai-doc/SPEECH/ck5diijkt
moviepy文档:https://pypi.org/project/moviepy/

 

完整源码

# -*- coding: utf-8 -*-
# 非实时转写调用demo


import base64
import hashlib
import hmac
import json
import os
import time
import csv
import requests
import re


#要处理的MP4文件名!
input_mp4 = 'input.mp4'
output_mp3 = input_mp4.replace("mp4", "mp3")
#mp4 -> mp3
from moviepy.editor import *
video = VideoFileClip(input_mp4)
audio = video.audio
audio.write_audiofile(output_mp3)


outputCommaText = "has_comma.txt"
outputNoComma = "output.txt"  #输出的txt文件名


lfasr_host = 'http://raasr.xfyun.cn/api'


# 请求的接口名
api_prepare = '/prepare'
api_upload = '/upload'
api_merge = '/merge'
api_get_progress = '/getProgress'
api_get_result = '/getResult'
# 文件分片大小10M
file_piece_sice = 10485760


# ——————————————————转写可配置参数————————————————
# 参数可在官网界面(https://doc.xfyun.cn/rest_api/%E8%AF%AD%E9%9F%B3%E8%BD%AC%E5%86%99.html)查看,根据需求可自行在gene_params方法里添加修改
# 转写类型
lfasr_type = 0
# 是否开启分词
has_participle = 'false'
has_seperate = 'true'
# 多候选词个数
max_alternatives = 0
# 子用户标识
suid = ''



class SliceIdGenerator:
    """slice id生成器"""


    def __init__(self):
        self.__ch = 'aaaaaaaaa`'


    def getNextSliceId(self):
        ch = self.__ch
        j = len(ch) - 1
        while j >= 0:
            cj = ch[j]
            if cj != 'z':
                ch = ch[:j] + chr(ord(cj) + 1) + ch[j + 1:]
                break
            else:
                ch = ch[:j] + 'a' + ch[j + 1:]
                j = j - 1
        self.__ch = ch
        return self.__ch

class RequestApi(object):
    def __init__(self, appid, secret_key, upload_file_path):
        self.appid = your_appid
        self.secret_key = your_secret_key
        self.upload_file_path = "./"+ output_mp3
        self.contents_list = []
    # 根据不同的apiname生成不同的参数,本示例中未使用全部参数您可在官网(https://doc.xfyun.cn/rest_api/%E8%AF%AD%E9%9F%B3%E8%BD%AC%E5%86%99.html)查看后选择适合业务场景的进行更换
    def gene_params(self, apiname, taskid=None, slice_id=None):
        appid = self.appid
        secret_key = self.secret_key
        upload_file_path = self.upload_file_path
        ts = str(int(time.time()))
        m2 = hashlib.md5()
        m2.update((appid + ts).encode('utf-8'))
        md5 = m2.hexdigest()
        md5 = bytes(md5, encoding='utf-8')
        # 以secret_key为key, 上面的md5为msg, 使用hashlib.sha1加密结果为signa
        signa = hmac.new(secret_key.encode('utf-8'), md5, hashlib.sha1).digest()
        signa = base64.b64encode(signa)
        signa = str(signa, 'utf-8')
        file_len = os.path.getsize(upload_file_path)
        file_name = os.path.basename(upload_file_path)
        param_dict = {}

        if apiname == api_prepare:
            # slice_num是指分片数量,如果您使用的音频都是较短音频也可以不分片,直接将slice_num指定为1即可
            slice_num = int(file_len / file_piece_sice) + (0 if (file_len % file_piece_sice == 0) else 1)
            param_dict['app_id'] = appid
            param_dict['signa'] = signa
            param_dict['ts'] = ts
            param_dict['file_len'] = str(file_len)
            param_dict['file_name'] = file_name
            param_dict['slice_num'] = str(slice_num)
        elif apiname == api_upload:
            param_dict['app_id'] = appid
            param_dict['signa'] = signa
            param_dict['ts'] = ts
            param_dict['task_id'] = taskid
            param_dict['slice_id'] = slice_id
        elif apiname == api_merge:
            param_dict['app_id'] = appid
            param_dict['signa'] = signa
            param_dict['ts'] = ts
            param_dict['task_id'] = taskid
            param_dict['file_name'] = file_name
        elif apiname == api_get_progress or apiname == api_get_result:
            param_dict['app_id'] = appid
            param_dict['signa'] = signa
            param_dict['ts'] = ts
            param_dict['task_id'] = taskid
        return param_dict

    # 请求和结果解析,结果中各个字段的含义可参考:https://doc.xfyun.cn/rest_api/%E8%AF%AD%E9%9F%B3%E8%BD%AC%E5%86%99.html
    def gene_request(self, apiname, data, files=None, headers=None):
        response = requests.post(lfasr_host + apiname, data=data, files=files, headers=headers)
        result = json.loads(response.text)
        if result["ok"] == 0:
            print("{} success:".format(apiname) + str(result))
            if apiname == "/getResult":
                print(result['data'])
                contents = result["data"].replace("[","").replace("]","") # #result=> dict  contents >str
                list_lines = contents.split("},{")#<list'>
                print("*"*20)
                for i in range(len(list_lines)):
                    onebest = list_lines[i].replace("{","").replace("}","")
                    try:
                        res = re.search('onebest\":\"(\S+?)\",\"', onebest)
                    except:
                        res = "None"
                    # print(f"onebest:{onebest}")  #xxxx:"bg":"80580","ed":"81340","onebest":"嗯","speaker":"0"
                    try:
                        print(f"res: {res.groups()}")
                        wanna_contents = "".join(res.groups()) #元组转字符串
                        wanna_contents = wanna_contents.strip()
                    except:
                        wanna_contents = res
                    if wanna_contents == ",":
                        wanna_contents = ""
                    self.contents_list.append(wanna_contents)
            return result
        else:
            print("{} error:".format(apiname) + str(result))
            exit(0)
            return result
    # 预处理
    def prepare_request(self):
        return self.gene_request(apiname=api_prepare,
                                 data=self.gene_params(api_prepare))
    # 上传
    def upload_request(self, taskid, upload_file_path):
        file_object = open(upload_file_path, 'rb')
        try:
            index = 1
            sig = SliceIdGenerator()
            while True:
                content = file_object.read(file_piece_sice)
                if not content or len(content) == 0:
                    break
                files = {
                    "filename": self.gene_params(api_upload).get("slice_id"),
                    "content": content
                }
                response = self.gene_request(api_upload,
                                             data=self.gene_params(api_upload, taskid=taskid,
                                                                   slice_id=sig.getNextSliceId()),
                                             files=files)
                if response.get('ok') != 0:
                    # 上传分片失败
                    print('upload slice fail, response: ' + str(response))
                    return False
                print('upload slice ' + str(index) + ' success')
                index += 1
        finally:
            'file index:' + str(file_object.tell())
            file_object.close()
        return True
    # 合并
    def merge_request(self, taskid):
        return self.gene_request(api_merge, data=self.gene_params(api_merge, taskid=taskid))


    # 获取进度
    def get_progress_request(self, taskid):
        return self.gene_request(api_get_progress, data=self.gene_params(api_get_progress, taskid=taskid))


    # 获取结果
    def get_result_request(self, taskid):
        return self.gene_request(api_get_result, data=self.gene_params(api_get_result, taskid=taskid))


    def all_api_request(self):
        # 1. 预处理
        pre_result = self.prepare_request()
        taskid = pre_result["data"]
        # 2 . 分片上传
        self.upload_request(taskid=taskid, upload_file_path=self.upload_file_path)
        # 3 . 文件合并
        self.merge_request(taskid=taskid)
        # 4 . 获取任务进度
        while True:
            # 每隔20秒获取一次任务进度
            progress = self.get_progress_request(taskid)
            progress_dic = progress
            if progress_dic['err_no'] != 0 and progress_dic['err_no'] != 26605:
                print('task error: ' + progress_dic['failed'])
                return
            else:
                data = progress_dic['data']
                task_status = json.loads(data)
                if task_status['status'] == 9:
                    print('task ' + taskid + ' finished')
                    break
                print('The task ' + taskid + ' is in processing, task status: ' + str(data))

            # 每次获取进度间隔20S
            time.sleep(5)
        # 5 . 获取结果
        self.get_result_request(taskid=taskid)
        with open(outputCommaText, "a", encoding="utf-8") as fp:
            writer = csv.writer(fp)
            writer.writerow(self.contents_list)
# 注意:如果出现requests模块报错:"NoneType" object has no attribute 'read', 请尝试将requests模块更新到2.20.0或以上版本(本demo测试版本为2.20.0)
# 输入讯飞开放平台的appid,secret_key和待转写的文件路径
if __name__ == '__main__':
    api = RequestApi(appid="", secret_key="", upload_file_path=r"")
    api.all_api_request()
    #解决逗号的问题
    with open(outputCommaText, "r" , encoding="utf-8") as fp:
        reader = csv.reader(fp)
        print(reader)
        for i, value in enumerate(reader):
            print(''.join(value))
            with open(outputNoComma, "a" , encoding="utf-8") as fps:
                fps.write(''.join(value))