media: dvb: mpq: Use kthread instead of workqueue

demux used single-threaded workqueue to process TS packets
notified from the HW. The workqueue implementation was changed
so that all work scheduled to workqueues is submitted
to the same kworker threads; as a result, TS packet
processing was no longer done on the demux's own thread and had to
compete with other work scheduled by other drivers. Moved to a
separate thread dedicated only to the demux.

Change-Id: Ia8b96543f26428a0a12809d34c27849f900cc45e
Signed-off-by: Hamad Kadmany <hkadmany@codeaurora.org>
Signed-off-by: Neha Pandey <nehap@codeaurora.org>
This commit is contained in:
Hamad Kadmany 2012-11-27 17:51:01 +02:00 committed by Stephen Boyd
parent 3b2854c970
commit 4b868401f2
4 changed files with 228 additions and 216 deletions

View file

@ -123,17 +123,15 @@ only one demux device may process it at a time.
Background Processing
---------------------
When demux receives notifications from underlying HW drivers about new
data, it schedules work to a single-threaded workqueue to process the
notification.
Demux allocates a kernel thread for each live input to process
the TS packets notified from the HW for that specific input. There
are two such inputs (TSIF0 and TSIF1), both of which can be
processed in parallel by two separate threads.
The processing is the action of demuxing of the new data; it may sleep
as it locks against the demux data-structure that may be accessed by
user-space in the meanwhile.
A single threaded workqueue exists for each live input (TSIF0 or TSIF1)
to process the inputs in parallel.
Dependencies
------------
The demux driver depends on the following kernel drivers and subsystems:

View file

@ -30,7 +30,7 @@
/**
* TSIF alias name length
*/
#define TSIF_NAME_LENGTH 10
#define TSIF_NAME_LENGTH 20
#define MPQ_MAX_FOUND_PATTERNS 5

View file

@ -13,7 +13,7 @@
#include <linux/init.h>
#include <linux/module.h>
#include <linux/tsif_api.h>
#include <linux/workqueue.h>
#include <linux/kthread.h>
#include <linux/moduleparam.h>
#include "mpq_dvb_debug.h"
#include "mpq_dmx_plugin_common.h"
@ -38,17 +38,8 @@ static int threshold = DMX_TSIF_PACKETS_IN_CHUNK_DEF;
static int tsif_mode = DMX_TSIF_DRIVER_MODE_DEF;
static int clock_inv;
module_param(threshold, int, S_IRUGO);
module_param(tsif_mode, int, S_IRUGO);
module_param(clock_inv, int, S_IRUGO);
/*
* Work scheduled each time TSIF notifies dmx
* of new TS packet
*/
struct tsif_work {
struct work_struct work;
int tsif_id;
};
module_param(tsif_mode, int, S_IRUGO | S_IWUSR);
module_param(clock_inv, int, S_IRUGO | S_IWUSR);
/*
@ -78,11 +69,12 @@ static struct
{
/* Information for each TSIF input processing */
struct {
/* work used to submit to workqueue for processing */
struct tsif_work work;
/* thread processing TS packets from TSIF */
struct task_struct *thread;
wait_queue_head_t wait_queue;
/* workqueue that processes TS packets from specific TSIF */
struct workqueue_struct *workqueue;
/* Counter for data notifications from TSIF */
atomic_t data_cnt;
/* TSIF alias */
char name[TSIF_NAME_LENGTH];
@ -103,94 +95,72 @@ static struct
/**
* Worker function that processes the TS packets notified by the TSIF driver.
* Demux thread function handling data from specific TSIF.
*
* @worker: the executed work
* @arg: TSIF number
*/
static void mpq_dmx_tsif_work(struct work_struct *worker)
static int mpq_dmx_tsif_thread(void *arg)
{
struct tsif_work *tsif_work =
container_of(worker, struct tsif_work, work);
struct mpq_demux *mpq_demux;
struct tsif_driver_info *tsif_driver;
size_t packets = 0;
int tsif = tsif_work->tsif_id;
int tsif = (int)arg;
int ret;
mpq_demux = mpq_dmx_tsif_info.tsif[tsif].mpq_demux;
tsif_driver = &(mpq_dmx_tsif_info.tsif[tsif].tsif_driver);
do {
ret = wait_event_interruptible(
mpq_dmx_tsif_info.tsif[tsif].wait_queue,
(atomic_read(
&mpq_dmx_tsif_info.tsif[tsif].data_cnt) != 0) ||
kthread_should_stop());
MPQ_DVB_DBG_PRINT(
"%s executed, tsif = %d\n",
__func__,
tsif);
if ((ret < 0) || kthread_should_stop()) {
MPQ_DVB_DBG_PRINT("%s: exit\n", __func__);
break;
}
if (mutex_lock_interruptible(&mpq_dmx_tsif_info.tsif[tsif].mutex))
return;
if (mutex_lock_interruptible(
&mpq_dmx_tsif_info.tsif[tsif].mutex))
return -ERESTARTSYS;
/* Check if driver handler is still valid */
if (tsif_driver->tsif_handler == NULL) {
mutex_unlock(&mpq_dmx_tsif_info.tsif[tsif].mutex);
MPQ_DVB_ERR_PRINT("%s: tsif_driver->tsif_handler is NULL!\n",
tsif_driver = &(mpq_dmx_tsif_info.tsif[tsif].tsif_driver);
mpq_demux = mpq_dmx_tsif_info.tsif[tsif].mpq_demux;
/* Check if driver handler is still valid */
if (tsif_driver->tsif_handler == NULL) {
mutex_unlock(&mpq_dmx_tsif_info.tsif[tsif].mutex);
MPQ_DVB_DBG_PRINT(
"%s: tsif was detached\n",
__func__);
return;
}
continue;
}
tsif_get_state(tsif_driver->tsif_handler, &(tsif_driver->ri),
&(tsif_driver->wi), &(tsif_driver->state));
tsif_get_state(
tsif_driver->tsif_handler, &(tsif_driver->ri),
&(tsif_driver->wi), &(tsif_driver->state));
if ((tsif_driver->wi == tsif_driver->ri) ||
(tsif_driver->state == tsif_state_stopped) ||
(tsif_driver->state == tsif_state_error)) {
if ((tsif_driver->wi == tsif_driver->ri) ||
(tsif_driver->state == tsif_state_stopped) ||
(tsif_driver->state == tsif_state_error)) {
mpq_demux->hw_notification_size = 0;
mpq_demux->hw_notification_size = 0;
mutex_unlock(&mpq_dmx_tsif_info.tsif[tsif].mutex);
mutex_unlock(&mpq_dmx_tsif_info.tsif[tsif].mutex);
MPQ_DVB_ERR_PRINT(
"%s: invalid TSIF state (%d), wi = (%d), ri = (%d)\n",
__func__,
tsif_driver->state, tsif_driver->wi, tsif_driver->ri);
return;
}
MPQ_DVB_DBG_PRINT(
"%s: TSIF invalid state %d, %d, %d\n",
__func__,
tsif_driver->state,
tsif_driver->wi,
tsif_driver->ri);
continue;
}
if (tsif_driver->wi > tsif_driver->ri) {
packets = (tsif_driver->wi - tsif_driver->ri);
mpq_demux->hw_notification_size = packets;
atomic_dec(&mpq_dmx_tsif_info.tsif[tsif].data_cnt);
dvb_dmx_swfilter_format(
&mpq_demux->demux,
(tsif_driver->data_buffer +
(tsif_driver->ri * TSIF_PKT_SIZE)),
(packets * TSIF_PKT_SIZE),
DMX_TSP_FORMAT_192_TAIL);
tsif_driver->ri =
(tsif_driver->ri + packets) % tsif_driver->buffer_size;
tsif_reclaim_packets(tsif_driver->tsif_handler,
tsif_driver->ri);
} else {
/*
* wi < ri, means wraparound on cyclic buffer.
* Handle in two stages.
*/
packets = (tsif_driver->buffer_size - tsif_driver->ri);
mpq_demux->hw_notification_size = packets;
dvb_dmx_swfilter_format(
&mpq_demux->demux,
(tsif_driver->data_buffer +
(tsif_driver->ri * TSIF_PKT_SIZE)),
(packets * TSIF_PKT_SIZE),
DMX_TSP_FORMAT_192_TAIL);
/* tsif_driver->ri should be 0 after this */
tsif_driver->ri =
(tsif_driver->ri + packets) % tsif_driver->buffer_size;
packets = tsif_driver->wi;
if (packets > 0) {
mpq_demux->hw_notification_size += packets;
if (tsif_driver->wi > tsif_driver->ri) {
packets = (tsif_driver->wi - tsif_driver->ri);
mpq_demux->hw_notification_size = packets;
dvb_dmx_swfilter_format(
&mpq_demux->demux,
@ -202,13 +172,55 @@ static void mpq_dmx_tsif_work(struct work_struct *worker)
tsif_driver->ri =
(tsif_driver->ri + packets) %
tsif_driver->buffer_size;
tsif_reclaim_packets(
tsif_driver->tsif_handler,
tsif_driver->ri);
} else {
/*
* wi < ri, means wraparound on cyclic buffer.
* Handle in two stages.
*/
packets = (tsif_driver->buffer_size - tsif_driver->ri);
mpq_demux->hw_notification_size = packets;
dvb_dmx_swfilter_format(
&mpq_demux->demux,
(tsif_driver->data_buffer +
(tsif_driver->ri * TSIF_PKT_SIZE)),
(packets * TSIF_PKT_SIZE),
DMX_TSP_FORMAT_192_TAIL);
/* tsif_driver->ri should be 0 after this */
tsif_driver->ri =
(tsif_driver->ri + packets) %
tsif_driver->buffer_size;
packets = tsif_driver->wi;
if (packets > 0) {
mpq_demux->hw_notification_size += packets;
dvb_dmx_swfilter_format(
&mpq_demux->demux,
(tsif_driver->data_buffer +
(tsif_driver->ri * TSIF_PKT_SIZE)),
(packets * TSIF_PKT_SIZE),
DMX_TSP_FORMAT_192_TAIL);
tsif_driver->ri =
(tsif_driver->ri + packets) %
tsif_driver->buffer_size;
}
tsif_reclaim_packets(
tsif_driver->tsif_handler,
tsif_driver->ri);
}
tsif_reclaim_packets(tsif_driver->tsif_handler,
tsif_driver->ri);
}
mutex_unlock(&mpq_dmx_tsif_info.tsif[tsif].mutex);
} while (1);
mutex_unlock(&mpq_dmx_tsif_info.tsif[tsif].mutex);
return 0;
}
@ -220,7 +232,6 @@ static void mpq_dmx_tsif_work(struct work_struct *worker)
static void mpq_tsif_callback(void *user)
{
int tsif = (int)user;
struct work_struct *work;
struct mpq_demux *mpq_demux;
MPQ_DVB_DBG_PRINT("%s executed, tsif = %d\n", __func__, tsif);
@ -229,11 +240,8 @@ static void mpq_tsif_callback(void *user)
mpq_demux = mpq_dmx_tsif_info.tsif[tsif].mpq_demux;
mpq_dmx_update_hw_statistics(mpq_demux);
work = &mpq_dmx_tsif_info.tsif[tsif].work.work;
/* Scheudle a new work to demux workqueue */
if (!work_pending(work))
queue_work(mpq_dmx_tsif_info.tsif[tsif].workqueue, work);
atomic_inc(&mpq_dmx_tsif_info.tsif[tsif].data_cnt);
wake_up(&mpq_dmx_tsif_info.tsif[tsif].wait_queue);
}
@ -376,20 +384,10 @@ static int mpq_tsif_dmx_stop(struct mpq_demux *mpq_demux)
tsif_driver = &(mpq_dmx_tsif_info.tsif[tsif].tsif_driver);
tsif_stop(tsif_driver->tsif_handler);
tsif_detach(tsif_driver->tsif_handler);
/*
* temporarily release mutex and flush the work queue
* before setting tsif_handler to NULL
*/
mutex_unlock(&mpq_dmx_tsif_info.tsif[tsif].mutex);
flush_workqueue(mpq_dmx_tsif_info.tsif[tsif].workqueue);
/* re-acquire mutex */
if (mutex_lock_interruptible(
&mpq_dmx_tsif_info.tsif[tsif].mutex))
return -ERESTARTSYS;
tsif_driver->tsif_handler = NULL;
tsif_driver->data_buffer = NULL;
tsif_driver->buffer_size = 0;
atomic_set(&mpq_dmx_tsif_info.tsif[tsif].data_cnt, 0);
mpq_dmx_tsif_info.tsif[tsif].mpq_demux = NULL;
}
@ -708,31 +706,28 @@ static int __init mpq_dmx_tsif_plugin_init(void)
}
for (i = 0; i < TSIF_COUNT; i++) {
mpq_dmx_tsif_info.tsif[i].work.tsif_id = i;
INIT_WORK(&mpq_dmx_tsif_info.tsif[i].work.work,
mpq_dmx_tsif_work);
snprintf(mpq_dmx_tsif_info.tsif[i].name,
TSIF_NAME_LENGTH,
"tsif_%d",
"dmx_tsif%d",
i);
mpq_dmx_tsif_info.tsif[i].workqueue =
create_singlethread_workqueue(
atomic_set(&mpq_dmx_tsif_info.tsif[i].data_cnt, 0);
init_waitqueue_head(&mpq_dmx_tsif_info.tsif[i].wait_queue);
mpq_dmx_tsif_info.tsif[i].thread =
kthread_run(
mpq_dmx_tsif_thread, (void *)i,
mpq_dmx_tsif_info.tsif[i].name);
if (mpq_dmx_tsif_info.tsif[i].workqueue == NULL) {
if (IS_ERR(mpq_dmx_tsif_info.tsif[i].thread)) {
int j;
for (j = 0; j < i; j++) {
destroy_workqueue(
mpq_dmx_tsif_info.tsif[j].workqueue);
kthread_stop(mpq_dmx_tsif_info.tsif[j].thread);
mutex_destroy(&mpq_dmx_tsif_info.tsif[j].mutex);
}
MPQ_DVB_ERR_PRINT(
"%s: create_singlethread_workqueue failed\n",
"%s: kthread_run failed\n",
__func__);
return -ENOMEM;
@ -753,7 +748,7 @@ static int __init mpq_dmx_tsif_plugin_init(void)
ret);
for (i = 0; i < TSIF_COUNT; i++) {
destroy_workqueue(mpq_dmx_tsif_info.tsif[i].workqueue);
kthread_stop(mpq_dmx_tsif_info.tsif[i].thread);
mutex_destroy(&mpq_dmx_tsif_info.tsif[i].mutex);
}
}
@ -781,16 +776,13 @@ static void __exit mpq_dmx_tsif_plugin_exit(void)
if (tsif_driver->tsif_handler)
tsif_stop(tsif_driver->tsif_handler);
}
/* Detach from TSIF driver to avoid further notifications. */
if (tsif_driver->tsif_handler)
tsif_detach(tsif_driver->tsif_handler);
/* release mutex to allow work queue to finish scheduled work */
mutex_unlock(&mpq_dmx_tsif_info.tsif[i].mutex);
/* flush the work queue and destroy it */
flush_workqueue(mpq_dmx_tsif_info.tsif[i].workqueue);
destroy_workqueue(mpq_dmx_tsif_info.tsif[i].workqueue);
kthread_stop(mpq_dmx_tsif_info.tsif[i].thread);
mutex_destroy(&mpq_dmx_tsif_info.tsif[i].mutex);
}

View file

@ -12,12 +12,11 @@
#include <linux/init.h>
#include <linux/module.h>
#include <linux/workqueue.h>
#include <linux/kthread.h>
#include <mach/msm_tspp.h>
#include "mpq_dvb_debug.h"
#include "mpq_dmx_plugin_common.h"
#define TSIF_COUNT 2
#define TSPP_MAX_PID_FILTER_NUM 16
@ -28,6 +27,7 @@
/* For each TSIF we allocate two pipes, one for PES and one for sections */
#define TSPP_PES_CHANNEL 0
#define TSPP_SECTION_CHANNEL 1
#define TSPP_CHANNEL_COUNT 2
/* the channel_id set to TSPP driver based on TSIF number and channel type */
#define TSPP_CHANNEL_ID(tsif, ch) ((tsif << 1) + ch)
@ -84,18 +84,10 @@ enum mem_buffer_allocation_mode {
static int clock_inv;
static int tsif_mode = 2;
static int allocation_mode = MPQ_DMX_TSPP_INTERNAL_ALLOC;
module_param(tsif_mode, int, S_IRUGO);
module_param(clock_inv, int, S_IRUGO);
module_param(tsif_mode, int, S_IRUGO | S_IWUSR);
module_param(clock_inv, int, S_IRUGO | S_IWUSR);
module_param(allocation_mode, int, S_IRUGO);
/*
* Work scheduled each time TSPP notifies dmx
* of new TS packet in some channel
*/
struct tspp_work {
struct work_struct work;
int channel_id;
};
/* The following structure hold singelton information
* required for dmx implementation on top of TSPP.
@ -111,8 +103,8 @@ static struct
*/
int pes_channel_ref;
/* work used to submit to workqueue to process pes channel */
struct tspp_work pes_work;
/* Counter for data notifications on PES pipe */
atomic_t pes_data_cnt;
/* ION handle used for TSPP data buffer allocation */
struct ion_handle *pes_mem_heap_handle;
@ -130,8 +122,8 @@ static struct
*/
int section_channel_ref;
/* work used to submit to workqueue to process pes channel */
struct tspp_work section_work;
/* Counter for data notifications on section pipe */
atomic_t section_data_cnt;
/* ION handle used for TSPP data buffer allocation */
struct ion_handle *section_mem_heap_handle;
@ -151,8 +143,9 @@ static struct
int ref_count;
} filters[TSPP_MAX_PID_FILTER_NUM];
/* workqueue that processes TS packets from specific TSIF */
struct workqueue_struct *workqueue;
/* thread processing TS packets from TSPP */
struct task_struct *thread;
wait_queue_head_t wait_queue;
/* TSIF alias */
char name[TSIF_NAME_LENGTH];
@ -274,55 +267,93 @@ static int mpq_tspp_get_filter_slot(int tsif, int pid)
}
/**
* Worker function that processes the TS packets notified by TSPP.
* Demux thread function handling data from specific TSIF.
*
* @worker: the executed work
* @arg: TSIF number
*/
static void mpq_dmx_tspp_work(struct work_struct *worker)
static int mpq_dmx_tspp_thread(void *arg)
{
struct tspp_work *tspp_work =
container_of(worker, struct tspp_work, work);
int tsif = (int)arg;
struct mpq_demux *mpq_demux;
int channel_id = tspp_work->channel_id;
int tsif = TSPP_GET_TSIF_NUM(channel_id);
const struct tspp_data_descriptor *tspp_data_desc;
atomic_t *data_cnt;
int ref_count;
int ret;
int i;
mpq_demux = mpq_dmx_tspp_info.tsif[tsif].mpq_demux;
do {
ret = wait_event_interruptible(
mpq_dmx_tspp_info.tsif[tsif].wait_queue,
(atomic_read(
&mpq_dmx_tspp_info.tsif[tsif].pes_data_cnt)) ||
(atomic_read(
&mpq_dmx_tspp_info.tsif[tsif].section_data_cnt)) ||
kthread_should_stop());
/* Lock against the TSPP filters data-structure */
if (mutex_lock_interruptible(&mpq_dmx_tspp_info.tsif[tsif].mutex))
return;
if ((ret < 0) || kthread_should_stop()) {
MPQ_DVB_ERR_PRINT("%s: exit\n", __func__);
break;
}
/* Make sure channel is still active */
if (TSPP_IS_PES_CHANNEL(channel_id))
ref_count = mpq_dmx_tspp_info.tsif[tsif].pes_channel_ref;
else
ref_count = mpq_dmx_tspp_info.tsif[tsif].section_channel_ref;
/* Lock against the TSPP filters data-structure */
if (mutex_lock_interruptible(
&mpq_dmx_tspp_info.tsif[tsif].mutex))
return -ERESTARTSYS;
for (i = 0; i < TSPP_CHANNEL_COUNT; i++) {
int channel_id = TSPP_CHANNEL_ID(tsif, i);
if (TSPP_IS_PES_CHANNEL(channel_id)) {
ref_count =
mpq_dmx_tspp_info.tsif[tsif].pes_channel_ref;
data_cnt =
&mpq_dmx_tspp_info.tsif[tsif].pes_data_cnt;
} else {
ref_count =
mpq_dmx_tspp_info.tsif[tsif].
section_channel_ref;
data_cnt =
&mpq_dmx_tspp_info.tsif[tsif].section_data_cnt;
}
/* Make sure channel is still active */
if (ref_count == 0)
continue;
atomic_dec(data_cnt);
mpq_demux = mpq_dmx_tspp_info.tsif[tsif].mpq_demux;
mpq_demux->hw_notification_size = 0;
/*
* Go through all filled descriptors
* and perform demuxing on them
*/
while ((tspp_data_desc =
tspp_get_buffer(0, channel_id)) != NULL) {
mpq_demux->hw_notification_size +=
(tspp_data_desc->size /
TSPP_RAW_TTS_SIZE);
dvb_dmx_swfilter_format(
&mpq_demux->demux,
tspp_data_desc->virt_base,
tspp_data_desc->size,
DMX_TSP_FORMAT_192_TAIL);
/*
* Notify TSPP that the buffer
* is no longer needed
*/
tspp_release_buffer(0,
channel_id, tspp_data_desc->id);
}
}
if (ref_count == 0) {
mutex_unlock(&mpq_dmx_tspp_info.tsif[tsif].mutex);
return;
}
} while (1);
mpq_demux->hw_notification_size = 0;
/* Go through all filled descriptors and perform demuxing on them */
while ((tspp_data_desc = tspp_get_buffer(0, channel_id)) != NULL) {
mpq_demux->hw_notification_size +=
(tspp_data_desc->size / TSPP_RAW_TTS_SIZE);
dvb_dmx_swfilter_format(
&mpq_demux->demux,
tspp_data_desc->virt_base,
tspp_data_desc->size,
DMX_TSP_FORMAT_192_TAIL);
/* Notify TSPP that the buffer is no longer needed */
tspp_release_buffer(0, channel_id, tspp_data_desc->id);
}
mutex_unlock(&mpq_dmx_tspp_info.tsif[tsif].mutex);
return 0;
}
/**
@ -334,7 +365,6 @@ static void mpq_dmx_tspp_work(struct work_struct *worker)
static void mpq_tspp_callback(int channel_id, void *user)
{
int tsif = (int)user;
struct work_struct *work;
struct mpq_demux *mpq_demux;
/* Save statistics on TSPP notifications */
@ -342,13 +372,11 @@ static void mpq_tspp_callback(int channel_id, void *user)
mpq_dmx_update_hw_statistics(mpq_demux);
if (TSPP_IS_PES_CHANNEL(channel_id))
work = &mpq_dmx_tspp_info.tsif[tsif].pes_work.work;
atomic_inc(&mpq_dmx_tspp_info.tsif[tsif].pes_data_cnt);
else
work = &mpq_dmx_tspp_info.tsif[tsif].section_work.work;
atomic_inc(&mpq_dmx_tspp_info.tsif[tsif].section_data_cnt);
/* Scheudle a new work to demux workqueue */
if (!work_pending(work))
queue_work(mpq_dmx_tspp_info.tsif[tsif].workqueue, work);
wake_up(&mpq_dmx_tspp_info.tsif[tsif].wait_queue);
}
/**
@ -586,6 +614,7 @@ static int mpq_tspp_dmx_remove_channel(struct dvb_demux_feed *feed)
int tsif;
int ret;
int channel_id;
atomic_t *data_cnt;
int *channel_ref_count;
struct tspp_filter tspp_filter;
struct mpq_demux *mpq_demux = feed->demux->priv;
@ -613,10 +642,12 @@ static int mpq_tspp_dmx_remove_channel(struct dvb_demux_feed *feed)
channel_id = TSPP_CHANNEL_ID(tsif, TSPP_PES_CHANNEL);
channel_ref_count =
&mpq_dmx_tspp_info.tsif[tsif].pes_channel_ref;
data_cnt = &mpq_dmx_tspp_info.tsif[tsif].pes_data_cnt;
} else {
channel_id = TSPP_CHANNEL_ID(tsif, TSPP_SECTION_CHANNEL);
channel_ref_count =
&mpq_dmx_tspp_info.tsif[tsif].section_channel_ref;
data_cnt = &mpq_dmx_tspp_info.tsif[tsif].section_data_cnt;
}
/* check if required TSPP pipe is already allocated or not */
@ -677,6 +708,7 @@ static int mpq_tspp_dmx_remove_channel(struct dvb_demux_feed *feed)
tspp_unregister_notification(0, channel_id);
tspp_close_channel(0, channel_id);
tspp_close_stream(0, channel_id);
atomic_set(data_cnt, 0);
}
mutex_unlock(&mpq_dmx_tspp_info.tsif[tsif].mutex);
@ -1079,24 +1111,14 @@ static int __init mpq_dmx_tspp_plugin_init(void)
mpq_dmx_tspp_info.tsif[i].pes_mem_heap_handle = NULL;
mpq_dmx_tspp_info.tsif[i].pes_mem_heap_virt_base = NULL;
mpq_dmx_tspp_info.tsif[i].pes_mem_heap_phys_base = 0;
mpq_dmx_tspp_info.tsif[i].pes_work.channel_id =
TSPP_CHANNEL_ID(i, TSPP_PES_CHANNEL);
INIT_WORK(&mpq_dmx_tspp_info.tsif[i].pes_work.work,
mpq_dmx_tspp_work);
atomic_set(&mpq_dmx_tspp_info.tsif[i].pes_data_cnt, 0);
mpq_dmx_tspp_info.tsif[i].section_channel_ref = 0;
mpq_dmx_tspp_info.tsif[i].section_index = 0;
mpq_dmx_tspp_info.tsif[i].section_mem_heap_handle = NULL;
mpq_dmx_tspp_info.tsif[i].section_mem_heap_virt_base = NULL;
mpq_dmx_tspp_info.tsif[i].section_mem_heap_phys_base = 0;
mpq_dmx_tspp_info.tsif[i].section_work.channel_id =
TSPP_CHANNEL_ID(i, TSPP_SECTION_CHANNEL);
INIT_WORK(&mpq_dmx_tspp_info.tsif[i].section_work.work,
mpq_dmx_tspp_work);
atomic_set(&mpq_dmx_tspp_info.tsif[i].section_data_cnt, 0);
for (j = 0; j < TSPP_MAX_PID_FILTER_NUM; j++) {
mpq_dmx_tspp_info.tsif[i].filters[j].pid = -1;
@ -1105,22 +1127,23 @@ static int __init mpq_dmx_tspp_plugin_init(void)
snprintf(mpq_dmx_tspp_info.tsif[i].name,
TSIF_NAME_LENGTH,
"tsif_%d",
"dmx_tsif%d",
i);
mpq_dmx_tspp_info.tsif[i].workqueue =
create_singlethread_workqueue(
init_waitqueue_head(&mpq_dmx_tspp_info.tsif[i].wait_queue);
mpq_dmx_tspp_info.tsif[i].thread =
kthread_run(
mpq_dmx_tspp_thread, (void *)i,
mpq_dmx_tspp_info.tsif[i].name);
if (mpq_dmx_tspp_info.tsif[i].workqueue == NULL) {
if (IS_ERR(mpq_dmx_tspp_info.tsif[i].thread)) {
for (j = 0; j < i; j++) {
destroy_workqueue(
mpq_dmx_tspp_info.tsif[j].workqueue);
kthread_stop(mpq_dmx_tspp_info.tsif[j].thread);
mutex_destroy(&mpq_dmx_tspp_info.tsif[j].mutex);
}
MPQ_DVB_ERR_PRINT(
"%s: create_singlethread_workqueue failed\n",
"%s: kthread_run failed\n",
__func__);
return -ENOMEM;
@ -1138,7 +1161,7 @@ static int __init mpq_dmx_tspp_plugin_init(void)
ret);
for (i = 0; i < TSIF_COUNT; i++) {
destroy_workqueue(mpq_dmx_tspp_info.tsif[i].workqueue);
kthread_stop(mpq_dmx_tspp_info.tsif[i].thread);
mutex_destroy(&mpq_dmx_tspp_info.tsif[i].mutex);
}
}
@ -1179,8 +1202,7 @@ static void __exit mpq_dmx_tspp_plugin_exit(void)
mpq_dmx_tsif_ion_cleanup(i);
mutex_unlock(&mpq_dmx_tspp_info.tsif[i].mutex);
flush_workqueue(mpq_dmx_tspp_info.tsif[i].workqueue);
destroy_workqueue(mpq_dmx_tspp_info.tsif[i].workqueue);
kthread_stop(mpq_dmx_tspp_info.tsif[i].thread);
mutex_destroy(&mpq_dmx_tspp_info.tsif[i].mutex);
}