diff --git a/consumer.go b/consumer.go index 80b8bad..96af040 100644 --- a/consumer.go +++ b/consumer.go @@ -119,15 +119,24 @@ func (s *Consumer) Queue(task QueuedMessage) error { // Request a new task from channel func (s *Consumer) Request() (QueuedMessage, error) { - select { - case task, ok := <-s.taskQueue: - if !ok { - return nil, ErrQueueHasBeenClosed + clock := 0 +loop: + for { + select { + case task, ok := <-s.taskQueue: + if !ok { + return nil, ErrQueueHasBeenClosed + } + return task, nil + case <-time.After(1 * time.Second): + if clock == 5 { + break loop + } + clock += 1 } - return task, nil - default: - return nil, ErrNoTaskInQueue } + + return nil, ErrNoTaskInQueue } // NewConsumer for create new consumer instance