/* transport.c  (from the zmailer source package) */

/*
 *    Copyright 1988 by Rayan S. Zachariassen, all rights reserved.
 *    This will be free software, but only when it is finished.
 */
/*
 *    Lots of modifications (new guts, more or less..) by
 *    Matti Aarnio <mea@nic.funet.fi>  (copyright) 1992-2003
 */


#include "hostenv.h"
#include <sfio.h>
#include <sys/param.h>
#include "mail.h"
#ifdef HAVE_FCNTL_H
#include <fcntl.h>
#endif
#include <sys/stat.h>
#include <errno.h>
#include <sys/file.h>
#include "zmsignal.h"
/* #include <stdlib.h> */
#include <unistd.h>

#include "zsyslog.h"
#include <sysexits.h>
#ifdef HAVE_SYS_RESOURCE_H
#include <sys/resource.h>
#endif
#ifdef HAVE_SYS_UN_H
#include <sys/socket.h>
#include <sys/un.h>
#endif

#include "scheduler.h"
#include "prototypes.h"

#include "ta.h"
#include "libz.h"

#include "libc.h"

extern int forkrate_limit;
extern int freeze;
extern int mailqmode;

static int  scheduler_nofiles = -1; /* Will be filled below */
static int  runcommand   __((char * const argv[], char * const env[], struct vertex *, struct web *, struct web*));
static void stashprocess __((int, int, int, struct web*, struct web*, struct vertex *, char * const argv[]));
static void reclaim      __((int, int));
static void waitandclose __((int));
static void readfrom     __((int));

/* These two are *not* exactly proper ones, but at least at most
   systems it links properly without wider exportation of related
   data structure types.  */

extern int syncweb __((void *));
extern void *dirq;


extern FILE *vfp_open __((struct ctlfile *));

#ifdef HAVE_SYS_WAIT_H /* POSIX.1 compatible */
# include <sys/wait.h>
#else /* Not POSIX.1 compatible, lets fake it.. */
extern int wait();
#endif

#ifndef WEXITSTATUS
# define WEXITSTATUS(s) (((s) >> 8) & 0377)
#endif
#ifndef WSIGNALSTATUS
# define WSIGNALSTATUS(s) ((s) & 0177)
#endif

struct procinfo *cpids = NULL;

#define     MAXFILESPERTRANSPORT    1000

int   numkids;
int   readsockcnt;            /* Count how many childs to read there are;
                           this for the SLOW Shutdown */

int   notifysocket = -1;      /* fd of UDP/AF_UNIX socket to listen for notifies */


static void cmdbufalloc __((int, char **, int *));

/*
 * cmdbufalloc() -- make sure the buffer at *bufp has room for at least
 * `newlen' bytes plus one extra byte (for a terminating NUL).
 *
 *   newlen  wanted usable size in bytes
 *   bufp    in/out: buffer pointer; NULL means "not allocated yet"
 *   spcp    in/out: usable size currently recorded for *bufp
 *
 * The buffer only ever grows; when it is already large enough
 * neither *bufp nor *spcp is touched.
 */
static void
cmdbufalloc(newlen, bufp, spcp)
     int newlen;
     char **bufp;
     int *spcp;
{
  char *area = *bufp;

  if (area == NULL)
    area = emalloc(newlen+1);		/* first allocation */
  else if (newlen > *spcp)
    area = erealloc(area, newlen+1);	/* grow existing buffer */
  else
    return;				/* already big enough */

  *bufp = area;
  *spcp = newlen;
}

extern int errno;
extern int slow_shutdown;



/* 
 * flush_child() -- write whatever is pending in proc->cmdbuf to the
 * child's command pipe (proc->tofd).
 *
 * Returns:
 *   -1  failure: no usable write channel, or write() failed with a
 *       real error; proc->state is set to CFSTATE_ERROR and the
 *       pending data is discarded (cmdlen = 0)
 *    0  nothing left to write
 *    1  incomplete write; the unwritten tail has been moved to the
 *       start of proc->cmdbuf and proc->cmdlen tells how much remains
 */
static int  flush_child     __((struct procinfo *cpidp));

static int
flush_child(proc)
     struct procinfo *proc;
{

      /* Process slot has a negative pid, but the pipe is still open:
	 close the pipe. */
      if (proc->pid < 0 && proc->tofd >= 0) {
	pipes_shutdown_child(proc->tofd);
	proc->tofd = -1;

	if (verbose)
	  sfprintf(sfstderr,
		   "%% shutdown of proc=%p pid=%d flush_child() pid<0 && tofd>=0\n",
		   proc, proc->pid);

      }
      if (proc->tofd < 0) {
	proc->cmdlen = 0;
	proc->state = CFSTATE_ERROR;

	if (verbose)
	  sfprintf(sfstderr,
		   "%% disconnect of flush_child(proc=%p pid=%d) pid<0 && tofd<0 BAD?? \n",
		   proc, proc->pid);

	return -1;
      }
#if 0
      /* Make sure the buffer exists.. */
      if (proc->cmdbuf == NULL)
	cmdbufalloc(120, &proc->cmdbuf, &proc->cmdspc);
#endif
      if (proc->cmdlen == 0 && proc->tofd >= 0)
	return 0; /* All done! */

      /* We have some leftovers from previous feed..
	 .. feed them now.  */

      /* sfprintf(sfstderr,
	 "flushing to child pid %d, cmdlen=%d, cmdbuf='%s'\n",
	 proc->pid, proc->cmdlen, proc->cmdbuf);  */

      while (proc->tofd >= 0 && proc->cmdlen > 0) {

	int rc = write(proc->tofd, proc->cmdbuf, proc->cmdlen);

	if (rc < 0 && (errno != EAGAIN && errno != EINTR &&
		       errno != EWOULDBLOCK)) {
	  int e = errno;
	  /* Some real failure :-( */
	  pipes_shutdown_child(proc->tofd);
	  proc->tofd = -1;
	  proc->cmdlen = 0;
	  proc->state = CFSTATE_ERROR;

	  if (verbose)
	    sfprintf(sfstderr,
		     "%% shutdown of proc=%p pid=%d flush_child() write() errno=%d\n",
		     proc, proc->pid, e);

	  return -1;
	}
	if (rc <= 0)
	  break; /* Zero or negative.. let the writing
		    proceed later... */

	/* Slide the unwritten tail to the start of the buffer.
	   Source and destination overlap, so this MUST be memmove();
	   memcpy() has undefined behaviour for overlapping areas. */
	memmove(proc->cmdbuf, proc->cmdbuf + rc, proc->cmdlen - rc);
	proc->feedtime = now;
	proc->cmdlen -= rc;
      }
      if (proc->cmdlen) return 1; /* Incomplete write */
      return 0;
}


/*
 * feed_child() -- queue one job line for the transport agent `proc'
 * and try to push it down the pipe.
 *
 * The job is the vertex at proc->pthread->nextfeed.  The line is
 * "<dir><mid>\n" when the thread-group's `withhost' flag is set, and
 * "<dir><mid>\t<host>\n" otherwise; in slow-shutdown mode a bare "\n"
 * is queued instead.  After queueing, pick_next_vertex() is called
 * and the buffer is handed to flush_child().
 *
 * Returns -1 for failures or "nothing to feed", otherwise whatever
 * flush_child() returns (1 for incomplete writes, 0 for success).
 */

static int  feed_child      __((struct procinfo *cpidp));

static int
feed_child(proc)
     struct procinfo *proc;
{
      struct vertex *vp;
      int linelen;

      /* Scratch line buffer, kept over calls to avoid reallocation */
      static char *line = NULL;
      static int linespc = 0;

      /* --- preconditions --- */

      if (proc->pthread == NULL) {
	/* BUG if called without next THREAD.. */
	proc->state = CFSTATE_ERROR;
	pipes_shutdown_child(proc->tofd);
	proc->tofd = -1;

	if (verbose)
	  sfprintf(sfstderr,
		   "%% shutdown of proc=%p pid=%d feed_child() proc->pthread == NULL\n",
		   proc, proc->pid);

	return -1;
      }

      if (proc->pthread->nextfeed == NULL) {
	/* Might be called without next thing to process.. */
	if (verbose)
	  sfprintf(sfstderr,
		   "%% feed_child(proc=%p pid=%d) proc->pthread->nextfeed == NULL\n",
		   proc, proc->pid);

	return -1;
      }

      if (proc->pid <= 0 || proc->tofd < 0) {
	/* No process../No write channel.. */
	proc->state = CFSTATE_ERROR;

	if (verbose)
	  sfprintf(sfstderr,
		   "%% feed_child(proc=%p pid=%d tofd=%d) pid<0 || tofd<0 BAD??\n",
		   proc, proc->pid, proc->tofd);

	return -1;
      }

      /* --- stamp the vertex --- */

      vp = proc->pthread->nextfeed;

      mytime(&now);
      vp->lastfeed = now;

      /* --- compose the line into the scratch buffer --- */

      if (slow_shutdown) {

	linelen = 1;
	cmdbufalloc(linelen, & line, & linespc);
	strcpy(line,"\n");

      } else {

	const char *dir = cfpdirname(vp->cfp->dirind);

	if (proc->thg->withhost) {
	  /* cmd-line was with host; send the spool file only */
	  linelen = 1 + strlen(dir) + strlen(vp->cfp->mid);
	  cmdbufalloc(linelen, & line, & linespc);
	  sprintf(line, "%s%s\n", dir, vp->cfp->mid);
	} else {
	  /* spool file, TAB, host name */
	  linelen = 2+strlen(dir)+strlen(vp->cfp->mid)+strlen(proc->ho->name);
	  cmdbufalloc(linelen, & line, & linespc);
	  sprintf(line, "%s%s\t%s\n", dir, vp->cfp->mid, proc->ho->name);
	}

      }

      /* --- append (with the trailing NUL) to the per-process buffer --- */

      if ((proc->cmdlen + linelen) >= proc->cmdspc)
	cmdbufalloc(proc->cmdlen + linelen + 1, &proc->cmdbuf, &proc->cmdspc);

      memcpy(proc->cmdbuf + proc->cmdlen, line, linelen+1);
      proc->cmdlen += linelen;

      if (verbose) {
	sfprintf(sfstdout,
		 "feed: tofd=%d, chan='%s', proc=%p, vtx=%p thr=%p ",
		 proc->tofd, proc->ch->name, proc, vp, vp->thread);
      }

      /* --- per-message verbose log, if the message has one --- */

      if (vp->cfp->vfpfn != NULL) {
	Sfio_t *vfp = vfp_open(vp->cfp);
	int ai;
	if (vfp) {
	  sfprintf(vfp, "Feeding to child; ce.argv = \"");
	  for (ai = 0; vp->thgrp->ce.argv[ai] != NULL; ++ai)
	    sfprintf(vfp, (ai == 0) ? "'%s'" : " '%s'",
		     vp->thgrp->ce.argv[ai]);
	  sfprintf(vfp, "\" chan = '%s' cmd: %s", proc->ch->name, line);
	  sfclose(vfp);
	}
      }

      /* --- bookkeeping --- */

      vp->ce_pending  = 0; /* and clear the pending.. */

      /* Attempts are NOT counted here; that happens when some
	 diagnostics is received for this vertex. */

      /* One more job sits in the buffer/pipe, one less is unfed */
      proc->overfed        += 1;
      proc->pthread->unfed -= 1;

      if (verbose)
	sfprintf(sfstdout,"len=%d buf=%s", linelen, line);

      mytime(&proc->feedtime);

      pick_next_vertex(proc);

      return flush_child(proc);
}


/* Printable names of process feed states; ta_hungry() indexes this
   table with proc->state, its case labels number the states 0..4. */
char *proc_state_names[] = {
  "ERROR",	/* 0 */
  "LARVA",	/* 1 */
  "STUFF",	/* 2 */
  "FINISH",	/* 3 */
  "IDLE"	/* 4 */
};

void
ta_hungry(proc)
     struct procinfo *proc;
{
      /* This is an "actor model" behaviour,
         where actor tells, when it needs a new
         job to be fed to it. */

      struct thread *thr0;
      int i;

      mytime(&proc->hungertime);

      if (verbose)
        sfprintf(sfstdout,"%% ta_hungry(%p) OF=%d S=%s tofd=%d\n",
               proc, proc->overfed, proc_state_names[proc->state],
               proc->tofd);

      /* If ``proc->tofd'' is negative, we can't feed anyway.. */

      if (proc->tofd < 0) {

        --proc->overfed;

        if (proc->overfed == 0 && proc->pthread)
          /* If we have a thread here still, reschedule it... */
          if (! thread_reschedule(proc->pthread,0,-1))
            proc->pthread = NULL;

        if (proc->pthread)
          if (delete_thread(proc->pthread))
            proc->pthread = NULL;

        return;
      }

      /* We have valid child-feed state */

      proc->overfed -= 1;

      switch(proc->state) {
      case CFSTATE_LARVA: /* "1" */

        /* Thread selected already along with its first vertex,
           which is pointer by  proc->pthread->nextfeed  */

        /* However if we have multiple parallel processes
           in single thread, we may end up driving a new
           process straight from LARVA to IDLE... */

        proc->overfed = 0; /* Should not need setting ... */

        if (proc->pthread->nextfeed != NULL &&
            feed_child(proc) < 0)
          /* We got some error :-/  D'uh! */
          goto feed_error_handler;

        proc->state = CFSTATE_STUFFING;
        /* Not YET drained.. */
        if (proc->pthread->nextfeed)
          return;

      case CFSTATE_STUFFING: /* "2" */

        if (proc->overfed > 0) return;

        if (proc->pthread && proc->pthread->nextfeed) {

          while ((proc->state == CFSTATE_STUFFING) &&
               proc->pthread && proc->pthread->nextfeed) {

            /* As long as:
             - we have next vertex to feed
             - there is no command buffer backlog
             - state stays in STUFFING
            */

            i = feed_child(proc);
            if (i < 0)
            goto feed_error_handler; /* Outch! */

            if (proc->tofd >= 0 && proc->cmdlen != 0)
            break; /* Incomplete feed -- stop feeding here */

            if (proc->overfed > proc->thg->ce.overfeed)
            break; /* Or over limit ...       */

          }
          /* As long as we have things to feed (or have fed anything!),
             we stay in the STUFFING state.  This was if we later get
             new workitems,  thread_linkin() can simply put them into
             the 'nextfeed' pointer, and we feed them right away. */
          return;
        }

        /* Didn't have anything to feed :-( */

        proc->state = CFSTATE_FINISHING;
        /* FALL THRU! */

      case CFSTATE_FINISHING: /* "3" */

        /* "retryat" may have kicked us into this state also.. */

        if (proc->overfed > 0) return;

        /* Well, this thread was done, so long!
           This TA may have other things to poke at! */

        thr0 = proc->pthread;

        /* proc->pthread = NULL; */
        /* ^^^  NO KILL YET! pick_next_thread() needs this! */

        /* Disconnect the previous thread from the proc. */

        thr0->thrkids -= 1;

        if (thr0->proc == proc) /* Thread Process Chain Leader */
            thr0->proc = proc->pnext;

        /* Disconnect from process chain */
        if (proc->pnext) proc->pnext->pprev = proc->pprev;
        if (proc->pprev) proc->pprev->pnext = proc->pnext;
        proc->pnext = proc->pprev = NULL;

        /* Next: either the thread changes, or
           the process moves into IDLE state. */

        if (pick_next_thread(proc)) {

          /* We have WORK !  We are reconnected to the new thread! */
          /* Picked a new thread, which isn't same as THR0! */

          /* Picked something, reschedule the old thread,
             and if it got destroyed (by expiry) loose it... */
          if (thr0 && !thread_reschedule(thr0, 0, -1))
            thr0 = NULL;
          if (thr0) delete_thread(thr0); /* possibly kill it if no more
                                    things in it active! */
          /* thr0 = NULL; -- this is dead variable.. */

          if (verbose)
            sfprintf(sfstdout, "%% pick_next_thread(proc=%p) gave thread %p\n",
                   proc, proc->pthread);

          /* DON'T double-count attempts on threads!
             This gets counted when some diagnostic is received
             for this thing.. */
          /* proc->pthread->attempts += 1; */

          if (feed_child(proc))
            /* non-zero return means things went wrong somehow.. */
            goto feed_error_handler;

          proc->state = CFSTATE_STUFFING;
          return;
        }

        /* proc->pthread == NULL  now */
        /* thr0->thrkids has been decremented */
        /* No work in sight, queue up '#idle\n' string. */

        if (thr0) {

          /* Previously we were disconnected from the thread,
           now join back so that IDLE will disconnect us... */

          proc->pthread = thr0;
          thr0->thrkids += 1;
          proc->pnext   = thr0->proc;
          if (proc->pnext) proc->pnext->pprev = proc;
          thr0->proc = proc;
        }

        if ((proc->cmdlen + 7) >= proc->cmdspc) {
          cmdbufalloc(proc->cmdlen+7, &proc->cmdbuf, &proc->cmdspc);
        }
        if (slow_shutdown) {
          proc->cmdbuf[ proc->cmdlen ] = '\n';
          proc->cmdlen += 1;
        } else {
          memcpy(proc->cmdbuf + proc->cmdlen, "#idle\n", 6);
          proc->cmdlen += 6;
        }
        proc->state = CFSTATE_IDLE;
        proc->overfed += 1;
        flush_child(proc);  /* May flip the state to CFSTATE_ERROR */
        return;

      case CFSTATE_IDLE: /* "4" */
        /* The process has arrived into IDLE pool! */

        if (proc->overfed > 0) abort();
        /* We have seen at least once that this value went NEGATIVE! */

        if (verbose)
          sfprintf(sfstdout, " ... IDLE THE PROCESS %p (of=%d).\n",
                 proc, proc->overfed);

        thr0 = proc->pthread;
        if (thr0) {

          proc->pthread = NULL;

          if (thr0->proc == proc) /* Was chain head */
            thr0->proc = proc->pnext;

          thr0->thrkids -= 1;

        }

        /* Unlink me from the active chain */
        if (proc->pnext) proc->pnext->pprev = proc->pprev;
        if (proc->pprev) proc->pprev->pnext = proc->pnext;

        proc->pnext = proc->thg->idleproc;
        if (proc->pnext) proc->pnext->pprev = proc;
        proc->pprev = NULL;
        proc->thg->idleproc = proc;

        proc->thg->idlecnt += 1;

        ++idleprocs;
        MIBMtaEntry->sc.TransportAgentsActiveSc -= 1;
        MIBMtaEntry->sc.TransportAgentsIdleSc   += 1;


        /* Failed to pick anything, reschedule the old thread.
           Possibly killed the thread.. */
        if (thr0) thread_reschedule(thr0, 0, -1);
        return;

      default: /* CFSTATE_ERROR: "0" */
        /* Some error was encountered at  feed_child()  at some
           point, we do nothing! */

        /* Some (real?) failure :-( */

      feed_error_handler:

        if (verbose)
          sfprintf(sfstdout,"%% ta_hungry(%p) OF=%d S=%s tofd=%d\n",
                 proc, proc->overfed, proc_state_names[proc->state],
                 proc->tofd);

        proc->state = CFSTATE_ERROR;

        /* We shut-down the child feed pipe */
        pipes_shutdown_child(proc->tofd);
        proc->tofd = -1;
        break;
      }
      return;
}

/*
 * start_child() -- build argv[], and do other inits for fork()ing
 *                  and execve()ing a new transport program for us.
 */

int
start_child(vhead, chwp, howp)
      struct vertex *vhead;
      struct web *chwp, *howp;
{
#define MAXARGC 40
      char * av[1+MAXARGC];
      char * ev[1+MAXARGC];
      char *os, *cp, *ocp, *s;
      char buf[MAXPATHLEN*4];
      char buf2[MAXPATHLEN];

      int    i, avi, evi;
      static time_t prev_time = 0;
      static int startcnt = 0; /* How many childs per second (time_t tick..) ? */
      time_t this_time;


      if (freeze) {
        vhead->thread->pending = "Frozen";
        return 0;
      }

      if (verbose)
        sfprintf(sfstdout,"transport(vhead=%p,chan=%s,host=%s)\n",
               vhead,chwp->name,howp->name);

      ++startcnt;
      this_time = mytime(NULL);
      if (this_time != prev_time) {
        startcnt = 0;
        prev_time = this_time;
      } else if (startcnt > forkrate_limit) {
        if (verbose)
          sfprintf(sfstdout," ... too many forks per second!\n");
        vhead->thread->pending = "ForkRateLimit";
        return 0;
      }

      if (vhead->thgrp->ce.argv == NULL) {
        sfprintf(sfstderr, "No command defined for %s/%s!\n",
              chwp->name, howp->name);
        vhead->thread->pending = "ConfBUG:NoCmdDefined!";
        return 0;
      }

      /*
       * Replace the $host and $channel strings in the command line.
       * (also any ${ZENV} variable)
       */
      os = buf;
      avi = evi = 0;
      for (i = 0; vhead->thgrp->ce.argv[i] != NULL; ++i) {
        if (strcmp(vhead->thgrp->ce.argv[i], replhost) == 0) {
          av[avi] = howp->name;
        } else if (strcmp(vhead->thgrp->ce.argv[i], replchannel) == 0) {
          av[avi] = chwp->name;
        } else if (strchr(vhead->thgrp->ce.argv[i], '$') != NULL) {
          s = os;
          for (cp = vhead->thgrp->ce.argv[i]; *cp != '\0'; ++cp) {
            if (*cp == '$' && *(cp+1) == '{') {
            cp += 2;
            ocp = cp;
            while (*cp != '\0' && *cp != '}')
              ++cp;
            if (*cp == '}') {
              *cp = '\0';
              if (strcmp(ocp,"host")==0) {
                strcpy(s,howp->name);
              } else if (strcmp(ocp,"channel")==0) {
                strcpy(s,chwp->name);
              } else {
                const char *t = getzenv(ocp);
                if (t)
                  strcpy(s, t);
              }
              s += strlen(s);
              *cp = '}';
            } else
              --cp;
            } else
            *s++ = *cp;
          }
          *s = '\0';
          av[avi] = os;
          os = s + 1;
        } else
          av[avi] = vhead->thgrp->ce.argv[i];

        if (os >= (buf+sizeof(buf))) {
          sfprintf(sfstderr,"BUFFER OVERFLOW IN ARGV[] SUBSTITUTIONS!\n");
          sfsync(sfstderr);
          abort();
        }

        if (avi == 0 && strchr(av[0],'=') != NULL) {
          ev[evi] = av[0];
          ++evi;
        } else if (avi == 0 && av[0][0] != '/') {
          /* Must add ${MAILBIN}/ta/ to be the prefix.. */

          static const char *mailbin = NULL;

          if (!mailbin) mailbin = getzenv("MAILBIN");
          if (!mailbin) mailbin = MAILBIN;

          sprintf(buf2,"%s/%s/%s", mailbin, qdefaultdir, av[0]);
          av[avi++] = buf2;
          if (strlen(buf2) > sizeof(buf2)) {
            /* Buffer overflow ! This should not happen, but ... */
            sfprintf(sfstderr,"BUFFER OVERFLOW IN ARGV[0] CONSTRUCTION!\n");
            sfsync(sfstderr);
            abort();
          }
        } else
          ++avi;
        if (avi >= MAXARGC) avi = MAXARGC;
        if (evi >= MAXARGC) evi = MAXARGC;
      }
      av[avi] = NULL;
      {
        const char *t;
        if ((t = getenv("TZ")))
          ev[evi++] = (char*) t-3; /* Pass the TZ      */
        if ((t = getzenv("PATH")))
          ev[evi++] = (char*) t-5; /* Pass the PATH    */
        if ((t = getzenv("ZCONFIG")))
          ev[evi++] = (char*) t-8; /* Pass the ZCONFIG */
      }
      ev[evi] = NULL;

      /* fork off the appropriate command with the appropriate stdin */
      if (verbose) {
        sfprintf(sfstdout,"${");
        for (i = 0; ev[i] != NULL; ++i)
          sfprintf(sfstdout," %s", ev[i]);
        sfprintf(sfstdout," }");
        for (i = 0; av[i] != NULL; ++i)
          sfprintf(sfstdout," %s", av[i]);
        sfprintf(sfstdout,"\n");
      }
      return runcommand(av, ev, vhead, chwp, howp);
}

/*
 * runcommand() -- fork() and execve() one transport agent.
 *
 *   argv, env    argument and environment vectors for execve();
 *                argv[0] is the program path
 *   vhead        vertex whose thread-group supplies uid, gid and
 *                priority for the child
 *   chwp, howp   channel and host webs, passed on to stashprocess()
 *
 * Returns 1 when the child was started and registered, 0 when the
 * pipes could not be created or fork() failed.
 */
static int runcommand(argv, env, vhead, chwp, howp)
	char * const argv[];
	char * const env[];
	struct vertex *vhead;
	struct web *chwp, *howp;
{
	int	i, pid, to[2], from[2], uid, gid, prio;
	char	*cmd;
	static int pipesize = 0;


	uid = vhead->thgrp->ce.uid;
	gid = vhead->thgrp->ce.gid;
	cmd = argv[0];
	prio= vhead->thgrp->ce.priority;

	if (pipes_create(to,from) < 0) return 0;
	if (pipesize == 0)
	  pipesize = resources_query_pipesize(to[0]);

	if (verbose)
	  sfprintf(sfstderr, "to %d/%d from %d/%d\n",
		to[0],to[1],from[0],from[1]);

	pid = fork();
	if (pid == 0) {	/* child */

	  pipes_to_child_fds(to,from);

	  /* keep current stderr for child stderr */
	  /* close all other open filedescriptors */

	  /* ... if detach() did its job, there shouldn't be any! */
	  /* ... no, the 'querysock' is there somewhere!   */
	  if (scheduler_nofiles < 1)
	    scheduler_nofiles = resources_query_nofiles();
	  for (i = 3; i < scheduler_nofiles; ++i)
	    close(i);

#if defined(HAVE_SETPRIORITY) && defined(HAVE_SYS_RESOURCE_H)
	  if (prio >= 80) { /* MAGIC LIMIT VALUE FOR ABSOLUTE SET! */
	    /* Configured values 80 and up are an absolute priority
	       offset by 100.  This must use `prio'; the loop counter
	       `i' holds scheduler_nofiles at this point. */
	    setpriority(PRIO_PROCESS, 0, prio - 100);
	  } else
#endif
	    if (prio != 0) {
	      nice(prio);
	    }

	  resources_limit_nofiles(transportmaxnofiles);
	  setgid(gid);	/* Do GID setup while still UID 0..   */
	  setuid(uid);	/* Now discard all excessive powers.. */
	  execve(cmd, argv, env);
	  /* Only reached when execve() failed */
	  sfprintf(sfstderr, "Exec of %s failed!\n", cmd);
	  _exit(1);
	} else if (pid < 0) {	/* fork failed - yell and forget it */
	  close(to[0]); close(to[1]);
	  close(from[0]); close(from[1]);
	  sfprintf(sfstderr, "Fork failed!\n");
	  vhead->thread->pending = "System:ForkFailure!";
	  return 0;
	}

	/* parent */

	pipes_close_parent(to,from);

	/* save from[0] away as a descriptor to watch */
	stashprocess(pid, from[0], to[1], chwp, howp, vhead, argv);
	/* We wait for the child to report "#hungry", then we feed it.. */
	return 1;
}


/*
 * stashprocess() -- record a freshly forked transport agent in the
 * cpids[] table (indexed by the fd we read the child's reports from)
 * and link it to the thread it was started for.
 *
 *   pid           child's process id
 *   fromfd, tofd  our read and write descriptors towards the child
 *   chwp, howp    channel and host webs; their kid counts go up
 *   vhead         vertex whose thread the process starts working on
 *   argv          child's argument vector, kept as one string in
 *                 proc->cmdline for diagnostics
 */
static void stashprocess(pid, fromfd, tofd, chwp, howp, vhead, argv)
	int pid, fromfd, tofd;
	struct web *chwp, *howp;
	struct vertex *vhead;
	char * const argv[];
{
	struct procinfo *slot;
	int argi, used, arglen;

	/* First call: allocate a zeroed slot for every possible fd */
	if (cpids == NULL) {
	  int nslots;

	  if (scheduler_nofiles < 1)
	    scheduler_nofiles = resources_query_nofiles();
	  nslots = scheduler_nofiles;
	  cpids = (struct procinfo *)
	    emalloc((unsigned)(nslots * sizeof (struct procinfo)));
	  memset(cpids, 0, sizeof(struct procinfo) * nslots);
	}
	slot = &cpids[fromfd];

	/* Buffers may be left over from the previous user of the slot */
	if (slot->cmdbuf)  free(slot->cmdbuf);
	if (slot->cmdline) free(slot->cmdline);

	/* Wipe the slot: chain pointers, cmdlen, reaped, carryover ... */
	memset(slot,0,sizeof(struct procinfo));

	slot->overfed = 1;
	slot->state   = CFSTATE_LARVA;
	slot->pid     = pid;

	/* Channel and host accounting */
	slot->ch    = chwp;
	chwp->kids += 1;
	slot->ho    = howp;
	howp->kids += 1;

	/* Thread and thread-group accounting */
	slot->pthread            = vhead->thread;
	slot->thg                = vhead->thread->thgrp;
	slot->thg->transporters += 1;
	slot->pthread->thrkids  += 1;

	/* Global counters */
	++numkids;
	MIBMtaEntry->sc.TransportAgentProcessesSc += 1;
	MIBMtaEntry->sc.TransportAgentForksSc     += 1;
	MIBMtaEntry->sc.TransportAgentsActiveSc   += 1;

	/* Become the head of the thread's process chain */
	slot->tofd          = tofd;
	slot->pnext         = slot->pthread->proc;
	slot->pthread->proc = slot;
	if (slot->pnext) slot->pnext->pprev = slot;

	/* The child has not reported hunger yet, but the
	   time-stamp is stored anyway */
	mytime(&slot->hungertime);

	cmdbufalloc(200, &slot->cmdbuf,  &slot->cmdspc);
	cmdbufalloc(200, &slot->cmdline, &slot->cmdlspc);

	fd_nonblockingmode(fromfd);
	if (fromfd != tofd)
	  fd_nonblockingmode(tofd);

	/* Flatten argv[] into one space separated string.  This is
	   only for debugging in the rare cases where a transport
	   subprocess returns EX_SOFTWARE and LOG_EMERG alerts are
	   sent thru syslog.  */
	slot->cmdline[0] = 0;
	used = 0;
	argi = 0;
	while (argv[argi] != NULL) {
	  if (argi > 0)
	    slot->cmdline[used++] = ' ';
	  arglen = strlen(argv[argi]);
	  cmdbufalloc(used+arglen+1, &slot->cmdline, &slot->cmdlspc);
	  memcpy(slot->cmdline+used, argv[argi], arglen);
	  used += arglen;
	  ++argi;
	}
	slot->cmdline[used] = '\0';

	if (verbose)
	  sfprintf(sfstderr,
		   "stashprocess(%d, %d, %d, %s, %s, '%s' proc=%p)\n",
		   pid, fromfd, tofd, chwp ? chwp->name : "nil",
		   howp ? howp->name : "nil", slot->cmdline, slot);
}

/*
 * shutdown_kids() -- tell every transport agent that still has a
 * command channel to quit: send it the death-marker, close the
 * channel, and signal the process with SIGQUIT.
 */
void
shutdown_kids()
{
	struct procinfo *kid = cpids;
	int left;

	if (cpids == NULL)
	  return; /* No process table, nothing to do! */

	for (left = scheduler_nofiles; left > 0; --left, ++kid) {

	  /* Skip unused slots and kids without a command channel */
	  if (kid->pid <= 0 || kid->tofd < 0)
	    continue;

	  /* The death-marker, then close the command channel */
	  write(kid->tofd,"\n\n",2);
	  pipes_shutdown_child(kid->tofd);
	  kid->tofd = -1;

	  /* Signals may happen... */
	  if (kid->pid > 1)
	    kill(kid->pid, SIGQUIT);
	}
}

/* 
 *  Reclaim the process slot -- this process is dead now.
 *
 *  fromfd  index of the slot in cpids[]; also the descriptor the
 *          child's reports were read from (closed here)
 *  tofd    command descriptor towards the child; shut down here
 *          unless negative
 *
 *  Logs abnormal exits, releases the channel and host references,
 *  unlinks the slot from its thread's chain or from the idle chain,
 *  and adjusts the global process counters.
 */
static void reclaim(fromfd, tofd)
      int fromfd, tofd;
{
      struct procinfo *proc = &cpids[fromfd];

      /* NOTE(review): proc->ch and proc->ho are dereferenced here
         without the NULL checks used further below -- confirm they
         are always set when reclaim() is entered. */
      if (verbose)
        sfprintf(sfstderr,"reclaim(%d,%d) pid=%d, reaped=%d, chan=%s, host=%s\n",
               fromfd, tofd, (int)proc->pid, proc->reaped,
               proc->ch->name, proc->ho->name);

      /* Report only when a wait status was collected AND it shows
         a non-zero exit code or death by signal */
      if (proc->reaped &&
          ((WIFEXITED(proc->waitstat) && WEXITSTATUS(proc->waitstat)) ||
           WIFSIGNALED(proc->waitstat))) {

        /* Child sig-faulted, or had non-zero exit status! */

        sfprintf(sfstderr,
               "reclaim(%d,%d) pid=%d, reaped=%d, chan=%s, host=%s ",
               fromfd, tofd, (int)proc->pid, proc->reaped,
               proc->ch->name, proc->ho->name);

        if (WIFEXITED(proc->waitstat))
          sfprintf(sfstderr,"EXIT=%d\n", WEXITSTATUS(proc->waitstat));
        else if (WIFSIGNALED(proc->waitstat))
          sfprintf(sfstderr,"SIGNAL=%d\n", WTERMSIG(proc->waitstat));
        else
          sfprintf(sfstderr,"WAITSTAT=0x%04x\n", proc->waitstat);
      }

      /* pid == 0 marks the slot unused (mux() polls only slots
         whose pid is non-zero) */
      proc->pid = 0;
      proc->reaped = 0;
      /* Unprocessed input from the child is reported and dropped */
      if (proc->carryover != NULL) {
        sfprintf(sfstderr, "%s: HELP! Lost %d bytes: '%s'\n",
              progname, (int)strlen(proc->carryover), proc->carryover);
        free(proc->carryover);
        proc->carryover = NULL;
      }
      /* Drop the kid counts taken in stashprocess() */
      if (proc->ch) {
        proc->ch->kids -= 1;
        unweb(L_CHANNEL, proc->ch);
        proc->ch = NULL;
      }
      if (proc->ho) {
        proc->ho->kids -= 1;
        unweb(L_HOST, proc->ho);
        proc->ho = NULL;
      }
      /* Note: proc->tofd itself is left as it was */
      if (tofd >= 0)
        pipes_shutdown_child(tofd);
      close(fromfd);

      /* Reschedule the vertices that are left
         (that were not reported on).           */

      /* ... but only if we were not in IDLE chain! */
      if (proc->pthread) {

        /* Remove this entry from the chains */
        if (proc->pthread->proc == proc)
          proc->pthread->proc = proc->pnext;

        /* Disjoin the thread from the proc */
        proc->pthread->thrkids -= 1;

        /* Conditionally reschedule the thread */
        thread_reschedule(proc->pthread,0,-1);

        proc->pthread = NULL;

      } else {

        /* Maybe we were in idle chain! */
        struct procinfo *p;

        /* NOTE(review): proc->thg is used here without the NULL
           check applied to it further below -- confirm. */
        p  =  proc->thg->idleproc;
        while (p && p != proc) p = p->pnext;
        if (p == proc) {
          proc->thg->idlecnt -= 1;
          --idleprocs;

          /* Virtually move to ACTIVE state for count off below */
          MIBMtaEntry->sc.TransportAgentsActiveSc += 1;
          MIBMtaEntry->sc.TransportAgentsIdleSc   -= 1;

          /* Remove this entry from the chains */
          if (proc == proc->thg->idleproc)
            proc->thg->idleproc = proc->pnext;
        }
      }

      /* Remove this entry from the chains */
      /* (proc->pnext and proc->pprev themselves are not cleared
         here; stashprocess() zeroes the slot on reuse) */
      if (proc->pnext) proc->pnext->pprev = proc->pprev;
      if (proc->pprev) proc->pprev->pnext = proc->pnext;

      /* If e.g. RESCHEDULE has not destroyed this thread-group.. */
      if (proc->thg) {
        proc->thg->transporters -= 1;
        /* It may go down to zero and be deleted.. */
        delete_threadgroup(proc->thg);
        proc->thg    = NULL;
      }

      /* Global accounting: one active transport agent less */
      MIBMtaEntry->sc.TransportAgentsActiveSc   -= 1;
      MIBMtaEntry->sc.TransportAgentProcessesSc -= 1;
      --numkids;
      proc->thg    = NULL;
}

static void waitandclose(fd)
      int   fd;
{
      /* This is called when
         - fd return 0 (EOF)
         - fd returns -1, and errno != {EAGAIN|EWOULDBLOCK|EINTR}
       */
      reclaim(fd, cpids[fd].tofd);
}

#ifdef      HAVE_SELECT

#ifdef _AIX /* The select.h  defines NFDBITS, etc.. */
# include <sys/types.h>
# include <sys/select.h>
#endif

#ifndef     NFDBITS
/*
 * This stuff taken from the 4.3bsd /usr/include/sys/types.h, but on the
 * assumption we are dealing with pre-4.3bsd select().
 *
 * Only compiled when the system headers did not define NFDBITS.
 */

typedef long      fd_mask;

#ifndef     NBBY
#define     NBBY  8     /* number of bits in a byte */
#endif      /* NBBY */
/* Number of bits in one fd_mask word.  sizeof applied to a type
   name requires the parentheses around the type. */
#define     NFDBITS           (sizeof(fd_mask) * NBBY)

/* SunOS 3.x and 4.x>2 BSD already defines this in /usr/include/sys/types.h */
#ifdef      notdef
typedef     struct fd_set { fd_mask fds_bits[1]; } fd_set;
#endif      /* notdef */

/* Single-word fd_set operations taking a POINTER to the set */
#ifndef     _Z_FD_SET
#define     _Z_FD_SET(n, p)   ((p)->fds_bits[0] |= (1 << (n)))
#define     _Z_FD_CLR(n, p)   ((p)->fds_bits[0] &= ~(1 << (n)))
#define     _Z_FD_ISSET(n, p) ((p)->fds_bits[0] & (1 << (n)))
#define _Z_FD_ZERO(p)     memset((char *)(p), 0, sizeof(*(p)))
#endif      /* !FD_SET */
#endif      /* !NFDBITS */

/* Wrappers used by mux(): these take the set VARIABLE itself, not a
   pointer to it.  With the system's FD_SET family available they
   pass the variable's address on; otherwise the set is handled as a
   plain integer bit mask. */
#ifdef FD_SET
#define _Z_FD_SET(sock,var) FD_SET(sock,&var)
#define _Z_FD_CLR(sock,var) FD_CLR(sock,&var)
#define _Z_FD_ZERO(var) FD_ZERO(&var)
#define _Z_FD_ISSET(i,var) FD_ISSET(i,&var)
#else
#define _Z_FD_SET(sock,var) var |= (1 << sock)
#define _Z_FD_CLR(sock,var) var &= ~(1 << sock)
#define _Z_FD_ZERO(var) var = 0
#define _Z_FD_ISSET(i,var) ((var & (1 << i)) != 0)
#endif


int in_select = 0;      /* non-zero from just before select() until mux() is done with its results */

/*
 * mux() -- wait (at most until the absolute time 'timeout') for activity
 * on the transporter report channels, the transporter command channels
 * that still have unsent data, the mailq query socket, the notify socket
 * and (in mailq mode 2) the MQ2 descriptors, then dispatch what became
 * ready.
 *
 * Returns:
 *   -1  there was no descriptor at all to select on
 *    1  select() timed out
 *    0  otherwise (recursive entry, EINTR/EAGAIN, or results dispatched)
 * Aborts the process on any other select() failure.
 */
int
mux(timeout)
time_t timeout;
{
      int   i, n, maxf;
      fd_set      rdmask;
      fd_set      wrmask;
      struct timeval tv;
      struct procinfo *proc = cpids;

      timed_log_reinit();

      if (in_select) {
        sfprintf(sfstderr,"**** recursed into mux()! ***\n");
        return 0;
      }

      queryipccheck();

      /* Timeout in seconds; a deadline already in the past means "poll". */
      tv.tv_sec = timeout - now;
      if (timeout < now)
        tv.tv_sec = 0;
      tv.tv_usec = 0;

      maxf = -1;
      _Z_FD_ZERO(rdmask);
      _Z_FD_ZERO(wrmask);
      readsockcnt = 0;
      if (cpids != NULL) {
        /* The slot index in cpids[] is the fd we read reports from. */
        for (proc = cpids, i = 0; i < scheduler_nofiles; ++i, ++proc) {
          if (proc->pid == 0)
            continue;

          /* Something can be read ? */
          _Z_FD_SET(i, rdmask);
          if (maxf < i)
            maxf = i;
          ++readsockcnt;

          /* Something to write ? */
          if (proc->cmdlen > 0 && proc->tofd >= 0) {
            _Z_FD_SET(proc->tofd, wrmask);
            if (maxf < proc->tofd)
              maxf = proc->tofd;
          }
        }
      }

      if (querysocket >= 0) {
        _Z_FD_SET(querysocket, rdmask);
        if (maxf < querysocket)
          maxf = querysocket;
      }
      if (notifysocket >= 0) {
        _Z_FD_SET(notifysocket, rdmask);
        if (maxf < notifysocket)
          maxf = notifysocket;
      }

      /* Although we don't react on the results of these MQ2 fd's,
         getting them to break timeouts is important for MAILQv2
         responsiveness. ! */

      if (mailqmode == 2)
        maxf = mq2add_to_mask(&rdmask, &wrmask, maxf);

      if (maxf < 0)
        return -1;

      ++maxf;     /* select() wants highest fd plus one */

      in_select = 1;

      n = select(maxf, &rdmask, &wrmask, NULL, &tv);

      if (n < 0) {
        /* Capture errno at once: every call below may overwrite it,
           so all further decisions use the saved copy. */
        int err = errno;

        timed_log_reinit();

        in_select = 0;
        if (err == EINTR || err == EAGAIN)
          return 0;
        if (err == EINVAL || err == EBADF) {
          sfprintf(sfstderr, "** select() returned errno=%d\n", err);
          /* Report which of the descriptors we asked about is bad */
          for (i = 0; i < maxf; ++i) {
            if (_Z_FD_ISSET(i,rdmask)  &&  fcntl(i,F_GETFL,0) < 0)
              sfprintf(sfstderr,"** Invalid fd on a select() rdmask: %d\n",i);
            if (_Z_FD_ISSET(i,wrmask)  &&  fcntl(i,F_GETFL,0) < 0)
              sfprintf(sfstderr,"** Invalid fd on a select() wrmask: %d\n",i);
          }
          sfsync(sfstderr);
          abort(); /* mux() select() error EINVAL or EBADF !? */
        }
        errno = err; /* make perror() describe the select() failure */
        perror("select() returned unknown error ");
        fflush(stderr);
        sfsync(sfstderr);
        abort(); /* Select with unknown error */
      } else if (n == 0) {
        /* -- just a timeout -- fast or long */
        timed_log_reinit();
        in_select = 0;
        return 1;
      } else {
        /* In case we really should react.. */
        if (querysocket >= 0 &&
            _Z_FD_ISSET(querysocket, rdmask))
          queryipccheck();

        if (notifysocket >= 0 &&
            _Z_FD_ISSET(notifysocket, rdmask))
          receive_notify(notifysocket);

        if (cpids != NULL) {

          for (proc = cpids, i = 0; i < maxf; ++i, ++proc) {

            timed_log_reinit();

            /* A negative pid is how a slot is marked once its child
               has been reaped; such a slot is read from as well. */
            if (proc->pid < 0 ||
                (proc->pid > 0 && _Z_FD_ISSET(i, rdmask))) {
              _Z_FD_CLR(i, rdmask);
              /* do non-blocking reads from this fd */
              readfrom(i);
            }

            /* In case we have non-completed 'feeds', try feeding them */
            if (proc->pid > 0    &&  proc->tofd >= 0   &&
                proc->cmdlen > 0 &&  _Z_FD_ISSET(proc->tofd, wrmask))
              flush_child(proc);

            /* Because this loop might take a while ... */
            queryipccheck();

            syncweb(dirq);

          }
        }

        in_select = 0;
      }
      return 0;
}

/*
 * queryipccheck() may be called as often as one likes; unless MQ2 is
 * active it does real work at most once per value of 'now'.
 */
static time_t lastqueryipccheck;    /* value of 'now' at the previous real check */
static time_t qipcretry;            /* when non-zero: time to re-run queryipcinit() */

/*
 * Poll (zero timeout) the mailq query socket, the notify socket and, in
 * mailq mode 2, the MQ2 descriptors.  Pending notifications are handed
 * to receive_notify(); one pending mailq connection is accepted and is
 * either served by a forked child (mode 1) or registered with the MQ2
 * code (otherwise).
 */
void
queryipccheck()
{
      fd_set      rdmask;
      fd_set      wrmask;
      struct timeval tv;
      Usockaddr   raddr;
      int   raddrlen = sizeof(raddr);
      int   nready, topfd, conn;

      timed_log_reinit(); /* internal: mytime(&now); */

      if (!mq2_active() && (now == lastqueryipccheck))
        return;
      lastqueryipccheck = now;

      if (qipcretry > 0 && now >= qipcretry) {
        qipcretry = 0;
        queryipcinit();
        /*
         * If qipcretry is set here, the value will be ignored, but
         * that's ok since sweepretry is active by now
         */
      }

      /* Nothing to poll when neither socket exists. */
      if (querysocket < 0 && notifysocket < 0)
        return;

      tv.tv_sec  = 0;
      tv.tv_usec = 0;

      _Z_FD_ZERO(rdmask);
      _Z_FD_ZERO(wrmask);

      topfd = querysocket;
      if (querysocket >= 0)
        _Z_FD_SET(querysocket, rdmask);
      if (notifysocket >= 0) {
        _Z_FD_SET(notifysocket, rdmask);
        if (topfd < notifysocket)
          topfd = notifysocket;
      }

      if (mailqmode == 2) {
        topfd  = mq2add_to_mask(&rdmask, &wrmask, topfd);
        nready = select(topfd+1, &rdmask, &wrmask, NULL, &tv);
        if (nready > 0)
          mq2_areinsets(&rdmask, &wrmask);
      } else {
        nready = select(topfd+1, &rdmask, &wrmask, NULL, &tv);
      }

      /* Neither of the remaining steps applies unless something is ready. */
      if (nready <= 0)
        return;

      if (notifysocket >= 0 && _Z_FD_ISSET(notifysocket, rdmask))
        receive_notify(notifysocket);

      if (querysocket < 0 || !_Z_FD_ISSET(querysocket, rdmask))
        return;

      conn = accept(querysocket, (struct sockaddr *)&raddr, &raddrlen);
      if (conn < 0)
        return;

      if (mailqmode != 1) {
        /* mailqmode == 2 */

        MIBMtaEntry->sc.MQ2sockConnects ++;

#if 0  /* NOT IN MAILQ-V2 MODE ! */
#ifdef USE_TCPWRAPPER
#ifdef HAVE_TCPD_H /* TCP-Wrapper code */
        if (wantconn(conn, "mailq") == 0) {
          char *msg = "500 TCP-WRAPPER refusing 'mailq' query from your whereabouts\r\n";
          int   len = strlen(msg);
          write(conn,msg,len);
          MIBMtaEntry->sc.MQ2sockTcpWrapRej ++;
          close(conn);
        }
        else
#endif
#endif
#endif
          mq2_register(conn, &raddr);
        return;
      }

      /* mailqmode == 1: a forked child prints the queue to the client */
      {
        int pid;

        MIBMtaEntry->sc.MQ1sockConnects ++;
        MIBMtaEntry->sc.MQ1sockParallel ++;

        pid = fork();
        if (pid == 0) {
#if defined(F_SETFD)
          fcntl(conn, F_SETFD, 1); /* close-on-exec */
#endif
#ifdef USE_TCPWRAPPER
#ifdef HAVE_TCPD_H /* TCP-Wrapper code */
          if (wantconn(conn, "mailq") == 0) {
            char *msg = "500 TCP-WRAPPER refusing 'mailq' query from your whereabouts\r\n";
            int   len = strlen(msg);
            write(conn,msg,len);
            MIBMtaEntry->sc.MQ1sockParallel --;
            MIBMtaEntry->sc.MQ1sockTcpWrapRej ++;
            _exit(0);
          }
#endif
#endif
          qprint(conn);
          close(conn);
          MIBMtaEntry->sc.MQ1sockParallel --;
          /* Silence memory debuggers about this child's
             activities by doing exec() on the process.. */
          /* execl("/bin/false","false",NULL); */
          _exit(0); /* _exit() should be silent, too.. */
        }
        /* Parent (or failed fork): undo the count if no child exists,
           and drop our copy of the connection either way. */
        if (pid < 0)
          MIBMtaEntry->sc.MQ1sockParallel --;
        close(conn);
      }
}

/*
 * queryipcinit() -- create the scheduler's IPC endpoints if they do not
 * exist yet:
 *   notifysocket  a non-blocking AF_UNIX datagram socket, and
 *   querysocket   a non-blocking listening stream socket for mailq
 *                 queries (AF_UNIX, or TCP over IPv6/IPv4).
 * Each of the two `while' statements below serves as a breakable block:
 * its body ends in an unconditional `break', so it runs at most once.
 * If either socket is still missing afterwards, qipcretry is set to
 * five seconds from now; otherwise it is cleared.
 */
void
queryipcinit()
{
      int modecode = 1; /* Modes: 1=TCP, 2=UNIX, default=TCP */
      char *modedata = NULL;

      while (notifysocket < 0) {

        /* Resolve the notify address once: $SCHEDULERNOTIFY if set,
           otherwise a default path under ${POSTOFFICE}. */
        if (!notifysock) {
          notifysock = (char *)getzenv("SCHEDULERNOTIFY");
          if (notifysock) {
            notifysock = zenvexpand(notifysock);
          } else {
            notifysock = strsave("UNIX:${POSTOFFICE}/.scheduler.notify");
            notifysock = zenvexpand(notifysock);
          }
        }

        if (notifysock) {
          if (cistrncmp(notifysock,"UNIX:",5)==0) {
            modedata = (char *)notifysock+5;
            modecode = 2;
          } else if (*notifysock == '/') {
            /* If it begins with '/', it is AF_UNIX socket */
            modedata = (char *)notifysock;
            modecode = 2;
          } else if (cistrncmp(notifysock,"UDP:",4)==0) {
            modedata = (char *)notifysock+4;
            modecode = 1;
          } else {
            /* The default mode is UDP/IP socket */
            modedata = (char *)notifysock;
            modecode = 1;
          }
        } else
          modecode = 0;

        /* Only modecode 2 (AF_UNIX) creates a notify socket in this
           function; for any other modecode notifysocket stays < 0. */
#ifdef  AF_UNIX
        if (modecode == 2) {
          struct sockaddr_un sad;
          int on = 1, oldumask;

          memset(&sad, 0, sizeof(sad));
          sad.sun_family = AF_UNIX;
          /* Bounded copy; the next line guarantees NUL termination */
          strncpy(sad.sun_path, modedata, sizeof(sad.sun_path));
          sad.sun_path[ sizeof(sad.sun_path)-1 ] = 0;

          if ((notifysocket = socket(PF_UNIX, SOCK_DGRAM, 0)) < 0) {
            perror("notifysocket: socket(PF_UNIX)");
            break;
          }

          setsockopt(notifysocket, SOL_SOCKET, SO_REUSEADDR, (void*)&on, sizeof(on));

          /* In case that one already exists.. */
          unlink(sad.sun_path);

          /* Mask every permission bit except owner-write while bind()
             creates the socket node; restored right after. */
          oldumask = umask(0577);

          if (bind(notifysocket, (struct sockaddr *)&sad, sizeof sad) < 0) {
            perror("bind:UNIX notify socket");
            umask(oldumask);
            close(notifysocket);
            notifysocket = -1;
            break;
          }
          umask(oldumask);

          /* Switch the socket to non-blocking mode */
          fcntl(notifysocket, F_SETFL,
              fcntl(notifysocket, F_GETFL, 0)|O_NONBLOCK);

#if defined(F_SETFD)
          fcntl(notifysocket, F_SETFD, 1); /* close-on-exec */
#endif
        }
#endif /* AF_UNIX */
        break;
      }

      while (querysocket < 0) {

        /* Decode the mailq socket specification: "UNIX:path", "/path",
           "TCP:spec", or a bare spec taken as TCP. */
        modedata = NULL;
        modecode = 0;
        if (mailqsock) {
          if (cistrncmp(mailqsock,"UNIX:",5)==0) {
            modedata = (char *)mailqsock+5;
            modecode = 2;
          } else if (*mailqsock == '/') {
            /* If it begins with '/', it is AF_UNIX socket */
            modedata = (char *)mailqsock;
            modecode = 2;
          } else if (cistrncmp(mailqsock,"TCP:",4)==0) {
            modedata = (char *)mailqsock+4;
            modecode = 1;
          } else {
            /* The default mode is TCP/IP socket */
            modedata = (char *)mailqsock;
            modecode = 1;
          }
        } else {
          /* The default mode is TCP/IP socket */
          /* (mailqsock is NULL here, so modedata stays NULL) */
          modedata = (char *)mailqsock;
          modecode = 1;
        }

#ifdef  AF_UNIX
        if (modecode == 2) {
          struct sockaddr_un sad;
          int on = 1, oldumask;

          memset(&sad, 0, sizeof(sad));
          sad.sun_family = AF_UNIX;
          /* Bounded copy; the next line guarantees NUL termination */
          strncpy(sad.sun_path, modedata, sizeof(sad.sun_path));
          sad.sun_path[ sizeof(sad.sun_path)-1 ] = 0;

          if ((querysocket = socket(PF_UNIX, SOCK_STREAM, 0)) < 0) {
            perror("querysocket: socket(PF_UNIX)");
            break;
          }

          setsockopt(querysocket, SOL_SOCKET, SO_REUSEADDR, (void*)&on, sizeof(on));

          /* In case that one already exists.. */
          unlink(sad.sun_path);

          /* Mask only the execute bits while bind() creates the node */
          oldumask = umask(0111);

          if (bind(querysocket, (struct sockaddr *)&sad, sizeof sad) < 0) {
            perror("bind:UNIX mailq socket");
            umask(oldumask);
            close(querysocket);
            querysocket = -1;
            break;
          }
          umask(oldumask);

          fd_nonblockingmode(querysocket);

#if defined(F_SETFD)
          fcntl(querysocket, F_SETFD, 1); /* close-on-exec */
#endif

          if (listen(querysocket, 5) < 0) {
            perror("listen:UNIX mailq socket");
            close(querysocket);
            querysocket = -1;
            break;
          }
        }
#endif
#ifdef      AF_INET
        if (modecode == 1) {
          struct servent *serv;
          Usockaddr ua;
          int on = 1;
          int port = 174;
          char *modedata2 = NULL;

          /* Temporarily cut the spec at '@' so that only the part in
             front of it is parsed as the port / service name; the '@'
             is put back below. */
          if (modedata) {
            modedata2 = strchr(modedata,'@');
            if (modedata2) *modedata2++ = 0;
          }

          /* A numeric spec is the port; otherwise look the service up
             (by the given name, or "mailq" when no spec was given) and
             fall back to port 174 if it is not defined. */
          if (!modedata || !*modedata || sscanf(modedata,"%d",&port) != 1) {
            serv = getservbyname(modedata ? modedata : "mailq", "tcp");
            if (serv == NULL) {
            sfprintf(sfstderr, "No 'mailq' tcp service defined!\n");
            port = 174; /* magic knowledge */
            } else
            port = ntohs(serv->s_port);
          }
          if (modedata2)  modedata2[-1] = '@';

          /* Prepare an IPv4 bind address; used when the IPv6 socket is
             not compiled in or cannot be created. */
          memset(&ua, 0, sizeof(ua));
          if (zgetbindaddr(NULL, AF_INET, &ua))
            ua.v4.sin_addr.s_addr = htonl(INADDR_ANY);
          ua.v4.sin_port        = htons(port);
          ua.v4.sin_family      = AF_INET;
          querysocket = -1;
#ifdef INET6
          querysocket = socket(PF_INET6, SOCK_STREAM, 0);
#endif
          if (querysocket < 0)
            querysocket = socket(PF_INET, SOCK_STREAM, 0);
#ifdef INET6
          else {
            /* Got an IPv6 socket: redo the bind address as AF_INET6 */
            memset(&ua, 0, sizeof(ua));
            if (zgetbindaddr(NULL, AF_INET6, &ua))
              ua.v4.sin_addr.s_addr = htonl(INADDR_ANY);
            ua.v6.sin6_port   = htons(port);
            ua.v6.sin6_family = AF_INET6;
          }
#endif
          if (querysocket < 0) {
#ifdef INET6
            perror("querysocket: socket(PF_INET6/PF_INET)");
#else
            perror("querysocket: socket(PF_INET)");
#endif
            break;
          }
          setsockopt(querysocket, SOL_SOCKET, SO_REUSEADDR, (void*)&on, sizeof(on));

          /* Bind with the address length matching the chosen family */
#ifdef INET6
          if (ua.v6.sin6_family == AF_INET6) {
            if (bind(querysocket, (struct sockaddr *)&ua, sizeof ua.v6) < 0) {
            perror("bind:TCP6 mailq socket");
            close(querysocket);
            querysocket = -1;
            break;
            }
          } else
#endif
            if (bind(querysocket, (struct sockaddr *)&ua, sizeof ua.v4) < 0) {
            perror("bind:TCP4 mailq socket");
            close(querysocket);
            querysocket = -1;
            break;
            }

          /* Switch the socket to non-blocking mode */
          fcntl(querysocket, F_SETFL,
              fcntl(querysocket, F_GETFL, 0)|O_NONBLOCK);


#if defined(F_SETFD)
          fcntl(querysocket, F_SETFD, 1); /* close-on-exec */
#endif

          if (listen(querysocket, 5) < 0) {
            perror("listen:TCP mailq socket");
            close(querysocket);
            querysocket = -1;
            break;
          }
        }
#endif      /* AF_INET */
        break;
      }

      if (querysocket >= 0 && notifysocket >= 0)
        qipcretry = 0; /* Successful init done. */
      else {
        mytime(&now); 
        qipcretry = now + 5; /* Will do retry soon.. */

      }
}

#else /* !HAVE_SELECT */

/*
 * mux() for systems without select(): visit every live process slot
 * once, reading whatever its descriptor has to offer, then pause for a
 * second if the deadline has not passed yet.  Always returns 1.
 */
int
mux(timeout)
time_t timeout;
{
      int   slot;

      /*
       * Nice'n easy and simpleminded: grab a random file descriptor,
       * and sit and read off it until something happens.
       * Some very complicated mux'ing schemes (with shared pipes'n stuff)
       * are possible in the absence of async i/o like select() or the
       * simulation USG supplies, but it ain't worth the hassle.
       */
      readsockcnt = 0;
      if (cpids != NULL) {
        slot = 0;
        while (slot < scheduler_nofiles) {
          if (cpids[slot].pid != 0) {
            readfrom(slot);
            ++readsockcnt;
          }
          ++slot;
        }
      }

      mytime(&now);
      if (now < timeout)
        sleep(1);

      return 1;
}

/* Stub for builds without select(): there is nothing to poll. */
void queryipccheck()
{
      /* NOTHING AT ALL -- No select(), no querysocket.. */
}

/* Stub for builds without select(): no IPC sockets are created. */
void
queryipcinit()
{
}
#endif      /* HAVE_SELECT */

/*
 * readfrom() -- drain the report channel 'fd' of the transporter whose
 * slot is cpids[fd].
 *
 * Data left over from the previous call (proc->carryover) is put in
 * front of what is read now.  Each complete, newline-terminated line is
 * passed to update(); an unterminated tail is stored back into
 * proc->carryover for the next call.  On EOF, or on a read error other
 * than EWOULDBLOCK / EAGAIN / EINTR, the command pipe is shut down and
 * the slot is handed to waitandclose().
 */
static void readfrom(fd)
      int fd;
{
      int   n, e, bufsize = 2048;
      char  *cp, *eobuf, *buf;
      struct procinfo *proc = &cpids[fd];

      cp = buf = (char *)emalloc(bufsize);

      if (proc->carryover != NULL) {
        int carrylen = strlen(proc->carryover);
        /*
         * The buffer must be strictly larger than the carry-over:
         * one byte is needed for its NUL, and at least one free byte
         * must remain, because a read() of zero bytes returns 0 and
         * would be mistaken for end-of-file below.
         */
        if (carrylen >= bufsize) {
          while (carrylen >= bufsize)
            bufsize += 1024;
          buf = erealloc(buf,bufsize);
        }
        memcpy(buf, proc->carryover, carrylen + 1);
        cp = buf + carrylen;
        free(proc->carryover);
        proc->carryover = NULL;
      }

      /* Note that if we get an alarm() call, the read will return -1, TG */
      errno = 0;
      while ((n = read(fd, cp, bufsize - (cp - buf))) > 0) {

        if (verbose)
          sfprintf(sfstderr, "read from %d returns %d\n", fd, n);

        eobuf = cp + n;

        /* Hand over every complete line, keeping the buffer compacted
           so that the unprocessed data always starts at buf. */
        for (cp = buf; cp < eobuf;) {
          if (*cp == '\n') {
            int rlen = eobuf - (cp+1);
            *cp = '\0';
            if (verbose)
              sfprintf(sfstderr, "%p %d fd=%d processed: %s\n",
                       proc, (int)proc->pid, fd, buf);
            update(fd,buf);
            ++cp;
            /* Source and destination may overlap: memmove, not memcpy */
            if (rlen > 0)
              memmove(buf, cp, rlen);
            else
              rlen = 0;
            cp = buf;
            eobuf = buf + rlen;

          } else

            ++cp;
        }

        if (cp == (buf + bufsize)) {
          /* 
           * can't happen, this would mean a status report line 
           * that is rather long...
           * (oh no! it did happen, it did, it did!...)
           */
          int oldsize = bufsize;
          bufsize <<= 1;
          buf = erealloc(buf,bufsize);
          cp = buf + oldsize;
          *cp = '\0';
        }

      }
      e = errno;

      if (verbose) {
        if (!(e == EAGAIN || e == EWOULDBLOCK))
          sfprintf(sfstderr,
                   "read from %d returns %d, errno=%d\n", fd, n, e);
      }
      if (n == 0 || (n < 0 && !(e == EWOULDBLOCK ||
                                e == EAGAIN || e == EINTR))) {
        /* EOF or a hard error: this transporter is finished */
        if (proc->tofd >= 0)
          pipes_shutdown_child(proc->tofd);
        proc->tofd = -1;
        waitandclose(fd);
      }

      /*
       * if n < 0, then either we got an interrupt or the read would
       * block (EINTR or EWOULDBLOCK). In both cases we basically just
       * want to get back to whatever we were doing. We just need to
       * make darned sure that a newline was the last character we saw,
       * or else some report may get lost somewhere.
       */
      if (proc->pid != 0) {
        /* Slot still in use: keep the unterminated tail for next time */
        proc->carryover = emalloc(cp - buf + 1);
        memcpy(proc->carryover, buf, cp - buf);
        proc->carryover[cp - buf] = '\0';
      } else if (cp > buf) {
        /* cp always lies inside the buffer here, so the tail can be
           terminated before it is printed as a string.  Report the
           errno saved right after the read loop. */
        *cp = '\0';
        sfprintf(sfstderr,
                 "HELP! Lost %ld bytes (n=%d/%d): '%s'\n",
                 (long)(cp - buf), n, e, buf);
      }
      free(buf);
}

#if defined(USE_BINMKDIR) || defined(USE_BINRMDIR)

/*
 * From Ross Ridge's Xenix port:
 * - A nasty problem occurs with scheduler if rmdir (and mkdir also I think),
 *   is implented as system("/bin/rmdir ...").  When system() calls wait()
 *   it can reap the scheduler's children without it knowing.  I fixed this
 *   problem by writing a replacement system() function for scheduler.
 *
 */

/*
 * Replacement system(): run 'name' through $SHELL (default /bin/sh)
 * without losing track of the scheduler's own children.
 *
 * Without USE_SIGREAPER the function waits until the shell it started
 * has exited.  Any other child reaped by wait() in the meantime is
 * marked in cpids[] by negating its pid, the same marking the SIGCHLD
 * reaper in this file uses.  Returns the shell's exit code when the low
 * byte of its wait status is zero, 1 otherwise, and -1 if fork() or
 * wait() failed.
 *
 * With USE_SIGREAPER nothing is waited for here; 0 is returned once the
 * child has been started, as its exit status is not available to this
 * function.
 */
int
system(name)
      char *name;
{
      char *sh;
      int pid;
#ifndef USE_SIGREAPER
      int st = 0, r;
      int i;
#endif

      pid = fork();
      switch(pid) {
      case -1:
            return -1;
      case 0:
            sh = getenv("SHELL");
            if (sh == NULL) {
              sh = "/bin/sh";
            }
            /* The variadic terminator must be a real null pointer */
            execl(sh, sh, "-c", name, (char *)NULL);
            _exit(1);
      default:
            break;
      }

#ifndef USE_SIGREAPER
      for (;;) {
            r = wait(&st);
            if (r == pid)
              break;            /* our shell is done */
            if (r == -1) {
              if (errno != EINTR)
                return -1;
              continue;         /* interrupted: wait again */
            }
            /* Some other child died: note it, then keep waiting */
            if (cpids != NULL) {
              for (i = 0; i < scheduler_nofiles; i++) {
                if (cpids[i].pid == r) {
                  cpids[i].pid = -r;
                  break;
                }
              }
            }
      }

      if ((st & 0x00ff) == 0) {
            return st >> 8;
      }
      return 1;
#else
      return 0;
#endif
}

#endif


#ifdef USE_SIGREAPER
/*
 *    Catch each child-process death, and reap them..
 *
 *    Every child collected here that exited or was killed by a signal
 *    is looked up in cpids[]; a matching slot gets its pid negated, its
 *    `reaped' flag set and the wait status stored in `waitstat'.
 *    errno is saved on entry and restored on exit so that the code
 *    interrupted by the signal does not see a value set in here.
 */
RETSIGTYPE sig_chld(signum)
int signum;
{
      int saved_errno = errno;
      int pid;
      int ok;
      int i;
      int statloc;

      for (;;) {

#ifdef      HAVE_WAITPID
        pid = waitpid(-1, &statloc, WNOHANG);
#else
#ifdef  HAVE_WAIT4
        pid = wait4(-1, &statloc, WNOHANG, NULL);
#else
#ifdef  HAVE_WAIT3
        pid = wait3(&statloc, WNOHANG, NULL);
#else
        pid = wait(&statloc);
#endif
#endif
#endif
        if (pid <= 0) break;

        /* Judge each child on its own status, not on a flag left
           over from a previously reaped one. */
        ok = 0;
        if (WIFEXITED  (statloc)) ok = 1;
        if (WIFSIGNALED(statloc)) ok = 1;

        if (verbose) {
          sfprintf(sfstderr,"sig_chld() pid=%d, ok=%d, stat=0x%x ",
                   pid,ok,statloc);
          if (WIFEXITED  (statloc))
            sfprintf(sfstderr,"EXIT=%d\n", WEXITSTATUS(statloc));
          if (WIFSIGNALED(statloc))
            sfprintf(sfstderr,"SIGNAL=%d\n", WTERMSIG(statloc));
        }

        if (ok && cpids != NULL) {
          /* Only EXIT and SIGxx DEATHS accepted */

          for (i = scheduler_nofiles-1; i >= 0; --i) {
            if (cpids[i].pid == pid) {
              cpids[i].pid = -pid; /* Mark it as reaped.. */
              cpids[i].reaped = 1;
              cpids[i].waitstat = statloc;
              ok = 0;
              if (WSIGNALSTATUS(statloc) == 0 &&
                  WEXITSTATUS(statloc)   == EX_SOFTWARE) {
                zsyslog((LOG_EMERG, "Transporter process %d exited with EX_SOFTWARE!", pid));
                sfprintf(sfstderr, "Transporter process %d exited with EX_SOFTWARE; cmdline='%s'\n", pid, cpids[i].cmdline);
              }
              break;
            }
            if (cpids[i].pid == -pid) {
              sfprintf(sfstdout," .. already reaped ??\n");
              cpids[i].pid = -pid; /* Mark it as reaped.. */
              cpids[i].reaped = 1;
              cpids[i].waitstat = statloc;
              ok = 0;
              break;
            }
          }
        }
      }

      /* re-instantiate the signal handler.. */
#ifdef SIGCLD
      SIGNAL_HANDLE(SIGCLD,  sig_chld);
#else
      SIGNAL_HANDLE(SIGCHLD, sig_chld);
#endif

      errno = saved_errno;
}
#endif /* USE_SIGREAPER */

/* Generated by Doxygen 1.6.0 -- Back to index */