'2016/09'에 해당되는 글 1건

  1. 2016.09.11 [Linux/Kernel] Kernel readv/writev implementation (1)

- Kernel readv/writev internal implementation (1) -


네트워크 프로그래밍을 하던, 아니면 단순한 File I/O를 수행하던 read/write함수는 한 번쯤은 사용해 보았으리라 생각한다. 그런데, 만약에 전송하는 데이터가 여러 버퍼에 나뉘어져 있다면 어떻게 해야할까. 아니면 한 데이터를 일정 크기로 끊어서 여러 버퍼에 나눠 저장해야 한다면?

물론 read/write함수를 버퍼의 개수만큼 호출하면 매우 간단할 것이다. 그런데, 만약 나뉘어진 버퍼의 개수가 천 단위를 넘어간다면 어떻게 될까? 그래도 read/write를 버퍼 개수만큼 호출하면 될까?


여기서 왜 안되는지 묻는 사람들이 있을지도 모르겠지만, 리눅스 커널은 User-mode와 Kernel-mode로 나뉘어 있다는 것을 기억하자. 비록 우리가 빌드해서 실행시키는 프로그램 자체는 User-mode에서 동작하지만, 실제 장치에 접근해야 하는 read/write함수는 Kernel-mode에서 수행될 수 밖에 없다. 즉, Syscall인 read/write함수를 호출하게 되면, User-mode에서 Kernel-mode로 Context switch가 일어나게 된다. 이 Context switch 과정에서 Register나 기타 여러 프로그램의 수행 정보를 메모리에 저장하고, 페이징 모듈은 프로세스 매핑 메모리에서 커널 메모리 영역으로의 전환을 시도하게 된다. 즉, 필연적으로 Overhead가 발생할 수 밖에 없다는 것이다.


다시 돌아와서, 만약 버퍼가 수천개가 있다고 read/write를 수천번 시도하게 된다면 Context switch도 수천번 일어나게 된다는 것. 그만큼 수행시간에 있어서 더 많은 Cost를 소모하게 된다.

즉, User-mode와 Kernel-mode사이의 전환을 최소화 하면서 동시에 여러 버퍼에 대한 작업을 할 수 있는 Syscall이 필요하게 된 것이다.

그리고 그것이 readv/writev이다.


본격적인 readv/writev에 대한 설명에 앞서, readv/writev의 효율성을 확인하기 위해 24Byte크기의 char형 버퍼 1024개를 대상으로 read-readv의 퍼포먼스 테스트를 진행해 보았다.


1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
#include <sys/time.h>
#include <sys/uio.h>
#include <sys/stat.h>
#include <unistd.h>
#include <fcntl.h>
#include <stdio.h>
 
int main(){
    struct timeval t1, t2;
        double diffTime, finalTime = 0;
 
    char chBuf[1024][25= {0, };
    ssize_t nr;
    int fd, i, loop;
 
    printf("read() : 1024-Buffers\n");
 
    for(loop = 0; loop < 10++loop){
        gettimeofday(&t1, NULL);
 
        fd = open("txtfile", O_RDONLY);
        if (fd == -1) {
            perror ("open");
            return fd;
        }
 
        for(i = 0; i < 1024++i){
            nr = read(fd, chBuf[i], 24);
            if (nr == -1) {
                perror ("read");
                return nr;
            }
        }
 
        if(close(fd)){
            perror("close");
            return -1;
        }
 
        gettimeofday(&t2, NULL);
        diffTime = t2.tv_usec - t1.tv_usec;
        finalTime += diffTime;
        printf("[Loop %d] Elapsed Time : %fus\n", loop, diffTime);
    }
    printf("[Avarage] Elapsed Time : %fus\n", finalTime/10);
    return 0;
}
 
cs

-read 테스트 코드


1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
#include <sys/time.h>
#include <sys/uio.h>
#include <sys/stat.h>
#include <unistd.h>
#include <fcntl.h>
#include <stdio.h>
 
int main(){
    struct timeval t1, t2;
        double diffTime, finalTime = 0;
 
    struct iovec iov[1024];
    char chBuf[1024][25= {0, };
    ssize_t nr;
    int fd, i, loop;
 
    printf("readv() : 1024-Buffers\n");
 
    for(loop = 0; loop < 10++loop){
        gettimeofday(&t1, NULL);
 
        fd = open("txtfile", O_RDONLY);
        if (fd == -1) {
            perror ("open");
            return fd;
        }
 
        for(i = 0; i < 1024++i){
            iov[i].iov_base = chBuf[i];
            iov[i].iov_len = 24;
        }
 
        nr = readv (fd, iov, 1024);
        if (nr == -1) {
            perror ("readv");
            return nr;
        }
 
        if(close(fd)){
            perror("close");
            return -1;
        }
 
        gettimeofday(&t2, NULL);
        diffTime = t2.tv_usec - t1.tv_usec;
        finalTime += diffTime;
        printf("[Loop %d] Elapsed Time : %fus\n", loop, diffTime);
    }
    printf("[Avarage] Elapsed Time : %fus\n", finalTime/10);
    return 0;
}
 
cs

-readv 테스트 코드


-실제 테스트 결과


실제로 위의 두 코드를 컴파일, 실행시킨 결과 약 9배 가까이의 수행시간 차이가 나는 것을 알 수 있다.


그렇다면, readv/writev가 기존의 read/write와 어느 부분에서 달라진 것일까? 바로 iovec라는 구조체이다. 기존의 read에서는 버퍼 그 자체의 포인터를 인자로서 넘겼다면, readv에서는 다수의 버퍼를 iovec[] 즉, iovec array에 저장해서 iovec array의 포인터를 인자로서 전달하게 된다. 간단하게 말해서, Syscall의 호출 전에 다수의 버퍼를 하나의 Wrapper 배열에 저장해 넘기게 된다고 보면 될 듯 하다. 쉽게 말하자면 여러 곳에 흩어져 있는 문서(입출력 버퍼)들을 서류철(iovec)에 모아 부장님(커널)께 한번에 결제 받는 것과 비슷하다고 보면 될까.


아무튼, readv와 writev에서 공통적으로 사용되는 구조체인 iovec는 매우 간단한 구조를 띄고 있다.


1
2
3
4
5
struct iovec
{
    void __user *iov_base;    
    __kernel_size_t iov_len;
};
cs


보다시피 실제 버퍼의 주소값을 담는 void __user pointer형인 iov_base와 iovec에 담긴 주소의 크기를 나타내는 __kernel_size_t형인 iov_len으로 이루어져 있다.


실제 코드에서의 사용은 다음과 같다.


1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
#include <sys/uio.h>
#include <sys/stat.h>
#include <unistd.h>
#include <fcntl.h>
#include <stdio.h>
 
int main(){
    struct iovec iov[3];
    char chBuf[3][65];
    ssize_t nr;
    int fd, i;
 
    fd = open("txtfile", O_RDONLY);
    if (fd == -1) {
        perror ("open");
        return fd;
    }
 
    for(i = 0; i < 3++i){
        iov[i].iov_base = chBuf[i];
        iov[i].iov_len = 64;
    }
 
    nr = readv (fd, iov, 3);
    if (nr == -1) {
        perror ("readv");
        return nr;
    }
 
    for(i = 0; i < 3++i){
        printf("Num.%d [%ld Bytes] : %s\n", i, iov[i].iov_len, (char*)iov[i].iov_base);
    }
 
    if(close(fd)){
        perror("close");
        return -1;
    }
    
    return 0;
}
cs


위와 같이 iovec배열을 생성해준 뒤, 각각의 iovec에 입출력버퍼를 할당해준다. iovec에서 iov_base에 배열의 포인터를, iov_len에 각 배열의 크기를 할당하게 된다.


위 코드에서 readv함수는 readv(fd, iov, 3)의 형태로 호출되고 있는데,

기존의 read의 형태인


1
ssize_t read(int fd, void *buf, size_t count);
cs


에서 아래와 같이


1
ssize_t readv(int fd, const struct iovec *iov, int iovcnt);
cs


의 형태로 바뀐 것이다. 자세히 보면 기존 read함수의 원형에서 크게 벗어난 부분이 없다. 기존 버퍼를 직접 받던 buf포인터가 iovec배열을 받는 iovec포인터로 바뀌었고, 버퍼의 크기가, iovec에 할당된 버퍼의 갯수, 즉 iovec배열의 크기로 바뀐 것 뿐이다.


그럼 iovec을 인자로 받은 readv/writev가 실제로 어떠한 식으로 동작하는지 코드를 보며 한 번 분석해보도록 하겠다.

먼저 readv의 내부 구현을 살펴보도록 하겠다.


1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
SYSCALL_DEFINE3(readv, unsigned long, fd, const struct iovec __user *, vec,
        unsigned long, vlen)
{
    return do_readv(fd, vec, vlen, 0);
}
 
static ssize_t do_readv(unsigned long fd, const struct iovec __user *vec,
                        unsigned long vlen, int flags)
{
        struct fd f = fdget_pos(fd);
        ssize_t ret = -EBADF;
 
        if (f.file) {
                loff_t pos = file_pos_read(f.file);
                ret = vfs_readv(f.file, vec, vlen, &pos, flags);
                if (ret >= 0)
                        file_pos_write(f.file, pos);
                fdput_pos(f);
        }
 
        if (ret > 0)
                add_rchar(current, ret);
        inc_syscr(current);
        return ret;
}
cs


가장 먼저, readv()의 호출에 의해 readv() Syscall wrapper routine가 호출되게 된다. SYSCALL_DEFINE3매크로의 형태를 하고 있는 Wrapper routine는 do_readv함수를 이어 호출하게 된다.


do_readv는 Wrapper routine으로부터 fd(File descriptor - fd), vec(IO Vector - iovec), vlen(IO Vector Count - iovcnt)를 차례대로 전달 받게 된다. 이때, 마지막 인자인 flags는 readv호출 자체에서는 0으로 설정되며, readv계열 파생 함수인 preadv(2)에서 Syscall 인자로서 받게 되기에 크게 신경 쓰지 않아도 된다.


do_readv에서는 fdget_pos함수를 통해서 fd값(int)로부터 실제 File descriptor를 구해온 뒤, file_pos_read를 호출하면서 File offset 구해오게 된다.

그 뒤, 구해온 File offset을 포함해서, 입출력 작업에 필요한 인자들을 넘기면서 vfs_readv함수를 호출하게 된다.


1
2
3
4
5
6
7
8
9
10
ssize_t vfs_readv(struct file *file, const struct iovec __user *vec,
          unsigned long vlen, loff_t *pos, int flags)
{
    if (!(file->f_mode & FMODE_READ))
        return -EBADF;
    if (!(file->f_mode & FMODE_CAN_READ))
        return -EINVAL;
 
    return do_readv_writev(READ, file, vec, vlen, pos, flags);
}
cs


vfs_readv함수에서는 인자로 넘어온 파일 구조체에서 파일이 읽기 가능한 상태인지만 확인한 뒤, do_readv_writev함수을 READ Type으로 호출하게 된다. (리눅스 커널 함수들 중 입출력에 관련된 함수들은 read와 write가 같은 함수에 구현되어 있고, type인자를 통해 구분되는 경우가 많다)


1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
static ssize_t do_readv_writev(int type, struct file *file,
                   const struct iovec __user * uvector,
                   unsigned long nr_segs, loff_t *pos,
                   int flags)
{
    size_t tot_len;
    struct iovec iovstack[UIO_FASTIOV];
    struct iovec *iov = iovstack;
    struct iov_iter iter;
    ssize_t ret;
    io_fn_t fn;
    iter_fn_t iter_fn;
 
    ret = import_iovec(type, uvector, nr_segs,
               ARRAY_SIZE(iovstack), &iov, &iter);
    if (ret < 0)
        return ret;
 
    tot_len = iov_iter_count(&iter);
    if (!tot_len)
        goto out;
    ret = rw_verify_ara(type, file, pos, tot_len);
    if (ret < 0)
        goto out;
 
    if (type == READ) {
        fn = file->f_op->read;
        iter_fn = file->f_op->read_iter;
    } else {
        fn = (io_fn_t)file->f_op->write;
        iter_fn = file->f_op->write_iter;
        file_start_write(file);
    }
 
    if (iter_fn)
        ret = do_iter_readv_writev(file, &iter, pos, iter_fn, flags);
    else
        ret = do_loop_readv_writev(file, &iter, pos, fn, flags);
 
    if (type != READ)
        file_end_write(file);
 
out:
    kfree(iov);
    if ((ret + (type == READ)) > 0) {
        if (type == READ)
            fsnotify_access(file);
        else
            fsnotify_modify(file);
    }
    return ret;
}
cs


위 코드가 do_readv_writev함수이다. 가장 먼저 import_iovec함수를 통해서 유저영역상에 iovec배열을 커널영역 iovec배열로 전달하게 된다. 먼저 import_iovec의 구현을 살펴보도록 하겠다.


1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
int import_iovec(int type, const struct iovec __user * uvector,
                 unsigned nr_segs, unsigned fast_segs,
                 struct iovec **iov, struct iov_iter *i)
{
        ssize_t n;
        struct iovec *p;
        n = rw_copy_check_uvector(type, uvector, nr_segs, fast_segs,
                                  *iov, &p);
        if (n < 0) {
                if (p != *iov)
                        kfree(p);
                *iov = NULL;
                return n;
        }
        iov_iter_init(i, type, p, nr_segs, n);
        *iov = p == *iov ? NULL : p;
        return 0;
}
EXPORT_SYMBOL(import_iovec);
cs


import_iovec의 모습이다. 여기서 주의해서 보아야 할 것은 fast_segs와 iov, 그리고 i이다. 이 세 변수 만은 확실히 기억하도록 하자. import_iovec함수에서는 rw_copy_check_uvector라는 함수를 통해 커널영역 iovec배열을 할당하고 유저영역 iovec의 값을 복사하게 된다.


1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
ssize_t rw_copy_check_uvector(int type, const struct iovec __user * uvector,
                              unsigned long nr_segs, unsigned long fast_segs,
                              struct iovec *fast_pointer,
                              struct iovec **ret_pointer)
{
        unsigned long seg;
        ssize_t ret;
        struct iovec *iov = fast_pointer;
 
        /*
         * SuS says "The readv() function *may* fail if the iovcnt argument
         * was less than or equal to 0, or greater than {IOV_MAX}.  Linux has
         * traditionally returned zero for zero segments, so...
         */
        if (nr_segs == 0) {
                ret = 0;
                goto out;
        }
 
        /*
         * First get the "struct iovec" from user memory and
         * verify all the pointers
         */
        if (nr_segs > UIO_MAXIOV) {
                ret = -EINVAL;
                goto out;
        }
        if (nr_segs > fast_segs) {
                iov = kmalloc(nr_segs*sizeof(struct iovec), GFP_KERNEL);
                if (iov == NULL) {
                        ret = -ENOMEM;
                        goto out;
                }
        }
        if (copy_from_user(iov, uvector, nr_segs*sizeof(*uvector))) {
                ret = -EFAULT;
                goto out;
        }
 
        /*
         * According to the Single Unix Specification we should return EINVAL
         * if an element length is < 0 when cast to ssize_t or if the
         * total length would overflow the ssize_t return value of the
         * system call.
         *
         * Linux caps all read/write calls to MAX_RW_COUNT, and avoids the
         * overflow case.
         */
        ret = 0;
        for (seg = 0; seg < nr_segs; seg++) {
                void __user *buf = iov[seg].iov_base;
                ssize_t len = (ssize_t)iov[seg].iov_len;
 
                /* see if we we're about to use an invalid len or if
                 * it's about to overflow ssize_t */
                if (len < 0) {
                        ret = -EINVAL;
                        goto out;
                }
                if (type >= 0
                    && unlikely(!access_ok(vrfy_dir(type), buf, len))) {
                        ret = -EFAULT;
                        goto out;
                }
                if (len > MAX_RW_COUNT - ret) {
                        len = MAX_RW_COUNT - ret;
                        iov[seg].iov_len = len;
                }
                ret += len;
        }
out:
        *ret_pointer = iov;
        return ret;
}
cs


rw_copy_check_uvector의 28번째 줄을 보면 iovec 포인터형 지역변수인 iov에 fast_pointer를 넣어주고 있다. fast_pointer는 rw_copy_check_uvector의 5번째 매개변수로, import_iovec에서의 iov에 해당하게 된다. 그런데, 이 함수에서 44~58번째 줄을 보도록 하자.


1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
if (nr_segs > UIO_MAXIOV) {
        ret = -EINVAL;
        goto out;
}
if (nr_segs > fast_segs) {
        iov = kmalloc(nr_segs*sizeof(struct iovec), GFP_KERNEL);
        if (iov == NULL) {
                ret = -ENOMEM;
                goto out;
        }
}
if (copy_from_user(iov, uvector, nr_segs*sizeof(*uvector))) {
        ret = -EFAULT;
        goto out;
}
cs


위 코드는 해당 부분을 잘라낸 코드이다. 여기서 5번째 if문에 의해 nr_segs가 fast_segs보다 클 경우, iov에 kmalloc을 통해 커널 메모리를 할당해주게 된다. 그런 뒤, 12번 라인에서 copy_from_user를 통해 유저영역 iovec의 값들을 iov에 복사해주게 된다. 이때, 놓치지 말아야 할 것이 있는데, iov가 5번 라인 if문을 충족시키지 않아 할당되지 않았을 때에도 copy_from_user를 수행할 수 있는가라는 의문을 가져야 한다.


이 문제의 답은 import_iovec의 호출 전, do_readv_writev함수에서 찾을 수 있다. do_readv_writev함수의 6~9번 라인과 import_iovec호출 부분을 참조하자.


1
2
3
4
5
6
7
8
9
size_t tot_len;
struct iovec iovstack[UIO_FASTIOV];
struct iovec *iov = iovstack;
struct iov_iter iter;
 
...
 
ret = import_iovec(type, uvector, nr_segs,
               ARRAY_SIZE(iovstack), &iov, &iter);
cs


해당 부분인데, rw_copy_check_uvector의 fast_pointer, 즉 import_iovec의 iov에 해당하는 값이 위의 3번 라인에서 정의되는 것을 볼 수 있다. 이때, 3번 라인에서 보이는 iovstack은 2번 라인에서 do_readv_writev의 지역변수로서 iovec[UIO_FASTIOV]라는 스택 배열이다. 또한, fast_segs에 해당하는 부분은 ARRAY_SIZE(iovstack)이다. 즉, rw_copy_check_uvector에서 iov는 비할당된 영역을 가리키지 않는다.


rw_copy_check_uvector의 28번 라인, 즉, nr_segs와 fast_segs를 비교하는 if문에서 만일 do_readv_writev에서 할당한 iovstack의 크기(여기서는 UIO_FASTIOV)보다 readv로 넘어온 총 iovec의 개수가 더 많을 경우에만 kmalloc을 통해서 별도의 영역 할당을 진행하게 된다.


여기에는 의미가 있는데, 어차피 커널영역 iovec객체인 iov는 do_readv_writev보다 하위 스택 프레임에서만 이용할 것이 보장된다. 따라서, 적은 수의 iovec을 처리할 경우에는 kmalloc을 통해 별도의 iovec들을 할당하는 것보다, 약간의 고정적 스택 낭비를 감안하더라도 do_readv_writev함수의 스택 프레임 구성시에 지역변수로서 일정크기의 iovec배열을 미리 정의하여 빠른 처리가 가능하도록 하는 것이다.


kmalloc을 통한 할당여부를 결정하는 임계값인 UIO_FASTIOV는 '#define UIO_FASTIOV 8'로, 만일 사용자가 readv로 전달해 준, iovec배열의 iovec객체가 8개보다 많을 경우에는 kmalloc을 통해 별도로 커널 메모리를 할당해주게 된다.


자, 그럼 다시 원래 rw_copy_check_uvector로 돌아와서, rw_copy_check_uvector의 소스코드를 찬찬히 분석해보도록 하자.


1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
if (nr_segs == 0) {
        ret = 0;
        goto out;
}
 
/*
 * First get the "struct iovec" from user memory and
 * verify all the pointers
 */
if (nr_segs > UIO_MAXIOV) {
        ret = -EINVAL;
        goto out;
}
if (nr_segs > fast_segs) {
        iov = kmalloc(nr_segs*sizeof(struct iovec), GFP_KERNEL);
        if (iov == NULL) {
                ret = -ENOMEM;
                goto out;
        }  
}
if (copy_from_user(iov, uvector, nr_segs*sizeof(*uvector))) {
        ret = -EFAULT;
        goto out;
}
cs


가장 먼저 이 부분을 살펴보도록 하자. 먼저 nr_segs, 즉 readv로 넘어온 iovec의 개수가 0개일 경우에는 아무 작업을 하지 않고 바로 0를 리턴하게 된다.


그 뒤, 10~13번 라인을 살펴보도록 하자. nr_segs가 UIO_MAXIOV보다 클 경우, ret에 에러코드를 넣은 뒤 바로 함수의 마지막 부분으로 이동하게 된다. 이때, UIO_MAXIOV는 '#define UIO_MAXIOV 1024'로 이 코드를 간단하게 해석하자면 사용자가 readv로 넘겨준 iovec배열에서 iovec객체의 개수가 1024개를 넘을 경우 에러를 내게 된다.



위 사진은 실제로 1024이상의 iovec을 인자로 주어 readv를 호출하였을 경우고, 정상적으로 readv가 수행되지 않고 오류를 출력함을 볼 수 있다.


이는 실제 iovec처리를 위해서는 kmalloc을 통하여 커널 메모리를 할당 받는데, 1024와 같이 크기 제한을 두지 않을 경우 메모리 침범이 일어날 가능성이 있고, 설령 그렇지 않더라도 kmalloc플래그가 GFP_KERNEL이기 때문에 할당이 불가능한 메모리 크기를 할당받기 위해 해당 제어경로가 무한정 휴면상태에 빠질 수 있기에 1024로 iovec의 개수를 제한하지 않았나라고 생각된다.


계속 이어서, 14~24 라인은 위에서 개략적인 설명을 하였으므로 넘어가도록 하겠다.


1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
        ret = 0;
        for (seg = 0; seg < nr_segs; seg++) {
                void __user *buf = iov[seg].iov_base;
                ssize_t len = (ssize_t)iov[seg].iov_len;
 
                /* see if we we're about to use an invalid len or if
                 * it's about to overflow ssize_t */
                if (len < 0) {
                        ret = -EINVAL;
                        goto out;
                }
                if (type >= 0
                    && unlikely(!access_ok(vrfy_dir(type), buf, len))) {
                        ret = -EFAULT;
                        goto out;
                }
                if (len > MAX_RW_COUNT - ret) {
                        len = MAX_RW_COUNT - ret;
                        iov[seg].iov_len = len;
                }
                ret += len;
        }
out:
        *ret_pointer = iov;
        return ret;
}
cs


위 코드에서는 상단코드 24라인까지의 모든 과정을 끝낸 뒤, 유저모드 iovec의 정보들이 모두 커널모드에 할당된 iovec으로 옮겨진 뒤 수행되는 작업이다. for문을 통해 nr_segs, 즉 iovec의 개수만큼 3~21라인까지 루프문을 진행하게 된다. 전체적으로 크게 신경 쓸 부분은 없다.


8~11라인은 iov_len이 음수일 경우, 즉 오버플로우나 기타 이유를 통해 iov_len이 비정상적으로 변경되었을 경우를 판단하게 된다.


17~21라인은 MAX_RW_COUNT - ret, 즉 커널에서 한번에 최대로 입출력 처리를 할 수 있는 바이트 수에서 이전 iovec객체까지의 길이(iov_len)총합을 뺀 것보다 현재 Index의 iovec객체의 iov_len이 더 클 경우, 해당 값을 MAX_RW_COUNT-ret으로 지정하게 된다. 쉽게 말해서, iovec이 아무리 많고, 하나의 iovec객체의 버퍼 크기가 아무리 크더라도 한번에 입출력을 수행할 수 있는 바이트 수는 MAX_RW_COUNT를 넘을 수 없다는 것이다.

이때 주의해야 하는 것은, MAX_RW_COUNT의 크기를 넘은 시점부터 그 이후의 iovec에 대해서는 입출력이 수행되지 않는다는 것이지, readv의 수행에 오류가 생기는 것은 아니다. 물론, readv에서 오류가 생기는게 아닐 뿐이지 그 전에 너무 큰 배열의 생성으로 Segmentation Fault가 발생하지 않을까 싶다.

만일 MAX_RW_COUNT 값을 넘었는지 확인하고자 한다면, readv로부터 리턴된 읽은 바이트 수를 비교하면 된다.


그럼 이어서 12~16 라인을 살펴보도록 하자. 해당 라인에서는 가장 먼저 type >= 0인지 확인하게 된다. type은 정상적인 경우 READ와 WRITE, READA의 세 type만이 존재하며, 모두 0이상 값을 가지고 있다(READ == 0). 따라서 type이 Invalid할 경우 바로 에러코드를 저장하고 함수를 나가게 된다(Short-circuit evaluation).

만일, type이 Valid할 경우, access_ok를 통해서 iovec에 할당된 영역에 접근이 가능한지(유효 메모리 범위 안에 존재하는지)를 확인하게 된다.

이때, 첫번째 인자인 type은 vrfy_dir매크로('#define vrfy_dir(type) ((type) == READ ? VERIFY_WRITE : VERIFY_READ')를 통해서 결정케 된다.


마지막 23~26 라인에서는 할당과 값 정의, 그리고 Verify과정까지 모두 거친 iov를 rw_copy_check_uvector의 마지막 매개변수인 ret_pointer에 저장한 뒤, ret(<= MAX_RW_COUNT)를 리턴해주며 다시 import_iovec으로 돌아가게 된다.

그럼, 다시 import_iovec함수를 살펴보자.


1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
int import_iovec(int type, const struct iovec __user * uvector,
                 unsigned nr_segs, unsigned fast_segs,
                 struct iovec **iov, struct iov_iter *i)
{
        ssize_t n;
        struct iovec *p;
        n = rw_copy_check_uvector(type, uvector, nr_segs, fast_segs,
                                  *iov, &p);
        if (n < 0) {
                if (p != *iov)
                        kfree(p);
                *iov = NULL;
                return n;
        }
        iov_iter_init(i, type, p, nr_segs, n);
        *iov = p == *iov ? NULL : p;
        return 0;
}
EXPORT_SYMBOL(import_iovec);
cs


위에 이미 존재하는 코드지만, 스크롤을 다시 올리기는 필자도 귀찮았기에 가져왔다.

rw_copy_check_uvector의 Return(iovec의 총 크기)가 0보다 작을 경우에는 Invalid하기에 9~14라인에서 메모리 할당을 해제하고 함수를 나가게 된다.


하지만, 정상적으로 Return값이 들어왔을 경우에는 iov_iter_init함수를 호출하게 된다.

그 뒤에 16라인에서는 iov의 값을 삼항연산자로 비교, 만일 rw_copy_check_uvector에서 얻어온 return_pointer인 p가 do_readv_writev의 iovstack과 같을 경우(즉, iovec의 개수가 8개 이하라 kmalloc이 수행되지 않은 경우)에는 iovstack이 존재하기에 iov를 NULL로 초기화 하며, 그렇지 않을 경우에는 p로 초기화 해주어 할당한 메모리 영역의 주소를 알려주게 된다.


간략하게 iov_iter_init에 대해 소개해보자면, 소스코드는 다음과 같다.


1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
void iov_iter_init(struct iov_iter *i, int direction,
                        const struct iovec *iov, unsigned long nr_segs,
                        size_t count)
{
        /* It will get better.  Eventually... */
        if (segment_eq(get_fs(), KERNEL_DS)) {
                direction |= ITER_KVEC;
                i->type = direction;
                i->kvec = (struct kvec *)iov;
        } else {
                i->type = direction;
                i->iov = iov;
        }
        i->nr_segs = nr_segs;
        i->iov_offset = 0;
        i->count = count;
}
EXPORT_SYMBOL(iov_iter_init);
cs


간단하게 설명해보자면, 6번째 라인 if문에서 get_fs()로 얻어온 현재 스레드의 세그먼트 정보와 KERNEL_DS를 비교, 현재 세그먼트가 커널 데이터 세그먼트 영역이라면, 7~9 라인을 수행하게 된다.


7번 라인에서는 direction(type, 현재는 READ)에 ITER_KVEC을 OR함으로 현재 iovec Iteration이 커널 영역에서 진행됨을 알려주게 된다.

8번 라인은 else일때의 11번 라인과 동일하게 direction으로 iov_iter구조체의 type을 설정하게 된다.

9번과 12번 라인의 차이점에 주의해야 되는데. 커널 데이터 세그먼트 영역에 속할 경우, iovec이 아닌 kvec으로서 변환하여 iovec을 iov_iter에 할당하게 된다.


이때, kvec과 iovec의 변환은 비교적 자유로운 편인데,


1
2
3
4
struct kvec {
        void *iov_base;
        size_t iov_len;
};
cs


1
2
3
4
5
struct iovec
{
        void __user *iov_base;
        __kernel_size_t iov_len;
};
cs


보다시피 kvec의 경우에는 iov_base가 커널영역의 실제 주소를 가르키고, iovec은 구조체 자체는 커널 영역에 할당 되더라도 iov_base는 __user포인터이기에 유저영역의 가상 주소를 가리키게 된다. 그 외에는 큰 차이가 없다. 구조상의 차이가 없고, 다만 저장되는 주소가 커널 주소냐 유저 주소냐의 차이일 뿐.


다시 돌아가서, 14~16 라인을 통해서 마지막으로 iovec을 iov_iter구조체를 통해 Iteration할 수 있도록, 총 iovec객체의 개수와, 총 iovec의 바이트 크기를 iov_iter구조체에 저장하게 된다.


그럼, 다시 import_iovec으로 돌아와서 설명을 계속하자면, import_iovec은 유저영역에서 받아온 iovec을 커널 메모리상에 복사한 뒤, 커널에 할당된 iovec을 iov_iter구조체에 등록하는 역할까지를 수행케 된다.


이제 import_iovec에서 벗어나, do_readv_writev로 돌아가도록 하자.


1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
if (ret < 0)
    return ret;
 
tot_len = iov_iter_count(&iter);
if (!tot_len)
    goto out;
ret = rw_verify_area(type, file, pos, tot_len);
if (ret < 0)
    goto out;
 
if (type == READ) {
    fn = file->f_op->read;
    iter_fn = file->f_op->read_iter;
else {
    fn = (io_fn_t)file->f_op->write;
    iter_fn = file->f_op->write_iter;
    file_start_write(file);
}
cs


import_iovec의 호출 이후 부분이다. import_iovec이 정상 수행이 되지 않은 경우, 1번 라인의 if문에서 바로 호출 종료가 되게 된다.


이어 4번 라인에서 iov_iter_count를 통해 현재 iov_iter의 총 바이트 길이를 구해오게 된다. 그 뒤, rw_verify_area를 통해서 해당 크기만큼의 공간이 파일에서 접근이 가능한지 확인하게 된다.

rw_verify_area에서는 mandatory lock의 여부와, 파일 오프셋의 Valid여부 등을 확인하게 되는데, readv의 구현과 직접 연관이 된 것이 아니므로 이런 역할을 수행한다 정도로 넘어가자.


11번 라인부터는 READ일 경우 현재 파일 구조체의 file_operations구조체에서 read함수와 read_iter함수를 얻어오게 된다. 현재 일반적으로 쓰이고 있는 Ext4 파일시스템에서는 read멤버는 존재하지 않으며, read_iter멤버만이 new_sync_read로 정의되어 있다.


14번 이하 라인에서는 READ가 아닐 경우, 즉 WRITE의 경우의 작업이며 지금은 크게 관심가질 필요 없다. file_start_write함수를 통해 현재 파일시스템에 해당 파일의 inode가 존재하는지 확인하고 존재할 경우 해당 파일의 superblock에 locking을 수행한다 정도만 숙지하자.


1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
    if (iter_fn)
        ret = do_iter_readv_writev(file, &iter, pos, iter_fn, flags);
    else
        ret = do_loop_readv_writev(file, &iter, pos, fn, flags);
 
    if (type != READ)
        file_end_write(file);
 
out:
    kfree(iov);
    if ((ret + (type == READ)) > 0) {
        if (type == READ)
            fsnotify_access(file);
        else
            fsnotify_modify(file);
    }
    return ret;
}
cs


사실상 do_readv_writev의 남은 부분은 크게 중요치는 않은 부분들이다.

간단하게 iter_fn이 정의되어 있으면 iter_fn을 인자로 넘기면서 do_iter_readv_writev함수를 먼저 호출하고, 그렇지 않을 경우 do_loop_readv_writev에 read를 넘기면서 호출하게 된다.


그 이후 만일 READ가 아닐경우에는 file_end_write로 superblock lock을 unlock해주게 된다.


10번 라인부터는 할당된 iovec을 해제해주면서 fsnotify_access(READ일 경우)와 fsnotify_modify(WRITE 등)을 통해서 파일에 대한 접근 및 변경이 일어났음을 fsnotify를 통해 전달하게 된다.


그럼 여기서부터는 함수의 수행이 두 가지로 갈리게 된다. 하지만 최신 파일시스템의 대부분은 iter_read을 지원하며, 비교적 이전 파일시스템의 경우에도 iter_read과 read가 모두 존재하는 경우가 많기 때문에, do_iter_readv_writev에 대해서만 서술하도록 하겠다.

do_loop_readv_writev함수는 writev구현까지 모두 설명한 이후에 기회가 된다면 추가 포스팅을 통해 서술하도록 하겠다.


1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
static ssize_t do_iter_readv_writev(struct file *filp, struct iov_iter *iter,
        loff_t *ppos, iter_fn_t fn, int flags)
{
    struct kiocb kiocb;
    ssize_t ret;
 
    if (flags & ~RWF_HIPRI)
        return -EOPNOTSUPP;
 
    init_sync_kiocb(&kiocb, filp);
    if (flags & RWF_HIPRI)
        kiocb.ki_flags |= IOCB_HIPRI;
    kiocb.ki_pos = *ppos;
 
    ret = fn(&kiocb, iter);
    BUG_ON(ret == -EIOCBQUEUED);
    *ppos = kiocb.ki_pos;
    return ret;
}
cs


위의 코드가 do_iter_readv_writev의 원형이다.


먼저 flag가 0이 아닐경우 -EOPNOTSUPP으로 리턴하게 된다. 그러나 이때, RWF_HIPRI는 제외되는데(~ + AND), High-priority request는 사실상 직접적인 입출력 수행에 큰 영향을 끼치지 않으므로(Polling에 있어 우선순위 지정), 지원하지 않을 경우 무시할 수 있기 때문이다.

RWF_HIPRI플래그가 SET되어 있을 경우에는 11~12 라인을 통해 kiocb의 ki_flags에 IOCB_HIPRI가 추가되지만, 해당 플래그의 사용 여부는 프로그래머가 관심 가질 필요 없다고 할 수 있다. 쉽게 표현하자면 C++에서의 inline키워드와 비슷한 느낌이라 보면 될듯하다.


이어서 10번 라인에서 init_sync_kiocb를 통하여 kiocb구조체를 초기화 하게 된다.

kiocb구조체는 iocb(I/O control block)의 커널 구현체로, kiocb에 대한 자세한 설명은 차후 별도 포스팅으로 진행하고, 입출력 작업 자체에 필요한 정보를 저장하는 구조체라 알아두면 될 듯 하다(각 파일시스템에서 공통적으로 현재 파일의 정보를 접근 및 변경할 때 사용케 된다).


init_sync_kiocb의 구현은 다음과 같다.


1
2
3
4
5
6
7
static inline void init_sync_kiocb(struct kiocb *kiocb, struct file *filp)
{
        *kiocb = (struct kiocb) {
                .ki_filp = filp,
                .ki_flags = iocb_flags(filp),
        };
}
cs


kiocb구조체의 ki_filp멤버에 file구조체의 포인터를 전달해주고, ki_flags에는 해당 file구조체에서 IOCB_DIRECT(Direct IO)등의 파일 플래그를 받아 저장하게 된다.

iocb_flags의 구현에 대해서는 본 포스팅에서는 언급하지 않으니, 만일 관심이 있다면 http://revdev.tistory.com/54를 참조하기를 바란다.


다시 do_iter_readv_writev로 돌아와서, 13번 라인에서 앞으로 읽어야 할 File offset을 인자로 전달받은 ppos에서 kiocb구조체의 ki_pos로 넘겨주게 된다.


마지막으로, 15번 라인에서 fn, 즉 파일시스템의 read_iter함수를 호출하면서 readv를 수행하게 된다. 최신 파일시스템인 Ext4 파일시스템의 경우에는 generic_file_read_iter함수를 호출하게 된다.

(자세한 내용은 동일하게 http://revdev.tistory.com/54를 참조)


이후 16번 라인에서 BUGON으로 fn의 리턴 값을 검사한 뒤, File offset을 읽은 값만큼 변경된 kiocb구조체의 ki_pos값으로 바꾼 뒤에 읽은 바이트 수를 리턴 하는 것으로 readv의 수행은 끝나게 된다.


간략하게 정리해보자면, SYSCALL_DEFINE3(readv...를 통해 Syscall Handling을 한 후, do_readv, vfs_readv를 넘어오면서 do_readv_writev에서 New read feature(iter_read)와 Old feature(read)로 수행이 갈라지게 되며, 최종적으로는 do_iter_readv_writev와 do_loop_readv_writev모두에서 제반작업만을 수행한 뒤, 파일시스템 Native 파일입출력 함수로 넘어가게 된다.


일단, readv에 대해서는 모든 정리를 마친 듯 하다. 다음은 writev인데... 이건 다음 포스팅에 이어 쓰도록 하겠다. 솔직히 말해서 이 정도 정리도 본인에게는 힘든 편이었다.


뭐, 이대로 끝내기에는 아쉬우니까 한 가지 재미있는 점을 보도록 하자.

http://revdev.tistory.com/54를 보면, 본인이 read함수의 Internal implementation에 대해서 정리해 놓았는데. 해당 포스팅을 잘 보다보면 분명 단일 버퍼에 대한 입출력을 수행하는 read함수에서도 iovec을 사용함을 볼 수 있다.


1
2
3
4
5
6
7
8
9
10
11
12
13
14
static ssize_t new_sync_read(struct file *filp, char __user *buf, size_t len, loff_t *ppos)
{
    struct iovec iov = { .iov_base = buf, .iov_len = len };
    struct kiocb kiocb;
    struct iov_iter iter;
    ssize_t ret;
    init_sync_kiocb(&kiocb, filp);
    kiocb.ki_pos = *ppos;
    iov_iter_init(&iter, READ, &iov, 1, len);
    ret = filp->f_op->read_iter(&kiocb, &iter);
    BUG_ON(ret == -EIOCBQUEUED);
    *ppos = kiocb.ki_pos;
    return ret;
}
cs


바로 이 부분에서 말이다.


init_sync_kiocb와 iov_iter_init이 보인다. 물론 iovec iov도 보인다.

이게 어떻게 된 것일까? 간단하게 생각하면 된다.

read의 구현형식도 사실상 readv와 다를바 없다고 말이다. 다만, iovec의 개수가 1개일 뿐이고, 그에 따라 iov_iter의 구성에 필요한 제반작업이 딱히 필요 없어, 해당 부분이 빠진 것 뿐이다.


이를 알 수 있는 가장 큰 증거가 있는데, 위 코드에서 3번 라인과 9번 라인에 집중해보자.


1
2
struct iovec iov = { .iov_base = buf, .iov_len = len };
iov_iter_init(&iter, READ, &iov, 1, len);
cs


iovec구조체를 만들면서 동시에 인자로 넘겨받은 단일 버퍼와 그 크기를 바로 초기화 해주고 있다.

또한 iov_iter_init을 통해서 iov_iter를 만들되, 그 크기를 1로서 정의하고 있다(iov_iter_init의 4번째 인자 참조).

쉽게 말해서 read와 readv의 구현은 내부적으로는 같다고 보는 것이 맞을 듯하다.


어쩌면 당연한 결과이다. read와 readv 둘 모두 같은 파일시스템 입출력 함수를 사용한다는 점에서 미루어보자면 말이다.


뭐 아무튼 이번 포스팅은 이쯤에서 마무리 하도록 하겠다. 이후에 좀 더 좋은 포스팅으로 찾아오도록 하겠다.

포스팅 내용중에 애매하거나 이상한 표현이 있을 경우 지적해주면 바로 정정하겠다.




Posted by RevDev
,