准备

proxy lab 的难度和 malloc lab 不在一个数量级上,主要体现在两个方面:

  1. 除了第三部分要实现 cache 以外,前面两部分要求实现 proxy 和 synchronization, 都可以在书上的示例代码中拼凑出来;
  2. 测试评分宽松,通过就满分,malloc lab 可是按性能评分的;

简单归简单,但是不能不做准备工作:

  1. 仔细看一遍 proxy lab 的说明文档,在 csapp3e labs 官网上;
  2. 熟读 csapp3e 第11章,略读第10章,第12章;

要求实现三个功能:

  1. proxy
  2. synchronization
  3. cache

代码

proxy

这部分的代码大半都可以复用 tiny.c 的代码,包括函数 read_requesthdrs, clienterror, doit 的大部分, main 的大部分。

定义几个字符串全局变量做为请求的头部分:

1
2
3
4
5
6
7
static const char *user_agent_hdr = "user-agent: mozilla/5.0 (x11; linux x86_64; rv:52.0) gecko/20100101 firefox/52.0\r\n";
static const char *accept_hdr = "accept: text/html,application/xhtml+xml,application/xml;q=0.9,*/*;q=0.8\r\n";
static const char *accept_encoding_hdr = "accept-encoding: gzip, deflate\r\n";
static const char *language_hdr = "accept-language: zh,en-us;q=0.7,en;q=0.3\r\n";
static const char *connection_hdr = "connection: close\r\n";
static const char *pxy_connection_hdr = "proxy-connection: close\r\n";
static const char *secure_hdr = "upgrade-insecure-requests: 1\r\n";

别忘了 lab 说明文档中要求在 proxy 的运行过程中遇到 sigpipe 信号时不要报错:

1
2
3
4
5
6
/* handle sigpipe signal */
void sigpipe_handler(int sig)
{
  printf("sigpipe handled %d\n", sig);
  return;
}

以及负责解析 client 发送的网址请求(原谅我,就像注释中说的,这个函数我没有自己写, 只是稍微改了一下,被 malloc lab 折腾惨了,抠细节的代码不想再写了):

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
/*
 * this parse function i just copy the idea from another genius
 * do not want to spend too much time in that helper function
 */
int parse_uri(char *uri, char *host, char *port, char *path)
{
  const char *ptr;
  const char *tmp;
  char scheme[10];
  int len;
  int i;

  ptr = uri;

  tmp = strchr(ptr, ':');
  if (null == tmp)
    return 0;

  len = tmp - ptr;
  (void)strncpy(scheme, ptr, len);
  scheme[len] = '\0';
  for (i = 0; i < len; i++)
    scheme[i] = tolower(scheme[i]);
  if (strcasecmp(scheme, "http"))
    return 0;
  tmp++;
  ptr = tmp;

  for ( i = 0; i < 2; i++ ) {
    if ( '/' != *ptr ) {
      return 0;
    }
    ptr++;
  }

  tmp = ptr;
  while ('\0' != *tmp) {
    if (':' == *tmp || '/' == *tmp)
      break;
    tmp++;
  }
  len = tmp - ptr;
  (void)strncpy(host, ptr, len);
  host[len] = '\0';

  ptr = tmp;

  if (':' == *ptr) {
    ptr++;
    tmp = ptr;
    /* read port */
    while ('\0' != *tmp && '/' != *tmp)
      tmp++;
    len = tmp - ptr;
    (void)strncpy(port, ptr, len);
    port[len] = '\0';
    ptr = tmp;
  } else {
    port = "80";
  }

  if ('\0' == *ptr) {
    strcpy(path, "/");
    return 1;
  }

  tmp = ptr;
  while ('\0' != *tmp)
    tmp++;
  len = tmp - ptr;
  (void)strncpy(path, ptr, len);
  path[len] = '\0';

  return 1;
}

main 函数的前半部分和 tiny.c 的一致,在检查命令行参数是否合法之后:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
Signal(SIGPIPE, sigpipe_handler);

listenfd = Open_listenfd(argv[1]);
while (1) {
  clientlen = sizeof(clientaddr);
  connfd = Accept(listenfd, (SA *)&clientaddr, &clientlen);
  Getnameinfo((SA *) &clientaddr, clientlen, hostname, MAXLINE,
	      port, MAXLINE, 0);
  printf("Accepted connection from (%s, %s)\n", hostname, port);
  do_it(connfd);
  Close(connfd);
 }

然后就是 do_it 函数,它负责 proxy 的主要功能:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
void doit(int fd)
{
  char buf[MAXLINE], up_buf[MAXLINE], method[MAXLINE], uri[MAXLINE], version[MAXLINE];
  char host[MAXLINE], path[MAXLINE], port[MAXLINE];
  char cache[MAX_OBJECT_SIZE];
  rio_t rio, sio;
  int up_fd;
  size_t n, m = 0;

  /* Read request line and headers */
  Rio_readinitb(&rio, fd);
  if (!Rio_readlineb(&rio, buf, MAXLINE))
    return;
  sscanf(buf, "%s %s %s", method, uri, version);
  if (strcasecmp(method, "GET"))
    {
      clienterror(fd, method, "501", "Not Implemented",
		  "Proxy does not implement this method");
      return;
    }
  read_requesthdrs(&rio);
  if(!parse_uri(uri, host, port, path))
    {
      clienterror(fd, uri, "404", "Not found", "Request could not be parsed");
      return;
    }

  up_fd = Open_clientfd(host, port);
  Rio_readinitb(&sio, up_fd);

  sprintf(up_buf, "GET %s HTTP/1.0\r\nHost: %s\r\n%s%s%s%s%s%s%s\r\n", path, host, user_agent_hdr, accept_hdr, language_hdr, accept_encoding_hdr, connection_hdr, secure_hdr, pxy_connection_hdr);
  Rio_writen(up_fd, up_buf, strlen(up_buf));

  while((n = Rio_readlineb(&sio, up_buf, MAXLINE)) != 0)
    {
      content_type_ignore = m;
      m += n;
      Rio_writen(fd, up_buf, n);
    }
}

这样就可以实现一个简单的单线程 proxy 功能。

synchronization

大部份相关代码都可以借鉴 CSAPP3e 第12章的关于并发编程的例子(Pthread):

一个 thread 包装函数:

1
2
3
4
5
6
7
8
9
void *thread(void *var)
{
  int fd = *((int *)var);
  Pthread_detach(Pthread_self());
  Signal(SIGPIPE, sigpipe_handler);
  doit(fd);
  Close(fd);
  return NULL;
}

main 函数中修改原来调用 do_it 的那部分代码:

1
2
Pthread_create(&t_id, NULL, thread, &connfd);
/* 因为在 thread 包装器中已经写了 Close(fd), main 函数中就可以省去了 */

如果前面 proxy 功能能通过测试的话,这个多线程就可以正常工作。

cache

这里实现的是一个双向链表来存储缓存内容(感谢 malloc lab 给的灵感),就像前面所说 的,proxy lab 测试要求比较低,并没有很复杂的边界条件,通过测试是没有问题的,但是 这个双向链表实现出来以后有些地方不能和说明文档里的要求(可能只时建议)保持一致:

  1. 由于缓存替换要求实现 LRU 算法(或者接近 LRU),作者在这里是严格按照 LRU 算法 来做的,具体是:

    1. 每次读写都算做一次使用该链表元素
    2. 讲新近读写的缓存从链表中挑出,将它的前后链表关系置为 NULL
    3. 链接它的前后元素
    4. 插入链表头
  2. 但是问题就来了,因为这是一个支持多线程的 proxy, 这些线程共用一个链表(当然了), 所以每次更新链表的操作就需要加 , 每次读写都需要;虽然我在读缓存的代码中加 入了一个条件判断语句:如果链表头就是当前缓存元素,就跳过链表更新;毕竟如果有 多个线程同时读一个缓存的话,第一个加锁并读完的的线程将会把它置于链接头并释放 锁,后面等待的线程就可以跳过更新链表操作从而实现说明文档上的读并行;但是如果 有特别复杂的线程并行情况,比如最坏的情况:大量并行线程读取两个不同的缓存,链 表头更迭频繁,读并行就无法实现了。

  3. 大量的链表链接关系更新,多线程导致每处链表关系更新和读取都需要加锁;也考虑过 用单向链表和缓存内加时间戳来实现 LRU 算法,但是,单向链表的添加和解除元素在多 线程的情况下还是得加锁,并没有很大得并发提升。

  4. 写缓存在说明文档中是明确要求加锁(或者生产者消费者并发模形),在缓存结构体中 添加一个 mutex 来实现,每次要写缓存都要添加锁和释放锁。

  5. 使用固定大小的数组来取代链表?在多线程情况下从数组中存取元素照样得上锁;

  6. 综上这些问题都是选用链表而引出来,单向或者双向都无法避免,应该还会有对并发更 友好数据模形,实在不想折腾了(在经过 malloc lab 得折腾之后作者已经懈怠了), 等以后知识量上去了,或者真的有灵光一闪,再来处理这个。

  7. #<2018-04-06 Fri> 看了下 CSAPP3e 12.5.14 P707 读者优先的读者-写者模形, 一部分实现读者优先的代码可以借用到 cache.c 的读缓存函数里,可以实现读缓存的并 行实现(不过极端条件下当并行读取的进程同时进入 if(!(cache_node = CACHE_HEAD))== 时,就几乎没有并行可言了);书中的读者-写者模形整体来看并不适合 做 proxylab 要求的缓存:书中的模形过于简单,基于生产者-消费者模形的-有东西 产出就取走,而缓存要求的是有匹配的内容就读取该内容,无匹配就将获得的数据做为 缓存插入,不过 sbuf 这个数据结构还是可以借用的,需要大改,改造思路就是 LRU 链表替换原来的数组,或者保理数组,新加入一个数据结构存储每一个缓存的地址并桉 照 LRU 顺序排列;

缓存结构体:

1
2
3
4
5
6
7
8
typedef struct cache{
  int conn;			        /* is it a buf while connection */
  char buf[MAX_OBJECT_SIZE];            /* buf */
  char uri[MAXLINE];		        /* buf's uri */
  sem_t mutex;				/* sem_t mutex */
  struct cache *next_cache;	        /* next cache */
  struct cache *prev_cache;		/* previous cache */
}cache;

配套方法:

  1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
/* initial CACHE_HEAD and sem_t mutex for the whole linked list */
void init_cache_list(void)
{
  CACHE_HEAD = NULL;
  Sem_init(&mutex, 0, 1);
}
/* insert cachenode as the head */
void insert_cache_head(cache *cache_node)
{
  if(CACHE_HEAD == cache_node) return;
  else if(CACHE_HEAD)
    {
      CACHE_HEAD->prev_cache = cache_node;
      cache_node->next_cache = CACHE_HEAD;
      cache_node->prev_cache = NULL;
      CACHE_HEAD = cache_node;
    }
  else CACHE_HEAD = cache_node;
}

/* break the node relations of a cache_node, and link the prev and next */
void repair_cache_link(cache *cache_node)
{
  cache *prev, *next;
  if(cache_node == CACHE_HEAD) return;
  else
    {
      prev = cache_node->prev_cache;
      next = cache_node->next_cache;
      if(prev) prev->next_cache = next;
      if(next) next->prev_cache = prev;
      cache_node->prev_cache = NULL;
      cache_node->next_cache = NULL;
    }
}

/* write cache */
cache *write_cache(char *buf, char *uri)
{
  P(&mutex);
  cache *cache_node = find_cache(uri);
  if(!cache_node)
    {
      cache_node = (cache *)malloc(sizeof(cache));
      Sem_init(&(cache_node->mutex), 0, 1);
    }
  P(&(cache_node->mutex));
  cache_node->conn = 1;		/* write as connection */
  strcpy(cache_node->uri, uri);
  strcpy(cache_node->buf, buf);
  V(&(cache_node->mutex));
  if(!(cache_node == CACHE_HEAD))
    {
      repair_cache_link(cache_node);
      insert_cache_head(cache_node);
    }
  V(&mutex);
  return cache_node;
}

/* after lost connection, turn off flag conn, handle eviction */
void hadnle_after_disconn(cache *cache_node)
{
  size_t size = 0;
  P(&(cache_node->mutex));
  cache_node->conn = 0;
  V(&(cache_node->mutex));
  P(&mutex);
  size = get_whole_cache_size();
  while(size > MAX_CACHE_SIZE)
    {
      printf("1");
      delete_tail_cache();
      size = get_whole_cache_size();
    }
  V(&mutex);
}

/* read cache */
char *read_cache(cache *cache_node)
{
  char *buf;

  P(&cnt_lock);
  readcnt++;
  if(readcnt == 1) P(&(cache_node->mutex));
  V(&cnt_lock);

  if(!(cache_node == CACHE_HEAD))
    {
      P(&mutex);
      repair_cache_link(cache_node);
      insert_cache_head(cache_node);
      V(&mutex);
    }

  buf = cache_node->buf;

  P(&cnt_lock);
  readcnt--;
  if(readcnt == 0) V(&(cache_node->mutex));
  V(&cnt_lock);

  return buf;
}

/* find cache based on host and path, if not exists, return NULL */
cache *find_cache(char *uri)
{
  cache *cache_node = CACHE_HEAD;
  while(cache_node)
    {
      printf("2");
      if(!strcmp(cache_node->uri, uri))
	return cache_node;
      else cache_node = cache_node->next_cache;
    }
  return NULL;
}

/* cache size, not the whole cache size but the buf size */
size_t get_cache_size(cache *cache_node)
{
  return strlen(cache_node->buf);
}

/* the size of entire cache list, only the buf size included */
size_t get_whole_cache_size(void)
{
  cache *cache_node = CACHE_HEAD;
  size_t size = 0;

  while(cache_node)
    {
      printf("3");
      if(!cache_node->conn) size += get_cache_size(cache_node);
      cache_node = cache_node->next_cache;
    }

  return size;
}

/* find tail cache */
cache *find_tail_cache(void)
{
  cache *temp_cache = NULL;
  cache *tail_cache = CACHE_HEAD;

  while(tail_cache)
    {
      printf("4");
      if(!tail_cache->next_cache && !tail_cache->conn) return tail_cache;
      else if(!tail_cache->conn) temp_cache = tail_cache;
      tail_cache = tail_cache->next_cache;
    }
  return temp_cache;
}

/* delete a tail cache_node */
void delete_tail_cache(void)
{
  cache *tail_cache = find_tail_cache();
  if(tail_cache)
    {
      P(&mutex);
      repair_cache_link(tail_cache);
      free(tail_cache);
      V(&mutex);
    }
}

在 proxy.c 中实现缓存,在 main 函数中初始化链表:

1
init_cache_list();

检查 client 提交的 uri 有没有缓存,如果没有缓存,按原来的逻辑执行下去并缓存收到的内容:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
char cache[MAX_OBJECT_SIZE];
int content_type_ignore;
struct cache *cache_node;

/* 省略 */

/* if already cached */
cache_node = find_cache(uri);
if(cache_node)
  {
    strcpy(up_buf, read_cache(cache_node));
    Rio_writen(fd, up_buf, strlen(up_buf));
    return;
  }

/* if not cached */

/* 省略 */

while((n = Rio_readlineb(&sio, up_buf, MAXLINE)) != 0)
  {
    content_type_ignore = m;
    m += n;
    Rio_writen(fd, up_buf, n);
    if(content_type_ignore) strcat(cache, up_buf);
  }
if(m <= MAX_OBJECT_SIZE) cache_node = write_cache(cache, uri);
/* ready to disconnect */
hadnle_after_disconn(cache_node);

写在后面

proxy lab 这样结束掉虽然能通过测试,它现在内部的代码逻辑还是存在很多问题的;

这次作者选择划水;

一来这次的 lab 过于功能性,网络编程和并发编程的知识结构比较顶层,做了收获有限, 投入大量时间并不值得;

二来作者当前的知识量也不够(数据结构、算法、一些顶层实现的函数接口),深挖下去效 率太低,这些时间可以花在更重要的事情上。

就这样吧,这里的代码以后会有重构的那一天的。

<2018-04-06 Fri> 7 稍微改进了读缓存的并行性 。