The Bones of a Resource Manager

这篇文章将从服务器端和客户端两侧来描述大体的框架和分层,并会给出实例。

1. Under the covers

1.1 Under the client’s covers

当一个客户端调用需要路径名解析的函数时(比如open()/rename()/stat()/unlink()),它会同时向进程管理器和对应的资源管理器发送消息,进而去获取文件描述符,并通过这个文件描述符向与路径名关联的设备发送消息。

/*
 * In this stage, the client talks 
 * to the process manager and the resource manager.
 */
fd = open("/dev/ser1", O_RDWR);

/*
 * In this stage, the client talks directly to the
 * resource manager.
 */
for (packet = 0; packet < npackets; packet++)
{
    write(fd, packets[packet], PACKET_SIZE);
}
close(fd);

上边的这个代码,背后的逻辑是怎么实现的呢?我们假设这是名为devc-ser8250资源管理器管理的串口设备,注册的路径名为dev/ser1

Under-the-cover communication between the client, the process manager, and the resource manager

客户端发送query消息,open()函数会向进程管理器发送消息,请求查找名字,比如/dev/ser1
进程管理器会返回与路径名关联的nd/pid/child/handle。当devc-ser8250资源管理器使用/dev/ser1注册时,进程管理器会维护一个类似于下边的条目信息:
0, 47167, 1, 0, 0, /dev/ser1
其中条目的各项分别代表:

  • 0, Node descriptor(nd),用于描述在网络中的节点;
  • 47167, Process ID(pid),资源管理器的进程ID号;
  • 1, Channel ID(chid),资源管理器用于接收消息的通道;
  • 0, Handle的索引号,由资源管理器注册的Handle;
  • 0, 在名称注册期间传递的打开类型;
  • /dev/ser1,注册的路径名;
    在进程管理器中会维护一个条目表,用于记录各个资源管理器的信息。在名字匹配的时候会选择最长匹配。
    客户端需要向资源管理器发送一个connect消息,首先它需要创建一个连接的通道:
fd = ConnectAttach(nd, pid, chid, 0, 0);

客户端也是使用这个fd向资源管理器发送连接消息,请求打开/dev/ser1。当资源管理器收到连接消息后,会进行权限检查。

1.资源管理器通常会回应通过或失败。
2.当客户端获取到文件描述符后,可以直接通过它向设备发送消息。示例代码看起来像是客户端直接向设备写入,实际上在调用write()时,会先发送一个_IO_WRITE消息给资源管理器,请求数据写入,客户端调用close()时,会发送_IO_CLOSE_DUP消息给资源管理器,完成最后资源的回收清理工作。

1.2 Under the resource manager’s covers

资源管理器就是一个服务器,通过send/receive/reply消息协议来接收和回复消息,伪代码如下:

initialize the resource manager
register the name with the process manager
DO forever
    receive a message
    SWITCH on the type of message
        CASE _IO_CONNECT:
            call io_open handler
            ENDCASE
        CASE _IO_READ:
            call io_read handler
            ENDCASE
        CASE _IO_WRITE:
            call io_write handler
            ENDCASE
        .   /* etc. handle all other messages */
        .   /* that may occur, performing     */
        .   /* processing as appropriate      */
    ENDSWITCH
ENDDO

事实上,上述代码中的很多细节在实现一个资源管理器中可能用不到,因为有一些库进行了封装,但是自己还是需要去实现对消息的回复。

2. Layers in a resource manager

资源管理器由四层组成,从下到上分别为:

  1. iofunc layer
  2. resmgr layer
  3. dispatch layer
  4. thread pool layer

2.1 iofunc layer

这一层的主要功能是提供POSIX特性,用户编写资源管理器时不需要太关心向外界提供POSIX文件系统所涉及的细节。
这一层由一组函数iofunc_*组成,包含了默认处理程序,如果没有提供自己的处理程序的话,就会使用默认提供的程序。比如,如果没有提供io_open处理程序的话,就会调用iofunc_open_default()。同时也包含了默认处理程序调用的帮助函数,当自己实现处理程序时,也可以调用这些帮助函数。

2.2 resmgr layer

这一层负责管理资源管理器库的大部分细节:

  • 检查传入消息

  • 调用合适的处理程序来处理消息
    如果不使用这一层的话,则需要用户自己解析消息,大部分资源管理器都会用到这一层。

The resmgr layer to handle IO* message

2.3 dispatch layer

这一层充当多种事件类型的阻塞点,使用这一层,可以处理:

  • IO*消息, 使用resmgr层;

  • select(),注册一个处理函数,当数据包到达时调用,这里的函数是select_*()函数;

  • Pulses,注册一个处理函数,当pulses来的时候调用,这里的函数是pulse_*()函数;

  • 其他消息,可以提供自己创建的一系列消息类型和处理函数,当消息到达时调用对应的处理函数,这里的函数是

message_*()

函数;

use the dispatch layer to handle IO* messages, select, pulses, and other messages

消息进行分发时,会进行查找和匹配,然后会调用到匹配的Handler函数,如果没有找到匹配的,则返回

MsgError

2.4 thread pool layer

这一层允许用户创建单线程或多线程资源管理器,这意味着可以一个线程读,一个线程写。需要为线程提供阻塞函数,以及阻塞函数返回时需要调用的处理函数。大多数情况下,可以将dispatch layer的任务分配到线程上来,当然,也可以是resmgr layer任务或自己实现的功能。

3. Simple examples of device resource managers

3.1 Single-threaded device resource managers

先来看一份完整的单线程设备资源管理器代码吧:

#include <errno.h>
#include <stdio.h>
#include <stddef.h>
#include <stdlib.h>
#include <unistd.h>
#include <sys/iofunc.h>
#include <sys/dispatch.h>

static resmgr_connect_funcs_t    connect_funcs;
static resmgr_io_funcs_t         io_funcs;
static iofunc_attr_t             attr;

main(int argc, char **argv)
{
    /* declare variables we'll be using */
    resmgr_attr_t        resmgr_attr;
    dispatch_t           *dpp;
    dispatch_context_t   *ctp;
    int                  id;

    /* initialize dispatch interface */
    if((dpp = dispatch_create()) == NULL) {
        fprintf(stderr,
                "%s: Unable to allocate dispatch handle.\n",
                argv[0]);
        return EXIT_FAILURE;
    }

    /* initialize resource manager attributes */
    memset(&resmgr_attr, 0, sizeof resmgr_attr);
    resmgr_attr.nparts_max = 1;
    resmgr_attr.msg_max_size = 2048;

    /* initialize functions for handling messages */
    iofunc_func_init(_RESMGR_CONNECT_NFUNCS, &connect_funcs, 
                     _RESMGR_IO_NFUNCS, &io_funcs);

    /* initialize attribute structure used by the device */
    iofunc_attr_init(&attr, S_IFNAM | 0666, 0, 0);

    /* attach our device name */
    id = resmgr_attach(
            dpp,            /* dispatch handle        */
            &resmgr_attr,   /* resource manager attrs */
            "/dev/sample",  /* device name            */
            _FTYPE_ANY,     /* open type              */
            0,              /* flags                  */
            &connect_funcs, /* connect routines       */
            &io_funcs,      /* I/O routines           */
            &attr);         /* handle                 */
    if(id == -1) {
        fprintf(stderr, "%s: Unable to attach name.\n", argv[0]);
        return EXIT_FAILURE;
    }

    /* allocate a context structure */
    ctp = dispatch_context_alloc(dpp);

    /* start the resource manager message loop */
    while(1) {
        if((ctp = dispatch_block(ctp)) == NULL) {
            fprintf(stderr, "block error\n");
            return EXIT_FAILURE;
        }
        dispatch_handler(ctp);
    }
}

上述代码完成以下功能:

  1. 初始化dispatch接口
    通过调用dispatch_create()接口创建dispatch_t结构,这个结构中包含了channel ID,但channel ID实际的创建是在附加其他内容的时候,比如调用resmgr_attach()/message_attach()/pulse_attach()等。
  2. 初始化资源管理器属性
    当调用resmgr_attach()时会传入resmgr_attr_t结构,在这个示例中主要配置:
  • nparts_max,可供服务器回复的IOV的个数;
  • msg_max_size,最大接收缓冲大小;
  1. 初始化消息处理函数
    在这个例子中提供了两张表,用于指定在特定消息到达时调用哪个函数:
  • 连接函数表;
  • I/O函数表;
    可以调用iofunc_func_init()接口来配置默认操作函数。
  1. 初始化设备使用的属性结构
    调用iofunc_attr_init()接口来设置,属性结构中至少应该包括以下信息:
  • 权限和设备类型
  • 组ID和属主ID
  1. 在名字空间中注册一个名字
    调用resmgr_attach()来注册资源管理器的路径。在资源管理器能接收到其他程序的消息之前,需要通过进程管理器通知其他程序它与路径名的绑定关系。

  2. 分配context结构体
    调用dispatch_context_alloc()接口来分配,context结构体包含了用于接收消息的缓冲,也包含了可用于回复消息的IOVs缓冲。

  3. 开始资源管理器消息循环
    当进入循环后,资源管理器便在dispatch_block()中接收消息,并调用dispatch_handler()进行分发处理,选择连接函数表和I/O函数表中的合适函数进行执行。当执行完毕后,会再进入dispatch_block()来等待接收其他消息。

3.2 Multi-threaded device resource managers

下边是多线程设备资源管理器示例代码:

#include <errno.h>
#include <stdio.h>
#include <stddef.h>
#include <stdlib.h>
#include <unistd.h>

/*
 * Define THREAD_POOL_PARAM_T such that we can avoid a compiler
 * warning when we use the dispatch_*() functions below
 */
#define THREAD_POOL_PARAM_T dispatch_context_t

#include <sys/iofunc.h>
#include <sys/dispatch.h>

static resmgr_connect_funcs_t    connect_funcs;
static resmgr_io_funcs_t         io_funcs;
static iofunc_attr_t             attr;

main(int argc, char **argv)
{
    /* declare variables we'll be using */
    thread_pool_attr_t   pool_attr;
    resmgr_attr_t        resmgr_attr;
    dispatch_t           *dpp;
    thread_pool_t        *tpp;
    dispatch_context_t   *ctp;
    int                  id;

    /* initialize dispatch interface */
    if((dpp = dispatch_create()) == NULL) {
        fprintf(stderr, "%s: Unable to allocate dispatch handle.\n",
                argv[0]);
        return EXIT_FAILURE;
    }

    /* initialize resource manager attributes */
    memset(&resmgr_attr, 0, sizeof resmgr_attr);
    resmgr_attr.nparts_max = 1;
    resmgr_attr.msg_max_size = 2048;

    /* initialize functions for handling messages */
    iofunc_func_init(_RESMGR_CONNECT_NFUNCS, &connect_funcs, 
                     _RESMGR_IO_NFUNCS, &io_funcs);

    /* initialize attribute structure used by the device */
    iofunc_attr_init(&attr, S_IFNAM | 0666, 0, 0);

    /* attach our device name */
    id = resmgr_attach(dpp,            /* dispatch handle        */
                       &resmgr_attr,   /* resource manager attrs */
                       "/dev/sample",  /* device name            */
                       _FTYPE_ANY,     /* open type              */
                       0,              /* flags                  */
                       &connect_funcs, /* connect routines       */
                       &io_funcs,      /* I/O routines           */
                       &attr);         /* handle                 */
    if(id == -1) {
        fprintf(stderr, "%s: Unable to attach name.\n", argv[0]);
        return EXIT_FAILURE;
    }

    /* initialize thread pool attributes */
    memset(&pool_attr, 0, sizeof pool_attr);
    pool_attr.handle = dpp;
    pool_attr.context_alloc = dispatch_context_alloc;
    pool_attr.block_func = dispatch_block; 
    pool_attr.unblock_func = dispatch_unblock;
    pool_attr.handler_func = dispatch_handler;
    pool_attr.context_free = dispatch_context_free;
    pool_attr.lo_water = 2;
    pool_attr.hi_water = 4;
    pool_attr.increment = 1;
    pool_attr.maximum = 50;

    /* allocate a thread pool handle */
    if((tpp = thread_pool_create(&pool_attr, 
                                 POOL_FLAG_EXIT_SELF)) == NULL) {
        fprintf(stderr, "%s: Unable to initialize thread pool.\n",
                argv[0]);
        return EXIT_FAILURE;
    }

    /* start the threads; will not return */
    thread_pool_start(tpp);
}

从代码中可以看出,大部分代码与单线程示例一样。在这个代码中,线程在阻塞循环中用到了dispatch_*()函数(dispatch layer)。

  1. 初始化线程池的属性
    pool_attr的各个字段赋值,用于告知线程在阻塞循环时调用哪些函数,以及线程池需要维护多少线程。
  2. 分配一个线程池的handle
    调用thread_pool_create()接口来分配,这个handle用于控制整个线程池。
  3. 开启线程
    调用thread_pool_start()接口,启动线程池。每个新创建的线程都会使用在属性结构中给出的context_alloc()函数来分配THREAD_POOL_PARAM_T定义类型的context结构。

3.3 Using MsgSend() and MsgReply()

可以不使用read()write()接口来与资源管理器交互,可以调用MsgSend()来发送消息,下边的示例将分两部分:服务器和客户端。服务器端需要使能PROCMGR_AID_PATHSPACE,用于确保能调用resmgr_attach()函数。

服务器代码:

/*
 * ResMgr and Message Server Process
 */

#include <stdio.h>
#include <unistd.h>
#include <stdlib.h>
#include <errno.h>
#include <string.h>
#include <sys/neutrino.h>
#include <sys/iofunc.h>
#include <sys/dispatch.h>

resmgr_connect_funcs_t  ConnectFuncs;
resmgr_io_funcs_t       IoFuncs;
iofunc_attr_t           IoFuncAttr;


typedef struct
{
    uint16_t msg_no;
    char     msg_data[255];
} server_msg_t;


int message_callback( message_context_t *ctp, int type, unsigned flags, 
                      void *handle )
{
    server_msg_t *msg;
    int num;
    char msg_reply[255];

    /* Cast a pointer to the message data */
    msg = (server_msg_t *)ctp->msg;

    /* Print some useful information about the message */
    printf( "\n\nServer Got Message:\n" );
    printf( "  type: %d\n" , type );
    printf( "  data: %s\n\n", msg->msg_data );

    /* Build the reply message */
    num = type - _IO_MAX;
    snprintf( msg_reply, 254, "Server Got Message Code: _IO_MAX + %d", num );

    /* Send a reply to the waiting (blocked) client */ 
    MsgReply( ctp->rcvid, EOK, msg_reply, strlen( msg_reply ) + 1 );

    return 0;
}



int main( int argc, char **argv )
{
    resmgr_attr_t        resmgr_attr;
    message_attr_t       message_attr;
    dispatch_t           *dpp;
    dispatch_context_t   *ctp, *ctp_ret;
    int                  resmgr_id, message_id;

    /* Create the dispatch interface */
    dpp = dispatch_create();
    if( dpp == NULL )
    {
        fprintf( stderr, "dispatch_create() failed: %s\n", 
                 strerror( errno ) );
        return EXIT_FAILURE;
    }

    memset( &resmgr_attr, 0, sizeof( resmgr_attr ) );
    resmgr_attr.nparts_max = 1;
    resmgr_attr.msg_max_size = 2048;

    /* Setup the default I/O functions to handle open/read/write/... */
    iofunc_func_init( _RESMGR_CONNECT_NFUNCS, &ConnectFuncs,
                      _RESMGR_IO_NFUNCS, &IoFuncs );

    /* Setup the attribute for the entry in the filesystem */
    iofunc_attr_init( &IoFuncAttr, S_IFNAM | 0666, 0, 0 );

    resmgr_id = resmgr_attach( dpp, &resmgr_attr, "serv", _FTYPE_ANY, 
                               0, &ConnectFuncs, &IoFuncs, &IoFuncAttr );
    if( resmgr_id == -1 )
    {
        fprintf( stderr, "resmgr_attach() failed: %s\n", strerror( errno ) );
        return EXIT_FAILURE;
    }

    /* Setup our message callback */
    memset( &message_attr, 0, sizeof( message_attr ) );
    message_attr.nparts_max = 1;
    message_attr.msg_max_size = 4096;

    /* Attach a callback (handler) for two message types */
    message_id = message_attach( dpp, &message_attr, _IO_MAX + 1,
                                 _IO_MAX + 2, message_callback, NULL );
    if( message_id == -1 )
    {
        fprintf( stderr, "message_attach() failed: %s\n", strerror( errno ) );
        return EXIT_FAILURE;
    }

    /* Setup a context for the dispatch layer to use */
    ctp = dispatch_context_alloc( dpp );
    if( ctp == NULL )
    {
        fprintf( stderr, "dispatch_context_alloc() failed: %s\n", 
                 strerror( errno ) );
        return EXIT_FAILURE;
    }


    /* The "Data Pump" - get and process messages */
    while( 1 )
    {
        ctp_ret = dispatch_block( ctp );
        if( ctp_ret )
        {
            dispatch_handler( ctp );
        }
        else
        {
            fprintf( stderr, "dispatch_block() failed: %s\n", 
                     strerror( errno ) );
            return EXIT_FAILURE;
        }
    }

    return EXIT_SUCCESS;
}
  • 首先是调用dispatch_create()接口来创建dpp,通过这个handle将接收到的消息转发到合适的层(resmgr, message, pulse);
  • 设置调用resmgr_attach()所需要的变量;
  • 调用iofunc_func_init()来设置默认处理函数,调用iofunc_attr_init()来设置属性结构;
  • 调用resmgr_attach(),注意这时候注册的路径不是绝对路径,因此默认在它的执行路径下。.
  • 告诉dispatch layer,除了由resmgr layer处理的标准I/O和连接消息之外,还需要处理自己的消息。
  • 调用message_attach()接口来注册自己的消息处理函数。
  • 当调用dispatch_block()接收消息后,调用dispatch_handler()来处理,实际上会在dispatch_handler()中调用message_callback()函数。当消息类型为_IO_MAX + 1_IO_MAX + 2时,就会回调函数。
  • 客户端代码:
/* 
 * Message Client Process 
 */ 

#include <stdio.h> 
#include <unistd.h> 
#include <stdlib.h> 
#include <fcntl.h> 
#include <errno.h> 
#include <string.h> 
#include <sys/neutrino.h> 
#include <sys/iofunc.h> 
#include <sys/dispatch.h> 

typedef struct 
{ 
    uint16_t msg_no; 
    char msg_data[255]; 
} client_msg_t; 

int main( int argc, char **argv ) 
{ 
    int fd; 
    int c; 
    client_msg_t msg; 
    int ret; 
    int num; 
    char msg_reply[255]; 

    num = 1; 

    /* Process any command line arguments */ 
    while( ( c = getopt( argc, argv, "n:" ) ) != -1 ) 
    { 
        if( c == 'n' ) 
        { 
            num = strtol( optarg, 0, 0 ); 
        } 
    } 
    /* Open a connection to the server (fd == coid) */ 
    fd = open( "serv", O_RDWR ); 
    if( fd == -1 ) 
    { 
        fprintf( stderr, "Unable to open server connection: %s\n", 
            strerror( errno ) ); 
        return EXIT_FAILURE; 
    } 

    /* Clear the memory for the msg and the reply */ 
    memset( &msg, 0, sizeof( msg ) ); 
    memset( &msg_reply, 0, sizeof( msg_reply ) ); 

    /* Set up the message data to send to the server */ 
    msg.msg_no = _IO_MAX + num; 
    snprintf( msg.msg_data, 254, "client %d requesting reply.", getpid() ); 

    printf( "client: msg_no: _IO_MAX + %d\n", num ); 
    fflush( stdout ); 

    /* Send the data to the server and get a reply */ 
    ret = MsgSend( fd, &msg, sizeof( msg ), msg_reply, 255 ); 
    if( ret == -1 ) 
    { 
        fprintf( stderr, "Unable to MsgSend() to server: %s\n", strerror( errno ) ); 
        return EXIT_FAILURE; 
    } 

    /* Print out the reply data */ 
    printf( "client: server replied: %s\n", msg_reply ); 

    close( fd ); 

    return EXIT_SUCCESS; 
}

客户端通过open()接口获取connection id,调用MsgSend()接口去往服务器上发送消息,并且等待回复。

上述服务器和客户端的例子非常简单,但覆盖了大部分的内容,基于这个基础的框架,还可以做其他的事情:

  • 基于不同的消息类型,注册不同的消息回调函数;
  • 除了消息之外,使用pulse_attach()来接收pulse
  • 重写默认的I/O消息处理函数以便客户端也能使用read()write()来与服务器交互;
  • 使用线程池来编写多线程服务器;