diff --git a/consumer_test.go b/consumer_test.go index 18e41c8..18439d6 100644 --- a/consumer_test.go +++ b/consumer_test.go @@ -104,7 +104,7 @@ func TestJobReachTimeout(t *testing.T) { WithWorkerCount(2), ) assert.NoError(t, err) - assert.NoError(t, q.Queue(m, job.WithTimeout(30*time.Millisecond))) + assert.NoError(t, q.Queue(m, job.AllowOption{Timeout: job.Time(30 * time.Millisecond)})) q.Start() time.Sleep(50 * time.Millisecond) q.Release() @@ -138,8 +138,8 @@ func TestCancelJobAfterShutdown(t *testing.T) { WithWorkerCount(2), ) assert.NoError(t, err) - assert.NoError(t, q.Queue(m, job.WithTimeout(100*time.Millisecond))) - assert.NoError(t, q.Queue(m, job.WithTimeout(100*time.Millisecond))) + assert.NoError(t, q.Queue(m, job.AllowOption{Timeout: job.Time(100 * time.Millisecond)})) + assert.NoError(t, q.Queue(m, job.AllowOption{Timeout: job.Time(100 * time.Millisecond)})) q.Start() time.Sleep(10 * time.Millisecond) assert.Equal(t, 2, int(q.metric.busyWorkers)) @@ -367,8 +367,10 @@ func TestRetryCountWithNewMessage(t *testing.T) { assert.NoError(t, q.Queue( m, - job.WithRetryCount(3), - job.WithRetryDelay(50*time.Millisecond), + job.AllowOption{ + RetryCount: job.Int64(3), + RetryDelay: job.Time(50 * time.Millisecond), + }, )) assert.Len(t, messages, 0) q.Start() @@ -403,8 +405,10 @@ func TestRetryCountWithNewTask(t *testing.T) { messages <- "foobar" return nil }, - job.WithRetryCount(3), - job.WithRetryDelay(50*time.Millisecond), + job.AllowOption{ + RetryCount: job.Int64(3), + RetryDelay: job.Time(50 * time.Millisecond), + }, )) assert.Len(t, messages, 0) q.Start() @@ -437,8 +441,10 @@ func TestCancelRetryCountWithNewTask(t *testing.T) { messages <- "foobar" return nil }, - job.WithRetryCount(3), - job.WithRetryDelay(100*time.Millisecond), + job.AllowOption{ + RetryCount: job.Int64(3), + RetryDelay: job.Time(100 * time.Millisecond), + }, )) assert.Len(t, messages, 0) q.Start() @@ -478,8 +484,10 @@ func TestCancelRetryCountWithNewMessage(t *testing.T) { assert.NoError(t, q.Queue( m, - job.WithRetryCount(3), - job.WithRetryDelay(100*time.Millisecond), + job.AllowOption{ + RetryCount: job.Int64(3), + RetryDelay: job.Time(100 * time.Millisecond), + }, )) assert.Len(t, messages, 0) q.Start() diff --git a/job/benchmark_test.go b/job/benchmark_test.go new file mode 100644 index 0000000..7c6a0a6 --- /dev/null +++ b/job/benchmark_test.go @@ -0,0 +1,33 @@ +package job + +import ( + "context" + "testing" + "time" +) + +func BenchmarkNewTask(b *testing.B) { + for i := 0; i < b.N; i++ { + NewTask(func(context.Context) error { + return nil + }, + AllowOption{ + RetryCount: Int64(100), + RetryDelay: Time(30 * time.Millisecond), + Timeout: Time(3 * time.Millisecond), + }, + ) + } +} + +func BenchmarkNewOption(b *testing.B) { + for i := 0; i < b.N; i++ { + _ = NewOptions( + AllowOption{ + RetryCount: Int64(100), + RetryDelay: Time(30 * time.Millisecond), + Timeout: Time(3 * time.Millisecond), + }, + ) + } +} diff --git a/job/job.go b/job/job.go index ad98fb3..54e4d4b 100644 --- a/job/job.go +++ b/job/job.go @@ -47,7 +47,7 @@ func (m *Message) Encode() []byte { return b } -func NewMessage(m core.QueuedMessage, opts ...Option) *Message { +func NewMessage(m core.QueuedMessage, opts ...AllowOption) *Message { o := NewOptions(opts...) return &Message{ @@ -58,7 +58,7 @@ func NewMessage(m core.QueuedMessage, opts ...Option) *Message { } } -func NewTask(task TaskFunc, opts ...Option) *Message { +func NewTask(task TaskFunc, opts ...AllowOption) *Message { o := NewOptions(opts...) return &Message{ diff --git a/job/job_test.go b/job/job_test.go new file mode 100644 index 0000000..8d7765a --- /dev/null +++ b/job/job_test.go @@ -0,0 +1 @@ +package job diff --git a/job/option.go b/job/option.go index a0be4a9..0873f6b 100644 --- a/job/option.go +++ b/job/option.go @@ -8,54 +8,45 @@ type Options struct { timeout time.Duration } -// An Option configures a mutex. -type Option interface { - apply(*Options) -} - -// OptionFunc is a function that configures a job. -type OptionFunc func(*Options) - -// apply calls f(option) -func (f OptionFunc) apply(option *Options) { - f(option) -} - -func newDefaultOptions() *Options { - return &Options{ +func newDefaultOptions() Options { + return Options{ retryCount: 0, retryDelay: 100 * time.Millisecond, timeout: 60 * time.Minute, } } +type AllowOption struct { + RetryCount *int64 + RetryDelay *time.Duration + Timeout *time.Duration +} + // NewOptions with custom parameter -func NewOptions(opts ...Option) *Options { +func NewOptions(opts ...AllowOption) Options { o := newDefaultOptions() - // Loop through each option - for _, opt := range opts { - // Call the option giving the instantiated - opt.apply(o) + if len(opts) != 0 { + if opts[0].RetryCount != nil && *opts[0].RetryCount != o.retryCount { + o.retryCount = *opts[0].RetryCount + } + + if opts[0].RetryDelay != nil && *opts[0].RetryDelay != o.retryDelay { + o.retryDelay = *opts[0].RetryDelay + } + + if opts[0].Timeout != nil && *opts[0].Timeout != o.timeout { + o.timeout = *opts[0].Timeout + } } return o } -func WithRetryCount(count int64) Option { - return OptionFunc(func(o *Options) { - o.retryCount = count - }) -} - -func WithRetryDelay(t time.Duration) Option { - return OptionFunc(func(o *Options) { - o.retryDelay = t - }) +func Int64(val int64) *int64 { + return &val } -func WithTimeout(t time.Duration) Option { - return OptionFunc(func(o *Options) { - o.timeout = t - }) +func Time(v time.Duration) *time.Duration { + return &v } diff --git a/job/option_test.go b/job/option_test.go new file mode 100644 index 0000000..95fc74b --- /dev/null +++ b/job/option_test.go @@ -0,0 +1,22 @@ +package job + +import ( + "testing" + "time" + + "github.com/stretchr/testify/assert" +) + +func TestOptions(t *testing.T) { + o := NewOptions( + AllowOption{ + RetryCount: Int64(100), + RetryDelay: Time(30 * time.Millisecond), + Timeout: Time(3 * time.Millisecond), + }, + ) + + assert.Equal(t, int64(100), o.retryCount) + assert.Equal(t, 30*time.Millisecond, o.retryDelay) + assert.Equal(t, 3*time.Millisecond, o.timeout) +} diff --git a/queue.go b/queue.go index 3298cf2..68a192b 100644 --- a/queue.go +++ b/queue.go @@ -114,7 +114,7 @@ func (q *Queue) Wait() { } // Queue to queue all job -func (q *Queue) Queue(m core.QueuedMessage, opts ...job.Option) error { +func (q *Queue) Queue(m core.QueuedMessage, opts ...job.AllowOption) error { if atomic.LoadInt32(&q.stopFlag) == 1 { return ErrQueueShutdown } @@ -133,7 +133,7 @@ func (q *Queue) Queue(m core.QueuedMessage, opts ...job.Option) error { } // QueueTask to queue job task -func (q *Queue) QueueTask(task job.TaskFunc, opts ...job.Option) error { +func (q *Queue) QueueTask(task job.TaskFunc, opts ...job.AllowOption) error { if atomic.LoadInt32(&q.stopFlag) == 1 { return ErrQueueShutdown } diff --git a/queue_example_test.go b/queue_example_test.go index 1383c1b..94b750f 100644 --- a/queue_example_test.go +++ b/queue_example_test.go @@ -75,7 +75,9 @@ func ExampleNewPool_queueTaskTimeout() { rets <- idx return nil - }, job.WithTimeout(100*time.Millisecond)); err != nil { + }, job.AllowOption{ + Timeout: job.Time(100 * time.Millisecond), + }); err != nil { log.Println(err) } }