暂无图片
暂无图片
暂无图片
暂无图片
暂无图片

技术夜校|PThread历险记

得物技术 2020-11-23
480


技术夜校第七期由casa大神带来《PThread历险记》的分享。

他是业内iOS架构体系各方案提出人:target-action组件化方案、离散型API调度方案、Virtual Record持久化方案;多个开源项目作者:CTMediator(3.3k)、RTNetworking&CTNetworking(1.9k)、CTPersistance(700+)、HandyAutoLayout(1.2k),他,

2002 写了C的第一句hello world

2007 拿了NOIP C语言 省一等奖

2014 开了博客:https://casatwy.com/

2016 出了好几个开源项目,涵盖iOS架构体系的主要部分

导语

casa的分享总结起来就两句话【小朋友才看文档,高手全靠猜!】&【数据 + 逻辑 = 功能】。1个小时就分享了这?对,你没看错,这就是精华。想了解这两句话里到底有何乾坤的小伙伴,一定要坚持往下翻哦,你一定会收获非凡。

一个线程创建函数:pthread_create

各种偶现bug无从下手?徒手写个高可用线程池?相信我,今天之后,你就能徒手写出一个线程池!因为所有语言或库实现多线程的基础都是PThread。

创建线程

要创建一个PThread线程,得先有一个能做事的线程

pthread_create(事情)

pthread_create(事情,参数)

pthread_create(线程,事情,参数)

pthread_create(线程,线程属性,事情,参数)

成功?pthread_create(线程线程属性事情参数)

Int pthread_create(pthread_t *threadconst pthread_attr_t *attrvoid*(*start_routine)(void *)void *arg)


pthread_create是pthread库中最复杂的函数,如果你能掌握它,剩下的都是小菜一碟。


线程的 8 个属性

其他属性操作系统都可以搞定,最需要关注的是【detach state】属性。detach state的意思就是,创建的线程是detach还是Join。detach意味着线程结束之后,相关资源就会被立刻回收;Join是在线程结束之后资源不会立刻释放,而是等待别的线程来join,当把自己的返回值交给来join的线程之后,自己就会被释放,如果一直没有线程来join,那这个线程就会一直存在,直到进程结束。

创建线程-源码展示

void *print_message_function( void *ptr )

{

     char *message;

     message = (char *) ptr;

     printf("%s \n", message);

 

main() {

     pthread_t thread1, thread2;

     char *message1 = "Thread 1";

     char *message2 = "Thread 2";

     int  iret1, iret2;

     

     iret1 = pthread_create( &thread1,

                              NULL,

                              print_message_function,

                             (void*) message1);

    

     iret2 = pthread_create( &thread2,

                              NULL,

                              print_message_function,

                             (void*) message2);


     pthread_join(thread1, NULL);

     pthread_join(thread2, NULL);

     printf("Thread 1 returns: %d\n",iret1);

     printf("Thread 2 returns: %d\n",iret2);

     exit(0);

}

创建线程只需要调pthread_create,线程创建完就会立即执行,执行完就结束。但是,线程结束后,还是有很多关键点需要关注。

退出线程

线程结束后需要关注:join exit / cancel / kill / return

Join

join   pthread_join(pthread_t thread, void **value_ptr)
父线程监听子线程的结束,并通过pthread_join函数获得子线程的返回值。

Join语句会阻塞线程,它真正的作用是,当线程有值需要返回时,可以通过join接受返回的值。


多线程Join,会有哪些场景?

  • A线程正在运行,BCDEF发起对A的Join

    Join调用完毕后,A线程仍在运行中

  • A线程正在运行,BCD成功完成对A的Join

    A线程结束,EF后续紧跟着发起Join

  • A线程正在运行,且运行结束

    此时BCDEF成功完成对A的Join

如何判断三个场景会产生什么的结果呢?两个判断标准:只能被Join 1 次&先到先得


exit

exit   pthread_exit(void **value_ptr)
子线程主动结束,通过pthread_exit传递值给父线程的pthread_join去接收


cancel

cancel   pthread_cancel(pthread_t thread)
向【子线程/自己】发送cancel信号,让子线程在线程取消点cancel。发送信号成功不代表cancel成功。

所有会让线程进入阻塞的点都是线程取消点,有些C库没有符合POSIX标准,所以要在调用这些函数之前使用pthread_test_cancel自建线程取消点


kill

kill   pthread_kill(pthread_t thread, int sig)
向【子线程/自己】发送指定的信号,如果子线程没有响应该信号的代码,则交由进程响应
例如发送SIGQUIT信号,子线程不响应的话,进程就会响应该信号,结束进程

通过kill给线程传递信号

int main() {

    pthread_t threadid;

    pthread_create( &threadid,

                                           NULL,

                                           routine,

                                           (void *)&threadparm);

    sleep(5);

 pthread_kill( threadid, SIGUSR1);

    pthread_join( threadid, NULL);

    return 0


void* routine(void *param)

{

    sigset_t set;

    sigemptyset(&set);

    sigaddset(&set, SIGUSR1)

    if (sigwait(&set) == SIGUSR1) {

        perror("Sigwait error");

        pthread_exit((void *)2);

    } else {

        printf("signal received!")

    }

    pthread_exit(0);

}

return语句

本质上跟pthread_exit一样,但是不会调用cleanup函数。

cleanup函数是线程结束之前还能通过它们多做些事的方式,以下是代码展示:

static void * thread_function(void *thread_arg)

{

    ...

    pthread_cleanup_push(callback1, "this is callback 1.");

    pthread_cleanup_push(callback2, "this is callback 2.");

    ...

    pthread_cleanup_pop(0); // 0就不调用callback2

    pthread_cleanup_pop(1); // 保持栈平衡

    pthread_exit((void *) result); // 使用return不会调callback

}

 

void callback1(void *callback_arg)

{

    printf("arg is : %s\n", (char *)callback_arg);

}

void callback2(void *callback_arg)

{

    printf("arg is : %s\n", (char *)callback_arg);

}

int main ()

{

    ...

    pthread_create(&tid, NULL, thread_function, (void *)thread_arg)

    ...

    return 0;

}

恭喜你,到这,你已经掌握猜的本领了,而且不是乱猜。但要真正掌握多线程,你还需要了解一个前提,写代码的哲学——

数据 + 逻辑 = 功能

两个锁:Mutex Lock、Reader-Writer Lock

为了解决线程竞争,需要用加锁来解决。Mutex Lock是最常用的锁。

Mutext Lock互斥锁

API列表

• int pthread_mutex_init(pthread_mutex_t *mutex, const pthread_mutexattr_t *attr)

• int pthread_mutex_destroy(pthread_mutex_t *mutex)

• int pthread_mutex_lock(pthread_mutex_t *mutex)

• int pthread_mutex_unlock(pthread_mutex_t *mutex)

• int pthread_mutex_trylock(pthread_mutex_t *mutex)

代码展示

pthread_t tid[2];

pthread_mutex_t lock;


void* routine(void *arg)

{

    pthread_mutex_lock(&lock);

   

...

 

    pthread_mutex_unlock(&lock);

   

return NULL;

}

 

int main(void)

{

   pthread_mutex_init(&lock, NULL)

   

int i = 2;

    while ( i --> 0 )

    {

        pthread_create(&(tid[i]), NULL, &routine, NULL);

    }

   

pthread_join(tid[0], NULL);

    pthread_join(tid[1], NULL);

   

pthread_mutex_destroy(&lock);

   

return 0;

}

5大属性

第一个和第二个属性中均涉及一个词:优先级反转,这是什么呢?优先级反转是指低优先级的进程反而先于高优先级的进程得到执行。

怎么办?

• 释放资源的时候(关键临界区)不允许被中断

• 无锁同步方案(Non-blocking Synchronization / Read-Copy-Update)

•  Priority Ceiling Protocol

•  Priority Inheritance

Priority Ceiling Protocol

Priority Inheritance

进程共享

通过进程的共享内存,进行锁的共享

typedef struct

{

    pthread_mutex_t Lock;

} TShared;


const int NPROCESSES=32;  

pid_t pidprocesses[128];

 

// main process

int shmid = shmget(1616,sizeof(TShared),IPC_CREAT|0666);

 

TShared *shm = (TShared *) shmat(shmid,NULL,0);

 

pthread_mutex_init(&shm->Lock, 带上pshared属性);

/* ... other code ... */

for (int i=0;i<NPROCESSES;i++) {

    waitpid(pidprocesses[i],0,0);

}

 

for (int i=0;i<NPROCESSES;i++)

{

   pidprocesses[i]=fork();

   if (!pidprocesses[i])

   { 

      sleep(5); // wait the main process

      int shmid = shmget(1616,sizeof(TShared),0666);

     

TShared *shm = (TShared *) shmat(shmid,NULL,0);

     

bool cond=true;      

      while(cond)

      {

         pthread_mutex_lock(&shm->Lock);

         /* ... other code ... */        

         pthread_mutex_unlock(&shm->Lock);

      }

      exit(0);

   }

}

ROBUST:保证资源安全

当持有一个锁的线程还没释放就挂了,怎么办?

1、STALLED
无法申请到锁,未定义的行为。后面申请这个锁的线程可能会一直wait。
2、ROBUST
下一个申请这个锁的线程会收到一个 EOWNERDEAD错误第三个第四个申请锁的线程会处于waiting状态。

这个线程可以尝试恢复上一个线程挂掉之后对锁或对程序执行逻辑的影响

如果恢复成功,就可以调用pthread_mutex_consistent()函数来标记这个锁已经恢复正常,这个锁就相当于被这个线程给持有了。当这个线程释放锁了之后,其它后面的线程就可以正常使用锁了。如果恢复失败,这个锁就会永远处于不可用的状态,只能通过pthread_mutex_destroy()来回收这个锁。

死锁

重复加解锁会出现死锁,互相锁死就是死锁。

怎么办?按照顺序加锁即可。

Reader-Writer Lock读写锁

专治读得太多,写得太少。

• 创建和销毁
pthread_rwlock_init & pthread_rwlock_destroy 

• 读锁
pthread_rwlock_rdlock & pthread_rwlock_tryrdlock 

• 写锁
pthread_rwlock_wrlock & pthread_rwlock_trywrlock

 • 解锁
pthread_rwlock_unlock

2 个属性

• PShared   是否进程间共享 

• Kind   (防止写饥饿)
    Prefer Reader
        读优先,默认值
    Prefer Writer Non-Recursive
        写优先
    Prefer Writer
        同上,glibc中只提供Non-Recursive,添加这个只是为了跟POSIX对齐

代码展示

main() {

    ...

rwlock_init(&account_lock, 0, NULL);

    ...

}


// read thread

float get_balance() {

rw_rdlock(&account_lock);

    bal = checking_balance + saving_balance;

rw_unlock(&account_lock);

    return(bal);

}


// write thread

void transfer_checking_to_savings(float amount) {

rw_wrlock(&account_lock);

checking_balance = checking_balance - amount;

    saving_balance = saving_balance + amount;

rw_unlock(&account_lock);

}

Thread-Specific Data: 跟随线程的数据

如果想有一个数据,只能被本线程内所有函数访问,不能被别的线程访问,应该怎么办?

让数据跟着线程一起创建!

但,Pthread有更好的实现方案——Thread-Specific Data!

API列表

• int pthread_key_create(pthread_key_t *key, voi(*destructor)(void *))

•    int pthread_key_delete(pthread_key_t key)

•    int pthread_setspecific(pthread_key_t key, const void *value)

•    void * pthread_getspecific(pthread_key_t key)

代码展示

pthread_key_t   rl_key;

pthread_once_t  rl_once = PTHREAD_ONCE_INIT;


void readline_destructor(void *ptr)

{

    free(ptr);

}


void readline_once(void)

{

pthread_key_create(&rl_key, readline_destructor);

}


ssize_t readline( ... )

{

    ...


    void * ptr


    pthread_once(&rl_once, readline_once);


    if ( (ptr = pthread_getspecific(rl_key)) == NULL) {

// 初始化ptr,并写入key

        ptr = Malloc( ... );

pthread_setspecific(rl_key, ptr);

}


    …使用ptr…


pthread_delete_key(rl_key)

}

三个逻辑控制机制:

Condition-Variables、Semaphore、Barrier 

怎么通知另一个线程,让对方能够继续工作?加锁!

如果对方比你先到达临界区呢?Condition Variables !

Condition Variables

API列表

• int pthread_cond_init(pthread_cond_t *cond, const pthread_condattr_t *attr)

 int pthread_cond_destroy(pthread_cond_t *cond)

 int pthread_cond_signal(pthread_cond_t *cond)

• int pthread_cond_broadcast(pthread_cond_t *cond)

 int pthread_cond_wait(pthread_cond_t *cond, pthread_mutex_t *mutex)

2个属性

码展示

void thread_waiting_for_condition_signal ()

{

    pthread_mutex_lock(&mutex);

    while (operation_queue == NULL) {

pthread_cond_wait(&condition_variable_signal, &mutex);

    }


    …


    pthread_mutex_unlock(&mutex);

}


void thread_prepare_queue ()

{

    pthread_mutex_lock(&mutex);


    … 


pthread_cond_signal(&condition_variable_signal);

    pthread_mutex_unlock(&mutex);


    ...


    pthread_exit((void *) 0);


使用Condition Variable要注意三点

一定要结合Mutex使用

一定要先进行条件检测

一定要在临界区扔信号


关于【怎么通知另一个线程,让对方能够继续工作?】的另一个方案就是Semaphore !

Semaphore

API列表

•  int sem_init(sem_t *sem, int pshared, unsigned int value)
   int sem_open(const char *name, int oflag, …)

•  int sem_post(sem_t *sem) // value 加1

•  int sem_wait(sem_t *sem) // value 减1

•  int sem_destroy(sem_*sem)

•  int sem_getvalue(sem_t *sem, int *sval)

代码展示

typedef struct {

    char buf[BSIZE];

    sem_t occupied;

    sem_t empty;

    int nextin;

    int nextout;

    sem_t pmut;

    sem_t cmut;

} buffer_t;

 

buffer_t buffer;

 

sem_init(&buffer.occupied, 0, 0);

sem_init(&buffer.empty,0, BSIZE);

sem_init(&buffer.pmut, 0, 1);

sem_init(&buffer.cmut, 0, 1);

buffer.nextin = buffer.nextout = 0;


void producer(buffer_t *b, char item) {

    sem_wait(&b->empty);

    sem_wait(&b->pmut);

 

    b->buf[b->nextin] = item;

    b->nextin++;

    b->nextin %= BSIZE;

 

    sem_post(&b->pmut);

    sem_post(&b->occupied);

}

 

char consumer(buffer_t *b) {

    char item;


    sem_wait(&b->occupied);

    sem_wait(&b->cmut);


    item = b->buf[b->nextout];

    b->nextout++;

    b->nextout %= BSIZE;


    sem_post(&b->cmut);

    sem_post(&b->empty);


    return(item);

}

如何协调线程自己的工作逻辑?如等别的线程都到某个点后,我再继续?线程跑的快慢不一,如何保证线程都到一个点后,再做后面的事情呢?使用Barrier。这是Pthread中最简单的机制,设立一个栅栏,让相关的线程走到预定位置就wait,所有人都走到了,再继续。

Barrier

API列表

• int pthread_barrier_init(pthread_barrier_t *barrier, const pthread_barrierattr_t *attr, unsigned intcount)

• int pthread_barrier_destroy(pthread_barrier_t *barrier)

 int pthread_barrier_wait(pthread_barrier_t *barrier)

1 个属性

是否可以跨进程共享

代码展示

#define THREAD_COUNT 4

pthread_barrier_t mybarrier;



void* threadFn(void *args) {

  int sec = 1 + rand() % 5;

  sleep(sec);

  int result = pthread_barrier_wait(&mybarrier);

  printf("result is %d", result);

  return NULL;

}



int main() {

  int i;

  pthread_t ids[THREAD_COUNT];


pthread_barrier_init(&mybarrier, NULL, THREAD_COUNT + 1);


  for (i=0; i < THREAD_COUNT; i++) {

    pthread_create(&ids[i], NULL, threadFn, NULL);

  }


  int result = pthread_barrier_wait(&mybarrier);

  printf("main() is going! result is %d\n", result);


  for (i=0; i < THREAD_COUNT; i++) {

    pthread_join(ids[i], NULL);

  }


pthread_barrier_destroy(&mybarrier);


  return 0;

}

所有wait的线程唤醒之后,只有一个线程收到的返回值是PTHREAD_BARRIER_SERIAL_THREAD

其它线程收到的返回值是0

结尾

分享最后,casa给大家布置了作业,相信坚持阅读了文章的你,一定也跃跃欲试,那就来吧!

【得物技术】

扫一扫关注公众号


文章转载自得物技术,如果涉嫌侵权,请发送邮件至:contact@modb.pro进行举报,并提供相关证据,一经查实,墨天轮将立刻删除相关内容。

评论