]> git.saurik.com Git - apt.git/blame - apt-pkg/acquire.cc
mark internal interfaces as hidden
[apt.git] / apt-pkg / acquire.cc
CommitLineData
0118833a
AL
1// -*- mode: cpp; mode: fold -*-
2// Description /*{{{*/
1b480911 3// $Id: acquire.cc,v 1.50 2004/03/17 05:17:11 mdz Exp $
0118833a
AL
4/* ######################################################################
5
6 Acquire - File Acquisition
7
1e3f4083 8 The core element for the schedule system is the concept of a named
0a8a80e5 9 queue. Each queue is unique and each queue has a name derived from the
1e3f4083 10 URI. The degree of parallelization can be controlled by how the queue
0a8a80e5
AL
11 name is derived from the URI.
12
0118833a
AL
13 ##################################################################### */
14 /*}}}*/
15// Include Files /*{{{*/
ea542140
DK
16#include <config.h>
17
0118833a
AL
18#include <apt-pkg/acquire.h>
19#include <apt-pkg/acquire-item.h>
20#include <apt-pkg/acquire-worker.h>
0a8a80e5
AL
21#include <apt-pkg/configuration.h>
22#include <apt-pkg/error.h>
cdcc6d34 23#include <apt-pkg/strutl.h>
1cd1c398 24#include <apt-pkg/fileutl.h>
8267fe24 25
453b82a3
DK
26#include <string>
27#include <vector>
b4fc9b6f 28#include <iostream>
75ef8f14 29#include <sstream>
04a54261
DK
30#include <iomanip>
31
526334a0 32#include <stdio.h>
453b82a3
DK
33#include <stdlib.h>
34#include <string.h>
35#include <unistd.h>
04a54261
DK
36#include <pwd.h>
37#include <grp.h>
7a7fa5f0 38#include <dirent.h>
8267fe24 39#include <sys/time.h>
453b82a3 40#include <sys/select.h>
524f8105 41#include <errno.h>
56472095 42#include <sys/stat.h>
04a54261 43#include <sys/types.h>
ea542140
DK
44
45#include <apti18n.h>
0118833a
AL
46 /*}}}*/
47
b4fc9b6f
AL
48using namespace std;
49
0118833a
AL
50// Acquire::pkgAcquire - Constructor /*{{{*/
51// ---------------------------------------------------------------------
93bf083d 52/* We grab some runtime state from the configuration space */
5efbd596 53pkgAcquire::pkgAcquire() : LockFD(-1), Queues(0), Workers(0), Configs(0), Log(NULL), ToFetch(0),
1cd1c398 54 Debug(_config->FindB("Debug::pkgAcquire",false)),
5efbd596 55 Running(false)
0118833a 56{
03aa0847 57 Initialize();
1cd1c398 58}
04a54261
DK
59pkgAcquire::pkgAcquire(pkgAcquireStatus *Progress) : LockFD(-1), Queues(0), Workers(0),
60 Configs(0), Log(NULL), ToFetch(0),
1cd1c398 61 Debug(_config->FindB("Debug::pkgAcquire",false)),
5efbd596 62 Running(false)
03aa0847
DK
63{
64 Initialize();
65 SetLog(Progress);
66}
67void pkgAcquire::Initialize()
1cd1c398
DK
68{
69 string const Mode = _config->Find("Acquire::Queue-Mode","host");
70 if (strcasecmp(Mode.c_str(),"host") == 0)
71 QueueMode = QueueHost;
72 if (strcasecmp(Mode.c_str(),"access") == 0)
73 QueueMode = QueueAccess;
03aa0847
DK
74
75 // chown the auth.conf file as it will be accessed by our methods
76 std::string const SandboxUser = _config->Find("APT::Sandbox::User");
77 if (getuid() == 0 && SandboxUser.empty() == false) // if we aren't root, we can't chown, so don't try it
78 {
79 struct passwd const * const pw = getpwnam(SandboxUser.c_str());
80 struct group const * const gr = getgrnam("root");
81 if (pw != NULL && gr != NULL)
82 {
83 std::string const AuthConf = _config->FindFile("Dir::Etc::netrc");
84 if(AuthConf.empty() == false && RealFileExists(AuthConf) &&
85 chown(AuthConf.c_str(), pw->pw_uid, gr->gr_gid) != 0)
86 _error->WarningE("SetupAPTPartialDirectory", "chown to %s:root of file %s failed", SandboxUser.c_str(), AuthConf.c_str());
87 }
88 }
1cd1c398
DK
89}
90 /*}}}*/
04a54261
DK
91// Acquire::GetLock - lock directory and prepare for action /*{{{*/
92static bool SetupAPTPartialDirectory(std::string const &grand, std::string const &parent)
1cd1c398 93{
04a54261
DK
94 std::string const partial = parent + "partial";
95 if (CreateAPTDirectoryIfNeeded(grand, partial) == false &&
96 CreateAPTDirectoryIfNeeded(parent, partial) == false)
97 return false;
0a8a80e5 98
03aa0847
DK
99 std::string const SandboxUser = _config->Find("APT::Sandbox::User");
100 if (getuid() == 0 && SandboxUser.empty() == false) // if we aren't root, we can't chown, so don't try it
04a54261 101 {
03aa0847
DK
102 struct passwd const * const pw = getpwnam(SandboxUser.c_str());
103 struct group const * const gr = getgrnam("root");
1924b1e5
MV
104 if (pw != NULL && gr != NULL)
105 {
106 // chown the partial dir
107 if(chown(partial.c_str(), pw->pw_uid, gr->gr_gid) != 0)
108 _error->WarningE("SetupAPTPartialDirectory", "chown to %s:root of directory %s failed", SandboxUser.c_str(), partial.c_str());
1924b1e5 109 }
04a54261
DK
110 }
111 if (chmod(partial.c_str(), 0700) != 0)
112 _error->WarningE("SetupAPTPartialDirectory", "chmod 0700 of directory %s failed", partial.c_str());
113
114 return true;
115}
116bool pkgAcquire::Setup(pkgAcquireStatus *Progress, string const &Lock)
117{
118 Log = Progress;
119 if (Lock.empty())
43acd019
DK
120 {
121 string const listDir = _config->FindDir("Dir::State::lists");
04a54261
DK
122 if (SetupAPTPartialDirectory(_config->FindDir("Dir::State"), listDir) == false)
123 return _error->Errno("Acquire", _("List directory %spartial is missing."), listDir.c_str());
43acd019 124 string const archivesDir = _config->FindDir("Dir::Cache::Archives");
04a54261
DK
125 if (SetupAPTPartialDirectory(_config->FindDir("Dir::Cache"), archivesDir) == false)
126 return _error->Errno("Acquire", _("Archives directory %spartial is missing."), archivesDir.c_str());
127 return true;
128 }
129 return GetLock(Lock);
130}
131bool pkgAcquire::GetLock(std::string const &Lock)
132{
133 if (Lock.empty() == true)
134 return false;
9c2c9c24 135
04a54261
DK
136 // check for existence and possibly create auxiliary directories
137 string const listDir = _config->FindDir("Dir::State::lists");
138 string const archivesDir = _config->FindDir("Dir::Cache::Archives");
9c2c9c24 139
04a54261
DK
140 if (Lock == listDir)
141 {
142 if (SetupAPTPartialDirectory(_config->FindDir("Dir::State"), listDir) == false)
143 return _error->Errno("Acquire", _("List directory %spartial is missing."), listDir.c_str());
144 }
145 if (Lock == archivesDir)
146 {
147 if (SetupAPTPartialDirectory(_config->FindDir("Dir::Cache"), archivesDir) == false)
43acd019
DK
148 return _error->Errno("Acquire", _("Archives directory %spartial is missing."), archivesDir.c_str());
149 }
1cd1c398 150
04a54261 151 if (_config->FindB("Debug::NoLocking", false) == true)
1cd1c398
DK
152 return true;
153
154 // Lock the directory this acquire object will work in
04a54261 155 LockFD = ::GetLock(flCombine(Lock, "lock"));
1cd1c398
DK
156 if (LockFD == -1)
157 return _error->Error(_("Unable to lock directory %s"), Lock.c_str());
158
159 return true;
160}
161 /*}}}*/
0118833a
AL
162// Acquire::~pkgAcquire - Destructor /*{{{*/
163// ---------------------------------------------------------------------
93bf083d 164/* Free our memory, clean up the queues (destroy the workers) */
0118833a
AL
165pkgAcquire::~pkgAcquire()
166{
459681d3 167 Shutdown();
1cd1c398
DK
168
169 if (LockFD != -1)
170 close(LockFD);
171
3b5421b4
AL
172 while (Configs != 0)
173 {
174 MethodConfig *Jnk = Configs;
175 Configs = Configs->Next;
176 delete Jnk;
177 }
281daf46
AL
178}
179 /*}}}*/
8e5fc8f5 180// Acquire::Shutdown - Clean out the acquire object /*{{{*/
281daf46
AL
181// ---------------------------------------------------------------------
182/* */
183void pkgAcquire::Shutdown()
184{
f7f0d6c7 185 while (Items.empty() == false)
1b480911
AL
186 {
187 if (Items[0]->Status == Item::StatFetching)
188 Items[0]->Status = Item::StatError;
281daf46 189 delete Items[0];
1b480911 190 }
0a8a80e5
AL
191
192 while (Queues != 0)
193 {
194 Queue *Jnk = Queues;
195 Queues = Queues->Next;
196 delete Jnk;
197 }
0118833a
AL
198}
199 /*}}}*/
200// Acquire::Add - Add a new item /*{{{*/
201// ---------------------------------------------------------------------
93bf083d
AL
202/* This puts an item on the acquire list. This list is mainly for tracking
203 item status */
0118833a
AL
204void pkgAcquire::Add(Item *Itm)
205{
206 Items.push_back(Itm);
207}
208 /*}}}*/
209// Acquire::Remove - Remove an item /*{{{*/
210// ---------------------------------------------------------------------
93bf083d 211/* Remove an item from the acquire list. This is usually not used.. */
0118833a
AL
212void pkgAcquire::Remove(Item *Itm)
213{
a3eaf954
AL
214 Dequeue(Itm);
215
753b3525 216 for (ItemIterator I = Items.begin(); I != Items.end();)
0118833a
AL
217 {
218 if (*I == Itm)
b4fc9b6f 219 {
0118833a 220 Items.erase(I);
b4fc9b6f
AL
221 I = Items.begin();
222 }
753b3525 223 else
f7f0d6c7 224 ++I;
8267fe24 225 }
0118833a
AL
226}
227 /*}}}*/
0a8a80e5
AL
228// Acquire::Add - Add a worker /*{{{*/
229// ---------------------------------------------------------------------
93bf083d
AL
230/* A list of workers is kept so that the select loop can direct their FD
231 usage. */
0a8a80e5
AL
232void pkgAcquire::Add(Worker *Work)
233{
234 Work->NextAcquire = Workers;
235 Workers = Work;
236}
237 /*}}}*/
238// Acquire::Remove - Remove a worker /*{{{*/
239// ---------------------------------------------------------------------
93bf083d
AL
240/* A worker has died. This can not be done while the select loop is running
241 as it would require that RunFds could handle a changing list state and
1e3f4083 242 it can't.. */
0a8a80e5
AL
243void pkgAcquire::Remove(Worker *Work)
244{
93bf083d
AL
245 if (Running == true)
246 abort();
247
0a8a80e5
AL
248 Worker **I = &Workers;
249 for (; *I != 0;)
250 {
251 if (*I == Work)
252 *I = (*I)->NextAcquire;
253 else
254 I = &(*I)->NextAcquire;
255 }
256}
257 /*}}}*/
0118833a
AL
258// Acquire::Enqueue - Queue an URI for fetching /*{{{*/
259// ---------------------------------------------------------------------
93bf083d 260/* This is the entry point for an item. An item calls this function when
281daf46 261 it is constructed which creates a queue (based on the current queue
93bf083d
AL
262 mode) and puts the item in that queue. If the system is running then
263 the queue might be started. */
8267fe24 264void pkgAcquire::Enqueue(ItemDesc &Item)
0118833a 265{
0a8a80e5 266 // Determine which queue to put the item in
e331f6ed
AL
267 const MethodConfig *Config;
268 string Name = QueueName(Item.URI,Config);
0a8a80e5
AL
269 if (Name.empty() == true)
270 return;
271
272 // Find the queue structure
273 Queue *I = Queues;
274 for (; I != 0 && I->Name != Name; I = I->Next);
275 if (I == 0)
276 {
277 I = new Queue(Name,this);
278 I->Next = Queues;
279 Queues = I;
93bf083d
AL
280
281 if (Running == true)
282 I->Startup();
0a8a80e5 283 }
bfd22fc0 284
e331f6ed
AL
285 // See if this is a local only URI
286 if (Config->LocalOnly == true && Item.Owner->Complete == false)
287 Item.Owner->Local = true;
8267fe24 288 Item.Owner->Status = Item::StatIdle;
0a8a80e5
AL
289
290 // Queue it into the named queue
c03462c6
MV
291 if(I->Enqueue(Item))
292 ToFetch++;
293
0a8a80e5
AL
294 // Some trace stuff
295 if (Debug == true)
296 {
8267fe24
AL
297 clog << "Fetching " << Item.URI << endl;
298 clog << " to " << Item.Owner->DestFile << endl;
e331f6ed 299 clog << " Queue is: " << Name << endl;
0a8a80e5 300 }
3b5421b4
AL
301}
302 /*}}}*/
0a8a80e5 303// Acquire::Dequeue - Remove an item from all queues /*{{{*/
3b5421b4 304// ---------------------------------------------------------------------
93bf083d
AL
305/* This is called when an item is finished being fetched. It removes it
306 from all the queues */
0a8a80e5
AL
307void pkgAcquire::Dequeue(Item *Itm)
308{
309 Queue *I = Queues;
bfd22fc0 310 bool Res = false;
93bf083d
AL
311 if (Debug == true)
312 clog << "Dequeuing " << Itm->DestFile << endl;
5674f6b3
RG
313
314 for (; I != 0; I = I->Next)
315 {
316 if (I->Dequeue(Itm))
317 {
318 Res = true;
319 if (Debug == true)
320 clog << "Dequeued from " << I->Name << endl;
321 }
322 }
323
bfd22fc0
AL
324 if (Res == true)
325 ToFetch--;
0a8a80e5
AL
326}
327 /*}}}*/
328// Acquire::QueueName - Return the name of the queue for this URI /*{{{*/
329// ---------------------------------------------------------------------
330/* The string returned depends on the configuration settings and the
331 method parameters. Given something like http://foo.org/bar it can
332 return http://foo.org or http */
e331f6ed 333string pkgAcquire::QueueName(string Uri,MethodConfig const *&Config)
3b5421b4 334{
93bf083d
AL
335 URI U(Uri);
336
e331f6ed 337 Config = GetConfig(U.Access);
0a8a80e5
AL
338 if (Config == 0)
339 return string();
340
341 /* Single-Instance methods get exactly one queue per URI. This is
342 also used for the Access queue method */
343 if (Config->SingleInstance == true || QueueMode == QueueAccess)
5674f6b3
RG
344 return U.Access;
345
346 string AccessSchema = U.Access + ':',
347 FullQueueName = AccessSchema + U.Host;
348 unsigned int Instances = 0, SchemaLength = AccessSchema.length();
349
350 Queue *I = Queues;
351 for (; I != 0; I = I->Next) {
352 // if the queue already exists, re-use it
353 if (I->Name == FullQueueName)
354 return FullQueueName;
355
356 if (I->Name.compare(0, SchemaLength, AccessSchema) == 0)
357 Instances++;
358 }
359
360 if (Debug) {
361 clog << "Found " << Instances << " instances of " << U.Access << endl;
362 }
363
364 if (Instances >= (unsigned int)_config->FindI("Acquire::QueueHost::Limit",10))
365 return U.Access;
93bf083d 366
5674f6b3 367 return FullQueueName;
0118833a
AL
368}
369 /*}}}*/
3b5421b4
AL
370// Acquire::GetConfig - Fetch the configuration information /*{{{*/
371// ---------------------------------------------------------------------
372/* This locates the configuration structure for an access method. If
373 a config structure cannot be found a Worker will be created to
374 retrieve it */
0a8a80e5 375pkgAcquire::MethodConfig *pkgAcquire::GetConfig(string Access)
3b5421b4
AL
376{
377 // Search for an existing config
378 MethodConfig *Conf;
379 for (Conf = Configs; Conf != 0; Conf = Conf->Next)
380 if (Conf->Access == Access)
381 return Conf;
382
383 // Create the new config class
384 Conf = new MethodConfig;
385 Conf->Access = Access;
386 Conf->Next = Configs;
387 Configs = Conf;
0118833a 388
3b5421b4
AL
389 // Create the worker to fetch the configuration
390 Worker Work(Conf);
391 if (Work.Start() == false)
392 return 0;
7c6e2dc7
MV
393
394 /* if a method uses DownloadLimit, we switch to SingleInstance mode */
4b65cc13 395 if(_config->FindI("Acquire::"+Access+"::Dl-Limit",0) > 0)
7c6e2dc7
MV
396 Conf->SingleInstance = true;
397
3b5421b4
AL
398 return Conf;
399}
400 /*}}}*/
0a8a80e5
AL
401// Acquire::SetFds - Deal with readable FDs /*{{{*/
402// ---------------------------------------------------------------------
403/* Collect FDs that have activity monitors into the fd sets */
404void pkgAcquire::SetFds(int &Fd,fd_set *RSet,fd_set *WSet)
405{
406 for (Worker *I = Workers; I != 0; I = I->NextAcquire)
407 {
408 if (I->InReady == true && I->InFd >= 0)
409 {
410 if (Fd < I->InFd)
411 Fd = I->InFd;
412 FD_SET(I->InFd,RSet);
413 }
414 if (I->OutReady == true && I->OutFd >= 0)
415 {
416 if (Fd < I->OutFd)
417 Fd = I->OutFd;
418 FD_SET(I->OutFd,WSet);
419 }
420 }
421}
422 /*}}}*/
423// Acquire::RunFds - Deal with active FDs /*{{{*/
424// ---------------------------------------------------------------------
93bf083d
AL
425/* Dispatch active FDs over to the proper workers. It is very important
426 that a worker never be erased while this is running! The queue class
427 should never erase a worker except during shutdown processing. */
0a8a80e5
AL
428void pkgAcquire::RunFds(fd_set *RSet,fd_set *WSet)
429{
430 for (Worker *I = Workers; I != 0; I = I->NextAcquire)
431 {
432 if (I->InFd >= 0 && FD_ISSET(I->InFd,RSet) != 0)
433 I->InFdReady();
434 if (I->OutFd >= 0 && FD_ISSET(I->OutFd,WSet) != 0)
435 I->OutFdReady();
436 }
437}
438 /*}}}*/
439// Acquire::Run - Run the fetch sequence /*{{{*/
440// ---------------------------------------------------------------------
441/* This runs the queues. It manages a select loop for all of the
442 Worker tasks. The workers interact with the queues and items to
443 manage the actual fetch. */
1c5f7e5f 444pkgAcquire::RunResult pkgAcquire::Run(int PulseIntervall)
0a8a80e5 445{
8b89e57f
AL
446 Running = true;
447
0a8a80e5
AL
448 for (Queue *I = Queues; I != 0; I = I->Next)
449 I->Startup();
450
b98f2859
AL
451 if (Log != 0)
452 Log->Start();
453
024d1123
AL
454 bool WasCancelled = false;
455
0a8a80e5 456 // Run till all things have been acquired
8267fe24
AL
457 struct timeval tv;
458 tv.tv_sec = 0;
1c5f7e5f 459 tv.tv_usec = PulseIntervall;
0a8a80e5
AL
460 while (ToFetch > 0)
461 {
462 fd_set RFds;
463 fd_set WFds;
464 int Highest = 0;
465 FD_ZERO(&RFds);
466 FD_ZERO(&WFds);
467 SetFds(Highest,&RFds,&WFds);
468
b0db36b1
AL
469 int Res;
470 do
471 {
472 Res = select(Highest+1,&RFds,&WFds,0,&tv);
473 }
474 while (Res < 0 && errno == EINTR);
475
8267fe24 476 if (Res < 0)
8b89e57f 477 {
8267fe24
AL
478 _error->Errno("select","Select has failed");
479 break;
8b89e57f 480 }
93bf083d 481
0a8a80e5 482 RunFds(&RFds,&WFds);
93bf083d
AL
483 if (_error->PendingError() == true)
484 break;
8267fe24
AL
485
486 // Timeout, notify the log class
487 if (Res == 0 || (Log != 0 && Log->Update == true))
488 {
1c5f7e5f 489 tv.tv_usec = PulseIntervall;
8267fe24
AL
490 for (Worker *I = Workers; I != 0; I = I->NextAcquire)
491 I->Pulse();
024d1123
AL
492 if (Log != 0 && Log->Pulse(this) == false)
493 {
494 WasCancelled = true;
495 break;
496 }
8267fe24 497 }
0a8a80e5 498 }
be4401bf 499
b98f2859
AL
500 if (Log != 0)
501 Log->Stop();
502
be4401bf
AL
503 // Shut down the acquire bits
504 Running = false;
0a8a80e5 505 for (Queue *I = Queues; I != 0; I = I->Next)
8e5fc8f5 506 I->Shutdown(false);
0a8a80e5 507
ab559b35 508 // Shut down the items
f7f0d6c7 509 for (ItemIterator I = Items.begin(); I != Items.end(); ++I)
8e5fc8f5 510 (*I)->Finished();
ab559b35 511
024d1123
AL
512 if (_error->PendingError())
513 return Failed;
514 if (WasCancelled)
515 return Cancelled;
516 return Continue;
93bf083d
AL
517}
518 /*}}}*/
be4401bf 519// Acquire::Bump - Called when an item is dequeued /*{{{*/
93bf083d
AL
520// ---------------------------------------------------------------------
521/* This routine bumps idle queues in hopes that they will be able to fetch
522 the dequeued item */
523void pkgAcquire::Bump()
524{
be4401bf
AL
525 for (Queue *I = Queues; I != 0; I = I->Next)
526 I->Bump();
0a8a80e5
AL
527}
528 /*}}}*/
8267fe24
AL
529// Acquire::WorkerStep - Step to the next worker /*{{{*/
530// ---------------------------------------------------------------------
531/* Not inlined to avoid including acquire-worker.h */
532pkgAcquire::Worker *pkgAcquire::WorkerStep(Worker *I)
533{
534 return I->NextAcquire;
d3e8fbb3 535}
8267fe24 536 /*}}}*/
a6568219 537// Acquire::Clean - Cleans a directory /*{{{*/
7a7fa5f0
AL
538// ---------------------------------------------------------------------
539/* This is a bit simplistic, it looks at every file in the dir and sees
540 if it is part of the download set. */
541bool pkgAcquire::Clean(string Dir)
542{
95b5f6c1
DK
543 // non-existing directories are by definition clean…
544 if (DirectoryExists(Dir) == false)
545 return true;
546
10ecfe4f
MV
547 if(Dir == "/")
548 return _error->Error(_("Clean of %s is not supported"), Dir.c_str());
549
7a7fa5f0
AL
550 DIR *D = opendir(Dir.c_str());
551 if (D == 0)
b2e465d6 552 return _error->Errno("opendir",_("Unable to read %s"),Dir.c_str());
7a7fa5f0
AL
553
554 string StartDir = SafeGetCWD();
555 if (chdir(Dir.c_str()) != 0)
556 {
557 closedir(D);
b2e465d6 558 return _error->Errno("chdir",_("Unable to change to %s"),Dir.c_str());
7a7fa5f0
AL
559 }
560
561 for (struct dirent *Dir = readdir(D); Dir != 0; Dir = readdir(D))
562 {
563 // Skip some files..
564 if (strcmp(Dir->d_name,"lock") == 0 ||
565 strcmp(Dir->d_name,"partial") == 0 ||
566 strcmp(Dir->d_name,".") == 0 ||
567 strcmp(Dir->d_name,"..") == 0)
568 continue;
569
570 // Look in the get list
b4fc9b6f 571 ItemCIterator I = Items.begin();
f7f0d6c7 572 for (; I != Items.end(); ++I)
7a7fa5f0
AL
573 if (flNotDir((*I)->DestFile) == Dir->d_name)
574 break;
575
576 // Nothing found, nuke it
577 if (I == Items.end())
578 unlink(Dir->d_name);
579 };
580
7a7fa5f0 581 closedir(D);
3c8cda8b
MV
582 if (chdir(StartDir.c_str()) != 0)
583 return _error->Errno("chdir",_("Unable to change to %s"),StartDir.c_str());
7a7fa5f0
AL
584 return true;
585}
586 /*}}}*/
a6568219
AL
587// Acquire::TotalNeeded - Number of bytes to fetch /*{{{*/
588// ---------------------------------------------------------------------
589/* This is the total number of bytes needed */
a02db58f 590APT_PURE unsigned long long pkgAcquire::TotalNeeded()
a6568219 591{
a3c4c81a 592 unsigned long long Total = 0;
f7f0d6c7 593 for (ItemCIterator I = ItemsBegin(); I != ItemsEnd(); ++I)
a6568219
AL
594 Total += (*I)->FileSize;
595 return Total;
596}
597 /*}}}*/
598// Acquire::FetchNeeded - Number of bytes needed to get /*{{{*/
599// ---------------------------------------------------------------------
600/* This is the number of bytes that is not local */
a02db58f 601APT_PURE unsigned long long pkgAcquire::FetchNeeded()
a6568219 602{
a3c4c81a 603 unsigned long long Total = 0;
f7f0d6c7 604 for (ItemCIterator I = ItemsBegin(); I != ItemsEnd(); ++I)
a6568219
AL
605 if ((*I)->Local == false)
606 Total += (*I)->FileSize;
607 return Total;
608}
609 /*}}}*/
6b1ff003
AL
610// Acquire::PartialPresent - Number of partial bytes we already have /*{{{*/
611// ---------------------------------------------------------------------
612/* This is the number of partial bytes already present for non-local items */
a02db58f 613APT_PURE unsigned long long pkgAcquire::PartialPresent()
6b1ff003 614{
a3c4c81a 615 unsigned long long Total = 0;
f7f0d6c7 616 for (ItemCIterator I = ItemsBegin(); I != ItemsEnd(); ++I)
6b1ff003
AL
617 if ((*I)->Local == false)
618 Total += (*I)->PartialSize;
619 return Total;
620}
92fcbfc1 621 /*}}}*/
8e5fc8f5 622// Acquire::UriBegin - Start iterator for the uri list /*{{{*/
f7a08e33
AL
623// ---------------------------------------------------------------------
624/* */
625pkgAcquire::UriIterator pkgAcquire::UriBegin()
626{
627 return UriIterator(Queues);
628}
629 /*}}}*/
8e5fc8f5 630// Acquire::UriEnd - End iterator for the uri list /*{{{*/
f7a08e33
AL
631// ---------------------------------------------------------------------
632/* */
633pkgAcquire::UriIterator pkgAcquire::UriEnd()
634{
635 return UriIterator(0);
636}
637 /*}}}*/
e331f6ed
AL
638// Acquire::MethodConfig::MethodConfig - Constructor /*{{{*/
639// ---------------------------------------------------------------------
640/* */
25613a61
DK
641pkgAcquire::MethodConfig::MethodConfig() : d(NULL), Next(0), SingleInstance(false),
642 Pipeline(false), SendConfig(false), LocalOnly(false), NeedsCleanup(false),
643 Removable(false)
e331f6ed 644{
e331f6ed
AL
645}
646 /*}}}*/
0a8a80e5
AL
647// Queue::Queue - Constructor /*{{{*/
648// ---------------------------------------------------------------------
649/* */
25613a61
DK
650pkgAcquire::Queue::Queue(string Name,pkgAcquire *Owner) : d(NULL), Next(0),
651 Name(Name), Items(0), Workers(0), Owner(Owner), PipeDepth(0), MaxPipeDepth(1)
0a8a80e5 652{
0a8a80e5
AL
653}
654 /*}}}*/
655// Queue::~Queue - Destructor /*{{{*/
656// ---------------------------------------------------------------------
657/* */
658pkgAcquire::Queue::~Queue()
659{
8e5fc8f5 660 Shutdown(true);
0a8a80e5
AL
661
662 while (Items != 0)
663 {
664 QItem *Jnk = Items;
665 Items = Items->Next;
666 delete Jnk;
667 }
668}
669 /*}}}*/
670// Queue::Enqueue - Queue an item to the queue /*{{{*/
671// ---------------------------------------------------------------------
672/* */
c03462c6 673bool pkgAcquire::Queue::Enqueue(ItemDesc &Item)
0a8a80e5 674{
7a1b1f8b 675 QItem **I = &Items;
c03462c6
MV
676 // move to the end of the queue and check for duplicates here
677 for (; *I != 0; I = &(*I)->Next)
678 if (Item.URI == (*I)->URI)
679 {
680 Item.Owner->Status = Item::StatDone;
681 return false;
682 }
683
0a8a80e5 684 // Create a new item
7a1b1f8b
AL
685 QItem *Itm = new QItem;
686 *Itm = Item;
687 Itm->Next = 0;
688 *I = Itm;
0a8a80e5 689
8267fe24 690 Item.Owner->QueueCounter++;
93bf083d
AL
691 if (Items->Next == 0)
692 Cycle();
c03462c6 693 return true;
0a8a80e5
AL
694}
695 /*}}}*/
c88edf1d 696// Queue::Dequeue - Remove an item from the queue /*{{{*/
0a8a80e5 697// ---------------------------------------------------------------------
b185acc2 698/* We return true if we hit something */
bfd22fc0 699bool pkgAcquire::Queue::Dequeue(Item *Owner)
0a8a80e5 700{
b185acc2
AL
701 if (Owner->Status == pkgAcquire::Item::StatFetching)
702 return _error->Error("Tried to dequeue a fetching object");
703
bfd22fc0
AL
704 bool Res = false;
705
0a8a80e5
AL
706 QItem **I = &Items;
707 for (; *I != 0;)
708 {
709 if ((*I)->Owner == Owner)
710 {
711 QItem *Jnk= *I;
712 *I = (*I)->Next;
713 Owner->QueueCounter--;
714 delete Jnk;
bfd22fc0 715 Res = true;
0a8a80e5
AL
716 }
717 else
718 I = &(*I)->Next;
719 }
bfd22fc0
AL
720
721 return Res;
0a8a80e5
AL
722}
723 /*}}}*/
724// Queue::Startup - Start the worker processes /*{{{*/
725// ---------------------------------------------------------------------
8e5fc8f5
AL
726/* It is possible for this to be called with a pre-existing set of
727 workers. */
0a8a80e5
AL
728bool pkgAcquire::Queue::Startup()
729{
8e5fc8f5
AL
730 if (Workers == 0)
731 {
732 URI U(Name);
733 pkgAcquire::MethodConfig *Cnf = Owner->GetConfig(U.Access);
734 if (Cnf == 0)
735 return false;
736
737 Workers = new Worker(this,Cnf,Owner->Log);
738 Owner->Add(Workers);
739 if (Workers->Start() == false)
740 return false;
741
742 /* When pipelining we commit 10 items. This needs to change when we
743 added other source retry to have cycle maintain a pipeline depth
744 on its own. */
745 if (Cnf->Pipeline == true)
6ce72612 746 MaxPipeDepth = _config->FindI("Acquire::Max-Pipeline-Depth",10);
8e5fc8f5
AL
747 else
748 MaxPipeDepth = 1;
749 }
5cb5d8dc 750
93bf083d 751 return Cycle();
0a8a80e5
AL
752}
753 /*}}}*/
754// Queue::Shutdown - Shutdown the worker processes /*{{{*/
755// ---------------------------------------------------------------------
8e5fc8f5
AL
756/* If final is true then all workers are eliminated, otherwise only workers
757 that do not need cleanup are removed */
758bool pkgAcquire::Queue::Shutdown(bool Final)
0a8a80e5
AL
759{
760 // Delete all of the workers
8e5fc8f5
AL
761 pkgAcquire::Worker **Cur = &Workers;
762 while (*Cur != 0)
0a8a80e5 763 {
8e5fc8f5
AL
764 pkgAcquire::Worker *Jnk = *Cur;
765 if (Final == true || Jnk->GetConf()->NeedsCleanup == false)
766 {
767 *Cur = Jnk->NextQueue;
768 Owner->Remove(Jnk);
769 delete Jnk;
770 }
771 else
772 Cur = &(*Cur)->NextQueue;
0a8a80e5
AL
773 }
774
775 return true;
3b5421b4
AL
776}
777 /*}}}*/
7d8afa39 778// Queue::FindItem - Find a URI in the item list /*{{{*/
c88edf1d
AL
779// ---------------------------------------------------------------------
780/* */
781pkgAcquire::Queue::QItem *pkgAcquire::Queue::FindItem(string URI,pkgAcquire::Worker *Owner)
782{
783 for (QItem *I = Items; I != 0; I = I->Next)
784 if (I->URI == URI && I->Worker == Owner)
785 return I;
786 return 0;
787}
788 /*}}}*/
789// Queue::ItemDone - Item has been completed /*{{{*/
790// ---------------------------------------------------------------------
791/* The worker signals this which causes the item to be removed from the
93bf083d
AL
792 queue. If this is the last queue instance then it is removed from the
793 main queue too.*/
c88edf1d
AL
794bool pkgAcquire::Queue::ItemDone(QItem *Itm)
795{
b185acc2 796 PipeDepth--;
db890fdb
AL
797 if (Itm->Owner->Status == pkgAcquire::Item::StatFetching)
798 Itm->Owner->Status = pkgAcquire::Item::StatDone;
799
93bf083d
AL
800 if (Itm->Owner->QueueCounter <= 1)
801 Owner->Dequeue(Itm->Owner);
802 else
803 {
804 Dequeue(Itm->Owner);
805 Owner->Bump();
806 }
c88edf1d 807
93bf083d
AL
808 return Cycle();
809}
810 /*}}}*/
811// Queue::Cycle - Queue new items into the method /*{{{*/
812// ---------------------------------------------------------------------
b185acc2
AL
813/* This locates a new idle item and sends it to the worker. If pipelining
814 is enabled then it keeps the pipe full. */
93bf083d
AL
815bool pkgAcquire::Queue::Cycle()
816{
817 if (Items == 0 || Workers == 0)
c88edf1d
AL
818 return true;
819
e7432370
AL
820 if (PipeDepth < 0)
821 return _error->Error("Pipedepth failure");
822
93bf083d
AL
823 // Look for a queable item
824 QItem *I = Items;
e7432370 825 while (PipeDepth < (signed)MaxPipeDepth)
b185acc2
AL
826 {
827 for (; I != 0; I = I->Next)
828 if (I->Owner->Status == pkgAcquire::Item::StatIdle)
829 break;
830
831 // Nothing to do, queue is idle.
832 if (I == 0)
833 return true;
834
835 I->Worker = Workers;
836 I->Owner->Status = pkgAcquire::Item::StatFetching;
e7432370 837 PipeDepth++;
b185acc2
AL
838 if (Workers->QueueItem(I) == false)
839 return false;
840 }
93bf083d 841
b185acc2 842 return true;
c88edf1d
AL
843}
844 /*}}}*/
be4401bf
AL
845// Queue::Bump - Fetch any pending objects if we are idle /*{{{*/
846// ---------------------------------------------------------------------
b185acc2 847/* This is called when an item in multiple queues is dequeued */
be4401bf
AL
848void pkgAcquire::Queue::Bump()
849{
b185acc2 850 Cycle();
be4401bf
AL
851}
852 /*}}}*/
b98f2859
AL
853// AcquireStatus::pkgAcquireStatus - Constructor /*{{{*/
854// ---------------------------------------------------------------------
855/* */
25613a61 856pkgAcquireStatus::pkgAcquireStatus() : d(NULL), Percent(0), Update(true), MorePulses(false)
b98f2859
AL
857{
858 Start();
859}
860 /*}}}*/
861// AcquireStatus::Pulse - Called periodically /*{{{*/
862// ---------------------------------------------------------------------
863/* This computes some internal state variables for the derived classes to
864 use. It generates the current downloaded bytes and total bytes to download
865 as well as the current CPS estimate. */
024d1123 866bool pkgAcquireStatus::Pulse(pkgAcquire *Owner)
b98f2859
AL
867{
868 TotalBytes = 0;
869 CurrentBytes = 0;
d568ed2d
AL
870 TotalItems = 0;
871 CurrentItems = 0;
b98f2859
AL
872
873 // Compute the total number of bytes to fetch
874 unsigned int Unknown = 0;
875 unsigned int Count = 0;
c6e9cc58
MV
876 bool UnfetchedReleaseFiles = false;
877 for (pkgAcquire::ItemCIterator I = Owner->ItemsBegin();
878 I != Owner->ItemsEnd();
f7f0d6c7 879 ++I, ++Count)
b98f2859 880 {
d568ed2d
AL
881 TotalItems++;
882 if ((*I)->Status == pkgAcquire::Item::StatDone)
f7f0d6c7 883 ++CurrentItems;
d568ed2d 884
a6568219
AL
885 // Totally ignore local items
886 if ((*I)->Local == true)
887 continue;
b2e465d6 888
d0cfa8ad
MV
889 // see if the method tells us to expect more
890 TotalItems += (*I)->ExpectedAdditionalItems;
891
c6e9cc58
MV
892 // check if there are unfetched Release files
893 if ((*I)->Complete == false && (*I)->ExpectedAdditionalItems > 0)
894 UnfetchedReleaseFiles = true;
895
b98f2859
AL
896 TotalBytes += (*I)->FileSize;
897 if ((*I)->Complete == true)
898 CurrentBytes += (*I)->FileSize;
899 if ((*I)->FileSize == 0 && (*I)->Complete == false)
f7f0d6c7 900 ++Unknown;
b98f2859
AL
901 }
902
903 // Compute the current completion
dbbc5494 904 unsigned long long ResumeSize = 0;
b98f2859
AL
905 for (pkgAcquire::Worker *I = Owner->WorkersBegin(); I != 0;
906 I = Owner->WorkerStep(I))
c62f7898 907 {
b98f2859 908 if (I->CurrentItem != 0 && I->CurrentItem->Owner->Complete == false)
aa0e1101
AL
909 {
910 CurrentBytes += I->CurrentSize;
911 ResumeSize += I->ResumePoint;
912
913 // Files with unknown size always have 100% completion
914 if (I->CurrentItem->Owner->FileSize == 0 &&
915 I->CurrentItem->Owner->Complete == false)
916 TotalBytes += I->CurrentSize;
917 }
c62f7898 918 }
aa0e1101 919
b98f2859
AL
920 // Normalize the figures and account for unknown size downloads
921 if (TotalBytes <= 0)
922 TotalBytes = 1;
923 if (Unknown == Count)
924 TotalBytes = Unknown;
18ef0a78
AL
925
926 // Wha?! Is not supposed to happen.
927 if (CurrentBytes > TotalBytes)
928 CurrentBytes = TotalBytes;
96c6cab1
MV
929
930 // debug
931 if (_config->FindB("Debug::acquire::progress", false) == true)
932 std::clog << " Bytes: "
933 << SizeToStr(CurrentBytes) << " / " << SizeToStr(TotalBytes)
934 << std::endl;
b98f2859
AL
935
936 // Compute the CPS
937 struct timeval NewTime;
938 gettimeofday(&NewTime,0);
2ec1674d 939 if ((NewTime.tv_sec - Time.tv_sec == 6 && NewTime.tv_usec > Time.tv_usec) ||
b98f2859
AL
940 NewTime.tv_sec - Time.tv_sec > 6)
941 {
f17ac097
AL
942 double Delta = NewTime.tv_sec - Time.tv_sec +
943 (NewTime.tv_usec - Time.tv_usec)/1000000.0;
b98f2859 944
b98f2859 945 // Compute the CPS value
f17ac097 946 if (Delta < 0.01)
e331f6ed
AL
947 CurrentCPS = 0;
948 else
aa0e1101
AL
949 CurrentCPS = ((CurrentBytes - ResumeSize) - LastBytes)/Delta;
950 LastBytes = CurrentBytes - ResumeSize;
dbbc5494 951 ElapsedTime = (unsigned long long)Delta;
b98f2859
AL
952 Time = NewTime;
953 }
024d1123 954
c6e9cc58
MV
955 // calculate the percentage, if we have too little data assume 1%
956 if (TotalBytes > 0 && UnfetchedReleaseFiles)
96c6cab1
MV
957 Percent = 0;
958 else
959 // use both files and bytes because bytes can be unreliable
960 Percent = (0.8 * (CurrentBytes/float(TotalBytes)*100.0) +
961 0.2 * (CurrentItems/float(TotalItems)*100.0));
962
75ef8f14
MV
963 int fd = _config->FindI("APT::Status-Fd",-1);
964 if(fd > 0)
965 {
966 ostringstream status;
967
968 char msg[200];
969 long i = CurrentItems < TotalItems ? CurrentItems + 1 : CurrentItems;
c033d415
MV
970 unsigned long long ETA = 0;
971 if(CurrentCPS > 0)
972 ETA = (TotalBytes - CurrentBytes) / CurrentCPS;
75ef8f14 973
1e8b4c0f
MV
974 // only show the ETA if it makes sense
975 if (ETA > 0 && ETA < 172800 /* two days */ )
0c508b03 976 snprintf(msg,sizeof(msg), _("Retrieving file %li of %li (%s remaining)"), i, TotalItems, TimeToStr(ETA).c_str());
1e8b4c0f 977 else
0c508b03 978 snprintf(msg,sizeof(msg), _("Retrieving file %li of %li"), i, TotalItems);
1e8b4c0f 979
75ef8f14
MV
980 // build the status str
981 status << "dlstatus:" << i
96c6cab1 982 << ":" << std::setprecision(3) << Percent
d0cfa8ad
MV
983 << ":" << msg
984 << endl;
31bda500
DK
985
986 std::string const dlstatus = status.str();
d68d65ad 987 FileFd::Write(fd, dlstatus.c_str(), dlstatus.size());
75ef8f14
MV
988 }
989
024d1123 990 return true;
b98f2859
AL
991}
992 /*}}}*/
993// AcquireStatus::Start - Called when the download is started /*{{{*/
994// ---------------------------------------------------------------------
995/* We just reset the counters */
996void pkgAcquireStatus::Start()
997{
998 gettimeofday(&Time,0);
999 gettimeofday(&StartTime,0);
1000 LastBytes = 0;
1001 CurrentCPS = 0;
1002 CurrentBytes = 0;
1003 TotalBytes = 0;
1004 FetchedBytes = 0;
1005 ElapsedTime = 0;
d568ed2d
AL
1006 TotalItems = 0;
1007 CurrentItems = 0;
b98f2859
AL
1008}
1009 /*}}}*/
a6568219 1010// AcquireStatus::Stop - Finished downloading /*{{{*/
b98f2859
AL
1011// ---------------------------------------------------------------------
1012/* This accurately computes the elapsed time and the total overall CPS. */
1013void pkgAcquireStatus::Stop()
1014{
1015 // Compute the CPS and elapsed time
1016 struct timeval NewTime;
1017 gettimeofday(&NewTime,0);
1018
31a0531d
AL
1019 double Delta = NewTime.tv_sec - StartTime.tv_sec +
1020 (NewTime.tv_usec - StartTime.tv_usec)/1000000.0;
b98f2859 1021
b98f2859 1022 // Compute the CPS value
31a0531d 1023 if (Delta < 0.01)
e331f6ed
AL
1024 CurrentCPS = 0;
1025 else
31a0531d 1026 CurrentCPS = FetchedBytes/Delta;
b98f2859 1027 LastBytes = CurrentBytes;
dbbc5494 1028 ElapsedTime = (unsigned long long)Delta;
b98f2859
AL
1029}
1030 /*}}}*/
1031// AcquireStatus::Fetched - Called when a byte set has been fetched /*{{{*/
1032// ---------------------------------------------------------------------
1033/* This is used to get accurate final transfer rate reporting. */
73da43e9 1034void pkgAcquireStatus::Fetched(unsigned long long Size,unsigned long long Resume)
93274b8d 1035{
b98f2859
AL
1036 FetchedBytes += Size - Resume;
1037}
1038 /*}}}*/
862bafea 1039
9d653a6d
DK
1040APT_CONST pkgAcquire::UriIterator::~UriIterator() {}
1041APT_CONST pkgAcquire::MethodConfig::~MethodConfig() {}
1042APT_CONST pkgAcquireStatus::~pkgAcquireStatus() {}