summaryrefslogtreecommitdiff
path: root/firmware/kernel/queue.c
diff options
context:
space:
mode:
Diffstat (limited to 'firmware/kernel/queue.c')
-rw-r--r--firmware/kernel/queue.c786
1 files changed, 786 insertions, 0 deletions
diff --git a/firmware/kernel/queue.c b/firmware/kernel/queue.c
new file mode 100644
index 0000000000..379e3f62c8
--- /dev/null
+++ b/firmware/kernel/queue.c
@@ -0,0 +1,786 @@
1/***************************************************************************
2 * __________ __ ___.
3 * Open \______ \ ____ ____ | | _\_ |__ _______ ___
4 * Source | _// _ \_/ ___\| |/ /| __ \ / _ \ \/ /
5 * Jukebox | | ( <_> ) \___| < | \_\ ( <_> > < <
6 * Firmware |____|_ /\____/ \___ >__|_ \|___ /\____/__/\_ \
7 * \/ \/ \/ \/ \/
8 * $Id$
9 *
10 * Copyright (C) 2002 by Björn Stenberg
11 *
12 * This program is free software; you can redistribute it and/or
13 * modify it under the terms of the GNU General Public License
14 * as published by the Free Software Foundation; either version 2
15 * of the License, or (at your option) any later version.
16 *
17 * This software is distributed on an "AS IS" basis, WITHOUT WARRANTY OF ANY
18 * KIND, either express or implied.
19 *
20 ****************************************************************************/
21
22#include <string.h>
23#include "config.h"
24#include "kernel.h"
25#include "system.h"
26#include "queue.h"
27#include "corelock.h"
28#include "kernel-internal.h"
29#include "general.h"
30#include "panic.h"
31
32/* This array holds all queues that are initiated. It is used for broadcast. */
33static struct
34{
35 struct event_queue *queues[MAX_NUM_QUEUES+1];
36#ifdef HAVE_CORELOCK_OBJECT
37 struct corelock cl;
38#endif
39} all_queues SHAREDBSS_ATTR;
40
41/****************************************************************************
42 * Queue handling stuff
43 ****************************************************************************/
44
45#ifdef HAVE_EXTENDED_MESSAGING_AND_NAME
46/****************************************************************************
47 * Sender thread queue structure that aids implementation of priority
48 * inheritance on queues because the send list structure is the same as
49 * for all other kernel objects:
50 *
51 * Example state:
52 * E0 added with queue_send and removed by thread via queue_wait(_w_tmo)
53 * E3 was posted with queue_post
54 * 4 events remain enqueued (E1-E4)
55 *
56 * rd wr
57 * q->events[]: | XX | E1 | E2 | E3 | E4 | XX |
58 * q->send->senders[]: | NULL | T1 | T2 | NULL | T3 | NULL |
59 * \/ \/ \/
60 * q->send->list: >->|T0|<->|T1|<->|T2|<-------->|T3|<-<
61 * q->send->curr_sender: /\
62 *
63 * Thread has E0 in its own struct queue_event.
64 *
65 ****************************************************************************/
66
67/* Puts the specified return value in the waiting thread's return value
68 * and wakes the thread.
69 *
70 * A sender should be confirmed to exist before calling which makes it
71 * more efficent to reject the majority of cases that don't need this
72 * called.
73 */
74static void queue_release_sender(struct thread_entry * volatile * sender,
75 intptr_t retval)
76{
77 struct thread_entry *thread = *sender;
78
79 *sender = NULL; /* Clear slot. */
80#ifdef HAVE_WAKEUP_EXT_CB
81 thread->wakeup_ext_cb = NULL; /* Clear callback. */
82#endif
83 thread->retval = retval; /* Assign thread-local return value. */
84 *thread->bqp = thread; /* Move blocking queue head to thread since
85 wakeup_thread wakes the first thread in
86 the list. */
87 wakeup_thread(thread->bqp);
88}
89
90/* Releases any waiting threads that are queued with queue_send -
91 * reply with 0.
92 */
93static void queue_release_all_senders(struct event_queue *q)
94{
95 if(q->send)
96 {
97 unsigned int i;
98 for(i = q->read; i != q->write; i++)
99 {
100 struct thread_entry **spp =
101 &q->send->senders[i & QUEUE_LENGTH_MASK];
102
103 if(*spp)
104 {
105 queue_release_sender(spp, 0);
106 }
107 }
108 }
109}
110
111/* Callback to do extra forced removal steps from sender list in addition
112 * to the normal blocking queue removal and priority dis-inherit */
113static void queue_remove_sender_thread_cb(struct thread_entry *thread)
114{
115 *((struct thread_entry **)thread->retval) = NULL;
116#ifdef HAVE_WAKEUP_EXT_CB
117 thread->wakeup_ext_cb = NULL;
118#endif
119 thread->retval = 0;
120}
121
122/* Enables queue_send on the specified queue - caller allocates the extra
123 * data structure. Only queues which are taken to be owned by a thread should
124 * enable this however an official owner is not compulsory but must be
125 * specified for priority inheritance to operate.
126 *
127 * Use of queue_wait(_w_tmo) by multiple threads on a queue using synchronous
128 * messages results in an undefined order of message replies or possible default
129 * replies if two or more waits happen before a reply is done.
130 */
131void queue_enable_queue_send(struct event_queue *q,
132 struct queue_sender_list *send,
133 unsigned int owner_id)
134{
135 int oldlevel = disable_irq_save();
136 corelock_lock(&q->cl);
137
138 if(send != NULL && q->send == NULL)
139 {
140 memset(send, 0, sizeof(*send));
141#ifdef HAVE_PRIORITY_SCHEDULING
142 send->blocker.wakeup_protocol = wakeup_priority_protocol_release;
143 send->blocker.priority = PRIORITY_IDLE;
144 if(owner_id != 0)
145 {
146 send->blocker.thread = thread_id_entry(owner_id);
147 q->blocker_p = &send->blocker;
148 }
149#endif
150 q->send = send;
151 }
152
153 corelock_unlock(&q->cl);
154 restore_irq(oldlevel);
155
156 (void)owner_id;
157}
158
159/* Unblock a blocked thread at a given event index */
160static inline void queue_do_unblock_sender(struct queue_sender_list *send,
161 unsigned int i)
162{
163 if(send)
164 {
165 struct thread_entry **spp = &send->senders[i];
166
167 if(UNLIKELY(*spp))
168 {
169 queue_release_sender(spp, 0);
170 }
171 }
172}
173
174/* Perform the auto-reply sequence */
175static inline void queue_do_auto_reply(struct queue_sender_list *send)
176{
177 if(send && send->curr_sender)
178 {
179 /* auto-reply */
180 queue_release_sender(&send->curr_sender, 0);
181 }
182}
183
184/* Moves waiting thread's refrence from the senders array to the
185 * current_sender which represents the thread waiting for a reponse to the
186 * last message removed from the queue. This also protects the thread from
187 * being bumped due to overflow which would not be a valid action since its
188 * message _is_ being processed at this point. */
189static inline void queue_do_fetch_sender(struct queue_sender_list *send,
190 unsigned int rd)
191{
192 if(send)
193 {
194 struct thread_entry **spp = &send->senders[rd];
195
196 if(*spp)
197 {
198 /* Move thread reference from array to the next thread
199 that queue_reply will release */
200 send->curr_sender = *spp;
201 (*spp)->retval = (intptr_t)spp;
202 *spp = NULL;
203 }
204 /* else message was posted asynchronously with queue_post */
205 }
206}
207#else
208/* Empty macros for when synchoronous sending is not made */
209#define queue_release_all_senders(q)
210#define queue_do_unblock_sender(send, i)
211#define queue_do_auto_reply(send)
212#define queue_do_fetch_sender(send, rd)
213#endif /* HAVE_EXTENDED_MESSAGING_AND_NAME */
214
215/* Queue must not be available for use during this call */
216void queue_init(struct event_queue *q, bool register_queue)
217{
218 int oldlevel = disable_irq_save();
219
220 if(register_queue)
221 {
222 corelock_lock(&all_queues.cl);
223 }
224
225 corelock_init(&q->cl);
226 q->queue = NULL;
227 /* What garbage is in write is irrelevant because of the masking design-
228 * any other functions the empty the queue do this as well so that
229 * queue_count and queue_empty return sane values in the case of a
230 * concurrent change without locking inside them. */
231 q->read = q->write;
232#ifdef HAVE_EXTENDED_MESSAGING_AND_NAME
233 q->send = NULL; /* No message sending by default */
234 IF_PRIO( q->blocker_p = NULL; )
235#endif
236
237 if(register_queue)
238 {
239 void **queues = (void **)all_queues.queues;
240 void **p = find_array_ptr(queues, q);
241
242 if(p - queues >= MAX_NUM_QUEUES)
243 {
244 panicf("queue_init->out of queues");
245 }
246
247 if(*p == NULL)
248 {
249 /* Add it to the all_queues array */
250 *p = q;
251 corelock_unlock(&all_queues.cl);
252 }
253 }
254
255 restore_irq(oldlevel);
256}
257
258/* Queue must not be available for use during this call */
259void queue_delete(struct event_queue *q)
260{
261 int oldlevel = disable_irq_save();
262 corelock_lock(&all_queues.cl);
263 corelock_lock(&q->cl);
264
265 /* Remove the queue if registered */
266 remove_array_ptr((void **)all_queues.queues, q);
267
268 corelock_unlock(&all_queues.cl);
269
270 /* Release thread(s) waiting on queue head */
271 thread_queue_wake(&q->queue);
272
273#ifdef HAVE_EXTENDED_MESSAGING_AND_NAME
274 if(q->send)
275 {
276 /* Release threads waiting for replies */
277 queue_release_all_senders(q);
278
279 /* Reply to any dequeued message waiting for one */
280 queue_do_auto_reply(q->send);
281
282 q->send = NULL;
283 IF_PRIO( q->blocker_p = NULL; )
284 }
285#endif
286
287 q->read = q->write;
288
289 corelock_unlock(&q->cl);
290 restore_irq(oldlevel);
291}
292
293/* NOTE: multiple threads waiting on a queue head cannot have a well-
294 defined release order if timeouts are used. If multiple threads must
295 access the queue head, use a dispatcher or queue_wait only. */
296void queue_wait(struct event_queue *q, struct queue_event *ev)
297{
298 int oldlevel;
299 unsigned int rd;
300
301#ifdef HAVE_PRIORITY_SCHEDULING
302 KERNEL_ASSERT(QUEUE_GET_THREAD(q) == NULL ||
303 QUEUE_GET_THREAD(q) == thread_self_entry(),
304 "queue_wait->wrong thread\n");
305#endif
306
307 oldlevel = disable_irq_save();
308 corelock_lock(&q->cl);
309
310#ifdef HAVE_EXTENDED_MESSAGING_AND_NAME
311 /* Auto-reply (even if ev is NULL to avoid stalling a waiting thread) */
312 queue_do_auto_reply(q->send);
313#endif
314
315 while(1)
316 {
317 struct thread_entry *current;
318
319 rd = q->read;
320 if (rd != q->write) /* A waking message could disappear */
321 break;
322
323 current = thread_self_entry();
324
325 IF_COP( current->obj_cl = &q->cl; )
326 current->bqp = &q->queue;
327
328 block_thread(current);
329
330 corelock_unlock(&q->cl);
331 switch_thread();
332
333 disable_irq();
334 corelock_lock(&q->cl);
335 }
336
337#ifdef HAVE_EXTENDED_MESSAGING_AND_NAME
338 if(ev)
339#endif
340 {
341 q->read = rd + 1;
342 rd &= QUEUE_LENGTH_MASK;
343 *ev = q->events[rd];
344
345 /* Get data for a waiting thread if one */
346 queue_do_fetch_sender(q->send, rd);
347 }
348 /* else just waiting on non-empty */
349
350 corelock_unlock(&q->cl);
351 restore_irq(oldlevel);
352}
353
354void queue_wait_w_tmo(struct event_queue *q, struct queue_event *ev, int ticks)
355{
356 int oldlevel;
357 unsigned int rd, wr;
358
359 /* this function works only with a positive number (or zero) of ticks */
360 if (ticks == TIMEOUT_BLOCK)
361 {
362 queue_wait(q, ev);
363 return;
364 }
365
366#ifdef HAVE_EXTENDED_MESSAGING_AND_NAME
367 KERNEL_ASSERT(QUEUE_GET_THREAD(q) == NULL ||
368 QUEUE_GET_THREAD(q) == thread_self_entry(),
369 "queue_wait_w_tmo->wrong thread\n");
370#endif
371
372 oldlevel = disable_irq_save();
373 corelock_lock(&q->cl);
374
375#ifdef HAVE_EXTENDED_MESSAGING_AND_NAME
376 /* Auto-reply (even if ev is NULL to avoid stalling a waiting thread) */
377 queue_do_auto_reply(q->send);
378#endif
379
380 rd = q->read;
381 wr = q->write;
382 if (rd == wr && ticks > 0)
383 {
384 struct thread_entry *current = thread_self_entry();
385
386 IF_COP( current->obj_cl = &q->cl; )
387 current->bqp = &q->queue;
388
389 block_thread_w_tmo(current, ticks);
390 corelock_unlock(&q->cl);
391
392 switch_thread();
393
394 disable_irq();
395 corelock_lock(&q->cl);
396
397 rd = q->read;
398 wr = q->write;
399 }
400
401#ifdef HAVE_EXTENDED_MESSAGING_AND_NAME
402 if(ev)
403#endif
404 {
405 /* no worry about a removed message here - status is checked inside
406 locks - perhaps verify if timeout or false alarm */
407 if (rd != wr)
408 {
409 q->read = rd + 1;
410 rd &= QUEUE_LENGTH_MASK;
411 *ev = q->events[rd];
412 /* Get data for a waiting thread if one */
413 queue_do_fetch_sender(q->send, rd);
414 }
415 else
416 {
417 ev->id = SYS_TIMEOUT;
418 }
419 }
420 /* else just waiting on non-empty */
421
422 corelock_unlock(&q->cl);
423 restore_irq(oldlevel);
424}
425
426void queue_post(struct event_queue *q, long id, intptr_t data)
427{
428 int oldlevel;
429 unsigned int wr;
430
431 oldlevel = disable_irq_save();
432 corelock_lock(&q->cl);
433
434 wr = q->write++ & QUEUE_LENGTH_MASK;
435
436 KERNEL_ASSERT((q->write - q->read) <= QUEUE_LENGTH,
437 "queue_post ovf q=%08lX", (long)q);
438
439 q->events[wr].id = id;
440 q->events[wr].data = data;
441
442 /* overflow protect - unblock any thread waiting at this index */
443 queue_do_unblock_sender(q->send, wr);
444
445 /* Wakeup a waiting thread if any */
446 wakeup_thread(&q->queue);
447
448 corelock_unlock(&q->cl);
449 restore_irq(oldlevel);
450}
451
452#ifdef HAVE_EXTENDED_MESSAGING_AND_NAME
453/* IRQ handlers are not allowed use of this function - we only aim to
454 protect the queue integrity by turning them off. */
455intptr_t queue_send(struct event_queue *q, long id, intptr_t data)
456{
457 int oldlevel;
458 unsigned int wr;
459
460 oldlevel = disable_irq_save();
461 corelock_lock(&q->cl);
462
463 wr = q->write++ & QUEUE_LENGTH_MASK;
464
465 KERNEL_ASSERT((q->write - q->read) <= QUEUE_LENGTH,
466 "queue_send ovf q=%08lX", (long)q);
467
468 q->events[wr].id = id;
469 q->events[wr].data = data;
470
471 if(LIKELY(q->send))
472 {
473 struct queue_sender_list *send = q->send;
474 struct thread_entry **spp = &send->senders[wr];
475 struct thread_entry *current = thread_self_entry();
476
477 if(UNLIKELY(*spp))
478 {
479 /* overflow protect - unblock any thread waiting at this index */
480 queue_release_sender(spp, 0);
481 }
482
483 /* Wakeup a waiting thread if any */
484 wakeup_thread(&q->queue);
485
486 /* Save thread in slot, add to list and wait for reply */
487 *spp = current;
488 IF_COP( current->obj_cl = &q->cl; )
489 IF_PRIO( current->blocker = q->blocker_p; )
490#ifdef HAVE_WAKEUP_EXT_CB
491 current->wakeup_ext_cb = queue_remove_sender_thread_cb;
492#endif
493 current->retval = (intptr_t)spp;
494 current->bqp = &send->list;
495
496 block_thread(current);
497
498 corelock_unlock(&q->cl);
499 switch_thread();
500
501 return current->retval;
502 }
503
504 /* Function as queue_post if sending is not enabled */
505 wakeup_thread(&q->queue);
506
507 corelock_unlock(&q->cl);
508 restore_irq(oldlevel);
509
510 return 0;
511}
512
513#if 0 /* not used now but probably will be later */
514/* Query if the last message dequeued was added by queue_send or not */
515bool queue_in_queue_send(struct event_queue *q)
516{
517 bool in_send;
518
519#if NUM_CORES > 1
520 int oldlevel = disable_irq_save();
521 corelock_lock(&q->cl);
522#endif
523
524 in_send = q->send && q->send->curr_sender;
525
526#if NUM_CORES > 1
527 corelock_unlock(&q->cl);
528 restore_irq(oldlevel);
529#endif
530
531 return in_send;
532}
533#endif
534
535/* Replies with retval to the last dequeued message sent with queue_send */
536void queue_reply(struct event_queue *q, intptr_t retval)
537{
538 if(q->send && q->send->curr_sender)
539 {
540 struct queue_sender_list *sender;
541
542 int oldlevel = disable_irq_save();
543 corelock_lock(&q->cl);
544
545 sender = q->send;
546
547 /* Double-check locking */
548 if(LIKELY(sender && sender->curr_sender))
549 queue_release_sender(&sender->curr_sender, retval);
550
551 corelock_unlock(&q->cl);
552 restore_irq(oldlevel);
553 }
554}
555#endif /* HAVE_EXTENDED_MESSAGING_AND_NAME */
556
557#ifdef HAVE_EXTENDED_MESSAGING_AND_NAME
558/* Scan the even queue from head to tail, returning any event from the
559 filter list that was found, optionally removing the event. If an
560 event is returned, synchronous events are handled in the same manner as
561 with queue_wait(_w_tmo); if discarded, then as queue_clear.
562 If filters are NULL, any event matches. If filters exist, the default
563 is to search the full queue depth.
564 Earlier filters take precedence.
565
566 Return true if an event was found, false otherwise. */
567bool queue_peek_ex(struct event_queue *q, struct queue_event *ev,
568 unsigned int flags, const long (*filters)[2])
569{
570 bool have_msg;
571 unsigned int rd, wr;
572 int oldlevel;
573
574 if(LIKELY(q->read == q->write))
575 return false; /* Empty: do nothing further */
576
577 have_msg = false;
578
579 oldlevel = disable_irq_save();
580 corelock_lock(&q->cl);
581
582 /* Starting at the head, find first match */
583 for(rd = q->read, wr = q->write; rd != wr; rd++)
584 {
585 struct queue_event *e = &q->events[rd & QUEUE_LENGTH_MASK];
586
587 if(filters)
588 {
589 /* Have filters - find the first thing that passes */
590 const long (* f)[2] = filters;
591 const long (* const f_last)[2] =
592 &filters[flags & QPEEK_FILTER_COUNT_MASK];
593 long id = e->id;
594
595 do
596 {
597 if(UNLIKELY(id >= (*f)[0] && id <= (*f)[1]))
598 goto passed_filter;
599 }
600 while(++f <= f_last);
601
602 if(LIKELY(!(flags & QPEEK_FILTER_HEAD_ONLY)))
603 continue; /* No match; test next event */
604 else
605 break; /* Only check the head */
606 }
607 /* else - anything passes */
608
609 passed_filter:
610
611 /* Found a matching event */
612 have_msg = true;
613
614 if(ev)
615 *ev = *e; /* Caller wants the event */
616
617 if(flags & QPEEK_REMOVE_EVENTS)
618 {
619 /* Do event removal */
620 unsigned int r = q->read;
621 q->read = r + 1; /* Advance head */
622
623 if(ev)
624 {
625 /* Auto-reply */
626 queue_do_auto_reply(q->send);
627 /* Get the thread waiting for reply, if any */
628 queue_do_fetch_sender(q->send, rd & QUEUE_LENGTH_MASK);
629 }
630 else
631 {
632 /* Release any thread waiting on this message */
633 queue_do_unblock_sender(q->send, rd & QUEUE_LENGTH_MASK);
634 }
635
636 /* Slide messages forward into the gap if not at the head */
637 while(rd != r)
638 {
639 unsigned int dst = rd & QUEUE_LENGTH_MASK;
640 unsigned int src = --rd & QUEUE_LENGTH_MASK;
641
642 q->events[dst] = q->events[src];
643 /* Keep sender wait list in sync */
644 if(q->send)
645 q->send->senders[dst] = q->send->senders[src];
646 }
647 }
648
649 break;
650 }
651
652 corelock_unlock(&q->cl);
653 restore_irq(oldlevel);
654
655 return have_msg;
656}
657
658bool queue_peek(struct event_queue *q, struct queue_event *ev)
659{
660 return queue_peek_ex(q, ev, 0, NULL);
661}
662
663void queue_remove_from_head(struct event_queue *q, long id)
664{
665 const long f[2] = { id, id };
666 while (queue_peek_ex(q, NULL,
667 QPEEK_FILTER_HEAD_ONLY | QPEEK_REMOVE_EVENTS, &f));
668}
669#else /* !HAVE_EXTENDED_MESSAGING_AND_NAME */
670/* The more powerful routines aren't required */
671bool queue_peek(struct event_queue *q, struct queue_event *ev)
672{
673 unsigned int rd;
674
675 if(q->read == q->write)
676 return false;
677
678 bool have_msg = false;
679
680 int oldlevel = disable_irq_save();
681 corelock_lock(&q->cl);
682
683 rd = q->read;
684 if(rd != q->write)
685 {
686 *ev = q->events[rd & QUEUE_LENGTH_MASK];
687 have_msg = true;
688 }
689
690 corelock_unlock(&q->cl);
691 restore_irq(oldlevel);
692
693 return have_msg;
694}
695
696void queue_remove_from_head(struct event_queue *q, long id)
697{
698 int oldlevel;
699
700 oldlevel = disable_irq_save();
701 corelock_lock(&q->cl);
702
703 while(q->read != q->write)
704 {
705 unsigned int rd = q->read & QUEUE_LENGTH_MASK;
706
707 if(q->events[rd].id != id)
708 {
709 break;
710 }
711
712 /* Release any thread waiting on this message */
713 queue_do_unblock_sender(q->send, rd);
714
715 q->read++;
716 }
717
718 corelock_unlock(&q->cl);
719 restore_irq(oldlevel);
720}
721#endif /* HAVE_EXTENDED_MESSAGING_AND_NAME */
722
723/* Poll queue to see if a message exists - careful in using the result if
724 * queue_remove_from_head is called when messages are posted - possibly use
725 * queue_wait_w_tmo(&q, 0) in that case or else a removed message that
726 * unsignals the queue may cause an unwanted block */
727bool queue_empty(const struct event_queue* q)
728{
729 return ( q->read == q->write );
730}
731
732void queue_clear(struct event_queue* q)
733{
734 int oldlevel;
735
736 oldlevel = disable_irq_save();
737 corelock_lock(&q->cl);
738
739 /* Release all threads waiting in the queue for a reply -
740 dequeued sent message will be handled by owning thread */
741 queue_release_all_senders(q);
742
743 q->read = q->write;
744
745 corelock_unlock(&q->cl);
746 restore_irq(oldlevel);
747}
748
749/**
750 * The number of events waiting in the queue.
751 *
752 * @param struct of event_queue
753 * @return number of events in the queue
754 */
755int queue_count(const struct event_queue *q)
756{
757 return q->write - q->read;
758}
759
760int queue_broadcast(long id, intptr_t data)
761{
762 struct event_queue **p = all_queues.queues;
763 struct event_queue *q;
764
765#if NUM_CORES > 1
766 int oldlevel = disable_irq_save();
767 corelock_lock(&all_queues.cl);
768#endif
769
770 for(q = *p; q != NULL; q = *(++p))
771 {
772 queue_post(q, id, data);
773 }
774
775#if NUM_CORES > 1
776 corelock_unlock(&all_queues.cl);
777 restore_irq(oldlevel);
778#endif
779
780 return p - all_queues.queues;
781}
782
783void init_queues(void)
784{
785 corelock_init(&all_queues.cl);
786}