@

0. 太长不看系列,直接使用

1.2官网注册后拿到APISecret和APIKey,直接复制文章2.5demo代码,保存为real_time_audio_recognition.py,在命令行执行

python real_time_audio_recognition.py -client_secret=您的client_secret -client_id=您的client_id -file_path=test.wav --audio_format=wav --sample_rate=16000

使用中有任何问题,欢迎留言提问。

1. Python调用标贝科技语音识别websocket接口,实现语音转文字

1.1 环境准备:

Python 3

1.2 获取权限

标贝科技 https://ai.data-baker.com/#/index

填写邀请码fwwqgs,每日免费调用量还可以翻倍

在这里插入图片描述

在这里插入图片描述

1.2.1 登录

点击产品地址进行登录,支持短信、密码、微信三种方式登录。
在这里插入图片描述

1.2.2 创建新应用

登录后进入【首页概览】,各位开发者可以进行创建多个应用。包括一句话识别、长语音识别、录音文件识别;在线合成、离线合成、长文本合成。
在这里插入图片描述

1.2.3 选择服务

进入【已创建的应用】,左侧选择您需调用的AI技术服务,右侧展示对应服务页面概览(您可查询用量、管理套餐、购买服务量、自主获取授权、预警管理)。
在这里插入图片描述

1.2.4 获取Key&Secret

通过服务 / 授权管理,获取对应参数,进行开发配置(获取访问令牌token

在这里插入图片描述

拿到Key和Secret就可以正式使用啦!

2. 代码实现

2.1 获取access_token

在拿到Key和Secret后,我们还需要调用授权接口获取access_token,这个access_token有效时长是24小时。

# 获取access_token用于鉴权
def get_access_token(client_secret, client_id):
    grant_type = "client_credentials"
    url = "https://openapi.data-baker.com/oauth/2.0/token?grant_type={}&client_secret={}&client_id={}" \
        .format(grant_type, client_secret, client_id)

    try:
        response = requests.post(url)
        response.raise_for_status()
    except Exception as e:
        print(response.text)
        raise Exception
    else:
        access_token = json.loads(response.text).get(\'access_token\')
        return access_token

2.2 准备数据

需要根据接口要求设置参数,并且对音频数据进行分割

# 准备数据
def prepare_data(args, access_token):
    # 读取音频文件
    with open(args.file_path, \'rb\') as f:
        file = f.read()

    # 填写Header信息
    audio_format = args.audio_format
    sample_rate = args.sample_rate

    splited_data = [str(base64.b64encode(file[i:i + 5120]), encoding=\'utf-8\') for i in range(0, len(file), 5120)]
    asr_params = {"audio_format": audio_format, "sample_rate": int(sample_rate), "speech_type": 1}

    json_list = []
    for i in range(len(splited_data)):
        if i != len(splited_data) - 1:
            asr_params[\'req_idx\'] = i
        else:
            asr_params[\'req_idx\'] = -len(splited_data) + 1
        asr_params["audio_data"] = splited_data[i]
        data = {"access_token": access_token, "version": "1.0", "asr_params": asr_params}

        json_list.append(json.dumps(data))

    return json_list

2.3 配置接口参数

client_secret和client_id:在文章1.2的官网获取,必填

file_save_path:文件保存路径,必填

audio_format:音频类型,默认wav格式

sample_rate:采样率,默认16000Hz

# 获取命令行输入参数
def get_args():
    parser = argparse.ArgumentParser(description=\'ASR\')
    parser.add_argument(\'-client_secret\', type=str, required=True)
    parser.add_argument(\'-client_id\', type=str, required=True)
    parser.add_argument(\'-file_path\', type=str, required=True)
    parser.add_argument(\'--audio_format\', type=str, default=\'wav\')
    parser.add_argument(\'--sample_rate\', type=str, default=\'16000\')
    args = parser.parse_args()

    return args

2.4 建立websocket客户端

class Client:
    def __init__(self, data, uri):
        self.data = data
        self.uri = uri

    #建立连接
    def connect(self):
        ws_app = websocket.WebSocketApp(uri,
                                        on_open=self.on_open,
                                        on_message=self.on_message,
                                        on_error=self.on_error,
                                        on_close=self.on_close)
        ws_app.run_forever()

    # 建立连接后发送消息
    def on_open(self, ws):
        print("sending..")
        for i in range(len(self.data)):
            ws.send(self.data[i])

    # 接收消息
    def on_message(self, ws, message):
        code = json.loads(message).get("code")
        if code != 90000:
            # 打印接口错误
            print(message)
        if json.loads(message).get(\'end_flag\') == 1:
            print(json.loads(message).get(\'asr_text\'))

    # 打印错误
    def on_error(slef, ws, error):
        print("error: ", str(error))

    # 关闭连接
    def on_close(ws):
        print("client closed.")

2.5 完整demo

import argparse
import json
import base64
import requests
import websocket


class Client:
    def __init__(self, data, uri):
        self.data = data
        self.uri = uri

    #建立连接
    def connect(self):
        ws_app = websocket.WebSocketApp(uri,
                                        on_open=self.on_open,
                                        on_message=self.on_message,
                                        on_error=self.on_error,
                                        on_close=self.on_close)
        ws_app.run_forever()

    # 建立连接后发送消息
    def on_open(self, ws):
        print("sending..")
        for i in range(len(self.data)):
            ws.send(self.data[i])

    # 接收消息
    def on_message(self, ws, message):
        code = json.loads(message).get("code")
        if code != 90000:
            # 打印接口错误
            print(message)
        if json.loads(message).get(\'end_flag\') == 1:
            print(json.loads(message).get(\'asr_text\'))

    # 打印错误
    def on_error(slef, ws, error):
        print("error: ", str(error))

    # 关闭连接
    def on_close(ws):
        print("client closed.")


# 准备数据
def prepare_data(args, access_token):
    # 读取音频文件
    with open(args.file_path, \'rb\') as f:
        file = f.read()

    # 填写Header信息
    audio_format = args.audio_format
    sample_rate = args.sample_rate

    splited_data = [str(base64.b64encode(file[i:i + 5120]), encoding=\'utf-8\') for i in range(0, len(file), 5120)]
    asr_params = {"audio_format": audio_format, "sample_rate": int(sample_rate), "speech_type": 1}

    json_list = []
    for i in range(len(splited_data)):
        if i != len(splited_data) - 1:
            asr_params[\'req_idx\'] = i
        else:
            asr_params[\'req_idx\'] = -len(splited_data) + 1
        asr_params["audio_data"] = splited_data[i]
        data = {"access_token": access_token, "version": "1.0", "asr_params": asr_params}

        json_list.append(json.dumps(data))

    return json_list


# 获取命令行输入参数
def get_args():
    parser = argparse.ArgumentParser(description=\'ASR\')
    parser.add_argument(\'-client_secret\', type=str, required=True)
    parser.add_argument(\'-client_id\', type=str, required=True)
    parser.add_argument(\'-file_path\', type=str, required=True)
    parser.add_argument(\'--audio_format\', type=str, default=\'wav\')
    parser.add_argument(\'--sample_rate\', type=str, default=\'16000\')
    args = parser.parse_args()

    return args


# 获取access_token用于鉴权
def get_access_token(client_secret, client_id):
    grant_type = "client_credentials"
    url = "https://openapi.data-baker.com/oauth/2.0/token?grant_type={}&client_secret={}&client_id={}" \
        .format(grant_type, client_secret, client_id)

    try:
        response = requests.post(url)
        response.raise_for_status()
    except Exception as e:
        print(response.text)
        raise Exception
    else:
        access_token = json.loads(response.text).get(\'access_token\')
        return access_token


if __name__ == \'__main__\':
    try:
        args = get_args()

        # 获取access_token
        client_secret = args.client_secret
        client_id = args.client_id
        access_token = get_access_token(client_secret, client_id)

        # 准备数据
        data = prepare_data(args, access_token)

        uri = "wss://openapi.data-baker.com/asr/realtime"
        # 建立Websocket连接
        client = Client(data, uri)
        client.connect()
    except Exception as e:
        print(e)

2.5 执行

复制所有代码,确定音频为wav格式,采样率为16K,在命令行执行

python real_time_audio_recognition.py -client_secret=您的client_secret -client_id=您的client_id -file_path=test.wav --audio_format=wav --sample_rate=16000

填写邀请码fwwqgs,每日免费调用量还可以翻倍

在这里插入图片描述

在这里插入图片描述

版权声明:本文为DataBaker原创文章,遵循 CC 4.0 BY-SA 版权协议,转载请附上原文出处链接和本声明。
本文链接:https://www.cnblogs.com/DataBaker/p/14914455.html