Pythonでストリーミング音声認識をやってみた (Google Cloud Speech-to-Text API V1)

スポンサーリンク

GCP音声認識APIがあることを知識としては知っているけど、あんまり使ったことがなかったので使ってみました。 Speech-to-Text API V1の音声認識方法には「同期認識」「非同期認識」「ストリーミング認識」の3種類があります。 ネットで見かけるサンプルは同期認識が多いですが、音声対話に使いたいので結果が早く取得できる「ストリーミング認識」を使ってみました。


動作確認環境

Windows 11 バージョン22H2 (OSビルド 22621.1105)
python 3.10


プログラム

GCPの公式サンプルを読んだものの、イマイチどこでブロッキングしているのかわからなかったので、適当に解析して自分なりに再構成。

同期認識と比べるとプログラム長いな。。。


必要に応じてpythonパッケージをインストール

pip install  --upgrade google-cloud-speech event-channel pyaudio


GCPの認証関連の設定をした上で、次のプログラムを実行

プログラム

from multiprocessing import Value
from threading import Timer
from typing import Optional

import pyaudio
from event_channel.event_channel import EventChannel
from google.api_core.exceptions import OutOfRange
from google.cloud import speech
from six.moves import queue


class MicrophoneStream(object):
    def __init__(self, rate, chunk):
        self._rate = rate
        self._chunk = chunk

        # スレッドセーフのオーディオ格納バッファ。サイズ無制限
        self._buff = queue.Queue()
        self._audio_interface = None
        self._audio_stream = None
        self.closed = True

    def __enter__(self):
        return self.open()

    def __exit__(self, exc_type, exc_val, exc_tb):
        self.close()

    def open(self):
        self._audio_interface = pyaudio.PyAudio()
        self._audio_stream = self._audio_interface.open(
            format=pyaudio.paInt16,
            channels=1,
            rate=self._rate,
            input=True,
            frames_per_buffer=self._chunk,
            # コールバック
            stream_callback=self._fill_buffer,
        )

        self.closed = False

        return self

    def _stop_generator(self):
        self._buff.put(None)

    def close(self):
        if not self.closed:
            self._audio_stream.stop_stream()
            self._audio_stream.close()
            self.closed = True
            self._stop_generator()
            self._audio_interface.terminate()

    # noinspection PyUnusedLocal
    def _fill_buffer(self, in_data, frame_count, time_info, status_flags):
        # コールバックが呼ばれたときにバッファにデータを格納
        self._buff.put(in_data)
        return None, pyaudio.paContinue

    def generator(self):
        """ジェネレーターメソッド"""
        while not self.closed:
            # キューからの取り出し
            # データが終わったときにはNoneが積まれているので、Noneだった場合は戻る
            chunk = self._buff.get(block=True, timeout=None)
            if chunk is None:
                return
            data = [chunk]

            while True:
                try:
                    # データがあれば、さらに取得。
                    # block=Falseなので、データがなければqueue.Empty例外発生
                    chunk = self._buff.get(block=False)
                except queue.Empty:
                    # バッファが空になったらbreak
                    break
                # データが終わったときにはNoneが積まれているので、Noneだった場合は戻る
                if chunk is None:
                    return
                data.append(chunk)

            yield b"".join(data)


class VoiceRecognitionResult:
    def __init__(self, text: str, is_final: bool, is_timed_out: bool = False):
        self.text = text
        self.is_final = is_final
        self.is_timed_out = is_timed_out


 # noinspection PyMethodMayBeStatic
class GoogleSTT:
    SILENCE_TIMEOUT_SECOND = 3.0
    SAMPLING_RATE = 16000
    CHUNK_SIZE = int(SAMPLING_RATE / 10)  # 100ms
    EVENT_NAME_START = "start"
    EVENT_NAME_END = "end"
    EVENT_NAME_RESULT = "result"
    EVENT_NAME_RESPONSE = "response"

    def __init__(self, language_code: str = "ja-JP", single_utterance: bool = False,
                 speech_contexts: Optional[list[str]] = None):
        self._timed_out = Value('i', 0)
        self._transcript = ""
        self._language_code = language_code
        self._single_utterance = single_utterance
        self._stream = None
        self._event_channel = EventChannel()
        self._client = speech.SpeechClient()
        self._config = self._get_streaming_config(speech_contexts or [])

    def subscribe(self, event_name: str, func):
        self._event_channel.subscribe(event_name, func)

    def _get_streaming_config(self, speech_contexts: list[str]):
        # Add the hard-coded leaf-node commands
        contexts = [speech.SpeechContext(phrases=speech_contexts)] # noqa

        config = speech.RecognitionConfig(
            # 非圧縮の 16 ビット符号付きリトル エンディアン サンプル (リニアPCM)
            encoding=speech.RecognitionConfig.AudioEncoding.LINEAR16,  # noqa
            sample_rate_hertz=self.SAMPLING_RATE,  # noqa
            language_code=self._language_code,  # noqa
            # model="command_and_search",  # noqa
            speech_contexts=contexts, # noqa
        )

        return speech.StreamingRecognitionConfig(
            config=config,  # noqa
            # 中間結果を返すかどうか
            interim_results=True,  # noqa
            # 1回だけ音声認識するかどうか。音声コマンド向け。終話検出が早いような気がする。(未検証)
            single_utterance=self._single_utterance  # noqa
        )

    def _on_timeout(self):
        with self._timed_out.get_lock():
            self._timed_out.value = 1
        # TODO: 優先度低: streamと疎結合にしたい。(generatorの振る舞いも約束事が多い)
        if self._stream:
            self._stream.close()
            self._stream = None
        self._event_channel.publish(self.EVENT_NAME_RESULT, VoiceRecognitionResult(self._transcript, True, True))

    def _processing(self, response_generator):
        timer = None
        self._transcript = ""
        with self._timed_out.get_lock():
            self._timed_out.value = 0
        try:
            for response in response_generator:
                if self._timed_out.value == 1:
                    break

                self._event_channel.publish(self.EVENT_NAME_RESPONSE, response)
                if not response.results:
                    if self._single_utterance:
                        if response.speech_event_type.name == "END_OF_SINGLE_UTTERANCE":
                            # single_utterance=Trueの場合、次のような挙動になる
                            # ・END_OF_SINGLE_UTTERANCEを返したあと、ワンテンポ遅れてis_final=Trueで認識結果が返る
                            # ・END_OF_SINGLE_UTTERANCEを返すものの、その後無反応になることがある。(主に無音の場合)
                            #
                            # 無反応対策でタイマーを使って監視
                            timer = Timer(self.SILENCE_TIMEOUT_SECOND, self._on_timeout)
                            timer.start()
                    continue

                result = response.results[0]
                if not result.alternatives:
                    continue

                self._transcript = result.alternatives[0].transcript

                if result.is_final:
                    self._event_channel.publish(self.EVENT_NAME_RESULT, VoiceRecognitionResult(self._transcript, True))
                    if self._single_utterance:
                        break
                else:
                    # is_final=Falseで返ってきた結果は認識精度が低いため、表示以外の用途で使うのはオススメできない。
                    # 不鮮明な音だと is_final=False で何度か結果が返ってくるものの、その後何も反応がないことがある。
                    # TODO: 優先度低: ずっとサーバーから反応がないケースがあるので、タイマーで監視する?
                    self._event_channel.publish(self.EVENT_NAME_RESULT, VoiceRecognitionResult(self._transcript, False))
        finally:
            if timer:
                timer.cancel()

    def recognize(self, audio_stream):
        self._stream = audio_stream
        audio_generator = self._stream.generator()
        audio_generator_object = (
            speech.StreamingRecognizeRequest(audio_content=content) for content in audio_generator  # noqa
        )

        self._event_channel.publish(self.EVENT_NAME_START)
        # ストリーミング認識。何か返すべきものがあるまでブロッキング
        # メソッドの詳細は下記参照
        # https://cloud.google.com/python/docs/reference/speech/latest/google.cloud.speech_v1.services.speech.SpeechClient#google_cloud_speech_v1_services_speech_SpeechClient_streaming_recognize
        response_generator = self._client.streaming_recognize(self._config, audio_generator_object)  # noqa

        self._processing(response_generator)
        self._stream = None
        self._event_channel.publish(self.EVENT_NAME_END)


if __name__ == "__main__":
    def on_start():
        print("stt start")


    def on_end():
        print("stt end")


    def on_result(result: VoiceRecognitionResult):
        if result.is_timed_out:
            print('stt final(timeout):' + result.text)
        elif result.is_final:
            print('stt final:' + result.text)
        else:
            print('stt not final:' + result.text)


    def on_response(x):
        # # デバッグ出力
        # print("------------------")
        # print(x)
        pass


    speech_contexts = ["後退", "前進", "右旋回", "左旋回", "バック"]

    stt = GoogleSTT(single_utterance=True, speech_contexts=speech_contexts)
    # stt = GoogleSTT()
    stt.subscribe(stt.EVENT_NAME_START, on_start)
    stt.subscribe(stt.EVENT_NAME_END, on_end)
    stt.subscribe(stt.EVENT_NAME_RESULT, on_result)
    stt.subscribe(stt.EVENT_NAME_RESPONSE, on_response)
    while True:
        print("create stream")
        try:
            with MicrophoneStream(stt.SAMPLING_RATE, stt.CHUNK_SIZE) as stream:
                print("call recognize")
                stt.recognize(stream)
        except OutOfRange as e:
            # 以前試したときには305秒間発話がない場合に発生した
            print("catch OutOfRange exception")
            print(e)
            break
            # 音声認識を続けたい場合は、breakではなくcontinueにする
            # continue


メモ

  • 調べていたら、single_utteranceというパラメータを発見。
    • 発話終了の判定が早いような気がする(ちゃんと測定してない)ので、単発で音声認識したい場合は便利そう。
  • このプログラムだと、どうしてもマイクで拾った音のうち、音声認識できていないタイミングがあります。
    • そのあたりも考慮したプログラムを作成する場合は、こちらのサンプル参照。更にサンプルプログラムがややこしい。。。