GCPに音声認識APIがあることを知識としては知っているけど、あんまり使ったことがなかったので使ってみました。 Speech-to-Text API V1の音声認識方法には「同期認識」「非同期認識」「ストリーミング認識」の3種類があります。 ネットで見かけるサンプルは同期認識が多いですが、音声対話に使いたいので結果が早く取得できる「ストリーミング認識」を使ってみました。
動作確認環境
Windows 11 バージョン22H2 (OSビルド 22621.1105)
python 3.10
プログラム
GCPの公式サンプルを読んだものの、イマイチどこでブロッキングしているのかわからなかったので、適当に解析して自分なりに再構成。
同期認識と比べるとプログラム長いな。。。
必要に応じてpythonパッケージをインストール
pip install --upgrade google-cloud-speech event-channel pyaudio
GCPの認証関連の設定をした上で、次のプログラムを実行
プログラム
from multiprocessing import Value from threading import Timer from typing import Optional import pyaudio from event_channel.event_channel import EventChannel from google.api_core.exceptions import OutOfRange from google.cloud import speech from six.moves import queue class MicrophoneStream(object): def __init__(self, rate, chunk): self._rate = rate self._chunk = chunk # スレッドセーフのオーディオ格納バッファ。サイズ無制限 self._buff = queue.Queue() self._audio_interface = None self._audio_stream = None self.closed = True def __enter__(self): return self.open() def __exit__(self, exc_type, exc_val, exc_tb): self.close() def open(self): self._audio_interface = pyaudio.PyAudio() self._audio_stream = self._audio_interface.open( format=pyaudio.paInt16, channels=1, rate=self._rate, input=True, frames_per_buffer=self._chunk, # コールバック stream_callback=self._fill_buffer, ) self.closed = False return self def _stop_generator(self): self._buff.put(None) def close(self): if not self.closed: self._audio_stream.stop_stream() self._audio_stream.close() self.closed = True self._stop_generator() self._audio_interface.terminate() # noinspection PyUnusedLocal def _fill_buffer(self, in_data, frame_count, time_info, status_flags): # コールバックが呼ばれたときにバッファにデータを格納 self._buff.put(in_data) return None, pyaudio.paContinue def generator(self): """ジェネレーターメソッド""" while not self.closed: # キューからの取り出し # データが終わったときにはNoneが積まれているので、Noneだった場合は戻る chunk = self._buff.get(block=True, timeout=None) if chunk is None: return data = [chunk] while True: try: # データがあれば、さらに取得。 # block=Falseなので、データがなければqueue.Empty例外発生 chunk = self._buff.get(block=False) except queue.Empty: # バッファが空になったらbreak break # データが終わったときにはNoneが積まれているので、Noneだった場合は戻る if chunk is None: return data.append(chunk) yield b"".join(data) class VoiceRecognitionResult: def __init__(self, text: str, is_final: bool, is_timed_out: bool = False): self.text = text self.is_final = is_final self.is_timed_out = is_timed_out # noinspection PyMethodMayBeStatic class GoogleSTT: SILENCE_TIMEOUT_SECOND = 3.0 SAMPLING_RATE = 16000 CHUNK_SIZE = int(SAMPLING_RATE / 10) # 100ms EVENT_NAME_START = "start" EVENT_NAME_END = "end" EVENT_NAME_RESULT = "result" EVENT_NAME_RESPONSE = "response" def __init__(self, language_code: str = "ja-JP", single_utterance: bool = False, speech_contexts: Optional[list[str]] = None): self._timed_out = Value('i', 0) self._transcript = "" self._language_code = language_code self._single_utterance = single_utterance self._stream = None self._event_channel = EventChannel() self._client = speech.SpeechClient() self._config = self._get_streaming_config(speech_contexts or []) def subscribe(self, event_name: str, func): self._event_channel.subscribe(event_name, func) def _get_streaming_config(self, speech_contexts: list[str]): # Add the hard-coded leaf-node commands contexts = [speech.SpeechContext(phrases=speech_contexts)] # noqa config = speech.RecognitionConfig( # 非圧縮の 16 ビット符号付きリトル エンディアン サンプル (リニアPCM) encoding=speech.RecognitionConfig.AudioEncoding.LINEAR16, # noqa sample_rate_hertz=self.SAMPLING_RATE, # noqa language_code=self._language_code, # noqa # model="command_and_search", # noqa speech_contexts=contexts, # noqa ) return speech.StreamingRecognitionConfig( config=config, # noqa # 中間結果を返すかどうか interim_results=True, # noqa # 1回だけ音声認識するかどうか。音声コマンド向け。終話検出が早いような気がする。(未検証) single_utterance=self._single_utterance # noqa ) def _on_timeout(self): with self._timed_out.get_lock(): self._timed_out.value = 1 # TODO: 優先度低: streamと疎結合にしたい。(generatorの振る舞いも約束事が多い) if self._stream: self._stream.close() self._stream = None self._event_channel.publish(self.EVENT_NAME_RESULT, VoiceRecognitionResult(self._transcript, True, True)) def _processing(self, response_generator): timer = None self._transcript = "" with self._timed_out.get_lock(): self._timed_out.value = 0 try: for response in response_generator: if self._timed_out.value == 1: break self._event_channel.publish(self.EVENT_NAME_RESPONSE, response) if not response.results: if self._single_utterance: if response.speech_event_type.name == "END_OF_SINGLE_UTTERANCE": # single_utterance=Trueの場合、次のような挙動になる # ・END_OF_SINGLE_UTTERANCEを返したあと、ワンテンポ遅れてis_final=Trueで認識結果が返る # ・END_OF_SINGLE_UTTERANCEを返すものの、その後無反応になることがある。(主に無音の場合) # # 無反応対策でタイマーを使って監視 timer = Timer(self.SILENCE_TIMEOUT_SECOND, self._on_timeout) timer.start() continue result = response.results[0] if not result.alternatives: continue self._transcript = result.alternatives[0].transcript if result.is_final: self._event_channel.publish(self.EVENT_NAME_RESULT, VoiceRecognitionResult(self._transcript, True)) if self._single_utterance: break else: # is_final=Falseで返ってきた結果は認識精度が低いため、表示以外の用途で使うのはオススメできない。 # 不鮮明な音だと is_final=False で何度か結果が返ってくるものの、その後何も反応がないことがある。 # TODO: 優先度低: ずっとサーバーから反応がないケースがあるので、タイマーで監視する? self._event_channel.publish(self.EVENT_NAME_RESULT, VoiceRecognitionResult(self._transcript, False)) finally: if timer: timer.cancel() def recognize(self, audio_stream): self._stream = audio_stream audio_generator = self._stream.generator() audio_generator_object = ( speech.StreamingRecognizeRequest(audio_content=content) for content in audio_generator # noqa ) self._event_channel.publish(self.EVENT_NAME_START) # ストリーミング認識。何か返すべきものがあるまでブロッキング # メソッドの詳細は下記参照 # https://cloud.google.com/python/docs/reference/speech/latest/google.cloud.speech_v1.services.speech.SpeechClient#google_cloud_speech_v1_services_speech_SpeechClient_streaming_recognize response_generator = self._client.streaming_recognize(self._config, audio_generator_object) # noqa self._processing(response_generator) self._stream = None self._event_channel.publish(self.EVENT_NAME_END) if __name__ == "__main__": def on_start(): print("stt start") def on_end(): print("stt end") def on_result(result: VoiceRecognitionResult): if result.is_timed_out: print('stt final(timeout):' + result.text) elif result.is_final: print('stt final:' + result.text) else: print('stt not final:' + result.text) def on_response(x): # # デバッグ出力 # print("------------------") # print(x) pass speech_contexts = ["後退", "前進", "右旋回", "左旋回", "バック"] stt = GoogleSTT(single_utterance=True, speech_contexts=speech_contexts) # stt = GoogleSTT() stt.subscribe(stt.EVENT_NAME_START, on_start) stt.subscribe(stt.EVENT_NAME_END, on_end) stt.subscribe(stt.EVENT_NAME_RESULT, on_result) stt.subscribe(stt.EVENT_NAME_RESPONSE, on_response) while True: print("create stream") try: with MicrophoneStream(stt.SAMPLING_RATE, stt.CHUNK_SIZE) as stream: print("call recognize") stt.recognize(stream) except OutOfRange as e: # 以前試したときには305秒間発話がない場合に発生した print("catch OutOfRange exception") print(e) break # 音声認識を続けたい場合は、breakではなくcontinueにする # continue