]> git.saurik.com Git - apt.git/blame - apt-pkg/acquire.cc
use a static FileFd::Write overload to reduce duplication of write()-retry code
[apt.git] / apt-pkg / acquire.cc
CommitLineData
0118833a
AL
1// -*- mode: cpp; mode: fold -*-
2// Description /*{{{*/
1b480911 3// $Id: acquire.cc,v 1.50 2004/03/17 05:17:11 mdz Exp $
0118833a
AL
4/* ######################################################################
5
6 Acquire - File Acquisition
7
0a8a80e5
AL
8 The core element for the scheduling system is the concept of a named
9 queue. Each queue is unique and each queue has a name derived from the
10 URI. The degree of parallelization can be controlled by how the queue
11 name is derived from the URI.
12
0118833a
AL
13 ##################################################################### */
14 /*}}}*/
15// Include Files /*{{{*/
ea542140
DK
16#include <config.h>
17
0118833a
AL
18#include <apt-pkg/acquire.h>
19#include <apt-pkg/acquire-item.h>
20#include <apt-pkg/acquire-worker.h>
0a8a80e5
AL
21#include <apt-pkg/configuration.h>
22#include <apt-pkg/error.h>
cdcc6d34 23#include <apt-pkg/strutl.h>
1cd1c398 24#include <apt-pkg/fileutl.h>
8267fe24 25
b4fc9b6f 26#include <iostream>
75ef8f14 27#include <sstream>
526334a0
MV
28#include <stdio.h>
29
7a7fa5f0 30#include <dirent.h>
8267fe24 31#include <sys/time.h>
524f8105 32#include <errno.h>
ea542140
DK
33
34#include <apti18n.h>
0118833a
AL
35 /*}}}*/
36
b4fc9b6f
AL
37using namespace std;
38
0118833a
AL
39// Acquire::pkgAcquire - Constructor /*{{{*/
40// ---------------------------------------------------------------------
93bf083d 41/* We grab some runtime state from the configuration space */
5efbd596 42pkgAcquire::pkgAcquire() : LockFD(-1), Queues(0), Workers(0), Configs(0), Log(NULL), ToFetch(0),
1cd1c398 43 Debug(_config->FindB("Debug::pkgAcquire",false)),
5efbd596 44 Running(false)
0118833a 45{
1cd1c398 46 string const Mode = _config->Find("Acquire::Queue-Mode","host");
0a8a80e5
AL
47 if (strcasecmp(Mode.c_str(),"host") == 0)
48 QueueMode = QueueHost;
49 if (strcasecmp(Mode.c_str(),"access") == 0)
1cd1c398
DK
50 QueueMode = QueueAccess;
51}
5efbd596 52pkgAcquire::pkgAcquire(pkgAcquireStatus *Progress) : LockFD(-1), Queues(0), Workers(0),
1cd1c398
DK
53 Configs(0), Log(Progress), ToFetch(0),
54 Debug(_config->FindB("Debug::pkgAcquire",false)),
5efbd596 55 Running(false)
1cd1c398
DK
56{
57 string const Mode = _config->Find("Acquire::Queue-Mode","host");
58 if (strcasecmp(Mode.c_str(),"host") == 0)
59 QueueMode = QueueHost;
60 if (strcasecmp(Mode.c_str(),"access") == 0)
61 QueueMode = QueueAccess;
62 Setup(Progress, "");
63}
64 /*}}}*/
65// Acquire::Setup - Delayed Constructor /*{{{*/
66// ---------------------------------------------------------------------
67/* Do everything needed to be a complete Acquire object and report the
68 success (or failure) back so the user knows that something is wrong… */
69bool pkgAcquire::Setup(pkgAcquireStatus *Progress, string const &Lock)
70{
71 Log = Progress;
0a8a80e5 72
1cd1c398 73 // check for existence and possibly create auxiliary directories
9c2c9c24
DK
74 string const listDir = _config->FindDir("Dir::State::lists");
75 string const partialListDir = listDir + "partial/";
76 string const archivesDir = _config->FindDir("Dir::Cache::Archives");
77 string const partialArchivesDir = archivesDir + "partial/";
78
7753e468
DK
79 if (CreateAPTDirectoryIfNeeded(_config->FindDir("Dir::State"), partialListDir) == false &&
80 CreateAPTDirectoryIfNeeded(listDir, partialListDir) == false)
9c2c9c24
DK
81 return _error->Errno("Acquire", _("List directory %spartial is missing."), listDir.c_str());
82
7753e468
DK
83 if (CreateAPTDirectoryIfNeeded(_config->FindDir("Dir::Cache"), partialArchivesDir) == false &&
84 CreateAPTDirectoryIfNeeded(archivesDir, partialArchivesDir) == false)
9c2c9c24 85 return _error->Errno("Acquire", _("Archives directory %spartial is missing."), archivesDir.c_str());
1cd1c398
DK
86
87 if (Lock.empty() == true || _config->FindB("Debug::NoLocking", false) == true)
88 return true;
89
90 // Lock the directory this acquire object will work in
91 LockFD = GetLock(flCombine(Lock, "lock"));
92 if (LockFD == -1)
93 return _error->Error(_("Unable to lock directory %s"), Lock.c_str());
94
95 return true;
96}
97 /*}}}*/
0118833a
AL
98// Acquire::~pkgAcquire - Destructor /*{{{*/
99// ---------------------------------------------------------------------
93bf083d 100/* Free our memory, clean up the queues (destroy the workers) */
0118833a
AL
101pkgAcquire::~pkgAcquire()
102{
459681d3 103 Shutdown();
1cd1c398
DK
104
105 if (LockFD != -1)
106 close(LockFD);
107
3b5421b4
AL
108 while (Configs != 0)
109 {
110 MethodConfig *Jnk = Configs;
111 Configs = Configs->Next;
112 delete Jnk;
113 }
281daf46
AL
114}
115 /*}}}*/
8e5fc8f5 116// Acquire::Shutdown - Clean out the acquire object /*{{{*/
281daf46
AL
117// ---------------------------------------------------------------------
118/* */
119void pkgAcquire::Shutdown()
120{
f7f0d6c7 121 while (Items.empty() == false)
1b480911
AL
122 {
123 if (Items[0]->Status == Item::StatFetching)
124 Items[0]->Status = Item::StatError;
281daf46 125 delete Items[0];
1b480911 126 }
0a8a80e5
AL
127
128 while (Queues != 0)
129 {
130 Queue *Jnk = Queues;
131 Queues = Queues->Next;
132 delete Jnk;
133 }
0118833a
AL
134}
135 /*}}}*/
136// Acquire::Add - Add a new item /*{{{*/
137// ---------------------------------------------------------------------
93bf083d
AL
138/* This puts an item on the acquire list. This list is mainly for tracking
139 item status */
0118833a
AL
140void pkgAcquire::Add(Item *Itm)
141{
142 Items.push_back(Itm);
143}
144 /*}}}*/
145// Acquire::Remove - Remove a item /*{{{*/
146// ---------------------------------------------------------------------
93bf083d 147/* Remove an item from the acquire list. This is usually not used.. */
0118833a
AL
148void pkgAcquire::Remove(Item *Itm)
149{
a3eaf954
AL
150 Dequeue(Itm);
151
753b3525 152 for (ItemIterator I = Items.begin(); I != Items.end();)
0118833a
AL
153 {
154 if (*I == Itm)
b4fc9b6f 155 {
0118833a 156 Items.erase(I);
b4fc9b6f
AL
157 I = Items.begin();
158 }
753b3525 159 else
f7f0d6c7 160 ++I;
8267fe24 161 }
0118833a
AL
162}
163 /*}}}*/
0a8a80e5
AL
164// Acquire::Add - Add a worker /*{{{*/
165// ---------------------------------------------------------------------
93bf083d
AL
166/* A list of workers is kept so that the select loop can direct their FD
167 usage. */
0a8a80e5
AL
168void pkgAcquire::Add(Worker *Work)
169{
170 Work->NextAcquire = Workers;
171 Workers = Work;
172}
173 /*}}}*/
174// Acquire::Remove - Remove a worker /*{{{*/
175// ---------------------------------------------------------------------
93bf083d
AL
176/* A worker has died. This can not be done while the select loop is running
177 as it would require that RunFds could handle a changing list state and
178 it can't.. */
0a8a80e5
AL
179void pkgAcquire::Remove(Worker *Work)
180{
93bf083d
AL
181 if (Running == true)
182 abort();
183
0a8a80e5
AL
184 Worker **I = &Workers;
185 for (; *I != 0;)
186 {
187 if (*I == Work)
188 *I = (*I)->NextAcquire;
189 else
190 I = &(*I)->NextAcquire;
191 }
192}
193 /*}}}*/
0118833a
AL
194// Acquire::Enqueue - Queue an URI for fetching /*{{{*/
195// ---------------------------------------------------------------------
93bf083d 196/* This is the entry point for an item. An item calls this function when
281daf46 197 it is constructed which creates a queue (based on the current queue
93bf083d
AL
198 mode) and puts the item in that queue. If the system is running then
199 the queue might be started. */
8267fe24 200void pkgAcquire::Enqueue(ItemDesc &Item)
0118833a 201{
0a8a80e5 202 // Determine which queue to put the item in
e331f6ed
AL
203 const MethodConfig *Config;
204 string Name = QueueName(Item.URI,Config);
0a8a80e5
AL
205 if (Name.empty() == true)
206 return;
207
208 // Find the queue structure
209 Queue *I = Queues;
210 for (; I != 0 && I->Name != Name; I = I->Next);
211 if (I == 0)
212 {
213 I = new Queue(Name,this);
214 I->Next = Queues;
215 Queues = I;
93bf083d
AL
216
217 if (Running == true)
218 I->Startup();
0a8a80e5 219 }
bfd22fc0 220
e331f6ed
AL
221 // See if this is a local only URI
222 if (Config->LocalOnly == true && Item.Owner->Complete == false)
223 Item.Owner->Local = true;
8267fe24 224 Item.Owner->Status = Item::StatIdle;
0a8a80e5
AL
225
226 // Queue it into the named queue
c03462c6
MV
227 if(I->Enqueue(Item))
228 ToFetch++;
229
0a8a80e5
AL
230 // Some trace stuff
231 if (Debug == true)
232 {
8267fe24
AL
233 clog << "Fetching " << Item.URI << endl;
234 clog << " to " << Item.Owner->DestFile << endl;
e331f6ed 235 clog << " Queue is: " << Name << endl;
0a8a80e5 236 }
3b5421b4
AL
237}
238 /*}}}*/
0a8a80e5 239// Acquire::Dequeue - Remove an item from all queues /*{{{*/
3b5421b4 240// ---------------------------------------------------------------------
93bf083d
AL
241/* This is called when an item is finished being fetched. It removes it
242 from all the queues */
0a8a80e5
AL
243void pkgAcquire::Dequeue(Item *Itm)
244{
245 Queue *I = Queues;
bfd22fc0 246 bool Res = false;
0a8a80e5 247 for (; I != 0; I = I->Next)
bfd22fc0 248 Res |= I->Dequeue(Itm);
93bf083d
AL
249
250 if (Debug == true)
251 clog << "Dequeuing " << Itm->DestFile << endl;
bfd22fc0
AL
252 if (Res == true)
253 ToFetch--;
0a8a80e5
AL
254}
255 /*}}}*/
256// Acquire::QueueName - Return the name of the queue for this URI /*{{{*/
257// ---------------------------------------------------------------------
258/* The string returned depends on the configuration settings and the
259 method parameters. Given something like http://foo.org/bar it can
260 return http://foo.org or http */
e331f6ed 261string pkgAcquire::QueueName(string Uri,MethodConfig const *&Config)
3b5421b4 262{
93bf083d
AL
263 URI U(Uri);
264
e331f6ed 265 Config = GetConfig(U.Access);
0a8a80e5
AL
266 if (Config == 0)
267 return string();
268
269 /* Single-Instance methods get exactly one queue per URI. This is
270 also used for the Access queue method */
271 if (Config->SingleInstance == true || QueueMode == QueueAccess)
b98f2859 272 return U.Access;
93bf083d
AL
273
274 return U.Access + ':' + U.Host;
0118833a
AL
275}
276 /*}}}*/
3b5421b4
AL
277// Acquire::GetConfig - Fetch the configuration information /*{{{*/
278// ---------------------------------------------------------------------
279/* This locates the configuration structure for an access method. If
280 a config structure cannot be found a Worker will be created to
281 retrieve it */
0a8a80e5 282pkgAcquire::MethodConfig *pkgAcquire::GetConfig(string Access)
3b5421b4
AL
283{
284 // Search for an existing config
285 MethodConfig *Conf;
286 for (Conf = Configs; Conf != 0; Conf = Conf->Next)
287 if (Conf->Access == Access)
288 return Conf;
289
290 // Create the new config class
291 Conf = new MethodConfig;
292 Conf->Access = Access;
293 Conf->Next = Configs;
294 Configs = Conf;
0118833a 295
3b5421b4
AL
296 // Create the worker to fetch the configuration
297 Worker Work(Conf);
298 if (Work.Start() == false)
299 return 0;
7c6e2dc7
MV
300
301 /* if a method uses DownloadLimit, we switch to SingleInstance mode */
4b65cc13 302 if(_config->FindI("Acquire::"+Access+"::Dl-Limit",0) > 0)
7c6e2dc7
MV
303 Conf->SingleInstance = true;
304
3b5421b4
AL
305 return Conf;
306}
307 /*}}}*/
0a8a80e5
AL
308// Acquire::SetFds - Deal with readable FDs /*{{{*/
309// ---------------------------------------------------------------------
310/* Collect FDs that have activity monitors into the fd sets */
311void pkgAcquire::SetFds(int &Fd,fd_set *RSet,fd_set *WSet)
312{
313 for (Worker *I = Workers; I != 0; I = I->NextAcquire)
314 {
315 if (I->InReady == true && I->InFd >= 0)
316 {
317 if (Fd < I->InFd)
318 Fd = I->InFd;
319 FD_SET(I->InFd,RSet);
320 }
321 if (I->OutReady == true && I->OutFd >= 0)
322 {
323 if (Fd < I->OutFd)
324 Fd = I->OutFd;
325 FD_SET(I->OutFd,WSet);
326 }
327 }
328}
329 /*}}}*/
330// Acquire::RunFds - Deal with active FDs /*{{{*/
331// ---------------------------------------------------------------------
93bf083d
AL
332/* Dispatch active FDs over to the proper workers. It is very important
333 that a worker never be erased while this is running! The queue class
334 should never erase a worker except during shutdown processing. */
0a8a80e5
AL
335void pkgAcquire::RunFds(fd_set *RSet,fd_set *WSet)
336{
337 for (Worker *I = Workers; I != 0; I = I->NextAcquire)
338 {
339 if (I->InFd >= 0 && FD_ISSET(I->InFd,RSet) != 0)
340 I->InFdReady();
341 if (I->OutFd >= 0 && FD_ISSET(I->OutFd,WSet) != 0)
342 I->OutFdReady();
343 }
344}
345 /*}}}*/
346// Acquire::Run - Run the fetch sequence /*{{{*/
347// ---------------------------------------------------------------------
348/* This runs the queues. It manages a select loop for all of the
349 Worker tasks. The workers interact with the queues and items to
350 manage the actual fetch. */
1c5f7e5f 351pkgAcquire::RunResult pkgAcquire::Run(int PulseIntervall)
0a8a80e5 352{
8b89e57f
AL
353 Running = true;
354
0a8a80e5
AL
355 for (Queue *I = Queues; I != 0; I = I->Next)
356 I->Startup();
357
b98f2859
AL
358 if (Log != 0)
359 Log->Start();
360
024d1123
AL
361 bool WasCancelled = false;
362
0a8a80e5 363 // Run till all things have been acquired
8267fe24
AL
364 struct timeval tv;
365 tv.tv_sec = 0;
1c5f7e5f 366 tv.tv_usec = PulseIntervall;
0a8a80e5
AL
367 while (ToFetch > 0)
368 {
369 fd_set RFds;
370 fd_set WFds;
371 int Highest = 0;
372 FD_ZERO(&RFds);
373 FD_ZERO(&WFds);
374 SetFds(Highest,&RFds,&WFds);
375
b0db36b1
AL
376 int Res;
377 do
378 {
379 Res = select(Highest+1,&RFds,&WFds,0,&tv);
380 }
381 while (Res < 0 && errno == EINTR);
382
8267fe24 383 if (Res < 0)
8b89e57f 384 {
8267fe24
AL
385 _error->Errno("select","Select has failed");
386 break;
8b89e57f 387 }
93bf083d 388
0a8a80e5 389 RunFds(&RFds,&WFds);
93bf083d
AL
390 if (_error->PendingError() == true)
391 break;
8267fe24
AL
392
393 // Timeout, notify the log class
394 if (Res == 0 || (Log != 0 && Log->Update == true))
395 {
1c5f7e5f 396 tv.tv_usec = PulseIntervall;
8267fe24
AL
397 for (Worker *I = Workers; I != 0; I = I->NextAcquire)
398 I->Pulse();
024d1123
AL
399 if (Log != 0 && Log->Pulse(this) == false)
400 {
401 WasCancelled = true;
402 break;
403 }
8267fe24 404 }
0a8a80e5 405 }
be4401bf 406
b98f2859
AL
407 if (Log != 0)
408 Log->Stop();
409
be4401bf
AL
410 // Shut down the acquire bits
411 Running = false;
0a8a80e5 412 for (Queue *I = Queues; I != 0; I = I->Next)
8e5fc8f5 413 I->Shutdown(false);
0a8a80e5 414
ab559b35 415 // Shut down the items
f7f0d6c7 416 for (ItemIterator I = Items.begin(); I != Items.end(); ++I)
8e5fc8f5 417 (*I)->Finished();
ab559b35 418
024d1123
AL
419 if (_error->PendingError())
420 return Failed;
421 if (WasCancelled)
422 return Cancelled;
423 return Continue;
93bf083d
AL
424}
425 /*}}}*/
be4401bf 426// Acquire::Bump - Called when an item is dequeued /*{{{*/
93bf083d
AL
427// ---------------------------------------------------------------------
428/* This routine bumps idle queues in hopes that they will be able to fetch
429 the dequeued item */
430void pkgAcquire::Bump()
431{
be4401bf
AL
432 for (Queue *I = Queues; I != 0; I = I->Next)
433 I->Bump();
0a8a80e5
AL
434}
435 /*}}}*/
8267fe24
AL
436// Acquire::WorkerStep - Step to the next worker /*{{{*/
437// ---------------------------------------------------------------------
438/* Not inlined to avoid including acquire-worker.h */
439pkgAcquire::Worker *pkgAcquire::WorkerStep(Worker *I)
440{
441 return I->NextAcquire;
442};
443 /*}}}*/
a6568219 444// Acquire::Clean - Cleans a directory /*{{{*/
7a7fa5f0
AL
445// ---------------------------------------------------------------------
446/* This is a bit simplistic, it looks at every file in the dir and sees
447 if it is part of the download set. */
448bool pkgAcquire::Clean(string Dir)
449{
95b5f6c1
DK
450 // non-existing directories are by definition clean…
451 if (DirectoryExists(Dir) == false)
452 return true;
453
7a7fa5f0
AL
454 DIR *D = opendir(Dir.c_str());
455 if (D == 0)
b2e465d6 456 return _error->Errno("opendir",_("Unable to read %s"),Dir.c_str());
7a7fa5f0
AL
457
458 string StartDir = SafeGetCWD();
459 if (chdir(Dir.c_str()) != 0)
460 {
461 closedir(D);
b2e465d6 462 return _error->Errno("chdir",_("Unable to change to %s"),Dir.c_str());
7a7fa5f0
AL
463 }
464
465 for (struct dirent *Dir = readdir(D); Dir != 0; Dir = readdir(D))
466 {
467 // Skip some files..
468 if (strcmp(Dir->d_name,"lock") == 0 ||
469 strcmp(Dir->d_name,"partial") == 0 ||
470 strcmp(Dir->d_name,".") == 0 ||
471 strcmp(Dir->d_name,"..") == 0)
472 continue;
473
474 // Look in the get list
b4fc9b6f 475 ItemCIterator I = Items.begin();
f7f0d6c7 476 for (; I != Items.end(); ++I)
7a7fa5f0
AL
477 if (flNotDir((*I)->DestFile) == Dir->d_name)
478 break;
479
480 // Nothing found, nuke it
481 if (I == Items.end())
482 unlink(Dir->d_name);
483 };
484
7a7fa5f0 485 closedir(D);
3c8cda8b
MV
486 if (chdir(StartDir.c_str()) != 0)
487 return _error->Errno("chdir",_("Unable to change to %s"),StartDir.c_str());
7a7fa5f0
AL
488 return true;
489}
490 /*}}}*/
a6568219
AL
491// Acquire::TotalNeeded - Number of bytes to fetch /*{{{*/
492// ---------------------------------------------------------------------
493/* This is the total number of bytes needed */
a3c4c81a 494unsigned long long pkgAcquire::TotalNeeded()
a6568219 495{
a3c4c81a 496 unsigned long long Total = 0;
f7f0d6c7 497 for (ItemCIterator I = ItemsBegin(); I != ItemsEnd(); ++I)
a6568219
AL
498 Total += (*I)->FileSize;
499 return Total;
500}
501 /*}}}*/
502// Acquire::FetchNeeded - Number of bytes needed to get /*{{{*/
503// ---------------------------------------------------------------------
504/* This is the number of bytes that is not local */
a3c4c81a 505unsigned long long pkgAcquire::FetchNeeded()
a6568219 506{
a3c4c81a 507 unsigned long long Total = 0;
f7f0d6c7 508 for (ItemCIterator I = ItemsBegin(); I != ItemsEnd(); ++I)
a6568219
AL
509 if ((*I)->Local == false)
510 Total += (*I)->FileSize;
511 return Total;
512}
513 /*}}}*/
6b1ff003
AL
514// Acquire::PartialPresent - Number of partial bytes we already have /*{{{*/
515// ---------------------------------------------------------------------
516/* This is the number of partially fetched bytes (PartialSize) of the items that are not local */
a3c4c81a 517unsigned long long pkgAcquire::PartialPresent()
6b1ff003 518{
a3c4c81a 519 unsigned long long Total = 0;
f7f0d6c7 520 for (ItemCIterator I = ItemsBegin(); I != ItemsEnd(); ++I)
6b1ff003
AL
521 if ((*I)->Local == false)
522 Total += (*I)->PartialSize;
523 return Total;
524}
92fcbfc1 525 /*}}}*/
8e5fc8f5 526// Acquire::UriBegin - Start iterator for the uri list /*{{{*/
f7a08e33
AL
527// ---------------------------------------------------------------------
528/* */
529pkgAcquire::UriIterator pkgAcquire::UriBegin()
530{
531 return UriIterator(Queues);
532}
533 /*}}}*/
8e5fc8f5 534// Acquire::UriEnd - End iterator for the uri list /*{{{*/
f7a08e33
AL
535// ---------------------------------------------------------------------
536/* */
537pkgAcquire::UriIterator pkgAcquire::UriEnd()
538{
539 return UriIterator(0);
540}
541 /*}}}*/
e331f6ed
AL
542// Acquire::MethodConfig::MethodConfig - Constructor /*{{{*/
543// ---------------------------------------------------------------------
544/* */
545pkgAcquire::MethodConfig::MethodConfig()
546{
547 SingleInstance = false;
e331f6ed
AL
548 Pipeline = false;
549 SendConfig = false;
550 LocalOnly = false;
459681d3 551 Removable = false;
e331f6ed
AL
552 Next = 0;
553}
554 /*}}}*/
0a8a80e5
AL
555// Queue::Queue - Constructor /*{{{*/
556// ---------------------------------------------------------------------
557/* */
558pkgAcquire::Queue::Queue(string Name,pkgAcquire *Owner) : Name(Name),
559 Owner(Owner)
560{
561 Items = 0;
562 Next = 0;
563 Workers = 0;
b185acc2
AL
564 MaxPipeDepth = 1;
565 PipeDepth = 0;
0a8a80e5
AL
566}
567 /*}}}*/
568// Queue::~Queue - Destructor /*{{{*/
569// ---------------------------------------------------------------------
570/* */
571pkgAcquire::Queue::~Queue()
572{
8e5fc8f5 573 Shutdown(true);
0a8a80e5
AL
574
575 while (Items != 0)
576 {
577 QItem *Jnk = Items;
578 Items = Items->Next;
579 delete Jnk;
580 }
581}
582 /*}}}*/
583// Queue::Enqueue - Queue an item to the queue /*{{{*/
584// ---------------------------------------------------------------------
585/* */
c03462c6 586bool pkgAcquire::Queue::Enqueue(ItemDesc &Item)
0a8a80e5 587{
7a1b1f8b 588 QItem **I = &Items;
c03462c6
MV
589 // move to the end of the queue and check for duplicates here
590 for (; *I != 0; I = &(*I)->Next)
591 if (Item.URI == (*I)->URI)
592 {
593 Item.Owner->Status = Item::StatDone;
594 return false;
595 }
596
0a8a80e5 597 // Create a new item
7a1b1f8b
AL
598 QItem *Itm = new QItem;
599 *Itm = Item;
600 Itm->Next = 0;
601 *I = Itm;
0a8a80e5 602
8267fe24 603 Item.Owner->QueueCounter++;
93bf083d
AL
604 if (Items->Next == 0)
605 Cycle();
c03462c6 606 return true;
0a8a80e5
AL
607}
608 /*}}}*/
c88edf1d 609// Queue::Dequeue - Remove an item from the queue /*{{{*/
0a8a80e5 610// ---------------------------------------------------------------------
b185acc2 611/* We return true if we hit something */
bfd22fc0 612bool pkgAcquire::Queue::Dequeue(Item *Owner)
0a8a80e5 613{
b185acc2
AL
614 if (Owner->Status == pkgAcquire::Item::StatFetching)
615 return _error->Error("Tried to dequeue a fetching object");
616
bfd22fc0
AL
617 bool Res = false;
618
0a8a80e5
AL
619 QItem **I = &Items;
620 for (; *I != 0;)
621 {
622 if ((*I)->Owner == Owner)
623 {
624 QItem *Jnk= *I;
625 *I = (*I)->Next;
626 Owner->QueueCounter--;
627 delete Jnk;
bfd22fc0 628 Res = true;
0a8a80e5
AL
629 }
630 else
631 I = &(*I)->Next;
632 }
bfd22fc0
AL
633
634 return Res;
0a8a80e5
AL
635}
636 /*}}}*/
637// Queue::Startup - Start the worker processes /*{{{*/
638// ---------------------------------------------------------------------
8e5fc8f5
AL
639/* It is possible for this to be called with a pre-existing set of
640 workers. */
0a8a80e5
AL
641bool pkgAcquire::Queue::Startup()
642{
8e5fc8f5
AL
643 if (Workers == 0)
644 {
645 URI U(Name);
646 pkgAcquire::MethodConfig *Cnf = Owner->GetConfig(U.Access);
647 if (Cnf == 0)
648 return false;
649
650 Workers = new Worker(this,Cnf,Owner->Log);
651 Owner->Add(Workers);
652 if (Workers->Start() == false)
653 return false;
654
655 /* When pipelining we commit 10 items. This needs to change when we
656 added other source retry to have cycle maintain a pipeline depth
657 on its own. */
658 if (Cnf->Pipeline == true)
6ce72612 659 MaxPipeDepth = _config->FindI("Acquire::Max-Pipeline-Depth",10);
8e5fc8f5
AL
660 else
661 MaxPipeDepth = 1;
662 }
5cb5d8dc 663
93bf083d 664 return Cycle();
0a8a80e5
AL
665}
666 /*}}}*/
667// Queue::Shutdown - Shutdown the worker processes /*{{{*/
668// ---------------------------------------------------------------------
8e5fc8f5
AL
669/* If final is true then all workers are eliminated, otherwise only workers
670 that do not need cleanup are removed */
671bool pkgAcquire::Queue::Shutdown(bool Final)
0a8a80e5
AL
672{
673 // Delete all of the workers
8e5fc8f5
AL
674 pkgAcquire::Worker **Cur = &Workers;
675 while (*Cur != 0)
0a8a80e5 676 {
8e5fc8f5
AL
677 pkgAcquire::Worker *Jnk = *Cur;
678 if (Final == true || Jnk->GetConf()->NeedsCleanup == false)
679 {
680 *Cur = Jnk->NextQueue;
681 Owner->Remove(Jnk);
682 delete Jnk;
683 }
684 else
685 Cur = &(*Cur)->NextQueue;
0a8a80e5
AL
686 }
687
688 return true;
3b5421b4
AL
689}
690 /*}}}*/
7d8afa39 691// Queue::FindItem - Find a URI in the item list /*{{{*/
c88edf1d
AL
692// ---------------------------------------------------------------------
693/* */
694pkgAcquire::Queue::QItem *pkgAcquire::Queue::FindItem(string URI,pkgAcquire::Worker *Owner)
695{
696 for (QItem *I = Items; I != 0; I = I->Next)
697 if (I->URI == URI && I->Worker == Owner)
698 return I;
699 return 0;
700}
701 /*}}}*/
702// Queue::ItemDone - Item has been completed /*{{{*/
703// ---------------------------------------------------------------------
704/* The worker signals this which causes the item to be removed from the
93bf083d
AL
705 queue. If this is the last queue instance then it is removed from the
706 main queue too.*/
c88edf1d
AL
707bool pkgAcquire::Queue::ItemDone(QItem *Itm)
708{
b185acc2 709 PipeDepth--;
db890fdb
AL
710 if (Itm->Owner->Status == pkgAcquire::Item::StatFetching)
711 Itm->Owner->Status = pkgAcquire::Item::StatDone;
712
93bf083d
AL
713 if (Itm->Owner->QueueCounter <= 1)
714 Owner->Dequeue(Itm->Owner);
715 else
716 {
717 Dequeue(Itm->Owner);
718 Owner->Bump();
719 }
c88edf1d 720
93bf083d
AL
721 return Cycle();
722}
723 /*}}}*/
724// Queue::Cycle - Queue new items into the method /*{{{*/
725// ---------------------------------------------------------------------
b185acc2
AL
726/* This locates a new idle item and sends it to the worker. If pipelining
727 is enabled then it keeps the pipe full. */
93bf083d
AL
728bool pkgAcquire::Queue::Cycle()
729{
730 if (Items == 0 || Workers == 0)
c88edf1d
AL
731 return true;
732
e7432370
AL
733 if (PipeDepth < 0)
734 return _error->Error("Pipedepth failure");
735
93bf083d
AL
736 // Look for a queable item
737 QItem *I = Items;
e7432370 738 while (PipeDepth < (signed)MaxPipeDepth)
b185acc2
AL
739 {
740 for (; I != 0; I = I->Next)
741 if (I->Owner->Status == pkgAcquire::Item::StatIdle)
742 break;
743
744 // Nothing to do, queue is idle.
745 if (I == 0)
746 return true;
747
748 I->Worker = Workers;
749 I->Owner->Status = pkgAcquire::Item::StatFetching;
e7432370 750 PipeDepth++;
b185acc2
AL
751 if (Workers->QueueItem(I) == false)
752 return false;
753 }
93bf083d 754
b185acc2 755 return true;
c88edf1d
AL
756}
757 /*}}}*/
be4401bf
AL
758// Queue::Bump - Fetch any pending objects if we are idle /*{{{*/
759// ---------------------------------------------------------------------
b185acc2 760/* This is called when an item in multiple queues is dequeued */
be4401bf
AL
761void pkgAcquire::Queue::Bump()
762{
b185acc2 763 Cycle();
be4401bf
AL
764}
765 /*}}}*/
b98f2859
AL
766// AcquireStatus::pkgAcquireStatus - Constructor /*{{{*/
767// ---------------------------------------------------------------------
768/* */
dcaa1185 769pkgAcquireStatus::pkgAcquireStatus() : d(NULL), Update(true), MorePulses(false)
b98f2859
AL
770{
771 Start();
772}
773 /*}}}*/
774// AcquireStatus::Pulse - Called periodically /*{{{*/
775// ---------------------------------------------------------------------
776/* This computes some internal state variables for the derived classes to
777 use. It generates the current downloaded bytes and total bytes to download
778 as well as the current CPS estimate. */
024d1123 779bool pkgAcquireStatus::Pulse(pkgAcquire *Owner)
b98f2859
AL
780{
781 TotalBytes = 0;
782 CurrentBytes = 0;
d568ed2d
AL
783 TotalItems = 0;
784 CurrentItems = 0;
b98f2859
AL
785
786 // Compute the total number of bytes to fetch
787 unsigned int Unknown = 0;
788 unsigned int Count = 0;
b4fc9b6f 789 for (pkgAcquire::ItemCIterator I = Owner->ItemsBegin(); I != Owner->ItemsEnd();
f7f0d6c7 790 ++I, ++Count)
b98f2859 791 {
d568ed2d
AL
792 TotalItems++;
793 if ((*I)->Status == pkgAcquire::Item::StatDone)
f7f0d6c7 794 ++CurrentItems;
d568ed2d 795
a6568219
AL
796 // Totally ignore local items
797 if ((*I)->Local == true)
798 continue;
b2e465d6 799
b98f2859
AL
800 TotalBytes += (*I)->FileSize;
801 if ((*I)->Complete == true)
802 CurrentBytes += (*I)->FileSize;
803 if ((*I)->FileSize == 0 && (*I)->Complete == false)
f7f0d6c7 804 ++Unknown;
b98f2859
AL
805 }
806
807 // Compute the current completion
dbbc5494 808 unsigned long long ResumeSize = 0;
b98f2859
AL
809 for (pkgAcquire::Worker *I = Owner->WorkersBegin(); I != 0;
810 I = Owner->WorkerStep(I))
811 if (I->CurrentItem != 0 && I->CurrentItem->Owner->Complete == false)
aa0e1101
AL
812 {
813 CurrentBytes += I->CurrentSize;
814 ResumeSize += I->ResumePoint;
815
816 // Files with unknown size always have 100% completion
817 if (I->CurrentItem->Owner->FileSize == 0 &&
818 I->CurrentItem->Owner->Complete == false)
819 TotalBytes += I->CurrentSize;
820 }
821
b98f2859
AL
822 // Normalize the figures and account for unknown size downloads
823 if (TotalBytes <= 0)
824 TotalBytes = 1;
825 if (Unknown == Count)
826 TotalBytes = Unknown;
18ef0a78
AL
827
828 // Wha?! Is not supposed to happen.
829 if (CurrentBytes > TotalBytes)
830 CurrentBytes = TotalBytes;
b98f2859
AL
831
832 // Compute the CPS
833 struct timeval NewTime;
834 gettimeofday(&NewTime,0);
2ec1674d 835 if ((NewTime.tv_sec - Time.tv_sec == 6 && NewTime.tv_usec > Time.tv_usec) ||
b98f2859
AL
836 NewTime.tv_sec - Time.tv_sec > 6)
837 {
f17ac097
AL
838 double Delta = NewTime.tv_sec - Time.tv_sec +
839 (NewTime.tv_usec - Time.tv_usec)/1000000.0;
b98f2859 840
b98f2859 841 // Compute the CPS value
f17ac097 842 if (Delta < 0.01)
e331f6ed
AL
843 CurrentCPS = 0;
844 else
aa0e1101
AL
845 CurrentCPS = ((CurrentBytes - ResumeSize) - LastBytes)/Delta;
846 LastBytes = CurrentBytes - ResumeSize;
dbbc5494 847 ElapsedTime = (unsigned long long)Delta;
b98f2859
AL
848 Time = NewTime;
849 }
024d1123 850
75ef8f14
MV
851 int fd = _config->FindI("APT::Status-Fd",-1);
852 if(fd > 0)
853 {
854 ostringstream status;
855
856 char msg[200];
857 long i = CurrentItems < TotalItems ? CurrentItems + 1 : CurrentItems;
c033d415
MV
858 unsigned long long ETA = 0;
859 if(CurrentCPS > 0)
860 ETA = (TotalBytes - CurrentBytes) / CurrentCPS;
75ef8f14 861
1e8b4c0f
MV
862 // only show the ETA if it makes sense
863 if (ETA > 0 && ETA < 172800 /* two days */ )
0c508b03 864 snprintf(msg,sizeof(msg), _("Retrieving file %li of %li (%s remaining)"), i, TotalItems, TimeToStr(ETA).c_str());
1e8b4c0f 865 else
0c508b03 866 snprintf(msg,sizeof(msg), _("Retrieving file %li of %li"), i, TotalItems);
1e8b4c0f
MV
867
868
75ef8f14
MV
869
870 // build the status str
871 status << "dlstatus:" << i
872 << ":" << (CurrentBytes/float(TotalBytes)*100.0)
873 << ":" << msg
874 << endl;
31bda500
DK
875
876 std::string const dlstatus = status.str();
d68d65ad 877 FileFd::Write(fd, dlstatus.c_str(), dlstatus.size());
75ef8f14
MV
878 }
879
024d1123 880 return true;
b98f2859
AL
881}
882 /*}}}*/
883// AcquireStatus::Start - Called when the download is started /*{{{*/
884// ---------------------------------------------------------------------
885/* We just reset the counters */
886void pkgAcquireStatus::Start()
887{
888 gettimeofday(&Time,0);
889 gettimeofday(&StartTime,0);
890 LastBytes = 0;
891 CurrentCPS = 0;
892 CurrentBytes = 0;
893 TotalBytes = 0;
894 FetchedBytes = 0;
895 ElapsedTime = 0;
d568ed2d
AL
896 TotalItems = 0;
897 CurrentItems = 0;
b98f2859
AL
898}
899 /*}}}*/
a6568219 900// AcquireStatus::Stop - Finished downloading /*{{{*/
b98f2859
AL
901// ---------------------------------------------------------------------
902/* This accurately computes the elapsed time and the total overall CPS. */
903void pkgAcquireStatus::Stop()
904{
905 // Compute the CPS and elapsed time
906 struct timeval NewTime;
907 gettimeofday(&NewTime,0);
908
31a0531d
AL
909 double Delta = NewTime.tv_sec - StartTime.tv_sec +
910 (NewTime.tv_usec - StartTime.tv_usec)/1000000.0;
b98f2859 911
b98f2859 912 // Compute the CPS value
31a0531d 913 if (Delta < 0.01)
e331f6ed
AL
914 CurrentCPS = 0;
915 else
31a0531d 916 CurrentCPS = FetchedBytes/Delta;
b98f2859 917 LastBytes = CurrentBytes;
dbbc5494 918 ElapsedTime = (unsigned long long)Delta;
b98f2859
AL
919}
920 /*}}}*/
921// AcquireStatus::Fetched - Called when a byte set has been fetched /*{{{*/
922// ---------------------------------------------------------------------
923/* This is used to get accurate final transfer rate reporting. */
73da43e9 924void pkgAcquireStatus::Fetched(unsigned long long Size,unsigned long long Resume)
93274b8d 925{
b98f2859
AL
926 FetchedBytes += Size - Resume;
927}
928 /*}}}*/