diff --git a/internal/pkg/archiver/warc.go b/internal/pkg/archiver/warc.go index 38028fa4..a668d290 100644 --- a/internal/pkg/archiver/warc.go +++ b/internal/pkg/archiver/warc.go @@ -58,62 +58,31 @@ func startWARCWriter() error { IPv6AnyIP: config.Get().IPv6AnyIP, ConnReadDeadline: config.Get().ConnReadDeadline, DigestAlgorithm: warc.GetDigestFromPrefix(config.Get().WARCDigestAlgorithm), + Proxy: config.Get().Proxy, } // Instantiate WARC client var err error - if config.Get().Proxy != "" { - proxiedWARCSettings := WARCSettings - proxiedWARCSettings.Proxy = config.Get().Proxy - globalArchiver.ClientWithProxy, err = warc.NewWARCWritingHTTPClient(proxiedWARCSettings) - if err != nil { - logger.Error("unable to init proxied WARC HTTP client", "err", err.Error(), "func", "archiver.startWARCWriter") - return err - } - go func() { - for err := range globalArchiver.ClientWithProxy.ErrChan { - logger.Error("WARC writer error", "err", err.Err.Error(), "func", err.Func) - } - }() + // Instantiate WARC client + globalArchiver.Client, err = warc.NewWARCWritingHTTPClient(WARCSettings) + if err != nil { + logger.Error("unable to init WARC HTTP client", "err", err.Error(), "func", "archiver.startWARCWriter") + return err } - // Even if a proxied client has been set, we want to create an non-proxied one - if config.Get().Proxy == "" { - globalArchiver.Client, err = warc.NewWARCWritingHTTPClient(WARCSettings) - if err != nil { - logger.Error("unable to init WARC HTTP client", "err", err.Error(), "func", "archiver.startWARCWriter") - return err + go func() { + for err := range globalArchiver.Client.ErrChan { + logger.Error("WARC writer error", "err", err.Err.Error(), "func", err.Func) } - - go func() { - for err := range globalArchiver.Client.ErrChan { - logger.Error("WARC writer error", "err", err.Err.Error(), "func", err.Func) - } - }() - } + }() // Set the timeouts if config.Get().HTTPTimeout > 0 { - if globalArchiver.Client != nil { - globalArchiver.Client.Timeout = config.Get().HTTPTimeout - } - - if globalArchiver.ClientWithProxy != nil { - globalArchiver.ClientWithProxy.Timeout = config.Get().HTTPTimeout - } - } - return nil -} - -func GetClients() (clients []*warc.CustomHTTPClient) { - for _, c := range []*warc.CustomHTTPClient{globalArchiver.Client, globalArchiver.ClientWithProxy} { - if c != nil { - clients = append(clients, c) - } + globalArchiver.Client.Timeout = config.Get().HTTPTimeout } - return clients + return nil } type WARCStats struct { @@ -130,28 +99,19 @@ type WARCStats struct { func GetStats() WARCStats { var stats WARCStats - for _, c := range []*warc.CustomHTTPClient{globalArchiver.Client, globalArchiver.ClientWithProxy} { - if c != nil { - stats.WARCWritingQueueSize += int64(c.WaitGroup.Size()) - stats.WARCTotalBytesArchived += c.DataTotal.Load() - stats.CDXDedupeTotalBytes += c.CDXDedupeTotalBytes.Load() - stats.DoppelgangerDedupeTotalBytes += c.DoppelgangerDedupeTotalBytes.Load() - stats.LocalDedupeTotalBytes += c.LocalDedupeTotalBytes.Load() - stats.CDXDedupeTotal += c.CDXDedupeTotal.Load() - stats.DoppelgangerDedupeTotal += c.DoppelgangerDedupeTotal.Load() - stats.LocalDedupeTotal += c.LocalDedupeTotal.Load() - } - } + c := globalArchiver.Client + stats.WARCWritingQueueSize = int64(c.WaitGroup.Size()) + stats.WARCTotalBytesArchived = c.DataTotal.Load() + stats.CDXDedupeTotalBytes = c.CDXDedupeTotalBytes.Load() + stats.DoppelgangerDedupeTotalBytes = c.DoppelgangerDedupeTotalBytes.Load() + stats.LocalDedupeTotalBytes = c.LocalDedupeTotalBytes.Load() + stats.CDXDedupeTotal = c.CDXDedupeTotal.Load() + stats.DoppelgangerDedupeTotal = c.DoppelgangerDedupeTotal.Load() + stats.LocalDedupeTotal = c.LocalDedupeTotal.Load() return stats } // This function is used in multiple places so it can't be replaced by GetStats() func GetWARCWritingQueueSize() (total int) { - for _, c := range []*warc.CustomHTTPClient{globalArchiver.Client, globalArchiver.ClientWithProxy} { - if c != nil { - total += c.WaitGroup.Size() - } - } - - return total + return globalArchiver.Client.WaitGroup.Size() } diff --git a/internal/pkg/archiver/worker.go b/internal/pkg/archiver/worker.go index 85a1272e..ad742801 100644 --- a/internal/pkg/archiver/worker.go +++ b/internal/pkg/archiver/worker.go @@ -34,8 +34,7 @@ type archiver struct { inputCh chan *models.Item outputCh chan *models.Item - Client *warc.CustomHTTPClient - ClientWithProxy *warc.CustomHTTPClient + Client *warc.CustomHTTPClient } var ( @@ -122,14 +121,11 @@ func Stop() { } } }() + globalArchiver.Client.WaitGroup.Wait() - stopLocalWatcher <- struct{}{} logger.Debug("WARC writing finished") + stopLocalWatcher <- struct{}{} globalArchiver.Client.Close() - if globalArchiver.ClientWithProxy != nil { - globalArchiver.ClientWithProxy.WaitGroup.Wait() - globalArchiver.ClientWithProxy.Close() - } logger.Info("stopped") } @@ -218,13 +214,6 @@ func archive(workerID string, seed *models.Item) { panic(err) } - var client *warc.CustomHTTPClient - if config.Get().Proxy != "" { - client = globalArchiver.ClientWithProxy - } else { - client = globalArchiver.Client - } - for i := range items { if items[i].GetStatus() != models.ItemPreProcessed { logger.Debug("skipping item", "item_id", items[i].GetShortID(), "status", items[i].GetStatus()) @@ -236,9 +225,9 @@ func archive(workerID string, seed *models.Item) { wg.Add(1) if config.Get().Headless { - go headless.ArchiveItem(items[i], &wg, guard, globalBucketManager, client) + go headless.ArchiveItem(items[i], &wg, guard, globalBucketManager, globalArchiver.Client) } else { - go general.ArchiveItem(items[i], &wg, guard, globalBucketManager, client) + go general.ArchiveItem(items[i], &wg, guard, globalBucketManager, globalArchiver.Client) } }