程序员最近都爱上了这个网站  程序员们快来瞅瞅吧!  it98k网:it98k.com

本站消息

站长简介/公众号

  出租广告位,需要合作请联系站长

+关注
已关注

分类  

暂无分类

标签  

暂无标签

日期归档  

暂无数据

您可以使用 amazon-transcribe 0.6.2 python sdk 使用 Amazon Transcribe 医疗模型吗?

发布于2024-10-30 03:04     阅读(1049)     评论(0)     点赞(9)     收藏(2)


所以我想使用 Amazon Transcribe Medical 模型在 Python 中进行流式传输。是否有任何 SDK 可以让我实现流式传输?我也查看了 boto,但它只涉及批处理而不是流式传输。我只能找到这个,但不确定如何在这个 SDK 中使用 Amazon Transcribe medical。


import asyncio import sounddevice from amazon_transcribe.client import TranscribeStreamingClient

from amazon_transcribe.handlers import TranscriptResultStreamHandler from amazon_transcribe.model import TranscriptEvent import boto3 from botocore.credentials import Credentials


class MyEventHandler(TranscriptResultStreamHandler):
    async def handle_transcript_event(self, transcript_event: TranscriptEvent):
        results = transcript_event.transcript.results
        for result in results:
            for alt in result.alternatives:
                print(alt.transcript)



async def mic_stream():
    loop = asyncio.get_event_loop()
    input_queue = asyncio.Queue()

    def callback(indata, frame_count, time_info, status):
        loop.call_soon_threadsafe(input_queue.put_nowait, (bytes(indata), status))

    stream = sounddevice.RawInputStream(
        channels=1,
        samplerate=16000,
        callback=callback,
        blocksize=1024 * 2,
        dtype="int16",
    )
    with stream:
        while True:
            indata, status = await input_queue.get()
            yield indata, status


async def write_chunks(stream):
    async for chunk, status in mic_stream():
        await stream.input_stream.send_audio_event(audio_chunk=chunk)
    await stream.input_stream.end_stream()


async def basic_transcribe():
    region = "us-east-1"


    # Initialize the TranscribeStreamingClient with the async credentials provider
    client = TranscribeStreamingClient(
        region=region
          )

    # Start the transcription stream
    stream = await client.start_stream_transcription(
        language_code="en-US",
        media_sample_rate_hz=16000,
        media_encoding="pcm"
    )

    handler = MyEventHandler(stream.output_stream)
    await asyncio.gather(write_chunks(stream), handler.handle_events())


if __name__ == "__main__":
    loop = asyncio.get_event_loop()
    loop.run_until_complete(basic_transcribe())
    loop.close()

解决方案


暂无回答



所属网站分类: 技术文章 > 问答

作者:黑洞官方问答小能手

链接:https://www.pythonheidong.com/blog/article/2040072/31b9cb866a2d89ed8edc/

来源:python黑洞网

任何形式的转载都请注明出处,如有侵权 一经发现 必将追究其法律责任

9 0
收藏该文
已收藏

评论内容:(最多支持255个字符)