// -*- mode: cpp; mode: fold -*-
// Description /*{{{*/
-// $Id: acquire-item.cc,v 1.2 1998/10/20 02:39:12 jgg Exp $
+// $Id: acquire-item.cc,v 1.3 1998/10/22 04:56:38 jgg Exp $
/* ######################################################################
Acquire Item - Item to acquire
#include <apt-pkg/acquire-item.h>
#include <apt-pkg/configuration.h>
#include <strutl.h>
+
+#include <sys/stat.h>
+#include <unistd.h>
/*}}}*/
// Acquire::Item::Item - Constructor /*{{{*/
pkgAcqIndex::pkgAcqIndex(pkgAcquire *Owner,const pkgSourceList::Item *Location) :
Item(Owner), Location(Location)
{
- QueueURI(Location->PackagesURI() + ".gz");
- Description = Location->PackagesInfo();
+ // The download goes to the "partial/" directory below Dir::State::lists,
+ // named after the Packages URI (without the .gz suffix)
+ DestFile = _config->FindDir("Dir::State::lists") + "partial/";
+ DestFile += URItoFileName(Location->PackagesURI());
+
+ // The URI actually queued is the compressed (.gz) Packages file
+ QueueURI(Location->PackagesURI() + ".gz",Location->PackagesInfo());
+ // Create the Release fetch class
new pkgAcqIndexRel(Owner,Location);
}
/*}}}*/
-// pkgAcqIndex::ToFile - File to write the download to /*{{{*/
+// AcqIndex::Custom600Headers - Insert custom request headers /*{{{*/
// ---------------------------------------------------------------------
-/* */
-string pkgAcqIndex::ToFile()
+/* The only header we use is the last-modified header. */
+string pkgAcqIndex::Custom600Headers()
{
- string PartialDir = _config->FindFile("Dir::State::lists") + "/partial/";
+ // File of the same name directly in the lists directory (not partial/)
+ string Final = _config->FindDir("Dir::State::lists");
+ Final += URItoFileName(Location->PackagesURI());
+
+ struct stat Buf;
+ // Nothing to stat there: contribute no extra header
+ if (stat(Final.c_str(),&Buf) != 0)
+ return string();
- return PartialDir + URItoFileName(Location->PackagesURI());
+ // The leading newline is needed because the worker appends this text
+ // directly after the Filename line of the 600 message
+ return "\nLast-Modified: " + TimeRFC1123(Buf.st_mtime);
}
/*}}}*/
-
// AcqIndexRel::pkgAcqIndexRel - Constructor /*{{{*/
// ---------------------------------------------------------------------
/* The Release file is added to the queue */
const pkgSourceList::Item *Location) :
Item(Owner), Location(Location)
{
- QueueURI(Location->ReleaseURI());
- Description = Location->ReleaseInfo();
+ DestFile = _config->FindDir("Dir::State::lists") + "partial/";
+ DestFile += URItoFileName(Location->ReleaseURI());
+
+ QueueURI(Location->ReleaseURI(),Location->ReleaseInfo());
}
/*}}}*/
-// AcqIndexRel::ToFile - File to write the download to /*{{{*/
+// AcqIndexRel::Custom600Headers - Insert custom request headers /*{{{*/
// ---------------------------------------------------------------------
-/* */
-string pkgAcqIndexRel::ToFile()
+/* The only header we use is the last-modified header. */
+string pkgAcqIndexRel::Custom600Headers()
{
- string PartialDir = _config->FindFile("Dir::State::lists") + "/partial/";
+ // File of the same name directly in the lists directory (not partial/)
+ string Final = _config->FindDir("Dir::State::lists");
+ Final += URItoFileName(Location->ReleaseURI());
+
+ struct stat Buf;
+ // Nothing to stat there: contribute no extra header
+ if (stat(Final.c_str(),&Buf) != 0)
+ return string();
- return PartialDir + URItoFileName(Location->ReleaseURI());
+ // The leading newline is needed because the worker appends this text
+ // directly after the Filename line of the 600 message
+ return "\nLast-Modified: " + TimeRFC1123(Buf.st_mtime);
}
/*}}}*/
// -*- mode: cpp; mode: fold -*-
// Description /*{{{*/
-// $Id: acquire-item.h,v 1.1 1998/10/15 06:59:59 jgg Exp $
+// $Id: acquire-item.h,v 1.2 1998/10/22 04:56:39 jgg Exp $
/* ######################################################################
Acquire Item - Item to acquire
protected:
pkgAcquire *Owner;
- inline void QueueURI(string URI) {Owner->Enqueue(this,URI);};
+ inline void QueueURI(string URI,string Description)
+ {Owner->Enqueue(this,URI,Description);};
public:
+ // Number of queues we are inserted into
unsigned int QueueCounter;
- string Description;
- virtual string ToFile() = 0;
- virtual void Failed() {};
+ // File to write the fetch into
+ string DestFile;
+ virtual void Failed() {};
+ virtual string Custom600Headers() {return string();};
+
Item(pkgAcquire *Owner);
virtual ~Item();
};
public:
- virtual string ToFile();
+ virtual string Custom600Headers();
pkgAcqIndex(pkgAcquire *Owner,const pkgSourceList::Item *Location);
};
public:
- virtual string ToFile();
-
+ virtual string Custom600Headers();
+
pkgAcqIndexRel(pkgAcquire *Owner,const pkgSourceList::Item *Location);
};
-
#endif
// -*- mode: cpp; mode: fold -*-
// Description /*{{{*/
-// $Id: acquire-worker.cc,v 1.3 1998/10/20 04:33:12 jgg Exp $
+// $Id: acquire-worker.cc,v 1.4 1998/10/22 04:56:40 jgg Exp $
/* ######################################################################
Acquire Worker
#pragma implementation "apt-pkg/acquire-worker.h"
#endif
#include <apt-pkg/acquire-worker.h>
+#include <apt-pkg/acquire-item.h>
#include <apt-pkg/configuration.h>
#include <apt-pkg/error.h>
#include <apt-pkg/fileutl.h>
// Worker::Worker - Constructor for Queue startup /*{{{*/
// ---------------------------------------------------------------------
/* */
-pkgAcquire::Worker::Worker(Queue *Q,string Acc)
+pkgAcquire::Worker::Worker(Queue *Q,MethodConfig *Cnf)
{
OwnerQ = Q;
- Config = 0;
- Access = Acc;
+ // Cnf is dereferenced below for the access name, so it must not be null
+ Config = Cnf;
+ Access = Cnf->Access;
+ CurrentItem = 0;
+ // Remaining members are reset by the shared helper
Construct();
}
OwnerQ = 0;
Config = Cnf;
Access = Cnf->Access;
-
+ CurrentItem = 0;
+
Construct();
}
/*}}}*/
/* */
void pkgAcquire::Worker::Construct()
{
- Next = 0;
+ // The worker starts out unlinked from both worker lists
+ NextQueue = 0;
+ NextAcquire = 0;
+ // -1 marks "no child process" and "no pipe descriptor"
Process = -1;
InFd = -1;
OutFd = -1;
+ // Neither descriptor is offered to the select loop yet
+ OutReady = false;
+ InReady = false;
Debug = _config->FindB("Debug::pkgAcquire::Worker",false);
}
/*}}}*/
close(OutFd);
if (Process > 0)
+ {
kill(Process,SIGINT);
+ if (waitpid(Process,0,0) != Process)
+ _error->Warning("I waited but nothing was there!");
+ }
}
/*}}}*/
// Worker::Start - Start the worker process /*{{{*/
SetNonBlock(Pipes[3],true);
close(Pipes[1]);
close(Pipes[2]);
+ OutReady = false;
+ InReady = true;
// Read the configuration data
if (WaitFd(InFd) == false ||
return _error->Error("Method %s did not start correctly",Method.c_str());
RunMessages();
+ SendConfiguration();
return true;
}
/*}}}*/
// Worker::ReadMessages - Read all pending messages into the list /*{{{*/
// ---------------------------------------------------------------------
-/* This pulls full messages from the input FD into the message buffer.
- It assumes that messages will not pause during transit so no
- fancy buffering is used. */
+/* */
bool pkgAcquire::Worker::ReadMessages()
{
- char Buffer[4000];
- char *End = Buffer;
-
- while (1)
- {
- int Res = read(InFd,End,sizeof(Buffer) - (End-Buffer));
-
- // Process is dead, this is kind of bad..
- if (Res == 0)
- {
- if (waitpid(Process,0,0) != Process)
- _error->Warning("I waited but nothing was there!");
- Process = -1;
- close(InFd);
- close(OutFd);
- InFd = -1;
- OutFd = -1;
- return false;
- }
-
- // No data
- if (Res == -1)
- return true;
-
- End += Res;
-
- // Look for the end of the message
- for (char *I = Buffer; I < End; I++)
- {
- if (I[0] != '\n' || I[1] != '\n')
- continue;
-
- // Pull the message out
- string Message(Buffer,0,I-Buffer);
-
- // Fix up the buffer
- for (; I < End && *I == '\n'; I++);
- End -= I-Buffer;
- memmove(Buffer,I,End-Buffer);
- I = Buffer;
-
- if (Debug == true)
- clog << "Message " << Access << ':' << QuoteString(Message,"\n") << endl;
-
- MessageQueue.push_back(Message);
- }
- if (End == Buffer)
- return true;
-
- if (WaitFd(InFd) == false)
- return false;
- }
-
+ // The parsing is done by the free function ::ReadMessages, which appends
+ // to MessageQueue; when it reports failure the method is torn down and
+ // MethodFailure's result (false) is returned
+ if (::ReadMessages(InFd,MessageQueue) == false)
+ return MethodFailure();
return true;
}
/*}}}*/
{
string Message = MessageQueue.front();
MessageQueue.erase(MessageQueue.begin());
+
+ if (Debug == true)
+ clog << " <- " << Access << ':' << QuoteString(Message,"\n") << endl;
// Fetch the message number
char *End;
// Determine the message number and dispatch
switch (Number)
{
+ // 100 Capabilities
case 100:
if (Capabilities(Message) == false)
return _error->Error("Unable to process Capabilities message from %s",Access.c_str());
break;
+
+ // 101 Log
+ case 101:
+ if (Debug == true)
+ clog << " <- (log) " << LookupTag(Message,"Message") << endl;
+ break;
+
+ // 102 Status
+ case 102:
+ Status = LookupTag(Message,"Message");
+ break;
+
+ // 200 URI Start
+ case 200:
+ break;
+
+ // 201 URI Done
+ case 201:
+ break;
+
+ // 400 URI Failure
+ case 400:
+ break;
+
+ // 401 General Failure
+ case 401:
+ _error->Error("Method %s General failure: %s",LookupTag(Message,"Message").c_str());
+ break;
}
}
return true;
Config->Version = LookupTag(Message,"Version");
Config->SingleInstance = StringToBool(LookupTag(Message,"Single-Instance"),false);
Config->PreScan = StringToBool(LookupTag(Message,"Pre-Scan"),false);
+ Config->Pipeline = StringToBool(LookupTag(Message,"Pipeline"),false);
+ Config->SendConfig = StringToBool(LookupTag(Message,"Send-Config"),false);
// Some debug text
if (Debug == true)
{
clog << "Configured access method " << Config->Access << endl;
- clog << "Version: " << Config->Version << " SingleInstance: " <<
- Config->SingleInstance << " PreScan: " << Config->PreScan << endl;
+ clog << "Version:" << Config->Version << " SingleInstance:" <<
+ Config->SingleInstance << " PreScan: " << Config->PreScan <<
+ " Pipeline:" << Config->Pipeline << " SendConfig:" <<
+ Config->SendConfig << endl;
}
return true;
}
/*}}}*/
+// Worker::SendConfiguration - Send the config to the method /*{{{*/
+// ---------------------------------------------------------------------
+/* */
+bool pkgAcquire::Worker::SendConfiguration()
+{
+ // Methods that did not ask for the configuration get nothing
+ if (Config->SendConfig == false)
+ return true;
+
+ // No pipe to the method, nothing can be queued
+ if (OutFd == -1)
+ return false;
+
+ string Message = "601 Configuration\n";
+ Message.reserve(2000);
+
+ /* Write out all of the configuration directives by walking the
+ configuration tree */
+ const Configuration::Item *Top = _config->Tree(0);
+ for (; Top != 0;)
+ {
+ // Only items that carry a value produce a Config-Item line
+ if (Top->Value.empty() == false)
+ {
+ string Line = "Config-Item: " + Top->FullTag() + "=";
+ Line += QuoteString(Top->Value,"\n") + '\n';
+ Message += Line;
+ }
+
+ // Descend first
+ if (Top->Child != 0)
+ {
+ Top = Top->Child;
+ continue;
+ }
+
+ // No child: climb until a node with a sibling is found, then step to
+ // that sibling; running off the top ends the walk.
+ // NOTE(review): this relies on the topmost item's Parent being null -
+ // confirm that Parent is initialised for the root item.
+ while (Top != 0 && Top->Next == 0)
+ Top = Top->Parent;
+ if (Top != 0)
+ Top = Top->Next;
+ }
+ // Blank line terminates the message
+ Message += '\n';
+
+ if (Debug == true)
+ clog << " -> " << Access << ':' << QuoteString(Message,"\n") << endl;
+ // Only queued here; the bytes are written by OutFdReady
+ OutQueue += Message;
+ OutReady = true;
+
+ return true;
+}
+ /*}}}*/
+// Worker::QueueItem - Add an item to the outbound queue /*{{{*/
+// ---------------------------------------------------------------------
+/* Send a URI Acquire message to the method */
+bool pkgAcquire::Worker::QueueItem(pkgAcquire::Queue::QItem *Item)
+{
+   /* Builds a "600 URI Acquire" request for Item and appends it to the
+      outbound buffer. Returns false when there is no pipe to the method,
+      true once the request has been queued. */
+   if (OutFd != -1)
+   {
+      string Request = "600 URI Acquire\n";
+      Request.reserve(300);
+      Request += "URI: ";
+      Request += Item->URI;
+      Request += "\nFilename: ";
+      Request += Item->Owner->DestFile;
+      // Item specific headers; each one supplies its own leading newline
+      Request += Item->Owner->Custom600Headers();
+      Request += "\n\n";
+
+      if (Debug == true)
+         clog << " -> " << Access << ':' << QuoteString(Request,"\n") << endl;
+
+      // Mark the out pipe as having data waiting to be written
+      OutQueue += Request;
+      OutReady = true;
+      return true;
+   }
+
+   return false;
+}
+ /*}}}*/
+// Worker::OutFdReady - Outbound FD is ready /*{{{*/
+// ---------------------------------------------------------------------
+/* */
+bool pkgAcquire::Worker::OutFdReady()
+{
+   /* Writes as much of the pending outbound data as the pipe accepts.
+      Returns MethodFailure()'s result (false) when nothing could be
+      written, true otherwise. c_str() is passed because a string
+      iterator is not guaranteed to be a plain character pointer. */
+   int Res = write(OutFd,OutQueue.c_str(),OutQueue.length());
+
+   // Zero bytes written or a write error: treat the method as dead
+   if (Res <= 0)
+      return MethodFailure();
+
+   // Drop what was sent and stop asking for writability once drained
+   OutQueue.erase(0,Res);
+   if (OutQueue.empty() == true)
+      OutReady = false;
+
+   return true;
+}
+ /*}}}*/
+// Worker::InFdReady - Inbound FD is ready /*{{{*/
+// ---------------------------------------------------------------------
+/* */
+bool pkgAcquire::Worker::InFdReady()
+{
+   // Collect whatever the method sent; on success dispatch the messages
+   if (ReadMessages() == true)
+   {
+      RunMessages();
+      return true;
+   }
+
+   return false;
+}
+ /*}}}*/
+// Worker::MethodFailure - Called when the method fails /*{{{*/
+// ---------------------------------------------------------------------
+/* This is called when the method is believed to have failed, for instance
+ because reading from or writing to its pipe did not succeed. */
+bool pkgAcquire::Worker::MethodFailure()
+{
+   /* Reports the failure, reaps the child, closes both pipes and discards
+      all buffered data. Always returns false so that callers can simply
+      write "return MethodFailure();". */
+   cerr << "Method " << Access << " has died unexpectedly!" << endl;
+
+   /* Only wait for a child that was really started. With Process == -1
+      waitpid would wait for any child of this process instead. */
+   if (Process > 0 && waitpid(Process,0,0) != Process)
+      _error->Warning("I waited but nothing was there!");
+   Process = -1;
+
+   close(InFd);
+   close(OutFd);
+   InFd = -1;
+   OutFd = -1;
+
+   // Neither descriptor may be offered to the select loop any more
+   OutReady = false;
+   InReady = false;
+
+   // Throw away everything that was buffered in either direction
+   OutQueue = string();
+   MessageQueue.erase(MessageQueue.begin(),MessageQueue.end());
+
+   return false;
+}
+ /*}}}*/
// -*- mode: cpp; mode: fold -*-
// Description /*{{{*/
-// $Id: acquire-worker.h,v 1.2 1998/10/20 02:39:14 jgg Exp $
+// $Id: acquire-worker.h,v 1.3 1998/10/22 04:56:42 jgg Exp $
/* ######################################################################
Acquire Worker - Worker process manager
// Interfacing to the method process
class pkgAcquire::Worker
{
+ friend pkgAcquire;
+
protected:
friend Queue;
- Worker *Next;
+ /* Linked list starting at a Queue and a linked list starting
+ at Acquire */
+ Worker *NextQueue;
+ Worker *NextAcquire;
// The access association
Queue *OwnerQ;
pid_t Process;
int InFd;
int OutFd;
+ bool InReady;
+ bool OutReady;
// Various internal things
bool Debug;
vector<string> MessageQueue;
-
+ string OutQueue;
+
// Private constructor helper
void Construct();
// Message handling things
bool ReadMessages();
bool RunMessages();
+ bool InFdReady();
+ bool OutFdReady();
// The message handlers
bool Capabilities(string Message);
+ bool SendConfiguration();
+
+ bool MethodFailure();
public:
+ pkgAcquire::Queue::QItem *CurrentItem;
+
+ string Status;
+
// Load the method and do the startup
+ bool QueueItem(pkgAcquire::Queue::QItem *Item);
bool Start();
- Worker(Queue *OwnerQ,string Access);
+ Worker(Queue *OwnerQ,MethodConfig *Config);
Worker(MethodConfig *Config);
~Worker();
};
// -*- mode: cpp; mode: fold -*-
// Description /*{{{*/
-// $Id: acquire.cc,v 1.2 1998/10/20 02:39:15 jgg Exp $
+// $Id: acquire.cc,v 1.3 1998/10/22 04:56:43 jgg Exp $
/* ######################################################################
Acquire - File Acquiration
+ The core element for the scheduling system is the concept of a named
+ queue. Each queue is unique and each queue has a name derived from the
+ URI. The degree of parallelization can be controlled by how the queue
+ name is derived from the URI.
+
##################################################################### */
/*}}}*/
// Include Files /*{{{*/
#include <apt-pkg/acquire.h>
#include <apt-pkg/acquire-item.h>
#include <apt-pkg/acquire-worker.h>
+#include <apt-pkg/configuration.h>
+#include <apt-pkg/error.h>
#include <strutl.h>
/*}}}*/
{
Queues = 0;
Configs = 0;
+ Workers = 0;
+ ToFetch = 0;
+
+ string Mode = _config->Find("Acquire::Queue-Mode","host");
+ if (strcasecmp(Mode.c_str(),"host") == 0)
+ QueueMode = QueueHost;
+ if (strcasecmp(Mode.c_str(),"access") == 0)
+ QueueMode = QueueAccess;
+
+ Debug = _config->FindB("Debug::pkgAcquire",false);
}
/*}}}*/
// Acquire::~pkgAcquire - Destructor /*{{{*/
Configs = Configs->Next;
delete Jnk;
}
+
+ while (Queues != 0)
+ {
+ Queue *Jnk = Queues;
+ Queues = Queues->Next;
+ delete Jnk;
+ }
}
/*}}}*/
// Acquire::Add - Add a new item /*{{{*/
}
}
/*}}}*/
+// Acquire::Add - Add a worker /*{{{*/
+// ---------------------------------------------------------------------
+/* */
+void pkgAcquire::Add(Worker *Work)
+{
+   // Push the worker onto the front of the acquire-wide worker list
+   Worker *OldHead = Workers;
+   Workers = Work;
+   Work->NextAcquire = OldHead;
+}
+ /*}}}*/
+// Acquire::Remove - Remove a worker /*{{{*/
+// ---------------------------------------------------------------------
+/* */
+void pkgAcquire::Remove(Worker *Work)
+{
+   // Link always points at the pointer that refers to the current node
+   Worker **Link = &Workers;
+   while (*Link != 0)
+   {
+      if (*Link != Work)
+      {
+         Link = &(*Link)->NextAcquire;
+         continue;
+      }
+
+      // Unlink it; Link stays put so the successor is examined next
+      *Link = Work->NextAcquire;
+   }
+}
+ /*}}}*/
// Acquire::Enqueue - Queue an URI for fetching /*{{{*/
// ---------------------------------------------------------------------
/* */
-void pkgAcquire::Enqueue(Item *Item,string URI)
+void pkgAcquire::Enqueue(Item *Itm,string URI,string Description)
{
- cout << "Fetching " << URI << endl;
- cout << " to " << Item->ToFile() << endl;
- cout << " Queue is: " << QueueName(URI) << endl;
+ // Determine which queue to put the item in
+ // (an empty name means no queue could be chosen; the item is then
+ // silently not queued)
+ string Name = QueueName(URI);
+ if (Name.empty() == true)
+ return;
+
+ // Find the queue structure
+ Queue *I = Queues;
+ for (; I != 0 && I->Name != Name; I = I->Next);
+ if (I == 0)
+ {
+ // First use of this name: create the queue and put it at the list head
+ I = new Queue(Name,this);
+ I->Next = Queues;
+ Queues = I;
+ }
+
+ // Queue it into the named queue
+ I->Enqueue(Itm,URI,Description);
+ // One more outstanding fetch for Run() to wait on
+ ToFetch++;
+
+ // Some trace stuff
+ if (Debug == true)
+ {
+ clog << "Fetching " << URI << endl;
+ clog << " to " << Itm->DestFile << endl;
+ clog << " Queue is: " << QueueName(URI) << endl;
+ }
}
/*}}}*/
-// Acquire::QueueName - Return the name of the queue for this URI /*{{{*/
+// Acquire::Dequeue - Remove an item from all queues /*{{{*/
// ---------------------------------------------------------------------
/* */
+void pkgAcquire::Dequeue(Item *Itm)
+{
+   // Every queue gets the chance to drop its entries for this item
+   Queue *Cur = Queues;
+   while (Cur != 0)
+   {
+      Cur->Dequeue(Itm);
+      Cur = Cur->Next;
+   }
+
+   // The outstanding counter drops by one per call
+   ToFetch--;
+}
+ /*}}}*/
+// Acquire::QueueName - Return the name of the queue for this URI /*{{{*/
+// ---------------------------------------------------------------------
+/* The string returned depends on the configuration settings and the
+ method parameters. Given something like http://foo.org/bar it can
+ return http://foo.org or http */
string pkgAcquire::QueueName(string URI)
{
const MethodConfig *Config = GetConfig(URIAccess(URI));
- return string();
+ // No configuration for this access type: report "no queue" as an empty name
+ if (Config == 0)
+ return string();
+
+ /* Single-Instance methods get exactly one queue per URI. This is
+ also used for the Access queue method */
+ if (Config->SingleInstance == true || QueueMode == QueueAccess)
+ return URIAccess(URI);
+
+ // Host based queue
+ // Three scans: up to the first ':', over the following run of ':' and
+ // '/' characters, then up to the next '/'. What lies before that point
+ // is the queue name.
+ string::iterator I = URI.begin();
+ for (; I < URI.end() && *I != ':'; I++);
+ for (; I < URI.end() && (*I == '/' || *I == ':'); I++);
+ for (; I < URI.end() && *I != '/'; I++);
+
+ return string(URI,0,I - URI.begin());
}
/*}}}*/
// Acquire::GetConfig - Fetch the configuration information /*{{{*/
/* This locates the configuration structure for an access method. If
a config structure cannot be found a Worker will be created to
retrieve it */
-const pkgAcquire::MethodConfig *pkgAcquire::GetConfig(string Access)
+pkgAcquire::MethodConfig *pkgAcquire::GetConfig(string Access)
{
// Search for an existing config
MethodConfig *Conf;
return Conf;
}
/*}}}*/
+// Acquire::SetFds - Deal with readable FDs /*{{{*/
+// ---------------------------------------------------------------------
+/* Collect FDs that have activity monitors into the fd sets */
+void pkgAcquire::SetFds(int &Fd,fd_set *RSet,fd_set *WSet)
+{
+   /* Adds the descriptors of all workers that want attention to the
+      given sets and raises Fd to the largest descriptor added. */
+   Worker *Cur = Workers;
+   while (Cur != 0)
+   {
+      // Inbound pipe, watched for readability
+      if (Cur->InFd >= 0 && Cur->InReady == true)
+      {
+         FD_SET(Cur->InFd,RSet);
+         if (Cur->InFd > Fd)
+            Fd = Cur->InFd;
+      }
+
+      // Outbound pipe, watched for writability
+      if (Cur->OutFd >= 0 && Cur->OutReady == true)
+      {
+         FD_SET(Cur->OutFd,WSet);
+         if (Cur->OutFd > Fd)
+            Fd = Cur->OutFd;
+      }
+
+      Cur = Cur->NextAcquire;
+   }
+}
+ /*}}}*/
+// Acquire::RunFds - Deal with active FDs /*{{{*/
+// ---------------------------------------------------------------------
+/* Dispatch active FDs over to the proper workers */
+void pkgAcquire::RunFds(fd_set *RSet,fd_set *WSet)
+{
+   Worker *Cur = Workers;
+   while (Cur != 0)
+   {
+      // Inbound side first
+      if (Cur->InFd >= 0)
+      {
+         if (FD_ISSET(Cur->InFd,RSet) != 0)
+            Cur->InFdReady();
+      }
+
+      /* OutFd is looked at only after the inbound handler has run, so a
+         descriptor reset by that handler is not used. */
+      if (Cur->OutFd >= 0)
+      {
+         if (FD_ISSET(Cur->OutFd,WSet) != 0)
+            Cur->OutFdReady();
+      }
+
+      Cur = Cur->NextAcquire;
+   }
+}
+ /*}}}*/
+// Acquire::Run - Run the fetch sequence /*{{{*/
+// ---------------------------------------------------------------------
+/* This runs the queues. It manages a select loop for all of the
+ Worker tasks. The workers interact with the queues and items to
+ manage the actual fetch. */
+bool pkgAcquire::Run()
+{
+ // Start every queue; the result of Startup() is not checked here
+ for (Queue *I = Queues; I != 0; I = I->Next)
+ I->Startup();
+
+ // Run till all things have been acquired
+ // NOTE(review): within the visible code ToFetch is only lowered by
+ // Dequeue(); confirm something calls it, or this loop cannot finish.
+ while (ToFetch > 0)
+ {
+ // Rebuild both descriptor sets from scratch on every pass
+ fd_set RFds;
+ fd_set WFds;
+ int Highest = 0;
+ FD_ZERO(&RFds);
+ FD_ZERO(&WFds);
+ SetFds(Highest,&RFds,&WFds);
+
+ // No timeout is given, so this blocks until some descriptor is ready.
+ // Any result <= 0 is reported as an error.
+ // NOTE(review): an interrupted select (EINTR) would presumably also
+ // end the run here - confirm that is intended.
+ if (select(Highest+1,&RFds,&WFds,0,0) <= 0)
+ return _error->Errno("select","Select has failed");
+
+ RunFds(&RFds,&WFds);
+ }
+
+ // Everything fetched: stop the workers of every queue
+ for (Queue *I = Queues; I != 0; I = I->Next)
+ I->Shutdown();
+
+ return true;
+}
+ /*}}}*/
// Acquire::MethodConfig::MethodConfig - Constructor /*{{{*/
// ---------------------------------------------------------------------
{
SingleInstance = false;
PreScan = false;
+ Pipeline = false;
+ SendConfig = false;
+ Next = 0;
+}
+ /*}}}*/
+
+// Queue::Queue - Constructor /*{{{*/
+// ---------------------------------------------------------------------
+/* */
+pkgAcquire::Queue::Queue(string Name,pkgAcquire *Owner) : Name(Name),
+            Owner(Owner)
+{
+   // A new queue has no successor, no workers and no queued items
+   Next = 0;
+   Workers = 0;
+   Items = 0;
+}
+ /*}}}*/
+// Queue::~Queue - Destructor /*{{{*/
+// ---------------------------------------------------------------------
+/* */
+pkgAcquire::Queue::~Queue()
+{
+   // Workers go first, then the queued entries are released one by one
+   Shutdown();
+
+   for (QItem *Cur = Items; Cur != 0;)
+   {
+      QItem *Following = Cur->Next;
+      delete Cur;
+      Cur = Following;
+   }
+   Items = 0;
+}
+ /*}}}*/
+// Queue::Enqueue - Queue an item to the queue /*{{{*/
+// ---------------------------------------------------------------------
+/* */
+void pkgAcquire::Queue::Enqueue(Item *Owner,string URI,string Description)
+{
+   // Build the entry completely before linking it in
+   QItem *Fresh = new QItem;
+   Fresh->Owner = Owner;
+   Fresh->URI = URI;
+   Fresh->Description = Description;
+
+   // New entries go to the head of the list
+   Fresh->Next = Items;
+   Items = Fresh;
+
+   // The item keeps count of the queue entries that refer to it
+   Owner->QueueCounter++;
+}
+ /*}}}*/
+// Queue::Dequeue - Remove an item from the queue /*{{{*/
+// ---------------------------------------------------------------------
+/* */
+void pkgAcquire::Queue::Dequeue(Item *Owner)
+{
+   // Link always points at the pointer that refers to the current entry
+   QItem **Link = &Items;
+   while (*Link != 0)
+   {
+      QItem *Cur = *Link;
+      if (Cur->Owner != Owner)
+      {
+         Link = &Cur->Next;
+         continue;
+      }
+
+      // Entry belongs to Owner: unlink, account for it and free it
+      *Link = Cur->Next;
+      Owner->QueueCounter--;
+      delete Cur;
+   }
+}
+ /*}}}*/
+// Queue::Startup - Start the worker processes /*{{{*/
+// ---------------------------------------------------------------------
+/* */
+bool pkgAcquire::Queue::Startup()
+{
+   /* Replaces any existing workers with a single fresh one and hands it
+      the entry at the head of the queue. Returns false when the method
+      configuration is unavailable, the worker does not start or the
+      entry cannot be queued. */
+   Shutdown();
+
+   pkgAcquire::MethodConfig *Cnf = Owner->GetConfig(URIAccess(Name));
+   if (Cnf == 0)
+      return false;
+
+   /* The worker is linked into the acquire-wide list before it is
+      started, so Shutdown() cleans it up even when Start() fails. */
+   Workers = new Worker(this,Cnf);
+   Owner->Add(Workers);
+   if (Workers->Start() == false)
+      return false;
+
+   // QueueItem dereferences its argument, so an empty queue sends nothing
+   if (Items == 0)
+      return true;
+
+   return Workers->QueueItem(Items);
+}
+ /*}}}*/
+// Queue::Shutdown - Shutdown the worker processes /*{{{*/
+// ---------------------------------------------------------------------
+/* */
+bool pkgAcquire::Queue::Shutdown()
+{
+   /* Pops workers off the queue's list one at a time. Each one is taken
+      out of the acquire-wide list before it is destroyed. */
+   for (pkgAcquire::Worker *Cur = Workers; Cur != 0; Cur = Workers)
+   {
+      Workers = Cur->NextQueue;
+      Owner->Remove(Cur);
+      delete Cur;
+   }
+
+   return true;
}
/*}}}*/
// -*- mode: cpp; mode: fold -*-
// Description /*{{{*/
-// $Id: acquire.h,v 1.2 1998/10/20 02:39:16 jgg Exp $
+// $Id: acquire.h,v 1.3 1998/10/22 04:56:44 jgg Exp $
/* ######################################################################
Acquire - File Acquiration
#pragma interface "apt-pkg/acquire.h"
#endif
+#include <unistd.h>
+
class pkgAcquire
{
public:
class Worker;
struct MethodConfig;
friend Item;
+ friend Queue;
protected:
+ // List of items to fetch
vector<Item *> Items;
+
+ // List of active queues and fetched method configuration parameters
Queue *Queues;
+ Worker *Workers;
MethodConfig *Configs;
+ unsigned long ToFetch;
+
+ // Configurable parameters for the scheduler
+ enum {QueueHost,QueueAccess} QueueMode;
+ bool Debug;
void Add(Item *Item);
void Remove(Item *Item);
- void Enqueue(Item *Item,string URI);
+ void Add(Worker *Work);
+ void Remove(Worker *Work);
+
+ void Enqueue(Item *Item,string URI,string Description);
+ void Dequeue(Item *Item);
+ string QueueName(string URI);
+
+ // FDSET managers for derived classes
+ void SetFds(int &Fd,fd_set *RSet,fd_set *WSet);
+ void RunFds(fd_set *RSet,fd_set *WSet);
public:
- const MethodConfig *GetConfig(string Access);
- string QueueName(string URI);
+ MethodConfig *GetConfig(string Access);
+ bool Run();
pkgAcquire();
~pkgAcquire();
Queue *Next;
protected:
-
- string URIMatch;
- vector<Item *> Items;
+ // Queued item
+ struct QItem
+ {
+ QItem *Next;
+
+ string URI;
+ string Description;
+ Item *Owner;
+ };
+
+ // Name of the queue
+ string Name;
+
+ // Items queued into this queue
+ QItem *Items;
+ pkgAcquire::Worker *Workers;
+ pkgAcquire *Owner;
public:
+
+ // Put an item into this queue
+ void Enqueue(Item *Owner,string URI,string Description);
+ void Dequeue(Item *Owner);
+
+ bool Startup();
+ bool Shutdown();
+
+ Queue(string Name,pkgAcquire *Owner);
+ ~Queue();
};
// Configuration information from each method
string Version;
bool SingleInstance;
bool PreScan;
+ bool Pipeline;
+ bool SendConfig;
MethodConfig();
};
// -*- mode: cpp; mode: fold -*-
// Description /*{{{*/
-// $Id: configuration.cc,v 1.7 1998/10/20 02:39:26 jgg Exp $
+// $Id: configuration.cc,v 1.8 1998/10/22 04:56:45 jgg Exp $
/* ######################################################################
Configuration Class
Root = new Item;
}
/*}}}*/
-// Configuration::Lookup - Lookup a single item /*{{{*/
+// Configuration::Lookup - Lookup a single item /*{{{*/
// ---------------------------------------------------------------------
/* This will lookup a single item by name below another item. It is a
helper function for the main lookup function */
new items */
Configuration::Item *Configuration::Lookup(const char *Name,bool Create)
{
+ if (Name == 0)
+ return Root->Child;
+
const char *Start = Name;
const char *End = Start + strlen(Name);
const char *TagEnd = Name;
}
/*}}}*/
+// Configuration::Item::FullTag - Return the fully scoped tag /*{{{*/
+// ---------------------------------------------------------------------
+/* */
+string Configuration::Item::FullTag() const
+{
+   /* An item without a parent, or whose parent has no parent of its own,
+      is named by its bare tag. Anything deeper is prefixed with the
+      parent's full tag and "::". */
+   const bool Unscoped = (Parent == 0 || Parent->Parent == 0);
+   if (Unscoped == false)
+      return Parent->FullTag() + "::" + Tag;
+
+   return Tag;
+}
+ /*}}}*/
+
// ReadConfigFile - Read a configuration file /*{{{*/
// ---------------------------------------------------------------------
/* The configuration format is very much like the named.conf format
// -*- mode: cpp; mode: fold -*-
// Description /*{{{*/
-// $Id: configuration.h,v 1.5 1998/10/20 02:39:27 jgg Exp $
+// $Id: configuration.h,v 1.6 1998/10/22 04:56:46 jgg Exp $
/* ######################################################################
Configuration Class
Item *Parent;
Item *Child;
Item *Next;
+
+ string FullTag() const;
+
Item() : Child(0), Next(0) {};
};
Item *Root;
inline bool Exists(string Name) {return Exists(Name.c_str());};
bool Exists(const char *Name);
+ inline const Item *Tree(const char *Name) {return Lookup(Name,false);};
+
Configuration();
};
// -*- mode: cpp; mode: fold -*-
// Description /*{{{*/
-// $Id: fileutl.cc,v 1.10 1998/10/20 04:33:16 jgg Exp $
+// $Id: fileutl.cc,v 1.11 1998/10/22 04:56:47 jgg Exp $
/* ######################################################################
File Utilities
/* */
void SetNonBlock(int Fd,bool Block)
{
- int Flags = fcntl(Fd,F_GETFL);
- if (fcntl(Fd,F_SETFL,(Flags & ~O_NONBLOCK) | (Block == false)?0:O_NONBLOCK) != 0)
+ int Flags = fcntl(Fd,F_GETFL) & (~O_NONBLOCK);
+ if (fcntl(Fd,F_SETFL,Flags | ((Block == false)?0:O_NONBLOCK)) != 0)
{
cerr << "FATAL -> Could not set non-blocking flag " << strerror(errno) << endl;
exit(100);
fd_set Set;
FD_ZERO(&Set);
FD_SET(Fd,&Set);
+
if (select(Fd+1,&Set,0,0,0) <= 0)
return false;
+
return true;
}
/*}}}*/
// -*- mode: cpp; mode: fold -*-
// Description /*{{{*/
-// $Id: strutl.cc,v 1.5 1998/10/20 02:39:30 jgg Exp $
+// $Id: strutl.cc,v 1.6 1998/10/22 04:56:48 jgg Exp $
/* ######################################################################
String Util - Some usefull string functions.
/*}}}*/
// Includes /*{{{*/
#include <strutl.h>
+#include <apt-pkg/fileutl.h>
+
#include <ctype.h>
#include <string.h>
#include <stdio.h>
+#include <time.h>
/*}}}*/
// strstrip - Remove white space from the front and back of a string /*{{{*/
{
string::size_type Pos = URI.find(':');
if (Pos == string::npos)
- return string();
+ return URI;
return string(URI,0,Pos);
}
/*}}}*/
return Default;
}
/*}}}*/
+// TimeRFC1123 - Convert a time_t into RFC1123 format /*{{{*/
+// ---------------------------------------------------------------------
+/* This converts a time_t into a string time representation that is
+ year 2000 compliant and timezone neutral */
+string TimeRFC1123(time_t Date)
+{
+   /* Formats Date as "Thu, 01 Jan 1970 00:00:00 GMT". An empty string is
+      returned when the time cannot be broken down by gmtime(). */
+   const struct tm *Broken = gmtime(&Date);
+   if (Broken == 0)
+      return string();
+
+   // gmtime hands out shared storage, so work on a private copy
+   struct tm Conv = *Broken;
+   char Buf[300];
+
+   // Fixed English names; the output must not depend on the locale
+   const char *Day[] = {"Sun","Mon","Tue","Wed","Thu","Fri","Sat"};
+   const char *Month[] = {"Jan","Feb","Mar","Apr","May","Jun","Jul",
+                          "Aug","Sep","Oct","Nov","Dec"};
+
+   // tm_year counts from 1900; the output is bounded by the buffer size
+   snprintf(Buf,sizeof(Buf),"%s, %02i %s %i %02i:%02i:%02i GMT",
+            Day[Conv.tm_wday],Conv.tm_mday,Month[Conv.tm_mon],
+            Conv.tm_year+1900,Conv.tm_hour,Conv.tm_min,Conv.tm_sec);
+   return Buf;
+}
+ /*}}}*/
+// ReadMessages - Read messages from the FD /*{{{*/
+// ---------------------------------------------------------------------
+/* This pulls full messages from the input FD into the message buffer.
+ It assumes that messages will not pause during transit so no
+ fancy buffering is used. */
+bool ReadMessages(int Fd, vector<string> &List)
+{
+   /* Reads what is available on Fd and appends every complete message
+      (text terminated by a blank line) to List. Returns false when read()
+      returns 0 or when waiting for the rest of a partial message fails,
+      true otherwise.
+      NOTE(review): a partial message that fills the whole buffer leaves
+      no room to read into; the following read() presumably returns 0 and
+      is then reported as a failure - confirm that is acceptable. */
+   char Buffer[4000];
+   char *End = Buffer;
+
+   while (1)
+   {
+      int Res = read(Fd,End,sizeof(Buffer) - (End-Buffer));
+
+      // Process is dead, this is kind of bad..
+      if (Res == 0)
+         return false;
+
+      // No data
+      if (Res < 0)
+         return true;
+
+      End += Res;
+
+      /* Look for the end of the message. Two bytes are inspected at a
+         time, so the scan stops one short of End to stay inside the
+         data that has really been read. */
+      for (char *I = Buffer; I + 1 < End; I++)
+      {
+         if (I[0] != '\n' || I[1] != '\n')
+            continue;
+
+         // Pull the message out (pointer + length, the buffer is not
+         // zero terminated)
+         string Message(Buffer,I-Buffer);
+
+         // Fix up the buffer: skip the newline run, move the rest down
+         for (; I < End && *I == '\n'; I++);
+         End -= I-Buffer;
+         memmove(Buffer,I,End-Buffer);
+         I = Buffer;
+
+         List.push_back(Message);
+      }
+
+      // Everything consumed, no partial message is pending
+      if (End == Buffer)
+         return true;
+
+      // A partial message is pending, wait for the remainder
+      if (WaitFd(Fd) == false)
+         return false;
+   }
+}
+ /*}}}*/
// -*- mode: cpp; mode: fold -*-
// Description /*{{{*/
-// $Id: strutl.h,v 1.5 1998/10/20 02:39:31 jgg Exp $
+// $Id: strutl.h,v 1.6 1998/10/22 04:56:49 jgg Exp $
/* ######################################################################
String Util - These are some usefull string functions
#include <stdlib.h>
#include <string>
+#include <vector>
char *_strstrip(char *String);
char *_strtabexpand(char *String,size_t Len);
string Base64Encode(string Str);
string URItoFileName(string URI);
string URIAccess(string URI);
+string TimeRFC1123(time_t Date);
+string LookupTag(string Message,const char *Tag,const char *Default = 0);
+int StringToBool(string Text,int Default = -1);
+bool ReadMessages(int Fd, vector<string> &List);
int stringcmp(const char *A,const char *AEnd,const char *B,const char *BEnd);
inline int stringcmp(const char *A,const char *AEnd,const char *B) {return stringcmp(A,AEnd,B,B+strlen(B));};
int stringcasecmp(const char *A,const char *AEnd,const char *B,const char *BEnd);
inline int stringcasecmp(const char *A,const char *AEnd,const char *B) {return stringcasecmp(A,AEnd,B,B+strlen(B));};
-string LookupTag(string Message,const char *Tag,const char *Default = 0);
-int StringToBool(string Text,int Default = -1);
#endif
-// $Id: apt.conf,v 1.3 1998/10/20 02:41:06 jgg Exp $
+// $Id: apt.conf,v 1.4 1998/10/22 04:56:50 jgg Exp $
/* This file is an index of all APT configuration directives. It should
NOT actually be used as a real config file, though it is a completely
valid file.
};
};
+Acquire
+{
+ Queue-Mode "access"; // host|access
+};
+
Dir
{
Debug {
pkgProblemResolver "true";
+ pkgAcquire "false";
pkgAcquire::Worker "true";
}
{
printf("100 Capabilities\n"
"Version: 1.0\n"
- "Pre-Scan: true\n\n"
- "Version: 1.0\n\n");
+ "Pre-Scan: true\n\n");
+ fflush(stdout);
+ sleep(10);
}
#include <apt-pkg/acquire-item.h>
#include <apt-pkg/init.h>
#include <apt-pkg/error.h>
+#include <signal.h>
int main()
{
+ signal(SIGPIPE,SIG_IGN);
+
pkgInitialize(*_config);
pkgSourceList List;
if (_error->PendingError() == true)
break;
}
+
+ Fetcher.Run();
_error->DumpErrors();
}