summaryrefslogblamecommitdiffstats
path: root/drivers/ntb/ntb_transport.c
blob: bf7ade14c742b29d4f4cca257e662ff98176f204 (plain) (tree)
1086
1087
1088
1089
1090
1091
1092
1093
1094
1095
1096
1097
1098
1099
1100
1101
1102
1103
1104
1105
1106
1107
1108
1109
1110
1111
1112
1113
1114
1115
1116
1117
1118
1119
1120
1121
1122
1123
1124
1125
1126
1127
1128
1129
1130
1131
1132
1133
1134
1135
1136
1137
1138
1139
1140
1141
1142
1143
1144
1145
1146
1147
1148
1149
1150
1151
1152
1153
1154
1155
1156
1157
1158
1159
1160
1161
1162
1163
1164
1165
1166
1167
1168
1169
1170
1171
1172
1173
1174
1175
1176
1177
1178
1179
1180
1181
1182
1183
1184
1185
1186
1187
1188
1189
1190
1191
1192
1193
1194
1195
1196
1197
1198
1199
1200
1201
1202
1203
1204
1205
1206
1207
1208
1209
1210
1211
1212
1213
1214
1215
1216
1217
1218
1219
1220
1221
1222
1223
1224
1225
1226
1227
1228
1229
1230
1231
1232
1233
1234
1235
1236
1237
1238
1239
1240
1241
1242
1243
1244
1245
1246
1247
1248
1249
1250
1251
1252
1253
1254
1255
1256
1257
1258
1259
1260
1261
1262
1263
1264
1265
1266
1267
1268
1269
1270
1271
1272
1273
1274
1275
1276
1277
1278
1279
1280
1281
1282
1283
1284
1285
1286
1287
1288
1289
1290
1291
1292
1293
1294
1295
1296
1297
1298
1299
1300
1301
1302
1303
1304
1305
1306
1307
1308
1309
1310
1311
1312
1313
1314
1315
1316
1317
1318
1319
1320
1321
1322
1323
1324
1325
1326
1327
1328
1329
1330
1331
1332
1333
1334
1335
1336
1337
1338
1339
1340
1341
1342
1343
1344
1345
1346
1347
1348
1349
1350
1351
1352
1353
1354
1355
1356
1357
1358
1359
1360
1361
1362
1363
1364
1365
1366
1367
1368
1369
1370
1371
1372
1373
1374
1375
1376
1377
1378
1379
1380
1381
1382
1383
1384
1385
1386
1387
1388
1389
1390
1391
1392
1393
1394
1395
1396
1397
1398
1399
1400
1401
1402
1403
1404
1405
1406
1407
1408
1409
1410
1411
1412
1413
1414
1415
1416
1417
1418
1419
1420
1421
1422
1423
1424
1425
1426
1427
1428
1429
1430
1431
1432
1433
1434
1435
1436
1437
1438
1439
1440
1441
1442
1443
1444
1445
1446
1447
1448
1449
1450
1451





























































                                                                          
                                           
































                                                                             
                                  










                                                                       
                                  


                                                       
                                        





































                                               
                                        






































































                                                                               
                                                 











                                                     
                                                    
























































                                                                                       


                                            















































                                                                             


                                            



















































































































                                                                               
                                         
                                     
                     


                                               

                                                                          


                                                      
                                                   
                                                      

                                                            
                                          
                                                       
 




                                                                     








                                                                              











                                                                                





                                                                 
                                                         
 


                                                                           


















                                                                               





                                                                
 

                                                                           




























                                                                             
                                                 









































































































































































                                                                                

                                         








                                         











                                                                          












                                                                               
                                                          












































                                                                           
                                                                 




































































                                                                         
                                                             
 
                                                                            





















                                                                               
                                                 








                                                                       
                                                                            
























































                                                                               

                                                                































                                                                             
                                                           


                                                    
                                                                            



                                                                              
              

























                                                                              
                                                                            








                                                                                                           
                                                                                









                                                                    

                                                              













































































































































































































































































































































































                                                                                
                                                                
 
                                                                    

                                          
/*
 * This file is provided under a dual BSD/GPLv2 license.  When using or
 *   redistributing this file, you may do so under either license.
 *
 *   GPL LICENSE SUMMARY
 *
 *   Copyright(c) 2012 Intel Corporation. All rights reserved.
 *
 *   This program is free software; you can redistribute it and/or modify
 *   it under the terms of version 2 of the GNU General Public License as
 *   published by the Free Software Foundation.
 *
 *   BSD LICENSE
 *
 *   Copyright(c) 2012 Intel Corporation. All rights reserved.
 *
 *   Redistribution and use in source and binary forms, with or without
 *   modification, are permitted provided that the following conditions
 *   are met:
 *
 *     * Redistributions of source code must retain the above copyright
 *       notice, this list of conditions and the following disclaimer.
 *     * Redistributions in binary form must reproduce the above copy
 *       notice, this list of conditions and the following disclaimer in
 *       the documentation and/or other materials provided with the
 *       distribution.
 *     * Neither the name of Intel Corporation nor the names of its
 *       contributors may be used to endorse or promote products derived
 *       from this software without specific prior written permission.
 *
 *   THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
 *   "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
 *   LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
 *   A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
 *   OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
 *   SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
 *   LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
 *   DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
 *   THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
 *   (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
 *   OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
 *
 * Intel PCIe NTB Linux driver
 *
 * Contact Information:
 * Jon Mason <jon.mason@intel.com>
 */
#include <linux/debugfs.h>
#include <linux/delay.h>
#include <linux/dma-mapping.h>
#include <linux/errno.h>
#include <linux/export.h>
#include <linux/interrupt.h>
#include <linux/module.h>
#include <linux/pci.h>
#include <linux/slab.h>
#include <linux/types.h>
#include <linux/ntb.h>
#include "ntb_hw.h"

/* Version written to the peer's VERSION scratchpad and required from the peer */
#define NTB_TRANSPORT_VERSION	1

/* Upper bound on a queue pair's frame size; clamped to the per-qp region size */
static unsigned int transport_mtu = 0x401E;
module_param(transport_mtu, uint, 0644);
MODULE_PARM_DESC(transport_mtu, "Maximum size of NTB transport packets");

/* Caps the number of queue pairs allocated per transport */
static unsigned char max_num_clients = 2;
module_param(max_num_clients, byte, 0644);
MODULE_PARM_DESC(max_num_clients, "Maximum number of NTB transport clients");

/* One client buffer, kept on one of a queue pair's free/pending lists */
struct ntb_queue_entry {
	/* ntb_queue list reference */
	struct list_head entry;
	/* pointers to data to be transferred */
	void *cb_data;
	void *buf;
	unsigned int len;
	unsigned int flags;
};

/* Per queue pair state: link status, TX/RX regions, client callbacks, stats */
struct ntb_transport_qp {
	struct ntb_transport *transport;
	struct ntb_device *ndev;
	/* opaque client pointer handed back to the rx and event handlers */
	void *cb_data;

	bool client_ready;
	bool qp_link;
	u8 qp_num;	/* Only 64 QP's are allowed.  0-63 */

	/* Transmit side: [tx_mw_begin, tx_mw_end) is this qp's slice of the
	 * memory window returned by ntb_get_mw_vbase()
	 */
	void (*tx_handler) (struct ntb_transport_qp *qp, void *qp_data,
			    void *data, int len);
	struct list_head tx_free_q;
	spinlock_t ntb_tx_free_q_lock;
	void *tx_mw_begin;
	void *tx_mw_end;
	void *tx_offset;
	unsigned int tx_max_frame;

	/* Receive side: [rx_buff_begin, rx_buff_end) is this qp's slice of the
	 * locally allocated memory window buffer; rx_offset is the next frame
	 * slot to examine
	 */
	void (*rx_handler) (struct ntb_transport_qp *qp, void *qp_data,
			    void *data, int len);
	struct tasklet_struct rx_work;
	struct list_head rx_pend_q;
	struct list_head rx_free_q;
	spinlock_t ntb_rx_pend_q_lock;
	spinlock_t ntb_rx_free_q_lock;
	void *rx_buff_begin;
	void *rx_buff_end;
	void *rx_offset;
	unsigned int rx_max_frame;

	/* called with cb_data and NTB_LINK_UP / NTB_LINK_DOWN on qp link changes */
	void (*event_handler) (void *data, int status);
	struct delayed_work link_work;
	struct work_struct link_cleanup;

	struct dentry *debugfs_dir;
	struct dentry *debugfs_stats;

	/* Stats */
	u64 rx_bytes;
	u64 rx_pkts;
	u64 rx_ring_empty;
	u64 rx_err_no_buf;
	u64 rx_err_oflow;
	u64 rx_err_ver;
	u64 tx_bytes;
	u64 tx_pkts;
	u64 tx_ring_full;
};

/* Receive buffer backing one memory window; filled in by ntb_set_mw() */
struct ntb_transport_mw {
	size_t size;		/* buffer size, rounded up to a 4k multiple */
	void *virt_addr;	/* CPU address from dma_alloc_coherent() */
	dma_addr_t dma_addr;	/* address handed to ntb_set_mw_addr() */
};

/* A client device on the NTB bus, linked on its transport's client_devs list */
struct ntb_transport_client_dev {
	struct list_head entry;
	struct device dev;
};

/* Transport instance layered on one NTB hardware device */
struct ntb_transport {
	struct list_head entry;		/* link on ntb_transport_list */
	struct list_head client_devs;	/* ntb_transport_client_dev entries */

	struct ntb_device *ndev;
	struct ntb_transport_mw mw[NTB_NUM_MW];
	struct ntb_transport_qp *qps;	/* array of max_qps queue pairs */
	unsigned int max_qps;
	/* initialised with one bit set per qp; the cleanup paths act on qps
	 * whose bit is clear
	 */
	unsigned long qp_bitmap;
	bool transport_link;
	struct delayed_work link_work;
	struct work_struct link_cleanup;
	struct dentry *debugfs_dir;
};

/* Bits of ntb_payload_header.flags */
enum {
	/* frame slot holds a received frame that may be processed */
	DESC_DONE_FLAG = 1 << 0,
	/* receiver takes the qp link down when it sees this */
	LINK_DOWN_FLAG = 1 << 1,
};

/* Header stored in the last bytes of every frame slot */
struct ntb_payload_header {
	u64 ver;		/* checked against the receiver's rx_pkts count */
	unsigned int len;	/* payload length in bytes */
	unsigned int flags;	/* DESC_DONE_FLAG / LINK_DOWN_FLAG */
};

/* Scratchpad register indices used for the link handshake */
enum {
	VERSION = 0,	/* NTB_TRANSPORT_VERSION */
	MW0_SZ,		/* size of memory window 0 */
	MW1_SZ,		/* size of memory window 1 */
	NUM_QPS,	/* number of queue pairs */
	QP_LINKS,	/* bitmask of ready queue pairs, bit index is qp_num */
	MAX_SPAD,	/* count of scratchpads above, not a register itself */
};

/* Queue pairs are spread over the memory windows by qp number modulo */
#define QP_TO_MW(qp)		((qp) % NTB_NUM_MW)
#define NTB_QP_DEF_NUM_ENTRIES	100
/* Delay, in milliseconds, before the link work items are retried */
#define NTB_LINK_DOWN_TIMEOUT	10

/*
 * Bus match callback: a device matches a driver when the device name begins
 * with the driver name.  Returns 1 on a match and 0 otherwise.
 */
static int ntb_match_bus(struct device *dev, struct device_driver *drv)
{
	const char *drv_name = drv->name;

	if (strncmp(dev_name(dev), drv_name, strlen(drv_name)) != 0)
		return 0;

	return 1;
}

/*
 * Bus probe callback: forwards to the client driver's probe() with the parent
 * PCI device.  Returns -EINVAL when the driver has no probe(), otherwise the
 * client's return value.
 */
static int ntb_client_probe(struct device *dev)
{
	const struct ntb_client *client;
	struct pci_dev *pdev;
	int rc;

	client = container_of(dev->driver, struct ntb_client, driver);
	pdev = container_of(dev->parent, struct pci_dev, dev);

	/* Reference is kept while the client stays bound */
	get_device(dev);

	if (!client || !client->probe)
		rc = -EINVAL;
	else
		rc = client->probe(pdev);

	/* Probe failed: give the reference back right away */
	if (rc != 0)
		put_device(dev);

	return rc;
}

/*
 * Bus remove callback: forwards to the client driver's remove(), if any, and
 * drops the device reference taken in ntb_client_probe().  Always returns 0.
 */
static int ntb_client_remove(struct device *dev)
{
	const struct ntb_client *client;
	struct pci_dev *pdev;

	client = container_of(dev->driver, struct ntb_client, driver);
	pdev = container_of(dev->parent, struct pci_dev, dev);

	if (client && client->remove)
		client->remove(pdev);

	put_device(dev);

	return 0;
}

/* Bus on which NTB client devices and client drivers are matched and bound */
struct bus_type ntb_bus_type = {
	.name = "ntb_bus",
	.match = ntb_match_bus,
	.probe = ntb_client_probe,
	.remove = ntb_client_remove,
};

/* All initialised transports; ntb_bus_type is registered while non-empty */
static LIST_HEAD(ntb_transport_list);

/*
 * Add @nt to the global transport list, registering the NTB bus first when
 * this is the only transport.  Returns 0 or the bus_register() error.
 */
static int ntb_bus_init(struct ntb_transport *nt)
{
	int rc = 0;

	/* The first transport brings the bus up */
	if (list_empty(&ntb_transport_list))
		rc = bus_register(&ntb_bus_type);

	if (rc != 0)
		return rc;

	list_add(&nt->entry, &ntb_transport_list);

	return 0;
}

/*
 * Unregister every client device still attached to @nt, take @nt off the
 * global transport list and unregister the bus once the list is empty.
 */
static void ntb_bus_remove(struct ntb_transport *nt)
{
	struct ntb_transport_client_dev *cdev, *next;

	list_for_each_entry_safe(cdev, next, &nt->client_devs, entry) {
		struct device *dev = &cdev->dev;

		dev_err(dev->parent, "%s still attached to bus, removing\n",
			dev_name(dev));
		list_del(&cdev->entry);
		device_unregister(dev);
	}

	list_del(&nt->entry);

	/* Last transport gone: the bus goes with it */
	if (!list_empty(&ntb_transport_list))
		return;

	bus_unregister(&ntb_bus_type);
}

/* Device release callback: frees the client device wrapper containing @dev */
static void ntb_client_release(struct device *dev)
{
	kfree(container_of(dev, struct ntb_transport_client_dev, dev));
}

/**
 * ntb_unregister_client_dev - Unregister NTB client device
 * @device_name: Name of NTB client device
 *
 * Unregister an NTB client device with the NTB transport layer
 */
void ntb_unregister_client_dev(char *device_name)
{
	struct ntb_transport_client_dev *client, *cd;
	struct ntb_transport *nt;

	list_for_each_entry(nt, &ntb_transport_list, entry)
		list_for_each_entry_safe(client, cd, &nt->client_devs, entry)
			if (!strncmp(dev_name(&client->dev), device_name,
				     strlen(device_name))) {
				list_del(&client->entry);
				device_unregister(&client->dev);
			}
}
EXPORT_SYMBOL_GPL(ntb_unregister_client_dev);

/**
 * ntb_register_client_dev - Register NTB client device
 * @device_name: Name of NTB client device
 *
 * Register an NTB client device with the NTB transport layer.  One device
 * named @device_name is created on every transport currently on the list.
 * On failure, the devices with that name registered so far are removed again.
 *
 * RETURNS: -ENODEV when no transport exists, -ENOMEM or the device_register()
 * error on failure, or zero for success.
 */
int ntb_register_client_dev(char *device_name)
{
	struct ntb_transport_client_dev *client_dev;
	struct ntb_transport *nt;
	int rc;

	if (list_empty(&ntb_transport_list))
		return -ENODEV;

	list_for_each_entry(nt, &ntb_transport_list, entry) {
		struct device *dev;

		client_dev = kzalloc(sizeof(struct ntb_transport_client_dev),
				     GFP_KERNEL);
		if (!client_dev) {
			rc = -ENOMEM;
			goto err;
		}

		dev = &client_dev->dev;

		/* setup and register client devices */
		dev_set_name(dev, "%s", device_name);
		dev->bus = &ntb_bus_type;
		dev->release = ntb_client_release;
		dev->parent = &ntb_query_pdev(nt->ndev)->dev;

		rc = device_register(dev);
		if (rc) {
			/* A device that went through device_register() must
			 * be dropped with put_device(), even on failure; the
			 * release callback then frees client_dev and the
			 * driver core frees the name.
			 */
			put_device(dev);
			goto err;
		}

		list_add_tail(&client_dev->entry, &nt->client_devs);
	}

	return 0;

err:
	ntb_unregister_client_dev(device_name);

	return rc;
}
EXPORT_SYMBOL_GPL(ntb_register_client_dev);

/**
 * ntb_register_client - Register NTB client driver
 * @drv: NTB client driver to be registered
 *
 * Attach @drv to the NTB bus and register it with the driver core.  The
 * driver is only registered when at least one transport exists.
 *
 * RETURNS: An appropriate -ERRNO error value on error, or zero for success.
 */
int ntb_register_client(struct ntb_client *drv)
{
	drv->driver.bus = &ntb_bus_type;

	if (!list_empty(&ntb_transport_list))
		return driver_register(&drv->driver);

	return -ENODEV;
}
EXPORT_SYMBOL_GPL(ntb_register_client);

/**
 * ntb_unregister_client - Unregister NTB client driver
 * @drv: NTB client driver to be unregistered
 *
 * Unregister an NTB client driver with the NTB transport layer.  Nothing is
 * returned.
 */
void ntb_unregister_client(struct ntb_client *drv)
{
	driver_unregister(&drv->driver);
}
EXPORT_SYMBOL_GPL(ntb_unregister_client);

/*
 * debugfs open handler: copies the inode's private pointer into the file so
 * that debugfs_read() can use it as the queue pair.  Always returns 0.
 */
static int debugfs_open(struct inode *inode, struct file *filp)
{
	filp->private_data = inode->i_private;
	return 0;
}

/*
 * debugfs read handler for a queue pair's "stats" file.
 *
 * Formats the counters, buffer pointers and link state of the qp stored in
 * filp->private_data into a temporary buffer and copies the requested part
 * to user space.
 *
 * The buffer is heap allocated: 1024 bytes is too much to put on the kernel
 * stack.  Returns the number of bytes copied, -ENOMEM when the buffer cannot
 * be allocated, or the simple_read_from_buffer() error.
 */
static ssize_t debugfs_read(struct file *filp, char __user *ubuf, size_t count,
			    loff_t *offp)
{
	struct ntb_transport_qp *qp;
	char *buf;
	ssize_t ret, out_offset, out_count;

	out_count = 1024;

	buf = kmalloc(out_count, GFP_KERNEL);
	if (!buf)
		return -ENOMEM;

	qp = filp->private_data;
	out_offset = 0;
	out_offset += snprintf(buf + out_offset, out_count - out_offset,
			       "NTB QP stats\n");
	out_offset += snprintf(buf + out_offset, out_count - out_offset,
			       "rx_bytes - \t%llu\n", qp->rx_bytes);
	out_offset += snprintf(buf + out_offset, out_count - out_offset,
			       "rx_pkts - \t%llu\n", qp->rx_pkts);
	out_offset += snprintf(buf + out_offset, out_count - out_offset,
			       "rx_ring_empty - %llu\n", qp->rx_ring_empty);
	out_offset += snprintf(buf + out_offset, out_count - out_offset,
			       "rx_err_no_buf - %llu\n", qp->rx_err_no_buf);
	out_offset += snprintf(buf + out_offset, out_count - out_offset,
			       "rx_err_oflow - \t%llu\n", qp->rx_err_oflow);
	out_offset += snprintf(buf + out_offset, out_count - out_offset,
			       "rx_err_ver - \t%llu\n", qp->rx_err_ver);
	out_offset += snprintf(buf + out_offset, out_count - out_offset,
			       "rx_buff_begin - %p\n", qp->rx_buff_begin);
	out_offset += snprintf(buf + out_offset, out_count - out_offset,
			       "rx_offset - \t%p\n", qp->rx_offset);
	out_offset += snprintf(buf + out_offset, out_count - out_offset,
			       "rx_buff_end - \t%p\n", qp->rx_buff_end);

	out_offset += snprintf(buf + out_offset, out_count - out_offset,
			       "tx_bytes - \t%llu\n", qp->tx_bytes);
	out_offset += snprintf(buf + out_offset, out_count - out_offset,
			       "tx_pkts - \t%llu\n", qp->tx_pkts);
	out_offset += snprintf(buf + out_offset, out_count - out_offset,
			       "tx_ring_full - \t%llu\n", qp->tx_ring_full);
	out_offset += snprintf(buf + out_offset, out_count - out_offset,
			       "tx_mw_begin - \t%p\n", qp->tx_mw_begin);
	out_offset += snprintf(buf + out_offset, out_count - out_offset,
			       "tx_offset - \t%p\n", qp->tx_offset);
	out_offset += snprintf(buf + out_offset, out_count - out_offset,
			       "tx_mw_end - \t%p\n", qp->tx_mw_end);

	out_offset += snprintf(buf + out_offset, out_count - out_offset,
			       "QP Link %s\n", (qp->qp_link == NTB_LINK_UP) ?
			       "Up" : "Down");

	/* snprintf() returns the untruncated length, so the running total can
	 * exceed the buffer; never hand out more than was allocated.
	 */
	if (out_offset > out_count)
		out_offset = out_count;

	ret = simple_read_from_buffer(ubuf, count, offp, buf, out_offset);
	kfree(buf);
	return ret;
}

/* File operations of the per queue pair debugfs "stats" file */
static const struct file_operations ntb_qp_debugfs_stats = {
	.owner = THIS_MODULE,
	.open = debugfs_open,
	.read = debugfs_read,
};

/* Append @entry to the tail of @list while holding @lock with IRQs disabled */
static void ntb_list_add(spinlock_t *lock, struct list_head *entry,
			 struct list_head *list)
{
	unsigned long irq_state;

	spin_lock_irqsave(lock, irq_state);
	list_add_tail(entry, list);
	spin_unlock_irqrestore(lock, irq_state);
}

/*
 * Detach and return the first entry of @list while holding @lock with IRQs
 * disabled.  Returns NULL when the list is empty.
 */
static struct ntb_queue_entry *ntb_list_rm(spinlock_t *lock,
						struct list_head *list)
{
	struct ntb_queue_entry *head = NULL;
	unsigned long irq_state;

	spin_lock_irqsave(lock, irq_state);
	if (!list_empty(list)) {
		head = list_first_entry(list, struct ntb_queue_entry, entry);
		list_del(&head->entry);
	}
	spin_unlock_irqrestore(lock, irq_state);

	return head;
}

/*
 * Carve out the receive region of queue pair @qp_num from its memory window
 * buffer and reset the receive state.
 *
 * The window nt->mw[QP_TO_MW(qp_num)] is divided evenly between the queue
 * pairs mapped onto it.  The payload header at the tail of every frame slot
 * is cleared and the rx/tx packet counters are reset.
 */
static void ntb_transport_setup_qp_mw(struct ntb_transport *nt,
				      unsigned int qp_num)
{
	struct ntb_transport_qp *qp = &nt->qps[qp_num];
	unsigned int rx_size, num_qps_mw;
	u8 mw_num = QP_TO_MW(qp_num);
	void *frame;

	WARN_ON(!nt->mw[mw_num].virt_addr);

	/* Windows with a low index take one extra qp when max_qps does not
	 * divide evenly over the windows
	 */
	if (nt->max_qps % NTB_NUM_MW && mw_num < nt->max_qps % NTB_NUM_MW)
		num_qps_mw = nt->max_qps / NTB_NUM_MW + 1;
	else
		num_qps_mw = nt->max_qps / NTB_NUM_MW;

	rx_size = nt->mw[mw_num].size / num_qps_mw;
	qp->rx_buff_begin = nt->mw[mw_num].virt_addr +
			    (qp_num / NTB_NUM_MW * rx_size);
	qp->rx_buff_end = qp->rx_buff_begin + rx_size;
	qp->rx_offset = qp->rx_buff_begin;
	qp->rx_max_frame = min(transport_mtu, rx_size);

	/* setup the hdr offsets with 0's
	 *
	 * Only frame slots that lie completely inside the region are touched,
	 * so the header write can never run past rx_buff_end.  A frame size
	 * smaller than the header cannot hold a header at all and would keep
	 * the loop from advancing, so nothing is cleared in that case.
	 */
	if (qp->rx_max_frame >= sizeof(struct ntb_payload_header))
		for (frame = qp->rx_buff_begin;
		     frame + qp->rx_max_frame <= qp->rx_buff_end;
		     frame += qp->rx_max_frame)
			memset(frame + qp->rx_max_frame -
			       sizeof(struct ntb_payload_header), 0,
			       sizeof(struct ntb_payload_header));

	qp->rx_pkts = 0;
	qp->tx_pkts = 0;
}

static int ntb_set_mw(struct ntb_transport *nt, int num_mw, unsigned int size)
{
	struct ntb_transport_mw *mw = &nt->mw[num_mw];
	struct pci_dev *pdev = ntb_query_pdev(nt->ndev);

	/* Alloc memory for receiving data.  Must be 4k aligned */
	mw->size = ALIGN(size, 4096);

	mw->virt_addr = dma_alloc_coherent(&pdev->dev, mw->size, &mw->dma_addr,
					   GFP_KERNEL);
	if (!mw->virt_addr) {
		dev_err(&pdev->dev, "Unable to allocate MW buffer of size %d\n",
		       (int) mw->size);
		return -ENOMEM;
	}

	/* Notify HW the memory location of the receive buffer */
	ntb_set_mw_addr(nt->ndev, num_mw, mw->dma_addr);

	return 0;
}

/*
 * Work handler that takes a queue pair link down.
 *
 * A qp that is already down only has its pending link work cancelled.
 * Otherwise the client is notified, the qp is marked down and, while the
 * transport link is still up, the qp link work is scheduled again after
 * NTB_LINK_DOWN_TIMEOUT milliseconds.
 */
static void ntb_qp_link_cleanup(struct work_struct *work)
{
	struct ntb_transport_qp *qp;
	struct ntb_transport *nt;
	struct pci_dev *pdev;

	qp = container_of(work, struct ntb_transport_qp, link_cleanup);
	nt = qp->transport;
	pdev = ntb_query_pdev(nt->ndev);

	if (qp->qp_link == NTB_LINK_DOWN) {
		cancel_delayed_work_sync(&qp->link_work);
		return;
	}

	/* Tell the client before the state changes */
	if (qp->event_handler)
		qp->event_handler(qp->cb_data, NTB_LINK_DOWN);

	dev_info(&pdev->dev, "qp %d: Link Down\n", qp->qp_num);
	qp->qp_link = NTB_LINK_DOWN;

	if (nt->transport_link != NTB_LINK_UP)
		return;

	schedule_delayed_work(&qp->link_work,
			      msecs_to_jiffies(NTB_LINK_DOWN_TIMEOUT));
}

/* Queue the link cleanup work (ntb_qp_link_cleanup()) of @qp */
static void ntb_qp_link_down(struct ntb_transport_qp *qp)
{
	schedule_work(&qp->link_cleanup);
}

/*
 * Work handler that takes the whole transport link down.
 *
 * Marks the transport down (or, when it already is, cancels pending link
 * work), schedules link cleanup for every queue pair whose qp_bitmap bit is
 * clear and zeroes the local scratchpads.
 */
static void ntb_transport_link_cleanup(struct work_struct *work)
{
	struct ntb_transport *nt;
	int qp_idx, spad;

	nt = container_of(work, struct ntb_transport, link_cleanup);

	if (nt->transport_link != NTB_LINK_DOWN)
		nt->transport_link = NTB_LINK_DOWN;
	else
		cancel_delayed_work_sync(&nt->link_work);

	/* Pass along the info to any clients */
	for (qp_idx = 0; qp_idx < nt->max_qps; qp_idx++) {
		if (test_bit(qp_idx, &nt->qp_bitmap))
			continue;

		ntb_qp_link_down(&nt->qps[qp_idx]);
	}

	/* The scratchpad registers keep the values if the remote side
	 * goes down, blast them now to give them a sane value the next
	 * time they are accessed
	 */
	for (spad = 0; spad < MAX_SPAD; spad++)
		ntb_write_local_spad(nt->ndev, spad, 0);
}

/*
 * Hardware event callback: schedules the transport link work on link up and
 * the transport link cleanup on link down.  Any other event is a BUG().
 */
static void ntb_transport_event_callback(void *data, enum ntb_hw_event event)
{
	struct ntb_transport *nt = data;

	if (event == NTB_EVENT_HW_LINK_UP) {
		schedule_delayed_work(&nt->link_work, 0);
		return;
	}

	if (event == NTB_EVENT_HW_LINK_DOWN) {
		schedule_work(&nt->link_cleanup);
		return;
	}

	BUG();
}

/*
 * Delayed work that negotiates the transport link with the remote side.
 *
 * The local version, memory window sizes, queue pair count and qp link bits
 * are written to the remote scratchpads; the values the remote side offers
 * are then read back.  When the version and qp count match and both window
 * sizes are non-zero, the receive buffers are set up, the transport is
 * marked up and the link work of every ready queue pair is scheduled.
 *
 * On any failure or mismatch the work re-arms itself after
 * NTB_LINK_DOWN_TIMEOUT milliseconds for as long as the HW link is up.
 */
static void ntb_transport_link_work(struct work_struct *work)
{
	struct ntb_transport *nt = container_of(work, struct ntb_transport,
						link_work.work);
	struct ntb_device *ndev = nt->ndev;
	struct pci_dev *pdev = ntb_query_pdev(ndev);
	u32 val;
	int rc, i;

	/* send the local info */
	rc = ntb_write_remote_spad(ndev, VERSION, NTB_TRANSPORT_VERSION);
	if (rc) {
		/* report the value that was actually written */
		dev_err(&pdev->dev, "Error writing %x to remote spad %d\n",
			NTB_TRANSPORT_VERSION, VERSION);
		goto out;
	}

	rc = ntb_write_remote_spad(ndev, MW0_SZ, ntb_get_mw_size(ndev, 0));
	if (rc) {
		dev_err(&pdev->dev, "Error writing %x to remote spad %d\n",
			(u32) ntb_get_mw_size(ndev, 0), MW0_SZ);
		goto out;
	}

	rc = ntb_write_remote_spad(ndev, MW1_SZ, ntb_get_mw_size(ndev, 1));
	if (rc) {
		dev_err(&pdev->dev, "Error writing %x to remote spad %d\n",
			(u32) ntb_get_mw_size(ndev, 1), MW1_SZ);
		goto out;
	}

	rc = ntb_write_remote_spad(ndev, NUM_QPS, nt->max_qps);
	if (rc) {
		dev_err(&pdev->dev, "Error writing %x to remote spad %d\n",
			nt->max_qps, NUM_QPS);
		goto out;
	}

	/* forward the qp link bits currently held in the local scratchpad */
	rc = ntb_read_local_spad(nt->ndev, QP_LINKS, &val);
	if (rc) {
		dev_err(&pdev->dev, "Error reading spad %d\n", QP_LINKS);
		goto out;
	}

	rc = ntb_write_remote_spad(ndev, QP_LINKS, val);
	if (rc) {
		dev_err(&pdev->dev, "Error writing %x to remote spad %d\n",
			val, QP_LINKS);
		goto out;
	}

	/* Query the remote side for its info */
	rc = ntb_read_remote_spad(ndev, VERSION, &val);
	if (rc) {
		dev_err(&pdev->dev, "Error reading remote spad %d\n", VERSION);
		goto out;
	}

	if (val != NTB_TRANSPORT_VERSION)
		goto out;
	dev_dbg(&pdev->dev, "Remote version = %d\n", val);

	rc = ntb_read_remote_spad(ndev, NUM_QPS, &val);
	if (rc) {
		dev_err(&pdev->dev, "Error reading remote spad %d\n", NUM_QPS);
		goto out;
	}

	if (val != nt->max_qps)
		goto out;
	dev_dbg(&pdev->dev, "Remote max number of qps = %d\n", val);

	rc = ntb_read_remote_spad(ndev, MW0_SZ, &val);
	if (rc) {
		dev_err(&pdev->dev, "Error reading remote spad %d\n", MW0_SZ);
		goto out;
	}

	/* a zero size means the remote side has not published it yet */
	if (!val)
		goto out;
	dev_dbg(&pdev->dev, "Remote MW0 size = %d\n", val);

	rc = ntb_set_mw(nt, 0, val);
	if (rc)
		goto out;

	rc = ntb_read_remote_spad(ndev, MW1_SZ, &val);
	if (rc) {
		dev_err(&pdev->dev, "Error reading remote spad %d\n", MW1_SZ);
		goto out;
	}

	if (!val)
		goto out;
	dev_dbg(&pdev->dev, "Remote MW1 size = %d\n", val);

	rc = ntb_set_mw(nt, 1, val);
	if (rc)
		goto out;

	nt->transport_link = NTB_LINK_UP;

	for (i = 0; i < nt->max_qps; i++) {
		struct ntb_transport_qp *qp = &nt->qps[i];

		ntb_transport_setup_qp_mw(nt, i);

		if (qp->client_ready == NTB_LINK_UP)
			schedule_delayed_work(&qp->link_work, 0);
	}

	return;

out:
	/* retry later, but only while the HW link is still up */
	if (ntb_hw_link_status(ndev))
		schedule_delayed_work(&nt->link_work,
				      msecs_to_jiffies(NTB_LINK_DOWN_TIMEOUT));
}

static void ntb_qp_link_work(struct work_struct *work)
{
	struct ntb_transport_qp *qp = container_of(work,
						   struct ntb_transport_qp,
						   link_work.work);
	struct pci_dev *pdev = ntb_query_pdev(qp->ndev);
	struct ntb_transport *nt = qp->transport;
	int rc, val;

	WARN_ON(nt->transport_link != NTB_LINK_UP);

	rc = ntb_read_local_spad(nt->ndev, QP_LINKS, &val);
	if (rc) {
		dev_err(&pdev->dev, "Error reading spad %d\n", QP_LINKS);
		return;
	}

	rc = ntb_write_remote_spad(nt->ndev, QP_LINKS, val | 1 << qp->qp_num);
	if (rc)
		dev_err(&pdev->dev, "Error writing %x to remote spad %d\n",
			val | 1 << qp->qp_num, QP_LINKS);

	/* query remote spad for qp ready bits */
	rc = ntb_read_remote_spad(nt->ndev, QP_LINKS, &val);
	if (rc)
		dev_err(&pdev->dev, "Error reading remote spad %d\n", QP_LINKS);

	dev_dbg(&pdev->dev, "Remote QP link status = %x\n", val);

	/* See if the remote side is up */
	if (1 << qp->qp_num & val) {
		qp->qp_link = NTB_LINK_UP;

		dev_info(&pdev->dev, "qp %d: Link Up\n", qp->qp_num);
		if (qp->event_handler)
			qp->event_handler(qp->cb_data, NTB_LINK_UP);
	} else if (nt->transport_link == NTB_LINK_UP)
		schedule_delayed_work(&qp->link_work,
				      msecs_to_jiffies(NTB_LINK_DOWN_TIMEOUT));
}

/*
 * Initialise queue pair @qp_num of transport @nt.
 *
 * Resets the link state, computes the qp's transmit slice of its memory
 * window, creates the debugfs "qp<N>/stats" entry when the transport has a
 * debugfs directory, and initialises the work items, locks and lists.
 */
static void ntb_transport_init_queue(struct ntb_transport *nt,
				     unsigned int qp_num)
{
	struct ntb_transport_qp *qp;
	unsigned int num_qps_mw, tx_size;
	u8 mw_num = QP_TO_MW(qp_num);

	qp = &nt->qps[qp_num];
	qp->qp_num = qp_num;
	qp->transport = nt;
	qp->ndev = nt->ndev;
	qp->qp_link = NTB_LINK_DOWN;
	qp->client_ready = NTB_LINK_DOWN;
	qp->event_handler = NULL;

	/* Windows with a low index take one extra qp when max_qps does not
	 * divide evenly over the windows
	 */
	if (nt->max_qps % NTB_NUM_MW && mw_num < nt->max_qps % NTB_NUM_MW)
		num_qps_mw = nt->max_qps / NTB_NUM_MW + 1;
	else
		num_qps_mw = nt->max_qps / NTB_NUM_MW;

	tx_size = ntb_get_mw_size(qp->ndev, mw_num) / num_qps_mw;
	qp->tx_mw_begin = ntb_get_mw_vbase(nt->ndev, mw_num) +
			  (qp_num / NTB_NUM_MW * tx_size);
	qp->tx_mw_end = qp->tx_mw_begin + tx_size;
	qp->tx_offset = qp->tx_mw_begin;
	qp->tx_max_frame = min(transport_mtu, tx_size);

	if (nt->debugfs_dir) {
		/* "qp" + up to 10 digits + NUL; a 4 byte buffer cut "qp10"
		 * and above down to a name another qp already uses
		 */
		char debugfs_name[16];

		snprintf(debugfs_name, sizeof(debugfs_name), "qp%u", qp_num);
		qp->debugfs_dir = debugfs_create_dir(debugfs_name,
						     nt->debugfs_dir);

		qp->debugfs_stats = debugfs_create_file("stats", S_IRUSR,
							qp->debugfs_dir, qp,
							&ntb_qp_debugfs_stats);
	}

	INIT_DELAYED_WORK(&qp->link_work, ntb_qp_link_work);
	INIT_WORK(&qp->link_cleanup, ntb_qp_link_cleanup);

	spin_lock_init(&qp->ntb_rx_pend_q_lock);
	spin_lock_init(&qp->ntb_rx_free_q_lock);
	spin_lock_init(&qp->ntb_tx_free_q_lock);

	INIT_LIST_HEAD(&qp->rx_pend_q);
	INIT_LIST_HEAD(&qp->rx_free_q);
	INIT_LIST_HEAD(&qp->tx_free_q);
}

/*
 * ntb_transport_init - create the transport for an NTB PCI device
 * @pdev: PCI device the transport is registered against
 *
 * Allocates the transport and its queue pairs, hooks up the HW event
 * callback, adds the transport to the NTB bus and kicks off link negotiation
 * when the HW link is already up.
 *
 * Returns 0 on success or a negative errno; on failure everything set up so
 * far is undone.
 */
int ntb_transport_init(struct pci_dev *pdev)
{
	struct ntb_transport *nt;
	int rc, qp_idx;

	nt = kzalloc(sizeof(struct ntb_transport), GFP_KERNEL);
	if (!nt)
		return -ENOMEM;

	nt->debugfs_dir = NULL;
	if (debugfs_initialized())
		nt->debugfs_dir = debugfs_create_dir(KBUILD_MODNAME, NULL);

	nt->ndev = ntb_register_transport(pdev, nt);
	if (!nt->ndev) {
		rc = -EIO;
		goto err_free_nt;
	}

	nt->max_qps = min(nt->ndev->max_cbs, max_num_clients);

	nt->qps = kcalloc(nt->max_qps, sizeof(struct ntb_transport_qp),
			  GFP_KERNEL);
	if (!nt->qps) {
		rc = -ENOMEM;
		goto err_unregister_transport;
	}

	/* One set bit per queue pair.
	 * NOTE(review): a shift by max_qps == 64 is undefined and qp_bitmap
	 * is only an unsigned long - confirm max_qps is bounded accordingly.
	 */
	nt->qp_bitmap = ((u64) 1 << nt->max_qps) - 1;

	for (qp_idx = 0; qp_idx < nt->max_qps; qp_idx++)
		ntb_transport_init_queue(nt, qp_idx);

	INIT_DELAYED_WORK(&nt->link_work, ntb_transport_link_work);
	INIT_WORK(&nt->link_cleanup, ntb_transport_link_cleanup);

	rc = ntb_register_event_callback(nt->ndev,
					 ntb_transport_event_callback);
	if (rc)
		goto err_free_qps;

	INIT_LIST_HEAD(&nt->client_devs);
	rc = ntb_bus_init(nt);
	if (rc)
		goto err_unregister_callback;

	/* Link already up: start negotiating right away */
	if (ntb_hw_link_status(nt->ndev))
		schedule_delayed_work(&nt->link_work, 0);

	return 0;

err_unregister_callback:
	ntb_unregister_event_callback(nt->ndev);
err_free_qps:
	kfree(nt->qps);
err_unregister_transport:
	ntb_unregister_transport(nt->ndev);
err_free_nt:
	debugfs_remove_recursive(nt->debugfs_dir);
	kfree(nt);
	return rc;
}

/*
 * ntb_transport_free - tear down a transport
 * @transport: the struct ntb_transport to destroy, passed as void *
 *
 * Frees the queue pairs whose qp_bitmap bit is clear, removes the transport
 * from the NTB bus, cancels the link work, removes the debugfs entries,
 * unregisters the HW event callback, releases the memory window buffers and
 * finally frees the transport itself.
 *
 * NOTE(review): nt->link_cleanup is never cancelled here, and the event
 * callback (which can schedule both work items) is unregistered only after
 * link_work has been cancelled - confirm neither can still run once nt is
 * freed.
 */
void ntb_transport_free(void *transport)
{
	struct ntb_transport *nt = transport;
	struct pci_dev *pdev;
	int i;

	/* the qp link work items only re-arm while this is NTB_LINK_UP */
	nt->transport_link = NTB_LINK_DOWN;

	/* verify that all the qp's are freed */
	for (i = 0; i < nt->max_qps; i++)
		if (!test_bit(i, &nt->qp_bitmap))
			ntb_transport_free_queue(&nt->qps[i]);

	ntb_bus_remove(nt);

	cancel_delayed_work_sync(&nt->link_work);

	debugfs_remove_recursive(nt->debugfs_dir);

	ntb_unregister_event_callback(nt->ndev);

	pdev = ntb_query_pdev(nt->ndev);

	/* windows that were never set up have a NULL virt_addr */
	for (i = 0; i < NTB_NUM_MW; i++)
		if (nt->mw[i].virt_addr)
			dma_free_coherent(&pdev->dev, nt->mw[i].size,
					  nt->mw[i].virt_addr,
					  nt->mw[i].dma_addr);

	kfree(nt->qps);
	ntb_unregister_transport(nt->ndev);
	kfree(nt);
}

/*
 * Copy one received frame out of the receive buffer into a client buffer.
 *
 * @qp:     queue pair the frame arrived on
 * @entry:  client buffer to fill; its len is overwritten with the frame's
 *          payload length
 * @offset: start of the frame slot inside the qp's receive region
 *
 * After the copy the frame's header flags are cleared, the client's rx
 * handler is called when the client is ready, and @entry is put on the rx
 * free list.
 */
static void ntb_rx_copy_task(struct ntb_transport_qp *qp,
			     struct ntb_queue_entry *entry, void *offset)
{

	struct ntb_payload_header *hdr;

	/* the slot must start inside the region and end before rx_buff_end */
	BUG_ON(offset < qp->rx_buff_begin ||
	       offset + qp->rx_max_frame >= qp->rx_buff_end);

	/* the header sits in the last bytes of the frame slot */
	hdr = offset + qp->rx_max_frame - sizeof(struct ntb_payload_header);
	entry->len = hdr->len;

	memcpy(entry->buf, offset, entry->len);

	/* Ensure that the data is fully copied out before clearing the flag */
	wmb();
	hdr->flags = 0;

	if (qp->rx_handler && qp->client_ready == NTB_LINK_UP)
		qp->rx_handler(qp, qp->cb_data, entry->cb_data, entry->len);

	ntb_list_add(&qp->ntb_rx_free_q_lock, &entry->entry, &qp->rx_free_q);
}

static int ntb_process_rxc(struct ntb_transport_qp *qp)
{
	struct ntb_payload_header *hdr;
	struct ntb_queue_entry *entry;
	void *offset;

	entry = ntb_list_rm(&qp->ntb_rx_pend_q_lock, &qp->rx_pend_q);
	if (!entry) {
		hdr = offset + qp->rx_max_frame -
		      sizeof(struct ntb_payload_header);
		dev_dbg(&ntb_query_pdev(qp->ndev)->dev,
			"no buffer - HDR ver %llu, len %d, flags %x\n",
			hdr->ver, hdr->len, hdr->flags);
		qp->rx_err_no_buf++;
		return -ENOMEM;
	}

	offset = qp->rx_offset;
	hdr = offset + qp->rx_max_frame - sizeof(struct ntb_payload_header);

	if (!(hdr->flags & DESC_DONE_FLAG)) {
		ntb_list_add(&qp->ntb_rx_pend_q_lock, &entry->entry,
				  &qp->rx_pend_q);
		qp->rx_ring_empty++;
		return -EAGAIN;
	}

	if (hdr->ver != qp->rx_pkts) {
		dev_dbg(&ntb_query_pdev(qp->ndev)->dev,
			"qp %d: version mismatch, expected %llu - got %llu\n",
			qp->qp_num, qp->rx_pkts, hdr->ver);
		ntb_list_add(&qp->ntb_rx_pend_q_lock, &entry->entry,
				  &qp->rx_pend_q);
		qp->rx_err_ver++;
		return -EIO;
	}

	if (hdr->flags & LINK_DOWN_FLAG) {
		ntb_qp_link_down(qp);

		ntb_list_add(&qp->ntb_rx_pend_q_lock, &entry->entry,
				  &qp->rx_pend_q);

		/* Ensure that the data is fully copied out before clearing the
		 * done flag
		 */
		wmb();
		hdr->flags = 0;
		goto out;
	}

	dev_dbg(&ntb_query_pdev(qp->ndev)->dev,
		"rx offset %p, ver %llu - %d payload received, buf size %d\n",
		qp->rx_offset, hdr->ver, hdr->len, entry->len);

	if (hdr->len <= entry->len)
		ntb_rx_copy_task(qp, entry, offset);
	else {
		ntb_list_add(&qp->ntb_rx_pend_q_lock, &entry->entry,
				  &qp->rx_pend_q);

		/* Ensure that the data is fully copied out before clearing the
		 * done flag
		 */
		wmb();
		hdr->flags = 0;
		qp->rx_err_oflow++;
		dev_dbg(&ntb_query_pdev(qp->ndev)->dev,
			"RX overflow! Wanted %d got %d\n",
			hdr->len, entry->len);
	}

	qp->rx_bytes += hdr->len;
	qp->rx_pkts++;

out:
	qp->rx_offset += qp->rx_max_frame;
	if (qp->rx_offset + qp->rx_max_frame >= qp->rx_buff_end)
		qp->rx_offset = qp->rx_buff_begin;

	return 0;
}

/*
 * Tasklet body for a queue's receive path.  @data carries the
 * struct ntb_transport_qp pointer cast to unsigned long.  Ring slots are
 * processed one after another until ntb_process_rxc() reports a non-zero
 * status.
 */
static void ntb_transport_rx(unsigned long data)
{
	struct ntb_transport_qp *qp = (struct ntb_transport_qp *)data;

	while (ntb_process_rxc(qp) == 0)
		;
}

static void ntb_transport_rxc_db(void *data, int db_num)
{
	struct ntb_transport_qp *qp = data;

	dev_dbg(&ntb_query_pdev(qp->ndev)->dev, "%s: doorbell %d received\n",
		__func__, db_num);

	tasklet_schedule(&qp->rx_work);
}

/*
 * Write the payload of @entry into the tx slot at @offset, fill in the
 * payload header at the end of the slot, ring the queue's doorbell and give
 * @entry back to the tx free list.
 *
 * The header flags are written last, behind a write barrier, and always
 * include DESC_DONE_FLAG.
 */
static void ntb_tx_copy_task(struct ntb_transport_qp *qp,
			     struct ntb_queue_entry *entry,
			     void *offset)
{
	void *slot_end = offset + qp->tx_max_frame;
	struct ntb_payload_header *hdr;

	/* The slot must lie inside the transmit memory window */
	BUG_ON(offset < qp->tx_mw_begin || slot_end >= qp->tx_mw_end);

	memcpy_toio(offset, entry->buf, entry->len);

	hdr = slot_end - sizeof(*hdr);
	hdr->len = entry->len;
	hdr->ver = qp->tx_pkts;

	/* Payload and header fields must be written before the done flag */
	wmb();
	hdr->flags = entry->flags | DESC_DONE_FLAG;

	ntb_ring_sdb(qp->ndev, qp->qp_num);

	/* Zero-length entries are control messages such as "link down".
	 * They carry no payload, so there is nothing to account for and no
	 * completion to report.
	 */
	if (entry->len > 0) {
		qp->tx_bytes += entry->len;

		if (qp->tx_handler)
			qp->tx_handler(qp, qp->cb_data, entry->cb_data,
				       entry->len);
	}

	ntb_list_add(&qp->ntb_tx_free_q_lock, &entry->entry, &qp->tx_free_q);
}

/*
 * Try to transmit @entry in the tx slot at qp->tx_offset.
 *
 * Returns 0 when @entry has been consumed: either it was sent and the ring
 * advanced, or it was larger than the maximum payload, in which case the tx
 * handler (if any) is told -EIO and the entry is recycled to the free list.
 * Returns -EAGAIN when the slot's header flags are still set; the entry is
 * then left untouched and remains owned by the caller.
 */
static int ntb_process_tx(struct ntb_transport_qp *qp,
			  struct ntb_queue_entry *entry)
{
	struct ntb_payload_header *hdr;
	void *offset;

	offset = qp->tx_offset;
	hdr = offset + qp->tx_max_frame - sizeof(struct ntb_payload_header);

	dev_dbg(&ntb_query_pdev(qp->ndev)->dev, "%lld - offset %p, tx %p, entry len %d flags %x buff %p\n",
		 qp->tx_pkts, offset, qp->tx_offset, entry->len, entry->flags,
		 entry->buf);

	/* Non-zero flags mean the slot has not been released yet */
	if (hdr->flags) {
		qp->tx_ring_full++;
		return -EAGAIN;
	}

	if (entry->len > qp->tx_max_frame - sizeof(struct ntb_payload_header)) {
		/* Same argument order as the completion call in
		 * ntb_tx_copy_task(): queue first, then the queue's cb_data.
		 */
		if (qp->tx_handler)
			qp->tx_handler(qp, qp->cb_data, NULL, -EIO);

		ntb_list_add(&qp->ntb_tx_free_q_lock, &entry->entry,
			     &qp->tx_free_q);
		return 0;
	}

	ntb_tx_copy_task(qp, entry, offset);

	/* Advance to the next slot, wrapping at the end of the window */
	qp->tx_offset += qp->tx_max_frame;
	if (qp->tx_offset + qp->tx_max_frame >= qp->tx_mw_end)
		qp->tx_offset = qp->tx_mw_begin;

	qp->tx_pkts++;

	return 0;
}

/*
 * Mark the queue link down locally and send a zero-length LINK_DOWN_FLAG
 * message through the tx ring.
 *
 * Does nothing if the queue link is already down.  A free tx entry is polled
 * for up to NTB_LINK_DOWN_TIMEOUT times, sleeping 100ms between attempts; if
 * none shows up the message is silently not sent.
 */
static void ntb_send_link_down(struct ntb_transport_qp *qp)
{
	struct pci_dev *pdev = ntb_query_pdev(qp->ndev);
	struct ntb_queue_entry *entry = NULL;
	int i, rc;

	if (qp->qp_link == NTB_LINK_DOWN)
		return;

	qp->qp_link = NTB_LINK_DOWN;
	dev_info(&pdev->dev, "qp %d: Link Down\n", qp->qp_num);

	for (i = 0; i < NTB_LINK_DOWN_TIMEOUT; i++) {
		entry = ntb_list_rm(&qp->ntb_tx_free_q_lock,
					 &qp->tx_free_q);
		if (entry)
			break;
		msleep(100);
	}

	if (!entry)
		return;

	/* No payload: only the flag travels to the peer */
	entry->cb_data = NULL;
	entry->buf = NULL;
	entry->len = 0;
	entry->flags = LINK_DOWN_FLAG;

	rc = ntb_process_tx(qp, entry);
	if (rc) {
		/* On failure the entry is still ours; put it back on the
		 * free list so it is not leaked.
		 */
		ntb_list_add(&qp->ntb_tx_free_q_lock, &entry->entry,
			     &qp->tx_free_q);
		dev_err(&pdev->dev, "ntb: QP%d unable to send linkdown msg\n",
			qp->qp_num);
	}
}

/**
 * ntb_transport_create_queue - Create a new NTB transport layer queue
 * @rx_handler: receive callback function
 * @tx_handler: transmit callback function
 * @event_handler: event callback function
 *
 * Create a new NTB transport layer queue and provide the queue with a callback
 * routine for both transmit and receive.  The receive callback routine will be
 * used to pass up data when the transport has received it on the queue.   The
 * transmit callback routine will be called when the transport has completed the
 * transmission of the data on the queue and the data is ready to be freed.
 *
 * RETURNS: pointer to newly created ntb_queue, NULL on error.
 */
struct ntb_transport_qp *
ntb_transport_create_queue(void *data, struct pci_dev *pdev,
			   const struct ntb_queue_handlers *handlers)
{
	struct ntb_queue_entry *entry;
	struct ntb_transport_qp *qp;
	struct ntb_transport *nt;
	unsigned int free_queue;
	int rc, i;

	nt = ntb_find_transport(pdev);
	if (!nt)
		goto err;

	free_queue = ffs(nt->qp_bitmap);
	if (!free_queue)
		goto err;

	/* decrement free_queue to make it zero based */
	free_queue--;

	clear_bit(free_queue, &nt->qp_bitmap);

	qp = &nt->qps[free_queue];
	qp->cb_data = data;
	qp->rx_handler = handlers->rx_handler;
	qp->tx_handler = handlers->tx_handler;
	qp->event_handler = handlers->event_handler;

	for (i = 0; i < NTB_QP_DEF_NUM_ENTRIES; i++) {
		entry = kzalloc(sizeof(struct ntb_queue_entry), GFP_ATOMIC);
		if (!entry)
			goto err1;

		ntb_list_add(&qp->ntb_rx_free_q_lock, &entry->entry,
				  &qp->rx_free_q);
	}

	for (i = 0; i < NTB_QP_DEF_NUM_ENTRIES; i++) {
		entry = kzalloc(sizeof(struct ntb_queue_entry), GFP_ATOMIC);
		if (!entry)
			goto err2;

		ntb_list_add(&qp->ntb_tx_free_q_lock, &entry->entry,
				  &qp->tx_free_q);
	}

	tasklet_init(&qp->rx_work, ntb_transport_rx, (unsigned long) qp);

	rc = ntb_register_db_callback(qp->ndev, free_queue, qp,
				      ntb_transport_rxc_db);
	if (rc)
		goto err3;

	dev_info(&pdev->dev, "NTB Transport QP %d created\n", qp->qp_num);

	return qp;

err3:
	tasklet_disable(&qp->rx_work);
err2:
	while ((entry =
		ntb_list_rm(&qp->ntb_tx_free_q_lock, &qp->tx_free_q)))
		kfree(entry);
err1:
	while ((entry =
		ntb_list_rm(&qp->ntb_rx_free_q_lock, &qp->rx_free_q)))
		kfree(entry);
	set_bit(free_queue, &nt->qp_bitmap);
err:
	return NULL;
}
EXPORT_SYMBOL_GPL(ntb_transport_create_queue);

/**
 * ntb_transport_free_queue - Frees NTB transport queue
 * @qp: NTB queue to be freed, may be NULL
 *
 * Stops the queue's link work, unregisters its doorbell callback, disables
 * the rx tasklet, frees every entry on the rx free, rx pending and tx free
 * lists, and marks the queue as available again in the transport's bitmap.
 * Entries still found on the rx pending list trigger a warning.  Passing a
 * NULL @qp is a no-op.
 */
void ntb_transport_free_queue(struct ntb_transport_qp *qp)
{
	struct pci_dev *pdev;
	struct ntb_queue_entry *entry;

	if (!qp)
		return;

	/* Only touch qp once it is known to be non-NULL */
	pdev = ntb_query_pdev(qp->ndev);

	cancel_delayed_work_sync(&qp->link_work);

	ntb_unregister_db_callback(qp->ndev, qp->qp_num);
	tasklet_disable(&qp->rx_work);

	while ((entry =
		ntb_list_rm(&qp->ntb_rx_free_q_lock, &qp->rx_free_q)))
		kfree(entry);

	while ((entry =
		ntb_list_rm(&qp->ntb_rx_pend_q_lock, &qp->rx_pend_q))) {
		dev_warn(&pdev->dev, "Freeing item from a non-empty queue\n");
		kfree(entry);
	}

	while ((entry =
		ntb_list_rm(&qp->ntb_tx_free_q_lock, &qp->tx_free_q)))
		kfree(entry);

	/* A set bit marks the queue as free for ntb_transport_create_queue() */
	set_bit(qp->qp_num, &qp->transport->qp_bitmap);

	dev_info(&pdev->dev, "NTB Transport QP %d freed\n", qp->qp_num);
}
EXPORT_SYMBOL_GPL(ntb_transport_free_queue);

/**
 * ntb_transport_rx_remove - Dequeues enqueued rx packet
 * @qp: NTB queue to dequeue from
 * @len: pointer to variable that receives the length of the dequeued buffer
 *
 * Takes one entry off the pending receive list, reports its length through
 * @len and recycles the entry onto the rx free list.  Refused while the
 * client is still marked ready, so it should only be used during shutdown
 * of the qp.
 *
 * RETURNS: the per buffer callback pointer of the dequeued entry, or NULL if
 * @qp is NULL, the client is still ready, or nothing is pending.
 */
void *ntb_transport_rx_remove(struct ntb_transport_qp *qp, unsigned int *len)
{
	struct ntb_queue_entry *pending;
	void *cb;

	if (!qp)
		return NULL;
	if (qp->client_ready == NTB_LINK_UP)
		return NULL;

	pending = ntb_list_rm(&qp->ntb_rx_pend_q_lock, &qp->rx_pend_q);
	if (!pending)
		return NULL;

	/* Read what the caller needs before the entry is recycled */
	cb = pending->cb_data;
	*len = pending->len;

	ntb_list_add(&qp->ntb_rx_free_q_lock, &pending->entry,
		     &qp->rx_free_q);

	return cb;
}
EXPORT_SYMBOL_GPL(ntb_transport_rx_remove);

/**
 * ntb_transport_rx_enqueue - Enqueue a new NTB queue entry
 * @qp: NTB transport layer queue the entry is to be enqueued on
 * @cb: per buffer pointer for callback function to use
 * @data: pointer to data buffer that incoming packets will be copied into
 * @len: length of the data buffer
 *
 * Post a receive buffer on the transport queue.  A free rx entry is filled
 * in with @cb, @data and @len and moved to the pending receive list.
 *
 * RETURNS: zero for success, -EINVAL if @qp is NULL, or -ENOMEM if no free
 * rx entry is available.
 */
int ntb_transport_rx_enqueue(struct ntb_transport_qp *qp, void *cb, void *data,
			     unsigned int len)
{
	struct ntb_queue_entry *slot;

	if (!qp)
		return -EINVAL;

	slot = ntb_list_rm(&qp->ntb_rx_free_q_lock, &qp->rx_free_q);
	if (!slot)
		return -ENOMEM;

	slot->cb_data = cb;
	slot->buf = data;
	slot->len = len;

	ntb_list_add(&qp->ntb_rx_pend_q_lock, &slot->entry, &qp->rx_pend_q);

	return 0;
}
EXPORT_SYMBOL_GPL(ntb_transport_rx_enqueue);

/**
 * ntb_transport_tx_enqueue - Enqueue a new NTB queue entry
 * @qp: NTB transport layer queue the entry is to be enqueued on
 * @cb: per buffer pointer for callback function to use
 * @data: pointer to data buffer that will be sent
 * @len: length of the data buffer
 *
 * Take a free tx entry, describe the buffer to send with it and hand it to
 * the transmit path right away.  This assumes that a lock is being held to
 * serialize access to the qp.
 *
 * RETURNS: zero for success, -EINVAL if @qp is NULL, its link is not up or
 * @len is zero, -ENOMEM if no free tx entry is available, or the error
 * returned by the transmit path.
 */
int ntb_transport_tx_enqueue(struct ntb_transport_qp *qp, void *cb, void *data,
			     unsigned int len)
{
	struct ntb_queue_entry *slot;
	int rc;

	if (!qp)
		return -EINVAL;
	if (qp->qp_link != NTB_LINK_UP)
		return -EINVAL;
	if (!len)
		return -EINVAL;

	slot = ntb_list_rm(&qp->ntb_tx_free_q_lock, &qp->tx_free_q);
	if (!slot)
		return -ENOMEM;

	slot->cb_data = cb;
	slot->buf = data;
	slot->len = len;
	slot->flags = 0;

	rc = ntb_process_tx(qp, slot);
	if (!rc)
		return 0;

	/* Transmission did not happen; recycle the unused entry */
	ntb_list_add(&qp->ntb_tx_free_q_lock, &slot->entry, &qp->tx_free_q);

	return rc;
}
EXPORT_SYMBOL_GPL(ntb_transport_tx_enqueue);

/**
 * ntb_transport_link_up - Notify NTB transport of client readiness to use queue
 * @qp: NTB transport layer queue to be enabled, may be NULL
 *
 * Record that the client is ready to use the queue.  If the transport link
 * is already up, the queue's link work is scheduled to run immediately.
 */
void ntb_transport_link_up(struct ntb_transport_qp *qp)
{
	if (!qp)
		return;

	qp->client_ready = NTB_LINK_UP;

	/* Nothing to kick off while the transport itself is still down */
	if (qp->transport->transport_link != NTB_LINK_UP)
		return;

	schedule_delayed_work(&qp->link_work, 0);
}
EXPORT_SYMBOL_GPL(ntb_transport_link_up);

/**
 * ntb_transport_link_down - Notify NTB transport to no longer enqueue data
 * @qp: NTB transport layer queue to be disabled, may be NULL
 *
 * Notify NTB transport layer of client's desire to no longer receive data on
 * transport queue specified.  It is the client's responsibility to ensure all
 * entries on queue are purged or otherwise handled appropriately.
 *
 * The client is marked not ready, the QP_LINKS scratchpad value with this
 * queue's bit cleared is written to the remote side, and then either a link
 * down message is sent (queue link up) or pending link work is cancelled
 * (queue link not up).  If the scratchpad cannot be read the function stops
 * after marking the client not ready.
 */
void ntb_transport_link_down(struct ntb_transport_qp *qp)
{
	struct pci_dev *pdev;
	int rc, val;

	if (!qp)
		return;

	/* Only touch qp once it is known to be non-NULL */
	pdev = ntb_query_pdev(qp->ndev);

	qp->client_ready = NTB_LINK_DOWN;

	rc = ntb_read_local_spad(qp->ndev, QP_LINKS, &val);
	if (rc) {
		dev_err(&pdev->dev, "Error reading spad %d\n", QP_LINKS);
		return;
	}

	/* Clear only this queue's bit in the per-queue link mask */
	rc = ntb_write_remote_spad(qp->ndev, QP_LINKS,
				   val & ~(1 << qp->qp_num));
	if (rc)
		dev_err(&pdev->dev, "Error writing %x to remote spad %d\n",
			val & ~(1 << qp->qp_num), QP_LINKS);

	if (qp->qp_link == NTB_LINK_UP)
		ntb_send_link_down(qp);
	else
		cancel_delayed_work_sync(&qp->link_work);
}
EXPORT_SYMBOL_GPL(ntb_transport_link_down);

/**
 * ntb_transport_link_query - Query transport link state
 * @qp: NTB transport layer queue to be queried
 *
 * Query connectivity to the remote system of the NTB transport queue
 *
 * RETURNS: true for link up or false for link down
 */
bool ntb_transport_link_query(struct ntb_transport_qp *qp)
{
	return qp->qp_link == NTB_LINK_UP;
}
EXPORT_SYMBOL_GPL(ntb_transport_link_query);

/**
 * ntb_transport_qp_num - Query the qp number
 * @qp: NTB transport layer queue to be queried, may be NULL
 *
 * Query qp number of the NTB transport queue
 *
 * RETURNS: a zero based number specifying the qp number.  Zero is also
 * returned for a NULL @qp, so callers that may hold a NULL queue must check
 * for that themselves.
 */
unsigned char ntb_transport_qp_num(struct ntb_transport_qp *qp)
{
	/* Tolerate NULL like the other exported queue entry points do */
	if (!qp)
		return 0;

	return qp->qp_num;
}
EXPORT_SYMBOL_GPL(ntb_transport_qp_num);

/**
 * ntb_transport_max_size - Query the max payload size of a qp
 * @qp: NTB transport layer queue to be queried, may be NULL
 *
 * Query the maximum payload size permissible on the given qp.  This is the
 * tx frame size minus the payload header that occupies the end of every
 * frame.
 *
 * RETURNS: the max payload size of a qp, or zero for a NULL @qp
 */
unsigned int ntb_transport_max_size(struct ntb_transport_qp *qp)
{
	/* Tolerate NULL like the other exported queue entry points do */
	if (!qp)
		return 0;

	return qp->tx_max_frame - sizeof(struct ntb_payload_header);
}
EXPORT_SYMBOL_GPL(ntb_transport_max_size);