]> git.saurik.com Git - apt.git/blame - apt-pkg/acquire.cc
fix: Member variable 'X' is not initialized in the constructor.
[apt.git] / apt-pkg / acquire.cc
CommitLineData
0118833a
AL
1// -*- mode: cpp; mode: fold -*-
2// Description /*{{{*/
1b480911 3// $Id: acquire.cc,v 1.50 2004/03/17 05:17:11 mdz Exp $
0118833a
AL
4/* ######################################################################
5
6 Acquire - File Acquisition
7
1e3f4083 8 The core element for the schedule system is the concept of a named
0a8a80e5 9 queue. Each queue is unique and each queue has a name derived from the
1e3f4083 10 URI. The degree of parallelization can be controlled by how the queue
0a8a80e5
AL
11 name is derived from the URI.
12
0118833a
AL
13 ##################################################################### */
14 /*}}}*/
15// Include Files /*{{{*/
ea542140
DK
16#include <config.h>
17
0118833a
AL
18#include <apt-pkg/acquire.h>
19#include <apt-pkg/acquire-item.h>
20#include <apt-pkg/acquire-worker.h>
0a8a80e5
AL
21#include <apt-pkg/configuration.h>
22#include <apt-pkg/error.h>
cdcc6d34 23#include <apt-pkg/strutl.h>
1cd1c398 24#include <apt-pkg/fileutl.h>
8267fe24 25
453b82a3
DK
26#include <string>
27#include <vector>
b4fc9b6f 28#include <iostream>
75ef8f14 29#include <sstream>
526334a0 30#include <stdio.h>
453b82a3
DK
31#include <stdlib.h>
32#include <string.h>
33#include <unistd.h>
96c6cab1 34#include <iomanip>
526334a0 35
7a7fa5f0 36#include <dirent.h>
8267fe24 37#include <sys/time.h>
453b82a3 38#include <sys/select.h>
524f8105 39#include <errno.h>
ea542140
DK
40
41#include <apti18n.h>
0118833a
AL
42 /*}}}*/
43
b4fc9b6f
AL
44using namespace std;
45
0118833a
AL
46// Acquire::pkgAcquire - Constructor /*{{{*/
47// ---------------------------------------------------------------------
93bf083d 48/* We grab some runtime state from the configuration space */
5efbd596 49pkgAcquire::pkgAcquire() : LockFD(-1), Queues(0), Workers(0), Configs(0), Log(NULL), ToFetch(0),
1cd1c398 50 Debug(_config->FindB("Debug::pkgAcquire",false)),
5efbd596 51 Running(false)
0118833a 52{
1cd1c398 53 string const Mode = _config->Find("Acquire::Queue-Mode","host");
0a8a80e5
AL
54 if (strcasecmp(Mode.c_str(),"host") == 0)
55 QueueMode = QueueHost;
56 if (strcasecmp(Mode.c_str(),"access") == 0)
1cd1c398
DK
57 QueueMode = QueueAccess;
58}
5efbd596 59pkgAcquire::pkgAcquire(pkgAcquireStatus *Progress) : LockFD(-1), Queues(0), Workers(0),
1cd1c398
DK
60 Configs(0), Log(Progress), ToFetch(0),
61 Debug(_config->FindB("Debug::pkgAcquire",false)),
5efbd596 62 Running(false)
1cd1c398
DK
63{
64 string const Mode = _config->Find("Acquire::Queue-Mode","host");
65 if (strcasecmp(Mode.c_str(),"host") == 0)
66 QueueMode = QueueHost;
67 if (strcasecmp(Mode.c_str(),"access") == 0)
68 QueueMode = QueueAccess;
69 Setup(Progress, "");
70}
71 /*}}}*/
72// Acquire::Setup - Delayed Constructor /*{{{*/
73// ---------------------------------------------------------------------
74/* Do everything needed to be a complete Acquire object and report the
75 success (or failure) back so the user knows that something is wrong… */
76bool pkgAcquire::Setup(pkgAcquireStatus *Progress, string const &Lock)
77{
78 Log = Progress;
0a8a80e5 79
1cd1c398 80 // check for existence and possibly create auxiliary directories
9c2c9c24
DK
81 string const listDir = _config->FindDir("Dir::State::lists");
82 string const partialListDir = listDir + "partial/";
83 string const archivesDir = _config->FindDir("Dir::Cache::Archives");
84 string const partialArchivesDir = archivesDir + "partial/";
85
7753e468
DK
86 if (CreateAPTDirectoryIfNeeded(_config->FindDir("Dir::State"), partialListDir) == false &&
87 CreateAPTDirectoryIfNeeded(listDir, partialListDir) == false)
9c2c9c24
DK
88 return _error->Errno("Acquire", _("List directory %spartial is missing."), listDir.c_str());
89
7753e468
DK
90 if (CreateAPTDirectoryIfNeeded(_config->FindDir("Dir::Cache"), partialArchivesDir) == false &&
91 CreateAPTDirectoryIfNeeded(archivesDir, partialArchivesDir) == false)
9c2c9c24 92 return _error->Errno("Acquire", _("Archives directory %spartial is missing."), archivesDir.c_str());
1cd1c398
DK
93
94 if (Lock.empty() == true || _config->FindB("Debug::NoLocking", false) == true)
95 return true;
96
97 // Lock the directory this acquire object will work in
98 LockFD = GetLock(flCombine(Lock, "lock"));
99 if (LockFD == -1)
100 return _error->Error(_("Unable to lock directory %s"), Lock.c_str());
101
102 return true;
103}
104 /*}}}*/
0118833a
AL
105// Acquire::~pkgAcquire - Destructor /*{{{*/
106// ---------------------------------------------------------------------
93bf083d 107/* Free our memory, clean up the queues (destroy the workers) */
0118833a
AL
108pkgAcquire::~pkgAcquire()
109{
459681d3 110 Shutdown();
1cd1c398
DK
111
112 if (LockFD != -1)
113 close(LockFD);
114
3b5421b4
AL
115 while (Configs != 0)
116 {
117 MethodConfig *Jnk = Configs;
118 Configs = Configs->Next;
119 delete Jnk;
120 }
281daf46
AL
121}
122 /*}}}*/
8e5fc8f5 123// Acquire::Shutdown - Clean out the acquire object /*{{{*/
281daf46
AL
124// ---------------------------------------------------------------------
125/* */
126void pkgAcquire::Shutdown()
127{
f7f0d6c7 128 while (Items.empty() == false)
1b480911
AL
129 {
130 if (Items[0]->Status == Item::StatFetching)
131 Items[0]->Status = Item::StatError;
281daf46 132 delete Items[0];
1b480911 133 }
0a8a80e5
AL
134
135 while (Queues != 0)
136 {
137 Queue *Jnk = Queues;
138 Queues = Queues->Next;
139 delete Jnk;
140 }
0118833a
AL
141}
142 /*}}}*/
143// Acquire::Add - Add a new item /*{{{*/
144// ---------------------------------------------------------------------
93bf083d
AL
145/* This puts an item on the acquire list. This list is mainly for tracking
146 item status */
0118833a
AL
147void pkgAcquire::Add(Item *Itm)
148{
149 Items.push_back(Itm);
150}
151 /*}}}*/
152// Acquire::Remove - Remove a item /*{{{*/
153// ---------------------------------------------------------------------
93bf083d 154/* Remove an item from the acquire list. This is usually not used.. */
0118833a
AL
155void pkgAcquire::Remove(Item *Itm)
156{
a3eaf954
AL
157 Dequeue(Itm);
158
753b3525 159 for (ItemIterator I = Items.begin(); I != Items.end();)
0118833a
AL
160 {
161 if (*I == Itm)
b4fc9b6f 162 {
0118833a 163 Items.erase(I);
b4fc9b6f
AL
164 I = Items.begin();
165 }
753b3525 166 else
f7f0d6c7 167 ++I;
8267fe24 168 }
0118833a
AL
169}
170 /*}}}*/
0a8a80e5
AL
171// Acquire::Add - Add a worker /*{{{*/
172// ---------------------------------------------------------------------
93bf083d
AL
173/* A list of workers is kept so that the select loop can direct their FD
174 usage. */
0a8a80e5
AL
175void pkgAcquire::Add(Worker *Work)
176{
177 Work->NextAcquire = Workers;
178 Workers = Work;
179}
180 /*}}}*/
181// Acquire::Remove - Remove a worker /*{{{*/
182// ---------------------------------------------------------------------
93bf083d
AL
183/* A worker has died. This can not be done while the select loop is running
184 as it would require that RunFds could handling a changing list state and
1e3f4083 185 it can't.. */
0a8a80e5
AL
186void pkgAcquire::Remove(Worker *Work)
187{
93bf083d
AL
188 if (Running == true)
189 abort();
190
0a8a80e5
AL
191 Worker **I = &Workers;
192 for (; *I != 0;)
193 {
194 if (*I == Work)
195 *I = (*I)->NextAcquire;
196 else
197 I = &(*I)->NextAcquire;
198 }
199}
200 /*}}}*/
0118833a
AL
201// Acquire::Enqueue - Queue an URI for fetching /*{{{*/
202// ---------------------------------------------------------------------
93bf083d 203/* This is the entry point for an item. An item calls this function when
281daf46 204 it is constructed which creates a queue (based on the current queue
93bf083d
AL
205 mode) and puts the item in that queue. If the system is running then
206 the queue might be started. */
8267fe24 207void pkgAcquire::Enqueue(ItemDesc &Item)
0118833a 208{
0a8a80e5 209 // Determine which queue to put the item in
e331f6ed
AL
210 const MethodConfig *Config;
211 string Name = QueueName(Item.URI,Config);
0a8a80e5
AL
212 if (Name.empty() == true)
213 return;
214
215 // Find the queue structure
216 Queue *I = Queues;
217 for (; I != 0 && I->Name != Name; I = I->Next);
218 if (I == 0)
219 {
220 I = new Queue(Name,this);
221 I->Next = Queues;
222 Queues = I;
93bf083d
AL
223
224 if (Running == true)
225 I->Startup();
0a8a80e5 226 }
bfd22fc0 227
e331f6ed
AL
228 // See if this is a local only URI
229 if (Config->LocalOnly == true && Item.Owner->Complete == false)
230 Item.Owner->Local = true;
8267fe24 231 Item.Owner->Status = Item::StatIdle;
0a8a80e5
AL
232
233 // Queue it into the named queue
c03462c6
MV
234 if(I->Enqueue(Item))
235 ToFetch++;
236
0a8a80e5
AL
237 // Some trace stuff
238 if (Debug == true)
239 {
8267fe24
AL
240 clog << "Fetching " << Item.URI << endl;
241 clog << " to " << Item.Owner->DestFile << endl;
e331f6ed 242 clog << " Queue is: " << Name << endl;
0a8a80e5 243 }
3b5421b4
AL
244}
245 /*}}}*/
0a8a80e5 246// Acquire::Dequeue - Remove an item from all queues /*{{{*/
3b5421b4 247// ---------------------------------------------------------------------
93bf083d
AL
248/* This is called when an item is finished being fetched. It removes it
249 from all the queues */
0a8a80e5
AL
250void pkgAcquire::Dequeue(Item *Itm)
251{
252 Queue *I = Queues;
bfd22fc0 253 bool Res = false;
93bf083d
AL
254 if (Debug == true)
255 clog << "Dequeuing " << Itm->DestFile << endl;
5674f6b3
RG
256
257 for (; I != 0; I = I->Next)
258 {
259 if (I->Dequeue(Itm))
260 {
261 Res = true;
262 if (Debug == true)
263 clog << "Dequeued from " << I->Name << endl;
264 }
265 }
266
bfd22fc0
AL
267 if (Res == true)
268 ToFetch--;
0a8a80e5
AL
269}
270 /*}}}*/
271// Acquire::QueueName - Return the name of the queue for this URI /*{{{*/
272// ---------------------------------------------------------------------
273/* The string returned depends on the configuration settings and the
274 method parameters. Given something like http://foo.org/bar it can
275 return http://foo.org or http */
e331f6ed 276string pkgAcquire::QueueName(string Uri,MethodConfig const *&Config)
3b5421b4 277{
93bf083d
AL
278 URI U(Uri);
279
e331f6ed 280 Config = GetConfig(U.Access);
0a8a80e5
AL
281 if (Config == 0)
282 return string();
283
284 /* Single-Instance methods get exactly one queue per URI. This is
285 also used for the Access queue method */
286 if (Config->SingleInstance == true || QueueMode == QueueAccess)
5674f6b3
RG
287 return U.Access;
288
289 string AccessSchema = U.Access + ':',
290 FullQueueName = AccessSchema + U.Host;
291 unsigned int Instances = 0, SchemaLength = AccessSchema.length();
292
293 Queue *I = Queues;
294 for (; I != 0; I = I->Next) {
295 // if the queue already exists, re-use it
296 if (I->Name == FullQueueName)
297 return FullQueueName;
298
299 if (I->Name.compare(0, SchemaLength, AccessSchema) == 0)
300 Instances++;
301 }
302
303 if (Debug) {
304 clog << "Found " << Instances << " instances of " << U.Access << endl;
305 }
306
307 if (Instances >= (unsigned int)_config->FindI("Acquire::QueueHost::Limit",10))
308 return U.Access;
93bf083d 309
5674f6b3 310 return FullQueueName;
0118833a
AL
311}
312 /*}}}*/
3b5421b4
AL
313// Acquire::GetConfig - Fetch the configuration information /*{{{*/
314// ---------------------------------------------------------------------
315/* This locates the configuration structure for an access method. If
316 a config structure cannot be found a Worker will be created to
317 retrieve it */
0a8a80e5 318pkgAcquire::MethodConfig *pkgAcquire::GetConfig(string Access)
3b5421b4
AL
319{
320 // Search for an existing config
321 MethodConfig *Conf;
322 for (Conf = Configs; Conf != 0; Conf = Conf->Next)
323 if (Conf->Access == Access)
324 return Conf;
325
326 // Create the new config class
327 Conf = new MethodConfig;
328 Conf->Access = Access;
329 Conf->Next = Configs;
330 Configs = Conf;
0118833a 331
3b5421b4
AL
332 // Create the worker to fetch the configuration
333 Worker Work(Conf);
334 if (Work.Start() == false)
335 return 0;
7c6e2dc7
MV
336
337 /* if a method uses DownloadLimit, we switch to SingleInstance mode */
4b65cc13 338 if(_config->FindI("Acquire::"+Access+"::Dl-Limit",0) > 0)
7c6e2dc7
MV
339 Conf->SingleInstance = true;
340
3b5421b4
AL
341 return Conf;
342}
343 /*}}}*/
0a8a80e5
AL
344// Acquire::SetFds - Deal with readable FDs /*{{{*/
345// ---------------------------------------------------------------------
346/* Collect FDs that have activity monitors into the fd sets */
347void pkgAcquire::SetFds(int &Fd,fd_set *RSet,fd_set *WSet)
348{
349 for (Worker *I = Workers; I != 0; I = I->NextAcquire)
350 {
351 if (I->InReady == true && I->InFd >= 0)
352 {
353 if (Fd < I->InFd)
354 Fd = I->InFd;
355 FD_SET(I->InFd,RSet);
356 }
357 if (I->OutReady == true && I->OutFd >= 0)
358 {
359 if (Fd < I->OutFd)
360 Fd = I->OutFd;
361 FD_SET(I->OutFd,WSet);
362 }
363 }
364}
365 /*}}}*/
366// Acquire::RunFds - Deal with active FDs /*{{{*/
367// ---------------------------------------------------------------------
93bf083d
AL
368/* Dispatch active FDs over to the proper workers. It is very important
369 that a worker never be erased while this is running! The queue class
370 should never erase a worker except during shutdown processing. */
0a8a80e5
AL
371void pkgAcquire::RunFds(fd_set *RSet,fd_set *WSet)
372{
373 for (Worker *I = Workers; I != 0; I = I->NextAcquire)
374 {
375 if (I->InFd >= 0 && FD_ISSET(I->InFd,RSet) != 0)
376 I->InFdReady();
377 if (I->OutFd >= 0 && FD_ISSET(I->OutFd,WSet) != 0)
378 I->OutFdReady();
379 }
380}
381 /*}}}*/
382// Acquire::Run - Run the fetch sequence /*{{{*/
383// ---------------------------------------------------------------------
384/* This runs the queues. It manages a select loop for all of the
385 Worker tasks. The workers interact with the queues and items to
386 manage the actual fetch. */
1c5f7e5f 387pkgAcquire::RunResult pkgAcquire::Run(int PulseIntervall)
0a8a80e5 388{
8b89e57f
AL
389 Running = true;
390
0a8a80e5
AL
391 for (Queue *I = Queues; I != 0; I = I->Next)
392 I->Startup();
393
b98f2859
AL
394 if (Log != 0)
395 Log->Start();
396
024d1123
AL
397 bool WasCancelled = false;
398
0a8a80e5 399 // Run till all things have been acquired
8267fe24
AL
400 struct timeval tv;
401 tv.tv_sec = 0;
1c5f7e5f 402 tv.tv_usec = PulseIntervall;
0a8a80e5
AL
403 while (ToFetch > 0)
404 {
405 fd_set RFds;
406 fd_set WFds;
407 int Highest = 0;
408 FD_ZERO(&RFds);
409 FD_ZERO(&WFds);
410 SetFds(Highest,&RFds,&WFds);
411
b0db36b1
AL
412 int Res;
413 do
414 {
415 Res = select(Highest+1,&RFds,&WFds,0,&tv);
416 }
417 while (Res < 0 && errno == EINTR);
418
8267fe24 419 if (Res < 0)
8b89e57f 420 {
8267fe24
AL
421 _error->Errno("select","Select has failed");
422 break;
8b89e57f 423 }
93bf083d 424
0a8a80e5 425 RunFds(&RFds,&WFds);
93bf083d
AL
426 if (_error->PendingError() == true)
427 break;
8267fe24
AL
428
429 // Timeout, notify the log class
430 if (Res == 0 || (Log != 0 && Log->Update == true))
431 {
1c5f7e5f 432 tv.tv_usec = PulseIntervall;
8267fe24
AL
433 for (Worker *I = Workers; I != 0; I = I->NextAcquire)
434 I->Pulse();
024d1123
AL
435 if (Log != 0 && Log->Pulse(this) == false)
436 {
437 WasCancelled = true;
438 break;
439 }
8267fe24 440 }
0a8a80e5 441 }
be4401bf 442
b98f2859
AL
443 if (Log != 0)
444 Log->Stop();
445
be4401bf
AL
446 // Shut down the acquire bits
447 Running = false;
0a8a80e5 448 for (Queue *I = Queues; I != 0; I = I->Next)
8e5fc8f5 449 I->Shutdown(false);
0a8a80e5 450
ab559b35 451 // Shut down the items
f7f0d6c7 452 for (ItemIterator I = Items.begin(); I != Items.end(); ++I)
8e5fc8f5 453 (*I)->Finished();
ab559b35 454
024d1123
AL
455 if (_error->PendingError())
456 return Failed;
457 if (WasCancelled)
458 return Cancelled;
459 return Continue;
93bf083d
AL
460}
461 /*}}}*/
be4401bf 462// Acquire::Bump - Called when an item is dequeued /*{{{*/
93bf083d
AL
463// ---------------------------------------------------------------------
464/* This routine bumps idle queues in hopes that they will be able to fetch
465 the dequeued item */
466void pkgAcquire::Bump()
467{
be4401bf
AL
468 for (Queue *I = Queues; I != 0; I = I->Next)
469 I->Bump();
0a8a80e5
AL
470}
471 /*}}}*/
8267fe24
AL
472// Acquire::WorkerStep - Step to the next worker /*{{{*/
473// ---------------------------------------------------------------------
474/* Not inlined to advoid including acquire-worker.h */
475pkgAcquire::Worker *pkgAcquire::WorkerStep(Worker *I)
476{
477 return I->NextAcquire;
d3e8fbb3 478}
8267fe24 479 /*}}}*/
a6568219 480// Acquire::Clean - Cleans a directory /*{{{*/
7a7fa5f0
AL
481// ---------------------------------------------------------------------
482/* This is a bit simplistic, it looks at every file in the dir and sees
483 if it is part of the download set. */
484bool pkgAcquire::Clean(string Dir)
485{
95b5f6c1
DK
486 // non-existing directories are by definition clean…
487 if (DirectoryExists(Dir) == false)
488 return true;
489
10ecfe4f
MV
490 if(Dir == "/")
491 return _error->Error(_("Clean of %s is not supported"), Dir.c_str());
492
7a7fa5f0
AL
493 DIR *D = opendir(Dir.c_str());
494 if (D == 0)
b2e465d6 495 return _error->Errno("opendir",_("Unable to read %s"),Dir.c_str());
7a7fa5f0
AL
496
497 string StartDir = SafeGetCWD();
498 if (chdir(Dir.c_str()) != 0)
499 {
500 closedir(D);
b2e465d6 501 return _error->Errno("chdir",_("Unable to change to %s"),Dir.c_str());
7a7fa5f0
AL
502 }
503
504 for (struct dirent *Dir = readdir(D); Dir != 0; Dir = readdir(D))
505 {
506 // Skip some files..
507 if (strcmp(Dir->d_name,"lock") == 0 ||
508 strcmp(Dir->d_name,"partial") == 0 ||
509 strcmp(Dir->d_name,".") == 0 ||
510 strcmp(Dir->d_name,"..") == 0)
511 continue;
512
513 // Look in the get list
b4fc9b6f 514 ItemCIterator I = Items.begin();
f7f0d6c7 515 for (; I != Items.end(); ++I)
7a7fa5f0
AL
516 if (flNotDir((*I)->DestFile) == Dir->d_name)
517 break;
518
519 // Nothing found, nuke it
520 if (I == Items.end())
521 unlink(Dir->d_name);
522 };
523
7a7fa5f0 524 closedir(D);
3c8cda8b
MV
525 if (chdir(StartDir.c_str()) != 0)
526 return _error->Errno("chdir",_("Unable to change to %s"),StartDir.c_str());
7a7fa5f0
AL
527 return true;
528}
529 /*}}}*/
a6568219
AL
530// Acquire::TotalNeeded - Number of bytes to fetch /*{{{*/
531// ---------------------------------------------------------------------
532/* This is the total number of bytes needed */
a02db58f 533APT_PURE unsigned long long pkgAcquire::TotalNeeded()
a6568219 534{
a3c4c81a 535 unsigned long long Total = 0;
f7f0d6c7 536 for (ItemCIterator I = ItemsBegin(); I != ItemsEnd(); ++I)
a6568219
AL
537 Total += (*I)->FileSize;
538 return Total;
539}
540 /*}}}*/
541// Acquire::FetchNeeded - Number of bytes needed to get /*{{{*/
542// ---------------------------------------------------------------------
543/* This is the number of bytes that is not local */
a02db58f 544APT_PURE unsigned long long pkgAcquire::FetchNeeded()
a6568219 545{
a3c4c81a 546 unsigned long long Total = 0;
f7f0d6c7 547 for (ItemCIterator I = ItemsBegin(); I != ItemsEnd(); ++I)
a6568219
AL
548 if ((*I)->Local == false)
549 Total += (*I)->FileSize;
550 return Total;
551}
552 /*}}}*/
6b1ff003
AL
553// Acquire::PartialPresent - Number of partial bytes we already have /*{{{*/
554// ---------------------------------------------------------------------
555/* This is the number of bytes that is not local */
a02db58f 556APT_PURE unsigned long long pkgAcquire::PartialPresent()
6b1ff003 557{
a3c4c81a 558 unsigned long long Total = 0;
f7f0d6c7 559 for (ItemCIterator I = ItemsBegin(); I != ItemsEnd(); ++I)
6b1ff003
AL
560 if ((*I)->Local == false)
561 Total += (*I)->PartialSize;
562 return Total;
563}
92fcbfc1 564 /*}}}*/
8e5fc8f5 565// Acquire::UriBegin - Start iterator for the uri list /*{{{*/
f7a08e33
AL
566// ---------------------------------------------------------------------
567/* */
568pkgAcquire::UriIterator pkgAcquire::UriBegin()
569{
570 return UriIterator(Queues);
571}
572 /*}}}*/
8e5fc8f5 573// Acquire::UriEnd - End iterator for the uri list /*{{{*/
f7a08e33
AL
574// ---------------------------------------------------------------------
575/* */
576pkgAcquire::UriIterator pkgAcquire::UriEnd()
577{
578 return UriIterator(0);
579}
580 /*}}}*/
e331f6ed
AL
581// Acquire::MethodConfig::MethodConfig - Constructor /*{{{*/
582// ---------------------------------------------------------------------
583/* */
25613a61
DK
584pkgAcquire::MethodConfig::MethodConfig() : d(NULL), Next(0), SingleInstance(false),
585 Pipeline(false), SendConfig(false), LocalOnly(false), NeedsCleanup(false),
586 Removable(false)
e331f6ed 587{
e331f6ed
AL
588}
589 /*}}}*/
0a8a80e5
AL
590// Queue::Queue - Constructor /*{{{*/
591// ---------------------------------------------------------------------
592/* */
25613a61
DK
593pkgAcquire::Queue::Queue(string Name,pkgAcquire *Owner) : d(NULL), Next(0),
594 Name(Name), Items(0), Workers(0), Owner(Owner), PipeDepth(0), MaxPipeDepth(1)
0a8a80e5 595{
0a8a80e5
AL
596}
597 /*}}}*/
598// Queue::~Queue - Destructor /*{{{*/
599// ---------------------------------------------------------------------
600/* */
601pkgAcquire::Queue::~Queue()
602{
8e5fc8f5 603 Shutdown(true);
0a8a80e5
AL
604
605 while (Items != 0)
606 {
607 QItem *Jnk = Items;
608 Items = Items->Next;
609 delete Jnk;
610 }
611}
612 /*}}}*/
613// Queue::Enqueue - Queue an item to the queue /*{{{*/
614// ---------------------------------------------------------------------
615/* */
c03462c6 616bool pkgAcquire::Queue::Enqueue(ItemDesc &Item)
0a8a80e5 617{
7a1b1f8b 618 QItem **I = &Items;
c03462c6
MV
619 // move to the end of the queue and check for duplicates here
620 for (; *I != 0; I = &(*I)->Next)
621 if (Item.URI == (*I)->URI)
622 {
623 Item.Owner->Status = Item::StatDone;
624 return false;
625 }
626
0a8a80e5 627 // Create a new item
7a1b1f8b
AL
628 QItem *Itm = new QItem;
629 *Itm = Item;
630 Itm->Next = 0;
631 *I = Itm;
0a8a80e5 632
8267fe24 633 Item.Owner->QueueCounter++;
93bf083d
AL
634 if (Items->Next == 0)
635 Cycle();
c03462c6 636 return true;
0a8a80e5
AL
637}
638 /*}}}*/
c88edf1d 639// Queue::Dequeue - Remove an item from the queue /*{{{*/
0a8a80e5 640// ---------------------------------------------------------------------
b185acc2 641/* We return true if we hit something */
bfd22fc0 642bool pkgAcquire::Queue::Dequeue(Item *Owner)
0a8a80e5 643{
b185acc2
AL
644 if (Owner->Status == pkgAcquire::Item::StatFetching)
645 return _error->Error("Tried to dequeue a fetching object");
646
bfd22fc0
AL
647 bool Res = false;
648
0a8a80e5
AL
649 QItem **I = &Items;
650 for (; *I != 0;)
651 {
652 if ((*I)->Owner == Owner)
653 {
654 QItem *Jnk= *I;
655 *I = (*I)->Next;
656 Owner->QueueCounter--;
657 delete Jnk;
bfd22fc0 658 Res = true;
0a8a80e5
AL
659 }
660 else
661 I = &(*I)->Next;
662 }
bfd22fc0
AL
663
664 return Res;
0a8a80e5
AL
665}
666 /*}}}*/
667// Queue::Startup - Start the worker processes /*{{{*/
668// ---------------------------------------------------------------------
8e5fc8f5
AL
669/* It is possible for this to be called with a pre-existing set of
670 workers. */
0a8a80e5
AL
671bool pkgAcquire::Queue::Startup()
672{
8e5fc8f5
AL
673 if (Workers == 0)
674 {
675 URI U(Name);
676 pkgAcquire::MethodConfig *Cnf = Owner->GetConfig(U.Access);
677 if (Cnf == 0)
678 return false;
679
680 Workers = new Worker(this,Cnf,Owner->Log);
681 Owner->Add(Workers);
682 if (Workers->Start() == false)
683 return false;
684
685 /* When pipelining we commit 10 items. This needs to change when we
686 added other source retry to have cycle maintain a pipeline depth
687 on its own. */
688 if (Cnf->Pipeline == true)
6ce72612 689 MaxPipeDepth = _config->FindI("Acquire::Max-Pipeline-Depth",10);
8e5fc8f5
AL
690 else
691 MaxPipeDepth = 1;
692 }
5cb5d8dc 693
93bf083d 694 return Cycle();
0a8a80e5
AL
695}
696 /*}}}*/
697// Queue::Shutdown - Shutdown the worker processes /*{{{*/
698// ---------------------------------------------------------------------
8e5fc8f5
AL
699/* If final is true then all workers are eliminated, otherwise only workers
700 that do not need cleanup are removed */
701bool pkgAcquire::Queue::Shutdown(bool Final)
0a8a80e5
AL
702{
703 // Delete all of the workers
8e5fc8f5
AL
704 pkgAcquire::Worker **Cur = &Workers;
705 while (*Cur != 0)
0a8a80e5 706 {
8e5fc8f5
AL
707 pkgAcquire::Worker *Jnk = *Cur;
708 if (Final == true || Jnk->GetConf()->NeedsCleanup == false)
709 {
710 *Cur = Jnk->NextQueue;
711 Owner->Remove(Jnk);
712 delete Jnk;
713 }
714 else
715 Cur = &(*Cur)->NextQueue;
0a8a80e5
AL
716 }
717
718 return true;
3b5421b4
AL
719}
720 /*}}}*/
7d8afa39 721// Queue::FindItem - Find a URI in the item list /*{{{*/
c88edf1d
AL
722// ---------------------------------------------------------------------
723/* */
724pkgAcquire::Queue::QItem *pkgAcquire::Queue::FindItem(string URI,pkgAcquire::Worker *Owner)
725{
726 for (QItem *I = Items; I != 0; I = I->Next)
727 if (I->URI == URI && I->Worker == Owner)
728 return I;
729 return 0;
730}
731 /*}}}*/
732// Queue::ItemDone - Item has been completed /*{{{*/
733// ---------------------------------------------------------------------
734/* The worker signals this which causes the item to be removed from the
93bf083d
AL
735 queue. If this is the last queue instance then it is removed from the
736 main queue too.*/
c88edf1d
AL
737bool pkgAcquire::Queue::ItemDone(QItem *Itm)
738{
b185acc2 739 PipeDepth--;
db890fdb
AL
740 if (Itm->Owner->Status == pkgAcquire::Item::StatFetching)
741 Itm->Owner->Status = pkgAcquire::Item::StatDone;
742
93bf083d
AL
743 if (Itm->Owner->QueueCounter <= 1)
744 Owner->Dequeue(Itm->Owner);
745 else
746 {
747 Dequeue(Itm->Owner);
748 Owner->Bump();
749 }
c88edf1d 750
93bf083d
AL
751 return Cycle();
752}
753 /*}}}*/
754// Queue::Cycle - Queue new items into the method /*{{{*/
755// ---------------------------------------------------------------------
b185acc2
AL
756/* This locates a new idle item and sends it to the worker. If pipelining
757 is enabled then it keeps the pipe full. */
93bf083d
AL
758bool pkgAcquire::Queue::Cycle()
759{
760 if (Items == 0 || Workers == 0)
c88edf1d
AL
761 return true;
762
e7432370
AL
763 if (PipeDepth < 0)
764 return _error->Error("Pipedepth failure");
765
93bf083d
AL
766 // Look for a queable item
767 QItem *I = Items;
e7432370 768 while (PipeDepth < (signed)MaxPipeDepth)
b185acc2
AL
769 {
770 for (; I != 0; I = I->Next)
771 if (I->Owner->Status == pkgAcquire::Item::StatIdle)
772 break;
773
774 // Nothing to do, queue is idle.
775 if (I == 0)
776 return true;
777
778 I->Worker = Workers;
779 I->Owner->Status = pkgAcquire::Item::StatFetching;
e7432370 780 PipeDepth++;
b185acc2
AL
781 if (Workers->QueueItem(I) == false)
782 return false;
783 }
93bf083d 784
b185acc2 785 return true;
c88edf1d
AL
786}
787 /*}}}*/
be4401bf
AL
788// Queue::Bump - Fetch any pending objects if we are idle /*{{{*/
789// ---------------------------------------------------------------------
b185acc2 790/* This is called when an item in multiple queues is dequeued */
be4401bf
AL
791void pkgAcquire::Queue::Bump()
792{
b185acc2 793 Cycle();
be4401bf
AL
794}
795 /*}}}*/
b98f2859
AL
796// AcquireStatus::pkgAcquireStatus - Constructor /*{{{*/
797// ---------------------------------------------------------------------
798/* */
25613a61 799pkgAcquireStatus::pkgAcquireStatus() : d(NULL), Percent(0), Update(true), MorePulses(false)
b98f2859
AL
800{
801 Start();
802}
803 /*}}}*/
804// AcquireStatus::Pulse - Called periodically /*{{{*/
805// ---------------------------------------------------------------------
806/* This computes some internal state variables for the derived classes to
807 use. It generates the current downloaded bytes and total bytes to download
808 as well as the current CPS estimate. */
024d1123 809bool pkgAcquireStatus::Pulse(pkgAcquire *Owner)
b98f2859
AL
810{
811 TotalBytes = 0;
812 CurrentBytes = 0;
d568ed2d
AL
813 TotalItems = 0;
814 CurrentItems = 0;
b98f2859
AL
815
816 // Compute the total number of bytes to fetch
817 unsigned int Unknown = 0;
818 unsigned int Count = 0;
c6e9cc58
MV
819 bool UnfetchedReleaseFiles = false;
820 for (pkgAcquire::ItemCIterator I = Owner->ItemsBegin();
821 I != Owner->ItemsEnd();
f7f0d6c7 822 ++I, ++Count)
b98f2859 823 {
d568ed2d
AL
824 TotalItems++;
825 if ((*I)->Status == pkgAcquire::Item::StatDone)
f7f0d6c7 826 ++CurrentItems;
d568ed2d 827
a6568219
AL
828 // Totally ignore local items
829 if ((*I)->Local == true)
830 continue;
b2e465d6 831
d0cfa8ad
MV
832 // see if the method tells us to expect more
833 TotalItems += (*I)->ExpectedAdditionalItems;
834
c6e9cc58
MV
835 // check if there are unfetched Release files
836 if ((*I)->Complete == false && (*I)->ExpectedAdditionalItems > 0)
837 UnfetchedReleaseFiles = true;
838
b98f2859
AL
839 TotalBytes += (*I)->FileSize;
840 if ((*I)->Complete == true)
841 CurrentBytes += (*I)->FileSize;
842 if ((*I)->FileSize == 0 && (*I)->Complete == false)
f7f0d6c7 843 ++Unknown;
b98f2859
AL
844 }
845
846 // Compute the current completion
dbbc5494 847 unsigned long long ResumeSize = 0;
b98f2859
AL
848 for (pkgAcquire::Worker *I = Owner->WorkersBegin(); I != 0;
849 I = Owner->WorkerStep(I))
c62f7898 850 {
b98f2859 851 if (I->CurrentItem != 0 && I->CurrentItem->Owner->Complete == false)
aa0e1101
AL
852 {
853 CurrentBytes += I->CurrentSize;
854 ResumeSize += I->ResumePoint;
855
856 // Files with unknown size always have 100% completion
857 if (I->CurrentItem->Owner->FileSize == 0 &&
858 I->CurrentItem->Owner->Complete == false)
859 TotalBytes += I->CurrentSize;
860 }
c62f7898 861 }
aa0e1101 862
b98f2859
AL
863 // Normalize the figures and account for unknown size downloads
864 if (TotalBytes <= 0)
865 TotalBytes = 1;
866 if (Unknown == Count)
867 TotalBytes = Unknown;
18ef0a78
AL
868
869 // Wha?! Is not supposed to happen.
870 if (CurrentBytes > TotalBytes)
871 CurrentBytes = TotalBytes;
96c6cab1
MV
872
873 // debug
874 if (_config->FindB("Debug::acquire::progress", false) == true)
875 std::clog << " Bytes: "
876 << SizeToStr(CurrentBytes) << " / " << SizeToStr(TotalBytes)
877 << std::endl;
b98f2859
AL
878
879 // Compute the CPS
880 struct timeval NewTime;
881 gettimeofday(&NewTime,0);
2ec1674d 882 if ((NewTime.tv_sec - Time.tv_sec == 6 && NewTime.tv_usec > Time.tv_usec) ||
b98f2859
AL
883 NewTime.tv_sec - Time.tv_sec > 6)
884 {
f17ac097
AL
885 double Delta = NewTime.tv_sec - Time.tv_sec +
886 (NewTime.tv_usec - Time.tv_usec)/1000000.0;
b98f2859 887
b98f2859 888 // Compute the CPS value
f17ac097 889 if (Delta < 0.01)
e331f6ed
AL
890 CurrentCPS = 0;
891 else
aa0e1101
AL
892 CurrentCPS = ((CurrentBytes - ResumeSize) - LastBytes)/Delta;
893 LastBytes = CurrentBytes - ResumeSize;
dbbc5494 894 ElapsedTime = (unsigned long long)Delta;
b98f2859
AL
895 Time = NewTime;
896 }
024d1123 897
c6e9cc58
MV
898 // calculate the percentage, if we have too little data assume 1%
899 if (TotalBytes > 0 && UnfetchedReleaseFiles)
96c6cab1
MV
900 Percent = 0;
901 else
902 // use both files and bytes because bytes can be unreliable
903 Percent = (0.8 * (CurrentBytes/float(TotalBytes)*100.0) +
904 0.2 * (CurrentItems/float(TotalItems)*100.0));
905
75ef8f14
MV
906 int fd = _config->FindI("APT::Status-Fd",-1);
907 if(fd > 0)
908 {
909 ostringstream status;
910
911 char msg[200];
912 long i = CurrentItems < TotalItems ? CurrentItems + 1 : CurrentItems;
c033d415
MV
913 unsigned long long ETA = 0;
914 if(CurrentCPS > 0)
915 ETA = (TotalBytes - CurrentBytes) / CurrentCPS;
75ef8f14 916
1e8b4c0f
MV
917 // only show the ETA if it makes sense
918 if (ETA > 0 && ETA < 172800 /* two days */ )
0c508b03 919 snprintf(msg,sizeof(msg), _("Retrieving file %li of %li (%s remaining)"), i, TotalItems, TimeToStr(ETA).c_str());
1e8b4c0f 920 else
0c508b03 921 snprintf(msg,sizeof(msg), _("Retrieving file %li of %li"), i, TotalItems);
1e8b4c0f 922
75ef8f14
MV
923 // build the status str
924 status << "dlstatus:" << i
96c6cab1 925 << ":" << std::setprecision(3) << Percent
d0cfa8ad
MV
926 << ":" << msg
927 << endl;
31bda500
DK
928
929 std::string const dlstatus = status.str();
d68d65ad 930 FileFd::Write(fd, dlstatus.c_str(), dlstatus.size());
75ef8f14
MV
931 }
932
024d1123 933 return true;
b98f2859
AL
934}
935 /*}}}*/
936// AcquireStatus::Start - Called when the download is started /*{{{*/
937// ---------------------------------------------------------------------
938/* We just reset the counters */
939void pkgAcquireStatus::Start()
940{
941 gettimeofday(&Time,0);
942 gettimeofday(&StartTime,0);
943 LastBytes = 0;
944 CurrentCPS = 0;
945 CurrentBytes = 0;
946 TotalBytes = 0;
947 FetchedBytes = 0;
948 ElapsedTime = 0;
d568ed2d
AL
949 TotalItems = 0;
950 CurrentItems = 0;
b98f2859
AL
951}
952 /*}}}*/
a6568219 953// AcquireStatus::Stop - Finished downloading /*{{{*/
b98f2859
AL
954// ---------------------------------------------------------------------
955/* This accurately computes the elapsed time and the total overall CPS. */
956void pkgAcquireStatus::Stop()
957{
958 // Compute the CPS and elapsed time
959 struct timeval NewTime;
960 gettimeofday(&NewTime,0);
961
31a0531d
AL
962 double Delta = NewTime.tv_sec - StartTime.tv_sec +
963 (NewTime.tv_usec - StartTime.tv_usec)/1000000.0;
b98f2859 964
b98f2859 965 // Compute the CPS value
31a0531d 966 if (Delta < 0.01)
e331f6ed
AL
967 CurrentCPS = 0;
968 else
31a0531d 969 CurrentCPS = FetchedBytes/Delta;
b98f2859 970 LastBytes = CurrentBytes;
dbbc5494 971 ElapsedTime = (unsigned long long)Delta;
b98f2859
AL
972}
973 /*}}}*/
974// AcquireStatus::Fetched - Called when a byte set has been fetched /*{{{*/
975// ---------------------------------------------------------------------
976/* This is used to get accurate final transfer rate reporting. */
73da43e9 977void pkgAcquireStatus::Fetched(unsigned long long Size,unsigned long long Resume)
93274b8d 978{
b98f2859
AL
979 FetchedBytes += Size - Resume;
980}
981 /*}}}*/