// max_threads为 -1 时表示线程池中的线程数无限制并且线程由动态生成
// max_threads为正整数时,线程池就会预先创建max_threads个线程
g_thread_pool_new (GFunc func,
gpointer user_data,
gint max_threads,
gboolean exclusive,
GError **error)
{
GRealThreadPool *retval;
……………. //这些点代表一些省略的代码
retval = g_new (GRealThreadPool, 1);
retval->pool.func = func;
retval->pool.user_data = user_data;
retval->pool.exclusive = exclusive;
retval->queue = g_async_queue_new (); // 创建异步队列
retval->cond = NULL;
retval->max_threads = max_threads;
retval->num_threads = 0;
retval->running = TRUE;
…………….
if (retval->pool.exclusive)
{
g_async_queue_lock (retval->queue);
while (retval->num_threads < retval->max_threads)
{
GError *local_error = NULL;
g_thread_pool_start_thread (retval, &local_error);// 起动新的线程
…………….
}
g_async_queue_unlock (retval->queue);
}
return (GThreadPool*) retval;
}
g_thread_pool_start_thread (GRealThreadPool *pool,
GError **error)
{
gboolean success = FALSE;
if (pool->num_threads >= pool->max_threads && pool->max_threads != -1)
/* Enough threads are already running */
return;
…………….
if (!success)
{
GError *local_error = NULL;
/* No thread was found, we have to start a new one */
// 真正创建一个新的线程
g_thread_create (g_thread_pool_thread_proxy, pool, FALSE, &local_error);
……………….
}
pool->num_threads++;
}
g_thread_pool_thread_proxy (gpointer data)
{
GRealThreadPool *pool;
pool = data;
……………..
g_async_queue_lock (pool->queue);
while (TRUE)
{
gpointer task;
// 线程等待任务,也即等待数据,线程在等待就是处在线程池中的空闲线程
task = g_thread_pool_wait_for_new_task (pool);
// 如果线程被唤醒收到并数据就用此线程执行任务,否则继续循环等待
// 注意:当任务做完时,继续循环又会调用上面的g_thread_pool_wait_for_new_task
// 而进入等待状态,
if (task)
{
if (pool->running || !pool->immediate)
{
/* A task was received and the thread pool is active, so
* execute the function.
*/
g_async_queue_unlock (pool->queue);
pool->pool.func (task, pool->pool.user_data);
g_async_queue_lock (pool->queue);
}
}
else
{
………………
}
}
return NULL;
}
g_thread_pool_wait_for_new_task (GRealThreadPool *pool)
{
gpointer task = NULL;
if (pool->running || (!pool->immediate &&
g_async_queue_length_unlocked (pool->queue) > 0))
{
/* This thread pool is still active. */
if (pool->num_threads > pool->max_threads && pool->max_threads != -1)
{
…………..
}
else if (pool->pool.exclusive)
{
/* Exclusive threads stay attached to the pool. */
// 调用异步队列的pop接口进入等待状态,到此一个线程的创建过程就完成了
task = g_async_queue_pop_unlocked (pool->queue);
}
else
{
………….
}
}
else
{
…………
}
return task; }
|
|
来自: astrotycoon > 《thread》