【OVS 2.5 Source Code Reading】Flow table operations in userspace


When a packet arrives at the NIC, it first goes through the kernel module openvswitch.ko. A copy of the flow table lives in the kernel: looking it up by key yields the actions, and after executing them the packet is forwarded directly. Only when the kernel cannot find a matching flow entry is an upcall made to the flow table in the userspace ovs-vswitchd.
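As a rough mental model, the fast path / slow path split looks like the pseudocode below. The names here (datapath_receive, kernel_flow_lookup and so on) are made up for illustration and are not actual openvswitch.ko symbols:

/* Conceptual pseudocode only; every identifier here is hypothetical. */
void datapath_receive(struct packet *pkt)
{
    struct flow_key key;
    struct flow_entry *entry;

    extract_key(pkt, &key);                   /* build the lookup key from the headers */
    entry = kernel_flow_lookup(&key);         /* exact-match lookup in the kernel flow table */
    if (entry) {
        execute_actions(pkt, entry->actions); /* fast path: apply actions and forward */
    } else {
        upcall_to_userspace(pkt, &key);       /* slow path: netlink upcall to ovs-vswitchd */
    }
}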

The upcall threads handle packets sent up by the datapath via the netlink mechanism. Whether the datapath is the kernel one or the DPDK-based userspace one, a packet that misses the flow table enters upcall processing (the slow path); the entry point is the udpif_upcall_handler function in ofproto-dpif-upcall.c.

1. The udpif_upcall_handler function

/* The upcall handler thread tries to read a batch of UPCALL_MAX_BATCH 
 * upcalls from dpif, processes the batch and installs corresponding flows 
 * in dpif. */  
static void *  
udpif_upcall_handler(void *arg)  
{  
    struct handler *handler = arg;  
    struct udpif *udpif = handler->udpif;  

    while (!latch_is_set(&handler->udpif->exit_latch)) {    
        if (recv_upcalls(handler)) {  
            poll_immediate_wake();     // don't block: more upcalls may be pending  
        } else {  
            dpif_recv_wait(udpif->dpif, handler->handler_id);     // block on the netlink receive  
            latch_wait(&udpif->exit_latch);   
        }  
        poll_block();       // poll and block  
    }  

    return NULL;  
}  

The upcall handler udpif_upcall_handler is started in udpif_start_threads, which also creates the udpif_revalidator threads.

udpif_upcall_handler waits via an fd poll; whenever an upcall is delivered it drops into recv_upcalls.
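For reference, the thread creation in udpif_start_threads boils down to the loops below; this is an abridged paraphrase of the OVS 2.5 code, with locking and the seq/latch setup left out:

/* Abridged from udpif_start_threads() (ofproto-dpif-upcall.c): one thread per
 * handler plus the revalidator threads, all created with ovs_thread_create(). */
udpif->handlers = xzalloc(udpif->n_handlers * sizeof *udpif->handlers);
for (i = 0; i < udpif->n_handlers; i++) {
    struct handler *handler = &udpif->handlers[i];

    handler->udpif = udpif;
    handler->handler_id = i;
    handler->thread = ovs_thread_create(
        "handler", udpif_upcall_handler, handler);
}

udpif->revalidators = xzalloc(udpif->n_revalidators * sizeof *udpif->revalidators);
for (i = 0; i < udpif->n_revalidators; i++) {
    struct revalidator *revalidator = &udpif->revalidators[i];

    revalidator->udpif = udpif;
    revalidator->thread = ovs_thread_create(
        "revalidator", udpif_revalidator, revalidator);
}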

2. The recv_upcalls function

static size_t  
recv_upcalls(struct handler *handler)  
{  
    struct udpif *udpif = handler->udpif;  
    uint64_t recv_stubs[UPCALL_MAX_BATCH][512 / 8];  
    struct ofpbuf recv_bufs[UPCALL_MAX_BATCH];  
    struct dpif_upcall dupcalls[UPCALL_MAX_BATCH];  
    struct upcall upcalls[UPCALL_MAX_BATCH];  
    struct flow flows[UPCALL_MAX_BATCH];  
    size_t n_upcalls, i;  

    n_upcalls = 0;  
    while (n_upcalls < UPCALL_MAX_BATCH) {  
        struct ofpbuf *recv_buf = &recv_bufs[n_upcalls];  
        struct dpif_upcall *dupcall = &dupcalls[n_upcalls];  
        struct upcall *upcall = &upcalls[n_upcalls];  
        struct flow *flow = &flows[n_upcalls];  
        unsigned int mru;  
        int error;  

        ofpbuf_use_stub(recv_buf, recv_stubs[n_upcalls],  
                        sizeof recv_stubs[n_upcalls]);  
        if (dpif_recv(udpif->dpif, handler->handler_id, dupcall, recv_buf)) {    // receive one upcall  
            ofpbuf_uninit(recv_buf);  
            break;  
        }  

        if (odp_flow_key_to_flow(dupcall->key, dupcall->key_len, flow)  
            == ODP_FIT_ERROR) {  
            goto free_dupcall;  
        }  

        if (dupcall->mru) {  
            mru = nl_attr_get_u16(dupcall->mru);  
        } else {  
            mru = 0;  
        }  

        error = upcall_receive(upcall, udpif->backer, &dupcall->packet,  
                               dupcall->type, dupcall->userdata, flow, mru,  
                               &dupcall->ufid, PMD_ID_NULL);  
        if (error) {  
            if (error == ENODEV) {  
                /* Received packet on datapath port for which we couldn't 
                 * associate an ofproto.  This can happen if a port is removed 
                 * while traffic is being received.  Print a rate-limited 
                 * message in case it happens frequently. */  
                dpif_flow_put(udpif->dpif, DPIF_FP_CREATE, dupcall->key,  
                              dupcall->key_len, NULL, 0, NULL, 0,  
                              &dupcall->ufid, PMD_ID_NULL, NULL);  
                VLOG_INFO_RL(&rl, "received packet on unassociated datapath "  
                             "port %"PRIu32, flow->in_port.odp_port);  
            }  
            goto free_dupcall;  
        }  

        upcall->key = dupcall->key;  
        upcall->key_len = dupcall->key_len;  
        upcall->ufid = &dupcall->ufid;  

        upcall->out_tun_key = dupcall->out_tun_key;  
        upcall->actions = dupcall->actions;  

        if (vsp_adjust_flow(upcall->ofproto, flow, &dupcall->packet)) {  
            upcall->vsp_adjusted = true;  
        }  

        pkt_metadata_from_flow(&dupcall->packet.md, flow);  
        flow_extract(&dupcall->packet, flow);  

        error = process_upcall(udpif, upcall,  
                               &upcall->odp_actions, &upcall->wc);  
        if (error) {  
            goto cleanup;  
        }  

        n_upcalls++;  
        continue;  

cleanup:  
        upcall_uninit(upcall);  
free_dupcall:  
        dp_packet_uninit(&dupcall->packet);  
        ofpbuf_uninit(recv_buf);  
    }  

    if (n_upcalls) {  
        handle_upcalls(handler->udpif, upcalls, n_upcalls);  
        for (i = 0; i < n_upcalls; i++) {  
            dp_packet_uninit(&dupcalls[i].packet);  
            ofpbuf_uninit(&recv_bufs[i]);  
            upcall_uninit(&upcalls[i]);  
        }  
    }  

    return n_upcalls;  
}  

recv_upcalls handles up to UPCALL_MAX_BATCH requests per pass; let's walk through the handling of a single request:

(1) The first call is dpif_recv, which actually invokes the function registered in dpif_class->recv. The received data is placed in a struct dpif_upcall and a struct ofpbuf. A struct dpif_upcall represents one packet's upcall and carries, besides the packet itself, the netlink attribute data that came up with it.

int  
dpif_recv(struct dpif *dpif, uint32_t handler_id, struct dpif_upcall *upcall,  
          struct ofpbuf *buf)  
{  
    int error = EAGAIN;  

    if (dpif->dpif_class->recv) {  
        error = dpif->dpif_class->recv(dpif, handler_id, upcall, buf);  // in practice this is dpif_netlink_recv  
        if (!error) {  
            dpif_print_packet(dpif, upcall);  
        } else if (error != EAGAIN) {  
            log_operation(dpif, "recv", error);  
        }  
    }  
    return error;  
}  

This ultimately reaches the nl_sock_recv__ function, which fetches the netlink message with recvmsg; the sock, event and channel objects are all involved here, while on the kernel side the genl framework makes handling these netlink messages much simpler.

So the upcall thread's receive flow is: epoll_wait on all the socks; when it returns, packets are waiting, and the returned events carry the port_id that tells us which sock fired, so the packet can be received from that sock and processed. There are multiple upcall threads, and each thread holds the full set of sock objects, which means the threads compete with one another to handle upcall messages.
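The snippet below is not OVS code; it is a minimal, self-contained illustration of that receive pattern: netlink sockets registered in one epoll instance, with a port id carried in the epoll event data. This is roughly what nl_sock_recv__ and the dpif-netlink receive path wrap for the real vport sockets.

#include <stdio.h>
#include <unistd.h>
#include <sys/epoll.h>
#include <sys/socket.h>
#include <sys/uio.h>
#include <linux/netlink.h>

int main(void)
{
    int epfd = epoll_create1(0);
    int nlsk = socket(AF_NETLINK, SOCK_RAW, NETLINK_GENERIC);
    struct epoll_event ev = { .events = EPOLLIN, .data.u32 = 42 /* "port id" */ };
    char buf[4096];

    if (epfd < 0 || nlsk < 0) {
        perror("setup");
        return 1;
    }
    epoll_ctl(epfd, EPOLL_CTL_ADD, nlsk, &ev);   /* OVS adds one entry per vport socket */

    for (;;) {
        struct epoll_event events[8];
        int i, n = epoll_wait(epfd, events, 8, -1);   /* block until some sock is readable */

        for (i = 0; i < n; i++) {
            /* data.u32 identifies which socket (i.e. which port) fired. */
            printf("upcall ready on port %u\n", events[i].data.u32);

            /* Read one netlink message; OVS does this inside nl_sock_recv__(). */
            struct iovec iov = { .iov_base = buf, .iov_len = sizeof buf };
            struct msghdr msg = { .msg_iov = &iov, .msg_iovlen = 1 };
            ssize_t len = recvmsg(nlsk, &msg, MSG_DONTWAIT);

            if (len > 0) {
                /* hand the nlmsg payload to the upcall parsing code */
            }
        }
    }
    return 0;
}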

(2) The second step is upcall_receive, which fills in a struct upcall:

static int
upcall_receive(struct upcall *upcall, const struct dpif_backer *backer,
               const struct dp_packet *packet, enum dpif_upcall_type type,
               const struct nlattr *userdata, const struct flow *flow,
               const unsigned int mru,
               const ovs_u128 *ufid, const unsigned pmd_id)
{
    int error;

    error = xlate_lookup(backer, flow, &upcall->ofproto, &upcall->ipfix,
                         &upcall->sflow, NULL, &upcall->in_port);
    if (error) {
        return error;
    }
    upcall->recirc = NULL;
    upcall->have_recirc_ref = false;
    upcall->flow = flow;
    upcall->packet = packet;
    upcall->ufid = ufid;
    upcall->pmd_id = pmd_id;
    upcall->type = type;
    upcall->userdata = userdata;
    ofpbuf_use_stub(&upcall->odp_actions, upcall->odp_actions_stub,
                    sizeof upcall->odp_actions_stub);
    ofpbuf_init(&upcall->put_actions, 0);

    upcall->xout_initialized = false;
    upcall->ukey_persists = false;

    upcall->ukey = NULL;
    upcall->key = NULL;
    upcall->key_len = 0;
    upcall->mru = mru;

    upcall->out_tun_key = NULL;
    upcall->actions = NULL;

    return 0;
}

/* Given a datapath and flow metadata ('backer', and 'flow' respectively),
 * optionally populates 'ofproto' with the ofproto_dpif, 'ofp_in_port' with the
 * openflow in_port, and 'ipfix', 'sflow', and 'netflow' with the appropriate
 * handles for those protocols if they're enabled.  Caller may use the returned
 * pointers until quiescing, for longer term use additional references must
 * be taken.
 *
 * Returns 0 if successful, ENODEV if the parsed flow has no associated ofproto.
 */
int
xlate_lookup(const struct dpif_backer *backer, const struct flow *flow,
             struct ofproto_dpif **ofprotop, struct dpif_ipfix **ipfix,
             struct dpif_sflow **sflow, struct netflow **netflow,
             ofp_port_t *ofp_in_port)

The kernel wraps the key, the packet, the action attributes and so on into an upcall message and hands it up to userspace. Since userspace could simply match the flow table once it receives the upcall, why classify the upcalls first? (This mainly shows up in recv_upcalls() in ofproto-dpif-upcall.c, shown above.)

In the userspace handling, an upcall keeps both the raw dupcall data and a miss member, and this is closely tied to OVS's performance optimizations: OVS groups upcalls that share the same key into one class, all mapped onto the same miss. Similar packets are thus classified up front so they can later be matched and processed together, which improves efficiency.

During this process the flow has to be extracted from the key for hashing and classification. Flow is the userspace structure, described earlier, that represents the match fields; OVS calls flow_extract() to parse the packet together with its md (metadata) and build the flow that is assigned to miss->flow. (If you add new match fields, don't forget to add the corresponding parsing code here.)
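To make the grouping idea concrete, here is a toy sketch (not OVS code; every structure and function name here is made up) of bucketing packets whose extracted flow hashes to the same value, so that one group can later be handled with a single flow table lookup:

#include <stdint.h>

#define N_BUCKETS 64

struct toy_flow {                       /* stand-in for the real struct flow key fields */
    uint32_t in_port, nw_src, nw_dst;
    uint16_t tp_src, tp_dst;
};

struct toy_miss {
    struct toy_flow flow;               /* representative key for this group */
    int n_packets;                      /* how many upcalls mapped here */
};

static struct toy_miss buckets[N_BUCKETS];

static uint32_t
hash_flow(const struct toy_flow *f)
{
    /* trivial hash, for illustration only */
    return (f->in_port ^ f->nw_src ^ f->nw_dst ^ f->tp_src ^ f->tp_dst) % N_BUCKETS;
}

static void
classify_packet(const struct toy_flow *f)
{
    struct toy_miss *miss = &buckets[hash_flow(f)];

    if (miss->n_packets == 0) {
        miss->flow = *f;                /* first packet of the group: remember its key */
    }
    /* (real code would also compare the full key to handle hash collisions) */
    miss->n_packets++;                  /* later packets with the same key just join */
}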

(3) Header extraction: void flow_extract(struct dp_packet *packet, struct flow *flow) is called.

struct dp_packet is the wrapper around the actual packet; with the DPDK datapath, this metadata is stored in the linear memory right after the mbuf.

The extracted flow looks like this:

struct flow {
    /* Metadata */
    struct flow_tnl tunnel; /* Encapsulating tunnel parameters. */
    ovs_be64 metadata; /* OpenFlow Metadata. */
    uint32_t regs[FLOW_N_REGS]; /* Registers. */
    uint32_t skb_priority; /* Packet priority for QoS. */
    uint32_t pkt_mark; /* Packet mark. */
    uint32_t dp_hash; /* Datapath computed hash value. The exact
                                 * computation is opaque to the user space. */
    union flow_in_port in_port; /* Input port.*/
    uint32_t recirc_id; /* Must be exact match. */
    uint16_t ct_state; /* Connection tracking state. */
    uint16_t ct_zone; /* Connection tracking zone. */
    uint32_t ct_mark; /* Connection mark.*/
    uint8_t pad1[4]; /* Pad to 64 bits. */
    ovs_u128 ct_label; /* Connection label. */
    uint32_t conj_id; /* Conjunction ID. */
    ofp_port_t actset_output; /* Output port in action set. */
    uint8_t pad2[2]; /* Pad to 64 bits. */

    /* L2, Order the same as in the Ethernet header! (64-bit aligned) */
    struct eth_addr dl_dst; /* Ethernet destination address. */
    struct eth_addr dl_src; /* Ethernet source address. */
    ovs_be16 dl_type; /* Ethernet frame type. */
    ovs_be16 vlan_tci; /* If 802.1Q, TCI | VLAN_CFI; otherwise 0. */
    ovs_be32 mpls_lse[ROUND_UP(FLOW_MAX_MPLS_LABELS, 2)]; /* MPLS label stack
                                                             (with padding). */
    /* L3 (64-bit aligned) */
    ovs_be32 nw_src; /* IPv4 source address. */
    ovs_be32 nw_dst; /* IPv4 destination address. */
    struct in6_addr ipv6_src; /* IPv6 source address. */
    struct in6_addr ipv6_dst; /* IPv6 destination address. */
    ovs_be32 ipv6_label; /* IPv6 flow label. */
    uint8_t nw_frag; /* FLOW_FRAG_* flags. */
    uint8_t nw_tos; /* IP ToS (including DSCP and ECN). */
    uint8_t nw_ttl; /* IP TTL/Hop Limit. */
    uint8_t nw_proto; /* IP protocol or low 8 bits of ARP opcode. */
    struct in6_addr nd_target; /* IPv6 neighbor discovery (ND) target. */
    struct eth_addr arp_sha; /* ARP/ND source hardware address. */
    struct eth_addr arp_tha; /* ARP/ND target hardware address. */
    ovs_be16 tcp_flags; /* TCP flags. With L3 to avoid matching L4. */
    ovs_be16 pad3; /* Pad to 64 bits. */

    /* L4 (64-bit aligned) */
    ovs_be16 tp_src; /* TCP/UDP/SCTP source port/ICMP type. */
    ovs_be16 tp_dst; /* TCP/UDP/SCTP destination port/ICMP code. */
    ovs_be32 igmp_group_ip4; /* IGMP group IPv4 address.
                                 * Keep last for BUILD_ASSERT_DECL below. */
};
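As a small usage sketch: once flow_extract() has filled in a struct flow, the match fields above can be read directly. The helper below is not part of the OVS source; ETH_TYPE_IP comes from lib/packets.h and IPPROTO_TCP from the system headers:

/* Hypothetical helper, for illustration: is this IPv4 TCP traffic to port 80? */
static bool
flow_is_http_request(const struct flow *flow)
{
    return flow->dl_type == htons(ETH_TYPE_IP)
           && flow->nw_proto == IPPROTO_TCP
           && flow->tp_dst == htons(80);
}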

(4) Next, static int process_upcall(struct udpif *udpif, struct upcall *upcall, struct ofpbuf *odp_actions, struct flow_wildcards *wc) is called to process the upcall.

There are four kinds of upcall:

  • MISS_UPCALL (flow table miss)
  • SFLOW_UPCALL (sFlow sampling)
  • IPFIX_UPCALL (IPFIX sampling)
  • FLOW_SAMPLE_UPCALL (per-flow sampling)

The datapath itself sends only two types of netlink messages, OVS_PACKET_CMD_MISS and OVS_PACKET_CMD_ACTION; mapping these onto the upcall types above is done by the classify_upcall function.
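A paraphrased sketch of that mapping is shown below; the real classify_upcall in ofproto-dpif-upcall.c additionally validates the cookie length and logs bad upcalls:

static enum upcall_type
classify_upcall_sketch(enum dpif_upcall_type type, const struct nlattr *userdata)
{
    union user_action_cookie cookie;

    if (type == DPIF_UC_MISS) {          /* OVS_PACKET_CMD_MISS */
        return MISS_UPCALL;              /* kernel flow table miss */
    }

    /* DPIF_UC_ACTION (OVS_PACKET_CMD_ACTION): the cookie attached by the
     * userspace action tells us why the packet was sent up. */
    memcpy(&cookie, nl_attr_get(userdata), nl_attr_get_size(userdata));
    switch (cookie.type) {
    case USER_ACTION_COOKIE_SFLOW:       return SFLOW_UPCALL;
    case USER_ACTION_COOKIE_IPFIX:       return IPFIX_UPCALL;
    case USER_ACTION_COOKIE_FLOW_SAMPLE: return FLOW_SAMPLE_UPCALL;
    case USER_ACTION_COOKIE_SLOW_PATH:   return MISS_UPCALL;
    default:                             return BAD_UPCALL;
    }
}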

For a MISS_UPCALL, static void upcall_xlate(struct udpif *udpif, struct upcall *upcall, struct ofpbuf *odp_actions, struct flow_wildcards *wc) is called:

switch (classify_upcall(upcall->type, userdata)) {
case MISS_UPCALL:
    upcall_xlate(udpif, upcall, odp_actions, wc);
    return 0;

upcall_xlate first initializes an xlate_in and then calls xlate_actions to generate the struct xlate_out that the datapath needs. xlate_actions is fairly involved; its most important call is rule_dpif_lookup_from_table, which finds the matching flow rule and from it generates the actions.
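Abridged, the core of upcall_xlate is the sequence below (stats accounting, ukey creation and the non-miss branches are omitted; the exact argument lists live in ofproto-dpif-xlate.h):

struct xlate_in xin;

/* Bind the flow, packet, wildcards and the odp_actions output buffer together... */
xlate_in_init(&xin, upcall->ofproto, upcall->flow, upcall->in_port, NULL,
              stats.tcp_flags, upcall->packet, wc, odp_actions);

/* ...then run the OpenFlow pipeline: the result lands in upcall->xout and the
 * translated datapath actions in odp_actions. */
xlate_actions(&xin, &upcall->xout);
upcall->xout_initialized = true;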

rule_dpif_lookup_from_table walks the chained flow tables one by one in order; for every single table it calls rule_dpif_lookup_in_table, which in turn calls classifier_lookup to search that table for a rule.
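Conceptually the lookup chain looks like this; the real rule_dpif_lookup_from_table also deals with the table-miss configuration, recirculation and statistics, so treat this only as a sketch of the control flow:

/* Sketch only: walk the OpenFlow tables in order.  Each table is backed by a
 * struct classifier, so the per-table lookup ends in classifier_lookup(). */
for (table_id = first_table; table_id < n_tables; table_id++) {
    struct rule_dpif *rule;

    rule = rule_dpif_lookup_in_table(ofproto, version, table_id, flow, wc);
    if (rule) {
        return rule;        /* highest-priority matching rule in this table */
    }
    /* no match: fall through to the next table according to the miss behaviour */
}
return miss_rule;           /* nothing matched in any table */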

Once a rule is found, xlate_actions finally calls do_xlate_actions, which applies a different operation to the flow for each ofpact type:

switch (a->type) {
case OFPACT_OUTPUT:
    xlate_output_action(ctx, ofpact_get_OUTPUT(a)->port,
                        ofpact_get_OUTPUT(a)->max_len, true);
    break;

case OFPACT_SET_VLAN_VID:
    wc->masks.vlan_tci |= htons(VLAN_VID_MASK | VLAN_CFI);
    if (flow->vlan_tci & htons(VLAN_CFI) ||
        ofpact_get_SET_VLAN_VID(a)->push_vlan_if_needed) {
        flow->vlan_tci &= ~htons(VLAN_VID_MASK);
        flow->vlan_tci |= (htons(ofpact_get_SET_VLAN_VID(a)->vlan_vid)
                           | htons(VLAN_CFI));
    }
    break;

case OFPACT_SET_ETH_SRC:
    WC_MASK_FIELD(wc, dl_src);
    flow->dl_src = ofpact_get_SET_ETH_SRC(a)->mac;
    break;

case OFPACT_SET_ETH_DST:
    WC_MASK_FIELD(wc, dl_dst);
    flow->dl_dst = ofpact_get_SET_ETH_DST(a)->mac;
    break;

case OFPACT_SET_IPV4_SRC:
    CHECK_MPLS_RECIRCULATION();
    if (flow->dl_type == htons(ETH_TYPE_IP)) {
        memset(&wc->masks.nw_src, 0xff, sizeof wc->masks.nw_src);
        flow->nw_src = ofpact_get_SET_IPV4_SRC(a)->ipv4;
    }
    break;

case OFPACT_SET_IPV4_DST:
    CHECK_MPLS_RECIRCULATION();
    if (flow->dl_type == htons(ETH_TYPE_IP)) {
        memset(&wc->masks.nw_dst, 0xff, sizeof wc->masks.nw_dst);
        flow->nw_dst = ofpact_get_SET_IPV4_DST(a)->ipv4;
    }
    break;

case OFPACT_SET_L4_SRC_PORT:
    CHECK_MPLS_RECIRCULATION();
    if (is_ip_any(flow) && !(flow->nw_frag & FLOW_NW_FRAG_LATER)) {
        memset(&wc->masks.nw_proto, 0xff, sizeof wc->masks.nw_proto);
        memset(&wc->masks.tp_src, 0xff, sizeof wc->masks.tp_src);
        flow->tp_src = htons(ofpact_get_SET_L4_SRC_PORT(a)->port);
    }
    break;

case OFPACT_SET_L4_DST_PORT:
    CHECK_MPLS_RECIRCULATION();
    if (is_ip_any(flow) && !(flow->nw_frag & FLOW_NW_FRAG_LATER)) {
        memset(&wc->masks.nw_proto, 0xff, sizeof wc->masks.nw_proto);
        memset(&wc->masks.tp_dst, 0xff, sizeof wc->masks.tp_dst);
        flow->tp_dst = htons(ofpact_get_SET_L4_DST_PORT(a)->port);
    }
    break;

(5) Finally, static void handle_upcalls(struct udpif *udpif, struct upcall *upcalls, size_t n_upcalls) is called to install the flow rules into the kernel datapath.

It calls void dpif_operate(struct dpif *dpif, struct dpif_op **ops, size_t n_ops),
which calls dpif->dpif_class->operate(dpif, ops, chunk),
which ends up in dpif_netlink_operate():

static void
dpif_netlink_operate(struct dpif *dpif_, struct dpif_op **ops, size_t n_ops)
{
    struct dpif_netlink *dpif = dpif_netlink_cast(dpif_);

    while (n_ops > 0) {
        size_t chunk = dpif_netlink_operate__(dpif, ops, n_ops);
        ops += chunk;
        n_ops -= chunk;
    }
}

Inside static size_t dpif_netlink_operate__(struct dpif_netlink *dpif, struct dpif_op **ops, size_t n_ops) there is code like the following:

switch (op->type) {
case DPIF_OP_FLOW_PUT:
    put = &op->u.flow_put;
    dpif_netlink_init_flow_put(dpif, put, &flow);
    if (put->stats) {
        flow.nlmsg_flags |= NLM_F_ECHO;
        aux->txn.reply = &aux->reply;
    }
    dpif_netlink_flow_to_ofpbuf(&flow, &aux->request);
    break;

case DPIF_OP_FLOW_DEL:
    del = &op->u.flow_del;
    dpif_netlink_init_flow_del(dpif, del, &flow);
    if (del->stats) {
        flow.nlmsg_flags |= NLM_F_ECHO;
        aux->txn.reply = &aux->reply;
    }
    dpif_netlink_flow_to_ofpbuf(&flow, &aux->request);
    break;

which sends netlink requests to modify the flow rules of the kernel datapath.
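Put differently, installing a single flow through the dpif layer looks like the dpif_flow_put() call already seen in recv_upcalls() above; handle_upcalls() does the batch equivalent by filling DPIF_OP_FLOW_PUT operations and handing them to dpif_operate(). A sketch, assuming key/mask/actions are netlink-formatted buffers produced by the translation step:

/* Sketch: install one flow via the dpif layer (compare the dpif_flow_put()
 * call in recv_upcalls() above).  'key', 'mask' and 'actions' are assumed to
 * be netlink-formatted buffers; no stats are requested. */
dpif_flow_put(udpif->dpif, DPIF_FP_CREATE,
              key, key_len,              /* flow key from the upcall */
              mask, mask_len,            /* wildcards from translation */
              actions, actions_len,      /* datapath actions from xlate */
              &ufid, PMD_ID_NULL,
              NULL);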