The Gateway

Information about a key piece of software from the early days of the Ruby community.

12 DEC 2006

news_to_mail.rb

[Note: You need to know what the Gateway is before reading this article.]

The second half of the Ruby Gateway runs as a cron job every five minutes and is expected to move all new newsgroup messages from the NNTP host to the Ruby Talk mailing list. The cron invocation is simply:

ruby /path/to/gateway/bin/news_to_mail.rb /path/to/news_to_mail.log

This side of the Gateway is not piped any messages and exit codes from it are not monitored. It needs to tend its own affairs.

The Code

Here's the source:

GATEWAY_DIR  = File.join(File.dirname(__FILE__), "..").freeze
DATA_DIR     = File.join(GATEWAY_DIR, "data").freeze
LAST_ID_FILE = File.join(DATA_DIR, "last_news_id.txt").freeze
PID_FILE     = File.join(DATA_DIR, "pid.txt").freeze

$LOAD_PATH << File.join(GATEWAY_DIR, "config") << File.join(GATEWAY_DIR, "lib")

# ...

The script begins by locating the root directory of the Gateway source and the data subdirectory, building constants for the two external files it will interact with, and adjusting the $LOAD_PATH so that it can require needed resources.

Here are those requires:

# ...

require "servers_config"
require "nntp"

require "net/smtp"
require "logger"
require "timeout"

# ...

The first two requires are the same code used by the other half of the Gateway. The last three are standard Ruby libraries the code will make use of.

# ...

# prepare log
log = Logger.new(ARGV.shift || $stdout)
log.datetime_format = "%Y-%m-%d %H:%M "

# ...

This code opens a log file to record its status and adjusts the date and time formatting.
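As a rough illustration of what that format string does, the sketch below points a Logger at an in-memory buffer rather than a file. The StringIO target and the message text are assumptions made only so the result can be inspected; the Gateway itself logs to the path given on the command line, or to $stdout when none is given.

```ruby
require "logger"
require "stringio"

# Log to an in-memory buffer instead of a file. StringIO is used here only
# for illustration, so the formatted entry can be printed afterwards.
buffer = StringIO.new
log    = Logger.new(buffer)
log.datetime_format = "%Y-%m-%d %H:%M "

log.info "Gateway starting"

# Show the entry, which carries a timestamp truncated to the minute.
puts buffer.string
```

The exact layout around the timestamp depends on the Logger version in use, so treat the printed line as indicative rather than exact.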

With setup complete, it's now vital to ensure that the current execution is the only copy of the code we are running. Should the Gateway get behind and fail to complete before the cron job launches another copy, duplicate messages could be sent to the list or worse.

# ...

# make sure only one copy is ever running at a time
begin
  File.open(PID_FILE, File::CREAT|File::EXCL|File::WRONLY) { |pid| pid.puts $$ }
  at_exit do
    begin
      File.unlink(PID_FILE)
    rescue
      log.error "Unable to unlink pid file:  #{$!.message}"
    end
  end
rescue
  pid = File.read(PID_FILE).strip.to_i rescue "unknown"
  log.warn "Process #{pid} was already running"
  exit
end

# ...

The script attempts to write its process ID number to a file. The File::CREAT|File::EXCL options ensure that the file is either newly created or, if it already exists, an exception is raised. When the exception comes, it is assumed that an earlier execution is still running and the script exits. If it is able to create the process ID file, the script knows it is the only copy running and it is safe to proceed. The code then immediately arranges for Ruby to remove the file on exit.

Note that to check for the file then create it if missing would introduce a race condition. Another execution could create the file between check and creation. As unlikely as that is for a five minute cron job, it's better to play things safe and just try the creation directly.
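The difference can be seen in isolation with a small sketch. The claim_lock helper and the temporary directory are hypothetical, introduced only for this demonstration, and the helper rescues Errno::EEXIST specifically where the Gateway uses a bare rescue.

```ruby
require "tmpdir"

# Try to claim a lock file in one atomic step. Returns true if this call
# created the file and false if it was already there. (Hypothetical helper.)
def claim_lock(path)
  File.open(path, File::CREAT | File::EXCL | File::WRONLY) do |file|
    file.puts Process.pid
  end
  true
rescue Errno::EEXIST
  false
end

lock_path = File.join(Dir.mktmpdir, "pid.txt")  # throwaway location

first_claim  = claim_lock(lock_path)  # nothing there yet, so we win
second_claim = claim_lock(lock_path)  # file exists, so we are refused

File.unlink(lock_path)                # what the at_exit handler would do
third_claim  = claim_lock(lock_path)  # free to claim again

puts [first_claim, second_claim, third_claim].inspect  # prints [true, false, true]

File.unlink(lock_path)                # tidy up the demo file
```

Because the existence test and the creation happen in a single open call, there is no window between them for a second process to slip through.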

Having made it this far, the script needs to refresh its memory of where it was in the list of newsgroup messages:

# ...

# find out what the last news item sent was
begin
  last_id_sent = Integer(File.read(LAST_ID_FILE).strip)
rescue
  log.fatal "Unable to get message ID of last news post:  #{$!.message}"
  exit -1
end

# ...

A simple text file is used to hold the message number of the last message seen. The code reads that file back to know where to resume its work.
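One detail worth noticing is the use of Integer() rather than String#to_i. A minimal sketch of the difference, using a hypothetical parse_last_id helper and made-up file contents:

```ruby
# Integer() raises on garbage, where String#to_i would quietly return 0.
# (parse_last_id is a hypothetical helper for this demonstration.)
def parse_last_id(text)
  Integer(text.strip)
end

good = parse_last_id("231745\n")  # what a healthy file would contain

begin
  parse_last_id("")               # an empty or truncated file
  empty_result = :parsed
rescue ArgumentError
  empty_result = :rejected
end

lenient = "".to_i                 # the forgiving conversion

puts good          # prints 231745
puts empty_result  # prints rejected
puts lenient       # prints 0
```

With to_i, a damaged file would silently reset the position to message 0. The strict conversion turns that into a logged fatal error instead.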

It's now time to connect to Usenet.

# ...

# 
# connect to NNTP host, switch to newsgroup, and see how many messages are
# available to read
# 
begin
  nntp, last_id_available = nil, nil
  Timeout.timeout(30) do
    nntp = Net::NNTP.new( ServersConfig::NEWS_SERVER,
                          Net::NNTP::NNTP_PORT,
                          ServersConfig::NEWS_USER,
                          ServersConfig::NEWS_PASS )
    last_id_available = nntp.group(ServersConfig::NEWSGROUP)[3].to_i
  end
rescue Timeout::Error
    log.error "The NNTP connection timed out."
    exit
rescue
    log.fatal "Unable to establish connection to NNTP host:  #{$!.message}"
    exit -1
end

# ...

This code tries to establish a connection to the NNTP host and switch to the comp.lang.ruby group being monitored. A 30 second timeout is imposed on this code, as it will be on every network operation the script attempts. If the connection is slow for some reason, another attempt can be made by the next cron execution.
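The order of the rescue clauses matters here, because in current Ruby versions Timeout::Error is itself a StandardError and a bare rescue listed first would swallow it. The sketch below shows the pattern with a hypothetical classify helper, using short sleeps and a raised string in place of real network calls.

```ruby
require "timeout"

# Run a block under a short deadline and report how it ended.
# (classify is a hypothetical helper; the Gateway logs and exits instead.)
def classify
  Timeout.timeout(0.1) { yield }
  :ok
rescue Timeout::Error
  :timed_out  # soft failure: the next cron run can try again
rescue
  :failed     # anything else
end

quick  = classify { 1 + 1 }
slow   = classify { sleep 1 }
broken = classify { raise "connection refused" }

puts [quick, slow, broken].inspect  # prints [:ok, :timed_out, :failed]
```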

The script now advances the NNTP reader to the last message seen, which turns out to be complicated to get right:

# ...

# switch to the last message we sent (or the first real message before that)
begin
  Timeout.timeout(30) do
    nntp.stat(last_id_sent)
  end
rescue Timeout::Error
    log.error "The NNTP message shift timed out."
    exit
rescue
  if $!.message.include?("Bad article number") and last_id_sent.nonzero?
    last_id_sent -= 1
    retry
  else
    log.fatal "Unable to switch to the last message:  #{$!.message}"
    exit -1
  end
end

# ...

The problem here is that Usenet messages can be revoked. When that happens, the last message the script saw may no longer exist. The real goal, though, is the message after the last post seen, and an NNTP NEXT command will skip missing IDs. Given that, the code backs up through the count until it finds an existing message. NEXTing forward from there will skip the vanished IDs and land the reader on the desired later message.
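The backing-up step can be tried without a news server. In this sketch the list of surviving article numbers, the fake_stat stand-in, and the rewind_to_existing helper are all invented for illustration; only the rescue and retry logic mirrors the Gateway code.

```ruby
# Article numbers still on the pretend server; 104 and 105 were revoked.
EXISTING = [101, 102, 103, 106, 107].freeze

# Stand-in for nntp.stat that fails the way a missing article might.
def fake_stat(id)
  raise "423 Bad article number" unless EXISTING.include?(id)
  id
end

# Walk backwards from last_id_sent until an existing article is found.
def rewind_to_existing(last_id_sent)
  fake_stat(last_id_sent)
rescue
  if $!.message.include?("Bad article number") && last_id_sent.nonzero?
    last_id_sent -= 1
    retry
  else
    raise
  end
end

puts rewind_to_existing(105)  # prints 103
puts rewind_to_existing(107)  # prints 107
```

Starting from the revoked 105, the reader settles on 103. A NEXT from there would arrive at 106, the first message not yet forwarded.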

To make that leap forward in the message count, the script enters an infinite loop of message processing:

# ...

# main event loop
loop do
  # advance to the next message, if there is one
  new_last_id, more_messages = nil, true
  begin
    Timeout.timeout(30) do
      new_last_id = nntp.next[1].to_i
    end
  rescue Timeout::Error
      log.error "Advancing the current NNTP message timed out."
      exit
  rescue
    more_messages = false
  end
  break unless more_messages

  # reality check that we are moving forward
  if new_last_id <= last_id_sent
    log.fatal "new_last_id (#{new_last_id}) <= last_id_sent " +
              "(#{last_id_sent}), quitting to prevent sending duplicates"
    exit -1
  end

  # ...

As I said, a NEXT command is sent to the host, bringing the reader to an unseen message. A quick sanity check is then made to ensure the count is rising.

Note that this is also the code to determine when there are no more messages. If the NEXT command fails, the message processing loop terminates. You really need to use this combination of an infinite loop and last message detection when working with Usenet. Any method of counting is pointless, thanks to the numbering issues I described.
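To see why the loop works where counting would not, here is a toy version. FakeReader is a hypothetical stand-in for the NNTP connection, and its article numbers are made up.

```ruby
# A toy reader over a sparse set of article numbers.
class FakeReader
  def initialize(ids, current)
    @ids     = ids.sort
    @current = current
  end

  # Like NEXT: move to the next existing article, or raise if none is left.
  def next
    following = @ids.find { |id| id > @current }
    raise "421 No next article" if following.nil?
    @current = following
  end
end

reader = FakeReader.new([101, 102, 103, 106, 107], 103)
seen   = []

loop do
  begin
    id = reader.next
  rescue
    break  # a failed NEXT means there is nothing more to read
  end
  seen << id
end

puts seen.inspect  # prints [106, 107]
```

Iterating over 104 through 107 instead would have asked for two articles that no longer exist.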

At this point, it's time to actually start reading the post:

  # ...

  # pull the headers for the current message
  begin
    head = nil
    Timeout.timeout(30) do
      head = nntp.head(new_last_id)[3].join("\r\n") + "\r\n"
    end
  rescue Timeout::Error
      log.error "Fetching the message headers timed out."
      exit
  rescue
    log.error "Unable to retrieve headers for message ##{new_last_id}, " +
              "will try again later:  #{$!.message}"
    exit
  end

  # 
  # don't send articles that the mail_to_news program has previously forwarded
  # to the newsgroup (or we'd loop)
  # 
  if head =~ /^X-rubymirror:/
    log.info "Skipping message ##{new_last_id}, sent by mail_to_news"
    next
  end

  # ...

This code pulls the message headers and performs the loop check to ensure that messages sent by mail_to_news.rb are skipped.
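The check relies on Ruby's ^ anchor matching at the start of every line, not just the start of the string. A small sketch with invented headers and a hypothetical forwarded_by_gateway? helper:

```ruby
LOOP_MARKER = /^X-rubymirror:/

# True when the header block carries the Gateway's marker header.
# (Hypothetical helper wrapping the same regular expression.)
def forwarded_by_gateway?(head)
  !!(head =~ LOOP_MARKER)
end

# Headers joined the same way the Gateway joins them. Addresses are made up.
from_list = [ "From: a@example.com",
              "Subject: Hi",
              "X-rubymirror: yes" ].join("\r\n") + "\r\n"

from_usenet = [ "From: b@example.com",
                "Subject: What is X-rubymirror: for?" ].join("\r\n") + "\r\n"

puts forwarded_by_gateway?(from_list)    # prints true
puts forwarded_by_gateway?(from_usenet)  # prints false
```

The second message merely mentions the header name in its subject, so the anchored pattern correctly leaves it alone. Note that the pattern as written is case sensitive.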

  # ...

  # pull the body for the current message
  begin
    body = nil
    Timeout.timeout(30) do
      body = nntp.body(new_last_id)[3].join("\r\n") + "\r\n"
    end
  rescue Timeout::Error
      log.error "Fetching the message body timed out."
      exit
  rescue
    log.error "Unable to retrieve body for message ##{new_last_id}, " +
              "will try again later:  #{$!.message}"
    exit
  end

  # scan headers for message data
  subject = head =~ /^Subject:\s+([^\r\n]*)/     ? $1 : "?"
  from    = head =~ /^From:\s+([^\r\n]*)/        ? $1 : "?"

  # ensure the message is properly addressed
  unless head =~ /^To:/
    head = "To: #{ServersConfig::MAILING_LIST}\r\n" + head
  end

  log.info "Sending message ##{new_last_id}:  #{subject} -- #{from}..."

  # ...

The body is then pulled. The script then scans the headers for a few message details so the log entry can describe the post accurately. The message is also addressed to the mailing list if it lacks a To header.
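Here is the same scanning idiom run against a made-up header block. The sample headers, the Date lookup, and the list address are assumptions for illustration; the Gateway takes its address from ServersConfig::MAILING_LIST.

```ruby
# Invented headers, joined with CRLF as the Gateway does.
head = [ "Path: news.example.com!not-for-mail",
         "From: Jane Doe <jane@example.com>",
         "Subject: Regexp question" ].join("\r\n") + "\r\n"

# Capture the rest of the header line, or fall back to "?" when absent.
subject = head =~ /^Subject:\s+([^\r\n]*)/ ? $1 : "?"
from    = head =~ /^From:\s+([^\r\n]*)/    ? $1 : "?"
date    = head =~ /^Date:\s+([^\r\n]*)/    ? $1 : "?"

# Prepend a To header only when the message has none.
unless head =~ /^To:/
  head = "To: ruby-talk@example.org\r\n" + head
end

puts subject           # prints Regexp question
puts from              # prints Jane Doe <jane@example.com>
puts date              # prints ?
puts head.lines.first  # prints To: ruby-talk@example.org
```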

  # ...

  # extracting path information (poster's host, first nntp hop)
  posting_host = head =~ /^NNTP-Posting-Host:\s+(\S[^\r\n]*)/i ? $1 : "unknown"

  path = head =~ /^Path:\s+([^\r\n]*)/i ? $1 : "unknown"
  path = path.split("!").delete_if { |hop| hop !~ /^(([^.]+)\.)+([^.]+)$/ }

  first_hop = path.last

  # adding pseudo "Received:" header and rubymirror header
  head += <<END_RECEIVED.gsub("\n", "\r\n")
Received: from #{first_hop} (#{first_hop})
     by #{ServersConfig::NEWSGROUP} with NNTP id #{new_last_id}
     for <#{ServersConfig::MAILING_LIST}>; #{Time.now.to_s}
Received: from [#{posting_host}]
     by #{first_hop} (unknown) with NNTP id #{new_last_id}
     for <#{ServersConfig::NEWSGROUP}>; #{Time.now.to_s}
Received: from Usenet via a Usenet to mail gateway located at
     #{ServersConfig::NEWSGROUP}.  This service provided as a courtesy
     to the ruby-talk mailing list.  If this message is SPAM, its
     ultimate origin is Usenet, not this gateway program.  All
     subscribers to the ruby-talk mailing list agree to receive the
     Usenet postings made to comp.lang.ruby via this gateway.  Please
     see http://www.ruby-lang.org/ruby-talk-usenet-policy.html.
X-From-Usenet: see Received: header above.
X-rubymirror: yes

END_RECEIVED

  # build final message
  msg = head + body
  log.info "Message looks like: #{msg.inspect}"

  # ...

The above adds pseudo Received headers describing the message's path, a note explaining the service, and the X-rubymirror header, then constructs the final message.
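The Path handling is the least obvious part of that snippet, so here it is in isolation. The first_hop helper and the host names are hypothetical; the split and filter are the same as in the Gateway.

```ruby
# A hop counts only if it looks like a dotted host name.
HOP_PATTERN = /^(([^.]+)\.)+([^.]+)$/

# Return the last host-like entry in a Path header value, or nil if none.
def first_hop(path_header)
  hops = path_header.split("!").delete_if { |hop| hop !~ HOP_PATTERN }
  hops.last
end

sample = "news.example.net!feeder.example.org!reader.example.com!not-for-mail"

puts first_hop(sample)             # prints reader.example.com
puts first_hop("unknown").inspect  # prints nil
```

The trailing not-for-mail entry has no dot, so it is filtered out and the last remaining entry is the server nearest the poster. When no Path header was found, the "unknown" placeholder is filtered out too, which leaves nil and an empty host in the generated Received header.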

  # ...

  # attempt to send email
  unless $DEBUG
    begin
      Timeout.timeout(30) do
        Net::SMTP.start(ServersConfig::SMTP_SERVER, 25) do |smtp|
          smtp.send_mail( msg,
                          ServersConfig::MAIL_SENDER,
                          ServersConfig::MAILING_LIST )
        end
      end
    rescue Timeout::Error
        log.error "The SMTP connection timed out."
        exit
    rescue
      log.fatal "Unable to send email:  #{$!.message}"
      exit -1
    end
    log.info "...  Sent."
  end

  # ...

Here a connection is established to the SMTP server and the email is sent. The $DEBUG flag allows me to spot check Gateway functionality without actually sending emails to Ruby Talk.

On to the final step:

  # ...

  # record new high-water mark
  begin
    unless $DEBUG or new_last_id == last_id_sent
      File.open(LAST_ID_FILE, "w") { |file| file.puts new_last_id }
    end
  rescue
    log.error "Unable to write message ID to file:  #{$!.message}"
  end
  last_id_sent = new_last_id
  log.info "New last message sent ID:  #{last_id_sent}"
end

Getting this far means the script sent an email successfully, so it records the new last seen ID for future runs. That final end signals the end of the message processing event loop.

Possible Improvements

This code is pretty feature complete.

About the only enhancement I can think of would be to DRY up some of the timeout and error handling code. That's trickier than you might guess because of the extra layers of scoping plus the error messages and exit codes. We almost need a multi-block syntax here. If someone proposes a reasonable alternative I'll consider it, but I've tried to keep the code pretty straightforward and the truth is that it just may not be worth the effort.
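For the sake of discussion, one possible shape for such a helper is sketched here. It is not part of the Gateway: the with_timeout name, its keyword arguments, and the message wording are all assumptions. It also covers only the cases that log and exit, not the ones that retry or break out of the loop.

```ruby
require "logger"
require "timeout"

# Hypothetical helper: run a block under a timeout, log any failure, and
# exit with a status that depends on how serious the failure is.
def with_timeout(log, what, seconds: 30, fatal: true)
  Timeout.timeout(seconds) { yield }
rescue Timeout::Error
  log.error "#{what} timed out."
  exit
rescue => error
  if fatal
    log.fatal "#{what} failed:  #{error.message}"
    exit(-1)
  else
    log.error "#{what} failed, will try again later:  #{error.message}"
    exit
  end
end

log = Logger.new($stdout)

# The block's value passes straight through on success.
value = with_timeout(log, "Adding numbers") { 1 + 1 }

# SystemExit is rescued here only so the demonstration can keep running.
begin
  with_timeout(log, "Sleeping", seconds: 0.1) { sleep 1 }
rescue SystemExit => stop
  soft_status = stop.status
end

begin
  with_timeout(log, "Connecting") { raise "connection refused" }
rescue SystemExit => stop
  hard_failure = !stop.success?
end

puts value         # prints 2
puts soft_status   # prints 0
puts hard_failure  # prints true
```

Returning the block's value avoids the nil pre-declarations, since callers could assign the result directly. The steps that need to retry or break would still want their own handling.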

Any comments or suggestions for the Gateway can be left as comments below.
