分享

glib库线程池代码分析

 astrotycoon 2015-02-08
 
// max_threads为 -1 时表示线程池中的线程数无限制并且线程由动态生成
// max_threads为正整数时,线程池就会预先创建max_threads个线程
g_thread_pool_new (GFunc            func,
                 gpointer         user_data,
                 gint             max_threads,
                 gboolean         exclusive,
                 GError         **error)
{
 GRealThreadPool *retval;
    ……………. //这些点代表一些省略的代码
 retval = g_new (GRealThreadPool, 1);
 
 retval->pool.func = func;
 retval->pool.user_data = user_data;
 retval->pool.exclusive = exclusive;
 retval->queue = g_async_queue_new (); // 创建异步队列
 retval->cond = NULL;
 retval->max_threads = max_threads;
 retval->num_threads = 0;
 retval->running = TRUE;
    …………….
 if (retval->pool.exclusive)
 {
      g_async_queue_lock (retval->queue);
 
      while (retval->num_threads < retval->max_threads)
         {
            GError *local_error = NULL;
            g_thread_pool_start_thread (retval, &local_error);// 起动新的线程
            …………….
        }
      g_async_queue_unlock (retval->queue);
 }
 
 return (GThreadPool*) retval;
}
 
g_thread_pool_start_thread (GRealThreadPool *pool,
                         GError          **error)
{
 gboolean success = FALSE;
 
 if (pool->num_threads >= pool->max_threads && pool->max_threads != -1)
    /* Enough threads are already running */
    return;
 …………….
 if (!success)
 {
      GError *local_error = NULL;
      /* No thread was found, we have to start a new one */
      // 真正创建一个新的线程
      g_thread_create (g_thread_pool_thread_proxy, pool, FALSE, &local_error);
      ……………….
 }
 pool->num_threads++;
}
 
g_thread_pool_thread_proxy (gpointer data)
{
 GRealThreadPool *pool;
 
 pool = data;
 ……………..
 g_async_queue_lock (pool->queue);
 
 while (TRUE)
 {
      gpointer task;
 
      // 线程等待任务,也即等待数据,线程在等待就是处在线程池中的空闲线程
      task = g_thread_pool_wait_for_new_task (pool);
      // 如果线程被唤醒收到并数据就用此线程执行任务,否则继续循环等待
      // 注意:当任务做完时,继续循环又会调用上面的g_thread_pool_wait_for_new_task
      // 而进入等待状态,
if (task)
        {
            if (pool->running || !pool->immediate)
             {
               /* A task was received and the thread pool is active, so
               * execute the function.
               */
               g_async_queue_unlock (pool->queue);
               pool->pool.func (task, pool->pool.user_data);
               g_async_queue_lock (pool->queue);
           }
        }
      else
        {
            ………………
      }
 }
 return NULL;
}
 
g_thread_pool_wait_for_new_task (GRealThreadPool *pool)
{
 gpointer task = NULL;
 
 if (pool->running || (!pool->immediate &&
                     g_async_queue_length_unlocked (pool->queue) > 0))
 {
      /* This thread pool is still active. */
      if (pool->num_threads > pool->max_threads && pool->max_threads != -1)
        {
          …………..
        }
      else if (pool->pool.exclusive)
        {
          /* Exclusive threads stay attached to the pool. */
        // 调用异步队列的pop接口进入等待状态,到此一个线程的创建过程就完成了
          task = g_async_queue_pop_unlocked (pool->queue);
        }
      else
        {
          ………….
        }
 }
 else
 {
      …………
 }
 return task;
}

    本站是提供个人知识管理的网络存储空间,所有内容均由用户发布,不代表本站观点。请注意甄别内容中的联系方式、诱导购买等信息,谨防诈骗。如发现有害或侵权内容,请点击一键举报。
    转藏 分享 献花(0

    0条评论

    发表

    请遵守用户 评论公约

    类似文章 更多