The complete Python code of the example is available on our GitHub.
Below we discuss it in detail.
Non-standard Python packages required
We need the following packages which are not part of standard python:
- ffmpy - a convenience package that allows us to invoke the ffmpeg utility from Python. ffmpeg is the Swiss army knife of audio processing; we need it because it is able to stream audio to an RTP endpoint.
- websockets - one of several available Python packages for websocket connectivity. Other websockets packages are available, but this one seems to be popular and well documented. This package also requires asyncio.
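Both can be installed with pip, assuming a standard Python 3 environment:
pip install ffmpy websockets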
Parts of the example code explained
Authorization
JWT = "<Your JWT HERE>"
headers = {"Authorization": JWT}
The Voicegain API uses JWT for authentication, so to run the example you will need to use the Voicegain Web Console to obtain a JWT - here is an article that describes how to do it.
Body of the request
body = {
    "sessions": [
        {
            "asyncMode": "REAL-TIME",
            "websocket": {
                "adHoc": True,
                "useSTOMP": False,
                "minimumDelay": 0
            },
            "content": {
                "incremental": ["words"],
                "full": []
            }
        }
    ],
    "audio": {
        "source": {"stream": {"protocol": "RTP"}},
        "format": "PCMU",
        "channel": "mono",
        "rate": 8000,
        "capture": True
    },
    "settings": {
        "asr": {
            "noInputTimeout": 60000,
            "completeTimeout": 0
        }
    }
}
In the request we:
- set the mode to REAL-TIME - this will give us the results with minimum delay
- specify websocket as the method to receive transcription results
  - it is an adHoc, i.e. not previously named, websocket - this is standard for websockets that we want to keep private
  - we do not use the STOMP protocol - STOMP is meant for multiple-recipient use cases
  - we want to get the results without delay (minimumDelay of 0) - but that also means that the results may contain more rewrites/edits
- specify the content that we want to receive
  - we want to receive incremental words
  - we do not care about the final full result
- specify the audio source as a streamed RTP connection
- specify the audio format - in our case mono u-law encoded audio with an 8kHz sample rate
- set capture to true - this helps with debugging, as we can later retrieve the audio exactly as it was sent to the speech recognizer
- keep the asr settings minimal - we set the no-input timeout to 60 seconds, which means that speech has to start within the first 60 seconds of audio streaming; if there is no speech audio the recognizer will terminate with a NOINPUT result
The request and synchronous response
import requests

init_response = requests.post("https://api.voicegain.ai/v1/asr/transcribe/async",
                              json=body, headers=headers).json()
# retrieve values from the response
# sessionId and capturedAudio are printed for debugging purposes
session_id = init_response["sessions"][0]["sessionId"]
ws_url = init_response["sessions"][0]["websocket"]["url"]
rtp_ip = init_response["audio"]["stream"]["ip"]
rtp_port = init_response["audio"]["stream"]["port"]
capturedAudio = init_response["audio"].get("capturedAudio")
print(" sessionId: {}".format(session_id))
print(" RTP ip: {}".format(rtp_ip))
print(" RTP port: {}".format(rtp_port))
if capturedAudio is not None:
    print("captured audio id: {}".format(capturedAudio))
print(" Websocket Url: {}".format(ws_url), flush=True)
We make a request to the API using HTTP POST to https://api.voicegain.ai/v1/asr/transcribe/async
This request reserves recognizer resources and returns the parameters of the new recognition session. The parameters from the response that we are interested in are:
- sessionId - used mainly for checking logs if something goes wrong
- websocket.url - the URL of the websocket over which the messages with incremental recognition results will be sent
- audio.stream.ip and audio.stream.port - where we will stream the audio using RTP - the port will be in the 5100-6000 range
- audio.capturedAudio - id of the captured audio as it was sent to the recognizer (this is after decoding u-law to L16) - handy if we need to debug recognition not working as expected
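For orientation, here is a minimal sketch of the shape of the parsed response - the values below are purely illustrative, and only the fields accessed by the code above are shown:
# hypothetical example of the parsed response - values are illustrative only
init_response = {
    "sessions": [
        {
            "sessionId": "0-xxxxxxxx",
            "websocket": {"url": "wss://api.voicegain.ai/..."}
        }
    ],
    "audio": {
        "stream": {"ip": "192.0.2.10", "port": 5100},
        "capturedAudio": "yyyyyyyy"
    }
}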
Function to concatenate individual recognition results
Individual recognition results are sent in JSON format, which is described here. The JSON messages may contain individual utterances, or they may contain edits of previously sent utterances. The code below assembles the final recognition result using a stack - this makes it easy to do the deletes that are part of an edit.
import json

stack = []

# function to process JSON with incremental transcription results sent as messages over websocket
def process_ws_msg(wsMsg):
    #print(wsMsg, flush=True)
    try:
        data = json.loads(wsMsg)
        utter = data.get('utt')
        if utter is None:
            toDel = data.get('del')
            if toDel is None:
                # unknown edit
                print("EDIT->" + wsMsg, flush=True)
            else:
                # delete followed by edits
                for i in range(toDel):
                    stack.pop()
                edits = data.get('edit')
                if edits is not None:
                    for edit in edits:
                        utter = edit.get('utt')
                        stack.append(utter)
        else:
            # simple utterance
            stack.append(utter)
        print(' '.join(stack), flush=True)
    except Exception as e:
        print("ERROR: " + str(e), flush=True)
As you can see, we handle two types of messages:
- one contains a simple utterance ('utt')
- the other contains an edit starting with a delete ('del')
In case something goes wrong and we get some other type of edit message, we print it prefixed with "EDIT->".
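As a minimal illustration of the stack logic, here is a hypothetical sequence of messages - the payloads below only mimic the 'utt', 'del' and 'edit' fields that process_ws_msg reads, and are not taken from the actual API documentation:
# hypothetical messages mimicking the fields used by process_ws_msg
process_ws_msg('{"utt": "hello"}')                        # prints: hello
process_ws_msg('{"utt": "word"}')                         # prints: hello word
process_ws_msg('{"del": 1, "edit": [{"utt": "world"}]}')  # prints: hello world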
Function to read the audio and stream it with ffmpeg
from ffmpy import FFmpeg

def stream_audio():
    ff = FFmpeg(
        inputs={'ENS_ending.wav': ['-re']},
        outputs={'rtp://' + rtp_ip + ':' + str(rtp_port): ['-ar', '8000', '-f', 'mulaw', '-f', 'rtp']}
        #outputs={'ENS_ending.ulaw' : ['-ar', '8000', '-f', 'mulaw']}
    )
    # print the assembled ffmpeg command line for inspection
    print(ff.cmd)
    ff.run()
Here we:
- specify the input file - ENS_ending.wav
- tell ffmpeg to read it at about real-time speed (option -re) - this will give realistic streaming results
- specify output to RTP
  - using the ip and port that we obtained from the Voicegain API response - rtp://'+rtp_ip+':'+str(rtp_port)
  - the format is set to mulaw and the sample rate to 8000 Hz
  - note - the commented-out output line can be used instead if you want to verify that the correct mulaw audio is generated
- print the assembled command (ff.cmd) for inspection
- finally run ffmpeg by invoking ff.run()
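For reference, the command that ffmpy assembles (and that print(ff.cmd) displays) should look roughly like this, with the ip and port filled in at runtime - the exact form may differ slightly between ffmpy versions:
ffmpeg -re -i ENS_ending.wav -ar 8000 -f mulaw -f rtp rtp://<rtp_ip>:<rtp_port>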
Async function to connect to websocket and receive messages
import websockets

async def websocket_receive(uri):
    async with websockets.connect(uri) as websocket:
        try:
            while True:
                ws_msg = await websocket.recv()
                process_ws_msg(ws_msg)
        except Exception as e:
            print(e)
Here we make a connection to the websocket at the uri received from the Voicegain API and call the previously defined process_ws_msg function for every received websocket message.
Thread class for websocket processing
Because the main thread of the Python code will be handling the ffmpeg streaming, we need to start a separate thread to handle the websocket connection and receive the results.
import asyncio
import datetime
import threading

class wsThread(threading.Thread):
    def __init__(self, ws_uri):
        threading.Thread.__init__(self)
        self.ws_uri = ws_uri

    def run(self):
        print("Starting " + str(datetime.datetime.now()), flush=True)
        try:
            asyncio.new_event_loop().run_until_complete(websocket_receive(self.ws_uri))
        except Exception as e:
            print(e)
        print("Exiting " + str(datetime.datetime.now()), flush=True)
The class is very simple. It only has two methods:
- a constructor that sets the websocket uri
- the run method that runs the websocket_receive function within an asyncio event loop
Code that launches the websocket thread and ffmpeg streaming
Finally, we have the code that launches the websocket thread and then starts streaming audio to Voicegain. At the end we wait for the websocket thread to finish and join the main thread.
threadWs = wsThread(ws_url)
threadWs.start()
# stream audio
stream_audio()
# wait for websocket thread to join
threadWs.join()