summaryrefslogtreecommitdiffstats
path: root/net/tipc/socket.c
diff options
context:
space:
mode:
Diffstat (limited to 'net/tipc/socket.c')
-rw-r--r--net/tipc/socket.c48
1 files changed, 36 insertions, 12 deletions
diff --git a/net/tipc/socket.c b/net/tipc/socket.c
index 0a2eac309177..50145c95ac96 100644
--- a/net/tipc/socket.c
+++ b/net/tipc/socket.c
@@ -201,6 +201,11 @@ static bool tsk_conn_cong(struct tipc_sock *tsk)
return tsk->snt_unacked > tsk->snd_win;
}
+static u16 tsk_blocks(int len)
+{
+ return ((len / FLOWCTL_BLK_SZ) + 1);
+}
+
/* tsk_blocks(): translate a buffer size in bytes to number of
* advertisable blocks, taking into account the ratio truesize(len)/len
* We can trust that this ratio is always < 4 for len >= FLOWCTL_BLK_SZ
@@ -831,6 +836,7 @@ static int tipc_send_group_bcast(struct socket *sock, struct msghdr *m,
struct tipc_group *grp = tsk->group;
struct tipc_nlist *dsts = tipc_group_dests(grp);
struct tipc_mc_method *method = &tsk->mc_method;
+ int blks = tsk_blocks(MCAST_H_SIZE + dlen);
struct tipc_msg *hdr = &tsk->phdr;
int mtu = tipc_bcast_get_mtu(net);
struct sk_buff_head pkts;
@@ -839,14 +845,15 @@ static int tipc_send_group_bcast(struct socket *sock, struct msghdr *m,
if (!dsts->local && !dsts->remote)
return -EHOSTUNREACH;
- /* Block or return if any destination link is congested */
- rc = tipc_wait_for_cond(sock, &timeout, !tsk->cong_link_cnt);
+ /* Block or return if any destination link or member is congested */
+ rc = tipc_wait_for_cond(sock, &timeout, !tsk->cong_link_cnt &&
+ !tipc_group_bc_cong(grp, blks));
if (unlikely(rc))
return rc;
/* Complete message header */
msg_set_type(hdr, TIPC_GRP_BCAST_MSG);
- msg_set_hdr_sz(hdr, MCAST_H_SIZE);
+ msg_set_hdr_sz(hdr, GROUP_H_SIZE);
msg_set_destport(hdr, 0);
msg_set_destnode(hdr, 0);
msg_set_nameinst(hdr, 0);
@@ -864,9 +871,8 @@ static int tipc_send_group_bcast(struct socket *sock, struct msghdr *m,
if (unlikely(rc))
return rc;
- /* Update broadcast sequence number */
- tipc_group_update_bc_members(tsk->group);
-
+ /* Update broadcast sequence number and send windows */
+ tipc_group_update_bc_members(tsk->group, blks);
return dlen;
}
@@ -1024,7 +1030,7 @@ static int __tipc_sendmsg(struct socket *sock, struct msghdr *m, size_t dlen)
if (unlikely(dlen > TIPC_MAX_USER_MSG_SIZE))
return -EMSGSIZE;
- if (unlikely(grp))
+ if (unlikely(grp && !dest))
return tipc_send_group_bcast(sock, m, dlen, timeout);
if (unlikely(!dest)) {
@@ -1420,6 +1426,7 @@ static int tipc_recvmsg(struct socket *sock, struct msghdr *m,
bool connected = !tipc_sk_type_connectionless(sk);
struct tipc_sock *tsk = tipc_sk(sk);
int rc, err, hlen, dlen, copy;
+ struct sk_buff_head xmitq;
struct tipc_msg *hdr;
struct sk_buff *skb;
bool grp_evt;
@@ -1436,8 +1443,8 @@ static int tipc_recvmsg(struct socket *sock, struct msghdr *m,
}
timeout = sock_rcvtimeo(sk, flags & MSG_DONTWAIT);
+ /* Step rcv queue to first msg with data or error; wait if necessary */
do {
- /* Look at first msg in receive queue; wait if necessary */
rc = tipc_wait_for_rcvmsg(sock, &timeout);
if (unlikely(rc))
goto exit;
@@ -1485,12 +1492,21 @@ static int tipc_recvmsg(struct socket *sock, struct msghdr *m,
if (unlikely(flags & MSG_PEEK))
goto exit;
+ /* Send group flow control advertisement when applicable */
+ if (tsk->group && msg_in_group(hdr) && !grp_evt) {
+ skb_queue_head_init(&xmitq);
+ tipc_group_update_rcv_win(tsk->group, tsk_blocks(hlen + dlen),
+ msg_orignode(hdr), msg_origport(hdr),
+ &xmitq);
+ tipc_node_distr_xmit(sock_net(sk), &xmitq);
+ }
+
tsk_advance_rx_queue(sk);
if (likely(!connected))
goto exit;
- /* Send connection flow control ack when applicable */
+ /* Send connection flow control advertisement when applicable */
tsk->rcv_unacked += tsk_inc(tsk, hlen + dlen);
if (tsk->rcv_unacked >= tsk->rcv_win / TIPC_ACK_RATE)
tipc_sk_send_ack(tsk);
@@ -1650,6 +1666,7 @@ static void tipc_sk_proto_rcv(struct sock *sk,
struct tipc_sock *tsk = tipc_sk(sk);
struct tipc_msg *hdr = buf_msg(skb);
struct tipc_group *grp = tsk->group;
+ bool wakeup = false;
switch (msg_user(hdr)) {
case CONN_MANAGER:
@@ -1658,19 +1675,23 @@ static void tipc_sk_proto_rcv(struct sock *sk,
case SOCK_WAKEUP:
tipc_dest_del(&tsk->cong_links, msg_orignode(hdr), 0);
tsk->cong_link_cnt--;
- sk->sk_write_space(sk);
+ wakeup = true;
break;
case GROUP_PROTOCOL:
- tipc_group_proto_rcv(grp, hdr, inputq, xmitq);
+ tipc_group_proto_rcv(grp, &wakeup, hdr, inputq, xmitq);
break;
case TOP_SRV:
- tipc_group_member_evt(tsk->group, skb, inputq, xmitq);
+ tipc_group_member_evt(tsk->group, &wakeup, &sk->sk_rcvbuf,
+ skb, inputq, xmitq);
skb = NULL;
break;
default:
break;
}
+ if (wakeup)
+ sk->sk_write_space(sk);
+
kfree_skb(skb);
}
@@ -1785,6 +1806,9 @@ static unsigned int rcvbuf_limit(struct sock *sk, struct sk_buff *skb)
struct tipc_sock *tsk = tipc_sk(sk);
struct tipc_msg *hdr = buf_msg(skb);
+ if (unlikely(msg_in_group(hdr)))
+ return sk->sk_rcvbuf;
+
if (unlikely(!msg_connected(hdr)))
return sk->sk_rcvbuf << msg_importance(hdr);