diff options
author | zeripath | 2022-02-22 12:08:35 +0000 |
---|---|---|
committer | GitHub | 2022-02-22 20:08:35 +0800 |
commit | 382101ecc734ed2f2b3f1ecf19ec20b003d057a9 (patch) | |
tree | 7a8d8709320b35e357bf50df9f6f4a91e8f471b6 | |
parent | 86c3481effd5a5e7e9d486577df993f1b4f1adc3 (diff) |
In disk_channel queues synchronously push to disk on shutdown (#18415) (#18788)
Partial Backport of #18415
Instead of using an asynchronous goroutine to push to disk on shutdown
just close the datachan and immediately push to the disk.
Prevents messages of incompletely flushed queues.
Signed-off-by: Andrew Thornton <art27@cantab.net>
Co-authored-by: Lunny Xiao <xiaolunwen@gmail.com>
-rw-r--r-- | modules/queue/queue_bytefifo.go | 8 | ||||
-rw-r--r-- | modules/queue/queue_disk_channel.go | 2 | ||||
-rw-r--r-- | modules/queue/queue_disk_channel_test.go | 1 | ||||
-rw-r--r-- | modules/queue/unique_queue_disk_channel.go | 13 |
4 files changed, 12 insertions, 12 deletions
diff --git a/modules/queue/queue_bytefifo.go b/modules/queue/queue_bytefifo.go index edde47a62..c4d5d20a8 100644 --- a/modules/queue/queue_bytefifo.go +++ b/modules/queue/queue_bytefifo.go @@ -195,9 +195,11 @@ loop: } } -var errQueueEmpty = fmt.Errorf("empty queue") -var errEmptyBytes = fmt.Errorf("empty bytes") -var errUnmarshal = fmt.Errorf("failed to unmarshal") +var ( + errQueueEmpty = fmt.Errorf("empty queue") + errEmptyBytes = fmt.Errorf("empty bytes") + errUnmarshal = fmt.Errorf("failed to unmarshal") +) func (q *ByteFIFOQueue) doPop() error { q.lock.Lock() diff --git a/modules/queue/queue_disk_channel.go b/modules/queue/queue_disk_channel.go index 72f330670..199f958bc 100644 --- a/modules/queue/queue_disk_channel.go +++ b/modules/queue/queue_disk_channel.go @@ -251,8 +251,8 @@ func (q *PersistableChannelQueue) Shutdown() { q.channelQueue.Wait() q.internal.(*LevelQueue).Wait() // Redirect all remaining data in the chan to the internal channel - close(q.channelQueue.dataChan) log.Trace("PersistableChannelQueue: %s Redirecting remaining data", q.delayedStarter.name) + close(q.channelQueue.dataChan) for data := range q.channelQueue.dataChan { _ = q.internal.Push(data) atomic.AddInt64(&q.channelQueue.numInQueue, -1) diff --git a/modules/queue/queue_disk_channel_test.go b/modules/queue/queue_disk_channel_test.go index c90d715a7..db12d9575 100644 --- a/modules/queue/queue_disk_channel_test.go +++ b/modules/queue/queue_disk_channel_test.go @@ -188,5 +188,4 @@ func TestPersistableChannelQueue(t *testing.T) { for _, callback := range callbacks { callback() } - } diff --git a/modules/queue/unique_queue_disk_channel.go b/modules/queue/unique_queue_disk_channel.go index af42c0913..975421a33 100644 --- a/modules/queue/unique_queue_disk_channel.go +++ b/modules/queue/unique_queue_disk_channel.go @@ -238,13 +238,12 @@ func (q *PersistableChannelUniqueQueue) Shutdown() { q.channelQueue.Wait() q.internal.(*LevelUniqueQueue).Wait() // Redirect all remaining data in the chan to the internal channel - go func() { - log.Trace("PersistableChannelUniqueQueue: %s Redirecting remaining data", q.delayedStarter.name) - for data := range q.channelQueue.dataChan { - _ = q.internal.Push(data) - } - log.Trace("PersistableChannelUniqueQueue: %s Done Redirecting remaining data", q.delayedStarter.name) - }() + close(q.channelQueue.dataChan) + log.Trace("PersistableChannelUniqueQueue: %s Redirecting remaining data", q.delayedStarter.name) + for data := range q.channelQueue.dataChan { + _ = q.internal.Push(data) + } + log.Trace("PersistableChannelUniqueQueue: %s Done Redirecting remaining data", q.delayedStarter.name) log.Debug("PersistableChannelUniqueQueue: %s Shutdown", q.delayedStarter.name) } |