Streaming audio to DialogFlow for real-time intent recognition

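I'm trying to stream audio from a (Pepper robot) microphone to DialogFlow. I have working code for sending a single block of audio. When I send the request, the response contains the message "None Exception iterating requests!". I have seen this error before, when I was reading from an audio file. However, I fail to see what is wrong with the data I'm passing now.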

processRemote is called whenever the microphone records something. When I write sound_data[0].tostring() to a StringIO and later retrieve it in chunks of 4096 bytes, the solution works.
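The chunked read mentioned above can be sketched roughly as follows. This is a minimal illustration rather than the actual code: iter_chunks is a hypothetical helper, io.BytesIO stands in for the StringIO buffer, and the zero-filled bytes stand in for real audio.

```python
import io


def iter_chunks(buffer, chunk_size=4096):
    """Yield successive chunks of at most chunk_size bytes from a file-like object."""
    while True:
        chunk = buffer.read(chunk_size)
        if not chunk:
            break
        yield chunk


# Hypothetical stand-in for the recorded audio: 10000 bytes of silence.
recorded = io.BytesIO(b"\x00" * 10000)
chunk_sizes = [len(c) for c in iter_chunks(recorded)]
```

The last chunk is simply whatever is left over, so it may be shorter than 4096 bytes.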

self.processing_queue is supposed to hold a few chunks of audio that should be processed before working on new audio.
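One way to keep only "a few" recent chunks is a bounded deque. This is a sketch under assumptions, not what the code below does: the code below only shows append being called on self.processing_queue, and PRE_ROLL_CHUNKS and the fake chunk contents are made up for illustration.

```python
from collections import deque

PRE_ROLL_CHUNKS = 3  # hypothetical: number of recent chunks to keep

# A deque with maxlen silently drops the oldest item once it is full.
processing_queue = deque(maxlen=PRE_ROLL_CHUNKS)
for i in range(5):
    processing_queue.append(("chunk-%d" % i).encode("ascii"))

buffered = list(processing_queue)
```

With this layout the buffer never grows beyond the last PRE_ROLL_CHUNKS callbacks, no matter how long the microphone has been running before a recording starts.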

The error shows up in the response of self.session_client.streaming_detect_intent(requests).

I'm thankful for any ideas.

    def processRemote(self, nbOfChannels, nbOfSamplesByChannel, timeStamp, inputBuffer):
        """audio stream callback method with simple silence detection"""
        sound_data_interlaced = np.fromstring(str(inputBuffer), dtype=np.int16)
        sound_data = np.reshape(sound_data_interlaced,
                                (nbOfChannels, nbOfSamplesByChannel), 'F')
        peak_value = np.max(sound_data)
        chunk = sound_data[0].tostring()
        self.processing_queue.append(chunk)
        if self.is_active:
            # detect sound
            if peak_value > 6000:
                print("Peak:", peak_value)
                if not self.recordingInProgress:
                    self.startRecording()

            # if recording is in progress we send directly to google
            try:
                if self.recordingInProgress:
                    print("preparing request proc remote")
                    requests = [dialogflow.types.StreamingDetectIntentRequest(input_audio=chunk)]
                    print("should send now")
                    responses = self.session_client.streaming_detect_intent(requests)
                    for response in responses:
                        print("checking response")
                        if len(response.fulfillment_text) != 0:
                            print("response not empty")
                            self.stopRecording(response)  # stop if we already know the intent
            except Exception as e:
                print(e)

    def startRecording(self):
        """init an in-memory file object and save the last raw sound buffer to it."""
        # session path setup
        self.session_path = self.session_client.session_path(DIALOG_FLOW_GCP_PROJECT_ID, self.uuid)
        self.recordingInProgress = True
        requests = list()

        # set up streaming
        print("start streaming")
        q_input = dialogflow.types.QueryInput(audio_config=self.audio_config)
        req = dialogflow.types.StreamingDetectIntentRequest(
                        session=self.session_path, query_input=q_input)
        requests.append(req)

        # process pre-recorded audio
        print("work on stored audio")
        for chunk in self.processing_queue:
            print("appending chunk")
            try:
                requests.append(dialogflow.types.StreamingDetectIntentRequest(input_audio=chunk))
            except Exception as e:
                print(e)
        print("getting response")
        responses = self.session_client.streaming_detect_intent(requests)
        print("got response")
        print(responses)

        # iterate through responses from pre-recorded audio
        try:
            for response in responses:
                print("checking response")
                if len(response.fulfillment_text) != 0:
                    print("response not empty")
                    self.stopRecording(response)  # stop if we already know the intent
        except Exception as e:
            print(e)

        # otherwise continue listening
        print("start recording (live)")

    def stopRecording(self, query_result):
        """saves the recording to memory"""
        # stop recording
        self.recordingInProgress = False
        self.disable_google_speech(force=True)
        print("stopped recording")

        # process response
        action = query_result.action
        text = query_result.fulfillment_text.encode("utf-8")
        if (action is not None) or (text is not None):
            if len(text) != 0:
                self.speech.say(text)
            if len(action) != 0:
                parameters = query_result.parameters
                self.execute_action(action, parameters)

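According to the source code, the session_client.streaming_detect_intent function expects an iterator as its argument. You are currently passing it a list of requests, which is iterable but is not itself an iterator.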
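The difference can be checked in plain Python without DialogFlow. In the sketch below the strings are placeholders for request objects; the assumption (not verified against the gRPC source here) is that the transport layer calls next() directly on whatever it is given, which is what fails for a list.

```python
# Placeholders standing in for StreamingDetectIntentRequest objects.
requests = ["config", "audio-1", "audio-2"]

try:
    next(requests)  # a list is iterable, but it is not an iterator
    list_is_iterator = True
except TypeError:
    list_is_iterator = False

# iter() returns an iterator over the list, which next() accepts.
request_iterator = iter(requests)
first = next(request_iterator)
```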

Won't work:

requests = [dialogflow.types.StreamingDetectIntentRequest(input_audio=chunk)]
responses = self.session_client.streaming_detect_intent(requests)
# None Exception iterating requests!

Alternatives:

# wrap the list in an iterator
requests = [dialogflow.types.StreamingDetectIntentRequest(input_audio=chunk)]
responses = self.session_client.streaming_detect_intent(iter(requests))

# Note: The example in the source code calls the function like this
# but this gave me the same error
requests = [dialogflow.types.StreamingDetectIntentRequest(input_audio=chunk)]
for response in self.session_client.streaming_detect_intent(requests):
    pass  # process response

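Using a generator structure

While this fixed the error, intent detection still wasn't working. I believe a better program structure is to use a generator, as suggested in the docs. Something like (pseudocode):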

def dialogflow_mic_stream_generator():
    # open stream
    audio_stream = ...

    # send configuration request
    query_input = dialogflow.types.QueryInput(audio_config=audio_config)
    yield dialogflow.types.StreamingDetectIntentRequest(session=session_path,
            query_input=query_input)

    # output audio data from stream
    while audio_stream_is_active:
        chunk = audio_stream.read(chunk_size)
        yield dialogflow.types.StreamingDetectIntentRequest(input_audio=chunk)

requests = dialogflow_mic_stream_generator()
responses = session_client.streaming_detect_intent(requests)

for response in responses:
    pass  # process response
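Since processRemote is a callback rather than a stream you can read from, one possible way to feed such a generator is a thread-safe queue. The following is only a sketch under assumptions: request_generator, the _STOP sentinel and the two factory arguments are hypothetical names, and the lambdas stand in for building dialogflow.types.StreamingDetectIntentRequest objects so that the example runs without DialogFlow or a microphone. On Python 2 the module is named Queue instead of queue.

```python
import queue

_STOP = None  # sentinel that ends the stream


def request_generator(audio_queue, make_config_request, make_audio_request):
    """Yield one config request, then one audio request per queued chunk."""
    yield make_config_request()
    while True:
        chunk = audio_queue.get()  # blocks until a chunk (or the sentinel) arrives
        if chunk is _STOP:
            return
        yield make_audio_request(chunk)


# Stand-ins so the sketch runs on its own.
audio_queue = queue.Queue()
for chunk in (b"aa", b"bb"):
    audio_queue.put(chunk)  # in the real program: done in processRemote
audio_queue.put(_STOP)      # in the real program: done when recording stops

sent = list(request_generator(
    audio_queue,
    make_config_request=lambda: ("config",),
    make_audio_request=lambda chunk: ("audio", chunk),
))
```

In the real program the generator would be passed to streaming_detect_intent, and the response loop would probably need to run outside the audio callback (for example in its own thread), because audio_queue.get() blocks until the callback delivers the next chunk.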