]> git.saurik.com Git - apt.git/blame - apt-pkg/acquire.cc
[apt-pkg] allow also codenames for specifying a release
[apt.git] / apt-pkg / acquire.cc
CommitLineData
0118833a
AL
1// -*- mode: cpp; mode: fold -*-
2// Description /*{{{*/
1b480911 3// $Id: acquire.cc,v 1.50 2004/03/17 05:17:11 mdz Exp $
0118833a
AL
4/* ######################################################################
5
6 Acquire - File Acquisition
7
0a8a80e5
AL
8 The core element of the scheduling system is the concept of a named
9 queue. Each queue is unique and each queue has a name derived from the
10 URI. The degree of parallelization can be controlled by how the queue
11 name is derived from the URI.
12
0118833a
AL
13 ##################################################################### */
14 /*}}}*/
15// Include Files /*{{{*/
0118833a
AL
16#include <apt-pkg/acquire.h>
17#include <apt-pkg/acquire-item.h>
18#include <apt-pkg/acquire-worker.h>
0a8a80e5
AL
19#include <apt-pkg/configuration.h>
20#include <apt-pkg/error.h>
cdcc6d34 21#include <apt-pkg/strutl.h>
8267fe24 22
b2e465d6 23#include <apti18n.h>
b4fc9b6f
AL
24
25#include <iostream>
75ef8f14 26#include <sstream>
b2e465d6 27
7a7fa5f0 28#include <dirent.h>
8267fe24 29#include <sys/time.h>
524f8105 30#include <errno.h>
5f3edc0f 31#include <sys/stat.h>
0118833a
AL
32 /*}}}*/
33
b4fc9b6f
AL
34using namespace std;
35
0118833a
AL
36// Acquire::pkgAcquire - Constructor /*{{{*/
37// ---------------------------------------------------------------------
93bf083d 38/* We grab some runtime state from the configuration space */
8267fe24 39pkgAcquire::pkgAcquire(pkgAcquireStatus *Log) : Log(Log)
0118833a
AL
40{
41 Queues = 0;
42 Configs = 0;
0a8a80e5
AL
43 Workers = 0;
44 ToFetch = 0;
8b89e57f 45 Running = false;
0a8a80e5
AL
46
47 string Mode = _config->Find("Acquire::Queue-Mode","host");
48 if (strcasecmp(Mode.c_str(),"host") == 0)
49 QueueMode = QueueHost;
50 if (strcasecmp(Mode.c_str(),"access") == 0)
51 QueueMode = QueueAccess;
52
53 Debug = _config->FindB("Debug::pkgAcquire",false);
5f3edc0f 54
0c55d1d2 55 // This is really a stupid place for this
5f3edc0f
AL
56 struct stat St;
57 if (stat((_config->FindDir("Dir::State::lists") + "partial/").c_str(),&St) != 0 ||
58 S_ISDIR(St.st_mode) == 0)
b2e465d6 59 _error->Error(_("Lists directory %spartial is missing."),
5f3edc0f
AL
60 _config->FindDir("Dir::State::lists").c_str());
61 if (stat((_config->FindDir("Dir::Cache::Archives") + "partial/").c_str(),&St) != 0 ||
62 S_ISDIR(St.st_mode) == 0)
b2e465d6 63 _error->Error(_("Archive directory %spartial is missing."),
5f3edc0f 64 _config->FindDir("Dir::Cache::Archives").c_str());
0118833a
AL
65}
66 /*}}}*/
67// Acquire::~pkgAcquire - Destructor /*{{{*/
68// ---------------------------------------------------------------------
93bf083d 69/* Free our memory, clean up the queues (destroy the workers) */
0118833a
AL
70pkgAcquire::~pkgAcquire()
71{
459681d3
AL
72 Shutdown();
73
3b5421b4
AL
74 while (Configs != 0)
75 {
76 MethodConfig *Jnk = Configs;
77 Configs = Configs->Next;
78 delete Jnk;
79 }
281daf46
AL
80}
81 /*}}}*/
8e5fc8f5 82// Acquire::Shutdown - Clean out the acquire object /*{{{*/
281daf46
AL
83// ---------------------------------------------------------------------
84/* */
85void pkgAcquire::Shutdown()
86{
87 while (Items.size() != 0)
1b480911
AL
88 {
89 if (Items[0]->Status == Item::StatFetching)
90 Items[0]->Status = Item::StatError;
281daf46 91 delete Items[0];
1b480911 92 }
0a8a80e5
AL
93
94 while (Queues != 0)
95 {
96 Queue *Jnk = Queues;
97 Queues = Queues->Next;
98 delete Jnk;
99 }
0118833a
AL
100}
101 /*}}}*/
102// Acquire::Add - Add a new item /*{{{*/
103// ---------------------------------------------------------------------
93bf083d
AL
104/* This puts an item on the acquire list. This list is mainly for tracking
105 item status */
0118833a
AL
106void pkgAcquire::Add(Item *Itm)
107{
108 Items.push_back(Itm);
109}
110 /*}}}*/
111// Acquire::Remove - Remove a item /*{{{*/
112// ---------------------------------------------------------------------
93bf083d 113/* Remove an item from the acquire list. This is usually not used.. */
0118833a
AL
114void pkgAcquire::Remove(Item *Itm)
115{
a3eaf954
AL
116 Dequeue(Itm);
117
753b3525 118 for (ItemIterator I = Items.begin(); I != Items.end();)
0118833a
AL
119 {
120 if (*I == Itm)
b4fc9b6f 121 {
0118833a 122 Items.erase(I);
b4fc9b6f
AL
123 I = Items.begin();
124 }
753b3525
AL
125 else
126 I++;
8267fe24 127 }
0118833a
AL
128}
129 /*}}}*/
0a8a80e5
AL
130// Acquire::Add - Add a worker /*{{{*/
131// ---------------------------------------------------------------------
93bf083d
AL
132/* A list of workers is kept so that the select loop can direct their FD
133 usage. */
0a8a80e5
AL
134void pkgAcquire::Add(Worker *Work)
135{
136 Work->NextAcquire = Workers;
137 Workers = Work;
138}
139 /*}}}*/
140// Acquire::Remove - Remove a worker /*{{{*/
141// ---------------------------------------------------------------------
93bf083d
AL
142/* A worker has died. This can not be done while the select loop is running
143 as it would require that RunFds could handling a changing list state and
144 it cant.. */
0a8a80e5
AL
145void pkgAcquire::Remove(Worker *Work)
146{
93bf083d
AL
147 if (Running == true)
148 abort();
149
0a8a80e5
AL
150 Worker **I = &Workers;
151 for (; *I != 0;)
152 {
153 if (*I == Work)
154 *I = (*I)->NextAcquire;
155 else
156 I = &(*I)->NextAcquire;
157 }
158}
159 /*}}}*/
0118833a
AL
160// Acquire::Enqueue - Queue an URI for fetching /*{{{*/
161// ---------------------------------------------------------------------
93bf083d 162/* This is the entry point for an item. An item calls this function when
281daf46 163 it is constructed which creates a queue (based on the current queue
93bf083d
AL
164 mode) and puts the item in that queue. If the system is running then
165 the queue might be started. */
8267fe24 166void pkgAcquire::Enqueue(ItemDesc &Item)
0118833a 167{
0a8a80e5 168 // Determine which queue to put the item in
e331f6ed
AL
169 const MethodConfig *Config;
170 string Name = QueueName(Item.URI,Config);
0a8a80e5
AL
171 if (Name.empty() == true)
172 return;
173
174 // Find the queue structure
175 Queue *I = Queues;
176 for (; I != 0 && I->Name != Name; I = I->Next);
177 if (I == 0)
178 {
179 I = new Queue(Name,this);
180 I->Next = Queues;
181 Queues = I;
93bf083d
AL
182
183 if (Running == true)
184 I->Startup();
0a8a80e5 185 }
bfd22fc0 186
e331f6ed
AL
187 // See if this is a local only URI
188 if (Config->LocalOnly == true && Item.Owner->Complete == false)
189 Item.Owner->Local = true;
8267fe24 190 Item.Owner->Status = Item::StatIdle;
0a8a80e5
AL
191
192 // Queue it into the named queue
c03462c6
MV
193 if(I->Enqueue(Item))
194 ToFetch++;
195
0a8a80e5
AL
196 // Some trace stuff
197 if (Debug == true)
198 {
8267fe24
AL
199 clog << "Fetching " << Item.URI << endl;
200 clog << " to " << Item.Owner->DestFile << endl;
e331f6ed 201 clog << " Queue is: " << Name << endl;
0a8a80e5 202 }
3b5421b4
AL
203}
204 /*}}}*/
0a8a80e5 205// Acquire::Dequeue - Remove an item from all queues /*{{{*/
3b5421b4 206// ---------------------------------------------------------------------
93bf083d
AL
207/* This is called when an item is finished being fetched. It removes it
208 from all the queues */
0a8a80e5
AL
209void pkgAcquire::Dequeue(Item *Itm)
210{
211 Queue *I = Queues;
bfd22fc0 212 bool Res = false;
0a8a80e5 213 for (; I != 0; I = I->Next)
bfd22fc0 214 Res |= I->Dequeue(Itm);
93bf083d
AL
215
216 if (Debug == true)
217 clog << "Dequeuing " << Itm->DestFile << endl;
bfd22fc0
AL
218 if (Res == true)
219 ToFetch--;
0a8a80e5
AL
220}
221 /*}}}*/
222// Acquire::QueueName - Return the name of the queue for this URI /*{{{*/
223// ---------------------------------------------------------------------
224/* The string returned depends on the configuration settings and the
225 method parameters. Given something like http://foo.org/bar it can
226 return http://foo.org or http */
e331f6ed 227string pkgAcquire::QueueName(string Uri,MethodConfig const *&Config)
3b5421b4 228{
93bf083d
AL
229 URI U(Uri);
230
e331f6ed 231 Config = GetConfig(U.Access);
0a8a80e5
AL
232 if (Config == 0)
233 return string();
234
235 /* Single-Instance methods get exactly one queue per URI. This is
236 also used for the Access queue method */
237 if (Config->SingleInstance == true || QueueMode == QueueAccess)
b98f2859 238 return U.Access;
93bf083d
AL
239
240 return U.Access + ':' + U.Host;
0118833a
AL
241}
242 /*}}}*/
3b5421b4
AL
243// Acquire::GetConfig - Fetch the configuration information /*{{{*/
244// ---------------------------------------------------------------------
245/* This locates the configuration structure for an access method. If
246 a config structure cannot be found a Worker will be created to
247 retrieve it */
0a8a80e5 248pkgAcquire::MethodConfig *pkgAcquire::GetConfig(string Access)
3b5421b4
AL
249{
250 // Search for an existing config
251 MethodConfig *Conf;
252 for (Conf = Configs; Conf != 0; Conf = Conf->Next)
253 if (Conf->Access == Access)
254 return Conf;
255
256 // Create the new config class
257 Conf = new MethodConfig;
258 Conf->Access = Access;
259 Conf->Next = Configs;
260 Configs = Conf;
0118833a 261
3b5421b4
AL
262 // Create the worker to fetch the configuration
263 Worker Work(Conf);
264 if (Work.Start() == false)
265 return 0;
7c6e2dc7
MV
266
267 /* if a method uses DownloadLimit, we switch to SingleInstance mode */
268 if(_config->FindI("Acquire::"+Access+"::DlLimit",0) > 0)
269 Conf->SingleInstance = true;
270
3b5421b4
AL
271 return Conf;
272}
273 /*}}}*/
0a8a80e5
AL
274// Acquire::SetFds - Deal with readable FDs /*{{{*/
275// ---------------------------------------------------------------------
276/* Collect FDs that have activity monitors into the fd sets */
277void pkgAcquire::SetFds(int &Fd,fd_set *RSet,fd_set *WSet)
278{
279 for (Worker *I = Workers; I != 0; I = I->NextAcquire)
280 {
281 if (I->InReady == true && I->InFd >= 0)
282 {
283 if (Fd < I->InFd)
284 Fd = I->InFd;
285 FD_SET(I->InFd,RSet);
286 }
287 if (I->OutReady == true && I->OutFd >= 0)
288 {
289 if (Fd < I->OutFd)
290 Fd = I->OutFd;
291 FD_SET(I->OutFd,WSet);
292 }
293 }
294}
295 /*}}}*/
296// Acquire::RunFds - Deal with active FDs /*{{{*/
297// ---------------------------------------------------------------------
93bf083d
AL
298/* Dispatch active FDs over to the proper workers. It is very important
299 that a worker never be erased while this is running! The queue class
300 should never erase a worker except during shutdown processing. */
0a8a80e5
AL
301void pkgAcquire::RunFds(fd_set *RSet,fd_set *WSet)
302{
303 for (Worker *I = Workers; I != 0; I = I->NextAcquire)
304 {
305 if (I->InFd >= 0 && FD_ISSET(I->InFd,RSet) != 0)
306 I->InFdReady();
307 if (I->OutFd >= 0 && FD_ISSET(I->OutFd,WSet) != 0)
308 I->OutFdReady();
309 }
310}
311 /*}}}*/
312// Acquire::Run - Run the fetch sequence /*{{{*/
313// ---------------------------------------------------------------------
314/* This runs the queues. It manages a select loop for all of the
315 Worker tasks. The workers interact with the queues and items to
316 manage the actual fetch. */
1c5f7e5f 317pkgAcquire::RunResult pkgAcquire::Run(int PulseIntervall)
0a8a80e5 318{
8b89e57f
AL
319 Running = true;
320
0a8a80e5
AL
321 for (Queue *I = Queues; I != 0; I = I->Next)
322 I->Startup();
323
b98f2859
AL
324 if (Log != 0)
325 Log->Start();
326
024d1123
AL
327 bool WasCancelled = false;
328
0a8a80e5 329 // Run till all things have been acquired
8267fe24
AL
330 struct timeval tv;
331 tv.tv_sec = 0;
1c5f7e5f 332 tv.tv_usec = PulseIntervall;
0a8a80e5
AL
333 while (ToFetch > 0)
334 {
335 fd_set RFds;
336 fd_set WFds;
337 int Highest = 0;
338 FD_ZERO(&RFds);
339 FD_ZERO(&WFds);
340 SetFds(Highest,&RFds,&WFds);
341
b0db36b1
AL
342 int Res;
343 do
344 {
345 Res = select(Highest+1,&RFds,&WFds,0,&tv);
346 }
347 while (Res < 0 && errno == EINTR);
348
8267fe24 349 if (Res < 0)
8b89e57f 350 {
8267fe24
AL
351 _error->Errno("select","Select has failed");
352 break;
8b89e57f 353 }
93bf083d 354
0a8a80e5 355 RunFds(&RFds,&WFds);
93bf083d
AL
356 if (_error->PendingError() == true)
357 break;
8267fe24
AL
358
359 // Timeout, notify the log class
360 if (Res == 0 || (Log != 0 && Log->Update == true))
361 {
1c5f7e5f 362 tv.tv_usec = PulseIntervall;
8267fe24
AL
363 for (Worker *I = Workers; I != 0; I = I->NextAcquire)
364 I->Pulse();
024d1123
AL
365 if (Log != 0 && Log->Pulse(this) == false)
366 {
367 WasCancelled = true;
368 break;
369 }
8267fe24 370 }
0a8a80e5 371 }
be4401bf 372
b98f2859
AL
373 if (Log != 0)
374 Log->Stop();
375
be4401bf
AL
376 // Shut down the acquire bits
377 Running = false;
0a8a80e5 378 for (Queue *I = Queues; I != 0; I = I->Next)
8e5fc8f5 379 I->Shutdown(false);
0a8a80e5 380
ab559b35 381 // Shut down the items
b4fc9b6f 382 for (ItemIterator I = Items.begin(); I != Items.end(); I++)
8e5fc8f5 383 (*I)->Finished();
ab559b35 384
024d1123
AL
385 if (_error->PendingError())
386 return Failed;
387 if (WasCancelled)
388 return Cancelled;
389 return Continue;
93bf083d
AL
390}
391 /*}}}*/
be4401bf 392// Acquire::Bump - Called when an item is dequeued /*{{{*/
93bf083d
AL
393// ---------------------------------------------------------------------
394/* This routine bumps idle queues in hopes that they will be able to fetch
395 the dequeued item */
396void pkgAcquire::Bump()
397{
be4401bf
AL
398 for (Queue *I = Queues; I != 0; I = I->Next)
399 I->Bump();
0a8a80e5
AL
400}
401 /*}}}*/
8267fe24
AL
402// Acquire::WorkerStep - Step to the next worker /*{{{*/
403// ---------------------------------------------------------------------
404/* Not inlined to advoid including acquire-worker.h */
405pkgAcquire::Worker *pkgAcquire::WorkerStep(Worker *I)
406{
407 return I->NextAcquire;
408};
409 /*}}}*/
a6568219 410// Acquire::Clean - Cleans a directory /*{{{*/
7a7fa5f0
AL
411// ---------------------------------------------------------------------
412/* This is a bit simplistic, it looks at every file in the dir and sees
413 if it is part of the download set. */
414bool pkgAcquire::Clean(string Dir)
415{
416 DIR *D = opendir(Dir.c_str());
417 if (D == 0)
b2e465d6 418 return _error->Errno("opendir",_("Unable to read %s"),Dir.c_str());
7a7fa5f0
AL
419
420 string StartDir = SafeGetCWD();
421 if (chdir(Dir.c_str()) != 0)
422 {
423 closedir(D);
b2e465d6 424 return _error->Errno("chdir",_("Unable to change to %s"),Dir.c_str());
7a7fa5f0
AL
425 }
426
427 for (struct dirent *Dir = readdir(D); Dir != 0; Dir = readdir(D))
428 {
429 // Skip some files..
430 if (strcmp(Dir->d_name,"lock") == 0 ||
431 strcmp(Dir->d_name,"partial") == 0 ||
432 strcmp(Dir->d_name,".") == 0 ||
433 strcmp(Dir->d_name,"..") == 0)
434 continue;
435
436 // Look in the get list
b4fc9b6f 437 ItemCIterator I = Items.begin();
7a7fa5f0
AL
438 for (; I != Items.end(); I++)
439 if (flNotDir((*I)->DestFile) == Dir->d_name)
440 break;
441
442 // Nothing found, nuke it
443 if (I == Items.end())
444 unlink(Dir->d_name);
445 };
446
7a7fa5f0 447 closedir(D);
3c8cda8b
MV
448 if (chdir(StartDir.c_str()) != 0)
449 return _error->Errno("chdir",_("Unable to change to %s"),StartDir.c_str());
7a7fa5f0
AL
450 return true;
451}
452 /*}}}*/
a6568219
AL
453// Acquire::TotalNeeded - Number of bytes to fetch /*{{{*/
454// ---------------------------------------------------------------------
455/* This is the total number of bytes needed */
b2e465d6 456double pkgAcquire::TotalNeeded()
a6568219 457{
b2e465d6 458 double Total = 0;
b4fc9b6f 459 for (ItemCIterator I = ItemsBegin(); I != ItemsEnd(); I++)
a6568219
AL
460 Total += (*I)->FileSize;
461 return Total;
462}
463 /*}}}*/
464// Acquire::FetchNeeded - Number of bytes needed to get /*{{{*/
465// ---------------------------------------------------------------------
466/* This is the number of bytes that is not local */
b2e465d6 467double pkgAcquire::FetchNeeded()
a6568219 468{
b2e465d6 469 double Total = 0;
b4fc9b6f 470 for (ItemCIterator I = ItemsBegin(); I != ItemsEnd(); I++)
a6568219
AL
471 if ((*I)->Local == false)
472 Total += (*I)->FileSize;
473 return Total;
474}
475 /*}}}*/
6b1ff003
AL
476// Acquire::PartialPresent - Number of partial bytes we already have /*{{{*/
477// ---------------------------------------------------------------------
478/* This is the number of bytes that is not local */
b2e465d6 479double pkgAcquire::PartialPresent()
6b1ff003 480{
b2e465d6 481 double Total = 0;
b4fc9b6f 482 for (ItemCIterator I = ItemsBegin(); I != ItemsEnd(); I++)
6b1ff003
AL
483 if ((*I)->Local == false)
484 Total += (*I)->PartialSize;
485 return Total;
486}
b3d44315 487
8e5fc8f5 488// Acquire::UriBegin - Start iterator for the uri list /*{{{*/
f7a08e33
AL
489// ---------------------------------------------------------------------
490/* */
491pkgAcquire::UriIterator pkgAcquire::UriBegin()
492{
493 return UriIterator(Queues);
494}
495 /*}}}*/
8e5fc8f5 496// Acquire::UriEnd - End iterator for the uri list /*{{{*/
f7a08e33
AL
497// ---------------------------------------------------------------------
498/* */
499pkgAcquire::UriIterator pkgAcquire::UriEnd()
500{
501 return UriIterator(0);
502}
503 /*}}}*/
0a8a80e5 504
e331f6ed
AL
505// Acquire::MethodConfig::MethodConfig - Constructor /*{{{*/
506// ---------------------------------------------------------------------
507/* */
508pkgAcquire::MethodConfig::MethodConfig()
509{
510 SingleInstance = false;
e331f6ed
AL
511 Pipeline = false;
512 SendConfig = false;
513 LocalOnly = false;
459681d3 514 Removable = false;
e331f6ed
AL
515 Next = 0;
516}
517 /*}}}*/
518
0a8a80e5
AL
519// Queue::Queue - Constructor /*{{{*/
520// ---------------------------------------------------------------------
521/* */
522pkgAcquire::Queue::Queue(string Name,pkgAcquire *Owner) : Name(Name),
523 Owner(Owner)
524{
525 Items = 0;
526 Next = 0;
527 Workers = 0;
b185acc2
AL
528 MaxPipeDepth = 1;
529 PipeDepth = 0;
0a8a80e5
AL
530}
531 /*}}}*/
532// Queue::~Queue - Destructor /*{{{*/
533// ---------------------------------------------------------------------
534/* */
535pkgAcquire::Queue::~Queue()
536{
8e5fc8f5 537 Shutdown(true);
0a8a80e5
AL
538
539 while (Items != 0)
540 {
541 QItem *Jnk = Items;
542 Items = Items->Next;
543 delete Jnk;
544 }
545}
546 /*}}}*/
547// Queue::Enqueue - Queue an item to the queue /*{{{*/
548// ---------------------------------------------------------------------
549/* */
c03462c6 550bool pkgAcquire::Queue::Enqueue(ItemDesc &Item)
0a8a80e5 551{
7a1b1f8b 552 QItem **I = &Items;
c03462c6
MV
553 // move to the end of the queue and check for duplicates here
554 for (; *I != 0; I = &(*I)->Next)
555 if (Item.URI == (*I)->URI)
556 {
557 Item.Owner->Status = Item::StatDone;
558 return false;
559 }
560
0a8a80e5 561 // Create a new item
7a1b1f8b
AL
562 QItem *Itm = new QItem;
563 *Itm = Item;
564 Itm->Next = 0;
565 *I = Itm;
0a8a80e5 566
8267fe24 567 Item.Owner->QueueCounter++;
93bf083d
AL
568 if (Items->Next == 0)
569 Cycle();
c03462c6 570 return true;
0a8a80e5
AL
571}
572 /*}}}*/
c88edf1d 573// Queue::Dequeue - Remove an item from the queue /*{{{*/
0a8a80e5 574// ---------------------------------------------------------------------
b185acc2 575/* We return true if we hit something */
bfd22fc0 576bool pkgAcquire::Queue::Dequeue(Item *Owner)
0a8a80e5 577{
b185acc2
AL
578 if (Owner->Status == pkgAcquire::Item::StatFetching)
579 return _error->Error("Tried to dequeue a fetching object");
580
bfd22fc0
AL
581 bool Res = false;
582
0a8a80e5
AL
583 QItem **I = &Items;
584 for (; *I != 0;)
585 {
586 if ((*I)->Owner == Owner)
587 {
588 QItem *Jnk= *I;
589 *I = (*I)->Next;
590 Owner->QueueCounter--;
591 delete Jnk;
bfd22fc0 592 Res = true;
0a8a80e5
AL
593 }
594 else
595 I = &(*I)->Next;
596 }
bfd22fc0
AL
597
598 return Res;
0a8a80e5
AL
599}
600 /*}}}*/
601// Queue::Startup - Start the worker processes /*{{{*/
602// ---------------------------------------------------------------------
8e5fc8f5
AL
603/* It is possible for this to be called with a pre-existing set of
604 workers. */
0a8a80e5
AL
605bool pkgAcquire::Queue::Startup()
606{
8e5fc8f5
AL
607 if (Workers == 0)
608 {
609 URI U(Name);
610 pkgAcquire::MethodConfig *Cnf = Owner->GetConfig(U.Access);
611 if (Cnf == 0)
612 return false;
613
614 Workers = new Worker(this,Cnf,Owner->Log);
615 Owner->Add(Workers);
616 if (Workers->Start() == false)
617 return false;
618
619 /* When pipelining we commit 10 items. This needs to change when we
620 added other source retry to have cycle maintain a pipeline depth
621 on its own. */
622 if (Cnf->Pipeline == true)
7a59dff6 623 MaxPipeDepth = 1000;
8e5fc8f5
AL
624 else
625 MaxPipeDepth = 1;
626 }
5cb5d8dc 627
93bf083d 628 return Cycle();
0a8a80e5
AL
629}
630 /*}}}*/
631// Queue::Shutdown - Shutdown the worker processes /*{{{*/
632// ---------------------------------------------------------------------
8e5fc8f5
AL
633/* If final is true then all workers are eliminated, otherwise only workers
634 that do not need cleanup are removed */
635bool pkgAcquire::Queue::Shutdown(bool Final)
0a8a80e5
AL
636{
637 // Delete all of the workers
8e5fc8f5
AL
638 pkgAcquire::Worker **Cur = &Workers;
639 while (*Cur != 0)
0a8a80e5 640 {
8e5fc8f5
AL
641 pkgAcquire::Worker *Jnk = *Cur;
642 if (Final == true || Jnk->GetConf()->NeedsCleanup == false)
643 {
644 *Cur = Jnk->NextQueue;
645 Owner->Remove(Jnk);
646 delete Jnk;
647 }
648 else
649 Cur = &(*Cur)->NextQueue;
0a8a80e5
AL
650 }
651
652 return true;
3b5421b4
AL
653}
654 /*}}}*/
7d8afa39 655// Queue::FindItem - Find a URI in the item list /*{{{*/
c88edf1d
AL
656// ---------------------------------------------------------------------
657/* */
658pkgAcquire::Queue::QItem *pkgAcquire::Queue::FindItem(string URI,pkgAcquire::Worker *Owner)
659{
660 for (QItem *I = Items; I != 0; I = I->Next)
661 if (I->URI == URI && I->Worker == Owner)
662 return I;
663 return 0;
664}
665 /*}}}*/
666// Queue::ItemDone - Item has been completed /*{{{*/
667// ---------------------------------------------------------------------
668/* The worker signals this which causes the item to be removed from the
93bf083d
AL
669 queue. If this is the last queue instance then it is removed from the
670 main queue too.*/
c88edf1d
AL
671bool pkgAcquire::Queue::ItemDone(QItem *Itm)
672{
b185acc2 673 PipeDepth--;
db890fdb
AL
674 if (Itm->Owner->Status == pkgAcquire::Item::StatFetching)
675 Itm->Owner->Status = pkgAcquire::Item::StatDone;
676
93bf083d
AL
677 if (Itm->Owner->QueueCounter <= 1)
678 Owner->Dequeue(Itm->Owner);
679 else
680 {
681 Dequeue(Itm->Owner);
682 Owner->Bump();
683 }
c88edf1d 684
93bf083d
AL
685 return Cycle();
686}
687 /*}}}*/
688// Queue::Cycle - Queue new items into the method /*{{{*/
689// ---------------------------------------------------------------------
b185acc2
AL
690/* This locates a new idle item and sends it to the worker. If pipelining
691 is enabled then it keeps the pipe full. */
93bf083d
AL
692bool pkgAcquire::Queue::Cycle()
693{
694 if (Items == 0 || Workers == 0)
c88edf1d
AL
695 return true;
696
e7432370
AL
697 if (PipeDepth < 0)
698 return _error->Error("Pipedepth failure");
699
93bf083d
AL
700 // Look for a queable item
701 QItem *I = Items;
e7432370 702 while (PipeDepth < (signed)MaxPipeDepth)
b185acc2
AL
703 {
704 for (; I != 0; I = I->Next)
705 if (I->Owner->Status == pkgAcquire::Item::StatIdle)
706 break;
707
708 // Nothing to do, queue is idle.
709 if (I == 0)
710 return true;
711
712 I->Worker = Workers;
713 I->Owner->Status = pkgAcquire::Item::StatFetching;
e7432370 714 PipeDepth++;
b185acc2
AL
715 if (Workers->QueueItem(I) == false)
716 return false;
717 }
93bf083d 718
b185acc2 719 return true;
c88edf1d
AL
720}
721 /*}}}*/
be4401bf
AL
722// Queue::Bump - Fetch any pending objects if we are idle /*{{{*/
723// ---------------------------------------------------------------------
b185acc2 724/* This is called when an item in multiple queues is dequeued */
be4401bf
AL
725void pkgAcquire::Queue::Bump()
726{
b185acc2 727 Cycle();
be4401bf
AL
728}
729 /*}}}*/
b98f2859
AL
730
731// AcquireStatus::pkgAcquireStatus - Constructor /*{{{*/
732// ---------------------------------------------------------------------
733/* */
c5ccf175 734pkgAcquireStatus::pkgAcquireStatus() : Update(true), MorePulses(false)
b98f2859
AL
735{
736 Start();
737}
738 /*}}}*/
739// AcquireStatus::Pulse - Called periodically /*{{{*/
740// ---------------------------------------------------------------------
741/* This computes some internal state variables for the derived classes to
742 use. It generates the current downloaded bytes and total bytes to download
743 as well as the current CPS estimate. */
024d1123 744bool pkgAcquireStatus::Pulse(pkgAcquire *Owner)
b98f2859
AL
745{
746 TotalBytes = 0;
747 CurrentBytes = 0;
d568ed2d
AL
748 TotalItems = 0;
749 CurrentItems = 0;
b98f2859
AL
750
751 // Compute the total number of bytes to fetch
752 unsigned int Unknown = 0;
753 unsigned int Count = 0;
b4fc9b6f 754 for (pkgAcquire::ItemCIterator I = Owner->ItemsBegin(); I != Owner->ItemsEnd();
b98f2859
AL
755 I++, Count++)
756 {
d568ed2d
AL
757 TotalItems++;
758 if ((*I)->Status == pkgAcquire::Item::StatDone)
759 CurrentItems++;
760
a6568219
AL
761 // Totally ignore local items
762 if ((*I)->Local == true)
763 continue;
b2e465d6 764
b98f2859
AL
765 TotalBytes += (*I)->FileSize;
766 if ((*I)->Complete == true)
767 CurrentBytes += (*I)->FileSize;
768 if ((*I)->FileSize == 0 && (*I)->Complete == false)
769 Unknown++;
770 }
771
772 // Compute the current completion
aa0e1101 773 unsigned long ResumeSize = 0;
b98f2859
AL
774 for (pkgAcquire::Worker *I = Owner->WorkersBegin(); I != 0;
775 I = Owner->WorkerStep(I))
776 if (I->CurrentItem != 0 && I->CurrentItem->Owner->Complete == false)
aa0e1101
AL
777 {
778 CurrentBytes += I->CurrentSize;
779 ResumeSize += I->ResumePoint;
780
781 // Files with unknown size always have 100% completion
782 if (I->CurrentItem->Owner->FileSize == 0 &&
783 I->CurrentItem->Owner->Complete == false)
784 TotalBytes += I->CurrentSize;
785 }
786
b98f2859
AL
787 // Normalize the figures and account for unknown size downloads
788 if (TotalBytes <= 0)
789 TotalBytes = 1;
790 if (Unknown == Count)
791 TotalBytes = Unknown;
18ef0a78
AL
792
793 // Wha?! Is not supposed to happen.
794 if (CurrentBytes > TotalBytes)
795 CurrentBytes = TotalBytes;
b98f2859
AL
796
797 // Compute the CPS
798 struct timeval NewTime;
799 gettimeofday(&NewTime,0);
2ec1674d 800 if ((NewTime.tv_sec - Time.tv_sec == 6 && NewTime.tv_usec > Time.tv_usec) ||
b98f2859
AL
801 NewTime.tv_sec - Time.tv_sec > 6)
802 {
f17ac097
AL
803 double Delta = NewTime.tv_sec - Time.tv_sec +
804 (NewTime.tv_usec - Time.tv_usec)/1000000.0;
b98f2859 805
b98f2859 806 // Compute the CPS value
f17ac097 807 if (Delta < 0.01)
e331f6ed
AL
808 CurrentCPS = 0;
809 else
aa0e1101
AL
810 CurrentCPS = ((CurrentBytes - ResumeSize) - LastBytes)/Delta;
811 LastBytes = CurrentBytes - ResumeSize;
6d5dd02a 812 ElapsedTime = (unsigned long)Delta;
b98f2859
AL
813 Time = NewTime;
814 }
024d1123 815
75ef8f14
MV
816 int fd = _config->FindI("APT::Status-Fd",-1);
817 if(fd > 0)
818 {
819 ostringstream status;
820
821 char msg[200];
822 long i = CurrentItems < TotalItems ? CurrentItems + 1 : CurrentItems;
823 unsigned long ETA =
824 (unsigned long)((TotalBytes - CurrentBytes) / CurrentCPS);
825
1e8b4c0f
MV
826 // only show the ETA if it makes sense
827 if (ETA > 0 && ETA < 172800 /* two days */ )
0c508b03 828 snprintf(msg,sizeof(msg), _("Retrieving file %li of %li (%s remaining)"), i, TotalItems, TimeToStr(ETA).c_str());
1e8b4c0f 829 else
0c508b03 830 snprintf(msg,sizeof(msg), _("Retrieving file %li of %li"), i, TotalItems);
1e8b4c0f
MV
831
832
75ef8f14
MV
833
834 // build the status str
835 status << "dlstatus:" << i
836 << ":" << (CurrentBytes/float(TotalBytes)*100.0)
837 << ":" << msg
838 << endl;
839 write(fd, status.str().c_str(), status.str().size());
840 }
841
024d1123 842 return true;
b98f2859
AL
843}
844 /*}}}*/
845// AcquireStatus::Start - Called when the download is started /*{{{*/
846// ---------------------------------------------------------------------
847/* We just reset the counters */
848void pkgAcquireStatus::Start()
849{
850 gettimeofday(&Time,0);
851 gettimeofday(&StartTime,0);
852 LastBytes = 0;
853 CurrentCPS = 0;
854 CurrentBytes = 0;
855 TotalBytes = 0;
856 FetchedBytes = 0;
857 ElapsedTime = 0;
d568ed2d
AL
858 TotalItems = 0;
859 CurrentItems = 0;
b98f2859
AL
860}
861 /*}}}*/
a6568219 862// AcquireStatus::Stop - Finished downloading /*{{{*/
b98f2859
AL
863// ---------------------------------------------------------------------
864/* This accurately computes the elapsed time and the total overall CPS. */
865void pkgAcquireStatus::Stop()
866{
867 // Compute the CPS and elapsed time
868 struct timeval NewTime;
869 gettimeofday(&NewTime,0);
870
31a0531d
AL
871 double Delta = NewTime.tv_sec - StartTime.tv_sec +
872 (NewTime.tv_usec - StartTime.tv_usec)/1000000.0;
b98f2859 873
b98f2859 874 // Compute the CPS value
31a0531d 875 if (Delta < 0.01)
e331f6ed
AL
876 CurrentCPS = 0;
877 else
31a0531d 878 CurrentCPS = FetchedBytes/Delta;
b98f2859 879 LastBytes = CurrentBytes;
31a0531d 880 ElapsedTime = (unsigned int)Delta;
b98f2859
AL
881}
882 /*}}}*/
883// AcquireStatus::Fetched - Called when a byte set has been fetched /*{{{*/
884// ---------------------------------------------------------------------
885/* This is used to get accurate final transfer rate reporting. */
886void pkgAcquireStatus::Fetched(unsigned long Size,unsigned long Resume)
93274b8d 887{
b98f2859
AL
888 FetchedBytes += Size - Resume;
889}
890 /*}}}*/