\setstretch{1.0}
\section*{Handling feeder and haven sockets:  comm.c}
\begin{verbatim}
#include "../shared/common.h"
#include "../shared/protos.h"
#include "comm.h"
#include "protos.h"

/* Buffer handed to resolve_name() in handle_ports() together with
 * MAX_HOSTLEN; presumably receives the canonical host name of the haven
 * module -- confirm against resolve_name(). */
static char haven_can_host_name[MAX_HOSTLEN+1]; /* canonical form */

/* Socket connected to the haven module; assigned in handle_ports(). */
int haven_socket;
/* Head of the FIFO of files waiting to be sent to the haven module
 * (appended to by enqueue_haven_message(), drained by handle_ports()). */
struct message_queue_t *haven_queue;
/* Head of the singly linked list of connected feeder sockets
 * (pushed by create_feeder_entry(), unlinked by process_feeder_entry()). */
struct feeder_t *feeder;

/*
 * Entry point of the comm module.
 *
 * Usage: <program> incoming-port haven-port
 *
 * Both arguments must be decimal port numbers in the range 1..65535.
 * Loads the configuration, opens the node database and then hands
 * control to handle_ports(), whose main loop never ends.
 */
int main(int argc, char **argv) {
  /* handle_ports() does not use this argument (it creates its own
   * listening socket); give it a defined value so that no
   * indeterminate value is ever read. */
  int listen_socket = -1;
  long incoming_port = 0, haven_port = 0;
  int args_ok;

  args_ok = (argc >= 3 && str_is_number(argv[1]) && str_is_number(argv[2]));
  if (args_ok) {
    /* strtol instead of atoi: out-of-range input is well defined */
    incoming_port = strtol(argv[1], NULL, 10);
    haven_port = strtol(argv[2], NULL, 10);
    args_ok = (incoming_port > 0 && incoming_port <= 65535 &&
               haven_port > 0 && haven_port <= 65535);
  }

  if (!args_ok) {
    printf("Usage: %s incoming-port haven-port\n",
           (argc > 0 && argv[0]) ? argv[0] : "comm");
    exit_comm();
  }

  umask(022);

  INFO("Loading configuration options...\n");
  load_conf("../config/comm.conf");

  INFO("Initializing node database...\n");
  initialize_nodedb();

  INFO("Starting comm probe: incoming port %d, haven module on port %d\n",
       (int) incoming_port, (int) haven_port);

  handle_ports((int) incoming_port, listen_socket, (int) haven_port);

  /* only reached if handle_ports() unexpectedly returns */
  return (1);
}


/*
 * Main event loop of the comm module; never returns.
 *
 * incoming_port  TCP port on which feeder connections are accepted.
 * listen_socket  unused (the listening socket is created here).
 * haven_port     TCP port of the haven module on localhost.
 *
 * Each cycle:
 *   1. (re)connects to the haven module if there is no connection,
 *   2. select()s on the haven socket, the listening socket and every
 *      feeder socket,
 *   3. hands haven data to process_from_socket(), accepts new feeders,
 *      and copies feeder data into the feeder's temporary file,
 *   4. sends at most one queued file to the haven module,
 *   5. asks the node database for the busiest node and transmits to it.
 *
 * Fatal errors terminate the process through exit_comm().
 */
void handle_ports(int incoming_port, int listen_socket, int haven_port) {
  fd_set rfds;
  int highest_fd; /* largest descriptor placed in rfds */
  int select_result;
  struct timeval tv; /* for sleeping in select */

  int tmp_haven_socket;
  int haven_connected = 0; /* haven initially not connected */
  int haven_waiting = 1;   /* no usable haven socket yet */
  static struct sockaddr_in haven_in;

  int incoming_socket;
  struct sockaddr_in incoming;
  socklen_t incominglen;
  char *sec_timeout, *usec_timeout;

  int new_feeder_socket;
  char *feeder_buf;
  struct feeder_t *prev_feeder, *curr_feeder, *next_feeder;
  ssize_t feeder_read_result, written_buf, total_written;
  int feeder_removed;

  struct message_queue_t *haven_queue_entry;
  char *busiest_node_PK;

  (void) listen_socket; /* kept for interface compatibility only */

  /* Resolve name of controller name into a socket address. */
  haven_in.sin_family = AF_INET;
  if ((haven_in.sin_addr.s_addr = resolve_name("localhost",
       haven_can_host_name, MAX_HOSTLEN)) == 0)
  {
      printf("Comm: unknown host %s\n", "localhost");
      exit_comm();
  }

  /* Fill in the controller port. */
  haven_in.sin_port = htons(haven_port);

  /* The haven socket is created inside the loop, once per connection
   * attempt; creating one here as well would only leak a descriptor. */

  /* initially no feeder sockets connected */
  feeder = NULL;

  /* One read buffer for the whole lifetime of the loop. */
  feeder_buf = malloc(FEEDER_BUFSIZE);
  if (!feeder_buf) {
    perror("Comm: feeder buffer malloc");
    exit_comm();
  }

  /* Create the listening socket for feeder connections */
  incoming_socket = create_listening_tcpsocket(incoming_port);
  if (incoming_socket < 0) {
    /* a negative descriptor can never be used with FD_SET/select */
    perror("Comm: creating listening socket");
    exit_comm();
  }
  /* Set incoming socket to non-blocking */
  if (fcntl(incoming_socket, F_SETFL, O_NONBLOCK) < 0) {
    perror("Comm: setting listening socket non-blocking");
    exit_comm();
  }


  /* main loop for comm, which never ends. */
  for(;;) {

    /* Haven module has possibly gone down */
    /* Open connection with haven module   */
    if (!haven_connected) {

      INFO("Comm: attempting haven connection...\n");

      /* Create a socket of type stream (instead of datagram). */
      if ((tmp_haven_socket = socket(AF_INET, SOCK_STREAM, 0)) < 0) {
          perror("Comm: haven socket failed");
          exit_comm();
      }
      haven_waiting = 0;
      /* Set up a connection with the controller. */
      if (connect(tmp_haven_socket, (struct sockaddr *) &haven_in, sizeof(haven_in)) < 0) {
        if (errno == ECONNREFUSED) {
          INFO("Comm: haven connection failed, sleeping %i sec...\n", DEFAULT_TIME_SLEEP_FOR_HAVEN);
          /* this attempt is dead: release its descriptor before retrying */
          close(tmp_haven_socket);
          sleep(DEFAULT_TIME_SLEEP_FOR_HAVEN);
          haven_waiting = 1;
        } else {
          perror("Comm: connect to haven failed");
          exit_comm();
        }
      }

      if (!haven_waiting) {
        /* now haven_socket is what we're going to use. */
        /* NOTE(review): the previous haven_socket is not closed here
         * because it is not visible whether process_from_socket()
         * already closes it on disconnect -- confirm. */
        haven_socket = tmp_haven_socket;
        haven_connected = 1;

        /* sin_port is in network byte order: convert before printing */
        INFO("Comm: connect to haven succeeded to %s on port %d\n",
             (char *)inet_ntoa(haven_in.sin_addr), ntohs(haven_in.sin_port));
      }
    } /* end !haven_connected */


    /* set select timeout */
    if ((sec_timeout = get_conf("comm_socket_sec_timeout"))) {
      tv.tv_sec = atoi(sec_timeout);
    } else {
      tv.tv_sec = 0;
    }
    if ((usec_timeout = get_conf("comm_socket_usec_timeout"))) {
      tv.tv_usec = atoi(usec_timeout);
    } else {
      tv.tv_usec = 0;
    }

    /* prepare to select among possible socket connections */
    FD_ZERO(&rfds);
    FD_SET(incoming_socket, &rfds);
    highest_fd = incoming_socket;

    if (!haven_waiting) {
      FD_SET(haven_socket, &rfds);
      if (haven_socket > highest_fd) {
        highest_fd = haven_socket;
      }
    }

    /* add each feeder in linked list to select rfds */
    for (curr_feeder = feeder; curr_feeder; curr_feeder = curr_feeder->next) {
      FD_SET(curr_feeder->socket, &rfds);
      if (curr_feeder->socket > highest_fd) {
        highest_fd = curr_feeder->socket;
      }
    }


    /********************************************/
    /* select and wait for socket transmissions */
    select_result = select(highest_fd + 1, &rfds, NULL, NULL, &tv);

    if (select_result < 0) {
      if (errno == EINTR) {
        /* interrupted by a signal: treat as "nothing ready" */
        select_result = 0;
      } else {
        perror("Comm: select failed");
        exit_comm();
      }
    }

    /* one of more socket fd are selected.  Check rfds */
    if (select_result) {

      /* data on haven socket?  Only look at it when it was actually
       * placed in the set; otherwise haven_socket may be a stale number. */
      if (!haven_waiting && FD_ISSET(haven_socket, &rfds)) {
        haven_connected = process_from_socket(haven_socket, get_conf("comm_tempfile_outgoing"));
        /* make sure a descriptor number reused below is not mistaken
         * for a ready feeder socket */
        FD_CLR(haven_socket, &rfds);
      }


      /* data on incoming socket? */
      if (FD_ISSET(incoming_socket, &rfds)) {

        /* accept() reads this as the size of the address buffer */
        incominglen = sizeof(incoming);
        new_feeder_socket = accept(incoming_socket,
                                   (struct sockaddr *) &incoming, &incominglen);

        if (new_feeder_socket < 0) {
          /* non-blocking socket: "no connection waiting" and aborted
           * or interrupted accepts are not fatal */
          if (errno != EAGAIN && errno != EWOULDBLOCK &&
              errno != ECONNABORTED && errno != EINTR) {
            perror("Comm: accepting incoming connection");
            exit_comm();
          }
        } else {

          create_feeder_entry(new_feeder_socket);

          /* create_feeder_entry() pushed the new entry on the list head */
          INFO("Comm: incoming socket from %s on port %d, using feeder file %s\n",
               (char *)inet_ntoa(incoming.sin_addr), ntohs(incoming.sin_port), feeder->filename);
        }
      }


      /* data on feeder sockets? */
      prev_feeder = NULL;
      curr_feeder = feeder;
      while (curr_feeder) {
        /* remember the successor now: curr_feeder may be freed below */
        next_feeder = curr_feeder->next;
        feeder_removed = 0;

        if (FD_ISSET(curr_feeder->socket, &rfds)) {
          /* log first so that errno from read() survives until perror() */
          INFO("Comm: reading feeder socket %i\n", curr_feeder->socket);

          feeder_read_result = read(curr_feeder->socket, feeder_buf, FEEDER_BUFSIZE);

          if (feeder_read_result < 0) {
            /* Error encountered */
            perror("Comm: reading from feeder socket");
            exit_comm(); /* this is a bad idea, should fix later */
          } else if (feeder_read_result == 0) {
            /* EOF has been reached */
            INFO("Comm: finished reading on feeder socket, written to feeder file %s\n",
                 curr_feeder->filename);
            process_feeder_entry(curr_feeder, prev_feeder);
            feeder_removed = 1;
          } else {
            /* write() may be partial: loop until everything is stored */
            total_written = 0;
            while (total_written < feeder_read_result) {
              written_buf = write(curr_feeder->fd, feeder_buf + total_written,
                                  (size_t) (feeder_read_result - total_written));
              if (written_buf < 0) {
                if (errno == EINTR) {
                  continue;
                }
                perror("Comm: writing to feeder file");
                exit_comm();
              }
              total_written += written_buf;
            }
          }
        } /* end if FD_ISSET */

        /* a removed entry must not become the predecessor */
        if (!feeder_removed) {
          prev_feeder = curr_feeder;
        }
        curr_feeder = next_feeder;
      } /* end while (feeder) */

    } /* end if (select_result) */


    /* Dequeue and send file to haven */
    if (haven_queue && haven_connected) {
      /* Possibly handle multiple transfers during each cycle:
       *     send the entire queue  i.e, while (haven_queue) {...}
       */
      haven_queue_entry = haven_queue->next;
      send_file_to_haven(haven_queue->filename);
      free(haven_queue);
      haven_queue = haven_queue_entry;
    }


    /* check to process haven request, sending thing into mix */
    if ((busiest_node_PK = node_get_busiest_PK())) {
      do_transmit(busiest_node_PK);
      /* node_get_busiest_PK() returns a strdup()ed key */
      free(busiest_node_PK);
    }

  } /* end main comm for(;;) loop */

}


/*
 * Register a newly accepted feeder connection.
 *
 * requires:  new_feeder_socket is a valid connected socket
 * effects:   allocates a feeder entry, creates a unique temporary file
 *            below the configured "comm_incoming_directory_root" to
 *            receive the feeder's data, and pushes the entry on the
 *            head of the global feeder list.
 *
 * Any failure (missing configuration, out of memory, path too long,
 * mkstemp error) terminates the process through exit_comm().
 */
void create_feeder_entry(int new_feeder_socket) {
  struct feeder_t *feeder_entry;
  char *incoming_root;
  int path_len;

  incoming_root = get_conf("comm_incoming_directory_root");
  if (!incoming_root) {
    /* passing NULL to a %s conversion is undefined behaviour */
    WARN("Comm: comm_incoming_directory_root is not configured.\n");
    exit_comm();
  }

  feeder_entry = malloc(sizeof *feeder_entry);
  if (!feeder_entry) {
    perror("Comm: feeder malloc");
    exit_comm();
  }
  feeder_entry->socket = new_feeder_socket;

  /* Bounded formatting.  NOTE(review): assumes feeder_t.filename is a
   * char array (it is written to directly after malloc) -- confirm. */
  path_len = snprintf(feeder_entry->filename, sizeof feeder_entry->filename,
                      "%s/tmp-incoming-XXXXXX", incoming_root);
  if (path_len < 0 || (size_t) path_len >= sizeof feeder_entry->filename) {
    WARN("Comm: feeder file path under %s is too long.\n", incoming_root);
    exit_comm();
  }

  /* mkstemp() replaces the XXXXXX suffix and opens the file read/write */
  feeder_entry->fd = mkstemp(feeder_entry->filename);

  if (feeder_entry->fd < 0) { /* error */
    perror("Comm: feeder mkstemp");
    exit_comm();
  }

  /* connect to head of feeder list */
  /* this is a FILO queue...is this okay?  Should we waste time and make FIFO? */
  feeder_entry->next = feeder;
  feeder = feeder_entry;

}

/*
 * Finish a feeder whose socket reached EOF.
 *
 * curr_feeder  entry to retire; freed by this call.
 * prev_feeder  entry preceding curr_feeder in the feeder list, or NULL
 *              when curr_feeder is the list head.
 *
 * Closes the feeder's file and socket, queues the file for the haven
 * module and unlinks the entry from the global feeder list.
 */
void process_feeder_entry(struct feeder_t *curr_feeder, struct feeder_t *prev_feeder) {

  if (!curr_feeder) {
    return;
  }

  close(curr_feeder->fd);
  enqueue_haven_message(curr_feeder->filename);
  close(curr_feeder->socket);

  if (prev_feeder) {
    /* remove struct from feeder queue */
    prev_feeder->next = curr_feeder->next;
  } else {
    /* top of feeder list: the successor becomes the new head.
     * (Setting the head to NULL here would drop every other feeder.) */
    feeder = curr_feeder->next;
  }
  free(curr_feeder);
}


/* Inter-freehaven communications:  comm -> mixnet -> comm
 *
 * A message file received from another node is simply appended to the
 * queue of files waiting to be delivered to the haven module.
 */
void process_freehaven_message(char *filename)
{
  enqueue_haven_message(filename);
}
  

/*
 * Append a file name to the tail of the global haven_queue (FIFO).
 *
 * filename  path of the file to deliver; copied into the new entry.
 *           A NULL filename is ignored with a warning.
 *
 * Out of memory terminates the process through exit_comm().
 */
void enqueue_haven_message(char *filename) {
  struct message_queue_t *queue = haven_queue, *queue_entry;

  if (!filename) {
    WARN("Comm: enqueue_haven_message called without a filename.\n");
    return;
  }

  queue_entry = malloc(sizeof *queue_entry);
  if (!queue_entry) {
    perror("Comm: haven queue malloc");
    exit_comm();
  }
  /* NOTE(review): unbounded copy; the capacity of
   * message_queue_t.filename is not visible here -- confirm it can
   * hold every path produced by mkstemp(). */
  strcpy(queue_entry->filename, filename);
  /* the new entry is the tail: without this the list walk below (and
   * the dequeue in handle_ports) would follow an indeterminate pointer */
  queue_entry->next = NULL;

  /* Push freehaven message into end of FIFO incoming_queue */
  if (queue) {
    while (queue->next) {
      queue = queue->next;
    }
    queue->next = queue_entry;
  } else {
    /* queue is empty */
    haven_queue = queue_entry;
  }
}

/* Intra-freehaven communications:  haven -> comm
 *
 * Parses the tagged message stored in `filename` and dispatches on its
 * "transaction" tag: "broadcast" goes to broadcast(), "transmit" goes
 * to transmit().  A message without a transaction tag is dropped.  The
 * tag list is always released with TAGLIST_DELETE_SHAREFILES_TOO.
 */
void process_internal_message(char *filename) {
  struct tag_t *tags;
  char *kind;

  /* un-base-64(filename);          */
  /* decrypt message with pk_comm   */

  tags = build_tag_list_from_file(filename);
  kind = get_tag(tags, "transaction");

  /* no transaction type: nothing to dispatch, fall through to cleanup */
  if (kind != NULL) {
    if (strcmp(kind, "broadcast") == 0) {
      broadcast(tags);
    }

    if (strcmp(kind, "transmit") == 0) {
      transmit(tags);
    }
  }

  free_tag_list(tags, TAGLIST_DELETE_SHAREFILES_TOO);
}


/* already in proper tagged format */
/*
 * Send one file to the haven module over haven_socket.
 *
 * The bytes pushed are, in order: INTERNAL_OPEN_MSG followed by a
 * newline, "<internal-message>\n", the contents of `filename`, and
 * "</internal-message>\n".
 *
 * Out of memory terminates the process through exit_comm().
 */
void send_file_to_haven(char *filename) {
  size_t open_len;
  char *open_line;

  /* open filename, shove through haven_socket */
  /* We might want to sign it, encrypt it, base64 it, etc. */

  /* Need to place <internal-message> wrappers around file */
  /* This will be handled in push_file_through_socket */
  INFO("Comm: sending file %s to haven.\n", filename);

  /* Build "INTERNAL_OPEN_MSG\n" in private storage.  strcat() onto
   * INTERNAL_OPEN_MSG itself would write into a constant (or grow a
   * shared buffer by one newline on every call). */
  open_len = strlen(INTERNAL_OPEN_MSG);
  open_line = malloc(open_len + 2);
  if (!open_line) {
    perror("Comm: send_file_to_haven malloc");
    exit_comm();
  }
  memcpy(open_line, INTERNAL_OPEN_MSG, open_len);
  open_line[open_len] = '\n';
  open_line[open_len + 1] = '\0';
  push_string_through_socket(haven_socket, open_line);
  free(open_line);

  /* Need to kill tmp-file... */
  push_string_through_socket(haven_socket, "<internal-message>\n");
  push_file_through_socket(haven_socket, filename);
  push_string_through_socket(haven_socket, "</internal-message>\n");
}
  

/*
 * Terminate the comm process with exit status 1.
 *
 * Intended as the single place where state could be saved before
 * shutdown; nothing is saved yet.
 */
void exit_comm()
{
  /* save state of incoming / outgoing buffers */
  exit(1);
}

\end{verbatim}

\newpage
\section*{Broadcast messages from haven:  broadcast.c}

\begin{verbatim}

#include "../shared/common.h"
#include "comm.h"
#include "protos.h"

extern GDBM_FILE nodedb;

/*
 * Queue a sharefile for transmission to every node in the node database.
 *
 * tag_list  tags of the haven request; must contain "sharefile".
 *
 * For each node a fresh tag list is built and passed to transmit():
 *   transaction = "transmit"
 *   sharefile   = the requested sharefile
 *   hPKdest     = the node's database key
 *   PK, mixnet, address = the values stored for that node
 *
 * returns: 0 if the request has no "sharefile" tag, 1 otherwise.
 */
int broadcast(struct tag_t *tag_list) {

  char *sharefile;
  datum key, value;
  datum *keys = NULL, *grown;
  size_t key_count = 0, key_cap = 0, i;
  struct nodedb_entry_t node;
  struct tag_t *node_tag_list;

  /* do initial test to halt send execution if sharefile invalid */
  if (!(sharefile = get_tag(tag_list, "sharefile"))) {
    return(0);
  }

  INFO("Comm: broadcast message called...\n");

  /* Snapshot all keys first: transmit() stores into the database, and
   * modifying a GDBM file during a firstkey/nextkey traversal can make
   * the traversal skip or repeat entries. */
  key = gdbm_firstkey(nodedb);
  while (key.dptr) {
    if (key_count == key_cap) {
      key_cap = key_cap ? key_cap * 2 : 16;
      grown = realloc(keys, key_cap * sizeof *keys);
      if (!grown) {
        WARN("Comm: broadcast out of memory while listing nodes.\n");
        free(key.dptr);
        break; /* still serve the nodes collected so far */
      }
      keys = grown;
    }
    keys[key_count++] = key;
    key = gdbm_nextkey(nodedb, key);
  }

  for (i = 0; i < key_count; i++) {

    value = gdbm_fetch(nodedb, keys[i]);
    if (value.dptr) {
      /* explicitly true - can't check for .dsize */
      node = reconstruct_nodedb_entry(value.dptr);

      if (node.PK && node.mixnet && node.address) {
        /* start every node with an empty list.
         * NOTE(review): assumes add_tag(NULL, ...) creates a new
         * list -- confirm against add_tag(). */
        node_tag_list = NULL;

        /* transmit and broadcast appear same to receiver */
        node_tag_list = add_tag(node_tag_list, strdup("transaction"), strdup("transmit"));
        node_tag_list = add_tag(node_tag_list, strdup("sharefile"), strdup(sharefile));
        /* transmit() refuses tag lists without hPKdest; the database
         * key is the value it uses for node lookups */
        node_tag_list = add_tag(node_tag_list, strdup("hPKdest"), strdup(keys[i].dptr));
        node_tag_list = add_tag(node_tag_list, strdup("PK"), strdup(node.PK));
        node_tag_list = add_tag(node_tag_list, strdup("mixnet"), strdup(node.mixnet));
        node_tag_list = add_tag(node_tag_list, strdup("address"), strdup(node.address));

        transmit(node_tag_list);

        free_tag_list(node_tag_list, TAGLIST_LEAVE_SHAREFILES);
      }
      free_node(node);
      free(value.dptr);
    } /* if false, keep iterating, do not return */

    /* key buffers returned by gdbm are malloc()ed and ours to free */
    free(keys[i].dptr);
  }
  free(keys);

  return (1);

}
  

\end{verbatim}

\newpage
\section*{Transmit messages to specific nodes:  transmit.c}

\begin{verbatim}

#include "../shared/common.h"
#include "comm.h"
#include "protos.h"

extern GDBM_FILE nodedb;

/*
 * Queue one message for a specific node.
 *
 * tag_list  tags describing the message.  "sharefile" and "hPKdest"
 *           are required; "PK", "address" and "mixnet" are looked up
 *           in the node database (keyed by hPKdest) and added to the
 *           list when absent.
 *
 * The tag list is written to a unique temporary file below the
 * configured "comm_outgoing_directory_root", and that file name is
 * added to the node's waiting-message queue.
 *
 * returns: 1 when the message was queued, 0 otherwise.
 *          A mkstemp failure terminates the process through exit_comm().
 */
int transmit(struct tag_t *tag_list) {
  static const char suffix[] = "/tmp-outgoing-XXXXXX";

  char *sharefile, *hPKdest, *PK, *address, *mixnet;
  char *outgoing_root;
  /* strings obtained from the node database are owned by this function */
  char *owned_PK = NULL, *owned_address = NULL, *owned_mixnet = NULL;
  char *filename = NULL;
  int fd, waiting;
  int result = 0;

  if(!(sharefile = get_tag(tag_list, "sharefile"))) {
    WARN("Transmit:  sharefile not present in tag_list.\n");
    return (0);
  }
  if(!(hPKdest = get_tag(tag_list, "hPKdest"))) {
    WARN("Transmit:  hPKdest not present in tag_list.\n");
    return (0);
  }
  if(!(PK = get_tag(tag_list, "PK"))) {
    PK = owned_PK = node_get_PK(hPKdest);
    if (!PK) {
      WARN("Transmit: node %s missing PK entry.\n", hPKdest);
      goto done;
    }
    tag_list = add_tag(tag_list, strdup("PK"), strdup(PK));
  }
  if(!(address = get_tag(tag_list, "address"))) {
    address = owned_address = node_get_address(hPKdest);
    if (!address) {
      WARN("Transmit: node %s missing address entry.\n", hPKdest);
      goto done;
    }
    tag_list = add_tag(tag_list, strdup("address"), strdup(address));
  }
  if(!(mixnet = get_tag(tag_list, "mixnet"))) {
    mixnet = owned_mixnet = node_get_mixnet(hPKdest);
    if (!mixnet) {
      WARN("Transmit: node %s missing mixnet entry.\n", hPKdest);
      goto done;
    }
    tag_list = add_tag(tag_list, strdup("mixnet"), strdup(mixnet));
  }

  outgoing_root = get_conf("comm_outgoing_directory_root");
  if (!outgoing_root) {
    WARN("Transmit: comm_outgoing_directory_root is not configured.\n");
    goto done;
  }

  /* mkstemp() needs writable storage for the template;
   * sizeof suffix already counts the terminating NUL */
  filename = malloc(strlen(outgoing_root) + sizeof suffix);
  if (!filename) {
    WARN("Transmit: out of memory building outgoing file name.\n");
    goto done;
  }
  sprintf(filename, "%s%s", outgoing_root, suffix);

  fd = mkstemp(filename);

  if (fd < 0) { /* error */
    perror("Comm: transmit mkstemp");
    exit_comm();
  }
  /* only the unique name is needed: build_file_from_tag_list() is
   * given the path, so do not keep the descriptor open */
  close(fd);

  build_file_from_tag_list(filename, tag_list);
  waiting = node_add_waiting_msg(hPKdest, filename);

  if (waiting < 0) {
    WARN("Transmit: unable to enqueue file %s for node %s.\n", filename, hPKdest);
    goto done;
  }

  INFO("Comm: Enqueuing sharefile %s to PK %s:  %i waiting messages, file %s\n",
       sharefile, PK, waiting, filename);
  result = 1;

done:
  /* NOTE(review): values returned by get_tag() are NOT freed here.
   * The other callers in this file (broadcast, process_internal_message)
   * keep using the list and release it with free_tag_list(), which
   * suggests get_tag() returns pointers into the list -- confirm. */
  free(filename);
  free(owned_mixnet);
  free(owned_address);
  free(owned_PK);
  return (result);
}


int do_transmit(char *hPK) {
  /* Extract the waiting file list for hPK node.  Glom all the 
   * waiting files into one large file, and send this in one large
   * chunk through the remailer.
   */

  struct message_queue_t *message_queue;

  message_queue = node_get_message_queue(hPK);

  if (!message_queue) {
    INFO("Comm: Node %s to transmit has no waiting messages.\n", hPK);
    return (0);
  }

  /* cat sharefile | mix -S --subject="sharedesc" --to="towherever" */
  /* system() the command line                                      */
  /* for now just creates a command line and sends a test message.  */

   if (!(strcmp(mixnet, "mixmaster"))) {
     char * sendtestfile = "cat testfile | mix -S --subject=\"Iamfreehaven\" --to=\"freehaven-dev@seul.org\" ";
     system(sendtestfile); 
   }

  INFO("Comm: Busiest node %s transmitted...\n", hPK);
  
  return (1); 
}


\end{verbatim}

\newpage
\section*{Node Database interface:  nodedb.c}

\begin{verbatim}

#include "../shared/common.h"
#include "comm.h"
#include "protos.h"

GDBM_FILE nodedb;


void initialize_nodedb() {

  nodedb = gdbm_open("nodedb", 512, GDBM_WRCREAT, 0777, 0);
  INFO("Node DB opened and initialized.\n");

}

/*
 * Serialise a node entry into a GDBM value.
 *
 * Layout of the returned buffer:
 *   int statistics | int cost | int number-of-messages |
 *   one NUL-terminated filename per queued message (queue order) |
 *   PK\0 | mixnet\0 | address\0
 *
 * The message count written is the number of filenames actually
 * serialised, so the value can always be parsed back; a disagreement
 * with entry->waiting_msgs is reported with a warning.  NULL PK,
 * mixnet or address strings are stored as empty strings.
 *
 * return: datum whose dptr is malloc()ed (caller frees), or a datum
 *         with dptr == NULL and dsize == 0 if entry is NULL or memory
 *         is exhausted.
 */
datum construct_gdbm_entry(struct nodedb_entry_t *entry) {
  datum value;
  char *buf, *cursor; /* char *, not void *: arithmetic on void * is a GNU extension */
  const char *pk, *mixnet, *address;
  size_t len_PK, len_mixnet, len_address;
  size_t len_messages = 0, len;
  int msg_count = 0;
  struct message_queue_t *message;
  size_t dsize;

  value.dptr = NULL;
  value.dsize = 0;

  if (!entry) {
    return (value);
  }

  pk = entry->PK ? entry->PK : "";
  mixnet = entry->mixnet ? entry->mixnet : "";
  address = entry->address ? entry->address : "";
  len_PK = strlen(pk) + 1;
  len_mixnet = strlen(mixnet) + 1;
  len_address = strlen(address) + 1;

  WARN("Constructing gdbm entry...\n");

  for (message = entry->message_queue; message; message = message->next) {
    len_messages += strlen(message->filename) + 1;

    WARN("file: %s\n", message->filename);

    msg_count++;
  }

  if (msg_count != entry->waiting_msgs) {
    WARN("Nodedb: waitings_msgs [%i] != number of filenames [%i]\n",
         entry->waiting_msgs, msg_count);
  }

  dsize = 3*sizeof(int) + len_PK + len_messages + len_mixnet + len_address;

  buf = malloc(dsize);
  if (!buf) {
    WARN("Nodedb: out of memory constructing gdbm entry.\n");
    return (value);
  }
  cursor = buf;

  // XXX Maybe a portability problem for systems with different endian-ness
  memcpy(cursor, &entry->statistics, sizeof(int));
  cursor += sizeof(int);
  memcpy(cursor, &entry->cost, sizeof(int));
  cursor += sizeof(int);
  /* store the count that matches the filenames written below */
  memcpy(cursor, &msg_count, sizeof(int));
  cursor += sizeof(int);

  for (message = entry->message_queue; message; message = message->next) {
    len = strlen(message->filename) + 1;
    memcpy(cursor, message->filename, len);
    cursor += len;
  }

  memcpy(cursor, pk, len_PK);
  cursor += len_PK;
  memcpy(cursor, mixnet, len_mixnet);
  cursor += len_mixnet;
  memcpy(cursor, address, len_address);

  value.dptr = buf;
  value.dsize = (int) dsize;

  return (value);
}


/*
 * Rebuild a node entry from a GDBM value produced by
 * construct_gdbm_entry().
 *
 * nodedb_value  start of the stored value (must not be NULL):
 *                 int statistics | int cost | int message-count |
 *                 message-count filenames | PK | mixnet | address
 *
 * effects: the message queue keeps the stored (head = newest,
 *          tail = oldest) order and is NULL when no message is stored.
 *          PK, mixnet and address are strdup()ed; they are NULL if
 *          memory is exhausted.  waiting_msgs is set to the number of
 *          queue entries actually built.
 * return:  the entry; the caller owns the queue and the three strings.
 *
 * NOTE(review): the length of the value is not passed in, so a
 * corrupt record cannot be detected here.
 */
struct nodedb_entry_t reconstruct_nodedb_entry(void *nodedb_value) {
  struct nodedb_entry_t nodedb_entry;
  struct message_queue_t *message, *tail = NULL;
  const char *loc = nodedb_value;
  int stored_msgs = 0, built = 0;
  int i;

  /* defined values for every field, including ones not stored */
  memset(&nodedb_entry, 0, sizeof nodedb_entry);

  /* copy INTO the fields: the address of each field is the destination */
  memcpy(&nodedb_entry.statistics, loc, sizeof(int));
  loc += sizeof(int);
  memcpy(&nodedb_entry.cost, loc, sizeof(int));
  loc += sizeof(int);
  memcpy(&stored_msgs, loc, sizeof(int));
  loc += sizeof(int);

  nodedb_entry.message_queue = NULL;
  for (i = 0; i < stored_msgs; i++) {
    message = malloc(sizeof *message);
    if (message) {
      /* NOTE(review): unbounded copy; the capacity of
       * message_queue_t.filename is not visible here -- confirm. */
      strcpy(message->filename, loc);
      message->next = NULL; /* always terminate the list */
      if (tail) {
        tail->next = message;
      } else {
        nodedb_entry.message_queue = message;
      }
      tail = message;
      built++;
    } else {
      WARN("Nodedb: out of memory reconstructing message queue.\n");
    }
    /* step over this filename -- also after the last one, so that
     * loc ends up on the PK string */
    loc += strlen(loc) + 1;
  }
  nodedb_entry.waiting_msgs = built;

  /* PK, mixnet and address are pointer fields: they need their own storage */
  nodedb_entry.PK = strdup(loc);
  loc += strlen(loc) + 1;
  nodedb_entry.mixnet = strdup(loc);
  loc += strlen(loc) + 1;
  nodedb_entry.address = strdup(loc);

  return (nodedb_entry);
}



int node_change_PK(char *hPK, char *newPK) {
  /* update a node PK       */
  /* returns < 0 if fails   */

  datum key, value;
  struct nodedb_entry_t nodedb_entry;
  int ret_val = 1;

  if (!hPK || !newPK) return (-1);
  key.dptr = hPK;
  key.dsize = strlen(hPK)+1;

  value = gdbm_fetch(nodedb, key);
  if(!value.dptr) {
    return(-1);
  }

  nodedb_entry = reconstruct_nodedb_entry(value.dptr);
  free(nodedb_entry.PK);
  nodedb_entry.PK = newPK;
  value = construct_gdbm_entry(&nodedb_entry);

  if (gdbm_store(nodedb, key, value, GDBM_INSERT)) {
    WARN("Nodedb: Hash(PK) %s not changed.\n", hPK);
    ret_val = -2;
  }

  if (gdbm_delete(nodedb, key)) {
    WARN("Nodedb: Old PK not deleted properly from nodedb: hPK %s!\n", hPK);
    ret_val = -3;
  }

  free(value.dptr);
  free_node(nodedb_entry);

  return (ret_val);
}



/*
 * Add a new node to the node database.
 *
 * hPK         database key of the node.
 * PK, address, mixnet
 *             strings stored for the node (copied into the database;
 *             ownership stays with the caller).
 * statistics  initial statistics value.
 *
 * The node starts with cost 0 and no waiting messages.
 *
 * returns: 1 on success, -1 if an argument is NULL, the key already
 *          exists, or the record could not be stored.
 */
int node_add_new_node(char *hPK, char *PK, char *address, char *mixnet, int statistics) {
  /* no statistics initially? Or poll from web? */
  datum key, value;
  struct nodedb_entry_t nodedb_entry;
  int store_result;

  if (!hPK || !PK || !address || !mixnet) return (-1);
  key.dptr = hPK;
  key.dsize = strlen(hPK)+1;

  memset(&nodedb_entry, 0, sizeof nodedb_entry);
  nodedb_entry.statistics = statistics;
  nodedb_entry.cost = 0;
  nodedb_entry.waiting_msgs = 0;
  nodedb_entry.message_queue = NULL;
  /* construct_gdbm_entry() only reads these strings, so the caller's
   * pointers can be used directly; duplicating them would just leak */
  nodedb_entry.PK = PK;
  nodedb_entry.mixnet = mixnet;
  nodedb_entry.address = address;

  value = construct_gdbm_entry(&nodedb_entry);

  store_result = gdbm_store(nodedb, key, value, GDBM_INSERT);
  free(value.dptr);

  if (store_result > 0) {
    WARN("PK %s already exists in nodedb when attempting to add.\n", PK);
    return(-1);
  }
  if (store_result < 0) {
    WARN("Nodedb: unable to store new node with PK %s.\n", PK);
    return(-1);
  }

  return(1);

}


struct message_queue_t* node_get_message_queue(char *hPK) {
  /* look it up, return the front of struct queue */
  /* return NULL if it's not there */
  datum key, value;
  struct nodedb_entry_t nodedb_entry;

  if (!hPK) return (NULL);
  key.dptr = hPK;
  key.dsize = strlen(hPK)+1;
  
  value = gdbm_fetch(nodedb, key);
  if (!value.dptr) {
    return (NULL);
  }

  nodedb_entry = reconstruct_nodedb_entry(value.dptr);
  
  free(value.dptr);
  free(nodedb_entry.PK);
  free(nodedb_entry.address);
  free(nodedb_entry.mixnet);

  return (nodedb_entry.message_queue);
}



char* node_get_PK(char *hPK) {
  /* look it up, return the PK     */
  /* return NULL if it's not there */
  datum key, value;
  struct nodedb_entry_t nodedb_entry;

  if (!hPK) return (NULL);
  key.dptr = hPK;
  key.dsize = strlen(hPK)+1;

  value = gdbm_fetch(nodedb, key);  
  if(!value.dptr) {
    return (NULL);
  }

  nodedb_entry = reconstruct_nodedb_entry(value.dptr);

  free(value.dptr);
  free_node_msgs(nodedb_entry);
  free(nodedb_entry.mixnet);
  free(nodedb_entry.address);

  return (nodedb_entry.PK);
}


char* node_get_mixnet(char *hPK) {
  /* look it up, return the mixnet name */
  /* return NULL if it's not there */
  datum key, value;
  struct nodedb_entry_t nodedb_entry;

  if (!hPK) return (NULL);
  key.dptr = hPK;
  key.dsize = strlen(hPK)+1;

  value = gdbm_fetch(nodedb, key);  
  if(!value.dptr) {
    return (NULL);
  }

  nodedb_entry = reconstruct_nodedb_entry(value.dptr);

  free(value.dptr);
  free_node_msgs(nodedb_entry);
  free(nodedb_entry.PK);
  free(nodedb_entry.address);

  return (nodedb_entry.mixnet);
}


char* node_get_address(char *hPK) {
  /* look it up, return the address */
  /* return NULL if it's not there */
  datum key, value;
  struct nodedb_entry_t nodedb_entry;

  if (!hPK) return (NULL);
  key.dptr = hPK;
  key.dsize = strlen(hPK)+1;

  value = gdbm_fetch(nodedb, key);  
  if(!value.dptr) {
    return(NULL);
  }

  nodedb_entry = reconstruct_nodedb_entry(value.dptr);

  free(value.dptr);
  free_node_msgs(nodedb_entry);
  free(nodedb_entry.PK);
  free(nodedb_entry.mixnet);

  return (nodedb_entry.address);
}


/*
 * Find the node with the highest positive `cost` value.
 *
 * NOTE(review): the original description speaks of "the greatest
 * number of waiting messages", but the comparison uses the cost field;
 * the relation between the two is not visible here -- confirm.
 *
 * return: strdup()ed database key of that node (caller frees), or NULL
 *         if no node has a cost greater than 0.
 */
char* node_get_busiest_PK() {
  datum key, next_key, value;
  struct nodedb_entry_t nodedb_entry;
  int max_cost = 0;
  char *busiest_hPK = NULL;

  key = gdbm_firstkey(nodedb);

  while (key.dptr) {
    value = gdbm_fetch(nodedb, key);

    /* a failed fetch yields a NULL dptr, which must not be parsed */
    if (value.dptr) {
      nodedb_entry = reconstruct_nodedb_entry(value.dptr);

      if (nodedb_entry.cost > max_cost) {
        max_cost = nodedb_entry.cost;
        free(busiest_hPK);
        busiest_hPK = strdup(key.dptr);
      }

      free(value.dptr);
      free_node(nodedb_entry);
    }

    /* key buffers returned by gdbm are malloc()ed: fetch the successor
     * first, then release the current one */
    next_key = gdbm_nextkey(nodedb, key);
    free(key.dptr);
    key = next_key;
  }

  return (busiest_hPK);
}


/*
 * Add a file to a node's queue of waiting messages.
 *
 * hPK       node database key.
 * filename  file to queue; copied into the new queue entry, which is
 *           placed at the head of the queue.
 *
 * return: the node's new number of waiting messages, or -1 if an
 *         argument is NULL, the node does not exist, memory is
 *         exhausted, or the updated record could not be stored.
 */
int node_add_waiting_msg(char *hPK, char *filename) {
  datum key, value;
  struct nodedb_entry_t nodedb_entry;
  struct message_queue_t *new_message;
  int waiting;

  if (!hPK || !filename) return (-1);
  key.dptr = hPK;
  key.dsize = strlen(hPK)+1;

  value = gdbm_fetch(nodedb, key);
  if (!value.dptr) {
    return (-1);
  }

  nodedb_entry = reconstruct_nodedb_entry(value.dptr);
  free(value.dptr);

  new_message = malloc(sizeof *new_message);
  if (!new_message) {
    WARN("Nodedb: out of memory adding waiting message for %s.\n", hPK);
    free_node(nodedb_entry);
    return (-1);
  }
  /* NOTE(review): unbounded copy; the capacity of
   * message_queue_t.filename is not visible here -- confirm. */
  strcpy(new_message->filename, filename);

  new_message->next = nodedb_entry.message_queue;
  nodedb_entry.message_queue = new_message;
  nodedb_entry.waiting_msgs++;
  waiting = nodedb_entry.waiting_msgs;

  value = construct_gdbm_entry(&nodedb_entry);
  if (gdbm_store(nodedb, key, value, GDBM_REPLACE)) {
    /* the message is not recorded: do not report a count */
    WARN("Nodedb: unable to store waiting message for %s.\n", hPK);
    waiting = -1;
  }

  free(value.dptr);
  free_node(nodedb_entry);

  return (waiting);
}


int node_get_waiting_msgs(char *hPK) {
  /* look it up, return number of waiting msgs */
  /* return -1 if it's not there */
  datum key, value;
  struct nodedb_entry_t nodedb_entry;

  if (!hPK) return (-1);
  key.dptr = hPK;
  key.dsize = strlen(hPK)+1;

  value = gdbm_fetch(nodedb, key);  
  if(!value.dptr) {
    return (-1);
  }

  nodedb_entry = reconstruct_nodedb_entry(value.dptr);

  free(value.dptr);
  free_node(nodedb_entry);

  return (nodedb_entry.waiting_msgs);
}


/*
 * Set the statistics value stored for a node.
 *
 * hPK         node database key.
 * statistics  new value.
 *
 * return: 1 on success, -1 if hPK is NULL, the node is not in the
 *         database, or the updated record could not be stored.
 */
int node_set_statistics(char *hPK, int statistics) {
  datum key, value;
  struct nodedb_entry_t nodedb_entry;
  int ret_val = 1;

  if (!hPK) return (-1);
  key.dptr = hPK;
  key.dsize = strlen(hPK)+1;

  value = gdbm_fetch(nodedb, key);
  if(!value.dptr) {
    return (-1);
  }

  nodedb_entry = reconstruct_nodedb_entry(value.dptr);
  nodedb_entry.statistics = statistics;

  free(value.dptr);

  value = construct_gdbm_entry(&nodedb_entry);
  if (gdbm_store(nodedb, key, value, GDBM_REPLACE)) {
    /* do not claim success when nothing was written */
    WARN("Nodedb: unable to store statistics for %s.\n", hPK);
    ret_val = -1;
  }

  free(value.dptr);
  free_node(nodedb_entry);

  return (ret_val);
}


int node_get_statistics(char *hPK) {
  /* look it up, return statistics */
  /* return -1 if it's not there */
  datum key, value;
  struct nodedb_entry_t nodedb_entry;

  if (!hPK) return (-1);
  key.dptr = hPK;
  key.dsize = strlen(hPK)+1;

  value = gdbm_fetch(nodedb, key);  
  if(!value.dptr) {
    return (-1);
  }

  nodedb_entry = reconstruct_nodedb_entry(value.dptr);

  free(value.dptr);
  free_node(nodedb_entry);

  return (nodedb_entry.statistics);
}


/*
 * Release every element of a node entry's message queue.
 *
 * The entry is passed by value, so the caller's message_queue pointer
 * is left untouched (and dangling) after this call.
 */
void free_node_msgs(struct nodedb_entry_t nodedb_entry) {
  struct message_queue_t *cursor, *following;

  for (cursor = nodedb_entry.message_queue; cursor != NULL; cursor = following) {
    /* read the link before the element is released */
    following = cursor->next;
    free(cursor);
  }
}


/*
 * Close the node database, if it is open.
 *
 * Safe to call when the database was never opened and safe to call
 * more than once: the handle is cleared after closing.
 */
void close_nodedb() {
  if (nodedb) {
    gdbm_close(nodedb);
    nodedb = NULL;
  }
}

\end{verbatim}

