- Java锁的逻辑(结合对象头和ObjectMonitor)
- 还在用饼状图?来瞧瞧这些炫酷的百分比可视化新图形(附代码实现)⛵
- 自动注册实体类到EntityFrameworkCore上下文,并适配ABP及ABPVNext
- 基于Sklearn机器学习代码实战
。
前置步骤和录屏是一样的,见我的上一篇文章 。
https://www.cnblogs.com/billin/p/17219558.html 。
bool obs_output_actual_start ( obs_output_t *output)在上文的这个函数中,如果是启用推流直播,函数指针会转到这里 。
//碧麟精简批注版
//rtmp推流开始
static bool rtmp_stream_start(void *data)
{
struct rtmp_stream *stream = data;
os_atomic_set_bool(&stream->connecting, true);
//开启推流线程
return pthread_create(&stream->connect_thread, NULL, connect_thread,
stream) == 0;
}
推流使用的是obs-outputs插件 。
上面函数就是新开一个线程,来执行 。
这个函数static void *connect_thread(void *data),线程传入参数是stream 。
先看一下rtmp_stream结构 。
//rtmp stream结构
struct rtmp_stream {
//obs_output
obs_output_t *output;
//packet信息
struct circlebuf packets;
bool sent_headers;
bool got_first_video;
int64_t start_dts_offset;
volatile bool connecting;
//连接线程地址
pthread_t connect_thread;
//发送线程地址
pthread_t send_thread;
os_sem_t *send_sem;
//推流地址,key
//比如b站,推流地址是rtmp://live-push.bilivideo.com/live-bvc/
struct dstr path, key;
//用户名,密码
struct dstr username, password;
//编码器名字:
//我用的是FMLE/3.0 (compatible; FMSc/1.0)
struct dstr encoder_name;
struct dstr bind_ip;
int64_t last_dts_usec;
uint64_t total_bytes_sent;
int dropped_frames;
pthread_mutex_t dbr_mutex;
struct circlebuf dbr_frames;
size_t dbr_data_size;
long audio_bitrate;
long dbr_est_bitrate;
long dbr_orig_bitrate;
long dbr_prev_bitrate;
long dbr_cur_bitrate;
long dbr_inc_bitrate;
bool dbr_enabled;
// RTMP结构对象
RTMP rtmp;
pthread_t socket_thread;
uint8_t *write_buf;
size_t write_buf_len;
size_t write_buf_size;
pthread_mutex_t write_buf_mutex;
};
rtmp_stream结构里保存了推流的所有关键信息,包括推流地址,key,流的编码器等参数 。
还保存了几个关键的指针,用于多线程中调度。有连接线程connect_thread,也有发送线程pthread_t send_thread,
。
//rtmp连接线程
static void *connect_thread(void *data)
{
struct rtmp_stream *stream = data;
int ret;
//设置线程名
os_set_thread_name("rtmp-stream: connect_thread");
//初始化
if (!silently_reconnecting(stream)) {
if (!init_connect(stream)) {
obs_output_signal_stop(stream->output,
OBS_OUTPUT_BAD_PATH);
os_atomic_set_bool(&stream->silent_reconnect, false);
return NULL;
}
} else {
struct encoder_packet packet;
peek_next_packet(stream, &packet);
stream->start_dts_offset = get_ms_time(&packet, packet.dts);
}
//连接
ret = try_connect(stream);
if (ret != OBS_OUTPUT_SUCCESS) {
obs_output_signal_stop(stream->output, ret);
info("Connection to %s failed: %d", stream->path.array, ret);
}
if (!stopping(stream))
pthread_detach(stream->connect_thread);
os_atomic_set_bool(&stream->silent_reconnect, false);
os_atomic_set_bool(&stream->connecting, false);
return NULL;
}
上面的方法是在单独的线程中执行的,主要是做了下面几件事:
1 设置线程名为“rtmp-stream :connect_thread” 。
2 初始化 init_connect 。
3 通过调用try_connect(stream)执行实际连接逻辑 。
//rtmp connect
static int try_connect(struct rtmp_stream *stream)
{
info("Connecting to RTMP URL %s...", stream->path.array);
//rtmp初始化
RTMP_Init(&stream->rtmp);
//设置URL
if (!RTMP_SetupURL(&stream->rtmp, stream->path.array))
return OBS_OUTPUT_BAD_PATH;
RTMP_EnableWrite(&stream->rtmp);
//设置流编码格式
dstr_copy(&stream->encoder_name, "FMLE/3.0 (compatible; FMSc/1.0)");
//设置用户名密码
set_rtmp_dstr(&stream->rtmp.Link.pubUser, &stream->username);
set_rtmp_dstr(&stream->rtmp.Link.pubPasswd, &stream->password);
set_rtmp_dstr(&stream->rtmp.Link.flashVer, &stream->encoder_name);
stream->rtmp.Link.swfUrl = stream->rtmp.Link.tcUrl;
stream->rtmp.Link.customConnectEncode = add_connect_data;
if (dstr_is_empty(&stream->bind_ip) ||
dstr_cmp(&stream->bind_ip, "default") == 0) {
memset(&stream->rtmp.m_bindIP, 0,
sizeof(stream->rtmp.m_bindIP));
} else {
bool success = netif_str_to_addr(&stream->rtmp.m_bindIP.addr,
&stream->rtmp.m_bindIP.addrLen,
stream->bind_ip.array);
if (success) {
int len = stream->rtmp.m_bindIP.addrLen;
bool ipv6 = len == sizeof(struct sockaddr_in6);
info("Binding to IPv%d", ipv6 ? 6 : 4);
}
}
RTMP_AddStream(&stream->rtmp, stream->key.array);
stream->rtmp.m_outChunkSize = 4096;
stream->rtmp.m_bSendChunkSizeInfo = true;
stream->rtmp.m_bUseNagle = true;
//连接
if (!RTMP_Connect(&stream->rtmp, NULL)) {
set_output_error(stream);
return OBS_OUTPUT_CONNECT_FAILED;
}
if (!RTMP_ConnectStream(&stream->rtmp, 0))
return OBS_OUTPUT_INVALID_STREAM;
info("Connection to %s successful", stream->path.array);
//到这里说明连接成功,开始初始化send逻辑,准备推流
return init_send(stream);
}
当连接成功,开始调用init_send函数,初始化send,准备推流 。
需要注意,send也是在单独的线 。
程处理的,因为传视频比较大,如果不单开线程,肯定会造成阻塞.
//初始化rtmp send逻辑
static int init_send(struct rtmp_stream *stream)
{
int ret;
obs_output_t *context = stream->output;
//创建send线程,执行send_thread方法,参数是stream
ret = pthread_create(&stream->send_thread, NULL, send_thread, stream);
if (stream->new_socket_loop) {
int one = 1;
#ifdef _WIN32
if (ioctlsocket(stream->rtmp.m_sb.sb_socket, FIONBIO, &one)) {
stream->rtmp.last_error_code = WSAGetLastError();
#else
if (ioctl(stream->rtmp.m_sb.sb_socket, FIONBIO, &one)) {
stream->rtmp.last_error_code = errno;
#endif
warn("Failed to set non-blocking socket");
return OBS_OUTPUT_ERROR;
}
os_event_reset(stream->send_thread_signaled_exit);
info("New socket loop enabled by user");
if (stream->low_latency_mode)
info("Low latency mode enabled by user");
if (stream->write_buf)
bfree(stream->write_buf);
int total_bitrate = 0;
obs_encoder_t *vencoder = obs_output_get_video_encoder(context);
if (vencoder) {
obs_data_t *params = obs_encoder_get_settings(vencoder);
if (params) {
int bitrate =
obs_data_get_int(params, "bitrate");
if (!bitrate) {
warn("Video encoder didn't return a "
"valid bitrate, new network "
"code may function poorly. "
"Low latency mode disabled.");
stream->low_latency_mode = false;
bitrate = 10000;
}
total_bitrate += bitrate;
obs_data_release(params);
}
}
obs_encoder_t *aencoder =
obs_output_get_audio_encoder(context, 0);
if (aencoder) {
obs_data_t *params = obs_encoder_get_settings(aencoder);
if (params) {
int bitrate =
obs_data_get_int(params, "bitrate");
if (!bitrate)
bitrate = 160;
total_bitrate += bitrate;
obs_data_release(params);
}
}
// to bytes/sec
int ideal_buffer_size = total_bitrate * 128;
if (ideal_buffer_size < 131072)
ideal_buffer_size = 131072;
stream->write_buf_size = ideal_buffer_size;
stream->write_buf = bmalloc(ideal_buffer_size);
#ifdef _WIN32
ret = pthread_create(&stream->socket_thread, NULL,
socket_thread_windows, stream);
#else
warn("New socket loop not supported on this platform");
return OBS_OUTPUT_ERROR;
#endif
if (ret != 0) {
RTMP_Close(&stream->rtmp);
warn("Failed to create socket thread");
return OBS_OUTPUT_ERROR;
}
stream->socket_thread_active = true;
stream->rtmp.m_bCustomSend = true;
stream->rtmp.m_customSendFunc = socket_queue_data;
stream->rtmp.m_customSendParam = stream;
}
os_atomic_set_bool(&stream->active, true);
if (!send_meta_data(stream)) {
warn("Disconnected while attempting to send metadata");
set_output_error(stream);
return OBS_OUTPUT_DISCONNECTED;
}
obs_encoder_t *aencoder = obs_output_get_audio_encoder(context, 1);
if (aencoder && !send_additional_meta_data(stream)) {
warn("Disconnected while attempting to send additional "
"metadata");
return OBS_OUTPUT_DISCONNECTED;
}
if (obs_output_get_audio_encoder(context, 2) != NULL) {
warn("Additional audio streams not supported");
return OBS_OUTPUT_DISCONNECTED;
}
if (!silently_reconnecting(stream))
obs_output_begin_data_capture(stream->output, 0);
return OBS_OUTPUT_SUCCESS;
}
。
核心推流线程,在单独的线程里完成推流逻辑 。
。
//推流核心线程
static void *send_thread(void *data)
{
struct rtmp_stream *stream = data;
//设置线程名
os_set_thread_name("rtmp-stream: send_thread");
//设定buffersize
#if defined(_WIN32)
// Despite MSDN claiming otherwise, send buffer auto tuning on
// Windows 7 doesn't seem to work very well.
if (get_win_ver_int() == 0x601) {
DWORD cur_sendbuf_size;
DWORD desired_sendbuf_size = 524288;
socklen_t int_size = sizeof(int);
if (!getsockopt(stream->rtmp.m_sb.sb_socket, SOL_SOCKET,
SO_SNDBUF, (char *)&cur_sendbuf_size,
&int_size) &&
cur_sendbuf_size < desired_sendbuf_size) {
setsockopt(stream->rtmp.m_sb.sb_socket, SOL_SOCKET,
SO_SNDBUF, (char *)&desired_sendbuf_size,
sizeof(desired_sendbuf_size));
}
}
log_sndbuf_size(stream);
#endif
//推流主循环
while (os_sem_wait(stream->send_sem) == 0) {
struct encoder_packet packet;
struct dbr_frame dbr_frame;
if (stopping(stream) && stream->stop_ts == 0) {
break;
}
if (!get_next_packet(stream, &packet))
continue;
if (stopping(stream)) {
if (can_shutdown_stream(stream, &packet)) {
obs_encoder_packet_release(&packet);
break;
}
}
if (!stream->sent_headers) {
if (!send_headers(stream)) {
os_atomic_set_bool(&stream->disconnected, true);
break;
}
}
/* silent reconnect signal received from server, reconnect on
* next keyframe */
if (silently_reconnecting(stream) &&
packet.type == OBS_ENCODER_VIDEO && packet.keyframe) {
reinsert_packet_at_front(stream, &packet);
break;
}
if (stream->dbr_enabled) {
dbr_frame.send_beg = os_gettime_ns();
dbr_frame.size = packet.size;
}
if (send_packet(stream, &packet, false, packet.track_idx) < 0) {
os_atomic_set_bool(&stream->disconnected, true);
break;
}
if (stream->dbr_enabled) {
dbr_frame.send_end = os_gettime_ns();
pthread_mutex_lock(&stream->dbr_mutex);
dbr_add_frame(stream, &dbr_frame);
pthread_mutex_unlock(&stream->dbr_mutex);
}
}
bool encode_error = os_atomic_load_bool(&stream->encode_error);
if (disconnected(stream)) {
info("Disconnected from %s", stream->path.array);
} else if (encode_error) {
info("Encoder error, disconnecting");
} else if (silently_reconnecting(stream)) {
info("Silent reconnect signal received from server");
} else {
info("User stopped the stream");
}
#if defined(_WIN32)
log_sndbuf_size(stream);
#endif
if (stream->new_socket_loop) {
os_event_signal(stream->send_thread_signaled_exit);
os_event_signal(stream->buffer_has_data_event);
pthread_join(stream->socket_thread, NULL);
stream->socket_thread_active = false;
stream->rtmp.m_bCustomSend = false;
}
set_output_error(stream);
if (silently_reconnecting(stream)) {
/* manually close the socket to prevent librtmp from sending
* unpublish / deletestream messages when we call RTMP_Close,
* since we want to re-use this stream when we reconnect */
RTMPSockBuf_Close(&stream->rtmp.m_sb);
stream->rtmp.m_sb.sb_socket = -1;
}
RTMP_Close(&stream->rtmp);
/* reset bitrate on stop */
if (stream->dbr_enabled) {
if (stream->dbr_cur_bitrate != stream->dbr_orig_bitrate) {
stream->dbr_cur_bitrate = stream->dbr_orig_bitrate;
dbr_set_bitrate(stream);
}
}
if (!stopping(stream)) {
pthread_detach(stream->send_thread);
if (!silently_reconnecting(stream))
obs_output_signal_stop(stream->output,
OBS_OUTPUT_DISCONNECTED);
} else if (encode_error) {
obs_output_signal_stop(stream->output, OBS_OUTPUT_ENCODE_ERROR);
} else {
obs_output_end_data_capture(stream->output);
}
if (!silently_reconnecting(stream)) {
free_packets(stream);
os_event_reset(stream->stop_event);
os_atomic_set_bool(&stream->active, false);
}
stream->sent_headers = false;
/* reset bitrate on stop */
if (stream->dbr_enabled) {
if (stream->dbr_cur_bitrate != stream->dbr_orig_bitrate) {
stream->dbr_cur_bitrate = stream->dbr_orig_bitrate;
dbr_set_bitrate(stream);
}
}
if (silently_reconnecting(stream)) {
rtmp_stream_start(stream);
}
return NULL;
}
。
最后此篇关于obs推流核心流程分析的文章就讲到这里了,如果你想了解更多关于obs推流核心流程分析的内容请搜索CFSDN的文章或继续浏览相关文章,希望大家以后支持我的博客! 。
Linux 有许多跨(假设是 2 个)CPU 内核执行的线程和进程。我希望我的单线程 C/C++ 应用程序成为 CPU0 上的唯一线程。我如何“移动”所有其他线程以使用 CPU1? 我知道我可以使用
我有一个类似于下图的数据库表 Table with 2 columns (UserId and value) 我将传递 UserId 和 2 个字符串。例如:userId: 1, key1: h1,
我想在我的新项目中使用 ASP.NET Core,因为我听说它更快。但是,该项目将使用广泛的数据库访问功能,Entity Framework Core 不支持其中一些功能。我想知道,是否可以使用 En
我已经使用 EntityFrameworkCore.SqlServer 2.0 开发了 asp .net core wep api 2.0 应用程序。它是使用数据库优先方法开发的。当尝试使用 dbco
我已经阅读了很多关于这个主题的文章,但我仍然无法处理这个问题。对不起,如果它是重复的,无论如何! 所以基本上,我正在从头开始构建一个 Angular 应用程序,并且我想按照最佳约定来组织我的代码。我有
我对MPI还是陌生的,所以如果这是一个琐碎的问题,请原谅我。我有一个四核CPU。我想运行一个在单个内核上使用两个进程的OpenMPI C++程序。有什么办法吗?如果是这样,那又如何?我提到了this
下面是一个传播异常处理机制的类问题,所需的输出是异常。任何人都可以解释为什么输出是异常,在此先感谢。 Class Question { public void m1() throws Excep
我想打印每个获得 CPU 时间片的进程的 name 和 pid。可能吗? 最佳答案 对于单个流程,您可以在以下位置获取此信息: /proc//stat 第14和第15个字段分别代表在用户态和内核态花费
我想知道是否可以识别具有特定 thread-id 的线程使用的物理处理器(核心)? 例如,我有一个多线程应用程序,它有两 (2) 个线程(例如,thread-id = 10 和 thread-id =
我有一个需要身份验证的 Solr 核心。假设我有一个用户,密码为password。当我现在尝试在控制台中创建一个 Solr 核心时 bin\solr create -c test 我收到 HTTP 错
我想为与使用它的项目不同的类库中的第二个和后续数据库创建迁移。有皱纹。我永远不会知道连接字符串,直到用户登录并且我可以从目录数据库 (saas) 中获取它。 对于目录数据库,我使用了来自 this 的
我想为一种可以产生 GHC Core 的简单语言创建一个前端。然后我想获取这个输出并通过正常的 GHC 管道运行它。根据this page , 不能直接通过 ghc 命令实现。我想知道是否有任何方法可
阅读文档,我构建了 2 个使用 BLE 连接 2 个 iDevices 的应用程序。 一个设备是中央设备,另一个是外围设备。 Central在寻找Peripheral,当找到它时,探索它的服务和特性,
在我的网络应用程序中,我对长时间运行的任务进行了操作,我想在后台调用此任务。因此,根据文档 .net core 3.1 Queued background tasks我为此使用这样的代码: publi
Solr 1.4 Enterprise Search Server 建议对核心副本进行大量更新,然后将其换成主核心。我正在按照以下步骤操作: 创建准备核心:http://localhost:8983/
它们是否存在,如果存在,文档和代码在哪里? 最佳答案 它们位于 Git 的 test 目录中。 https://github.com/jquery/jquery/tree/master/test 关于
我有一个 Lisp (SBCL 1.0.40.0.debian) 应用程序 (myfitnessdata),它使用以下代码来处理命令行参数: (:use :common-lisp) (:export
Core是GHC的中间语言。阅读Core可以帮助你更好地了解程序的性能。有人向我索要有关阅读 Core 的文档或教程,但我找不到太多。 有哪些文档可用于阅读 GHC Core? 这是我迄今为止发现的内
我有一个核心 WebJob 部署到 Azure Web 应用程序中。我正在使用WebJobs version 3.0.6 . 我注意到,WebJob 代码不会立即拾取对连接字符串和应用程序设置的更改(
我有一个在内部构造和使用 SqlConnection 类的第三方库。我可以从该类继承,但它有大量重载,到目前为止我一直无法找到合适的重载。我想要的是将参数附加到正在使用的连接字符串。 有没有办法在 .
我是一名优秀的程序员,十分优秀!