#!/usr/bin/env qore

# This script is meant to demonstrate Qore's event support.  Currently only
# socket events are supported (from the Socket, FtpClient, and HTTPClient
# classes).  It is not intended to be a replacement for "wget", which is a much
# better and more complete program (with undoubtedly better socket handling).
#
# basically Qore provides very high-level access to socket functions.  The
# event mechanism allows the program to be informed about the details of
# socket-level actions, however without actually having control over them.
#
# it would be possible to duplicate the functionality in this script using just
# functionality from the Socket class without using qore events, but would
# require on the order of two or more orders of magnitude more code to be
# written.
#
# one glaring weakness of the design of this script is that it reads all the
# data into memory and then saves the data to a file; support for pipelining
# I/O tasks will be added to the next release of qore so that data read from
# any I/O source (in this case the network) can be streamed directly to another
# I/O handler, which in this case will be a file writer, so the file can be
# written as it is being read.
##
# However, for now this script can just demonstrate Qore's socket event support
#
# another weakness is that this script does not try to determine the size of
# the terminal window and format the output accordingly; it uses a fixed output
# size

# ensure we have the minimum version of qore we need
%requires qore >= 0.7.1

%require-our
%enable-all-warnings

our $o;   # global variable for options (set in main(), read in QueueCallBack::listen())

# the two halves of the fixed-width progress bar: "done" and "remaining"
const cstr = "*************************************************";
const bstr = "-------------------------------------------------";

# command-line option definitions for GetOpt
const opts =
    ( "verbose" : "v,verbose:+",
      "help"    : "h,help" );

# run the "main" function
main();

# display the output and flush output buffers
# $fmt: the format string; all arguments (including $fmt) are passed on to
# vprintf() through $argv
sub myprintf($fmt) {
    vprintf($fmt, $argv);
    flush();
}

# QueueCallBack class
#
# this class provides callback-like functionality for socket events.
# Qore provides only a Queue to post callback events on; using a real callback
# function from within Qore internal socket code would be too susceptible to
# deadlocks, therefore the event queue mechanism is supported.  The result of
# this is that socket events must be processed in a separate thread; this class
# provides that functionality.  The class uses the "listen()" method, running
# in a background thread, to process the events.  The "listen()" method waits
# for an "EVENT_DELETED" event to stop listening and exit the thread.
class QueueCallBack {
    # creates the event queue, registers it with the given object and starts
    # the background listener thread
    # $obj: the object to monitor; it must provide a setEventQueue() method
    #       (the parameter is deliberately not called "$o" so that it does not
    #       shadow the global options variable used in listen())
    # $url: the name displayed in the "retrieving" message
    constructor($obj, $url) {
        myprintf("retrieving %s\n", $url);
        $.queue = new Queue();
        # the counter is decremented once by listen() when it terminates
        $.counter = new Counter(1);
        $obj.setEventQueue($.queue);
        background $.listen();
    }

    # static function returning a string with key=value format for a hash
    static getstr($h) {
        my $str;
        # we create the string by mapping our string format function on a list
        # of the hash keys.  this is more concise than iterating the keys with
        # a "foreach" statement, for example
        map ($str += sprintf("%s=%n ", $1, $h.$1)), keys $h;
        return $str;
    }

    # event loop executed in the background thread; returns (and releases
    # any thread blocked in wait()) when EVENT_DELETED is received
    listen() {
        while (True) {
            # get a message from the event queue; a hash is returned with at
            # least the following keys: "event" with the event code, and
            # "source" with the event source identifier
            my $e = $.queue.get();

            # if the source object is deleted, then stop listening for events
            if ($e.event == EVENT_DELETED) {
                $.counter.dec();
                return;
            }

            # if the verbose flag is set, then show all literal messages
            if ($o.verbose) {
                myprintf("%s %s: %s\n", EVENT_SOURCE_MAP.($e.source), EVENT_MAP.($e.event),
                         QueueCallBack::getstr($e - ("event", "source")));
                continue;
            }

            # otherwise process the event
            # note that the brackets after the case statements are only there
            # to ensure that emacs formats the text properly while editing;
            # they are not necessary for execution
            switch ($e.event) {
                case EVENT_HOSTNAME_LOOKUP: {
                    # only report the first lookup
                    if (!$.lookup)
                        myprintf("resolving %s: ", $e.name);
                    break;
                }

                case EVENT_HOSTNAME_RESOLVED: {
                    if (!$.lookup) {
                        myprintf("%s\n", $e.address);
                        $.lookup = True;
                    }
                    break;
                }

                case EVENT_HTTP_CHUNKED_DATA_RECEIVED:
                case EVENT_PACKET_READ: {
                    # only count body data, and skip reads from the ignored
                    # source id (set from the first FTP message event below)
                    if ($.in_body && $e.id != $.ignore) {
                        $.total_read += $e.read;
                        $.update_total();
                    }
                    break;
                }

                case EVENT_HTTP_CONTENT_LENGTH: {
                    myprintf("content length: %d", $e.len);
                    $.body_len = $e.len;
                    $.in_body = True;
                    break;
                }

                case EVENT_FTP_MESSAGE_RECEIVED: {
                    # ignore reads on FTP control channel
                    if (!$.ignore)
                        $.ignore = $e.id;
                    # a 1xx reply announces the transfer; try to extract the
                    # size of the file from the message text
                    if (($e.code / 100) == 1) {
                        #printf("%s\n", $e.message);
                        my $l = ($e.message =~ x/opening.*connection for.*\(([0-9]*) byte/i);
                        $.body_len = $l[0];
                        $.in_body = True;
                    }
                    # do not fall through to the next case, otherwise every FTP
                    # control message would mark the start of the body
                    break;
                }

                case EVENT_HTTP_CHUNKED_START: {
                    $.in_body = True;
                    break;
                }

                case EVENT_HTTP_CHUNKED_END: {
                    $.in_body = False;
                    break;
                }

                case EVENT_HTTP_REDIRECT: {
                    printf("redirected to %s", $e.location);
                    if (exists $e.status_message)
                        printf(" (%s)", $e.status_message);
                    # myprintf() also flushes the output
                    myprintf("\n");
                    break;
                }
            }
        }
    }

    # display a running total of the download
    update_total() {
        my $now = clock_getmillis();
        if (!exists $.stime) {
            # first call: record the start time; no rate is available yet
            $.stime = $now;
            $.utime = $now;
            $.units = "n/a";
        }
        else {
            # recalculate the transfer rate when there is none yet, when more
            # than 250ms have passed since the last update, or when the
            # transfer is complete
            if (!$.bps || $now - $.utime > 250 || $.total_read == $.body_len) {
                $.utime = $now;
                if (my $diff = $now - $.stime) {
                    # $diff is in milliseconds, so scale to bytes per second
                    $.bps = 1000 * $.total_read / float($diff);
                    if ($.bps > 1024 * 1024 * 1024) {
                        $.units = "GB/s";
                        $.bps /= 1024 * 1024 * 1024;
                    }
                    else if ($.bps > 1024 * 1024) {
                        $.units = "MB/s";
                        $.bps /= 1024 * 1024;
                    }
                    else if ($.bps > 1024) {
                        $.units = "KB/s";
                        $.bps /= 1024;
                    }
                    else
                        $.units = "B/s";
                }
            }
        }

        if ($.body_len) {
            # position reached on the fixed-width progress bar
            my $nh = $.total_read * 50 / float($.body_len);
            myprintf("\r%3d% %s%s %f %s (%d/%d)    ", $.total_read * 100 / float($.body_len),
                     substr(cstr, 0, $nh), substr(bstr, $nh), $.bps, $.units, $.total_read, $.body_len);
        }
        else {
            # total size unknown: just show the running byte count
            myprintf("\r%d bytes %f %s    ", $.total_read, $.bps, $.units);
        }
    }

    # blocks until the listener thread has received EVENT_DELETED and exited
    wait() {
        $.counter.waitForZero();
    }
}

# prints the usage message and exits with status 1
# $long: if true, the option descriptions are printed as well
sub usage($long) {
    printf("usage: %s [-vh] URL\n", get_script_name());
    if ($long) {
        printf(" -v,--verbose  show socket events\n");
        printf(" -h,--help     this help text\n");
    }
    exit(1);
}

# parses the command line, retrieves the URL given with an HTTPClient or
# FtpClient object while displaying socket events or a progress bar, and
# saves the data retrieved to a file in the current directory
sub main() {
    my $g = new GetOpt(opts);
    try {
        $o = $g.parse2(\$ARGV);
    }
    catch ($ex) {
        printf("option error: %s\n", $ex.desc);
        exit(1);
    }
    if ($o.help)
        usage(True);

    my $url = shift $ARGV;
    if (!exists $url)
        usage();

    my $u = parseURL($url);

    my $x;    # the data retrieved
    my $cb;   # the event monitoring object
    try {
        switch ($u.protocol) {
            # a URL without a protocol is treated as an HTTP URL
            case NOTHING:
            case "https":
            case "http": {
                my $hc = new HTTPClient(("url" : $url));
                $cb = new QueueCallBack($hc, $url);
                $x = $hc.get(exists $u.path ? $u.path : "/");
                break;
            }

            case "ftps":
            case "ftp": {
                my $file = basename($u.path);
                if (!exists $file) {
                    printf("missing file name in url %n\n", $url);
                    exit(1);
                }
                my $f = new FtpClient($url);
                $cb = new QueueCallBack($f, $url);
                $f.connect();
                my $dir = dirname($u.path);
                if (exists $dir) {
                    #printf("dir=%n\n", $dir);
                    $f.cwd($dir);
                }
                #printf("%n: %n + %n\n", $f.pwd(), $dir, $file);
                $x = $f.getAsBinary($file);
                break;
            }

            # without this case an unsupported protocol would leave $cb unset
            # and the script would carry on to wait on it and save an empty file
            default: {
                printf("unsupported protocol %n in url %n\n", $u.protocol, $url);
                exit(1);
            }
        }
    }
    catch ($ex) {
        printf("%s: %s\n", $ex.err, $ex.desc);
        thread_exit;
    }

    # get the filename to save
    my $fn = basename($u.path);
    if (!exists $fn)
        $fn = $u.host;

    # wait for the event monitoring thread to complete
    $cb.wait();

    # terminate the progress line
    if (!$o.verbose)
        print("\n");

    myprintf("done, saving %s (%d bytes)\n", $fn, type($x) == Type::String ? strlen($x) : elements $x);

    # save the file read
    my $f = new File();

    # show file events if verbose is set
    if ($o.verbose)
        $cb = new QueueCallBack($f, $fn);

    # File::open() reports failure through its return value
    if ($f.open($fn, O_CREAT | O_TRUNC | O_WRONLY)) {
        printf("cannot open %s for writing: %s\n", $fn, strerror(errno()));
        exit(1);
    }
    $f.write(binary($x));
}