summaryrefslogtreecommitdiffstats
path: root/net/tipc/bcast.c
diff options
context:
space:
mode:
authorJon Paul Maloy2015-10-22 14:51:41 +0200
committerDavid S. Miller2015-10-24 15:56:37 +0200
commit5266698661401afc5e4a1a521cf9ba10724d10dd (patch)
treecf3d466a2d9982f403a689e8a0c819c7e3693bde /net/tipc/bcast.c
parenttipc: introduce capability bit for broadcast synchronization (diff)
downloadkernel-qcow2-linux-5266698661401afc5e4a1a521cf9ba10724d10dd.tar.gz
kernel-qcow2-linux-5266698661401afc5e4a1a521cf9ba10724d10dd.tar.xz
kernel-qcow2-linux-5266698661401afc5e4a1a521cf9ba10724d10dd.zip
tipc: let broadcast packet reception use new link receive function
The code path for receiving broadcast packets is currently distinct from the unicast path. This leads to unnecessary code and data duplication, something that can be avoided with some effort. We now introduce separate per-peer tipc_link instances for handling broadcast packet reception. Each receive link keeps a pointer to the common, single, broadcast link instance, and can hence handle release and retransmission of send buffers as if they belonged to the own instance. Furthermore, we let each unicast link instance keep a reference to both the pertaining broadcast receive link, and to the common send link. This makes it possible for the unicast links to easily access data for broadcast link synchronization, as well as for carrying acknowledges for received broadcast packets. Signed-off-by: Jon Maloy <jon.maloy@ericsson.com> Reviewed-by: Ying Xue <ying.xue@windriver.com> Signed-off-by: David S. Miller <davem@davemloft.net>
Diffstat (limited to 'net/tipc/bcast.c')
-rw-r--r--net/tipc/bcast.c163
1 files changed, 132 insertions, 31 deletions
diff --git a/net/tipc/bcast.c b/net/tipc/bcast.c
index 7fdf895e7973..ea28c2919b38 100644
--- a/net/tipc/bcast.c
+++ b/net/tipc/bcast.c
@@ -112,11 +112,6 @@ static struct tipc_bc_base *tipc_bc_base(struct net *net)
return tipc_net(net)->bcbase;
}
-static struct tipc_link *tipc_bc_sndlink(struct net *net)
-{
- return tipc_net(net)->bcl;
-}
-
/**
* tipc_nmap_equal - test for equality of node maps
*/
@@ -169,31 +164,6 @@ static void bcbuf_decr_acks(struct sk_buff *buf)
bcbuf_set_acks(buf, bcbuf_acks(buf) - 1);
}
-void tipc_bclink_add_node(struct net *net, u32 addr)
-{
- struct tipc_net *tn = net_generic(net, tipc_net_id);
- struct tipc_link *l = tipc_bc_sndlink(net);
- tipc_bclink_lock(net);
- tipc_nmap_add(&tn->bcbase->bcast_nodes, addr);
- tipc_link_add_bc_peer(l);
- tipc_bclink_unlock(net);
-}
-
-void tipc_bclink_remove_node(struct net *net, u32 addr)
-{
- struct tipc_net *tn = net_generic(net, tipc_net_id);
-
- tipc_bclink_lock(net);
- tipc_nmap_remove(&tn->bcbase->bcast_nodes, addr);
- tn->bcl->ackers--;
-
- /* Last node? => reset backlog queue */
- if (!tn->bcbase->bcast_nodes.count)
- tipc_link_purge_backlog(tn->bcbase->link);
-
- tipc_bclink_unlock(net);
-}
-
static void bclink_set_last_sent(struct net *net)
{
struct tipc_net *tn = net_generic(net, tipc_net_id);
@@ -501,12 +471,141 @@ int tipc_bcast_xmit(struct net *net, struct sk_buff_head *list)
__skb_queue_purge(&rcvq);
return rc;
}
+
/* Broadcast to all nodes, inluding local node */
tipc_bcbearer_xmit(net, &xmitq);
tipc_sk_mcast_rcv(net, &rcvq, &inputq);
__skb_queue_purge(list);
return 0;
}
+
+/* tipc_bcast_rcv - receive a broadcast packet, and deliver to rcv link
+ *
+ * RCU is locked, no other locks set
+ */
+int tipc_bcast_rcv(struct net *net, struct tipc_link *l, struct sk_buff *skb)
+{
+ struct tipc_msg *hdr = buf_msg(skb);
+ struct sk_buff_head *inputq = &tipc_bc_base(net)->inputq;
+ struct sk_buff_head xmitq;
+ int rc;
+
+ __skb_queue_head_init(&xmitq);
+
+ if (msg_mc_netid(hdr) != tipc_netid(net) || !tipc_link_is_up(l)) {
+ kfree_skb(skb);
+ return 0;
+ }
+
+ tipc_bcast_lock(net);
+ if (msg_user(hdr) == BCAST_PROTOCOL)
+ rc = tipc_link_bc_nack_rcv(l, skb, &xmitq);
+ else
+ rc = tipc_link_rcv(l, skb, NULL);
+ tipc_bcast_unlock(net);
+
+ if (!skb_queue_empty(&xmitq))
+ tipc_bcbearer_xmit(net, &xmitq);
+
+ /* Any socket wakeup messages ? */
+ if (!skb_queue_empty(inputq))
+ tipc_sk_rcv(net, inputq);
+
+ return rc;
+}
+
+/* tipc_bcast_ack_rcv - receive and handle a broadcast acknowledge
+ *
+ * RCU is locked, no other locks set
+ */
+void tipc_bcast_ack_rcv(struct net *net, struct tipc_link *l, u32 acked)
+{
+ struct sk_buff_head *inputq = &tipc_bc_base(net)->inputq;
+ struct sk_buff_head xmitq;
+
+ __skb_queue_head_init(&xmitq);
+
+ tipc_bcast_lock(net);
+ tipc_link_bc_ack_rcv(l, acked, &xmitq);
+ tipc_bcast_unlock(net);
+
+ tipc_bcbearer_xmit(net, &xmitq);
+
+ /* Any socket wakeup messages ? */
+ if (!skb_queue_empty(inputq))
+ tipc_sk_rcv(net, inputq);
+}
+
+/* tipc_bcast_synch_rcv - check and update rcv link with peer's send state
+ *
+ * RCU is locked, no other locks set
+ */
+void tipc_bcast_sync_rcv(struct net *net, struct tipc_link *l,
+ struct tipc_msg *hdr)
+{
+ struct sk_buff_head *inputq = &tipc_bc_base(net)->inputq;
+ struct sk_buff_head xmitq;
+
+ __skb_queue_head_init(&xmitq);
+
+ tipc_bcast_lock(net);
+ if (msg_type(hdr) == STATE_MSG) {
+ tipc_link_bc_ack_rcv(l, msg_bcast_ack(hdr), &xmitq);
+ tipc_link_bc_sync_rcv(l, hdr, &xmitq);
+ } else {
+ tipc_link_bc_init_rcv(l, hdr);
+ }
+ tipc_bcast_unlock(net);
+
+ tipc_bcbearer_xmit(net, &xmitq);
+
+ /* Any socket wakeup messages ? */
+ if (!skb_queue_empty(inputq))
+ tipc_sk_rcv(net, inputq);
+}
+
+/* tipc_bcast_add_peer - add a peer node to broadcast link and bearer
+ *
+ * RCU is locked, node lock is set
+ */
+void tipc_bcast_add_peer(struct net *net, u32 addr, struct tipc_link *uc_l,
+ struct sk_buff_head *xmitq)
+{
+ struct tipc_net *tn = net_generic(net, tipc_net_id);
+ struct tipc_link *snd_l = tipc_bc_sndlink(net);
+
+ tipc_bclink_lock(net);
+ tipc_nmap_add(&tn->bcbase->bcast_nodes, addr);
+ tipc_link_add_bc_peer(snd_l, uc_l, xmitq);
+ tipc_bclink_unlock(net);
+}
+
+/* tipc_bcast_remove_peer - remove a peer node from broadcast link and bearer
+ *
+ * RCU is locked, node lock is set
+ */
+void tipc_bcast_remove_peer(struct net *net, u32 addr,
+ struct tipc_link *rcv_l)
+{
+ struct tipc_net *tn = net_generic(net, tipc_net_id);
+ struct sk_buff_head *inputq = &tipc_bc_base(net)->inputq;
+ struct tipc_link *snd_l = tipc_bc_sndlink(net);
+ struct sk_buff_head xmitq;
+
+ __skb_queue_head_init(&xmitq);
+
+ tipc_bclink_lock(net);
+ tipc_nmap_remove(&tn->bcbase->bcast_nodes, addr);
+ tipc_link_remove_bc_peer(snd_l, rcv_l, &xmitq);
+ tipc_bclink_unlock(net);
+
+ tipc_bcbearer_xmit(net, &xmitq);
+
+ /* Any socket wakeup messages ? */
+ if (!skb_queue_empty(inputq))
+ tipc_sk_rcv(net, inputq);
+}
+
/**
* bclink_accept_pkt - accept an incoming, in-sequence broadcast packet
*
@@ -728,6 +827,7 @@ static int tipc_bcbearer_send(struct net *net, struct sk_buff *buf,
return 0;
}
}
+ msg_set_mc_netid(msg, tn->net_id);
/* Send buffer over bearers until all targets reached */
bcbearer->remains = bclink->bcast_nodes;
@@ -1042,12 +1142,13 @@ int tipc_bcast_init(struct net *net)
spin_lock_init(&tipc_net(net)->bclock);
bb->node.net = net;
- if (!tipc_link_bc_create(&bb->node,
+ if (!tipc_link_bc_create(&bb->node, 0, 0,
MAX_PKT_DEFAULT_MCAST,
BCLINK_WIN_DEFAULT,
0,
&bb->inputq,
&bb->namedq,
+ NULL,
&l))
goto enomem;
bb->link = l;