aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorzeripath2022-02-22 12:08:35 +0000
committerGitHub2022-02-22 20:08:35 +0800
commit382101ecc734ed2f2b3f1ecf19ec20b003d057a9 (patch)
tree7a8d8709320b35e357bf50df9f6f4a91e8f471b6
parent86c3481effd5a5e7e9d486577df993f1b4f1adc3 (diff)
In disk_channel queues synchronously push to disk on shutdown (#18415) (#18788)
Partial Backport of #18415 Instead of using an asynchronous goroutine to push to disk on shutdown just close the datachan and immediately push to the disk. Prevents messages of incompletely flushed queues. Signed-off-by: Andrew Thornton <art27@cantab.net> Co-authored-by: Lunny Xiao <xiaolunwen@gmail.com>
-rw-r--r--modules/queue/queue_bytefifo.go8
-rw-r--r--modules/queue/queue_disk_channel.go2
-rw-r--r--modules/queue/queue_disk_channel_test.go1
-rw-r--r--modules/queue/unique_queue_disk_channel.go13
4 files changed, 12 insertions, 12 deletions
diff --git a/modules/queue/queue_bytefifo.go b/modules/queue/queue_bytefifo.go
index edde47a62..c4d5d20a8 100644
--- a/modules/queue/queue_bytefifo.go
+++ b/modules/queue/queue_bytefifo.go
@@ -195,9 +195,11 @@ loop:
}
}
-var errQueueEmpty = fmt.Errorf("empty queue")
-var errEmptyBytes = fmt.Errorf("empty bytes")
-var errUnmarshal = fmt.Errorf("failed to unmarshal")
+var (
+ errQueueEmpty = fmt.Errorf("empty queue")
+ errEmptyBytes = fmt.Errorf("empty bytes")
+ errUnmarshal = fmt.Errorf("failed to unmarshal")
+)
func (q *ByteFIFOQueue) doPop() error {
q.lock.Lock()
diff --git a/modules/queue/queue_disk_channel.go b/modules/queue/queue_disk_channel.go
index 72f330670..199f958bc 100644
--- a/modules/queue/queue_disk_channel.go
+++ b/modules/queue/queue_disk_channel.go
@@ -251,8 +251,8 @@ func (q *PersistableChannelQueue) Shutdown() {
q.channelQueue.Wait()
q.internal.(*LevelQueue).Wait()
// Redirect all remaining data in the chan to the internal channel
- close(q.channelQueue.dataChan)
log.Trace("PersistableChannelQueue: %s Redirecting remaining data", q.delayedStarter.name)
+ close(q.channelQueue.dataChan)
for data := range q.channelQueue.dataChan {
_ = q.internal.Push(data)
atomic.AddInt64(&q.channelQueue.numInQueue, -1)
diff --git a/modules/queue/queue_disk_channel_test.go b/modules/queue/queue_disk_channel_test.go
index c90d715a7..db12d9575 100644
--- a/modules/queue/queue_disk_channel_test.go
+++ b/modules/queue/queue_disk_channel_test.go
@@ -188,5 +188,4 @@ func TestPersistableChannelQueue(t *testing.T) {
for _, callback := range callbacks {
callback()
}
-
}
diff --git a/modules/queue/unique_queue_disk_channel.go b/modules/queue/unique_queue_disk_channel.go
index af42c0913..975421a33 100644
--- a/modules/queue/unique_queue_disk_channel.go
+++ b/modules/queue/unique_queue_disk_channel.go
@@ -238,13 +238,12 @@ func (q *PersistableChannelUniqueQueue) Shutdown() {
q.channelQueue.Wait()
q.internal.(*LevelUniqueQueue).Wait()
// Redirect all remaining data in the chan to the internal channel
- go func() {
- log.Trace("PersistableChannelUniqueQueue: %s Redirecting remaining data", q.delayedStarter.name)
- for data := range q.channelQueue.dataChan {
- _ = q.internal.Push(data)
- }
- log.Trace("PersistableChannelUniqueQueue: %s Done Redirecting remaining data", q.delayedStarter.name)
- }()
+ close(q.channelQueue.dataChan)
+ log.Trace("PersistableChannelUniqueQueue: %s Redirecting remaining data", q.delayedStarter.name)
+ for data := range q.channelQueue.dataChan {
+ _ = q.internal.Push(data)
+ }
+ log.Trace("PersistableChannelUniqueQueue: %s Done Redirecting remaining data", q.delayedStarter.name)
log.Debug("PersistableChannelUniqueQueue: %s Shutdown", q.delayedStarter.name)
}