package dispatch
import collection.Map
import util.DynamicVariable
import java.io.{InputStream,OutputStream,BufferedInputStream,BufferedOutputStream}
import java.net.URI
import java.util.zip.GZIPInputStream
import org.apache.http._
import org.apache.http.client._
import org.apache.http.impl.client.{DefaultHttpClient, BasicCredentialsProvider}
import org.apache.http.client.methods._
import org.apache.http.client.entity.UrlEncodedFormEntity
import org.apache.http.client.utils.URLEncodedUtils
import org.apache.http.entity.StringEntity
import org.apache.http.message.BasicNameValuePair
import org.apache.http.protocol.HTTP.UTF_8
import org.apache.http.params.{HttpProtocolParams, BasicHttpParams}
import org.apache.http.util.EntityUtils
import org.apache.http.auth.{AuthScope, UsernamePasswordCredentials, Credentials}
/** Exception carrying an HTTP status code together with the response contents.
 *  Thrown by `Http.when` for responses whose status code fails the supplied check.
 *  @param code     the status code of the response
 *  @param contents text describing the response body; appended to the exception message */
case class StatusCode(code: Int, contents: String)
  extends Exception("Exceptional response code: " + code + "\n" + contents)
/** Minimal logging interface used by `Http`. */
trait Logger {
  /** Log an informational message.
   *  @param msg   message text; the console implementation in `Http` treats it as a format string
   *  @param items values to substitute into `msg` */
  def info(msg: String, items: Any*): Unit
}
/** Executes `Request`s through an Apache HttpClient instance and passes the response to a
 *  handler function, consuming the response entity once the handler has returned or thrown. */
class Http {
  val credentials = new DynamicVariable[Option[(AuthScope, Credentials)]](None)
  val client = new ConfiguredHttpClient

  /** A credentials provider whose lookup always returns null. */
  def credentialsProvider = new BasicCredentialsProvider {
    override def getCredentials(scope: AuthScope) = null
  }

  /** Logger backed by net.lag.logging; if that class cannot be loaded
   *  (NoClassDefFoundError) a logger that prints to the console is used instead. */
  lazy val log: Logger =
    try {
      new Logger {
        val backend = net.lag.logging.Logger.get
        def info(msg: String, items: Any*): Unit = { backend.info(msg, items: _*) }
      }
    } catch {
      case _: NoClassDefFoundError =>
        new Logger {
          def info(msg: String, items: Any*): Unit = {
            println("INF: [console logger] dispatch: " + msg.format(items: _*))
          }
        }
    }

  /** Log the method, host, and URI, then execute `req` against `host`. */
  def execute(host: HttpHost, req: HttpUriRequest) = {
    log.info("%s %s%s", req.getMethod, host, req.getURI)
    client.execute(host, req)
  }

  /** Execute for an optional host and optional credentials. Credentials are bound on the
   *  client only for the duration of the call and are rejected when no host is given. */
  val execute: (Option[HttpHost], Option[Credentials], HttpUriRequest) => HttpResponse = {
    case (Some(host), Some(creds), req) =>
      val scoped = (new AuthScope(host.getHostName, host.getPort), creds)
      client.credentials.withValue(Some(scoped)) { execute(host, req) }
    case (None, Some(creds), _) =>
      error("Credentials specified without explicit host")
    case (Some(host), _, req) =>
      execute(host, req)
    case (_, _, req) =>
      log.info("%s %s", req.getMethod, req.getURI)
      client.execute(req)
  }

  /** Execute the handler's request and apply its block, whatever the status code. */
  def x[T](hand: Handler[T]): T = x(hand.req)(hand.block)

  /** Execute `req` and apply `block` to the status code, response, and optional entity.
   *  The entity, when present, is consumed after `block` completes or throws. */
  def x [T](req: Request)(block: Handler.F[T]) = {
    val res = execute(req.host, req.creds, req.req)
    val raw = res.getEntity
    // getEntity yields null when the response has no body
    val ent: Option[HttpEntity] = if (raw == null) None else Some(raw)
    try {
      block(res.getStatusLine.getStatusCode, res, ent)
    } finally {
      ent.foreach(e => e.consumeContent())
    }
  }

  /** Apply the handler only when `chk` accepts the status code; otherwise throw a
   *  `StatusCode` holding the response body text, or "[no entity]" when there is none. */
  def when[T](chk: Int => Boolean)(hand: Handler[T]) = x(hand.req) { (code, res, ent) =>
    if (chk(code)) hand.block(code, res, ent)
    else throw StatusCode(code, ent match {
      case Some(body) => EntityUtils.toString(body, UTF_8)
      case _ => "[no entity]"
    })
  }

  /** Run both the handler's block and `block` on the same response, returning both results
   *  as a pair (handler result first). */
  def also[A,B](hand: Handler[B])(block: Handler.F[A]) =
    x(hand.req) { (code, res, ent) =>
      val fromHandler = hand.block(code, res, ent)
      val fromBlock = block(code, res, ent)
      (fromHandler, fromBlock)
    }

  /** Apply the handler, accepting only status codes 200 through 204. */
  def apply[T](hand: Handler[T]) = when(code => (200 to 204) contains code)(hand)
}
/** The root request: no host, no credentials, and no transformers. */
object /\ extends Request(None, None, Nil)
/** Factory for requests bound to an explicit host, with no credentials and no transformers. */
object :/ {
  /** Request for the given host name and port. */
  def apply(hostname: String, port: Int): Request = forHost(new HttpHost(hostname, port))

  /** Request for the given host name, leaving the port unspecified. */
  def apply(hostname: String): Request = forHost(new HttpHost(hostname))

  // Shared construction for both overloads.
  private def forHost(target: HttpHost): Request = new Request(Some(target), None, Nil)
}
/** Factory for host-less requests that start from the root request with a path appended. */
object / {
  /** Root request extended with `path`. */
  def apply(path: String) = {
    val root: Request = /\
    root / path
  }
}
/** Companion definitions for `Request`. */
object Request {
  /** A request transformer: takes the request built so far and returns the next one. */
  type Xf = HttpRequestBase => HttpRequestBase

  /** Transformer that rewrites the request URI: `sxf` receives the current URI as a string
   *  and its result is set as the new URI. The same (mutated) request is returned. */
  def uri_xf(sxf: String => String)(req: HttpRequestBase) = {
    val rewritten = sxf(req.getURI.toString)
    req.setURI(URI.create(rewritten))
    req
  }
}
/** Pairs a request with the function that processes its response.
 *  @param req   the request to execute
 *  @param block function of status code, response, and optional entity */
case class Handler[T](req: Request, block: Handler.F[T]) {
  /** Chain another function after this handler. `next` receives the status code, response,
   *  optional entity, and a thunk that evaluates this handler's own block on demand. */
  def apply[R](next: (Int, HttpResponse, Option[HttpEntity], () => T) => R) = {
    val chained: Handler.F[R] = (code, res, ent) =>
      next(code, res, ent, () => block(code, res, ent))
    new Handler(req, chained)
  }
}
/** Companion definitions for `Handler`. */
object Handler {
  /** Response-processing function: (status code, response, optional entity) => result. */
  type F[T] = (Int, HttpResponse, Option[HttpEntity]) => T

  /** Handler that applies `block` to the response entity and fails with an error when the
   *  response carries no entity. */
  def apply[T](req: Request, block: HttpEntity => T): Handler[T] = {
    val entityOnly: F[T] = (code, res, ent) =>
      ent.map(block).getOrElse(error("response has no entity: " + res))
    new Handler(req, entityOnly)
  }
}
/** An HttpPost whose entity is the URL-encoded form of `values`, encoded as UTF-8.
 *  @param values form fields; each value is converted by `Http.map2ee` */
class Post(val values: Map[String, Any]) extends HttpPost {
  setEntity(new UrlEncodedFormEntity(Http.map2ee(values), UTF_8))
}
/** Immutable request description: an optional host, optional credentials, and a list of
 *  transformers (`xfs`) that are folded over an initial GET to build the request in `req`.
 *  Builder methods return a new Request rather than modifying the receiver.
 *  @param host  target host, if any
 *  @param creds credentials to use for the host, if any
 *  @param xfs   transformers; the head of the list is applied last */
class Request(val host: Option[HttpHost], val creds: Option[Credentials], val xfs: List[Request.Xf]) {
  /** Host-less request with a single transformer that appends `str` to the URI. */
  def this(str: String) = this(None, None, Request.uri_xf(cur => cur + str)_ :: Nil)
  /** Copy the host, credentials, and transformers of `req`. */
  def this(req: Request) = this(req.host, req.creds, req.xfs)

  /** New request with `xf` added; it runs after every transformer already present. */
  def next(xf: Request.Xf) = new Request(host, creds, xf :: xfs)
  /** New request with a transformer that rewrites the URI string through `sxf`. */
  def next_uri(sxf: String => String) = next(Request.uri_xf(sxf))

  /** Copy the URI and all headers of `req` onto `dest` and return `dest`. */
  def mimic(dest: HttpRequestBase)(req: HttpRequestBase) = {
    dest.setURI(req.getURI)
    dest.setHeaders(req.getAllHeaders)
    dest
  }

  /** Same request with username/password credentials. */
  def as (name: String, pass: String) =
    new Request(host, Some(new UsernamePasswordCredentials(name, pass)), xfs)

  /** Same request with the host's scheme set to https; fails when there is no host. */
  def secure = new Request(host map {
    h => new HttpHost(h.getHostName, h.getPort, "https")
  } orElse { error("secure requires an explicit host") }, creds, xfs)

  /** Combine with `req`: this request's host and credentials win when both are defined,
   *  and `req`'s transformers run after this request's. */
  def <& (req: Request) = new Request(host orElse req.host, creds orElse req.creds, req.xfs ::: xfs)
  /** Handler for this request combined with the handler's request, keeping its block. */
  def >& [T] (other: Handler[T]) = new Handler(this <& other.req, other.block)

  /** Append "/" and `path` to the URI. */
  def / (path: String) = next_uri { _ + "/" + path }

  /** Add each entry of `values` as a request header. */
  def <:< (values: Map[String, String]) = next { req =>
    values foreach { case (k, v) => req.addHeader(k, v) }
    req
  }
  /** Add the header Accept-Encoding: gzip. */
  def gzip = this <:< collection.immutable.Map("Accept-Encoding" -> "gzip")

  // The verb transformers below build a fresh HttpClient request each time they run.
  // Eta-expanding `mimic(x)_` would evaluate `x` once, so every Request derived from the
  // result would mutate and return the very same request object.

  /** PUT `body.toString` (captured now) as a UTF-8 string entity, with expect-continue off. */
  def <<< (body: Any) = {
    val text = body.toString
    next { req =>
      val m = new HttpPut
      m setEntity new StringEntity(text, UTF_8)
      HttpProtocolParams.setUseExpectContinue(m.getParams, false)
      mimic(m)(req)
    }
  }
  /** POST `values` as a URL-encoded form. The form entity is built from `values` when the
   *  request is built. */
  def << (values: Map[String, Any]) = next { req => mimic(new Post(values))(req) }

  /** Append `values` to the URI as a query string, using "&" when the URI already
   *  contains a "?". An empty map leaves the URI unchanged. */
  def <<? (values: Map[String, Any]) = next_uri { uri =>
    if (values.isEmpty) uri
    else uri + (
      if (uri contains '?') "&" + Http.q_str(values) else (Http ? values)
    )
  }

  /** POST with an empty form. */
  def POST = next { req => mimic(new Post(collection.immutable.Map.empty))(req) }
  /** Alias for DELETE. */
  def <--() = DELETE
  /** Convert to a DELETE request. */
  def DELETE = next { req => mimic(new HttpDelete)(req) }
  /** Convert to a HEAD request. */
  def HEAD = next { req => mimic(new HttpHead)(req) }

  /** The HttpClient request: a GET with an empty URI passed through the transformers,
   *  last list element first. Built once, on first access. */
  lazy val req = {
    val start: HttpRequestBase = new HttpGet("")
    xfs.foldRight(start) { (xf, built) => xf(built) }
  }

  /** URI of the built request resolved against the host, via `Http.to_uri`. */
  def to_uri = Http.to_uri(host, req)

  /** Handle the response body as a stream, wrapped in a GZIPInputStream when the entity
   *  declares a gzip content encoding (compared case-insensitively). */
  def >> [T] (block: InputStream => T) = Handler(this, { ent =>
    val encoding = ent.getContentEncoding
    val gzipped = encoding != null && "gzip".equalsIgnoreCase(encoding.getValue)
    block(if (gzipped) new GZIPInputStream(ent.getContent) else ent.getContent)
  })
  /** Handle the response body as a string, decoded as UTF-8 rather than with the
   *  platform default charset. */
  def >- [T] (block: String => T) = >> { stm =>
    block(scala.io.Source.fromInputStream(stm, UTF_8).mkString)
  }
  /** Return the response body as a string. */
  def as_str = >- { s => s }
  /** Write the response entity to `out` and return `out`. */
  def >>> [OS <: OutputStream](out: OS) = Handler(this, { ent => ent.writeTo(out); out })
  /** Handle the response body as XML loaded from the stream. */
  def <> [T] (block: xml.NodeSeq => T) = >> { stm => block(xml.XML.load(stm)) }
  /** Ignore the response entirely. */
  def >| = Handler(this, (code, res, ent) => ())
}
/** DefaultHttpClient with HTTP/1.1, UTF-8 content charset, and expect-continue disabled,
 *  whose credentials are taken from the `credentials` dynamic variable. */
class ConfiguredHttpClient extends DefaultHttpClient {
  /** Protocol parameters for this client. */
  override def createHttpParams = {
    val defaults = new BasicHttpParams
    HttpProtocolParams.setVersion(defaults, HttpVersion.HTTP_1_1)
    HttpProtocolParams.setContentCharset(defaults, UTF_8)
    HttpProtocolParams.setUseExpectContinue(defaults, false)
    defaults
  }

  /** Scope and credentials currently in effect; None unless bound with `withValue`. */
  val credentials = new DynamicVariable[Option[(AuthScope, Credentials)]](None)

  // Supply the bound credentials only when the requested scope matches the bound scope;
  // in every other case the lookup yields null.
  setCredentialsProvider(new BasicCredentialsProvider {
    override def getCredentials(scope: AuthScope) = credentials.value match {
      case Some((bound, creds)) if scope.`match`(bound) >= 0 => creds
      case _ => null
    }
  })
}
/** Shared `Http` instance whose client uses a ThreadSafeClientConnManager, plus helpers for
 *  form values, URL encoding, and query strings. */
object Http extends Http {
  import org.apache.http.conn.scheme.{Scheme,SchemeRegistry,PlainSocketFactory}
  import org.apache.http.conn.ssl.SSLSocketFactory
  import org.apache.http.impl.conn.tsccm.ThreadSafeClientConnManager

  /** Implicitly convert a string to a host-less request for that string. */
  implicit def str2req(str: String) = new Request(str)

  /** Client with http (port 80) and https (port 443) schemes registered. */
  override val client = new ConfiguredHttpClient {
    override def createClientConnectionManager() = {
      val schemes = new SchemeRegistry()
      schemes.register(new Scheme("http", PlainSocketFactory.getSocketFactory(), 80))
      schemes.register(new Scheme("https", SSLSocketFactory.getSocketFactory(), 443))
      new ThreadSafeClientConnManager(getParams(), schemes)
    }
  }

  /** Shut down the client's connection manager. */
  def shutdown() = client.getConnectionManager.shutdown()

  /** Convert a map to a Java list of name/value pairs, using each value's `toString`. */
  def map2ee(values: Map[String, Any]) = {
    val pairs = values.toSeq map { case (k, v) => new BasicNameValuePair(k, v.toString) }
    java.util.Arrays.asList(pairs.toArray: _*)
  }

  /** URL-encode `s` as UTF-8. */
  def % (s: String) = java.net.URLEncoder.encode(s, UTF_8)
  /** URL-decode `s` as UTF-8. */
  def -% (s: String) = java.net.URLDecoder.decode(s, UTF_8)

  /** Format `values` as a URL-encoded query string without a leading "?". */
  def q_str (values: Map[String, Any]) = URLEncodedUtils.format(map2ee(values), UTF_8)
  /** Query string with a leading "?", or the empty string when `values` is empty. */
  def ? (values: Map[String, Any]) =
    if (values.isEmpty) ""
    else "?" + q_str(values)

  /** Resolve the request's URI against the host's URI (or against "" when there is no host). */
  def to_uri(host: Option[HttpHost], req: HttpRequestBase) = {
    val base = host.map(_.toURI).getOrElse("")
    URI.create(base).resolve(req.getURI)
  }
}