import asyncio
import aiohttp
from aiortc import RTCPeerConnection, RTCSessionDescription
from aiortc.contrib.media import MediaPlayerasync def send_sdp(e_sdp):url = "https://xxxxx:xxxx/rtc/v1/whip/?app=live&stream=livestream"async with aiohttp.ClientSession() as session:async with session.post(url,data=e_sdp.sdp.encode(), # 将 SDP 字符串编码为字节headers={"Content-Type": "application/sdp","Content-Length": str(len(e_sdp.sdp))},ssl=False # 忽略 SSL 证书验证(不推荐在生产环境中使用)) as response:response_data = await response.text()print("对方的SDP:", response_data)return RTCSessionDescription(sdp=response_data, type='answer')async def send_candidate(candidate):if candidate:print("收集到的候选:", candidate) # 处理候选,例如打印候选信息async def run():pc = RTCPeerConnection()# 添加本地媒体player = MediaPlayer('D:\\ceshi\\guoqing.mp4')# player = MediaPlayer('D:\\ceshi\\guoqing.mp4')pc.addTrack(player.video)pc.addTrack(player.audio) # 确保使用 audio# 监听 ICE 候选pc.onicecandidate = lambda candidate: asyncio.create_task(send_candidate(candidate))# 创建 offeroffer = await pc.createOffer()print("本地生成的SDP:", offer.sdp) # 打印本地 SDPawait pc.setLocalDescription(offer)# 发送 offer 并接收 answeranswer = await send_sdp(offer)# 设置远程描述await pc.setRemoteDescription(answer)# 监听 ICE 连接状态变化def on_connection_state_change():print("连接状态:", pc.connectionState)print("ICE 连接状态:", pc.iceConnectionState)if pc.connectionState == "connected":print("连接已建立!")elif pc.connectionState == "disconnected":print("连接已断开!")elif pc.connectionState == "failed":print("连接失败!")elif pc.connectionState == "closed":print("连接已关闭!")pc.onconnectionstatechange = on_connection_state_change# 保持连接活跃while True:await asyncio.sleep(1)if __name__ == "__main__":asyncio.run(run())
上面为推流,下面为拉流
import asyncio
import waveimport aiohttp
import numpy as np
import pyaudio
from aiortc import RTCPeerConnection, RTCSessionDescription, MediaStreamTrackfrom webRTC import send_candidate# 自定义音频接收器
class AudioReceiver(MediaStreamTrack):kind = "audio"def __init__(self):super().__init__()self.sample_rate = 16000 # 设置音频采样率self.chunk_size = 1024self.channels = 1self.pyaudio = pyaudio.PyAudio()self.stream = self.pyaudio.open(format=pyaudio.paInt16,channels=self.channels,rate=self.sample_rate,input=True,frames_per_buffer=self.chunk_size,input_device_index=1 # 根据需要调整输入设备索引)self.framecount = 0 # 初始化时间戳# 创建 WAV 文件self.wav_file = wave.open("output.wav", "wb")self.wav_file.setnchannels(self.channels)self.wav_file.setsampwidth(2) # 16位音频self.wav_file.setframerate(self.sample_rate)async def next_timestamp(self):self.framecount += 1return self.framecount, 160 / self.sample_ratedef create_audio_frame(self, data, sample_rate, channels, pts, time_base):if not isinstance(data, bytes):raise ValueError("Data must be a bytes object")# 将一维字节数组转换为 NumPy 数组samples_per_channel = len(data) // (channels * 2) # 每个样本 2 字节samples = np.frombuffer(data, dtype=np.int16).reshape(samples_per_channel, channels)# 这里我们不再创建 AudioFrame,只需返回样本数据return samplesasync def recv(self):print("recv执行")audio_chunk = self.stream.read(self.chunk_size)# 获取帧时间戳pts, time_base = await self.next_timestamp()# 创建音频帧audio_frame = self.create_audio_frame(data=audio_chunk,sample_rate=self.sample_rate,channels=self.channels,pts=pts,time_base=time_base)# 写入 WAV 文件self.wav_file.writeframes(audio_chunk) # 写入原始字节数据return audio_chunk # 返回字节数据def onmute(self):print("音频流静音")def onunmute(self):print("音频流取消静音")asyncio.create_task(self.recv()) # 在这里调用 recv 方法def close(self):# 关闭流和文件self.stream.stop_stream()self.stream.close()self.pyaudio.terminate()self.wav_file.close()async def send_sdp(e_sdp):url = "https://xxxxx:xxxx/rtc/v1/whip-play/?app=live&stream=livestream01"async with aiohttp.ClientSession() as session:async with session.post(url,data=e_sdp.sdp.encode(), # 将 SDP 字符串编码为字节headers={"Content-Type": "application/sdp","Content-Length": str(len(e_sdp.sdp))},ssl=False # 忽略 SSL 证书验证(不推荐在生产环境中使用)) as response:response_data = await response.text()print("对方的SDP:成功")return RTCSessionDescription(sdp=response_data, type='answer')async def run():pc = RTCPeerConnection()# 添加音频接收器audio_receiver = AudioReceiver()pc.addTrack(audio_receiver)# 监听 ICE 候选pc.onicecandidate = lambda candidate: asyncio.create_task(send_candidate(candidate))# 创建 offeroffer = await pc.createOffer()await pc.setLocalDescription(offer)# 发送 offer 并接收 answeranswer = await send_sdp(offer)if answer is not None:# 确保当前信令状态允许设置远端描述if pc.signalingState == "have-local-offer":await pc.setRemoteDescription(answer)else:print(f"错误:当前信令状态为 {pc.signalingState},无法处理答案")else:print("错误:未能获取有效的 SDP 答复")# 监听连接状态变化pc.onconnectionstatechange = lambda: print("连接状态:", pc.connectionState)# 保持连接活跃while True:try:audio_data = await audio_receiver.recv()except Exception as e:print("接收音频数据时出错:", e)await asyncio.sleep(0.1)if __name__ == "__main__":asyncio.run(run())