]> git.saurik.com Git - apt.git/blame - apt-pkg/acquire.cc
Ignore EINVAL from prctl(PR_SET_NO_NEW_PRIVS)
[apt.git] / apt-pkg / acquire.cc
CommitLineData
0118833a
AL
1// -*- mode: cpp; mode: fold -*-
2// Description /*{{{*/
1b480911 3// $Id: acquire.cc,v 1.50 2004/03/17 05:17:11 mdz Exp $
0118833a
AL
4/* ######################################################################
5
6 Acquire - File Acquisition
7
1e3f4083 8 The core element for the schedule system is the concept of a named
0a8a80e5 9 queue. Each queue is unique and each queue has a name derived from the
1e3f4083 10 URI. The degree of parallelization can be controlled by how the queue
0a8a80e5
AL
11 name is derived from the URI.
12
0118833a
AL
13 ##################################################################### */
14 /*}}}*/
15// Include Files /*{{{*/
ea542140
DK
16#include <config.h>
17
0118833a
AL
18#include <apt-pkg/acquire.h>
19#include <apt-pkg/acquire-item.h>
20#include <apt-pkg/acquire-worker.h>
0a8a80e5
AL
21#include <apt-pkg/configuration.h>
22#include <apt-pkg/error.h>
cdcc6d34 23#include <apt-pkg/strutl.h>
1cd1c398 24#include <apt-pkg/fileutl.h>
8267fe24 25
453b82a3
DK
26#include <string>
27#include <vector>
b4fc9b6f 28#include <iostream>
75ef8f14 29#include <sstream>
526334a0 30#include <stdio.h>
453b82a3
DK
31#include <stdlib.h>
32#include <string.h>
33#include <unistd.h>
96c6cab1 34#include <iomanip>
526334a0 35
7a7fa5f0 36#include <dirent.h>
8267fe24 37#include <sys/time.h>
453b82a3 38#include <sys/select.h>
524f8105 39#include <errno.h>
ea542140
DK
40
41#include <apti18n.h>
0118833a
AL
42 /*}}}*/
43
b4fc9b6f
AL
44using namespace std;
45
0118833a
AL
46// Acquire::pkgAcquire - Constructor /*{{{*/
47// ---------------------------------------------------------------------
93bf083d 48/* We grab some runtime state from the configuration space */
5efbd596 49pkgAcquire::pkgAcquire() : LockFD(-1), Queues(0), Workers(0), Configs(0), Log(NULL), ToFetch(0),
1cd1c398 50 Debug(_config->FindB("Debug::pkgAcquire",false)),
5efbd596 51 Running(false)
0118833a 52{
1cd1c398 53 string const Mode = _config->Find("Acquire::Queue-Mode","host");
0a8a80e5
AL
54 if (strcasecmp(Mode.c_str(),"host") == 0)
55 QueueMode = QueueHost;
56 if (strcasecmp(Mode.c_str(),"access") == 0)
1cd1c398
DK
57 QueueMode = QueueAccess;
58}
5efbd596 59pkgAcquire::pkgAcquire(pkgAcquireStatus *Progress) : LockFD(-1), Queues(0), Workers(0),
1cd1c398
DK
60 Configs(0), Log(Progress), ToFetch(0),
61 Debug(_config->FindB("Debug::pkgAcquire",false)),
5efbd596 62 Running(false)
1cd1c398
DK
63{
64 string const Mode = _config->Find("Acquire::Queue-Mode","host");
65 if (strcasecmp(Mode.c_str(),"host") == 0)
66 QueueMode = QueueHost;
67 if (strcasecmp(Mode.c_str(),"access") == 0)
68 QueueMode = QueueAccess;
69 Setup(Progress, "");
70}
71 /*}}}*/
72// Acquire::Setup - Delayed Constructor /*{{{*/
73// ---------------------------------------------------------------------
74/* Do everything needed to be a complete Acquire object and report the
75 success (or failure) back so the user knows that something is wrong… */
43acd019
DK
76bool pkgAcquire::Setup(pkgAcquireStatus *Progress, string const &Lock,
77 bool const createDirectories)
1cd1c398
DK
78{
79 Log = Progress;
0a8a80e5 80
1cd1c398 81 // check for existence and possibly create auxiliary directories
43acd019
DK
82 if (createDirectories == true)
83 {
84 string const listDir = _config->FindDir("Dir::State::lists");
85 string const partialListDir = listDir + "partial/";
86 string const archivesDir = _config->FindDir("Dir::Cache::Archives");
87 string const partialArchivesDir = archivesDir + "partial/";
9c2c9c24 88
43acd019
DK
89 if (CreateAPTDirectoryIfNeeded(_config->FindDir("Dir::State"), partialListDir) == false &&
90 CreateAPTDirectoryIfNeeded(listDir, partialListDir) == false)
91 return _error->Errno("Acquire", _("List directory %spartial is missing."), listDir.c_str());
9c2c9c24 92
43acd019
DK
93 if (CreateAPTDirectoryIfNeeded(_config->FindDir("Dir::Cache"), partialArchivesDir) == false &&
94 CreateAPTDirectoryIfNeeded(archivesDir, partialArchivesDir) == false)
95 return _error->Errno("Acquire", _("Archives directory %spartial is missing."), archivesDir.c_str());
96 }
1cd1c398
DK
97
98 if (Lock.empty() == true || _config->FindB("Debug::NoLocking", false) == true)
99 return true;
100
101 // Lock the directory this acquire object will work in
102 LockFD = GetLock(flCombine(Lock, "lock"));
103 if (LockFD == -1)
104 return _error->Error(_("Unable to lock directory %s"), Lock.c_str());
105
106 return true;
107}
108 /*}}}*/
0118833a
AL
109// Acquire::~pkgAcquire - Destructor /*{{{*/
110// ---------------------------------------------------------------------
93bf083d 111/* Free our memory, clean up the queues (destroy the workers) */
0118833a
AL
112pkgAcquire::~pkgAcquire()
113{
459681d3 114 Shutdown();
1cd1c398
DK
115
116 if (LockFD != -1)
117 close(LockFD);
118
3b5421b4
AL
119 while (Configs != 0)
120 {
121 MethodConfig *Jnk = Configs;
122 Configs = Configs->Next;
123 delete Jnk;
124 }
281daf46
AL
125}
126 /*}}}*/
8e5fc8f5 127// Acquire::Shutdown - Clean out the acquire object /*{{{*/
281daf46
AL
128// ---------------------------------------------------------------------
129/* */
130void pkgAcquire::Shutdown()
131{
f7f0d6c7 132 while (Items.empty() == false)
1b480911
AL
133 {
134 if (Items[0]->Status == Item::StatFetching)
135 Items[0]->Status = Item::StatError;
281daf46 136 delete Items[0];
1b480911 137 }
0a8a80e5
AL
138
139 while (Queues != 0)
140 {
141 Queue *Jnk = Queues;
142 Queues = Queues->Next;
143 delete Jnk;
144 }
0118833a
AL
145}
146 /*}}}*/
147// Acquire::Add - Add a new item /*{{{*/
148// ---------------------------------------------------------------------
93bf083d
AL
149/* This puts an item on the acquire list. This list is mainly for tracking
150 item status */
0118833a
AL
151void pkgAcquire::Add(Item *Itm)
152{
153 Items.push_back(Itm);
154}
155 /*}}}*/
156// Acquire::Remove - Remove a item /*{{{*/
157// ---------------------------------------------------------------------
93bf083d 158/* Remove an item from the acquire list. This is usually not used.. */
0118833a
AL
159void pkgAcquire::Remove(Item *Itm)
160{
a3eaf954
AL
161 Dequeue(Itm);
162
753b3525 163 for (ItemIterator I = Items.begin(); I != Items.end();)
0118833a
AL
164 {
165 if (*I == Itm)
b4fc9b6f 166 {
0118833a 167 Items.erase(I);
b4fc9b6f
AL
168 I = Items.begin();
169 }
753b3525 170 else
f7f0d6c7 171 ++I;
8267fe24 172 }
0118833a
AL
173}
174 /*}}}*/
0a8a80e5
AL
175// Acquire::Add - Add a worker /*{{{*/
176// ---------------------------------------------------------------------
93bf083d
AL
177/* A list of workers is kept so that the select loop can direct their FD
178 usage. */
0a8a80e5
AL
179void pkgAcquire::Add(Worker *Work)
180{
181 Work->NextAcquire = Workers;
182 Workers = Work;
183}
184 /*}}}*/
185// Acquire::Remove - Remove a worker /*{{{*/
186// ---------------------------------------------------------------------
93bf083d
AL
187/* A worker has died. This can not be done while the select loop is running
188 as it would require that RunFds could handling a changing list state and
1e3f4083 189 it can't.. */
0a8a80e5
AL
190void pkgAcquire::Remove(Worker *Work)
191{
93bf083d
AL
192 if (Running == true)
193 abort();
194
0a8a80e5
AL
195 Worker **I = &Workers;
196 for (; *I != 0;)
197 {
198 if (*I == Work)
199 *I = (*I)->NextAcquire;
200 else
201 I = &(*I)->NextAcquire;
202 }
203}
204 /*}}}*/
0118833a
AL
205// Acquire::Enqueue - Queue an URI for fetching /*{{{*/
206// ---------------------------------------------------------------------
93bf083d 207/* This is the entry point for an item. An item calls this function when
281daf46 208 it is constructed which creates a queue (based on the current queue
93bf083d
AL
209 mode) and puts the item in that queue. If the system is running then
210 the queue might be started. */
8267fe24 211void pkgAcquire::Enqueue(ItemDesc &Item)
0118833a 212{
0a8a80e5 213 // Determine which queue to put the item in
e331f6ed
AL
214 const MethodConfig *Config;
215 string Name = QueueName(Item.URI,Config);
0a8a80e5
AL
216 if (Name.empty() == true)
217 return;
218
219 // Find the queue structure
220 Queue *I = Queues;
221 for (; I != 0 && I->Name != Name; I = I->Next);
222 if (I == 0)
223 {
224 I = new Queue(Name,this);
225 I->Next = Queues;
226 Queues = I;
93bf083d
AL
227
228 if (Running == true)
229 I->Startup();
0a8a80e5 230 }
bfd22fc0 231
e331f6ed
AL
232 // See if this is a local only URI
233 if (Config->LocalOnly == true && Item.Owner->Complete == false)
234 Item.Owner->Local = true;
8267fe24 235 Item.Owner->Status = Item::StatIdle;
0a8a80e5
AL
236
237 // Queue it into the named queue
c03462c6
MV
238 if(I->Enqueue(Item))
239 ToFetch++;
240
0a8a80e5
AL
241 // Some trace stuff
242 if (Debug == true)
243 {
8267fe24
AL
244 clog << "Fetching " << Item.URI << endl;
245 clog << " to " << Item.Owner->DestFile << endl;
e331f6ed 246 clog << " Queue is: " << Name << endl;
0a8a80e5 247 }
3b5421b4
AL
248}
249 /*}}}*/
0a8a80e5 250// Acquire::Dequeue - Remove an item from all queues /*{{{*/
3b5421b4 251// ---------------------------------------------------------------------
93bf083d
AL
252/* This is called when an item is finished being fetched. It removes it
253 from all the queues */
0a8a80e5
AL
254void pkgAcquire::Dequeue(Item *Itm)
255{
256 Queue *I = Queues;
bfd22fc0 257 bool Res = false;
93bf083d
AL
258 if (Debug == true)
259 clog << "Dequeuing " << Itm->DestFile << endl;
5674f6b3
RG
260
261 for (; I != 0; I = I->Next)
262 {
263 if (I->Dequeue(Itm))
264 {
265 Res = true;
266 if (Debug == true)
267 clog << "Dequeued from " << I->Name << endl;
268 }
269 }
270
bfd22fc0
AL
271 if (Res == true)
272 ToFetch--;
0a8a80e5
AL
273}
274 /*}}}*/
275// Acquire::QueueName - Return the name of the queue for this URI /*{{{*/
276// ---------------------------------------------------------------------
277/* The string returned depends on the configuration settings and the
278 method parameters. Given something like http://foo.org/bar it can
279 return http://foo.org or http */
e331f6ed 280string pkgAcquire::QueueName(string Uri,MethodConfig const *&Config)
3b5421b4 281{
93bf083d
AL
282 URI U(Uri);
283
e331f6ed 284 Config = GetConfig(U.Access);
0a8a80e5
AL
285 if (Config == 0)
286 return string();
287
288 /* Single-Instance methods get exactly one queue per URI. This is
289 also used for the Access queue method */
290 if (Config->SingleInstance == true || QueueMode == QueueAccess)
5674f6b3
RG
291 return U.Access;
292
293 string AccessSchema = U.Access + ':',
294 FullQueueName = AccessSchema + U.Host;
295 unsigned int Instances = 0, SchemaLength = AccessSchema.length();
296
297 Queue *I = Queues;
298 for (; I != 0; I = I->Next) {
299 // if the queue already exists, re-use it
300 if (I->Name == FullQueueName)
301 return FullQueueName;
302
303 if (I->Name.compare(0, SchemaLength, AccessSchema) == 0)
304 Instances++;
305 }
306
307 if (Debug) {
308 clog << "Found " << Instances << " instances of " << U.Access << endl;
309 }
310
311 if (Instances >= (unsigned int)_config->FindI("Acquire::QueueHost::Limit",10))
312 return U.Access;
93bf083d 313
5674f6b3 314 return FullQueueName;
0118833a
AL
315}
316 /*}}}*/
3b5421b4
AL
317// Acquire::GetConfig - Fetch the configuration information /*{{{*/
318// ---------------------------------------------------------------------
319/* This locates the configuration structure for an access method. If
320 a config structure cannot be found a Worker will be created to
321 retrieve it */
0a8a80e5 322pkgAcquire::MethodConfig *pkgAcquire::GetConfig(string Access)
3b5421b4
AL
323{
324 // Search for an existing config
325 MethodConfig *Conf;
326 for (Conf = Configs; Conf != 0; Conf = Conf->Next)
327 if (Conf->Access == Access)
328 return Conf;
329
330 // Create the new config class
331 Conf = new MethodConfig;
332 Conf->Access = Access;
333 Conf->Next = Configs;
334 Configs = Conf;
0118833a 335
3b5421b4
AL
336 // Create the worker to fetch the configuration
337 Worker Work(Conf);
338 if (Work.Start() == false)
339 return 0;
7c6e2dc7
MV
340
341 /* if a method uses DownloadLimit, we switch to SingleInstance mode */
4b65cc13 342 if(_config->FindI("Acquire::"+Access+"::Dl-Limit",0) > 0)
7c6e2dc7
MV
343 Conf->SingleInstance = true;
344
3b5421b4
AL
345 return Conf;
346}
347 /*}}}*/
0a8a80e5
AL
348// Acquire::SetFds - Deal with readable FDs /*{{{*/
349// ---------------------------------------------------------------------
350/* Collect FDs that have activity monitors into the fd sets */
351void pkgAcquire::SetFds(int &Fd,fd_set *RSet,fd_set *WSet)
352{
353 for (Worker *I = Workers; I != 0; I = I->NextAcquire)
354 {
355 if (I->InReady == true && I->InFd >= 0)
356 {
357 if (Fd < I->InFd)
358 Fd = I->InFd;
359 FD_SET(I->InFd,RSet);
360 }
361 if (I->OutReady == true && I->OutFd >= 0)
362 {
363 if (Fd < I->OutFd)
364 Fd = I->OutFd;
365 FD_SET(I->OutFd,WSet);
366 }
367 }
368}
369 /*}}}*/
370// Acquire::RunFds - Deal with active FDs /*{{{*/
371// ---------------------------------------------------------------------
93bf083d
AL
372/* Dispatch active FDs over to the proper workers. It is very important
373 that a worker never be erased while this is running! The queue class
374 should never erase a worker except during shutdown processing. */
0a8a80e5
AL
375void pkgAcquire::RunFds(fd_set *RSet,fd_set *WSet)
376{
377 for (Worker *I = Workers; I != 0; I = I->NextAcquire)
378 {
379 if (I->InFd >= 0 && FD_ISSET(I->InFd,RSet) != 0)
380 I->InFdReady();
381 if (I->OutFd >= 0 && FD_ISSET(I->OutFd,WSet) != 0)
382 I->OutFdReady();
383 }
384}
385 /*}}}*/
386// Acquire::Run - Run the fetch sequence /*{{{*/
387// ---------------------------------------------------------------------
388/* This runs the queues. It manages a select loop for all of the
389 Worker tasks. The workers interact with the queues and items to
390 manage the actual fetch. */
1c5f7e5f 391pkgAcquire::RunResult pkgAcquire::Run(int PulseIntervall)
0a8a80e5 392{
8b89e57f
AL
393 Running = true;
394
0a8a80e5
AL
395 for (Queue *I = Queues; I != 0; I = I->Next)
396 I->Startup();
397
b98f2859
AL
398 if (Log != 0)
399 Log->Start();
400
024d1123
AL
401 bool WasCancelled = false;
402
0a8a80e5 403 // Run till all things have been acquired
8267fe24
AL
404 struct timeval tv;
405 tv.tv_sec = 0;
1c5f7e5f 406 tv.tv_usec = PulseIntervall;
0a8a80e5
AL
407 while (ToFetch > 0)
408 {
409 fd_set RFds;
410 fd_set WFds;
411 int Highest = 0;
412 FD_ZERO(&RFds);
413 FD_ZERO(&WFds);
414 SetFds(Highest,&RFds,&WFds);
415
b0db36b1
AL
416 int Res;
417 do
418 {
419 Res = select(Highest+1,&RFds,&WFds,0,&tv);
420 }
421 while (Res < 0 && errno == EINTR);
422
8267fe24 423 if (Res < 0)
8b89e57f 424 {
8267fe24
AL
425 _error->Errno("select","Select has failed");
426 break;
8b89e57f 427 }
93bf083d 428
0a8a80e5 429 RunFds(&RFds,&WFds);
93bf083d
AL
430 if (_error->PendingError() == true)
431 break;
8267fe24
AL
432
433 // Timeout, notify the log class
434 if (Res == 0 || (Log != 0 && Log->Update == true))
435 {
1c5f7e5f 436 tv.tv_usec = PulseIntervall;
8267fe24
AL
437 for (Worker *I = Workers; I != 0; I = I->NextAcquire)
438 I->Pulse();
024d1123
AL
439 if (Log != 0 && Log->Pulse(this) == false)
440 {
441 WasCancelled = true;
442 break;
443 }
8267fe24 444 }
0a8a80e5 445 }
be4401bf 446
b98f2859
AL
447 if (Log != 0)
448 Log->Stop();
449
be4401bf
AL
450 // Shut down the acquire bits
451 Running = false;
0a8a80e5 452 for (Queue *I = Queues; I != 0; I = I->Next)
8e5fc8f5 453 I->Shutdown(false);
0a8a80e5 454
ab559b35 455 // Shut down the items
f7f0d6c7 456 for (ItemIterator I = Items.begin(); I != Items.end(); ++I)
8e5fc8f5 457 (*I)->Finished();
ab559b35 458
024d1123
AL
459 if (_error->PendingError())
460 return Failed;
461 if (WasCancelled)
462 return Cancelled;
463 return Continue;
93bf083d
AL
464}
465 /*}}}*/
be4401bf 466// Acquire::Bump - Called when an item is dequeued /*{{{*/
93bf083d
AL
467// ---------------------------------------------------------------------
468/* This routine bumps idle queues in hopes that they will be able to fetch
469 the dequeued item */
470void pkgAcquire::Bump()
471{
be4401bf
AL
472 for (Queue *I = Queues; I != 0; I = I->Next)
473 I->Bump();
0a8a80e5
AL
474}
475 /*}}}*/
8267fe24
AL
476// Acquire::WorkerStep - Step to the next worker /*{{{*/
477// ---------------------------------------------------------------------
478/* Not inlined to advoid including acquire-worker.h */
479pkgAcquire::Worker *pkgAcquire::WorkerStep(Worker *I)
480{
481 return I->NextAcquire;
d3e8fbb3 482}
8267fe24 483 /*}}}*/
a6568219 484// Acquire::Clean - Cleans a directory /*{{{*/
7a7fa5f0
AL
485// ---------------------------------------------------------------------
486/* This is a bit simplistic, it looks at every file in the dir and sees
487 if it is part of the download set. */
488bool pkgAcquire::Clean(string Dir)
489{
95b5f6c1
DK
490 // non-existing directories are by definition clean…
491 if (DirectoryExists(Dir) == false)
492 return true;
493
10ecfe4f
MV
494 if(Dir == "/")
495 return _error->Error(_("Clean of %s is not supported"), Dir.c_str());
496
7a7fa5f0
AL
497 DIR *D = opendir(Dir.c_str());
498 if (D == 0)
b2e465d6 499 return _error->Errno("opendir",_("Unable to read %s"),Dir.c_str());
7a7fa5f0
AL
500
501 string StartDir = SafeGetCWD();
502 if (chdir(Dir.c_str()) != 0)
503 {
504 closedir(D);
b2e465d6 505 return _error->Errno("chdir",_("Unable to change to %s"),Dir.c_str());
7a7fa5f0
AL
506 }
507
508 for (struct dirent *Dir = readdir(D); Dir != 0; Dir = readdir(D))
509 {
510 // Skip some files..
511 if (strcmp(Dir->d_name,"lock") == 0 ||
512 strcmp(Dir->d_name,"partial") == 0 ||
513 strcmp(Dir->d_name,".") == 0 ||
514 strcmp(Dir->d_name,"..") == 0)
515 continue;
516
517 // Look in the get list
b4fc9b6f 518 ItemCIterator I = Items.begin();
f7f0d6c7 519 for (; I != Items.end(); ++I)
7a7fa5f0
AL
520 if (flNotDir((*I)->DestFile) == Dir->d_name)
521 break;
522
523 // Nothing found, nuke it
524 if (I == Items.end())
525 unlink(Dir->d_name);
526 };
527
7a7fa5f0 528 closedir(D);
3c8cda8b
MV
529 if (chdir(StartDir.c_str()) != 0)
530 return _error->Errno("chdir",_("Unable to change to %s"),StartDir.c_str());
7a7fa5f0
AL
531 return true;
532}
533 /*}}}*/
a6568219
AL
534// Acquire::TotalNeeded - Number of bytes to fetch /*{{{*/
535// ---------------------------------------------------------------------
536/* This is the total number of bytes needed */
a02db58f 537APT_PURE unsigned long long pkgAcquire::TotalNeeded()
a6568219 538{
a3c4c81a 539 unsigned long long Total = 0;
f7f0d6c7 540 for (ItemCIterator I = ItemsBegin(); I != ItemsEnd(); ++I)
a6568219
AL
541 Total += (*I)->FileSize;
542 return Total;
543}
544 /*}}}*/
545// Acquire::FetchNeeded - Number of bytes needed to get /*{{{*/
546// ---------------------------------------------------------------------
547/* This is the number of bytes that is not local */
a02db58f 548APT_PURE unsigned long long pkgAcquire::FetchNeeded()
a6568219 549{
a3c4c81a 550 unsigned long long Total = 0;
f7f0d6c7 551 for (ItemCIterator I = ItemsBegin(); I != ItemsEnd(); ++I)
a6568219
AL
552 if ((*I)->Local == false)
553 Total += (*I)->FileSize;
554 return Total;
555}
556 /*}}}*/
6b1ff003
AL
557// Acquire::PartialPresent - Number of partial bytes we already have /*{{{*/
558// ---------------------------------------------------------------------
559/* This is the number of bytes that is not local */
a02db58f 560APT_PURE unsigned long long pkgAcquire::PartialPresent()
6b1ff003 561{
a3c4c81a 562 unsigned long long Total = 0;
f7f0d6c7 563 for (ItemCIterator I = ItemsBegin(); I != ItemsEnd(); ++I)
6b1ff003
AL
564 if ((*I)->Local == false)
565 Total += (*I)->PartialSize;
566 return Total;
567}
92fcbfc1 568 /*}}}*/
8e5fc8f5 569// Acquire::UriBegin - Start iterator for the uri list /*{{{*/
f7a08e33
AL
570// ---------------------------------------------------------------------
571/* */
572pkgAcquire::UriIterator pkgAcquire::UriBegin()
573{
574 return UriIterator(Queues);
575}
576 /*}}}*/
8e5fc8f5 577// Acquire::UriEnd - End iterator for the uri list /*{{{*/
f7a08e33
AL
578// ---------------------------------------------------------------------
579/* */
580pkgAcquire::UriIterator pkgAcquire::UriEnd()
581{
582 return UriIterator(0);
583}
584 /*}}}*/
e331f6ed
AL
585// Acquire::MethodConfig::MethodConfig - Constructor /*{{{*/
586// ---------------------------------------------------------------------
587/* */
25613a61
DK
588pkgAcquire::MethodConfig::MethodConfig() : d(NULL), Next(0), SingleInstance(false),
589 Pipeline(false), SendConfig(false), LocalOnly(false), NeedsCleanup(false),
590 Removable(false)
e331f6ed 591{
e331f6ed
AL
592}
593 /*}}}*/
0a8a80e5
AL
594// Queue::Queue - Constructor /*{{{*/
595// ---------------------------------------------------------------------
596/* */
25613a61
DK
597pkgAcquire::Queue::Queue(string Name,pkgAcquire *Owner) : d(NULL), Next(0),
598 Name(Name), Items(0), Workers(0), Owner(Owner), PipeDepth(0), MaxPipeDepth(1)
0a8a80e5 599{
0a8a80e5
AL
600}
601 /*}}}*/
602// Queue::~Queue - Destructor /*{{{*/
603// ---------------------------------------------------------------------
604/* */
605pkgAcquire::Queue::~Queue()
606{
8e5fc8f5 607 Shutdown(true);
0a8a80e5
AL
608
609 while (Items != 0)
610 {
611 QItem *Jnk = Items;
612 Items = Items->Next;
613 delete Jnk;
614 }
615}
616 /*}}}*/
617// Queue::Enqueue - Queue an item to the queue /*{{{*/
618// ---------------------------------------------------------------------
619/* */
c03462c6 620bool pkgAcquire::Queue::Enqueue(ItemDesc &Item)
0a8a80e5 621{
7a1b1f8b 622 QItem **I = &Items;
c03462c6
MV
623 // move to the end of the queue and check for duplicates here
624 for (; *I != 0; I = &(*I)->Next)
625 if (Item.URI == (*I)->URI)
626 {
627 Item.Owner->Status = Item::StatDone;
628 return false;
629 }
630
0a8a80e5 631 // Create a new item
7a1b1f8b
AL
632 QItem *Itm = new QItem;
633 *Itm = Item;
634 Itm->Next = 0;
635 *I = Itm;
0a8a80e5 636
8267fe24 637 Item.Owner->QueueCounter++;
93bf083d
AL
638 if (Items->Next == 0)
639 Cycle();
c03462c6 640 return true;
0a8a80e5
AL
641}
642 /*}}}*/
c88edf1d 643// Queue::Dequeue - Remove an item from the queue /*{{{*/
0a8a80e5 644// ---------------------------------------------------------------------
b185acc2 645/* We return true if we hit something */
bfd22fc0 646bool pkgAcquire::Queue::Dequeue(Item *Owner)
0a8a80e5 647{
b185acc2
AL
648 if (Owner->Status == pkgAcquire::Item::StatFetching)
649 return _error->Error("Tried to dequeue a fetching object");
650
bfd22fc0
AL
651 bool Res = false;
652
0a8a80e5
AL
653 QItem **I = &Items;
654 for (; *I != 0;)
655 {
656 if ((*I)->Owner == Owner)
657 {
658 QItem *Jnk= *I;
659 *I = (*I)->Next;
660 Owner->QueueCounter--;
661 delete Jnk;
bfd22fc0 662 Res = true;
0a8a80e5
AL
663 }
664 else
665 I = &(*I)->Next;
666 }
bfd22fc0
AL
667
668 return Res;
0a8a80e5
AL
669}
670 /*}}}*/
671// Queue::Startup - Start the worker processes /*{{{*/
672// ---------------------------------------------------------------------
8e5fc8f5
AL
673/* It is possible for this to be called with a pre-existing set of
674 workers. */
0a8a80e5
AL
675bool pkgAcquire::Queue::Startup()
676{
8e5fc8f5
AL
677 if (Workers == 0)
678 {
679 URI U(Name);
680 pkgAcquire::MethodConfig *Cnf = Owner->GetConfig(U.Access);
681 if (Cnf == 0)
682 return false;
683
684 Workers = new Worker(this,Cnf,Owner->Log);
685 Owner->Add(Workers);
686 if (Workers->Start() == false)
687 return false;
688
689 /* When pipelining we commit 10 items. This needs to change when we
690 added other source retry to have cycle maintain a pipeline depth
691 on its own. */
692 if (Cnf->Pipeline == true)
6ce72612 693 MaxPipeDepth = _config->FindI("Acquire::Max-Pipeline-Depth",10);
8e5fc8f5
AL
694 else
695 MaxPipeDepth = 1;
696 }
5cb5d8dc 697
93bf083d 698 return Cycle();
0a8a80e5
AL
699}
700 /*}}}*/
701// Queue::Shutdown - Shutdown the worker processes /*{{{*/
702// ---------------------------------------------------------------------
8e5fc8f5
AL
703/* If final is true then all workers are eliminated, otherwise only workers
704 that do not need cleanup are removed */
705bool pkgAcquire::Queue::Shutdown(bool Final)
0a8a80e5
AL
706{
707 // Delete all of the workers
8e5fc8f5
AL
708 pkgAcquire::Worker **Cur = &Workers;
709 while (*Cur != 0)
0a8a80e5 710 {
8e5fc8f5
AL
711 pkgAcquire::Worker *Jnk = *Cur;
712 if (Final == true || Jnk->GetConf()->NeedsCleanup == false)
713 {
714 *Cur = Jnk->NextQueue;
715 Owner->Remove(Jnk);
716 delete Jnk;
717 }
718 else
719 Cur = &(*Cur)->NextQueue;
0a8a80e5
AL
720 }
721
722 return true;
3b5421b4
AL
723}
724 /*}}}*/
7d8afa39 725// Queue::FindItem - Find a URI in the item list /*{{{*/
c88edf1d
AL
726// ---------------------------------------------------------------------
727/* */
728pkgAcquire::Queue::QItem *pkgAcquire::Queue::FindItem(string URI,pkgAcquire::Worker *Owner)
729{
730 for (QItem *I = Items; I != 0; I = I->Next)
731 if (I->URI == URI && I->Worker == Owner)
732 return I;
733 return 0;
734}
735 /*}}}*/
736// Queue::ItemDone - Item has been completed /*{{{*/
737// ---------------------------------------------------------------------
738/* The worker signals this which causes the item to be removed from the
93bf083d
AL
739 queue. If this is the last queue instance then it is removed from the
740 main queue too.*/
c88edf1d
AL
741bool pkgAcquire::Queue::ItemDone(QItem *Itm)
742{
b185acc2 743 PipeDepth--;
db890fdb
AL
744 if (Itm->Owner->Status == pkgAcquire::Item::StatFetching)
745 Itm->Owner->Status = pkgAcquire::Item::StatDone;
746
93bf083d
AL
747 if (Itm->Owner->QueueCounter <= 1)
748 Owner->Dequeue(Itm->Owner);
749 else
750 {
751 Dequeue(Itm->Owner);
752 Owner->Bump();
753 }
c88edf1d 754
93bf083d
AL
755 return Cycle();
756}
757 /*}}}*/
758// Queue::Cycle - Queue new items into the method /*{{{*/
759// ---------------------------------------------------------------------
b185acc2
AL
760/* This locates a new idle item and sends it to the worker. If pipelining
761 is enabled then it keeps the pipe full. */
93bf083d
AL
762bool pkgAcquire::Queue::Cycle()
763{
764 if (Items == 0 || Workers == 0)
c88edf1d
AL
765 return true;
766
e7432370
AL
767 if (PipeDepth < 0)
768 return _error->Error("Pipedepth failure");
769
93bf083d
AL
770 // Look for a queable item
771 QItem *I = Items;
e7432370 772 while (PipeDepth < (signed)MaxPipeDepth)
b185acc2
AL
773 {
774 for (; I != 0; I = I->Next)
775 if (I->Owner->Status == pkgAcquire::Item::StatIdle)
776 break;
777
778 // Nothing to do, queue is idle.
779 if (I == 0)
780 return true;
781
782 I->Worker = Workers;
783 I->Owner->Status = pkgAcquire::Item::StatFetching;
e7432370 784 PipeDepth++;
b185acc2
AL
785 if (Workers->QueueItem(I) == false)
786 return false;
787 }
93bf083d 788
b185acc2 789 return true;
c88edf1d
AL
790}
791 /*}}}*/
be4401bf
AL
792// Queue::Bump - Fetch any pending objects if we are idle /*{{{*/
793// ---------------------------------------------------------------------
b185acc2 794/* This is called when an item in multiple queues is dequeued */
be4401bf
AL
795void pkgAcquire::Queue::Bump()
796{
b185acc2 797 Cycle();
be4401bf
AL
798}
799 /*}}}*/
b98f2859
AL
800// AcquireStatus::pkgAcquireStatus - Constructor /*{{{*/
801// ---------------------------------------------------------------------
802/* */
25613a61 803pkgAcquireStatus::pkgAcquireStatus() : d(NULL), Percent(0), Update(true), MorePulses(false)
b98f2859
AL
804{
805 Start();
806}
807 /*}}}*/
808// AcquireStatus::Pulse - Called periodically /*{{{*/
809// ---------------------------------------------------------------------
810/* This computes some internal state variables for the derived classes to
811 use. It generates the current downloaded bytes and total bytes to download
812 as well as the current CPS estimate. */
024d1123 813bool pkgAcquireStatus::Pulse(pkgAcquire *Owner)
b98f2859
AL
814{
815 TotalBytes = 0;
816 CurrentBytes = 0;
d568ed2d
AL
817 TotalItems = 0;
818 CurrentItems = 0;
b98f2859
AL
819
820 // Compute the total number of bytes to fetch
821 unsigned int Unknown = 0;
822 unsigned int Count = 0;
c6e9cc58
MV
823 bool UnfetchedReleaseFiles = false;
824 for (pkgAcquire::ItemCIterator I = Owner->ItemsBegin();
825 I != Owner->ItemsEnd();
f7f0d6c7 826 ++I, ++Count)
b98f2859 827 {
d568ed2d
AL
828 TotalItems++;
829 if ((*I)->Status == pkgAcquire::Item::StatDone)
f7f0d6c7 830 ++CurrentItems;
d568ed2d 831
a6568219
AL
832 // Totally ignore local items
833 if ((*I)->Local == true)
834 continue;
b2e465d6 835
d0cfa8ad
MV
836 // see if the method tells us to expect more
837 TotalItems += (*I)->ExpectedAdditionalItems;
838
c6e9cc58
MV
839 // check if there are unfetched Release files
840 if ((*I)->Complete == false && (*I)->ExpectedAdditionalItems > 0)
841 UnfetchedReleaseFiles = true;
842
b98f2859
AL
843 TotalBytes += (*I)->FileSize;
844 if ((*I)->Complete == true)
845 CurrentBytes += (*I)->FileSize;
846 if ((*I)->FileSize == 0 && (*I)->Complete == false)
f7f0d6c7 847 ++Unknown;
b98f2859
AL
848 }
849
850 // Compute the current completion
dbbc5494 851 unsigned long long ResumeSize = 0;
b98f2859
AL
852 for (pkgAcquire::Worker *I = Owner->WorkersBegin(); I != 0;
853 I = Owner->WorkerStep(I))
c62f7898 854 {
b98f2859 855 if (I->CurrentItem != 0 && I->CurrentItem->Owner->Complete == false)
aa0e1101
AL
856 {
857 CurrentBytes += I->CurrentSize;
858 ResumeSize += I->ResumePoint;
859
860 // Files with unknown size always have 100% completion
861 if (I->CurrentItem->Owner->FileSize == 0 &&
862 I->CurrentItem->Owner->Complete == false)
863 TotalBytes += I->CurrentSize;
864 }
c62f7898 865 }
aa0e1101 866
b98f2859
AL
867 // Normalize the figures and account for unknown size downloads
868 if (TotalBytes <= 0)
869 TotalBytes = 1;
870 if (Unknown == Count)
871 TotalBytes = Unknown;
18ef0a78
AL
872
873 // Wha?! Is not supposed to happen.
874 if (CurrentBytes > TotalBytes)
875 CurrentBytes = TotalBytes;
96c6cab1
MV
876
877 // debug
878 if (_config->FindB("Debug::acquire::progress", false) == true)
879 std::clog << " Bytes: "
880 << SizeToStr(CurrentBytes) << " / " << SizeToStr(TotalBytes)
881 << std::endl;
b98f2859
AL
882
883 // Compute the CPS
884 struct timeval NewTime;
885 gettimeofday(&NewTime,0);
2ec1674d 886 if ((NewTime.tv_sec - Time.tv_sec == 6 && NewTime.tv_usec > Time.tv_usec) ||
b98f2859
AL
887 NewTime.tv_sec - Time.tv_sec > 6)
888 {
f17ac097
AL
889 double Delta = NewTime.tv_sec - Time.tv_sec +
890 (NewTime.tv_usec - Time.tv_usec)/1000000.0;
b98f2859 891
b98f2859 892 // Compute the CPS value
f17ac097 893 if (Delta < 0.01)
e331f6ed
AL
894 CurrentCPS = 0;
895 else
aa0e1101
AL
896 CurrentCPS = ((CurrentBytes - ResumeSize) - LastBytes)/Delta;
897 LastBytes = CurrentBytes - ResumeSize;
dbbc5494 898 ElapsedTime = (unsigned long long)Delta;
b98f2859
AL
899 Time = NewTime;
900 }
024d1123 901
c6e9cc58
MV
902 // calculate the percentage, if we have too little data assume 1%
903 if (TotalBytes > 0 && UnfetchedReleaseFiles)
96c6cab1
MV
904 Percent = 0;
905 else
906 // use both files and bytes because bytes can be unreliable
907 Percent = (0.8 * (CurrentBytes/float(TotalBytes)*100.0) +
908 0.2 * (CurrentItems/float(TotalItems)*100.0));
909
75ef8f14
MV
910 int fd = _config->FindI("APT::Status-Fd",-1);
911 if(fd > 0)
912 {
913 ostringstream status;
914
915 char msg[200];
916 long i = CurrentItems < TotalItems ? CurrentItems + 1 : CurrentItems;
c033d415
MV
917 unsigned long long ETA = 0;
918 if(CurrentCPS > 0)
919 ETA = (TotalBytes - CurrentBytes) / CurrentCPS;
75ef8f14 920
1e8b4c0f
MV
921 // only show the ETA if it makes sense
922 if (ETA > 0 && ETA < 172800 /* two days */ )
0c508b03 923 snprintf(msg,sizeof(msg), _("Retrieving file %li of %li (%s remaining)"), i, TotalItems, TimeToStr(ETA).c_str());
1e8b4c0f 924 else
0c508b03 925 snprintf(msg,sizeof(msg), _("Retrieving file %li of %li"), i, TotalItems);
1e8b4c0f 926
75ef8f14
MV
927 // build the status str
928 status << "dlstatus:" << i
96c6cab1 929 << ":" << std::setprecision(3) << Percent
d0cfa8ad
MV
930 << ":" << msg
931 << endl;
31bda500
DK
932
933 std::string const dlstatus = status.str();
d68d65ad 934 FileFd::Write(fd, dlstatus.c_str(), dlstatus.size());
75ef8f14
MV
935 }
936
024d1123 937 return true;
b98f2859
AL
938}
939 /*}}}*/
940// AcquireStatus::Start - Called when the download is started /*{{{*/
941// ---------------------------------------------------------------------
942/* We just reset the counters */
943void pkgAcquireStatus::Start()
944{
945 gettimeofday(&Time,0);
946 gettimeofday(&StartTime,0);
947 LastBytes = 0;
948 CurrentCPS = 0;
949 CurrentBytes = 0;
950 TotalBytes = 0;
951 FetchedBytes = 0;
952 ElapsedTime = 0;
d568ed2d
AL
953 TotalItems = 0;
954 CurrentItems = 0;
b98f2859
AL
955}
956 /*}}}*/
a6568219 957// AcquireStatus::Stop - Finished downloading /*{{{*/
b98f2859
AL
958// ---------------------------------------------------------------------
959/* This accurately computes the elapsed time and the total overall CPS. */
960void pkgAcquireStatus::Stop()
961{
962 // Compute the CPS and elapsed time
963 struct timeval NewTime;
964 gettimeofday(&NewTime,0);
965
31a0531d
AL
966 double Delta = NewTime.tv_sec - StartTime.tv_sec +
967 (NewTime.tv_usec - StartTime.tv_usec)/1000000.0;
b98f2859 968
b98f2859 969 // Compute the CPS value
31a0531d 970 if (Delta < 0.01)
e331f6ed
AL
971 CurrentCPS = 0;
972 else
31a0531d 973 CurrentCPS = FetchedBytes/Delta;
b98f2859 974 LastBytes = CurrentBytes;
dbbc5494 975 ElapsedTime = (unsigned long long)Delta;
b98f2859
AL
976}
977 /*}}}*/
978// AcquireStatus::Fetched - Called when a byte set has been fetched /*{{{*/
979// ---------------------------------------------------------------------
980/* This is used to get accurate final transfer rate reporting. */
73da43e9 981void pkgAcquireStatus::Fetched(unsigned long long Size,unsigned long long Resume)
93274b8d 982{
b98f2859
AL
983 FetchedBytes += Size - Resume;
984}
985 /*}}}*/